Skip to content

Commit

Permalink
relase/v20.07 - Fix(Alpha): MASA: Make Alpha Shutdown Again (#6313) (#…
Browse files Browse the repository at this point in the history
…6402)

* Fix(Alpha): MASA: Make Alpha Shutdown Again (#6313)

This PR removes the usage of context.Background() in groups.go and ensures that all the various goroutines exit as intended.

Changes:
* Shutdown SubscribeForUpdates correctly.
* Fix up all the closing conditions.
* Consolidate updaters into one closer
* Update Badger to master
* fix(build): Update ResetAcl args for OSS build.
* chore: Remove TODO comment.

Co-authored-by: Daniel Mai <[email protected]>
(cherry picked from commit f1941b3)
  • Loading branch information
Ibrahim Jarif authored Sep 4, 2020
1 parent 18cc5f9 commit 88994d0
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 97 deletions.
23 changes: 17 additions & 6 deletions dgraph/cmd/alpha/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -758,14 +758,13 @@ func run() {
}
}()

// Setup external communication.
aclCloser := z.NewCloser(1)
updaters := z.NewCloser(2)
go func() {
worker.StartRaftNodes(worker.State.WALstore, bindall)
// initialization of the admin account can only be done after raft nodes are running
// and health check passes
edgraph.ResetAcl()
edgraph.RefreshAcls(aclCloser)
edgraph.ResetAcl(updaters)
edgraph.RefreshAcls(updaters)
}()

// Graphql subscribes to alpha to get schema updates. We need to close that before we
Expand All @@ -774,11 +773,23 @@ func run() {

setupServer(adminCloser)
glog.Infoln("GRPC and HTTP stopped.")
aclCloser.SignalAndWait()

// This might not close until group is given the signal to close. So, only signal here,
// wait for it after group is closed.
updaters.Signal()

worker.BlockingStop()
glog.Infoln("worker stopped.")

adminCloser.SignalAndWait()
glog.Info("Disposing server state.")
glog.Infoln("adminCloser closed.")

worker.State.Dispose()
x.RemoveCidFile()
glog.Info("worker.State disposed.")

updaters.Wait()
glog.Infoln("updaters closed.")

glog.Infoln("Server shutdown. Bye!")
}
7 changes: 6 additions & 1 deletion dgraph/cmd/zero/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,5 +373,10 @@ func run() {

glog.Infoln("Running Dgraph Zero...")
st.zero.closer.Wait()
glog.Infoln("All done.")
glog.Infoln("Closer closed.")

err = kv.Close()
glog.Infof("Badger closed with err: %v\n", err)

glog.Infoln("All done. Goodbye!")
}
2 changes: 1 addition & 1 deletion edgraph/access.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (s *Server) Login(ctx context.Context,
}

// ResetAcl is an empty method since ACL is only supported in the enterprise version.
func ResetAcl() {
func ResetAcl(closer *z.Closer) {
// do nothing
}

Expand Down
35 changes: 20 additions & 15 deletions edgraph/access_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,15 +305,15 @@ func authorizeUser(ctx context.Context, userid string, password string) (

// RefreshAcls queries for the ACL triples and refreshes the ACLs accordingly.
func RefreshAcls(closer *z.Closer) {
defer closer.Done()
defer func() {
glog.Infoln("RefreshAcls closed")
closer.Done()
}()
if len(worker.Config.HmacSecret) == 0 {
// the acl feature is not turned on
return
}

ticker := time.NewTicker(worker.Config.AclRefreshInterval)
defer ticker.Stop()

// retrieve the full data set of ACLs from the corresponding alpha server, and update the
// aclCachePtr
retrieveAcls := func() error {
Expand All @@ -323,9 +323,7 @@ func RefreshAcls(closer *z.Closer) {
ReadOnly: true,
}

ctx := context.Background()
var err error
queryResp, err := (&Server{}).doQuery(ctx, &queryRequest, NoAuthorize)
queryResp, err := (&Server{}).doQuery(closer.Ctx(), &queryRequest, NoAuthorize)
if err != nil {
return errors.Errorf("unable to retrieve acls: %v", err)
}
Expand All @@ -339,14 +337,16 @@ func RefreshAcls(closer *z.Closer) {
return nil
}

ticker := time.NewTicker(worker.Config.AclRefreshInterval)
defer ticker.Stop()
for {
select {
case <-closer.HasBeenClosed():
return
case <-ticker.C:
if err := retrieveAcls(); err != nil {
glog.Errorf("Error while retrieving acls:%v", err)
glog.Errorf("Error while retrieving acls: %v", err)
}
case <-closer.HasBeenClosed():
return
}
}
}
Expand All @@ -367,7 +367,12 @@ const queryAcls = `
`

// ResetAcl clears the aclCachePtr and upserts the Groot account.
func ResetAcl() {
func ResetAcl(closer *z.Closer) {
defer func() {
glog.Infof("ResetAcl closed")
closer.Done()
}()

if len(worker.Config.HmacSecret) == 0 {
// The acl feature is not turned on.
return
Expand Down Expand Up @@ -434,8 +439,8 @@ func ResetAcl() {
return nil
}

for {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
for closer.Ctx().Err() == nil {
ctx, cancel := context.WithTimeout(closer.Ctx(), time.Minute)
defer cancel()
if err := upsertGuardians(ctx); err != nil {
glog.Infof("Unable to upsert the guardian group. Error: %v", err)
Expand All @@ -445,8 +450,8 @@ func ResetAcl() {
break
}

for {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
for closer.Ctx().Err() == nil {
ctx, cancel := context.WithTimeout(closer.Ctx(), time.Minute)
defer cancel()
if err := upsertGroot(ctx); err != nil {
glog.Infof("Unable to upsert the groot account. Error: %v", err)
Expand Down
4 changes: 2 additions & 2 deletions edgraph/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ func (s *Server) Alter(ctx context.Context, op *api.Operation) (*api.Payload, er
// reset their in-memory GraphQL schema
_, err = UpdateGQLSchema(ctx, "", "")
// recreate the admin account after a drop all operation
ResetAcl()
ResetAcl(nil)
return empty, err
}

Expand All @@ -347,7 +347,7 @@ func (s *Server) Alter(ctx context.Context, op *api.Operation) (*api.Payload, er
// just reinsert the GraphQL schema, no need to alter dgraph schema as this was drop_data
_, err = UpdateGQLSchema(ctx, graphQLSchema, "")
// recreate the admin account after a drop data operation
ResetAcl()
ResetAcl(nil)
return empty, err
}

Expand Down
Loading

0 comments on commit 88994d0

Please sign in to comment.