diff --git a/example_test.go b/example_test.go index d54c8e2a4..11ebe8f18 100644 --- a/example_test.go +++ b/example_test.go @@ -675,6 +675,11 @@ func ExampleSubOpt() { js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, nats.Durable("FOO"), nats.SkipConsumerLookup()) + + // Use multiple subject filters. + js.Subscribe("", func(msg *nats.Msg) { + fmt.Printf("Received a message: %s\n", string(msg.Data)) + }, nats.Durable("FOO"), nats.ConsumerFilterSubjects("foo", "bar"), nats.BindStream("test_stream")) } func ExampleMaxWait() { diff --git a/go_test.mod b/go_test.mod index 767a2feee..8902c1edd 100644 --- a/go_test.mod +++ b/go_test.mod @@ -4,19 +4,19 @@ go 1.19 require ( github.com/golang/protobuf v1.4.2 - github.com/klauspost/compress v1.16.7 - github.com/nats-io/nats-server/v2 v2.9.22 - github.com/nats-io/nkeys v0.4.4 + github.com/klauspost/compress v1.17.0 + github.com/nats-io/nats-server/v2 v2.10.0 + github.com/nats-io/nkeys v0.4.5 github.com/nats-io/nuid v1.0.1 go.uber.org/goleak v1.2.1 - golang.org/x/text v0.12.0 + golang.org/x/text v0.13.0 google.golang.org/protobuf v1.23.0 ) require ( github.com/minio/highwayhash v1.0.2 // indirect - github.com/nats-io/jwt/v2 v2.5.0 // indirect - golang.org/x/crypto v0.12.0 // indirect - golang.org/x/sys v0.11.0 // indirect + github.com/nats-io/jwt/v2 v2.5.2 // indirect + golang.org/x/crypto v0.13.0 // indirect + golang.org/x/sys v0.12.0 // indirect golang.org/x/time v0.3.0 // indirect ) diff --git a/go_test.sum b/go_test.sum index 5739e72be..ce4ba9205 100644 --- a/go_test.sum +++ b/go_test.sum @@ -10,29 +10,30 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= -github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM= +github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= -github.com/nats-io/jwt/v2 v2.5.0 h1:WQQ40AAlqqfx+f6ku+i0pOVm+ASirD4fUh+oQsiE9Ak= -github.com/nats-io/jwt/v2 v2.5.0/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI= -github.com/nats-io/nats-server/v2 v2.9.22 h1:rzl88pqWFFrU4G00ed+JnY+uGHSLZ+3jrxDnJxzKwGA= -github.com/nats-io/nats-server/v2 v2.9.22/go.mod h1:wEjrEy9vnqIGE4Pqz4/c75v9Pmaq7My2IgFmnykc4C0= -github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA= -github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64= +github.com/nats-io/jwt/v2 v2.5.2 h1:DhGH+nKt+wIkDxM6qnVSKjokq5t59AZV5HRcFW0zJwU= +github.com/nats-io/jwt/v2 v2.5.2/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI= +github.com/nats-io/nats-server/v2 v2.10.0 h1:rcU++Hzo+wARxtJugrV3J5z5iGdHeVG8tT8Chb3bKDg= +github.com/nats-io/nats-server/v2 v2.10.0/go.mod h1:3PMvMSu2cuK0J9YInRLWdFpFsswKKGUS77zVSAudRto= +github.com/nats-io/nkeys v0.4.5 h1:Zdz2BUlFm4fJlierwvGK+yl20IAKUm7eV6AAZXEhkPk= +github.com/nats-io/nkeys v0.4.5/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= -golang.org/x/crypto v0.12.0 h1:tFM/ta59kqch6LlvYnPa0yx5a83cL2nHflFhYKvv9Yk= -golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw= +golang.org/x/crypto v0.13.0 h1:mvySKfSWJ+UKUii46M40LOvyWfN0s2U+46/jDd0e6Ck= +golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc= +golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g= golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM= -golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc= -golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o= +golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= diff --git a/jetstream/README.md b/jetstream/README.md index bd1be570c..40b5962e9 100644 --- a/jetstream/README.md +++ b/jetstream/README.md @@ -253,16 +253,33 @@ CRUD operations on consumers can be achieved on 2 levels: js, _ := jetstream.New(nc) // create a consumer (this is an idempotent operation) -cons, _ := js.CreateOrUpdateConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{ +// an error will be returned if consumer already exists and has different configuration. +cons, _ := js.CreateConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{ Durable: "foo", AckPolicy: jetstream.AckExplicitPolicy, }) // create an ephemeral pull consumer by not providing `Durable` -ephemeral, _ := js.CreateOrUpdateConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{ +ephemeral, _ := js.CreateConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{ AckPolicy: jetstream.AckExplicitPolicy, }) + +// consumer can also be created using CreateOrUpdateConsumer +// this method will either create a consumer if it does not exist +// or update existing consumer (if possible) +cons2 := js.CreateOrUpdateConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{ + Name: "bar", +}) + +// consumers can be updated +// an error will be returned if consumer with given name does not exists +// or an illegal property is to be updated (e.g. AckPolicy) +updated, _ := js.UpdateConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{ + AckPolicy: jetstream.AckExplicitPolicy, + Description: "updated consumer" +}) + // get consumer handle cons, _ = js.Consumer(ctx, "ORDERS", "foo") @@ -280,7 +297,7 @@ js, _ := jetstream.New(nc) stream, _ := js.Stream(ctx, "ORDERS") // create consumer -cons, _ := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{ +cons, _ := stream.CreateConsumer(ctx, jetstream.ConsumerConfig{ Durable: "foo", AckPolicy: jetstream.AckExplicitPolicy, }) diff --git a/jetstream/consumer.go b/jetstream/consumer.go index 05e0d6c28..1fb6fb5c6 100644 --- a/jetstream/consumer.go +++ b/jetstream/consumer.go @@ -51,6 +51,12 @@ type ( // CachedInfo returns [*ConsumerInfo] cached on a consumer struct CachedInfo() *ConsumerInfo } + + createConsumerRequest struct { + Stream string `json:"stream_name"` + Config *ConsumerConfig `json:"config"` + Action string `json:"action"` + } ) // Info returns [ConsumerInfo] for a given consumer @@ -84,7 +90,7 @@ func (p *pullConsumer) CachedInfo() *ConsumerInfo { return p.info } -func upsertConsumer(ctx context.Context, js *jetStream, stream string, cfg ConsumerConfig) (Consumer, error) { +func upsertConsumer(ctx context.Context, js *jetStream, stream string, cfg ConsumerConfig, action string) (Consumer, error) { ctx, cancel := wrapContextWithoutDeadline(ctx) if cancel != nil { defer cancel() @@ -92,6 +98,7 @@ func upsertConsumer(ctx context.Context, js *jetStream, stream string, cfg Consu req := createConsumerRequest{ Stream: stream, Config: &cfg, + Action: action, } reqJSON, err := json.Marshal(req) if err != nil { @@ -111,7 +118,7 @@ func upsertConsumer(ctx context.Context, js *jetStream, stream string, cfg Consu } var ccSubj string - if cfg.FilterSubject != "" { + if cfg.FilterSubject != "" && len(cfg.FilterSubjects) == 0 { ccSubj = apiSubj(js.apiPrefix, fmt.Sprintf(apiConsumerCreateWithFilterSubjectT, stream, consumerName, cfg.FilterSubject)) } else { ccSubj = apiSubj(js.apiPrefix, fmt.Sprintf(apiConsumerCreateT, stream, consumerName)) @@ -128,6 +135,11 @@ func upsertConsumer(ctx context.Context, js *jetStream, stream string, cfg Consu return nil, resp.Error } + // check whether multiple filter subjects (if used) are reflected in the returned ConsumerInfo + if len(cfg.FilterSubjects) != 0 && len(resp.Config.FilterSubjects) == 0 { + return nil, ErrConsumerMultipleFilterSubjectsNotSupported + } + return &pullConsumer{ jetStream: js, stream: stream, @@ -138,6 +150,12 @@ func upsertConsumer(ctx context.Context, js *jetStream, stream string, cfg Consu }, nil } +const ( + consumerActionCreate = "create" + consumerActionUpdate = "update" + consumerActionCreateOrUpdate = "" +) + func generateConsName() string { name := nuid.Next() sha := sha256.New() diff --git a/jetstream/consumer_config.go b/jetstream/consumer_config.go index 41764baa2..6729b47fd 100644 --- a/jetstream/consumer_config.go +++ b/jetstream/consumer_config.go @@ -69,6 +69,11 @@ type ( // NOTE: FilterSubjects requires nats-server v2.10.0+ FilterSubjects []string `json:"filter_subjects,omitempty"` + + // Metadata is additional metadata for the Consumer. + // Keys starting with `_nats` are reserved. + // NOTE: Metadata requires nats-server v2.10.0+ + Metadata map[string]string `json:"metadata,omitempty"` } OrderedConsumerConfig struct { diff --git a/jetstream/errors.go b/jetstream/errors.go index ffefea540..e8d1a774d 100644 --- a/jetstream/errors.go +++ b/jetstream/errors.go @@ -49,10 +49,15 @@ const ( JSErrCodeStreamNotFound ErrorCode = 10059 JSErrCodeStreamNameInUse ErrorCode = 10058 - JSErrCodeConsumerCreate ErrorCode = 10012 - JSErrCodeConsumerNotFound ErrorCode = 10014 - JSErrCodeConsumerNameExists ErrorCode = 10013 - JSErrCodeConsumerAlreadyExists ErrorCode = 10105 + JSErrCodeConsumerCreate ErrorCode = 10012 + JSErrCodeConsumerNotFound ErrorCode = 10014 + JSErrCodeConsumerNameExists ErrorCode = 10013 + JSErrCodeConsumerAlreadyExists ErrorCode = 10105 + JSErrCodeConsumerExists ErrorCode = 10148 + JSErrCodeDuplicateFilterSubjects ErrorCode = 10136 + JSErrCodeOverlappingFilterSubjects ErrorCode = 10138 + JSErrCodeConsumerEmptyFilter ErrorCode = 10139 + JSErrCodeConsumerDoesNotExist ErrorCode = 10149 JSErrCodeMessageNotFound ErrorCode = 10037 @@ -76,9 +81,35 @@ var ( // ErrStreamNameAlreadyInUse is returned when a stream with given name already exists and has a different configuration. ErrStreamNameAlreadyInUse JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeStreamNameInUse, Description: "stream name already in use", Code: 400}} + // ErrStreamSubjectTransformNotSupported is returned when the connected nats-server version does not support setting + // the stream subject transform. If this error is returned when executing CreateStream(), the stream with invalid + // configuration was already created in the server. + ErrStreamSubjectTransformNotSupported JetStreamError = &jsError{message: "stream subject transformation not supported by nats-server"} + + // ErrStreamSourceSubjectTransformNotSupported is returned when the connected nats-server version does not support setting + // the stream source subject transform. If this error is returned when executing CreateStream(), the stream with invalid + // configuration was already created in the server. + ErrStreamSourceSubjectTransformNotSupported JetStreamError = &jsError{message: "stream subject transformation not supported by nats-server"} + + // ErrStreamSourceNotSupported is returned when the connected nats-server version does not support setting + // the stream sources. If this error is returned when executing CreateStream(), the stream with invalid + // configuration was already created in the server. + ErrStreamSourceNotSupported JetStreamError = &jsError{message: "stream sourcing is not supported by nats-server"} + + // ErrStreamSourceMultipleFilterSubjectsNotSupported is returned when the connected nats-server version does not support setting + // the stream sources. If this error is returned when executing CreateStream(), the stream with invalid + // configuration was already created in the server. + ErrStreamSourceMultipleFilterSubjectsNotSupported JetStreamError = &jsError{message: "stream sourcing with multiple subject filters not supported by nats-server"} + // ErrConsumerNotFound is an error returned when consumer with given name does not exist. ErrConsumerNotFound JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerNotFound, Description: "consumer not found", Code: 404}} + // ErrConsumerExists is returned when attempting to create a consumer with CreateConsumer but a consumer with given name already exists. + ErrConsumerExists JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerExists, Description: "consumer already exists", Code: 400}} + + // ErrConsumerNameExists is returned when attempting to update a consumer with UpdateConsumer but a consumer with given name does not exist. + ErrConsumerDoesNotExist JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerDoesNotExist, Description: "consumer does not exist", Code: 400}} + // ErrMsgNotFound is returned when message with provided sequence number does not exist. ErrMsgNotFound JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeMessageNotFound, Description: "message not found", Code: 404}} @@ -88,8 +119,22 @@ var ( // ErrConsumerCreate is returned when nats-server reports error when creating consumer (e.g. illegal update). ErrConsumerCreate JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerCreate, Description: "could not create consumer", Code: 500}} + // ErrDuplicateFilterSubjects is returned when both FilterSubject and FilterSubjects are specified when creating consumer. + ErrDuplicateFilterSubjects JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeDuplicateFilterSubjects, Description: "consumer cannot have both FilterSubject and FilterSubjects specified", Code: 500}} + + // ErrDuplicateFilterSubjects is returned when filter subjects overlap when creating consumer. + ErrOverlappingFilterSubjects JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeOverlappingFilterSubjects, Description: "consumer subject filters cannot overlap", Code: 500}} + + // ErrEmptyFilter is returned when a filter in FilterSubjects is empty. + ErrEmptyFilter JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerEmptyFilter, Description: "consumer filter in FilterSubjects cannot be empty", Code: 500}} + // Client errors + // ErrConsumerMultipleFilterSubjectsNotSupported is returned when the connected nats-server version does not support setting + // multiple filter subjects with filter_subjects field. If this error is returned when executing AddConsumer(), the consumer with invalid + // configuration was already created in the server. + ErrConsumerMultipleFilterSubjectsNotSupported JetStreamError = &jsError{message: "multiple consumer filter subjects not supported by nats-server"} + // ErrConsumerNotFound is an error returned when consumer with given name does not exist. ErrConsumerNameAlreadyInUse JetStreamError = &jsError{message: "consumer name already in use"} diff --git a/jetstream/jetstream.go b/jetstream/jetstream.go index 1ff86c336..eadd3e69c 100644 --- a/jetstream/jetstream.go +++ b/jetstream/jetstream.go @@ -86,6 +86,14 @@ type ( // If consumer already exists, it will be updated (if possible). // Consumer interface is returned, serving as a hook to operate on a consumer (e.g. fetch messages) CreateOrUpdateConsumer(ctx context.Context, stream string, cfg ConsumerConfig) (Consumer, error) + // CreateConsumer creates a consumer on a given stream with given config. + // If consumer already exists, ErrConsumerExists is returned. + // Consumer interface is returned, serving as a hook to operate on a consumer (e.g. fetch messages) + CreateConsumer(ctx context.Context, stream string, cfg ConsumerConfig) (Consumer, error) + // UpdateConsumer updates an existing consumer. + // If consumer does not exist, ErrConsumerDoesNotExist is returned. + // Consumer interface is returned, serving as a hook to operate on a consumer (e.g. fetch messages) + UpdateConsumer(ctx context.Context, stream string, cfg ConsumerConfig) (Consumer, error) // OrderedConsumer returns an OrderedConsumer instance. // OrderedConsumer allows fetching messages from a stream (just like standard consumer), // for in order delivery of messages. Underlying consumer is re-created when necessary, @@ -350,6 +358,22 @@ func (js *jetStream) CreateStream(ctx context.Context, cfg StreamConfig) (Stream return nil, resp.Error } + // check that input subject transform (if used) is reflected in the returned StreamInfo + if cfg.SubjectTransform != nil && resp.StreamInfo.Config.SubjectTransform == nil { + return nil, ErrStreamSubjectTransformNotSupported + } + + if len(cfg.Sources) != 0 { + if len(cfg.Sources) != len(resp.Sources) { + return nil, ErrStreamSourceNotSupported + } + for i := range cfg.Sources { + if len(cfg.Sources[i].SubjectTransforms) != 0 && len(resp.Sources[i].SubjectTransforms) == 0 { + return nil, ErrStreamSourceMultipleFilterSubjectsNotSupported + } + } + } + return &stream{ jetStream: js, name: cfg.Name, @@ -413,6 +437,22 @@ func (js *jetStream) UpdateStream(ctx context.Context, cfg StreamConfig) (Stream return nil, resp.Error } + // check that input subject transform (if used) is reflected in the returned StreamInfo + if cfg.SubjectTransform != nil && resp.StreamInfo.Config.SubjectTransform == nil { + return nil, ErrStreamSubjectTransformNotSupported + } + + if len(cfg.Sources) != 0 { + if len(cfg.Sources) != len(resp.Sources) { + return nil, ErrStreamSourceNotSupported + } + for i := range cfg.Sources { + if len(cfg.Sources[i].SubjectTransforms) != 0 && len(resp.Sources[i].SubjectTransforms) == 0 { + return nil, ErrStreamSourceMultipleFilterSubjectsNotSupported + } + } + } + return &stream{ jetStream: js, name: cfg.Name, @@ -480,7 +520,21 @@ func (js *jetStream) CreateOrUpdateConsumer(ctx context.Context, stream string, if err := validateStreamName(stream); err != nil { return nil, err } - return upsertConsumer(ctx, js, stream, cfg) + return upsertConsumer(ctx, js, stream, cfg, consumerActionCreateOrUpdate) +} + +func (js *jetStream) CreateConsumer(ctx context.Context, stream string, cfg ConsumerConfig) (Consumer, error) { + if err := validateStreamName(stream); err != nil { + return nil, err + } + return upsertConsumer(ctx, js, stream, cfg, consumerActionCreate) +} + +func (js *jetStream) UpdateConsumer(ctx context.Context, stream string, cfg ConsumerConfig) (Consumer, error) { + if err := validateStreamName(stream); err != nil { + return nil, err + } + return upsertConsumer(ctx, js, stream, cfg, consumerActionUpdate) } func (js *jetStream) OrderedConsumer(ctx context.Context, stream string, cfg OrderedConsumerConfig) (Consumer, error) { diff --git a/jetstream/kv.go b/jetstream/kv.go index 3546f0176..c16c57146 100644 --- a/jetstream/kv.go +++ b/jetstream/kv.go @@ -351,12 +351,20 @@ func (js *jetStream) CreateKeyValue(ctx context.Context, cfg KeyValueConfig) (Ke } else if len(cfg.Sources) > 0 { // For now we do not allow direct subjects for sources. If that is desired a user could use stream API directly. for _, ss := range cfg.Sources { - if !strings.HasPrefix(ss.Name, kvBucketNamePre) { - ss = ss.copy() + var sourceBucketName string + if strings.HasPrefix(ss.Name, kvBucketNamePre) { + sourceBucketName = ss.Name[len(kvBucketNamePre):] + } else { + sourceBucketName = ss.Name ss.Name = fmt.Sprintf(kvBucketNameTmpl, ss.Name) } + + if ss.External == nil || sourceBucketName != cfg.Bucket { + ss.SubjectTransforms = []SubjectTransformConfig{{Source: fmt.Sprintf(kvSubjectsTmpl, sourceBucketName), Destination: fmt.Sprintf(kvSubjectsTmpl, cfg.Bucket)}} + } scfg.Sources = append(scfg.Sources, ss) } + scfg.Subjects = []string{fmt.Sprintf(kvSubjectsTmpl, cfg.Bucket)} } else { scfg.Subjects = []string{fmt.Sprintf(kvSubjectsTmpl, cfg.Bucket)} } diff --git a/jetstream/stream.go b/jetstream/stream.go index dfb52442e..e864cf164 100644 --- a/jetstream/stream.go +++ b/jetstream/stream.go @@ -56,6 +56,16 @@ type ( // Consumer interface is returned, serving as a hook to operate on a consumer (e.g. fetch messages). CreateOrUpdateConsumer(ctx context.Context, cfg ConsumerConfig) (Consumer, error) + // CreateConsumer creates a consumer on a given stream with given config. + // If consumer already exists, an ErrConsumerExists is returned. + // Consumer interface is returned, serving as a hook to operate on a consumer (e.g. fetch messages). + CreateConsumer(ctx context.Context, cfg ConsumerConfig) (Consumer, error) + + // UpdateConsumer updates an existing consumer with given config. + // If consumer does not exist, an ErrConsumerDoesNotExist is returned. + // Consumer interface is returned, serving as a hook to operate on a consumer (e.g. fetch messages). + UpdateConsumer(ctx context.Context, cfg ConsumerConfig) (Consumer, error) + // OrderedConsumer returns an OrderedConsumer instance. // OrderedConsumer allows fetching messages from a stream (just like standard consumer), // for in order delivery of messages. Underlying consumer is re-created when necessary, @@ -74,6 +84,7 @@ type ( // ConsumerNames returns a ConsumerNameLister enabling iterating over a channel of consumer names ConsumerNames(context.Context) ConsumerNameLister } + RawStreamMsg struct { Subject string Sequence uint64 @@ -100,11 +111,6 @@ type ( *ConsumerInfo } - createConsumerRequest struct { - Stream string `json:"stream_name"` - Config *ConsumerConfig `json:"config"` - } - StreamPurgeOpt func(*StreamPurgeRequest) error StreamPurgeRequest struct { @@ -194,7 +200,15 @@ type ( ) func (s *stream) CreateOrUpdateConsumer(ctx context.Context, cfg ConsumerConfig) (Consumer, error) { - return upsertConsumer(ctx, s.jetStream, s.name, cfg) + return upsertConsumer(ctx, s.jetStream, s.name, cfg, consumerActionCreateOrUpdate) +} + +func (s *stream) CreateConsumer(ctx context.Context, cfg ConsumerConfig) (Consumer, error) { + return upsertConsumer(ctx, s.jetStream, s.name, cfg, consumerActionCreate) +} + +func (s *stream) UpdateConsumer(ctx context.Context, cfg ConsumerConfig) (Consumer, error) { + return upsertConsumer(ctx, s.jetStream, s.name, cfg, consumerActionUpdate) } func (s *stream) OrderedConsumer(ctx context.Context, cfg OrderedConsumerConfig) (Consumer, error) { diff --git a/jetstream/stream_config.go b/jetstream/stream_config.go index 9b5461df0..bb4fd1060 100644 --- a/jetstream/stream_config.go +++ b/jetstream/stream_config.go @@ -35,30 +35,35 @@ type ( } StreamConfig struct { - Name string `json:"name"` - Description string `json:"description,omitempty"` - Subjects []string `json:"subjects,omitempty"` - Retention RetentionPolicy `json:"retention"` - MaxConsumers int `json:"max_consumers"` - MaxMsgs int64 `json:"max_msgs"` - MaxBytes int64 `json:"max_bytes"` - Discard DiscardPolicy `json:"discard"` - DiscardNewPerSubject bool `json:"discard_new_per_subject,omitempty"` - MaxAge time.Duration `json:"max_age"` - MaxMsgsPerSubject int64 `json:"max_msgs_per_subject"` - MaxMsgSize int32 `json:"max_msg_size,omitempty"` - Storage StorageType `json:"storage"` - Replicas int `json:"num_replicas"` - NoAck bool `json:"no_ack,omitempty"` - Template string `json:"template_owner,omitempty"` - Duplicates time.Duration `json:"duplicate_window,omitempty"` - Placement *Placement `json:"placement,omitempty"` - Mirror *StreamSource `json:"mirror,omitempty"` - Sources []*StreamSource `json:"sources,omitempty"` - Sealed bool `json:"sealed,omitempty"` - DenyDelete bool `json:"deny_delete,omitempty"` - DenyPurge bool `json:"deny_purge,omitempty"` - AllowRollup bool `json:"allow_rollup_hdrs,omitempty"` + Name string `json:"name"` + Description string `json:"description,omitempty"` + Subjects []string `json:"subjects,omitempty"` + Retention RetentionPolicy `json:"retention"` + MaxConsumers int `json:"max_consumers"` + MaxMsgs int64 `json:"max_msgs"` + MaxBytes int64 `json:"max_bytes"` + Discard DiscardPolicy `json:"discard"` + DiscardNewPerSubject bool `json:"discard_new_per_subject,omitempty"` + MaxAge time.Duration `json:"max_age"` + MaxMsgsPerSubject int64 `json:"max_msgs_per_subject"` + MaxMsgSize int32 `json:"max_msg_size,omitempty"` + Storage StorageType `json:"storage"` + Replicas int `json:"num_replicas"` + NoAck bool `json:"no_ack,omitempty"` + Template string `json:"template_owner,omitempty"` + Duplicates time.Duration `json:"duplicate_window,omitempty"` + Placement *Placement `json:"placement,omitempty"` + Mirror *StreamSource `json:"mirror,omitempty"` + Sources []*StreamSource `json:"sources,omitempty"` + Sealed bool `json:"sealed,omitempty"` + DenyDelete bool `json:"deny_delete,omitempty"` + DenyPurge bool `json:"deny_purge,omitempty"` + AllowRollup bool `json:"allow_rollup_hdrs,omitempty"` + Compression StoreCompression `json:"compression"` + FirstSeq uint64 `json:"first_seq,omitempty"` + + // Allow applying a subject transform to incoming messages before doing anything else + SubjectTransform *SubjectTransformConfig `json:"subject_transform,omitempty"` // Allow republish of the message after being sequenced and stored. RePublish *RePublish `json:"republish,omitempty"` @@ -67,13 +72,23 @@ type ( AllowDirect bool `json:"allow_direct"` // Allow higher performance and unified direct access for mirrors as well. MirrorDirect bool `json:"mirror_direct"` + + // Limits for consumers on this stream. + ConsumerLimits StreamConsumerLimits `json:"consumer_limits,omitempty"` + + // Metadata is additional metadata for the Stream. + // Keys starting with `_nats` are reserved. + // NOTE: Metadata requires nats-server v2.10.0+ + Metadata map[string]string `json:"metadata,omitempty"` } // StreamSourceInfo shows information about an upstream stream source. StreamSourceInfo struct { - Name string `json:"name"` - Lag uint64 `json:"lag"` - Active time.Duration `json:"active"` + Name string `json:"name"` + Lag uint64 `json:"lag"` + Active time.Duration `json:"active"` + FilterSubject string `json:"filter_subject,omitempty"` + SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"` } // StreamState is information about the given stream. @@ -109,6 +124,12 @@ type ( Lag uint64 `json:"lag,omitempty"` } + // SubjectTransformConfig is for applying a subject transform (to matching messages) before doing anything else when a new message is received + SubjectTransformConfig struct { + Source string `json:"src"` + Destination string `json:"dest"` + } + // RePublish is for republishing messages once committed to a stream. The original // subject is remapped from the subject pattern to the destination pattern. RePublish struct { @@ -125,12 +146,13 @@ type ( // StreamSource dictates how streams can source from other streams. StreamSource struct { - Name string `json:"name"` - OptStartSeq uint64 `json:"opt_start_seq,omitempty"` - OptStartTime *time.Time `json:"opt_start_time,omitempty"` - FilterSubject string `json:"filter_subject,omitempty"` - External *ExternalStream `json:"external,omitempty"` - Domain string `json:"-"` + Name string `json:"name"` + OptStartSeq uint64 `json:"opt_start_seq,omitempty"` + OptStartTime *time.Time `json:"opt_start_time,omitempty"` + FilterSubject string `json:"filter_subject,omitempty"` + SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"` + External *ExternalStream `json:"external,omitempty"` + Domain string `json:"-"` } // ExternalStream allows you to qualify access to a stream source in another @@ -140,6 +162,13 @@ type ( DeliverPrefix string `json:"deliver"` } + // StreamConsumerLimits are the limits for a consumer on a stream. + // These can be overridden on a per consumer basis. + StreamConsumerLimits struct { + InactiveThreshold time.Duration `json:"inactive_threshold,omitempty"` + MaxAckPending int `json:"max_ack_pending,omitempty"` + } + // DiscardPolicy determines how to proceed when limits of messages or bytes are // reached. DiscardPolicy int @@ -149,6 +178,9 @@ type ( // StorageType determines how messages are stored for retention. StorageType int + + // StoreCompression determines how messages are compressed. + StoreCompression uint8 ) const ( @@ -299,3 +331,48 @@ func (st *StorageType) UnmarshalJSON(data []byte) error { func jsonString(s string) string { return "\"" + s + "\"" } + +const ( + NoCompression StoreCompression = iota + S2Compression +) + +func (alg StoreCompression) String() string { + switch alg { + case NoCompression: + return "None" + case S2Compression: + return "S2" + default: + return "Unknown StoreCompression" + } +} + +func (alg StoreCompression) MarshalJSON() ([]byte, error) { + var str string + switch alg { + case S2Compression: + str = "s2" + case NoCompression: + str = "none" + default: + return nil, fmt.Errorf("unknown compression algorithm") + } + return json.Marshal(str) +} + +func (alg *StoreCompression) UnmarshalJSON(b []byte) error { + var str string + if err := json.Unmarshal(b, &str); err != nil { + return err + } + switch str { + case "s2": + *alg = S2Compression + case "none": + *alg = NoCompression + default: + return fmt.Errorf("unknown compression algorithm") + } + return nil +} diff --git a/jetstream/test/jetstream_test.go b/jetstream/test/jetstream_test.go index c41312c26..d9e5106ae 100644 --- a/jetstream/test/jetstream_test.go +++ b/jetstream/test/jetstream_test.go @@ -205,6 +205,7 @@ func TestCreateStream(t *testing.T) { name string stream string subject string + metadata map[string]string timeout time.Duration withError error }{ @@ -214,10 +215,30 @@ func TestCreateStream(t *testing.T) { timeout: 10 * time.Second, subject: "FOO.123", }, + { + name: "create stream with metadata", + stream: "foo_meta", + metadata: map[string]string{ + "foo": "bar", + "name": "test", + }, + timeout: 10 * time.Second, + subject: "FOO.meta", + }, + { + name: "create stream with metadata, reserved prefix", + stream: "foo_meta1", + metadata: map[string]string{ + "foo": "bar", + "_nats_version": "2.10.0", + }, + timeout: 10 * time.Second, + subject: "FOO.meta1", + }, { name: "with empty context", - stream: "foo", - subject: "FOO.123", + stream: "foo_empty_ctx", + subject: "FOO.ctx", }, { name: "invalid stream name", @@ -270,7 +291,7 @@ func TestCreateStream(t *testing.T) { ctx, cancel = context.WithTimeout(context.Background(), test.timeout) defer cancel() } - _, err = js.CreateStream(ctx, jetstream.StreamConfig{Name: test.stream, Subjects: []string{test.subject}}) + s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: test.stream, Subjects: []string{test.subject}, Metadata: test.metadata}) if test.withError != nil { if !errors.Is(err, test.withError) { t.Fatalf("Expected error: %v; got: %v", test.withError, err) @@ -280,6 +301,9 @@ func TestCreateStream(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } + if !reflect.DeepEqual(s.CachedInfo().Config.Metadata, test.metadata) { + t.Fatalf("Invalid metadata; want: %v, got: %v", test.metadata, s.CachedInfo().Config.Metadata) + } }) } } @@ -436,6 +460,7 @@ func TestUpdateStream(t *testing.T) { name string stream string subject string + metadata map[string]string timeout time.Duration withError error }{ @@ -450,6 +475,16 @@ func TestUpdateStream(t *testing.T) { stream: "foo", subject: "FOO.123", }, + { + name: "update existing, add metadata", + stream: "foo", + subject: "BAR.123", + metadata: map[string]string{ + "foo": "bar", + "name": "test", + }, + timeout: 10 * time.Second, + }, { name: "invalid stream name", stream: "foo.123", @@ -504,7 +539,7 @@ func TestUpdateStream(t *testing.T) { ctx, cancel = context.WithTimeout(context.Background(), test.timeout) defer cancel() } - s, err := js.UpdateStream(ctx, jetstream.StreamConfig{Name: test.stream, Subjects: []string{test.subject}}) + s, err := js.UpdateStream(ctx, jetstream.StreamConfig{Name: test.stream, Subjects: []string{test.subject}, Metadata: test.metadata}) if test.withError != nil { if !errors.Is(err, test.withError) { t.Fatalf("Expected error: %v; got: %v", test.withError, err) @@ -521,6 +556,9 @@ func TestUpdateStream(t *testing.T) { if len(info.Config.Subjects) != 1 || info.Config.Subjects[0] != test.subject { t.Fatalf("Invalid stream subjects after update: %v", info.Config.Subjects) } + if !reflect.DeepEqual(info.Config.Metadata, test.metadata) { + t.Fatalf("Invalid metadata; want: %v, got: %v", test.metadata, info.Config.Metadata) + } }) } } @@ -1090,6 +1128,187 @@ func TestJetStream_CreateOrUpdateConsumer(t *testing.T) { } } +func TestJetStream_CreateConsumer(t *testing.T) { + tests := []struct { + name string + consumerConfig jetstream.ConsumerConfig + shouldCreate bool + stream string + withError error + }{ + { + name: "create consumer", + consumerConfig: jetstream.ConsumerConfig{Durable: "dur"}, + stream: "foo", + shouldCreate: true, + }, + { + name: "consumer already exists, error", + consumerConfig: jetstream.ConsumerConfig{Durable: "dur", Description: "test consumer"}, + stream: "foo", + withError: jetstream.ErrConsumerExists, + }, + { + name: "stream does not exist", + stream: "abc", + consumerConfig: jetstream.ConsumerConfig{Durable: "dur"}, + withError: jetstream.ErrStreamNotFound, + }, + } + + srv := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, srv) + nc, err := nats.Connect(srv.ClientURL()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + js, err := jetstream.New(nc) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + var sub *nats.Subscription + if test.consumerConfig.FilterSubject != "" { + sub, err = nc.SubscribeSync(fmt.Sprintf("$JS.API.CONSUMER.CREATE.foo.*.%s", test.consumerConfig.FilterSubject)) + } else { + sub, err = nc.SubscribeSync("$JS.API.CONSUMER.CREATE.foo.*") + } + c, err := js.CreateConsumer(ctx, test.stream, test.consumerConfig) + if test.withError != nil { + if !errors.Is(err, test.withError) { + t.Fatalf("Expected error: %v; got: %v", test.withError, err) + } + return + } + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if test.shouldCreate { + if _, err := sub.NextMsgWithContext(ctx); err != nil { + t.Fatalf("Expected request on %s; got %s", sub.Subject, err) + } + } + ci, err := s.Consumer(ctx, c.CachedInfo().Name) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if ci.CachedInfo().Config.AckPolicy != test.consumerConfig.AckPolicy { + t.Fatalf("Invalid ack policy; want: %s; got: %s", test.consumerConfig.AckPolicy, ci.CachedInfo().Config.AckPolicy) + } + if !reflect.DeepEqual(test.consumerConfig.FilterSubjects, ci.CachedInfo().Config.FilterSubjects) { + t.Fatalf("Invalid filter subjects; want: %v; got: %v", test.consumerConfig.FilterSubjects, ci.CachedInfo().Config.FilterSubjects) + } + }) + } +} + +func TestJetStream_UpdateConsumer(t *testing.T) { + tests := []struct { + name string + consumerConfig jetstream.ConsumerConfig + shouldUpdate bool + stream string + withError error + }{ + { + name: "update consumer", + consumerConfig: jetstream.ConsumerConfig{Name: "testcons", Description: "updated consumer"}, + stream: "foo", + shouldUpdate: true, + }, + { + name: "illegal update", + consumerConfig: jetstream.ConsumerConfig{Name: "testcons", AckPolicy: jetstream.AckNonePolicy}, + stream: "foo", + withError: jetstream.ErrConsumerCreate, + }, + { + name: "consumer does not exist", + consumerConfig: jetstream.ConsumerConfig{Name: "abc"}, + stream: "foo", + withError: jetstream.ErrConsumerDoesNotExist, + }, + { + name: "stream does not exist", + consumerConfig: jetstream.ConsumerConfig{Name: "testcons"}, + stream: "abc", + withError: jetstream.ErrStreamNotFound, + }, + } + + srv := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, srv) + nc, err := nats.Connect(srv.ClientURL()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + js, err := jetstream.New(nc) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + _, err = s.CreateConsumer(ctx, jetstream.ConsumerConfig{Name: "testcons"}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + var sub *nats.Subscription + if test.consumerConfig.FilterSubject != "" { + sub, err = nc.SubscribeSync(fmt.Sprintf("$JS.API.CONSUMER.CREATE.foo.*.%s", test.consumerConfig.FilterSubject)) + } else { + sub, err = nc.SubscribeSync("$JS.API.CONSUMER.CREATE.foo.*") + } + c, err := js.UpdateConsumer(ctx, test.stream, test.consumerConfig) + if test.withError != nil { + if !errors.Is(err, test.withError) { + t.Fatalf("Expected error: %v; got: %v", test.withError, err) + } + return + } + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if test.shouldUpdate { + if _, err := sub.NextMsgWithContext(ctx); err != nil { + t.Fatalf("Expected request on %s; got %s", sub.Subject, err) + } + } + ci, err := s.Consumer(ctx, c.CachedInfo().Name) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if ci.CachedInfo().Config.AckPolicy != test.consumerConfig.AckPolicy { + t.Fatalf("Invalid ack policy; want: %s; got: %s", test.consumerConfig.AckPolicy, ci.CachedInfo().Config.AckPolicy) + } + if !reflect.DeepEqual(test.consumerConfig.FilterSubjects, ci.CachedInfo().Config.FilterSubjects) { + t.Fatalf("Invalid filter subjects; want: %v; got: %v", test.consumerConfig.FilterSubjects, ci.CachedInfo().Config.FilterSubjects) + } + }) + } +} + func TestJetStream_Consumer(t *testing.T) { tests := []struct { name string @@ -1249,12 +1468,12 @@ func TestJetStream_DeleteConsumer(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } + defer nc.Close() js, err := jetstream.New(nc) if err != nil { t.Fatalf("Unexpected error: %v", err) } - defer nc.Close() s, err := js.CreateStream(context.Background(), jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}) if err != nil { @@ -1388,3 +1607,193 @@ func TestStreamNameBySubject(t *testing.T) { }) } } + +func TestJetStreamTransform(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + nc, js := jsClient(t, s) + defer nc.Close() + + ctx := context.Background() + _, err := js.CreateStream(ctx, jetstream.StreamConfig{ + Name: "ORIGIN", + Subjects: []string{"test"}, + SubjectTransform: &jetstream.SubjectTransformConfig{Source: ">", Destination: "transformed.>"}, + Storage: jetstream.MemoryStorage, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + err = nc.Publish("test", []byte("1")) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + sourcingStream, err := js.CreateStream(ctx, jetstream.StreamConfig{ + Subjects: []string{}, + Name: "SOURCING", + Sources: []*jetstream.StreamSource{{Name: "ORIGIN", SubjectTransforms: []jetstream.SubjectTransformConfig{{Source: ">", Destination: "fromtest.>"}}}}, + Storage: jetstream.MemoryStorage, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + cons, err := sourcingStream.CreateConsumer(ctx, jetstream.ConsumerConfig{FilterSubject: "fromtest.>", MemoryStorage: true}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + m, err := cons.Next() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if m.Subject() != "fromtest.transformed.test" { + t.Fatalf("the subject of the message doesn't match the expected fromtest.transformed.test: %s", m.Subject()) + } +} + +func TestStreamConfigMatches(t *testing.T) { + srv := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, srv) + nc, err := nats.Connect(srv.ClientURL()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + + js, err := jetstream.New(nc) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + cfgSource := jetstream.StreamConfig{ + Name: "source", + Description: "desc", + Subjects: []string{"foo.*"}, + Retention: jetstream.WorkQueuePolicy, + MaxConsumers: 10, + MaxMsgs: 100, + MaxBytes: 1000, + Discard: jetstream.DiscardNew, + DiscardNewPerSubject: true, + MaxAge: 100 * time.Second, + MaxMsgsPerSubject: 1000, + MaxMsgSize: 10000, + Storage: jetstream.MemoryStorage, + Replicas: 1, + NoAck: true, + Duplicates: 10 * time.Second, + Sealed: false, + DenyDelete: true, + DenyPurge: false, + AllowRollup: true, + Compression: jetstream.S2Compression, + FirstSeq: 5, + SubjectTransform: &jetstream.SubjectTransformConfig{Source: ">", Destination: "transformed.>"}, + RePublish: &jetstream.RePublish{ + Source: ">", + Destination: "RP.>", + HeadersOnly: true, + }, + AllowDirect: true, + ConsumerLimits: jetstream.StreamConsumerLimits{ + InactiveThreshold: 10 * time.Second, + MaxAckPending: 500, + }, + Metadata: map[string]string{ + "foo": "bar", + }, + } + + s, err := js.CreateStream(context.Background(), cfgSource) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if !reflect.DeepEqual(s.CachedInfo().Config, cfgSource) { + t.Fatalf("StreamConfig doesn't match") + } + + cfg := jetstream.StreamConfig{ + Name: "mirror", + MaxConsumers: 10, + MaxMsgs: 100, + MaxBytes: 1000, + MaxAge: 100 * time.Second, + MaxMsgsPerSubject: 1000, + MaxMsgSize: 10000, + Replicas: 1, + Mirror: &jetstream.StreamSource{ + Name: "source", + OptStartSeq: 10, + SubjectTransforms: []jetstream.SubjectTransformConfig{ + {Source: ">", Destination: "transformed.>"}, + }, + }, + MirrorDirect: true, + SubjectTransform: &jetstream.SubjectTransformConfig{Source: ">", Destination: "transformed.>"}, + } + + s, err = js.CreateStream(context.Background(), cfg) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if !reflect.DeepEqual(s.CachedInfo().Config, cfg) { + t.Fatalf("StreamConfig doesn't match") + } +} + +func TestConsumerConfigMatches(t *testing.T) { + srv := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, srv) + nc, err := nats.Connect(srv.ClientURL()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + + js, err := jetstream.New(nc) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + s, err := js.CreateStream(context.Background(), jetstream.StreamConfig{ + Name: "FOO", + Subjects: []string{"foo.*"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + cfg := jetstream.ConsumerConfig{ + Name: "cons", + Durable: "cons", + Description: "test", + DeliverPolicy: jetstream.DeliverByStartSequencePolicy, + OptStartSeq: 5, + AckPolicy: jetstream.AckAllPolicy, + AckWait: 1 * time.Second, + MaxDeliver: 5, + BackOff: []time.Duration{1 * time.Second, 2 * time.Second, 3 * time.Second}, + ReplayPolicy: jetstream.ReplayOriginalPolicy, + SampleFrequency: "50%", + MaxWaiting: 100, + MaxAckPending: 1000, + HeadersOnly: true, + MaxRequestBatch: 100, + MaxRequestExpires: 10 * time.Second, + MaxRequestMaxBytes: 1000, + InactiveThreshold: 20 * time.Second, + Replicas: 1, + MemoryStorage: true, + FilterSubjects: []string{"foo.1", "foo.2"}, + Metadata: map[string]string{ + "foo": "bar", + }, + } + + c, err := s.CreateConsumer(context.Background(), cfg) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if !reflect.DeepEqual(c.CachedInfo().Config, cfg) { + fmt.Printf("%#v\n", c.CachedInfo().Config) + fmt.Printf("%#v\n", cfg) + t.Fatalf("ConsumerConfig doesn't match") + } +} diff --git a/jetstream/test/stream_test.go b/jetstream/test/stream_test.go index b90a79b7a..bb85f67d9 100644 --- a/jetstream/test/stream_test.go +++ b/jetstream/test/stream_test.go @@ -53,6 +53,21 @@ func TestCreateOrUpdateConsumer(t *testing.T) { consumerConfig: jetstream.ConsumerConfig{FilterSubjects: []string{"FOO.A", "FOO.B"}}, shouldCreate: true, }, + { + name: "with multiple filter subjects, overlapping subjects", + consumerConfig: jetstream.ConsumerConfig{FilterSubjects: []string{"FOO.*", "FOO.B"}}, + withError: jetstream.ErrOverlappingFilterSubjects, + }, + { + name: "with multiple filter subjects and filter subject provided", + consumerConfig: jetstream.ConsumerConfig{FilterSubjects: []string{"FOO.A", "FOO.B"}, FilterSubject: "FOO.C"}, + withError: jetstream.ErrDuplicateFilterSubjects, + }, + { + name: "with empty subject in FilterSubjects", + consumerConfig: jetstream.ConsumerConfig{FilterSubjects: []string{"FOO.A", ""}}, + withError: jetstream.ErrEmptyFilter, + }, { name: "consumer already exists, update", consumerConfig: jetstream.ConsumerConfig{Durable: "dur", Description: "test consumer"}, @@ -119,6 +134,224 @@ func TestCreateOrUpdateConsumer(t *testing.T) { if ci.CachedInfo().Config.AckPolicy != test.consumerConfig.AckPolicy { t.Fatalf("Invalid ack policy; want: %s; got: %s", test.consumerConfig.AckPolicy, ci.CachedInfo().Config.AckPolicy) } + if !reflect.DeepEqual(test.consumerConfig.FilterSubjects, ci.CachedInfo().Config.FilterSubjects) { + t.Fatalf("Invalid filter subjects; want: %v; got: %v", test.consumerConfig.FilterSubjects, ci.CachedInfo().Config.FilterSubjects) + } + }) + } +} + +func TestCreateConsumer(t *testing.T) { + tests := []struct { + name string + consumerConfig jetstream.ConsumerConfig + shouldCreate bool + withError error + }{ + { + name: "create durable pull consumer", + consumerConfig: jetstream.ConsumerConfig{Durable: "dur"}, + shouldCreate: true, + }, + { + name: "idempotent create, no error", + consumerConfig: jetstream.ConsumerConfig{Durable: "dur"}, + shouldCreate: true, + }, + { + name: "create ephemeral pull consumer", + consumerConfig: jetstream.ConsumerConfig{AckPolicy: jetstream.AckNonePolicy}, + shouldCreate: true, + }, + { + name: "with filter subject", + consumerConfig: jetstream.ConsumerConfig{FilterSubject: "FOO.A"}, + shouldCreate: true, + }, + { + name: "with metadata", + consumerConfig: jetstream.ConsumerConfig{Metadata: map[string]string{"foo": "bar", "baz": "quux"}}, + shouldCreate: true, + }, + { + name: "with multiple filter subjects", + consumerConfig: jetstream.ConsumerConfig{FilterSubjects: []string{"FOO.A", "FOO.B"}}, + shouldCreate: true, + }, + { + name: "with multiple filter subjects, overlapping subjects", + consumerConfig: jetstream.ConsumerConfig{FilterSubjects: []string{"FOO.*", "FOO.B"}}, + withError: jetstream.ErrOverlappingFilterSubjects, + }, + { + name: "with multiple filter subjects and filter subject provided", + consumerConfig: jetstream.ConsumerConfig{FilterSubjects: []string{"FOO.A", "FOO.B"}, FilterSubject: "FOO.C"}, + withError: jetstream.ErrDuplicateFilterSubjects, + }, + { + name: "with empty subject in FilterSubjects", + consumerConfig: jetstream.ConsumerConfig{FilterSubjects: []string{"FOO.A", ""}}, + withError: jetstream.ErrEmptyFilter, + }, + { + name: "consumer already exists, error", + consumerConfig: jetstream.ConsumerConfig{Durable: "dur", Description: "test consumer"}, + withError: jetstream.ErrConsumerExists, + }, + { + name: "invalid durable name", + consumerConfig: jetstream.ConsumerConfig{Durable: "dur.123"}, + withError: jetstream.ErrInvalidConsumerName, + }, + } + + srv := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, srv) + nc, err := nats.Connect(srv.ClientURL()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + js, err := jetstream.New(nc) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + var sub *nats.Subscription + if test.consumerConfig.FilterSubject != "" { + sub, err = nc.SubscribeSync(fmt.Sprintf("$JS.API.CONSUMER.CREATE.foo.*.%s", test.consumerConfig.FilterSubject)) + } else { + sub, err = nc.SubscribeSync("$JS.API.CONSUMER.CREATE.foo.*") + } + c, err := s.CreateConsumer(ctx, test.consumerConfig) + if test.withError != nil { + if !errors.Is(err, test.withError) { + t.Fatalf("Expected error: %v; got: %v", test.withError, err) + } + return + } + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if test.shouldCreate { + if _, err := sub.NextMsgWithContext(ctx); err != nil { + t.Fatalf("Expected request on %s; got %s", sub.Subject, err) + } + } + ci, err := s.Consumer(ctx, c.CachedInfo().Name) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if ci.CachedInfo().Config.AckPolicy != test.consumerConfig.AckPolicy { + t.Fatalf("Invalid ack policy; want: %s; got: %s", test.consumerConfig.AckPolicy, ci.CachedInfo().Config.AckPolicy) + } + if !reflect.DeepEqual(test.consumerConfig.FilterSubjects, ci.CachedInfo().Config.FilterSubjects) { + t.Fatalf("Invalid filter subjects; want: %v; got: %v", test.consumerConfig.FilterSubjects, ci.CachedInfo().Config.FilterSubjects) + } + if !reflect.DeepEqual(test.consumerConfig.Metadata, ci.CachedInfo().Config.Metadata) { + t.Fatalf("Invalid metadata; want: %v; got: %v", test.consumerConfig.Metadata, ci.CachedInfo().Config.Metadata) + } + }) + } +} + +func TestUpdateConsumer(t *testing.T) { + tests := []struct { + name string + consumerConfig jetstream.ConsumerConfig + shouldUpdate bool + withError error + }{ + { + name: "update consumer", + consumerConfig: jetstream.ConsumerConfig{Name: "testcons", Description: "updated consumer"}, + shouldUpdate: true, + }, + { + name: "update consumer, with metadata", + consumerConfig: jetstream.ConsumerConfig{Name: "testcons", Description: "updated consumer", Metadata: map[string]string{"foo": "bar", "baz": "quux"}}, + shouldUpdate: true, + }, + { + name: "illegal update", + consumerConfig: jetstream.ConsumerConfig{Name: "testcons", AckPolicy: jetstream.AckNonePolicy}, + withError: jetstream.ErrConsumerCreate, + }, + { + name: "consumer does not exist", + consumerConfig: jetstream.ConsumerConfig{Name: "abc"}, + withError: jetstream.ErrConsumerDoesNotExist, + }, + } + + srv := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, srv) + nc, err := nats.Connect(srv.ClientURL()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + js, err := jetstream.New(nc) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + _, err = s.CreateConsumer(ctx, jetstream.ConsumerConfig{Name: "testcons"}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + var sub *nats.Subscription + if test.consumerConfig.FilterSubject != "" { + sub, err = nc.SubscribeSync(fmt.Sprintf("$JS.API.CONSUMER.CREATE.foo.*.%s", test.consumerConfig.FilterSubject)) + } else { + sub, err = nc.SubscribeSync("$JS.API.CONSUMER.CREATE.foo.*") + } + c, err := s.UpdateConsumer(ctx, test.consumerConfig) + if test.withError != nil { + if !errors.Is(err, test.withError) { + t.Fatalf("Expected error: %v; got: %v", test.withError, err) + } + return + } + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if test.shouldUpdate { + if _, err := sub.NextMsgWithContext(ctx); err != nil { + t.Fatalf("Expected request on %s; got %s", sub.Subject, err) + } + } + ci, err := s.Consumer(ctx, c.CachedInfo().Name) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if ci.CachedInfo().Config.AckPolicy != test.consumerConfig.AckPolicy { + t.Fatalf("Invalid ack policy; want: %s; got: %s", test.consumerConfig.AckPolicy, ci.CachedInfo().Config.AckPolicy) + } + if !reflect.DeepEqual(test.consumerConfig.FilterSubjects, ci.CachedInfo().Config.FilterSubjects) { + t.Fatalf("Invalid filter subjects; want: %v; got: %v", test.consumerConfig.FilterSubjects, ci.CachedInfo().Config.FilterSubjects) + } }) } } diff --git a/js.go b/js.go index ae19641fc..b31839bf2 100644 --- a/js.go +++ b/js.go @@ -1118,6 +1118,7 @@ type ConsumerConfig struct { MaxDeliver int `json:"max_deliver,omitempty"` BackOff []time.Duration `json:"backoff,omitempty"` FilterSubject string `json:"filter_subject,omitempty"` + FilterSubjects []string `json:"filter_subjects,omitempty"` ReplayPolicy ReplayPolicy `json:"replay_policy"` RateLimit uint64 `json:"rate_limit_bps,omitempty"` // Bits per sec SampleFrequency string `json:"sample_freq,omitempty"` @@ -1143,6 +1144,11 @@ type ConsumerConfig struct { Replicas int `json:"num_replicas"` // Force memory storage. MemoryStorage bool `json:"mem_storage,omitempty"` + + // Metadata is additional metadata for the Consumer. + // Keys starting with `_nats` are reserved. + // NOTE: Metadata requires nats-server v2.10.0+ + Metadata map[string]string `json:"metadata,omitempty"` } // ConsumerInfo is the info from a JetStream consumer. @@ -2570,6 +2576,16 @@ func ConsumerName(name string) SubOpt { }) } +// ConsumerFilterSubjects can be used to set multiple subject filters on the consumer. +// It has to be used in conjunction with [nats.BindStream] and +// with empty 'subject' parameter. +func ConsumerFilterSubjects(subjects ...string) SubOpt { + return subOptFn(func(opts *subOpts) error { + opts.cfg.FilterSubjects = subjects + return nil + }) +} + func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) { sub.mu.Lock() // TODO(dlc) - Better way to mark especially if we attach. @@ -3724,6 +3740,53 @@ func (st *StorageType) UnmarshalJSON(data []byte) error { return nil } +type StoreCompression uint8 + +const ( + NoCompression StoreCompression = iota + S2Compression +) + +func (alg StoreCompression) String() string { + switch alg { + case NoCompression: + return "None" + case S2Compression: + return "S2" + default: + return "Unknown StoreCompression" + } +} + +func (alg StoreCompression) MarshalJSON() ([]byte, error) { + var str string + switch alg { + case S2Compression: + str = "s2" + case NoCompression: + str = "none" + default: + return nil, fmt.Errorf("unknown compression algorithm") + } + return json.Marshal(str) +} + +func (alg *StoreCompression) UnmarshalJSON(b []byte) error { + var str string + if err := json.Unmarshal(b, &str); err != nil { + return err + } + switch str { + case "s2": + *alg = S2Compression + case "none": + *alg = NoCompression + default: + return fmt.Errorf("unknown compression algorithm") + } + return nil +} + // Length of our hash used for named consumers. const nameHashLen = 8 diff --git a/js_test.go b/js_test.go index f01547e04..a36d96285 100644 --- a/js_test.go +++ b/js_test.go @@ -1276,3 +1276,51 @@ func TestStreamNameBySubject(t *testing.T) { } } } + +func TestJetStreamTransform(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + _, err := js.AddStream(&StreamConfig{ + Name: "ORIGIN", + Subjects: []string{"test"}, + SubjectTransform: &SubjectTransformConfig{Source: ">", Destination: "transformed.>"}, + Storage: MemoryStorage, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + err = nc.Publish("test", []byte("1")) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + _, err = js.AddStream(&StreamConfig{ + Subjects: []string{}, + Name: "SOURCING", + Sources: []*StreamSource{{Name: "ORIGIN", SubjectTransforms: []SubjectTransformConfig{{Source: ">", Destination: "fromtest.>"}}}}, + Storage: MemoryStorage, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Create a sync subscription with an in-memory ephemeral consumer. + sub, err := js.SubscribeSync("fromtest.>", ConsumerMemoryStorage(), BindStream("SOURCING")) + if err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + + m, err := sub.NextMsg(time.Second) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if m.Subject != "fromtest.transformed.test" { + t.Fatalf("the subject of the message doesn't match the expected fromtest.transformed.test: %s", m.Subject) + } + +} diff --git a/jserrors.go b/jserrors.go index cb3845cce..c8b1f5fc6 100644 --- a/jserrors.go +++ b/jserrors.go @@ -33,6 +33,26 @@ var ( // ErrStreamNameAlreadyInUse is returned when a stream with given name already exists and has a different configuration. ErrStreamNameAlreadyInUse JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeStreamNameInUse, Description: "stream name already in use", Code: 400}} + // ErrStreamSubjectTransformNotSupported is returned when the connected nats-server version does not support setting + // the stream subject transform. If this error is returned when executing AddStream(), the stream with invalid + // configuration was already created in the server. + ErrStreamSubjectTransformNotSupported JetStreamError = &jsError{message: "stream subject transformation not supported by nats-server"} + + // ErrStreamSourceSubjectTransformNotSupported is returned when the connected nats-server version does not support setting + // the stream source subject transform. If this error is returned when executing AddStream(), the stream with invalid + // configuration was already created in the server. + ErrStreamSourceSubjectTransformNotSupported JetStreamError = &jsError{message: "stream subject transformation not supported by nats-server"} + + // ErrStreamSourceNotSupported is returned when the connected nats-server version does not support setting + // the stream sources. If this error is returned when executing AddStream(), the stream with invalid + // configuration was already created in the server. + ErrStreamSourceNotSupported JetStreamError = &jsError{message: "stream sourcing is not supported by nats-server"} + + // ErrStreamSourceMultipleSubjectTransformsNotSupported is returned when the connected nats-server version does not support setting + // the stream sources. If this error is returned when executing AddStream(), the stream with invalid + // configuration was already created in the server. + ErrStreamSourceMultipleSubjectTransformsNotSupported JetStreamError = &jsError{message: "stream sourceing with multiple subject transforms not supported by nats-server"} + // ErrConsumerNotFound is an error returned when consumer with given name does not exist. ErrConsumerNotFound JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerNotFound, Description: "consumer not found", Code: 404}} @@ -42,6 +62,15 @@ var ( // ErrBadRequest is returned when invalid request is sent to JetStream API. ErrBadRequest JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeBadRequest, Description: "bad request", Code: 400}} + // ErrDuplicateFilterSubjects is returned when both FilterSubject and FilterSubjects are specified when creating consumer. + ErrDuplicateFilterSubjects JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeDuplicateFilterSubjects, Description: "consumer cannot have both FilterSubject and FilterSubjects specified", Code: 500}} + + // ErrDuplicateFilterSubjects is returned when filter subjects overlap when creating consumer. + ErrOverlappingFilterSubjects JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeOverlappingFilterSubjects, Description: "consumer subject filters cannot overlap", Code: 500}} + + // ErrEmptyFilter is returned when a filter in FilterSubjects is empty. + ErrEmptyFilter JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerEmptyFilter, Description: "consumer filter in FilterSubjects cannot be empty", Code: 500}} + // Client errors // ErrConsumerNameAlreadyInUse is an error returned when consumer with given name already exists. @@ -62,6 +91,11 @@ var ( // ErrConsumerNameRequired is returned when the provided consumer durable name is empty. ErrConsumerNameRequired JetStreamError = &jsError{message: "consumer name is required"} + // ErrConsumerMultipleFilterSubjectsNotSupported is returned when the connected nats-server version does not support setting + // multiple filter subjects with filter_subjects field. If this error is returned when executing AddConsumer(), the consumer with invalid + // configuration was already created in the server. + ErrConsumerMultipleFilterSubjectsNotSupported JetStreamError = &jsError{message: "multiple consumer filter subjects not supported by nats-server"} + // ErrConsumerConfigRequired is returned when empty consumer consuguration is supplied to add/update consumer. ErrConsumerConfigRequired JetStreamError = &jsError{message: "consumer configuration is required"} @@ -123,13 +157,17 @@ const ( JSErrCodeStreamNotFound ErrorCode = 10059 JSErrCodeStreamNameInUse ErrorCode = 10058 - JSErrCodeConsumerNotFound ErrorCode = 10014 - JSErrCodeConsumerNameExists ErrorCode = 10013 - JSErrCodeConsumerAlreadyExists ErrorCode = 10105 + JSErrCodeConsumerNotFound ErrorCode = 10014 + JSErrCodeConsumerNameExists ErrorCode = 10013 + JSErrCodeConsumerAlreadyExists ErrorCode = 10105 + JSErrCodeDuplicateFilterSubjects ErrorCode = 10136 + JSErrCodeOverlappingFilterSubjects ErrorCode = 10138 + JSErrCodeConsumerEmptyFilter ErrorCode = 10139 JSErrCodeMessageNotFound ErrorCode = 10037 - JSErrCodeBadRequest ErrorCode = 10003 + JSErrCodeBadRequest ErrorCode = 10003 + JSStreamInvalidConfig ErrorCode = 10052 JSErrCodeStreamWrongLastSequence ErrorCode = 10071 ) diff --git a/jsm.go b/jsm.go index c6684692b..720efe2ee 100644 --- a/jsm.go +++ b/jsm.go @@ -102,30 +102,35 @@ type JetStreamManager interface { // There are sensible defaults for most. If no subjects are // given the name will be used as the only subject. type StreamConfig struct { - Name string `json:"name"` - Description string `json:"description,omitempty"` - Subjects []string `json:"subjects,omitempty"` - Retention RetentionPolicy `json:"retention"` - MaxConsumers int `json:"max_consumers"` - MaxMsgs int64 `json:"max_msgs"` - MaxBytes int64 `json:"max_bytes"` - Discard DiscardPolicy `json:"discard"` - DiscardNewPerSubject bool `json:"discard_new_per_subject,omitempty"` - MaxAge time.Duration `json:"max_age"` - MaxMsgsPerSubject int64 `json:"max_msgs_per_subject"` - MaxMsgSize int32 `json:"max_msg_size,omitempty"` - Storage StorageType `json:"storage"` - Replicas int `json:"num_replicas"` - NoAck bool `json:"no_ack,omitempty"` - Template string `json:"template_owner,omitempty"` - Duplicates time.Duration `json:"duplicate_window,omitempty"` - Placement *Placement `json:"placement,omitempty"` - Mirror *StreamSource `json:"mirror,omitempty"` - Sources []*StreamSource `json:"sources,omitempty"` - Sealed bool `json:"sealed,omitempty"` - DenyDelete bool `json:"deny_delete,omitempty"` - DenyPurge bool `json:"deny_purge,omitempty"` - AllowRollup bool `json:"allow_rollup_hdrs,omitempty"` + Name string `json:"name"` + Description string `json:"description,omitempty"` + Subjects []string `json:"subjects,omitempty"` + Retention RetentionPolicy `json:"retention"` + MaxConsumers int `json:"max_consumers"` + MaxMsgs int64 `json:"max_msgs"` + MaxBytes int64 `json:"max_bytes"` + Discard DiscardPolicy `json:"discard"` + DiscardNewPerSubject bool `json:"discard_new_per_subject,omitempty"` + MaxAge time.Duration `json:"max_age"` + MaxMsgsPerSubject int64 `json:"max_msgs_per_subject"` + MaxMsgSize int32 `json:"max_msg_size,omitempty"` + Storage StorageType `json:"storage"` + Replicas int `json:"num_replicas"` + NoAck bool `json:"no_ack,omitempty"` + Template string `json:"template_owner,omitempty"` + Duplicates time.Duration `json:"duplicate_window,omitempty"` + Placement *Placement `json:"placement,omitempty"` + Mirror *StreamSource `json:"mirror,omitempty"` + Sources []*StreamSource `json:"sources,omitempty"` + Sealed bool `json:"sealed,omitempty"` + DenyDelete bool `json:"deny_delete,omitempty"` + DenyPurge bool `json:"deny_purge,omitempty"` + AllowRollup bool `json:"allow_rollup_hdrs,omitempty"` + Compression StoreCompression `json:"compression"` + FirstSeq uint64 `json:"first_seq,omitempty"` + + // Allow applying a subject transform to incoming messages before doing anything else. + SubjectTransform *SubjectTransformConfig `json:"subject_transform,omitempty"` // Allow republish of the message after being sequenced and stored. RePublish *RePublish `json:"republish,omitempty"` @@ -134,6 +139,20 @@ type StreamConfig struct { AllowDirect bool `json:"allow_direct"` // Allow higher performance and unified direct access for mirrors as well. MirrorDirect bool `json:"mirror_direct"` + + // Limits for consumers on this stream. + ConsumerLimits StreamConsumerLimits `json:"consumer_limits,omitempty"` + + // Metadata is additional metadata for the Stream. + // Keys starting with `_nats` are reserved. + // NOTE: Metadata requires nats-server v2.10.0+ + Metadata map[string]string `json:"metadata,omitempty"` +} + +// SubjectTransformConfig is for applying a subject transform (to matching messages) before doing anything else when a new message is received. +type SubjectTransformConfig struct { + Source string `json:"src,omitempty"` + Destination string `json:"dest"` } // RePublish is for republishing messages once committed to a stream. The original @@ -152,12 +171,13 @@ type Placement struct { // StreamSource dictates how streams can source from other streams. type StreamSource struct { - Name string `json:"name"` - OptStartSeq uint64 `json:"opt_start_seq,omitempty"` - OptStartTime *time.Time `json:"opt_start_time,omitempty"` - FilterSubject string `json:"filter_subject,omitempty"` - External *ExternalStream `json:"external,omitempty"` - Domain string `json:"-"` + Name string `json:"name"` + OptStartSeq uint64 `json:"opt_start_seq,omitempty"` + OptStartTime *time.Time `json:"opt_start_time,omitempty"` + FilterSubject string `json:"filter_subject,omitempty"` + SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"` + External *ExternalStream `json:"external,omitempty"` + Domain string `json:"-"` } // ExternalStream allows you to qualify access to a stream source in another @@ -167,6 +187,13 @@ type ExternalStream struct { DeliverPrefix string `json:"deliver,omitempty"` } +// StreamConsumerLimits are the limits for a consumer on a stream. +// These can be overridden on a per consumer basis. +type StreamConsumerLimits struct { + InactiveThreshold time.Duration `json:"inactive_threshold,omitempty"` + MaxAckPending int `json:"max_ack_pending,omitempty"` +} + // Helper for copying when we do not want to change user's version. func (ss *StreamSource) copy() *StreamSource { nss := *ss @@ -407,6 +434,11 @@ func (js *js) upsertConsumer(stream, consumerName string, cfg *ConsumerConfig, o } return nil, info.Error } + + // check whether multiple filter subjects (if used) are reflected in the returned ConsumerInfo + if len(cfg.FilterSubjects) != 0 && len(info.Config.FilterSubjects) == 0 { + return nil, ErrConsumerMultipleFilterSubjectsNotSupported + } return info.ConsumerInfo, nil } @@ -780,6 +812,21 @@ func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) { return nil, resp.Error } + // check that input subject transform (if used) is reflected in the returned ConsumerInfo + if cfg.SubjectTransform != nil && resp.StreamInfo.Config.SubjectTransform == nil { + return nil, ErrStreamSubjectTransformNotSupported + } + if len(cfg.Sources) != 0 { + if len(cfg.Sources) != len(resp.Sources) { + return nil, ErrStreamSourceNotSupported + } + for i := range cfg.Sources { + if len(cfg.Sources[i].SubjectTransforms) != 0 && len(resp.Sources[i].SubjectTransforms) == 0 { + return nil, ErrStreamSourceMultipleSubjectTransformsNotSupported + } + } + } + return resp.StreamInfo, nil } @@ -897,11 +944,13 @@ type StreamAlternate struct { // StreamSourceInfo shows information about an upstream stream source. type StreamSourceInfo struct { - Name string `json:"name"` - Lag uint64 `json:"lag"` - Active time.Duration `json:"active"` - External *ExternalStream `json:"external"` - Error *APIError `json:"error"` + Name string `json:"name"` + Lag uint64 `json:"lag"` + Active time.Duration `json:"active"` + External *ExternalStream `json:"external"` + Error *APIError `json:"error"` + FilterSubject string `json:"filter_subject,omitempty"` + SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"` } // StreamState is information about the given stream. @@ -973,6 +1022,23 @@ func (js *js) UpdateStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error } return nil, resp.Error } + + // check that input subject transform (if used) is reflected in the returned StreamInfo + if cfg.SubjectTransform != nil && resp.StreamInfo.Config.SubjectTransform == nil { + return nil, ErrStreamSubjectTransformNotSupported + } + + if len(cfg.Sources) != 0 { + if len(cfg.Sources) != len(resp.Sources) { + return nil, ErrStreamSourceNotSupported + } + for i := range cfg.Sources { + if len(cfg.Sources[i].SubjectTransforms) != 0 && len(resp.Sources[i].SubjectTransforms) == 0 { + return nil, ErrStreamSourceMultipleSubjectTransformsNotSupported + } + } + } + return resp.StreamInfo, nil } diff --git a/kv.go b/kv.go index 17c9e2fd5..7382f4d87 100644 --- a/kv.go +++ b/kv.go @@ -432,14 +432,21 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) { scfg.Mirror = m scfg.MirrorDirect = true } else if len(cfg.Sources) > 0 { - // For now we do not allow direct subjects for sources. If that is desired a user could use stream API directly. for _, ss := range cfg.Sources { - if !strings.HasPrefix(ss.Name, kvBucketNamePre) { - ss = ss.copy() + var sourceBucketName string + if strings.HasPrefix(ss.Name, kvBucketNamePre) { + sourceBucketName = ss.Name[len(kvBucketNamePre):] + } else { + sourceBucketName = ss.Name ss.Name = fmt.Sprintf(kvBucketNameTmpl, ss.Name) } + + if ss.External == nil || sourceBucketName != cfg.Bucket { + ss.SubjectTransforms = []SubjectTransformConfig{{Source: fmt.Sprintf(kvSubjectsTmpl, sourceBucketName), Destination: fmt.Sprintf(kvSubjectsTmpl, cfg.Bucket)}} + } scfg.Sources = append(scfg.Sources, ss) } + scfg.Subjects = []string{fmt.Sprintf(kvSubjectsTmpl, cfg.Bucket)} } else { scfg.Subjects = []string{fmt.Sprintf(kvSubjectsTmpl, cfg.Bucket)} } diff --git a/kv_test.go b/kv_test.go index 0eb96cbdb..3e0d4843c 100644 --- a/kv_test.go +++ b/kv_test.go @@ -251,3 +251,66 @@ func TestKeyValueCreate(t *testing.T) { t.Fatalf("Unexpected error code, got: %v", kerr.APIError().ErrorCode) } } + +func TestKeyValueSourcing(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + kvA, err := js.CreateKeyValue(&KeyValueConfig{Bucket: "A"}) + if err != nil { + t.Fatalf("Error creating kv: %v", err) + } + + _, err = kvA.Create("keyA", []byte("1")) + if err != nil { + t.Fatalf("Error creating key: %v", err) + } + + if _, err := kvA.Get("keyA"); err != nil { + t.Fatalf("Got error getting keyA from A: %v", err) + } + + kvB, err := js.CreateKeyValue(&KeyValueConfig{Bucket: "B"}) + if err != nil { + t.Fatalf("Error creating kv: %v", err) + } + + _, err = kvB.Create("keyB", []byte("1")) + if err != nil { + t.Fatalf("Error creating key: %v", err) + } + + kvC, err := js.CreateKeyValue(&KeyValueConfig{Bucket: "C", Sources: []*StreamSource{{Name: "A"}, {Name: "B"}}}) + if err != nil { + t.Fatalf("Error creating kv: %v", err) + } + + // Wait half a second to make sure it has time to populate the stream from it's sources + i := 0 + for { + status, err := kvC.Status() + if err != nil { + t.Fatalf("Error getting bucket status: %v", err) + } + if status.Values() == 2 { + break + } else { + i++ + if i > 3 { + t.Fatalf("Error sourcing bucket does not contain the expected number of values") + } + } + time.Sleep(20 * time.Millisecond) + } + + if _, err := kvC.Get("keyA"); err != nil { + t.Fatalf("Got error getting keyA from C: %v", err) + } + + if _, err := kvC.Get("keyB"); err != nil { + t.Fatalf("Got error getting keyB from C: %v", err) + } +} diff --git a/object.go b/object.go index 86b72abdd..f6ba8fb16 100644 --- a/object.go +++ b/object.go @@ -149,6 +149,10 @@ type ObjectStoreConfig struct { Storage StorageType `json:"storage,omitempty"` Replicas int `json:"num_replicas,omitempty"` Placement *Placement `json:"placement,omitempty"` + + // Bucket-specific metadata + // NOTE: Metadata requires nats-server v2.10.0+ + Metadata map[string]string `json:"metadata,omitempty"` } type ObjectStoreStatus interface { @@ -168,6 +172,8 @@ type ObjectStoreStatus interface { Size() uint64 // BackingStore provides details about the underlying storage BackingStore() string + // Metadata is the user supplied metadata for the bucket + Metadata() map[string]string } // ObjectMetaOptions @@ -178,9 +184,10 @@ type ObjectMetaOptions struct { // ObjectMeta is high level information about an object. type ObjectMeta struct { - Name string `json:"name"` - Description string `json:"description,omitempty"` - Headers Header `json:"headers,omitempty"` + Name string `json:"name"` + Description string `json:"description,omitempty"` + Headers Header `json:"headers,omitempty"` + Metadata map[string]string `json:"metadata,omitempty"` // Optional options. Opts *ObjectMetaOptions `json:"options,omitempty"` @@ -272,6 +279,7 @@ func (js *js) CreateObjectStore(cfg *ObjectStoreConfig) (ObjectStore, error) { Discard: DiscardNew, AllowRollup: true, AllowDirect: true, + Metadata: cfg.Metadata, } // Create our stream. @@ -966,6 +974,7 @@ func (obs *obs) UpdateMeta(name string, meta *ObjectMeta) error { info.Name = meta.Name info.Description = meta.Description info.Headers = meta.Headers + info.Metadata = meta.Metadata // Prepare the meta message if err = publishMeta(info, obs.js); err != nil { @@ -1189,6 +1198,9 @@ func (s *ObjectBucketStatus) Size() uint64 { return s.nfo.State.Bytes } // BackingStore indicates what technology is used for storage of the bucket func (s *ObjectBucketStatus) BackingStore() string { return "JetStream" } +// Metadata is the metadata supplied when creating the bucket +func (s *ObjectBucketStatus) Metadata() map[string]string { return s.nfo.Config.Metadata } + // StreamInfo is the stream info retrieved to create the status func (s *ObjectBucketStatus) StreamInfo() *StreamInfo { return s.nfo } diff --git a/test/js_test.go b/test/js_test.go index ab599c4e0..83e7e3d2a 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -2069,14 +2069,46 @@ func TestJetStreamManagement(t *testing.T) { // Create the stream using our client API. var si *nats.StreamInfo + t.Run("create stream", func(t *testing.T) { - si, err := js.AddStream(&nats.StreamConfig{Name: "foo", Subjects: []string{"foo", "bar"}}) + consLimits := nats.StreamConsumerLimits{ + MaxAckPending: 100, + InactiveThreshold: 10 * time.Second, + } + cfg := &nats.StreamConfig{ + Name: "foo", + Subjects: []string{"foo", "bar", "baz"}, + Compression: nats.S2Compression, + ConsumerLimits: nats.StreamConsumerLimits{ + MaxAckPending: 100, + InactiveThreshold: 10 * time.Second, + }, + FirstSeq: 22, + Metadata: map[string]string{ + "foo": "bar", + "baz": "quux", + }, + } + + si, err := js.AddStream(cfg) if err != nil { t.Fatalf("Unexpected error: %v", err) } if si == nil || si.Config.Name != "foo" { t.Fatalf("StreamInfo is not correct %+v", si) } + if !reflect.DeepEqual(si.Config.Metadata, map[string]string{"foo": "bar", "baz": "quux"}) { + t.Fatalf("Metadata is not correct %+v", si.Config.Metadata) + } + if si.Config.Compression != nats.S2Compression { + t.Fatalf("Compression is not correct %+v", si.Config.Compression) + } + if si.Config.FirstSeq != 22 { + t.Fatalf("FirstSeq is not correct %+v", si.Config.FirstSeq) + } + if si.Config.ConsumerLimits != consLimits { + t.Fatalf("ConsumerLimits is not correct %+v", si.Config.ConsumerLimits) + } }) t.Run("stream with given name already exists", func(t *testing.T) { @@ -2161,7 +2193,14 @@ func TestJetStreamManagement(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } defer sub.Unsubscribe() - ci, err := js.AddConsumer("foo", &nats.ConsumerConfig{Durable: "dlc", AckPolicy: nats.AckExplicitPolicy}) + ci, err := js.AddConsumer("foo", &nats.ConsumerConfig{ + Durable: "dlc", + AckPolicy: nats.AckExplicitPolicy, + Metadata: map[string]string{ + "foo": "bar", + "baz": "quux", + }, + }) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -2175,6 +2214,9 @@ func TestJetStreamManagement(t *testing.T) { if ci == nil || ci.Name != "dlc" || ci.Stream != "foo" { t.Fatalf("ConsumerInfo is not correct %+v", ci) } + if !reflect.DeepEqual(ci.Config.Metadata, map[string]string{"foo": "bar", "baz": "quux"}) { + t.Fatalf("Metadata is not correct %+v", ci.Config.Metadata) + } }) t.Run("with name set", func(t *testing.T) { sub, err := nc.SubscribeSync("$JS.API.CONSUMER.CREATE.foo.dlc-1") @@ -2294,6 +2336,82 @@ func TestJetStreamManagement(t *testing.T) { } }) + t.Run("durable consumer with multiple filter subjects", func(t *testing.T) { + sub, err := nc.SubscribeSync("$JS.API.CONSUMER.CREATE.foo.dlc-5") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer sub.Unsubscribe() + ci, err := js.AddConsumer("foo", &nats.ConsumerConfig{ + Durable: "dlc-5", + AckPolicy: nats.AckExplicitPolicy, + FilterSubjects: []string{"foo", "bar"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + msg, err := sub.NextMsg(1 * time.Second) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if !strings.Contains(string(msg.Data), `"durable_name":"dlc-5"`) { + t.Fatalf("create consumer message is not correct: %q", string(msg.Data)) + } + if ci == nil || ci.Config.Durable != "dlc-5" || !reflect.DeepEqual(ci.Config.FilterSubjects, []string{"foo", "bar"}) { + t.Fatalf("ConsumerInfo is not correct %+v", ci) + } + }) + + t.Run("ephemeral consumer with multiple filter subjects", func(t *testing.T) { + sub, err := nc.SubscribeSync("$JS.API.CONSUMER.CREATE.foo") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer sub.Unsubscribe() + ci, err := js.AddConsumer("foo", &nats.ConsumerConfig{ + AckPolicy: nats.AckExplicitPolicy, + FilterSubjects: []string{"foo", "bar"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + _, err = sub.NextMsg(1 * time.Second) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if ci == nil || !reflect.DeepEqual(ci.Config.FilterSubjects, []string{"foo", "bar"}) { + t.Fatalf("ConsumerInfo is not correct %+v", ci) + } + }) + + t.Run("multiple filter subjects errors", func(t *testing.T) { + // both filter subject and filter subjects provided + _, err := js.AddConsumer("foo", &nats.ConsumerConfig{ + AckPolicy: nats.AckExplicitPolicy, + FilterSubjects: []string{"foo", "bar"}, + FilterSubject: "baz", + }) + if !errors.Is(err, nats.ErrDuplicateFilterSubjects) { + t.Fatalf("Expected: %v; got: %v", nats.ErrDuplicateFilterSubjects, err) + } + // overlapping filter subjects + _, err = js.AddConsumer("foo", &nats.ConsumerConfig{ + AckPolicy: nats.AckExplicitPolicy, + FilterSubjects: []string{"foo.*", "foo.A"}, + }) + if !errors.Is(err, nats.ErrOverlappingFilterSubjects) { + t.Fatalf("Expected: %v; got: %v", nats.ErrOverlappingFilterSubjects, err) + } + // empty filter subject in filter subjects + _, err = js.AddConsumer("foo", &nats.ConsumerConfig{ + AckPolicy: nats.AckExplicitPolicy, + FilterSubjects: []string{"foo", ""}, + }) + if !errors.Is(err, nats.ErrEmptyFilter) { + t.Fatalf("Expected: %v; got: %v", nats.ErrEmptyFilter, err) + } + }) + t.Run("with invalid consumer name", func(t *testing.T) { if _, err = js.AddConsumer("foo", &nats.ConsumerConfig{Durable: "test.durable"}); err != nats.ErrInvalidConsumerName { t.Fatalf("Expected: %v; got: %v", nats.ErrInvalidConsumerName, err) @@ -2400,7 +2518,7 @@ func TestJetStreamManagement(t *testing.T) { for info := range js.Consumers("foo") { infos = append(infos, info) } - if len(infos) != 6 || infos[0].Stream != "foo" { + if len(infos) != 8 || infos[0].Stream != "foo" { t.Fatalf("ConsumerInfo is not correct %+v", infos) } }) @@ -2412,7 +2530,7 @@ func TestJetStreamManagement(t *testing.T) { for name := range js.ConsumerNames("foo", nats.Context(ctx)) { names = append(names, name) } - if got, want := len(names), 6; got != want { + if got, want := len(names), 8; got != want { t.Fatalf("Unexpected names, got=%d, want=%d", got, want) } }) @@ -2596,6 +2714,88 @@ func TestJetStreamManagement(t *testing.T) { }) } +func TestStreamConfigMatches(t *testing.T) { + srv := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, srv) + nc, js := jsClient(t, srv) + defer nc.Close() + + cfgSource := nats.StreamConfig{ + Name: "source", + Description: "desc", + Subjects: []string{"foo.*"}, + Retention: nats.WorkQueuePolicy, + MaxConsumers: 10, + MaxMsgs: 100, + MaxBytes: 1000, + Discard: nats.DiscardNew, + DiscardNewPerSubject: true, + MaxAge: 100 * time.Second, + MaxMsgsPerSubject: 1000, + MaxMsgSize: 10000, + Storage: nats.MemoryStorage, + Replicas: 1, + NoAck: true, + Duplicates: 10 * time.Second, + Sealed: false, + DenyDelete: true, + DenyPurge: false, + AllowRollup: true, + Compression: nats.S2Compression, + FirstSeq: 5, + SubjectTransform: &nats.SubjectTransformConfig{Source: ">", Destination: "transformed.>"}, + RePublish: &nats.RePublish{ + Source: ">", + Destination: "RP.>", + HeadersOnly: true, + }, + AllowDirect: true, + ConsumerLimits: nats.StreamConsumerLimits{ + InactiveThreshold: 10 * time.Second, + MaxAckPending: 500, + }, + Metadata: map[string]string{ + "foo": "bar", + }, + } + + s, err := js.AddStream(&cfgSource) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if !reflect.DeepEqual(s.Config, cfgSource) { + t.Fatalf("StreamConfig doesn't match") + } + + cfg := nats.StreamConfig{ + Name: "mirror", + MaxConsumers: 10, + MaxMsgs: 100, + MaxBytes: 1000, + MaxAge: 100 * time.Second, + MaxMsgsPerSubject: 1000, + MaxMsgSize: 10000, + Replicas: 1, + Mirror: &nats.StreamSource{ + Name: "source", + OptStartSeq: 10, + SubjectTransforms: []nats.SubjectTransformConfig{ + {Source: ">", Destination: "transformed.>"}, + }, + }, + MirrorDirect: true, + SubjectTransform: &nats.SubjectTransformConfig{Source: ">", Destination: "transformed.>"}, + } + + s, err = js.AddStream(&cfg) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if !reflect.DeepEqual(s.Config, cfg) { + t.Fatalf("StreamConfig doesn't match") + } +} + func TestStreamLister(t *testing.T) { tests := []struct { name string @@ -5571,6 +5771,70 @@ func TestJetStreamSubscribe_RateLimit(t *testing.T) { } } +func TestJetStreamSubscribe_FilterSubjects(t *testing.T) { + tests := []struct { + name string + durable string + }{ + { + name: "ephemeral consumer", + }, + { + name: "durable consumer", + durable: "cons", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + var err error + + _, err = js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo", "bar", "baz"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + for i := 0; i < 5; i++ { + js.Publish("foo", []byte("msg")) + } + for i := 0; i < 5; i++ { + js.Publish("bar", []byte("msg")) + } + for i := 0; i < 5; i++ { + js.Publish("baz", []byte("msg")) + } + + opts := []nats.SubOpt{nats.BindStream("TEST"), nats.ConsumerFilterSubjects("foo", "baz")} + if test.durable != "" { + opts = append(opts, nats.Durable(test.durable)) + } + sub, err := js.SubscribeSync("", opts...) + if err != nil { + t.Fatalf("Unexpected error: %s", err) + } + + for i := 0; i < 10; i++ { + msg, err := sub.NextMsg(500 * time.Millisecond) + if err != nil { + t.Fatalf("Unexpected error: %s", err) + } + if msg.Subject != "foo" && msg.Subject != "baz" { + t.Fatalf("Unexpected message subject: %s", msg.Subject) + } + } + }) + } + +} + func TestJetStreamSubscribe_ConfigCantChange(t *testing.T) { s := RunBasicJetStreamServer() defer shutdownJSServerAndRemoveStorage(t, s) @@ -6100,17 +6364,6 @@ func testJetStreamMirror_Source(t *testing.T, nodes ...*jsServer) { } }) - t.Run("bind to stream with wrong subject fails", func(t *testing.T) { - _, err := js.SubscribeSync("secret", nats.BindStream("origin")) - if err == nil { - t.Fatal("Unexpected success") - } - apiErr := &nats.APIError{} - if !errors.As(err, &apiErr) || apiErr.ErrorCode != 10093 { - t.Fatalf("Expected API error 10093; got: %v", err) - } - }) - t.Run("bind to origin stream", func(t *testing.T) { // This would only avoid the stream names lookup. sub, err := js.SubscribeSync("origin", nats.BindStream("origin")) @@ -6264,103 +6517,78 @@ func testJetStreamMirror_Source(t *testing.T, nodes ...*jsServer) { } }) - // Commenting out this test until we figure out what was the intent. - // Since v2.8.0, this test would fail with a "detected cycle" error, - // I guess because "m1" already sources "origin", so creating a - // stream with both as a source is bad. - /* - t.Run("create sourced stream from origin", func(t *testing.T) { - sources := make([]*nats.StreamSource, 0) - sources = append(sources, &nats.StreamSource{Name: "origin"}) - sources = append(sources, &nats.StreamSource{Name: "m1"}) - streamName := "s2" - _, err = js.AddStream(&nats.StreamConfig{ - Name: streamName, - Sources: sources, - Storage: nats.FileStorage, - Replicas: 1, - }) - if err != nil { - t.Fatalf("Unexpected error creating stream: %v", err) - } - - msgs := make([]*nats.RawStreamMsg, 0) - - // Stored message sequences start at 1 - startSequence := 1 - expectedTotal := totalMsgs * 2 + t.Run("bind to stream with subject not in stream", func(t *testing.T) { + // The Stream does not have a subject called 'nothing' but client is still able + // to bind to the origin stream even though it cannot consume messages. + // After updating the stream with the subject this consumer will be able to + // match and receive messages. + sub, err := js.SubscribeSync("nothing", nats.BindStream("origin")) + if err != nil { + t.Fatal(err) + } + _, err = sub.NextMsg(1 * time.Second) + if !errors.Is(err, nats.ErrTimeout) { + t.Fatal("Expected timeout error") + } - GetNextMsg: - for i := startSequence; i < expectedTotal+1; i++ { - var ( - err error - seq = uint64(i) - msg *nats.RawStreamMsg - timeout = time.Now().Add(5 * time.Second) - ) + info, err := sub.ConsumerInfo() + if err != nil { + t.Fatal(err) + } + got := info.Stream + expected := "origin" + if got != expected { + t.Fatalf("Expected %v, got %v", expected, got) + } - Retry: - for time.Now().Before(timeout) { - msg, err = js.GetMsg(streamName, seq) - if err != nil { - time.Sleep(100 * time.Millisecond) - continue Retry - } - msgs = append(msgs, msg) - continue GetNextMsg - } - if err != nil { - t.Fatalf("Unexpected error fetching seq=%v: %v", seq, err) - } - } + got = info.Config.FilterSubject + expected = "nothing" + if got != expected { + t.Fatalf("Expected %v, got %v", expected, got) + } - got := len(msgs) - if got < expectedTotal { - t.Errorf("Expected %v, got: %v", expectedTotal, got) - } + t.Run("can consume after stream update", func(t *testing.T) { + _, err = js.UpdateStream(&nats.StreamConfig{ + Name: "origin", + Placement: &nats.Placement{ + Tags: []string{"NODE_0"}, + }, + Storage: nats.MemoryStorage, + Replicas: 1, + Subjects: []string{"origin", "nothing"}, + }) + js.Publish("nothing", []byte("hello world")) - si, err := js.StreamInfo(streamName) + msg, err := sub.NextMsg(1 * time.Second) if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - got = int(si.State.Msgs) - if got != expectedTotal { - t.Errorf("Expected %v, got: %v", expectedTotal, got) + t.Error(err) } - - got = len(si.Sources) - expected := 2 + got = msg.Subject + expected = "nothing" if got != expected { - t.Errorf("Expected %v, got: %v", expected, got) + t.Fatalf("Expected %v, got %v", expected, got) } - t.Run("consume from sourced stream", func(t *testing.T) { - sub, err := js.SubscribeSync("origin", nats.BindStream(streamName)) - if err != nil { - t.Fatal(err) - } + }) + }) - mmsgs := make([]*nats.Msg, 0) - for i := 0; i < totalMsgs; i++ { - msg, err := sub.NextMsg(2 * time.Second) - if err != nil { - t.Error(err) - } - meta, err := msg.Metadata() - if err != nil { - t.Error(err) - } - if meta.Stream != streamName { - t.Errorf("Expected m1, got: %v", meta.Stream) - } - mmsgs = append(mmsgs, msg) - } - if len(mmsgs) != totalMsgs { - t.Errorf("Expected to consume %v msgs, got: %v", totalMsgs, len(mmsgs)) - } - }) + t.Run("create sourced stream with a cycle", func(t *testing.T) { + // Since v2.8.0, this test would fail with a "detected cycle" error. + sources := make([]*nats.StreamSource, 0) + sources = append(sources, &nats.StreamSource{Name: "origin"}) + sources = append(sources, &nats.StreamSource{Name: "m1"}) + streamName := "s2" + _, err = js.AddStream(&nats.StreamConfig{ + Name: streamName, + Sources: sources, + Storage: nats.FileStorage, + Replicas: 1, }) - */ + var aerr *nats.APIError + if ok := errors.As(err, &aerr); !ok || aerr.ErrorCode != nats.JSStreamInvalidConfig { + t.Fatalf("Expected nats.APIError, got %v", err) + } + }) } func TestJetStream_ClusterMultipleSubscribe(t *testing.T) { diff --git a/test/object_test.go b/test/object_test.go index 93e345eca..28b109028 100644 --- a/test/object_test.go +++ b/test/object_test.go @@ -401,27 +401,45 @@ func TestObjectMetadata(t *testing.T) { nc, js := jsClient(t, s) defer nc.Close() - obs, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "META-TEST"}) + bucketMetadata := map[string]string{"foo": "bar", "baz": "boo"} + obs, err := js.CreateObjectStore(&nats.ObjectStoreConfig{ + Bucket: "META-TEST", + Metadata: bucketMetadata, + }) + expectOk(t, err) + status, err := obs.Status() expectOk(t, err) + if !reflect.DeepEqual(status.Metadata(), bucketMetadata) { + t.Fatalf("invalid bucket metadata: %+v", status.Metadata()) + } // Simple with no Meta. _, err = obs.PutString("A", "AAA") expectOk(t, err) - _, err = obs.PutString("C", "CCC") + buf := bytes.NewBufferString("CCC") + objectMetadata := map[string]string{"name": "C", "description": "descC"} + info, err := obs.Put(&nats.ObjectMeta{Name: "C", Metadata: objectMetadata}, buf) expectOk(t, err) + if !reflect.DeepEqual(info.Metadata, objectMetadata) { + t.Fatalf("invalid object metadata: %+v", info.Metadata) + } meta := &nats.ObjectMeta{Name: "A"} meta.Description = "descA" meta.Headers = make(nats.Header) meta.Headers.Set("color", "blue") + objectMetadata["description"] = "updated desc" + objectMetadata["version"] = "0.1" + meta.Metadata = objectMetadata // simple update that does not change the name, just adds data err = obs.UpdateMeta("A", meta) expectOk(t, err) - info, err := obs.GetInfo("A") + info, err = obs.GetInfo("A") expectOk(t, err) - if info.Name != "A" || info.Description != "descA" || info.Headers == nil || info.Headers.Get("color") != "blue" { + if info.Name != "A" || info.Description != "descA" || info.Headers == nil || info.Headers.Get("color") != "blue" || + !reflect.DeepEqual(info.Metadata, objectMetadata) { t.Fatalf("Update failed: %+v", info) } @@ -430,6 +448,7 @@ func TestObjectMetadata(t *testing.T) { meta.Description = "descB" meta.Headers = make(nats.Header) meta.Headers.Set("color", "red") + meta.Metadata = nil err = obs.UpdateMeta("A", meta) expectOk(t, err) @@ -441,7 +460,7 @@ func TestObjectMetadata(t *testing.T) { info, err = obs.GetInfo("B") expectOk(t, err) - if info.Name != "B" || info.Description != "descB" || info.Headers == nil || info.Headers.Get("color") != "red" { + if info.Name != "B" || info.Description != "descB" || info.Headers == nil || info.Headers.Get("color") != "red" || info.Metadata != nil { t.Fatalf("Update failed: %+v", info) }