diff --git a/node/modules/lp2p/pubsub.go b/node/modules/lp2p/pubsub.go index 5376d93c8f8..33a03f844f0 100644 --- a/node/modules/lp2p/pubsub.go +++ b/node/modules/lp2p/pubsub.go @@ -394,6 +394,20 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) { transports = append(transports, jsonTransport) } + + tps := make([]string, 0) // range of topics that will be submitted to the traces + addTopicToList := func(topicList []string, newTopic string) []string { + // check if the topic is already in the list + for _, tp := range topicList { + if tp == newTopic { + return topicList + } + } + // add it otherwise + return append(topicList, newTopic) + } + + // tracer if in.Cfg.ElasticSearchTracer != "" { elasticSearchTransport, err := tracer.NewElasticSearchTransport( in.Cfg.ElasticSearchTracer, @@ -403,7 +417,9 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) { return nil, err } transports = append(transports, elasticSearchTransport) + tps = addTopicToList(tps, build.BlocksTopic(in.Nn)) } + lt := tracer.NewLotusTracer(transports, in.Host.ID(), in.Cfg.TracerSourceAuth) // tracer @@ -412,28 +428,25 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) { if err != nil { return nil, err } - pi, err := peer.AddrInfoFromP2pAddr(a) if err != nil { return nil, err } - tr, err := pubsub.NewRemoteTracer(context.TODO(), in.Host, *pi) if err != nil { return nil, err } - pst := newPeerScoreTracker(lt, in.Sk) - trw := newTracerWrapper(tr, lt, build.BlocksTopic(in.Nn)) + trw := newTracerWrapper(tr, lt, tps...) options = append(options, pubsub.WithEventTracer(trw)) options = append(options, pubsub.WithPeerScoreInspect(pst.UpdatePeerScore, 10*time.Second)) } else { // still instantiate a tracer for collecting metrics - trw := newTracerWrapper(nil, lt) - options = append(options, pubsub.WithEventTracer(trw)) - pst := newPeerScoreTracker(lt, in.Sk) + trw := newTracerWrapper(nil, lt, tps...) 
+ + options = append(options, pubsub.WithEventTracer(trw)) options = append(options, pubsub.WithPeerScoreInspect(pst.UpdatePeerScore, 10*time.Second)) } @@ -457,7 +470,6 @@ func newTracerWrapper( topicsMap[topic] = struct{}{} } } - return &tracerWrapper{lp2pTracer: lp2pTracer, lotusTracer: lotusTracer, topics: topicsMap} } @@ -486,71 +498,164 @@ func (trw *tracerWrapper) Trace(evt *pubsub_pb.TraceEvent) { if trw.lp2pTracer != nil { trw.lp2pTracer.Trace(evt) } - if trw.lotusTracer != nil { trw.lotusTracer.Trace(evt) } } + case pubsub_pb.TraceEvent_DELIVER_MESSAGE: stats.Record(context.TODO(), metrics.PubsubDeliverMessage.M(1)) - if trw.traceMessage(evt.GetDeliverMessage().GetTopic()) { - if trw.lp2pTracer != nil { - trw.lp2pTracer.Trace(evt) - } - if trw.lotusTracer != nil { - trw.lotusTracer.Trace(evt) - } - } case pubsub_pb.TraceEvent_REJECT_MESSAGE: stats.Record(context.TODO(), metrics.PubsubRejectMessage.M(1)) if trw.traceMessage(evt.GetRejectMessage().GetTopic()) { if trw.lp2pTracer != nil { trw.lp2pTracer.Trace(evt) } - if trw.lotusTracer != nil { trw.lotusTracer.Trace(evt) } } + case pubsub_pb.TraceEvent_DUPLICATE_MESSAGE: stats.Record(context.TODO(), metrics.PubsubDuplicateMessage.M(1)) - case pubsub_pb.TraceEvent_JOIN: - if trw.lp2pTracer != nil { - trw.lp2pTracer.Trace(evt) + if trw.traceMessage(evt.GetDuplicateMessage().GetTopic()) { + if trw.lp2pTracer != nil { + trw.lp2pTracer.Trace(evt) + } + if trw.lotusTracer != nil { + trw.lotusTracer.Trace(evt) + } } - if trw.lotusTracer != nil { - trw.lotusTracer.Trace(evt) + case pubsub_pb.TraceEvent_JOIN: + if trw.traceMessage(evt.GetJoin().GetTopic()) { + if trw.lp2pTracer != nil { + trw.lp2pTracer.Trace(evt) + } + if trw.lotusTracer != nil { + trw.lotusTracer.Trace(evt) + } } case pubsub_pb.TraceEvent_LEAVE: - if trw.lp2pTracer != nil { - trw.lp2pTracer.Trace(evt) + if trw.traceMessage(evt.GetLeave().GetTopic()) { + if trw.lp2pTracer != nil { + trw.lp2pTracer.Trace(evt) + } + if trw.lotusTracer != nil { + 
trw.lotusTracer.Trace(evt) + } } - if trw.lotusTracer != nil { - trw.lotusTracer.Trace(evt) - } case pubsub_pb.TraceEvent_GRAFT: + if trw.traceMessage(evt.GetGraft().GetTopic()) { + if trw.lp2pTracer != nil { + trw.lp2pTracer.Trace(evt) + } + + if trw.lotusTracer != nil { + trw.lotusTracer.Trace(evt) + } + } + + case pubsub_pb.TraceEvent_PRUNE: + if trw.traceMessage(evt.GetPrune().GetTopic()) { + if trw.lp2pTracer != nil { + trw.lp2pTracer.Trace(evt) + } + if trw.lotusTracer != nil { + trw.lotusTracer.Trace(evt) + } + } + + case pubsub_pb.TraceEvent_ADD_PEER: if trw.lp2pTracer != nil { trw.lp2pTracer.Trace(evt) } - if trw.lotusTracer != nil { trw.lotusTracer.Trace(evt) } - case pubsub_pb.TraceEvent_PRUNE: + + case pubsub_pb.TraceEvent_REMOVE_PEER: if trw.lp2pTracer != nil { trw.lp2pTracer.Trace(evt) } - if trw.lotusTracer != nil { trw.lotusTracer.Trace(evt) } + case pubsub_pb.TraceEvent_RECV_RPC: stats.Record(context.TODO(), metrics.PubsubRecvRPC.M(1)) + // only track the RPC Calls from IWANT / IHAVE / BLOCK topic + controlRPC := evt.GetRecvRPC().GetMeta().GetControl() + ihave := controlRPC.GetIhave() + iwant := controlRPC.GetIwant() + msgsRPC := evt.GetRecvRPC().GetMeta().GetMessages() + + // check if any of the messages we are sending belong to a trackable topic + var validTopic bool = false + for _, topic := range msgsRPC { + if trw.traceMessage(topic.GetTopic()) { + validTopic = true + break + } + } + // track if the Iwant / Ihave messages are from a valid Topic + var validIhave bool = false + for _, msgs := range ihave { + if trw.traceMessage(msgs.GetTopic()) { + validIhave = true + break + } + } + // check if we have any of iwant msgs (it doesn't classify per topic - just msg.ID) + validIwant := len(iwant) > 0 + + // trace the event if any of the flags was triggered + if validIhave || validIwant || validTopic { + if trw.lp2pTracer != nil { + trw.lp2pTracer.Trace(evt) + } + if trw.lotusTracer != nil { + trw.lotusTracer.Trace(evt) + } + } case 
pubsub_pb.TraceEvent_SEND_RPC: stats.Record(context.TODO(), metrics.PubsubSendRPC.M(1)) + // only track the RPC Calls from IWANT / IHAVE / BLOCK topic + controlRPC := evt.GetSendRPC().GetMeta().GetControl() + ihave := controlRPC.GetIhave() + iwant := controlRPC.GetIwant() + msgsRPC := evt.GetSendRPC().GetMeta().GetMessages() + + // check if any of the messages we are sending belong to a trackable topic + var validTopic bool = false + for _, topic := range msgsRPC { + if trw.traceMessage(topic.GetTopic()) { + validTopic = true + break + } + } + // track if the Iwant / Ihave messages are from a valid Topic + var validIhave bool = false + for _, msgs := range ihave { + if trw.traceMessage(msgs.GetTopic()) { + validIhave = true + break + } + } + // check if there was any of the Iwant msgs + validIwant := len(iwant) > 0 + + // trace the msgs if any of the flags was triggered + if validIhave || validIwant || validTopic { + if trw.lp2pTracer != nil { + trw.lp2pTracer.Trace(evt) + } + if trw.lotusTracer != nil { + trw.lotusTracer.Trace(evt) + } + } case pubsub_pb.TraceEvent_DROP_RPC: stats.Record(context.TODO(), metrics.PubsubDropRPC.M(1)) } diff --git a/node/modules/tracer/elasticsearch_transport.go b/node/modules/tracer/elasticsearch_transport.go index 1f6f9a15703..e54e0eba241 100644 --- a/node/modules/tracer/elasticsearch_transport.go +++ b/node/modules/tracer/elasticsearch_transport.go @@ -1,18 +1,23 @@ package tracer import ( + "bytes" "context" "encoding/json" "fmt" + "net/http" "net/url" - "strings" + "time" "github.com/elastic/go-elasticsearch/v7" - "github.com/elastic/go-elasticsearch/v7/esapi" + "github.com/elastic/go-elasticsearch/v7/esutil" ) const ( ElasticSearchDefaultIndex = "lotus-pubsub" + flushInterval = 10 * time.Second + flushBytes = 1024 * 1024 // MB + esWorkers = 2 // TODO: hardcoded ) func NewElasticSearchTransport(connectionString string, elasticsearchIndex string) (TracerTransport, error) { @@ -28,12 +33,12 @@ func 
NewElasticSearchTransport(connectionString string, elasticsearchIndex strin Addresses: []string{ conUrl.Scheme + "://" + conUrl.Host, }, - Username: username, - Password: password, + Username: username, + Password: password, + Transport: &http.Transport{}, } es, err := elasticsearch.NewClient(cfg) - if err != nil { return nil, err } @@ -45,14 +50,31 @@ func NewElasticSearchTransport(connectionString string, elasticsearchIndex strin esIndex = ElasticSearchDefaultIndex } + // Create the BulkIndexer to batch ES trace submission + bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ + Index: esIndex, + Client: es, + NumWorkers: esWorkers, + FlushBytes: int(flushBytes), + FlushInterval: flushInterval, + OnError: func(ctx context.Context, err error) { + log.Errorf("Error persisting queries %s", err.Error()) + }, + }) + if err != nil { + return nil, err + } + return &elasticSearchTransport{ cl: es, + bi: bi, esIndex: esIndex, }, nil } type elasticSearchTransport struct { cl *elasticsearch.Client + bi esutil.BulkIndexer esIndex string } @@ -72,26 +94,18 @@ func (est *elasticSearchTransport) Transport(evt TracerTransportEvent) error { return fmt.Errorf("error while marshaling event: %s", err) } - req := esapi.IndexRequest{ - Index: est.esIndex, - Body: strings.NewReader(string(jsonEvt)), - Refresh: "true", - } - - // Perform the request with the client. 
- res, err := req.Do(context.Background(), est.cl) - if err != nil { - return err - } - - err = res.Body.Close() - if err != nil { - return err - } - - if res.IsError() { - return fmt.Errorf("[%s] Error indexing document ID=%s", res.Status(), req.DocumentID) - } - - return nil + return est.bi.Add( + context.Background(), + esutil.BulkIndexerItem{ + Action: "index", + Body: bytes.NewReader(jsonEvt), + OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) { + if err != nil { + log.Errorf("unable to submit trace - %s", err) + } else { + log.Errorf("unable to submit trace %s: %s", res.Error.Type, res.Error.Reason) + } + }, + }, + ) }