Skip to content

Commit

Permalink
update connector runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
xwm1992 committed Apr 18, 2024
1 parent b91598b commit 8656a8b
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;

import com.google.protobuf.Any;
import com.google.protobuf.StringValue;

import lombok.extern.slf4j.Slf4j;

@Slf4j
Expand All @@ -23,22 +26,18 @@ public static void main(String[] args) {
// TODO:添加shutDownHook

try {
ConfigService.getInstance()
.setConfigPath(EventMeshConstants.EVENTMESH_CONF_HOME + File.separator)
.setRootConfig(EventMeshConstants.EVENTMESH_CONF_FILE);

EventMeshServer server = new EventMeshServer();
BannerUtil.generateBanner();
server.init();
server.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
log.info("eventMesh shutting down hook begin.");
log.info("runtime shutting down hook begin.");
long start = System.currentTimeMillis();
server.shutdown();
long end = System.currentTimeMillis();

log.info("eventMesh shutdown cost {}ms", end - start);
log.info("runtime shutdown cost {}ms", end - start);
} catch (Exception e) {
log.error("exception when shutdown.", e);
}
Expand All @@ -60,27 +59,28 @@ public static void main(String[] args) {
StreamObserver<Payload> responseObserver = new StreamObserver<Payload>() {
@Override
public void onNext(Payload response) {
System.out.println("Received response: " + response.getBody());
log.info("runtime receive message: {} ", response);
}

@Override
public void onError(Throwable t) {
System.out.println("Error: " + t.getMessage());
log.error("runtime receive error message: {}", t.getMessage());
}

@Override
public void onCompleted() {
System.out.println("Stream completed");
log.info("runtime finished receive message and completed");
}
};

// 创建一个请求观察者
StreamObserver<Payload> requestObserver = stub.invokeBiStream(responseObserver);

StringValue stringValue = StringValue.newBuilder().setValue("test").build();
Any test = Any.pack(stringValue);
// 发送请求
for (int i = 0; i < 10; i++) {
Payload request = Payload.newBuilder()
.setBody("t")
.setBody(test)
.build();
requestObserver.onNext(request);
}
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 8656a8b

Please sign in to comment.