From 4ebe1d3581c0d6e4d63331915e242ff4843ce8ae Mon Sep 17 00:00:00 2001 From: kifen Date: Tue, 7 Jan 2020 13:19:48 +0100 Subject: [PATCH] update transport deregistration logic --- pkg/transport-discovery/client/client.go | 30 ++++++++++++++++++++++++ pkg/transport/discovery.go | 17 ++++++++++++++ pkg/transport/manager.go | 14 +++++++++-- 3 files changed, 59 insertions(+), 2 deletions(-) diff --git a/pkg/transport-discovery/client/client.go b/pkg/transport-discovery/client/client.go index d29ec03d0b..78c09b3716 100644 --- a/pkg/transport-discovery/client/client.go +++ b/pkg/transport-discovery/client/client.go @@ -73,6 +73,16 @@ func (c *apiClient) Get(ctx context.Context, path string) (*http.Response, error return c.client.Do(req.WithContext(ctx)) } +// Delete performs a new DELETE request. +func (c *apiClient) Delete(ctx context.Context, path string) (*http.Response, error) { + req, err := http.NewRequest(http.MethodDelete, c.client.Addr()+path, new(bytes.Buffer)) + if err != nil { + return nil, err + } + + return c.client.Do(req.WithContext(ctx)) +} + // RegisterTransports registers new Transports. func (c *apiClient) RegisterTransports(ctx context.Context, entries ...*transport.SignedEntry) error { if len(entries) == 0 { @@ -150,6 +160,26 @@ func (c *apiClient) GetTransportsByEdge(ctx context.Context, pk cipher.PubKey) ( return entries, nil } +// DeleteTransport deletes given transport by it's ID. A visor can only delete transports if he is one of it's edges. +func (c *apiClient) DeleteTransport(ctx context.Context, id uuid.UUID) error { + resp, err := c.Delete(ctx, fmt.Sprintf("/transports/id:%s", id.String())) + if resp != nil { + defer func() { + if err := resp.Body.Close(); err != nil { + log.WithError(err).Warn("Failed to close HTTP response body") + } + }() + } + if err != nil { + return err + } + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("status: %d, error: %v", resp.StatusCode, extractError(resp.Body)) + } + + return nil +} + // UpdateStatuses updates statuses of transports in discovery. func (c *apiClient) UpdateStatuses(ctx context.Context, statuses ...*transport.Status) ([]*transport.EntryWithStatus, error) { if len(statuses) == 0 { diff --git a/pkg/transport/discovery.go b/pkg/transport/discovery.go index 2fdee01a2c..3238c3f588 100644 --- a/pkg/transport/discovery.go +++ b/pkg/transport/discovery.go @@ -5,6 +5,7 @@ import ( "errors" "sync" "time" + "fmt" "github.com/SkycoinProject/dmsg/cipher" "github.com/google/uuid" @@ -15,6 +16,7 @@ type DiscoveryClient interface { RegisterTransports(ctx context.Context, entries ...*SignedEntry) error GetTransportByID(ctx context.Context, id uuid.UUID) (*EntryWithStatus, error) GetTransportsByEdge(ctx context.Context, pk cipher.PubKey) ([]*EntryWithStatus, error) + DeleteTransport(ctx context.Context, id uuid.UUID) error UpdateStatuses(ctx context.Context, statuses ...*Status) ([]*EntryWithStatus, error) } @@ -81,6 +83,21 @@ func (td *mockDiscoveryClient) GetTransportsByEdge(ctx context.Context, pk ciphe return res, nil } +// NOTE that mock implementation doesn't checks whether the transport to be deleted is valid or not, this is, that +// it can be deleted by the visor who called DeleteTransport +func (td *mockDiscoveryClient) DeleteTransport(ctx context.Context, id uuid.UUID) error { + td.Lock() + defer td.Unlock() + + _, ok := td.entries[id] + if !ok { + return fmt.Errorf("transport with id: %s not found in transport discovery", id) + } + + delete(td.entries, id) + return nil +} + func (td *mockDiscoveryClient) UpdateStatuses(ctx context.Context, statuses ...*Status) ([]*EntryWithStatus, error) { res := make([]*EntryWithStatus, 0) diff --git a/pkg/transport/manager.go b/pkg/transport/manager.go index 0aedab6a1d..e463803efc 100644 --- a/pkg/transport/manager.go +++ b/pkg/transport/manager.go @@ -6,6 +6,7 @@ import ( "io" "strings" "sync" + "time" "github.com/SkycoinProject/skywire-mainnet/internal/skyenv" "github.com/SkycoinProject/skywire-mainnet/pkg/snet/snettest" @@ -224,7 +225,7 @@ func (tm *Manager) saveTransport(remote cipher.PubKey, netName string) (*Managed return mTp, nil } -// DeleteTransport disconnects and removes the Transport of Transport ID. +// DeleteTransport deregisters the Transport of Transport ID in transport discovery and deletes it locally. func (tm *Manager) DeleteTransport(id uuid.UUID) { tm.mx.Lock() defer tm.mx.Unlock() @@ -234,8 +235,17 @@ func (tm *Manager) DeleteTransport(id uuid.UUID) { if tp, ok := tm.tps[id]; ok { tp.Close() + tm.Logger.Infof("Deregister transport %s from manager", id) + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + err := tm.Conf.DiscoveryClient.DeleteTransport(ctx, id) + if err != nil { + tm.Logger.Errorf("Deregister transport %s from discovery failed with error: %s", id, err) + } + tm.Logger.Infof("Deregister transport %s from discovery", id) + delete(tm.tps, id) - tm.Logger.Infof("Unregistered transport %s", id) } }