Skip to content

Commit a58c60a

Browse files
authored
Merge pull request #7 from rfcx/feature/AI-241-sdk-enables-users-to-download-in-parallel
AI-241 SDK enables users to download in parallel
2 parents 7e336aa + 3a42f5c commit a58c60a

File tree

3 files changed

+116
-9
lines changed

3 files changed

+116
-9
lines changed

package-rfcx/rfcx/audio.py

Lines changed: 71 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,28 @@
1-
import urllib.request
1+
import datetime
2+
import requests
23
import shutil
4+
import os
5+
import concurrent.futures
6+
from rfcx._api_rfcx import guardianAudio
37

48
def __save_file(url, local_path):
    """ Download the file from `url` and save it locally under `local_path`

    The response body is streamed straight to disk. A response with a status
    other than 200 is reported on stdout and no file is written.

    Args:
        url: Address of the file to download.
        local_path: Path of the file to create.

    Returns:
        None.
    """
    # Using the response as a context manager releases the connection even
    # when the copy fails part-way; a streamed response otherwise stays open
    # until its body is fully consumed.
    with requests.get(url, stream=True) as response:
        if response.status_code != 200:
            print("Can not download {} with status {}".format(url, response.status_code))
            return
        # Have the raw stream undo any gzip/deflate content encoding.
        response.raw.decode_content = True
        with open(local_path, 'wb') as out_file:
            shutil.copyfileobj(response.raw, out_file)
817

9-
def __local_audio_file_path(path, audio_name, audio_extension):
    """ Create string for the name and the path

    Args:
        path: Directory the file is saved in.
        audio_name: File name without extension.
        audio_extension: File extension without the leading dot.

    Returns:
        `path` joined with `<audio_name>.<audio_extension>`.
    """
    # os.path.join avoids a doubled separator when `path` already ends in one.
    return os.path.join(path, audio_name + "." + audio_extension)
1221

1322
def save_audio_file(destination_path, audio_id, source_audio_extension='opus'):
1423
""" Prepare `url` and `local_path` and save it using function `__save_file`
1524
Args:
16-
destination_path: Path to the save directory.
25+
destination_path: Audio save path.
1726
audio_id: RFCx audio id.
1827
source_audio_extension: (optional, default= '.opus') Extension for saving audio files.
1928
@@ -28,3 +37,59 @@ def save_audio_file(destination_path, audio_id, source_audio_extension='opus'):
2837
local_path = __local_audio_file_path(destination_path, audio_id, source_audio_extension)
2938
__save_file(url, local_path)
3039
print('File {}.{} saved to {}'.format(audio_id, source_audio_extension, destination_path))
40+
41+
def __generate_date_list_in_isoformat(start, end):
    """ Generate list of date in iso format ending with `Z`

    One entry is produced for every calendar day from the day of `start` to
    the day of `end` (inclusive). Each entry is pinned to midnight, e.g.
    `2020-01-31T00:00:00Z`, so the time of day of `start` and `end` does not
    affect the result.

    Args:
        start: datetime on the first day.
        end: datetime on the last day.

    Returns:
        List of strings; empty when `end` is on an earlier day than `start`.
    """
    first_day = start.date()
    # Count calendar days rather than elapsed 24h periods so the last day is
    # kept even when `end` has an earlier time of day than `start`.
    day_count = (end.date() - first_day).days + 1
    return [(first_day + datetime.timedelta(days=offset)).isoformat() + 'T00:00:00Z'
            for offset in range(day_count)]
46+
47+
def __segmentDownload(audio_path, file_ext, segment):
    """ Download a single audio segment into `audio_path`

    The local file is named `<guardian_guid>_<measured_at>_<guid>`, with every
    `:` and `.` of the timestamp replaced by `-`.

    Args:
        audio_path: Directory the file is saved in.
        file_ext: Extension used for both the remote asset and the local file.
        segment: Mapping with the keys `guid`, `guardian_guid` and `measured_at`.
    """
    guid = segment['guid']
    guardian = segment['guardian_guid']
    timestamp = segment['measured_at'].replace(':', '-').replace('.', '-')
    file_name = "{}_{}_{}".format(guardian, timestamp, guid)
    source_url = "https://assets.rfcx.org/audio/" + guid + "." + file_ext
    target = __local_audio_file_path(audio_path, file_name, file_ext)
    __save_file(source_url, target)
53+
54+
def downloadGuardianAudio(token, destination_path, guardian_id, min_date, max_date, file_ext='opus', parallel=True, max_workers=100):
    """ Download RFCx audio on specific time range using `guardianAudio` to get audio segments information
        and save it using function `__save_file`

    Files are written to `<destination_path>/<guardian_id>`, which is created
    when missing. Segments are requested one day at a time.

    Args:
        token: RFCx client token.
        destination_path: Audio save path.
        guardian_id: RFCx guardian id
        min_date: Download start date
        max_date: Download end date
        file_ext: (optional, default= 'opus') Extension for saving audio file.
        parallel: (optional, default= True) Enable to parallel download audio from RFCx
        max_workers: (optional, default= 100) Number of download threads used when `parallel` is enabled.

    Returns:
        None.

    Raises:
        TypeError: if missing required arguments.

    """
    audio_path = destination_path + '/' + guardian_id
    # exist_ok avoids the race between checking for and creating the directory.
    os.makedirs(audio_path, exist_ok=True)
    dates = __generate_date_list_in_isoformat(min_date, max_date)

    for date in dates:
        # Build the end of the day from the date part so it does not depend
        # on the entry containing the literal midnight time.
        day = date[:10]
        date_end = day + 'T23:59:59Z'
        segments = guardianAudio(token, guardian_id, date, date_end, limit=1000, descending=False)

        if not segments:
            print("No data on date:", day)
            continue

        if parallel:
            with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
                futures = [
                    executor.submit(__segmentDownload, audio_path=audio_path, file_ext=file_ext, segment=segment)
                    for segment in segments
                ]
                # A failed worker keeps its exception inside the future;
                # report it instead of dropping it silently.
                for future in concurrent.futures.as_completed(futures):
                    error = future.exception()
                    if error is not None:
                        print("Can not download a segment on {}: {}".format(day, error))
        else:
            for segment in segments:
                __segmentDownload(audio_path, file_ext, segment)
        print("Finish download on", guardian_id, day)

package-rfcx/rfcx/client.py

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import getpass
22
import datetime
3-
from os import path
3+
import os
4+
import re
5+
import rfcx.audio as audio
46
import rfcx._pkce as pkce
57
import rfcx._api_rfcx as api_rfcx
68
import rfcx._api_auth as api_auth
@@ -38,7 +40,7 @@ def authenticate(self, persist=True):
3840
access_token = None
3941

4042
# Attempt to load the credentials from disk
41-
if path.exists(self.persisted_credentials_path):
43+
if os.path.exists(self.persisted_credentials_path):
4244
with open(self.persisted_credentials_path, 'r') as f:
4345
lines = f.read().splitlines()
4446
if len(lines) == 5 and lines[0] == 'version 1':
@@ -186,3 +188,43 @@ def tags(self, type, labels, start=None, end=None, sites=None, limit=1000):
186188
end = datetime.datetime.utcnow().replace(microsecond=0).isoformat() + 'Z'
187189

188190
return api_rfcx.tags(self.credentials.id_token, type, labels, start, end, sites, limit)
191+
192+
def downloadGuardianAudio(self, dest_path=None, guardian_id=None, min_date=None, max_date=None, file_ext='opus', parallel=True):
    """Download audio using audio information from `guardianAudio`

    Args:
        dest_path: Path to save audio. If None then defaults to `./audios`.
        guardian_id: (Required) The guid of a guardian
        min_date: Minimum timestamp of the audio. If None then defaults to exactly 30 days ago.
        max_date: Maximum timestamp of the audio. If None then defaults to now.
        file_ext: Audio file extension. Default to `opus`
        parallel: Parallel download audio. Defaults to True.

    Returns:
        None. A message is printed and nothing is downloaded when the client
        is not authenticated, `guardian_id` is missing, or a date is not a
        `datetime.datetime`.
    """
    if self.credentials is None:
        print('Not authenticated')
        return

    if guardian_id is None:
        print("Please specific the guardian id.")
        return

    if min_date is None:
        min_date = datetime.datetime.utcnow() - datetime.timedelta(days=30)
    if not isinstance(min_date, datetime.datetime):
        print("min_date is not type datetime")
        return

    if max_date is None:
        max_date = datetime.datetime.utcnow()
    if not isinstance(max_date, datetime.datetime):
        print("max_date is not type datetime")
        return

    if dest_path is None:
        # Pass the default directory on; leaving dest_path as None makes the
        # path concatenation in the audio module fail.
        dest_path = './audios'
        os.makedirs(dest_path, exist_ok=True)

    return audio.downloadGuardianAudio(self.credentials.id_token, dest_path, guardian_id, min_date, max_date, file_ext, parallel)
230+

package-rfcx/setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
REQUIRED_PACKAGES = ['httplib2', 'six']
44

55
setup(name='rfcx',
6-
version='0.0.7',
6+
version='0.0.9',
77
url='https://github.com/rfcx/rfcx-sdk-python',
88
license='None',
99
author='Rainforest Connection',

0 commit comments

Comments
 (0)