Skip to content

Commit

Permalink
Delay port redirect until packet reaches container
Browse files Browse the repository at this point in the history
With port redirect in the ingress path happening before ipvs in the
ingess sandbox, there is a chance of 5-tuple collision in the ipvs
connection table for two entirely different services have different
PublishedPorts but the same TargetPort. To disambiguate the ipvs
connection table, delay the port redirect from PublishedPort to
TargetPort until after the loadbalancing has happened in ipvs. To be
specific, perform the redirect after the packet enters the real backend
container namespace.

Signed-off-by: Jana Radhakrishnan <[email protected]>
  • Loading branch information
mrjana committed Sep 21, 2016
1 parent ab555e2 commit d1f0bfe
Showing 1 changed file with 93 additions and 35 deletions.
128 changes: 93 additions & 35 deletions service_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

func init() {
reexec.Register("fwmarker", fwMarker)
reexec.Register("redirecter", redirecter)
}

func newService(name string, id string, ingressPorts []*PortConfig, aliases []string) *service {
Expand Down Expand Up @@ -275,6 +276,12 @@ func (sb *sandbox) populateLoadbalancers(ep *endpoint) {
n := ep.getNetwork()
eIP := ep.Iface().Address()

if n.ingress {
if err := addRedirectRules(sb.Key(), eIP, ep.ingressPorts); err != nil {
logrus.Errorf("Failed to add redirect rules for ep %s: %v", ep.Name(), err)
}
}

if sb.ingress {
// For the ingress sandbox if this is not gateway
// endpoint do nothing.
Expand Down Expand Up @@ -390,7 +397,7 @@ func (sb *sandbox) addLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*P
}

logrus.Debugf("Creating service for vip %s fwMark %d ingressPorts %#v", vip, fwMark, ingressPorts)
if err := invokeFWMarker(sb.Key(), vip, fwMark, ingressPorts, filteredPorts, eIP, false); err != nil {
if err := invokeFWMarker(sb.Key(), vip, fwMark, ingressPorts, eIP, false); err != nil {
logrus.Errorf("Failed to add firewall mark rule in sbox %s: %v", sb.Key(), err)
return
}
Expand Down Expand Up @@ -461,7 +468,7 @@ func (sb *sandbox) rmLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*Po
}
}

if err := invokeFWMarker(sb.Key(), vip, fwMark, ingressPorts, filteredPorts, eIP, true); err != nil {
if err := invokeFWMarker(sb.Key(), vip, fwMark, ingressPorts, eIP, true); err != nil {
logrus.Errorf("Failed to add firewall mark rule in sbox %s: %v", sb.Key(), err)
}
}
Expand Down Expand Up @@ -755,11 +762,8 @@ func readPortsFromFile(fileName string) ([]*PortConfig, error) {

// Invoke fwmarker reexec routine to mark vip destined packets with
// the passed firewall mark.
func invokeFWMarker(path string, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, filteredPorts []*PortConfig, eIP *net.IPNet, isDelete bool) error {
var (
ingressPortsFile string
filteredPortsFile string
)
func invokeFWMarker(path string, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, eIP *net.IPNet, isDelete bool) error {
var ingressPortsFile string

if len(ingressPorts) != 0 {
var err error
Expand All @@ -769,22 +773,14 @@ func invokeFWMarker(path string, vip net.IP, fwMark uint32, ingressPorts []*Port
}
}

if len(filteredPorts) != 0 {
var err error
filteredPortsFile, err = writePortsToFile(filteredPorts)
if err != nil {
return err
}
}

addDelOpt := "-A"
if isDelete {
addDelOpt = "-D"
}

cmd := &exec.Cmd{
Path: reexec.Self(),
Args: append([]string{"fwmarker"}, path, vip.String(), fmt.Sprintf("%d", fwMark), addDelOpt, ingressPortsFile, filteredPortsFile, eIP.String()),
Args: append([]string{"fwmarker"}, path, vip.String(), fmt.Sprintf("%d", fwMark), addDelOpt, ingressPortsFile, eIP.String()),
Stdout: os.Stdout,
Stderr: os.Stderr,
}
Expand All @@ -801,13 +797,12 @@ func fwMarker() {
runtime.LockOSThread()
defer runtime.UnlockOSThread()

if len(os.Args) < 8 {
if len(os.Args) < 7 {
logrus.Error("invalid number of arguments..")
os.Exit(1)
}

var ingressPorts []*PortConfig
var filteredPorts []*PortConfig
if os.Args[5] != "" {
var err error
ingressPorts, err = readPortsFromFile(os.Args[5])
Expand All @@ -817,15 +812,6 @@ func fwMarker() {
}
}

if os.Args[6] != "" {
var err error
filteredPorts, err = readPortsFromFile(os.Args[6])
if err != nil {
logrus.Errorf("Failed reading filtered ports file: %v", err)
os.Exit(7)
}
}

vip := os.Args[2]
fwMark, err := strconv.ParseUint(os.Args[3], 10, 32)
if err != nil {
Expand All @@ -835,12 +821,6 @@ func fwMarker() {
addDelOpt := os.Args[4]

rules := [][]string{}
for _, iPort := range filteredPorts {
rule := strings.Fields(fmt.Sprintf("-t nat %s PREROUTING -p %s --dport %d -j REDIRECT --to-port %d",
addDelOpt, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort, iPort.TargetPort))
rules = append(rules, rule)
}

for _, iPort := range ingressPorts {
rule := strings.Fields(fmt.Sprintf("-t mangle %s PREROUTING -p %s --dport %d -j MARK --set-mark %d",
addDelOpt, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort, fwMark))
Expand All @@ -860,9 +840,9 @@ func fwMarker() {
}

if addDelOpt == "-A" {
eIP, subnet, err := net.ParseCIDR(os.Args[7])
eIP, subnet, err := net.ParseCIDR(os.Args[6])
if err != nil {
logrus.Errorf("Failed to parse endpoint IP %s: %v", os.Args[7], err)
logrus.Errorf("Failed to parse endpoint IP %s: %v", os.Args[6], err)
os.Exit(9)
}

Expand All @@ -889,3 +869,81 @@ func fwMarker() {
}
}
}

func addRedirectRules(path string, eIP *net.IPNet, ingressPorts []*PortConfig) error {
var ingressPortsFile string

if len(ingressPorts) != 0 {
var err error
ingressPortsFile, err = writePortsToFile(ingressPorts)
if err != nil {
return err
}
}

cmd := &exec.Cmd{
Path: reexec.Self(),
Args: append([]string{"redirecter"}, path, eIP.String(), ingressPortsFile),
Stdout: os.Stdout,
Stderr: os.Stderr,
}

if err := cmd.Run(); err != nil {
return fmt.Errorf("reexec failed: %v", err)
}

return nil
}

// Redirecter reexec function.
func redirecter() {
runtime.LockOSThread()
defer runtime.UnlockOSThread()

if len(os.Args) < 4 {
logrus.Error("invalid number of arguments..")
os.Exit(1)
}

var ingressPorts []*PortConfig
if os.Args[3] != "" {
var err error
ingressPorts, err = readPortsFromFile(os.Args[3])
if err != nil {
logrus.Errorf("Failed reading ingress ports file: %v", err)
os.Exit(2)
}
}

eIP, _, err := net.ParseCIDR(os.Args[2])
if err != nil {
logrus.Errorf("Failed to parse endpoint IP %s: %v", os.Args[2], err)
os.Exit(3)
}

rules := [][]string{}
for _, iPort := range ingressPorts {
rule := strings.Fields(fmt.Sprintf("-t nat -A PREROUTING -d %s -p %s --dport %d -j REDIRECT --to-port %d",
eIP.String(), strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort, iPort.TargetPort))
rules = append(rules, rule)
}

ns, err := netns.GetFromPath(os.Args[1])
if err != nil {
logrus.Errorf("failed get network namespace %q: %v", os.Args[1], err)
os.Exit(4)
}
defer ns.Close()

if err := netns.Set(ns); err != nil {
logrus.Errorf("setting into container net ns %v failed, %v", os.Args[1], err)
os.Exit(5)
}

for _, rule := range rules {
if err := iptables.RawCombinedOutputNative(rule...); err != nil {
logrus.Errorf("setting up rule failed, %v: %v", rule, err)
os.Exit(5)
}
}
}

0 comments on commit d1f0bfe

Please sign in to comment.