From 0c5fc9fe2114366289cc19cbda24b862de1d6a67 Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Sat, 13 Jun 2026 18:52:16 -0700 Subject: [PATCH 1/4] refactor(amber): centralize worker-id parsing in virtual_identity Generalize the worker-name regex so the workflow id and operator id are captured explicitly, and add `get_operator_id` to extract the logical operator id from a worker actor name. Both `get_worker_index` and `get_operator_id` use `re.fullmatch`, so a malformed worker id with trailing junk now fails loudly with a `ValueError` instead of parsing silently -- mirroring the Scala `VirtualIdentityUtils.getPhysicalOpId` full-match semantics referenced in the docstring. `get_worker_index` returns the same value for well-formed ids (the worker index is still the final capture group); `get_operator_id` is new and gains its production caller with the loop feature. Split out of the loop-operators PR (apache/texera#5700) to keep that PR reviewable. This helper is independent of the rest of that work and behavior-preserving for well-formed worker ids. 
--- .../main/python/core/util/virtual_identity.py | 22 ++++++++++++-- .../python/core/util/test_virtual_identity.py | 30 +++++++++++++++++++ 2 files changed, 49 insertions(+), 3 deletions(-) diff --git a/amber/src/main/python/core/util/virtual_identity.py b/amber/src/main/python/core/util/virtual_identity.py index 49da75fcd58..143aa86d0e4 100644 --- a/amber/src/main/python/core/util/virtual_identity.py +++ b/amber/src/main/python/core/util/virtual_identity.py @@ -24,18 +24,34 @@ ActorVirtualIdentity, ) -worker_name_pattern = re.compile(r"Worker:WF\d+-.+-(\w+)-(\d+)") +worker_name_pattern = re.compile(r"Worker:WF(\d+)-(.+)-(\w+)-(\d+)") MATERIALIZATION_READER_ACTOR_PREFIX = "MATERIALIZATION_READER_" def get_worker_index(worker_id: str) -> int: - match = worker_name_pattern.match(worker_id) + match = worker_name_pattern.fullmatch(worker_id) if match: - return int(match.group(2)) + return int(match.group(4)) raise ValueError("Invalid worker ID format") +def get_operator_id(worker_id: str) -> str: + """ + Extract the logical operator id from a worker actor name of the form + ``Worker:WF<workflowId>-<operatorId>-<layerName>-<workerIndex>``. + + Mirrors the canonical parse in Scala ``VirtualIdentityUtils.getPhysicalOpId``. + Unlike the Scala sibling (which returns a ``__DummyOperator`` sentinel on a + non-match), this raises ``ValueError`` so a malformed worker id fails loudly + rather than yielding a wrong id silently. + """ + match = worker_name_pattern.fullmatch(worker_id) + if match: + return match.group(2) + raise ValueError(f"Invalid worker ID format: {worker_id}") + + def serialize_global_port_identity(obj: GlobalPortIdentity) -> str: """ Serialize GlobalPortIdentity into a custom human-readable string. 
diff --git a/amber/src/test/python/core/util/test_virtual_identity.py b/amber/src/test/python/core/util/test_virtual_identity.py index c2f6f636850..4eabd5a08f7 100644 --- a/amber/src/test/python/core/util/test_virtual_identity.py +++ b/amber/src/test/python/core/util/test_virtual_identity.py @@ -20,6 +20,7 @@ from core.util.virtual_identity import ( deserialize_global_port_identity, get_from_actor_id_for_input_port_storage, + get_operator_id, get_worker_index, serialize_global_port_identity, ) @@ -76,6 +77,35 @@ def test_extracts_trailing_index_even_when_layer_name_contains_hyphens(self): assert get_worker_index("Worker:WF1-myOp-1st-physical-op-3") == 3 +class TestGetOperatorId: + def test_extracts_operator_id_from_canonical_name(self): + assert get_operator_id("Worker:WF1-myOp-main-0") == "myOp" + + def test_isolates_operator_id_containing_hyphens(self): + # Load-bearing: operator ids contain dashes; greedy `.+` must still + # stop at the final - tokens. + assert ( + get_operator_id("Worker:WF12-PythonUDFV2-abc-def-main-0") + == "PythonUDFV2-abc-def" + ) + + def test_handles_non_main_layer_and_nonzero_index(self): + # The exact case the old `rsplit("-main-0")` got silently wrong. + assert get_operator_id("Worker:WF3-op-loopLayer-7") == "op" + + def test_operator_id_ending_in_digits(self): + assert get_operator_id("Worker:WF1-op123-main-0") == "op123" + + def test_raises_value_error_on_special_actor_id(self): + # Companions like CONTROLLER / SELF must fail loudly, not return junk. 
+ with pytest.raises(ValueError, match="Invalid worker ID format"): + get_operator_id("CONTROLLER") + + def test_raises_value_error_on_partial_match(self): + with pytest.raises(ValueError, match="Invalid worker ID format"): + get_operator_id("Worker:WF1-myOp-main") + + class TestSerializeGlobalPortIdentity: def test_emits_documented_format_for_canonical_input(self): encoded = serialize_global_port_identity(_gpi()) From a749c4696c07a02eeb1e08cc42d6c46e40d5ea88 Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Sat, 13 Jun 2026 19:30:35 -0700 Subject: [PATCH 2/4] test(amber): pin fullmatch trailing-junk behavior and align error message Address Copilot review on apache/texera#5706: - Add trailing-junk regression tests for get_worker_index and get_operator_id so the re.match -> re.fullmatch change (whose only observable effect is rejecting a worker id with trailing junk) cannot silently regress. The existing partial-match tests only cover a missing component, which fails under both match and fullmatch. - Include the offending worker_id in get_worker_index's ValueError, matching get_operator_id, for easier debugging and consistency. 
--- .../src/main/python/core/util/virtual_identity.py | 2 +- .../test/python/core/util/test_virtual_identity.py | 14 ++++++++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/amber/src/main/python/core/util/virtual_identity.py b/amber/src/main/python/core/util/virtual_identity.py index 143aa86d0e4..73db04fa5ca 100644 --- a/amber/src/main/python/core/util/virtual_identity.py +++ b/amber/src/main/python/core/util/virtual_identity.py @@ -33,7 +33,7 @@ def get_worker_index(worker_id: str) -> int: match = worker_name_pattern.fullmatch(worker_id) if match: return int(match.group(4)) - raise ValueError("Invalid worker ID format") + raise ValueError(f"Invalid worker ID format: {worker_id}") def get_operator_id(worker_id: str) -> str: diff --git a/amber/src/test/python/core/util/test_virtual_identity.py b/amber/src/test/python/core/util/test_virtual_identity.py index 4eabd5a08f7..ea4fe03b37b 100644 --- a/amber/src/test/python/core/util/test_virtual_identity.py +++ b/amber/src/test/python/core/util/test_virtual_identity.py @@ -76,6 +76,13 @@ def test_extracts_trailing_index_even_when_layer_name_contains_hyphens(self): # greedy `.+` and breaks the trailing match surfaces here. assert get_worker_index("Worker:WF1-myOp-1st-physical-op-3") == 3 + def test_raises_value_error_on_trailing_junk(self): + # fullmatch (not match) anchors the end of the string: a well-formed + # prefix followed by trailing junk must fail loudly. The old + # start-anchored match() would have silently returned 7 here. 
+ with pytest.raises(ValueError, match="Invalid worker ID format"): + get_worker_index("Worker:WF1-myOp-main-7extra") + class TestGetOperatorId: def test_extracts_operator_id_from_canonical_name(self): @@ -105,6 +112,13 @@ def test_raises_value_error_on_partial_match(self): with pytest.raises(ValueError, match="Invalid worker ID format"): get_operator_id("Worker:WF1-myOp-main") + def test_raises_value_error_on_trailing_junk(self): + # fullmatch anchors the end: a valid-looking prefix with trailing junk + # must fail loudly. The old start-anchored match() would have silently + # returned "myOp" here. + with pytest.raises(ValueError, match="Invalid worker ID format"): + get_operator_id("Worker:WF1-myOp-main-0extra") + class TestSerializeGlobalPortIdentity: def test_emits_documented_format_for_canonical_input(self): From f4eedd5601bde816ec8249525568249e56f81ba0 Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Sat, 13 Jun 2026 22:33:09 -0700 Subject: [PATCH 3/4] refactor(amber): add Scala getOperatorId helper for parity with Python MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address Yicong's feedback on #5706 — the Python side added `core.util.virtual_identity.get_operator_id` as a one-shot helper for extracting the logical operator id from a worker actor id. Mirror that on the Scala side via `VirtualIdentityUtils.getOperatorId` so both languages have the same API surface. - Add `getOperatorId(workerId: ActorVirtualIdentity): String` as a thin wrapper around `getPhysicalOpId(workerId).logicalOpId.id`. Docstring notes the deliberate semantic difference from the Python sibling (Scala preserves the existing `"__DummyOperator"` sentinel-on-miss behavior so it stays a drop-in replacement for inline call sites; Python raises ValueError for fail-loud semantics). - Migrate the one existing call site in `ErrorUtils.getOperatorFromActorIdOpt` to use the new helper. 
- Add VirtualIdentityUtilsSpec cases pinning: extraction on a worker actor name, agreement with `getPhysicalOpId(...).logicalOpId.id`, and the sentinel fallback for non-worker actor names. --- .../texera/amber/error/ErrorUtils.scala | 2 +- .../amber/util/VirtualIdentityUtils.scala | 15 ++++++++++ .../amber/util/VirtualIdentityUtilsSpec.scala | 28 +++++++++++++++++++ 3 files changed, 44 insertions(+), 1 deletion(-) diff --git a/amber/src/main/scala/org/apache/texera/amber/error/ErrorUtils.scala b/amber/src/main/scala/org/apache/texera/amber/error/ErrorUtils.scala index af16044af29..84d232feea2 100644 --- a/amber/src/main/scala/org/apache/texera/amber/error/ErrorUtils.scala +++ b/amber/src/main/scala/org/apache/texera/amber/error/ErrorUtils.scala @@ -110,7 +110,7 @@ object ErrorUtils { var operatorId = "unknown operator" var workerId = "" if (actorIdOpt.isDefined) { - operatorId = VirtualIdentityUtils.getPhysicalOpId(actorIdOpt.get).logicalOpId.id + operatorId = VirtualIdentityUtils.getOperatorId(actorIdOpt.get) workerId = actorIdOpt.get.name } (operatorId, workerId) diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/VirtualIdentityUtils.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/VirtualIdentityUtils.scala index 586d77a9c55..0baa972f910 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/VirtualIdentityUtils.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/VirtualIdentityUtils.scala @@ -71,6 +71,21 @@ object VirtualIdentityUtils { } } + /** + * Extract the logical operator id from a worker actor id of the form + * `Worker:WF<workflowId>-<operatorId>-<layerName>-<workerIndex>`. + * + * Convenience wrapper for the common `getPhysicalOpId(workerId).logicalOpId.id` + * pattern; the Python sibling is `core.util.virtual_identity.get_operator_id`. 
+ * The Python helper raises `ValueError` on a non-match for fail-loud + * semantics; this Scala helper preserves the existing sentinel-on-miss + * behavior (`"__DummyOperator"`) so it stays a drop-in replacement for + * the inline pattern at call sites. + */ + def getOperatorId(workerId: ActorVirtualIdentity): String = { + getPhysicalOpId(workerId).logicalOpId.id + } + def getWorkerIndex(workerId: ActorVirtualIdentity): Option[Int] = { workerId.name match { case workerNamePattern(_, _, _, idx) => diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/util/VirtualIdentityUtilsSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/util/VirtualIdentityUtilsSpec.scala index 8ebf7dabec1..b9f9f042c92 100644 --- a/common/workflow-core/src/test/scala/org/apache/texera/amber/util/VirtualIdentityUtilsSpec.scala +++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/util/VirtualIdentityUtilsSpec.scala @@ -94,6 +94,34 @@ class VirtualIdentityUtilsSpec extends AnyFlatSpec with Matchers { } } + // ----- getOperatorId ----- + + "getOperatorId" should "return the logical operator id from a worker actor name" in { + val actor = ActorVirtualIdentity("Worker:WF7-myOp-main-3") + VirtualIdentityUtils.getOperatorId(actor) shouldBe "myOp" + } + + it should "match getPhysicalOpId(...).logicalOpId.id for worker actor names" in { + // Pin the helper as a thin wrapper — `getOperatorId(workerId)` and + // `getPhysicalOpId(workerId).logicalOpId.id` must always agree, so + // call sites that migrate to the helper are guaranteed to keep + // identical behavior. 
+ val actor = ActorVirtualIdentity("Worker:WF1-multi-part-op-main-0") + VirtualIdentityUtils.getOperatorId(actor) shouldBe + VirtualIdentityUtils.getPhysicalOpId(actor).logicalOpId.id + } + + it should "fall back to the __DummyOperator sentinel for non-worker actor names" in { + // The Python sibling raises ValueError on a non-match; the Scala + // helper preserves the existing __DummyOperator sentinel so it + // stays a drop-in replacement for the inline pattern at call sites + // (see VirtualIdentityUtils.getOperatorId docstring). + val controller = ActorVirtualIdentity("CONTROLLER") + VirtualIdentityUtils.getOperatorId(controller) shouldBe "__DummyOperator" + val self = ActorVirtualIdentity("SELF") + VirtualIdentityUtils.getOperatorId(self) shouldBe "__DummyOperator" + } + // ----- getWorkerIndex ----- "getWorkerIndex" should "return the trailing numeric workerId from a worker actor name" in { From 0c8a7e7307b0743004405fdcb4c41076e25a93c5 Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Sat, 13 Jun 2026 22:37:45 -0700 Subject: [PATCH 4/4] refactor(amber): rename getOperatorId -> getLogicalOpId on both sides MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address Yicong's follow-up on #5706 — the bare name 'operator id' doesn't make the logical-vs-physical distinction visible at the call site. Rename to mirror the existing Scala 'getPhysicalOpId' convention: - Scala: VirtualIdentityUtils.getOperatorId -> getLogicalOpId - Python: core.util.virtual_identity.get_operator_id -> get_logical_op_id Updated all callers (ErrorUtils, both spec files) and the cross-language docstrings. 
--- .../main/python/core/util/virtual_identity.py | 15 ++++++++++----- .../apache/texera/amber/error/ErrorUtils.scala | 2 +- .../python/core/util/test_virtual_identity.py | 18 +++++++++--------- .../amber/util/VirtualIdentityUtils.scala | 13 +++++++++---- .../amber/util/VirtualIdentityUtilsSpec.scala | 16 ++++++++-------- 5 files changed, 37 insertions(+), 27 deletions(-) diff --git a/amber/src/main/python/core/util/virtual_identity.py b/amber/src/main/python/core/util/virtual_identity.py index 73db04fa5ca..6893e7e8f05 100644 --- a/amber/src/main/python/core/util/virtual_identity.py +++ b/amber/src/main/python/core/util/virtual_identity.py @@ -36,15 +36,20 @@ def get_worker_index(worker_id: str) -> int: raise ValueError(f"Invalid worker ID format: {worker_id}") -def get_operator_id(worker_id: str) -> str: +def get_logical_op_id(worker_id: str) -> str: """ Extract the logical operator id from a worker actor name of the form ``Worker:WF<workflowId>-<operatorId>-<layerName>-<workerIndex>``. - Mirrors the canonical parse in Scala ``VirtualIdentityUtils.getPhysicalOpId``. - Unlike the Scala sibling (which returns a ``__DummyOperator`` sentinel on a - non-match), this raises ``ValueError`` so a malformed worker id fails loudly - rather than yielding a wrong id silently. + Returns the logical operator id only (the ``<operatorId>`` segment); the + physical operator id additionally carries the ``<layerName>``. Name + parallels Scala ``VirtualIdentityUtils.getLogicalOpId`` so the logical / + physical distinction is visible at every call site (the matching Scala + physical-id accessor is ``getPhysicalOpId``). + + Unlike the Scala sibling (which returns a ``__DummyOperator`` sentinel + on a non-match), this raises ``ValueError`` so a malformed worker id + fails loudly rather than yielding a wrong id silently. 
""" match = worker_name_pattern.fullmatch(worker_id) if match: diff --git a/amber/src/main/scala/org/apache/texera/amber/error/ErrorUtils.scala b/amber/src/main/scala/org/apache/texera/amber/error/ErrorUtils.scala index 84d232feea2..d7c021524c0 100644 --- a/amber/src/main/scala/org/apache/texera/amber/error/ErrorUtils.scala +++ b/amber/src/main/scala/org/apache/texera/amber/error/ErrorUtils.scala @@ -110,7 +110,7 @@ object ErrorUtils { var operatorId = "unknown operator" var workerId = "" if (actorIdOpt.isDefined) { - operatorId = VirtualIdentityUtils.getOperatorId(actorIdOpt.get) + operatorId = VirtualIdentityUtils.getLogicalOpId(actorIdOpt.get) workerId = actorIdOpt.get.name } (operatorId, workerId) diff --git a/amber/src/test/python/core/util/test_virtual_identity.py b/amber/src/test/python/core/util/test_virtual_identity.py index ea4fe03b37b..b94213c4143 100644 --- a/amber/src/test/python/core/util/test_virtual_identity.py +++ b/amber/src/test/python/core/util/test_virtual_identity.py @@ -20,7 +20,7 @@ from core.util.virtual_identity import ( deserialize_global_port_identity, get_from_actor_id_for_input_port_storage, - get_operator_id, + get_logical_op_id, get_worker_index, serialize_global_port_identity, ) @@ -84,40 +84,40 @@ def test_raises_value_error_on_trailing_junk(self): get_worker_index("Worker:WF1-myOp-main-7extra") -class TestGetOperatorId: +class TestGetLogicalOpId: def test_extracts_operator_id_from_canonical_name(self): - assert get_operator_id("Worker:WF1-myOp-main-0") == "myOp" + assert get_logical_op_id("Worker:WF1-myOp-main-0") == "myOp" def test_isolates_operator_id_containing_hyphens(self): # Load-bearing: operator ids contain dashes; greedy `.+` must still # stop at the final - tokens. 
assert ( - get_operator_id("Worker:WF12-PythonUDFV2-abc-def-main-0") + get_logical_op_id("Worker:WF12-PythonUDFV2-abc-def-main-0") == "PythonUDFV2-abc-def" ) def test_handles_non_main_layer_and_nonzero_index(self): # The exact case the old `rsplit("-main-0")` got silently wrong. - assert get_operator_id("Worker:WF3-op-loopLayer-7") == "op" + assert get_logical_op_id("Worker:WF3-op-loopLayer-7") == "op" def test_operator_id_ending_in_digits(self): - assert get_operator_id("Worker:WF1-op123-main-0") == "op123" + assert get_logical_op_id("Worker:WF1-op123-main-0") == "op123" def test_raises_value_error_on_special_actor_id(self): # Companions like CONTROLLER / SELF must fail loudly, not return junk. with pytest.raises(ValueError, match="Invalid worker ID format"): - get_operator_id("CONTROLLER") + get_logical_op_id("CONTROLLER") def test_raises_value_error_on_partial_match(self): with pytest.raises(ValueError, match="Invalid worker ID format"): - get_operator_id("Worker:WF1-myOp-main") + get_logical_op_id("Worker:WF1-myOp-main") def test_raises_value_error_on_trailing_junk(self): # fullmatch anchors the end: a valid-looking prefix with trailing junk # must fail loudly. The old start-anchored match() would have silently # returned "myOp" here. 
with pytest.raises(ValueError, match="Invalid worker ID format"): - get_operator_id("Worker:WF1-myOp-main-0extra") + get_logical_op_id("Worker:WF1-myOp-main-0extra") class TestSerializeGlobalPortIdentity: diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/VirtualIdentityUtils.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/VirtualIdentityUtils.scala index 0baa972f910..f91cca6af3a 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/VirtualIdentityUtils.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/VirtualIdentityUtils.scala @@ -75,14 +75,19 @@ object VirtualIdentityUtils { * Extract the logical operator id from a worker actor id of the form * `Worker:WF<workflowId>-<operatorId>-<layerName>-<workerIndex>`. * - * Convenience wrapper for the common `getPhysicalOpId(workerId).logicalOpId.id` - * pattern; the Python sibling is `core.util.virtual_identity.get_operator_id`. + * Returns the logical operator id only (the `<operatorId>` segment); + * the physical operator id additionally carries the `<layerName>` and + * is exposed by [[getPhysicalOpId]]. Method name parallels + * `getPhysicalOpId` so callers can distinguish the two at the call + * site; the Python sibling is `core.util.virtual_identity.get_logical_op_id`. + * * The Python helper raises `ValueError` on a non-match for fail-loud * semantics; this Scala helper preserves the existing sentinel-on-miss * behavior (`"__DummyOperator"`) so it stays a drop-in replacement for - * the inline pattern at call sites. + * the inline `getPhysicalOpId(workerId).logicalOpId.id` pattern at + * call sites. 
*/ - def getOperatorId(workerId: ActorVirtualIdentity): String = { + def getLogicalOpId(workerId: ActorVirtualIdentity): String = { getPhysicalOpId(workerId).logicalOpId.id } diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/util/VirtualIdentityUtilsSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/util/VirtualIdentityUtilsSpec.scala index b9f9f042c92..58e58f68e4b 100644 --- a/common/workflow-core/src/test/scala/org/apache/texera/amber/util/VirtualIdentityUtilsSpec.scala +++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/util/VirtualIdentityUtilsSpec.scala @@ -94,20 +94,20 @@ class VirtualIdentityUtilsSpec extends AnyFlatSpec with Matchers { } } - // ----- getOperatorId ----- + // ----- getLogicalOpId ----- - "getOperatorId" should "return the logical operator id from a worker actor name" in { + "getLogicalOpId" should "return the logical operator id from a worker actor name" in { val actor = ActorVirtualIdentity("Worker:WF7-myOp-main-3") - VirtualIdentityUtils.getOperatorId(actor) shouldBe "myOp" + VirtualIdentityUtils.getLogicalOpId(actor) shouldBe "myOp" } it should "match getPhysicalOpId(...).logicalOpId.id for worker actor names" in { - // Pin the helper as a thin wrapper — `getOperatorId(workerId)` and + // Pin the helper as a thin wrapper — `getLogicalOpId(workerId)` and // `getPhysicalOpId(workerId).logicalOpId.id` must always agree, so // call sites that migrate to the helper are guaranteed to keep // identical behavior. 
val actor = ActorVirtualIdentity("Worker:WF1-multi-part-op-main-0") - VirtualIdentityUtils.getOperatorId(actor) shouldBe + VirtualIdentityUtils.getLogicalOpId(actor) shouldBe VirtualIdentityUtils.getPhysicalOpId(actor).logicalOpId.id } @@ -115,11 +115,11 @@ class VirtualIdentityUtilsSpec extends AnyFlatSpec with Matchers { // The Python sibling raises ValueError on a non-match; the Scala // helper preserves the existing __DummyOperator sentinel so it // stays a drop-in replacement for the inline pattern at call sites - // (see VirtualIdentityUtils.getOperatorId docstring). + // (see VirtualIdentityUtils.getLogicalOpId docstring). val controller = ActorVirtualIdentity("CONTROLLER") - VirtualIdentityUtils.getOperatorId(controller) shouldBe "__DummyOperator" + VirtualIdentityUtils.getLogicalOpId(controller) shouldBe "__DummyOperator" val self = ActorVirtualIdentity("SELF") - VirtualIdentityUtils.getOperatorId(self) shouldBe "__DummyOperator" + VirtualIdentityUtils.getLogicalOpId(self) shouldBe "__DummyOperator" } // ----- getWorkerIndex -----