Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Faster file transfer and less CPU consumption #55

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@
<artifactId>junit</artifactId>
<version>[4.12,)</version>
</dependency>
<dependency>
<groupId>net.openhft</groupId>
<artifactId>zero-allocation-hashing</artifactId>
<version>0.11</version>
</dependency>
</dependencies>

<build>
Expand Down
2 changes: 2 additions & 0 deletions yajsync-app/src/assembly/assembly.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
<includes>
<include>com.github.perlundq:yajsync-core</include>
<include>com.github.perlundq:yajsync-app</include>
<include>net.openhft:zero-allocation-hashing</include>
</includes>
<useProjectArtifact>true</useProjectArtifact>
<unpack>true</unpack>

</dependencySet>
</dependencySets>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import com.github.perlundq.yajsync.attr.SymlinkInfo;
import com.github.perlundq.yajsync.attr.User;
import com.github.perlundq.yajsync.internal.channels.ChannelException;
import com.github.perlundq.yajsync.internal.session.ChecksumHash;
import com.github.perlundq.yajsync.internal.session.FileAttributeManager;
import com.github.perlundq.yajsync.internal.session.FileAttributeManagerFactory;
import com.github.perlundq.yajsync.internal.session.SessionStatistics;
Expand Down Expand Up @@ -458,6 +459,18 @@ private List<Option> options()
}
}));

options.add(Option.newStringOption(Option.Policy.OPTIONAL, "checksum-choice", "c",
"which hash to use for checksum ( md5, xxhash )",
option -> {
String checksumName = (String) option.getValue();
try {
_clientBuilder.checksumHash( ChecksumHash.valueOf( checksumName ));
return ArgumentParser.Status.CONTINUE;
} catch (IllegalArgumentException e) {
throw new ArgumentParsingError(String.format(
"failed to set checksum hash to %s: %s",
checksumName, e));
}}));
return options;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -387,6 +389,7 @@ public Modules newAnonymous(InetAddress address)

public class SystemTest
{

private static class ReturnStatus
{
final int rc;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.logging.Logger;

import com.github.perlundq.yajsync.attr.FileInfo;
import com.github.perlundq.yajsync.internal.session.ChecksumHash;
import com.github.perlundq.yajsync.internal.session.ClientSessionConfig;
import com.github.perlundq.yajsync.internal.session.FilterMode;
import com.github.perlundq.yajsync.internal.session.Generator;
Expand Down Expand Up @@ -153,7 +154,7 @@ public Result call() throws RsyncException, InterruptedException
return Result.success();
}
Generator generator = new Generator.Builder(out,
cfg.checksumSeed()).
cfg.checksumSeed(), cfg.checksumHash() ).
charset(cfg.charset()).
fileSelection(fileSelection).
isDelete(_isDelete).
Expand Down Expand Up @@ -351,11 +352,11 @@ public FileListing list(Iterable<Path> srcPaths)
FileSelection fileSelection =
Util.defaultIfNull(_fileSelectionOrNull,
FileSelection.TRANSFER_DIRS);
byte[] seed = BitOps.toLittleEndianBuf((int) System.currentTimeMillis());
int seed = (int) System.currentTimeMillis();
Sender sender = new Sender.Builder(toSender.source(),
toReceiver.sink(),
srcPaths,
seed).
seed, _checksumHash).
isExitEarlyIfEmptyList(true).
charset(_charset).
isPreserveDevices(_isPreserveDevices).
Expand All @@ -365,7 +366,7 @@ public FileListing list(Iterable<Path> srcPaths)
isPreserveGroup(_isPreserveGroup).
isNumericIds(_isNumericIds).
fileSelection(fileSelection).build();
Generator generator = new Generator.Builder(toSender.sink(), seed).
Generator generator = new Generator.Builder(toSender.sink(), seed, _checksumHash).
charset(_charset).
fileSelection(fileSelection).
isDelete(_isDelete).
Expand Down Expand Up @@ -403,11 +404,11 @@ private Result localTransfer(Iterable<Path> srcPaths, Path dstPath)
FileSelection fileSelection =
Util.defaultIfNull(_fileSelectionOrNull,
FileSelection.EXACT);
byte[] seed = BitOps.toLittleEndianBuf((int) System.currentTimeMillis());
int seed = (int) System.currentTimeMillis();
Sender sender = new Sender.Builder(toSender.source(),
toReceiver.sink(),
srcPaths,
seed).
seed, _checksumHash).
isExitEarlyIfEmptyList(true).
charset(_charset).
isPreserveDevices(_isPreserveDevices).
Expand All @@ -417,7 +418,7 @@ private Result localTransfer(Iterable<Path> srcPaths, Path dstPath)
isPreserveGroup(_isPreserveGroup).
isNumericIds(_isNumericIds).
fileSelection(fileSelection).build();
Generator generator = new Generator.Builder(toSender.sink(), seed).
Generator generator = new Generator.Builder(toSender.sink(), seed, _checksumHash).
charset(_charset).
fileSelection(fileSelection).
isDelete(_isDelete).
Expand Down Expand Up @@ -487,8 +488,10 @@ public FileListing list(String moduleName,
ClientSessionConfig cfg = new ClientSessionConfig(_in,
_out,
_charset,
_checksumHash,
fileSelection == FileSelection.RECURSE,
_stderr);
_stderr
);
return new FileListing(cfg,
moduleName,
serverArgs,
Expand Down Expand Up @@ -523,6 +526,7 @@ public ModuleListing listModules()
ClientSessionConfig cfg = new ClientSessionConfig(_in,
_out,
_charset,
_checksumHash,
fileSelection == FileSelection.RECURSE,
_stderr);
return new ModuleListing(cfg, serverArgs);
Expand Down Expand Up @@ -585,6 +589,7 @@ public Result to(String moduleName, String dstPathName)
ClientSessionConfig cfg = new ClientSessionConfig(_in,
_out,
_charset,
_checksumHash,
fileSelection == FileSelection.RECURSE,
_stderr);
SessionStatus status = cfg.handshake(moduleName, serverArgs,
Expand All @@ -603,7 +608,7 @@ public Result to(String moduleName, String dstPathName)
Sender sender = Sender.Builder.newClient(_in,
_out,
_srcPaths,
cfg.checksumSeed()).
cfg.checksumSeed(), cfg.checksumHash()).
filterMode(_isDelete ? FilterMode.SEND
: FilterMode.NONE).
charset(_charset).
Expand Down Expand Up @@ -665,6 +670,7 @@ public Result to(Path dstPath)
ClientSessionConfig cfg = new ClientSessionConfig(_in,
_out,
_charset,
_checksumHash,
fileSelection == FileSelection.RECURSE,
_stderr);
SessionStatus status = cfg.handshake(_moduleName, serverArgs,
Expand All @@ -681,7 +687,7 @@ public Result to(Path dstPath)

try {
Generator generator = new Generator.Builder(_out,
cfg.checksumSeed()).
cfg.checksumSeed(), cfg.checksumHash()).
charset(cfg.charset()).
fileSelection(fileSelection).
isDelete(_isDelete).
Expand Down Expand Up @@ -793,6 +799,9 @@ private List<String> createServerArgs(Mode mode,
if (_isPreserveDevices && !_isPreserveSpecials) {
serverArgs.add("--no-specials");
}
if ( _checksumHash != ChecksumHash.md5 ) {
serverArgs.add("--checksum-choice=" + _checksumHash );
}

serverArgs.add("."); // arg delimiter

Expand Down Expand Up @@ -838,6 +847,7 @@ public static class Builder
private boolean _isNumericIds;
private boolean _isPreservePermissions;
private boolean _isPreserveTimes;
private ChecksumHash _checksumHash = ChecksumHash.xxhash;
private Charset _charset = Charset.forName(Text.UTF8_NAME);
private ExecutorService _executorService;
private FileSelection _fileSelection;
Expand Down Expand Up @@ -933,6 +943,11 @@ public Builder isPreserveTimes(boolean isPreserveTimes)
_isPreserveTimes = isPreserveTimes;
return this;
}

public Builder checksumHash( ChecksumHash hash ) {
_checksumHash = hash;
return this;
}

/**
*
Expand Down Expand Up @@ -987,6 +1002,7 @@ public Builder verbosity(int verbosity)
private final boolean _isPreservePermissions;
private final boolean _isPreserveTimes;
private final Charset _charset;
private final ChecksumHash _checksumHash;
private final ExecutorService _executorService;
private final FileSelection _fileSelectionOrNull;
private final int _verbosity;
Expand All @@ -1010,6 +1026,7 @@ private RsyncClient(Builder builder)
_isPreservePermissions = builder._isPreservePermissions;
_isPreserveTimes = builder._isPreserveTimes;
_charset = builder._charset;
_checksumHash = builder._checksumHash;
if (builder._executorService == null) {
_executorService = Executors.newCachedThreadPool();
_isOwnerOfExecutorService = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public boolean serve(Modules modules,
if (cfg.isSender()) {
Sender sender = Sender.Builder.newServer(in, out,
cfg.sourceFiles(),
cfg.checksumSeed()).
cfg.checksumSeed(), cfg.checksumHash() ).
filterMode(FilterMode.RECEIVE).
charset(cfg.charset()).
fileSelection(cfg.fileSelection()).
Expand All @@ -121,7 +121,7 @@ public boolean serve(Modules modules,
return _rsyncTaskExecutor.exec(sender);
} else {
Generator generator = new Generator.Builder(out,
cfg.checksumSeed()).
cfg.checksumSeed(), cfg.checksumHash()).
charset(cfg.charset()).
fileSelection(cfg.fileSelection()).
isDelete(cfg.isDelete()).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ public void put(ByteBuffer src) throws ChannelException
flush();
} else {
ByteBuffer slice = Util.slice(src,
src.position(),
src.position() + l);
src.position(),
src.position() + l);
_buffer.put(slice);
src.position(slice.position());
}
Expand All @@ -102,7 +102,7 @@ public void put(ByteBuffer src) throws ChannelException
public void put(byte[] src, int offset, int length)
throws ChannelException
{
put(ByteBuffer.wrap(src, offset, length));
put( ByteBuffer.wrap( src, offset, length ) );
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public String toString()
{
return String.format("%s %s %s %s",
getClass().getSimpleName(), _header, _payload,
Text.byteBufferToString(_payload));
Text.bytesToString(_payload));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ protected void readNextAvailable(ByteBuffer dst) throws ChannelException
dst.position(),
dst.position() + Math.min(chunkLength,
64));
_log.finest(Text.byteBufferToString(tmp));
_log.finest(Text.bytesToString(tmp));
}
dst.position(slice.position());
_readAmountAvailable -= chunkLength;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
*/
package com.github.perlundq.yajsync.internal.channels;

import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

import com.github.perlundq.yajsync.internal.util.Consts;
Expand Down Expand Up @@ -79,6 +80,24 @@ public void flush() throws ChannelException
updateTagOffsetAndBufPos(DEFAULT_TAG_OFFSET);
}
}

@Override
public void put( ByteBuffer src ) throws ChannelException
{
if ( !src.isDirect() || !src.hasRemaining() ) {
super.put( src );
return;
}

// for direct buffers we want to avoid extra byte copy of src into _buffer
// ( this copy takes 70% of cpu sending file literal data )
tagCurrentData( src.remaining() );
super.flush();

send( src );
updateTagOffsetAndBufPos(DEFAULT_TAG_OFFSET);

}

@Override
public int numBytesBuffered()
Expand All @@ -95,9 +114,14 @@ private int numBytesUntagged()
}

private void tagCurrentData()
{
tagCurrentData(0);
}

private void tagCurrentData( int delta )
{
putMessageHeader(_tag_offset, new MessageHeader(MessageCode.DATA,
numBytesUntagged()));
numBytesUntagged() + delta ));
}

private void putMessageHeader(int offset, MessageHeader header)
Expand Down
Loading