Skip to content

Commit

Permalink
[processor/geoipprocessor] Add attributes parameter
Browse files Browse the repository at this point in the history
  • Loading branch information
bencehornak committed Jan 6, 2025
1 parent 8cd5287 commit b62e20f
Show file tree
Hide file tree
Showing 8 changed files with 57 additions and 21 deletions.
10 changes: 6 additions & 4 deletions processor/geoipprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

## Description

The geoIP processor `geoipprocessor` enhances the attributes of a span, log, or metric by appending information about the geographical location of an IP address. To add geographical information, the IP address must be included in the attributes using the [`client.address`](https://github.com/open-telemetry/semantic-conventions/blob/v1.29.0/docs/general/attributes.md#client-attributes) or the [`source.address`](https://github.com/open-telemetry/semantic-conventions/blob/v1.29.0/docs/general/attributes.md#source) semantic conventions key attribute. By default, only the resource attributes will be modified. Please refer to [config.go](./config.go) for the config spec.
The geoIP processor `geoipprocessor` enhances the attributes of a span, log, or metric by appending information about the geographical location of an IP address. To add geographical information, the IP address must be included in the attributes specified by `attributes` ([`client.address`](https://github.com/open-telemetry/semantic-conventions/blob/v1.29.0/docs/general/attributes.md#client-attributes) and [`source.address`](https://github.com/open-telemetry/semantic-conventions/blob/v1.29.0/docs/general/attributes.md#source) by default). By default, only the resource attributes will be modified. Please refer to [config.go](./config.go) for the config spec.

### Geographical location metadata

Expand All @@ -36,13 +36,14 @@ The following [resource attributes](./internal/convention/attributes.go) will be

## Configuration

The following settings must be configured:
The following settings can be configured:

- `providers`: A map containing geographical location information providers. These providers are used to search for the geographical location attributes associated with an IP. Supported providers:
- [maxmind](./internal/provider/maxmindprovider/README.md)
- `context`: Allows specifying the underlying telemetry context the processor will work with. Available values:
- `resource`(default): Resource attributes.
- `context` (default: `resource`): Allows specifying the underlying telemetry context the processor will work with. Available values:
- `resource`: Resource attributes.
- `record`: Attributes within a data point, log record or a span.
- `attributes` (default: `[client.address, source.address]`): An array of attribute names, which are used for the IP address lookup

## Examples

Expand All @@ -54,4 +55,5 @@ processors:
providers:
maxmind:
database_path: /tmp/mygeodb
attributes: [client.address, source.address, custom.address]
```
8 changes: 8 additions & 0 deletions processor/geoipprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/otel/attribute"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/geoipprocessor/internal/provider"
)
Expand Down Expand Up @@ -43,6 +44,9 @@ type Config struct {

// Context section allows specifying the source type to look for the IP. Available options: resource or record.
Context ContextID `mapstructure:"context"`

// An array of attribute names, which are used for the IP address lookup
Attributes []attribute.Key `mapstructure:"attributes"`
}

var (
Expand All @@ -62,6 +66,10 @@ func (cfg *Config) Validate() error {
}
}

if cfg.Attributes != nil && len(cfg.Attributes) == 0 {
return errors.New("the attributes array must not be empty")
}

return nil
}

Expand Down
17 changes: 17 additions & 0 deletions processor/geoipprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap/confmaptest"
"go.opentelemetry.io/collector/otelcol/otelcoltest"
"go.opentelemetry.io/otel/attribute"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/geoipprocessor/internal/metadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/geoipprocessor/internal/provider"
Expand All @@ -39,6 +40,7 @@ func TestLoadConfig(t *testing.T) {
Providers: map[string]provider.Config{
"maxmind": &maxmind.Config{DatabasePath: "/tmp/db"},
},
Attributes: defaultAttributes,
},
},
{
Expand All @@ -48,6 +50,7 @@ func TestLoadConfig(t *testing.T) {
Providers: map[string]provider.Config{
"maxmind": &maxmind.Config{DatabasePath: "/tmp/db"},
},
Attributes: defaultAttributes,
},
},
{
Expand All @@ -58,6 +61,20 @@ func TestLoadConfig(t *testing.T) {
id: component.NewIDWithName(metadata.Type, "invalid_source"),
unmarshalErrorMessage: "unknown context not.an.otlp.context, available values: resource, record",
},
{
id: component.NewIDWithName(metadata.Type, "invalid_source_attributes"),
validateErrorMessage: "the attributes array must not be empty",
},
{
id: component.NewIDWithName(metadata.Type, "custom_source_attributes"),
expected: &Config{
Context: resource,
Providers: map[string]provider.Config{
"maxmind": &maxmind.Config{DatabasePath: "/tmp/db"},
},
Attributes: []attribute.Key{"client.address", "source.address", "custom.address"},
},
},
}

for _, tt := range tests {
Expand Down
13 changes: 7 additions & 6 deletions processor/geoipprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ import (

var (
processorCapabilities = consumer.Capabilities{MutatesData: true}
// defaultResourceAttributes holds a list of default resource attribute keys.
// defaultAttributes holds a list of default resource attribute keys.
// These keys are used to identify an IP address attribute associated with the resource.
defaultResourceAttributes = []attribute.Key{
defaultAttributes = []attribute.Key{
// The client attributes are in use by the HTTP semantic conventions
semconv.ClientAddressKey,
// The source attributes are used when there is no client/server relationship between the two sides, or when that relationship is unknown
Expand Down Expand Up @@ -55,7 +55,8 @@ func getProviderFactory(key string) (provider.GeoIPProviderFactory, bool) {
// createDefaultConfig returns a default configuration for the processor.
func createDefaultConfig() component.Config {
return &Config{
Context: resource,
Context: resource,
Attributes: defaultAttributes,
}
}

Expand Down Expand Up @@ -91,7 +92,7 @@ func createMetricsProcessor(ctx context.Context, set processor.Settings, cfg com
if err != nil {
return nil, err
}
return processorhelper.NewMetrics(ctx, set, cfg, nextConsumer, newGeoIPProcessor(geoCfg, defaultResourceAttributes, providers, set).processMetrics, processorhelper.WithCapabilities(processorCapabilities))
return processorhelper.NewMetrics(ctx, set, cfg, nextConsumer, newGeoIPProcessor(geoCfg, providers, set).processMetrics, processorhelper.WithCapabilities(processorCapabilities))
}

func createTracesProcessor(ctx context.Context, set processor.Settings, cfg component.Config, nextConsumer consumer.Traces) (processor.Traces, error) {
Expand All @@ -100,7 +101,7 @@ func createTracesProcessor(ctx context.Context, set processor.Settings, cfg comp
if err != nil {
return nil, err
}
return processorhelper.NewTraces(ctx, set, cfg, nextConsumer, newGeoIPProcessor(geoCfg, defaultResourceAttributes, providers, set).processTraces, processorhelper.WithCapabilities(processorCapabilities))
return processorhelper.NewTraces(ctx, set, cfg, nextConsumer, newGeoIPProcessor(geoCfg, providers, set).processTraces, processorhelper.WithCapabilities(processorCapabilities))
}

func createLogsProcessor(ctx context.Context, set processor.Settings, cfg component.Config, nextConsumer consumer.Logs) (processor.Logs, error) {
Expand All @@ -109,5 +110,5 @@ func createLogsProcessor(ctx context.Context, set processor.Settings, cfg compon
if err != nil {
return nil, err
}
return processorhelper.NewLogs(ctx, set, cfg, nextConsumer, newGeoIPProcessor(geoCfg, defaultResourceAttributes, providers, set).processLogs, processorhelper.WithCapabilities(processorCapabilities))
return processorhelper.NewLogs(ctx, set, cfg, nextConsumer, newGeoIPProcessor(geoCfg, providers, set).processLogs, processorhelper.WithCapabilities(processorCapabilities))
}
16 changes: 7 additions & 9 deletions processor/geoipprocessor/geoip_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,17 @@ var (

// newGeoIPProcessor creates a new instance of geoIPProcessor with the specified fields.
type geoIPProcessor struct {
providers []provider.GeoIPProvider
resourceAttributes []attribute.Key
logger *zap.Logger
providers []provider.GeoIPProvider
logger *zap.Logger

cfg *Config
}

func newGeoIPProcessor(processorConfig *Config, resourceAttributes []attribute.Key, providers []provider.GeoIPProvider, params processor.Settings) *geoIPProcessor {
func newGeoIPProcessor(processorConfig *Config, providers []provider.GeoIPProvider, params processor.Settings) *geoIPProcessor {
return &geoIPProcessor{
resourceAttributes: resourceAttributes,
providers: providers,
cfg: processorConfig,
logger: params.Logger,
providers: providers,
cfg: processorConfig,
logger: params.Logger,
}
}

Expand Down Expand Up @@ -92,7 +90,7 @@ func (g *geoIPProcessor) geoLocation(ctx context.Context, ip net.IP) (attribute.

// processAttributes processes a pcommon.Map by adding geolocation attributes based on the found IP address.
func (g *geoIPProcessor) processAttributes(ctx context.Context, metadata pcommon.Map) error {
ipAddr, err := ipFromAttributes(g.resourceAttributes, metadata)
ipAddr, err := ipFromAttributes(g.cfg.Attributes, metadata)
if err != nil {
// TODO: log IP error not found
if errors.Is(err, errIPNotFound) {
Expand Down
2 changes: 1 addition & 1 deletion processor/geoipprocessor/geoip_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func TestProcessor(t *testing.T) {

for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
cfg := &Config{Context: tt.context, Providers: map[string]provider.Config{providerKey: &providerConfigMock{}}}
cfg := &Config{Context: tt.context, Providers: map[string]provider.Config{providerKey: &providerConfigMock{}}, Attributes: defaultAttributes}
compareAllSignals(cfg, tt.goldenDir)(t)
})
}
Expand Down
2 changes: 1 addition & 1 deletion processor/geoipprocessor/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestProcessorWithMaxMind(t *testing.T) {

for _, tt := range testCases {
t.Run("maxmind_"+tt.name, func(t *testing.T) {
cfg := &Config{Context: tt.context, Providers: map[string]provider.Config{"maxmind": &maxmindConfig}}
cfg := &Config{Context: tt.context, Providers: map[string]provider.Config{"maxmind": &maxmindConfig}, Attributes: defaultAttributes}

compareAllSignals(cfg, tt.goldenDir)(t)
})
Expand Down
10 changes: 10 additions & 0 deletions processor/geoipprocessor/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,13 @@ geoip/invalid_source:
maxmind:
database_path: /tmp/db
context: not.an.otlp.context
geoip/invalid_source_attributes:
providers:
maxmind:
database_path: /tmp/db
attributes: []
geoip/custom_source_attributes:
providers:
maxmind:
database_path: /tmp/db
attributes: [client.address, source.address, custom.address]

0 comments on commit b62e20f

Please sign in to comment.