feat(interactive): Support secondary instance of groot #3479

Merged · 20 commits · Jan 12, 2024
7 changes: 6 additions & 1 deletion charts/graphscope-store-one-pod/templates/configmap.yaml
@@ -77,9 +77,14 @@ data:
kafka.test.cluster.enable={{ not .Values.kafka.enabled }}

## Zk Config
zk.base.path=/graphscope/groot
zk.base.path={{ .Values.zkBasePath }}
zk.connect.string=ZK_CONNECT

## Secondary config
secondary.instance.enabled={{ .Values.secondary.enabled }}
store.data.secondary.path={{ .Values.secondary.storeDataPath }}
store.gc.interval.ms={{ .Values.storeGcIntervalMs }}

## Extra Config
{{- if .Values.extraConfig }}
{{- $config_list := regexSplit ";" .Values.extraConfig -1 }}
9 changes: 9 additions & 0 deletions charts/graphscope-store-one-pod/values.yaml
@@ -369,10 +369,15 @@ storeDataPath: "/var/lib/graphscope-store"
storeWriteThreadCount: 1
storeQueueBufferSize: 102400

storeGcIntervalMs: 5000

## Kafka Config
kafkaTopic: "graphscope"
kafkaProducerCustomConfigs: ""

## Zk Config
zkBasePath: "/graphscope/groot"

## Key-value pair separated by ;
## For example extraConfig="k1=v1;k2=v2"
extraConfig: ""
@@ -385,3 +390,7 @@ pegasus:
worker:
num: 1
timeout: 240000

secondary:
enabled: false
storeDataPath: "./data_secondary"
9 changes: 5 additions & 4 deletions charts/graphscope-store/templates/configmap.yaml
@@ -65,10 +65,6 @@ data:
dns.name.prefix.coordinator=COORDINATOR
dns.name.prefix.store=STORE

executor.graph.port=55556
executor.query.port=55557
executor.engine.port=55558

log4rs.config=LOG4RS_CONFIG

## GAIA Config
@@ -86,6 +82,11 @@ data:
pegasus.output.capacity=16
pegasus.hosts=PEGASUS_HOSTS

## Secondary config
secondary.instance.enabled={{ .Values.secondary.enabled }}
store.data.secondary.path={{ .Values.secondary.storeDataPath }}
store.gc.interval.ms={{ .Values.storeGcIntervalMs }}

## Extra Config
{{- if .Values.extraConfig }}
{{- $config_list := regexSplit ";" .Values.extraConfig -1 }}
6 changes: 6 additions & 0 deletions charts/graphscope-store/values.yaml
@@ -515,6 +515,8 @@ storeDataPath: "/var/lib/graphscope-store"
storeWriteThreadCount: 1
storeQueueBufferSize: 102400

storeGcIntervalMs: 5000

## Kafka Config
kafkaTopic: "graphscope"
kafkaProducerCustomConfigs: ""
@@ -535,3 +537,7 @@ pegasus:
worker:
num: 1
timeout: 240000

secondary:
enabled: false
storeDataPath: "./data_secondary"
12 changes: 12 additions & 0 deletions docs/storage_engine/groot.md
@@ -678,3 +678,15 @@ The location of the logging configuration file in the container is:

- configuration file of `logback` is in `/usr/local/groot/conf/logback.xml`
- configuration file of `log4rs` is in `/usr/local/groot/conf/log4rs.yml`

### Secondary Instance

Groot supports running secondary instances alongside the primary instance. It leverages the [Secondary Instance](https://github.com/facebook/rocksdb/wiki/Read-only-and-Secondary-instances) feature of RocksDB to serve query requests while continuously catching up with schema and data updates from the primary.

To use it, set `secondary.enabled=true` in the helm charts. The data path, ZooKeeper connect string, and Kafka endpoint and topic must be the same as those of the primary instance, while each secondary instance needs a distinct `zk.base.path` to avoid conflicts with other instances during node discovery.

`storeGcIntervalMs` controls how often the secondary performs a `try_catch_up_with_primary` call; it defaults to `5000`, i.e. 5 seconds.
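Putting the points above together, a secondary instance could be configured with helm values along these lines (an illustrative sketch using only the chart values shown in this PR; the `zkBasePath` suffix is a hypothetical naming choice, not mandated anywhere):

```yaml
# values-secondary.yaml — sketch of overrides for one secondary groot instance.
secondary:
  enabled: true                      # rendered as secondary.instance.enabled
  storeDataPath: "./data_secondary"  # rendered as store.data.secondary.path

# These must match the primary instance (the same applies to the
# Kafka endpoint/topic and the ZooKeeper connect string):
storeDataPath: "/var/lib/graphscope-store"
kafkaTopic: "graphscope"

# Must be unique per secondary to avoid node-discovery conflicts:
zkBasePath: "/graphscope/groot_secondary_0"

# Interval (ms) between try_catch_up_with_primary calls:
storeGcIntervalMs: 5000
```

With a values file like this, the secondary would be installed as a separate helm release that points at the same Kafka and ZooKeeper endpoints as the primary deployment.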

2 changes: 1 addition & 1 deletion interactive_engine/assembly/src/bin/groot/store_ctl.sh
@@ -100,7 +100,7 @@ start_server() {
-XX:+UseGCLogFileRotation
-XX:NumberOfGCLogFiles=4
-XX:GCLogFileSize=64m"

export RUST_BACKTRACE=full
java ${java_opt} \
-Dlogback.configurationFile="${GROOT_LOGBACK_FILE}" \
-Dconfig.file="${GROOT_CONF_FILE}" \
@@ -19,14 +19,10 @@
public enum RoleType {
UNKNOWN("unknown"),
FRONTEND("frontend"),
FRONTEND_SERVICE("frontend_service"),
INGESTOR("ingestor"),
STORE("store"),
COORDINATOR("coordinator"),
EXECUTOR_GRAPH("executor_graph"),
EXECUTOR_QUERY("executor_query"),
EXECUTOR_MANAGE("executor_manage"),
EXECUTOR_ENGINE("executor_engine"),
FRONTEND_SERVICE("frontend_service"),
GAIA_ENGINE("gaia_engine"),
GAIA_RPC("gaia_rpc");

@@ -26,6 +26,17 @@ public class CommonConfig {

public static final Config<Integer> RPC_PORT = Config.intConfig("rpc.port", 0);

public static final Config<String> GAIA_RPC_PORT = Config.stringConfig("gaia.rpc.port", "");
public static final Config<String> GAIA_ENGINE_PORT =
Config.stringConfig("gaia.engine.port", "");
public static final Config<String> FRONTEND_RPC_PORT =
Config.stringConfig("frontend.rpc.port", "");
public static final Config<String> COORDINATOR_RPC_PORT =
Config.stringConfig("coordinator.rpc.port", "");
public static final Config<String> INGESTOR_RPC_PORT =
Config.stringConfig("ingestor.rpc.port", "");
public static final Config<String> STORE_RPC_PORT = Config.stringConfig("store.rpc.port", "");

public static final Config<Integer> RPC_THREAD_COUNT =
Config.intConfig(
"rpc.thread.count",
@@ -75,4 +86,7 @@ public class CommonConfig {
// Whether to create test kafka cluster on MaxNode
public static final Config<Boolean> KAFKA_TEST_CLUSTER_ENABLE =
Config.boolConfig("kafka.test.cluster.enable", true);

public static final Config<Boolean> SECONDARY_INSTANCE_ENABLED =
Config.boolConfig("secondary.instance.enabled", false);
}
@@ -23,7 +23,7 @@ public class CoordinatorConfig {
Config.longConfig("offsets.persist.interval.ms", 3000L);

public static final Config<Boolean> LOG_RECYCLE_ENABLE =
Config.boolConfig("log.recycle.enable", true);
Config.boolConfig("log.recycle.enable", false);

public static final Config<Long> LOG_RECYCLE_INTERVAL_SECOND =
Config.longConfig("log.recycle.interval.second", 600L);
@@ -18,8 +18,11 @@ public class FrontendConfig {

public static final Config<String> AUTH_PASSWORD = Config.stringConfig("auth.password", "");

//
public static final Config<Integer> FRONTEND_SERVICE_PORT =
Config.intConfig("frontend.service.port", 8182);
Config.intConfig("frontend.service.port", 55556);
public static final Config<Integer> GREMLIN_SERVER_PORT =
Config.intConfig("gremlin.server.port", 8182);

public static final Config<Integer> FRONTEND_SERVICE_THREAD_COUNT =
Config.intConfig(

This file was deleted.

This file was deleted.

@@ -24,5 +24,5 @@ public class IngestorConfig {
Config.intConfig("ingestor.sender.operation.max.count", 102400);

public static final Config<Long> INGESTOR_CHECK_PROCESSOR_INTERVAL_MS =
Config.longConfig("ingestor.check.processor.interval.ms", 3000L);
Config.longConfig("ingestor.check.processor.interval.ms", 2000L);
}
@@ -32,12 +32,12 @@ public class StoreConfig {
public static final Config<Boolean> STORE_GC_ENABLE =
Config.boolConfig("store.gc.enable", true);

public static final Config<Integer> EXECUTOR_GRAPH_PORT =
Config.intConfig("executor.graph.port", 0);

public static final Config<Integer> EXECUTOR_QUERY_PORT =
Config.intConfig("executor.query.port", 0);

public static final Config<Integer> EXECUTOR_ENGINE_PORT =
Config.intConfig("executor.engine.port", 0);
public static final Config<Long> STORE_GC_INTERVAL_MS =
Config.longConfig("store.gc.interval.ms", 5000L);

// set by IS_SECONDARY_INSTANCE, used in graph.rs
public static final Config<String> STORE_STORAGE_ENGINE =
Config.stringConfig("store.storage.engine", "rocksdb");
public static final Config<String> STORE_SECONDARY_DATA_PATH =
Config.stringConfig("store.data.secondary.path", "./data_secondary");
}
@@ -31,6 +31,10 @@ public QueryLogger(String query, long queryId) {
this.queryId = queryId;
}

public void debug(String format, Object... args) {
defaultLogger.debug(this + " : " + format, args);
}

public void info(String format, Object... args) {
defaultLogger.info(this + " : " + format, args);
}
@@ -323,7 +323,6 @@ protected void processTraversal(
InterOpCollection.process(opCollection);

long jobId = queryLogger.getQueryId();
String jobName = "ir_plan_" + jobId;
IrPlan irPlan = new IrPlan(irMeta, opCollection);
// print script and jobName with ir plan
queryLogger.info("ir plan {}", irPlan.getPlanAsJson());
@@ -334,6 +333,7 @@ protected void processTraversal(
PegasusClient.JobRequest.newBuilder()
.setPlan(ByteString.copyFrom(physicalPlanBytes))
.build();
String jobName = "ir_plan_" + jobId;
PegasusClient.JobConfig jobConfig =
PegasusClient.JobConfig.newBuilder()
.setJobId(jobId)
@@ -346,6 +346,7 @@
.setAll(PegasusClient.Empty.newBuilder().build())
.build();
request = request.toBuilder().setConf(jobConfig).build();

this.rpcClient.submit(request, resultProcessor, timeoutConfig.getChannelTimeoutMS());
}

@@ -67,7 +67,7 @@ protected AbstractResultProcessor(
RequestMessage msg = writeResult.getRequestMessage();
Settings settings = writeResult.getSettings();
// init batch size from resultIterationBatchSize in conf/gremlin-server.yaml,
// or args in RequestMessage which is originate from gremlin client
// or args in RequestMessage which is originated from gremlin client
this.resultCollectorsBatchSize =
(Integer)
msg.optionalArgs(Tokens.ARGS_BATCH_SIZE)
@@ -162,7 +162,7 @@ public void createVolumeIfNotExists(Odps odps) throws IOException {
volumeName,
"created by groot data-load-tools",
Volume.Type.OLD,
7L);
1L);
}
} catch (OdpsException e) {
System.out.println(
31 changes: 22 additions & 9 deletions interactive_engine/executor/assembly/groot/src/store/graph.rs
@@ -55,7 +55,12 @@ pub extern "C" fn openGraphStore(config_bytes: *const u8, len: usize) -> GraphHa
let buf = unsafe { ::std::slice::from_raw_parts(config_bytes, len) };
let proto = parse_pb::<ConfigPb>(buf).expect("parse config pb failed");
let mut config_builder = GraphConfigBuilder::new();
config_builder.set_storage_engine("rocksdb");
let engine = "rocksdb".to_string();
let engine = proto
.get_configs()
.get("store.storage.engine")
.unwrap_or(&engine);
config_builder.set_storage_engine(engine);
config_builder.set_storage_options(proto.get_configs().clone());
let config = config_builder.build();
let path = config
@@ -438,14 +443,22 @@ fn delete_edge<G: MultiVersionGraph>(graph: &G, snapshot_id: i64, op: &Operation

#[no_mangle]
pub extern "C" fn garbageCollectSnapshot(ptr: GraphHandle, snapshot_id: i64) -> Box<JnaResponse> {
unsafe {
let graph_store_ptr = &*(ptr as *const GraphStore);
match graph_store_ptr.gc(snapshot_id) {
Ok(_) => JnaResponse::new_success(),
Err(e) => {
let msg = format!("{:?}", e);
JnaResponse::new_error(&msg)
}
let graph_store_ptr = unsafe { &*(ptr as *const GraphStore) };

match graph_store_ptr.try_catch_up_with_primary() {
Ok(_) => (),
Err(e) => {
error!("Error during catch up primary {:?}", e);
}
};
if snapshot_id % 3600 != 0 {
return JnaResponse::new_success();
}
match graph_store_ptr.gc(snapshot_id) {
Ok(_) => JnaResponse::new_success(),
Err(e) => {
let msg = format!("{:?}", e);
JnaResponse::new_error(&msg)
}
}
}
@@ -99,7 +99,6 @@ public void onError(Throwable throwable) {

@Override
public void onCompleted() {
logger.info("finish get job response from one server");
if (counter.decrementAndGet() == 0) {
logger.info("finish get job response from all servers");
processor.finish();
@@ -55,6 +55,7 @@ impl<T: 'static> ResultSink<T> {
info_worker!("Job is canceled");
let msg = "Job is canceled".to_string();
let err = JobExecError::from(msg);
warn_worker!("Job is canceled");
tx.on_error(Box::new(err));
}
_ => (),
@@ -65,7 +65,7 @@ impl From<pb::edge_expand::Direction> for Direction {
}
}

#[derive(Default, Debug)]
#[derive(Default, Debug, Clone)]
pub struct QueryParams {
pub labels: Vec<LabelId>,
pub limit: Option<usize>,