Skip to content

Commit 2e6b37a

Browse files
committed
feat(upgrade): cache earliest ts, live status refresh
1 parent 959c552 commit 2e6b37a

File tree

3 files changed

+69
-39
lines changed

3 files changed

+69
-39
lines changed

pychunkedgraph/ingest/upgrade/atomic_layer.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,12 @@ def update_chunk(cg: ChunkedGraph, chunk_coords: list[int]):
121121

122122
nodes = []
123123
nodes_ts = []
124-
earliest_ts = cg.get_earliest_timestamp()
124+
try:
125+
earliest_ts = os.environ["EARLIEST_TS"]
126+
earliest_ts = datetime.fromisoformat(earliest_ts)
127+
except KeyError:
128+
earliest_ts = cg.get_earliest_timestamp()
129+
125130
corrupt_nodes = []
126131
for k, v in rr.items():
127132
try:

pychunkedgraph/ingest/upgrade/parent_layer.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,12 @@ def _update_cross_edges_helper(args):
159159
corrupt_nodes = []
160160
earliest_ts = None
161161
if clean_task:
162-
earliest_ts = cg.get_earliest_timestamp()
162+
try:
163+
earliest_ts = os.environ["EARLIEST_TS"]
164+
earliest_ts = datetime.fromisoformat(earliest_ts)
165+
except KeyError:
166+
earliest_ts = cg.get_earliest_timestamp()
167+
163168
for node, parent, node_ts in zip(nodes, parents, nodes_ts):
164169
if parent == 0:
165170
# ignore invalid nodes from failed ingest tasks, w/o parent column entry

pychunkedgraph/ingest/utils.py

Lines changed: 57 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import logging
44
import functools
5-
import math
5+
import math, sys
66
from os import environ
77
from time import sleep
88
from typing import Any, Generator, Tuple
@@ -51,6 +51,10 @@ def bootstrap(
5151
return (meta, ingest_config, client_info)
5252

5353

def move_up(lines: int = 1):
    """Move the terminal cursor up by ``lines`` rows via an ANSI escape.

    Used to redraw live status output in place (see the status-printing
    loops in this module).

    The CSI "cursor up" sequence ``ESC [ n A`` contains no newline, so a
    line-buffered stdout would not emit it until the next print; flush
    explicitly so the cursor move takes effect immediately.
    """
    sys.stdout.write(f"\033[{lines}A")
    sys.stdout.flush()
5458
def postprocess_edge_data(im, edge_dict):
5559
data_version = im.cg_meta.data_source.DATA_VERSION
5660
if data_version == 2:
@@ -125,13 +129,16 @@ def get_chunks_not_done(
125129
return [coord for coord, c in zip(coords, completed) if not c]
126130

127131

128-
def print_completion_rate(imanager: IngestionManager, layer: int, span: int = 10):
129-
counts = []
130-
for _ in range(span + 1):
131-
counts.append(imanager.redis.scard(f"{layer}c"))
132-
sleep(1)
133-
rate = np.diff(counts).sum() / span
134-
print(f"{rate} chunks per second.")
132+
def print_completion_rate(imanager: IngestionManager, layer: int, span: int = 30):
    """Continuously report how fast chunks of ``layer`` are completing.

    Each iteration prints the rate measured over the previous sampling
    window (0.0 on the first pass), then samples the size of the redis
    completion set for this layer once per second for ``span`` seconds,
    and finally moves the cursor back up so the next print overwrites
    the same line. Runs until interrupted.
    """
    completed_key = f"{layer}c"  # redis set holding finished chunk coords
    rate = 0.0
    while True:
        print(f"{rate} chunks per second.")
        samples = []
        for _ in range(span + 1):
            samples.append(imanager.redis.scard(completed_key))
            sleep(1)
        # sum of successive differences == net chunks finished in the window
        rate = np.diff(samples).sum() / span
        move_up()
135142

136143

137144
def print_status(imanager: IngestionManager, redis, upgrade: bool = False):
@@ -143,32 +150,38 @@ def print_status(imanager: IngestionManager, redis, upgrade: bool = False):
143150
layers = range(2, imanager.cg_meta.layer_count + 1)
144151
if upgrade:
145152
layers = range(2, imanager.cg_meta.layer_count)
146-
layer_counts = imanager.cg_meta.layer_chunk_counts
147153

148-
pipeline = redis.pipeline()
149-
pipeline.get(r_keys.JOB_TYPE)
150-
worker_busy = []
151-
for layer in layers:
152-
pipeline.scard(f"{layer}c")
153-
queue = Queue(f"l{layer}", connection=redis)
154-
pipeline.llen(queue.key)
155-
pipeline.zcard(queue.failed_job_registry.key)
156-
workers = Worker.all(queue=queue)
157-
worker_busy.append(sum([w.get_state() == WorkerStatus.BUSY for w in workers]))
158-
159-
results = pipeline.execute()
160-
job_type = "not_available"
161-
if results[0] is not None:
162-
job_type = results[0].decode()
163-
completed = []
164-
queued = []
165-
failed = []
166-
for i in range(1, len(results), 3):
167-
result = results[i : i + 3]
168-
completed.append(result[0])
169-
queued.append(result[1])
170-
failed.append(result[2])
154+
def _refresh_status():
155+
pipeline = redis.pipeline()
156+
pipeline.get(r_keys.JOB_TYPE)
157+
worker_busy = []
158+
for layer in layers:
159+
pipeline.scard(f"{layer}c")
160+
queue = Queue(f"l{layer}", connection=redis)
161+
pipeline.llen(queue.key)
162+
pipeline.zcard(queue.failed_job_registry.key)
163+
workers = Worker.all(queue=queue)
164+
worker_busy.append(
165+
sum([w.get_state() == WorkerStatus.BUSY for w in workers])
166+
)
167+
168+
results = pipeline.execute()
169+
job_type = "not_available"
170+
if results[0] is not None:
171+
job_type = results[0].decode()
172+
completed = []
173+
queued = []
174+
failed = []
175+
for i in range(1, len(results), 3):
176+
result = results[i : i + 3]
177+
completed.append(result[0])
178+
queued.append(result[1])
179+
failed.append(result[2])
180+
return job_type, completed, queued, failed, worker_busy
181+
182+
job_type, completed, queued, failed, worker_busy = _refresh_status()
171183

184+
layer_counts = imanager.cg_meta.layer_chunk_counts
172185
header = (
173186
f"\njob_type: \t{job_type}"
174187
f"\nversion: \t{imanager.cg.version}"
@@ -177,12 +190,19 @@ def print_status(imanager: IngestionManager, redis, upgrade: bool = False):
177190
"\n\nlayer status:"
178191
)
179192
print(header)
180-
for layer, done, count in zip(layers, completed, layer_counts):
181-
print(f"{layer}\t| {done:9} / {count} \t| {math.floor((done/count)*100):6}%")
193+
while True:
194+
for layer, done, count in zip(layers, completed, layer_counts):
195+
print(
196+
f"{layer}\t| {done:9} / {count} \t| {math.floor((done/count)*100):6}%"
197+
)
182198

183-
print("\n\nqueue status:")
184-
for layer, q, f, wb in zip(layers, queued, failed, worker_busy):
185-
print(f"l{layer}\t| queued: {q:<10} failed: {f:<10} busy: {wb}")
199+
print("\n\nqueue status:")
200+
for layer, q, f, wb in zip(layers, queued, failed, worker_busy):
201+
print(f"l{layer}\t| queued: {q:<10} failed: {f:<10} busy: {wb}")
202+
203+
sleep(1)
204+
_, completed, queued, failed, worker_busy = _refresh_status()
205+
move_up(lines=2 * len(layers) + 3)
186206

187207

188208
def queue_layer_helper(

0 commit comments

Comments
 (0)