From 78535fa763eea1b05843cec286077d9417e7dde3 Mon Sep 17 00:00:00 2001 From: sodaRyCN <757083350@qq.com> Date: Tue, 23 Apr 2024 15:02:35 +0800 Subject: [PATCH] reactor registry --- .../admin/server/web/BaseServer.java | 26 +++++++++++++++++++ .../admin/server/web/GrpcServer.java | 3 +++ .../registry/AbstractRegistryListener.java | 14 ---------- .../eventmesh/registry/NotifyEvent.java | 14 ++++++++++ .../eventmesh/registry/RegistryFactory.java | 2 +- .../eventmesh/registry/RegistryListener.java | 2 +- .../registry/nacos/NacosDiscoveryService.java | 12 ++++++++- 7 files changed, 56 insertions(+), 17 deletions(-) create mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/BaseServer.java delete mode 100644 eventmesh-registry/eventmesh-registry-api/src/main/java/org/apache/eventmesh/registry/AbstractRegistryListener.java create mode 100644 eventmesh-registry/eventmesh-registry-api/src/main/java/org/apache/eventmesh/registry/NotifyEvent.java diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/BaseServer.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/BaseServer.java new file mode 100644 index 0000000000..b1d516d654 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/BaseServer.java @@ -0,0 +1,26 @@ +package com.apache.eventmesh.admin.server.web; + +import com.apache.eventmesh.admin.server.ComponentLifeCycle; +import lombok.extern.slf4j.Slf4j; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; + +@Slf4j +public abstract class BaseServer implements ComponentLifeCycle { + @PostConstruct + public void init() { + log.info("[{}] server starting at port [{}]", this.getClass().getSimpleName(), getPort()); + start(); + log.info("[{}] server started at port [{}]", this.getClass().getSimpleName(), getPort()); + } + + @PreDestroy + public void shutdown() { + log.info("[{}] server will destroy", this.getClass().getSimpleName()); + destroy(); + log.info("[{}] server has be destroy", this.getClass().getSimpleName()); + } + + public abstract int getPort(); +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/GrpcServer.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/GrpcServer.java index fee889a89f..4939e2fec1 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/GrpcServer.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/GrpcServer.java @@ -4,9 +4,12 @@ import org.apache.eventmesh.common.protocol.grpc.adminserver.AdminBiStreamServiceGrpc; import org.springframework.stereotype.Controller; +import javax.annotation.PostConstruct; + @Controller public class GrpcServer extends AdminBiStreamServiceGrpc.AdminBiStreamServiceImplBase implements ComponentLifeCycle { + @PostConstruct @Override public void start() { diff --git a/eventmesh-registry/eventmesh-registry-api/src/main/java/org/apache/eventmesh/registry/AbstractRegistryListener.java b/eventmesh-registry/eventmesh-registry-api/src/main/java/org/apache/eventmesh/registry/AbstractRegistryListener.java deleted file mode 100644 index f5e36677ca..0000000000 --- a/eventmesh-registry/eventmesh-registry-api/src/main/java/org/apache/eventmesh/registry/AbstractRegistryListener.java +++ /dev/null @@ -1,14 +0,0 @@ -package org.apache.eventmesh.registry; - -public abstract class AbstractRegistryListener implements RegistryListener { - protected abstract boolean checkType(Object data); - @Override - @SuppressWarnings("unchecked") - public void onChange(Object data) { - if (!checkType(data)) { - return; - } - process((T)data); - } - protected abstract void process(T data); -} diff --git a/eventmesh-registry/eventmesh-registry-api/src/main/java/org/apache/eventmesh/registry/NotifyEvent.java b/eventmesh-registry/eventmesh-registry-api/src/main/java/org/apache/eventmesh/registry/NotifyEvent.java new file mode 100644 index 0000000000..da6cb7084e --- /dev/null +++ b/eventmesh-registry/eventmesh-registry-api/src/main/java/org/apache/eventmesh/registry/NotifyEvent.java @@ -0,0 +1,14 @@ +package org.apache.eventmesh.registry; + +import lombok.Getter; + +import java.util.List; + +public class NotifyEvent { + // means whether it is an increment data + @Getter + private boolean isIncrement = false; + + @Getter + private List instances; +} diff --git a/eventmesh-registry/eventmesh-registry-api/src/main/java/org/apache/eventmesh/registry/RegistryFactory.java b/eventmesh-registry/eventmesh-registry-api/src/main/java/org/apache/eventmesh/registry/RegistryFactory.java index d1c1f23892..bd6d37e047 100644 --- a/eventmesh-registry/eventmesh-registry-api/src/main/java/org/apache/eventmesh/registry/RegistryFactory.java +++ b/eventmesh-registry/eventmesh-registry-api/src/main/java/org/apache/eventmesh/registry/RegistryFactory.java @@ -17,7 +17,7 @@ public static RegistryService getInstance(String registryPluginType) { private static RegistryService registryBuilder(String registryPluginType) { RegistryService registryServiceExt = EventMeshExtensionFactory.getExtension(RegistryService.class, registryPluginType); if (registryServiceExt == null) { - String errorMsg = "can't load the metaService plugin, please check."; + String errorMsg = "can't load the registry plugin, please check."; log.error(errorMsg); throw new RuntimeException(errorMsg); } diff --git a/eventmesh-registry/eventmesh-registry-api/src/main/java/org/apache/eventmesh/registry/RegistryListener.java b/eventmesh-registry/eventmesh-registry-api/src/main/java/org/apache/eventmesh/registry/RegistryListener.java index 4f53e4b769..a46b61ed69 100644 --- a/eventmesh-registry/eventmesh-registry-api/src/main/java/org/apache/eventmesh/registry/RegistryListener.java +++ b/eventmesh-registry/eventmesh-registry-api/src/main/java/org/apache/eventmesh/registry/RegistryListener.java @@ -1,5 +1,5 @@ package org.apache.eventmesh.registry; public interface RegistryListener { - void onChange(Object data); + void onChange(NotifyEvent event); } diff --git a/eventmesh-registry/eventmesh-registry-nacos/src/main/java/org/apache/eventmesh/registry/nacos/NacosDiscoveryService.java b/eventmesh-registry/eventmesh-registry-nacos/src/main/java/org/apache/eventmesh/registry/nacos/NacosDiscoveryService.java index dbb9a140c9..930b8e36ab 100644 --- a/eventmesh-registry/eventmesh-registry-nacos/src/main/java/org/apache/eventmesh/registry/nacos/NacosDiscoveryService.java +++ b/eventmesh-registry/eventmesh-registry-nacos/src/main/java/org/apache/eventmesh/registry/nacos/NacosDiscoveryService.java @@ -4,7 +4,9 @@ import com.alibaba.nacos.api.PropertyKeyConst; import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.api.naming.NamingService; +import com.alibaba.nacos.api.naming.listener.Event; import com.alibaba.nacos.api.naming.listener.EventListener; +import com.alibaba.nacos.api.naming.listener.NamingEvent; import com.alibaba.nacos.api.naming.pojo.Instance; import com.alibaba.nacos.api.naming.pojo.ServiceInfo; import com.alibaba.nacos.api.naming.utils.NamingUtils; @@ -127,7 +129,15 @@ public void subscribe(RegistryListener listener, String serviceName) { log.warn("already use same listener subscribe service name {}" ,serviceName); return; } - EventListener eventListener = listener::onChange; + EventListener eventListener = new EventListener() { + @Override + public void onEvent(Event event) { + if (!(event instanceof NamingEvent)) { + log.warn("received notify event type isn't not as expected"); + return; + } + } + }; List clusters ; if (serviceInfo.getClusters() == null || serviceInfo.getClusters().isEmpty()) { clusters = new ArrayList<>();