diff --git a/activestorage/active.py b/activestorage/active.py index fb963f32..c3380636 100644 --- a/activestorage/active.py +++ b/activestorage/active.py @@ -275,6 +275,7 @@ def __load_nc_file(self): nc = load_from_https(self.uri) self.filename = self.uri self.ds = nc[ncvar] + print("Loaded dataset", self.ds) def __get_missing_attributes(self): if self.ds is None: @@ -365,19 +366,22 @@ def method(self, value): self._method = value - @property - def mean(self): + def mean(self, axis=None): self._method = "mean" + if axis is not None: + self._axis = axis return self - @property - def min(self): + def min(self, axis=None): self._method = "min" + if axis is not None: + self._axis = axis return self - @property - def max(self): + def max(self, axis=None): self._method = "max" + if axis is not None: + self._axis = axis return self @property diff --git a/activestorage/reductionist.py b/activestorage/reductionist.py index 66240dd5..3c6809e8 100644 --- a/activestorage/reductionist.py +++ b/activestorage/reductionist.py @@ -10,7 +10,7 @@ import numpy as np import requests -REDUCTIONIST_AXIS_READY = False +REDUCTIONIST_AXIS_READY = True DEBUG = 0 @@ -88,6 +88,7 @@ def reduce_chunk(session, chunk_selection, axis, storage_type=storage_type) + print(f"Reductionist request data dictionary: {request_data}") if DEBUG: print(f"Reductionist request data dictionary: {request_data}") api_operation = "sum" if operation == "mean" else operation or "select" @@ -234,15 +235,18 @@ def request(session: requests.Session, url: str, request_data: dict): def decode_result(response): """Decode a successful response, return as a 2-tuple of (numpy array or scalar, count).""" - dtype = response.headers['x-activestorage-dtype'] - shape = json.loads(response.headers['x-activestorage-shape']) + reduction_result = json.loads(response.content) + print("Reduction result: ", reduction_result) + print("Reduction result size: ", sys.getsizeof(reduction_result)) + dtype = reduction_result['dtype'] + shape = reduction_result['shape'] if "shape" in reduction_result else None # Result - result = np.frombuffer(response.content, dtype=dtype) + result = np.frombuffer(bytes(reduction_result['bytes']), dtype=dtype) result = result.reshape(shape) # Counts - count = json.loads(response.headers['x-activestorage-count']) + count = reduction_result['count'] # TODO: When reductionist is ready, we need to fix 'count' # Mask the result diff --git a/tests/test_bigger_data.py b/tests/test_bigger_data.py index a305dec1..99446489 100644 --- a/tests/test_bigger_data.py +++ b/tests/test_bigger_data.py @@ -136,7 +136,7 @@ def test_cl_mean(tmp_path): active = Active(ncfile, "cl", storage_type=utils.get_storage_type()) active._version = 2 active.components = True - result2 = active.mean[4:5, 1:2] + result2 = active.mean()[4:5, 1:2] print(result2, ncfile) # expect {'sum': array([[[[264.]]]], dtype=float32), 'n': array([[[[12]]]])} # check for typing and structure @@ -151,7 +151,7 @@ def test_cl_min(tmp_path): ncfile = save_cl_file_with_a(tmp_path) active = Active(ncfile, "cl", storage_type=utils.get_storage_type()) active._version = 2 - result2 = active.min[4:5, 1:2] + result2 = active.min()[4:5, 1:2] np.testing.assert_array_equal(result2, np.array([[[[22.]]]], dtype="float32")) @@ -160,7 +160,7 @@ def test_cl_max(tmp_path): ncfile = save_cl_file_with_a(tmp_path) active = Active(ncfile, "cl", storage_type=utils.get_storage_type()) active._version = 2 - result2 = active.max[4:5, 1:2] + result2 = active.max()[4:5, 1:2] np.testing.assert_array_equal(result2, np.array([[[[22.]]]], dtype="float32")) @@ -169,7 +169,7 @@ def test_cl_global_max(tmp_path): ncfile = save_cl_file_with_a(tmp_path) active = Active(ncfile, "cl", storage_type=utils.get_storage_type()) active._version = 2 - result2 = active.max[:] + result2 = active.max()[:] np.testing.assert_array_equal(result2, np.array([[[[22.]]]], dtype="float32")) @@ -192,7 +192,7 @@ def test_ps(tmp_path): active = Active(ncfile, "ps", storage_type=utils.get_storage_type()) active._version = 2 active.components = True - result2 = active.mean[4:5, 1:2] + result2 = active.mean()[4:5, 1:2] print(result2, ncfile) # expect {'sum': array([[[22.]]]), 'n': array([[[4]]])} # check for typing and structure @@ -381,7 +381,7 @@ def test_daily_data_masked_two_stats(test_data_path): # first a mean active = Active(uri, "ta", storage_type=utils.get_storage_type()) active._version = 2 - result2 = active.min[:] + result2 = active.min()[:] assert result2 == 245.0020751953125 # then recycle Active object for something else diff --git a/tests/test_real_https.py b/tests/test_real_https.py index 5a718c35..2bb6f7cf 100644 --- a/tests/test_real_https.py +++ b/tests/test_real_https.py @@ -15,7 +15,7 @@ def test_https(): active = Active(test_file_uri, "cl", storage_type="https") active._version = 1 - result = active.min[0:3, 4:6, 7:9] + result = active.min()[0:3, 4:6, 7:9] print("Result is", result) assert result == np.array([0.6909787], dtype="float32") @@ -26,7 +26,7 @@ def test_https_100years(): test_file_uri = "https://esgf.ceda.ac.uk/thredds/fileServer/esg_cmip6/CMIP6/CMIP/MOHC/UKESM1-1-LL/historical/r1i1p1f2/Amon/pr/gn/latest/pr_Amon_UKESM1-1-LL_historical_r1i1p1f2_gn_195001-201412.nc" active = Active(test_file_uri, "pr") active._version = 1 - result = active.min[0:3, 4:6, 7:9] + result = active.min()[0:3, 4:6, 7:9] print("Result is", result) assert result == np.array([5.4734613e-07], dtype="float32") @@ -43,7 +43,7 @@ def test_https_reductionist(): with pytest.raises(activestorage.reductionist.ReductionistError): active = Active(test_file_uri, "cl") active._version = 2 - result = active.min[0:3, 4:6, 7:9] + result = active.min()[0:3, 4:6, 7:9] print("Result is", result) assert result == np.array([0.6909787], dtype="float32") @@ -57,7 +57,7 @@ def test_https_implicit_storage(): active = Active(test_file_uri, "cl") active._version = 1 - result = active.min[0:3, 4:6, 7:9] + result = active.min()[0:3, 4:6, 7:9] print("Result is", result) assert result == np.array([0.6909787], dtype="float32") @@ -73,7 +73,7 @@ def test_https_implicit_storage_file_not_found(): with pytest.raises(FileNotFoundError): active = Active(test_file_uri, "cl") active._version = 1 - result = active.min[0:3, 4:6, 7:9] + result = active.min()[0:3, 4:6, 7:9] def test_https_implicit_storage_wrong_url(): @@ -98,7 +98,7 @@ def test_https_dataset(): active = Active(av, storage_type="https") active._version = 1 - result = active.min[0:3, 4:6, 7:9] + result = active.min()[0:3, 4:6, 7:9] print("Result is", result) assert result == np.array([0.6909787], dtype="float32") @@ -114,6 +114,6 @@ def test_https_dataset_implicit_storage(): active = Active(av) active._version = 1 - result = active.min[0:3, 4:6, 7:9] + result = active.min()[0:3, 4:6, 7:9] print("Result is", result) assert result == np.array([0.6909787], dtype="float32") diff --git a/tests/test_real_s3.py b/tests/test_real_s3.py index 70e774f1..8821a9a7 100644 --- a/tests/test_real_s3.py +++ b/tests/test_real_s3.py @@ -39,7 +39,7 @@ def test_s3_dataset(): storage_options=storage_options, active_storage_url=active_storage_url) active._version = 2 - result = active.min[0:3, 4:6, 7:9] # standardized slice + result = active.min()[0:3, 4:6, 7:9] # standardized slice print("Result is", result) assert result == 5098.625 @@ -49,7 +49,7 @@ def test_s3_dataset(): storage_options=storage_options, active_storage_url=active_storage_url) active._version = 2 - result = active.min[0:3, 4:6, 7:9] # standardized slice + result = active.min()[0:3, 4:6, 7:9] # standardized slice print("Result is", result) assert result == 5098.625 @@ -63,7 +63,7 @@ def test_s3_dataset(): storage_options=storage_options, active_storage_url=active_storage_url) active._version = 2 - result = active.min[0:3, 4:6, 7:9] # standardized slice + result = active.min()[0:3, 4:6, 7:9] # standardized slice print("Result is", result) assert result == 5098.625 @@ -72,6 +72,6 @@ def test_s3_dataset(): storage_options=storage_options, active_storage_url=active_storage_url) active._version = 2 - result = active.min[0:3, 4:6, 7:9] # standardized slice + result = active.min()[0:3, 4:6, 7:9] # standardized slice print("Result is", result) assert result == 5098.625 diff --git a/tests/test_real_s3_with_axes.py b/tests/test_real_s3_with_axes.py new file mode 100644 index 00000000..826103b7 --- /dev/null +++ b/tests/test_real_s3_with_axes.py @@ -0,0 +1,134 @@ +import os +import numpy as np +import pyfive + +from activestorage.active import Active + + +S3_BUCKET = "bnl" + +def build_active_small_file(): + """Run an integration test with real data off S3 but with a small file.""" + storage_options = { + 'key': "f2d55c6dcfc7618b2c34e00b58df3cef", + 'secret': "$/'#M{0{/4rVhp%n^(XeX$q@y#&(NM3W1->~N.Q6VP.5[@bLpi='nt]AfH)>78pT", + 'client_kwargs': {'endpoint_url': "https://uor-aces-o.s3-ext.jc.rl.ac.uk"}, # final proxy + } + active_storage_url = "https://reductionist.jasmin.ac.uk/" # Wacasoft new Reductionist + bigger_file = "CMIP6-test.nc" # tas; 15 (time) x 143 x 144 + + test_file_uri = os.path.join( + S3_BUCKET, + bigger_file + ) + print("S3 Test file path:", test_file_uri) + active = Active(test_file_uri, 'tas', storage_type="s3", + storage_options=storage_options, + active_storage_url=active_storage_url) + + active._version = 2 + + return active + + +def test_small_file_axis_0_1(): + """Fails: activestorage.reductionist.ReductionistError: Reductionist error: HTTP 502: -""" + active = build_active_small_file() + result = active.min(axis=(0, 1))[:] + print("Reductionist final result", result) + assert min(result[0][0]) == 197.69595 + + +def test_small_file_axis_0_1_compare_with_numpy(): + """Fails: activestorage.reductionist.ReductionistError: Reductionist error: HTTP 502: -""" + active = build_active_small_file() + result = active.min(axis=(0, 1))[:] + print("Reductionist final result", result) + + # use numpy and local test data + ds = pyfive.File("tests/test_data/CMIP6-test.nc")["tas"] + minarr= np.min(ds[:], axis=(0, 1), keepdims=True) + print(len(minarr)) # 144 + print(min(minarr)) # 197.69595 + assert np.min(result) == np.min(minarr) + np.testing.assert_array_equal(result, minarr) + + +def build_active(): + """Run an integration test with real data off S3.""" + storage_options = { + 'key': "f2d55c6dcfc7618b2c34e00b58df3cef", + 'secret': "$/'#M{0{/4rVhp%n^(XeX$q@y#&(NM3W1->~N.Q6VP.5[@bLpi='nt]AfH)>78pT", + 'client_kwargs': {'endpoint_url': "https://uor-aces-o.s3-ext.jc.rl.ac.uk"}, # final proxy + } + active_storage_url = "https://reductionist.jasmin.ac.uk/" # Wacasoft new Reductionist + bigger_file = "da193a_25_6hr_t_pt_cordex__198807-198807.nc" # m01s30i111 ## older 3GB 30 chunks + + test_file_uri = os.path.join( + S3_BUCKET, + bigger_file + ) + print("S3 Test file path:", test_file_uri) + active = Active(test_file_uri, 'm01s30i111', storage_type="s3", # 'm01s06i247_4', storage_type="s3", + storage_options=storage_options, + active_storage_url=active_storage_url) + + active._version = 2 + + return active + + +## Active loads a 4dim dataset +## Loaded dataset +## default axis arg (when axis=None): 'axis': (0, 1, 2, 3) + +def test_no_axis(): + """ + Fails: it should pass: 'axis': (0, 1, 2, 3) default + are fine! + + activestorage.reductionist.ReductionistError: Reductionist error: HTTP 400: {"error": {"message": "request data is not valid", "caused_by": ["__all__: Validation error: Number of reduction axes must be less than length of shape - to reduce over all axes omit the axis field completely [{}]"]}} + """ + active = build_active() + result = active.min()[:] + assert result == [[[[164.8125]]]] + + +def test_no_axis_2(): + """ + Fails: it should pass: 'axis': (0, 1, 2, 3) default + are fine! + + activestorage.reductionist.ReductionistError: Reductionist error: HTTP 400: {"error": {"message": "request data is not valid", "caused_by": ["__all__: Validation error: Number of reduction axes must be less than length of shape - to reduce over all axes omit the axis field completely [{}]"]}} + """ + active = build_active() + result = active.min(axis=())[:] + assert result == [[[[164.8125]]]] + + +def test_axis_0(): + """Fails: activestorage.reductionist.ReductionistError: Reductionist error: HTTP 502: -""" + active = build_active() + result = active.min(axis=(0, ))[:] + assert result == [[[[164.8125]]]] + + +def test_axis_0_1(): + """Fails: activestorage.reductionist.ReductionistError: Reductionist error: HTTP 502: -""" + active = build_active() + result = active.min(axis=(0, 1))[:] + assert result == [[[[164.8125]]]] + + +def test_axis_1(): + """Fails: activestorage.reductionist.ReductionistError: Reductionist error: HTTP 502: -""" + active = build_active() + result = active.min(axis=(1, ))[:] + assert result == [[[[164.8125]]]] + + +def test_axis_0_1_2(): + """Passes fine.""" + active = build_active() + result = active.min(axis=(0, 1, 2))[:] + assert result[0][0][0][0] == 171.05126953125 diff --git a/tests/unit/test_active.py b/tests/unit/test_active.py index d2f5c600..e2535ad1 100644 --- a/tests/unit/test_active.py +++ b/tests/unit/test_active.py @@ -119,7 +119,7 @@ def test_activevariable_pyfive_with_attributed_min(): ncvar = "TREFHT" ds = pyfive.File(uri)[ncvar] av = Active(ds) - av_slice_min = av.min[3:5] + av_slice_min = av.min()[3:5] assert av_slice_min == np.array(258.62814, dtype="float32") # test with Numpy np_slice_min = np.min(ds[3:5]) @@ -132,7 +132,7 @@ def test_activevariable_pyfive_with_attributed_mean(): ds = pyfive.File(uri)[ncvar] av = Active(ds) av.components = True - av_slice_min = av.mean[3:5] + av_slice_min = av.mean()[3:5] actual_mean = av_slice_min["sum"] / av_slice_min["n"] assert actual_mean == np.array(283.39508056640625, dtype="float32") # test with Numpy diff --git a/tests/unit/test_active_axis.py b/tests/unit/test_active_axis.py index 3bbc085f..d3d62cf1 100644 --- a/tests/unit/test_active_axis.py +++ b/tests/unit/test_active_axis.py @@ -76,14 +76,34 @@ def test_active_axis_format_1(): active1 = Active(rfile, ncvar, axis=[0, 2]) active2 = Active(rfile, ncvar, axis=(-1, -3)) - x1 = active2.mean[...] - x2 = active2.mean[...] + x1 = active2.mean()[...] + x2 = active2.mean()[...] assert x1.shape == x2.shape assert (x1.mask == x2.mask).all() assert np.ma.allclose(x1, x2) +def test_active_axis_format_new_api(): + """Unit test for class:Active axis format with Numpy-style API.""" + active1 = Active(rfile, ncvar) + active2 = Active(rfile, ncvar) + + x1 = active2.mean(axis=(0, 2))[...] + assert active2._axis == (0, 2) + x2 = active2.mean(axis=(-1, -3))[...] + assert active2._axis == (-1, -3) + + assert x1.shape == x2.shape + assert (x1.mask == x2.mask).all() + assert np.ma.allclose(x1, x2) + + xmin = active2.min(axis=(0, 2))[...] + xmax = active2.max(axis=(0, 2))[...] + assert xmin[0][0][0] == 209.44680786132812 + assert xmax[0][0][0] == 255.54661560058594 + + def test_active_axis_format_2(): """Unit test for class:Active axis format.""" # Disallow out-of-range axes diff --git a/tests/unit/test_reductionist.py b/tests/unit/test_reductionist.py index 4be797db..b1aa9831 100644 --- a/tests/unit/test_reductionist.py +++ b/tests/unit/test_reductionist.py @@ -2,6 +2,7 @@ import sys from unittest import mock +import json import numcodecs import numpy as np import pytest @@ -11,15 +12,18 @@ def make_response(content, status_code, dtype=None, shape=None, count=None): - response = requests.Response() - response._content = content - response.status_code = status_code + reduction_result = { + "bytes": list(content) + } if dtype: - response.headers["x-activestorage-dtype"] = dtype + reduction_result["dtype"] = dtype if shape: - response.headers["x-activestorage-shape"] = shape + reduction_result["shape"] = shape if count: - response.headers["x-activestorage-count"] = count + reduction_result["count"] = count + response = requests.Response() + response._content = json.dumps(reduction_result) + response.status_code = status_code return response @@ -27,7 +31,7 @@ def make_response(content, status_code, dtype=None, shape=None, count=None): def test_reduce_chunk_defaults(mock_request): """Unit test for reduce_chunk with default arguments.""" result = np.int32(134351386) - response = make_response(result.tobytes(), 200, "int32", "[]", "2") + response = make_response(result.tobytes(), 200, "int32", [], 2) mock_request.return_value = response active_url = "https://s3.example.com" @@ -86,7 +90,7 @@ def test_reduce_chunk_defaults(mock_request): def test_reduce_chunk_compression(mock_request, compression, filters): """Unit test for reduce_chunk with compression and filter arguments.""" result = np.int32(134351386) - response = make_response(result.tobytes(), 200, "int32", "[]", "2") + response = make_response(result.tobytes(), 200, "int32", [], 2) mock_request.return_value = response active_url = "https://s3.example.com" @@ -149,6 +153,8 @@ def test_reduce_chunk_compression(mock_request, compression, filters): "id": filter.codec_id, "element_size": filter.elementsize } for filter in filters], + "axis": + axis, } mock_request.assert_called_once_with(session, expected_url, expected_data) @@ -200,7 +206,7 @@ def test_reduce_chunk_missing(mock_request, missing): reduce_arg, api_arg = missing result = np.float32(-42.) - response = make_response(result.tobytes(), 200, "float32", "[]", "2") + response = make_response(result.tobytes(), 200, "float32", [], 2) mock_request.return_value = response active_url = "https://s3.example.com" @@ -258,6 +264,8 @@ def test_reduce_chunk_missing(mock_request, missing): ]], "missing": api_arg, + "axis": + axis, } mock_request.assert_called_once_with(session, expected_url, expected_data)