diff --git a/doc/manual/api.adoc b/doc/manual/api.adoc index 10726893a1..e77aac9175 100644 --- a/doc/manual/api.adoc +++ b/doc/manual/api.adoc @@ -269,48 +269,22 @@ to better performance. JGroups 5.0 comes with a number of message types (see the next sections). If none of them are a fit for the application's requirements, new message types can be defined and registered. To do this, the new message type needs to implement -`Message` (typically by subclassing `BaseMessage`) and registering it with the `MessageFactory` in the transport: +`Message` (typically by subclassing `BaseMessage`) and registering it with the `MessageFactory`: [source,java] ---- CustomMessage msg=new CustomMessage(...); -JChannel ch; -TP transport=ch.getProtocolStack().getTransport(); -MessageFactory mf=transport.getMessageFactory(); -mf.register((short)12345, CustomMessage::new) +MessageFactory.register((short)12345, CustomMessage::new) ---- A (unique) ID has to be assigned with the message type, and then it has to be registered with the message factory in the transport. This has to be done before sending an instance of the new message type. If the ID has already been registered before, or is taken, an exception will be thrown. -Note that the default implementation of `MessageFactory` requires all IDs to be greater than 32, so that there's room -for adding built-in message types. +`MessageFactory` requires all IDs to be greater than 32, so that there's room for adding built-in message types. NOTE: It is recommended to register all custom message types _before_ connecting the channel, so that potential errors are detected early. -[[CustomMessageFactory]] -==== Custom `MessageFactory` -`MessageFactory` is a simple interface: - -[source,java] ----- -public interface MessageFactory { - T create(short id); - void register(short type, Supplier generator); -} ----- -We saw the that the `register()` method is used to associate new message types with IDs <>. - -There is a `DefaultMessageFactory` which is set in the transport (`TP`). If more control over the creation of custom -messages is desired, a custom implementation of `MessageFactory` can be written and registered in the transport, using -`TP.setMessageFactory(MessageFactory mf)`. - -An example for why we might want to provide our own `MessageFactory` is that we have control over the creation of -messages; e.g. to create an `NioMessage` with a *direct* `ByteBuffer`, we may want to use a _pool_ of off-heap memory -rather than calling `ByteBuffer.allocateDirect()` for each message, which is slow. - - [[BytesMessage]] ==== BytesMessage This is the equivalent to the 4.x `Message`, and contains a byte array, offset and length. There are methods to get and @@ -365,8 +339,7 @@ The methods of `NioMessage` are: |========================== NOTE: The envisioned use case for `useDirectMemory()` is when we send an `NioMessage` with a direct `ByteBuffer`, but - don't need the `ByteBuffer` to be created in off-heap memory at the receiver, when on-heap will do. + - The alternative is to provide a custom <>. + don't need the `ByteBuffer` to be created in off-heap memory at the receiver, when on-heap will do. diff --git a/src/org/jgroups/BatchMessage.java b/src/org/jgroups/BatchMessage.java index 85276c180f..5a0977c0a1 100644 --- a/src/org/jgroups/BatchMessage.java +++ b/src/org/jgroups/BatchMessage.java @@ -1,4 +1,3 @@ - package org.jgroups; @@ -32,8 +31,6 @@ public class BatchMessage extends BaseMessage implements Iterable { protected Address orig_src; - protected static final MessageFactory mf=new DefaultMessageFactory(); - public BatchMessage() { } @@ -155,7 +152,7 @@ public void readPayload(DataInput in) throws IOException, ClassNotFoundException msgs=new Message[index]; // a bit of additional space should we add byte arrays for(int i=0; i < index; i++) { short type=in.readShort(); - msgs[i]=mf.create(type).setDest(dest()).setSrc(orig_src); + msgs[i]=MessageFactory.create(type).setDest(dest()).setSrc(orig_src); msgs[i].readFrom(in); } } diff --git a/src/org/jgroups/CompositeMessage.java b/src/org/jgroups/CompositeMessage.java index 0b9b7e59c3..795d55bb65 100644 --- a/src/org/jgroups/CompositeMessage.java +++ b/src/org/jgroups/CompositeMessage.java @@ -31,8 +31,6 @@ public class CompositeMessage extends BaseMessage implements Iterable { protected boolean collapse; // send as a BytesMessage when true - protected static final MessageFactory mf=new DefaultMessageFactory(); - public CompositeMessage() { } @@ -147,7 +145,7 @@ public void readPayload(DataInput in) throws IOException, ClassNotFoundException msgs=new Message[index]; // a bit of additional space should we add byte arrays for(int i=0; i < index; i++) { short type=in.readShort(); - msgs[i]=mf.create(type); + msgs[i]=MessageFactory.create(type); msgs[i].readFrom(in); } } diff --git a/src/org/jgroups/DefaultMessageFactory.java b/src/org/jgroups/DefaultMessageFactory.java deleted file mode 100644 index 6ca0f8747a..0000000000 --- a/src/org/jgroups/DefaultMessageFactory.java +++ /dev/null @@ -1,46 +0,0 @@ -package org.jgroups; - -import java.util.HashMap; -import java.util.Map; -import java.util.Objects; -import java.util.function.Supplier; - -/** - * Default implementation of {@link MessageFactory}. Uses an array for message IDs less then 32, and a hashmap for - * types above 32 - * @author Bela Ban - * @since 5.0 - */ -public class DefaultMessageFactory implements MessageFactory { - protected static final byte MIN_TYPE=32; - protected final Supplier[] creators=new Supplier[MIN_TYPE]; - protected Map> map=new HashMap<>(); - - public DefaultMessageFactory() { - creators[Message.BYTES_MSG]=BytesMessage::new; - creators[Message.NIO_MSG]=NioMessage::new; - creators[Message.EMPTY_MSG]=EmptyMessage::new; - creators[Message.OBJ_MSG]=ObjectMessage::new; - creators[Message.LONG_MSG]=LongMessage::new; - creators[Message.COMPOSITE_MSG]=CompositeMessage::new; - creators[Message.FRAG_MSG]=FragmentedMessage::new; - creators[Message.EARLYBATCH_MSG]=BatchMessage::new; - } - - public T create(short type) { - Supplier creator=type < MIN_TYPE? creators[type] : map.get(type); - if(creator == null) - throw new IllegalArgumentException("no creator found for type " + type); - return (T)creator.get(); - } - - public T register(short type, Supplier generator) { - Objects.requireNonNull(generator, "the creator must be non-null"); - if(type < MIN_TYPE) - throw new IllegalArgumentException(String.format("type (%d) must be >= 32", type)); - if(map.containsKey(type)) - throw new IllegalArgumentException(String.format("type %d is already taken", type)); - map.put(type, generator); - return (T)this; - } -} diff --git a/src/org/jgroups/MessageFactory.java b/src/org/jgroups/MessageFactory.java index 1cb3fa65c4..8bdb264820 100644 --- a/src/org/jgroups/MessageFactory.java +++ b/src/org/jgroups/MessageFactory.java @@ -1,21 +1,43 @@ package org.jgroups; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; import java.util.function.Supplier; /** + * Factory to create messages. Uses an array for message IDs less then 32, and a hashmap for + * types above 32 * @author Bela Ban * @since 5.0 */ -public interface MessageFactory { - - +public class MessageFactory { + protected static final byte MIN_TYPE=32; + protected static final Supplier[] creators=new Supplier[MIN_TYPE]; + protected static Map> map=new HashMap<>(); + static { + creators[Message.BYTES_MSG]=BytesMessage::new; + creators[Message.NIO_MSG]=NioMessage::new; + creators[Message.EMPTY_MSG]=EmptyMessage::new; + creators[Message.OBJ_MSG]=ObjectMessage::new; + creators[Message.LONG_MSG]=LongMessage::new; + creators[Message.COMPOSITE_MSG]=CompositeMessage::new; + creators[Message.FRAG_MSG]=FragmentedMessage::new; + creators[Message.EARLYBATCH_MSG]=BatchMessage::new; + } + /** * Creates a message based on the given ID * @param id The ID * @param The type of the message * @return A message */ - T create(short id); + public static T create(short type) { + Supplier creator=type < MIN_TYPE? creators[type] : map.get(type); + if(creator == null) + throw new IllegalArgumentException("no creator found for type " + type); + return (T)creator.get(); + } /** * Registers a new creator of messages @@ -23,5 +45,12 @@ public interface MessageFactory { * needs to be available (ie., not taken by JGroups or other applications). * @param generator The creator of the payload associated with the given type */ - M register(short type, Supplier generator); + public static void register(short type, Supplier generator) { + Objects.requireNonNull(generator, "the creator must be non-null"); + if(type < MIN_TYPE) + throw new IllegalArgumentException(String.format("type (%d) must be >= 32", type)); + if(map.containsKey(type)) + throw new IllegalArgumentException(String.format("type %d is already taken", type)); + map.put(type, generator); + } } diff --git a/src/org/jgroups/protocols/COMPRESS.java b/src/org/jgroups/protocols/COMPRESS.java index b438bd78ca..26a089d32a 100644 --- a/src/org/jgroups/protocols/COMPRESS.java +++ b/src/org/jgroups/protocols/COMPRESS.java @@ -47,7 +47,6 @@ public class COMPRESS extends Protocol { protected BlockingQueue deflater_pool; protected BlockingQueue inflater_pool; - protected MessageFactory msg_factory; protected final LongAdder num_compressions=new LongAdder(), num_decompressions=new LongAdder(); @@ -77,7 +76,6 @@ public void init() throws Exception { inflater_pool=new ArrayBlockingQueue<>(pool_size); for(int i=0; i < pool_size; i++) inflater_pool.add(new Inflater()); - msg_factory=getTransport().getMessageFactory(); } public void destroy() { @@ -192,7 +190,7 @@ protected Message uncompress(Message msg, int original_size, boolean needs_deser inflater.inflate(uncompressed_payload); // we need to copy: https://issues.redhat.com/browse/JGRP-867 if(needs_deserialization) { - return messageFromByteArray(uncompressed_payload, msg_factory); + return messageFromByteArray(uncompressed_payload); } else return msg.copy(false, true).setArray(uncompressed_payload, 0, uncompressed_payload.length); @@ -221,9 +219,9 @@ protected static ByteArray messageToByteArray(Message msg) { } } - protected static Message messageFromByteArray(byte[] uncompressed_payload, MessageFactory msg_factory) { + protected static Message messageFromByteArray(byte[] uncompressed_payload) { try { - return Util.messageFromBuffer(uncompressed_payload, 0, uncompressed_payload.length, msg_factory); + return Util.messageFromBuffer(uncompressed_payload, 0, uncompressed_payload.length); } catch(Exception ex) { throw new RuntimeException("failed unmarshalling message", ex); diff --git a/src/org/jgroups/protocols/Encrypt.java b/src/org/jgroups/protocols/Encrypt.java index 75ba910940..2d5d47c9bf 100644 --- a/src/org/jgroups/protocols/Encrypt.java +++ b/src/org/jgroups/protocols/Encrypt.java @@ -70,8 +70,6 @@ public abstract class Encrypt extends Protocol { // SecureRandom instance for generating IV's protected SecureRandom secure_random = new SecureRandom(); - protected MessageFactory msg_factory; - /** * Sets the key store entry used to configure this protocol. @@ -96,7 +94,6 @@ public abstract class Encrypt extends Protocol { public SecureRandom secureRandom() {return this.secure_random;} /** Allows callers to replace secure_random with impl of their choice, e.g. for performance reasons. */ public > T secureRandom(SecureRandom sr) {this.secure_random = sr; return (T)this;} - public > T msgFactory(MessageFactory f) {this.msg_factory=f; return (T)this;} @ManagedAttribute public String version() {return Util.byteArrayToHexString(sym_version);} @@ -116,8 +113,6 @@ public void init() throws Exception { key_map=new BoundedHashMap<>(key_map_max_size); initSymCiphers(sym_algorithm, secret_key); TP transport=getTransport(); - if(transport != null) - msg_factory=transport.getMessageFactory(); } @@ -322,7 +317,7 @@ protected Message _decrypt(final Cipher cipher, Key key, Message msg, EncryptHea decrypted_msg=cipher.doFinal(msg.getArray(), msg.getOffset(), msg.getLength()); } if(hdr.needsDeserialization()) - return Util.messageFromBuffer(decrypted_msg, 0, decrypted_msg.length, msg_factory); + return Util.messageFromBuffer(decrypted_msg, 0, decrypted_msg.length); else return msg.setArray(decrypted_msg, 0, decrypted_msg.length); } diff --git a/src/org/jgroups/protocols/FRAG.java b/src/org/jgroups/protocols/FRAG.java index 8b8299434f..d466700ab5 100644 --- a/src/org/jgroups/protocols/FRAG.java +++ b/src/org/jgroups/protocols/FRAG.java @@ -42,7 +42,6 @@ public class FRAG extends Fragmentation { protected final FragmentationList fragment_list=new FragmentationList(); protected final AtomicInteger curr_id=new AtomicInteger(1); protected final List
members=new ArrayList<>(11); - protected MessageFactory msg_factory; protected final Predicate HAS_FRAG_HEADER=msg -> msg.getHeader(id) != null; @@ -58,7 +57,6 @@ public class FRAG extends Fragmentation { public void init() throws Exception { super.init(); - msg_factory=getTransport().getMessageFactory(); Map info=new HashMap<>(1); info.put("frag_size", frag_size); down_prot.down(new Event(Event.CONFIG, info)); @@ -229,7 +227,7 @@ private Message unfragment(Message msg, FragHeader hdr) { return null; try { - Message assembled_msg=Util.messageFromBuffer(buf, 0, buf.length, msg_factory); + Message assembled_msg=Util.messageFromBuffer(buf, 0, buf.length); assembled_msg.setSrc(sender); // needed ? YES, because fragments have a null src !! if(log.isTraceEnabled()) log.trace("assembled_msg is " + assembled_msg); num_received_msgs++; diff --git a/src/org/jgroups/protocols/FRAG2.java b/src/org/jgroups/protocols/FRAG2.java index c6d10532c6..fce77931cf 100644 --- a/src/org/jgroups/protocols/FRAG2.java +++ b/src/org/jgroups/protocols/FRAG2.java @@ -46,7 +46,6 @@ public class FRAG2 extends Fragmentation { protected final AtomicLong curr_id=new AtomicLong(1); protected final List
members=new ArrayList<>(11); - protected MessageFactory msg_factory; protected final AverageMinMax avg_size_down=new AverageMinMax(); protected final AverageMinMax avg_size_up=new AverageMinMax(); @@ -75,7 +74,6 @@ public void init() throws Exception { throw new IllegalArgumentException("frag_size (" + frag_size + ") has to be < TP.max_bundle_size (" + max_bundle_size + ")"); } - msg_factory=transport.getMessageFactory(); Map info=new HashMap<>(1); info.put("frag_size", frag_size); down_prot.down(new Event(Event.CONFIG, info)); @@ -261,7 +259,7 @@ protected Message unfragment(Message msg, FragHeader hdr) { FragEntry entry=frag_table.get(hdr.id); if(entry == null) { - entry=new FragEntry(hdr.num_frags, hdr.needs_deserialization, msg_factory); + entry=new FragEntry(hdr.num_frags, hdr.needs_deserialization); FragEntry tmp=frag_table.putIfAbsent(hdr.id, entry); if(tmp != null) entry=tmp; @@ -314,7 +312,7 @@ protected Message assembleMessage(Message[] fragments, boolean needs_deserializa index+=length; } if(needs_deserialization) - retval=Util.messageFromBuffer(combined_buffer, 0, combined_buffer.length, msg_factory); + retval=Util.messageFromBuffer(combined_buffer, 0, combined_buffer.length); else retval.setArray(combined_buffer, 0, combined_buffer.length); return retval; @@ -332,7 +330,6 @@ protected static class FragEntry { protected final Message[] fragments; protected int number_of_frags_recvd; protected final boolean needs_deserialization; - protected final MessageFactory msg_factory; protected final Lock lock=new ReentrantLock(); @@ -340,10 +337,9 @@ protected static class FragEntry { * Creates a new entry * @param tot_frags the number of fragments to expect for this message */ - protected FragEntry(int tot_frags, boolean needs_deserialization, MessageFactory mf) { + protected FragEntry(int tot_frags, boolean needs_deserialization) { fragments=new Message[tot_frags]; this.needs_deserialization=needs_deserialization; - this.msg_factory=mf; } diff --git a/src/org/jgroups/protocols/FRAG3.java b/src/org/jgroups/protocols/FRAG3.java index 6def97ead5..f970dfb544 100644 --- a/src/org/jgroups/protocols/FRAG3.java +++ b/src/org/jgroups/protocols/FRAG3.java @@ -45,8 +45,6 @@ public class FRAG3 extends Fragmentation { protected final List
members=new ArrayList<>(11); - protected MessageFactory msg_factory; - protected final AverageMinMax avg_size_down=new AverageMinMax(); protected final AverageMinMax avg_size_up=new AverageMinMax(); @@ -70,7 +68,6 @@ public void init() throws Exception { if(frag_size >= max_bundle_size) throw new IllegalArgumentException("frag_size (" + frag_size + ") has to be < TP.max_bundle_size (" + max_bundle_size + ")"); - msg_factory=transport.getMessageFactory(); Map info=new HashMap<>(1); info.put("frag_size", frag_size); down_prot.down(new Event(Event.CONFIG, info)); @@ -351,7 +348,7 @@ protected boolean isComplete() { * @return the complete message in one buffer */ protected Message assembleMessage() throws Exception { - return needs_deserialization? Util.messageFromBuffer(buffer, 0, buffer.length, msg_factory) + return needs_deserialization? Util.messageFromBuffer(buffer, 0, buffer.length) : msg.setArray(buffer, 0, buffer.length); } diff --git a/src/org/jgroups/protocols/FRAG4.java b/src/org/jgroups/protocols/FRAG4.java index 41952ebd04..2f13372812 100644 --- a/src/org/jgroups/protocols/FRAG4.java +++ b/src/org/jgroups/protocols/FRAG4.java @@ -4,6 +4,7 @@ import org.jgroups.BytesMessage; import org.jgroups.FragmentedMessage; import org.jgroups.Message; +import org.jgroups.MessageFactory; import org.jgroups.util.ByteArrayDataInputStream; import org.jgroups.util.Range; import org.jgroups.util.Util; @@ -81,7 +82,7 @@ protected Message assembleMessage(Message[] fragments, boolean needs_deserializa m.getOffset(), m.getLength()))); DataInput in=new DataInputStream(seq); - Message retval=msg_factory.create(hdr.getOriginalType()); + Message retval=MessageFactory.create(hdr.getOriginalType()); retval.readFrom(in); return retval; } diff --git a/src/org/jgroups/protocols/SEQUENCER.java b/src/org/jgroups/protocols/SEQUENCER.java index bd796e6a74..ab9bfca113 100644 --- a/src/org/jgroups/protocols/SEQUENCER.java +++ b/src/org/jgroups/protocols/SEQUENCER.java @@ -66,8 +66,6 @@ public class SEQUENCER extends Protocol { /** Used for each resent message to wait until the message has been received */ protected final Promise ack_promise=new Promise<>(); - protected MessageFactory msg_factory; - @Property(description="Size of the set to store received seqnos (for duplicate checking)") @@ -106,7 +104,6 @@ public void resetStats() { public void init() throws Exception { super.init(); - msg_factory=getTransport().getMessageFactory(); } public void start() throws Exception { @@ -457,7 +454,7 @@ protected void broadcast(final Message msg, boolean copy, Address original_sende protected void unwrapAndDeliver(final Message msg, boolean flush_ack) { try { // Message msg_to_deliver=Util.streamableFromBuffer(BytesMessage::new, msg.getArray(), msg.getOffset(), msg.getLength()); - Message msg_to_deliver=Util.messageFromBuffer(msg.getArray(), msg.getOffset(), msg.getLength(), msg_factory); + Message msg_to_deliver=Util.messageFromBuffer(msg.getArray(), msg.getOffset(), msg.getLength()); SequencerHeader hdr=msg_to_deliver.getHeader(this.id); if(flush_ack) hdr.flush_ack=true; diff --git a/src/org/jgroups/protocols/SERIALIZE.java b/src/org/jgroups/protocols/SERIALIZE.java index ba58c4f45d..a65805b5dd 100644 --- a/src/org/jgroups/protocols/SERIALIZE.java +++ b/src/org/jgroups/protocols/SERIALIZE.java @@ -27,12 +27,6 @@ @MBean(description="Serializes entire message into the payload of another message") public class SERIALIZE extends Protocol { protected static final short GMS_ID=ClassConfigurator.getProtocolId(GMS.class); - protected MessageFactory mf; - - public void init() throws Exception { - super.init(); - mf=getTransport().getMessageFactory(); - } public Object down(Message msg) { if(msg.getSrc() == null) @@ -84,7 +78,7 @@ public void up(MessageBatch batch) { protected Message deserialize(Address sender, byte[] buf, int offset, int length) throws Exception { try { - Message msg=Util.messageFromBuffer(buf, offset, length, mf); + Message msg=Util.messageFromBuffer(buf, offset, length); if(msg.getDest() == null) msg.setDest(msg.getDest()); if(msg.getSrc() == null) diff --git a/src/org/jgroups/protocols/TP.java b/src/org/jgroups/protocols/TP.java index 75d3f5556e..8dd53e9be7 100644 --- a/src/org/jgroups/protocols/TP.java +++ b/src/org/jgroups/protocols/TP.java @@ -163,11 +163,6 @@ public abstract class TP extends Protocol implements DiagnosticsHandler.ProbeHan "disables this.",type=AttributeType.TIME) protected long suppress_time_different_cluster_warnings=60000; - @Property(description="The fully qualified name of a MessageFactory implementation",exposeAsManagedAttribute=false) - protected String msg_factory_class; - - protected MessageFactory msg_factory=new DefaultMessageFactory(); - @Property(description="The type of bundler used (\"ring-buffer\", \"transfer-queue\" (default), \"sender-sends\" or " + "\"no-bundler\") or the fully qualified classname of a Bundler implementation") protected String bundler_type="transfer-queue"; @@ -177,9 +172,6 @@ public String getBundlerClass() { return bundler != null? bundler.getClass().getName() : "null"; } - public MessageFactory getMessageFactory() {return msg_factory;} - public T setMessageFactory(MessageFactory m) {msg_factory=m; return (T)this;} - public InetAddress getBindAddr() {return bind_addr;} public T setBindAddr(InetAddress b) {this.bind_addr=b; return (T)this;} @@ -222,20 +214,12 @@ public String getBundlerClass() { public long getSuppressTimeDifferentClusterWarnings() {return suppress_time_different_cluster_warnings;} public T setSuppressTimeDifferentClusterWarnings(long s) {this.suppress_time_different_cluster_warnings=s; return (T)this;} - public String getMsgFactoryClass() {return msg_factory_class;} - public T setMsgFactoryClass(String m) {this.msg_factory_class=m; return (T)this;} - public String getBundlerType() {return bundler_type;} public T setBundlerType(String b) {this.bundler_type=b; return (T)this;} @Property public T useVirtualThreads(boolean f) {use_vthreads=f; return (T)this;} - @ManagedAttribute - public String getMessageFactoryClass() { - return msg_factory != null? msg_factory.getClass().getName() : "n/a"; - } - @ManagedAttribute(description="Is the logical_addr_cache reaper task running") public boolean isLogicalAddressCacheReaperRunning() { return logical_addr_cache_reaper != null && !logical_addr_cache_reaper.isDone(); @@ -782,11 +766,6 @@ public String toString() { else msg_processing_policy.init(this); - if(msg_factory_class != null) { - Class clazz=(Class)Util.loadClass(msg_factory_class, getClass()); - msg_factory=clazz.getDeclaredConstructor().newInstance(); - } - if(bundler == null) { bundler=createBundler(bundler_type, getClass()); bundler.init(this); @@ -1245,7 +1224,7 @@ public void receive(Address sender, byte[] data, int offset, int length) { boolean is_message_list=(flags & LIST) == LIST, multicast=(flags & MULTICAST) == MULTICAST; ByteArrayDataInputStream in=new ByteArrayDataInputStream(data, offset, length); if(is_message_list) // used if message bundling is enabled - handleMessageBatch(in, multicast, msg_factory); + handleMessageBatch(in, multicast); else handleSingleMessage(in, multicast); } @@ -1264,15 +1243,15 @@ public void receive(Address sender, DataInput in, int ignoredLength) throws Exce boolean is_message_list=(flags & LIST) == LIST, multicast=(flags & MULTICAST) == MULTICAST; if(is_message_list) // used if message bundling is enabled - handleMessageBatch(in, multicast, msg_factory); + handleMessageBatch(in, multicast); else handleSingleMessage(in, multicast); } - protected void handleMessageBatch(DataInput in, boolean multicast, MessageFactory factory) { + protected void handleMessageBatch(DataInput in, boolean multicast) { try { - final MessageBatch[] batches=Util.readMessageBatch(in, multicast, factory); + final MessageBatch[] batches=Util.readMessageBatch(in, multicast); final MessageBatch regular=batches[0], oob=batches[1]; // we need to update the stats *before* processing the batches: protocols can remove msgs from the batch @@ -1290,7 +1269,7 @@ protected void handleMessageBatch(DataInput in, boolean multicast, MessageFactor protected void handleSingleMessage(DataInput in, boolean multicast) { try { short type=in.readShort(); - Message msg=msg_factory.create(type); // don't create headers, readFrom() will do this + Message msg=MessageFactory.create(type); // don't create headers, readFrom() will do this msg.readFrom(in); if(!multicast && unicastDestMismatch(msg.getDest())) diff --git a/src/org/jgroups/util/Util.java b/src/org/jgroups/util/Util.java index afe3b09c57..c94e5299e2 100644 --- a/src/org/jgroups/util/Util.java +++ b/src/org/jgroups/util/Util.java @@ -1279,10 +1279,10 @@ public static ByteArray messageToBuffer(Message msg) throws Exception { } - public static Message messageFromBuffer(byte[] buf, int offset, int length, MessageFactory mf) throws Exception { + public static Message messageFromBuffer(byte[] buf, int offset, int length) throws Exception { ByteArrayDataInputStream in=new ByteArrayDataInputStream(buf, offset, length); short type=in.readShort(); - Message msg=mf.create(type); + Message msg=MessageFactory.create(type); msg.readFrom(in); return msg; } @@ -1298,12 +1298,12 @@ public static ByteArray messageToByteBuffer(Message msg) throws Exception { } - public static Message messageFromByteBuffer(byte[] buffer, int offset, int length, MessageFactory mf) throws Exception { + public static Message messageFromByteBuffer(byte[] buffer, int offset, int length) throws Exception { DataInput in=new ByteArrayDataInputStream(buffer,offset,length); if(!in.readBoolean()) return null; short type=in.readShort(); - Message msg=mf.create(type); + Message msg=MessageFactory.create(type); msg.readFrom(in); return msg; } @@ -1435,9 +1435,9 @@ public static void writeMessage(Message msg, DataOutput dos, boolean multicast) msg.writeTo(dos); } - public static Message readMessage(DataInput in, MessageFactory mf) throws IOException, ClassNotFoundException { + public static Message readMessage(DataInput in) throws IOException, ClassNotFoundException { short type=in.readShort(); - Message msg=mf.create(type); + Message msg=MessageFactory.create(type); msg.readFrom(in); return msg; } @@ -1512,7 +1512,7 @@ public static void writeMessageListHeader(Address dest, Address src, byte[] clus } - public static List readMessageList(DataInput in, short transport_id, MessageFactory mf) + public static List readMessageList(DataInput in, short transport_id) throws IOException, ClassNotFoundException { List list=new LinkedList<>(); Address dest=Util.readAddress(in); @@ -1527,7 +1527,7 @@ public static List readMessageList(DataInput in, short transport_id, Me for(int i=0; i < len; i++) { short type=in.readShort(); // skip the - Message msg=mf.create(type); + Message msg=MessageFactory.create(type); msg.readFrom(in); msg.setDest(dest); if(msg.getSrc() == null) @@ -1549,7 +1549,7 @@ public static List readMessageList(DataInput in, short transport_id, Me * * @return an array of 2 MessageBatches in the order above, the first batch is at index 0 */ - public static MessageBatch[] readMessageBatch(DataInput in, boolean multicast, MessageFactory factory) + public static MessageBatch[] readMessageBatch(DataInput in, boolean multicast) throws IOException, ClassNotFoundException { MessageBatch[] batches=new MessageBatch[2]; // [0]: reg, [1]: OOB Address dest=Util.readAddress(in); @@ -1562,7 +1562,7 @@ public static MessageBatch[] readMessageBatch(DataInput in, boolean multicast, M int len=in.readInt(); for(int i=0; i < len; i++) { short type=in.readShort(); - Message msg=factory.create(type).setDest(dest).setSrc(src); + Message msg=MessageFactory.create(type).setDest(dest).setSrc(src); msg.readFrom(in); boolean oob=msg.isFlagSet(Message.Flag.OOB); int index=0; @@ -1590,7 +1590,6 @@ public static void parse(InputStream input, BiConsumer msg_consum if(msg_consumer == null && batch_consumer == null) return; byte[] tmp=new byte[Global.INT_SIZE]; - MessageFactory mf=new DefaultMessageFactory(); try(DataInputStream dis=new DataInputStream(input)) { for(;;) { // for TCP, we send the length first; this needs to be skipped as it is not part of the JGroups payload @@ -1623,7 +1622,7 @@ public static void parse(InputStream input, BiConsumer msg_consum boolean is_message_list=(flags & LIST) == LIST; boolean multicast=(flags & MULTICAST) == MULTICAST; if(is_message_list) { // used if message bundling is enabled - final MessageBatch[] batches=Util.readMessageBatch(dis,multicast, mf); + final MessageBatch[] batches=Util.readMessageBatch(dis,multicast); for(MessageBatch batch: batches) { if(batch == null) continue; @@ -1636,7 +1635,7 @@ public static void parse(InputStream input, BiConsumer msg_consum } } else { - Message msg=Util.readMessage(dis, mf); + Message msg=Util.readMessage(dis); if(msg_consumer != null) msg_consumer.accept(version, msg); } diff --git a/tests/junit-functional/org/jgroups/protocols/ENCRYPTKeystoreTest.java b/tests/junit-functional/org/jgroups/protocols/ENCRYPTKeystoreTest.java index 51be011dd7..67391dc660 100644 --- a/tests/junit-functional/org/jgroups/protocols/ENCRYPTKeystoreTest.java +++ b/tests/junit-functional/org/jgroups/protocols/ENCRYPTKeystoreTest.java @@ -5,7 +5,6 @@ import org.jgroups.BytesMessage; -import org.jgroups.DefaultMessageFactory; import org.jgroups.Global; import org.jgroups.Message; import org.jgroups.conf.ClassConfigurator; @@ -33,7 +32,6 @@ public void testInitWrongKeystoreProperties() { SYM_ENCRYPT encrypt=new SYM_ENCRYPT().keystoreName("unkownKeystore.keystore"); try { encrypt.init(); - encrypt.msgFactory(new DefaultMessageFactory()); } catch(Exception e) { System.out.println("didn't find incorrect keystore (as expected): " + e.getMessage()); @@ -43,7 +41,6 @@ public void testInitWrongKeystoreProperties() { public void testInitKeystoreProperties() throws Exception { SYM_ENCRYPT encrypt=new SYM_ENCRYPT().keystoreName("defaultStore.keystore"); encrypt.init(); - encrypt.msgFactory(new DefaultMessageFactory()); } public void testMessageDownEncode() throws Exception { @@ -157,7 +154,6 @@ public void testEncryptEntireMessage() throws Exception { protected SYM_ENCRYPT create(String keystore) throws Exception { SYM_ENCRYPT encrypt=new SYM_ENCRYPT().keystoreName(keystore).symAlgorithm(symAlgorithm()).symIvLength(symIvLength()); encrypt.init(); - encrypt.msgFactory(new DefaultMessageFactory()); return encrypt; } diff --git a/tests/junit-functional/org/jgroups/protocols/EncryptTest.java b/tests/junit-functional/org/jgroups/protocols/EncryptTest.java index 6cf7a3edac..275bcf1959 100644 --- a/tests/junit-functional/org/jgroups/protocols/EncryptTest.java +++ b/tests/junit-functional/org/jgroups/protocols/EncryptTest.java @@ -129,7 +129,6 @@ public void testMessageSendingByRogueUsingEncryption() throws Exception { secretKey.setAccessible(true); Util.setField(secretKey, encrypt, secret_key); encrypt.init(); - encrypt.msgFactory(new DefaultMessageFactory()); short encrypt_id=ClassConfigurator.getProtocolId(SYM_ENCRYPT.class); byte[] iv = encrypt.makeIv(); diff --git a/tests/junit-functional/org/jgroups/tests/BatchMessageTest.java b/tests/junit-functional/org/jgroups/tests/BatchMessageTest.java index 49bc69649f..b2d8cfd869 100644 --- a/tests/junit-functional/org/jgroups/tests/BatchMessageTest.java +++ b/tests/junit-functional/org/jgroups/tests/BatchMessageTest.java @@ -22,8 +22,6 @@ public class BatchMessageTest extends MessageTestBase { protected static final Message M2=create(DEST, 1000, true, true); protected static final Message M3=new EmptyMessage(DEST); - protected static final MessageFactory MF=new DefaultMessageFactory(); - public void testCreation() { BatchMessage msg=new BatchMessage(DEST, SRC, new Message[]{M1,M2,M3}, 3); assert msg.getNumberOfMessages() == 3; diff --git a/tests/junit-functional/org/jgroups/tests/CompositeMessageTest.java b/tests/junit-functional/org/jgroups/tests/CompositeMessageTest.java index aba3d106ff..3b034be00c 100644 --- a/tests/junit-functional/org/jgroups/tests/CompositeMessageTest.java +++ b/tests/junit-functional/org/jgroups/tests/CompositeMessageTest.java @@ -21,8 +21,6 @@ public class CompositeMessageTest extends MessageTestBase { protected static final Message M2=create(DEST, 1000, true, true); protected static final Message M3=new EmptyMessage(DEST); - protected static final MessageFactory MF=new DefaultMessageFactory(); - public void testCreation() { CompositeMessage msg=new CompositeMessage(DEST, M1, M2); assert msg.getNumberOfMessages() == 2; @@ -62,7 +60,7 @@ public void testCollapse() throws Exception { CompositeMessage msg=new CompositeMessage(DEST, M1, M2, M3).collapse(true); int length=msg.getLength(); ByteArray buf=Util.messageToBuffer(msg); - Message msg2=Util.messageFromBuffer(buf.getArray(), buf.getOffset(), buf.getLength(), MF); + Message msg2=Util.messageFromBuffer(buf.getArray(), buf.getOffset(), buf.getLength()); assert msg2 instanceof BytesMessage; assert msg2.getLength() == length; } @@ -75,7 +73,7 @@ public void testCollapse2() throws Exception { .collapse(true); int length=msg.getLength(); ByteArray buf=Util.messageToBuffer(msg); - Message msg2=Util.messageFromBuffer(buf.getArray(), buf.getOffset(), buf.getLength(), MF); + Message msg2=Util.messageFromBuffer(buf.getArray(), buf.getOffset(), buf.getLength()); assert msg2 instanceof BytesMessage; assert msg2.getLength() == length; diff --git a/tests/junit-functional/org/jgroups/tests/FragmentedMessageTest.java b/tests/junit-functional/org/jgroups/tests/FragmentedMessageTest.java index 9df6db3bfe..dfde70a987 100644 --- a/tests/junit-functional/org/jgroups/tests/FragmentedMessageTest.java +++ b/tests/junit-functional/org/jgroups/tests/FragmentedMessageTest.java @@ -17,7 +17,6 @@ @Test(groups=Global.FUNCTIONAL) public class FragmentedMessageTest { protected static final int FRAG_SIZE=500; - protected final MessageFactory msg_factory=new DefaultMessageFactory(); protected final byte[] array=Util.generateArray(1200); protected final Address src=Util.createRandomAddress("X"), dest=Util.createRandomAddress("D"); @@ -109,7 +108,7 @@ protected void _testFragmentation(Message original_msg, Consumer verifi new SequenceInputStream(Util.enumerate(msgs, 0, msgs.length, m -> new ByteArrayDataInputStream(m.getArray(),m.getOffset(),m.getLength()))); DataInput input=new DataInputStream(seq); - Message new_msg=msg_factory.create(original_msg.getType()); + Message new_msg=MessageFactory.create(original_msg.getType()); new_msg.readFrom(input); assert original_msg.getLength() == new_msg.getLength(); verifier.accept(new_msg); diff --git a/tests/junit-functional/org/jgroups/tests/MessageBatchTest.java b/tests/junit-functional/org/jgroups/tests/MessageBatchTest.java index e8b538b149..4e990eae3a 100644 --- a/tests/junit-functional/org/jgroups/tests/MessageBatchTest.java +++ b/tests/junit-functional/org/jgroups/tests/MessageBatchTest.java @@ -378,7 +378,6 @@ public void testTotalSize() { public void testSize() throws Exception { - MessageFactory mf=new DefaultMessageFactory(); List msgs=createMessages(); ByteArrayOutputStream output=new ByteArrayOutputStream(); DataOutputStream out=new DataOutputStream(output); @@ -391,7 +390,7 @@ public void testSize() throws Exception { DataInputStream in=new DataInputStream(new ByteArrayInputStream(buf)); in.readShort(); // version in.readByte(); // flags - List list=Util.readMessageList(in, UDP_ID, mf); + List list=Util.readMessageList(in, UDP_ID); assert msgs.size() == list.size(); } diff --git a/tests/junit-functional/org/jgroups/tests/MessageFactoryTest.java b/tests/junit-functional/org/jgroups/tests/MessageFactoryTest.java index 4877898959..2154d9b532 100644 --- a/tests/junit-functional/org/jgroups/tests/MessageFactoryTest.java +++ b/tests/junit-functional/org/jgroups/tests/MessageFactoryTest.java @@ -15,21 +15,20 @@ */ @Test(groups=Global.FUNCTIONAL) public class MessageFactoryTest { - protected final MessageFactory mf=new DefaultMessageFactory(); public void testRegistration() { for(int i=0; i < 32; i++) { try { - mf.register((short)i, MyMessageFactory::new); + MessageFactory.register((short)i, MyMessageFactory::new); } catch(IllegalArgumentException ex) { System.out.printf("received exception (as expected): %s\n", ex); } } - mf.register((short)32, MyMessageFactory::new); + MessageFactory.register((short)32, MyMessageFactory::new); try { - mf.register((short)32, MyMessageFactory::new); + MessageFactory.register((short)32, MyMessageFactory::new); } catch(IllegalArgumentException ex) { System.out.printf("received exception (as expected): %s\n", ex); diff --git a/tests/junit-functional/org/jgroups/tests/NioMessageTest.java b/tests/junit-functional/org/jgroups/tests/NioMessageTest.java index b790c786c8..bd478e1bd1 100644 --- a/tests/junit-functional/org/jgroups/tests/NioMessageTest.java +++ b/tests/junit-functional/org/jgroups/tests/NioMessageTest.java @@ -1,6 +1,5 @@ package org.jgroups.tests; -import org.jgroups.DefaultMessageFactory; import org.jgroups.Global; import org.jgroups.Message; import org.jgroups.NioMessage; @@ -236,7 +235,7 @@ public void testReadonly() throws Exception { ByteBuffer payload=ByteBuffer.allocate(4).putInt(322649).flip().asReadOnlyBuffer(); Message msg=new NioMessage(null, payload); ByteArray buf=Util.messageToBuffer(msg); - NioMessage msg2=(NioMessage)Util.messageFromBuffer(buf.getArray(), buf.getOffset(), buf.getLength(), new DefaultMessageFactory()); + NioMessage msg2=(NioMessage)Util.messageFromBuffer(buf.getArray(), buf.getOffset(), buf.getLength()); ByteBuffer buf2=msg2.getBuf(); assert payload.equals(buf2); diff --git a/tests/junit-functional/org/jgroups/tests/UtilTest.java b/tests/junit-functional/org/jgroups/tests/UtilTest.java index a2091b4366..21061eb2e9 100644 --- a/tests/junit-functional/org/jgroups/tests/UtilTest.java +++ b/tests/junit-functional/org/jgroups/tests/UtilTest.java @@ -505,17 +505,16 @@ public void testWriteAndReadString() throws IOException { } public void testMessageToByteBuffer() throws Exception { - MessageFactory mf=new DefaultMessageFactory(); - _testMessage(new EmptyMessage(), mf); - _testMessage(new BytesMessage(null, "hello world"), mf); - _testMessage(new EmptyMessage(null).setSrc(Util.createRandomAddress()), mf); - _testMessage(new EmptyMessage(null).setSrc(Util.createRandomAddress()), mf); - _testMessage(new BytesMessage(null, "bela").setSrc(Util.createRandomAddress()), mf); + _testMessage(new EmptyMessage()); + _testMessage(new BytesMessage(null, "hello world")); + _testMessage(new EmptyMessage(null).setSrc(Util.createRandomAddress())); + _testMessage(new EmptyMessage(null).setSrc(Util.createRandomAddress())); + _testMessage(new BytesMessage(null, "bela").setSrc(Util.createRandomAddress())); } - private static void _testMessage(Message msg, final MessageFactory mf) throws Exception { + private static void _testMessage(Message msg) throws Exception { ByteArray buf=Util.messageToByteBuffer(msg); - Message msg2=Util.messageFromByteBuffer(buf.getArray(), buf.getOffset(), buf.getLength(), mf); + Message msg2=Util.messageFromByteBuffer(buf.getArray(), buf.getOffset(), buf.getLength()); Assert.assertEquals(msg.getSrc(), msg2.getSrc()); Assert.assertEquals(msg.getDest(), msg2.getDest()); Assert.assertEquals(msg.getLength(), msg2.getLength());