Skip to content

Commit

Permalink
Move publishing to a module (#26)
Browse files Browse the repository at this point in the history
This change puts all the publishing machinery into a `publish` module,
and includes this module if the corresponding feature is enabled (which
it is by default).

This will help to organize the crate better, especially as the consumer
half of the API will be added in the coming weeks.

Along with the module move, the `Message` type was renamed to
`EncodableMessage`, to make it clear that this is for encoding. A
corresponding DecodableMessage trait will be included with the consumer
API, using a separate trait to enable encoding borrowed types and
decoding into owned types.
  • Loading branch information
rnarubin authored Mar 10, 2021
1 parent 17ea5ff commit 8f9377e
Show file tree
Hide file tree
Showing 13 changed files with 263 additions and 236 deletions.
7 changes: 6 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,18 @@ maintenance = { status = "actively-developed" }

[features]
default = ["sink"]

# Whether publishing is enabled
publish = []

# Publishers
google = ["base64", "yup-oauth2", "hyper", "http", "serde_json", "serde", "serde/derive", "uuid/serde"]

# Validators
json-schema = ["valico", "serde_json", "serde"]
protobuf = ["prost"]

sink = ["futures-util/sink", "either"]
sink = ["futures-util/sink", "either", "publish"]

[[example]]
name = "publish"
Expand Down
9 changes: 6 additions & 3 deletions examples/publish.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use futures_util::stream::StreamExt;
use hedwig::{publishers::GooglePubSubPublisher, Headers, Message, Publisher};
use hedwig::{
publish::{EncodableMessage, GooglePubSubPublisher, Publisher},
Headers,
};
use std::{env, sync::Arc, time::SystemTime};

#[derive(serde::Serialize)]
Expand All @@ -9,7 +12,7 @@ struct UserCreatedMessage {
user_id: String,
}

impl<'a> Message for &'a UserCreatedMessage {
impl<'a> EncodableMessage for &'a UserCreatedMessage {
type Error = hedwig::validators::JsonSchemaValidatorError;
type Validator = hedwig::validators::JsonSchemaValidator;
fn topic(&self) -> &'static str {
Expand Down Expand Up @@ -91,7 +94,7 @@ async fn run() -> Result<(), Box<dyn std::error::Error + 'static>> {
uuid: uuid::Uuid::new_v4(),
user_id: "U_123".into(),
};
let topic = Message::topic(&&message);
let topic = EncodableMessage::topic(&&message);
let validated = message.encode(&validator).unwrap();
let mut publish = publisher.publish(topic, [validated].iter());
while let Some(r) = publish.next().await {
Expand Down
204 changes: 9 additions & 195 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
//! user_id: String,
//! }
//!
//! impl<'a> hedwig::Message for &'a UserCreatedMessage {
//! impl<'a> hedwig::publish::EncodableMessage for &'a UserCreatedMessage {
//! type Error = hedwig::validators::JsonSchemaValidatorError;
//! type Validator = hedwig::validators::JsonSchemaValidator;
//! fn topic(&self) -> hedwig::Topic { "user.created" }
Expand All @@ -72,12 +72,12 @@
//! }
//!
//! let publisher = /* Some publisher */
//! # hedwig::publishers::NullPublisher;
//! # hedwig::publish::NullPublisher;
//! let validator = hedwig::validators::JsonSchemaValidator::new(schema)?;
//! let mut batch = hedwig::PublishBatch::new();
//! let mut batch = hedwig::publish::PublishBatch::new();
//! batch.message(&validator, &UserCreatedMessage { user_id: String::from("U_123") });
//! let mut result_stream = batch.publish(&publisher);
//! let mut next_batch = hedwig::PublishBatch::new();
//! let mut next_batch = hedwig::publish::PublishBatch::new();
//! async {
//! while let Some(result) = result_stream.next().await {
//! match result {
Expand Down Expand Up @@ -105,29 +105,18 @@
#![cfg_attr(not(test), deny(unused))]
#![cfg_attr(docsrs, feature(doc_cfg))]

use std::{
collections::BTreeMap,
pin::Pin,
task::{Context, Poll},
time::SystemTime,
};
use std::{collections::BTreeMap, time::SystemTime};

use futures_util::{
ready,
stream::{self, Stream},
};
use pin_project::pin_project;
use uuid::Uuid;

pub mod publishers;
#[cfg(feature = "publish")]
#[cfg_attr(docsrs, doc(cfg(feature = "publish")))]
pub mod publish;

#[cfg(test)]
mod tests;
pub mod validators;

#[cfg(feature = "sink")]
#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
pub mod sink;

/// A message queue topic name to which messages can be published
pub type Topic = &'static str;

Expand All @@ -140,46 +129,6 @@ pub enum Error {
EncodeMessage(#[source] Box<dyn std::error::Error + Send + Sync>),
}

/// Message publishers.
///
/// Message publishers deliver a validated message to an endpoint, possibly a remote one. Message
/// publishers may also additionally validate a message for publisher-specific requirements (e.g.
/// size).
pub trait Publisher {
/// The identifier for a successfully published message.
type MessageId: 'static;

/// The error that this publisher returns when publishing of a message fails.
type MessageError: std::error::Error + Send + Sync + 'static;

/// The stream of results that the `publish` method returns.
type PublishStream: Stream<Item = Result<Self::MessageId, Self::MessageError>>;

/// Publish a batch of messages.
///
/// The output stream shall return a result for each message in `messages` slice in order.
fn publish<'a, I>(&self, topic: Topic, messages: I) -> Self::PublishStream
where
I: Iterator<Item = &'a ValidatedMessage> + DoubleEndedIterator + ExactSizeIterator;
}

/// Types that can be encoded and published.
pub trait Message {
/// The errors that can occur when calling the [`Message::encode`] method.
///
/// Will typically match the errors returned by the [`Message::Validator`].
type Error: std::error::Error + Send + Sync + 'static;

/// The validator to use for this message.
type Validator;

/// Topic into which this message shall be published.
fn topic(&self) -> Topic;

/// Encode the message payload.
fn encode(self, validator: &Self::Validator) -> Result<ValidatedMessage, Self::Error>;
}

/// Custom headers associated with a message.
pub type Headers = BTreeMap<String, String>;

Expand Down Expand Up @@ -236,138 +185,3 @@ impl ValidatedMessage {
&self.data
}
}

/// A convenience builder for publishing in batches.
#[derive(Default, Debug)]
pub struct PublishBatch {
messages: BTreeMap<Topic, Vec<ValidatedMessage>>,
}

impl PublishBatch {
/// Construct a new batch.
pub fn new() -> Self {
Self::default()
}

/// Number of messages currently queued.
pub fn len(&self) -> usize {
self.messages.iter().fold(0, |acc, (_, v)| acc + v.len())
}

/// Whether the batch is empty.
pub fn is_empty(&self) -> bool {
self.messages.iter().all(|(_, v)| v.is_empty())
}

/// Add an already validated message to be published in this batch.
pub fn push(&mut self, topic: Topic, validated: ValidatedMessage) -> &mut Self {
self.messages.entry(topic).or_default().push(validated);
self
}

/// Validate and add a message to be published in this batch.
pub fn message<M: Message>(
&mut self,
validator: &M::Validator,
msg: M,
) -> Result<&mut Self, Error> {
let topic = msg.topic();
let validated = msg
.encode(validator)
.map_err(|e| Error::EncodeMessage(e.into()))?;
Ok(self.push(topic, validated))
}

/// Publish all the enqueued messages, batching them for high efficiency.
///
/// The order in which messages were added to the batch and the order of messages as seen by
/// the publisher is not strictly preserved. As thus, the output stream will not preserve the
/// message ordering either.
///
/// Some kinds of errors that occur during publishing may not be transient. An example of such
/// an error is attempting to publish a too large message with the [`GooglePubSubPublisher`].
/// For
/// errors like these retrying is most likely incorrect as they would just fail again.
/// Publisher-specific error types may have methods to make a decision easier.
///
/// [`GooglePubSubPublisher`]: publishers::GooglePubSubPublisher
pub fn publish<P>(self, publisher: &P) -> PublishBatchStream<P::PublishStream>
where
P: Publisher,
P::PublishStream: Unpin,
{
PublishBatchStream(
self.messages
.into_iter()
.map(|(topic, msgs)| TopicPublishStream::new(topic, msgs, publisher))
.collect::<stream::SelectAll<_>>(),
)
}
}

/// The stream returned by the method [`PublishBatch::publish`](PublishBatch::publish)
// This stream and TopicPublishStream are made explicit types instead of combinators like
// map/zip/etc so that callers can refer to a concrete return type instead of `impl Stream`
#[pin_project]
#[derive(Debug)]
pub struct PublishBatchStream<P>(#[pin] stream::SelectAll<TopicPublishStream<P>>);

impl<P> Stream for PublishBatchStream<P>
where
P: Stream + Unpin,
{
type Item = (P::Item, Topic, ValidatedMessage);

fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
self.project().0.poll_next(cx)
}
}

#[pin_project]
#[derive(Debug)]
struct TopicPublishStream<P> {
topic: Topic,
messages: std::vec::IntoIter<ValidatedMessage>,

#[pin]
publish_stream: P,
}

impl<P> TopicPublishStream<P> {
fn new<Pub>(topic: Topic, messages: Vec<ValidatedMessage>, publisher: &Pub) -> Self
where
Pub: Publisher<PublishStream = P>,
P: Stream<Item = Result<Pub::MessageId, Pub::MessageError>>,
{
let publish_stream = publisher.publish(topic, messages.iter());
Self {
topic,
messages: messages.into_iter(),
publish_stream,
}
}
}

impl<P> Stream for TopicPublishStream<P>
where
P: Stream,
{
type Item = (P::Item, Topic, ValidatedMessage);

fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let this = self.project();

// `map` has lifetime constraints that aren't nice here
#[allow(clippy::manual_map)]
Poll::Ready(match ready!(this.publish_stream.poll_next(cx)) {
None => None,
Some(stream_item) => Some((
stream_item,
this.topic,
this.messages
.next()
.expect("should be as many messages as publishes"),
)),
})
}
}
Loading

0 comments on commit 8f9377e

Please sign in to comment.