Skip to content

Commit

Permalink
rollup: split loop (ethereum#217)
Browse files Browse the repository at this point in the history
* rollup: split loop

* rollup: fix comment
  • Loading branch information
tynes authored Feb 23, 2021
1 parent 02a4ed1 commit e7a13b2
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 113 deletions.
218 changes: 107 additions & 111 deletions rollup/sync_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,11 @@ func (s *SyncService) Start() error {
s.setSyncStatus(false)
}

go s.Loop()
if s.verifier {
go s.VerifierLoop()
} else {
go s.SequencerLoop()
}
return nil
}

Expand Down Expand Up @@ -274,106 +278,11 @@ func (s *SyncService) Stop() error {
return nil
}

// Loop is the main processing loop for the sync service.
// If running as a sequencer, it will pull in any enqueue transactions
// and apply them. It pulls in as many sequential enqueue transactions
// at once and applies them sequentially as to create the most efficient
// batch contexts. Note that this function assumes that the historical
// state has already been synced.
func (s *SyncService) Loop() {
log.Info("Starting Tip processing loop", "poll-interval", s.pollInterval, "timestamp-refresh-threshold", s.timestampRefreshThreshold)
func (s *SyncService) VerifierLoop() {
log.Info("Starting Verifier Loop", "poll-interval", s.pollInterval, "timestamp-refresh-threshold", s.timestampRefreshThreshold)
for {
// Only the sequencer needs to poll for enqueue transactions
// and then can choose when to apply them. We choose to apply
// transactions such that it makes for efficient batch submitting.
// Place as many L1ToL2 transactions in the same context as possible
// by executing them one after another.
if !s.verifier {
// TODO: break this routine out into a function so that lock
// management is more simple. For now, be sure to unlock before
// each outer continue
s.txLock.Lock()
latest, err := s.client.GetLatestEnqueue()
if err != nil {
log.Error("Cannot get latest enqueue")
s.txLock.Unlock()
time.Sleep(s.pollInterval)
continue
}
// This should never happen unless the backend is empty
if latest == nil {
log.Debug("No enqueue transactions found")
s.txLock.Unlock()
time.Sleep(s.pollInterval)
continue
}
// Compare the remote latest queue index to the local latest
// queue index. If the remote latest queue index is greater
// than the local latest queue index, be sure to ingest more
// enqueued transactions
var start uint64
if s.GetLatestEnqueueIndex() == nil {
start = 0
} else {
start = *s.GetLatestEnqueueIndex() + 1
}
end := *latest.GetMeta().QueueIndex

log.Info("Polling enqueued transactions", "start", start, "end", end)
for i := start; i <= end; i++ {
enqueue, err := s.client.GetEnqueue(i)
if err != nil {
log.Error("Cannot get enqueue in loop", "index", i)
continue
}

if enqueue == nil {
log.Debug("No enqueue transaction found")
break
}

// This should never happen
if enqueue.L1BlockNumber() == nil {
log.Error("No blocknumber for enqueue", "index", i, "timestamp", enqueue.L1Timestamp(), "blocknumber", enqueue.L1BlockNumber())
continue
}

// Update the timestamp and blocknumber based on the enqueued
// transactions
if enqueue.L1Timestamp() > s.GetLatestL1Timestamp() {
ts := enqueue.L1Timestamp()
bn := enqueue.L1BlockNumber().Uint64()
s.SetLatestL1Timestamp(ts)
s.SetLatestL1BlockNumber(bn)
log.Info("Updated Eth Context from enqueue", "index", i, "timestamp", ts, "blocknumber", bn)
}

log.Debug("Applying enqueue transaction", "index", i)
err = s.applyTransaction(enqueue)
if err != nil {
log.Error("Cannot apply transaction", "msg", err)
}

s.SetLatestEnqueueIndex(enqueue.GetMeta().QueueIndex)
if enqueue.GetMeta().Index == nil {
latest := s.GetLatestIndex()
index := uint64(0)
if latest != nil {
index = *latest + 1
}
s.SetLatestIndex(&index)
} else {
s.SetLatestIndex(enqueue.GetMeta().Index)
}
}
s.txLock.Unlock()
}

// Both the verifier and the sequencer poll for ctc transactions.
// For the sequencer, ctc transactions are in the past while for
// the verifier, ctc transactions are extending the chain.
// The sequencer essentially runs a verifier to make sure that
// it reflects the ultimate source of truth which is the L1 contracts.
// The verifier polls for ctc transactions.
// the ctc transactions are extending the chain.
latest, err := s.client.GetLatestTransaction()
if err != nil {
log.Error("Cannot fetch transaction")
Expand Down Expand Up @@ -406,24 +315,111 @@ func (s *SyncService) Loop() {
}
s.SetLatestIndex(&i)
}
time.Sleep(s.pollInterval)
}
}

// Update the execution context's timestamp and blocknumber
// over time. This is only necessary for the sequencer.
if !s.verifier {
context, err := s.client.GetLatestEthContext()
func (s *SyncService) SequencerLoop() {
log.Info("Starting Sequencer Loop", "poll-interval", s.pollInterval, "timestamp-refresh-threshold", s.timestampRefreshThreshold)
for {
// Only the sequencer needs to poll for enqueue transactions
// and then can choose when to apply them. We choose to apply
// transactions such that it makes for efficient batch submitting.
// Place as many L1ToL2 transactions in the same context as possible
// by executing them one after another.
// TODO: break this routine out into a function so that lock
// management is more simple. For now, be sure to unlock before
// each outer continue
s.txLock.Lock()
latest, err := s.client.GetLatestEnqueue()
if err != nil {
log.Error("Cannot get latest enqueue")
s.txLock.Unlock()
time.Sleep(s.pollInterval)
continue
}
// This should never happen unless the backend is empty
if latest == nil {
log.Debug("No enqueue transactions found")
s.txLock.Unlock()
time.Sleep(s.pollInterval)
continue
}
// Compare the remote latest queue index to the local latest
// queue index. If the remote latest queue index is greater
// than the local latest queue index, be sure to ingest more
// enqueued transactions
var start uint64
if s.GetLatestEnqueueIndex() == nil {
start = 0
} else {
start = *s.GetLatestEnqueueIndex() + 1
}
end := *latest.GetMeta().QueueIndex

log.Info("Polling enqueued transactions", "start", start, "end", end)
for i := start; i <= end; i++ {
enqueue, err := s.client.GetEnqueue(i)
if err != nil {
log.Error("Cannot get latest eth context", "msg", err)
log.Error("Cannot get enqueue in loop", "index", i)
continue
}
current := time.Unix(int64(s.GetLatestL1Timestamp()), 0)
next := time.Unix(int64(context.Timestamp), 0)
if next.Sub(current) > s.timestampRefreshThreshold {
log.Info("Updating Eth Context", "timetamp", context.Timestamp, "blocknumber", context.BlockNumber)
s.SetLatestL1BlockNumber(context.BlockNumber)
s.SetLatestL1Timestamp(context.Timestamp)

if enqueue == nil {
log.Debug("No enqueue transaction found")
break
}

// This should never happen
if enqueue.L1BlockNumber() == nil {
log.Error("No blocknumber for enqueue", "index", i, "timestamp", enqueue.L1Timestamp(), "blocknumber", enqueue.L1BlockNumber())
continue
}

// Update the timestamp and blocknumber based on the enqueued
// transactions
if enqueue.L1Timestamp() > s.GetLatestL1Timestamp() {
ts := enqueue.L1Timestamp()
bn := enqueue.L1BlockNumber().Uint64()
s.SetLatestL1Timestamp(ts)
s.SetLatestL1BlockNumber(bn)
log.Info("Updated Eth Context from enqueue", "index", i, "timestamp", ts, "blocknumber", bn)
}

log.Debug("Applying enqueue transaction", "index", i)
err = s.applyTransaction(enqueue)
if err != nil {
log.Error("Cannot apply transaction", "msg", err)
}

s.SetLatestEnqueueIndex(enqueue.GetMeta().QueueIndex)
if enqueue.GetMeta().Index == nil {
latest := s.GetLatestIndex()
index := uint64(0)
if latest != nil {
index = *latest + 1
}
s.SetLatestIndex(&index)
} else {
s.SetLatestIndex(enqueue.GetMeta().Index)
}
}
s.txLock.Unlock()

// Update the execution context's timestamp and blocknumber
// over time. This is only necessary for the sequencer.
context, err := s.client.GetLatestEthContext()
if err != nil {
log.Error("Cannot get latest eth context", "msg", err)
continue
}
current := time.Unix(int64(s.GetLatestL1Timestamp()), 0)
next := time.Unix(int64(context.Timestamp), 0)
if next.Sub(current) > s.timestampRefreshThreshold {
log.Info("Updating Eth Context", "timetamp", context.Timestamp, "blocknumber", context.BlockNumber)
s.SetLatestL1BlockNumber(context.BlockNumber)
s.SetLatestL1Timestamp(context.Timestamp)
}
time.Sleep(s.pollInterval)
}
}
Expand Down
4 changes: 2 additions & 2 deletions rollup/sync_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestSyncServiceTransactionEnqueued(t *testing.T) {
})

// Start up the main loop
go service.Loop()
go service.SequencerLoop()
// Wait for the tx to be confirmed into the chain and then
// make sure it is the transactions that was set up with in the mockclient
event := <-txCh
Expand Down Expand Up @@ -113,7 +113,7 @@ func TestSyncServiceSync(t *testing.T) {
},
})

go service.Loop()
go service.VerifierLoop()

event := <-txCh
if len(event.Txs) != 1 {
Expand Down

0 comments on commit e7a13b2

Please sign in to comment.