Skip to content

Commit

Permalink
Add more failure modes to the block-builder (#4345)
Browse files Browse the repository at this point in the history
* Add more tests to the block-builder

* stuff

* Add comments
  • Loading branch information
mapno authored Nov 20, 2024
1 parent 2b4e1fb commit f6826ba
Show file tree
Hide file tree
Showing 2 changed files with 286 additions and 72 deletions.
34 changes: 0 additions & 34 deletions modules/blockbuilder/blockbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,40 +419,6 @@ func getGroupLag(ctx context.Context, admClient *kadm.Client, topic, group strin
return kadm.CalculateGroupLagWithStartOffsets(descrGroup, offsets, startOffsets, endOffsets), nil
}

func (b *BlockBuilder) listOffsets(c *kgo.Client) {
admc := kadm.NewClient(c)
level.Info(b.logger).Log("msg", "list end offsets")
b.logListedOffsets(admc.ListEndOffsets)

level.Info(b.logger).Log("msg", "list start offsets")
b.logListedOffsets(admc.ListStartOffsets)

level.Info(b.logger).Log("msg", "list committed offsets")
b.logListedOffsets(admc.ListCommittedOffsets)
}

func (b *BlockBuilder) logListedOffsets(fn func(ctx context.Context, topics ...string) (kadm.ListedOffsets, error)) {
lo, err := fn(context.Background(), b.cfg.IngestStorageConfig.Kafka.Topic)
if err != nil {
level.Error(b.logger).Log("msg", "failed to list committed offsets", "err", err)
return
}
if lo.Error() != nil {
level.Error(b.logger).Log("msg", "list committed offsets error", "err", lo.Error())
}
lo.Each(func(offset kadm.ListedOffset) {
if offset.Partition <= 2 {
level.Info(b.logger).Log(
"msg", "listed offset",
"offset", offset.Offset,
"topic", offset.Topic,
"partition", offset.Partition,
"leader_epoch", offset.LeaderEpoch,
)
}
})
}

func (b *BlockBuilder) onRevoked(_ context.Context, _ *kgo.Client, revoked map[string][]int32) {
for topic, partitions := range revoked {
partitionsStr := fmt.Sprintf("%v", partitions)
Expand Down
Loading

0 comments on commit f6826ba

Please sign in to comment.