From ee2a44a265f0fd1cff13fb1f0f269e51361646df Mon Sep 17 00:00:00 2001 From: andig Date: Fri, 6 Dec 2024 15:43:22 +0100 Subject: [PATCH] Tibber Pulse: handle subscription timeout (#17619) --- meter/tibber-pulse.go | 159 ++++++++----------- templates/definition/meter/tibber-pulse.yaml | 6 +- 2 files changed, 74 insertions(+), 91 deletions(-) diff --git a/meter/tibber-pulse.go b/meter/tibber-pulse.go index fd7bcf0ffc..22fc601426 100644 --- a/meter/tibber-pulse.go +++ b/meter/tibber-pulse.go @@ -5,7 +5,7 @@ import ( "encoding/json" "errors" "net/http" - "sync" + "strings" "time" "github.com/evcc-io/evcc/api" @@ -17,23 +17,20 @@ import ( ) func init() { - registry.Add("tibber-pulse", NewTibberFromConfig) + registry.AddCtx("tibber-pulse", NewTibberFromConfig) } type Tibber struct { - mu sync.Mutex - log *util.Logger - updated time.Time - live tibber.LiveMeasurement - url string - token, homeID string - client *graphql.SubscriptionClient + data *util.Monitor[tibber.LiveMeasurement] } -func NewTibberFromConfig(other map[string]interface{}) (api.Meter, error) { - var cc struct { - Token string - HomeID string +func NewTibberFromConfig(ctx context.Context, other map[string]interface{}) (api.Meter, error) { + cc := struct { + Token string + HomeID string + Timeout time.Duration + }{ + Timeout: time.Minute, } if err := util.DecodeOther(other, &cc); err != nil { @@ -63,29 +60,19 @@ func NewTibberFromConfig(other map[string]interface{}) (api.Meter, error) { } } - ctx, cancel := context.WithTimeout(context.Background(), request.Timeout) + ctx2, cancel := context.WithTimeout(ctx, request.Timeout) defer cancel() - if err := qclient.Query(ctx, &res, nil); err != nil { + if err := qclient.Query(ctx2, &res, nil); err != nil { return nil, err } t := &Tibber{ - log: log, - url: res.Viewer.WebsocketSubscriptionUrl, - token: cc.Token, - homeID: cc.HomeID, + data: util.NewMonitor[tibber.LiveMeasurement](cc.Timeout), } - // run the client - err := t.reconnect() - - return t, err -} - -// newSubscriptionClient creates graphql subscription client -func (t *Tibber) newSubscriptionClient() { - t.client = graphql.NewSubscriptionClient(t.url). + // subscription client + client := graphql.NewSubscriptionClient(res.Viewer.WebsocketSubscriptionUrl). WithProtocol(graphql.GraphQLWS). WithWebSocketOptions(graphql.WebsocketOptions{ HTTPClient: &http.Client{ @@ -98,101 +85,95 @@ func (t *Tibber) newSubscriptionClient() { }, }). WithConnectionParams(map[string]any{ - "token": t.token, + "token": cc.Token, }). WithRetryTimeout(0). - WithLog(t.log.TRACE.Println) -} + WithTimeout(time.Second). + WithLog(log.TRACE.Println). + OnError(func(_ *graphql.SubscriptionClient, err error) error { + // exit the subscription client due to unauthorized error + if strings.Contains(err.Error(), "invalid x-hasura-admin-secret/x-hasura-access-key") { + return err + } + log.ERROR.Println(err) + return nil + }) -func (t *Tibber) subscribe(done chan error) { - var ( - once sync.Once - query struct { - tibber.LiveMeasurement `graphql:"liveMeasurement(homeId: $homeId)"` - } - ) + done := make(chan error, 1) + go func(done chan error) { + done <- t.subscribe(client, cc.HomeID) + }(done) - _, err := t.client.Subscribe(&query, map[string]any{ - "homeId": graphql.ID(t.homeID), - }, func(data []byte, err error) error { + select { + case err := <-done: if err != nil { - once.Do(func() { done <- err }) + return nil, err } + case <-time.After(cc.Timeout): + return nil, api.ErrTimeout + } - var res struct { - LiveMeasurement tibber.LiveMeasurement + go func() { + <-ctx.Done() + if err := client.Close(); err != nil { + log.ERROR.Println(err) } + }() - if err := json.Unmarshal(data, &res); err != nil { - once.Do(func() { done <- err }) - - t.log.ERROR.Println(err) - return nil + go func() { + if err := client.Run(); err != nil { + log.ERROR.Println(err) } + }() - t.mu.Lock() - t.live = res.LiveMeasurement - t.updated = time.Now() - t.mu.Unlock() - - once.Do(func() { close(done) }) + return t, nil +} - return nil - }) - if err != nil { - once.Do(func() { done <- err }) +func (t *Tibber) subscribe(client *graphql.SubscriptionClient, homeID string) error { + var query struct { + tibber.LiveMeasurement `graphql:"liveMeasurement(homeId: $homeId)"` } - go func() { - if err := t.client.Run(); err != nil { - once.Do(func() { done <- err }) + _, err := client.Subscribe(&query, map[string]any{ + "homeId": graphql.ID(homeID), + }, func(data []byte, err error) error { + if err != nil { + return err } - }() -} -func (t *Tibber) reconnect() error { - const timeout = time.Minute - - t.mu.Lock() - if time.Since(t.updated) <= timeout { - t.mu.Unlock() - return nil - } - t.mu.Unlock() + var res struct { + LiveMeasurement tibber.LiveMeasurement + } - if t.client != nil { - if err := t.client.Close(); err != nil { - t.log.DEBUG.Println("close:", err) + if err := json.Unmarshal(data, &res); err != nil { + return err } - } - t.newSubscriptionClient() + t.data.Set(res.LiveMeasurement) - done := make(chan error) - go t.subscribe(done) + return nil + }) - return <-done + return err } func (t *Tibber) CurrentPower() (float64, error) { - if err := t.reconnect(); err != nil { + res, err := t.data.Get() + if err != nil { return 0, err } - t.mu.Lock() - defer t.mu.Unlock() - return t.live.Power - t.live.PowerProduction, nil + return res.Power - res.PowerProduction, nil } var _ api.PhaseCurrents = (*Tibber)(nil) // Currents implements the api.PhaseCurrents interface func (t *Tibber) Currents() (float64, float64, float64, error) { - if err := t.reconnect(); err != nil { + res, err := t.data.Get() + if err != nil { return 0, 0, 0, err } - t.mu.Lock() - defer t.mu.Unlock() - return t.live.CurrentL1, t.live.CurrentL2, t.live.CurrentL3, nil + return res.CurrentL1, res.CurrentL2, res.CurrentL3, nil } diff --git a/templates/definition/meter/tibber-pulse.yaml b/templates/definition/meter/tibber-pulse.yaml index 66d55e073a..953842dd75 100644 --- a/templates/definition/meter/tibber-pulse.yaml +++ b/templates/definition/meter/tibber-pulse.yaml @@ -14,9 +14,11 @@ params: example: 5K4MVS-OjfWhK_4yrjOlFe1F6kJXPVf7eQYggo8ebAE - name: homeid example: 96a14971-525a-4420-aae9-e5aedaa129ff + - name: timeout + default: 1m + advanced: true render: | type: tibber-pulse token: {{ .token }} - {{- if .homeid }} homeid: {{ .homeid }} - {{- end }} + timeout: {{ .timeout }}