Skip to content

Commit

Permalink
Fix tcp segments overlaps
Browse files Browse the repository at this point in the history
  • Loading branch information
urso committed Jun 27, 2016
1 parent 6501e17 commit f42aa72
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 13 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha3...master[Check the HEAD d
*Packetbeat*
- Add missing nil-check to memcached GapInStream handler. {issue}1162[1162]
- Fix NFSv4 Operation returning the first found first-class operation available in compound requests. {pull}1821[1821]
- Fix TCP overlapping segments not being handled correctly. {pull}1898[1898]

*Topbeat*

Expand Down
48 changes: 36 additions & 12 deletions packetbeat/protos/tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,39 +119,47 @@ func (tcp *Tcp) Process(id *flows.FlowID, tcphdr *layers.TCP, pkt *protos.Packet
// protocol modules.
defer logp.Recover("Process tcp exception")

debugf("tcp flow id: %p", id)

stream, created := tcp.getStream(pkt)
if stream.conn == nil {
return
}

conn := stream.conn
if id != nil {
id.AddConnectionID(uint64(stream.conn.id))
}
conn := stream.conn

tcp_start_seq := tcphdr.Seq
tcp_seq := tcp_start_seq + uint32(len(pkt.Payload))
if isDebug {
debugf("tcp flow id: %p", id)
}

if len(pkt.Payload) == 0 && !tcphdr.FIN {
// return early if packet is not interesting. Still need to find/create
// stream first in order to update the TCP stream timer
return
}

tcpStartSeq := tcphdr.Seq
tcpSeq := tcpStartSeq + uint32(len(pkt.Payload))
lastSeq := conn.lastSeq[stream.dir]
if isDebug {
debugf("pkt.start_seq=%v pkt.last_seq=%v stream.last_seq=%v (len=%d)",
tcp_start_seq, tcp_seq, lastSeq, len(pkt.Payload))
tcpStartSeq, tcpSeq, lastSeq, len(pkt.Payload))
}

if len(pkt.Payload) > 0 && lastSeq != 0 {
if tcpSeqBeforeEq(tcp_seq, lastSeq) {
if lastSeq != 0 {
if tcpSeqBeforeEq(tcpSeq, lastSeq) {
if isDebug {
debugf("Ignoring retransmitted segment. pkt.seq=%v len=%v stream.seq=%v",
tcphdr.Seq, len(pkt.Payload), lastSeq)
}
return
}

if tcpSeqBefore(lastSeq, tcp_start_seq) {
if tcpSeqBefore(lastSeq, tcpStartSeq) {
if !created {
gap := int(tcp_start_seq - lastSeq)
logp.Warn("Gap in tcp stream. last_seq: %d, seq: %d, gap: %d", lastSeq, tcp_start_seq, gap)
gap := int(tcpStartSeq - lastSeq)
logp.Warn("Gap in tcp stream. last_seq: %d, seq: %d, gap: %d", lastSeq, tcpStartSeq, gap)
drop := stream.gapInStream(gap)
if drop {
if isDebug {
Expand All @@ -165,9 +173,25 @@ func (tcp *Tcp) Process(id *flows.FlowID, tcphdr *layers.TCP, pkt *protos.Packet
}
}
}

// cut away packet overlap
if tcpSeqBefore(tcpStartSeq, lastSeq) {
delta := lastSeq - tcpStartSeq

// if 'overlap' already covered by previous packet -> return
if int(delta) >= len(pkt.Payload) {
return
}

pkt.Payload = pkt.Payload[delta:]
tcphdr.Seq += delta
if len(pkt.Payload) == 0 && !tcphdr.FIN {
return
}
}
}

conn.lastSeq[stream.dir] = tcp_seq
conn.lastSeq[stream.dir] = tcpSeq
stream.addPacket(pkt, tcphdr)
}

Expand Down
59 changes: 58 additions & 1 deletion packetbeat/protos/tcp/tcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,59 @@ func TestTCSeqPayload(t *testing.T) {
10,
[]byte{5, 6, 7, 8},
},
{"ACK same sequence number",
[]segment{
{0, []byte{1, 2}},
{2, nil},
{2, []byte{3, 4}},
{4, []byte{5, 6}},
},
0,
[]byte{1, 2, 3, 4, 5, 6},
},
{"ACK same sequence number 2",
[]segment{
{0, nil},
{1, nil},
{1, []byte{1, 2}},
{3, nil},
{3, []byte{3, 4}},
{5, []byte{5, 6}},
{7, []byte{7, 8}},
{9, nil},
},
0,
[]byte{1, 2, 3, 4, 5, 6, 7, 8},
},
{"Overlap, first segment bigger",
[]segment{
{0, []byte{1, 2}},
{2, []byte{3, 4}},
{2, []byte{3}},
{4, []byte{5, 6}},
},
0,
[]byte{1, 2, 3, 4, 5, 6},
},
{"Overlap, second segment bigger",
[]segment{
{0, []byte{1, 2}},
{2, []byte{3}},
{2, []byte{3, 4}},
{4, []byte{5, 6}},
},
0,
[]byte{1, 2, 3, 4, 5, 6},
},
{"Overlap, covered",
[]segment{
{0, []byte{1, 2, 3, 4}},
{1, []byte{2, 3}},
{4, []byte{5, 6}},
},
0,
[]byte{1, 2, 3, 4, 5, 6},
},
}

for i, test := range tests {
Expand Down Expand Up @@ -248,6 +301,10 @@ func TestTCSeqPayload(t *testing.T) {
}

assert.Equal(t, test.expectedGaps, gap)
if len(test.expectedState) != len(state) {
assert.Equal(t, len(test.expectedState), len(state))
continue
}
assert.Equal(t, test.expectedState, state)
}
}
Expand Down Expand Up @@ -309,7 +366,7 @@ func makeCollectPayload(
priv protos.ProtocolData,
) protos.ProtocolData {
if resetOnNil && priv == nil {
*state = nil
(*state) = nil
}
*state = append(*state, p.Payload...)
return *state
Expand Down

0 comments on commit f42aa72

Please sign in to comment.