diff --git a/jetstream/object.go b/jetstream/object.go index 8b2b7097b..a0eecff33 100644 --- a/jetstream/object.go +++ b/jetstream/object.go @@ -1308,7 +1308,8 @@ func (obs *obs) Watch(ctx context.Context, opts ...WatchOpt) (ObjectWatcher, err } // Used ordered consumer to deliver results. - subOpts := []nats.SubOpt{nats.OrderedConsumer()} + streamName := fmt.Sprintf(objNameTmpl, obs.name) + subOpts := []nats.SubOpt{nats.OrderedConsumer(), nats.BindStream(streamName)} if !o.includeHistory { subOpts = append(subOpts, nats.DeliverLastPerSubject()) } diff --git a/jetstream/test/object_test.go b/jetstream/test/object_test.go index 70e2b7096..8f421c51f 100644 --- a/jetstream/test/object_test.go +++ b/jetstream/test/object_test.go @@ -1197,3 +1197,72 @@ func TestObjectStoreCompression(t *testing.T) { t.Fatalf("Expected stream to be compressed with S2") } } + +func TestObjectStoreMirror(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + bucketName := "test-bucket" + + ctx := context.Background() + obs, err := js.CreateObjectStore(ctx, jetstream.ObjectStoreConfig{Bucket: bucketName, Description: "testing"}) + expectOk(t, err) + + mirrorBucketName := "mirror-test-bucket" + + _, err = js.CreateStream(ctx, jetstream.StreamConfig{ + Name: fmt.Sprintf("OBJ_%s", mirrorBucketName), + Mirror: &jetstream.StreamSource{ + Name: fmt.Sprintf("OBJ_%s", bucketName), + SubjectTransforms: []jetstream.SubjectTransformConfig{ + { + Source: fmt.Sprintf("$O.%s.>", bucketName), + Destination: fmt.Sprintf("$O.%s.>", mirrorBucketName), + }, + }, + }, + AllowRollup: true, // meta messages are always rollups + }) + if err != nil { + t.Fatalf("Error creating object store bucket mirror: %v", err) + } + + _, err = obs.PutString(ctx, "A", "abc") + expectOk(t, err) + + mirrorObs, err := js.ObjectStore(ctx, mirrorBucketName) + expectOk(t, err) + + // Make sure we sync. + checkFor(t, 2*time.Second, 15*time.Millisecond, func() error { + mirrorValue, err := mirrorObs.GetString(ctx, "A") + if err != nil { + return err + } + if mirrorValue != "abc" { + t.Fatalf("Expected mirrored object store value to be the same as original") + } + return nil + }) + + watcher, err := mirrorObs.Watch(ctx) + if err != nil { + t.Fatalf("Error creating watcher: %v", err) + } + defer watcher.Stop() + + // expect to get one value and nil + for { + select { + case info := <-watcher.Updates(): + if info == nil { + return + } + case <-time.After(2 * time.Second): + t.Fatalf("Expected to receive an update") + } + } +} diff --git a/object.go b/object.go index 4a965adfc..75ceaa8e9 100644 --- a/object.go +++ b/object.go @@ -1115,7 +1115,8 @@ func (obs *obs) Watch(opts ...WatchOpt) (ObjectWatcher, error) { } // Used ordered consumer to deliver results. - subOpts := []SubOpt{OrderedConsumer()} + streamName := fmt.Sprintf(objNameTmpl, obs.name) + subOpts := []SubOpt{OrderedConsumer(), BindStream(streamName)} if !o.includeHistory { subOpts = append(subOpts, DeliverLastPerSubject()) } diff --git a/test/object_test.go b/test/object_test.go index a386ec8ce..f6ecb57a2 100644 --- a/test/object_test.go +++ b/test/object_test.go @@ -1163,4 +1163,22 @@ func TestObjectStoreMirror(t *testing.T) { } return nil }) + + watcher, err := mirrorObs.Watch() + if err != nil { + t.Fatalf("Error creating watcher: %v", err) + } + defer watcher.Stop() + + // expect to get one value and nil + for { + select { + case info := <-watcher.Updates(): + if info == nil { + return + } + case <-time.After(2 * time.Second): + t.Fatalf("Expected to receive an update") + } + } }