Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
59759fe
:bug: implement list limit handling in combine function to prevent Do…
techouse Jan 11, 2026
629fdb8
:arrow_up: bump qs dependency to version 6.14.1
techouse Jan 11, 2026
1461a0d
refactor: improve list handling in combine function to prevent memory…
techouse Jan 11, 2026
bf56e62
feat: add OverflowDict to public API for improved list handling
techouse Jan 11, 2026
b6fb492
test: add unit tests for OverflowDict handling in combine function
techouse Jan 11, 2026
ab775d9
test: add unit tests for OverflowDict handling in merge function
techouse Jan 11, 2026
7176a1c
feat: enhance list limit handling in combine function to prevent memo…
techouse Jan 11, 2026
0147c43
feat: improve list limit handling in combine function to prevent memo…
techouse Jan 11, 2026
17e7d5d
feat: implement list limit handling in combine function to prevent Do…
techouse Jan 11, 2026
5658b19
test: add unit tests for combine function with OverflowDict handling
techouse Jan 11, 2026
260d11d
refactor: remove unused import from utils_test.py
techouse Jan 11, 2026
ea95808
feat: skip Undefined values in combine function for list and Overflow…
techouse Jan 11, 2026
d32a24b
feat: enhance combine function to skip non-numeric keys and handle Un…
techouse Jan 11, 2026
da9341d
test: add tests for list limit handling in combine function to ensure…
techouse Jan 11, 2026
fc20217
feat: implement list limit handling in combine function to prevent Do…
techouse Jan 11, 2026
25c4055
feat: optimize overflow handling in combine function by sorting numer…
techouse Jan 11, 2026
685e927
feat: preserve non-numeric keys in merge function of OverflowDict
techouse Jan 11, 2026
ab9adb4
feat: update merge function to return OverflowDict when merging with …
techouse Jan 11, 2026
ceef2ee
feat: refactor merge logic in Utils to improve handling of overlappin…
techouse Jan 11, 2026
50c930b
feat: enhance combine function to handle list limits and prevent memo…
techouse Jan 11, 2026
9a79f23
feat: add offset handling in combine function to ensure correct index…
techouse Jan 11, 2026
9761e49
feat: remove OverflowDict from public API to streamline package exports
techouse Jan 11, 2026
aa8ca11
feat: update merge function to prefer exact key matches over string n…
techouse Jan 11, 2026
efdd86b
feat: add tests for list limit handling in combine function to preven…
techouse Jan 11, 2026
2984d57
feat: implement OverflowDict to handle list limit conversions and pre…
techouse Jan 11, 2026
74096fb
refactor: move _numeric_key_pairs function to top-level scope for bet…
techouse Jan 11, 2026
e7d2461
feat: update combine function to handle negative list limits correctl…
techouse Jan 11, 2026
1d6e43c
feat: implement deepcopy for OverflowDict to preserve overflow markers
techouse Jan 11, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions src/qs_codec/decode.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from .enums.duplicates import Duplicates
from .enums.sentinel import Sentinel
from .models.decode_options import DecodeOptions
from .models.overflow_dict import OverflowDict
from .models.undefined import UNDEFINED
from .utils.decode_utils import DecodeUtils
from .utils.utils import Utils
Expand Down Expand Up @@ -288,7 +289,7 @@ def _parse_query_string_values(value: str, options: DecodeOptions) -> t.Dict[str

# Combine/overwrite according to the configured duplicates policy.
if existing and options.duplicates == Duplicates.COMBINE:
obj[key] = Utils.combine(obj[key], val)
obj[key] = Utils.combine(obj[key], val, options)
elif not existing or options.duplicates == Duplicates.LAST:
obj[key] = val

Expand Down Expand Up @@ -361,10 +362,14 @@ def _parse_object(
root: str = chain[i]

if root == "[]" and options.parse_lists:
if options.allow_empty_lists and (leaf == "" or (options.strict_null_handling and leaf is None)):
if Utils.is_overflow(leaf):
obj = leaf
elif options.allow_empty_lists and (leaf == "" or (options.strict_null_handling and leaf is None)):
obj = []
else:
obj = list(leaf) if isinstance(leaf, (list, tuple)) else [leaf]
if options.list_limit is not None and len(obj) > options.list_limit:
obj = OverflowDict({str(i): x for i, x in enumerate(obj)})
else:
obj = dict()

Expand All @@ -389,7 +394,10 @@ def _parse_object(
index = None

if not options.parse_lists and decoded_root == "":
obj = {"0": leaf}
if Utils.is_overflow(leaf):
obj = leaf
else:
obj = {"0": leaf}
elif (
index is not None
and index >= 0
Expand Down
25 changes: 25 additions & 0 deletions src/qs_codec/models/overflow_dict.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
"""Overflow marker for list limit conversions."""

from __future__ import annotations

import copy


class OverflowDict(dict):
"""A mutable marker for list overflows when `list_limit` is exceeded."""

def copy(self) -> "OverflowDict":
"""Return an OverflowDict copy to preserve the overflow marker."""
return OverflowDict(super().copy())

def __copy__(self) -> "OverflowDict":
"""Return an OverflowDict copy to preserve the overflow marker."""
return OverflowDict(super().copy())

def __deepcopy__(self, memo: dict[int, object]) -> "OverflowDict":
"""Return an OverflowDict deepcopy to preserve the overflow marker."""
copied = OverflowDict()
memo[id(self)] = copied
for key, value in self.items():
copied[copy.deepcopy(key, memo)] = copy.deepcopy(value, memo)
return copied
145 changes: 134 additions & 11 deletions src/qs_codec/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,26 @@
from enum import Enum

from ..models.decode_options import DecodeOptions
from ..models.overflow_dict import OverflowDict
from ..models.undefined import Undefined


def _numeric_key_pairs(mapping: t.Mapping[t.Any, t.Any]) -> t.List[t.Tuple[int, t.Any]]:
"""Return (numeric_key, original_key) for keys that coerce to int.

Note: distinct keys like "01" and "1" both coerce to 1; downstream merges
may overwrite earlier values when materializing numeric-keyed dicts.
"""
pairs: t.List[t.Tuple[int, t.Any]] = []
for key in mapping.keys():
try:
numeric_key = int(key)
except (TypeError, ValueError):
continue
pairs.append((numeric_key, key))
return pairs


class Utils:
"""
Namespace container for stateless utility routines.
Expand Down Expand Up @@ -143,6 +160,9 @@ def merge(
target = list(target)
target.append(source)
elif isinstance(target, t.Mapping):
if Utils.is_overflow(target):
return Utils.combine(target, source, options)

# Target is a mapping but source is a sequence — coerce indices to string keys.
if isinstance(source, (list, tuple)):
_new = dict(target)
Expand All @@ -166,28 +186,58 @@ def merge(
**source,
}

if Utils.is_overflow(source):
source_of = t.cast(OverflowDict, source)
sorted_pairs = sorted(_numeric_key_pairs(source_of), key=lambda item: item[0])
numeric_keys = {key for _, key in sorted_pairs}
result = OverflowDict()
offset = 0
if not isinstance(target, Undefined):
result["0"] = target
offset = 1
for numeric_key, key in sorted_pairs:
val = source_of[key]
if not isinstance(val, Undefined):
# Offset ensures target occupies index "0"; source indices shift up by 1
result[str(numeric_key + offset)] = val
for key, val in source_of.items():
if key in numeric_keys:
continue
if not isinstance(val, Undefined):
result[key] = val
return result

_res: t.List[t.Any] = []
_iter1 = target if isinstance(target, (list, tuple)) else [target]
for _el in _iter1:
if not isinstance(_el, Undefined):
_res.append(_el)
_iter2 = source if isinstance(source, (list, tuple)) else [source]
_iter2 = [source]
for _el in _iter2:
if not isinstance(_el, Undefined):
_res.append(_el)
return _res

# Prepare a mutable copy of the target we can merge into.
is_overflow_target = Utils.is_overflow(target)
merge_target: t.Dict[str, t.Any] = copy.deepcopy(target if isinstance(target, dict) else dict(target))

# For overlapping keys, merge recursively; otherwise, take the new value.
return {
merged_updates: t.Dict[t.Any, t.Any] = {}
# Prefer exact key matches; fall back to string normalization only when needed.
for key, value in source.items():
normalized_key = str(key)
if key in merge_target:
merged_updates[key] = Utils.merge(merge_target[key], value, options)
elif normalized_key in merge_target:
merged_updates[normalized_key] = Utils.merge(merge_target[normalized_key], value, options)
else:
merged_updates[key] = value
merged = {
**merge_target,
**{
str(key): Utils.merge(merge_target[key], value, options) if key in merge_target else value
for key, value in source.items()
},
**merged_updates,
}
return OverflowDict(merged) if is_overflow_target else merged

@staticmethod
def compact(root: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]:
Expand Down Expand Up @@ -324,17 +374,90 @@ def _dicts_are_equal(
else:
return d1 == d2

@staticmethod
def is_overflow(obj: t.Any) -> bool:
"""Check if an object is an OverflowDict."""
return isinstance(obj, OverflowDict)

@staticmethod
def combine(
a: t.Union[t.List[t.Any], t.Tuple[t.Any], t.Any],
b: t.Union[t.List[t.Any], t.Tuple[t.Any], t.Any],
) -> t.List[t.Any]:
options: t.Optional[DecodeOptions] = None,
) -> t.Union[t.List[t.Any], t.Dict[str, t.Any]]:
"""
Concatenate two values, treating non‑sequences as singletons.

Returns a new `list`; tuples are expanded but not preserved as tuples.
Concatenate two values, treating non-sequences as singletons.

If `list_limit` is exceeded, converts the list to an `OverflowDict`
(a dict with numeric keys) to prevent memory exhaustion.
When `options` is provided, its ``list_limit`` controls when a list is
converted into an :class:`OverflowDict` (a dict with numeric keys) to
prevent unbounded growth. If ``options`` is ``None``, the default
``list_limit`` from :class:`DecodeOptions` is used.
A negative ``list_limit`` is treated as "overflow immediately": any
non-empty combined result will be converted to :class:`OverflowDict`.
This helper never raises an exception when the limit is exceeded; even
if :class:`DecodeOptions` has ``raise_on_limit_exceeded`` set to
``True``, ``combine`` will still handle overflow only by converting the
list to :class:`OverflowDict`.
"""
return [*(a if isinstance(a, (list, tuple)) else [a]), *(b if isinstance(b, (list, tuple)) else [b])]
if Utils.is_overflow(a):
# a is already an OverflowDict. Append b to a *copy* at the next numeric index.
# We assume sequential keys; len(a_copy) gives the next index.
orig_a = t.cast(OverflowDict, a)
a_copy = OverflowDict({k: v for k, v in orig_a.items() if not isinstance(v, Undefined)})
# Use max key + 1 to handle sparse dicts safely, rather than len(a)
key_pairs = _numeric_key_pairs(a_copy)
idx = (max(key for key, _ in key_pairs) + 1) if key_pairs else 0

if isinstance(b, (list, tuple)):
for item in b:
if not isinstance(item, Undefined):
a_copy[str(idx)] = item
idx += 1
elif Utils.is_overflow(b):
b = t.cast(OverflowDict, b)
# Iterate in numeric key order to preserve list semantics
for _, k in sorted(_numeric_key_pairs(b), key=lambda item: item[0]):
val = b[k]
if not isinstance(val, Undefined):
a_copy[str(idx)] = val
idx += 1
else:
if not isinstance(b, Undefined):
a_copy[str(idx)] = b
return a_copy

# Normal combination: flatten lists/tuples
# Flatten a
if isinstance(a, (list, tuple)):
list_a = [x for x in a if not isinstance(x, Undefined)]
else:
list_a = [a] if not isinstance(a, Undefined) else []

# Flatten b, handling OverflowDict as a list source
if isinstance(b, (list, tuple)):
list_b = [x for x in b if not isinstance(x, Undefined)]
elif Utils.is_overflow(b):
b_of = t.cast(OverflowDict, b)
list_b = [
b_of[k]
for _, k in sorted(_numeric_key_pairs(b_of), key=lambda item: item[0])
if not isinstance(b_of[k], Undefined)
]
else:
list_b = [b] if not isinstance(b, Undefined) else []

res = [*list_a, *list_b]

list_limit = options.list_limit if options else DecodeOptions().list_limit
if list_limit < 0:
return OverflowDict({str(i): x for i, x in enumerate(res)}) if res else res
if len(res) > list_limit:
# Convert to OverflowDict
return OverflowDict({str(i): x for i, x in enumerate(res)})

return res

@staticmethod
def apply(
Expand Down
2 changes: 1 addition & 1 deletion tests/comparison/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@
"author": "Klemen Tusar",
"license": "BSD-3-Clause",
"dependencies": {
"qs": "^6.14.0"
"qs": "^6.14.1"
}
}
63 changes: 58 additions & 5 deletions tests/unit/decode_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from qs_codec import Charset, DecodeOptions, Duplicates, decode, load, loads
from qs_codec.decode import _parse_object
from qs_codec.enums.decode_kind import DecodeKind
from qs_codec.models.overflow_dict import OverflowDict
from qs_codec.utils.decode_utils import DecodeUtils


Expand Down Expand Up @@ -318,14 +319,20 @@ def test_parses_an_explicit_list(self, query: str, expected: t.Dict) -> None:
pytest.param("a=b&a[0]=c", None, {"a": ["b", "c"]}, id="simple-first-indexed-list-second"),
pytest.param("a[1]=b&a=c", DecodeOptions(list_limit=20), {"a": ["b", "c"]}, id="indexed-list-with-limit"),
pytest.param(
"a[]=b&a=c", DecodeOptions(list_limit=0), {"a": ["b", "c"]}, id="explicit-list-with-zero-limit"
"a[]=b&a=c",
DecodeOptions(list_limit=0),
{"a": {"0": "b", "1": "c"}},
id="explicit-list-with-zero-limit",
),
pytest.param("a[]=b&a=c", None, {"a": ["b", "c"]}, id="explicit-list-default"),
pytest.param(
"a=b&a[1]=c", DecodeOptions(list_limit=20), {"a": ["b", "c"]}, id="simple-and-indexed-with-limit"
),
pytest.param(
"a=b&a[]=c", DecodeOptions(list_limit=0), {"a": ["b", "c"]}, id="simple-and-explicit-zero-limit"
"a=b&a[]=c",
DecodeOptions(list_limit=0),
{"a": {"0": "b", "1": "c"}},
id="simple-and-explicit-zero-limit",
),
pytest.param("a=b&a[]=c", None, {"a": ["b", "c"]}, id="simple-and-explicit-default"),
],
Expand Down Expand Up @@ -574,7 +581,7 @@ def test_parses_lists_of_dicts(self, query: str, expected: t.Mapping[str, t.Any]
pytest.param(
"a[]=b&a[]&a[]=c&a[]=",
DecodeOptions(strict_null_handling=True, list_limit=0),
{"a": ["b", None, "c", ""]},
{"a": {"0": "b", "1": None, "2": "c", "3": ""}},
id="strict-null-and-empty-zero-limit",
),
pytest.param(
Expand All @@ -586,7 +593,7 @@ def test_parses_lists_of_dicts(self, query: str, expected: t.Mapping[str, t.Any]
pytest.param(
"a[]=b&a[]=&a[]=c&a[]",
DecodeOptions(strict_null_handling=True, list_limit=0),
{"a": ["b", "", "c", None]},
{"a": {"0": "b", "1": "", "2": "c", "3": None}},
id="empty-and-strict-null-zero-limit",
),
pytest.param("a[]=&a[]=b&a[]=c", None, {"a": ["", "b", "c"]}, id="explicit-empty-first"),
Expand Down Expand Up @@ -1328,7 +1335,9 @@ def test_current_list_length_calculation(self) -> None:
False,
id="convert-to-map",
),
pytest.param("a[]=1&a[]=2", DecodeOptions(list_limit=0), {"a": ["1", "2"]}, False, id="zero-list-limit"),
pytest.param(
"a[]=1&a[]=2", DecodeOptions(list_limit=0), {"a": {"0": "1", "1": "2"}}, False, id="zero-list-limit"
),
pytest.param(
"a[]=1&a[]=2",
DecodeOptions(list_limit=-1, raise_on_limit_exceeded=True),
Expand Down Expand Up @@ -1680,3 +1689,47 @@ def test_strict_depth_overflow_raises_for_well_formed(self) -> None:
def test_unterminated_group_does_not_raise_under_strict_depth(self) -> None:
segs = DecodeUtils.split_key_into_segments("a[b[c", allow_dots=False, max_depth=5, strict_depth=True)
assert segs == ["a", "[[b[c]"]


class TestCVE2024:
def test_dos_attack(self) -> None:
# JS test:
# var arr = [];
# for (var i = 0; i < 105; i++) {
# arr[arr.length] = 'x';
# }
# var attack = 'a[]=' + arr.join('&a[]=');
# var result = qs.parse(attack, { arrayLimit: 100 });
# t.notOk(Array.isArray(result.a))

arr = ["x"] * 105
# Construct query: a[]=x&a[]=x...
attack = "a[]=" + "&a[]=".join(arr)

# list_limit is the python equivalent of arrayLimit
options = DecodeOptions(list_limit=100)
result = decode(attack, options=options)

assert isinstance(result["a"], dict), "Should be a dict when limit exceeded"
assert isinstance(result["a"], OverflowDict)
assert len(result["a"]) == 105
assert result["a"]["0"] == "x"
assert result["a"]["104"] == "x"

def test_array_limit_checks(self) -> None:
# JS patch tests
# st.deepEqual(qs.parse('a[]=b', { arrayLimit: 0 }), { a: { 0: 'b' } });
assert decode("a[]=b", DecodeOptions(list_limit=0)) == {"a": {"0": "b"}}

# st.deepEqual(qs.parse('a[]=b&a[]=c', { arrayLimit: 0 }), { a: { 0: 'b', 1: 'c' } });
assert decode("a[]=b&a[]=c", DecodeOptions(list_limit=0)) == {"a": {"0": "b", "1": "c"}}

# st.deepEqual(qs.parse('a[]=b&a[]=c', { arrayLimit: 1 }), { a: { 0: 'b', 1: 'c' } });
assert decode("a[]=b&a[]=c", DecodeOptions(list_limit=1)) == {"a": {"0": "b", "1": "c"}}

# st.deepEqual(qs.parse('a[]=b&a[]=c&a[]=d', { arrayLimit: 2 }), { a: { 0: 'b', 1: 'c', 2: 'd' } });
assert decode("a[]=b&a[]=c&a[]=d", DecodeOptions(list_limit=2)) == {"a": {"0": "b", "1": "c", "2": "d"}}

def test_no_limit_does_not_overflow(self) -> None:
# Verify that within limit it stays a list
assert decode("a[]=b&a[]=c", DecodeOptions(list_limit=2)) == {"a": ["b", "c"]}
Loading