» Node.js: Building Event-Driven Microservices with Kafka » 4. Consumer: Recommendation Service » 4.2 Event Consumer

Event Consumer

Since the recommendation service‘s consumer has so many things in common with the trend service‘s counterpart, let‘s try to reuse them by moving those common parts into the higher level folders.

Restructure files

  • Tune src/trend/domain/gateway/trend_manager.ts:
@@ -1,13 +1,6 @@
 import { Trend } from "../../../domain/model";
 
-export type ConsumeCallback = (key: Buffer, value: Buffer) => void;
-
 export interface TrendManager {
   createTrend(t: Trend): Promise<number>;
   topTrends(pageSize: number): Promise<Trend[]>;
 }
-
-export interface TrendEventConsumer {
-  consumeEvents(callback: ConsumeCallback): Promise<void>;
-  stop(): Promise<void>;
-}

Move the common TrendEventConsumer and ConsumeCallback types out of this file.

  • Put those types in src/domain/gateway/event_consumer.ts:
export type ConsumeCallback = (key: Buffer, value: Buffer) => void;

export interface TrendEventConsumer {
  consumeEvents(callback: ConsumeCallback): Promise<void>;
  stop(): Promise<void>;
}
  • Move src/trend/infrastructure/mq/kafka.ts to src/infrastructure/mq/kafka_consumer.ts.
export class KafkaConsumer implements TrendEventConsumer {
  ...

  constructor(brokers: string[], topic: string, groupId: string) {

Add a new parameter named groupId for the consumer group.

  • Add config items in src/trend/infrastructure/config/config.ts:
@@ -15,6 +15,7 @@ export interface CacheConfig {
 interface MQConfig {
   brokers: string[];
   topic: string;
+  groupId: string;
 }
 
 export interface Config {
  • Put values in src/trend/config.json:
@@ -11,6 +11,7 @@
   },
   "mq": {
     "brokers": ["localhost:9094"],
-    "topic": "lr-book-searches"
+    "topic": "lr-book-searches",
+    "groupId": "trend-svr"
   }
 }
  • Tune import pathes in src/trend/application/consumer/trend.ts:
@@ -1,5 +1,6 @@
 import { Trend } from "../../../domain/model";
-import { TrendManager, TrendEventConsumer } from "../../domain/gateway";
+import { TrendManager } from "../../domain/gateway";
+import { TrendEventConsumer } from "../../../domain/gateway";
 
 export class TrendConsumer {
   private trendManager: TrendManager;
  • Tune import paths in src/trend/application/wire_helper.ts:
@@ -1,7 +1,8 @@
 import { Config } from "../infrastructure/config";
-import { TrendManager, TrendEventConsumer } from "../domain/gateway";
+import { TrendManager } from "../domain/gateway";
+import { TrendEventConsumer } from "../../domain/gateway";
 import { RedisCache } from "../infrastructure/cache";
-import { KafkaConsumer } from "../infrastructure/mq";
+import { KafkaConsumer } from "../../infrastructure/mq";
 
 // WireHelper is the helper for dependency injection
 export class WireHelper {
@@ -10,7 +11,7 @@ export class WireHelper {
 
   constructor(c: Config) {
     this.kv_store = new RedisCache(c.cache);
-    this.consumer = new KafkaConsumer(c.mq.brokers, c.mq.topic);
+    this.consumer = new KafkaConsumer(c.mq.brokers, c.mq.topic, c.mq.groupId);
   }
 
   trendManager(): TrendManager {
  • Remember to create or update index.ts files:
    • src/trend/domain/gateway/index.ts
    • src/infrastructure/mq/index.ts
    • src/domain/gateway/index.ts

Tune the producer‘s events in the web service

Add the userId parameter in src/web/application/executor/book_operator.ts:

@@ -17,11 +17,16 @@ export class BookOperator {
     return b;
   }
 
-  async getBooks(offset: number, query: string): Promise<Book[]> {
+  async getBooks(
+    offset: number,
+    userId: string,
+    query: string
+  ): Promise<Book[]> {
     const books = await this.bookManager.getBooks(offset, query);
     if (query) {
+      const k = query + ":" + userId;
       const jsonData = JSON.stringify(books);
-      await this.mqHelper.sendEvent(query, Buffer.from(jsonData, "utf8"));
+      await this.mqHelper.sendEvent(k, Buffer.from(jsonData, "utf8"));
     }
     return books;
   }

Install cookie-parser:

npm i cookie-parser

And its types:

npm i -D @types/cookie-parser

Read the cookie value and pass in as the userId in src/web/adapter/router.ts:

@@ -1,4 +1,5 @@
 import express, { Request, Response } from "express";
+import cookieParser from "cookie-parser";
 import { engine } from "express-handlebars";
 
 import { Book, Trend } from "../../domain/model";
@@ -6,6 +7,8 @@ import { BookOperator } from "../application/executor";
 import { WireHelper } from "../application";
 import { RemoteServiceConfig } from "../infrastructure/config";
 
+const FIELD_UID = "uid";
+
 class RestHandler {
   private bookOperator: BookOperator;
   private remote: RemoteServiceConfig;
@@ -16,10 +19,15 @@ class RestHandler {
   }
 
   public async indexPage(req: Request, res: Response): Promise<void> {
+    let user_id = req.cookies.uid;
+    if (!user_id) {
+      user_id = randomString(5);
+      res.cookie(FIELD_UID, user_id, { maxAge: 1000 * 3600 * 24 * 30 });
+    }
     let books: Book[];
     const q = req.query.q as string;
     try {
-      books = await this.bookOperator.getBooks(0, q);
+      books = await this.bookOperator.getBooks(0, user_id, q);
     } catch (err) {
       console.warn(`Failed to get books: ${err}`);
       books = [];
@@ -50,6 +58,7 @@ class RestHandler {
     try {
       const books = await this.bookOperator.getBooks(
         offset,
+        "",
         req.query.q as string
       );
       res.status(200).json(books);
@@ -98,6 +107,9 @@ export function InitApp(
   // Middleware to parse JSON bodies
   app.use(express.json());
 
+  // Use cookie parser middleware
+  app.use(cookieParser());
+
   // Set Handlebars as the template engine
   app.engine("handlebars", engine());
   app.set("view engine", "handlebars");
@@ -108,3 +120,12 @@ export function InitApp(
   app.use("", r);
   return app;
 }
+
+function randomString(length: number): string {
+  const charset: string = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
+  let result: string = "";
+  for (let i = 0; i < length; i++) {
+    result += charset.charAt(Math.floor(Math.random() * charset.length));
+  }
+  return result;
+}

Tune the consumer in the trend service

Since the key‘s format has changed, we need to adapt to that in the trend service.

Update src/trend/application/consumer/trend.ts:

   start() {
     const processEvent = async (key: Buffer, data: Buffer): Promise<void> => {
       if (key && data) {
-        const query = key.toString("utf-8");
+        const parts = key.toString("utf-8").split(":");
+        const query = parts[0];
         const books: Book[] = JSON.parse(data.toString("utf-8"));

Recommendation Service‘s consumer

Add src/domain/model/interest.ts:

export interface Interest {
  userId: string;
  title: string;
  author: string;
  score: number;
}

Add src/recommendation/domain/gateway/interest_manager.ts:

import { Interest } from "../../../domain/model";

export interface InterestManager {
  increaseInterest(i: Interest): Promise<void>;
  listInterests(userId: string): Promise<Interest[]>;
}

Install mongo dependencies:

npm i mongodb

Prepare the mongoDB database:

  • Install mongoDB on your machine and start it.
  • Create a database named lr_event_rec.
  • Create necessary indexes for your collections.

Implement the InterestManager in src/recommendation/infrastructure/database/mongo.ts:

import { MongoClient, Db, Collection } from "mongodb";

import { Interest } from "../../../domain/model";
import { InterestManager } from "../../domain/gateway";

const COLL_INTERESTS = "interests";

export class MongoPersistence implements InterestManager {
  private db!: Db;
  private coll!: Collection;
  private pageSize: number;

  constructor(uri: string, dbName: string, pageSize: number) {
    this.pageSize = pageSize;
    const client = new MongoClient(uri);
    client.connect().then(() => {
      this.db = client.db(dbName);
      this.coll = this.db.collection(COLL_INTERESTS);
    });
  }

  async increaseInterest(interest: Interest): Promise<void> {
    const filterQuery = {
      user_id: interest.userId,
      title: interest.title,
      author: interest.author,
    };
    const updateQuery = {
      $inc: { score: 1 },
    };
    await this.coll.updateOne(filterQuery, updateQuery, { upsert: true });
  }

  async listInterests(userId: string): Promise<Interest[]> {
    const filterQuery = { user_id: userId };
    const cursor = this.coll
      .find(filterQuery)
      .sort({ score: -1 })
      .limit(this.pageSize);
    const interestDocs = await cursor.toArray();
    return interestDocs.map((doc) => ({
      userId: doc.user_id,
      title: doc.title,
      author: doc.author,
      score: doc.score,
    }));
  }
}

Add src/recommendation/infrastructure/config/config.ts:

import { readFileSync } from "fs";

interface ApplicationConfig {
  port: number;
  pageSize: number;
}

export interface DatabaseConfig {
  uri: string;
  dbName: string;
}

interface MQConfig {
  brokers: string[];
  topic: string;
  groupId: string;
}

export interface Config {
  app: ApplicationConfig;
  db: DatabaseConfig;
  mq: MQConfig;
}

export function parseConfig(filename: string): Config {
  return JSON.parse(readFileSync(filename, "utf-8"));
}

Add src/recommendation/config.json:

{
  "app": {
    "port": 3002,
    "pageSize": 10
  },
  "db": {
    "uri": "mongodb://localhost:27017",
    "dbName": "lr_event_rec"
  },
  "mq": {
    "brokers": ["localhost:9094"],
    "topic": "lr-book-searches",
    "groupId": "rec-svr"
  }
}

Add src/recommendation/application/consumer/interest.ts:

import { Book } from "../../../domain/model";
import { InterestManager } from "../../domain/gateway";
import { TrendEventConsumer } from "../../../domain/gateway";

export class InterestConsumer {
  private interestManager: InterestManager;
  private eventConsumer: TrendEventConsumer;

  constructor(i: InterestManager, e: TrendEventConsumer) {
    this.interestManager = i;
    this.eventConsumer = e;
  }

  start() {
    const processEvent = async (key: Buffer, data: Buffer): Promise<void> => {
      if (key && data) {
        const parts = key.toString("utf-8").split(":");
        if (parts.length === 1) {
          // no user id, ignore it
          return;
        }
        const query = parts[0];
        const books: Book[] = JSON.parse(data.toString("utf-8"));
        const userId = parts[1];
        for (let book of books) {
          this.interestManager.increaseInterest({
            userId,
            title: book.title,
            author: book.author,
            score: 0,
          });
        }
      }
    };

    this.eventConsumer.consumeEvents(processEvent).catch((err) => {
      console.log("Consumer error:", err);
    });
  }

  getEventConsumer(): TrendEventConsumer {
    return this.eventConsumer;
  }
}

Add src/recommendation/application/wire_helper.ts:

import { Config } from "../infrastructure/config";
import { InterestManager } from "../domain/gateway";
import { TrendEventConsumer } from "../../domain/gateway";
import { MongoPersistence } from "../infrastructure/database";
import { KafkaConsumer } from "../../infrastructure/mq";

// WireHelper is the helper for dependency injection
export class WireHelper {
  private noSQLPersistence: MongoPersistence;
  private consumer: KafkaConsumer;

  constructor(c: Config) {
    this.noSQLPersistence = new MongoPersistence(
      c.db.uri,
      c.db.dbName,
      c.app.pageSize
    );
    this.consumer = new KafkaConsumer(c.mq.brokers, c.mq.topic, c.mq.groupId);
  }

  interestManager(): InterestManager {
    return this.noSQLPersistence;
  }

  trendEventConsumer(): TrendEventConsumer {
    return this.consumer;
  }
}

Add src/recommendation/app.ts:

import { WireHelper } from "./application";
import { parseConfig } from "./infrastructure/config";
import { InterestConsumer } from "./application/consumer";

const configFilename = "src/recommendation/config.json";

const c = parseConfig(configFilename);
const wireHelper = new WireHelper(c);

const tc = new InterestConsumer(
  wireHelper.interestManager(),
  wireHelper.trendEventConsumer()
);
tc.start();

Add start scirpt in package.json:

@@ -6,6 +6,7 @@
   "scripts": {
     "dev-web": "ts-node src/web/app.ts",
     "dev-trend": "tsc && node dist/trend/app.js",
+    "dev-rec": "ts-node src/recommendation/app.ts",
     "build": "tsc"
   },
   "repository": {
@@ -29,6 +30,7 @@
     "express-handlebars": "^7.1.2",
     "ioredis": "^5.3.2",
     "kafkajs": "^2.2.4",
+    "mongodb": "^6.5.0",
     "mysql2": "^3.9.4"
   },
   "devDependencies": {

Remember to create or update index.ts files:

  • src/domain/model/index.ts
  • src/recommendation/application/consumer/index.ts
  • src/recommendation/application/index.ts
  • src/recommendation/domain/gateway/index.ts
  • src/recommendation/infrastructure/config/index.ts
  • src/recommendation/infrastructure/database/index.ts

Start the recommendation consumer:

npm run dev-rec

You should see something similar to this:

{"level":"INFO","timestamp":"2024-04-17T06:55:37.291Z","logger":"kafkajs","message":"[Consumer] Starting","groupId":"rec-svr"}
{"level":"INFO","timestamp":"2024-04-17T06:55:40.320Z","logger":"kafkajs","message":"[ConsumerGroup] Consumer has joined the group","groupId":"rec-svr","memberId":"kafkajs-946c4ada-9ce8-4596-913e-977667990920","leaderId":"kafkajs-946c4ada-9ce8-4596-913e-977667990920","isLeader":true,"memberAssignment":{"lr-book-searches":[0]},"groupProtocol":"RoundRobinAssigner","duration":3027}

From another terminal, restart your web service and try searching with terms like “love“ and “peace“ a couple of times.

Then, you will have records like below in your mongoDB:

lr_event_rec> db.interests.find();
[
  {
    _id: ObjectId('661f7029f2f9d14bb96731a4'),
    title: 'War and Peace',
    user_id: 'VI0GS',
    author: 'Leo Tolstoy',
    score: 2
  },
  {
    _id: ObjectId('661f7029f2f9d14bb96731a5'),
    title: 'Pride and Prejudice',
    user_id: 'VI0GS',
    author: 'Jane Austen',
    score: 1
  },
  {
    _id: ObjectId('661f7030f2f9d14bb96731ac'),
    title: 'The Great Gatsby',
    user_id: 'VI0GS',
    author: 'F. Scott Fitzgerald',
    score: 1
  }
]

That means your consumer is doing well.