» Go: Building Event-Driven Microservices with Kafka » 2. Producer: Web Service » 2.5 Event Producer

Event Producer

In event-driven architecture, event producers and consumers play crucial roles in facilitating communication and interaction between different components or services within a system.

An event producer is any component or service within a system that generates or emits events.

An event consumer is a component or service within a system that subscribes to and processes events emitted by event producers.

Event Queue

In this diagram, the “Bookstore Web Service“ is the producer, and the other two services are the consumers.

Key benefits of event-driven architecture include:

  • Loose coupling: Event-driven architecture promotes loose coupling between components, as producers and consumers interact through asynchronous communication based on events.
  • Scalability: It enables scalable systems by allowing event consumers to scale independently from event producers, ensuring that the system can handle varying workloads efficiently.
  • Fault tolerance: Event-driven architecture supports fault tolerance and resilience, as events can be reliably processed even if some components or services are temporarily unavailable.

Use Apache Kafka

  1. Install Apache Kafka on your machine and start it.

Using docker image is the easist way to have kafka on your machine.

docker pull apache/kafka:3.7.0
docker run -p 9092:9092 apache/kafka:3.7.0
  1. Create a topic to store your events.

Events are organized and durably stored in topics. Very simplified, a topic is similar to a folder in a filesystem, and the events are the files in that folder.

bin/kafka-topics.sh --create --topic lr-book-searches --bootstrap-server localhost:9092

Topics in Kafka are always multi-producer and multi-subscriber: a topic can have zero, one, or many producers that write events to it, as well as zero, one, or many consumers that subscribe to these events.

  1. Add kafka dependency:
go get -u github.com/IBM/sarama

Sarama is an MIT-licensed Go client library for Apache Kafka.

  1. Update code.

Create infrastructure/mq/helper.go:

package mq

// Helper sends events to the message queue.
type Helper interface {
	SendEvent(key string, value []byte) (bool, error)
}

Create infrastructure/mq/kafka.go:

/*
Package mq does all message queue jobs.
*/
package mq

import (
	"github.com/IBM/sarama"
)

// KafkaQueue runs all Kafka operations
type KafkaQueue struct {
	producer sarama.SyncProducer
	topic    string
}

// NewKafkaQueue constructs a new KafkaQueue
func NewKafkaQueue(brokers []string, topic string) (*KafkaQueue, error) {
	// Configuration
	c := sarama.NewConfig()
	c.Producer.RequiredAcks = sarama.WaitForLocal
	c.Producer.Retry.Max = 3
	c.Producer.Return.Successes = true
	// Create producer
	producer, err := sarama.NewSyncProducer(brokers, c)
	if err != nil {
		return nil, err
	}
	return &KafkaQueue{producer, topic}, nil
}

// CreateBook creates a new book
func (k *KafkaQueue) SendEvent(key string, value []byte) (bool, error) {
	// Send a message
	message := &sarama.ProducerMessage{
		Topic: k.topic,
		Key:   sarama.StringEncoder(key),
		Value: sarama.ByteEncoder(value),
	}
	// Send message to Kafka
	_, _, err := k.producer.SendMessage(message)
	if err != nil {
		return false, err
	}
	return true, nil
}

Add message queue config items in infrastructure/config/config.go:

@@ -14,6 +14,7 @@ import (
 type Config struct {
        App ApplicationConfig `json:"app" yaml:"app"`
        DB  DBConfig          `json:"db" yaml:"db"`
+       MQ  MQConfig          `json:"mq" yaml:"mq"`
 }
 
 // DBConfig is the configuration of databases.
@@ -28,6 +29,12 @@ type ApplicationConfig struct {
        TemplatesPattern string `json:"templates_pattern" yaml:"templates_pattern"`
 }
 
+// MQConfig is the configuration of message queues.
+type MQConfig struct {
+       Brokers []string `json:"brokers" yaml:"brokers"`
+       Topic   string   `json:"topic" yaml:"topic"`
+}
+
 // Parse parses config file and returns a Config.
 func Parse(filename string) (*Config, error) {
        buf, err := os.ReadFile(filename)

Put in config values in config.yml:

@@ -3,4 +3,8 @@ app:
   page_size: 5
   templates_pattern: "adapter/templates/*.html"
 db:
-  dsn: "test_user:test_pass@tcp(127.0.0.1:3306)/lr_event_book?charset=utf8mb4&parseTime=True&loc=Local"
\ No newline at end of file
+  dsn: "test_user:test_pass@tcp(127.0.0.1:3306)/lr_event_book?charset=utf8mb4&parseTime=True&loc=Local"
+mq:
+  brokers:
+    - localhost:9092
+  topic: "lr-book-searches"

Tune application/wire_helper.go:

@@ -7,11 +7,13 @@ import (
        "literank.com/event-books/domain/gateway"
        "literank.com/event-books/infrastructure/config"
        "literank.com/event-books/infrastructure/database"
+       "literank.com/event-books/infrastructure/mq"
 )
 
 // WireHelper is the helper for dependency injection
 type WireHelper struct {
        sqlPersistence *database.MySQLPersistence
+       mq             *mq.KafkaQueue
 }
 
 // NewWireHelper constructs a new WireHelper
@@ -20,12 +22,22 @@ func NewWireHelper(c *config.Config) (*WireHelper, error) {
        if err != nil {
                return nil, err
        }
+       mq, err := mq.NewKafkaQueue(c.MQ.Brokers, c.MQ.Topic)
+       if err != nil {
+               return nil, err
+       }
 
        return &WireHelper{
-               sqlPersistence: db}, nil
+               sqlPersistence: db, mq: mq,
+       }, nil
 }
 
 // BookManager returns an instance of BookManager
 func (w *WireHelper) BookManager() gateway.BookManager {
        return w.sqlPersistence
 }
+
+// MessageQueueHelper returns an instance of mq helper
+func (w *WireHelper) MessageQueueHelper() mq.Helper {
+       return w.mq
+}

Tune application/executor/book_operator.go:

@@ -5,19 +5,23 @@ package executor
 
 import (
        "context"
+       "encoding/json"
+       "fmt"
 
        "literank.com/event-books/domain/gateway"
        "literank.com/event-books/domain/model"
+       "literank.com/event-books/infrastructure/mq"
 )
 
 // BookOperator handles book input/output and proxies operations to the book manager.
 type BookOperator struct {
        bookManager gateway.BookManager
+       mqHelper    mq.Helper
 }
 
 // NewBookOperator constructs a new BookOperator
-func NewBookOperator(b gateway.BookManager) *BookOperator {
-       return &BookOperator{bookManager: b}
+func NewBookOperator(b gateway.BookManager, m mq.Helper) *BookOperator {
+       return &BookOperator{bookManager: b, mqHelper: m}
 }
 
 // CreateBook creates a new book
@@ -32,5 +36,17 @@ func (o *BookOperator) CreateBook(ctx context.Context, b *model.Book) (*model.Bo
 
 // GetBooks gets a list of books by offset and keyword, and caches its result if needed
 func (o *BookOperator) GetBooks(ctx context.Context, offset int, query string) ([]*model.Book, error) {
-       return o.bookManager.GetBooks(ctx, offset, query)
+       books, err := o.bookManager.GetBooks(ctx, offset, query)
+       if err != nil {
+               return nil, err
+       }
+       // Send search query and its results
+       if query != "" {
+               jsonData, err := json.Marshal(books)
+               if err != nil {
+                       return nil, fmt.Errorf("failed to send event due to %w", err)
+               }
+               o.mqHelper.SendEvent(query, jsonData)
+       }
+       return books, nil
 }

Tune adapter/router.go:

@@ -27,7 +27,7 @@ type RestHandler struct {
 
 func newRestHandler(wireHelper *application.WireHelper) *RestHandler {
        return &RestHandler{
-               bookOperator: executor.NewBookOperator(wireHelper.BookManager()),
+               bookOperator: executor.NewBookOperator(wireHelper.BookManager(), wireHelper.MessageQueueHelper()),
        }
 }

Restart your server and try some searches, you‘ll see some kafka logs similar to this:

[2024-04-03 05:48:16,240] INFO [QuorumController id=0] CreateTopics result(s): CreatableTopic(name='lr-book-searches', numPartitions=1, replicationFactor=1, assignments=[], configs=[]): SUCCESS (org.apache.kafka.controller.ReplicationControlManager)
[2024-04-03 05:48:16,241] INFO [QuorumController id=0] Replayed TopicRecord for topic lr-book-searches with topic ID UVB4UXbPT6SjcFYtElt56A. (org.apache.kafka.controller.ReplicationControlManager)
[2024-04-03 05:48:16,241] INFO [QuorumController id=0] Replayed PartitionRecord for new partition lr-book-searches-0 with topic ID UVB4UXbPT6SjcFYtElt56A and PartitionRegistration(replicas=[0], directories=[2_8o-ARVFEZ2Ue7fnPRGrg], isr=[0], removingReplicas=[], addingReplicas=[], elr=[], lastKnownElr=[], leader=0, leaderRecoveryState=RECOVERED, leaderEpoch=0, partitionEpoch=0). (org.apache.kafka.controller.ReplicationControlManager)
[2024-04-03 05:48:16,271] INFO [Broker id=0] Transitioning 1 partition(s) to local leaders. (state.change.logger)
PrevNext