Skip to content

Commit 45f8b07

Browse files
Author: Test User (committed)
Commit message: harden warm pool lifecycle before release
1 parent (e796ac0) · commit 45f8b07

23 files changed

Lines changed: 1361 additions & 461 deletions

Makefile

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -52,7 +52,7 @@ lint-imports:
5252

5353
audit:
5454
@echo "$(GREEN)Running dependency security audit...$(NC)"
55-
pip-audit --strict --desc --skip-editable
55+
pip-audit --strict --desc --skip-editable --ignore-vuln CVE-2026-4539
5656

5757
test:
5858
$(PYTEST) tests/unit tests/contracts -q --tb=short -m "not slow"

src/goldfish/cloud/adapters/gcp/gce_launcher.py

Lines changed: 12 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -853,17 +853,20 @@ def get_instance_logs(
853853
tail_lines: int | None = None,
854854
since: str | None = None,
855855
retry_on_empty: bool = False,
856+
serial_console_name: str | None = None,
856857
) -> str:
857858
"""Retrieve logs from GCE instance.
858859
859860
Tries to fetch from GCS first, falls back to serial console.
860861
861862
Args:
862-
instance_name: Instance identifier
863-
tail_lines: Number of lines from end to return
864-
since: Only return logs after this ISO timestamp
863+
instance_name: Identifier for GCS log paths (stage_run_id for warm pool reuse).
864+
tail_lines: Number of lines from end to return.
865+
since: Only return logs after this ISO timestamp.
865866
retry_on_empty: If True, retry once after delay if logs are empty
866-
(handles GCS eventual consistency)
867+
(handles GCS eventual consistency).
868+
serial_console_name: Real GCE instance name for serial console fallback.
869+
If None, falls back to instance_name.
867870
868871
Returns:
869872
Instance logs as string
@@ -1030,10 +1033,11 @@ def _fetch_gcs_logs() -> str | None:
10301033
if result is not None:
10311034
return result
10321035

1033-
# Fall back to serial console - find correct zone first
1034-
zone = self._find_instance_zone(instance_name)
1036+
# Fall back to serial console - use real GCE instance name
1037+
console_name = self._sanitize_name(serial_console_name) if serial_console_name else instance_name
1038+
zone = self._find_instance_zone(console_name)
10351039
if not zone:
1036-
return f"Instance {instance_name} not found in any configured zone"
1040+
return f"Instance {console_name} not found in any configured zone"
10371041

10381042
def _filter_serial_noise(lines: list[str]) -> list[str]:
10391043
"""Filter out noisy metadata syncer and startup script debug output."""
@@ -1071,7 +1075,7 @@ def _filter_serial_noise(lines: list[str]) -> list[str]:
10711075
"compute",
10721076
"instances",
10731077
"get-serial-port-output",
1074-
instance_name,
1078+
console_name,
10751079
f"--zone={zone}",
10761080
"--port=1",
10771081
]

src/goldfish/cloud/adapters/gcp/run_backend.py

Lines changed: 87 additions & 28 deletions
Original file line number | Diff line number | Diff line change
@@ -203,11 +203,23 @@ def launch(self, spec: RunSpec) -> RunHandle:
203203

204204
# Try warm pool first — reuse an idle VM matching hardware spec
205205
if self._warm_pool and self._warm_pool.is_enabled_for(spec.profile):
206-
warm_handle = self._try_warm_pool_claim(spec)
206+
try:
207+
warm_handle = self._try_warm_pool_claim(spec)
208+
except Exception as e:
209+
# Dispatch failure where the VM might still be alive (timeout,
210+
# ambiguous gcloud error). The instance is transitioning to deleting
211+
# but could have received the signal. Do NOT fall through to a fresh
212+
# launch — that risks double-dispatch for the same stage_run_id.
213+
# (Dead-VM failures return None from try_claim and fall through safely.)
214+
raise LaunchError(
215+
f"Warm pool dispatch failed after assignment: {e}",
216+
stage_run_id=spec.stage_run_id,
217+
cause="warm_dispatch_failure",
218+
) from e
207219
if warm_handle is not None:
208220
logger.info("Warm pool claim succeeded: %s → %s", spec.stage_run_id, warm_handle.backend_handle)
209221
return warm_handle
210-
logger.info("Warm pool claim failed for %s, falling through to fresh launch", spec.stage_run_id)
222+
logger.info("No idle warm instance for %s, falling through to fresh launch", spec.stage_run_id)
211223

212224
# Initialize warm-pool tracking variables BEFORE the try block so the
213225
# exception handler can safely reference them without UnboundLocalError.
@@ -351,24 +363,34 @@ def launch(self, spec: RunSpec) -> RunHandle:
351363
spec.stage_run_id,
352364
)
353365
if not ctrl_result.success:
354-
# Controller couldn't take ownership. Remove the pre-registered
355-
# row and disable the idle loop via metadata so the VM self-deletes
356-
# after the first job instead of entering the untracked idle loop.
366+
# Controller couldn't take ownership. Disable the idle loop
367+
# via metadata FIRST so the VM self-deletes after the first job,
368+
# then remove the pre-registered row. Order matters: if we delete
369+
# the row first and the metadata call fails, the VM enters the
370+
# idle loop as an untracked warm instance that the daemon can't reap.
357371
logger.warning(
358372
"Controller on_fresh_launch failed for %s: %s — disabling idle loop",
359373
actual_name,
360374
ctrl_result.details,
361375
)
362376
try:
363-
self._warm_pool._db.delete_warm_instance(actual_name)
364377
self._warm_pool._set_instance_metadata(
365378
actual_name,
366379
actual_zone,
367380
"goldfish_warm_pool_disabled",
368381
"true",
369382
)
370383
except Exception:
371-
pass
384+
# Metadata set failed — do NOT delete the row. Keeping the row
385+
# ensures the daemon can still track and eventually delete this VM.
386+
logger.warning(
387+
"Failed to disable idle loop for %s — keeping DB row for daemon tracking",
388+
actual_name,
389+
)
390+
# Leave the row so daemon can reap it via launching timeout
391+
return handle
392+
# Metadata set succeeded — safe to remove the row now
393+
self._warm_pool._db.delete_warm_instance(actual_name)
372394

373395
return handle
374396

@@ -458,25 +480,26 @@ def _try_warm_pool_claim(self, spec: RunSpec) -> RunHandle | None:
458480
else:
459481
docker_cmd = "/entrypoint.sh"
460482

461-
# Build env for the warm-pool job. This must preserve the same Goldfish
462-
# runtime contract as a cold launch so goldfish.io and runtime helpers
463-
# see the expected stage config, run id, and input/output mount paths.
483+
# Build env for the warm-pool job. spec.env already contains the full
484+
# GOLDFISH_STAGE_CONFIG from the executor (with per-signal config, format,
485+
# schema, output definitions). Only fill in keys that aren't already set.
464486
warm_env = dict(spec.env)
465-
warm_env.update(
466-
{
467-
"GOLDFISH_STAGE_CONFIG": json.dumps(
468-
{
469-
"inputs": serialized_inputs,
470-
"compute": {"max_runtime_seconds": spec.timeout_seconds},
471-
}
472-
),
473-
"GOLDFISH_RUN_ID": spec.stage_run_id,
474-
"GOLDFISH_INPUTS_DIR": "/mnt/inputs",
475-
"GOLDFISH_OUTPUTS_DIR": "/mnt/outputs",
476-
}
487+
warm_env.setdefault("GOLDFISH_RUN_ID", spec.stage_run_id)
488+
warm_env.setdefault("GOLDFISH_INPUTS_DIR", "/mnt/inputs")
489+
warm_env.setdefault("GOLDFISH_OUTPUTS_DIR", "/mnt/outputs")
490+
warm_env.setdefault(
491+
"GOLDFISH_STAGE_CONFIG",
492+
json.dumps(
493+
{
494+
"inputs": serialized_inputs,
495+
"compute": {"max_runtime_seconds": spec.timeout_seconds},
496+
}
497+
),
477498
)
478499
if spec.gpu_count and spec.gpu_count > 0:
479-
warm_env["LD_LIBRARY_PATH"] = "/tmp/cuda-symlinks:/usr/lib/x86_64-linux-gnu"
500+
existing_ld = warm_env.get("LD_LIBRARY_PATH", "")
501+
prefix = "/tmp/cuda-symlinks:/usr/lib/x86_64-linux-gnu"
502+
warm_env["LD_LIBRARY_PATH"] = f"{prefix}:{existing_ld}" if existing_ld else prefix
480503

481504
job_spec = {
482505
"image": spec.image,
@@ -497,6 +520,11 @@ def _try_warm_pool_claim(self, spec: RunSpec) -> RunHandle | None:
497520
"max_runtime_seconds": spec.timeout_seconds,
498521
}
499522

523+
# Extract allowed zones from the resolved profile to enforce placement
524+
# constraints. Without this, two profiles sharing hardware but pinning
525+
# different zones could reuse each other's idle VMs.
526+
profile_zones = profile.get("zones")
527+
500528
return self._warm_pool.try_claim(
501529
machine_type=spec.machine_type or "n1-standard-4",
502530
gpu_count=spec.gpu_count,
@@ -505,6 +533,7 @@ def _try_warm_pool_claim(self, spec: RunSpec) -> RunHandle | None:
505533
preemptible=spec.spot,
506534
stage_run_id=spec.stage_run_id,
507535
job_spec=job_spec,
536+
allowed_zones=profile_zones,
508537
)
509538

510539
def get_status(self, handle: RunHandle) -> BackendStatus:
@@ -532,15 +561,38 @@ def get_status(self, handle: RunHandle) -> BackendStatus:
532561
# - Warm: exit code appears, VM enters idle loop instead of self-deleting
533562
# CRITICAL: Use stage_run_id (not instance_name) because warm instances
534563
# reuse the same VM for different stage_run_ids.
535-
exit_result = self._launcher._get_exit_code(handle.stage_run_id)
536-
if exit_result.exists and exit_result.code is not None:
537-
return BackendStatus.from_exit_code(exit_result.code)
564+
# Only check if launcher has a bucket — without one, _get_exit_code()
565+
# returns a synthetic code 0 which would misclassify running VMs as completed.
566+
if self._launcher.bucket_uri:
567+
exit_result = self._launcher._get_exit_code(handle.stage_run_id)
568+
if exit_result.exists and exit_result.code is not None:
569+
return BackendStatus.from_exit_code(exit_result.code)
570+
571+
# Warm-pool fallback: if the per-run exit_code upload to GCS failed,
572+
# the warm idle loop publishes the exit code and run ID in instance
573+
# metadata before returning to idle. Only trust metadata when it is
574+
# explicitly tagged with THIS stage_run_id; otherwise a stale exit
575+
# code from a previous lease could misclassify a new run.
576+
if self._warm_pool and handle.zone:
577+
try:
578+
metadata = self._warm_pool.get_instance_metadata(instance_name, handle.zone)
579+
if metadata.get("goldfish_exit_run_id") == handle.stage_run_id:
580+
exit_code_str = metadata.get("goldfish_exit_code")
581+
if isinstance(exit_code_str, str) and exit_code_str != "":
582+
return BackendStatus.from_exit_code(int(exit_code_str))
583+
except Exception as e:
584+
logger.debug("Warm-pool metadata exit-code check failed for %s: %s", instance_name, e)
585+
538586
return BackendStatus(status=RunStatus.RUNNING)
539587
elif status_str in (StageState.COMPLETED, StageState.FAILED):
540588
# GCE instance is TERMINATED/STOPPED. get_instance_status derives
541589
# COMPLETED/FAILED from the instance_name's exit code path, but for
542590
# warm-pool instances that's the FIRST job's exit code, not the current one.
543591
# Always re-check using the current stage_run_id to get the right exit code.
592+
# Only check if launcher has a bucket — without one, _get_exit_code()
593+
# returns a synthetic code 0 which would mask the real termination cause.
594+
if not self._launcher.bucket_uri:
595+
return BackendStatus(status=RunStatus.FAILED, exit_code=1)
544596
exit_result = self._launcher._get_exit_code(handle.stage_run_id)
545597
if exit_result.exists and exit_result.code is not None:
546598
return BackendStatus.from_exit_code(exit_result.code)
@@ -561,6 +613,10 @@ def get_status(self, handle: RunHandle) -> BackendStatus:
561613
elif status_str == "not_found":
562614
# Instance is gone - try to recover exit code from GCS
563615
# This handles spot preemption where instance disappears but wrote exit code
616+
# Only check if launcher has a bucket — without one, _get_exit_code()
617+
# returns a synthetic code 0 which would mask the not-found condition.
618+
if not self._launcher.bucket_uri:
619+
raise NotFoundError(f"instance:{instance_name}")
564620
exit_result = self._launcher._get_exit_code(handle.stage_run_id)
565621
if exit_result.exists and exit_result.code is not None:
566622
# Exit code found - instance ran and terminated
@@ -605,15 +661,18 @@ def get_logs(self, handle: RunHandle, tail: int = 200, since: str | None = None)
605661
Returns:
606662
Log content as string.
607663
"""
608-
# Use stage_run_id for GCS log paths, not instance_name.
609-
# Warm-pool reuse uploads logs under runs/<stage_run_id>/, not runs/<instance_name>/.
664+
# Use stage_run_id for GCS log paths (warm-pool reuse uploads under
665+
# runs/<stage_run_id>/), but pass backend_handle for serial console
666+
# fallback (the real GCE instance name).
610667
log_key = handle.stage_run_id or handle.backend_handle
668+
serial_name = handle.backend_handle if handle.stage_run_id else None
611669

612670
try:
613671
return self._launcher.get_instance_logs(
614672
instance_name=log_key,
615673
tail_lines=tail if tail > 0 else None,
616674
since=since,
675+
serial_console_name=serial_name,
617676
)
618677
except Exception as e:
619678
logger.warning("Error getting logs for %s: %s", log_key, e)

0 commit comments

Comments (0)