» Go: Building Event-Driven Microservices with Kafka » 3. Consumer: Trend Service » 3.3 Event Consumer

Event Consumer

Tune domain/model/trend.go:

@@ -1,11 +1,8 @@
 package model
 
-import "time"
-
 // Trend represents the structure of a
 // trendy query and its related books.
 type Trend struct {
-       Query     string    `json:"query"`
-       Books     []Book    `json:"books"`
-       CreatedAt time.Time `json:"created_at"`
+       Query string `json:"query"`
+       Books []Book `json:"books"`
 }

We‘ll use the timestamp from the kafka event for time tracking, so let‘s remove this field.

Tune service/trend/domain/gateway/trend_manager.go:

@@ -9,8 +9,16 @@ import (
        "literank.com/event-books/domain/model"
 )
 
+type ConsumeCallback func(key, value []byte) error
+
 // TrendManager manages all trends
 type TrendManager interface {
        CreateTrend(ctx context.Context, t *model.Trend) (uint, error)
-       TopTrends(ctx context.Context, offset int) ([]*model.Trend, error)
+       TopTrends(ctx context.Context, pageSize uint) ([]*model.Trend, error)
+}
+
+// TrendEventConsumer consumes trend events
+type TrendEventConsumer interface {
+       ConsumeEvents(ctx context.Context, callback ConsumeCallback)
+       Stop() error
 }

Implement TrendEventConsumer in service/trend/infrastructure/mq/kafka.go:

/*
Package mq does all message queue jobs.
*/
package mq

import (
	"context"
	"fmt"
	"log"

	"github.com/IBM/sarama"

	"literank.com/event-books/service/trend/domain/gateway"
)

const (
	groupID = "trend-svr"
)

// KafkaConsumer consumers events from the kafka queue
type KafkaConsumer struct {
	cg    sarama.ConsumerGroup
	topic string
}

// NewKafkaConsumer constructs a new KafkaConsumer
func NewKafkaConsumer(brokers []string, topic string) (*KafkaConsumer, error) {
	// Create a new consumer configuration
	config := sarama.NewConfig()
	config.Consumer.Group.Rebalance.Strategy = sarama.NewBalanceStrategyRoundRobin()
	config.Consumer.Offsets.Initial = sarama.OffsetOldest

	// Create consumer
	fmt.Println(brokers)
	consumer, err := sarama.NewConsumerGroup(brokers, groupID, config)
	if err != nil {
		return nil, err
	}
	return &KafkaConsumer{consumer, topic}, nil
}

func (k *KafkaConsumer) ConsumeEvents(ctx context.Context, callback gateway.ConsumeCallback) {

	consumer := Consumer{callback}
	if err := k.cg.Consume(ctx, []string{k.topic}, &consumer); err != nil {
		log.Panicf("Failed to start consuming: %v", err)
	}
}

func (k *KafkaConsumer) Stop() error {
	return k.cg.Close()
}

// Consumer represents a Sarama consumer group consumer
type Consumer struct {
	callback gateway.ConsumeCallback
}

// Setup is run at the beginning of a new session, before ConsumeClaim
func (c *Consumer) Setup(sarama.ConsumerGroupSession) error {
	log.Println("Started to consume events...")
	return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (c *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
// Once the Messages() channel is closed, the Handler must finish its processing
// loop and exit.
func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	// NOTE:
	// Do not move the code below to a goroutine.
	// The `ConsumeClaim` itself is called within a goroutine, see:
	// https://github.com/IBM/sarama/blob/main/consumer_group.go#L27-L29
	for {
		select {
		case message, ok := <-claim.Messages():
			if !ok {
				log.Printf("message channel was closed")
				return nil
			}
			if err := c.callback(message.Key, message.Value); err != nil {
				log.Printf("Failed to handle event from [%s] key = %s, timestamp = %v, value = %s, error: %v", message.Topic, string(message.Key), message.Timestamp, string(message.Value), err)
			}
			session.MarkMessage(message, "")
		// Should return when `session.Context()` is done.
		// If not, will raise `ErrRebalanceInProgress` or `read tcp <ip>:<port>: i/o timeout` when kafka rebalance. see:
		// https://github.com/IBM/sarama/issues/1192
		case <-session.Context().Done():
			return nil
		}
	}
}

Tune TrendManager implementations in service/trend/infrastructure/cache/redis.go:

@@ -9,6 +9,7 @@ import (
        "fmt"
 
        "github.com/redis/go-redis/v9"
+
        "literank.com/event-books/domain/model"
 )
 
@@ -41,12 +42,13 @@ func (r *RedisCache) CreateTrend(ctx context.Context, t *model.Trend) (uint, err
        if err != nil {
                if err == redis.Nil {
                        // Member doesn't exist, add it with initial score of 1
-                       err := r.c.ZAdd(ctx, trendsKey, redis.Z{Score: 1, Member: member}).Err()
+                       err = r.c.ZAdd(ctx, trendsKey, redis.Z{Score: 1, Member: member}).Err()
                        if err != nil {
                                return 0, err
                        }
+               } else {
+                       return 0, err
                }
-               return 0, err
        }
        score, err := r.c.ZIncrBy(ctx, trendsKey, 1, member).Result()
        if err != nil {
@@ -65,8 +67,8 @@ func (r *RedisCache) CreateTrend(ctx context.Context, t *model.Trend) (uint, err
        return uint(score), nil
 }
 
-func (r *RedisCache) TopTrends(ctx context.Context, offset int) ([]*model.Trend, error) {
-       topItems, err := r.c.ZRevRangeWithScores(ctx, trendsKey, 0, int64(offset)).Result()
+func (r *RedisCache) TopTrends(ctx context.Context, pageSize uint) ([]*model.Trend, error) {
+       topItems, err := r.c.ZRevRangeWithScores(ctx, trendsKey, 0, int64(pageSize)-1).Result()
        if err != nil {
                return nil, err
        }
@@ -81,11 +83,17 @@ func (r *RedisCache) TopTrends(ctx context.Context, offset int) ([]*model.Trend,
                }
                k := queryKeyPrefix + query
                value, err := r.c.Get(ctx, k).Result()
-               if err != nil && err != redis.Nil {
-                       return nil, err
-               }
-               if err := json.Unmarshal([]byte(value), &t.Books); err != nil {
+               if err != nil {
+                       if err == redis.Nil {
+                               t.Books = make([]model.Book, 0)
+                               trends = append(trends, t)
+                               continue
+                       }
                        return nil, err
+               } else {
+                       if err := json.Unmarshal([]byte(value), &t.Books); err != nil {
+                               return nil, err
+                       }
                }
                trends = append(trends, t)
        }

Tune service/trend/application/wire_helper.go:

@@ -7,18 +7,25 @@ import (
        "literank.com/event-books/service/trend/domain/gateway"
        "literank.com/event-books/service/trend/infrastructure/cache"
        "literank.com/event-books/service/trend/infrastructure/config"
+       "literank.com/event-books/service/trend/infrastructure/mq"
 )
 
 // WireHelper is the helper for dependency injection
 type WireHelper struct {
-       kvStore *cache.RedisCache
+       kvStore  *cache.RedisCache
+       consumer *mq.KafkaConsumer
 }
 
 // NewWireHelper constructs a new WireHelper
 func NewWireHelper(c *config.Config) (*WireHelper, error) {
        kv := cache.NewRedisCache(c.Cache.Address, c.Cache.Password, c.Cache.DB)
+       consumer, err := mq.NewKafkaConsumer(c.MQ.Brokers, c.MQ.Topic)
+       if err != nil {
+               return nil, err
+       }
        return &WireHelper{
-               kvStore: kv,
+               kvStore:  kv,
+               consumer: consumer,
        }, nil
 }
 
@@ -26,3 +33,8 @@ func NewWireHelper(c *config.Config) (*WireHelper, error) {
 func (w *WireHelper) TrendManager() gateway.TrendManager {
        return w.kvStore
 }
+
+// TrendEventConsumer returns an instance of TrendEventConsumer
+func (w *WireHelper) TrendEventConsumer() gateway.TrendEventConsumer {
+       return w.consumer
+}

Add config items in service/trend/infrastructure/config/config.go:

@@ -7,6 +7,7 @@ package config
 type Config struct {
        App   ApplicationConfig `json:"app" yaml:"app"`
        Cache CacheConfig       `json:"cache" yaml:"cache"`
+       MQ    MQConfig          `json:"mq" yaml:"mq"`
 }
 
 // ApplicationConfig is the configuration of main app.
@@ -20,3 +21,9 @@ type CacheConfig struct {
        Password string `json:"password" yaml:"password"`
        DB       int    `json:"db" yaml:"db"`
 }
+
+// MQConfig is the configuration of message queues.
+type MQConfig struct {
+       Brokers []string `json:"brokers" yaml:"brokers"`
+       Topic   string   `json:"topic" yaml:"topic"`
+}

Put in config values in service/web/config.yml:

@@ -3,4 +3,8 @@ app:
 cache:
   address: localhost:6379
   password: test_pass
-  db: 0
+  db: 0
+mq:
+  brokers:
+    - localhost:9094
+  topic: "lr-book-searches"

We‘re using port 9094 for external access to the kafka running in a docker container.
See “Accessing Apache Kafka with internal and external clients“ configuration in bitnami/kafka.

Add service/trend/application/consumer/trend.go:

/*
Package consumer handles event-trigger style business logic.
*/
package consumer

import (
	"context"
	"encoding/json"

	"literank.com/event-books/domain/model"
	"literank.com/event-books/service/trend/domain/gateway"
)

type TrendConsumer struct {
	trendManager  gateway.TrendManager
	eventConsumer gateway.TrendEventConsumer
}

func NewTrendConsumer(t gateway.TrendManager, e gateway.TrendEventConsumer) *TrendConsumer {
	return &TrendConsumer{trendManager: t, eventConsumer: e}
}

func (c *TrendConsumer) Start(ctx context.Context) {
	c.eventConsumer.ConsumeEvents(ctx, func(key, data []byte) error {
		t := &model.Trend{
			Query: string(key),
		}
		if err := json.Unmarshal(data, &t.Books); err != nil {
			return err
		}
		_, err := c.trendManager.CreateTrend(ctx, t)
		return err
	})
}

func (c *TrendConsumer) EventConsumer() gateway.TrendEventConsumer {
	return c.eventConsumer
}

Tune service/trend/application/executor/trend_operator.go:

@@ -20,12 +20,7 @@ func NewTrendOperator(t gateway.TrendManager) *TrendOperator {
        return &TrendOperator{trendManager: t}
 }
 
-// CreateTrend creates a new trend
-func (o *TrendOperator) CreateTrend(ctx context.Context, t *model.Trend) (uint, error) {
-       return o.trendManager.CreateTrend(ctx, t)
-}
-
 // TopTrends gets the top trends order by hits in descending order
-func (o *TrendOperator) TopTrends(ctx context.Context, offset int) ([]*model.Trend, error) {
-       return o.trendManager.TopTrends(ctx, offset)
+func (o *TrendOperator) TopTrends(ctx context.Context, pageSize uint) ([]*model.Trend, error) {
+       return o.trendManager.TopTrends(ctx, pageSize)
 }

Tune service/trend/adapter/router.go:

@@ -14,7 +14,7 @@ import (
 )
 
 const (
-       fieldOffset = "o"
+       fieldPageSize = "ps"
 )
 
 // RestHandler handles all restful requests
@@ -41,19 +41,19 @@ func MakeRouter(wireHelper *application.WireHelper) (*gin.Engine, error) {
 
 // Get all trends
 func (r *RestHandler) getTrends(c *gin.Context) {
-       offset := 0
-       offsetParam := c.Query(fieldOffset)
-       if offsetParam != "" {
-               value, err := strconv.Atoi(offsetParam)
+       ps := 10
+       psParam := c.Query(fieldPageSize)
+       if psParam != "" {
+               value, err := strconv.Atoi(psParam)
                if err != nil {
-                       c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid offset"})
+                       c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid page size"})
                        return
                }
-               offset = value
+               ps = value
        }
-       trends, err := r.trendOperator.TopTrends(c, offset)
+       trends, err := r.trendOperator.TopTrends(c, uint(ps))
        if err != nil {
-               c.JSON(http.StatusNotFound, gin.H{"error": "failed to get trends"})
+               c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to get trends"})
                return
        }
        c.JSON(http.StatusOK, trends)

Modify main.go to allow both the consumer and the router stop gracefully:

@@ -1,11 +1,18 @@
 package main
 
 import (
+       "context"
        "fmt"
+       "log"
+       "net/http"
+       "os"
+       "os/signal"
+       "syscall"
 
        "literank.com/event-books/infrastructure/parser"
        "literank.com/event-books/service/trend/adapter"
        "literank.com/event-books/service/trend/application"
+       "literank.com/event-books/service/trend/application/consumer"
        "literank.com/event-books/service/trend/infrastructure/config"
 )
 
@@ -24,13 +31,39 @@ func main() {
                panic(err)
        }
 
+       // Run the consumer
+       tc := consumer.NewTrendConsumer(wireHelper.TrendManager(), wireHelper.TrendEventConsumer())
+       eventConsumer := tc.EventConsumer()
+       go func() {
+               tc.Start(context.Background())
+       }()
+
        // Build main router
        r, err := adapter.MakeRouter(wireHelper)
        if err != nil {
                panic(err)
        }
+
+       svr := &http.Server{
+               Addr:    fmt.Sprintf(":%d", c.App.Port),
+               Handler: r,
+       }
+
+       // Shutdown signals
+       stopAll := make(chan os.Signal, 1)
+       signal.Notify(stopAll, syscall.SIGINT, syscall.SIGTERM)
+       go func() {
+               <-stopAll
+               if err := eventConsumer.Stop(); err != nil {
+                       log.Panicf("Failed to close consumer group: %v", err)
+               }
+               if err := svr.Shutdown(context.Background()); err != nil {
+                       log.Panicf("Failed to shutdown Gin server: %v", err)
+               }
+       }()
+
        // Run the server on the specified port
-       if err := r.Run(fmt.Sprintf(":%d", c.App.Port)); err != nil {
+       if err := svr.ListenAndServe(); err != nil && err != http.ErrServerClosed {
                panic(err)
        }
 }

The kafka consumer and the gin server are basically 2 “thread“s running in the same process. You need to catch the system signals, SIGINT and SIGTERM, and manually stop both of them gracefully.

Start both the web service and the trend service. Try searching keywords such as “love“, “war“ and “wealth“ in the web service‘s index page http://localhost:8080/?q=wealth.

Then you should be able to see some results like this from the trend service (http://localhost:8081/trends):

[
  {
    "query": "love",
    "books": [
      {
        "id": 4,
        "title": "Pride and Prejudice",
        "author": "Jane Austen",
        "published_at": "1813-01-28",
        "description": "A classic novel exploring the themes of love, reputation, and social class in Georgian England.",
        "created_at": "2024-04-02T21:02:59.314+08:00"
      },
      {
        "id": 10,
        "title": "War and Peace",
        "author": "Leo Tolstoy",
        "published_at": "1869-01-01",
        "description": "A novel depicting the Napoleonic era in Russia, exploring themes of love, war, and historical determinism.",
        "created_at": "2024-04-02T21:02:59.42+08:00"
      }
    ]
  },
  {
    "query": "war",
    "books": [
      {
        "id": 10,
        "title": "War and Peace",
        "author": "Leo Tolstoy",
        "published_at": "1869-01-01",
        "description": "A novel depicting the Napoleonic era in Russia, exploring themes of love, war, and historical determinism.",
        "created_at": "2024-04-02T21:02:59.42+08:00"
      },
      {
        "id": 12,
        "title": "The Odyssey",
        "author": "Homer",
        "published_at": "8th Century BC",
        "description": "An ancient Greek epic poem attributed to Homer, detailing the journey of Odysseus after the Trojan War.",
        "created_at": "2024-04-02T21:02:59.455+08:00"
      }
    ]
  },
  {
    "query": "wealth",
    "books": [
      {
        "id": 1,
        "title": "The Great Gatsby",
        "author": "F. Scott Fitzgerald",
        "published_at": "1925-04-10",
        "description": "A novel depicting the opulent lives of wealthy Long Island residents during the Jazz Age.",
        "created_at": "2024-04-02T20:36:01.015+08:00"
      }
    ]
  }
]
PrevNext