From 59457277e67ebcda251533e1709cde793e292a1e Mon Sep 17 00:00:00 2001 From: Bela Ban Date: Thu, 3 Aug 2023 19:47:43 +0200 Subject: [PATCH] Added UNBATCH + unit test (https://issues.redhat.com/browse/JGRP-2702) --- src/org/jgroups/protocols/UNBATCH.java | 32 ++++++ .../org/jgroups/protocols/UNBATCH_Test.java | 103 ++++++++++++++++++ 2 files changed, 135 insertions(+) create mode 100644 src/org/jgroups/protocols/UNBATCH.java create mode 100644 tests/junit-functional/org/jgroups/protocols/UNBATCH_Test.java diff --git a/src/org/jgroups/protocols/UNBATCH.java b/src/org/jgroups/protocols/UNBATCH.java new file mode 100644 index 0000000000..7468aa7011 --- /dev/null +++ b/src/org/jgroups/protocols/UNBATCH.java @@ -0,0 +1,32 @@ +package org.jgroups.protocols; + +import org.jgroups.Message; +import org.jgroups.annotations.MBean; +import org.jgroups.annotations.Property; +import org.jgroups.stack.Protocol; +import org.jgroups.util.MessageBatch; + +/** + * Intercepts {@link org.jgroups.stack.Protocol#up(MessageBatch)} and passes up each message of a message batch + * as a single message. Mainly to be used in unit tests (https://issues.redhat.com/browse/JGRP-2702). + * @author Bela Ban + * @since 5.2.18 + */ +@MBean(description="Passes each message from a MessageBatch up as a single message") +public class UNBATCH extends Protocol { + @Property(description="If enabled, message batches are passed up as single messages, otherwise as batches") + protected boolean enabled=true; + + public boolean enabled() {return enabled;} + public UNBATCH enable(boolean b) {enabled=b; return this;} + + @Override + public void up(MessageBatch batch) { + if(!enabled) { + up_prot.up(batch); + return; + } + for(Message msg: batch) + up_prot.up(msg); + } +} diff --git a/tests/junit-functional/org/jgroups/protocols/UNBATCH_Test.java b/tests/junit-functional/org/jgroups/protocols/UNBATCH_Test.java new file mode 100644 index 0000000000..65a4f25ce6 --- /dev/null +++ b/tests/junit-functional/org/jgroups/protocols/UNBATCH_Test.java @@ -0,0 +1,103 @@ +package org.jgroups.protocols; + +import org.jgroups.*; +import org.jgroups.protocols.pbcast.GMS; +import org.jgroups.protocols.pbcast.NAKACK2; +import org.jgroups.protocols.pbcast.STABLE; +import org.jgroups.stack.Protocol; +import org.jgroups.util.MessageBatch; +import org.jgroups.util.Util; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.util.concurrent.atomic.LongAdder; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Tests {@link UNBATCH} + * @author Bela Ban + * @since 5.2.18 + */ +@Test(groups= Global.FUNCTIONAL,singleThreaded=true) +public class UNBATCH_Test { + protected MyReceiver ra=new MyReceiver(), rb=new MyReceiver(); + protected JChannel a, b; + + @BeforeMethod protected void setup() throws Exception { + a=create("A").connect(UNBATCH_Test.class.getSimpleName()); + b=create("B").connect(UNBATCH_Test.class.getSimpleName()); + Util.waitUntilAllChannelsHaveSameView(5000, 100, a, b); + a.setReceiver(ra); b.setReceiver(rb); + ra.clear(); rb.clear(); + } + + @AfterMethod void destroy() {Util.close(b, a);} + + /** Tests that all unicasts sent by A to B are received as single messages by B */ + public void testUnicastSingleMessages() throws Exception { + Address target=b.getAddress(); + for(int i=1; i <= 100; i++) + a.send(target, i); + Util.waitUntil(5000, 100, () -> rb.numMsgs() == 100, () -> print(b)); + System.out.printf("msgs:\n%s\n", print(b)); + assert rb.numSingleMsgs() == 100; + assert rb.numBatches() == 0; + } + + /** Tests that all multicasts sent by A to B are received as single messages by A and B */ + public void testMulticastSingleMessages() throws Exception { + for(int i=1; i <= 100; i++) + a.send(null, i); + Util.waitUntil(5000, 100, () -> ra.numMsgs() == 100 && rb.numMsgs() == 100, () -> print(a,b)); + System.out.printf("msgs:\n%s\n", print(a,b)); + assert ra.numSingleMsgs() == 100 && rb.numSingleMsgs() == 100; + assert ra.numBatches() == 0 && rb.numBatches() == 0; + } + + protected static String print(JChannel... channels) { + return Stream.of(channels).map(ch -> String.format("%s: %s", ch.getAddress(), ch.getReceiver())) + .collect(Collectors.joining("\n")); + } + + protected static JChannel create(String name) throws Exception { + Protocol[] prots={ + new SHARED_LOOPBACK(), + new LOCAL_PING(), + new NAKACK2(), + new UNICAST3(), + new UNBATCH().enable(true), + new STABLE(), + new GMS().setJoinTimeout(100), + }; + return new JChannel(prots).name(name); + } + + protected static class MyReceiver implements Receiver { + protected final LongAdder num_batches=new LongAdder(); + protected final LongAdder num_single_msgs=new LongAdder(); + protected final LongAdder num_msgs=new LongAdder(); + + protected long numBatches() {return num_batches.sum();} + protected long numSingleMsgs() {return num_single_msgs.sum();} + protected long numMsgs() {return num_msgs.sum();} + protected MyReceiver clear() {num_msgs.reset(); num_batches.reset(); num_single_msgs.reset(); return this;} + + @Override + public void receive(Message msg) { + num_single_msgs.increment(); num_msgs.increment(); + } + + @Override + public void receive(MessageBatch batch) { + num_batches.increment(); num_msgs.add(batch.size()); + } + + @Override + public String toString() { + return String.format("%d msgs: %d batches, %d single msgs", + num_msgs.sum(), num_batches.sum(), num_single_msgs.sum()); + } + } +}