Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(schema-update): Start opIndexing only when index creation is required. #7845

Merged
merged 5 commits into from
May 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions edgraph/multi_tenancy_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,6 @@ func (s *Server) CreateNamespace(ctx context.Context, passwd string) (uint64, er
return 0, err
}

if err = worker.WaitForIndexing(ctx, true); err != nil {
return 0, errors.Wrap(err, "Creating namespace, got error: ")
}
err = x.RetryUntilSuccess(10, 100*time.Millisecond, func() error {
return createGuardianAndGroot(ctx, ids.StartId, passwd)
})
Expand Down
7 changes: 7 additions & 0 deletions worker/draft.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,13 @@ func (n *node) waitForTask(id op) {
closer.Wait()
}

func (n *node) isRunningTask(id op) bool {
n.opsLock.Lock()
_, ok := n.ops[id]
n.opsLock.Unlock()
return ok
}

func (n *node) stopAllTasks() {
defer n.closer.Done() // CLOSER:1
<-n.closer.HasBeenClosed()
Expand Down
27 changes: 16 additions & 11 deletions worker/mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,13 +176,6 @@ func runSchemaMutation(ctx context.Context, updates []*pb.SchemaUpdate, startTs
}
}

// Ensure that rollup is not running.
closer, err := gr.Node.startTask(opIndexing)
if err != nil {
return err
}
defer stopIndexing(closer)

buildIndexesHelper := func(update *pb.SchemaUpdate, rebuild posting.IndexRebuild) error {
wrtCtx := schema.GetWriteContext(context.Background())
if err := rebuild.BuildIndexes(wrtCtx); err != nil {
Expand Down Expand Up @@ -214,9 +207,9 @@ func runSchemaMutation(ctx context.Context, updates []*pb.SchemaUpdate, startTs
// "Too many open files" error.
throttle := y.NewThrottle(maxOpenFileLimit / 8)

buildIndexes := func(update *pb.SchemaUpdate, rebuild posting.IndexRebuild) {
buildIndexes := func(update *pb.SchemaUpdate, rebuild posting.IndexRebuild, c *z.Closer) {
// In case background indexing is running, we should call it here again.
defer stopIndexing(closer)
defer stopIndexing(c)

// We should only start building indexes once this function has returned.
// This is in order to ensure that we do not call DropPrefix for one predicate
Expand All @@ -233,6 +226,7 @@ func runSchemaMutation(ctx context.Context, updates []*pb.SchemaUpdate, startTs
throttle.Done(nil)
}

var closer *z.Closer
for _, su := range updates {
if tablet, err := groups().Tablet(su.Predicate); err != nil {
return err
Expand All @@ -251,6 +245,17 @@ func runSchemaMutation(ctx context.Context, updates []*pb.SchemaUpdate, startTs
OldSchema: &old,
CurrentSchema: su,
}
shouldRebuild := ok && rebuild.NeedIndexRebuild()

// Start opIndexing task only if schema update needs to build the indexes.
if shouldRebuild && !gr.Node.isRunningTask(opIndexing) {
closer, err = gr.Node.startTask(opIndexing)
if err != nil {
return err
}
defer stopIndexing(closer)
}

querySchema := rebuild.GetQuerySchema()
// Sets the schema only in memory. The schema is written to
// disk only after schema mutations are successful.
Expand All @@ -273,8 +278,8 @@ func runSchemaMutation(ctx context.Context, updates []*pb.SchemaUpdate, startTs
return err
}

if ok && rebuild.NeedIndexRebuild() {
go buildIndexes(su, rebuild)
if shouldRebuild {
go buildIndexes(su, rebuild, closer)
} else if err := updateSchema(su); err != nil {
return err
}
Expand Down