diff --git a/dgraph/cmd/bulk/loader.go b/dgraph/cmd/bulk/loader.go index 5dd11463c68..2feaa612b8f 100644 --- a/dgraph/cmd/bulk/loader.go +++ b/dgraph/cmd/bulk/loader.go @@ -42,6 +42,7 @@ import ( "github.com/dgraph-io/dgraph/chunker" "github.com/dgraph-io/dgraph/ee/enc" "github.com/dgraph-io/dgraph/filestore" + gqlSchema "github.com/dgraph-io/dgraph/graphql/schema" "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/schema" "github.com/dgraph-io/dgraph/x" @@ -195,6 +196,10 @@ func (ld *loader) leaseNamespaces() { } func readSchema(opt *options) *schema.ParsedSchema { + if opt.SchemaFile == "" { + return genDQLSchema(opt) + } + f, err := filestore.Open(opt.SchemaFile) x.Check(err) defer func() { @@ -222,6 +227,32 @@ func readSchema(opt *options) *schema.ParsedSchema { return result } +func genDQLSchema(opt *options) *schema.ParsedSchema { + gqlSchBytes := readGqlSchema(opt) + nsToSchemas := parseGqlSchema(string(gqlSchBytes)) + + var finalSch schema.ParsedSchema + for ns, gqlSch := range nsToSchemas { + if opt.Namespace != math.MaxUint64 { + ns = opt.Namespace + } + + h, err := gqlSchema.NewHandler(gqlSch, false) + x.Check(err) + + _, err = gqlSchema.FromString(h.GQLSchema(), ns) + x.Check(err) + + ps, err := schema.ParseWithNamespace(h.DGSchema(), ns) + x.Check(err) + + finalSch.Preds = append(finalSch.Preds, ps.Preds...) + finalSch.Types = append(finalSch.Types, ps.Types...) 
+ } + + return &finalSch +} + func (ld *loader) mapStage() { ld.prog.setPhase(mapPhase) var db *badger.DB @@ -332,12 +363,8 @@ func parseGqlSchema(s string) map[uint64]string { return schemaMap } -func (ld *loader) processGqlSchema(loadType chunker.InputFormat) { - if ld.opt.GqlSchemaFile == "" { - return - } - - f, err := filestore.Open(ld.opt.GqlSchemaFile) +func readGqlSchema(opt *options) []byte { + f, err := filestore.Open(opt.GqlSchemaFile) x.Check(err) defer func() { if err := f.Close(); err != nil { @@ -345,19 +372,26 @@ func (ld *loader) processGqlSchema(loadType chunker.InputFormat) { } }() - key := ld.opt.EncryptionKey - if !ld.opt.Encrypted { + key := opt.EncryptionKey + if !opt.Encrypted { key = nil } r, err := enc.GetReader(key, f) x.Check(err) - if filepath.Ext(ld.opt.GqlSchemaFile) == ".gz" { + if filepath.Ext(opt.GqlSchemaFile) == ".gz" { r, err = gzip.NewReader(r) x.Check(err) } buf, err := io.ReadAll(r) x.Check(err) + return buf +} + +func (ld *loader) processGqlSchema(loadType chunker.InputFormat) { + if ld.opt.GqlSchemaFile == "" { + return + } rdfSchema := `_:gqlschema "dgraph.graphql" <%#x> . _:gqlschema "dgraph.graphql.schema" <%#x> . @@ -388,6 +422,7 @@ func (ld *loader) processGqlSchema(loadType chunker.InputFormat) { ld.readerChunkCh <- gqlBuf } + buf := readGqlSchema(ld.opt) schemas := parseGqlSchema(string(buf)) if ld.opt.Namespace == math.MaxUint64 { // Preserve the namespace. 
diff --git a/dgraph/cmd/bulk/run.go b/dgraph/cmd/bulk/run.go index 8d61258da03..773329b0678 100644 --- a/dgraph/cmd/bulk/run.go +++ b/dgraph/cmd/bulk/run.go @@ -215,12 +215,16 @@ func run() { fmt.Printf("Encrypted input: %v; Encrypted output: %v\n", opt.Encrypted, opt.EncryptedOut) if opt.SchemaFile == "" { - fmt.Fprint(os.Stderr, "Schema file must be specified.\n") - os.Exit(1) - } - if !filestore.Exists(opt.SchemaFile) { - fmt.Fprintf(os.Stderr, "Schema path(%v) does not exist.\n", opt.SchemaFile) - os.Exit(1) + // if only graphql schema is provided, we can generate DQL schema from it. + if opt.GqlSchemaFile == "" { + fmt.Fprint(os.Stderr, "Schema file must be specified.\n") + os.Exit(1) + } + } else { + if !filestore.Exists(opt.SchemaFile) { + fmt.Fprintf(os.Stderr, "Schema path(%v) does not exist.\n", opt.SchemaFile) + os.Exit(1) + } } if opt.DataFiles == "" { fmt.Fprint(os.Stderr, "RDF or JSON file(s) location must be specified.\n") diff --git a/dgraphtest/config.go b/dgraphtest/config.go index 9b611b22d23..6fa5e175037 100644 --- a/dgraphtest/config.go +++ b/dgraphtest/config.go @@ -84,6 +84,7 @@ type ClusterConfig struct { uidLease int // exposed port offset for grpc/http port for both alpha/zero portOffset int + bulkOutDir string } func NewClusterConfig() ClusterConfig { @@ -163,3 +164,10 @@ func (cc ClusterConfig) WithExposedPortOffset(offset uint64) ClusterConfig { cc.portOffset = int(offset) return cc } + +// WithBulkLoadOutDir sets the out dir for the bulk loader. This ensures +// that the same p directory is used while setting up alphas. 
+func (cc ClusterConfig) WithBulkLoadOutDir(dir string) ClusterConfig { + cc.bulkOutDir = dir + return cc +} diff --git a/dgraphtest/dgraph.go b/dgraphtest/dgraph.go index da7ca05a68f..553cf5d170c 100644 --- a/dgraphtest/dgraph.go +++ b/dgraphtest/dgraph.go @@ -31,7 +31,7 @@ import ( ) const ( - binaryName = "dgraph_%v" + binaryNameFmt = "dgraph_%v" zeroNameFmt = "%v_zero%d" zeroAliasNameFmt = "zero%d" alphaNameFmt = "%v_alpha%d" @@ -46,6 +46,7 @@ const ( alphaWorkingDir = "/data/alpha" zeroWorkingDir = "/data/zero" + DefaultAlphaPDir = "/data/alpha/p" DefaultBackupDir = "/data/backups" DefaultExportDir = "/data/exports" @@ -275,6 +276,19 @@ func (a *alpha) mounts(c *LocalCluster) ([]mount.Mount, error) { }) } + if c.conf.bulkOutDir != "" { + pDir := filepath.Join(c.conf.bulkOutDir, strconv.Itoa(a.id/c.conf.replicas), "p") + if err := os.MkdirAll(pDir, os.ModePerm); err != nil { + return nil, errors.Wrap(err, "error creating bulk dir") + } + mounts = append(mounts, mount.Mount{ + Type: mount.TypeBind, + Source: pDir, + Target: DefaultAlphaPDir, + ReadOnly: false, + }) + } + for dir, vol := range c.conf.volumes { mounts = append(mounts, mount.Mount{ Type: mount.TypeVolume, @@ -333,28 +347,6 @@ func publicPort(dcli *docker.Client, dc dnode, privatePort string) (string, erro } func mountBinary(c *LocalCluster) (mount.Mount, error) { - if c.conf.version == localVersion { - return mount.Mount{ - Type: mount.TypeBind, - Source: filepath.Join(os.Getenv("GOPATH"), "bin"), - Target: "/gobin", - ReadOnly: true, - }, nil - } - - isFileExist, err := fileExists(filepath.Join(c.tempBinDir, "dgraph")) - if err != nil { - return mount.Mount{}, err - } - if isFileExist { - return mount.Mount{ - Type: mount.TypeBind, - Source: c.tempBinDir, - Target: "/gobin", - ReadOnly: true, - }, nil - } - if err := c.setupBinary(); err != nil { return mount.Mount{}, err } diff --git a/dgraphtest/image.go b/dgraphtest/image.go index 953ff94b813..bf135be3e59 100644 --- a/dgraphtest/image.go +++
b/dgraphtest/image.go @@ -32,7 +32,12 @@ func (c *LocalCluster) dgraphImage() string { } func (c *LocalCluster) setupBinary() error { - isFileThere, err := fileExists(filepath.Join(binDir, fmt.Sprintf(binaryName, c.conf.version))) + if c.conf.version == localVersion { + fromDir := filepath.Join(os.Getenv("GOPATH"), "bin") + return copyBinary(fromDir, c.tempBinDir, c.conf.version) + } + + isFileThere, err := fileExists(filepath.Join(binDir, fmt.Sprintf(binaryNameFmt, c.conf.version))) if err != nil { return err } @@ -138,15 +143,20 @@ func buildDgraphBinary(dir, binaryDir, version string) error { return errors.Wrapf(err, "error while building dgraph binary\noutput:%v", string(out)) } if err := copy(filepath.Join(dir, "dgraph", "dgraph"), - filepath.Join(binaryDir, fmt.Sprintf(binaryName, version))); err != nil { + filepath.Join(binaryDir, fmt.Sprintf(binaryNameFmt, version))); err != nil { return errors.Wrap(err, "error while copying binary") } return nil } func copyBinary(fromDir, toDir, version string) error { - if err := copy(filepath.Join(fromDir, fmt.Sprintf(binaryName, version)), - filepath.Join(toDir, "dgraph")); err != nil { + binaryName := "dgraph" + if version != localVersion { + binaryName = fmt.Sprintf(binaryNameFmt, version) + } + fromPath := filepath.Join(fromDir, binaryName) + toPath := filepath.Join(toDir, "dgraph") + if err := copy(fromPath, toPath); err != nil { return errors.Wrap(err, "error while copying binary into tempBinDir") } return nil diff --git a/dgraphtest/load.go b/dgraphtest/load.go index cf888b98701..98d222fed70 100644 --- a/dgraphtest/load.go +++ b/dgraphtest/load.go @@ -26,6 +26,7 @@ import ( "os" "os/exec" "path/filepath" + "strconv" "strings" "github.com/pkg/errors" @@ -34,7 +35,7 @@ import ( ) type LiveOpts struct { - RdfFiles []string + DataFiles []string SchemaFiles []string GqlSchemaFiles []string } @@ -140,7 +141,7 @@ func (c *LocalCluster) LiveLoad(opts LiveOpts) error { args := []string{ "live", - "--files", 
strings.Join(opts.RdfFiles, ","), + "--files", strings.Join(opts.DataFiles, ","), "--alpha", strings.Join(alphaURLs, ","), "--zero", zeroURL, } @@ -227,7 +228,7 @@ func (c *LocalCluster) LiveLoadFromExport(exportDir string) error { } opts := LiveOpts{ - RdfFiles: rdfFiles, + DataFiles: rdfFiles, SchemaFiles: schemaFiles, GqlSchemaFiles: gqlSchemaFiles, } @@ -236,3 +237,47 @@ func (c *LocalCluster) LiveLoadFromExport(exportDir string) error { } return nil } + +type BulkOpts struct { + DataFiles []string + SchemaFiles []string + GQLSchemaFiles []string +} + +func (c *LocalCluster) BulkLoad(opts BulkOpts) error { + zeroURL, err := c.zeros[0].zeroURL(c) + if err != nil { + return errors.Wrap(err, "error finding URL of first zero") + } + + shards := c.conf.numAlphas / c.conf.replicas + args := []string{"bulk", + "--store_xids=true", + "--zero", zeroURL, + "--reduce_shards", strconv.Itoa(shards), + "--map_shards", strconv.Itoa(shards), + "--out", c.conf.bulkOutDir, + // we had to create the dir for setting up docker, hence, replacing it here. + "--replace_out", + } + + if len(opts.DataFiles) > 0 { + args = append(args, "-f", strings.Join(opts.DataFiles, ",")) + } + if len(opts.SchemaFiles) > 0 { + args = append(args, "-s", strings.Join(opts.SchemaFiles, ",")) + } + if len(opts.GQLSchemaFiles) > 0 { + args = append(args, "-g", strings.Join(opts.GQLSchemaFiles, ",")) + } + + log.Printf("[INFO] running bulk loader with args: [%v]", strings.Join(args, " ")) + cmd := exec.Command(filepath.Join(c.tempBinDir, "dgraph"), args...) 
+ if out, err := cmd.CombinedOutput(); err != nil { + return errors.Wrapf(err, "error running bulk loader: %v", string(out)) + } else { + log.Printf("[INFO] ==== output for bulk loader ====") + log.Println(string(out)) + return nil + } +} diff --git a/dgraphtest/local_cluster.go b/dgraphtest/local_cluster.go index a246d5044cc..04089e053ec 100644 --- a/dgraphtest/local_cluster.go +++ b/dgraphtest/local_cluster.go @@ -262,7 +262,7 @@ func (c *LocalCluster) Start() error { return err } } - return c.HealthCheck() + return c.HealthCheck(false) } func (c *LocalCluster) StartZero(id int) error { @@ -343,7 +343,7 @@ func (c *LocalCluster) killContainer(dc dnode) error { return nil } -func (c *LocalCluster) HealthCheck() error { +func (c *LocalCluster) HealthCheck(zeroOnly bool) error { log.Printf("[INFO] checking health of containers") for i := 0; i < c.conf.numZeros; i++ { url, err := c.zeros[i].healthURL(c) @@ -355,6 +355,10 @@ func (c *LocalCluster) HealthCheck() error { } log.Printf("[INFO] container [zero-%v] passed health check", i) } + if zeroOnly { + return nil + } + for i := 0; i < c.conf.numAlphas; i++ { url, err := c.alphas[i].healthURL(c) if err != nil { diff --git a/systest/integration2/bulk_loader_test.go b/systest/integration2/bulk_loader_test.go new file mode 100644 index 00000000000..61d8a600358 --- /dev/null +++ b/systest/integration2/bulk_loader_test.go @@ -0,0 +1,110 @@ +//go:build integration2 + +/* + * Copyright 2023 Dgraph Labs, Inc. and Contributors * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "io/ioutil" + "os" + "path/filepath" + "testing" + "time" + + "github.com/dgraph-io/dgraph/dgraphtest" + "github.com/dgraph-io/dgraph/x" + "github.com/stretchr/testify/require" +) + +const ( + gqlSchema = `[{"namespace": 0, "schema": "type Message ` + + `{\nid: ID!\ncontent: String!\nauthor: String\nuniqueId: Int64! @id\ndatePosted: DateTime\n}"}]` + jsonData = ` + [ + {"Message.content": "XBNTBGBHGQ", "Message.author": "PXYNHBWGGD", ` + + `"Message.uniqueId": 7, "Message.datePosted": "2024-02-08T16:48:25Z", "dgraph.type": "Message"}, + {"Message.content": "ILEOMLXRYX", "Message.author": "BBBZKURCJH", ` + + `"Message.uniqueId": 5, "Message.datePosted": "2024-04-27T16:48:25Z", "dgraph.type": "Message"}, + {"Message.content": "RFAXPWUCUN", "Message.author": "CMZEOCORNL", ` + + `"Message.uniqueId": 0, "Message.datePosted": "2024-04-30T16:48:25Z", "dgraph.type": "Message"}, + {"Message.content": "ZKCRMYNBLT", "Message.author": "TYLORHNKJA", ` + + `"Message.uniqueId": 9, "Message.datePosted": "2024-06-21T16:48:25Z", "dgraph.type": "Message"}, + {"Message.content": "HMODLPKCHE", "Message.author": "ZNTIZEYBMV", ` + + `"Message.uniqueId": 4, "Message.datePosted": "2024-06-10T16:48:25Z", "dgraph.type": "Message"}, + {"Message.content": "FBIOEOJBZF", "Message.author": "EQXLNWFYBN", ` + + `"Message.uniqueId": 6, "Message.datePosted": "2023-10-05T16:48:25Z", "dgraph.type": "Message"}, + {"Message.content": "DVTCTXCVYI", "Message.author": "USYMVFJYXA", ` + + `"Message.uniqueId": 3, "Message.datePosted": "2023-12-29T16:48:25Z", "dgraph.type": "Message"}, + {"Message.content": "SOWTAXHTCT", "Message.author": "SAILDEMEJV", ` + + `"Message.uniqueId": 8, "Message.datePosted": "2023-08-22T16:48:25Z", "dgraph.type": "Message"}, + {"Message.content": "MLMQWMJQQW", "Message.author": "ANBSOCYLXB", ` + + `"Message.uniqueId": 1, "Message.datePosted": 
"2023-08-15T16:48:25Z", "dgraph.type": "Message"}, + {"Message.content": "CVFSBBIDCL", "Message.author": "JONAEYCCTQ", ` + + `"Message.uniqueId": 2, "Message.datePosted": "2024-04-20T16:48:25Z", "dgraph.type": "Message"} + ]` +) + +func TestBulkLoaderNoDqlSchema(t *testing.T) { + conf := dgraphtest.NewClusterConfig().WithNumAlphas(2).WithNumZeros(1). + WithACL(time.Hour).WithReplicas(1).WithBulkLoadOutDir(t.TempDir()) + c, err := dgraphtest.NewLocalCluster(conf) + require.NoError(t, err) + defer func() { c.Cleanup(t.Failed()) }() + + // start zero + require.NoError(t, c.StartZero(0)) + require.NoError(t, c.HealthCheck(true)) + + baseDir := t.TempDir() + gqlSchemaFile := filepath.Join(baseDir, "gql.schema") + require.NoError(t, ioutil.WriteFile(gqlSchemaFile, []byte(gqlSchema), os.ModePerm)) + dataFile := filepath.Join(baseDir, "data.json") + require.NoError(t, ioutil.WriteFile(dataFile, []byte(jsonData), os.ModePerm)) + + opts := dgraphtest.BulkOpts{ + DataFiles: []string{dataFile}, + GQLSchemaFiles: []string{gqlSchemaFile}, + } + require.NoError(t, c.BulkLoad(opts)) + + // start Alphas + require.NoError(t, c.Start()) + + // run some queries and ensure everything looks good + hc, err := c.HTTPClient() + require.NoError(t, err) + require.NoError(t, hc.LoginIntoNamespace(dgraphtest.DefaultUser, + dgraphtest.DefaultPassword, x.GalaxyNamespace)) + + params := dgraphtest.GraphQLParams{ + Query: `query { + getMessage(uniqueId: 3) { + content + author + datePosted + } + }`, + } + data, err := hc.RunGraphqlQuery(params, false) + require.NoError(t, err) + dgraphtest.CompareJSON(`{ + "getMessage": { + "content": "DVTCTXCVYI", + "author": "USYMVFJYXA", + "datePosted": "2023-12-29T16:48:25Z" + } + }`, string(data)) +} diff --git a/systest/incremental-restore/incremental_restore_test.go b/systest/integration2/incremental_restore_test.go similarity index 95% rename from systest/incremental-restore/incremental_restore_test.go rename to 
systest/integration2/incremental_restore_test.go index a2dfd14a4f0..ba2d186c333 100644 --- a/systest/incremental-restore/incremental_restore_test.go +++ b/systest/integration2/incremental_restore_test.go @@ -36,14 +36,14 @@ func TestIncrementalRestore(t *testing.T) { conf := dgraphtest.NewClusterConfig().WithNumAlphas(6).WithNumZeros(3).WithReplicas(3).WithACL(time.Hour) c, err := dgraphtest.NewLocalCluster(conf) require.NoError(t, err) - defer c.Cleanup(t.Failed()) + defer func() { c.Cleanup(t.Failed()) }() require.NoError(t, c.Start()) gc, cleanup, err := c.Client() require.NoError(t, err) defer cleanup() require.NoError(t, gc.LoginIntoNamespace(context.Background(), - dgraphtest.DefaultUser, dgraphtest.DefaultPassword, 0)) + dgraphtest.DefaultUser, dgraphtest.DefaultPassword, x.GalaxyNamespace)) hc, err := c.HTTPClient() require.NoError(t, err)