Added RackAffinityGroupBalancer strategy (segmentio#361)
This strategy attempts to optimize round-trip transfer time between
brokers and consumers, in addition to minimizing inter-zone data
transfer costs in cloud environments.
Steve van Loben Sels authored Feb 17, 2020
1 parent 16d85b1 commit 55e867e
Showing 3 changed files with 587 additions and 1 deletion.
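A minimal sketch of opting into the new strategy from an application (not part of this commit): it assumes a broker reachable at kafka:9092 and a hypothetical KAFKA_RACK environment variable carrying the rack name. The full in-repo example with AWS rack discovery follows in example_groupbalancer_test.go.

package main

import (
	"context"
	"log"
	"os"

	kafka "github.com/segmentio/kafka-go"
)

func main() {
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"kafka:9092"},
		GroupID: "my-group",
		Topic:   "my-topic",
		GroupBalancers: []kafka.GroupBalancer{
			// Preferred strategy: pair consumers with same-rack partition leaders.
			kafka.RackAffinityGroupBalancer{Rack: os.Getenv("KAFKA_RACK")},
			// Fallback used when other group members don't support rack-affinity.
			kafka.RangeGroupBalancer{},
		},
	})
	defer r.Close()

	if _, err := r.ReadMessage(context.Background()); err != nil {
		log.Fatal(err)
	}
}

Listing RangeGroupBalancer second makes it the fallback protocol the group coordinator can select when not every member advertises rack-affinity.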
121 changes: 121 additions & 0 deletions example_groupbalancer_test.go
@@ -0,0 +1,121 @@
package kafka

import (
"context"
"encoding/json"
"io/ioutil"
"net/http"
"os"
"strings"
"time"
)

// ExampleAWSRackLocal shows how the RackAffinityGroupBalancer can be used to
// pair up consumers with brokers in the same AWS availability zone. This code
// assumes that each broker's rack is configured to be the name of the AZ in
// which it is running.
func ExampleAWSRackLocal() {
	r := NewReader(ReaderConfig{
		Brokers: []string{"kafka:9092"},
		GroupID: "my-group",
		Topic:   "my-topic",
		GroupBalancers: []GroupBalancer{
			RackAffinityGroupBalancer{Rack: findRack()},
			RangeGroupBalancer{},
		},
	})

	r.ReadMessage(context.Background())

	r.Close()
}

// findRack is the basic rack resolver strategy for use in AWS. It supports
//   * ECS with the task metadata endpoint enabled (returns the container
//     instance's availability zone)
//   * Linux EC2 (returns the instance's availability zone)
func findRack() string {
	switch whereAmI() {
	case "ecs":
		return ecsAvailabilityZone()
	case "ec2":
		return ec2AvailabilityZone()
	}
	return ""
}

const ecsContainerMetadataURI = "ECS_CONTAINER_METADATA_URI"

// whereAmI determines which strategy the rack resolver should use.
func whereAmI() string {
	// https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint.html
	if os.Getenv(ecsContainerMetadataURI) != "" {
		return "ecs"
	}
	// https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/identify_ec2_instances.html
	for _, path := range [...]string{
		"/sys/devices/virtual/dmi/id/product_uuid",
		"/sys/hypervisor/uuid",
	} {
		b, err := ioutil.ReadFile(path)
		if err != nil {
			continue
		}
		s := string(b)
		switch {
		case strings.HasPrefix(s, "EC2"), strings.HasPrefix(s, "ec2"):
			return "ec2"
		}
	}
	return "somewhere"
}

// ecsAvailabilityZone queries the task endpoint at the metadata URI that ECS
// injects into the ECS_CONTAINER_METADATA_URI variable in order to retrieve
// the availability zone where the task is running.
func ecsAvailabilityZone() string {
	client := http.Client{
		Timeout: time.Second,
		Transport: &http.Transport{
			DisableCompression: true,
			DisableKeepAlives:  true,
		},
	}
	r, err := client.Get(os.Getenv(ecsContainerMetadataURI) + "/task")
	if err != nil {
		return ""
	}
	defer r.Body.Close()

	var md struct {
		AvailabilityZone string
	}
	if err := json.NewDecoder(r.Body).Decode(&md); err != nil {
		return ""
	}
	return md.AvailabilityZone
}

// ec2AvailabilityZone queries the metadata endpoint to discover the
// availability zone where this code is running. We avoid calling this function
// unless we know we're in EC2. Otherwise, in other environments, we would need
// to wait for the request to 169.254.169.254 to time out before proceeding.
func ec2AvailabilityZone() string {
	client := http.Client{
		Timeout: time.Second,
		Transport: &http.Transport{
			DisableCompression: true,
			DisableKeepAlives:  true,
		},
	}
	r, err := client.Get("http://169.254.169.254/latest/meta-data/placement/availability-zone")
	if err != nil {
		return ""
	}
	defer r.Body.Close()
	b, err := ioutil.ReadAll(r.Body)
	if err != nil {
		return ""
	}
	return string(b)
}
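One caveat on ec2AvailabilityZone above: it uses the original instance metadata API (IMDSv1), so on instances configured to require IMDSv2 the plain GET is rejected and a session token must be fetched first. The sketch below is not part of this commit; it is a hypothetical variant, written against the same imports as the example file, under the assumption that the instance enforces IMDSv2.

// imdsv2AvailabilityZone is a hypothetical variant of ec2AvailabilityZone for
// instances that require IMDSv2. It fetches a short-lived session token first
// and then presents it when reading the placement metadata.
func imdsv2AvailabilityZone() string {
	client := http.Client{Timeout: time.Second}

	// Request a session token valid for up to six hours.
	req, err := http.NewRequest(http.MethodPut, "http://169.254.169.254/latest/api/token", nil)
	if err != nil {
		return ""
	}
	req.Header.Set("X-aws-ec2-metadata-token-ttl-seconds", "21600")
	res, err := client.Do(req)
	if err != nil {
		return ""
	}
	token, err := ioutil.ReadAll(res.Body)
	res.Body.Close()
	if err != nil {
		return ""
	}

	// Present the token when querying the availability zone.
	req, err = http.NewRequest(http.MethodGet, "http://169.254.169.254/latest/meta-data/placement/availability-zone", nil)
	if err != nil {
		return ""
	}
	req.Header.Set("X-aws-ec2-metadata-token", string(token))
	res, err = client.Do(req)
	if err != nil {
		return ""
	}
	defer res.Body.Close()
	b, err := ioutil.ReadAll(res.Body)
	if err != nil {
		return ""
	}
	return string(b)
}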
154 changes: 153 additions & 1 deletion groupbalancer.go
@@ -1,6 +1,8 @@
package kafka

import "sort"
import (
"sort"
)

// GroupMember describes a single participant in a consumer group.
type GroupMember struct {
@@ -133,6 +135,156 @@ func (r RoundRobinGroupBalancer) AssignGroups(members []GroupMember, topicPartit
return groupAssignments
}

// RackAffinityGroupBalancer makes a best effort to pair up consumers with
// partitions whose leader is in the same rack. This strategy can have
// performance benefits by minimizing round-trip latency between the consumer
// and the broker. In environments where network traffic across racks incurs
// charges (such as cross-AZ data transfer in AWS), this strategy is also a cost
// optimization measure because it keeps network traffic within the local rack
// where possible.
//
// The primary objective is to spread partitions evenly across consumers with a
// secondary focus on maximizing the number of partitions where the leader and
// the consumer are in the same rack. For best affinity, it's recommended to
// have a balanced spread of consumers and partition leaders across racks.
//
// This balancer requires Kafka version 0.10.0.0 or newer. Earlier versions do
// not return the brokers' racks in the metadata request.
type RackAffinityGroupBalancer struct {
	// Rack is the name of the rack where this consumer is running. It will be
	// communicated to the consumer group leader via the UserData so that
	// assignments can be made with affinity to the partition leader.
	Rack string
}

func (r RackAffinityGroupBalancer) ProtocolName() string {
	return "rack-affinity"
}

func (r RackAffinityGroupBalancer) AssignGroups(members []GroupMember, partitions []Partition) GroupMemberAssignments {
	// index the group members and the partitions by topic so that each topic
	// can be balanced independently.
	membersByTopic := make(map[string][]GroupMember)
	for _, m := range members {
		for _, t := range m.Topics {
			membersByTopic[t] = append(membersByTopic[t], m)
		}
	}

	partitionsByTopic := make(map[string][]Partition)
	for _, p := range partitions {
		partitionsByTopic[p.Topic] = append(partitionsByTopic[p.Topic], p)
	}

	// balance each topic separately and merge the per-topic results into the
	// final member => topic => partitions mapping.
	assignments := GroupMemberAssignments{}
	for topic := range membersByTopic {
		topicAssignments := r.assignTopic(membersByTopic[topic], partitionsByTopic[topic])
		for member, parts := range topicAssignments {
			memberAssignments, ok := assignments[member]
			if !ok {
				memberAssignments = make(map[string][]int)
				assignments[member] = memberAssignments
			}
			memberAssignments[topic] = parts
		}
	}
	return assignments
}

func (r RackAffinityGroupBalancer) UserData() ([]byte, error) {
	return []byte(r.Rack), nil
}

func (r *RackAffinityGroupBalancer) assignTopic(members []GroupMember, partitions []Partition) map[string][]int {
	zonedPartitions := make(map[string][]int)
	for _, part := range partitions {
		zone := part.Leader.Rack
		zonedPartitions[zone] = append(zonedPartitions[zone], part.ID)
	}

	zonedConsumers := make(map[string][]string)
	for _, member := range members {
		zone := string(member.UserData)
		zonedConsumers[zone] = append(zonedConsumers[zone], member.ID)
	}

	targetPerMember := len(partitions) / len(members)
	remainder := len(partitions) % len(members)
	assignments := make(map[string][]int)

	// assign as many as possible in zone. this will assign up to partsPerMember
	// to each consumer. it will also prefer to allocate remainder partitions
	// in zone if possible.
	for zone, parts := range zonedPartitions {
		consumers := zonedConsumers[zone]
		if len(consumers) == 0 {
			continue
		}

		// don't over-allocate. cap partition assignments at the calculated
		// target.
		partsPerMember := len(parts) / len(consumers)
		if partsPerMember > targetPerMember {
			partsPerMember = targetPerMember
		}

		for _, consumer := range consumers {
			assignments[consumer] = append(assignments[consumer], parts[:partsPerMember]...)
			parts = parts[partsPerMember:]
		}

		// if we had enough partitions for each consumer in this zone to hit its
		// target, attempt to use any leftover partitions to satisfy the total
		// remainder by adding at most 1 partition per consumer.
		leftover := len(parts)
		if partsPerMember == targetPerMember {
			if leftover > remainder {
				leftover = remainder
			}
			if leftover > len(consumers) {
				leftover = len(consumers)
			}
			remainder -= leftover
		}

		// this loop covers the case where we're assigning extra partitions or
		// if there weren't enough to satisfy the targetPerMember and the zoned
		// partitions didn't divide evenly.
		for i := 0; i < leftover; i++ {
			assignments[consumers[i]] = append(assignments[consumers[i]], parts[i])
		}
		parts = parts[leftover:]

		if len(parts) == 0 {
			delete(zonedPartitions, zone)
		} else {
			zonedPartitions[zone] = parts
		}
	}

	// assign out remainders regardless of zone.
	var remaining []int
	for _, partitions := range zonedPartitions {
		remaining = append(remaining, partitions...)
	}

	for _, member := range members {
		assigned := assignments[member.ID]
		delta := targetPerMember - len(assigned)
		// if it were possible to assign the remainder in zone, it's been taken
		// care of already. now we will portion out any remainder to a member
		// that can take it.
		if delta >= 0 && remainder > 0 {
			delta++
			remainder--
		}
		if delta > 0 {
			assignments[member.ID] = append(assigned, remaining[:delta]...)
			remaining = remaining[delta:]
		}
	}

	return assignments
}

// findPartitions extracts the partition ids associated with the topic from the
// list of Partitions provided
func findPartitions(topic string, partitions []Partition) []int {
(The remainder of groupbalancer.go and the third changed file are collapsed in this view.)

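To make the assignment behavior concrete, here is a minimal, self-contained sketch (not part of this commit) that calls RackAffinityGroupBalancer.AssignGroups directly with synthetic group members and partitions. It assumes the module path github.com/segmentio/kafka-go and the exported GroupMember, Partition, and Broker types of this repository. In a real consumer group the UserData bytes come from each member's UserData() method via the JoinGroup protocol; they are set by hand here only to simulate what the group leader would see.

package main

import (
	"fmt"

	kafka "github.com/segmentio/kafka-go"
)

func main() {
	balancer := kafka.RackAffinityGroupBalancer{}

	// Two consumers, each reporting its availability zone as its rack.
	members := []kafka.GroupMember{
		{ID: "consumer-a", Topics: []string{"my-topic"}, UserData: []byte("us-east-1a")},
		{ID: "consumer-b", Topics: []string{"my-topic"}, UserData: []byte("us-east-1b")},
	}

	// Four partitions whose leaders are split evenly across the two zones.
	partitions := []kafka.Partition{
		{Topic: "my-topic", ID: 0, Leader: kafka.Broker{ID: 1, Rack: "us-east-1a"}},
		{Topic: "my-topic", ID: 1, Leader: kafka.Broker{ID: 2, Rack: "us-east-1b"}},
		{Topic: "my-topic", ID: 2, Leader: kafka.Broker{ID: 1, Rack: "us-east-1a"}},
		{Topic: "my-topic", ID: 3, Leader: kafka.Broker{ID: 2, Rack: "us-east-1b"}},
	}

	// Each consumer should be paired with the partitions whose leader shares
	// its zone: consumer-a -> [0 2], consumer-b -> [1 3].
	assignments := balancer.AssignGroups(members, partitions)
	fmt.Println(assignments["consumer-a"]["my-topic"])
	fmt.Println(assignments["consumer-b"]["my-topic"])
}

With an uneven spread (for example, all partition leaders in one zone), the balancer still distributes partitions evenly across consumers first and only then maximizes same-rack pairs, as described in the doc comment above.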