Skip to content

Commit

Permalink
Merge pull request #1112 from nats-io/kv-mirrors
Browse files Browse the repository at this point in the history
Added support for KV mirrors and sources.
  • Loading branch information
derekcollison authored Oct 20, 2022
2 parents 888a91d + 1d9796f commit ffbe2f9
Show file tree
Hide file tree
Showing 4 changed files with 229 additions and 17 deletions.
3 changes: 3 additions & 0 deletions js.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ const (
// jsDomainT is used to create JetStream API prefix by specifying only Domain
jsDomainT = "$JS.%s.API."

// jsExtDomainT is used to create a StreamSource External APIPrefix
jsExtDomainT = "$JS.%s.API"

// apiAccountInfo is for obtaining general information about JetStream.
apiAccountInfo = "INFO"

Expand Down
65 changes: 58 additions & 7 deletions jsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,13 +154,44 @@ type StreamSource struct {
OptStartTime *time.Time `json:"opt_start_time,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
External *ExternalStream `json:"external,omitempty"`
Domain string `json:"-"`
}

// ExternalStream allows you to qualify access to a stream source in another
// account.
type ExternalStream struct {
APIPrefix string `json:"api"`
DeliverPrefix string `json:"deliver"`
DeliverPrefix string `json:"deliver,omitempty"`
}

// Helper for copying when we do not want to change user's version.
func (ss *StreamSource) copy() *StreamSource {
nss := *ss
// Check pointers
if ss.OptStartTime != nil {
t := *ss.OptStartTime
nss.OptStartTime = &t
}
if ss.External != nil {
ext := *ss.External
nss.External = &ext
}
return &nss
}

// If we have a Domain, convert to the appropriate ext.APIPrefix.
// This will change the stream source, so should be a copy passed in.
func (ss *StreamSource) convertDomain() error {
if ss.Domain == _EMPTY_ {
return nil
}
if ss.External != nil {
// These should be mutually exclusive.
// TODO(dlc) - Make generic?
return errors.New("nats: domain and external are both set")
}
ss.External = &ExternalStream{APIPrefix: fmt.Sprintf(jsExtDomainT, ss.Domain)}
return nil
}

// apiResponse is a standard response from the JetStream JSON API
Expand Down Expand Up @@ -689,7 +720,31 @@ func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) {
defer cancel()
}

req, err := json.Marshal(cfg)
// In case we need to change anything, copy so we do not change the caller's version.
ncfg := *cfg

// If we have a mirror and an external domain, convert to ext.APIPrefix.
if cfg.Mirror != nil && cfg.Mirror.Domain != _EMPTY_ {
// Copy so we do not change the caller's version.
ncfg.Mirror = ncfg.Mirror.copy()
if err := ncfg.Mirror.convertDomain(); err != nil {
return nil, err
}
}
// Check sources for the same.
if len(ncfg.Sources) > 0 {
ncfg.Sources = append([]*StreamSource(nil), ncfg.Sources...)
for i, ss := range ncfg.Sources {
if ss.Domain != _EMPTY_ {
ncfg.Sources[i] = ss.copy()
if err := ncfg.Sources[i].convertDomain(); err != nil {
return nil, err
}
}
}
}

req, err := json.Marshal(&ncfg)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -991,17 +1046,13 @@ func (js *js) getMsg(name string, mreq *apiMsgGetRequest, opts ...JSOpt) (*RawSt
}

var apiSubj string

doDirectGetLastBySubject := o.directGet && mreq.LastFor != _EMPTY_

if doDirectGetLastBySubject {
if o.directGet && mreq.LastFor != _EMPTY_ {
apiSubj = apiDirectMsgGetLastBySubjectT
dsSubj := js.apiSubj(fmt.Sprintf(apiSubj, name, mreq.LastFor))
r, err := js.apiRequestWithContext(o.ctx, dsSubj, nil)
if err != nil {
return nil, err
}

return convertDirectGetMsgResponseToMsg(name, r)
}

Expand Down
64 changes: 54 additions & 10 deletions kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,8 @@ type KeyValueConfig struct {
Replicas int
Placement *Placement
RePublish *RePublish
Mirror *StreamSource
Sources []*StreamSource
}

// Used to watch all keys.
Expand Down Expand Up @@ -298,10 +300,12 @@ var (
)

const (
kvBucketNameTmpl = "KV_%s"
kvSubjectsTmpl = "$KV.%s.>"
kvSubjectsPreTmpl = "$KV.%s."
kvNoPending = "0"
kvBucketNamePre = "KV_"
kvBucketNameTmpl = "KV_%s"
kvSubjectsTmpl = "$KV.%s.>"
kvSubjectsPreTmpl = "$KV.%s."
kvSubjectsPreDomainTmpl = "%s.$KV.%s."
kvNoPending = "0"
)

// Regex for valid keys and buckets.
Expand Down Expand Up @@ -386,7 +390,6 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) {
scfg := &StreamConfig{
Name: fmt.Sprintf(kvBucketNameTmpl, cfg.Bucket),
Description: cfg.Description,
Subjects: []string{fmt.Sprintf(kvSubjectsTmpl, cfg.Bucket)},
MaxMsgsPerSubject: history,
MaxBytes: maxBytes,
MaxAge: cfg.TTL,
Expand All @@ -402,6 +405,26 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) {
AllowDirect: true,
RePublish: cfg.RePublish,
}
if cfg.Mirror != nil {
// Copy in case we need to make changes so we do not change caller's version.
m := cfg.Mirror.copy()
if !strings.HasPrefix(m.Name, kvBucketNamePre) {
m.Name = fmt.Sprintf(kvBucketNameTmpl, m.Name)
}
scfg.Mirror = m
scfg.MirrorDirect = true
} else if len(cfg.Sources) > 0 {
// For now we do not allow direct subjects for sources. If that is desired a user could use stream API directly.
for _, ss := range cfg.Sources {
if !strings.HasPrefix(ss.Name, kvBucketNamePre) {
ss = ss.copy()
ss.Name = fmt.Sprintf(kvBucketNameTmpl, ss.Name)
}
scfg.Sources = append(scfg.Sources, ss)
}
} else {
scfg.Subjects = []string{fmt.Sprintf(kvSubjectsTmpl, cfg.Bucket)}
}

// If we are at server version 2.7.2 or above use DiscardNew. We can not use DiscardNew for 2.7.1 or below.
if js.nc.serverMinVersion(2, 7, 2) {
Expand Down Expand Up @@ -445,6 +468,7 @@ type kvs struct {
name string
stream string
pre string
putPre string
js *js
// If true, it means that APIPrefix/Domain was set in the context
// and we need to add something to some of our high level protocols
Expand Down Expand Up @@ -520,9 +544,9 @@ func (kv *kvs) get(key string, revision uint64) (KeyValueEntry, error) {
var _opts [1]JSOpt
opts := _opts[:0]
if kv.useDirect {
_opts[0] = DirectGet()
opts = _opts[:1]
opts = append(opts, DirectGet())
}

if revision == kvLatestRevision {
m, err = kv.js.GetLastMsg(kv.stream, b.String(), opts...)
} else {
Expand Down Expand Up @@ -573,7 +597,11 @@ func (kv *kvs) Put(key string, value []byte) (revision uint64, err error) {
if kv.useJSPfx {
b.WriteString(kv.js.opts.pre)
}
b.WriteString(kv.pre)
if kv.putPre != _EMPTY_ {
b.WriteString(kv.putPre)
} else {
b.WriteString(kv.pre)
}
b.WriteString(key)

pa, err := kv.js.Publish(b.String(), value)
Expand Down Expand Up @@ -1012,8 +1040,9 @@ func (js *js) KeyValueStores() <-chan KeyValueStatus {
}

func mapStreamToKVS(js *js, info *StreamInfo) *kvs {
bucket := strings.TrimPrefix(info.Config.Name, "KV_")
return &kvs{
bucket := strings.TrimPrefix(info.Config.Name, kvBucketNamePre)

kv := &kvs{
name: bucket,
stream: info.Config.Name,
pre: fmt.Sprintf(kvSubjectsPreTmpl, bucket),
Expand All @@ -1022,4 +1051,19 @@ func mapStreamToKVS(js *js, info *StreamInfo) *kvs {
useJSPfx: js.opts.pre != defaultAPIPrefix,
useDirect: info.Config.AllowDirect,
}

// If we are mirroring, we will have mirror direct on, so just use the mirror name
// and override use
if m := info.Config.Mirror; m != nil {
bucket := strings.TrimPrefix(m.Name, kvBucketNamePre)
if m.External != nil && m.External.APIPrefix != _EMPTY_ {
kv.useJSPfx = false
kv.pre = fmt.Sprintf(kvSubjectsPreTmpl, bucket)
kv.putPre = fmt.Sprintf(kvSubjectsPreDomainTmpl, m.External.APIPrefix, bucket)
} else {
kv.putPre = fmt.Sprintf(kvSubjectsPreTmpl, bucket)
}
}

return kv
}
114 changes: 114 additions & 0 deletions test/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -863,3 +863,117 @@ func TestListKeyValueStores(t *testing.T) {
})
}
}

func TestKeyValueMirrorCrossDomains(t *testing.T) {
conf := createConfFile(t, []byte(`
server_name: HUB
listen: 127.0.0.1:-1
jetstream: { domain: HUB }
leafnodes { listen: 127.0.0.1:7422 }
}`))
defer os.Remove(conf)
s, _ := RunServerWithConfig(conf)
defer shutdownJSServerAndRemoveStorage(t, s)

lconf := createConfFile(t, []byte(`
server_name: LEAF
listen: 127.0.0.1:-1
jetstream: { domain:LEAF }
leafnodes {
remotes = [ { url: "leaf://127.0.0.1" } ]
}
}`))
defer os.Remove(lconf)
ln, _ := RunServerWithConfig(lconf)
defer shutdownJSServerAndRemoveStorage(t, ln)

// Create main KV on HUB
nc, js := jsClient(t, s)
defer nc.Close()

kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "TEST"})
expectOk(t, err)

_, err = kv.PutString("name", "derek")
expectOk(t, err)
_, err = kv.PutString("age", "22")
expectOk(t, err)

lnc, ljs := jsClient(t, ln)
defer lnc.Close()

// Capture cfg so we can make sure it does not change.
// NOTE: We use different name to test all possibilities, etc, but in practice for truly nomadic applications
// this should be named the same, e.g. TEST.
cfg := &nats.KeyValueConfig{
Bucket: "MIRROR",
Mirror: &nats.StreamSource{
Name: "TEST",
Domain: "HUB",
},
}
ccfg := *cfg

_, err = ljs.CreateKeyValue(cfg)
expectOk(t, err)

if !reflect.DeepEqual(cfg, &ccfg) {
t.Fatalf("Did not expect config to be altered: %+v vs %+v", cfg, ccfg)
}

si, err := ljs.StreamInfo("KV_MIRROR")
expectOk(t, err)

// Make sure mirror direct set.
if !si.Config.MirrorDirect {
t.Fatalf("Expected mirror direct to be set")
}

// Make sure we sync.
checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
si, err := ljs.StreamInfo("KV_MIRROR")
expectOk(t, err)
if si.State.Msgs == 2 {
return nil
}
return fmt.Errorf("Did not get synched messages: %d", si.State.Msgs)
})

// Bind locally from leafnode and make sure both get and put work.
mkv, err := ljs.KeyValue("MIRROR")
expectOk(t, err)

_, err = mkv.PutString("name", "rip")
expectOk(t, err)

e, err := mkv.Get("name")
expectOk(t, err)
if string(e.Value()) != "rip" {
t.Fatalf("Got wrong value: %q vs %q", e.Value(), "rip")
}

// Bind through leafnode connection but to origin KV.
rjs, err := lnc.JetStream(nats.Domain("HUB"))
expectOk(t, err)

rkv, err := rjs.KeyValue("TEST")
expectOk(t, err)

_, err = rkv.PutString("name", "ivan")
expectOk(t, err)

e, err = rkv.Get("name")
expectOk(t, err)
if string(e.Value()) != "ivan" {
t.Fatalf("Got wrong value: %q vs %q", e.Value(), "ivan")
}

// Shutdown cluster and test get still work.
shutdownJSServerAndRemoveStorage(t, s)

e, err = mkv.Get("name")
expectOk(t, err)
if string(e.Value()) != "ivan" {
t.Fatalf("Got wrong value: %q vs %q", e.Value(), "ivan")
}
}

0 comments on commit ffbe2f9

Please sign in to comment.