-
Notifications
You must be signed in to change notification settings - Fork 1
/
report.go
204 lines (178 loc) · 3.8 KB
/
report.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
package report
import (
"errors"
"sync"
"time"
)
// Logger is the central logging agent on which to register events
type Logger struct {
exporters []Exporter
taskC chan task
stopC chan struct{}
wg sync.WaitGroup
baggage Data
count map[string]int
err error
errMutex sync.Mutex
}
// Data is a string-keyed map of unstructured data relevant to the event
type Data map[string]interface{}
type command int
const (
info command = iota
action
spanCmd
count
)
type task struct {
command command
event string
data Data
ackC chan<- int
}
// New creates an instance of a logging agent
//
// logger := report.New("myAppName")
// logger.Export(report.StdOutJSON())
// defer logger.Stop()
//
func New(name string) *Logger {
logger := Logger{
taskC: make(chan task, 1),
stopC: make(chan struct{}),
baggage: Data{
"service_name": name,
},
count: make(map[string]int),
}
go logger.run()
return &logger
}
// Baggage adds a key value pair that is included in every logged event
func (l *Logger) Baggage(key string, value interface{}) {
l.baggage[key] = value
}
// Export configures an external service to receive log events
func (l *Logger) Export(e Exporter) {
l.exporters = append(l.exporters, e)
}
// Info logs event that provide telemetry measures or context to any events requiring action.
func (l *Logger) Info(event string, payload Data) <-chan int {
ack := make(chan int)
l.taskC <- task{
command: info,
event: event,
data: payload,
ackC: ack,
}
return ack
}
// Action events that need intervention or resolving.
func (l *Logger) Action(event string, payload Data) <-chan int {
ack := make(chan int)
l.taskC <- task{
command: action,
event: event,
data: payload,
ackC: ack,
}
return ack
}
// Count returns the number of log events of a particular type since startup
func (l *Logger) Count(event string) int {
ack := make(chan int)
l.taskC <- task{
command: count,
event: event,
data: Data{},
ackC: ack,
}
return <-ack
}
// Err returns the last Actionable log event or encoding error if either occurred
func (l *Logger) Err() error {
l.errMutex.Lock()
defer l.errMutex.Unlock()
return l.err
}
// Send exports a raw data event to configured external services
func (l *Logger) Send(d Data) error {
var err error
for _, e := range l.exporters {
if err == nil {
err = e.Send(d)
}
}
return err
}
// Close shuts down the logging agent
func (l *Logger) Close() {
if l.isClosing() {
l.wg.Wait()
return
}
close(l.taskC)
close(l.stopC)
for _, e := range l.exporters {
e.Close()
}
l.wg.Wait()
}
func (l *Logger) isClosing() bool {
select {
case <-l.stopC:
return true
default:
}
return false
}
func (l *Logger) run() {
l.wg.Add(1)
toNewTask:
for t := range l.taskC {
if t.command == count {
n, exists := l.count[t.event]
if exists {
t.ackC <- n
} else {
t.ackC <- 0
}
close(t.ackC)
continue toNewTask
}
n, exists := l.count[t.event]
if exists {
l.count[t.event] = n + 1
} else {
l.count[t.event] = 1
}
t.data["name"] = t.event
// timestamp is not overwritten if it already exists
// (e.g. endspan needs to log startspan timestamp)
if _, exists := t.data["timestamp"]; !exists {
t.data["timestamp"] = time.Now().Format(time.RFC3339Nano)
}
for k, v := range l.baggage {
t.data[k] = v
}
switch t.command {
case info:
t.data["type"] = "info"
case action:
t.data["type"] = "action"
l.errMutex.Lock()
l.err = errors.New("Actionable event: " + t.event)
l.errMutex.Unlock()
case spanCmd:
t.data["type"] = "span"
}
if err := l.Send(t.data); err != nil {
msg := "Error sending " + t.event + ": " + err.Error()
l.errMutex.Lock()
l.err = errors.New(msg)
l.errMutex.Unlock()
}
close(t.ackC)
}
l.wg.Done()
}