Skip to content

Commit

Permalink
[PIP-88] Replicate schemas across clusters (#11441)
Browse files Browse the repository at this point in the history
* [PIP-88] Replicate schemas accross clusters

Here is the proposal: https://github.com/apache/pulsar/wiki/PIP-88%3A-Replicate-schemas-across-multiple
For the implementation, we just need to set the correct SchemaInfo for the replicated message and using
the AutoProduceByte schema for the producer of the
  • Loading branch information
codelipenghui authored Aug 8, 2021
1 parent 8324205 commit a09bd68
Show file tree
Hide file tree
Showing 10 changed files with 118 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
Expand All @@ -41,6 +43,7 @@ public abstract class AbstractReplicator {
protected final String topicName;
protected final String localCluster;
protected final String remoteCluster;
protected final PulsarClientImpl replicationClient;
protected final PulsarClientImpl client;

protected volatile ProducerImpl producer;
Expand All @@ -63,18 +66,19 @@ protected enum State {
}

public AbstractReplicator(String topicName, String replicatorPrefix, String localCluster, String remoteCluster,
BrokerService brokerService) throws NamingException {
BrokerService brokerService) throws NamingException, PulsarServerException {
validatePartitionedTopic(topicName, brokerService);
this.brokerService = brokerService;
this.topicName = topicName;
this.replicatorPrefix = replicatorPrefix;
this.localCluster = localCluster.intern();
this.remoteCluster = remoteCluster.intern();
this.client = (PulsarClientImpl) brokerService.getReplicationClient(remoteCluster);
this.replicationClient = (PulsarClientImpl) brokerService.getReplicationClient(remoteCluster);
this.client = (PulsarClientImpl) brokerService.pulsar().getClient();
this.producer = null;
this.producerQueueSize = brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize();

this.producerBuilder = client.newProducer() //
this.producerBuilder = replicationClient.newProducer(Schema.AUTO_PRODUCE_BYTES()) //
.topic(topicName)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.enableBatching(false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.service.AbstractReplicator;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
Expand All @@ -48,7 +49,7 @@ public class NonPersistentReplicator extends AbstractReplicator implements Repli
private final NonPersistentReplicatorStatsImpl stats = new NonPersistentReplicatorStatsImpl();

public NonPersistentReplicator(NonPersistentTopic topic, String localCluster, String remoteCluster,
BrokerService brokerService) throws NamingException {
BrokerService brokerService) throws NamingException, PulsarServerException {
super(topic.getName(), topic.getReplicatorPrefix(), localCluster, remoteCluster, brokerService);

producerBuilder.blockIfQueueFull(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.concurrent.atomic.LongAdder;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.AbstractTopic;
Expand Down Expand Up @@ -561,7 +562,7 @@ protected boolean addReplicationCluster(String remoteCluster, NonPersistentTopic
replicators.computeIfAbsent(remoteCluster, r -> {
try {
return new NonPersistentReplicator(NonPersistentTopic.this, localCluster, remoteCluster, brokerService);
} catch (NamingException e) {
} catch (NamingException | PulsarServerException e) {
isReplicatorStarted.set(false);
log.error("[{}] Replicator startup failed due to partitioned-topic {}", topic, remoteCluster);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
Expand All @@ -41,6 +42,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.service.AbstractReplicator;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
Expand All @@ -56,6 +58,7 @@
import org.apache.pulsar.common.api.proto.MarkerType;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.stats.Rate;
import org.apache.pulsar.common.util.Codec;
import org.slf4j.Logger;
Expand Down Expand Up @@ -102,7 +105,7 @@ public class PersistentReplicator extends AbstractReplicator
private final ReplicatorStatsImpl stats = new ReplicatorStatsImpl();

public PersistentReplicator(PersistentTopic topic, ManagedCursor cursor, String localCluster, String remoteCluster,
BrokerService brokerService) throws NamingException {
BrokerService brokerService) throws NamingException, PulsarServerException {
super(topic.getName(), topic.getReplicatorPrefix(), localCluster, remoteCluster, brokerService);
this.topic = topic;
this.cursor = cursor;
Expand Down Expand Up @@ -358,7 +361,15 @@ public void readEntriesComplete(List<Entry> entries, Object ctx) {

headersAndPayload.retain();

producer.sendAsync(msg, ProducerSendCallback.create(this, entry, msg));
getSchemaInfo(msg).thenAccept(schemaInfo -> {
msg.setSchemaInfoForReplicator(schemaInfo);
producer.sendAsync(msg, ProducerSendCallback.create(this, entry, msg));
}).exceptionally(ex -> {
log.error("[{}][{} -> {}] Failed to get schema from local cluster", topicName,
localCluster, remoteCluster, ex);
return null;
});

atLeastOneMessageSentForReplication = true;
}
} catch (Exception e) {
Expand All @@ -379,6 +390,14 @@ public void readEntriesComplete(List<Entry> entries, Object ctx) {
}
}

private CompletableFuture<SchemaInfo> getSchemaInfo(MessageImpl msg) throws ExecutionException {
if (msg.getSchemaVersion() == null || msg.getSchemaVersion().length == 0) {
return CompletableFuture.completedFuture(null);
}
return client.getSchemaProviderLoadingCache().get(topicName)
.getSchemaByVersion(msg.getSchemaVersion());
}

public void updateCursorState() {
if (this.cursor != null) {
if (producer != null && producer.isConnected()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1431,7 +1431,7 @@ protected boolean addReplicationCluster(String remoteCluster, ManagedCursor curs
try {
return new PersistentReplicator(PersistentTopic.this, cursor, localCluster, remoteCluster,
brokerService);
} catch (NamingException e) {
} catch (NamingException | PulsarServerException e) {
isReplicatorStarted.set(false);
log.error("[{}] Replicator startup failed due to partitioned-topic {}", topic, remoteCluster);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ public void setup() throws Exception {

mlFactoryMock = mock(ManagedLedgerFactory.class);
doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();
doReturn(mock(PulsarClientImpl.class)).when(pulsar).getClient();

ZooKeeper mockZk = createMockZooKeeper();
doReturn(mockZk).when(pulsar).getZkClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.schema.Schemas;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.slf4j.Logger;
Expand Down Expand Up @@ -375,6 +376,62 @@ public void testReplication(String namespace) throws Exception {
consumer3.receive(1);
}

@Test
public void testReplicationWithSchema() throws Exception {
PulsarClient client1 = pulsar1.getClient();
PulsarClient client2 = pulsar2.getClient();
PulsarClient client3 = pulsar3.getClient();
final TopicName topic = TopicName
.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/testReplicationWithSchema"));

final String subName = "my-sub";

@Cleanup
Producer<Schemas.PersonOne> producer1 = client1.newProducer(Schema.AVRO(Schemas.PersonOne.class))
.topic(topic.toString())
.create();
@Cleanup
Producer<Schemas.PersonOne> producer2 = client2.newProducer(Schema.AVRO(Schemas.PersonOne.class))
.topic(topic.toString())
.create();
@Cleanup
Producer<Schemas.PersonOne> producer3 = client3.newProducer(Schema.AVRO(Schemas.PersonOne.class))
.topic(topic.toString())
.create();

List<Producer<Schemas.PersonOne>> producers = Lists.newArrayList(producer1, producer2, producer3);

@Cleanup
Consumer<Schemas.PersonOne> consumer1 = client1.newConsumer(Schema.AVRO(Schemas.PersonOne.class))
.topic(topic.toString())
.subscriptionName(subName)
.subscribe();

@Cleanup
Consumer<Schemas.PersonOne> consumer2 = client2.newConsumer(Schema.AVRO(Schemas.PersonOne.class))
.topic(topic.toString())
.subscriptionName(subName)
.subscribe();

@Cleanup
Consumer<Schemas.PersonOne> consumer3 = client3.newConsumer(Schema.AVRO(Schemas.PersonOne.class))
.topic(topic.toString())
.subscriptionName(subName)
.subscribe();

for (int i = 0; i < 3; i++) {
producers.get(i).send(new Schemas.PersonOne(i));
Message<Schemas.PersonOne> msg1 = consumer1.receive();
Message<Schemas.PersonOne> msg2 = consumer2.receive();
Message<Schemas.PersonOne> msg3 = consumer3.receive();
assertTrue(msg1 != null && msg2 != null && msg3 != null);
assertTrue(msg1.getValue().equals(msg2.getValue()) && msg2.getValue().equals(msg3.getValue()));
consumer1.acknowledge(msg1);
consumer2.acknowledge(msg2);
consumer3.acknowledge(msg3);
}
}

@Test
public void testReplicationOverrides() throws Exception {
log.info("--- Starting ReplicatorTest::testReplicationOverrides ---");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public class MessageImpl<T> implements Message<T> {
private ByteBuf payload;

private Schema<T> schema;
private SchemaInfo schemaInfoForReplicator;
private SchemaState schemaState = SchemaState.None;
private Optional<EncryptionContext> encryptionCtx = Optional.empty();

Expand Down Expand Up @@ -418,14 +419,29 @@ private void ensureSchemaIsLoaded() {
}
}

private SchemaInfo getSchemaInfo() {
public SchemaInfo getSchemaInfo() {
if (schema == null) {
return null;
}
ensureSchemaIsLoaded();
if (schema instanceof AutoConsumeSchema) {
return ((AutoConsumeSchema) schema).getSchemaInfo(getSchemaVersion());
}
return schema.getSchemaInfo();
}

public void setSchemaInfoForReplicator(SchemaInfo schemaInfo) {
if (msgMetadata.hasReplicatedFrom()) {
this.schemaInfoForReplicator = schemaInfo;
} else {
throw new IllegalArgumentException("Only allowed to set schemaInfoForReplicator for a replicated message.");
}
}

public SchemaInfo getSchemaInfoForReplicator() {
return msgMetadata.hasReplicatedFrom() ? this.schemaInfoForReplicator : null;
}

@Override
public T getValue() {
SchemaInfo schemaInfo = getSchemaInfo();
Expand Down Expand Up @@ -671,6 +687,10 @@ public List<String> getReplicateTo() {
return msgMetadata.getReplicateTosList();
}

public boolean hasReplicateFrom() {
return msgMetadata.hasReplicatedFrom();
}

void setMessageId(MessageIdImpl messageId) {
this.messageId = messageId;
}
Expand All @@ -690,6 +710,9 @@ int getUncompressedSize() {
}

SchemaState getSchemaState() {
if (getSchemaInfo() == null) {
return SchemaState.Ready;
}
return schemaState;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -610,8 +610,8 @@ private void tryRegisterSchema(ClientCnx cnx, MessageImpl msg, SendCallback call
if (!changeToRegisteringSchemaState()) {
return;
}
SchemaInfo schemaInfo = Optional.ofNullable(msg.getSchemaInternal())
.map(Schema::getSchemaInfo)
SchemaInfo schemaInfo = msg.hasReplicateFrom() ? msg.getSchemaInfoForReplicator() : msg.getSchemaInfo();
schemaInfo = Optional.ofNullable(schemaInfo)
.filter(si -> si.getType().getValue() > 0)
.orElse(Schema.BYTES.getSchemaInfo());
getOrCreateSchemaAsync(cnx, schemaInfo).handle((v, ex) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -988,7 +988,7 @@ private SchemaInfoProvider newSchemaProvider(String topicName) {
return new MultiVersionSchemaInfoProvider(TopicName.get(topicName), this);
}

protected LoadingCache<String, SchemaInfoProvider> getSchemaProviderLoadingCache() {
public LoadingCache<String, SchemaInfoProvider> getSchemaProviderLoadingCache() {
return schemaProviderLoadingCache;
}

Expand Down

0 comments on commit a09bd68

Please sign in to comment.