Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add liveness probe to created container if otelcol configuration supports a health_check. #574

158 changes: 158 additions & 0 deletions pkg/collector/adapters/config_to_probe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package adapters

import (
"errors"
"strings"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/intstr"
)

var (
errNoService = errors.New("no service available as part of the configuration")
errNoExtensions = errors.New("no extensions available as part of the configuration")

errServiceNotAMap = errors.New("service property in the configuration doesn't contain valid services")
errExtensionsNotAMap = errors.New("extensions property in the configuration doesn't contain valid extensions")

errNoExtensionHealthCheck = errors.New("extensions property in the configuration does not contain the expected health_check extension")

errNoServiceExtensions = errors.New("service property in the configuration doesn't contain extensions")

errServiceExtensionsNotSlice = errors.New("service extensions property in the configuration does not contain valid extensions")
errNoServiceExtensionHealthCheck = errors.New("no healthcheck extension available in service extension configuration")
)

type probeConfiguration struct {
path string
port intstr.IntOrString
}

const (
defaultHealthCheckPath = "/"
defaultHealthCheckPort = 13133
)

// ConfigToContainerProbe converts the incoming configuration object into a container probe or returns an error.
func ConfigToContainerProbe(config map[interface{}]interface{}) (*corev1.Probe, error) {
serviceProperty, ok := config["service"]
if !ok {
return nil, errNoService
}
service, ok := serviceProperty.(map[interface{}]interface{})
if !ok {
return nil, errServiceNotAMap
}

serviceExtensionsProperty, ok := service["extensions"]
if !ok {
return nil, errNoServiceExtensions
}

serviceExtensions, ok := serviceExtensionsProperty.([]interface{})
if !ok {
return nil, errServiceExtensionsNotSlice
}
healthCheckServiceExtensions := make([]string, 0)
for _, ext := range serviceExtensions {
parsedExt, ok := ext.(string)
if ok && strings.HasPrefix(parsedExt, "health_check") {
healthCheckServiceExtensions = append(healthCheckServiceExtensions, parsedExt)
}
}

if len(healthCheckServiceExtensions) == 0 {
return nil, errNoServiceExtensionHealthCheck
}

extensionsProperty, ok := config["extensions"]
if !ok {
return nil, errNoExtensions
}
extensions, ok := extensionsProperty.(map[interface{}]interface{})
if !ok {
return nil, errExtensionsNotAMap
}
// in the event of multiple health_check service extensions defined, we arbitrarily take the first one found
for _, healthCheckForProbe := range healthCheckServiceExtensions {
healthCheckExtension, ok := extensions[healthCheckForProbe]
if ok {
return createProbeFromExtension(healthCheckExtension)
}
}

return nil, errNoExtensionHealthCheck
}

func createProbeFromExtension(extension interface{}) (*corev1.Probe, error) {
probeCfg := extractProbeConfigurationFromExtension(extension)
return &corev1.Probe{
Handler: corev1.Handler{
HTTPGet: &corev1.HTTPGetAction{
Path: probeCfg.path,
Port: probeCfg.port,
},
},
}, nil
}

func extractProbeConfigurationFromExtension(ext interface{}) probeConfiguration {
extensionCfg, ok := ext.(map[interface{}]interface{})
if !ok {
return defaultProbeConfiguration()
}
return probeConfiguration{
path: extractPathFromExtensionConfig(extensionCfg),
port: extractPortFromExtensionConfig(extensionCfg),
}
}

func defaultProbeConfiguration() probeConfiguration {
return probeConfiguration{
path: defaultHealthCheckPath,
port: intstr.FromInt(defaultHealthCheckPort),
}
}

func extractPathFromExtensionConfig(cfg map[interface{}]interface{}) string {
if path, ok := cfg["path"]; ok {
if parsedPath, ok := path.(string); ok {
return parsedPath
}
}
return defaultHealthCheckPath
}

func extractPortFromExtensionConfig(cfg map[interface{}]interface{}) intstr.IntOrString {
endpoint, ok := cfg["endpoint"]
if !ok {
return defaultHealthCheckEndpoint()
}
parsedEndpoint, ok := endpoint.(string)
if !ok {
return defaultHealthCheckEndpoint()
}
endpointComponents := strings.Split(parsedEndpoint, ":")
if len(endpointComponents) != 2 {
return defaultHealthCheckEndpoint()
}
return intstr.Parse(endpointComponents[1])
}

func defaultHealthCheckEndpoint() intstr.IntOrString {
return intstr.FromInt(defaultHealthCheckPort)
}
187 changes: 187 additions & 0 deletions pkg/collector/adapters/config_to_probe_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package adapters

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestConfigToProbeShouldCreateProbeFor(t *testing.T) {
tests := []struct {
desc string
config string
expectedPort int32
expectedPath string
}{
{
desc: "SimpleHappyPath",
expectedPort: int32(13133),
expectedPath: "/",
config: `extensions:
health_check:
service:
extensions: [health_check]`,
}, {
desc: "CustomEndpointAndPath",
expectedPort: int32(1234),
expectedPath: "/checkit",
config: `extensions:
health_check:
endpoint: localhost:1234
path: /checkit
service:
extensions: [health_check]`,
}, {
desc: "CustomEndpointAndDefaultPath",
expectedPort: int32(1234),
expectedPath: "/",
config: `extensions:
health_check:
endpoint: localhost:1234
adriankostrubiak-tomtom marked this conversation as resolved.
Show resolved Hide resolved
service:
extensions: [health_check]`,
}, {
desc: "CustomEndpointWithJustPortAndDefaultPath",
expectedPort: int32(1234),
expectedPath: "/",
config: `extensions:
health_check:
endpoint: :1234
service:
extensions: [health_check]`,
}, {
desc: "DefaultEndpointAndCustomPath",
expectedPort: int32(13133),
expectedPath: "/checkit",
config: `extensions:
health_check:
path: /checkit
service:
extensions: [health_check]`,
}, {
desc: "DefaultEndpointForUnexpectedEndpoint",
expectedPort: int32(13133),
expectedPath: "/",
config: `extensions:
health_check:
endpoint: 0:0:0"
service:
extensions: [health_check]`,
}, {
desc: "DefaultEndpointForUnparseablendpoint",
expectedPort: int32(13133),
expectedPath: "/",
config: `extensions:
health_check:
endpoint:
this: should-not-be-a-map"
service:
extensions: [health_check]`,
}, {
desc: "WillUseSecondServiceExtension",
config: `extensions:
health_check:
service:
extensions: [health_check/1, health_check]`,
expectedPort: int32(13133),
expectedPath: "/",
},
}

for _, test := range tests {
// prepare
config, err := ConfigFromString(test.config)
require.NoError(t, err, test.desc)
require.NotEmpty(t, config, test.desc)

// test
actualProbe, err := ConfigToContainerProbe(config)
assert.NoError(t, err)
assert.Equal(t, test.expectedPath, actualProbe.HTTPGet.Path, test.desc)
assert.Equal(t, test.expectedPort, actualProbe.HTTPGet.Port.IntVal, test.desc)
assert.Equal(t, "", actualProbe.HTTPGet.Host, test.desc)
}
}

func TestConfigToProbeShouldErrorIf(t *testing.T) {
tests := []struct {
desc string
config string
expectedErr error
}{
{
desc: "NoHealthCheckExtension",
config: `extensions:
pprof:
service:
extensions: [health_check]`,
expectedErr: errNoExtensionHealthCheck,
}, {
desc: "BadlyFormattedExtensions",
config: `extensions: [hi]
service:
extensions: [health_check]`,
expectedErr: errExtensionsNotAMap,
}, {
desc: "NoExtensions",
config: `service:
extensions: [health_check]`,
expectedErr: errNoExtensions,
}, {
desc: "NoHealthCheckInServiceExtensions",
config: `service:
extensions: [pprof]`,
expectedErr: errNoServiceExtensionHealthCheck,
}, {
desc: "BadlyFormattedServiceExtensions",
config: `service:
extensions:
this: should-not-be-a-map`,
expectedErr: errServiceExtensionsNotSlice,
}, {
desc: "NoServiceExtensions",
config: `service:
pipelines:
traces:
receivers: [otlp]`,
expectedErr: errNoServiceExtensions,
}, {
desc: "BadlyFormattedService",
config: `extensions:
health_check:
service: [hi]`,
expectedErr: errServiceNotAMap,
}, {
desc: "NoService",
config: `extensions:
health_check:`,
expectedErr: errNoService,
},
}

for _, test := range tests {
// prepare
config, err := ConfigFromString(test.config)
require.NoError(t, err, test.desc)
require.NotEmpty(t, config, test.desc)

// test
_, err = ConfigToContainerProbe(config)
assert.Equal(t, test.expectedErr, err, test.desc)
}
}
10 changes: 10 additions & 0 deletions pkg/collector/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"

"github.com/open-telemetry/opentelemetry-operator/pkg/collector/adapters"

adriankostrubiak-tomtom marked this conversation as resolved.
Show resolved Hide resolved
"github.com/open-telemetry/opentelemetry-operator/apis/v1alpha1"
"github.com/open-telemetry/opentelemetry-operator/internal/config"
"github.com/open-telemetry/opentelemetry-operator/pkg/naming"
Expand Down Expand Up @@ -77,6 +79,13 @@ func Container(cfg config.Config, logger logr.Logger, otelcol v1alpha1.OpenTelem
},
})

var livenessProbe *corev1.Probe
if config, err := adapters.ConfigFromString(otelcol.Spec.Config); err == nil {
if probe, err := adapters.ConfigToContainerProbe(config); err == nil {
livenessProbe = probe
}
}

return corev1.Container{
Name: naming.Container(),
Image: image,
Expand All @@ -87,5 +96,6 @@ func Container(cfg config.Config, logger logr.Logger, otelcol v1alpha1.OpenTelem
EnvFrom: otelcol.Spec.EnvFrom,
Resources: otelcol.Spec.Resources,
SecurityContext: otelcol.Spec.SecurityContext,
LivenessProbe: livenessProbe,
}
}
Loading