Skip to content

Commit

Permalink
Pipe: fixed the bug that the processor dies when encountering a memory shortage for tablets over a period of time (#12042)
Browse files Browse the repository at this point in the history

Co-authored-by: Steve Yurong Su <[email protected]>
  • Loading branch information
Caideyipi and SteveYurongSu authored Feb 19, 2024
1 parent 89b9804 commit 1e7c9c0
Show file tree
Hide file tree
Showing 8 changed files with 36 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -74,7 +73,7 @@ public IoTDBThriftSyncClientManager(
}
}

public void checkClientStatusAndTryReconstructIfNecessary() throws IOException {
public void checkClientStatusAndTryReconstructIfNecessary() {
// reconstruct all dead clients
for (final Map.Entry<TEndPoint, Pair<IoTDBThriftSyncConnectorClient, Boolean>> entry :
endPoint2ClientAndStatus.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
import org.apache.iotdb.db.pipe.resource.memory.PipeTabletMemoryBlock;
import org.apache.iotdb.pipe.api.access.Row;
import org.apache.iotdb.pipe.api.collector.RowCollector;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
Expand All @@ -42,7 +42,7 @@ public class PipeRawTabletInsertionEvent extends EnrichedEvent implements Tablet
private final EnrichedEvent sourceEvent;
private boolean needToReport;

private PipeMemoryBlock allocatedMemoryBlock;
private PipeTabletMemoryBlock allocatedMemoryBlock;

private TabletInsertionDataContainer dataContainer;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public class PipeMemoryManager {
// threshold, allocations of memory block for tablets will be rejected.
private static final double TABLET_MEMORY_REJECT_THRESHOLD =
PipeConfig.getInstance().getPipeDataStructureTabletMemoryBlockAllocationRejectThreshold();
private long usedMemorySizeInBytesOfTablets;
private volatile long usedMemorySizeInBytesOfTablets;

private final Set<PipeMemoryBlock> allocatedBlocks = new HashSet<>();

Expand All @@ -78,8 +78,13 @@ public synchronized PipeMemoryBlock forceAllocate(long sizeInBytes)
return forceAllocate(sizeInBytes, false);
}

public PipeMemoryBlock forceAllocateWithRetry(Tablet tablet)
public PipeTabletMemoryBlock forceAllocateWithRetry(Tablet tablet)
throws PipeRuntimeOutOfMemoryCriticalException {
if (!PIPE_MEMORY_MANAGEMENT_ENABLED) {
// No need to calculate the tablet size, skip it to save time
return new PipeTabletMemoryBlock(0);
}

for (int i = 1; i <= MEMORY_ALLOCATE_MAX_RETRIES; i++) {
if ((double) usedMemorySizeInBytesOfTablets / TOTAL_MEMORY_SIZE_IN_BYTES
< TABLET_MEMORY_REJECT_THRESHOLD) {
Expand All @@ -104,7 +109,8 @@ public PipeMemoryBlock forceAllocateWithRetry(Tablet tablet)
}

synchronized (this) {
final PipeMemoryBlock block = forceAllocate(calculateTabletSizeInBytes(tablet), true);
final PipeTabletMemoryBlock block =
(PipeTabletMemoryBlock) forceAllocate(calculateTabletSizeInBytes(tablet), true);
usedMemorySizeInBytesOfTablets += block.getMemoryUsageInBytes();
return block;
}
Expand All @@ -113,8 +119,9 @@ public PipeMemoryBlock forceAllocateWithRetry(Tablet tablet)
private PipeMemoryBlock forceAllocate(long sizeInBytes, boolean isForTablet)
throws PipeRuntimeOutOfMemoryCriticalException {
if (!PIPE_MEMORY_MANAGEMENT_ENABLED) {
// No need to consider isForTablet, for memory control is disabled
return new PipeMemoryBlock(sizeInBytes);
return isForTablet
? new PipeTabletMemoryBlock(sizeInBytes)
: new PipeMemoryBlock(sizeInBytes);
}

for (int i = 1; i <= MEMORY_ALLOCATE_MAX_RETRIES; i++) {
Expand Down Expand Up @@ -155,6 +162,11 @@ public synchronized PipeMemoryBlock forceAllocateIfSufficient(
if (usedThreshold < 0.0f || usedThreshold > 1.0f) {
return null;
}

if (!PIPE_MEMORY_MANAGEMENT_ENABLED) {
return new PipeMemoryBlock(sizeInBytes);
}

if (TOTAL_MEMORY_SIZE_IN_BYTES - usedMemorySizeInBytes >= sizeInBytes
&& (float) usedMemorySizeInBytes / TOTAL_MEMORY_SIZE_IN_BYTES < usedThreshold) {
return forceAllocate(sizeInBytes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ public synchronized void collect(Event event) {
} else {
collectEvent(event);
}
} catch (PipeException e) {
throw e;
} catch (Exception e) {
throw new PipeException("Error occurred when collecting events from processor.", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ private void onEnrichedEventFailure(@NotNull Throwable throwable) {
throwable);
}

if (retryCount.get() < MAX_RETRY_TIMES) {
retryCount.incrementAndGet();
retryCount.incrementAndGet();
if (retryCount.get() <= MAX_RETRY_TIMES) {
LOGGER.warn(
"Retry executing subtask {} (creation time: {}, simple class: {}), retry count [{}/{}]",
taskID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

package org.apache.iotdb.db.pipe.task.subtask.processor;

import org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException;
import org.apache.iotdb.commons.pipe.execution.scheduler.PipeSubtaskScheduler;
import org.apache.iotdb.commons.pipe.task.EventSupplier;
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.metric.PipeProcessorMetrics;
Expand Down Expand Up @@ -139,6 +141,11 @@ protected boolean executeOnce() throws Exception {
}
}
releaseLastEvent(!isClosed.get() && outputEventCollector.hasNoCollectInvocationAfterReset());
} catch (PipeRuntimeOutOfMemoryCriticalException e) {
LOGGER.info(
"Temporarily out of memory in pipe event processing, will wait for the memory to release.",
e);
return false;
} catch (Exception e) {
if (!isClosed.get()) {
throw new PipeException(
Expand All @@ -162,6 +169,10 @@ public void submitSelf() {
// and the worker will be submitted to the executor
}

/**
 * Reports whether this subtask counts as stopped by an exception.
 *
 * @return {@code true} only when {@code lastEvent} is an {@link EnrichedEvent} and the value
 *     read from {@code retryCount} is strictly greater than {@code MAX_RETRY_TIMES};
 *     {@code false} otherwise
 */
public boolean isStoppedByException() {
  // Without an enriched event the retry counter is not consulted at all.
  if (!(lastEvent instanceof EnrichedEvent)) {
    return false;
  }
  return retryCount.get() > MAX_RETRY_TIMES;
}

@Override
public void close() {
PipeProcessorMetrics.getInstance().deregister(taskID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

import static org.apache.iotdb.db.pipe.task.subtask.PipeDataNodeSubtask.MAX_RETRY_TIMES;

public class PipeProcessorSubtaskWorker extends WrappedRunnable {

private static final Logger LOGGER = LoggerFactory.getLogger(PipeProcessorSubtaskWorker.class);
Expand Down Expand Up @@ -71,9 +69,7 @@ private boolean runSubtasks() {
boolean canSleepBeforeNextRound = true;

for (final PipeProcessorSubtask subtask : subtasks.keySet()) {
if (subtask.isClosed()
|| !subtask.isSubmittingSelf()
|| MAX_RETRY_TIMES <= subtask.getRetryCount()) {
if (subtask.isClosed() || !subtask.isSubmittingSelf() || subtask.isStoppedByException()) {
continue;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,4 @@ public String getTaskID() {
/**
 * Accessor for the creation time.
 *
 * @return the current value of the {@code creationTime} field
 */
public long getCreationTime() {
  return this.creationTime;
}

/**
 * Accessor for the retry counter.
 *
 * @return the value obtained from {@code retryCount.get()} at the time of the call
 */
public int getRetryCount() {
  return this.retryCount.get();
}
}

0 comments on commit 1e7c9c0

Please sign in to comment.