Skip to content

Commit

Permalink
Merge pull request #649 from robbrockbank/kdd-calico-wep
Browse files Browse the repository at this point in the history
Redfine when a Pod is a valid Calico Workload Endpoint
  • Loading branch information
robbrockbank authored Nov 3, 2017
2 parents 88a4db1 + a2e2878 commit fd037e8
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 29 deletions.
26 changes: 18 additions & 8 deletions lib/backend/k8s/conversion/conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,22 +103,32 @@ func (c Converter) NamespaceToProfile(ns *kapiv1.Namespace) (*model.KVPair, erro
return &kvp, nil
}

// IsReadyCalicoPod returns true if the pod should be shown as a workloadEndpoint
// in the Calico API and false otherwise.
func (c Converter) IsReadyCalicoPod(pod *kapiv1.Pod) bool {
// IsValidCalicoWorkloadEndpoint returns true if the pod should be shown as a workloadEndpoint
// in the Calico API and false otherwise. A Pod suitable for Calico should not be host
// networked and should have been scheduled to a Node.
func (c Converter) IsValidCalicoWorkloadEndpoint(pod *kapiv1.Pod) bool {
if c.IsHostNetworked(pod) {
log.WithField("pod", pod.Name).Debug("Pod is host networked.")
return false
} else if !c.HasIPAddress(pod) {
log.WithField("pod", pod.Name).Debug("Pod does not have an IP address.")
return false
} else if !c.IsScheduled(pod) {
log.WithField("pod", pod.Name).Debug("Pod is not scheduled.")
return false
}
return true
}

// IsReadyCalicoPod returns true if the pod is a valid Calico WorkloadEndpoint and has
// an IP address assigned (i.e. it's ready for Calico networking).
func (c Converter) IsReadyCalicoPod(pod *kapiv1.Pod) bool {
if !c.IsValidCalicoWorkloadEndpoint(pod) {
return false
} else if !c.HasIPAddress(pod) {
log.WithField("pod", pod.Name).Debug("Pod does not have an IP address.")
return false
}
return true
}

func (c Converter) IsScheduled(pod *kapiv1.Pod) bool {
return pod.Spec.NodeName != ""
}
Expand Down Expand Up @@ -149,8 +159,8 @@ func (c Converter) PodToWorkloadEndpoint(pod *kapiv1.Pod) (*model.KVPair, error)
return nil, err
}

// We do, in some circumstances, want to parse Pods without an IP address. For example,
// a DELETE update will not include an IP.
// An IP address may not yet be assigned (or may have been removed for a Pod deletion), so
// handle a missing IP gracefully.
ipNets := []string{}
if c.HasIPAddress(pod) {
_, ipNet, err := cnet.ParseCIDROrIP(pod.Status.PodIP)
Expand Down
20 changes: 12 additions & 8 deletions lib/backend/k8s/resources/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (crw *k8sWatcherConverter) processK8sEvents() {
case event := <-crw.k8sWatch.ResultChan():
e := crw.convertEvent(event)
select {
case crw.resultChan <- e:
case crw.resultChan <- *e:
crw.logCxt.Debug("Kubernetes event converted and sent to backend watcher")

// If this is an error event, check to see if it's a terminating one (the
Expand All @@ -118,48 +118,52 @@ func (crw *k8sWatcherConverter) processK8sEvents() {

// convertEvent converts a Kubernetes Watch event into the equivalent Calico backend
// client watch event.
func (crw *k8sWatcherConverter) convertEvent(kevent kwatch.Event) api.WatchEvent {
func (crw *k8sWatcherConverter) convertEvent(kevent kwatch.Event) *api.WatchEvent {
var kvp *model.KVPair
var err error
if kevent.Type != kwatch.Error && kevent.Type != "" {
k8sRes := kevent.Object.(Resource)
kvp, err = crw.converter(k8sRes)
if err != nil {
crw.logCxt.WithError(err).Warning("Error converting Kubernetes resource to Calico resource")
return api.WatchEvent{
return &api.WatchEvent{
Type: api.WatchError,
Error: err,
}
}
if kvp == nil {
crw.logCxt.Debug("Event converted to a no-op")
return nil
}
}

switch kevent.Type {
case kwatch.Error, "":
// An error directly from the k8s watcher is a terminating event.
return api.WatchEvent{
return &api.WatchEvent{
Type: api.WatchError,
Error: cerrors.ErrorWatchTerminated{
Err: fmt.Errorf("terminating error event from Kubernetes watcher: %v", kevent.Object),
},
}
case kwatch.Deleted:
return api.WatchEvent{
return &api.WatchEvent{
Type: api.WatchDeleted,
Old: kvp,
}
case kwatch.Added:
return api.WatchEvent{
return &api.WatchEvent{
Type: api.WatchAdded,
New: kvp,
}
case kwatch.Modified:
// In KDD we don't have access to the previous settings, so just set the current settings.
return api.WatchEvent{
return &api.WatchEvent{
Type: api.WatchModified,
New: kvp,
}
default:
return api.WatchEvent{
return &api.WatchEvent{
Type: api.WatchError,
Error: fmt.Errorf("unhandled Kubernetes watcher event type: %v", kevent.Type),
}
Expand Down
11 changes: 8 additions & 3 deletions lib/backend/k8s/resources/workloadendpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (c *WorkloadEndpointClient) Get(ctx context.Context, key model.Key, revisio
}

// Decide if this pod should be displayed.
if !c.converter.IsReadyCalicoPod(pod) {
if !c.converter.IsValidCalicoWorkloadEndpoint(pod) {
return nil, cerrors.ErrorResourceDoesNotExist{Identifier: k}
}
return c.converter.PodToWorkloadEndpoint(pod)
Expand Down Expand Up @@ -179,8 +179,8 @@ func (c *WorkloadEndpointClient) List(ctx context.Context, list model.ListInterf
// For each Pod, return a workload endpoint.
ret := []*model.KVPair{}
for _, pod := range pods.Items {
// Decide if this pod should be displayed.
if !c.converter.IsReadyCalicoPod(&pod) {
// Decide if this pod should be included.
if !c.converter.IsValidCalicoWorkloadEndpoint(&pod) {
continue
}

Expand Down Expand Up @@ -215,6 +215,11 @@ func (c *WorkloadEndpointClient) Watch(ctx context.Context, list model.ListInter
if !ok {
return nil, errors.New("Pod conversion with incorrect k8s resource type")
}
if !c.converter.IsValidCalicoWorkloadEndpoint(k8sPod) {
// If this is not a valid Calico workload endpoint then don't return in the watch.
// Returning a nil KVP and a nil error swallows the event.
return nil, nil
}
return c.converter.PodToWorkloadEndpoint(k8sPod)
}
return newK8sWatcherConverter(ctx, "Pod", converter, k8sWatch), nil
Expand Down
19 changes: 13 additions & 6 deletions lib/backend/syncersv1/updateprocessors/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,13 @@ import (
// resource key. The simple update processor is for converting objects that can create
// the v1 key using only the information available in the resource key.

// Function signature to convert a v2 model.ResourceKey to a v1 model.Key type
type ConvertV2ToV1Key func(v2Key model.ResourceKey) (model.Key, error)

// Function to convert a v2 resource to the v1 value. The converter may filter out
// results by returning nil. The generic watchersyncer will handle filtered out events
// by either sending no event or sending delete events depending on whether the entry
// is currently in the cache.
type ConvertV2ToV1Value func(interface{}) (interface{}, error)

func NewSimpleUpdateProcessor(v2Kind string, kConverter ConvertV2ToV1Key, vConverter ConvertV2ToV1Value) watchersyncer.SyncerUpdateProcessor {
Expand Down Expand Up @@ -74,16 +80,17 @@ func (sup *simpleUpdateProcessor) Process(kvp *model.KVPair) ([]*model.KVPair, e
if err != nil {
// Currently treat any values that fail to convert properly as a deletion event.
log.WithField("Resource", kvp.Key).Warn("Unable to process resource data - treating as deleted")
return []*model.KVPair{
&model.KVPair{
Key: v1key,
},
}, nil
return []*model.KVPair{{Key: v1key}}, nil
}
if v1value == nil {
// If the value is filtered out by the processor, treat as a delete.
log.WithField("Resource", kvp.Key).Debug("Filtering out resource - treating as deleted")
return []*model.KVPair{{Key: v1key}}, nil
}
}

return []*model.KVPair{
&model.KVPair{
{
Key: v1key,
Value: v1value,
Revision: kvp.Revision,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"net"
"strings"

log "github.com/sirupsen/logrus"

apiv2 "github.com/projectcalico/libcalico-go/lib/apis/v2"
"github.com/projectcalico/libcalico-go/lib/backend/k8s/conversion"
"github.com/projectcalico/libcalico-go/lib/backend/model"
Expand Down Expand Up @@ -52,6 +54,16 @@ func convertWorkloadEndpointV2ToV1Value(val interface{}) (interface{}, error) {
if !ok {
return nil, errors.New("Value is not a valid WorkloadEndpoint resource value")
}

// If the WEP has no IPNetworks assigned then filter out since we can't yet render the rules.
if len(v2res.Spec.IPNetworks) == 0 {
log.WithFields(log.Fields{
"name": v2res.Name,
"namespace": v2res.Namespace,
}).Debug("Filtering out WEP with no IPNetworks")
return nil, nil
}

var ipv4Nets []cnet.IPNet
var ipv6Nets []cnet.IPNet
for _, ipnString := range v2res.Spec.IPNetworks {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ var _ = Describe("Test the WorkloadEndpoint update processor", func() {
res.Spec.Workload = wid1
res.Spec.Endpoint = eid1
res.Spec.InterfaceName = iface1
res.Spec.IPNetworks = []string{"10.100.10.1"}

kvps, err := up.Process(&model.KVPair{
Key: v2WorkloadEndpointKey1,
Expand All @@ -93,6 +94,9 @@ var _ = Describe("Test the WorkloadEndpoint update processor", func() {
})
Expect(err).NotTo(HaveOccurred())
Expect(kvps).To(HaveLen(1))
_, ipn, err := cnet.ParseCIDROrIP("10.100.10.1")
Expect(err).NotTo(HaveOccurred())
expectedIPv4Net := *(ipn.Network())
Expect(kvps[0]).To(Equal(&model.KVPair{
Key: v1WorkloadEndpointKey1,
Value: &model.WorkloadEndpoint{
Expand All @@ -103,6 +107,7 @@ var _ = Describe("Test the WorkloadEndpoint update processor", func() {
"projectcalico.org/namespace": ns1,
"projectcalico.org/orchestrator": oid1,
},
IPv4Nets: []cnet.IPNet{expectedIPv4Net},
},
Revision: "abcde",
}))
Expand All @@ -124,9 +129,9 @@ var _ = Describe("Test the WorkloadEndpoint update processor", func() {
res.Spec.MAC = "01:23:45:67:89:ab"
res.Spec.Profiles = []string{"testProfile"}
res.Spec.IPNetworks = []string{"10.100.10.1"}
_, ipn, err := cnet.ParseCIDROrIP("10.100.10.1")
_, ipn, err = cnet.ParseCIDROrIP("10.100.10.1")
Expect(err).NotTo(HaveOccurred())
expectedIPv4Net := *(ipn.Network())
expectedIPv4Net = *(ipn.Network())
res.Spec.IPNATs = []apiv2.IPNAT{
apiv2.IPNAT{
InternalIP: "10.100.1.1",
Expand Down Expand Up @@ -242,4 +247,32 @@ var _ = Describe("Test the WorkloadEndpoint update processor", func() {
})
Expect(err).To(HaveOccurred())
})

It("should filter out a WEP with no IPNetworks", func() {
up := updateprocessors.NewWorkloadEndpointUpdateProcessor()

By("converting a WorkloadEndpoint with no IPNetworks")
res := apiv2.NewWorkloadEndpoint()
res.Namespace = ns1
res.Labels = map[string]string{
"projectcalico.org/namespace": ns1,
"projectcalico.org/orchestrator": oid1,
}
res.Spec.Node = hn1
res.Spec.Orchestrator = oid1
res.Spec.Workload = wid1
res.Spec.Endpoint = eid1
res.Spec.InterfaceName = iface1

kvps, err := up.Process(&model.KVPair{
Key: v2WorkloadEndpointKey1,
Value: res,
Revision: "abcde",
})
Expect(err).NotTo(HaveOccurred())
Expect(kvps).To(HaveLen(1))
Expect(kvps[0]).To(Equal(&model.KVPair{
Key: v1WorkloadEndpointKey1,
}))
})
})
11 changes: 9 additions & 2 deletions lib/backend/watchersyncer/watchercache.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,14 @@ func (wc *watcherCache) run() {
for {
rc := wc.watch.ResultChan()
wc.logger.WithField("RC", rc).Debug("Reading event from results channel")
event := <-rc
event, ok := <-rc
if !ok {
// If the channel is closed then resync/recreate the watch.
wc.logger.Info("Watch channel closed by remote - recreate watcher")
wc.resyncAndCreateWatcher()
}

// Handle the specific event type.
switch event.Type {
case api.WatchAdded, api.WatchModified:
kvp := event.New
Expand All @@ -99,7 +106,7 @@ func (wc *watcherCache) run() {
}
default:
// Unknown event type - not much we can do other than log.
wc.logger.WithField("EventType", event.Type).Info("Unknown event type received from the datastore")
wc.logger.WithField("EventType", event.Type).Fatal("Unknown event type received from the datastore")
}
}
}
Expand Down

0 comments on commit fd037e8

Please sign in to comment.