Implement the RPCBlockHeaderSubscriber for indexing finalized results #728
base: main
Conversation
Walkthrough: This pull request introduces several enhancements to the Flow EVM Gateway, focusing on event ingestion and deployment processes. The changes include updating the Makefile with new targets for testnet and mainnet environments, modifying the event subscription mechanism in the bootstrap process, and updating various dependencies. A new RPCBlockTrackingSubscriber is added to the ingestion service to follow finalized (unsealed) block headers.
Force-pushed from 10a8bdd to 4fd4a5d
added a couple small comments, but otherwise this looks good. let's get it running against one of the live networks.
@peterargue Thanks for the review 🙌 I've addressed all the comments.
Actionable comments posted: 2
🧹 Nitpick comments (2)
services/ingestion/block_tracking_subscriber.go (1)
137-142: Simplify loop by removing redundant context check.

Since you're already handling context cancellation within the `select` statement using `<-ctx.Done()`, the loop condition `for ctx.Err() == nil` is redundant. Simplify the loop by using `for {}` to improve code readability. Apply this diff to simplify the loop:

```diff
-	for ctx.Err() == nil {
+	for {
 		select {
 		case <-ctx.Done():
 			r.logger.Info().Msg("event ingestion received done signal")
 			return
 		// ...
```

services/requester/cross-spork_client.go (1)
262-265: Avoid type assertion to `*grpc.Client`; use an interface to ensure compatibility.

Direct type assertion to `*grpc.Client` can be fragile and may break if the underlying implementation changes. Consider defining an interface that includes the `SubscribeBlockHeadersFromStartHeight` method and updating your clients to implement this interface. This approach enhances flexibility and maintainability.

For example, define an interface:

```go
type BlockHeaderSubscriber interface {
	SubscribeBlockHeadersFromStartHeight(ctx context.Context, startHeight uint64, blockStatus flow.BlockStatus) (<-chan flow.BlockHeader, <-chan error, error)
}
```

And ensure that your clients implement this interface. Then, modify the code:

```diff
-	grpcClient, ok := (client).(*grpc.Client)
-	if !ok {
-		return nil, nil, fmt.Errorf("unable to convert to Flow grpc.Client")
-	}
-	return grpcClient.SubscribeBlockHeadersFromStartHeight(ctx, startHeight, blockStatus)
+	subscriberClient, ok := client.(BlockHeaderSubscriber)
+	if !ok {
+		return nil, nil, fmt.Errorf("client does not implement BlockHeaderSubscriber interface")
+	}
+	return subscriberClient.SubscribeBlockHeadersFromStartHeight(ctx, startHeight, blockStatus)
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (2)
- go.sum is excluded by !**/*.sum
- tests/go.sum is excluded by !**/*.sum

📒 Files selected for processing (6)
- Makefile (1 hunks)
- bootstrap/bootstrap.go (1 hunks)
- go.mod (5 hunks)
- services/ingestion/block_tracking_subscriber.go (1 hunks)
- services/requester/cross-spork_client.go (2 hunks)
- tests/go.mod (5 hunks)
🧰 Additional context used
🪛 Gitleaks (8.21.2)
Makefile
243-243: Detected a Generic API Key, potentially exposing access to various services and sensitive operations.
(generic-api-key)
260-260: Detected a Generic API Key, potentially exposing access to various services and sensitive operations.
(generic-api-key)
🔇 Additional comments (3)
bootstrap/bootstrap.go (1)
145-145: Change to `NewRPCBlockTrackingSubscriber` looks good.
Replacing `NewRPCEventSubscriber` with `NewRPCBlockTrackingSubscriber` appropriately integrates the new block tracking functionality into the event ingestion process.

go.mod (1)
9-12: Verify compatibility with updated dependencies.
The dependency updates look reasonable, but please verify:
- Compatibility between flow-go-sdk v1.3.0 and the new block header subscription implementation.
- Impact of OpenTelemetry instrumentation updates on existing metrics.

Run this script to check for any breaking changes in the updated dependencies:

Also applies to: 185-186, 208-210
✅ Verification successful
Dependencies are compatible with existing implementation
The verification shows:
- Block header subscription implementation is compatible with flow-go-sdk v1.3.0
- No active OpenTelemetry instrumentation in the codebase, so the updates have no impact
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
```bash
#!/bin/bash
# Description: Check for breaking changes in updated dependencies

# Check release notes and changelogs
echo "Checking flow-go-sdk changelog..."
curl -s https://api.github.com/repos/onflow/flow-go-sdk/releases/tags/v1.3.0 | jq -r '.body'

echo "Checking OpenTelemetry changelog..."
curl -s https://api.github.com/repos/open-telemetry/opentelemetry-go-contrib/releases/tags/v0.49.0 | jq -r '.body'
```

Length of output: 1220
Script:
```bash
#!/bin/bash
# Check block header subscription and OpenTelemetry usage

# Find block header subscription usage
echo "Checking block header subscription usage..."
rg -A 5 "SubscribeBlocks|SubscribeBlockHeaders" --type go

# Check OpenTelemetry imports and usage
echo -e "\nChecking OpenTelemetry imports and instrumentation..."
rg "go.opentelemetry.io/otel" go.mod
rg -A 3 "otel\." --type go

# Look for metric initialization
echo -e "\nChecking metric initialization patterns..."
rg -A 3 "NewMeterProvider|NewMeter" --type go
```

Length of output: 2663
tests/go.mod (1)
7-15: LGTM! Dependency versions are consistent.
The dependency updates in tests/go.mod correctly mirror the updates in the main go.mod file, maintaining consistency between main and test code.
Actionable comments posted: 3
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- services/ingestion/block_tracking_subscriber.go (1 hunks)
🔇 Additional comments (5)
services/ingestion/block_tracking_subscriber.go (5)
1-2: File structure and package declaration look good.

31-55: Constructor’s logging strategy is consistent.
Using logger fields for “component” is a clean approach for log filtering and scoping. Looks good!

66-71: Graceful closure of eventsChan is well-handled.
The deferred `close(eventsChan)` ensures the channel is properly cleaned up when the goroutine exits. Nicely done.

191-193: Use error types instead of string matching for error handling.

```diff
-	if strings.Contains(errors.Unwrap(err).Error(), "DeadlineExceeded") ||
-		strings.Contains(errors.Unwrap(err).Error(), "unexpected EOF") {
+	unwrappedErr := errors.Unwrap(err)
+	if errors.Is(unwrappedErr, context.DeadlineExceeded) ||
+		errors.Is(unwrappedErr, io.EOF) {
```
72-93: Verify single-invocation usage of Subscribe.
This logic backfills events from previous sporks and updates `r.height`. Ensure that no other process or goroutine calls `Subscribe` concurrently, or consider adding concurrency protection to prevent data races.

✅ Verification successful
Concurrent usage of Subscribe is properly controlled
The codebase shows that `Subscribe` is used correctly with a single-subscription pattern, primarily called by the engine's main loop. No concurrent invocations were found, and the existing buffered channel (capacity 1000) safely handles event processing.

🏁 Scripts executed
The following scripts were executed for the analysis:

Script:

```bash
#!/bin/bash
# Searching for other call sites of Subscribe to ensure single usage
rg -A 3 'Subscribe('
```

Length of output: 93

Script:

```bash
#!/bin/bash
# Find Subscribe method declaration
ast-grep --pattern 'func $_($_ *BlockTrackingSubscriber) Subscribe($$$) { $$$ }'

# Find Subscribe method calls with proper escaping
rg -p 'Subscribe\(' -A 3

# Find goroutine usages with BlockTrackingSubscriber
rg -p 'go .*BlockTrackingSubscriber'
```

Length of output: 2772
```go
		return flow.BlockEvents{}, err
	}

	if len(evts) != 1 && len(evts[0].Events) != 1 {
```
Suggested change:

```diff
-	if len(evts) != 1 && len(evts[0].Events) != 1 {
+	if len(evts) != 1 || len(evts[0].Events) != 1 {
```
Properly handled in bfe6188.
```go
	eventTypes := blocksFilter(r.chain).EventTypes
	evmBlockEvent := eventTypes[0]

	evts, err := r.client.GetEventsForHeightRange(
```
An important detail from the issue that I missed in the last review is that this needs to use `GetEventsForBlockIDs` instead of the height range version. The range endpoint will restrict the request to sealed blocks, which defeats the purpose of this change. `GetEventsForBlockIDs` will search any blocks you provide, allowing us to query finalized blocks as well.
That's a good catch 💯 I was totally unaware of this detail. Updated in 34dbde5 .
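To illustrate the point above, here is a minimal, hypothetical Go sketch of fetching EVM events for a single block by its ID via the flow-go-sdk access client. The helper name is invented for illustration, and the exact `access.Client` interface and `GetEventsForBlockIDs` signature are assumptions based on this discussion, not code taken from this PR:

```go
package ingestion

import (
	"context"
	"fmt"

	"github.com/onflow/flow-go-sdk"
	"github.com/onflow/flow-go-sdk/access"
)

// evmEventsByBlockID fetches events of one type for a single block by ID.
// Querying by block ID (rather than a height range) lets the caller target
// finalized-but-unsealed blocks, which is the point of this change.
func evmEventsByBlockID(
	ctx context.Context,
	client access.Client,
	eventType string,
	blockID flow.Identifier,
) (flow.BlockEvents, error) {
	evts, err := client.GetEventsForBlockIDs(ctx, eventType, []flow.Identifier{blockID})
	if err != nil {
		return flow.BlockEvents{}, err
	}
	// Exactly one BlockEvents entry is expected, since one block ID was requested.
	if len(evts) != 1 {
		return flow.BlockEvents{}, fmt.Errorf("expected events for 1 block, got %d", len(evts))
	}
	return evts[0], nil
}
```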
…ersFromStartHeight
…or function resuse
Force-pushed from 314e7cc to bfe6188
Actionable comments posted: 0
🧹 Nitpick comments (2)
Makefile (2)
236-238: Maintain configuration consistency with existing patterns.

The Makefile already defines variables for spork hosts (e.g., `TESTNET_ACCESS_NODE_SPORK_HOSTS`). Follow this pattern for other configuration values like access node hosts. Apply this diff to improve configuration consistency:

```diff
 # Add at the top with other variables
+TESTNET_ACCESS_NODE_HOST := access.devnet.nodes.onflow.org:9000
+MAINNET_ACCESS_NODE_HOST := access.mainnet.nodes.onflow.org:9000

 .PHONY: start-testnet
 start-testnet:
 	...
-	--access-node-grpc-host=access.devnet.nodes.onflow.org:9000 \
-	--access-node-spork-hosts=access-001.devnet51.nodes.onflow.org:9000 \
+	--access-node-grpc-host=$(TESTNET_ACCESS_NODE_HOST) \
+	--access-node-spork-hosts=$(TESTNET_ACCESS_NODE_SPORK_HOSTS) \
 	...

 .PHONY: start-mainnet
 start-mainnet:
 	...
-	--access-node-grpc-host=access.mainnet.nodes.onflow.org:9000 \
-	--access-node-spork-hosts=access-001.mainnet25.nodes.onflow.org:9000 \
+	--access-node-grpc-host=$(MAINNET_ACCESS_NODE_HOST) \
+	--access-node-spork-hosts=$(MAINNET_ACCESS_NODE_SPORK_HOSTS) \
 	...
```

Also applies to: 253-254
239-239: Consider making initialization heights configurable.

The initialization heights are hardcoded. Consider making them configurable through variables to simplify future updates. Apply this diff to improve configurability:

```diff
 # Add at the top with other variables
+TESTNET_INIT_CADENCE_HEIGHT := 211176670
+MAINNET_INIT_CADENCE_HEIGHT := 85981135

 .PHONY: start-testnet
 start-testnet:
 	...
-	--init-cadence-height=211176670 \
+	--init-cadence-height=$(TESTNET_INIT_CADENCE_HEIGHT) \
 	...

 .PHONY: start-mainnet
 start-mainnet:
 	...
-	--init-cadence-height=85981135 \
+	--init-cadence-height=$(MAINNET_INIT_CADENCE_HEIGHT) \
 	...
```

Also applies to: 256-256
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (2)
- go.sum is excluded by !**/*.sum
- tests/go.sum is excluded by !**/*.sum

📒 Files selected for processing (6)
- Makefile (1 hunks)
- bootstrap/bootstrap.go (1 hunks)
- go.mod (5 hunks)
- services/ingestion/block_tracking_subscriber.go (1 hunks)
- services/requester/cross-spork_client.go (2 hunks)
- tests/go.mod (5 hunks)
🚧 Files skipped from review as they are similar to previous changes (5)
- bootstrap/bootstrap.go
- services/requester/cross-spork_client.go
- services/ingestion/block_tracking_subscriber.go
- go.mod
- tests/go.mod
🧰 Additional context used
🪛 Gitleaks (8.21.2)
Makefile
243-243: Detected a Generic API Key, potentially exposing access to various services and sensitive operations.
(generic-api-key)
260-260: Detected a Generic API Key, potentially exposing access to various services and sensitive operations.
(generic-api-key)
🔇 Additional comments (1)
Makefile (1)
230-246: Secure sensitive values and improve code reusability.

The implementation needs improvement in the following areas:
- Security: Move sensitive values (coinbase, addresses, keys) to environment variables
- Code structure: Extract common cleanup commands into a reusable target
Previous review comment already suggested these improvements. Please refer to the existing comment for the detailed solution.
Also applies to: 247-262
🧰 Tools
🪛 Gitleaks (8.21.2)
243-243: Detected a Generic API Key, potentially exposing access to various services and sensitive operations.
(generic-api-key)
Actionable comments posted: 2
♻️ Duplicate comments (2)
services/ingestion/block_tracking_subscriber.go (2)
21-29: ⚠️ Potential issue: Add documentation about limitations and ensure thread safety.

The struct requires important documentation about its limitations and potential risks. Additionally, the `height` field needs thread-safe access, as it's modified from multiple goroutines. Apply this diff to add documentation and ensure thread-safety:

```diff
+// RPCBlockTrackingSubscriber subscribes to new EVM block events for unsealed finalized blocks.
+// This is accomplished by following finalized blocks from the upstream Access node, and using the
+// polling endpoint to fetch the events for each finalized block.
+//
+// IMPORTANT: Since data is downloaded and processed from unsealed blocks, it's possible for the
+// data that was downloaded to be incorrect. This subscriber provides no handling or detection for
+// cases where the received data differs from the data that was ultimately sealed. The operator must
+// handle this manually.
+// Since it's not reasonable to expect operators to do this manual tracking, this features should NOT
+// be used outside of a limited Proof of Concept. Use at own risk.
+//
+// A future version of the RPCEventSubscriber will provide this detection and handling functionality
+// at which point this subscriber will be removed.
 type RPCBlockTrackingSubscriber struct {
 	*RPCEventSubscriber

-	logger  zerolog.Logger
-	client  *requester.CrossSporkClient
-	chain   flowGo.ChainID
-	keyLock requester.KeyLock
-	height  uint64
+	logger   zerolog.Logger
+	client   *requester.CrossSporkClient
+	chain    flowGo.ChainID
+	keyLock  requester.KeyLock
+	height   uint64
+	heightMu sync.RWMutex
 }
```
192-194: ⚠️ Potential issue: Use error types instead of string matching for error handling.

Using `strings.Contains` to check error messages is not robust and can lead to unexpected behavior if the error messages change. Apply this diff to improve error handling:

```diff
-	if strings.Contains(errors.Unwrap(err).Error(), "DeadlineExceeded") ||
-		strings.Contains(errors.Unwrap(err).Error(), "unexpected EOF") {
+	unwrappedErr := errors.Unwrap(err)
+	if errors.Is(unwrappedErr, context.DeadlineExceeded) ||
+		errors.Is(unwrappedErr, io.EOF) {
```
🧹 Nitpick comments (3)
services/ingestion/block_tracking_subscriber.go (3)
31-55: Optimize constructor to avoid field duplication.

The constructor duplicates fields that are already available from the embedded `RPCEventSubscriber`. Apply this diff to simplify the constructor:

```diff
 func NewRPCBlockTrackingSubscriber(
 	logger zerolog.Logger,
 	client *requester.CrossSporkClient,
 	chainID flowGo.ChainID,
 	keyLock requester.KeyLock,
 	startHeight uint64,
 ) *RPCBlockTrackingSubscriber {
-	eventSubscriber := NewRPCEventSubscriber(
-		logger,
-		client,
-		chainID,
-		keyLock,
-		startHeight,
-	)
-	logger = logger.With().Str("component", "subscriber").Logger()
-	return &RPCBlockTrackingSubscriber{
-		RPCEventSubscriber: eventSubscriber,
-		logger:             logger,
-		client:             client,
-		chain:              chainID,
-		keyLock:            keyLock,
-		height:             startHeight,
+	return &RPCBlockTrackingSubscriber{
+		RPCEventSubscriber: NewRPCEventSubscriber(
+			logger.With().Str("component", "subscriber").Logger(),
+			client,
+			chainID,
+			keyLock,
+			startHeight,
+		),
+		height: startHeight,
 	}
 }
```
62-64: Document the rationale for channel buffer size.

The channel buffer size of 1000 seems arbitrary. Consider documenting why this specific size was chosen and what happens when the buffer is full.

```diff
+	// buffered channel with size 1000 to handle bursts of events during backfilling
+	// without blocking the producer. If the buffer is full, the producer will block
+	// until the consumer catches up.
 	eventsChan := make(chan models.BlockEvents, 1000)
```
220-283: Refactor the method for better readability.

The method is complex and could benefit from being broken down into smaller, more focused functions.
Consider refactoring as suggested in the past review comments to improve readability and maintainability. The refactored version should:
- Split event retrieval into a separate function
- Handle block and transaction events separately
- Improve error messages for clarity
Actionable comments posted: 0
♻️ Duplicate comments (3)
services/ingestion/block_tracking_subscriber.go (3)
85-88: ⚠️ Potential issue: Use mutex when updating height field during backfill.

The height field is updated without proper synchronization during backfill.
Add mutex protection:

```diff
+	r.heightMu.Lock()
 	r.height = ev.Events.CadenceHeight()
+	r.heightMu.Unlock()
```

91-93: ⚠️ Potential issue: Use mutex when incrementing height field.

The height field is incremented without proper synchronization.
Add mutex protection:

```diff
+	r.heightMu.Lock()
 	r.height = r.height + 1
+	r.heightMu.Unlock()
```

226-231: ⚠️ Potential issue: Use GetEventsForBlockIDs instead of GetEventsForHeightRange.

Using `GetEventsForHeightRange` restricts the request to sealed blocks, which defeats the purpose of this change. Replace with `GetEventsForBlockIDs`:

```diff
-	evts, err := r.client.GetEventsForHeightRange(
+	blockID, err := r.client.GetBlockIDAtHeight(ctx, height)
+	if err != nil {
+		return flow.BlockEvents{}, err
+	}
+
+	evts, err := r.client.GetEventsForBlockIDs(
 		ctx,
 		evmBlockEvent,
-		height,
-		height,
+		[]flow.Identifier{blockID},
 	)
```
🧹 Nitpick comments (3)
services/ingestion/block_tracking_subscriber.go (3)
38-54: Simplify constructor to avoid field duplication.

The constructor duplicates fields that are already available from the embedded `RPCEventSubscriber`. Simplify the constructor:

```diff
-	eventSubscriber := NewRPCEventSubscriber(
-		logger,
-		client,
-		chainID,
-		keyLock,
-		startHeight,
-	)
-	logger = logger.With().Str("component", "subscriber").Logger()
-
-	return &RPCBlockTrackingSubscriber{
-		RPCEventSubscriber: eventSubscriber,
-		logger:             logger,
-		client:             client,
-		chain:              chainID,
-		keyLock:            keyLock,
-		height:             startHeight,
-	}
+	return &RPCBlockTrackingSubscriber{
+		RPCEventSubscriber: NewRPCEventSubscriber(
+			logger.With().Str("component", "subscriber").Logger(),
+			client,
+			chainID,
+			keyLock,
+			startHeight,
+		),
+	}
```
64-64: Document the channel buffer size choice.

The channel buffer size of 1000 seems arbitrary. Consider documenting the reasoning behind this choice or making it configurable.
Add a comment explaining the buffer size choice:

```diff
+	// Use a buffer size of 1000 to allow for parallel processing without blocking
+	// while maintaining reasonable memory usage
 	eventsChan := make(chan models.BlockEvents, 1000)
```
219-282: Refactor method for better clarity and maintainability.

The method could be split into smaller, more focused functions for better readability and maintainability.
Consider refactoring as suggested in the past review:

```diff
+func (r *RPCBlockTrackingSubscriber) evmEventsForBlock(
+	ctx context.Context,
+	blockID flow.Identifier,
+) (flow.BlockEvents, error) {
+	eventTypes := blocksFilter(r.chain).EventTypes
+
+	// evm Block events
+	blockEvents, err := r.getEventsByType(ctx, blockID, eventTypes[0])
+	if err != nil {
+		return flow.BlockEvents{}, err
+	}
+
+	payload, err := events.DecodeBlockEventPayload(blockEvents.Events[0].Value)
+	if err != nil {
+		return flow.BlockEvents{}, err
+	}
+
+	if payload.TransactionHashRoot == types.EmptyTxsHash {
+		return blockEvents, nil
+	}
+
+	// evm TX events
+	txEvents, err := r.getEventsByType(ctx, blockID, eventTypes[1])
+	if err != nil {
+		return flow.BlockEvents{}, err
+	}
+
+	// combine block and tx events to be processed together
+	blockEvents.Events = append(blockEvents.Events, txEvents.Events...)
+
+	return blockEvents, nil
+}
+
+func (r *RPCBlockTrackingSubscriber) getEventsByType(
+	ctx context.Context,
+	blockID flow.Identifier,
+	eventType string,
+) (flow.BlockEvents, error) {
+	evts, err := r.client.GetEventsForBlockIDs(
+		ctx,
+		eventType,
+		[]flow.Identifier{blockID},
+	)
+	if err != nil {
+		return flow.BlockEvents{}, err
+	}
+
+	if len(evts) != 1 {
+		return flow.BlockEvents{}, fmt.Errorf(
+			"received unexpected number of block events: got: %d, expected: 1",
+			len(evts),
+		)
+	}
+
+	if len(evts[0].Events) != 1 {
+		return flow.BlockEvents{}, fmt.Errorf(
+			"received unexpected number of events in block: got: %d, expected: 1",
+			len(evts[0].Events),
+		)
+	}
+
+	return evts[0], nil
+}
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- services/ingestion/block_tracking_subscriber.go (1 hunks)
🔇 Additional comments (3)
services/ingestion/block_tracking_subscriber.go (3)
110-217: LGTM! Well-implemented error handling and recovery.

The implementation properly handles:
- Channel closure
- Error propagation
- Disconnection recovery
- Context cancellation
21-21: 🛠️ Refactor suggestion: Add documentation about subscriber limitations.

The subscriber needs clear documentation about its limitations and risks when working with unsealed blocks.
Add comprehensive documentation:

```diff
+// RPCBlockTrackingSubscriber subscribes to new EVM block events for unsealed finalized blocks.
+// This is accomplished by following finalized blocks from the upstream Access node, and using the
+// polling endpoint to fetch the events for each finalized block.
+//
+// IMPORTANT: Since data is downloaded and processed from unsealed blocks, it's possible for the
+// data that was downloaded to be incorrect. This subscriber provides no handling or detection for
+// cases where the received data differs from the data that was ultimately sealed. The operator must
+// handle this manually.
+//
+// A future version of the RPCEventSubscriber will provide this detection and handling functionality
+// at which point this subscriber will be removed.
 type RPCBlockTrackingSubscriber struct {
```

Likely invalid or redundant comment.
21-29: ⚠️ Potential issue: Add mutex for thread-safe access to height field.

The `height` field is accessed and modified from multiple goroutines without proper synchronization, which could lead to race conditions. Add a mutex to protect the height field:

```diff
 type RPCBlockTrackingSubscriber struct {
 	*RPCEventSubscriber

 	logger  zerolog.Logger
 	client  *requester.CrossSporkClient
 	chain   flowGo.ChainID
 	keyLock requester.KeyLock
+	heightMu sync.RWMutex
 	height  uint64
 }
```

Likely invalid or redundant comment.
Co-authored-by: Peter Argue <[email protected]>
Actionable comments posted: 1
♻️ Duplicate comments (2)
services/ingestion/block_tracking_subscriber.go (2)
21-29: ⚠️ Potential issue: Ensure thread-safe access to height field.

The `height` field is accessed concurrently in multiple goroutines without proper synchronization, which could lead to race conditions. Add a mutex to protect the height field:

```diff
 type RPCBlockTrackingSubscriber struct {
 	*RPCEventSubscriber

 	logger  zerolog.Logger
 	client  *requester.CrossSporkClient
 	chain   flowGo.ChainID
 	keyLock requester.KeyLock
+	heightMu sync.RWMutex
 	height  uint64
 }
```
211-274: 🛠️ Refactor suggestion / ⚠️ Potential issue: Refactor the method and use GetEventsForBlockIDs.

The method needs several improvements:
- Use `GetEventsForBlockIDs` instead of `GetEventsForHeightRange`
- Extract common error handling logic
- Break down the method into smaller functions

Apply this diff to improve the implementation:

```diff
 func (r *RPCBlockTrackingSubscriber) evmEventsForHeight(
 	ctx context.Context,
 	height uint64,
 ) (flow.BlockEvents, error) {
 	eventTypes := blocksFilter(r.chain).EventTypes
-	evmBlockEvent := eventTypes[0]
-
-	evts, err := r.client.GetEventsForHeightRange(
+	blockID, err := r.client.GetBlockIDAtHeight(ctx, height)
+	if err != nil {
+		return flow.BlockEvents{}, err
+	}
+
+	blockEvents, err := r.getEventsByType(
 		ctx,
-		evmBlockEvent,
-		height,
-		height,
+		blockID,
+		eventTypes[0],
 	)
 	if err != nil {
 		return flow.BlockEvents{}, err
 	}

-	if len(evts) != 1 || len(evts[0].Events) != 1 {
-		return flow.BlockEvents{}, fmt.Errorf(
-			"received unexpected number of EVM events for height: %d, got: %d, expected: 1",
-			height,
-			len(evts),
-		)
-	}
-
-	blockEvents := evts[0]
 	payload, err := events.DecodeBlockEventPayload(blockEvents.Events[0].Value)
 	if err != nil {
 		return flow.BlockEvents{}, err
 	}

 	if payload.TransactionHashRoot == types.EmptyTxsHash {
 		return blockEvents, nil
 	}

-	evmTxEvent := eventTypes[1]
-	evts, err = r.client.GetEventsForHeightRange(
+	txEvents, err := r.getEventsByType(
 		ctx,
-		evmTxEvent,
-		height,
-		height,
+		blockID,
+		eventTypes[1],
 	)
 	if err != nil {
 		return flow.BlockEvents{}, err
 	}

-	if len(evts) != 1 {
-		return flow.BlockEvents{}, fmt.Errorf(
-			"received unexpected number of EVM events for height: %d, got: %d, expected: 1",
-			height,
-			len(evts),
-		)
-	}
-	txEvents := evts[0]
-
 	blockEvents.Events = append(blockEvents.Events, txEvents.Events...)

 	return blockEvents, nil
 }
+
+func (r *RPCBlockTrackingSubscriber) getEventsByType(
+	ctx context.Context,
+	blockID flow.Identifier,
+	eventType string,
+) (flow.BlockEvents, error) {
+	evts, err := r.client.GetEventsForBlockIDs(
+		ctx,
+		eventType,
+		[]flow.Identifier{blockID},
+	)
+	if err != nil {
+		return flow.BlockEvents{}, err
+	}
+
+	if len(evts) != 1 {
+		return flow.BlockEvents{}, fmt.Errorf(
+			"received unexpected number of block events: got: %d, expected: 1",
+			len(evts),
+		)
+	}
+
+	if len(evts[0].Events) != 1 {
+		return flow.BlockEvents{}, fmt.Errorf(
+			"received unexpected number of events in block: got: %d, expected: 1",
+			len(evts[0].Events),
+		)
+	}
+
+	return evts[0], nil
+}
```
🧹 Nitpick comments (1)
services/ingestion/block_tracking_subscriber.go (1)
102-209: Consider breaking down the subscribe method.

The method is quite long and handles multiple responsibilities. Consider extracting the event processing and error handling into separate methods for better maintainability.
Here's a suggested refactoring approach:

```diff
+func (r *RPCBlockTrackingSubscriber) handleBlockHeader(
+	ctx context.Context,
+	blockHeader *flow.BlockHeader,
+	eventsChan chan<- models.BlockEvents,
+) (uint64, bool) {
+	blockEvents, err := r.evmEventsForHeight(ctx, blockHeader.Height)
+	if err != nil {
+		eventsChan <- models.NewBlockEventsError(err)
+		return 0, false
+	}
+
+	evmEvents := models.NewSingleBlockEvents(blockEvents)
+	if evmEvents.Err != nil || r.recovery {
+		evmEvents = r.recover(ctx, blockEvents, evmEvents.Err)
+		if r.recovery {
+			return blockHeader.Height, true
+		}
+	}
+
+	for _, evt := range blockEvents.Events {
+		r.keyLock.UnlockKey(evt.TransactionID)
+	}
+	r.keyLock.Notify(blockHeader.Height)
+
+	eventsChan <- evmEvents
+	return blockHeader.Height, true
+}
+
+func (r *RPCBlockTrackingSubscriber) handleSubscriptionError(
+	ctx context.Context,
+	err error,
+	lastHeight uint64,
+) (chan *flow.BlockHeader, chan error, error) {
+	if status.Code(err) == codes.DeadlineExceeded || status.Code(err) == codes.Internal {
+		return r.client.SubscribeBlockHeadersFromStartHeight(
+			ctx,
+			lastHeight+1,
+			flow.BlockStatusFinalized,
+		)
+	}
+	return nil, nil, fmt.Errorf("%w: %w", errs.ErrDisconnected, err)
+}
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- services/ingestion/block_tracking_subscriber.go (1 hunks)
🧰 Additional context used
🪛 GitHub Check: Test
services/ingestion/block_tracking_subscriber.go
[failure] 45-45:
syntax error: unexpected newline in composite literal; possibly missing comma or }
🪛 GitHub Actions: CI
services/ingestion/block_tracking_subscriber.go
[error] 45-45: Syntax error: unexpected newline in composite literal; possibly missing comma or }
🔇 Additional comments (2)
services/ingestion/block_tracking_subscriber.go (2)
56-56: Verify the channel buffer size.

The channel buffer size of 1000 seems arbitrary. Consider documenting the rationale for this size or making it configurable.
Would you like me to help create a configuration option for this buffer size?
21-21: 🛠️ Refactor suggestion: Add comprehensive documentation for the public type.

The struct lacks documentation explaining its purpose, limitations, and usage. This is particularly important for public types.
Add this documentation above the struct:

```diff
+// RPCBlockTrackingSubscriber subscribes to new EVM block events for unsealed finalized blocks.
+// This is accomplished by following finalized blocks from the upstream Access node, and using the
+// polling endpoint to fetch the events for each finalized block.
+//
+// IMPORTANT: Since data is downloaded and processed from unsealed blocks, it's possible for the
+// data that was downloaded to be incorrect. This subscriber provides no handling or detection for
+// cases where the received data differs from the data that was ultimately sealed. The operator must
+// handle this manually.
+//
+// A future version of the RPCEventSubscriber will provide this detection and handling functionality
+// at which point this subscriber will be removed.
 type RPCBlockTrackingSubscriber struct {
```

Likely invalid or redundant comment.
Actionable comments posted: 0
🧹 Nitpick comments (3)
services/requester/cross-spork_client.go (1)
265-280: Consider adding error recovery for gRPC connection issues.

The method correctly handles client retrieval and type assertion. However, consider adding retry logic for transient gRPC connection issues.

```diff
 func (c *CrossSporkClient) SubscribeBlockHeadersFromStartHeight(
 	ctx context.Context,
 	startHeight uint64,
 	blockStatus flow.BlockStatus,
 ) (<-chan flow.BlockHeader, <-chan error, error) {
 	client, err := c.getClientForHeight(startHeight)
 	if err != nil {
 		return nil, nil, err
 	}
 	grpcClient, ok := (client).(*grpc.Client)
 	if !ok {
 		return nil, nil, fmt.Errorf("unable to convert to Flow grpc.Client")
 	}
+
+	// Add retry with backoff for transient gRPC connection issues
+	var attempt int
+	for attempt < 3 {
+		headerChan, errChan, err := grpcClient.SubscribeBlockHeadersFromStartHeight(ctx, startHeight, blockStatus)
+		if err == nil {
+			return headerChan, errChan, nil
+		}
+		if !isTransientError(err) {
+			return nil, nil, err
+		}
+		attempt++
+		time.Sleep(time.Duration(attempt) * time.Second)
+	}
 	return grpcClient.SubscribeBlockHeadersFromStartHeight(ctx, startHeight, blockStatus)
 }
```

services/ingestion/block_tracking_subscriber.go (2)
62-108: Consider adding metrics for backfilling progress.

While the implementation correctly handles backfilling events from previous sporks, adding metrics would help monitor the progress.

```diff
 func (r *RPCBlockTrackingSubscriber) Subscribe(ctx context.Context) <-chan models.BlockEvents {
 	eventsChan := make(chan models.BlockEvents, 1000)
+
+	// Add metrics for monitoring backfill progress
+	var backfilledEvents uint64
+	startTime := time.Now()

 	go func() {
 		defer func() {
 			close(eventsChan)
+			if backfilledEvents > 0 {
+				r.logger.Info().
+					Uint64("total_events", backfilledEvents).
+					Dur("duration", time.Since(startTime)).
+					Msg("backfill completed")
+			}
 		}()

 		if r.client.IsPastSpork(r.height) {
 			for ev := range r.backfill(ctx, r.height) {
 				eventsChan <- ev
+				backfilledEvents++
+
+				// Log progress every 1000 events
+				if backfilledEvents%1000 == 0 {
+					r.logger.Info().
+						Uint64("events_processed", backfilledEvents).
+						Msg("backfill in progress")
+				}

 				if ev.Err != nil {
 					return
 				}

 				r.height = ev.Events.CadenceHeight()
 			}

 			r.height = r.height + 1
 		}
```
110-217: Consider adding circuit breaker for repeated failures.

The error handling and recovery logic is solid, but consider adding a circuit breaker to prevent infinite retries on persistent failures.

```diff
 func (r *RPCBlockTrackingSubscriber) subscribe(ctx context.Context, height uint64) <-chan models.BlockEvents {
 	eventsChan := make(chan models.BlockEvents)
+
+	// Add circuit breaker to prevent infinite retries
+	var consecutiveFailures int
+	const maxConsecutiveFailures = 5
+	const resetDuration = 5 * time.Minute
+	lastFailureTime := time.Now()

 	blockHeadersChan, errChan, err := r.client.SubscribeBlockHeadersFromStartHeight(
 		ctx,
 		height,
 		flow.BlockStatusFinalized,
 	)
 	if err != nil {
+		consecutiveFailures++
 		eventsChan <- models.NewBlockEventsError(
 			fmt.Errorf(
 				"failed to subscribe for finalized block headers on height: %d, with: %w",
 				height,
 				err,
 			),
 		)
 		close(eventsChan)
 		return eventsChan
 	}

 	go func() {
 		defer func() {
 			close(eventsChan)
 		}()

 		for ctx.Err() == nil {
 			select {
 			case err, ok := <-errChan:
 				if !ok {
 					// ... existing error handling ...
 				}

 				if status.Code(err) == codes.DeadlineExceeded || status.Code(err) == codes.Internal {
+					consecutiveFailures++
+					if consecutiveFailures >= maxConsecutiveFailures {
+						if time.Since(lastFailureTime) < resetDuration {
+							eventsChan <- models.NewBlockEventsError(
+								fmt.Errorf("circuit breaker triggered: too many consecutive failures"),
+							)
+							return
+						}
+						consecutiveFailures = 0
+						lastFailureTime = time.Now()
+					}
 					blockHeadersChan, errChan, err = r.client.SubscribeBlockHeadersFromStartHeight(
 						ctx,
 						lastReceivedHeight+1,
 						flow.BlockStatusFinalized,
 					)
 					if err != nil {
 						eventsChan <- models.NewBlockEventsError(
 							fmt.Errorf(
 								"failed to subscribe for finalized block headers on height: %d, with: %w",
 								height,
 								err,
 							),
 						)
 						return
 					}
+					consecutiveFailures = 0
 				} else {
 					eventsChan <- models.NewBlockEventsError(fmt.Errorf("%w: %w", errs.ErrDisconnected, err))
 					return
 				}
 			}
 		}
 	}()

 	return eventsChan
 }
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- services/ingestion/block_tracking_subscriber.go (1 hunks)
- services/requester/cross-spork_client.go (2 hunks)
🔇 Additional comments (5)
services/requester/cross-spork_client.go (1)
253-263: LGTM! The implementation correctly handles cross-spork event retrieval.
The method properly uses the block header's height to determine the appropriate client and fetches events for the specific block ID.

services/ingestion/block_tracking_subscriber.go (4)
22-37: LGTM! Well-documented limitations and risks.
The documentation clearly explains the limitations and risks of using unsealed blocks, which is crucial for operators to understand.

39-55: LGTM! Clean and concise constructor.
The constructor properly initializes the subscriber by embedding `RPCEventSubscriber`.

219-250: LGTM! Well-structured event processing.
The method correctly handles both block and transaction events, with proper error handling and empty transaction hash checks.

252-285: LGTM! Robust event validation.
The method properly validates the number of events and handles the special case for `EVM.BlockExecuted` events.
Make PoC configurable
Actionable comments posted: 0
🧹 Nitpick comments (2)
config/config.go (1)
92-95: Consider enhancing the documentation with specific scenarios.

The warning about incorrect data is good, but it would be more helpful to document specific scenarios where the data might be incorrect. This helps users make informed decisions about using this feature.
Consider expanding the comment to:

```diff
-	// ExperimentalSoftFinalityEnabled enables the experimental soft finality feature which syncs
-	// EVM block and transaction data from the upstream Access node before the block is sealed.
-	// CAUTION: This feature is experimental and may return incorrect data in certain circumstances.
+	// ExperimentalSoftFinalityEnabled enables the experimental soft finality feature which syncs
+	// EVM block and transaction data from the upstream Access node before the block is sealed.
+	// CAUTION: This feature is experimental and may return incorrect data in the following scenarios:
+	// - When the block is reverted before being sealed
+	// - When there are concurrent updates to the same state
+	// - When the node experiences network partitioning
```

cmd/run/cmd.go (1)
287-287: Consider enhancing the warning message with specific scenarios.

While the warning message is good, it would be more helpful to list specific scenarios where incorrect results might occur. This aligns with the enhanced documentation suggested for the config file.

```diff
-	Cmd.Flags().BoolVar(&experimentalSoftFinalityEnabled, "experimental-soft-finality-enabled", false, "Sets whether the gateway should use the experimental soft finality feature. WARNING: This may result in incorrect results being returned in certain circumstances. Use only if you know what you are doing.")
+	Cmd.Flags().BoolVar(&experimentalSoftFinalityEnabled, "experimental-soft-finality-enabled", false, "Sets whether the gateway should use the experimental soft finality feature. WARNING: This may return incorrect results when blocks are reverted before sealing, during concurrent state updates, or network partitioning. Use only if you understand these risks.")
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- bootstrap/bootstrap.go (1 hunks)
- cmd/run/cmd.go (3 hunks)
- config/config.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- bootstrap/bootstrap.go
🔇 Additional comments (3)
cmd/run/cmd.go (3)
251-252: LGTM!
The variable declaration follows the established pattern and naming convention.

225-226: LGTM!
The configuration assignment is straightforward and placed in the appropriate location.

Line range hint 225-287: Verify the feature flag usage across the codebase.
Let's ensure this feature flag is properly handled in all relevant code paths.
✅ Verification successful
Feature flag implementation is complete and correct ✅
The feature flag is properly implemented with a single decision point in bootstrap.go that cleanly switches between two different event subscriber implementations. The flag's scope is well-contained within the initialization phase and doesn't require additional handling elsewhere in the codebase.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
```bash
#!/bin/bash
# Search for usage of ExperimentalSoftFinalityEnabled across the codebase
echo "Searching for direct references to ExperimentalSoftFinalityEnabled..."
rg "ExperimentalSoftFinalityEnabled"

echo -e "\nSearching for potential conditional blocks that might need to handle this feature..."
ast-grep --pattern 'if $CONFIG.$_ { $$$ }'
```

Length of output: 740
Script:
```bash
#!/bin/bash
# View the implementation in bootstrap.go
echo "=== bootstrap.go implementation ==="
rg -A 10 -B 2 "ExperimentalSoftFinalityEnabled" bootstrap/bootstrap.go

echo -e "\n=== Related sync/finality terms ==="
rg -l "finality|sync" --type go
```

Length of output: 900
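For orientation, here is a rough, hedged sketch of what such a single decision point can look like; this is not the actual bootstrap.go code from this PR. Only the constructor names and the `ExperimentalSoftFinalityEnabled` flag come from this review; the `EventSubscriber` interface name and the surrounding variables (`cfg`, `logger`, `client`, `chainID`, `keyLock`, `startHeight`) are assumptions:

```go
// Sketch only: pick the event subscriber implementation based on the
// experimental soft-finality flag. Variable and interface names here are
// assumptions; constructor signatures follow the ones discussed above.
var subscriber ingestion.EventSubscriber
if cfg.ExperimentalSoftFinalityEnabled {
	// Soft finality: follow finalized (unsealed) blocks; see the warnings above.
	subscriber = ingestion.NewRPCBlockTrackingSubscriber(
		logger, client, chainID, keyLock, startHeight,
	)
} else {
	// Default path: index sealed results only.
	subscriber = ingestion.NewRPCEventSubscriber(
		logger, client, chainID, keyLock, startHeight,
	)
}
```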
Actionable comments posted: 0
🧹 Nitpick comments (5)
services/ingestion/block_tracking_subscriber.go (5)
23-38: Fix typo in documentation.

There's a typo in the documentation: "this features should NOT" should be "this feature should NOT".

```diff
-// Since it's not reasonable to expect operators to do this manual tracking, this features should NOT
+// Since it's not reasonable to expect operators to do this manual tracking, this feature should NOT
```
40-56: Add input validation to constructor.

Consider adding validation for nil parameters to prevent potential panics.

```diff
 func NewRPCBlockTrackingSubscriber(
 	logger zerolog.Logger,
 	client *requester.CrossSporkClient,
 	chainID flowGo.ChainID,
 	keyLock requester.KeyLock,
 	startHeight uint64,
 ) *RPCBlockTrackingSubscriber {
+	if client == nil {
+		panic("client cannot be nil")
+	}
+	if keyLock == nil {
+		panic("keyLock cannot be nil")
+	}
 	return &RPCBlockTrackingSubscriber{
 		RPCEventSubscriber: NewRPCEventSubscriber(
 			logger.With().Str("component", "subscriber").Logger(),
```
111-235: Consider extracting error handling logic.

The error handling logic in the select statement is complex and could be more maintainable if extracted into separate methods.
Consider refactoring like this:

```diff
+func (r *RPCBlockTrackingSubscriber) handleBlockHeader(
+	ctx context.Context,
+	blockHeader flow.BlockHeader,
+	eventsChan chan<- models.BlockEvents,
+) (uint64, bool) {
+	blockEvents, err := r.evmEventsForBlock(ctx, blockHeader)
+	if err != nil {
+		eventsChan <- models.NewBlockEventsError(err)
+		return 0, false
+	}
+	// ... rest of block header handling logic
+	return blockHeader.Height, true
+}
+
+func (r *RPCBlockTrackingSubscriber) handleError(
+	ctx context.Context,
+	err error,
+	eventsChan chan<- models.BlockEvents,
+	lastReceivedHeight uint64,
+) bool {
+	// ... error handling logic
+}

 func (r *RPCBlockTrackingSubscriber) subscribe(ctx context.Context, height uint64) <-chan models.BlockEvents {
 	// ... existing code ...
 	select {
 	case blockHeader, ok := <-blockHeadersChan:
 		if !ok {
 			// ... channel closed handling ...
 		}
-		blockEvents, err := r.evmEventsForBlock(ctx, blockHeader)
-		if err != nil {
-			eventsChan <- models.NewBlockEventsError(err)
-			return
-		}
-		// ... rest of block header handling
+		if height, ok := r.handleBlockHeader(ctx, blockHeader, eventsChan); ok {
+			lastReceivedHeight = height
+		} else {
+			return
+		}
 	case err, ok := <-errChan:
-		// ... error handling logic
+		if !r.handleError(ctx, err, eventsChan, lastReceivedHeight) {
+			return
+		}
```
237-268: Enhance error messages for better debugging.

The error messages could be more descriptive to aid in debugging issues.

```diff
 func (r *RPCBlockTrackingSubscriber) evmEventsForBlock(
 	ctx context.Context,
 	blockHeader flow.BlockHeader,
 ) (flow.BlockEvents, error) {
 	eventTypes := blocksFilter(r.chain).EventTypes

 	// evm Block events
 	blockEvents, err := r.getEventsByType(ctx, blockHeader, eventTypes[0])
 	if err != nil {
-		return flow.BlockEvents{}, err
+		return flow.BlockEvents{}, fmt.Errorf("failed to get block events at height %d: %w", blockHeader.Height, err)
 	}

 	payload, err := events.DecodeBlockEventPayload(blockEvents.Events[0].Value)
 	if err != nil {
-		return flow.BlockEvents{}, err
+		return flow.BlockEvents{}, fmt.Errorf("failed to decode block event payload at height %d: %w", blockHeader.Height, err)
 	}
```
270-303: Extract magic strings into constants.

The event type string comparison uses a magic string that should be extracted into a constant.

```diff
+const (
+	evmBlockExecutedEvent = "EVM.BlockExecuted"
+)

 func (r *RPCBlockTrackingSubscriber) getEventsByType(
 	ctx context.Context,
 	blockHeader flow.BlockHeader,
 	eventType string,
 ) (flow.BlockEvents, error) {
 	// ...
-	if strings.Contains(eventType, string(events.EventTypeBlockExecuted)) {
+	if strings.Contains(eventType, evmBlockExecutedEvent) {
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- services/ingestion/block_tracking_subscriber.go (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (3)
- GitHub Check: build
- GitHub Check: Lint
- GitHub Check: Test
🔇 Additional comments (2)
services/ingestion/block_tracking_subscriber.go (2)
1-21: LGTM! Well-structured package and imports.
The package structure is clean, imports are well-organized, and the interface compliance check is properly implemented.
58-109: ⚠️ Potential issue: Ensure thread-safe access to height field.

The `height` field is accessed and modified without proper synchronization in multiple places. This could lead to race conditions in concurrent usage. Add a mutex to protect the height field:

```diff
 type RPCBlockTrackingSubscriber struct {
 	*RPCEventSubscriber
+	heightMu sync.RWMutex
 }

 func (r *RPCBlockTrackingSubscriber) Subscribe(ctx context.Context) <-chan models.BlockEvents {
 	// ...
-	if r.client.IsPastSpork(r.height) {
+	r.heightMu.RLock()
+	isPastSpork := r.client.IsPastSpork(r.height)
+	r.heightMu.RUnlock()
+	if isPastSpork {
 		// ...
+		r.heightMu.Lock()
 		r.height = ev.Events.CadenceHeight()
+		r.heightMu.Unlock()
 		// ...
+		r.heightMu.Lock()
 		r.height = r.height + 1
+		r.heightMu.Unlock()
 	}
```

Likely invalid or redundant comment.
Closes: #727
Description
For contributor use:
- Targeted PR against master branch
- Re-reviewed Files changed in the Github PR explorer
Summary by CodeRabbit
New Features
Dependency Updates
Performance Improvements