feature: Add Temporal workflow example for Scala 3 #463
Conversation
…exploration

Adds a uni-temporal-example module with two example workflows:

- HelloWorkflow: minimal workflow to demonstrate the basics
- DataPipelineWorkflow: multi-step pipeline (fetch → transform → store) with retry handling for transient failures

Includes ScalaDataConverter for Jackson/Scala 3 case class interop, and tests using Temporal's in-process TestWorkflowEnvironment.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
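The `ScalaDataConverter` mentioned in the commit message is not shown in this excerpt. A minimal sketch of how such a converter might wire `jackson-module-scala` into Temporal's data conversion, assuming the Java SDK's `JacksonJsonPayloadConverter(ObjectMapper)` constructor and `DefaultDataConverter.withPayloadConverterOverrides` (both exist in recent SDK versions); the actual implementation in the PR may differ:

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import io.temporal.common.converter.{DataConverter, DefaultDataConverter, JacksonJsonPayloadConverter}

object ScalaDataConverter:
  // ObjectMapper with the Scala module so case classes (which lack no-arg constructors) round-trip
  private val mapper = new ObjectMapper().registerModule(DefaultScalaModule)

  // Override only the JSON payload converter; other payload types keep the SDK defaults
  val converter: DataConverter =
    DefaultDataConverter.newDefaultInstance().withPayloadConverterOverrides(
      new JacksonJsonPayloadConverter(mapper)
    )
```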
/gemini review
Code Review
This pull request introduces a new module, uni-temporal-example, to explore Temporal workflow orchestration in Scala 3. It includes implementations for a simple 'Hello World' workflow and a multi-step data pipeline, along with unit tests and a local worker application. Feedback focuses on ensuring that WorkflowClient instances in both the worker and client applications are configured with the custom ScalaDataConverter to correctly serialize Scala 3 case classes. Additionally, it is recommended to use Temporal's ActivityExecutionContext instead of a manual AtomicInteger for tracking activity attempts to ensure robustness across worker instances.
```scala
logger.info("Connecting to local Temporal dev server...")

val service = WorkflowServiceStubs.newLocalServiceStubs()
val client = WorkflowClient.newInstance(service)
```
The WorkflowClient needs to be configured with ScalaDataConverter to correctly handle Scala 3 case classes (like PipelineInput and PipelineResult). Without this, the worker will fail to deserialize workflow inputs and activity results because the default Jackson converter doesn't support Scala 3 case classes without a no-arg constructor.
```scala
val client = WorkflowClient.newInstance(
  service,
  io.temporal.client.WorkflowClientOptions.newBuilder().setDataConverter(ScalaDataConverter.converter).build()
)
```

```scala
object TemporalClientApp extends App with LogSupport:

  val service = WorkflowServiceStubs.newLocalServiceStubs()
  val client = WorkflowClient.newInstance(service)
```
Similar to the worker, the client application must also use ScalaDataConverter to serialize Scala case classes when starting workflows. Otherwise, the Temporal server will receive incompatible payloads that the worker won't be able to process.
```scala
val client = WorkflowClient.newInstance(
  service,
  io.temporal.client.WorkflowClientOptions.newBuilder().setDataConverter(ScalaDataConverter.converter).build()
)
```

```scala
/**
 * DataActivities implementation with a simulated transient failure on the first fetch attempt.
 *
 * The static failCount tracks attempts across activity retries (the activity worker process is the
 * same in tests). In production each activity runs in its own thread; Temporal supplies the
 * attempt number via `ActivityExecutionContext`.
 */
class DataActivitiesImpl extends DataActivities with LogSupport:

  // Tracks how many times fetchData has been attempted (simulates transient failures)
  private val fetchAttempts = AtomicInteger(0)

  override def fetchData(sourceId: String): String =
    val attempt = fetchAttempts.incrementAndGet()
    logger.info(s"fetchData attempt ${attempt} for source '${sourceId}'")
    if attempt == 1 then
      // Simulate a transient failure on the first attempt
      throw RuntimeException(s"Transient network error fetching '${sourceId}' (attempt 1)")
    s"raw-data-for-${sourceId}"
```
Instead of using a manual AtomicInteger to track attempts, it is recommended to use Temporal's ActivityExecutionContext. This is more robust as it works correctly across different worker instances and provides the actual attempt number tracked by the Temporal server, which is useful for implementing custom retry logic or logging.
```scala
/**
 * DataActivities implementation with a simulated transient failure on the first fetch attempt.
 *
 * Temporal supplies the attempt number via ActivityExecutionContext, which is the recommended
 * way to track attempts across retries.
 */
class DataActivitiesImpl extends DataActivities with LogSupport:
  override def fetchData(sourceId: String): String =
    // Use Temporal's ActivityExecutionContext to get the current attempt number
    val attempt = io.temporal.activity.Activity.getExecutionContext().getInfo().getAttempt()
    logger.info(s"fetchData attempt ${attempt} for source '${sourceId}'")
    if attempt == 1 then
      // Simulate a transient failure on the first attempt
      throw RuntimeException(s"Transient network error fetching '${sourceId}' (attempt 1)")
    s"raw-data-for-${sourceId}"
```

…flow IDs

Address code review findings:

- TemporalWorkerApp and TemporalClientApp now use ScalaDataConverter for proper Scala case class serialization
- Workflow IDs use UUID to avoid WorkflowExecutionAlreadyStarted on re-runs

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
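The UUID-based workflow ID fix mentioned in the commit message is not shown in this excerpt; a sketch of what it might look like, using the real `WorkflowOptions.setWorkflowId` API (the ID prefix `data-pipeline-` is illustrative):

```scala
import java.util.UUID
import io.temporal.client.WorkflowOptions

// A fresh workflow ID per run avoids WorkflowExecutionAlreadyStarted when re-running the client app
val options = WorkflowOptions.newBuilder()
  .setTaskQueue(TemporalExample.TaskQueue)
  .setWorkflowId(s"data-pipeline-${UUID.randomUUID()}")
  .build()

val workflow = client.newWorkflowStub(classOf[DataPipelineWorkflow], options)
```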
Code Review
This pull request introduces a new module, uni-temporal-example, to explore Temporal workflow orchestration in Scala 3. It includes a design document, workflow and activity implementations for a data pipeline and a simple greeting service, custom Jackson serialization for Scala case classes, and integration tests. Feedback focuses on improving maintainability by extracting version constants in build.sbt, reducing test boilerplate through fixture methods, and optimizing workflow stub usage in tests.
build.sbt
Outdated
```scala
"com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.14.2",
// Redirect slf4j to airframe-log
"org.slf4j" % "slf4j-jdk14" % "2.0.17"
```
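The version-constant extraction suggested by the review (and applied in a later commit as `JACKSON_SCALA_VERSION`) could look like this sketch in build.sbt; the `SLF4J_VERSION` name is illustrative:

```scala
// Extracted version constants so versions are defined in one place
val JACKSON_SCALA_VERSION = "2.14.2"
val SLF4J_VERSION         = "2.0.17"

libraryDependencies ++= Seq(
  "com.fasterxml.jackson.module" %% "jackson-module-scala" % JACKSON_SCALA_VERSION,
  // Redirect slf4j to airframe-log
  "org.slf4j" % "slf4j-jdk14" % SLF4J_VERSION
)
```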
```scala
/**
 * Each test creates its own TestWorkflowEnvironment so state (e.g. attempt counters in
 * DataActivitiesImpl) is isolated between test runs.
 */
class DataPipelineWorkflowTest extends UniTest:
```
There is significant boilerplate code duplicated across the two test cases (environment setup, worker registration, stub creation). To improve maintainability and reduce duplication, consider extracting this common setup into a fixture method that uses the loan pattern. This would make the tests cleaner and easier to read.
For example:
```scala
private def withWorkflowStub(testCode: DataPipelineWorkflow => Unit): Unit = {
  val testEnv = newTestEnv()
  try {
    val worker = testEnv.newWorker(TemporalExample.TaskQueue)
    worker.registerWorkflowImplementationTypes(classOf[DataPipelineWorkflowImpl])
    worker.registerActivitiesImplementations(new DataActivitiesImpl())
    testEnv.start()
    val stub = testEnv.getWorkflowClient.newWorkflowStub(
      classOf[DataPipelineWorkflow],
      WorkflowOptions.newBuilder().setTaskQueue(TemporalExample.TaskQueue).build()
    )
    testCode(stub)
  } finally {
    testEnv.close()
  }
}

// Then in your test:
test("DataPipelineWorkflow completes after transient fetch failure") {
  withWorkflowStub { stub =>
    val result = stub.process(PipelineInput("sensor-42", "raw,data,stream"))
    // assertions...
  }
}
```

Note that `new DataActivitiesImpl()` is used to ensure state is not shared between tests.
```scala
for name <- Seq("Alice", "Bob", "Temporal") do
  val stub = client.newWorkflowStub(
    classOf[HelloWorkflow],
    WorkflowOptions.newBuilder().setTaskQueue(TemporalExample.TaskQueue).build()
  )
  stub.sayHello(name) shouldBe s"Hello, ${name}!"
```
The workflow stub is being recreated inside the for loop for each iteration. This is unnecessary and inefficient. The stub should be created once before the loop and reused.
```scala
val stub = client.newWorkflowStub(
  classOf[HelloWorkflow],
  WorkflowOptions.newBuilder().setTaskQueue(TemporalExample.TaskQueue).build()
)
for name <- Seq("Alice", "Bob", "Temporal") do
  stub.sayHello(name) shouldBe s"Hello, ${name}!"
```

```scala
/**
 * TestWorkflowEnvironment runs a full Temporal server in memory — no external process needed. This
 * allows testing workflow + activity integration without a real Temporal cluster.
 */
class HelloWorkflowTest extends UniTest:
```
| * allows testing workflow + activity integration without a real Temporal cluster. | ||
| */ | ||
| class HelloWorkflowTest extends UniTest: |
There is significant boilerplate code duplicated across the two test cases (environment setup, worker registration, stub creation). To improve maintainability and reduce duplication, consider extracting this common setup into a fixture method that uses the loan pattern. This would also address the inefficient stub creation in the second test.
- Use Activity.getExecutionContext() for attempt tracking instead of AtomicInteger
- Extract JACKSON_SCALA_VERSION constant in build.sbt
- Apply loan pattern to test fixtures to reduce boilerplate
- Wire ScalaDataConverter into worker/client apps (already done, refined)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Summary
- `uni-temporal-example` module with Temporal SDK (1.27.0) to explore workflow orchestration usability in Scala 3
- `HelloWorkflow` (basic greeting) and `DataPipelineWorkflow` (multi-step pipeline with retry handling)
- `ScalaDataConverter` for Jackson/Scala 3 case class serialization interop
- `TestWorkflowEnvironment` — no external server needed

Key findings so far
- `jackson-module-scala` for case class deserialization
- `TestWorkflowEnvironment` makes unit testing straightforward

Test plan
- `HelloWorkflowTest` — basic greeting workflow (2 tests)
- `DataPipelineWorkflowTest` — multi-step pipeline with transient failure retry (2 tests)
- `temporal server start-dev` (via `TemporalWorkerApp` / `TemporalClientApp`)

🤖 Generated with Claude Code
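For reference, the workflow interfaces exercised by these tests follow the Temporal Java SDK's annotation style. A minimal sketch of what `HelloWorkflow` might look like in Scala 3, using the real `@WorkflowInterface` / `@WorkflowMethod` annotations (the actual definitions are not included in this excerpt):

```scala
import io.temporal.workflow.{WorkflowInterface, WorkflowMethod}

@WorkflowInterface
trait HelloWorkflow:
  // The single entry point Temporal invokes for each workflow execution
  @WorkflowMethod
  def sayHello(name: String): String

// Implementation registered with the worker; must be a concrete class with a no-arg constructor
class HelloWorkflowImpl extends HelloWorkflow:
  override def sayHello(name: String): String = s"Hello, ${name}!"
```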