Skip to content

Commit

Permalink
feat: add support for message attributes
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffijoe committed Nov 20, 2023
1 parent 99cbd13 commit c6d69c2
Show file tree
Hide file tree
Showing 11 changed files with 140 additions and 25 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "deltio"
version = "0.4.0"
version = "0.5.0"
edition = "2021"
authors = ["Jeff Hansen"]
description = "A Google Cloud Pub/Sub emulator alternative for local testing and CI"
Expand Down
16 changes: 14 additions & 2 deletions src/api/parser.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use crate::api::page_token::PageToken;
use crate::paging::Paging;
use crate::pubsub_proto::push_config::AuthenticationMethod;
use crate::pubsub_proto::PushConfig as PushConfigProto;
use crate::pubsub_proto::{PubsubMessage, PushConfig as PushConfigProto};
use crate::subscriptions::{
AckDeadline, AckId, AckIdParseError, DeadlineModification, PushConfig, PushConfigOidcToken,
SubscriptionName,
};
use crate::topics::TopicName;
use crate::topics::{TopicMessage, TopicName};
use bytes::Bytes;
use std::time::Duration;
use tokio::time::Instant;
use tonic::Status;
Expand Down Expand Up @@ -139,3 +140,14 @@ pub(crate) fn parse_push_config(push_config_proto: &PushConfigProto) -> Result<P

Ok(PushConfig::new(endpoint, oidc_token, attributes))
}

/// Parses a `TopicMessage`.
pub(crate) fn parse_topic_message(message_proto: &PubsubMessage) -> TopicMessage {
let data = Bytes::from(message_proto.data.clone());
let attributes = match message_proto.attributes.len() {
0 => None,
_ => Some(message_proto.attributes.clone()),
};

TopicMessage::new(data, attributes)
}
15 changes: 6 additions & 9 deletions src/api/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@ use crate::api::parser;
use crate::pubsub_proto::publisher_server::Publisher;
use crate::pubsub_proto::*;
use crate::topics::topic_manager::TopicManager;
use crate::topics::TopicName;
use crate::topics::{
CreateTopicError, DeleteError, GetTopicError, ListSubscriptionsError, ListTopicsError,
PublishMessagesError,
};
use crate::topics::{TopicMessage, TopicName};
use crate::tracing::ActivitySpan;
use bytes::Bytes;
use std::collections::HashMap;
use std::sync::Arc;
use tonic::{Request, Response, Status};
Expand Down Expand Up @@ -85,13 +84,11 @@ impl Publisher for PublisherService {

let topic = self.get_topic_internal(&topic_name).await?;

let mut messages = Vec::with_capacity(request.messages.len());

for m in request.messages.iter() {
let data = Bytes::from(m.data.clone());
let message = TopicMessage::new(data);
messages.push(message);
}
let messages = request
.messages
.iter()
.map(parser::parse_topic_message)
.collect::<Vec<_>>();

let result = topic
.publish_messages(messages)
Expand Down
5 changes: 4 additions & 1 deletion src/api/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -606,11 +606,14 @@ fn map_to_received_message(m: &PulledMessage) -> ReceivedMessage {
message: {
let message = m.message();
Some(PubsubMessage {
attributes: Default::default(),
publish_time: Some(prost_types::Timestamp::from(message.published_at)),
ordering_key: String::default(),
message_id: message.id.to_string(),
data: message.data.to_vec(),
attributes: match &message.attributes {
Some(attrs) => attrs.clone(),
None => Default::default(),
},
})
},
}
Expand Down
4 changes: 2 additions & 2 deletions src/collections/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ mod tests {
let ids = vec![MessageId::new(1, 1), MessageId::new(1, 2)];

let iter = ids.iter().enumerate().map(|(i, id)| {
let mut m = TopicMessage::new(vec![i as u8].into());
let mut m = TopicMessage::new(vec![i as u8].into(), None);
m.publish(*id, std::time::SystemTime::now());
Arc::new(m)
});
Expand Down Expand Up @@ -100,7 +100,7 @@ mod tests {

fn new_message(data_value: u8) -> Arc<TopicMessage> {
let id = MessageId::new(1, rand::random());
let mut message = TopicMessage::new(vec![data_value].into());
let mut message = TopicMessage::new(vec![data_value].into(), None);
message.publish(id, std::time::SystemTime::now());
Arc::new(message)
}
Expand Down
2 changes: 1 addition & 1 deletion src/subscriptions/outstanding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ mod tests {

/// Helper for creating a pulled message.
fn new_pulled_message(ack_id: u64, time: u64) -> PulledMessage {
let message = Arc::new(TopicMessage::new(Bytes::from("hello")));
let message = Arc::new(TopicMessage::new(Bytes::from("hello"), None));
let ack_id = AckId::new(ack_id);
let deadline = deadline_for(time);
let delivery_attempt = 1;
Expand Down
6 changes: 5 additions & 1 deletion src/topics/topic_message.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use bytes::Bytes;
use std::collections::HashMap;
use std::fmt::{Display, Formatter};
use std::hash::Hash;
use std::time::SystemTime;

/// Represents a published message to a topic.
Expand All @@ -8,13 +10,15 @@ pub struct TopicMessage {
pub id: MessageId,
pub published_at: SystemTime,
pub data: Bytes,
pub attributes: Option<HashMap<String, String>>,
}

impl TopicMessage {
/// Creates a new `TopicMessage` from the data.
pub fn new(data: Bytes) -> Self {
pub fn new(data: Bytes, attributes: Option<HashMap<String, String>>) -> Self {
Self {
data,
attributes,
id: MessageId::default(),
published_at: SystemTime::UNIX_EPOCH,
}
Expand Down
103 changes: 101 additions & 2 deletions tests/subscriber_test.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use deltio::pubsub_proto::{
DeleteSubscriptionRequest, GetSubscriptionRequest, ListSubscriptionsRequest, PullRequest,
StreamingPullResponse,
DeleteSubscriptionRequest, GetSubscriptionRequest, ListSubscriptionsRequest, PublishRequest,
PubsubMessage, PullRequest, StreamingPullResponse,
};
use deltio::subscriptions::SubscriptionName;
use deltio::topics::TopicName;
Expand Down Expand Up @@ -248,6 +248,105 @@ async fn test_streaming_pull() {
server.dispose().await;
}

#[tokio::test]
async fn test_streaming_pull_message_attributes() {
let mut server = TestHost::start().await.unwrap();

// Create a topic to subscribe to.
let topic_name = TopicName::new("test", "topic");
server.create_topic_with_name(&topic_name).await;

// Create a subscription.
let subscription_name = SubscriptionName::new("test", "subscription");
server
.create_subscription_with_name(&topic_name, &subscription_name)
.await;

// Start polling for messages.
let (sender, mut inbound) = server.streaming_pull(&subscription_name).await;

// Publish some messages with attributes, some without.
server
.publisher
.publish(PublishRequest {
topic: topic_name.to_string(),
messages: vec![
PubsubMessage {
publish_time: None,
attributes: vec![
("Attr1".to_string(), "Value1".to_string()),
("Attr2".to_string(), "Value2".to_string()),
]
.into_iter()
.collect(),
message_id: Default::default(),
ordering_key: Default::default(),
data: "Hello".as_bytes().to_vec(),
},
PubsubMessage {
publish_time: None,
attributes: vec![("Super".to_string(), "Cool".to_string())]
.into_iter()
.collect(),
message_id: Default::default(),
ordering_key: Default::default(),
data: "World".as_bytes().to_vec(),
},
PubsubMessage {
publish_time: None,
attributes: Default::default(),
message_id: Default::default(),
ordering_key: Default::default(),
data: "No attrs".as_bytes().to_vec(),
},
],
})
.await
.unwrap();

let pull_response = inbound.next().await.unwrap().unwrap();
assert_eq!(3, pull_response.received_messages.len());

// Assert that the messages contain the expected attributes.
let message = pull_response.received_messages[0].message.clone().unwrap();
assert_eq!(
"Hello".to_string(),
String::from_utf8(message.data.clone()).unwrap()
);
assert_eq!(message.attributes.len(), 2);
assert_eq!(
Some("Value1".to_string()),
message.attributes.get("Attr1").cloned(),
);
assert_eq!(
Some("Value2".to_string()),
message.attributes.get("Attr2").cloned(),
);

let message = pull_response.received_messages[1].message.clone().unwrap();
assert_eq!(
"World".to_string(),
String::from_utf8(message.data.clone()).unwrap()
);
assert_eq!(message.attributes.len(), 1);
assert_eq!(
Some("Cool".to_string()),
message.attributes.get("Super").cloned(),
);

let message = pull_response.received_messages[2].message.clone().unwrap();
assert_eq!(
"No attrs".to_string(),
String::from_utf8(message.data.clone()).unwrap()
);
assert!(message.attributes.is_empty());

// Drop the streaming calls so the shutdown won't wait for them.
drop(sender);
drop(inbound);
server.dispose().await;
}

#[tokio::test]
async fn test_streaming_pull_deadline_extension() {
// Pause time since we will be advancing it ourselves.
Expand Down
8 changes: 4 additions & 4 deletions tests/subscription_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ async fn pulling_messages() {
// Publish messages.
topic
.publish_messages(vec![
TopicMessage::new(Bytes::from("meow")),
TopicMessage::new(Bytes::from("can haz cheezburger?")),
TopicMessage::new(Bytes::from("meow"), None),
TopicMessage::new(Bytes::from("can haz cheezburger?"), None),
])
.await
.unwrap();
Expand Down Expand Up @@ -74,8 +74,8 @@ async fn nack_messages() {
// Publish messages.
topic
.publish_messages(vec![
TopicMessage::new(Bytes::from("meow")),
TopicMessage::new(Bytes::from("can haz cheezburger?")),
TopicMessage::new(Bytes::from("meow"), None),
TopicMessage::new(Bytes::from("can haz cheezburger?"), None),
])
.await
.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion tests/topic_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ async fn delete_topic() {

// Publish a message to the topic, but don't pull the subscription yet.
topic
.publish_messages(vec![TopicMessage::new(Bytes::from("hello"))])
.publish_messages(vec![TopicMessage::new(Bytes::from("hello"), None)])
.await
.unwrap();

Expand Down

0 comments on commit c6d69c2

Please sign in to comment.