Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JGRP-2848 Composite message additions #865

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions src/org/jgroups/BaseMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,19 @@ public int size() {

retval+=Global.SHORT_SIZE; // number of headers
retval+=Headers.marshalledSize(this.headers);
retval+=payloadSize();
return retval;
}

public int sizeNoAddrs(Address src) {
int retval=Global.BYTE_SIZE // leading byte
+ Global.SHORT_SIZE; // flags
if(sender != null && !sender.equals(src))
retval+=Util.size(sender);

retval+=Global.SHORT_SIZE; // number of headers
retval+=Headers.marshalledSize(this.headers);
retval+=payloadSize();
return retval;
}

Expand Down Expand Up @@ -274,7 +287,7 @@ public void writeTo(DataOutput out) throws IOException {
public void writeToNoAddrs(Address src, DataOutput out) throws IOException {
byte leading=0;

boolean write_src_addr=src == null || sender != null && !sender.equals(src);
boolean write_src_addr=sender != null && !sender.equals(src);

if(write_src_addr)
leading=Util.setFlag(leading, SRC_SET);
Expand Down Expand Up @@ -341,5 +354,5 @@ protected static Header[] createHeaders(int size) {
return size > 0? new Header[size] : new Header[Util.DEFAULT_HEADERS];
}


protected abstract int payloadSize();
}
8 changes: 4 additions & 4 deletions src/org/jgroups/BatchMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,12 @@ public String toString() {
return String.format("%s, %d message(s)", super.toString(), getNumberOfMessages());
}

public int size() {
int retval=super.size() + Global.INT_SIZE;
@Override protected int payloadSize() {
int retval=Global.INT_SIZE; // count
retval+=Util.size(orig_src);
if(msgs != null) {
for(int i=0; i < index; i++)
retval+=msgs[i].size() + Global.SHORT_SIZE; // type
retval+=msgs[i].sizeNoAddrs(getSrc()) + Global.SHORT_SIZE; // type
}
return retval;
}
Expand All @@ -140,7 +140,7 @@ public void writePayload(DataOutput out) throws IOException {
for(int i=0; i < index; i++) {
Message msg=msgs[i];
out.writeShort(msg.getType());
msg.writeToNoAddrs(this.src(), out);
msg.writeToNoAddrs(getSrc(), out);
}
}
}
Expand Down
7 changes: 1 addition & 6 deletions src/org/jgroups/BytesMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -227,11 +227,6 @@ public <T extends Object> T getObject(ClassLoader loader) {
}


public int size() {
return super.size() +sizeOfPayload();
}


/**
* Copies the byte array. If offset and length are used (to refer to another array), the copy will contain only
* the subset that offset and length point to, copying the subset into the new copy.<p/>
Expand All @@ -245,7 +240,7 @@ public int size() {
return copy;
}

protected int sizeOfPayload() {
@Override protected int payloadSize() {
int retval=Global.INT_SIZE; // length
if(array != null)
retval+=length; // number of bytes in the array
Expand Down
12 changes: 5 additions & 7 deletions src/org/jgroups/CompositeMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@


import org.jgroups.util.ByteArray;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
Expand Down Expand Up @@ -118,11 +117,11 @@ public String toString() {
return String.format("%s, %d message(s)", super.toString(), getNumberOfMessages());
}

public int size() {
int retval=super.size() + Global.INT_SIZE; // length
@Override protected int payloadSize() {
int retval=Global.INT_SIZE; // count
if(msgs != null) {
for(int i=0; i < index; i++)
retval+=msgs[i].size() + Global.SHORT_SIZE; // type
retval+=msgs[i].sizeNoAddrs(getSrc()) + Global.SHORT_SIZE; // type
}
return retval;
}
Expand All @@ -142,8 +141,7 @@ public void writePayload(DataOutput out) throws IOException {
for(int i=0; i < index; i++) {
Message msg=msgs[i];
out.writeShort(msg.getType());
// msg.writeToNoAddrs(src(), out);
msg.writeTo(out);
msg.writeToNoAddrs(getSrc(), out);
}
}
}
Expand All @@ -156,7 +154,7 @@ public void readPayload(DataInput in) throws IOException, ClassNotFoundException
short type=in.readShort();
Message msg=MessageFactory.create(type).setDest(getDest());
if(msg.getSrc() == null)
msg.setSrc(src());
msg.setSrc(getSrc());
msg.readFrom(in);
msgs[i]=msg;
}
Expand Down
2 changes: 2 additions & 0 deletions src/org/jgroups/EmptyMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,6 @@ public void writePayload(DataOutput out) throws IOException {
public void readPayload(DataInput in) throws IOException, ClassNotFoundException {
// no payload to read
}

protected int payloadSize() { return 0; }
}
2 changes: 1 addition & 1 deletion src/org/jgroups/FragmentedMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public FragmentedMessage(Message original_msg, int off, int len) {
public boolean hasArray() {return false;}
public boolean hasPayload() {return true;}
public Supplier<Message> create() {return FragmentedMessage::new;}
protected int sizeOfPayload() {return Global.INT_SIZE + length;}
protected int payloadSize() {return Global.INT_SIZE + length;}

@Override
protected Message copyPayload(Message copy) {
Expand Down
4 changes: 2 additions & 2 deletions src/org/jgroups/LongMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ public void readPayload(DataInput in) throws IOException, ClassNotFoundException
value=Bits.readLongCompressed(in);
}

public int size() {
return super.size() + Bits.size(value);
@Override protected int payloadSize() {
return Bits.size(value);
}

public String toString() {
Expand Down
3 changes: 3 additions & 0 deletions src/org/jgroups/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,9 @@ default Message setFlag(short flag, boolean transient_flags) {
*/
int size();

/** Returns the exact size of the marshalled message without destination (and possibly source) address */
int sizeNoAddrs(Address src);

/** Writes the message to an output stream excluding the destination (and possibly source) address */
void writeToNoAddrs(Address src, DataOutput out) throws IOException;

Expand Down
4 changes: 1 addition & 3 deletions src/org/jgroups/NioMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,6 @@ public NioMessage copy(boolean copy_payload, boolean copy_headers) {

/* ----------------------------------- Interface Streamable ------------------------------- */

public int size() {return super.size() +sizeOfPayload();}

public String toString() {
return String.format("%s %s", super.toString(), use_direct_memory_for_allocations? "(direct)" : "");
}
Expand All @@ -191,7 +189,7 @@ public String toString() {
return copy;
}

protected int sizeOfPayload() {
@Override protected int payloadSize() {
return Global.INT_SIZE + getLength() + Global.BYTE_SIZE; // for use_direct_memory_for_allocations
}

Expand Down
8 changes: 2 additions & 6 deletions src/org/jgroups/ObjectMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public ObjectMessage(Address dest, SizeStreamable obj) {
public boolean hasPayload() {return obj != null;}
public boolean hasArray() {return false;}
public int getOffset() {return 0;}
public int getLength() {return obj != null? objSize() : 0;}
public int getLength() {return obj != null? payloadSize() : 0;}
public byte[] getArray() {throw new UnsupportedOperationException();}
public ObjectMessage setArray(byte[] b, int off, int len) {throw new UnsupportedOperationException();}
public ObjectMessage setArray(ByteArray buf) {throw new UnsupportedOperationException();}
Expand Down Expand Up @@ -132,10 +132,6 @@ public <T extends Object> T getObject() {
return isWrapped() || obj instanceof ObjectWrapperPrimitive? ((ObjectWrapperPrimitive)obj).getObject() : (T)obj;
}

public int size() {
return super.size() + objSize();
}

public void writePayload(DataOutput out) throws IOException {
Util.writeGenericStreamable(obj, out);
}
Expand All @@ -157,7 +153,7 @@ public String toString() {
return super.toString() + String.format(", obj: %s", obj);
}

protected int objSize() {
@Override protected int payloadSize() {
return Util.size(obj);
}
}
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/RingBufferBundler.java
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ protected int marshalMessagesToSameDestination(Address dest, Message[] buf, int
if(msg != null && Objects.equals(dest, msg.getDest())) {
if(list != null)
list.add(msg);
int size=msg.size() + Global.SHORT_SIZE;
int size=msg.sizeNoAddrs(msg.getSrc()) + Global.SHORT_SIZE;
if(bytes + size > max_bundle_size)
break;
bytes+=size;
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/RingBufferBundlerLockless.java
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ protected int marshalMessagesToSameDestination(Address dest, Message[] buf,
while(available_msgs > 0) {
Message msg=buf[start_index];
if(msg != null && Objects.equals(dest, msg.getDest())) {
int msg_size=msg.size() + Global.SHORT_SIZE;;
int msg_size=msg.sizeNoAddrs(msg.getSrc()) + Global.SHORT_SIZE;;
if(bytes + msg_size > max_bundle_size)
break;
bytes+=msg_size;
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/RingBufferBundlerLockless2.java
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ protected int marshalMessagesToSameDestination(Address dest, Message[] buf, fina
for(int i=start_index; i != end_index; i=increment(i)) {
Message msg=buf[i];
if(msg != null && msg != NULL_MSG && Objects.equals(dest, msg.getDest())) {
int msg_size=msg.size() + Global.SHORT_SIZE;
int msg_size=msg.sizeNoAddrs(msg.getSrc()) + Global.SHORT_SIZE;
if(bytes + msg_size > max_bundle_size)
break;
bytes+=msg_size;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,5 +87,9 @@ public void writePayload(DataOutput out) throws IOException {

public void readPayload(DataInput in) throws IOException, ClassNotFoundException {
}

protected int payloadSize() {
return 0;
}
}
}