feat(ludicrous): Run mutations from the same predicate concurrently in ludicrous mode #6060
Change the title to say "run mutations from the same predicate concurrently in ludicrous mode". Looks alright to me; get it reviewed by someone, and definitely be careful about the increased memory usage.
Reviewed 1 of 2 files at r1, 1 of 1 files at r3.
Reviewable status: all files reviewed, 4 unresolved discussions (waiting on @harshil-goel)
worker/executor.go, line 71 at r3 (raw file):
func generateConflictKeys(p *subMutation) []uint64 { keys := make([]uint64, 0)
Move this closer to usage.
var keys []uint64
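The suggestion to declare the slice with `var` can be illustrated with a small sketch. `collectEven` is a hypothetical helper, not code from this PR; it only shows that `append` works on a nil slice, so no up-front `make([]uint64, 0)` is needed.

```go
package main

import "fmt"

// collectEven is a hypothetical helper that returns the even values of in.
// The result is declared with var right before the loop that fills it, so it
// stays nil (no allocation) until the first append.
func collectEven(in []uint64) []uint64 {
	var keys []uint64
	for _, v := range in {
		if v%2 == 0 {
			keys = append(keys, v)
		}
	}
	return keys
}

func main() {
	fmt.Println(collectEven([]uint64{1, 2, 3, 4})) // [2 4]
	fmt.Println(collectEven(nil) == nil)           // true
}
```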
worker/executor.go, line 81 at r3 (raw file):
} if schema.State().IsList(edge.Attr) {
If you have count index, only then do serially. Not on IsList.
use uniq[0]
worker/executor.go, line 180 at r3 (raw file):
for _, i := range toRun { go func(j *mutation) { e.workerChan <- j
Maybe see if you don't need channels. You could just create goroutines to do the work. Do use throttle to limit how many goroutines you do create.
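A minimal sketch of the "goroutines plus a throttle" idea, assuming a buffered channel as the limiter. This is a stand-in written with the standard library only, not the throttle type used in the PR; `runThrottled` and `countJobs` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runThrottled starts one goroutine per job but allows at most limit of them
// to be in flight at once. It returns the highest concurrency it observed.
func runThrottled(jobs []func(), limit int) int32 {
	sem := make(chan struct{}, limit) // counting semaphore
	var wg sync.WaitGroup
	var running, peak int32

	for _, job := range jobs {
		sem <- struct{}{} // blocks while limit goroutines are in flight
		wg.Add(1)
		go func(job func()) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot when the job ends

			n := atomic.AddInt32(&running, 1)
			for {
				p := atomic.LoadInt32(&peak)
				if n <= p || atomic.CompareAndSwapInt32(&peak, p, n) {
					break
				}
			}
			job()
			atomic.AddInt32(&running, -1)
		}(job)
	}
	wg.Wait()
	return peak
}

// countJobs runs n trivial jobs and reports how many ran and the peak
// concurrency.
func countJobs(n, limit int) (done, peak int32) {
	jobs := make([]func(), n)
	for i := range jobs {
		jobs[i] = func() { atomic.AddInt32(&done, 1) }
	}
	peak = runThrottled(jobs, limit)
	return done, peak
}

func main() {
	done, peak := countJobs(50, 4)
	fmt.Println(done, peak <= 4)
}
```

The loop that starts goroutines is the only place that blocks, so no worker channel is needed to hand work over.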
worker/executor.go, line 193 at r3 (raw file):
for payload := range ch { conflicts := generateConflictKeys(payload) m := &mutation{m: payload, keys: conflicts, outEdges: make(map[uint64]*mutation), graph: g, inDeg: 0}
100 chars.
Reviewable status: 0 of 2 files reviewed, 4 unresolved discussions (waiting on @manishrjain)
worker/executor.go, line 71 at r3 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
Move this closer to usage.
var keys []uint64
Done.
worker/executor.go, line 81 at r3 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
If you have count index, only then do serially. Not on IsList.
use uniq[0]
Done.
worker/executor.go, line 180 at r3 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
Maybe see if you don't need channels. You could just create goroutines to do the work. Do use throttle to limit how many goroutines you do create.
Done.
worker/executor.go, line 193 at r3 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
100 chars.
Done.
Reviewable status: 0 of 2 files reviewed, 19 unresolved discussions (waiting on @harshil-goel, @manishrjain, @martinmr, and @vvbalaji-dgraph)
posting/list.go, line 437 at r4 (raw file):
} func GetConflictKeys(pk x.ParsedKey, key []byte, t *pb.DirectedEdge) uint64 {
It's returning a single uint64; should we call it GetConflictKey instead of GetConflictKeys?
worker/executor.go, line 59 at r4 (raw file):
func newExecutor(applied *y.WaterMark) *executor { runtime.SetBlockProfileRate(1)
Remove this line.
worker/executor.go, line 64 at r4 (raw file):
closer: y.NewCloser(0), applied: applied, throttle: y.NewThrottle(2000),
A comment about the default value of 2000 would be nice.
worker/executor.go, line 72 at r4 (raw file):
func generateTokenKeys(nq *pb.DirectedEdge, tokenizers []tok.Tokenizer) ([]uint64, error) { keys := make([]uint64, len(tokenizers))
Looks like it should be keys := make([]uint64, 0, len(tokenizers))
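The difference the reviewer is pointing at can be shown in isolation. This is a standalone sketch; `withLen` and `withCap` are hypothetical names. `make([]uint64, n)` creates n zero-valued elements, so appending afterwards leaves n leading zeros, while `make([]uint64, 0, n)` only reserves capacity.

```go
package main

import "fmt"

// withLen mirrors the flagged pattern: length n, then append.
func withLen(n int) []uint64 {
	keys := make([]uint64, n)
	for i := 0; i < n; i++ {
		keys = append(keys, uint64(i+1))
	}
	return keys
}

// withCap mirrors the suggested fix: length 0, capacity n, then append.
func withCap(n int) []uint64 {
	keys := make([]uint64, 0, n)
	for i := 0; i < n; i++ {
		keys = append(keys, uint64(i+1))
	}
	return keys
}

func main() {
	fmt.Println(withLen(3)) // [0 0 0 1 2 3]
	fmt.Println(withCap(3)) // [1 2 3]
}
```

In the conflict-key setting, the stray zeros would act as an extra key shared by every mutation.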
worker/executor.go, line 82 at r4 (raw file):
schemaVal, err := types.Convert(storageVal, types.TypeID(nq.GetValueType())) if err != nil { errs = append(errs, err.Error())
Instead of calling err.Error(), let's have []error only at top.
Also, if there is an error here, should we continue further processing?
worker/executor.go, line 87 at r4 (raw file):
nq.Lang)) if err != nil { errs = append(errs, err.Error())
Same here: should we continue processing even if there is an error?
worker/executor.go, line 122 at r4 (raw file):
keys, err := generateTokenKeys(edge, tokenizers) for _, i := range keys {
instead of i call it key
worker/executor.go, line 138 at r4 (raw file):
type mutation struct { m *subMutation
we can call it sm.
worker/executor.go, line 139 at r4 (raw file):
type mutation struct { m *subMutation keys []uint64
we can call it conflictKeys.
worker/executor.go, line 142 at r4 (raw file):
inDeg int outEdges map[uint64]*mutation
we should call it dependent mutations.
worker/executor.go, line 194 at r4 (raw file):
dependent.inDeg -= 1 if dependent.inDeg == 0 { x.Check(e.throttle.Do())
Not sure if we should crash here.
worker/executor.go, line 197 at r4 (raw file):
go func(d *mutation) { e.worker(d) }(dependent)
go e.worker(dependent)
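The simplification is safe because the arguments of a `go` statement are evaluated in the calling goroutine when the statement runs, so the wrapper closure adds nothing. A small sketch with a hypothetical `sumIDs` helper shows both forms giving the same result:

```go
package main

import (
	"fmt"
	"sync"
)

// sumIDs starts one goroutine per id and adds the ids up. With wrap set it
// uses the anonymous-function form; otherwise it starts the worker directly.
func sumIDs(ids []int, wrap bool) int {
	var (
		mu  sync.Mutex
		wg  sync.WaitGroup
		sum int
	)
	worker := func(id int) {
		defer wg.Done()
		mu.Lock()
		sum += id
		mu.Unlock()
	}
	for _, id := range ids {
		wg.Add(1)
		if wrap {
			go func(d int) { worker(d) }(id)
		} else {
			go worker(id) // id is evaluated here, before the goroutine starts
		}
	}
	wg.Wait()
	return sum
}

func main() {
	ids := []int{1, 2, 3, 4}
	fmt.Println(sumIDs(ids, true), sumIDs(ids, false)) // 10 10
}
```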
worker/executor.go, line 202 at r4 (raw file):
for _, c := range mut.keys { i := 0
we should rename it to something else.
worker/executor.go, line 205 at r4 (raw file):
arr := mut.graph.conflicts[c] for _, x := range arr {
Add a comment that we are deleting the mutation here.
worker/executor.go, line 261 at r4 (raw file):
x.Check(e.throttle.Do()) go func() { e.worker(m)
go e.worker(m)
Reviewable status: 0 of 3 files reviewed, 19 unresolved discussions (waiting on @ashish-goswami, @manishrjain, @martinmr, and @vvbalaji-dgraph)
posting/list.go, line 437 at r4 (raw file):
Previously, ashish-goswami (Ashish Goswami) wrote…
It's returning a single uint64; should we call it GetConflictKey instead of GetConflictKeys?
Done.
worker/executor.go, line 59 at r4 (raw file):
Previously, ashish-goswami (Ashish Goswami) wrote…
Remove this line.
Done.
worker/executor.go, line 64 at r4 (raw file):
Previously, ashish-goswami (Ashish Goswami) wrote…
A comment about the default value of 2000 would be nice.
Done.
worker/executor.go, line 72 at r4 (raw file):
Previously, ashish-goswami (Ashish Goswami) wrote…
Looks like it should be keys := make([]uint64, 0, len(tokenizers))
Done.
worker/executor.go, line 82 at r4 (raw file):
Previously, ashish-goswami (Ashish Goswami) wrote…
Instead of calling err.Error(), let's have []error only at top.
Also, if there is an error here, should we continue further processing?
Yeah, we just care about whatever conflict keys we can generate, to best avoid conflicting mutations running together.
I used error instead of string because we need to join these errors later, and would need to convert to string to do that.
worker/executor.go, line 87 at r4 (raw file):
Previously, ashish-goswami (Ashish Goswami) wrote…
Same here: should we continue processing even if there is an error?
Done.
worker/executor.go, line 122 at r4 (raw file):
Previously, ashish-goswami (Ashish Goswami) wrote…
instead of i call it key
Done.
worker/executor.go, line 138 at r4 (raw file):
Previously, ashish-goswami (Ashish Goswami) wrote…
we can call it sm.
Done.
worker/executor.go, line 139 at r4 (raw file):
Previously, ashish-goswami (Ashish Goswami) wrote…
we can call it conflictKeys.
Done.
worker/executor.go, line 142 at r4 (raw file):
Previously, ashish-goswami (Ashish Goswami) wrote…
we should call it dependent mutations.
Done.
worker/executor.go, line 194 at r4 (raw file):
Previously, ashish-goswami (Ashish Goswami) wrote…
Not sure if we should crash here.
We always return nil, so it wouldn't crash. If it crashes, it's a bug.
worker/executor.go, line 197 at r4 (raw file):
Previously, ashish-goswami (Ashish Goswami) wrote…
go e.worker(dependent)
Done.
worker/executor.go, line 202 at r4 (raw file):
Previously, ashish-goswami (Ashish Goswami) wrote…
we should rename it to something else.
Done.
worker/executor.go, line 205 at r4 (raw file):
Previously, ashish-goswami (Ashish Goswami) wrote…
Add a comment that we are deleting the mutation here.
Done.
worker/executor.go, line 261 at r4 (raw file):
Previously, ashish-goswami (Ashish Goswami) wrote…
go e.worker(m)
Done.
Can you add UT?
worker/executor.go (outdated):
}

func newExecutor(applied *y.WaterMark) *executor {
	ex := &executor{
		predChan: make(map[string]chan *subMutation),
		closer:   y.NewCloser(0),
		applied:  applied,
		throttle: y.NewThrottle(2000), // Run 2000 mutations at a time.
should we make this configurable?
worker/executor.go (outdated):
	}
}

keys := make([]uint64, 0)
as discussed, we can just return the map instead of the slice.
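One reading of this suggestion, sketched under assumptions: if conflict keys are gathered in a map used as a set, duplicates collapse automatically and the map can be returned as is. `conflictKeySet` is a hypothetical name and the input slice stands in for whatever produces the raw keys.

```go
package main

import "fmt"

// conflictKeySet is a hypothetical helper that collects keys into a set.
// Duplicate keys collapse into a single entry.
func conflictKeySet(raw []uint64) map[uint64]struct{} {
	set := make(map[uint64]struct{}, len(raw))
	for _, k := range raw {
		set[k] = struct{}{}
	}
	return set
}

func main() {
	set := conflictKeySet([]uint64{10, 20, 10})
	_, ok := set[20]
	fmt.Println(len(set), ok) // 2 true
}
```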
Reviewable status: 0 of 5 files reviewed, 21 unresolved discussions (waiting on @ashish-goswami, @manishrjain, @martinmr, @parasssh, and @vvbalaji-dgraph)
worker/executor.go, line 62 at r5 (raw file):
Previously, parasssh wrote…
should we make this configurable?
Done.
worker/executor.go, line 127 at r5 (raw file):
Previously, parasssh wrote…
as discussed, we can just return the map instead of the slice.
Done.
Got some comments.
Reviewed 1 of 2 files at r5, 1 of 4 files at r6.
Reviewable status: 2 of 5 files reviewed, 25 unresolved discussions (waiting on @ashish-goswami, @harshil-goel, @manishrjain, @martinmr, @parasssh, and @vvbalaji-dgraph)
worker/executor.go, line 94 at r6 (raw file):
if len(errs) > 0 { return keys, fmt.Errorf(strings.Join(errs, "\n"))
Shouldn't this be nil, fmt....
worker/executor.go, line 106 at r6 (raw file):
pk, err := x.Parse(key) if err != nil { continue
Why are we skipping an error? Do we expect x.Parse to fail in any case?
worker/executor.go, line 118 at r6 (raw file):
} tokens, err := generateTokenKeys(edge, tokenizers)
err should be checked before you use tokens.
worker/executor.go, line 181 at r6 (raw file):
e.throttle.Done(nil) mut.graph.Lock()
defer mut.graph.unlock(). Right now the unlock is at the end of the function which makes it hard to find.
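A short sketch of the `defer` pattern being requested, using a hypothetical `graph` type with an embedded mutex rather than the PR's own struct. Deferring the unlock right after the lock keeps the pair together and also covers early returns.

```go
package main

import (
	"fmt"
	"sync"
)

// graph is a hypothetical stand-in: a mutex guarding a map from conflict key
// to the ids of the mutations that hold it.
type graph struct {
	sync.Mutex
	conflicts map[uint64][]int
}

// remove deletes id from the list stored under key and reports whether it was
// present. The deferred Unlock runs on every return path.
func (g *graph) remove(key uint64, id int) bool {
	g.Lock()
	defer g.Unlock()

	arr, ok := g.conflicts[key]
	if !ok {
		return false // early return, the lock is still released
	}
	kept := arr[:0]
	removed := false
	for _, v := range arr {
		if v == id {
			removed = true
			continue
		}
		kept = append(kept, v)
	}
	g.conflicts[key] = kept
	return removed
}

func main() {
	g := &graph{conflicts: map[uint64][]int{7: {1, 2, 3}}}
	fmt.Println(g.remove(7, 2), g.conflicts[7]) // true [1 3]
	fmt.Println(g.remove(9, 1))                 // false
}
```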
Reviewable status: 2 of 5 files reviewed, 25 unresolved discussions (waiting on @ashish-goswami, @jarifibrahim, @manishrjain, @martinmr, @parasssh, and @vvbalaji-dgraph)
worker/executor.go, line 94 at r6 (raw file):
Previously, jarifibrahim (Ibrahim Jarif) wrote…
Shouldn't this be nil, fmt....
No. We need all the keys we can get to make sure the data's safe.
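The "return whatever keys we have, plus the error" behaviour described here can be sketched as follows. `parseKeys` is a hypothetical stand-in that parses decimal strings instead of generating conflict keys; it collects error messages, keeps going, and returns the partial result together with a joined error.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
	"strings"
)

// parseKeys converts every input it can and remembers the failures. Callers
// get the successfully produced keys even when err is non-nil.
func parseKeys(inputs []string) ([]uint64, error) {
	keys := make([]uint64, 0, len(inputs))
	var errs []string
	for _, in := range inputs {
		k, err := strconv.ParseUint(in, 10, 64)
		if err != nil {
			errs = append(errs, err.Error())
			continue // keep processing the remaining inputs
		}
		keys = append(keys, k)
	}
	if len(errs) > 0 {
		// errors.New is used so that a '%' inside a message is not treated
		// as a formatting verb.
		return keys, errors.New(strings.Join(errs, "\n"))
	}
	return keys, nil
}

func main() {
	keys, err := parseKeys([]string{"1", "x", "3"})
	fmt.Println(keys, err != nil) // [1 3] true
}
```

A caller that wants the partial result has to read `keys` before bailing out on `err`, which is the opposite of the usual Go convention and worth a comment at the call site.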
worker/executor.go, line 106 at r6 (raw file):
Previously, jarifibrahim (Ibrahim Jarif) wrote…
Why are we skipping an error? Do we expect x.Parse to fail in any case?
No, we don't. But let's log in any case.
worker/executor.go, line 118 at r6 (raw file):
Previously, jarifibrahim (Ibrahim Jarif) wrote…
err should be checked before you use tokens.
We continue if err is not nil, so err at this point is always nil.
worker/executor.go, line 181 at r6 (raw file):
Previously, jarifibrahim (Ibrahim Jarif) wrote…
defer mut.graph.unlock(). Right now the unlock is at the end of the function which makes it hard to find.
Done.
Reviewable status: 2 of 5 files reviewed, 23 unresolved discussions (waiting on @ashish-goswami, @harshil-goel, @jarifibrahim, @manishrjain, @martinmr, @parasssh, and @vvbalaji-dgraph)
worker/executor.go, line 106 at r7 (raw file):
pk, err := x.Parse(key) if err != nil { glog.V(2).Info("Error in generating conflic keys", err)
conflic => conflict
Added topological sort to ludicrous mode executor to parallelize execution in one predicate.
Master: 21 Million dataset: 3m 15 sec
This PR: 21 Million dataset: 2m 25 sec
Fixes #DGRAPH-1357
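The scheduling idea in the description can be sketched roughly as below. This is an illustration written from the review discussion, not the code in worker/executor.go: the `mutation` fields, `buildGraph`, `run`, and `applyOrder` are hypothetical names, throttling is left out, and each mutation is linked only to the most recent earlier mutation sharing a conflict key.

```go
package main

import (
	"fmt"
	"sync"
)

// mutation is a node in the dependency graph.
type mutation struct {
	id         int
	keys       []uint64    // conflict keys touched by this mutation
	inDeg      int         // number of unfinished mutations it waits on
	dependents []*mutation // mutations that must run after this one
}

// buildGraph adds an edge from the latest earlier mutation holding a key to
// each later mutation that uses the same key.
func buildGraph(muts []*mutation) {
	last := make(map[uint64]*mutation)
	for _, m := range muts {
		linked := make(map[*mutation]bool)
		for _, k := range m.keys {
			if prev, ok := last[k]; ok && prev != m && !linked[prev] {
				linked[prev] = true
				prev.dependents = append(prev.dependents, m)
				m.inDeg++
			}
			last[k] = m
		}
	}
}

// run applies every mutation, starting one as soon as its inDeg reaches zero.
func run(muts []*mutation, apply func(*mutation)) {
	var mu sync.Mutex
	var wg sync.WaitGroup
	wg.Add(len(muts))

	var start func(m *mutation)
	start = func(m *mutation) {
		go func() {
			defer wg.Done()
			apply(m)

			mu.Lock()
			var ready []*mutation
			for _, d := range m.dependents {
				d.inDeg--
				if d.inDeg == 0 {
					ready = append(ready, d)
				}
			}
			mu.Unlock()
			for _, d := range ready {
				start(d)
			}
		}()
	}

	// Collect the roots before starting anything, so inDeg is not read while
	// workers are updating it.
	var roots []*mutation
	for _, m := range muts {
		if m.inDeg == 0 {
			roots = append(roots, m)
		}
	}
	for _, m := range roots {
		start(m)
	}
	wg.Wait()
}

// applyOrder builds mutations from key lists and returns the order in which
// they were applied.
func applyOrder(keys [][]uint64) []int {
	muts := make([]*mutation, len(keys))
	for i, k := range keys {
		muts[i] = &mutation{id: i, keys: k}
	}
	buildGraph(muts)

	var mu sync.Mutex
	var order []int
	run(muts, func(m *mutation) {
		mu.Lock()
		order = append(order, m.id)
		mu.Unlock()
	})
	return order
}

func main() {
	// 0 and 1 share key 1; 3 shares keys with both 1 and 2.
	fmt.Println(applyOrder([][]uint64{{1}, {1}, {2}, {1, 2}}))
}
```

Mutations 0 and 2 have no shared keys and may run in either order, so the printed order is not fixed; only the per-key ordering (0 before 1 before 3, and 2 before 3) is guaranteed.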