Skip to content

Commit

Permalink
Gstreamer support (#239)
Browse files Browse the repository at this point in the history
* WIP

* WIP

* ported to task

* Works on my machine (c) (r)

* adds gstreamer + glib to CI/CD

* some cleanup

* chatgpt imaginary package

* Also install gstreamer with macos

* Continuing for windows

* Clippy

* clippy

* added pkg-config on windows

* Trying to fix windows

* Adding dependency deprecation

* [CHORE] (cu_aligner) bump circular_buffer from 0.1.9 to 1.0.0 (#238)

* [FIX] (CI) rust components of weekly

* Added Readme

* Moar Windows

* missing subdir bin

* use a plugin to install gstreamer

* remove the id, maybe that' s the issue?

* This was a doc bug on their part

* maybe it is not needed on macos?

* try 22.04

* ignore the test that needs HW

* try to split arm64 for macos from x86 for the rest

* clashing ids

* Adds debug info for macos

* try to force the path for macOS

---------

Co-authored-by: Yang Zhou <[email protected]>
  • Loading branch information
gbin and makeecat authored Feb 6, 2025
1 parent 651ea51 commit 083de5a
Show file tree
Hide file tree
Showing 10 changed files with 409 additions and 4 deletions.
64 changes: 61 additions & 3 deletions .github/workflows/general.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@ jobs:

runs-on: ${{ matrix.os }}

env:
DYLD_LIBRARY_PATH: /Library/Frameworks/GStreamer.framework/Versions/Current/lib:$DYLD_LIBRARY_PATH

strategy:
matrix:
os: [ ubuntu-latest, macos-latest, windows-latest ]
os: [ ubuntu-22.04, macos-latest, windows-latest ]
mode: [ debug, release, cuda-release ]

steps:
Expand Down Expand Up @@ -53,16 +56,71 @@ jobs:

- name: Install dependencies (Linux)
if: runner.os == 'Linux'
run: sudo apt-get update && sudo apt-get install -y libudev-dev libpcap-dev
run: sudo apt-get update && sudo apt-get install -y pkg-config libudev-dev libpcap-dev libglib2.0-dev

- name: Install dependencies (Windows)
if: runner.os == 'Windows'
run: |
# Verify winget availability
if (!(Get-Command winget -ErrorAction SilentlyContinue)) {
Write-Host "winget is not installed or available."
exit 1
}
# Install PCAP SDK
Invoke-WebRequest -Uri https://npcap.com/dist/npcap-sdk-1.13.zip -OutFile npcap-sdk.zip
Expand-Archive -Path npcap-sdk.zip -DestinationPath $env:USERPROFILE\npcap-sdk
Remove-Item npcap-sdk.zip
# Install Win10Pcap (if winget is available)
winget install DaiyuuNobori.Win10Pcap --accept-source-agreements --accept-package-agreements
Add-Content -Path $env:GITHUB_ENV -Value "LIB=$env:USERPROFILE\npcap-sdk\Lib\x64"
# Set PCAP library path
echo "LIB=$env:USERPROFILE\npcap-sdk\Lib\x64" | Out-File -Append -Encoding utf8 $env:GITHUB_ENV
- name: Setup GStreamer (x86_64)
if: runner.os != 'MacOs'
id: setup_gstreamer_mac
uses: blinemedical/[email protected]
with:
arch: 'x86_64'

- name: Setup GStreamer
if: runner.os == 'MacOS'
id: setup_gstreamer_other
uses: blinemedical/[email protected]
with:
arch: 'arm64'

- name: Debug GStreamer Installation
if: runner.os == 'macOS'
run: |
echo "Checking GStreamer version..."
pkg-config --modversion gstreamer-1.0 || { echo "GStreamer not found!"; }
echo "Checking GStreamer library path..."
if [[ -d "/Library/Frameworks/GStreamer.framework" ]]; then
echo "GStreamer is installed at /Library/Frameworks/GStreamer.framework"
else
echo "GStreamer framework is missing!"
fi
echo "Checking GStreamer shared libraries..."
ls -l /Library/Frameworks/GStreamer.framework/Versions/Current/lib/ | grep libgstapp || { echo "libgstapp missing!"; }
echo "Checking PKG_CONFIG_PATH..."
echo "PKG_CONFIG_PATH=$PKG_CONFIG_PATH"
echo "Checking DYLD_LIBRARY_PATH..."
echo "DYLD_LIBRARY_PATH=$DYLD_LIBRARY_PATH"
echo "Checking linked libraries for cu_gstreamer..."
if ls target/debug/deps/cu_gstreamer-* 1> /dev/null 2>&1; then
otool -L target/debug/deps/cu_gstreamer-* | grep libgstapp || { echo "libgstapp is not linked!"; }
else
echo "cu_gstreamer binary not found, skipping otool check."
fi
- name: Install CUDA
uses: Jimver/cuda-toolkit@master
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ members = [
"examples/cu_multisources",
"examples/cu_pointclouds",
"examples/cu_rp_balancebot",
"examples/cu_standalone_structlog",
"examples/cu_standalone_structlog", "components/sources/cu_gstreamer",
]

# put only the core crates here that are not platform specific
Expand Down
3 changes: 3 additions & 0 deletions components/payloads/cu_sensor_payloads/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ uom = { workspace = true }
derive_more = { workspace = true }
image = { version = "0.25.5", optional = true }
kornia = { version = "0.1.8", optional = true }
gstreamer = { version = "0.23.4", optional = true }
gstreamer-app = { version = "0.23.4", optional = true }

[features]
image = ["dep:image"]
kornia = ["dep:kornia"]
gst = ["dep:gstreamer", "dep:gstreamer-app"]
26 changes: 26 additions & 0 deletions components/sources/cu_gstreamer/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
[package]
name = "cu-gstreamer"
description = "This is a Copper GStreamer sink."

version.workspace = true
authors.workspace = true
edition.workspace = true
license.workspace = true
keywords.workspace = true
categories.workspace = true
homepage.workspace = true
repository.workspace = true

[dependencies]
cu29 = { workspace = true }
bincode = { workspace = true }
circular-buffer = "1.0.0"
gstreamer = "0.23.4"
gstreamer-app = "0.23.4"

[dev-dependencies]
rerun = { workspace = true }
cu29-helpers = { workspace = true }

[build-dependencies]
cfg_aliases = "0.2.1"
32 changes: 32 additions & 0 deletions components/sources/cu_gstreamer/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# GStreamer Appsink Source for Copper

## Compatibility

It should be broad, any buffer you can send to a gstreamer Sink should be able to be handled on the Copper Side.

## Usage

Add the driver like any other source in Copper:

```RON
tasks: [
(
id: "src",
type: "cu_gstreamer::CuDefaultGStreamer", // the default is a pool of 8 images, you can define your own type to change this pool size.
config: { // a gstreamer pipeline example that takes a mjpeg webcam and converts it to NV12. IMPORTANT: the appsink has to be named "copper"
"pipeline": "v4l2src device=/dev/video0 ! image/jpeg, width=1920, height=1080 ! jpegdec ! videoconvert ! video/x-raw, format=NV12 ! appsink name=copper",
"caps": "video/x-raw, format=NV12, width=1920, height=1080", // this is what the Copper source will accept, here we just match the pipeline we define.
},
),
],
```

When you connect this driver to the rest of the system you need to use the `cu_gstreamer::CuGstBuffer` message type.

```RON
cnx: [
(src: "src", dst: "dst", msg: "cu_gstreamer::CuGstBuffer"),
],
```

You can see a small test application in the `tests/` folder on github.
11 changes: 11 additions & 0 deletions components/sources/cu_gstreamer/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
use cfg_aliases::cfg_aliases;
fn main() {
println!(
"cargo:rustc-env=LOG_INDEX_DIR={}",
std::env::var("OUT_DIR").unwrap()
);
cfg_aliases! {
hardware: { all(target_os = "linux", not(feature = "mock")) },
mock: { any(not(target_os = "linux"), feature = "mock") },
}
}
167 changes: 167 additions & 0 deletions components/sources/cu_gstreamer/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
use cu29::prelude::*;
use gstreamer::prelude::*;

use bincode::de::Decoder;
use bincode::enc::Encoder;
use bincode::error::{DecodeError, EncodeError};
use bincode::{Decode, Encode};
use circular_buffer::CircularBuffer;
use gstreamer::{parse, Buffer, BufferRef, Caps, FlowSuccess, Pipeline};
use gstreamer_app::{AppSink, AppSinkCallbacks};
use std::fmt::Debug;
use std::ops::{Deref, DerefMut};
use std::str::FromStr;
use std::sync::{Arc, Mutex};

#[derive(Debug, Clone, Default)]
pub struct CuGstBuffer(Buffer);

impl Deref for CuGstBuffer {
type Target = Buffer;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl DerefMut for CuGstBuffer {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}

impl Decode for CuGstBuffer {
fn decode<D: Decoder>(decoder: &mut D) -> Result<Self, DecodeError> {
let vec: Vec<u8> = Vec::decode(decoder)?;
let buffer = Buffer::from_slice(vec);
Ok(CuGstBuffer(buffer))
}
}

impl Encode for CuGstBuffer {
fn encode<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
self.0
.as_ref()
.map_readable()
.map_err(|_| EncodeError::Other("Could not map readable"))?
.encode(encoder)
}
}

pub type CuDefaultGStreamer = CuGStreamer<8>;

pub struct CuGStreamer<const N: usize> {
pipeline: Pipeline,
circular_buffer: Arc<Mutex<CircularBuffer<N, CuGstBuffer>>>,
_appsink: AppSink,
}

impl<const N: usize> Freezable for CuGStreamer<N> {}

impl<'cl, const N: usize> CuSrcTask<'cl> for CuGStreamer<N> {
type Output = output_msg!('cl, CuGstBuffer);

fn new(config: Option<&ComponentConfig>) -> CuResult<Self>
where
Self: Sized,
{
if !gstreamer::INITIALIZED.load(std::sync::atomic::Ordering::SeqCst) {
gstreamer::init()
.map_err(|e| CuError::new_with_cause("Failed to initialize gstreamer.", e))?;
} else {
debug!("Gstreamer already initialized.");
}

let config = config.ok_or_else(|| CuError::from("No config provided."))?;

let pipeline = if let Some(pipeline_str) = config.get::<String>("pipeline") {
debug!("Creating with pipeline: {}", &pipeline_str);
let pipeline = parse::launch(pipeline_str.as_str())
.map_err(|e| CuError::new_with_cause("Failed to parse pipeline.", e))?;
Ok(pipeline)
} else {
Err(CuError::from("No pipeline provided."))
}?;
let caps_str = if let Some(caps_str) = config.get::<String>("caps") {
debug!("Creating with caps: {}", &caps_str);
Ok(caps_str)
} else {
Err(CuError::from(
"No Caps (ie format for example \"video/x-raw, format=NV12, width=1920, height=1080\") provided for the appsink element.",
))
}?;

let pipeline = pipeline
.dynamic_cast::<Pipeline>()
.map_err(|_| CuError::from("Failed to cast pipeline to gstreamer::Pipeline."))?;

let appsink = pipeline.by_name("copper").ok_or::<CuError>("Failed to get find the \"appsink\" element in the pipeline string, be sure you have an appsink name=copper to feed this task.".into())?;
let appsink = appsink
.dynamic_cast::<AppSink>()
.map_err(|_| CuError::from("Failed to cast appsink to gstreamer::AppSink."))?;
let caps = Caps::from_str(caps_str.as_str())
.map_err(|e| CuError::new_with_cause("Failed to create caps for appsink.", e))?;

appsink.set_caps(Some(&caps));

let circular_buffer = Arc::new(Mutex::new(CircularBuffer::new()));

// Configure `appsink` to handle incoming buffers
appsink.set_callbacks(
AppSinkCallbacks::builder()
.new_sample({
let circular_buffer = circular_buffer.clone();
move |appsink| {
let sample = appsink
.pull_sample()
.map_err(|_| gstreamer::FlowError::Eos)?;
let buffer: &BufferRef =
sample.buffer().ok_or(gstreamer::FlowError::Error)?;
circular_buffer
.lock()
.unwrap()
.push_back(CuGstBuffer(buffer.to_owned()));

Ok(FlowSuccess::Ok)
}
})
.build(),
);

let s = CuGStreamer {
pipeline,
circular_buffer,
_appsink: appsink,
};
Ok(s)
}

fn start(&mut self, _clock: &RobotClock) -> CuResult<()> {
self.circular_buffer.lock().unwrap().clear();
self.pipeline
.set_state(gstreamer::State::Playing)
.map_err(|e| CuError::new_with_cause("Failed to start the gstreamer pipeline.", e))?;
Ok(())
}

fn process(&mut self, _clock: &RobotClock, new_msg: Self::Output) -> CuResult<()> {
let mut circular_buffer = self.circular_buffer.lock().unwrap();
if let Some(buffer) = circular_buffer.pop_front() {
// TODO: timing metadata
new_msg.set_payload(buffer);
} else {
debug!("Empty circular buffer.");
}
Ok(())
}

fn stop(&mut self, _clock: &RobotClock) -> CuResult<()> {
self.pipeline
.set_state(gstreamer::State::Null)
.map_err(|e| CuError::new_with_cause("Failed to stop the gstreamer pipeline.", e))?;
self.circular_buffer.lock().unwrap().clear();
Ok(())
}
}

// No test here, see the integration tests.
21 changes: 21 additions & 0 deletions components/sources/cu_gstreamer/tests/copperconfig.ron
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
(
tasks: [
(
id: "src",
type: "cu_gstreamer::CuDefaultGStreamer",
config: { // my webcam produces mjpeg, this is just to emulate a more embedded format like NV12
"pipeline": "v4l2src device=/dev/video0 ! image/jpeg, width=1920, height=1080 ! jpegdec ! videoconvert ! video/x-raw, format=NV12 ! appsink name=copper",
"caps": "video/x-raw, format=NV12, width=1920, height=1080",
},
),
( id: "tester",
type: "GStreamerTester"
),
],
cnx: [
(src: "src", dst: "tester", msg: "cu_gstreamer::CuGstBuffer"),
],
logging: (
enable_task_logging: false
)
)
Loading

0 comments on commit 083de5a

Please sign in to comment.