Skip to content

Commit

Permalink
fix(live): fix live loader to load with force namespace (#7445)
Browse files Browse the repository at this point in the history
Fixes the live loader for loading data using force-namespace flag.
Earlier, the schema alteration was not following the passed flag.
  • Loading branch information
NamanJain8 authored Feb 18, 2021
1 parent f35fbf8 commit e2d7bb8
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 16 deletions.
5 changes: 3 additions & 2 deletions dgraph/cmd/live/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -673,10 +673,11 @@ func run() error {
}
}()
ctx := context.Background()
if len(creds.GetString("user")) > 0 && opt.namespaceToLoad == math.MaxUint64 {
if len(creds.GetString("user")) > 0 && creds.GetUint64("namespace") == x.GalaxyNamespace &&
opt.namespaceToLoad != x.GalaxyNamespace {
// Attach the galaxy to the context to specify that the query/mutations with this context
// will be galaxy-wide.
ctx = x.AttachGalaxyOperation(ctx)
ctx = x.AttachGalaxyOperation(ctx, opt.namespaceToLoad)
// We don't support upsert predicate while loading data in multiple namespace.
if len(opt.upsertPredicate) > 0 {
return errors.Errorf("Upsert Predicate feature is not supported for loading" +
Expand Down
18 changes: 9 additions & 9 deletions edgraph/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,25 +262,25 @@ func parseSchemaFromAlterOperation(ctx context.Context, op *api.Operation) (*sch
return nil, errIndexingInProgress
}

var result *schema.ParsedSchema
var err error
namespace, err := x.ExtractNamespace(ctx)
if err != nil {
return nil, errors.Wrapf(err, "While parsing schema")
}

if x.IsGalaxyOperation(ctx) {
// Only the guardian of the galaxy can do a galaxy wide query/mutation. This operation is
// needed by live loader.
if err := AuthGuardianOfTheGalaxy(ctx); err != nil {
return nil, errors.Wrap(err, "Non guardian of galaxy user cannot bypass namespaces.")
}
// Parse the schema preserving the namespace.
result, err = schema.Parse(op.Schema)
} else {
var ns uint64
ns, err = x.ExtractNamespace(ctx)
var err error
namespace, err = strconv.ParseUint(x.GetForceNamespace(ctx), 0, 64)
if err != nil {
return nil, err
return nil, errors.Wrapf(err, "Valid force namespace not found in metadata")
}
result, err = schema.ParseWithNamespace(op.Schema, ns)
}

result, err := schema.ParseWithNamespace(op.Schema, namespace)
if err != nil {
return nil, err
}
Expand Down
27 changes: 24 additions & 3 deletions systest/multi-tenancy/basic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ func TestDeleteNamespace(t *testing.T) {
`, ns)),
CommitNow: true,
}
// TODO(Naman): This should return and error.
_, err := dg[ns].NewTxn().Mutate(context.Background(), mutation)
require.NoError(t, err)
}
Expand Down Expand Up @@ -265,7 +264,7 @@ func TestLiveLoadMulti(t *testing.T) {
require.Error(t, err)
require.Contains(t, err.Error(), "Attribute name is not indexed")

// Load data by non-galaxy user.
// live load data into namespace ns using the guardian of galaxy.
require.NoError(t, liveLoadData(t, &liveOpts{
rdfs: fmt.Sprintf(`
_:a <name> "ns chew" .
Expand All @@ -274,6 +273,26 @@ func TestLiveLoadMulti(t *testing.T) {
`, ns, 0x100),
schema: `
name: string @index(term) .
`,
creds: &testutil.LoginParams{UserID: "groot", Passwd: "password",
Namespace: x.GalaxyNamespace},
forceNs: int64(ns),
}))

resp = testutil.QueryData(t, dc1, query3)
testutil.CompareJSON(t,
`{"me": [{"name":"ns alice"}, {"name": "ns bob"},{"name":"ns chew"},
{"name": "ns dan"},{"name":"ns eon"}]}`, string(resp))

// Load data by non-galaxy user.
require.NoError(t, liveLoadData(t, &liveOpts{
rdfs: fmt.Sprintf(`
_:a <name> "ns free" .
_:b <name> "ns gary" <%#x> .
_:c <name> "ns hola" <%#x> .
`, ns, 0x100),
schema: `
name: string @index(term) .
`,
creds: &testutil.LoginParams{UserID: "groot", Passwd: "password", Namespace: ns},
forceNs: -1, // this will be ignored.
Expand All @@ -282,7 +301,9 @@ func TestLiveLoadMulti(t *testing.T) {
resp = testutil.QueryData(t, dc1, query3)
testutil.CompareJSON(t,
`{"me": [{"name":"ns alice"}, {"name": "ns bob"},{"name":"ns chew"},
{"name": "ns dan"},{"name":"ns eon"}]}`, string(resp))
{"name": "ns dan"},{"name":"ns eon"}, {"name": "ns free"},{"name":"ns gary"},
{"name": "ns hola"}]}`, string(resp))

}

func TestMain(m *testing.M) {
Expand Down
17 changes: 15 additions & 2 deletions x/x.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,12 +282,24 @@ func ExtractNamespace(ctx context.Context) (uint64, error) {
func IsGalaxyOperation(ctx context.Context) bool {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
glog.Fatal("No metadata in the context")
return false
}
ns := md.Get("galaxy-operation")
return len(ns) > 0 && (ns[0] == "true" || ns[0] == "True")
}

func GetForceNamespace(ctx context.Context) string {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return ""
}
ns := md.Get("force-namespace")
if len(ns) == 0 {
return ""
}
return ns[0]
}

func ExtractJwt(ctx context.Context) ([]string, error) {
// extract the jwt and unmarshal the jwt to get the list of groups
md, ok := metadata.FromIncomingContext(ctx)
Expand Down Expand Up @@ -451,12 +463,13 @@ func AttachNamespace(ctx context.Context, namespace uint64) context.Context {
}

// AttachGalaxyOperation specifies in the context that it will be used for doing a galaxy operation.
func AttachGalaxyOperation(ctx context.Context) context.Context {
func AttachGalaxyOperation(ctx context.Context, ns uint64) context.Context {
md, ok := metadata.FromOutgoingContext(ctx)
if !ok {
md = metadata.New(nil)
}
md.Set("galaxy-operation", "true")
md.Set("force-namespace", strconv.FormatUint(ns, 10))
return metadata.NewOutgoingContext(ctx, md)
}

Expand Down

0 comments on commit e2d7bb8

Please sign in to comment.