Skip to content

Commit

Permalink
comments(zero): Add comments in Zero and clarify naming.
Browse files Browse the repository at this point in the history
  • Loading branch information
manishrjain authored and mangalaman93 committed Aug 10, 2023
1 parent 0474a00 commit 9f75d41
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 25 deletions.
44 changes: 24 additions & 20 deletions dgraph/cmd/zero/assign.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,21 @@ const (
func (s *Server) updateLeases() {
var startTs uint64
s.Lock()
s.nextLease[pb.Num_UID] = s.state.MaxUID + 1
s.nextLease[pb.Num_TXN_TS] = s.state.MaxTxnTs + 1
s.nextLease[pb.Num_NS_ID] = s.state.MaxNsID + 1
s.nextUint[pb.Num_UID] = s.state.MaxUID + 1
s.nextUint[pb.Num_TXN_TS] = s.state.MaxTxnTs + 1
s.nextUint[pb.Num_NS_ID] = s.state.MaxNsID + 1

startTs = s.nextLease[pb.Num_TXN_TS]
startTs = s.nextUint[pb.Num_TXN_TS]
glog.Infof("Updated UID: %d. Txn Ts: %d. NsID: %d.",
s.nextLease[pb.Num_UID], s.nextLease[pb.Num_TXN_TS], s.nextLease[pb.Num_NS_ID])
s.nextUint[pb.Num_UID], s.nextUint[pb.Num_TXN_TS], s.nextUint[pb.Num_NS_ID])
s.Unlock()
s.orc.updateStartTxnTs(startTs)
}

// maxLease keeps track of the various ID leases that we have already achieved
// quorum on. This Server can hand out IDs <= maxLease, without the need for any
// more quorum. If a new server becomes Zero leader, they'd renew this lease and
// advance maxLease before handing out new IDs.
func (s *Server) maxLease(typ pb.NumLeaseType) uint64 {
s.RLock()
defer s.RUnlock()
Expand Down Expand Up @@ -97,24 +101,24 @@ func (s *Server) lease(ctx context.Context, num *pb.Num) (*pb.AssignedIds, error
// service it directly.
if glog.V(3) {
glog.Infof("Attempting to serve read only txn ts [%d, %d]",
s.readOnlyTs, s.nextLease[pb.Num_TXN_TS])
s.readOnlyTs, s.nextUint[pb.Num_TXN_TS])
}
if s.readOnlyTs > 0 && s.readOnlyTs == s.nextLease[pb.Num_TXN_TS]-1 {
if s.readOnlyTs > 0 && s.readOnlyTs == s.nextUint[pb.Num_TXN_TS]-1 {
return &pb.AssignedIds{ReadOnly: s.readOnlyTs}, errServedFromMemory
}
}
// We couldn't service it. So, let's request an extra timestamp for
// readonly transactions, if needed.
}
if s.nextLease[pb.Num_UID] == 0 || s.nextLease[pb.Num_TXN_TS] == 0 ||
s.nextLease[pb.Num_NS_ID] == 0 {
if s.nextUint[pb.Num_UID] == 0 || s.nextUint[pb.Num_TXN_TS] == 0 ||
s.nextUint[pb.Num_NS_ID] == 0 {
return nil, errors.New("Server not initialized")
}

// Calculate how many ids do we have available in memory, before we need to
// renew our lease.
maxLease := s.maxLease(typ)
available := maxLease - s.nextLease[typ] + 1
available := maxLease - s.nextUint[typ] + 1

// If we have less available than what we need, we need to renew our lease.
if available < num.Val+1 { // +1 for a potential readonly ts.
Expand All @@ -128,7 +132,7 @@ func (s *Server) lease(ctx context.Context, num *pb.Num) (*pb.AssignedIds, error
}
if howMany < num.Val || maxLease+howMany < maxLease { // check for overflow.
return &emptyAssignedIds, errors.Errorf("Cannot lease %s as the limit has reached."+
" currMax:%d", typ, s.nextLease[typ]-1)
" currMax:%d", typ, s.nextUint[typ]-1)
}

var proposal pb.ZeroProposal
Expand All @@ -149,24 +153,24 @@ func (s *Server) lease(ctx context.Context, num *pb.Num) (*pb.AssignedIds, error
out := &pb.AssignedIds{}
if typ == pb.Num_TXN_TS {
if num.Val > 0 {
out.StartId = s.nextLease[pb.Num_TXN_TS]
out.StartId = s.nextUint[pb.Num_TXN_TS]
out.EndId = out.StartId + num.Val - 1
s.nextLease[pb.Num_TXN_TS] = out.EndId + 1
s.nextUint[pb.Num_TXN_TS] = out.EndId + 1
}
if num.ReadOnly {
s.readOnlyTs = s.nextLease[pb.Num_TXN_TS]
s.nextLease[pb.Num_TXN_TS]++
s.readOnlyTs = s.nextUint[pb.Num_TXN_TS]
s.nextUint[pb.Num_TXN_TS]++
out.ReadOnly = s.readOnlyTs
}
s.orc.doneUntil.Begin(x.Max(out.EndId, out.ReadOnly))
} else if typ == pb.Num_UID {
out.StartId = s.nextLease[pb.Num_UID]
out.StartId = s.nextUint[pb.Num_UID]
out.EndId = out.StartId + num.Val - 1
s.nextLease[pb.Num_UID] = out.EndId + 1
s.nextUint[pb.Num_UID] = out.EndId + 1
} else if typ == pb.Num_NS_ID {
out.StartId = s.nextLease[pb.Num_NS_ID]
out.StartId = s.nextUint[pb.Num_NS_ID]
out.EndId = out.StartId + num.Val - 1
s.nextLease[pb.Num_NS_ID] = out.EndId + 1
s.nextUint[pb.Num_NS_ID] = out.EndId + 1

} else {
return out, errors.Errorf("Unknown lease type: %v\n", typ)
Expand Down Expand Up @@ -253,7 +257,7 @@ func (s *Server) AssignIds(ctx context.Context, num *pb.Num) (*pb.AssignedIds, e
// node is not the leader then the bump request will be forwarded to the leader by lease().
if num.GetBump() && s.Node.AmLeader() {
s.leaseLock.Lock()
cur := s.nextLease[num.GetType()] - 1
cur := s.nextUint[num.GetType()] - 1
s.leaseLock.Unlock()

// We need to lease more UIDs if bump request is more than current max lease.
Expand Down
12 changes: 7 additions & 5 deletions dgraph/cmd/zero/zero.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ type Server struct {
state *pb.MembershipState
nextRaftId uint64

nextLease map[pb.NumLeaseType]uint64
// nextUint is the uint64 which we can hand out next. See maxLease for the
// max ID leased via Zero quorum.
nextUint map[pb.NumLeaseType]uint64
readOnlyTs uint64
leaseLock sync.Mutex // protects nextUID, nextTxnTs, nextNsID and corresponding proposals.
rateLimiter *x.RateLimiter
Expand Down Expand Up @@ -90,11 +92,11 @@ func (s *Server) Init() {
Groups: make(map[uint32]*pb.Group),
Zeros: make(map[uint64]*pb.Member),
}
s.nextLease = make(map[pb.NumLeaseType]uint64)
s.nextUint = make(map[pb.NumLeaseType]uint64)
s.nextRaftId = 1
s.nextLease[pb.Num_UID] = 1
s.nextLease[pb.Num_TXN_TS] = 1
s.nextLease[pb.Num_NS_ID] = 1
s.nextUint[pb.Num_UID] = 1
s.nextUint[pb.Num_TXN_TS] = 1
s.nextUint[pb.Num_NS_ID] = 1
s.nextGroup = 1
s.leaderChangeCh = make(chan struct{}, 1)
s.closer = z.NewCloser(2) // grpc and http
Expand Down

0 comments on commit 9f75d41

Please sign in to comment.