diff --git a/src/tm-threads.c b/src/tm-threads.c index deabe7333b1c..f0b2f823b253 100644 --- a/src/tm-threads.c +++ b/src/tm-threads.c @@ -42,6 +42,7 @@ #include "util-profiling.h" #include "util-signal.h" #include "queue.h" +#include "util-validate.h" #ifdef PROFILE_LOCKING thread_local uint64_t mutex_lock_contention; @@ -107,6 +108,22 @@ void TmThreadsUnsetFlag(ThreadVars *tv, uint32_t flag) SC_ATOMIC_AND(tv->flags, ~flag); } +TmEcode TmThreadsProcessDecodePseudoPackets( + ThreadVars *tv, PacketQueueNoLock *decode_pq, TmSlot *slot) +{ + while (decode_pq->top != NULL) { + Packet *extra_p = PacketDequeueNoLock(decode_pq); + if (unlikely(extra_p == NULL)) + continue; + DEBUG_VALIDATE_BUG_ON(extra_p->flow != NULL); + + if (TmThreadsSlotProcessPkt(tv, slot, extra_p) != TM_ECODE_OK) { + SCReturnInt(TM_ECODE_FAILED); + } + } + SCReturnInt(TM_ECODE_OK); +} + /** * \brief Separate run function so we can call it recursively. */ @@ -124,21 +141,8 @@ TmEcode TmThreadsSlotVarRun(ThreadVars *tv, Packet *p, TmSlot *slot) return TM_ECODE_FAILED; } - /* handle new packets */ - while (tv->decode_pq.top != NULL) { - Packet *extra_p = PacketDequeueNoLock(&tv->decode_pq); - if (unlikely(extra_p == NULL)) - continue; - - /* see if we need to process the packet */ - if (s->slot_next != NULL) { - r = TmThreadsSlotVarRun(tv, extra_p, s->slot_next); - if (unlikely(r == TM_ECODE_FAILED)) { - TmThreadsSlotProcessPktFail(tv, s, extra_p); - return TM_ECODE_FAILED; - } - } - tv->tmqh_out(tv, extra_p); + if (TmThreadsProcessDecodePseudoPackets(tv, &tv->decode_pq, s->slot_next) != TM_ECODE_OK) { + return TM_ECODE_FAILED; } } diff --git a/src/tm-threads.h b/src/tm-threads.h index 207a0fb7ff72..d68c9230a372 100644 --- a/src/tm-threads.h +++ b/src/tm-threads.h @@ -129,6 +129,9 @@ TmSlot *TmThreadGetFirstTmSlotForPartialPattern(const char *); uint32_t TmThreadCountThreadsByTmmFlags(uint8_t flags); +TmEcode TmThreadsProcessDecodePseudoPackets( + ThreadVars *tv, PacketQueueNoLock *decode_pq, TmSlot *slot); + static inline void TmThreadsCleanDecodePQ(PacketQueueNoLock *pq) { while (1) {