Skip to content

Commit c59e264

Browse files
authored
feat(AwsDownload): zip partial download from s3 (#1561)
1 parent d325727 commit c59e264

5 files changed

Lines changed: 193 additions & 18 deletions

File tree

eodag/plugins/download/aws.py

Lines changed: 87 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
NotAvailableError,
6161
TimeOutError,
6262
)
63+
from eodag.utils.s3 import open_s3_zipped_object
6364

6465
if TYPE_CHECKING:
6566
from boto3.resources.collection import ResourceCollection
@@ -233,6 +234,7 @@ def __init__(self, provider: str, config: PluginConfig) -> None:
233234
super(AwsDownload, self).__init__(provider, config)
234235
self.requester_pays = getattr(self.config, "requester_pays", False)
235236
self.s3_session: Optional[boto3.session.Session] = None
237+
self.s3_resource: Optional[boto3.resources.base.ServiceResource] = None
236238

237239
def download(
238240
self,
@@ -326,19 +328,32 @@ def download(
326328
bucket_names_and_prefixes, auth
327329
)
328330

331+
# files in zip
332+
updated_bucket_names_and_prefixes = self._download_file_in_zip(
333+
product, bucket_names_and_prefixes, product_local_path, progress_callback
334+
)
335+
# prevent nothing-to-download errors if download was performed in zip
336+
raise_error = (
337+
False
338+
if len(updated_bucket_names_and_prefixes) != len(bucket_names_and_prefixes)
339+
else True
340+
)
341+
329342
# downloadable files
330343
unique_product_chunks = self._get_unique_products(
331-
bucket_names_and_prefixes,
344+
updated_bucket_names_and_prefixes,
332345
authenticated_objects,
333346
asset_filter,
334347
ignore_assets,
335348
product,
349+
raise_error=raise_error,
336350
)
337351

338352
total_size = sum([p.size for p in unique_product_chunks]) or None
339353

340354
# download
341-
progress_callback.reset(total=total_size)
355+
if len(unique_product_chunks) > 0:
356+
progress_callback.reset(total=total_size)
342357
try:
343358
for product_chunk in unique_product_chunks:
344359
try:
@@ -390,17 +405,65 @@ def download(
390405

391406
return product_local_path
392407

408+
def _download_file_in_zip(
    self,
    product: "EOProduct",
    bucket_names_and_prefixes: list[tuple[str, Optional[str]]],
    product_local_path: str,
    progress_callback: "ProgressCallback",
) -> list[tuple[str, Optional[str]]]:
    """
    Download files stored inside zip archives on S3, from prefixes like `foo/bar.zip!file.txt`

    Each matching prefix is split into the key of the zip archive (`foo/bar.zip`) and the
    path of the member to extract (`file.txt`). The member is written under
    ``product_local_path``, keeping its relative path inside the archive.

    :param product: product to be downloaded (not used by this method)
    :param bucket_names_and_prefixes: list of bucket names and corresponding path prefixes
    :param product_local_path: local directory where the extracted files are written
    :param progress_callback: progress callback, reset with the size of each extracted file
                              and updated after each written chunk
    :return: bucket names and prefixes that were not downloaded by this method
    """
    if self.s3_resource is None:
        logger.debug("Cannot check files in s3 zip without s3 resource")
        return bucket_names_and_prefixes

    s3_client = self.s3_resource.meta.client

    remaining = []
    for pack in bucket_names_and_prefixes:
        bucket_name, prefix = pack
        # an empty or missing prefix cannot point to a file in a zip: leave it to the
        # regular download instead of failing on the membership test
        if not prefix or ".zip!" not in prefix:
            remaining.append(pack)
            continue

        splitted_path = prefix.split(".zip!")
        zip_prefix = f"{splitted_path[0]}.zip"
        rel_path = splitted_path[-1]
        dest_file = os.path.join(product_local_path, rel_path)
        os.makedirs(os.path.dirname(dest_file), exist_ok=True)

        # partial=False: the file content is needed, not only the archive listing
        with open_s3_zipped_object(
            bucket_name, zip_prefix, s3_client, partial=False
        ) as zip_file:
            # file size
            file_info = zip_file.getinfo(rel_path)
            progress_callback.reset(total=file_info.file_size)
            with zip_file.open(rel_path) as extracted, open(
                dest_file, "wb"
            ) as output_file:
                # Read in 1MB chunks
                for zchunk in iter(lambda: extracted.read(1024 * 1024), b""):
                    output_file.write(zchunk)
                    progress_callback(len(zchunk))

    return remaining
453+
393454
def _download_preparation(
394455
self,
395456
product: EOProduct,
396457
progress_callback: ProgressCallback,
397458
**kwargs: Unpack[DownloadConf],
398459
) -> tuple[Optional[str], Optional[str]]:
399460
"""
400-
preparation for the download:
461+
Preparation for the download:
462+
401463
- check if file was already downloaded
402464
- get file path
403465
- create directories
466+
404467
:param product: product to be downloaded
405468
:param progress_callback: progress callback to be used
406469
:param kwargs: additional arguments
@@ -424,7 +487,8 @@ def _download_preparation(
424487

425488
def _configure_safe_build(self, build_safe: bool, product: EOProduct):
426489
"""
427-
updates the product properties with fetch metadata if safe build is enabled
490+
Updates the product properties with fetch metadata if safe build is enabled
491+
428492
:param build_safe: if safe build is enabled
429493
:param product: product to be updated
430494
"""
@@ -514,10 +578,11 @@ def _do_authentication(
514578
auth: Optional[Union[AuthBase, S3SessionKwargs]] = None,
515579
) -> tuple[dict[str, Any], ResourceCollection]:
516580
"""
517-
authenticates with s3 and retrieves the available objects
518-
raises an error when authentication is not possible
581+
Authenticates with s3 and retrieves the available objects
582+
519583
:param bucket_names_and_prefixes: list of bucket names and corresponding path prefixes
520584
:param auth: authentication information
585+
:raises AuthenticationError: authentication is not possible
521586
:return: authenticated objects per bucket, list of available objects
522587
"""
523588
if not isinstance(auth, (dict, type(None))):
@@ -584,14 +649,17 @@ def _get_unique_products(
584649
asset_filter: Optional[str],
585650
ignore_assets: bool,
586651
product: EOProduct,
652+
raise_error: bool = True,
587653
) -> set[Any]:
588654
"""
589-
retrieve unique product chunks based on authenticated objects and asset filters
655+
Retrieve unique product chunks based on authenticated objects and asset filters
656+
590657
:param bucket_names_and_prefixes: list of bucket names and corresponding path prefixes
591658
:param authenticated_objects: available objects per bucket
592659
:param asset_filter: text for which assets should be filtered
593660
:param ignore_assets: if product instead of individual assets should be used
594661
:param product: product that shall be downloaded
662+
:param raise_error: raise error if there is nothing to download
595663
:return: set of product chunks that can be downloaded
596664
"""
597665
product_chunks: list[Any] = []
@@ -613,12 +681,12 @@ def _get_unique_products(
613681
unique_product_chunks,
614682
)
615683
)
616-
if not unique_product_chunks:
684+
if not unique_product_chunks and raise_error:
617685
raise NotAvailableError(
618686
rf"No file basename matching re.fullmatch(r'{asset_filter}') was found in {product.remote_location}"
619687
)
620688

621-
if not unique_product_chunks:
689+
if not unique_product_chunks and raise_error:
622690
raise NoMatchingProductType("No product found to download.")
623691

624692
return unique_product_chunks
@@ -702,6 +770,13 @@ def _stream_download_dict(
702770
bucket_names_and_prefixes, auth
703771
)
704772

773+
# stream not implemented for prefixes like `foo/bar.zip!file.txt`
774+
for _, prefix in bucket_names_and_prefixes:
775+
if prefix and ".zip!" in prefix:
776+
raise NotImplementedError(
777+
"Download streaming is not implemented for files in zip on S3"
778+
)
779+
705780
# downloadable files
706781
unique_product_chunks = self._get_unique_products(
707782
bucket_names_and_prefixes,
@@ -936,6 +1011,7 @@ def _get_authenticated_objects_from_auth_profile(
9361011
objects = s3_resource.Bucket(bucket_name).objects
9371012
list(objects.filter(Prefix=prefix).limit(1))
9381013
self.s3_session = s3_session
1014+
self.s3_resource = s3_resource
9391015
return objects
9401016
else:
9411017
return None
@@ -966,6 +1042,7 @@ def _get_authenticated_objects_from_auth_keys(
9661042
objects = s3_resource.Bucket(bucket_name).objects
9671043
list(objects.filter(Prefix=prefix).limit(1))
9681044
self.s3_session = s3_session
1045+
self.s3_resource = s3_resource
9691046
return objects
9701047
else:
9711048
return None
@@ -987,6 +1064,7 @@ def _get_authenticated_objects_from_env(
9871064
objects = s3_resource.Bucket(bucket_name).objects
9881065
list(objects.filter(Prefix=prefix).limit(1))
9891066
self.s3_session = s3_session
1067+
self.s3_resource = s3_resource
9901068
return objects
9911069

9921070
def get_product_bucket_name_and_prefix(

eodag/utils/s3.py

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
)
3737

3838
if TYPE_CHECKING:
39-
from zipfile import ZipInfo
39+
from zipfile import ZipFile, ZipInfo
4040

4141
from mypy_boto3_s3.client import S3Client
4242

@@ -78,18 +78,19 @@ def parse_int(bytes: bytes) -> int:
7878
return val
7979

8080

81-
def list_files_in_s3_zipped_object(
82-
bucket_name: str, key_name: str, client_s3: S3Client
83-
) -> List[ZipInfo]:
81+
def open_s3_zipped_object(
82+
bucket_name: str, key_name: str, client_s3: S3Client, partial: bool = True
83+
) -> ZipFile:
8484
"""
85-
List files in s3 zipped object, without downloading it.
85+
Open s3 zipped object, fetching only its central directory unless ``partial`` is false.
8686
8787
See https://stackoverflow.com/questions/41789176/how-to-count-files-inside-zip-in-aws-s3-without-downloading-it;
8888
Based on https://stackoverflow.com/questions/51351000/read-zip-files-from-s3-without-downloading-the-entire-file
8989
9090
:param bucket_name: Bucket name of the object to fetch
9191
:param key_name: Key name of the object to fetch
9292
:param client_s3: s3 client used to fetch the object
93+
:param partial: fetch partial data if only content info is needed
9394
:returns: Zip file opened from the fetched data
9495
"""
9596
response = client_s3.head_object(Bucket=bucket_name, Key=key_name)
@@ -104,11 +105,33 @@ def list_files_in_s3_zipped_object(
104105

105106
# fetch central directory, append EOCD, and open as zipfile
106107
cd = fetch(bucket_name, key_name, cd_start, cd_size, client_s3)
107-
zip = zipfile.ZipFile(io.BytesIO(cd + eocd))
108108

109-
logger.debug("Found %s files in %s" % (len(zip.filelist), key_name))
109+
zip_data = (
110+
cd + eocd if partial else fetch(bucket_name, key_name, 0, size, client_s3)
111+
)
112+
113+
zip = zipfile.ZipFile(io.BytesIO(zip_data))
114+
115+
return zip
116+
117+
118+
def list_files_in_s3_zipped_object(
    bucket_name: str, key_name: str, client_s3: S3Client
) -> List[ZipInfo]:
    """
    Return the entries of a zipped object stored in s3.

    The archive is opened through :func:`open_s3_zipped_object` with its default
    arguments, and the number of entries found is logged at debug level.

    See https://stackoverflow.com/questions/41789176/how-to-count-files-inside-zip-in-aws-s3-without-downloading-it;
    Based on https://stackoverflow.com/questions/51351000/read-zip-files-from-s3-without-downloading-the-entire-file

    :param bucket_name: Bucket name of the object to fetch
    :param key_name: Key name of the object to fetch
    :param client_s3: s3 client used to fetch the object
    :returns: List of files in zip
    """
    with open_s3_zipped_object(bucket_name, key_name, client_s3) as archive:
        members = archive.filelist
        logger.debug("Found %s files in %s", len(members), key_name)
    return members
112135

113136

114137
def update_assets_from_s3(

tests/context.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,11 @@
9898
get_ssl_context,
9999
)
100100
from eodag.utils.requests import fetch_json
101-
from eodag.utils.s3 import list_files_in_s3_zipped_object, update_assets_from_s3
101+
from eodag.utils.s3 import (
102+
list_files_in_s3_zipped_object,
103+
update_assets_from_s3,
104+
open_s3_zipped_object,
105+
)
102106
from eodag.utils.exceptions import (
103107
AddressNotFound,
104108
AuthenticationError,

tests/units/test_download_plugins.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1729,6 +1729,66 @@ def test_plugins_download_aws_no_safe_build_flatten_top_dirs(
17291729
os.path.join(self.output_dir, self.product.properties["title"]),
17301730
)
17311731

1732+
@mock.patch(
    "eodag.plugins.download.aws.open_s3_zipped_object",
    autospec=True,
)
@mock.patch(
    "eodag.plugins.download.aws.AwsDownload.get_authenticated_objects",
    autospec=True,
)
def test_plugins_download_aws_in_zip(
    self,
    mock_get_authenticated_objects: mock.Mock,
    mock_open_s3_zipped_object: mock.Mock,
):
    """AwsDownload.download() must extract assets referenced as files in an S3 zip"""
    # local archive served in place of the remote S3 object
    local_archive = os.path.join(
        TEST_RESOURCES_PATH,
        "products",
        "as_archive",
        "S2A_MSIL1C_20180101T105441_N0206_R051_T31TDH_20180101T124911.zip",
    )
    # a new ZipFile per call, as each asset download opens the archive again
    mock_open_s3_zipped_object.side_effect = lambda *args, **kwargs: zipfile.ZipFile(
        local_archive
    )

    plugin = self.get_download_plugin(self.product)
    plugin.s3_resource = mock.Mock()

    zipped_assets = {
        "file1": {
            "href": (
                "http://example.com/path/to/foo.zip!"
                "GRANULE/L1C_T31TDH_A013204_20180101T105435/IMG_DATA/T31TDH_20180101T105441_B01.jp2"
            )
        },
        "file2": {
            "href": "http://example.com/path/to/foo.zip!GRANULE/L1C_T31TDH_A013204_20180101T105435/MTD_TL.xml"
        },
    }
    self.product.assets.clear()
    self.product.assets.update(zipped_assets)

    # no SAFE build and flatten_top_dirs
    plugin.config.products[self.product.product_type]["build_safe"] = False
    plugin.config.flatten_top_dirs = True

    path = plugin.download(self.product, output_dir=self.output_dir)

    # one archive opening per asset
    self.assertEqual(mock_open_s3_zipped_object.call_count, 2)
    mock_open_s3_zipped_object.assert_called_with(
        "example", "path/to/foo.zip", plugin.s3_resource.meta.client, partial=False
    )
    for expected_file in ("IMG_DATA/T31TDH_20180101T105441_B01.jp2", "MTD_TL.xml"):
        self.assertTrue(os.path.isfile(os.path.join(path, expected_file)))
1791+
17321792
@mock.patch("eodag.plugins.download.aws.flatten_top_directories", autospec=True)
17331793
@mock.patch(
17341794
"eodag.plugins.download.aws.AwsDownload.check_manifest_file_list", autospec=True

tests/units/test_utils_s3.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import os
2+
import zipfile
23
from unittest import TestCase
34

45
import boto3
@@ -11,6 +12,7 @@
1112
MisconfiguredError,
1213
PluginConfig,
1314
list_files_in_s3_zipped_object,
15+
open_s3_zipped_object,
1416
update_assets_from_s3,
1517
)
1618

@@ -73,6 +75,14 @@ def test_utils_s3_list_files_in_s3_zipped_object(self):
7375
],
7476
)
7577

78+
def test_utils_s3_open_s3_zipped_object(self):
    """open_s3_zipped_object must open a zipped object stored in S3 as a ZipFile"""
    # context manager so that the opened archive is closed once checked
    with open_s3_zipped_object(
        "mybucket", "path/to/product.zip", self.s3_client
    ) as zip_file:
        self.assertIsInstance(zip_file, zipfile.ZipFile)
        self.assertEqual(len(zip_file.filelist), 6)
85+
7686
def test_utils_s3_update_assets_from_s3_zip(self):
7787
"""update_assets_from_s3 must update the assets of a product from a zipped object stored in S3"""
7888
prod = EOProduct("dummy", dict(geometry="POINT (0 0)", id="foo"))

0 commit comments

Comments
 (0)