From 1cea7d19daf4a390d6eb6b43711c9bc361c3a076 Mon Sep 17 00:00:00 2001 From: Serhat Karabulut Date: Mon, 7 Oct 2024 10:09:19 +0300 Subject: [PATCH] feat: add mutate in array append and increment and maxQueueSize config (#22) --- README.md | 33 ++++++------ config/config.go | 5 ++ couchbase/client.go | 114 +++++++++++++++++++++++++++++++++++++++++- couchbase/document.go | 24 +++++++++ 4 files changed, 159 insertions(+), 17 deletions(-) diff --git a/README.md b/README.md index e938943..82b3147 100644 --- a/README.md +++ b/README.md @@ -89,22 +89,23 @@ Check out on [go-dcp](https://github.com/Trendyol/go-dcp#configuration) ### Couchbase Specific Configuration -| Variable | Type | Required | Default | Description | -|----------------------------------|---------------|----------|----------|------------------------------------------------------------------------------------------------| -| `couchbase.hosts` | []string | yes | | Couchbase connection urls | -| `couchbase.username` | string | yes | | Defines Couchbase username | -| `couchbase.password` | string | yes | | Defines Couchbase password | -| `couchbase.bucketName` | string | yes | | Defines Couchbase bucket name | -| `couchbase.scopeName` | string | no | _default | Defines Couchbase scope name | -| `couchbase.collectionName` | string | no | _default | Defines Couchbase collection name | -| `couchbase.batchSizeLimit` | int | no | 1000 | Maximum message count for batch, if exceed flush will be triggered. | -| `couchbase.batchTickerDuration` | time.Duration | no | 10s | Batch is being flushed automatically at specific time intervals for long waiting messages in batch. | -| `couchbase.batchByteSizeLimit` | int, string | no | 10mb | Maximum size(byte) for batch, if exceed flush will be triggered. `10mb` is default. | -| `couchbase.requestTimeout` | time.Duration | no | 3s | Maximum request waiting time. Value type milliseconds. | -| `couchbase.secureConnection` | bool | no | false | Enables secure connection. | -| `couchbase.rootCAPath` | string | no | false | Defines root CA path. | -| `couchbase.connectionBufferSize` | uint | no | 20971520 | Defines connectionBufferSize. | -| `couchbase.connectionTimeout` | time.Duration | no | 5s | Defines connectionTimeout. | +| Variable | Type | Required | Default | Description | +|----------------------------------|---------------|----------|----------|----------------------------------------------------------------------------------------------------------------------------------------------------------| +| `couchbase.hosts` | []string | yes | | Couchbase connection urls | +| `couchbase.username` | string | yes | | Defines Couchbase username | +| `couchbase.password` | string | yes | | Defines Couchbase password | +| `couchbase.bucketName` | string | yes | | Defines Couchbase bucket name | +| `couchbase.scopeName` | string | no | _default | Defines Couchbase scope name | +| `couchbase.collectionName` | string | no | _default | Defines Couchbase collection name | +| `couchbase.batchSizeLimit` | int | no | 1000 | Maximum message count for batch, if exceed flush will be triggered. | +| `couchbase.batchTickerDuration` | time.Duration | no | 10s | Batch is being flushed automatically at specific time intervals for long waiting messages in batch. | +| `couchbase.batchByteSizeLimit` | int, string | no | 10mb | Maximum size(byte) for batch, if exceed flush will be triggered. `10mb` is default. | +| `couchbase.requestTimeout` | time.Duration | no | 3s | Maximum request waiting time. Value type milliseconds. | +| `couchbase.secureConnection` | bool | no | false | Enables secure connection. | +| `couchbase.rootCAPath` | string | no | false | Defines root CA path. | +| `couchbase.connectionBufferSize` | uint | no | 20971520 | Defines connectionBufferSize. | +| `couchbase.maxQueueSize` | int | no | 2048 | The maximum number of requests that can be queued waiting to be sent to a node. `2048` is default. Check this if you get queue overflowed or queue full. | +| `couchbase.connectionTimeout` | time.Duration | no | 5s | Defines connectionTimeout. | ## Exposed metrics diff --git a/config/config.go b/config/config.go index d511e28..7552a3b 100644 --- a/config/config.go +++ b/config/config.go @@ -25,6 +25,7 @@ type Couchbase struct { BatchTickerDuration time.Duration `yaml:"batchTickerDuration"` ConnectionTimeout time.Duration `yaml:"connectionTimeout"` BatchSizeLimit int `yaml:"batchSizeLimit"` + MaxQueueSize int `yaml:"maxQueueSize"` ConnectionBufferSize uint `yaml:"connectionBufferSize"` RequestTimeout time.Duration `yaml:"requestTimeout"` SecureConnection bool `yaml:"secureConnection"` @@ -57,6 +58,10 @@ func (c *Config) applyDefaultScopeName() { func (c *Config) applyDefaultConnectionSettings() { c.Couchbase.ConnectionTimeout = 5 * time.Second c.Couchbase.ConnectionBufferSize = 20971520 + + if c.Couchbase.MaxQueueSize == 0 { + c.Couchbase.MaxQueueSize = 2048 + } } func (c *Config) applyDefaultProcess() { diff --git a/couchbase/client.go b/couchbase/client.go index 5c57a09..5d14a60 100644 --- a/couchbase/client.go +++ b/couchbase/client.go @@ -52,6 +52,29 @@ type Client interface { preserveExpiry bool, cb gocbcore.MutateInCallback, ) error + ArrayAppend(ctx context.Context, + scopeName string, + collectionName string, + id []byte, + path []byte, + value []byte, + flags memd.SubdocDocFlag, + cas *gocbcore.Cas, + expiry uint32, + preserveExpiry bool, + cb gocbcore.MutateInCallback, + ) error + Increment(ctx context.Context, + scopeName string, + collectionName string, + id []byte, + delta uint64, + initial uint64, + cas *gocbcore.Cas, + expiry uint32, + preserveExpiry bool, + cb gocbcore.CounterCallback, + ) error Execute(ctx context.Context, action *CBActionDocument, callback func(err error)) Close() } @@ -65,7 +88,7 @@ func (s *client) Connect() error { agent, err := couchbase.CreateAgent( s.config.Hosts, s.config.BucketName, s.config.Username, s.config.Password, s.config.SecureConnection, s.config.RootCAPath, - s.config.ConnectionBufferSize, s.config.ConnectionTimeout, + s.config.MaxQueueSize, s.config.ConnectionBufferSize, s.config.ConnectionTimeout, ) if err != nil { logger.Log.Error("error while connect to target bucket, err: %v", err) @@ -165,6 +188,78 @@ func (s *client) CreatePath(ctx context.Context, return err } +func (s *client) ArrayAppend(ctx context.Context, + scopeName string, + collectionName string, + id []byte, + path []byte, + value []byte, + flags memd.SubdocDocFlag, + cas *gocbcore.Cas, + expiry uint32, + preserveExpiry bool, + cb gocbcore.MutateInCallback, +) error { + deadline, _ := ctx.Deadline() + + options := gocbcore.MutateInOptions{ + Key: id, + Flags: flags, + Ops: []gocbcore.SubDocOp{ + { + Op: memd.SubDocOpArrayPushLast, + Value: value, + Path: string(path), + }, + }, + Expiry: expiry, + PreserveExpiry: preserveExpiry, + Deadline: deadline, + ScopeName: scopeName, + CollectionName: collectionName, + } + + if cas != nil { + options.Cas = *cas + } + + _, err := s.agent.MutateIn(options, cb) + + return err +} + +func (s *client) Increment(ctx context.Context, + scopeName string, + collectionName string, + id []byte, + delta uint64, + initial uint64, + cas *gocbcore.Cas, + expiry uint32, + preserveExpiry bool, + cb gocbcore.CounterCallback, +) error { + deadline, _ := ctx.Deadline() + + options := gocbcore.CounterOptions{ + Key: id, + Delta: delta, + Initial: initial, + Expiry: expiry, + CollectionName: collectionName, + ScopeName: scopeName, + Deadline: deadline, + PreserveExpiry: preserveExpiry, + } + + if cas != nil { + options.Cas = *cas + } + + _, err := s.agent.Increment(options, cb) + return err +} + func (s *client) CreateDocument(ctx context.Context, scopeName string, collectionName string, @@ -283,6 +378,17 @@ func (s *client) Execute(ctx context.Context, action *CBActionDocument, callback func(result *gocbcore.MutateInResult, err error) { callback(err) }) + case action.Type == ArrayAppend: + flags := memd.SubdocDocFlagMkDoc + if action.DisableAutoCreate { + flags = memd.SubdocDocFlagNone + } + + err = s.ArrayAppend(ctx, s.config.ScopeName, s.config.CollectionName, + action.ID, action.Path, action.Source, flags, casPtr, action.Expiry, action.PreserveExpiry, + func(result *gocbcore.MutateInResult, err error) { + callback(err) + }) case action.Type == DeletePath: err = s.DeletePath(ctx, s.config.ScopeName, s.config.CollectionName, action.ID, action.Path, casPtr, action.Expiry, action.PreserveExpiry, @@ -295,6 +401,12 @@ func (s *client) Execute(ctx context.Context, action *CBActionDocument, callback func(result *gocbcore.DeleteResult, err error) { callback(err) }) + case action.Type == Increment: + err = s.Increment(ctx, s.config.ScopeName, s.config.CollectionName, + action.ID, action.Delta, action.Initial, casPtr, action.Expiry, action.PreserveExpiry, + func(result *gocbcore.CounterResult, err error) { + callback(err) + }) default: err = fmt.Errorf("unexpected action type: %v", action.Type) } diff --git a/couchbase/document.go b/couchbase/document.go index f4a32da..a9be720 100644 --- a/couchbase/document.go +++ b/couchbase/document.go @@ -13,6 +13,8 @@ const ( MutateIn CbAction = "MutateIn" MultiMutateIn CbAction = "MultiMutateIn" DeletePath CbAction = "DeletePath" + ArrayAppend CbAction = "ArrayAppend" + Increment CbAction = "Increment" ) type CBActionDocument struct { @@ -26,6 +28,8 @@ type CBActionDocument struct { Expiry uint32 PreserveExpiry bool DisableAutoCreate bool + Initial uint64 + Delta uint64 } func (doc *CBActionDocument) SetCas(cas uint64) { @@ -93,3 +97,23 @@ func NewDeletePathAction(key []byte, path []byte) CBActionDocument { Size: len(key) + len(path), } } + +func NewIncrementAction(key []byte, initial uint64, delta uint64) CBActionDocument { + return CBActionDocument{ + ID: key, + Type: Increment, + Initial: initial, + Delta: delta, + Size: len(key), + } +} + +func NewArrayAppendAction(key []byte, path []byte, source []byte) CBActionDocument { + return CBActionDocument{ + ID: key, + Source: source, + Type: ArrayAppend, + Path: path, + Size: len(key) + len(path) + len(source), + } +}