Skip to content

Commit

Permalink
ARTEMIS-5305 Improving performance of paging with multiple producers
Browse files Browse the repository at this point in the history
This is improving how locking works by using a single threaded executor on paging and depaging
as well as batching writes instead of directly writing on the files from the producers.
  • Loading branch information
clebertsuconic committed Feb 13, 2025
1 parent 9e09244 commit 1608561
Show file tree
Hide file tree
Showing 45 changed files with 1,544 additions and 480 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.paging.impl.Page;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.replication.ReplicationManager;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
Expand Down Expand Up @@ -114,11 +113,6 @@ default PagingStore enforceAddressFullMessagePolicy(AddressFullMessagePolicy enf
* It's ok to look for this with an estimate on starting a task or not, but you will need to recheck on actual paging operations. */
boolean isPaging();

/**
* Schedules sync to the file storage.
*/
void addSyncPoint(OperationContext context) throws Exception;

/**
* Performs a real sync on the current IO file.
*/
Expand Down Expand Up @@ -162,7 +156,11 @@ default PagingStore enforceAddressFullMessagePolicy(AddressFullMessagePolicy enf

Page removePage(int pageId);

void forceAnotherPage() throws Exception;
default void forceAnotherPage() throws Exception {
forceAnotherPage(false);
}

void forceAnotherPage(boolean useExecutor) throws Exception;

Page getCurrentPage();

Expand Down Expand Up @@ -204,19 +202,32 @@ default void addSize(int size) {
*/
boolean checkReleasedMemory();

default void writeLock() {
writeLock(-1);
}
/**
* Write lock the PagingStore.
*
* @param timeout milliseconds to wait for the lock. If value is {@literal -1} then wait
* indefinitely.
* @return {@code true} if the lock was obtained, {@code false} otherwise
*/
boolean lock(long timeout);
boolean writeLock(long timeout);

default void readLock() {
readLock(-1);
}
default boolean readLock(long timeout) {
return true;
}

default void readUnlock() {
}

/**
* Releases locks acquired with {@link PagingStore#lock(long)}.
* Releases locks acquired with {@link PagingStore#writeLock(long)}.
*/
void unlock();
void writeUnlock();

/**
* This is used mostly by tests.
Expand Down Expand Up @@ -265,6 +276,10 @@ default void addSize(int size) {

void unblock();

default boolean hasPendingIO() {
return false;
}

default StorageManager getStorageManager() {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public PageCounterRebuildManager(PagingManager pagingManager, PagingStore store,
/** this method will perform the copy from Acked recorded from the subscription into a separate data structure.
* So we can count data while we consolidate at the end */
private void initialize(PagingStore store) {
store.lock(-1);
store.writeLock();
try {
try {
paging = store.isPaging();
Expand Down Expand Up @@ -134,7 +134,7 @@ private void initialize(PagingStore store) {
});
});
} finally {
store.unlock();
store.writeUnlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ protected void cleanup() {
try (ArtemisCloseable readLock = storageManager.closeableReadLock()) {

while (true) {
if (pagingStore.lock(100)) {
if (pagingStore.writeLock(1_000)) {
break;
}
if (!pagingStore.isStarted())
Expand Down Expand Up @@ -318,8 +318,8 @@ protected void cleanup() {

assert pagingStore.getNumberOfPages() >= 0;

if (pagingStore.getNumberOfPages() == 0 || pagingStore.getNumberOfPages() == 1 && (pagingStore.getCurrentPage() == null || pagingStore.getCurrentPage().getNumberOfMessages() == 0)) {
logger.trace("StopPaging being called on {}", pagingStore);
if (!pagingStore.hasPendingIO() && (pagingStore.getNumberOfPages() == 0 || pagingStore.getNumberOfPages() == 1 && (pagingStore.getCurrentPage() == null || pagingStore.getCurrentPage().getNumberOfMessages() == 0))) {
logger.trace("StopPaging being called on {}, pending={}", pagingStore, pagingStore.hasPendingIO());
pagingStore.stopPaging();
} else {
if (logger.isTraceEnabled()) {
Expand All @@ -333,7 +333,7 @@ protected void cleanup() {
return;
} finally {
logger.trace("<<<< Cleanup end on {}", pagingStore.getAddress());
pagingStore.unlock();
pagingStore.writeUnlock();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,9 @@ public String debugMessages() throws Exception {
return sb.toString();
}

public synchronized void write(final PagedMessage message) throws Exception {
public synchronized void write(final PagedMessage message, boolean lineUp, boolean originallyReplicated) throws Exception {
writeDirect(message);
storageManager.pageWrite(storeName, message, pageId);
storageManager.pageWrite(storeName, message, pageId, lineUp, originallyReplicated);
}

/** This write will not interact back with the storage manager.
Expand Down

This file was deleted.

Loading

0 comments on commit 1608561

Please sign in to comment.