Skip to content

Commit

Permalink
More programmatic setters in TP
Browse files Browse the repository at this point in the history
  • Loading branch information
cfredri4 committed Oct 17, 2024
1 parent 2898376 commit 75edfaa
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 7 deletions.
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/BaseBundler.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public String printBuffers() {

public void init(TP transport) {
this.transport=transport;
msg_processing_policy=transport.msgProcessingPolicy();
msg_processing_policy=transport.getMsgProcessingPolicy();
msg_stats=transport.getMessageStats();
log=transport.getLog();
output=new ByteArrayDataOutputStream(max_size + MSG_OVERHEAD);
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/DAISYCHAIN.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public Object down(Message msg) {
msg.setSrc(local_addr);
if(log.isTraceEnabled())
log.trace("%s: looping back message %s", local_addr, msg);
transport.msgProcessingPolicy().loopback(msg, msg.isFlagSet(Message.Flag.OOB));
transport.getMsgProcessingPolicy().loopback(msg, msg.isFlagSet(Message.Flag.OOB));
}

// we need to copy the message, as we cannot do a msg.setSrc(next): the next retransmission
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/PerDestinationBundler.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public double avgBatchSize() {

public void init(TP transport) {
this.transport=Objects.requireNonNull(transport);
msg_processing_policy=transport.msgProcessingPolicy();
msg_processing_policy=transport.getMsgProcessingPolicy();
msg_stats=transport.getMessageStats();
this.log=transport.getLog();
}
Expand Down
14 changes: 10 additions & 4 deletions src/org/jgroups/protocols/TP.java
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,9 @@ public String getBundlerClass() {
public long getTimeServiceInterval() {return time_service_interval;}
public <T extends TP> T setTimeServiceInterval(long t) {this.time_service_interval=t; return (T)this;}

public boolean isUseVirtualThreads() {return use_vthreads;}
public <T extends TP> T useVirtualThreads(boolean b) {use_vthreads=b; return (T)this;}

public boolean logDiscardMsgs() {return log_discard_msgs;}
public <T extends TP> T logDiscardMsgs(boolean l) {this.log_discard_msgs=l; return (T)this;}

Expand Down Expand Up @@ -249,10 +252,9 @@ public <T extends Protocol> T setLevel(String level) {

@ManagedOperation(description="Changes the message processing policy. The fully qualified name of a class " +
"implementing MessageProcessingPolicy needs to be given")
public void setMessageProcessingPolicy(String policy) {
public <T extends TP> T setMessageProcessingPolicy(String policy) {
if(policy == null)
return;

return (T)this;
if(policy.startsWith("submit"))
msg_processing_policy=new SubmitToThreadPool();
else if(policy.startsWith("max"))
Expand All @@ -272,6 +274,7 @@ else if(policy.startsWith("unbatch"))
catch(Exception e) {
log.error("failed setting message_processing_policy", e);
}
return (T)this;
}

public MessageProcessingPolicy getMessageProcessingPolicy() {return msg_processing_policy;}
Expand Down Expand Up @@ -443,7 +446,10 @@ protected TP() {
}

public MsgStats getMessageStats() {return msg_stats;}
public MessageProcessingPolicy msgProcessingPolicy() {return msg_processing_policy;}

public MessageProcessingPolicy getMsgProcessingPolicy() {return msg_processing_policy;}
public <T extends TP> T msgProcessingPolicy(MessageProcessingPolicy p) {this.msg_processing_policy=p; return (T)this;}

public RTT getRTT() {return rtt;}

@Override
Expand Down

0 comments on commit 75edfaa

Please sign in to comment.