diff --git a/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/RawSftpClient.java b/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/RawSftpClient.java
index 4ab3373db..0bc947632 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/RawSftpClient.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/RawSftpClient.java
@@ -28,6 +28,7 @@
* @author Apache MINA SSHD Project
*/
public interface RawSftpClient {
+
/**
* @param cmd Command to send - Note: only lower 8-bits are used
* @param buffer The {@link Buffer} containing the command data
@@ -36,6 +37,8 @@ public interface RawSftpClient {
*/
int send(int cmd, Buffer buffer) throws IOException;
+ SftpMessage write(int cmd, Buffer buffer) throws IOException;
+
/**
* @param id The expected request id
* @return The received response {@link Buffer} containing the request id
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/SftpMessage.java b/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/SftpMessage.java
new file mode 100644
index 000000000..a21ee0a0e
--- /dev/null
+++ b/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/SftpMessage.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.sftp.client;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Objects;
+
+import org.apache.sshd.common.io.IoWriteFuture;
+
+/**
+ * A representation of a written SFTP message.
+ *
+ * @author Apache MINA SSHD Project
+ */
+public class SftpMessage {
+
+ private final int id;
+ private final IoWriteFuture future;
+ private final Duration timeout;
+
+ /**
+ * Creates a new instance.
+ *
+ * @param id SFTP message id
+ * @param future {@link IoWriteFuture} of the SFTP message; can be used to wait until the message has been actually
+ * sent
+ * @param timeout the configured SFTP write timeout
+ */
+ public SftpMessage(int id, IoWriteFuture future, Duration timeout) {
+ this.id = id;
+ this.future = Objects.requireNonNull(future);
+ this.timeout = Objects.requireNonNull(timeout);
+ }
+
+ /**
+ * Retrieves the SFTP message id.
+ *
+ * @return the SFTP message id
+ */
+ public int getId() {
+ return id;
+ }
+
+ /**
+ * Retrieves the {@link IoWriteFuture} of the message; can be used to wait until the message has been actually sent.
+ *
+ * @return the {@link IoWriteFuture}, never {@code null}
+ */
+ public IoWriteFuture getFuture() {
+ return future;
+ }
+
+ /**
+ * Retrieves the write timeout configured when the message was sent.
+ *
+ * @return the timeout, never {@code null}
+ */
+ public Duration getTimeout() {
+ return timeout;
+ }
+
+ /**
+ * Waits with the configured timeout until the message has been sent.
+ *
+ * @throws IOException if the message could not be sent, or waiting is interrupted.
+ */
+ public void waitUntilSent() throws IOException {
+ getFuture().verify(getTimeout());
+ }
+}
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/extensions/helpers/AbstractSftpClientExtension.java b/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/extensions/helpers/AbstractSftpClientExtension.java
index eca4156c2..e7e3c983c 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/extensions/helpers/AbstractSftpClientExtension.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/extensions/helpers/AbstractSftpClientExtension.java
@@ -36,6 +36,7 @@
import org.apache.sshd.sftp.client.RawSftpClient;
import org.apache.sshd.sftp.client.SftpClient;
import org.apache.sshd.sftp.client.SftpClient.Handle;
+import org.apache.sshd.sftp.client.SftpMessage;
import org.apache.sshd.sftp.client.extensions.SftpClientExtension;
import org.apache.sshd.sftp.client.impl.SftpResponse;
import org.apache.sshd.sftp.client.impl.SftpStatus;
@@ -93,6 +94,11 @@ public int send(int cmd, Buffer buffer) throws IOException {
return raw.send(cmd, buffer);
}
+ @Override
+ public SftpMessage write(int cmd, Buffer buffer) throws IOException {
+ return raw.write(cmd, buffer);
+ }
+
@Override
public Buffer receive(int id) throws IOException {
return raw.receive(id);
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/fs/SftpFileSystem.java b/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/fs/SftpFileSystem.java
index 5a347e5f1..fbb9a48e9 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/fs/SftpFileSystem.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/fs/SftpFileSystem.java
@@ -66,6 +66,7 @@
import org.apache.sshd.sftp.client.SftpClient;
import org.apache.sshd.sftp.client.SftpClientFactory;
import org.apache.sshd.sftp.client.SftpErrorDataHandler;
+import org.apache.sshd.sftp.client.SftpMessage;
import org.apache.sshd.sftp.client.SftpVersionSelector;
import org.apache.sshd.sftp.client.impl.AbstractSftpClient;
import org.apache.sshd.sftp.client.impl.SftpPathImpl;
@@ -606,6 +607,21 @@ public int send(int cmd, Buffer buffer) throws IOException {
}
}
+ @Override
+ public SftpMessage write(int cmd, Buffer buffer) throws IOException {
+ if (!isOpen()) {
+ throw new IOException("write(cmd=" + SftpConstants.getCommandMessageName(cmd) + ") client is closed");
+ }
+
+ if (delegate instanceof RawSftpClient) {
+ return ((RawSftpClient) delegate).write(cmd, buffer);
+ } else {
+ throw new StreamCorruptedException(
+ "write(cmd=" + SftpConstants.getCommandMessageName(cmd) + ") delegate is not a "
+ + RawSftpClient.class.getSimpleName());
+ }
+ }
+
@Override
public Buffer receive(int id) throws IOException {
if (!isOpen()) {
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/AbstractSftpClient.java b/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/AbstractSftpClient.java
index fed65aadb..44f0cdff1 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/AbstractSftpClient.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/AbstractSftpClient.java
@@ -1240,10 +1240,7 @@ public InputStream read(String path, int bufferSize, Collection mode)
@Override
public OutputStream write(String path, int bufferSize, Collection mode) throws IOException {
- if (bufferSize <= 0) {
- bufferSize = getWriteBufferSize();
- }
- if (bufferSize < MIN_WRITE_BUFFER_SIZE) {
+ if (bufferSize != 0 && bufferSize < MIN_WRITE_BUFFER_SIZE) {
throw new IllegalArgumentException("Insufficient write buffer size: " + bufferSize + ", min.="
+ MIN_WRITE_BUFFER_SIZE);
}
@@ -1260,6 +1257,7 @@ protected int getReadBufferSize() {
}
protected int getWriteBufferSize() {
+ // Do not use. -13 is wrong anyway.
return (int) getClientChannel().getRemoteWindow().getPacketSize() - 13;
}
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/DefaultSftpClient.java b/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/DefaultSftpClient.java
index 860177d99..d5b3cd367 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/DefaultSftpClient.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/DefaultSftpClient.java
@@ -60,6 +60,7 @@
import org.apache.sshd.core.CoreModuleProperties;
import org.apache.sshd.sftp.SftpModuleProperties;
import org.apache.sshd.sftp.client.SftpErrorDataHandler;
+import org.apache.sshd.sftp.client.SftpMessage;
import org.apache.sshd.sftp.client.SftpVersionSelector;
import org.apache.sshd.sftp.common.SftpConstants;
import org.apache.sshd.sftp.common.extensions.ParserUtils;
@@ -268,6 +269,13 @@ protected void process(Buffer incoming) throws IOException {
@Override
public int send(int cmd, Buffer buffer) throws IOException {
+ SftpMessage msg = write(cmd, buffer);
+ msg.waitUntilSent();
+ return msg.getId();
+ }
+
+ @Override
+ public SftpMessage write(int cmd, Buffer buffer) throws IOException {
int id = cmdId.incrementAndGet();
int len = buffer.available();
if (log.isTraceEnabled()) {
@@ -298,9 +306,8 @@ public int send(int cmd, Buffer buffer) throws IOException {
ClientChannel clientChannel = getClientChannel();
IoOutputStream asyncIn = clientChannel.getAsyncIn();
IoWriteFuture writeFuture = asyncIn.writeBuffer(buf);
- Duration cmdTimeout = SFTP_CLIENT_CMD_TIMEOUT.getRequired(clientChannel);
- writeFuture.verify(cmdTimeout);
- return id;
+ Duration sendTimeout = SFTP_CLIENT_CMD_TIMEOUT.getRequired(clientChannel);
+ return new SftpMessage(id, writeFuture, sendTimeout);
}
@Override
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/SftpInputStreamAsync.java b/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/SftpInputStreamAsync.java
index d1ddc31c3..02c69f6e9 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/SftpInputStreamAsync.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/SftpInputStreamAsync.java
@@ -37,7 +37,6 @@
import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
import org.apache.sshd.common.util.io.input.InputStreamWithChannel;
import org.apache.sshd.sftp.client.SftpClient;
-import org.apache.sshd.sftp.client.SftpClient.Attributes;
import org.apache.sshd.sftp.client.SftpClient.CloseableHandle;
import org.apache.sshd.sftp.client.SftpClient.OpenMode;
import org.apache.sshd.sftp.client.SftpClientHolder;
@@ -65,25 +64,26 @@ public class SftpInputStreamAsync extends InputStreamWithChannel implements Sftp
private final AbstractSftpClient clientInstance;
private final String path;
+ private final boolean ownsHandle;
public SftpInputStreamAsync(AbstractSftpClient client, int bufferSize,
String path, Collection mode)
throws IOException {
- this.log = LoggerFactory.getLogger(getClass());
- this.clientInstance = Objects.requireNonNull(client, "No SFTP client instance");
- this.path = path;
- Attributes attrs = client.stat(path);
- this.fileSize = attrs.getSize();
- this.handle = client.open(path, mode);
- this.bufferSize = bufferSize;
+ this(client, bufferSize, 0, client.stat(path).getSize(), path, client.open(path, mode));
}
public SftpInputStreamAsync(AbstractSftpClient client, int bufferSize, long clientOffset, long fileSize,
String path, CloseableHandle handle) {
+ this(client, bufferSize, clientOffset, fileSize, path, handle, true);
+ }
+
+ public SftpInputStreamAsync(AbstractSftpClient client, int bufferSize, long clientOffset, long fileSize,
+ String path, CloseableHandle handle, boolean closeHandle) {
this.log = LoggerFactory.getLogger(getClass());
this.clientInstance = Objects.requireNonNull(client, "No SFTP client instance");
this.path = path;
this.handle = handle;
+ this.ownsHandle = closeHandle;
this.bufferSize = bufferSize;
this.requestOffset = clientOffset;
this.clientOffset = clientOffset;
@@ -398,10 +398,12 @@ public void close() throws IOException {
pollBuffer(ack);
}
} finally {
- if (debugEnabled) {
- log.debug("close({}) closing file handle; {} short reads", this, shortReads);
+ if (ownsHandle) {
+ if (debugEnabled) {
+ log.debug("close({}) closing file handle; {} short reads", this, shortReads);
+ }
+ handle.close();
}
- handle.close();
}
} finally {
handle = null;
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/SftpOutputStreamAsync.java b/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/SftpOutputStreamAsync.java
index e3d16e0ab..bd9200083 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/SftpOutputStreamAsync.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/SftpOutputStreamAsync.java
@@ -18,7 +18,11 @@
*/
package org.apache.sshd.sftp.client.impl;
+import java.io.ByteArrayInputStream;
import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
import java.time.Duration;
import java.util.Collection;
import java.util.Deque;
@@ -27,6 +31,8 @@
import org.apache.sshd.common.SshConstants;
import org.apache.sshd.common.session.Session;
+import org.apache.sshd.common.session.helpers.PacketBuffer;
+import org.apache.sshd.common.util.ValidateUtils;
import org.apache.sshd.common.util.buffer.Buffer;
import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
import org.apache.sshd.common.util.io.output.OutputStreamWithChannel;
@@ -34,6 +40,7 @@
import org.apache.sshd.sftp.client.SftpClient.CloseableHandle;
import org.apache.sshd.sftp.client.SftpClient.OpenMode;
import org.apache.sshd.sftp.client.SftpClientHolder;
+import org.apache.sshd.sftp.client.SftpMessage;
import org.apache.sshd.sftp.common.SftpConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,29 +57,89 @@ public class SftpOutputStreamAsync extends OutputStreamWithChannel implements Sf
protected Buffer buffer;
protected CloseableHandle handle;
protected long offset;
- protected final Deque pendingWrites = new LinkedList<>();
+ protected final Deque pendingAcks = new LinkedList<>();
private final AbstractSftpClient clientInstance;
private final String path;
+ private final byte[] handleId;
+ private final boolean ownsHandle;
+ private final Buffer[] bufferPool = new Buffer[2];
+ private final int packetSize;
+ private final int sftpPreamble;
+ private final boolean usePacket;
+ private int nextBuffer;
+ private SftpMessage lastMsg;
+
+ /**
+ * Creates a new stream to write data to a remote file.
+ *
+ * @param client {@link AbstractSftpClient} to use for writing data
+ * @param bufferSize SFTP packet length to use. Most servers have a limit of 256kB. If zero, the stream picks a
+ * size such that each SFTP packet fits into a single SSH packet, i.e., roughly 32kB.
+ * @param path remote path to write to
+ * @param mode {@link OpenMode}s for opening the file.
+ * @throws IOException if the remote file cannot be opened
+ */
public SftpOutputStreamAsync(AbstractSftpClient client, int bufferSize,
String path, Collection mode)
throws IOException {
- this.log = LoggerFactory.getLogger(getClass());
- this.clientInstance = Objects.requireNonNull(client, "No SFTP client instance");
- this.path = path;
- this.handle = client.open(path, mode);
- this.bufferSize = bufferSize;
+ this(client, bufferSize, path, client.open(path, mode), true);
}
+ /**
+ * Creates a new stream to write data to a remote file.
+ *
+ * @param client {@link AbstractSftpClient} to use for writing data
+ * @param bufferSize SFTP packet length to use. Most servers have a limit of 256kB. If zero, the stream picks a size
+ * such that each SFTP packet fits into a single SSH packet, i.e., roughly 32kB.
+ * @param handle {@link CloseableHandle} of the remote file to write to; will be closed when this output stream
+ * is closed
+ */
public SftpOutputStreamAsync(AbstractSftpClient client, int bufferSize,
- String path, CloseableHandle handle)
- throws IOException {
+ String path, CloseableHandle handle) {
+ this(client, bufferSize, path, handle, true);
+ }
+
+ /**
+ * Creates a new stream to write data to a remote file.
+ *
+ * @param client {@link AbstractSftpClient} to use for writing data
+ * @param bufferSize SFTP packet length to use. Most servers have a limit of 256kB. If zero, the stream picks a
+ * size such that each SFTP packet fits into a single SSH packet, i.e., roughly 32kB.
+ * @param handle {@link CloseableHandle} of the remote file to write to
+ * @param closeHandle whether to close the {@code handle} when this output stream is closed
+ */
+ public SftpOutputStreamAsync(AbstractSftpClient client, int bufferSize,
+ String path, CloseableHandle handle, boolean closeHandle) {
this.log = LoggerFactory.getLogger(getClass());
this.clientInstance = Objects.requireNonNull(client, "No SFTP client instance");
this.path = path;
this.handle = handle;
- this.bufferSize = bufferSize;
+ this.handleId = this.handle.getIdentifier();
+ // SFTP WRITE packet header:
+ // 9 = length + type + sftp request id
+ // 4 = handle length
+ // handle bytes
+ // 8 = file offset
+ // 4 = length of actual data
+ this.sftpPreamble = 9 + 4 + handleId.length + 8 + 4;
+ this.ownsHandle = closeHandle;
+ this.packetSize = (int) client.getChannel().getRemoteWindow().getPacketSize();
+ int bufSize = bufferSize;
+ if (bufSize == 0) {
+ bufSize = packetSize;
+ } else {
+ ValidateUtils.checkTrue(bufferSize >= SftpClient.MIN_WRITE_BUFFER_SIZE, "SFTP write buffer too small: %d < %d",
+ bufferSize, SftpClient.MIN_WRITE_BUFFER_SIZE);
+ bufSize += sftpPreamble;
+ }
+ this.usePacket = bufSize <= packetSize;
+ if (usePacket) {
+ // 9 = SSH_MSG_CHANNEL_DATA + recipient channel + length (RFC 4254); length <= packet size
+ bufSize += 9;
+ }
+ this.bufferSize = bufSize;
}
@Override
@@ -106,45 +173,128 @@ public void write(int b) throws IOException {
@Override
public void write(byte[] b, int off, int len) throws IOException {
- byte[] id = handle.getIdentifier();
+ ByteArrayInputStream in = new ByteArrayInputStream(b, off, len);
+ internalTransfer(in::read, false);
+ }
+
+ public long transferFrom(InputStream stream) throws IOException {
+ return internalTransfer(stream::read, true);
+ }
+
+ public long transferFrom(ReadableByteChannel stream, long count) throws IOException {
+ return internalTransfer(new ChannelReader(stream, count), false);
+ }
+
+ private Buffer getBuffer(Session session) {
+ Buffer buf = bufferPool[nextBuffer];
+ if (buf == null) {
+ if (nextBuffer == 1 && lastMsg != null && lastMsg.getFuture().isDone()) {
+ // No need to allocate a second buffer, we may re-use the 0 buffer
+ nextBuffer = 0;
+ buf = bufferPool[0];
+ } else {
+ if (usePacket) {
+ buf = session.createBuffer(SshConstants.SSH_MSG_CHANNEL_DATA, bufferSize);
+ } else {
+ buf = new ByteArrayBuffer(bufferSize, false);
+ }
+ bufferPool[nextBuffer] = buf;
+ }
+ }
+ nextBuffer ^= 1;
+ int hdr;
+ if (buf instanceof PacketBuffer) {
+ // 9 = SshConstants.SSH_MSG_CHANNEL_DATA + recipient channel + length (RFC 4254)
+ hdr = SshConstants.SSH_PACKET_HEADER_LEN + 9 + sftpPreamble;
+ } else {
+ // Only the SFTP header. The channel will split this large SFTP packet into smaller SSH packets anyway, and
+ // allocate its own SSH_MSG_CHANNEL_DATA packets. (Larger SFTP packets may result in less ACKs, but involve
+ // copying buffers around.)
+ hdr = sftpPreamble;
+ }
+ buf.rpos(hdr);
+ buf.wpos(hdr);
+ return buf;
+ }
+
+ @FunctionalInterface
+ private interface ByteInput {
+ int read(byte[] buffer, int offset, int length) throws IOException;
+ }
+
+ private static class ChannelReader implements ByteInput {
+ private final ReadableByteChannel src;
+ private long stillToRead;
+
+ ChannelReader(ReadableByteChannel src, long toRead) {
+ this.src = src;
+ this.stillToRead = toRead;
+ }
+
+ @Override
+ public int read(byte[] buffer, int offset, int length) throws IOException {
+ if (stillToRead <= 0) {
+ return -1;
+ }
+ ByteBuffer wrap = ByteBuffer.wrap(buffer, offset, (int) Math.min(length, stillToRead));
+ int actuallyRead = src.read(wrap);
+ if (actuallyRead < 0) {
+ // EOF.
+ stillToRead = 0;
+ return -1;
+ }
+ stillToRead -= actuallyRead;
+ return actuallyRead;
+ }
+ }
+
+ private long internalTransfer(ByteInput stream, boolean forceFlush) throws IOException {
SftpClient client = getClient();
Session session = client.getSession();
boolean traceEnabled = log.isTraceEnabled();
- int writtenCount = 0;
- int totalLen = len;
+ long writtenCount = 0;
+ boolean eof = false;
do {
if (buffer == null) {
- if (traceEnabled) {
- log.trace("write({}) allocate buffer size={} after {}/{} bytes",
- this, bufferSize, writtenCount, totalLen);
- }
-
- buffer = session.createBuffer(SshConstants.SSH_MSG_CHANNEL_DATA, bufferSize);
- int hdr = 9 + 16 + 8 + id.length + buffer.wpos();
- buffer.rpos(hdr);
- buffer.wpos(hdr);
+ buffer = getBuffer(session);
}
- int max = bufferSize - (9 + 16 + id.length + 72);
- int nb = Math.min(len, Math.max(0, max - buffer.available()));
- buffer.putRawBytes(b, off, nb);
-
- off += nb;
- len -= nb;
- writtenCount += nb;
+ int pos = buffer.wpos();
+ int off = pos;
+ int toRead = bufferSize - off;
+ while (toRead > 0) {
+ int n = stream.read(buffer.array(), off, toRead);
+ if (n < 0) {
+ eof = true;
+ break;
+ }
+ off += n;
+ toRead -= n;
+ }
- if (buffer.available() >= max) {
+ writtenCount += off - pos;
+ buffer.wpos(off);
+ if (off == bufferSize || eof && forceFlush && buffer.available() > 0) {
if (traceEnabled) {
- log.trace("write({}) flush after {}/{} bytes", this, writtenCount, totalLen);
+ log.trace("write({}) flush after {} bytes", this, writtenCount);
}
- flush();
+ internalFlush();
}
- } while (len > 0);
+ } while (!eof);
+ return writtenCount;
}
@Override
public void flush() throws IOException {
+ internalFlush();
+ if (lastMsg != null) {
+ lastMsg.waitUntilSent();
+ lastMsg = null;
+ }
+ }
+
+ private void internalFlush() throws IOException {
if (!isOpen()) {
throw new IOException("flush(" + getPath() + ") stream is closed");
}
@@ -152,7 +302,7 @@ public void flush() throws IOException {
boolean debugEnabled = log.isDebugEnabled();
AbstractSftpClient client = getClient();
for (int ackIndex = 1;; ackIndex++) {
- SftpAckData ack = pendingWrites.peek();
+ SftpAckData ack = pendingAcks.peek();
if (ack == null) {
if (debugEnabled) {
log.debug("flush({}) processed {} pending writes", this, ackIndex);
@@ -176,7 +326,7 @@ public void flush() throws IOException {
log.debug("flush({}) processing ack #{}: {}", this, ackIndex, ack);
}
- pendingWrites.removeFirst();
+ pendingAcks.removeFirst();
SftpResponse response = SftpResponse.parse(SftpConstants.SSH_FXP_WRITE, buf);
client.checkResponseStatus(SftpConstants.SSH_FXP_WRITE, ack.id, SftpStatus.parse(response));
}
@@ -188,31 +338,29 @@ public void flush() throws IOException {
return;
}
- byte[] id = handle.getIdentifier();
int avail = buffer.available();
- Buffer buf;
- if (buffer.rpos() >= (16 + id.length)) {
- int wpos = buffer.wpos();
- buffer.rpos(buffer.rpos() - 16 - id.length);
- buffer.wpos(buffer.rpos());
- buffer.putBytes(id);
- buffer.putLong(offset);
- buffer.putUInt(avail);
- buffer.wpos(wpos);
- buf = buffer;
- } else {
- buf = new ByteArrayBuffer(id.length + avail + Long.SIZE /* some extra fields */, false);
- buf.putBytes(id);
- buf.putLong(offset);
- buf.putBytes(buffer.array(), buffer.rpos(), avail);
- }
- int reqId = client.send(SftpConstants.SSH_FXP_WRITE, buf);
- SftpAckData ack = new SftpAckData(reqId, offset, avail);
+ int wpos = buffer.wpos();
+ // 4 = handle length
+ // handle bytes
+ // 8 = file offset
+ // 4 = length of actual data
+ buffer.rpos(buffer.rpos() - 16 - handleId.length);
+ buffer.wpos(buffer.rpos());
+ buffer.putBytes(handleId);
+ buffer.putLong(offset);
+ buffer.putUInt(avail);
+ buffer.wpos(wpos);
+
+ if (lastMsg != null) {
+ lastMsg.waitUntilSent();
+ }
+ lastMsg = client.write(SftpConstants.SSH_FXP_WRITE, buffer);
+ SftpAckData ack = new SftpAckData(lastMsg.getId(), offset, avail);
if (debugEnabled) {
log.debug("flush({}) enqueue pending ack={}", this, ack);
}
- pendingWrites.add(ack);
+ pendingAcks.add(ack);
offset += avail;
buffer = null;
@@ -233,12 +381,16 @@ public void close() throws IOException {
if (debugEnabled) {
log.debug("close({}) flushing {} pending bytes", this, pendingSize);
}
- flush();
+ internalFlush();
+ }
+ if (lastMsg != null) {
+ lastMsg.waitUntilSent();
+ lastMsg = null;
}
AbstractSftpClient client = getClient();
- for (int ackIndex = 1; !pendingWrites.isEmpty(); ackIndex++) {
- SftpAckData ack = pendingWrites.removeFirst();
+ for (int ackIndex = 1; !pendingAcks.isEmpty(); ackIndex++) {
+ SftpAckData ack = pendingAcks.removeFirst();
if (debugEnabled) {
log.debug("close({}) processing ack #{}: {}", this, ackIndex, ack);
}
@@ -250,13 +402,19 @@ public void close() throws IOException {
client.checkResponseStatus(SftpConstants.SSH_FXP_WRITE, response.getId(), SftpStatus.parse(response));
}
} finally {
- if (debugEnabled) {
- log.debug("close({}) closing file handle", this);
+ if (ownsHandle) {
+ if (debugEnabled) {
+ log.debug("close({}) closing file handle", this);
+ }
+ handle.close();
}
- handle.close();
}
} finally {
handle = null;
+ buffer = null;
+ bufferPool[0] = null;
+ bufferPool[1] = null;
+ lastMsg = null;
}
}
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/SftpRemotePathChannel.java b/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/SftpRemotePathChannel.java
index 1fcc1f53f..bf36154e6 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/SftpRemotePathChannel.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/SftpRemotePathChannel.java
@@ -362,14 +362,10 @@ public long transferTo(long position, long count, WritableByteChannel target) th
long totalRead;
synchronized (lock) {
- try {
- beginBlocking("transferTo");
+ beginBlocking("transferTo");
- // DO NOT CLOSE THE STREAM AS IT WOULD CLOSE THE HANDLE
- @SuppressWarnings("resource")
- SftpInputStreamAsync input = new SftpInputStreamAsync(
- (AbstractSftpClient) sftp,
- copySize, position, count, getRemotePath(), handle);
+ try (SftpInputStreamAsync input = new SftpInputStreamAsync(
+ (AbstractSftpClient) sftp, copySize, position, count, getRemotePath(), handle, false)) {
totalRead = input.transferTo(count, target);
eof = input.isEof();
completed = true;
@@ -400,42 +396,22 @@ public long transferFrom(ReadableByteChannel src, long position, long count) thr
}
ensureMode(true);
- ClientSession clientSession = sftp.getClientSession();
- int copySize = SftpModuleProperties.COPY_BUF_SIZE.getRequired(clientSession);
boolean debugEnabled = log.isDebugEnabled();
if (debugEnabled) {
- log.debug("transferFrom({})[position={}, count={}] use copySize={} for source={}",
- this, position, count, copySize, src);
+ log.debug("transferFrom({})[position={}, count={}] for source={}",
+ this, position, count, src);
}
boolean completed = false;
- long totalRead = 0L;
- byte[] buffer = new byte[(int) Math.min(copySize, count)];
+ long totalWritten = 0;
synchronized (lock) {
- try {
- beginBlocking("transferFrom");
-
- // DO NOT CLOSE THE OUTPUT STREAM AS IT WOULD CLOSE THE HANDLE
- @SuppressWarnings("resource")
- SftpOutputStreamAsync output = new SftpOutputStreamAsync(
- (AbstractSftpClient) sftp,
- copySize, getRemotePath(), handle);
+ beginBlocking("transferFrom");
+ try (SftpOutputStreamAsync output = new SftpOutputStreamAsync(
+ (AbstractSftpClient) sftp, 0, getRemotePath(), handle, false)) {
output.setOffset(position);
-
- while (totalRead < count) {
- ByteBuffer wrap = ByteBuffer.wrap(
- buffer, 0, (int) Math.min(buffer.length, count - totalRead));
- int read = src.read(wrap);
- if (read > 0) {
- output.write(buffer, 0, read);
- totalRead += read;
- } else {
- break;
- }
- }
+ totalWritten = output.transferFrom(src, count);
output.flush();
- // DO NOT CLOSE THE OUTPUT STREAM AS IT WOULD CLOSE THE HANDLE
completed = true;
} finally {
endBlocking("transferFrom", completed);
@@ -443,10 +419,10 @@ public long transferFrom(ReadableByteChannel src, long position, long count) thr
}
if (debugEnabled) {
- log.debug("transferFrom({})[position={}, count={}] use copySize={} - totalRead={}, completed={} for source={}",
- this, position, count, copySize, totalRead, completed, src);
+ log.debug("transferFrom({})[position={}, count={}] - totalRead={}, completed={} for source={}",
+ this, position, count, totalWritten, completed, src);
}
- return totalRead;
+ return totalWritten;
}
@Override