Skip to content

Commit

Permalink
ARTEMIS-5305 Improving performance of paging with multiple producers
Browse files Browse the repository at this point in the history
This is improving how locking works by using a single threaded executor on paging and depaging
as well as batching writes instead of directly writing on the files from the producers.
  • Loading branch information
clebertsuconic committed Feb 18, 2025
1 parent 2c7378b commit 0cdd5fb
Show file tree
Hide file tree
Showing 50 changed files with 1,586 additions and 502 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -301,12 +301,12 @@ protected void initializeJournal(Configuration configuration) throws Exception {

PagingStoreFactory pageStoreFactory = new PagingStoreFactoryDatabase((DatabaseStorageConfiguration) configuration.getStoreConfiguration(),
storageManager, 1000L,
scheduledExecutorService, executorFactory, executorFactory,
scheduledExecutorService, executorFactory,
false, null);
pagingmanager = new PagingManagerImpl(pageStoreFactory, addressSettingsRepository, configuration.getManagementAddress());
} else {
storageManager = new JournalStorageManager(config, EmptyCriticalAnalyzer.getInstance(), executorFactory, executorFactory);
PagingStoreFactory pageStoreFactory = new PagingStoreFactoryNIO(storageManager, config.getPagingLocation(), 1000L, scheduledExecutorService, executorFactory, executorFactory, true, null);
PagingStoreFactory pageStoreFactory = new PagingStoreFactoryNIO(storageManager, config.getPagingLocation(), 1000L, scheduledExecutorService, executorFactory, true, null);
pagingmanager = new PagingManagerImpl(pageStoreFactory, addressSettingsRepository, configuration.getManagementAddress());

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ private static void printPages(File pageDirectory, DescribeJournal describeJourn
try {

final StorageManager sm = new NullStorageManager();
PagingStoreFactory pageStoreFactory = new PagingStoreFactoryNIO(sm, pageDirectory, 1000L, scheduled, execfactory, execfactory, false, null);
PagingStoreFactory pageStoreFactory = new PagingStoreFactoryNIO(sm, pageDirectory, 1000L, scheduled, execfactory, false, null);
HierarchicalRepository<AddressSettings> addressSettingsRepository = new HierarchicalObjectRepository<>();
addressSettingsRepository.setDefault(new AddressSettings());
PagingManager manager = new PagingManagerImpl(pageStoreFactory, addressSettingsRepository);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.paging.impl.Page;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.replication.ReplicationManager;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
Expand Down Expand Up @@ -114,11 +113,6 @@ default PagingStore enforceAddressFullMessagePolicy(AddressFullMessagePolicy enf
* It's ok to look for this with an estimate on starting a task or not, but you will need to recheck on actual paging operations. */
boolean isPaging();

/**
* Schedules sync to the file storage.
*/
void addSyncPoint(OperationContext context) throws Exception;

/**
* Performs a real sync on the current IO file.
*/
Expand Down Expand Up @@ -162,7 +156,11 @@ default PagingStore enforceAddressFullMessagePolicy(AddressFullMessagePolicy enf

Page removePage(int pageId);

void forceAnotherPage() throws Exception;
default void forceAnotherPage() throws Exception {
forceAnotherPage(false);
}

void forceAnotherPage(boolean useExecutor) throws Exception;

Page getCurrentPage();

Expand Down Expand Up @@ -204,19 +202,27 @@ default void addSize(int size) {
*/
boolean checkReleasedMemory();

void writeLock();

/**
* Write lock the PagingStore.
*
* @param timeout milliseconds to wait for the lock. If value is {@literal -1} then wait
* indefinitely.
* @return {@code true} if the lock was obtained, {@code false} otherwise
*/
boolean lock(long timeout);
boolean writeLock(long timeout);

/**
* Releases locks acquired with {@link PagingStore#lock(long)}.
* Releases locks acquired with {@link PagingStore#writeLock(long)}.
*/
void unlock();
void writeUnlock();

void readLock();

boolean readLock(long timeout);

void readUnlock();

/**
* This is used mostly by tests.
Expand Down Expand Up @@ -265,6 +271,10 @@ default void addSize(int size) {

void unblock();

default boolean hasPendingIO() {
return false;
}

default StorageManager getStorageManager() {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public PageCounterRebuildManager(PagingManager pagingManager, PagingStore store,
/** this method will perform the copy from Acked recorded from the subscription into a separate data structure.
* So we can count data while we consolidate at the end */
private void initialize(PagingStore store) {
store.lock(-1);
store.writeLock();
try {
try {
paging = store.isPaging();
Expand Down Expand Up @@ -134,7 +134,7 @@ private void initialize(PagingStore store) {
});
});
} finally {
store.unlock();
store.writeUnlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ protected void cleanup() {
try (ArtemisCloseable readLock = storageManager.closeableReadLock()) {

while (true) {
if (pagingStore.lock(100)) {
if (pagingStore.writeLock(1_000)) {
break;
}
if (!pagingStore.isStarted())
Expand Down Expand Up @@ -318,8 +318,8 @@ protected void cleanup() {

assert pagingStore.getNumberOfPages() >= 0;

if (pagingStore.getNumberOfPages() == 0 || pagingStore.getNumberOfPages() == 1 && (pagingStore.getCurrentPage() == null || pagingStore.getCurrentPage().getNumberOfMessages() == 0)) {
logger.trace("StopPaging being called on {}", pagingStore);
if (!pagingStore.hasPendingIO() && (pagingStore.getNumberOfPages() == 0 || pagingStore.getNumberOfPages() == 1 && (pagingStore.getCurrentPage() == null || pagingStore.getCurrentPage().getNumberOfMessages() == 0))) {
logger.trace("StopPaging being called on {}, pending={}", pagingStore, pagingStore.hasPendingIO());
pagingStore.stopPaging();
} else {
if (logger.isTraceEnabled()) {
Expand All @@ -333,7 +333,7 @@ protected void cleanup() {
return;
} finally {
logger.trace("<<<< Cleanup end on {}", pagingStore.getAddress());
pagingStore.unlock();
pagingStore.writeUnlock();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,9 @@ public String debugMessages() throws Exception {
return sb.toString();
}

public synchronized void write(final PagedMessage message) throws Exception {
public synchronized void write(final PagedMessage message, boolean lineUp, boolean originallyReplicated) throws Exception {
writeDirect(message);
storageManager.pageWrite(storeName, message, pageId);
storageManager.pageWrite(storeName, message, pageId, lineUp, originallyReplicated);
}

/** This write will not interact back with the storage manager.
Expand Down

This file was deleted.

Loading

0 comments on commit 0cdd5fb

Please sign in to comment.