Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add metrics to Scalar DB Server #245

Merged
merged 2 commits into from
Jul 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ dependencies {
implementation(group: 'com.scalar-labs', name: 'scalar-admin', version: '1.0.0') {
exclude group: 'io.grpc'
}
implementation group: 'io.dropwizard.metrics', name: 'metrics-core', version: '4.2.2'
implementation group: 'io.dropwizard.metrics', name: 'metrics-jmx', version: '4.2.2'
testImplementation project(':core').sourceSets.integrationTest.output
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ public class DistributedStorageAdminService
LoggerFactory.getLogger(DistributedStorageAdminService.class);

private final DistributedStorageAdmin admin;
private final Metrics metrics;

@Inject
public DistributedStorageAdminService(DistributedStorageAdmin admin) {
public DistributedStorageAdminService(DistributedStorageAdmin admin, Metrics metrics) {
this.admin = admin;
this.metrics = metrics;
}

@Override
Expand All @@ -42,7 +44,8 @@ public void createTable(CreateTableRequest request, StreamObserver<Empty> respon
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
},
responseObserver);
responseObserver,
"createTable");
}

@Override
Expand All @@ -53,7 +56,8 @@ public void dropTable(DropTableRequest request, StreamObserver<Empty> responseOb
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
},
responseObserver);
responseObserver,
"dropTable");
}

@Override
Expand All @@ -64,7 +68,8 @@ public void truncateTable(TruncateTableRequest request, StreamObserver<Empty> re
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
},
responseObserver);
responseObserver,
"truncateTable");
}

@Override
Expand All @@ -81,13 +86,14 @@ public void getTableMetadata(
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
},
responseObserver);
responseObserver,
"getTableMetadata");
}

private static void execute(
ThrowableRunnable<Throwable> runnable, StreamObserver<?> responseObserver) {
private void execute(
ThrowableRunnable<Throwable> runnable, StreamObserver<?> responseObserver, String method) {
try {
runnable.run();
metrics.measure(DistributedStorageAdminService.class, method, runnable);
} catch (IllegalArgumentException | IllegalStateException e) {
responseObserver.onError(
Status.INVALID_ARGUMENT.withDescription(e.getMessage()).asRuntimeException());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,13 @@ public class DistributedStorageService extends DistributedStorageGrpc.Distribute

private final DistributedStorage storage;
private final Pauser pauser;
private final Metrics metrics;

@Inject
public DistributedStorageService(DistributedStorage storage, Pauser pauser) {
public DistributedStorageService(DistributedStorage storage, Pauser pauser, Metrics metrics) {
this.storage = storage;
this.pauser = pauser;
this.metrics = metrics;
}

@Override
Expand All @@ -54,12 +56,14 @@ public void get(GetRequest request, StreamObserver<GetResponse> responseObserver
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
},
responseObserver);
responseObserver,
"get");
}

@Override
public StreamObserver<ScanRequest> scan(StreamObserver<ScanResponse> responseObserver) {
return new ScanStreamObserver(storage, responseObserver, this::preProcess, this::postProcess);
return new ScanStreamObserver(
storage, responseObserver, metrics, this::preProcess, this::postProcess);
}

@Override
Expand All @@ -74,17 +78,19 @@ public void mutate(MutateRequest request, StreamObserver<Empty> responseObserver
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
},
responseObserver);
responseObserver,
"mutate");
}

private void execute(ThrowableRunnable<Throwable> runnable, StreamObserver<?> responseObserver) {
private void execute(
ThrowableRunnable<Throwable> runnable, StreamObserver<?> responseObserver, String method) {
if (!preProcess(responseObserver)) {
// Unavailable
return;
}

try {
runnable.run();
metrics.measure(DistributedStorageService.class, method, runnable);
} catch (IllegalArgumentException | IllegalStateException e) {
responseObserver.onError(
Status.INVALID_ARGUMENT.withDescription(e.getMessage()).asRuntimeException());
Expand Down Expand Up @@ -124,6 +130,7 @@ private static class ScanStreamObserver implements StreamObserver<ScanRequest> {

private final DistributedStorage storage;
private final StreamObserver<ScanResponse> responseObserver;
private final Metrics metrics;
private final Function<StreamObserver<?>, Boolean> preProcessor;
private final Runnable postProcessor;
private final AtomicBoolean preProcessed = new AtomicBoolean();
Expand All @@ -134,10 +141,12 @@ private static class ScanStreamObserver implements StreamObserver<ScanRequest> {
public ScanStreamObserver(
DistributedStorage storage,
StreamObserver<ScanResponse> responseObserver,
Metrics metrics,
Function<StreamObserver<?>, Boolean> preProcessor,
Runnable postProcessor) {
this.storage = storage;
this.responseObserver = responseObserver;
this.metrics = metrics;
this.preProcessor = preProcessor;
this.postProcessor = postProcessor;
}
Expand All @@ -157,25 +166,21 @@ public void onNext(ScanRequest request) {
return;
}
if (!openScanner(request)) {
// failed to open a scanner
return;
}
} else if (request.hasScan()) {
respondInvalidArgumentError("scanner has already been opened. Don't specify a Scan object");
return;
}

Iterator<Result> resultIterator = scanner.iterator();
List<Result> results =
fetch(
resultIterator,
request.hasFetchCount() ? request.getFetchCount() : DEFAULT_SCAN_FETCH_COUNT);
boolean hasMoreResults = resultIterator.hasNext();

ScanResponse.Builder builder = ScanResponse.newBuilder();
results.forEach(r -> builder.addResult(ProtoUtil.toResult(r)));
ScanResponse response = builder.setHasMoreResults(hasMoreResults).build();
ScanResponse response = next(request);
if (response == null) {
// failed to next
return;
}

if (hasMoreResults) {
if (response.getHasMoreResults()) {
responseObserver.onNext(response);
} else {
// cleans up and completes this stream if no more results
Expand All @@ -187,34 +192,49 @@ public void onNext(ScanRequest request) {

private boolean openScanner(ScanRequest request) {
try {
Scan scan = ProtoUtil.toScan(request.getScan());
scanner = storage.scan(scan);
metrics.measure(
DistributedStorageService.class,
"scan.openScanner",
() -> {
Scan scan = ProtoUtil.toScan(request.getScan());
scanner = storage.scan(scan);
});
return true;
} catch (IllegalArgumentException e) {
respondInvalidArgumentError(e.getMessage());
return false;
} catch (Throwable t) {
LOGGER.error("an internal error happened when opening a scanner", t);
respondInternalError(t.getMessage());
if (t instanceof Error) {
throw (Error) t;
}
return false;
}
return false;
}

@Override
public void onError(Throwable t) {
LOGGER.error("an error received", t);
cleanUp();
}

@Override
public void onCompleted() {
if (!cleanedUp.get()) {
cleanUp();
responseObserver.onCompleted();
private ScanResponse next(ScanRequest request) {
try {
return metrics.measure(
DistributedStorageService.class,
"scan.next",
() -> {
Iterator<Result> resultIterator = scanner.iterator();
List<Result> results =
fetch(
resultIterator,
request.hasFetchCount() ? request.getFetchCount() : DEFAULT_SCAN_FETCH_COUNT);
ScanResponse.Builder builder = ScanResponse.newBuilder();
results.forEach(r -> builder.addResult(ProtoUtil.toResult(r)));
return builder.setHasMoreResults(resultIterator.hasNext()).build();
});
} catch (Throwable t) {
LOGGER.error("an internal error happened during the execution", t);
respondInternalError(t.getMessage());
if (t instanceof Error) {
throw (Error) t;
}
}
return null;
}

private List<Result> fetch(Iterator<Result> resultIterator, int fetchCount) {
Expand All @@ -228,6 +248,20 @@ private List<Result> fetch(Iterator<Result> resultIterator, int fetchCount) {
return results;
}

@Override
public void onError(Throwable t) {
LOGGER.error("an error received", t);
cleanUp();
}

@Override
public void onCompleted() {
if (!cleanedUp.get()) {
cleanUp();
responseObserver.onCompleted();
}
}

private void cleanUp() {
try {
if (scanner != null) {
Expand Down
Loading