Skip to content

Commit

Permalink
Send event directly to RabbitMQ
Browse files Browse the repository at this point in the history
  • Loading branch information
thomasjohansen committed Jul 16, 2023
1 parent 095834a commit faea709
Showing 1 changed file with 52 additions and 22 deletions.
74 changes: 52 additions & 22 deletions pkg/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package dispatcher

import (
"context"
"fmt"
"net/http"
"sync"
"time"
Expand Down Expand Up @@ -101,7 +102,7 @@ func (d *Dispatcher) ConsumeFromQueue(ctx context.Context, conn rabbit.RabbitMQC
go func() {
defer wg.Done()
for msg := range workerQueue {
d.dispatch(ctx, msg, ceClient)
d.dispatch(ctx, msg, ceClient, channel)
}
}()
}
Expand Down Expand Up @@ -150,12 +151,11 @@ func getStatus(ctx context.Context, result protocol.Result) (int, bool) {
return -1, false
}

func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient cloudevents.Client) {
func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient cloudevents.Client, channel rabbit.RabbitMQChannelInterface) {
start := time.Now()
if _, ok := msg.Headers["knativeerrordest"]; ok {
err := msg.Nack(false, false)
if err != nil {
logging.FromContext(ctx).Warn("failed to NACK event: ", err)
if err := msg.Ack(false); err != nil {
logging.FromContext(ctx).Warn("failed to Nack event: ", err)
}
return
}
Expand All @@ -165,10 +165,8 @@ func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient c
if err != nil {
logging.FromContext(ctx).Warn("failed parsing event, setting knativeerrordest header and NACK-ing and re-queueing: ", err)

msg.Headers["knativeerrordest"] = d.SubscriberURL
err = msg.Nack(false, true)
if err != nil {
logging.FromContext(ctx).Warn("failed to NACK event: ", err)
if err = msg.Nack(false, false); err != nil {
logging.FromContext(ctx).Warn("failed to Nack event: ", err)
}
return
}
Expand Down Expand Up @@ -197,10 +195,19 @@ func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient c

if !isSuccess {
logging.FromContext(ctx).Warnf("Failed to deliver to %q", d.SubscriberURL)
msg.Headers["knativeerrordest"] = d.SubscriberURL
msg.Headers["knativeerrorcode"] = statusCode
if err = msg.Nack(false, true); err != nil {
logging.FromContext(ctx).Warn("failed to NACK event: ", err)

// We need to ack the original message.
if err = msg.Ack(false); err != nil {
logging.FromContext(ctx).Warn("failed to Ack event: ", err)
}

// Add headers as described here: https://knative.dev/docs/eventing/event-delivery/#configuring-channel-event-delivery
event.SetExtension("knativeerrordest", d.SubscriberURL)
event.SetExtension("knativeerrorcode", statusCode)

// Queue the event into DLQ with the correct headers.
if err = sendToRabbitMQ(channel, msg.Exchange, span, event); err != nil {
logging.FromContext(ctx).Warn("failed to send event: ", err)
}
return
} else if response != nil {
Expand All @@ -210,22 +217,27 @@ func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient c
_, isSuccess = getStatus(ctx, result)
if !isSuccess {
logging.FromContext(ctx).Warnf("Failed to deliver to %q", d.BrokerIngressURL)

// We need to ack the original message.
if err = msg.Ack(false); err != nil {
logging.FromContext(ctx).Warn("failed to Ack event: ", err)
}

msg.Headers["knativeerrordest"] = d.SubscriberURL
msg.Headers["knativeerrorcode"] = statusCode
msg.Headers["knativeerrordata"] = result
// Add headers as described here: https://knative.dev/docs/eventing/event-delivery/#configuring-channel-event-delivery
event.SetExtension("knativeerrordest", d.SubscriberURL)
event.SetExtension("knativeerrorcode", statusCode)
event.SetExtension("knativeerrordata", result)

err = msg.Nack(false, true) // not multiple
if err != nil {
logging.FromContext(ctx).Warn("failed to NACK event: ", err)
// Queue the event into DLQ with the correct headers.
if err = sendToRabbitMQ(channel, msg.Exchange, span, event); err != nil {
logging.FromContext(ctx).Warn("failed to send event: ", err)
}
return
}
}

err = msg.Ack(false)
if err != nil {
logging.FromContext(ctx).Warn("failed to ACK event: ", err)
if err = msg.Ack(false); err != nil {
logging.FromContext(ctx).Warn("failed to Ack event: ", err)
}
if statusCode != -1 {
args := &dispatcher.ReportArgs{EventType: event.Type()}
Expand All @@ -234,6 +246,24 @@ func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient c
}
}

func sendToRabbitMQ(channel rabbit.RabbitMQChannelInterface, exchangeName string, span *trace.Span, event *cloudevents.Event) error {
tp, ts := (&tracecontext.HTTPFormat{}).SpanContextToHeaders(span.SpanContext())
dc, err := channel.PublishWithDeferredConfirm(
exchangeName,
"", // routing key
false, // mandatory
false, // immediate
*rabbit.CloudEventToRabbitMQMessage(event, tp, ts))
if err != nil {
return fmt.Errorf("failed to publish message: %w", err)
}

if ack := dc.Wait(); !ack {
return errors.New("failed to publish message: nacked")
}
return nil
}

func readSpan(ctx context.Context, msg amqp.Delivery) (context.Context, *trace.Span) {
traceparent, ok := msg.Headers["traceparent"].(string)
if !ok {
Expand Down

0 comments on commit faea709

Please sign in to comment.