diff --git a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoAbstractConfig.java b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoAbstractConfig.java new file mode 100644 index 0000000000000..1c9786ea3d782 --- /dev/null +++ b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoAbstractConfig.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.mongodb; + +import static com.google.common.base.Preconditions.checkArgument; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.io.Serializable; +import lombok.Data; +import lombok.experimental.Accessors; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.io.core.annotations.FieldDoc; + +/** + * Configuration object for all MongoDB components. 
+ */ +@Data +@Accessors(chain = true) +public abstract class MongoAbstractConfig implements Serializable { + + private static final long serialVersionUID = -3830568531897300005L; + + public static final int DEFAULT_BATCH_SIZE = 100; + + public static final long DEFAULT_BATCH_TIME_MS = 1000; + + @FieldDoc( + required = true, + defaultValue = "", + help = "The URI of MongoDB that the connector connects to " + + "(see: https://docs.mongodb.com/manual/reference/connection-string/)" + ) + private final String mongoUri; + + @FieldDoc( + defaultValue = "", + help = "The database name to which the collection belongs " + + "and which must be watched for the source connector " + + "(required for the sink connector)" + ) + private final String database; + + @FieldDoc( + defaultValue = "", + help = "The collection name where the messages are written " + + "or which is watched for the source connector " + + "(required for the sink connector)" + ) + private final String collection; + + @FieldDoc( + defaultValue = "" + DEFAULT_BATCH_SIZE, + help = "The batch size of write to or read from the database" + ) + private final int batchSize; + + @FieldDoc( + defaultValue = "" + DEFAULT_BATCH_TIME_MS, + help = "The batch operation interval in milliseconds") + private final long batchTimeMs; + + public MongoAbstractConfig() { + this(null, null, null, DEFAULT_BATCH_SIZE, DEFAULT_BATCH_TIME_MS); + } + + @JsonCreator + public MongoAbstractConfig( + @JsonProperty("mongoUri") String mongoUri, + @JsonProperty("database") String database, + @JsonProperty("collection") String collection, + @JsonProperty("batchSize") int batchSize, + @JsonProperty("batchTimeMs") long batchTimeMs + ) { + this.mongoUri = mongoUri; + this.database = database; + this.collection = collection; + this.batchSize = batchSize; + this.batchTimeMs = batchTimeMs; + } + + public void validate() { + checkArgument(!StringUtils.isEmpty(getMongoUri()), "Required MongoDB URI is not set."); + checkArgument(getBatchSize() > 0, 
"batchSize must be a positive integer."); + checkArgument(getBatchTimeMs() > 0, "batchTimeMs must be a positive long."); + } +} diff --git a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoConfig.java b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoConfig.java deleted file mode 100644 index 2d5c8c62f3887..0000000000000 --- a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoConfig.java +++ /dev/null @@ -1,105 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.io.mongodb; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; -import com.google.common.base.Preconditions; -import java.io.File; -import java.io.IOException; -import java.io.Serializable; -import java.util.Map; -import lombok.Data; -import lombok.experimental.Accessors; -import org.apache.commons.lang3.StringUtils; -import org.apache.pulsar.io.core.annotations.FieldDoc; - -/** - * Configuration class for the MongoDB Connectors. 
- */ -@Data -@Accessors(chain = true) -public class MongoConfig implements Serializable { - - private static final long serialVersionUID = 1L; - - public static final int DEFAULT_BATCH_SIZE = 100; - - public static final long DEFAULT_BATCH_TIME_MS = 1000; - - @FieldDoc( - required = true, - defaultValue = "", - help = "The uri of mongodb that the connector connects to" - + " (see: https://docs.mongodb.com/manual/reference/connection-string/)" - ) - private String mongoUri; - - @FieldDoc( - defaultValue = "", - help = "The database name to which the collection belongs and which must be watched for the source connector" - + " (required for the sink connector)" - ) - private String database; - - @FieldDoc( - defaultValue = "", - help = "The collection name where the messages are written or which is watched for the source connector" - + " (required for the sink connector)" - ) - private String collection; - - @FieldDoc( - defaultValue = "" + DEFAULT_BATCH_SIZE, - help = "The batch size of write to or read from the database" - ) - private int batchSize = DEFAULT_BATCH_SIZE; - - @FieldDoc( - defaultValue = "" + DEFAULT_BATCH_TIME_MS, - help = "The batch operation interval in milliseconds") - private long batchTimeMs = DEFAULT_BATCH_TIME_MS; - - - public static MongoConfig load(String yamlFile) throws IOException { - final ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); - final MongoConfig cfg = mapper.readValue(new File(yamlFile), MongoConfig.class); - - return cfg; - } - - public static MongoConfig load(Map map) throws IOException { - final ObjectMapper mapper = new ObjectMapper(); - final MongoConfig cfg = mapper.readValue(new ObjectMapper().writeValueAsString(map), MongoConfig.class); - - return cfg; - } - - public void validate(boolean dbRequired, boolean collectionRequired) { - if (StringUtils.isEmpty(getMongoUri()) - || (dbRequired && StringUtils.isEmpty(getDatabase())) - || (collectionRequired && StringUtils.isEmpty(getCollection()))) { - - throw new 
IllegalArgumentException("Required property not set."); - } - - Preconditions.checkArgument(getBatchSize() > 0, "batchSize must be a positive integer."); - Preconditions.checkArgument(getBatchTimeMs() > 0, "batchTimeMs must be a positive long."); - } -} diff --git a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java index 18c630952d6d4..4077442e8af95 100644 --- a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java +++ b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java @@ -57,12 +57,12 @@ name = "mongo", type = IOType.SINK, help = "A sink connector that sends pulsar messages to mongodb", - configClass = MongoConfig.class + configClass = MongoSinkConfig.class ) @Slf4j public class MongoSink implements Sink { - private MongoConfig mongoConfig; + private MongoSinkConfig mongoSinkConfig; private MongoClient mongoClient; @@ -86,22 +86,22 @@ public MongoSink(Supplier clientProvider) { public void open(Map config, SinkContext sinkContext) throws Exception { log.info("Open MongoDB Sink"); - mongoConfig = MongoConfig.load(config); - mongoConfig.validate(true, true); + mongoSinkConfig = MongoSinkConfig.load(config); + mongoSinkConfig.validate(); if (clientProvider != null) { mongoClient = clientProvider.get(); } else { - mongoClient = MongoClients.create(mongoConfig.getMongoUri()); + mongoClient = MongoClients.create(mongoSinkConfig.getMongoUri()); } - final MongoDatabase db = mongoClient.getDatabase(mongoConfig.getDatabase()); - collection = db.getCollection(mongoConfig.getCollection()); + final MongoDatabase db = mongoClient.getDatabase(mongoSinkConfig.getDatabase()); + collection = db.getCollection(mongoSinkConfig.getCollection()); incomingList = Lists.newArrayList(); flushExecutor = Executors.newScheduledThreadPool(1); flushExecutor.scheduleAtFixedRate(() -> flush(), - mongoConfig.getBatchTimeMs(), mongoConfig.getBatchTimeMs(), 
TimeUnit.MILLISECONDS); + mongoSinkConfig.getBatchTimeMs(), mongoSinkConfig.getBatchTimeMs(), TimeUnit.MILLISECONDS); } @Override @@ -119,7 +119,7 @@ public void write(Record record) { currentSize = incomingList.size(); } - if (currentSize == mongoConfig.getBatchSize()) { + if (currentSize == mongoSinkConfig.getBatchSize()) { flushExecutor.execute(() -> flush()); } } diff --git a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSinkConfig.java b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSinkConfig.java new file mode 100644 index 0000000000000..500b0eceaed94 --- /dev/null +++ b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSinkConfig.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.io.mongodb; + +import static com.google.common.base.Preconditions.checkArgument; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import java.io.File; +import java.io.IOException; +import java.util.Map; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.experimental.Accessors; +import org.apache.commons.lang3.StringUtils; + +/** + * Configuration class for the MongoDB Sink Connectors (database and collection are both required). + */ +@Data +@EqualsAndHashCode(callSuper = true) +@Accessors(chain = true) +public class MongoSinkConfig extends MongoAbstractConfig { + + private static final long serialVersionUID = 8805978990904614084L; + + @JsonCreator + public MongoSinkConfig( + @JsonProperty("mongoUri") String mongoUri, + @JsonProperty("database") String database, + @JsonProperty("collection") String collection, + @JsonProperty("batchSize") int batchSize, + @JsonProperty("batchTimeMs") long batchTimeMs + ) { + super(mongoUri, database, collection, batchSize, batchTimeMs); + } + + public static MongoSinkConfig load(String yamlFile) throws IOException { + final ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); + final MongoSinkConfig cfg = mapper.readValue(new File(yamlFile), MongoSinkConfig.class); + + return cfg; + } + + public static MongoSinkConfig load(Map map) throws IOException { + final ObjectMapper mapper = new ObjectMapper(); + final MongoSinkConfig cfg = mapper.readValue(mapper.writeValueAsString(map), MongoSinkConfig.class); + + return cfg; + } + + @Override + public void validate() { + super.validate(); + checkArgument(!StringUtils.isEmpty(getDatabase()), "Required MongoDB database name is not set."); + checkArgument(!StringUtils.isEmpty(getCollection()), "Required MongoDB collection name is not set."); + } +} diff --git
a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSource.java b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSource.java index 968df6afbbae2..b71b5eefd3b3a 100644 --- a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSource.java +++ b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSource.java @@ -40,6 +40,8 @@ import org.apache.pulsar.io.core.SourceContext; import org.apache.pulsar.io.core.annotations.Connector; import org.apache.pulsar.io.core.annotations.IOType; +import org.bson.BsonDocument; +import org.bson.BsonTimestamp; import org.bson.Document; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; @@ -51,19 +53,17 @@ name = "mongo", type = IOType.SOURCE, help = "A source connector that sends mongodb documents to pulsar", - configClass = MongoConfig.class + configClass = MongoSourceConfig.class ) @Slf4j public class MongoSource extends PushSource { private final Supplier clientProvider; - private MongoConfig mongoConfig; + private MongoSourceConfig mongoSourceConfig; private MongoClient mongoClient; - private Thread streamThread; - private ChangeStreamPublisher stream; @@ -79,38 +79,47 @@ public MongoSource(Supplier clientProvider) { public void open(Map config, SourceContext sourceContext) throws Exception { log.info("Open MongoDB Source"); - mongoConfig = MongoConfig.load(config); - mongoConfig.validate(false, false); + mongoSourceConfig = MongoSourceConfig.load(config); + mongoSourceConfig.validate(); if (clientProvider != null) { mongoClient = clientProvider.get(); } else { - mongoClient = MongoClients.create(mongoConfig.getMongoUri()); + mongoClient = MongoClients.create(mongoSourceConfig.getMongoUri()); } - if (StringUtils.isEmpty(mongoConfig.getDatabase())) { + String mongoDatabase = mongoSourceConfig.getDatabase(); + if (StringUtils.isEmpty(mongoDatabase)) { // Watch all databases - log.info("Watch all"); + log.info("Watch all databases"); stream = 
mongoClient.watch(); } else { - final MongoDatabase db = mongoClient.getDatabase(mongoConfig.getDatabase()); - - if (StringUtils.isEmpty(mongoConfig.getCollection())) { + final MongoDatabase db = mongoClient.getDatabase(mongoDatabase); + String mongoCollection = mongoSourceConfig.getCollection(); + if (StringUtils.isEmpty(mongoCollection)) { // Watch all collections in a database log.info("Watch db: {}", db.getName()); stream = db.watch(); } else { // Watch a collection - - final MongoCollection collection = db.getCollection(mongoConfig.getCollection()); - log.info("Watch collection: {} {}", db.getName(), mongoConfig.getCollection()); + final MongoCollection collection = db.getCollection(mongoCollection); + log.info("Watch collection: {}.{}", db.getName(), mongoCollection); stream = collection.watch(); } } - stream.batchSize(mongoConfig.getBatchSize()).fullDocument(FullDocument.UPDATE_LOOKUP); + stream.batchSize(mongoSourceConfig.getBatchSize()) + .fullDocument(FullDocument.UPDATE_LOOKUP); + + if (mongoSourceConfig.getSyncType() == SyncType.FULL_SYNC) { + // sync currently existing messages + // startAtOperationTime is the starting point for the change stream + // setting startAtOperationTime to 0 means the start point is the earliest + // see https://www.mongodb.com/docs/v4.2/reference/method/db.collection.watch/ for more information + stream.startAtOperationTime(new BsonTimestamp(0L)); + } stream.subscribe(new Subscriber>() { private ObjectMapper mapper = new ObjectMapper(); @@ -127,6 +136,12 @@ public void onNext(ChangeStreamDocument doc) { try { log.info("New change doc: {}", doc); + BsonDocument documentKey = doc.getDocumentKey(); + if (documentKey == null) { + log.warn("The document key is null"); + return; + } + // Build a record with the essential information final Map recordValue = new HashMap<>(); recordValue.put("fullDocument", doc.getFullDocument()); @@ -134,7 +149,7 @@ public void onNext(ChangeStreamDocument doc) { recordValue.put("operation", 
doc.getOperationType()); consume(new DocRecord( - Optional.of(doc.getDocumentKey().toJson()), + Optional.of(documentKey.toJson()), mapper.writeValueAsString(recordValue).getBytes(StandardCharsets.UTF_8))); } catch (JsonProcessingException e) { diff --git a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSourceConfig.java b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSourceConfig.java new file mode 100644 index 0000000000000..027c7743187b0 --- /dev/null +++ b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSourceConfig.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.io.mongodb; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import java.io.File; +import java.io.IOException; +import java.util.Map; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.experimental.Accessors; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.io.core.annotations.FieldDoc; + +/** + * Configuration class for the MongoDB Source Connectors (adds the syncType setting to the common options). + */ +@Data +@EqualsAndHashCode(callSuper = true) +@Accessors(chain = true) +public class MongoSourceConfig extends MongoAbstractConfig { + + private static final long serialVersionUID = 1152890092264945317L; + + public static final SyncType DEFAULT_SYNC_TYPE = SyncType.INCR_SYNC; + + public static final String DEFAULT_SYNC_TYPE_STR = "INCR_SYNC"; + + @FieldDoc( + defaultValue = DEFAULT_SYNC_TYPE_STR, + help = "The message synchronization type of the source connector. " + + "The field values can be of two types: INCR_SYNC and FULL_SYNC (case-insensitive). " + + "When it is set to INCR_SYNC, the source connector will only watch for changes made from now on. " + + "When it is set to FULL_SYNC, the source connector will synchronize currently existing messages " + + "and watch for future changes."
+ ) + private SyncType syncType = DEFAULT_SYNC_TYPE; + + @JsonCreator + public MongoSourceConfig( + @JsonProperty("mongoUri") String mongoUri, + @JsonProperty("database") String database, + @JsonProperty("collection") String collection, + @JsonProperty("batchSize") int batchSize, + @JsonProperty("batchTimeMs") long batchTimeMs, + @JsonProperty("syncType") String syncType + ) { + super(mongoUri, database, collection, batchSize, batchTimeMs); + setSyncType(syncType); + } + + public static MongoSourceConfig load(String yamlFile) throws IOException { + final ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); + final MongoSourceConfig cfg = mapper.readValue(new File(yamlFile), MongoSourceConfig.class); + + return cfg; + } + + public static MongoSourceConfig load(Map map) throws IOException { + final ObjectMapper mapper = new ObjectMapper(); + final MongoSourceConfig cfg = + mapper.readValue(mapper.writeValueAsString(map), MongoSourceConfig.class); + + return cfg; + } + + /** + * @param syncTypeStr name of a SyncType constant, matched case-insensitively; null or empty selects the default.
+ */ + private void setSyncType(String syncTypeStr) { + // if syncType is not set, the default sync type is used + if (StringUtils.isEmpty(syncTypeStr)) { + this.syncType = DEFAULT_SYNC_TYPE; + return; + } + + // valueOf is case-sensitive: upper-case with Locale.ROOT (locale-independent) and reject unknown names + try { + this.syncType = SyncType.valueOf(syncTypeStr.toUpperCase(java.util.Locale.ROOT)); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("The value of the syncType field is incorrect.", e); + } + } + + @Override + public void validate() { + super.validate(); + } +} diff --git a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/SyncType.java b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/SyncType.java new file mode 100644 index 0000000000000..37d8268087a87 --- /dev/null +++ b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/SyncType.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.mongodb; + +import org.apache.pulsar.common.classification.InterfaceAudience; +import org.apache.pulsar.common.classification.InterfaceStability; + +@InterfaceAudience.Public +@InterfaceStability.Stable +public enum SyncType { + + /** + * Synchronize all data.
+ */ + FULL_SYNC, + + /** + * Synchronize incremental data. + */ + INCR_SYNC +} diff --git a/pulsar-io/mongo/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/mongo/src/main/resources/META-INF/services/pulsar-io.yaml index 4fab476dc728a..59be102d89417 100644 --- a/pulsar-io/mongo/src/main/resources/META-INF/services/pulsar-io.yaml +++ b/pulsar-io/mongo/src/main/resources/META-INF/services/pulsar-io.yaml @@ -20,5 +20,5 @@ name: mongo description: MongoDB source and sink connector sinkClass: org.apache.pulsar.io.mongodb.MongoSink sourceClass: org.apache.pulsar.io.mongodb.MongoSource -sourceConfigClass: org.apache.pulsar.io.mongodb.MongoConfig -sinkConfigClass: org.apache.pulsar.io.mongodb.MongoConfig +sourceConfigClass: org.apache.pulsar.io.mongodb.MongoSourceConfig +sinkConfigClass: org.apache.pulsar.io.mongodb.MongoSinkConfig diff --git a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoConfigTest.java b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoConfigTest.java deleted file mode 100644 index 8495d87edab3a..0000000000000 --- a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoConfigTest.java +++ /dev/null @@ -1,87 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.pulsar.io.mongodb; - -import org.testng.annotations.Test; - -import java.io.File; -import java.io.IOException; -import java.util.Map; - -import static org.testng.Assert.assertEquals; - -public class MongoConfigTest { - - private static File getFile(String fileName) { - return new File(MongoConfigTest.class.getClassLoader().getResource(fileName).getFile()); - } - - @Test - public void testMap() throws IOException { - final Map map = TestHelper.createMap(true); - final MongoConfig cfg = MongoConfig.load(map); - - assertEquals(cfg.getMongoUri(), TestHelper.URI); - assertEquals(cfg.getDatabase(), TestHelper.DB); - assertEquals(cfg.getCollection(), TestHelper.COLL); - assertEquals(cfg.getBatchSize(), TestHelper.BATCH_SIZE); - } - - @Test(expectedExceptions = IllegalArgumentException.class, - expectedExceptionsMessageRegExp = "Required property not set.") - public void testBadMap() throws IOException { - final Map map = TestHelper.createMap(false); - final MongoConfig cfg = MongoConfig.load(map); - - cfg.validate(true, true); - } - - @Test(expectedExceptions = IllegalArgumentException.class, - expectedExceptionsMessageRegExp = "batchSize must be a positive integer.") - public void testBadBatchSize() throws IOException { - final Map map = TestHelper.createMap(true); - map.put("batchSize", 0); - final MongoConfig cfg = MongoConfig.load(map); - - cfg.validate(true, true); - } - - @Test(expectedExceptions = IllegalArgumentException.class, - expectedExceptionsMessageRegExp = "batchTimeMs must be a positive long.") - public void testBadBatchTime() throws IOException { - final Map map = TestHelper.createMap(true); - map.put("batchTimeMs", 0); - final MongoConfig cfg = MongoConfig.load(map); - - cfg.validate(true, true); - } - - @Test - public void testYaml() throws IOException { - final File yaml = getFile("mongoSinkConfig.yaml"); - final 
MongoConfig cfg = MongoConfig.load(yaml.getAbsolutePath()); - - assertEquals(cfg.getMongoUri(), TestHelper.URI); - assertEquals(cfg.getDatabase(), TestHelper.DB); - assertEquals(cfg.getCollection(), TestHelper.COLL); - assertEquals(cfg.getBatchSize(), TestHelper.BATCH_SIZE); - assertEquals(cfg.getBatchTimeMs(), TestHelper.BATCH_TIME); - } -} diff --git a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkConfigTest.java b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkConfigTest.java new file mode 100644 index 0000000000000..8cdcbe528ce6d --- /dev/null +++ b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkConfigTest.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.pulsar.io.mongodb; + +import java.util.Map; +import org.testng.annotations.Test; + +import java.io.File; +import java.io.IOException; + +import static org.testng.Assert.assertEquals; + +public class MongoSinkConfigTest { + + @Test + public void testLoadMapConfig() throws IOException { + final Map commonConfigMap = TestHelper.createCommonConfigMap(); + commonConfigMap.put("batchSize", TestHelper.BATCH_SIZE); + commonConfigMap.put("batchTimeMs", TestHelper.BATCH_TIME); + + final MongoSinkConfig cfg = MongoSinkConfig.load(commonConfigMap); + + assertEquals(cfg.getMongoUri(), TestHelper.URI); + assertEquals(cfg.getDatabase(), TestHelper.DB); + assertEquals(cfg.getCollection(), TestHelper.COLL); + assertEquals(cfg.getBatchSize(), TestHelper.BATCH_SIZE); + assertEquals(cfg.getBatchTimeMs(), TestHelper.BATCH_TIME); + } + + @Test(expectedExceptions = IllegalArgumentException.class, + expectedExceptionsMessageRegExp = "Required MongoDB URI is not set.") + public void testBadMongoUri() throws IOException { + final Map configMap = TestHelper.createCommonConfigMap(); + TestHelper.removeMongoUri(configMap); + + final MongoSinkConfig cfg = MongoSinkConfig.load(configMap); + + cfg.validate(); + } + + @Test(expectedExceptions = IllegalArgumentException.class, + expectedExceptionsMessageRegExp = "Required MongoDB database name is not set.") + public void testBadDatabase() throws IOException { + final Map configMap = TestHelper.createCommonConfigMap(); + TestHelper.removeDatabase(configMap); + + final MongoSinkConfig cfg = MongoSinkConfig.load(configMap); + + cfg.validate(); + } + + @Test(expectedExceptions = IllegalArgumentException.class, + expectedExceptionsMessageRegExp = "Required MongoDB collection name is not set.") + public void testBadCollection() throws IOException { + final Map configMap = TestHelper.createCommonConfigMap(); + TestHelper.removeCollection(configMap); + + final MongoSinkConfig cfg = MongoSinkConfig.load(configMap); + + 
cfg.validate(); + } + + @Test(expectedExceptions = IllegalArgumentException.class, + expectedExceptionsMessageRegExp = "batchSize must be a positive integer.") + public void testBadBatchSize() throws IOException { + final Map configMap = TestHelper.createCommonConfigMap(); + TestHelper.putBatchSize(configMap, 0); + + final MongoSinkConfig cfg = MongoSinkConfig.load(configMap); + + cfg.validate(); + } + + @Test(expectedExceptions = IllegalArgumentException.class, + expectedExceptionsMessageRegExp = "batchTimeMs must be a positive long.") + public void testBadBatchTime() throws IOException { + final Map configMap = TestHelper.createCommonConfigMap(); + TestHelper.putBatchTime(configMap, 0L); + + final MongoSinkConfig cfg = MongoSinkConfig.load(configMap); + + cfg.validate(); + } + + @Test + public void testLoadYamlConfig() throws IOException { + final File yaml = TestHelper.getFile(MongoSinkConfigTest.class, "mongoSinkConfig.yaml"); + final MongoSinkConfig cfg = MongoSinkConfig.load(yaml.getAbsolutePath()); + + assertEquals(cfg.getMongoUri(), TestHelper.URI); + assertEquals(cfg.getDatabase(), TestHelper.DB); + assertEquals(cfg.getCollection(), TestHelper.COLL); + assertEquals(cfg.getBatchSize(), TestHelper.BATCH_SIZE); + assertEquals(cfg.getBatchTimeMs(), TestHelper.BATCH_TIME); + } +} diff --git a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkTest.java b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkTest.java index 5fe9675ae5c8e..f1e5ef1d95c81 100644 --- a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkTest.java +++ b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkTest.java @@ -76,7 +76,7 @@ public class MongoSinkTest { @BeforeMethod public void setUp() { - map = TestHelper.createMap(true); + map = TestHelper.createCommonConfigMap(); mockRecord = mock(Record.class); mockSinkContext = mock(SinkContext.class); diff --git 
a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSourceConfigTest.java b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSourceConfigTest.java new file mode 100644 index 0000000000000..63e01551ae2b2 --- /dev/null +++ b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSourceConfigTest.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.pulsar.io.mongodb; + +import static org.testng.Assert.assertEquals; +import com.fasterxml.jackson.databind.JsonMappingException; +import java.io.File; +import java.io.IOException; +import java.util.Map; +import org.testng.annotations.Test; + +public class MongoSourceConfigTest { + + @Test + public void testLoadMapConfig() throws IOException { + final Map configMap = TestHelper.createCommonConfigMap(); + TestHelper.putSyncType(configMap, TestHelper.SYNC_TYPE); + + final MongoSourceConfig cfg = MongoSourceConfig.load(configMap); + + assertEquals(cfg.getMongoUri(), TestHelper.URI); + assertEquals(cfg.getDatabase(), TestHelper.DB); + assertEquals(cfg.getCollection(), TestHelper.COLL); + assertEquals(cfg.getSyncType(), TestHelper.SYNC_TYPE); + assertEquals(cfg.getBatchSize(), TestHelper.BATCH_SIZE); + assertEquals(cfg.getBatchTimeMs(), TestHelper.BATCH_TIME); + } + + @Test(expectedExceptions = IllegalArgumentException.class, + expectedExceptionsMessageRegExp = "Required MongoDB URI is not set.") + public void testBadMongoUri() throws IOException { + final Map configMap = TestHelper.createCommonConfigMap(); + TestHelper.removeMongoUri(configMap); + + final MongoSourceConfig cfg = MongoSourceConfig.load(configMap); + + cfg.validate(); + } + + /** + * Test whether an exception is thrown when the syncType field has an incorrect value. 
+ */ + @Test(expectedExceptions = {IllegalArgumentException.class, JsonMappingException.class}) + public void testBadSyncType() throws IOException { + final Map configMap = TestHelper.createCommonConfigMap(); + TestHelper.putSyncType(configMap, "wrong_sync_type_str"); + + final MongoSourceConfig cfg = MongoSourceConfig.load(configMap); + + cfg.validate(); + } + + @Test(expectedExceptions = IllegalArgumentException.class, + expectedExceptionsMessageRegExp = "batchSize must be a positive integer.") + public void testBadBatchSize() throws IOException { + final Map configMap = TestHelper.createCommonConfigMap(); + TestHelper.putBatchSize(configMap, 0); + + final MongoSourceConfig cfg = MongoSourceConfig.load(configMap); + + cfg.validate(); + } + + @Test(expectedExceptions = IllegalArgumentException.class, + expectedExceptionsMessageRegExp = "batchTimeMs must be a positive long.") + public void testBadBatchTime() throws IOException { + final Map configMap = TestHelper.createCommonConfigMap(); + TestHelper.putBatchTime(configMap, 0L); + + final MongoSourceConfig cfg = MongoSourceConfig.load(configMap); + + cfg.validate(); + } + + @Test + public void testLoadYamlConfig() throws IOException { + final File yaml = TestHelper.getFile(MongoSourceConfigTest.class, "mongoSourceConfig.yaml"); + final MongoSourceConfig cfg = MongoSourceConfig.load(yaml.getAbsolutePath()); + + assertEquals(cfg.getMongoUri(), TestHelper.URI); + assertEquals(cfg.getDatabase(), TestHelper.DB); + assertEquals(cfg.getCollection(), TestHelper.COLL); + assertEquals(cfg.getSyncType(), TestHelper.SYNC_TYPE); + assertEquals(cfg.getBatchSize(), TestHelper.BATCH_SIZE); + assertEquals(cfg.getBatchTimeMs(), TestHelper.BATCH_TIME); + } +} diff --git a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSourceTest.java b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSourceTest.java index 06df54e164912..c9b9d02a20f03 100644 --- 
a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSourceTest.java +++ b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSourceTest.java @@ -77,7 +77,7 @@ public class MongoSourceTest { @BeforeMethod public void setUp() { - map = TestHelper.createMap(true); + map = TestHelper.createCommonConfigMap(); mockSourceContext = mock(SourceContext.class); mockMongoClient = mock(MongoClient.class); diff --git a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/TestHelper.java b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/TestHelper.java index 82a0744c175d6..fcfc46ecfee64 100644 --- a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/TestHelper.java +++ b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/TestHelper.java @@ -19,6 +19,9 @@ package org.apache.pulsar.io.mongodb; +import static org.testng.Assert.assertNotNull; +import java.io.File; +import java.net.URL; import java.util.HashMap; import java.util.Map; @@ -30,26 +33,57 @@ public final class TestHelper { public static final String COLL = "messages"; + public static final SyncType SYNC_TYPE = SyncType.FULL_SYNC; + public static final int BATCH_SIZE = 2; - public static final int BATCH_TIME = 500; + public static final long BATCH_TIME = 500L; + + private TestHelper() { + } + public static File getFile(Class clazz, String fileName) { + ClassLoader classLoader = clazz.getClassLoader(); + URL url = classLoader.getResource(fileName); + assertNotNull(url); + return new File(url.getFile()); + } - public static Map createMap(boolean full) { + /** + * @return a map with all common fields + */ + public static Map createCommonConfigMap() { final Map map = new HashMap<>(); map.put("mongoUri", URI); map.put("database", DB); + map.put("collection", COLL); + map.put("batchSize", BATCH_SIZE); + map.put("batchTimeMs", BATCH_TIME); + return map; + } + + public static void removeMongoUri(Map configMap) { + configMap.remove("mongoUri"); + } - if (full) { - 
map.put("collection", COLL); - map.put("batchSize", BATCH_SIZE); - map.put("batchTimeMs", BATCH_TIME); - } + public static void removeDatabase(Map configMap) { + configMap.remove("database"); + } - return map; + public static void removeCollection(Map configMap) { + configMap.remove("collection"); } - private TestHelper() { + public static void putSyncType(Map configMap, Object syncType) { + configMap.put("syncType", syncType); + } + + public static void putBatchSize(Map configMap, int batchSize) { + configMap.put("batchSize", batchSize); + } + public static void putBatchTime(Map configMap, long batchTime) { + configMap.put("batchTimeMs", batchTime); } + } diff --git a/pulsar-io/mongo/src/test/resources/mongoSinkConfig.yaml b/pulsar-io/mongo/src/test/resources/mongoSinkConfig.yaml index f7a9ea28a76f9..9857beab55617 100644 --- a/pulsar-io/mongo/src/test/resources/mongoSinkConfig.yaml +++ b/pulsar-io/mongo/src/test/resources/mongoSinkConfig.yaml @@ -23,4 +23,4 @@ "collection": "messages", "batchSize": 2, "batchTimeMs": 500 -} \ No newline at end of file +} diff --git a/pulsar-io/mongo/src/test/resources/mongoSourceConfig.yaml b/pulsar-io/mongo/src/test/resources/mongoSourceConfig.yaml new file mode 100644 index 0000000000000..68c783ca1cac1 --- /dev/null +++ b/pulsar-io/mongo/src/test/resources/mongoSourceConfig.yaml @@ -0,0 +1,27 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +{ + "mongoUri": "mongodb://localhost", + "database": "pulsar", + "collection": "messages", + "syncType": "full_sync", + "batchSize": 2, + "batchTimeMs": 500 +} diff --git a/site2/docs/io-connectors.md b/site2/docs/io-connectors.md index 83986c8c82754..1b6f9e85bc2e0 100644 --- a/site2/docs/io-connectors.md +++ b/site2/docs/io-connectors.md @@ -101,7 +101,13 @@ Pulsar has various source connectors, which are sorted alphabetically as below. * [Configuration](io-kinesis-source.md#configuration) * [Java class](https://github.com/apache/pulsar/blob/master/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSource.java) - + +### MongoDB + +* [Configuration](io-mongo-source.md#configuration) + +* [Java class](https://github.com/apache/pulsar/blob/master/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSource.java) + ### Netty * [Configuration](io-netty-source.md#configuration) diff --git a/site2/docs/io-mongo-source.md b/site2/docs/io-mongo-source.md new file mode 100644 index 0000000000000..a525cb79b3282 --- /dev/null +++ b/site2/docs/io-mongo-source.md @@ -0,0 +1,55 @@ +--- +id: io-mongo-source +title: MongoDB source connector +sidebar_label: "MongoDB source connector" +--- + +The MongoDB source connector pulls documents from MongoDB and persists the messages to Pulsar topics. + +This guide explains how to configure and use the MongoDB source connector. + +## Configuration + +The configuration of the MongoDB source connector has the following properties. 
+ +### Property + +| Name | Type | Required | Default | Description | +|---------------|--------|----------|--------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `mongoUri` | String | true | " " (empty string) | The MongoDB URI to which the connector connects.

For more information, see [connection string URI format](https://docs.mongodb.com/manual/reference/connection-string/). | +| `database` | String | false | " " (empty string) | The name of the watched database.

If this field is not set, the source connector will watch the entire MongoDB deployment for all changes. | +| `collection` | String | false | " " (empty string) | The name of the watched collection.

If this field is not set, the source connector will watch the database for all changes. | +| `syncType` | String | false | "INCR_SYNC" | The synchronization type between MongoDB and Pulsar: full synchronization or incremental synchronization.

Valid values are `full_sync`, `FULL_SYNC`, `incr_sync` and `INCR_SYNC`. | +| `batchSize` | int | false | 100 | The batch size of pulling documents from collections. | +| `batchTimeMs` | long | false | 1000 | The batch operation interval in milliseconds. | + +### Example + +Before using the MongoDB source connector, you need to create a configuration file through one of the following methods. + +* JSON + + ```json + { + "configs": { + "mongoUri": "mongodb://localhost:27017", + "database": "pulsar", + "collection": "messages", + "syncType": "full_sync", + "batchSize": "2", + "batchTimeMs": "500" + } + } + ``` + +* YAML + + ```yaml + configs: + mongoUri: "mongodb://localhost:27017" + database: "pulsar" + collection: "messages" + syncType: "full_sync" + batchSize: 2 + batchTimeMs: 500 + ``` diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml index c491d005fda66..7b842bc58b0cc 100644 --- a/tests/integration/pom.xml +++ b/tests/integration/pom.xml @@ -34,6 +34,7 @@ pulsar.xml + 4.1.2 @@ -208,6 +209,19 @@ test + + + org.testcontainers + mongodb + test + + + org.mongodb + mongodb-driver-reactivestreams + ${mongo-reactivestreams.version} + test + + diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/MongoSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/MongoSourceTester.java new file mode 100644 index 0000000000000..273a1ecdd48e4 --- /dev/null +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/MongoSourceTester.java @@ -0,0 +1,157 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.tests.integration.io.sources; + +import com.mongodb.client.model.changestream.ChangeStreamDocument; +import com.mongodb.client.model.changestream.FullDocument; +import com.mongodb.reactivestreams.client.ChangeStreamPublisher; +import com.mongodb.reactivestreams.client.MongoClient; +import com.mongodb.reactivestreams.client.MongoClients; +import com.mongodb.reactivestreams.client.MongoDatabase; +import java.util.Map; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.tests.integration.containers.DebeziumMongoDbContainer; +import org.apache.pulsar.tests.integration.topologies.PulsarCluster; +import org.bson.Document; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.MongoDBContainer; + +@Slf4j +public class MongoSourceTester extends SourceTester { + + private static final String SOURCE_TYPE = "mongo"; + + private static final String DEFAULT_DATABASE = "test"; + + private static final int DEFAULT_BATCH_SIZE = 2; + + private final MongoDBContainer mongoContainer; + + private final PulsarCluster pulsarCluster; + + protected MongoSourceTester(MongoDBContainer mongoContainer, PulsarCluster pulsarCluster) { + super(SOURCE_TYPE); + this.mongoContainer = mongoContainer; + this.pulsarCluster = pulsarCluster; + + sourceConfig.put("mongoUri", mongoContainer.getConnectionString()); + sourceConfig.put("database", DEFAULT_DATABASE); + sourceConfig.put("syncType", "full_sync"); + 
sourceConfig.put("batchSize", DEFAULT_BATCH_SIZE); + } + + @Override + public void setServiceContainer(MongoDBContainer serviceContainer) { + log.info("start mongodb server container."); + pulsarCluster.startService(DebeziumMongoDbContainer.NAME, mongoContainer); + } + + @Override + public void prepareSource() throws Exception { + MongoClient mongoClient = MongoClients.create(mongoContainer.getConnectionString()); + MongoDatabase db = mongoClient.getDatabase(DEFAULT_DATABASE); + log.info("Subscribing mongodb change streams on: {}", mongoContainer.getReplicaSetUrl(DEFAULT_DATABASE)); + + ChangeStreamPublisher stream = db.watch(); + stream.batchSize(DEFAULT_BATCH_SIZE) + .fullDocument(FullDocument.UPDATE_LOOKUP); + + stream.subscribe(new Subscriber<>() { + @Override + public void onSubscribe(Subscription subscription) { + subscription.request(Integer.MAX_VALUE); + } + + @Override + public void onNext(ChangeStreamDocument doc) { + log.info("New change doc: {}", doc); + } + + @Override + public void onError(Throwable error) { + log.error("Subscriber error", error); + } + + @Override + public void onComplete() { + log.info("Subscriber complete"); + } + }); + + log.info("Successfully subscribe to mongodb change streams"); + } + + @Override + public void prepareInsertEvent() throws Exception { + Container.ExecResult execResult = this.mongoContainer.execInContainer( + "/usr/bin/mongo", + "--eval", + "db.products.insert" + + "({" + + "name: \"test-mongo\"," + + "description: \"test message\"" + + "})" + ); + log.info("Successfully insert a message: {}", execResult.getStdout()); + } + + @Override + public void prepareDeleteEvent() throws Exception { + Container.ExecResult execResult = mongoContainer.execInContainer( + "/usr/bin/mongo", + "--eval", + "db.products.deleteOne" + + "({" + + "name: \"test-mongo\"" + + "})" + ); + log.info("Successfully delete a message: {}", execResult.getStdout()); + } + + @Override + public void prepareUpdateEvent() throws Exception { + 
Container.ExecResult execResult = mongoContainer.execInContainer( + "/usr/bin/mongo", + "--eval", + "db.products.update" + + "(" + + "{name: \"test-mongo-source\"}" + + "," + + "{$set:{name:\"test-mongo-update\", description: \"updated message\"}}" + + ")" + ); + log.info("Successfully update a message: {}", execResult.getStdout()); + } + + @Override + public Map produceSourceMessages(int numMessages) throws Exception { + log.info("mongodb server already contains preconfigured data."); + return null; + } + + @Override + public void close() throws Exception { + if (mongoContainer != null) { + mongoContainer.close(); + } + } +}