From c6d69c2cea591c576dcedf2c464c1c91ebc25032 Mon Sep 17 00:00:00 2001 From: Jeff Hansen Date: Mon, 20 Nov 2023 07:41:00 -0500 Subject: [PATCH] feat: add support for message attributes --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/api/parser.rs | 16 ++++- src/api/publisher.rs | 15 ++--- src/api/subscriber.rs | 5 +- src/collections/messages.rs | 4 +- src/subscriptions/outstanding.rs | 2 +- src/topics/topic_message.rs | 6 +- tests/subscriber_test.rs | 103 ++++++++++++++++++++++++++++++- tests/subscription_test.rs | 8 +-- tests/topic_test.rs | 2 +- 11 files changed, 140 insertions(+), 25 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 90dc47b..684e707 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -237,7 +237,7 @@ checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" [[package]] name = "deltio" -version = "0.4.0" +version = "0.5.0" dependencies = [ "async-stream", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index 87169f5..0cfb411 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltio" -version = "0.4.0" +version = "0.5.0" edition = "2021" authors = ["Jeff Hansen"] description = "A Google Cloud Pub/Sub emulator alternative for local testing and CI" diff --git a/src/api/parser.rs b/src/api/parser.rs index 5b8f815..e19ace1 100644 --- a/src/api/parser.rs +++ b/src/api/parser.rs @@ -1,12 +1,13 @@ use crate::api::page_token::PageToken; use crate::paging::Paging; use crate::pubsub_proto::push_config::AuthenticationMethod; -use crate::pubsub_proto::PushConfig as PushConfigProto; +use crate::pubsub_proto::{PubsubMessage, PushConfig as PushConfigProto}; use crate::subscriptions::{ AckDeadline, AckId, AckIdParseError, DeadlineModification, PushConfig, PushConfigOidcToken, SubscriptionName, }; -use crate::topics::TopicName; +use crate::topics::{TopicMessage, TopicName}; +use bytes::Bytes; use std::time::Duration; use tokio::time::Instant; use tonic::Status; @@ -139,3 +140,14 @@ pub(crate) fn parse_push_config(push_config_proto: &PushConfigProto) -> Result

TopicMessage { + let data = Bytes::from(message_proto.data.clone()); + let attributes = match message_proto.attributes.len() { + 0 => None, + _ => Some(message_proto.attributes.clone()), + }; + + TopicMessage::new(data, attributes) +} diff --git a/src/api/publisher.rs b/src/api/publisher.rs index 2442c3d..bd533dd 100644 --- a/src/api/publisher.rs +++ b/src/api/publisher.rs @@ -3,13 +3,12 @@ use crate::api::parser; use crate::pubsub_proto::publisher_server::Publisher; use crate::pubsub_proto::*; use crate::topics::topic_manager::TopicManager; +use crate::topics::TopicName; use crate::topics::{ CreateTopicError, DeleteError, GetTopicError, ListSubscriptionsError, ListTopicsError, PublishMessagesError, }; -use crate::topics::{TopicMessage, TopicName}; use crate::tracing::ActivitySpan; -use bytes::Bytes; use std::collections::HashMap; use std::sync::Arc; use tonic::{Request, Response, Status}; @@ -85,13 +84,11 @@ impl Publisher for PublisherService { let topic = self.get_topic_internal(&topic_name).await?; - let mut messages = Vec::with_capacity(request.messages.len()); - - for m in request.messages.iter() { - let data = Bytes::from(m.data.clone()); - let message = TopicMessage::new(data); - messages.push(message); - } + let messages = request + .messages + .iter() + .map(parser::parse_topic_message) + .collect::>(); let result = topic .publish_messages(messages) diff --git a/src/api/subscriber.rs b/src/api/subscriber.rs index ca47fae..eda3db7 100644 --- a/src/api/subscriber.rs +++ b/src/api/subscriber.rs @@ -606,11 +606,14 @@ fn map_to_received_message(m: &PulledMessage) -> ReceivedMessage { message: { let message = m.message(); Some(PubsubMessage { - attributes: Default::default(), publish_time: Some(prost_types::Timestamp::from(message.published_at)), ordering_key: String::default(), message_id: message.id.to_string(), data: message.data.to_vec(), + attributes: match &message.attributes { + Some(attrs) => attrs.clone(), + None => Default::default(), + }, }) }, } diff --git a/src/collections/messages.rs b/src/collections/messages.rs index d0abf67..26a92bd 100644 --- a/src/collections/messages.rs +++ b/src/collections/messages.rs @@ -61,7 +61,7 @@ mod tests { let ids = vec![MessageId::new(1, 1), MessageId::new(1, 2)]; let iter = ids.iter().enumerate().map(|(i, id)| { - let mut m = TopicMessage::new(vec![i as u8].into()); + let mut m = TopicMessage::new(vec![i as u8].into(), None); m.publish(*id, std::time::SystemTime::now()); Arc::new(m) }); @@ -100,7 +100,7 @@ mod tests { fn new_message(data_value: u8) -> Arc { let id = MessageId::new(1, rand::random()); - let mut message = TopicMessage::new(vec![data_value].into()); + let mut message = TopicMessage::new(vec![data_value].into(), None); message.publish(id, std::time::SystemTime::now()); Arc::new(message) } diff --git a/src/subscriptions/outstanding.rs b/src/subscriptions/outstanding.rs index 55f0ff5..a2d3504 100644 --- a/src/subscriptions/outstanding.rs +++ b/src/subscriptions/outstanding.rs @@ -333,7 +333,7 @@ mod tests { /// Helper for creating a pulled message. fn new_pulled_message(ack_id: u64, time: u64) -> PulledMessage { - let message = Arc::new(TopicMessage::new(Bytes::from("hello"))); + let message = Arc::new(TopicMessage::new(Bytes::from("hello"), None)); let ack_id = AckId::new(ack_id); let deadline = deadline_for(time); let delivery_attempt = 1; diff --git a/src/topics/topic_message.rs b/src/topics/topic_message.rs index 83ee7ac..3fd7651 100644 --- a/src/topics/topic_message.rs +++ b/src/topics/topic_message.rs @@ -1,5 +1,7 @@ use bytes::Bytes; +use std::collections::HashMap; use std::fmt::{Display, Formatter}; +use std::hash::Hash; use std::time::SystemTime; /// Represents a published message to a topic. @@ -8,13 +10,15 @@ pub struct TopicMessage { pub id: MessageId, pub published_at: SystemTime, pub data: Bytes, + pub attributes: Option>, } impl TopicMessage { /// Creates a new `TopicMessage` from the data. - pub fn new(data: Bytes) -> Self { + pub fn new(data: Bytes, attributes: Option>) -> Self { Self { data, + attributes, id: MessageId::default(), published_at: SystemTime::UNIX_EPOCH, } diff --git a/tests/subscriber_test.rs b/tests/subscriber_test.rs index 0d22dd1..6be7558 100644 --- a/tests/subscriber_test.rs +++ b/tests/subscriber_test.rs @@ -1,6 +1,6 @@ use deltio::pubsub_proto::{ - DeleteSubscriptionRequest, GetSubscriptionRequest, ListSubscriptionsRequest, PullRequest, - StreamingPullResponse, + DeleteSubscriptionRequest, GetSubscriptionRequest, ListSubscriptionsRequest, PublishRequest, + PubsubMessage, PullRequest, StreamingPullResponse, }; use deltio::subscriptions::SubscriptionName; use deltio::topics::TopicName; @@ -248,6 +248,105 @@ async fn test_streaming_pull() { server.dispose().await; } +#[tokio::test] +async fn test_streaming_pull_message_attributes() { + let mut server = TestHost::start().await.unwrap(); + + // Create a topic to subscribe to. + let topic_name = TopicName::new("test", "topic"); + server.create_topic_with_name(&topic_name).await; + + // Create a subscription. + let subscription_name = SubscriptionName::new("test", "subscription"); + server + .create_subscription_with_name(&topic_name, &subscription_name) + .await; + + // Start polling for messages. + let (sender, mut inbound) = server.streaming_pull(&subscription_name).await; + + // Publish some messages with attributes, some without. + server + .publisher + .publish(PublishRequest { + topic: topic_name.to_string(), + messages: vec![ + PubsubMessage { + publish_time: None, + attributes: vec![ + ("Attr1".to_string(), "Value1".to_string()), + ("Attr2".to_string(), "Value2".to_string()), + ] + .into_iter() + .collect(), + message_id: Default::default(), + ordering_key: Default::default(), + data: "Hello".as_bytes().to_vec(), + }, + PubsubMessage { + publish_time: None, + attributes: vec![("Super".to_string(), "Cool".to_string())] + .into_iter() + .collect(), + message_id: Default::default(), + ordering_key: Default::default(), + data: "World".as_bytes().to_vec(), + }, + PubsubMessage { + publish_time: None, + attributes: Default::default(), + message_id: Default::default(), + ordering_key: Default::default(), + data: "No attrs".as_bytes().to_vec(), + }, + ], + }) + .await + .unwrap(); + + let pull_response = inbound.next().await.unwrap().unwrap(); + assert_eq!(3, pull_response.received_messages.len()); + + // Assert that the messages contain the expected attributes. + let message = pull_response.received_messages[0].message.clone().unwrap(); + assert_eq!( + "Hello".to_string(), + String::from_utf8(message.data.clone()).unwrap() + ); + assert_eq!(message.attributes.len(), 2); + assert_eq!( + Some("Value1".to_string()), + message.attributes.get("Attr1").cloned(), + ); + assert_eq!( + Some("Value2".to_string()), + message.attributes.get("Attr2").cloned(), + ); + + let message = pull_response.received_messages[1].message.clone().unwrap(); + assert_eq!( + "World".to_string(), + String::from_utf8(message.data.clone()).unwrap() + ); + assert_eq!(message.attributes.len(), 1); + assert_eq!( + Some("Cool".to_string()), + message.attributes.get("Super").cloned(), + ); + + let message = pull_response.received_messages[2].message.clone().unwrap(); + assert_eq!( + "No attrs".to_string(), + String::from_utf8(message.data.clone()).unwrap() + ); + assert!(message.attributes.is_empty()); + + // Drop the streaming calls so the shutdown won't wait for them. + drop(sender); + drop(inbound); + server.dispose().await; +} + #[tokio::test] async fn test_streaming_pull_deadline_extension() { // Pause time since we will be advancing it ourselves. diff --git a/tests/subscription_test.rs b/tests/subscription_test.rs index ac16af9..b0fe2b2 100644 --- a/tests/subscription_test.rs +++ b/tests/subscription_test.rs @@ -21,8 +21,8 @@ async fn pulling_messages() { // Publish messages. topic .publish_messages(vec![ - TopicMessage::new(Bytes::from("meow")), - TopicMessage::new(Bytes::from("can haz cheezburger?")), + TopicMessage::new(Bytes::from("meow"), None), + TopicMessage::new(Bytes::from("can haz cheezburger?"), None), ]) .await .unwrap(); @@ -74,8 +74,8 @@ async fn nack_messages() { // Publish messages. topic .publish_messages(vec![ - TopicMessage::new(Bytes::from("meow")), - TopicMessage::new(Bytes::from("can haz cheezburger?")), + TopicMessage::new(Bytes::from("meow"), None), + TopicMessage::new(Bytes::from("can haz cheezburger?"), None), ]) .await .unwrap(); diff --git a/tests/topic_test.rs b/tests/topic_test.rs index a7e5343..8d04f8e 100644 --- a/tests/topic_test.rs +++ b/tests/topic_test.rs @@ -25,7 +25,7 @@ async fn delete_topic() { // Publish a message to the topic, but don't pull the subscription yet. topic - .publish_messages(vec![TopicMessage::new(Bytes::from("hello"))]) + .publish_messages(vec![TopicMessage::new(Bytes::from("hello"), None)]) .await .unwrap();