Skip to content

Commit

Permalink
feat: add mutate in array append and increment and maxQueueSize config (
Browse files Browse the repository at this point in the history
  • Loading branch information
3n0ugh authored Oct 7, 2024
1 parent 7cdb910 commit 1cea7d1
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 17 deletions.
33 changes: 17 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,22 +89,23 @@ Check out on [go-dcp](https://github.com/Trendyol/go-dcp#configuration)

### Couchbase Specific Configuration

| Variable | Type | Required | Default | Description |
|----------------------------------|---------------|----------|----------|------------------------------------------------------------------------------------------------|
| `couchbase.hosts` | []string | yes | | Couchbase connection urls |
| `couchbase.username` | string | yes | | Defines Couchbase username |
| `couchbase.password` | string | yes | | Defines Couchbase password |
| `couchbase.bucketName` | string | yes | | Defines Couchbase bucket name |
| `couchbase.scopeName` | string | no | _default | Defines Couchbase scope name |
| `couchbase.collectionName` | string | no | _default | Defines Couchbase collection name |
| `couchbase.batchSizeLimit` | int | no | 1000 | Maximum message count for batch, if exceed flush will be triggered. |
| `couchbase.batchTickerDuration` | time.Duration | no | 10s | Batch is being flushed automatically at specific time intervals for long waiting messages in batch. |
| `couchbase.batchByteSizeLimit` | int, string | no | 10mb | Maximum size(byte) for batch, if exceed flush will be triggered. `10mb` is default. |
| `couchbase.requestTimeout` | time.Duration | no | 3s | Maximum request waiting time. Value type milliseconds. |
| `couchbase.secureConnection` | bool | no | false | Enables secure connection. |
| `couchbase.rootCAPath` | string | no | false | Defines root CA path. |
| `couchbase.connectionBufferSize` | uint | no | 20971520 | Defines connectionBufferSize. |
| `couchbase.connectionTimeout` | time.Duration | no | 5s | Defines connectionTimeout. |
| Variable | Type | Required | Default | Description |
|----------------------------------|---------------|----------|----------|----------------------------------------------------------------------------------------------------------------------------------------------------------|
| `couchbase.hosts` | []string | yes | | Couchbase connection urls |
| `couchbase.username` | string | yes | | Defines Couchbase username |
| `couchbase.password` | string | yes | | Defines Couchbase password |
| `couchbase.bucketName` | string | yes | | Defines Couchbase bucket name |
| `couchbase.scopeName` | string | no | _default | Defines Couchbase scope name |
| `couchbase.collectionName` | string | no | _default | Defines Couchbase collection name |
| `couchbase.batchSizeLimit` | int | no | 1000 | Maximum message count for batch, if exceed flush will be triggered. |
| `couchbase.batchTickerDuration` | time.Duration | no | 10s | Batch is being flushed automatically at specific time intervals for long waiting messages in batch. |
| `couchbase.batchByteSizeLimit` | int, string | no | 10mb | Maximum size(byte) for batch, if exceed flush will be triggered. `10mb` is default. |
| `couchbase.requestTimeout` | time.Duration | no | 3s | Maximum request waiting time. Value type milliseconds. |
| `couchbase.secureConnection` | bool | no | false | Enables secure connection. |
| `couchbase.rootCAPath` | string | no | false | Defines root CA path. |
| `couchbase.connectionBufferSize` | uint | no | 20971520 | Defines connectionBufferSize. |
| `couchbase.maxQueueSize` | int | no | 2048 | The maximum number of requests that can be queued waiting to be sent to a node. `2048` is default. Check this if you get queue overflowed or queue full. |
| `couchbase.connectionTimeout` | time.Duration | no | 5s | Defines connectionTimeout. |

## Exposed metrics

Expand Down
5 changes: 5 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type Couchbase struct {
BatchTickerDuration time.Duration `yaml:"batchTickerDuration"`
ConnectionTimeout time.Duration `yaml:"connectionTimeout"`
BatchSizeLimit int `yaml:"batchSizeLimit"`
MaxQueueSize int `yaml:"maxQueueSize"`
ConnectionBufferSize uint `yaml:"connectionBufferSize"`
RequestTimeout time.Duration `yaml:"requestTimeout"`
SecureConnection bool `yaml:"secureConnection"`
Expand Down Expand Up @@ -57,6 +58,10 @@ func (c *Config) applyDefaultScopeName() {
func (c *Config) applyDefaultConnectionSettings() {
c.Couchbase.ConnectionTimeout = 5 * time.Second
c.Couchbase.ConnectionBufferSize = 20971520

if c.Couchbase.MaxQueueSize == 0 {
c.Couchbase.MaxQueueSize = 2048
}
}

func (c *Config) applyDefaultProcess() {
Expand Down
114 changes: 113 additions & 1 deletion couchbase/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,29 @@ type Client interface {
preserveExpiry bool,
cb gocbcore.MutateInCallback,
) error
ArrayAppend(ctx context.Context,
scopeName string,
collectionName string,
id []byte,
path []byte,
value []byte,
flags memd.SubdocDocFlag,
cas *gocbcore.Cas,
expiry uint32,
preserveExpiry bool,
cb gocbcore.MutateInCallback,
) error
Increment(ctx context.Context,
scopeName string,
collectionName string,
id []byte,
delta uint64,
initial uint64,
cas *gocbcore.Cas,
expiry uint32,
preserveExpiry bool,
cb gocbcore.CounterCallback,
) error
Execute(ctx context.Context, action *CBActionDocument, callback func(err error))
Close()
}
Expand All @@ -65,7 +88,7 @@ func (s *client) Connect() error {
agent, err := couchbase.CreateAgent(
s.config.Hosts, s.config.BucketName, s.config.Username, s.config.Password,
s.config.SecureConnection, s.config.RootCAPath,
s.config.ConnectionBufferSize, s.config.ConnectionTimeout,
s.config.MaxQueueSize, s.config.ConnectionBufferSize, s.config.ConnectionTimeout,
)
if err != nil {
logger.Log.Error("error while connect to target bucket, err: %v", err)
Expand Down Expand Up @@ -165,6 +188,78 @@ func (s *client) CreatePath(ctx context.Context,
return err
}

func (s *client) ArrayAppend(ctx context.Context,
scopeName string,
collectionName string,
id []byte,
path []byte,
value []byte,
flags memd.SubdocDocFlag,
cas *gocbcore.Cas,
expiry uint32,
preserveExpiry bool,
cb gocbcore.MutateInCallback,
) error {
deadline, _ := ctx.Deadline()

options := gocbcore.MutateInOptions{
Key: id,
Flags: flags,
Ops: []gocbcore.SubDocOp{
{
Op: memd.SubDocOpArrayPushLast,
Value: value,
Path: string(path),
},
},
Expiry: expiry,
PreserveExpiry: preserveExpiry,
Deadline: deadline,
ScopeName: scopeName,
CollectionName: collectionName,
}

if cas != nil {
options.Cas = *cas
}

_, err := s.agent.MutateIn(options, cb)

return err
}

func (s *client) Increment(ctx context.Context,
scopeName string,
collectionName string,
id []byte,
delta uint64,
initial uint64,
cas *gocbcore.Cas,
expiry uint32,
preserveExpiry bool,
cb gocbcore.CounterCallback,
) error {
deadline, _ := ctx.Deadline()

options := gocbcore.CounterOptions{
Key: id,
Delta: delta,
Initial: initial,
Expiry: expiry,
CollectionName: collectionName,
ScopeName: scopeName,
Deadline: deadline,
PreserveExpiry: preserveExpiry,
}

if cas != nil {
options.Cas = *cas
}

_, err := s.agent.Increment(options, cb)
return err
}

func (s *client) CreateDocument(ctx context.Context,
scopeName string,
collectionName string,
Expand Down Expand Up @@ -283,6 +378,17 @@ func (s *client) Execute(ctx context.Context, action *CBActionDocument, callback
func(result *gocbcore.MutateInResult, err error) {
callback(err)
})
case action.Type == ArrayAppend:
flags := memd.SubdocDocFlagMkDoc
if action.DisableAutoCreate {
flags = memd.SubdocDocFlagNone
}

err = s.ArrayAppend(ctx, s.config.ScopeName, s.config.CollectionName,
action.ID, action.Path, action.Source, flags, casPtr, action.Expiry, action.PreserveExpiry,
func(result *gocbcore.MutateInResult, err error) {
callback(err)
})
case action.Type == DeletePath:
err = s.DeletePath(ctx, s.config.ScopeName, s.config.CollectionName,
action.ID, action.Path, casPtr, action.Expiry, action.PreserveExpiry,
Expand All @@ -295,6 +401,12 @@ func (s *client) Execute(ctx context.Context, action *CBActionDocument, callback
func(result *gocbcore.DeleteResult, err error) {
callback(err)
})
case action.Type == Increment:
err = s.Increment(ctx, s.config.ScopeName, s.config.CollectionName,
action.ID, action.Delta, action.Initial, casPtr, action.Expiry, action.PreserveExpiry,
func(result *gocbcore.CounterResult, err error) {
callback(err)
})
default:
err = fmt.Errorf("unexpected action type: %v", action.Type)
}
Expand Down
24 changes: 24 additions & 0 deletions couchbase/document.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ const (
MutateIn CbAction = "MutateIn"
MultiMutateIn CbAction = "MultiMutateIn"
DeletePath CbAction = "DeletePath"
ArrayAppend CbAction = "ArrayAppend"
Increment CbAction = "Increment"
)

type CBActionDocument struct {
Expand All @@ -26,6 +28,8 @@ type CBActionDocument struct {
Expiry uint32
PreserveExpiry bool
DisableAutoCreate bool
Initial uint64
Delta uint64
}

func (doc *CBActionDocument) SetCas(cas uint64) {
Expand Down Expand Up @@ -93,3 +97,23 @@ func NewDeletePathAction(key []byte, path []byte) CBActionDocument {
Size: len(key) + len(path),
}
}

func NewIncrementAction(key []byte, initial uint64, delta uint64) CBActionDocument {
return CBActionDocument{
ID: key,
Type: Increment,
Initial: initial,
Delta: delta,
Size: len(key),
}
}

func NewArrayAppendAction(key []byte, path []byte, source []byte) CBActionDocument {
return CBActionDocument{
ID: key,
Source: source,
Type: ArrayAppend,
Path: path,
Size: len(key) + len(path) + len(source),
}
}

0 comments on commit 1cea7d1

Please sign in to comment.