Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do conntrack deletions in the background. #1498

Merged
merged 2 commits into from
Jul 12, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 52 additions & 1 deletion routetable/route_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ type RouteTable struct {
ifaceNameToTargets map[string][]Target
pendingIfaceNameToTargets map[string][]Target

pendingConntrackCleanups map[ip.Addr]chan struct{}

inSync bool

// dataplane is our shim for the netlink/arp interface. In production, it maps directly
Expand Down Expand Up @@ -117,6 +119,7 @@ func NewWithShims(interfacePrefixes []string, ipVersion uint8, nl dataplaneIface
pendingIfaceNameToTargets: map[string][]Target{},
dirtyIfaces: set.New(),
dataplane: nl,
pendingConntrackCleanups: map[ip.Addr]chan struct{}{},
}
}

Expand Down Expand Up @@ -199,6 +202,8 @@ func (r *RouteTable) Apply() error {
return set.RemoveItem
})

r.cleanUpPendingConntrackDeletions()

if r.dirtyIfaces.Len() > 0 {
r.logCxt.Warn("Some interfaces still out-of sync.")
r.inSync = false
Expand Down Expand Up @@ -250,7 +255,7 @@ func (r *RouteTable) syncRoutesForLink(ifaceName string) error {
defer oldCIDRs.Iter(func(item interface{}) error {
// Remove and conntrack entries that should no longer be there.
dest := item.(ip.CIDR)
r.dataplane.RemoveConntrackFlows(dest.Version(), dest.Addr().AsNetIP())
r.startConntrackDeletion(dest.Addr())
return nil
})

Expand Down Expand Up @@ -323,6 +328,9 @@ func (r *RouteTable) syncRoutesForLink(ifaceName string) error {
Protocol: syscall.RTPROT_BOOT,
Scope: netlink.SCOPE_LINK,
}
// In case this IP is being re-used, wait for any previous conntrack entry
// to be cleaned up. (No-op if there are no pending deletes.)
r.waitForPendingConntrackDeletion(cidr.Addr())
if err := r.dataplane.RouteAdd(&route); err != nil {
logCxt.WithError(err).Warn("Failed to add route")
updatesFailed = true
Expand All @@ -347,6 +355,49 @@ func (r *RouteTable) syncRoutesForLink(ifaceName string) error {
return nil
}

// startConntrackDeletion starts the deletion of conntrack entries for the given CIDR in the background. Pending
// deletions are tracked in the pendingConntrackCleanups map so we can block waiting for them later.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need a bit more context somewhere to record why we're making this change to move conntrack deletions into the background. I'd be happy with any of:

  • adding to the comment here
  • adding to the commit message
  • adding more to the PR comments - either inline explanation or a link to an issue where it's explained.

At the moment - unless I missed it - I don't think we have any of those.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added an issue and updated the comment.

//
// It's important to do the conntrack deletions in the background because scanning the conntrack
// table is very slow if there are a lot of entries. Previously, we did the deletion synchronously
// but that led to lengthy Apply() calls on the critical path.
func (r *RouteTable) startConntrackDeletion(ipAddr ip.Addr) {
log.WithField("ip", ipAddr).Debug("Starting goroutine to delete conntrack entries")
done := make(chan struct{})
r.pendingConntrackCleanups[ipAddr] = done
go func() {
defer close(done)
r.dataplane.RemoveConntrackFlows(r.ipVersion, ipAddr.AsNetIP())
log.WithField("ip", ipAddr).Debug("Deleted conntrack entries")
}()
}

// cleanUpPendingConntrackDeletions scans the pendingConntrackCleanups map for completed entries and removes them.
func (r *RouteTable) cleanUpPendingConntrackDeletions() {
for ipAddr, c := range r.pendingConntrackCleanups {
select {
case <-c:
log.WithField("ip", ipAddr).Debug(
"Background goroutine finished deleting conntrack entries")
delete(r.pendingConntrackCleanups, ipAddr)
default:
log.WithField("ip", ipAddr).Debug(
"Background goroutine yet to finish deleting conntrack entries")
continue
}
}
}

// waitForPendingConntrackDeletion waits for any pending conntrack deletions (if any) for the given IP to complete.
func (r *RouteTable) waitForPendingConntrackDeletion(ipAddr ip.Addr) {
if c := r.pendingConntrackCleanups[ipAddr]; c != nil {
log.WithField("ip", ipAddr).Info("Waiting for pending conntrack deletion to finish")
<-c
log.WithField("ip", ipAddr).Info("Done waiting for pending conntrack deletion to finish")
delete(r.pendingConntrackCleanups, ipAddr)
}
}

// filterErrorByIfaceState checks the current state of the interface; if it's down or gone, it
// returns IfaceDown or IfaceNotPresent, otherwise, it returns the given defaultErr.
func (r *RouteTable) filterErrorByIfaceState(ifaceName string, currentErr, defaultErr error) error {
Expand Down
63 changes: 59 additions & 4 deletions routetable/route_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,20 @@ import (
"errors"
"fmt"
"net"
"strings"
"sync"
"syscall"
"time"

log "github.com/Sirupsen/logrus"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vishvananda/netlink"

"github.com/projectcalico/felix/ifacemonitor"
"github.com/projectcalico/felix/ip"
"github.com/projectcalico/felix/set"
"github.com/projectcalico/felix/testutils"

"strings"

"github.com/projectcalico/felix/ifacemonitor"
)

var (
Expand Down Expand Up @@ -107,6 +107,44 @@ var _ = Describe("RouteTable", func() {
Expect(dataplane.routeKeyToRoute).To(ConsistOf(gatewayRoute))
Expect(dataplane.addedRouteKeys).To(BeEmpty())
})
It("should delete only our conntrack entries", func() {
rt.Apply()
Eventually(dataplane.GetDeletedConntrackEntries).Should(ConsistOf(
net.ParseIP("10.0.0.1").To4(),
net.ParseIP("10.0.0.3").To4(),
))
})

Describe("with a slow conntrack deletion", func() {
const delay = 300 * time.Millisecond
BeforeEach(func() {
dataplane.ConntrackSleep = delay
})
It("should block a route add until conntrack finished", func() {
// Initial apply starts a background thread to delete
// 10.0.0.1 and 10.0.0.3.
rt.Apply()
// We try to add 10.0.0.1 back in.
rt.SetRoutes("cali1", []Target{
{CIDR: ip.MustParseCIDR("10.0.0.1/32"), DestMAC: mac1},
})
start := time.Now()
rt.Apply()
Expect(time.Since(start)).To(BeNumerically(">=", delay))
})
It("should not block an unrelated route add ", func() {
// Initial apply starts a background thread to delete
// 10.0.0.1 and 10.0.0.3.
rt.Apply()
// We try to add 10.0.0.10, which hasn't been seen before.
rt.SetRoutes("cali1", []Target{
{CIDR: ip.MustParseCIDR("10.0.0.10/32"), DestMAC: mac1},
})
start := time.Now()
rt.Apply()
Expect(time.Since(start)).To(BeNumerically("<", delay/2))
})
})

// We do the following tests in different failure (and non-failure) scenarios. In
// each case, we make the failure transient so that only the first Apply() should
Expand Down Expand Up @@ -379,6 +417,10 @@ type mockDataplane struct {
deletedRouteKeys set.Set

failuresToSimulate failFlags

mutex sync.Mutex
deletedConntrackEntries []net.IP
ConntrackSleep time.Duration
}

func (d *mockDataplane) addIface(idx int, name string, up bool, running bool) *mockLink {
Expand Down Expand Up @@ -506,7 +548,20 @@ func (d *mockDataplane) RemoveConntrackFlows(ipVersion uint8, ipAddr net.IP) {
log.WithFields(log.Fields{
"ipVersion": ipVersion,
"ipAddr": ipAddr,
"sleepTime": d.ConntrackSleep,
}).Info("Mock dataplane: Removing conntrack flows")
d.mutex.Lock()
d.deletedConntrackEntries = append(d.deletedConntrackEntries, ipAddr)
d.mutex.Unlock()
time.Sleep(d.ConntrackSleep)
}

func (d *mockDataplane) GetDeletedConntrackEntries() []net.IP {
d.mutex.Lock()
defer d.mutex.Unlock()
cpy := make([]net.IP, len(d.deletedConntrackEntries))
copy(cpy, d.deletedConntrackEntries)
return cpy
}

func keyForRoute(route *netlink.Route) string {
Expand Down