Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(Flags): [Breaking] Add flag for snapshot duration frequency #7675

Merged
merged 6 commits into from
Apr 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion compose/compose.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func getAlpha(idx int, raft string) service {
}

if opts.SnapshotAfter != "" {
raft = fmt.Sprintf("%s; snapshot-after=%s", raft, opts.SnapshotAfter)
raft = fmt.Sprintf("%s; %s", raft, opts.SnapshotAfter)
}
svc.Command += fmt.Sprintf(` --raft "%s"`, raft)

Expand Down
8 changes: 6 additions & 2 deletions dgraph/cmd/alpha/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,13 @@ they form a Raft group and provide synchronous replication.
Flag("learner",
`Make this Alpha a "learner" node. In learner mode, this Alpha will not participate `+
"in Raft elections. This can be used to achieve a read-only replica.").
Flag("snapshot-after",
Flag("snapshot-after-entries",
"Create a new Raft snapshot after N number of Raft entries. The lower this number, "+
"the more frequent snapshot creation will be.").
"the more frequent snapshot creation will be. Snapshots are created only if both "+
"snapshot-after-duration and snapshot-after-entries threshold are crossed.").
Flag("snapshot-after-duration",
"Frequency at which we should create a new raft snapshots. Set "+
"to 0 to disable duration based snapshot.").
Flag("pending-proposals",
"Number of pending mutation proposals. Useful for rate limiting.").
String())
Expand Down
65 changes: 25 additions & 40 deletions worker/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Auto-generated with: [./compose -a 6 -z 3 -j -w --port_offset=0 --expose_ports=false -O ../worker/docker-compose.yml --mem= --snapshot_after=100 --names=false]
# Auto-generated with: [./compose -a 6 -z 3 -j -w --port_offset=0 --expose_ports=false -O ../worker/docker-compose.yml --mem= --snapshot_after=snapshot-after-entries=100; snapshot-after-duration=1m --names=false]
#
version: "3.5"
services:
Expand All @@ -15,11 +15,9 @@ services:
source: $GOPATH/bin
target: /gobin
read_only: true
command: /gobin/dgraph alpha --my=alpha1:7080
--zero=zero1:5080,zero2:5080,zero3:5080 --logtostderr -v=2
--security "whitelist=10.0.0.0/8,172.16.0.0/12,192.168.0.0/16;"
--raft "snapshot-after=100;"
--trace "jaeger=http://jaeger:14268;"
command: /gobin/dgraph alpha --trace "jaeger=http://jaeger:14268;" --my=alpha1:7080
--zero=zero1:5080,zero2:5080,zero3:5080 --logtostderr -v=2 --raft "idx=1; group=1;
snapshot-after-entries=100; snapshot-after-duration=1m" --security "whitelist=10.0.0.0/8,172.16.0.0/12,192.168.0.0/16;"
alpha2:
image: dgraph/dgraph:latest
working_dir: /data/alpha2
Expand All @@ -33,11 +31,9 @@ services:
source: $GOPATH/bin
target: /gobin
read_only: true
command: /gobin/dgraph alpha --my=alpha2:7080
--zero=zero1:5080,zero2:5080,zero3:5080 --logtostderr -v=2
--security "whitelist=10.0.0.0/8,172.16.0.0/12,192.168.0.0/16;"
--raft "snapshot-after=100;"
--trace "jaeger=http://jaeger:14268;"
command: /gobin/dgraph alpha --trace "jaeger=http://jaeger:14268;" --my=alpha2:7080
--zero=zero1:5080,zero2:5080,zero3:5080 --logtostderr -v=2 --raft "idx=2; group=1;
snapshot-after-entries=100; snapshot-after-duration=1m" --security "whitelist=10.0.0.0/8,172.16.0.0/12,192.168.0.0/16;"
alpha3:
image: dgraph/dgraph:latest
working_dir: /data/alpha3
Expand All @@ -51,11 +47,9 @@ services:
source: $GOPATH/bin
target: /gobin
read_only: true
command: /gobin/dgraph alpha --my=alpha3:7080
--zero=zero1:5080,zero2:5080,zero3:5080 --logtostderr -v=2
--security "whitelist=10.0.0.0/8,172.16.0.0/12,192.168.0.0/16;"
--raft "snapshot-after=100;"
--trace "jaeger=http://jaeger:14268;"
command: /gobin/dgraph alpha --trace "jaeger=http://jaeger:14268;" --my=alpha3:7080
--zero=zero1:5080,zero2:5080,zero3:5080 --logtostderr -v=2 --raft "idx=3; group=1;
snapshot-after-entries=100; snapshot-after-duration=1m" --security "whitelist=10.0.0.0/8,172.16.0.0/12,192.168.0.0/16;"
alpha4:
image: dgraph/dgraph:latest
working_dir: /data/alpha4
Expand All @@ -69,11 +63,9 @@ services:
source: $GOPATH/bin
target: /gobin
read_only: true
command: /gobin/dgraph alpha --my=alpha4:7080
--zero=zero1:5080,zero2:5080,zero3:5080 --logtostderr -v=2
--security "whitelist=10.0.0.0/8,172.16.0.0/12,192.168.0.0/16;"
--raft "snapshot-after=100;"
--trace "jaeger=http://jaeger:14268;"
command: /gobin/dgraph alpha --trace "jaeger=http://jaeger:14268;" --my=alpha4:7080
--zero=zero1:5080,zero2:5080,zero3:5080 --logtostderr -v=2 --raft "idx=4; group=2;
snapshot-after-entries=100; snapshot-after-duration=1m" --security "whitelist=10.0.0.0/8,172.16.0.0/12,192.168.0.0/16;"
alpha5:
image: dgraph/dgraph:latest
working_dir: /data/alpha5
Expand All @@ -87,11 +79,9 @@ services:
source: $GOPATH/bin
target: /gobin
read_only: true
command: /gobin/dgraph alpha --my=alpha5:7080
--zero=zero1:5080,zero2:5080,zero3:5080 --logtostderr -v=2
--security "whitelist=10.0.0.0/8,172.16.0.0/12,192.168.0.0/16;"
--raft "snapshot-after=100;"
--trace "jaeger=http://jaeger:14268;"
command: /gobin/dgraph alpha --trace "jaeger=http://jaeger:14268;" --my=alpha5:7080
--zero=zero1:5080,zero2:5080,zero3:5080 --logtostderr -v=2 --raft "idx=5; group=2;
snapshot-after-entries=100; snapshot-after-duration=1m" --security "whitelist=10.0.0.0/8,172.16.0.0/12,192.168.0.0/16;"
alpha6:
image: dgraph/dgraph:latest
working_dir: /data/alpha6
Expand All @@ -105,11 +95,9 @@ services:
source: $GOPATH/bin
target: /gobin
read_only: true
command: /gobin/dgraph alpha --my=alpha6:7080
--zero=zero1:5080,zero2:5080,zero3:5080 --logtostderr -v=2
--security "whitelist=10.0.0.0/8,172.16.0.0/12,192.168.0.0/16;"
--raft "snapshot-after=100;"
--trace "jaeger=http://jaeger:14268;"
command: /gobin/dgraph alpha --trace "jaeger=http://jaeger:14268;" --my=alpha6:7080
--zero=zero1:5080,zero2:5080,zero3:5080 --logtostderr -v=2 --raft "idx=6; group=2;
snapshot-after-entries=100; snapshot-after-duration=1m" --security "whitelist=10.0.0.0/8,172.16.0.0/12,192.168.0.0/16;"
jaeger:
image: jaegertracing/all-in-one:1.18
working_dir: /working/jaeger
Expand All @@ -133,9 +121,8 @@ services:
source: $GOPATH/bin
target: /gobin
read_only: true
command: /gobin/dgraph zero --raft "idx=1;" --my=zero1:5080
--replicas=3 --logtostderr -v=2 --bindall
--trace "jaeger=http://jaeger:14268;"
command: /gobin/dgraph zero --trace "jaeger=http://jaeger:14268;" --raft='idx=1'
--my=zero1:5080 --replicas=3 --logtostderr -v=2 --bindall
zero2:
image: dgraph/dgraph:latest
working_dir: /data/zero2
Expand All @@ -151,9 +138,8 @@ services:
source: $GOPATH/bin
target: /gobin
read_only: true
command: /gobin/dgraph zero --raft "idx=2;" --my=zero2:5080
--replicas=3 --logtostderr -v=2 --peer=zero1:5080
--trace "jaeger=http://jaeger:14268;"
command: /gobin/dgraph zero --trace "jaeger=http://jaeger:14268;" --raft='idx=2'
--my=zero2:5080 --replicas=3 --logtostderr -v=2 --peer=zero1:5080
zero3:
image: dgraph/dgraph:latest
working_dir: /data/zero3
Expand All @@ -169,7 +155,6 @@ services:
source: $GOPATH/bin
target: /gobin
read_only: true
command: /gobin/dgraph zero --raft "idx=3;" --my=zero3:5080
--replicas=3 --logtostderr -v=2 --peer=zero1:5080
--trace "jaeger=http://jaeger:14268;"
command: /gobin/dgraph zero --trace "jaeger=http://jaeger:14268;" --raft='idx=3'
--my=zero3:5080 --replicas=3 --logtostderr -v=2 --peer=zero1:5080
volumes: {}
31 changes: 21 additions & 10 deletions worker/draft.go
Original file line number Diff line number Diff line change
Expand Up @@ -997,10 +997,13 @@ func (n *node) updateRaftProgress() error {

func (n *node) checkpointAndClose(done chan struct{}) {
slowTicker := time.NewTicker(time.Minute)
lastSnapshotTime := time.Now()
defer slowTicker.Stop()

snapshotAfter := x.WorkerConfig.Raft.GetUint64("snapshot-after")
x.AssertTruef(snapshotAfter > 10, "raft.snapshot-after must be a number greater than 10")
snapshotAfterEntries := x.WorkerConfig.Raft.GetUint64("snapshot-after-entries")
x.AssertTruef(snapshotAfterEntries > 10, "raft.snapshot-after must be a number greater than 10")

snapshotFrequency := x.WorkerConfig.Raft.GetDuration("snapshot-after-duration")

for {
select {
Expand Down Expand Up @@ -1028,16 +1031,22 @@ func (n *node) checkpointAndClose(done chan struct{}) {
// calculate a new snapshot.
calculate := raft.IsEmptySnap(snap) || n.Store.NumLogFiles() > 4

if chk, err := n.Store.Checkpoint(); err == nil {
if first, err := n.Store.FirstIndex(); err == nil {
// Save some cycles by only calculating snapshot if the checkpoint has gone
// quite a bit further than the first index.
calculate = calculate || chk >= first+snapshotAfter
glog.V(3).Infof("Evaluating snapshot first:%d chk:%d (chk-first:%d) "+
"snapshotAfter:%d snap:%v", first, chk, chk-first,
snapshotAfter, calculate)
// Only take snapshot if both snapshotFrequency and
// snapshotAfterEntries requirements are met. If set to 0,
// we consider duration condition to be disabled.
if snapshotFrequency == 0 || time.Since(lastSnapshotTime) > snapshotFrequency {
if chk, err := n.Store.Checkpoint(); err == nil {
rohanprasad marked this conversation as resolved.
Show resolved Hide resolved
if first, err := n.Store.FirstIndex(); err == nil {
// Save some cycles by only calculating snapshot if the checkpoint
// has gone quite a bit further than the first index.
calculate = calculate || chk >= first+snapshotAfterEntries
glog.V(3).Infof("Evaluating snapshot first:%d chk:%d (chk-first:%d) "+
"snapshotAfterEntries:%d snap:%v", first, chk, chk-first,
snapshotAfterEntries, calculate)
}
}
}

// We keep track of the applied index in the p directory. Even if we don't take
// snapshot for a while and let the Raft logs grow and restart, we would not have to
// run all the log entries, because we can tell Raft.Config to set Applied to that
Expand All @@ -1055,6 +1064,8 @@ func (n *node) checkpointAndClose(done chan struct{}) {
// or our checkpoint already crossed the SnapshotAfter threshold.
if err := n.proposeSnapshot(); err != nil {
glog.Errorf("While calculating and proposing snapshot: %v", err)
} else {
lastSnapshotTime = time.Now()
}
}
go n.abortOldTransactions()
Expand Down
9 changes: 5 additions & 4 deletions worker/server_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,11 @@ const (
// For easy readability, keep the options without default values (if any) at the end of
// the *Defaults string. Also, since these strings are printed in --help text, avoid line
// breaks.
AclDefaults = `access-ttl=6h; refresh-ttl=30d; secret-file=;`
AuditDefaults = `compress=false; days=10; size=100; dir=; output=; encrypt-file=;`
BadgerDefaults = `compression=snappy; goroutines=8; max-retries=-1;`
RaftDefaults = `learner=false; snapshot-after=10000; pending-proposals=256; idx=; group=;`
AclDefaults = `access-ttl=6h; refresh-ttl=30d; secret-file=;`
AuditDefaults = `compress=false; days=10; size=100; dir=; output=; encrypt-file=;`
BadgerDefaults = `compression=snappy; goroutines=8; max-retries=-1;`
RaftDefaults = `learner=false; snapshot-after-entries=10000; ` +
`snapshot-after-duration=30m; pending-proposals=256; idx=; group=;`
SecurityDefaults = `token=; whitelist=;`
LudicrousDefaults = `enabled=false; concurrency=2000;`
CDCDefaults = `file=; kafka=; sasl_user=; sasl_password=; ca_cert=; client_cert=; ` +
Expand Down