Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Optional

import pymongo.errors
from kubetester import kubetester
from tests import test_logger
Expand All @@ -12,15 +14,17 @@ class SampleMoviesSearchHelper:
db_name: str
col_name: str
archive_url: str
tools_pod: ToolsPod
tools_pod: Optional[ToolsPod]

def __init__(self, search_tester: SearchTester, tools_pod: ToolsPod):
def __init__(self, search_tester: SearchTester, tools_pod: Optional[ToolsPod] = None):
    """Create a helper around the sample_mflix movies collection.

    Args:
        search_tester: Connected SearchTester used to run queries and commands.
        tools_pod: Pod with the MongoDB database tools installed. Only needed
            by restore_sample_database, hence optional here. Annotated
            Optional[ToolsPod] (not bare ToolsPod) to match the class
            attribute declaration and avoid deprecated implicit-Optional.
    """
    self.search_tester = search_tester
    self.tools_pod = tools_pod
    self.db_name = "sample_mflix"
    self.col_name = "movies"

def restore_sample_database(self):
if self.tools_pod is None:
raise ValueError("tools_pod is required for restore_sample_database, pass it to the constructor while creating SampleMoviesSearchHelper instance")
self.search_tester.mongorestore_from_url(
"https://atlas-education.s3.amazonaws.com/sample_mflix.archive",
f"{self.db_name}.*",
Expand Down Expand Up @@ -129,3 +133,38 @@ def wait_for_auto_emb_search_results():
sleep_time=1,
msg="Auto-embedding vector search query to return correct data",
)

def text_search_movies(self, query: str, index: str = "default", path: str = "title", limit: int = 10):
    """Run an Atlas Search text query against the movies collection.

    Args:
        query: Text to search for.
        index: Name of the search index to use.
        path: Document field the text query is matched against.
        limit: Maximum number of results to return.

    Returns:
        List of projected documents with "title" and the search "score".
    """
    pipeline = [
        {"$search": {"index": index, "text": {"query": query, "path": path}}},
        {"$limit": limit},
        {"$project": {"_id": 0, "title": 1, "score": {"$meta": "searchScore"}}},
    ]
    collection = self.search_tester.client[self.db_name][self.col_name]
    return list(collection.aggregate(pipeline))

def wildcard_search_movies(self, wildcard: str = "*", index: str = "default"):
    """Run an Atlas Search wildcard query on movie titles.

    Args:
        wildcard: Wildcard pattern to match against the title field.
        index: Name of the search index to use.

    Returns:
        List of projected documents containing only "title".
    """
    search_stage = {
        "index": index,
        "wildcard": {"query": wildcard, "path": "title", "allowAnalyzedField": True},
    }
    cursor = self.search_tester.client[self.db_name][self.col_name].aggregate(
        [{"$search": search_stage}, {"$project": {"_id": 0, "title": 1}}]
    )
    return list(cursor)

# get_shard_document_counts returns the document count on each shard for the
# helper's collection. It is intended to be called after sharding the collection,
# to confirm that the chunks were actually distributed across the shards.
def get_shard_document_counts(self) -> dict[str, int]:
    """Get per-shard document counts for the collection.

    Returns:
        Dict mapping shard name to document count stored in that shard.
    """
    stats = self.search_tester.client[self.db_name].command("collStats", self.col_name)
    counts: dict[str, int] = {}
    for name, per_shard_stats in stats["shards"].items():
        doc_count = per_shard_stats["count"]
        logger.info(f"Shard {name}: {doc_count} documents")
        counts[name] = doc_count
    return counts
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from typing import Optional, Self

import pymongo
import pymongo.errors

import kubetester
from kubetester.mongotester import MongoTester
from pymongo.operations import SearchIndexModel
Expand Down Expand Up @@ -74,6 +77,37 @@ def for_sharded(
)
return cls(conn_str, use_ssl=use_ssl, ca_path=ca_path)

# shard_and_distribute_collection mirrors the mongosh helper `sh.shardAndDistributeCollection`,
# which is equivalent to running `shardCollection` and `reshardCollection` consecutively. It means that
# immediately after the shard is created, resharding would happen and the collection data would be distributed
# in different shards using chunks. Balancer is not really involved here initially because resharding handles the
# chunk creation and distribution. The command is synchronous which means it will only return when the chunks are
# distributed properly among shards.
def shard_and_distribute_collection(self, database_name: str, collection_name: str):
    """Shard a collection on a hashed _id key and force chunk distribution.

    Args:
        database_name: Name of the database containing the collection.
        collection_name: Name of the collection to shard.

    Raises:
        pymongo.errors.OperationFailure: For any failure other than the
            collection already being sharded.
    """
    ns = f"{database_name}.{collection_name}"
    try:
        # A hashed index on _id must exist before sharding on {"_id": "hashed"}.
        self.client[database_name][collection_name].create_index([("_id", pymongo.HASHED)])
        self.client.admin.command(
            "shardCollection",
            ns,
            key={"_id": "hashed"},
            unique=False,
        )

        # Resharding right after shardCollection spreads the chunks across
        # shards immediately instead of waiting for the balancer.
        self.client.admin.command(
            "reshardCollection",
            ns,
            key={"_id": "hashed"},
            forceRedistribution=True,  # requires MongoDB 8.0+
        )

        logger.info(f"{collection_name} collection sharded and chunks distributed using the method shardAndDistributeCollection")
    except pymongo.errors.OperationFailure as e:
        # Tolerate re-runs: swallow only "already sharded" failures, re-raise anything else.
        if "already sharded" in str(e) or e.code == 20:  # presumably the server's "already sharded" code — TODO confirm exact error code
            logger.info(f"{collection_name} collection already sharded")
        else:
            raise

def mongorestore_from_url(self, archive_url: str, ns_include: str, tools_pod: ToolsPod):
"""Run mongorestore from a URL using the tools pod.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from tests import test_logger
from tests.common.mongodb_tools_pod import mongodb_tools_pod
from tests.common.search.search_tester import SearchTester
from tests.common.search.movies_search_helper import SampleMoviesSearchHelper
from tests.conftest import get_default_operator, get_issuer_ca_filepath
from tests.search.om_deployment import get_ops_manager

Expand Down Expand Up @@ -495,48 +496,32 @@ def test_search_restore_sample_database(mdb: MongoDB, tools_pod: mongodb_tools_p
@mark.e2e_search_sharded_external_mongod_single_mongot
def test_search_shard_collections(mdb: MongoDB):
search_tester = get_admin_search_tester(mdb, use_ssl=True)
client = search_tester.client
admin_db = client.admin
sample_mflix_db = client["sample_mflix"]

# Enable sharding on database
try:
admin_db.command("enableSharding", "sample_mflix")
logger.info("Sharding enabled on sample_mflix database")
except pymongo.errors.OperationFailure as e:
if (
"already enabled" in str(e) or e.code == 23 or e.code == 59
): # AlreadyInitialized or CommandNotFound (MongoDB 8.0+)
logger.info("Sharding already enabled on sample_mflix")
else:
raise

# Shard movies collection
try:
sample_mflix_db["movies"].create_index([("_id", pymongo.HASHED)])
admin_db.command("shardCollection", "sample_mflix.movies", key={"_id": "hashed"})
logger.info("movies collection sharded")
except pymongo.errors.OperationFailure as e:
if "already sharded" in str(e) or e.code == 20: # AlreadyInitialized for sharding
logger.info("movies collection already sharded")
else:
raise

# Shard embedded_movies collection
try:
sample_mflix_db["embedded_movies"].create_index([("_id", pymongo.HASHED)])
admin_db.command("shardCollection", "sample_mflix.embedded_movies", key={"_id": "hashed"})
logger.info("embedded_movies collection sharded")
except pymongo.errors.OperationFailure as e:
if "already sharded" in str(e) or e.code == 20: # AlreadyInitialized for sharding
logger.info("embedded_movies collection already sharded")
else:
raise
search_tester.shard_and_distribute_collection("sample_mflix", "movies")

logger.info("Collections sharded and chunks are distributed")


@mark.e2e_search_sharded_external_mongod_single_mongot
def test_verify_documents_count_in_shards(mdb: MongoDB):
search_tester = get_admin_search_tester(mdb, use_ssl=True)
movies_helper = SampleMoviesSearchHelper(search_tester)

total_docs = search_tester.client["sample_mflix"]["movies"].count_documents({})
logger.info(f"Total documents in movies collection: {total_docs}")

shard_counts = movies_helper.get_shard_document_counts()

assert len(shard_counts) == SHARD_COUNT, (
f"Expected {SHARD_COUNT} shards, found {len(shard_counts)}"
)
for shard_name, count in shard_counts.items():
assert count > 0, f"Shard {shard_name} has 0 documents, data was not distributed"

# Wait for balancer to distribute chunks
# TODO: execute mdb command to wait for the balancer
time.sleep(10)
logger.info("Collections sharded and balanced")
shard_total = sum(shard_counts.values())
assert shard_total == total_docs, (
f"Sum of shard counts ({shard_total}) != total documents ({total_docs})"
)
logger.info(f"Document distribution verified: {shard_counts}, total: {shard_total}")


@mark.e2e_search_sharded_external_mongod_single_mongot
Expand All @@ -556,18 +541,11 @@ def test_search_wait_for_search_index_ready(mdb: MongoDB):
@mark.e2e_search_sharded_external_mongod_single_mongot
def test_search_assert_search_query(mdb: MongoDB):
search_tester = get_user_search_tester(mdb, use_ssl=True)
movies_helper = SampleMoviesSearchHelper(search_tester)

def execute_search():
try:
results = list(
search_tester.client["sample_mflix"]["movies"].aggregate(
[
{"$search": {"index": "default", "text": {"query": "star wars", "path": "title"}}},
{"$limit": 10},
{"$project": {"_id": 0, "title": 1, "score": {"$meta": "searchScore"}}},
]
)
)
results = movies_helper.text_search_movies("star wars")

result_count = len(results)
logger.info(f"Search returned {result_count} results")
Expand All @@ -587,38 +565,31 @@ def execute_search():
@mark.e2e_search_sharded_external_mongod_single_mongot
def test_search_verify_results_from_all_shards(mdb: MongoDB):
search_tester = get_user_search_tester(mdb, use_ssl=True)
movies_collection = search_tester.client["sample_mflix"]["movies"]

movies_helper = SampleMoviesSearchHelper(search_tester)
# Get total document count
total_docs = movies_collection.count_documents({})
total_docs = search_tester.client["sample_mflix"]["movies"].count_documents({})
logger.info(f"Total documents in collection: {total_docs}")

# Execute wildcard search to get all documents
results = list(
movies_collection.aggregate(
[
{
"$search": {
"index": "default",
"wildcard": {"query": "*", "path": "title", "allowAnalyzedField": True},
}
},
{"$project": {"_id": 0, "title": 1}},
]
)
)

search_count = len(results)
logger.info(f"Search through mongos returned {search_count} documents")

# Verify search returns all documents (or close to it - some tolerance for timing)
# TODO: verify 100% of documents, perhaps clone the movies collection to have sharded and unsharded queries to compare
assert search_count > 0, "Search returned no results"
assert (
search_count >= total_docs * 0.9
), f"Search returned {search_count} but collection has {total_docs} (expected >= 90%)"
# The movies collection contains one document whose title is just "$". Lucene's
# analyzer produces no tokens for that title, so no entry for it is ever created
# in the inverted index — which is exactly where the wildcard query looks for
# matches. We therefore expect one fewer document than the collection total,
# because that untokenized title can never be returned by the wildcard search.
expected_docs = total_docs - 1

def execute_all_docs_search():
# Execute wildcard search to get all documents
results = movies_helper.wildcard_search_movies()
search_count = len(results)
logger.info(f"Search through mongos returned {search_count} documents")

if search_count == expected_docs:
return True, f""
else:
return False, f"Search query for all documents returned {search_count} documents, expected were {expected_docs}"

logger.info(f"Search results verified: {search_count}/{total_docs} documents from all shards")
run_periodically(execute_all_docs_search, timeout=120, sleep_time=5, msg="search query for all docs")
logger.info(f"Search results for all documents verified.")


def get_admin_search_tester(mdb: MongoDB, use_ssl: bool = False) -> SearchTester:
Expand Down