diff --git a/amber/src/main/python/core/util/virtual_identity.py b/amber/src/main/python/core/util/virtual_identity.py index 49da75fcd58..6893e7e8f05 100644 --- a/amber/src/main/python/core/util/virtual_identity.py +++ b/amber/src/main/python/core/util/virtual_identity.py @@ -24,16 +24,37 @@ ActorVirtualIdentity, ) -worker_name_pattern = re.compile(r"Worker:WF\d+-.+-(\w+)-(\d+)") +worker_name_pattern = re.compile(r"Worker:WF(\d+)-(.+)-(\w+)-(\d+)") MATERIALIZATION_READER_ACTOR_PREFIX = "MATERIALIZATION_READER_" def get_worker_index(worker_id: str) -> int: - match = worker_name_pattern.match(worker_id) + match = worker_name_pattern.fullmatch(worker_id) if match: - return int(match.group(2)) - raise ValueError("Invalid worker ID format") + return int(match.group(4)) + raise ValueError(f"Invalid worker ID format: {worker_id}") + + +def get_logical_op_id(worker_id: str) -> str: + """ + Extract the logical operator id from a worker actor name of the form + ``Worker:WF---``. + + Returns the logical operator id only (the ```` segment); the + physical operator id additionally carries the ````. Name + parallels Scala ``VirtualIdentityUtils.getLogicalOpId`` so the logical / + physical distinction is visible at every call site (the matching Scala + physical-id accessor is ``getPhysicalOpId``). + + Unlike the Scala sibling (which returns a ``__DummyOperator`` sentinel + on a non-match), this raises ``ValueError`` so a malformed worker id + fails loudly rather than yielding a wrong id silently. + """ + match = worker_name_pattern.fullmatch(worker_id) + if match: + return match.group(2) + raise ValueError(f"Invalid worker ID format: {worker_id}") def serialize_global_port_identity(obj: GlobalPortIdentity) -> str: diff --git a/amber/src/main/scala/org/apache/texera/amber/error/ErrorUtils.scala b/amber/src/main/scala/org/apache/texera/amber/error/ErrorUtils.scala index af16044af29..d7c021524c0 100644 --- a/amber/src/main/scala/org/apache/texera/amber/error/ErrorUtils.scala +++ b/amber/src/main/scala/org/apache/texera/amber/error/ErrorUtils.scala @@ -110,7 +110,7 @@ object ErrorUtils { var operatorId = "unknown operator" var workerId = "" if (actorIdOpt.isDefined) { - operatorId = VirtualIdentityUtils.getPhysicalOpId(actorIdOpt.get).logicalOpId.id + operatorId = VirtualIdentityUtils.getLogicalOpId(actorIdOpt.get) workerId = actorIdOpt.get.name } (operatorId, workerId) diff --git a/amber/src/test/python/core/util/test_virtual_identity.py b/amber/src/test/python/core/util/test_virtual_identity.py index c2f6f636850..b94213c4143 100644 --- a/amber/src/test/python/core/util/test_virtual_identity.py +++ b/amber/src/test/python/core/util/test_virtual_identity.py @@ -20,6 +20,7 @@ from core.util.virtual_identity import ( deserialize_global_port_identity, get_from_actor_id_for_input_port_storage, + get_logical_op_id, get_worker_index, serialize_global_port_identity, ) @@ -75,6 +76,49 @@ def test_extracts_trailing_index_even_when_layer_name_contains_hyphens(self): # greedy `.+` and breaks the trailing match surfaces here. assert get_worker_index("Worker:WF1-myOp-1st-physical-op-3") == 3 + def test_raises_value_error_on_trailing_junk(self): + # fullmatch (not match) anchors the end of the string: a well-formed + # prefix followed by trailing junk must fail loudly. The old + # start-anchored match() would have silently returned 7 here. + with pytest.raises(ValueError, match="Invalid worker ID format"): + get_worker_index("Worker:WF1-myOp-main-7extra") + + +class TestGetLogicalOpId: + def test_extracts_operator_id_from_canonical_name(self): + assert get_logical_op_id("Worker:WF1-myOp-main-0") == "myOp" + + def test_isolates_operator_id_containing_hyphens(self): + # Load-bearing: operator ids contain dashes; greedy `.+` must still + # stop at the final - tokens. + assert ( + get_logical_op_id("Worker:WF12-PythonUDFV2-abc-def-main-0") + == "PythonUDFV2-abc-def" + ) + + def test_handles_non_main_layer_and_nonzero_index(self): + # The exact case the old `rsplit("-main-0")` got silently wrong. + assert get_logical_op_id("Worker:WF3-op-loopLayer-7") == "op" + + def test_operator_id_ending_in_digits(self): + assert get_logical_op_id("Worker:WF1-op123-main-0") == "op123" + + def test_raises_value_error_on_special_actor_id(self): + # Companions like CONTROLLER / SELF must fail loudly, not return junk. + with pytest.raises(ValueError, match="Invalid worker ID format"): + get_logical_op_id("CONTROLLER") + + def test_raises_value_error_on_partial_match(self): + with pytest.raises(ValueError, match="Invalid worker ID format"): + get_logical_op_id("Worker:WF1-myOp-main") + + def test_raises_value_error_on_trailing_junk(self): + # fullmatch anchors the end: a valid-looking prefix with trailing junk + # must fail loudly. The old start-anchored match() would have silently + # returned "myOp" here. + with pytest.raises(ValueError, match="Invalid worker ID format"): + get_logical_op_id("Worker:WF1-myOp-main-0extra") + class TestSerializeGlobalPortIdentity: def test_emits_documented_format_for_canonical_input(self): diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/VirtualIdentityUtils.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/VirtualIdentityUtils.scala index 586d77a9c55..f91cca6af3a 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/VirtualIdentityUtils.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/VirtualIdentityUtils.scala @@ -71,6 +71,26 @@ object VirtualIdentityUtils { } } + /** + * Extract the logical operator id from a worker actor id of the form + * `Worker:WF---`. + * + * Returns the logical operator id only (the `` segment); + * the physical operator id additionally carries the `` and + * is exposed by [[getPhysicalOpId]]. Method name parallels + * `getPhysicalOpId` so callers can distinguish the two at the call + * site; the Python sibling is `core.util.virtual_identity.get_logical_op_id`. + * + * The Python helper raises `ValueError` on a non-match for fail-loud + * semantics; this Scala helper preserves the existing sentinel-on-miss + * behavior (`"__DummyOperator"`) so it stays a drop-in replacement for + * the inline `getPhysicalOpId(workerId).logicalOpId.id` pattern at + * call sites. + */ + def getLogicalOpId(workerId: ActorVirtualIdentity): String = { + getPhysicalOpId(workerId).logicalOpId.id + } + def getWorkerIndex(workerId: ActorVirtualIdentity): Option[Int] = { workerId.name match { case workerNamePattern(_, _, _, idx) => diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/util/VirtualIdentityUtilsSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/util/VirtualIdentityUtilsSpec.scala index 8ebf7dabec1..58e58f68e4b 100644 --- a/common/workflow-core/src/test/scala/org/apache/texera/amber/util/VirtualIdentityUtilsSpec.scala +++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/util/VirtualIdentityUtilsSpec.scala @@ -94,6 +94,34 @@ class VirtualIdentityUtilsSpec extends AnyFlatSpec with Matchers { } } + // ----- getLogicalOpId ----- + + "getLogicalOpId" should "return the logical operator id from a worker actor name" in { + val actor = ActorVirtualIdentity("Worker:WF7-myOp-main-3") + VirtualIdentityUtils.getLogicalOpId(actor) shouldBe "myOp" + } + + it should "match getPhysicalOpId(...).logicalOpId.id for worker actor names" in { + // Pin the helper as a thin wrapper — `getLogicalOpId(workerId)` and + // `getPhysicalOpId(workerId).logicalOpId.id` must always agree, so + // call sites that migrate to the helper are guaranteed to keep + // identical behavior. + val actor = ActorVirtualIdentity("Worker:WF1-multi-part-op-main-0") + VirtualIdentityUtils.getLogicalOpId(actor) shouldBe + VirtualIdentityUtils.getPhysicalOpId(actor).logicalOpId.id + } + + it should "fall back to the __DummyOperator sentinel for non-worker actor names" in { + // The Python sibling raises ValueError on a non-match; the Scala + // helper preserves the existing __DummyOperator sentinel so it + // stays a drop-in replacement for the inline pattern at call sites + // (see VirtualIdentityUtils.getLogicalOpId docstring). + val controller = ActorVirtualIdentity("CONTROLLER") + VirtualIdentityUtils.getLogicalOpId(controller) shouldBe "__DummyOperator" + val self = ActorVirtualIdentity("SELF") + VirtualIdentityUtils.getLogicalOpId(self) shouldBe "__DummyOperator" + } + // ----- getWorkerIndex ----- "getWorkerIndex" should "return the trailing numeric workerId from a worker actor name" in {