Skip to content

Commit

Permalink
Use zerolog in the route agent
Browse files Browse the repository at this point in the history
Signed-off-by: Tom Pantelis <[email protected]>
  • Loading branch information
tpantelis authored and dfarrell07 committed Sep 29, 2022
1 parent c1aa0f3 commit 013bba2
Show file tree
Hide file tree
Showing 22 changed files with 220 additions and 183 deletions.
21 changes: 17 additions & 4 deletions pkg/netlink/fake/netlink.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"syscall"

. "github.com/onsi/gomega"
"github.com/pkg/errors"
netlinkAPI "github.com/submariner-io/submariner/pkg/netlink"
"github.com/vishvananda/netlink"
)
Expand All @@ -45,6 +46,18 @@ type NetLink struct {

var _ netlinkAPI.Interface = &NetLink{}

type linkNotFoundError struct{}

func (e linkNotFoundError) Error() string {
return "Link not found"
}

func (e linkNotFoundError) Is(err error) bool {
// nolint:errorlint // The given error should not be wrapped.
_, ok := err.(netlink.LinkNotFoundError)
return ok
}

func New() *NetLink {
return &NetLink{
Adapter: netlinkAPI.Adapter{Basic: &basicType{
Expand Down Expand Up @@ -97,7 +110,7 @@ func (n *basicType) LinkByName(name string) (netlink.Link, error) {

link, found := n.links[name]
if !found {
return nil, &netlink.LinkNotFoundError{}
return nil, linkNotFoundError{}
}

return link, nil
Expand Down Expand Up @@ -258,10 +271,10 @@ func (n *NetLink) AwaitLink(name string) (link netlink.Link) {
}

func (n *NetLink) AwaitNoLink(name string) {
Eventually(func() error {
Eventually(func() bool {
_, err := n.LinkByName(name)
return err
}, 5).Should(BeAssignableToTypeOf(&netlink.LinkNotFoundError{}), "Link %q exists", name)
return errors.Is(err, netlink.LinkNotFoundError{})
}, 5).Should(BeTrue(), "Link %q exists", name)
}

func (n *NetLink) routeDestList(linkIndex int) []net.IPNet {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,19 @@ limitations under the License.
package cabledriver

import (
"github.com/submariner-io/admiral/pkg/log"
"github.com/submariner-io/submariner/pkg/cable/vxlan"
"github.com/submariner-io/submariner/pkg/event"
"github.com/submariner-io/submariner/pkg/netlink"
"k8s.io/klog/v2"
logf "sigs.k8s.io/controller-runtime/pkg/log"
)

type vxlanCleanup struct {
event.HandlerBase
}

var logger = log.Logger{Logger: logf.Log.WithName("CableDriver")}

func NewVXLANCleanup() event.Handler {
return &vxlanCleanup{}
}
Expand All @@ -42,7 +45,7 @@ func (h *vxlanCleanup) GetName() string {
}

func (h *vxlanCleanup) TransitionToNonGateway() error {
klog.Infof("Cleaning up the routes")
logger.Infof("Cleaning up the routes")

return netlink.DeleteIfaceAndAssociatedRoutes(vxlan.VxlanIface, vxlan.TableID) // nolint:wrapcheck // No need to wrap this error
}
3 changes: 1 addition & 2 deletions pkg/routeagent_driver/cabledriver/xfrm_cleanup_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ package cabledriver
import (
"github.com/submariner-io/submariner/pkg/event"
"github.com/submariner-io/submariner/pkg/netlink"
"k8s.io/klog/v2"
)

type xrfmCleanup struct {
Expand All @@ -41,7 +40,7 @@ func (h *xrfmCleanup) GetNetworkPlugins() []string {
}

func (h *xrfmCleanup) TransitionToNonGateway() error {
klog.Info("Transitioned to non-Gateway, cleaning up the IPsec xfrm rules")
logger.Info("Transitioned to non-Gateway, cleaning up the IPsec xfrm rules")

return netlink.DeleteXfrmRules() // nolint:wrapcheck // No need to wrap this error
}
14 changes: 8 additions & 6 deletions pkg/routeagent_driver/cni/cni_iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,16 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
logf "sigs.k8s.io/controller-runtime/pkg/log"
)

type Interface struct {
Name string
IPAddress string
}

var logger = log.Logger{Logger: logf.Log.WithName("CNI")}

// DiscoverFunc is a hook for unit tests.
var DiscoverFunc func(clusterCIDR string) (*Interface, error)

Expand Down Expand Up @@ -67,14 +69,14 @@ func discover(clusterCIDR string) (*Interface, error) {
for i := range addrs {
ipAddr, _, err := net.ParseCIDR(addrs[i].String())
if err != nil {
klog.Errorf("Unable to ParseCIDR : %q", addrs[i].String())
logger.Errorf(err, "Unable to ParseCIDR : %q", addrs[i].String())
} else if ipAddr.To4() != nil {
klog.V(log.DEBUG).Infof("Interface %q has %q address", iface.Name, ipAddr)
logger.V(log.DEBUG).Infof("Interface %q has %q address", iface.Name, ipAddr)
address := net.ParseIP(ipAddr.String())

// Verify that interface has an address from cluster CIDR
if clusterNetwork.Contains(address) {
klog.V(log.DEBUG).Infof("Found CNI Interface %q that has IP %q from ClusterCIDR %q",
logger.V(log.DEBUG).Infof("Found CNI Interface %q that has IP %q from ClusterCIDR %q",
iface.Name, ipAddr, clusterCIDR)
return &Interface{IPAddress: ipAddr.String(), Name: iface.Name}, nil
}
Expand Down Expand Up @@ -127,9 +129,9 @@ func AnnotateNodeWithCNIInterfaceIP(nodeName string, clientSet kubernetes.Interf
}

if setAnnotation {
klog.Infof("Successfully annotated node %q with cniIfaceIP %q", nodeName, cniIPAddress)
logger.Infof("Successfully annotated node %q with cniIfaceIP %q", nodeName, cniIPAddress)
} else {
klog.Infof("Successfully removed %q from node %q annotation", constants.CNIInterfaceIP, nodeName)
logger.Infof("Successfully removed %q from node %q annotation", constants.CNIInterfaceIP, nodeName)
}

return nil
Expand Down
21 changes: 10 additions & 11 deletions pkg/routeagent_driver/handlers/kubeproxy/endpoint_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/pkg/errors"
submV1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1"
"github.com/submariner-io/submariner/pkg/cidr"
"k8s.io/klog/v2"
)

func (kp *SyncHandler) LocalEndpointCreated(endpoint *submV1.Endpoint) error {
Expand Down Expand Up @@ -54,11 +53,11 @@ func (kp *SyncHandler) LocalEndpointCreated(endpoint *submV1.Endpoint) error {
return errors.Wrap(err, "failed to derive the remoteVtepIP")
}

klog.Infof("Creating the vxlan interface %s with gateway node IP %s", VxLANIface, localClusterGwNodeIP)
logger.Infof("Creating the vxlan interface %s with gateway node IP %s", VxLANIface, localClusterGwNodeIP)

err = kp.createVxLANInterface(endpoint.Spec.Hostname, VxInterfaceWorker, localClusterGwNodeIP)
if err != nil {
klog.Fatalf("Unable to create VxLAN interface on non-GatewayNode (%s): %v", endpoint.Spec.Hostname, err)
logger.Fatalf("Unable to create VxLAN interface on non-GatewayNode (%s): %v", endpoint.Spec.Hostname, err)
}

kp.vxlanGwIP = &remoteVtepIP
Expand Down Expand Up @@ -98,7 +97,7 @@ func (kp *SyncHandler) LocalEndpointRemoved(endpoint *submV1.Endpoint) error {
func (kp *SyncHandler) RemoteEndpointCreated(endpoint *submV1.Endpoint) error {
if err := cidr.OverlappingSubnets(kp.localServiceCidr, kp.localClusterCidr, endpoint.Spec.Subnets); err != nil {
// Skip processing the endpoint when CIDRs overlap and return nil to avoid re-queuing.
klog.Errorf("overlappingSubnets for new remote %#v returned error: %v", endpoint, err)
logger.Errorf(err, "overlappingSubnets for new remote %#v returned error", endpoint)
return nil
}

Expand All @@ -108,7 +107,7 @@ func (kp *SyncHandler) RemoteEndpointCreated(endpoint *submV1.Endpoint) error {
lastProcessedTime, ok := kp.remoteEndpointTimeStamp[endpoint.Spec.ClusterID]

if ok && lastProcessedTime.After(endpoint.CreationTimestamp.Time) {
klog.Infof("Ignoring new remote %#v since a later endpoint was already"+
logger.Infof("Ignoring new remote %#v since a later endpoint was already"+
"processed", endpoint)
return nil
}
Expand All @@ -123,8 +122,8 @@ func (kp *SyncHandler) RemoteEndpointCreated(endpoint *submV1.Endpoint) error {
}

if err := kp.updateRoutingRulesForInterClusterSupport(endpoint.Spec.Subnets, Add); err != nil {
klog.Errorf("updateRoutingRulesForInterClusterSupport for new remote %#v returned error: %+v",
endpoint, err)
logger.Errorf(err, "updateRoutingRulesForInterClusterSupport for new remote %#v returned error",
endpoint)
return err
}

Expand All @@ -148,7 +147,7 @@ func (kp *SyncHandler) RemoteEndpointRemoved(endpoint *submV1.Endpoint) error {
lastProcessedTime, ok := kp.remoteEndpointTimeStamp[endpoint.Spec.ClusterID]

if ok && lastProcessedTime.After(endpoint.CreationTimestamp.Time) {
klog.Infof("Ignoring deleted remote %#v since a later endpoint was already"+
logger.Infof("Ignoring deleted remote %#v since a later endpoint was already"+
"processed", endpoint)
return nil
}
Expand All @@ -162,8 +161,8 @@ func (kp *SyncHandler) RemoteEndpointRemoved(endpoint *submV1.Endpoint) error {
// TODO: Handle a remote endpoint removal use-case
// - remove related iptable rules
if err := kp.updateRoutingRulesForInterClusterSupport(endpoint.Spec.Subnets, Delete); err != nil {
klog.Errorf("updateRoutingRulesForInterClusterSupport for removed remote %#v returned error: %+v",
err, endpoint)
logger.Errorf(err, "updateRoutingRulesForInterClusterSupport for removed remote %#v returned error",
endpoint)
return err
}

Expand All @@ -183,7 +182,7 @@ func (kp *SyncHandler) getHostIfaceIPAddress() (net.IP, error) {
for i := range addrs {
ipAddr, _, err := net.ParseCIDR(addrs[i].String())
if err != nil {
klog.Errorf("Unable to ParseCIDR %v: %v", addrs, err)
logger.Errorf(err, "Unable to ParseCIDR %v", addrs)
}

if ipAddr.To4() != nil {
Expand Down
17 changes: 8 additions & 9 deletions pkg/routeagent_driver/handlers/kubeproxy/gw_transition.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,10 @@ import (
"github.com/submariner-io/admiral/pkg/log"
netlinkAPI "github.com/submariner-io/submariner/pkg/netlink"
"github.com/submariner-io/submariner/pkg/routeagent_driver/constants"
"k8s.io/klog/v2"
)

func (kp *SyncHandler) TransitionToNonGateway() error {
klog.V(log.DEBUG).Info("The current node is no longer a Gateway")
logger.V(log.DEBUG).Info("The current node is no longer a Gateway")
kp.syncHandlerMutex.Lock()
defer kp.syncHandlerMutex.Unlock()
kp.isGatewayNode = false
Expand All @@ -37,33 +36,33 @@ func (kp *SyncHandler) TransitionToNonGateway() error {

err := kp.netLink.RuleDelIfPresent(netlinkAPI.NewTableRule(constants.RouteAgentHostNetworkTableID))
if err != nil {
klog.Errorf("Unable to delete ip rule to table %d on non-Gateway node %s: %v",
constants.RouteAgentHostNetworkTableID, kp.hostname, err)
logger.Errorf(err, "Unable to delete ip rule to table %d on non-Gateway node %s",
constants.RouteAgentHostNetworkTableID, kp.hostname)
}

return nil
}

func (kp *SyncHandler) TransitionToGateway() error {
klog.V(log.DEBUG).Info("The current node has become a Gateway")
logger.V(log.DEBUG).Info("The current node has become a Gateway")
kp.cleanVxSubmarinerRoutes()

kp.syncHandlerMutex.Lock()
defer kp.syncHandlerMutex.Unlock()
kp.isGatewayNode = true
kp.wasGatewayPreviously = true

klog.Infof("Creating the vxlan interface: %s on the gateway node", VxLANIface)
logger.Infof("Creating the vxlan interface: %s on the gateway node", VxLANIface)

err := kp.createVxLANInterface(kp.hostname, VxInterfaceGateway, nil)
if err != nil {
klog.Fatalf("Unable to create VxLAN interface on gateway node (%s): %v", kp.hostname, err)
logger.Fatalf("Unable to create VxLAN interface on gateway node (%s): %v", kp.hostname, err)
}

err = kp.netLink.RuleAddIfNotPresent(netlinkAPI.NewTableRule(constants.RouteAgentHostNetworkTableID))
if err != nil {
klog.Errorf("Unable to add ip rule to table %d on Gateway node %s: %v",
constants.RouteAgentHostNetworkTableID, kp.hostname, err)
logger.Errorf(err, "Unable to add ip rule to table %d on Gateway node %s",
constants.RouteAgentHostNetworkTableID, kp.hostname)
}

// Add routes to the new endpoint on the GatewayNode.
Expand Down
19 changes: 9 additions & 10 deletions pkg/routeagent_driver/handlers/kubeproxy/iptables_iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/submariner-io/submariner/pkg/port"
"github.com/submariner-io/submariner/pkg/routeagent_driver/constants"
iptcommon "github.com/submariner-io/submariner/pkg/routeagent_driver/iptables"
"k8s.io/klog/v2"
)

func (kp *SyncHandler) createIPTableChains() error {
Expand All @@ -41,7 +40,7 @@ func (kp *SyncHandler) createIPTableChains() error {
return errors.Wrap(err, "error initializing POST routing chain")
}

klog.V(log.DEBUG).Infof("Install/ensure %q chain exists", constants.SmInputChain)
logger.V(log.DEBUG).Infof("Install/ensure %q chain exists", constants.SmInputChain)

if err = ipt.CreateChainIfNotExists(constants.FilterTable, constants.SmInputChain); err != nil {
return errors.Wrap(err, "unable to create SUBMARINER-INPUT chain in iptables")
Expand All @@ -52,15 +51,15 @@ func (kp *SyncHandler) createIPTableChains() error {
return errors.Wrapf(err, "unable to append iptables rule %q", strings.Join(forwardToSubInputRuleSpec, " "))
}

klog.V(log.DEBUG).Infof("Allow VxLAN incoming traffic in %q Chain", constants.SmInputChain)
logger.V(log.DEBUG).Infof("Allow VxLAN incoming traffic in %q Chain", constants.SmInputChain)

ruleSpec := []string{"-p", "udp", "-m", "udp", "--dport", strconv.Itoa(port.IntraClusterVxLAN), "-j", "ACCEPT"}

if err = ipt.AppendUnique(constants.FilterTable, constants.SmInputChain, ruleSpec...); err != nil {
return errors.Wrapf(err, "unable to append iptables rule %q", strings.Join(ruleSpec, " "))
}

klog.V(log.DEBUG).Infof("Insert rule to allow traffic over %s interface in FORWARDing Chain", VxLANIface)
logger.V(log.DEBUG).Infof("Insert rule to allow traffic over %s interface in FORWARDing Chain", VxLANIface)

ruleSpec = []string{"-o", VxLANIface, "-j", "ACCEPT"}

Expand All @@ -72,7 +71,7 @@ func (kp *SyncHandler) createIPTableChains() error {
// Program rules to support communication from HostNetwork to remoteCluster
sourceAddress := strconv.Itoa(VxLANVTepNetworkPrefix) + ".0.0.0/8"
ruleSpec = []string{"-s", sourceAddress, "-o", VxLANIface, "-j", "SNAT", "--to", kp.cniIface.IPAddress}
klog.V(log.DEBUG).Infof("Installing rule for host network to remote cluster communication: %s", strings.Join(ruleSpec, " "))
logger.V(log.DEBUG).Infof("Installing rule for host network to remote cluster communication: %s", strings.Join(ruleSpec, " "))

if err = ipt.AppendUnique(constants.NATTable, constants.SmPostRoutingChain, ruleSpec...); err != nil {
return errors.Wrapf(err, "error appending iptables rule %q", strings.Join(ruleSpec, " "))
Expand All @@ -86,7 +85,7 @@ func (kp *SyncHandler) updateIptableRulesForInterClusterTraffic(inputCidrBlocks
for _, inputCidrBlock := range inputCidrBlocks {
err := kp.programIptableRulesForInterClusterTraffic(inputCidrBlock, operation)
if err != nil {
klog.Errorf("Failed to program iptable rules. %v", err)
logger.Errorf(err, "Failed to program iptable rules")
}
}
}
Expand All @@ -102,25 +101,25 @@ func (kp *SyncHandler) programIptableRulesForInterClusterTraffic(remoteCidrBlock
incomingRuleSpec := []string{"-s", remoteCidrBlock, "-d", localClusterCidr, "-j", "ACCEPT"}

if operation == Add {
klog.V(log.DEBUG).Infof("Installing iptables rule for outgoing traffic: %s", strings.Join(outboundRuleSpec, " "))
logger.V(log.DEBUG).Infof("Installing iptables rule for outgoing traffic: %s", strings.Join(outboundRuleSpec, " "))

if err = ipt.AppendUnique(constants.NATTable, constants.SmPostRoutingChain, outboundRuleSpec...); err != nil {
return errors.Wrapf(err, "error appending iptables rule %q", strings.Join(outboundRuleSpec, " "))
}

klog.V(log.DEBUG).Infof("Installing iptables rule for incoming traffic: %s", strings.Join(incomingRuleSpec, " "))
logger.V(log.DEBUG).Infof("Installing iptables rule for incoming traffic: %s", strings.Join(incomingRuleSpec, " "))

if err = ipt.AppendUnique(constants.NATTable, constants.SmPostRoutingChain, incomingRuleSpec...); err != nil {
return errors.Wrapf(err, "error appending iptables rule %q", strings.Join(incomingRuleSpec, " "))
}
} else if operation == Delete {
klog.V(log.DEBUG).Infof("Deleting iptables rule for outgoing traffic: %s", strings.Join(outboundRuleSpec, " "))
logger.V(log.DEBUG).Infof("Deleting iptables rule for outgoing traffic: %s", strings.Join(outboundRuleSpec, " "))

if err = ipt.Delete(constants.NATTable, constants.SmPostRoutingChain, outboundRuleSpec...); err != nil {
return errors.Wrapf(err, "error deleting iptables rule %q", strings.Join(outboundRuleSpec, " "))
}

klog.V(log.DEBUG).Infof("Deleting iptables rule for incoming traffic: %s", strings.Join(incomingRuleSpec, " "))
logger.V(log.DEBUG).Infof("Deleting iptables rule for incoming traffic: %s", strings.Join(incomingRuleSpec, " "))

if err = ipt.Delete(constants.NATTable, constants.SmPostRoutingChain, incomingRuleSpec...); err != nil {
return errors.Wrapf(err, "error deleting iptables rule %q", strings.Join(incomingRuleSpec, " "))
Expand Down
7 changes: 5 additions & 2 deletions pkg/routeagent_driver/handlers/kubeproxy/kp_iptables.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ import (
"sync"

"github.com/pkg/errors"
"github.com/submariner-io/admiral/pkg/log"
"github.com/submariner-io/admiral/pkg/stringset"
cni "github.com/submariner-io/submariner/pkg/cni"
"github.com/submariner-io/submariner/pkg/event"
"github.com/submariner-io/submariner/pkg/netlink"
cniapi "github.com/submariner-io/submariner/pkg/routeagent_driver/cni"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
logf "sigs.k8s.io/controller-runtime/pkg/log"
)

type SyncHandler struct {
Expand All @@ -57,6 +58,8 @@ type SyncHandler struct {
defaultHostIface *net.Interface
}

var logger = log.Logger{Logger: logf.Log.WithName("KubeProxy")}

func NewSyncHandler(localClusterCidr, localServiceCidr []string) *SyncHandler {
return &SyncHandler{
localClusterCidr: localClusterCidr,
Expand Down Expand Up @@ -112,7 +115,7 @@ func (kp *SyncHandler) Init() error {
// to work, but the following use-cases may not work.
// 1. Hostnetworking to remote cluster support will be broken
// 2. Health-check verification between the Gateway nodes will be disabled
klog.Errorf("Error discovering the CNI interface %v", err)
logger.Errorf(err, "Error discovering the CNI interface")
}

// Create the necessary IPTable chains in the filter and nat tables.
Expand Down
Loading

0 comments on commit 013bba2

Please sign in to comment.