Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

jms synchronous call to RabbitMQ (with spring boot context) - setting amqp = true for tmp queues #1208

Open
adamlukaszewski opened this issue Aug 30, 2024 · 9 comments

Comments

@adamlukaszewski
Copy link

adamlukaszewski commented Aug 30, 2024

Citrus Version

4.1.0

Question

How can I enable amqp for temporary created RMQDestinations (jms-temp-queues) during a synchronous citrus invocation?

What I've tried so far

Some notes before:

  • The architecture of the application is based on messaging with RabbitMQ as the broker
  • The logic is implemented with the in-out pattern (synchronous call): The consumer is putting a message into the queue and waits for a response. That's what I want to test: Ship in a message --> let the application do whatever it has to do --> receive the finished response)
  • JSON will be used as message type.

The use case seems to be very simple. I have this test case:

@CitrusSpringSupport
@ContextConfiguration(classes = {CitrusSpringConfig.class, EndpointConfig.class})
public class TodoListIT {

    @CitrusEndpoint
    private JmsEndpoint personsQueueCreate;


    @Autowired
    private CitrusSpringConfig citrusSpringConfig;

    @Test
    @CitrusTest
    void testPost(@CitrusResource TestCaseRunner test) {
        test.variable("todoName", "citrus:concat('todo_', citrus:randomNumber(4))");
        test.variable("todoDescription", "Description: ${todoName}");

        // Use the endpoint configured without a specific destination, set headers in the send action
        test.$(SendMessageAction.Builder.send()
                        .endpoint(personsQueueCreate)
//                .fork(false)
                        .message()
                        .header(MP.MIDSA_PARAM_PERSON_ID.s(), "46b85fef-02a0-4f3f-bdf1-aaaa")
                        .header(MP.MIDSA_PARAM_BIOMETRIC_INCLUDED.s(), "false")
                        .header(MP.MIDSA_PARAM_MAX_RECENT_MOVEMENTS.s(), 1)
                        .body("{ \"title\": \"${todoName}\", \"description\": \"${todoDescription}\" }")
        );

        test.$(ReceiveMessageAction.Builder.receive()
                .endpoint(personsQueueCreate)
                .message()
                .body("\"Message received\""));
    }
}

Currently I am getting some when the temporary jms consumer (waiting for the response message) is getting the message. The stack shows:

com.rabbitmq.jms.util.RMQJMSException: invalid stream header: 7B227575
org.citrusframework.exceptions.TestCaseFailedException: com.rabbitmq.jms.util.RMQJMSException: invalid stream header: 7B227575
	at org.citrusframework.DefaultTestCase.executeAction(DefaultTestCase.java:146)
	at org.citrusframework.DefaultTestCaseRunner.run(DefaultTestCaseRunner.java:129)
	at org.citrusframework.TestActionRunner.$(TestActionRunner.java:35)
	at com.midsa.border.guard.citrus.TodoListIT.testPost(TodoListIT.java:39)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727)
	at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
	at org.citrusframework.junit.jupiter.CitrusExtension.lambda$interceptTestMethod$3(CitrusExtension.java:148)
	at org.citrusframework.common.DefaultTestLoader.lambda$doLoad$1(DefaultTestLoader.java:118)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
	at org.citrusframework.common.DefaultTestLoader.doLoad(DefaultTestLoader.java:118)
	at org.citrusframework.common.DefaultTestLoader.load(DefaultTestLoader.java:95)
	at org.citrusframework.junit.jupiter.CitrusExtension.interceptTestMethod(CitrusExtension.java:156)
	at org.citrusframework.junit.jupiter.spring.CitrusSpringExtension.interceptTestMethod(CitrusSpringExtension.java:87)
	at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
	at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
	at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86)
	at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
	at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
	at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
	at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:217)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:213)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:138)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:68)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
	at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
	at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.processAllTestClasses(JUnitPlatformTestClassProcessor.java:119)
	at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.access$000(JUnitPlatformTestClassProcessor.java:94)
	at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor.stop(JUnitPlatformTestClassProcessor.java:89)
	at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.stop(SuiteTestClassProcessor.java:62)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
	at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
	at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
	at jdk.proxy1/jdk.proxy1.$Proxy2.stop(Unknown Source)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker$3.run(TestWorker.java:193)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker.executeAndMaintainThreadName(TestWorker.java:129)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:100)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:60)
	at org.gradle.process.internal.worker.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:56)
	at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:113)
	at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:65)
	at worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:69)
	at worker.org.gradle.process.internal.worker.GradleWorkerMain.main(GradleWorkerMain.java:74)
Caused by: org.citrusframework.exceptions.CitrusRuntimeException: com.rabbitmq.jms.util.RMQJMSException: invalid stream header: 7B227575
	at org.citrusframework.jms.endpoint.JmsSyncProducer.send(JmsSyncProducer.java:149)
	at org.citrusframework.actions.SendMessageAction.doExecute(SendMessageAction.java:175)
	at org.citrusframework.actions.AbstractTestAction.execute(AbstractTestAction.java:59)
	at org.citrusframework.DefaultTestCase.executeAction(DefaultTestCase.java:139)
	... 97 more
Caused by: com.rabbitmq.jms.util.RMQJMSException: invalid stream header: 7B227575
	at com.rabbitmq.jms.client.RMQMessage.fromMessage(RMQMessage.java:1087)
	at com.rabbitmq.jms.client.RMQMessage.convertJmsMessage(RMQMessage.java:841)
	at com.rabbitmq.jms.client.RMQMessage.convertMessage(RMQMessage.java:835)
	at com.rabbitmq.jms.client.RMQMessageConsumer.receive(RMQMessageConsumer.java:358)
	at com.rabbitmq.jms.client.RMQMessageConsumer.receive(RMQMessageConsumer.java:272)
	at org.citrusframework.jms.endpoint.JmsSyncProducer.send(JmsSyncProducer.java:135)
	... 100 more

Some findings:

  • After some debugging I could narrow down that the implicitly created jms queue (by citrus) waiting on the response message is not able to handle correctly the response object.

  • The application is working, also the correct json message was generated. In the debugger I was also able to convert the byte[] into a string and I have seen the json string which I have expected (I debugged the receive, when the response message was processed). --> This let me assume that the backend and calling logic is working as expected. I could also verify this in my logs and traces of the application.

  • I have also seen in the debugger, that for the temporary created queue amqp = false was set:

    image

    • Could it be that this is the issue? How can I set the amqp option for the temporary created queues to true?

Additional information

  • Since, I am working in a spring context: This is the endpoint configuration I am using:
package com.midsa.border.guard;

import com.rabbitmq.jms.admin.RMQConnectionFactory;
import com.rabbitmq.jms.admin.RMQDestination;
import jakarta.jms.ConnectionFactory;
import org.citrusframework.dsl.endpoint.CitrusEndpoints;
import org.citrusframework.jms.endpoint.JmsEndpoint;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

@Import(TodoAppAutoConfiguration.class)
@Configuration
public class EndpointConfig {


    @Bean
    public ConnectionFactory connectionFactory() {
        RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
        connectionFactory.setUsername("xxxxx");
        connectionFactory.setPassword("xxxxx");
        connectionFactory.setVirtualHost("xxxxxx");
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672); // Replace with appropriate port
        return connectionFactory;
    }


    @Bean
    public RMQDestination jmsDestination() {
        RMQDestination jmsDestination = new RMQDestination();
        jmsDestination.setAmqp(true);
        return jmsDestination;
    }


    @Bean
    public JmsEndpoint personsQueueCreate(ConnectionFactory connectionFactory,
                                          RMQDestination jmsDestination) {
        jmsDestination.setDestinationName("midsa-service-border-guard.persons.get");
        jmsDestination.setAmqpExchangeName("midsa.persons.get");
        jmsDestination.setAmqpRoutingKey("");
        jmsDestination.setAmqpQueueName("midsa-service-border-guard.persons.get");

        return CitrusEndpoints
                .jms()
//                .asynchronous()
                .synchronous()
                .connectionFactory(connectionFactory)
                .destination(jmsDestination)
                .build();
    }
}
  • Is there maybe some springboot resolver logic that allows me to set for all destinations the correct values?

Thanks a lot and I am looking forward to hear from you :).

@bbortt bbortt added State: To discuss In case there are open questions concerning the issue Type: Question Prio: Medium labels Aug 31, 2024
bbortt added a commit that referenced this issue Aug 31, 2024
@bbortt
Copy link
Collaborator

bbortt commented Aug 31, 2024

I tried adding a reproducer test (see the referenced commit), but didn't quit manage to. maybe you could help me? I have little to no knowledge about jms and/or amqp.

@adamlukaszewski
Copy link
Author

Hi Timo,

I will create a new sample or adapt yours. Since, I have some meetings today, I will provide it a little bit later. You will hear from me, for sure.

Thanks and best regards,
Adam L.

@adamlukaszewski
Copy link
Author

Hi Timo,

here we are... sorry there were some more meetings than planed... :).

I have prepared a small sample project, showing exactly the issue. It is based on spring boot, apache camel (for processing) and it gives you also a docker-compose.yaml to start the rabbitmq with some predefined exchanges and queues.

I guess that there is everything we need. That are the interesting lines:

2024-09-02T19:05:54.426+02:00  INFO 1211709 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [127.0.0.1:5672]
2024-09-02T19:05:54.492+02:00  INFO 1211709 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#19526f1d:0/SimpleConnection@615bad16 [delegate=amqp://admin@127.0.0.1:5672/, localPort=33416]
2024-09-02T19:05:54.495+02:00 DEBUG 1211709 --- [           main] o.s.amqp.rabbit.core.RabbitAdmin         : Initializing declarations
2024-09-02T19:05:54.498+02:00 DEBUG 1211709 --- [           main] o.s.amqp.rabbit.core.RabbitAdmin         : Nothing to declare
2024-09-02T19:05:54.504+02:00  INFO 1211709 --- [           main] .c.s.CamelDirectMessageListenerContainer : Container initialized for queues: [foo.queue]
2024-09-02T19:05:54.506+02:00 DEBUG 1211709 --- [           main] o.a.c.i.e.InternalRouteStartupManager    : Route: worker-route started and consuming from: spring-rabbitmq://foo
2024-09-02T19:05:54.508+02:00 DEBUG 1211709 --- [           main] o.a.c.impl.engine.AbstractCamelContext   : Clearing BeanIntrospection cache with 3 objects using during starting Camel
2024-09-02T19:05:54.508+02:00 DEBUG 1211709 --- [           main] o.a.c.impl.engine.AbstractCamelContext   : BeanIntrospection invoked 7 times during starting Camel
2024-09-02T19:05:54.510+02:00  INFO 1211709 --- [           main] o.a.c.impl.engine.AbstractCamelContext   : Routes startup (started:1)
2024-09-02T19:05:54.510+02:00  INFO 1211709 --- [           main] o.a.c.impl.engine.AbstractCamelContext   :     Started worker-route (spring-rabbitmq://foo)
2024-09-02T19:05:54.510+02:00  INFO 1211709 --- [           main] o.a.c.impl.engine.AbstractCamelContext   : Apache Camel 4.4.1 (Cdo Citrus with Rabbit JMS not working) started in 459ms (build:40ms init:271ms start:148ms)
2024-09-02T19:05:54.513+02:00 DEBUG 1211709 --- [           main] o.apache.camel.impl.DefaultCamelContext  : start() took 424 millis
2024-09-02T19:05:54.515+02:00 DEBUG 1211709 --- [a.Spring.bean-1] o.s.a.r.c.CachingConnectionFactory       : Creating cached Rabbit Channel from AMQChannel(amqp://admin@127.0.0.1:5672/,1)
2024-09-02T19:05:54.516+02:00  INFO 1211709 --- [           main] com.intellij.rt.junit.JUnitStarter       : Started JUnitStarter in 3.051 seconds (process running for 4.608)
2024-09-02T19:05:54.522+02:00  INFO 1211709 --- [           main] o.c.report.LoggingReporter               : 
2024-09-02T19:05:54.522+02:00  INFO 1211709 --- [           main] o.c.report.LoggingReporter               : BEFORE TEST SUITE: SUCCESS
2024-09-02T19:05:54.522+02:00  INFO 1211709 --- [           main] o.c.report.LoggingReporter               : ------------------------------------------------------------------------
2024-09-02T19:05:54.522+02:00  INFO 1211709 --- [           main] o.c.report.LoggingReporter               : 
2024-09-02T19:05:54.535+02:00  INFO 1211709 --- [a.Spring.bean-1] .c.s.CamelDirectMessageListenerContainer : SimpleConsumer [queue=foo.queue, index=0, consumerTag=amq.ctag-aTHX4nc6BHtUk3waWIXc9Q identity=634cad4e] started
OpenJDK 64-Bit Server VM warning: Sharing is only supported for boot loader classes because bootstrap classpath has been appended
2024-09-02T19:05:55.186+02:00  INFO 1211709 --- [           main] o.c.jms.endpoint.JmsSyncProducer         : Message was sent to JMS destination: 'foo.queue'
2024-09-02T19:05:55.201+02:00  INFO 1211709 --- [pool-2-thread-4] SampleCamelRouter:34                     : *--> [worker-route                  ] [from[spring-rabbitmq:foo?queues=foo.queue]        ] Exchange[Id: 393CEDE6064ED09-0000000000000000, Headers: {CamelSpringRabbitmqContentType=application/octet-stream, CamelSpringRabbitmqDeliveryMode=PERSISTENT, CamelSpringRabbitmqDeliveryTag=1, CamelSpringRabbitmqExchangeName=foo, CamelSpringRabbitmqPriority=4, CamelSpringRabbitmqRedelivered=false, CamelSpringRabbitmqReplyTo=jms-temp-queue-e001a063-10d1-4e1b-8990-a6a01e0c9c25, CamelSpringRabbitmqRoutingKey=, JMSDeliveryMode=PERSISTENT, JMSMessageID=ID:b0990cf7-72de-4e2b-ac65-772ebb2b1c3d, JMSPriority=4, JMSTimestamp=1725296755183, timestamp=1725296755167}, BodyType: byte[], Body: A foo description]
2024-09-02T19:05:55.202+02:00  INFO 1211709 --- [pool-2-thread-4] SampleCamelRouter:36                     :      [worker-route                  ] [log                                               ] Exchange[Id: 393CEDE6064ED09-0000000000000000, Headers: {CamelSpringRabbitmqContentType=application/octet-stream, CamelSpringRabbitmqDeliveryMode=PERSISTENT, CamelSpringRabbitmqDeliveryTag=1, CamelSpringRabbitmqExchangeName=foo, CamelSpringRabbitmqPriority=4, CamelSpringRabbitmqRedelivered=false, CamelSpringRabbitmqReplyTo=jms-temp-queue-e001a063-10d1-4e1b-8990-a6a01e0c9c25, CamelSpringRabbitmqRoutingKey=, JMSDeliveryMode=PERSISTENT, JMSMessageID=ID:b0990cf7-72de-4e2b-ac65-772ebb2b1c3d, JMSPriority=4, JMSTimestamp=1725296755183, timestamp=1725296755167}, BodyType: byte[], Body: A foo description]
2024-09-02T19:05:55.203+02:00  INFO 1211709 --- [pool-2-thread-4] worker-route                             : >>> GOT THE FOLLOWING MESSAGE FROM RABBITMQ: A foo description
2024-09-02T19:05:55.203+02:00  INFO 1211709 --- [pool-2-thread-4] SampleCamelRouter:37                     :      [worker-route                  ] [Processor@0xe795a1d                               ] Exchange[Id: 393CEDE6064ED09-0000000000000000, Headers: {CamelSpringRabbitmqContentType=application/octet-stream, CamelSpringRabbitmqDeliveryMode=PERSISTENT, CamelSpringRabbitmqDeliveryTag=1, CamelSpringRabbitmqExchangeName=foo, CamelSpringRabbitmqPriority=4, CamelSpringRabbitmqRedelivered=false, CamelSpringRabbitmqReplyTo=jms-temp-queue-e001a063-10d1-4e1b-8990-a6a01e0c9c25, CamelSpringRabbitmqRoutingKey=, JMSDeliveryMode=PERSISTENT, JMSMessageID=ID:b0990cf7-72de-4e2b-ac65-772ebb2b1c3d, JMSPriority=4, JMSTimestamp=1725296755183, timestamp=1725296755167}, BodyType: byte[], Body: A foo description]
2024-09-02T19:05:55.204+02:00  INFO 1211709 --- [pool-2-thread-4] SampleCamelRouter:49                     :      [worker-route                  ] [wireTap[spring-rabbitmq:?queues=foo.queue.debug&ro] Exchange[Id: 393CEDE6064ED09-0000000000000000, Headers: {CamelSpringRabbitmqContentType=application/octet-stream, CamelSpringRabbitmqDeliveryMode=PERSISTENT, CamelSpringRabbitmqDeliveryTag=1, CamelSpringRabbitmqExchangeName=foo, CamelSpringRabbitmqPriority=4, CamelSpringRabbitmqRedelivered=false, CamelSpringRabbitmqReplyTo=jms-temp-queue-e001a063-10d1-4e1b-8990-a6a01e0c9c25, CamelSpringRabbitmqRoutingKey=, Content-Type=application/json, JMSDeliveryMode=PERSISTENT, JMSMessageID=ID:b0990cf7-72de-4e2b-ac65-772ebb2b1c3d, JMSPriority=4, JMSTimestamp=1725296755183, timestamp=1725296755167}, BodyType: String, Body: {"response": "Processed message with ID null and body: A foo description"}]
2024-09-02T19:05:55.206+02:00  INFO 1211709 --- [pool-2-thread-4] SampleCamelRouter:50                     :      [worker-route                  ] [log                                               ] Exchange[Id: 393CEDE6064ED09-0000000000000000, Headers: {CamelSpringRabbitmqContentType=application/octet-stream, CamelSpringRabbitmqDeliveryMode=PERSISTENT, CamelSpringRabbitmqDeliveryTag=1, CamelSpringRabbitmqExchangeName=foo, CamelSpringRabbitmqPriority=4, CamelSpringRabbitmqRedelivered=false, CamelSpringRabbitmqReplyTo=jms-temp-queue-e001a063-10d1-4e1b-8990-a6a01e0c9c25, CamelSpringRabbitmqRoutingKey=, Content-Type=application/json, JMSDeliveryMode=PERSISTENT, JMSMessageID=ID:b0990cf7-72de-4e2b-ac65-772ebb2b1c3d, JMSPriority=4, JMSTimestamp=1725296755183, timestamp=1725296755167}, BodyType: String, Body: {"response": "Processed message with ID null and body: A foo description"}]
2024-09-02T19:05:55.206+02:00  INFO 1211709 --- [pool-2-thread-4] worker-route                             : >>> SENDING RESPONSE BACK TO PRODUCER: {"response": "Processed message with ID null and body: A foo description"}
2024-09-02T19:05:55.207+02:00  INFO 1211709 --- [pool-2-thread-4] SampleCamelRouter:51                     :      [worker-route                  ] [setExchangePattern[InOut]                         ] Exchange[Id: 393CEDE6064ED09-0000000000000000, Headers: {CamelSpringRabbitmqContentType=application/octet-stream, CamelSpringRabbitmqDeliveryMode=PERSISTENT, CamelSpringRabbitmqDeliveryTag=1, CamelSpringRabbitmqExchangeName=foo, CamelSpringRabbitmqPriority=4, CamelSpringRabbitmqRedelivered=false, CamelSpringRabbitmqReplyTo=jms-temp-queue-e001a063-10d1-4e1b-8990-a6a01e0c9c25, CamelSpringRabbitmqRoutingKey=, Content-Type=application/json, JMSDeliveryMode=PERSISTENT, JMSMessageID=ID:b0990cf7-72de-4e2b-ac65-772ebb2b1c3d, JMSPriority=4, JMSTimestamp=1725296755183, timestamp=1725296755167}, BodyType: String, Body: {"response": "Processed message with ID null and body: A foo description"}]
2024-09-02T19:05:55.208+02:00  INFO 1211709 --- [pool-2-thread-4] SampleCamelRouter:34                     : *<-- [worker-route                  ] [from[spring-rabbitmq:foo?queues=foo.queue]        ] Exchange[Id: 393CEDE6064ED09-0000000000000000, Headers: {CamelSpringRabbitmqContentType=application/octet-stream, CamelSpringRabbitmqDeliveryMode=PERSISTENT, CamelSpringRabbitmqDeliveryTag=1, CamelSpringRabbitmqExchangeName=foo, CamelSpringRabbitmqPriority=4, CamelSpringRabbitmqRedelivered=false, CamelSpringRabbitmqReplyTo=jms-temp-queue-e001a063-10d1-4e1b-8990-a6a01e0c9c25, CamelSpringRabbitmqRoutingKey=, Content-Type=application/json, JMSDeliveryMode=PERSISTENT, JMSMessageID=ID:b0990cf7-72de-4e2b-ac65-772ebb2b1c3d, JMSPriority=4, JMSTimestamp=1725296755183, timestamp=1725296755167}, BodyType: String, Body: {"response": "Processed message with ID null and body: A foo description"}]
2024-09-02T19:05:55.209+02:00 DEBUG 1211709 --- [ad #1 - WireTap] o.s.a.r.c.CachingConnectionFactory       : Creating cached Rabbit Channel from AMQChannel(amqp://admin@127.0.0.1:5672/,2)
2024-09-02T19:05:55.211+02:00 DEBUG 1211709 --- [ad #1 - WireTap] o.s.amqp.rabbit.core.RabbitTemplate      : Executing callback RabbitTemplate$$Lambda$1406/0x000074606878cca0 on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://admin@127.0.0.1:5672/,2), conn: Proxy@123ca460 Shared Rabbit Connection: SimpleConnection@615bad16 [delegate=amqp://admin@127.0.0.1:5672/, localPort=33416]
2024-09-02T19:05:55.211+02:00 DEBUG 1211709 --- [pool-2-thread-4] o.s.a.r.c.CachingConnectionFactory       : Creating cached Rabbit Channel from AMQChannel(amqp://admin@127.0.0.1:5672/,3)
2024-09-02T19:05:55.212+02:00 DEBUG 1211709 --- [pool-2-thread-4] o.s.amqp.rabbit.core.RabbitTemplate      : Executing callback RabbitTemplate$$Lambda$1406/0x000074606878cca0 on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://admin@127.0.0.1:5672/,3), conn: Proxy@123ca460 Shared Rabbit Connection: SimpleConnection@615bad16 [delegate=amqp://admin@127.0.0.1:5672/, localPort=33416]
2024-09-02T19:05:55.222+02:00 DEBUG 1211709 --- [pool-2-thread-4] o.s.amqp.rabbit.core.RabbitTemplate      : Publishing message [(Body:'[B@5643c4c5(byte[74])' MessageProperties [headers={JMSPriority=4, JMSMessageID=ID:b0990cf7-72de-4e2b-ac65-772ebb2b1c3d, JMSTimestamp=1725296755183, JMSDeliveryMode=PERSISTENT, timestamp=1725296755167}, replyTo=jms-temp-queue-e001a063-10d1-4e1b-8990-a6a01e0c9c25, contentType=text/plain, contentEncoding=UTF-8, contentLength=74, deliveryMode=PERSISTENT, priority=4, deliveryTag=0])] on exchange [], routingKey = [jms-temp-queue-e001a063-10d1-4e1b-8990-a6a01e0c9c25]
2024-09-02T19:05:55.222+02:00 DEBUG 1211709 --- [ad #1 - WireTap] o.s.amqp.rabbit.core.RabbitTemplate      : Publishing message [(Body:'[B@5bde5bd7(byte[74])' MessageProperties [headers={JMSPriority=4, JMSMessageID=ID:b0990cf7-72de-4e2b-ac65-772ebb2b1c3d, JMSTimestamp=1725296755183, JMSDeliveryMode=PERSISTENT, timestamp=1725296755167}, replyTo=jms-temp-queue-e001a063-10d1-4e1b-8990-a6a01e0c9c25, contentType=text/plain, contentEncoding=UTF-8, contentLength=74, deliveryMode=PERSISTENT, priority=4, deliveryTag=0])] on exchange [], routingKey = [foo.queue.debug]
2024-09-02T19:05:55.226+02:00 DEBUG 1211709 --- [ad #1 - WireTap] o.s.a.r.c.CachingConnectionFactory       : Closing cached Channel: AMQChannel(amqp://admin@127.0.0.1:5672/,2)
2024-09-02T19:05:55.297+02:00  INFO 1211709 --- [           main] o.c.report.LoggingReporter               : 
2024-09-02T19:05:55.298+02:00 ERROR 1211709 --- [           main] o.c.report.LoggingReporter               : TEST FAILED springBeanTest <sample.camel> Nested exception is: 

org.citrusframework.exceptions.CitrusRuntimeException: com.rabbitmq.jms.util.RMQJMSException: invalid stream header: 7B227265
	at org.citrusframework.jms.endpoint.JmsSyncProducer.send(JmsSyncProducer.java:149) ~[citrus-jms-4.1.0.jar:na]
	at org.citrusframework.actions.SendMessageAction.doExecute(SendMessageAction.java:175) ~[citrus-base-4.1.0.jar:na]
	at org.citrusframework.actions.AbstractTestAction.execute(AbstractTestAction.java:59) ~[citrus-base-4.1.0.jar:na]
	at org.citrusframework.DefaultTestCase.executeAction(DefaultTestCase.java:139) ~[citrus-base-4.1.0.jar:na]
	at org.citrusframework.DefaultTestCaseRunner.run(DefaultTestCaseRunner.java:129) ~[citrus-base-4.1.0.jar:na]
	at org.citrusframework.TestActionRunner.$(TestActionRunner.java:35) ~[citrus-api-4.1.0.jar:na]
	at sample.camel.SampleCamelRouterTest.springBeanTest(SampleCamelRouterTest.java:36) ~[test-classes/:na]
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[na:na]
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
	at java.base/java.lang.reflect.Method.invoke(Method.java:569) ~[na:na]
	at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728) ~[junit-platform-commons-1.10.2.jar:1.10.2]
	at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) ~[junit-jupiter-engine-5.10.2.jar:5.10.2]
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) ~[junit-jupiter-engine-5.10.2.jar:5.10.2]
	at org.citrusframework.junit.jupiter.CitrusExtension.lambda$interceptTestMethod$3(CitrusExtension.java:148) ~[citrus-junit5-4.1.0.jar:na]
	at org.citrusframework.common.DefaultTestLoader.lambda$doLoad$1(DefaultTestLoader.java:118) ~[citrus-base-4.1.0.jar:na]
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511) ~[na:na]
	at org.citrusframework.common.DefaultTestLoader.doLoad(DefaultTestLoader.java:118) ~[citrus-base-4.1.0.jar:na]
	at org.citrusframework.common.DefaultTestLoader.load(DefaultTestLoader.java:95) ~[citrus-base-4.1.0.jar:na]
	at org.citrusframework.junit.jupiter.CitrusExtension.interceptTestMethod(CitrusExtension.java:156) ~[citrus-junit5-4.1.0.jar:na]
	at org.citrusframework.junit.jupiter.spring.CitrusSpringExtension.interceptTestMethod(CitrusSpringExtension.java:87) ~[citrus-junit5-4.1.0.jar:na]
	at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103) ~[junit-jupiter-engine-5.10.2.jar:5.10.2]
	at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93) ~[junit-jupiter-engine-5.10.2.jar:5.10.2]
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) ~[junit-jupiter-engine-5.10.2.jar:5.10.2]
	at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156) ~[junit-jupiter-engine-5.10.2.jar:5.10.2]
	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147) ~[junit-jupiter-engine-5.10.2.jar:5.10.2]
	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86) ~[junit-jupiter-engine-5.10.2.jar:5.10.2]
	at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103) ~[junit-jupiter-engine-5.10.2.jar:5.10.2]
	at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93) ~[junit-jupiter-engine-5.10.2.jar:5.10.2]
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) ~[junit-jupiter-engine-5.10.2.jar:5.10.2]
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) ~[junit-jupiter-engine-5.10.2.jar:5.10.2]
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) ~[junit-jupiter-engine-5.10.2.jar:5.10.2]
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) ~[junit-jupiter-engine-5.10.2.jar:5.10.2]
	at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92) ~[junit-jupiter-engine-5.10.2.jar:5.10.2]
	at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86) ~[junit-jupiter-engine-5.10.2.jar:5.10.2]
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:218) ~[junit-jupiter-engine-5.10.2.jar:5.10.2]
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.10.2.jar:1.10.2]
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:214) ~[junit-jupiter-engine-5.10.2.jar:5.10.2]
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:139) ~[junit-jupiter-engine-5.10.2.jar:5.10.2]
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:69) ~[junit-jupiter-engine-5.10.2.jar:5.10.2]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151) ~[junit-platform-engine-1.10.2.jar:1.10.2]
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.10.2.jar:1.10.2]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) ~[junit-platform-engine-1.10.2.jar:1.10.2]
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) ~[junit-platform-engine-1.10.2.jar:1.10.2]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139) ~[junit-platform-engine-1.10.2.jar:1.10.2]
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.10.2.jar:1.10.2]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138) ~[junit-platform-engine-1.10.2.jar:1.10.2]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95) ~[junit-platform-engine-1.10.2.jar:1.10.2]
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511) ~[na:na]
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41) ~[junit-platform-engine-1.10.2.jar:1.10.2]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155) ~[junit-platform-engine-1.10.2.jar:1.10.2]
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.10.2.jar:1.10.2]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) ~[junit-platform-engine-1.10.2.jar:1.10.2]
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) ~[junit-platform-engine-1.10.2.jar:1.10.2]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139) ~[junit-platform-engine-1.10.2.jar:1.10.2]
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.10.2.jar:1.10.2]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138) ~[junit-platform-engine-1.10.2.jar:1.10.2]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95) ~[junit-platform-engine-1.10.2.jar:1.10.2]
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511) ~[na:na]
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41) ~[junit-platform-engine-1.10.2.jar:1.10.2]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155) ~[junit-platform-engine-1.10.2.jar:1.10.2]
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.10.2.jar:1.10.2]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) ~[junit-platform-engine-1.10.2.jar:1.10.2]
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) ~[junit-platform-engine-1.10.2.jar:1.10.2]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139) ~[junit-platform-engine-1.10.2.jar:1.10.2]
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.10.2.jar:1.10.2]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138) ~[junit-platform-engine-1.10.2.jar:1.10.2]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95) ~[junit-platform-engine-1.10.2.jar:1.10.2]
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35) ~[junit-platform-engine-1.10.2.jar:1.10.2]
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57) ~[junit-platform-engine-1.10.2.jar:1.10.2]
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54) ~[junit-platform-engine-1.10.2.jar:1.10.2]
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:198) ~[junit-platform-launcher-1.10.2.jar:1.10.2]
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:169) ~[junit-platform-launcher-1.10.2.jar:1.10.2]
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:93) ~[junit-platform-launcher-1.10.2.jar:1.10.2]
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:58) ~[junit-platform-launcher-1.10.2.jar:1.10.2]
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:141) ~[junit-platform-launcher-1.10.2.jar:1.10.2]
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:57) ~[junit-platform-launcher-1.10.2.jar:1.10.2]
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:103) ~[junit-platform-launcher-1.10.2.jar:1.10.2]
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:85) ~[junit-platform-launcher-1.10.2.jar:1.10.2]
	at org.junit.platform.launcher.core.DelegatingLauncher.execute(DelegatingLauncher.java:47) ~[junit-platform-launcher-1.10.2.jar:1.10.2]
	at org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:63) ~[junit-platform-launcher-1.10.2.jar:1.10.2]
	at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:57) ~[junit5-rt.jar:na]
	at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38) ~[junit-rt.jar:na]
	at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11) ~[idea_rt.jar:na]
	at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35) ~[junit-rt.jar:na]
	at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:232) ~[junit-rt.jar:na]
	at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:55) ~[junit-rt.jar:na]
Caused by: com.rabbitmq.jms.util.RMQJMSException: invalid stream header: 7B227265
	at com.rabbitmq.jms.client.RMQMessage.fromMessage(RMQMessage.java:1087) ~[rabbitmq-jms-3.2.0.jar:3.2.0]
	at com.rabbitmq.jms.client.RMQMessage.convertJmsMessage(RMQMessage.java:841) ~[rabbitmq-jms-3.2.0.jar:3.2.0]
	at com.rabbitmq.jms.client.RMQMessage.convertMessage(RMQMessage.java:835) ~[rabbitmq-jms-3.2.0.jar:3.2.0]
	at com.rabbitmq.jms.client.RMQMessageConsumer.receive(RMQMessageConsumer.java:358) ~[rabbitmq-jms-3.2.0.jar:3.2.0]
	at com.rabbitmq.jms.client.RMQMessageConsumer.receive(RMQMessageConsumer.java:272) ~[rabbitmq-jms-3.2.0.jar:3.2.0]
	at org.citrusframework.jms.endpoint.JmsSyncProducer.send(JmsSyncProducer.java:135) ~[citrus-jms-4.1.0.jar:na]
	... 86 common frames omitted
Caused by: java.io.StreamCorruptedException: invalid stream header: 7B227265
	at java.base/java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:958) ~[na:na]
	at java.base/java.io.ObjectInputStream.<init>(ObjectInputStream.java:392) ~[na:na]
	at com.rabbitmq.jms.util.WhiteListObjectInputStream.<init>(WhiteListObjectInputStream.java:90) ~[rabbitmq-jms-3.2.0.jar:3.2.0]
	at com.rabbitmq.jms.client.RMQMessage.fromMessage(RMQMessage.java:1061) ~[rabbitmq-jms-3.2.0.jar:3.2.0]
	... 91 common frames omitted

2024-09-02T19:05:55.302+02:00  INFO 1211709 --- [           main] o.c.report.LoggingReporter               : ------------------------------------------------------------------------
2024-09-02T19:05:55.303+02:00  INFO 1211709 --- [           main] o.c.report.LoggingReporter               : 

org.citrusframework.exceptions.TestCaseFailedException: com.rabbitmq.jms.util.RMQJMSException: invalid stream header: 7B227265

	at org.citrusframework.DefaultTestCase.executeAction(DefaultTestCase.java:146)
	at org.citrusframework.DefaultTestCaseRunner.run(DefaultTestCaseRunner.java:129)
	at org.citrusframework.TestActionRunner.$(TestActionRunner.java:35)
	at sample.camel.SampleCamelRouterTest.springBeanTest(SampleCamelRouterTest.java:36)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at org.citrusframework.junit.jupiter.CitrusExtension.lambda$interceptTestMethod$3(CitrusExtension.java:148)
	at org.citrusframework.common.DefaultTestLoader.lambda$doLoad$1(DefaultTestLoader.java:118)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
	at org.citrusframework.common.DefaultTestLoader.doLoad(DefaultTestLoader.java:118)
	at org.citrusframework.common.DefaultTestLoader.load(DefaultTestLoader.java:95)
	at org.citrusframework.junit.jupiter.CitrusExtension.interceptTestMethod(CitrusExtension.java:156)
	at org.citrusframework.junit.jupiter.spring.CitrusSpringExtension.interceptTestMethod(CitrusSpringExtension.java:87)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
Caused by: org.citrusframework.exceptions.CitrusRuntimeException: com.rabbitmq.jms.util.RMQJMSException: invalid stream header: 7B227265
	at org.citrusframework.jms.endpoint.JmsSyncProducer.send(JmsSyncProducer.java:149)
	at org.citrusframework.actions.SendMessageAction.doExecute(SendMessageAction.java:175)
	at org.citrusframework.actions.AbstractTestAction.execute(AbstractTestAction.java:59)
	at org.citrusframework.DefaultTestCase.executeAction(DefaultTestCase.java:139)
	... 13 more
Caused by: com.rabbitmq.jms.util.RMQJMSException: invalid stream header: 7B227265
	at com.rabbitmq.jms.client.RMQMessage.fromMessage(RMQMessage.java:1087)
	at com.rabbitmq.jms.client.RMQMessage.convertJmsMessage(RMQMessage.java:841)
	at com.rabbitmq.jms.client.RMQMessage.convertMessage(RMQMessage.java:835)
	at com.rabbitmq.jms.client.RMQMessageConsumer.receive(RMQMessageConsumer.java:358)
	at com.rabbitmq.jms.client.RMQMessageConsumer.receive(RMQMessageConsumer.java:272)
	at org.citrusframework.jms.endpoint.JmsSyncProducer.send(JmsSyncProducer.java:135)
	... 16 more
Caused by: java.io.StreamCorruptedException: invalid stream header: 7B227265
	at java.base/java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:958)
	at java.base/java.io.ObjectInputStream.<init>(ObjectInputStream.java:392)
	at com.rabbitmq.jms.util.WhiteListObjectInputStream.<init>(WhiteListObjectInputStream.java:90)
	at com.rabbitmq.jms.client.RMQMessage.fromMessage(RMQMessage.java:1061)
	... 21 more

2024-09-02T19:05:55.335+02:00  INFO 1211709 --- [           main] o.c.report.LoggingReporter               : 
2024-09-02T19:05:55.335+02:00  INFO 1211709 --- [           main] o.c.report.LoggingReporter               : ------------------------------------------------------------------------
2024-09-02T19:05:55.335+02:00  INFO 1211709 --- [           main] o.c.report.LoggingReporter               : 
2024-09-02T19:05:55.335+02:00  INFO 1211709 --- [           main] o.c.report.LoggingReporter               : 
2024-09-02T19:05:55.335+02:00  INFO 1211709 --- [           main] o.c.report.LoggingReporter               : AFTER TEST SUITE: SUCCESS
2024-09-02T19:05:55.335+02:00  INFO 1211709 --- [           main] o.c.report.LoggingReporter               : ------------------------------------------------------------------------
2024-09-02T19:05:55.335+02:00  INFO 1211709 --- [           main] o.c.report.LoggingReporter               : 
2024-09-02T19:05:55.335+02:00  INFO 1211709 --- [           main] o.c.report.LoggingReporter               : ------------------------------------------------------------------------
2024-09-02T19:05:55.335+02:00  INFO 1211709 --- [           main] o.c.report.LoggingReporter               : 
2024-09-02T19:05:55.335+02:00  INFO 1211709 --- [           main] o.c.report.LoggingReporter               : CITRUS TEST RESULTS
2024-09-02T19:05:55.335+02:00  INFO 1211709 --- [           main] o.c.report.LoggingReporter               : 
2024-09-02T19:05:55.337+02:00  INFO 1211709 --- [           main] o.c.report.LoggingReporter               :  springBeanTest ................................................. FAILED
2024-09-02T19:05:55.339+02:00  INFO 1211709 --- [           main] o.c.report.LoggingReporter               :  FAILURE: Caused by: CitrusRuntimeException: com.rabbitmq.jms.util.RMQJMSException: invalid stream header: 7B227265
2024-09-02T19:05:55.339+02:00  INFO 1211709 --- [           main] o.c.report.LoggingReporter               : 
2024-09-02T19:05:55.339+02:00  INFO 1211709 --- [           main] o.c.report.LoggingReporter               : TOTAL:	1
2024-09-02T19:05:55.341+02:00  INFO 1211709 --- [           main] o.c.report.LoggingReporter               : FAILED:	1 (100.0%)
2024-09-02T19:05:55.341+02:00  INFO 1211709 --- [           main] o.c.report.LoggingReporter               : SUCCESS:	0 (0.0%)

As we see there incoming messages was nicely processed by the apache camel. It generated a simple result (as json) and trhows it back to a foo.queue.debug as well as the provider of the message. Also pay attention to the CamelSpringRabbitmqReplyTo header in the log console. As you see apache camel is sending the message back (in-out pattern) to the temporary created queue (by citrus). And this queue is not able to decode the message appropriately to a Message since it it is not amqp = true configured (that is my assumption).

Based on this sample I am going to check if I can find a workaround by specifying a reply destination...

If you have troubles to start the sample, let me now. I will try to response as soon as possible.

Best regards,
Adam L.

@bbortt
Copy link
Collaborator

bbortt commented Sep 3, 2024

awesome! that's something I can work with! you'll hear from me...

@adamlukaszewski
Copy link
Author

I guess that I made it work with a work-around. I have just pushed a commit. What I did:

  • I created a reply queue and destination in my EndpointConfig. A bar.queue was created as the reply queue. This queue was provided in the response destination:
    @Bean
    public JmsEndpoint inOutQueueEndpoint(ConnectionFactory connectionFactory, @Qualifier("fooQueueDestination") RMQDestination fooQueueDestination, @Qualifier("barQueueDestination") RMQDestination barQueueDestination) {

        return CitrusEndpoints
                .jms()
//                .asynchronous()
                .synchronous()
                .connectionFactory(connectionFactory)
                .destination(fooQueueDestination)
                .replyDestination("bar.queue")
                .destinationResolver((session, destinationName, pubSubDomain) -> barQueueDestination)
//                .replyDestination(barQueueDestination)
//                .replyDestination(out)
                .build();
    }

I have to admit that it was not easy to find the right constructor for the queue and the correct parameters (replyDestionation) to ensure that also the correct reply attributes were set by citrus. But with this setup it seems to work.

But... (there is always a but):

  • with this workaround i need to configure new response queues (which I actually do not need for my application logic). The process of creating a temporary queue under the hood by citrus was much better.

  • I guess that one of the main issues here is more or less the com.rabbitmq.jms.admin.RMQDestination. whether you want to get a amqp compatible queue or not depends very strongly on which constructor you are using!

    The org/citrusframework/jms/endpoint/JmsSyncProducer.java:119 (getReplyDestination) setups the reply queue for in-out (sync) communication. If no resolver and replyDestination is configured in the endpoint it is using the RMQSession.createTemporaryQueue which for sure is using a constructor that sets amqp = false.

  • So this was also the reason why I started to create a self-made amqp queue and configured it appropriately

@bbortt
Copy link
Collaborator

bbortt commented Sep 4, 2024

awesome you could make it work!

I've extracted the following improvement from your comment.. from my understanding. please tell me if this could benefit you:

we could improve the AMPQ support by not using RMQSession#createTemporaryQueue based on a flag. e.g.:

.jms()
                .synchronous()
                .ampq(true)
                .build();

something into this direction. that would help?

@adamlukaszewski
Copy link
Author

Hi Timo,

.jms()
                .synchronous()
                .ampq(true)
                .build();

Sounds great! The logic must also ensure that an temporary queue (like it is already happening for jms) will also be created (with exclusive = true). That would completely cover my case.

Why? Since, I could run the example with the bar.queue (output) queue. I started to make some investigations how can I ensure that these temporary queue will be created during the processing, like it is happening for jms (not amqp). That is what I have now:

package sample.camel;

import com.rabbitmq.client.Channel;
import com.rabbitmq.jms.admin.RMQConnectionFactory;
import com.rabbitmq.jms.admin.RMQDestination;
import org.citrusframework.dsl.endpoint.CitrusEndpoints;
import org.citrusframework.jms.endpoint.JmsEndpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

@Configuration
//@Import(TodoAppAutoConfiguration.class)
public class SampleCamelRouteEndpointConfig {

    Logger logger = LoggerFactory.getLogger(SampleCamelRouteEndpointConfig.class);

    private com.rabbitmq.client.Connection rabbitConnection;  // Hold the RabbitMQ connection


    @Bean
    public RMQConnectionFactory jmsConnectionFactory() throws IOException, TimeoutException {
        RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672); // Replace with appropriate port

        // Initialize RabbitMQ core connection to manage manually
        initializeRabbitConnection(connectionFactory);

        return connectionFactory;
    }

    private void initializeRabbitConnection(RMQConnectionFactory jmsConnectionFactory) throws IOException, TimeoutException {
        // Create a RabbitMQ ConnectionFactory using the same settings as the RMQConnectionFactory
        com.rabbitmq.client.ConnectionFactory factory = new com.rabbitmq.client.ConnectionFactory();
        factory.setHost(jmsConnectionFactory.getHost());
        factory.setPort(jmsConnectionFactory.getPort());
        factory.setUsername(jmsConnectionFactory.getUsername());
        factory.setPassword(jmsConnectionFactory.getPassword());
        factory.setVirtualHost(jmsConnectionFactory.getVirtualHost());

        // Open a connection to RabbitMQ and store it
        this.rabbitConnection = factory.newConnection();
    }

    @Bean(name = "fooQueueDestination")
    public RMQDestination fooQueueDestination() {
        return new RMQDestination("foo.queue", "", "foo.queue", "");
    }

    @Bean(name = "temporaryQueueName")
    public String temporaryQueueName() throws IOException, TimeoutException {
        // Check if the temporary queue name is already cached

        // Use the existing RabbitMQ connection to declare a temporary queue
        try (Channel channel = rabbitConnection.createChannel()) {
            // Declare a temporary queue with a unique name, exclusive and auto-delete properties
            String tempQueueName = channel.queueDeclare("", false, false, true, null).getQueue();
            logger.info("Temporary queue created: {}", tempQueueName);
            return tempQueueName;
        }
    }

    @Bean
    public JmsEndpoint inOutQueueEndpoint(
            @Qualifier("jmsConnectionFactory") RMQConnectionFactory connectionFactory,
            @Qualifier("fooQueueDestination") RMQDestination fooQueueDestination,
            @Qualifier("temporaryQueueName") String temporaryQueueName
    ) {

        logger.info("Endpoint is using this {}", temporaryQueueName);

        RMQDestination tempQueueDestination = new RMQDestination(temporaryQueueName, "", temporaryQueueName, temporaryQueueName);
        tempQueueDestination.setAmqp(true); // Ensure AMQP protocol is used
        tempQueueDestination.setQueue(true);  // Ensure this is recognized as a queue

        return CitrusEndpoints
                .jms()
                .synchronous()
                .connectionFactory(connectionFactory)
                .destination(fooQueueDestination)
                .replyDestination(tempQueueDestination)
                .build();
    }
}

As you see I started to create with temporaryQueueName the needed reply queue during the initialization of the container. The queues exists just before the endpoint will be used during testing. But... since I have created the queue in a separate connection/session and not the one which the endpoint (jms) is using, I am not able to configure the temporary queues as "exclusive = true" what would remove them automatically once the connection disposes.

Well, it would help a lot in the case of amqp if the temporary queue could be build up (with exclusive = true) to gain more comfort.

Some notes about this. I have revised the logic of the JmsSynchProducer. When a reply destination will be resolved for the endpoint, it creates with session.createConsumer(replyToDestination); with the RMQSession a queue if needed (declareDestinationIfNecessary):

    private RMQMessageConsumer createConsumerInternal(RMQDestination dest, String uuidTag, boolean durableSubscriber, String jmsSelector) throws JMSException {
        String consumerTag = uuidTag != null ? uuidTag : generateJmsConsumerQueueName();
        logger.trace("create consumer for destination '{}' with consumerTag '{}' and selector '{}'", dest, consumerTag, jmsSelector);
        declareDestinationIfNecessary(dest);
        if (!dest.isQueue()) {
            String subscriptionName = consumerTag;
            Subscription subscription = this.subscriptions.get(durableSubscriber, subscriptionName);
            if (subscription == null) {
                // it is unshared, non-durable, creating a transient subscription instance
                subscription = new Subscription(subscriptionName, subscriptionName, false, false, jmsSelector, false);
            }
            subscription.createTopology(dest, this, this.channel);
            consumerTag = subscription.queue();
        }
        RMQMessageConsumer consumer = new RMQMessageConsumer(this, dest, consumerTag, getConnection().isStopped(),
            jmsSelector, this.requeueOnMessageListenerException, this.receivingContextConsumer,
            this.requeueOnTimeout);
        this.consumers.add(consumer);
        return consumer;
    }

and here is the declareDestinationIfNecessary:

    void declareDestinationIfNecessary(RMQDestination destination) throws JMSException {
        if (destination != null && !destination.isAmqp() && !destination.isDeclared()) {
            if (destination.isQueue()) {
                declareRMQQueue(destination, null, false, true);
            } else {
                declareTopic(destination);
            }
        }
    }

What I do not understand here, is why the declareRMQQueue will only be executed if !destination.isAmqp().

In consequence, this "automatic queue creation process" of a reply queue will never work for my amqp reply queues. I haven't found any explanation, yet, why the RMQ Client is making this assumption.

One last sentence, it would help a lot if the entire process using a amqp queue could be set-up automatically, like it is the case for JMS.

Thanks and best regards,
Adam L.

@bbortt
Copy link
Collaborator

bbortt commented Sep 4, 2024

thinking out lout; a different builder - exclusively for ampq - would probably make even more sense.. e.g.:

CitrusEndpoints.ampq()
                .synchronous()
                .build();

there would be less potential for confusion this way, probably.

I'll have a look at it.

edit: and thanks for all the inputs. that helps a lot.

@bbortt bbortt added Type: Feature and removed State: To discuss In case there are open questions concerning the issue Type: Question labels Sep 4, 2024
@adamlukaszewski
Copy link
Author

In my sample project I pushed a commit which finalizes it. The pattern is:

  • Create amqp queues (with autoDelete = true only, since I am working in another session and exclusive won't work)
  • Setting this new queue as destination in my jms().synchronous() endpoint

One small pitfalls:

  • Since the spring context will be initiated several times, I am getting two tmp amqp queues instead of only one. For now, I was not able to figure out how I can solve it. But it does not break the process.

  • And since we are using a jms client it is very important to provide some headers, to instruct the client to decode the message as TextMessage instead of BinaryMessage:

         a.$(SendMessageAction.Builder.send()
                 .endpoint(this.inOutQueueEndpoint)
                 .message()
                 .type(MessageType.PLAINTEXT)
                 .header("JMSType", "TextMessage") // <--- important for amqp 
                 .body("A foo description")
         );

    Perhaps, it could be worth to also consider these points when the new CitrusEndpoints.ampq() will be designed :).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants