-
Notifications
You must be signed in to change notification settings - Fork 2.4k
/
factory.go
155 lines (130 loc) · 4.33 KB
/
factory.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package skywalkingreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/skywalkingreceiver"
// This file implements factory for skywalking receiver.
import (
"context"
"fmt"
"net"
"strconv"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/skywalkingreceiver/internal/metadata"
)
const (
// Protocol values.
protoGRPC = "grpc"
protoHTTP = "http"
// Default endpoints to bind to.
defaultGRPCBindEndpoint = "0.0.0.0:11800"
defaultHTTPBindEndpoint = "0.0.0.0:12800"
)
// NewFactory creates a new Skywalking receiver factory.
func NewFactory() receiver.Factory {
return receiver.NewFactory(
metadata.Type,
createDefaultConfig,
receiver.WithTraces(createTracesReceiver, metadata.TracesStability),
receiver.WithMetrics(createMetricsReceiver, metadata.MetricsStability))
}
// CreateDefaultConfig creates the default configuration for Skywalking receiver.
func createDefaultConfig() component.Config {
return &Config{
Protocols: Protocols{
GRPC: &configgrpc.GRPCServerSettings{
NetAddr: confignet.NetAddr{
Endpoint: defaultGRPCBindEndpoint,
Transport: "tcp",
},
},
HTTP: &confighttp.HTTPServerSettings{
Endpoint: defaultHTTPBindEndpoint,
},
},
}
}
// createTracesReceiver creates a trace receiver based on provided config.
func createTracesReceiver(
_ context.Context,
set receiver.CreateSettings,
cfg component.Config,
nextConsumer consumer.Traces,
) (receiver.Traces, error) {
// Convert settings in the source c to configuration struct
// that Skywalking receiver understands.
rCfg := cfg.(*Config)
c, err := createConfiguration(rCfg)
if err != nil {
return nil, err
}
r := receivers.GetOrAdd(cfg, func() component.Component {
return newSkywalkingReceiver(c, set)
})
if err = r.Unwrap().(*swReceiver).registerTraceConsumer(nextConsumer); err != nil {
return nil, err
}
return r, nil
}
// createMetricsReceiver creates a metrics receiver based on provided config.
func createMetricsReceiver(
_ context.Context,
set receiver.CreateSettings,
cfg component.Config,
nextConsumer consumer.Metrics,
) (receiver.Metrics, error) {
// Convert settings in the source c to configuration struct
// that Skywalking receiver understands.
rCfg := cfg.(*Config)
c, err := createConfiguration(rCfg)
if err != nil {
return nil, err
}
r := receivers.GetOrAdd(cfg, func() component.Component {
return newSkywalkingReceiver(c, set)
})
if err = r.Unwrap().(*swReceiver).registerMetricsConsumer(nextConsumer); err != nil {
return nil, err
}
return r, nil
}
// create the config that Skywalking receiver will use.
func createConfiguration(rCfg *Config) (*configuration, error) {
var err error
var c configuration
// Set ports
if rCfg.Protocols.GRPC != nil {
c.CollectorGRPCServerSettings = *rCfg.Protocols.GRPC
if c.CollectorGRPCPort, err = extractPortFromEndpoint(rCfg.Protocols.GRPC.NetAddr.Endpoint); err != nil {
return nil, fmt.Errorf("unable to extract port for the gRPC endpoint: %w", err)
}
}
if rCfg.Protocols.HTTP != nil {
c.CollectorHTTPSettings = *rCfg.Protocols.HTTP
if c.CollectorHTTPPort, err = extractPortFromEndpoint(rCfg.Protocols.HTTP.Endpoint); err != nil {
return nil, fmt.Errorf("unable to extract port for the HTTP endpoint: %w", err)
}
}
return &c, nil
}
// extract the port number from string in "address:port" format. If the
// port number cannot be extracted returns an error.
func extractPortFromEndpoint(endpoint string) (int, error) {
_, portStr, err := net.SplitHostPort(endpoint)
if err != nil {
return 0, fmt.Errorf("endpoint is not formatted correctly: %w", err)
}
port, err := strconv.ParseInt(portStr, 10, 0)
if err != nil {
return 0, fmt.Errorf("endpoint port is not a number: %w", err)
}
if port < 1 || port > 65535 {
return 0, fmt.Errorf("port number must be between 1 and 65535")
}
return int(port), nil
}
var receivers = sharedcomponent.NewSharedComponents()