diff --git a/couchbase/client.go b/couchbase/client.go index 33f257f..b0096e2 100644 --- a/couchbase/client.go +++ b/couchbase/client.go @@ -3,7 +3,6 @@ package couchbase import ( "github.com/Trendyol/go-dcp-couchbase/config" "github.com/Trendyol/go-dcp/couchbase" - "github.com/Trendyol/go-dcp/logger" "github.com/couchbase/gocbcore/v10" ) diff --git a/couchbase/document.go b/couchbase/document.go index 8af969c..a55e4ac 100644 --- a/couchbase/document.go +++ b/couchbase/document.go @@ -3,14 +3,16 @@ package couchbase type CbAction string const ( - Index CbAction = "Index" - Delete CbAction = "Delete" + Set CbAction = "Set" + Delete CbAction = "Delete" + MutateIn CbAction = "MutateIn" ) type CBActionDocument struct { Type CbAction Source []byte ID []byte + Path []byte } func NewDeleteAction(key []byte) CBActionDocument { @@ -20,10 +22,19 @@ func NewDeleteAction(key []byte) CBActionDocument { } } -func NewIndexAction(key []byte, source []byte) CBActionDocument { +func NewSetAction(key []byte, source []byte) CBActionDocument { return CBActionDocument{ ID: key, Source: source, - Type: Index, + Type: Set, + } +} + +func NewMutateInAction(key []byte, path []byte, source []byte) CBActionDocument { + return CBActionDocument{ + ID: key, + Source: source, + Type: MutateIn, + Path: path, } } diff --git a/couchbase/processor.go b/couchbase/processor.go index 4433fdb..2d9f6a8 100644 --- a/couchbase/processor.go +++ b/couchbase/processor.go @@ -5,6 +5,8 @@ import ( "errors" "time" + "github.com/couchbase/gocbcore/v10/memd" + "github.com/Trendyol/go-dcp/couchbase" "github.com/Trendyol/go-dcp-couchbase/config" @@ -105,12 +107,18 @@ func (b *Processor) bulkRequest() error { ctx, cancel := context.WithTimeout(context.Background(), time.Duration(b.requestTimeoutMs)*time.Millisecond) defer cancel() for _, v := range b.batch { - if v.Type == Index { + switch { + case v.Type == Set: err := couchbase.CreateDocument(ctx, b.client.GetAgent(), b.scopeName, b.collectionName, v.ID, v.Source, 0, 0) if err != nil { return err } - } else { + case v.Type == MutateIn: + err := couchbase.CreatePath(ctx, b.client.GetAgent(), b.scopeName, b.collectionName, v.ID, v.Path, v.Source, memd.SubdocDocFlagMkDoc) + if err != nil { + return err + } + default: err := couchbase.DeleteDocument(ctx, b.client.GetAgent(), b.scopeName, b.collectionName, v.ID) var keyValueErr *gocbcore.KeyValueError if errors.As(err, &keyValueErr) { diff --git a/example/struct-config/go.mod b/example/struct-config/go.mod index f997561..f083689 100644 --- a/example/struct-config/go.mod +++ b/example/struct-config/go.mod @@ -5,7 +5,7 @@ go 1.20 replace github.com/Trendyol/go-dcp-couchbase => ./../.. require ( - github.com/Trendyol/go-dcp v0.0.65 + github.com/Trendyol/go-dcp v0.0.66 github.com/Trendyol/go-dcp-couchbase v0.0.0 ) diff --git a/example/struct-config/go.sum b/example/struct-config/go.sum index b023fa2..6f32265 100644 --- a/example/struct-config/go.sum +++ b/example/struct-config/go.sum @@ -35,8 +35,8 @@ github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOEl github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/Microsoft/go-winio v0.5.2 h1:a9IhgEQBCUEk6QCdml9CiJGhAws+YwffDHEMp1VMrpA= -github.com/Trendyol/go-dcp v0.0.65 h1:/NSxOzR3pej50usfnZq4s+7qsqiqAJ58jVa3SsBD5FU= -github.com/Trendyol/go-dcp v0.0.65/go.mod h1:2sm0cBkX5O47oF5VPmZLyuGX/HAaiQMSMs1c1o7AfaA= +github.com/Trendyol/go-dcp v0.0.66 h1:+BGUxnXI2pA1BFhF0IsmnG+Y9gYidDvKXgNwcieS7kU= +github.com/Trendyol/go-dcp v0.0.66/go.mod h1:2sm0cBkX5O47oF5VPmZLyuGX/HAaiQMSMs1c1o7AfaA= github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow= github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= diff --git a/example/struct-config/main.go b/example/struct-config/main.go index d29ae7a..a1baaf9 100644 --- a/example/struct-config/main.go +++ b/example/struct-config/main.go @@ -26,7 +26,7 @@ func main() { }, Metadata: dcpConfig.Metadata{ Config: map[string]string{ - "bucket": "dcp-test-meta", + "bucket": "dcp-test", "scope": "_default", "collection": "_default", }, @@ -37,7 +37,7 @@ func main() { Couchbase: config.Couchbase{ Hosts: []string{"localhost:8091"}, Username: "user", - Password: "123456", + Password: "password", BucketName: "dcp-test-backup", BatchSizeLimit: 10, RequestTimeoutMs: 1000 * 10, diff --git a/go.mod b/go.mod index 197d936..93d92f6 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/Trendyol/go-dcp-couchbase go 1.20 require ( - github.com/Trendyol/go-dcp v0.0.65 + github.com/Trendyol/go-dcp v0.0.66 github.com/couchbase/gocbcore/v10 v10.2.3 ) diff --git a/go.sum b/go.sum index b023fa2..6f32265 100644 --- a/go.sum +++ b/go.sum @@ -35,8 +35,8 @@ github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOEl github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/Microsoft/go-winio v0.5.2 h1:a9IhgEQBCUEk6QCdml9CiJGhAws+YwffDHEMp1VMrpA= -github.com/Trendyol/go-dcp v0.0.65 h1:/NSxOzR3pej50usfnZq4s+7qsqiqAJ58jVa3SsBD5FU= -github.com/Trendyol/go-dcp v0.0.65/go.mod h1:2sm0cBkX5O47oF5VPmZLyuGX/HAaiQMSMs1c1o7AfaA= +github.com/Trendyol/go-dcp v0.0.66 h1:+BGUxnXI2pA1BFhF0IsmnG+Y9gYidDvKXgNwcieS7kU= +github.com/Trendyol/go-dcp v0.0.66/go.mod h1:2sm0cBkX5O47oF5VPmZLyuGX/HAaiQMSMs1c1o7AfaA= github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow= github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= diff --git a/mapper.go b/mapper.go index 90b9d85..7817b84 100644 --- a/mapper.go +++ b/mapper.go @@ -6,7 +6,7 @@ type Mapper func(event couchbase.Event) []couchbase.CBActionDocument func DefaultMapper(event couchbase.Event) []couchbase.CBActionDocument { if event.IsMutated { - return []couchbase.CBActionDocument{couchbase.NewIndexAction(event.Key, event.Value)} + return []couchbase.CBActionDocument{couchbase.NewMutateInAction(event.Key, event.Key, event.Value)} } return []couchbase.CBActionDocument{couchbase.NewDeleteAction(event.Key)} }