Skip to content

Commit

Permalink
feat: version 2 (#6)
Browse files Browse the repository at this point in the history
## what
- switch to the new module name `github.com/alebabai/go-kit-kafka/v2`
- use `github.com/alebabai/go-kafka` as a middleware for Apache Kafka
transport
- completely re-write examples
- improve documentation

## why
To separate Apache Kafka abstractions from `go-kit` bindings
  • Loading branch information
alebabai authored May 10, 2024
1 parent 8a2214f commit 59f9309
Show file tree
Hide file tree
Showing 60 changed files with 908 additions and 3,812 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
- uses: actions/checkout@v4
- uses: actions/setup-go@v5
with:
go-version: 1.16
go-version: 1.22
- run: go version
- uses: actions/cache@v4
with:
Expand All @@ -25,7 +25,7 @@ jobs:
- name: Lint
uses: golangci/golangci-lint-action@v4
with:
version: v1.55.2
version: v1.56.2
- name: Build
run: make build
- name: Test
Expand Down
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ linters-settings:
goimports:
# put imports beginning with prefix after 3rd-party packages;
# it's a comma-separated list of prefixes
local-prefixes: github.com/alebabai/go-kit-kafka
local-prefixes: github.com/alebabai/go-kit-kafka/v2

linters:
enable:
Expand Down
28 changes: 6 additions & 22 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# go-kit-kafka

> Apache Kafka integration module for go-kit
> Apache Kafka integration module for go-kit powered by [alebabai/go-kafka](https://github.com/alebabai/go-kafka)
[![build](https://img.shields.io/github/actions/workflow/status/alebabai/go-kit-kafka/ci.yml)](https://github.com/alebabai/go-kit-kafka/actions?query=workflow%3ACI)
[![version](https://img.shields.io/github/go-mod/go-version/alebabai/go-kit-kafka)](https://go.dev/)
Expand All @@ -16,37 +16,21 @@ Go modules are supported.
Manual install:

```bash
go get -u github.com/alebabai/go-kit-kafka
go get -u github.com/alebabai/go-kit-kafka/v2
```

Golang import:

```go
import "github.com/alebabai/go-kit-kafka/kafka"
import "github.com/alebabai/go-kit-kafka/v2"
```

## Usage

To use consumer/producer transport abstractions converters to the following types from the chosen Apache Kafka
client library should be implemented:
Just import it into your project and use the provided abstractions to couple your preferred Apache Kafka client library with the well-known [go-kit/kit](https://github.com/go-kit/kit).

```go
type Message struct {
Topic string
Partition int32
Offset int64
Key []byte
Value []byte
Headers []Header
Timestamp time.Time
}

type Header struct {
Key []byte
Value []byte
}
```
For detailed information, please refer to the [alebabai/go-kafka](https://github.com/alebabai/go-kafka) repository.

## Examples

Go to [Examples](examples).
For additional usage details, please check out the [examples](./examples) folder.
12 changes: 6 additions & 6 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@
## Implementations

- [confluent](confluent)
- [sarama](sarama)
- [confluent](./confluent)
- [sarama](./sarama)

## Reference

### Services

- _producer_ - the service that provide an endpoint to produce an event
- **producer** - a service that provides an endpoint for generating and producing events to the specified topic in Apache Kafka

- _consumer_ - the service that able to consume events from kafka topic, store them the in inmemory storage and provide
- **consumer** - the service that able to consume events from kafka topic, store them the in inmemory storage and provide
an endpoint to list all consumed events

## Usage
Expand All @@ -29,13 +29,13 @@ docker compose -f <example>/compose.yml up
To produce an event send the following request:

```bash
curl -X POST http://localhost:8080/events
curl -X GET http://localhost:8081/events

```

To view all events send the following request:

```bash
curl -X GET http://localhost:8081/events
curl -X GET http://localhost:8080/events

```
6 changes: 6 additions & 0 deletions examples/common/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package common

const (
BrokerAddr = ":9094"
KafkaTopic = "events"
)
6 changes: 6 additions & 0 deletions examples/common/consumer/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package consumer

const (
HTTPServerAddr = ":8080"
KafkaGroupID = "events-consumer"
)
16 changes: 7 additions & 9 deletions examples/common/consumer/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,34 +7,32 @@ import (
"github.com/go-kit/kit/endpoint"
)

type Endpoints struct {
CreateEventEndpoint endpoint.Endpoint
ListEventsEndpoint endpoint.Endpoint
}

func MakeCreateEventEndpoint(svc Service) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
req := request.(CreateEventRequest)

if err := svc.Create(ctx, *req.Payload); err != nil {
res, err := svc.CreateEvent(ctx, req.Payload)
if err != nil {
return nil, fmt.Errorf("failed to create event: %w", err)
}

return CreateEventResponse{}, nil
return CreateEventResponse{
Result: *res,
}, nil
}
}

func MakeListEventsEndpoint(svc Service) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
_ = request.(ListEventsRequest)

ee, err := svc.List(ctx)
res, err := svc.ListEvents(ctx)
if err != nil {
return nil, fmt.Errorf("failed to list events: %w", err)
}

return ListEventsResponse{
Results: ee,
Results: res,
}, nil
}
}
7 changes: 4 additions & 3 deletions examples/common/consumer/request_response.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
package consumer

import (
"github.com/alebabai/go-kit-kafka/examples/common/domain"
"github.com/alebabai/go-kit-kafka/v2/examples/common"
)

type CreateEventRequest struct {
Payload *domain.Event
Payload common.Event
}

type CreateEventResponse struct {
Result common.Event
}

type ListEventsRequest struct {
}

type ListEventsResponse struct {
Results []domain.Event
Results []common.Event
}
63 changes: 60 additions & 3 deletions examples/common/consumer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,68 @@ package consumer

import (
"context"
"fmt"
"sort"
"sync"

"github.com/alebabai/go-kit-kafka/examples/common/domain"
"github.com/alebabai/go-kit-kafka/v2/examples/common"
"github.com/go-kit/log"
)

type Service interface {
Create(ctx context.Context, e domain.Event) error
List(ctx context.Context) ([]domain.Event, error)
CreateEvent(ctx context.Context, in common.Event) (*common.Event, error)
ListEvents(ctx context.Context) ([]common.Event, error)
}

type service struct {
cache map[string]common.Event
m sync.Mutex
logger log.Logger
}

func NewService(logger log.Logger) Service {
return &service{
cache: make(map[string]common.Event),
logger: logger,
}
}

func (svc *service) CreateEvent(_ context.Context, in common.Event) (*common.Event, error) {
_ = svc.logger.Log("msg", "saving an event", "event_id", in.ID)

if _, ok := svc.cache[in.ID]; ok {
return nil, fmt.Errorf("event with id=%v already exists", in.ID)
}

in.State = "new"

svc.m.Lock()
svc.cache[in.ID] = in
svc.m.Unlock()

return &in, nil
}

func (svc *service) ListEvents(_ context.Context) ([]common.Event, error) {
svc.m.Lock()

out := make([]common.Event, 0)
for _, e := range svc.cache {
out = append(out, e)
}

sort.Slice(out, func(i, j int) bool {
return out[i].CreatedAt.Before(out[j].CreatedAt)
})

// mark all viewed events as expired
for k := range svc.cache {
e := svc.cache[k]
e.State = "expired"
svc.cache[k] = e
}

svc.m.Unlock()

return out, nil
}
65 changes: 0 additions & 65 deletions examples/common/consumer/storage.go

This file was deleted.

30 changes: 12 additions & 18 deletions examples/common/consumer/transport_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,19 @@ import (
"context"
"net/http"

"github.com/go-kit/kit/endpoint"
httptransport "github.com/go-kit/kit/transport/http"
"github.com/gorilla/mux"
)

func NewHTTPHandler(endpoints Endpoints) http.Handler {
r := mux.
NewRouter().
StrictSlash(true)

r.
Path("/events").
Methods("GET").
Handler(httptransport.NewServer(
endpoints.ListEventsEndpoint,
decodeListEventsHTTPRequest,
encodeListEventsHTTPResponse,
))

return r
func NewHTTPHandler(e endpoint.Endpoint) http.Handler {
m := http.NewServeMux()
m.Handle("/events", httptransport.NewServer(
e,
decodeListEventsHTTPRequest,
encodeListEventsHTTPResponse,
))

return m
}

func decodeListEventsHTTPRequest(_ context.Context, _ *http.Request) (interface{}, error) {
Expand All @@ -32,8 +26,8 @@ func decodeListEventsHTTPRequest(_ context.Context, _ *http.Request) (interface{
func encodeListEventsHTTPResponse(ctx context.Context, w http.ResponseWriter, response interface{}) error {
httptransport.SetContentType("application/json")(ctx, w)

res := response.(ListEventsResponse)
if err := httptransport.EncodeJSONResponse(ctx, w, res.Results); err != nil {
resp := response.(ListEventsResponse)
if err := httptransport.EncodeJSONResponse(ctx, w, resp.Results); err != nil {
return err
}

Expand Down
Loading

0 comments on commit 59f9309

Please sign in to comment.