Skip to content

Commit

Permalink
[ISSUE apache#5071] Enhancement for admin server and canal source/sin…
Browse files Browse the repository at this point in the history
…k connector
  • Loading branch information
xwm1992 committed Aug 7, 2024
1 parent abe3e93 commit 5458bae
Show file tree
Hide file tree
Showing 22 changed files with 195 additions and 111 deletions.
2 changes: 1 addition & 1 deletion eventmesh-admin-server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ dependencies {
implementation project(":eventmesh-common")
implementation project(":eventmesh-registry:eventmesh-registry-api")
implementation project(":eventmesh-registry:eventmesh-registry-nacos")
implementation project(':eventmesh-openconnect:eventmesh-openconnect-offsetmgmt-plugin:eventmesh-openconnect-offsetmgmt-api')
implementation project(":eventmesh-openconnect:eventmesh-openconnect-offsetmgmt-plugin:eventmesh-openconnect-offsetmgmt-api")
implementation "com.alibaba.nacos:nacos-client"
implementation("org.springframework.boot:spring-boot-starter-web") {
exclude group: "org.springframework.boot", module: "spring-boot-starter-tomcat"
Expand Down
8 changes: 6 additions & 2 deletions eventmesh-admin-server/conf/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,17 @@ mybatis-plus:
configuration:
map-underscore-to-camel-case: false
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
# http server port
server:
port: 8082
event-mesh:
admin-server:
serviceName: DEFAULT_GROUP@@em_adm_server
# grpc server port
port: 8081
adminServerList:
region1:
- http://localhost:8081
region2:
- http://localhost:8082
region2:
- http://localhost:8083
region: region1
7 changes: 4 additions & 3 deletions eventmesh-admin-server/conf/eventmesh.sql
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ CREATE TABLE IF NOT EXISTS `event_mesh_data_source` (
`dataType` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
`description` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
`configuration` text CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
`configurationClass` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
`region` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
`createUid` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
`updateUid` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
Expand Down Expand Up @@ -134,13 +135,13 @@ CREATE TABLE IF NOT EXISTS `event_mesh_task_info` (

-- export table eventmesh.event_mesh_verify structure
CREATE TABLE IF NOT EXISTS `event_mesh_verify` (
`id` int NOT NULL,
`id` int unsigned NOT NULL AUTO_INCREMENT,
`taskID` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
`recordID` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
`recordSig` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
`connectorName` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
`connectorName` varchar(200) COLLATE utf8mb4_general_ci DEFAULT NULL,
`connectorStage` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
`position` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
`position` text COLLATE utf8mb4_general_ci DEFAULT NULL,
`createTime` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
<result property="dataType" column="dataType" jdbcType="VARCHAR"/>
<result property="description" column="description" jdbcType="VARCHAR"/>
<result property="configuration" column="configuration" jdbcType="VARCHAR"/>
<result property="configurationClass" column="configurationClass" jdbcType="VARCHAR"/>
<result property="region" column="region" jdbcType="VARCHAR"/>
<result property="createUid" column="createUid" jdbcType="VARCHAR"/>
<result property="updateUid" column="updateUid" jdbcType="VARCHAR"/>
Expand All @@ -37,7 +38,7 @@

<sql id="Base_Column_List">
id,dataType,description,
configuration,region,createUid,updateUid,
createTime,updateTime
configuration,configurationClass,region,
createUid,updateUid,createTime,updateTime
</sql>
</mapper>
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public class EventMeshDataSource implements Serializable {

private String configuration;

private String configurationClass;

private String region;

private String createUid;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ public FetchJobResponse handler(FetchJobRequest request, Metadata metadata) {
}
response.setId(detail.getJobID());
JobConnectorConfig config = new JobConnectorConfig();
config.setSourceConnectorConfig(JsonUtils.objectToMap(detail.getSourceDataSource()));
config.setSourceConnectorConfig(JsonUtils.objectToMap(detail.getSourceDataSource().getConf()));
config.setSourceConnectorDesc(detail.getSourceConnectorDesc());
config.setSinkConnectorConfig(JsonUtils.objectToMap(detail.getSinkDataSource()));
config.setSinkConnectorConfig(JsonUtils.objectToMap(detail.getSinkDataSource().getConf()));
config.setSourceConnectorDesc(detail.getSinkConnectorDesc());
response.setConnectorConfig(config);
response.setTransportType(detail.getTransportType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@

@Service
public class DataSourceBizService {

@Autowired
private EventMeshDataSourceService dataSourceService;

public EventMeshDataSource createDataSource(CreateOrUpdateDataSourceReq dataSource) {
EventMeshDataSource entity = new EventMeshDataSource();
entity.setConfiguration(JsonUtils.toJSONString(dataSource.getConfig()));
entity.setConfigurationClass(dataSource.getConfigClass());
entity.setDataType(dataSource.getType().name());
entity.setCreateUid(dataSource.getOperator());
entity.setUpdateUid(dataSource.getOperator());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.eventmesh.admin.server.web.pojo.JobDetail;
import org.apache.eventmesh.admin.server.web.service.datasource.DataSourceBizService;
import org.apache.eventmesh.admin.server.web.service.position.PositionBizService;
import org.apache.eventmesh.common.config.connector.Config;
import org.apache.eventmesh.common.remote.TaskState;
import org.apache.eventmesh.common.remote.TransportType;
import org.apache.eventmesh.common.remote.datasource.DataSource;
Expand Down Expand Up @@ -114,6 +115,7 @@ public List<EventMeshJobInfo> createJobs(List<JobDetail> jobs) {
source.setRegion(job.getSourceDataSource().getRegion());
source.setDesc(job.getSourceConnectorDesc());
source.setConfig(job.getSourceDataSource().getConf());
source.setConfigClass(job.getSourceDataSource().getConfClazz().getName());
EventMeshDataSource createdSource = dataSourceBizService.createDataSource(source);
entity.setSourceData(createdSource.getId());

Expand All @@ -123,6 +125,7 @@ public List<EventMeshJobInfo> createJobs(List<JobDetail> jobs) {
sink.setRegion(job.getSinkDataSource().getRegion());
sink.setDesc(job.getSinkConnectorDesc());
sink.setConfig(job.getSinkDataSource().getConf());
sink.setConfigClass(job.getSinkDataSource().getConfClazz().getName());
EventMeshDataSource createdSink = dataSourceBizService.createDataSource(sink);
entity.setTargetData(createdSink.getId());

Expand All @@ -141,18 +144,22 @@ public JobDetail getJobDetail(String jobID) {
if (jobID == null) {
return null;
}
EventMeshJobInfo job = jobInfoService.getById(jobID);
EventMeshJobInfo job = jobInfoService.getOne(Wrappers.<EventMeshJobInfo>query().eq("jobID", jobID));
if (job == null) {
return null;
}
JobDetail detail = new JobDetail();
detail.setTaskID(job.getTaskID());
detail.setJobID(job.getJobID());
EventMeshDataSource source = dataSourceService.getById(job.getSourceData());
EventMeshDataSource target = dataSourceService.getById(job.getTargetData());
if (source != null) {
if (!StringUtils.isBlank(source.getConfiguration())) {
try {
detail.setSourceDataSource(JsonUtils.parseObject(source.getConfiguration(), DataSource.class));
DataSource sourceDataSource = new DataSource();
Class<?> configClass = Class.forName(source.getConfigurationClass());
sourceDataSource.setConf((Config) JsonUtils.parseObject(source.getConfiguration(), configClass));
detail.setSourceDataSource(sourceDataSource);
} catch (Exception e) {
log.warn("parse source config id [{}] fail", job.getSourceData(), e);
throw new AdminServerRuntimeException(ErrorCode.BAD_DB_DATA, "illegal source data source config");
Expand All @@ -168,7 +175,10 @@ public JobDetail getJobDetail(String jobID) {
if (target != null) {
if (!StringUtils.isBlank(target.getConfiguration())) {
try {
detail.setSinkDataSource(JsonUtils.parseObject(target.getConfiguration(), DataSource.class));
DataSource sinkDataSource = new DataSource();
Class<?> configClass = Class.forName(target.getConfigurationClass());
sinkDataSource.setConf((Config) JsonUtils.parseObject(target.getConfiguration(), configClass));
detail.setSinkDataSource(sinkDataSource);
} catch (Exception e) {
log.warn("parse sink config id [{}] fail", job.getSourceData(), e);
throw new AdminServerRuntimeException(ErrorCode.BAD_DB_DATA, "illegal target data sink config");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,17 @@
import org.apache.eventmesh.admin.server.web.db.service.EventMeshTaskInfoService;
import org.apache.eventmesh.admin.server.web.pojo.JobDetail;
import org.apache.eventmesh.admin.server.web.service.job.JobInfoBizService;
import org.apache.eventmesh.common.config.connector.Config;
import org.apache.eventmesh.common.remote.TaskState;
import org.apache.eventmesh.common.remote.datasource.DataSource;
import org.apache.eventmesh.common.remote.datasource.DataSourceType;
import org.apache.eventmesh.common.remote.request.CreateTaskRequest;
import org.apache.eventmesh.common.utils.JsonUtils;

import org.apache.commons.lang3.StringUtils;

import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.stream.Collectors;
Expand All @@ -40,6 +45,7 @@

@Service
public class TaskBizService {

@Autowired
private EventMeshTaskInfoService taskInfoService;

Expand Down Expand Up @@ -76,7 +82,12 @@ public String createTask(CreateTaskRequest req) {

String finalTaskID = taskID;
List<JobDetail> jobs = req.getJobs().stream().map(x -> {
JobDetail job = parse(x);
JobDetail job = null;
try {
job = parse(x);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
job.setTaskID(finalTaskID);
job.setCreateUid(req.getUid());
job.setUpdateUid(req.getUid());
Expand All @@ -95,14 +106,30 @@ public String createTask(CreateTaskRequest req) {
return finalTaskID;
}

private JobDetail parse(CreateTaskRequest.JobDetail src) {
private JobDetail parse(CreateTaskRequest.JobDetail src) throws ClassNotFoundException {
JobDetail dst = new JobDetail();
dst.setJobDesc(src.getJobDesc());
dst.setTransportType(src.getTransportType());
dst.setSourceConnectorDesc(src.getSourceConnectorDesc());
dst.setSourceDataSource(src.getSourceDataSource());
Map<String, Object> sourceDataMap = src.getSourceDataSource();
DataSource sourceDataSource = new DataSource();
sourceDataSource.setType(DataSourceType.fromString(sourceDataMap.get("type").toString()));
sourceDataSource.setDesc((String) sourceDataMap.get("desc"));
sourceDataSource.setConfClazz((Class<? extends Config>) Class.forName(sourceDataMap.get("confClazz").toString()));
sourceDataSource.setConf(JsonUtils.parseObject(JsonUtils.toJSONString(sourceDataMap.get("conf")), sourceDataSource.getConfClazz()));
sourceDataSource.setRegion((String) sourceDataMap.get("region"));
dst.setSourceDataSource(sourceDataSource);

dst.setSinkConnectorDesc(src.getSinkConnectorDesc());
dst.setSinkDataSource(src.getSinkDataSource());
Map<String, Object> sinkDataMap = src.getSinkDataSource();
DataSource sinkDataSource = new DataSource();
sinkDataSource.setType(DataSourceType.fromString(sinkDataMap.get("type").toString()));
sinkDataSource.setDesc((String) sinkDataMap.get("desc"));
sinkDataSource.setConfClazz((Class<? extends Config>) Class.forName(sinkDataMap.get("confClazz").toString()));
sinkDataSource.setConf(JsonUtils.parseObject(JsonUtils.toJSONString(sinkDataMap.get("conf")), sinkDataSource.getConfClazz()));
sinkDataSource.setRegion((String) sinkDataMap.get("region"));
dst.setSinkDataSource(sinkDataSource);

// full/increase/check
dst.setJobType(src.getJobType());
dst.setFromRegion(src.getFromRegion());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,13 @@ public static DataSourceType getDataSourceType(Integer index) {
}
return TYPES[index];
}

public static DataSourceType fromString(String type) {
for (DataSourceType dataSourceType : DataSourceType.values()) {
if (dataSourceType.name().equalsIgnoreCase(type)) {
return dataSourceType;
}
}
throw new IllegalArgumentException("No enum constant for type: " + type);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class CreateOrUpdateDataSourceReq extends BaseRemoteRequest {
private DataSourceType type;
private String desc;
private Config config;
private String configClass;
private String region;
private String operator;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
package org.apache.eventmesh.common.remote.request;

import org.apache.eventmesh.common.remote.TransportType;
import org.apache.eventmesh.common.remote.datasource.DataSource;
import org.apache.eventmesh.common.remote.job.JobType;

import java.util.List;
import java.util.Map;

import lombok.Data;

Expand Down Expand Up @@ -61,11 +61,11 @@ public static class JobDetail {
// full/increase/check
private JobType jobType;

private DataSource sourceDataSource;
private Map<String, Object> sourceDataSource;

private String sourceConnectorDesc;

private DataSource sinkDataSource;
private Map<String, Object> sinkDataSource;

private String sinkConnectorDesc;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
org.apache.eventmesh.common.remote.request.FetchJobRequest
org.apache.eventmesh.common.remote.response.FetchJobResponse
org.apache.eventmesh.common.remote.request.ReportPositionRequest
org.apache.eventmesh.common.remote.request.ReportVerifyRequest
org.apache.eventmesh.common.remote.request.ReportHeartBeatRequest
org.apache.eventmesh.common.remote.request.FetchPositionRequest
org.apache.eventmesh.common.remote.response.FetchPositionResponse
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@
import org.apache.eventmesh.connector.canal.model.EventColumn;
import org.apache.eventmesh.connector.canal.model.EventType;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import lombok.Data;

@Data
public class CanalConnectRecord {
public class CanalConnectRecord implements Serializable {

private static final long serialVersionUID = 1L;

private String schemaName;

Expand Down
Loading

0 comments on commit 5458bae

Please sign in to comment.