Skip to content

Commit

Permalink
JCBC-2170 (1/n) Don't switch schedulers when converting reactive quer…
Browse files Browse the repository at this point in the history
…y result

Motivation
----------
Groundwork for asserting reactive results are published
on the user's custom Scheduler.

Modifications
-------------
In reactive returnQueryResult(), build a fully non-blocking
reactive chain instead of switching to bounded elastic scheduler
and blocking.

Rework the relevant utility methods in ContentAsUtil to support
the reactive chain.

Change-Id: Ifab9e60ef71f8c6e8011bf9ac303eea7fdddcb92
Reviewed-on: https://review.couchbase.org/c/couchbase-jvm-clients/+/218707
Tested-by: Build Bot <[email protected]>
Reviewed-by: Michael Reiche <[email protected]>
Reviewed-by: Graham Pople <[email protected]>
  • Loading branch information
dnault committed Oct 30, 2024
1 parent 0c0762a commit 9dbd436
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1020,20 +1020,13 @@ public static void populateResult(com.couchbase.client.protocol.sdk.query.Comman

var builder = com.couchbase.client.protocol.sdk.query.QueryResult.newBuilder();

var content = ContentAsUtil.contentTypeList(request.getContentAs(),
() -> values.rowsAs(byte[].class),
() -> values.rowsAs(String.class),
() -> values.rowsAs(JsonObject.class),
() -> values.rowsAs(JsonArray.class),
() -> values.rowsAs(Boolean.class),
() -> values.rowsAs(Integer.class),
() -> values.rowsAs(Double.class));

if (content.isFailure()) {
throw content.exception();
}
var contentAs = request.getContentAs();
var rowType = ContentAsUtil.toJavaClass(contentAs);
var content = values.rowsAs(rowType).stream()
.map(row -> ContentAsUtil.toFitContent(row, contentAs))
.toList();

builder.addAllContent(content.value());
builder.addAllContent(content);

// Metadata
var convertedMetaData = convertMetaData(values.metaData());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

import com.couchbase.client.core.cnc.RequestSpan;
import com.couchbase.client.java.ReactiveCollection;
import com.couchbase.client.java.json.JsonArray;
import com.couchbase.client.java.json.JsonObject;
import com.couchbase.client.java.kv.*;
import com.couchbase.client.java.query.ReactiveQueryResult;
import com.couchbase.client.java.kv.GetResult;
Expand Down Expand Up @@ -58,7 +56,6 @@
import com.couchbase.utils.ClusterConnection;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;
import java.time.Instant;
Expand Down Expand Up @@ -687,37 +684,25 @@ protected Exception convertException(Throwable raw) {
return convertExceptionShared(raw);
}

private Mono<Result> returnQueryResult(com.couchbase.client.protocol.sdk.query.Command request, Mono<ReactiveQueryResult> queryResult, Result.Builder result, Long start) {
return queryResult.publishOn(Schedulers.boundedElastic()).map(r -> {
result.setElapsedNanos(System.nanoTime() - start);

var builder = com.couchbase.client.protocol.sdk.query.QueryResult.newBuilder();

// FIT only supports testing blocking (not streaming) queries currently, so the .block() here to gather
// the rows is fine.
var content = ContentAsUtil.contentTypeList(request.getContentAs(),
() -> r.rowsAs(byte[].class).collectList().block(),
() -> r.rowsAs(String.class).collectList().block(),
() -> r.rowsAs(JsonObject.class).collectList().block(),
() -> r.rowsAs(JsonArray.class).collectList().block(),
() -> r.rowsAs(Boolean.class).collectList().block(),
() -> r.rowsAs(Integer.class).collectList().block(),
() -> r.rowsAs(Double.class).collectList().block());

if (content.isFailure()) {
throw content.exception();
}

builder.addAllContent(content.value());

// Metadata
var convertedMetaData = convertMetaData(r.metaData().block());
builder.setMetaData(convertedMetaData);

result.setSdk(com.couchbase.client.protocol.sdk.Result.newBuilder()
.setQueryResult(builder));
private Mono<Result> returnQueryResult(com.couchbase.client.protocol.sdk.query.Command request, Mono<ReactiveQueryResult> queryResult, Result.Builder result, Long start) {
return queryResult.flatMap(r -> {
result.setElapsedNanos(System.nanoTime() - start);

return result.build();
});
}
var contentAs = request.getContentAs();
var rowType = ContentAsUtil.toJavaClass(contentAs);
return r.rowsAs(rowType)
.map(it -> ContentAsUtil.toFitContent(it, contentAs))
.collectList()
.zipWith(r.metaData().map(JavaSdkCommandExecutor::convertMetaData))
.map(fitRowsAndMetadata -> {
var fitQueryResult = com.couchbase.client.protocol.sdk.query.QueryResult.newBuilder()
.addAllContent(fitRowsAndMetadata.getT1())
.setMetaData(fitRowsAndMetadata.getT2());

result.setSdk(com.couchbase.client.protocol.sdk.Result.newBuilder()
.setQueryResult(fitQueryResult));
return result.build();
});
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import com.couchbase.client.protocol.shared.ContentAs;
import com.couchbase.client.protocol.shared.ContentTypes;
import com.google.protobuf.ByteString;
import reactor.util.annotation.Nullable;

import java.util.List;
import java.util.function.Supplier;

public class ContentAsUtil {
Expand Down Expand Up @@ -91,62 +91,38 @@ public static Try<ContentTypes> contentType(ContentAs contentAs,
}
}

public static Try<List<ContentTypes>> contentTypeList(ContentAs contentAs,
Supplier<List<byte[]>> asByteArray,
Supplier<List<String>> asString,
Supplier<List<JsonObject>> asJsonObject,
Supplier<List<JsonArray>> asJsonArray,
Supplier<List<Boolean>> asBoolean,
Supplier<List<Integer>> asInteger,
Supplier<List<Double>> asDouble) {
try {
if (contentAs.hasAsByteArray()) {
return new Try<>(asByteArray.get().stream()
.map(v -> v != null
? ContentTypes.newBuilder().setContentAsBytes(ByteString.copyFrom(v)).build()
: getNullContentType().value())
.toList());
} else if (contentAs.hasAsString()) {
return new Try<>(asString.get().stream()
.map(v -> v != null
? ContentTypes.newBuilder().setContentAsString(v).build()
: getNullContentType().value()).toList());
} else if (contentAs.hasAsJsonObject()) {
return new Try<>(asJsonObject.get().stream()
.map(v -> v != null
? ContentTypes.newBuilder().setContentAsBytes(ByteString.copyFrom(v.toBytes())).build()
: getNullContentType().value())
.toList());
} else if (contentAs.hasAsJsonArray()) {
return new Try<>(asJsonArray.get().stream()
.map(v -> v != null
? ContentTypes.newBuilder().setContentAsBytes(ByteString.copyFrom(v.toBytes())).build()
: getNullContentType().value())
.toList());
} else if (contentAs.getAsBoolean()) {
return new Try<>(asBoolean.get().stream()
.map(v -> v != null
? ContentTypes.newBuilder().setContentAsBool(v).build()
: getNullContentType().value())
.toList());
} else if (contentAs.hasAsInteger()) {
return new Try<>(asInteger.get().stream()
.map(v -> v != null
? ContentTypes.newBuilder().setContentAsInt64(v).build()
: getNullContentType().value())
.toList());
} else if (contentAs.hasAsFloatingPoint()) {
return new Try<>(asDouble.get().stream()
.map(v -> v != null
?ContentTypes.newBuilder().setContentAsDouble(v).build()
: getNullContentType().value())
.toList());
} else {
throw new UnsupportedOperationException("Java performer cannot handle contentAs " + contentAs.toString());
}
} catch (RuntimeException err) {
return new Try<>(err);
public static Class<?> toJavaClass(ContentAs contentAs) {
return switch (contentAs.getAsCase()) {
case AS_STRING -> String.class;
case AS_BYTE_ARRAY -> byte[].class;
case AS_JSON_OBJECT -> JsonObject.class;
case AS_JSON_ARRAY -> JsonArray.class;
case AS_BOOLEAN -> Boolean.class;
case AS_INTEGER -> Integer.class;
case AS_FLOATING_POINT -> Double.class;

default -> throw new UnsupportedOperationException("Java performer cannot handle contentAs " + contentAs);
};
}

public static ContentTypes toFitContent(@Nullable Object value, ContentAs contentAs) {
ContentTypes.Builder builder = ContentTypes.newBuilder();

if (value == null) return builder.setContentAsNull(ContentTypes.NullValue.getDefaultInstance()).build();

switch (contentAs.getAsCase()) {
case AS_STRING -> builder.setContentAsString((String) value);
case AS_BYTE_ARRAY -> builder.setContentAsBytes(ByteString.copyFrom((byte[]) value));
case AS_JSON_OBJECT -> builder.setContentAsBytes(ByteString.copyFrom(((JsonObject) value).toBytes()));
case AS_JSON_ARRAY -> builder.setContentAsBytes(ByteString.copyFrom(((JsonArray) value).toBytes()));
case AS_BOOLEAN -> builder.setContentAsBool((Boolean) value);
case AS_INTEGER -> builder.setContentAsInt64((Integer) value);
case AS_FLOATING_POINT -> builder.setContentAsDouble((Double) value);

default -> throw new UnsupportedOperationException("Java performer cannot handle contentAs " + contentAs);
}

return builder.build();
}

public static byte[] convert(ContentTypes content) {
Expand Down

0 comments on commit 9dbd436

Please sign in to comment.