Skip to content

Commit

Permalink
chore: simplify ping stream
Browse files Browse the repository at this point in the history
  • Loading branch information
anomit committed Nov 6, 2024
1 parent d7bd914 commit 28bae07
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 21 deletions.
2 changes: 1 addition & 1 deletion config/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func LoadConfig() {

// Numeric values with defaults
config.MaxStreamPoolSize = getEnvAsInt("MAX_STREAM_POOL_SIZE", 50)
config.StreamHealthCheckTimeout = time.Duration(getEnvAsInt("STREAM_HEALTH_CHECK_TIMEOUT_MS", 2000)) * time.Millisecond
config.StreamHealthCheckTimeout = time.Duration(getEnvAsInt("STREAM_HEALTH_CHECK_TIMEOUT_MS", 5000)) * time.Millisecond
config.StreamWriteTimeout = time.Duration(getEnvAsInt("STREAM_WRITE_TIMEOUT_MS", 5000)) * time.Millisecond
config.MaxWriteRetries = getEnvAsInt("MAX_WRITE_RETRIES", 5)
config.MaxConcurrentWrites = getEnvAsInt("MAX_CONCURRENT_WRITES", 1000)
Expand Down
34 changes: 14 additions & 20 deletions pkgs/service/libp2p_stream_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,27 +99,22 @@ func (p *StreamPool) GetStream() (network.Stream, error) {
p.streams = p.streams[:len(p.streams)-1]

// Verify stream is on current connection
if stream.Conn().ID() == SequencerHostConn.ID().String() {
if err := p.pingStream(stream); err == nil {
return stream, nil
}
if stream.Conn().ID() != SequencerHostConn.ID().String() {
stream.Close()
continue
}
// Stream is from old connection or unhealthy
stream.Close()
}

// If pool is empty, wait and retry
if len(p.streams) == 0 {
p.mu.Unlock() // Release lock while waiting
time.Sleep(100 * time.Millisecond) // Wait before retry
p.mu.Lock() // Reacquire lock
continue
// Only do ping check if the connection looks good
if err := p.pingStream(stream); err == nil {
return stream, nil
}
stream.Close()
}

// If we get here, try to create a new stream
// Create new stream if pool is empty
stream, err := p.createNewStreamWithRetry()
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to create new stream: %w", err)
}
return stream, nil
}
Expand Down Expand Up @@ -158,15 +153,14 @@ func (p *StreamPool) pingStream(stream network.Stream) error {
log.Debugf("Failed to set stream deadline: %v", err)
return fmt.Errorf("failed to set deadline: %w", err)
}
defer stream.SetDeadline(time.Time{})
defer stream.SetDeadline(time.Time{}) // Clear deadline

// Simple connectivity check
if stream.Reset() != nil {
log.Debug("Stream failed health check")
// Simply check if the connection is closed
if stream.Conn() == nil || stream.Conn().IsClosed() {
log.Debug("Stream failed health check - connection not alive")
return fmt.Errorf("stream is not alive")
}

log.Debug("Stream passed health check")
return nil
}

Expand Down

0 comments on commit 28bae07

Please sign in to comment.