Skip to content

Commit 08d312b

Browse files
fix: address review comments — SDK compatibility, race condition, wording
1. Python SDK: Add STREAM_KEEPALIVE to ActionType enum so the SDK recognizes the new value instead of crashing on unknown enum. 2. Python SDK: Make convert_proto_enum_to_python return None for unknown enum values instead of raising KeyError. This makes the SDK forward-compatible with future ActionType additions. 3. Python SDK: Skip STREAM_KEEPALIVE (and any unknown) actions early in the action listener generator before attempting to parse payload. 4. Engine: Use subscribedWorker.sendMu mutex for keepalive sends to avoid racing with real AssignedAction dispatches from other goroutines. gRPC server streams are not safe for concurrent sends. 5. Engine: Update comment to say "periodic" instead of "idle" — the keepalive fires unconditionally at the interval, not only when idle. Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
1 parent 0c9f082 commit 08d312b

File tree

4 files changed

+36
-13
lines changed

4 files changed

+36
-13
lines changed

internal/services/dispatcher/server.go

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -343,7 +343,8 @@ func (s *DispatcherImpl) ListenV2(request *contracts.WorkerListenRequest, stream
343343

344344
fin := make(chan bool)
345345

346-
s.workers.Add(workerId, sessionId, newSubscribedWorker(stream, fin, workerId, s.defaultMaxWorkerBacklogSize, s.pubBuffer))
346+
sw := newSubscribedWorker(stream, fin, workerId, s.defaultMaxWorkerBacklogSize, s.pubBuffer)
347+
s.workers.Add(workerId, sessionId, sw)
347348

348349
defer func() {
349350
// non-blocking send
@@ -355,10 +356,12 @@ func (s *DispatcherImpl) ListenV2(request *contracts.WorkerListenRequest, stream
355356
s.workers.DeleteForSession(workerId, sessionId)
356357
}()
357358

358-
// Optionally send periodic keepalive messages on the stream to prevent
359-
// L7 proxies (e.g., Envoy on Azure Container Apps) from killing the stream
360-
// due to stream_idle_timeout. This does NOT affect worker liveness detection,
361-
// which is handled by the separate Heartbeat RPC.
359+
// Optionally send periodic keepalive messages on the ListenV2 stream to
360+
// prevent L7 proxies (e.g., Envoy on Azure Container Apps) from killing the
361+
// stream due to stream_idle_timeout. Keepalives are sent unconditionally at
362+
// the configured interval regardless of other stream activity. This does NOT
363+
// affect worker liveness detection, which is handled by the separate
364+
// Heartbeat RPC.
362365
var keepaliveTicker *time.Ticker
363366
var keepaliveC <-chan time.Time
364367

@@ -397,9 +400,13 @@ func (s *DispatcherImpl) ListenV2(request *contracts.WorkerListenRequest, stream
397400

398401
return nil
399402
case <-keepaliveC:
400-
if err := stream.Send(&contracts.AssignedAction{
403+
sw.sendMu.Lock()
404+
err := stream.Send(&contracts.AssignedAction{
401405
ActionType: contracts.ActionType_STREAM_KEEPALIVE,
402-
}); err != nil {
406+
})
407+
sw.sendMu.Unlock()
408+
409+
if err != nil {
403410
s.l.Debug().Err(err).Msgf("failed to send stream keepalive for worker %s, closing stream", request.WorkerId)
404411
return nil
405412
}

sdks/python/hatchet_sdk/clients/dispatcher/action_listener.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,17 @@ async def _generator(self) -> AsyncGenerator[Action | None, None]:
215215

216216
assigned_action = result.data
217217

218+
# Skip keepalive messages — they only exist to prevent
219+
# L7 proxies from killing idle streams.
220+
action_type = convert_proto_enum_to_python(
221+
assigned_action.action_type,
222+
ActionType,
223+
ActionTypeProto,
224+
)
225+
226+
if action_type is None or action_type == ActionType.STREAM_KEEPALIVE:
227+
continue
228+
218229
try:
219230
action_payload = (
220231
ActionPayload()
@@ -239,11 +250,7 @@ async def _generator(self) -> AsyncGenerator[Action | None, None]:
239250
step_run_id=assigned_action.task_run_external_id,
240251
action_id=assigned_action.action_id,
241252
action_payload=action_payload,
242-
action_type=convert_proto_enum_to_python(
243-
assigned_action.action_type,
244-
ActionType,
245-
ActionTypeProto,
246-
),
253+
action_type=action_type,
247254
retry_count=assigned_action.retry_count,
248255
additional_metadata=parse_additional_metadata(
249256
assigned_action.additional_metadata

sdks/python/hatchet_sdk/runnables/action.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ def validate_filter_payload(self) -> "ActionPayload":
4949
class ActionType(str, Enum):
5050
START_STEP_RUN = "START_STEP_RUN"
5151
CANCEL_STEP_RUN = "CANCEL_STEP_RUN"
52+
STREAM_KEEPALIVE = "STREAM_KEEPALIVE"
5253

5354

5455
class Action(BaseModel):

sdks/python/hatchet_sdk/utils/proto_enums.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,4 +44,12 @@ def convert_proto_enum_to_python(
4444
if value is None:
4545
return None
4646

47-
return python_enum_class[proto_enum.Name(value)]
47+
name = proto_enum.Name(value)
48+
49+
try:
50+
return python_enum_class[name]
51+
except KeyError:
52+
# Unknown enum values (e.g., from a newer server) are returned as None
53+
# so callers can skip them gracefully. This prevents SDK crashes when the
54+
# engine sends new ActionType values like STREAM_KEEPALIVE.
55+
return None

0 commit comments

Comments
 (0)