» Node.js: Building Event-Driven Microservices with Kafka » 3. Consumer: Trend Service » 3.2 Express API Server

Express API Server

Create src/domain/model/trend.ts:

import { Book } from ".";

export interface Trend {
  query: string;
  books: Book[];
  created_at: Date | null;
}

Create src/trend/domain/gateway/trend_manager.ts:

import { Trend } from "../../../domain/model";

export interface TrendManager {
  createTrend(t: Trend): Promise<number>;
  topTrends(offset: number): Promise<Trend[]>;
}

In Redis, a ZSET (sorted set) is a data structure that combines the features of both sets and sorted lists. It is similar to a regular set, but it also maintains a sorting order based on a score associated with each member. This score allows members to be sorted in ascending or descending order.

So, we can use a ZSET to store the trends.

Install redis dependency:

npm i ioredis

Create src/trend/infrastructure/cache/redis.ts:

import Redis, { RedisOptions } from "ioredis";

import { CacheConfig } from "../config/config";
import { TrendManager } from "../../domain/gateway";
import { Trend } from "../../../domain/model";

const trendsKey = "trends";
const queryKeyPrefix = "q-";

export class RedisCache implements TrendManager {
  private client: Redis;

  constructor(c: CacheConfig) {
    const options: RedisOptions = {
      host: c.host,
      port: c.port,
      password: c.password,
      db: c.db,
      commandTimeout: c.timeout,
    };
    this.client = new Redis(options);
    console.log("Connected to Redis");
  }

  async createTrend(t: Trend): Promise<number> {
    const member = t.query;
    const score = await this.client.zincrby(trendsKey, 1, member);

    const k = queryKeyPrefix + t.query;
    const results = JSON.stringify(t.books);
    await this.client.set(k, results);
    return Number(score);
  }

  async topTrends(offset: number): Promise<Trend[]> {
    const topItems = await this.client.zrevrange(
      trendsKey,
      0,
      offset,
      "WITHSCORES"
    );
    const trends: Trend[] = [];
    for (let i = 0; i < topItems.length; i += 2) {
      const query = topItems[i];
      const t = { query: query, books: [], created_at: null };
      const k = queryKeyPrefix + query;
      const value = await this.client.get(k);
      if (value !== null) {
        t.books = JSON.parse(value);
      }
      trends.push(t);
    }
    return trends;
  }
}

Create src/trend/infrastructure/config/config.ts:

import { readFileSync } from "fs";

interface ApplicationConfig {
  port: number;
}

export interface CacheConfig {
  host: string;
  port: number;
  password: string;
  db: number;
  timeout: number; // in milliseconds
}

export interface Config {
  app: ApplicationConfig;
  cache: CacheConfig;
}

export function parseConfig(filename: string): Config {
  return JSON.parse(readFileSync(filename, "utf-8"));
}

Create src/trend/config.json:

{
  "app": {
    "port": 3001
  },
  "cache": {
    "host": "localhost",
    "port": 6379,
    "password": "test_pass",
    "db": 0,
    "timeout": 5000
  }
}

Create src/trend/application/wire_helper.ts:

import { Config } from "../infrastructure/config";
import { TrendManager } from "../domain/gateway";
import { RedisCache } from "../infrastructure/cache";

// WireHelper is the helper for dependency injection
export class WireHelper {
  private kv_store: RedisCache;

  constructor(c: Config) {
    this.kv_store = new RedisCache(c.cache);
  }

  trendManager(): TrendManager {
    return this.kv_store;
  }
}

Create src/trend/application/executor/trend_operator.ts:

import { Trend } from "../../../domain/model";
import { TrendManager } from "../../domain/gateway";

export class TrendOperator {
  private trendManager: TrendManager;

  constructor(t: TrendManager) {
    this.trendManager = t;
  }

  async createTrend(t: Trend): Promise<number> {
    return await this.trendManager.createTrend(t);
  }

  async topTrends(offset: number): Promise<Trend[]> {
    return await this.trendManager.topTrends(offset);
  }
}

Create src/trend/adapter/router.ts:

import express, { Request, Response } from "express";

import { TrendOperator } from "../application/executor";
import { WireHelper } from "../application";

class RestHandler {
  private trendOperator: TrendOperator;

  constructor(trendOperator: TrendOperator) {
    this.trendOperator = trendOperator;
  }

  public async getTrends(req: Request, res: Response): Promise<void> {
    let offset = parseInt(req.query.o as string) || 0;
    try {
      const books = await this.trendOperator.topTrends(offset);
      res.status(200).json(books);
    } catch (err) {
      console.error(`Failed to get trends: ${err}`);
      res.status(404).json({ error: "Failed to get trends" });
    }
  }
}

// Create router
function MakeRouter(wireHelper: WireHelper): express.Router {
  const restHandler = new RestHandler(
    new TrendOperator(wireHelper.trendManager())
  );

  const router = express.Router();
  router.get("/trends", restHandler.getTrends.bind(restHandler));
  return router;
}

export function InitApp(wireHelper: WireHelper): express.Express {
  const app = express();

  // Middleware to parse JSON bodies
  app.use(express.json());

  const r = MakeRouter(wireHelper);
  app.use("", r);
  return app;
}

Finally, add src/trend/app.ts:

import { WireHelper } from "./application";
import { InitApp } from "./adapter/router";
import { parseConfig } from "./infrastructure/config";

const config_filename = "src/trend/config.json";

const c = parseConfig(config_filename);
const wireHelper = new WireHelper(c);
const app = InitApp(wireHelper);

app.listen(c.app.port, () => {
  console.log(`Running on port ${c.app.port}`);
});

Remember to add index.ts files for the above sub-folders.

Add a script in package.json:

@@ -5,6 +5,7 @@
   "main": "app.js",
   "scripts": {
     "dev-web": "ts-node src/web/app.ts",
+    "dev-trend": "ts-node src/trend/app.ts",
     "build": "tsc"
   },
   "repository": {
@@ -25,6 +26,7 @@
   "dependencies": {
     "express": "^4.19.2",
     "express-handlebars": "^7.1.2",
+    "ioredis": "^5.3.2",
     "kafkajs": "^2.2.4",
     "mysql2": "^3.9.4"
   },

Run the server:

npm run dev-trend

Try to hit the URL http://localhost:3001/trends with curl.