WIP: [Rhythm] Block-builder consumption loop #4480

Draft
wants to merge 11 commits into base: main-rhythm

Conversation

@mdisibio (Contributor) commented Dec 19, 2024

What this PR does:
This is an alternate block-builder consumption loop that I think has benefits, and I would like to get feedback on it.

The current loop can be thought of as top-down: it calculates the total time range that the block builder is lagging, splits it into smaller sections (e.g. 5 minutes), and consumes/flushes/commits each section.

This new loop is bottom-up: while there is more data, start at the last commit and consume/flush/commit another chunk of data (e.g. 5 minutes).
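
Roughly, the new loop has this shape. This is a minimal sketch only; `lastCommittedOffset`, `consumeSection`, `commitOffset`, and `ConsumeCycleDuration` are hypothetical names used for illustration, not the actual code in this PR:

```go
// consumePartition drains one partition in self-contained sections: each
// iteration starts from the last commit, consumes roughly one cycle of
// records, flushes a block, and commits before looking for more data.
func (b *BlockBuilder) consumePartition(ctx context.Context, partition int32) error {
	for {
		// Start wherever the consumer group last committed (hypothetical helper).
		start, err := b.lastCommittedOffset(ctx, partition)
		if err != nil {
			return err
		}

		// Consume ~one cycle (e.g. 5 minutes) of records and flush them as a
		// block; returns the next offset to commit and whether more data remains.
		next, more, err := b.consumeSection(ctx, partition, start, b.cfg.ConsumeCycleDuration)
		if err != nil {
			return err
		}

		if err := b.commitOffset(ctx, partition, next); err != nil {
			return err
		}

		if !more {
			return nil // caught up; wait for the next poll interval
		}
	}
}
```

Because each pass begins from the committed offset, a crash mid-cycle simply replays the uncommitted section on the next pass.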

The benefits are:

  • Less state: each consume/flush/commit cycle is independent; it requires no knowledge of the overall state of the queue and carries no side effects from previous loops.
  • Fewer Kafka APIs are involved (e.g. no CalculateGroupLag).
  • No custom commit metadata is required.
  • I think the core loop is simpler and will make it easier to iterate. For example, there is a TODO to round-robin when the block-builder is assigned multiple partitions that are all lagging; I think this is more complex to attempt in the current design.

The drawbacks are:

  • Mainly around how we want to measure "lag". The current metric is the number of messages, but I think finding a way to express lag as a length of time (i.e. "the block-builder is 15 minutes behind") is more useful. However, you have to read a record to determine that, so this needs more work.

TODO

  • This is a draft and needs more finishing touches before it can be merged. What I want feedback on is the overall loop structure; logs/cleanup/finishing touches will be added before any merge.
  • Depends on the test updates in [WIP] [Rhythm] Block builder test updates #4510, which set up support for consumer groups.

Which issue(s) this PR fixes:
Fixes #

Checklist

  • Tests updated
  • Documentation added
  • CHANGELOG.md updated - the order of entries should be [CHANGE], [FEATURE], [ENHANCEMENT], [BUGFIX]

@javiermolinar (Contributor) left a comment:
Looks good!

modules/blockbuilder/blockbuilder.go (outdated, resolved)
return false, err
}

lastCommit, ok := commits.Lookup(topic, partition)
Contributor:
Suggested change:
-	lastCommit, ok := commits.Lookup(topic, partition)
+	lastCommit, exists := commits.Lookup(topic, partition)
+	if exists && lastCommit.At >= 0 {
+		startOffset = startOffset.At(lastCommit.At)
+	} else {
+		startOffset = kgo.NewOffset().AtStart()
+	}

https://pkg.go.dev/github.com/twmb/franz-go/pkg/[email protected]#OffsetResponses.Lookup

@mdisibio (Author):
I think ok is more idiomatic, and the lib is reading from a map internally anyway.
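
For reference, a minimal sketch of how the resolved offset could be fed to franz-go, assuming a kadm admin client and direct partition assignment (`group`, `topic`, and `partition` are placeholders; this is not the PR's actual code):

```go
import (
	"context"

	"github.com/twmb/franz-go/pkg/kadm"
	"github.com/twmb/franz-go/pkg/kgo"
)

// startOffsetFor resumes at the group's last committed offset when one
// exists, otherwise at the start of the partition.
func startOffsetFor(ctx context.Context, adm *kadm.Client, group, topic string, partition int32) (kgo.Offset, error) {
	commits, err := adm.FetchOffsets(ctx, group)
	if err != nil {
		return kgo.NewOffset(), err
	}
	if lastCommit, ok := commits.Lookup(topic, partition); ok && lastCommit.At >= 0 {
		return kgo.NewOffset().At(lastCommit.At), nil
	}
	return kgo.NewOffset().AtStart(), nil
}
```

The resulting kgo.Offset can then be handed to the client via the kgo.ConsumePartitions option for that topic/partition.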

}

err := b.pushTraces(rec.Key, rec.Value, writer)
if err != nil {
Contributor:
What will happen if something is wrong with the WAL? I guess it will enter a loop.
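
One hedged option (not necessarily what this PR should do) is to bound the retries around the WAL append so a persistent WAL failure surfaces as an error to the outer loop instead of spinning forever. The attempt count, backoff, and error-handling shape below are illustrative, and `time`/`fmt` are assumed to be imported:

```go
// Retry the WAL append a few times with a small backoff, then give up and
// surface the error so the caller can decide (log, abort the cycle, etc.).
const maxAttempts = 3

var err error
for attempt := 1; attempt <= maxAttempts; attempt++ {
	if err = b.pushTraces(rec.Key, rec.Value, writer); err == nil {
		break
	}
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-time.After(time.Duration(attempt) * time.Second):
	}
}
if err != nil {
	return fmt.Errorf("pushing traces to WAL after %d attempts: %w", maxAttempts, err)
}
```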

@mapno (Member) left a comment:
Nice job on reducing the number of API calls. The way we calculate cycle sections is complex and should be simplified.

I have some concerns about losing visibility into the state of partitions. Before, it was easy to see exactly how many pending messages there were and what was being consumed; I feel we lose that a bit with this PR.
I believe lag can still be periodically polled with fewer lines than currently, and we'd keep that functionality. We might be able to delete the fallback time entirely.

)

var (
	metricPartitionLag = promauto.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "tempo",
		Subsystem: "block_builder",
-		Name:      "partition_lag",
		Help:      "Lag of a partition.",
+		Name:      "partition_lag_s",
Member:
Suggested change:
-	Name:      "partition_lag_s",
+	Name:      "partition_lag_seconds",

https://prometheus.io/docs/practices/naming/#metric-names

Member:
IMO the number of pending records is also relevant. I think we should keep the original metric and add one for time.

@mdisibio (Author):
👍 Re-added, and it is polled in a separate goroutine.

Comment on lines 288 to 293
// Determine begin and end time range, which is -/+ cycle duration.
// But don't exceed the given overall end time.
begin = rec.Timestamp.Add(-dur)
if rec.Timestamp.Add(dur).Before(end) {
	end = rec.Timestamp.Add(dur)
}
Member:
This feels like a strange side-effect of the writer being nil. Cycle initialisation could be consolidated in one place instead.

@mdisibio (Author):
This is the cycle initialization. Swapped to use an init bool which should be clearer.
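
For illustration, the init-bool shape looks roughly like this (a sketch only; `begin`, `end`, `writer`, `records`, and `dur` are assumed to be declared by the enclosing function, as in the snippets above):

```go
init := false

for _, rec := range records {
	if !init {
		// First record of the section: derive the cycle's time range from its
		// timestamp (+/- the cycle duration) without exceeding the overall end
		// time, and create the section writer.
		begin = rec.Timestamp.Add(-dur)
		if rec.Timestamp.Add(dur).Before(end) {
			end = rec.Timestamp.Add(dur)
		}
		writer = newPartitionSectionWriter(b.logger, int64(partition), rec.Offset, b.cfg.BlockConfig, b.overrides, b.wal, b.enc)
		init = true
	}

	// ... push rec to the writer ...
}
```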

writer = newPartitionSectionWriter(b.logger, int64(partition), rec.Offset, b.cfg.BlockConfig, b.overrides, b.wal, b.enc)
}

if rec.Timestamp.Before(begin) || rec.Timestamp.After(end) {
@mapno (Member) commented Jan 3, 2025:
Unsure if we should break for records being too old. The record timestamp is set by the producer (the distributor in this case); it's not based on the trace's time.

If a record is so old as to fall outside the cycle's start time, I think we'd want to put it in the current block anyway. Otherwise it could impact consumption too much, creating too-small blocks.

@mdisibio (Author):
I was thinking that if a record is too old (older than the cycle time, i.e. minutes behind), then it likely contains old traces too. But I don't have strong opinions. Removed the check.

@mdisibio (Author) commented Jan 6, 2025:

> lag can still be periodically polled with fewer lines than currently and we'd keep that functionality. We might be able to delete the fallback time entirely.

Good call, re-added the original metric, polled it in a separate goroutine, and removed the fallback logic.
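
For reference, the polling goroutine could look roughly like this. This is a sketch only; the kadm client, the `Topic`/`ConsumerGroup` config fields, the `assignedPartitions` field, the interval, and the partition label are assumptions for illustration, not necessarily what landed in the PR (usual imports of `context`, `time`, `strconv`, and `kadm` assumed):

```go
// pollLag updates the partition-lag gauge on a ticker, independently of the
// consume/flush/commit loop, so visibility doesn't depend on cycle progress.
func (b *BlockBuilder) pollLag(ctx context.Context, adm *kadm.Client) {
	ticker := time.NewTicker(15 * time.Second) // illustrative interval
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}

		ends, err := adm.ListEndOffsets(ctx, b.cfg.Topic)
		if err != nil {
			continue // would be logged in real code
		}
		commits, err := adm.FetchOffsets(ctx, b.cfg.ConsumerGroup)
		if err != nil {
			continue
		}

		for _, p := range b.assignedPartitions { // hypothetical field
			var lag int64
			if hi, ok := ends.Lookup(b.cfg.Topic, p); ok {
				if c, ok := commits.Lookup(b.cfg.Topic, p); ok && c.At >= 0 {
					lag = hi.Offset - c.At
				} else {
					lag = hi.Offset
				}
			}
			metricPartitionLag.WithLabelValues(strconv.Itoa(int(p))).Set(float64(lag))
		}
	}
}
```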
