Skip to content

Commit

Permalink
Support for draft-03 (#132)
Browse files Browse the repository at this point in the history
  • Loading branch information
kixelated authored Mar 8, 2024
1 parent ad5cc5d commit 0d9c0ed
Show file tree
Hide file tree
Showing 51 changed files with 910 additions and 1,093 deletions.
8 changes: 0 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions dev/pub
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ ADDR="${ADDR:-$HOST:$PORT}"
# Generate a random 16 character name by default.
#NAME="${NAME:-$(head /dev/urandom | LC_ALL=C tr -dc 'a-zA-Z0-9' | head -c 16)}"

# JK use the name "dev" instead
# JK use the name "bbb" instead, matching the Big Buck Bunny demo.
# TODO use that random name if the host is not localhost
NAME="${NAME:-dev}"
NAME="${NAME:-bbb}"

# Combine the host and name into a URL.
URL="${URL:-"https://$ADDR/$NAME"}"
Expand Down
17 changes: 3 additions & 14 deletions moq-clock/src/clock.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::time;

use anyhow::Context;
use moq_transport::{
cache::{fragment, segment, track},
Expand Down Expand Up @@ -30,7 +28,6 @@ impl Publisher {
.create_segment(segment::Info {
sequence: VarInt::from_u32(sequence),
priority: 0,
expires: Some(time::Duration::from_secs(60)),
})
.context("failed to create minute segment")?;

Expand All @@ -56,19 +53,11 @@ impl Publisher {
// Everything but the second.
let base = now.format("%Y-%m-%d %H:%M:").to_string();

segment
.fragment(VarInt::ZERO, base.len())?
.chunk(base.clone().into())
.context("failed to write base")?;
segment.write(base.clone().into()).context("failed to write base")?;

loop {
let delta = now.format("%S").to_string();
let sequence = VarInt::from_u32(now.second() + 1);

segment
.fragment(sequence, delta.len())?
.chunk(delta.clone().into())
.context("failed to write delta")?;
segment.write(delta.clone().into()).context("failed to write delta")?;

println!("{}{}", base, delta);

Expand Down Expand Up @@ -119,7 +108,7 @@ impl Subscriber {

log::debug!("got first: {:?}", first);

if first.sequence.into_inner() != 0 {
if first.sequence != VarInt::ZERO {
anyhow::bail!("first object must be zero; I'm not going to implement a reassembly buffer");
}

Expand Down
41 changes: 14 additions & 27 deletions moq-pub/src/media.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use anyhow::{self, Context};
use moq_transport::cache::{broadcast, fragment, segment, track};
use moq_transport::cache::{broadcast, segment, track};
use moq_transport::VarInt;
use mp4::{self, ReadBox};
use serde_json::json;
Expand Down Expand Up @@ -41,16 +41,13 @@ impl<I: AsyncRead + Send + Unpin + 'static> Media<I> {

// Create the catalog track with a single segment.
let mut init_track = broadcast.create_track("0.mp4")?;
let init_segment = init_track.create_segment(segment::Info {
let mut init_segment = init_track.create_segment(segment::Info {
sequence: VarInt::ZERO,
priority: 0,
expires: None,
})?;

// Create a single fragment, optionally setting the size
let mut init_fragment = init_segment.final_fragment(VarInt::ZERO)?;

init_fragment.chunk(init.into())?;
// Write the init segment to the track.
init_segment.write(init.into())?;

let mut tracks = HashMap::new();

Expand Down Expand Up @@ -128,10 +125,9 @@ impl<I: AsyncRead + Send + Unpin + 'static> Media<I> {
init_track_name: &str,
moov: &mp4::MoovBox,
) -> Result<(), anyhow::Error> {
let segment = track.create_segment(segment::Info {
let mut segment = track.create_segment(segment::Info {
sequence: VarInt::ZERO,
priority: 0,
expires: None,
})?;

let mut tracks = Vec::new();
Expand Down Expand Up @@ -214,10 +210,7 @@ impl<I: AsyncRead + Send + Unpin + 'static> Media<I> {
log::info!("catalog: {}", catalog_str);

// Create a single fragment for the segment.
let mut fragment = segment.final_fragment(VarInt::ZERO)?;

// Add the segment and add the fragment.
fragment.chunk(catalog_str.into())?;
segment.write(catalog_str.into())?;

Ok(())
}
Expand Down Expand Up @@ -265,7 +258,7 @@ struct Track {
track: track::Publisher,

// The current segment
current: Option<fragment::Publisher>,
current: Option<segment::Publisher>,

// The number of units per second.
timescale: u64,
Expand All @@ -288,7 +281,7 @@ impl Track {
if let Some(current) = self.current.as_mut() {
if !fragment.keyframe {
// Use the existing segment
current.chunk(raw.into())?;
current.write(raw.into())?;
return Ok(());
}
}
Expand All @@ -304,33 +297,27 @@ impl Track {
.context("timestamp too large")?;

// Create a new segment.
let segment = self.track.create_segment(segment::Info {
let mut segment = self.track.create_segment(segment::Info {
sequence: VarInt::try_from(self.sequence).context("sequence too large")?,

// Newer segments are higher priority
priority: u32::MAX.checked_sub(timestamp).context("priority too large")?,

// Delete segments after 10s.
expires: Some(time::Duration::from_secs(10)),
})?;

// Create a single fragment for the segment that we will keep appending.
let mut fragment = segment.final_fragment(VarInt::ZERO)?;

self.sequence += 1;

// Insert the raw atom into the segment.
fragment.chunk(raw.into())?;
// Create a single fragment for the segment that we will keep appending.
segment.write(raw.into())?;

// Save for the next iteration
self.current = Some(fragment);
self.current = Some(segment);

Ok(())
}

pub fn data(&mut self, raw: Vec<u8>) -> anyhow::Result<()> {
let fragment = self.current.as_mut().context("missing current fragment")?;
fragment.chunk(raw.into())?;
let segment = self.current.as_mut().context("missing current fragment")?;
segment.write(raw.into())?;

Ok(())
}
Expand Down
12 changes: 0 additions & 12 deletions moq-relay/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,4 @@ impl moq_transport::MoqError for RelayError {
Self::WebTransportServer(_) => 500,
}
}

fn reason(&self) -> String {
match self {
Self::Transport(err) => format!("transport error: {}", err.reason()),
Self::Cache(err) => format!("cache error: {}", err.reason()),
Self::MoqApi(err) => format!("api error: {}", err),
Self::Url(err) => format!("url error: {}", err),
Self::MissingNode => "missing node".to_owned(),
Self::WebTransportServer(err) => format!("upstream server error: {}", err),
Self::WebTransportClient(err) => format!("upstream client error: {}", err),
}
}
}
4 changes: 2 additions & 2 deletions moq-relay/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@ impl Session {
match role {
Role::Publisher => {
if let Err(err) = self.serve_publisher(id, request, &path).await {
log::warn!("error serving publisher: id={} path={} err={:#?}", id, path, err);
log::warn!("error serving publisher: id={} path={} err={}", id, path, err);
}
}
Role::Subscriber => {
if let Err(err) = self.serve_subscriber(id, request, &path).await {
log::warn!("error serving subscriber: id={} path={} err={:#?}", id, path, err);
log::warn!("error serving subscriber: id={} path={} err={}", id, path, err);
}
}
Role::Both => {
Expand Down
2 changes: 0 additions & 2 deletions moq-transport/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@ bytes = "1"
thiserror = "1"
tokio = { version = "1", features = ["macros", "io-util", "sync"] }
log = "0.4"
indexmap = "2"

quinn = "0.10"
webtransport-quinn = "0.6.1"
#webtransport-quinn = { path = "../../webtransport-rs/webtransport-quinn" }

async-trait = "0.1"
paste = "1"

[dev-dependencies]
# QUIC
Expand Down
18 changes: 6 additions & 12 deletions moq-transport/src/cache/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub enum CacheError {
#[error("closed")]
Closed,

/// An ANNOUNCE_RESET or SUBSCRIBE_RESET was sent by the publisher.
/// A SUBSCRIBE_DONE or ANNOUNCE_CANCEL was received.
#[error("reset code={0:?}")]
Reset(u32),

Expand All @@ -24,6 +24,10 @@ pub enum CacheError {
/// A resource already exists with that ID.
#[error("duplicate")]
Duplicate,

/// We reported the wrong size for a fragment.
#[error("wrong size")]
WrongSize,
}

impl MoqError for CacheError {
Expand All @@ -35,17 +39,7 @@ impl MoqError for CacheError {
Self::Stop => 206,
Self::NotFound => 404,
Self::Duplicate => 409,
}
}

/// A reason that is sent over the wire.
fn reason(&self) -> String {
match self {
Self::Closed => "closed".to_owned(),
Self::Reset(code) => format!("reset code: {}", code),
Self::Stop => "stop".to_owned(),
Self::NotFound => "not found".to_owned(),
Self::Duplicate => "duplicate".to_owned(),
Self::WrongSize => 500,
}
}
}
30 changes: 25 additions & 5 deletions moq-transport/src/cache/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,8 @@ pub struct Info {
// NOTE: These may be received out of order or with gaps.
pub sequence: VarInt,

// The size of the fragment, optionally None if this is the last fragment in a segment.
// TODO enforce this size.
pub size: Option<usize>,
// The size of the fragment.
pub size: usize,
}

struct State {
Expand Down Expand Up @@ -79,27 +78,48 @@ pub struct Publisher {
// Immutable segment state.
info: Arc<Info>,

// The amount of promised data that has yet to be written.
remain: usize,

// Closes the segment when all Publishers are dropped.
_dropped: Arc<Dropped>,
}

impl Publisher {
fn new(state: Watch<State>, info: Arc<Info>) -> Self {
let _dropped = Arc::new(Dropped::new(state.clone()));
Self { state, info, _dropped }
let remain = info.size;

Self {
state,
info,
remain,
_dropped,
}
}

/// Write a new chunk of bytes.
pub fn chunk(&mut self, chunk: Bytes) -> Result<(), CacheError> {
if chunk.len() > self.remain {
return Err(CacheError::WrongSize);
}
self.remain -= chunk.len();

let mut state = self.state.lock_mut();
state.closed.clone()?;
state.chunks.push(chunk);

Ok(())
}

/// Close the segment with an error.
pub fn close(self, err: CacheError) -> Result<(), CacheError> {
self.state.lock_mut().close(err)
self.state.lock_mut().close(err)?;
if self.remain != 0 {
Err(CacheError::WrongSize)
} else {
Ok(())
}
}
}

Expand Down
Loading

0 comments on commit 0d9c0ed

Please sign in to comment.