Skip to content

Commit

Permalink
Set hedders in msg and requeue
Browse files Browse the repository at this point in the history
  • Loading branch information
thomasjohansen committed Jul 10, 2023
1 parent 0371885 commit 51afdb7
Showing 1 changed file with 21 additions and 20 deletions.
41 changes: 21 additions & 20 deletions pkg/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,25 +152,25 @@ func getStatus(ctx context.Context, result protocol.Result) (int, bool) {

func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient cloudevents.Client) {
start := time.Now()

requeue := false

if _, ok := msg.Headers["knativeerrordest"]; !ok {
err := msg.Nack(false, false)
if err != nil {
logging.FromContext(ctx).Warn("failed to NACK event: ", err)
}
return
}

msgBinding := rabbit.NewMessageFromDelivery(ComponentName, "", "", &msg)
event, err := binding.ToEvent(cloudevents.WithEncodingBinary(ctx), msgBinding)
if err != nil {
extensions := event.Extensions()
if _, ok := extensions["knativeerrordest"]; ok {
logging.FromContext(ctx).Warn("failed creating event from delivery, err (NACK-ing and not re-queueing): ", err)
err = msg.Nack(false, false)
if err != nil {
logging.FromContext(ctx).Warn("failed to NACK event: ", err)
}
logging.FromContext(ctx).Warn("failed parsing event, setting knativeerrordest header and NACK-ing and re-queueing: ", err)

return
} else {
requeue = true
event.SetExtension("knativeerrordest", d.SubscriberURL)
msg.Headers["knativeerrordest"] = d.SubscriberURL
err = msg.Nack(false, true)
if err != nil {
logging.FromContext(ctx).Warn("failed to NACK event: ", err)
}
return
}

ctx, span := readSpan(ctx, msg)
Expand All @@ -193,14 +193,14 @@ func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient c
if err = d.Reporter.ReportEventCount(args, statusCode); err != nil {
logging.FromContext(ctx).Errorf("Something happened: %v", err)

requeue = true
event.SetExtension("knativeerrorcode", statusCode)
msg.Headers["knativeerrordest"] = d.SubscriberURL
msg.Headers["knativeerrorcode"] = statusCode
}
}

if !isSuccess {
logging.FromContext(ctx).Warnf("Failed to deliver to %q", d.SubscriberURL)
if err = msg.Nack(false, requeue); err != nil {
if err = msg.Nack(false, true); err != nil {
logging.FromContext(ctx).Warn("failed to NACK event: ", err)
}
return
Expand All @@ -212,10 +212,11 @@ func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient c
if !isSuccess {
logging.FromContext(ctx).Warnf("Failed to deliver to %q", d.BrokerIngressURL)

requeue = true
event.SetExtension("knativeerrordata", result)
msg.Headers["knativeerrordest"] = d.SubscriberURL
msg.Headers["knativeerrorcode"] = statusCode
msg.Headers["knativeerrordata"] = result

err = msg.Nack(false, requeue) // not multiple
err = msg.Nack(false, true) // not multiple
if err != nil {
logging.FromContext(ctx).Warn("failed to NACK event: ", err)
}
Expand Down

0 comments on commit 51afdb7

Please sign in to comment.