Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(feat) Integrate cf api with eventgenerator #3357

Draft
wants to merge 2 commits into
base: add-cf-endpoint-to-eventgenerator
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/autoscaler/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ testsuite: generate-fakes
APP_AUTOSCALER_TEST_RUN='true' go run 'github.com/onsi/ginkgo/v2/ginkgo@${GINKGO_VERSION}' -p ${GINKGO_OPTS} ${TEST}

.PHONY: integration
integration: generate-fakes
integration: #generate-fakes
@echo "# Running integration tests"
APP_AUTOSCALER_TEST_RUN='true' go run 'github.com/onsi/ginkgo/v2/ginkgo@${GINKGO_VERSION}' ${GINKGO_OPTS} integration

Expand Down
10 changes: 10 additions & 0 deletions src/autoscaler/api/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ type Config struct {
PlanCheck *PlanCheckConfig `yaml:"plan_check"`
CatalogPath string `yaml:"catalog_path"`
CatalogSchemaPath string `yaml:"catalog_schema_path"`
CfInstanceCert string `yaml:"cf_instance_cert"`
DashboardRedirectURI string `yaml:"dashboard_redirect_uri"`
PolicySchemaPath string `yaml:"policy_schema_path"`
Scheduler SchedulerConfig `yaml:"scheduler"`
Expand Down Expand Up @@ -209,9 +210,18 @@ func loadVcapConfig(conf *Config, vcapReader configutil.VCAPConfigurationReader)
if err := configureBindingDb(conf, vcapReader); err != nil {
return err
}

configureCfInstanceCert(conf, vcapReader)

return nil
}

func configureCfInstanceCert(conf *Config, vcapReader configutil.VCAPConfigurationReader) {
if cert, err := vcapReader.GetCfInstanceCert(); err == nil {
conf.CfInstanceCert = cert
}
}

func configurePolicyDb(conf *Config, vcapReader configutil.VCAPConfigurationReader) error {
currentPolicyDb, ok := conf.Db[db.PolicyDb]
if !ok {
Expand Down
22 changes: 22 additions & 0 deletions src/autoscaler/api/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,28 @@ var _ = Describe("Config", func() {
conf, err = LoadConfig("", mockVCAPConfigurationReader)
})

When("vcap CF_INSTANCE_CERT is set", func() {
BeforeEach(func() {
mockVCAPConfigurationReader.GetCfInstanceCertReturns("cert", nil)
})

It("sets env variable over config file", func() {
Expect(err).NotTo(HaveOccurred())
Expect(conf.CfInstanceCert).To(Equal("cert"))
})
})

When("vcap CF_INSTANCE_CERT is not set", func() {
BeforeEach(func() {
mockVCAPConfigurationReader.GetCfInstanceCertReturns("", fmt.Errorf("failed to get required credential from service"))
})

It("sets env variable over config file", func() {
Expect(err).NotTo(HaveOccurred())
Expect(conf.CfInstanceCert).To(Equal(""))
})
})

When("vcap PORT is set to a number", func() {
BeforeEach(func() {
mockVCAPConfigurationReader.GetPortReturns(3333)
Expand Down
2 changes: 1 addition & 1 deletion src/autoscaler/api/publicapiserver/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (mw *Middleware) Oauth(next http.Handler) http.Handler {
if err != nil {
mw.logger.Error("failed to check if user is admin", err, nil)
handlers.WriteJSONResponse(w, http.StatusInternalServerError, models.ErrorResponse{
Code: "Internal-Server-Error",
Code: http.StatusText(http.StatusInternalServerError),
Message: "Failed to check if user is admin"})
return
}
Expand Down
2 changes: 1 addition & 1 deletion src/autoscaler/api/publicapiserver/middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ var _ = Describe("Middleware", func() {
})
It("should fail with 500", func() {
CheckResponse(resp, http.StatusInternalServerError, models.ErrorResponse{
Code: "Internal-Server-Error",
Code: http.StatusText(http.StatusInternalServerError),
Message: "Failed to check if user is admin",
})
})
Expand Down
195 changes: 115 additions & 80 deletions src/autoscaler/api/publicapiserver/public_api_handler.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package publicapiserver

import (
"crypto/sha256"
"encoding/json"
"errors"
"fmt"
Expand All @@ -16,12 +17,11 @@ import (
"code.cloudfoundry.org/app-autoscaler/src/autoscaler/cred_helper"
"code.cloudfoundry.org/app-autoscaler/src/autoscaler/db"
"code.cloudfoundry.org/app-autoscaler/src/autoscaler/helpers"
"code.cloudfoundry.org/app-autoscaler/src/autoscaler/helpers/handlers"
"code.cloudfoundry.org/app-autoscaler/src/autoscaler/models"
"code.cloudfoundry.org/app-autoscaler/src/autoscaler/routes"
"github.com/google/uuid"

"code.cloudfoundry.org/app-autoscaler/src/autoscaler/helpers/handlers"
"code.cloudfoundry.org/lager/v3"
"github.com/google/uuid"
)

type PublicApiHandler struct {
Expand Down Expand Up @@ -54,22 +54,26 @@ func NewPublicApiHandler(logger lager.Logger, conf *config.Config, policydb db.P
policydb: policydb,
bindingdb: bindingdb,
eventGeneratorClient: egClient,
policyValidator: policyvalidator.NewPolicyValidator(
conf.PolicySchemaPath,
conf.ScalingRules.CPU.LowerThreshold,
conf.ScalingRules.CPU.UpperThreshold,
conf.ScalingRules.CPUUtil.LowerThreshold,
conf.ScalingRules.CPUUtil.UpperThreshold,
conf.ScalingRules.DiskUtil.LowerThreshold,
conf.ScalingRules.DiskUtil.UpperThreshold,
conf.ScalingRules.Disk.LowerThreshold,
conf.ScalingRules.Disk.UpperThreshold,
),
schedulerUtil: schedulerclient.New(conf, logger),
credentials: credentials,
policyValidator: createPolicyValidator(conf),
schedulerUtil: schedulerclient.New(conf, logger),
credentials: credentials,
}
}

func createPolicyValidator(conf *config.Config) *policyvalidator.PolicyValidator {
return policyvalidator.NewPolicyValidator(
conf.PolicySchemaPath,
conf.ScalingRules.CPU.LowerThreshold,
conf.ScalingRules.CPU.UpperThreshold,
conf.ScalingRules.CPUUtil.LowerThreshold,
conf.ScalingRules.CPUUtil.UpperThreshold,
conf.ScalingRules.DiskUtil.LowerThreshold,
conf.ScalingRules.DiskUtil.UpperThreshold,
conf.ScalingRules.Disk.LowerThreshold,
conf.ScalingRules.Disk.UpperThreshold,
)
}

func writeErrorResponse(w http.ResponseWriter, statusCode int, message string) {
handlers.WriteJSONResponse(w, statusCode, models.ErrorResponse{
Code: http.StatusText(statusCode),
Expand All @@ -83,6 +87,7 @@ func (h *PublicApiHandler) GetScalingPolicy(w http.ResponseWriter, r *http.Reque
writeErrorResponse(w, http.StatusBadRequest, ErrorMessageAppidIsRequired)
return
}

logger := h.logger.Session("GetScalingPolicy", lager.Data{"appId": appId})
logger.Info("Get Scaling Policy")

Expand Down Expand Up @@ -128,15 +133,17 @@ func (h *PublicApiHandler) AttachScalingPolicy(w http.ResponseWriter, r *http.Re
}

policyGuid := uuid.NewString()
err = h.policydb.SaveAppPolicy(r.Context(), appId, policy, policyGuid)
if err != nil {
if err := h.policydb.SaveAppPolicy(r.Context(), appId, policy, policyGuid); err != nil {
logger.Error("Failed to save policy", err)
writeErrorResponse(w, http.StatusInternalServerError, "Error saving policy")
return
}

h.logger.Info("creating/updating schedules", lager.Data{"policy": policy})
err = h.schedulerUtil.CreateOrUpdateSchedule(r.Context(), appId, policy, policyGuid)
if err != nil {

//while there is synchronization between policy and schedule, so creating schedule error does not break
//the whole creating binding process
if err := h.schedulerUtil.CreateOrUpdateSchedule(r.Context(), appId, policy, policyGuid); err != nil {
logger.Error("Failed to create/update schedule", err)
writeErrorResponse(w, http.StatusInternalServerError, err.Error())
return
Expand All @@ -150,7 +157,7 @@ func (h *PublicApiHandler) AttachScalingPolicy(w http.ResponseWriter, r *http.Re
}
_, err = w.Write(response)
if err != nil {
logger.Error("Failed to write body", err)
h.logger.Error("Failed to write body", err)
}
}

Expand All @@ -161,73 +168,107 @@ func (h *PublicApiHandler) DetachScalingPolicy(w http.ResponseWriter, r *http.Re
writeErrorResponse(w, http.StatusBadRequest, ErrorMessageAppidIsRequired)
return
}

logger := h.logger.Session("DetachScalingPolicy", lager.Data{"appId": appId})
logger.Info("Deleting policy json", lager.Data{"appId": appId})
err := h.policydb.DeletePolicy(r.Context(), appId)
if err != nil {

if err := h.policydb.DeletePolicy(r.Context(), appId); err != nil {
logger.Error("Failed to delete policy from database", err)
writeErrorResponse(w, http.StatusInternalServerError, "Error deleting policy")
return
}

logger.Info("Deleting schedules")
err = h.schedulerUtil.DeleteSchedule(r.Context(), appId)
if err != nil {
if err := h.schedulerUtil.DeleteSchedule(r.Context(), appId); err != nil {
logger.Error("Failed to delete schedule", err)
writeErrorResponse(w, http.StatusInternalServerError, "Error deleting schedules")
return
}

if h.bindingdb != nil && !reflect.ValueOf(h.bindingdb).IsNil() {
//TODO this is a copy of part of the attach ... this should use a common function.
// brokered offering: check if there's a default policy that could apply
serviceInstance, err := h.bindingdb.GetServiceInstanceByAppId(appId)
if err != nil {
logger.Error("Failed to find service instance for app", err)
writeErrorResponse(w, http.StatusInternalServerError, "Error retrieving service instance")
if err := h.handleDefaultPolicy(w, r, logger, appId); err != nil {
return
}
if serviceInstance.DefaultPolicy != "" {
policyStr := serviceInstance.DefaultPolicy
policyGuidStr := serviceInstance.DefaultPolicyGuid
logger.Info("saving default policy json for app", lager.Data{"policy": policyStr})
var policy *models.ScalingPolicy
err := json.Unmarshal([]byte(policyStr), &policy)
if err != nil {
h.logger.Error("default policy invalid", err, lager.Data{"appId": appId, "policy": policyStr})
writeErrorResponse(w, http.StatusInternalServerError, "Default policy not valid")
return
}

err = h.policydb.SaveAppPolicy(r.Context(), appId, policy, policyGuidStr)
if err != nil {
logger.Error("failed to save policy", err, lager.Data{"policy": policyStr})
writeErrorResponse(w, http.StatusInternalServerError, "Error attaching the default policy")
return
}

logger.Info("creating/updating schedules", lager.Data{"policy": policyStr})
err = h.schedulerUtil.CreateOrUpdateSchedule(r.Context(), appId, policy, policyGuidStr)
//while there is synchronization between policy and schedule, so creating schedule error does not break
//the whole creating binding process
if err != nil {
logger.Error("failed to create/update schedules", err, lager.Data{"policy": policyStr})
writeErrorResponse(w, http.StatusInternalServerError, fmt.Sprintf("Failed to update schedule:%s", err))
}
}
}

// find via the app id the binding -> service instance
// default policy? then apply that

w.WriteHeader(http.StatusOK)
_, err = w.Write([]byte("{}"))
_, err := w.Write([]byte("{}"))
if err != nil {
logger.Error(ActionWriteBody, err)
}
}

func proxyRequest(pathFn func() string, call func(url string) (*http.Response, error), w http.ResponseWriter, reqUrl *url.URL, parameters *url.Values, requestDescription string, logger lager.Logger) {
aUrl := pathFn()
resp, err := call(aUrl)
// TODO this is a copy of part of the attach ... this should use a common function.
// brokered offering: check if there's a default policy that could apply
func (h *PublicApiHandler) handleDefaultPolicy(w http.ResponseWriter, r *http.Request, logger lager.Logger, appId string) error {
serviceInstance, err := h.bindingdb.GetServiceInstanceByAppId(appId)
if err != nil {
logger.Error("Failed to find service instance for app", err)
writeErrorResponse(w, http.StatusInternalServerError, "Error retrieving service instance")
return errors.New("error retrieving service instance")

}

if serviceInstance.DefaultPolicy != "" {
return h.saveDefaultPolicy(w, r, logger, appId, serviceInstance)
}

return nil
}

func (h *PublicApiHandler) saveDefaultPolicy(w http.ResponseWriter, r *http.Request, logger lager.Logger, appId string, serviceInstance *models.ServiceInstance) error {
policyStr := serviceInstance.DefaultPolicy
policyGuidStr := serviceInstance.DefaultPolicyGuid
logger.Info("saving default policy json for app", lager.Data{"policy": policyStr})

var policy *models.ScalingPolicy
if err := json.Unmarshal([]byte(policyStr), &policy); err != nil {
h.logger.Error("default policy invalid", err, lager.Data{"appId": appId, "policy": policyStr})
writeErrorResponse(w, http.StatusInternalServerError, "Default policy not valid")
return errors.New("default policy not valid")
}

if err := h.policydb.SaveAppPolicy(r.Context(), appId, policy, policyGuidStr); err != nil {
logger.Error("failed to save policy", err, lager.Data{"policy": policyStr})
writeErrorResponse(w, http.StatusInternalServerError, "Error attaching the default policy")
return errors.New("error attaching the default policy")
}

logger.Info("creating/updating schedules", lager.Data{"policy": policyStr})
if err := h.schedulerUtil.CreateOrUpdateSchedule(r.Context(), appId, policy, policyGuidStr); err != nil {
logger.Error("failed to create/update schedules", err, lager.Data{"policy": policyStr})
writeErrorResponse(w, http.StatusInternalServerError, fmt.Sprintf("Failed to update schedule:%s", err))
return errors.New("failed to update schedule")
}

return nil
}

func (h *PublicApiHandler) proxyRequest(logger lager.Logger, appId string, metricType string, w http.ResponseWriter, req *http.Request, parameters *url.Values, requestDescription string) {
reqUrl := req.URL
r := routes.NewRouter()
router := r.CreateEventGeneratorRoutes()
if router == nil {
panic("Failed to create event generator routes")
}

route := router.Get(routes.GetAggregatedMetricHistoriesRouteName)
path, err := route.URLPath("appid", appId, "metrictype", metricType)
if err != nil {
logger.Error("Failed to create path", err)
panic(err)
}

aUrl := h.conf.EventGenerator.EventGeneratorUrl + path.RequestURI() + "?" + parameters.Encode()
req, err = http.NewRequest("GET", aUrl, nil)

if h.conf.CfInstanceCert != "" {
h.setXForwardedClientCertHeader(req)
}

resp, err := h.eventGeneratorClient.Do(req)
if err != nil {
logger.Error("Failed to retrieve "+requestDescription, err, lager.Data{"url": aUrl})
writeErrorResponse(w, http.StatusInternalServerError, "Error retrieving "+requestDescription)
Expand All @@ -247,6 +288,7 @@ func proxyRequest(pathFn func() string, call func(url string) (*http.Response, e
writeErrorResponse(w, resp.StatusCode, string(responseData))
return
}

paginatedResponse, err := paginateResource(responseData, parameters, reqUrl)
if err != nil {
handlers.WriteJSONResponse(w, http.StatusInternalServerError, err.Error())
Expand All @@ -256,6 +298,14 @@ func proxyRequest(pathFn func() string, call func(url string) (*http.Response, e
handlers.WriteJSONResponse(w, resp.StatusCode, paginatedResponse)
}

func (h *PublicApiHandler) setXForwardedClientCertHeader(req *http.Request) {
certPEM := []byte(h.conf.CfInstanceCert)
hash := sha256.Sum256(certPEM)
encodedCert := url.QueryEscape(string(certPEM))
xfccHeader := fmt.Sprintf("Hash=%x;Cert=\"%s\"", hash, encodedCert)
req.Header.Set("X-Forwarded-Client-Cert", xfccHeader)
}

func (h *PublicApiHandler) GetAggregatedMetricsHistories(w http.ResponseWriter, req *http.Request, vars map[string]string) {
appId := vars["appId"]
metricType := vars["metricType"]
Expand All @@ -274,22 +324,7 @@ func (h *PublicApiHandler) GetAggregatedMetricsHistories(w http.ResponseWriter,
return
}

pathFn := func() string {
r := routes.NewRouter()
router := r.CreateEventGeneratorRoutes()
if router == nil {
panic("Failed to create event generator routes")
}

route := router.Get(routes.GetAggregatedMetricHistoriesRouteName)
path, err := route.URLPath("appid", appId, "metrictype", metricType)
if err != nil {
logger.Error("Failed to create path", err)
panic(err)
}
return h.conf.EventGenerator.EventGeneratorUrl + path.RequestURI() + "?" + parameters.Encode()
}
proxyRequest(pathFn, h.eventGeneratorClient.Get, w, req.URL, parameters, "metrics history from eventgenerator", logger)
h.proxyRequest(logger, appId, metricType, w, req, parameters, "metrics history from eventgenerator")
}

func (h *PublicApiHandler) GetApiInfo(w http.ResponseWriter, _ *http.Request, _ map[string]string) {
Expand Down
Loading
Loading