Skip to content

Commit

Permalink
rework pools
Browse files Browse the repository at this point in the history
  • Loading branch information
walldiss committed Nov 18, 2024
1 parent 8a6202f commit 2cdf95b
Showing 1 changed file with 59 additions and 46 deletions.
105 changes: 59 additions & 46 deletions share/shwap/p2p/bitswap/getter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/ipfs/boxo/blockstore"
Expand All @@ -31,8 +30,8 @@ type Getter struct {
bstore blockstore.Blockstore
availWndw time.Duration

availablePool sync.Pool
archivalPool sync.Pool
availablePool *pool
archivalPool *pool

cancel context.CancelFunc
}
Expand All @@ -43,7 +42,13 @@ func NewGetter(
bstore blockstore.Blockstore,
availWndw time.Duration,
) *Getter {
return &Getter{exchange: exchange, bstore: bstore, availWndw: availWndw}
return &Getter{
exchange: exchange,
bstore: bstore,
availWndw: availWndw,
availablePool: newPool(exchange),
archivalPool: newPool(exchange),
}
}

// Start kicks off internal fetching sessions.
Expand All @@ -60,19 +65,8 @@ func (g *Getter) Start() {
ctx, cancel := context.WithCancel(context.Background())
g.cancel = cancel

var availableID atomic.Uint64
g.availablePool.New = func() interface{} {
log.Debugw("creating new available window session", "id", availableID.Load())
availableID.Add(1)
return g.exchange.NewSession(ctx)
}

var archivalID atomic.Uint64
g.archivalPool.New = func() interface{} {
log.Debugw("creating new archival session", "id", archivalID.Load())
archivalID.Add(1)
return g.exchange.NewSession(ctx)
}
g.availablePool.ctx = ctx
g.availablePool.ctx = ctx
}

// Stop shuts down Getter's internal fetching getSession.
Expand Down Expand Up @@ -113,8 +107,8 @@ func (g *Getter) GetShares(
isArchival := g.isArchival(hdr)
span.SetAttributes(attribute.Bool("is_archival", isArchival))

ses := g.getSession(isArchival)
defer g.putSession(ses, isArchival)
ses, release := g.getSession(isArchival)
defer release()

err := Fetch(ctx, g.exchange, hdr.DAH, blks, WithStore(g.bstore), WithFetcher(ses))
if err != nil {
Expand Down Expand Up @@ -177,8 +171,8 @@ func (g *Getter) GetEDS(
isArchival := g.isArchival(hdr)
span.SetAttributes(attribute.Bool("is_archival", isArchival))

ses := g.getSession(isArchival)
defer g.putSession(ses, isArchival)
ses, release := g.getSession(isArchival)
defer release()

err := Fetch(ctx, g.exchange, hdr.DAH, blks, WithFetcher(ses))
if err != nil {
Expand Down Expand Up @@ -236,8 +230,8 @@ func (g *Getter) GetNamespaceData(
isArchival := g.isArchival(hdr)
span.SetAttributes(attribute.Bool("is_archival", isArchival))

ses := g.getSession(isArchival)
defer g.putSession(ses, isArchival)
ses, release := g.getSession(isArchival)
defer release()

if err = Fetch(ctx, g.exchange, hdr.DAH, blks, WithFetcher(ses)); err != nil {
span.RecordError(err)
Expand All @@ -264,31 +258,13 @@ func (g *Getter) isArchival(hdr *header.ExtendedHeader) bool {
}

// getSession takes a session out of the respective session pool
func (g *Getter) getSession(isArchival bool) exchange.Fetcher {
if isArchival {
v := g.archivalPool.Get()
if v == nil {
panic("Getter must be started")
}

return v.(exchange.Fetcher)
}

v := g.availablePool.Get()
if v == nil {
panic("Getter must be started")
}

return v.(exchange.Fetcher)
}

// putSession puts session back into the respective session pool
func (g *Getter) putSession(session exchange.Fetcher, isArchival bool) {
func (g *Getter) getSession(isArchival bool) (ses exchange.Fetcher, release func()) {
if isArchival {
g.archivalPool.Put(session)
} else {
g.availablePool.Put(session)
ses = g.archivalPool.get()
return ses, func() { g.archivalPool.put(ses) }
}
ses = g.availablePool.get()
return ses, func() { g.availablePool.put(ses) }
}

// edsFromRows imports given Rows and computes EDS out of them, assuming enough Rows were provided.
Expand Down Expand Up @@ -322,3 +298,40 @@ func edsFromRows(roots *share.AxisRoots, rows []shwap.Row) (*rsmt2d.ExtendedData

return square, nil
}

// pool is a pool of Bitswap sessions.
type pool struct {
lock sync.Mutex
sessions []exchange.Fetcher
ctx context.Context
exchange exchange.SessionExchange
}

func newPool(ex exchange.SessionExchange) *pool {
return &pool{
exchange: ex,
sessions: make([]exchange.Fetcher, 0),
}
}

// get returns a session from the pool or creates a new one if the pool is empty.
func (p *pool) get() exchange.Fetcher {
p.lock.Lock()
defer p.lock.Unlock()

if len(p.sessions) == 0 {
return p.exchange.NewSession(p.ctx)
}

ses := p.sessions[len(p.sessions)-1]
p.sessions = p.sessions[:len(p.sessions)-1]
return ses
}

// put returns a session to the pool.
func (p *pool) put(ses exchange.Fetcher) {
p.lock.Lock()
defer p.lock.Unlock()

p.sessions = append(p.sessions, ses)
}

0 comments on commit 2cdf95b

Please sign in to comment.