Skip to content

Commit

Permalink
fix(bulk): enable running bulk loader with only gql schema
Browse files Browse the repository at this point in the history
If only the GraphQL schema is provided and the DQL schema is missing,
the bulk loader used to fail. This PR adds the ability to
auto-generate the DQL schema from the GraphQL schema and use
it in the bulk loader.

This PR also adds support for running bulk loader in the
dgraphtest package.
  • Loading branch information
mangalaman93 committed Jul 12, 2023
1 parent ed7af9c commit 96adccf
Show file tree
Hide file tree
Showing 9 changed files with 257 additions and 49 deletions.
53 changes: 44 additions & 9 deletions dgraph/cmd/bulk/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/dgraph-io/dgraph/chunker"
"github.com/dgraph-io/dgraph/ee/enc"
"github.com/dgraph-io/dgraph/filestore"
gqlSchema "github.com/dgraph-io/dgraph/graphql/schema"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/schema"
"github.com/dgraph-io/dgraph/x"
Expand Down Expand Up @@ -195,6 +196,10 @@ func (ld *loader) leaseNamespaces() {
}

func readSchema(opt *options) *schema.ParsedSchema {
if opt.SchemaFile == "" {
return genDQLSchema(opt)
}

f, err := filestore.Open(opt.SchemaFile)
x.Check(err)
defer func() {
Expand Down Expand Up @@ -222,6 +227,32 @@ func readSchema(opt *options) *schema.ParsedSchema {
return result
}

// genDQLSchema derives a DQL schema when no DQL schema file was supplied:
// it reads the GraphQL schema file, validates it per namespace, and merges
// the generated DQL predicates and types into a single ParsedSchema.
func genDQLSchema(opt *options) *schema.ParsedSchema {
	schemasByNs := parseGqlSchema(string(readGqlSchema(opt)))

	var merged schema.ParsedSchema
	for ns, gqlSch := range schemasByNs {
		// NOTE(review): when a target namespace is set (not MaxUint64), every
		// schema is forced into that namespace — presumably matching the
		// loader's namespace-override behavior; confirm against caller.
		if opt.Namespace != math.MaxUint64 {
			ns = opt.Namespace
		}

		handler, err := gqlSchema.NewHandler(gqlSch, false)
		x.Check(err)

		// Parse the GraphQL schema purely for validation; the result is discarded.
		_, err = gqlSchema.FromString(handler.GQLSchema(), ns)
		x.Check(err)

		parsed, err := schema.ParseWithNamespace(handler.DGSchema(), ns)
		x.Check(err)

		merged.Preds = append(merged.Preds, parsed.Preds...)
		merged.Types = append(merged.Types, parsed.Types...)
	}

	return &merged
}

func (ld *loader) mapStage() {
ld.prog.setPhase(mapPhase)
var db *badger.DB
Expand Down Expand Up @@ -332,32 +363,35 @@ func parseGqlSchema(s string) map[uint64]string {
return schemaMap
}

func (ld *loader) processGqlSchema(loadType chunker.InputFormat) {
if ld.opt.GqlSchemaFile == "" {
return
}

f, err := filestore.Open(ld.opt.GqlSchemaFile)
func readGqlSchema(opt *options) []byte {
f, err := filestore.Open(opt.GqlSchemaFile)
x.Check(err)
defer func() {
if err := f.Close(); err != nil {
glog.Warningf("error while closing fd: %v", err)
}
}()

key := ld.opt.EncryptionKey
if !ld.opt.Encrypted {
key := opt.EncryptionKey
if !opt.Encrypted {
key = nil
}
r, err := enc.GetReader(key, f)
x.Check(err)
if filepath.Ext(ld.opt.GqlSchemaFile) == ".gz" {
if filepath.Ext(opt.GqlSchemaFile) == ".gz" {
r, err = gzip.NewReader(r)
x.Check(err)
}

buf, err := io.ReadAll(r)
x.Check(err)
return buf
}

func (ld *loader) processGqlSchema(loadType chunker.InputFormat) {
if ld.opt.GqlSchemaFile == "" {
return
}

rdfSchema := `_:gqlschema <dgraph.type> "dgraph.graphql" <%#x> .
_:gqlschema <dgraph.graphql.xid> "dgraph.graphql.schema" <%#x> .
Expand Down Expand Up @@ -388,6 +422,7 @@ func (ld *loader) processGqlSchema(loadType chunker.InputFormat) {
ld.readerChunkCh <- gqlBuf
}

buf := readGqlSchema(ld.opt)
schemas := parseGqlSchema(string(buf))
if ld.opt.Namespace == math.MaxUint64 {
// Preserve the namespace.
Expand Down
16 changes: 10 additions & 6 deletions dgraph/cmd/bulk/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,12 +215,16 @@ func run() {
fmt.Printf("Encrypted input: %v; Encrypted output: %v\n", opt.Encrypted, opt.EncryptedOut)

if opt.SchemaFile == "" {
fmt.Fprint(os.Stderr, "Schema file must be specified.\n")
os.Exit(1)
}
if !filestore.Exists(opt.SchemaFile) {
fmt.Fprintf(os.Stderr, "Schema path(%v) does not exist.\n", opt.SchemaFile)
os.Exit(1)
// if only graphql schema is provided, we can generate DQL schema from it.
if opt.GqlSchemaFile == "" {
fmt.Fprint(os.Stderr, "Schema file must be specified.\n")
os.Exit(1)
}
} else {
if !filestore.Exists(opt.SchemaFile) {
fmt.Fprintf(os.Stderr, "Schema path(%v) does not exist.\n", opt.SchemaFile)
os.Exit(1)
}
}
if opt.DataFiles == "" {
fmt.Fprint(os.Stderr, "RDF or JSON file(s) location must be specified.\n")
Expand Down
8 changes: 8 additions & 0 deletions dgraphtest/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ type ClusterConfig struct {
uidLease int
// exposed port offset for grpc/http port for both alpha/zero
portOffset int
bulkOutDir string
}

func NewClusterConfig() ClusterConfig {
Expand Down Expand Up @@ -163,3 +164,10 @@ func (cc ClusterConfig) WithExposedPortOffset(offset uint64) ClusterConfig {
cc.portOffset = int(offset)
return cc
}

// WithBulkLoadOutDir sets the out dir for the bulk loader. This ensures
// that the same p directory is used while setting up alphas. Returns a
// copy of the config (value receiver), leaving the original unchanged.
func (cc ClusterConfig) WithBulkLoadOutDir(dir string) ClusterConfig {
	cc.bulkOutDir = dir
	return cc
}
38 changes: 15 additions & 23 deletions dgraphtest/dgraph.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
)

const (
binaryName = "dgraph_%v"
binaryNameFmt = "dgraph_%v"
zeroNameFmt = "%v_zero%d"
zeroAliasNameFmt = "zero%d"
alphaNameFmt = "%v_alpha%d"
Expand All @@ -46,6 +46,7 @@ const (

alphaWorkingDir = "/data/alpha"
zeroWorkingDir = "/data/zero"
DefaultAlphaPDir = "/data/alpha/p"
DefaultBackupDir = "/data/backups"
DefaultExportDir = "/data/exports"

Expand Down Expand Up @@ -275,6 +276,19 @@ func (a *alpha) mounts(c *LocalCluster) ([]mount.Mount, error) {
})
}

pDir := filepath.Join(c.conf.bulkOutDir, strconv.Itoa(a.id/c.conf.replicas), "p")
if err := os.MkdirAll(pDir, os.ModePerm); err != nil {
return nil, errors.Wrap(err, "erorr creating bulk dir")
}
if c.conf.bulkOutDir != "" {
mounts = append(mounts, mount.Mount{
Type: mount.TypeBind,
Source: pDir,
Target: DefaultAlphaPDir,
ReadOnly: false,
})
}

for dir, vol := range c.conf.volumes {
mounts = append(mounts, mount.Mount{
Type: mount.TypeVolume,
Expand Down Expand Up @@ -333,28 +347,6 @@ func publicPort(dcli *docker.Client, dc dnode, privatePort string) (string, erro
}

func mountBinary(c *LocalCluster) (mount.Mount, error) {
if c.conf.version == localVersion {
return mount.Mount{
Type: mount.TypeBind,
Source: filepath.Join(os.Getenv("GOPATH"), "bin"),
Target: "/gobin",
ReadOnly: true,
}, nil
}

isFileExist, err := fileExists(filepath.Join(c.tempBinDir, "dgraph"))
if err != nil {
return mount.Mount{}, err
}
if isFileExist {
return mount.Mount{
Type: mount.TypeBind,
Source: c.tempBinDir,
Target: "/gobin",
ReadOnly: true,
}, nil
}

if err := c.setupBinary(); err != nil {
return mount.Mount{}, err
}
Expand Down
18 changes: 14 additions & 4 deletions dgraphtest/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,12 @@ func (c *LocalCluster) dgraphImage() string {
}

func (c *LocalCluster) setupBinary() error {
isFileThere, err := fileExists(filepath.Join(binDir, fmt.Sprintf(binaryName, c.conf.version)))
if c.conf.version == localVersion {
fromDir := filepath.Join(os.Getenv("GOPATH"), "bin")
return copyBinary(fromDir, c.tempBinDir, c.conf.version)
}

isFileThere, err := fileExists(filepath.Join(binDir, fmt.Sprintf(binaryNameFmt, c.conf.version)))
if err != nil {
return err
}
Expand Down Expand Up @@ -138,15 +143,20 @@ func buildDgraphBinary(dir, binaryDir, version string) error {
return errors.Wrapf(err, "error while building dgraph binary\noutput:%v", string(out))
}
if err := copy(filepath.Join(dir, "dgraph", "dgraph"),
filepath.Join(binaryDir, fmt.Sprintf(binaryName, version))); err != nil {
filepath.Join(binaryDir, fmt.Sprintf(binaryNameFmt, version))); err != nil {
return errors.Wrap(err, "error while copying binary")
}
return nil
}

func copyBinary(fromDir, toDir, version string) error {
if err := copy(filepath.Join(fromDir, fmt.Sprintf(binaryName, version)),
filepath.Join(toDir, "dgraph")); err != nil {
binaryName := "dgraph"
if version != localVersion {
binaryName = fmt.Sprintf(binaryNameFmt, version)
}
fromPath := filepath.Join(fromDir, binaryName)
toPath := filepath.Join(toDir, "dgraph")
if err := copy(fromPath, toPath); err != nil {
return errors.Wrap(err, "error while copying binary into tempBinDir")
}
return nil
Expand Down
51 changes: 48 additions & 3 deletions dgraphtest/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"os"
"os/exec"
"path/filepath"
"strconv"
"strings"

"github.com/pkg/errors"
Expand All @@ -34,7 +35,7 @@ import (
)

type LiveOpts struct {
RdfFiles []string
DataFiles []string
SchemaFiles []string
GqlSchemaFiles []string
}
Expand Down Expand Up @@ -140,7 +141,7 @@ func (c *LocalCluster) LiveLoad(opts LiveOpts) error {

args := []string{
"live",
"--files", strings.Join(opts.RdfFiles, ","),
"--files", strings.Join(opts.DataFiles, ","),
"--alpha", strings.Join(alphaURLs, ","),
"--zero", zeroURL,
}
Expand Down Expand Up @@ -227,7 +228,7 @@ func (c *LocalCluster) LiveLoadFromExport(exportDir string) error {
}

opts := LiveOpts{
RdfFiles: rdfFiles,
DataFiles: rdfFiles,
SchemaFiles: schemaFiles,
GqlSchemaFiles: gqlSchemaFiles,
}
Expand All @@ -236,3 +237,47 @@ func (c *LocalCluster) LiveLoadFromExport(exportDir string) error {
}
return nil
}

// BulkOpts holds the inputs passed to the bulk loader via BulkLoad.
type BulkOpts struct {
	DataFiles      []string // paths passed via -f (RDF/JSON data files)
	SchemaFiles    []string // paths passed via -s (DQL schema files)
	GQLSchemaFiles []string // paths passed via -g (GraphQL schema files)
}

// BulkLoad runs the dgraph bulk loader binary against the first zero of the
// cluster using the data, DQL schema and GraphQL schema files in opts. The
// generated p directories are written to c.conf.bulkOutDir, with one reduce
// shard per alpha group. It returns an error wrapping the loader's combined
// output when the command fails.
func (c *LocalCluster) BulkLoad(opts BulkOpts) error {
	zeroURL, err := c.zeros[0].zeroURL(c)
	if err != nil {
		return errors.Wrap(err, "error finding URL of first zero")
	}

	// One shard per alpha group so each group gets its own p directory.
	shards := c.conf.numAlphas / c.conf.replicas
	args := []string{"bulk",
		"--store_xids=true",
		"--zero", zeroURL,
		"--reduce_shards", strconv.Itoa(shards),
		"--map_shards", strconv.Itoa(shards),
		"--out", c.conf.bulkOutDir,
		// we had to create the dir for setting up docker, hence, replacing it here.
		"--replace_out",
	}

	if len(opts.DataFiles) > 0 {
		args = append(args, "-f", strings.Join(opts.DataFiles, ","))
	}
	if len(opts.SchemaFiles) > 0 {
		args = append(args, "-s", strings.Join(opts.SchemaFiles, ","))
	}
	if len(opts.GQLSchemaFiles) > 0 {
		args = append(args, "-g", strings.Join(opts.GQLSchemaFiles, ","))
	}

	log.Printf("[INFO] running bulk loader with args: [%v]", strings.Join(args, " "))
	cmd := exec.Command(filepath.Join(c.tempBinDir, "dgraph"), args...)
	// Early return on failure instead of `else` after a terminating branch
	// (keeps the happy path left-aligned, per Go style).
	out, err := cmd.CombinedOutput()
	if err != nil {
		return errors.Wrapf(err, "error running bulk loader: %v", string(out))
	}
	log.Printf("[INFO] ==== output for bulk loader ====")
	log.Println(string(out))
	return nil
}
8 changes: 6 additions & 2 deletions dgraphtest/local_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ func (c *LocalCluster) Start() error {
return err
}
}
return c.HealthCheck()
return c.HealthCheck(false)
}

func (c *LocalCluster) StartZero(id int) error {
Expand Down Expand Up @@ -343,7 +343,7 @@ func (c *LocalCluster) killContainer(dc dnode) error {
return nil
}

func (c *LocalCluster) HealthCheck() error {
func (c *LocalCluster) HealthCheck(zeroOnly bool) error {
log.Printf("[INFO] checking health of containers")
for i := 0; i < c.conf.numZeros; i++ {
url, err := c.zeros[i].healthURL(c)
Expand All @@ -355,6 +355,10 @@ func (c *LocalCluster) HealthCheck() error {
}
log.Printf("[INFO] container [zero-%v] passed health check", i)
}
if zeroOnly {
return nil
}

for i := 0; i < c.conf.numAlphas; i++ {
url, err := c.alphas[i].healthURL(c)
if err != nil {
Expand Down
Loading

0 comments on commit 96adccf

Please sign in to comment.