Skip to content

Commit

Permalink
Tibber Pulse: handle subscription timeout (evcc-io#17619)
Browse files Browse the repository at this point in the history
  • Loading branch information
andig authored and jonilala796 committed Jan 3, 2025
1 parent 69a3ee5 commit ee2a44a
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 91 deletions.
159 changes: 70 additions & 89 deletions meter/tibber-pulse.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"encoding/json"
"errors"
"net/http"
"sync"
"strings"
"time"

"github.com/evcc-io/evcc/api"
Expand All @@ -17,23 +17,20 @@ import (
)

func init() {
registry.Add("tibber-pulse", NewTibberFromConfig)
registry.AddCtx("tibber-pulse", NewTibberFromConfig)
}

type Tibber struct {
mu sync.Mutex
log *util.Logger
updated time.Time
live tibber.LiveMeasurement
url string
token, homeID string
client *graphql.SubscriptionClient
data *util.Monitor[tibber.LiveMeasurement]
}

func NewTibberFromConfig(other map[string]interface{}) (api.Meter, error) {
var cc struct {
Token string
HomeID string
func NewTibberFromConfig(ctx context.Context, other map[string]interface{}) (api.Meter, error) {
cc := struct {
Token string
HomeID string
Timeout time.Duration
}{
Timeout: time.Minute,
}

if err := util.DecodeOther(other, &cc); err != nil {
Expand Down Expand Up @@ -63,29 +60,19 @@ func NewTibberFromConfig(other map[string]interface{}) (api.Meter, error) {
}
}

ctx, cancel := context.WithTimeout(context.Background(), request.Timeout)
ctx2, cancel := context.WithTimeout(ctx, request.Timeout)
defer cancel()

if err := qclient.Query(ctx, &res, nil); err != nil {
if err := qclient.Query(ctx2, &res, nil); err != nil {
return nil, err
}

t := &Tibber{
log: log,
url: res.Viewer.WebsocketSubscriptionUrl,
token: cc.Token,
homeID: cc.HomeID,
data: util.NewMonitor[tibber.LiveMeasurement](cc.Timeout),
}

// run the client
err := t.reconnect()

return t, err
}

// newSubscriptionClient creates graphql subscription client
func (t *Tibber) newSubscriptionClient() {
t.client = graphql.NewSubscriptionClient(t.url).
// subscription client
client := graphql.NewSubscriptionClient(res.Viewer.WebsocketSubscriptionUrl).
WithProtocol(graphql.GraphQLWS).
WithWebSocketOptions(graphql.WebsocketOptions{
HTTPClient: &http.Client{
Expand All @@ -98,101 +85,95 @@ func (t *Tibber) newSubscriptionClient() {
},
}).
WithConnectionParams(map[string]any{
"token": t.token,
"token": cc.Token,
}).
WithRetryTimeout(0).
WithLog(t.log.TRACE.Println)
}
WithTimeout(time.Second).
WithLog(log.TRACE.Println).
OnError(func(_ *graphql.SubscriptionClient, err error) error {
// exit the subscription client due to unauthorized error
if strings.Contains(err.Error(), "invalid x-hasura-admin-secret/x-hasura-access-key") {
return err
}
log.ERROR.Println(err)
return nil
})

func (t *Tibber) subscribe(done chan error) {
var (
once sync.Once
query struct {
tibber.LiveMeasurement `graphql:"liveMeasurement(homeId: $homeId)"`
}
)
done := make(chan error, 1)
go func(done chan error) {
done <- t.subscribe(client, cc.HomeID)
}(done)

_, err := t.client.Subscribe(&query, map[string]any{
"homeId": graphql.ID(t.homeID),
}, func(data []byte, err error) error {
select {
case err := <-done:
if err != nil {
once.Do(func() { done <- err })
return nil, err
}
case <-time.After(cc.Timeout):
return nil, api.ErrTimeout
}

var res struct {
LiveMeasurement tibber.LiveMeasurement
go func() {
<-ctx.Done()
if err := client.Close(); err != nil {
log.ERROR.Println(err)
}
}()

if err := json.Unmarshal(data, &res); err != nil {
once.Do(func() { done <- err })

t.log.ERROR.Println(err)
return nil
go func() {
if err := client.Run(); err != nil {
log.ERROR.Println(err)
}
}()

t.mu.Lock()
t.live = res.LiveMeasurement
t.updated = time.Now()
t.mu.Unlock()

once.Do(func() { close(done) })
return t, nil
}

return nil
})
if err != nil {
once.Do(func() { done <- err })
func (t *Tibber) subscribe(client *graphql.SubscriptionClient, homeID string) error {
var query struct {
tibber.LiveMeasurement `graphql:"liveMeasurement(homeId: $homeId)"`
}

go func() {
if err := t.client.Run(); err != nil {
once.Do(func() { done <- err })
_, err := client.Subscribe(&query, map[string]any{
"homeId": graphql.ID(homeID),
}, func(data []byte, err error) error {
if err != nil {
return err
}
}()
}

func (t *Tibber) reconnect() error {
const timeout = time.Minute

t.mu.Lock()
if time.Since(t.updated) <= timeout {
t.mu.Unlock()
return nil
}
t.mu.Unlock()
var res struct {
LiveMeasurement tibber.LiveMeasurement
}

if t.client != nil {
if err := t.client.Close(); err != nil {
t.log.DEBUG.Println("close:", err)
if err := json.Unmarshal(data, &res); err != nil {
return err
}
}

t.newSubscriptionClient()
t.data.Set(res.LiveMeasurement)

done := make(chan error)
go t.subscribe(done)
return nil
})

return <-done
return err
}

func (t *Tibber) CurrentPower() (float64, error) {
if err := t.reconnect(); err != nil {
res, err := t.data.Get()
if err != nil {
return 0, err
}

t.mu.Lock()
defer t.mu.Unlock()
return t.live.Power - t.live.PowerProduction, nil
return res.Power - res.PowerProduction, nil
}

var _ api.PhaseCurrents = (*Tibber)(nil)

// Currents implements the api.PhaseCurrents interface
func (t *Tibber) Currents() (float64, float64, float64, error) {
if err := t.reconnect(); err != nil {
res, err := t.data.Get()
if err != nil {
return 0, 0, 0, err
}

t.mu.Lock()
defer t.mu.Unlock()
return t.live.CurrentL1, t.live.CurrentL2, t.live.CurrentL3, nil
return res.CurrentL1, res.CurrentL2, res.CurrentL3, nil
}
6 changes: 4 additions & 2 deletions templates/definition/meter/tibber-pulse.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ params:
example: 5K4MVS-OjfWhK_4yrjOlFe1F6kJXPVf7eQYggo8ebAE
- name: homeid
example: 96a14971-525a-4420-aae9-e5aedaa129ff
- name: timeout
default: 1m
advanced: true
render: |
type: tibber-pulse
token: {{ .token }}
{{- if .homeid }}
homeid: {{ .homeid }}
{{- end }}
timeout: {{ .timeout }}

0 comments on commit ee2a44a

Please sign in to comment.