Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add opentelemetry capability to the runtime #234

Merged
merged 12 commits into from
Mar 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/ci-python.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ on:
paths:
- apis/python/**
- binaries/runtime/**
- examples/python-dataflow/**
- examples/python-operator-dataflow/**
pull_request:
workflow_dispatch:

Expand Down
108 changes: 93 additions & 15 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions apis/python/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[features]
default = ["tracing"]
default = ["tracing", "telemetry"]
tracing = ["dora-node-api/tracing"]
telemetry = ["dora-runtime/telemetry"]

[dependencies]
dora-node-api = { workspace = true }
Expand All @@ -19,7 +20,7 @@ pyo3 = { version = "0.16", features = ["eyre", "abi3-py37"] }
eyre = "0.6"
serde_yaml = "0.8.23"
flume = "0.10.14"
dora-runtime = { workspace = true }
dora-runtime = { workspace = true, features = ["tracing"] }

[lib]
name = "dora"
Expand Down
4 changes: 2 additions & 2 deletions apis/rust/node/src/daemon_connection/communication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ pub enum DaemonChannel {
}

impl DaemonChannel {
#[tracing::instrument]
#[tracing::instrument(level = "trace")]
pub fn new_tcp(stream: TcpStream) -> eyre::Result<Self> {
stream.set_nodelay(true).context("failed to set nodelay")?;
Ok(DaemonChannel::Tcp(stream))
}

#[tracing::instrument]
#[tracing::instrument(level = "trace")]
pub unsafe fn new_shmem(daemon_control_region_id: &str) -> eyre::Result<Self> {
let daemon_events_region = ShmemConf::new()
.os_id(daemon_control_region_id)
Expand Down
2 changes: 1 addition & 1 deletion apis/rust/node/src/daemon_connection/control_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub(crate) struct ControlChannel {
}

impl ControlChannel {
#[tracing::instrument(skip(channel, event_stream_thread_handle))]
#[tracing::instrument(skip(channel, event_stream_thread_handle), level = "trace")]
pub(crate) fn init(
dataflow_id: DataflowId,
node_id: &NodeId,
Expand Down
10 changes: 5 additions & 5 deletions apis/rust/node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@ pub struct DoraNode {

impl DoraNode {
pub fn init_from_env() -> eyre::Result<(Self, EventStream)> {
#[cfg(feature = "tracing")]
set_up_tracing().context("failed to set up tracing subscriber")?;

let node_config = {
let node_config: NodeConfig = {
let raw = std::env::var("DORA_NODE_CONFIG")
.wrap_err("env variable DORA_NODE_CONFIG must be set")?;
serde_yaml::from_str(&raw).context("failed to deserialize operator config")?
};
#[cfg(feature = "tracing")]
set_up_tracing(&node_config.node_id.to_string())
.context("failed to set up tracing subscriber")?;
Self::init(node_config)
}

Expand Down Expand Up @@ -201,7 +201,7 @@ impl DoraNode {
}

impl Drop for DoraNode {
#[tracing::instrument(skip(self), fields(self.id = %self.id))]
#[tracing::instrument(skip(self), fields(self.id = %self.id), level = "trace")]
fn drop(&mut self) {
if !self.sent_out_shared_memory.is_empty() {
tracing::info!(
Expand Down
2 changes: 1 addition & 1 deletion binaries/coordinator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use eyre::Context;
#[tokio::main]
async fn main() -> eyre::Result<()> {
#[cfg(feature = "tracing")]
set_up_tracing().context("failed to set up tracing subscriber")?;
set_up_tracing("dora-coordinator").context("failed to set up tracing subscriber")?;

dora_coordinator::run().await
}
6 changes: 5 additions & 1 deletion binaries/daemon/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,18 @@ license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[features]
default = ["tracing"]
default = ["tracing", "telemetry"]
tracing = ["dep:dora-tracing"]
# The `telemetry` feature enables tracing of dora-daemon and sends timer ticks with an
# OpenTelemetry context attached, for distributed tracing.
telemetry = ["dep:tracing-opentelemetry"]

[dependencies]
eyre = "0.6.8"
tokio = { version = "1.20.1", features = ["full"] }
tokio-stream = { version = "0.1.11", features = ["net"] }
tracing = "0.1.36"
tracing-opentelemetry = { version = "0.18.0", optional = true }
futures-concurrency = "7.1.0"
serde = { version = "1.0.136", features = ["derive"] }
serde_json = "1.0.86"
Expand Down
28 changes: 23 additions & 5 deletions binaries/daemon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use coordinator::CoordinatorEvent;
use dora_core::config::Input;
use dora_core::daemon_messages::Data;
use dora_core::message::uhlc::HLC;
use dora_core::message::MetadataParameters;
use dora_core::{
config::{DataId, InputMapping, NodeId},
coordinator_messages::DaemonEvent,
Expand Down Expand Up @@ -35,6 +36,11 @@ mod listener;
mod spawn;
mod tcp_utils;

#[cfg(feature = "telemetry")]
use dora_tracing::telemetry::serialize_context;
#[cfg(feature = "telemetry")]
use tracing_opentelemetry::OpenTelemetrySpanExt;

pub struct Daemon {
running: HashMap<DataflowId, RunningDataflow>,

Expand Down Expand Up @@ -546,7 +552,7 @@ impl Daemon {
Ok(())
}

#[tracing::instrument(skip(self))]
#[tracing::instrument(skip(self), level = "trace")]
async fn handle_node_stop(
&mut self,
dataflow_id: Uuid,
Expand Down Expand Up @@ -831,13 +837,25 @@ impl RunningDataflow {
loop {
interval_stream.tick().await;

let span = tracing::span!(tracing::Level::TRACE, "tick");
let _ = span.enter();

let metadata = dora_core::message::Metadata::from_parameters(
hlc.new_timestamp(),
MetadataParameters {
watermark: 0,
deadline: 0,
#[cfg(feature = "telemetry")]
open_telemetry_context: serialize_context(&span.context()).into(),
#[cfg(not(feature = "telemetry"))]
open_telemetry_context: "".into(),
},
);

let event = DoraEvent::Timer {
dataflow_id,
interval,
metadata: dora_core::message::Metadata::from_parameters(
hlc.new_timestamp(),
Default::default(),
),
metadata,
};
if events_tx.send(event.into()).await.is_err() {
break;
Expand Down
4 changes: 2 additions & 2 deletions binaries/daemon/src/listener/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ impl Listener {
Ok(())
}

#[tracing::instrument(skip(self), fields(%self.node_id))]
#[tracing::instrument(skip(self), fields(%self.node_id), level = "trace")]
async fn drop_oldest_inputs(&mut self) -> Result<(), eyre::ErrReport> {
let mut queue_size_remaining = self.queue_sizes.clone();
let mut dropped = 0;
Expand Down Expand Up @@ -268,7 +268,7 @@ impl Listener {
Ok(())
}

#[tracing::instrument(skip(self, connection), fields(%self.dataflow_id, %self.node_id))]
#[tracing::instrument(skip(self, connection), fields(%self.dataflow_id, %self.node_id), level = "trace")]
async fn handle_message<C: Connection>(
&mut self,
message: DaemonRequest,
Expand Down
Loading