Skip to content

Commit

Permalink
Merge pull request #963 from lrhkobe/trace_improve_3
Browse files Browse the repository at this point in the history
[ISSUE #957] add trace for http protocol in eventmesh-runtime
close #957
  • Loading branch information
xwm1992 authored Jun 25, 2022
2 parents 2cace5f + 2b41160 commit dab1086
Show file tree
Hide file tree
Showing 7 changed files with 456 additions and 302 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Map;

import io.cloudevents.CloudEvent;
import io.cloudevents.SpecVersion;
Expand Down Expand Up @@ -64,60 +65,70 @@ public static CloudEvent buildEvent(Header header, Body body) throws ProtocolHan
cloudEventBuilder = CloudEventBuilder.v1();

cloudEventBuilder = cloudEventBuilder.withId(sendMessageRequestBody.getBizSeqNo())
.withSubject(sendMessageRequestBody.getTopic())
.withType("eventmeshmessage")
.withSource(URI.create("/"))
.withData(content.getBytes(StandardCharsets.UTF_8))
.withExtension(ProtocolKey.REQUEST_CODE, code)
.withExtension(ProtocolKey.ClientInstanceKey.ENV, env)
.withExtension(ProtocolKey.ClientInstanceKey.IDC, idc)
.withExtension(ProtocolKey.ClientInstanceKey.IP, ip)
.withExtension(ProtocolKey.ClientInstanceKey.PID, pid)
.withExtension(ProtocolKey.ClientInstanceKey.SYS, sys)
.withExtension(ProtocolKey.ClientInstanceKey.USERNAME, username)
.withExtension(ProtocolKey.ClientInstanceKey.PASSWD, passwd)
.withExtension(ProtocolKey.VERSION, version.getVersion())
.withExtension(ProtocolKey.LANGUAGE, language)
.withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType)
.withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc)
.withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion)
.withExtension(SendMessageRequestBody.BIZSEQNO, sendMessageRequestBody.getBizSeqNo())
.withExtension(SendMessageRequestBody.UNIQUEID, sendMessageRequestBody.getUniqueId())
.withExtension(SendMessageRequestBody.PRODUCERGROUP,
sendMessageRequestBody.getProducerGroup())
.withExtension(SendMessageRequestBody.TTL, sendMessageRequestBody.getTtl());
.withSubject(sendMessageRequestBody.getTopic())
.withType("eventmeshmessage")
.withSource(URI.create("/"))
.withData(content.getBytes(StandardCharsets.UTF_8))
.withExtension(ProtocolKey.REQUEST_CODE, code)
.withExtension(ProtocolKey.ClientInstanceKey.ENV, env)
.withExtension(ProtocolKey.ClientInstanceKey.IDC, idc)
.withExtension(ProtocolKey.ClientInstanceKey.IP, ip)
.withExtension(ProtocolKey.ClientInstanceKey.PID, pid)
.withExtension(ProtocolKey.ClientInstanceKey.SYS, sys)
.withExtension(ProtocolKey.ClientInstanceKey.USERNAME, username)
.withExtension(ProtocolKey.ClientInstanceKey.PASSWD, passwd)
.withExtension(ProtocolKey.VERSION, version.getVersion())
.withExtension(ProtocolKey.LANGUAGE, language)
.withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType)
.withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc)
.withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion)
.withExtension(SendMessageRequestBody.BIZSEQNO, sendMessageRequestBody.getBizSeqNo())
.withExtension(SendMessageRequestBody.UNIQUEID, sendMessageRequestBody.getUniqueId())
.withExtension(SendMessageRequestBody.PRODUCERGROUP,
sendMessageRequestBody.getProducerGroup())
.withExtension(SendMessageRequestBody.TTL, sendMessageRequestBody.getTtl());
if (StringUtils.isNotEmpty(sendMessageRequestBody.getTag())) {
cloudEventBuilder = cloudEventBuilder.withExtension(SendMessageRequestBody.TAG, sendMessageRequestBody.getTag());
}
if (sendMessageRequestBody.getExtFields() != null && sendMessageRequestBody.getExtFields().size() > 0) {
for (Map.Entry<String, String> entry : sendMessageRequestBody.getExtFields().entrySet()) {
cloudEventBuilder = cloudEventBuilder.withExtension(entry.getKey(), entry.getValue());
}
}
event = cloudEventBuilder.build();
} else if (StringUtils.equals(SpecVersion.V03.toString(), protocolVersion)) {
cloudEventBuilder = CloudEventBuilder.v03();
cloudEventBuilder = cloudEventBuilder.withId(sendMessageRequestBody.getBizSeqNo())
.withSubject(sendMessageRequestBody.getTopic())
.withType("eventmeshmessage")
.withSource(URI.create("/"))
.withData(content.getBytes(StandardCharsets.UTF_8))
.withExtension(ProtocolKey.REQUEST_CODE, code)
.withExtension(ProtocolKey.ClientInstanceKey.ENV, env)
.withExtension(ProtocolKey.ClientInstanceKey.IDC, idc)
.withExtension(ProtocolKey.ClientInstanceKey.IP, ip)
.withExtension(ProtocolKey.ClientInstanceKey.PID, pid)
.withExtension(ProtocolKey.ClientInstanceKey.SYS, sys)
.withExtension(ProtocolKey.ClientInstanceKey.USERNAME, username)
.withExtension(ProtocolKey.ClientInstanceKey.PASSWD, passwd)
.withExtension(ProtocolKey.VERSION, version.getVersion())
.withExtension(ProtocolKey.LANGUAGE, language)
.withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType)
.withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc)
.withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion)
.withExtension(SendMessageRequestBody.BIZSEQNO, sendMessageRequestBody.getBizSeqNo())
.withExtension(SendMessageRequestBody.UNIQUEID, sendMessageRequestBody.getUniqueId())
.withExtension(SendMessageRequestBody.PRODUCERGROUP,
sendMessageRequestBody.getProducerGroup())
.withExtension(SendMessageRequestBody.TTL, sendMessageRequestBody.getTtl());
.withSubject(sendMessageRequestBody.getTopic())
.withType("eventmeshmessage")
.withSource(URI.create("/"))
.withData(content.getBytes(StandardCharsets.UTF_8))
.withExtension(ProtocolKey.REQUEST_CODE, code)
.withExtension(ProtocolKey.ClientInstanceKey.ENV, env)
.withExtension(ProtocolKey.ClientInstanceKey.IDC, idc)
.withExtension(ProtocolKey.ClientInstanceKey.IP, ip)
.withExtension(ProtocolKey.ClientInstanceKey.PID, pid)
.withExtension(ProtocolKey.ClientInstanceKey.SYS, sys)
.withExtension(ProtocolKey.ClientInstanceKey.USERNAME, username)
.withExtension(ProtocolKey.ClientInstanceKey.PASSWD, passwd)
.withExtension(ProtocolKey.VERSION, version.getVersion())
.withExtension(ProtocolKey.LANGUAGE, language)
.withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType)
.withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc)
.withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion)
.withExtension(SendMessageRequestBody.BIZSEQNO, sendMessageRequestBody.getBizSeqNo())
.withExtension(SendMessageRequestBody.UNIQUEID, sendMessageRequestBody.getUniqueId())
.withExtension(SendMessageRequestBody.PRODUCERGROUP,
sendMessageRequestBody.getProducerGroup())
.withExtension(SendMessageRequestBody.TTL, sendMessageRequestBody.getTtl());
if (StringUtils.isNotEmpty(sendMessageRequestBody.getTag())) {
cloudEventBuilder = cloudEventBuilder.withExtension(SendMessageRequestBody.TAG, sendMessageRequestBody.getTag());
}
if (sendMessageRequestBody.getExtFields() != null && sendMessageRequestBody.getExtFields().size() > 0) {
for (Map.Entry<String, String> entry : sendMessageRequestBody.getExtFields().entrySet()) {
cloudEventBuilder = cloudEventBuilder.withExtension(entry.getKey(), entry.getValue());
}
}
event = cloudEventBuilder.build();
}
return event;
Expand Down
Loading

0 comments on commit dab1086

Please sign in to comment.