forked from go-joe/joe
-
Notifications
You must be signed in to change notification settings - Fork 0
/
adapter.go
179 lines (154 loc) · 5.13 KB
/
adapter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
package joe
import (
"bufio"
"fmt"
"io"
"os"
"sync"
"github.com/pkg/errors"
"go.uber.org/zap"
)
// An Adapter connects the bot with the chat by enabling it to receive and send
// messages. Additionally advanced adapters can emit more events than just the
// ReceiveMessageEvent (e.g. the slack adapter also emits the UserTypingEvent).
// All adapter events must be setup in the RegisterAt function of the Adapter.
//
// Joe provides a default CLIAdapter implementation which connects the bot with
// the local shell to receive messages from stdin and print messages to stdout.
type Adapter interface {
RegisterAt(*Brain)
Send(text, channel string) error
Close() error
}
// The CLIAdapter is the default Adapter implementation that the bot uses if no
// other adapter was configured. It emits a ReceiveMessageEvent for each line it
// receives from stdin and prints all sent messages to stdout.
//
// The CLIAdapter does not set the Message.Data field.
type CLIAdapter struct {
Prefix string
Input io.ReadCloser
Output io.Writer
Logger *zap.Logger
Author string // used to set the author of the messages, defaults to os.Getenv("USER)
mu sync.Mutex // protects the Output and closing channel
closing chan chan error
}
// NewCLIAdapter creates a new CLIAdapter. The caller must call Close
// to make the CLIAdapter stop reading messages and emitting events.
func NewCLIAdapter(name string, logger *zap.Logger) *CLIAdapter {
return &CLIAdapter{
Prefix: fmt.Sprintf("%s > ", name),
Input: os.Stdin,
Output: os.Stdout,
Logger: logger,
Author: os.Getenv("USER"),
closing: make(chan chan error),
}
}
// RegisterAt starts the CLIAdapter by reading messages from stdin and emitting
// a ReceiveMessageEvent for each of them. Additionally the adapter hooks into
// the InitEvent to print a nice prefix to stdout to show to the user it is
// ready to accept input.
func (a *CLIAdapter) RegisterAt(brain *Brain) {
brain.RegisterHandler(func(evt InitEvent) {
_ = a.print(a.Prefix)
})
go a.loop(brain)
}
func (a *CLIAdapter) loop(brain *Brain) {
input := a.readLines()
// The adapter loop is built to stay responsive even if the Brain stops
// processing events so we can safely close the CLIAdapter.
//
// We want to print the prefix each time when the Brain has completely
// processed a ReceiveMessageEvent and before we are emitting the next one.
// This gives us a shell-like behavior which signals to the user that she
// can input more data on the CLI. This channel is buffered so we do not
// block the Brain when it executes the callback.
callback := make(chan Event, 1)
callbackFun := func(evt Event) {
callback <- evt
}
var lines = input // channel represents the case that we receive a new message
for {
select {
case msg, ok := <-lines:
if !ok {
// no more input from stdin
lines = nil // disable this case and wait for closing signal
continue
}
lines = nil // disable this case and wait for the callback
brain.Emit(ReceiveMessageEvent{Text: msg, AuthorID: a.Author}, callbackFun)
case <-callback:
// This case is executed after all ReceiveMessageEvent handlers have
// completed and we can continue with the next line.
_ = a.print(a.Prefix)
lines = input // activate first case again
case result := <-a.closing:
if lines == nil {
// We were just waiting for our callback
_ = a.print(a.Prefix)
}
_ = a.print("\n")
result <- a.Input.Close()
return
}
}
}
// ReadLines reads lines from stdin and returns them in a channel.
// All strings in the returned channel will not include the trailing newline.
// The channel is closed automatically when a.Input is closed.
func (a *CLIAdapter) readLines() <-chan string {
r := bufio.NewReader(a.Input)
lines := make(chan string)
go func() {
// This goroutine will exit when we call a.Input.Close() which will make
// r.ReadString(…) return an io.EOF.
for {
line, err := r.ReadString('\n')
switch {
case err == io.EOF:
close(lines)
return
case err != nil:
a.Logger.Error("Failed to read messages from input", zap.Error(err))
return
}
lines <- line[:len(line)-1]
}
}()
return lines
}
// Send implements the Adapter interface by sending the given text to stdout.
// The channel argument is required by the Adapter interface but is otherwise ignored.
func (a *CLIAdapter) Send(text, channel string) error {
return a.print(text + "\n")
}
// Close makes the CLIAdapter stop emitting any new events or printing any output.
// Calling this function more than once will result in an error.
func (a *CLIAdapter) Close() error {
if a.closing == nil {
return errors.Errorf("already closed")
}
a.Logger.Debug("Closing CLIAdapter")
callback := make(chan error)
a.closing <- callback
err := <-callback
// Mark CLIAdapter as closed by setting its closing channel to nil.
// This will prevent any more output to be printed after this function returns.
a.mu.Lock()
a.closing = nil
a.mu.Unlock()
return err
}
func (a *CLIAdapter) print(msg string) error {
a.mu.Lock()
if a.closing == nil {
return errors.New("adapter is closed")
}
_, err := fmt.Fprint(a.Output, msg)
a.mu.Unlock()
return err
}