In this section, we mainly discuss how Optimism uses libp2p to establish the p2p network in op-node. The p2p network primarily passes information between nodes. For instance, after a sequencer finishes building an unsafe block, the block is disseminated via pub/sub through libp2p's gossipsub. libp2p also handles other p2p basics, such as transport and addressing.
libp2p (shortened from "library peer-to-peer") is a framework geared towards peer-to-peer (P2P) networks that aids in the development of P2P applications. It encompasses a set of protocols, standards, and libraries, making P2P communication between network participants (also known as "peers") more straightforward. Initially, libp2p was introduced as a part of the IPFS (InterPlanetary File System) project. Over time, it evolved into an independent project, becoming a modular network stack for distributed systems.
libp2p is an open-source initiative by the IPFS community that invites extensive community contributions, including drafting specifications, coding implementations, and curating examples and tutorials. It is composed of various building blocks, and each module has clear, well-documented, and tested interfaces. This design makes the modules combinable, interchangeable, and hence upgradeable. The modularity of libp2p lets developers pick only the components that are essential for their application, promoting flexibility and efficiency when building P2P network applications.
The modular architecture and the open-source nature of libp2p offer a conducive environment for developing robust, scalable, and versatile P2P applications, marking its significance in the realm of distributed network and web application development.
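When deploying libp2p, you need to implement and configure certain core components to establish your P2P network. Here are some primary aspects of a libp2p application. The one-line snippets are illustrative only; exact constructors and signatures vary between go-libp2p versions: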
- The foundational step is the creation and configuration of a libp2p node. This encompasses setting up the node's network address, identity, and other basic parameters. Essential usage code:
libp2p.New()
- Choose and configure your transport protocols (e.g., TCP, WebSockets, etc.) to ensure communication between nodes. Key usage code:
tcpTransport := tcp.NewTCPTransport()
- Implement multiplexing to allow handling multiple concurrent data streams on a single connection.
- Implement flow control to manage the transmission and processing rate of data. Key usage code:
yamuxTransport := yamux.New()
- Configure a secure transport layer to ensure the security and privacy of communications.
- Implement encryption and authentication mechanisms to protect data and verify communicators. Key usage code:
tlsTransport := tls.New()
- Define and implement custom protocols to handle specific network operations and message exchanges.
- Handle received messages and send responses as needed. Key usage code:
host.SetStreamHandler("/my-protocol/1.0.0", myProtocolHandler)
- Implement node discovery mechanisms to find other nodes in the network.
- Implement routing logic to determine how to route messages to the correct node in the network. Key usage code:
dht := kaddht.NewDHT(ctx, host, datastore.NewMapDatastore())
- Define and implement behaviors and policies for the network, such as connection management, error handling, and retry logic. Key usage code:
connManager := connmgr.NewConnManager(lowWater, highWater, gracePeriod)
- Manage the state of the node and the network, including connection states, node lists, and data storage. Key usage code:
peerstore := pstoremem.NewPeerstore()
- Write tests for your libp2p application to ensure its correctness and reliability.
- Use debugging tools and logs to diagnose and solve network issues. Key usage code:
logging.SetLogLevel("libp2p", "DEBUG")
- Consult the documentation of libp2p to understand its various components and APIs.
- Engage with the libp2p community for support and problem resolution.
The above are some primary aspects to consider and implement when using libp2p. The specific implementation might vary for each project, but these foundational elements are essential for building and running libp2p applications. When implementing these features, you can refer to the official documentation of libp2p and example codes and tutorials in the GitHub repository.
To understand the relationship between op-node and libp2p, we need to answer a few questions:
- Why choose libp2p? Why not devp2p (as used by geth)?
- What data or processes in op-node are closely related to the p2p network?
- How are these functionalities implemented at the code level?
libp2p is a modular network stack that lets developers build decentralized peer-to-peer applications for a wide range of use cases. devp2p, on the other hand, is built mainly for the Ethereum ecosystem and tailored to Ethereum applications. The flexibility and broad applicability of libp2p likely make it the preferred choice here. As for why Optimism requires a p2p network at all, op-node uses it for the following:
- To transmit unsafe blocks produced by the sequencer to other non-sequencer nodes.
- For fast synchronization (reverse chain synchronization) in other nodes under non-sequencer mode when gaps appear.
- To employ a score-based reputation system that keeps the network healthy for all nodes.
The host can be understood as the p2p node. When starting this node, some specific initialization configurations are required according to the project.
Now let's take a look at the Host method in the op-node/p2p/host.go file.
This function is primarily used to set up the libp2p host and various configurations. Below are the key parts of the function along with a brief description:
- Check if P2P is disabled: If P2P is disabled, the function returns immediately.
- Obtain the Peer ID from the public key: Generate the Peer ID using the public key from the configuration.
- Initialize the basic Peerstore: Create a basic Peerstore.
- Initialize the extended Peerstore: Build an extended Peerstore on top of the basic one.
- Add private and public keys to the Peerstore: Store the peer's private and public keys in the Peerstore.
- Initialize the Connection Gater: Used for controlling which network connections are allowed.
- Initialize the Connection Manager: Used for managing network connections.
- Set up transport and listening addresses: Set up the network transport protocols and the listening addresses for the host.
- Create the libp2p host: Use all the preceding settings to create a new libp2p host.
- Initialize static peers: Initialize them if static peers are configured.
- Return the host: Finally, the function returns the configured libp2p host.
These key parts are responsible for the initialization and setup of the libp2p host, with each part catering to a specific aspect of the host's configuration.
func (conf *Config) Host(log log.Logger, reporter metrics.Reporter, metrics HostMetrics) (host.Host, error) {
if conf.DisableP2P {
return nil, nil
}
pub := conf.Priv.GetPublic()
pid, err := peer.IDFromPublicKey(pub)
if err != nil {
return nil, fmt.Errorf("failed to derive pubkey from network priv key: %w", err)
}
basePs, err := pstoreds.NewPeerstore(context.Background(), conf.Store, pstoreds.DefaultOpts())
if err != nil {
return nil, fmt.Errorf("failed to open peerstore: %w", err)
}
peerScoreParams := conf.PeerScoringParams()
var scoreRetention time.Duration
if peerScoreParams != nil {
// Use the same retention period as gossip will if available
scoreRetention = peerScoreParams.PeerScoring.RetainScore
} else {
// Disable score GC if peer scoring is disabled
scoreRetention = 0
}
ps, err := store.NewExtendedPeerstore(context.Background(), log, clock.SystemClock, basePs, conf.Store, scoreRetention)
if err != nil {
return nil, fmt.Errorf("failed to open extended peerstore: %w", err)
}
if err := ps.AddPrivKey(pid, conf.Priv); err != nil {
return nil, fmt.Errorf("failed to set up peerstore with priv key: %w", err)
}
if err := ps.AddPubKey(pid, pub); err != nil {
return nil, fmt.Errorf("failed to set up peerstore with pub key: %w", err)
}
var connGtr gating.BlockingConnectionGater
connGtr, err = gating.NewBlockingConnectionGater(conf.Store)
if err != nil {
return nil, fmt.Errorf("failed to open connection gater: %w", err)
}
connGtr = gating.AddBanExpiry(connGtr, ps, log, clock.SystemClock, metrics)
connGtr = gating.AddMetering(connGtr, metrics)
connMngr, err := DefaultConnManager(conf)
if err != nil {
return nil, fmt.Errorf("failed to open connection manager: %w", err)
}
listenAddr, err := addrFromIPAndPort(conf.ListenIP, conf.ListenTCPPort)
if err != nil {
return nil, fmt.Errorf("failed to make listen addr: %w", err)
}
tcpTransport := libp2p.Transport(
tcp.NewTCPTransport,
tcp.WithConnectionTimeout(time.Minute*60)) // break unused connections
// TODO: technically we can also run the node on websocket and QUIC transports. Maybe in the future?
var nat lconf.NATManagerC // disabled if nil
if conf.NAT {
nat = basichost.NewNATManager
}
opts := []libp2p.Option{
libp2p.Identity(conf.Priv),
// Explicitly set the user-agent, so we can differentiate from other Go libp2p users.
libp2p.UserAgent(conf.UserAgent),
tcpTransport,
libp2p.WithDialTimeout(conf.TimeoutDial),
// No relay services, direct connections between peers only.
libp2p.DisableRelay(),
// host will start and listen to network directly after construction from config.
libp2p.ListenAddrs(listenAddr),
libp2p.ConnectionGater(connGtr),
libp2p.ConnectionManager(connMngr),
//libp2p.ResourceManager(nil), // TODO use resource manager interface to manage resources per peer better.
libp2p.NATManager(nat),
libp2p.Peerstore(ps),
libp2p.BandwidthReporter(reporter), // may be nil if disabled
libp2p.MultiaddrResolver(madns.DefaultResolver),
// Ping is a small built-in libp2p protocol that helps us check/debug latency between peers.
libp2p.Ping(true),
// Help peers with their NAT reachability status, but throttle to avoid too much work.
libp2p.EnableNATService(),
libp2p.AutoNATServiceRateLimit(10, 5, time.Second*60),
}
opts = append(opts, conf.HostMux...)
if conf.NoTransportSecurity {
opts = append(opts, libp2p.Security(insecure.ID, insecure.NewWithIdentity))
} else {
opts = append(opts, conf.HostSecurity...)
}
h, err := libp2p.New(opts...)
if err != nil {
return nil, err
}
staticPeers := make([]*peer.AddrInfo, len(conf.StaticPeers))
for i, peerAddr := range conf.StaticPeers {
addr, err := peer.AddrInfoFromP2pAddr(peerAddr)
if err != nil {
return nil, fmt.Errorf("bad peer address: %w", err)
}
staticPeers[i] = addr
}
out := &extraHost{
Host: h,
connMgr: connMngr,
log: log,
staticPeers: staticPeers,
quitC: make(chan struct{}),
}
out.initStaticPeers()
if len(conf.StaticPeers) > 0 {
go out.monitorStaticPeers()
}
out.gater = connGtr
return out, nil
}
Gossip is used in distributed systems to ensure data consistency and address issues caused by multicasting. It's a communication protocol where information from one or more nodes is sent to another set of nodes in the network. This is useful when a group of clients in the network simultaneously requires the same data. When the sequencer produces blocks in an unsafe state, they are propagated to other nodes via the gossip network.
Let's first see where the node joins the gossip network. In op-node/p2p/node.go, during node initialization, the init method is called, which then invokes the JoinGossip method to join the gossip network.
func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, log log.Logger, setup SetupP2P, gossipIn GossipIn, l2Chain L2Chain, runCfg GossipRuntimeConfig, metrics metrics.Metricer) error {
…
// note: the IDDelta functionality was removed from libP2P, and no longer needs to be explicitly disabled.
n.gs, err = NewGossipSub(resourcesCtx, n.host, rollupCfg, setup, n.scorer, metrics, log)
if err != nil {
return fmt.Errorf("failed to start gossipsub router: %w", err)
}
n.gsOut, err = JoinGossip(resourcesCtx, n.host.ID(), n.gs, log, rollupCfg, runCfg, gossipIn)
…
}
Next, let's move to op-node/p2p/gossip.go.
Below is a simple overview of the main operations executed within the JoinGossip function:
- Validator creation: val is assigned the result of the guardGossipValidator function call. This creates a validator for gossip messages that checks the validity of blocks propagated in the network.
- Block topic name generation: blocksTopicName is generated by the blocksTopicV1 function, which formats a string based on the L2ChainID in the configuration (cfg). The formatted string follows the structure /optimism/{L2ChainID}/0/blocks.
- Topic validator registration: The RegisterTopicValidator method of ps is invoked to register val as the block topic's validator. Some configuration options for the validator are also specified, such as a 3-second timeout and a concurrency level of 4.
- Joining the topic: The function attempts to join the block gossip topic by calling ps.Join(blocksTopicName). If that fails, it returns an error indicating the inability to join the topic.
- Event handler creation: It tries to create an event handler for the block topic by calling blocksTopic.EventHandler(). If that fails, it returns an error indicating the failure to create the handler.
- Logging topic events: A new goroutine is spawned to log topic events using the LogTopicEvents function.
- Topic subscription: The function attempts to subscribe to the block gossip topic by calling blocksTopic.Subscribe(). If that fails, it returns an error indicating the inability to subscribe.
- Subscriber creation: A subscriber is created using the MakeSubscriber function, which wraps a BlocksHandler that handles the OnUnsafeL2Payload event from gossipIn. A new goroutine is spawned to run the subscriber on the subscription.
- Creation and return of the publisher: An instance of publisher is created and returned, configured with the provided configuration and block topic.
func JoinGossip(p2pCtx context.Context, self peer.ID, ps *pubsub.PubSub, log log.Logger, cfg *rollup.Config, runCfg GossipRuntimeConfig, gossipIn GossipIn) (GossipOut, error) {
val := guardGossipValidator(log, logValidationResult(self, "validated block", log, BuildBlocksValidator(log, cfg, runCfg)))
blocksTopicName := blocksTopicV1(cfg) // return fmt.Sprintf("/optimism/%s/0/blocks", cfg.L2ChainID.String())
err := ps.RegisterTopicValidator(blocksTopicName,
val,
pubsub.WithValidatorTimeout(3*time.Second),
pubsub.WithValidatorConcurrency(4))
if err != nil {
return nil, fmt.Errorf("failed to register blocks gossip topic: %w", err)
}
blocksTopic, err := ps.Join(blocksTopicName)
if err != nil {
return nil, fmt.Errorf("failed to join blocks gossip topic: %w", err)
}
blocksTopicEvents, err := blocksTopic.EventHandler()
if err != nil {
return nil, fmt.Errorf("failed to create blocks gossip topic handler: %w", err)
}
go LogTopicEvents(p2pCtx, log.New("topic", "blocks"), blocksTopicEvents)
subscription, err := blocksTopic.Subscribe()
if err != nil {
return nil, fmt.Errorf("failed to subscribe to blocks gossip topic: %w", err)
}
subscriber := MakeSubscriber(log, BlocksHandler(gossipIn.OnUnsafeL2Payload))
go subscriber(p2pCtx, subscription)
return &publisher{log: log, cfg: cfg, blocksTopic: blocksTopic, runCfg: runCfg}, nil
}
Thus, a non-sequencer node's subscription is established. Next, let's shift our focus to the nodes in sequencer mode and see how they broadcast the blocks.
In op-node/rollup/driver/state.go, the event loop waits for the sequencer to produce new payloads (unsafe blocks). Each payload is then propagated to the gossip network via PublishL2Payload.
func (s *Driver) eventLoop() {
	…
	for {
		…
		select {
		case <-sequencerCh:
			payload, err := s.sequencer.RunNextSequencerAction(ctx)
			if err != nil {
				s.log.Error("Sequencer critical error", "err", err)
				return
			}
			if s.network != nil && payload != nil {
				// Publishing of unsafe data via p2p is optional.
				// Errors are not severe enough to change/halt sequencing but should be logged and metered.
				if err := s.network.PublishL2Payload(ctx, payload); err != nil {
					s.log.Warn("failed to publish newly created block", "id", payload.ID(), "err", err)
					s.metrics.RecordPublishingError()
				}
			}
			planSequencerAction() // schedule the next sequencer action to keep the sequencing looping
		…
		}
		…
	}
	…
}
Thus, a new payload has entered the gossip network.
Within libp2p's pubsub system, a node first receives a message from a peer and then checks its validity, for example its signature and format, to ensure integrity and authenticity. If the message is valid and belongs to a topic the node is subscribed to, the node forwards it. In gossipsub the forwarding set is not every connected subscriber: each node keeps a small mesh of peers per topic and sends full messages to those mesh peers, while other subscribed peers only receive gossip about which message IDs are available.
To prevent a message from looping endlessly, each node tracks the IDs of messages it has already seen for a limited time and does not forward duplicates. This is a time-bounded cache on the receiving node rather than a hop counter carried inside the message. Together, validation and duplicate suppression let messages spread widely through the network while avoiding infinite loops and congestion.
Similar to L1, nodes validate blocks upon receipt. The major difference in Optimism is that, while in L1 the validation involves signatures from multiple selected beacon nodes, in Optimism it only involves the signature of the sequencer node.
Let's illustrate this with the process of signing and verifying signatures:
- When the sequencer publishes a block via the P2P network, the sequencer signs the block.
func (p *publisher) PublishL2Payload(ctx context.Context, envelope *eth.ExecutionPayloadEnvelope, signer Signer) error {
	…
	// Sign the encoded payload under the blocks signing domain and the L2 chain ID.
	sig, err := signer.Sign(ctx, SigningDomainBlocksV1, p.cfg.L2ChainID, payloadData)
	if err != nil {
		return fmt.Errorf("failed to sign execution payload with signer: %w", err)
	}
	// The signature is copied into the first 65 bytes of the outgoing message.
	copy(data[:65], sig[:])
	…
}
- When a verifier receives the block, they check if the signer is the sequencer's signing address.
func verifyBlockSignature(log log.Logger, cfg *rollup.Config, runCfg GossipRuntimeConfig, id peer.ID, signatureBytes []byte, payloadBytes []byte) pubsub.ValidationResult {
signingHash, err := BlockSigningHash(cfg, payloadBytes)
if err != nil {
log.Warn("failed to compute block signing hash", "err", err, "peer", id)
return pubsub.ValidationReject
}
pub, err := crypto.SigToPub(signingHash[:], signatureBytes)
if err != nil {
log.Warn("invalid block signature", "err", err, "peer", id)
return pubsub.ValidationReject
}
addr := crypto.PubkeyToAddress(*pub)
// In the future we may load & validate block metadata before checking the signature.
// And then check the signer based on the metadata, to support e.g. multiple p2p signers at the same time.
// For now we only have one signer at a time and thus check the address directly.
// This means we may drop old payloads upon key rotation,
// but this can be recovered from like any other missed unsafe payload.
if expected := runCfg.P2PSequencerAddress(); expected == (common.Address{}) {
log.Warn("no configured p2p sequencer address, ignoring gossiped block", "peer", id, "addr", addr)
return pubsub.ValidationIgnore
} else if addr != expected {
log.Warn("unexpected block author", "err", err, "peer", id, "addr", addr, "expected", expected)
return pubsub.ValidationReject
}
return pubsub.ValidationAccept
}
When a node ends up with unsynchronized blocks (gaps), for example after going down and reconnecting, it can quickly catch up using the p2p network's reverse chain sync.
Let's look at the checkForGapInUnsafeQueue function in op-node/rollup/driver/state.go.
This method belongs to the Driver struct. It checks whether there is a gap in the unsafe queue and, if so, tries to retrieve the missing payloads through an alternate sync method named altSync. The key point is that the method preserves data continuity by requesting missing blocks from another sync source as soon as a gap is detected. The main steps of the function are:
- The function first gets UnsafeL2Head and UnsafeL2SyncTarget from s.derivation to define the start and end of the range to check.
- It checks whether blocks are missing between start and end by comparing the Number values of end and start.
- If a gap is detected, it requests the missing range by calling s.altSync.RequestL2Range(ctx, start, end). If end is an empty reference (i.e., eth.L2BlockRef{}), it requests a sync with an open-ended range starting from start.
- While making the request, it logs a debug message detailing the range it is requesting.
- Finally, it returns an error value, which is nil if no error occurred.
// checkForGapInUnsafeQueue checks if there's a gap in the unsafe queue and tries to retrieve the missing payloads using an alt-sync method.
// WARNING: This is just an outgoing signal; there's no guarantee that blocks will be retrieved.
// Results are received through OnUnsafeL2Payload.
func (s *Driver) checkForGapInUnsafeQueue(ctx context.Context) error {
start := s.derivation.UnsafeL2Head()
end := s.derivation.UnsafeL2SyncTarget()
// Check for missing blocks between start and end and request them if found.
if end == (eth.L2BlockRef{}) {
s.log.Debug("requesting sync with open-ended range", "start", start)
return s.altSync.RequestL2Range(ctx, start, eth.L2BlockRef{})
} else if end.Number > start.Number+1 {
s.log.Debug("requesting missing unsafe L2 block range", "start", start, "end", end, "size", end.Number-start.Number)
return s.altSync.RequestL2Range(ctx, start, end)
}
return nil
}
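The decision logic of the gap check can be isolated into a pure function, which makes the three cases easy to see. This is a sketch with simplified, hypothetical types (blockRef, rangeReq, gapRequest); it mirrors the branches in the function above but is not part of op-node.

```go
package main

import "fmt"

// blockRef is a simplified stand-in for eth.L2BlockRef.
type blockRef struct {
	Number uint64
	Hash   string
}

// rangeReq describes what should be requested from the alt-sync source.
type rangeReq struct {
	Start, End blockRef
	OpenEnded  bool
}

// gapRequest reports whether a sync request is needed and, if so, for which range.
func gapRequest(head, target blockRef) (rangeReq, bool) {
	if target == (blockRef{}) {
		// No known sync target: ask for everything after the current head.
		return rangeReq{Start: head, OpenEnded: true}, true
	}
	if target.Number > head.Number+1 {
		// At least one block is missing between head and target.
		return rangeReq{Start: head, End: target}, true
	}
	// The target directly follows the head (or is not ahead): nothing to fetch.
	return rangeReq{}, false
}

func main() {
	head := blockRef{Number: 100, Hash: "0xaa"}
	fmt.Println(gapRequest(head, blockRef{}))
	fmt.Println(gapRequest(head, blockRef{Number: 105, Hash: "0xbb"}))
	fmt.Println(gapRequest(head, blockRef{Number: 101, Hash: "0xcc"}))
}
```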
The RequestL2Range function passes the start and end of the requested block range to the requests channel.
The onRangeRequest method then splits the range into individual block requests, walking backwards from the end, and sends each one to the peerRequests channel. Every connected peer runs a loop that receives from peerRequests, so each dispatched request is handled by exactly one peer.
func (s *SyncClient) onRangeRequest(ctx context.Context, req rangeRequest) {
…
for i := uint64(0); ; i++ {
num := req.end.Number - 1 - i
if num <= req.start {
return
}
// check if we have something in quarantine already
if h, ok := s.quarantineByNum[num]; ok {
if s.trusted.Contains(h) { // if we trust it, try to promote it.
s.tryPromote(h)
}
// Don't fetch things that we have a candidate for already.
// We'll evict it from quarantine by finding a conflict, or if we sync enough other blocks
continue
}
if _, ok := s.inFlight[num]; ok {
log.Debug("request still in-flight, not rescheduling sync request", "num", num)
continue // request still in flight
}
pr := peerRequest{num: num, complete: new(atomic.Bool)}
log.Debug("Scheduling P2P block request", "num", num)
// schedule number
select {
case s.peerRequests <- pr:
s.inFlight[num] = pr.complete
case <-ctx.Done():
log.Info("did not schedule full P2P sync range", "current", num, "err", ctx.Err())
return
default: // peers may all be busy processing requests already
log.Info("no peers ready to handle block requests for more P2P requests for L2 block history", "current", num)
return
}
}
}
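The backwards walk with non-blocking dispatch can be reduced to a few lines. The sketch below keeps only the in-flight check and the channel send. The quarantine handling is left out, and the schedule function with its parameters is hypothetical.

```go
package main

import "fmt"

// schedule offers block numbers end-1 down to start+1 to the requests channel.
// It never blocks: if no receiver or buffer space is available, it stops early.
// Numbers already in flight are skipped. It returns the numbers it scheduled.
func schedule(start, end uint64, inFlight map[uint64]bool, requests chan<- uint64) []uint64 {
	var scheduled []uint64
	for num := end; num > start+1; {
		num--
		if inFlight[num] {
			continue // a request for this block is still pending
		}
		select {
		case requests <- num:
			inFlight[num] = true
			scheduled = append(scheduled, num)
		default:
			// All peers are busy; a later call can pick up the remaining range.
			return scheduled
		}
	}
	return scheduled
}

func main() {
	requests := make(chan uint64, 2) // the buffer stands in for two idle peers
	inFlight := map[uint64]bool{13: true}
	fmt.Println(schedule(10, 15, inFlight, requests))
}
```

Walking from the newest block towards the oldest means each fetched block can be checked against the parent hash of the block after it, which is why the text calls this reverse chain sync.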
Next, let's look at how a peer handles the request when received.
First and foremost, it's essential to understand that the connection and message passing between the peer and the requesting node are carried out through libp2p streams. The receiving peer implements the stream handler, while the sending node initiates the stream.
In the init function shown earlier, we find code like the following. Here, MakeStreamHandler returns a handling function, and SetStreamHandler binds the protocol ID to it. Thus, every time the sending node opens a stream for this protocol, the returned handling function is triggered.
n.syncSrv = NewReqRespServer(rollupCfg, l2Chain, metrics)
// register the sync protocol with libp2p host
payloadByNumber := MakeStreamHandler(resourcesCtx, log.New("serve", "payloads_by_number"), n.syncSrv.HandleSyncRequest)
n.host.SetStreamHandler(PayloadByNumberProtocolID(rollupCfg.L2ChainID), payloadByNumber)
Now, let's delve into how the handler function processes the request.
The function first performs global and per-peer rate-limiting checks to control how fast requests are handled. It then reads the requested block number and verifies that it falls within a reasonable range (not before genesis and not beyond the latest expected block). Next, the function retrieves the requested block payload from the L2 execution engine and writes it into the response stream. Before writing, it sets a write deadline so that a slow peer connection cannot block the write indefinitely. Finally, the function returns the requested block number and any error.
func (srv *ReqRespServer) handleSyncRequest(ctx context.Context, stream network.Stream) (uint64, error) {
peerId := stream.Conn().RemotePeer()
// take a token from the global rate-limiter,
// to make sure there's not too much concurrent server work between different peers.
if err := srv.globalRequestsRL.Wait(ctx); err != nil {
return 0, fmt.Errorf("timed out waiting for global sync rate limit: %w", err)
}
// find rate limiting data of peer, or add otherwise
srv.peerStatsLock.Lock()
ps, _ := srv.peerRateLimits.Get(peerId)
if ps == nil {
ps = &peerStat{
Requests: rate.NewLimiter(peerServerBlocksRateLimit, peerServerBlocksBurst),
}
srv.peerRateLimits.Add(peerId, ps)
ps.Requests.Reserve() // count the hit, but make it delay the next request rather than immediately waiting
} else {
// Only wait if it's an existing peer, otherwise the instant rate-limit Wait call always errors.
// If the requester thinks we're taking too long, then it's their problem and they can disconnect.
// We'll disconnect ourselves only when failing to read/write,
// if the work is invalid (range validation), or when individual sub tasks timeout.
if err := ps.Requests.Wait(ctx); err != nil {
return 0, fmt.Errorf("timed out waiting for global sync rate limit: %w", err)
}
}
srv.peerStatsLock.Unlock()
// Set read deadline, if available
_ = stream.SetReadDeadline(time.Now().Add(serverReadRequestTimeout))
// Read the request
var req uint64
if err := binary.Read(stream, binary.LittleEndian, &req); err != nil {
return 0, fmt.Errorf("failed to read requested block number: %w", err)
}
if err := stream.CloseRead(); err != nil {
return req, fmt.Errorf("failed to close reading-side of a P2P sync request call: %w", err)
}
// Check the request is within the expected range of blocks
if req < srv.cfg.Genesis.L2.Number {
return req, fmt.Errorf("cannot serve request for L2 block %d before genesis %d: %w", req, srv.cfg.Genesis.L2.Number, invalidRequestErr)
}
max, err := srv.cfg.TargetBlockNumber(uint64(time.Now().Unix()))
if err != nil {
return req, fmt.Errorf("cannot determine max target block number to verify request: %w", invalidRequestErr)
}
if req > max {
return req, fmt.Errorf("cannot serve request for L2 block %d after max expected block (%v): %w", req, max, invalidRequestErr)
}
payload, err := srv.l2.PayloadByNumber(ctx, req)
if err != nil {
if errors.Is(err, ethereum.NotFound) {
return req, fmt.Errorf("peer requested unknown block by number: %w", err)
} else {
return req, fmt.Errorf("failed to retrieve payload to serve to peer: %w", err)
}
}
// We set write deadline, if available, to safely write without blocking on a throttling peer connection
_ = stream.SetWriteDeadline(time.Now().Add(serverWriteChunkTimeout))
// 0 - resultCode: success = 0
// 1:5 - version: 0
var tmp [5]byte
if _, err := stream.Write(tmp[:]); err != nil {
return req, fmt.Errorf("failed to write response header data: %w", err)
}
w := snappy.NewBufferedWriter(stream)
if _, err := payload.MarshalSSZ(w); err != nil {
return req, fmt.Errorf("failed to write payload to sync response: %w", err)
}
if err := w.Close(); err != nil {
return req, fmt.Errorf("failed to finishing writing payload to sync response: %w", err)
}
return req, nil
}
Up to this point, the overall process of reverse chain sync requests and handling has been explained.
To prevent certain nodes from making malicious requests and responses that could compromise the security of the entire network, Optimism has also employed a scoring system.
For instance, in the file op-node/p2p/app_scores.go, there is a series of functions set up to score peers.
func (s *peerApplicationScorer) onValidResponse(id peer.ID) {
_, err := s.scorebook.SetScore(id, store.IncrementValidResponses{Cap: s.params.ValidResponseCap})
if err != nil {
s.log.Error("Unable to update peer score", "peer", id, "err", err)
return
}
}
func (s *peerApplicationScorer) onResponseError(id peer.ID) {
_, err := s.scorebook.SetScore(id, store.IncrementErrorResponses{Cap: s.params.ErrorResponseCap})
if err != nil {
s.log.Error("Unable to update peer score", "peer", id, "err", err)
return
}
}
func (s *peerApplicationScorer) onRejectedPayload(id peer.ID) {
_, err := s.scorebook.SetScore(id, store.IncrementRejectedPayloads{Cap: s.params.RejectedPayloadCap})
if err != nil {
s.log.Error("Unable to update peer score", "peer", id, "err", err)
return
}
}
Then, before a connection to a peer is allowed, its score is checked against a minimum:
func AddScoring(gater BlockingConnectionGater, scores Scores, minScore float64) *ScoringConnectionGater {
return &ScoringConnectionGater{BlockingConnectionGater: gater, scores: scores, minScore: minScore}
}
func (g *ScoringConnectionGater) checkScore(p peer.ID) (allow bool) {
score, err := g.scores.GetPeerScore(p)
if err != nil {
return false
}
return score >= g.minScore
}
func (g *ScoringConnectionGater) InterceptPeerDial(p peer.ID) (allow bool) {
return g.BlockingConnectionGater.InterceptPeerDial(p) && g.checkScore(p)
}
func (g *ScoringConnectionGater) InterceptAddrDial(id peer.ID, ma multiaddr.Multiaddr) (allow bool) {
return g.BlockingConnectionGater.InterceptAddrDial(id, ma) && g.checkScore(id)
}
func (g *ScoringConnectionGater) InterceptSecured(dir network.Direction, id peer.ID, mas network.ConnMultiaddrs) (allow bool) {
return g.BlockingConnectionGater.InterceptSecured(dir, id, mas) && g.checkScore(id)
}
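Putting the two halves together, capped counters feed a score, and the score decides whether a peer may connect. The sketch below shows that flow under stated assumptions: the weights, the single shared cap, and all names (scorebook, peerStats, allowPeer) are hypothetical, since the text does not show how op-node combines the counters into a score.

```go
package main

import "fmt"

// peerStats holds capped counters, loosely modeled on the increments shown above.
type peerStats struct {
	validResponses   float64
	errorResponses   float64
	rejectedPayloads float64
}

// Hypothetical weights chosen for illustration only.
const (
	validWeight    = 1.0
	errorWeight    = -2.0
	rejectedWeight = -5.0
)

type scorebook struct {
	limit float64 // every counter stops growing at this value
	peers map[string]*peerStats
}

func newScorebook(limit float64) *scorebook {
	return &scorebook{limit: limit, peers: make(map[string]*peerStats)}
}

// stats returns the record for a peer, creating it on first use.
func (s *scorebook) stats(id string) *peerStats {
	st, ok := s.peers[id]
	if !ok {
		st = &peerStats{}
		s.peers[id] = st
	}
	return st
}

// bump increments a counter unless it already reached the cap.
func bump(v *float64, limit float64) {
	if *v < limit {
		*v++
	}
}

func (s *scorebook) onValidResponse(id string)   { bump(&s.stats(id).validResponses, s.limit) }
func (s *scorebook) onResponseError(id string)   { bump(&s.stats(id).errorResponses, s.limit) }
func (s *scorebook) onRejectedPayload(id string) { bump(&s.stats(id).rejectedPayloads, s.limit) }

// score combines the counters into a single number.
func (s *scorebook) score(id string) float64 {
	st := s.stats(id)
	return validWeight*st.validResponses + errorWeight*st.errorResponses + rejectedWeight*st.rejectedPayloads
}

// allowPeer mirrors the gate check: the peer passes if its score reaches minScore.
func allowPeer(s *scorebook, id string, minScore float64) bool {
	return s.score(id) >= minScore
}

func main() {
	book := newScorebook(2)
	for i := 0; i < 3; i++ {
		book.onRejectedPayload("bad-peer")
	}
	book.onValidResponse("good-peer")
	fmt.Println(book.score("bad-peer"), allowPeer(book, "bad-peer", -8))
	fmt.Println(book.score("good-peer"), allowPeer(book, "good-peer", -8))
}
```

Capping each counter bounds how far a single kind of event can move the score in either direction.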
The high configurability of libp2p allows for a high degree of customization and modularity in the project's p2p system. The above illustrates the primary logic behind Optimism's customized use of libp2p. For further details, examine the source code in the op-node/p2p directory.