Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 105 additions & 67 deletions relink.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,64 @@
# Set up logger
logger = logging.getLogger(__name__)

# Define a custom log level that always prints
ALWAYS = logging.CRITICAL * 2
logging.addLevelName(ALWAYS, "ALWAYS")

def find_and_replace_owned_files(source_dir, target_dir, username, dry_run=False):

def always(self, message, *args, **kwargs):
"""Log message that always appears regardless of log level."""
if self.isEnabledFor(ALWAYS):
# pylint: disable=protected-access
self._log(ALWAYS, message, args, **kwargs)


logging.Logger.always = always


def find_owned_files_scandir(directory, user_uid):
"""
Efficiently find all files owned by a specific user using os.scandir().

This is more efficient than os.walk() because os.scandir() caches stat
information during directory traversal, reducing system calls.

Args:
directory (str): The root directory to search.
user_uid (int): The UID of the user whose files to find.

Yields:
str: Absolute paths to files owned by the user.
"""
try:
with os.scandir(directory) as entries:
for entry in entries:
try:
# Check if it's a file (not following symlinks)
if entry.is_file(follow_symlinks=False):
# Get stat info (cached by scandir, very efficient)
stat_info = entry.stat(follow_symlinks=False)

if stat_info.st_uid == user_uid:
yield entry.path

# Recursively process directories (not following symlinks)
elif entry.is_dir(follow_symlinks=False):
yield from find_owned_files_scandir(entry.path, user_uid)

# Skip symlinks
elif entry.is_symlink():
logger.info("Skipping symlink: %s", entry.path)

except (OSError, PermissionError) as e:
logger.debug("Error accessing %s: %s. Skipping.", entry.path, e)
continue

except (OSError, PermissionError) as e:
logger.debug("Error accessing %s: %s. Skipping.", directory, e)


def replace_files_with_symlinks(source_dir, target_dir, username, dry_run=False):
"""
Finds files owned by a specific user in a source directory tree,
deletes them, and replaces them with symbolic links to the same
Expand Down Expand Up @@ -52,70 +108,52 @@ def find_and_replace_owned_files(source_dir, target_dir, username, dry_run=False
source_dir,
)

for dirpath, _, filenames in os.walk(source_dir):
for filename in filenames:
file_path = os.path.join(dirpath, filename)

# Use os.stat().st_uid to get the file's owner UID
try:
if os.path.islink(file_path):
logger.info("Skipping symlink: %s", file_path)
continue

file_uid = os.stat(file_path).st_uid
except FileNotFoundError:
continue # Skip if file was deleted during traversal

if file_uid == user_uid:
logger.info("Found owned file: %s", file_path)

# Determine the relative path and the new link's destination
relative_path = os.path.relpath(file_path, source_dir)
link_target = os.path.join(target_dir, relative_path)

# Check if the target file actually exists
if not os.path.exists(link_target):
logger.warning(
"Warning: Corresponding file not found in '%s' "
"for '%s'. Skipping.",
target_dir,
file_path,
)
continue

# Get the link name
link_name = file_path

if dry_run:
logger.info(
"[DRY RUN] Would create symbolic link: %s -> %s",
link_name,
link_target,
)
continue

# Remove the original file
try:
os.rename(link_name, link_name + ".tmp")
logger.info("Deleted original file: %s", link_name)
except OSError as e:
logger.error("Error deleting file %s: %s. Skipping.", link_name, e)
continue

# Create the symbolic link, handling necessary parent directories
try:
# Create parent directories for the link if they don't exist
os.makedirs(os.path.dirname(link_name), exist_ok=True)
os.symlink(link_target, link_name)
os.remove(link_name + ".tmp")
logger.info(
"Created symbolic link: %s -> %s", link_name, link_target
)
except OSError as e:
os.rename(link_name + ".tmp", link_name)
logger.error(
"Error creating symlink for %s: %s. Skipping.", link_name, e
)
# Use efficient scandir-based search
for file_path in find_owned_files_scandir(source_dir, user_uid):
logger.info("Found owned file: %s", file_path)

# Determine the relative path and the new link's destination
relative_path = os.path.relpath(file_path, source_dir)
link_target = os.path.join(target_dir, relative_path)

# Check if the target file actually exists
if not os.path.exists(link_target):
logger.warning(
"Warning: Corresponding file not found in '%s' for '%s'. Skipping.",
target_dir,
file_path,
)
continue

# Get the link name
link_name = file_path

if dry_run:
logger.info(
"[DRY RUN] Would create symbolic link: %s -> %s",
link_name,
link_target,
)
continue

# Remove the original file
try:
os.rename(link_name, link_name + ".tmp")
logger.info("Deleted original file: %s", link_name)
except OSError as e:
logger.error("Error deleting file %s: %s. Skipping.", link_name, e)
continue

# Create the symbolic link, handling necessary parent directories
try:
# Create parent directories for the link if they don't exist
os.makedirs(os.path.dirname(link_name), exist_ok=True)
os.symlink(link_target, link_name)
os.remove(link_name + ".tmp")
logger.info("Created symbolic link: %s -> %s", link_name, link_target)
except OSError as e:
os.rename(link_name + ".tmp", link_name)
logger.error("Error creating symlink for %s: %s. Skipping.", link_name, e)


def validate_directory(path):
Expand Down Expand Up @@ -228,13 +266,13 @@ def main():
start_time = time.time()

# --- Execution ---
find_and_replace_owned_files(
replace_files_with_symlinks(
args.source_root, args.target_root, my_username, dry_run=args.dry_run
)

if args.timing:
elapsed_time = time.time() - start_time
logger.info("Execution time: %.2f seconds", elapsed_time)
logger.always("Execution time: %.2f seconds", elapsed_time)


if __name__ == "__main__":
Expand Down
6 changes: 3 additions & 3 deletions tests/relink/test_dryrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def test_dry_run_no_changes(dry_run_setup, caplog):

# Run in dry-run mode
with caplog.at_level(logging.INFO):
relink.find_and_replace_owned_files(
relink.replace_files_with_symlinks(
source_dir, target_dir, username, dry_run=True
)

Expand All @@ -63,7 +63,7 @@ def test_dry_run_shows_message(dry_run_setup, caplog):

# Run in dry-run mode
with caplog.at_level(logging.INFO):
relink.find_and_replace_owned_files(
relink.replace_files_with_symlinks(
source_dir, target_dir, username, dry_run=True
)

Expand All @@ -79,7 +79,7 @@ def test_dry_run_no_delete_or_create_messages(dry_run_setup, caplog):

# Run in dry-run mode
with caplog.at_level(logging.INFO):
relink.find_and_replace_owned_files(
relink.replace_files_with_symlinks(
source_dir, target_dir, username, dry_run=True
)

Expand Down
Loading