Skip to content
This repository has been archived by the owner on Nov 5, 2021. It is now read-only.

Commit

Permalink
Ping: Capture and use kernel timestamp for latency computation. (#546)
Browse files Browse the repository at this point in the history
Use SO_TIMESTAMP socket option (supported only on Unix systems) to capture kernel timestamp for incoming packets. This will make ping probe's latency measurement much more accurate.

Impact of this on intra-region latency measurement:
#540 (comment)

Reference:
#531
#540

I tested this change on MacOS and Linux. Logic is not changing for Windows.

PiperOrigin-RevId: 350035725
  • Loading branch information
manugarg authored Jan 6, 2021
1 parent f16a0a0 commit e759f8c
Show file tree
Hide file tree
Showing 4 changed files with 251 additions and 39 deletions.
31 changes: 18 additions & 13 deletions probes/ping/icmpconn.go → probes/ping/icmpconn_nonunix.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2017 Google Inc.
// Copyright 2020 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -12,37 +12,42 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// +build !aix,!darwin,!dragonfly,!freebsd,!linux,!netbsd,!openbsd,!solaris

package ping

import (
"net"
"strconv"
"time"

"golang.org/x/net/icmp"
)

// icmpConn is an interface wrapper for *icmp.PacketConn to allow testing.
type icmpConn interface {
read(buf []byte) (n int, peer net.Addr, err error)
write(buf []byte, peer net.Addr) (int, error)
setReadDeadline(deadline time.Time)
close()
}

// icmpPacketConn is the non-Unix icmpConn implementation. It wraps the
// *icmp.PacketConn returned by icmp.ListenPacket in newICMPConn.
type icmpPacketConn struct {
// c is the underlying ICMP packet connection that reads are delegated to.
c *icmp.PacketConn
}

func newICMPConn(proto, source string) (icmpConn, error) {
c, err := icmp.ListenPacket(proto, source)
func newICMPConn(sourceIP net.IP, ipVer int, datagramSocket bool) (icmpConn, error) {
network := map[int]string{
4: "ip4:icmp",
6: "ip6:ipv6-icmp",
}[ipVer]

if datagramSocket {
network = "udp" + strconv.Itoa(ipVer)
}

c, err := icmp.ListenPacket(network, sourceIP.String())
if err != nil {
return nil, err
}
return &icmpPacketConn{c}, nil
}

func (ipc *icmpPacketConn) read(buf []byte) (int, net.Addr, error) {
return ipc.c.ReadFrom(buf)
func (ipc *icmpPacketConn) read(buf []byte) (int, net.Addr, time.Time, error) {
n, addr, err := ipc.c.ReadFrom(buf)
return n, addr, time.Now(), err
}

func (ipc *icmpPacketConn) write(buf []byte, peer net.Addr) (int, error) {
Expand Down
209 changes: 209 additions & 0 deletions probes/ping/icmpconn_unix.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
// Copyright 2020 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// +build aix darwin dragonfly freebsd linux netbsd openbsd solaris

package ping

import (
"bytes"
"encoding/binary"
"net"
"os"
"runtime"
"syscall"
"time"
"unsafe"
)

// NativeEndian is the machine native endian implementation of ByteOrder.
// It is assigned once by this file's init() and is used by
// icmpPacketConn.read to decode the timestamp control message.
var NativeEndian binary.ByteOrder

const (
// protocolIP is passed as the "level" argument of setsockopt for the
// darwin-only IP_STRIPHDR option set in listenPacket.
protocolIP = 0
)

// sockaddr converts sourceIP into the syscall.Sockaddr used to bind a socket
// of the given IP version (4 or 6).
//
// A nil or empty sourceIP means "unspecified": the returned sockaddr carries
// the all-zero address, i.e. the socket is bound to all local addresses.
//
// net.IP values holding an IPv4 address are frequently 16 bytes long (that is
// what net.ParseIP returns), with the actual address in the last 4 bytes.
// The address is therefore normalized with To4()/To16() before being copied;
// copying the raw slice into the 4-byte array would pick up the leading zero
// bytes and silently bind to 0.0.0.0.
//
// It returns a net.InvalidAddrError if ipVer is neither 4 nor 6, or if
// sourceIP cannot be represented in the requested family.
func sockaddr(sourceIP net.IP, ipVer int) (syscall.Sockaddr, error) {
	unspecified := len(sourceIP) == 0

	switch ipVer {
	case 4:
		sa := &syscall.SockaddrInet4{}
		if unspecified {
			// Zero-valued Addr is 0.0.0.0.
			return sa, nil
		}
		ip4 := sourceIP.To4()
		if ip4 == nil {
			return nil, net.InvalidAddrError("source IP is not an IPv4 address")
		}
		copy(sa.Addr[:], ip4)
		return sa, nil
	case 6:
		sa := &syscall.SockaddrInet6{}
		if unspecified {
			// Zero-valued Addr is "::".
			return sa, nil
		}
		ip6 := sourceIP.To16()
		if ip6 == nil {
			return nil, net.InvalidAddrError("source IP is not an IPv6 address")
		}
		copy(sa.Addr[:], ip6)
		return sa, nil
	default:
		return nil, net.InvalidAddrError("unexpected family")
	}
}

// listenPacket listens for incoming ICMP packets addressed to sourceIP.
// We need to write our own listenPacket instead of using "net.ListenPacket"
// for the following reasons:
//  1. ListenPacket doesn't support ICMP for SOCK_DGRAM sockets. You create
//     datagram sockets by specifying network as "udp", but UDP new connection
//     implementation ignores the protocol field entirely.
//  2. ListenPacket doesn't support setting socket options (we need
//     SO_TIMESTAMP) in a straightforward way.
//
// ipVer selects the address family (4 or 6); any other value is rejected
// before a socket is created. datagramSocket selects SOCK_DGRAM instead of
// SOCK_RAW. A nil sourceIP binds to the unspecified address.
//
// On success the returned icmpPacketConn owns the socket; on failure no file
// descriptor is left open.
func listenPacket(sourceIP net.IP, ipVer int, datagramSocket bool) (*icmpPacketConn, error) {
	var family, proto int

	switch ipVer {
	case 4:
		family, proto = syscall.AF_INET, protocolICMP
	case 6:
		family, proto = syscall.AF_INET6, protocolIPv6ICMP
	default:
		// Don't ask the kernel for a socket of family 0.
		return nil, net.InvalidAddrError("unexpected family")
	}

	sockType := syscall.SOCK_RAW
	if datagramSocket {
		sockType = syscall.SOCK_DGRAM
	}

	s, err := syscall.Socket(family, sockType, proto)
	if err != nil {
		return nil, os.NewSyscallError("socket", err)
	}

	if runtime.GOOS == "darwin" && family == syscall.AF_INET {
		// 0x17 = IP_STRIPHDR -- this is required for darwin IPv4.
		if err := syscall.SetsockoptInt(s, protocolIP, 0x17, 1); err != nil {
			syscall.Close(s)
			return nil, os.NewSyscallError("setsockopt", err)
		}
	}

	// Set socket option to receive kernel's timestamp from each packet.
	// Ref: https://man7.org/linux/man-pages/man7/socket.7.html (SO_TIMESTAMP)
	if err := syscall.SetsockoptInt(s, syscall.SOL_SOCKET, syscall.SO_TIMESTAMP, 1); err != nil {
		syscall.Close(s)
		return nil, os.NewSyscallError("setsockopt", err)
	}

	sa, err := sockaddr(sourceIP, ipVer)
	if err != nil {
		syscall.Close(s)
		return nil, err
	}
	if err := syscall.Bind(s, sa); err != nil {
		syscall.Close(s)
		return nil, os.NewSyscallError("bind", err)
	}

	// FilePacketConn is how you get a PacketConn from a socket descriptor.
	// Behind the scenes, FilePacketConn creates either an IPConn or UDPConn,
	// based on socket's local address (it gets that from the fd).
	//
	// FilePacketConn returns a copy of the connection, so f has to be closed
	// regardless of the outcome. f.Close() closes the descriptor s itself;
	// s must not be closed again after this point, otherwise we could close
	// an unrelated descriptor that happened to reuse the same number.
	f := os.NewFile(uintptr(s), "icmp")
	c, cerr := net.FilePacketConn(f)
	f.Close()

	if cerr != nil {
		return nil, cerr
	}

	// Exactly one of these assertions is expected to succeed; read() uses
	// whichever is non-nil to get at the out-of-band (timestamp) data.
	ipc := &icmpPacketConn{c: c}
	ipc.ipConn, _ = c.(*net.IPConn)
	ipc.udpConn, _ = c.(*net.UDPConn)

	return ipc, nil
}

// icmpPacketConn is the Unix implementation of the ICMP connection used by
// the ping probe. It is created by listenPacket from a socket that has
// SO_TIMESTAMP enabled.
type icmpPacketConn struct {
// c is the underlying connection; write, close and setReadDeadline are
// delegated to it.
c net.PacketConn

// We use ipConn and udpConn for reading OOB data from the connection.
// Both are obtained in listenPacket by type-asserting c, so the one whose
// assertion fails stays nil.
ipConn *net.IPConn
udpConn *net.UDPConn
}

// read reads a single packet into buf and returns the number of bytes read,
// the sender's address and the time at which the kernel received the packet.
//
// recvTime is taken from the SCM_TIMESTAMP control message that the kernel
// attaches to each packet when SO_TIMESTAMP is enabled on the socket. If no
// usable timestamp is found, recvTime is left as the zero time.Time so that
// the caller can detect that (recvTime.IsZero()) and fall back to its own
// clock.
//
// A non-nil error is returned if the read itself fails or if the control
// data cannot be parsed.
func (ipc *icmpPacketConn) read(buf []byte) (n int, addr net.Addr, recvTime time.Time, err error) {
	// We need to convert to IPConn/UDPConn so that we can read out-of-band data
	// using ReadMsg<IP,UDP> functions. PacketConn interface doesn't have method
	// that exposes OOB data.
	oob := make([]byte, 64)
	var oobn int
	switch {
	case ipc.ipConn != nil:
		n, oobn, _, addr, err = ipc.ipConn.ReadMsgIP(buf, oob)
	case ipc.udpConn != nil:
		n, oobn, _, addr, err = ipc.udpConn.ReadMsgUDP(buf, oob)
	default:
		// Neither typed connection is available: still read the packet,
		// just without OOB data, instead of returning an empty "success".
		n, addr, err = ipc.c.ReadFrom(buf)
	}
	if err != nil {
		return n, addr, recvTime, err
	}

	var cmsgs []syscall.SocketControlMessage
	cmsgs, err = syscall.ParseSocketControlMessage(oob[:oobn])
	if err != nil {
		return n, addr, recvTime, err
	}

	for _, m := range cmsgs {
		// We are interested only in socket-level control messages
		// (syscall.SOL_SOCKET)
		if m.Header.Level != syscall.SOL_SOCKET {
			continue
		}

		// SCM_TIMESTAMP is the type of the timestamp control message.
		// Note that syscall.SO_TIMESTAMP == syscall.SCM_TIMESTAMP for linux, but
		// that doesn't have to be true for other operating systems, e.g. Mac OS X.
		if m.Header.Type != syscall.SCM_TIMESTAMP {
			continue
		}

		var tv syscall.Timeval
		if rerr := binary.Read(bytes.NewReader(m.Data), NativeEndian, &tv); rerr != nil {
			// Truncated or malformed timestamp. Converting the zero Timeval
			// would yield the Unix epoch, which is not a zero time.Time and
			// would defeat the caller's fallback, so skip it instead.
			continue
		}
		recvTime = time.Unix(tv.Unix()) // syscall.Timeval -> time.Time
	}

	return n, addr, recvTime, nil
}

// write sends buf to dst over the underlying packet connection and reports
// the number of bytes written along with any error from the connection.
func (ipc *icmpPacketConn) write(buf []byte, dst net.Addr) (int, error) {
	written, err := ipc.c.WriteTo(buf, dst)
	return written, err
}

// close shuts down the underlying packet connection. The error from Close is
// discarded because this method has no return value.
func (ipc *icmpPacketConn) close() {
	_ = ipc.c.Close()
}

// setReadDeadline applies t as the read deadline of the underlying packet
// connection. The error from SetReadDeadline is discarded because this method
// has no return value.
func (ipc *icmpPacketConn) setReadDeadline(t time.Time) {
	_ = ipc.c.SetReadDeadline(t)
}

// newICMPConn is the Unix constructor for the probe's ICMP connection. It
// simply delegates to listenPacket, which creates the socket with kernel
// timestamping (SO_TIMESTAMP) enabled.
func newICMPConn(sourceIP net.IP, ipVer int, datagramSocket bool) (*icmpPacketConn, error) {
	conn, err := listenPacket(sourceIP, ipVer, datagramSocket)
	return conn, err
}

// init determines the machine's native byte order when this package is
// loaded and stores it in NativeEndian.
//
// It stores the value 1 in a uint32 and inspects the byte at the lowest
// address: if that byte is 1 the machine is little-endian, otherwise it is
// big-endian.
//
// This code is based on:
// https://github.com/golang/net/blob/master/internal/socket/sys.go
func init() {
	var probe uint32 = 1
	lowest := *(*byte)(unsafe.Pointer(&probe))
	if lowest != 1 {
		NativeEndian = binary.BigEndian
		return
	}
	NativeEndian = binary.LittleEndian
}
42 changes: 20 additions & 22 deletions probes/ping/ping.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2017-2019 Google Inc.
// Copyright 2017-2020 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -72,6 +72,14 @@ type result struct {
validationFailure *metrics.Map
}

// icmpConn abstracts the ICMP connection used by the probe so that tests can
// substitute a fake connection (see testICMPConn in ping_test.go). The
// platform-specific implementations live in the icmpconn_*.go files.
type icmpConn interface {
// read reads one packet into buf. recvTime is when the packet was received:
// on Unix it comes from the socket (SO_TIMESTAMP), elsewhere it is taken
// right after the read returns. recvPackets treats a zero recvTime as
// "unknown" and substitutes the current time.
read(buf []byte) (n int, peer net.Addr, recvTime time.Time, err error)
// write sends buf to peer and returns the number of bytes written.
write(buf []byte, peer net.Addr) (int, error)
// setReadDeadline sets the deadline for subsequent read calls.
setReadDeadline(deadline time.Time)
// close closes the connection.
close()
}

// Probe implements a ping probe type that sends ICMP ping packets to the targets and reports
// back statistics on packets sent, received and the rtt.
type Probe struct {
Expand All @@ -81,7 +89,6 @@ type Probe struct {
l *logger.Logger

// book-keeping params
source string
ipVer int
targets []endpoint.Endpoint
results map[string]*result
Expand Down Expand Up @@ -143,10 +150,6 @@ func (p *Probe) initInternal() error {
p.target2addr = make(map[string]net.Addr)
p.useDatagramSocket = p.c.GetUseDatagramSocket()

if p.opts.SourceIP != nil {
p.source = p.opts.SourceIP.String()
}

// Update targets run periodically as well.
p.updateTargets()

Expand Down Expand Up @@ -182,19 +185,8 @@ func (p *Probe) configureIntegrityCheck() error {
}

func (p *Probe) listen() error {
netProto := "ip4:icmp"
if p.ipVer == 6 {
netProto = "ip6:ipv6-icmp"
}

if p.useDatagramSocket {
// udp network represents datagram ICMP sockets. The name is a bit
// misleading, but that's what Go's icmp package uses.
netProto = fmt.Sprintf("udp%d", p.ipVer)
}

var err error
p.conn, err = newICMPConn(netProto, p.source)
p.conn, err = newICMPConn(p.opts.SourceIP, p.ipVer, p.useDatagramSocket)
return err
}

Expand Down Expand Up @@ -324,7 +316,7 @@ func (p *Probe) recvPackets(runID uint16, tracker chan bool) {
}

// Read packet from the socket
pktLen, peer, err := p.conn.read(pktbuf)
pktLen, peer, recvTime, err := p.conn.read(pktbuf)

if err != nil {
p.l.Warning(err.Error())
Expand All @@ -339,8 +331,14 @@ func (p *Probe) recvPackets(runID uint16, tracker chan bool) {
continue
}

// Record fetch time before further processing.
fetchTime := time.Now()
// recvTime should never be zero:
// -- On Unix systems, recvTime comes from the sockets.
// -- On Non-Unix systems, read() call returns recvTime based on when
// packet was received by cloudprober.
if recvTime.IsZero() {
p.l.Info("didn't get fetch time from the connection (SO_TIMESTAMP), using current time")
recvTime = time.Now()
}

var ip net.IP
if p.useDatagramSocket {
Expand All @@ -360,7 +358,7 @@ func (p *Probe) recvPackets(runID uint16, tracker chan bool) {
}

var pkt = rcvdPkt{
tsUnix: fetchTime.UnixNano(),
tsUnix: recvTime.UnixNano(),
target: target,
// ICMP packet body starts from the 5th byte
id: binary.BigEndian.Uint16(pktbuf[4:6]),
Expand Down
8 changes: 4 additions & 4 deletions probes/ping/ping_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2017-2019 Google Inc.
// Copyright 2017-2020 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -92,7 +92,7 @@ func (tic *testICMPConn) setFlipLastByte() {
tic.flipLastByte = true
}

func (tic *testICMPConn) read(buf []byte) (int, net.Addr, error) {
func (tic *testICMPConn) read(buf []byte) (int, net.Addr, time.Time, error) {
// We create per-target select cases, with each target's select-case
// pointing to that target's sentPackets channel.
var cases []reflect.SelectCase
Expand All @@ -105,7 +105,7 @@ func (tic *testICMPConn) read(buf []byte) (int, net.Addr, error) {
// Select over the select cases.
chosen, value, ok := reflect.Select(cases)
if !ok {
return 0, nil, fmt.Errorf("nothing to read")
return 0, nil, time.Now(), fmt.Errorf("nothing to read")
}

pkt := value.Bytes()
Expand All @@ -128,7 +128,7 @@ func (tic *testICMPConn) read(buf []byte) (int, net.Addr, error) {
if tic.c.GetUseDatagramSocket() {
peer = &net.UDPAddr{IP: peerIP}
}
return len(pkt), peer, nil
return len(pkt), peer, time.Now(), nil
}

// write simply queues packets into the sentPackets channel. These packets are
Expand Down

0 comments on commit e759f8c

Please sign in to comment.