» Go: Building Event-Driven Microservices with Kafka » 4. Consumer: Recommendation Service » 4.2 Event Consumer

Event Consumer

Since the recommendation service's consumer has so much in common with the trend service's counterpart, let's reuse that code by moving the common parts into higher-level folders.

Restructure files

  • Tune service/trend/domain/gateway/trend_manager.go:
@@ -9,16 +9,8 @@ import (
        "literank.com/event-books/domain/model"
 )
 
-type ConsumeCallback func(key, value []byte) error
-
 // TrendManager manages all trends
 type TrendManager interface {
        CreateTrend(ctx context.Context, t *model.Trend) (uint, error)
        TopTrends(ctx context.Context, pageSize uint) ([]*model.Trend, error)
 }
-
-// TrendEventConsumer consumes trend events
-type TrendEventConsumer interface {
-       ConsumeEvents(ctx context.Context, callback ConsumeCallback)
-       Stop() error
-}

Move the common TrendEventConsumer and ConsumeCallback types out of this file.

  • Put those types in domain/gateway/event_consumer.go:
package gateway

import "context"

// ConsumeCallback is the handler type accepted by EventConsumer.ConsumeEvents.
// It receives an event's key and value as raw bytes and reports failure
// through its error result.
type ConsumeCallback func(key, value []byte) error

// EventConsumer consumes all kinds of events.
// It is the shared abstraction used by both the trend and the recommendation
// services in place of the former trend-specific TrendEventConsumer.
type EventConsumer interface {
	// ConsumeEvents passes consumed events to callback as key/value byte slices.
	// NOTE(review): whether this call blocks, and how a callback error is
	// handled, is decided by the implementation — confirm against the Kafka consumer.
	ConsumeEvents(ctx context.Context, callback ConsumeCallback)
	// Stop shuts the consumer down and returns any error raised while doing so.
	Stop() error
}
  • Rename infrastructure/mq/kafka.go to infrastructure/mq/kafka_producer.go.
  • Move service/trend/infrastructure/mq/kafka.go to infrastructure/mq/kafka_consumer.go.
func NewKafkaConsumer(brokers []string, topic, groupID string) (*KafkaConsumer, error) {

Add a new parameter named groupID for the consumer group.

  • Add config items in service/trend/infrastructure/config/config.go:
@@ -26,4 +26,5 @@ type CacheConfig struct {
 type MQConfig struct {
        Brokers []string `json:"brokers" yaml:"brokers"`
        Topic   string   `json:"topic" yaml:"topic"`
+       GroupID string   `json:"group_id" yaml:"group_id"`
 }
  • Put values in service/trend/config.yml:
@@ -7,4 +7,5 @@ cache:
 mq:
   brokers:
     - localhost:9094
   topic: "lr-book-searches"
+  group_id: "trend-svr"
  • Tune import paths in service/trend/application/consumer/trend.go:
@@ -7,16 +7,17 @@ import (
        "context"
        "encoding/json"
 
+       topgw "literank.com/event-books/domain/gateway"
        "literank.com/event-books/domain/model"
        "literank.com/event-books/service/trend/domain/gateway"
 )
 
 type TrendConsumer struct {
        trendManager  gateway.TrendManager
-       eventConsumer gateway.TrendEventConsumer
+       eventConsumer topgw.EventConsumer
 }
 
-func NewTrendConsumer(t gateway.TrendManager, e gateway.TrendEventConsumer) *TrendConsumer {
+func NewTrendConsumer(t gateway.TrendManager, e topgw.EventConsumer) *TrendConsumer {
        return &TrendConsumer{trendManager: t, eventConsumer: e}
 }
 
@@ -33,6 +34,6 @@ func (c *TrendConsumer) Start(ctx context.Context) {
        })
 }
 
-func (c *TrendConsumer) EventConsumer() gateway.TrendEventConsumer {
+func (c *TrendConsumer) EventConsumer() topgw.EventConsumer {
        return c.eventConsumer
 }
  • Tune import paths in service/trend/application/wire_helper.go:
@@ -4,10 +4,11 @@ Package application provides all common structures and functions of the applicat
 package application
 
 import (
+       topgw "literank.com/event-books/domain/gateway"
+       "literank.com/event-books/infrastructure/mq"
        "literank.com/event-books/service/trend/domain/gateway"
        "literank.com/event-books/service/trend/infrastructure/cache"
        "literank.com/event-books/service/trend/infrastructure/config"
-       "literank.com/event-books/service/trend/infrastructure/mq"
 )
 
 // WireHelper is the helper for dependency injection
@@ -19,7 +20,7 @@ type WireHelper struct {
 // NewWireHelper constructs a new WireHelper
 func NewWireHelper(c *config.Config) (*WireHelper, error) {
        kv := cache.NewRedisCache(c.Cache.Address, c.Cache.Password, c.Cache.DB)
-       consumer, err := mq.NewKafkaConsumer(c.MQ.Brokers, c.MQ.Topic)
+       consumer, err := mq.NewKafkaConsumer(c.MQ.Brokers, c.MQ.Topic, c.MQ.GroupID)
        if err != nil {
                return nil, err
        }
@@ -35,6 +36,6 @@ func (w *WireHelper) TrendManager() gateway.TrendManager {
 }
 
 // TrendEventConsumer returns an instance of TrendEventConsumer
-func (w *WireHelper) TrendEventConsumer() gateway.TrendEventConsumer {
+func (w *WireHelper) TrendEventConsumer() topgw.EventConsumer {
        return w.consumer
 }

Tune the producer's events in the web service

Add the userID parameter in service/web/application/executor/book_operator.go:

@@ -36,18 +36,19 @@ func (o *BookOperator) CreateBook(ctx context.Context, b *model.Book) (*model.Bo
 }
 
 // GetBooks gets a list of books by offset and keyword, and caches its result if needed
-func (o *BookOperator) GetBooks(ctx context.Context, offset int, query string) ([]*model.Book, error) {
+func (o *BookOperator) GetBooks(ctx context.Context, offset int, userID, query string) ([]*model.Book, error) {
        books, err := o.bookManager.GetBooks(ctx, offset, query)
        if err != nil {
                return nil, err
        }
-       // Send search query and its results
+       // Send a user's search query and its results
        if query != "" {
+               k := query + ":" + userID
                jsonData, err := json.Marshal(books)
                if err != nil {
                        return nil, fmt.Errorf("failed to send event due to %w", err)
                }
-               o.mqHelper.SendEvent(query, jsonData)
+               o.mqHelper.SendEvent(k, jsonData)
        }
        return books, nil
 }

Read the cookie value and pass it in as the userID in service/web/adapter/router.go:

@@ -6,8 +6,10 @@ package adapter
 import (
        "fmt"
        "log"
+       "math/rand"
        "net/http"
        "strconv"
+       "time"
 
        "github.com/gin-gonic/gin"
 
@@ -20,6 +22,7 @@ import (
 const (
        fieldOffset = "o"
        fieldQuery  = "q"
+       fieldUID    = "uid"
 )
 
 // RestHandler handles all restful requests
@@ -54,8 +57,14 @@ func MakeRouter(templates_pattern string, remote *config.RemoteServiceConfig, wi
 
 // Render and show the index page
 func (r *RestHandler) indexPage(c *gin.Context) {
+       userID, err := c.Cookie(fieldUID)
+       if err != nil {
+               // Doesn't exist, make a new one
+               userID = randomString(5)
+               c.SetCookie(fieldUID, userID, 3600*24*30, "/", "", false, false)
+       }
        q := c.Query(fieldQuery)
-       books, err := r.bookOperator.GetBooks(c, 0, q)
+       books, err := r.bookOperator.GetBooks(c, 0, userID, q)
        if err != nil {
                c.String(http.StatusNotFound, "failed to get books")
                return
@@ -87,7 +96,7 @@ func (r *RestHandler) getBooks(c *gin.Context) {
                }
                offset = value
        }
-       books, err := r.bookOperator.GetBooks(c, offset, c.Query(fieldQuery))
+       books, err := r.bookOperator.GetBooks(c, offset, "", c.Query(fieldQuery))
        if err != nil {
                fmt.Printf("Failed to get books: %v\n", err)
                c.JSON(http.StatusNotFound, gin.H{"error": "failed to get books"})
@@ -112,3 +121,14 @@ func (r *RestHandler) createBook(c *gin.Context) {
        }
        c.JSON(http.StatusCreated, book)
 }
+
+func randomString(length int) string {
+       source := rand.NewSource(time.Now().UnixNano())
+       random := rand.New(source)
+       const charset = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
+       result := make([]byte, length)
+       for i := range result {
+               result[i] = charset[random.Intn(len(charset))]
+       }
+       return string(result)
+}

Tune the consumer in the trend service

Since the key's format has changed from "query" to "query:userID", we need to adapt to that in the trend service.

Update service/trend/application/consumer/trend.go:

@@ -6,6 +6,7 @@ package consumer
 import (
        "context"
        "encoding/json"
+       "strings"
 
        topgw "literank.com/event-books/domain/gateway"
        "literank.com/event-books/domain/model"
@@ -23,8 +24,10 @@ func NewTrendConsumer(t gateway.TrendManager, e topgw.EventConsumer) *TrendConsu
 
 func (c *TrendConsumer) Start(ctx context.Context) {
        c.eventConsumer.ConsumeEvents(ctx, func(key, data []byte) error {
+               parts := strings.Split(string(key), ":")
+               query := parts[0]
                t := &model.Trend{
-                       Query: string(key),
+                       Query: query,
                }
                if err := json.Unmarshal(data, &t.Books); err != nil {
                        return err

Recommendation Service's consumer

Add service/recommendation/domain/model/interest.go:

package model

// Interest represents a user's interest in a book.
//
// UserID identifies the user, Title and Author identify the book, and Score
// is the accumulated interest counter. Every field carries an explicit bson
// tag so the stored document keys ("user_id", "title", "author", "score")
// are pinned down here rather than derived from the driver's default of
// lowercasing the Go field name; the keys themselves are unchanged.
type Interest struct {
	UserID string  `json:"user_id" bson:"user_id"`
	Title  string  `json:"title" bson:"title"`
	Author string  `json:"author" bson:"author"`
	Score  float32 `json:"score" bson:"score"`
}

Add service/recommendation/domain/gateway/interest_manager.go:

/*
Package gateway contains all domain gateways.
*/
package gateway

import (
	"context"

	"literank.com/event-books/service/recommendation/domain/model"
)

// InterestManager manages all interests.
// It is the persistence gateway of the recommendation service.
type InterestManager interface {
	// IncreaseInterest raises the stored interest described by i.
	// NOTE(review): the amount of the increase is up to the implementation.
	IncreaseInterest(ctx context.Context, i *model.Interest) error
	// ListInterests returns the interests recorded for the given userID.
	ListInterests(ctx context.Context, userID string) ([]*model.Interest, error)
}

Install mongo dependencies:

go get -u go.mongodb.org/mongo-driver/mongo

Prepare the mongoDB database:

  • Install mongoDB on your machine and start it.
  • Create a database named lr_event_rec.
  • Create necessary indexes for your collections.

Implement the InterestManager in service/recommendation/infrastructure/database/mongo.go:

/*
Package database does all db persistence implementations.
*/
package database

import (
	"context"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"

	"literank.com/event-books/service/recommendation/domain/model"
)

const (
	// collReview is the name of the MongoDB collection used by MongoPersistence.
	// NOTE(review): despite the identifier, the collection holds interests, not
	// reviews; consider renaming it (e.g. collInterest) together with its use
	// in NewMongoPersistence.
	collReview = "interests"
)

// MongoPersistence runs all mongoDB operations.
// NOTE: NewMongoPersistence builds this struct with an unkeyed literal, so the
// field order below must stay in sync with that constructor.
type MongoPersistence struct {
	// db is the database handle obtained from the connected client.
	db       *mongo.Database
	// coll is the collection that all interest queries and updates run against.
	coll     *mongo.Collection
	// pageSize is passed as the limit when listing interests.
	pageSize int
}

// NewMongoPersistence connects to the MongoDB deployment at mongoURI and
// returns a MongoPersistence bound to the interests collection of dbName.
// pageSize is kept as the limit later applied by ListInterests.
// The connection error, if any, is returned unchanged.
func NewMongoPersistence(mongoURI, dbName string, pageSize int) (*MongoPersistence, error) {
	clientOpts := options.Client().ApplyURI(mongoURI)
	client, err := mongo.Connect(context.Background(), clientOpts)
	if err != nil {
		return nil, err
	}

	mdb := client.Database(dbName)
	persistence := &MongoPersistence{
		db:       mdb,
		coll:     mdb.Collection(collReview),
		pageSize: pageSize,
	}
	return persistence, nil
}

// IncreaseInterest adds 1 to the score of the document matching the user ID,
// title and author of i. The update is an upsert, so a missing document is
// created by the same call. Any driver error is returned as is.
func (m *MongoPersistence) IncreaseInterest(ctx context.Context, i *model.Interest) error {
	selector := bson.M{
		"user_id": i.UserID,
		"title":   i.Title,
		"author":  i.Author,
	}
	increment := bson.M{"$inc": bson.M{"score": 1}}

	_, err := m.coll.UpdateOne(ctx, selector, increment, options.Update().SetUpsert(true))
	return err
}

// ListInterests lists the interests stored for the given user ID, sorted by
// score in descending order, with the query limit set to the configured page
// size. The returned slice is non-nil unless an error occurs.
func (m *MongoPersistence) ListInterests(ctx context.Context, userID string) ([]*model.Interest, error) {
	findOpts := options.Find().
		SetSort(bson.M{"score": -1}).
		SetLimit(int64(m.pageSize))

	cur, err := m.coll.Find(ctx, bson.M{"user_id": userID}, findOpts)
	if err != nil {
		return nil, err
	}
	defer cur.Close(ctx)

	result := []*model.Interest{}
	if err = cur.All(ctx, &result); err != nil {
		return nil, err
	}
	return result, nil
}

Add service/recommendation/infrastructure/config/config.go:

/*
Package config provides config structures and parse funcs.
*/
package config

// Config is the global configuration.
// Its three sections map to the top-level "app", "db" and "mq" keys of the
// service's config file.
type Config struct {
	App ApplicationConfig `json:"app" yaml:"app"`
	DB  DBConfig          `json:"db" yaml:"db"`
	MQ  MQConfig          `json:"mq" yaml:"mq"`
}

// DBConfig is the configuration of databases.
type DBConfig struct {
	// MongoURI is the connection URI handed to the MongoDB client.
	MongoURI    string `json:"mongo_uri" yaml:"mongo_uri"`
	// MongoDBName is the name of the MongoDB database to use.
	MongoDBName string `json:"mongo_db_name" yaml:"mongo_db_name"`
}

// ApplicationConfig is the configuration of main app.
type ApplicationConfig struct {
	// Port is the application's port number.
	// NOTE(review): not read by the consumer code shown so far.
	Port     int `json:"port" yaml:"port"`
	// PageSize is forwarded to the persistence layer as the listing limit.
	PageSize int `json:"page_size" yaml:"page_size"`
}

// MQConfig is the configuration of message queues.
type MQConfig struct {
	// Brokers lists the broker addresses to connect to.
	Brokers []string `json:"brokers" yaml:"brokers"`
	// Topic is the topic the consumer reads from.
	Topic   string   `json:"topic" yaml:"topic"`
	// GroupID is the consumer group ID passed to the Kafka consumer.
	GroupID string   `json:"group_id" yaml:"group_id"`
}

Add service/recommendation/config.yml:

app:
  port: 8082
  page_size: 10
db:
  mongo_uri: "mongodb://localhost:27017"
  mongo_db_name: "lr_event_rec"
mq:
  brokers:
    - localhost:9094
  topic: "lr-book-searches"
  group_id: "rec-svr"

Add service/recommendation/application/consumer/interest.go:

/*
Package consumer handles event-trigger style business logic.
*/
package consumer

import (
	"context"
	"encoding/json"
	"strings"

	topgw "literank.com/event-books/domain/gateway"
	topmodel "literank.com/event-books/domain/model"
	"literank.com/event-books/service/recommendation/domain/gateway"
	"literank.com/event-books/service/recommendation/domain/model"
)

// InterestConsumer turns consumed events into user interests.
type InterestConsumer struct {
	// interestManager persists the interests derived from each event.
	interestManager gateway.InterestManager
	// eventConsumer is the source of the events handled in Start.
	eventConsumer   topgw.EventConsumer
}

// NewInterestConsumer builds an InterestConsumer that reads events from e and
// stores the resulting interests through t.
func NewInterestConsumer(t gateway.InterestManager, e topgw.EventConsumer) *InterestConsumer {
	ic := new(InterestConsumer)
	ic.interestManager = t
	ic.eventConsumer = e
	return ic
}

// Start hands a callback to the event consumer. For every event, the callback
// extracts the user ID from the key, decodes the value as a JSON list of
// books, and increases the user's interest in each of those books.
//
// Events without a user ID are skipped: keys that have no ":" at all, and
// keys that end in ":" (the producer appends ":" plus the user ID even when
// the user ID is empty). A decoding or persistence error is returned from the
// callback immediately, leaving the remaining books of that event unprocessed.
func (c *InterestConsumer) Start(ctx context.Context) {
	c.eventConsumer.ConsumeEvents(ctx, func(key, data []byte) error {
		userID, ok := userIDFromKey(string(key))
		if !ok {
			// No userID, ignore it
			return nil
		}

		var books []*topmodel.Book
		if err := json.Unmarshal(data, &books); err != nil {
			return err
		}
		for _, book := range books {
			i := &model.Interest{
				UserID: userID,
				Title:  book.Title,
				Author: book.Author,
			}
			if err := c.interestManager.IncreaseInterest(ctx, i); err != nil {
				return err
			}
		}
		return nil
	})
}

// userIDFromKey extracts the user ID from an event key of the form
// "<query>:<userID>". The split happens at the LAST colon because the query
// part is free text and may itself contain colons. It reports false when the
// key has no colon or when the user ID part is empty.
func userIDFromKey(key string) (string, bool) {
	idx := strings.LastIndex(key, ":")
	if idx < 0 || idx == len(key)-1 {
		return "", false
	}
	return key[idx+1:], true
}

// EventConsumer returns the underlying event consumer, e.g. so that the
// caller can stop it.
func (c *InterestConsumer) EventConsumer() topgw.EventConsumer {
	return c.eventConsumer
}

Add service/recommendation/application/wire_helper.go:

/*
Package application provides all common structures and functions of the application layer.
*/
package application

import (
	topgw "literank.com/event-books/domain/gateway"
	"literank.com/event-books/infrastructure/mq"
	"literank.com/event-books/service/recommendation/domain/gateway"
	"literank.com/event-books/service/recommendation/infrastructure/config"
	"literank.com/event-books/service/recommendation/infrastructure/database"
)

// WireHelper is the helper for dependency injection.
// It owns the concrete infrastructure objects and exposes them through the
// gateway interfaces.
type WireHelper struct {
	// noSQLPersistence is the MongoDB-backed store returned by InterestManager.
	noSQLPersistence *database.MongoPersistence
	// consumer is the Kafka consumer returned by TrendEventConsumer.
	consumer         *mq.KafkaConsumer
}

// NewWireHelper builds the MongoDB persistence and the Kafka consumer from
// the given configuration and bundles them into a WireHelper. The first
// construction error aborts the setup and is returned unchanged.
func NewWireHelper(c *config.Config) (*WireHelper, error) {
	persistence, err := database.NewMongoPersistence(c.DB.MongoURI, c.DB.MongoDBName, c.App.PageSize)
	if err != nil {
		return nil, err
	}

	kafkaConsumer, err := mq.NewKafkaConsumer(c.MQ.Brokers, c.MQ.Topic, c.MQ.GroupID)
	if err != nil {
		return nil, err
	}

	helper := &WireHelper{
		noSQLPersistence: persistence,
		consumer:         kafkaConsumer,
	}
	return helper, nil
}

// InterestManager returns an instance of InterestManager.
// The returned value is the MongoDB persistence held by the helper.
func (w *WireHelper) InterestManager() gateway.InterestManager {
	return w.noSQLPersistence
}

// TrendEventConsumer returns the helper's Kafka consumer as a generic
// topgw.EventConsumer.
// NOTE(review): the method name mirrors the trend service's WireHelper; in
// this service it supplies the events for the interest consumer, so a
// neutral name such as EventConsumer would fit better (callers would need
// updating).
func (w *WireHelper) TrendEventConsumer() topgw.EventConsumer {
	return w.consumer
}

Add service/recommendation/main.go:

package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"syscall"

	"literank.com/event-books/infrastructure/parser"
	"literank.com/event-books/service/recommendation/application"
	"literank.com/event-books/service/recommendation/application/consumer"
	"literank.com/event-books/service/recommendation/infrastructure/config"
)

const configFileName = "config.yml"

// main loads the configuration, wires up the dependencies and runs the
// interest consumer. A background goroutine waits for SIGINT or SIGTERM and
// then stops the event consumer.
func main() {
	// Load the configuration file
	cfg, err := parser.Parse[config.Config](configFileName)
	if err != nil {
		panic(err)
	}

	// Wire up the dependencies
	helper, err := application.NewWireHelper(cfg)
	if err != nil {
		panic(err)
	}

	// Build the consumer and keep a handle for shutting it down
	interestConsumer := consumer.NewInterestConsumer(helper.InterestManager(), helper.TrendEventConsumer())
	source := interestConsumer.EventConsumer()

	go func() {
		// Wait for a termination signal, then stop consuming
		sigCh := make(chan os.Signal, 1)
		signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
		<-sigCh

		stopErr := source.Stop()
		if stopErr != nil {
			log.Panicf("Failed to close consumer group: %v", stopErr)
		}
	}()

	interestConsumer.Start(context.Background())
}

Start the recommendation consumer:

# in service/recommendation
go run main.go

You should see something similar to this:

2024/04/05 23:56:59 Started to consume events...

From another terminal, restart your web service and try searching with terms like “love” and “war” a couple of times.

Then, you will have records like below in your mongoDB:

lr_event_rec> db.interests.find();
[
  {
    _id: ObjectId('66101a027a7027627b271269'),
    author: 'Jane Austen',
    title: 'Pride and Prejudice',
    user_id: 'YVM2P',
    score: 1
  },
  {
    _id: ObjectId('66101a027a7027627b27126b'),
    author: 'Leo Tolstoy',
    title: 'War and Peace',
    user_id: 'YVM2P',
    score: 2
  },
  {
    _id: ObjectId('66101a437a7027627b271299'),
    author: 'Homer',
    title: 'The Odyssey',
    user_id: 'YVM2P',
    score: 1
  }
]

That means your consumer is doing well.

PrevNext