Skip to content

Commit

Permalink
fix(coordinator): choose highest term and entry node as leader (#579)
Browse files Browse the repository at this point in the history
### Motivation

Follow the
[TLA+](https://github.com/streamnative/oxia/blob/6d2bf5de8ffd027804aab6302b80a5118367b061/tlaplus/OxiaReplication.tla#L508).
We should choose the highest term entry node as the leader, which can
also fix the issue we encountered.

- The leader can't truncate the following because of the [protection
logic](https://github.com/streamnative/oxia/blob/6d2bf5de8ffd027804aab6302b80a5118367b061/server/leader_controller.go#L511-L515).


```
{"level":"info","time":"2024-11-14T01:04:55.195566194Z","component":"shard-controller","namespace":"xxx","shard":10,"term":278,"time":"2024-11-14T01:04:55.195588258Z","message":"Starting leader election"}
{"level":"info","time":"2024-11-14T01:04:55.218118449Z","component":"shard-controller","entry-id":{"term":"78","offset":"5652112"},"namespace":"xxx","server-address":{"public":"xxxxxxx:6648","internal":"xxxxxxx:6649"},"shard":10,"time":"2024-11-14T01:04:55.218152924Z","message":"Processed newTerm response"}
{"level":"info","time":"2024-11-14T01:04:55.218347701Z","component":"shard-controller","entry-id":{"term":"78","offset":"5652112"},"namespace":"xxx","server-address":{"public":"xxxxxxx:6648","internal":"xxxxxxx:6649"},"shard":10,"time":"2024-11-14T01:04:55.218369944Z","message":"Processed newTerm response"}
{"level":"info","time":"2024-11-14T01:04:55.219517988Z","component":"shard-controller","entry-id":{"term":"77","offset":"5653106"},"namespace":"xxx","server-address":{"public":"xxxxxxx:6648","internal":"xxxxxxx:6649"},"shard":10,"time":"2024-11-14T01:04:55.219534113Z","message":"Processed newTerm response"}
{"level":"info","time":"2024-11-14T01:04:55.219554195Z","component":"shard-controller","followers":[{"server-address":{"public":"xxxxxxx:6648","internal":"xxxxxxx:6649"},"entry-id":{"term":78,"offset":5652112}},{"server-address":{"public":"xxxxxxx:6648","internal":"xxxxxxx:6649"},"entry-id":{"term":78,"offset":5652112}}],"namespace":"xxx","new-leader":{"public":"xxxxxxx:6648","internal":"xxxxxxx:6649"},"shard":10,"term":278,"time":"2024-11-14T01:04:55.219564969Z","message":"Successfully moved ensemble to a new term"}
```

Co-authored-by: Matteo Merli <[email protected]>
  • Loading branch information
mattisonchao and merlimat authored Nov 16, 2024
1 parent e7ef873 commit 02d3f5c
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 11 deletions.
27 changes: 16 additions & 11 deletions coordinator/impl/shard_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ func (s *shardController) electLeader() error {
return err
}

newLeader, followers := s.selectNewLeader(fr)
newLeader, followers := selectNewLeader(fr)

if s.log.Enabled(context.Background(), slog.LevelInfo) {
f := make([]struct {
Expand Down Expand Up @@ -655,22 +655,27 @@ func (s *shardController) deleteShardRpc(ctx context.Context, node model.ServerA
return err
}

func (*shardController) selectNewLeader(newTermResponses map[model.ServerAddress]*proto.EntryId) (
func selectNewLeader(newTermResponses map[model.ServerAddress]*proto.EntryId) (
leader model.ServerAddress, followers map[model.ServerAddress]*proto.EntryId) {
// Select all the nodes that have the highest term first
var currentMaxTerm int64 = -1
// Select all the nodes that have the highest entry in the wal
var currentMax int64 = -1
var candidates []model.ServerAddress

for addr, headEntryId := range newTermResponses {
switch {
case headEntryId.Offset < currentMax:
continue
case headEntryId.Offset == currentMax:
candidates = append(candidates, addr)
default:
// Found a new max
currentMax = headEntryId.Offset
candidates = []model.ServerAddress{addr}
if headEntryId.Term >= currentMaxTerm {
currentMaxTerm = headEntryId.Term
switch {
case headEntryId.Offset < currentMax:
continue
case headEntryId.Offset == currentMax:
candidates = append(candidates, addr)
default:
// Found a new max
currentMax = headEntryId.Offset
candidates = []model.ServerAddress{addr}
}
}
}

Expand Down
18 changes: 18 additions & 0 deletions coordinator/impl/shard_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,24 @@ var namespaceConfig = &model.NamespaceConfig{
NotificationsEnabled: common.OptBooleanDefaultTrue{},
}

func TestLeaderElection_ShouldChooseHighestTerm(t *testing.T) {
s1 := model.ServerAddress{Public: "1", Internal: "1"}
s2 := model.ServerAddress{Public: "2", Internal: "2"}
s3 := model.ServerAddress{Public: "3", Internal: "3"}
candidates := map[model.ServerAddress]*proto.EntryId{
s1: {Term: 200, Offset: 2480},
s2: {Term: 200, Offset: 2500},
s3: {Term: 198, Offset: 3000},
}
leader, followers := selectNewLeader(candidates)
assert.EqualValues(t, leader, s2)
assert.EqualValues(t, 2, len(followers))
_, exist := followers[s1]
assert.True(t, exist)
_, exist = followers[s3]
assert.True(t, exist)
}

func TestShardController(t *testing.T) {
var shard int64 = 5
rpc := newMockRpcProvider()
Expand Down

0 comments on commit 02d3f5c

Please sign in to comment.