refactor(amber): centralize worker-id parsing in virtual_identity#5706
Conversation
Generalize the worker-name regex so the workflow id and operator id are captured explicitly, and add `get_operator_id` to extract the logical operator id from a worker actor name. Both `get_worker_index` and `get_operator_id` use `re.fullmatch`, so a malformed worker id with trailing junk now fails loudly with a `ValueError` instead of parsing silently -- mirroring the Scala `VirtualIdentityUtils.getPhysicalOpId` full-match semantics referenced in the docstring. `get_worker_index` returns the same value for well-formed ids (the worker index is still the final capture group); `get_operator_id` is new and gains its production caller with the loop feature. Split out of the loop-operators PR (apache#5700) to keep that PR reviewable. This helper is independent of the rest of that work and behavior-preserving for well-formed worker ids.
There was a problem hiding this comment.
Pull request overview
This PR refactors Amber’s Python virtual_identity utilities to centralize worker actor-name parsing and to introduce a single helper for extracting logical operator IDs from worker IDs, aligning behavior more closely with the Scala parsing semantics.
Changes:
- Generalize
worker_name_patternto capture workflow id + operator id explicitly, and switch parsing tore.fullmatch. - Add
get_operator_id(worker_id)helper that extracts the logical operator id and raisesValueErroron invalid formats. - Add unit tests covering
get_operator_idfor canonical names, hyphenated operator ids, and invalid inputs.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
amber/src/main/python/core/util/virtual_identity.py |
Updates worker-id regex + parsing strictness; adds get_operator_id; adjusts group indexing. |
amber/src/test/python/core/util/test_virtual_identity.py |
Adds new TestGetOperatorId coverage for operator-id extraction and invalid formats. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## main #5706 +/- ##
============================================
- Coverage 52.87% 52.84% -0.04%
+ Complexity 2627 2622 -5
============================================
Files 1090 1090
Lines 42176 42182 +6
Branches 4531 4531
============================================
- Hits 22302 22292 -10
- Misses 18569 18579 +10
- Partials 1305 1311 +6
*This pull request uses carry forward flags. Click here to find out more. ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
|
| config | throughput | MB/s | latency | max Δ latest / 7d | |
|---|---|---|---|---|---|
| 🔴 | bs=10 sw=10 sl=64 | 412 | 0.252 | 22,939/34,665/34,665 us | 🔴 +16.3% / ⚪ within ±5% |
| 🔴 | bs=100 sw=10 sl=64 | 925 | 0.565 | 107,898/133,719/133,719 us | 🔴 +9.5% / ⚪ within ±5% |
| ⚪ | bs=1000 sw=10 sl=64 | 1,082 | 0.66 | 922,319/962,987/962,987 us | ⚪ within ±5% / 🟢 -5.9% |
Baseline details
Latest main b21540d from same runner
| config | metric | PR | latest main | 7d avg | Δ latest | Δ 7d |
|---|---|---|---|---|---|---|
| bs=10 sw=10 sl=64 | throughput | 412 tuples/sec | 457 tuples/sec | 410.82 tuples/sec | -9.8% | +0.3% |
| bs=10 sw=10 sl=64 | MB/s | 0.252 MB/s | 0.279 MB/s | 0.251 MB/s | -9.7% | +0.5% |
| bs=10 sw=10 sl=64 | p50 | 22,939 us | 19,732 us | 23,785 us | +16.3% | -3.6% |
| bs=10 sw=10 sl=64 | p95 | 34,665 us | 31,351 us | 34,980 us | +10.6% | -0.9% |
| bs=10 sw=10 sl=64 | p99 | 34,665 us | 31,351 us | 34,980 us | +10.6% | -0.9% |
| bs=100 sw=10 sl=64 | throughput | 925 tuples/sec | 1,006 tuples/sec | 891.94 tuples/sec | -8.1% | +3.7% |
| bs=100 sw=10 sl=64 | MB/s | 0.565 MB/s | 0.614 MB/s | 0.544 MB/s | -8.0% | +3.8% |
| bs=100 sw=10 sl=64 | p50 | 107,898 us | 98,574 us | 112,277 us | +9.5% | -3.9% |
| bs=100 sw=10 sl=64 | p95 | 133,719 us | 124,883 us | 139,802 us | +7.1% | -4.4% |
| bs=100 sw=10 sl=64 | p99 | 133,719 us | 124,883 us | 139,802 us | +7.1% | -4.4% |
| bs=1000 sw=10 sl=64 | throughput | 1,082 tuples/sec | 1,127 tuples/sec | 1,041 tuples/sec | -4.0% | +3.9% |
| bs=1000 sw=10 sl=64 | MB/s | 0.66 MB/s | 0.688 MB/s | 0.635 MB/s | -4.1% | +3.9% |
| bs=1000 sw=10 sl=64 | p50 | 922,319 us | 888,800 us | 972,714 us | +3.8% | -5.2% |
| bs=1000 sw=10 sl=64 | p95 | 962,987 us | 938,261 us | 1,023,057 us | +2.6% | -5.9% |
| bs=1000 sw=10 sl=64 | p99 | 962,987 us | 938,261 us | 1,023,057 us | +2.6% | -5.9% |
Raw CSV
config_idx,batch_size,schema_width,string_len,num_batches,total_ms,total_tuples,total_bytes,tuples_per_sec,mb_per_sec,lat_p50_us,lat_p95_us,lat_p99_us
0,10,10,64,20,484.86,200,128000,412,0.252,22939.32,34664.94,34664.94
1,100,10,64,20,2161.81,2000,1280000,925,0.565,107897.77,133718.74,133718.74
2,1000,10,64,20,18487.53,20000,12800000,1082,0.660,922318.91,962987.01,962987.01…sage Address Copilot review on apache#5706: - Add trailing-junk regression tests for get_worker_index and get_operator_id so the re.match -> re.fullmatch change (whose only observable effect is rejecting a worker id with trailing junk) cannot silently regress. The existing partial-match tests only cover a missing component, which fails under both match and fullmatch. - Include the offending worker_id in get_worker_index's ValueError, matching get_operator_id, for easier debugging and consistency.
Address Yicong's feedback on apache#5706 — the Python side added `core.util.virtual_identity.get_operator_id` as a one-shot helper for extracting the logical operator id from a worker actor id. Mirror that on the Scala side via `VirtualIdentityUtils.getOperatorId` so both languages have the same API surface. - Add `getOperatorId(workerId: ActorVirtualIdentity): String` as a thin wrapper around `getPhysicalOpId(workerId).logicalOpId.id`. Docstring notes the deliberate semantic difference from the Python sibling (Scala preserves the existing `"__DummyOperator"` sentinel-on-miss behavior so it stays a drop-in replacement for inline call sites; Python raises ValueError for fail-loud semantics). - Migrate the one existing call site in `ErrorUtils.getOperatorFromActorIdOpt` to use the new helper. - Add VirtualIdentityUtilsSpec cases pinning: extraction on a worker actor name, agreement with `getPhysicalOpId(...).logicalOpId.id`, and the sentinel fallback for non-worker actor names.
Address Yicong's follow-up on apache#5706 — the bare name 'operator id' doesn't make the logical-vs-physical distinction visible at the call site. Rename to mirror the existing Scala 'getPhysicalOpId' convention: - Scala: VirtualIdentityUtils.getOperatorId -> getLogicalOpId - Python: core.util.virtual_identity.get_operator_id -> get_logical_op_id Updated all callers (ErrorUtils, both spec files) and the cross-language docstrings.
apache#5706) The worker-id helper PR (apache#5706) renamed the helper to get_logical_op_id to make the logical-vs-physical distinction explicit. Apply the same rename here so loop-feb stays internally consistent and rebases cleanly onto main once apache#5706 lands: - virtual_identity.py + its test: brought in line with apache#5706's final versions (rename, fail-loud error message including the worker id, and trailing-junk regression coverage) - main_loop._compute_loop_start_id: call get_logical_op_id - test_main_loop: update the helper name in two explanatory comments
What changes were proposed in this PR?
Centralizes the Python worker's worker-id parsing in
core/util/virtual_identity.py:get_operator_id(worker_id)— extracts the logical operator id from a worker actor name (Worker:WF<wf>-<op>-<layer>-<idx>), raisingValueErroron a malformed id.worker_name_patternto capture the workflow id and operator id explicitly.get_worker_indexandget_operator_idtore.fullmatch, so a malformed id with trailing junk now fails loudly instead of parsing silently — matching the ScalaVirtualIdentityUtils.getPhysicalOpIdfull-match semantics the docstring already claims.get_worker_index, well-formed idget_worker_index, malformed id (trailing junk)ValueErrorget_operator_idBehavior-preserving for well-formed worker ids.
get_operator_id's production caller lands with the for-loop feature; the helper and its test are independent and mergeable now.Any related issues, documentation, discussions?
Resolves #5708 (sub-issue of #4442 "Introduce for loop"). Split out of #5700 to keep that PR reviewable, per @Xiao-zhen-Liu's review.
How was this PR tested?
pytest src/test/python/core/util/test_virtual_identity.py— 23 passing, covering well-formed ids, the newget_operator_id, and malformed ids that now raiseValueError.ruff check/formatclean.Was this PR authored or co-authored using generative AI tooling?
Co-authored with Claude Opus 4.8 in compliance with ASF.