Skip to content

Commit

Permalink
Moved things around
Browse files Browse the repository at this point in the history
  • Loading branch information
thomasjohansen committed Jul 17, 2023
1 parent 6be7192 commit 5fb51ca
Showing 1 changed file with 70 additions and 70 deletions.
140 changes: 70 additions & 70 deletions pkg/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,9 @@ func (d *Dispatcher) ConsumeFromQueue(ctx context.Context, conn rabbit.RabbitMQC
defer wg.Done()
for msg := range workerQueue {
if d.DLX {
d.dispatchDLQ(ctx, msg, ceClient, channel)
d.dispatchDLQ(ctx, msg, ceClient)
} else {
d.dispatch(ctx, msg, ceClient)
d.dispatch(ctx, msg, ceClient, channel)
}
}
}()
Expand Down Expand Up @@ -156,74 +156,7 @@ func getStatus(ctx context.Context, result protocol.Result) (int, bool) {
return -1, false
}

func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient cloudevents.Client) {
start := time.Now()
msgBinding := rabbit.NewMessageFromDelivery(ComponentName, "", "", &msg)
event, err := binding.ToEvent(cloudevents.WithEncodingBinary(ctx), msgBinding)
if err != nil {
logging.FromContext(ctx).Warn("failed creating event from delivery, err (NACK-ing and not re-queueing): ", err)
err = msg.Nack(false, false)
if err != nil {
logging.FromContext(ctx).Warn("failed to NACK event: ", err)
}
return
}

ctx, span := readSpan(ctx, msg)
defer span.End()
if span.IsRecordingEvents() {
span.AddAttributes(client.EventTraceAttributes(event)...)
}

ctx = cloudevents.ContextWithTarget(ctx, d.SubscriberURL)
if d.BackoffPolicy == eventingduckv1.BackoffPolicyLinear {
ctx = cloudevents.ContextWithRetriesLinearBackoff(ctx, d.BackoffDelay, d.MaxRetries)
} else {
ctx = cloudevents.ContextWithRetriesExponentialBackoff(ctx, d.BackoffDelay, d.MaxRetries)
}

response, result := ceClient.Request(ctx, *event)
statusCode, isSuccess := getStatus(ctx, result)
if statusCode != -1 {
args := &dispatcher.ReportArgs{EventType: event.Type()}
if err := d.Reporter.ReportEventCount(args, statusCode); err != nil {
logging.FromContext(ctx).Errorf("Something happened: %v", err)
}
}

if !isSuccess {
logging.FromContext(ctx).Warnf("Failed to deliver to %q", d.SubscriberURL)
if err := msg.Nack(false, false); err != nil {
logging.FromContext(ctx).Warn("failed to NACK event: ", err)
}
return
} else if response != nil {
logging.FromContext(ctx).Infof("Sending an event: %+v", response)
ctx = cloudevents.ContextWithTarget(ctx, d.BrokerIngressURL)
result := ceClient.Send(ctx, *response)
_, isSuccess := getStatus(ctx, result)
if !isSuccess {
logging.FromContext(ctx).Warnf("Failed to deliver to %q", d.BrokerIngressURL)
err = msg.Nack(false, false) // not multiple
if err != nil {
logging.FromContext(ctx).Warn("failed to NACK event: ", err)
}
return
}
}

err = msg.Ack(false)
if err != nil {
logging.FromContext(ctx).Warn("failed to ACK event: ", err)
}
if statusCode != -1 {
args := &dispatcher.ReportArgs{EventType: event.Type()}
dispatchTime := time.Since(start)
_ = d.Reporter.ReportEventDispatchTime(args, statusCode, dispatchTime)
}
}

func (d *Dispatcher) dispatchDLQ(ctx context.Context, msg amqp.Delivery, ceClient cloudevents.Client, channel rabbit.RabbitMQChannelInterface) {
func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient cloudevents.Client, channel rabbit.RabbitMQChannelInterface) {
start := time.Now()
if _, ok := msg.Headers["knativeerrordest"]; ok {
if err := msg.Nack(false, false); err != nil {
Expand Down Expand Up @@ -327,6 +260,73 @@ func (d *Dispatcher) dispatchDLQ(ctx context.Context, msg amqp.Delivery, ceClien
}
}

func (d *Dispatcher) dispatchDLQ(ctx context.Context, msg amqp.Delivery, ceClient cloudevents.Client) {
start := time.Now()
msgBinding := rabbit.NewMessageFromDelivery(ComponentName, "", "", &msg)
event, err := binding.ToEvent(cloudevents.WithEncodingBinary(ctx), msgBinding)
if err != nil {
logging.FromContext(ctx).Warn("failed creating event from delivery, err (NACK-ing and not re-queueing): ", err)
err = msg.Nack(false, false)
if err != nil {
logging.FromContext(ctx).Warn("failed to NACK event: ", err)
}
return
}

ctx, span := readSpan(ctx, msg)
defer span.End()
if span.IsRecordingEvents() {
span.AddAttributes(client.EventTraceAttributes(event)...)
}

ctx = cloudevents.ContextWithTarget(ctx, d.SubscriberURL)
if d.BackoffPolicy == eventingduckv1.BackoffPolicyLinear {
ctx = cloudevents.ContextWithRetriesLinearBackoff(ctx, d.BackoffDelay, d.MaxRetries)
} else {
ctx = cloudevents.ContextWithRetriesExponentialBackoff(ctx, d.BackoffDelay, d.MaxRetries)
}

response, result := ceClient.Request(ctx, *event)
statusCode, isSuccess := getStatus(ctx, result)
if statusCode != -1 {
args := &dispatcher.ReportArgs{EventType: event.Type()}
if err := d.Reporter.ReportEventCount(args, statusCode); err != nil {
logging.FromContext(ctx).Errorf("Something happened: %v", err)
}
}

if !isSuccess {
logging.FromContext(ctx).Warnf("Failed to deliver to %q", d.SubscriberURL)
if err := msg.Nack(false, false); err != nil {
logging.FromContext(ctx).Warn("failed to NACK event: ", err)
}
return
} else if response != nil {
logging.FromContext(ctx).Infof("Sending an event: %+v", response)
ctx = cloudevents.ContextWithTarget(ctx, d.BrokerIngressURL)
result := ceClient.Send(ctx, *response)
_, isSuccess := getStatus(ctx, result)
if !isSuccess {
logging.FromContext(ctx).Warnf("Failed to deliver to %q", d.BrokerIngressURL)
err = msg.Nack(false, false) // not multiple
if err != nil {
logging.FromContext(ctx).Warn("failed to NACK event: ", err)
}
return
}
}

err = msg.Ack(false)
if err != nil {
logging.FromContext(ctx).Warn("failed to ACK event: ", err)
}
if statusCode != -1 {
args := &dispatcher.ReportArgs{EventType: event.Type()}
dispatchTime := time.Since(start)
_ = d.Reporter.ReportEventDispatchTime(args, statusCode, dispatchTime)
}
}

func sendToRabbitMQ(channel rabbit.RabbitMQChannelInterface, exchangeName string, span *trace.Span, event *cloudevents.Event) error {
tp, ts := (&tracecontext.HTTPFormat{}).SpanContextToHeaders(span.SpanContext())
dc, err := channel.PublishWithDeferredConfirm(
Expand Down

0 comments on commit 5fb51ca

Please sign in to comment.