Event Producer
In event-driven architecture, event producers and consumers play crucial roles in facilitating communication and interaction between different components or services within a system.
An event producer is any component or service within a system that generates or emits events.
An event consumer is a component or service within a system that subscribes to and processes events emitted by event producers.
In this diagram, the “Bookstore Web Service“ is the producer, and the other two services are the consumers.
Key benefits of event-driven architecture include:
- Loose coupling: Event-driven architecture promotes loose coupling between components, as producers and consumers interact through asynchronous communication based on events.
- Scalability: It enables scalable systems by allowing event consumers to scale independently from event producers, ensuring that the system can handle varying workloads efficiently.
- Fault tolerance: Event-driven architecture supports fault tolerance and resilience, as events can be reliably processed even if some components or services are temporarily unavailable.
Use Apache Kafka
- Install Apache Kafka on your machine and start it.
Using docker image is the easist way to have kafka on your machine.
docker pull apache/kafka:3.7.0 docker run -p 9092:9092 apache/kafka:3.7.0
- Create a topic to store your events.
Events are organized and durably stored in topics. Very simplified, a topic is similar to a folder in a filesystem, and the events are the files in that folder.
bin/kafka-topics.sh --create --topic lr-book-searches --bootstrap-server localhost:9092
Topics in Kafka are always multi-producer and multi-subscriber: a topic can have zero, one, or many producers that write events to it, as well as zero, one, or many consumers that subscribe to these events.
- Add kafka dependency:
npm i kafkajs
KafkaJS, a modern Apache Kafka client for Node.js.
- Update code.
Create src/infrastructure/mq/index.ts:
export { KafkaQueue } from "./kafka";
export interface MQHelper {
sendEvent(key: string, value: Buffer): Promise<boolean>;
}
Create src/infrastructure/mq/kafka.ts:
import { Kafka, Producer } from "kafkajs";
import { MQHelper } from ".";
export class KafkaQueue implements MQHelper {
private producer: Producer;
private topic: string;
private _connected: boolean = false;
constructor(brokers: string[], topic: string) {
this.topic = topic;
const k = new Kafka({
clientId: "web-svr",
brokers,
});
this.producer = k.producer({ allowAutoTopicCreation: true });
}
async sendEvent(key: string, value: Buffer): Promise<boolean> {
if (!this._connected) {
await this.connect();
}
await this.producer.send({
topic: this.topic,
messages: [
{
key,
value,
},
],
});
return true;
}
async connect(): Promise<void> {
await this.producer.connect();
this._connected = true;
}
async close(): Promise<void> {
await this.producer.disconnect();
}
}
Add message queue config items in src/infrastructure/config/config.ts:
@@ -10,9 +10,15 @@ interface ApplicationConfig {
templates_dir: string;
}
+interface MQConfig {
+ brokers: string[];
+ topic: string;
+}
+
export interface Config {
app: ApplicationConfig;
db: DBConfig;
+ mq: MQConfig;
}
export function parseConfig(filename: string): Config {
Put in config values in config.json:
@@ -6,5 +6,9 @@
},
"db": {
"dsn": "mysql://test_user:test_pass@127.0.0.1:3306/lr_event_book?charset=utf8mb4"
+ },
+ "mq": {
+ "brokers": ["localhost:9094"],
+ "topic": "lr-book-searches"
}
}
We‘re using port
9094
for external access to the kafka running in a docker container.
See “Accessing Apache Kafka with internal and external clients“ configuration inbitnami/kafka
.
Tune src/application/wire_helper.ts:
@@ -1,16 +1,23 @@
import { Config } from "../infrastructure/config";
import { BookManager } from "../domain/gateway";
import { MySQLPersistence } from "../infrastructure/database";
+import { KafkaQueue, MQHelper } from "../infrastructure/mq";
// WireHelper is the helper for dependency injection
export class WireHelper {
private sql_persistence: MySQLPersistence;
+ private mq: KafkaQueue;
constructor(c: Config) {
this.sql_persistence = new MySQLPersistence(c.db.dsn, c.app.page_size);
+ this.mq = new KafkaQueue(c.mq.brokers, c.mq.topic);
}
bookManager(): BookManager {
return this.sql_persistence;
}
+
+ messageQueueHelper(): MQHelper {
+ return this.mq;
+ }
}
Tune src/application/executor/book_operator.ts:
@@ -1,11 +1,14 @@
import { BookManager } from "../../domain/gateway";
import { Book } from "../../domain/model";
+import { MQHelper } from "../../infrastructure/mq";
export class BookOperator {
private bookManager: BookManager;
+ private mqHelper: MQHelper;
- constructor(b: BookManager) {
+ constructor(b: BookManager, m: MQHelper) {
this.bookManager = b;
+ this.mqHelper = m;
}
async createBook(b: Book): Promise<Book> {
@@ -15,6 +18,11 @@ export class BookOperator {
}
async getBooks(offset: number, query: string): Promise<Book[]> {
- return await this.bookManager.getBooks(offset, query);
+ const books = await this.bookManager.getBooks(offset, query);
+ if (query) {
+ const jsonData = JSON.stringify(books);
+ await this.mqHelper.sendEvent(query, Buffer.from(jsonData, "utf8"));
+ }
+ return books;
}
}
Tune src/adapter/router.ts:
@@ -63,7 +63,7 @@ class RestHandler {
// Create router
function MakeRouter(wireHelper: WireHelper): express.Router {
const restHandler = new RestHandler(
- new BookOperator(wireHelper.bookManager())
+ new BookOperator(wireHelper.bookManager(), wireHelper.messageQueueHelper())
);
const router = express.Router();
Restart your server and try some searches, you‘ll see some kafka logs similar to this:
[2024-04-14 18:00:27,716] INFO Sent auto-creation request for Set(lr-book-searches) to the active controller. (kafka.server.DefaultAutoTopicCreationManager)
[2024-04-14 18:00:27,737] INFO [QuorumController id=0] CreateTopics result(s): CreatableTopic(name='lr-book-searches', numPartitions=1, replicationFactor=1, assignments=[], configs=[]): SUCCESS (org.apache.kafka.controller.ReplicationControlManager)
[2024-04-14 18:00:27,737] INFO [QuorumController id=0] Replayed TopicRecord for topic lr-book-searches with topic ID T02I1daRSEKcB3YZLrC3hg. (org.apache.kafka.controller.ReplicationControlManager)
[2024-04-14 18:00:27,738] INFO [QuorumController id=0] Replayed PartitionRecord for new partition lr-book-searches-0 with topic ID T02I1daRSEKcB3YZLrC3hg and PartitionRegistration(replicas=[0], directories=[9pTFQhR6i-GUUBD0tbOFlg], isr=[0], removingReplicas=[], addingReplicas=[], elr=[], lastKnownElr=[], leader=0, leaderRecoveryState=RECOVERED, leaderEpoch=0, partitionEpoch=0). (org.apache.kafka.controller.ReplicationControlManager)
[2024-04-14 18:00:27,770] INFO [Broker id=0] Transitioning 1 partition(s) to local leaders. (state.change.logger)
[2024-04-14 18:00:27,771] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions Set(lr-book-searches-0) (kafka.server.ReplicaFetcherManager)
[2024-04-14 18:00:27,774] INFO [Broker id=0] Creating new partition lr-book-searches-0 with topic id T02I1daRSEKcB3YZLrC3hg. (state.change.logger)
[2024-04-14 18:00:27,794] INFO [LogLoader partition=lr-book-searches-0, dir=/bitnami/kafka/data] Loading producer state till offset 0 with message format version 2 (kafka.log.UnifiedLog$)
[2024-04-14 18:00:27,798] INFO Created log for partition lr-book-searches-0 in /bitnami/kafka/data/lr-book-searches-0 with properties {} (kafka.log.LogManager)
[2024-04-14 18:00:27,800] INFO [Partition lr-book-searches-0 broker=0] No checkpointed highwatermark is found for partition lr-book-searches-0 (kafka.cluster.Partition)
[2024-04-14 18:00:27,802] INFO [Partition lr-book-searches-0 broker=0] Log loaded for partition lr-book-searches-0 with initial high watermark 0 (kafka.cluster.Partition)
[2024-04-14 18:00:27,805] INFO [Broker id=0] Leader lr-book-searches-0 with topic id Some(T02I1daRSEKcB3YZLrC3hg) starts at leader epoch 0 from offset 0 with partition epoch 0, high watermark 0, ISR [0], adding replicas [] and removing replicas [] . Previous leader None and previous leader epoch was -1. (state.change.logger)