Skip to content

Commit 0849ee3

Browse files
Update TIMDEXDataset.write method to only overwrite similarly named parquet files
Why these changes are being introduced: * Since the TIMDEXDataset partitions are now the [year, month, day] of the 'run_date', parquet files from different source runs will be written to the same partition. The previous configuration of existing_data_behavior="delete_matching" would result in the deletion of any existing parquet files from the partition directory with every source run, which is not the desired outcome. To support the new partitions, this updates the configuration to existing_data_behavior="overwrite_or_ignore", which will ignore any existing data and will only overwrite files with the same filename. How this addresses that need: * Set existing_data_behavior="overwrite_or_ignore" in the ds.write_dataset method call * Add unit tests to demonstrate the updated existing_data_behavior Side effects of this change: * In the event that multiple runs are performed for the same 'source' and 'run-date', which is unlikely to occur, parquet files from both runs will exist in the partitioned directory. DatasetRecords can still be uniquely identified via the 'run_id' column. Relevant ticket(s): * https://mitlibraries.atlassian.net/browse/TIMX-432
1 parent 76347b1 commit 0849ee3

File tree

2 files changed

+59
-28
lines changed

2 files changed

+59
-28
lines changed

tests/test_dataset_write.py

Lines changed: 53 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import math
44
import os
55
import re
6+
from unittest.mock import patch
67

78
import pyarrow.dataset as ds
89
import pytest
@@ -148,34 +149,63 @@ def test_dataset_write_schema_applied_to_dataset(new_dataset, sample_records_ite
148149
assert set(dataset.schema.names) == set(TIMDEX_DATASET_SCHEMA.names)
149150

150151

151-
def test_dataset_write_partition_deleted_when_written_to_again(
152-
new_dataset, sample_records_iter
153-
):
154-
"""This tests the existing_data_behavior="delete_matching" configuration when writing
155-
to a dataset."""
156-
# perform FIRST write to run_date="2024-12-01"
157-
written_files_1 = new_dataset.write(sample_records_iter(10))
152+
def test_dataset_write_partition_for_single_source(new_dataset, sample_records_iter):
153+
written_files = new_dataset.write(sample_records_iter(10))
154+
assert len(written_files) == 1
155+
assert os.path.exists(new_dataset.location)
156+
assert "year=2024/month=12/day=01" in written_files[0].path
158157

159-
# assert that files from first write are present at this time
160-
assert os.path.exists(written_files_1[0].path)
161158

162-
# perform unrelated write with new run_date to confirm this is untouched during delete
163-
written_files_x = new_dataset.write(
164-
generate_sample_records(7, run_date="2024-12-15"),
165-
)
159+
def test_dataset_write_partition_for_multiple_sources(new_dataset, sample_records_iter):
160+
# perform write for source="alma" and run_date="2024-12-01"
161+
written_files_source_a = new_dataset.write(sample_records_iter(10))
162+
new_dataset.reload()
166163

167-
# perform SECOND write to run_date="2024-12-01", expecting this to delete everything
168-
# under this combination of partitions (i.e. the first write)
169-
written_files_2 = new_dataset.write(sample_records_iter(10))
164+
assert os.path.exists(written_files_source_a[0].path)
165+
assert new_dataset.row_count == 10
170166

167+
# perform write for source="libguides" and run_date="2024-12-01"
168+
written_files_source_b = new_dataset.write(
169+
generate_sample_records(
170+
num_records=7, timdex_record_id_prefix="libguides", source="libguides"
171+
)
172+
)
171173
new_dataset.reload()
172174

173-
# assert 17 rows: second write for run_date="2024-12-01" @ 10 rows +
174-
# run_date="2024-12-15" @ 5 rows
175+
assert os.path.exists(written_files_source_b[0].path)
176+
assert os.path.exists(written_files_source_a[0].path)
175177
assert new_dataset.row_count == 17
176178

177-
# assert that files from first run_date="2024-12-01" are gone, second exist
178-
# and files from run_date="2024-12-15" also exist
179-
assert not os.path.exists(written_files_1[0].path)
180-
assert os.path.exists(written_files_2[0].path)
181-
assert os.path.exists(written_files_x[0].path)
179+
180+
def test_dataset_write_partition_ignore_existing_data(new_dataset, sample_records_iter):
181+
# perform two (2) writes for source="alma" and run_date="2024-12-01"
182+
written_files_source_a0 = new_dataset.write(sample_records_iter(10))
183+
written_files_source_a1 = new_dataset.write(sample_records_iter(10))
184+
new_dataset.reload()
185+
186+
# assert that both files exist and no overwriting occurs
187+
assert os.path.exists(written_files_source_a0[0].path)
188+
assert os.path.exists(written_files_source_a1[0].path)
189+
assert new_dataset.row_count == 20
190+
191+
192+
@patch("timdex_dataset_api.dataset.uuid.uuid4")
193+
def test_dataset_write_partition_overwrite_files_with_same_name(
194+
mock_uuid, new_dataset, sample_records_iter
195+
):
196+
"""This test is to demonstrate existing_data_behavior="overwrite_or_ignore".
197+
198+
It is extremely unlikely for the uuid.uuid4 method to generate duplicate values,
199+
so for testing purposes, this method is patched to return the same value
200+
and therefore generate similarly named files.
201+
"""
202+
mock_uuid.return_value = "abc"
203+
204+
# perform two (2) writes for source="alma" and run_date="2024-12-01"
205+
_ = new_dataset.write(sample_records_iter(10))
206+
written_files_source_a1 = new_dataset.write(sample_records_iter(7))
207+
new_dataset.reload()
208+
209+
# assert that only the second file exists and overwriting occurs
210+
assert os.path.exists(written_files_source_a1[0].path)
211+
assert new_dataset.row_count == 7

timdex_dataset_api/dataset.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -177,10 +177,11 @@ def write(
177177
optimizations (e.g. batching) so that the calling context can focus on yielding
178178
data.
179179
180-
For write, the configuration existing_data_behavior="delete_matching" is used.
181-
This means that during write, if any pre-existing files are found for the exact
182-
combinations of partitions for that batch, those pre-existing files will be
183-
deleted. This effectively makes a write idempotent to the TIMDEX dataset.
180+
This method uses the configuration existing_data_behavior="overwrite_or_ignore",
181+
which will ignore any existing data and will overwrite files with the same name
182+
as the parquet file. Since a UUID is generated for each write via the
183+
basename_template, this effectively makes a write idempotent to the
184+
TIMDEX dataset.
184185
185186
A max_open_files=500 configuration is set to avoid AWS S3 503 error "SLOW_DOWN"
186187
if too many PutObject calls are made in parallel. Testing suggests this does not
@@ -209,7 +210,7 @@ def write(
209210
record_batches_iter,
210211
base_dir=self.source,
211212
basename_template="%s-{i}.parquet" % (str(uuid.uuid4())), # noqa: UP031
212-
existing_data_behavior="delete_matching",
213+
existing_data_behavior="overwrite_or_ignore",
213214
filesystem=self.filesystem,
214215
file_visitor=lambda written_file: self._written_files.append(written_file), # type: ignore[arg-type]
215216
format="parquet",

0 commit comments

Comments
 (0)