Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: status for all instances #5437

Merged
merged 9 commits into from
Sep 25, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cli/cdsctl/admin_cdn.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ var adminCdnStatusCmd = cli.Command{
}

func adminCdnStatusRun(v cli.Values) (cli.ListResult, error) {
btes, err := client.ServiceCallGET("cdn", "/mon/status")
services, err := client.ServicesByType("cdn")
yesnault marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}
status := sdk.MonitoringStatus{}
if err := json.Unmarshal(btes, &status); err != nil {
return nil, err
for _, srv := range services {
status.Lines = append(status.Lines, srv.MonitoringStatus.Lines...)
}
return cli.AsListResult(status.Lines), nil
}
Expand Down
2 changes: 1 addition & 1 deletion engine/api/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (api *API) getAdminServiceHandler() service.Handler {
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
vars := mux.Vars(r)
name := vars["name"]
srv, err := services.LoadByName(ctx, api.mustDB(), name)
srv, err := services.LoadByNameWithStatus(ctx, api.mustDB(), name)
if err != nil {
return err
}
Expand Down
3 changes: 3 additions & 0 deletions engine/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,9 @@ func (a *API) Serve(ctx context.Context) error {
go event.DequeueEvent(ctx, a.mustDB())
}

// here the generated name of API is ready, we set ServerName with that
a.Common.ServiceName = event.GetCDSName()

log.Info(ctx, "Initializing internal routines...")
a.GoRoutines.Run(ctx, "maintenance.Subscribe", func(ctx context.Context) {
if err := a.listenMaintenance(ctx); err != nil {
Expand Down
27 changes: 27 additions & 0 deletions engine/api/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ func (api *API) postServiceRegisterHandler() service.Handler {
}

// Update or create the service

var sessionID string
if a := getAuthSession(ctx); a != nil {
sessionID = a.ID
}
if exists {
srv.Update(data)
if err := services.Update(ctx, tx, srv); err != nil {
Expand All @@ -108,12 +113,17 @@ func (api *API) postServiceRegisterHandler() service.Handler {
} else {
srv = &data
srv.ConsumerID = &consumer.ID

if err := services.Insert(ctx, tx, srv); err != nil {
return sdk.WithStack(err)
}
log.Debug("postServiceRegisterHandler> insert new service %s(%d) registered for consumer %s", srv.Name, srv.ID, *srv.ConsumerID)
}

if err := services.UpsertStatus(tx, *srv, sessionID); err != nil {
return sdk.WithStack(err)
}

if len(srv.PublicKey) > 0 {
log.Debug("postServiceRegisterHandler> service %s registered with public key: %s", srv.Name, string(srv.PublicKey))
}
Expand Down Expand Up @@ -173,10 +183,18 @@ func (api *API) postServiceHearbeatHandler() service.Handler {
s.LastHeartbeat = time.Now()
s.MonitoringStatus = mon

var sessionID string
if a := getAuthSession(ctx); a != nil {
sessionID = a.ID
}
if err := services.Update(ctx, tx, s); err != nil {
return err
}

if err := services.UpsertStatus(tx, *s, sessionID); err != nil {
return err
}

if err := tx.Commit(); err != nil {
return sdk.WithStack(err)
}
Expand Down Expand Up @@ -238,6 +256,10 @@ func (api *API) serviceAPIHeartbeatUpdate(ctx context.Context, db *gorp.DbMap) {
return
}

var authSessionID string
if a := getAuthSession(ctx); a != nil {
authSessionID = a.ID
}
if exists {
srv.ID = old.ID
if err := services.Update(ctx, tx, srv); err != nil {
Expand All @@ -251,6 +273,11 @@ func (api *API) serviceAPIHeartbeatUpdate(ctx context.Context, db *gorp.DbMap) {
}
}

if err := services.UpsertStatus(tx, *srv, authSessionID); err != nil {
log.Error(ctx, "serviceAPIHeartbeat> Unable to insert or update monitoring status %s: %v", srv.Name, err)
return
}

if err := tx.Commit(); err != nil {
log.Error(ctx, "serviceAPIHeartbeat> error tx commit: %v", err)
return
Expand Down
106 changes: 103 additions & 3 deletions engine/api/services/dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,11 @@ func get(ctx context.Context, db gorp.SqlExecutor, q gorpmapping.Query) (*sdk.Se
// LoadAll returns all services in database.
func LoadAll(ctx context.Context, db gorp.SqlExecutor) ([]sdk.Service, error) {
query := gorpmapping.NewQuery(`SELECT * FROM service`)
return getAll(ctx, db, query)
services, err := getAll(ctx, db, query)
if err != nil {
return nil, err
}
return withServiceStatus(ctx, db, services)
yesnault marked this conversation as resolved.
Show resolved Hide resolved
}

// LoadAllByType returns all services with given type.
Expand All @@ -71,10 +75,45 @@ func LoadAllByType(ctx context.Context, db gorp.SqlExecutor, typeService string)
return ss, nil
}
query := gorpmapping.NewQuery(`SELECT * FROM service WHERE type = $1`).Args(typeService)
return getAll(ctx, db, query)
services, err := getAll(ctx, db, query)
if err != nil {
return nil, err
}
return withServiceStatus(ctx, db, services)
}

// LoadAllByType returns all services that users can see with given type.
func withServiceStatus(ctx context.Context, db gorp.SqlExecutor, services []sdk.Service) ([]sdk.Service, error) {
ss, err := loadAllServiceStatus(ctx, db)
if err != nil {
return nil, err
}
for i := range services {
srv := &services[i]
srv.MonitoringStatus = sdk.MonitoringStatus{Now: time.Now()}
completeStatus(ss, srv)
services[i] = *srv
}

return services, nil
}

func completeStatus(ss []sdk.ServiceStatus, srv *sdk.Service) {
for _, status := range ss {
if srv.ID == status.ServiceID {
for _, line := range status.MonitoringStatus.Lines {
if status.SessionID != nil {
line.SessionID = *status.SessionID
}
if srv.ConsumerID != nil {
line.ConsumerID = *srv.ConsumerID
}
srv.MonitoringStatus.Lines = append(srv.MonitoringStatus.Lines, line)
}
}
}
}

// LoadAllByTypeAndUserID returns all services that users can see with given type.
func LoadAllByTypeAndUserID(ctx context.Context, db gorp.SqlExecutor, typeService string, userID string) ([]sdk.Service, error) {
query := gorpmapping.NewQuery(`
SELECT service.*
Expand Down Expand Up @@ -102,6 +141,25 @@ func LoadByName(ctx context.Context, db gorp.SqlExecutor, name string) (*sdk.Ser
return get(ctx, db, query)
}

// LoadByNameWithStatus returns a service by its name.
func LoadByNameWithStatus(ctx context.Context, db gorp.SqlExecutor, name string) (*sdk.Service, error) {
query := gorpmapping.NewQuery("SELECT * FROM service WHERE name = $1").Args(name)
srv, err := get(ctx, db, query)

if err != nil {
return nil, err
}

ss, err := loadAllServiceStatusByServiceID(ctx, db, srv.ID)
if err != nil {
return nil, err
}

completeStatus(ss, srv)

return srv, nil
}

// LoadByID returns a service by its id.
func LoadByID(ctx context.Context, db gorp.SqlExecutor, id int64) (*sdk.Service, error) {
query := gorpmapping.NewQuery("SELECT * FROM service WHERE id = $1").Args(id)
Expand Down Expand Up @@ -134,6 +192,48 @@ func Update(ctx context.Context, db gorpmapper.SqlExecutorWithTx, s *sdk.Service
return nil
}

// loadAllServiceStatusByServiceID returns all services status for a service ID
func loadAllServiceStatusByServiceID(ctx context.Context, db gorp.SqlExecutor, serviceID int64) ([]sdk.ServiceStatus, error) {
yesnault marked this conversation as resolved.
Show resolved Hide resolved
query := gorpmapping.NewQuery(`SELECT * FROM service_status where service_id = $1`).Args(serviceID)
ss := []sdk.ServiceStatus{}
if err := gorpmapping.GetAll(ctx, db, query, &ss); err != nil {
return nil, sdk.WrapError(err, "cannot get services")
}
return ss, nil
}

// loadAllServiceStatus returns all services status
func loadAllServiceStatus(ctx context.Context, db gorp.SqlExecutor) ([]sdk.ServiceStatus, error) {
query := gorpmapping.NewQuery(`SELECT * FROM service_status`)
ss := []sdk.ServiceStatus{}
if err := gorpmapping.GetAll(ctx, db, query, &ss); err != nil {
return nil, sdk.WrapError(err, "cannot get services")
}
return ss, nil
}

// UpsertStatus insert or update monitoring status
func UpsertStatus(db gorpmapper.SqlExecutorWithTx, s sdk.Service, authSessionID string) error {
var sessionID *string
if authSessionID == "" {
// no sessionID : we can delete service_status to keep only one status
// example: each api has a consumerID and no authSessionID -> so only on status per service
query := "delete from service_status where service_id = $1"
if _, err := db.Exec(query, s.ID); err != nil {
return sdk.WithStack(err)
}
query = `INSERT INTO service_status(monitoring_status, service_id) VALUES($1,$2)`
_, err := db.Exec(query, s.MonitoringStatus, s.ID)
return sdk.WithStack(err)
}
sessionID = &authSessionID
query := `INSERT INTO service_status(monitoring_status, service_id, auth_session_id) VALUES($1,$2, $3)
ON CONFLICT (service_id, auth_session_id) DO UPDATE SET monitoring_status = $1, service_id = $2, auth_session_id = $3`
_, err := db.Exec(query, s.MonitoringStatus, s.ID, sessionID)
return sdk.WithStack(err)

}

// Delete a service.
func Delete(db gorp.SqlExecutor, s *sdk.Service) error {
if s.Type == sdk.TypeHatchery {
Expand Down
77 changes: 77 additions & 0 deletions engine/api/services/dao_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package services_test
import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/ovh/cds/engine/api/services"
"github.com/ovh/cds/engine/api/test"
Expand Down Expand Up @@ -55,3 +57,78 @@ func TestDAO(t *testing.T) {
_, err = services.FindDeadServices(context.TODO(), db, 0)
test.NoError(t, err)
}

func TestDAOWithStatus(t *testing.T) {
db, _ := test.SetupPG(t)

allSrv, err := services.LoadAll(context.TODO(), db)
for _, s := range allSrv {
if err := services.Delete(db, &s); err != nil {
t.Fatalf("unable to delete service: %v", err)
}
}

privateKey, err := jws.NewRandomRSAKey()
test.NoError(t, err)
publicKey, err := jws.ExportPublicKey(privateKey)
test.NoError(t, err)

theServiceName := sdk.RandomString(10)
var srv = sdk.Service{
CanonicalService: sdk.CanonicalService{
Name: theServiceName,
Type: "type-service-test",
PublicKey: publicKey,
},
MonitoringStatus: sdk.MonitoringStatus{
Now: time.Now(),
Lines: []sdk.MonitoringStatusLine{
{
Component: "backend/cds-backend/items",
Hostname: "foofoo.local",
Service: "cds-cdn",
Status: "OK",
Type: "cdn",
Value: "90",
},
},
},
}

test.NoError(t, services.Insert(context.TODO(), db, &srv))
test.NoError(t, services.UpsertStatus(db, srv, ""))

srv2, err := services.LoadByName(context.TODO(), db, srv.Name)
test.NoError(t, err)

assert.Equal(t, srv.Name, srv2.Name)
assert.Equal(t, string(srv.PublicKey), string(srv2.PublicKey))

all, err := services.LoadAllByType(context.TODO(), db, srv.Type)
test.NoError(t, err)

assert.True(t, len(all) >= 1)

all2, err := services.LoadAll(context.TODO(), db)
test.NoError(t, err)
var found bool
for _, s := range all2 {
if s.Name == theServiceName {
found = true
require.EqualValues(t, 1, len(s.MonitoringStatus.Lines))
for _, ss := range s.MonitoringStatus.Lines {
require.EqualValues(t, "backend/cds-backend/items", ss.Component)
}
break
}
}

require.True(t, found)

for _, s := range all {
test.NoError(t, services.Delete(db, &s))
}

_, err = services.FindDeadServices(context.TODO(), db, 0)
test.NoError(t, err)
}
17 changes: 13 additions & 4 deletions engine/api/services/external.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type ExternalService struct {
Path string `json:"path"`
}

// ServiceConfig return sthe serviceConfig for the current ExternalService
func (e ExternalService) ServiceConfig() sdk.ServiceConfig {
b, _ := json.Marshal(e)
var cfg sdk.ServiceConfig
Expand Down Expand Up @@ -64,7 +65,7 @@ func Pings(ctx context.Context, dbFunc func() *gorp.DbMap, ss []ExternalService)

func ping(ctx context.Context, db gorpmapper.SqlExecutorWithTx, s ExternalService) error {
// Select for update
serv, err := LoadByName(context.Background(), db, s.Name)
srv, err := LoadByName(context.Background(), db, s.Name)
if err != nil {
return sdk.WithStack(err)
}
Expand Down Expand Up @@ -101,9 +102,13 @@ func ping(ctx context.Context, db gorpmapper.SqlExecutorWithTx, s ExternalServic
mon.Lines[0].Value = "Health: OK"
}

serv.LastHeartbeat = time.Now()
serv.MonitoringStatus = mon
if err := Update(ctx, db, serv); err != nil {
srv.LastHeartbeat = time.Now()
srv.MonitoringStatus = mon
if err := Update(ctx, db, srv); err != nil {
log.Warning(ctx, "services.ping> unable to update external service: %v", err)
return err
}
if err := UpsertStatus(db, *srv, ""); err != nil {
log.Warning(ctx, "services.ping> unable to update monitoring status: %v", err)
return err
}
Expand Down Expand Up @@ -155,6 +160,10 @@ func initExternal(ctx context.Context, db *gorp.DbMap, s ExternalService) error
}
}

if err := UpsertStatus(tx, s.Service, ""); err != nil {
return sdk.WrapError(err, "unable to insert or update monitoring status for external service")
}

if err := tx.Commit(); err != nil {
return sdk.WithStack(err)
}
Expand Down
1 change: 1 addition & 0 deletions engine/api/services/gorp_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@ func (s service) Canonical() gorpmapper.CanonicalForms {
func init() {
gorpmapping.Register(
gorpmapping.New(service{}, "service", true, "id"),
gorpmapping.New(sdk.ServiceStatus{}, "service_status", true, "id"),
)
}
Loading