Skip to content

Commit

Permalink
Add Support for connection and subscription headers
Browse files Browse the repository at this point in the history
  • Loading branch information
snaggen committed Oct 29, 2024
1 parent 0161345 commit 2929b31
Show file tree
Hide file tree
Showing 9 changed files with 378 additions and 107 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ futures = "0.3"
tokio = { version = "1", features = ["net"] }
tokio-util = { version = "0.7", features = ["codec"] }
winnow = "0.6"
typed-builder = "0.20.0"

[dev-dependencies]
tokio = { version = "1", features = ["time", "macros", "rt-multi-thread"] }
59 changes: 41 additions & 18 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,43 +9,66 @@ Sending a message to a queue.

```rust
use futures::prelude::*;
use async_stomp::client;
use async_stomp::client::Connector;
use async_stomp::ToServer;

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
let mut conn = client::connect("127.0.0.1:61613", None, None).await.unwrap();

conn.send(
ToServer::Send {
let mut conn = Connector::builder()
.server("127.0.0.1:61613")
.virtualhost("/")
.login("guest".to_string())
.passcode("guest".to_string())
.connect()
.await
.unwrap();

conn.send(
ToServer::Send {
destination: "queue.test".into(),
transaction: None,
headers: vec!(),
headers: None,
body: Some(b"Hello there rustaceans!".to_vec()),
}
.into(),
)
.await.expect("sending message to server");
Ok(())
}
.into(),
)
.await
.expect("sending message to server");
Ok(())
}
```

Receiving a message from a queue.

```rust
use futures::prelude::*;
use async_stomp::client;
use async_stomp::client::Connector;
use async_stomp::client::Subscriber;
use async_stomp::FromServer;

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
let mut conn = client::connect("127.0.0.1:61613", None, None).await.unwrap();
conn.send(client::subscribe("queue.test", "custom-subscriber-id")).await.unwrap();
let mut conn = Connector::builder()
.server("127.0.0.1:61613")
.virtualhost("/")
.login("guest".to_string())
.passcode("guest".to_string())
.connect()
.await
.unwrap();

let subscribe = Subscriber::builder()
.destination("queue.test")
.id("custom-subscriber-id")
.subscribe();

conn.send(subscribe)
.await
.unwrap();

while let Some(item) = conn.next().await {
if let FromServer::Message { message_id,body, .. } = item.unwrap().content {
println!("{:?}", body);
println!("{}", message_id);
if let FromServer::Message { message_id, body, .. } = item.unwrap().content {
println!("{:?}", body);
println!("{}", message_id);
}
}
Ok(())
Expand Down
23 changes: 14 additions & 9 deletions examples/connect.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::time::Duration;

use async_stomp::*;
use client::{Connector, Subscriber};
use futures::future::ok;
use futures::prelude::*;
use async_stomp::*;

// The example connects to a local server, then sends the following messages -
// subscribe to a destination, send a message to the destination, unsubscribe and disconnect
Expand All @@ -14,20 +15,24 @@ use async_stomp::*;

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
let conn = client::connect(
"127.0.0.1:61613",
"/".to_string(),
"guest".to_string().into(),
"guest".to_string().into(),
)
.await?;
let conn = Connector::builder()
.server("127.0.0.1:61613")
.virtualhost("/")
.login("guest".to_string())
.passcode("guest".to_string())
.connect()
.await?;

tokio::time::sleep(Duration::from_millis(200)).await;

let (mut sink, stream) = conn.split();

let fut1 = async move {
sink.send(client::subscribe("rusty", "myid")).await?;
let subscribe = Subscriber::builder()
.destination("rusty")
.id("myid")
.subscribe();
sink.send(subscribe).await?;
println!("Subscribe sent");

tokio::time::sleep(Duration::from_millis(200)).await;
Expand Down
26 changes: 16 additions & 10 deletions examples/ping_pong.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::time::Duration;

use futures::prelude::*;
use async_stomp::*;
use client::{Connector, Subscriber};
use futures::prelude::*;

// This examples consists of two futures, each of which connects to a local server,
// and then sends either PING or PONG messages to the server while listening
Expand All @@ -11,14 +12,19 @@ use async_stomp::*;
// `docker run -p 61613:61613 rmohr/activemq:latest`

async fn client(listens: &str, sends: &str, msg: &[u8]) -> Result<(), anyhow::Error> {
let mut conn = client::connect(
"127.0.0.1:61613",
"/".to_string(),
"guest".to_string().into(),
"guest".to_string().into(),
)
.await?;
conn.send(client::subscribe(listens, "myid")).await?;
let mut conn = Connector::builder()
.server("127.0.0.1:61613")
.virtualhost("/")
.login("guest".to_string())
.passcode("guest".to_string())
.connect()
.await?;

let subscribe = Subscriber::builder()
.destination(listens)
.id("myid")
.subscribe();
conn.send(subscribe).await?;

loop {
conn.send(
Expand All @@ -33,7 +39,7 @@ async fn client(listens: &str, sends: &str, msg: &[u8]) -> Result<(), anyhow::Er
.await?;
let msg = conn.next().await.transpose()?;
if let Some(FromServer::Message { body, .. }) = msg.as_ref().map(|m| &m.content) {
println!("{}", String::from_utf8_lossy(&body.as_ref().unwrap()));
println!("{}", String::from_utf8_lossy(body.as_ref().unwrap()));
} else {
anyhow::bail!("Unexpected: {:?}", msg)
}
Expand Down
28 changes: 16 additions & 12 deletions examples/receive_message.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,30 @@
use futures::prelude::*;
use async_stomp::client;
use async_stomp::client::Connector;
use async_stomp::client::Subscriber;
use async_stomp::FromServer;
use futures::prelude::*;

// You can start a simple STOMP server with docker:
// `docker run -p 61613:61613 -p 8161:8161 rmohr/activemq:latest`
// activemq web interface starts at: http://localhost:8161

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
let mut conn = client::connect(
"127.0.0.1:61613",
"/".to_string(),
"guest".to_string().into(),
"guest".to_string().into(),
)
.await
.unwrap();

conn.send(client::subscribe("queue.test", "custom-subscriber-id"))
let mut conn = Connector::builder()
.server("127.0.0.1:61613")
.virtualhost("/")
.login("guest".to_string())
.passcode("guest".to_string())
.connect()
.await
.unwrap();

let subscribe = Subscriber::builder()
.destination("queue.test")
.id("custom-subscriber-id")
.subscribe();

conn.send(subscribe).await.unwrap();

while let Some(item) = conn.next().await {
if let FromServer::Message {
message_id, body, ..
Expand Down
20 changes: 10 additions & 10 deletions examples/send_message.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
use futures::prelude::*;
use async_stomp::client;
use async_stomp::client::Connector;
use async_stomp::ToServer;
use futures::prelude::*;

// You can start a simple STOMP server with docker:
// `docker run -p 61613:61613 -p 8161:8161 rmohr/activemq:latest`
// activemq web interface starts at: http://localhost:8161

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
let mut conn = client::connect(
"127.0.0.1:61613",
"/".to_string(),
"guest".to_string().into(),
"guest".to_string().into(),
)
.await
.unwrap();
let mut conn = Connector::builder()
.server("127.0.0.1:61613")
.virtualhost("/")
.login("guest".to_string())
.passcode("guest".to_string())
.connect()
.await
.unwrap();

conn.send(
ToServer::Send {
Expand Down
Loading

0 comments on commit 2929b31

Please sign in to comment.