Skip to content

Commit

Permalink
[IMPROVED] Added tests for checking StreamConfig and ConsumerConfig w…
Browse files Browse the repository at this point in the history
…ith server (#1409)

Signed-off-by: Piotr Piotrowski <[email protected]>
  • Loading branch information
piotrpio authored Sep 20, 2023
1 parent d1cd52d commit cc6870c
Show file tree
Hide file tree
Showing 2 changed files with 230 additions and 1 deletion.
149 changes: 148 additions & 1 deletion jetstream/test/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1468,12 +1468,12 @@ func TestJetStream_DeleteConsumer(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer nc.Close()

js, err := jetstream.New(nc)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer nc.Close()

s, err := js.CreateStream(context.Background(), jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}})
if err != nil {
Expand Down Expand Up @@ -1650,3 +1650,150 @@ func TestJetStreamTransform(t *testing.T) {
t.Fatalf("the subject of the message doesn't match the expected fromtest.transformed.test: %s", m.Subject())
}
}

func TestStreamConfigMatches(t *testing.T) {
srv := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, srv)
nc, err := nats.Connect(srv.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer nc.Close()

js, err := jetstream.New(nc)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
cfgSource := jetstream.StreamConfig{
Name: "source",
Description: "desc",
Subjects: []string{"foo.*"},
Retention: jetstream.WorkQueuePolicy,
MaxConsumers: 10,
MaxMsgs: 100,
MaxBytes: 1000,
Discard: jetstream.DiscardNew,
DiscardNewPerSubject: true,
MaxAge: 100 * time.Second,
MaxMsgsPerSubject: 1000,
MaxMsgSize: 10000,
Storage: jetstream.MemoryStorage,
Replicas: 1,
NoAck: true,
Duplicates: 10 * time.Second,
Sealed: false,
DenyDelete: true,
DenyPurge: false,
AllowRollup: true,
Compression: jetstream.S2Compression,
FirstSeq: 5,
SubjectTransform: &jetstream.SubjectTransformConfig{Source: ">", Destination: "transformed.>"},
RePublish: &jetstream.RePublish{
Source: ">",
Destination: "RP.>",
HeadersOnly: true,
},
AllowDirect: true,
ConsumerLimits: jetstream.StreamConsumerLimits{
InactiveThreshold: 10 * time.Second,
MaxAckPending: 500,
},
Metadata: map[string]string{
"foo": "bar",
},
}

s, err := js.CreateStream(context.Background(), cfgSource)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !reflect.DeepEqual(s.CachedInfo().Config, cfgSource) {
t.Fatalf("StreamConfig doesn't match")
}

cfg := jetstream.StreamConfig{
Name: "mirror",
MaxConsumers: 10,
MaxMsgs: 100,
MaxBytes: 1000,
MaxAge: 100 * time.Second,
MaxMsgsPerSubject: 1000,
MaxMsgSize: 10000,
Replicas: 1,
Mirror: &jetstream.StreamSource{
Name: "source",
OptStartSeq: 10,
SubjectTransforms: []jetstream.SubjectTransformConfig{
{Source: ">", Destination: "transformed.>"},
},
},
MirrorDirect: true,
SubjectTransform: &jetstream.SubjectTransformConfig{Source: ">", Destination: "transformed.>"},
}

s, err = js.CreateStream(context.Background(), cfg)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !reflect.DeepEqual(s.CachedInfo().Config, cfg) {
t.Fatalf("StreamConfig doesn't match")
}
}

func TestConsumerConfigMatches(t *testing.T) {
srv := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, srv)
nc, err := nats.Connect(srv.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer nc.Close()

js, err := jetstream.New(nc)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
s, err := js.CreateStream(context.Background(), jetstream.StreamConfig{
Name: "FOO",
Subjects: []string{"foo.*"},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
cfg := jetstream.ConsumerConfig{
Name: "cons",
Durable: "cons",
Description: "test",
DeliverPolicy: jetstream.DeliverByStartSequencePolicy,
OptStartSeq: 5,
AckPolicy: jetstream.AckAllPolicy,
AckWait: 1 * time.Second,
MaxDeliver: 5,
BackOff: []time.Duration{1 * time.Second, 2 * time.Second, 3 * time.Second},
ReplayPolicy: jetstream.ReplayOriginalPolicy,
SampleFrequency: "50%",
MaxWaiting: 100,
MaxAckPending: 1000,
HeadersOnly: true,
MaxRequestBatch: 100,
MaxRequestExpires: 10 * time.Second,
MaxRequestMaxBytes: 1000,
InactiveThreshold: 20 * time.Second,
Replicas: 1,
MemoryStorage: true,
FilterSubjects: []string{"foo.1", "foo.2"},
Metadata: map[string]string{
"foo": "bar",
},
}

c, err := s.CreateConsumer(context.Background(), cfg)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !reflect.DeepEqual(c.CachedInfo().Config, cfg) {
fmt.Printf("%#v\n", c.CachedInfo().Config)
fmt.Printf("%#v\n", cfg)
t.Fatalf("ConsumerConfig doesn't match")
}
}
82 changes: 82 additions & 0 deletions test/js_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2714,6 +2714,88 @@ func TestJetStreamManagement(t *testing.T) {
})
}

func TestStreamConfigMatches(t *testing.T) {
srv := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, srv)
nc, js := jsClient(t, srv)
defer nc.Close()

cfgSource := nats.StreamConfig{
Name: "source",
Description: "desc",
Subjects: []string{"foo.*"},
Retention: nats.WorkQueuePolicy,
MaxConsumers: 10,
MaxMsgs: 100,
MaxBytes: 1000,
Discard: nats.DiscardNew,
DiscardNewPerSubject: true,
MaxAge: 100 * time.Second,
MaxMsgsPerSubject: 1000,
MaxMsgSize: 10000,
Storage: nats.MemoryStorage,
Replicas: 1,
NoAck: true,
Duplicates: 10 * time.Second,
Sealed: false,
DenyDelete: true,
DenyPurge: false,
AllowRollup: true,
Compression: nats.S2Compression,
FirstSeq: 5,
SubjectTransform: &nats.SubjectTransformConfig{Source: ">", Destination: "transformed.>"},
RePublish: &nats.RePublish{
Source: ">",
Destination: "RP.>",
HeadersOnly: true,
},
AllowDirect: true,
ConsumerLimits: nats.StreamConsumerLimits{
InactiveThreshold: 10 * time.Second,
MaxAckPending: 500,
},
Metadata: map[string]string{
"foo": "bar",
},
}

s, err := js.AddStream(&cfgSource)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !reflect.DeepEqual(s.Config, cfgSource) {
t.Fatalf("StreamConfig doesn't match")
}

cfg := nats.StreamConfig{
Name: "mirror",
MaxConsumers: 10,
MaxMsgs: 100,
MaxBytes: 1000,
MaxAge: 100 * time.Second,
MaxMsgsPerSubject: 1000,
MaxMsgSize: 10000,
Replicas: 1,
Mirror: &nats.StreamSource{
Name: "source",
OptStartSeq: 10,
SubjectTransforms: []nats.SubjectTransformConfig{
{Source: ">", Destination: "transformed.>"},
},
},
MirrorDirect: true,
SubjectTransform: &nats.SubjectTransformConfig{Source: ">", Destination: "transformed.>"},
}

s, err = js.AddStream(&cfg)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !reflect.DeepEqual(s.Config, cfg) {
t.Fatalf("StreamConfig doesn't match")
}
}

func TestStreamLister(t *testing.T) {
tests := []struct {
name string
Expand Down

0 comments on commit cc6870c

Please sign in to comment.