Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Small optim of frame serialization for puffin_http #239

Draft
wants to merge 10 commits into
base: main
Choose a base branch
from
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ jobs:
- name: Ensure that the tool cache is populated with the cargo-vet binary
# build from source, as there are no published binaries yet :(
# tracked in https://github.com/mozilla/cargo-vet/issues/484
run: cargo install --root ${{ runner.tool_cache }}/cargo-vet --version ${{ env.CARGO_VET_VERSION }} cargo-vet
run: cargo +stable install --root ${{ runner.tool_cache }}/cargo-vet --version ${{ env.CARGO_VET_VERSION }} cargo-vet
- name: Invoke cargo-vet
run: |
cargo vet --locked
Expand Down
55 changes: 38 additions & 17 deletions puffin/src/frame_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -561,18 +561,14 @@ impl FrameData {
#[cfg(feature = "serialization")]
pub fn write_into(
&self,
scope_collection: &crate::ScopeCollection,
send_all_scopes: bool,
write: &mut impl std::io::Write,
scope_collection: Option<&crate::ScopeCollection>,
mut write: &mut impl std::io::Write,
) -> anyhow::Result<()> {
use bincode::Options as _;
use byteorder::{WriteBytesExt as _, LE};
use byteorder::WriteBytesExt as _;

let meta_serialized = bincode::options().serialize(&self.meta)?;

write.write_all(b"PFD4")?;
write.write_all(&(meta_serialized.len() as u32).to_le_bytes())?;
write.write_all(&meta_serialized)?;
write.write_all(b"PFD5")?;
bincode::options().serialize_into(&mut write, &self.meta)?;

self.create_packed();
let packed_streams_lock = self.data.read();
Expand All @@ -582,15 +578,12 @@ impl FrameData {
write.write_u8(packed_streams.compression_kind as u8)?;
write.write_all(&packed_streams.bytes)?;

let to_serialize_scopes: Vec<_> = if send_all_scopes {
scope_collection.scopes_by_id().values().cloned().collect()
if let Some(scope_collection) = scope_collection {
bincode::options().serialize_into(&mut write, &scope_collection.serializable())?;
} else {
self.scope_delta.clone()
};
bincode::options().serialize_into(write, &self.scope_delta)?;
}

let serialized_scopes = bincode::options().serialize(&to_serialize_scopes)?;
write.write_u32::<LE>(serialized_scopes.len() as u32)?;
write.write_all(&serialized_scopes)?;
Ok(())
}

Expand All @@ -599,7 +592,7 @@ impl FrameData {
/// [`None`] is returned if the end of the stream is reached (EOF),
/// or an end-of-stream sentinel of `0u32` is read.
#[cfg(feature = "serialization")]
pub fn read_next(read: &mut impl std::io::Read) -> anyhow::Result<Option<Self>> {
pub fn read_next(mut read: &mut impl std::io::Read) -> anyhow::Result<Option<Self>> {
use anyhow::Context as _;
use bincode::Options as _;
use byteorder::{ReadBytesExt, LE};
Expand Down Expand Up @@ -779,6 +772,34 @@ impl FrameData {
scope_delta: new_scopes,
full_delta: false,
}))
} else if &header == b"PFD5" {
// Added 2024-12-22 (PFD5): removed the redundant manual sequence-size serialization and the temporary scope vector.
let meta = {
bincode::options()
.deserialize_from(&mut read)
.context("bincode deserialize")?
};

let streams_compressed_length = read.read_u32::<LE>()? as usize;
let compression_kind = CompressionKind::from_u8(read.read_u8()?)?;
let streams_compressed = {
let mut streams_compressed = vec![0_u8; streams_compressed_length];
read.read_exact(&mut streams_compressed)?;
PackedStreams::new(compression_kind, streams_compressed)
};

let deserialized_scopes: Vec<Arc<crate::ScopeDetails>> = {
bincode::options()
.deserialize_from(read) // serialized_scopes.as_slice()
.context("Can not deserialize scope details")?
};

Ok(Some(Self {
meta,
data: RwLock::new(FrameDataState::Packed(streams_compressed)),
scope_delta: deserialized_scopes,
full_delta: false,
}))
} else {
anyhow::bail!("Failed to decode: this data is newer than this reader. Please update your puffin version!");
}
Expand Down
3 changes: 2 additions & 1 deletion puffin/src/profile_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub struct FrameView {
/// Maintain stats as we add/remove frames
stats: FrameStats,

/// Collects all scope info (id/name) gathered since the start of profiling.
scope_collection: ScopeCollection,
}

Expand Down Expand Up @@ -229,7 +230,7 @@ impl FrameView {
write.write_all(b"PUF0")?;

for frame in self.all_uniq() {
frame.write_into(&self.scope_collection, false, write)?;
frame.write_into(None, write)?;
}
Ok(())
}
Expand Down
22 changes: 22 additions & 0 deletions puffin/src/scope_details.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,28 @@ impl ScopeCollection {
pub fn scopes_by_id(&self) -> &HashMap<ScopeId, Arc<ScopeDetails>> {
&self.0.scope_id_to_details
}

/// Returns a wrapper that allows serializing all the scope values of this
/// `ScopeCollection` as a single sequence (see [`Serializable`]).
///
/// The wrapper borrows `self`, so no scopes are copied.
#[cfg(feature = "serialization")]
pub fn serializable(&self) -> Serializable<'_> {
Serializable(self)
}
}

/// A wrapper that implements `Serialize` for `ScopeCollection`.
///
/// Created by the [`serializable`] method on `ScopeCollection`; it borrows the
/// collection and serializes its scope details as one flat sequence (see the
/// accompanying `Serialize` impl).
#[cfg(feature = "serialization")]
pub struct Serializable<'a>(&'a crate::ScopeCollection);

#[cfg(feature = "serialization")]
impl<'a> serde::Serialize for Serializable<'a> {
    /// Serializes every `ScopeDetails` held by the wrapped collection as a
    /// single sequence, in the iteration order of the underlying map.
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        serializer.collect_seq(self.0.scopes_by_id().values())
    }
}

/// Scopes are identified by user-provided name while functions are identified by the function name.
Expand Down
43 changes: 30 additions & 13 deletions puffin_http/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use anyhow::Context as _;
use puffin::{FrameSinkId, FrameView, GlobalProfiler};
use crossbeam_channel::TryRecvError;
use puffin::{FrameSinkId, GlobalProfiler, ScopeCollection};
use std::{
io::Write,
net::{SocketAddr, TcpListener, TcpStream},
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::Duration,
};

/// Maximum size of the backlog of packets to send to a client if they aren't reading fast enough.
Expand Down Expand Up @@ -249,17 +251,25 @@ impl Server {
clients: Default::default(),
num_clients: num_clients_cloned,
send_all_scopes: false,
frame_view: Default::default(),
scope_collection: Default::default(),
};

while let Ok(frame) = rx.recv() {
server_impl.frame_view.add_frame(frame.clone());
loop {
if let Err(err) = server_impl.accept_new_clients() {
log::warn!("puffin server failure: {}", err);
}

if let Err(err) = server_impl.send(&frame) {
log::warn!("puffin server failure: {}", err);
match rx.try_recv() {
Ok(frame) => {
if let Err(err) = server_impl.send(&frame) {
log::warn!("puffin server failure: {}", err);
}
}
Err(TryRecvError::Empty) => {
std::thread::sleep(Duration::from_millis(1));
}
Err(TryRecvError::Disconnected) => {
break;
}
}
}
})
Expand Down Expand Up @@ -325,7 +335,7 @@ struct PuffinServerImpl {
clients: Vec<Client>,
num_clients: Arc<AtomicUsize>,
send_all_scopes: bool,
frame_view: FrameView,
scope_collection: ScopeCollection,
}

impl PuffinServerImpl {
Expand Down Expand Up @@ -372,18 +382,25 @@ impl PuffinServerImpl {
}
puffin::profile_function!();

// Keep scope_collection up-to-date
frame.scope_delta.iter().for_each(|new_scope| {
self.scope_collection.insert(new_scope.clone());
});

let mut packet = vec![];

packet
.write_all(&crate::PROTOCOL_VERSION.to_le_bytes())
.unwrap();

let scope_collection = if self.send_all_scopes {
Some(&self.scope_collection)
} else {
None
};

frame
.write_into(
self.frame_view.scope_collection(),
self.send_all_scopes,
&mut packet,
)
.write_into(scope_collection, &mut packet)
.context("Encode puffin frame")?;
self.send_all_scopes = false;

Expand Down
Loading