Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add liveness probe to created container if otelcol configuration supports a health_check. #574

143 changes: 143 additions & 0 deletions pkg/collector/adapters/config_to_probe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package adapters

import (
"errors"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"strings"
)

var (
errNoService = errors.New("no service available as part of the configuration")
errNoExtensions = errors.New("no extensions available as part of the configuration")

errServiceNotAMap = errors.New("service property in the configuration doesn't contain valid services")
errExtensionsNotAMap = errors.New("extensions property in the configuration doesn't contain valid extensions")

errNoExtensionHealthCheck = errors.New("extensions property in the configuration does not contain the expected health_check extension")

errNoServiceExtensions = errors.New("service property in the configuration doesn't contain extensions")

errServiceExtensionsNotSlice = errors.New("service extensions property in the configuration does not contain valid extensions")
errNoServiceExtensionHealthCheck = errors.New("no healthcheck extension available in service extension configuration")
)

type probeConfiguration struct {
path string
port intstr.IntOrString
}

const (
defaultHealthCheckPath = "/"
defaultHealthCheckPort = 13133
)

// ConfigToContainerProbe converts the incoming configuration object into a container probe or returns an error
func ConfigToContainerProbe(config map[interface{}]interface{}) (*corev1.Probe, error) {
serviceProperty, ok := config["service"]
if !ok {
return nil, errNoService
}
service, ok := serviceProperty.(map[interface{}]interface{})
if !ok {
return nil, errServiceNotAMap
}

serviceExtensionsProperty, ok := service["extensions"]
if !ok {
return nil, errNoServiceExtensions
}

serviceExtensions, ok := serviceExtensionsProperty.([]interface{})
if !ok {
return nil, errServiceExtensionsNotSlice
}
healthCheckServiceExtensions := make([]string, 0)
for _, ext := range serviceExtensions {
parsedExt, ok := ext.(string)
if ok && strings.HasPrefix(parsedExt, "health_check") {
healthCheckServiceExtensions = append(healthCheckServiceExtensions, parsedExt)
}
}

if len(healthCheckServiceExtensions) == 0 {
return nil, errNoServiceExtensionHealthCheck
}

extensionsProperty, ok := config["extensions"]
if !ok {
return nil, errNoExtensions
}
extensions, ok := extensionsProperty.(map[interface{}]interface{})
if !ok {
return nil, errExtensionsNotAMap
}
// in the event of multiple health_check service extensions defined, we arbitrarily take the first one found
for _, healthCheckForProbe := range healthCheckServiceExtensions {
healthCheckExtension, ok := extensions[healthCheckForProbe]
if ok {
return createProbeFromExtension(healthCheckExtension)
}
}

return nil, errNoExtensionHealthCheck
}

func createProbeFromExtension(extension interface{}) (*corev1.Probe, error) {
probeCfg := extractProbeConfigurationFromExtension(extension)
return &corev1.Probe{
Handler: corev1.Handler{
HTTPGet: &corev1.HTTPGetAction{
Path: probeCfg.path,
Port: probeCfg.port,
},
},
}, nil
}

func extractProbeConfigurationFromExtension(ext interface{}) probeConfiguration {
extensionCfg, ok := ext.(map[interface{}]interface{})
if !ok {
return defaultProbeConfiguration()
}
return probeConfiguration{
path: extractPathFromExtensionConfig(extensionCfg),
port: extractPortFromExtensionConfig(extensionCfg),
}
}

func defaultProbeConfiguration() probeConfiguration {
return probeConfiguration{
path: defaultHealthCheckPath,
port: intstr.FromInt(defaultHealthCheckPort),
}
}

func extractPathFromExtensionConfig(cfg map[interface{}]interface{}) string {
if path, ok := cfg["path"]; ok {
if parsedPath, ok := path.(string); ok {
return parsedPath
}
}
return defaultHealthCheckPath
}

func extractPortFromExtensionConfig(cfg map[interface{}]interface{}) intstr.IntOrString {
endpoint, ok := cfg["endpoint"]
if !ok {
return defaultHealthCheckEndpoint()
}
parsedEndpoint, ok := endpoint.(string)
if !ok {
return defaultHealthCheckEndpoint()
}
endpointComponents := strings.Split(parsedEndpoint, ":")
if len(endpointComponents) != 2 {
return defaultHealthCheckEndpoint()
}
return intstr.Parse(endpointComponents[1])
}

func defaultHealthCheckEndpoint() intstr.IntOrString {
return intstr.FromInt(defaultHealthCheckPort)
}
172 changes: 172 additions & 0 deletions pkg/collector/adapters/config_to_probe_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
package adapters

import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"testing"
)

func TestConfigToProbeShouldCreateProbeFor(t *testing.T) {
tests := []struct {
desc string
config string
expectedPort int32
expectedPath string
}{
{
desc: "SimpleHappyPath",
expectedPort: int32(13133),
expectedPath: "/",
config: `extensions:
health_check:
service:
extensions: [health_check]`,
}, {
desc: "CustomEndpointAndPath",
expectedPort: int32(1234),
expectedPath: "/checkit",
config: `extensions:
health_check:
endpoint: localhost:1234
path: /checkit
service:
extensions: [health_check]`,
}, {
desc: "CustomEndpointAndDefaultPath",
expectedPort: int32(1234),
expectedPath: "/",
config: `extensions:
health_check:
endpoint: localhost:1234
adriankostrubiak-tomtom marked this conversation as resolved.
Show resolved Hide resolved
service:
extensions: [health_check]`,
}, {
desc: "CustomEndpointWithJustPortAndDefaultPath",
expectedPort: int32(1234),
expectedPath: "/",
config: `extensions:
health_check:
endpoint: :1234
service:
extensions: [health_check]`,
}, {
desc: "DefaultEndpointAndCustomPath",
expectedPort: int32(13133),
expectedPath: "/checkit",
config: `extensions:
health_check:
path: /checkit
service:
extensions: [health_check]`,
}, {
desc: "DefaultEndpointForUnexpectedEndpoint",
expectedPort: int32(13133),
expectedPath: "/",
config: `extensions:
health_check:
endpoint: 0:0:0"
service:
extensions: [health_check]`,
}, {
desc: "DefaultEndpointForUnparseablendpoint",
expectedPort: int32(13133),
expectedPath: "/",
config: `extensions:
health_check:
endpoint:
this: should-not-be-a-map"
service:
extensions: [health_check]`,
}, {
desc: "WillUseSecondServiceExtension",
config: `extensions:
health_check:
service:
extensions: [health_check/1, health_check]`,
expectedPort: int32(13133),
expectedPath: "/",
},
}

for _, test := range tests {
// prepare
config, err := ConfigFromString(test.config)
require.NoError(t, err, test.desc)
require.NotEmpty(t, config, test.desc)

// test
actualProbe, err := ConfigToContainerProbe(config)
assert.NoError(t, err)
assert.Equal(t, test.expectedPath, actualProbe.HTTPGet.Path, test.desc)
assert.Equal(t, test.expectedPort, actualProbe.HTTPGet.Port.IntVal, test.desc)
assert.Equal(t, "", actualProbe.HTTPGet.Host, test.desc)
}
}

func TestConfigToProbeShouldErrorIf(t *testing.T) {
tests := []struct {
desc string
config string
expectedErr error
}{
{
desc: "NoHealthCheckExtension",
config: `extensions:
pprof:
service:
extensions: [health_check]`,
expectedErr: errNoExtensionHealthCheck,
}, {
desc: "BadlyFormattedExtensions",
config: `extensions: [hi]
service:
extensions: [health_check]`,
expectedErr: errExtensionsNotAMap,
}, {
desc: "NoExtensions",
config: `service:
extensions: [health_check]`,
expectedErr: errNoExtensions,
}, {
desc: "NoHealthCheckInServiceExtensions",
config: `service:
extensions: [pprof]`,
expectedErr: errNoServiceExtensionHealthCheck,
}, {
desc: "BadlyFormattedServiceExtensions",
config: `service:
extensions:
this: should-not-be-a-map`,
expectedErr: errServiceExtensionsNotSlice,
}, {
desc: "NoServiceExtensions",
config: `service:
pipelines:
traces:
receivers: [otlp]`,
expectedErr: errNoServiceExtensions,
}, {
desc: "BadlyFormattedService",
config: `extensions:
health_check:
service: [hi]`,
expectedErr: errServiceNotAMap,
}, {
desc: "NoService",
config: `extensions:
health_check:`,
expectedErr: errNoService,
},
}

for _, test := range tests {
// prepare
config, err := ConfigFromString(test.config)
require.NoError(t, err, test.desc)
require.NotEmpty(t, config, test.desc)

// test
_, err = ConfigToContainerProbe(config)
assert.Equal(t, test.expectedErr, err, test.desc)
}
}
10 changes: 9 additions & 1 deletion pkg/collector/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ package collector

import (
"fmt"

"github.com/go-logr/logr"
"github.com/open-telemetry/opentelemetry-operator/pkg/collector/adapters"
corev1 "k8s.io/api/core/v1"

adriankostrubiak-tomtom marked this conversation as resolved.
Show resolved Hide resolved
"github.com/open-telemetry/opentelemetry-operator/apis/v1alpha1"
Expand Down Expand Up @@ -77,6 +77,13 @@ func Container(cfg config.Config, logger logr.Logger, otelcol v1alpha1.OpenTelem
},
})

var livenessProbe *corev1.Probe
if config, err := adapters.ConfigFromString(otelcol.Spec.Config); err == nil {
if probe, err := adapters.ConfigToContainerProbe(config); err == nil {
livenessProbe = probe
}
}

return corev1.Container{
Name: naming.Container(),
Image: image,
Expand All @@ -87,5 +94,6 @@ func Container(cfg config.Config, logger logr.Logger, otelcol v1alpha1.OpenTelem
EnvFrom: otelcol.Spec.EnvFrom,
Resources: otelcol.Spec.Resources,
SecurityContext: otelcol.Spec.SecurityContext,
LivenessProbe: livenessProbe,
}
}
21 changes: 21 additions & 0 deletions pkg/collector/container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,3 +273,24 @@ func TestContainerEnvFrom(t *testing.T) {
assert.Contains(t, c.EnvFrom, envFrom1)
assert.Contains(t, c.EnvFrom, envFrom2)
}

func TestContainerProbe(t *testing.T) {
// prepare
otelcol := v1alpha1.OpenTelemetryCollector{
Spec: v1alpha1.OpenTelemetryCollectorSpec{
Config: `extensions:
health_check:
service:
extensions: [health_check]`,
},
}
cfg := config.New()

// test
c := Container(cfg, logger, otelcol)

// verify
assert.Equal(t, "/", c.LivenessProbe.HTTPGet.Path)
assert.Equal(t, int32(13133), c.LivenessProbe.HTTPGet.Port.IntVal)
assert.Equal(t, "", c.LivenessProbe.HTTPGet.Host)
}