» Go: Building Event-Driven Microservices with Kafka » 3. Consumer: Trend Service » 3.2 Gin API Server

Gin API Server

Create domain/model/trend.go:

package model

import "time"

// Trend represents the structure of a
// trendy query and its related books.
//
// Query is the search text and Books are the books related to that query;
// Book is declared elsewhere in this package. The struct tags fix the JSON
// field names to "query", "books" and "created_at".
//
// NOTE(review): CreatedAt is presumably the time the trend was recorded —
// confirm against the code that populates it.
type Trend struct {
	Query     string    `json:"query"`
	Books     []Book    `json:"books"`
	CreatedAt time.Time `json:"created_at"`
}

Create service/trend/domain/gateway/trend_manager.go:

/*
Package gateway contains all domain gateways.
*/
package gateway

import (
	"context"

	"literank.com/event-books/domain/model"
)

// TrendManager manages all trends.
// It is the gateway through which the trend service stores and queries trends.
type TrendManager interface {
	// CreateTrend records the given trend.
	// NOTE(review): the meaning of the returned uint is up to the
	// implementation — presumably the query's hit count; confirm.
	CreateTrend(ctx context.Context, t *model.Trend) (uint, error)
	// TopTrends returns the top trends; offset bounds how far down the
	// ranking the result reaches.
	TopTrends(ctx context.Context, offset int) ([]*model.Trend, error)
}

In Redis, a ZSET (sorted set) is a data structure that combines the features of both sets and sorted lists. It is similar to a regular set, but it also maintains a sorting order based on a score associated with each member. This score allows members to be sorted in ascending or descending order.

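So we can use a ZSET to store the trends: each search query is a member of the set, and its score is the number of times that query has been searched.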

Install redis dependency:

go get -u github.com/redis/go-redis/v9

Create service/trend/infrastructure/cache/redis.go:

/*
Package cache has all cache-related implementations.
*/
package cache

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/redis/go-redis/v9"
	"literank.com/event-books/domain/model"
)

// Redis key layout used by RedisCache:
//   - trendsKey names the sorted set whose members are search queries.
//   - queryKeyPrefix is prepended to a query to form the key that holds the
//     JSON-encoded books for that query.
const (
	trendsKey      = "trends"
	queryKeyPrefix = "q-"
)

// RedisCache implements cache with redis.
// Its methods issue every command through the wrapped go-redis client c.
type RedisCache struct {
	c redis.UniversalClient
}

// NewRedisCache builds a RedisCache backed by a go-redis client configured
// with the given server address, password and database index.
func NewRedisCache(address, password string, db int) *RedisCache {
	opts := &redis.Options{
		Addr:     address,
		Password: password,
		DB:       db,
	}
	return &RedisCache{c: redis.NewClient(opts)}
}

// CreateTrend caches the trend's books and records one hit for its query.
//
// The books are stored as JSON under the key "q-<query>", replacing whatever
// was cached for that query before. The query itself is a member of the
// "trends" sorted set and its score is the number of hits so far.
//
// It returns the query's hit count after this call, or the first error
// reported by JSON encoding or by Redis.
func (r *RedisCache) CreateTrend(ctx context.Context, t *model.Trend) (uint, error) {
	results, err := json.Marshal(t.Books)
	if err != nil {
		return 0, err
	}
	// Write the results before bumping the score so the sorted set never
	// ranks a query whose results have not been stored yet.
	// A zero expiration stores the key without a TTL.
	k := queryKeyPrefix + t.Query
	if err := r.c.Set(ctx, k, string(results), 0).Err(); err != nil {
		return 0, err
	}
	// ZINCRBY creates a missing member with the increment as its score, so
	// first-time and repeated queries are handled by the same atomic command
	// and no separate existence check is needed.
	score, err := r.c.ZIncrBy(ctx, trendsKey, 1, t.Query).Result()
	if err != nil {
		return 0, err
	}
	return uint(score), nil
}

// TopTrends returns the most-hit queries, highest score first, each with the
// books cached for it.
//
// offset is handed to Redis as the inclusive stop index of a range starting
// at rank 0, so a non-negative offset yields at most offset+1 trends.
//
// A query whose cached results are missing is still returned, with Books left
// empty. The returned slice is never nil, so it encodes as a JSON array even
// when there are no trends.
func (r *RedisCache) TopTrends(ctx context.Context, offset int) ([]*model.Trend, error) {
	topItems, err := r.c.ZRevRangeWithScores(ctx, trendsKey, 0, int64(offset)).Result()
	if err != nil {
		return nil, err
	}
	trends := make([]*model.Trend, 0, len(topItems))
	for _, item := range topItems {
		query, ok := item.Member.(string)
		if !ok {
			// %v, not %s: the member is known not to be a string here.
			return nil, fmt.Errorf("invalid non-string member: %v", item.Member)
		}
		t := &model.Trend{Query: query}
		value, err := r.c.Get(ctx, queryKeyPrefix+query).Result()
		switch {
		case err == redis.Nil:
			// No cached results for this query. Skip decoding: unmarshalling
			// the empty string would fail and hide every other trend.
		case err != nil:
			return nil, err
		default:
			if err := json.Unmarshal([]byte(value), &t.Books); err != nil {
				return nil, err
			}
		}
		trends = append(trends, t)
	}
	return trends, nil
}

Create service/trend/infrastructure/config/config.go:

/*
Package config provides config structures and parse funcs.
*/
package config

// Config is the global configuration.
// It groups the application settings under the "app" key and the cache
// settings under the "cache" key, for both JSON and YAML sources.
type Config struct {
	App   ApplicationConfig `json:"app" yaml:"app"`
	Cache CacheConfig       `json:"cache" yaml:"cache"`
}

// ApplicationConfig is the configuration of main app.
// Port is read from the "port" key.
type ApplicationConfig struct {
	Port int `json:"port" yaml:"port"`
}

// CacheConfig is the configuration of cache.
// Address, Password and DB are read from the "address", "password" and "db"
// keys respectively.
type CacheConfig struct {
	Address  string `json:"address" yaml:"address"`
	Password string `json:"password" yaml:"password"`
	DB       int    `json:"db" yaml:"db"`
}

Create service/trend/config.yml:

# Application settings.
app:
  # Port the HTTP server listens on.
  port: 8081
# Redis connection settings.
cache:
  address: localhost:6379
  # NOTE(review): looks like a sample credential for local use — confirm, and
  # keep real secrets out of this file.
  password: test_pass
  db: 0

Create service/trend/application/wire_helper.go:

/*
Package application provides all common structures and functions of the application layer.
*/
package application

import (
	"literank.com/event-books/service/trend/domain/gateway"
	"literank.com/event-books/service/trend/infrastructure/cache"
	"literank.com/event-books/service/trend/infrastructure/config"
)

// WireHelper is the helper for dependency injection.
// It owns the Redis-backed cache that is handed out to the rest of the
// application.
type WireHelper struct {
	kvStore *cache.RedisCache
}

// NewWireHelper builds a WireHelper whose key-value store is a Redis cache
// configured from the cache section of c. The returned error is always nil.
func NewWireHelper(c *config.Config) (*WireHelper, error) {
	cacheCfg := c.Cache
	helper := &WireHelper{
		kvStore: cache.NewRedisCache(cacheCfg.Address, cacheCfg.Password, cacheCfg.DB),
	}
	return helper, nil
}

// TrendManager exposes the helper's Redis cache through the
// gateway.TrendManager interface.
func (w *WireHelper) TrendManager() gateway.TrendManager {
	var manager gateway.TrendManager = w.kvStore
	return manager
}

Create service/trend/application/executor/trend_operator.go:

/*
Package executor handles request-response style business logic.
*/
package executor

import (
	"context"

	"literank.com/event-books/domain/model"
	"literank.com/event-books/service/trend/domain/gateway"
)

// TrendOperator handles trend input/output and proxies operations to the trend manager.
// Every method forwards directly to the wrapped gateway.TrendManager.
type TrendOperator struct {
	trendManager gateway.TrendManager
}

// NewTrendOperator returns a TrendOperator that delegates to t.
func NewTrendOperator(t gateway.TrendManager) *TrendOperator {
	operator := new(TrendOperator)
	operator.trendManager = t
	return operator
}

// CreateTrend forwards the trend to the trend manager and returns the
// manager's result and error unchanged.
func (o *TrendOperator) CreateTrend(ctx context.Context, t *model.Trend) (uint, error) {
	result, err := o.trendManager.CreateTrend(ctx, t)
	return result, err
}

// TopTrends asks the trend manager for the top trends, ordered by hits in
// descending order, and returns its result and error unchanged.
func (o *TrendOperator) TopTrends(ctx context.Context, offset int) ([]*model.Trend, error) {
	trends, err := o.trendManager.TopTrends(ctx, offset)
	return trends, err
}

Create service/trend/adapter/router.go:

/*
Package adapter adapts to all kinds of framework or protocols.
*/
package adapter

import (
	"net/http"
	"strconv"

	"github.com/gin-gonic/gin"

	"literank.com/event-books/service/trend/application"
	"literank.com/event-books/service/trend/application/executor"
)

// fieldOffset is the name of the URL query parameter that carries the offset
// for the trends listing.
const (
	fieldOffset = "o"
)

// RestHandler handles all restful requests.
// Its handlers delegate the actual work to the trend operator.
type RestHandler struct {
	trendOperator *executor.TrendOperator
}

// newRestHandler builds a RestHandler whose trend operator is backed by the
// trend manager obtained from wireHelper.
func newRestHandler(wireHelper *application.WireHelper) *RestHandler {
	operator := executor.NewTrendOperator(wireHelper.TrendManager())
	return &RestHandler{trendOperator: operator}
}

// MakeRouter creates the Gin engine for the service and registers its routes.
// The returned error is always nil.
func MakeRouter(wireHelper *application.WireHelper) (*gin.Engine, error) {
	engine := gin.Default()
	handler := newRestHandler(wireHelper)

	// GET /trends lists the top trends.
	engine.GET("/trends", handler.getTrends)
	return engine, nil
}

// getTrends handles GET /trends and responds with the top trends as JSON.
//
// The optional query parameter "o" sets the offset passed to the trend
// operator; it defaults to 0. It responds with:
//   - 400 when "o" is not an integer or is negative,
//   - 500 when the trends cannot be loaded,
//   - 200 with the list of trends otherwise.
func (r *RestHandler) getTrends(c *gin.Context) {
	offset := 0
	if offsetParam := c.Query(fieldOffset); offsetParam != "" {
		value, err := strconv.Atoi(offsetParam)
		// Reject negatives too: Redis reads a negative stop index as
		// "counted from the end", which could return the entire set.
		if err != nil || value < 0 {
			c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid offset"})
			return
		}
		offset = value
	}
	// Use the request's context so the lookup is cancelled when the client
	// disconnects.
	trends, err := r.trendOperator.TopTrends(c.Request.Context(), offset)
	if err != nil {
		// A backend failure is a server-side error, not a missing resource.
		c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to get trends"})
		return
	}
	c.JSON(http.StatusOK, trends)
}

Finally, add service/trend/main.go:

package main

import (
	"fmt"

	"literank.com/event-books/infrastructure/parser"
	"literank.com/event-books/service/trend/adapter"
	"literank.com/event-books/service/trend/application"
	"literank.com/event-books/service/trend/infrastructure/config"
)

// configFileName is the path handed to parser.Parse to load the configuration.
const configFileName = "config.yml"

// main reads the configuration, wires the dependencies, builds the router and
// serves on the configured port. Any failure along the way panics.
func main() {
	// abortOn panics when a startup step reports an error.
	abortOn := func(err error) {
		if err != nil {
			panic(err)
		}
	}

	cfg, err := parser.Parse[config.Config](configFileName)
	abortOn(err)

	helper, err := application.NewWireHelper(cfg)
	abortOn(err)

	router, err := adapter.MakeRouter(helper)
	abortOn(err)

	// Run blocks while serving; it only returns on failure.
	addr := fmt.Sprintf(":%d", cfg.App.Port)
	abortOn(router.Run(addr))
}

Run the server and try to hit the URL http://localhost:8081/trends with curl.