From 710d6edeef02395c477108878ab72b6290b0ac76 Mon Sep 17 00:00:00 2001 From: Eray Arslan Date: Wed, 8 May 2024 16:24:24 +0300 Subject: [PATCH] feat: mapper process latency metric added --- connector.go | 18 +++++++++++++++++- metric/collector.go | 23 ++++++++++++++++++++--- 2 files changed, 37 insertions(+), 4 deletions(-) diff --git a/connector.go b/connector.go index b64b256..bd531bc 100644 --- a/connector.go +++ b/connector.go @@ -3,6 +3,7 @@ package dcpcouchbase import ( "errors" "os" + "time" "github.com/Trendyol/go-dcp/helpers" @@ -24,6 +25,7 @@ import ( type Connector interface { Start() Close() + GetMapperProcessLatencyMs() int64 } type connector struct { @@ -32,6 +34,11 @@ type connector struct { mapper Mapper processor *couchbase.Processor targetClient couchbase.TargetClient + metric *Metric +} + +type Metric struct { + MapperProcessLatencyMs int64 } func (c *connector) Start() { @@ -47,6 +54,10 @@ func (c *connector) Close() { c.processor.Close() } +func (c *connector) GetMapperProcessLatencyMs() int64 { + return c.metric.MapperProcessLatencyMs +} + func (c *connector) listener(ctx *models.ListenerContext) { var e couchbase.Event switch event := ctx.Event.(type) { @@ -69,6 +80,8 @@ func (c *connector) listener(ctx *models.ListenerContext) { return } + beforeMapperTime := time.Now() + actions := c.mapper( couchbase.EventContext{ TargetClient: c.targetClient, @@ -76,6 +89,8 @@ func (c *connector) listener(ctx *models.ListenerContext) { }, ) + c.metric.MapperProcessLatencyMs = time.Since(beforeMapperTime).Milliseconds() + if len(actions) == 0 { ctx.Ack() return @@ -114,6 +129,7 @@ func newConnector(cf any, mapper Mapper, sinkResponseHandler couchbase.SinkRespo connector := &connector{ mapper: mapper, config: cfg, + metric: &Metric{}, } if err != nil { @@ -158,7 +174,7 @@ func newConnector(cf any, mapper Mapper, sinkResponseHandler couchbase.SinkRespo processor: connector.processor, }) - metricCollector := metric.NewMetricCollector(connector.processor) + metricCollector := metric.NewMetricCollector(connector.processor, connector.GetMapperProcessLatencyMs) dcp.SetMetricCollectors(metricCollector) return connector, nil diff --git a/metric/collector.go b/metric/collector.go index 18f3b44..f3d11ae 100644 --- a/metric/collector.go +++ b/metric/collector.go @@ -7,9 +7,11 @@ import ( ) type Collector struct { - processor *couchbase.Processor + processor *couchbase.Processor + getMapperProcessLatencyMs func() int64 processLatency *prometheus.Desc + mapperProcessLatency *prometheus.Desc bulkRequestProcessLatency *prometheus.Desc bulkRequestSize *prometheus.Desc bulkRequestByteSize *prometheus.Desc @@ -29,6 +31,13 @@ func (s *Collector) Collect(ch chan<- prometheus.Metric) { []string{}..., ) + ch <- prometheus.MustNewConstMetric( + s.mapperProcessLatency, + prometheus.GaugeValue, + float64(s.getMapperProcessLatencyMs()), + []string{}..., + ) + ch <- prometheus.MustNewConstMetric( s.bulkRequestProcessLatency, prometheus.GaugeValue, @@ -51,9 +60,10 @@ func (s *Collector) Collect(ch chan<- prometheus.Metric) { ) } -func NewMetricCollector(processor *couchbase.Processor) *Collector { +func NewMetricCollector(processor *couchbase.Processor, getMapperProcessLatencyMs func() int64) *Collector { return &Collector{ - processor: processor, + processor: processor, + getMapperProcessLatencyMs: getMapperProcessLatencyMs, processLatency: prometheus.NewDesc( prometheus.BuildFQName(helpers.Name, "couchbase_connector_latency_ms", "current"), @@ -62,6 +72,13 @@ func NewMetricCollector(processor *couchbase.Processor) *Collector { nil, ), + mapperProcessLatency: prometheus.NewDesc( + prometheus.BuildFQName(helpers.Name, "couchbase_connector_mapper_latency_ms", "current"), + "Couchbase connector mapper latency ms", + []string{}, + nil, + ), + bulkRequestProcessLatency: prometheus.NewDesc( prometheus.BuildFQName(helpers.Name, "couchbase_connector_bulk_request_process_latency_ms", "current"), "Couchbase connector bulk request process latency ms",