diff --git a/example_groupbalancer_test.go b/example_groupbalancer_test.go
new file mode 100644
index 000000000..48f44f8a8
--- /dev/null
+++ b/example_groupbalancer_test.go
@@ -0,0 +1,121 @@
+package kafka
+
+import (
+	"context"
+	"encoding/json"
+	"io/ioutil"
+	"net/http"
+	"os"
+	"strings"
+	"time"
+)
+
+// ExampleAWSRackLocal shows how the RackAffinityGroupBalancer can be used to
+// pair up consumers with brokers in the same AWS availability zone. This code
+// assumes that each broker's rack is configured to be the name of the AZ in
+// which it is running.
+func ExampleAWSRackLocal() {
+	r := NewReader(ReaderConfig{
+		Brokers: []string{"kafka:9092"},
+		GroupID: "my-group",
+		Topic:   "my-topic",
+		GroupBalancers: []GroupBalancer{
+			RackAffinityGroupBalancer{Rack: findRack()},
+			RangeGroupBalancer{},
+		},
+	})
+
+	r.ReadMessage(context.Background())
+
+	r.Close()
+}
+
+// findRack is the basic rack resolver strategy for use in AWS. It supports
+//  * ECS with the task metadata endpoint enabled (returns the container
+//    instance's availability zone)
+//  * Linux EC2 (returns the instance's availability zone)
+func findRack() string {
+	switch whereAmI() {
+	case "ecs":
+		return ecsAvailabilityZone()
+	case "ec2":
+		return ec2AvailabilityZone()
+	}
+	return ""
+}
+
+const ecsContainerMetadataURI = "ECS_CONTAINER_METADATA_URI"
+
+// whereAmI determines which strategy the rack resolver should use.
+func whereAmI() string {
+	// https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint.html
+	if os.Getenv(ecsContainerMetadataURI) != "" {
+		return "ecs"
+	}
+	// https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/identify_ec2_instances.html
+	for _, path := range [...]string{
+		"/sys/devices/virtual/dmi/id/product_uuid",
+		"/sys/hypervisor/uuid",
+	} {
+		b, err := ioutil.ReadFile(path)
+		if err != nil {
+			continue
+		}
+		s := string(b)
+		switch {
+		case strings.HasPrefix(s, "EC2"), strings.HasPrefix(s, "ec2"):
+			return "ec2"
+		}
+	}
+	return "somewhere"
+}
+
+// ecsAvailabilityZone queries the task metadata endpoint at the URI that ECS
+// injects via the ECS_CONTAINER_METADATA_URI variable in order to retrieve
+// the availability zone where the task is running.
+func ecsAvailabilityZone() string {
+	client := http.Client{
+		Timeout: time.Second,
+		Transport: &http.Transport{
+			DisableCompression: true,
+			DisableKeepAlives:  true,
+		},
+	}
+	r, err := client.Get(os.Getenv(ecsContainerMetadataURI) + "/task")
+	if err != nil {
+		return ""
+	}
+	defer r.Body.Close()
+
+	var md struct {
+		AvailabilityZone string
+	}
+	if err := json.NewDecoder(r.Body).Decode(&md); err != nil {
+		return ""
+	}
+	return md.AvailabilityZone
+}
+
+// ec2AvailabilityZone queries the metadata endpoint to discover the
+// availability zone where this code is running. We avoid calling this function
+// unless we know we're in EC2. Otherwise, in other environments, we would need
+// to wait for the request to 169.254.169.254 to time out before proceeding.
+func ec2AvailabilityZone() string { + client := http.Client{ + Timeout: time.Second, + Transport: &http.Transport{ + DisableCompression: true, + DisableKeepAlives: true, + }, + } + r, err := client.Get("http://169.254.169.254/latest/meta-data/placement/availability-zone") + if err != nil { + return "" + } + defer r.Body.Close() + b, err := ioutil.ReadAll(r.Body) + if err != nil { + return "" + } + return string(b) +} diff --git a/groupbalancer.go b/groupbalancer.go index 7e46cc7d4..8075512d2 100644 --- a/groupbalancer.go +++ b/groupbalancer.go @@ -1,6 +1,8 @@ package kafka -import "sort" +import ( + "sort" +) // GroupMember describes a single participant in a consumer group. type GroupMember struct { @@ -133,6 +135,156 @@ func (r RoundRobinGroupBalancer) AssignGroups(members []GroupMember, topicPartit return groupAssignments } +// RackAffinityGroupBalancer makes a best effort to pair up consumers with +// partitions whose leader is in the same rack. This strategy can have +// performance benefits by minimizing round trip latency between the consumer +// and the broker. In environments where network traffic across racks incurs +// charges (such as cross AZ data transfer in AWS), this strategy is also a cost +// optimization measure because it keeps network traffic within the local rack +// where possible. +// +// The primary objective is to spread partitions evenly across consumers with a +// secondary focus on maximizing the number of partitions where the leader and +// the consumer are in the same rack. For best affinity, it's recommended to +// have a balanced spread of consumers and partition leaders across racks. +// +// This balancer requires Kafka version 0.10.0.0+ or later. Earlier versions do +// not return the brokers' racks in the metadata request. +type RackAffinityGroupBalancer struct { + // Rack is the name of the rack where this consumer is running. It will be + // communicated to the consumer group leader via the UserData so that + // assignments can be made with affinity to the partition leader. 
+ Rack string +} + +func (r RackAffinityGroupBalancer) ProtocolName() string { + return "rack-affinity" +} + +func (r RackAffinityGroupBalancer) AssignGroups(members []GroupMember, partitions []Partition) GroupMemberAssignments { + membersByTopic := make(map[string][]GroupMember) + for _, m := range members { + for _, t := range m.Topics { + membersByTopic[t] = append(membersByTopic[t], m) + } + } + + partitionsByTopic := make(map[string][]Partition) + for _, p := range partitions { + partitionsByTopic[p.Topic] = append(partitionsByTopic[p.Topic], p) + } + + assignments := GroupMemberAssignments{} + for topic := range membersByTopic { + topicAssignments := r.assignTopic(membersByTopic[topic], partitionsByTopic[topic]) + for member, parts := range topicAssignments { + memberAssignments, ok := assignments[member] + if !ok { + memberAssignments = make(map[string][]int) + assignments[member] = memberAssignments + } + memberAssignments[topic] = parts + } + } + return assignments +} + +func (r RackAffinityGroupBalancer) UserData() ([]byte, error) { + return []byte(r.Rack), nil +} + +func (r *RackAffinityGroupBalancer) assignTopic(members []GroupMember, partitions []Partition) map[string][]int { + zonedPartitions := make(map[string][]int) + for _, part := range partitions { + zone := part.Leader.Rack + zonedPartitions[zone] = append(zonedPartitions[zone], part.ID) + } + + zonedConsumers := make(map[string][]string) + for _, member := range members { + zone := string(member.UserData) + zonedConsumers[zone] = append(zonedConsumers[zone], member.ID) + } + + targetPerMember := len(partitions) / len(members) + remainder := len(partitions) % len(members) + assignments := make(map[string][]int) + + // assign as many as possible in zone. this will assign up to partsPerMember + // to each consumer. it will also prefer to allocate remainder partitions + // in zone if possible. + for zone, parts := range zonedPartitions { + consumers := zonedConsumers[zone] + if len(consumers) == 0 { + continue + } + + // don't over-allocate. cap partition assignments at the calculated + // target. + partsPerMember := len(parts) / len(consumers) + if partsPerMember > targetPerMember { + partsPerMember = targetPerMember + } + + for _, consumer := range consumers { + assignments[consumer] = append(assignments[consumer], parts[:partsPerMember]...) + parts = parts[partsPerMember:] + } + + // if we had enough partitions for each consumer in this zone to hit its + // target, attempt to use any leftover partitions to satisfy the total + // remainder by adding at most 1 partition per consumer. + leftover := len(parts) + if partsPerMember == targetPerMember { + if leftover > remainder { + leftover = remainder + } + if leftover > len(consumers) { + leftover = len(consumers) + } + remainder -= leftover + } + + // this loop covers the case where we're assigning extra partitions or + // if there weren't enough to satisfy the targetPerMember and the zoned + // partitions didn't divide evenly. + for i := 0; i < leftover; i++ { + assignments[consumers[i]] = append(assignments[consumers[i]], parts[i]) + } + parts = parts[leftover:] + + if len(parts) == 0 { + delete(zonedPartitions, zone) + } else { + zonedPartitions[zone] = parts + } + } + + // assign out remainders regardless of zone. + var remaining []int + for _, partitions := range zonedPartitions { + remaining = append(remaining, partitions...) 
+ } + + for _, member := range members { + assigned := assignments[member.ID] + delta := targetPerMember - len(assigned) + // if it were possible to assign the remainder in zone, it's been taken + // care of already. now we will portion out any remainder to a member + // that can take it. + if delta >= 0 && remainder > 0 { + delta++ + remainder-- + } + if delta > 0 { + assignments[member.ID] = append(assigned, remaining[:delta]...) + remaining = remaining[delta:] + } + } + + return assignments +} + // findPartitions extracts the partition ids associated with the topic from the // list of Partitions provided func findPartitions(topic string, partitions []Partition) []int { diff --git a/groupbalancer_test.go b/groupbalancer_test.go index 5c0778179..bbea3551d 100644 --- a/groupbalancer_test.go +++ b/groupbalancer_test.go @@ -375,3 +375,316 @@ func TestFindMembersByTopicSortsByMemberID(t *testing.T) { }) } } + +func TestRackAffinityGroupBalancer(t *testing.T) { + t.Run("User Data", func(t *testing.T) { + t.Run("unknown zone", func(t *testing.T) { + b := RackAffinityGroupBalancer{} + zone, err := b.UserData() + if err != nil { + t.Fatal(err) + } + if string(zone) != "" { + t.Fatalf("expected empty zone but got %s", zone) + } + }) + + t.Run("configure zone", func(t *testing.T) { + b := RackAffinityGroupBalancer{Rack: "zone1"} + zone, err := b.UserData() + if err != nil { + t.Fatal(err) + } + if string(zone) != "zone1" { + t.Fatalf("expected zone1 az but got %s", zone) + } + }) + }) + + t.Run("Balance", func(t *testing.T) { + b := RackAffinityGroupBalancer{} + + brokers := map[string]Broker{ + "z1": {ID: 1, Rack: "z1"}, + "z2": {ID: 2, Rack: "z2"}, + "z3": {ID: 2, Rack: "z3"}, + "": {}, + } + + tests := []struct { + name string + memberCounts map[string]int + partitionCounts map[string]int + result map[string]map[string]int + }{ + { + name: "unknown and known zones", + memberCounts: map[string]int{ + "": 1, + "z1": 1, + "z2": 1, + }, + partitionCounts: map[string]int{ + "z1": 5, + "z2": 4, + "": 9, + }, + result: map[string]map[string]int{ + "z1": {"": 1, "z1": 5}, + "z2": {"": 2, "z2": 4}, + "": {"": 6}, + }, + }, + { + name: "all unknown", + memberCounts: map[string]int{ + "": 5, + }, + partitionCounts: map[string]int{ + "": 103, + }, + result: map[string]map[string]int{ + "": {"": 103}, + }, + }, + { + name: "remainder stays local", + memberCounts: map[string]int{ + "z1": 3, + "z2": 3, + "z3": 3, + }, + partitionCounts: map[string]int{ + "z1": 20, + "z2": 19, + "z3": 20, + }, + result: map[string]map[string]int{ + "z1": {"z1": 20}, + "z2": {"z2": 19}, + "z3": {"z3": 20}, + }, + }, + { + name: "imbalanced partitions", + memberCounts: map[string]int{ + "z1": 1, + "z2": 1, + "z3": 1, + }, + partitionCounts: map[string]int{ + "z1": 7, + "z2": 0, + "z3": 7, + }, + result: map[string]map[string]int{ + "z1": {"z1": 5}, + "z2": {"z1": 2, "z3": 2}, + "z3": {"z3": 5}, + }, + }, + { + name: "imbalanced members", + memberCounts: map[string]int{ + "z1": 5, + "z2": 3, + "z3": 1, + }, + partitionCounts: map[string]int{ + "z1": 9, + "z2": 9, + "z3": 9, + }, + result: map[string]map[string]int{ + "z1": {"z1": 9, "z3": 6}, + "z2": {"z2": 9}, + "z3": {"z3": 3}, + }, + }, + { + name: "no consumers in zone", + memberCounts: map[string]int{ + "z2": 10, + }, + partitionCounts: map[string]int{ + "z1": 20, + "z3": 19, + }, + result: map[string]map[string]int{ + "z2": {"z1": 20, "z3": 19}, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + + // create members per the distribution in 
the test case. + var members []GroupMember + for zone, count := range tt.memberCounts { + for i := 0; i < count; i++ { + members = append(members, GroupMember{ + ID: zone + ":" + strconv.Itoa(len(members)+1), + Topics: []string{"test"}, + UserData: []byte(zone), + }) + } + } + + // create partitions per the distribution in the test case. + var partitions []Partition + for zone, count := range tt.partitionCounts { + for i := 0; i < count; i++ { + partitions = append(partitions, Partition{ + ID: len(partitions), + Topic: "test", + Leader: brokers[zone], + }) + } + } + + res := b.AssignGroups(members, partitions) + + // verification #1...all members must be assigned and with the + // correct load. + minLoad := len(partitions) / len(members) + maxLoad := minLoad + if len(partitions)%len(members) != 0 { + maxLoad++ + } + for _, member := range members { + assignments, _ := res[member.ID]["test"] + if len(assignments) < minLoad || len(assignments) > maxLoad { + t.Errorf("expected between %d and %d partitions for member %s", minLoad, maxLoad, member.ID) + } + } + + // verification #2...all partitions are assigned, and the distribution + // per source zone matches. + partsPerZone := make(map[string]map[string]int) + uniqueParts := make(map[int]struct{}) + for id, topicToPartitions := range res { + + for topic, assignments := range topicToPartitions { + if topic != "test" { + t.Fatalf("wrong topic...expected test but got %s", topic) + } + + var member GroupMember + for _, m := range members { + if id == m.ID { + member = m + break + } + } + if member.ID == "" { + t.Fatal("empty member ID returned") + } + + var partition Partition + for _, id := range assignments { + + uniqueParts[id] = struct{}{} + + for _, p := range partitions { + if p.ID == int(id) { + partition = p + break + } + } + if partition.Topic == "" { + t.Fatal("empty topic ID returned") + } + counts, ok := partsPerZone[string(member.UserData)] + if !ok { + counts = make(map[string]int) + partsPerZone[string(member.UserData)] = counts + } + counts[partition.Leader.Rack]++ + } + } + } + + if len(partitions) != len(uniqueParts) { + t.Error("not all partitions were assigned") + } + if !reflect.DeepEqual(tt.result, partsPerZone) { + t.Errorf("wrong balanced zones. 
expected %v but got %v", tt.result, partsPerZone) + } + }) + } + }) + + t.Run("Multi Topic", func(t *testing.T) { + b := RackAffinityGroupBalancer{} + + brokers := map[string]Broker{ + "z1": {ID: 1, Rack: "z1"}, + "z2": {ID: 2, Rack: "z2"}, + "z3": {ID: 2, Rack: "z3"}, + "": {}, + } + + members := []GroupMember{ + { + ID: "z1", + Topics: []string{"topic1", "topic2"}, + UserData: []byte("z1"), + }, + { + ID: "z2", + Topics: []string{"topic2", "topic3"}, + UserData: []byte("z2"), + }, + { + ID: "z3", + Topics: []string{"topic3", "topic1"}, + UserData: []byte("z3"), + }, + } + + partitions := []Partition{ + { + ID: 1, + Topic: "topic1", + Leader: brokers["z1"], + }, + { + ID: 2, + Topic: "topic1", + Leader: brokers["z3"], + }, + { + ID: 1, + Topic: "topic2", + Leader: brokers["z1"], + }, + { + ID: 2, + Topic: "topic2", + Leader: brokers["z2"], + }, + { + ID: 1, + Topic: "topic3", + Leader: brokers["z3"], + }, + { + ID: 2, + Topic: "topic3", + Leader: brokers["z2"], + }, + } + + expected := GroupMemberAssignments{ + "z1": {"topic1": []int{1}, "topic2": []int{1}}, + "z2": {"topic2": []int{2}, "topic3": []int{2}}, + "z3": {"topic3": []int{1}, "topic1": []int{2}}, + } + + res := b.AssignGroups(members, partitions) + if !reflect.DeepEqual(expected, res) { + t.Fatalf("incorrect group assignment. expected %v but got %v", expected, res) + } + }) +}