Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Add support for IP_TOS (Type Of Service) #1555

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions aeron-client/src/main/c/uri/aeron_uri.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ aeron_uri_params_t;
#define AERON_URI_ATS_KEY "ats"
#define AERON_URI_SOCKET_SNDBUF_KEY "so-sndbuf"
#define AERON_URI_SOCKET_RCVBUF_KEY "so-rcvbuf"
#define AERON_URI_SOCKET_TOS_KEY "so-tos"
#define AERON_URI_RECEIVER_WINDOW_KEY "rcv-wnd"
#define AERON_URI_MEDIA_RCV_TIMESTAMP_OFFSET_KEY "media-rcv-ts-offset"
#define AERON_URI_CHANNEL_RCV_TIMESTAMP_OFFSET_KEY "channel-rcv-ts-offset"
Expand Down
5 changes: 5 additions & 0 deletions aeron-client/src/main/java/io/aeron/CommonContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,11 @@ public static InferableBoolean parse(final String value)
*/
public static final String SOCKET_RCVBUF_PARAM_NAME = "so-rcvbuf";

/**
* Parameter name for the underlying OS socket type of service (IP_TOS).
*/
public static final String SOCKET_TOS_PARAM_NAME = "so-tos";

/**
* Parameter name for the congestion control's initial receiver window length.
*/
Expand Down
21 changes: 21 additions & 0 deletions aeron-driver/src/main/java/io/aeron/driver/Configuration.java
Original file line number Diff line number Diff line change
Expand Up @@ -275,11 +275,21 @@ public final class Configuration
*/
public static final String SOCKET_MULTICAST_TTL_PROP_NAME = "aeron.socket.multicast.ttl";

/**
* Property name for IP_TOS setting on UDP sockets.
*/
public static final String SOCKET_TOS_PROP_NAME = "aeron.socket.tos";

/**
* Multicast TTL value, 0 means use OS default.
*/
public static final int SOCKET_MULTICAST_TTL_DEFAULT = 0;

/**
* IP_TOS value, -1 means use OS defaults.
*/
public static final int SOCKET_TOS_DEFAULT = Aeron.NULL_VALUE;

/**
* Property name for linger timeout after draining on {@link Publication}s so they can respond to NAKs.
*/
Expand Down Expand Up @@ -1297,6 +1307,17 @@ public static int socketMulticastTtl()
return getInteger(SOCKET_MULTICAST_TTL_PROP_NAME, SOCKET_MULTICAST_TTL_DEFAULT);
}

/**
* IP_TOS setting to UDP sockets.
*
* @return IP_TOS setting on UDP sockets.
* @see #SOCKET_TOS_PROP_NAME
*/
public static int socketToS()
{
return getInteger(SOCKET_TOS_PROP_NAME, SOCKET_TOS_DEFAULT);
}

/**
* Page size in bytes to align all files to. The file system must support the requested size.
*
Expand Down
26 changes: 26 additions & 0 deletions aeron-driver/src/main/java/io/aeron/driver/MediaDriver.java
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,7 @@ public static final class Context extends CommonContext
private int socketSndbufLength = Configuration.socketSndbufLength();
private int socketRcvbufLength = Configuration.socketRcvbufLength();
private int socketMulticastTtl = Configuration.socketMulticastTtl();
private int socketToS = Configuration.socketToS();
private int mtuLength = Configuration.mtuLength();
private int ipcMtuLength = Configuration.ipcMtuLength();
private int filePageSize = Configuration.filePageSize();
Expand Down Expand Up @@ -1817,6 +1818,30 @@ public Context socketRcvbufLength(final int socketRcvbufLength)
return this;
}

/**
* The socket ToS which is the OS IP_TOS.
*
* @return the socket ToS.
* @see Configuration#SOCKET_TOS_PROP_NAME
*/
public int socketToS()
{
return socketToS;
}

/**
* The socket ToS which is the OS IP_TOS.
*
* @param socketToS which is the OS IP_TOS.
* @return this for a fluent API.
* @see Configuration#SOCKET_TOS_PROP_NAME
*/
public Context socketToS(final int socketToS)
{
this.socketToS = socketToS;
return this;
}

/**
* The TTL value to be used for multicast sockets.
*
Expand Down Expand Up @@ -4070,6 +4095,7 @@ public String toString()
"\n socketSndbufLength=" + socketSndbufLength +
"\n socketRcvbufLength=" + socketRcvbufLength +
"\n socketMulticastTtl=" + socketMulticastTtl +
"\n socketToS=" + socketToS +
"\n mtuLength=" + mtuLength +
"\n ipcMtuLength=" + ipcMtuLength +
"\n filePageSize=" + filePageSize +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,12 @@ abstract class ReceiveDestinationTransportLhsPadding extends UdpChannelTransport
final InetSocketAddress connectAddress,
final MediaDriver.Context context,
final int socketRcvbufLength,
final int socketSndbufLength)
final int socketSndbufLength,
final int socketToS)
{
super(
udpChannel, endPointAddress, bindAddress, connectAddress, context.receiverPortManager(),
context, socketRcvbufLength, socketSndbufLength);
context, socketRcvbufLength, socketSndbufLength, socketToS);
}
}

Expand All @@ -58,10 +59,13 @@ abstract class ReceiveDestinationTransportHotFields extends ReceiveDestinationTr
final InetSocketAddress connectAddress,
final MediaDriver.Context context,
final int socketRcvbufLength,
final int socketSndbufLength)
final int socketSndbufLength,
final int socketToS)
{
super(
udpChannel, endPointAddress, bindAddress, connectAddress, context, socketRcvbufLength, socketSndbufLength);
udpChannel, endPointAddress, bindAddress, connectAddress, context, socketRcvbufLength,
socketSndbufLength, socketToS);

}
}

Expand All @@ -79,10 +83,12 @@ abstract class ReceiveDestinationTransportRhsPadding extends ReceiveDestinationT
final InetSocketAddress connectAddress,
final MediaDriver.Context context,
final int socketRcvbufLength,
final int socketSndbufLength)
final int socketSndbufLength,
final int socketToS)
{
super(
udpChannel, endPointAddress, bindAddress, connectAddress, context, socketRcvbufLength, socketSndbufLength);
udpChannel, endPointAddress, bindAddress, connectAddress, context, socketRcvbufLength,
socketSndbufLength, socketToS);
}
}

Expand Down Expand Up @@ -115,7 +121,8 @@ public ReceiveDestinationTransport(
null,
context,
receiveChannelEndpoint.socketRcvbufLength(),
receiveChannelEndpoint.socketSndbufLength());
receiveChannelEndpoint.socketSndbufLength(),
receiveChannelEndpoint.socketToS());

this.timeOfLastActivityNs = context.receiverCachedNanoClock().nanoTime();
this.currentControlAddress = udpChannel.hasExplicitControl() ? udpChannel.localControl() : null;
Expand Down
59 changes: 59 additions & 0 deletions aeron-driver/src/main/java/io/aeron/driver/media/UdpChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.atomic.AtomicInteger;

import static io.aeron.CommonContext.*;
import static io.aeron.driver.Configuration.SOCKET_TOS_DEFAULT;
import static io.aeron.driver.media.NetworkUtil.*;
import static java.lang.System.lineSeparator;
import static java.net.InetAddress.getByAddress;
Expand Down Expand Up @@ -58,6 +59,7 @@ public final class UdpChannel
private final int multicastTtl;
private final int socketRcvbufLength;
private final int socketSndbufLength;
private final int socketToS;
private final int receiverWindowLength;
private final long tag;
private final InetSocketAddress remoteData;
Expand Down Expand Up @@ -95,6 +97,7 @@ private UdpChannel(final Context context)
channelUri = context.channelUri;
socketRcvbufLength = context.socketRcvbufLength;
socketSndbufLength = context.socketSndbufLength;
socketToS = context.socketToS;
receiverWindowLength = context.receiverWindowLength;
channelReceiveTimestampOffset = context.channelReceiveTimestampOffset;
channelSendTimestampOffset = context.channelSendTimestampOffset;
Expand Down Expand Up @@ -164,6 +167,8 @@ public static UdpChannel parse(
final boolean hasNoDistinguishingCharacteristic =
null == endpointAddress && null == controlAddress && null == tagIdStr;

final int socketToS = parseSocketToS(channelUri);

if (ControlMode.DYNAMIC == controlMode && null == controlAddress)
{
throw new IllegalArgumentException(
Expand Down Expand Up @@ -208,6 +213,7 @@ public static UdpChannel parse(
.hasNoDistinguishingCharacteristic(hasNoDistinguishingCharacteristic)
.socketRcvbufLength(socketRcvbufLength)
.socketSndbufLength(socketSndbufLength)
.socketToS(socketToS)
.receiverWindowLength(receiverWindowLength)
.nakDelayNs(parseOptionalDurationNs(channelUri, NAK_DELAY_PARAM_NAME));

Expand Down Expand Up @@ -370,6 +376,31 @@ public static int parseBufferLength(final ChannelUri channelUri, final String pa
return socketBufferLength;
}

/**
* Parses the IP_TOS for a given URI paramName. The range of the IP_TOS is 0 <= x <= 255.
* <p>
* If the parameter isn't set, <code>SOCKET_TOS_DEFAULT</code> is returned.
*
* @param channelUri to get the value from.
* @return the parsed IP_TOS.
*/
public static int parseSocketToS(final ChannelUri channelUri)
{
int tos = SOCKET_TOS_DEFAULT;

final String paramValue = channelUri.get(SOCKET_TOS_PARAM_NAME);
if (null != paramValue)
{
tos = Integer.parseInt(paramValue);
if (tos < 0 || tos > 255)
{
throw new IllegalArgumentException("Invalid " + SOCKET_TOS_PARAM_NAME + " value: " + paramValue);
}
}

return tos;
}

/**
* Parse the control mode from the channel URI. If the value is null or unknown then {@link ControlMode#NONE} will
* be used.
Expand Down Expand Up @@ -730,6 +761,27 @@ public int socketSndbufLengthOrDefault(final int defaultValue)
return 0 != socketSndbufLength ? socketSndbufLength : defaultValue;
}

/**
* Gets the socket type of service (IP_TOS).
*
* @return socket type of service or -1 if not specified.
*/
public int socketToS()
{
return socketToS;
}

/**
* Get the socket type of service (IP_TOS).
*
* @param defaultValue to be used if the UdpChannel's value is 0 (unspecified).
* @return socket type of service or defaultValue if not specified.
*/
public int socketToSOrDefault(final int defaultValue)
{
return SOCKET_TOS_DEFAULT != socketToS ? socketToS : defaultValue;
}

/**
* Get the receiver window length used as the initial window length for congestion control.
*
Expand Down Expand Up @@ -1101,6 +1153,7 @@ static final class Context
boolean hasNoDistinguishingCharacteristic = false;
int socketRcvbufLength = 0;
int socketSndbufLength = 0;
int socketToS;
int receiverWindowLength = 0;
int multicastTtl;
long tagId;
Expand Down Expand Up @@ -1241,6 +1294,12 @@ Context socketSndbufLength(final int socketSndbufLength)
return this;
}

Context socketToS(final int socketToS)
{
this.socketToS = socketToS;
return this;
}

Context receiverWindowLength(final int receiverWindowLength)
{
this.receiverWindowLength = receiverWindowLength;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;

import static io.aeron.driver.Configuration.SOCKET_TOS_DEFAULT;
import static io.aeron.logbuffer.FrameDescriptor.frameVersion;
import static java.net.StandardSocketOptions.SO_RCVBUF;
import static java.net.StandardSocketOptions.SO_SNDBUF;
Expand Down Expand Up @@ -73,6 +74,8 @@ public abstract class UdpChannelTransport implements AutoCloseable
*/
protected InetSocketAddress connectAddress;



/**
* To be used when polling the transport.
*/
Expand All @@ -91,6 +94,7 @@ public abstract class UdpChannelTransport implements AutoCloseable
private int multicastTtl = 0;
private final int socketSndbufLength;
private final int socketRcvbufLength;
private final int socketToS;

/**
* Construct transport for a given channel.
Expand All @@ -103,6 +107,7 @@ public abstract class UdpChannelTransport implements AutoCloseable
* @param portManager for port binding.
* @param socketRcvbufLength set SO_RCVBUF for socket, 0 for OS default.
* @param socketSndbufLength set SO_SNDBUF for socket, 0 for OS default.
* @param socketToS set IP_TOS for socket, -1 for OS default.
*/
protected UdpChannelTransport(
final UdpChannel udpChannel,
Expand All @@ -112,7 +117,8 @@ protected UdpChannelTransport(
final PortManager portManager,
final MediaDriver.Context context,
final int socketRcvbufLength,
final int socketSndbufLength)
final int socketSndbufLength,
final int socketToS)
{
this.context = context;
this.udpChannel = udpChannel;
Expand All @@ -124,6 +130,7 @@ protected UdpChannelTransport(
this.invalidPackets = context.systemCounters().get(SystemCounterDescriptor.INVALID_PACKETS);
this.socketRcvbufLength = socketRcvbufLength;
this.socketSndbufLength = socketSndbufLength;
this.socketToS = socketToS;
}

/**
Expand Down Expand Up @@ -152,7 +159,8 @@ protected UdpChannelTransport(
portManager,
context,
udpChannel.socketRcvbufLengthOrDefault(context.socketRcvbufLength()),
udpChannel.socketSndbufLengthOrDefault(context.socketSndbufLength()));
udpChannel.socketSndbufLengthOrDefault(context.socketSndbufLength()),
udpChannel.socketToSOrDefault(context.socketToS()));
}

/**
Expand Down Expand Up @@ -225,6 +233,11 @@ else if (context.socketMulticastTtl() != 0)
receiveDatagramChannel.setOption(SO_RCVBUF, socketRcvbufLength());
}

if (SOCKET_TOS_DEFAULT != socketToS)
{
receiveDatagramChannel.socket().setTrafficClass(socketToS);
}

sendDatagramChannel.configureBlocking(false);
receiveDatagramChannel.configureBlocking(false);
}
Expand Down Expand Up @@ -514,4 +527,15 @@ public int socketRcvbufLength()
{
return socketRcvbufLength;
}

/**
* Gets the configured OS type of service (IP_TOS) for the endpoint's socket.
*
* @return OS socket type of service or {@link io.aeron.driver.Configuration#SOCKET_TOS_DEFAULT}
* if using OS default.
*/
public int socketToS()
{
return socketToS;
}
}
Loading
Loading