-
-
Notifications
You must be signed in to change notification settings - Fork 1.4k
/
consumer.go
4317 lines (3873 loc) · 112 KB
/
consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Copyright 2019-2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
"bytes"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"math/rand"
"reflect"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/nats-io/nuid"
"golang.org/x/time/rate"
)
// Headers sent with Request Timeout responses to pull requests,
// reporting how much of the request was left unfulfilled.
const (
	// JSPullRequestPendingMsgs is the header carrying the remaining message count.
	JSPullRequestPendingMsgs = "Nats-Pending-Messages"
	// JSPullRequestPendingBytes is the header carrying the remaining byte count.
	JSPullRequestPendingBytes = "Nats-Pending-Bytes"
)
// ConsumerInfo is a point-in-time snapshot of a consumer's identity,
// configuration and delivery/ack state, as returned by consumer info requests.
type ConsumerInfo struct {
	Stream  string          `json:"stream_name"`
	Name    string          `json:"name"`
	Created time.Time       `json:"created"`
	Config  *ConsumerConfig `json:"config,omitempty"`
	// Delivered is the last delivered sequence pair; AckFloor is the ack floor pair.
	Delivered      SequenceInfo `json:"delivered"`
	AckFloor       SequenceInfo `json:"ack_floor"`
	NumAckPending  int          `json:"num_ack_pending"`
	NumRedelivered int          `json:"num_redelivered"`
	NumWaiting     int          `json:"num_waiting"`
	NumPending     uint64       `json:"num_pending"`
	Cluster        *ClusterInfo `json:"cluster,omitempty"`
	// PushBound indicates a push consumer already has an active bound subscription.
	PushBound bool `json:"push_bound,omitempty"`
}
// ConsumerConfig is the consumer configuration supplied when creating or
// updating a consumer on a stream.
type ConsumerConfig struct {
	// Durable is deprecated. All consumers will have names, picked by clients.
	Durable         string          `json:"durable_name,omitempty"`
	Name            string          `json:"name,omitempty"`
	Description     string          `json:"description,omitempty"`
	DeliverPolicy   DeliverPolicy   `json:"deliver_policy"`
	OptStartSeq     uint64          `json:"opt_start_seq,omitempty"`
	OptStartTime    *time.Time      `json:"opt_start_time,omitempty"`
	AckPolicy       AckPolicy       `json:"ack_policy"`
	AckWait         time.Duration   `json:"ack_wait,omitempty"`
	MaxDeliver      int             `json:"max_deliver,omitempty"`
	// BackOff, when set, overrides AckWait (first entry) for redelivery timing.
	BackOff         []time.Duration `json:"backoff,omitempty"`
	FilterSubject   string          `json:"filter_subject,omitempty"`
	ReplayPolicy    ReplayPolicy    `json:"replay_policy"`
	RateLimit       uint64          `json:"rate_limit_bps,omitempty"` // Bits per sec
	SampleFrequency string          `json:"sample_freq,omitempty"`
	MaxWaiting      int             `json:"max_waiting,omitempty"`
	MaxAckPending   int             `json:"max_ack_pending,omitempty"`
	Heartbeat       time.Duration   `json:"idle_heartbeat,omitempty"`
	FlowControl     bool            `json:"flow_control,omitempty"`
	HeadersOnly     bool            `json:"headers_only,omitempty"`
	// Pull based options.
	MaxRequestBatch    int           `json:"max_batch,omitempty"`
	MaxRequestExpires  time.Duration `json:"max_expires,omitempty"`
	MaxRequestMaxBytes int           `json:"max_bytes,omitempty"`
	// Push based consumers.
	DeliverSubject string `json:"deliver_subject,omitempty"`
	DeliverGroup   string `json:"deliver_group,omitempty"`
	// Ephemeral inactivity threshold.
	InactiveThreshold time.Duration `json:"inactive_threshold,omitempty"`
	// Generally inherited by parent stream and other markers, now can be configured directly.
	Replicas int `json:"num_replicas"`
	// Force memory storage.
	MemoryStorage bool `json:"mem_storage,omitempty"`
	// Don't add to general clients.
	Direct bool `json:"direct,omitempty"`
}
// SequenceInfo has both the consumer and the stream sequence and last activity.
type SequenceInfo struct {
	Consumer uint64 `json:"consumer_seq"`
	Stream   uint64 `json:"stream_seq"`
	// Last is the time of last activity for this sequence pair, if known.
	Last *time.Time `json:"last_active,omitempty"`
}
// CreateConsumerRequest is the wire request used to create a consumer on a stream.
type CreateConsumerRequest struct {
	Stream string         `json:"stream_name"`
	Config ConsumerConfig `json:"config"`
}
// ConsumerNakOptions is for optional NAK values, e.g. delay before redelivery.
type ConsumerNakOptions struct {
	Delay time.Duration `json:"delay"`
}
// DeliverPolicy determines how the consumer should select the first message to deliver.
type DeliverPolicy int

const (
	// DeliverAll will be the default so can be omitted from the request.
	DeliverAll DeliverPolicy = iota
	// DeliverLast will start the consumer with the last sequence received.
	DeliverLast
	// DeliverNew will only deliver new messages that are sent after the consumer is created.
	DeliverNew
	// DeliverByStartSequence will look for a defined starting sequence to start.
	DeliverByStartSequence
	// DeliverByStartTime will select the first message with a timestamp >= to StartTime.
	DeliverByStartTime
	// DeliverLastPerSubject will start the consumer with the last message for all subjects received.
	DeliverLastPerSubject
)

// String returns the lowercase wire representation of the policy,
// or "undefined" for any value outside the known range.
func (dp DeliverPolicy) String() string {
	names := [...]string{
		"all",
		"last",
		"new",
		"by_start_sequence",
		"by_start_time",
		"last_per_subject",
	}
	if dp < 0 || int(dp) >= len(names) {
		return "undefined"
	}
	return names[dp]
}
// AckPolicy determines how the consumer should acknowledge delivered messages.
type AckPolicy int

const (
	// AckNone requires no acks for delivered messages.
	AckNone AckPolicy = iota
	// AckAll when acking a sequence number, this implicitly acks all sequences below this one as well.
	AckAll
	// AckExplicit requires ack or nack for all messages.
	AckExplicit
)

// String returns the lowercase wire representation of the ack policy.
// Any value other than AckNone and AckAll reports "explicit".
func (a AckPolicy) String() string {
	if a == AckNone {
		return "none"
	}
	if a == AckAll {
		return "all"
	}
	return "explicit"
}
// ReplayPolicy determines how the consumer should replay messages it already has queued in the stream.
type ReplayPolicy int

const (
	// ReplayInstant will replay messages as fast as possible.
	ReplayInstant ReplayPolicy = iota
	// ReplayOriginal will maintain the same timing as the messages were received.
	ReplayOriginal
)

// String returns the lowercase wire representation of the replay policy.
// Any value other than ReplayInstant reports "original".
func (r ReplayPolicy) String() string {
	if r == ReplayInstant {
		return "instant"
	}
	return "original"
}
// OK is the generic positive response payload.
const OK = "+OK"

// Ack responses. Note that a nil or no payload is same as AckAck.
var (
	// Ack
	AckAck = []byte("+ACK") // nil or no payload to ack subject also means ACK
	AckOK  = []byte(OK)     // deprecated but +OK meant ack as well.
	// Nack
	AckNak = []byte("-NAK")
	// Progress indicator
	AckProgress = []byte("+WPI")
	// Ack + Deliver the next message(s).
	AckNext = []byte("+NXT")
	// Terminate delivery of the message.
	AckTerm = []byte("+TERM")
)
// replicas resolves the effective replica count for this consumer config.
// An explicitly configured value wins; otherwise the count is inherited from
// the parent stream, except that non-durable consumers on limits-retention
// streams default to a single replica.
func (consCfg ConsumerConfig) replicas(strCfg *StreamConfig) int {
	// Explicitly configured on the consumer itself.
	if consCfg.Replicas != 0 {
		return consCfg.Replicas
	}
	// Ephemerals on limits-based streams only need one replica.
	if !isDurableConsumer(&consCfg) && strCfg.Retention == LimitsPolicy {
		return 1
	}
	// Inherit from the parent stream.
	return strCfg.Replicas
}
// Consumer is a jetstream consumer.
type consumer struct {
	// Atomic used to notify that we want to process an ack.
	// This will be checked in checkPending to abort processing
	// and let ack be processed in priority.
	awl    int64
	mu     sync.RWMutex
	js     *jetStream
	mset   *stream
	acc    *Account
	srv    *Server
	client *client
	sysc   *client
	sid    int
	name   string
	stream string
	// Sequence tracking. sseq/dseq appear to be the next stream/delivery
	// sequences and adflr/asflr the delivered/stream ack floors —
	// NOTE(review): confirm against ack-processing code, not visible here.
	sseq   uint64
	dseq   uint64
	adflr  uint64
	asflr  uint64
	npc    uint64
	npcm   uint64
	dsubj  string
	qgroup string
	lss    *lastSeqSkipList
	rlimit *rate.Limiter
	// Subscriptions and subject templates used for acks and pull requests.
	reqSub      *subscription
	ackSub      *subscription
	ackReplyT   string
	ackSubj     string
	nextMsgSubj string
	maxp        int
	pblimit     int
	maxpb       int
	pbytes      int
	fcsz        int
	fcid        string
	fcSub       *subscription
	outq        *jsOutQ
	// Pending / redelivery bookkeeping.
	pending  map[uint64]*Pending
	ptmr     *time.Timer
	rdq      []uint64
	rdqi     map[uint64]struct{}
	rdc      map[uint64]uint64
	maxdc    uint64
	waiting  *waitQueue
	cfg      ConsumerConfig
	ici      *ConsumerInfo
	store    ConsumerStore
	active   bool
	replay   bool
	filterWC bool
	dtmr     *time.Timer
	gwdtmr   *time.Timer
	dthresh  time.Duration
	mch      chan struct{}
	qch      chan struct{}
	inch     chan bool
	sfreq    int32
	// Advisory event subjects, precomputed at consumer creation.
	ackEventT         string
	nakEventT         string
	deliveryExcEventT string
	created           time.Time
	ldt               time.Time
	lat               time.Time
	closed            bool
	// Clustered.
	ca      *consumerAssignment
	node    RaftNode
	infoSub *subscription
	lqsent  time.Time
	prm     map[string]struct{}
	prOk    bool
	uch     chan struct{}
	retention RetentionPolicy
	// R>1 proposals
	pch   chan struct{}
	phead *proposal
	ptail *proposal
	// Ack queue
	ackMsgs *ipQueue
}
// proposal is a node in the singly-linked list of pending raft proposals
// (see phead/ptail on consumer).
type proposal struct {
	data []byte
	next *proposal
}
// Consumer-related defaults.
const (
	// JsAckWaitDefault is the default AckWait, only applicable on explicit ack policy consumers.
	JsAckWaitDefault = 30 * time.Second
	// JsDeleteWaitTimeDefault is the default amount of time we will wait for non-durable
	// consumers to be in an inactive state before deleting them.
	JsDeleteWaitTimeDefault = 5 * time.Second
	// JsFlowControlMaxPending specifies default pending bytes during flow control that can be
	// outstanding.
	JsFlowControlMaxPending = 32 * 1024 * 1024
	// JsDefaultMaxAckPending is set for consumers with explicit ack that do not set the max ack pending.
	JsDefaultMaxAckPending = 1000
)
// setConsumerConfigDefaults fills in unset fields of config with sane
// defaults, taking server and account limits into account.
func setConsumerConfigDefaults(config *ConsumerConfig, lim *JSLimitOpts, accLim *JetStreamAccountLimits) {
	isPull := config.DeliverSubject == _EMPTY_
	explicitOrAll := config.AckPolicy == AckExplicit || config.AckPolicy == AckAll

	// Pull consumers get a default waiting queue size.
	if isPull && config.MaxWaiting == 0 {
		config.MaxWaiting = JSWaitQueueDefaultMax
	}
	// Default ack wait when acks are required.
	if config.AckWait == 0 && explicitOrAll {
		config.AckWait = JsAckWaitDefault
	}
	// Zero MaxDeliver means unlimited, stored as -1.
	if config.MaxDeliver == 0 {
		config.MaxDeliver = -1
	}
	// A BackOff schedule overrides AckWait with its first interval.
	if len(config.BackOff) > 0 {
		config.AckWait = config.BackOff[0]
	}
	// Default max ack pending for ack-required consumers, capped by the most
	// restrictive of the server and account limits.
	if explicitOrAll && config.MaxAckPending == 0 {
		maxAP := JsDefaultMaxAckPending
		if lim.MaxAckPending > 0 && lim.MaxAckPending < maxAP {
			maxAP = lim.MaxAckPending
		}
		if accLim.MaxAckPending > 0 && accLim.MaxAckPending < maxAP {
			maxAP = accLim.MaxAckPending
		}
		config.MaxAckPending = maxAP
	}
	// Apply the server's max request batch to pull consumers that did not set one.
	if isPull && config.MaxRequestBatch == 0 && lim.MaxRequestBatch > 0 {
		config.MaxRequestBatch = lim.MaxRequestBatch
	}
}
// Check the consumer config. If we are recovering don't check filter subjects.
// checkConsumerCfg validates config against server limits (srvLim), the parent
// stream config (cfg), and account limits (accLim). It returns nil when the
// configuration is acceptable, otherwise a specific ApiError describing the
// first violation found.
func checkConsumerCfg(
	config *ConsumerConfig,
	srvLim *JSLimitOpts,
	cfg *StreamConfig,
	acc *Account,
	accLim *JetStreamAccountLimits,
	isRecovering bool,
) *ApiError {
	// Check if replicas is defined but exceeds parent stream.
	if config.Replicas > 0 && config.Replicas > cfg.Replicas {
		return NewJSConsumerReplicasExceedsStreamError()
	}
	// Check that it is not negative
	if config.Replicas < 0 {
		return NewJSReplicasCountCannotBeNegativeError()
	}
	// Check if we have a BackOff defined that MaxDeliver is within range etc.
	// MaxDeliver must exceed the number of backoff intervals.
	if lbo := len(config.BackOff); lbo > 0 && config.MaxDeliver <= lbo {
		return NewJSConsumerMaxDeliverBackoffError()
	}
	if len(config.Description) > JSMaxDescriptionLen {
		return NewJSConsumerDescriptionTooLongError(JSMaxDescriptionLen)
	}
	// For now expect a literal subject if its not empty. Empty means work queue mode (pull mode).
	if config.DeliverSubject != _EMPTY_ {
		// Push-mode specific checks.
		if !subjectIsLiteral(config.DeliverSubject) {
			return NewJSConsumerDeliverToWildcardsError()
		}
		if !IsValidSubject(config.DeliverSubject) {
			return NewJSConsumerInvalidDeliverSubjectError()
		}
		if deliveryFormsCycle(cfg, config.DeliverSubject) {
			return NewJSConsumerDeliverCycleError()
		}
		if config.MaxWaiting != 0 {
			return NewJSConsumerPushMaxWaitingError()
		}
		if config.MaxAckPending > 0 && config.AckPolicy == AckNone {
			return NewJSConsumerMaxPendingAckPolicyRequiredError()
		}
		if config.Heartbeat > 0 && config.Heartbeat < 100*time.Millisecond {
			return NewJSConsumerSmallHeartbeatError()
		}
	} else {
		// Pull-mode specific checks.
		// Pull mode with work queue retention from the stream requires an explicit ack.
		if config.AckPolicy == AckNone && cfg.Retention == WorkQueuePolicy {
			return NewJSConsumerPullRequiresAckError()
		}
		if config.RateLimit > 0 {
			return NewJSConsumerPullWithRateLimitError()
		}
		if config.MaxWaiting < 0 {
			return NewJSConsumerMaxWaitingNegativeError()
		}
		if config.Heartbeat > 0 {
			return NewJSConsumerHBRequiresPushError()
		}
		if config.FlowControl {
			return NewJSConsumerFCRequiresPushError()
		}
		if config.MaxRequestBatch < 0 {
			return NewJSConsumerMaxRequestBatchNegativeError()
		}
		if config.MaxRequestExpires != 0 && config.MaxRequestExpires < time.Millisecond {
			return NewJSConsumerMaxRequestExpiresToSmallError()
		}
		if srvLim.MaxRequestBatch > 0 && config.MaxRequestBatch > srvLim.MaxRequestBatch {
			return NewJSConsumerMaxRequestBatchExceededError(srvLim.MaxRequestBatch)
		}
	}
	// Server and account level caps on max ack pending.
	if srvLim.MaxAckPending > 0 && config.MaxAckPending > srvLim.MaxAckPending {
		return NewJSConsumerMaxPendingAckExcessError(srvLim.MaxAckPending)
	}
	if accLim.MaxAckPending > 0 && config.MaxAckPending > accLim.MaxAckPending {
		return NewJSConsumerMaxPendingAckExcessError(accLim.MaxAckPending)
	}
	// Direct need to be non-mapped ephemerals.
	if config.Direct {
		if config.DeliverSubject == _EMPTY_ {
			return NewJSConsumerDirectRequiresPushError()
		}
		if isDurableConsumer(config) {
			return NewJSConsumerDirectRequiresEphemeralError()
		}
	}
	// As best we can make sure the filtered subject is valid.
	if config.FilterSubject != _EMPTY_ {
		subjects := copyStrings(cfg.Subjects)
		// explicitly skip validFilteredSubject when recovering
		hasExt := isRecovering
		if !isRecovering {
			subjects, hasExt = gatherSourceMirrorSubjects(subjects, cfg, acc)
		}
		if !hasExt && !validFilteredSubject(config.FilterSubject, subjects) {
			return NewJSConsumerFilterNotSubsetError()
		}
	}
	// Helper function to formulate similar errors.
	badStart := func(dp, start string) error {
		return fmt.Errorf("consumer delivery policy is deliver %s, but optional start %s is also set", dp, start)
	}
	notSet := func(dp, notSet string) error {
		return fmt.Errorf("consumer delivery policy is deliver %s, but optional %s is not set", dp, notSet)
	}
	// Check on start position conflicts.
	switch config.DeliverPolicy {
	case DeliverAll:
		if config.OptStartSeq > 0 {
			return NewJSConsumerInvalidPolicyError(badStart("all", "sequence"))
		}
		if config.OptStartTime != nil {
			return NewJSConsumerInvalidPolicyError(badStart("all", "time"))
		}
	case DeliverLast:
		if config.OptStartSeq > 0 {
			return NewJSConsumerInvalidPolicyError(badStart("last", "sequence"))
		}
		if config.OptStartTime != nil {
			return NewJSConsumerInvalidPolicyError(badStart("last", "time"))
		}
	case DeliverLastPerSubject:
		if config.OptStartSeq > 0 {
			return NewJSConsumerInvalidPolicyError(badStart("last per subject", "sequence"))
		}
		if config.OptStartTime != nil {
			return NewJSConsumerInvalidPolicyError(badStart("last per subject", "time"))
		}
		if config.FilterSubject == _EMPTY_ {
			return NewJSConsumerInvalidPolicyError(notSet("last per subject", "filter subject"))
		}
	case DeliverNew:
		if config.OptStartSeq > 0 {
			return NewJSConsumerInvalidPolicyError(badStart("new", "sequence"))
		}
		if config.OptStartTime != nil {
			return NewJSConsumerInvalidPolicyError(badStart("new", "time"))
		}
	case DeliverByStartSequence:
		if config.OptStartSeq == 0 {
			return NewJSConsumerInvalidPolicyError(notSet("by start sequence", "start sequence"))
		}
		if config.OptStartTime != nil {
			return NewJSConsumerInvalidPolicyError(badStart("by start sequence", "time"))
		}
	case DeliverByStartTime:
		if config.OptStartTime == nil {
			return NewJSConsumerInvalidPolicyError(notSet("by start time", "start time"))
		}
		if config.OptStartSeq != 0 {
			return NewJSConsumerInvalidPolicyError(badStart("by start time", "start sequence"))
		}
	}
	// Sampling frequency must be a non-negative percentage, e.g. "30" or "30%".
	if config.SampleFrequency != _EMPTY_ {
		s := strings.TrimSuffix(config.SampleFrequency, "%")
		if sampleFreq, err := strconv.Atoi(s); err != nil || sampleFreq < 0 {
			return NewJSConsumerInvalidSamplingError(err)
		}
	}
	// We reject if flow control is set without heartbeats.
	if config.FlowControl && config.Heartbeat == 0 {
		return NewJSConsumerWithFlowControlNeedsHeartbeatsError()
	}
	// If both Durable and Name are supplied they must agree.
	if config.Durable != _EMPTY_ && config.Name != _EMPTY_ {
		if config.Name != config.Durable {
			return NewJSConsumerCreateDurableAndNameMismatchError()
		}
	}
	return nil
}
// addConsumer creates a consumer on this stream with no external name,
// no cluster assignment and not in recovery mode.
func (mset *stream) addConsumer(config *ConsumerConfig) (*consumer, error) {
	return mset.addConsumerWithAssignment(config, _EMPTY_, nil, false)
}
// addConsumerWithAssignment creates a consumer on this stream, or attaches to /
// updates an existing one with the same name. oname is an optional externally
// assigned name (clustered mode), ca is the cluster consumer assignment (nil
// when standalone), and isRecovering relaxes some checks during state replay.
func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname string, ca *consumerAssignment, isRecovering bool) (*consumer, error) {
	mset.mu.RLock()
	s, jsa, tierName, cfg, acc := mset.srv, mset.jsa, mset.tier, mset.cfg, mset.acc
	retention := cfg.Retention
	mset.mu.RUnlock()
	// If we do not have the consumer currently assigned to us in cluster mode we will proceed but warn.
	// This can happen on startup with restored state where on meta replay we still do not have
	// the assignment. Running in single server mode this always returns true.
	if oname != _EMPTY_ && !jsa.consumerAssigned(mset.name(), oname) {
		s.Debugf("Consumer %q > %q does not seem to be assigned to this server", mset.name(), oname)
	}
	if config == nil {
		return nil, NewJSConsumerConfigRequiredError()
	}
	jsa.usageMu.RLock()
	selectedLimits, limitsFound := jsa.limits[tierName]
	jsa.usageMu.RUnlock()
	if !limitsFound {
		return nil, NewJSNoLimitsError()
	}
	srvLim := &s.getOpts().JetStreamLimits
	// Make sure we have sane defaults.
	setConsumerConfigDefaults(config, srvLim, &selectedLimits)
	if err := checkConsumerCfg(config, srvLim, &cfg, acc, &selectedLimits, isRecovering); err != nil {
		return nil, err
	}
	sampleFreq := 0
	if config.SampleFrequency != _EMPTY_ {
		// Can't fail as checkConsumerCfg checks correct format
		sampleFreq, _ = strconv.Atoi(strings.TrimSuffix(config.SampleFrequency, "%"))
	}
	// Grab the client, account and server reference.
	c := mset.client
	if c == nil {
		return nil, NewJSStreamInvalidError()
	}
	var accName string
	c.mu.Lock()
	s, a := c.srv, c.acc
	if a != nil {
		accName = a.Name
	}
	c.mu.Unlock()
	// Hold mset lock here.
	mset.mu.Lock()
	if mset.client == nil || mset.store == nil || mset.consumers == nil {
		mset.mu.Unlock()
		return nil, errors.New("invalid stream")
	}
	// If this one is durable and already exists, we let that be ok as long as only updating what should be allowed.
	var cName string
	if isDurableConsumer(config) {
		cName = config.Durable
	} else if config.Name != _EMPTY_ {
		cName = config.Name
	}
	if cName != _EMPTY_ {
		if eo, ok := mset.consumers[cName]; ok {
			mset.mu.Unlock()
			err := eo.updateConfig(config)
			if err == nil {
				return eo, nil
			}
			return nil, NewJSConsumerCreateError(err, Unless(err))
		}
	}
	// Check for any limits, if the config for the consumer sets a limit we check against that
	// but if not we use the value from account limits, if account limits is more restrictive
	// than stream config we prefer the account limits to handle cases where account limits are
	// updated during the lifecycle of the stream
	maxc := mset.cfg.MaxConsumers
	if maxc <= 0 || (selectedLimits.MaxConsumers > 0 && selectedLimits.MaxConsumers < maxc) {
		maxc = selectedLimits.MaxConsumers
	}
	if maxc > 0 && mset.numPublicConsumers() >= maxc {
		mset.mu.Unlock()
		return nil, NewJSMaximumConsumersLimitError()
	}
	// Check on stream type conflicts with WorkQueues.
	if mset.cfg.Retention == WorkQueuePolicy && !config.Direct {
		// Force explicit acks here.
		if config.AckPolicy != AckExplicit {
			mset.mu.Unlock()
			return nil, NewJSConsumerWQRequiresExplicitAckError()
		}
		if len(mset.consumers) > 0 {
			if config.FilterSubject == _EMPTY_ {
				mset.mu.Unlock()
				return nil, NewJSConsumerWQMultipleUnfilteredError()
			} else if !mset.partitionUnique(config.FilterSubject) {
				// Prior to v2.9.7, on a stream with WorkQueue policy, the servers
				// were not catching the error of having multiple consumers with
				// overlapping filter subjects depending on the scope, for instance
				// creating "foo.*.bar" and then "foo.>" was not detected, while
				// "foo.>" and then "foo.*.bar" would have been. Failing here
				// in recovery mode would leave the rejected consumer in a bad state,
				// so we will simply warn here, asking the user to remove this
				// consumer administratively. Otherwise, if this is the creation
				// of a new consumer, we will return the error.
				if isRecovering {
					s.Warnf("Consumer %q > %q has a filter subject that overlaps "+
						"with other consumers, which is not allowed for a stream "+
						"with WorkQueue policy, it should be administratively deleted",
						cfg.Name, cName)
				} else {
					// We have a partition but it is not unique amongst the others.
					mset.mu.Unlock()
					return nil, NewJSConsumerWQConsumerNotUniqueError()
				}
			}
		}
		if config.DeliverPolicy != DeliverAll {
			mset.mu.Unlock()
			return nil, NewJSConsumerWQConsumerNotDeliverAllError()
		}
	}
	// Set name, which will be durable name if set, otherwise we create one at random.
	o := &consumer{
		mset:      mset,
		js:        s.getJetStream(),
		acc:       a,
		srv:       s,
		client:    s.createInternalJetStreamClient(),
		sysc:      s.createInternalJetStreamClient(),
		cfg:       *config,
		dsubj:     config.DeliverSubject,
		outq:      mset.outq,
		active:    true,
		qch:       make(chan struct{}),
		uch:       make(chan struct{}, 1),
		mch:       make(chan struct{}, 1),
		sfreq:     int32(sampleFreq),
		maxdc:     uint64(config.MaxDeliver),
		maxp:      config.MaxAckPending,
		retention: retention,
		created:   time.Now().UTC(),
	}
	// Bind internal client to the user account.
	o.client.registerWithAccount(a)
	// Bind to the system account.
	o.sysc.registerWithAccount(s.SystemAccount())
	if isDurableConsumer(config) {
		if len(config.Durable) > JSMaxNameLen {
			mset.mu.Unlock()
			o.deleteWithoutAdvisory()
			return nil, NewJSConsumerNameTooLongError(JSMaxNameLen)
		}
		o.name = config.Durable
	} else if oname != _EMPTY_ {
		o.name = oname
	} else {
		if config.Name != _EMPTY_ {
			o.name = config.Name
		} else {
			// Legacy ephemeral auto-generated.
			for {
				o.name = createConsumerName()
				if _, ok := mset.consumers[o.name]; !ok {
					break
				}
			}
			config.Name = o.name
		}
	}
	// Create ackMsgs queue now that we have a consumer name
	o.ackMsgs = s.newIPQueue(fmt.Sprintf("[ACC:%s] consumer '%s' on stream '%s' ackMsgs", accName, o.name, mset.cfg.Name))
	// Create our request waiting queue.
	if o.isPullMode() {
		o.waiting = newWaitQueue(config.MaxWaiting)
	}
	// Check if we have filtered subject that is a wildcard.
	if config.FilterSubject != _EMPTY_ && subjectHasWildcard(config.FilterSubject) {
		o.filterWC = true
	}
	// already under lock, mset.Name() would deadlock
	o.stream = mset.cfg.Name
	o.ackEventT = JSMetricConsumerAckPre + "." + o.stream + "." + o.name
	o.nakEventT = JSAdvisoryConsumerMsgNakPre + "." + o.stream + "." + o.name
	o.deliveryExcEventT = JSAdvisoryConsumerMaxDeliveryExceedPre + "." + o.stream + "." + o.name
	if !isValidName(o.name) {
		mset.mu.Unlock()
		o.deleteWithoutAdvisory()
		return nil, NewJSConsumerBadDurableNameError()
	}
	// Setup our storage if not a direct consumer.
	if !config.Direct {
		store, err := mset.store.ConsumerStore(o.name, config)
		if err != nil {
			mset.mu.Unlock()
			o.deleteWithoutAdvisory()
			return nil, NewJSConsumerStoreFailedError(err)
		}
		o.store = store
	}
	if o.store != nil && o.store.HasState() {
		// Restore our saved state.
		o.mu.Lock()
		o.readStoredState(0)
		o.mu.Unlock()
	} else {
		// Select starting sequence number
		o.selectStartingSeqNo()
	}
	// Now register with mset and create the ack subscription.
	// Check if we already have this one registered.
	if eo, ok := mset.consumers[o.name]; ok {
		mset.mu.Unlock()
		if !o.isDurable() || !o.isPushMode() {
			o.name = _EMPTY_ // Prevent removal since same name.
			o.deleteWithoutAdvisory()
			return nil, NewJSConsumerNameExistError()
		}
		// If we are here we have already registered this durable. If it is still active that is an error.
		if eo.isActive() {
			o.name = _EMPTY_ // Prevent removal since same name.
			o.deleteWithoutAdvisory()
			return nil, NewJSConsumerExistingActiveError()
		}
		// Since we are here this means we have a potentially new durable so we should update here.
		// Check that configs are the same.
		if !configsEqualSansDelivery(o.cfg, eo.cfg) {
			o.name = _EMPTY_ // Prevent removal since same name.
			o.deleteWithoutAdvisory()
			return nil, NewJSConsumerReplacementWithDifferentNameError()
		}
		// Once we are here we have a replacement push-based durable.
		eo.updateDeliverSubject(o.cfg.DeliverSubject)
		return eo, nil
	}
	// Set up the ack subscription for this consumer. Will use wildcard for all acks.
	// We will remember the template to generate replies with sequence numbers and use
	// that to scanf them back in.
	mn := mset.cfg.Name
	pre := fmt.Sprintf(jsAckT, mn, o.name)
	o.ackReplyT = fmt.Sprintf("%s.%%d.%%d.%%d.%%d.%%d", pre)
	o.ackSubj = fmt.Sprintf("%s.*.*.*.*.*", pre)
	o.nextMsgSubj = fmt.Sprintf(JSApiRequestNextT, mn, o.name)
	// Check/update the inactive threshold
	o.updateInactiveThreshold(&o.cfg)
	if o.isPushMode() {
		if !o.isDurable() {
			// Check if we are not durable that the delivery subject has interest.
			// Check in place here for interest. Will setup properly in setLeader.
			r := o.acc.sl.Match(o.cfg.DeliverSubject)
			if !o.hasDeliveryInterest(len(r.psubs)+len(r.qsubs) > 0) {
				// Let the interest come to us eventually, but setup delete timer.
				o.updateDeliveryInterest(false)
			}
		}
	}
	// Set our ca.
	if ca != nil {
		o.setConsumerAssignment(ca)
	}
	// Check if we have a rate limit set.
	if config.RateLimit != 0 {
		o.setRateLimit(config.RateLimit)
	}
	mset.setConsumer(o)
	mset.mu.Unlock()
	// Become leader immediately when direct or running standalone.
	if config.Direct || (!s.JetStreamIsClustered() && s.standAloneMode()) {
		o.setLeader(true)
	}
	// This is always true in single server mode.
	o.mu.RLock()
	isLdr := o.isLeader()
	o.mu.RUnlock()
	if isLdr {
		// Send advisory.
		var suppress bool
		if !s.standAloneMode() && ca == nil {
			suppress = true
		} else if ca != nil {
			suppress = ca.responded
		}
		if !suppress {
			o.sendCreateAdvisory()
		}
	}
	return o, nil
}
// Updates the consumer `dthresh` delete timer duration and set
// cfg.InactiveThreshold to JsDeleteWaitTimeDefault for ephemerals
// if not explicitly already specified by the user.
// Lock should be held.
func (o *consumer) updateInactiveThreshold(cfg *ConsumerConfig) {
	// Ephemerals will always have inactive thresholds.
	if !o.isDurable() && cfg.InactiveThreshold <= 0 {
		// Add in 1 sec of jitter above and beyond the default of 5s.
		// Jitter is 100ms..1000ms so clustered replicas don't all fire together.
		o.dthresh = JsDeleteWaitTimeDefault + 100*time.Millisecond + time.Duration(rand.Int63n(900))*time.Millisecond
		// Only stamp config with default sans jitter.
		cfg.InactiveThreshold = JsDeleteWaitTimeDefault
	} else if cfg.InactiveThreshold > 0 {
		// Add in up to 1 sec of jitter if pull mode.
		if o.isPullMode() {
			o.dthresh = cfg.InactiveThreshold + 100*time.Millisecond + time.Duration(rand.Int63n(900))*time.Millisecond
		} else {
			// Push mode uses the configured threshold without jitter.
			o.dthresh = cfg.InactiveThreshold
		}
	} else if cfg.InactiveThreshold <= 0 {
		// We accept InactiveThreshold be set to 0 (for durables)
		o.dthresh = 0
	}
}
// consumerAssignment returns the current cluster assignment under the read lock.
func (o *consumer) consumerAssignment() *consumerAssignment {
	o.mu.RLock()
	ca := o.ca
	o.mu.RUnlock()
	return ca
}
// setConsumerAssignment records the cluster assignment for this consumer.
// For a non-nil assignment it also adopts the group's raft node and performs
// a non-blocking notify on the update channel.
func (o *consumer) setConsumerAssignment(ca *consumerAssignment) {
	o.mu.Lock()
	defer o.mu.Unlock()
	o.ca = ca
	if ca != nil {
		// Adopt the group's node.
		o.node = ca.Group.node
		// Signal watchers without blocking if nobody is listening.
		select {
		case o.uch <- struct{}{}:
		default:
		}
	}
}
// updateC returns the channel signaled when the consumer assignment changes.
func (o *consumer) updateC() <-chan struct{} {
	o.mu.RLock()
	uch := o.uch
	o.mu.RUnlock()
	return uch
}
// checkQueueInterest will check on our interest's queue group status.
// If a queue subscription matches the delivery subject, its queue name is
// recorded in o.qgroup. Lock should be held.
func (o *consumer) checkQueueInterest() {
	if !o.active || o.cfg.DeliverSubject == _EMPTY_ {
		return
	}
	// Prefer the active delivery subject, falling back to the configured one.
	subj := o.dsubj
	if subj == _EMPTY_ {
		subj = o.cfg.DeliverSubject
	}
	rr := o.acc.sl.Match(subj)
	if len(rr.qsubs) == 0 {
		return
	}
	// Just grab the first queue group's first member.
	qsubs := rr.qsubs[0]
	if len(qsubs) == 0 {
		return
	}
	if sub := qsubs[0]; len(sub.queue) > 0 {
		o.qgroup = string(sub.queue)
	}
}
// clearNode deletes and clears our raft node if we have one,
// e.g. when we scale down to 1 replica.
func (o *consumer) clearNode() {
	o.mu.Lock()
	defer o.mu.Unlock()
	if n := o.node; n != nil {
		n.Delete()
		o.node = nil
	}
}
// isLeader reports whether we are the leader for this consumer.
// With no raft node (single server / R1) this is always true.
// Lock should be held.
func (o *consumer) isLeader() bool {
	if o.node == nil {
		return true
	}
	return o.node.Leader()
}
func (o *consumer) setLeader(isLeader bool) {
o.mu.RLock()
mset := o.mset
isRunning := o.ackSub != nil
o.mu.RUnlock()
// If we are here we have a change in leader status.
if isLeader {
if mset == nil || isRunning {
return
}
mset.mu.RLock()
s, jsa, stream, lseq := mset.srv, mset.jsa, mset.cfg.Name, mset.lseq
mset.mu.RUnlock()
o.mu.Lock()
o.rdq, o.rdqi = nil, nil
// Restore our saved state. During non-leader status we just update our underlying store.
o.readStoredState(lseq)
// Setup initial num pending.
o.streamNumPending()
// Cleanup lss when we take over in clustered mode.
if o.hasSkipListPending() && o.sseq >= o.lss.resume {
o.lss = nil
}
// Update the group on the our starting sequence if we are starting but we skipped some in the stream.
if o.dseq == 1 && o.sseq > 1 {
o.updateSkipped()
}
// Do info sub.
if o.infoSub == nil && jsa != nil {
isubj := fmt.Sprintf(clusterConsumerInfoT, jsa.acc(), stream, o.name)
// Note below the way we subscribe here is so that we can send requests to ourselves.
o.infoSub, _ = s.systemSubscribe(isubj, _EMPTY_, false, o.sysc, o.handleClusterConsumerInfoRequest)
}
var err error
if o.ackSub, err = o.subscribeInternal(o.ackSubj, o.pushAck); err != nil {