Skip to content

Commit

Permalink
More closing of redis connections and logging of errors
Browse files Browse the repository at this point in the history
  • Loading branch information
rowanseymour committed Nov 11, 2024
1 parent cbd4a77 commit b35a305
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 31 deletions.
63 changes: 37 additions & 26 deletions backends/rapidpro/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,48 +380,59 @@ func (b *backend) NewIncomingMsg(channel courier.Channel, urn urns.URN, text str

// PopNextOutgoingMsg pops the next message that needs to be sent
func (b *backend) PopNextOutgoingMsg(ctx context.Context) (courier.MsgOut, error) {
// pop the next message off our queue
rc := b.rp.Get()
defer rc.Close()
tryToPop := func() (queue.WorkerToken, string, error) {
rc := b.rp.Get()
defer rc.Close()
return queue.PopFromQueue(rc, msgQueueName)
}

token, msgJSON, err := queue.PopFromQueue(rc, msgQueueName)
markComplete := func(token queue.WorkerToken) {
rc := b.rp.Get()
defer rc.Close()
if err := queue.MarkComplete(rc, msgQueueName, token); err != nil {
slog.Error("error marking queue task complete", "error", err)
}
}

// pop the next message off our queue
token, msgJSON, err := tryToPop()
if err != nil {
return nil, err
}

for token == queue.Retry {
token, msgJSON, err = queue.PopFromQueue(rc, msgQueueName)
token, msgJSON, err = tryToPop()
if err != nil {
return nil, err
}
}

if msgJSON != "" {
dbMsg := &Msg{}
err = json.Unmarshal([]byte(msgJSON), dbMsg)
if err != nil {
queue.MarkComplete(rc, msgQueueName, token)
return nil, fmt.Errorf("unable to unmarshal message: %s: %w", string(msgJSON), err)
}
if msgJSON == "" {
return nil, nil
}

// populate the channel on our db msg
channel, err := b.GetChannel(ctx, courier.AnyChannelType, dbMsg.ChannelUUID_)
if err != nil {
queue.MarkComplete(rc, msgQueueName, token)
return nil, err
}
dbMsg := &Msg{}
err = json.Unmarshal([]byte(msgJSON), dbMsg)
if err != nil {
markComplete(token)
return nil, fmt.Errorf("unable to unmarshal message: %s: %w", string(msgJSON), err)
}

dbMsg.Direction_ = MsgOutgoing
dbMsg.channel = channel.(*Channel)
dbMsg.workerToken = token
// populate the channel on our db msg
channel, err := b.GetChannel(ctx, courier.AnyChannelType, dbMsg.ChannelUUID_)
if err != nil {
markComplete(token)
return nil, err
}

// clear out our seen incoming messages
b.clearMsgSeen(rc, dbMsg)
dbMsg.Direction_ = MsgOutgoing
dbMsg.channel = channel.(*Channel)
dbMsg.workerToken = token

return dbMsg, nil
}
// clear out our seen incoming messages
b.clearMsgSeen(dbMsg)

return nil, nil
return dbMsg, nil
}

// WasMsgSent returns whether the passed in message has already been sent
Expand Down
18 changes: 13 additions & 5 deletions backends/rapidpro/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"time"

"github.com/buger/jsonparser"
"github.com/gomodule/redigo/redis"
filetype "github.com/h2non/filetype"
"github.com/lib/pq"
"github.com/nyaruka/courier"
Expand Down Expand Up @@ -363,17 +362,26 @@ func (b *backend) recordMsgReceived(msg *Msg) {
if msg.ExternalID_ != "" {
fingerprint := fmt.Sprintf("%s|%s|%s", msg.Channel().UUID(), msg.URN().Identity(), msg.ExternalID())

b.receivedExternalIDs.Set(rc, fingerprint, string(msg.UUID()))
if err := b.receivedExternalIDs.Set(rc, fingerprint, string(msg.UUID())); err != nil {
slog.Error("error recording received external id", "msg", msg.UUID(), "error", err)
}
} else {
fingerprint := fmt.Sprintf("%s|%s", msg.Channel().UUID(), msg.URN().Identity())

b.receivedMsgs.Set(rc, fingerprint, fmt.Sprintf("%s|%s", msg.UUID(), msg.hash()))
if err := b.receivedMsgs.Set(rc, fingerprint, fmt.Sprintf("%s|%s", msg.UUID(), msg.hash())); err != nil {
slog.Error("error recording received msg", "msg", msg.UUID(), "error", err)
}
}
}

// clearMsgSeen clears our seen incoming messages for the passed in channel and URN
func (b *backend) clearMsgSeen(rc redis.Conn, msg *Msg) {
func (b *backend) clearMsgSeen(msg *Msg) {
rc := b.rp.Get()
defer rc.Close()

fingerprint := fmt.Sprintf("%s|%s", msg.Channel().UUID(), msg.URN().Identity())

b.receivedMsgs.Del(rc, fingerprint)
if err := b.receivedMsgs.Del(rc, fingerprint); err != nil {
slog.Error("error clearing received msgs", "urn", msg.URN().Identity(), "error", err)
}
}

0 comments on commit b35a305

Please sign in to comment.