Skip to content

Commit

Permalink
[chore] Remove impossible infinite workers use-case for Batcher
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Feb 1, 2025
1 parent fc9dd8f commit 0e51e2c
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 37 deletions.
24 changes: 7 additions & 17 deletions exporter/internal/queue/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,8 @@ type Batcher interface {
}

type BaseBatcher struct {
batchCfg exporterbatcher.Config
queue exporterqueue.Queue[internal.Request]
// TODO: Remove when the -1 hack for testing is removed.
maxWorkers int
batchCfg exporterbatcher.Config
queue exporterqueue.Queue[internal.Request]
workerPool chan struct{}
exportFunc func(ctx context.Context, req internal.Request) error
stopWG sync.WaitGroup
Expand All @@ -50,17 +48,13 @@ func newBaseBatcher(batchCfg exporterbatcher.Config,
exportFunc func(ctx context.Context, req internal.Request) error,
maxWorkers int,
) BaseBatcher {
var workerPool chan struct{}
if maxWorkers > 0 {
workerPool = make(chan struct{}, maxWorkers)
for i := 0; i < maxWorkers; i++ {
workerPool <- struct{}{}
}
workerPool := make(chan struct{}, maxWorkers)
for i := 0; i < maxWorkers; i++ {
workerPool <- struct{}{}
}
return BaseBatcher{
batchCfg: batchCfg,
queue: queue,
maxWorkers: maxWorkers,
workerPool: workerPool,
exportFunc: exportFunc,
stopWG: sync.WaitGroup{},
Expand All @@ -70,17 +64,13 @@ func newBaseBatcher(batchCfg exporterbatcher.Config,
// flush starts a goroutine that calls exportFunc. It blocks until a worker is available if necessary.
func (qb *BaseBatcher) flush(ctx context.Context, req internal.Request, dones []exporterqueue.DoneCallback) {
qb.stopWG.Add(1)
if qb.workerPool != nil {
<-qb.workerPool
}
<-qb.workerPool
go func() {
defer qb.stopWG.Done()
err := qb.exportFunc(ctx, req)
for _, done := range dones {
done(err)
}
if qb.workerPool != nil {
qb.workerPool <- struct{}{}
}
qb.workerPool <- struct{}{}
}()
}
16 changes: 0 additions & 16 deletions exporter/internal/queue/default_batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@ func TestDefaultBatcher_NoSplit_MinThresholdZero_TimeoutDisabled(t *testing.T) {
name string
maxWorkers int
}{
{
name: "infinate_workers",
maxWorkers: 0,
},
{
name: "one_worker",
maxWorkers: 1,
Expand Down Expand Up @@ -89,10 +85,6 @@ func TestDefaultBatcher_NoSplit_TimeoutDisabled(t *testing.T) {
name string
maxWorkers int
}{
{
name: "infinate_workers",
maxWorkers: 0,
},
{
name: "one_worker",
maxWorkers: 1,
Expand Down Expand Up @@ -161,10 +153,6 @@ func TestDefaultBatcher_NoSplit_WithTimeout(t *testing.T) {
name string
maxWorkers int
}{
{
name: "infinate_workers",
maxWorkers: 0,
},
{
name: "one_worker",
maxWorkers: 1,
Expand Down Expand Up @@ -230,10 +218,6 @@ func TestDefaultBatcher_Split_TimeoutDisabled(t *testing.T) {
name string
maxWorkers int
}{
{
name: "infinate_workers",
maxWorkers: 0,
},
{
name: "one_worker",
maxWorkers: 1,
Expand Down
4 changes: 0 additions & 4 deletions exporter/internal/queue/disabled_batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,6 @@ func TestDisabledBatcher_Basic(t *testing.T) {
name string
maxWorkers int
}{
{
name: "infinate_workers",
maxWorkers: 0,
},
{
name: "one_worker",
maxWorkers: 1,
Expand Down

0 comments on commit 0e51e2c

Please sign in to comment.