Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ForkDigest Filter #206

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion crawler/crawl/crawl.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"eth2-crawler/crawler/p2p"
reqresp "eth2-crawler/crawler/rpc/request"
"eth2-crawler/crawler/util"
"eth2-crawler/graph/model"
"eth2-crawler/models"
ipResolver "eth2-crawler/resolver"
"eth2-crawler/store/peerstore"
Expand Down Expand Up @@ -248,7 +249,7 @@ func (c *crawler) updateGeolocation(ctx context.Context, peer *models.Peer) {
func (c *crawler) insertToHistory() {
ctx := context.Background()
// get count
aggregateData, err := c.peerStore.AggregateBySyncStatus(ctx)
aggregateData, err := c.peerStore.AggregateBySyncStatus(ctx, &model.PeerFilter{})
if err != nil {
log.Error("error getting sync status", log.Ctx{"err": err})
}
Expand Down
473 changes: 417 additions & 56 deletions graph/generated/generated.go

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions graph/model/models_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 15 additions & 11 deletions graph/schema.graphqls
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,20 @@ type HeatmapData {
country: String!
}

input PeerFilter {
forkDigest: String
}

type Query {
aggregateByAgentName: [AggregateData!]!
aggregateByCountry: [AggregateData!]!
aggregateByOperatingSystem: [AggregateData!]!
aggregateByNetwork: [AggregateData!]!
aggregateByHardforkSchedule: [NextHardforkAggregation!]!
aggregateByClientVersion: [ClientVersionAggregation!]!
getHeatmapData: [HeatmapData!]!
getNodeStats: NodeStats!
getNodeStatsOverTime(start: Float!, end: Float!): [NodeStatsOverTime!]!
getRegionalStats: RegionalStats!
getAltairUpgradePercentage: Float!
aggregateByAgentName(peerFilter: PeerFilter): [AggregateData!]!
aggregateByCountry(peerFilter: PeerFilter): [AggregateData!]!
aggregateByOperatingSystem(peerFilter: PeerFilter): [AggregateData!]!
aggregateByNetwork(peerFilter: PeerFilter): [AggregateData!]!
aggregateByHardforkSchedule(peerFilter: PeerFilter): [NextHardforkAggregation!]!
sadiq1971 marked this conversation as resolved.
Show resolved Hide resolved
aggregateByClientVersion(peerFilter: PeerFilter): [ClientVersionAggregation!]!
getHeatmapData(peerFilter: PeerFilter): [HeatmapData!]!
getNodeStats(peerFilter: PeerFilter): NodeStats!
getNodeStatsOverTime(start: Float!, end: Float!, peerFilter: PeerFilter): [NodeStatsOverTime!]!
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've tested all the other methods using peerFilter locally, but getNodeStatsOverTime always returned:

{
  "data": {
    "getNodeStatsOverTime": []
  }
}

before and after the peerFilter additions, so I'm not 100% sure this is working as expected

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checking

Copy link
Member

@sadiq1971 sadiq1971 Oct 12, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@spacesailor24, That's totally fine. getNodeStatsOverTime returns the count of nodes each day from the History table. A scheduler pushes data to the history table each day. So, ideally, you won't see anything with that query unless you run the crawler for at least a day. Change this to second and you will see data.

_, err = scheduler.AddFunc("@daily", c.insertToHistory)

Also, I have tested the APIs. Working fine with or without peerFilter.
So, with this implementation, if no filter is provided it returns all data.
That looks good to me. UI can filter based on the mainnet.

getRegionalStats(peerFilter: PeerFilter): RegionalStats!
getAltairUpgradePercentage(peerFilter: PeerFilter): Float!
}
46 changes: 23 additions & 23 deletions graph/schema.resolvers.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
)

// AggregateByAgentName is the resolver for the aggregateByAgentName field.
func (r *queryResolver) AggregateByAgentName(ctx context.Context) ([]*model.AggregateData, error) {
aggregateData, err := r.peerStore.AggregateByAgentName(ctx)
func (r *queryResolver) AggregateByAgentName(ctx context.Context, peerFilter *model.PeerFilter) ([]*model.AggregateData, error) {
aggregateData, err := r.peerStore.AggregateByAgentName(ctx, peerFilter)
if err != nil {
return nil, err
}
Expand All @@ -31,8 +31,8 @@ func (r *queryResolver) AggregateByAgentName(ctx context.Context) ([]*model.Aggr
}

// AggregateByCountry is the resolver for the aggregateByCountry field.
func (r *queryResolver) AggregateByCountry(ctx context.Context) ([]*model.AggregateData, error) {
aggregateData, err := r.peerStore.AggregateByCountry(ctx)
func (r *queryResolver) AggregateByCountry(ctx context.Context, peerFilter *model.PeerFilter) ([]*model.AggregateData, error) {
aggregateData, err := r.peerStore.AggregateByCountry(ctx, peerFilter)
if err != nil {
return nil, err
}
Expand All @@ -48,8 +48,8 @@ func (r *queryResolver) AggregateByCountry(ctx context.Context) ([]*model.Aggreg
}

// AggregateByOperatingSystem is the resolver for the aggregateByOperatingSystem field.
func (r *queryResolver) AggregateByOperatingSystem(ctx context.Context) ([]*model.AggregateData, error) {
aggregateData, err := r.peerStore.AggregateByOperatingSystem(ctx)
func (r *queryResolver) AggregateByOperatingSystem(ctx context.Context, peerFilter *model.PeerFilter) ([]*model.AggregateData, error) {
aggregateData, err := r.peerStore.AggregateByOperatingSystem(ctx, peerFilter)
if err != nil {
return nil, err
}
Expand All @@ -65,8 +65,8 @@ func (r *queryResolver) AggregateByOperatingSystem(ctx context.Context) ([]*mode
}

// AggregateByNetwork is the resolver for the aggregateByNetwork field.
func (r *queryResolver) AggregateByNetwork(ctx context.Context) ([]*model.AggregateData, error) {
aggregateData, err := r.peerStore.AggregateByNetworkType(ctx)
func (r *queryResolver) AggregateByNetwork(ctx context.Context, peerFilter *model.PeerFilter) ([]*model.AggregateData, error) {
aggregateData, err := r.peerStore.AggregateByNetworkType(ctx, peerFilter)
if err != nil {
return nil, err
}
Expand All @@ -81,8 +81,8 @@ func (r *queryResolver) AggregateByNetwork(ctx context.Context) ([]*model.Aggreg
}

// AggregateByHardforkSchedule is the resolver for the aggregateByHardforkSchedule field.
func (r *queryResolver) AggregateByHardforkSchedule(ctx context.Context) ([]*model.NextHardforkAggregation, error) {
allPeers, err := r.peerStore.ViewAll(ctx)
func (r *queryResolver) AggregateByHardforkSchedule(ctx context.Context, peerFilter *model.PeerFilter) ([]*model.NextHardforkAggregation, error) {
allPeers, err := r.peerStore.ViewAll(ctx, peerFilter)
if err != nil {
return nil, err
}
Expand All @@ -100,8 +100,8 @@ func (r *queryResolver) AggregateByHardforkSchedule(ctx context.Context) ([]*mod
}

// AggregateByClientVersion is the resolver for the aggregateByClientVersion field.
func (r *queryResolver) AggregateByClientVersion(ctx context.Context) ([]*model.ClientVersionAggregation, error) {
aggregateData, err := r.peerStore.AggregateByClientVersion(ctx)
func (r *queryResolver) AggregateByClientVersion(ctx context.Context, peerFilter *model.PeerFilter) ([]*model.ClientVersionAggregation, error) {
aggregateData, err := r.peerStore.AggregateByClientVersion(ctx, peerFilter)
if err != nil {
return nil, err
}
Expand All @@ -125,8 +125,8 @@ func (r *queryResolver) AggregateByClientVersion(ctx context.Context) ([]*model.
}

// GetHeatmapData is the resolver for the getHeatmapData field.
func (r *queryResolver) GetHeatmapData(ctx context.Context) ([]*model.HeatmapData, error) {
peers, err := r.peerStore.ViewAll(ctx)
func (r *queryResolver) GetHeatmapData(ctx context.Context, peerFilter *model.PeerFilter) ([]*model.HeatmapData, error) {
peers, err := r.peerStore.ViewAll(ctx, peerFilter)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -155,8 +155,8 @@ func (r *queryResolver) GetHeatmapData(ctx context.Context) ([]*model.HeatmapDat
}

// GetNodeStats is the resolver for the getNodeStats field.
func (r *queryResolver) GetNodeStats(ctx context.Context) (*model.NodeStats, error) {
aggregateData, err := r.peerStore.AggregateBySyncStatus(ctx)
func (r *queryResolver) GetNodeStats(ctx context.Context, peerFilter *model.PeerFilter) (*model.NodeStats, error) {
aggregateData, err := r.peerStore.AggregateBySyncStatus(ctx, peerFilter)
if err != nil {
return nil, err
}
Expand All @@ -168,8 +168,8 @@ func (r *queryResolver) GetNodeStats(ctx context.Context) (*model.NodeStats, err
}

// GetNodeStatsOverTime is the resolver for the getNodeStatsOverTime field.
func (r *queryResolver) GetNodeStatsOverTime(ctx context.Context, start float64, end float64) ([]*model.NodeStatsOverTime, error) {
data, err := r.historyStore.GetHistory(ctx, int64(start), int64(end))
func (r *queryResolver) GetNodeStatsOverTime(ctx context.Context, start float64, end float64, peerFilter *model.PeerFilter) ([]*model.NodeStatsOverTime, error) {
data, err := r.historyStore.GetHistory(ctx, int64(start), int64(end), peerFilter)
if err != nil {
return nil, err
}
Expand All @@ -186,13 +186,13 @@ func (r *queryResolver) GetNodeStatsOverTime(ctx context.Context, start float64,
}

// GetRegionalStats is the resolver for the getRegionalStats field.
func (r *queryResolver) GetRegionalStats(ctx context.Context) (*model.RegionalStats, error) {
countryAggrData, err := r.peerStore.AggregateByCountry(ctx)
func (r *queryResolver) GetRegionalStats(ctx context.Context, peerFilter *model.PeerFilter) (*model.RegionalStats, error) {
countryAggrData, err := r.peerStore.AggregateByCountry(ctx, peerFilter)
if err != nil {
return nil, err
}

networkAggrData, err := r.peerStore.AggregateByNetworkType(ctx)
networkAggrData, err := r.peerStore.AggregateByNetworkType(ctx, peerFilter)
if err != nil {
return nil, err
}
Expand All @@ -216,8 +216,8 @@ func (r *queryResolver) GetRegionalStats(ctx context.Context) (*model.RegionalSt
}

// GetAltairUpgradePercentage is the resolver for the getAltairUpgradePercentage field.
func (r *queryResolver) GetAltairUpgradePercentage(ctx context.Context) (float64, error) {
aggregateData, err := r.peerStore.AggregateByClientVersion(ctx)
func (r *queryResolver) GetAltairUpgradePercentage(ctx context.Context, peerFilter *model.PeerFilter) (float64, error) {
aggregateData, err := r.peerStore.AggregateByClientVersion(ctx, peerFilter)
if err != nil {
return 0, err
}
Expand Down
Loading