Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(interactive): block service until graph schema is synced #3280

Merged
merged 1 commit into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions interactive_engine/assembly/src/conf/groot/logback.xml
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
<configuration scan="true" scanPeriod="30 seconds">
<define name="hostname" class="ch.qos.logback.core.property.CanonicalHostNamePropertyDefiner"/>
<property name="log_dir" value="${log.dir:-/var/log/graphscope}"/>
<property name="log_name" value="${log.name:-groot}"/>
<appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
Expand All @@ -11,12 +10,12 @@
<totalSizeCap>1GB</totalSizeCap>
</rollingPolicy>
<encoder>
<pattern>[%d{ISO8601}][%p][%t][${hostname}][%c:%L] %m%n</pattern>
<pattern>[%d{ISO8601}][%p][%t][%c:%L] %m%n</pattern>
</encoder>
</appender>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>[%d{ISO8601}][%p][%t][${hostname}][%c:%L] %m%n</pattern>
<pattern>[%d{ISO8601}][%p][%t][%c:%L] %m%n</pattern>
</encoder>
</appender>
<appender name="Metric" class="ch.qos.logback.core.rolling.RollingFileAppender">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,14 @@
import io.grpc.netty.NettyServerBuilder;

import org.apache.curator.framework.CuratorFramework;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collections;

public class Frontend extends NodeBase {
private static final Logger logger = LoggerFactory.getLogger(Frontend.class);

private CuratorFramework curator;
private NodeDiscovery discovery;
Expand All @@ -63,6 +66,8 @@ public class Frontend extends NodeBase {
private ClientService clientService;
private AbstractService graphService;

private SnapshotCache snapshotCache;

public Frontend(Configs configs) {
super(configs);
configs = reConfig(configs);
Expand All @@ -75,13 +80,9 @@ public Frontend(Configs configs) {
}
NameResolver.Factory nameResolverFactory = new GrootNameResolverFactory(this.discovery);
this.channelManager = new ChannelManager(configs, nameResolverFactory);
SnapshotCache snapshotCache = new SnapshotCache();
this.metaService = new DefaultMetaService(configs);
MetricsCollector metricsCollector = new MetricsCollector(configs);
RoleClients<IngestorWriteClient> ingestorWriteClients =
new RoleClients<>(this.channelManager, RoleType.INGESTOR, IngestorWriteClient::new);
FrontendSnapshotService frontendSnapshotService =
new FrontendSnapshotService(snapshotCache);

snapshotCache = new SnapshotCache();

RoleClients<MetricsCollectClient> frontendMetricsCollectClients =
new RoleClients<>(
this.channelManager, RoleType.FRONTEND, MetricsCollectClient::new);
Expand All @@ -96,18 +97,21 @@ public Frontend(Configs configs) {
frontendMetricsCollectClients,
ingestorMetricsCollectClients,
storeMetricsCollectClients);

StoreIngestor storeIngestClients =
new StoreIngestClients(this.channelManager, RoleType.STORE, StoreIngestClient::new);
SchemaWriter schemaWriter =
new SchemaWriter(
new RoleClients<>(
this.channelManager, RoleType.COORDINATOR, SchemaClient::new));
DdlExecutors ddlExecutors = new DdlExecutors();

BatchDdlClient batchDdlClient =
new BatchDdlClient(ddlExecutors, snapshotCache, schemaWriter);
new BatchDdlClient(new DdlExecutors(), snapshotCache, schemaWriter);
StoreStateFetcher storeStateClients =
new StoreStateClients(this.channelManager, RoleType.STORE, StoreStateClient::new);

this.metaService = new DefaultMetaService(configs);

this.clientService =
new ClientService(
snapshotCache,
Expand All @@ -116,26 +120,35 @@ public Frontend(Configs configs) {
this.metaService,
batchDdlClient,
storeStateClients);
GrootDdlService clientDdlService = new GrootDdlService(snapshotCache, batchDdlClient);

FrontendSnapshotService frontendSnapshotService =
new FrontendSnapshotService(snapshotCache);

MetricsCollector metricsCollector = new MetricsCollector(configs);
MetricsCollectService metricsCollectService = new MetricsCollectService(metricsCollector);
WriteSessionGenerator writeSessionGenerator = new WriteSessionGenerator(configs);
this.rpcServer =
new RpcServer(
configs, localNodeProvider, frontendSnapshotService, metricsCollectService);

GrootDdlService clientDdlService = new GrootDdlService(snapshotCache, batchDdlClient);

EdgeIdGenerator edgeIdGenerator = new DefaultEdgeIdGenerator(configs, this.channelManager);
RoleClients<IngestorWriteClient> ingestorWriteClients =
new RoleClients<>(this.channelManager, RoleType.INGESTOR, IngestorWriteClient::new);
GraphWriter graphWriter =
new GraphWriter(
snapshotCache,
edgeIdGenerator,
this.metaService,
ingestorWriteClients,
metricsCollector);
WriteSessionGenerator writeSessionGenerator = new WriteSessionGenerator(configs);
ClientWriteService clientWriteService =
new ClientWriteService(writeSessionGenerator, graphWriter);

RoleClients<BackupClient> backupClients =
new RoleClients<>(this.channelManager, RoleType.COORDINATOR, BackupClient::new);
ClientBackupService clientBackupService = new ClientBackupService(backupClients);
this.rpcServer =
new RpcServer(
configs, localNodeProvider, frontendSnapshotService, metricsCollectService);

this.serviceServer =
buildServiceServer(
configs,
Expand Down Expand Up @@ -182,6 +195,15 @@ public void start() {
}
this.discovery.start();
this.channelManager.start();

while (snapshotCache.getSnapshotWithSchema().getGraphDef() == null) {
try {
Thread.sleep(1000);
logger.info("Waiting for schema ready...");
} catch (InterruptedException e) {
throw new GrootException(e);
}
}
this.graphService.start();
try {
this.serviceServer.start();
Expand Down