Skip to content

Commit

Permalink
Migrate fileexporter to new data model. (#1488)
Browse files Browse the repository at this point in the history
* Migrate fileexporter to new data model.

* Formalize fileexporter format

* Polish

* Polish
  • Loading branch information
iNikem authored Aug 5, 2020
1 parent 56a22ad commit 38453fd
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 355 deletions.
6 changes: 3 additions & 3 deletions exporter/fileexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

This exporter will write the pipeline data to a JSON file.
The data is written in Protobuf JSON encoding
(https://developers.google.com/protocol-buffers/docs/proto3#json).
Note that there are no compatibility guarantees for this format, since it
just a dump of internal structures which can be changed over time.
(https://developers.google.com/protocol-buffers/docs/proto3#json)
using [OpenTelemetry protocol](https://github.com/open-telemetry/opentelemetry-proto).
Please note that there is no guarantee that exact field names will remain stable.
This intended for primarily for debugging Collector without setting up backends.

The following settings are required:
Expand Down
218 changes: 22 additions & 196 deletions exporter/fileexporter/file_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,121 +19,21 @@ import (
"io"
"sync"

commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
"github.com/gogo/protobuf/jsonpb"
"github.com/gogo/protobuf/proto"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/consumer/pdatautil"
"go.opentelemetry.io/collector/translator/internaldata"
"go.opentelemetry.io/collector/internal/data"
otlplogs "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/collector/logs/v1"
otlpmetrics "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/collector/metrics/v1"
otlptrace "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/collector/trace/v1"
)

// Marshaler configuration used for marhsaling Protobuf to JSON. Use default config.
var marshaler = &jsonpb.Marshaler{}

// Helper struct to write JSON objects and arrays.
type jsonWriter struct {
firstFieldDone bool
firstArrayItemDone bool
writer io.Writer
}

func (jw *jsonWriter) Reset() {
jw.firstFieldDone = false
}

// Begin writing JSON. Call first.
func (jw *jsonWriter) Begin() error {
_, err := io.WriteString(jw.writer, "{\n")
return err
}

// End writing JSON. Call last.
func (jw *jsonWriter) End() error {
_, err := io.WriteString(jw.writer, "\n}\n")
return err
}

// MarshalObject marshals an object as a field of top-level object.
func (jw *jsonWriter) MarshalObject(fieldName string, pb proto.Message) error {
if jw.firstFieldDone {
io.WriteString(jw.writer, ",\n")
} else {
jw.firstFieldDone = true
}
_, err := io.WriteString(jw.writer, ` "`+fieldName+`": `)
if err != nil {
return err
}

err = marshaler.Marshal(jw.writer, pb)
if err != nil {
return err
}
return nil
}

// BeginMarshalArray prepares to marshal array items under a field of top-level object.
func (jw *jsonWriter) BeginMarshalArray(fieldName string) error {
if jw.firstFieldDone {
io.WriteString(jw.writer, ",\n")
} else {
jw.firstFieldDone = true
}
_, err := io.WriteString(jw.writer, ` "`+fieldName+"\": [")
jw.firstArrayItemDone = false
return err
}

// EndMarshalArray must be called after all array items are marshaled.
func (jw *jsonWriter) EndMarshalArray() error {
var str string
if jw.firstArrayItemDone {
// Non-empty array. End on a new line.
str = "\n ]"
} else {
// Empty array. End on the same line.
str = "]"
}
_, err := io.WriteString(jw.writer, str)
return err
}

// MarshalArrayItem marshals single array item. Call repeatedly after BeginMarshalArray.
func (jw *jsonWriter) MarshalArrayItem(pb proto.Message) error {
var str string
if jw.firstArrayItemDone {
str = ",\n "
} else {
str = "\n "
jw.firstArrayItemDone = true
}
_, err := io.WriteString(jw.writer, str)
if err != nil {
return err
}
err = marshaler.Marshal(jw.writer, pb)
if err != nil {
return err
}
return nil
}

func exportResourceAndNode(writer *jsonWriter, node *commonpb.Node, resource *resourcepb.Resource) error {
if resource != nil {
err := writer.MarshalObject("resource", resource)
if err != nil {
return err
}
}
if node != nil {
return writer.MarshalObject("node", node)
}
return nil
}

// fileExporter is the implementation of file exporter that writes telemetry data to a file
// in Protobuf-JSON format.
type fileExporter struct {
Expand All @@ -142,109 +42,35 @@ type fileExporter struct {
}

func (e *fileExporter) ConsumeTraces(_ context.Context, td pdata.Traces) error {
octds := internaldata.TraceDataToOC(td)
for _, octd := range octds {
// Ensure only one write operation happens at a time.
e.mutex.Lock()
defer e.mutex.Unlock()

// Prepare to write JSON object.
jw := &jsonWriter{writer: e.file}
if err := jw.Begin(); err != nil {
return err
}
defer jw.End()

if err := exportResourceAndNode(jw, octd.Node, octd.Resource); err != nil {
return err
}

if err := jw.BeginMarshalArray("spans"); err != nil {
return err
}
defer jw.EndMarshalArray()

for _, span := range octd.Spans {
if span != nil {
if err := jw.MarshalArrayItem(span); err != nil {
return err
}
}
}
request := otlptrace.ExportTraceServiceRequest{
ResourceSpans: pdata.TracesToOtlp(td),
}
return nil
return exportMessageAsLine(e, &request)
}

func (e *fileExporter) ConsumeMetrics(_ context.Context, md pdata.Metrics) error {
ocmds := pdatautil.MetricsToMetricsData(md)
for _, ocmd := range ocmds {
// Ensure only one write operation happens at a time.
e.mutex.Lock()
defer e.mutex.Unlock()

// Prepare to write JSON object.
jw := &jsonWriter{writer: e.file}
if err := jw.Begin(); err != nil {
return err
}
defer jw.End()

if err := exportResourceAndNode(jw, ocmd.Node, ocmd.Resource); err != nil {
return err
}

if err := jw.BeginMarshalArray("metrics"); err != nil {
return err
}
defer jw.EndMarshalArray()

for _, metric := range ocmd.Metrics {
if metric != nil {
if err := jw.MarshalArrayItem(metric); err != nil {
return err
}
}
}
request := otlpmetrics.ExportMetricsServiceRequest{
ResourceMetrics: data.MetricDataToOtlp(pdatautil.MetricsToInternalMetrics(md)),
}
return nil
return exportMessageAsLine(e, &request)
}

func (e *fileExporter) ConsumeLogs(_ context.Context, ld pdata.Logs) error {
request := otlplogs.ExportLogsServiceRequest{
ResourceLogs: pdata.LogsToOtlp(ld),
}
return exportMessageAsLine(e, &request)
}

func exportMessageAsLine(e *fileExporter, message proto.Message) error {
// Ensure only one write operation happens at a time.
e.mutex.Lock()
defer e.mutex.Unlock()

// Prepare to write JSON object.
jw := &jsonWriter{writer: e.file}

logsProto := pdata.LogsToOtlp(ld)

for _, rl := range logsProto {
if err := jw.Begin(); err != nil {
return err
}
err := jw.MarshalObject("resource", rl.Resource)
if err != nil {
return err
}

if err := jw.BeginMarshalArray("logs"); err != nil {
return err
}

for _, ill := range rl.InstrumentationLibraryLogs {
// TODO: output ill.InstrumentationLibrary
for _, log := range ill.Logs {
if log != nil {
if err := jw.MarshalArrayItem(log); err != nil {
return err
}
}
}
}
jw.EndMarshalArray()
jw.End()
jw.Reset()
if err := marshaler.Marshal(e.file, message); err != nil {
return err
}
if _, err := io.WriteString(e.file, "\n"); err != nil {
return err
}
return nil
}
Expand Down
Loading

0 comments on commit 38453fd

Please sign in to comment.