-
Notifications
You must be signed in to change notification settings - Fork 1.3k
/
message.go
221 lines (195 loc) · 7.48 KB
/
message.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
package pubsub
import (
"context"
"time"
)
// AckHandler implements ack/nack handling.
type AckHandler interface {
// OnAck processes a message ack.
OnAck()
// OnNack processes a message nack.
OnNack()
// OnAckWithResult processes a message ack and returns
// a result that shows if it succeeded.
OnAckWithResult() *AckResult
// OnNackWithResult processes a message nack and returns
// a result that shows if it succeeded.
OnNackWithResult() *AckResult
}
// Message represents a Pub/Sub message.
type Message struct {
// ID identifies this message. This ID is assigned by the server and is
// populated for Messages obtained from a subscription.
//
// This field is read-only.
ID string
// Data is the actual data in the message.
Data []byte
// Attributes represents the key-value pairs the current message is
// labelled with.
Attributes map[string]string
// PublishTime is the time at which the message was published. This is
// populated by the server for Messages obtained from a subscription.
//
// This field is read-only.
PublishTime time.Time
// DeliveryAttempt is the number of times a message has been delivered.
// This is part of the dead lettering feature that forwards messages that
// fail to be processed (from nack/ack deadline timeout) to a dead letter topic.
// If dead lettering is enabled, this will be set on all attempts, starting
// with value 1. Otherwise, the value will be nil.
// This field is read-only.
DeliveryAttempt *int
// OrderingKey identifies related messages for which publish order should
// be respected. If empty string is used, message will be sent unordered.
OrderingKey string
// ackh handles Ack() or Nack().
ackh AckHandler
}
// Ack indicates successful processing of a Message passed to the Subscriber.Receive callback.
// It should not be called on any other Message value.
// If message acknowledgement fails, the Message will be redelivered.
// Client code must call Ack or Nack when finished for each received Message.
// Calls to Ack or Nack have no effect after the first call.
func (m *Message) Ack() {
if m.ackh != nil {
m.ackh.OnAck()
}
}
// Nack indicates that the client will not or cannot process a Message passed to the Subscriber.Receive callback.
// It should not be called on any other Message value.
// Nack will result in the Message being redelivered more quickly than if it were allowed to expire.
// Client code must call Ack or Nack when finished for each received Message.
// Calls to Ack or Nack have no effect after the first call.
func (m *Message) Nack() {
if m.ackh != nil {
m.ackh.OnNack()
}
}
// AcknowledgeStatus represents the status of an Ack or Nack request.
type AcknowledgeStatus int
const (
// AcknowledgeStatusSuccess indicates the request was a success.
AcknowledgeStatusSuccess AcknowledgeStatus = iota
// AcknowledgeStatusPermissionDenied indicates the caller does not have sufficient permissions.
AcknowledgeStatusPermissionDenied
// AcknowledgeStatusFailedPrecondition indicates the request encountered a FailedPrecondition error.
AcknowledgeStatusFailedPrecondition
// AcknowledgeStatusInvalidAckID indicates one or more of the ack IDs sent were invalid.
AcknowledgeStatusInvalidAckID
// AcknowledgeStatusOther indicates another unknown error was returned.
AcknowledgeStatusOther
)
// AckResult holds the result from a call to Ack or Nack.
type AckResult struct {
ready chan struct{}
res AcknowledgeStatus
err error
}
// Ready returns a channel that is closed when the result is ready.
// When the Ready channel is closed, Get is guaranteed not to block.
func (r *AckResult) Ready() <-chan struct{} { return r.ready }
// Get returns the status and/or error result of a Ack, Nack, or Modack call.
// Get blocks until the Ack/Nack completes or the context is done.
func (r *AckResult) Get(ctx context.Context) (res AcknowledgeStatus, err error) {
// If the result is already ready, return it even if the context is done.
select {
case <-r.Ready():
return r.res, r.err
default:
}
select {
case <-ctx.Done():
// Explicitly return AcknowledgeStatusOther for context cancelled cases,
// since the default is success.
return AcknowledgeStatusOther, ctx.Err()
case <-r.Ready():
return r.res, r.err
}
}
// NewAckResult creates a AckResult.
func NewAckResult() *AckResult {
return &AckResult{
ready: make(chan struct{}),
}
}
// SetAckResult sets the ack response and error for a ack result and closes
// the Ready channel. Any call after the first for the same AckResult
// is a no-op.
func SetAckResult(r *AckResult, res AcknowledgeStatus, err error) {
select {
case <-r.Ready():
return
default:
r.res = res
r.err = err
close(r.ready)
}
}
// AckWithResult acknowledges a message in Pub/Sub and it will not be
// delivered to this subscription again.
//
// You should avoid acknowledging messages until you have
// *finished* processing them, so that in the event of a failure,
// you receive the message again.
//
// If exactly-once delivery is enabled on the subscription, the
// AckResult returned by this method tracks the state of acknowledgement
// operation. If the operation completes successfully, the message is
// guaranteed NOT to be re-delivered. Otherwise, the result will
// contain an error with more details about the failure and the
// message may be re-delivered.
//
// If exactly-once delivery is NOT enabled on the subscription, or
// if using Pub/Sub Lite, AckResult readies immediately with a AcknowledgeStatus.Success.
// Since acks in Cloud Pub/Sub are best effort when exactly-once
// delivery is disabled, the message may be re-delivered. Because
// re-deliveries are possible, you should ensure that your processing
// code is idempotent, as you may receive any given message more than
// once.
func (m *Message) AckWithResult() *AckResult {
if m.ackh != nil {
return m.ackh.OnAckWithResult()
}
return nil
}
// NackWithResult declines to acknowledge the message which indicates that
// the client will not or cannot process a Message. This will cause the message
// to be re-delivered to subscribers. Re-deliveries may take place immediately
// or after a delay.
//
// If exactly-once delivery is enabled on the subscription, the
// AckResult returned by this method tracks the state of nack
// operation. If the operation completes successfully, the result will
// contain AckResponse.Success. Otherwise, the result will contain an error
// with more details about the failure.
//
// If exactly-once delivery is NOT enabled on the subscription, or
// if using Pub/Sub Lite, AckResult readies immediately with a AcknowledgeStatus.Success.
func (m *Message) NackWithResult() *AckResult {
if m.ackh != nil {
return m.ackh.OnNackWithResult()
}
return nil
}
// NewMessage creates a message with an AckHandler implementation, which should
// not be nil.
func NewMessage(ackh AckHandler) *Message {
return &Message{ackh: ackh}
}
// MessageAckHandler provides access to the internal field Message.ackh.
func MessageAckHandler(m *Message) AckHandler {
return m.ackh
}