Skip to content

Commit

Permalink
[fix][schema]ledger handle leak when update schema
Browse files Browse the repository at this point in the history
  • Loading branch information
poorbarcode committed Aug 25, 2022
1 parent 3958aa6 commit f3672a3
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -449,11 +449,12 @@ private CompletableFuture<SchemaStorageFormat.PositionInfo> addNewSchemaEntryToS
byte[] data
) {
SchemaStorageFormat.SchemaEntry schemaEntry = newSchemaEntry(index, data);
return createLedger(schemaId).thenCompose(ledgerHandle ->
addEntry(ledgerHandle, schemaEntry).thenApply(entryId ->
Functions.newPositionInfo(ledgerHandle.getId(), entryId)
)
);
return createLedger(schemaId).thenCompose(ledgerHandle -> {
final long ledgerId = ledgerHandle.getId();
return addEntry(ledgerHandle, schemaEntry)
.thenCompose(entryId -> ledgerHandle.closeAsync().thenApply(__ -> entryId))
.thenApply(entryId -> Functions.newPositionInfo(ledgerId, entryId));
});
}

@NotNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,18 @@
package org.apache.pulsar.schema.compatibility;

import static java.nio.charset.StandardCharsets.UTF_8;
import static net.bytebuddy.jar.asm.Opcodes.ACC_PUBLIC;
import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import net.bytebuddy.jar.asm.ClassWriter;
import net.bytebuddy.jar.asm.Opcodes;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
Expand Down Expand Up @@ -478,7 +483,56 @@ public void testProducerSendWithOldSchemaAndConsumerCanRead(SchemaCompatibilityS

consumerOne.close();
producerOne.close();
}

@Test
public void testSchemaLedgerAutoRelease() throws Exception {
String namespaceName = PUBLIC_TENANT + "/default";
String topicName = "persistent://" + namespaceName + "/tp";
admin.namespaces().createNamespace(namespaceName, Sets.newHashSet(CLUSTER_NAME));
admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
// Update schema 100 times.
ArrayList<Class> classes = createManyClass(classLoader, 100);
for (int i = 0; i < classes.size(); i++){
Schema schema = Schema.KeyValue(Integer.class, classes.get(i));
Producer producer = pulsarClient
.newProducer(schema)
.topic(topicName)
.create();
producer.close();
}
// The other ledgers are about 5.
Assert.assertTrue(mockBookKeeper.getLedgerMap().values().stream()
.filter(ledger -> !ledger.isFenced())
.collect(Collectors.toList()).size() < 20);
admin.topics().delete(topicName, true);
}

private ArrayList<Class> createManyClass(ClassLoader classLoader, int count){
DynamicClassLoader dynamicClassLoader = new DynamicClassLoader(classLoader);
ArrayList<Class> list = new ArrayList<>(count);
for (int i = 0; i < count; i++){
ClassWriter cw = new ClassWriter(ClassWriter.COMPUTE_FRAMES);
String className = "Dynamic_GEN_Class_" + i;
cw.visit(Opcodes.V17, ACC_PUBLIC, className, null, "java/lang/Object", null);
cw.visitEnd();
byte[] bytes = cw.toByteArray();
Class c = dynamicClassLoader.defineClass(className, bytes);
list.add(c);
}
return list;
}

private static class DynamicClassLoader extends ClassLoader{

private DynamicClassLoader(ClassLoader parent) {
super(parent);
}

private Class<?> defineClass(String name, byte[] bytes) throws ClassFormatError {
return defineClass(name, bytes, 0, bytes.length);
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.bookkeeper.client;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
Expand All @@ -29,6 +30,7 @@
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import lombok.Getter;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
Expand Down Expand Up @@ -58,6 +60,8 @@ public class PulsarMockLedgerHandle extends LedgerHandle {
final byte[] passwd;
final ReadHandle readHandle;
long lastEntry = -1;
@VisibleForTesting
@Getter
boolean fenced = false;

public PulsarMockLedgerHandle(PulsarMockBookKeeper bk, long id,
Expand Down

0 comments on commit f3672a3

Please sign in to comment.