diff --git a/config/settings.go b/config/settings.go index 143b763..95eb393 100644 --- a/config/settings.go +++ b/config/settings.go @@ -61,7 +61,7 @@ func LoadConfig() { // Numeric values with defaults config.MaxStreamPoolSize = getEnvAsInt("MAX_STREAM_POOL_SIZE", 50) - config.StreamHealthCheckTimeout = time.Duration(getEnvAsInt("STREAM_HEALTH_CHECK_TIMEOUT_MS", 2000)) * time.Millisecond + config.StreamHealthCheckTimeout = time.Duration(getEnvAsInt("STREAM_HEALTH_CHECK_TIMEOUT_MS", 5000)) * time.Millisecond config.StreamWriteTimeout = time.Duration(getEnvAsInt("STREAM_WRITE_TIMEOUT_MS", 5000)) * time.Millisecond config.MaxWriteRetries = getEnvAsInt("MAX_WRITE_RETRIES", 5) config.MaxConcurrentWrites = getEnvAsInt("MAX_CONCURRENT_WRITES", 1000) diff --git a/pkgs/service/libp2p_stream_pool.go b/pkgs/service/libp2p_stream_pool.go index 6c1d5cf..1d45c0a 100644 --- a/pkgs/service/libp2p_stream_pool.go +++ b/pkgs/service/libp2p_stream_pool.go @@ -99,27 +99,22 @@ func (p *StreamPool) GetStream() (network.Stream, error) { p.streams = p.streams[:len(p.streams)-1] // Verify stream is on current connection - if stream.Conn().ID() == SequencerHostConn.ID().String() { - if err := p.pingStream(stream); err == nil { - return stream, nil - } + if stream.Conn().ID() != SequencerHostConn.ID().String() { + stream.Close() + continue } - // Stream is from old connection or unhealthy - stream.Close() - } - // If pool is empty, wait and retry - if len(p.streams) == 0 { - p.mu.Unlock() // Release lock while waiting - time.Sleep(100 * time.Millisecond) // Wait before retry - p.mu.Lock() // Reacquire lock - continue + // Only do ping check if the connection looks good + if err := p.pingStream(stream); err == nil { + return stream, nil + } + stream.Close() } - // If we get here, try to create a new stream + // Create new stream if pool is empty stream, err := p.createNewStreamWithRetry() if err != nil { - return nil, err + return nil, fmt.Errorf("failed to create new stream: %w", err) } return stream, nil } @@ -158,15 +153,14 @@ func (p *StreamPool) pingStream(stream network.Stream) error { log.Debugf("Failed to set stream deadline: %v", err) return fmt.Errorf("failed to set deadline: %w", err) } - defer stream.SetDeadline(time.Time{}) + defer stream.SetDeadline(time.Time{}) // Clear deadline - // Simple connectivity check - if stream.Reset() != nil { - log.Debug("Stream failed health check") + // Simply check if the connection is closed + if stream.Conn() == nil || stream.Conn().IsClosed() { + log.Debug("Stream failed health check - connection not alive") return fmt.Errorf("stream is not alive") } - log.Debug("Stream passed health check") return nil }