Skip to content

Commit fa9f694

Browse files
Merge branch 'main' into reductionist_count_as_bytes
2 parents 75e3b62 + 7c1b51e commit fa9f694

7 files changed

Lines changed: 60 additions & 23 deletions

File tree

activestorage/active.py

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,8 @@ def __init__(self,
190190
interface_type: str = None,
191191
max_threads: int = 100,
192192
storage_options: dict = None,
193-
active_storage_url: str = None) -> None:
193+
active_storage_url: str = None,
194+
option_disable_chunk_cache: bool = False) -> None:
194195
"""
195196
Instantiate with a NetCDF4 dataset URI and the variable of interest within that file.
196197
(We need the variable, because we need variable specific metadata from within that
@@ -199,6 +200,7 @@ def __init__(self,
199200
200201
:param storage_options: s3fs.S3FileSystem options
201202
:param active_storage_url: Reductionist server URL
203+
:param option_disable_chunk_cache: flag to disable chunk caching
202204
"""
203205
self.ds = None
204206
input_variable = False
@@ -256,6 +258,9 @@ def __init__(self,
256258
self.storage_options = storage_options
257259
self.active_storage_url = active_storage_url
258260

261+
# turn off chunk caching
262+
self.option_disable_chunk_cache = option_disable_chunk_cache
263+
259264
# basic check on file
260265
if not input_variable:
261266
if not os.path.isfile(self.uri) and not self.interface_type:
@@ -660,6 +665,9 @@ def _process_chunk(self,
660665
# Axes over which to apply a reduction
661666
axis = self._axis
662667

668+
# turn off chunk caching
669+
chunk_caching = self.option_disable_chunk_cache
670+
663671
if self.interface_type == 's3' and self._version == 1:
664672
tmp, count = reduce_opens3_chunk(ds._fh,
665673
offset,
@@ -702,7 +710,8 @@ def _process_chunk(self,
702710
ds._order,
703711
chunk_selection,
704712
axis,
705-
operation=self._method)
713+
operation=self._method,
714+
option_disable_chunk_cache=chunk_caching,)
706715
else:
707716
if self.storage_options.get("anon", None) is True:
708717
bucket = os.path.dirname(parsed_url.path)
@@ -722,7 +731,8 @@ def _process_chunk(self,
722731
ds._order,
723732
chunk_selection,
724733
axis,
725-
operation=self._method)
734+
operation=self._method,
735+
option_disable_chunk_cache=chunk_caching,)
726736
elif self.interface_type == "https" and self._version == 2:
727737
tmp, count = reductionist.reduce_chunk(session,
728738
self.active_storage_url,
@@ -738,7 +748,8 @@ def _process_chunk(self,
738748
chunk_selection,
739749
axis,
740750
operation=self._method,
741-
interface_type="https")
751+
interface_type="https",
752+
option_disable_chunk_cache=chunk_caching,)
742753

743754
elif self.interface_type == 'ActivePosix' and self.version == 2:
744755
# This is where the DDN Fuse and Infinia wrappers go

activestorage/reductionist.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ def reduce_chunk(session,
4848
chunk_selection,
4949
axis,
5050
operation,
51-
interface_type=None):
51+
interface_type=None,
52+
option_disable_chunk_cache=False):
5253
"""Perform a reduction on a chunk using Reductionist.
5354
5455
:param server: Reductionist server URL
@@ -71,6 +72,7 @@ def reduce_chunk(session,
7172
:param axis: tuple of the axes to be reduced (non-negative integers)
7273
:param operation: name of operation to perform
7374
:param interface_type: optional testing flag to allow HTTPS reduction
75+
:param option_disable_chunk_cache: optional turn off chunk cache
7476
:returns: the reduced data as a numpy array or scalar
7577
:raises ReductionistError: if the request to Reductionist fails
7678
"""
@@ -86,7 +88,8 @@ def reduce_chunk(session,
8688
order,
8789
chunk_selection,
8890
axis,
89-
interface_type=interface_type)
91+
interface_type=interface_type,
92+
option_disable_chunk_cache=option_disable_chunk_cache)
9093
if DEBUG:
9194
print(f"Reductionist request data dictionary: {request_data}")
9295
api_operation = "sum" if operation == "mean" else operation or "select"
@@ -184,7 +187,8 @@ def build_request_data(url: str,
184187
order,
185188
selection,
186189
axis,
187-
interface_type=None) -> dict:
190+
interface_type=None,
191+
option_disable_chunk_cache=False) -> dict:
188192
"""Build request data for Reductionist API."""
189193
request_data = {
190194
'interface_type': interface_type if interface_type else "s3",
@@ -209,6 +213,8 @@ def build_request_data(url: str,
209213
request_data["filters"] = encode_filters(filters)
210214
if any(missing):
211215
request_data["missing"] = encode_missing(missing)
216+
if option_disable_chunk_cache:
217+
request_data["option_disable_chunk_cache"] = True
212218

213219
if axis is not None:
214220
request_data['axis'] = axis

activestorage/storage.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@ def reduce_chunk(rfile,
1616
order,
1717
chunk_selection,
1818
axis,
19-
method=None):
19+
method=None,
20+
option_disable_chunk_cache=False,):
2021
"""
2122
We do our own read of chunks and decoding etc
2223

tests/test_real_https.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,15 +36,17 @@ def test_https():
3636
# v2: declared storage type
3737
active = Active(test_file_uri, "ta",
3838
interface_type="https",
39-
active_storage_url=active_storage_url)
39+
active_storage_url=active_storage_url,
40+
option_disable_chunk_cache=True)
4041
active._version = 2
4142
result = active.min()[0:3, 4:6, 7:9]
4243
print("Result is", result)
4344
assert result == np.array([220.3180694580078], dtype="float32")
4445

4546
# v2: inferred storage type
4647
active = Active(test_file_uri, "ta",
47-
active_storage_url=active_storage_url)
48+
active_storage_url=active_storage_url,
49+
option_disable_chunk_cache=True)
4850
active._version = 2
4951
result = active.min()[0:3, 4:6, 7:9]
5052
print("Result is", result)
@@ -57,7 +59,8 @@ def test_https():
5759
# v2: inferred storage type, pop axis
5860
active = Active(test_file_uri, "ta",
5961
interface_type="https",
60-
active_storage_url=active_storage_url)
62+
active_storage_url=active_storage_url,
63+
option_disable_chunk_cache=True)
6164
active._version = 2
6265
result = active.min(axis=(0, 1))[:]
6366
print("Result is", result)
@@ -84,7 +87,8 @@ def test_https():
8487
active = Active(test_file_uri, "ta",
8588
interface_type="https",
8689
storage_options={"username": None, "password": None},
87-
active_storage_url=active_storage_url)
90+
active_storage_url=active_storage_url,
91+
option_disable_chunk_cache=True)
8892
active._version = 2
8993
result = active.min(axis=(0, 1))[:]
9094
print("Result is", result)
@@ -137,7 +141,9 @@ def test_https_bigger_file():
137141
"""Run a true test with a https FILE."""
138142
test_file_uri = "https://esgf.ceda.ac.uk/thredds/fileServer/esg_cmip6/CMIP6/AerChemMIP/MOHC/UKESM1-0-LL/ssp370SST-lowNTCF/r1i1p1f2/Amon/cl/gn/latest/cl_Amon_UKESM1-0-LL_ssp370SST-lowNTCF_r1i1p1f2_gn_205001-209912.nc"
139143
active_storage_url = "https://reductionist.jasmin.ac.uk/" # Wacasoft new Reductionist
140-
active = Active(test_file_uri, "cl", active_storage_url=active_storage_url)
144+
active = Active(test_file_uri, "cl",
145+
active_storage_url=active_storage_url,
146+
option_disable_chunk_cache=True)
141147
active._version = 2
142148
result = active.min()[0:3, 4:6, 7:9]
143149
print("Result is", result)

tests/test_real_s3.py

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ def test_anon_s3():
2929
'endpoint_url': "https://uor-aces-o.s3-ext.jc.rl.ac.uk"
3030
}
3131
},
32-
active_storage_url=active_storage_url)
32+
active_storage_url=active_storage_url,
33+
option_disable_chunk_cache=True)
3334
active._version = 2
3435
with pytest.raises(ReductionistError):
3536
result = active.min()[:]
@@ -54,7 +55,8 @@ def test_s3_small_file():
5455
active = Active(test_file_uri,
5556
'tas',
5657
storage_options=storage_options,
57-
active_storage_url=active_storage_url)
58+
active_storage_url=active_storage_url,
59+
option_disable_chunk_cache=True)
5860
active._version = 2
5961
result = active.min()[0:3, 4:6, 7:9]
6062
print("Result is", result)
@@ -80,7 +82,8 @@ def test_s3_small_dataset():
8082
av = dataset['tas']
8183
active = Active(av,
8284
storage_options=storage_options,
83-
active_storage_url=active_storage_url)
85+
active_storage_url=active_storage_url,
86+
option_disable_chunk_cache=True)
8487
active._version = 2
8588
result = active.min()[0:3, 4:6, 7:9]
8689
print("Result is", result)
@@ -109,7 +112,8 @@ def test_s3_dataset():
109112
'UM_m01s16i202_vn1106',
110113
interface_type="s3",
111114
storage_options=storage_options,
112-
active_storage_url=active_storage_url)
115+
active_storage_url=active_storage_url,
116+
option_disable_chunk_cache=True)
113117
active._version = 2
114118
result = active.min()[0:3, 4:6, 7:9] # standardized slice
115119
print("Result is", result)
@@ -119,7 +123,8 @@ def test_s3_dataset():
119123
active = Active(test_file_uri,
120124
'UM_m01s16i202_vn1106',
121125
storage_options=storage_options,
122-
active_storage_url=active_storage_url)
126+
active_storage_url=active_storage_url,
127+
option_disable_chunk_cache=True)
123128
active._version = 2
124129
result = active.min()[0:3, 4:6, 7:9] # standardized slice
125130
print("Result is", result)
@@ -133,7 +138,8 @@ def test_s3_dataset():
133138
active = Active(av,
134139
interface_type="s3",
135140
storage_options=storage_options,
136-
active_storage_url=active_storage_url)
141+
active_storage_url=active_storage_url,
142+
option_disable_chunk_cache=True)
137143
active._version = 2
138144
result = active.min()[0:3, 4:6, 7:9] # standardized slice
139145
print("Result is", result)
@@ -142,7 +148,8 @@ def test_s3_dataset():
142148
# dataset: implicit interface_type
143149
active = Active(av,
144150
storage_options=storage_options,
145-
active_storage_url=active_storage_url)
151+
active_storage_url=active_storage_url,
152+
option_disable_chunk_cache=True)
146153
active._version = 2
147154
result = active.min()[0:3, 4:6, 7:9] # standardized slice
148155
print("Result is", result)

tests/test_real_s3_with_axes.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ def build_active_test1_file():
2525
print("S3 Test file path:", test_file_uri)
2626
active = Active(test_file_uri, 'tas', interface_type="s3",
2727
storage_options=storage_options,
28-
active_storage_url=active_storage_url)
28+
active_storage_url=active_storage_url,
29+
option_disable_chunk_cache=True)
2930

3031
active._version = 2
3132

@@ -49,7 +50,8 @@ def build_active_small_file():
4950
print("S3 Test file path:", test_file_uri)
5051
active = Active(test_file_uri, 'tas', interface_type="s3",
5152
storage_options=storage_options,
52-
active_storage_url=active_storage_url)
53+
active_storage_url=active_storage_url,
54+
option_disable_chunk_cache=True)
5355

5456
active._version = 2
5557

@@ -104,7 +106,8 @@ def build_active():
104106
print("S3 Test file path:", test_file_uri)
105107
active = Active(test_file_uri, 'm01s30i111', interface_type="s3", # 'm01s06i247_4', interface_type="s3",
106108
storage_options=storage_options,
107-
active_storage_url=active_storage_url)
109+
active_storage_url=active_storage_url,
110+
option_disable_chunk_cache=True)
108111

109112
active._version = 2
110113

tests/unit/test_storage_types.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ def reduce_chunk(
4646
chunk_selection,
4747
axis,
4848
operation,
49+
option_disable_chunk_cache,
4950
):
5051
return activestorage.storage.reduce_chunk(
5152
test_file,
@@ -60,6 +61,7 @@ def reduce_chunk(
6061
chunk_selection,
6162
axis,
6263
np.max,
64+
False,
6365
)
6466

6567
mock_load.side_effect = load_from_s3
@@ -102,6 +104,7 @@ def reduce_chunk(
102104
mock.ANY,
103105
mock.ANY,
104106
operation="max",
107+
option_disable_chunk_cache=False,
105108
)
106109

107110

0 commit comments

Comments
 (0)