diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index af16eb5792b2..a95aa471365c 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -44,6 +44,7 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha3...master[Check the HEAD d *Packetbeat* - Add missing nil-check to memcached GapInStream handler. {issue}1162[1162] - Fix NFSv4 Operation returning the first found first-class operation available in compound requests. {pull}1821[1821] +- Fix TCP overlapping segments not being handled correctly. {pull}1898[1898] *Topbeat* diff --git a/packetbeat/protos/tcp/tcp.go b/packetbeat/protos/tcp/tcp.go index 8e7bbf76ab2d..033ab0683778 100644 --- a/packetbeat/protos/tcp/tcp.go +++ b/packetbeat/protos/tcp/tcp.go @@ -119,28 +119,36 @@ func (tcp *Tcp) Process(id *flows.FlowID, tcphdr *layers.TCP, pkt *protos.Packet // protocol modules. defer logp.Recover("Process tcp exception") - debugf("tcp flow id: %p", id) - stream, created := tcp.getStream(pkt) if stream.conn == nil { return } + conn := stream.conn if id != nil { id.AddConnectionID(uint64(stream.conn.id)) } - conn := stream.conn - tcp_start_seq := tcphdr.Seq - tcp_seq := tcp_start_seq + uint32(len(pkt.Payload)) + if isDebug { + debugf("tcp flow id: %p", id) + } + + if len(pkt.Payload) == 0 && !tcphdr.FIN { + // return early if packet is not interesting. Still need to find/create + // stream first in order to update the TCP stream timer + return + } + + tcpStartSeq := tcphdr.Seq + tcpSeq := tcpStartSeq + uint32(len(pkt.Payload)) lastSeq := conn.lastSeq[stream.dir] if isDebug { debugf("pkt.start_seq=%v pkt.last_seq=%v stream.last_seq=%v (len=%d)", - tcp_start_seq, tcp_seq, lastSeq, len(pkt.Payload)) + tcpStartSeq, tcpSeq, lastSeq, len(pkt.Payload)) } - if len(pkt.Payload) > 0 && lastSeq != 0 { - if tcpSeqBeforeEq(tcp_seq, lastSeq) { + if lastSeq != 0 { + if tcpSeqBeforeEq(tcpSeq, lastSeq) { if isDebug { debugf("Ignoring retransmitted segment. pkt.seq=%v len=%v stream.seq=%v", tcphdr.Seq, len(pkt.Payload), lastSeq) @@ -148,10 +156,10 @@ func (tcp *Tcp) Process(id *flows.FlowID, tcphdr *layers.TCP, pkt *protos.Packet return } - if tcpSeqBefore(lastSeq, tcp_start_seq) { + if tcpSeqBefore(lastSeq, tcpStartSeq) { if !created { - gap := int(tcp_start_seq - lastSeq) - logp.Warn("Gap in tcp stream. last_seq: %d, seq: %d, gap: %d", lastSeq, tcp_start_seq, gap) + gap := int(tcpStartSeq - lastSeq) + logp.Warn("Gap in tcp stream. last_seq: %d, seq: %d, gap: %d", lastSeq, tcpStartSeq, gap) drop := stream.gapInStream(gap) if drop { if isDebug { @@ -165,9 +173,25 @@ func (tcp *Tcp) Process(id *flows.FlowID, tcphdr *layers.TCP, pkt *protos.Packet } } } + + // cut away packet overlap + if tcpSeqBefore(tcpStartSeq, lastSeq) { + delta := lastSeq - tcpStartSeq + + // if 'overlap' already covered by previous packet -> return + if int(delta) >= len(pkt.Payload) { + return + } + + pkt.Payload = pkt.Payload[delta:] + tcphdr.Seq += delta + if len(pkt.Payload) == 0 && !tcphdr.FIN { + return + } + } } - conn.lastSeq[stream.dir] = tcp_seq + conn.lastSeq[stream.dir] = tcpSeq stream.addPacket(pkt, tcphdr) } diff --git a/packetbeat/protos/tcp/tcp_test.go b/packetbeat/protos/tcp/tcp_test.go index d396be435575..c173bdcfbcfc 100644 --- a/packetbeat/protos/tcp/tcp_test.go +++ b/packetbeat/protos/tcp/tcp_test.go @@ -213,6 +213,59 @@ func TestTCSeqPayload(t *testing.T) { 10, []byte{5, 6, 7, 8}, }, + {"ACK same sequence number", + []segment{ + {0, []byte{1, 2}}, + {2, nil}, + {2, []byte{3, 4}}, + {4, []byte{5, 6}}, + }, + 0, + []byte{1, 2, 3, 4, 5, 6}, + }, + {"ACK same sequence number 2", + []segment{ + {0, nil}, + {1, nil}, + {1, []byte{1, 2}}, + {3, nil}, + {3, []byte{3, 4}}, + {5, []byte{5, 6}}, + {7, []byte{7, 8}}, + {9, nil}, + }, + 0, + []byte{1, 2, 3, 4, 5, 6, 7, 8}, + }, + {"Overlap, first segment bigger", + []segment{ + {0, []byte{1, 2}}, + {2, []byte{3, 4}}, + {2, []byte{3}}, + {4, []byte{5, 6}}, + }, + 0, + []byte{1, 2, 3, 4, 5, 6}, + }, + {"Overlap, second segment bigger", + []segment{ + {0, []byte{1, 2}}, + {2, []byte{3}}, + {2, []byte{3, 4}}, + {4, []byte{5, 6}}, + }, + 0, + []byte{1, 2, 3, 4, 5, 6}, + }, + {"Overlap, covered", + []segment{ + {0, []byte{1, 2, 3, 4}}, + {1, []byte{2, 3}}, + {4, []byte{5, 6}}, + }, + 0, + []byte{1, 2, 3, 4, 5, 6}, + }, } for i, test := range tests { @@ -248,6 +301,10 @@ func TestTCSeqPayload(t *testing.T) { } assert.Equal(t, test.expectedGaps, gap) + if len(test.expectedState) != len(state) { + assert.Equal(t, len(test.expectedState), len(state)) + continue + } assert.Equal(t, test.expectedState, state) } } @@ -309,7 +366,7 @@ func makeCollectPayload( priv protos.ProtocolData, ) protos.ProtocolData { if resetOnNil && priv == nil { - *state = nil + (*state) = nil } *state = append(*state, p.Payload...) return *state