Skip to content

Commit

Permalink
dpdk: refactor the main packet loop into smaller functions
Browse files Browse the repository at this point in the history
  • Loading branch information
Lukas Sismis authored and victorjulien committed Mar 16, 2024
1 parent 0b5966c commit 5592ec0
Showing 1 changed file with 176 additions and 127 deletions.
303 changes: 176 additions & 127 deletions src/source-dpdk.c
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,8 @@ static inline void InterruptsTurnOnOff(uint16_t port_id, uint16_t queue_id, bool
rte_spinlock_unlock(&(intr_lock[port_id]));
}

static void DPDKFreeMbufArray(struct rte_mbuf **mbuf_array, uint16_t mbuf_cnt, uint16_t offset)
static inline void DPDKFreeMbufArray(
struct rte_mbuf **mbuf_array, uint16_t mbuf_cnt, uint16_t offset)
{
for (int i = offset; i < mbuf_cnt; i++) {
rte_pktmbuf_free(mbuf_array[i]);
Expand Down Expand Up @@ -394,151 +395,204 @@ static void DPDKReleasePacket(Packet *p)
PacketFreeOrRelease(p);
}

/**
* \brief Main DPDK reading Loop function
*/
static TmEcode ReceiveDPDKLoop(ThreadVars *tv, void *data, void *slot)
static TmEcode ReceiveDPDKLoopInit(ThreadVars *tv, DPDKThreadVars *ptv)
{
SCEnter();
Packet *p;
uint16_t nb_rx;
time_t last_dump = 0;
time_t current_time;
bool segmented_mbufs_warned = 0;
SCTime_t t = DPDKSetTimevalReal(&machine_start_time);
uint64_t last_timeout_msec = SCTIME_MSECS(t);

DPDKThreadVars *ptv = (DPDKThreadVars *)data;
TmSlot *s = (TmSlot *)slot;

ptv->slot = s->slot_next;

// Indicate that the thread is actually running its application level code (i.e., it can poll
// packets)
// Indicate that the thread is actually running its application level
// code (i.e., it can poll packets)
TmThreadsSetFlag(tv, THV_RUNNING);
PacketPoolWait();

rte_eth_stats_reset(ptv->port_id);
rte_eth_xstats_reset(ptv->port_id);

uint32_t pwd_zero_rx_packet_polls_count = 0;
if (ptv->intr_enabled && !InterruptsRXEnable(ptv->port_id, ptv->queue_id))
SCReturnInt(TM_ECODE_FAILED);

while (1) {
if (unlikely(suricata_ctl_flags != 0)) {
SCLogDebug("Stopping Suricata!");
SC_ATOMIC_ADD(ptv->workers_sync->worker_checked_in, 1);
while (SC_ATOMIC_GET(ptv->workers_sync->worker_checked_in) <
ptv->workers_sync->worker_cnt) {
rte_delay_us(10);
SCReturnInt(TM_ECODE_OK);
}

static inline void LoopHandleTimeoutOnIdle(ThreadVars *tv)
{
static uint64_t last_timeout_msec = 0;
SCTime_t t = DPDKSetTimevalReal(&machine_start_time);
uint64_t msecs = SCTIME_MSECS(t);
if (msecs > last_timeout_msec + 100) {
TmThreadsCaptureHandleTimeout(tv, NULL);
last_timeout_msec = msecs;
}
}

/**
* \brief Decides if it should retry the packet poll or continue with the packet processing
* \return true if the poll should be retried, false otherwise
*/
static inline bool RXPacketCountHeuristic(ThreadVars *tv, DPDKThreadVars *ptv, uint16_t nb_rx)
{
static uint32_t zero_pkt_polls_cnt = 0;

if (nb_rx > 0) {
zero_pkt_polls_cnt = 0;
return false;
}

LoopHandleTimeoutOnIdle(tv);
if (!ptv->intr_enabled)
return true;

zero_pkt_polls_cnt++;
if (zero_pkt_polls_cnt <= MIN_ZERO_POLL_COUNT)
return true;

uint32_t pwd_idle_hint = InterruptsSleepHeuristic(zero_pkt_polls_cnt);
if (pwd_idle_hint < STANDARD_SLEEP_TIME_US) {
rte_delay_us(pwd_idle_hint);
} else {
InterruptsTurnOnOff(ptv->port_id, ptv->queue_id, true);
struct rte_epoll_event event;
rte_epoll_wait(RTE_EPOLL_PER_THREAD, &event, 1, MAX_EPOLL_TIMEOUT_MS);
InterruptsTurnOnOff(ptv->port_id, ptv->queue_id, false);
return true;
}

return false;
}

/**
* \brief Initializes a packet from an mbuf
* \return true if the packet was initialized successfully, false otherwise
*/
static inline Packet *PacketInitFromMbuf(DPDKThreadVars *ptv, struct rte_mbuf *mbuf)
{
Packet *p = PacketGetFromQueueOrAlloc();
if (unlikely(p == NULL)) {
return NULL;
}
PKT_SET_SRC(p, PKT_SRC_WIRE);
p->datalink = LINKTYPE_ETHERNET;
if (ptv->checksum_mode == CHECKSUM_VALIDATION_DISABLE) {
p->flags |= PKT_IGNORE_CHECKSUM;
}

p->ts = DPDKSetTimevalReal(&machine_start_time);
p->dpdk_v.mbuf = mbuf;
p->ReleasePacket = DPDKReleasePacket;
p->dpdk_v.copy_mode = ptv->copy_mode;
p->dpdk_v.out_port_id = ptv->out_port_id;
p->dpdk_v.out_queue_id = ptv->queue_id;
p->livedev = ptv->livedev;

if (ptv->checksum_mode == CHECKSUM_VALIDATION_DISABLE) {
p->flags |= PKT_IGNORE_CHECKSUM;
} else if (ptv->checksum_mode == CHECKSUM_VALIDATION_OFFLOAD) {
uint64_t ol_flags = p->dpdk_v.mbuf->ol_flags;
if ((ol_flags & RTE_MBUF_F_RX_IP_CKSUM_MASK) == RTE_MBUF_F_RX_IP_CKSUM_GOOD &&
(ol_flags & RTE_MBUF_F_RX_L4_CKSUM_MASK) == RTE_MBUF_F_RX_L4_CKSUM_GOOD) {
SCLogDebug("HW detected GOOD IP and L4 chsum, ignoring validation");
p->flags |= PKT_IGNORE_CHECKSUM;
} else {
if ((ol_flags & RTE_MBUF_F_RX_IP_CKSUM_MASK) == RTE_MBUF_F_RX_IP_CKSUM_BAD) {
SCLogDebug("HW detected BAD IP checksum");
// chsum recalc will not be triggered but rule keyword check will be
p->level3_comp_csum = 0;
}
if (ptv->queue_id == 0) {
rte_delay_us(20); // wait for all threads to get out of the sync loop
SC_ATOMIC_SET(ptv->workers_sync->worker_checked_in, 0);
// If Suricata runs in peered mode, the peer threads might still want to send
// packets to our port. Instead, we know, that we are done with the peered port, so
// we stop it. The peered threads will stop our port.
if (ptv->copy_mode == DPDK_COPY_MODE_TAP || ptv->copy_mode == DPDK_COPY_MODE_IPS) {
rte_eth_dev_stop(ptv->out_port_id);
} else {
// in IDS we stop our port - no peer threads are running
rte_eth_dev_stop(ptv->port_id);
}
if ((ol_flags & RTE_MBUF_F_RX_L4_CKSUM_MASK) == RTE_MBUF_F_RX_L4_CKSUM_BAD) {
SCLogDebug("HW detected BAD L4 chsum");
p->level4_comp_csum = 0;
}
DPDKDumpCounters(ptv);
break;
}
}

return p;
}

static inline void DPDKSegmentedMbufWarning(struct rte_mbuf *mbuf)
{
static bool segmented_mbufs_warned = false;
if (!segmented_mbufs_warned && !rte_pktmbuf_is_contiguous(mbuf)) {
char warn_s[] = "Segmented mbufs detected! Redmine Ticket #6012 "
"Check your configuration or report the issue";
enum rte_proc_type_t eal_t = rte_eal_process_type();
if (eal_t == RTE_PROC_SECONDARY) {
SCLogWarning("%s. To avoid segmented mbufs, "
"try to increase mbuf size in your primary application",
warn_s);
} else if (eal_t == RTE_PROC_PRIMARY) {
SCLogWarning("%s. To avoid segmented mbufs, "
"try to increase MTU in your suricata.yaml",
warn_s);
}

nb_rx = rte_eth_rx_burst(ptv->port_id, ptv->queue_id, ptv->received_mbufs, BURST_SIZE);
if (unlikely(nb_rx == 0)) {
t = DPDKSetTimevalReal(&machine_start_time);
uint64_t msecs = SCTIME_MSECS(t);
if (msecs > last_timeout_msec + 100) {
TmThreadsCaptureHandleTimeout(tv, NULL);
last_timeout_msec = msecs;
}
segmented_mbufs_warned = true;
}
}

if (!ptv->intr_enabled)
continue;
static void HandleShutdown(DPDKThreadVars *ptv)
{
SCLogDebug("Stopping Suricata!");
SC_ATOMIC_ADD(ptv->workers_sync->worker_checked_in, 1);
while (SC_ATOMIC_GET(ptv->workers_sync->worker_checked_in) < ptv->workers_sync->worker_cnt) {
rte_delay_us(10);
}
if (ptv->queue_id == 0) {
rte_delay_us(20); // wait for all threads to get out of the sync loop
SC_ATOMIC_SET(ptv->workers_sync->worker_checked_in, 0);
// If Suricata runs in peered mode, the peer threads might still want to send
// packets to our port. Instead, we know, that we are done with the peered port, so
// we stop it. The peered threads will stop our port.
if (ptv->copy_mode == DPDK_COPY_MODE_TAP || ptv->copy_mode == DPDK_COPY_MODE_IPS) {
rte_eth_dev_stop(ptv->out_port_id);
} else {
// in IDS we stop our port - no peer threads are running
rte_eth_dev_stop(ptv->port_id);
}
}
DPDKDumpCounters(ptv);
}

pwd_zero_rx_packet_polls_count++;
if (pwd_zero_rx_packet_polls_count <= MIN_ZERO_POLL_COUNT)
continue;
static void PeriodicDPDKDumpCounters(DPDKThreadVars *ptv)
{
static time_t last_dump = 0;
time_t current_time = DPDKGetSeconds();
/* Trigger one dump of stats every second */
if (current_time != last_dump) {
DPDKDumpCounters(ptv);
last_dump = current_time;
}
}

uint32_t pwd_idle_hint = InterruptsSleepHeuristic(pwd_zero_rx_packet_polls_count);
/**
* \brief Main DPDK reading Loop function
*/
static TmEcode ReceiveDPDKLoop(ThreadVars *tv, void *data, void *slot)
{
SCEnter();
DPDKThreadVars *ptv = (DPDKThreadVars *)data;
ptv->slot = (TmSlot *)slot;
TmEcode ret = ReceiveDPDKLoopInit(tv, ptv);
if (ret != TM_ECODE_OK) {
SCReturnInt(ret);
}
while (true) {
if (unlikely(suricata_ctl_flags != 0)) {
HandleShutdown(ptv);
break;
}

if (pwd_idle_hint < STANDARD_SLEEP_TIME_US) {
rte_delay_us(pwd_idle_hint);
} else {
InterruptsTurnOnOff(ptv->port_id, ptv->queue_id, true);
struct rte_epoll_event event;
rte_epoll_wait(RTE_EPOLL_PER_THREAD, &event, 1, MAX_EPOLL_TIMEOUT_MS);
InterruptsTurnOnOff(ptv->port_id, ptv->queue_id, false);
continue;
}
} else if (ptv->intr_enabled && pwd_zero_rx_packet_polls_count) {
pwd_zero_rx_packet_polls_count = 0;
uint16_t nb_rx =
rte_eth_rx_burst(ptv->port_id, ptv->queue_id, ptv->received_mbufs, BURST_SIZE);
if (RXPacketCountHeuristic(tv, ptv, nb_rx)) {
continue;
}

ptv->pkts += (uint64_t)nb_rx;
for (uint16_t i = 0; i < nb_rx; i++) {
p = PacketGetFromQueueOrAlloc();
if (unlikely(p == NULL)) {
Packet *p = PacketInitFromMbuf(ptv, ptv->received_mbufs[i]);
if (p == NULL) {
rte_pktmbuf_free(ptv->received_mbufs[i]);
continue;
}
PKT_SET_SRC(p, PKT_SRC_WIRE);
p->datalink = LINKTYPE_ETHERNET;
if (ptv->checksum_mode == CHECKSUM_VALIDATION_DISABLE) {
p->flags |= PKT_IGNORE_CHECKSUM;
}

p->ts = DPDKSetTimevalReal(&machine_start_time);
p->dpdk_v.mbuf = ptv->received_mbufs[i];
p->ReleasePacket = DPDKReleasePacket;
p->dpdk_v.copy_mode = ptv->copy_mode;
p->dpdk_v.out_port_id = ptv->out_port_id;
p->dpdk_v.out_queue_id = ptv->queue_id;
p->livedev = ptv->livedev;

if (ptv->checksum_mode == CHECKSUM_VALIDATION_DISABLE) {
p->flags |= PKT_IGNORE_CHECKSUM;
} else if (ptv->checksum_mode == CHECKSUM_VALIDATION_OFFLOAD) {
uint64_t ol_flags = ptv->received_mbufs[i]->ol_flags;
if ((ol_flags & RTE_MBUF_F_RX_IP_CKSUM_MASK) == RTE_MBUF_F_RX_IP_CKSUM_GOOD &&
(ol_flags & RTE_MBUF_F_RX_L4_CKSUM_MASK) == RTE_MBUF_F_RX_L4_CKSUM_GOOD) {
SCLogDebug("HW detected GOOD IP and L4 chsum, ignoring validation");
p->flags |= PKT_IGNORE_CHECKSUM;
} else {
if ((ol_flags & RTE_MBUF_F_RX_IP_CKSUM_MASK) == RTE_MBUF_F_RX_IP_CKSUM_BAD) {
SCLogDebug("HW detected BAD IP checksum");
// chsum recalc will not be triggered but rule keyword check will be
p->level3_comp_csum = 0;
}
if ((ol_flags & RTE_MBUF_F_RX_L4_CKSUM_MASK) == RTE_MBUF_F_RX_L4_CKSUM_BAD) {
SCLogDebug("HW detected BAD L4 chsum");
p->level4_comp_csum = 0;
}
}
}

if (!rte_pktmbuf_is_contiguous(p->dpdk_v.mbuf) && !segmented_mbufs_warned) {
char warn_s[] = "Segmented mbufs detected! Redmine Ticket #6012 "
"Check your configuration or report the issue";
enum rte_proc_type_t eal_t = rte_eal_process_type();
if (eal_t == RTE_PROC_SECONDARY) {
SCLogWarning("%s. To avoid segmented mbufs, "
"try to increase mbuf size in your primary application",
warn_s);
} else if (eal_t == RTE_PROC_PRIMARY) {
SCLogWarning("%s. To avoid segmented mbufs, "
"try to increase MTU in your suricata.yaml",
warn_s);
}

segmented_mbufs_warned = 1;
}

DPDKSegmentedMbufWarning(ptv->received_mbufs[i]);
PacketSetData(p, rte_pktmbuf_mtod(p->dpdk_v.mbuf, uint8_t *),
rte_pktmbuf_pkt_len(p->dpdk_v.mbuf));
if (TmThreadsSlotProcessPkt(ptv->tv, ptv->slot, p) != TM_ECODE_OK) {
Expand All @@ -548,12 +602,7 @@ static TmEcode ReceiveDPDKLoop(ThreadVars *tv, void *data, void *slot)
}
}

/* Trigger one dump of stats every second */
current_time = DPDKGetSeconds();
if (current_time != last_dump) {
DPDKDumpCounters(ptv);
last_dump = current_time;
}
PeriodicDPDKDumpCounters(ptv);
StatsSyncCountersIfSignalled(tv);
}

Expand Down

0 comments on commit 5592ec0

Please sign in to comment.