Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added RackAffinityGroupBalancer strategy #361

Merged
merged 4 commits into from
Feb 17, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
275 changes: 274 additions & 1 deletion groupbalancer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
package kafka

import "sort"
import (
"encoding/json"
"io/ioutil"
"net/http"
"os"
"sort"
"strings"
"time"
)

// GroupMember describes a single participant in a consumer group.
type GroupMember struct {
Expand Down Expand Up @@ -133,6 +141,181 @@ func (r RoundRobinGroupBalancer) AssignGroups(members []GroupMember, topicPartit
return groupAssignments
}

// RackAffinityGroupBalancer makes a best effort to pair up consumers with
// partitions whose leader is in the same rack. This strategy can have
// performance benefits by minimizing round trip latency between the consumer
// and the broker. In environments where network traffic across racks incurs
// charges (such as cross AZ data transfer in AWS), this strategy is also a cost
// optimization measure because it keeps network traffic within the local rack
// where possible.
//
// The primary objective is to spread partitions evenly across consumers with a
// secondary focus on maximizing the number of partitions where the leader and
// the consumer are in the same zone. For best affinity, it's recommended to
// have a balanced spread of consumers and partition leaders across racks.
//
// This balancer requires Kafka version 0.10.0.0+ or later. Earlier versions do
// not return the brokers' racks in the metadata request.
type RackAffinityGroupBalancer struct {
// RackResolver returns the name of the rack where this consumer is running.
// The rack will be communicated to the consumer group leader via the
// UserData so that assignments can be made with affinity to the partition
// leader.
//
// If the zone cannot be determined, the resolver should return the empty
// string with no error.
//
// If RackResolver is left unset, then a default resolver will be used. The
// default strategy currently only supports Linux AWS deployments in EC2 or
// ECS and returns name of the availability zone (e.g. "us-west-2a"). It's
// assumed that the brokers' rack settings are also set to the AZ in which
// they are deployed.
RackResolver func() (string, error)
}

func (r *RackAffinityGroupBalancer) ProtocolName() string {
return "rack-affinity"
}

func (r *RackAffinityGroupBalancer) AssignGroups(members []GroupMember, partitions []Partition) GroupMemberAssignments {
membersByTopic := make(map[string][]GroupMember)
for _, m := range members {
for _, t := range m.Topics {
membersByTopic[t] = append(membersByTopic[t], m)
}
}

partitionsByTopic := make(map[string][]Partition)
for _, p := range partitions {
partitionsByTopic[p.Topic] = append(partitionsByTopic[p.Topic], p)
}

assignments := GroupMemberAssignments{}
for topic := range membersByTopic {
topicAssignments := r.assignTopic(membersByTopic[topic], partitionsByTopic[topic])
for member, parts := range topicAssignments {
memberAssignments, ok := assignments[member]
if !ok {
memberAssignments = make(map[string][]int)
assignments[member] = memberAssignments
}
memberAssignments[topic] = parts
}
}
return assignments
}

func (r *RackAffinityGroupBalancer) UserData() ([]byte, error) {
var rack string
if r.RackResolver == nil {
rack = findRack()
Copy link

@riking riking Oct 28, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider making findRack an exported function such as RackResolverAWS so that client code can explicitly call it from their own resolver.

It's also a good candidate for living in its own package, alongside other common cloud providers (just look at how many imports you dragged in with this one PR! Even though they're all stdlib.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the feedback! On further reflection, I'm actually planning to pull out the AWS-specific code. If I left it in, then I think the suggestion to export RackResolverAWS is a good one. However, that got me to thinking that, from the perspective of exporting a clean and focused public API, AWS code has no place in kafka-go. That got me thinking even more, and I think that it's weird for the code to use the AWS logic quietly and by default.

That said, I can provide this code as an example and make it available to the community that way. Maybe someday someone will create an open source, multi-cloud library that can be plugged in as a rack resolver. Until then, I think it's better to keep the cloud provider code out of kafka-go. 😄

} else {
var err error
rack, err = r.RackResolver()
if err != nil {
return nil, err
}
}

if rack == "" {
rack = "unknown"
}

return []byte(rack), nil
}

func (r *RackAffinityGroupBalancer) assignTopic(members []GroupMember, partitions []Partition) map[string][]int {
zonedPartitions := make(map[string][]int)
for _, part := range partitions {
zone := part.Leader.Rack
zonedPartitions[zone] = append(zonedPartitions[zone], part.ID)
}

zonedConsumers := make(map[string][]string)
for _, member := range members {
zone := string(member.UserData)
zonedConsumers[zone] = append(zonedConsumers[zone], member.ID)
}

targetPerMember := len(partitions) / len(members)
remainder := len(partitions) % len(members)
assignments := make(map[string][]int)

// assign as many as possible in zone. this will assign up to partsPerMember
// to each consumer. it will also prefer to allocate remainder partitions
// in zone if possible.
for zone, parts := range zonedPartitions {
consumers := zonedConsumers[zone]
if len(consumers) == 0 {
continue
}

// don't over-allocate. cap partition assignments at the calculated
// target.
partsPerMember := len(parts) / len(consumers)
if partsPerMember > targetPerMember {
partsPerMember = targetPerMember
}

for _, consumer := range consumers {
assignments[consumer] = append(assignments[consumer], parts[:partsPerMember]...)
parts = parts[partsPerMember:]
}

// if we had enough partitions for each consumer in this zone to hit its
// target, attempt to use any leftover partitions to satisfy the total
// remainder by adding at most 1 partition per consumer.
leftover := len(parts)
if partsPerMember == targetPerMember {
if leftover > remainder {
leftover = remainder
}
if leftover > len(consumers) {
leftover = len(consumers)
}
remainder -= leftover
}

// this loop covers the case where we're assigning extra partitions or
// if there weren't enough to satisfy the targetPerMember and the zoned
// partitions didn't divide evenly.
for i := 0; i < leftover; i++ {
assignments[consumers[i]] = append(assignments[consumers[i]], parts[i])
}
parts = parts[leftover:]

if len(parts) == 0 {
delete(zonedPartitions, zone)
} else {
zonedPartitions[zone] = parts
}
}

// assign out remainders regardless of zone.
var remaining []int
for _, partitions := range zonedPartitions {
remaining = append(remaining, partitions...)
}

for _, member := range members {
assigned := assignments[member.ID]
delta := targetPerMember - len(assigned)
// if it were possible to assign the remainder in zone, it's been taken
// care of already. now we will portion out any remainder to a member
// that can take it.
if delta >= 0 && remainder > 0 {
delta++
remainder--
}
if delta > 0 {
assignments[member.ID] = append(assigned, remaining[:delta]...)
remaining = remaining[delta:]
}
}

return assignments
}

// findPartitions extracts the partition ids associated with the topic from the
// list of Partitions provided
func findPartitions(topic string, partitions []Partition) []int {
Expand Down Expand Up @@ -185,3 +368,93 @@ func findGroupBalancer(protocolName string, balancers []GroupBalancer) (GroupBal
}
return nil, false
}

// findRack is the default rack resolver strategy. It currently only supports
// * ECS with the task metadata endpoint enabled (returns the container
// instance's availability zone)
// * Linux EC2 (returns the instance's availability zone)
func findRack() string {
switch whereAmI() {
case "ecs":
return ecsAvailabilityZone()
case "ec2":
return ec2AvailabilityZone()
}
return ""
}

const ecsContainerMetadataURI = "ECS_CONTAINER_METADATA_URI"

// whereAmI determines which strategy the rack resolver should use.
func whereAmI() string {
// https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint.html
if os.Getenv(ecsContainerMetadataURI) != "" {
return "ecs"
}
// https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/identify_ec2_instances.html
for _, path := range [...]string{
"/sys/devices/virtual/dmi/id/product_uuid",
"/sys/hypervisor/uuid",
} {
b, err := ioutil.ReadFile(path)
if err != nil {
continue
}
s := string(b)
switch {
case strings.HasPrefix(s, "EC2"), strings.HasPrefix(s, "ec2"):
return "ec2"
}
}
return "somewhere"
}

// ecsAvailabilityZone queries the task endpoint for the metadata URI that ECS
// injects into the ECS_CONTAINER_METADATA_URI variable in order to retrieve
// the availability zone where the task is running.
func ecsAvailabilityZone() string {
client := http.Client{
Timeout: time.Second,
Transport: &http.Transport{
DisableCompression: true,
DisableKeepAlives: true,
},
}
r, err := client.Get(os.Getenv(ecsContainerMetadataURI) + "/task")
if err != nil {
return ""
}
defer r.Body.Close()

var md struct {
AvailabilityZone string
}
if err := json.NewDecoder(r.Body).Decode(&md); err != nil {
return ""
}
return md.AvailabilityZone
}

// ec2AvailabilityZone queries the metadata endpoint to discover the
// availability zone where this code is running. we avoid calling this function
// unless we know we're in EC2. Otherwise, in other environments, we would need
// to wait for the request to 169.254.169.254 to timeout before proceeding.
func ec2AvailabilityZone() string {
client := http.Client{
Timeout: time.Second,
Transport: &http.Transport{
DisableCompression: true,
DisableKeepAlives: true,
},
}
r, err := client.Get("http://169.254.169.254/latest/meta-data/placement/availability-zone")
if err != nil {
return ""
}
defer r.Body.Close()
b, err := ioutil.ReadAll(r.Body)
if err != nil {
return ""
}
return string(b)
}
Loading