Skip to content

Commit

Permalink
Merge pull request #9 from ChainSafe/mpetrun5/lotus-extended-tracer
Browse files Browse the repository at this point in the history
Lotus extended pubsub tracer
  • Loading branch information
Matija Petrunić authored Sep 28, 2021
2 parents d61612e + 1e9a089 commit 58afade
Show file tree
Hide file tree
Showing 14 changed files with 407 additions and 19 deletions.
20 changes: 20 additions & 0 deletions cmd/lotus/daemon.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build !nodaemon
// +build !nodaemon

package main
Expand Down Expand Up @@ -153,6 +154,18 @@ var DaemonCmd = &cli.Command{
Name: "restore-config",
Usage: "config file to use when restoring from backup",
},
&cli.StringFlag{
Name: "trace-to-json",
Usage: "starts tracer and outputs to json file defined with this flag",
},
&cli.StringFlag{
Name: "trace-to-elasticsearch",
Usage: "starts tracer and outputs to elasticsearch, flag must contain connection string for elasticsearch",
},
&cli.StringFlag{
Name: "trace-source-auth",
Usage: "auth token for trusted source of traces",
},
},
Action: func(cctx *cli.Context) error {
isLite := cctx.Bool("lite")
Expand Down Expand Up @@ -310,6 +323,10 @@ var DaemonCmd = &cli.Command{
log.Warnf("unable to inject prometheus ipfs/go-metrics exporter; some metrics will be unavailable; err: %s", err)
}

traceToJsonFile := cctx.String("trace-to-json")
traceToElasticsearch := cctx.String("trace-to-elasticsearch")
traceSourceAuth := cctx.String("trace-source-auth")

var api api.FullNode
stop, err := node.New(ctx,
node.FullAPI(&api, node.Lite(isLite)),
Expand All @@ -319,6 +336,9 @@ var DaemonCmd = &cli.Command{

node.Override(new(dtypes.Bootstrapper), isBootstrapper),
node.Override(new(dtypes.ShutdownChan), shutdownChan),
node.Override(new(dtypes.JsonTracer), traceToJsonFile),
node.Override(new(dtypes.ElasticSearchTracer), traceToElasticsearch),
node.Override(new(dtypes.TracerSourceAuth), traceSourceAuth),

genesis,
liteModeDeps,
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ require (
github.com/drand/drand v1.2.1
github.com/drand/kyber v1.1.4
github.com/dustin/go-humanize v1.0.0
github.com/elastic/go-elasticsearch/v7 v7.14.0
github.com/elastic/go-sysinfo v1.3.0
github.com/elastic/gosigar v0.12.0
github.com/etclabscore/go-openrpc-reflect v0.0.36
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,8 @@ github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5m
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
github.com/elastic/go-elasticsearch/v7 v7.14.0 h1:extp3jos/rwJn3J+lgbaGlwAgs0TVsIHme00GyNAyX4=
github.com/elastic/go-elasticsearch/v7 v7.14.0/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4=
github.com/elastic/go-sysinfo v1.3.0 h1:eb2XFGTMlSwG/yyU9Y8jVAYLIzU2sFzWXwo2gmetyrE=
github.com/elastic/go-sysinfo v1.3.0/go.mod h1:i1ZYdU10oLNfRzq4vq62BEwD2fH8KaWh6eh0ikPT9F0=
github.com/elastic/go-windows v1.0.0 h1:qLURgZFkkrYyTTkvYpsZIgf83AUsdIHfvlJaqaZ7aSY=
Expand Down
18 changes: 18 additions & 0 deletions node/config/doc_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions node/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,10 @@ type Pubsub struct {
DirectPeers []string
IPColocationWhitelist []string
RemoteTracer string
JsonTracer string
ElasticSearchTracer string
ElasticSearchIndex string
TracerSourceAuth string
}

type Chainstore struct {
Expand Down
5 changes: 5 additions & 0 deletions node/modules/dtypes/tracer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package dtypes

type JsonTracer string
type ElasticSearchTracer string
type TracerSourceAuth string
122 changes: 103 additions & 19 deletions node/modules/lp2p/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/modules/helpers"
"github.com/filecoin-project/lotus/node/modules/tracer"
)

func init() {
Expand Down Expand Up @@ -49,6 +50,30 @@ func ScoreKeeper() *dtypes.ScoreKeeper {
return new(dtypes.ScoreKeeper)
}

type PeerScoreTracker interface {
UpdatePeerScore(scores map[peer.ID]*pubsub.PeerScoreSnapshot)
}

type peerScoreTracker struct {
sk *dtypes.ScoreKeeper
lt tracer.LotusTracer
}

func newPeerScoreTracker(lt tracer.LotusTracer, sk *dtypes.ScoreKeeper) PeerScoreTracker {
return &peerScoreTracker{
sk: sk,
lt: lt,
}
}

func (pst *peerScoreTracker) UpdatePeerScore(scores map[peer.ID]*pubsub.PeerScoreSnapshot) {
if pst.lt != nil {
pst.lt.PeerScores(scores)
}

pst.sk.Update(scores)
}

type GossipIn struct {
fx.In
Mctx helpers.MetricsCtx
Expand Down Expand Up @@ -272,7 +297,6 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) {
OpportunisticGraftThreshold: OpportunisticGraftScoreThreshold,
},
),
pubsub.WithPeerScoreInspect(in.Sk.Update, 10*time.Second),
}

// enable Peer eXchange on bootstrappers
Expand Down Expand Up @@ -341,6 +365,27 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) {
pubsub.NewAllowlistSubscriptionFilter(allowTopics...),
100)))

var transports []tracer.TracerTransport
if in.Cfg.JsonTracer != "" {
jsonTransport, err := tracer.NewJsonTracerTransport(in.Cfg.JsonTracer)
if err != nil {
return nil, err
}

transports = append(transports, jsonTransport)
}
if in.Cfg.ElasticSearchTracer != "" {
elasticSearchTransport, err := tracer.NewElasticSearchTransport(
in.Cfg.ElasticSearchTracer,
in.Cfg.ElasticSearchIndex,
)
if err != nil {
return nil, err
}
transports = append(transports, elasticSearchTransport)
}
lt := tracer.NewLotusTracer(transports, in.Host.ID(), in.Cfg.TracerSourceAuth)

// tracer
if in.Cfg.RemoteTracer != "" {
a, err := ma.NewMultiaddr(in.Cfg.RemoteTracer)
Expand All @@ -358,12 +403,18 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) {
return nil, err
}

trw := newTracerWrapper(tr, build.BlocksTopic(in.Nn))
pst := newPeerScoreTracker(lt, in.Sk)
trw := newTracerWrapper(tr, lt, build.BlocksTopic(in.Nn))

options = append(options, pubsub.WithEventTracer(trw))
options = append(options, pubsub.WithPeerScoreInspect(pst.UpdatePeerScore, 10*time.Second))
} else {
// still instantiate a tracer for collecting metrics
trw := newTracerWrapper(nil)
trw := newTracerWrapper(nil, lt)
options = append(options, pubsub.WithEventTracer(trw))

pst := newPeerScoreTracker(lt, in.Sk)
options = append(options, pubsub.WithPeerScoreInspect(pst.UpdatePeerScore, 10*time.Second))
}

return pubsub.NewGossipSub(helpers.LifecycleCtx(in.Mctx, in.Lc), in.Host, options...)
Expand All @@ -374,7 +425,11 @@ func HashMsgId(m *pubsub_pb.Message) string {
return string(hash[:])
}

func newTracerWrapper(tr pubsub.EventTracer, topics ...string) pubsub.EventTracer {
func newTracerWrapper(
lp2pTracer pubsub.EventTracer,
lotusTracer pubsub.EventTracer,
topics ...string,
) pubsub.EventTracer {
var topicsMap map[string]struct{}
if len(topics) > 0 {
topicsMap = make(map[string]struct{})
Expand All @@ -383,12 +438,13 @@ func newTracerWrapper(tr pubsub.EventTracer, topics ...string) pubsub.EventTrace
}
}

return &tracerWrapper{tr: tr, topics: topicsMap}
return &tracerWrapper{lp2pTracer: lp2pTracer, lotusTracer: lotusTracer, topics: topicsMap}
}

type tracerWrapper struct {
tr pubsub.EventTracer
topics map[string]struct{}
lp2pTracer pubsub.EventTracer
lotusTracer pubsub.EventTracer
topics map[string]struct{}
}

func (trw *tracerWrapper) traceMessage(topic string) bool {
Expand All @@ -406,33 +462,61 @@ func (trw *tracerWrapper) Trace(evt *pubsub_pb.TraceEvent) {
switch evt.GetType() {
case pubsub_pb.TraceEvent_PUBLISH_MESSAGE:
stats.Record(context.TODO(), metrics.PubsubPublishMessage.M(1))
if trw.tr != nil && trw.traceMessage(evt.GetPublishMessage().GetTopic()) {
trw.tr.Trace(evt)
if trw.traceMessage(evt.GetPublishMessage().GetTopic()) {
if trw.lp2pTracer != nil {
trw.lp2pTracer.Trace(evt)
}

if trw.lotusTracer != nil {
trw.lotusTracer.Trace(evt)
}
}
case pubsub_pb.TraceEvent_DELIVER_MESSAGE:
stats.Record(context.TODO(), metrics.PubsubDeliverMessage.M(1))
if trw.tr != nil && trw.traceMessage(evt.GetDeliverMessage().GetTopic()) {
trw.tr.Trace(evt)
if trw.traceMessage(evt.GetDeliverMessage().GetTopic()) {
if trw.lp2pTracer != nil {
trw.lp2pTracer.Trace(evt)
}

if trw.lotusTracer != nil {
trw.lotusTracer.Trace(evt)
}
}
case pubsub_pb.TraceEvent_REJECT_MESSAGE:
stats.Record(context.TODO(), metrics.PubsubRejectMessage.M(1))
case pubsub_pb.TraceEvent_DUPLICATE_MESSAGE:
stats.Record(context.TODO(), metrics.PubsubDuplicateMessage.M(1))
case pubsub_pb.TraceEvent_JOIN:
if trw.tr != nil {
trw.tr.Trace(evt)
if trw.lp2pTracer != nil {
trw.lp2pTracer.Trace(evt)
}

if trw.lotusTracer != nil {
trw.lotusTracer.Trace(evt)
}
case pubsub_pb.TraceEvent_LEAVE:
if trw.tr != nil {
trw.tr.Trace(evt)
if trw.lp2pTracer != nil {
trw.lp2pTracer.Trace(evt)
}

if trw.lotusTracer != nil {
trw.lotusTracer.Trace(evt)
}
case pubsub_pb.TraceEvent_GRAFT:
if trw.tr != nil {
trw.tr.Trace(evt)
if trw.lp2pTracer != nil {
trw.lp2pTracer.Trace(evt)
}

if trw.lotusTracer != nil {
trw.lotusTracer.Trace(evt)
}
case pubsub_pb.TraceEvent_PRUNE:
if trw.tr != nil {
trw.tr.Trace(evt)
if trw.lp2pTracer != nil {
trw.lp2pTracer.Trace(evt)
}

if trw.lotusTracer != nil {
trw.lotusTracer.Trace(evt)
}
case pubsub_pb.TraceEvent_RECV_RPC:
stats.Record(context.TODO(), metrics.PubsubRecvRPC.M(1))
Expand Down
92 changes: 92 additions & 0 deletions node/modules/tracer/elasticsearch_transport.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package tracer

import (
"context"
"encoding/json"
"fmt"
"net/url"
"strings"

"github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
)

const (
ElasticSearch_INDEX_DEFAULT = "lotus-pubsub"
)

func NewElasticSearchTransport(connectionString string, elasticsearchIndex string) (TracerTransport, error) {
conUrl, err := url.Parse(connectionString)

if err != nil {
return nil, err
}

username := conUrl.User.Username()
password, _ := conUrl.User.Password()
cfg := elasticsearch.Config{
Addresses: []string{
conUrl.Scheme + "://" + conUrl.Host,
},
Username: username,
Password: password,
}

es, err := elasticsearch.NewClient(cfg)

if err != nil {
return nil, err
}

var esIndex string
if elasticsearchIndex != "" {
esIndex = elasticsearchIndex
} else {
esIndex = ElasticSearch_INDEX_DEFAULT
}

return &elasticSearchTransport{
cl: es,
esIndex: esIndex,
}, nil
}

type elasticSearchTransport struct {
cl *elasticsearch.Client
esIndex string
}

func (est *elasticSearchTransport) Transport(evt TracerTransportEvent) error {
var e interface{}

if evt.lotusTraceEvent != nil {
e = *evt.lotusTraceEvent
} else if evt.pubsubTraceEvent != nil {
e = *evt.pubsubTraceEvent
} else {
return nil
}

jsonEvt, err := json.Marshal(e)
if err != nil {
return fmt.Errorf("error while marshaling event: %s", err)
}

req := esapi.IndexRequest{
Index: est.esIndex,
Body: strings.NewReader(string(jsonEvt)),
Refresh: "true",
}

// Perform the request with the client.
res, err := req.Do(context.Background(), est.cl)
if err != nil {
return err
}
defer res.Body.Close()

if res.IsError() {
return fmt.Errorf("[%s] Error indexing document ID=%s", res.Status(), req.DocumentID)
}
return nil
}
Loading

0 comments on commit 58afade

Please sign in to comment.