MongoDb: Sink (#164) and Flow
* Add mongo sink
* Add mongo update and delete operations
* Document mongo update and delete operations
* Separate sink and flow
stephennancekivell authored and ennru committed Dec 6, 2017
1 parent 5a0d8a5 commit 52f5c06
Showing 7 changed files with 429 additions and 8 deletions.
2 changes: 1 addition & 1 deletion docs/src/main/paradox/cassandra.md
@@ -1,6 +1,6 @@
# Cassandra Connector

The Cassandra connector provides a way to provide the result of a Cassandra query as a stream of rows.
The Cassandra connector allows you to read and write to Cassandra. You can query a stream of rows from @scaladoc[CassandraSource](akka.stream.alpakka.cassandra.scaladsl.CassandraSource$) or use prepared statements to insert or update with @scaladoc[CassandraSink](akka.stream.alpakka.cassandra.scaladsl.CassandraSink$).

## Artifacts

37 changes: 36 additions & 1 deletion docs/src/main/paradox/mongodb.md
@@ -1,6 +1,8 @@
# MongoDB Connector

The MongoDB connector provides a way to provide the result of a MongoDB query as a stream of documents.
The MongoDB connector allows you to read and save documents. You can query a stream of documents from @scaladoc[MongoSource](akka.stream.alpakka.mongodb.scaladsl.MongoSource$) or update documents in a collection with @scaladoc[MongoSink](akka.stream.alpakka.mongodb.scaladsl.MongoSink$).

This connector is based on the [mongo-scala-driver](https://github.com/mongodb/mongo-scala-driver) and does not have a Java interface.

## Artifacts

@@ -38,6 +40,39 @@ Scala

Here we used a basic sink to complete the stream by collecting all of the stream elements to a collection. The power of streams comes from building larger data pipelines which leverage backpressure to ensure efficient flow control. Feel free to edit the example code and build @extref[more advanced stream topologies](akka-docs:scala/stream/stream-introduction).
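
For context, a minimal sketch of that pattern, assuming a locally running MongoDB instance and that the `MongoSource` factory wraps the driver's query `Observable` as in the Source usage section above. The `ActorSystem`, database and collection names are illustrative and not part of this commit:

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.alpakka.mongodb.scaladsl.MongoSource
import akka.stream.scaladsl.Sink
import org.mongodb.scala.{Document, MongoClient}

implicit val system = ActorSystem("MongoSourceSketch")
implicit val materializer = ActorMaterializer()

// Plain mongo-scala-driver collection handle; MongoSource wraps the query Observable.
val collection = MongoClient().getDatabase("alpakka").getCollection("numbers")

// Collect every queried document into a Seq once the stream completes.
val rows = MongoSource(collection.find()).runWith(Sink.seq)
```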

### Flow and Sink Usage

Each of these sink factory methods has a corresponding factory method in @scaladoc[MongoFlow](akka.stream.alpakka.mongodb.scaladsl.MongoFlow$) which will emit the written document or the result of the operation downstream.
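
As a hedged illustration of that correspondence (reusing the `system`, `materializer` and `collection` values from the sketch above; none of this comes from the snippet files), the flow variant of `insertOne` re-emits each written document so further stages can observe it:

```scala
import akka.stream.alpakka.mongodb.scaladsl.MongoFlow
import akka.stream.scaladsl.{Sink, Source}
import org.mongodb.scala.Document

import scala.concurrent.ExecutionContext

implicit val ec: ExecutionContext = system.dispatcher // required by the flow and sink factories

// Unlike MongoSink.insertOne, the flow emits every successfully inserted document
// downstream, so logging, metrics or acknowledgements can follow the write.
val inserted = Source
  .single(Document("value" -> 1))
  .via(MongoFlow.insertOne(2, collection)) // parallelism = 2
  .runWith(Sink.seq)
```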

#### Insert

We can use a Source of documents to save them to a MongoDB collection using @scaladoc[insertOne](akka.stream.alpakka.mongodb.scaladsl.MongoSink$#insertOne) or @scaladoc[insertMany](akka.stream.alpakka.mongodb.scaladsl.MongoSink$#insertMany).


Scala
: @@snip (../../../../mongodb/src/test/scala/akka/stream/alpakka/mongodb/MongoSinkSpec.scala) { #insertOne }
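
Since the `#insertOne` snippet itself is not shown in this diff, here is a rough, hypothetical stand-in built only from the `MongoSink.insertOne` signature added in this commit, reusing the setup values from the earlier sketches:

```scala
import akka.stream.alpakka.mongodb.scaladsl.MongoSink
import akka.stream.scaladsl.Source
import org.mongodb.scala.Document

val documents = Vector(Document("value" -> 1), Document("value" -> 2))

// Each incoming Document is written with insertOne; the materialized Future[Done]
// completes once the whole stream has been written.
val done = Source(documents).runWith(MongoSink.insertOne(2, collection)) // parallelism = 2
```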

#### Insert Many

Insert many can be used if you have a collection of documents to insert at once.

Scala
: @@snip (../../../../mongodb/src/test/scala/akka/stream/alpakka/mongodb/MongoSinkSpec.scala) { #insertMany }

#### Update

We can update documents with a Source of @scaladoc[DocumentUpdate](akka.stream.alpakka.mongodb.scaladsl.DocumentUpdate), which combines a query filter and an update definition.
Use @scaladoc[updateOne](akka.stream.alpakka.mongodb.scaladsl.MongoSink$#updateOne) or @scaladoc[updateMany](akka.stream.alpakka.mongodb.scaladsl.MongoSink$#updateMany) depending on whether the filter should target one or many documents.

Scala
: @@snip (../../../../mongodb/src/test/scala/akka/stream/alpakka/mongodb/MongoSinkSpec.scala) { #updateOne }
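
A hedged sketch of what such an update stream can look like, built from the `DocumentUpdate` and `MongoSink.updateOne` definitions added in this commit; the filter and update values are made up, and the setup values come from the sketches above:

```scala
import akka.stream.alpakka.mongodb.scaladsl.{DocumentUpdate, MongoSink}
import akka.stream.scaladsl.Source
import org.mongodb.scala.model.{Filters, Updates}

// A DocumentUpdate pairs a query filter with the update to apply to the match(es).
val update = DocumentUpdate(
  filter = Filters.equal("value", 1),
  update = Updates.set("updated", true)
)

val done = Source.single(update).runWith(MongoSink.updateOne(2, collection)) // parallelism = 2
```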

#### Delete

We can delete documents with a Source of filters. Use @scaladoc[deleteOne](akka.stream.alpakka.mongodb.scaladsl.MongoSink$#deleteOne) or @scaladoc[deleteMany](akka.stream.alpakka.mongodb.scaladsl.MongoSink$#deleteMany) depending on whether the filter should target one or many documents.

Scala
: @@snip (../../../../mongodb/src/test/scala/akka/stream/alpakka/mongodb/MongoSinkSpec.scala) { #deleteOne }
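
And a corresponding hedged sketch for deletes, again assuming the setup from the earlier sketches; the delete sinks consume plain `Bson` filters:

```scala
import akka.stream.alpakka.mongodb.scaladsl.MongoSink
import akka.stream.scaladsl.Source
import org.mongodb.scala.model.Filters

// deleteOne removes at most one document per filter; deleteMany would remove all matches.
val done = Source
  .single(Filters.equal("value", 1))
  .runWith(MongoSink.deleteOne(2, collection)) // parallelism = 2
```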

### Running the example code

The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.
16 changes: 16 additions & 0 deletions mongodb/src/main/scala/akka/stream/alpakka/mongodb/scaladsl/DocumentUpdate.scala
@@ -0,0 +1,16 @@
/*
 * Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com>
 */

package akka.stream.alpakka.mongodb.scaladsl

import org.mongodb.scala.bson.conversions.Bson

/**
 * @param filter a document describing the query filter, which may not be null. This can be of any type for which a
 *               {@code Codec} is registered
 * @param update a document describing the update, which may not be null. The update to apply must include only
 *               update operators. This can be of any type for which a {@code Codec} is registered
 */
final case class DocumentUpdate(filter: Bson, update: Bson)
116 changes: 116 additions & 0 deletions mongodb/src/main/scala/akka/stream/alpakka/mongodb/scaladsl/MongoFlow.scala
@@ -0,0 +1,116 @@
/*
 * Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com>
 */

package akka.stream.alpakka.mongodb.scaladsl

import akka.stream.scaladsl.Flow
import akka.NotUsed
import org.mongodb.scala.bson.conversions.Bson
import org.mongodb.scala.model.UpdateOptions
import org.mongodb.scala.result.{DeleteResult, UpdateResult}
import org.mongodb.scala.{Document, MongoCollection}

import scala.concurrent.ExecutionContext

object MongoFlow {

  /**
   * A [[Flow]] that will insert documents into a collection.
   * @param parallelism number of documents to insert in parallel.
   * @param collection mongo db collection to insert to.
   */
  def insertOne(parallelism: Int, collection: MongoCollection[Document])(
      implicit executionContext: ExecutionContext
  ): Flow[Document, Document, NotUsed] =
    Flow[Document]
      .mapAsync(parallelism)(doc => collection.insertOne(doc).toFuture().map(_ => doc))

  /**
   * A [[Flow]] that will insert batches of documents into a collection.
   * @param parallelism number of batches of documents to insert in parallel.
   * @param collection mongo db collection to insert to.
   */
  def insertMany(parallelism: Int, collection: MongoCollection[Document])(
      implicit executionContext: ExecutionContext
  ): Flow[Seq[Document], Seq[Document], NotUsed] =
    Flow[Seq[Document]].mapAsync(parallelism)(docs => collection.insertMany(docs).toFuture().map(_ => docs))

  /**
   * A [[Flow]] that will update documents as defined by a [[DocumentUpdate]].
   *
   * @param parallelism the number of documents to update in parallel.
   * @param collection the mongo db collection to update.
   * @param maybeUpdateOptions optional additional [[UpdateOptions]]
   */
  def updateOne(
      parallelism: Int,
      collection: MongoCollection[Document],
      maybeUpdateOptions: Option[UpdateOptions] = None
  )(implicit executionContext: ExecutionContext): Flow[DocumentUpdate, (UpdateResult, DocumentUpdate), NotUsed] =
    maybeUpdateOptions match {
      case None =>
        Flow[DocumentUpdate].mapAsync(parallelism)(
          documentUpdate =>
            collection.updateOne(documentUpdate.filter, documentUpdate.update).toFuture().map(_ -> documentUpdate)
        )
      case Some(options) =>
        Flow[DocumentUpdate].mapAsync(parallelism)(
          documentUpdate =>
            collection
              .updateOne(documentUpdate.filter, documentUpdate.update, options)
              .toFuture()
              .map(_ -> documentUpdate)
        )
    }

  /**
   * A [[Flow]] that will update many documents as defined by a [[DocumentUpdate]].
   *
   * @param parallelism the number of documents to update in parallel.
   * @param collection the mongo db collection to update.
   * @param maybeUpdateOptions optional additional [[UpdateOptions]]
   */
  def updateMany(
      parallelism: Int,
      collection: MongoCollection[Document],
      maybeUpdateOptions: Option[UpdateOptions] = None
  )(implicit executionContext: ExecutionContext): Flow[DocumentUpdate, (UpdateResult, DocumentUpdate), NotUsed] =
    maybeUpdateOptions match {
      case None =>
        Flow[DocumentUpdate].mapAsync(parallelism)(
          documentUpdate =>
            collection.updateMany(documentUpdate.filter, documentUpdate.update).toFuture().map(_ -> documentUpdate)
        )
      case Some(options) =>
        Flow[DocumentUpdate].mapAsync(parallelism)(
          documentUpdate =>
            collection
              .updateMany(documentUpdate.filter, documentUpdate.update, options)
              .toFuture()
              .map(_ -> documentUpdate)
        )
    }

  /**
   * A [[Flow]] that will delete individual documents as defined by a [[Bson]] filter query.
   *
   * @param parallelism the number of documents to delete in parallel.
   * @param collection the mongo db collection to delete from.
   */
  def deleteOne(parallelism: Int, collection: MongoCollection[Document])(
      implicit executionContext: ExecutionContext
  ): Flow[Bson, (DeleteResult, Bson), NotUsed] =
    Flow[Bson].mapAsync(parallelism)(bson => collection.deleteOne(bson).toFuture().map(_ -> bson))

  /**
   * A [[Flow]] that will delete many documents as defined by a [[Bson]] filter query.
   *
   * @param parallelism the number of documents to delete in parallel.
   * @param collection the mongo db collection to delete from.
   */
  def deleteMany(parallelism: Int, collection: MongoCollection[Document])(
      implicit executionContext: ExecutionContext
  ): Flow[Bson, (DeleteResult, Bson), NotUsed] =
    Flow[Bson].mapAsync(parallelism)(bson => collection.deleteMany(bson).toFuture().map(_ -> bson))
}
87 changes: 87 additions & 0 deletions mongodb/src/main/scala/akka/stream/alpakka/mongodb/scaladsl/MongoSink.scala
@@ -0,0 +1,87 @@
/*
 * Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com>
 */

package akka.stream.alpakka.mongodb.scaladsl

import akka.stream.scaladsl.{Keep, Sink}
import akka.Done
import org.mongodb.scala.bson.conversions.Bson
import org.mongodb.scala.model.UpdateOptions
import org.mongodb.scala.{Document, MongoCollection}

import scala.concurrent.{ExecutionContext, Future}

object MongoSink {

  /**
   * A [[Sink]] that will insert documents into a collection.
   * @param parallelism number of documents to insert in parallel.
   * @param collection mongo db collection to insert to.
   */
  def insertOne(parallelism: Int, collection: MongoCollection[Document])(
      implicit executionContext: ExecutionContext
  ): Sink[Document, Future[Done]] =
    MongoFlow.insertOne(parallelism, collection).toMat(Sink.ignore)(Keep.right)

  /**
   * A [[Sink]] that will insert batches of documents into a collection.
   * @param parallelism number of batches of documents to insert in parallel.
   * @param collection mongo db collection to insert to.
   */
  def insertMany(parallelism: Int, collection: MongoCollection[Document])(
      implicit executionContext: ExecutionContext
  ): Sink[Seq[Document], Future[Done]] =
    MongoFlow.insertMany(parallelism, collection).toMat(Sink.ignore)(Keep.right)

  /**
   * A [[Sink]] that will update documents as defined by a [[DocumentUpdate]].
   *
   * @param parallelism the number of documents to update in parallel.
   * @param collection the mongo db collection to update.
   * @param maybeUpdateOptions optional additional [[UpdateOptions]]
   */
  def updateOne(
      parallelism: Int,
      collection: MongoCollection[Document],
      maybeUpdateOptions: Option[UpdateOptions] = None
  )(implicit executionContext: ExecutionContext): Sink[DocumentUpdate, Future[Done]] =
    MongoFlow.updateOne(parallelism, collection, maybeUpdateOptions).toMat(Sink.ignore)(Keep.right)

  /**
   * A [[Sink]] that will update many documents as defined by a [[DocumentUpdate]].
   *
   * @param parallelism the number of documents to update in parallel.
   * @param collection the mongo db collection to update.
   * @param maybeUpdateOptions optional additional [[UpdateOptions]]
   */
  def updateMany(
      parallelism: Int,
      collection: MongoCollection[Document],
      maybeUpdateOptions: Option[UpdateOptions] = None
  )(implicit executionContext: ExecutionContext): Sink[DocumentUpdate, Future[Done]] =
    MongoFlow.updateMany(parallelism, collection, maybeUpdateOptions).toMat(Sink.ignore)(Keep.right)

  /**
   * A [[Sink]] that will delete individual documents as defined by a [[Bson]] filter query.
   *
   * @param parallelism the number of documents to delete in parallel.
   * @param collection the mongo db collection to delete from.
   */
  def deleteOne(parallelism: Int, collection: MongoCollection[Document])(
      implicit executionContext: ExecutionContext
  ): Sink[Bson, Future[Done]] =
    MongoFlow.deleteOne(parallelism, collection).toMat(Sink.ignore)(Keep.right)

  /**
   * A [[Sink]] that will delete many documents as defined by a [[Bson]] filter query.
   *
   * @param parallelism the number of documents to delete in parallel.
   * @param collection the mongo db collection to delete from.
   */
  def deleteMany(parallelism: Int, collection: MongoCollection[Document])(
      implicit executionContext: ExecutionContext
  ): Sink[Bson, Future[Done]] =
    MongoFlow.deleteMany(parallelism, collection).toMat(Sink.ignore)(Keep.right)

}