Skip to content

Commit

Permalink
FTP: Enable SFTP connector to issue more than one unconfirmed read re…
Browse files Browse the repository at this point in the history
…quest (#2567)
  • Loading branch information
conorgriffin authored Feb 10, 2021
1 parent 8c98f81 commit 10c268d
Show file tree
Hide file tree
Showing 9 changed files with 135 additions and 16 deletions.
10 changes: 10 additions & 0 deletions docs/src/main/paradox/ftp.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,16 @@ Scala
Java
: @@snip [snip](/ftp/src/test/java/docs/javadsl/ConfigureCustomSSHClient.java) { #configure-custom-ssh-client }

### Improving SFTP throughput
For SFTP connections allowing more than one unconfirmed read request to be sent by the client you can use `withMaxUnconfirmedReads` on @scaladoc[SftpSettings](akka.stream.alpakka.ftp.SftpSettings)
The command-line tool `sftp` uses a value of `64` by default. This can significantly improve throughput by reducing the impact of latency.

Scala
: @@snip [snip](/ftp/src/test/scala/docs/scaladsl/scalaExamples.scala) { #retrieving-with-unconfirmed-reads }

Java
: @@snip [snip](/ftp/src/test/java/docs/javadsl/SftpRetrievingExample.java) { #retrieving-with-unconfirmed-reads }

## Traversing a remote FTP folder recursively

In order to traverse a remote folder recursively, you need to use the `ls` method in the FTP API:
Expand Down
3 changes: 3 additions & 0 deletions ftp/src/main/mima-filters/2.0.2.backwards.excludes
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Allow change to SFTPSettings
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.alpakka.ftp.SftpSettings.this")

Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import akka.util.ByteString.ByteString1C

import scala.concurrent.{Future, Promise}
import java.io.{IOException, InputStream, OutputStream}

import akka.annotation.InternalApi

import scala.util.control.NonFatal
Expand Down Expand Up @@ -112,12 +111,29 @@ private[ftp] trait FtpIOSourceStage[FtpClient, S <: RemoteFileSettings]

protected[this] def doPreStart(): Unit =
isOpt = ftpLike match {
case ur: UnconfirmedReads =>
withUnconfirmedReads(ur)
case ro: RetrieveOffset =>
Some(ro.retrieveFileInputStream(path, handler.get.asInstanceOf[ro.Handler], offset).get)
case _ =>
Some(ftpLike.retrieveFileInputStream(path, handler.get).get)
}

private def withUnconfirmedReads(
ftpLikeWithUnconfirmedReads: FtpLike[FtpClient, S] with UnconfirmedReads
): Option[InputStream] =
connectionSettings match {
case s: SftpSettings =>
Some(
ftpLikeWithUnconfirmedReads
.retrieveFileInputStream(path,
handler.get.asInstanceOf[ftpLikeWithUnconfirmedReads.Handler],
offset,
s.maxUnconfirmedReads)
.get
)
}

protected[this] def matSuccess(): Boolean =
matValuePromise.trySuccess(IOResult.createSuccessful(readBytesTotal))

Expand Down
13 changes: 12 additions & 1 deletion ftp/src/main/scala/akka/stream/alpakka/ftp/impl/FtpLike.scala
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,16 @@ protected[ftp] trait RetrieveOffset { _: FtpLike[_, _] =>

}

/**
* INTERNAL API
*/
@InternalApi
protected[ftp] trait UnconfirmedReads { _: FtpLike[_, _] =>

def retrieveFileInputStream(name: String, handler: Handler, offset: Long, maxUnconfirmedReads: Int): Try[InputStream]

}

/**
* INTERNAL API
*/
Expand All @@ -59,5 +69,6 @@ object FtpLike {
// type class instances
implicit val ftpLikeInstance = new FtpLike[FTPClient, FtpSettings] with RetrieveOffset with FtpOperations
implicit val ftpsLikeInstance = new FtpLike[FTPSClient, FtpsSettings] with RetrieveOffset with FtpsOperations
implicit val sFtpLikeInstance = new FtpLike[SSHClient, SftpSettings] with RetrieveOffset with SftpOperations
implicit val sFtpLikeInstance =
new FtpLike[SSHClient, SftpSettings] with RetrieveOffset with SftpOperations with UnconfirmedReads
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import org.apache.commons.net.DefaultSocketFactory

import scala.collection.JavaConverters._
import scala.collection.immutable
import scala.util.{Failure, Success, Try}
import scala.util.{Failure, Try}

/**
* INTERNAL API
Expand Down Expand Up @@ -123,15 +123,34 @@ private[ftp] trait SftpOperations { _: FtpLike[SSHClient, SftpSettings] =>
retrieveFileInputStream(name, handler, 0L)

def retrieveFileInputStream(name: String, handler: Handler, offset: Long): Try[InputStream] =
retrieveFileInputStream(name, handler, offset, 1)

def retrieveFileInputStream(name: String,
handler: Handler,
offset: Long,
maxUnconfirmedReads: Int): Try[InputStream] =
Try {
val remoteFile = handler.open(name, java.util.EnumSet.of(OpenMode.READ))
val is = new remoteFile.RemoteFileInputStream(offset) {

override def close(): Unit =
try {
super.close()
} finally {
remoteFile.close()
val is = maxUnconfirmedReads match {
case m if m > 1 =>
new remoteFile.ReadAheadRemoteFileInputStream(m, offset) {

override def close(): Unit =
try {
super.close()
} finally {
remoteFile.close()
}
}
case _ =>
new remoteFile.RemoteFileInputStream(offset) {

override def close(): Unit =
try {
super.close()
} finally {
remoteFile.close()
}
}
}
Option(is).getOrElse {
Expand Down
17 changes: 12 additions & 5 deletions ftp/src/main/scala/akka/stream/alpakka/ftp/model.scala
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ object FtpsSettings {
* @param knownHosts known hosts file to be used when connecting
* @param sftpIdentity private/public key config to use when connecting
* @param proxy An optional proxy to use when connecting with these settings
* @param maxUnconfirmedReads determines the number of read requests sent in parallel, disabled if set to <=1
*/
final class SftpSettings private (
val host: java.net.InetAddress,
Expand All @@ -270,7 +271,8 @@ final class SftpSettings private (
val strictHostKeyChecking: Boolean,
val knownHosts: Option[String],
val sftpIdentity: Option[SftpIdentity],
val proxy: Option[Proxy]
val proxy: Option[Proxy],
val maxUnconfirmedReads: Int = 1
) extends RemoteFileSettings {

def withHost(value: java.net.InetAddress): SftpSettings = copy(host = value)
Expand All @@ -281,6 +283,7 @@ final class SftpSettings private (
def withKnownHosts(value: String): SftpSettings = copy(knownHosts = Option(value))
def withSftpIdentity(value: SftpIdentity): SftpSettings = copy(sftpIdentity = Option(value))
def withProxy(value: Proxy): SftpSettings = copy(proxy = Some(value))
def withMaxUnconfirmedReads(value: Int): SftpSettings = copy(maxUnconfirmedReads = value)

private def copy(
host: java.net.InetAddress = host,
Expand All @@ -289,15 +292,17 @@ final class SftpSettings private (
strictHostKeyChecking: Boolean = strictHostKeyChecking,
knownHosts: Option[String] = knownHosts,
sftpIdentity: Option[SftpIdentity] = sftpIdentity,
proxy: Option[Proxy] = proxy
proxy: Option[Proxy] = proxy,
maxUnconfirmedReads: Int = maxUnconfirmedReads
): SftpSettings = new SftpSettings(
host = host,
port = port,
credentials = credentials,
strictHostKeyChecking = strictHostKeyChecking,
knownHosts = knownHosts,
sftpIdentity = sftpIdentity,
proxy = proxy
proxy = proxy,
maxUnconfirmedReads = maxUnconfirmedReads
)

override def toString =
Expand All @@ -308,7 +313,8 @@ final class SftpSettings private (
s"strictHostKeyChecking=$strictHostKeyChecking," +
s"knownHosts=$knownHosts," +
s"sftpIdentity=$sftpIdentity," +
s"proxy=$proxy)"
s"proxy=$proxy," +
s"maxUnconfirmedReads=$maxUnconfirmedReads)"
}

/**
Expand All @@ -327,7 +333,8 @@ object SftpSettings {
strictHostKeyChecking = true,
knownHosts = None,
sftpIdentity = None,
proxy = None
proxy = None,
maxUnconfirmedReads = 1
)

/** Java API */
Expand Down
24 changes: 24 additions & 0 deletions ftp/src/test/java/docs/javadsl/SftpRetrievingExample.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright (C) 2016-2020 Lightbend Inc. <https://www.lightbend.com>
*/

package docs.javadsl;

// #retrieving-with-unconfirmed-reads

import akka.stream.IOResult;
import akka.stream.alpakka.ftp.SftpSettings;
import akka.stream.alpakka.ftp.javadsl.Sftp;
import akka.stream.javadsl.Source;
import akka.util.ByteString;

import java.util.concurrent.CompletionStage;

public class SftpRetrievingExample {

public Source<ByteString, CompletionStage<IOResult>> retrieveFromPath(
String path, SftpSettings settings) throws Exception {
return Sftp.fromPath(path, settings.withMaxUnconfirmedReads(64));
}
}
// #retrieving-with-unconfirmed-reads
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,20 @@ final class StrictHostCheckingSftpSourceSpec extends BaseSftpSpec with CommonFtp
)
}

final class UnconfirmedReadsSftpSourceSpec extends BaseSftpSpec with CommonFtpStageSpec {
override val settings = SftpSettings(
InetAddress.getByName(HOSTNAME)
).withPort(PORT)
.withCredentials(FtpCredentials.create("username", "wrong password"))
.withStrictHostKeyChecking(true)
.withKnownHosts(getKnownHostsFile.getPath)
.withSftpIdentity(
SftpIdentity
.createFileSftpIdentity(getClientPrivateKeyFile.getPath, ClientPrivateKeyPassphrase)
)
.withMaxUnconfirmedReads(8)
}

trait CommonFtpStageSpec extends BaseSpec with Eventually {

implicit val system = getSystem
Expand Down
17 changes: 16 additions & 1 deletion ftp/src/test/scala/docs/scaladsl/scalaExamples.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
*/

package docs.scaladsl
import akka.stream.alpakka.ftp.{FtpFile, FtpSettings}
import akka.stream.alpakka.ftp.{FtpFile, FtpSettings, SftpSettings}

object scalaExamples {

Expand Down Expand Up @@ -44,6 +44,21 @@ object scalaExamples {
//#retrieving
}

object retrievingUnconfirmedReads {
//#retrieving-with-unconfirmed-reads
import akka.stream.IOResult
import akka.stream.alpakka.ftp.scaladsl.Sftp
import akka.stream.scaladsl.Source
import akka.util.ByteString

import scala.concurrent.Future

def retrieveFromPath(path: String, settings: SftpSettings): Source[ByteString, Future[IOResult]] =
Sftp.fromPath(path, settings.withMaxUnconfirmedReads(64))

//#retrieving-with-unconfirmed-reads
}

object removing {
//#removing
import akka.stream.IOResult
Expand Down

0 comments on commit 10c268d

Please sign in to comment.