diff --git a/cmd/agent/daemon/state/controller.go b/cmd/agent/daemon/state/controller.go index 4e52f6c1..67f36066 100644 --- a/cmd/agent/daemon/state/controller.go +++ b/cmd/agent/daemon/state/controller.go @@ -99,15 +99,6 @@ func NewController( panic(err) } - // IP info cache is used only in netflow pipeline. - // It's safe to use non synced lru since it's accessed form on goroutine. - ipInfoCache, err := freelru.New[netip.Addr, *kubepb.IPInfo](1024, func(k netip.Addr) uint32 { - return uint32(xxhash.Sum64(k.AsSlice())) - }) - if err != nil { - panic(err) - } - // Conntrack cache is used only in netflow pipeline. // It's safe to use non synced lru since it's accessed form on goroutine. conntrackCacheKey := xxhash.New() @@ -139,7 +130,6 @@ func NewController( mutedNamespaces: map[string]struct{}{}, netflows: map[uint64]*netflowVal{}, dnsCache: dnsCache, - ipInfoCache: ipInfoCache, podCache: podCache, conntrackCache: conntrackCache, processTreeCollector: processTreeCollector, @@ -177,7 +167,6 @@ type Controller struct { kubeClient kubepb.KubeAPIClient dnsCache *freelru.SyncedLRU[uint64, *freelru.SyncedLRU[netip.Addr, string]] podCache *freelru.SyncedLRU[string, *kubepb.Pod] - ipInfoCache *freelru.LRU[netip.Addr, *kubepb.IPInfo] conntrackCache *freelru.LRU[types.AddrTuple, netip.AddrPort] } diff --git a/cmd/agent/daemon/state/netflow_pipeline.go b/cmd/agent/daemon/state/netflow_pipeline.go index 82cffc19..e7801efa 100644 --- a/cmd/agent/daemon/state/netflow_pipeline.go +++ b/cmd/agent/daemon/state/netflow_pipeline.go @@ -123,6 +123,8 @@ func (c *Controller) enqueueNetflowExport(now time.Time) { ) }() + podsByIPCache := map[netip.Addr]*kubepb.IPInfo{} + for key, netflow := range c.netflows { // Flow was exported before and doesn't have new changes. Delete it and continue. if netflow.exportedAt.After(time.UnixMicro(int64(netflow.event.Context.Ts) / 1000)) { @@ -152,6 +154,7 @@ func (c *Controller) enqueueNetflowExport(now time.Time) { pbNetFlow.Destinations = make([]*castpb.NetflowDestination, 0, len(activeNetflowDests)) for _, dest := range activeNetflowDests { flowDest := c.toProtoNetflowDest( + podsByIPCache, netflow.event.Context.CgroupID, args.Tuple.Src, dest.addrPort, @@ -236,7 +239,7 @@ func (c *Controller) toProtoNetflow(e *types.Event, args *types.NetFlowBaseArgs) return res } -func (c *Controller) toProtoNetflowDest(cgroupID uint64, src, dst netip.AddrPort, txBytes, rxBytes, txPackets, rxPackets uint64) *castpb.NetflowDestination { +func (c *Controller) toProtoNetflowDest(podsByIPCache map[netip.Addr]*kubepb.IPInfo, cgroupID uint64, src, dst netip.AddrPort, txBytes, rxBytes, txPackets, rxPackets uint64) *castpb.NetflowDestination { dns := c.getAddrDnsQuestion(cgroupID, dst.Addr()) if c.clusterInfo.serviceCidr.Contains(dst.Addr()) { @@ -256,7 +259,7 @@ func (c *Controller) toProtoNetflowDest(cgroupID uint64, src, dst netip.AddrPort } if c.clusterInfo.serviceCidr.Contains(dst.Addr()) || c.clusterInfo.podCidr.Contains(dst.Addr()) { - ipInfo, found := c.getIPInfo(dst.Addr()) + ipInfo, found := c.getIPInfo(podsByIPCache, dst.Addr()) if found { res.PodName = ipInfo.PodName res.Namespace = ipInfo.Namespace @@ -268,8 +271,8 @@ func (c *Controller) toProtoNetflowDest(cgroupID uint64, src, dst netip.AddrPort return res } -func (c *Controller) getIPInfo(addr netip.Addr) (*kubepb.IPInfo, bool) { - ipInfo, found := c.ipInfoCache.Get(addr) +func (c *Controller) getIPInfo(podsByIPCache map[netip.Addr]*kubepb.IPInfo, addr netip.Addr) (*kubepb.IPInfo, bool) { + ipInfo, found := podsByIPCache[addr] if !found { ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() @@ -279,7 +282,7 @@ func (c *Controller) getIPInfo(addr netip.Addr) (*kubepb.IPInfo, bool) { return nil, false } ipInfo = resp.Info - c.ipInfoCache.Add(addr, ipInfo) + podsByIPCache[addr] = ipInfo } return ipInfo, true }