From 8e2f334f484f6de92d6b4a46b222a7fc84a82f12 Mon Sep 17 00:00:00 2001 From: Matt Jaffee Date: Wed, 28 Aug 2019 08:48:44 -0500 Subject: [PATCH] use shardnode cluster change detector w/ cleanup --- client.go | 48 ++++++++++------------------ client_internal_it_test.go | 2 +- client_it_test.go | 65 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 83 insertions(+), 32 deletions(-) diff --git a/client.go b/client.go index f60764a..5d84220 100644 --- a/client.go +++ b/client.go @@ -90,37 +90,11 @@ type Client struct { importLogEncoder encoder logLock sync.Mutex - // TODO shardNodes needs to be invalidated/updated when cluster topology changes. shardNodes shardNodes tick *time.Ticker + done chan struct{} } -// func (c *Client) translateCol(index, key string) (uint64, bool) { -// c.tlock.RLock() -// v, b := c.translator.GetCol(index, key) -// c.tlock.RUnlock() -// return v, b -// } - -// func (c *Client) translateRow(index, field, key string) (uint64, bool) { -// c.tlock.RLock() -// v, b := c.translator.GetRow(index, field, key) -// c.tlock.RUnlock() -// return v, b -// } - -// func (c *Client) addTranslateCol(index, key string, value uint64) { -// c.tlock.Lock() -// c.translator.AddCol(index, key, value) -// c.tlock.Unlock() -// } - -// func (c *Client) addTranslateRow(index, field, key string, value uint64) { -// c.tlock.Lock() -// c.translator.AddRow(index, field, key, value) -// c.tlock.Unlock() -// } - func (c *Client) getURIsForShard(index string, shard uint64) ([]*URI, error) { uris, ok := c.shardNodes.Get(index, shard) if ok { @@ -139,13 +113,22 @@ func (c *Client) getURIsForShard(index string, shard uint64) ([]*URI, error) { } func (c *Client) runChangeDetection() { - c.tick = time.NewTicker(time.Minute) - - for range c.tick.C { - c.detectClusterChanges() + for { + select { + case <-c.tick.C: + c.detectClusterChanges() + case <-c.done: + return + } } } +func (c *Client) Close() error { + c.tick.Stop() + close(c.done) + return nil +} + // detectClusterChanges chooses a random index and shard from the // shardNodes cache and deletes it. It then looks it up from Pilosa to // see if it still matches, and if not it drops the whole cache. @@ -232,6 +215,8 @@ func newClientWithOptions(options *ClientOptions) *Client { coordinatorLock: &sync.RWMutex{}, shardNodes: newShardNodes(), + tick: time.NewTicker(time.Minute), + done: make(chan struct{}, 0), } if options.importLogWriter != nil { c.importLogEncoder = newImportLogEncoder(options.importLogWriter) @@ -245,6 +230,7 @@ func newClientWithOptions(options *ClientOptions) *Client { c.minRetrySleepTime = 1 * time.Second c.maxRetrySleepTime = 2 * time.Minute c.importManager = newRecordImportManager(c) + go c.runChangeDetection() return c } diff --git a/client_internal_it_test.go b/client_internal_it_test.go index 2471b90..3baf7dd 100644 --- a/client_internal_it_test.go +++ b/client_internal_it_test.go @@ -174,7 +174,7 @@ func TestImportWithReplayErrors(t *testing.T) { func TestDetectClusterChanges(t *testing.T) { c := getClient() - + defer c.Close() c.shardNodes.data["blah"] = make(map[uint64][]*URI) c.shardNodes.data["blah"][1] = []*URI{{scheme: "zzz"}} diff --git a/client_it_test.go b/client_it_test.go index a597267..a58ab01 100644 --- a/client_it_test.go +++ b/client_it_test.go @@ -77,10 +77,12 @@ func Setup() { if err != nil { panic(err) } + _ = client.Close() } func TearDown() { client := getClient() + defer client.Close() err := client.DeleteIndex(index) if err != nil { panic(err) @@ -93,6 +95,7 @@ func TearDown() { func Reset() { client := getClient() + defer client.Close() client.DeleteIndex(index) Setup() } @@ -106,6 +109,7 @@ func TestCreateDefaultClient(t *testing.T) { func TestClientReturnsResponse(t *testing.T) { client := getClient() + defer client.Close() response, err := client.Query(testField.Row(1)) if err != nil { t.Fatalf("Error querying: %s", err) @@ -119,6 +123,7 @@ func TestQueryWithShards(t *testing.T) { Reset() const shardWidth = 1048576 client := getClient() + defer client.Close() if _, err := client.Query(testField.Set(1, 100)); err != nil { t.Fatal(err) } @@ -141,6 +146,7 @@ func TestQueryWithShards(t *testing.T) { func TestQueryWithColumns(t *testing.T) { Reset() client := getClient() + defer client.Close() targetAttrs := map[string]interface{}{ "name": "some string", "age": int64(95), @@ -184,6 +190,7 @@ func TestQueryWithColumns(t *testing.T) { func TestSetRowAttrs(t *testing.T) { Reset() client := getClient() + defer client.Close() targetAttrs := map[string]interface{}{ "name": "some string", "age": int64(95), @@ -209,6 +216,7 @@ func TestSetRowAttrs(t *testing.T) { func TestOrmCount(t *testing.T) { client := getClient() + defer client.Close() countField := index.Field("count-test") err := client.EnsureField(countField) if err != nil { @@ -231,6 +239,7 @@ func TestOrmCount(t *testing.T) { func TestIntersectReturns(t *testing.T) { client := getClient() + defer client.Close() field := index.Field("segments") err := client.EnsureField(field) if err != nil { @@ -258,6 +267,7 @@ func TestIntersectReturns(t *testing.T) { func TestTopNReturns(t *testing.T) { client := getClient() + defer client.Close() field := index.Field("topn_test") err := client.EnsureField(field) if err != nil { @@ -309,6 +319,7 @@ func TestTopNReturns(t *testing.T) { func TestMinMaxRow(t *testing.T) { client := getClient() + defer client.Close() field := index.Field("test-minmaxrow-field") err := client.EnsureField(field) if err != nil { @@ -347,6 +358,7 @@ func TestMinMaxRow(t *testing.T) { func TestSetMutexField(t *testing.T) { client := getClient() + defer client.Close() field := index.Field("mutex-test", OptFieldTypeMutex(CacheTypeDefault, 0)) err := client.EnsureField(field) if err != nil { @@ -391,6 +403,7 @@ func TestSetMutexField(t *testing.T) { func TestSetBoolField(t *testing.T) { client := getClient() + defer client.Close() field := index.Field("bool-test", OptFieldTypeBool()) err := client.EnsureField(field) if err != nil { @@ -414,6 +427,7 @@ func TestSetBoolField(t *testing.T) { func TestClearRowQuery(t *testing.T) { client := getClient() + defer client.Close() field := index.Field("clear-row-test") err := client.EnsureField(field) if err != nil { @@ -452,6 +466,7 @@ func TestClearRowQuery(t *testing.T) { func TestRowsQuery(t *testing.T) { client := getClient() + defer client.Close() field := index.Field("rows-test") err := client.EnsureField(field) if err != nil { @@ -480,6 +495,7 @@ func TestRowsQuery(t *testing.T) { func TestGroupByQuery(t *testing.T) { client := getClient() + defer client.Close() field := index.Field("group-by-test") err := client.EnsureField(field) if err != nil { @@ -509,6 +525,7 @@ func TestGroupByQuery(t *testing.T) { func TestCreateDeleteIndexField(t *testing.T) { client := getClient() + defer client.Close() index1 := NewIndex("to-be-deleted") field1 := index1.Field("foo") err := client.CreateIndex(index1) @@ -531,6 +548,7 @@ func TestCreateDeleteIndexField(t *testing.T) { func TestEnsureIndexExists(t *testing.T) { client := getClient() + defer client.Close() err := client.EnsureIndex(index) if err != nil { t.Fatal(err) @@ -539,6 +557,7 @@ func TestEnsureIndexExists(t *testing.T) { func TestEnsureFieldExists(t *testing.T) { client := getClient() + defer client.Close() err := client.EnsureField(testField) if err != nil { t.Fatal(err) @@ -547,6 +566,7 @@ func TestEnsureFieldExists(t *testing.T) { func TestCreateFieldWithTimeQuantum(t *testing.T) { client := getClient() + defer client.Close() field := index.Field("field-with-timequantum", OptFieldTypeTime(TimeQuantumYear)) err := client.CreateField(field) if err != nil { @@ -556,6 +576,7 @@ func TestCreateFieldWithTimeQuantum(t *testing.T) { func TestErrorCreatingIndex(t *testing.T) { client := getClient() + defer client.Close() err := client.CreateIndex(index) if err == nil { t.Fatal() @@ -564,6 +585,7 @@ func TestErrorCreatingIndex(t *testing.T) { func TestErrorCreatingField(t *testing.T) { client := getClient() + defer client.Close() err := client.CreateField(testField) if err == nil { t.Fatal() @@ -572,6 +594,7 @@ func TestErrorCreatingField(t *testing.T) { func TestIndexAlreadyExists(t *testing.T) { client := getClient() + defer client.Close() err := client.CreateIndex(index) if err != ErrIndexExists { t.Fatal(err) @@ -620,6 +643,7 @@ func TestQueryFailsIfAddressNotResolved(t *testing.T) { func TestQueryFails(t *testing.T) { client := getClient() + defer client.Close() _, err := client.Query(index.RawQuery("Invalid query")) if err == nil { t.Fatal() @@ -628,6 +652,7 @@ func TestQueryFails(t *testing.T) { func TestInvalidHttpRequest(t *testing.T) { client := getClient() + defer client.Close() _, _, err := client.httpRequest("INVALID METHOD", "/foo", nil, nil, false) if err == nil { t.Fatal() @@ -664,6 +689,7 @@ func TestResponseNotRead(t *testing.T) { func TestSchema(t *testing.T) { client := getClient() + defer client.Close() schema, err := client.Schema() if err != nil { t.Fatal(err) @@ -712,6 +738,7 @@ func TestSchema(t *testing.T) { func TestSync(t *testing.T) { client := getClient() + defer client.Close() remoteIndex := NewIndex("remote-index-1") err := client.EnsureIndex(remoteIndex) if err != nil { @@ -813,6 +840,7 @@ func NewGivenColumnGenerator(recs []Record) *GivenColumnGenerator { func TestImportWithBatchSize(t *testing.T) { client := getClient() + defer client.Close() // the first iterator for creating the target iterator := &ColumnGenerator{numRows: 10, numColumns: 1000} target := map[uint64][]uint64{} @@ -853,6 +881,7 @@ func TestImportWithBatchSize(t *testing.T) { func TestImportValues(t *testing.T) { client := getClient() + defer client.Close() schema, err := client.Schema() if err != nil { t.Fatalf("getting schema: %v", err) @@ -885,6 +914,7 @@ func TestImportValues(t *testing.T) { func TestImportWithBatchSizeExpectingZero(t *testing.T) { const shardWidth = 1048576 client := getClient() + defer client.Close() iterator := NewGivenColumnGenerator( []Record{ @@ -925,6 +955,7 @@ func failingImportColumns(field *Field, shard uint64, records []Record, nodes [] func TestImportWithBatchSizeFails(t *testing.T) { client := getClient() + defer client.Close() iterator := &ColumnGenerator{numRows: 10, numColumns: 1000} field := index.Field("importfield-batchsize") err := client.EnsureField(field) @@ -978,6 +1009,7 @@ func TestExportReaderReadBodyFailure(t *testing.T) { func TestFetchFragmentNodes(t *testing.T) { client := getClient() + defer client.Close() nodes, err := client.fetchFragmentNodes(index.Name(), 0) if err != nil { t.Fatal(err) @@ -997,6 +1029,7 @@ func TestFetchFragmentNodes(t *testing.T) { func TestFetchStatus(t *testing.T) { client := getClient() + defer client.Close() status, err := client.Status() if err != nil { t.Fatal(err) @@ -1008,6 +1041,7 @@ func TestFetchStatus(t *testing.T) { func TestFetchInfo(t *testing.T) { client := getClient() + defer client.Close() info, err := client.Info() if err != nil { t.Fatal(err) @@ -1031,6 +1065,7 @@ func TestFetchInfo(t *testing.T) { func TestRowRangeQuery(t *testing.T) { client := getClient() + defer client.Close() field := index.Field("test-rowrangefield", OptFieldTypeTime(TimeQuantumMonthDayHour)) err := client.EnsureField(field) if err != nil { @@ -1058,6 +1093,7 @@ func TestRowRangeQuery(t *testing.T) { func TestRangeField(t *testing.T) { client := getClient() + defer client.Close() field := index.Field("rangefield", OptFieldTypeInt()) field2 := index.Field("rangefield-set") err := client.EnsureField(field) @@ -1092,6 +1128,7 @@ func TestRangeField(t *testing.T) { func TestRangeField2(t *testing.T) { client := getClient() + defer client.Close() field := index.Field("rangefield", OptFieldTypeInt(10, 20)) field2 := index.Field("rangefield-set") err := client.EnsureField(field) @@ -1139,6 +1176,7 @@ func TestRangeField2(t *testing.T) { func TestNotQuery(t *testing.T) { client := getClient() + defer client.Close() index := schema.Index("not-query-index", OptIndexTrackExistence(true)) field := index.Field("not-field") err := client.SyncSchema(schema) @@ -1166,6 +1204,7 @@ func TestNotQuery(t *testing.T) { func TestStoreQuery(t *testing.T) { client := getClient() + defer client.Close() schema := NewSchema() index := schema.Index("store-test") fromField := index.Field("x-from-field") @@ -1196,6 +1235,7 @@ func TestStoreQuery(t *testing.T) { func TestExcludeAttrsColumns(t *testing.T) { client := getClient() + defer client.Close() field := index.Field("excludecolumnsattrsfield") err := client.EnsureField(field) if err != nil { @@ -1239,6 +1279,7 @@ func TestExcludeAttrsColumns(t *testing.T) { func TestMultipleClientKeyQuery(t *testing.T) { client := getClient() + defer client.Close() field := keysIndex.Field("multiple-client-field") err := client.EnsureField(field) if err != nil { @@ -1416,6 +1457,7 @@ func TestStatusUnmarshalFails(t *testing.T) { func TestStatusToNodeShardsForIndex(t *testing.T) { client := getClient() + defer client.Close() status := Status{ Nodes: []StatusNode{ { @@ -1444,6 +1486,7 @@ func TestStatusToNodeShardsForIndex(t *testing.T) { func TestHttpRequest(t *testing.T) { client := getClient() + defer client.Close() _, _, err := client.HttpRequest("GET", "/status", nil, nil) if err != nil { t.Fatal(err) @@ -1638,6 +1681,7 @@ func TestServerWarning(t *testing.T) { func TestRowIDColumnIDImport(t *testing.T) { client := getClient() + defer client.Close() iterator := newTestIterator() field := index.Field("importfield-rowid-colid") err := client.EnsureField(field) @@ -1700,6 +1744,7 @@ func TestRowIDColumnIDImport(t *testing.T) { func TestRowIDColumnIDImportTimestamp(t *testing.T) { client := getClient() + defer client.Close() iterator := newTestIteratorWithTimestamp() field := index.Field("importfield-csv-rowid-colid-time", OptFieldTypeTime(TimeQuantumYearMonthDayHour)) err := client.EnsureField(field) @@ -1774,6 +1819,7 @@ func TestRowIDColumnIDImportTimestamp(t *testing.T) { func TestRowIDColumnIDImportManualAddress(t *testing.T) { client := getClientManualAddress() + defer client.Close() iterator := newTestIterator() field := index.Field("importfield-rowid-colid") err := client.EnsureField(field) @@ -1836,6 +1882,7 @@ func TestRowIDColumnIDImportManualAddress(t *testing.T) { func TestRowIDColumnIDImportRoaring(t *testing.T) { client := getClient() + defer client.Close() iterator := newTestIterator() field := index.Field("importfield-rowid-colid") err := client.EnsureField(field) @@ -1899,6 +1946,7 @@ func TestRowIDColumnIDImportRoaring(t *testing.T) { func TestRowIDColumnIDTimestampImportRoaring(t *testing.T) { client := getClient() + defer client.Close() iterator := newTestIteratorWithTimestamp() field := index.Field("importfield-rowid-colid-time", OptFieldTypeTime(TimeQuantumYearMonthDayHour)) err := client.EnsureField(field) @@ -1973,6 +2021,7 @@ func TestRowIDColumnIDTimestampImportRoaring(t *testing.T) { func TestRowIDColumnIDTimestampImportRoaringNoStandardView(t *testing.T) { client := getClient() + defer client.Close() iterator := newTestIteratorWithTimestamp() field := index.Field("importfield-rowid-colid-time-nostd", OptFieldTypeTime(TimeQuantumMonthDayHour, true)) err := client.EnsureField(field) @@ -2066,6 +2115,7 @@ func TestRowIDColumnIDImportFailsRoaring(t *testing.T) { func TestCSVRowIDColumnKeyImport(t *testing.T) { client := getClient() + defer client.Close() iterator := NewArrayRecordIterator([]Record{ Column{RowID: 10, ColumnKey: "five"}, Column{RowID: 2, ColumnKey: "three"}, @@ -2108,6 +2158,7 @@ func TestCSVRowIDColumnKeyImport(t *testing.T) { func TestCSVRowIDColumnKeyImportManualAddress(t *testing.T) { client := getClientManualAddress() + defer client.Close() iterator := NewArrayRecordIterator([]Record{ Column{RowID: 10, ColumnKey: "five"}, Column{RowID: 2, ColumnKey: "three"}, @@ -2166,6 +2217,7 @@ func TestRowIDColumnKeyImportFails(t *testing.T) { func TestRowKeyColumnIDImport(t *testing.T) { client := getClient() + defer client.Close() iterator := NewArrayRecordIterator([]Record{ Column{RowKey: "ten", ColumnID: 7}, Column{RowKey: "ten", ColumnID: 5}, @@ -2208,6 +2260,7 @@ func TestRowKeyColumnIDImport(t *testing.T) { func TestRowKeyColumnKeyImport(t *testing.T) { client := getClient() + defer client.Close() iterator := NewArrayRecordIterator([]Record{ Column{RowKey: "ten", ColumnKey: "five"}, Column{RowKey: "two", ColumnKey: "three"}, @@ -2249,6 +2302,7 @@ func TestRowKeyColumnKeyImport(t *testing.T) { func TestRowKeyColumnKeyImportRoaring(t *testing.T) { client := getClient() + defer client.Close() iterator := NewArrayRecordIterator([]Record{ Column{RowKey: "ten", ColumnKey: "five"}, Column{RowKey: "two", ColumnKey: "three"}, @@ -2296,6 +2350,7 @@ func TestValueFieldImport(t *testing.T) { }) } client := getClient() + defer client.Close() iterator := newIterator() field := index.Field("importvaluefield", OptFieldTypeInt(0, 100)) err := client.EnsureField(field) @@ -2346,6 +2401,7 @@ func TestValueFieldImport(t *testing.T) { func TestValueFieldWithKeysImport(t *testing.T) { client := getClient() + defer client.Close() iterator := NewArrayRecordIterator([]Record{ FieldValue{ColumnKey: "ten", Value: 7}, FieldValue{ColumnKey: "seven", Value: 1}, @@ -2384,6 +2440,7 @@ func TestValueFieldWithKeysImport(t *testing.T) { func TestExportRowIDColumnID(t *testing.T) { client := getClient() + defer client.Close() field := index.Field("exportfield-rowid-colid") client.EnsureField(field) _, err := client.Query(index.BatchQuery( @@ -2407,6 +2464,7 @@ func TestExportRowIDColumnID(t *testing.T) { func TestExportRowIDColumnKey(t *testing.T) { client := getClient() + defer client.Close() field := keysIndex.Field("exportfield-rowid-colkey") client.EnsureField(field) _, err := client.Query(keysIndex.BatchQuery( @@ -2430,6 +2488,7 @@ func TestExportRowIDColumnKey(t *testing.T) { func TestExportRowKeyColumnID(t *testing.T) { client := getClient() + defer client.Close() field := index.Field("exportfield-rowkey-colid", OptFieldKeys(true)) client.EnsureField(field) _, err := client.Query(index.BatchQuery( @@ -2453,6 +2512,7 @@ func TestExportRowKeyColumnID(t *testing.T) { func TestExportRowKeyColumnKey(t *testing.T) { client := getClient() + defer client.Close() field := keysIndex.Field("exportfield-rowkey-colkey", OptFieldKeys(true)) client.EnsureField(field) _, err := client.Query(keysIndex.BatchQuery( @@ -2476,6 +2536,7 @@ func TestExportRowKeyColumnKey(t *testing.T) { func TestTranslateRowKeys(t *testing.T) { client := getClient() + defer client.Close() field := index.Field("translate-rowkeys", OptFieldKeys(true)) client.EnsureField(field) _, err := client.Query(index.BatchQuery( @@ -2497,6 +2558,7 @@ func TestTranslateRowKeys(t *testing.T) { func TestTranslateColKeys(t *testing.T) { client := getClient() + defer client.Close() field := keysIndex.Field("translate-colkeys") client.EnsureField(field) _, err := client.Query(keysIndex.BatchQuery( @@ -2529,6 +2591,7 @@ func TestCSVExportFailure(t *testing.T) { func TestImportColumnIteratorError(t *testing.T) { client := getClient() + defer client.Close() field := index.Field("not-important") iterator := &BrokenRecordIterator{} err := client.ImportField(field, iterator) @@ -2541,6 +2604,7 @@ func TestErrorReturningImportOption(t *testing.T) { iterator := NewArrayRecordIterator([]Record{}) field := index.Field("importfield") client := getClient() + defer client.Close() optionErr := errors.New("ERR") err := client.ImportField(field, iterator, ErrorImportOption(optionErr)) if err != optionErr { @@ -2550,6 +2614,7 @@ func TestErrorReturningImportOption(t *testing.T) { func TestImportColumnsNoNodesError(t *testing.T) { client := getClient() + defer client.Close() field := &Field{ index: &Index{ options: &IndexOptions{},