Skip to content

Commit

Permalink
feat: elaborate cleanup of host, connections and streams to clear up …
Browse files Browse the repository at this point in the history
…resources
  • Loading branch information
anomit committed Oct 28, 2024
1 parent 7da90e5 commit c6f80ba
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 42 deletions.
12 changes: 7 additions & 5 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,16 @@ func main() {
for {
select {
case <-ticker.C:
// Check connection and attempt to create a test stream
// Check both basic connection status and resource limits
if service.SequencerHostConn.Network().Connectedness(service.SequencerId) != network.Connected ||
service.IsResourceLimitExceeded() {
log.Warn("Lost connection to sequencer or resource limit exceeded. Attempting to reconnect...")
if err := service.ConnectToSequencer(); err != nil {
log.Errorf("Failed to reconnect to sequencer: %v", err)

log.Warn("Connection issue or resource limit exceeded. Attempting atomic reset...")

if err := service.AtomicConnectionReset(); err != nil {
log.Errorf("Failed to perform atomic reset: %v", err)
} else {
log.Info("Successfully reconnected to sequencer")
log.Info("Successfully completed atomic reset")
}
}
}
Expand Down
79 changes: 79 additions & 0 deletions pkgs/service/connection_health.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@ package service

import (
"context"
"fmt"
"proto-snapshot-server/config"
"strings"
"time"

"github.com/libp2p/go-libp2p/core/network"
log "github.com/sirupsen/logrus"
)

// IsResourceLimitExceeded attempts to create a regular stream to check if we're hitting resource limits
Expand All @@ -16,3 +22,76 @@ func IsResourceLimitExceeded() bool {
defer stream.Close()
return false
}

// ClearExistingConnections closes all existing streams and connections
// to free up resources before attempting reconnection
func ClearExistingConnections() {
// Close all streams in the pool first
pool := GetLibp2pStreamPool()
if pool != nil {
log.Info("Clearing stream pool")
pool.Stop()
}

// Close existing connection to sequencer
if SequencerHostConn != nil {
log.Info("Closing existing sequencer connection")

// Close all active streams first
for _, conn := range SequencerHostConn.Network().Conns() {
streams := conn.GetStreams()
for _, stream := range streams {
if err := stream.Reset(); err != nil {
log.Warnf("Error resetting stream: %v", err)
}
}
}

// Remove all connection handlers
SequencerHostConn.RemoveStreamHandler("/collect")

// Close all network connections
if err := SequencerHostConn.Network().Close(); err != nil {
log.Warnf("Error closing network: %v", err)
}

// Finally close the host
if err := SequencerHostConn.Close(); err != nil {
log.Warnf("Error closing sequencer connection: %v", err)
}
}

// Give some time for resources to be freed
time.Sleep(2 * time.Second)
}

// AtomicConnectionReset performs a complete reset of host, connections, and streams
func AtomicConnectionReset() error {
log.Info("Performing atomic connection reset...")

// Use mutex to ensure thread safety during reset
libp2pStreamPoolMu.Lock()
defer libp2pStreamPoolMu.Unlock()

// Clear all existing connections and streams
ClearExistingConnections()

// Reconfigure the relayer to get fresh host connection
ConfigureRelayer()

// Attempt reconnection with fresh state
if err := ConnectToSequencer(); err != nil {
return fmt.Errorf("failed to reconnect after atomic reset: %w", err)
}

// Reinitialize the stream pool with the new host connection
createStream := func() (network.Stream, error) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
return SequencerHostConn.NewStream(ctx, SequencerId, "/collect")
}
InitLibp2pStreamPool(config.SettingsObj.MaxStreamPoolSize, createStream, SequencerId)

log.Info("Atomic connection reset completed successfully")
return nil
}
64 changes: 49 additions & 15 deletions pkgs/service/libp2p_stream_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,14 @@ import (
log "github.com/sirupsen/logrus"
)

type streamPool struct {
// Global variables for service-wide access
var (
libp2pStreamPool *StreamPool
libp2pStreamPoolMu sync.RWMutex
)

// StreamPool manages a pool of libp2p network streams
type StreamPool struct {
mu sync.Mutex
streams []network.Stream
maxSize int
Expand All @@ -23,9 +30,9 @@ type streamPool struct {
cancel context.CancelFunc
}

func newStreamPool(maxSize int, createStream func() (network.Stream, error), sequencerID peer.ID) *streamPool {
func newStreamPool(maxSize int, createStream func() (network.Stream, error), sequencerID peer.ID) *StreamPool {
ctx, cancel := context.WithCancel(context.Background())
return &streamPool{
return &StreamPool{
streams: make([]network.Stream, 0, maxSize),
maxSize: maxSize,
createStream: createStream,
Expand All @@ -35,7 +42,19 @@ func newStreamPool(maxSize int, createStream func() (network.Stream, error), seq
}
}

func (p *streamPool) GetStream() (network.Stream, error) {
func InitLibp2pStreamPool(maxSize int, createStream func() (network.Stream, error), sequencerID peer.ID) {
libp2pStreamPoolMu.Lock()
defer libp2pStreamPoolMu.Unlock()
libp2pStreamPool = newStreamPool(maxSize, createStream, sequencerID)
}

func GetLibp2pStreamPool() *StreamPool {
libp2pStreamPoolMu.RLock()
defer libp2pStreamPoolMu.RUnlock()
return libp2pStreamPool
}

func (p *StreamPool) GetStream() (network.Stream, error) {
p.mu.Lock()
defer p.mu.Unlock()

Expand All @@ -51,7 +70,7 @@ func (p *streamPool) GetStream() (network.Stream, error) {
return p.createNewStreamWithRetry()
}

func (p *streamPool) ReturnStream(stream network.Stream) {
func (p *StreamPool) ReturnStream(stream network.Stream) {
p.mu.Lock()
defer p.mu.Unlock()

Expand All @@ -67,7 +86,7 @@ func (p *streamPool) ReturnStream(stream network.Stream) {
}
}

func (p *streamPool) pingStream(stream network.Stream) error {
func (p *StreamPool) pingStream(stream network.Stream) error {
if err := stream.SetDeadline(time.Now().Add(500 * time.Millisecond)); err != nil {
return err
}
Expand All @@ -77,27 +96,34 @@ func (p *streamPool) pingStream(stream network.Stream) error {
return err
}

func (p *streamPool) createNewStreamWithRetry() (network.Stream, error) {
func (p *StreamPool) createNewStreamWithRetry() (network.Stream, error) {
var stream network.Stream
var err error

operation := func() error {
// Check connection status before attempting to create stream
if SequencerHostConn.Network().Connectedness(p.sequencerID) != network.Connected {
log.Warn("Connection to sequencer not established, attempting to reconnect...")
if err := ConnectToSequencer(); err != nil {
return fmt.Errorf("failed to reconnect to sequencer: %w", err)
if err := AtomicConnectionReset(); err != nil {
return fmt.Errorf("failed to perform atomic reset: %w", err)
}
log.Info("Successfully reconnected to sequencer")
}

stream, err = p.createStream()
if err != nil {
if strings.Contains(err.Error(), "resource limit exceeded") {
log.Warn("Resource limit exceeded, forcing reconnection to sequencer...")
if err := ConnectToSequencer(); err != nil {
log.Errorf("Failed to reconnect to sequencer: %v", err)
log.Warn("Resource limit exceeded, performing atomic reset...")

if err := AtomicConnectionReset(); err != nil {
return fmt.Errorf("atomic reset failed: %w", err)
}

// Try creating the stream again with fresh connection
stream, err = p.createStream()
if err != nil {
return fmt.Errorf("failed to create stream even after reset: %w", err)
}
return nil
}
log.Warnf("Failed to create stream: %v. Retrying...", err)
return err
Expand All @@ -116,19 +142,27 @@ func (p *streamPool) createNewStreamWithRetry() (network.Stream, error) {
return stream, nil
}

func (p *streamPool) Stop() {
// Modified stream pool cleanup to be more aggressive
func (p *StreamPool) Stop() {
p.mu.Lock()
defer p.mu.Unlock()

p.cancel()

// Aggressively close all streams
for _, stream := range p.streams {
if err := stream.Reset(); err != nil {
log.Warnf("Error resetting stream: %v", err)
}
stream.Close()
}
p.streams = nil

// Wait a moment for cleanup
time.Sleep(1 * time.Second)
}

func (p *streamPool) RemoveStream(s network.Stream) {
func (p *StreamPool) RemoveStream(s network.Stream) {
p.mu.Lock()
defer p.mu.Unlock()

Expand Down
28 changes: 6 additions & 22 deletions pkgs/service/msg_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
// server is used to implement submission.SubmissionService.
type server struct {
pkgs.UnimplementedSubmissionServer
streamPool *streamPool
streamPool *StreamPool
limiter *rate.Limiter
}

Expand Down Expand Up @@ -70,30 +70,14 @@ func NewMsgServerImplV2() pkgs.SubmissionServer {
createStream := func() (network.Stream, error) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

log.Debugf("Attempting to create new stream to sequencer: %s", sequencerID)
stream, err := SequencerHostConn.NewStream(
ctx,
sequencerID,
"/collect",
)
if err != nil {
log.Errorf("Failed to create stream: %v", err)
// Check if we're still connected to the sequencer
if SequencerHostConn.Network().Connectedness(sequencerID) != network.Connected {
log.Warn("Lost connection to sequencer. Attempting to reconnect...")
if err := ConnectToSequencer(); err != nil {
log.Errorf("Failed to reconnect to sequencer: %v", err)
}
}
return nil, fmt.Errorf("failed to create stream: %w", err)
}
log.Debug("Successfully created new stream")
return stream, nil
return SequencerHostConn.NewStream(ctx, sequencerID, "/collect")
}

// Initialize the global stream pool
InitLibp2pStreamPool(config.SettingsObj.MaxStreamPoolSize, createStream, sequencerID)

s := &server{
streamPool: newStreamPool(config.SettingsObj.MaxStreamPoolSize, createStream, sequencerID),
streamPool: GetLibp2pStreamPool(), // Use the global pool instead of creating a new one
limiter: rate.NewLimiter(rate.Limit(300), 50), // Adjusted rate limit
}
return s
Expand Down

0 comments on commit c6f80ba

Please sign in to comment.