Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: status for all instances #5437

Merged
merged 9 commits into from
Sep 25, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions cli/cdsctl/admin_cdn.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ var adminCdnStatusCmd = cli.Command{
}

func adminCdnStatusRun(v cli.Values) (cli.ListResult, error) {
btes, err := client.ServiceCallGET("cdn", "/mon/status")
services, err := client.ServicesByType(sdk.TypeCDN)
if err != nil {
return nil, err
}
status := sdk.MonitoringStatus{}
if err := json.Unmarshal(btes, &status); err != nil {
return nil, err
for _, srv := range services {
status.Lines = append(status.Lines, srv.MonitoringStatus.Lines...)
}
return cli.AsListResult(status.Lines), nil
}
Expand All @@ -61,7 +61,7 @@ var adminCdnCacheLogClearCmd = cli.Command{
}

func adminCdnCacheLogClearRun(v cli.Values) error {
return client.ServiceCallDELETE("cdn", "/cache")
return client.ServiceCallDELETE(sdk.TypeCDN, "/cache")
}

var adminCdnCacheLogStatusCmd = cli.Command{
Expand All @@ -71,7 +71,7 @@ var adminCdnCacheLogStatusCmd = cli.Command{
}

func adminCdnCacheLogStatusRun(v cli.Values) (cli.ListResult, error) {
btes, err := client.ServiceCallGET("cdn", "/cache/status")
btes, err := client.ServiceCallGET(sdk.TypeCDN, "/cache/status")
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -104,7 +104,7 @@ var adminCdnItemSizeProjectCmd = cli.Command{
}

func adminCdnItemSizeProjectRun(v cli.Values) error {
btes, err := client.ServiceCallGET("cdn", "/size/item/project/"+v.GetString(_ProjectKey))
btes, err := client.ServiceCallGET(sdk.TypeCDN, "/size/item/project/"+v.GetString(_ProjectKey))
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion engine/api/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (api *API) getAdminServiceHandler() service.Handler {
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
vars := mux.Vars(r)
name := vars["name"]
srv, err := services.LoadByName(ctx, api.mustDB(), name)
srv, err := services.LoadByName(ctx, api.mustDB(), name, services.LoadOptions.WithStatus)
if err != nil {
return err
}
Expand Down
3 changes: 3 additions & 0 deletions engine/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,9 @@ func (a *API) Serve(ctx context.Context) error {
go event.DequeueEvent(ctx, a.mustDB())
}

// here the generated name of API is ready, we set ServerName with that
a.Common.ServiceName = event.GetCDSName()

log.Info(ctx, "Initializing internal routines...")
a.GoRoutines.Run(ctx, "maintenance.Subscribe", func(ctx context.Context) {
if err := a.listenMaintenance(ctx); err != nil {
Expand Down
23 changes: 18 additions & 5 deletions engine/api/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,20 +106,24 @@ func (api *API) postServiceRegisterHandler() service.Handler {
}
if exists {
srv.Update(data)
if err := services.Update(ctx, tx, srv, sessionID); err != nil {
if err := services.Update(ctx, tx, srv); err != nil {
return err
}
log.Debug("postServiceRegisterHandler> update existing service %s(%d) registered for consumer %s", srv.Name, srv.ID, *srv.ConsumerID)
} else {
srv = &data
srv.ConsumerID = &consumer.ID

if err := services.Insert(ctx, tx, srv, sessionID); err != nil {
if err := services.Insert(ctx, tx, srv); err != nil {
return sdk.WithStack(err)
}
log.Debug("postServiceRegisterHandler> insert new service %s(%d) registered for consumer %s", srv.Name, srv.ID, *srv.ConsumerID)
}

if err := services.UpsertStatus(tx, *srv, sessionID); err != nil {
return sdk.WithStack(err)
}

if len(srv.PublicKey) > 0 {
log.Debug("postServiceRegisterHandler> service %s registered with public key: %s", srv.Name, string(srv.PublicKey))
}
Expand Down Expand Up @@ -183,7 +187,11 @@ func (api *API) postServiceHearbeatHandler() service.Handler {
if a := getAuthSession(ctx); a != nil {
sessionID = a.ID
}
if err := services.Update(ctx, tx, s, sessionID); err != nil {
if err := services.Update(ctx, tx, s); err != nil {
return err
}

if err := services.UpsertStatus(tx, *s, sessionID); err != nil {
return err
}

Expand Down Expand Up @@ -254,17 +262,22 @@ func (api *API) serviceAPIHeartbeatUpdate(ctx context.Context, db *gorp.DbMap) {
}
if exists {
srv.ID = old.ID
if err := services.Update(ctx, tx, srv, authSessionID); err != nil {
if err := services.Update(ctx, tx, srv); err != nil {
log.Error(ctx, "serviceAPIHeartbeat> Unable to update service %s: %v", srv.Name, err)
return
}
} else {
if err := services.Insert(ctx, tx, srv, authSessionID); err != nil {
if err := services.Insert(ctx, tx, srv); err != nil {
log.Error(ctx, "serviceAPIHeartbeat> Unable to insert service %s: %v", srv.Name, err)
return
}
}

if err := services.UpsertStatus(tx, *srv, authSessionID); err != nil {
log.Error(ctx, "serviceAPIHeartbeat> Unable to insert or update monitoring status %s: %v", srv.Name, err)
return
}

if err := tx.Commit(); err != nil {
log.Error(ctx, "serviceAPIHeartbeat> error tx commit: %v", err)
return
Expand Down
98 changes: 49 additions & 49 deletions engine/api/services/dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ import (
"github.com/ovh/cds/sdk/log"
)

func getAll(ctx context.Context, db gorp.SqlExecutor, q gorpmapping.Query) ([]sdk.Service, error) {
func getAll(ctx context.Context, db gorp.SqlExecutor, q gorpmapping.Query, opts ...LoadOptionFunc) ([]sdk.Service, error) {
ss := []service{}

if err := gorpmapping.GetAll(ctx, db, q, &ss); err != nil {
return nil, sdk.WrapError(err, "cannot get services")
}

// Check signature of data, if invalid do not return it
verifiedServices := make([]sdk.Service, 0, len(ss))
servicesIDs := make([]int64, 0, len(ss))
for i := range ss {
isValid, err := gorpmapping.CheckSignature(ss[i], ss[i].Signature)
if err != nil {
Expand All @@ -31,12 +31,19 @@ func getAll(ctx context.Context, db gorp.SqlExecutor, q gorpmapping.Query) ([]sd
continue
}
verifiedServices = append(verifiedServices, ss[i].Service)
servicesIDs = append(servicesIDs, ss[i].Service.ID)
}

for _, f := range opts {
yesnault marked this conversation as resolved.
Show resolved Hide resolved
if err := f(ctx, db, verifiedServices, servicesIDs); err != nil {
return nil, err
}
}

return verifiedServices, nil
}

func get(ctx context.Context, db gorp.SqlExecutor, q gorpmapping.Query) (*sdk.Service, error) {
func get(ctx context.Context, db gorp.SqlExecutor, q gorpmapping.Query, opts ...LoadOptionFunc) (*sdk.Service, error) {
var s service

found, err := gorpmapping.Get(ctx, db, q, &s)
Expand All @@ -56,48 +63,32 @@ func get(ctx context.Context, db gorp.SqlExecutor, q gorpmapping.Query) (*sdk.Se
return nil, sdk.WithStack(sdk.ErrNotFound)
}

if len(opts) > 0 {
services := []sdk.Service{s.Service}
for _, f := range opts {
if err := f(ctx, db, services, []int64{s.Service.ID}); err != nil {
return nil, err
}
}
s.Service = services[0]
}

return &s.Service, nil
}

// LoadAll returns all services in database.
func LoadAll(ctx context.Context, db gorp.SqlExecutor) ([]sdk.Service, error) {
func LoadAll(ctx context.Context, db gorp.SqlExecutor, opts ...LoadOptionFunc) ([]sdk.Service, error) {
query := gorpmapping.NewQuery(`SELECT * FROM service`)
return getAll(ctx, db, query)
}

// LoadAllWithStatus returns all services in database with status
func LoadAllWithStatus(ctx context.Context, db gorp.SqlExecutor) ([]sdk.Service, error) {
query := gorpmapping.NewQuery(`SELECT * FROM service`)
srvs, err := getAll(ctx, db, query)
if err != nil {
return nil, err
}

ss, err := loadAllServiceStatus(ctx, db)
if err != nil {
return nil, err
}
for i := range srvs {
srv := &srvs[i]
srv.MonitoringStatus = sdk.MonitoringStatus{Now: time.Now()}
for _, status := range ss {
if srv.ID == status.ServiceID {
srv.MonitoringStatus.Lines = append(srv.MonitoringStatus.Lines, status.MonitoringStatus.Lines...)
}
}
srvs[i] = *srv
}

return srvs, nil
return getAll(ctx, db, query, opts...)
}

// LoadAllByType returns all services with given type.
func LoadAllByType(ctx context.Context, db gorp.SqlExecutor, typeService string) ([]sdk.Service, error) {
func LoadAllByType(ctx context.Context, db gorp.SqlExecutor, typeService string, opts ...LoadOptionFunc) ([]sdk.Service, error) {
if ss, ok := internalCache.getFromCache(typeService); ok {
return ss, nil
}
query := gorpmapping.NewQuery(`SELECT * FROM service WHERE type = $1`).Args(typeService)
return getAll(ctx, db, query)
return getAll(ctx, db, query, opts...)
}

// LoadAllByTypeAndUserID returns all services that users can see with given type.
Expand All @@ -123,9 +114,9 @@ func LoadByNameAndType(ctx context.Context, db gorp.SqlExecutor, name, stype str
}

// LoadByName returns a service by its name.
func LoadByName(ctx context.Context, db gorp.SqlExecutor, name string) (*sdk.Service, error) {
func LoadByName(ctx context.Context, db gorp.SqlExecutor, name string, opts ...LoadOptionFunc) (*sdk.Service, error) {
query := gorpmapping.NewQuery("SELECT * FROM service WHERE name = $1").Args(name)
return get(ctx, db, query)
return get(ctx, db, query, opts...)
}

// LoadByID returns a service by its id.
Expand All @@ -141,46 +132,55 @@ func FindDeadServices(ctx context.Context, db gorp.SqlExecutor, t time.Duration)
}

// Insert a service in database.
func Insert(ctx context.Context, db gorpmapper.SqlExecutorWithTx, s *sdk.Service, authSessionID string) error {
func Insert(ctx context.Context, db gorpmapper.SqlExecutorWithTx, s *sdk.Service) error {
sdb := service{Service: *s}
if err := gorpmapping.InsertAndSign(ctx, db, &sdb); err != nil {
return err
}
*s = sdb.Service
return upsertStatus(db, s, authSessionID)
return nil
}

// Update a service in database.
func Update(ctx context.Context, db gorpmapper.SqlExecutorWithTx, s *sdk.Service, authSessionID string) error {
func Update(ctx context.Context, db gorpmapper.SqlExecutorWithTx, s *sdk.Service) error {
sdb := service{Service: *s}
if err := gorpmapping.UpdateAndSign(ctx, db, &sdb); err != nil {
return err
}
*s = sdb.Service
return upsertStatus(db, s, authSessionID)
return nil
}

// loadAllServiceStatus returns all services status
func loadAllServiceStatus(ctx context.Context, db gorp.SqlExecutor) ([]sdk.ServiceStatus, error) {
query := gorpmapping.NewQuery(`SELECT * FROM service_status`)
// loadAllServiceStatusByServiceID returns all services status for a service ID
func loadAllServiceStatusByServiceID(ctx context.Context, db gorp.SqlExecutor, serviceID int64) ([]sdk.ServiceStatus, error) {
yesnault marked this conversation as resolved.
Show resolved Hide resolved
query := gorpmapping.NewQuery(`SELECT * FROM service_status where service_id = $1`).Args(serviceID)
ss := []sdk.ServiceStatus{}
if err := gorpmapping.GetAll(ctx, db, query, &ss); err != nil {
return nil, sdk.WrapError(err, "cannot get services")
}
return ss, nil
}

func upsertStatus(db gorpmapper.SqlExecutorWithTx, s *sdk.Service, authSessionID string) error {
// UpsertStatus insert or update monitoring status
func UpsertStatus(db gorpmapper.SqlExecutorWithTx, s sdk.Service, authSessionID string) error {
var sessionID *string
if authSessionID != "" {
sessionID = &authSessionID
if authSessionID == "" {
// no sessionID : we can delete service_status to keep only one status
// example: each api has a consumerID and no authSessionID -> so only on status per service
query := "delete from service_status where service_id = $1"
if _, err := db.Exec(query, s.ID); err != nil {
return sdk.WithStack(err)
}
query = `INSERT INTO service_status(monitoring_status, service_id) VALUES($1,$2)`
_, err := db.Exec(query, s.MonitoringStatus, s.ID)
return sdk.WithStack(err)
}
sessionID = &authSessionID
query := `INSERT INTO service_status(monitoring_status, service_id, auth_session_id) VALUES($1,$2, $3)
ON CONFLICT (service_id, auth_session_id) DO UPDATE SET monitoring_status = $1, service_id = $2, auth_session_id = $3`
if _, err := db.Exec(query, s.MonitoringStatus, s.ID, sessionID); err != nil {
return sdk.WithStack(err)
}
return nil
_, err := db.Exec(query, s.MonitoringStatus, s.ID, sessionID)
return sdk.WithStack(err)

}

// Delete a service.
Expand Down
15 changes: 12 additions & 3 deletions engine/api/services/dao_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestDAO(t *testing.T) {
},
}

test.NoError(t, services.Insert(context.TODO(), db, &srv, ""))
test.NoError(t, services.Insert(context.TODO(), db, &srv))

srv2, err := services.LoadByName(context.TODO(), db, srv.Name)
test.NoError(t, err)
Expand Down Expand Up @@ -95,7 +95,8 @@ func TestDAOWithStatus(t *testing.T) {
},
}

test.NoError(t, services.Insert(context.TODO(), db, &srv, ""))
test.NoError(t, services.Insert(context.TODO(), db, &srv))
test.NoError(t, services.UpsertStatus(db, srv, ""))

srv2, err := services.LoadByName(context.TODO(), db, srv.Name)
test.NoError(t, err)
Expand All @@ -108,7 +109,7 @@ func TestDAOWithStatus(t *testing.T) {

assert.True(t, len(all) >= 1)

all2, err := services.LoadAllWithStatus(context.TODO(), db)
all2, err := services.LoadAll(context.TODO(), db, services.LoadOptions.WithStatus)
test.NoError(t, err)
var found bool
for _, s := range all2 {
Expand All @@ -124,6 +125,14 @@ func TestDAOWithStatus(t *testing.T) {

require.True(t, found)

srv3, err := services.LoadByName(context.TODO(), db, theServiceName)
test.NoError(t, err)
require.EqualValues(t, 0, len(srv3.MonitoringStatus.Lines))

srv4, err := services.LoadByName(context.TODO(), db, theServiceName, services.LoadOptions.WithStatus)
test.NoError(t, err)
require.EqualValues(t, 1, len(srv4.MonitoringStatus.Lines))

for _, s := range all {
test.NoError(t, services.Delete(db, &s))
}
Expand Down
14 changes: 11 additions & 3 deletions engine/api/services/external.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,11 @@ func ping(ctx context.Context, db gorpmapper.SqlExecutorWithTx, s ExternalServic

srv.LastHeartbeat = time.Now()
srv.MonitoringStatus = mon
if err := Update(ctx, db, srv, ""); err != nil {
if err := Update(ctx, db, srv); err != nil {
log.Warning(ctx, "services.ping> unable to update external service: %v", err)
return err
}
if err := UpsertStatus(db, *srv, ""); err != nil {
log.Warning(ctx, "services.ping> unable to update monitoring status: %v", err)
return err
}
Expand Down Expand Up @@ -145,17 +149,21 @@ func initExternal(ctx context.Context, db *gorp.DbMap, s ExternalService) error
s.Service.LastHeartbeat = old.LastHeartbeat
s.Service.MonitoringStatus = old.MonitoringStatus
s.Service.Config = s.ServiceConfig()
if err := Update(ctx, tx, &s.Service, ""); err != nil {
if err := Update(ctx, tx, &s.Service); err != nil {
return sdk.WrapError(err, "unable to update external service")
}
} else {
s.Service.LastHeartbeat = time.Now()
s.Service.Config = s.ServiceConfig()
if err := Insert(ctx, tx, &s.Service, ""); err != nil {
if err := Insert(ctx, tx, &s.Service); err != nil {
return sdk.WrapError(err, "unable to insert external service")
}
}

if err := UpsertStatus(tx, s.Service, ""); err != nil {
return sdk.WrapError(err, "unable to insert or update monitoring status for external service")
}

if err := tx.Commit(); err != nil {
return sdk.WithStack(err)
}
Expand Down
Loading