Event Consumer
Tune src/domain/model/trend.ts:
@@ -3,5 +3,4 @@ import { Book } from ".";
export interface Trend {
query: string;
books: Book[];
- created_at: Date | null;
}
We'll use the timestamp from the Kafka event for time tracking, so let's remove this field.
Tune src/trend/domain/gateway/trend_manager.ts:
@@ -1,6 +1,13 @@
import { Trend } from "../../../domain/model";
+export type ConsumeCallback = (key: Buffer, value: Buffer) => void;
+
export interface TrendManager {
createTrend(t: Trend): Promise<number>;
- topTrends(offset: number): Promise<Trend[]>;
+ topTrends(pageSize: number): Promise<Trend[]>;
+}
+
+export interface TrendEventConsumer {
+ consumeEvents(callback: ConsumeCallback): Promise<void>;
+ stop(): Promise<void>;
}
Implement TrendEventConsumer
in src/trend/infrastructure/mq/kafka.ts:
import { Kafka, Consumer, EachMessagePayload } from "kafkajs";
import { ConsumeCallback, TrendEventConsumer } from "../../domain/gateway";
const GROUP_ID = "trend-svr";

/**
 * TrendEventConsumer implementation backed by a kafkajs consumer.
 */
export class KafkaConsumer implements TrendEventConsumer {
  private consumer: Consumer;
  private topic: string;

  /**
   * @param brokers - broker addresses handed to the kafkajs client
   * @param topic - topic that consumeEvents subscribes to
   */
  constructor(brokers: string[], topic: string) {
    this.topic = topic;
    this.consumer = new Kafka({ brokers }).consumer({ groupId: GROUP_ID });
  }

  /**
   * Connects, subscribes to the configured topic and starts the consumer.
   * Every message carrying both a key and a value is forwarded to
   * `callback`; any other message is only reported with a warning.
   *
   * @param callback - invoked (and awaited) with the key and value of each message
   */
  async consumeEvents(callback: ConsumeCallback): Promise<void> {
    const handleMessage = async (
      payload: EachMessagePayload
    ): Promise<void> => {
      const { key, value } = payload.message;
      if (!key || !value) {
        console.warn(`Null message from ${payload.topic}-${payload.partition}`);
        return;
      }
      await callback(key, value);
    };

    await this.consumer.connect();
    await this.consumer.subscribe({ topic: this.topic });
    await this.consumer.run({ eachMessage: handleMessage });
  }

  /** Disconnects the underlying consumer. */
  async stop(): Promise<void> {
    await this.consumer.disconnect();
  }
}
Tune TrendManager
implementations in src/trend/infrastructure/cache/redis.ts:
@@ -19,7 +19,6 @@ export class RedisCache implements TrendManager {
commandTimeout: c.timeout,
};
this.client = new Redis(options);
- console.log("Connected to Redis");
}
async createTrend(t: Trend): Promise<number> {
@@ -32,17 +31,17 @@ export class RedisCache implements TrendManager {
return Number(score);
}
- async topTrends(offset: number): Promise<Trend[]> {
+ async topTrends(pageSize: number): Promise<Trend[]> {
const topItems = await this.client.zrevrange(
trendsKey,
0,
- offset,
+ pageSize - 1,
"WITHSCORES"
);
const trends: Trend[] = [];
for (let i = 0; i < topItems.length; i += 2) {
const query = topItems[i];
- const t = { query: query, books: [], created_at: null };
+ const t = { query: query, books: [] };
const k = queryKeyPrefix + query;
const value = await this.client.get(k);
if (value !== null) {
Tune src/trend/application/wire_helper.ts:
@@ -1,16 +1,23 @@
import { Config } from "../infrastructure/config";
-import { TrendManager } from "../domain/gateway";
+import { TrendManager, TrendEventConsumer } from "../domain/gateway";
import { RedisCache } from "../infrastructure/cache";
+import { KafkaConsumer } from "../infrastructure/mq";
// WireHelper is the helper for dependency injection
export class WireHelper {
private kv_store: RedisCache;
+ private consumer: KafkaConsumer;
constructor(c: Config) {
this.kv_store = new RedisCache(c.cache);
+ this.consumer = new KafkaConsumer(c.mq.brokers, c.mq.topic);
}
trendManager(): TrendManager {
return this.kv_store;
}
+
+ trendEventConsumer(): TrendEventConsumer {
+ return this.consumer;
+ }
}
Add config items in src/trend/infrastructure/config/config.ts:
@@ -12,9 +12,15 @@ export interface CacheConfig {
timeout: number; // in milliseconds
}
+interface MQConfig {
+ brokers: string[];
+ topic: string;
+}
+
export interface Config {
app: ApplicationConfig;
cache: CacheConfig;
+ mq: MQConfig;
}
export function parseConfig(filename: string): Config {
Put in config values in src/trend/config.json:
@@ -8,5 +8,9 @@
"password": "test_pass",
"db": 0,
"timeout": 5000
+ },
+ "mq": {
+ "brokers": ["localhost:9094"],
+ "topic": "lr-book-searches"
}
}
We're using port 9094 for external access to the Kafka broker running in a Docker container.
See the "Accessing Apache Kafka with internal and external clients" configuration in bitnami/kafka.
Add src/trend/application/consumer/trend.ts:
import { Trend } from "../../../domain/model";
import { TrendManager, TrendEventConsumer } from "../../domain/gateway";
/**
 * TrendConsumer turns events from a TrendEventConsumer into trends.
 *
 * The event key is decoded as the search query and the event value is parsed
 * as the JSON-encoded list of books for that query.
 */
export class TrendConsumer {
  private trendManager: TrendManager;
  private eventConsumer: TrendEventConsumer;

  /**
   * @param t - store that receives one createTrend call per usable event
   * @param e - source of events
   */
  constructor(t: TrendManager, e: TrendEventConsumer) {
    this.trendManager = t;
    this.eventConsumer = e;
  }

  /**
   * Starts consuming events. Returns immediately; a failure of
   * consumeEvents itself is logged rather than thrown.
   */
  start() {
    const processEvent = (key: Buffer, data: Buffer): Promise<void> =>
      this.handleEvent(key, data);
    this.eventConsumer.consumeEvents(processEvent).catch((err: unknown) => {
      console.error("Consumer error:", err);
    });
  }

  /** Returns the event consumer passed to the constructor. */
  getEventConsumer(): TrendEventConsumer {
    return this.eventConsumer;
  }

  /**
   * Decodes one event and records it as a trend.
   *
   * A payload that is not valid JSON is logged and skipped: rethrowing would
   * make the handler fail on the same bad message every time it is retried.
   * A payload that parses but is not an array is replaced by an empty book
   * list. Errors from createTrend are NOT caught, so store failures still
   * reach the event consumer.
   */
  private async handleEvent(key: Buffer, data: Buffer): Promise<void> {
    if (!key || !data) {
      return;
    }
    const query = key.toString("utf-8");

    let parsed: unknown;
    try {
      parsed = JSON.parse(data.toString("utf-8"));
    } catch (err) {
      console.warn(
        `Skipping event for query "${query}": payload is not valid JSON`,
        err
      );
      return;
    }

    let books: Trend["books"] = [];
    if (Array.isArray(parsed)) {
      books = parsed;
    } else {
      console.warn(
        `Event for query "${query}" has a non-array payload; using an empty book list`
      );
    }

    const trend: Trend = { query, books };
    await this.trendManager.createTrend(trend);
  }
}
Tune src/trend/application/executor/trend_operator.ts:
@@ -8,11 +8,7 @@ export class TrendOperator {
this.trendManager = t;
}
- async createTrend(t: Trend): Promise<number> {
- return await this.trendManager.createTrend(t);
- }
-
- async topTrends(offset: number): Promise<Trend[]> {
- return await this.trendManager.topTrends(offset);
+ async topTrends(pageSize: number): Promise<Trend[]> {
+ return await this.trendManager.topTrends(pageSize);
}
}
Tune src/trend/adapter/router.ts:
@@ -11,9 +11,9 @@ class RestHandler {
}
public async getTrends(req: Request, res: Response): Promise<void> {
- let offset = parseInt(req.query.o as string) || 0;
+ let pageSize = parseInt(req.query.ps as string) || 0;
try {
- const books = await this.trendOperator.topTrends(offset);
+ const books = await this.trendOperator.topTrends(pageSize);
res.status(200).json(books);
} catch (err) {
console.error(`Failed to get trends: ${err}`);
Modify src/trend/app.ts to allow both the consumer and the router to stop gracefully:
@@ -1,13 +1,54 @@
+import { Worker, isMainThread, parentPort } from "worker_threads";
+
import { WireHelper } from "./application";
import { InitApp } from "./adapter/router";
import { parseConfig } from "./infrastructure/config";
+import { TrendConsumer } from "./application/consumer";
-const config_filename = "src/trend/config.json";
+const configFilename = "src/trend/config.json";
+const stopConsumer = "stop-consumer";
+const stopServer = "stop-svr";
-const c = parseConfig(config_filename);
+const c = parseConfig(configFilename);
const wireHelper = new WireHelper(c);
const app = InitApp(wireHelper);
-app.listen(c.app.port, () => {
- console.log(`Running on port ${c.app.port}`);
-});
+if (isMainThread) {
+ const worker = new Worker(__filename);
+
+ const svr = app.listen(c.app.port, () => {
+ console.log(`Running on port ${c.app.port}`);
+
+ worker.on("message", (msg) => {
+ // Close the server
+ if (msg === stopServer) {
+ svr.close(() => {
+ console.log("Server is gracefully closed");
+ process.exit(0);
+ });
+ }
+ });
+
+ const shutdown = () => {
+ console.log("Server is shutting down...");
+ // Stop the consumer
+ worker.postMessage(stopConsumer);
+ };
+
+ // Handle SIGINT (Ctrl+C) and SIGTERM signals
+ process.on("SIGINT", shutdown);
+ process.on("SIGTERM", shutdown);
+ });
+} else {
+ const tc = new TrendConsumer(
+ wireHelper.trendManager(),
+ wireHelper.trendEventConsumer()
+ );
+ parentPort?.on("message", async (msg) => {
+ if (msg === stopConsumer) {
+ await tc.getEventConsumer().stop();
+ parentPort?.postMessage(stopServer);
+ }
+ });
+ tc.start();
+}
The Kafka consumer and the Express server are 2 threads running in the same process. We use the message passing of worker_threads to manually start and stop both of them gracefully.
Remember to create or modify index.ts files to re-export the symbols.
- src/trend/domain/gateway/index.ts
- src/trend/application/consumer/index.ts
- src/trend/infrastructure/mq/index.ts
Tune package.json:
@@ -5,7 +5,7 @@
"main": "app.js",
"scripts": {
"dev-web": "ts-node src/web/app.ts",
- "dev-trend": "ts-node src/trend/app.ts",
+ "dev-trend": "tsc && node dist/trend/app.js",
"build": "tsc"
},
"repository": {
ts-node doesn't go well with worker_threads. Let's compile the code and run it with plain node.
Start both the web service and the trend service. Try searching keywords such as "love", "war" and "wealth" in the web service's index page http://localhost:3000/?q=wealth.
Then you should be able to see some results like this from the trend service (http://localhost:3001/trends):
[
{
"query": "love",
"books": [
{
"id": 4,
"title": "Pride and Prejudice",
"author": "Jane Austen",
"published_at": "1813-01-28",
"description": "A classic novel exploring the themes of love, reputation, and social class in Georgian England.",
"created_at": "2024-04-02T21:02:59.314+08:00"
},
{
"id": 10,
"title": "War and Peace",
"author": "Leo Tolstoy",
"published_at": "1869-01-01",
"description": "A novel depicting the Napoleonic era in Russia, exploring themes of love, war, and historical determinism.",
"created_at": "2024-04-02T21:02:59.42+08:00"
}
]
},
{
"query": "war",
"books": [
{
"id": 10,
"title": "War and Peace",
"author": "Leo Tolstoy",
"published_at": "1869-01-01",
"description": "A novel depicting the Napoleonic era in Russia, exploring themes of love, war, and historical determinism.",
"created_at": "2024-04-02T21:02:59.42+08:00"
},
{
"id": 12,
"title": "The Odyssey",
"author": "Homer",
"published_at": "8th Century BC",
"description": "An ancient Greek epic poem attributed to Homer, detailing the journey of Odysseus after the Trojan War.",
"created_at": "2024-04-02T21:02:59.455+08:00"
}
]
},
{
"query": "wealth",
"books": [
{
"id": 1,
"title": "The Great Gatsby",
"author": "F. Scott Fitzgerald",
"published_at": "1925-04-10",
"description": "A novel depicting the opulent lives of wealthy Long Island residents during the Jazz Age.",
"created_at": "2024-04-02T20:36:01.015+08:00"
}
]
}
]