diff --git a/device/pools.go b/device/pools.go index 94f3dc7e6..55d2be7df 100644 --- a/device/pools.go +++ b/device/pools.go @@ -7,14 +7,13 @@ package device import ( "sync" - "sync/atomic" ) type WaitPool struct { pool sync.Pool cond sync.Cond lock sync.Mutex - count atomic.Uint32 + count uint32 // Get calls not yet Put back max uint32 } @@ -27,10 +26,10 @@ func NewWaitPool(max uint32, new func() any) *WaitPool { func (p *WaitPool) Get() any { if p.max != 0 { p.lock.Lock() - for p.count.Load() >= p.max { + for p.count >= p.max { p.cond.Wait() } - p.count.Add(1) + p.count++ p.lock.Unlock() } return p.pool.Get() @@ -41,7 +40,9 @@ func (p *WaitPool) Put(x any) { if p.max == 0 { return } - p.count.Add(^uint32(0)) + p.lock.Lock() + defer p.lock.Unlock() + p.count-- p.cond.Signal() } diff --git a/device/pools_test.go b/device/pools_test.go index 82d7493e1..2b16f3984 100644 --- a/device/pools_test.go +++ b/device/pools_test.go @@ -15,7 +15,6 @@ import ( ) func TestWaitPool(t *testing.T) { - t.Skip("Currently disabled") var wg sync.WaitGroup var trials atomic.Int32 startTrials := int32(100000) @@ -32,7 +31,9 @@ func TestWaitPool(t *testing.T) { wg.Add(workers) var max atomic.Uint32 updateMax := func() { - count := p.count.Load() + p.lock.Lock() + count := p.count + p.lock.Unlock() if count > p.max { t.Errorf("count (%d) > max (%d)", count, p.max) } diff --git a/device/send.go b/device/send.go index 769720af8..b20b3c535 100644 --- a/device/send.go +++ b/device/send.go @@ -506,6 +506,7 @@ func (peer *Peer) RoutineSequentialSender(maxBatchSize int) { device.PutMessageBuffer(elem.buffer) device.PutOutboundElement(elem) } + device.PutOutboundElementsContainer(elemsContainer) continue } dataSent := false