Added RackAffinityGroupBalancer strategy #361

Merged (4 commits, Feb 17, 2020)
121 changes: 121 additions & 0 deletions example_groupbalancer_test.go
@@ -0,0 +1,121 @@
package kafka

import (
	"context"
	"encoding/json"
	"io/ioutil"
	"net/http"
	"os"
	"strings"
	"time"
)

// ExampleAWSRackLocal shows how the RackAffinityGroupBalancer can be used to
// pair up consumers with brokers in the same AWS availability zone. This code
// assumes that each broker's rack is configured to be the name of the AZ in
// which it is running.
func ExampleAWSRackLocal() {
	r := NewReader(ReaderConfig{
		Brokers: []string{"kafka:9092"},
		GroupID: "my-group",
		Topic:   "my-topic",
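		// GroupBalancers is assumed (from the ReaderConfig docs) to be a
		// priority-ordered list: rack affinity is offered first, and
		// RangeGroupBalancer acts as a fallback for groups whose members do
		// not all support the rack-affinity protocol.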
		GroupBalancers: []GroupBalancer{
			RackAffinityGroupBalancer{Rack: findRack()},
			RangeGroupBalancer{},
		},
	})

	r.ReadMessage(context.Background())

	r.Close()
}

// findRack is the basic rack resolver strategy for use in AWS. It supports:
// * ECS with the task metadata endpoint enabled (returns the container
//   instance's availability zone)
// * Linux EC2 (returns the instance's availability zone)
func findRack() string {
	switch whereAmI() {
	case "ecs":
		return ecsAvailabilityZone()
	case "ec2":
		return ec2AvailabilityZone()
	}
	return ""
}

const ecsContainerMetadataURI = "ECS_CONTAINER_METADATA_URI"

// whereAmI determines which strategy the rack resolver should use.
func whereAmI() string {
	// https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint.html
	if os.Getenv(ecsContainerMetadataURI) != "" {
		return "ecs"
	}
	// https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/identify_ec2_instances.html
	for _, path := range [...]string{
		"/sys/devices/virtual/dmi/id/product_uuid",
		"/sys/hypervisor/uuid",
	} {
		b, err := ioutil.ReadFile(path)
		if err != nil {
			continue
		}
		s := string(b)
		switch {
		case strings.HasPrefix(s, "EC2"), strings.HasPrefix(s, "ec2"):
			return "ec2"
		}
	}
	return "somewhere"
}

// ecsAvailabilityZone queries the task endpoint of the metadata URI that ECS
// injects via the ECS_CONTAINER_METADATA_URI environment variable in order to
// retrieve the availability zone where the task is running.
func ecsAvailabilityZone() string {
	client := http.Client{
		Timeout: time.Second,
		Transport: &http.Transport{
			DisableCompression: true,
			DisableKeepAlives:  true,
		},
	}
	r, err := client.Get(os.Getenv(ecsContainerMetadataURI) + "/task")
	if err != nil {
		return ""
	}
	defer r.Body.Close()

	var md struct {
		AvailabilityZone string
	}
	if err := json.NewDecoder(r.Body).Decode(&md); err != nil {
		return ""
	}
	return md.AvailabilityZone
}
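
// For reference, the task metadata endpoint queried above returns a JSON
// document; the decoder only reads its top-level "AvailabilityZone" field and
// ignores everything else. An abbreviated, illustrative response (the value
// is made up):
//
//	{"AvailabilityZone": "us-east-1a"}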

// ec2AvailabilityZone queries the metadata endpoint to discover the
// availability zone where this code is running. We avoid calling this function
// unless we know we're in EC2. Otherwise, in other environments, we would need
// to wait for the request to 169.254.169.254 to time out before proceeding.
func ec2AvailabilityZone() string {
	client := http.Client{
		Timeout: time.Second,
		Transport: &http.Transport{
			DisableCompression: true,
			DisableKeepAlives:  true,
		},
	}
	r, err := client.Get("http://169.254.169.254/latest/meta-data/placement/availability-zone")
	if err != nil {
		return ""
	}
	defer r.Body.Close()
	b, err := ioutil.ReadAll(r.Body)
	if err != nil {
		return ""
	}
	return string(b)
}
154 changes: 153 additions & 1 deletion groupbalancer.go
@@ -1,6 +1,8 @@
package kafka

import "sort"
import (
	"sort"
)

// GroupMember describes a single participant in a consumer group.
type GroupMember struct {
@@ -133,6 +135,156 @@ func (r RoundRobinGroupBalancer) AssignGroups(members []GroupMember, topicPartit
	return groupAssignments
}

// RackAffinityGroupBalancer makes a best effort to pair up consumers with
// partitions whose leader is in the same rack. This strategy can have
// performance benefits by minimizing round trip latency between the consumer
// and the broker. In environments where network traffic across racks incurs
// charges (such as cross AZ data transfer in AWS), this strategy is also a cost
// optimization measure because it keeps network traffic within the local rack
// where possible.
//
// The primary objective is to spread partitions evenly across consumers with a
// secondary focus on maximizing the number of partitions where the leader and
// the consumer are in the same rack. For best affinity, it's recommended to
// have a balanced spread of consumers and partition leaders across racks.
//
// This balancer requires Kafka version 0.10.0.0 or later. Earlier versions do
// not return the brokers' racks in the metadata request.
type RackAffinityGroupBalancer struct {
	// Rack is the name of the rack where this consumer is running. It will be
	// communicated to the consumer group leader via the UserData so that
	// assignments can be made with affinity to the partition leader.
	Rack string
}

func (r RackAffinityGroupBalancer) ProtocolName() string {
	return "rack-affinity"
}

func (r RackAffinityGroupBalancer) AssignGroups(members []GroupMember, partitions []Partition) GroupMemberAssignments {
	membersByTopic := make(map[string][]GroupMember)
	for _, m := range members {
		for _, t := range m.Topics {
			membersByTopic[t] = append(membersByTopic[t], m)
		}
	}

	partitionsByTopic := make(map[string][]Partition)
	for _, p := range partitions {
		partitionsByTopic[p.Topic] = append(partitionsByTopic[p.Topic], p)
	}

	assignments := GroupMemberAssignments{}
	for topic := range membersByTopic {
		topicAssignments := r.assignTopic(membersByTopic[topic], partitionsByTopic[topic])
		for member, parts := range topicAssignments {
			memberAssignments, ok := assignments[member]
			if !ok {
				memberAssignments = make(map[string][]int)
				assignments[member] = memberAssignments
			}
			memberAssignments[topic] = parts
		}
	}
	return assignments
}

func (r RackAffinityGroupBalancer) UserData() ([]byte, error) {
	return []byte(r.Rack), nil
}

func (r *RackAffinityGroupBalancer) assignTopic(members []GroupMember, partitions []Partition) map[string][]int {
	zonedPartitions := make(map[string][]int)
	for _, part := range partitions {
		zone := part.Leader.Rack
		zonedPartitions[zone] = append(zonedPartitions[zone], part.ID)
	}

	zonedConsumers := make(map[string][]string)
	for _, member := range members {
		zone := string(member.UserData)
		zonedConsumers[zone] = append(zonedConsumers[zone], member.ID)
	}

	targetPerMember := len(partitions) / len(members)
	remainder := len(partitions) % len(members)
	assignments := make(map[string][]int)

	// assign as many as possible in zone. this will assign up to partsPerMember
	// to each consumer. it will also prefer to allocate remainder partitions
	// in zone if possible.
	for zone, parts := range zonedPartitions {
		consumers := zonedConsumers[zone]
		if len(consumers) == 0 {
			continue
		}

		// don't over-allocate. cap partition assignments at the calculated
		// target.
		partsPerMember := len(parts) / len(consumers)
		if partsPerMember > targetPerMember {
			partsPerMember = targetPerMember
		}

		for _, consumer := range consumers {
			assignments[consumer] = append(assignments[consumer], parts[:partsPerMember]...)
			parts = parts[partsPerMember:]
		}

		// if we had enough partitions for each consumer in this zone to hit its
		// target, attempt to use any leftover partitions to satisfy the total
		// remainder by adding at most 1 partition per consumer.
		leftover := len(parts)
		if partsPerMember == targetPerMember {
			if leftover > remainder {
				leftover = remainder
			}
			if leftover > len(consumers) {
				leftover = len(consumers)
			}
			remainder -= leftover
		}

		// this loop covers the case where we're assigning extra partitions or
		// if there weren't enough to satisfy the targetPerMember and the zoned
		// partitions didn't divide evenly.
		for i := 0; i < leftover; i++ {
			assignments[consumers[i]] = append(assignments[consumers[i]], parts[i])
		}
		parts = parts[leftover:]

		if len(parts) == 0 {
			delete(zonedPartitions, zone)
		} else {
			zonedPartitions[zone] = parts
		}
	}

	// assign out remainders regardless of zone.
	var remaining []int
	for _, partitions := range zonedPartitions {
		remaining = append(remaining, partitions...)
	}

	for _, member := range members {
		assigned := assignments[member.ID]
		delta := targetPerMember - len(assigned)
		// if it were possible to assign the remainder in zone, it's been taken
		// care of already. now we will portion out any remainder to a member
		// that can take it.
		if delta >= 0 && remainder > 0 {
			delta++
			remainder--
		}
		if delta > 0 {
			assignments[member.ID] = append(assigned, remaining[:delta]...)
			remaining = remaining[delta:]
		}
	}

	return assignments
}
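
// exampleRackAffinityAssignment is an illustrative sketch of the assignment
// produced for a small hypothetical setup: two racks, three consumers, and
// seven partitions of a made-up topic. With targetPerMember = 7/3 = 2 and
// remainder = 1, the in-zone pass gives each rack-a consumer two rack-a
// partitions, and the rack-b consumer two rack-b partitions plus the leftover
// rack-b partition to cover the remainder, so the result is {0,1}, {2,3}, and
// {4,5,6}, with every partition consumed from its leader's rack. The member
// IDs, topic, and rack names here are invented for illustration only.
func exampleRackAffinityAssignment() GroupMemberAssignments {
	members := []GroupMember{
		{ID: "consumer-a1", Topics: []string{"my-topic"}, UserData: []byte("rack-a")},
		{ID: "consumer-a2", Topics: []string{"my-topic"}, UserData: []byte("rack-a")},
		{ID: "consumer-b1", Topics: []string{"my-topic"}, UserData: []byte("rack-b")},
	}

	// Seven partitions: the leaders of 0-3 live in rack-a, those of 4-6 in rack-b.
	partitions := make([]Partition, 0, 7)
	for i := 0; i < 7; i++ {
		rack := "rack-a"
		if i >= 4 {
			rack = "rack-b"
		}
		partitions = append(partitions, Partition{
			Topic:  "my-topic",
			ID:     i,
			Leader: Broker{Rack: rack},
		})
	}

	return RackAffinityGroupBalancer{}.AssignGroups(members, partitions)
}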

// findPartitions extracts the partition ids associated with the topic from the
// list of Partitions provided
func findPartitions(topic string, partitions []Partition) []int {