diff --git a/examples/socketmode/socketmode.go b/examples/socketmode/socketmode.go new file mode 100644 index 000000000..69e34c294 --- /dev/null +++ b/examples/socketmode/socketmode.go @@ -0,0 +1,149 @@ +package main + +import ( + "fmt" + "log" + "os" + "strings" + + "github.com/slack-go/slack/socketmode" + + "github.com/slack-go/slack" + "github.com/slack-go/slack/slackevents" +) + +func main() { + appToken := os.Getenv("SLACK_APP_TOKEN") + if appToken == "" { + + } + + if !strings.HasPrefix(appToken, "xapp-") { + fmt.Fprintf(os.Stderr, "SLACK_APP_TOKEN must have the prefix \"xapp-\".") + } + + botToken := os.Getenv("SLACK_BOT_TOKEN") + if botToken == "" { + fmt.Fprintf(os.Stderr, "SLACK_BOT_TOKEN must be set.\n") + os.Exit(1) + } + + if !strings.HasPrefix(botToken, "xoxb-") { + fmt.Fprintf(os.Stderr, "SLACK_BOT_TOKEN must have the prefix \"xoxb-\".") + } + + api := slack.New( + botToken, + slack.OptionDebug(true), + slack.OptionLog(log.New(os.Stdout, "api: ", log.Lshortfile|log.LstdFlags)), + slack.OptionAppLevelToken(appToken), + ) + + client := socketmode.New( + api, + socketmode.OptionDebug(true), + socketmode.OptionLog(log.New(os.Stdout, "socketmode: ", log.Lshortfile|log.LstdFlags)), + ) + + go func() { + for evt := range client.Events { + switch evt.Type { + case socketmode.EventTypeConnecting: + fmt.Println("Connecting to Slack with Socket Mode...") + case socketmode.EventTypeConnectionError: + fmt.Println("Connection failed. Retrying later...") + case socketmode.EventTypeConnected: + fmt.Println("Connected to Slack with Socket Mode.") + case socketmode.EventTypeEventsAPI: + eventsAPIEvent, ok := evt.Data.(slackevents.EventsAPIEvent) + if !ok { + fmt.Printf("Ignored %+v\n", evt) + + continue + } + + fmt.Printf("Event received: %+v\n", eventsAPIEvent) + + client.Ack(*evt.Request) + + switch eventsAPIEvent.Type { + case slackevents.CallbackEvent: + innerEvent := eventsAPIEvent.InnerEvent + switch ev := innerEvent.Data.(type) { + case *slackevents.AppMentionEvent: + _, _, err := api.PostMessage(ev.Channel, slack.MsgOptionText("Yes, hello.", false)) + if err != nil { + fmt.Printf("failed posting message: %v", err) + } + case *slackevents.MemberJoinedChannelEvent: + fmt.Printf("user %q joined to channel %q", ev.User, ev.Channel) + } + default: + client.Debugf("unsupported Events API event received") + } + case socketmode.EventTypeInteractive: + callback, ok := evt.Data.(slack.InteractionCallback) + if !ok { + fmt.Printf("Ignored %+v\n", evt) + + continue + } + + fmt.Printf("Interaction received: %+v\n", callback) + + var payload interface{} + + switch callback.Type { + case slack.InteractionTypeBlockActions: + // See https://api.slack.com/apis/connections/socket-implement#button + + client.Debugf("button clicked!") + case slack.InteractionTypeShortcut: + case slack.InteractionTypeViewSubmission: + // See https://api.slack.com/apis/connections/socket-implement#modal + case slack.InteractionTypeDialogSubmission: + default: + + } + + client.Ack(*evt.Request, payload) + case socketmode.EventTypeSlashCommand: + cmd, ok := evt.Data.(slack.SlashCommand) + if !ok { + fmt.Printf("Ignored %+v\n", evt) + + continue + } + + client.Debugf("Slash command received: %+v", cmd) + + payload := map[string]interface{}{ + "blocks": []slack.Block{ + slack.NewSectionBlock( + &slack.TextBlockObject{ + Type: slack.MarkdownType, + Text: "foo", + }, + nil, + slack.NewAccessory( + slack.NewButtonBlockElement( + "", + "somevalue", + &slack.TextBlockObject{ + Type: slack.PlainTextType, + Text: "bar", + }, + ), + ), + ), + }} + + client.Ack(*evt.Request, payload) + default: + fmt.Fprintf(os.Stderr, "Unexpected event type received: %s\n", evt.Type) + } + } + }() + + client.Run() +} diff --git a/backoff.go b/internal/backoff/backoff.go similarity index 82% rename from backoff.go rename to internal/backoff/backoff.go index 2ba697e7e..df210f80d 100644 --- a/backoff.go +++ b/internal/backoff/backoff.go @@ -1,4 +1,4 @@ -package slack +package backoff import ( "math/rand" @@ -11,7 +11,7 @@ import ( // call to Duration() it is multiplied by Factor. It is capped at // Max. It returns to Min on every call to Reset(). Used in // conjunction with the time package. -type backoff struct { +type Backoff struct { attempts int // Initial value to scale out Initial time.Duration @@ -23,7 +23,7 @@ type backoff struct { // Returns the current value of the counter and then multiplies it // Factor -func (b *backoff) Duration() (dur time.Duration) { +func (b *Backoff) Duration() (dur time.Duration) { // Zero-values are nonsensical, so we use // them to apply defaults if b.Max == 0 { @@ -52,6 +52,11 @@ func (b *backoff) Duration() (dur time.Duration) { } //Resets the current value of the counter back to Min -func (b *backoff) Reset() { +func (b *Backoff) Reset() { b.attempts = 0 } + +// Attempts returns the number of attempts that we had done so far +func (b *Backoff) Attempts() int { + return b.attempts +} diff --git a/internal/misc/misc.go b/internal/misc/misc.go new file mode 100644 index 000000000..eab8cdd8c --- /dev/null +++ b/internal/misc/misc.go @@ -0,0 +1,28 @@ +package misc + +import ( + "fmt" + "net/http" +) + +// StatusCodeError represents an http response error. +// type httpStatusCode interface { HTTPStatusCode() int } to handle it. +type StatusCodeError struct { + Code int + Status string +} + +func (t StatusCodeError) Error() string { + return fmt.Sprintf("slack server error: %s", t.Status) +} + +func (t StatusCodeError) HTTPStatusCode() int { + return t.Code +} + +func (t StatusCodeError) Retryable() bool { + if t.Code >= 500 || t.Code == http.StatusTooManyRequests { + return true + } + return false +} diff --git a/misc.go b/misc.go index 336f0afb5..821bda869 100644 --- a/misc.go +++ b/misc.go @@ -18,6 +18,8 @@ import ( "strconv" "strings" "time" + + "github.com/slack-go/slack/internal/misc" ) // SlackResponse handles parsing out errors from the web api. @@ -42,28 +44,6 @@ func (t SlackResponse) Err() error { return errors.New(t.Error) } -// StatusCodeError represents an http response error. -// type httpStatusCode interface { HTTPStatusCode() int } to handle it. -type statusCodeError struct { - Code int - Status string -} - -func (t statusCodeError) Error() string { - return fmt.Sprintf("slack server error: %s", t.Status) -} - -func (t statusCodeError) HTTPStatusCode() int { - return t.Code -} - -func (t statusCodeError) Retryable() bool { - if t.Code >= 500 || t.Code == http.StatusTooManyRequests { - return true - } - return false -} - // RateLimitedError represents the rate limit respond from slack type RateLimitedError struct { RetryAfter time.Duration @@ -312,7 +292,7 @@ func checkStatusCode(resp *http.Response, d Debug) error { // Slack seems to send an HTML body along with 5xx error codes. Don't parse it. if resp.StatusCode != http.StatusOK { logResponse(resp, d) - return statusCodeError{Code: resp.StatusCode, Status: resp.Status} + return misc.StatusCodeError{Code: resp.StatusCode, Status: resp.Status} } return nil diff --git a/misc_test.go b/misc_test.go index 808572b23..4dd2a6378 100644 --- a/misc_test.go +++ b/misc_test.go @@ -8,6 +8,8 @@ import ( "sync" "testing" + "github.com/slack-go/slack/internal/misc" + "github.com/slack-go/slack/slackutilsx" ) @@ -92,8 +94,8 @@ func TestParseResponseInvalidToken(t *testing.T) { func TestRetryable(t *testing.T) { for _, e := range []error{ &RateLimitedError{}, - statusCodeError{Code: http.StatusInternalServerError}, - statusCodeError{Code: http.StatusTooManyRequests}, + misc.StatusCodeError{Code: http.StatusInternalServerError}, + misc.StatusCodeError{Code: http.StatusTooManyRequests}, } { r, ok := e.(slackutilsx.Retryable) if !ok { diff --git a/socket_mode.go b/socket_mode.go new file mode 100644 index 000000000..69e40d99d --- /dev/null +++ b/socket_mode.go @@ -0,0 +1,34 @@ +package slack + +import ( + "context" +) + +// SocketModeConnection contains various details about the SocketMode connection. +// It is returned by an "apps.connections.open" API call. +type SocketModeConnection struct { + URL string `json:"url,omitempty"` + Data map[string]interface{} `json:"-"` +} + +type openResponseFull struct { + SlackResponse + SocketModeConnection +} + +// StartSocketModeContext calls the "apps.connections.open" endpoint and returns the provided URL and the full Info block with a custom context. +// +// To have a fully managed Socket Mode connection, use `socketmode.New()`, and call `Run()` on it. +func (api *Client) StartSocketModeContext(ctx context.Context) (info *SocketModeConnection, websocketURL string, err error) { + response := &openResponseFull{} + err = postJSON(ctx, api.httpclient, api.endpoint+"apps.connections.open", api.appLevelToken, nil, response, api) + if err != nil { + return nil, "", err + } + + if response.Err() == nil { + api.Debugln("Using URL:", response.SocketModeConnection.URL) + } + + return &response.SocketModeConnection, response.SocketModeConnection.URL, response.Err() +} diff --git a/socketmode/client.go b/socketmode/client.go new file mode 100644 index 000000000..cce8a703b --- /dev/null +++ b/socketmode/client.go @@ -0,0 +1,63 @@ +package socketmode + +import ( + "encoding/json" + "time" + + "github.com/slack-go/slack" + + "github.com/gorilla/websocket" +) + +type ConnectedEvent struct { + ConnectionCount int // 1 = first time, 2 = second time + Info *slack.SocketModeConnection +} + +type DebugInfo struct { + // Host is the name of the host name on the Slack end, that can be something like `applink-7fc4fdbb64-4x5xq` + Host string `json:"host"` + + // `hello` type only + BuildNumber int `json:"build_number"` + ApproximateConnectionTime int `json:"approximate_connection_time"` +} + +type ConnectionInfo struct { + AppID string `json:"app_id"` +} + +type SocketModeMessagePayload struct { + Event json.RawMessage `json:"´event"` +} + +// Client is a Socket Mode client that allows programs to use [Events API](https://api.slack.com/events-api) +// and [interactive components](https://api.slack.com/interactivity) over WebSocket. +// Please see [Intro to Socket Mode](https://api.slack.com/apis/connections/socket) for more information +// on Socket Mode. +// +// The implementation is highly inspired by https://www.npmjs.com/package/@slack/socket-mode, +// but the structure and the design has been adapted as much as possible to that of our RTM client for consistency +// within the library. +// +// You can instantiate the socket mode client with +// Client's New() and call Run() to start it. Please see examples/socketmode for the usage. +type Client struct { + // Client is the main API, embedded + apiClient slack.Client + + // maxPingInterval is the maximum duration elapsed after the last WebSocket PING sent from Slack + // until Client considers the WebSocket connection is dead and needs to be reopened. + maxPingInterval time.Duration + + // Connection life-cycle + Events chan Event + socketModeResponses chan *Response + + // dialer is a gorilla/websocket Dialer. If nil, use the default + // Dialer. + dialer *websocket.Dialer + + debug bool + log ilogger +} diff --git a/socketmode/deadman.go b/socketmode/deadman.go new file mode 100644 index 000000000..7aeea760e --- /dev/null +++ b/socketmode/deadman.go @@ -0,0 +1,31 @@ +package socketmode + +import "time" + +type deadmanTimer struct { + timeout time.Duration + timer *time.Timer +} + +func newDeadmanTimer(timeout time.Duration) *deadmanTimer { + return &deadmanTimer{ + timeout: timeout, + timer: time.NewTimer(timeout), + } +} + +func (smc *deadmanTimer) Elapsed() <-chan time.Time { + return smc.timer.C +} + +func (smc *deadmanTimer) Reset() { + // Note that this is the correct way to Reset a non-expired timer + if !smc.timer.Stop() { + select { + case <-smc.timer.C: + default: + } + } + + smc.timer.Reset(smc.timeout) +} diff --git a/socketmode/event.go b/socketmode/event.go new file mode 100644 index 000000000..5ae434a70 --- /dev/null +++ b/socketmode/event.go @@ -0,0 +1,30 @@ +package socketmode + +import "encoding/json" + +// Event is the event sent to the consumer of Client +type Event struct { + Type EventType + Data interface{} + + // Request is the json-decoded raw WebSocket message that is received via the Slack Socket Mode + // WebSocket connection. + Request *Request +} + +type ErrorBadMessage struct { + Cause error + Message json.RawMessage +} + +type ErrorWriteFailed struct { + Cause error + Response *Response +} + +type errorRequestedDisconnect struct { +} + +func (e errorRequestedDisconnect) Error() string { + return "disconnection requested: Slack requested us to disconnect" +} diff --git a/socketmode/log.go b/socketmode/log.go new file mode 100644 index 000000000..9f3b7f690 --- /dev/null +++ b/socketmode/log.go @@ -0,0 +1,51 @@ +package socketmode + +import "fmt" + +// TODO merge logger, ilogger, and internalLogger with the top-level package's equivalents + +// logger is a logger interface compatible with both stdlib and some +// 3rd party loggers. +type logger interface { + Output(int, string) error +} + +// ilogger represents the internal logging api we use. +type ilogger interface { + logger + Print(...interface{}) + Printf(string, ...interface{}) + Println(...interface{}) +} + +// internalLog implements the additional methods used by our internal logging. +type internalLog struct { + logger +} + +// Println replicates the behaviour of the standard logger. +func (t internalLog) Println(v ...interface{}) { + t.Output(2, fmt.Sprintln(v...)) +} + +// Printf replicates the behaviour of the standard logger. +func (t internalLog) Printf(format string, v ...interface{}) { + t.Output(2, fmt.Sprintf(format, v...)) +} + +// Print replicates the behaviour of the standard logger. +func (t internalLog) Print(v ...interface{}) { + t.Output(2, fmt.Sprint(v...)) +} + +func (smc *Client) Debugf(format string, v ...interface{}) { + if smc.debug { + smc.log.Output(2, fmt.Sprintf(format, v...)) + } +} + +func (smc *Client) Debugln(v ...interface{}) { + if smc.debug { + smc.log.Output(2, fmt.Sprintln(v...)) + } +} diff --git a/socketmode/request.go b/socketmode/request.go new file mode 100644 index 000000000..078003a0a --- /dev/null +++ b/socketmode/request.go @@ -0,0 +1,38 @@ +package socketmode + +import "encoding/json" + +// Request maps to the content of each WebSocket message received via a Socket Mode WebSocket connection +// +// We call this a "request" rather than e.g. a WebSocket message or an Socket Mode "event" following python-slack-sdk: +// +// https://github.com/slackapi/python-slack-sdk/blob/3f1c4c6e27bf7ee8af57699b2543e6eb7848bcf9/slack_sdk/socket_mode/request.py#L6 +// +// We know that node-slack-sdk calls it an "event", that makes it hard for us to distinguish our client's own event +// that wraps both internal events and Socket Mode "events", vs node-slack-sdk's is for the latter only. +// +// https://github.com/slackapi/node-slack-sdk/blob/main/packages/socket-mode/src/SocketModeClient.ts#L537 +type Request struct { + Type string `json:"type"` + + // `hello` type only + NumConnections int `json:"num_connections"` + ConnectionInfo ConnectionInfo `json:"connection_info"` + + // `disconnect` type only + + // Reason can be "warning" or else + Reason string `json:"reason"` + + // `hello` and `disconnect` types only + DebugInfo DebugInfo `json:"debug_info"` + + // `events_api` type only + EnvelopeID string `json:"envelope_id"` + // TODO Can it really be a non-object type? + // See https://github.com/slackapi/python-slack-sdk/blob/3f1c4c6e27bf7ee8af57699b2543e6eb7848bcf9/slack_sdk/socket_mode/request.py#L26-L31 + Payload json.RawMessage `json:"payload"` + AcceptsResponsePayload bool `json:"accepts_response_payload"` + RetryAttempt int `json:"retry_attempt"` + RetryReason string `json:"retry_reason"` +} diff --git a/socketmode/response.go b/socketmode/response.go new file mode 100644 index 000000000..5c7bfabcf --- /dev/null +++ b/socketmode/response.go @@ -0,0 +1,6 @@ +package socketmode + +type Response struct { + EnvelopeID string `json:"envelope_id"` + Payload interface{} `json:"payload,omitempty"` +} diff --git a/socketmode/socket_mode_managed_conn.go b/socketmode/socket_mode_managed_conn.go new file mode 100644 index 000000000..4cd649c1c --- /dev/null +++ b/socketmode/socket_mode_managed_conn.go @@ -0,0 +1,550 @@ +package socketmode + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "sync" + "time" + + "github.com/slack-go/slack" + "github.com/slack-go/slack/internal/backoff" + "github.com/slack-go/slack/internal/misc" + "github.com/slack-go/slack/slackevents" + + "github.com/gorilla/websocket" + "github.com/slack-go/slack/internal/timex" +) + +// Run is a blocking function that connects the Slack Socket Mode API and handles all incoming +// requests and outgoing responses. +// +// The consumer of the Client and this function should read the Client.Events channel to receive +// `socketmode.Event`s that includes the client-specific events that may or may not wrap Socket Mode requests. +// +// Note that this function automatically reconnect on requested by Slack through a `disconnect` message. +// This function exists with an error only when a reconnection is failued due to some reason. +// If you want to retry even on reconnection failure, you'd need to write your own wrapper for this function +// to do so. +func (smc *Client) Run() error { + ctx := context.TODO() + + for connectionCount := 0; ; connectionCount++ { + if err := smc.run(ctx, connectionCount); err != nil { + return err + } + + // Continue and run the loop again to reconnect + } +} + +func (smc *Client) run(ctx context.Context, connectionCount int) error { + messages := make(chan json.RawMessage) + + deadmanTimer := newDeadmanTimer(smc.maxPingInterval) + + pingHandler := func(_ string) error { + deadmanTimer.Reset() + + return nil + } + + // Start trying to connect + // the returned err is already passed onto the Events channel + // + // We also configures an additional ping handler for the deadmanTimer that triggers a timeout when + // Slack did not send us WebSocket PING for more than Client.maxPingInterval. + // We can use `<-smc.pingTimeout.C` to wait for the timeout. + info, conn, err := smc.connect(ctx, connectionCount, pingHandler) + if err != nil { + // when the connection is unsuccessful its fatal, and we need to bail out. + smc.Debugf("Failed to connect with Socket Mode on try %d: %s", connectionCount, err) + + return err + } + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + smc.Events <- newEvent(EventTypeConnected, &ConnectedEvent{ + ConnectionCount: connectionCount, + Info: info, + }) + + smc.Debugf("WebSocket connection succeeded on try %d", connectionCount) + + // We're now connected so we can set up listeners + + var ( + wg sync.WaitGroup + firstErr error + firstErrOnce sync.Once + ) + + wg.Add(1) + go func() { + defer wg.Done() + defer cancel() + + // The response sender sends Socket Mode responses over the WebSocket conn + if err := smc.runResponseSender(ctx, conn); err != nil { + firstErrOnce.Do(func() { + firstErr = err + }) + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + defer cancel() + + // The handler reads Socket Mode requests, and enqueues responses for sending by the response sender + if err := smc.runRequestHandler(ctx, messages); err != nil { + firstErrOnce.Do(func() { + firstErr = err + }) + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + defer cancel() + + // The receiver reads WebSocket messages, and enqueues parsed Socket Mode requests to be handled by + // the request handler + if err := smc.runMessageReceiver(ctx, conn, messages); err != nil { + firstErrOnce.Do(func() { + firstErr = err + }) + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + + select { + case <-ctx.Done(): + // Detect when the connection is dead. + case <-deadmanTimer.Elapsed(): + firstErrOnce.Do(func() { + firstErr = errors.New("ping timeout: Slack did not send us WebSocket PING for more than Client.maxInterval") + }) + + cancel() + } + }() + + wg.Wait() + + // wg.Wait() finishes only after any of the above go routines finishes. + // Also, we can expect firstErr to be not nil, as goroutines can finish only on error. + smc.Debugf("Reconnecting due to %v", firstErr) + + if err = conn.Close(); err != nil { + smc.Debugf("Failed to close connection: %v", err) + } + + return nil +} + +// connect attempts to connect to the slack websocket API. It handles any +// errors that occur while connecting and will return once a connection +// has been successfully opened. +func (smc *Client) connect(ctx context.Context, connectionCount int, additionalPingHandler func(string) error) (*slack.SocketModeConnection, *websocket.Conn, error) { + const ( + errInvalidAuth = "invalid_auth" + errInactiveAccount = "account_inactive" + errMissingAuthToken = "not_authed" + ) + + // used to provide exponential backoff wait time with jitter before trying + // to connect to slack again + boff := &backoff.Backoff{ + Max: 5 * time.Minute, + } + + for { + var ( + backoff time.Duration + ) + + // send connecting event + smc.Events <- newEvent(EventTypeConnecting, &slack.ConnectingEvent{ + Attempt: boff.Attempts() + 1, + ConnectionCount: connectionCount, + }) + + // attempt to start the connection + info, conn, err := smc.openAndDial(additionalPingHandler) + if err == nil { + return info, conn, nil + } + + // check for fatal errors + switch err.Error() { + case errInvalidAuth, errInactiveAccount, errMissingAuthToken: + smc.Debugf("invalid auth when connecting with SocketMode: %s", err) + return nil, nil, err + default: + } + + switch actual := err.(type) { + case misc.StatusCodeError: + if actual.Code == http.StatusNotFound { + smc.Debugf("invalid auth when connecting with Socket Mode: %s", err) + smc.Events <- newEvent(EventTypeInvalidAuth, &slack.InvalidAuthEvent{}) + return nil, nil, err + } + case *slack.RateLimitedError: + backoff = actual.RetryAfter + default: + } + + backoff = timex.Max(backoff, boff.Duration()) + // any other errors are treated as recoverable and we try again after + // sending the event along the Events channel + smc.Events <- newEvent(EventTypeConnectionError, &slack.ConnectionErrorEvent{ + Attempt: boff.Attempts(), + Backoff: backoff, + ErrorObj: err, + }) + + // get time we should wait before attempting to connect again + smc.Debugf("reconnection %d failed: %s reconnecting in %v\n", boff.Attempts(), err, backoff) + + // wait for one of the following to occur, + // backoff duration has elapsed, disconnectCh is signalled, or + // the smc finishes disconnecting. + select { + case <-time.After(backoff): // retry after the backoff. + case <-ctx.Done(): + return nil, nil, ctx.Err() + } + } +} + +// openAndDial attempts to open a Socket Mode connection and dial to the connection endpoint using WebSocket. +// It returns the full information returned by the "apps.connections.open" method on the +// Slack API. +func (smc *Client) openAndDial(additionalPingHandler func(string) error) (info *slack.SocketModeConnection, _ *websocket.Conn, err error) { + var ( + url string + ) + + smc.Debugf("Starting SocketMode") + info, url, err = smc.Open() + + if err != nil { + smc.Debugf("Failed to start or connect with SocketMode: %s", err) + return nil, nil, err + } + + smc.Debugf("Dialing to websocket on url %s", url) + // Only use HTTPS for connections to prevent MITM attacks on the connection. + upgradeHeader := http.Header{} + upgradeHeader.Add("Origin", "https://api.slack.com") + dialer := websocket.DefaultDialer + if smc.dialer != nil { + dialer = smc.dialer + } + conn, _, err := dialer.Dial(url, upgradeHeader) + if err != nil { + smc.Debugf("Failed to dial to the websocket: %s", err) + return nil, nil, err + } + + conn.SetPingHandler(func(appData string) error { + if additionalPingHandler != nil { + if err := additionalPingHandler(appData); err != nil { + return err + } + } + + smc.handlePing(conn, appData) + + return nil + }) + + // We don't need to conn.SetCloseHandler because the default handler is effective enough that + // it sends back the CLOSE message to the server and let conn.ReadJSON() fail with CloseError. + // The CloseError must be handled normally in our receiveMessagesInto function. + //conn.SetCloseHandler(func(code int, text string) error { + // ... + // }) + + return info, conn, err +} + +// runResponseSender runs the handler that reads Socket Mode responses enqueued onto Client.socketModeResponses channel +// and sends them one by one over the WebSocket connection. +// Gorilla WebSocket is not goroutine safe hence this needs to be the single place you write to the WebSocket connection. +func (smc *Client) runResponseSender(ctx context.Context, conn *websocket.Conn) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + // 3. listen for messages that need to be sent + case res := <-smc.socketModeResponses: + smc.Debugf("Sending Socket Mode response with envelope ID %q: %v", res.EnvelopeID, res) + + if err := unsafeWriteSocketModeResponse(conn, res); err != nil { + smc.Events <- newEvent(EventTypeErrorWriteFailed, &ErrorWriteFailed{ + Cause: err, + Response: res, + }) + } + + smc.Debugf("Finished sending Socket Mode response with envelope ID %q", res.EnvelopeID) + } + } +} + +// runRequestHandler is a blocking function that runs the Socket Mode request receiver. +// +// It reads WebSocket messages sent from Slack's Socket Mode WebSocket connection, +// parses them as Socket Mode requests, and processes them and optionally emit our own events into Client.Events channel. +func (smc *Client) runRequestHandler(ctx context.Context, websocket chan json.RawMessage) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case message := <-websocket: + smc.Debugf("Received WebSocket message: %s", message) + + // listen for incoming messages that need to be parsed + evt, err := smc.parseEvent(message) + if err != nil { + smc.Events <- newEvent(EventTypeErrorBadMessage, &ErrorBadMessage{ + Cause: err, + Message: message, + }) + } else if evt != nil { + if evt.Type == EventTypeDisconnect { + // We treat the `disconnect` request from Slack as an error internally, + // so that we can tell the consumer of this function to reopen the connection on it. + return errorRequestedDisconnect{} + } + + smc.Events <- *evt + } + } + } +} + +// runMessageReceiver monitors the Socket Mode opened WebSocket connection for any incoming +// messages. It pushes the raw events into the channel. +// The receiver runs until the context is closed. +func (smc *Client) runMessageReceiver(ctx context.Context, conn *websocket.Conn, sink chan json.RawMessage) error { + for { + if err := smc.receiveMessagesInto(ctx, conn, sink); err != nil { + return err + } + } +} + +// unsafeWriteSocketModeResponse sends a WebSocket message back to Slack. +// WARNING: Call to this function must be serialized! +// +// Here's why - Gorilla WebSocket's Writes functions are not concurrency-safe. +// That is, we must serialize all the writes to it with e.g. a goroutine or mutex. +// We intentionally chose to use goroutine, which makes it harder to propagate write errors to the caller, +// but is more computationally efficient. +// +// See the below for more information on this topic: +// https://stackoverflow.com/questions/43225340/how-to-ensure-concurrency-in-golang-gorilla-websocket-package +func unsafeWriteSocketModeResponse(conn *websocket.Conn, res *Response) error { + // set a write deadline on the connection + if err := conn.SetWriteDeadline(time.Now().Add(10 * time.Second)); err != nil { + return err + } + + // Remove write deadline regardless of WriteJSON succeeds or not + defer conn.SetWriteDeadline(time.Time{}) + + if err := conn.WriteJSON(res); err != nil { + return err + } + + return nil +} + +func newEvent(tpe EventType, data interface{}, req ...*Request) Event { + evt := Event{Type: tpe, Data: data} + + if len(req) > 0 { + evt.Request = req[0] + } + + return evt +} + +// Ack acknowledges the Socket Mode request with the payload. +// +// This tells Slack that the we have received the request denoted by the envelope ID, +// by sending back the envelope ID over the WebSocket connection. +func (smc *Client) Ack(req Request, payload ...interface{}) { + res := Response{ + EnvelopeID: req.EnvelopeID, + } + + if len(payload) > 0 { + res.Payload = payload[0] + } + + smc.Send(res) +} + +// Send sends the Socket Mode response over a WebSocket connection. +// This is usually used for acknowledging requests, but if you need more control over Client.Ack(). +// It's normally recommended to use Client.Ack() instead of this. +func (smc *Client) Send(res Response) { + js, err := json.Marshal(res) + if err != nil { + panic(err) + } + + smc.Debugf("Scheduling Socket Mode response for envelope ID %s: %s", res.EnvelopeID, js) + + smc.socketModeResponses <- &res +} + +// receiveMessagesInto attempts to receive an event from the WebSocket connection for Socket Mode. +// This will block until a frame is available from the WebSocket. +// If the read from the WebSocket results in a fatal error, this function will return non-nil. +func (smc *Client) receiveMessagesInto(ctx context.Context, conn *websocket.Conn, sink chan json.RawMessage) error { + smc.Debugf("Starting to receive message") + defer smc.Debugf("Finished to receive message") + + event := json.RawMessage{} + err := conn.ReadJSON(&event) + + // check if the connection was closed. + if websocket.IsUnexpectedCloseError(err) { + return err + } + + switch { + case err == io.ErrUnexpectedEOF: + // EOF's don't seem to signify a failed connection so instead we ignore + // them here and detect a failed connection upon attempting to send a + // 'PING' message + + // Unlike RTM, we don't ping from the our end as there seem to have no client ping. + // We just continue to the next loop so that we `smc.disconnected` should be received if + // this EOF error was actually due to disconnection. + + return nil + case err != nil: + // All other errors from ReadJSON come from NextReader, and should + // kill the read loop and force a reconnect. + smc.Events <- newEvent(EventTypeIncomingError, &slack.IncomingEventError{ + ErrorObj: err, + }) + + return err + case len(event) == 0: + smc.Debugln("Received empty event") + default: + if smc.debug { + buf := &bytes.Buffer{} + d := json.NewEncoder(buf) + d.SetIndent("", " ") + if err := d.Encode(event); err != nil { + smc.Debugln("Failed encoding decoded json:", err) + } + reencoded := buf.String() + + smc.Debugln("Incoming WebSocket message:", reencoded) + } + + select { + case sink <- event: + case <-ctx.Done(): + smc.Debugln("cancelled while attempting to send raw event") + + return ctx.Err() + } + } + + return nil +} + +// parseEvent takes a raw JSON message received from the slack websocket +// and handles the encoded event. +// returns the our own event that wraps the socket mode request. +func (smc *Client) parseEvent(wsMsg json.RawMessage) (*Event, error) { + req := &Request{} + err := json.Unmarshal(wsMsg, req) + if err != nil { + return nil, fmt.Errorf("unmarshalling WebSocket message: %v", err) + } + + var evt Event + + // See below two links for all the available message types. + // - https://github.com/slackapi/node-slack-sdk/blob/c3f4d7109062a0356fb765d53794b7b5f6b3b5ae/packages/socket-mode/src/SocketModeClient.ts#L533 + // - https://api.slack.com/apis/connections/socket-implement + switch req.Type { + case RequestTypeHello: + evt = newEvent(EventTypeHello, nil, req) + case RequestTypeEventsAPI: + payloadEvent := req.Payload + + eventsAPIEvent, err := slackevents.ParseEvent(payloadEvent, slackevents.OptionNoVerifyToken()) + if err != nil { + return nil, fmt.Errorf("parsing Events API event: %v", err) + } + + evt = newEvent(EventTypeEventsAPI, eventsAPIEvent, req) + case RequestTypeDisconnect: + // See https://api.slack.com/apis/connections/socket-implement#disconnect + + evt = newEvent(EventTypeDisconnect, nil, req) + case RequestTypeSlashCommands: + // See https://api.slack.com/apis/connections/socket-implement#command + var cmd slack.SlashCommand + + if err := json.Unmarshal(req.Payload, &cmd); err != nil { + return nil, fmt.Errorf("parsing slash command: %v", err) + } + + evt = newEvent(EventTypeSlashCommand, cmd, req) + case RequestTypeInteractive: + // See belows: + // - https://api.slack.com/apis/connections/socket-implement#button + // - https://api.slack.com/apis/connections/socket-implement#home + // - https://api.slack.com/apis/connections/socket-implement#modal + // - https://api.slack.com/apis/connections/socket-implement#menu + + var callback slack.InteractionCallback + + if err := json.Unmarshal(req.Payload, &callback); err != nil { + return nil, fmt.Errorf("parsing interaction callback: %v", err) + } + + evt = newEvent(EventTypeInteractive, callback, req) + default: + return nil, fmt.Errorf("processing WebSocket message: encountered unsupported type %q", req.Type) + } + + return &evt, nil +} + +// handlePing handles an incoming 'PONG' message which should be in response to +// a previously sent 'PING' message. This is then used to compute the +// connection's latency. +func (smc *Client) handlePing(conn *websocket.Conn, event string) { + smc.Debugf("WebSocket ping message received: %s", event) + + // In WebSocket, we need to respond a PING from the server with a PONG with the same payload as the PING. + if err := conn.WriteControl(websocket.PongMessage, []byte(event), time.Now().Add(10*time.Second)); err != nil { + smc.Debugf("Failed writing WebSocket PONG message: %v", err) + } +} diff --git a/socketmode/socketmode.go b/socketmode/socketmode.go new file mode 100644 index 000000000..635e818e3 --- /dev/null +++ b/socketmode/socketmode.go @@ -0,0 +1,114 @@ +package socketmode + +import ( + "context" + "log" + "os" + "time" + + "github.com/slack-go/slack" + + "github.com/gorilla/websocket" +) + +// EventType is the type of events that are emitted by scoketmode.Client. +// You receive and handle those events from a socketmode.Client.Events channel. +// Those event types does not necessarily match 1:1 to those of Slack Events API events. +type EventType string + +const ( + // The following request types are the types of requests sent from Slack via Socket Mode WebSocket connection + // and handled internally by the socketmode.Client. + // The consumer of socketmode.Client will never see it. + + RequestTypeHello = "hello" + RequestTypeEventsAPI = "events_api" + RequestTypeDisconnect = "disconnect" + RequestTypeSlashCommands = "slash_commands" + RequestTypeInteractive = "interactive" + + // The following event types are for events emitted by socketmode.Client itself and + // does not originate from Slack. + EventTypeConnecting = EventType("connecting") + EventTypeInvalidAuth = EventType("invalid_auth") + EventTypeConnectionError = EventType("connection_error") + EventTypeConnected = EventType("connected") + EventTypeIncomingError = EventType("incoming_error") + EventTypeErrorWriteFailed = EventType("write_error") + EventTypeErrorBadMessage = EventType("error_bad_message") + + // + // The following event types are guaranteed to not change unless Slack changes + // + + EventTypeHello = EventType("hello") + EventTypeDisconnect = EventType("disconnect") + EventTypeEventsAPI = EventType("events_api") + EventTypeInteractive = EventType("interactive") + EventTypeSlashCommand = EventType("slash_commands") + + websocketDefaultTimeout = 10 * time.Second + defaultMaxPingInterval = 30 * time.Second +) + +// Open calls the "apps.connections.open" endpoint and returns the provided URL and the full Info block. +// +// To have a fully managed Websocket connection, use `New`, and call `Run()` on it. +func (smc *Client) Open() (info *slack.SocketModeConnection, websocketURL string, err error) { + ctx, cancel := context.WithTimeout(context.Background(), websocketDefaultTimeout) + defer cancel() + + return smc.apiClient.StartSocketModeContext(ctx) +} + +// Option options for the managed Client. +type Option func(client *Client) + +// OptionDialer takes a gorilla websocket Dialer and uses it as the +// Dialer when opening the websocket for the Socket Mode connection. +func OptionDialer(d *websocket.Dialer) Option { + return func(smc *Client) { + smc.dialer = d + } +} + +// OptionPingInterval determines how often we expect Slack to deliver WebSocket ping to us. +// If no ping is delivered to us within this interval after the last ping, we assumes the WebSocket connection +// is dead and needs to be reconnected. +func OptionPingInterval(d time.Duration) Option { + return func(smc *Client) { + smc.maxPingInterval = d + } +} + +// OptionDebug enable debugging for the client +func OptionDebug(b bool) func(*Client) { + return func(c *Client) { + c.debug = b + } +} + +// OptionLog set logging for client. +func OptionLog(l logger) func(*Client) { + return func(c *Client) { + c.log = internalLog{logger: l} + } +} + +// New returns a Socket Mode client which provides a fully managed connection to +// Slack's Websocket-based Socket Mode. +func New(api *slack.Client, options ...Option) *Client { + result := &Client{ + apiClient: *api, + Events: make(chan Event, 50), + socketModeResponses: make(chan *Response, 20), + maxPingInterval: defaultMaxPingInterval, + log: log.New(os.Stderr, "slack-go/slack/socketmode", log.LstdFlags|log.Lshortfile), + } + + for _, opt := range options { + opt(result) + } + + return result +} diff --git a/socketmode/socketmode_test.go b/socketmode/socketmode_test.go new file mode 100644 index 000000000..b9ce77690 --- /dev/null +++ b/socketmode/socketmode_test.go @@ -0,0 +1,293 @@ +package socketmode + +import ( + "bytes" + "encoding/json" + "reflect" + "testing" + + "github.com/slack-go/slack/slackevents" + + "github.com/pkg/errors" +) + +const ( + EventDisconnect = `{ + "type": "disconnect", + "reason": "warning", + "debug_info": { + "host": "applink-7fc4fdbb64-4x5xq" + } +} +` + EventHello = `{ + "type": "hello", + "num_connections": 4, + "debug_info": { + "host": "applink-7fc4fdbb64-4x5xq", + "build_number": 10, + "approximate_connection_time": 18060 + }, + "connection_info": { + "app_id": "A01K58AR4RF" + } +} +` + + EventAppMention = `{ + "envelope_id": "c67a03d0-4094-4744-90ca-d286e00a3ab1", + "payload": { + "token": "redacted", + "team_id": "redacted", + "api_app_id": "redacted", + "event": { + "client_msg_id": "c714568f-67df-42d7-a343-0a8e4d9c6030", + "type": "app_mention", + "text": "\u003c@U01JKSB8T7Y\u003e test", + "user": "redacted", + "ts": "1610927831.000200", + "team": "redacted", + "blocks": [ + { + "type": "rich_text", + "block_id": "2Le", + "elements": [ + { + "type": "rich_text_section", + "elements": [ + { + "type": "user", + "user_id": "redacted" + }, + { + "type": "text", + "text": " test39" + } + ] + } + ] + } + ], + "channel": "redacted", + "event_ts": "1610927831.000200" + }, + "type": "event_callback", + "event_id": "Ev01JZ2T7S3U", + "event_time": 1610927831, + "authorizations": [ + { + "enterprise_id": null, + "team_id": "redacted", + "user_id": "redacted", + "is_bot": true, + "is_enterprise_install": false + } + ], + "is_ext_shared_channel": false, + "event_context": "1-app_mention-redacted-redacted" + }, + "type": "events_api", + "accepts_response_payload": false, + "retry_attempt": 0, + "retry_reason": "" +}` +) + +func TestEventParsing(t *testing.T) { + testParsing(t, + EventHello, + &Event{ + Type: EventTypeHello, + Request: &Request{ + Type: RequestTypeHello, + NumConnections: 4, + DebugInfo: DebugInfo{ + Host: "applink-7fc4fdbb64-4x5xq", + BuildNumber: 10, + ApproximateConnectionTime: 18060, + }, + ConnectionInfo: ConnectionInfo{ + AppID: "A01K58AR4RF", + }, + }, + }) + + testParsing(t, + EventDisconnect, + &Event{ + Type: EventTypeDisconnect, + Request: &Request{ + Type: RequestTypeDisconnect, + Reason: "warning", + DebugInfo: DebugInfo{ + Host: "applink-7fc4fdbb64-4x5xq", + }, + }, + }) + + rawAppMention := json.RawMessage(`{ + "client_msg_id": "c714568f-67df-42d7-a343-0a8e4d9c6030", + "type": "app_mention", + "text": "\u003c@U01JKSB8T7Y\u003e test", + "user": "redacted", + "ts": "1610927831.000200", + "team": "redacted", + "blocks": [ + { + "type": "rich_text", + "block_id": "2Le", + "elements": [ + { + "type": "rich_text_section", + "elements": [ + { + "type": "user", + "user_id": "redacted" + }, + { + "type": "text", + "text": " test39" + } + ] + } + ] + } + ], + "channel": "redacted", + "event_ts": "1610927831.000200" + }`) + + rawAppMentionReqPayload := json.RawMessage(`{ + "token": "redacted", + "team_id": "redacted", + "api_app_id": "redacted", + "event": { + "client_msg_id": "c714568f-67df-42d7-a343-0a8e4d9c6030", + "type": "app_mention", + "text": "\u003c@U01JKSB8T7Y\u003e test", + "user": "redacted", + "ts": "1610927831.000200", + "team": "redacted", + "blocks": [ + { + "type": "rich_text", + "block_id": "2Le", + "elements": [ + { + "type": "rich_text_section", + "elements": [ + { + "type": "user", + "user_id": "redacted" + }, + { + "type": "text", + "text": " test39" + } + ] + } + ] + } + ], + "channel": "redacted", + "event_ts": "1610927831.000200" + }, + "type": "event_callback", + "event_id": "Ev01JZ2T7S3U", + "event_time": 1610927831, + "authorizations": [ + { + "enterprise_id": null, + "team_id": "redacted", + "user_id": "redacted", + "is_bot": true, + "is_enterprise_install": false + } + ], + "is_ext_shared_channel": false, + "event_context": "1-app_mention-redacted-redacted" + }`) + testParsing(t, + EventAppMention, + &Event{ + Type: EventTypeEventsAPI, + Data: slackevents.EventsAPIEvent{ + Token: "redacted", + TeamID: "redacted", + Type: "event_callback", + APIAppID: "redacted", + Data: &slackevents.EventsAPICallbackEvent{ + Type: "event_callback", + Token: "redacted", + TeamID: "redacted", + APIAppID: "redacted", + InnerEvent: &rawAppMention, + AuthedUsers: nil, + AuthedTeams: nil, + EventID: "Ev01JZ2T7S3U", + EventTime: 1610927831, + EventContext: "1-app_mention-redacted-redacted", + }, + InnerEvent: slackevents.EventsAPIInnerEvent{ + Type: slackevents.AppMention, + Data: &slackevents.AppMentionEvent{ + Type: slackevents.AppMention, + User: "redacted", + Text: "<@U01JKSB8T7Y> test", + TimeStamp: "1610927831.000200", + ThreadTimeStamp: "", + Channel: "redacted", + EventTimeStamp: json.Number("1610927831.000200"), + }, + }, + }, + Request: &Request{ + Type: RequestTypeEventsAPI, + EnvelopeID: "c67a03d0-4094-4744-90ca-d286e00a3ab1", + Payload: rawAppMentionReqPayload, + AcceptsResponsePayload: false, + RetryAttempt: 0, + RetryReason: "", + }, + }) +} + +func testParsing(t *testing.T, raw string, want interface{}) { + t.Helper() + + got, err := parse(raw) + if err != nil { + t.Fatalf("unexpected error parsing event %q: %v", raw, err) + } + + if !reflect.DeepEqual(want, got) { + t.Fatalf("unexpected parse result: want %s, got %s", dump(t, want), dump(t, got)) + } +} + +func dump(t *testing.T, data interface{}) string { + t.Helper() + + var buf bytes.Buffer + + e := json.NewEncoder(&buf) + e.SetIndent("", " ") + + if err := e.Encode(data); err != nil { + t.Fatalf("encoding data to json: %v", err) + } + + return buf.String() +} + +func parse(raw string) (*Event, error) { + c := &Client{} + + evt, err := c.parseEvent(json.RawMessage([]byte(raw))) + + if evt == nil { + return nil, errors.New("failed handling raw event: event was empty") + } + + return evt, err +} diff --git a/websocket_managed_conn.go b/websocket_managed_conn.go index 8607b3a3d..fe6802e49 100644 --- a/websocket_managed_conn.go +++ b/websocket_managed_conn.go @@ -9,6 +9,9 @@ import ( "reflect" "time" + "github.com/slack-go/slack/internal/backoff" + "github.com/slack-go/slack/internal/misc" + "github.com/gorilla/websocket" "github.com/slack-go/slack/internal/errorsx" "github.com/slack-go/slack/internal/timex" @@ -92,7 +95,7 @@ func (rtm *RTM) connect(connectionCount int, useRTMStart bool) (*Info, *websocke // used to provide exponential backoff wait time with jitter before trying // to connect to slack again - boff := &backoff{ + boff := &backoff.Backoff{ Max: 5 * time.Minute, } @@ -103,7 +106,7 @@ func (rtm *RTM) connect(connectionCount int, useRTMStart bool) (*Info, *websocke // send connecting event rtm.IncomingEvents <- RTMEvent{"connecting", &ConnectingEvent{ - Attempt: boff.attempts + 1, + Attempt: boff.Attempts() + 1, ConnectionCount: connectionCount, }} @@ -123,7 +126,7 @@ func (rtm *RTM) connect(connectionCount int, useRTMStart bool) (*Info, *websocke } switch actual := err.(type) { - case statusCodeError: + case misc.StatusCodeError: if actual.Code == http.StatusNotFound { rtm.Debugf("invalid auth when connecting with RTM: %s", err) rtm.IncomingEvents <- RTMEvent{"invalid_auth", &InvalidAuthEvent{}} @@ -138,13 +141,13 @@ func (rtm *RTM) connect(connectionCount int, useRTMStart bool) (*Info, *websocke // any other errors are treated as recoverable and we try again after // sending the event along the IncomingEvents channel rtm.IncomingEvents <- RTMEvent{"connection_error", &ConnectionErrorEvent{ - Attempt: boff.attempts, + Attempt: boff.Attempts(), Backoff: backoff, ErrorObj: err, }} // get time we should wait before attempting to connect again - rtm.Debugf("reconnection %d failed: %s reconnecting in %v\n", boff.attempts, err, backoff) + rtm.Debugf("reconnection %d failed: %s reconnecting in %v\n", boff.Attempts(), err, backoff) // wait for one of the following to occur, // backoff duration has elapsed, killChannel is signalled, or