Skip to content

Commit

Permalink
ARTEMIS-5002 AMQP producer not unblock if the disk space is freed
Browse files Browse the repository at this point in the history
  • Loading branch information
howardgao authored and clebertsuconic committed Feb 12, 2025
1 parent 9d5feca commit 4e5fd4a
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
Expand Down Expand Up @@ -187,6 +188,10 @@ public void addBlockedStore(PagingStore store) {
blockedStored.add(store);
}

public Set<PagingStore> getBlockedSet() {
return new HashSet<>(blockedStored);
}

@Override
public void onChange() {
reapplySettings();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1096,6 +1096,7 @@ public boolean checkMemory(boolean runOnFailure, Runnable runWhenAvailableParame
if (isFull()) {
if (runOnFailure && runWhenAvailable != null) {
addToBlockList(runWhenAvailable, blockedCallback);
pagingManager.addBlockedStore(this);
}
return false;
}
Expand Down Expand Up @@ -1178,7 +1179,7 @@ public boolean checkReleasedMemory() {
}
}

return false;
return !blocking;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,13 @@ public FileStoreMonitor addCallback(Callback callback) {
}
}

public FileStoreMonitor removeCallback(Callback callback) {
synchronized (monitorLock) {
callbackList.remove(callback);
}
return this;
}

public FileStoreMonitor addStore(File file) throws IOException {
synchronized (monitorLock) {
// JDBC storage may return this as null, and we may need to ignore it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,38 +19,89 @@
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.lang.invoke.MethodHandles;
import java.net.URI;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.extensions.parameterized.Parameter;
import org.apache.activemq.artemis.tests.extensions.parameterized.ParameterizedTestExtension;
import org.apache.activemq.artemis.tests.extensions.parameterized.Parameters;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ExtendWith(ParameterizedTestExtension.class)
public class GlobalDiskFullTest extends AmqpClientTestSupport {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

@Parameter(index = 0)
public AddressFullMessagePolicy addressFullPolicy;

@Parameters(name = "addressFullPolicy={0}")
public static Collection<Object[]> parameters() {
return Arrays.asList(new Object[][] {
{AddressFullMessagePolicy.FAIL}, {AddressFullMessagePolicy.DROP}, {AddressFullMessagePolicy.PAGE}
});
}

@Override
protected void configureAddressPolicy(ActiveMQServer server) {
AddressSettings addressSettings = new AddressSettings();
addressSettings.setAddressFullMessagePolicy(addressFullPolicy);
server.getConfiguration().addAddressSetting(getQueueName(), addressSettings);
}

@Override
protected void addConfiguration(ActiveMQServer server) {
Configuration serverConfig = server.getConfiguration();
serverConfig.setDiskScanPeriod(100);
}

@Test
public void testProducerOnDiskFull() throws Exception {
FileStoreMonitor monitor = ((ActiveMQServerImpl)server).getMonitor().setMaxUsage(0.0);
final CountDownLatch latch = new CountDownLatch(1);
monitor.addCallback((usableSpace, totalSpace, ok, type) -> {
protected void waitMonitor(FileStoreMonitor monitor) throws Exception {
CountDownLatch latch = new CountDownLatch(1);

FileStoreMonitor.Callback callback = (a, b, c, d) -> {
latch.countDown();
});
};

assertTrue(latch.await(1, TimeUnit.MINUTES));
monitor.addCallback(callback);

assertTrue(latch.await(10, TimeUnit.SECONDS));

monitor.removeCallback(callback);
}


@TestTemplate
public void testProducerOnDiskFull() throws Exception {

FileStoreMonitor monitor = ((ActiveMQServerImpl)server).getMonitor();

waitMonitor(monitor);

//make it full
monitor.setMaxUsage(0.0);

waitMonitor(monitor);

AmqpClient client = createAmqpClient(new URI("tcp://localhost:" + AMQP_PORT));
AmqpConnection connection = addConnection(client.connect());
Expand All @@ -60,59 +111,57 @@ public void testProducerOnDiskFull() throws Exception {
AmqpSender sender = session.createSender(getQueueName());
byte[] payload = new byte[1000];


AmqpSender anonSender = session.createSender();

CountDownLatch sentWithName = new CountDownLatch(1);
CountDownLatch sentAnon = new CountDownLatch(1);

Thread threadWithName = new Thread(() -> {
ExecutorService pool = Executors.newCachedThreadPool();
runAfter(pool::shutdownNow);

pool.execute(() -> {
try {
final AmqpMessage message = new AmqpMessage();
message.setBytes(payload);
sender.setSendTimeout(-1);
sender.send(message);
} catch (Exception e) {
e.printStackTrace();
logger.warn("Caught exception while sending", e);
} finally {
sentWithName.countDown();
}
});

threadWithName.start();


Thread threadWithAnon = new Thread(() -> {
pool.execute(()-> {
try {
final AmqpMessage message = new AmqpMessage();
message.setBytes(payload);
anonSender.setSendTimeout(-1);
message.setAddress(getQueueName());
anonSender.send(message);
} catch (Exception e) {
e.printStackTrace();
} finally {
sentAnon.countDown();
} catch (Exception e) {
logger.warn("Caught exception while sending", e);
}
});

threadWithAnon.start();
PagingManagerImpl pagingManager = (PagingManagerImpl) server.getPagingManager();
Wait.assertTrue(() -> pagingManager.getBlockedSet().size() > 0, 5000);

assertFalse(sentWithName.await(500, TimeUnit.MILLISECONDS), "Thread sender should be blocked");
assertFalse(sentAnon.await(500, TimeUnit.MILLISECONDS), "Thread sender anonymous should be blocked");
assertFalse(sentWithName.await(100, TimeUnit.MILLISECONDS), "Thread sender should be blocked");
assertFalse(sentAnon.await(100, TimeUnit.MILLISECONDS), "Thread sender anonymous should be blocked");

// unblock
monitor.setMaxUsage(100.0);

waitMonitor(monitor);

assertTrue(sentWithName.await(30, TimeUnit.SECONDS), "Thread sender should be released");
assertTrue(sentAnon.await(30, TimeUnit.SECONDS), "Thread sender anonymous should be released");

threadWithName.join(TimeUnit.SECONDS.toMillis(30));
threadWithAnon.join(TimeUnit.SECONDS.toMillis(30));
assertFalse(threadWithName.isAlive());
assertFalse(threadWithAnon.isAlive());
Wait.assertEquals(0, () -> pagingManager.getBlockedSet().size(), 5000);
} finally {
connection.close();
}
}

}

0 comments on commit 4e5fd4a

Please sign in to comment.