» Node.js: Building Event-Driven Microservices with Kafka » 4. Consumer: Recommendation Service » 4.3 Express API Server
Express API Server
Add src/recommendation/application/executor/interest_operator.ts:
import { Interest } from "../../../domain/model";
import { InterestManager } from "../../domain/gateway";
export class InterestOperator {
private interestManager: InterestManager;
constructor(i: InterestManager) {
this.interestManager = i;
}
async interestsForUser(userId: string): Promise<Interest[]> {
return await this.interestManager.listInterests(userId);
}
}
Add src/recommendation/application/executor/index.ts:
export { InterestOperator } from "./interest_operator";
Add src/recommendation/adapter/router.ts:
import express, { Request, Response } from "express";
import { InterestOperator } from "../application/executor";
import { WireHelper } from "../application";
class RestHandler {
private interestOperator: InterestOperator;
constructor(interestOperator: InterestOperator) {
this.interestOperator = interestOperator;
}
public async getInterests(req: Request, res: Response): Promise<void> {
const uid = (req.query.uid as string) || "";
try {
const interests = await this.interestOperator.interestsForUser(uid);
res.status(200).json(interests);
} catch (err) {
console.error(`Failed to get interests: ${err}`);
res.status(404).json({ error: "Failed to get interests" });
}
}
}
// Create router
function MakeRouter(wireHelper: WireHelper): express.Router {
const restHandler = new RestHandler(
new InterestOperator(wireHelper.interestManager())
);
const router = express.Router();
router.get("/recommendations", restHandler.getInterests.bind(restHandler));
return router;
}
export function InitApp(wireHelper: WireHelper): express.Express {
const app = express();
// Middleware to parse JSON bodies
app.use(express.json());
const r = MakeRouter(wireHelper);
app.use("", r);
return app;
}
Start Express server as well in src/recommendation/app.ts:
@@ -1,14 +1,54 @@
+import { Worker, isMainThread, parentPort } from "worker_threads";
+
import { WireHelper } from "./application";
+import { InitApp } from "./adapter/router";
import { parseConfig } from "./infrastructure/config";
import { InterestConsumer } from "./application/consumer";
const configFilename = "src/recommendation/config.json";
+const stopConsumer = "stop-consumer";
+const stopServer = "stop-svr";
const c = parseConfig(configFilename);
const wireHelper = new WireHelper(c);
-const tc = new InterestConsumer(
- wireHelper.interestManager(),
- wireHelper.trendEventConsumer()
-);
-tc.start();
+if (isMainThread) {
+ const worker = new Worker(__filename);
+
+ const app = InitApp(wireHelper);
+ const svr = app.listen(c.app.port, () => {
+ console.log(`Running on port ${c.app.port}`);
+
+ worker.on("message", (msg) => {
+ // Close the server
+ if (msg === stopServer) {
+ svr.close(() => {
+ console.log("Server is gracefully closed");
+ process.exit(0);
+ });
+ }
+ });
+
+ const shutdown = () => {
+ console.log("Server is shutting down...");
+ // Stop the consumer
+ worker.postMessage(stopConsumer);
+ };
+
+ // Handle SIGINT (Ctrl+C) and SIGTERM signals
+ process.on("SIGINT", shutdown);
+ process.on("SIGTERM", shutdown);
+ });
+} else {
+ const tc = new InterestConsumer(
+ wireHelper.interestManager(),
+ wireHelper.trendEventConsumer()
+ );
+ parentPort?.on("message", async (msg) => {
+ if (msg === stopConsumer) {
+ await tc.getEventConsumer().stop();
+ parentPort?.postMessage(stopServer);
+ }
+ });
+ tc.start();
+}
We use the worker_threads
to start/stop the consumer gracefully.
Change the start script in package.json:
@@ -6,7 +6,7 @@
"scripts": {
"dev-web": "ts-node src/web/app.ts",
"dev-trend": "tsc && node dist/trend/app.js",
- "dev-rec": "ts-node src/recommendation/app.ts",
+ "dev-rec": "tsc && node dist/recommendation/app.js",
"build": "tsc"
},
"repository": {
Run the server:
npm run dev-rec
Try to hit the URL http://localhost:3002/recommendations?uid=VI0GS with curl.
“VI0GS“ is my cookie
uid
value. Replace it with yours in the URL.
The result looks like this:
[
{
"userId": "VI0GS",
"title": "War and Peace",
"author": "Leo Tolstoy",
"score": 2
},
{
"userId": "VI0GS",
"title": "Pride and Prejudice",
"author": "Jane Austen",
"score": 1
},
{
"userId": "VI0GS",
"title": "The Great Gatsby",
"author": "F. Scott Fitzgerald",
"score": 1
}
]
Loading...
> code result goes here