From 59759fe9edcd41829c9d26644f99c67c5819c376 Mon Sep 17 00:00:00 2001 From: Klemen Tusar Date: Sun, 11 Jan 2026 11:20:08 +0000 Subject: [PATCH 01/28] :bug: implement list limit handling in combine function to prevent DoS via memory exhaustion --- src/qs_codec/decode.py | 15 +++++++--- src/qs_codec/utils/utils.py | 54 +++++++++++++++++++++++++++++++--- tests/unit/decode_test.py | 58 ++++++++++++++++++++++++++++++++++--- 3 files changed, 115 insertions(+), 12 deletions(-) diff --git a/src/qs_codec/decode.py b/src/qs_codec/decode.py index 42669bf..d169e11 100644 --- a/src/qs_codec/decode.py +++ b/src/qs_codec/decode.py @@ -27,7 +27,7 @@ from .models.decode_options import DecodeOptions from .models.undefined import UNDEFINED from .utils.decode_utils import DecodeUtils -from .utils.utils import Utils +from .utils.utils import OverflowDict, Utils def decode( @@ -288,7 +288,7 @@ def _parse_query_string_values(value: str, options: DecodeOptions) -> t.Dict[str # Combine/overwrite according to the configured duplicates policy. if existing and options.duplicates == Duplicates.COMBINE: - obj[key] = Utils.combine(obj[key], val) + obj[key] = Utils.combine(obj[key], val, options) elif not existing or options.duplicates == Duplicates.LAST: obj[key] = val @@ -361,10 +361,14 @@ def _parse_object( root: str = chain[i] if root == "[]" and options.parse_lists: - if options.allow_empty_lists and (leaf == "" or (options.strict_null_handling and leaf is None)): + if Utils.is_overflow(leaf): + obj = leaf + elif options.allow_empty_lists and (leaf == "" or (options.strict_null_handling and leaf is None)): obj = [] else: obj = list(leaf) if isinstance(leaf, (list, tuple)) else [leaf] + if options.list_limit is not None and len(obj) > options.list_limit: + obj = OverflowDict({str(i): x for i, x in enumerate(obj)}) else: obj = dict() @@ -389,7 +393,10 @@ def _parse_object( index = None if not options.parse_lists and decoded_root == "": - obj = {"0": leaf} + if Utils.is_overflow(leaf): + obj = leaf + else: + obj = {"0": leaf} elif ( index is not None and index >= 0 diff --git a/src/qs_codec/utils/utils.py b/src/qs_codec/utils/utils.py index b8428ca..ea6e68f 100644 --- a/src/qs_codec/utils/utils.py +++ b/src/qs_codec/utils/utils.py @@ -28,6 +28,12 @@ from ..models.undefined import Undefined +class OverflowDict(dict): + """A dictionary subclass used to mark objects that have been converted from lists due to the `list_limit` being exceeded.""" + + pass + + class Utils: """ Namespace container for stateless utility routines. @@ -143,6 +149,9 @@ def merge( target = list(target) target.append(source) elif isinstance(target, t.Mapping): + if Utils.is_overflow(target): + return Utils.combine(target, source, options) + # Target is a mapping but source is a sequence — coerce indices to string keys. if isinstance(source, (list, tuple)): _new = dict(target) @@ -171,7 +180,11 @@ def merge( for _el in _iter1: if not isinstance(_el, Undefined): _res.append(_el) - _iter2 = source if isinstance(source, (list, tuple)) else [source] + _iter2 = ( + source + if isinstance(source, (list, tuple)) + else (list(source.values()) if Utils.is_overflow(source) else [source]) + ) for _el in _iter2: if not isinstance(_el, Undefined): _res.append(_el) @@ -324,17 +337,50 @@ def _dicts_are_equal( else: return d1 == d2 + @staticmethod + def is_overflow(obj: t.Any) -> bool: + """Check if an object is an OverflowDict.""" + return isinstance(obj, OverflowDict) + @staticmethod def combine( a: t.Union[t.List[t.Any], t.Tuple[t.Any], t.Any], b: t.Union[t.List[t.Any], t.Tuple[t.Any], t.Any], - ) -> t.List[t.Any]: + options: t.Optional[DecodeOptions] = None, + ) -> t.Union[t.List[t.Any], t.Dict[str, t.Any]]: """ Concatenate two values, treating non‑sequences as singletons. - Returns a new `list`; tuples are expanded but not preserved as tuples. + If `list_limit` is exceeded, converts the list to an `OverflowDict` + (a dict with numeric keys) to prevent memory exhaustion. """ - return [*(a if isinstance(a, (list, tuple)) else [a]), *(b if isinstance(b, (list, tuple)) else [b])] + if Utils.is_overflow(a): + # a is already an OverflowDict. Append b to it at the next numeric index. + # We assume sequential keys; len(a) gives the next index. + a = t.cast(OverflowDict, a) + idx = len(a) + if isinstance(b, (list, tuple)): + for item in b: + a[str(idx)] = item + idx += 1 + elif Utils.is_overflow(b): + b = t.cast(OverflowDict, b) + for item in b.values(): + a[str(idx)] = item + idx += 1 + else: + a[str(idx)] = b + return a + + # Normal combination: flatten lists/tuples + res = [*(a if isinstance(a, (list, tuple)) else [a]), *(b if isinstance(b, (list, tuple)) else [b])] + + list_limit = options.list_limit if options else 20 + if len(res) > list_limit: + # Convert to OverflowDict + return OverflowDict({str(i): x for i, x in enumerate(res)}) + + return res @staticmethod def apply( diff --git a/tests/unit/decode_test.py b/tests/unit/decode_test.py index 9311e14..eb2e430 100644 --- a/tests/unit/decode_test.py +++ b/tests/unit/decode_test.py @@ -10,6 +10,7 @@ from qs_codec.decode import _parse_object from qs_codec.enums.decode_kind import DecodeKind from qs_codec.utils.decode_utils import DecodeUtils +from qs_codec.utils.utils import OverflowDict class TestDecode: @@ -318,7 +319,10 @@ def test_parses_an_explicit_list(self, query: str, expected: t.Dict) -> None: pytest.param("a=b&a[0]=c", None, {"a": ["b", "c"]}, id="simple-first-indexed-list-second"), pytest.param("a[1]=b&a=c", DecodeOptions(list_limit=20), {"a": ["b", "c"]}, id="indexed-list-with-limit"), pytest.param( - "a[]=b&a=c", DecodeOptions(list_limit=0), {"a": ["b", "c"]}, id="explicit-list-with-zero-limit" + "a[]=b&a=c", + DecodeOptions(list_limit=0), + {"a": {"0": "b", "1": "c"}}, + id="explicit-list-with-zero-limit", ), pytest.param("a[]=b&a=c", None, {"a": ["b", "c"]}, id="explicit-list-default"), pytest.param( @@ -574,7 +578,7 @@ def test_parses_lists_of_dicts(self, query: str, expected: t.Mapping[str, t.Any] pytest.param( "a[]=b&a[]&a[]=c&a[]=", DecodeOptions(strict_null_handling=True, list_limit=0), - {"a": ["b", None, "c", ""]}, + {"a": {"0": "b", "1": None, "2": "c", "3": ""}}, id="strict-null-and-empty-zero-limit", ), pytest.param( @@ -586,7 +590,7 @@ def test_parses_lists_of_dicts(self, query: str, expected: t.Mapping[str, t.Any] pytest.param( "a[]=b&a[]=&a[]=c&a[]", DecodeOptions(strict_null_handling=True, list_limit=0), - {"a": ["b", "", "c", None]}, + {"a": {"0": "b", "1": "", "2": "c", "3": None}}, id="empty-and-strict-null-zero-limit", ), pytest.param("a[]=&a[]=b&a[]=c", None, {"a": ["", "b", "c"]}, id="explicit-empty-first"), @@ -1328,7 +1332,9 @@ def test_current_list_length_calculation(self) -> None: False, id="convert-to-map", ), - pytest.param("a[]=1&a[]=2", DecodeOptions(list_limit=0), {"a": ["1", "2"]}, False, id="zero-list-limit"), + pytest.param( + "a[]=1&a[]=2", DecodeOptions(list_limit=0), {"a": {"0": "1", "1": "2"}}, False, id="zero-list-limit" + ), pytest.param( "a[]=1&a[]=2", DecodeOptions(list_limit=-1, raise_on_limit_exceeded=True), @@ -1680,3 +1686,47 @@ def test_strict_depth_overflow_raises_for_well_formed(self) -> None: def test_unterminated_group_does_not_raise_under_strict_depth(self) -> None: segs = DecodeUtils.split_key_into_segments("a[b[c", allow_dots=False, max_depth=5, strict_depth=True) assert segs == ["a", "[[b[c]"] + + +class TestCVE2024: + def test_dos_attack(self) -> None: + # JS test: + # var arr = []; + # for (var i = 0; i < 105; i++) { + # arr[arr.length] = 'x'; + # } + # var attack = 'a[]=' + arr.join('&a[]='); + # var result = qs.parse(attack, { arrayLimit: 100 }); + # t.notOk(Array.isArray(result.a)) + + arr = ["x"] * 105 + # Construct query: a[]=x&a[]=x... + attack = "a[]=" + "&a[]=".join(arr) + + # list_limit is the python equivalent of arrayLimit + options = DecodeOptions(list_limit=100) + result = decode(attack, options=options) + + assert isinstance(result["a"], dict), "Should be a dict when limit exceeded" + assert isinstance(result["a"], OverflowDict) + assert len(result["a"]) == 105 + assert result["a"]["0"] == "x" + assert result["a"]["104"] == "x" + + def test_array_limit_checks(self) -> None: + # JS patch tests + # st.deepEqual(qs.parse('a[]=b', { arrayLimit: 0 }), { a: { 0: 'b' } }); + assert decode("a[]=b", DecodeOptions(list_limit=0)) == {"a": {"0": "b"}} + + # st.deepEqual(qs.parse('a[]=b&a[]=c', { arrayLimit: 0 }), { a: { 0: 'b', 1: 'c' } }); + assert decode("a[]=b&a[]=c", DecodeOptions(list_limit=0)) == {"a": {"0": "b", "1": "c"}} + + # st.deepEqual(qs.parse('a[]=b&a[]=c', { arrayLimit: 1 }), { a: { 0: 'b', 1: 'c' } }); + assert decode("a[]=b&a[]=c", DecodeOptions(list_limit=1)) == {"a": {"0": "b", "1": "c"}} + + # st.deepEqual(qs.parse('a[]=b&a[]=c&a[]=d', { arrayLimit: 2 }), { a: { 0: 'b', 1: 'c', 2: 'd' } }); + assert decode("a[]=b&a[]=c&a[]=d", DecodeOptions(list_limit=2)) == {"a": {"0": "b", "1": "c", "2": "d"}} + + def test_no_limit_does_not_overflow(self) -> None: + # Verify that within limit it stays a list + assert decode("a[]=b&a[]=c", DecodeOptions(list_limit=2)) == {"a": ["b", "c"]} From 629fdb87053f28c1199ebfc55f21016d869268db Mon Sep 17 00:00:00 2001 From: Klemen Tusar Date: Sun, 11 Jan 2026 11:38:58 +0000 Subject: [PATCH 02/28] :arrow_up: bump qs dependency to version 6.14.1 --- tests/comparison/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/comparison/package.json b/tests/comparison/package.json index bc38bc5..43215c2 100644 --- a/tests/comparison/package.json +++ b/tests/comparison/package.json @@ -5,6 +5,6 @@ "author": "Klemen Tusar", "license": "BSD-3-Clause", "dependencies": { - "qs": "^6.14.0" + "qs": "^6.14.1" } } From 1461a0d17fe8808023171cf28cc3cd6a238d7892 Mon Sep 17 00:00:00 2001 From: Klemen Tusar Date: Sun, 11 Jan 2026 12:03:15 +0000 Subject: [PATCH 03/28] refactor: improve list handling in combine function to prevent memory exhaustion --- src/qs_codec/utils/utils.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/src/qs_codec/utils/utils.py b/src/qs_codec/utils/utils.py index ea6e68f..2e73d09 100644 --- a/src/qs_codec/utils/utils.py +++ b/src/qs_codec/utils/utils.py @@ -355,22 +355,23 @@ def combine( (a dict with numeric keys) to prevent memory exhaustion. """ if Utils.is_overflow(a): - # a is already an OverflowDict. Append b to it at the next numeric index. - # We assume sequential keys; len(a) gives the next index. - a = t.cast(OverflowDict, a) - idx = len(a) + # a is already an OverflowDict. Append b to a *copy* at the next numeric index. + # We assume sequential keys; len(a_copy) gives the next index. + orig_a = t.cast(OverflowDict, a) + a_copy = OverflowDict(orig_a) + idx = len(a_copy) if isinstance(b, (list, tuple)): for item in b: - a[str(idx)] = item + a_copy[str(idx)] = item idx += 1 elif Utils.is_overflow(b): b = t.cast(OverflowDict, b) for item in b.values(): - a[str(idx)] = item + a_copy[str(idx)] = item idx += 1 else: - a[str(idx)] = b - return a + a_copy[str(idx)] = b + return a_copy # Normal combination: flatten lists/tuples res = [*(a if isinstance(a, (list, tuple)) else [a]), *(b if isinstance(b, (list, tuple)) else [b])] From bf56e62b99f4157b075de4978a9d78679076808c Mon Sep 17 00:00:00 2001 From: Klemen Tusar Date: Sun, 11 Jan 2026 12:08:15 +0000 Subject: [PATCH 04/28] feat: add OverflowDict to public API for improved list handling --- src/qs_codec/__init__.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/qs_codec/__init__.py b/src/qs_codec/__init__.py index 738deac..503c505 100644 --- a/src/qs_codec/__init__.py +++ b/src/qs_codec/__init__.py @@ -27,6 +27,7 @@ from .models.decode_options import DecodeOptions from .models.encode_options import EncodeOptions from .models.undefined import Undefined +from .utils.utils import OverflowDict # Public API surface re-exported at the package root. @@ -45,4 +46,5 @@ "DecodeOptions", "EncodeOptions", "Undefined", + "OverflowDict", ] From b6fb4926230ff1953ab112303b8234ccd69d7202 Mon Sep 17 00:00:00 2001 From: Klemen Tusar Date: Sun, 11 Jan 2026 12:11:00 +0000 Subject: [PATCH 05/28] test: add unit tests for OverflowDict handling in combine function --- tests/unit/utils_test.py | 43 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 42 insertions(+), 1 deletion(-) diff --git a/tests/unit/utils_test.py b/tests/unit/utils_test.py index e01bf09..5cf10e5 100644 --- a/tests/unit/utils_test.py +++ b/tests/unit/utils_test.py @@ -9,7 +9,7 @@ from qs_codec.models.undefined import Undefined from qs_codec.utils.decode_utils import DecodeUtils from qs_codec.utils.encode_utils import EncodeUtils -from qs_codec.utils.utils import Utils +from qs_codec.utils.utils import OverflowDict, Utils class TestUtils: @@ -628,6 +628,47 @@ def test_combine_neither_is_an_array(self) -> None: assert b is not combined assert combined == [1, 2] + def test_combine_list_limit_exceeded_creates_overflow_dict(self) -> None: + a = [1] * 10 + b = [2] * 11 + # Total 21 items, default limit is 20 + combined = Utils.combine(a, b) + assert isinstance(combined, OverflowDict) + assert len(combined) == 21 + assert combined["0"] == 1 + assert combined["20"] == 2 + + def test_combine_with_overflow_dict(self) -> None: + a = OverflowDict({"0": "x"}) + b = "y" + combined = Utils.combine(a, b) + assert isinstance(combined, OverflowDict) + assert combined is not a # Check for immutability (copy) + assert combined["0"] == "x" + assert combined["1"] == "y" + assert len(combined) == 2 + + # Verify 'a' was not mutated + assert len(a) == 1 + assert "1" not in a + + def test_combine_options_default(self) -> None: + # Default options should imply list_limit=20 + a = [1] * 20 + b = [2] + combined = Utils.combine(a, b, options=None) + assert isinstance(combined, OverflowDict) + assert len(combined) == 21 + + def test_combine_overflow_dict_with_overflow_dict(self) -> None: + a = OverflowDict({"0": "x"}) + b = OverflowDict({"0": "y"}) + combined = Utils.combine(a, b) + assert isinstance(combined, OverflowDict) + assert combined["0"] == "x" + assert combined["1"] == "y" + assert len(combined) == 2 + def test_compact_removes_undefined_entries_and_avoids_cycles(self) -> None: root: t.Dict[str, t.Any] = { "keep": 1, From ab775d97c18f23cc62b62195584a0993e28ca0b8 Mon Sep 17 00:00:00 2001 From: Klemen Tusar Date: Sun, 11 Jan 2026 12:14:38 +0000 Subject: [PATCH 06/28] test: add unit tests for OverflowDict handling in merge function --- tests/unit/utils_test.py | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/tests/unit/utils_test.py b/tests/unit/utils_test.py index 5cf10e5..102dd22 100644 --- a/tests/unit/utils_test.py +++ b/tests/unit/utils_test.py @@ -1,5 +1,6 @@ import re import typing as t +from unittest.mock import patch import pytest @@ -877,6 +878,39 @@ def test_encode_string(self): assert EncodeUtils._encode_string("💩", Format.RFC3986) == "%F0%9F%92%A9" assert EncodeUtils._encode_string("A💩B", Format.RFC3986) == "A%F0%9F%92%A9B" + def test_merge_target_is_overflow_dict(self) -> None: + target = OverflowDict({"0": "a"}) + source = "b" + # Should delegate to combine, which appends 'b' at index 1 + result = Utils.merge(target, source) + assert isinstance(result, OverflowDict) + assert result == {"0": "a", "1": "b"} + + def test_merge_source_is_overflow_dict_into_dict(self) -> None: + target = {"a": 1} + source = OverflowDict({"b": 2}) + result = Utils.merge(target, source) + assert isinstance(result, dict) + assert result == {"a": 1, "b": 2} + + def test_merge_source_is_overflow_dict_into_list(self) -> None: + target = ["a"] + # source has key '0', which collides with target's index 0 + source = OverflowDict({"0": "b"}) + result = Utils.merge(target, source) + assert isinstance(result, dict) + # Source overwrites target at key '0' + assert result == {"0": "b"} + + def test_merge_delegates_to_combine_with_options(self) -> None: + target = OverflowDict({"0": "a"}) + source = "b" + options = DecodeOptions(list_limit=50) + + with patch("qs_codec.utils.utils.Utils.combine") as mock_combine: + Utils.merge(target, source, options) + mock_combine.assert_called_once_with(target, source, options) + class TestDecodeUtilsHelpers: def test_dot_to_bracket_preserves_ambiguous_dot_before_closing_bracket(self) -> None: From 7176a1cc4e82009cb27c63adf79dda80759b160f Mon Sep 17 00:00:00 2001 From: Klemen Tusar Date: Sun, 11 Jan 2026 12:18:27 +0000 Subject: [PATCH 07/28] feat: enhance list limit handling in combine function to prevent memory exhaustion --- src/qs_codec/utils/utils.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/qs_codec/utils/utils.py b/src/qs_codec/utils/utils.py index 2e73d09..c6b10db 100644 --- a/src/qs_codec/utils/utils.py +++ b/src/qs_codec/utils/utils.py @@ -353,6 +353,17 @@ def combine( If `list_limit` is exceeded, converts the list to an `OverflowDict` (a dict with numeric keys) to prevent memory exhaustion. + When `options` is provided, its ``list_limit`` controls when a list is + converted into an :class:`OverflowDict` (a dict with numeric keys) to + prevent unbounded growth. If ``options`` is ``None``, a default + ``list_limit`` of ``20`` is used. + A negative ``list_limit`` is treated as "overflow immediately": any + non‑empty combined result will be converted to :class:`OverflowDict` + because ``len(res) > list_limit`` is then always true for ``len(res) >= 0``. + This helper never raises an exception when the limit is exceeded; even + if :class:`DecodeOptions` has ``raise_on_limit_exceeded`` set to + ``True``, ``combine`` will still handle overflow only by converting the + list to :class:`OverflowDict`. """ if Utils.is_overflow(a): # a is already an OverflowDict. Append b to a *copy* at the next numeric index. From 0147c43f24ecf392247d6e756c6fb3dbe7ec53dd Mon Sep 17 00:00:00 2001 From: Klemen Tusar Date: Sun, 11 Jan 2026 12:31:46 +0000 Subject: [PATCH 08/28] feat: improve list limit handling in combine function to prevent memory exhaustion --- src/qs_codec/utils/utils.py | 31 +++++++++++++++++++++++++------ 1 file changed, 25 insertions(+), 6 deletions(-) diff --git a/src/qs_codec/utils/utils.py b/src/qs_codec/utils/utils.py index c6b10db..b46a6a3 100644 --- a/src/qs_codec/utils/utils.py +++ b/src/qs_codec/utils/utils.py @@ -349,7 +349,7 @@ def combine( options: t.Optional[DecodeOptions] = None, ) -> t.Union[t.List[t.Any], t.Dict[str, t.Any]]: """ - Concatenate two values, treating non‑sequences as singletons. + Concatenate two values, treating non-sequences as singletons. If `list_limit` is exceeded, converts the list to an `OverflowDict` (a dict with numeric keys) to prevent memory exhaustion. @@ -358,7 +358,7 @@ def combine( prevent unbounded growth. If ``options`` is ``None``, a default ``list_limit`` of ``20`` is used. A negative ``list_limit`` is treated as "overflow immediately": any - non‑empty combined result will be converted to :class:`OverflowDict` + non-empty combined result will be converted to :class:`OverflowDict` because ``len(res) > list_limit`` is then always true for ``len(res) >= 0``. This helper never raises an exception when the limit is exceeded; even if :class:`DecodeOptions` has ``raise_on_limit_exceeded`` set to @@ -370,22 +370,41 @@ def combine( # We assume sequential keys; len(a_copy) gives the next index. orig_a = t.cast(OverflowDict, a) a_copy = OverflowDict(orig_a) - idx = len(a_copy) + # Use max key + 1 to handle sparse dicts safely, rather than len(a) + keys = [int(k) for k in a_copy] + idx = (max(keys) + 1) if keys else 0 + if isinstance(b, (list, tuple)): for item in b: a_copy[str(idx)] = item idx += 1 elif Utils.is_overflow(b): b = t.cast(OverflowDict, b) - for item in b.values(): - a_copy[str(idx)] = item + # Iterate in numeric key order to preserve list semantics + for k in sorted(b.keys(), key=int): + a_copy[str(idx)] = b[k] idx += 1 else: a_copy[str(idx)] = b return a_copy # Normal combination: flatten lists/tuples - res = [*(a if isinstance(a, (list, tuple)) else [a]), *(b if isinstance(b, (list, tuple)) else [b])] + # Flatten a + if isinstance(a, (list, tuple)): + list_a = list(a) + else: + list_a = [a] + + # Flatten b, handling OverflowDict as a list source + if isinstance(b, (list, tuple)): + list_b = list(b) + elif Utils.is_overflow(b): + b_of = t.cast(OverflowDict, b) + list_b = [b_of[k] for k in sorted(b_of.keys(), key=int)] + else: + list_b = [b] + + res = [*list_a, *list_b] list_limit = options.list_limit if options else 20 if len(res) > list_limit: From 17e7d5d2e50b1931e4fb958e68ad1e14d4569c65 Mon Sep 17 00:00:00 2001 From: Klemen Tusar Date: Sun, 11 Jan 2026 12:35:19 +0000 Subject: [PATCH 09/28] feat: implement list limit handling in combine function to prevent DoS via memory exhaustion --- src/qs_codec/utils/utils.py | 11 ++++++----- tests/unit/utils_test.py | 37 ++++++++++++++++++++++++++++++------- 2 files changed, 36 insertions(+), 12 deletions(-) diff --git a/src/qs_codec/utils/utils.py b/src/qs_codec/utils/utils.py index b46a6a3..35e3756 100644 --- a/src/qs_codec/utils/utils.py +++ b/src/qs_codec/utils/utils.py @@ -180,11 +180,12 @@ def merge( for _el in _iter1: if not isinstance(_el, Undefined): _res.append(_el) - _iter2 = ( - source - if isinstance(source, (list, tuple)) - else (list(source.values()) if Utils.is_overflow(source) else [source]) - ) + if Utils.is_overflow(source): + # Iterate in numeric key order to preserve list semantics + source_of = t.cast(OverflowDict, source) + _iter2 = [source_of[k] for k in sorted(source_of.keys(), key=int)] + else: + _iter2 = [source] for _el in _iter2: if not isinstance(_el, Undefined): _res.append(_el) diff --git a/tests/unit/utils_test.py b/tests/unit/utils_test.py index 102dd22..9b27ce1 100644 --- a/tests/unit/utils_test.py +++ b/tests/unit/utils_test.py @@ -902,14 +902,37 @@ def test_merge_source_is_overflow_dict_into_list(self) -> None: # Source overwrites target at key '0' assert result == {"0": "b"} - def test_merge_delegates_to_combine_with_options(self) -> None: - target = OverflowDict({"0": "a"}) - source = "b" - options = DecodeOptions(list_limit=50) + def test_combine_sparse_overflow_dict(self) -> None: + # Create an OverflowDict with a sparse key + a = OverflowDict({"999": "a"}) + b = "b" + # Combine should append at index 1000 (max key + 1) + result = Utils.combine(a, b) + assert isinstance(result, OverflowDict) + assert result == {"999": "a", "1000": "b"} + # Verify it uses integer sorting for keys when determining max + assert len(result) == 2 - with patch("qs_codec.utils.utils.Utils.combine") as mock_combine: - Utils.merge(target, source, options) - mock_combine.assert_called_once_with(target, source, options) + def test_merge_target_is_sparse_overflow_dict(self) -> None: + # Merge delegates to combine, so this should also use max key + 1 + target = OverflowDict({"999": "a"}) + source = "b" + result = Utils.merge(target, source) + assert isinstance(result, OverflowDict) + assert result == {"999": "a", "1000": "b"} + + def test_merge_scalar_target_with_sparse_overflow_dict_source(self) -> None: + # Merging OverflowDict source into a scalar target (which becomes a list) + # should flatten the OverflowDict values in numeric key order. + target = "a" + # Insert in reverse order to verify sorting + source = OverflowDict({}) + source["10"] = "c" + source["2"] = "b" + + # Utils.merge should produce [target, *source_values_sorted] + result = Utils.merge(target, source) + assert result == ["a", "b", "c"] class TestDecodeUtilsHelpers: From 5658b1945afeea825402be946be36185cdd56ab3 Mon Sep 17 00:00:00 2001 From: Klemen Tusar Date: Sun, 11 Jan 2026 12:38:21 +0000 Subject: [PATCH 10/28] test: add unit tests for combine function with OverflowDict handling --- tests/unit/utils_test.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/tests/unit/utils_test.py b/tests/unit/utils_test.py index 9b27ce1..0f57d1d 100644 --- a/tests/unit/utils_test.py +++ b/tests/unit/utils_test.py @@ -934,6 +934,26 @@ def test_merge_scalar_target_with_sparse_overflow_dict_source(self) -> None: result = Utils.merge(target, source) assert result == ["a", "b", "c"] + def test_combine_scalar_with_overflow_dict(self) -> None: + # Test for coverage of Utils.combine lines 403-404 + # Case where 'a' is scalar (not overflow) and 'b' is OverflowDict + a = "start" + b = OverflowDict({"1": "y", "0": "x"}) # Unordered to verify sorting + + # Should flatten 'b' into ["x", "y"] and prepend 'a' -> ["start", "x", "y"] + result = Utils.combine(a, b) + assert result == ["start", "x", "y"] + + def test_combine_list_with_overflow_dict(self) -> None: + # Test for coverage of Utils.combine lines 403-404 + # Case where 'a' is list and 'b' is OverflowDict + a = ["start"] + b = OverflowDict({"1": "y", "0": "x"}) + + # Should flatten 'b' into ["x", "y"] and extend 'a' -> ["start", "x", "y"] + result = Utils.combine(a, b) + assert result == ["start", "x", "y"] + class TestDecodeUtilsHelpers: def test_dot_to_bracket_preserves_ambiguous_dot_before_closing_bracket(self) -> None: From 260d11dafa129074d0abf23efd22bc1bd30d9b58 Mon Sep 17 00:00:00 2001 From: Klemen Tusar Date: Sun, 11 Jan 2026 12:48:29 +0000 Subject: [PATCH 11/28] refactor: remove unused import from utils_test.py --- tests/unit/utils_test.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/unit/utils_test.py b/tests/unit/utils_test.py index 0f57d1d..316faa3 100644 --- a/tests/unit/utils_test.py +++ b/tests/unit/utils_test.py @@ -1,6 +1,5 @@ import re import typing as t -from unittest.mock import patch import pytest From ea95808581379d96d9522554dba18742f65f7b18 Mon Sep 17 00:00:00 2001 From: Klemen Tusar Date: Sun, 11 Jan 2026 12:52:53 +0000 Subject: [PATCH 12/28] feat: skip Undefined values in combine function for list and OverflowDict handling --- src/qs_codec/utils/utils.py | 24 ++++++++++++++---------- tests/unit/utils_test.py | 19 +++++++++++++++++++ 2 files changed, 33 insertions(+), 10 deletions(-) diff --git a/src/qs_codec/utils/utils.py b/src/qs_codec/utils/utils.py index 35e3756..8c07a48 100644 --- a/src/qs_codec/utils/utils.py +++ b/src/qs_codec/utils/utils.py @@ -377,33 +377,37 @@ def combine( if isinstance(b, (list, tuple)): for item in b: - a_copy[str(idx)] = item - idx += 1 + if not isinstance(item, Undefined): + a_copy[str(idx)] = item + idx += 1 elif Utils.is_overflow(b): b = t.cast(OverflowDict, b) # Iterate in numeric key order to preserve list semantics for k in sorted(b.keys(), key=int): - a_copy[str(idx)] = b[k] - idx += 1 + val = b[k] + if not isinstance(val, Undefined): + a_copy[str(idx)] = val + idx += 1 else: - a_copy[str(idx)] = b + if not isinstance(b, Undefined): + a_copy[str(idx)] = b return a_copy # Normal combination: flatten lists/tuples # Flatten a if isinstance(a, (list, tuple)): - list_a = list(a) + list_a = [x for x in a if not isinstance(x, Undefined)] else: - list_a = [a] + list_a = [a] if not isinstance(a, Undefined) else [] # Flatten b, handling OverflowDict as a list source if isinstance(b, (list, tuple)): - list_b = list(b) + list_b = [x for x in b if not isinstance(x, Undefined)] elif Utils.is_overflow(b): b_of = t.cast(OverflowDict, b) - list_b = [b_of[k] for k in sorted(b_of.keys(), key=int)] + list_b = [b_of[k] for k in sorted(b_of.keys(), key=int) if not isinstance(b_of[k], Undefined)] else: - list_b = [b] + list_b = [b] if not isinstance(b, Undefined) else [] res = [*list_a, *list_b] diff --git a/tests/unit/utils_test.py b/tests/unit/utils_test.py index 316faa3..af62feb 100644 --- a/tests/unit/utils_test.py +++ b/tests/unit/utils_test.py @@ -953,6 +953,25 @@ def test_combine_list_with_overflow_dict(self) -> None: result = Utils.combine(a, b) assert result == ["start", "x", "y"] + def test_combine_skips_undefined_in_overflow_dict_append(self) -> None: + a = OverflowDict({"0": "x"}) + b = ["y", Undefined(), "z"] + result = Utils.combine(a, b) + assert isinstance(result, OverflowDict) + assert result == {"0": "x", "1": "y", "2": "z"} + + def test_combine_skips_undefined_in_list_flattening(self) -> None: + a = ["x", Undefined()] + b = [Undefined(), "y"] + result = Utils.combine(a, b) + assert result == ["x", "y"] + + def test_combine_skips_undefined_scalar(self) -> None: + a = ["x"] + b = Undefined() + result = Utils.combine(a, b) + assert result == ["x"] + class TestDecodeUtilsHelpers: def test_dot_to_bracket_preserves_ambiguous_dot_before_closing_bracket(self) -> None: From d32a24b2a7478bc0dbccb7fb7941a0f420ca610c Mon Sep 17 00:00:00 2001 From: Klemen Tusar Date: Sun, 11 Jan 2026 13:15:46 +0000 Subject: [PATCH 13/28] feat: enhance combine function to skip non-numeric keys and handle Undefined values in OverflowDict --- src/qs_codec/utils/utils.py | 30 ++++++++++++++++++++++++------ tests/unit/utils_test.py | 24 ++++++++++++++++++++++++ 2 files changed, 48 insertions(+), 6 deletions(-) diff --git a/src/qs_codec/utils/utils.py b/src/qs_codec/utils/utils.py index 8c07a48..a62daf9 100644 --- a/src/qs_codec/utils/utils.py +++ b/src/qs_codec/utils/utils.py @@ -183,7 +183,9 @@ def merge( if Utils.is_overflow(source): # Iterate in numeric key order to preserve list semantics source_of = t.cast(OverflowDict, source) - _iter2 = [source_of[k] for k in sorted(source_of.keys(), key=int)] + _iter2 = [ + source_of[k] for _, k in sorted(Utils._numeric_key_pairs(source_of), key=lambda item: item[0]) + ] else: _iter2 = [source] for _el in _iter2: @@ -343,6 +345,18 @@ def is_overflow(obj: t.Any) -> bool: """Check if an object is an OverflowDict.""" return isinstance(obj, OverflowDict) + @staticmethod + def _numeric_key_pairs(mapping: t.Mapping[t.Any, t.Any]) -> t.List[t.Tuple[int, t.Any]]: + """Return (numeric_key, original_key) for keys that coerce to int.""" + pairs: t.List[t.Tuple[int, t.Any]] = [] + for key in mapping.keys(): + try: + numeric_key = int(key) + except (TypeError, ValueError): + continue + pairs.append((numeric_key, key)) + return pairs + @staticmethod def combine( a: t.Union[t.List[t.Any], t.Tuple[t.Any], t.Any], @@ -370,10 +384,10 @@ def combine( # a is already an OverflowDict. Append b to a *copy* at the next numeric index. # We assume sequential keys; len(a_copy) gives the next index. orig_a = t.cast(OverflowDict, a) - a_copy = OverflowDict(orig_a) + a_copy = OverflowDict({k: v for k, v in orig_a.items() if not isinstance(v, Undefined)}) # Use max key + 1 to handle sparse dicts safely, rather than len(a) - keys = [int(k) for k in a_copy] - idx = (max(keys) + 1) if keys else 0 + key_pairs = Utils._numeric_key_pairs(a_copy) + idx = (max(key for key, _ in key_pairs) + 1) if key_pairs else 0 if isinstance(b, (list, tuple)): for item in b: @@ -383,7 +397,7 @@ def combine( elif Utils.is_overflow(b): b = t.cast(OverflowDict, b) # Iterate in numeric key order to preserve list semantics - for k in sorted(b.keys(), key=int): + for _, k in sorted(Utils._numeric_key_pairs(b), key=lambda item: item[0]): val = b[k] if not isinstance(val, Undefined): a_copy[str(idx)] = val @@ -405,7 +419,11 @@ def combine( list_b = [x for x in b if not isinstance(x, Undefined)] elif Utils.is_overflow(b): b_of = t.cast(OverflowDict, b) - list_b = [b_of[k] for k in sorted(b_of.keys(), key=int) if not isinstance(b_of[k], Undefined)] + list_b = [ + b_of[k] + for _, k in sorted(Utils._numeric_key_pairs(b_of), key=lambda item: item[0]) + if not isinstance(b_of[k], Undefined) + ] else: list_b = [b] if not isinstance(b, Undefined) else [] diff --git a/tests/unit/utils_test.py b/tests/unit/utils_test.py index af62feb..e68247f 100644 --- a/tests/unit/utils_test.py +++ b/tests/unit/utils_test.py @@ -972,6 +972,30 @@ def test_combine_skips_undefined_scalar(self) -> None: result = Utils.combine(a, b) assert result == ["x"] + def test_combine_overflow_dict_skips_existing_undefined_and_ignores_non_numeric_keys_for_index(self) -> None: + a = OverflowDict({"0": "x", "skip": "keep", "1": Undefined()}) + b = "y" + result = Utils.combine(a, b) + assert isinstance(result, OverflowDict) + assert result["0"] == "x" + assert result["1"] == "y" + assert result["skip"] == "keep" + assert "1" in a # Original should remain unchanged + + def test_combine_overflow_dict_source_skips_non_numeric_keys(self) -> None: + a = OverflowDict({"0": "x"}) + b = OverflowDict({"foo": "bar", "1": "y", "0": "z"}) + result = Utils.combine(a, b) + assert isinstance(result, OverflowDict) + assert result == {"0": "x", "1": "z", "2": "y"} + assert "foo" not in result + + def test_merge_overflow_dict_source_skips_non_numeric_keys(self) -> None: + target = "a" + source = OverflowDict({"foo": "skip", "1": "b"}) + result = Utils.merge(target, source) + assert result == ["a", "b"] + class TestDecodeUtilsHelpers: def test_dot_to_bracket_preserves_ambiguous_dot_before_closing_bracket(self) -> None: From da9341d48d19d9f5fb9f8f401f739484b8801ada Mon Sep 17 00:00:00 2001 From: Klemen Tusar Date: Sun, 11 Jan 2026 13:59:28 +0000 Subject: [PATCH 14/28] test: add tests for list limit handling in combine function to ensure OverflowDict creation --- tests/unit/utils_test.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/tests/unit/utils_test.py b/tests/unit/utils_test.py index e68247f..313a860 100644 --- a/tests/unit/utils_test.py +++ b/tests/unit/utils_test.py @@ -638,6 +638,18 @@ def test_combine_list_limit_exceeded_creates_overflow_dict(self) -> None: assert combined["0"] == 1 assert combined["20"] == 2 + def test_combine_list_limit_zero_creates_overflow_dict(self) -> None: + options = DecodeOptions(list_limit=0) + combined = Utils.combine(["a"], [], options) + assert isinstance(combined, OverflowDict) + assert combined == {"0": "a"} + + def test_combine_negative_list_limit_overflows_non_empty(self) -> None: + options = DecodeOptions(list_limit=-1) + combined = Utils.combine([], ["a"], options) + assert isinstance(combined, OverflowDict) + assert combined == {"0": "a"} + def test_combine_with_overflow_dict(self) -> None: a = OverflowDict({"0": "x"}) b = "y" From fc20217d137b7fe85fde071677d7876176ce7b13 Mon Sep 17 00:00:00 2001 From: Klemen Tusar Date: Sun, 11 Jan 2026 14:19:43 +0000 Subject: [PATCH 15/28] feat: implement list limit handling in combine function to prevent DoS via memory exhaustion --- src/qs_codec/utils/utils.py | 13 +++++++++++++ tests/unit/decode_test.py | 5 ++++- tests/unit/utils_test.py | 8 +++++--- 3 files changed, 22 insertions(+), 4 deletions(-) diff --git a/src/qs_codec/utils/utils.py b/src/qs_codec/utils/utils.py index a62daf9..3eb5b75 100644 --- a/src/qs_codec/utils/utils.py +++ b/src/qs_codec/utils/utils.py @@ -175,6 +175,19 @@ def merge( **source, } + if Utils.is_overflow(source): + source_of = t.cast(OverflowDict, source) + result = OverflowDict() + offset = 0 + if not isinstance(target, Undefined): + result["0"] = target + offset = 1 + for numeric_key, key in sorted(Utils._numeric_key_pairs(source_of), key=lambda item: item[0]): + val = source_of[key] + if not isinstance(val, Undefined): + result[str(numeric_key + offset)] = val + return result + _res: t.List[t.Any] = [] _iter1 = target if isinstance(target, (list, tuple)) else [target] for _el in _iter1: diff --git a/tests/unit/decode_test.py b/tests/unit/decode_test.py index eb2e430..a5e338e 100644 --- a/tests/unit/decode_test.py +++ b/tests/unit/decode_test.py @@ -329,7 +329,10 @@ def test_parses_an_explicit_list(self, query: str, expected: t.Dict) -> None: "a=b&a[1]=c", DecodeOptions(list_limit=20), {"a": ["b", "c"]}, id="simple-and-indexed-with-limit" ), pytest.param( - "a=b&a[]=c", DecodeOptions(list_limit=0), {"a": ["b", "c"]}, id="simple-and-explicit-zero-limit" + "a=b&a[]=c", + DecodeOptions(list_limit=0), + {"a": {"0": "b", "1": "c"}}, + id="simple-and-explicit-zero-limit", ), pytest.param("a=b&a[]=c", None, {"a": ["b", "c"]}, id="simple-and-explicit-default"), ], diff --git a/tests/unit/utils_test.py b/tests/unit/utils_test.py index 313a860..fc596be 100644 --- a/tests/unit/utils_test.py +++ b/tests/unit/utils_test.py @@ -934,7 +934,7 @@ def test_merge_target_is_sparse_overflow_dict(self) -> None: def test_merge_scalar_target_with_sparse_overflow_dict_source(self) -> None: # Merging OverflowDict source into a scalar target (which becomes a list) - # should flatten the OverflowDict values in numeric key order. + # should preserve overflow semantics and shift numeric indices by 1. target = "a" # Insert in reverse order to verify sorting source = OverflowDict({}) @@ -943,7 +943,8 @@ def test_merge_scalar_target_with_sparse_overflow_dict_source(self) -> None: # Utils.merge should produce [target, *source_values_sorted] result = Utils.merge(target, source) - assert result == ["a", "b", "c"] + assert isinstance(result, OverflowDict) + assert result == {"0": "a", "3": "b", "11": "c"} def test_combine_scalar_with_overflow_dict(self) -> None: # Test for coverage of Utils.combine lines 403-404 @@ -1006,7 +1007,8 @@ def test_merge_overflow_dict_source_skips_non_numeric_keys(self) -> None: target = "a" source = OverflowDict({"foo": "skip", "1": "b"}) result = Utils.merge(target, source) - assert result == ["a", "b"] + assert isinstance(result, OverflowDict) + assert result == {"0": "a", "2": "b"} class TestDecodeUtilsHelpers: From 25c40559244f2c9a41b730ff3d4328ffc0f36581 Mon Sep 17 00:00:00 2001 From: Klemen Tusar Date: Sun, 11 Jan 2026 14:32:12 +0000 Subject: [PATCH 16/28] feat: optimize overflow handling in combine function by sorting numeric keys --- src/qs_codec/utils/utils.py | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/src/qs_codec/utils/utils.py b/src/qs_codec/utils/utils.py index 3eb5b75..9b3caf1 100644 --- a/src/qs_codec/utils/utils.py +++ b/src/qs_codec/utils/utils.py @@ -177,13 +177,14 @@ def merge( if Utils.is_overflow(source): source_of = t.cast(OverflowDict, source) + sorted_pairs = sorted(Utils._numeric_key_pairs(source_of), key=lambda item: item[0]) + overflow_values = [source_of[k] for _, k in sorted_pairs] result = OverflowDict() offset = 0 if not isinstance(target, Undefined): result["0"] = target offset = 1 - for numeric_key, key in sorted(Utils._numeric_key_pairs(source_of), key=lambda item: item[0]): - val = source_of[key] + for (numeric_key, _), val in zip(sorted_pairs, overflow_values): if not isinstance(val, Undefined): result[str(numeric_key + offset)] = val return result @@ -193,14 +194,7 @@ def merge( for _el in _iter1: if not isinstance(_el, Undefined): _res.append(_el) - if Utils.is_overflow(source): - # Iterate in numeric key order to preserve list semantics - source_of = t.cast(OverflowDict, source) - _iter2 = [ - source_of[k] for _, k in sorted(Utils._numeric_key_pairs(source_of), key=lambda item: item[0]) - ] - else: - _iter2 = [source] + _iter2 = [source] for _el in _iter2: if not isinstance(_el, Undefined): _res.append(_el) From 685e927762f588dab9f45dd5d15876028c04d4f4 Mon Sep 17 00:00:00 2001 From: Klemen Tusar Date: Sun, 11 Jan 2026 14:54:00 +0000 Subject: [PATCH 17/28] feat: preserve non-numeric keys in merge function of OverflowDict --- src/qs_codec/utils/utils.py | 5 +++++ tests/unit/utils_test.py | 4 ++-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/src/qs_codec/utils/utils.py b/src/qs_codec/utils/utils.py index 9b3caf1..226e8da 100644 --- a/src/qs_codec/utils/utils.py +++ b/src/qs_codec/utils/utils.py @@ -179,6 +179,8 @@ def merge( source_of = t.cast(OverflowDict, source) sorted_pairs = sorted(Utils._numeric_key_pairs(source_of), key=lambda item: item[0]) overflow_values = [source_of[k] for _, k in sorted_pairs] + numeric_keys = {key for _, key in sorted_pairs} + non_numeric_items = [(key, val) for key, val in source_of.items() if key not in numeric_keys] result = OverflowDict() offset = 0 if not isinstance(target, Undefined): @@ -187,6 +189,9 @@ def merge( for (numeric_key, _), val in zip(sorted_pairs, overflow_values): if not isinstance(val, Undefined): result[str(numeric_key + offset)] = val + for key, val in non_numeric_items: + if not isinstance(val, Undefined): + result[key] = val return result _res: t.List[t.Any] = [] diff --git a/tests/unit/utils_test.py b/tests/unit/utils_test.py index fc596be..58865a5 100644 --- a/tests/unit/utils_test.py +++ b/tests/unit/utils_test.py @@ -1003,12 +1003,12 @@ def test_combine_overflow_dict_source_skips_non_numeric_keys(self) -> None: assert result == {"0": "x", "1": "z", "2": "y"} assert "foo" not in result - def test_merge_overflow_dict_source_skips_non_numeric_keys(self) -> None: + def test_merge_overflow_dict_source_preserves_non_numeric_keys(self) -> None: target = "a" source = OverflowDict({"foo": "skip", "1": "b"}) result = Utils.merge(target, source) assert isinstance(result, OverflowDict) - assert result == {"0": "a", "2": "b"} + assert result == {"0": "a", "2": "b", "foo": "skip"} class TestDecodeUtilsHelpers: From ab9adb40d30ea8bb519f606aa92c381f303cdad9 Mon Sep 17 00:00:00 2001 From: Klemen Tusar Date: Sun, 11 Jan 2026 17:46:14 +0000 Subject: [PATCH 18/28] feat: update merge function to return OverflowDict when merging with OverflowDict --- src/qs_codec/utils/utils.py | 4 +++- tests/unit/utils_test.py | 7 +++++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/src/qs_codec/utils/utils.py b/src/qs_codec/utils/utils.py index 226e8da..0bbaacb 100644 --- a/src/qs_codec/utils/utils.py +++ b/src/qs_codec/utils/utils.py @@ -206,16 +206,18 @@ def merge( return _res # Prepare a mutable copy of the target we can merge into. + is_overflow_target = Utils.is_overflow(target) merge_target: t.Dict[str, t.Any] = copy.deepcopy(target if isinstance(target, dict) else dict(target)) # For overlapping keys, merge recursively; otherwise, take the new value. - return { + merged = { **merge_target, **{ str(key): Utils.merge(merge_target[key], value, options) if key in merge_target else value for key, value in source.items() }, } + return OverflowDict(merged) if is_overflow_target else merged @staticmethod def compact(root: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]: diff --git a/tests/unit/utils_test.py b/tests/unit/utils_test.py index 58865a5..d8ed879 100644 --- a/tests/unit/utils_test.py +++ b/tests/unit/utils_test.py @@ -913,6 +913,13 @@ def test_merge_source_is_overflow_dict_into_list(self) -> None: # Source overwrites target at key '0' assert result == {"0": "b"} + def test_merge_overflow_dict_with_mapping_preserves_overflow(self) -> None: + target = OverflowDict({"0": "a"}) + source = {"foo": "bar"} + result = Utils.merge(target, source) + assert isinstance(result, OverflowDict) + assert result == {"0": "a", "foo": "bar"} + def test_combine_sparse_overflow_dict(self) -> None: # Create an OverflowDict with a sparse key a = OverflowDict({"999": "a"}) From ceef2eec9d67959a76e362e8a225ae54530e0616 Mon Sep 17 00:00:00 2001 From: Klemen Tusar Date: Sun, 11 Jan 2026 18:01:00 +0000 Subject: [PATCH 19/28] feat: refactor merge logic in Utils to improve handling of overlapping keys --- src/qs_codec/utils/utils.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/qs_codec/utils/utils.py b/src/qs_codec/utils/utils.py index 0bbaacb..e6db827 100644 --- a/src/qs_codec/utils/utils.py +++ b/src/qs_codec/utils/utils.py @@ -210,12 +210,16 @@ def merge( merge_target: t.Dict[str, t.Any] = copy.deepcopy(target if isinstance(target, dict) else dict(target)) # For overlapping keys, merge recursively; otherwise, take the new value. + merged_updates: t.Dict[str, t.Any] = {} + for key, value in source.items(): + normalized_key = str(key) + if normalized_key in merge_target: + merged_updates[normalized_key] = Utils.merge(merge_target[normalized_key], value, options) + else: + merged_updates[normalized_key] = value merged = { **merge_target, - **{ - str(key): Utils.merge(merge_target[key], value, options) if key in merge_target else value - for key, value in source.items() - }, + **merged_updates, } return OverflowDict(merged) if is_overflow_target else merged From 50c930b49f13e54b95d9a83d405cc03901827c61 Mon Sep 17 00:00:00 2001 From: Klemen Tusar Date: Sun, 11 Jan 2026 18:04:27 +0000 Subject: [PATCH 20/28] feat: enhance combine function to handle list limits and prevent memory exhaustion --- src/qs_codec/utils/utils.py | 17 +++++++++++------ tests/unit/utils_test.py | 18 +++++++++--------- 2 files changed, 20 insertions(+), 15 deletions(-) diff --git a/src/qs_codec/utils/utils.py b/src/qs_codec/utils/utils.py index e6db827..7941ac7 100644 --- a/src/qs_codec/utils/utils.py +++ b/src/qs_codec/utils/utils.py @@ -29,7 +29,7 @@ class OverflowDict(dict): - """A dictionary subclass used to mark objects that have been converted from lists due to the `list_limit` being exceeded.""" + """A mutable marker for list overflows when `list_limit` is exceeded.""" pass @@ -178,18 +178,19 @@ def merge( if Utils.is_overflow(source): source_of = t.cast(OverflowDict, source) sorted_pairs = sorted(Utils._numeric_key_pairs(source_of), key=lambda item: item[0]) - overflow_values = [source_of[k] for _, k in sorted_pairs] numeric_keys = {key for _, key in sorted_pairs} - non_numeric_items = [(key, val) for key, val in source_of.items() if key not in numeric_keys] result = OverflowDict() offset = 0 if not isinstance(target, Undefined): result["0"] = target offset = 1 - for (numeric_key, _), val in zip(sorted_pairs, overflow_values): + for numeric_key, key in sorted_pairs: + val = source_of[key] if not isinstance(val, Undefined): result[str(numeric_key + offset)] = val - for key, val in non_numeric_items: + for key, val in source_of.items(): + if key in numeric_keys: + continue if not isinstance(val, Undefined): result[key] = val return result @@ -365,7 +366,11 @@ def is_overflow(obj: t.Any) -> bool: @staticmethod def _numeric_key_pairs(mapping: t.Mapping[t.Any, t.Any]) -> t.List[t.Tuple[int, t.Any]]: - """Return (numeric_key, original_key) for keys that coerce to int.""" + """Return (numeric_key, original_key) for keys that coerce to int. + + Note: distinct keys like "01" and "1" both coerce to 1; downstream merges + may overwrite earlier values when materializing numeric-keyed dicts. + """ pairs: t.List[t.Tuple[int, t.Any]] = [] for key in mapping.keys(): try: diff --git a/tests/unit/utils_test.py b/tests/unit/utils_test.py index d8ed879..c2f2feb 100644 --- a/tests/unit/utils_test.py +++ b/tests/unit/utils_test.py @@ -629,14 +629,14 @@ def test_combine_neither_is_an_array(self) -> None: assert combined == [1, 2] def test_combine_list_limit_exceeded_creates_overflow_dict(self) -> None: - a = [1] * 10 - b = [2] * 11 - # Total 21 items, default limit is 20 + default_limit = DecodeOptions().list_limit + a = [1] * max(1, default_limit) + b = [2] combined = Utils.combine(a, b) assert isinstance(combined, OverflowDict) - assert len(combined) == 21 + assert len(combined) == len(a) + len(b) assert combined["0"] == 1 - assert combined["20"] == 2 + assert combined[str(len(a) + len(b) - 1)] == 2 def test_combine_list_limit_zero_creates_overflow_dict(self) -> None: options = DecodeOptions(list_limit=0) @@ -655,7 +655,7 @@ def test_combine_with_overflow_dict(self) -> None: b = "y" combined = Utils.combine(a, b) assert isinstance(combined, OverflowDict) - assert combined is not a # Check for immutability (copy) + assert combined is not a # Check for copy-on-write (no mutation) assert combined["0"] == "x" assert combined["1"] == "y" assert len(combined) == 2 @@ -665,12 +665,12 @@ def test_combine_with_overflow_dict(self) -> None: assert "1" not in a def test_combine_options_default(self) -> None: - # Default options should imply list_limit=20 - a = [1] * 20 + default_limit = DecodeOptions().list_limit + a = [1] * max(0, default_limit) b = [2] combined = Utils.combine(a, b, options=None) assert isinstance(combined, OverflowDict) - assert len(combined) == 21 + assert len(combined) == len(a) + len(b) def test_combine_overflow_dict_with_overflow_dict(self) -> None: a = OverflowDict({"0": "x"}) From 9a79f235e883369cebab3bb11a137f0fea5e4591 Mon Sep 17 00:00:00 2001 From: Klemen Tusar Date: Sun, 11 Jan 2026 18:11:14 +0000 Subject: [PATCH 21/28] feat: add offset handling in combine function to ensure correct index placement --- src/qs_codec/utils/utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/qs_codec/utils/utils.py b/src/qs_codec/utils/utils.py index 7941ac7..fc25253 100644 --- a/src/qs_codec/utils/utils.py +++ b/src/qs_codec/utils/utils.py @@ -187,6 +187,7 @@ def merge( for numeric_key, key in sorted_pairs: val = source_of[key] if not isinstance(val, Undefined): + # Offset ensures target occupies index "0"; source indices shift up by 1 result[str(numeric_key + offset)] = val for key, val in source_of.items(): if key in numeric_keys: From 9761e4920fa22ac44d0476bee2ca24a1ea4939f3 Mon Sep 17 00:00:00 2001 From: Klemen Tusar Date: Sun, 11 Jan 2026 18:14:15 +0000 Subject: [PATCH 22/28] feat: remove OverflowDict from public API to streamline package exports --- src/qs_codec/__init__.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/qs_codec/__init__.py b/src/qs_codec/__init__.py index 503c505..738deac 100644 --- a/src/qs_codec/__init__.py +++ b/src/qs_codec/__init__.py @@ -27,7 +27,6 @@ from .models.decode_options import DecodeOptions from .models.encode_options import EncodeOptions from .models.undefined import Undefined -from .utils.utils import OverflowDict # Public API surface re-exported at the package root. @@ -46,5 +45,4 @@ "DecodeOptions", "EncodeOptions", "Undefined", - "OverflowDict", ] From aa8ca11ef034eafb80dabbe18680523735715955 Mon Sep 17 00:00:00 2001 From: Klemen Tusar Date: Sun, 11 Jan 2026 18:15:48 +0000 Subject: [PATCH 23/28] feat: update merge function to prefer exact key matches over string normalization --- src/qs_codec/utils/utils.py | 9 ++++++--- tests/unit/utils_test.py | 12 ++++++++++++ 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/src/qs_codec/utils/utils.py b/src/qs_codec/utils/utils.py index fc25253..781cdd7 100644 --- a/src/qs_codec/utils/utils.py +++ b/src/qs_codec/utils/utils.py @@ -212,13 +212,16 @@ def merge( merge_target: t.Dict[str, t.Any] = copy.deepcopy(target if isinstance(target, dict) else dict(target)) # For overlapping keys, merge recursively; otherwise, take the new value. - merged_updates: t.Dict[str, t.Any] = {} + merged_updates: t.Dict[t.Any, t.Any] = {} + # Prefer exact key matches; fall back to string normalization only when needed. for key, value in source.items(): normalized_key = str(key) - if normalized_key in merge_target: + if key in merge_target: + merged_updates[key] = Utils.merge(merge_target[key], value, options) + elif normalized_key in merge_target: merged_updates[normalized_key] = Utils.merge(merge_target[normalized_key], value, options) else: - merged_updates[normalized_key] = value + merged_updates[key] = value merged = { **merge_target, **merged_updates, diff --git a/tests/unit/utils_test.py b/tests/unit/utils_test.py index c2f2feb..b91097f 100644 --- a/tests/unit/utils_test.py +++ b/tests/unit/utils_test.py @@ -920,6 +920,18 @@ def test_merge_overflow_dict_with_mapping_preserves_overflow(self) -> None: assert isinstance(result, OverflowDict) assert result == {"0": "a", "foo": "bar"} + def test_merge_prefers_exact_key_match_before_string_normalization(self) -> None: + target = {1: {"a": "x"}} + source = {1: {"b": "y"}} + result = Utils.merge(target, source) + assert result == {1: {"a": "x", "b": "y"}} + + target = {"1": {"a": "x"}} + source = {1: {"b": "y"}} + result = Utils.merge(target, source) + assert result == {"1": {"a": "x", "b": "y"}} + assert 1 not in result + def test_combine_sparse_overflow_dict(self) -> None: # Create an OverflowDict with a sparse key a = OverflowDict({"999": "a"}) From efdd86bcf21c1fc3311d334c13e8e5318fa16d2d Mon Sep 17 00:00:00 2001 From: Klemen Tusar Date: Sun, 11 Jan 2026 18:17:11 +0000 Subject: [PATCH 24/28] feat: add tests for list limit handling in combine function to prevent overflow --- tests/unit/utils_test.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/tests/unit/utils_test.py b/tests/unit/utils_test.py index b91097f..6af95df 100644 --- a/tests/unit/utils_test.py +++ b/tests/unit/utils_test.py @@ -650,6 +650,17 @@ def test_combine_negative_list_limit_overflows_non_empty(self) -> None: assert isinstance(combined, OverflowDict) assert combined == {"0": "a"} + def test_combine_at_list_limit_stays_list(self) -> None: + options = DecodeOptions(list_limit=2) + combined = Utils.combine(["a"], ["b"], options) + assert combined == ["a", "b"] + + def test_combine_negative_list_limit_with_empty_result_overflows(self) -> None: + options = DecodeOptions(list_limit=-1) + combined = Utils.combine([], [], options) + assert isinstance(combined, OverflowDict) + assert combined == {} + def test_combine_with_overflow_dict(self) -> None: a = OverflowDict({"0": "x"}) b = "y" From 2984d57f8da19854566f6409a9f6a362b7de31fd Mon Sep 17 00:00:00 2001 From: Klemen Tusar Date: Sun, 11 Jan 2026 18:21:15 +0000 Subject: [PATCH 25/28] feat: implement OverflowDict to handle list limit conversions and preserve overflow markers --- src/qs_codec/decode.py | 3 ++- src/qs_codec/models/overflow_dict.py | 15 +++++++++++++++ src/qs_codec/utils/utils.py | 7 +------ tests/unit/decode_test.py | 2 +- tests/unit/utils_test.py | 14 +++++++++++++- 5 files changed, 32 insertions(+), 9 deletions(-) create mode 100644 src/qs_codec/models/overflow_dict.py diff --git a/src/qs_codec/decode.py b/src/qs_codec/decode.py index d169e11..0d75e47 100644 --- a/src/qs_codec/decode.py +++ b/src/qs_codec/decode.py @@ -25,9 +25,10 @@ from .enums.duplicates import Duplicates from .enums.sentinel import Sentinel from .models.decode_options import DecodeOptions +from .models.overflow_dict import OverflowDict from .models.undefined import UNDEFINED from .utils.decode_utils import DecodeUtils -from .utils.utils import OverflowDict, Utils +from .utils.utils import Utils def decode( diff --git a/src/qs_codec/models/overflow_dict.py b/src/qs_codec/models/overflow_dict.py new file mode 100644 index 0000000..32be191 --- /dev/null +++ b/src/qs_codec/models/overflow_dict.py @@ -0,0 +1,15 @@ +"""Overflow marker for list limit conversions.""" + +from __future__ import annotations + + +class OverflowDict(dict): + """A mutable marker for list overflows when `list_limit` is exceeded.""" + + def copy(self) -> "OverflowDict": + """Return an OverflowDict copy to preserve the overflow marker.""" + return OverflowDict(super().copy()) + + def __copy__(self) -> "OverflowDict": + """Return an OverflowDict copy to preserve the overflow marker.""" + return OverflowDict(super().copy()) diff --git a/src/qs_codec/utils/utils.py b/src/qs_codec/utils/utils.py index 781cdd7..9526e4e 100644 --- a/src/qs_codec/utils/utils.py +++ b/src/qs_codec/utils/utils.py @@ -25,15 +25,10 @@ from enum import Enum from ..models.decode_options import DecodeOptions +from ..models.overflow_dict import OverflowDict from ..models.undefined import Undefined -class OverflowDict(dict): - """A mutable marker for list overflows when `list_limit` is exceeded.""" - - pass - - class Utils: """ Namespace container for stateless utility routines. diff --git a/tests/unit/decode_test.py b/tests/unit/decode_test.py index a5e338e..6fbeba4 100644 --- a/tests/unit/decode_test.py +++ b/tests/unit/decode_test.py @@ -9,8 +9,8 @@ from qs_codec import Charset, DecodeOptions, Duplicates, decode, load, loads from qs_codec.decode import _parse_object from qs_codec.enums.decode_kind import DecodeKind +from qs_codec.models.overflow_dict import OverflowDict from qs_codec.utils.decode_utils import DecodeUtils -from qs_codec.utils.utils import OverflowDict class TestDecode: diff --git a/tests/unit/utils_test.py b/tests/unit/utils_test.py index 6af95df..9d5f5d1 100644 --- a/tests/unit/utils_test.py +++ b/tests/unit/utils_test.py @@ -1,3 +1,4 @@ +import copy import re import typing as t @@ -6,10 +7,11 @@ from qs_codec.enums.charset import Charset from qs_codec.enums.format import Format from qs_codec.models.decode_options import DecodeOptions +from qs_codec.models.overflow_dict import OverflowDict from qs_codec.models.undefined import Undefined from qs_codec.utils.decode_utils import DecodeUtils from qs_codec.utils.encode_utils import EncodeUtils -from qs_codec.utils.utils import OverflowDict, Utils +from qs_codec.utils.utils import Utils class TestUtils: @@ -943,6 +945,16 @@ def test_merge_prefers_exact_key_match_before_string_normalization(self) -> None assert result == {"1": {"a": "x", "b": "y"}} assert 1 not in result + def test_overflow_dict_copy_preserves_type(self) -> None: + target = OverflowDict({"0": "a"}) + result = target.copy() + assert isinstance(result, OverflowDict) + assert result == {"0": "a"} + + shallow = copy.copy(target) + assert isinstance(shallow, OverflowDict) + assert shallow == {"0": "a"} + def test_combine_sparse_overflow_dict(self) -> None: # Create an OverflowDict with a sparse key a = OverflowDict({"999": "a"}) From 74096fb6de4e01644fb677891b4b5768d283d7e1 Mon Sep 17 00:00:00 2001 From: Klemen Tusar Date: Sun, 11 Jan 2026 18:23:52 +0000 Subject: [PATCH 26/28] refactor: move _numeric_key_pairs function to top-level scope for better accessibility --- src/qs_codec/utils/utils.py | 40 ++++++++++++++++++------------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/src/qs_codec/utils/utils.py b/src/qs_codec/utils/utils.py index 9526e4e..88c6379 100644 --- a/src/qs_codec/utils/utils.py +++ b/src/qs_codec/utils/utils.py @@ -29,6 +29,22 @@ from ..models.undefined import Undefined +def _numeric_key_pairs(mapping: t.Mapping[t.Any, t.Any]) -> t.List[t.Tuple[int, t.Any]]: + """Return (numeric_key, original_key) for keys that coerce to int. + + Note: distinct keys like "01" and "1" both coerce to 1; downstream merges + may overwrite earlier values when materializing numeric-keyed dicts. + """ + pairs: t.List[t.Tuple[int, t.Any]] = [] + for key in mapping.keys(): + try: + numeric_key = int(key) + except (TypeError, ValueError): + continue + pairs.append((numeric_key, key)) + return pairs + + class Utils: """ Namespace container for stateless utility routines. @@ -172,7 +188,7 @@ def merge( if Utils.is_overflow(source): source_of = t.cast(OverflowDict, source) - sorted_pairs = sorted(Utils._numeric_key_pairs(source_of), key=lambda item: item[0]) + sorted_pairs = sorted(_numeric_key_pairs(source_of), key=lambda item: item[0]) numeric_keys = {key for _, key in sorted_pairs} result = OverflowDict() offset = 0 @@ -363,22 +379,6 @@ def is_overflow(obj: t.Any) -> bool: """Check if an object is an OverflowDict.""" return isinstance(obj, OverflowDict) - @staticmethod - def _numeric_key_pairs(mapping: t.Mapping[t.Any, t.Any]) -> t.List[t.Tuple[int, t.Any]]: - """Return (numeric_key, original_key) for keys that coerce to int. - - Note: distinct keys like "01" and "1" both coerce to 1; downstream merges - may overwrite earlier values when materializing numeric-keyed dicts. - """ - pairs: t.List[t.Tuple[int, t.Any]] = [] - for key in mapping.keys(): - try: - numeric_key = int(key) - except (TypeError, ValueError): - continue - pairs.append((numeric_key, key)) - return pairs - @staticmethod def combine( a: t.Union[t.List[t.Any], t.Tuple[t.Any], t.Any], @@ -408,7 +408,7 @@ def combine( orig_a = t.cast(OverflowDict, a) a_copy = OverflowDict({k: v for k, v in orig_a.items() if not isinstance(v, Undefined)}) # Use max key + 1 to handle sparse dicts safely, rather than len(a) - key_pairs = Utils._numeric_key_pairs(a_copy) + key_pairs = _numeric_key_pairs(a_copy) idx = (max(key for key, _ in key_pairs) + 1) if key_pairs else 0 if isinstance(b, (list, tuple)): @@ -419,7 +419,7 @@ def combine( elif Utils.is_overflow(b): b = t.cast(OverflowDict, b) # Iterate in numeric key order to preserve list semantics - for _, k in sorted(Utils._numeric_key_pairs(b), key=lambda item: item[0]): + for _, k in sorted(_numeric_key_pairs(b), key=lambda item: item[0]): val = b[k] if not isinstance(val, Undefined): a_copy[str(idx)] = val @@ -443,7 +443,7 @@ def combine( b_of = t.cast(OverflowDict, b) list_b = [ b_of[k] - for _, k in sorted(Utils._numeric_key_pairs(b_of), key=lambda item: item[0]) + for _, k in sorted(_numeric_key_pairs(b_of), key=lambda item: item[0]) if not isinstance(b_of[k], Undefined) ] else: From e7d24611b735675cb7b057ae9b98db4f59ce831f Mon Sep 17 00:00:00 2001 From: Klemen Tusar Date: Sun, 11 Jan 2026 18:26:49 +0000 Subject: [PATCH 27/28] feat: update combine function to handle negative list limits correctly and prevent overflow with empty results --- src/qs_codec/utils/utils.py | 11 ++++++----- tests/unit/utils_test.py | 5 ++--- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/qs_codec/utils/utils.py b/src/qs_codec/utils/utils.py index 88c6379..f4f6f0c 100644 --- a/src/qs_codec/utils/utils.py +++ b/src/qs_codec/utils/utils.py @@ -392,11 +392,10 @@ def combine( (a dict with numeric keys) to prevent memory exhaustion. When `options` is provided, its ``list_limit`` controls when a list is converted into an :class:`OverflowDict` (a dict with numeric keys) to - prevent unbounded growth. If ``options`` is ``None``, a default - ``list_limit`` of ``20`` is used. + prevent unbounded growth. If ``options`` is ``None``, the default + ``list_limit`` from :class:`DecodeOptions` is used. A negative ``list_limit`` is treated as "overflow immediately": any - non-empty combined result will be converted to :class:`OverflowDict` - because ``len(res) > list_limit`` is then always true for ``len(res) >= 0``. + non-empty combined result will be converted to :class:`OverflowDict`. This helper never raises an exception when the limit is exceeded; even if :class:`DecodeOptions` has ``raise_on_limit_exceeded`` set to ``True``, ``combine`` will still handle overflow only by converting the @@ -451,7 +450,9 @@ def combine( res = [*list_a, *list_b] - list_limit = options.list_limit if options else 20 + list_limit = options.list_limit if options else DecodeOptions().list_limit + if list_limit < 0: + return OverflowDict({str(i): x for i, x in enumerate(res)}) if res else res if len(res) > list_limit: # Convert to OverflowDict return OverflowDict({str(i): x for i, x in enumerate(res)}) diff --git a/tests/unit/utils_test.py b/tests/unit/utils_test.py index 9d5f5d1..7e0d9a4 100644 --- a/tests/unit/utils_test.py +++ b/tests/unit/utils_test.py @@ -657,11 +657,10 @@ def test_combine_at_list_limit_stays_list(self) -> None: combined = Utils.combine(["a"], ["b"], options) assert combined == ["a", "b"] - def test_combine_negative_list_limit_with_empty_result_overflows(self) -> None: + def test_combine_negative_list_limit_with_empty_result_stays_list(self) -> None: options = DecodeOptions(list_limit=-1) combined = Utils.combine([], [], options) - assert isinstance(combined, OverflowDict) - assert combined == {} + assert combined == [] def test_combine_with_overflow_dict(self) -> None: a = OverflowDict({"0": "x"}) From 1d6e43c94690c845eb61967d0196c9b409e1b3eb Mon Sep 17 00:00:00 2001 From: Klemen Tusar Date: Sun, 11 Jan 2026 18:36:03 +0000 Subject: [PATCH 28/28] feat: implement deepcopy for OverflowDict to preserve overflow markers --- src/qs_codec/models/overflow_dict.py | 10 ++++++++++ tests/unit/utils_test.py | 4 ++++ 2 files changed, 14 insertions(+) diff --git a/src/qs_codec/models/overflow_dict.py b/src/qs_codec/models/overflow_dict.py index 32be191..96ca37a 100644 --- a/src/qs_codec/models/overflow_dict.py +++ b/src/qs_codec/models/overflow_dict.py @@ -2,6 +2,8 @@ from __future__ import annotations +import copy + class OverflowDict(dict): """A mutable marker for list overflows when `list_limit` is exceeded.""" @@ -13,3 +15,11 @@ def copy(self) -> "OverflowDict": def __copy__(self) -> "OverflowDict": """Return an OverflowDict copy to preserve the overflow marker.""" return OverflowDict(super().copy()) + + def __deepcopy__(self, memo: dict[int, object]) -> "OverflowDict": + """Return an OverflowDict deepcopy to preserve the overflow marker.""" + copied = OverflowDict() + memo[id(self)] = copied + for key, value in self.items(): + copied[copy.deepcopy(key, memo)] = copy.deepcopy(value, memo) + return copied diff --git a/tests/unit/utils_test.py b/tests/unit/utils_test.py index 7e0d9a4..36242ba 100644 --- a/tests/unit/utils_test.py +++ b/tests/unit/utils_test.py @@ -954,6 +954,10 @@ def test_overflow_dict_copy_preserves_type(self) -> None: assert isinstance(shallow, OverflowDict) assert shallow == {"0": "a"} + deep = copy.deepcopy(target) + assert isinstance(deep, OverflowDict) + assert deep == {"0": "a"} + def test_combine_sparse_overflow_dict(self) -> None: # Create an OverflowDict with a sparse key a = OverflowDict({"999": "a"})