Skip to content

Commit

Permalink
reactor registry
Browse files Browse the repository at this point in the history
  • Loading branch information
sodaRyCN committed Apr 23, 2024
1 parent 1b4db89 commit 78535fa
Show file tree
Hide file tree
Showing 7 changed files with 56 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.apache.eventmesh.admin.server.web;

import com.apache.eventmesh.admin.server.ComponentLifeCycle;
import lombok.extern.slf4j.Slf4j;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

@Slf4j
public abstract class BaseServer implements ComponentLifeCycle {
@PostConstruct
public void init() {
log.info("[{}] server starting at port [{}]", this.getClass().getSimpleName(), getPort());
start();
log.info("[{}] server started at port [{}]", this.getClass().getSimpleName(), getPort());
}

@PreDestroy
public void shutdown() {
log.info("[{}] server will destroy", this.getClass().getSimpleName());
destroy();
log.info("[{}] server has be destroy", this.getClass().getSimpleName());
}

public abstract int getPort();
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@
import org.apache.eventmesh.common.protocol.grpc.adminserver.AdminBiStreamServiceGrpc;
import org.springframework.stereotype.Controller;

import javax.annotation.PostConstruct;

@Controller
public class GrpcServer extends AdminBiStreamServiceGrpc.AdminBiStreamServiceImplBase implements ComponentLifeCycle {

@PostConstruct
@Override
public void start() {

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package org.apache.eventmesh.registry;

import lombok.Getter;

import java.util.List;

public class NotifyEvent {
// means whether it is an increment data
@Getter
private boolean isIncrement = false;

@Getter
private List<RegisterServerInfo> instances;
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public static RegistryService getInstance(String registryPluginType) {
private static RegistryService registryBuilder(String registryPluginType) {
RegistryService registryServiceExt = EventMeshExtensionFactory.getExtension(RegistryService.class, registryPluginType);
if (registryServiceExt == null) {
String errorMsg = "can't load the metaService plugin, please check.";
String errorMsg = "can't load the registry plugin, please check.";
log.error(errorMsg);
throw new RuntimeException(errorMsg);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
package org.apache.eventmesh.registry;

public interface RegistryListener {
void onChange(Object data);
void onChange(NotifyEvent event);
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.listener.Event;
import com.alibaba.nacos.api.naming.listener.EventListener;
import com.alibaba.nacos.api.naming.listener.NamingEvent;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.naming.utils.NamingUtils;
Expand Down Expand Up @@ -127,7 +129,15 @@ public void subscribe(RegistryListener listener, String serviceName) {
log.warn("already use same listener subscribe service name {}" ,serviceName);
return;
}
EventListener eventListener = listener::onChange;
EventListener eventListener = new EventListener() {
@Override
public void onEvent(Event event) {
if (!(event instanceof NamingEvent)) {
log.warn("received notify event type isn't not as expected");
return;
}
}
};
List<String> clusters ;
if (serviceInfo.getClusters() == null || serviceInfo.getClusters().isEmpty()) {
clusters = new ArrayList<>();
Expand Down

0 comments on commit 78535fa

Please sign in to comment.