Add support for HttpApi-style topic schema validation #14

@anglinb

This is a pretty rough cut, but I was imagining something like this:

import { NodeRuntime } from "@effect/platform-node";
import { Console, Effect, Layer, Schema } from "effect";
// TopicGroup, Topics and TopicRouter are the proposed modules; they do not exist yet.
import { ConsumerRecord, KafkaJSInstance, TopicGroup, TopicRouter, Topics } from "../src";

// A group shares a topic-name prefix; each topic adds a suffix and a schema.
const TopicGroupLive = TopicGroup.make("com.example.events.Users")
  .topic(".Signup").schema(Schema.Struct({ ... }))
  .topic(".Checkout").schema(Schema.Struct({ ... }))

class TopicsLive extends Topics.empty.add(TopicGroupLive) {}

// the `TopicRouter.group` api returns a `Layer`
// (named UsersApiLive here because it is provided under that name below)
const UsersApiLive: Layer.Layer<TopicGroup.TopicGroup<"com.example.events.Users">> =
  TopicRouter.group(TopicsLive, "com.example.events.Users", (router) =>
    router
      .subscribe(
        ".Signup",
        Effect.flatMap(ConsumerRecord.ConsumerRecord, ({ topic: _, partition, ...message }) =>
          Console.log({
            partition,
            ...message,
            key: message.key?.toString(),
            value: message.value?.toString(),
          }),
        ),
      )
      .subscribe(".Checkout" ...)
  )

// Build the full router from the topic definitions and provide the group handlers.
const TopicRouterLive: Layer.Layer<...> = TopicRouter.build(TopicsLive).pipe(
  Layer.provide(UsersApiLive)
)

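const KafkaLive = KafkaJSInstance.layer({ brokers: ["localhost:19092"] });
// NOTE: ConsumerLive is not defined in this sketch; presumably it would be the
// consumer layer that serves TopicRouterLive.
const MainLive = ConsumerLive.pipe(Layer.provide(KafkaLive));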

NodeRuntime.runMain(Layer.launch(MainLive));
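
To make the intended behavior a bit more concrete, here is a dependency-free sketch of what "topic schema validation" could mean at runtime: a group owns a topic-name prefix, each topic suffix maps to a validator, and an incoming record value is decoded against the validator for its topic. Every name below (`Validator`, `TopicGroupSketch`, `makeTopicGroup`, `decodeRecord`, the `email` field) is hypothetical and only for illustration. It is not the proposed API, and a plain function stands in for an Effect `Schema`.

```typescript
// Hypothetical sketch: none of these names exist in the library.
// A validator returns the decoded value or throws on invalid input.
type Validator<A> = (input: unknown) => A;

interface TopicGroupSketch {
  readonly prefix: string;
  readonly topics: ReadonlyMap<string, Validator<unknown>>;
}

// Build a group from a shared prefix and a map of topic suffix -> validator.
function makeTopicGroup(
  prefix: string,
  topics: Record<string, Validator<unknown>>,
): TopicGroupSketch {
  return { prefix, topics: new Map(Object.entries(topics)) };
}

// Stand-in for a Schema.Struct: accepts objects with a string `email` field.
const signupValidator: Validator<{ email: string }> = (input) => {
  if (typeof input === "object" && input !== null) {
    const email = (input as { email?: unknown }).email;
    if (typeof email === "string") {
      return { email };
    }
  }
  throw new Error("invalid Signup payload");
};

const usersGroup = makeTopicGroup("com.example.events.Users", {
  ".Signup": signupValidator,
});

// Decode a raw JSON record value for a fully qualified topic name.
function decodeRecord(
  group: TopicGroupSketch,
  topic: string,
  rawValue: string,
): unknown {
  if (!topic.startsWith(group.prefix)) {
    throw new Error(`topic ${topic} is not in group ${group.prefix}`);
  }
  const suffix = topic.slice(group.prefix.length);
  const validate = group.topics.get(suffix);
  if (validate === undefined) {
    throw new Error(`no schema registered for ${topic}`);
  }
  return validate(JSON.parse(rawValue));
}
```

In the proposal, this check would presumably happen inside the router before a handler passed to `subscribe` runs, so handlers would only see values that already match the topic's schema.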

Labels: enhancement (New feature or request)
