Skip to content

Commit

Permalink
feat: status for all instances (#5437)
Browse files Browse the repository at this point in the history
* feat: status for all instances

Signed-off-by: Yvonnick Esnault <[email protected]>
  • Loading branch information
yesnault authored Sep 25, 2020
1 parent 110ef4d commit 679eb5e
Show file tree
Hide file tree
Showing 18 changed files with 364 additions and 52 deletions.
12 changes: 6 additions & 6 deletions cli/cdsctl/admin_cdn.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ var adminCdnStatusCmd = cli.Command{
}

func adminCdnStatusRun(v cli.Values) (cli.ListResult, error) {
btes, err := client.ServiceCallGET("cdn", "/mon/status")
services, err := client.ServicesByType(sdk.TypeCDN)
if err != nil {
return nil, err
}
status := sdk.MonitoringStatus{}
if err := json.Unmarshal(btes, &status); err != nil {
return nil, err
for _, srv := range services {
status.Lines = append(status.Lines, srv.MonitoringStatus.Lines...)
}
return cli.AsListResult(status.Lines), nil
}
Expand All @@ -61,7 +61,7 @@ var adminCdnCacheLogClearCmd = cli.Command{
}

func adminCdnCacheLogClearRun(v cli.Values) error {
return client.ServiceCallDELETE("cdn", "/cache")
return client.ServiceCallDELETE(sdk.TypeCDN, "/cache")
}

var adminCdnCacheLogStatusCmd = cli.Command{
Expand All @@ -71,7 +71,7 @@ var adminCdnCacheLogStatusCmd = cli.Command{
}

func adminCdnCacheLogStatusRun(v cli.Values) (cli.ListResult, error) {
btes, err := client.ServiceCallGET("cdn", "/cache/status")
btes, err := client.ServiceCallGET(sdk.TypeCDN, "/cache/status")
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -104,7 +104,7 @@ var adminCdnItemSizeProjectCmd = cli.Command{
}

func adminCdnItemSizeProjectRun(v cli.Values) error {
btes, err := client.ServiceCallGET("cdn", "/size/item/project/"+v.GetString(_ProjectKey))
btes, err := client.ServiceCallGET(sdk.TypeCDN, "/size/item/project/"+v.GetString(_ProjectKey))
if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions engine/api/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ func (api *API) getAdminServicesHandler() service.Handler {

var err error
if r.FormValue("type") != "" {
srvs, err = services.LoadAllByType(ctx, api.mustDB(), r.FormValue("type"))
srvs, err = services.LoadAllByType(ctx, api.mustDB(), r.FormValue("type"), services.LoadOptions.WithStatus)
} else {
srvs, err = services.LoadAll(ctx, api.mustDB())
srvs, err = services.LoadAll(ctx, api.mustDB(), services.LoadOptions.WithStatus)
}
if err != nil {
return err
Expand Down Expand Up @@ -87,7 +87,7 @@ func (api *API) getAdminServiceHandler() service.Handler {
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
vars := mux.Vars(r)
name := vars["name"]
srv, err := services.LoadByName(ctx, api.mustDB(), name)
srv, err := services.LoadByName(ctx, api.mustDB(), name, services.LoadOptions.WithStatus)
if err != nil {
return err
}
Expand Down
3 changes: 3 additions & 0 deletions engine/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,9 @@ func (a *API) Serve(ctx context.Context) error {
go event.DequeueEvent(ctx, a.mustDB())
}

// here the generated name of API is ready, we set ServerName with that
a.Common.ServiceName = event.GetCDSName()

log.Info(ctx, "Initializing internal routines...")
a.GoRoutines.Run(ctx, "maintenance.Subscribe", func(ctx context.Context) {
if err := a.listenMaintenance(ctx); err != nil {
Expand Down
27 changes: 27 additions & 0 deletions engine/api/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ func (api *API) postServiceRegisterHandler() service.Handler {
}

// Update or create the service

var sessionID string
if a := getAuthSession(ctx); a != nil {
sessionID = a.ID
}
if exists {
srv.Update(data)
if err := services.Update(ctx, tx, srv); err != nil {
Expand All @@ -108,12 +113,17 @@ func (api *API) postServiceRegisterHandler() service.Handler {
} else {
srv = &data
srv.ConsumerID = &consumer.ID

if err := services.Insert(ctx, tx, srv); err != nil {
return sdk.WithStack(err)
}
log.Debug("postServiceRegisterHandler> insert new service %s(%d) registered for consumer %s", srv.Name, srv.ID, *srv.ConsumerID)
}

if err := services.UpsertStatus(tx, *srv, sessionID); err != nil {
return sdk.WithStack(err)
}

if len(srv.PublicKey) > 0 {
log.Debug("postServiceRegisterHandler> service %s registered with public key: %s", srv.Name, string(srv.PublicKey))
}
Expand Down Expand Up @@ -173,10 +183,18 @@ func (api *API) postServiceHearbeatHandler() service.Handler {
s.LastHeartbeat = time.Now()
s.MonitoringStatus = mon

var sessionID string
if a := getAuthSession(ctx); a != nil {
sessionID = a.ID
}
if err := services.Update(ctx, tx, s); err != nil {
return err
}

if err := services.UpsertStatus(tx, *s, sessionID); err != nil {
return err
}

if err := tx.Commit(); err != nil {
return sdk.WithStack(err)
}
Expand Down Expand Up @@ -238,6 +256,10 @@ func (api *API) serviceAPIHeartbeatUpdate(ctx context.Context, db *gorp.DbMap) {
return
}

var authSessionID string
if a := getAuthSession(ctx); a != nil {
authSessionID = a.ID
}
if exists {
srv.ID = old.ID
if err := services.Update(ctx, tx, srv); err != nil {
Expand All @@ -251,6 +273,11 @@ func (api *API) serviceAPIHeartbeatUpdate(ctx context.Context, db *gorp.DbMap) {
}
}

if err := services.UpsertStatus(tx, *srv, authSessionID); err != nil {
log.Error(ctx, "serviceAPIHeartbeat> Unable to insert or update monitoring status %s: %v", srv.Name, err)
return
}

if err := tx.Commit(); err != nil {
log.Error(ctx, "serviceAPIHeartbeat> error tx commit: %v", err)
return
Expand Down
69 changes: 56 additions & 13 deletions engine/api/services/dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,14 @@ import (
"github.com/ovh/cds/sdk/log"
)

func getAll(ctx context.Context, db gorp.SqlExecutor, q gorpmapping.Query) ([]sdk.Service, error) {
func getAll(ctx context.Context, db gorp.SqlExecutor, q gorpmapping.Query, opts ...LoadOptionFunc) ([]sdk.Service, error) {
ss := []service{}

if err := gorpmapping.GetAll(ctx, db, q, &ss); err != nil {
return nil, sdk.WrapError(err, "cannot get services")
}

// Check signature of data, if invalid do not return it
verifiedServices := make([]sdk.Service, 0, len(ss))
verifiedServices := make([]*sdk.Service, 0, len(ss))
for i := range ss {
isValid, err := gorpmapping.CheckSignature(ss[i], ss[i].Signature)
if err != nil {
Expand All @@ -30,13 +29,27 @@ func getAll(ctx context.Context, db gorp.SqlExecutor, q gorpmapping.Query) ([]sd
log.Error(ctx, "service.getAll> service %d data corrupted", ss[i].ID)
continue
}
verifiedServices = append(verifiedServices, ss[i].Service)
s := ss[i].Service
verifiedServices = append(verifiedServices, &s)
}

return verifiedServices, nil
if len(verifiedServices) > 0 {
for _, f := range opts {
if err := f(ctx, db, verifiedServices...); err != nil {
return nil, err
}
}
}

services := make([]sdk.Service, 0, len(verifiedServices))
for _, v := range verifiedServices {
services = append(services, *v)
}

return services, nil
}

func get(ctx context.Context, db gorp.SqlExecutor, q gorpmapping.Query) (*sdk.Service, error) {
func get(ctx context.Context, db gorp.SqlExecutor, q gorpmapping.Query, opts ...LoadOptionFunc) (*sdk.Service, error) {
var s service

found, err := gorpmapping.Get(ctx, db, q, &s)
Expand All @@ -56,25 +69,33 @@ func get(ctx context.Context, db gorp.SqlExecutor, q gorpmapping.Query) (*sdk.Se
return nil, sdk.WithStack(sdk.ErrNotFound)
}

if len(opts) > 0 {
for _, f := range opts {
if err := f(ctx, db, &s.Service); err != nil {
return nil, err
}
}
}

return &s.Service, nil
}

// LoadAll returns all services in database.
func LoadAll(ctx context.Context, db gorp.SqlExecutor) ([]sdk.Service, error) {
func LoadAll(ctx context.Context, db gorp.SqlExecutor, opts ...LoadOptionFunc) ([]sdk.Service, error) {
query := gorpmapping.NewQuery(`SELECT * FROM service`)
return getAll(ctx, db, query)
return getAll(ctx, db, query, opts...)
}

// LoadAllByType returns all services with given type.
func LoadAllByType(ctx context.Context, db gorp.SqlExecutor, typeService string) ([]sdk.Service, error) {
func LoadAllByType(ctx context.Context, db gorp.SqlExecutor, typeService string, opts ...LoadOptionFunc) ([]sdk.Service, error) {
if ss, ok := internalCache.getFromCache(typeService); ok {
return ss, nil
}
query := gorpmapping.NewQuery(`SELECT * FROM service WHERE type = $1`).Args(typeService)
return getAll(ctx, db, query)
return getAll(ctx, db, query, opts...)
}

// LoadAllByType returns all services that users can see with given type.
// LoadAllByTypeAndUserID returns all services that users can see with given type.
func LoadAllByTypeAndUserID(ctx context.Context, db gorp.SqlExecutor, typeService string, userID string) ([]sdk.Service, error) {
query := gorpmapping.NewQuery(`
SELECT service.*
Expand All @@ -97,9 +118,9 @@ func LoadByNameAndType(ctx context.Context, db gorp.SqlExecutor, name, stype str
}

// LoadByName returns a service by its name.
func LoadByName(ctx context.Context, db gorp.SqlExecutor, name string) (*sdk.Service, error) {
func LoadByName(ctx context.Context, db gorp.SqlExecutor, name string, opts ...LoadOptionFunc) (*sdk.Service, error) {
query := gorpmapping.NewQuery("SELECT * FROM service WHERE name = $1").Args(name)
return get(ctx, db, query)
return get(ctx, db, query, opts...)
}

// LoadByID returns a service by its id.
Expand Down Expand Up @@ -134,6 +155,28 @@ func Update(ctx context.Context, db gorpmapper.SqlExecutorWithTx, s *sdk.Service
return nil
}

// UpsertStatus insert or update monitoring status
func UpsertStatus(db gorpmapper.SqlExecutorWithTx, s sdk.Service, authSessionID string) error {
var sessionID *string
if authSessionID == "" {
// no sessionID : we can delete service_status to keep only one status
// example: each api has a consumerID and no authSessionID -> so only on status per service
query := "delete from service_status where service_id = $1"
if _, err := db.Exec(query, s.ID); err != nil {
return sdk.WithStack(err)
}
query = `INSERT INTO service_status(monitoring_status, service_id) VALUES($1,$2)`
_, err := db.Exec(query, s.MonitoringStatus, s.ID)
return sdk.WithStack(err)
}
sessionID = &authSessionID
query := `INSERT INTO service_status(monitoring_status, service_id, auth_session_id) VALUES($1,$2, $3)
ON CONFLICT (service_id, auth_session_id) DO UPDATE SET monitoring_status = $1, service_id = $2, auth_session_id = $3`
_, err := db.Exec(query, s.MonitoringStatus, s.ID, sessionID)
return sdk.WithStack(err)

}

// Delete a service.
func Delete(db gorp.SqlExecutor, s *sdk.Service) error {
if s.Type == sdk.TypeHatchery {
Expand Down
85 changes: 85 additions & 0 deletions engine/api/services/dao_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package services_test
import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/ovh/cds/engine/api/services"
"github.com/ovh/cds/engine/api/test"
Expand Down Expand Up @@ -55,3 +57,86 @@ func TestDAO(t *testing.T) {
_, err = services.FindDeadServices(context.TODO(), db, 0)
test.NoError(t, err)
}

func TestDAOWithStatus(t *testing.T) {
db, _ := test.SetupPG(t)

allSrv, err := services.LoadAll(context.TODO(), db)
for _, s := range allSrv {
if err := services.Delete(db, &s); err != nil {
t.Fatalf("unable to delete service: %v", err)
}
}

privateKey, err := jws.NewRandomRSAKey()
test.NoError(t, err)
publicKey, err := jws.ExportPublicKey(privateKey)
test.NoError(t, err)

theServiceName := sdk.RandomString(10)
var srv = sdk.Service{
CanonicalService: sdk.CanonicalService{
Name: theServiceName,
Type: "type-service-test",
PublicKey: publicKey,
},
MonitoringStatus: sdk.MonitoringStatus{
Now: time.Now(),
Lines: []sdk.MonitoringStatusLine{
{
Component: "backend/cds-backend/items",
Hostname: "foofoo.local",
Service: "cds-cdn",
Status: "OK",
Type: "cdn",
Value: "90",
},
},
},
}

test.NoError(t, services.Insert(context.TODO(), db, &srv))
test.NoError(t, services.UpsertStatus(db, srv, ""))

srv2, err := services.LoadByName(context.TODO(), db, srv.Name)
test.NoError(t, err)

assert.Equal(t, srv.Name, srv2.Name)
assert.Equal(t, string(srv.PublicKey), string(srv2.PublicKey))

all, err := services.LoadAllByType(context.TODO(), db, srv.Type)
test.NoError(t, err)

assert.True(t, len(all) >= 1)

all2, err := services.LoadAll(context.TODO(), db, services.LoadOptions.WithStatus)
test.NoError(t, err)
var found bool
for _, s := range all2 {
if s.Name == theServiceName {
found = true
require.EqualValues(t, 1, len(s.MonitoringStatus.Lines))
for _, ss := range s.MonitoringStatus.Lines {
require.EqualValues(t, "backend/cds-backend/items", ss.Component)
}
break
}
}

require.True(t, found)

srv3, err := services.LoadByName(context.TODO(), db, theServiceName)
test.NoError(t, err)
require.EqualValues(t, 0, len(srv3.MonitoringStatus.Lines))

srv4, err := services.LoadByName(context.TODO(), db, theServiceName, services.LoadOptions.WithStatus)
test.NoError(t, err)
require.EqualValues(t, 1, len(srv4.MonitoringStatus.Lines))

for _, s := range all {
test.NoError(t, services.Delete(db, &s))
}

_, err = services.FindDeadServices(context.TODO(), db, 0)
test.NoError(t, err)
}
Loading

0 comments on commit 679eb5e

Please sign in to comment.