Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[OBSDEF-13969] add delay in read/write path in SLTS #1

Open
wants to merge 8 commits into
base: release-objectscale-1.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -327,8 +327,12 @@ private void processOperations(Queue<CompletableOperation> operations) {
// Process the current set of operations.
while (!operations.isEmpty()) {
CompletableOperation o = operations.poll();
this.metrics.operationQueueWaitTime(o.getTimer().getElapsedMillis());
long elapsed = o.getTimer().getElapsedMillis();
this.metrics.operationQueueWaitTime(elapsed);
try {
if (o.getOperation().getDesiredPriority() != null && o.getOperation().getDesiredPriority().isThrottlingExempt()) {
realAaronWu marked this conversation as resolved.
Show resolved Hide resolved
log.debug("queue wait time in container {} for critical operation {} is {}", this.traceObjectId, o.getOperation(), elapsed);
}
processOperation(o);
this.state.addPending(o);
count++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class ThrottlerCalculator {
* Maximum size (in number of operations) of the OperationLog, above which maximum throttling will be applied.
*/
@VisibleForTesting
static final int OPERATION_LOG_MAX_SIZE = 1_000_000;
static final int OPERATION_LOG_MAX_SIZE = 50_000;
realAaronWu marked this conversation as resolved.
Show resolved Hide resolved
/**
* Desired size (in number of operations) of the OperationLog, above which a gradual throttling will begin.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ public CompletableFuture<Void> write(SegmentHandle handle, long offset, InputStr
if (null == handle) {
return CompletableFuture.failedFuture(new IllegalArgumentException("handle must not be null"));
}
return executeSerialized(new WriteOperation(this, handle, offset, data, length), handle.getSegmentName());
return executeSerialized(new WriteOperation(this, handle, offset, data, length, config.getHackSLTSWriteDelayMillis()), handle.getSegmentName());
}

/**
Expand Down Expand Up @@ -626,7 +626,7 @@ public CompletableFuture<SegmentHandle> openRead(String streamSegmentName) {
@Override
public CompletableFuture<Integer> read(SegmentHandle handle, long offset, byte[] buffer, int bufferOffset, int length, Duration timeout) {
checkInitialized();
return executeParallel(new ReadOperation(this, handle, offset, buffer, bufferOffset, length), handle.getSegmentName());
return executeParallel(new ReadOperation(this, handle, offset, buffer, bufferOffset, length, config.getHackSLTSReadDelayMillis()), handle.getSegmentName());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ public class ChunkedSegmentStorageConfig {
public static final Property<Integer> GARBAGE_COLLECTION_MAX_TXN_BATCH_SIZE = Property.named("garbage.collection.txn.batch.size.max", 5000);
public static final Property<Integer> MAX_METADATA_ENTRIES_IN_BUFFER = Property.named("metadata.buffer.size.max", 1024);
public static final Property<Integer> MAX_METADATA_ENTRIES_IN_CACHE = Property.named("metadata.cache.size.max", 5000);

public static final Property<Integer> HACK_SLTS_READ_DELAY_MILLIS = Property.named("hack.slts.read.delay.millis", 0);
public static final Property<Integer> HACK_SLTS_WRITE_DELAY_MILLIS = Property.named("hack.slts.write.delay.millis", 0);
public static final Property<Integer> JOURNAL_SNAPSHOT_UPDATE_FREQUENCY = Property.named("journal.snapshot.update.frequency.minutes", 5);
public static final Property<Integer> MAX_PER_SNAPSHOT_UPDATE_COUNT = Property.named("journal.snapshot.update.count.max", 100);
public static final Property<Integer> MAX_JOURNAL_READ_ATTEMPTS = Property.named("journal.snapshot.attempts.read.max", 100);
Expand Down Expand Up @@ -89,6 +90,8 @@ public class ChunkedSegmentStorageConfig {
.maxJournalReadAttempts(100)
.maxJournalWriteAttempts(10)
.selfCheckEnabled(false)
.hackSLTSReadDelayMillis(0)
.hackSLTSWriteDelayMillis(0)
.build();

static final String COMPONENT_CODE = "storage";
Expand Down Expand Up @@ -247,6 +250,18 @@ public class ChunkedSegmentStorageConfig {
@Getter
final private boolean selfCheckEnabled;

/**
* hack config for inject random write delays in SLTS path
*/
@Getter
final private int hackSLTSWriteDelayMillis;

/**
* hack config for inject random write delays in SLTS path
*/
@Getter
final private int hackSLTSReadDelayMillis;

/**
* Creates a new instance of the ChunkedSegmentStorageConfig class.
*
Expand Down Expand Up @@ -279,6 +294,8 @@ public class ChunkedSegmentStorageConfig {
this.indexBlockSize = properties.getLong(READ_INDEX_BLOCK_SIZE);
this.maxEntriesInTxnBuffer = properties.getInt(MAX_METADATA_ENTRIES_IN_BUFFER);
this.maxEntriesInCache = properties.getInt(MAX_METADATA_ENTRIES_IN_CACHE);
this.hackSLTSWriteDelayMillis = properties.getInt(HACK_SLTS_WRITE_DELAY_MILLIS);
this.hackSLTSReadDelayMillis = properties.getInt(HACK_SLTS_READ_DELAY_MILLIS);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

Expand All @@ -60,6 +61,7 @@ class ReadOperation implements Callable<CompletableFuture<Integer>> {
private final byte[] buffer;
private final int bufferOffset;
private final int length;
private final int hackDelayMillis;
private final ChunkedSegmentStorage chunkedSegmentStorage;
private final long traceId;
private final Timer timer;
Expand All @@ -76,7 +78,7 @@ class ReadOperation implements Callable<CompletableFuture<Integer>> {
private volatile int bytesToRead;
private final AtomicInteger cntChunksRead = new AtomicInteger();

ReadOperation(ChunkedSegmentStorage chunkedSegmentStorage, SegmentHandle handle, long offset, byte[] buffer, int bufferOffset, int length) {
ReadOperation(ChunkedSegmentStorage chunkedSegmentStorage, SegmentHandle handle, long offset, byte[] buffer, int bufferOffset, int length, int hackDelayMillis) {
this.handle = handle;
this.offset = offset;
this.buffer = buffer;
Expand All @@ -85,6 +87,8 @@ class ReadOperation implements Callable<CompletableFuture<Integer>> {
this.chunkedSegmentStorage = chunkedSegmentStorage;
traceId = LoggerHelpers.traceEnter(log, "read", handle, offset, length);
timer = new Timer();
this.hackDelayMillis = hackDelayMillis;
log.info("inject read delay for {} ms", hackDelayMillis);
}

@Override
Expand All @@ -111,6 +115,8 @@ public CompletableFuture<Integer> call() {
// Now read.
return readData(txn);
}, chunkedSegmentStorage.getExecutor())
// HACK: delay a while after read chunk
.thenComposeAsync(vv -> createDelayFuture(hackDelayMillis), chunkedSegmentStorage.getExecutor())
.exceptionally(ex -> {
log.debug("{} read - exception op={}, segment={}, offset={}, bytesRead={}.",
chunkedSegmentStorage.getLogPrefix(), System.identityHashCode(this), handle.getSegmentName(), offset, totalBytesRead);
Expand Down Expand Up @@ -200,6 +206,10 @@ private CompletableFuture<Void> readData(MetadataTransaction txn) {
.thenCompose(v -> Futures.allOf(chunkReadFutures));
}

private CompletableFuture<Void> createDelayFuture(int millis) {
return Futures.delayedFuture(Duration.ofMillis(millis), (ScheduledExecutorService) chunkedSegmentStorage.getExecutor());
}

private CompletableFuture<Void> readChunk(String chunkName,
long fromOffset,
int bytesToRead,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,14 @@
import lombok.val;

import java.io.InputStream;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -60,6 +62,7 @@ class WriteOperation implements Callable<CompletableFuture<Void>> {
private final long offset;
private final InputStream data;
private final int length;
private final int hackDelayMillis;
private final ChunkedSegmentStorage chunkedSegmentStorage;
private final long traceId;
private final Timer timer;
Expand All @@ -82,14 +85,16 @@ class WriteOperation implements Callable<CompletableFuture<Void>> {

private volatile boolean didSegmentLayoutChange = false;

WriteOperation(ChunkedSegmentStorage chunkedSegmentStorage, SegmentHandle handle, long offset, InputStream data, int length) {
WriteOperation(ChunkedSegmentStorage chunkedSegmentStorage, SegmentHandle handle, long offset, InputStream data, int length, int hackDelayMillis) {
this.handle = handle;
this.offset = offset;
this.data = data;
this.length = length;
this.chunkedSegmentStorage = chunkedSegmentStorage;
traceId = LoggerHelpers.traceEnter(log, "write", handle, offset, length);
timer = new Timer();
this.hackDelayMillis = hackDelayMillis;
log.info("inject write delay for {} ms", hackDelayMillis);
}

@Override
Expand Down Expand Up @@ -126,6 +131,8 @@ public CompletableFuture<Void> call() {
return getLastChunk(txn)
.thenComposeAsync(v ->
writeData(txn)
// HACK: delay a while before commit the metadata
.thenComposeAsync(vvv -> createDelayFuture(hackDelayMillis), chunkedSegmentStorage.getExecutor())
.thenComposeAsync(vv ->
commit(txn)
.thenApplyAsync(vvvv ->
Expand All @@ -148,6 +155,10 @@ private Object handleException(Throwable e) {
throw new CompletionException(ex);
}

private CompletableFuture<Void> createDelayFuture(int millis) {
return Futures.delayedFuture(Duration.ofMillis(millis), (ScheduledExecutorService) chunkedSegmentStorage.getExecutor());
}

private Object postCommit() {
// Post commit actions.
// Update the read index.
Expand Down