Skip to content

Commit c4c1fe3

Browse files
authored
Fix S3Path copy to Local: name collision of file / dir (#533)
* tests: add copy test for fs with key/prefix collision * upath.core: implement support for on_name_collision in UPath._copy_from * upath.implementations.local: fix PosixUPath/WindowsUPath _copy_from * upath.implementations.cloud: fix _copy_from for S3Path * tests: fix s3 to local name collision test * upath.implementations.cloud: fix S3Path.copy * upath.implementations.cloud: S3Path._copy_from doesn't need the fix * upath.implementations.cloud: S3Path.copy ensure type in super * upath.types: fix type error on py3.9 * tests: fix windows test path comparison * upath.types: correct __all__ * tests: skip azurite version check for now * tests: switch gcs fixtures to memory backend to support file/dir collisions * tests: add GCSPath test for copy_into (xfail) * tests: add AzurePath test for copy_into (xfail)
1 parent 9ceaf2f commit c4c1fe3

File tree

8 files changed

+260
-17
lines changed

8 files changed

+260
-17
lines changed

upath/core.py

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
from upath.registry import get_upath_class
4242
from upath.types import UNSET_DEFAULT
4343
from upath.types import JoinablePathLike
44+
from upath.types import OnNameCollisionFunc
4445
from upath.types import PathInfo
4546
from upath.types import ReadablePath
4647
from upath.types import ReadablePathLike
@@ -1305,9 +1306,48 @@ def _copy_from(
13051306
self,
13061307
source: ReadablePath,
13071308
follow_symlinks: bool = True,
1309+
on_name_collision: OnNameCollisionFunc | None = None,
13081310
**kwargs: Any,
13091311
) -> None:
1310-
return super()._copy_from(source, follow_symlinks)
1312+
"""
1313+
UPath custom:: Recursively copy the given path to this path.
1314+
"""
1315+
# FIXME: ideally this copy loop (incl. on_name_collision) would be upstreamed to pathlib_abc
1316+
from pathlib_abc import vfsopen
1317+
from pathlib_abc import vfspath
1318+
from pathlib_abc._os import copyfileobj
1319+
from pathlib_abc._os import ensure_different_files
1320+
1321+
stack: list[tuple[ReadablePath, WritablePath]] = [(source, self)]
1322+
while stack:
1323+
src, dst = stack.pop()
1324+
info = src.info
1325+
if not follow_symlinks and info.is_symlink():
1326+
dst.symlink_to(vfspath(src.readlink()), src.info.is_dir())
1327+
elif on_name_collision and info.is_file() and info.is_dir():
1328+
dst_file, dst_dir = on_name_collision(src, dst)
1329+
if dst_file is not None:
1330+
ensure_different_files(src, dst_file)
1331+
with vfsopen(src, "rb") as source_f:
1332+
with vfsopen(dst_file, "wb") as target_f:
1333+
copyfileobj(source_f, target_f)
1334+
if dst_dir is not None:
1335+
children = src.iterdir()
1336+
dst_dir.mkdir()
1337+
# feed through dict.fromkeys to remove duplicates
1338+
for child in dict.fromkeys(children):
1339+
stack.append((child, dst_dir.joinpath(child.name)))
1340+
elif info.is_dir():
1341+
children = src.iterdir()
1342+
dst.mkdir()
1343+
# feed through dict.fromkeys to remove duplicates
1344+
for child in dict.fromkeys(children):
1345+
stack.append((child, dst.joinpath(child.name)))
1346+
else:
1347+
ensure_different_files(src, dst)
1348+
with vfsopen(src, "rb") as source_f:
1349+
with vfsopen(dst, "wb") as target_f:
1350+
copyfileobj(source_f, target_f)
13111351

13121352
# --- WritablePath attributes -------------------------------------
13131353

upath/implementations/cloud.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,19 @@
55
from collections.abc import Sequence
66
from typing import TYPE_CHECKING
77
from typing import Any
8+
from typing import overload
89

910
from upath import UnsupportedOperation
1011
from upath._chain import DEFAULT_CHAIN_PARSER
1112
from upath._flavour import upath_strip_protocol
1213
from upath.core import UPath
1314
from upath.types import JoinablePathLike
15+
from upath.types import SupportsPathLike
16+
from upath.types import WritablePath
1417

1518
if TYPE_CHECKING:
1619
from typing import Literal
20+
from typing import TypeVar
1721

1822
if sys.version_info >= (3, 11):
1923
from typing import Self
@@ -28,6 +32,8 @@
2832
from upath.types.storage_options import HfStorageOptions
2933
from upath.types.storage_options import S3StorageOptions
3034

35+
_WT = TypeVar("_WT", bound="WritablePath")
36+
3137
__all__ = [
3238
"CloudPath",
3339
"GCSPath",
@@ -147,6 +153,30 @@ def __init__(
147153
if not self.drive and len(self.parts) > 1:
148154
raise ValueError("non key-like path provided (bucket/container missing)")
149155

156+
@overload
def copy(self, target: _WT, **kwargs: Any) -> _WT: ...

@overload
def copy(self, target: SupportsPathLike | str, **kwargs: Any) -> Self: ...

def copy(self, target: _WT | SupportsPathLike | str, **kwargs: Any) -> _WT | UPath:
    """
    Recursively copy this file or directory tree to the given destination.

    The copy is delegated to the parent class implementation, but it is
    run from a fresh instance of this path that has the filesystem
    listings cache disabled (``use_listings_cache=False``).

    :param target: destination of the copy.
    :param kwargs: forwarded unchanged to the parent ``copy``.
    :returns: whatever the parent ``copy`` returns for ``target``.
    """
    # To allow _copy_from to check whether a path is a file AND a dir,
    # we need to disable s3fs's dircache mechanism, because it currently
    # implements an XOR relation between the two for objects.
    # ref: fsspec/s3fs#999
    sopts = dict(self.storage_options)  # copy: never mutate our own options
    sopts["use_listings_cache"] = False
    new_self = type(self)(
        self.path,
        protocol=self.protocol,  # type: ignore
        **sopts,
    )
    # Constructing via type(self) is expected to yield the same class.
    assert type(self) is type(new_self)
    # Name the defining class explicitly. ``super(type(new_self), new_self)``
    # would, for any subclass, resolve straight back to this method and
    # recurse without end.
    return super(S3Path, new_self).copy(target, **kwargs)
179+
150180

151181
class AzurePath(CloudPath):
152182
__slots__ = ()

upath/implementations/local.py

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -364,6 +364,11 @@ def rmdir(self, recursive: bool = UNSET_DEFAULT) -> None:
364364
else:
365365
shutil.rmtree(self)
366366

367+
# we need to override pathlib.Path._copy_from to support it as a
368+
# WritablePath._copy_from target with support for on_name_collision
369+
# Issue: https://github.com/barneygale/pathlib-abc/issues/48
370+
_copy_from = UPath._copy_from
371+
367372
if sys.version_info < (3, 14): # noqa: C901
368373

369374
@overload
@@ -720,17 +725,6 @@ def chmod(
720725
)
721726
return super().chmod(mode)
722727

723-
if not hasattr(pathlib.Path, "_copy_from"):
724-
725-
def _copy_from(
726-
self,
727-
source: ReadablePath | LocalPath,
728-
follow_symlinks: bool = True,
729-
preserve_metadata: bool = False,
730-
) -> None:
731-
_copy_from: Any = WritablePath._copy_from.__get__(self)
732-
_copy_from(source, follow_symlinks=follow_symlinks)
733-
734728

735729
UPath.register(LocalPath)
736730

upath/tests/conftest.py

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -238,9 +238,19 @@ def docker_gcs():
238238
pytest.skip("docker not installed")
239239

240240
container = "gcsfs_test"
241-
cmd = (
242-
"docker run -d -p 4443:4443 --name gcsfs_test fsouza/fake-gcs-server:latest -scheme " # noqa: E501
243-
"http -public-host http://localhost:4443 -external-url http://localhost:4443" # noqa: E501
241+
cmd = " ".join(
242+
[
243+
"docker",
244+
"run",
245+
"-d",
246+
"-p 4443:4443",
247+
"--name gcsfs_test",
248+
"fsouza/fake-gcs-server:latest",
249+
"-scheme http",
250+
"-public-host http://localhost:4443",
251+
"-external-url http://localhost:4443",
252+
"-backend memory",
253+
]
244254
)
245255
stop_docker(container)
246256
subprocess.check_output(shlex.split(cmd))
@@ -385,8 +395,8 @@ def docker_azurite(azurite_credentials):
385395
image = "mcr.microsoft.com/azure-storage/azurite"
386396
container_name = "azure_test"
387397
cmd = (
388-
f"docker run --rm -d -p {AZURITE_PORT}:10000 --name {container_name} {image}" # noqa: E501
389-
" azurite-blob --loose --blobHost 0.0.0.0" # noqa: E501
398+
f"docker run --rm -d -p {AZURITE_PORT}:10000 --name {container_name} {image}:latest" # noqa: E501
399+
" azurite-blob --loose --blobHost 0.0.0.0 --skipApiVersionCheck" # noqa: E501
390400
)
391401
url = f"http://localhost:{AZURITE_PORT}"
392402

upath/tests/implementations/test_azure.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
import warnings
2+
3+
import fsspec
14
import pytest
25

36
from upath import UPath
@@ -7,6 +10,7 @@
710
from ..utils import OverrideMeta
811
from ..utils import extends_base
912
from ..utils import overrides_base
13+
from ..utils import posixify
1014
from ..utils import skip_on_windows
1115

1216

@@ -61,3 +65,56 @@ def test_broken_mkdir(self):
6165

6266
(path / "file").write_text("foo")
6367
assert path.exists()
68+
69+
70+
@skip_on_windows
71+
@pytest.mark.xfail(reason="adlfs returns isdir false")
72+
def test_copy__object_key_collides_with_dir_prefix(azurite_credentials, tmp_path):
73+
account_name, connection_string = azurite_credentials
74+
storage_options = {
75+
"account_name": account_name,
76+
"connection_string": connection_string,
77+
}
78+
79+
az = fsspec.filesystem("az", **storage_options, use_listings_cache=False)
80+
container = "copy-into-collision-container"
81+
az.mkdir(container)
82+
# store objects that share a common prefix
83+
az.pipe_file(f"{container}/src/common_prefix/file1.txt", b"1")
84+
az.pipe_file(f"{container}/src/common_prefix/file2.txt", b"2")
85+
# object under common prefix as key
86+
az.pipe_file(f"{container}/src/common_prefix", b"hello world")
87+
az.invalidate_cache()
88+
89+
# make sure the sources have a collision
90+
assert az.isfile(f"{container}/src/common_prefix")
91+
assert az.isdir(f"{container}/src/common_prefix")
92+
assert az.isfile(f"{container}/src/common_prefix/file1.txt")
93+
assert az.isfile(f"{container}/src/common_prefix/file2.txt")
94+
# prepare source and destination
95+
src = UPath(f"az://{container}/src", **storage_options)
96+
dst = UPath(tmp_path)
97+
98+
def on_collision_rename_file(src, dst):
99+
warnings.warn(
100+
f"{src!s} collides with prefix. Renaming target file object to {dst!s}",
101+
UserWarning,
102+
stacklevel=3,
103+
)
104+
return (
105+
dst.with_suffix(dst.suffix + ".COLLISION"),
106+
dst,
107+
)
108+
109+
# perform copy
110+
src.copy_into(dst, on_name_collision=on_collision_rename_file)
111+
112+
# check results
113+
dst_files = sorted(posixify(x.relative_to(tmp_path)) for x in dst.glob("**/*"))
114+
assert dst_files == [
115+
"src",
116+
"src/common_prefix",
117+
"src/common_prefix.COLLISION",
118+
"src/common_prefix/file1.txt",
119+
"src/common_prefix/file2.txt",
120+
]

upath/tests/implementations/test_gcs.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import warnings
2+
13
import fsspec
24
import pytest
35

@@ -8,6 +10,7 @@
810
from ..utils import OverrideMeta
911
from ..utils import extends_base
1012
from ..utils import overrides_base
13+
from ..utils import posixify
1114
from ..utils import skip_on_windows
1215

1316

@@ -49,3 +52,56 @@ def test_mkdir_in_empty_bucket(docker_gcs):
4952
endpoint_url=docker_gcs,
5053
token="anon",
5154
).parent.mkdir(parents=True, exist_ok=True)
55+
56+
57+
@skip_on_windows
58+
@pytest.mark.xfail(reason="gcsfs returns isdir false")
59+
def test_copy__object_key_collides_with_dir_prefix(docker_gcs, tmp_path):
60+
gcs = fsspec.filesystem(
61+
"gcs",
62+
endpoint_url=docker_gcs,
63+
token="anon",
64+
use_listings_cache=False,
65+
)
66+
bucket = "copy_into_collision_bucket"
67+
gcs.mkdir(bucket)
68+
# gcs.mkdir(bucket + "/src" + "/common_prefix/")
69+
# object under common prefix as key
70+
gcs.pipe_file(f"{bucket}/src/common_prefix", b"hello world")
71+
# store more objects with same prefix
72+
gcs.pipe_file(f"{bucket}/src/common_prefix/file1.txt", b"1")
73+
gcs.pipe_file(f"{bucket}/src/common_prefix/file2.txt", b"2")
74+
gcs.invalidate_cache()
75+
76+
# make sure the sources have a collision
77+
assert gcs.isfile(f"{bucket}/src/common_prefix")
78+
assert gcs.isdir(f"{bucket}/src/common_prefix") # BROKEN in gcsfs
79+
assert gcs.isfile(f"{bucket}/src/common_prefix/file1.txt")
80+
assert gcs.isfile(f"{bucket}/src/common_prefix/file2.txt")
81+
# prepare source and destination
82+
src = UPath(f"gs://{bucket}/src", endpoint_url=docker_gcs, token="anon")
83+
dst = UPath(tmp_path)
84+
85+
def on_collision_rename_file(src, dst):
86+
warnings.warn(
87+
f"{src!s} collides with prefix. Renaming target file object to {dst!s}",
88+
UserWarning,
89+
stacklevel=3,
90+
)
91+
return (
92+
dst.with_suffix(dst.suffix + ".COLLISION"),
93+
dst,
94+
)
95+
96+
# perform copy
97+
src.copy_into(dst, on_name_collision=on_collision_rename_file)
98+
99+
# check results
100+
dst_files = sorted(posixify(x.relative_to(tmp_path)) for x in dst.glob("**/*"))
101+
assert dst_files == [
102+
"src",
103+
"src/common_prefix",
104+
"src/common_prefix.COLLISION",
105+
"src/common_prefix/file1.txt",
106+
"src/common_prefix/file2.txt",
107+
]

upath/tests/implementations/test_s3.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
"""see upath/tests/conftest.py for fixtures"""
22

33
import sys
4+
import warnings
45

56
import fsspec
67
import pytest
@@ -12,6 +13,7 @@
1213
from ..utils import OverrideMeta
1314
from ..utils import extends_base
1415
from ..utils import overrides_base
16+
from ..utils import posixify
1517

1618

1719
def silence_botocore_datetime_deprecation(cls):
@@ -165,3 +167,49 @@ def test_pathlib_consistent_join():
165167
b1 = UPath("s3://mybucket/withkey").joinpath("subfolder/myfile.txt")
166168
assert b0 == b1
167169
assert "s3://mybucket/withkey/subfolder/myfile.txt" == str(b0) == str(b1)
170+
171+
172+
def test_copy__object_key_collides_with_dir_prefix(s3_server, tmp_path):
173+
anon, s3so = s3_server
174+
175+
s3 = fsspec.filesystem("s3", anon=anon, **{**s3so, "use_listings_cache": False})
176+
bucket = "copy_into_collision_bucket"
177+
s3.mkdir(bucket + "/src" + "/common_prefix/")
178+
# object under common prefix as key
179+
s3.pipe_file(f"{bucket}/src/common_prefix", b"hello world")
180+
# store more objects with same prefix
181+
s3.pipe_file(f"{bucket}/src/common_prefix/file1.txt", b"1")
182+
s3.pipe_file(f"{bucket}/src/common_prefix/file2.txt", b"2")
183+
184+
# make sure the sources have a collision
185+
assert s3.isdir(f"{bucket}/src/common_prefix")
186+
assert s3.isfile(f"{bucket}/src/common_prefix")
187+
assert s3.isfile(f"{bucket}/src/common_prefix/file1.txt")
188+
assert s3.isfile(f"{bucket}/src/common_prefix/file2.txt")
189+
# prepare source and destination
190+
src = UPath(f"s3://{bucket}/src", anon=anon, **s3so)
191+
dst = UPath(tmp_path)
192+
193+
def on_collision_rename_file(src, dst):
194+
warnings.warn(
195+
f"{src!s} collides with prefix. Renaming target file object to {dst!s}",
196+
UserWarning,
197+
stacklevel=3,
198+
)
199+
return (
200+
dst.with_suffix(dst.suffix + ".COLLISION"),
201+
dst,
202+
)
203+
204+
# perform copy
205+
src.copy_into(dst, on_name_collision=on_collision_rename_file)
206+
207+
# check results
208+
dst_files = sorted(posixify(x.relative_to(tmp_path)) for x in dst.glob("**/*"))
209+
assert dst_files == [
210+
"src",
211+
"src/common_prefix",
212+
"src/common_prefix.COLLISION",
213+
"src/common_prefix/file1.txt",
214+
"src/common_prefix/file2.txt",
215+
]

0 commit comments

Comments
 (0)