Skip to content

Commit

Permalink
Merge branch 'uber:master' into handler-test-7
Browse files Browse the repository at this point in the history
  • Loading branch information
timl3136 authored May 9, 2024
2 parents 6c6538f + 2fef3c3 commit 9b59344
Show file tree
Hide file tree
Showing 21 changed files with 4,663 additions and 2,776 deletions.
11 changes: 9 additions & 2 deletions codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ coverage:
if_ci_failed: ignore # require the CI to pass before setting the status
patch:
default:
target: 85% # specify the target coverage for each commit status
target: 0% # Todo (David.porter) to immediately revert this after landing a refactor
# specify the target coverage for each commit status
# option: "auto" (compare against parent commit or pull request base)
# option: "X%" a static target percentage to hit
threshold: 0% # allow the coverage drop by x% before marking as failure
Expand All @@ -38,7 +39,7 @@ ignore:
- "**/interface.go"
- "**/interfaces.go"
- "**/main.go"
- "**/mocks.go"
- "**/*mocks.go"
- "**/mocks/**"
- "**/testdata/**"
- "**/testing/**"
Expand All @@ -56,7 +57,13 @@ ignore:
- "common/types/shared.go" # 8k lines of getters. Not worth testing manually but consider switching to generated code.
- "host/**"
- "idls/**"
- "service/frontend/service.go"
- "service/history/constants/test_constants.go"
- "service/history/execution/mutable_state.go"
- "service/history/workflow/errors.go"
- "service/history/service.go"
- "service/matching/service.go"
- "service/worker/service.go"
- "testflags/**"
- "tools/common/schema/test/**"
- "tools/linter/**"
70 changes: 34 additions & 36 deletions common/elasticsearch/client/os2/client_bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,16 +104,42 @@ func (v *bulkProcessor) Add(request *bulk.GenericBulkableAddRequest) {
}

req.OnFailure = func(ctx context.Context, item opensearchutil.BulkIndexerItem, res opensearchutil.BulkIndexerResponseItem, err error) {
v.before(0, metricsRequest)
it := bulk.GenericBulkResponseItem{
Index: res.Index,
ID: res.DocumentID,
Version: res.Version,
Status: res.Status,
Error: res.Error,
v.processCallback(0, callBackRequest, req, res, false, err)
}

req.OnSuccess = func(ctx context.Context, item opensearchutil.BulkIndexerItem, res opensearchutil.BulkIndexerResponseItem) {
v.processCallback(0, callBackRequest, req, res, true, nil)
}
if err := v.processor.Add(context.Background(), req); err != nil {
v.logger.Error("adding bulk request to OpenSearch", tag.Error(err))
}
}

// processCallback processes both success and failure scenarios.
func (v *bulkProcessor) processCallback(index int64, callBackRequest bulk.GenericBulkableRequest, req opensearchutil.BulkIndexerItem, res opensearchutil.BulkIndexerResponseItem, onSuccess bool, err error) {
v.before(0, metricsRequest)

gr := []bulk.GenericBulkableRequest{callBackRequest}

it := bulk.GenericBulkResponseItem{
Index: res.Index,
ID: res.DocumentID,
Version: res.Version,
Status: res.Status,
Error: res.Error,
}

if onSuccess {
gbr := bulk.GenericBulkResponse{
Took: 0,
Errors: false,
Items: []map[string]*bulk.GenericBulkResponseItem{
{req.Action: &it},
},
}

gr := []bulk.GenericBulkableRequest{callBackRequest}
v.after(0, gr, &gbr, nil /*No errors here*/)
} else {
gbr := bulk.GenericBulkResponse{
Took: 0,
Errors: res.Error.Type != "" || res.Status > 201,
Expand All @@ -128,34 +154,6 @@ func (v *bulkProcessor) Add(request *bulk.GenericBulkableAddRequest) {
}

v.after(0, gr, &gbr, &ge)

}

req.OnSuccess = func(ctx context.Context, item opensearchutil.BulkIndexerItem, res opensearchutil.BulkIndexerResponseItem) {
v.before(0, metricsRequest)

gr := []bulk.GenericBulkableRequest{callBackRequest}

it := bulk.GenericBulkResponseItem{
Index: res.Index,
ID: res.DocumentID,
Version: res.Version,
Status: res.Status,
Error: res.Error,
}

gbr := bulk.GenericBulkResponse{
Took: 0,
Errors: false,
Items: []map[string]*bulk.GenericBulkResponseItem{
{req.Action: &it},
},
}

v.after(0, gr, &gbr, nil /*No errors here*/)
}
if err := v.processor.Add(context.Background(), req); err != nil {
v.logger.Error("adding bulk request to OpenSearch", tag.Error(err))
}
}

Expand Down
278 changes: 278 additions & 0 deletions common/elasticsearch/client/os2/client_bulk_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,278 @@
// The MIT License (MIT)

// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package os2

import (
"bytes"
"context"
"errors"
"io"
"testing"

"github.com/opensearch-project/opensearch-go/v2/opensearchutil"
"github.com/stretchr/testify/assert"

"github.com/uber/cadence/common/elasticsearch/bulk"
)

func TestStart(t *testing.T) {
ctx := context.Background()
processor := &bulkProcessor{}
if err := processor.Start(ctx); err != nil {
t.Errorf("Start() error = %v, wantErr %v", err, nil)
}
}

func TestStopAndClose(t *testing.T) {
osClient, testServer := getSecureMockOS2Client(t, nil, false)
defer testServer.Close()

params := bulk.BulkProcessorParameters{
FlushInterval: 1,
NumOfWorkers: 1,
}
processor, err := osClient.RunBulkProcessor(context.Background(), &params)
assert.NoError(t, err)

if err := processor.Stop(); err != nil {
t.Errorf("Stop() error = %v, wantErr %v", err, nil)
}
}

func TestFlush(t *testing.T) {
processor := &bulkProcessor{}
if err := processor.Flush(); err != nil {
t.Errorf("Flush() error = %v, wantErr %v", err, nil)
}
}

func TestAddDeleteRequest(t *testing.T) {
testcases := []struct {
name string
input bulk.GenericBulkableAddRequest
expectErr bool
}{
{
name: "delete request",
input: bulk.GenericBulkableAddRequest{
RequestType: bulk.BulkableDeleteRequest,
Index: "test-index",
ID: "test-id",
Version: 1,
VersionType: "external",
},
expectErr: false,
},
{
name: "index request success",
input: bulk.GenericBulkableAddRequest{
RequestType: bulk.BulkableIndexRequest,
Index: "test-index",
ID: "test-id",
Doc: map[string]interface{}{"field": "value"},
},
expectErr: false,
},
{
name: "create request success",
input: bulk.GenericBulkableAddRequest{
RequestType: bulk.BulkableCreateRequest,
Index: "test-index",
ID: "test-id",
Doc: map[string]interface{}{"field": "value"},
},
expectErr: false,
},
{
name: "create request failure",
input: bulk.GenericBulkableAddRequest{
RequestType: bulk.BulkableCreateRequest,
Index: "test-index",
ID: "test-id",
Doc: func() {},
},
expectErr: false,
},
{
name: "index request failure",
input: bulk.GenericBulkableAddRequest{
RequestType: bulk.BulkableIndexRequest,
Index: "test-index",
ID: "test-id",
Doc: func() {},
},
expectErr: false,
},
}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
osClient, testServer := getSecureMockOS2Client(t, nil, false)
defer testServer.Close()

params := bulk.BulkProcessorParameters{
FlushInterval: 1,
NumOfWorkers: 1,
}
processor, err := osClient.RunBulkProcessor(context.Background(), &params)
assert.NoError(t, err)
processor.Add(&tc.input)
processor.Stop()
})
}
}

func TestProcessCallback(t *testing.T) {
osClient, testServer := getSecureMockOS2Client(t, nil, false)
defer testServer.Close()
processor, _ := opensearchutil.NewBulkIndexer(opensearchutil.BulkIndexerConfig{
Client: osClient.client,
FlushInterval: 1,
NumWorkers: 1,
Decoder: &NumberDecoder{},
})
bulkProcessor := &bulkProcessor{
processor: processor,
before: beforeFunc,
after: func(int64, []bulk.GenericBulkableRequest, *bulk.GenericBulkResponse, *bulk.GenericError) {},
}

callBackRequest := bulk.NewBulkIndexRequest().ID("test-id").Index("test-index").Doc(map[string]interface{}{"field": "value"})
req := opensearchutil.BulkIndexerItem{Index: "test-index", DocumentID: "test-id"}
res := opensearchutil.BulkIndexerResponseItem{Index: "test-index", DocumentID: "test-id", Status: 200}

tests := []struct {
name string
onSuccess bool
errorPassed error
expectError bool
}{
{"Success Callback", true, nil, false},
{"Failure Callback", false, errors.New("test error"), true},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
bulkProcessor.processCallback(0, callBackRequest, req, res, test.onSuccess, test.errorPassed)
})
}
}

func beforeFunc(executionID int64, requests []bulk.GenericBulkableRequest) {}

func TestRunBulkProcessor(t *testing.T) {
osClient, testServer := getSecureMockOS2Client(t, nil, false)
defer testServer.Close()

testcases := []struct {
name string
input bulk.BulkProcessorParameters
expectErr bool
}{
{
name: "normal",
input: bulk.BulkProcessorParameters{
FlushInterval: 1,
NumOfWorkers: 1,
},
expectErr: false,
},
}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
processor, err := osClient.RunBulkProcessor(context.Background(), &tc.input)
if tc.expectErr {
assert.Error(t, err)
} else {
assert.NoError(t, err)
assert.NotNil(t, processor)
}
})
}
}

func TestUnmarshalFromReader(t *testing.T) {
// test unmarshalFromReader
testcases := []struct {
name string
input string
expectErr bool
}{
{
name: "normal",
input: `{"status":"ok"}`,
expectErr: false,
},
}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
decoder := &NumberDecoder{}
reader := createReaderFromString(tc.input)

var response opensearchutil.BulkIndexerResponse
err := decoder.UnmarshalFromReader(reader, &response)
if tc.expectErr {
assert.Error(t, err)
} else {
assert.NoError(t, err)
}
})
}
}

func TestDecode(t *testing.T) {
// test decode
testcases := []struct {
name string
input string
expectErr bool
}{
{
name: "normal",
input: `{"status":"ok"}`,
expectErr: false,
},
}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
decoder := &NumberDecoder{}
reader := createReaderFromString(tc.input)

var response opensearchutil.BulkIndexerResponse
err := decoder.Decode(reader, &response)
if tc.expectErr {
assert.Error(t, err)
} else {
assert.NoError(t, err)
}
})
}
}

// Function to create a sample reader from a string
func createReaderFromString(s string) io.Reader {
return bytes.NewBufferString(s)
}
Loading

0 comments on commit 9b59344

Please sign in to comment.