Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BREAKING] feat(flags): expand badger to accept all valid options #7677

Merged
merged 11 commits into from
Apr 6, 2021
29 changes: 12 additions & 17 deletions dgraph/cmd/alpha/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,16 +128,13 @@ they form a Raft group and provide synchronous replication.
grpc.EnableTracing = false

flag.String("badger", worker.BadgerDefaults, z.NewSuperFlagHelp(worker.BadgerDefaults).
Head("Badger options").
Head("Badger options (Other badger options are also supported, not shown for brevity.)").
Flag("compression",
`[none, zstd:level, snappy] Specifies the compression algorithm and
compression level (if applicable) for the postings directory."none" would disable
compression, while "zstd:1" would set zstd compression at level 1.`).
Flag("goroutines",
"The number of goroutines to use in badger.Stream.").
Flag("max-retries",
"Commits to disk will give up after these number of retries to prevent locking the "+
"worker in a failed state. Use -1 to retry infinitely.").
String())

// Cache flags.
Expand Down Expand Up @@ -212,6 +209,9 @@ they form a Raft group and provide synchronous replication.
Flag("query-timeout",
"Maximum time after which a query execution will fail. If set to"+
" 0, the timeout is infinite.").
Flag("max-retries",
"Commits to disk will give up after these number of retries to prevent locking the "+
"worker in a failed state. Use -1 to retry infinitely.").
Flag("txn-abort-after", "Abort any pending transactions older than this duration."+
" The liveness of a transaction is determined by its last mutation.").
String())
Expand Down Expand Up @@ -632,23 +632,18 @@ func run() {
postingListCacheSize := (cachePercent[0] * (totalCache << 20)) / 100
pstoreBlockCacheSize := (cachePercent[1] * (totalCache << 20)) / 100
pstoreIndexCacheSize := (cachePercent[2] * (totalCache << 20)) / 100

badger := z.NewSuperFlag(Alpha.Conf.GetString("badger")).MergeAndCheckDefault(
worker.BadgerDefaults)
ctype, clevel := x.ParseCompression(badger.GetString("compression"))
badger := x.MergeAndCheckBadgerDefaults(worker.BadgerDefaults, Alpha.Conf.GetString("badger"))

security := z.NewSuperFlag(Alpha.Conf.GetString("security")).MergeAndCheckDefault(
worker.SecurityDefaults)
conf := audit.GetAuditConf(Alpha.Conf.GetString("audit"))
opts := worker.Options{
PostingDir: Alpha.Conf.GetString("postings"),
WALDir: Alpha.Conf.GetString("wal"),
PostingDirCompression: ctype,
PostingDirCompressionLevel: clevel,
CacheMb: totalCache,
CachePercentage: cachePercentage,
PBlockCacheSize: pstoreBlockCacheSize,
PIndexCacheSize: pstoreIndexCacheSize,
PostingDir: Alpha.Conf.GetString("postings"),
WALDir: Alpha.Conf.GetString("wal"),
CacheMb: totalCache,
CachePercentage: cachePercentage,
PBlockCacheSize: pstoreBlockCacheSize,
PIndexCacheSize: pstoreIndexCacheSize,

MutationsMode: worker.AllowMutations,
AuthToken: security.GetString("token"),
Expand Down Expand Up @@ -713,7 +708,6 @@ func run() {
HmacSecret: opts.HmacSecret,
Audit: opts.Audit != nil,
Badger: badger,
MaxRetries: badger.GetInt64("max-retries"),
}
x.WorkerConfig.Parse(Alpha.Conf)

Expand All @@ -734,6 +728,7 @@ func run() {
x.Config.BlockClusterWideDrop = x.Config.Limit.GetBool("disallow-drop")
x.Config.LimitNormalizeNode = int(x.Config.Limit.GetInt64("normalize-node"))
x.Config.QueryTimeout = x.Config.Limit.GetDuration("query-timeout")
x.Config.MaxRetries = x.Config.Limit.GetInt64("max-retries")

x.Config.GraphQL = z.NewSuperFlag(Alpha.Conf.GetString("graphql")).MergeAndCheckDefault(
worker.GraphQLDefaults)
Expand Down
13 changes: 4 additions & 9 deletions dgraph/cmd/bulk/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
"google.golang.org/grpc/credentials"

"github.com/dgraph-io/badger/v3"
bo "github.com/dgraph-io/badger/v3/options"
"github.com/dgraph-io/badger/v3/y"

"github.com/dgraph-io/dgraph/chunker"
Expand Down Expand Up @@ -85,12 +84,8 @@ type options struct {
// ........... Badger options ..........
// EncryptionKey is the key used for encryption. Enterprise only feature.
EncryptionKey x.SensitiveByteSlice
// BadgerCompression is the compression algorithm to use while writing to badger.
BadgerCompression bo.CompressionType
// BadgerCompressionlevel is the compression level to use while writing to badger.
BadgerCompressionLevel int
BlockCacheSize int64
IndexCacheSize int64
// Badger options.
Badger badger.Options
}

type state struct {
Expand Down Expand Up @@ -236,8 +231,8 @@ func (ld *loader) mapStage() {
}
ld.xids = xidmap.New(xidmap.XidMapOptions{
UidAssigner: ld.zero,
DB: db,
Dir: filepath.Join(ld.opt.TmpDir, bufferDir),
DB: db,
Dir: filepath.Join(ld.opt.TmpDir, bufferDir),
})

fs := filestore.NewFileStore(ld.opt.DataFiles)
Expand Down
11 changes: 5 additions & 6 deletions dgraph/cmd/bulk/reduce.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,18 +130,17 @@ func (r *reducer) createBadgerInternal(dir string, compression bool) *badger.DB
key = nil
}

opt := badger.DefaultOptions(dir).
opt := r.state.opt.Badger.
WithDir(dir).WithValueDir(dir).
WithSyncWrites(false).
WithEncryptionKey(key).
WithBlockCacheSize(r.opt.BlockCacheSize).
WithIndexCacheSize(r.opt.IndexCacheSize)
WithEncryptionKey(key)

opt.Compression = bo.None
opt.ZSTDCompressionLevel = 0
// Overwrite badger options based on the options provided by the user.
if compression {
opt.Compression = r.state.opt.BadgerCompression
opt.ZSTDCompressionLevel = r.state.opt.BadgerCompressionLevel
opt.Compression = r.state.opt.Badger.Compression
opt.ZSTDCompressionLevel = r.state.opt.Badger.ZSTDCompressionLevel
}

db, err := badger.OpenManaged(opt)
Expand Down
19 changes: 3 additions & 16 deletions dgraph/cmd/bulk/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func init() {
"Namespace onto which to load the data. If not set, will preserve the namespace.")

flag.String("badger", BulkBadgerDefaults, z.NewSuperFlagHelp(BulkBadgerDefaults).
Head("Badger options").
Head("Badger options (Other badger options are also supported, not shown for brevity.)").
Flag("compression",
"Specifies the compression algorithm and compression level (if applicable) for the "+
`postings directory. "none" would disable compression, while "zstd:1" would set `+
Expand All @@ -142,9 +142,7 @@ func init() {
}

func run() {
badger := z.NewSuperFlag(Bulk.Conf.GetString("badger")).MergeAndCheckDefault(
BulkBadgerDefaults)
ctype, clevel := x.ParseCompression(badger.GetString("compression"))
badger := x.MergeAndCheckBadgerDefaults(BulkBadgerDefaults, Bulk.Conf.GetString("badger"))
opt := options{
DataFiles: Bulk.Conf.GetString("files"),
DataFormat: Bulk.Conf.GetString("format"),
Expand Down Expand Up @@ -172,25 +170,14 @@ func run() {
NewUids: Bulk.Conf.GetBool("new_uids"),
ClientDir: Bulk.Conf.GetString("xidmap"),
Namespace: Bulk.Conf.GetUint64("force-namespace"),

// Badger options
BadgerCompression: ctype,
BadgerCompressionLevel: clevel,
Badger: badger,
}

x.PrintVersion()
if opt.Version {
os.Exit(0)
}

totalCache := int64(badger.GetUint64("cache-mb"))
x.AssertTruef(totalCache >= 0, "ERROR: Cache size must be non-negative")
cachePercent, err := x.GetCachePercentages(badger.GetString("cache-percentage"), 2)
x.Check(err)
totalCache <<= 20 // Convert to MB.
opt.BlockCacheSize = (cachePercent[0] * totalCache) / 100
opt.IndexCacheSize = (cachePercent[1] * totalCache) / 100

_, opt.EncryptionKey = ee.GetKeys(Bulk.Conf)
if len(opt.EncryptionKey) == 0 {
if opt.Encrypted || opt.EncryptedOut {
Expand Down
2 changes: 1 addition & 1 deletion ee/backup/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ $ dgraph restore -p . -l /var/backups/dgraph -z localhost:5080

flag.StringVarP(&opt.badger, "badger", "b", worker.BadgerDefaults,
z.NewSuperFlagHelp(worker.BadgerDefaults).
Head("Badger options").
Head("Badger options (Other badger options are also supported, not shown for brevity.)").
NamanJain8 marked this conversation as resolved.
Show resolved Hide resolved
Flag("compression",
"Specifies the compression algorithm and compression level (if applicable) for the "+
`postings directory. "none" would disable compression, while "zstd:1" would set `+
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/Shopify/sarama v1.27.2
github.com/blevesearch/bleve v1.0.13
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd
github.com/dgraph-io/badger/v3 v3.0.0-20210309075542-2245c18dfd1f
github.com/dgraph-io/badger/v3 v3.0.0-20210405131809-9b176f1b56e8
github.com/dgraph-io/dgo/v200 v200.0.0-20210401091508-95bfd74de60e
github.com/dgraph-io/gqlgen v0.13.2
github.com/dgraph-io/gqlparser/v2 v2.2.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgraph-io/badger v1.6.0 h1:DshxFxZWXUcO0xX476VJC07Xsr6ZCBVRHKZ93Oh7Evo=
github.com/dgraph-io/badger v1.6.0/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4=
github.com/dgraph-io/badger/v3 v3.0.0-20210309075542-2245c18dfd1f h1:dZpGNLp9YUpq4h2DRcWAjW5dWj47SM3W3NK71z6FRa0=
github.com/dgraph-io/badger/v3 v3.0.0-20210309075542-2245c18dfd1f/go.mod h1:GHMCYxuDWyzbHkh4k3yyg4PM61tJPFfEGSMbE3Vd5QE=
github.com/dgraph-io/badger/v3 v3.0.0-20210405131809-9b176f1b56e8 h1:zRzQj1hfaEtXlK3IoizRwohnFdpwlMw9uTMsgHSTBaU=
github.com/dgraph-io/badger/v3 v3.0.0-20210405131809-9b176f1b56e8/go.mod h1:GHMCYxuDWyzbHkh4k3yyg4PM61tJPFfEGSMbE3Vd5QE=
github.com/dgraph-io/dgo/v200 v200.0.0-20210401091508-95bfd74de60e h1:kdH2yqGYUl5xJARdI5kN1fjhVUV2sLC+vL1CVXhcAfo=
github.com/dgraph-io/dgo/v200 v200.0.0-20210401091508-95bfd74de60e/go.mod h1:zCfS4R3E/UC/PhETXJYq/Blia0eCH1EQqKrWDvvimxE=
github.com/dgraph-io/gqlgen v0.13.2 h1:TNhndk+eHKj5qE7BenKKSYdSIdOGhLqxR1rCiMso9KM=
Expand Down
2 changes: 1 addition & 1 deletion worker/backup_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func NewBackupProcessor(db *badger.DB, req *pb.BackupRequest) *BackupProcessor {
bp := &BackupProcessor{
DB: db,
Request: req,
threads: make([]*threadLocal, x.WorkerConfig.Badger.GetUint64("goroutines")),
threads: make([]*threadLocal, x.WorkerConfig.Badger.NumGoroutines),
}
if req.SinceTs > 0 && db != nil {
bp.txn = db.NewTransactionAt(req.ReadTs, false)
Expand Down
7 changes: 0 additions & 7 deletions worker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"path/filepath"
"time"

bo "github.com/dgraph-io/badger/v3/options"
"github.com/dgraph-io/dgraph/x"
)

Expand All @@ -37,12 +36,6 @@ const (
type Options struct {
// PostingDir is the path to the directory storing the postings.
PostingDir string
// PostingDirCompression is the compression algorithm used to compress the Postings directory.
PostingDirCompression bo.CompressionType
// PostingDirCompressionLevel is the ZSTD compression level used by Postings directory. A
// higher value means more CPU intensive compression and better compression
// ratio.
PostingDirCompressionLevel int
// WALDir is the path to the directory storing the write-ahead log.
WALDir string
// MutationsMode is the mode used to handle mutation requests.
Expand Down
2 changes: 1 addition & 1 deletion worker/draft.go
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,7 @@ func (n *node) commitOrAbort(pkey uint64, delta *pb.OracleDelta) error {
return
}
txn.Update()
err := x.RetryUntilSuccess(int(x.WorkerConfig.MaxRetries),
err := x.RetryUntilSuccess(int(x.Config.MaxRetries),
10*time.Millisecond, func() error {
err := txn.CommitToDisk(writer, commit)
if err == badger.ErrBannedKey {
Expand Down
13 changes: 5 additions & 8 deletions worker/server_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,16 @@ const (
// breaks.
AclDefaults = `access-ttl=6h; refresh-ttl=30d; secret-file=;`
AuditDefaults = `compress=false; days=10; size=100; dir=; output=; encrypt-file=;`
BadgerDefaults = `compression=snappy; goroutines=8; max-retries=-1;`
BadgerDefaults = `compression=snappy; goroutines=8;`
RaftDefaults = `learner=false; snapshot-after-entries=10000; ` +
`snapshot-after-duration=30m; pending-proposals=256; idx=; group=;`
SecurityDefaults = `token=; whitelist=;`
LudicrousDefaults = `enabled=false; concurrency=2000;`
CDCDefaults = `file=; kafka=; sasl_user=; sasl_password=; ca_cert=; client_cert=; ` +
`client_key=;`
LimitDefaults = `mutations=allow; query-edge=1000000; normalize-node=10000; ` +
`mutations-nquad=1000000; disallow-drop=false; query-timeout=0ms; txn-abort-after=5m;`
`mutations-nquad=1000000; disallow-drop=false; query-timeout=0ms; txn-abort-after=5m; ` +
` max-retries=-1;`
ZeroLimitsDefaults = `uid-lease=0; refill-interval=30s; disable-admin-http=false;`
GraphQLDefaults = `introspection=true; debug=false; extensions=true; poll-interval=1s; ` +
`lambda-url=;`
Expand Down Expand Up @@ -98,10 +99,6 @@ func setBadgerOptions(opt badger.Options) badger.Options {
// saved by disabling it.
opt.DetectConflicts = false

glog.Infof("Setting Posting Dir Compression Level: %d", Config.PostingDirCompressionLevel)
opt.Compression = Config.PostingDirCompression
opt.ZSTDCompressionLevel = Config.PostingDirCompressionLevel

// Settings for the data directory.
return opt
}
Expand Down Expand Up @@ -131,9 +128,9 @@ func (s *ServerState) initStorage() {
// All the writes to posting store should be synchronous. We use batched writers
// for posting lists, so the cost of sync writes is amortized.
x.Check(os.MkdirAll(Config.PostingDir, 0700))
opt := badger.DefaultOptions(Config.PostingDir).
opt := x.WorkerConfig.Badger.
WithDir(Config.PostingDir).WithValueDir(Config.PostingDir).
WithNumVersionsToKeep(math.MaxInt32).
WithNumGoroutines(int(x.WorkerConfig.Badger.GetUint64("goroutines"))).
WithBlockCacheSize(Config.PBlockCacheSize).
WithIndexCacheSize(Config.PIndexCacheSize).
WithNamespaceOffset(x.NamespaceOffset)
Expand Down
9 changes: 4 additions & 5 deletions x/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"net"
"time"

"github.com/dgraph-io/badger/v3"
"github.com/dgraph-io/ristretto/z"
"github.com/spf13/viper"
)
Expand All @@ -43,6 +44,7 @@ type Options struct {
BlockClusterWideDrop bool
LimitNormalizeNode int
QueryTimeout time.Duration
MaxRetries int64

// GraphQL options:
//
Expand Down Expand Up @@ -96,8 +98,8 @@ type WorkerOptions struct {
TLSServerConfig *tls.Config
// Raft stores options related to Raft.
Raft *z.SuperFlag
// Badger stores options related to Badger.
Badger *z.SuperFlag
// Badger stores the badger options.
Badger badger.Options
// WhiteListedIPRanges is a list of IP ranges from which requests will be allowed.
WhiteListedIPRanges []IPRange
// StrictMutations will cause mutations to unknown predicates to fail if set to true.
Expand All @@ -108,9 +110,6 @@ type WorkerOptions struct {
HmacSecret SensitiveByteSlice
// AbortOlderThan tells Dgraph to discard transactions that are older than this duration.
AbortOlderThan time.Duration
// MaxRetries indicates the number of retries Dgraph does to prevent locking the worker in a
// failed state.
MaxRetries int64
// ProposedGroupId will be used if there's a file in the p directory called group_id with the
// proposed group ID for this server.
ProposedGroupId uint32
Expand Down
7 changes: 7 additions & 0 deletions x/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package x

import (
"github.com/dgraph-io/badger/v3"
"github.com/dgraph-io/ristretto/z"
"github.com/spf13/pflag"
)
Expand Down Expand Up @@ -62,3 +63,9 @@ func FillCommonFlags(flag *pflag.FlagSet) {
"Send crash events to Sentry.").
String())
}

// MergeAndCheckBadgerDefaults turns the --badger superflag into a badger.Options value.
// The options are built in three layers, each applied on top of the previous one:
// badger's own defaults, then dgraphDefaults, then userInput. Any checking of the flag
// contents is delegated to badger's FromSuperFlag (its behavior is not visible here).
func MergeAndCheckBadgerDefaults(dgraphDefaults, userInput string) badger.Options {
	// Start from badger's stock options; the directory is left empty at this stage.
	base := badger.DefaultOptions("")
	// Layer the Dgraph-specific defaults over badger's defaults.
	withDgraphDefaults := base.FromSuperFlag(dgraphDefaults)
	// Apply what the user passed on the command line last.
	return withDgraphDefaults.FromSuperFlag(userInput)
}