Skip to content

Commit

Permalink
[exporter/doris] Send json lines to doris rather than json array (#36912
Browse files Browse the repository at this point in the history
)

<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
#### Description 
Send json lines to doris rather than json array.

<!-- Issue number (e.g. #1234) or full URL to issue, if applicable. -->
#### Link to tracking issue
Fixes #36896

<!--Describe what testing was performed and which tests were added.-->
#### Testing
unit test

<!--Describe the documentation added.-->
#### Documentation

<!--Please delete paragraphs that you did not use before submitting.-->
  • Loading branch information
joker-star-l authored Jan 13, 2025
1 parent 0448c78 commit 667e81e
Show file tree
Hide file tree
Showing 10 changed files with 61 additions and 13 deletions.
27 changes: 27 additions & 0 deletions .chloggen/json_line.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: dorisexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: send json lines to doris rather than json array

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [36896]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
19 changes: 18 additions & 1 deletion exporter/dorisexporter/exporter_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"bytes"
"context"
"database/sql"
"encoding/json"
"fmt"
"net/http"
"time"
Expand Down Expand Up @@ -81,7 +82,7 @@ func streamLoadRequest(ctx context.Context, cfg *Config, table string, data []by

req.Header.Set("format", "json")
req.Header.Set("Expect", "100-continue")
req.Header.Set("strip_outer_array", "true")
req.Header.Set("read_json_by_line", "true")
if cfg.ClientConfig.Timeout != 0 {
req.Header.Set("timeout", fmt.Sprintf("%d", cfg.ClientConfig.Timeout/time.Second))
}
Expand Down Expand Up @@ -118,3 +119,19 @@ func createAndUseDatabase(ctx context.Context, conn *sql.DB, cfg *Config) error
_, err = conn.ExecContext(ctx, "USE "+cfg.Database)
return err
}

type metric interface {
dMetricGauge | dMetricSum | dMetricHistogram | dMetricExponentialHistogram | dMetricSummary
}

func toJsonLines[T dLog | dTrace | metric](data []*T) ([]byte, error) {
buf := &bytes.Buffer{}
enc := json.NewEncoder(buf)
for _, d := range data {
err := enc.Encode(d)
if err != nil {
return nil, err
}
}
return buf.Bytes(), nil
}
9 changes: 9 additions & 0 deletions exporter/dorisexporter/exporter_common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package dorisexporter // import "github.com/open-telemetry/opentelemetry-collect

import (
"net"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -60,3 +61,11 @@ func findRandomPort() (int, error) {

return port, nil
}

func TestToJsonLines(t *testing.T) {
logs, err := toJsonLines([]*dLog{
{}, {},
})
require.NoError(t, err)
require.Len(t, strings.Split(string(logs), "\n"), 2+1)
}
2 changes: 1 addition & 1 deletion exporter/dorisexporter/exporter_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (e *logsExporter) pushLogData(ctx context.Context, ld plog.Logs) error {
}

func (e *logsExporter) pushLogDataInternal(ctx context.Context, logs []*dLog) error {
marshal, err := json.Marshal(logs)
marshal, err := toJsonLines(logs)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion exporter/dorisexporter/exporter_traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func (e *tracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) er
}

func (e *tracesExporter) pushTraceDataInternal(ctx context.Context, traces []*dTrace) error {
marshal, err := json.Marshal(traces)
marshal, err := toJsonLines(traces)
if err != nil {
return err
}
Expand Down
3 changes: 1 addition & 2 deletions exporter/dorisexporter/metrics_exponential_histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package dorisexporter // import "github.com/open-telemetry/opentelemetry-collect

import (
_ "embed"
"encoding/json"
"fmt"

"go.opentelemetry.io/collector/pdata/pmetric"
Expand Down Expand Up @@ -118,5 +117,5 @@ func (m *metricModelExponentialHistogram) size() int {
}

func (m *metricModelExponentialHistogram) bytes() ([]byte, error) {
return json.Marshal(m.data)
return toJsonLines(m.data)
}
3 changes: 1 addition & 2 deletions exporter/dorisexporter/metrics_gauge.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package dorisexporter // import "github.com/open-telemetry/opentelemetry-collect

import (
_ "embed"
"encoding/json"
"fmt"

"go.opentelemetry.io/collector/pdata/pmetric"
Expand Down Expand Up @@ -84,5 +83,5 @@ func (m *metricModelGauge) size() int {
}

func (m *metricModelGauge) bytes() ([]byte, error) {
return json.Marshal(m.data)
return toJsonLines(m.data)
}
3 changes: 1 addition & 2 deletions exporter/dorisexporter/metrics_histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package dorisexporter // import "github.com/open-telemetry/opentelemetry-collect

import (
_ "embed"
"encoding/json"
"fmt"

"go.opentelemetry.io/collector/pdata/pmetric"
Expand Down Expand Up @@ -108,5 +107,5 @@ func (m *metricModelHistogram) size() int {
}

func (m *metricModelHistogram) bytes() ([]byte, error) {
return json.Marshal(m.data)
return toJsonLines(m.data)
}
3 changes: 1 addition & 2 deletions exporter/dorisexporter/metrics_sum.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package dorisexporter // import "github.com/open-telemetry/opentelemetry-collect

import (
_ "embed"
"encoding/json"
"fmt"

"go.opentelemetry.io/collector/pdata/pmetric"
Expand Down Expand Up @@ -88,5 +87,5 @@ func (m *metricModelSum) size() int {
}

func (m *metricModelSum) bytes() ([]byte, error) {
return json.Marshal(m.data)
return toJsonLines(m.data)
}
3 changes: 1 addition & 2 deletions exporter/dorisexporter/metrics_summary.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package dorisexporter // import "github.com/open-telemetry/opentelemetry-collect

import (
_ "embed"
"encoding/json"
"fmt"

"go.opentelemetry.io/collector/pdata/pmetric"
Expand Down Expand Up @@ -89,5 +88,5 @@ func (m *metricModelSummary) size() int {
}

func (m *metricModelSummary) bytes() ([]byte, error) {
return json.Marshal(m.data)
return toJsonLines(m.data)
}

0 comments on commit 667e81e

Please sign in to comment.