Skip to content

Commit fe2b9e3

Browse files
committed
v0.5.8 More control over header deduplication
1 parent 01e730d commit fe2b9e3

File tree

3 files changed: +100 additions, −10 deletions

dataflows/VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
0.5.7
1+
0.5.8

dataflows/processors/load.py

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ def __init__(self, load_source, name=None, resources=None, strip=True, limit_row
5757
override_schema=None, override_fields=None,
5858
extract_missing_values=None,
5959
deduplicate_headers=False,
60+
deduplicate_headers_case_sensitive=True,
61+
deduplicate_headers_format=' (%s)',
6062
on_error=raise_exception,
6163
**options):
6264
super(load, self).__init__()
@@ -70,6 +72,8 @@ def __init__(self, load_source, name=None, resources=None, strip=True, limit_row
7072
self.override_schema = override_schema
7173
self.override_fields = override_fields
7274
self.deduplicate_headers = deduplicate_headers
75+
self.deduplicate_headers_case_sensitive = deduplicate_headers_case_sensitive
76+
self.deduplicate_headers_format = deduplicate_headers_format
7377

7478
# Extract missing values
7579
self.extract_missing_values = None
@@ -180,12 +184,21 @@ def safe_process_datapackage(self, dp: Package):
180184
self.options.setdefault('headers', 1)
181185
self.options.setdefault('sample_size', 1000)
182186
stream: Stream = Stream(self.load_source, **self.options).open()
183-
if len(stream.headers) != len(set(stream.headers)):
187+
if self.deduplicate_headers_case_sensitive:
188+
duplication_test = len(stream.headers) != len(set(stream.headers))
189+
else:
190+
lower_headers = [header.lower() for header in stream.headers]
191+
duplication_test = len(lower_headers) != len(set(lower_headers))
192+
# duplication_test = len(stream.headers) != len(set(stream.headers))
193+
if duplication_test:
184194
if not self.deduplicate_headers:
185195
raise ValueError(
186196
'Found duplicate headers.' +
187197
'Use the `deduplicate_headers` flag (found headers=%r)' % stream.headers)
188-
stream.headers = self.rename_duplicate_headers(stream.headers)
198+
stream.headers = self.rename_duplicate_headers(
199+
stream.headers, case_sensitive=self.deduplicate_headers_case_sensitive,
200+
deduplicate_format=self.deduplicate_headers_format
201+
)
189202
schema = Schema(self.override_schema or {}).infer(
190203
stream.sample, headers=stream.headers,
191204
confidence=1, guesser_cls=self.guesser)
@@ -269,15 +282,21 @@ def process_resources(self, resources):
269282
yield it
270283

271284
@staticmethod
272-
def rename_duplicate_headers(duplicate_headers):
285+
def rename_duplicate_headers(duplicate_headers, case_sensitive=True, deduplicate_format=' (%s)'):
273286
counter = {}
274287
headers = []
288+
header_keys = []
275289
for header in duplicate_headers:
276-
counter.setdefault(header, 0)
277-
counter[header] += 1
278-
if counter[header] > 1:
279-
if counter[header] == 2:
280-
headers[headers.index(header)] = '%s (%s)' % (header, 1)
281-
header = '%s (%s)' % (header, counter[header])
290+
header_key = header
291+
header_keys.append(header_key)
292+
if not case_sensitive:
293+
header_key = header_key.lower()
294+
counter.setdefault(header_key, 0)
295+
counter[header_key] += 1
296+
if counter[header_key] > 1:
297+
if counter[header_key] == 2:
298+
prev_index = header_keys.index(header_key)
299+
headers[prev_index] = ('%s' + deduplicate_format) % (headers[prev_index], 1)
300+
header = ('%s' + deduplicate_format) % (header, counter[header_key])
282301
headers.append(header)
283302
return headers

tests/test_lib.py

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1955,6 +1955,21 @@ def test_load_duplicate_headers():
19551955
assert 'duplicate headers' in str(cause)
19561956

19571957

1958+
def test_load_duplicate_headers_with_deduplicate_headers_flag_and_format():
1959+
from dataflows import load
1960+
flow = Flow(
1961+
load('data/duplicate_headers.csv', deduplicate_headers=True, deduplicate_headers_format='__%s'),
1962+
)
1963+
data, package, stats = flow.results()
1964+
assert package.descriptor['resources'][0]['schema']['fields'] == [
1965+
{'name': 'header1', 'type': 'string', 'format': 'default'},
1966+
{'name': 'header2__1', 'type': 'string', 'format': 'default'},
1967+
{'name': 'header2__2', 'type': 'string', 'format': 'default'},
1968+
]
1969+
assert data == [[
1970+
{'header1': 'value1', 'header2__1': 'value2', 'header2__2': 'value3'},
1971+
]]
1972+
19581973
def test_load_duplicate_headers_with_deduplicate_headers_flag():
19591974
from dataflows import load
19601975
flow = Flow(
@@ -1970,6 +1985,40 @@ def test_load_duplicate_headers_with_deduplicate_headers_flag():
19701985
{'header1': 'value1', 'header2 (1)': 'value2', 'header2 (2)': 'value3'},
19711986
]]
19721987

1988+
def test_load_duplicate_headers_case():
1989+
from dataflows import load, exceptions
1990+
flow = Flow(
1991+
load('data/duplicate_headers_case.csv'),
1992+
)
1993+
data, package, stats = flow.results()
1994+
assert data == [[
1995+
{'header1': 'value1', 'header2': 'value2', 'HEADER2': 'value3'},
1996+
]]
1997+
1998+
flow = Flow(
1999+
load('data/duplicate_headers_case.csv', deduplicate_headers_case_sensitive=False),
2000+
)
2001+
with pytest.raises(exceptions.ProcessorError) as excinfo:
2002+
flow.results()
2003+
cause = excinfo.value.cause
2004+
assert 'duplicate headers' in str(cause)
2005+
2006+
2007+
def test_load_duplicate_headers_case_with_deduplicate_headers_flag():
2008+
from dataflows import load
2009+
flow = Flow(
2010+
load('data/duplicate_headers_case.csv', deduplicate_headers=True, deduplicate_headers_case_sensitive=False),
2011+
)
2012+
data, package, stats = flow.results()
2013+
assert package.descriptor['resources'][0]['schema']['fields'] == [
2014+
{'name': 'header1', 'type': 'string', 'format': 'default'},
2015+
{'name': 'header2 (1)', 'type': 'string', 'format': 'default'},
2016+
{'name': 'HEADER2 (2)', 'type': 'string', 'format': 'default'},
2017+
]
2018+
assert data == [[
2019+
{'header1': 'value1', 'header2 (1)': 'value2', 'HEADER2 (2)': 'value3'},
2020+
]]
2021+
19732022

19742023
# Temporal format
19752024

@@ -2389,6 +2438,28 @@ def test_rename_fields_simple():
23892438

23902439
assert res == [dict(A=i, B=i, c=i) for i in range(5)]
23912440

2441+
2442+
def test_rename_fields_disable_regex():
2443+
from dataflows import Flow, rename_fields
2444+
2445+
data = []
2446+
for i in range(5):
2447+
x = dict()
2448+
x['a (1)'] = i
2449+
x['b (2)'] = i
2450+
x['c (3)'] = i
2451+
data.append(x)
2452+
rename = dict()
2453+
rename['a (1)'] = 'A'
2454+
rename['b (2)'] = 'B'
2455+
rename['c (3)'] = 'C'
2456+
res = Flow(
2457+
data,
2458+
rename_fields(rename, regex=False),
2459+
).results()[0][0]
2460+
2461+
assert res == [dict(A=i, B=i, C=i) for i in range(5)]
2462+
23922463
def test_rename_fields_regex():
23932464
from dataflows import Flow, rename_fields
23942465

0 commit comments

Comments
 (0)