Skip to content

Commit

Permalink
feat: expose elasticsearch action results as metric
Browse files Browse the repository at this point in the history
  • Loading branch information
uatmaca committed Oct 29, 2024
1 parent 9cf4be8 commit 5d812d5
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 17 deletions.
13 changes: 7 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ used for both connectors.

| Package | Time to Process Events | Elasticsearch Indexing Rate(/s) | Average CPU Usage(Core) | Average Memory Usage |
|:--------------------------------------------|:----------------------:|:-------------------------------:|:-----------------------:|:--------------------:|
| **Go Dcp Elasticsearch**(Go 1.20) | **50s** | ![go](./benchmark/go.png) | **0.486** | **408MB**
| Java Elasticsearch Connect Couchbase(JDK15) | 80s | ![go](./benchmark/java.png) | 0.31 | 1091MB
| **Go Dcp Elasticsearch**(Go 1.20) | **50s** | ![go](./benchmark/go.png) | **0.486** | **408MB** |
| Java Elasticsearch Connect Couchbase(JDK15) | 80s | ![go](./benchmark/java.png) | 0.31 | 1091MB |

## Example

Expand Down Expand Up @@ -121,10 +121,11 @@ Check out on [go-dcp](https://github.com/Trendyol/go-dcp#configuration)

## Exposed metrics

| Metric Name | Description | Labels | Value Type |
|---------------------------------------------------------|-------------------------------|--------|------------|
| elasticsearch_connector_latency_ms | Time to adding to the batch. | N/A | Gauge |
| elasticsearch_connector_bulk_request_process_latency_ms | Time to process bulk request. | N/A | Gauge |
| Metric Name | Description | Labels | Value Type |
|---------------------------------------------------------|-------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------|
| elasticsearch_connector_latency_ms                      | Time to add to the batch.     | N/A                                                                                                                                                                                     | Gauge      |
| elasticsearch_connector_bulk_request_process_latency_ms | Time to process bulk request. | N/A | Gauge |
| elasticsearch_connector_action_total                    | Count of Elasticsearch actions. | `action_type`: type of the action (e.g., `delete`, `index`); `result`: result of the action (e.g., `success`, `error`); `index_name`: name of the index to which the action is applied | Counter    |

You can also use all DCP-related metrics explained [here](https://github.com/Trendyol/go-dcp#exposed-metrics).
All DCP-related metrics are injected automatically, so no extra configuration is needed.
Expand Down
49 changes: 38 additions & 11 deletions elasticsearch/bulk/bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,12 @@ type Bulk struct {
}

// Metric holds the connector's internal measurements that are exposed
// through the Prometheus collector.
type Metric struct {
	// ProcessLatencyMs is the time taken to add an event to the batch.
	ProcessLatencyMs int64
	// BulkRequestProcessLatencyMs is the time taken to process a bulk request.
	BulkRequestProcessLatencyMs int64
	// IndexingSuccessActionCounter counts successful index actions, keyed by index name.
	IndexingSuccessActionCounter map[string]int64
	// IndexingErrorActionCounter counts failed index actions, keyed by index name.
	IndexingErrorActionCounter map[string]int64
	// DeletionSuccessActionCounter counts successful delete actions, keyed by index name.
	DeletionSuccessActionCounter map[string]int64
	// DeletionErrorActionCounter counts failed delete actions, keyed by index name.
	DeletionErrorActionCounter map[string]int64
}

type BatchItem struct {
Expand All @@ -71,15 +75,20 @@ func NewBulk(
}

bulk := &Bulk{
batchTickerDuration: config.Elasticsearch.BatchTickerDuration,
batchTicker: time.NewTicker(config.Elasticsearch.BatchTickerDuration),
actionCh: make(chan document.ESActionDocument, config.Elasticsearch.BatchSizeLimit),
batchSizeLimit: config.Elasticsearch.BatchSizeLimit,
batchByteSizeLimit: helpers.ResolveUnionIntOrStringValue(config.Elasticsearch.BatchByteSizeLimit),
isClosed: make(chan bool, 1),
dcpCheckpointCommit: dcpCheckpointCommit,
esClient: esClient,
metric: &Metric{},
batchTickerDuration: config.Elasticsearch.BatchTickerDuration,
batchTicker: time.NewTicker(config.Elasticsearch.BatchTickerDuration),
actionCh: make(chan document.ESActionDocument, config.Elasticsearch.BatchSizeLimit),
batchSizeLimit: config.Elasticsearch.BatchSizeLimit,
batchByteSizeLimit: helpers.ResolveUnionIntOrStringValue(config.Elasticsearch.BatchByteSizeLimit),
isClosed: make(chan bool, 1),
dcpCheckpointCommit: dcpCheckpointCommit,
esClient: esClient,
metric: &Metric{
IndexingSuccessActionCounter: make(map[string]int64),
IndexingErrorActionCounter: make(map[string]int64),
DeletionSuccessActionCounter: make(map[string]int64),
DeletionErrorActionCounter: make(map[string]int64),
},
collectionIndexMapping: config.Elasticsearch.CollectionIndexMapping,
config: config,
typeName: helper.Byte(config.Elasticsearch.TypeName),
Expand Down Expand Up @@ -379,18 +388,36 @@ func (b *Bulk) executeSinkResponseHandler(batchActions []*document.ESActionDocum
for _, action := range batchActions {
key := getActionKey(*action)
if _, ok := errorData[key]; ok {
b.countError(action)
b.sinkResponseHandler.OnError(&dcpElasticsearch.SinkResponseHandlerContext{
Action: action,
Err: fmt.Errorf(errorData[key]),
})
} else {
b.countSuccess(action)
b.sinkResponseHandler.OnSuccess(&dcpElasticsearch.SinkResponseHandlerContext{
Action: action,
})
}
}
}

// countError records a failed action in the per-index error counter that
// matches the action's type. Actions of any other type are ignored.
// NOTE(review): these counter maps look like they are read concurrently by
// the metric collector — confirm access is synchronized.
func (b *Bulk) countError(action *document.ESActionDocument) {
	switch action.Type {
	case document.Index:
		b.metric.IndexingErrorActionCounter[action.IndexName]++
	case document.Delete:
		b.metric.DeletionErrorActionCounter[action.IndexName]++
	}
}

// countSuccess records a successful action in the per-index success counter
// that matches the action's type. Actions of any other type are ignored.
// NOTE(review): these counter maps look like they are read concurrently by
// the metric collector — confirm access is synchronized.
func (b *Bulk) countSuccess(action *document.ESActionDocument) {
	switch action.Type {
	case document.Index:
		b.metric.IndexingSuccessActionCounter[action.IndexName]++
	case document.Delete:
		b.metric.DeletionSuccessActionCounter[action.IndexName]++
	}
}

func getActionKey(action document.ESActionDocument) string {
if action.Routing != nil {
return fmt.Sprintf("%s:%s:%s", action.ID, action.IndexName, *action.Routing)
Expand Down
44 changes: 44 additions & 0 deletions metric/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type Collector struct {

processLatency *prometheus.Desc
bulkRequestProcessLatency *prometheus.Desc
actionCounter *prometheus.Desc
}

func (s *Collector) Describe(ch chan<- *prometheus.Desc) {
Expand All @@ -33,6 +34,42 @@ func (s *Collector) Collect(ch chan<- prometheus.Metric) {
float64(bulkMetric.BulkRequestProcessLatencyMs),
[]string{}...,
)

for indexName, count := range bulkMetric.IndexingSuccessActionCounter {
ch <- prometheus.MustNewConstMetric(
s.actionCounter,
prometheus.CounterValue,
float64(count),
"index", "success", indexName,
)
}

for indexName, count := range bulkMetric.IndexingErrorActionCounter {
ch <- prometheus.MustNewConstMetric(
s.actionCounter,
prometheus.CounterValue,
float64(count),
"index", "error", indexName,
)
}

for indexName, count := range bulkMetric.DeletionSuccessActionCounter {
ch <- prometheus.MustNewConstMetric(
s.actionCounter,
prometheus.CounterValue,
float64(count),
"delete", "success", indexName,
)
}

for indexName, count := range bulkMetric.DeletionErrorActionCounter {
ch <- prometheus.MustNewConstMetric(
s.actionCounter,
prometheus.CounterValue,
float64(count),
"delete", "error", indexName,
)
}
}

func NewMetricCollector(bulk *bulk.Bulk) *Collector {
Expand All @@ -52,5 +89,12 @@ func NewMetricCollector(bulk *bulk.Bulk) *Collector {
[]string{},
nil,
),

actionCounter: prometheus.NewDesc(
prometheus.BuildFQName(helpers.Name, "elasticsearch_connector_action_total", "current"),
"Elasticsearch connector action counter",
[]string{"action_type", "result", "index_name"},
nil,
),
}
}

0 comments on commit 5d812d5

Please sign in to comment.