Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
pveentjer committed Feb 11, 2024
1 parent 438e2c9 commit 897840a
Show file tree
Hide file tree
Showing 9 changed files with 162 additions and 10 deletions.
1 change: 1 addition & 0 deletions aeron-client/src/main/c/uri/aeron_uri.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ aeron_uri_params_t;
#define AERON_URI_ATS_KEY "ats"
#define AERON_URI_SOCKET_SNDBUF_KEY "so-sndbuf"
#define AERON_URI_SOCKET_RCVBUF_KEY "so-rcvbuf"
#define AERON_URI_SOCKET_TOS_KEY "so-tos"
#define AERON_URI_RECEIVER_WINDOW_KEY "rcv-wnd"
#define AERON_URI_MEDIA_RCV_TIMESTAMP_OFFSET_KEY "media-rcv-ts-offset"
#define AERON_URI_CHANNEL_RCV_TIMESTAMP_OFFSET_KEY "channel-rcv-ts-offset"
Expand Down
5 changes: 5 additions & 0 deletions aeron-client/src/main/java/io/aeron/CommonContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,11 @@ public static InferableBoolean parse(final String value)
*/
public static final String SOCKET_RCVBUF_PARAM_NAME = "so-rcvbuf";

/**
* Parameter name for the underlying OS socket type of service (IP_TOS).
*/
public static final String SOCKET_TOS_PARAM_NAME = "so-tos";

/**
* Parameter name for the congestion control's initial receiver window length.
*/
Expand Down
19 changes: 19 additions & 0 deletions aeron-driver/src/main/java/io/aeron/driver/Configuration.java
Original file line number Diff line number Diff line change
Expand Up @@ -275,11 +275,19 @@ public final class Configuration
*/
public static final String SOCKET_MULTICAST_TTL_PROP_NAME = "aeron.socket.multicast.ttl";

/**
* Property name for IP_TOS setting on UDP sockets.
*/
public static final String SOCKET_TOS_PROP_NAME = "aeron.socket.tos";

/**
* Multicast TTL value, 0 means use OS default.
*/
public static final int SOCKET_MULTICAST_TTL_DEFAULT = 0;

// IP_TOS value, -1 means use OS defaults.
public static final int SOCKET_TOS_DEFAULT = -1;

/**
* Property name for linger timeout after draining on {@link Publication}s so they can respond to NAKs.
*/
Expand Down Expand Up @@ -1274,6 +1282,17 @@ public static int socketMulticastTtl()
return getInteger(SOCKET_MULTICAST_TTL_PROP_NAME, SOCKET_MULTICAST_TTL_DEFAULT);
}

/**
* IP_TOS setting to UDP sockets.
*
* @return IP_TOS setting on UDP sockets.
* @see #SOCKET_TOS_PROP_NAME
*/
public static int socketToS()
{
return getInteger(SOCKET_TOS_PROP_NAME, SOCKET_TOS_DEFAULT);
}

/**
* Page size in bytes to align all files to. The file system must support the requested size.
*
Expand Down
26 changes: 26 additions & 0 deletions aeron-driver/src/main/java/io/aeron/driver/MediaDriver.java
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,7 @@ public static final class Context extends CommonContext
private int socketSndbufLength = Configuration.socketSndbufLength();
private int socketRcvbufLength = Configuration.socketRcvbufLength();
private int socketMulticastTtl = Configuration.socketMulticastTtl();
private int socketToS = Configuration.socketToS();
private int mtuLength = Configuration.mtuLength();
private int ipcMtuLength = Configuration.ipcMtuLength();
private int filePageSize = Configuration.filePageSize();
Expand Down Expand Up @@ -1788,6 +1789,30 @@ public Context socketRcvbufLength(final int socketRcvbufLength)
return this;
}

/**
* The socket ToS which is the OS IP_TOS.
*
* @return the socket ToS.
* @see Configuration#SOCKET_TOS_PROP_NAME
*/
public int socketToS()
{
return socketToS;
}

/**
* The socket ToS which is the OS IP_TOS.
*
* @param socketToS which is the OS IP_TOS.
* @return this for a fluent API.
* @see Configuration#SOCKET_TOS_PROP_NAME
*/
public Context socketToS(final int socketToS)
{
this.socketToS = socketToS;
return this;
}

/**
* The TTL value to be used for multicast sockets.
*
Expand Down Expand Up @@ -4040,6 +4065,7 @@ public String toString()
"\n socketSndbufLength=" + socketSndbufLength +
"\n socketRcvbufLength=" + socketRcvbufLength +
"\n socketMulticastTtl=" + socketMulticastTtl +
"\n socketToS=" + socketToS +
"\n mtuLength=" + mtuLength +
"\n ipcMtuLength=" + ipcMtuLength +
"\n filePageSize=" + filePageSize +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ abstract class ReceiveDestinationTransportLhsPadding extends UdpChannelTransport
final InetSocketAddress connectAddress,
final MediaDriver.Context context,
final int socketRcvbufLength,
final int socketSndbufLength)
final int socketSndbufLength, final int socketToS)
{
super(
udpChannel, endPointAddress, bindAddress, connectAddress, context.receiverPortManager(),
context, socketRcvbufLength, socketSndbufLength);
context, socketRcvbufLength, socketSndbufLength, socketToS);
}
}

Expand All @@ -58,10 +58,13 @@ abstract class ReceiveDestinationTransportHotFields extends ReceiveDestinationTr
final InetSocketAddress connectAddress,
final MediaDriver.Context context,
final int socketRcvbufLength,
final int socketSndbufLength)
final int socketSndbufLength,
final int socketToS)
{
super(
udpChannel, endPointAddress, bindAddress, connectAddress, context, socketRcvbufLength, socketSndbufLength);
udpChannel, endPointAddress, bindAddress, connectAddress, context, socketRcvbufLength,
socketSndbufLength, socketToS);

}
}

Expand All @@ -79,10 +82,12 @@ abstract class ReceiveDestinationTransportRhsPadding extends ReceiveDestinationT
final InetSocketAddress connectAddress,
final MediaDriver.Context context,
final int socketRcvbufLength,
final int socketSndbufLength)
final int socketSndbufLength,
final int socketToS)
{
super(
udpChannel, endPointAddress, bindAddress, connectAddress, context, socketRcvbufLength, socketSndbufLength);
udpChannel, endPointAddress, bindAddress, connectAddress, context, socketRcvbufLength,
socketSndbufLength, socketToS);
}
}

Expand Down Expand Up @@ -115,7 +120,8 @@ public ReceiveDestinationTransport(
null,
context,
receiveChannelEndpoint.socketRcvbufLength(),
receiveChannelEndpoint.socketSndbufLength());
receiveChannelEndpoint.socketSndbufLength(),
receiveChannelEndpoint.socketToS());

this.timeOfLastActivityNs = context.receiverCachedNanoClock().nanoTime();
this.currentControlAddress = udpChannel.hasExplicitControl() ? udpChannel.localControl() : null;
Expand Down
62 changes: 61 additions & 1 deletion aeron-driver/src/main/java/io/aeron/driver/media/UdpChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.atomic.AtomicInteger;

import static io.aeron.CommonContext.*;
import static io.aeron.driver.Configuration.SOCKET_TOS_DEFAULT;
import static io.aeron.driver.media.NetworkUtil.*;
import static java.lang.System.lineSeparator;
import static java.net.InetAddress.getByAddress;
Expand Down Expand Up @@ -58,6 +59,7 @@ public final class UdpChannel
private final int multicastTtl;
private final int socketRcvbufLength;
private final int socketSndbufLength;
private final int socketToS;
private final int receiverWindowLength;
private final long tag;
private final InetSocketAddress remoteData;
Expand Down Expand Up @@ -94,6 +96,7 @@ private UdpChannel(final Context context)
channelUri = context.channelUri;
socketRcvbufLength = context.socketRcvbufLength;
socketSndbufLength = context.socketSndbufLength;
socketToS = context.socketToS;
receiverWindowLength = context.receiverWindowLength;
channelReceiveTimestampOffset = context.channelReceiveTimestampOffset;
channelSendTimestampOffset = context.channelSendTimestampOffset;
Expand Down Expand Up @@ -162,6 +165,8 @@ public static UdpChannel parse(
final boolean hasNoDistinguishingCharacteristic =
null == endpointAddress && null == controlAddress && null == tagIdStr;

final int socketToS = parseSocketToS(channelUri);

if (ControlMode.DYNAMIC == controlMode && null == controlAddress)
{
throw new IllegalArgumentException(
Expand All @@ -186,6 +191,7 @@ public static UdpChannel parse(
throw new UnknownHostException("could not resolve control address: " + controlAddress);
}


boolean hasExplicitEndpoint = true;
if (null == endpointAddress)
{
Expand All @@ -206,7 +212,8 @@ public static UdpChannel parse(
.hasNoDistinguishingCharacteristic(hasNoDistinguishingCharacteristic)
.socketRcvbufLength(socketRcvbufLength)
.socketSndbufLength(socketSndbufLength)
.receiverWindowLength(receiverWindowLength);
.receiverWindowLength(receiverWindowLength)
.socketToS(socketToS);

if (null != tagIdStr)
{
Expand Down Expand Up @@ -367,6 +374,31 @@ public static int parseBufferLength(final ChannelUri channelUri, final String pa
return socketBufferLength;
}

/**
* Parses the IP_TOS for a given URI paramName. The range of the IP_TOS is 0 <= x <= 255.
* <p/>
* If the parameter isn't set, <code>SOCKET_TOS_DEFAULT</code> is returned.
*
* @param channelUri to get the value from.
* @return the parsed IP_TOS.
*/
public static int parseSocketToS(final ChannelUri channelUri)
{
int tos = SOCKET_TOS_DEFAULT;

final String paramValue = channelUri.get(SOCKET_TOS_PARAM_NAME);
if (null != paramValue)
{
tos = Integer.parseInt(paramValue);
if (tos < 0 || tos > 255)
{
throw new IllegalArgumentException("Invalid " + SOCKET_TOS_PARAM_NAME + " value: " + paramValue);
}
}

return tos;
}

/**
* Parse the control mode from the channel URI. If the value is null or unknown then {@link ControlMode#NONE} will
* be used.
Expand Down Expand Up @@ -709,6 +741,27 @@ public int socketSndbufLengthOrDefault(final int defaultValue)
return 0 != socketSndbufLength ? socketSndbufLength : defaultValue;
}

/**
* Gets the socket type of service (IP_TOS).
*
* @return socket type of service or -1 if not specified.
*/
public int socketToS()
{
return socketToS;
}

/**
* Get the socket type of service (IP_TOS).
*
* @param defaultValue to be used if the UdpChannel's value is 0 (unspecified).
* @return socket type of service or defaultValue if not specified.
*/
public int socketToSOrDefault(final int defaultValue)
{
return SOCKET_TOS_DEFAULT != socketToS ? socketToS : defaultValue;
}

/**
* Get the receiver window length used as the initial window length for congestion control.
*
Expand Down Expand Up @@ -1070,6 +1123,7 @@ static final class Context
boolean hasNoDistinguishingCharacteristic = false;
int socketRcvbufLength = 0;
int socketSndbufLength = 0;
int socketToS;
int receiverWindowLength = 0;
int multicastTtl;
long tagId;
Expand Down Expand Up @@ -1209,6 +1263,12 @@ Context socketSndbufLength(final int socketSndbufLength)
return this;
}

Context socketToS(final int socketToS)
{
this.socketToS = socketToS;
return this;
}

Context receiverWindowLength(final int receiverWindowLength)
{
this.receiverWindowLength = receiverWindowLength;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;

import static io.aeron.driver.Configuration.SOCKET_TOS_DEFAULT;
import static io.aeron.logbuffer.FrameDescriptor.frameVersion;
import static java.net.StandardSocketOptions.SO_RCVBUF;
import static java.net.StandardSocketOptions.SO_SNDBUF;
Expand Down Expand Up @@ -73,6 +74,8 @@ public abstract class UdpChannelTransport implements AutoCloseable
*/
protected InetSocketAddress connectAddress;



/**
* To be used when polling the transport.
*/
Expand All @@ -91,6 +94,7 @@ public abstract class UdpChannelTransport implements AutoCloseable
private int multicastTtl = 0;
private final int socketSndbufLength;
private final int socketRcvbufLength;
private final int socketToS;

/**
* Construct transport for a given channel.
Expand All @@ -103,6 +107,7 @@ public abstract class UdpChannelTransport implements AutoCloseable
* @param portManager for port binding.
* @param socketRcvbufLength set SO_RCVBUF for socket, 0 for OS default.
* @param socketSndbufLength set SO_SNDBUF for socket, 0 for OS default.
* @param socketToS set IP_TOS for socket, -1 for OS default.
*/
protected UdpChannelTransport(
final UdpChannel udpChannel,
Expand All @@ -112,7 +117,8 @@ protected UdpChannelTransport(
final PortManager portManager,
final MediaDriver.Context context,
final int socketRcvbufLength,
final int socketSndbufLength)
final int socketSndbufLength,
final int socketToS)
{
this.context = context;
this.udpChannel = udpChannel;
Expand All @@ -124,6 +130,7 @@ protected UdpChannelTransport(
this.invalidPackets = context.systemCounters().get(SystemCounterDescriptor.INVALID_PACKETS);
this.socketRcvbufLength = socketRcvbufLength;
this.socketSndbufLength = socketSndbufLength;
this.socketToS = socketToS;
}

/**
Expand Down Expand Up @@ -152,7 +159,8 @@ protected UdpChannelTransport(
portManager,
context,
udpChannel.socketRcvbufLengthOrDefault(context.socketRcvbufLength()),
udpChannel.socketSndbufLengthOrDefault(context.socketSndbufLength()));
udpChannel.socketSndbufLengthOrDefault(context.socketSndbufLength()),
udpChannel.socketToSOrDefault(context.socketToS()));
}

/**
Expand Down Expand Up @@ -225,6 +233,11 @@ else if (context.socketMulticastTtl() != 0)
receiveDatagramChannel.setOption(SO_RCVBUF, socketRcvbufLength());
}

if (SOCKET_TOS_DEFAULT != socketToS)
{
receiveDatagramChannel.socket().setTrafficClass(socketToS);
}

sendDatagramChannel.configureBlocking(false);
receiveDatagramChannel.configureBlocking(false);
}
Expand Down Expand Up @@ -500,4 +513,15 @@ public int socketRcvbufLength()
{
return socketRcvbufLength;
}


/**
* Gets the configured OS type of service (IP_TOS) for the endpoint's socket.
*
* @return OS socket type of service or -1 if using OS default.
*/
public int socketToS()
{
return socketToS;
}
}
10 changes: 10 additions & 0 deletions aeron-driver/src/test/java/io/aeron/driver/UdpChannelTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,16 @@ void shouldParseReceiverWindow()
assertEquals(8192, udpChannelWithBufferSizes.receiverWindowLength());
}

@Test
void shouldParseToS()
{
final UdpChannel udpChannelWithToS = UdpChannel.parse("aeron:udp?endpoint=127.0.0.1:9999|so-tos=10");
assertEquals(10, udpChannelWithToS.socketToS());

final UdpChannel udpChannelWithoutToS = UdpChannel.parse("aeron:udp?endpoint=127.0.0.1:9999");
assertEquals(-1, udpChannelWithoutToS.socketToS());
}

@Test
void shouldParseChannelSendAndReceiveTimestampOffsets()
{
Expand Down
Loading

0 comments on commit 897840a

Please sign in to comment.