From 977218e10c992f88ac34e49b986d7e37eb00163b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lech=20G=C5=82owiak?= Date: Mon, 1 Jul 2019 10:48:22 +0100 Subject: [PATCH] Google Cloud Pub/sub: 'messageId' and 'publishTime' should not be published (#1795) * #1786 - 'messageId' and 'publishTime' should not be published nor required when creating PubSubMessage for publishing. --- .../googlecloud/pubsub/impl/PubSubApi.scala | 19 ++++++- .../alpakka/googlecloud/pubsub/model.scala | 12 +++- .../java/docs/javadsl/ExampleUsageJava.java | 3 +- .../googlecloud/pubsub/GooglePubSubSpec.scala | 4 +- .../pubsub/impl/PubSubApiSpec.scala | 56 +++++++++++++++---- .../scala/docs/scaladsl/ExampleUsage.scala | 2 +- 6 files changed, 76 insertions(+), 20 deletions(-) diff --git a/google-cloud-pub-sub/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/impl/PubSubApi.scala b/google-cloud-pub-sub/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/impl/PubSubApi.scala index 135cead288..d49864a2f9 100644 --- a/google-cloud-pub-sub/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/impl/PubSubApi.scala +++ b/google-cloud-pub-sub/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/impl/PubSubApi.scala @@ -17,7 +17,7 @@ import akka.http.scaladsl.unmarshalling.Unmarshal import akka.stream.Materializer import akka.stream.alpakka.googlecloud.pubsub._ import spray.json.DefaultJsonProtocol._ -import spray.json.{deserializationError, DefaultJsonProtocol, JsString, JsValue, RootJsonFormat} +import spray.json.{deserializationError, DefaultJsonProtocol, JsObject, JsString, JsValue, JsonFormat, RootJsonFormat} import scala.collection.immutable import scala.concurrent.Future @@ -57,7 +57,22 @@ private[pubsub] trait PubSubApi { } override def write(instant: Instant): JsValue = JsString(instant.toString) } - private implicit val pubSubMessageFormat = DefaultJsonProtocol.jsonFormat4(PubSubMessage.apply) + + private implicit val pubSubMessageFormat = { + val defaultFormat = DefaultJsonProtocol.jsonFormat4(PubSubMessage.apply) + val attributesFormat = implicitly[JsonFormat[Option[immutable.Map[String, String]]]] + new RootJsonFormat[PubSubMessage] { + def read(json: JsValue): PubSubMessage = defaultFormat.read(json) + //Do not publish "messageId" nor "publishTime" + def write(m: PubSubMessage): JsValue = { + val fields = List("data" -> JsString(m.data)) ++ m.attributes.map( + attrs => "attributes" -> attributesFormat.write(m.attributes) + ) + JsObject(fields: _*) + } + + } + } private implicit val pubSubRequestFormat = DefaultJsonProtocol.jsonFormat1(PublishRequest.apply) private implicit val gcePubSubResponseFormat = DefaultJsonProtocol.jsonFormat1(PublishResponse) private implicit val receivedMessageFormat = DefaultJsonProtocol.jsonFormat2(ReceivedMessage) diff --git a/google-cloud-pub-sub/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/model.scala b/google-cloud-pub-sub/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/model.scala index ec4ef81061..8bd9efd353 100644 --- a/google-cloud-pub-sub/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/model.scala +++ b/google-cloud-pub-sub/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/model.scala @@ -80,11 +80,12 @@ object PubSubConfig { /** * * @param data the base64 encoded data - * @param messageId the message id + * @param messageId the message id given by server. It must not be populated when publishing. * @param attributes optional extra attributes for this message. * @param publishTime the time the message was published. It must not be populated when publishing. */ final case class PubSubMessage(data: String, + //Should be Option[String]. '""' is used as default when creating messages for publishing. messageId: String, attributes: Option[immutable.Map[String, String]] = None, publishTime: Option[Instant] = None) { @@ -99,9 +100,18 @@ final case class PubSubMessage(data: String, object PubSubMessage { + def apply(data: String): PubSubMessage = PubSubMessage(data, "") + + def apply(data: String, attributes: immutable.Map[String, String]): PubSubMessage = + PubSubMessage(data, "", Some(attributes), None) + /** * Java API: create [[PubSubMessage]] */ + def create(data: String) = + PubSubMessage(data) + + @deprecated("Setting messageId when creating message for publishing is futile.", "1.1.0") def create(data: String, messageId: String) = PubSubMessage(data, messageId) } diff --git a/google-cloud-pub-sub/src/test/java/docs/javadsl/ExampleUsageJava.java b/google-cloud-pub-sub/src/test/java/docs/javadsl/ExampleUsageJava.java index 0e9f058583..6288d5bdd6 100644 --- a/google-cloud-pub-sub/src/test/java/docs/javadsl/ExampleUsageJava.java +++ b/google-cloud-pub-sub/src/test/java/docs/javadsl/ExampleUsageJava.java @@ -55,8 +55,7 @@ private static void example() throws NoSuchAlgorithmException, InvalidKeySpecExc // #publish-single PubSubMessage publishMessage = - PubSubMessage.create( - "1", new String(Base64.getEncoder().encode("Hello Google!".getBytes()))); + PubSubMessage.create(new String(Base64.getEncoder().encode("Hello Google!".getBytes()))); PublishRequest publishRequest = PublishRequest.of(Lists.newArrayList(publishMessage)); Source source = Source.single(publishRequest); diff --git a/google-cloud-pub-sub/src/test/scala/akka/stream/alpakka/googlecloud/pubsub/GooglePubSubSpec.scala b/google-cloud-pub-sub/src/test/scala/akka/stream/alpakka/googlecloud/pubsub/GooglePubSubSpec.scala index aca5eb2164..e6afa20582 100644 --- a/google-cloud-pub-sub/src/test/scala/akka/stream/alpakka/googlecloud/pubsub/GooglePubSubSpec.scala +++ b/google-cloud-pub-sub/src/test/scala/akka/stream/alpakka/googlecloud/pubsub/GooglePubSubSpec.scala @@ -47,7 +47,7 @@ class GooglePubSubSpec extends FlatSpec with MockitoSugar with ScalaFutures with config = config ) - val request = PublishRequest(Seq(PubSubMessage(messageId = "1", data = base64String("Hello Google!")))) + val request = PublishRequest(Seq(PubSubMessage(data = base64String("Hello Google!")))) val source = Source(List(request)) @@ -87,7 +87,7 @@ class GooglePubSubSpec extends FlatSpec with MockitoSugar with ScalaFutures with config = config ) - val request = PublishRequest(Seq(PubSubMessage(messageId = "2", data = base64String("Hello Google!")))) + val request = PublishRequest(Seq(PubSubMessage(data = base64String("Hello Google!")))) val source = Source(List(request)) val result = source.via(flow).runWith(Sink.seq) diff --git a/google-cloud-pub-sub/src/test/scala/akka/stream/alpakka/googlecloud/pubsub/impl/PubSubApiSpec.scala b/google-cloud-pub-sub/src/test/scala/akka/stream/alpakka/googlecloud/pubsub/impl/PubSubApiSpec.scala index 30a329116c..0261ca2985 100644 --- a/google-cloud-pub-sub/src/test/scala/akka/stream/alpakka/googlecloud/pubsub/impl/PubSubApiSpec.scala +++ b/google-cloud-pub-sub/src/test/scala/akka/stream/alpakka/googlecloud/pubsub/impl/PubSubApiSpec.scala @@ -52,15 +52,49 @@ class PubSubApiSpec extends FlatSpec with BeforeAndAfterAll with ScalaFutures wi val publishMessage = PubSubMessage( - messageId = "1", data = new String(Base64.getEncoder.encode("Hello Google!".getBytes)), + attributes = Map("row_id" -> "7") + ) + val publishRequest = PublishRequest(Seq(publishMessage)) + + val expectedPublishRequest = + """{"messages":[{"data":"SGVsbG8gR29vZ2xlIQ==","attributes":{"row_id":"7"}}]}""" + val publishResponse = """{"messageIds":["1"]}""" + + mock.register( + WireMock + .post( + urlEqualTo(s"/v1/projects/${config.projectId}/topics/topic1:publish") + ) + .withRequestBody(WireMock.equalToJson(expectedPublishRequest)) + .withHeader("Authorization", WireMock.equalTo("Bearer " + accessToken)) + .willReturn( + aResponse() + .withStatus(200) + .withBody(publishResponse) + .withHeader("Content-Type", "application/json") + ) + ) + + val result = + TestHttpApi.publish(config.projectId, "topic1", Some(accessToken), publishRequest) + + result.futureValue shouldBe Seq("1") + } + + it should "not send 'messageId' and 'publishTime' when publishing" in { + + val publishMessage = + PubSubMessage( + data = new String(Base64.getEncoder.encode("Hello Google!".getBytes)), + messageId = "my-id", attributes = Some(Map("row_id" -> "7")), publishTime = Some(Instant.parse("2014-10-02T15:01:23.045123456Z")) ) val publishRequest = PublishRequest(Seq(publishMessage)) val expectedPublishRequest = - """{"messages":[{"data":"SGVsbG8gR29vZ2xlIQ==","messageId":"1","attributes":{"row_id":"7"},"publishTime":"2014-10-02T15:01:23.045123456Z"}]}""" + """{"messages":[{"data":"SGVsbG8gR29vZ2xlIQ==","attributes":{"row_id":"7"}}]}""" val publishResponse = """{"messageIds":["1"]}""" mock.register( @@ -93,11 +127,11 @@ class PubSubApiSpec extends FlatSpec with BeforeAndAfterAll with ScalaFutures wi } val publishMessage = - PubSubMessage(messageId = "1", data = new String(Base64.getEncoder.encode("Hello Google!".getBytes))) + PubSubMessage(data = new String(Base64.getEncoder.encode("Hello Google!".getBytes))) val publishRequest = PublishRequest(Seq(publishMessage)) val expectedPublishRequest = - """{"messages":[{"data":"SGVsbG8gR29vZ2xlIQ==","messageId":"1"}]}""" + """{"messages":[{"data":"SGVsbG8gR29vZ2xlIQ=="}]}""" val publishResponse = """{"messageIds":["1"]}""" mock.register( @@ -123,7 +157,7 @@ class PubSubApiSpec extends FlatSpec with BeforeAndAfterAll with ScalaFutures wi it should "Pull with results" in { - val publishMessage = + val message = PubSubMessage(messageId = "1", data = new String(Base64.getEncoder.encode("Hello Google!".getBytes))) val pullResponse = @@ -144,7 +178,7 @@ class PubSubApiSpec extends FlatSpec with BeforeAndAfterAll with ScalaFutures wi ) val result = TestHttpApi.pull(config.projectId, "sub1", Some(accessToken), true, 1000) - result.futureValue shouldBe PullResponse(Some(Seq(ReceivedMessage("ack1", publishMessage)))) + result.futureValue shouldBe PullResponse(Some(Seq(ReceivedMessage("ack1", message)))) } @@ -155,7 +189,7 @@ class PubSubApiSpec extends FlatSpec with BeforeAndAfterAll with ScalaFutures wi val GoogleApisHost = s"http://localhost:${wiremockServer.port()}" } - val publishMessage = + val message = PubSubMessage(messageId = "1", data = new String(Base64.getEncoder.encode("Hello Google!".getBytes))) val pullResponse = @@ -176,7 +210,7 @@ class PubSubApiSpec extends FlatSpec with BeforeAndAfterAll with ScalaFutures wi ) val result = TestEmulatorHttpApi.pull(config.projectId, "sub1", None, true, 1000) - result.futureValue shouldBe PullResponse(Some(Seq(ReceivedMessage("ack1", publishMessage)))) + result.futureValue shouldBe PullResponse(Some(Seq(ReceivedMessage("ack1", message)))) } @@ -227,16 +261,14 @@ class PubSubApiSpec extends FlatSpec with BeforeAndAfterAll with ScalaFutures wi it should "return exception with the meaningful error message in case of not successful publish response" in { val publishMessage = PubSubMessage( - messageId = "1", data = new String(Base64.getEncoder.encode("Hello Google!".getBytes)), - attributes = Some(Map("row_id" -> "7")), - publishTime = Some(Instant.parse("2014-10-02T15:01:23.045123456Z")) + attributes = Map("row_id" -> "7") ) val publishRequest = PublishRequest(Seq(publishMessage)) val expectedPublishRequest = - """{"messages":[{"data":"SGVsbG8gR29vZ2xlIQ==","messageId":"1","attributes":{"row_id":"7"},"publishTime":"2014-10-02T15:01:23.045123456Z"}]}""" + """{"messages":[{"data":"SGVsbG8gR29vZ2xlIQ==","attributes":{"row_id":"7"}}]}""" mock.register( WireMock diff --git a/google-cloud-pub-sub/src/test/scala/docs/scaladsl/ExampleUsage.scala b/google-cloud-pub-sub/src/test/scala/docs/scaladsl/ExampleUsage.scala index 491edbff83..e2dcaceef2 100644 --- a/google-cloud-pub-sub/src/test/scala/docs/scaladsl/ExampleUsage.scala +++ b/google-cloud-pub-sub/src/test/scala/docs/scaladsl/ExampleUsage.scala @@ -47,7 +47,7 @@ class ExampleUsage { //#publish-single val publishMessage = - PubSubMessage(messageId = "1", data = new String(Base64.getEncoder.encode("Hello Google!".getBytes))) + PubSubMessage(new String(Base64.getEncoder.encode("Hello Google!".getBytes))) val publishRequest = PublishRequest(Seq(publishMessage)) val source: Source[PublishRequest, NotUsed] = Source.single(publishRequest)