Skip to content

Commit

Permalink
rpc: clean up IPC handler (ethereum#16524)
Browse files Browse the repository at this point in the history
  • Loading branch information
gzliudan committed Nov 4, 2024
1 parent 6b653a2 commit 56bce39
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 50 deletions.
23 changes: 7 additions & 16 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (n *Node) Register(constructor ServiceConstructor) error {
return nil
}

// Start create a live P2P node and starts running it.
// Start creates a live P2P node and starts running it.
func (n *Node) Start() error {
n.lock.Lock()
defer n.lock.Unlock()
Expand Down Expand Up @@ -217,7 +217,7 @@ func (n *Node) Start() error {
// Mark the service started for potential cleanup
started = append(started, kind)
}
// Lastly start the configured RPC interfaces
// Lastly, start the configured RPC interfaces
if err := n.startRPC(services); err != nil {
for _, service := range services {
service.Stop()
Expand Down Expand Up @@ -252,7 +252,7 @@ func (n *Node) openDataDir() error {
return nil
}

// startRPC is a helper method to start all the various RPC endpoint during node
// startRPC is a helper method to start all the various RPC endpoints during node
// startup. It's not meant to be called at any time afterwards as it makes certain
// assumptions about the state of the node.
func (n *Node) startRPC(services map[reflect.Type]Service) error {
Expand Down Expand Up @@ -293,7 +293,7 @@ func (n *Node) startInProc(apis []rpc.API) error {
if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
return err
}
n.log.Debug("InProc registered", "service", api.Service, "namespace", api.Namespace)
n.log.Debug("InProc registered", "namespace", api.Namespace)
}
n.inprocHandler = handler
return nil
Expand All @@ -320,22 +320,13 @@ func (n *Node) RegisterAPIs(apis []rpc.API) {

// startIPC initializes and starts the IPC RPC endpoint.
func (n *Node) startIPC(apis []rpc.API) error {
// Short circuit if the IPC endpoint isn't being exposed
if n.ipcEndpoint == "" {
return nil
}
isClosed := func() bool {
n.lock.RLock()
defer n.lock.RUnlock()
return n.ipcListener == nil
return nil // IPC disabled.
}

listener, handler, err := rpc.StartIPCEndpoint(isClosed, n.ipcEndpoint, apis)
listener, handler, err := rpc.StartIPCEndpoint(n.ipcEndpoint, apis)
if err != nil {
return err
}

// All listeners booted successfully
n.ipcListener = listener
n.ipcHandler = handler
log.Info("IPC endpoint opened", "url", n.ipcEndpoint)
Expand All @@ -348,7 +339,7 @@ func (n *Node) stopIPC() {
n.ipcListener.Close()
n.ipcListener = nil

n.log.Info("IPC endpoint closed", "endpoint", n.ipcEndpoint)
n.log.Info("IPC endpoint closed", "url", n.ipcEndpoint)
}
if n.ipcHandler != nil {
n.ipcHandler.Stop()
Expand Down
29 changes: 4 additions & 25 deletions rpc/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,10 @@ func StartWSEndpoint(endpoint string, apis []API, modules []string, wsOrigins []
}
go NewWSServer(wsOrigins, handler).Serve(listener)
return listener, handler, err

}

// StartIPCEndpoint starts an IPC endpoint.
func StartIPCEndpoint(isClosedFn func() bool, ipcEndpoint string, apis []API) (net.Listener, *Server, error) {
func StartIPCEndpoint(ipcEndpoint string, apis []API) (net.Listener, *Server, error) {
// Register all the APIs exposed by the services.
var (
handler = NewServer()
Expand All @@ -104,30 +103,10 @@ func StartIPCEndpoint(isClosedFn func() bool, ipcEndpoint string, apis []API) (n
}
log.Debug("IPCs registered", "namespaces", strings.Join(registered, ","))
// All APIs registered, start the IPC listener.
var (
listener net.Listener
err error
)
if listener, err = CreateIPCListener(ipcEndpoint); err != nil {
listener, err := ipcListen(ipcEndpoint)
if err != nil {
return nil, nil, err
}
go func() {
for {
conn, err := listener.Accept()
if err != nil {
// Terminate if the listener was closed
if isClosedFn() {
log.Info("IPC closed", "err", err)
return
}
// Not closed, just some error; report and continue
log.Error("IPC accept failed", "err", err)
continue
}
log.Trace("Accepted RPC connection", "conn", conn.RemoteAddr())
go handler.ServeCodec(NewCodec(conn), 0)
}
}()

go handler.ServeListener(listener)
return listener, handler, nil
}
12 changes: 3 additions & 9 deletions rpc/ipc.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,17 @@ import (
"github.com/XinFinOrg/XDPoSChain/p2p/netutil"
)

// CreateIPCListener creates an listener, on Unix platforms this is a unix socket, on
// Windows this is a named pipe
func CreateIPCListener(endpoint string) (net.Listener, error) {
return ipcListen(endpoint)
}

// ServeListener accepts connections on l, serving JSON-RPC on them.
// ServeListener accepts connections on l, serving IPC-RPC on them.
func (s *Server) ServeListener(l net.Listener) error {
for {
conn, err := l.Accept()
if netutil.IsTemporaryError(err) {
log.Warn("RPC accept error", "err", err)
log.Warn("IPC accept error", "err", err)
continue
} else if err != nil {
return err
}
log.Trace("Accepted RPC connection", "conn", conn.RemoteAddr())
log.Trace("IPC accepted connection")
go s.ServeCodec(NewCodec(conn), 0)
}
}
Expand Down

0 comments on commit 56bce39

Please sign in to comment.