Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FTP: Enable SFTP connector to issue more than one unconfirmed read request #2567

Merged
merged 3 commits into from
Feb 10, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions docs/src/main/paradox/ftp.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,16 @@ Scala
Java
: @@snip [snip](/ftp/src/test/java/docs/javadsl/ConfigureCustomSSHClient.java) { #configure-custom-ssh-client }

### Improving SFTP throughput
For SFTP connection allowing more than one unconfirmed read request to be sent by the client you can use `withMaxUnconfirmedReads` on @scaladoc[SftpSettings](akka.stream.alpakka.ftp.SftpSettings)
conorgriffin marked this conversation as resolved.
Show resolved Hide resolved
Command-line `sftp` uses a value of 64 by default. This can significantly improve throughput by reducing the impact of latency.
conorgriffin marked this conversation as resolved.
Show resolved Hide resolved

Scala
: @@snip [snip](/ftp/src/test/scala/docs/scaladsl/scalaExamples.scala) { #retrieving-with-unconfirmed-reads }

Java
: @@snip [snip](/ftp/src/test/java/docs/javadsl/SftpRetrievingExample.java) { #retrieving-with-unconfirmed-reads }

## Traversing a remote FTP folder recursively

In order to traverse a remote folder recursively, you need to use the `ls` method in the FTP API:
Expand Down
3 changes: 3 additions & 0 deletions ftp/src/main/mima-filters/2.0.2.backwards.excludes
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Allow change to SFTPSettings
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.alpakka.ftp.SftpSettings.this")

Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import akka.util.ByteString.ByteString1C

import scala.concurrent.{Future, Promise}
import java.io.{IOException, InputStream, OutputStream}

import akka.annotation.InternalApi

import scala.util.control.NonFatal
Expand Down Expand Up @@ -112,12 +111,29 @@ private[ftp] trait FtpIOSourceStage[FtpClient, S <: RemoteFileSettings]

protected[this] def doPreStart(): Unit =
isOpt = ftpLike match {
case ur: UnconfirmedReads =>
withUnconfirmedReads(ur)
case ro: RetrieveOffset =>
Some(ro.retrieveFileInputStream(path, handler.get.asInstanceOf[ro.Handler], offset).get)
case _ =>
Some(ftpLike.retrieveFileInputStream(path, handler.get).get)
}

private def withUnconfirmedReads(
ftpLikeWithUnconfirmedReads: FtpLike[FtpClient, S] with UnconfirmedReads
): Option[InputStream] =
connectionSettings match {
case s: SftpSettings =>
Some(
ftpLikeWithUnconfirmedReads
.retrieveFileInputStream(path,
handler.get.asInstanceOf[ftpLikeWithUnconfirmedReads.Handler],
offset,
s.maxUnconfirmedReads)
.get
)
}

protected[this] def matSuccess(): Boolean =
matValuePromise.trySuccess(IOResult.createSuccessful(readBytesTotal))

Expand Down
13 changes: 12 additions & 1 deletion ftp/src/main/scala/akka/stream/alpakka/ftp/impl/FtpLike.scala
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,16 @@ protected[ftp] trait RetrieveOffset { _: FtpLike[_, _] =>

}

/**
* INTERNAL API
*/
@InternalApi
protected[ftp] trait UnconfirmedReads { _: FtpLike[_, _] =>

def retrieveFileInputStream(name: String, handler: Handler, offset: Long, maxUnconfirmedReads: Int): Try[InputStream]

}

/**
* INTERNAL API
*/
Expand All @@ -59,5 +69,6 @@ object FtpLike {
// type class instances
implicit val ftpLikeInstance = new FtpLike[FTPClient, FtpSettings] with RetrieveOffset with FtpOperations
implicit val ftpsLikeInstance = new FtpLike[FTPSClient, FtpsSettings] with RetrieveOffset with FtpsOperations
implicit val sFtpLikeInstance = new FtpLike[SSHClient, SftpSettings] with RetrieveOffset with SftpOperations
implicit val sFtpLikeInstance =
new FtpLike[SSHClient, SftpSettings] with RetrieveOffset with SftpOperations with UnconfirmedReads
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import org.apache.commons.net.DefaultSocketFactory

import scala.collection.JavaConverters._
import scala.collection.immutable
import scala.util.{Failure, Success, Try}
import scala.util.{Failure, Try}

/**
* INTERNAL API
Expand Down Expand Up @@ -123,15 +123,34 @@ private[ftp] trait SftpOperations { _: FtpLike[SSHClient, SftpSettings] =>
retrieveFileInputStream(name, handler, 0L)

def retrieveFileInputStream(name: String, handler: Handler, offset: Long): Try[InputStream] =
retrieveFileInputStream(name, handler, offset, 1)

def retrieveFileInputStream(name: String,
handler: Handler,
offset: Long,
maxUnconfirmedReads: Int): Try[InputStream] =
Try {
val remoteFile = handler.open(name, java.util.EnumSet.of(OpenMode.READ))
val is = new remoteFile.RemoteFileInputStream(offset) {

override def close(): Unit =
try {
super.close()
} finally {
remoteFile.close()
val is = maxUnconfirmedReads match {
case m if m > 1 =>
new remoteFile.ReadAheadRemoteFileInputStream(m, offset) {

override def close(): Unit =
try {
super.close()
} finally {
remoteFile.close()
}
}
case _ =>
new remoteFile.RemoteFileInputStream(offset) {

override def close(): Unit =
try {
super.close()
} finally {
remoteFile.close()
}
}
}
Option(is).getOrElse {
Expand Down
17 changes: 12 additions & 5 deletions ftp/src/main/scala/akka/stream/alpakka/ftp/model.scala
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ object FtpsSettings {
* @param knownHosts known hosts file to be used when connecting
* @param sftpIdentity private/public key config to use when connecting
* @param proxy An optional proxy to use when connecting with these settings
* @param maxUnconfirmedReads determines the number of read requests sent in parallel, disabled if set to <=1
*/
final class SftpSettings private (
val host: java.net.InetAddress,
Expand All @@ -270,7 +271,8 @@ final class SftpSettings private (
val strictHostKeyChecking: Boolean,
val knownHosts: Option[String],
val sftpIdentity: Option[SftpIdentity],
val proxy: Option[Proxy]
val proxy: Option[Proxy],
val maxUnconfirmedReads: Int = 1
) extends RemoteFileSettings {

def withHost(value: java.net.InetAddress): SftpSettings = copy(host = value)
Expand All @@ -281,6 +283,7 @@ final class SftpSettings private (
def withKnownHosts(value: String): SftpSettings = copy(knownHosts = Option(value))
def withSftpIdentity(value: SftpIdentity): SftpSettings = copy(sftpIdentity = Option(value))
def withProxy(value: Proxy): SftpSettings = copy(proxy = Some(value))
def withMaxUnconfirmedReads(value: Int): SftpSettings = copy(maxUnconfirmedReads = value)

private def copy(
host: java.net.InetAddress = host,
Expand All @@ -289,15 +292,17 @@ final class SftpSettings private (
strictHostKeyChecking: Boolean = strictHostKeyChecking,
knownHosts: Option[String] = knownHosts,
sftpIdentity: Option[SftpIdentity] = sftpIdentity,
proxy: Option[Proxy] = proxy
proxy: Option[Proxy] = proxy,
maxUnconfirmedReads: Int = maxUnconfirmedReads
): SftpSettings = new SftpSettings(
host = host,
port = port,
credentials = credentials,
strictHostKeyChecking = strictHostKeyChecking,
knownHosts = knownHosts,
sftpIdentity = sftpIdentity,
proxy = proxy
proxy = proxy,
maxUnconfirmedReads = maxUnconfirmedReads
)

override def toString =
Expand All @@ -308,7 +313,8 @@ final class SftpSettings private (
s"strictHostKeyChecking=$strictHostKeyChecking," +
s"knownHosts=$knownHosts," +
s"sftpIdentity=$sftpIdentity," +
s"proxy=$proxy)"
s"proxy=$proxy," +
s"maxUnconfirmedReads=$maxUnconfirmedReads)"
}

/**
Expand All @@ -327,7 +333,8 @@ object SftpSettings {
strictHostKeyChecking = true,
knownHosts = None,
sftpIdentity = None,
proxy = None
proxy = None,
maxUnconfirmedReads = 1
)

/** Java API */
Expand Down
24 changes: 24 additions & 0 deletions ftp/src/test/java/docs/javadsl/SftpRetrievingExample.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright (C) 2016-2020 Lightbend Inc. <https://www.lightbend.com>
*/

package docs.javadsl;

// #retrieving-with-unconfirmed-reads

import akka.stream.IOResult;
import akka.stream.alpakka.ftp.SftpSettings;
import akka.stream.alpakka.ftp.javadsl.Sftp;
import akka.stream.javadsl.Source;
import akka.util.ByteString;

import java.util.concurrent.CompletionStage;

public class SftpRetrievingExample {

public Source<ByteString, CompletionStage<IOResult>> retrieveFromPath(
String path, SftpSettings settings) throws Exception {
return Sftp.fromPath(path, settings.withMaxUnconfirmedReads(64));
}
}
// #retrieving-with-unconfirmed-reads
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,20 @@ final class StrictHostCheckingSftpSourceSpec extends BaseSftpSpec with CommonFtp
)
}

final class UnconfirmedReadsSftpSourceSpec extends BaseSftpSpec with CommonFtpStageSpec {
override val settings = SftpSettings(
InetAddress.getByName(HOSTNAME)
).withPort(PORT)
.withCredentials(FtpCredentials.create("username", "wrong password"))
.withStrictHostKeyChecking(true)
.withKnownHosts(getKnownHostsFile.getPath)
.withSftpIdentity(
SftpIdentity
.createFileSftpIdentity(getClientPrivateKeyFile.getPath, ClientPrivateKeyPassphrase)
)
.withMaxUnconfirmedReads(8)
}

trait CommonFtpStageSpec extends BaseSpec with Eventually {

implicit val system = getSystem
Expand Down
17 changes: 16 additions & 1 deletion ftp/src/test/scala/docs/scaladsl/scalaExamples.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
*/

package docs.scaladsl
import akka.stream.alpakka.ftp.{FtpFile, FtpSettings}
import akka.stream.alpakka.ftp.{FtpFile, FtpSettings, SftpSettings}

object scalaExamples {

Expand Down Expand Up @@ -44,6 +44,21 @@ object scalaExamples {
//#retrieving
}

object retrievingUnconfirmedReads {
//#retrieving-with-unconfirmed-reads
import akka.stream.IOResult
import akka.stream.alpakka.ftp.scaladsl.Sftp
import akka.stream.scaladsl.Source
import akka.util.ByteString

import scala.concurrent.Future

def retrieveFromPath(path: String, settings: SftpSettings): Source[ByteString, Future[IOResult]] =
Sftp.fromPath(path, settings.withMaxUnconfirmedReads(64))

//#retrieving-with-unconfirmed-reads
}

object removing {
//#removing
import akka.stream.IOResult
Expand Down