Skip to content

Commit

Permalink
Fix old sync map functions
Browse files Browse the repository at this point in the history
  • Loading branch information
mhmtszr committed Aug 8, 2023
1 parent e8680dc commit a5400fd
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 4 deletions.
9 changes: 6 additions & 3 deletions kafka/metadata/kafka_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (s *kafkaMetadata) Save(state map[uint16]*models.CheckpointDocument, dirtyO
func (s *kafkaMetadata) Load( //nolint:funlen
vbIDs []uint16,
bucketUUID string,
) (*wrapper.SyncMap[uint16, *models.CheckpointDocument], bool, error) {
) (*wrapper.ConcurrentSwissMap[uint16, *models.CheckpointDocument], bool, error) {
partitions, err := s.kafkaClient.GetPartitions(s.topic)
if err != nil {
return nil, false, err
Expand Down Expand Up @@ -95,7 +95,7 @@ func (s *kafkaMetadata) Load( //nolint:funlen
}(consumer, endOffset.LastOffset)
}

state := &wrapper.SyncMap[uint16, *models.CheckpointDocument]{}
state := wrapper.CreateConcurrentSwissMap[uint16, *models.CheckpointDocument](1024)
exist := false

go func() {
Expand All @@ -122,7 +122,10 @@ func (s *kafkaMetadata) Load( //nolint:funlen
wg.Wait()

for _, vbID := range vbIDs {
state.LoadOrStore(vbID, models.NewEmptyCheckpointDocument(bucketUUID))
_, ok := state.Load(vbID)
if !ok {
state.Store(vbID, models.NewEmptyCheckpointDocument(bucketUUID))
}
}

return state, exist, nil
Expand Down
2 changes: 1 addition & 1 deletion test/integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestKafka(t *testing.T) {
}()

wg.Wait()
t.Log("done done done")
t.Log("done")
}

type CountResponse struct {
Expand Down

0 comments on commit a5400fd

Please sign in to comment.