Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PIP-88] Replicate schemas across clusters #11441

Merged
merged 4 commits into from
Aug 8, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
Expand All @@ -41,6 +43,7 @@ public abstract class AbstractReplicator {
protected final String topicName;
protected final String localCluster;
protected final String remoteCluster;
protected final PulsarClientImpl replicationClient;
protected final PulsarClientImpl client;

protected volatile ProducerImpl producer;
Expand All @@ -63,18 +66,19 @@ protected enum State {
}

public AbstractReplicator(String topicName, String replicatorPrefix, String localCluster, String remoteCluster,
BrokerService brokerService) throws NamingException {
BrokerService brokerService) throws NamingException, PulsarServerException {
validatePartitionedTopic(topicName, brokerService);
this.brokerService = brokerService;
this.topicName = topicName;
this.replicatorPrefix = replicatorPrefix;
this.localCluster = localCluster.intern();
this.remoteCluster = remoteCluster.intern();
this.client = (PulsarClientImpl) brokerService.getReplicationClient(remoteCluster);
this.replicationClient = (PulsarClientImpl) brokerService.getReplicationClient(remoteCluster);
this.client = (PulsarClientImpl) brokerService.pulsar().getClient();
this.producer = null;
this.producerQueueSize = brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize();

this.producerBuilder = client.newProducer() //
this.producerBuilder = replicationClient.newProducer(Schema.AUTO_PRODUCE_BYTES()) //
.topic(topicName)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.enableBatching(false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.service.AbstractReplicator;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
Expand All @@ -48,7 +49,7 @@ public class NonPersistentReplicator extends AbstractReplicator implements Repli
private final NonPersistentReplicatorStatsImpl stats = new NonPersistentReplicatorStatsImpl();

public NonPersistentReplicator(NonPersistentTopic topic, String localCluster, String remoteCluster,
BrokerService brokerService) throws NamingException {
BrokerService brokerService) throws NamingException, PulsarServerException {
super(topic.getName(), topic.getReplicatorPrefix(), localCluster, remoteCluster, brokerService);

producerBuilder.blockIfQueueFull(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.concurrent.atomic.LongAdder;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.AbstractTopic;
Expand Down Expand Up @@ -554,7 +555,7 @@ protected boolean addReplicationCluster(String remoteCluster, NonPersistentTopic
replicators.computeIfAbsent(remoteCluster, r -> {
try {
return new NonPersistentReplicator(NonPersistentTopic.this, localCluster, remoteCluster, brokerService);
} catch (NamingException e) {
} catch (NamingException | PulsarServerException e) {
isReplicatorStarted.set(false);
log.error("[{}] Replicator startup failed due to partitioned-topic {}", topic, remoteCluster);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
Expand All @@ -41,6 +42,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.service.AbstractReplicator;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
Expand All @@ -56,6 +58,7 @@
import org.apache.pulsar.common.api.proto.MarkerType;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.stats.Rate;
import org.apache.pulsar.common.util.Codec;
import org.slf4j.Logger;
Expand Down Expand Up @@ -102,7 +105,7 @@ public class PersistentReplicator extends AbstractReplicator
private final ReplicatorStatsImpl stats = new ReplicatorStatsImpl();

public PersistentReplicator(PersistentTopic topic, ManagedCursor cursor, String localCluster, String remoteCluster,
BrokerService brokerService) throws NamingException {
BrokerService brokerService) throws NamingException, PulsarServerException {
super(topic.getName(), topic.getReplicatorPrefix(), localCluster, remoteCluster, brokerService);
this.topic = topic;
this.cursor = cursor;
Expand Down Expand Up @@ -358,7 +361,15 @@ public void readEntriesComplete(List<Entry> entries, Object ctx) {

headersAndPayload.retain();

producer.sendAsync(msg, ProducerSendCallback.create(this, entry, msg));
getSchemaInfo(msg).thenAccept(schemaInfo -> {
msg.setSchemaInfo(schemaInfo);
producer.sendAsync(msg, ProducerSendCallback.create(this, entry, msg));
}).exceptionally(ex -> {
log.error("[{}][{} -> {}] Failed to get schema from local cluster", topicName,
localCluster, remoteCluster, ex);
return null;
});

atLeastOneMessageSentForReplication = true;
}
} catch (Exception e) {
Expand All @@ -379,6 +390,14 @@ public void readEntriesComplete(List<Entry> entries, Object ctx) {
}
}

private CompletableFuture<SchemaInfo> getSchemaInfo(MessageImpl msg) throws ExecutionException {
if (msg.getSchemaVersion() == null || msg.getSchemaVersion().length == 0) {
return CompletableFuture.completedFuture(null);
}
return client.getSchemaProviderLoadingCache().get(topicName)
.getSchemaByVersion(msg.getSchemaVersion());
}

public void updateCursorState() {
if (this.cursor != null) {
if (producer != null && producer.isConnected()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1424,7 +1424,7 @@ protected boolean addReplicationCluster(String remoteCluster, ManagedCursor curs
try {
return new PersistentReplicator(PersistentTopic.this, cursor, localCluster, remoteCluster,
brokerService);
} catch (NamingException e) {
} catch (NamingException | PulsarServerException e) {
isReplicatorStarted.set(false);
log.error("[{}] Replicator startup failed due to partitioned-topic {}", topic, remoteCluster);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ public void setup() throws Exception {

mlFactoryMock = mock(ManagedLedgerFactory.class);
doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();
doReturn(mock(PulsarClientImpl.class)).when(pulsar).getClient();

ZooKeeper mockZk = createMockZooKeeper();
doReturn(mockZk).when(pulsar).getZkClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.schema.Schemas;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.slf4j.Logger;
Expand Down Expand Up @@ -375,6 +376,62 @@ public void testReplication(String namespace) throws Exception {
consumer3.receive(1);
}

@Test
public void testReplicationWithSchema() throws Exception {
PulsarClient client1 = pulsar1.getClient();
PulsarClient client2 = pulsar2.getClient();
PulsarClient client3 = pulsar3.getClient();
final TopicName topic = TopicName
.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/testReplicationWithSchema"));

final String subName = "my-sub";

@Cleanup
Producer<Schemas.PersonOne> producer1 = client1.newProducer(Schema.AVRO(Schemas.PersonOne.class))
.topic(topic.toString())
.create();
@Cleanup
Producer<Schemas.PersonOne> producer2 = client2.newProducer(Schema.AVRO(Schemas.PersonOne.class))
.topic(topic.toString())
.create();
@Cleanup
Producer<Schemas.PersonOne> producer3 = client3.newProducer(Schema.AVRO(Schemas.PersonOne.class))
.topic(topic.toString())
.create();

List<Producer<Schemas.PersonOne>> producers = Lists.newArrayList(producer1, producer2, producer3);

@Cleanup
Consumer<Schemas.PersonOne> consumer1 = client1.newConsumer(Schema.AVRO(Schemas.PersonOne.class))
.topic(topic.toString())
.subscriptionName(subName)
.subscribe();

@Cleanup
Consumer<Schemas.PersonOne> consumer2 = client2.newConsumer(Schema.AVRO(Schemas.PersonOne.class))
.topic(topic.toString())
.subscriptionName(subName)
.subscribe();

@Cleanup
Consumer<Schemas.PersonOne> consumer3 = client3.newConsumer(Schema.AVRO(Schemas.PersonOne.class))
.topic(topic.toString())
.subscriptionName(subName)
.subscribe();

for (int i = 0; i < 3; i++) {
producers.get(i).send(new Schemas.PersonOne(i));
Message<Schemas.PersonOne> msg1 = consumer1.receive();
Message<Schemas.PersonOne> msg2 = consumer2.receive();
Message<Schemas.PersonOne> msg3 = consumer3.receive();
assertTrue(msg1 != null && msg2 != null && msg3 != null);
assertTrue(msg1.getValue().equals(msg2.getValue()) && msg2.getValue().equals(msg3.getValue()));
consumer1.acknowledge(msg1);
consumer2.acknowledge(msg2);
consumer3.acknowledge(msg3);
}
}

@Test
public void testReplicationOverrides() throws Exception {
log.info("--- Starting ReplicatorTest::testReplicationOverrides ---");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public class MessageImpl<T> implements Message<T> {
private ByteBuf payload;

private Schema<T> schema;
private SchemaInfo schemaInfo;
Copy link
Contributor

@315157973 315157973 Jul 26, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am a little confused about the modification.
We can get schemaInfo from schema now.
Can we use public static Schema<?> getSchema(SchemaInfo schemaInfo) in AutoConsumeSchema to convert schemaInfo to Schema

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the schema replication, we don't need to convert schemaInfo to schema for the local topic and then convert the schema to the schemaInfo for the remote topic. Or we only need to pass the schemaInfo to the message which need to replicate to the remote cluster.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already have a schema, but we added a schemaInfo, which will make other developers very confused. If we can't reuse existing attributes, we can at least do this:

  1. Make the new properties easier to understand, such as: naming modification, or adding comments, so that other developers can know schemaInfo only for replicator
  2. Independent get/set

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I second @315157973 concern.
We already have "getReaderSchema", why cannot we use that ?

private SchemaState schemaState = SchemaState.None;
private Optional<EncryptionContext> encryptionCtx = Optional.empty();

Expand Down Expand Up @@ -418,14 +419,24 @@ private void ensureSchemaIsLoaded() {
}
}

private SchemaInfo getSchemaInfo() {
public SchemaInfo getSchemaInfo() {
if (schemaInfo != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is quite confusing, we should use what the internal Schema is reporting

return schemaInfo;
}
if (schema == null) {
return null;
}
ensureSchemaIsLoaded();
if (schema instanceof AutoConsumeSchema) {
return ((AutoConsumeSchema) schema).getSchemaInfo(getSchemaVersion());
}
return schema.getSchemaInfo();
}

public void setSchemaInfo(SchemaInfo schemaInfo) {
this.schemaInfo = schemaInfo;
}

@Override
public T getValue() {
SchemaInfo schemaInfo = getSchemaInfo();
Expand Down Expand Up @@ -690,6 +701,9 @@ int getUncompressedSize() {
}

SchemaState getSchemaState() {
if (getSchemaInfo() == null) {
return SchemaState.Ready;
}
return schemaState;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -610,8 +610,7 @@ private void tryRegisterSchema(ClientCnx cnx, MessageImpl msg, SendCallback call
if (!changeToRegisteringSchemaState()) {
return;
}
SchemaInfo schemaInfo = Optional.ofNullable(msg.getSchemaInternal())
.map(Schema::getSchemaInfo)
SchemaInfo schemaInfo = Optional.ofNullable(msg.getSchemaInfo())
.filter(si -> si.getType().getValue() > 0)
.orElse(Schema.BYTES.getSchemaInfo());
getOrCreateSchemaAsync(cnx, schemaInfo).handle((v, ex) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -988,7 +988,7 @@ private SchemaInfoProvider newSchemaProvider(String topicName) {
return new MultiVersionSchemaInfoProvider(TopicName.get(topicName), this);
}

protected LoadingCache<String, SchemaInfoProvider> getSchemaProviderLoadingCache() {
public LoadingCache<String, SchemaInfoProvider> getSchemaProviderLoadingCache() {
return schemaProviderLoadingCache;
}

Expand Down