Skip to content

Commit

Permalink
[fix][schema]ledger handle leak when update schema (#17283)
Browse files Browse the repository at this point in the history
in the schema update, will create a `ledgerHandle` and write data to BK, after that `ledgerHandle` is no longer useful and no other object holds references to it. `ledgerHandle` will be recycled with GC, but `ledgerHandle` also hold external connections, which will cause leakage.

https://github.com/apache/pulsar/blob/40b9d7ea50cef54becb09f2543193e08375abe0b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java#L452-L456

after the schema is updated, close the `ledgerHandle`, just like schema-read:

https://github.com/apache/pulsar/blob/40b9d7ea50cef54becb09f2543193e08375abe0b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java#L519-L525
(cherry picked from commit 2620450)
  • Loading branch information
poorbarcode authored and congbobo184 committed Nov 26, 2022
1 parent 68f7cde commit 6398330
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -452,11 +452,14 @@ private CompletableFuture<SchemaStorageFormat.PositionInfo> addNewSchemaEntryToS
byte[] data
) {
SchemaStorageFormat.SchemaEntry schemaEntry = newSchemaEntry(index, data);
return createLedger(schemaId).thenCompose(ledgerHandle ->
addEntry(ledgerHandle, schemaEntry).thenApply(entryId ->
Functions.newPositionInfo(ledgerHandle.getId(), entryId)
)
);
return createLedger(schemaId).thenCompose(ledgerHandle -> {
final long ledgerId = ledgerHandle.getId();
return addEntry(ledgerHandle, schemaEntry)
.thenApply(entryId -> {
ledgerHandle.closeAsync();
return Functions.newPositionInfo(ledgerId, entryId);
});
});
}

@NotNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.common.collect.Sets;
import java.util.Collections;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.api.Consumer;
Expand Down Expand Up @@ -478,6 +479,33 @@ public void testProducerSendWithOldSchemaAndConsumerCanRead(SchemaCompatibilityS

consumerOne.close();
producerOne.close();
}

@Test
public void testSchemaLedgerAutoRelease() throws Exception {
String namespaceName = PUBLIC_TENANT + "/default";
String topicName = "persistent://" + namespaceName + "/tp";
admin.namespaces().createNamespace(namespaceName, Sets.newHashSet(CLUSTER_NAME));
admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
// Update schema 100 times.
for (int i = 0; i < 100; i++){
Schema schema = Schema.JSON(SchemaDefinition.builder()
.withJsonDef(String.format("{\"type\": \"record\",\"name\": "
+ "\"Test_Pojo\",\"namespace\": \"org.apache.pulsar.schema.compatibility\","
+ "\"fields\": [{\"name\": \"prop_%s\",\"type\": "
+ "[\"null\", \"string\"],\"default\": null}]}", i))
.build());
Producer producer = pulsarClient
.newProducer(schema)
.topic(topicName)
.create();
producer.close();
}
// The other ledgers are about 5.
Assert.assertTrue(mockBookKeeper.getLedgerMap().values().stream()
.filter(ledger -> !ledger.isFenced())
.collect(Collectors.toList()).size() < 20);
admin.topics().delete(topicName, true);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import lombok.AccessLevel;
import lombok.Getter;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.tuple.Pair;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.bookkeeper.client;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;

import io.netty.buffer.ByteBuf;
Expand All @@ -31,7 +32,7 @@
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

import lombok.Getter;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
Expand All @@ -44,7 +45,6 @@
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.versioning.LongVersion;
import org.apache.bookkeeper.versioning.Versioned;
import org.slf4j.Logger;
Expand All @@ -62,6 +62,8 @@ public class PulsarMockLedgerHandle extends LedgerHandle {
final byte[] passwd;
final ReadHandle readHandle;
long lastEntry = -1;
@VisibleForTesting
@Getter
boolean fenced = false;

public PulsarMockLedgerHandle(PulsarMockBookKeeper bk, long id,
Expand Down

0 comments on commit 6398330

Please sign in to comment.