Skip to content

Commit

Permalink
feat(GraphQL): Zero HTTP endpoints are now available at GraphQL admin…
Browse files Browse the repository at this point in the history
… (GRAPHQL-1118) (#6649)

This PR makes Zero's HTTP endpoints available through the GraphQL admin API. It also adds a `--disable_admin_http` flag to Zero, which can be used to disable Zero's administrative HTTP endpoints. The following are considered administrative endpoints for Zero:
* `/state`
* `/removeNode`
* `/moveTablet`
* `/assign`
* `/enterpriseLicense`

RFC: https://discuss.dgraph.io/t/moving-zero-http-endpoints-to-alpha-graphql-admin/10725
(cherry picked from commit 07be3c9)

# Conflicts:
#	dgraph/cmd/zero/run.go
#	protos/pb/pb.pb.go
  • Loading branch information
abhimanyusinghgaur committed Mar 31, 2021
1 parent 3f9effa commit df6e319
Show file tree
Hide file tree
Showing 24 changed files with 2,012 additions and 535 deletions.
67 changes: 23 additions & 44 deletions dgraph/cmd/zero/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (st *state) assign(w http.ResponseWriter, r *http.Request) {
return
}

num := &pb.Num{Val: uint64(val)}
num := &pb.Num{Val: val}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

Expand All @@ -87,7 +87,7 @@ func (st *state) assign(w http.ResponseWriter, r *http.Request) {
ids, err = st.zero.AssignIds(ctx, num)
default:
x.SetStatus(w, x.Error,
fmt.Sprintf("Invalid what: [%s]. Must be one of uids or timestamps", what))
fmt.Sprintf("Invalid what: [%s]. Must be one of: [uids, timestamps, nsids]", what))
return
}
if err != nil {
Expand Down Expand Up @@ -124,7 +124,10 @@ func (st *state) removeNode(w http.ResponseWriter, r *http.Request) {
return
}

if err := st.zero.removeNode(context.Background(), nodeId, uint32(groupId)); err != nil {
if _, err := st.zero.RemoveNode(
context.Background(),
&pb.RemoveNodeRequest{NodeId: nodeId, GroupId: uint32(groupId)},
); err != nil {
x.SetStatus(w, x.Error, err.Error())
return
}
Expand Down Expand Up @@ -154,8 +157,6 @@ func (st *state) moveTablet(w http.ResponseWriter, r *http.Request) {
return
}

tablet := r.URL.Query().Get("tablet")

namespace := r.URL.Query().Get("namespace")
namespace = strings.TrimSpace(namespace)
ns := x.GalaxyNamespace
Expand All @@ -168,7 +169,7 @@ func (st *state) moveTablet(w http.ResponseWriter, r *http.Request) {
}
}

tablet = x.NamespaceAttr(ns, tablet)
tablet := r.URL.Query().Get("tablet")
if len(tablet) == 0 {
w.WriteHeader(http.StatusBadRequest)
x.SetStatus(w, x.ErrorInvalidRequest, "tablet is a mandatory query parameter")
Expand All @@ -178,50 +179,28 @@ func (st *state) moveTablet(w http.ResponseWriter, r *http.Request) {
groupId, ok := intFromQueryParam(w, r, "group")
if !ok {
w.WriteHeader(http.StatusBadRequest)
x.SetStatus(w, x.ErrorInvalidRequest, fmt.Sprintf(
"Query parameter 'group' should contain a valid integer."))
return
}
dstGroup := uint32(groupId)
knownGroups := st.zero.KnownGroups()
var isKnown bool
for _, grp := range knownGroups {
if grp == dstGroup {
isKnown = true
break
}
}
if !isKnown {
w.WriteHeader(http.StatusBadRequest)
x.SetStatus(w, x.ErrorInvalidRequest, fmt.Sprintf("Group: [%d] is not a known group.",
dstGroup))
return
}

tab := st.zero.ServingTablet(tablet)
if tab == nil {
w.WriteHeader(http.StatusBadRequest)
x.SetStatus(w, x.ErrorInvalidRequest, fmt.Sprintf("No tablet found for: %s", tablet))
return
}

srcGroup := tab.GroupId
if srcGroup == dstGroup {
w.WriteHeader(http.StatusInternalServerError)
x.SetStatus(w, x.ErrorInvalidRequest,
fmt.Sprintf("Tablet: [%s] is already being served by group: [%d]", tablet, srcGroup))
"Query parameter 'group' should contain a valid integer.")
return
}
dstGroup := uint32(groupId)

if err := st.zero.movePredicate(tablet, srcGroup, dstGroup); err != nil {
glog.Errorf("While moving predicate %s from %d -> %d. Error: %v",
tablet, srcGroup, dstGroup, err)
w.WriteHeader(http.StatusInternalServerError)
x.SetStatus(w, x.Error, err.Error())
var resp *pb.Status
var err error
if resp, err = st.zero.MoveTablet(
context.Background(),
&pb.MoveTabletRequest{Namespace: ns, Tablet: tablet, DstGroup: dstGroup},
); err != nil {
if resp.GetMsg() == x.ErrorInvalidRequest {
w.WriteHeader(http.StatusBadRequest)
x.SetStatus(w, x.ErrorInvalidRequest, err.Error())
} else {
w.WriteHeader(http.StatusInternalServerError)
x.SetStatus(w, x.Error, err.Error())
}
return
}
_, err := fmt.Fprintf(w, "Predicate: [%s] moved from group [%d] to [%d]",
tablet, srcGroup, dstGroup)
_, err = fmt.Fprint(w, resp.GetMsg())
if err != nil {
glog.Warningf("Error while writing response: %+v", err)
}
Expand Down
5 changes: 2 additions & 3 deletions dgraph/cmd/zero/license_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
package zero

import (
"bytes"
"context"
"io/ioutil"
"math"
Expand Down Expand Up @@ -139,7 +138,7 @@ func (st *state) applyEnterpriseLicense(w http.ResponseWriter, r *http.Request)

ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
if err := st.zero.applyLicense(ctx, bytes.NewReader(b)); err != nil {
if _, err := st.zero.ApplyLicense(ctx, &pb.ApplyLicenseRequest{License: b}); err != nil {
w.WriteHeader(http.StatusBadRequest)
x.SetStatus(w, x.ErrorInvalidRequest, err.Error())
return
Expand All @@ -157,7 +156,7 @@ func (s *Server) applyLicenseFile(path string) {
}
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
if err = s.applyLicense(ctx, bytes.NewReader(content)); err != nil {
if _, err = s.ApplyLicense(ctx, &pb.ApplyLicenseRequest{License: content}); err != nil {
glog.Infof("Unable to apply license at %v due to error %v", path, err)
}
}
15 changes: 10 additions & 5 deletions dgraph/cmd/zero/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ instances to achieve high-availability.
flag.StringP("wal", "w", "zw", "Directory storing WAL.")
flag.Duration("rebalance_interval", 8*time.Minute, "Interval for trying a predicate move.")
flag.String("enterprise_license", "", "Path to the enterprise license file.")
flag.Bool("disable_admin_http", false,
"Turn on/off the administrative endpoints exposed over Zero's HTTP port.")

flag.String("limit", worker.ZeroLimitsDefaults, z.NewSuperFlagHelp(worker.ZeroLimitsDefaults).
Head("Limit options").
Expand Down Expand Up @@ -325,11 +327,14 @@ func run() {
http.Handle("/", audit.AuditRequestHttp(baseMux))

baseMux.HandleFunc("/health", st.pingResponse)
baseMux.HandleFunc("/state", st.getState)
baseMux.HandleFunc("/removeNode", st.removeNode)
baseMux.HandleFunc("/moveTablet", st.moveTablet)
baseMux.HandleFunc("/assign", st.assign)
baseMux.HandleFunc("/enterpriseLicense", st.applyEnterpriseLicense)
// the following endpoints are disabled only if the flag is explicitly set to true
if !Zero.Conf.GetBool("disable_admin_http") {
baseMux.HandleFunc("/state", st.getState)
baseMux.HandleFunc("/removeNode", st.removeNode)
baseMux.HandleFunc("/moveTablet", st.moveTablet)
baseMux.HandleFunc("/assign", st.assign)
baseMux.HandleFunc("/enterpriseLicense", st.applyEnterpriseLicense)
}
baseMux.HandleFunc("/debug/jemalloc", x.JemallocHandler)
zpages.Handle(baseMux, "/debug/z")

Expand Down
46 changes: 46 additions & 0 deletions dgraph/cmd/zero/tablet.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,52 @@ func (s *Server) rebalanceTablets() {
}
}

// MoveTablet moves the tablet named by req.Tablet (within req.Namespace) to the
// group req.DstGroup. The request is rejected unless this Zero is the leader.
//
// A non-nil *pb.Status is returned on every path. On failure its Msg is
// x.ErrorInvalidRequest when the request itself was rejected (unknown
// destination group, tablet not being served, or tablet already in the
// destination group) and x.Error otherwise; the `/moveTablet` HTTP handler in
// Zero inspects Msg to choose its response status. On success Msg holds a
// human-readable summary of the move.
func (s *Server) MoveTablet(ctx context.Context, req *pb.MoveTabletRequest) (*pb.Status, error) {
	if !s.Node.AmLeader() {
		return &pb.Status{Code: 1, Msg: x.Error}, errNotLeader
	}

	// invalid pairs the "invalid request" status with the given error so the
	// three validation failures below share one shape.
	invalid := func(err error) (*pb.Status, error) {
		return &pb.Status{Code: 1, Msg: x.ErrorInvalidRequest}, err
	}

	dst := req.DstGroup

	// The destination must be one of the groups Zero currently knows about.
	found := false
	for _, gid := range s.KnownGroups() {
		if gid == dst {
			found = true
			break
		}
	}
	if !found {
		return invalid(fmt.Errorf("Group: [%d] is not a known group.", dst))
	}

	// Tablets are looked up by their namespace-qualified name.
	attr := x.NamespaceAttr(req.Namespace, req.Tablet)
	serving := s.ServingTablet(attr)
	if serving == nil {
		return invalid(fmt.Errorf("namespace: %d. No tablet found for: %s",
			req.Namespace, req.Tablet))
	}

	src := serving.GroupId
	if src == dst {
		return invalid(fmt.Errorf(
			"namespace: %d. Tablet: [%s] is already being served by group: [%d]",
			req.Namespace, req.Tablet, src))
	}

	// Delegate the actual move; any failure here is reported as a generic error
	// rather than an invalid request.
	err := s.movePredicate(attr, src, dst)
	if err != nil {
		glog.Errorf("namespace: %d. While moving predicate %s from %d -> %d. Error: %v",
			req.Namespace, req.Tablet, src, dst, err)
		return &pb.Status{Code: 1, Msg: x.Error}, err
	}

	summary := fmt.Sprintf("namespace: %d. Predicate: [%s] moved from group [%d] to [%d]",
		req.Namespace, req.Tablet, src, dst)
	return &pb.Status{Code: 0, Msg: summary}, nil
}

// movePredicate is the main entry point for move predicate logic. This Zero must remain the leader
// for the entire duration of predicate move. If this Zero stops being the leader, the final
// proposal of reassigning the tablet to the destination would fail automatically.
Expand Down
46 changes: 27 additions & 19 deletions dgraph/cmd/zero/zero.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
package zero

import (
"bytes"
"context"
"crypto/tls"
"io"
"math"
"strings"
"sync"
Expand Down Expand Up @@ -405,26 +405,32 @@ func (s *Server) createProposals(dst *pb.Group) ([]*pb.ZeroProposal, error) {
return res, nil
}

// removeNode removes the given node from the given group.
// RemoveNode removes the given node from the given group.
// It's the user's responsibility to ensure that node doesn't come back again
// before calling the api.
func (s *Server) removeNode(ctx context.Context, nodeId uint64, groupId uint32) error {
if groupId == 0 {
return s.Node.ProposePeerRemoval(ctx, nodeId)
func (s *Server) RemoveNode(ctx context.Context, req *pb.RemoveNodeRequest) (*pb.Status, error) {
if req.GroupId == 0 {
return nil, s.Node.ProposePeerRemoval(ctx, req.NodeId)
}
zp := &pb.ZeroProposal{}
zp.Member = &pb.Member{Id: nodeId, GroupId: groupId, AmDead: true}
if _, ok := s.state.Groups[groupId]; !ok {
return errors.Errorf("No group with groupId %d found", groupId)
zp.Member = &pb.Member{Id: req.NodeId, GroupId: req.GroupId, AmDead: true}
if _, ok := s.state.Groups[req.GroupId]; !ok {
return nil, errors.Errorf("No group with groupId %d found", req.GroupId)
}
if _, ok := s.state.Groups[groupId].Members[nodeId]; !ok {
return errors.Errorf("No node with nodeId %d found in group %d", nodeId, groupId)
if _, ok := s.state.Groups[req.GroupId].Members[req.NodeId]; !ok {
return nil, errors.Errorf("No node with nodeId %d found in group %d", req.NodeId,
req.GroupId)
}
if len(s.state.Groups[groupId].Members) == 1 && len(s.state.Groups[groupId].Tablets) > 0 {
return errors.Errorf("Move all tablets from group %d before removing the last node", groupId)
if len(s.state.Groups[req.GroupId].Members) == 1 && len(s.state.Groups[req.GroupId].
Tablets) > 0 {
return nil, errors.Errorf("Move all tablets from group %d before removing the last node",
req.GroupId)
}
if err := s.Node.proposeAndWait(ctx, zp); err != nil {
return nil, err
}

return s.Node.proposeAndWait(ctx, zp)
return &pb.Status{}, nil
}

// Connect is used by Alpha nodes to connect the very first time with group zero.
Expand Down Expand Up @@ -805,19 +811,21 @@ func (s *Server) latestMembershipState(ctx context.Context) (*pb.MembershipState
return ms, nil
}

func (s *Server) applyLicense(ctx context.Context, signedData io.Reader) error {
func (s *Server) ApplyLicense(ctx context.Context, req *pb.ApplyLicenseRequest) (*pb.Status,
error) {
var l license
signedData := bytes.NewReader(req.License)
if err := verifySignature(signedData, strings.NewReader(publicKey), &l); err != nil {
return errors.Wrapf(err, "while extracting enterprise details from the license")
return nil, errors.Wrapf(err, "while extracting enterprise details from the license")
}

numNodes := len(s.state.GetZeros())
for _, group := range s.state.GetGroups() {
numNodes += len(group.GetMembers())
}
if uint64(numNodes) > l.MaxNodes {
return errors.Errorf("Your license only allows [%v] (Alpha + Zero) nodes. You have: [%v].",
l.MaxNodes, numNodes)
return nil, errors.Errorf("Your license only allows [%v] (Alpha + Zero) nodes. "+
"You have: [%v].", l.MaxNodes, numNodes)
}

proposal := &pb.ZeroProposal{
Expand All @@ -830,8 +838,8 @@ func (s *Server) applyLicense(ctx context.Context, signedData io.Reader) error {

err := s.Node.proposeAndWait(ctx, proposal)
if err != nil {
return errors.Wrapf(err, "while proposing enterprise license state to cluster")
return nil, errors.Wrapf(err, "while proposing enterprise license state to cluster")
}
glog.Infof("Enterprise license proposed to the cluster %+v", proposal)
return nil
return &pb.Status{}, nil
}
4 changes: 2 additions & 2 deletions dgraph/cmd/zero/zero_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ func TestRemoveNode(t *testing.T) {
Groups: map[uint32]*pb.Group{1: {Members: map[uint64]*pb.Member{}}},
},
}
err := server.removeNode(context.TODO(), 3, 1)
_, err := server.RemoveNode(context.TODO(), &pb.RemoveNodeRequest{NodeId: 3, GroupId: 1})
require.Error(t, err)
err = server.removeNode(context.TODO(), 1, 2)
_, err = server.RemoveNode(context.TODO(), &pb.RemoveNodeRequest{NodeId: 1, GroupId: 2})
require.Error(t, err)
}
64 changes: 64 additions & 0 deletions ee/acl/acl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2736,6 +2736,70 @@ func TestGuardianOnlyAccessForAdminEndpoints(t *testing.T) {
guardianErr: "The path \"\" does not exist or it is inaccessible.",
guardianData: `{"restore": {"code": "Failure"}}`,
},
{
name: "removeNode has guardian auth",
query: `
mutation {
removeNode(input: {nodeId: 1, groupId: 2147483640}) {
response {
code
}
}
}`,
queryName: "removeNode",
testGuardianAccess: true,
guardianErr: "No group with groupId 2147483640 found",
guardianData: `{"removeNode": null}`,
},
{
name: "moveTablet has guardian auth",
query: `
mutation {
moveTablet(input: {tablet: "non_existent_pred", groupId: 2147483640}) {
response {
code
message
}
}
}`,
queryName: "moveTablet",
testGuardianAccess: true,
guardianErr: "Group: [2147483640] is not a known group.",
guardianData: `{"moveTablet": null}`,
},
{
name: "assign has guardian auth",
query: `
mutation {
assign(input: {what: UID, num: 0}) {
response {
startId
endId
readOnly
}
}
}`,
queryName: "assign",
testGuardianAccess: true,
guardianErr: "Nothing to be leased",
guardianData: `{"assign": null}`,
},
{
name: "enterpriseLicense has guardian auth",
query: `
mutation {
enterpriseLicense(input: {license: ""}) {
response {
code
}
}
}`,
queryName: "enterpriseLicense",
testGuardianAccess: true,
guardianErr: "while extracting enterprise details from the license: while decoding" +
" license file: EOF",
guardianData: `{"enterpriseLicense": null}`,
},
{
name: "getGQLSchema has guardian auth",
query: `
Expand Down
Loading

0 comments on commit df6e319

Please sign in to comment.