Skip to content

Commit

Permalink
feat: bump go-dcp to v1.2.0-rc.4 and return to bulk requester
Browse files Browse the repository at this point in the history
  • Loading branch information
erayarslan committed Nov 15, 2024
1 parent 2b3eff0 commit 3abdfad
Show file tree
Hide file tree
Showing 18 changed files with 241 additions and 90 deletions.
37 changes: 20 additions & 17 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ Couchbase bucket in near real-time.
* **Less resource usage** and **higher throughput**.
* **Update multiple documents** for a DCP event(see [Example](#example)).
* Handling different DCP events such as **expiration, deletion and mutation**(see [Example](#example)).
* **Managing inflight request size to Couchbase.
* **Managing batch configurations** such as maximum batch, batch bytes, batch ticker durations.
* **Scale up and down** by custom membership algorithms(Couchbase, KubernetesHa, Kubernetes StatefulSet or
Static, see [examples](https://github.com/Trendyol/go-dcp#examples)).
* **Easily manageable configurations**.
Expand Down Expand Up @@ -68,6 +68,7 @@ func main() {
Username: "user",
Password: "password",
BucketName: "dcp-test-backup",
BatchSizeLimit: 10,
RequestTimeout: 10 * time.Second,
},
}).SetMapper(dcpcouchbase.DefaultMapper).Build()
Expand All @@ -88,22 +89,24 @@ Check out on [go-dcp](https://github.com/Trendyol/go-dcp#configuration)

### Couchbase Specific Configuration

| Variable | Type | Required | Default | Description |
|----------------------------------|---------------|----------|----------|-------------------------------------|
| `couchbase.hosts` | []string | yes | | Couchbase connection urls |
| `couchbase.username` | string | yes | | Defines Couchbase username |
| `couchbase.password` | string | yes | | Defines Couchbase password |
| `couchbase.bucketName` | string | yes | | Defines Couchbase bucket name |
| `couchbase.scopeName` | string | no | _default | Defines Couchbase scope name |
| `couchbase.collectionName` | string | no | _default | Defines Couchbase collection name |
| `couchbase.maxInflightRequests` | int | no | 2048 | Maximum message count for Couchbase |
| `couchbase.writePoolSizePerNode` | int | no | 1 | Write connection pool size per node |
| `couchbase.requestTimeout` | time.Duration | no | 3s | Maximum request waiting time |
| `couchbase.secureConnection` | bool | no | false | Enables secure connection. |
| `couchbase.rootCAPath` | string | no | false | Defines root CA path. |
| `couchbase.maxQueueSize` | int | no | 2048 | The maximum number of requests that can be queued waiting to be sent to a node. `2048` is default. Check this if you get queue overflowed or queue full. |
| `couchbase.connectionBufferSize` | uint | no | 20971520 | Defines connectionBufferSize. |
| `couchbase.connectionTimeout` | time.Duration | no | 5s | Defines connectionTimeout. |
| Variable | Type | Required | Default | Description |
|----------------------------------|---------------|----------|-----------------|-----------------------------------------------------------------------------------------------------|
| `couchbase.hosts` | []string | yes | | Couchbase connection urls |
| `couchbase.username` | string | yes | | Defines Couchbase username |
| `couchbase.password` | string | yes | | Defines Couchbase password |
| `couchbase.bucketName` | string | yes | | Defines Couchbase bucket name |
| `couchbase.scopeName` | string | no | _default | Defines Couchbase scope name |
| `couchbase.collectionName` | string | no | _default | Defines Couchbase collection name |
| `couchbase.batchSizeLimit` | int | no | 2048 | Maximum message count for batch, if exceed flush will be triggered. |
| `couchbase.batchTickerDuration` | time.Duration | no | 10s | Batch is being flushed automatically at specific time intervals for long waiting messages in batch. |
| `couchbase.batchByteSizeLimit` | int, string | no | 10mb | Maximum size(byte) for batch, if exceed flush will be triggered. `10mb` is default. |
| `couchbase.maxInflightRequests` | int | no | $batchSizeLimit | Maximum request count for Couchbase |
| `couchbase.writePoolSizePerNode` | int | no | 1 | Write connection pool size per node |
| `couchbase.requestTimeout` | time.Duration | no | 1m | Maximum request waiting time |
| `couchbase.secureConnection` | bool | no | false | Enables secure connection. |
| `couchbase.rootCAPath` | string | no | false | Defines root CA path. |
| `couchbase.connectionBufferSize` | uint | no | 20971520 | Defines connectionBufferSize. |
| `couchbase.connectionTimeout` | time.Duration | no | 1m | Defines connectionTimeout. |

## Exposed metrics

Expand Down
36 changes: 24 additions & 12 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package config
import (
"time"

"github.com/Trendyol/go-dcp/helpers"

"github.com/Trendyol/go-dcp/config"
)

Expand All @@ -12,17 +14,19 @@ const (
)

type Couchbase struct {
BatchByteSizeLimit any `yaml:"batchByteSizeLimit"`
RootCAPath string `yaml:"rootCAPath"`
CollectionName string `yaml:"collectionName"`
Username string `yaml:"username"`
Password string `yaml:"password"`
BucketName string `yaml:"bucketName"`
ScopeName string `yaml:"scopeName"`
CollectionName string `yaml:"collectionName"`
RootCAPath string `yaml:"rootCAPath"`
Hosts []string `yaml:"hosts"`
BatchSizeLimit int `yaml:"batchSizeLimit"`
BatchTickerDuration time.Duration `yaml:"batchTickerDuration"`
WritePoolSizePerNode int `yaml:"writePoolSizePerNode"`
MaxInflightRequests int `yaml:"maxInflightRequests"`
ConnectionTimeout time.Duration `yaml:"connectionTimeout"`
MaxQueueSize int `yaml:"maxQueueSize"`
ConnectionBufferSize uint `yaml:"connectionBufferSize"`
RequestTimeout time.Duration `yaml:"requestTimeout"`
SecureConnection bool `yaml:"secureConnection"`
Expand Down Expand Up @@ -53,24 +57,32 @@ func (c *Config) applyDefaultScopeName() {
}

func (c *Config) applyDefaultConnectionSettings() {
c.Couchbase.ConnectionTimeout = 5 * time.Second
c.Couchbase.ConnectionTimeout = 1 * time.Minute
c.Couchbase.ConnectionBufferSize = 20971520

if c.Couchbase.MaxQueueSize == 0 {
c.Couchbase.MaxQueueSize = 2048
}
}

func (c *Config) applyDefaultProcess() {
if c.Couchbase.WritePoolSizePerNode == 0 {
c.Couchbase.WritePoolSizePerNode = 1
}

if c.Couchbase.BatchTickerDuration == 0 {
c.Couchbase.BatchTickerDuration = 10 * time.Second
}

if c.Couchbase.BatchSizeLimit == 0 {
c.Couchbase.BatchSizeLimit = 2048
}

if c.Couchbase.MaxInflightRequests == 0 {
c.Couchbase.MaxInflightRequests = 2048
c.Couchbase.MaxInflightRequests = c.Couchbase.BatchSizeLimit
}

if c.Couchbase.WritePoolSizePerNode == 0 {
c.Couchbase.WritePoolSizePerNode = 1
if c.Couchbase.BatchByteSizeLimit == nil {
c.Couchbase.BatchByteSizeLimit = helpers.ResolveUnionIntOrStringValue("10mb")
}

if c.Couchbase.RequestTimeout == 0 {
c.Couchbase.RequestTimeout = 3 * time.Second
c.Couchbase.RequestTimeout = 1 * time.Minute
}
}
19 changes: 15 additions & 4 deletions connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"os"
"time"

"github.com/Trendyol/go-dcp/helpers"

jsoniter "github.com/json-iterator/go"

dcpCouchbase "github.com/Trendyol/go-dcp/couchbase"
Expand Down Expand Up @@ -49,6 +51,7 @@ type Metric struct {
func (c *connector) Start() {
go func() {
<-c.dcp.WaitUntilReady()
c.processor.StartProcessor()
}()
c.dcp.Start()
}
Expand Down Expand Up @@ -104,10 +107,15 @@ func (c *connector) listener(ctx *models.ListenerContext) {
return
}

for i := 0; i < len(actions); i++ {
func(idx int) {
c.processor.AddAction(ctx, e.EventTime, &actions[idx])
}(i)
batchSizeLimit := c.config.Couchbase.BatchSizeLimit
if len(actions) > batchSizeLimit {
chunks := helpers.ChunkSliceWithSize[couchbase.CBActionDocument](actions, batchSizeLimit)
lastChunkIndex := len(chunks) - 1
for idx, chunk := range chunks {
c.processor.AddActions(ctx, e.EventTime, chunk, idx == lastChunkIndex)
}
} else {
c.processor.AddActions(ctx, e.EventTime, actions, true)
}
}

Expand Down Expand Up @@ -144,6 +152,9 @@ func newConnector(cf any, mapper Mapper, sinkResponseHandler couchbase.SinkRespo
copyOfConfig := cfg.Couchbase
printConfiguration(copyOfConfig)

dcpConfig := dcp.GetConfig()
dcpConfig.Checkpoint.Type = "manual"

connector.dcp = dcp

client := couchbase.NewClient(&cfg.Couchbase)
Expand Down
121 changes: 102 additions & 19 deletions couchbase/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ package couchbase
import (
"context"
"errors"
"sync"
"time"

"github.com/Trendyol/go-dcp/helpers"

"github.com/Trendyol/go-dcp/logger"

"github.com/couchbase/gocbcore/v10/memd"
Expand All @@ -21,13 +24,24 @@ type Processor struct {
client Client
metric *Metric
dcpCheckpointCommit func()
listenerCtxCh chan *models.ListenerContext
inflightCh chan struct{}
batchTicker *time.Ticker
batch []CBActionDocument
requestTimeout time.Duration
batchTickerDuration time.Duration
batchByteSizeLimit int
batchByteSize int
batchSizeLimit int
batchSize int
flushLock sync.Mutex
isDcpRebalancing bool
}

type Metric struct {
ProcessLatencyMs int64
ProcessLatencyMs int64
BulkRequestProcessLatencyMs int64
BulkRequestSize int64
BulkRequestByteSize int64
}

func NewProcessor(
Expand All @@ -44,49 +58,89 @@ func NewProcessor(
metric: &Metric{},
sinkResponseHandler: sinkResponseHandler,
targetClient: targetClient,
listenerCtxCh: make(chan *models.ListenerContext, config.Couchbase.MaxInflightRequests),
inflightCh: make(chan struct{}, config.Couchbase.MaxInflightRequests),
batchTicker: time.NewTicker(config.Couchbase.BatchTickerDuration),
batchSizeLimit: config.Couchbase.BatchSizeLimit,
batchByteSizeLimit: helpers.ResolveUnionIntOrStringValue(config.Couchbase.BatchByteSizeLimit),
batchTickerDuration: config.Couchbase.BatchTickerDuration,
}

return processor, nil
}

func (b *Processor) StartProcessor() {
for range b.batchTicker.C {
b.flushMessages()
}
}

func (b *Processor) Close() {
b.batchTicker.Stop()
b.flushMessages()
b.client.Close()
}

func (b *Processor) flushMessages() {
b.flushLock.Lock()
defer b.flushLock.Unlock()
if b.isDcpRebalancing {
return
}
if len(b.batch) > 0 {
b.bulkRequest()
b.batchTicker.Reset(b.batchTickerDuration)
b.batch = b.batch[:0]
b.batchSize = 0
b.batchByteSize = 0
}
b.dcpCheckpointCommit()
}

func (b *Processor) PrepareStartRebalancing() {
b.flushLock.Lock()
defer b.flushLock.Unlock()
b.isDcpRebalancing = true
b.batch = b.batch[:0]
b.batchSize = 0
b.batchByteSize = 0
}

func (b *Processor) PrepareEndRebalancing() {
b.flushLock.Lock()
defer b.flushLock.Unlock()
b.isDcpRebalancing = false
}

func (b *Processor) AddAction(
listenerCtx *models.ListenerContext,
func (b *Processor) AddActions(
ctx *models.ListenerContext,
eventTime time.Time,
action *CBActionDocument,
actions []CBActionDocument,
isLastChunk bool,
) {
b.metric.ProcessLatencyMs = time.Since(eventTime).Milliseconds()

if b.isDcpRebalancing {
return
b.flushLock.Lock()
b.batch = append(b.batch, actions...)
b.batchSize += len(actions)
for _, action := range actions {
b.batchByteSize += action.Size
}
if isLastChunk {
ctx.Ack()
}
b.flushLock.Unlock()

b.listenerCtxCh <- listenerCtx
ctx, cancel := context.WithTimeout(context.Background(), b.requestTimeout)
b.client.Execute(ctx, action, func(err error) {
lCtx := <-b.listenerCtxCh
cancel()
go b.panicOrGo(lCtx, action, err)
})
if isLastChunk {
b.metric.ProcessLatencyMs = time.Since(eventTime).Milliseconds()
}
if b.batchSize >= b.batchSizeLimit || b.batchByteSize >= b.batchByteSizeLimit {
b.flushMessages()
}
}

func (b *Processor) GetMetric() *Metric {
return b.metric
}

func (b *Processor) panicOrGo(listenerCtx *models.ListenerContext, action *CBActionDocument, err error) {
func (b *Processor) panicOrGo(action *CBActionDocument, err error) {
isRequestSuccessful := false
if err == nil {
isRequestSuccessful = true
Expand All @@ -102,7 +156,6 @@ func (b *Processor) panicOrGo(listenerCtx *models.ListenerContext, action *CBAct

if isRequestSuccessful {
b.handleSuccess(action)
listenerCtx.Ack()
return
}

Expand All @@ -123,8 +176,10 @@ func (b *Processor) handleError(action *CBActionDocument, err error) {
s.Retry = func(ctx context.Context) error {
errCh := make(chan error, 1)

b.inflightCh <- struct{}{}
b.client.Execute(ctx, s.Action, func(err error) {
errCh <- err
<-b.inflightCh
})

return <-errCh
Expand All @@ -141,12 +196,40 @@ func (b *Processor) handleSuccess(action *CBActionDocument) {
s.Retry = func(ctx context.Context) error {
errCh := make(chan error, 1)

b.inflightCh <- struct{}{}
b.client.Execute(ctx, s.Action, func(err error) {
errCh <- err
<-b.inflightCh
})

return <-errCh
}
b.sinkResponseHandler.OnSuccess(s)
}
}

func (b *Processor) handleResponse(idx int, wg *sync.WaitGroup, err error) {
b.panicOrGo(&b.batch[idx], err)
wg.Done()
}

func (b *Processor) bulkRequest() {
startedTime := time.Now()
ctx, cancel := context.WithTimeout(context.Background(), b.requestTimeout)
defer cancel()
var wg sync.WaitGroup
wg.Add(len(b.batch))
for i := 0; i < len(b.batch); i++ {
func(idx int) {
b.inflightCh <- struct{}{}
b.client.Execute(ctx, &b.batch[idx], func(err error) {
go b.handleResponse(idx, &wg, err)
<-b.inflightCh
})
}(i)
}
wg.Wait()
b.metric.BulkRequestProcessLatencyMs = time.Since(startedTime).Milliseconds()
b.metric.BulkRequestSize = int64(b.batchSize)
b.metric.BulkRequestByteSize = int64(b.batchByteSize)
}
Loading

0 comments on commit 3abdfad

Please sign in to comment.