Skip to content

Commit e1c0c6a

Browse files
Merge pull request #15 from MITLibraries/TIMX-432-rework-dataset-partitions
Rework dataset partitions to only year, month, day
2 parents 5769260 + 8a30ca3 commit e1c0c6a

File tree

8 files changed

+390
-436
lines changed

8 files changed

+390
-436
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,3 +156,6 @@ cython_debug/
156156

157157
# PyCharm
158158
.idea/
159+
160+
# VSCode
161+
.vscode

Pipfile

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ verify_ssl = true
44
name = "pypi"
55

66
[packages]
7+
attrs = "*"
78
boto3 = "*"
89
duckdb = "*"
910
pandas = "*"
@@ -14,15 +15,14 @@ black = "*"
1415
boto3-stubs = {version = "*", extras = ["s3"]}
1516
coveralls = "*"
1617
ipython = "*"
18+
moto = "*"
1719
mypy = "*"
20+
pandas-stubs = "*"
1821
pre-commit = "*"
22+
pytest-mock = "*"
1923
pyarrow-stubs = "*"
2024
pytest = "*"
2125
ruff = "*"
2226
setuptools = "*"
23-
pandas-stubs = "*"
24-
moto = "*"
25-
pytest-mock = "*"
26-
2727
[requires]
2828
python_version = "3.12"

Pipfile.lock

Lines changed: 246 additions & 230 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

tests/conftest.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,12 +51,7 @@ def sample_records_iter_without_partitions():
5151

5252
def _records_iter(num_records):
5353
return generate_sample_records(
54-
num_records,
55-
source=None,
56-
run_date=None,
57-
run_type=None,
58-
action=None,
59-
run_id=None,
54+
num_records, run_date="invalid run-date", year=None, month=None, day=None
6055
)
6156

6257
return _records_iter

tests/test_dataset_write.py

Lines changed: 86 additions & 130 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,24 @@
11
# ruff: noqa: S105, S106, SLF001, PLR2004, PD901, D209, D205
2-
3-
import datetime
42
import math
53
import os
4+
import re
5+
from datetime import UTC, datetime
6+
from unittest.mock import patch
67

78
import pyarrow.dataset as ds
89
import pytest
910

11+
from tests.utils import generate_sample_records
1012
from timdex_dataset_api.dataset import (
1113
MAX_ROWS_PER_FILE,
1214
TIMDEX_DATASET_SCHEMA,
1315
DatasetNotLoadedError,
1416
TIMDEXDataset,
1517
)
16-
from timdex_dataset_api.exceptions import InvalidDatasetRecordError
1718
from timdex_dataset_api.record import DatasetRecord
1819

1920

20-
def test_dataset_record_serialization():
21+
def test_dataset_record_init():
2122
values = {
2223
"timdex_record_id": "alma:123",
2324
"source_record": b"<record><title>Hello World.</title></record>",
@@ -26,38 +27,35 @@ def test_dataset_record_serialization():
2627
"run_date": "2024-12-01",
2728
"run_type": "full",
2829
"action": "index",
29-
"run_id": "abc123",
30+
"run_id": "000-111-aaa-bbb",
3031
}
31-
dataset_record = DatasetRecord(**values)
32-
assert dataset_record.to_dict() == values
32+
record = DatasetRecord(**values)
33+
assert record
34+
assert (record.year, record.month, record.day) == (
35+
"2024",
36+
"12",
37+
"01",
38+
)
3339

3440

35-
def test_dataset_record_serialization_with_partition_values_provided():
36-
dataset_record = DatasetRecord(
37-
timdex_record_id="alma:123",
38-
source_record=b"<record><title>Hello World.</title></record>",
39-
transformed_record=b"""{"title":["Hello World."]}""",
40-
)
41-
partition_values = {
42-
"source": "alma",
43-
"run_date": "2024-12-01",
44-
"run_type": "daily",
45-
"action": "index",
46-
"run_id": "000-111-aaa-bbb",
47-
}
48-
assert dataset_record.to_dict(partition_values=partition_values) == {
41+
def test_dataset_record_init_with_invalid_run_date_raise_error():
42+
values = {
4943
"timdex_record_id": "alma:123",
5044
"source_record": b"<record><title>Hello World.</title></record>",
5145
"transformed_record": b"""{"title":["Hello World."]}""",
52-
"source": "alma",
53-
"run_date": "2024-12-01",
54-
"run_type": "daily",
46+
"source": "libguides",
47+
"run_date": "-12-01",
48+
"run_type": "full",
5549
"action": "index",
5650
"run_id": "000-111-aaa-bbb",
5751
}
52+
with pytest.raises(
53+
ValueError, match=re.escape("time data '-12-01' does not match format '%Y-%m-%d'")
54+
):
55+
DatasetRecord(**values)
5856

5957

60-
def test_dataset_record_serialization_missing_partition_raise_error():
58+
def test_dataset_record_serialization():
6159
values = {
6260
"timdex_record_id": "alma:123",
6361
"source_record": b"<record><title>Hello World.</title></record>",
@@ -66,14 +64,22 @@ def test_dataset_record_serialization_missing_partition_raise_error():
6664
"run_date": "2024-12-01",
6765
"run_type": "full",
6866
"action": "index",
69-
"run_id": None, # <------ missing partition here
67+
"run_id": "abc123",
7068
}
7169
dataset_record = DatasetRecord(**values)
72-
with pytest.raises(
73-
InvalidDatasetRecordError,
74-
match="Partition values are missing: run_id",
75-
):
76-
assert dataset_record.to_dict() == values
70+
assert dataset_record.to_dict() == {
71+
"timdex_record_id": "alma:123",
72+
"source_record": b"<record><title>Hello World.</title></record>",
73+
"transformed_record": b"""{"title":["Hello World."]}""",
74+
"source": "libguides",
75+
"run_date": datetime(2024, 12, 1).astimezone(UTC),
76+
"run_type": "full",
77+
"action": "index",
78+
"run_id": "abc123",
79+
"year": "2024",
80+
"month": "12",
81+
"day": "01",
82+
}
7783

7884

7985
def test_dataset_write_records_to_new_dataset(new_dataset, sample_records_iter):
@@ -134,52 +140,6 @@ def test_dataset_write_to_multiple_locations_raise_error(sample_records_iter):
134140
timdex_dataset.write(sample_records_iter(10))
135141

136142

137-
def test_dataset_write_mixin_partition_values_used(
138-
new_dataset, sample_records_iter_without_partitions
139-
):
140-
partition_values = {
141-
"source": "alma",
142-
"run_date": "2024-12-01",
143-
"run_type": "daily",
144-
"action": "index",
145-
"run_id": "000-111-aaa-bbb",
146-
}
147-
_written_files = new_dataset.write(
148-
sample_records_iter_without_partitions(10),
149-
partition_values=partition_values,
150-
)
151-
new_dataset.reload()
152-
153-
# load as pandas dataframe and assert column values
154-
df = new_dataset.dataset.to_table().to_pandas()
155-
row = df.iloc[0]
156-
assert row.source == partition_values["source"]
157-
assert row.run_date == datetime.date(2024, 12, 1)
158-
assert row.run_type == partition_values["run_type"]
159-
assert row.action == partition_values["action"]
160-
assert row.action == partition_values["action"]
161-
162-
163-
def test_dataset_write_schema_partitions_correctly_ordered(
164-
new_dataset, sample_records_iter
165-
):
166-
written_files = new_dataset.write(
167-
sample_records_iter(10),
168-
partition_values={
169-
"source": "alma",
170-
"run_date": "2024-12-01",
171-
"run_type": "daily",
172-
"run_id": "000-111-aaa-bbb",
173-
"action": "index",
174-
},
175-
)
176-
file = written_files[0]
177-
assert (
178-
"/source=alma/run_date=2024-12-01/run_type=daily"
179-
"/run_id=000-111-aaa-bbb/action=index/" in file.path
180-
)
181-
182-
183143
def test_dataset_write_schema_applied_to_dataset(new_dataset, sample_records_iter):
184144
new_dataset.write(sample_records_iter(10))
185145

@@ -194,67 +154,63 @@ def test_dataset_write_schema_applied_to_dataset(new_dataset, sample_records_ite
194154
assert set(dataset.schema.names) == set(TIMDEX_DATASET_SCHEMA.names)
195155

196156

197-
def test_dataset_write_partition_deleted_when_written_to_again(
198-
new_dataset, sample_records_iter
199-
):
200-
"""This tests the existing_data_behavior="delete_matching" configuration when writing
201-
to a dataset."""
202-
partition_values = {
203-
"source": "alma",
204-
"run_date": "2024-12-01",
205-
"run_type": "daily",
206-
"action": "index",
207-
"run_id": "000-111-aaa-bbb",
208-
}
157+
def test_dataset_write_partition_for_single_source(new_dataset, sample_records_iter):
158+
written_files = new_dataset.write(sample_records_iter(10))
159+
assert len(written_files) == 1
160+
assert os.path.exists(new_dataset.location)
161+
assert "year=2024/month=12/day=01" in written_files[0].path
209162

210-
# perform FIRST write to run_date="2024-12-01"
211-
written_files_1 = new_dataset.write(
212-
sample_records_iter(10),
213-
partition_values=partition_values,
214-
)
215163

216-
# assert that files from first write are present at this time
217-
assert os.path.exists(written_files_1[0].path)
164+
def test_dataset_write_partition_for_multiple_sources(new_dataset, sample_records_iter):
165+
# perform write for source="alma" and run_date="2024-12-01"
166+
written_files_source_a = new_dataset.write(sample_records_iter(10))
167+
new_dataset.reload()
218168

219-
# perform unrelated write with new run_date to confirm this is untouched during delete
220-
new_partition_values = partition_values.copy()
221-
new_partition_values["run_date"] = "2024-12-15"
222-
new_partition_values["run_id"] = "222-333-ccc-ddd"
223-
written_files_x = new_dataset.write(
224-
sample_records_iter(7),
225-
partition_values=new_partition_values,
226-
)
169+
assert os.path.exists(written_files_source_a[0].path)
170+
assert new_dataset.row_count == 10
227171

228-
# perform SECOND write to run_date="2024-12-01", expecting this to delete everything
229-
# under this combination of partitions (i.e. the first write)
230-
written_files_2 = new_dataset.write(
231-
sample_records_iter(10),
232-
partition_values=partition_values,
172+
# perform write for source="libguides" and run_date="2024-12-01"
173+
written_files_source_b = new_dataset.write(
174+
generate_sample_records(
175+
num_records=7, timdex_record_id_prefix="libguides", source="libguides"
176+
)
233177
)
234-
235178
new_dataset.reload()
236179

237-
# assert 17 rows: second write for run_date="2024-12-01" @ 10 rows +
238-
# run_date="2024-12-15" @ 7 rows
180+
assert os.path.exists(written_files_source_b[0].path)
181+
assert os.path.exists(written_files_source_a[0].path)
239182
assert new_dataset.row_count == 17
240183

241-
# assert that files from first run_date="2024-12-01" are gone, second exist
242-
# and files from run_date="2024-12-15" also exist
243-
assert not os.path.exists(written_files_1[0].path)
244-
assert os.path.exists(written_files_2[0].path)
245-
assert os.path.exists(written_files_x[0].path)
246184

185+
def test_dataset_write_partition_ignore_existing_data(new_dataset, sample_records_iter):
186+
# perform two (2) writes for source="alma" and run_date="2024-12-01"
187+
written_files_source_a0 = new_dataset.write(sample_records_iter(10))
188+
written_files_source_a1 = new_dataset.write(sample_records_iter(10))
189+
new_dataset.reload()
247190

248-
def test_dataset_write_missing_partitions_raise_error(new_dataset, sample_records_iter):
249-
missing_partition_values = {
250-
"source": "libguides",
251-
"run_date": None,
252-
"run_type": None,
253-
"action": None,
254-
"run_id": None,
255-
}
256-
with pytest.raises(InvalidDatasetRecordError, match="Partition values are missing"):
257-
_ = new_dataset.write(
258-
sample_records_iter(10),
259-
partition_values=missing_partition_values,
260-
)
191+
# assert that both files exist and no overwriting occurs
192+
assert os.path.exists(written_files_source_a0[0].path)
193+
assert os.path.exists(written_files_source_a1[0].path)
194+
assert new_dataset.row_count == 20
195+
196+
197+
@patch("timdex_dataset_api.dataset.uuid.uuid4")
198+
def test_dataset_write_partition_overwrite_files_with_same_name(
199+
mock_uuid, new_dataset, sample_records_iter
200+
):
201+
"""This test is to demonstrate existing_data_behavior="overwrite_or_ignore".
202+
203+
It is extremely unlikely for the uuid.uuid4 method to generate duplicate values,
204+
so for testing purposes, this method is patched to return the same value
205+
and therefore generate identically named files.
206+
"""
207+
mock_uuid.return_value = "abc"
208+
209+
# perform two (2) writes for source="alma" and run_date="2024-12-01"
210+
_ = new_dataset.write(sample_records_iter(10))
211+
written_files_source_a1 = new_dataset.write(sample_records_iter(7))
212+
new_dataset.reload()
213+
214+
# assert that only the second file exists and overwriting occurs
215+
assert os.path.exists(written_files_source_a1[0].path)
216+
assert new_dataset.row_count == 7

timdex_dataset_api/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from timdex_dataset_api.dataset import TIMDEXDataset
44
from timdex_dataset_api.record import DatasetRecord
55

6-
__version__ = "0.2.0"
6+
__version__ = "0.3.0"
77

88
__all__ = [
99
"DatasetRecord",

0 commit comments

Comments
 (0)