Skip to content

Commit

Permalink
fix(interactive): refine rust otel export (#3810)
Browse files Browse the repository at this point in the history
  • Loading branch information
siyuan0322 authored May 20, 2024
1 parent 6e5dbc1 commit 46e3559
Show file tree
Hide file tree
Showing 18 changed files with 116 additions and 58 deletions.
9 changes: 5 additions & 4 deletions charts/graphscope-store/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ data:
discovery.mode={{ .Values.discoveryMode }}
role.name=ROLE
node.idx=INDEX
release.full.name={{ include "graphscope-store.fullname" . }}
{{- else }}
rpc.port=0
discovery.mode=zookeeper
role.name=""
node.idx=0
release.full.name=localhost
{{- end }}

store.node.count={{ .Values.store.replicaCount }}
Expand Down Expand Up @@ -58,7 +60,6 @@ data:
neo4j.bolt.server.disabled=true

log4rs.config=LOG4RS_CONFIG
release.full.name={{ include "graphscope-store.fullname" . }}
## Auth config
auth.username={{ .Values.auth.username }}
auth.password={{ .Values.auth.password }}
Expand All @@ -79,7 +80,6 @@ data:
store.gc.interval.ms={{ .Values.storeGcIntervalMs }}

write.ha.enabled={{ .Values.backup.enabled }}
tracing.enabled={{ .Values.otel.enabled }}

## Coordinator Config
rpc.max.bytes.mb={{ .Values.rpcMaxBytesMb }}
Expand Down Expand Up @@ -137,12 +137,13 @@ data:
export OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE=DELTA
export OTEL_EXPORTER_OTLP_METRICS_DEFAULT_HISTOGRAM_AGGREGATION=BASE2_EXPONENTIAL_BUCKET_HISTOGRAM
export OTEL_EXPORTER_OTLP_COMPRESSION=gzip
# export OTEL_EXPORTER_OTLP_COMPRESSION=gzip
{{- end }}
{{- if .Values.uptrace.enabled }}
export UPTRACE_DSN=http://{{ .Values.uptrace.token }}@${{ .Values.uptrace.service }}:14318?grpc=14317
export OTEL_EXPORTER_OTLP_ENDPOINT=http://{{ .Values.uptrace.service }}:14317
export OTEL_EXPORTER_OTLP_HEADERS=uptrace-dsn=http://{{ .Values.uptrace.token }}@${{ .Values.uptrace.service }}:14318?grpc=14317
export OTEL_EXPORTER_OTLP_HEADERS=uptrace-dsn=${UPTRACE_DSN}
{{- end }}
Expand Down
1 change: 1 addition & 0 deletions charts/graphscope-store/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,7 @@ otel:
registry: docker.io
repository: jaegertracing/all-in-one
tag: "latest"
# https://opentelemetry.io/docs/languages/sdk-configuration/general/#otel_traces_sampler
traces:
sampler:
name: "traceidratio"
Expand Down
3 changes: 2 additions & 1 deletion interactive_engine/assembly/src/conf/groot/config.template
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,5 @@ neo4j.bolt.server.disabled=true
pegasus.worker.num=2
pegasus.hosts=localhost:8080

kafka.test.cluster.enable=true
kafka.test.cluster.enable=true
OTEL_SDK_DISABLED=true
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,6 @@ public class CommonConfig {

public static final Config<Integer> PARTITION_COUNT = Config.intConfig("partition.count", 1);

public static final Config<Long> METRIC_UPDATE_INTERVAL_MS =
Config.longConfig("metric.update.interval.ms", 5000L);

public static final Config<String> LOG4RS_CONFIG = Config.stringConfig("log4rs.config", "");

public static final Config<String> DISCOVERY_MODE =
Expand All @@ -74,8 +71,6 @@ public class CommonConfig {

public static final Config<Boolean> SECONDARY_INSTANCE_ENABLED =
Config.boolConfig("secondary.instance.enabled", false);
public static final Config<Boolean> TRACING_ENABLED =
Config.boolConfig("tracing.enabled", false);

// Create an extra store pod for each original store pod for backup.
// Only available in multi pod mode.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class CoordinatorConfig {
Config.boolConfig("log.recycle.enable", false);

public static final Config<Long> LOG_RECYCLE_INTERVAL_SECOND =
Config.longConfig("log.recycle.interval.second", 600L);
Config.longConfig("log.recycle.interval.second", 3600L);

public static final Config<String> FILE_META_STORE_PATH =
Config.stringConfig("file.meta.store.path", "./meta");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,14 @@ public class StoreConfig {
public static final Config<Long> STORE_QUEUE_WAIT_MS =
Config.longConfig("store.queue.wait.ms", 3000L);

public static final Config<Long> STORE_COMMIT_INTERVAL_MS =
Config.longConfig("store.commit.interval.ms", 1000L);

public static final Config<Boolean> STORE_GC_ENABLE =
Config.boolConfig("store.gc.enable", true);

public static final Config<Long> STORE_GC_INTERVAL_MS =
Config.longConfig("store.gc.interval.ms", 5000L);
Config.longConfig("store.gc.interval.ms", 3600000L);

public static final Config<Long> STORE_CATCHUP_INTERVAL_MS =
Config.longConfig("store.catchup.interval.ms", 30000L);
Config.longConfig("store.catchup.interval.ms", 5000L);

// set by IS_SECONDARY_INSTANCE, used in graph.rs
public static final Config<String> STORE_STORAGE_ENGINE =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,14 +209,7 @@ fn make_gaia_config(graph_config: Arc<GraphConfig>) -> GaiaConfig {
.no_delay(no_delay)
.send_buffer(send_buffer)
.heartbeat_sec(heartbeat_sec);
let enable_tracing = graph_config
.get_storage_option("tracing.enabled")
.map(|config_str| {
config_str
.parse()
.expect("parse tracing.enabled failed")
});
GaiaConfig { network: Some(network_config), max_pool_size, enable_tracing }
GaiaConfig { network: Some(network_config), max_pool_size }
}

fn make_gaia_rpc_config(graph_config: Arc<GraphConfig>) -> RPCServerConfig {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ macro_rules! configure_with_default {
pub struct Configuration {
pub network: Option<NetworkConfig>,
pub max_pool_size: Option<u32>,
pub enable_tracing: Option<bool>,
}

impl Configuration {
Expand All @@ -46,11 +45,11 @@ impl Configuration {
}

pub fn singleton() -> Self {
Configuration { network: None, max_pool_size: None, enable_tracing: None }
Configuration { network: None, max_pool_size: None }
}

pub fn with(network: NetworkConfig) -> Self {
Configuration { network: Some(network), max_pool_size: None, enable_tracing: None }
Configuration { network: Some(network), max_pool_size: None }
}

pub fn server_id(&self) -> u64 {
Expand Down
1 change: 1 addition & 0 deletions interactive_engine/executor/engine/pegasus/server/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ fn codegen_inplace() -> Result<(), Box<dyn std::error::Error>> {
fn codegen_inplace() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::configure()
.build_server(true)
.build_client(true)
.compile(&["proto/job_service.proto", "proto/job_plan.proto"], &["proto"])?;
Ok(())
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# Set max threads will be created in the executor pool;
# It will be set to CPU cores by default;
max_pool_size = 8
enable_tracing = false

[network]
# Set server id of current config belongs to;
Expand Down
104 changes: 86 additions & 18 deletions interactive_engine/executor/engine/pegasus/server/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,21 @@ use std::time::Duration;
use futures::Stream;
use hyper::server::accept::Accept;
use hyper::server::conn::{AddrIncoming, AddrStream};
use opentelemetry::trace::TraceContextExt;
use opentelemetry::trace::{TraceContextExt, TraceError};
use opentelemetry::{
global,
propagation::Extractor,
trace::{Span, SpanKind, Tracer},
KeyValue,
};
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_otlp::{ExportConfig, Protocol, TonicExporterBuilder, WithExportConfig};
use opentelemetry_sdk::metrics::SdkMeterProvider;
use opentelemetry_sdk::propagation::TraceContextPropagator;
use opentelemetry_sdk::resource::{
EnvResourceDetector, SdkProvidedResourceDetector, TelemetryResourceDetector,
};
use opentelemetry_sdk::trace::BatchConfigBuilder;
use opentelemetry_sdk::Resource;
use pegasus::api::function::FnResult;
use pegasus::api::FromStream;
use pegasus::errors::{ErrorKind, JobExecError};
Expand Down Expand Up @@ -188,7 +194,8 @@ where
type SubmitStream = UnboundedReceiverStream<Result<pb::JobResponse, Status>>;

async fn cancel(&self, req: Request<pb::CancelRequest>) -> Result<Response<Empty>, Status> {
let parent_ctx = global::get_text_map_propagator(|prop| prop.extract(&MetadataMap(req.metadata())));
let parent_ctx =
global::get_text_map_propagator(|prop| prop.extract(&MyMetadataMap(req.metadata())));
let tracer = global::tracer("executor");
let _span = tracer
.span_builder("JobService/cancel")
Expand All @@ -201,7 +208,8 @@ where

async fn submit(&self, req: Request<pb::JobRequest>) -> Result<Response<Self::SubmitStream>, Status> {
debug!("accept new request from {:?};", req.remote_addr());
let parent_ctx = global::get_text_map_propagator(|prop| prop.extract(&MetadataMap(req.metadata())));
let parent_ctx =
global::get_text_map_propagator(|prop| prop.extract(&MyMetadataMap(req.metadata())));
let tracer = global::tracer("executor");

let pb::JobRequest { conf, source, plan, resource } = req.into_inner();
Expand Down Expand Up @@ -292,9 +300,7 @@ where
D: ServerDetect + 'static,
E: ServiceStartListener,
{
if server_config.enable_tracing.unwrap_or(false) {
let _tracer = init_tracer().expect("Failed to initialize tracer.");
}
init_otel().expect("Failed to initialize open telemetry");
let server_id = server_config.server_id();
if let Some(server_addr) = pegasus::startup_with(server_config, server_detector)? {
listener.on_server_start(server_id, server_addr)?;
Expand Down Expand Up @@ -378,24 +384,86 @@ impl<S: pb::job_service_server::JobService> RPCJobServer<S> {
}
}

fn init_tracer() -> Result<opentelemetry_sdk::trace::Tracer, opentelemetry::trace::TraceError> {
fn init_otel() -> Result<bool, Box<dyn std::error::Error>> {
let otel_disable = std::env::var("OTEL_SDK_DISABLED").unwrap_or("true".to_string());
info!("otel_disable: {}", otel_disable);
if otel_disable.trim().parse().unwrap() {
info!("OTEL is disabled");
return Ok(true);
}

// let mut metadata = tonic::metadata::MetadataMap::with_capacity(1);
// let dsn = std::env::var("UPTRACE_DSN").unwrap_or_default();
// if !dsn.is_empty() {
// metadata.insert("uptrace-dsn", dsn.parse().unwrap());
// info!("using DSN: {}", dsn);
// } else {
// warn!("Error: UPTRACE_DSN not found.");
// }

let default_endpoint = "http://localhost:4317".to_string();
let endpoint = std::env::var("OTEL_EXPORTER_OTLP_ENDPOINT").unwrap_or(default_endpoint);

let resource = Resource::from_detectors(
Duration::from_secs(0),
vec![
Box::new(SdkProvidedResourceDetector),
Box::new(EnvResourceDetector::new()),
Box::new(TelemetryResourceDetector),
],
);

let exporter = opentelemetry_otlp::new_exporter()
.tonic()
.with_timeout(Duration::from_secs(5))
.with_endpoint(endpoint.clone());
// .with_metadata(metadata.clone());
let _tracer = init_tracer(resource.clone(), exporter)?;

let exporter = opentelemetry_otlp::new_exporter()
.tonic()
.with_timeout(Duration::from_secs(5))
.with_endpoint(endpoint);
// .with_metadata(metadata);

let _meter = init_meter_provider(resource, exporter)?;
global::set_meter_provider(_meter);
return Ok(true);
}

fn init_tracer(
resource: Resource, exporter: TonicExporterBuilder,
) -> Result<opentelemetry_sdk::trace::Tracer, TraceError> {
global::set_text_map_propagator(TraceContextPropagator::new());
let batch_config = opentelemetry_sdk::trace::BatchConfigBuilder::default()
.with_max_queue_size(2048)
.with_max_export_batch_size(512)
.with_scheduled_delay(Duration::from_millis(5000))
.build();
let trace_config = opentelemetry_sdk::trace::config().with_resource(resource);
opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(
opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint("http://localhost:4317"),
)
.with_trace_config(opentelemetry_sdk::trace::config().with_resource(
opentelemetry_sdk::Resource::new(vec![KeyValue::new("service.name", "pegasus")]),
))
.with_exporter(exporter)
.with_batch_config(batch_config)
.with_trace_config(trace_config)
.install_batch(opentelemetry_sdk::runtime::Tokio)
}

struct MetadataMap<'a>(&'a tonic::metadata::MetadataMap);
fn init_meter_provider(
resource: Resource, exporter: TonicExporterBuilder,
) -> opentelemetry::metrics::Result<SdkMeterProvider> {
opentelemetry_otlp::new_pipeline()
.metrics(opentelemetry_sdk::runtime::Tokio)
.with_exporter(exporter)
.with_period(Duration::from_secs(15))
.with_timeout(Duration::from_secs(5))
.with_resource(resource)
.build()
}

struct MyMetadataMap<'a>(&'a tonic::metadata::MetadataMap);

impl<'a> Extractor for MetadataMap<'a> {
impl<'a> Extractor for MyMetadataMap<'a> {
fn get(&self, key: &str) -> Option<&str> {
self.0
.get(key)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ public class Utils {

public static String getHostTemplate(Configs configs, RoleType role) {
String releaseName = DiscoveryConfig.RELEASE_FULL_NAME.get(configs);
if (releaseName.equals("localhost") || releaseName.equals("127.0.0.1")) {
return releaseName;
}
// template = "{releaseName}-{role}-{}.{releaseName}-{role}-headless";
// i.e. demo-graphscope-store-frontend-0.demo-graphscope-store-frontend-headless
String svcTemplate = "%s-%s";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ private void doRecycle() {
List<Long> queueOffsets = this.snapshotManager.getQueueOffsets();
for (int i = 0; i < queueOffsets.size(); i++) {
long offset = queueOffsets.get(i);
offset = Math.max(offset - 3600, 0); // Leave some spaces
try {
logService.deleteBeforeOffset(i, offset);
logger.info("recycled queue [{}] offset [{}]", i, offset);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public void getPartitionNum(
public void prepareDataLoad(
PrepareDataLoadRequest request,
StreamObserver<PrepareDataLoadResponse> responseObserver) {
logger.info("Preparing data load");
DdlRequestBatch.Builder builder = DdlRequestBatch.newBuilder();
for (DataLoadTargetPb dataLoadTargetPb : request.getDataLoadTargetsList()) {
DataLoadTarget dataLoadTarget = DataLoadTarget.parseProto(dataLoadTargetPb);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ public void start() {

public void stop() {
this.shouldStop = true;
try {
updateQueueOffsets();
} catch (IOException ex) {
logger.error("update queue offset failed", ex);
}
if (this.persistOffsetsScheduler != null) {
this.persistOffsetsScheduler.shutdown();
try {
Expand Down Expand Up @@ -163,8 +168,7 @@ private void updateQueueOffsets() throws IOException {
boolean changed = false;
List<Long> consumedOffsets = writerAgent.getConsumedQueueOffsets();
for (int qId = 0; qId < queueOffsets.size(); qId++) {
long minOffset = Long.MAX_VALUE;
minOffset = Math.min(consumedOffsets.get(qId), minOffset);
long minOffset = Math.min(consumedOffsets.get(qId), Long.MAX_VALUE);
if (minOffset != Long.MAX_VALUE && minOffset > newQueueOffsets.get(qId)) {
newQueueOffsets.set(qId, minOffset);
changed = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ public class PartitionService {
public PartitionService(Configs configs, StoreService storeService) {
this.storeService = storeService;
this.isSecondary = CommonConfig.SECONDARY_INSTANCE_ENABLED.get(configs);
this.storeCatchupIntervalMS = StoreConfig.STORE_GC_INTERVAL_MS.get(configs);
// this.storeCatchupIntervalMS = StoreConfig.STORE_CATCHUP_INTERVAL_MS.get(configs);
this.storeCatchupIntervalMS = StoreConfig.STORE_CATCHUP_INTERVAL_MS.get(configs);

this.scheduler =
Executors.newScheduledThreadPool(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,8 @@ private void processBatches() {
this.consumeSI = batchSI;
this.availSnapshotInfoRef.set(new SnapshotInfo(availSI, availDdlSI));
this.commitExecutor.execute(this::asyncCommit);
} else {
logger.warn("consumedSI {} >= batchSI {}, ignored", consumeSI, batchSI);
} else { // a flurry of batches with same snapshot ID
logger.debug("consumedSI {} >= batchSI {}, ignored", consumeSI, batchSI);
}
if (hasDdl) {
this.consumeDdlSnapshotId = batchSI;
Expand Down
Loading

0 comments on commit 46e3559

Please sign in to comment.