Skip to content

Commit

Permalink
more
Browse files Browse the repository at this point in the history
  • Loading branch information
sodaRyCN committed May 17, 2024
1 parent d6c21ec commit 0b3c583
Show file tree
Hide file tree
Showing 24 changed files with 112 additions and 62 deletions.
2 changes: 1 addition & 1 deletion eventmesh-admin-server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ dependencies {

// https://mvnrepository.com/artifact/com.baomidou/mybatis-plus-boot-starter
implementation group: 'com.baomidou', name: 'mybatis-plus-boot-starter', version: '3.5.5'

implementation "org.reflections:reflections:0.10.2"

// https://mvnrepository.com/artifact/com.alibaba/druid-spring-boot-starter
implementation "com.alibaba:druid-spring-boot-starter"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package com.apache.eventmesh.admin.server;

import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.eventmesh.common.remote.Task;
import org.apache.eventmesh.common.remote.offset.RecordPosition;
import org.apache.eventmesh.common.remote.request.ReportHeartBeatRequest;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.common.utils.PagedList;

import org.apache.eventmesh.common.remote.Task;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.canal.CanalRecordOffset;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.canal.CanalRecordPartition;

public interface Admin extends ComponentLifeCycle {
/**
Expand All @@ -21,5 +25,29 @@ public interface Admin extends ComponentLifeCycle {
void reportHeartbeat(ReportHeartBeatRequest heartBeat);


static void main(String[] args) {
// ReportPositionRequest request = new ReportPositionRequest();
// request.setJobID("1");
// request.setAddress("1");
// request.setState(JobState.RUNNING);
// request.setDataSourceType(DataSourceType.MYSQL);
RecordPosition recordPosition = new RecordPosition();
CanalRecordOffset recordOffset = new CanalRecordOffset();
recordOffset.setOffset(12345L);
recordPosition.setRecordOffset(recordOffset);
CanalRecordPartition partition = new CanalRecordPartition();
partition.setJournalName("demo-binary-log-01");
partition.setTimeStamp(System.currentTimeMillis());
recordPosition.setRecordPartition(partition);
// ArrayList<RecordPosition> list = new ArrayList<>();
// list.add(recordPosition);
// request.setRecordPositionList(list);
String bytes = JsonUtils.toJSONString(recordPosition);

RecordPosition object1 = JsonUtils.parseTypeReferenceObject(bytes, new TypeReference<RecordPosition>() {});
RecordPosition object2 = JsonUtils.parseObject(bytes, RecordPosition.class);
System.out.println(object1);
System.out.println(object2);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import org.apache.eventmesh.common.remote.payload.PayloadUtil;
import org.apache.eventmesh.common.remote.request.BaseRemoteRequest;
import org.apache.eventmesh.common.remote.response.EmptyAckResponse;
import org.apache.eventmesh.common.remote.response.BaseGrpcResponse;
import org.apache.eventmesh.common.remote.response.BaseRemoteResponse;
import org.apache.eventmesh.common.remote.response.FailResponse;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
Expand All @@ -30,13 +30,13 @@ private Payload process(Payload value) {
"exists"));
}
try {
BaseRequestHandler<BaseRemoteRequest, BaseGrpcResponse> handler =
BaseRequestHandler<BaseRemoteRequest, BaseRemoteResponse> handler =
handlerFactory.getHandler(value.getMetadata().getType());
if (handler == null) {
return PayloadUtil.from(FailResponse.build(BaseGrpcResponse.UNKNOWN,
return PayloadUtil.from(FailResponse.build(BaseRemoteResponse.UNKNOWN,
"not match any request handler"));
}
BaseGrpcResponse response = handler.handlerRequest((BaseRemoteRequest) PayloadUtil.parse(value), value.getMetadata());
BaseRemoteResponse response = handler.handlerRequest((BaseRemoteRequest) PayloadUtil.parse(value), value.getMetadata());
if (response == null || response instanceof EmptyAckResponse) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ public boolean updateJobState(Integer jobID, JobState state) {
EventMeshJobInfo jobInfo = new EventMeshJobInfo();
jobInfo.setJobID(jobID);
jobInfo.setState(state.ordinal());
update(jobInfo, Wrappers.<EventMeshJobInfo>update().notIn("state",JobState.DELETE,JobState.COMPLETE));
update(jobInfo, Wrappers.<EventMeshJobInfo>update().notIn("state",JobState.DELETE.ordinal(),
JobState.COMPLETE.ordinal()));
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,12 @@ public boolean saveOrUpdateByJob(EventMeshMysqlPosition position) {
history.setRecord(JsonUtils.toJSONString(position));
history.setJob(old.getJobID());
history.setAddress(old.getAddress());
log.info("job [{}] position reporter changed old [{}], now [{}]", position.getJobID(), old, position);
try {
historyService.save(history);
} catch (Exception e) {
log.warn("save mysql position reporter changed history fail", e);
}

log.info("job [{}] position reporter changed old [{}], now [{}]", position.getJobID(), old, position);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import org.apache.eventmesh.common.remote.request.FetchPositionRequest;
import org.apache.eventmesh.common.remote.request.ReportPositionRequest;
import org.apache.eventmesh.common.remote.response.FetchPositionResponse;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.canal.CanalRecordOffset;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.canal.CanalRecordPartition;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.stereotype.Component;
Expand All @@ -34,10 +36,33 @@ public boolean handler(ReportPositionRequest request, Metadata metadata) {
for (int i = 0; i < 3; i++) {
try {
List<RecordPosition> recordPositionList = request.getRecordPositionList();
RecordPosition recordPosition = recordPositionList.get(0);
if (recordPosition == null || recordPosition.getRecordPartition() == null || recordPosition.getRecordOffset() == null) {
log.warn("report mysql position, but record-partition/partition/offset is null");
return false;
}
if (!(recordPosition.getRecordPartition() instanceof CanalRecordPartition)) {
log.warn("report mysql position, but record partition class [{}] not match [{}]",
recordPosition.getRecordPartition().getRecordPartitionClass(), CanalRecordPartition.class);
return false;
}
if (!(recordPosition.getRecordOffset() instanceof CanalRecordOffset)) {
log.warn("report mysql position, but record offset class [{}] not match [{}]",
recordPosition.getRecordOffset().getRecordOffsetClass(), CanalRecordOffset.class);
return false;
}
CanalRecordOffset offset = (CanalRecordOffset) recordPosition.getRecordOffset();
CanalRecordPartition partition = (CanalRecordPartition) recordPosition.getRecordPartition();
EventMeshMysqlPosition position = new EventMeshMysqlPosition();
position.setJobID(Integer.parseInt(request.getJobID()));
position.setAddress(request.getAddress());

if (offset != null) {
position.setPosition(offset.getOffset());
}
if (partition != null) {
position.setTimestamp(partition.getTimeStamp());
position.setJournalName(partition.getJournalName());
}
if (!positionService.saveOrUpdateByJob(position)) {
log.warn("update job position fail [{}]", request);
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata;
import org.apache.eventmesh.common.remote.request.BaseRemoteRequest;
import org.apache.eventmesh.common.remote.response.BaseGrpcResponse;
import org.apache.eventmesh.common.remote.response.BaseRemoteResponse;

public abstract class BaseRequestHandler<T extends BaseRemoteRequest, S extends BaseGrpcResponse> {
public BaseGrpcResponse handlerRequest(T request, Metadata metadata) {
public abstract class BaseRequestHandler<T extends BaseRemoteRequest, S extends BaseRemoteResponse> {
public BaseRemoteResponse handlerRequest(T request, Metadata metadata) {
return handler(request, metadata);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.apache.eventmesh.admin.server.web.handler.request;

import org.apache.eventmesh.common.remote.request.BaseRemoteRequest;
import org.apache.eventmesh.common.remote.response.BaseGrpcResponse;
import org.apache.eventmesh.common.remote.response.BaseRemoteResponse;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;
Expand All @@ -13,10 +13,10 @@
@Component
public class RequestHandlerFactory implements ApplicationListener<ContextRefreshedEvent> {

private final Map<String, BaseRequestHandler<BaseRemoteRequest, BaseGrpcResponse>> handlers =
private final Map<String, BaseRequestHandler<BaseRemoteRequest, BaseRemoteResponse>> handlers =
new ConcurrentHashMap<>();

public BaseRequestHandler<BaseRemoteRequest, BaseGrpcResponse> getHandler(String type) {
public BaseRequestHandler<BaseRemoteRequest, BaseRemoteResponse> getHandler(String type) {
return handlers.get(type);
}

Expand All @@ -26,7 +26,7 @@ public void onApplicationEvent(ContextRefreshedEvent event) {
Map<String, BaseRequestHandler> beans =
event.getApplicationContext().getBeansOfType(BaseRequestHandler.class);

for (BaseRequestHandler<BaseRemoteRequest, BaseGrpcResponse> requestHandler : beans.values()) {
for (BaseRequestHandler<BaseRemoteRequest, BaseRemoteResponse> requestHandler : beans.values()) {
Class<?> clazz = requestHandler.getClass();
boolean skip = false;
while (!clazz.getSuperclass().equals(BaseRequestHandler.class)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ protected EmptyAckResponse handler(ReportPositionRequest request, Metadata metad
if (StringUtils.isBlank(request.getJobID())) {
throw new AdminServerException(ErrorCode.BAD_REQUEST, "illegal job id, it's empty");
}
if (request.getRecordPositionList() == null || request.getRecordPositionList().isEmpty()) {
throw new AdminServerException(ErrorCode.BAD_REQUEST, "illegal record position list, it's empty");
}
int jobID;

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@

package org.apache.eventmesh.common.remote.offset;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class RecordOffset {

private Class<? extends RecordOffset> clazz;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@

package org.apache.eventmesh.common.remote.offset;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class RecordPartition {

private Class<? extends RecordPartition> clazz;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,17 @@

package org.apache.eventmesh.common.remote.offset;

import com.fasterxml.jackson.annotation.JsonTypeInfo;

import java.util.Objects;

public class RecordPosition {

@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY)
private RecordPartition recordPartition;

private Class<? extends RecordPartition> recordPartitionClazz;

@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY)
private RecordOffset recordOffset;

private Class<? extends RecordOffset> recordOffsetClazz;
Expand Down Expand Up @@ -57,11 +60,11 @@ public void setRecordOffsetClazz(Class<? extends RecordOffset> recordOffsetClazz
this.recordOffsetClazz = recordOffsetClazz;
}

public RecordPartition getPartition() {
public RecordPartition getRecordPartition() {
return recordPartition;
}

public RecordOffset getOffset() {
public RecordOffset getRecordOffset() {
return recordOffset;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@

@Data
@EqualsAndHashCode(callSuper = true)
public class FetchJobRequest extends BaseGrpcRequest {
public class FetchJobRequest extends BaseRemoteRequest {
private String jobID;
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

@Data
@EqualsAndHashCode(callSuper = true)
public class FetchPositionRequest extends BaseGrpcRequest {
public class FetchPositionRequest extends BaseRemoteRequest {

private String jobID;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import java.util.Map;

@Getter
public abstract class BaseGrpcResponse implements IPayload {
public abstract class BaseRemoteResponse implements IPayload {
public static final int UNKNOWN = -1;
@Setter
private boolean success = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@
/**
* empty, just mean remote received request
*/
public class EmptyAckResponse extends BaseGrpcResponse {
public class EmptyAckResponse extends BaseRemoteResponse {

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package org.apache.eventmesh.common.remote.response;

public class FailResponse extends BaseGrpcResponse {
public class FailResponse extends BaseRemoteResponse {
public static FailResponse build(int errorCode, String msg) {
FailResponse response = new FailResponse();
response.setErrorCode(errorCode);
Expand All @@ -17,6 +17,6 @@ public static FailResponse build(int errorCode, String msg) {
* @return response
*/
public static FailResponse build(Throwable exception) {
return build(BaseGrpcResponse.UNKNOWN, exception.getMessage());
return build(BaseRemoteResponse.UNKNOWN, exception.getMessage());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

@Data
@EqualsAndHashCode(callSuper = true)
public class FetchJobResponse extends BaseGrpcResponse {
public class FetchJobResponse extends BaseRemoteResponse {

private Integer id;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

@Data
@EqualsAndHashCode(callSuper = true)
public class FetchPositionResponse extends BaseGrpcResponse {
public class FetchPositionResponse extends BaseRemoteResponse {

private RecordPosition recordPosition;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,14 @@ public static <T> T parseTypeReferenceObject(String text, TypeReference<T> typeR
}
}

public static <T> T parseTypeReferenceObject(byte[] text, TypeReference<T> typeReference) {
try {
return OBJECT_MAPPER.readValue(text, typeReference);
} catch (IOException e) {
throw new JsonException("deserialize json string to typeReference error", e);
}
}

public static JsonNode getJsonNode(String text) {
if (StringUtils.isEmpty(text)) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,12 @@
import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext;
import org.apache.eventmesh.openconnect.api.source.Source;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.canal.CanalRecordOffset;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.canal.CanalRecordPartition;
import org.apache.eventmesh.common.remote.offset.canal.CanalRecordOffset;
import org.apache.eventmesh.common.remote.offset.canal.CanalRecordPartition;
import org.apache.eventmesh.openconnect.offsetmgmt.api.storage.OffsetStorageReader;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand All @@ -42,9 +41,6 @@
import java.util.concurrent.locks.LockSupport;


import org.springframework.util.CollectionUtils;

import com.alibaba.otter.canal.common.CanalException;
import com.alibaba.otter.canal.instance.core.CanalInstance;
import com.alibaba.otter.canal.instance.core.CanalInstanceGenerator;
import com.alibaba.otter.canal.instance.manager.CanalInstanceWithManager;
Expand All @@ -60,13 +56,10 @@
import com.alibaba.otter.canal.parse.CanalEventParser;
import com.alibaba.otter.canal.parse.ha.CanalHAController;
import com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser;
import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.ClientIdentity;
import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
import com.alibaba.otter.canal.sink.AbstractCanalEventSink;
import com.alibaba.otter.canal.sink.CanalEventSink;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;

Expand Down Expand Up @@ -180,8 +173,8 @@ private Canal buildCanal(CanalSourceConfig sourceConfig) {
List<String> positions = new ArrayList<>();
recordPositions.forEach(recordPosition -> {
Map<String, Object> recordPositionMap = new HashMap<>();
CanalRecordPartition canalRecordPartition = (CanalRecordPartition)(recordPosition.getPartition());
CanalRecordOffset canalRecordOffset = (CanalRecordOffset)(recordPosition.getOffset());
CanalRecordPartition canalRecordPartition = (CanalRecordPartition)(recordPosition.getRecordPartition());
CanalRecordOffset canalRecordOffset = (CanalRecordOffset)(recordPosition.getRecordOffset());
recordPositionMap.put("journalName", canalRecordPartition.getJournalName());
recordPositionMap.put("timestamp", canalRecordPartition.getTimeStamp());
recordPositionMap.put("position", canalRecordOffset.getOffset());
Expand Down
Loading

0 comments on commit 0b3c583

Please sign in to comment.