-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfile_utils.py
More file actions
54 lines (44 loc) · 1.75 KB
/
file_utils.py
File metadata and controls
54 lines (44 loc) · 1.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import aiofiles
import aiofiles.os
import base64
def binary_to_base64(binary_data: bytes) -> str:
    """Encode raw bytes as base64 and return the result as a string.

    Raises:
        TypeError: If ``binary_data`` is not a ``bytes`` instance.
    """
    if isinstance(binary_data, bytes):
        # b64encode yields bytes; decode them to get the text form.
        return base64.b64encode(binary_data).decode('utf-8')
    raise TypeError(f"Expected bytes for binary_data, got {type(binary_data)}")
def base64_to_binary(base64_str: str) -> bytes:
    """Decode a base64-encoded string back into the bytes it represents.

    Raises:
        TypeError: If ``base64_str`` is not a ``str``.
    """
    if isinstance(base64_str, str):
        # Convert the text to bytes first, then base64-decode those bytes.
        return base64.b64decode(base64_str.encode('utf-8'))
    raise TypeError(f"Expected str for base64_str, got {type(base64_str)}")
async def save_base64_as_binary(
    file_path: str,
    base64_str: str
) -> str:
    """Decode a base64 string and write the resulting bytes to a file.

    Args:
        file_path: Destination path; the file is opened in ``"wb"`` mode.
        base64_str: Base64-encoded payload to decode and write.

    Returns:
        The ``file_path`` that was passed in.

    Raises:
        TypeError: If ``file_path`` or ``base64_str`` is not a ``str``.
    """
    # Raise explicitly instead of using ``assert``: assert statements are
    # removed under ``python -O``, which would silently disable validation.
    if not isinstance(file_path, str):
        raise TypeError(f"Expected str for file_path, got {type(file_path)}")
    if not isinstance(base64_str, str):
        raise TypeError(f"Expected str for base64_str, got {type(base64_str)}")
    # Decode before opening so a decode failure never touches the target file.
    binary_data = base64_to_binary(base64_str)
    async with aiofiles.open(file_path, "wb") as file:
        await file.write(binary_data)
    return file_path
async def read_file_bytes(
    file_path: str
) -> bytes:
    """Read a file asynchronously and return its content as bytes.

    Args:
        file_path: Path of the file to read.

    Returns:
        The full content of the file.

    Raises:
        TypeError: If ``file_path`` is not a ``str``.
        FileNotFoundError: If opening the file fails because it does not exist.
    """
    if not isinstance(file_path, str):
        raise TypeError(f"Expected str for file_path, got {type(file_path)}")
    # Open directly instead of probing with exists() first: the probe costs an
    # extra round-trip, is racy, and can report an inaccessible path as
    # missing. Other OSError subclasses (e.g. PermissionError) propagate as-is.
    try:
        async with aiofiles.open(file_path, "rb") as file:
            return await file.read()
    except FileNotFoundError as err:
        raise FileNotFoundError(f"File does not exist: {file_path}") from err
async def read_file_base64(
    file_path: str
) -> str:
    """Asynchronously load a file and return its bytes as a base64-encoded string."""
    # Read the raw bytes first, then hand them to the base64 encoder.
    return binary_to_base64(await read_file_bytes(file_path))