Skip to content

Commit

Permalink
[feature][pulsar-io-mongo] Add support for full message synchronizati…
Browse files Browse the repository at this point in the history
…on (#16003)

### Motivation

Now, the MongoDB source connector only supports the incremental message synchronization.
This PR adds support for full message synchronization.

Since MongDB 4.0, we can set the starting point for the change stream by the `startAtOperationTime` field.
So, we can set it to `0` to make start point the earliest.
See https://www.mongodb.com/docs/v4.2/reference/method/db.collection.watch/ for more information.
  • Loading branch information
shink authored Sep 20, 2022
1 parent f6665fb commit cda2ea7
Show file tree
Hide file tree
Showing 20 changed files with 888 additions and 233 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.io.mongodb;

import static com.google.common.base.Preconditions.checkArgument;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.io.Serializable;
import lombok.Data;
import lombok.experimental.Accessors;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.io.core.annotations.FieldDoc;

/**
* Configuration object for all MongoDB components.
*/
@Data
@Accessors(chain = true)
public abstract class MongoAbstractConfig implements Serializable {

private static final long serialVersionUID = -3830568531897300005L;

public static final int DEFAULT_BATCH_SIZE = 100;

public static final long DEFAULT_BATCH_TIME_MS = 1000;

@FieldDoc(
required = true,
defaultValue = "",
help = "The URI of MongoDB that the connector connects to "
+ "(see: https://docs.mongodb.com/manual/reference/connection-string/)"
)
private final String mongoUri;

@FieldDoc(
defaultValue = "",
help = "The database name to which the collection belongs "
+ "and which must be watched for the source connector "
+ "(required for the sink connector)"
)
private final String database;

@FieldDoc(
defaultValue = "",
help = "The collection name where the messages are written "
+ "or which is watched for the source connector "
+ "(required for the sink connector)"
)
private final String collection;

@FieldDoc(
defaultValue = "" + DEFAULT_BATCH_SIZE,
help = "The batch size of write to or read from the database"
)
private final int batchSize;

@FieldDoc(
defaultValue = "" + DEFAULT_BATCH_TIME_MS,
help = "The batch operation interval in milliseconds")
private final long batchTimeMs;

public MongoAbstractConfig() {
this(null, null, null, DEFAULT_BATCH_SIZE, DEFAULT_BATCH_TIME_MS);
}

@JsonCreator
public MongoAbstractConfig(
@JsonProperty("mongoUri") String mongoUri,
@JsonProperty("database") String database,
@JsonProperty("collection") String collection,
@JsonProperty("batchSize") int batchSize,
@JsonProperty("batchTimeMs") long batchTimeMs
) {
this.mongoUri = mongoUri;
this.database = database;
this.collection = collection;
this.batchSize = batchSize;
this.batchTimeMs = batchTimeMs;
}

public void validate() {
checkArgument(!StringUtils.isEmpty(getMongoUri()), "Required MongoDB URI is not set.");
checkArgument(getBatchSize() > 0, "batchSize must be a positive integer.");
checkArgument(getBatchTimeMs() > 0, "batchTimeMs must be a positive long.");
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,12 @@
name = "mongo",
type = IOType.SINK,
help = "A sink connector that sends pulsar messages to mongodb",
configClass = MongoConfig.class
configClass = MongoSinkConfig.class
)
@Slf4j
public class MongoSink implements Sink<byte[]> {

private MongoConfig mongoConfig;
private MongoSinkConfig mongoSinkConfig;

private MongoClient mongoClient;

Expand All @@ -86,22 +86,22 @@ public MongoSink(Supplier<MongoClient> clientProvider) {
public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
log.info("Open MongoDB Sink");

mongoConfig = MongoConfig.load(config);
mongoConfig.validate(true, true);
mongoSinkConfig = MongoSinkConfig.load(config);
mongoSinkConfig.validate();

if (clientProvider != null) {
mongoClient = clientProvider.get();
} else {
mongoClient = MongoClients.create(mongoConfig.getMongoUri());
mongoClient = MongoClients.create(mongoSinkConfig.getMongoUri());
}

final MongoDatabase db = mongoClient.getDatabase(mongoConfig.getDatabase());
collection = db.getCollection(mongoConfig.getCollection());
final MongoDatabase db = mongoClient.getDatabase(mongoSinkConfig.getDatabase());
collection = db.getCollection(mongoSinkConfig.getCollection());

incomingList = Lists.newArrayList();
flushExecutor = Executors.newScheduledThreadPool(1);
flushExecutor.scheduleAtFixedRate(() -> flush(),
mongoConfig.getBatchTimeMs(), mongoConfig.getBatchTimeMs(), TimeUnit.MILLISECONDS);
mongoSinkConfig.getBatchTimeMs(), mongoSinkConfig.getBatchTimeMs(), TimeUnit.MILLISECONDS);
}

@Override
Expand All @@ -119,7 +119,7 @@ public void write(Record<byte[]> record) {
currentSize = incomingList.size();
}

if (currentSize == mongoConfig.getBatchSize()) {
if (currentSize == mongoSinkConfig.getBatchSize()) {
flushExecutor.execute(() -> flush());
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.io.mongodb;

import static com.google.common.base.Preconditions.checkArgument;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;
import org.apache.commons.lang3.StringUtils;

/**
* Configuration class for the MongoDB Sink Connectors.
*/
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
public class MongoSinkConfig extends MongoAbstractConfig {

private static final long serialVersionUID = 8805978990904614084L;

@JsonCreator
public MongoSinkConfig(
@JsonProperty("mongoUri") String mongoUri,
@JsonProperty("database") String database,
@JsonProperty("collection") String collection,
@JsonProperty("batchSize") int batchSize,
@JsonProperty("batchTimeMs") long batchTimeMs
) {
super(mongoUri, database, collection, batchSize, batchTimeMs);
}

public static MongoSinkConfig load(String yamlFile) throws IOException {
final ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
final MongoSinkConfig cfg = mapper.readValue(new File(yamlFile), MongoSinkConfig.class);

return cfg;
}

public static MongoSinkConfig load(Map<String, Object> map) throws IOException {
final ObjectMapper mapper = new ObjectMapper();
final MongoSinkConfig cfg = mapper.readValue(new ObjectMapper().writeValueAsString(map), MongoSinkConfig.class);

return cfg;
}

@Override
public void validate() {
super.validate();
checkArgument(!StringUtils.isEmpty(getDatabase()), "Required MongoDB database name is not set.");
checkArgument(!StringUtils.isEmpty(getCollection()), "Required MongoDB collection name is not set.");
}
}
Loading

0 comments on commit cda2ea7

Please sign in to comment.