Event Consumer
Since the recommendation service's consumer has so much in common with the trend service's counterpart, let's reuse that code by moving the common parts into higher-level folders.
Restructure files
- Tune service/trend/domain/gateway/trend_manager.go:
@@ -9,16 +9,8 @@ import (
"literank.com/event-books/domain/model"
)
-type ConsumeCallback func(key, value []byte) error
-
// TrendManager manages all trends
type TrendManager interface {
CreateTrend(ctx context.Context, t *model.Trend) (uint, error)
TopTrends(ctx context.Context, pageSize uint) ([]*model.Trend, error)
}
-
-// TrendEventConsumer consumes trend events
-type TrendEventConsumer interface {
- ConsumeEvents(ctx context.Context, callback ConsumeCallback)
- Stop() error
-}
Move the common TrendEventConsumer
and ConsumeCallback
types out of this file.
- Put those types in domain/gateway/event_consumer.go:
package gateway
import "context"
// ConsumeCallback is the callback type accepted by EventConsumer.ConsumeEvents.
// It takes an event's key and value as raw bytes and returns an error.
type ConsumeCallback func(key, value []byte) error
// EventConsumer consumes all kinds of events.
// It is the service-agnostic consumer abstraction shared by the trend and
// recommendation services.
type EventConsumer interface {
	// ConsumeEvents takes a context and a ConsumeCallback.
	// NOTE(review): presumably the callback is invoked once per consumed
	// event and the call blocks while consuming — confirm against the
	// concrete implementation.
	ConsumeEvents(ctx context.Context, callback ConsumeCallback)
	// Stop reports any failure as an error.
	// NOTE(review): presumably shuts the consumer down — confirm against the
	// concrete implementation.
	Stop() error
}
- Rename infrastructure/mq/kafka.go to infrastructure/mq/kafka_producer.go.
- Move service/trend/infrastructure/mq/kafka.go to infrastructure/mq/kafka_consumer.go.
func NewKafkaConsumer(brokers []string, topic, groupID string) (*KafkaConsumer, error) {
Add a new parameter named groupID
for the consumer group.
- Add config items in service/trend/infrastructure/config/config.go:
@@ -26,4 +26,5 @@ type CacheConfig struct {
type MQConfig struct {
Brokers []string `json:"brokers" yaml:"brokers"`
Topic string `json:"topic" yaml:"topic"`
+ GroupID string `json:"group_id" yaml:"group_id"`
}
- Put values in service/trend/config.yml:
@@ -7,4 +7,5 @@ cache:
mq:
brokers:
- localhost:9094
topic: "lr-book-searches"
+ group_id: "trend-svr"
- Tune import paths in service/trend/application/consumer/trend.go:
@@ -7,16 +7,17 @@ import (
"context"
"encoding/json"
+ topgw "literank.com/event-books/domain/gateway"
"literank.com/event-books/domain/model"
"literank.com/event-books/service/trend/domain/gateway"
)
type TrendConsumer struct {
trendManager gateway.TrendManager
- eventConsumer gateway.TrendEventConsumer
+ eventConsumer topgw.EventConsumer
}
-func NewTrendConsumer(t gateway.TrendManager, e gateway.TrendEventConsumer) *TrendConsumer {
+func NewTrendConsumer(t gateway.TrendManager, e topgw.EventConsumer) *TrendConsumer {
return &TrendConsumer{trendManager: t, eventConsumer: e}
}
@@ -33,6 +34,6 @@ func (c *TrendConsumer) Start(ctx context.Context) {
})
}
-func (c *TrendConsumer) EventConsumer() gateway.TrendEventConsumer {
+func (c *TrendConsumer) EventConsumer() topgw.EventConsumer {
return c.eventConsumer
}
- Tune import paths in service/trend/application/wire_helper.go:
@@ -4,10 +4,11 @@ Package application provides all common structures and functions of the applicat
package application
import (
+ topgw "literank.com/event-books/domain/gateway"
+ "literank.com/event-books/infrastructure/mq"
"literank.com/event-books/service/trend/domain/gateway"
"literank.com/event-books/service/trend/infrastructure/cache"
"literank.com/event-books/service/trend/infrastructure/config"
- "literank.com/event-books/service/trend/infrastructure/mq"
)
// WireHelper is the helper for dependency injection
@@ -19,7 +20,7 @@ type WireHelper struct {
// NewWireHelper constructs a new WireHelper
func NewWireHelper(c *config.Config) (*WireHelper, error) {
kv := cache.NewRedisCache(c.Cache.Address, c.Cache.Password, c.Cache.DB)
- consumer, err := mq.NewKafkaConsumer(c.MQ.Brokers, c.MQ.Topic)
+ consumer, err := mq.NewKafkaConsumer(c.MQ.Brokers, c.MQ.Topic, c.MQ.GroupID)
if err != nil {
return nil, err
}
@@ -35,6 +36,6 @@ func (w *WireHelper) TrendManager() gateway.TrendManager {
}
// TrendEventConsumer returns an instance of TrendEventConsumer
-func (w *WireHelper) TrendEventConsumer() gateway.TrendEventConsumer {
+func (w *WireHelper) TrendEventConsumer() topgw.EventConsumer {
return w.consumer
}
Tune the producer's events in the web service
Add the userID
parameter in service/web/application/executor/book_operator.go:
@@ -36,18 +36,19 @@ func (o *BookOperator) CreateBook(ctx context.Context, b *model.Book) (*model.Bo
}
// GetBooks gets a list of books by offset and keyword, and caches its result if needed
-func (o *BookOperator) GetBooks(ctx context.Context, offset int, query string) ([]*model.Book, error) {
+func (o *BookOperator) GetBooks(ctx context.Context, offset int, userID, query string) ([]*model.Book, error) {
books, err := o.bookManager.GetBooks(ctx, offset, query)
if err != nil {
return nil, err
}
- // Send search query and its results
+ // Send a user's search query and its results
if query != "" {
+ k := query + ":" + userID
jsonData, err := json.Marshal(books)
if err != nil {
return nil, fmt.Errorf("failed to send event due to %w", err)
}
- o.mqHelper.SendEvent(query, jsonData)
+ o.mqHelper.SendEvent(k, jsonData)
}
return books, nil
}
Read the cookie value and pass in as the userID
in service/web/adapter/router.go:
@@ -6,8 +6,10 @@ package adapter
import (
"fmt"
"log"
+ "math/rand"
"net/http"
"strconv"
+ "time"
"github.com/gin-gonic/gin"
@@ -20,6 +22,7 @@ import (
const (
fieldOffset = "o"
fieldQuery = "q"
+ fieldUID = "uid"
)
// RestHandler handles all restful requests
@@ -54,8 +57,14 @@ func MakeRouter(templates_pattern string, remote *config.RemoteServiceConfig, wi
// Render and show the index page
func (r *RestHandler) indexPage(c *gin.Context) {
+ userID, err := c.Cookie(fieldUID)
+ if err != nil {
+ // Doesn't exist, make a new one
+ userID = randomString(5)
+ c.SetCookie(fieldUID, userID, 3600*24*30, "/", "", false, false)
+ }
q := c.Query(fieldQuery)
- books, err := r.bookOperator.GetBooks(c, 0, q)
+ books, err := r.bookOperator.GetBooks(c, 0, userID, q)
if err != nil {
c.String(http.StatusNotFound, "failed to get books")
return
@@ -87,7 +96,7 @@ func (r *RestHandler) getBooks(c *gin.Context) {
}
offset = value
}
- books, err := r.bookOperator.GetBooks(c, offset, c.Query(fieldQuery))
+ books, err := r.bookOperator.GetBooks(c, offset, "", c.Query(fieldQuery))
if err != nil {
fmt.Printf("Failed to get books: %v\n", err)
c.JSON(http.StatusNotFound, gin.H{"error": "failed to get books"})
@@ -112,3 +121,14 @@ func (r *RestHandler) createBook(c *gin.Context) {
}
c.JSON(http.StatusCreated, book)
}
+
+func randomString(length int) string {
+ source := rand.NewSource(time.Now().UnixNano())
+ random := rand.New(source)
+ const charset = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
+ result := make([]byte, length)
+ for i := range result {
+ result[i] = charset[random.Intn(len(charset))]
+ }
+ return string(result)
+}
Tune the consumer in the trend service
Since the key's format has changed (it is now `query:userID`), we need to adapt to that in the trend service.
Update service/trend/application/consumer/trend.go:
@@ -6,6 +6,7 @@ package consumer
import (
"context"
"encoding/json"
+ "strings"
topgw "literank.com/event-books/domain/gateway"
"literank.com/event-books/domain/model"
@@ -23,8 +24,10 @@ func NewTrendConsumer(t gateway.TrendManager, e topgw.EventConsumer) *TrendConsu
func (c *TrendConsumer) Start(ctx context.Context) {
c.eventConsumer.ConsumeEvents(ctx, func(key, data []byte) error {
+ parts := strings.Split(string(key), ":")
+ query := parts[0]
t := &model.Trend{
- Query: string(key),
+ Query: query,
}
if err := json.Unmarshal(data, &t.Books); err != nil {
return err
Recommendation Service‘s consumer
Add service/recommendation/domain/model/interest.go:
package model
// Interest represents a user's interest in a book.
//
// Every field carries an explicit bson tag so that the stored document keys
// ("user_id", "title", "author", "score") are spelled out next to the JSON
// keys instead of relying on the driver's default field-name mapping.
type Interest struct {
	// UserID identifies the user the interest belongs to.
	UserID string `json:"user_id" bson:"user_id"`
	// Title is the title of the book the user showed interest in.
	Title string `json:"title" bson:"title"`
	// Author is the author of that book.
	Author string `json:"author" bson:"author"`
	// Score is the accumulated interest score for this user/book pair.
	Score float32 `json:"score" bson:"score"`
}
Add service/recommendation/domain/gateway/interest_manager.go:
/*
Package gateway contains all domain gateways.
*/
package gateway
import (
"context"
"literank.com/event-books/service/recommendation/domain/model"
)
// InterestManager manages all interests.
// It is the persistence gateway the recommendation service uses to record
// and read user interests.
type InterestManager interface {
	// IncreaseInterest records one more unit of interest for the user/book
	// described by i and returns an error on failure.
	IncreaseInterest(ctx context.Context, i *model.Interest) error
	// ListInterests returns the interests stored for the given userID, or an
	// error on failure.
	ListInterests(ctx context.Context, userID string) ([]*model.Interest, error)
}
Install mongo
dependencies:
go get -u go.mongodb.org/mongo-driver/mongo
Prepare the mongoDB database:
- Install mongoDB on your machine and start it.
- Create a database named `lr_event_rec`.
- Create necessary indexes for your collections.
Implement the InterestManager
in service/recommendation/infrastructure/database/mongo.go:
/*
Package database does all db persistence implementations.
*/
package database
import (
"context"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"literank.com/event-books/service/recommendation/domain/model"
)
const (
	// collInterests is the name of the collection holding user interests.
	collInterests = "interests"
)

// MongoPersistence runs all mongoDB operations
type MongoPersistence struct {
	// db is the database handle the collection was obtained from. It is
	// retained here but not used by the methods in this file.
	db *mongo.Database
	// coll is the interests collection all queries and updates run against.
	coll *mongo.Collection
	// pageSize caps the number of interests returned by ListInterests.
	pageSize int
}

// NewMongoPersistence constructs a new MongoPersistence.
//
// It connects to mongoURI, selects the database dbName and binds the
// interests collection. pageSize is stored as the result limit for
// ListInterests. An error is returned when the client cannot be created.
//
// NOTE(review): mongo.Connect may not verify that the server is actually
// reachable; consider a Client.Ping at startup — confirm against the driver
// documentation.
func NewMongoPersistence(mongoURI, dbName string, pageSize int) (*MongoPersistence, error) {
	client, err := mongo.Connect(context.Background(), options.Client().ApplyURI(mongoURI))
	if err != nil {
		return nil, err
	}
	db := client.Database(dbName)
	return &MongoPersistence{
		db:       db,
		coll:     db.Collection(collInterests),
		pageSize: pageSize,
	}, nil
}
// IncreaseInterest adds one to the score of the interest document matching
// the (user_id, title, author) of i. The update runs as an upsert, so a
// matching document is created when none exists yet. Any error from the
// update is returned as-is.
func (m *MongoPersistence) IncreaseInterest(ctx context.Context, i *model.Interest) error {
	selector := bson.M{
		"user_id": i.UserID,
		"title":   i.Title,
		"author":  i.Author,
	}
	increment := bson.M{"$inc": bson.M{"score": 1}}
	upsert := options.Update().SetUpsert(true)

	_, err := m.coll.UpdateOne(ctx, selector, increment, upsert)
	return err
}
// ListInterests lists a user's interests by user id, sorted by score in
// descending order and limited to the configured page size.
func (m *MongoPersistence) ListInterests(ctx context.Context, userID string) ([]*model.Interest, error) {
	findOpts := options.Find()
	findOpts.SetSort(bson.M{"score": -1})
	findOpts.SetLimit(int64(m.pageSize))

	cur, err := m.coll.Find(ctx, bson.M{"user_id": userID}, findOpts)
	if err != nil {
		return nil, err
	}
	defer cur.Close(ctx)

	// Decode into a non-nil, zero-length slice.
	result := []*model.Interest{}
	if decodeErr := cur.All(ctx, &result); decodeErr != nil {
		return nil, decodeErr
	}
	return result, nil
}
Add service/recommendation/infrastructure/config/config.go:
/*
Package config provides config structures and parse funcs.
*/
package config
// Config is the global configuration.
// It groups the application, database and message-queue settings under the
// "app", "db" and "mq" keys respectively.
type Config struct {
	// App holds the application-level settings.
	App ApplicationConfig `json:"app" yaml:"app"`
	// DB holds the database connection settings.
	DB DBConfig `json:"db" yaml:"db"`
	// MQ holds the message-queue consumer settings.
	MQ MQConfig `json:"mq" yaml:"mq"`
}
// DBConfig is the configuration of databases.
type DBConfig struct {
	// MongoURI is the MongoDB connection URI.
	MongoURI string `json:"mongo_uri" yaml:"mongo_uri"`
	// MongoDBName is the name of the MongoDB database to use.
	MongoDBName string `json:"mongo_db_name" yaml:"mongo_db_name"`
}
// ApplicationConfig is the configuration of main app.
type ApplicationConfig struct {
	// Port is the configured port number.
	// NOTE(review): presumably the port the service listens on — it is not
	// read by the consumer code shown here; confirm.
	Port int `json:"port" yaml:"port"`
	// PageSize is handed to the persistence layer as the maximum number of
	// interests returned per listing.
	PageSize int `json:"page_size" yaml:"page_size"`
}
// MQConfig is the configuration of message queues.
// All three values are passed to the Kafka consumer constructor.
type MQConfig struct {
	// Brokers lists the broker addresses.
	Brokers []string `json:"brokers" yaml:"brokers"`
	// Topic is the topic to consume from.
	Topic string `json:"topic" yaml:"topic"`
	// GroupID is the consumer group id.
	GroupID string `json:"group_id" yaml:"group_id"`
}
Add service/recommendation/config.yml:
app:
port: 8082
page_size: 10
db:
mongo_uri: "mongodb://localhost:27017"
mongo_db_name: "lr_event_rec"
mq:
brokers:
- localhost:9094
topic: "lr-book-searches"
group_id: "rec-svr"
Add service/recommendation/application/consumer/interest.go:
/*
Package consumer handles event-trigger style business logic.
*/
package consumer
import (
"context"
"encoding/json"
"strings"
topgw "literank.com/event-books/domain/gateway"
topmodel "literank.com/event-books/domain/model"
"literank.com/event-books/service/recommendation/domain/gateway"
"literank.com/event-books/service/recommendation/domain/model"
)
// InterestConsumer turns consumed search events into user interests.
type InterestConsumer struct {
	// interestManager persists the interests derived from each event.
	interestManager gateway.InterestManager
	// eventConsumer is the source the events are read from.
	eventConsumer topgw.EventConsumer
}
// NewInterestConsumer builds an InterestConsumer that reads events from e
// and stores the resulting interests through t.
func NewInterestConsumer(t gateway.InterestManager, e topgw.EventConsumer) *InterestConsumer {
	ic := new(InterestConsumer)
	ic.interestManager = t
	ic.eventConsumer = e
	return ic
}
// Start registers the event callback on the underlying event consumer.
//
// For every event the callback extracts the user id from the key, decodes
// the value as a JSON array of books and increases the user's interest in
// each book. Events without a user id are ignored. A decoding or persistence
// error is returned to the event consumer.
//
// NOTE(review): presumably ConsumeEvents blocks until the consumer is
// stopped — confirm against the EventConsumer implementation.
func (c *InterestConsumer) Start(ctx context.Context) {
	c.eventConsumer.ConsumeEvents(ctx, func(key, data []byte) error {
		userID := userIDFromKey(string(key))
		if userID == "" {
			// No userID (legacy key without a separator, or an anonymous
			// request that produced "query:"), ignore it
			return nil
		}
		var books []*topmodel.Book
		if err := json.Unmarshal(data, &books); err != nil {
			return err
		}
		for _, book := range books {
			if book == nil {
				// A JSON null element decodes to a nil pointer; skip it
				// instead of dereferencing it.
				continue
			}
			i := &model.Interest{
				UserID: userID,
				Title:  book.Title,
				Author: book.Author,
			}
			if err := c.interestManager.IncreaseInterest(ctx, i); err != nil {
				return err
			}
		}
		return nil
	})
}

// userIDFromKey extracts the user id from an event key of the form
// "<query>:<userID>". The key is split at the LAST colon because the query
// part is free text and may itself contain colons. It returns "" when the
// key has no separator or the user id part is empty.
func userIDFromKey(key string) string {
	sep := strings.LastIndex(key, ":")
	if sep < 0 {
		return ""
	}
	return key[sep+1:]
}
// EventConsumer returns the underlying event consumer, e.g. so the caller
// can stop it on shutdown.
func (c *InterestConsumer) EventConsumer() topgw.EventConsumer {
	return c.eventConsumer
}
Add service/recommendation/application/wire_helper.go:
/*
Package application provides all common structures and functions of the application layer.
*/
package application
import (
topgw "literank.com/event-books/domain/gateway"
"literank.com/event-books/infrastructure/mq"
"literank.com/event-books/service/recommendation/domain/gateway"
"literank.com/event-books/service/recommendation/infrastructure/config"
"literank.com/event-books/service/recommendation/infrastructure/database"
)
// WireHelper is the helper for dependency injection
type WireHelper struct {
	// noSQLPersistence is the MongoDB-backed store exposed as the InterestManager.
	noSQLPersistence *database.MongoPersistence
	// consumer is the Kafka consumer exposed as the EventConsumer.
	consumer *mq.KafkaConsumer
}
// NewWireHelper builds the service's dependencies from the given config:
// first the MongoDB persistence layer, then the Kafka consumer. It returns
// the first error encountered.
func NewWireHelper(c *config.Config) (*WireHelper, error) {
	persistence, err := database.NewMongoPersistence(c.DB.MongoURI, c.DB.MongoDBName, c.App.PageSize)
	if err != nil {
		return nil, err
	}

	kafkaConsumer, err := mq.NewKafkaConsumer(c.MQ.Brokers, c.MQ.Topic, c.MQ.GroupID)
	if err != nil {
		return nil, err
	}

	helper := new(WireHelper)
	helper.noSQLPersistence = persistence
	helper.consumer = kafkaConsumer
	return helper, nil
}
// InterestManager returns an instance of InterestManager.
// The returned value is the MongoDB persistence held by the helper.
func (w *WireHelper) InterestManager() gateway.InterestManager {
	return w.noSQLPersistence
}
// TrendEventConsumer returns the shared EventConsumer (the Kafka consumer
// held by the helper). The method name mirrors the trend service's wire
// helper; in this service it is the source of search events for the
// interest consumer.
func (w *WireHelper) TrendEventConsumer() topgw.EventConsumer {
	return w.consumer
}
Add service/recommendation/main.go:
package main
import (
"context"
"log"
"os"
"os/signal"
"syscall"
"literank.com/event-books/infrastructure/parser"
"literank.com/event-books/service/recommendation/application"
"literank.com/event-books/service/recommendation/application/consumer"
"literank.com/event-books/service/recommendation/infrastructure/config"
)
// configFileName is the config file path handed to parser.Parse in main.
// NOTE(review): it is a relative path, so presumably it is resolved against
// the working directory (the tutorial runs the service from
// service/recommendation) — confirm.
const configFileName = "config.yml"
// main loads the configuration, wires up the dependencies and runs the
// interest consumer. A background goroutine waits for SIGINT/SIGTERM and
// then stops the event consumer.
func main() {
	// Load the configuration file.
	cfg, err := parser.Parse[config.Config](configFileName)
	if err != nil {
		panic(err)
	}

	// Wire up the dependencies.
	helper, err := application.NewWireHelper(cfg)
	if err != nil {
		panic(err)
	}

	// Build the consumer and grab its event source for shutdown handling.
	interestConsumer := consumer.NewInterestConsumer(helper.InterestManager(), helper.TrendEventConsumer())
	events := interestConsumer.EventConsumer()

	go func() {
		// Wait for a shutdown signal, then stop consuming.
		signals := make(chan os.Signal, 1)
		signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
		<-signals
		if stopErr := events.Stop(); stopErr != nil {
			log.Panicf("Failed to close consumer group: %v", stopErr)
		}
	}()

	// Run the consumer on the main goroutine.
	interestConsumer.Start(context.Background())
}
Start the recommendation consumer:
# in service/recommendation
go run main.go
You should see something similar to this:
2024/04/05 23:56:59 Started to consume events...
From another terminal, restart your web service and try searching with terms like "love" and "war" a couple of times.
Then, you will have records like below in your mongoDB:
lr_event_rec> db.interests.find();
[
{
_id: ObjectId('66101a027a7027627b271269'),
author: 'Jane Austen',
title: 'Pride and Prejudice',
user_id: 'YVM2P',
score: 1
},
{
_id: ObjectId('66101a027a7027627b27126b'),
author: 'Leo Tolstoy',
title: 'War and Peace',
user_id: 'YVM2P',
score: 2
},
{
_id: ObjectId('66101a437a7027627b271299'),
author: 'Homer',
title: 'The Odyssey',
user_id: 'YVM2P',
score: 1
}
]
That means your consumer is doing well.