TIMX 543 - keyset pagination for read methods #169

Merged

ghukill merged 1 commit into main from TIMX-543-keyset-pagination-for-reading on Sep 2, 2025

Conversation

@ghukill ghukill commented Aug 29, 2025

Purpose and background context

For all read methods, the former approach was to perform a single metadata query and store the entire result set in memory, then loop through chunks of that metadata and build SQL queries to perform data retrieval. Even for metadata queries that bring back 3-4 million results this worked, but there is an upper limit.

Ideally, we would perform all of our queries -- metadata and data -- in chunks to ease memory pressure. And in some cases, this can increase performance.

This reworks the base read_batches_iter() method to perform smaller, chunked metadata queries. To paginate the results, instead of using the slow LIMIT / OFFSET approach, we use keyset pagination: each query asks for rows whose ordered tuple of values is greater than the last tuple seen. This is often the preferred way to perform paginated querying when you have nicely ordered columns.
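
To paint the picture, here is a minimal, self-contained sketch of keyset pagination over the same three ordering columns (an illustration only: the table and chunk size are made up, and the real SQL built by this library may differ). It uses the expanded or-chain form of the tuple comparison so it works even without row-value comparison support:

import duckdb

con = duckdb.connect()
con.execute(
    """
    create table meta as
    select
        i // 100 as filename_hash,   -- stand-in for a hashed filename
        i // 10 as run_id_hash,      -- stand-in for a hashed run_id
        i as run_record_offset
    from range(1000) t(i)
    """
)

chunk_size = 250
keyset = (-1, -1, -1)  # sentinel smaller than any real tuple
while True:
    rows = con.execute(
        """
        select filename_hash, run_id_hash, run_record_offset
        from meta
        where filename_hash > ?
           or (filename_hash = ? and run_id_hash > ?)
           or (filename_hash = ? and run_id_hash = ? and run_record_offset > ?)
        order by filename_hash, run_id_hash, run_record_offset
        limit ?
        """,
        [
            keyset[0],
            keyset[0], keyset[1],
            keyset[0], keyset[1], keyset[2],
            chunk_size,
        ],
    ).fetchall()
    if not rows:
        break
    keyset = rows[-1]  # the last row becomes the lower bound for the next chunk

Each metadata query touches only chunk_size rows no matter how deep into the result set we are, which is exactly what LIMIT / OFFSET cannot do cheaply.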

In support of this, the method _iter_meta_chunks() now does substantially more work, which allows for less work elsewhere.

In support of this, we also begin hashing the filename and run_id columns for ordering, providing almost an order of magnitude speedup. The performance penalty for creating the hash is offset by the speedup of ordering integers versus very long strings.
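
As an aside on the hashing: DuckDB exposes a hash() scalar function that maps a string to an integer, which is one way to get this effect (whether the library uses exactly this function is an assumption here, and the example paths are made up):

import duckdb

con = duckdb.connect()
rows = con.execute(
    """
    select filename, hash(filename) as filename_hash
    from (
        values
            ('s3://bucket/dataset/year=2025/month=08/alma-run-0001.parquet'),
            ('s3://bucket/dataset/year=2025/month=08/alma-run-0002.parquet')
    ) t(filename)
    order by filename_hash
    """
).fetchall()
print(rows)

The hash order is arbitrary, but keyset pagination only needs a stable total order (with run_record_offset breaking ties), so ordering by fixed-width integers instead of long strings is a pure win.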

The net effect is no changes to the input/output signatures of the read methods, but improved memory usage and performance.

How can a reviewer manually see the effects of these changes?

The following will perform some individual record retrieval and bulk record retrieval, demonstrating that read operations are essentially as quick as, and sometimes quicker than, the previous single metadata pull. It's difficult to compare directly in this PR without branch hopping, but lots of testing has been done to confirm it's as quick but much more memory efficient.

1- Set Dev1 AWS TimdexManagers credentials in terminal and set env vars:

TDA_LOG_LEVEL=DEBUG
WARNING_ONLY_LOGGERS=asyncio,botocore,urllib3,s3transfer,boto3,MARKDOWN
TIMDEX_DATASET_LOCATION=s3://timdex-extract-dev-222053980223/dataset_scratch/prod-clone

2- Open Ipython shell

pipenv run ipython

3- Load a TIMDEXDataset instance:

import os

from timdex_dataset_api import TIMDEXDataset
from timdex_dataset_api.config import configure_dev_logger

configure_dev_logger()

td = TIMDEXDataset(os.environ["TIMDEX_DATASET_LOCATION"])

4- Perform a "needle in a haystack" query that finds all versions of three records with lots of versions:

df = td.read_dataframe(
    table="records",
    columns=["timdex_record_id", "transformed_record"],
    timdex_record_id=[
        "aspace:repositories-2-resources-1350",
        "libguides:guides-1385096",
        "alma:990024041390106761",
    ],
)
"""
DEBUG:timdex_dataset_api.dataset:read_batches_iter batch 1, yielded: 114 @ 16 records/second, total yielded: 114
DEBUG:timdex_dataset_api.dataset:read_batches_iter() elapsed: 6.78s
"""

Note that the logging has changed slightly, given what we know and when we know it, but this particular query is retrieving 114 records from 114 parquet files! That is good and fast projection.

5- Perform some deep pagination, getting 500k timdex_record_ids for current alma records:

for batch in td.read_batches_iter(
        table="current_records",
        columns=["timdex_record_id"],
        source="alma",
        action="index",
        limit=500_000,
):
    pass
"""
DEBUG:timdex_dataset_api.dataset:read_batches_iter batch 1, yielded: 100000 @ 53022 records/second, total yielded: 100000
DEBUG:timdex_dataset_api.dataset:read_batches_iter batch 2, yielded: 100000 @ 25097 records/second, total yielded: 200000
DEBUG:timdex_dataset_api.dataset:read_batches_iter batch 3, yielded: 100000 @ 36010 records/second, total yielded: 300000
DEBUG:timdex_dataset_api.dataset:read_batches_iter batch 4, yielded: 100000 @ 17977 records/second, total yielded: 400000
DEBUG:timdex_dataset_api.dataset:read_batches_iter batch 5, yielded: 100000 @ 43077 records/second, total yielded: 500000
DEBUG:timdex_dataset_api.dataset:read_batches_iter() elapsed: 22.32s
"""

6- Increase the metadata <--> data join size and repeat the last deep pagination:

td.config.duckdb_join_batch_size = 500_000

for batch in td.read_batches_iter(
        table="current_records",
        columns=["timdex_record_id"],
        source="alma",
        action="index",
        limit=500_000,
):
    pass
"""
DEBUG:timdex_dataset_api.dataset:read_batches_iter batch 1, yielded: 500000 @ 56889 records/second, total yielded: 500000
DEBUG:timdex_dataset_api.dataset:read_batches_iter() elapsed: 10.36s
"""

Note the speed increase! There is also a memory increase, but it's not substantial. Roughly speaking, between the "join" size of metadata <--> data and the "read chunk" size, which is how many records we stream back from DuckDB, we have quite a bit of control over the data flow, and that will serve us well going forward. Current default configurations are slightly conservative, but more than sufficient for our purposes.
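
For anyone who wants to eyeball the memory side of that claim, a rough sketch (not part of the library; note ru_maxrss is kilobytes on Linux and bytes on macOS) is to check peak RSS after a larger-join run:

import os
import resource

from timdex_dataset_api import TIMDEXDataset

td = TIMDEXDataset(os.environ["TIMDEX_DATASET_LOCATION"])
td.config.duckdb_join_batch_size = 500_000

for batch in td.read_batches_iter(
    table="current_records",
    columns=["timdex_record_id"],
    source="alma",
    action="index",
    limit=500_000,
):
    pass

print(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss)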

Includes new or updated dependencies?

NO

Changes expectations for external applications?

NO

What are the relevant tickets?

* https://mitlibraries.atlassian.net/browse/TIMX-543
@ghukill ghukill force-pushed the TIMX-543-keyset-pagination-for-reading branch from efb3973 to f439f18 on August 29, 2025 20:22
@coveralls

Pull Request Test Coverage Report for Build 17333521896

Details

  • 51 of 55 (92.73%) changed or added relevant lines in 3 files are covered.
  • No unchanged relevant lines lost coverage.
  • Overall coverage increased (+0.02%) to 93.011%

Changes Missing Coverage:
timdex_dataset_api/dataset.py: 40 of 44 changed/added lines covered (90.91%)
Totals Coverage Status
Change from base Build 17242126793: 0.02%
Covered Lines: 519
Relevant Lines: 558

💛 - Coveralls

@ghukill ghukill requested a review from a team September 2, 2025 13:04
@ghukill ghukill marked this pull request as ready for review September 2, 2025 13:04
Comment on lines +493 to +499
# update keyset value using the last row from this chunk
last_row = meta_chunk_df.iloc[-1]
keyset_value = (
    int(last_row.filename_hash),
    int(last_row.run_id_hash),
    int(last_row.run_record_offset),
)
ghukill (Contributor, Author):

This is kind of the key to the new keyset pagination.

We have ordered the batch by the columns filename_hash, run_id_hash, run_record_offset, so by taking the last row from the batch, we can perform our next query where (tuple of values) > (last batch's tuple of values).

Because we convert the batch results to a pandas DataFrame, getting this last row from the batch is trivial.
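
A small standalone sketch of that hand-off (the column names come from the diff above; the WHERE fragment is illustrative and assumes the engine supports row-value comparison, otherwise the expanded or-chain form applies):

import pandas as pd

# toy metadata chunk, already ordered by the three keyset columns
meta_chunk_df = pd.DataFrame(
    {
        "filename_hash": [101, 101, 205],
        "run_id_hash": [7, 9, 3],
        "run_record_offset": [0, 500, 1000],
    }
)

last_row = meta_chunk_df.iloc[-1]
keyset_value = (
    int(last_row.filename_hash),
    int(last_row.run_id_hash),
    int(last_row.run_record_offset),
)

# the next metadata chunk only needs to start "after" this tuple
where_clause = f"(filename_hash, run_id_hash, run_record_offset) > {keyset_value}"
print(where_clause)
# (filename_hash, run_id_hash, run_record_offset) > (205, 3, 1000)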

Comment on lines +513 to +531
# build list of explicit parquet files to read from
filenames = list(meta_chunk_df["filename"].unique())
if self.location_scheme == "s3":
    filenames = [
        f"s3://{f.removeprefix('s3://')}" for f in filenames  # type: ignore[union-attr]
    ]
parquet_list_sql = "[" + ",".join(f"'{f}'" for f in filenames) + "]"

# build run_record_offset WHERE clause to leverage row group pruning
rro_values = meta_chunk_df["run_record_offset"].unique()
rro_values.sort()
if len(rro_values) <= 1_000:  # noqa: PLR2004
    rro_clause = (
        f"and run_record_offset in ({','.join(str(rro) for rro in rro_values)})"
    )
else:
    rro_clause = (
        f"and run_record_offset between {rro_values[0]} and {rro_values[-1]}"
    )
ghukill (Contributor, Author):
Very little meaningful change here. This is primarily code from sub-methods that were removed, where the indirection of jumping around to them felt worse than just having the code here.
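
To see why these two fragments matter downstream: DuckDB's read_parquet() accepts an explicit list of files, and a predicate on run_record_offset can let it skip row groups via min/max statistics (assuming offsets are roughly ordered within each file). A hypothetical sketch of how the pieces could be stitched together, with placeholder paths and values:

# placeholder values standing in for what the code above builds
parquet_list_sql = "['s3://bucket/a.parquet','s3://bucket/b.parquet']"
rro_clause = "and run_record_offset in (0,100,200)"

data_sql = f"""
select timdex_record_id, transformed_record
from read_parquet({parquet_list_sql})
where true
{rro_clause}
"""
print(data_sql)  # not executed here: the S3 paths are placeholders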

@ehanson8 left a comment:

Looks good and works as expected!

@ghukill ghukill merged commit 69fd5d7 into main Sep 2, 2025
2 checks passed