HDFS: Add sources and flows #965
Conversation
Force-pushed from 60fe76e to 3bbd57c
I had a first look. Very good stuff.
How should we think about the HDFS version? Would it work with 2.x?
 * Copyright (C) 2016-2018 Lightbend Inc. <http://www.lightbend.com>
 */

package akka.stream.alpakka.hdfs
It would be great to move internal stuff into an impl package. That would improve Java module readiness.
 * @param compressionCodec a class that encapsulates a streaming compression/decompression pair.
 * @param settings Hdfs writing settings
 */
def compressed(
To reduce the API, you could just have HdfsSink.data and let the users connect to Sink.ignore for the other cases.
Sorry, I could not understand. HdfsSink.data, HdfsSink.compressed, and HdfsSink.sequence are completely different. What exactly do you mean by letting users connect to Sink.ignore?
I think users needing these cases might as well use the HdfsFlow and connect it to Sink.ignore themselves.
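For illustration, a minimal sketch of that idea; the hdfsWriteFlow value below is only a stand-in for the write flow in this PR, not the actual connector API:

```scala
import akka.NotUsed
import akka.stream.scaladsl.{Flow, Sink}
import akka.util.ByteString

// Stand-in for the HDFS write flow; in the real connector this would come from HdfsFlow.
val hdfsWriteFlow: Flow[ByteString, String, NotUsed] =
  Flow[ByteString].map(_ => "rotation-event")

// A "sink" variant is then simply the write flow connected to Sink.ignore.
val hdfsWriteSink: Sink[ByteString, NotUsed] = hdfsWriteFlow.to(Sink.ignore)
```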
Oh, then can we remove the Sink implementations?
Yes, just keep the most basic ones. Most users discover they'll need to do something after sending/writing to the destination.
Speaking of that, what we most often need is some kind of pass-through: a value untouched by the flow, but available afterwards (e.g. Kafka offsets for committing). Have you thought about that? It would require a wrapper for Writable, AFAICS.
Actually, I was thinking about a pass-through like the Solr connector has. I was just not sure how to implement it. It will be added soon.
I would like to consult about it; I have been thinking about the best strategy for it. Currently, the flow expects ByteString as input. Whenever it rotates the output, it pushes a WriteLog downstream.
If we want to implement pass-through, how shall we design it? The dumbest way is
final case class WriteLog[T](path: String, rotation: Int, passThroughs: Seq[T])
However, if we have millions of inputs, keeping this sequence in memory until the flow rotates is super inefficient.
The second idea is
sealed trait OutgoingMessage
final case class RotationMessage(path: String, rotation: Int) extends OutgoingMessage
final case class PassThrough[T](pass: T) extends OutgoingMessage
So the flow can push a RotationMessage when it rotates, and for the rest it will push PassThrough. However, this has a drawback. Take the Kafka example: we write the input to the output and send a PassThrough message with an offset. If something goes wrong in the flow and it has not yet synced the output, we will basically fail, but the downstream may already have committed that offset.
I did not really like this idea either. Do you have any ideas?
project/Dependencies.scala (outdated)
Seq(
  libraryDependencies ++= Seq(
    "org.apache.hadoop" % "hadoop-client" % hadoopVersion, // ApacheV2
    "org.typelevel" %% "cats-core" % catsVersion, // MIT,
You might upgrade to Cats 1.1.0.
I am not sure about the HDFS version. How can we provide support for 2.x as well? Is there any example of it in the Alpakka connectors?
No, there is no example right now. One way of doing it would be to make the HDFS dependency optional and let users add their version explicitly.
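A hedged sketch of what that could look like in project/Dependencies.scala; the Provided scope, the version, and the surrounding structure are assumptions for illustration, not the actual change:

```scala
import sbt._
import sbt.Keys._

object Dependencies {
  val hadoopVersion = "3.1.0" // illustrative default

  // Marking hadoop-client as Provided keeps it off the user's transitive classpath,
  // so users must declare the Hadoop version they actually run against.
  val Hdfs = Seq(
    libraryDependencies ++= Seq(
      "org.apache.hadoop" % "hadoop-client" % hadoopVersion % Provided // ApacheV2
    )
  )
}
```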
Force-pushed from 3515734 to b687ad5
I have rearranged the project structure with an impl package. Can you also check commit b687ad5? I have realized that we need to use ...
The build error is an extra comma, which is not supported in Scala 2.11, in ...
We build on Scala 2.11 and 2.12. Since 2.12 you're allowed to have a superfluous trailing comma, and that happens to be in there in ...
Yes, my fault, a late-night commit :)
I had a comment in the outdated discussion, so I am posting it again here as a new discussion. I would like to consult about it; I have been thinking about the best strategy for it. Currently, the flow expects ByteString as input. Whenever it rotates the output, it pushes a WriteLog downstream. If we want to implement pass-through, how shall we design it? The dumbest way is

final case class WriteLog[T](path: String, rotation: Int, passThroughs: Seq[T])

However, if we have millions of inputs, keeping this sequence in memory until the flow rotates is super inefficient. The second idea is

sealed trait OutgoingMessage
final case class RotationMessage(path: String, rotation: Int) extends OutgoingMessage
final case class PassThrough[T](pass: T) extends OutgoingMessage

So the flow can push a RotationMessage when it rotates, and push PassThrough for the rest. However, this has the drawback described above (the Kafka offset-commit example). I did not really like this idea either. Do you have any ideas?
Java tests added, and some API simplified for Java usage.
Emitting a message for every incoming element is the only reasonable way.

case class HdfsWritten[T](passThrough: T, status: Option[RotationMessage])
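A minimal sketch of how such a per-element message could be consumed downstream, e.g. committing Kafka offsets only after the element has been written; HdfsWritten, the Long offset, and commitOffset are illustrative assumptions, not the final API:

```scala
import akka.NotUsed
import akka.stream.scaladsl.{Flow, Sink}

final case class RotationMessage(path: String, rotation: Int)
final case class HdfsWritten[T](passThrough: T, status: Option[RotationMessage])

// Placeholder for a real Kafka commit of the given offset.
def commitOffset(offset: Long): Unit = ()

// The downstream acts on the pass-through (the offset) for every written element,
// regardless of whether that element also triggered a rotation.
val commitSink: Sink[HdfsWritten[Long], NotUsed] =
  Flow[HdfsWritten[Long]]
    .map(written => commitOffset(written.passThrough))
    .to(Sink.ignore)
```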
@ennru did you have time to have a first look at the pass-through?
Here is an update for different versions. I have tested from 2.x to 3.x, and the tests passed successfully. Moreover, I have published the library locally and overrode the Hadoop version with 2.6, because we use Hadoop 2.6. Data ingestion was smooth. It looks like we do not have any problem with different versions. I added text to the documentation that mentions the default version and the way to override it.
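For example, the override could look roughly like this in a user's build.sbt; the artifact coordinates and version numbers below are illustrative, not taken from this PR:

```scala
// Illustrative build.sbt snippet: depend on the connector but pin hadoop-client
// to the Hadoop 2.x version running on the cluster.
libraryDependencies ++= Seq(
  "com.lightbend.akka" %% "akka-stream-alpakka-hdfs" % "1.0.0",
  "org.apache.hadoop"   % "hadoop-client"            % "2.6.5"
)
```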
This looks great!
You asked earlier about it: classes that belong to the API may not be hidden in the impl package.
I wonder if making the rotation strategy extendable would be important; some might want to implement a combined time/size rotation strategy.
import scala.concurrent.duration.FiniteDuration

sealed trait RotationStrategy extends Strategy {
At least the RotationStrategy interface should be in the public part of the module, as it is part of the API. It might make sense to have it extendable.
I have added Sources and made RotationStrategy part of the public API.
I'm really impressed, this is great work.
docs/src/main/paradox/hdfs.md (outdated)

### Compressed Data Writer

First, create `CompressionCodec`.
By adding

"javadoc.org.apache.hadoop.base_url" -> s"https://hadoop.apache.org/docs/r${hadoopVersion}/api/",

in build.sbt you can create links to Hadoop's API via @javadoc.
docs/src/main/paradox/hdfs.md (outdated)

`FilePathGenerator` provides functionality to generate rotation paths in HDFS.
@scala[@scaladoc[FilePathGenerator](akka.stream.alpakka.hdfs.FilePathGenerator$).]
@java[@scaladoc[FilePathGenerator](akka.stream.alpakka.hdfs.FilePathGenerator$).]
No need to use @scala/@java when linking to the same class.
/**
 * Internal API
 */
private[hdfs] final class HdfsFlowStage[W, I, C](
Please also add @akka.annotation.InternalApi, as private[hdfs] doesn't prevent Java users from using it.
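For reference, a sketch of what the suggested annotation could look like on the class above (the constructor parameters and the stage body are elided here):

```scala
import akka.annotation.InternalApi

/**
 * Internal API
 */
@InternalApi
private[hdfs] final class HdfsFlowStage[W, I, C] /* (...) extends GraphStage(...) */
```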
final case class IncomingMessage[T, C](source: T, passThrough: C)

object IncomingMessage {
  // Apply method to use when not using passThrough
Make these doc comments, please.
  HdfsWritingSettings()
}

final case class IncomingMessage[T, C](source: T, passThrough: C)
I'm not super happy with the IncomingMessage name used in several connectors. It can easily become messy when using several connectors in the same codebase. And "incoming" is tied to the point of view of the stage; for the user the data leaves... Maybe HdfsWriteMessage or HdfsData?
 * @param rotationStrategy rotation strategy
 * @param settings hdfs writing settings
 */
def dataWithPassThrough[C](
Would P make an easier type parameter name?
sealed abstract class OutgoingMessage[+T]
final case class RotationMessage(path: String, rotation: Int) extends OutgoingMessage[Nothing]
final case class WrittenMessage[T](passThrough: T, inRotation: Int) extends OutgoingMessage[T]
Document the inRotation value.
private[writer] object HdfsWriter {

  val NewLineByteArray: Array[Byte] = ByteString(System.getProperty("line.separator")).toArray
Is it useful to use the system's separator? It could be provided via the settings instead.
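A possible sketch, not the merged API, of carrying the separator in the writing settings rather than reading the system property inside the writer; the field names here are illustrative:

```scala
import akka.util.ByteString

// Hypothetical settings: the separator defaults to the platform value but can be overridden.
final case class HdfsWritingSettings(
    newLine: Boolean = false,
    lineSeparator: String = System.getProperty("line.separator")
) {
  // The writer would derive the byte array from the configured separator.
  val newLineByteArray: Array[Byte] = ByteString(lineSeparator).toArray
}
```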
override def preStart(): Unit = {
  // Schedule timer to rotate output file
  initialRotationStrategy match {
This should be expressed by something in RotationStrategy instead of the type, so it becomes extendable.
Do you have any hint about this? How can I have a strategy in RotationStrategy for preStart? I would consider passing the stage logic as a parameter and calling schedule there, but these methods are not visible. How can I trigger schedule in RotationStrategy?
You could add an interval: Option[FiniteDuration] to it and use foreach in preStart to schedule the poll.
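A minimal sketch of that suggestion, assuming the rotation strategy exposes an optional interval that the stage schedules in preStart; the names below are illustrative, not the code in this PR:

```scala
import scala.concurrent.duration._

trait RotationStrategy {
  // None for size/count based strategies, Some(interval) for time based rotation.
  def interval: Option[FiniteDuration]
}

// Inside the stage logic (a TimerGraphStageLogic), preStart could then be:
//   override def preStart(): Unit =
//     rotationStrategy.interval.foreach(d => schedulePeriodically("rotation", d))
```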
I did not like the idea of having an optional field in the rotation strategy and logic around it. I shared the scheduling API with the implementation package and use it in a new preStart method of RotationStrategy. Please check e6135a7, and if you do not like it, I will figure out something else.
 * Java API: creates a Flow with [[akka.stream.alpakka.hdfs.impl.HdfsFlowStage]]
 * for [[org.apache.hadoop.fs.FSDataOutputStream]]
 *
 * @param fs file system
Hadoop file system
Looking very good! The only larger change I would like to see is to add the IODispatcher attributes where necessary.
private static MiniDFSCluster hdfsCluster = null;
private static ActorSystem system;
private static ActorMaterializer materializer;
private static String destionation = JavaTestUtils.destination();
s/destionation/destination
class HdfsReaderSpec extends WordSpecLike with Matchers with BeforeAndAfterAll with BeforeAndAfterEach {

  private var hdfsCluster: MiniDFSCluster = _
  private val destionation = "/tmp/alpakka/"
s/destionation/destination
private static MiniDFSCluster hdfsCluster = null;
private static ActorSystem system;
private static ActorMaterializer materializer;
private static String destionation = JavaTestUtils.destination();
s/destionation/destination
class HdfsWriterSpec extends WordSpecLike with Matchers with BeforeAndAfterAll with BeforeAndAfterEach {

  private var hdfsCluster: MiniDFSCluster = _
  private val destionation = "/tmp/alpakka/"
s/destionation/destination
private val out = Outlet[OutgoingMessage[C]](Logging.simpleName(this) + ".out")

override val shape: FlowShape[HdfsWriteMessage[I, C], OutgoingMessage[C]] = FlowShape(in, out)
As the underlying writing abstraction is java.io.OutputStream, which is blocking, we have to signal to the Akka Streams materializer that it should materialize HdfsFlowStage on a separate dispatcher. This allows other parts of the stream to continue unimpacted while this stage blocks a thread during write operations.
Therefore add the following here:

override def initialAttributes: Attributes =
  super.initialAttributes and ActorAttributes.IODispatcher
Thank you for this comment. While I was running it, I was also profiling the app. I guess this is a nice improvement.
  }
  .takeWhile(_._1)
  .map(_._2)
Source.fromIterator(() => it)
I assume that SequenceFile.Reader's next operation is blocking as well. We will then have to put this source on a separate dispatcher too: .addAttributes(Attributes(ActorAttributes.IODispatcher)).
    codec: CompressionCodec,
    chunkSize: Int = 8192
): Source[ByteString, Future[IOResult]] =
  StreamConverters.fromInputStream(() => codec.createInputStream(fs.open(path)), chunkSize)
A source created by fromInputStream already runs on the IO dispatcher, therefore it is fine here.
Source.fromIterator(() => it)
Source
  .fromIterator(() => it)
  .addAttributes(Attributes(IODispatcher))
Use ActorAttributes.IODispatcher here as well, since DefaultAttributes.IODispatcher is in the impl package.
Thank you for the ping. Looking very good. Just one last nitpick and we are good to merge.
Done
Awesome work @burakkose!
Ref: #557
In this pull request, you will find the initial work for HDFS. The PR is a work in progress; there are still some TODOs.
While I am working on these, please review the code, suggest new functionality, and help with testing.