-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsingleflight.go
108 lines (94 loc) · 2.42 KB
/
singleflight.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package ggcache
// This file implements a duplicate function call suppression
// mechanism (single-flight) for the ggcache package.
import (
"context"
"fmt"
"sync"
)
// call tracks a single Do invocation for one key, from the moment it is
// registered until its result has been published.
type call[V any] struct {
	// doneCond is closed to tell waiters that the call has finished.
	doneCond chan struct{}
	// val holds the value produced by the call.
	val V
	// err holds the error produced by the call, if any.
	err error
}
// newCall allocates a call whose completion channel is open, i.e. a call
// that has not finished yet.
func newCall[V any]() *call[V] {
	c := new(call[V])
	c.doneCond = make(chan struct{})
	return c
}
// done publishes the outcome of the call: it stores val and err, closes
// doneCond so that every receiver blocked on it is released, and then runs
// the onDone callbacks in the order given.
//
// It must be invoked at most once per call, because closing an already
// closed channel panics.
func (c *call[V]) done(val V, err error, onDone ...func()) {
	c.val, c.err = val, err
	close(c.doneCond)
	for i := 0; i < len(onDone); i++ {
		onDone[i]()
	}
}
// Group is a namespace of keyed work items in which concurrent requests
// for the same key are collapsed into a single execution.
type Group[K comparable, V any] struct {
	// mu guards every access to m.
	mu sync.RWMutex
	// m holds the call currently registered for each key; NewGroup allocates it.
	m map[K]*call[V]
}
// NewGroup returns an empty Group whose call map is already allocated.
func NewGroup[K comparable, V any]() *Group[K, V] {
	g := new(Group[K, V])
	g.m = map[K]*call[V]{}
	return g
}
// Do executes and returns the results of the given function, making
// sure that only one execution is in-flight for a given key at a
// time. If a duplicate comes in, the duplicate caller waits for the
// original to complete and receives the same results.
//
// The first caller for a key runs loader synchronously on the calling
// goroutine, so loader must honor ctx to avoid blocking that caller
// indefinitely. A duplicate caller stops waiting as soon as its own ctx is
// done and then returns the zero value of V together with ctx.Err(); the
// in-flight load itself is not interrupted. A nil ctx disables this early
// exit and the duplicate waits until the load finishes.
//
// loaderCalled reports whether this invocation was the one that ran loader.
//
// Do may be used on a zero-value Group: the call map is allocated on first
// use.
func (g *Group[K, V]) Do(key K, loader LoaderFunc[K, V], ctx context.Context) (val V, loaderCalled bool, err error) {
	// Fast path: look for an existing call under the read lock. Indexing a
	// nil map is safe and yields nil.
	g.mu.RLock()
	caller := g.m[key]
	g.mu.RUnlock()

	if caller == nil {
		g.mu.Lock()
		if g.m == nil {
			g.m = make(map[K]*call[V])
		}
		// Re-check under the write lock: another goroutine may have
		// registered a call between RUnlock and Lock.
		if existing := g.m[key]; existing != nil {
			caller = existing
		} else {
			caller = newCall[V]()
			g.m[key] = caller
			loaderCalled = true
		}
		g.mu.Unlock()
	}

	if loaderCalled {
		// For performance reasons the loader runs inline instead of in a
		// separate goroutine.
		g.call(caller, key, loader, ctx)
		return caller.val, true, caller.err
	}

	// A nil channel blocks forever in select, which gives "no cancellation"
	// semantics when ctx is nil.
	var cancelled <-chan struct{}
	if ctx != nil {
		cancelled = ctx.Done()
	}
	select {
	case <-caller.doneCond:
	case <-cancelled:
		// select picks randomly when both are ready; prefer a finished
		// result over reporting cancellation.
		select {
		case <-caller.doneCond:
		default:
			return val, false, ctx.Err()
		}
	}
	return caller.val, false, caller.err
}
// call runs loader for key on the current goroutine and publishes the outcome
// on caller, removing key from the group once the outcome is published.
//
// Outcomes:
//   - loader returns normally: its value and error are published as-is.
//   - loader panics: the panic is recovered and published as an error together
//     with the zero value of V.
//   - loader terminates the goroutine without returning or panicking with a
//     non-nil value (for example via runtime.Goexit): an error is published
//     together with the zero value of V, so waiters never mistake the
//     aborted load for a successful zero-valued result.
func (g *Group[K, V]) call(caller *call[V], key K, loader LoaderFunc[K, V], ctx context.Context) {
	var (
		val      V
		err      error
		returned bool // true only once loader has returned normally
	)
	defer func() {
		// recover must be called directly by the deferred function.
		r := recover()
		cleanup := func() { g.removeKey(key) }
		var zero V
		switch {
		case r != nil:
			caller.done(zero, fmt.Errorf("loader panic: %v, key: %v", r, key), cleanup)
		case !returned:
			// recover() yields nil during runtime.Goexit (and for
			// panic(nil) before Go 1.21); val and err were never assigned.
			caller.done(zero, fmt.Errorf("loader exited without returning, key: %v", key), cleanup)
		default:
			caller.done(val, err, cleanup)
		}
	}()
	val, err = loader(key, ctx)
	returned = true
}
// removeKey drops the entry for key from the group's call map while holding
// the write lock. Removing a key that is not present is a no-op.
func (g *Group[K, V]) removeKey(key K) {
	g.mu.Lock()
	defer g.mu.Unlock()
	delete(g.m, key)
}