-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathworker.py
More file actions
36 lines (26 loc) · 922 Bytes
/
worker.py
File metadata and controls
36 lines (26 loc) · 922 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import datetime
import schedule
import time
from cron_utils import is_valid_cron, check_cron_condition
from data_fetching import list_cron_jobs
MINUTE_GRANULARITY = 1
def sample_task():
print("This task is an example for implementation of task functions.")
return
def run_task(task_ref):
if task_ref == "sample":
sample_task()
def job():
end_time = datetime.datetime.now().replace(second=0, microsecond=0)
start_time = end_time - datetime.timedelta(minutes=MINUTE_GRANULARITY-1)
cron_list = list_cron_jobs()
for cron in cron_list:
if not is_valid_cron(cron['cron_expression']):
continue
if check_cron_condition(cron['cron_expression'], start_time, end_time):
run_task(cron['task_ref'])
if __name__ == "__main__":
schedule.every(MINUTE_GRANULARITY).minute.do(job)
while True:
schedule.run_pending()
time.sleep(1)