» Node.js: Building Event-Driven Microservices with Kafka » 3. Consumer: Trend Service » 3.3 Event Consumer

Event Consumer

Tune src/domain/model/trend.ts:

@@ -3,5 +3,4 @@ import { Book } from ".";
 export interface Trend {
   query: string;
   books: Book[];
-  created_at: Date | null;
 }

We'll use the timestamp from the Kafka event for time tracking, so let's remove this field.

Tune src/trend/domain/gateway/trend_manager.ts:

@@ -1,6 +1,13 @@
 import { Trend } from "../../../domain/model";
 
+export type ConsumeCallback = (key: Buffer, value: Buffer) => void;
+
 export interface TrendManager {
   createTrend(t: Trend): Promise<number>;
-  topTrends(offset: number): Promise<Trend[]>;
+  topTrends(pageSize: number): Promise<Trend[]>;
+}
+
+export interface TrendEventConsumer {
+  consumeEvents(callback: ConsumeCallback): Promise<void>;
+  stop(): Promise<void>;
 }

Implement TrendEventConsumer in src/trend/infrastructure/mq/kafka.ts:

import { Kafka, Consumer, EachMessagePayload } from "kafkajs";

import { ConsumeCallback, TrendEventConsumer } from "../../domain/gateway";

const GROUP_ID = "trend-svr";

/**
 * TrendEventConsumer implementation backed by a kafkajs consumer.
 *
 * The consumer is created in the group named by GROUP_ID and reads from the
 * single topic supplied at construction time.
 */
export class KafkaConsumer implements TrendEventConsumer {
  private consumer: Consumer;
  private topic: string;

  /**
   * @param brokers - broker addresses passed to the kafkajs client
   * @param topic - topic subscribed to when consumeEvents() is called
   */
  constructor(brokers: string[], topic: string) {
    this.topic = topic;
    this.consumer = new Kafka({ brokers }).consumer({ groupId: GROUP_ID });
  }

  /**
   * Connects, subscribes to the configured topic and starts the run loop.
   *
   * @param callback - invoked (and awaited) with the key and value of every
   *   message that has both; messages missing either are logged and skipped.
   */
  async consumeEvents(callback: ConsumeCallback): Promise<void> {
    const handleMessage = async (payload: EachMessagePayload): Promise<void> => {
      const { topic, partition, message } = payload;
      const { key, value } = message;

      // Nothing to hand over without both parts; warn and move on.
      if (!key || !value) {
        console.warn(`Null message from ${topic}-${partition}`);
        return;
      }

      await callback(key, value);
    };

    await this.consumer.connect();
    await this.consumer.subscribe({ topic: this.topic });
    await this.consumer.run({ eachMessage: handleMessage });
  }

  /** Disconnects the underlying kafkajs consumer. */
  async stop(): Promise<void> {
    await this.consumer.disconnect();
  }
}

Tune TrendManager implementations in src/trend/infrastructure/cache/redis.ts:

@@ -19,7 +19,6 @@ export class RedisCache implements TrendManager {
       commandTimeout: c.timeout,
     };
     this.client = new Redis(options);
-    console.log("Connected to Redis");
   }
 
   async createTrend(t: Trend): Promise<number> {
@@ -32,17 +31,17 @@ export class RedisCache implements TrendManager {
     return Number(score);
   }
 
-  async topTrends(offset: number): Promise<Trend[]> {
+  async topTrends(pageSize: number): Promise<Trend[]> {
     const topItems = await this.client.zrevrange(
       trendsKey,
       0,
-      offset,
+      pageSize - 1,
       "WITHSCORES"
     );
     const trends: Trend[] = [];
     for (let i = 0; i < topItems.length; i += 2) {
       const query = topItems[i];
-      const t = { query: query, books: [], created_at: null };
+      const t = { query: query, books: [] };
       const k = queryKeyPrefix + query;
       const value = await this.client.get(k);
       if (value !== null) {

Tune src/trend/application/wire_helper.ts:

@@ -1,16 +1,23 @@
 import { Config } from "../infrastructure/config";
-import { TrendManager } from "../domain/gateway";
+import { TrendManager, TrendEventConsumer } from "../domain/gateway";
 import { RedisCache } from "../infrastructure/cache";
+import { KafkaConsumer } from "../infrastructure/mq";
 
 // WireHelper is the helper for dependency injection
 export class WireHelper {
   private kv_store: RedisCache;
+  private consumer: KafkaConsumer;
 
   constructor(c: Config) {
     this.kv_store = new RedisCache(c.cache);
+    this.consumer = new KafkaConsumer(c.mq.brokers, c.mq.topic);
   }
 
   trendManager(): TrendManager {
     return this.kv_store;
   }
+
+  trendEventConsumer(): TrendEventConsumer {
+    return this.consumer;
+  }
 }

Add config items in src/trend/infrastructure/config/config.ts:

@@ -12,9 +12,15 @@ export interface CacheConfig {
   timeout: number; // in milliseconds
 }
 
+interface MQConfig {
+  brokers: string[];
+  topic: string;
+}
+
 export interface Config {
   app: ApplicationConfig;
   cache: CacheConfig;
+  mq: MQConfig;
 }
 
 export function parseConfig(filename: string): Config {

Put in config values in src/trend/config.json:

@@ -8,5 +8,9 @@
     "password": "test_pass",
     "db": 0,
     "timeout": 5000
+  },
+  "mq": {
+    "brokers": ["localhost:9094"],
+    "topic": "lr-book-searches"
   }
 }

We're using port 9094 for external access to the Kafka broker running in a Docker container.
See the "Accessing Apache Kafka with internal and external clients" configuration in bitnami/kafka.

Add src/trend/application/consumer/trend.ts:

import { Trend } from "../../../domain/model";
import { TrendManager, TrendEventConsumer } from "../../domain/gateway";

/**
 * Application-level consumer that turns incoming search events into trends.
 *
 * For every event, the key is decoded as the search query and the value is
 * parsed as JSON holding the books for that query; the pair is then handed to
 * the TrendManager.
 */
export class TrendConsumer {
  private trendManager: TrendManager;
  private eventConsumer: TrendEventConsumer;

  /**
   * @param t - destination that stores each trend built from an event
   * @param e - source of events to consume
   */
  constructor(t: TrendManager, e: TrendEventConsumer) {
    this.trendManager = t;
    this.eventConsumer = e;
  }

  /**
   * Starts consuming events in the background.
   *
   * Returns immediately; a failure of the consuming call itself is logged
   * rather than thrown to the caller.
   */
  start(): void {
    const processEvent = async (key: Buffer, data: Buffer): Promise<void> => {
      if (!key || !data) {
        return;
      }
      const query = key.toString("utf-8");

      // A malformed payload can never be parsed successfully, so letting the
      // error escape would only make the same message fail again. Log it and
      // skip the event instead.
      let books: Trend["books"];
      try {
        // The payload shape is trusted to match what the producer serialises.
        books = JSON.parse(data.toString("utf-8")) as Trend["books"];
      } catch (err) {
        console.error(
          `Skipping event with malformed JSON payload (query "${query}"):`,
          err
        );
        return;
      }

      const trend: Trend = { query, books };
      await this.trendManager.createTrend(trend);
    };

    this.eventConsumer.consumeEvents(processEvent).catch((err) => {
      console.error("Consumer error:", err);
    });
  }

  /** Exposes the underlying event consumer so callers can stop it. */
  getEventConsumer(): TrendEventConsumer {
    return this.eventConsumer;
  }
}

Tune src/trend/application/executor/trend_operator.ts:

@@ -8,11 +8,7 @@ export class TrendOperator {
     this.trendManager = t;
   }
 
-  async createTrend(t: Trend): Promise<number> {
-    return await this.trendManager.createTrend(t);
-  }
-
-  async topTrends(offset: number): Promise<Trend[]> {
-    return await this.trendManager.topTrends(offset);
+  async topTrends(pageSize: number): Promise<Trend[]> {
+    return await this.trendManager.topTrends(pageSize);
   }
 }

Tune src/trend/adapter/router.ts:

@@ -11,9 +11,9 @@ class RestHandler {
   }
 
   public async getTrends(req: Request, res: Response): Promise<void> {
-    let offset = parseInt(req.query.o as string) || 0;
+    let pageSize = parseInt(req.query.ps as string) || 0;
     try {
-      const books = await this.trendOperator.topTrends(offset);
+      const books = await this.trendOperator.topTrends(pageSize);
       res.status(200).json(books);
     } catch (err) {
       console.error(`Failed to get trends: ${err}`);

Modify src/trend/app.ts to allow both the consumer and the router to stop gracefully:

@@ -1,13 +1,54 @@
+import { Worker, isMainThread, parentPort } from "worker_threads";
+
 import { WireHelper } from "./application";
 import { InitApp } from "./adapter/router";
 import { parseConfig } from "./infrastructure/config";
+import { TrendConsumer } from "./application/consumer";
 
-const config_filename = "src/trend/config.json";
+const configFilename = "src/trend/config.json";
+const stopConsumer = "stop-consumer";
+const stopServer = "stop-svr";
 
-const c = parseConfig(config_filename);
+const c = parseConfig(configFilename);
 const wireHelper = new WireHelper(c);
 const app = InitApp(wireHelper);
 
-app.listen(c.app.port, () => {
-  console.log(`Running on port ${c.app.port}`);
-});
+if (isMainThread) {
+  const worker = new Worker(__filename);
+
+  const svr = app.listen(c.app.port, () => {
+    console.log(`Running on port ${c.app.port}`);
+
+    worker.on("message", (msg) => {
+      // Close the server
+      if (msg === stopServer) {
+        svr.close(() => {
+          console.log("Server is gracefully closed");
+          process.exit(0);
+        });
+      }
+    });
+
+    const shutdown = () => {
+      console.log("Server is shutting down...");
+      // Stop the consumer
+      worker.postMessage(stopConsumer);
+    };
+
+    // Handle SIGINT (Ctrl+C) and SIGTERM signals
+    process.on("SIGINT", shutdown);
+    process.on("SIGTERM", shutdown);
+  });
+} else {
+  const tc = new TrendConsumer(
+    wireHelper.trendManager(),
+    wireHelper.trendEventConsumer()
+  );
+  parentPort?.on("message", async (msg) => {
+    if (msg === stopConsumer) {
+      await tc.getEventConsumer().stop();
+      parentPort?.postMessage(stopServer);
+    }
+  });
+  tc.start();
+}

The Kafka consumer and the Express server run as two threads in the same process. We use the message passing provided by worker_threads to manually start and stop both of them gracefully.

Remember to create or modify index.ts files to re-export the symbols.

  • src/trend/domain/gateway/index.ts
  • src/trend/application/consumer/index.ts
  • src/trend/infrastructure/mq/index.ts

Tune package.json:

@@ -5,7 +5,7 @@
   "main": "app.js",
   "scripts": {
     "dev-web": "ts-node src/web/app.ts",
-    "dev-trend": "ts-node src/trend/app.ts",
+    "dev-trend": "tsc && node dist/trend/app.js",
     "build": "tsc"
   },
   "repository": {

ts-node doesn't work well with worker_threads. Let's compile the code and run it with plain node instead.

Start both the web service and the trend service. Try searching for keywords such as "love", "war" and "wealth" on the web service's index page, e.g. http://localhost:3000/?q=wealth.

Then you should be able to see some results like this from the trend service (http://localhost:3001/trends):

[
  {
    "query": "love",
    "books": [
      {
        "id": 4,
        "title": "Pride and Prejudice",
        "author": "Jane Austen",
        "published_at": "1813-01-28",
        "description": "A classic novel exploring the themes of love, reputation, and social class in Georgian England.",
        "created_at": "2024-04-02T21:02:59.314+08:00"
      },
      {
        "id": 10,
        "title": "War and Peace",
        "author": "Leo Tolstoy",
        "published_at": "1869-01-01",
        "description": "A novel depicting the Napoleonic era in Russia, exploring themes of love, war, and historical determinism.",
        "created_at": "2024-04-02T21:02:59.42+08:00"
      }
    ]
  },
  {
    "query": "war",
    "books": [
      {
        "id": 10,
        "title": "War and Peace",
        "author": "Leo Tolstoy",
        "published_at": "1869-01-01",
        "description": "A novel depicting the Napoleonic era in Russia, exploring themes of love, war, and historical determinism.",
        "created_at": "2024-04-02T21:02:59.42+08:00"
      },
      {
        "id": 12,
        "title": "The Odyssey",
        "author": "Homer",
        "published_at": "8th Century BC",
        "description": "An ancient Greek epic poem attributed to Homer, detailing the journey of Odysseus after the Trojan War.",
        "created_at": "2024-04-02T21:02:59.455+08:00"
      }
    ]
  },
  {
    "query": "wealth",
    "books": [
      {
        "id": 1,
        "title": "The Great Gatsby",
        "author": "F. Scott Fitzgerald",
        "published_at": "1925-04-10",
        "description": "A novel depicting the opulent lives of wealthy Long Island residents during the Jazz Age.",
        "created_at": "2024-04-02T20:36:01.015+08:00"
      }
    ]
  }
]
PrevNext