Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use local temp pods by ip cache #330

Merged
merged 1 commit into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 0 additions & 11 deletions cmd/agent/daemon/state/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,6 @@ func NewController(
panic(err)
}

// IP info cache is used only in netflow pipeline.
// It's safe to use non synced lru since it's accessed form on goroutine.
ipInfoCache, err := freelru.New[netip.Addr, *kubepb.IPInfo](1024, func(k netip.Addr) uint32 {
return uint32(xxhash.Sum64(k.AsSlice()))
})
if err != nil {
panic(err)
}

// Conntrack cache is used only in netflow pipeline.
// It's safe to use non synced lru since it's accessed form on goroutine.
conntrackCacheKey := xxhash.New()
Expand Down Expand Up @@ -139,7 +130,6 @@ func NewController(
mutedNamespaces: map[string]struct{}{},
netflows: map[uint64]*netflowVal{},
dnsCache: dnsCache,
ipInfoCache: ipInfoCache,
podCache: podCache,
conntrackCache: conntrackCache,
processTreeCollector: processTreeCollector,
Expand Down Expand Up @@ -177,7 +167,6 @@ type Controller struct {
kubeClient kubepb.KubeAPIClient
dnsCache *freelru.SyncedLRU[uint64, *freelru.SyncedLRU[netip.Addr, string]]
podCache *freelru.SyncedLRU[string, *kubepb.Pod]
ipInfoCache *freelru.LRU[netip.Addr, *kubepb.IPInfo]
conntrackCache *freelru.LRU[types.AddrTuple, netip.AddrPort]
}

Expand Down
13 changes: 8 additions & 5 deletions cmd/agent/daemon/state/netflow_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ func (c *Controller) enqueueNetflowExport(now time.Time) {
)
}()

podsByIPCache := map[netip.Addr]*kubepb.IPInfo{}

for key, netflow := range c.netflows {
// Flow was exported before and doesn't have new changes. Delete it and continue.
if netflow.exportedAt.After(time.UnixMicro(int64(netflow.event.Context.Ts) / 1000)) {
Expand Down Expand Up @@ -152,6 +154,7 @@ func (c *Controller) enqueueNetflowExport(now time.Time) {
pbNetFlow.Destinations = make([]*castpb.NetflowDestination, 0, len(activeNetflowDests))
for _, dest := range activeNetflowDests {
flowDest := c.toProtoNetflowDest(
podsByIPCache,
netflow.event.Context.CgroupID,
args.Tuple.Src,
dest.addrPort,
Expand Down Expand Up @@ -236,7 +239,7 @@ func (c *Controller) toProtoNetflow(e *types.Event, args *types.NetFlowBaseArgs)
return res
}

func (c *Controller) toProtoNetflowDest(cgroupID uint64, src, dst netip.AddrPort, txBytes, rxBytes, txPackets, rxPackets uint64) *castpb.NetflowDestination {
func (c *Controller) toProtoNetflowDest(podsByIPCache map[netip.Addr]*kubepb.IPInfo, cgroupID uint64, src, dst netip.AddrPort, txBytes, rxBytes, txPackets, rxPackets uint64) *castpb.NetflowDestination {
dns := c.getAddrDnsQuestion(cgroupID, dst.Addr())

if c.clusterInfo.serviceCidr.Contains(dst.Addr()) {
Expand All @@ -256,7 +259,7 @@ func (c *Controller) toProtoNetflowDest(cgroupID uint64, src, dst netip.AddrPort
}

if c.clusterInfo.serviceCidr.Contains(dst.Addr()) || c.clusterInfo.podCidr.Contains(dst.Addr()) {
ipInfo, found := c.getIPInfo(dst.Addr())
ipInfo, found := c.getIPInfo(podsByIPCache, dst.Addr())
if found {
res.PodName = ipInfo.PodName
res.Namespace = ipInfo.Namespace
Expand All @@ -268,8 +271,8 @@ func (c *Controller) toProtoNetflowDest(cgroupID uint64, src, dst netip.AddrPort
return res
}

func (c *Controller) getIPInfo(addr netip.Addr) (*kubepb.IPInfo, bool) {
ipInfo, found := c.ipInfoCache.Get(addr)
func (c *Controller) getIPInfo(podsByIPCache map[netip.Addr]*kubepb.IPInfo, addr netip.Addr) (*kubepb.IPInfo, bool) {
ipInfo, found := podsByIPCache[addr]
if !found {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
Expand All @@ -279,7 +282,7 @@ func (c *Controller) getIPInfo(addr netip.Addr) (*kubepb.IPInfo, bool) {
return nil, false
}
ipInfo = resp.Info
c.ipInfoCache.Add(addr, ipInfo)
podsByIPCache[addr] = ipInfo
}
return ipInfo, true
}
Expand Down
Loading