Skip to content

Commit 227b6b3

Browse files
committed
Remove join_util, add JoinManager tests (wip)
1 parent 54939ac commit 227b6b3

File tree

4 files changed

+109
-2124
lines changed

4 files changed

+109
-2124
lines changed

pygeoapi/api/joins.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,10 @@
3030
from typing import Any
3131
from datetime import datetime, timedelta, timezone
3232

33-
from pygeoapi import l10n, join_util
33+
from pygeoapi import l10n
34+
from pygeoapi.join.manager import (
35+
JoinManager, JoinSourceNotFoundError, JoinSourceMissingError
36+
)
3437
from pygeoapi.api import (
3538
APIRequest, API, SYSTEM_LOCALE, FORMAT_TYPES,
3639
F_JSON, F_JSONLD, F_HTML, HTTPStatus

pygeoapi/join/manager.py

Lines changed: 105 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -175,18 +175,24 @@ def _build_refs(self):
175175
with self._db() as db:
176176
for file in self.source_dir.iterdir():
177177
if file.is_file() and _SOURCE_FILE_PATTERN.match(file.name):
178-
with open(file, 'r') as f:
179-
data = json.load(f)
180-
doc = {
181-
'id': data['id'],
182-
'collectionId': data['collectionId'],
183-
'timeStamp': data['timeStamp'],
184-
'joinSource': data['joinSource'],
185-
'ref': str(file)
186-
}
187-
q = Query()
188-
db.upsert(doc, q.id == data['id'] & q.collectionId == data['collectionId']) # noqa
189-
result.setdefault(data['collectionId'], {})[data['id']] = doc # noqa
178+
with FileLock(file.with_suffix(file.suffix + '.lock')):
179+
with open(file, 'r') as f:
180+
try:
181+
data = json.load(f)
182+
doc = {
183+
'id': data['id'],
184+
'collectionId': data['collectionId'],
185+
'timeStamp': data['timeStamp'],
186+
'joinSource': data['joinSource'],
187+
'ref': str(file)
188+
}
189+
except Exception as e:
190+
# Ignore file if not valid JSON
191+
LOGGER.debug(str(e), exc_info=True)
192+
continue
193+
q = Query()
194+
db.upsert(doc, (q.id == data['id']) & (q.collectionId == data['collectionId'])) # noqa
195+
result.setdefault(data['collectionId'], {})[data['id']] = doc # noqa
190196
return result
191197

192198
@staticmethod
@@ -204,34 +210,17 @@ def _delete_source(path: Path, silent: bool = False) -> bool:
204210
LOGGER.debug(f'file {path} already removed')
205211
return True
206212

207-
try:
208-
# Acquire exclusive lock before deletion
209-
# This prevents readers from starting while we delete
210-
lock = FileLock(path)
211-
212-
with lock.acquire(timeout=10):
213-
# Remove file
214-
try:
215-
path.unlink(missing_ok=True)
216-
LOGGER.debug(f'removed join source file: {path}')
217-
except Exception as e:
218-
LOGGER.warning(f'failed to remove join source {path}: {e}')
219-
if not silent:
220-
raise
221-
else:
222-
return False
223-
finally:
224-
# Remove lock file
225-
lock_file = path.with_suffix(path.suffix + '.lock')
226-
lock_file.unlink(missing_ok=True)
227-
228-
except Exception as e:
229-
LOGGER.error(f'Error during file deletion: {e}',
230-
exc_info=True)
231-
if not silent:
232-
raise
233-
else:
234-
return False
213+
with FileLock(path.with_suffix(path.suffix + '.lock')):
214+
# Remove file
215+
try:
216+
path.unlink(missing_ok=True)
217+
LOGGER.debug(f'removed join source file: {path}')
218+
except Exception as e:
219+
LOGGER.warning(f'failed to remove join source {path}: {e}')
220+
if not silent:
221+
raise
222+
else:
223+
return False
235224

236225
return True
237226

@@ -252,7 +241,7 @@ def _cleanup_sources(self):
252241
# and output as tuple (timestamp, id, ref)
253242
source_items = sorted(
254243
[(util.str_to_datetime(info['timeStamp']), info['id'],
255-
info['ref']) for info in sources.values()],
244+
Path(info['ref'])) for info in sources.values()],
256245
key=itemgetter(0)
257246
)
258247

@@ -262,16 +251,16 @@ def _cleanup_sources(self):
262251
if now - timestamp <= max_age:
263252
continue
264253
if self._delete_source(ref, True):
265-
db.remove(q.collectionId == collection_id & q.id == source_id) # noqa
254+
db.remove((q.collectionId == collection_id) & (q.id == source_id)) # noqa
266255
LOGGER.debug(f'removed stale source: {ref}')
267256
else:
268257
LOGGER.warning(f'could not remove stale source: {ref}') # noqa
269258

270259
# pass 2: limit by max_files (if configured)
271260
if 0 < self.max_files < len(sources):
272-
for _, ref in list(reversed(source_items))[:self.max_files]: # noqa
261+
for _, source_id, ref in list(reversed(source_items))[:self.max_files]: # noqa
273262
if self._delete_source(ref, True):
274-
db.remove(q.collectionId == collection_id & q.id == source_id) # noqa
263+
db.remove((q.collectionId == collection_id) & (q.id == source_id)) # noqa
275264
LOGGER.debug(f'removed stale source: {ref}')
276265
else:
277266
LOGGER.warning(f'could not remove stale source: {ref}') # noqa
@@ -288,6 +277,38 @@ def _make_source_path(self, join_id: str) -> Path:
288277
json_file = self.source_dir / f'table-{join_id}.json'
289278
return json_file
290279

280+
def _find_source_path(self, collection_id: str, join_id: str) -> Path:
281+
"""
282+
Finds the path to a join source file on disk.
283+
Raises a JoinSourceNotFoundError if the source is not found.
284+
Raises a JoinSourceMissingError if the source reference is missing.
285+
286+
:param collection_id: Collection identifier
287+
:param join_id: Join source identifier
288+
289+
:returns: `Path` instance to the join source file
290+
"""
291+
with self._db() as db:
292+
q = Query()
293+
result = db.get((q.id == join_id) &
294+
(q.collectionId == collection_id))
295+
if not result:
296+
raise JoinSourceNotFoundError(
297+
f'join source {join_id} not found for collection {collection_id}') # noqa
298+
299+
file_path = Path(result['ref'])
300+
301+
# Verify file still exists
302+
if not file_path.is_file():
303+
# Clean up orphaned database entry
304+
LOGGER.warning(f'Join source file missing: {file_path}')
305+
db.remove((q.id == join_id) &
306+
(q.collectionId == collection_id))
307+
raise JoinSourceMissingError(
308+
f'join source {join_id} for collection {collection_id} was removed') # noqa
309+
310+
return file_path
311+
291312
@staticmethod
292313
def _valid_id(join_id: Any) -> bool:
293314
"""
@@ -308,6 +329,23 @@ def process_csv(self, collection_id: str,
308329
Processes the CSV form data and stores the result as a JSON file
309330
in a temporary directory.
310331
332+
Example response:
333+
{
334+
"id": "13b40adb-aef3-4f6b-8d32-acf3ab082d2d",
335+
"timeStamp": "2025-12-10T12:26:17.542928Z",
336+
"collectionId": "cities",
337+
"collectionKey": "id",
338+
"joinSource": "city_data.csv",
339+
"joinKey": "city_id",
340+
"joinFields": ["city_name", "population"],
341+
"numberOfRows": 50,
342+
"data": {
343+
"12345": ["Amsterdam", "1100000"],
344+
"67890": ["Rotterdam", "650000"]
345+
}
346+
}
347+
348+
311349
:param collection_id: collection name to apply join source to
312350
:param collection_provider: feature collection provider
313351
:param form_data: parameters dict (from request form data)
@@ -364,8 +402,9 @@ def process_csv(self, collection_id: str,
364402
LOGGER.debug('Reading CSV data from stream')
365403
try:
366404
# Wrap binary stream in TextIOWrapper for reading
367-
# TODO: support other encodings
368-
text_stream = TextIOWrapper(csv_data.buffer, encoding='utf-8')
405+
# TODO: support other encodings (in OGC API - Joins spec)
406+
text_stream = TextIOWrapper(csv_data.buffer,
407+
encoding='utf-8', errors='replace')
369408
all_lines = text_stream.readlines()
370409
num_lines = len(all_lines)
371410

@@ -471,10 +510,14 @@ def process_csv(self, collection_id: str,
471510
"data": join_data
472511
}
473512

513+
# Lazily clean up any stale sources
514+
self._cleanup_sources()
515+
474516
# Store the output as JSON file named 'table-{uuid}.json'
475517
json_file = self._make_source_path(source_id)
476-
with open(json_file, 'w', encoding='utf-8') as f:
477-
json.dump(output, f, indent=4)
518+
with FileLock(json_file.with_suffix(json_file.suffix + '.lock')):
519+
with open(json_file, 'w', encoding='utf-8') as f:
520+
json.dump(output, f, indent=4)
478521

479522
# Write source file reference to TinyDB for lookup
480523
with self._db() as db:
@@ -487,9 +530,6 @@ def process_csv(self, collection_id: str,
487530
}
488531
db.insert(doc)
489532

490-
# Lazily clean up any stale sources
491-
self._cleanup_sources()
492-
493533
return output
494534

495535
def list_sources(self, collection_id: str) -> dict:
@@ -499,7 +539,7 @@ def list_sources(self, collection_id: str) -> dict:
499539
500540
:param collection_id: name of feature collection
501541
502-
:returns: list of dict with source references
542+
:returns: dict[str, dict] with references for each source ID
503543
"""
504544
with self._db() as db:
505545
q = Query()
@@ -527,6 +567,7 @@ def read_join_source(self, collection_id: str, join_id: str) -> dict:
527567
"""
528568
Read specific join source metadata.
529569
Raises a JoinSourceNotFoundError if the source is not found.
570+
Raises a JoinSourceMissingError if the source reference is missing.
530571
531572
:param collection_id: Collection identifier
532573
:param join_id: Join source identifier
@@ -535,32 +576,15 @@ def read_join_source(self, collection_id: str, join_id: str) -> dict:
535576
if not self._valid_id(join_id):
536577
raise ValueError('invalid join source ID')
537578

538-
with self._db() as db:
539-
q = Query()
540-
result = db.get((q.id == join_id) &
541-
(q.collectionId == collection_id))
542-
if not result:
543-
raise JoinSourceNotFoundError(
544-
f'join source {join_id} not found for collection {collection_id}') # noqa
545-
546-
file_path = Path(result['ref'])
547-
548-
# Verify file still exists
549-
if not file_path.is_file():
550-
# Clean up orphaned database entry
551-
LOGGER.warning(f'Join source file missing: {file_path}')
552-
db.remove((q.id == join_id) &
553-
(q.collectionId == collection_id))
554-
raise JoinSourceMissingError(
555-
f'join source {join_id} for collection {collection_id} was removed') # noqa
579+
# This may raise JoinSourceNotFoundError or JoinSourceMissingError
580+
source_path = self._find_source_path(collection_id, join_id)
556581

557-
# Read full JSON source and return document
558-
lock = FileLock(file_path)
559-
with lock.acquire(timeout=5):
560-
with open(file_path, 'r', encoding='utf-8') as f:
561-
source_dict = json.load(f)
582+
# Read full JSON source and return document
583+
with FileLock(source_path.with_suffix(source_path.suffix + '.lock')):
584+
with open(source_path, 'r', encoding='utf-8') as f:
585+
source_dict = json.load(f)
562586

563-
return source_dict
587+
return source_dict
564588

565589
def perform_join(self, features: dict, collection_id: str, join_id: str):
566590
"""
@@ -617,7 +641,7 @@ def remove_source(self, collection_id: str, join_id: str) -> bool:
617641
raise ValueError('invalid join source ID')
618642

619643
try:
620-
source = self.read_join_source(collection_id, join_id)
644+
source_path = self._find_source_path(collection_id, join_id)
621645
except JoinSourceNotFoundError:
622646
# If the join source was not found, we should respond with a 404
623647
return False
@@ -626,9 +650,9 @@ def remove_source(self, collection_id: str, join_id: str) -> bool:
626650
return True
627651

628652
# Remove the JSON file from disk
629-
deleted = self._delete_source(Path(source['ref']))
653+
deleted = self._delete_source(source_path)
630654

631-
# Remove reference
655+
# Remove reference (clean up orphan)
632656
if deleted:
633657
with self._db() as db:
634658
q = Query()
@@ -639,10 +663,10 @@ def remove_source(self, collection_id: str, join_id: str) -> bool:
639663

640664

641665
class JoinSourceNotFoundError(Exception):
642-
"""Join source is not found (by ID or collection)."""
666+
"""Join source is not found (by ID and/or collection)."""
643667
pass
644668

645669

646670
class JoinSourceMissingError(FileNotFoundError):
647-
"""Join source is missing (but still referenced)."""
671+
"""Join source is missing but still referenced (orphan)."""
648672
pass

0 commit comments

Comments (0)