-
-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathstreamConsumer.ts
More file actions
21 lines (18 loc) · 677 Bytes
/
streamConsumer.ts
File metadata and controls
21 lines (18 loc) · 677 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import { NodeRuntime } from "@effect/platform-node";
import { Console, Effect, Stream } from "effect";
import { Consumer } from "../src";
import { KafkaJS } from "../src/KafkaJS";
/**
 * Example effect: drains the stream returned by `Consumer.serveStream` for
 * "test-topic" and prints a short summary of every element to the console.
 *
 * Only `topic`, `partition`, `offset` and the stringified `value` are logged;
 * any other fields on the element are ignored.
 */
const program = Stream.runForEach(
  Consumer.serveStream("test-topic"),
  ({ topic, partition, offset, value }) => {
    // `value` may be null/undefined; optional chaining then logs `undefined`
    // instead of throwing.
    const summary = {
      topic,
      partition,
      offset,
      value: value?.toString(),
    };
    return Console.log(summary);
  },
);
// Layer built from the consumer configuration (consumer group "group").
const ConsumerLive = Consumer.layer({ groupId: "group" });

// Layer built from the KafkaJS configuration (single local broker).
const KafkaLive = KafkaJS.layer({ brokers: ["localhost:19092"] });

// Provide the layers in the same order as before: consumer first, then Kafka.
// NOTE(review): presumably ConsumerLive itself depends on what KafkaLive
// provides, which is why KafkaLive is supplied last — confirm against ../src.
const programWithConsumer = Effect.provide(program, ConsumerLive);
const MainLive = Effect.provide(programWithConsumer, KafkaLive);

// Hand the fully-provided effect to the Node runtime as the process entry point.
NodeRuntime.runMain(MainLive);