Skip to content

feature: Add Temporal workflow example for Scala 3#463

Open
xerial wants to merge 3 commits intomainfrom
feature/temporal-example
Open

feature: Add Temporal workflow example for Scala 3#463
xerial wants to merge 3 commits intomainfrom
feature/temporal-example

Conversation

@xerial
Copy link
Copy Markdown
Member

@xerial xerial commented Apr 5, 2026

Summary

  • Adds uni-temporal-example module with Temporal SDK (1.27.0) to explore workflow orchestration usability in Scala 3
  • Implements two example workflows: HelloWorkflow (basic greeting) and DataPipelineWorkflow (multi-step pipeline with retry handling)
  • Includes ScalaDataConverter for Jackson/Scala 3 case class serialization interop
  • Tests use Temporal's in-process TestWorkflowEnvironment — no external server needed

Key findings so far

  • Temporal Java SDK works with Scala 3 traits as workflow/activity interfaces
  • Jackson requires jackson-module-scala for case class deserialization
  • TestWorkflowEnvironment makes unit testing straightforward
  • Workflow/Activity interface separation is enforced but natural in Scala

Test plan

  • HelloWorkflowTest — basic greeting workflow (2 tests)
  • DataPipelineWorkflowTest — multi-step pipeline with transient failure retry (2 tests)
  • Manual testing with temporal server start-dev (via TemporalWorkerApp / TemporalClientApp)

🤖 Generated with Claude Code

…exploration

Adds a uni-temporal-example module with two example workflows:
- HelloWorkflow: minimal workflow to demonstrate the basics
- DataPipelineWorkflow: multi-step pipeline (fetch → transform → store)
  with retry handling for transient failures

Includes ScalaDataConverter for Jackson/Scala 3 case class interop,
and tests using Temporal's in-process TestWorkflowEnvironment.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@github-actions github-actions bot added doc Improvements or additions to documentation feature New feature labels Apr 5, 2026
@xerial
Copy link
Copy Markdown
Member Author

xerial commented Apr 5, 2026

/gemini review

Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a new module, uni-temporal-example, to explore Temporal workflow orchestration in Scala 3. It includes implementations for a simple 'Hello World' workflow and a multi-step data pipeline, along with unit tests and a local worker application. Feedback focuses on ensuring that WorkflowClient instances in both the worker and client applications are configured with the custom ScalaDataConverter to correctly serialize Scala 3 case classes. Additionally, it is recommended to use Temporal's ActivityExecutionContext instead of a manual AtomicInteger for tracking activity attempts to ensure robustness across worker instances.

logger.info("Connecting to local Temporal dev server...")

val service = WorkflowServiceStubs.newLocalServiceStubs()
val client = WorkflowClient.newInstance(service)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The WorkflowClient needs to be configured with ScalaDataConverter to correctly handle Scala 3 case classes (like PipelineInput and PipelineResult). Without this, the worker will fail to deserialize workflow inputs and activity results because the default Jackson converter doesn't support Scala 3 case classes without a no-arg constructor.

  val client  = WorkflowClient.newInstance(
    service,
    io.temporal.client.WorkflowClientOptions.newBuilder().setDataConverter(ScalaDataConverter.converter).build()
  )

object TemporalClientApp extends App with LogSupport:

val service = WorkflowServiceStubs.newLocalServiceStubs()
val client = WorkflowClient.newInstance(service)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Similar to the worker, the client application must also use ScalaDataConverter to serialize Scala case classes when starting workflows. Otherwise, the Temporal server will receive incompatible payloads that the worker won't be able to process.

  val client  = WorkflowClient.newInstance(
    service,
    io.temporal.client.WorkflowClientOptions.newBuilder().setDataConverter(ScalaDataConverter.converter).build()
  )

Comment on lines +47 to +64
* DataActivities implementation with a simulated transient failure on the first fetch attempt.
*
* The static failCount tracks attempts across activity retries (the activity worker process is the
* same in tests). In production each activity runs in its own thread; Temporal supplies the
* attempt number via `ActivityExecutionContext`.
*/
class DataActivitiesImpl extends DataActivities with LogSupport:

// Tracks how many times fetchData has been attempted (simulates transient failures)
private val fetchAttempts = AtomicInteger(0)

override def fetchData(sourceId: String): String =
val attempt = fetchAttempts.incrementAndGet()
logger.info(s"fetchData attempt ${attempt} for source '${sourceId}'")
if attempt == 1 then
// Simulate a transient failure on the first attempt
throw RuntimeException(s"Transient network error fetching '${sourceId}' (attempt 1)")
s"raw-data-for-${sourceId}"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Instead of using a manual AtomicInteger to track attempts, it is recommended to use Temporal's ActivityExecutionContext. This is more robust as it works correctly across different worker instances and provides the actual attempt number tracked by the Temporal server, which is useful for implementing custom retry logic or logging.

/**
  * DataActivities implementation with a simulated transient failure on the first fetch attempt.
  *
  * Temporal supplies the attempt number via ActivityExecutionContext, which is the recommended
  * way to track attempts across retries.
  */
class DataActivitiesImpl extends DataActivities with LogSupport:

  override def fetchData(sourceId: String): String =
    // Use Temporal's ActivityExecutionContext to get the current attempt number
    val attempt = io.temporal.activity.Activity.getExecutionContext().getInfo().getAttempt()
    logger.info(s"fetchData attempt ${attempt} for source '${sourceId}'")
    if attempt == 1 then
      // Simulate a transient failure on the first attempt
      throw RuntimeException(s"Transient network error fetching '${sourceId}' (attempt 1)")
    s"raw-data-for-${sourceId}"

…flow IDs

Address code review findings:
- TemporalWorkerApp and TemporalClientApp now use ScalaDataConverter
  for proper Scala case class serialization
- Workflow IDs use UUID to avoid WorkflowExecutionAlreadyStarted on re-runs

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a new module, uni-temporal-example, to explore Temporal workflow orchestration in Scala 3. It includes a design document, workflow and activity implementations for a data pipeline and a simple greeting service, custom Jackson serialization for Scala case classes, and integration tests. Feedback focuses on improving maintainability by extracting version constants in build.sbt, reducing test boilerplate through fixture methods, and optimizing workflow stub usage in tests.

build.sbt Outdated
Comment on lines +325 to +327
"com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.14.2",
// Redirect slf4j to airframe-log
"org.slf4j" % "slf4j-jdk14" % "2.0.17"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

For consistency with TEMPORAL_SDK_VERSION and for better maintainability, please extract the hardcoded versions for jackson-module-scala and slf4j-jdk14 into val constants, defined near the TEMPORAL_SDK_VERSION definition.

* Each test creates its own TestWorkflowEnvironment so state (e.g. attempt counters in
* DataActivitiesImpl) is isolated between test runs.
*/
class DataPipelineWorkflowTest extends UniTest:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

There is significant boilerplate code duplicated across the two test cases (environment setup, worker registration, stub creation). To improve maintainability and reduce duplication, consider extracting this common setup into a fixture method that uses the loan pattern. This would make the tests cleaner and easier to read.

For example:

private def withWorkflowStub(testCode: DataPipelineWorkflow => Unit): Unit = {
  val testEnv = newTestEnv()
  try {
    val worker = testEnv.newWorker(TemporalExample.TaskQueue)
    worker.registerWorkflowImplementationTypes(classOf[DataPipelineWorkflowImpl])
    worker.registerActivitiesImplementations(new DataActivitiesImpl())
    testEnv.start()

    val stub = testEnv.getWorkflowClient.newWorkflowStub(
      classOf[DataPipelineWorkflow],
      WorkflowOptions.newBuilder().setTaskQueue(TemporalExample.TaskQueue).build()
    )
    testCode(stub)
  } finally {
    testEnv.close()
  }
}

// Then in your test:
test("DataPipelineWorkflow completes after transient fetch failure") {
  withWorkflowStub { stub =>
    val result = stub.process(PipelineInput("sensor-42", "raw,data,stream"))
    // assertions...
  }
}

Note that new DataActivitiesImpl() is used to ensure state is not shared between tests.

Comment on lines +45 to +50
for name <- Seq("Alice", "Bob", "Temporal") do
val stub = client.newWorkflowStub(
classOf[HelloWorkflow],
WorkflowOptions.newBuilder().setTaskQueue(TemporalExample.TaskQueue).build()
)
stub.sayHello(name) shouldBe s"Hello, ${name}!"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The workflow stub is being recreated inside the for loop for each iteration. This is unnecessary and inefficient. The stub should be created once before the loop and reused.

      val stub = client.newWorkflowStub(
        classOf[HelloWorkflow],
        WorkflowOptions.newBuilder().setTaskQueue(TemporalExample.TaskQueue).build()
      )
      for name <- Seq("Alice", "Bob", "Temporal") do
        stub.sayHello(name) shouldBe s"Hello, ${name}!"

* TestWorkflowEnvironment runs a full Temporal server in memory — no external process needed. This
* allows testing workflow + activity integration without a real Temporal cluster.
*/
class HelloWorkflowTest extends UniTest:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

There is significant boilerplate code duplicated across the two test cases (environment setup, worker registration, stub creation). To improve maintainability and reduce duplication, consider extracting this common setup into a fixture method that uses the loan pattern. This would also address the inefficient stub creation in the second test.

- Use Activity.getExecutionContext() for attempt tracking instead of AtomicInteger
- Extract JACKSON_SCALA_VERSION constant in build.sbt
- Apply loan pattern to test fixtures to reduce boilerplate
- Wire ScalaDataConverter into worker/client apps (already done, refined)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

doc Improvements or additions to documentation feature New feature

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant