Event Consumer
Tune domain/model/trend.go:
@@ -1,11 +1,8 @@
package model
-import "time"
-
// Trend represents the structure of a
// trendy query and its related books.
type Trend struct {
- Query string `json:"query"`
- Books []Book `json:"books"`
- CreatedAt time.Time `json:"created_at"`
+ Query string `json:"query"`
+ Books []Book `json:"books"`
}
We‘ll use the timestamp from the kafka event for time tracking, so let‘s remove this field.
Tune service/trend/domain/gateway/trend_manager.go:
@@ -9,8 +9,16 @@ import (
"literank.com/event-books/domain/model"
)
+type ConsumeCallback func(key, value []byte) error
+
// TrendManager manages all trends
type TrendManager interface {
CreateTrend(ctx context.Context, t *model.Trend) (uint, error)
- TopTrends(ctx context.Context, offset int) ([]*model.Trend, error)
+ TopTrends(ctx context.Context, pageSize uint) ([]*model.Trend, error)
+}
+
+// TrendEventConsumer consumes trend events
+type TrendEventConsumer interface {
+ ConsumeEvents(ctx context.Context, callback ConsumeCallback)
+ Stop() error
}
Implement TrendEventConsumer
in service/trend/infrastructure/mq/kafka.go:
/*
Package mq does all message queue jobs.
*/
package mq
import (
"context"
"fmt"
"log"
"github.com/IBM/sarama"
"literank.com/event-books/service/trend/domain/gateway"
)
const (
groupID = "trend-svr"
)
// KafkaConsumer consumers events from the kafka queue
type KafkaConsumer struct {
cg sarama.ConsumerGroup
topic string
}
// NewKafkaConsumer constructs a new KafkaConsumer
func NewKafkaConsumer(brokers []string, topic string) (*KafkaConsumer, error) {
// Create a new consumer configuration
config := sarama.NewConfig()
config.Consumer.Group.Rebalance.Strategy = sarama.NewBalanceStrategyRoundRobin()
config.Consumer.Offsets.Initial = sarama.OffsetOldest
// Create consumer
fmt.Println(brokers)
consumer, err := sarama.NewConsumerGroup(brokers, groupID, config)
if err != nil {
return nil, err
}
return &KafkaConsumer{consumer, topic}, nil
}
func (k *KafkaConsumer) ConsumeEvents(ctx context.Context, callback gateway.ConsumeCallback) {
consumer := Consumer{callback}
if err := k.cg.Consume(ctx, []string{k.topic}, &consumer); err != nil {
log.Panicf("Failed to start consuming: %v", err)
}
}
func (k *KafkaConsumer) Stop() error {
return k.cg.Close()
}
// Consumer represents a Sarama consumer group consumer
type Consumer struct {
callback gateway.ConsumeCallback
}
// Setup is run at the beginning of a new session, before ConsumeClaim
func (c *Consumer) Setup(sarama.ConsumerGroupSession) error {
log.Println("Started to consume events...")
return nil
}
// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (c *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
return nil
}
// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
// Once the Messages() channel is closed, the Handler must finish its processing
// loop and exit.
func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
// NOTE:
// Do not move the code below to a goroutine.
// The `ConsumeClaim` itself is called within a goroutine, see:
// https://github.com/IBM/sarama/blob/main/consumer_group.go#L27-L29
for {
select {
case message, ok := <-claim.Messages():
if !ok {
log.Printf("message channel was closed")
return nil
}
if err := c.callback(message.Key, message.Value); err != nil {
log.Printf("Failed to handle event from [%s] key = %s, timestamp = %v, value = %s, error: %v", message.Topic, string(message.Key), message.Timestamp, string(message.Value), err)
}
session.MarkMessage(message, "")
// Should return when `session.Context()` is done.
// If not, will raise `ErrRebalanceInProgress` or `read tcp <ip>:<port>: i/o timeout` when kafka rebalance. see:
// https://github.com/IBM/sarama/issues/1192
case <-session.Context().Done():
return nil
}
}
}
Tune TrendManager
implementations in service/trend/infrastructure/cache/redis.go:
@@ -9,6 +9,7 @@ import (
"fmt"
"github.com/redis/go-redis/v9"
+
"literank.com/event-books/domain/model"
)
@@ -41,12 +42,13 @@ func (r *RedisCache) CreateTrend(ctx context.Context, t *model.Trend) (uint, err
if err != nil {
if err == redis.Nil {
// Member doesn't exist, add it with initial score of 1
- err := r.c.ZAdd(ctx, trendsKey, redis.Z{Score: 1, Member: member}).Err()
+ err = r.c.ZAdd(ctx, trendsKey, redis.Z{Score: 1, Member: member}).Err()
if err != nil {
return 0, err
}
+ } else {
+ return 0, err
}
- return 0, err
}
score, err := r.c.ZIncrBy(ctx, trendsKey, 1, member).Result()
if err != nil {
@@ -65,8 +67,8 @@ func (r *RedisCache) CreateTrend(ctx context.Context, t *model.Trend) (uint, err
return uint(score), nil
}
-func (r *RedisCache) TopTrends(ctx context.Context, offset int) ([]*model.Trend, error) {
- topItems, err := r.c.ZRevRangeWithScores(ctx, trendsKey, 0, int64(offset)).Result()
+func (r *RedisCache) TopTrends(ctx context.Context, pageSize uint) ([]*model.Trend, error) {
+ topItems, err := r.c.ZRevRangeWithScores(ctx, trendsKey, 0, int64(pageSize)-1).Result()
if err != nil {
return nil, err
}
@@ -81,11 +83,17 @@ func (r *RedisCache) TopTrends(ctx context.Context, offset int) ([]*model.Trend,
}
k := queryKeyPrefix + query
value, err := r.c.Get(ctx, k).Result()
- if err != nil && err != redis.Nil {
- return nil, err
- }
- if err := json.Unmarshal([]byte(value), &t.Books); err != nil {
+ if err != nil {
+ if err == redis.Nil {
+ t.Books = make([]model.Book, 0)
+ trends = append(trends, t)
+ continue
+ }
return nil, err
+ } else {
+ if err := json.Unmarshal([]byte(value), &t.Books); err != nil {
+ return nil, err
+ }
}
trends = append(trends, t)
}
Tune service/trend/application/wire_helper.go:
@@ -7,18 +7,25 @@ import (
"literank.com/event-books/service/trend/domain/gateway"
"literank.com/event-books/service/trend/infrastructure/cache"
"literank.com/event-books/service/trend/infrastructure/config"
+ "literank.com/event-books/service/trend/infrastructure/mq"
)
// WireHelper is the helper for dependency injection
type WireHelper struct {
- kvStore *cache.RedisCache
+ kvStore *cache.RedisCache
+ consumer *mq.KafkaConsumer
}
// NewWireHelper constructs a new WireHelper
func NewWireHelper(c *config.Config) (*WireHelper, error) {
kv := cache.NewRedisCache(c.Cache.Address, c.Cache.Password, c.Cache.DB)
+ consumer, err := mq.NewKafkaConsumer(c.MQ.Brokers, c.MQ.Topic)
+ if err != nil {
+ return nil, err
+ }
return &WireHelper{
- kvStore: kv,
+ kvStore: kv,
+ consumer: consumer,
}, nil
}
@@ -26,3 +33,8 @@ func NewWireHelper(c *config.Config) (*WireHelper, error) {
func (w *WireHelper) TrendManager() gateway.TrendManager {
return w.kvStore
}
+
+// TrendEventConsumer returns an instance of TrendEventConsumer
+func (w *WireHelper) TrendEventConsumer() gateway.TrendEventConsumer {
+ return w.consumer
+}
Add config items in service/trend/infrastructure/config/config.go:
@@ -7,6 +7,7 @@ package config
type Config struct {
App ApplicationConfig `json:"app" yaml:"app"`
Cache CacheConfig `json:"cache" yaml:"cache"`
+ MQ MQConfig `json:"mq" yaml:"mq"`
}
// ApplicationConfig is the configuration of main app.
@@ -20,3 +21,9 @@ type CacheConfig struct {
Password string `json:"password" yaml:"password"`
DB int `json:"db" yaml:"db"`
}
+
+// MQConfig is the configuration of message queues.
+type MQConfig struct {
+ Brokers []string `json:"brokers" yaml:"brokers"`
+ Topic string `json:"topic" yaml:"topic"`
+}
Put in config values in service/web/config.yml:
@@ -3,4 +3,8 @@ app:
cache:
address: localhost:6379
password: test_pass
- db: 0
+ db: 0
+mq:
+ brokers:
+ - localhost:9094
+ topic: "lr-book-searches"
We‘re using port
9094
for external access to the kafka running in a docker container.
See “Accessing Apache Kafka with internal and external clients“ configuration inbitnami/kafka
.
Add service/trend/application/consumer/trend.go:
/*
Package consumer handles event-trigger style business logic.
*/
package consumer
import (
"context"
"encoding/json"
"literank.com/event-books/domain/model"
"literank.com/event-books/service/trend/domain/gateway"
)
type TrendConsumer struct {
trendManager gateway.TrendManager
eventConsumer gateway.TrendEventConsumer
}
func NewTrendConsumer(t gateway.TrendManager, e gateway.TrendEventConsumer) *TrendConsumer {
return &TrendConsumer{trendManager: t, eventConsumer: e}
}
func (c *TrendConsumer) Start(ctx context.Context) {
c.eventConsumer.ConsumeEvents(ctx, func(key, data []byte) error {
t := &model.Trend{
Query: string(key),
}
if err := json.Unmarshal(data, &t.Books); err != nil {
return err
}
_, err := c.trendManager.CreateTrend(ctx, t)
return err
})
}
func (c *TrendConsumer) EventConsumer() gateway.TrendEventConsumer {
return c.eventConsumer
}
Tune service/trend/application/executor/trend_operator.go:
@@ -20,12 +20,7 @@ func NewTrendOperator(t gateway.TrendManager) *TrendOperator {
return &TrendOperator{trendManager: t}
}
-// CreateTrend creates a new trend
-func (o *TrendOperator) CreateTrend(ctx context.Context, t *model.Trend) (uint, error) {
- return o.trendManager.CreateTrend(ctx, t)
-}
-
// TopTrends gets the top trends order by hits in descending order
-func (o *TrendOperator) TopTrends(ctx context.Context, offset int) ([]*model.Trend, error) {
- return o.trendManager.TopTrends(ctx, offset)
+func (o *TrendOperator) TopTrends(ctx context.Context, pageSize uint) ([]*model.Trend, error) {
+ return o.trendManager.TopTrends(ctx, pageSize)
}
Tune service/trend/adapter/router.go:
@@ -14,7 +14,7 @@ import (
)
const (
- fieldOffset = "o"
+ fieldPageSize = "ps"
)
// RestHandler handles all restful requests
@@ -41,19 +41,19 @@ func MakeRouter(wireHelper *application.WireHelper) (*gin.Engine, error) {
// Get all trends
func (r *RestHandler) getTrends(c *gin.Context) {
- offset := 0
- offsetParam := c.Query(fieldOffset)
- if offsetParam != "" {
- value, err := strconv.Atoi(offsetParam)
+ ps := 10
+ psParam := c.Query(fieldPageSize)
+ if psParam != "" {
+ value, err := strconv.Atoi(psParam)
if err != nil {
- c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid offset"})
+ c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid page size"})
return
}
- offset = value
+ ps = value
}
- trends, err := r.trendOperator.TopTrends(c, offset)
+ trends, err := r.trendOperator.TopTrends(c, uint(ps))
if err != nil {
- c.JSON(http.StatusNotFound, gin.H{"error": "failed to get trends"})
+ c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to get trends"})
return
}
c.JSON(http.StatusOK, trends)
Modify main.go to allow both the consumer and the router stop gracefully:
@@ -1,11 +1,18 @@
package main
import (
+ "context"
"fmt"
+ "log"
+ "net/http"
+ "os"
+ "os/signal"
+ "syscall"
"literank.com/event-books/infrastructure/parser"
"literank.com/event-books/service/trend/adapter"
"literank.com/event-books/service/trend/application"
+ "literank.com/event-books/service/trend/application/consumer"
"literank.com/event-books/service/trend/infrastructure/config"
)
@@ -24,13 +31,39 @@ func main() {
panic(err)
}
+ // Run the consumer
+ tc := consumer.NewTrendConsumer(wireHelper.TrendManager(), wireHelper.TrendEventConsumer())
+ eventConsumer := tc.EventConsumer()
+ go func() {
+ tc.Start(context.Background())
+ }()
+
// Build main router
r, err := adapter.MakeRouter(wireHelper)
if err != nil {
panic(err)
}
+
+ svr := &http.Server{
+ Addr: fmt.Sprintf(":%d", c.App.Port),
+ Handler: r,
+ }
+
+ // Shutdown signals
+ stopAll := make(chan os.Signal, 1)
+ signal.Notify(stopAll, syscall.SIGINT, syscall.SIGTERM)
+ go func() {
+ <-stopAll
+ if err := eventConsumer.Stop(); err != nil {
+ log.Panicf("Failed to close consumer group: %v", err)
+ }
+ if err := svr.Shutdown(context.Background()); err != nil {
+ log.Panicf("Failed to shutdown Gin server: %v", err)
+ }
+ }()
+
// Run the server on the specified port
- if err := r.Run(fmt.Sprintf(":%d", c.App.Port)); err != nil {
+ if err := svr.ListenAndServe(); err != nil && err != http.ErrServerClosed {
panic(err)
}
}
The kafka consumer and the gin server are basically 2 “thread“s running in the same process. You need to catch the system signals, SIGINT
and SIGTERM
, and manually stop both of them gracefully.
Start both the web service and the trend service. Try searching keywords such as “love“, “war“ and “wealth“ in the web service‘s index page http://localhost:8080/?q=wealth.
Then you should be able to see some results like this from the trend service (http://localhost:8081/trends):
[
{
"query": "love",
"books": [
{
"id": 4,
"title": "Pride and Prejudice",
"author": "Jane Austen",
"published_at": "1813-01-28",
"description": "A classic novel exploring the themes of love, reputation, and social class in Georgian England.",
"created_at": "2024-04-02T21:02:59.314+08:00"
},
{
"id": 10,
"title": "War and Peace",
"author": "Leo Tolstoy",
"published_at": "1869-01-01",
"description": "A novel depicting the Napoleonic era in Russia, exploring themes of love, war, and historical determinism.",
"created_at": "2024-04-02T21:02:59.42+08:00"
}
]
},
{
"query": "war",
"books": [
{
"id": 10,
"title": "War and Peace",
"author": "Leo Tolstoy",
"published_at": "1869-01-01",
"description": "A novel depicting the Napoleonic era in Russia, exploring themes of love, war, and historical determinism.",
"created_at": "2024-04-02T21:02:59.42+08:00"
},
{
"id": 12,
"title": "The Odyssey",
"author": "Homer",
"published_at": "8th Century BC",
"description": "An ancient Greek epic poem attributed to Homer, detailing the journey of Odysseus after the Trojan War.",
"created_at": "2024-04-02T21:02:59.455+08:00"
}
]
},
{
"query": "wealth",
"books": [
{
"id": 1,
"title": "The Great Gatsby",
"author": "F. Scott Fitzgerald",
"published_at": "1925-04-10",
"description": "A novel depicting the opulent lives of wealthy Long Island residents during the Jazz Age.",
"created_at": "2024-04-02T20:36:01.015+08:00"
}
]
}
]