Skip to content

Commit

Permalink
Add metrics to Scalar DB Server (#245)
Browse files Browse the repository at this point in the history
  • Loading branch information
brfrn169 authored Jul 15, 2021
1 parent 1d21091 commit de34596
Show file tree
Hide file tree
Showing 9 changed files with 209 additions and 62 deletions.
2 changes: 2 additions & 0 deletions server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ dependencies {
implementation(group: 'com.scalar-labs', name: 'scalar-admin', version: '1.0.0') {
exclude group: 'io.grpc'
}
implementation group: 'io.dropwizard.metrics', name: 'metrics-core', version: '4.2.2'
implementation group: 'io.dropwizard.metrics', name: 'metrics-jmx', version: '4.2.2'
testImplementation project(':core').sourceSets.integrationTest.output
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ public class DistributedStorageAdminService
LoggerFactory.getLogger(DistributedStorageAdminService.class);

private final DistributedStorageAdmin admin;
private final Metrics metrics;

@Inject
public DistributedStorageAdminService(DistributedStorageAdmin admin) {
public DistributedStorageAdminService(DistributedStorageAdmin admin, Metrics metrics) {
this.admin = admin;
this.metrics = metrics;
}

@Override
Expand All @@ -42,7 +44,8 @@ public void createTable(CreateTableRequest request, StreamObserver<Empty> respon
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
},
responseObserver);
responseObserver,
"createTable");
}

@Override
Expand All @@ -53,7 +56,8 @@ public void dropTable(DropTableRequest request, StreamObserver<Empty> responseOb
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
},
responseObserver);
responseObserver,
"dropTable");
}

@Override
Expand All @@ -64,7 +68,8 @@ public void truncateTable(TruncateTableRequest request, StreamObserver<Empty> re
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
},
responseObserver);
responseObserver,
"truncateTable");
}

@Override
Expand All @@ -81,13 +86,14 @@ public void getTableMetadata(
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
},
responseObserver);
responseObserver,
"getTableMetadata");
}

private static void execute(
ThrowableRunnable<Throwable> runnable, StreamObserver<?> responseObserver) {
private void execute(
ThrowableRunnable<Throwable> runnable, StreamObserver<?> responseObserver, String method) {
try {
runnable.run();
metrics.measure(DistributedStorageAdminService.class, method, runnable);
} catch (IllegalArgumentException | IllegalStateException e) {
responseObserver.onError(
Status.INVALID_ARGUMENT.withDescription(e.getMessage()).asRuntimeException());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,13 @@ public class DistributedStorageService extends DistributedStorageGrpc.Distribute

private final DistributedStorage storage;
private final Pauser pauser;
private final Metrics metrics;

@Inject
public DistributedStorageService(DistributedStorage storage, Pauser pauser) {
public DistributedStorageService(DistributedStorage storage, Pauser pauser, Metrics metrics) {
this.storage = storage;
this.pauser = pauser;
this.metrics = metrics;
}

@Override
Expand All @@ -54,12 +56,14 @@ public void get(GetRequest request, StreamObserver<GetResponse> responseObserver
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
},
responseObserver);
responseObserver,
"get");
}

@Override
public StreamObserver<ScanRequest> scan(StreamObserver<ScanResponse> responseObserver) {
return new ScanStreamObserver(storage, responseObserver, this::preProcess, this::postProcess);
return new ScanStreamObserver(
storage, responseObserver, metrics, this::preProcess, this::postProcess);
}

@Override
Expand All @@ -74,17 +78,19 @@ public void mutate(MutateRequest request, StreamObserver<Empty> responseObserver
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
},
responseObserver);
responseObserver,
"mutate");
}

private void execute(ThrowableRunnable<Throwable> runnable, StreamObserver<?> responseObserver) {
private void execute(
ThrowableRunnable<Throwable> runnable, StreamObserver<?> responseObserver, String method) {
if (!preProcess(responseObserver)) {
// Unavailable
return;
}

try {
runnable.run();
metrics.measure(DistributedStorageService.class, method, runnable);
} catch (IllegalArgumentException | IllegalStateException e) {
responseObserver.onError(
Status.INVALID_ARGUMENT.withDescription(e.getMessage()).asRuntimeException());
Expand Down Expand Up @@ -124,6 +130,7 @@ private static class ScanStreamObserver implements StreamObserver<ScanRequest> {

private final DistributedStorage storage;
private final StreamObserver<ScanResponse> responseObserver;
private final Metrics metrics;
private final Function<StreamObserver<?>, Boolean> preProcessor;
private final Runnable postProcessor;
private final AtomicBoolean preProcessed = new AtomicBoolean();
Expand All @@ -134,10 +141,12 @@ private static class ScanStreamObserver implements StreamObserver<ScanRequest> {
public ScanStreamObserver(
DistributedStorage storage,
StreamObserver<ScanResponse> responseObserver,
Metrics metrics,
Function<StreamObserver<?>, Boolean> preProcessor,
Runnable postProcessor) {
this.storage = storage;
this.responseObserver = responseObserver;
this.metrics = metrics;
this.preProcessor = preProcessor;
this.postProcessor = postProcessor;
}
Expand All @@ -157,25 +166,21 @@ public void onNext(ScanRequest request) {
return;
}
if (!openScanner(request)) {
// failed to open a scanner
return;
}
} else if (request.hasScan()) {
respondInvalidArgumentError("scanner has already been opened. Don't specify a Scan object");
return;
}

Iterator<Result> resultIterator = scanner.iterator();
List<Result> results =
fetch(
resultIterator,
request.hasFetchCount() ? request.getFetchCount() : DEFAULT_SCAN_FETCH_COUNT);
boolean hasMoreResults = resultIterator.hasNext();

ScanResponse.Builder builder = ScanResponse.newBuilder();
results.forEach(r -> builder.addResult(ProtoUtil.toResult(r)));
ScanResponse response = builder.setHasMoreResults(hasMoreResults).build();
ScanResponse response = next(request);
if (response == null) {
// failed to next
return;
}

if (hasMoreResults) {
if (response.getHasMoreResults()) {
responseObserver.onNext(response);
} else {
// cleans up and completes this stream if no more results
Expand All @@ -187,34 +192,49 @@ public void onNext(ScanRequest request) {

private boolean openScanner(ScanRequest request) {
try {
Scan scan = ProtoUtil.toScan(request.getScan());
scanner = storage.scan(scan);
metrics.measure(
DistributedStorageService.class,
"scan.openScanner",
() -> {
Scan scan = ProtoUtil.toScan(request.getScan());
scanner = storage.scan(scan);
});
return true;
} catch (IllegalArgumentException e) {
respondInvalidArgumentError(e.getMessage());
return false;
} catch (Throwable t) {
LOGGER.error("an internal error happened when opening a scanner", t);
respondInternalError(t.getMessage());
if (t instanceof Error) {
throw (Error) t;
}
return false;
}
return false;
}

@Override
public void onError(Throwable t) {
LOGGER.error("an error received", t);
cleanUp();
}

@Override
public void onCompleted() {
if (!cleanedUp.get()) {
cleanUp();
responseObserver.onCompleted();
private ScanResponse next(ScanRequest request) {
try {
return metrics.measure(
DistributedStorageService.class,
"scan.next",
() -> {
Iterator<Result> resultIterator = scanner.iterator();
List<Result> results =
fetch(
resultIterator,
request.hasFetchCount() ? request.getFetchCount() : DEFAULT_SCAN_FETCH_COUNT);
ScanResponse.Builder builder = ScanResponse.newBuilder();
results.forEach(r -> builder.addResult(ProtoUtil.toResult(r)));
return builder.setHasMoreResults(resultIterator.hasNext()).build();
});
} catch (Throwable t) {
LOGGER.error("an internal error happened during the execution", t);
respondInternalError(t.getMessage());
if (t instanceof Error) {
throw (Error) t;
}
}
return null;
}

private List<Result> fetch(Iterator<Result> resultIterator, int fetchCount) {
Expand All @@ -228,6 +248,20 @@ private List<Result> fetch(Iterator<Result> resultIterator, int fetchCount) {
return results;
}

@Override
public void onError(Throwable t) {
LOGGER.error("an error received", t);
cleanUp();
}

@Override
public void onCompleted() {
if (!cleanedUp.get()) {
cleanUp();
responseObserver.onCompleted();
}
}

private void cleanUp() {
try {
if (scanner != null) {
Expand Down
Loading

0 comments on commit de34596

Please sign in to comment.