Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(interactive): handle the case when meta or statistics are not ready in GOpt #3986

Merged
merged 9 commits into from
Jul 2, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public class DynamicIrMetaFetcher extends IrMetaFetcher implements AutoCloseable
private static final Logger logger = LoggerFactory.getLogger(DynamicIrMetaFetcher.class);
private final ScheduledExecutorService scheduler;
private volatile IrMetaStats currentState;
// To manage the state changes of statistics resulting from different update operations.
private volatile StatsState statsState;

public DynamicIrMetaFetcher(Configs configs, IrMetaReader dataReader, IrMetaTracker tracker) {
super(dataReader, tracker);
Expand All @@ -66,20 +68,23 @@ public Optional<IrMeta> fetch() {
private synchronized void syncMeta() {
try {
IrMeta meta = this.reader.readMeta();
GraphStatistics curStats;
// if the graph id is changed, we need to update the statistics
GraphStatistics curStats =
(this.currentState == null
|| !this.currentState.getGraphId().equals(meta.getGraphId()))
? null
: this.currentState.getStatistics();
if (this.currentState == null
|| !this.currentState.getGraphId().equals(meta.getGraphId())) {
this.statsState = StatsState.INITIALIZED;
curStats = null;
} else {
curStats = this.currentState.getStatistics();
}
this.currentState =
new IrMetaStats(
meta.getGraphId(),
meta.getSnapshotId(),
meta.getSchema(),
meta.getStoredProcedures(),
curStats);
if (this.currentState.getStatistics() == null) {
if (this.statsState != StatsState.SYNCED) {
syncStats();
}
} catch (Exception e) {
Expand All @@ -101,15 +106,30 @@ private synchronized void syncStats() {
if (tracker != null) {
tracker.onChanged(this.currentState);
}
this.statsState = StatsState.SYNCED;
}
}
} catch (Exception e) {
logger.warn("failed to read graph statistics, error is {}", e);
} finally {
if (this.currentState != null
&& tracker != null
&& this.statsState == StatsState.INITIALIZED) {
tracker.onChanged(this.currentState);
this.statsState = StatsState.MOCKED;
}
}
}

/**
 * Releases the background scheduler held by this fetcher.
 *
 * <p>Only {@code shutdown()} is invoked on the scheduler; this method does not wait for tasks
 * that are already running to finish.
 *
 * @throws Exception declared to satisfy the overridden {@code close()} signature
 */
@Override
public void close() throws Exception {
this.scheduler.shutdown();
}

/**
 * State of the graph statistics held by the fetcher, used to decide whether statistics still
 * have to be synced and whether the tracker has already been notified.
 */
public enum StatsState {
// Set by syncMeta when no meta has been read yet or when the graph id of the newly read meta
// differs from the current one.
INITIALIZED,
// Entered only from INITIALIZED: when a syncStats round ends while the state is still
// INITIALIZED (i.e. remote statistics were not synced), the tracker is notified once with the
// current meta and the state moves here so that this fallback notification is not repeated.
MOCKED,
// Set by syncStats after the tracker has been notified of the updated state; syncMeta stops
// calling syncStats once this state is reached.
SYNCED
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,25 @@ public GlogueSchema(
this.edgeTypeCardinality = edgeTypeCardinality;
}

/**
 * Builds a default schema graph from the schema definition alone, without statistics.
 *
 * <p>Every vertex type and every edge type (one per source/target relation of an edge label)
 * is registered in the schema graph and assigned the same cardinality of 1.0.
 *
 * @param graphSchema schema providing the vertex list and the edge list with their relations
 */
public GlogueSchema(GraphSchema graphSchema) {
    this.schemaGraph = new DirectedPseudograph<>(EdgeTypeId.class);
    this.vertexTypeCardinality = new HashMap<>();
    this.edgeTypeCardinality = new HashMap<>();

    // uniform cardinality used for all types when no statistics are available
    final double defaultCardinality = 1.0;

    // vertices first, so that edges added below always find their endpoints
    for (GraphVertex vertexType : graphSchema.getVertexList()) {
        this.schemaGraph.addVertex(vertexType.getLabelId());
        this.vertexTypeCardinality.put(vertexType.getLabelId(), defaultCardinality);
    }

    for (GraphEdge edgeLabel : graphSchema.getEdgeList()) {
        for (EdgeRelation endpoints : edgeLabel.getRelationList()) {
            int srcLabelId = endpoints.getSource().getLabelId();
            int dstLabelId = endpoints.getTarget().getLabelId();
            EdgeTypeId typeId = new EdgeTypeId(srcLabelId, dstLabelId, edgeLabel.getLabelId());
            this.schemaGraph.addEdge(srcLabelId, dstLabelId, typeId);
            this.edgeTypeCardinality.put(typeId, defaultCardinality);
        }
    }
}

public GlogueSchema(GraphSchema graphSchema, GraphStatistics statistics) {
schemaGraph = new DirectedPseudograph<Integer, EdgeTypeId>(EdgeTypeId.class);
vertexTypeCardinality = new HashMap<Integer, Double>();
Expand All @@ -62,6 +81,8 @@ public GlogueSchema(GraphSchema graphSchema, GraphStatistics statistics) {
if (vertexTypeCount == null) {
throw new IllegalArgumentException(
"Vertex type count not found for vertex type: " + vertex.getLabelId());
} else if (vertexTypeCount == 0) {
vertexTypeCardinality.put(vertex.getLabelId(), 1.0);
} else {
vertexTypeCardinality.put(vertex.getLabelId(), vertexTypeCount.doubleValue());
}
Expand All @@ -80,6 +101,8 @@ public GlogueSchema(GraphSchema graphSchema, GraphStatistics statistics) {
if (edgeTypeCount == null) {
throw new IllegalArgumentException(
"Edge type count not found for edge type: " + edge.getLabelId());
} else if (edgeTypeCount == 0) {
edgeTypeCardinality.put(edgeType, 1.0);
} else {
edgeTypeCardinality.put(edgeType, edgeTypeCount.doubleValue());
}
Expand All @@ -88,7 +111,13 @@ public GlogueSchema(GraphSchema graphSchema, GraphStatistics statistics) {
}

/**
 * Creates a {@code GlogueSchema} from the given meta.
 *
 * <p>When the meta carries no statistics, a default schema is built in which all vertex and
 * edge types share the same cardinality 1.0; otherwise the cardinalities are taken from the
 * statistics.
 *
 * @param irMeta meta providing the graph schema and, optionally, its statistics
 * @return the schema built from {@code irMeta}
 */
public static GlogueSchema fromMeta(IrMetaStats irMeta) {
    // read the statistics once so that the null check and the use see the same object
    GraphStatistics statistics = irMeta.getStatistics();
    if (statistics == null) {
        // build a default GlogueSchema by assuming all vertex and edge types have the same
        // cardinality 1.0
        return new GlogueSchema(irMeta.getSchema());
    }
    return new GlogueSchema(irMeta.getSchema(), statistics);
}

public List<Integer> getVertexTypes() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,7 @@
// hack implementation here: rpc client is the old way to submit job (gremlin -> traversal
// -> ir_core -> pegasus), we should remove it after replacing it with gremlin-calcite
// stack.
if (FrontendConfig.GREMLIN_SCRIPT_LANGUAGE_NAME
.get(this.configs)
.equals(AntlrGremlinScriptEngineFactory.LANGUAGE_NAME)
&& FrontendConfig.ENGINE_TYPE.get(this.configs).equals("pegasus")) {
if (FrontendConfig.ENGINE_TYPE.get(this.configs).equals("pegasus")) {
this.rpcClient = new RpcClient(fetcher.fetch());
} else {
this.rpcClient = null;
Expand All @@ -145,159 +142,167 @@
initMetrics();
}

/**
 * Evaluates the gremlin script carried by the request and writes the outcome to {@code ctx}.
 *
 * <p>Flow:
 *
 * <ol>
 *   <li>the script {@code ''} is answered immediately with {@code SUCCESS} (connection
 *       validation);
 *   <li>a job id/name is generated and the script language is read from the configuration; if
 *       the current schema has neither vertex nor edge types, the antlr-gremlin language is
 *       used instead;
 *   <li>a life cycle matching the language is created and the script is submitted to the
 *       gremlin executor;
 *   <li>when evaluation completes, {@code metaQueryCallback.afterExec} is invoked and any
 *       failure other than {@code TimeoutException} is translated into an error response.
 * </ol>
 *
 * @param ctx request context used to read the request message and to write responses
 * @param gremlinExecutorSupplier supplies the executor that evaluates the script
 * @param bindingsSupplier passed through to the antlr-gremlin life cycle
 * @throws IllegalArgumentException if the resolved script language is not supported
 */
@Override
protected void evalOpInternal(
        final Context ctx,
        final Supplier<GremlinExecutor> gremlinExecutorSupplier,
        final AbstractEvalOpProcessor.BindingSupplier bindingsSupplier) {
    RequestMessage msg = ctx.getRequestMessage();
    GremlinExecutor gremlinExecutor = gremlinExecutorSupplier.get();
    Map<String, Object> args = msg.getArgs();
    String script = (String) args.get("gremlin");

    String defaultValidateQuery = "''";
    // ad-hoc handling for connection validation
    if (script.equals(defaultValidateQuery)) {
        ctx.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SUCCESS).create());
        return;
    }
    BigInteger jobId = idGenerator.generateId();
    String jobName = idGenerator.generateName(jobId);
    // declared exactly once: the empty-schema fallback below must not be overwritten by a
    // second read of the configured language
    String language = FrontendConfig.GREMLIN_SCRIPT_LANGUAGE_NAME.get(configs);
    IrMeta irMeta = metaQueryCallback.beforeExec();
    // If the current graph schema is empty (as service startup can occur before data loading in
    // Groot), we temporarily switch to the original IR core.
    // In the future, once schema-free support is implemented, we will replace this temporary
    // solution.
    if (irMeta.getSchema().getVertexList().isEmpty()
            && irMeta.getSchema().getEdgeList().isEmpty()) {
        language = AntlrGremlinScriptEngineFactory.LANGUAGE_NAME;
    }
    QueryStatusCallback statusCallback = createQueryStatusCallback(script, jobId);
    QueryTimeoutConfig timeoutConfig = new QueryTimeoutConfig(ctx.getRequestTimeout());
    GremlinExecutor.LifeCycle lifeCycle;
    switch (language) {
        case AntlrGremlinScriptEngineFactory.LANGUAGE_NAME:
            lifeCycle =
                    createLifeCycle(
                            ctx,
                            gremlinExecutorSupplier,
                            bindingsSupplier,
                            irMeta,
                            statusCallback,
                            timeoutConfig);
            break;
        case GremlinCalciteScriptEngineFactory.LANGUAGE_NAME:
            lifeCycle =
                    new LifeCycleSupplier(
                                    configs,
                                    ctx,
                                    queryCache,
                                    executionClient,
                                    jobId,
                                    jobName,
                                    irMeta,
                                    statusCallback,
                                    timeoutConfig)
                            .get();
            break;
        default:
            throw new IllegalArgumentException("invalid script language name: " + language);
    }
    try {
        CompletableFuture<Object> evalFuture =
                gremlinExecutor.eval(script, language, new SimpleBindings(), lifeCycle);
        evalFuture.handle(
                (v, t) -> {
                    metaQueryCallback.afterExec(irMeta);
                    // TimeoutException has been handled in ResultProcessor, skip it here
                    if (t != null && !(t instanceof TimeoutException)) {
                        statusCallback.onEnd(false, t.getMessage());
                        Optional<Throwable> possibleTemporaryException =
                                determineIfTemporaryException(t);
                        if (possibleTemporaryException.isPresent()) {
                            // temporary failure: report the underlying exception to the client
                            Throwable temporaryException = possibleTemporaryException.get();
                            ctx.writeAndFlush(
                                    ResponseMessage.build(msg)
                                            .code(ResponseStatusCode.SERVER_ERROR_TEMPORARY)
                                            .statusMessage(temporaryException.getMessage())
                                            .statusAttributeException(temporaryException)
                                            .create());
                        } else if (t instanceof OpProcessorException) {
                            // the exception already carries the response to send
                            ctx.writeAndFlush(((OpProcessorException) t).getResponseMessage());
                        } else {
                            String errorMessage;
                            if (t instanceof TimedInterruptTimeoutException) {
                                errorMessage =
                                        String.format(
                                                "A timeout occurred within the script during"
                                                        + " evaluation of [%s] - consider"
                                                        + " increasing the limit given to"
                                                        + " TimedInterruptCustomizerProvider",
                                                msg);
                                statusCallback.getQueryLogger().warn(errorMessage);
                                ctx.writeAndFlush(
                                        ResponseMessage.build(msg)
                                                .code(ResponseStatusCode.SERVER_ERROR_TIMEOUT)
                                                .statusMessage(
                                                        "Timeout during script evaluation"
                                                                + " triggered by"
                                                                + " TimedInterruptCustomizerProvider")
                                                .statusAttributeException(t)
                                                .create());
                            } else if (t instanceof MultipleCompilationErrorsException
                                    && t.getMessage().contains("Method too large")
                                    && ((MultipleCompilationErrorsException) t)
                                                    .getErrorCollector()
                                                    .getErrorCount()
                                            == 1) {
                                errorMessage =
                                        String.format(
                                                "The Gremlin statement that was submitted"
                                                        + " exceeds the maximum compilation size"
                                                        + " allowed by the JVM, please split it"
                                                        + " into multiple smaller statements - %s",
                                                msg);
                                statusCallback.getQueryLogger().warn(errorMessage);
                                ctx.writeAndFlush(
                                        ResponseMessage.build(msg)
                                                .code(ResponseStatusCode.SERVER_ERROR_EVALUATION)
                                                .statusMessage(errorMessage)
                                                .statusAttributeException(t)
                                                .create());
                            } else {
                                // generic evaluation failure; fall back to toString() when the
                                // exception has no message
                                errorMessage =
                                        t.getMessage() == null ? t.toString() : t.getMessage();
                                statusCallback
                                        .getQueryLogger()
                                        .warn(
                                                String.format(
                                                        "Exception processing a script on"
                                                                + " request [%s].",
                                                        msg),
                                                t);
                                ctx.writeAndFlush(
                                        ResponseMessage.build(msg)
                                                .code(ResponseStatusCode.SERVER_ERROR_EVALUATION)
                                                .statusMessage(errorMessage)
                                                .statusAttributeException(t)
                                                .create());
                            }
                        }
                    }
                    return null;
                });
    } catch (RejectedExecutionException var17) {
        // the executor refused the task: report it to the client as too many requests
        statusCallback.getQueryLogger().error(var17.getMessage());
        ctx.writeAndFlush(
                ResponseMessage.build(msg)
                        .code(ResponseStatusCode.TOO_MANY_REQUESTS)
                        .statusMessage(var17.getMessage())
                        .create());
    }
}

Check notice on line 305 in interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/IrStandardOpProcessor.java

View check run for this annotation

codefactor.io / CodeFactor

interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/IrStandardOpProcessor.java#L145-L305

Complex Method
protected QueryStatusCallback createQueryStatusCallback(String query, BigInteger queryId) {
return new QueryStatusCallback(
new MetricsCollector(evalOpTimer), queryHistogram, new QueryLogger(query, queryId));
Expand Down
Loading