Skip to content

Commit 3a30eaa

Browse files
Define method for merging append deltas into static metadata db file
Why these changes are being introduced:
* TDA requires a method to regularly merge append deltas into the static metadata.duckdb database file as new rows, and then delete the append deltas once merged.

How this addresses that need:
* Add 'merge_append_deltas' method
* Add unit test

Side effects of this change:
* It's worth noting that this method, when run, will delete the append delta parquet files that existed in the directory at the time of execution.

Relevant ticket(s):
* https://mitlibraries.atlassian.net/browse/TIMX-528
1 parent e3aedce commit 3a30eaa

File tree

5 files changed

+171
-0
lines changed

5 files changed

+171
-0
lines changed

tests/conftest.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"""tests/conftest.py"""
22

3+
import shutil
34
from collections.abc import Iterator
45

56
import boto3
@@ -265,6 +266,26 @@ def timdex_metadata_with_deltas(
265266
return TIMDEXDatasetMetadata(timdex_dataset_with_runs.location)
266267

267268

269+
@pytest.fixture
def timdex_metadata_merged_deltas(
    tmp_path, timdex_metadata_with_deltas, timdex_dataset_with_runs
):
    """TIMDEXDatasetMetadata whose append deltas have been merged into the static db.

    Works on a copy of the dataset so the shared fixtures stay untouched.
    """
    # clone the dataset-with-deltas directory into a temp location
    cloned_location = str(tmp_path / "cloned_dataset_with_runs/")
    shutil.copytree(timdex_metadata_with_deltas.location, cloned_location)

    # re-open the cloned dataset at its new location, reusing the original config
    cloned_dataset = TIMDEXDataset(
        cloned_location, config=timdex_dataset_with_runs.config
    )

    # build metadata over the clone, merge its append deltas, and reload state
    merged_metadata = TIMDEXDatasetMetadata(cloned_dataset.location)
    merged_metadata.merge_append_deltas()
    merged_metadata.refresh()

    return merged_metadata
268289
# ================================================================================
269290
# Utility Fixtures
270291
# ================================================================================

tests/test_metadata.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,18 @@
88

99
from timdex_dataset_api import TIMDEXDatasetMetadata
1010

11+
# Column order used for cross-database selects in these tests; must stay aligned
# with the static_db.records / metadata.append_deltas schema.
ORDERED_METADATA_COLUMN_NAMES = (
    "timdex_record_id source run_date run_type action "
    "run_id run_record_offset run_timestamp filename"
).split()
1123

1224
def test_tdm_init_no_metadata_file_warning_success(caplog, timdex_dataset_with_runs):
1325
TIMDEXDatasetMetadata(timdex_dataset_with_runs.location)
@@ -266,6 +278,49 @@ def test_tdm_current_records_most_recent_version(timdex_metadata_with_deltas):
266278
assert current_version.iloc[0]["run_id"] == most_recent.iloc[0]["run_id"]
267279

268280

281+
def test_tdm_merge_append_deltas_static_counts_match_records_count_before_merge(
    timdex_metadata_with_deltas, timdex_metadata_merged_deltas
):
    # row count in the merged static db equals the pre-merge total record count
    count_row = timdex_metadata_merged_deltas.conn.query(
        """select count(*) as count from static_db.records;"""
    ).fetchone()
    assert count_row[0] == timdex_metadata_with_deltas.records_count
290+
def test_tdm_merge_append_deltas_adds_records_to_static_db(
    timdex_metadata_with_deltas, timdex_metadata_merged_deltas
):
    columns = ",".join(ORDERED_METADATA_COLUMN_NAMES)

    deltas_df = timdex_metadata_with_deltas.conn.query(
        f"""
        select
        {columns}
        from metadata.append_deltas
        """
    ).to_df()

    merged_df = timdex_metadata_merged_deltas.conn.query(
        f"""
        select
        {columns}
        from static_db.records
        """
    ).to_df()

    # every append-delta row must appear among the merged static db rows
    delta_rows = {tuple(row) for row in deltas_df.to_numpy()}
    merged_rows = {tuple(row) for row in merged_df.to_numpy()}
    assert delta_rows <= merged_rows
314+
def test_tdm_merge_append_deltas_deletes_append_deltas(
    timdex_metadata_with_deltas, timdex_metadata_merged_deltas
):
    before, after = timdex_metadata_with_deltas, timdex_metadata_merged_deltas

    # before the merge: delta rows exist and delta files are on disk
    assert before.append_deltas_count != 0
    assert os.listdir(before.append_deltas_path)

    # after the merge: no delta rows remain and the delta directory is empty
    assert after.append_deltas_count == 0
    assert not os.listdir(after.append_deltas_path)
269324
def test_tdm_prepare_duckdb_secret_and_extensions_home_env_var_set_and_valid(
270325
monkeypatch, tmp_path_factory, timdex_dataset_with_runs
271326
):

tests/test_s3client.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,22 @@ def test_split_s3_uri_invalid():
4242
client._split_s3_uri("timdex/path/to/file.txt")
4343

4444

45+
def test_list_objects(s3_bucket_mocked, tmp_path):
    client = S3Client()

    # write a local file to upload
    local_file = tmp_path / "test.txt"
    local_file.write_text("test content")

    # upload it under the append-deltas prefix
    client.upload_file(local_file, "s3://timdex/metadata/append_deltas/test.txt")

    # listing under that prefix returns the uploaded key
    listed_keys = client.list_objects("s3://timdex/metadata/append_deltas")
    assert listed_keys == ["metadata/append_deltas/test.txt"]
4561
def test_upload_download_file(s3_bucket_mocked, tmp_path):
4662
"""Test upload_file and download_file methods."""
4763
client = S3Client()

timdex_dataset_api/metadata.py

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -467,6 +467,79 @@ def _create_current_records_view(self, conn: DuckDBPyConnection) -> None:
467467
"""
468468
conn.execute(query)
469469

470+
def merge_append_deltas(self) -> None:
    """Merge append deltas into the static metadata database file.

    Rows from all registered append deltas are inserted into a local copy of
    the static metadata database; the merged copy then replaces the original
    (local file or S3 object), and finally the merged append delta files are
    deleted.

    Side effects:
        Append delta files that existed at the start of execution are removed
        once the merged database file has been written back.
    """
    logger.info("merging append deltas into static metadata database file")

    start_time = time.perf_counter()

    s3_client = S3Client()

    # distinct source filenames of all currently-registered append deltas
    append_delta_filenames = (
        self.conn.query(
            """
            select distinct(append_delta_filename)
            from metadata.append_deltas
            """
        )
        .to_df()["append_delta_filename"]
        .to_list()
    )

    # nothing to merge; leave the static db untouched
    if len(append_delta_filenames) == 0:
        logger.info("no append deltas found")
        return

    logger.debug(f"{len(append_delta_filenames)} append deltas found")

    with tempfile.TemporaryDirectory() as temp_dir:
        # create local copy of the static metadata database (static db) file
        local_db_path = str(Path(temp_dir) / self.metadata_database_filename)
        if self.location_scheme == "s3":
            s3_client.download_file(
                s3_uri=self.metadata_database_path, local_path=local_db_path
            )
        else:
            shutil.copy(src=self.metadata_database_path, dst=local_db_path)

        # attach local static db; detach in a finally block so a failed insert
        # cannot leave the temp database attached to the shared connection
        self.conn.execute(f"""attach '{local_db_path}' AS local_static_db;""")
        try:
            # insert records from append deltas into the local static db
            self.conn.execute(
                f"""
                insert into local_static_db.records
                select
                {','.join(ORDERED_METADATA_COLUMN_NAMES)}
                from metadata.append_deltas
                """
            )
        finally:
            self.conn.execute("""detach local_static_db;""")

        # overwrite static db file with the merged local version
        if self.location_scheme == "s3":
            s3_client.upload_file(
                local_db_path,
                self.metadata_database_path,
            )
        else:
            shutil.copy(src=local_db_path, dst=self.metadata_database_path)

        # delete merged append deltas; filenames appear to be full paths/URIs
        # (NOTE(review): confirm against write_append_delta_duckdb)
        for append_delta_filename in append_delta_filenames:
            if self.location_scheme == "s3":
                s3_client.delete_file(s3_uri=append_delta_filename)
            else:
                os.remove(append_delta_filename)

    logger.debug(
        "append deltas merged into the static metadata database file: "
        f"{self.metadata_database_path}, {time.perf_counter()-start_time}s"
    )
470543
def write_append_delta_duckdb(self, filepath: str) -> None:
471544
"""Write an append delta for an ETL parquet file.
472545

timdex_dataset_api/utils.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,12 @@ def object_exists(self, s3_uri: str) -> bool:
5959
return False
6060
raise
6161

62+
def list_objects(self, s3_prefix: str) -> list[str]:
    """Return keys of objects under an ``s3://bucket/prefix`` URI.

    Args:
        s3_prefix: S3 URI whose key portion is used as the listing prefix.

    Returns:
        Bucket-relative object keys whose key starts with the prefix.
    """
    # Fix: honor the key portion of the URI as a prefix filter; previously the
    # key was discarded and every object in the bucket was returned. An empty
    # prefix (bucket-root URI) still lists all objects, so behavior is
    # backward compatible for that case.
    bucket, key = self._split_s3_uri(s3_prefix)
    objects = [
        obj.key for obj in self.resource.Bucket(bucket).objects.filter(Prefix=key)
    ]
    logger.debug(f"Found {len(objects)} objects in {s3_prefix}: {objects}")
    return objects
6268
def download_file(self, s3_uri: str, local_path: str | pathlib.Path) -> None:
6369
bucket, key = self._split_s3_uri(s3_uri)
6470
local_path = pathlib.Path(local_path)

0 commit comments

Comments
 (0)