Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 25 additions & 4 deletions amber/src/main/python/core/util/virtual_identity.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,37 @@
ActorVirtualIdentity,
)

worker_name_pattern = re.compile(r"Worker:WF\d+-.+-(\w+)-(\d+)")
worker_name_pattern = re.compile(r"Worker:WF(\d+)-(.+)-(\w+)-(\d+)")

MATERIALIZATION_READER_ACTOR_PREFIX = "MATERIALIZATION_READER_"


def get_worker_index(worker_id: str) -> int:
match = worker_name_pattern.match(worker_id)
match = worker_name_pattern.fullmatch(worker_id)
if match:
return int(match.group(2))
raise ValueError("Invalid worker ID format")
return int(match.group(4))
raise ValueError(f"Invalid worker ID format: {worker_id}")


def get_logical_op_id(worker_id: str) -> str:
"""
Extract the logical operator id from a worker actor name of the form
``Worker:WF<workflowId>-<operatorId>-<layerName>-<workerIndex>``.

Returns the logical operator id only (the ``<operatorId>`` segment); the
physical operator id additionally carries the ``<layerName>``. Name
parallels Scala ``VirtualIdentityUtils.getLogicalOpId`` so the logical /
physical distinction is visible at every call site (the matching Scala
physical-id accessor is ``getPhysicalOpId``).

Unlike the Scala sibling (which returns a ``__DummyOperator`` sentinel
on a non-match), this raises ``ValueError`` so a malformed worker id
fails loudly rather than yielding a wrong id silently.
"""
match = worker_name_pattern.fullmatch(worker_id)
if match:
return match.group(2)
raise ValueError(f"Invalid worker ID format: {worker_id}")


def serialize_global_port_identity(obj: GlobalPortIdentity) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ object ErrorUtils {
var operatorId = "unknown operator"
var workerId = ""
if (actorIdOpt.isDefined) {
operatorId = VirtualIdentityUtils.getPhysicalOpId(actorIdOpt.get).logicalOpId.id
operatorId = VirtualIdentityUtils.getLogicalOpId(actorIdOpt.get)
workerId = actorIdOpt.get.name
}
(operatorId, workerId)
Expand Down
44 changes: 44 additions & 0 deletions amber/src/test/python/core/util/test_virtual_identity.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from core.util.virtual_identity import (
deserialize_global_port_identity,
get_from_actor_id_for_input_port_storage,
get_logical_op_id,
get_worker_index,
serialize_global_port_identity,
)
Expand Down Expand Up @@ -75,6 +76,49 @@ def test_extracts_trailing_index_even_when_layer_name_contains_hyphens(self):
# greedy `.+` and breaks the trailing match surfaces here.
assert get_worker_index("Worker:WF1-myOp-1st-physical-op-3") == 3
Comment thread
aglinxinyuan marked this conversation as resolved.

def test_raises_value_error_on_trailing_junk(self):
# fullmatch (not match) anchors the end of the string: a well-formed
# prefix followed by trailing junk must fail loudly. The old
# start-anchored match() would have silently returned 7 here.
with pytest.raises(ValueError, match="Invalid worker ID format"):
get_worker_index("Worker:WF1-myOp-main-7extra")


class TestGetLogicalOpId:
def test_extracts_operator_id_from_canonical_name(self):
assert get_logical_op_id("Worker:WF1-myOp-main-0") == "myOp"

def test_isolates_operator_id_containing_hyphens(self):
# Load-bearing: operator ids contain dashes; greedy `.+` must still
# stop at the final <layer>-<index> tokens.
assert (
get_logical_op_id("Worker:WF12-PythonUDFV2-abc-def-main-0")
== "PythonUDFV2-abc-def"
)

def test_handles_non_main_layer_and_nonzero_index(self):
# The exact case the old `rsplit("-main-0")` got silently wrong.
assert get_logical_op_id("Worker:WF3-op-loopLayer-7") == "op"

def test_operator_id_ending_in_digits(self):
assert get_logical_op_id("Worker:WF1-op123-main-0") == "op123"

def test_raises_value_error_on_special_actor_id(self):
# Companions like CONTROLLER / SELF must fail loudly, not return junk.
with pytest.raises(ValueError, match="Invalid worker ID format"):
get_logical_op_id("CONTROLLER")

def test_raises_value_error_on_partial_match(self):
with pytest.raises(ValueError, match="Invalid worker ID format"):
get_logical_op_id("Worker:WF1-myOp-main")

def test_raises_value_error_on_trailing_junk(self):
# fullmatch anchors the end: a valid-looking prefix with trailing junk
# must fail loudly. The old start-anchored match() would have silently
# returned "myOp" here.
with pytest.raises(ValueError, match="Invalid worker ID format"):
get_logical_op_id("Worker:WF1-myOp-main-0extra")


class TestSerializeGlobalPortIdentity:
def test_emits_documented_format_for_canonical_input(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,26 @@ object VirtualIdentityUtils {
}
}

/**
* Extract the logical operator id from a worker actor id of the form
* `Worker:WF<workflowId>-<operatorId>-<layerName>-<workerIndex>`.
*
* Returns the logical operator id only (the `<operatorId>` segment);
* the physical operator id additionally carries the `<layerName>` and
* is exposed by [[getPhysicalOpId]]. Method name parallels
* `getPhysicalOpId` so callers can distinguish the two at the call
* site; the Python sibling is `core.util.virtual_identity.get_logical_op_id`.
*
* The Python helper raises `ValueError` on a non-match for fail-loud
* semantics; this Scala helper preserves the existing sentinel-on-miss
* behavior (`"__DummyOperator"`) so it stays a drop-in replacement for
* the inline `getPhysicalOpId(workerId).logicalOpId.id` pattern at
* call sites.
*/
def getLogicalOpId(workerId: ActorVirtualIdentity): String = {
getPhysicalOpId(workerId).logicalOpId.id
}

def getWorkerIndex(workerId: ActorVirtualIdentity): Option[Int] = {
workerId.name match {
case workerNamePattern(_, _, _, idx) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,34 @@ class VirtualIdentityUtilsSpec extends AnyFlatSpec with Matchers {
}
}

// ----- getLogicalOpId -----

"getLogicalOpId" should "return the logical operator id from a worker actor name" in {
val actor = ActorVirtualIdentity("Worker:WF7-myOp-main-3")
VirtualIdentityUtils.getLogicalOpId(actor) shouldBe "myOp"
}

it should "match getPhysicalOpId(...).logicalOpId.id for worker actor names" in {
// Pin the helper as a thin wrapper — `getLogicalOpId(workerId)` and
// `getPhysicalOpId(workerId).logicalOpId.id` must always agree, so
// call sites that migrate to the helper are guaranteed to keep
// identical behavior.
val actor = ActorVirtualIdentity("Worker:WF1-multi-part-op-main-0")
VirtualIdentityUtils.getLogicalOpId(actor) shouldBe
VirtualIdentityUtils.getPhysicalOpId(actor).logicalOpId.id
}

it should "fall back to the __DummyOperator sentinel for non-worker actor names" in {
// The Python sibling raises ValueError on a non-match; the Scala
// helper preserves the existing __DummyOperator sentinel so it
// stays a drop-in replacement for the inline pattern at call sites
// (see VirtualIdentityUtils.getLogicalOpId docstring).
val controller = ActorVirtualIdentity("CONTROLLER")
VirtualIdentityUtils.getLogicalOpId(controller) shouldBe "__DummyOperator"
val self = ActorVirtualIdentity("SELF")
VirtualIdentityUtils.getLogicalOpId(self) shouldBe "__DummyOperator"
}

// ----- getWorkerIndex -----

"getWorkerIndex" should "return the trailing numeric workerId from a worker actor name" in {
Expand Down
Loading