Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into fix-5570
Browse files Browse the repository at this point in the history
Signed-off-by: Yury Akudovich <[email protected]>
  • Loading branch information
yorik committed Jan 3, 2025
2 parents 48d38ea + 720af45 commit 7326570
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 96 deletions.
2 changes: 1 addition & 1 deletion apis/keda/v1alpha1/scaledobject_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ const (
// HealthStatusFailing means the status of the health object is failing
HealthStatusFailing HealthStatusType = "Failing"

// Composite metric name used for scalingModifiers composite metric
// CompositeMetricName is used for scalingModifiers composite metric
CompositeMetricName string = "composite-metric"

defaultMinReplicas int32 = 0
Expand Down
14 changes: 7 additions & 7 deletions apis/keda/v1alpha1/scaledobject_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,8 @@ func verifyHpas(incomingSo *ScaledObject, action string, _ bool) error {
return err
}

var incomingSoGckr GroupVersionKindResource
incomingSoGckr, err = ParseGVKR(restMapper, incomingSo.Spec.ScaleTargetRef.APIVersion, incomingSo.Spec.ScaleTargetRef.Kind)
var incomingSoGvkr GroupVersionKindResource
incomingSoGvkr, err = ParseGVKR(restMapper, incomingSo.Spec.ScaleTargetRef.APIVersion, incomingSo.Spec.ScaleTargetRef.Kind)
if err != nil {
scaledobjectlog.Error(err, "Failed to parse Group, Version, Kind, Resource from incoming ScaledObject", "apiVersion", incomingSo.Spec.ScaleTargetRef.APIVersion, "kind", incomingSo.Spec.ScaleTargetRef.Kind)
return err
Expand All @@ -245,13 +245,13 @@ func verifyHpas(incomingSo *ScaledObject, action string, _ bool) error {
val, _ := json.MarshalIndent(hpa, "", " ")
scaledobjectlog.V(1).Info(fmt.Sprintf("checking hpa %s: %v", hpa.Name, string(val)))

hpaGckr, err := ParseGVKR(restMapper, hpa.Spec.ScaleTargetRef.APIVersion, hpa.Spec.ScaleTargetRef.Kind)
hpaGvkr, err := ParseGVKR(restMapper, hpa.Spec.ScaleTargetRef.APIVersion, hpa.Spec.ScaleTargetRef.Kind)
if err != nil {
scaledobjectlog.Error(err, "Failed to parse Group, Version, Kind, Resource from HPA", "hpaName", hpa.Name, "apiVersion", hpa.Spec.ScaleTargetRef.APIVersion, "kind", hpa.Spec.ScaleTargetRef.Kind)
return err
}

if hpaGckr.GVKString() == incomingSoGckr.GVKString() &&
if hpaGvkr.GVKString() == incomingSoGvkr.GVKString() &&
hpa.Spec.ScaleTargetRef.Name == incomingSo.Spec.ScaleTargetRef.Name {
owned := false
for _, owner := range hpa.OwnerReferences {
Expand All @@ -268,7 +268,7 @@ func verifyHpas(incomingSo *ScaledObject, action string, _ bool) error {
incomingSo.Spec.Advanced.HorizontalPodAutoscalerConfig.Name == hpa.Name {
scaledobjectlog.Info(fmt.Sprintf("%s hpa ownership being transferred to %s", hpa.Name, incomingSo.Name))
} else {
err = fmt.Errorf("the workload '%s' of type '%s' is already managed by the hpa '%s'", incomingSo.Spec.ScaleTargetRef.Name, incomingSoGckr.GVKString(), hpa.Name)
err = fmt.Errorf("the workload '%s' of type '%s' is already managed by the hpa '%s'", incomingSo.Spec.ScaleTargetRef.Name, incomingSoGvkr.GVKString(), hpa.Name)
scaledobjectlog.Error(err, "validation error")
metricscollector.RecordScaledObjectValidatingErrors(incomingSo.Namespace, action, "other-hpa")
return err
Expand Down Expand Up @@ -363,13 +363,13 @@ func verifyCPUMemoryScalers(incomingSo *ScaledObject, action string, dryRun bool
Namespace: incomingSo.Namespace,
Name: incomingSo.Spec.ScaleTargetRef.Name,
}
incomingSoGckr, err := ParseGVKR(restMapper, incomingSo.Spec.ScaleTargetRef.APIVersion, incomingSo.Spec.ScaleTargetRef.Kind)
incomingSoGvkr, err := ParseGVKR(restMapper, incomingSo.Spec.ScaleTargetRef.APIVersion, incomingSo.Spec.ScaleTargetRef.Kind)
if err != nil {
scaledobjectlog.Error(err, "Failed to parse Group, Version, Kind, Resource from incoming ScaledObject", "apiVersion", incomingSo.Spec.ScaleTargetRef.APIVersion, "kind", incomingSo.Spec.ScaleTargetRef.Kind)
return err
}

switch incomingSoGckr.GVKString() {
switch incomingSoGvkr.GVKString() {
case "apps/v1.Deployment":
deployment := &appsv1.Deployment{}
if err := getFromCacheOrDirect(context.Background(), key, deployment); err != nil {
Expand Down
75 changes: 15 additions & 60 deletions pkg/scalers/gcp_cloud_tasks_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package scalers
import (
"context"
"fmt"
"strconv"

"github.com/go-logr/logr"
v2 "k8s.io/api/autoscaling/v2"
Expand All @@ -16,8 +15,6 @@ import (

const (
cloudTasksStackDriverQueueSize = "cloudtasks.googleapis.com/queue/depth"

cloudTaskDefaultValue = 100
)

type gcpCloudTasksScaler struct {
Expand All @@ -28,12 +25,12 @@ type gcpCloudTasksScaler struct {
}

type gcpCloudTaskMetadata struct {
value float64
activationValue float64
filterDuration int64
Value float64 `keda:"name=value, order=triggerMetadata, optional, default=100"`
ActivationValue float64 `keda:"name=activationValue, order=triggerMetadata, optional, default=0"`
FilterDuration int64 `keda:"name=filterDuration, order=triggerMetadata, optional"`

queueName string
projectID string
QueueName string `keda:"name=queueName, order=triggerMetadata"`
ProjectID string `keda:"name=projectID, order=triggerMetadata"`
gcpAuthorization *gcp.AuthorizationMetadata
triggerIndex int
}
Expand All @@ -60,61 +57,19 @@ func NewGcpCloudTasksScaler(config *scalersconfig.ScalerConfig) (Scaler, error)
}

func parseGcpCloudTasksMetadata(config *scalersconfig.ScalerConfig) (*gcpCloudTaskMetadata, error) {
meta := gcpCloudTaskMetadata{value: cloudTaskDefaultValue}

value, valuePresent := config.TriggerMetadata["value"]

if valuePresent {
triggerValue, err := strconv.ParseFloat(value, 64)
if err != nil {
return nil, fmt.Errorf("value parsing error %w", err)
}
meta.value = triggerValue
}

if val, ok := config.TriggerMetadata["queueName"]; ok {
if val == "" {
return nil, fmt.Errorf("no queue name given")
}
meta.queueName = val
} else {
return nil, fmt.Errorf("no queue name given")
}

if val, ok := config.TriggerMetadata["filterDuration"]; ok {
filterDuration, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, fmt.Errorf("filterDuration parsing error %w", err)
}
meta.filterDuration = filterDuration
}

meta.activationValue = 0
if val, ok := config.TriggerMetadata["activationValue"]; ok {
activationValue, err := strconv.ParseFloat(val, 64)
if err != nil {
return nil, fmt.Errorf("activationValue parsing error %w", err)
}
meta.activationValue = activationValue
}

if val, ok := config.TriggerMetadata["projectID"]; ok {
if val == "" {
return nil, fmt.Errorf("no project id given")
}

meta.projectID = val
} else {
return nil, fmt.Errorf("no project id given")
meta := &gcpCloudTaskMetadata{}
if err := config.TypedConfig(meta); err != nil {
return nil, fmt.Errorf("error parsing Gcp cloud task metadata: %w", err)
}

auth, err := gcp.GetGCPAuthorization(config)
if err != nil {
return nil, err
}

meta.gcpAuthorization = auth
meta.triggerIndex = config.TriggerIndex
return &meta, nil
return meta, nil
}

func (s *gcpCloudTasksScaler) Close(context.Context) error {
Expand All @@ -132,9 +87,9 @@ func (s *gcpCloudTasksScaler) Close(context.Context) error {
func (s *gcpCloudTasksScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("gcp-ct-%s", s.metadata.queueName))),
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("gcp-ct-%s", s.metadata.QueueName))),
},
Target: GetMetricTargetMili(s.metricType, s.metadata.value),
Target: GetMetricTargetMili(s.metricType, s.metadata.Value),
}

// Create the metric spec for the HPA
Expand All @@ -158,7 +113,7 @@ func (s *gcpCloudTasksScaler) GetMetricsAndActivity(ctx context.Context, metricN

metric := GenerateMetricInMili(metricName, value)

return []external_metrics.ExternalMetricValue{metric}, value > s.metadata.activationValue, nil
return []external_metrics.ExternalMetricValue{metric}, value > s.metadata.ActivationValue, nil
}

func (s *gcpCloudTasksScaler) setStackdriverClient(ctx context.Context) error {
Expand All @@ -185,9 +140,9 @@ func (s *gcpCloudTasksScaler) getMetrics(ctx context.Context, metricType string)
return -1, err
}
}
filter := `metric.type="` + metricType + `" AND resource.labels.queue_id="` + s.metadata.queueName + `"`
filter := `metric.type="` + metricType + `" AND resource.labels.queue_id="` + s.metadata.QueueName + `"`

// Cloud Tasks metrics are collected every 60 seconds so no need to aggregate them.
// See: https://cloud.google.com/monitoring/api/metrics_gcp#gcp-cloudtasks
return s.client.GetMetrics(ctx, filter, s.metadata.projectID, nil, nil, s.metadata.filterDuration)
return s.client.GetMetrics(ctx, filter, s.metadata.ProjectID, nil, nil, s.metadata.FilterDuration)
}
125 changes: 99 additions & 26 deletions pkg/scalers/gcp_cloud_tasks_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package scalers

import (
"context"
"reflect"
"testing"

"github.com/go-logr/logr"

"github.com/kedacore/keda/v2/pkg/scalers/gcp"
"github.com/kedacore/keda/v2/pkg/scalers/scalersconfig"
)

Expand All @@ -17,6 +19,8 @@ type parseGcpCloudTasksMetadataTestData struct {
authParams map[string]string
metadata map[string]string
isError bool
expected *gcpCloudTaskMetadata
comment string
}

type gcpCloudTasksMetricIdentifier struct {
Expand All @@ -26,25 +30,82 @@ type gcpCloudTasksMetricIdentifier struct {
}

var testGcpCloudTasksMetadata = []parseGcpCloudTasksMetadataTestData{
{map[string]string{}, map[string]string{}, true},
// all properly formed
{nil, map[string]string{"queueName": "myQueue", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS", "projectID": "myproject", "activationValue": "5"}, false},
// missing subscriptionName
{nil, map[string]string{"queueName": "", "value": "7", "projectID": "myproject", "credentialsFromEnv": "SAMPLE_CREDS"}, true},
// missing credentials
{nil, map[string]string{"queueName": "myQueue", "value": "7", "projectID": "myproject", "credentialsFromEnv": ""}, true},
// malformed subscriptionSize
{nil, map[string]string{"queueName": "myQueue", "value": "AA", "projectID": "myproject", "credentialsFromEnv": "SAMPLE_CREDS"}, true},
// malformed mode
{nil, map[string]string{"queueName": "", "mode": "AA", "value": "7", "projectID": "myproject", "credentialsFromEnv": "SAMPLE_CREDS"}, true},
// malformed activationTargetValue
{nil, map[string]string{"queueName": "myQueue", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS", "projectID": "myproject", "activationValue": "AA"}, true},
// Credentials from AuthParams
{map[string]string{"GoogleApplicationCredentials": "Creds"}, map[string]string{"queueName": "myQueue", "value": "7", "projectID": "myproject"}, false},
// Credentials from AuthParams with empty creds
{map[string]string{"GoogleApplicationCredentials": ""}, map[string]string{"queueName": "myQueue", "subscriptionSize": "7", "projectID": "myproject"}, true},
// properly formed float value and activationTargetValue
{nil, map[string]string{"queueName": "mysubscription", "value": "7.1", "credentialsFromEnv": "SAMPLE_CREDS", "activationValue": "2.1", "projectID": "myproject"}, false},

{map[string]string{}, map[string]string{}, true, nil, "erro case"},

{nil, map[string]string{"queueName": "myQueue", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS", "projectID": "myproject", "activationValue": "5"}, false, &gcpCloudTaskMetadata{
Value: 7,
ActivationValue: 5,
FilterDuration: 0,
QueueName: "myQueue",
ProjectID: "myproject",
gcpAuthorization: &gcp.AuthorizationMetadata{
GoogleApplicationCredentials: "{}",
PodIdentityProviderEnabled: false,
},
triggerIndex: 0}, "all properly formed"},

{nil, map[string]string{"queueName": "", "value": "7", "projectID": "myproject", "credentialsFromEnv": "SAMPLE_CREDS"}, true, nil, "missing subscriptionName"},

{nil, map[string]string{"queueName": "myQueue", "value": "7", "projectID": "myproject", "credentialsFromEnv": ""}, true, nil, "missing credentials"},

{nil, map[string]string{"queueName": "myQueue", "value": "AA", "projectID": "myproject", "credentialsFromEnv": "SAMPLE_CREDS"}, true, nil, "malformed subscriptionSize"},

{nil, map[string]string{"queueName": "", "mode": "AA", "value": "7", "projectID": "myproject", "credentialsFromEnv": "SAMPLE_CREDS"}, true, nil, "malformed mode"},

{nil, map[string]string{"queueName": "myQueue", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS", "projectID": "myproject", "activationValue": "AA"}, true, nil, "malformed activationTargetValue"},

{map[string]string{"GoogleApplicationCredentials": "Creds"}, map[string]string{"queueName": "myQueue", "value": "7", "projectID": "myproject"}, false, &gcpCloudTaskMetadata{
Value: 7,
ActivationValue: 0,
FilterDuration: 0,
QueueName: "myQueue",
ProjectID: "myproject",
gcpAuthorization: &gcp.AuthorizationMetadata{
GoogleApplicationCredentials: "Creds",
PodIdentityProviderEnabled: false,
},
triggerIndex: 0}, "Credentials from AuthParams"},

{map[string]string{"GoogleApplicationCredentials": ""}, map[string]string{"queueName": "myQueue", "subscriptionSize": "7", "projectID": "myproject"}, true, nil, "Credentials from AuthParams with empty creds"},

{nil, map[string]string{"queueName": "mysubscription", "value": "7.1", "credentialsFromEnv": "SAMPLE_CREDS", "activationValue": "2.1", "projectID": "myproject"}, false, &gcpCloudTaskMetadata{
Value: 7.1,
ActivationValue: 2.1,
FilterDuration: 0,
QueueName: "mysubscription",
ProjectID: "myproject",
gcpAuthorization: &gcp.AuthorizationMetadata{
GoogleApplicationCredentials: "{}",
PodIdentityProviderEnabled: false,
},
triggerIndex: 0}, "properly formed float value and activationTargetValue"},

{nil, map[string]string{"queueName": "myQueue", "projectID": "myProject", "credentialsFromEnv": "SAMPLE_CREDS"}, false, &gcpCloudTaskMetadata{
Value: 100,
ActivationValue: 0,
FilterDuration: 0,
QueueName: "myQueue",
ProjectID: "myProject",
gcpAuthorization: &gcp.AuthorizationMetadata{
GoogleApplicationCredentials: "{}",
PodIdentityProviderEnabled: false,
},
triggerIndex: 0}, "test default value (100) when value is not provided"},

{nil, map[string]string{"queueName": "myQueue", "projectID": "myProject", "credentialsFromEnv": "SAMPLE_CREDS", "activationValue": "5"}, false, &gcpCloudTaskMetadata{
Value: 100,
ActivationValue: 5,
FilterDuration: 0,
QueueName: "myQueue",
ProjectID: "myProject",
gcpAuthorization: &gcp.AuthorizationMetadata{
GoogleApplicationCredentials: "{}",
PodIdentityProviderEnabled: false,
},
triggerIndex: 0}, "test default value with specified activationVal"},

{nil, map[string]string{"queueName": "myQueue", "projectID": "myProject", "credentialsFromEnv": "SAMPLE_CREDS", "filterDuration": "invalid"}, true, nil, "test invalid filterDuration with default values"},
}

var gcpCloudTasksMetricIdentifiers = []gcpCloudTasksMetricIdentifier{
Expand All @@ -54,13 +115,25 @@ var gcpCloudTasksMetricIdentifiers = []gcpCloudTasksMetricIdentifier{

func TestGcpCloudTasksParseMetadata(t *testing.T) {
for _, testData := range testGcpCloudTasksMetadata {
_, err := parseGcpCloudTasksMetadata(&scalersconfig.ScalerConfig{AuthParams: testData.authParams, TriggerMetadata: testData.metadata, ResolvedEnv: testGcpCloudTasksResolvedEnv})
if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
}
if testData.isError && err == nil {
t.Error("Expected error but got success")
}
t.Run(testData.comment, func(t *testing.T) {
metadata, err := parseGcpCloudTasksMetadata(&scalersconfig.ScalerConfig{
AuthParams: testData.authParams,
TriggerMetadata: testData.metadata,
ResolvedEnv: testGcpCloudTasksResolvedEnv,
})

if err != nil && !testData.isError {
t.Errorf("Expected success but got error")
}

if testData.isError && err == nil {
t.Errorf("Expected error but got success")
}

if !testData.isError && !reflect.DeepEqual(testData.expected, metadata) {
t.Fatalf("Expected %#v but got %+#v", testData.expected, metadata)
}
})
}
}

Expand Down
4 changes: 2 additions & 2 deletions tests/internals/events/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,12 +324,12 @@ func getTemplateData() (templateData, []Template) {
}, []Template{}
}

func checkingEvent(t *testing.T, namespace string, scaledObject string, index int, eventreason string, message string) {
func checkingEvent(t *testing.T, namespace string, scaledObject string, index int, eventReason string, message string) {
result, err := ExecuteCommand(fmt.Sprintf("kubectl get events -n %s --field-selector involvedObject.name=%s --sort-by=.metadata.creationTimestamp -o jsonpath=\"{.items[%d].reason}:{.items[%d].message}\"", namespace, scaledObject, index, index))

assert.NoError(t, err)
lastEventMessage := strings.Trim(string(result), "\"")
assert.Equal(t, lastEventMessage, eventreason+":"+message)
assert.Equal(t, eventReason+":"+message, lastEventMessage)
}

func testNormalEvent(t *testing.T, kc *kubernetes.Clientset, data templateData) {
Expand Down

0 comments on commit 7326570

Please sign in to comment.