diff --git a/eventmesh-admin-server/conf/eventmesh.sql b/eventmesh-admin-server/conf/eventmesh.sql index 226101661c..82d5c53317 100644 --- a/eventmesh-admin-server/conf/eventmesh.sql +++ b/eventmesh-admin-server/conf/eventmesh.sql @@ -15,13 +15,6 @@ -- specific language governing permissions and limitations -- under the License. --- -------------------------------------------------------- --- 主机: 127.0.0.1 --- 服务器版本: 8.0.36 - MySQL Community Server - GPL --- 服务器操作系统: Win64 --- HeidiSQL 版本: 11.3.0.6295 --- -------------------------------------------------------- - /*!40101 SET @OLD_CHARACTER_SET_CLIENT=@@CHARACTER_SET_CLIENT */; /*!40101 SET NAMES utf8 */; /*!50503 SET NAMES utf8mb4 */; @@ -30,103 +23,125 @@ /*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */; --- 导出 eventmesh 的数据库结构 +-- export eventmesh database CREATE DATABASE IF NOT EXISTS `eventmesh` /*!40100 DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci */ /*!80016 DEFAULT ENCRYPTION='N' */; USE `eventmesh`; --- 导出 表 eventmesh.event_mesh_data_source 结构 +-- export table eventmesh.event_mesh_data_source structure CREATE TABLE IF NOT EXISTS `event_mesh_data_source` ( `id` int unsigned NOT NULL AUTO_INCREMENT, - `dataType` int unsigned NOT NULL, + `dataType` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', `description` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL, `configuration` text CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, - `createUid` int NOT NULL, - `updateUid` int NOT NULL, + `region` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL, + `createUid` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', + `updateUid` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', `createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, `updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`id`) USING BTREE -) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci; - --- 数据导出被取消选择。 +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci; --- 导出 表 eventmesh.event_mesh_job_info 结构 +-- export table eventmesh.event_mesh_job_info structure CREATE TABLE IF NOT EXISTS `event_mesh_job_info` ( - `jobID` int unsigned NOT NULL AUTO_INCREMENT, - `name` varchar(50) COLLATE utf8mb4_general_ci NOT NULL, - `transportType` int unsigned DEFAULT NULL COMMENT 'JobTransportType', - `sourceData` int unsigned DEFAULT NULL COMMENT 'data_source表', - `targetData` int unsigned DEFAULT NULL, - `state` tinyint unsigned NOT NULL COMMENT 'JobState', - `jobType` tinyint unsigned NOT NULL COMMENT 'connector,mesh,func,...', - `createUid` int unsigned NOT NULL, - `updateUid` int unsigned NOT NULL, + `id` int unsigned NOT NULL AUTO_INCREMENT, + `jobID` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, + `desc` varchar(50) COLLATE utf8mb4_general_ci NOT NULL, + `taskID` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', + `transportType` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', + `sourceData` int NOT NULL DEFAULT '0', + `targetData` int NOT NULL DEFAULT '0', + `state` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', + `jobType` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', + `fromRegion` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL, + `createUid` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL, + `updateUid` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL, `createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, `updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, - PRIMARY KEY (`jobID`) USING BTREE -) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci; - --- 数据导出被取消选择。 + PRIMARY KEY (`id`) USING BTREE, + UNIQUE KEY `jobID` (`jobID`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci; --- 导出 表 eventmesh.event_mesh_mysql_position 结构 +-- export table eventmesh.event_mesh_mysql_position structure CREATE TABLE IF NOT EXISTS `event_mesh_mysql_position` ( `id` int unsigned NOT NULL AUTO_INCREMENT, - `jobID` int unsigned NOT NULL, + `jobID` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', `serverUUID` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL, `address` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, `position` bigint DEFAULT NULL, `gtid` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL, `currentGtid` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL, `timestamp` bigint DEFAULT NULL, - `journalName` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL, + `journalName` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL, `createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, `updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`id`), UNIQUE KEY `jobID` (`jobID`) -) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC; - --- 数据导出被取消选择。 +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC; --- 导出 表 eventmesh.event_mesh_position_reporter_history 结构 +-- export table eventmesh.event_mesh_position_reporter_history structure CREATE TABLE IF NOT EXISTS `event_mesh_position_reporter_history` ( `id` bigint NOT NULL AUTO_INCREMENT, - `job` int NOT NULL, + `job` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', `record` text CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, - `address` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', + `address` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', `createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (`id`), KEY `job` (`job`), KEY `address` (`address`) -) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='记录position上报者变更时,老记录'; - --- 数据导出被取消选择。 +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='record position reporter changes'; --- 导出 表 eventmesh.event_mesh_runtime_heartbeat 结构 +-- export table eventmesh.event_mesh_runtime_heartbeat structure CREATE TABLE IF NOT EXISTS `event_mesh_runtime_heartbeat` ( `id` bigint unsigned NOT NULL AUTO_INCREMENT, `adminAddr` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, `runtimeAddr` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, - `jobID` int unsigned DEFAULT NULL, - `reportTime` varchar(50) COLLATE utf8mb4_general_ci NOT NULL COMMENT 'runtime本地上报时间', + `jobID` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL, + `reportTime` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'runtime local report time', `updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, `createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (`id`), UNIQUE KEY `runtimeAddr` (`runtimeAddr`), KEY `jobID` (`jobID`) -) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci; - --- 数据导出被取消选择。 +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci; --- 导出 表 eventmesh.event_mesh_runtime_history 结构 +-- export table eventmesh.event_mesh_runtime_history structure CREATE TABLE IF NOT EXISTS `event_mesh_runtime_history` ( `id` bigint NOT NULL AUTO_INCREMENT, - `job` int NOT NULL, + `job` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', `address` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', `createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (`id`), KEY `address` (`address`) -) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC COMMENT='记录runtime上运行任务的变更'; +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC COMMENT='record runtime task change history'; --- 数据导出被取消选择。 +-- export table eventmesh.event_mesh_task_info structure +CREATE TABLE IF NOT EXISTS `event_mesh_task_info` ( + `id` int unsigned NOT NULL AUTO_INCREMENT, + `taskID` varchar(50) COLLATE utf8mb4_general_ci NOT NULL, + `name` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, + `desc` varchar(50) COLLATE utf8mb4_general_ci NOT NULL, + `state` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT 'taskstate', + `fromRegion` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', + `createUid` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', + `updateUid` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', + `createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + `updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + PRIMARY KEY (`id`) USING BTREE, + UNIQUE KEY `taskID` (`taskID`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci; + +-- export table eventmesh.event_mesh_verify structure +CREATE TABLE IF NOT EXISTS `event_mesh_verify` ( + `id` int NOT NULL, + `taskID` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL, + `recordID` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL, + `recordSig` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL, + `connectorName` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL, + `connectorStage` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL, + `position` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL, + `createTime` timestamp NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (`id`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci; /*!40101 SET SQL_MODE=IFNULL(@OLD_SQL_MODE, '') */; /*!40014 SET FOREIGN_KEY_CHECKS=IFNULL(@OLD_FOREIGN_KEY_CHECKS, 1) */; diff --git a/eventmesh-admin-server/conf/mapper/EventMeshDataSourceMapper.xml b/eventmesh-admin-server/conf/mapper/EventMeshDataSourceMapper.xml index dee497c848..d100e19033 100644 --- a/eventmesh-admin-server/conf/mapper/EventMeshDataSourceMapper.xml +++ b/eventmesh-admin-server/conf/mapper/EventMeshDataSourceMapper.xml @@ -1,20 +1,23 @@ - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - --> @@ -22,18 +25,19 @@ - + - - + + + - id,dataType,address, - description,configuration,createUid, - updateUid,createTime,updateTime + id,dataType,description, + configuration,region,createUid,updateUid, + createTime,updateTime diff --git a/eventmesh-admin-server/conf/mapper/EventMeshJobInfoMapper.xml b/eventmesh-admin-server/conf/mapper/EventMeshJobInfoMapper.xml index e758a276a9..02e8806680 100644 --- a/eventmesh-admin-server/conf/mapper/EventMeshJobInfoMapper.xml +++ b/eventmesh-admin-server/conf/mapper/EventMeshJobInfoMapper.xml @@ -1,43 +1,49 @@ - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - --> - - - + + + + + - - - - + + + + + - jobID,name,transportType, - sourceData,targetData,state, - runtimeType,createUid, - updateUid,createTime,updateTime + id,jobID,desc, + taskID,transportType,sourceData, + targetData,state,jobType, + fromRegion,createTime,updateTime diff --git a/eventmesh-admin-server/conf/mapper/EventMeshMysqlPositionMapper.xml b/eventmesh-admin-server/conf/mapper/EventMeshMysqlPositionMapper.xml index cbb7c094d8..9bcc7f42bb 100644 --- a/eventmesh-admin-server/conf/mapper/EventMeshMysqlPositionMapper.xml +++ b/eventmesh-admin-server/conf/mapper/EventMeshMysqlPositionMapper.xml @@ -1,43 +1,46 @@ - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - --> + PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" + "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> - - - - - - - - - - - + + + + + + + + + + + - id - ,jobID,serverUUID,address, - position,gtid,currentGtid,timestamp,journalName, + id,jobID,serverUUID, + address,position,gtid, + currentGtid,timestamp,journalName, createTime,updateTime diff --git a/eventmesh-admin-server/conf/mapper/EventMeshPositionReporterHistoryMapper.xml b/eventmesh-admin-server/conf/mapper/EventMeshPositionReporterHistoryMapper.xml index 2ee22e1ad9..a9e4fe6f1b 100644 --- a/eventmesh-admin-server/conf/mapper/EventMeshPositionReporterHistoryMapper.xml +++ b/eventmesh-admin-server/conf/mapper/EventMeshPositionReporterHistoryMapper.xml @@ -1,20 +1,23 @@ - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - --> @@ -22,7 +25,7 @@ - + diff --git a/eventmesh-admin-server/conf/mapper/EventMeshRuntimeHeartbeatMapper.xml b/eventmesh-admin-server/conf/mapper/EventMeshRuntimeHeartbeatMapper.xml index b811c5950d..200b1bf54a 100644 --- a/eventmesh-admin-server/conf/mapper/EventMeshRuntimeHeartbeatMapper.xml +++ b/eventmesh-admin-server/conf/mapper/EventMeshRuntimeHeartbeatMapper.xml @@ -1,20 +1,23 @@ - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - --> @@ -24,7 +27,7 @@ - + diff --git a/eventmesh-admin-server/conf/mapper/EventMeshRuntimeHistoryMapper.xml b/eventmesh-admin-server/conf/mapper/EventMeshRuntimeHistoryMapper.xml index d9e17bc859..281cce30f9 100644 --- a/eventmesh-admin-server/conf/mapper/EventMeshRuntimeHistoryMapper.xml +++ b/eventmesh-admin-server/conf/mapper/EventMeshRuntimeHistoryMapper.xml @@ -1,28 +1,31 @@ - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - --> - - + + diff --git a/eventmesh-admin-server/conf/mapper/EventMeshTaskInfoMapper.xml b/eventmesh-admin-server/conf/mapper/EventMeshTaskInfoMapper.xml new file mode 100644 index 0000000000..05b1dc52a0 --- /dev/null +++ b/eventmesh-admin-server/conf/mapper/EventMeshTaskInfoMapper.xml @@ -0,0 +1,45 @@ + + + + + + + + + + + + + + + + + + + + + + id,taskID,name, + desc,state,fromRegion, + createUid,updateUid,createTime, + updateTime + + diff --git a/eventmesh-admin-server/conf/mapper/EventMeshVerifyMapper.xml b/eventmesh-admin-server/conf/mapper/EventMeshVerifyMapper.xml new file mode 100644 index 0000000000..b7b042145a --- /dev/null +++ b/eventmesh-admin-server/conf/mapper/EventMeshVerifyMapper.xml @@ -0,0 +1,42 @@ + + + + + + + + + + + + + + + + + + + + id,taskID,recordID, + recordSig,connectorName,connectorStage, + position,createTime + + diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/AdminServerRuntimeException.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/AdminServerRuntimeException.java index 5a68baba1e..e68d05100f 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/AdminServerRuntimeException.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/AdminServerRuntimeException.java @@ -19,9 +19,8 @@ import lombok.Getter; +@Getter public class AdminServerRuntimeException extends RuntimeException { - - @Getter private final int code; public AdminServerRuntimeException(int code, String message) { diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/ExampleAdminServer.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/ExampleAdminServer.java index 7f5fa22dda..b179a790c5 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/ExampleAdminServer.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/ExampleAdminServer.java @@ -23,7 +23,7 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -@SpringBootApplication() +@SpringBootApplication(scanBasePackages = "org.apache.eventmesh.admin.server") public class ExampleAdminServer { public static void main(String[] args) throws Exception { diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/GrpcServer.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/GrpcServer.java index 572e07a21d..d2a0330355 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/GrpcServer.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/GrpcServer.java @@ -18,6 +18,7 @@ package org.apache.eventmesh.admin.server.web; import org.apache.eventmesh.admin.server.AdminServerProperties; +import org.apache.eventmesh.admin.server.web.service.AdminGrpcServer; import java.util.concurrent.TimeUnit; diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java index 3f91115bdc..bd896d546c 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java @@ -17,10 +17,30 @@ package org.apache.eventmesh.admin.server.web; +import org.apache.eventmesh.admin.server.web.service.task.TaskBizService; +import org.apache.eventmesh.common.remote.request.CreateTaskRequest; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping("/eventmesh/admin") public class HttpServer { + @Autowired + private TaskBizService taskService; + + @RequestMapping("/createTask") + public ResponseEntity> createOrUpdateTask(@RequestBody CreateTaskRequest task) { + String uuid = taskService.createTask(task); + return ResponseEntity.ok(Response.success(uuid)); + } + + public boolean deleteTask(Long id) { + return false; + } + + } diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/Response.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/Response.java index d58312146c..329a00baae 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/Response.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/Response.java @@ -17,35 +17,44 @@ package org.apache.eventmesh.admin.server.web; -public class Response { +import org.apache.eventmesh.common.remote.exception.ErrorCode; +public class Response { + private int code; private boolean success; private String desc; private T data; - public boolean isSuccess() { - return success; - } - - public void setSuccess(boolean success) { - this.success = success; - } - - public String getDesc() { - return desc; + public static Response success() { + Response response = new Response<>(); + response.success = true; + response.code = ErrorCode.SUCCESS; + return response; } - public void setDesc(String desc) { - this.desc = desc; + public static Response success(T data) { + Response response = new Response<>(); + response.success = true; + response.data = data; + return response; } - public T getData() { - return data; + public static Response fail(int code, String desc) { + Response response = new Response<>(); + response.success = false; + response.code = code; + response.desc = desc; + return response; } - public void setData(T data) { - this.data = data; + public static Response fail(int code, String desc, T data) { + Response response = new Response<>(); + response.success = false; + response.code = code; + response.desc = desc; + response.data = data; + return response; } } diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshDataSource.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshDataSource.java index 2f154faf05..9d81366aa5 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshDataSource.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshDataSource.java @@ -27,24 +27,25 @@ import lombok.Data; /** - * event_mesh_data_source + * TableName event_mesh_data_source */ @TableName(value = "event_mesh_data_source") @Data public class EventMeshDataSource implements Serializable { - @TableId(type = IdType.AUTO) private Integer id; - private Integer dataType; + private String dataType; private String description; private String configuration; - private Integer createUid; + private String region; + + private String createUid; - private Integer updateUid; + private String updateUid; private Date createTime; diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshJobInfo.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshJobInfo.java index 73d2f4aba4..23db5f6c2b 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshJobInfo.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshJobInfo.java @@ -27,30 +27,35 @@ import lombok.Data; /** - * event_mesh_job_info + * TableName event_mesh_job_info */ @TableName(value = "event_mesh_job_info") @Data public class EventMeshJobInfo implements Serializable { - @TableId(type = IdType.AUTO) - private Integer jobID; + private Integer id; + + private String jobID; - private String name; + private String desc; - private Integer transportType; + private String taskID; + + private String transportType; private Integer sourceData; private Integer targetData; - private Integer state; + private String state; + + private String jobType; - private Integer jobType; + private String fromRegion; - private Integer createUid; + private String createUid; - private Integer updateUid; + private String updateUid; private Date createTime; diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshMysqlPosition.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshMysqlPosition.java index 65a38b54b5..5e5d5745c1 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshMysqlPosition.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshMysqlPosition.java @@ -27,16 +27,15 @@ import lombok.Data; /** - * event_mesh_mysql_position + * TableName event_mesh_mysql_position */ @TableName(value = "event_mesh_mysql_position") @Data public class EventMeshMysqlPosition implements Serializable { - @TableId(type = IdType.AUTO) private Integer id; - private Integer jobID; + private String jobID; private String serverUUID; diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshPositionReporterHistory.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshPositionReporterHistory.java index c8d7d9b6d0..8518c38918 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshPositionReporterHistory.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshPositionReporterHistory.java @@ -27,16 +27,15 @@ import lombok.Data; /** - * event_mesh_position_reporter_history + * TableName event_mesh_position_reporter_history */ @TableName(value = "event_mesh_position_reporter_history") @Data public class EventMeshPositionReporterHistory implements Serializable { - @TableId(type = IdType.AUTO) private Long id; - private Integer job; + private String job; private String record; diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshRuntimeHeartbeat.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshRuntimeHeartbeat.java index 7cc165cc58..95e6c5e261 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshRuntimeHeartbeat.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshRuntimeHeartbeat.java @@ -27,12 +27,11 @@ import lombok.Data; /** - * event_mesh_runtime_heartbeat + * TableName event_mesh_runtime_heartbeat */ @TableName(value = "event_mesh_runtime_heartbeat") @Data public class EventMeshRuntimeHeartbeat implements Serializable { - @TableId(type = IdType.AUTO) private Long id; @@ -40,7 +39,7 @@ public class EventMeshRuntimeHeartbeat implements Serializable { private String runtimeAddr; - private Integer jobID; + private String jobID; private String reportTime; diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshRuntimeHistory.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshRuntimeHistory.java index 1f8ef788d1..ea7e10cbad 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshRuntimeHistory.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshRuntimeHistory.java @@ -27,16 +27,15 @@ import lombok.Data; /** - * event_mesh_runtime_history + * TableName event_mesh_runtime_history */ @TableName(value = "event_mesh_runtime_history") @Data public class EventMeshRuntimeHistory implements Serializable { - @TableId(type = IdType.AUTO) private Long id; - private Integer job; + private String job; private String address; diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshTaskInfo.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshTaskInfo.java new file mode 100644 index 0000000000..5d1b6648c9 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshTaskInfo.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.admin.server.web.db.entity; + +import java.io.Serializable; +import java.util.Date; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; + +import lombok.Data; + +/** + * TableName event_mesh_task_info + */ +@TableName(value = "event_mesh_task_info") +@Data +public class EventMeshTaskInfo implements Serializable { + @TableId(type = IdType.AUTO) + private Integer id; + + private String taskID; + + private String name; + + private String desc; + + private String state; + + private String fromRegion; + + private String createUid; + + private String updateUid; + + private Date createTime; + + private Date updateTime; + + private static final long serialVersionUID = 1L; +} \ No newline at end of file diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshVerify.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshVerify.java new file mode 100644 index 0000000000..5425c5c57b --- /dev/null +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshVerify.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.admin.server.web.db.entity; + +import java.io.Serializable; +import java.util.Date; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; + +import lombok.Data; + +/** + * TableName event_mesh_verify + */ +@TableName(value = "event_mesh_verify") +@Data +public class EventMeshVerify implements Serializable { + @TableId(type = IdType.AUTO) + private Integer id; + + private String taskID; + + private String recordID; + + private String recordSig; + + private String connectorName; + + private String connectorStage; + + private String position; + + private Date createTime; + + private static final long serialVersionUID = 1L; +} \ No newline at end of file diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/mapper/EventMeshJobInfoExtMapper.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/mapper/EventMeshJobInfoExtMapper.java new file mode 100644 index 0000000000..7f46dcab41 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/mapper/EventMeshJobInfoExtMapper.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.admin.server.web.db.mapper; + +import org.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo; + +import org.apache.ibatis.annotations.Insert; +import org.apache.ibatis.annotations.Mapper; +import org.apache.ibatis.annotations.Options; +import org.apache.ibatis.annotations.Param; + +import java.util.List; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; + +/** + * etx operator for table event_mesh_job_info + */ +@Mapper +public interface EventMeshJobInfoExtMapper extends BaseMapper { + @Insert("insert into event_mesh_job_info(`taskID`,`state`,`jobType`) values" + + "(#{job.taskID},#{job.state},#{job.jobType})") + @Options(useGeneratedKeys = true, keyProperty = "jobID") + int saveBatch(@Param("jobs") List jobInfoList); +} + + + + diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/mapper/EventMeshJobInfoMapper.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/mapper/EventMeshJobInfoMapper.java index eb57c0af2c..39f8a4aed6 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/mapper/EventMeshJobInfoMapper.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/mapper/EventMeshJobInfoMapper.java @@ -25,8 +25,7 @@ /** * for table 'event_mesh_job_info' db operation - * 2024-05-09 15:51:45 - * entity.db.web.server.admin.eventmesh.apache.org.EventMeshJobInfo + * entity org.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo */ @Mapper public interface EventMeshJobInfoMapper extends BaseMapper { diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/JobState.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/mapper/EventMeshTaskInfoMapper.java similarity index 65% rename from eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/JobState.java rename to eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/mapper/EventMeshTaskInfoMapper.java index 7b0ae5074a..d1d472b8c4 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/JobState.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/mapper/EventMeshTaskInfoMapper.java @@ -15,17 +15,23 @@ * limitations under the License. */ -package org.apache.eventmesh.common.remote; +package org.apache.eventmesh.admin.server.web.db.mapper; -public enum JobState { - INIT, STARTED, RUNNING, PAUSE, COMPLETE, DELETE, FAIL; - private static final JobState[] STATES = JobState.values(); +import org.apache.eventmesh.admin.server.web.db.entity.EventMeshTaskInfo; - public static JobState fromIndex(Integer index) { - if (index == null || index < 0 || index >= STATES.length) { - return null; - } +import org.apache.ibatis.annotations.Mapper; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; + +/** + * event_mesh_task_info + * Entity org.apache.eventmesh.admin.server.web.db.entity.EventMeshTaskInfo + */ +@Mapper +public interface EventMeshTaskInfoMapper extends BaseMapper { - return STATES[index]; - } } + + + + diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/mapper/EventMeshVerifyMapper.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/mapper/EventMeshVerifyMapper.java new file mode 100644 index 0000000000..b444d1e4b4 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/mapper/EventMeshVerifyMapper.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.admin.server.web.db.mapper; + +import org.apache.eventmesh.admin.server.web.db.entity.EventMeshVerify; + +import org.apache.ibatis.annotations.Mapper; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; + +/** + * event_mesh_verify + * Entity org.apache.eventmesh.admin.server.web.db.entity.EventMeshVerify + */ +@Mapper +public interface EventMeshVerifyMapper extends BaseMapper { + +} + + + + diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/Task.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/service/EventMeshJobInfoExtService.java similarity index 68% rename from eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/Task.java rename to eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/service/EventMeshJobInfoExtService.java index 6f97c4207c..22fc5ae299 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/Task.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/service/EventMeshJobInfoExtService.java @@ -15,20 +15,15 @@ * limitations under the License. */ -package org.apache.eventmesh.common.remote; +package org.apache.eventmesh.admin.server.web.db.service; -// task : job = 1 : m -public class Task { - private long id; - private String name; - private String desc; - private String uid; - private String sourceUser; - private String sourcePasswd; - private String targetUser; - private String targetPasswd; - private int sourceType; - private int targetType; +import org.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo; +import java.util.List; +/** + * ext operator for table event_mesh_job + */ +public interface EventMeshJobInfoExtService { + int batchSave(List jobs); } diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/service/EventMeshJobInfoService.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/service/EventMeshJobInfoService.java index c5ad399854..572e451ceb 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/service/EventMeshJobInfoService.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/service/EventMeshJobInfoService.java @@ -22,8 +22,8 @@ import com.baomidou.mybatisplus.extension.service.IService; /** -* for table `event_mesh_job_info' db operation -* 2024-05-09 15:51:45 -*/ + * event_mesh_job_info + */ public interface EventMeshJobInfoService extends IService { + } diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/service/EventMeshTaskInfoService.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/service/EventMeshTaskInfoService.java new file mode 100644 index 0000000000..dc35cfe071 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/service/EventMeshTaskInfoService.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.admin.server.web.db.service; + +import org.apache.eventmesh.admin.server.web.db.entity.EventMeshTaskInfo; + +import com.baomidou.mybatisplus.extension.service.IService; + +/** + * event_mesh_task_info + */ +public interface EventMeshTaskInfoService extends IService { + +} diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/service/EventMeshVerifyService.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/service/EventMeshVerifyService.java new file mode 100644 index 0000000000..97f2d7268e --- /dev/null +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/service/EventMeshVerifyService.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.admin.server.web.db.service; + +import org.apache.eventmesh.admin.server.web.db.entity.EventMeshVerify; + +import com.baomidou.mybatisplus.extension.service.IService; + +/** + * event_mesh_verify + */ +public interface EventMeshVerifyService extends IService { + +} diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/Admin.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/service/impl/EventMeshJobInfoExtServiceImpl.java similarity index 53% rename from eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/Admin.java rename to eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/service/impl/EventMeshJobInfoExtServiceImpl.java index 9ee25fadb2..6cf0ebf6b2 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/Admin.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/service/impl/EventMeshJobInfoExtServiceImpl.java @@ -15,32 +15,24 @@ * limitations under the License. */ -package org.apache.eventmesh.admin.server; +package org.apache.eventmesh.admin.server.web.db.service.impl; -import org.apache.eventmesh.common.ComponentLifeCycle; -import org.apache.eventmesh.common.remote.Task; -import org.apache.eventmesh.common.remote.request.ReportHeartBeatRequest; -import org.apache.eventmesh.common.utils.PagedList; +import org.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo; +import org.apache.eventmesh.admin.server.web.db.mapper.EventMeshJobInfoExtMapper; +import org.apache.eventmesh.admin.server.web.db.service.EventMeshJobInfoExtService; -/** - * Admin - */ -public interface Admin extends ComponentLifeCycle { - - /** - * support for web or ops - **/ - boolean createOrUpdateTask(Task task); - - boolean deleteTask(Long id); +import java.util.List; - Task getTask(Long id); +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; - // paged list - PagedList getTaskPaged(Task task); +@Service +public class EventMeshJobInfoExtServiceImpl implements EventMeshJobInfoExtService { + @Autowired + EventMeshJobInfoExtMapper mapper; - /** - * support for task - */ - void reportHeartbeat(ReportHeartBeatRequest heartBeat); + @Override + public int batchSave(List jobs) { + return mapper.saveBatch(jobs); + } } diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/service/impl/EventMeshJobInfoServiceImpl.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/service/impl/EventMeshJobInfoServiceImpl.java index dd7312ceae..4613e0809d 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/service/impl/EventMeshJobInfoServiceImpl.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/service/impl/EventMeshJobInfoServiceImpl.java @@ -25,14 +25,10 @@ import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; -import lombok.extern.slf4j.Slf4j; - /** - * for table 'event_mesh_job_info' db operation - * 2024-05-09 15:51:45 + * event_mesh_job_info */ @Service -@Slf4j public class EventMeshJobInfoServiceImpl extends ServiceImpl implements EventMeshJobInfoService { diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/service/impl/EventMeshTaskInfoServiceImpl.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/service/impl/EventMeshTaskInfoServiceImpl.java new file mode 100644 index 0000000000..9568b63671 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/service/impl/EventMeshTaskInfoServiceImpl.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.admin.server.web.db.service.impl; + +import org.apache.eventmesh.admin.server.web.db.entity.EventMeshTaskInfo; +import org.apache.eventmesh.admin.server.web.db.mapper.EventMeshTaskInfoMapper; +import org.apache.eventmesh.admin.server.web.db.service.EventMeshTaskInfoService; + +import org.springframework.stereotype.Service; + +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; + +/** + * event_mesh_task_info + */ +@Service +public class EventMeshTaskInfoServiceImpl extends ServiceImpl + implements EventMeshTaskInfoService { + +} + + + + diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/FetchJobRequestHandler.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/FetchJobRequestHandler.java index 56f9f047b7..8f159fa45b 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/FetchJobRequestHandler.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/FetchJobRequestHandler.java @@ -17,14 +17,15 @@ package org.apache.eventmesh.admin.server.web.handler.impl; -import org.apache.eventmesh.admin.server.AdminServerRuntimeException; -import org.apache.eventmesh.admin.server.web.db.entity.EventMeshJobDetail; import org.apache.eventmesh.admin.server.web.handler.BaseRequestHandler; -import org.apache.eventmesh.admin.server.web.service.job.EventMeshJobInfoBizService; +import org.apache.eventmesh.admin.server.web.pojo.JobDetail; +import org.apache.eventmesh.admin.server.web.service.job.JobInfoBizService; import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; import org.apache.eventmesh.common.remote.exception.ErrorCode; +import org.apache.eventmesh.common.remote.job.JobConnectorConfig; import org.apache.eventmesh.common.remote.request.FetchJobRequest; import org.apache.eventmesh.common.remote.response.FetchJobResponse; +import org.apache.eventmesh.common.utils.JsonUtils; import org.apache.commons.lang3.StringUtils; @@ -38,34 +39,29 @@ public class FetchJobRequestHandler extends BaseRequestHandler { @Autowired - EventMeshJobInfoBizService jobInfoBizService; + JobInfoBizService jobInfoBizService; @Override public FetchJobResponse handler(FetchJobRequest request, Metadata metadata) { if (StringUtils.isBlank(request.getJobID())) { - throw new AdminServerRuntimeException(ErrorCode.BAD_REQUEST, "job id is empty"); - } - int jobID; - try { - jobID = Integer.parseInt(request.getJobID()); - } catch (NumberFormatException e) { - throw new AdminServerRuntimeException(ErrorCode.BAD_REQUEST, String.format("illegal job id %s", - request.getJobID())); + return FetchJobResponse.failResponse(ErrorCode.BAD_REQUEST, "job id is empty"); } FetchJobResponse response = FetchJobResponse.successResponse(); - EventMeshJobDetail detail = jobInfoBizService.getJobDetail(request, metadata); + JobDetail detail = jobInfoBizService.getJobDetail(request.getJobID()); if (detail == null) { return response; } - response.setId(detail.getId()); - response.setName(detail.getName()); - response.setSourceConnectorConfig(detail.getSourceConnectorConfig()); - response.setSourceConnectorDesc(detail.getSourceConnectorDesc()); + response.setId(detail.getJobID()); + JobConnectorConfig config = new JobConnectorConfig(); + config.setSourceConnectorConfig(JsonUtils.objectToMap(detail.getSourceDataSource())); + config.setSourceConnectorDesc(detail.getSourceConnectorDesc()); + config.setSinkConnectorConfig(JsonUtils.objectToMap(detail.getSinkDataSource())); + config.setSourceConnectorDesc(detail.getSinkConnectorDesc()); + response.setConnectorConfig(config); response.setTransportType(detail.getTransportType()); - response.setSinkConnectorConfig(detail.getSinkConnectorConfig()); - response.setSourceConnectorDesc(detail.getSinkConnectorDesc()); response.setState(detail.getState()); - response.setPosition(detail.getPosition()); + response.setPosition(detail.getPositions()); + response.setType(detail.getJobType()); return response; } } diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/FetchPositionHandler.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/FetchPositionHandler.java index 2e6fa31f05..85ef0e6113 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/FetchPositionHandler.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/FetchPositionHandler.java @@ -17,10 +17,9 @@ package org.apache.eventmesh.admin.server.web.handler.impl; -import org.apache.eventmesh.admin.server.AdminServerRuntimeException; import org.apache.eventmesh.admin.server.web.db.DBThreadPool; import org.apache.eventmesh.admin.server.web.handler.BaseRequestHandler; -import org.apache.eventmesh.admin.server.web.service.position.EventMeshPositionBizService; +import org.apache.eventmesh.admin.server.web.service.position.PositionBizService; import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; import org.apache.eventmesh.common.remote.exception.ErrorCode; import org.apache.eventmesh.common.remote.request.FetchPositionRequest; @@ -41,15 +40,15 @@ public class FetchPositionHandler extends BaseRequestHandler { +public class ReportHeartBeatHandler extends BaseRequestHandler { @Autowired - EventMeshRuntimeHeartbeatBizService heartbeatBizService; + RuntimeHeartbeatBizService heartbeatBizService; @Autowired DBThreadPool executor; @Override - protected EmptyAckResponse handler(ReportHeartBeatRequest request, Metadata metadata) { + protected SimpleResponse handler(ReportHeartBeatRequest request, Metadata metadata) { + if (StringUtils.isBlank(request.getJobID()) || StringUtils.isBlank(request.getAddress())) { + log.info("request [{}] id or reporter address is empty", request); + return SimpleResponse.fail(ErrorCode.BAD_REQUEST, "request id or reporter address is empty"); + } executor.getExecutors().execute(() -> { EventMeshRuntimeHeartbeat heartbeat = new EventMeshRuntimeHeartbeat(); - int job; - try { - job = Integer.parseInt(request.getJobID()); - } catch (NumberFormatException e) { - log.warn("runtime {} report heartbeat fail, illegal job id {}", request.getAddress(), request.getJobID()); - return; - } - heartbeat.setJobID(job); + heartbeat.setJobID(request.getJobID()); heartbeat.setReportTime(request.getReportedTimeStamp()); heartbeat.setAdminAddr(IPUtils.getLocalAddress()); heartbeat.setRuntimeAddr(request.getAddress()); @@ -65,6 +65,6 @@ protected EmptyAckResponse handler(ReportHeartBeatRequest request, Metadata meta } }); - return new EmptyAckResponse(); + return SimpleResponse.success(); } } diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportPositionHandler.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportPositionHandler.java index adfe110134..5e2a968262 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportPositionHandler.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportPositionHandler.java @@ -17,15 +17,15 @@ package org.apache.eventmesh.admin.server.web.handler.impl; -import org.apache.eventmesh.admin.server.AdminServerRuntimeException; import org.apache.eventmesh.admin.server.web.db.DBThreadPool; import org.apache.eventmesh.admin.server.web.handler.BaseRequestHandler; -import org.apache.eventmesh.admin.server.web.service.job.EventMeshJobInfoBizService; -import org.apache.eventmesh.admin.server.web.service.position.EventMeshPositionBizService; +import org.apache.eventmesh.admin.server.web.pojo.JobDetail; +import org.apache.eventmesh.admin.server.web.service.job.JobInfoBizService; +import org.apache.eventmesh.admin.server.web.service.position.PositionBizService; import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; import org.apache.eventmesh.common.remote.exception.ErrorCode; import org.apache.eventmesh.common.remote.request.ReportPositionRequest; -import org.apache.eventmesh.common.remote.response.EmptyAckResponse; +import org.apache.eventmesh.common.remote.response.SimpleResponse; import org.apache.commons.lang3.StringUtils; @@ -36,36 +36,33 @@ @Component @Slf4j -public class ReportPositionHandler extends BaseRequestHandler { - +public class ReportPositionHandler extends BaseRequestHandler { @Autowired - EventMeshJobInfoBizService jobInfoBizService; + private JobInfoBizService jobInfoBizService; @Autowired - DBThreadPool executor; + private DBThreadPool executor; @Autowired - EventMeshPositionBizService positionBizService; - + private PositionBizService positionBizService; @Override - protected EmptyAckResponse handler(ReportPositionRequest request, Metadata metadata) { + protected SimpleResponse handler(ReportPositionRequest request, Metadata metadata) { + if (StringUtils.isBlank(request.getJobID())) { + log.info("request [{}] illegal job id", request); + return SimpleResponse.fail(ErrorCode.BAD_REQUEST, "illegal job id, it's empty"); + } if (request.getDataSourceType() == null) { - throw new AdminServerRuntimeException(ErrorCode.BAD_REQUEST, "illegal data type, it's empty"); + log.info("request [{}] illegal data type", request); + return SimpleResponse.fail(ErrorCode.BAD_REQUEST, "illegal data type, it's empty"); } if (StringUtils.isBlank(request.getJobID())) { - throw new AdminServerRuntimeException(ErrorCode.BAD_REQUEST, "illegal job id, it's empty"); + log.info("request [{}] illegal job id", request); + return SimpleResponse.fail(ErrorCode.BAD_REQUEST, "illegal job id, it's empty"); } if (request.getRecordPositionList() == null || request.getRecordPositionList().isEmpty()) { - throw new AdminServerRuntimeException(ErrorCode.BAD_REQUEST, "illegal record position list, it's empty"); - } - int jobID; - - try { - jobID = Integer.parseInt(request.getJobID()); - } catch (NumberFormatException e) { - throw new AdminServerRuntimeException(ErrorCode.BAD_REQUEST, String.format("illegal job id [%s] format", - request.getJobID())); + log.info("request [{}] illegal record position", request); + return SimpleResponse.fail(ErrorCode.BAD_REQUEST, "illegal record position list, it's empty"); } positionBizService.isValidatePositionRequest(request.getDataSourceType()); @@ -88,8 +85,10 @@ protected EmptyAckResponse handler(ReportPositionRequest request, Metadata metad log.warn("handle position request fail, request [{}]", request, e); } finally { try { - if (!jobInfoBizService.updateJobState(jobID, request.getState())) { - log.warn("update job [{}] state to [{}] fail", jobID, request.getState()); + JobDetail detail = jobInfoBizService.getJobDetail(request.getJobID()); + if (detail != null && !detail.getState().equals(request.getState()) && !jobInfoBizService.updateJobState(request.getJobID(), + request.getState())) { + log.warn("update job [{}] old state [{}] to [{}] fail", request.getJobID(), detail.getState(), request.getState()); } } catch (Exception e) { log.warn("update job id [{}] type [{}] state [{}] fail", request.getJobID(), @@ -97,6 +96,6 @@ protected EmptyAckResponse handler(ReportPositionRequest request, Metadata metad } } }); - return new EmptyAckResponse(); + return SimpleResponse.success(); } } diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportVerifyHandler.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportVerifyHandler.java new file mode 100644 index 0000000000..39963494cf --- /dev/null +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportVerifyHandler.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.admin.server.web.handler.impl; + +import org.apache.eventmesh.admin.server.web.handler.BaseRequestHandler; +import org.apache.eventmesh.admin.server.web.service.verify.VerifyBizService; +import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; +import org.apache.eventmesh.common.remote.exception.ErrorCode; +import org.apache.eventmesh.common.remote.request.ReportVerifyRequest; +import org.apache.eventmesh.common.remote.response.SimpleResponse; + +import org.apache.commons.lang3.StringUtils; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import lombok.extern.slf4j.Slf4j; + +@Component +@Slf4j +public class ReportVerifyHandler extends BaseRequestHandler { + @Autowired + private VerifyBizService verifyService; + + @Override + protected SimpleResponse handler(ReportVerifyRequest request, Metadata metadata) { + if (StringUtils.isAnyBlank(request.getTaskID(), request.getRecordSig(), request.getRecordID(), request.getConnectorStage())) { + log.info("report verify request [{}] illegal", request); + return SimpleResponse.fail(ErrorCode.BAD_REQUEST, "request task id, sign, record id or stage is none"); + } + return verifyService.reportVerifyRecord(request) ? SimpleResponse.success() : SimpleResponse.fail(ErrorCode.INTERNAL_ERR, "save verify " + + "request fail"); + } +} diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/pojo/JobDetail.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/pojo/JobDetail.java new file mode 100644 index 0000000000..c47b284483 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/pojo/JobDetail.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.admin.server.web.pojo; + +import org.apache.eventmesh.common.remote.TaskState; +import org.apache.eventmesh.common.remote.TransportType; +import org.apache.eventmesh.common.remote.datasource.DataSource; +import org.apache.eventmesh.common.remote.job.JobType; +import org.apache.eventmesh.common.remote.offset.RecordPosition; + +import java.util.Date; +import java.util.List; + +import lombok.Data; + +@Data +public class JobDetail { + private Integer id; + + private String jobID; + + private String desc; + + private String taskID; + + private TaskState state; + + private JobType jobType; + + private Date createTime; + + private Date updateTime; + + private String createUid; + + private String updateUid; + + private String region; + + private DataSource sourceDataSource; + + private String sourceConnectorDesc; + + private DataSource sinkDataSource; + + private String sinkConnectorDesc; + + private TransportType transportType; + + private List positions; +} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/JobType.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/pojo/TaskDetail.java similarity index 88% rename from eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/JobType.java rename to eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/pojo/TaskDetail.java index 3c8272af40..86f5342f35 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/JobType.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/pojo/TaskDetail.java @@ -15,10 +15,10 @@ * limitations under the License. */ -package org.apache.eventmesh.common.remote; +package org.apache.eventmesh.admin.server.web.pojo; -public enum JobType { - FULL, - INCREASE, - STRUCT_SYNC +/** + * Description: + */ +public class TaskDetail { } diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/AdminGrpcServer.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/AdminGrpcServer.java similarity index 81% rename from eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/AdminGrpcServer.java rename to eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/AdminGrpcServer.java index 3bac237088..bc822ad6c3 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/AdminGrpcServer.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/AdminGrpcServer.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.admin.server.web; +package org.apache.eventmesh.admin.server.web.service; import org.apache.eventmesh.admin.server.AdminServerRuntimeException; import org.apache.eventmesh.admin.server.web.handler.BaseRequestHandler; @@ -26,8 +26,7 @@ import org.apache.eventmesh.common.remote.payload.PayloadUtil; import org.apache.eventmesh.common.remote.request.BaseRemoteRequest; import org.apache.eventmesh.common.remote.response.BaseRemoteResponse; -import org.apache.eventmesh.common.remote.response.EmptyAckResponse; -import org.apache.eventmesh.common.remote.response.FailResponse; +import org.apache.eventmesh.common.remote.response.SimpleResponse; import org.apache.commons.lang3.StringUtils; @@ -48,24 +47,26 @@ public class AdminGrpcServer extends AdminServiceGrpc.AdminServiceImplBase { private Payload process(Payload value) { if (value == null || StringUtils.isBlank(value.getMetadata().getType())) { - return PayloadUtil.from(FailResponse.build(ErrorCode.BAD_REQUEST, "bad request: type not exists")); + return PayloadUtil.from(SimpleResponse.fail(ErrorCode.BAD_REQUEST, "bad request: type not exists")); } try { BaseRequestHandler handler = handlerFactory.getHandler(value.getMetadata().getType()); if (handler == null) { - return PayloadUtil.from(FailResponse.build(BaseRemoteResponse.UNKNOWN, "not match any request handler")); + return PayloadUtil.from(SimpleResponse.fail(ErrorCode.BAD_REQUEST, "not match any request handler")); } BaseRemoteResponse response = handler.handlerRequest((BaseRemoteRequest) PayloadUtil.parse(value), value.getMetadata()); - if (response == null || response instanceof EmptyAckResponse) { - return null; + if (response == null) { + log.warn("received request type [{}] handler [{}], then replay empty response", value.getMetadata().getType(), + handler.getClass().getName()); + response = SimpleResponse.success(); } return PayloadUtil.from(response); } catch (Exception e) { log.warn("process payload {} fail", value.getMetadata().getType(), e); if (e instanceof AdminServerRuntimeException) { - return PayloadUtil.from(FailResponse.build(((AdminServerRuntimeException) e).getCode(), e.getMessage())); + return PayloadUtil.from(SimpleResponse.fail(((AdminServerRuntimeException) e).getCode(), e.getMessage())); } - return PayloadUtil.from(FailResponse.build(ErrorCode.INTERNAL_ERR, "admin server internal err")); + return PayloadUtil.from(SimpleResponse.fail(ErrorCode.INTERNAL_ERR, "admin server internal err")); } } diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/AdminServer.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/AdminServer.java similarity index 82% rename from eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/AdminServer.java rename to eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/AdminServer.java index a2e4cc7063..fd7582800d 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/AdminServer.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/AdminServer.java @@ -15,16 +15,16 @@ * limitations under the License. */ -package org.apache.eventmesh.admin.server; +package org.apache.eventmesh.admin.server.web.service; +import org.apache.eventmesh.admin.server.AdminServerProperties; +import org.apache.eventmesh.admin.server.AdminServerRuntimeException; +import org.apache.eventmesh.common.ComponentLifeCycle; import org.apache.eventmesh.common.Constants; import org.apache.eventmesh.common.config.CommonConfiguration; import org.apache.eventmesh.common.config.ConfigService; -import org.apache.eventmesh.common.remote.Task; import org.apache.eventmesh.common.remote.exception.ErrorCode; -import org.apache.eventmesh.common.remote.request.ReportHeartBeatRequest; import org.apache.eventmesh.common.utils.IPUtils; -import org.apache.eventmesh.common.utils.PagedList; import org.apache.eventmesh.registry.RegisterServerInfo; import org.apache.eventmesh.registry.RegistryFactory; import org.apache.eventmesh.registry.RegistryService; @@ -41,8 +41,7 @@ @Service @Slf4j -public class AdminServer implements Admin, ApplicationListener { - +public class AdminServer implements ComponentLifeCycle, ApplicationListener { private final RegistryService registryService; private final RegisterServerInfo adminServeInfo; @@ -67,32 +66,6 @@ public AdminServer(AdminServerProperties properties) { registryService = RegistryFactory.getInstance(configuration.getEventMeshRegistryPluginType()); } - - @Override - public boolean createOrUpdateTask(Task task) { - return false; - } - - @Override - public boolean deleteTask(Long id) { - return false; - } - - @Override - public Task getTask(Long id) { - return null; - } - - @Override - public PagedList getTaskPaged(Task task) { - return null; - } - - @Override - public void reportHeartbeat(ReportHeartBeatRequest heartBeat) { - - } - @Override @PostConstruct public void start() { diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/datasource/DataSourceBizService.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/datasource/DataSourceBizService.java new file mode 100644 index 0000000000..433847a4cd --- /dev/null +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/datasource/DataSourceBizService.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.admin.server.web.service.datasource; + +import org.apache.eventmesh.admin.server.AdminServerRuntimeException; +import org.apache.eventmesh.admin.server.web.db.entity.EventMeshDataSource; +import org.apache.eventmesh.admin.server.web.db.service.EventMeshDataSourceService; +import org.apache.eventmesh.common.remote.exception.ErrorCode; +import org.apache.eventmesh.common.remote.request.CreateOrUpdateDataSourceReq; +import org.apache.eventmesh.common.utils.JsonUtils; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +@Service +public class DataSourceBizService { + @Autowired + private EventMeshDataSourceService dataSourceService; + + public EventMeshDataSource createDataSource(CreateOrUpdateDataSourceReq dataSource) { + EventMeshDataSource entity = new EventMeshDataSource(); + entity.setConfiguration(JsonUtils.toJSONString(dataSource.getConfig())); + entity.setDataType(dataSource.getType().name()); + entity.setCreateUid(dataSource.getOperator()); + entity.setUpdateUid(dataSource.getOperator()); + entity.setRegion(dataSource.getRegion()); + entity.setDescription(dataSource.getDesc()); + if (dataSourceService.save(entity)) { + return entity; + } + throw new AdminServerRuntimeException(ErrorCode.BAD_DB_DATA, "save data source fail"); + } +} diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/heatbeat/EventMeshRuntimeHeartbeatBizService.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/heatbeat/RuntimeHeartbeatBizService.java similarity index 98% rename from eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/heatbeat/EventMeshRuntimeHeartbeatBizService.java rename to eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/heatbeat/RuntimeHeartbeatBizService.java index 4fa80b270a..95dff6e5b3 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/heatbeat/EventMeshRuntimeHeartbeatBizService.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/heatbeat/RuntimeHeartbeatBizService.java @@ -34,7 +34,7 @@ */ @Service @Slf4j -public class EventMeshRuntimeHeartbeatBizService { +public class RuntimeHeartbeatBizService { @Autowired EventMeshRuntimeHistoryService historyService; diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/EventMeshJobInfoBizService.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/EventMeshJobInfoBizService.java deleted file mode 100644 index 79771cbf24..0000000000 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/EventMeshJobInfoBizService.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eventmesh.admin.server.web.service.job; - -import org.apache.eventmesh.admin.server.AdminServerRuntimeException; -import org.apache.eventmesh.admin.server.web.db.entity.EventMeshDataSource; -import org.apache.eventmesh.admin.server.web.db.entity.EventMeshJobDetail; -import org.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo; -import org.apache.eventmesh.admin.server.web.db.service.EventMeshDataSourceService; -import org.apache.eventmesh.admin.server.web.db.service.EventMeshJobInfoService; -import org.apache.eventmesh.admin.server.web.service.position.EventMeshPositionBizService; -import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; -import org.apache.eventmesh.common.remote.JobState; -import org.apache.eventmesh.common.remote.exception.ErrorCode; -import org.apache.eventmesh.common.remote.job.DataSourceType; -import org.apache.eventmesh.common.remote.job.JobTransportType; -import org.apache.eventmesh.common.remote.request.FetchJobRequest; -import org.apache.eventmesh.common.utils.JsonUtils; - -import org.apache.commons.lang3.StringUtils; - -import java.util.Map; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; - -import com.baomidou.mybatisplus.core.toolkit.Wrappers; -import com.fasterxml.jackson.core.type.TypeReference; - -import lombok.extern.slf4j.Slf4j; - -/** - * for table 'event_mesh_job_info' db operation - * 2024-05-09 15:51:45 - */ -@Service -@Slf4j -public class EventMeshJobInfoBizService { - - @Autowired - EventMeshJobInfoService jobInfoService; - - @Autowired - EventMeshDataSourceService dataSourceService; - - @Autowired - EventMeshPositionBizService positionBizService; - - public boolean updateJobState(Integer jobID, JobState state) { - if (jobID == null || state == null) { - return false; - } - EventMeshJobInfo jobInfo = new EventMeshJobInfo(); - jobInfo.setJobID(jobID); - jobInfo.setState(state.ordinal()); - jobInfoService.update(jobInfo, Wrappers.update().notIn("state", JobState.DELETE.ordinal(), - JobState.COMPLETE.ordinal())); - return true; - } - - public EventMeshJobDetail getJobDetail(FetchJobRequest request, Metadata metadata) { - if (request == null) { - return null; - } - EventMeshJobInfo job = jobInfoService.getById(request.getJobID()); - if (job == null) { - return null; - } - EventMeshJobDetail detail = new EventMeshJobDetail(); - detail.setId(job.getJobID()); - detail.setName(job.getName()); - EventMeshDataSource source = dataSourceService.getById(job.getSourceData()); - EventMeshDataSource target = dataSourceService.getById(job.getTargetData()); - if (source != null) { - if (!StringUtils.isBlank(source.getConfiguration())) { - try { - detail.setSourceConnectorConfig(JsonUtils.parseTypeReferenceObject(source.getConfiguration(), - new TypeReference>() { - })); - } catch (Exception e) { - log.warn("parse source config id [{}] fail", job.getSourceData(), e); - throw new AdminServerRuntimeException(ErrorCode.BAD_DB_DATA, "illegal source data source config"); - } - } - detail.setSourceConnectorDesc(source.getDescription()); - if (source.getDataType() != null) { - detail.setPosition(positionBizService.getPositionByJobID(job.getJobID(), - DataSourceType.getDataSourceType(source.getDataType()))); - - } - } - if (target != null) { - if (!StringUtils.isBlank(target.getConfiguration())) { - try { - detail.setSinkConnectorConfig(JsonUtils.parseTypeReferenceObject(target.getConfiguration(), - new TypeReference>() { - })); - } catch (Exception e) { - log.warn("parse sink config id [{}] fail", job.getSourceData(), e); - throw new AdminServerRuntimeException(ErrorCode.BAD_DB_DATA, "illegal target data sink config"); - } - } - detail.setSinkConnectorDesc(target.getDescription()); - } - - JobState state = JobState.fromIndex(job.getState()); - if (state == null) { - throw new AdminServerRuntimeException(ErrorCode.BAD_DB_DATA, "illegal job state in db"); - } - detail.setState(state); - detail.setTransportType(JobTransportType.getJobTransportType(job.getTransportType())); - return detail; - } -} - - - - diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java new file mode 100644 index 0000000000..9affa10e60 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.admin.server.web.service.job; + +import org.apache.eventmesh.admin.server.AdminServerRuntimeException; +import org.apache.eventmesh.admin.server.web.db.entity.EventMeshDataSource; +import org.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo; +import org.apache.eventmesh.admin.server.web.db.service.EventMeshDataSourceService; +import org.apache.eventmesh.admin.server.web.db.service.EventMeshJobInfoExtService; +import org.apache.eventmesh.admin.server.web.db.service.EventMeshJobInfoService; +import org.apache.eventmesh.admin.server.web.pojo.JobDetail; +import org.apache.eventmesh.admin.server.web.service.datasource.DataSourceBizService; +import org.apache.eventmesh.admin.server.web.service.position.PositionBizService; +import org.apache.eventmesh.common.remote.TaskState; +import org.apache.eventmesh.common.remote.TransportType; +import org.apache.eventmesh.common.remote.datasource.DataSource; +import org.apache.eventmesh.common.remote.datasource.DataSourceType; +import org.apache.eventmesh.common.remote.exception.ErrorCode; +import org.apache.eventmesh.common.remote.request.CreateOrUpdateDataSourceReq; +import org.apache.eventmesh.common.utils.JsonUtils; + +import org.apache.commons.lang3.StringUtils; + +import java.util.LinkedList; +import java.util.List; +import java.util.UUID; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import com.baomidou.mybatisplus.core.toolkit.Wrappers; + +import lombok.extern.slf4j.Slf4j; + +/** + * for table 'event_mesh_job_info' db operation + */ +@Service +@Slf4j +public class JobInfoBizService { + + @Autowired + private EventMeshJobInfoService jobInfoService; + + @Autowired + private EventMeshJobInfoExtService jobInfoExtService; + + @Autowired + private DataSourceBizService dataSourceBizService; + + @Autowired + private EventMeshDataSourceService dataSourceService; + + @Autowired + private PositionBizService positionBizService; + + public boolean updateJobState(String jobID, TaskState state) { + if (jobID == null || state == null) { + return false; + } + EventMeshJobInfo jobInfo = new EventMeshJobInfo(); + jobInfo.setState(state.name()); + return jobInfoService.update(jobInfo, Wrappers.update().eq("jobID", jobID).ne("state", TaskState.DELETE.name())); + } + + @Transactional + public List createJobs(List jobs) { + if (jobs == null || jobs.isEmpty() || jobs.stream().anyMatch(job -> StringUtils.isBlank(job.getTaskID()))) { + log.warn("when create jobs, task id is empty or jobs config is empty "); + return null; + } + List entityList = new LinkedList<>(); + for (JobDetail job : jobs) { + EventMeshJobInfo entity = new EventMeshJobInfo(); + entity.setState(TaskState.INIT.name()); + entity.setTaskID(job.getTaskID()); + entity.setJobType(job.getJobType().name()); + entity.setDesc(job.getDesc()); + String jobID = UUID.randomUUID().toString(); + entity.setJobID(jobID); + entity.setTransportType(job.getTransportType().name()); + entity.setCreateUid(job.getCreateUid()); + entity.setUpdateUid(job.getUpdateUid()); + entity.setFromRegion(job.getRegion()); + CreateOrUpdateDataSourceReq source = new CreateOrUpdateDataSourceReq(); + source.setType(job.getTransportType().getSrc()); + source.setOperator(job.getCreateUid()); + source.setRegion(job.getRegion()); + source.setDesc(job.getSourceConnectorDesc()); + source.setConfig(job.getSourceDataSource()); + EventMeshDataSource createdSource = dataSourceBizService.createDataSource(source); + entity.setSourceData(createdSource.getId()); + + CreateOrUpdateDataSourceReq sink = new CreateOrUpdateDataSourceReq(); + sink.setType(job.getTransportType().getDst()); + sink.setOperator(job.getCreateUid()); + sink.setRegion(job.getRegion()); + sink.setDesc(job.getSinkConnectorDesc()); + sink.setConfig(job.getSinkDataSource()); + EventMeshDataSource createdSink = dataSourceBizService.createDataSource(source); + entity.setTargetData(createdSink.getId()); + + entityList.add(entity); + } + int changed = jobInfoExtService.batchSave(entityList); + if (changed != jobs.size()) { + throw new AdminServerRuntimeException(ErrorCode.INTERNAL_ERR, String.format("create [%d] jobs of not match expect [%d]", + changed, jobs.size())); + } + return entityList; + } + + + public JobDetail getJobDetail(String jobID) { + if (jobID == null) { + return null; + } + EventMeshJobInfo job = jobInfoService.getById(jobID); + if (job == null) { + return null; + } + JobDetail detail = new JobDetail(); + detail.setJobID(job.getJobID()); + EventMeshDataSource source = dataSourceService.getById(job.getSourceData()); + EventMeshDataSource target = dataSourceService.getById(job.getTargetData()); + if (source != null) { + if (!StringUtils.isBlank(source.getConfiguration())) { + try { + detail.setSourceDataSource(JsonUtils.parseObject(source.getConfiguration(), DataSource.class)); + } catch (Exception e) { + log.warn("parse source config id [{}] fail", job.getSourceData(), e); + throw new AdminServerRuntimeException(ErrorCode.BAD_DB_DATA, "illegal source data source config"); + } + } + detail.setSourceConnectorDesc(source.getDescription()); + if (source.getDataType() != null) { + detail.setPositions(positionBizService.getPositionByJobID(job.getJobID(), + DataSourceType.getDataSourceType(source.getDataType()))); + + } + } + if (target != null) { + if (!StringUtils.isBlank(target.getConfiguration())) { + try { + detail.setSinkDataSource(JsonUtils.parseObject(target.getConfiguration(), DataSource.class)); + } catch (Exception e) { + log.warn("parse sink config id [{}] fail", job.getSourceData(), e); + throw new AdminServerRuntimeException(ErrorCode.BAD_DB_DATA, "illegal target data sink config"); + } + } + detail.setSinkConnectorDesc(target.getDescription()); + } + + TaskState state = TaskState.fromIndex(job.getState()); + if (state == null) { + throw new AdminServerRuntimeException(ErrorCode.BAD_DB_DATA, "illegal job state in db"); + } + detail.setState(state); + detail.setTransportType(TransportType.getTransportType(job.getTransportType())); + return detail; + } +} + + + + diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/EventMeshPositionBizService.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/PositionBizService.java similarity index 92% rename from eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/EventMeshPositionBizService.java rename to eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/PositionBizService.java index d3b6ff555e..c40fc9e7e5 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/EventMeshPositionBizService.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/PositionBizService.java @@ -19,8 +19,8 @@ import org.apache.eventmesh.admin.server.AdminServerRuntimeException; import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; +import org.apache.eventmesh.common.remote.datasource.DataSourceType; import org.apache.eventmesh.common.remote.exception.ErrorCode; -import org.apache.eventmesh.common.remote.job.DataSourceType; import org.apache.eventmesh.common.remote.offset.RecordPosition; import org.apache.eventmesh.common.remote.request.FetchPositionRequest; import org.apache.eventmesh.common.remote.request.ReportPositionRequest; @@ -34,7 +34,7 @@ @Service @Slf4j -public class EventMeshPositionBizService { +public class PositionBizService { @Autowired PositionHandlerFactory factory; @@ -70,14 +70,14 @@ public boolean reportPosition(ReportPositionRequest request, Metadata metadata) return handler.handler(request, metadata); } - public List getPositionByJobID(Integer jobID, DataSourceType type) { + public List getPositionByJobID(String jobID, DataSourceType type) { if (jobID == null || type == null) { return null; } isValidatePositionRequest(type); PositionHandler handler = factory.getHandler(type); FetchPositionRequest request = new FetchPositionRequest(); - request.setJobID(String.valueOf(jobID)); + request.setJobID(jobID); return handler.handler(request, null); } } diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/PositionHandler.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/PositionHandler.java index 921b0b2e59..e09c1a3837 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/PositionHandler.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/PositionHandler.java @@ -17,7 +17,7 @@ package org.apache.eventmesh.admin.server.web.service.position; -import org.apache.eventmesh.common.remote.job.DataSourceType; +import org.apache.eventmesh.common.remote.datasource.DataSourceType; public abstract class PositionHandler implements IReportPositionHandler, IFetchPositionHandler { diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/PositionHandlerFactory.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/PositionHandlerFactory.java index 751291351d..c2065f80f4 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/PositionHandlerFactory.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/PositionHandlerFactory.java @@ -17,7 +17,7 @@ package org.apache.eventmesh.admin.server.web.service.position; -import org.apache.eventmesh.common.remote.job.DataSourceType; +import org.apache.eventmesh.common.remote.datasource.DataSourceType; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java index f2c174c3b7..352ba57e96 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java @@ -23,7 +23,7 @@ import org.apache.eventmesh.admin.server.web.db.service.EventMeshPositionReporterHistoryService; import org.apache.eventmesh.admin.server.web.service.position.PositionHandler; import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; -import org.apache.eventmesh.common.remote.job.DataSourceType; +import org.apache.eventmesh.common.remote.datasource.DataSourceType; import org.apache.eventmesh.common.remote.offset.RecordPosition; import org.apache.eventmesh.common.remote.offset.canal.CanalRecordOffset; import org.apache.eventmesh.common.remote.offset.canal.CanalRecordPartition; @@ -31,8 +31,12 @@ import org.apache.eventmesh.common.remote.request.ReportPositionRequest; import org.apache.eventmesh.common.utils.JsonUtils; +import org.apache.commons.lang3.StringUtils; + +import java.time.Duration; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.locks.LockSupport; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.dao.DuplicateKeyException; @@ -45,6 +49,9 @@ @Component @Slf4j public class MysqlPositionHandler extends PositionHandler { + private static final int RETRY_TIMES = 3; + + private final long retryPeriod = Duration.ofMillis(500).toNanos(); @Autowired EventMeshMysqlPositionService positionService; @@ -57,20 +64,44 @@ protected DataSourceType getSourceType() { return DataSourceType.MYSQL; } + private boolean isNotForward(EventMeshMysqlPosition now, EventMeshMysqlPosition old) { + if (StringUtils.isNotBlank(old.getJournalName()) && old.getJournalName().equals(now.getJournalName()) + && old.getPosition() >= now.getPosition()) { + log.info("job [{}] report position [{}] by runtime [{}] less than db position [{}] journal name [{}] by [{}]", + now.getJobID(), now.getPosition(), now.getAddress(), now.getJournalName(), old.getPosition(), old.getAddress()); + return true; + } + return false; + } + public boolean saveOrUpdateByJob(EventMeshMysqlPosition position) { - EventMeshMysqlPosition old = positionService.getOne(Wrappers.query().eq("jobId", - position.getJobID())); - if (old == null) { - return positionService.save(position); - } else { - if (old.getPosition() >= position.getPosition()) { - log.info("job [{}] report position [{}] by runtime [{}] less than db position [{}] by [{}]", - position.getJobID(), position.getPosition(), position.getAddress(), old.getPosition(), old.getAddress()); + for (int i = 0; i < RETRY_TIMES; i++) { + EventMeshMysqlPosition old = positionService.getOne(Wrappers.query().eq("jobId", + position.getJobID())); + if (old == null) { + try { + return positionService.save(position); + } catch (DuplicateKeyException e) { + log.warn("current insert position fail, it will retry in 500ms"); + LockSupport.parkNanos(retryPeriod); + continue; + } catch (Exception e) { + log.warn("insert position fail catch unknown exception", e); + return false; + } + } + + if (isNotForward(position, old)) { return true; } try { - return positionService.update(position, Wrappers.update().eq("updateTime", - old.getUpdateTime())); + if (!positionService.update(position, Wrappers.update().eq("updateTime", + old.getUpdateTime()).eq("jobID", old.getJobID()))) { + log.warn("update position [{}] fail, maybe current update. it will retry in 500ms", position); + LockSupport.parkNanos(retryPeriod); + continue; + } + return true; } finally { if (old.getAddress() != null && !old.getAddress().equals(position.getAddress())) { EventMeshPositionReporterHistory history = new EventMeshPositionReporterHistory(); @@ -87,61 +118,53 @@ public boolean saveOrUpdateByJob(EventMeshMysqlPosition position) { } } } + return false; } @Override public boolean handler(ReportPositionRequest request, Metadata metadata) { - for (int i = 0; i < 3; i++) { - try { - List recordPositionList = request.getRecordPositionList(); - RecordPosition recordPosition = recordPositionList.get(0); - if (recordPosition == null || recordPosition.getRecordPartition() == null || recordPosition.getRecordOffset() == null) { - log.warn("report mysql position, but record-partition/partition/offset is null"); - return false; - } - if (!(recordPosition.getRecordPartition() instanceof CanalRecordPartition)) { - log.warn("report mysql position, but record partition class [{}] not match [{}]", - recordPosition.getRecordPartition().getRecordPartitionClass(), CanalRecordPartition.class); - return false; - } - if (!(recordPosition.getRecordOffset() instanceof CanalRecordOffset)) { - log.warn("report mysql position, but record offset class [{}] not match [{}]", - recordPosition.getRecordOffset().getRecordOffsetClass(), CanalRecordOffset.class); - return false; - } - EventMeshMysqlPosition position = new EventMeshMysqlPosition(); - position.setJobID(Integer.parseInt(request.getJobID())); - position.setAddress(request.getAddress()); - CanalRecordOffset offset = (CanalRecordOffset) recordPosition.getRecordOffset(); - if (offset != null) { - position.setPosition(offset.getOffset()); - position.setGtid(offset.getGtid()); - position.setCurrentGtid(offset.getCurrentGtid()); - } - CanalRecordPartition partition = (CanalRecordPartition) recordPosition.getRecordPartition(); - if (partition != null) { - position.setServerUUID(partition.getServerUUID()); - position.setTimestamp(partition.getTimeStamp()); - position.setJournalName(partition.getJournalName()); - } - if (!saveOrUpdateByJob(position)) { - log.warn("update job position fail [{}]", request); - return false; - } - return true; - } catch (DuplicateKeyException e) { - log.warn("concurrent report position job [{}], it will try again", request.getJobID()); - } catch (Exception e) { - log.warn("save position job [{}] fail", request.getJobID(), e); + + try { + List recordPositionList = request.getRecordPositionList(); + RecordPosition recordPosition = recordPositionList.get(0); + if (recordPosition == null || recordPosition.getRecordPartition() == null || recordPosition.getRecordOffset() == null) { + log.warn("report mysql position, but record-partition/partition/offset is null"); return false; } - try { - Thread.sleep(200); - } catch (InterruptedException ignore) { - log.warn("save position thread interrupted, [{}]", request); - return true; + if (!(recordPosition.getRecordPartition() instanceof CanalRecordPartition)) { + log.warn("report mysql position, but record partition class [{}] not match [{}]", + recordPosition.getRecordPartition().getRecordPartitionClass(), CanalRecordPartition.class); + return false; + } + if (!(recordPosition.getRecordOffset() instanceof CanalRecordOffset)) { + log.warn("report mysql position, but record offset class [{}] not match [{}]", + recordPosition.getRecordOffset().getRecordOffsetClass(), CanalRecordOffset.class); + return false; + } + EventMeshMysqlPosition position = new EventMeshMysqlPosition(); + position.setJobID(request.getJobID()); + position.setAddress(request.getAddress()); + CanalRecordOffset offset = (CanalRecordOffset) recordPosition.getRecordOffset(); + if (offset != null) { + position.setPosition(offset.getOffset()); + position.setGtid(offset.getGtid()); + position.setCurrentGtid(offset.getCurrentGtid()); } + CanalRecordPartition partition = (CanalRecordPartition) recordPosition.getRecordPartition(); + if (partition != null) { + position.setServerUUID(partition.getServerUUID()); + position.setTimestamp(partition.getTimeStamp()); + position.setJournalName(partition.getJournalName()); + } + if (!saveOrUpdateByJob(position)) { + log.warn("update job position fail [{}]", request); + return false; + } + return true; + } catch (Exception e) { + log.warn("save position job [{}] fail", request.getJobID(), e); } + return false; } diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/task/TaskBizService.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/task/TaskBizService.java new file mode 100644 index 0000000000..b4fdc57af0 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/task/TaskBizService.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.admin.server.web.service.task; + +import org.apache.eventmesh.admin.server.web.db.entity.EventMeshTaskInfo; +import org.apache.eventmesh.admin.server.web.db.service.EventMeshTaskInfoService; +import org.apache.eventmesh.admin.server.web.pojo.JobDetail; +import org.apache.eventmesh.admin.server.web.service.job.JobInfoBizService; +import org.apache.eventmesh.common.remote.TaskState; +import org.apache.eventmesh.common.remote.request.CreateTaskRequest; + +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +@Service +public class TaskBizService { + @Autowired + private EventMeshTaskInfoService taskInfoService; + + @Autowired + private JobInfoBizService jobInfoService; + + @Transactional + public String createTask(CreateTaskRequest req) { + String taskID = UUID.randomUUID().toString(); + List jobs = req.getJobs().stream().map(x -> { + JobDetail job = parse(x); + job.setTaskID(taskID); + job.setRegion(req.getRegion()); + job.setCreateUid(req.getUid()); + job.setUpdateUid(req.getUid()); + return job; + }).collect(Collectors.toList()); + jobInfoService.createJobs(jobs); + EventMeshTaskInfo taskInfo = new EventMeshTaskInfo(); + taskInfo.setTaskID(taskID); + taskInfo.setName(req.getName()); + taskInfo.setDesc(req.getDesc()); + taskInfo.setState(TaskState.INIT.name()); + taskInfo.setCreateUid(req.getUid()); + taskInfo.setFromRegion(req.getRegion()); + taskInfoService.save(taskInfo); + return taskID; + } + + private JobDetail parse(CreateTaskRequest.JobDetail src) { + JobDetail dst = new JobDetail(); + dst.setDesc(src.getDesc()); + dst.setTransportType(src.getTransportType()); + dst.setSourceConnectorDesc(src.getSourceConnectorDesc()); + dst.setSourceDataSource(src.getSourceDataSource()); + dst.setSinkConnectorDesc(src.getSinkConnectorDesc()); + dst.setSinkDataSource(src.getSinkDataSource()); + dst.setJobType(src.getJobType()); + return dst; + } +} diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/verify/VerifyBizService.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/verify/VerifyBizService.java new file mode 100644 index 0000000000..74f208b199 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/verify/VerifyBizService.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.admin.server.web.service.verify; + +import org.apache.eventmesh.admin.server.web.db.entity.EventMeshVerify; +import org.apache.eventmesh.admin.server.web.db.service.EventMeshVerifyService; +import org.apache.eventmesh.common.remote.request.ReportVerifyRequest; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +@Service +public class VerifyBizService { + @Autowired + private EventMeshVerifyService verifyService; + + public boolean reportVerifyRecord(ReportVerifyRequest request) { + EventMeshVerify verify = new EventMeshVerify(); + verify.setRecordID(request.getRecordID()); + verify.setRecordSig(request.getRecordSig()); + verify.setPosition(request.getPosition()); + verify.setTaskID(request.getTaskID()); + verify.setConnectorName(request.getConnectorName()); + verify.setConnectorStage(request.getConnectorStage()); + return verifyService.save(verify); + } +} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/offset/OffsetStorageConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/offset/OffsetStorageConfig.java index e8ff1d4909..60448d3691 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/offset/OffsetStorageConfig.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/offset/OffsetStorageConfig.java @@ -17,7 +17,7 @@ package org.apache.eventmesh.common.config.connector.offset; -import org.apache.eventmesh.common.remote.job.DataSourceType; +import org.apache.eventmesh.common.remote.datasource.DataSourceType; import java.util.Map; diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/TaskState.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/TaskState.java new file mode 100644 index 0000000000..606339c443 --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/TaskState.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.remote; + +import java.util.HashMap; +import java.util.Map; + +import lombok.ToString; + +@ToString +public enum TaskState { + INIT, STARTED, RUNNING, PAUSE, COMPLETE, DELETE, FAIL; + private static final TaskState[] STATES_NUM_INDEX = TaskState.values(); + private static final Map STATES_NAME_INDEX = new HashMap<>(); + static { + for (TaskState jobState : STATES_NUM_INDEX) { + STATES_NAME_INDEX.put(jobState.name(), jobState); + } + } + + public static TaskState fromIndex(Integer index) { + if (index == null || index < 0 || index >= STATES_NUM_INDEX.length) { + return null; + } + + return STATES_NUM_INDEX[index]; + } + + public static TaskState fromIndex(String index) { + if (index == null || index.isEmpty()) { + return null; + } + + return STATES_NAME_INDEX.get(index); + } +} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/job/JobTransportType.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/TransportType.java similarity index 57% rename from eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/job/JobTransportType.java rename to eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/TransportType.java index 5f06cf1f2c..95a88a23fa 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/job/JobTransportType.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/TransportType.java @@ -15,51 +15,51 @@ * limitations under the License. */ -package org.apache.eventmesh.common.remote.job; +package org.apache.eventmesh.common.remote; + +import org.apache.eventmesh.common.remote.datasource.DataSourceType; import java.util.HashMap; import java.util.Map; -public enum JobTransportType { +import lombok.Getter; + +@Getter +public enum TransportType { MYSQL_MYSQL(DataSourceType.MYSQL, DataSourceType.MYSQL), REDIS_REDIS(DataSourceType.REDIS, DataSourceType.REDIS), - ROCKETMQ_ROCKETMQ(DataSourceType.ROCKETMQ, DataSourceType.ROCKETMQ); - private static final Map INDEX_TYPES = new HashMap<>(); - private static final JobTransportType[] TYPES = JobTransportType.values(); + ROCKETMQ_ROCKETMQ(DataSourceType.ROCKETMQ, DataSourceType.ROCKETMQ), + MYSQL_HTTP(DataSourceType.MYSQL, DataSourceType.HTTP), + HTTP_MYSQL(DataSourceType.HTTP, DataSourceType.MYSQL), + REDIS_MQ(DataSourceType.REDIS, DataSourceType.ROCKETMQ); + private static final Map INDEX_TYPES = new HashMap<>(); + private static final TransportType[] TYPES = TransportType.values(); private static final String SEPARATOR = "@"; static { - for (JobTransportType type : TYPES) { - INDEX_TYPES.put(generateKey(type.src, type.dst), type); + for (TransportType type : TYPES) { + INDEX_TYPES.put(type.name(), type); } } - DataSourceType src; + private final DataSourceType src; - DataSourceType dst; + private final DataSourceType dst; - JobTransportType(DataSourceType src, DataSourceType dst) { + TransportType(DataSourceType src, DataSourceType dst) { this.src = src; this.dst = dst; } - private static String generateKey(DataSourceType src, DataSourceType dst) { - return src.ordinal() + SEPARATOR + dst.ordinal(); - } - - public DataSourceType getSrc() { - return src; - } - public DataSourceType getDst() { - return dst; - } - - public static JobTransportType getJobTransportType(DataSourceType src, DataSourceType dst) { - return INDEX_TYPES.get(generateKey(src, dst)); + public static TransportType getTransportType(String index) { + if (index == null || index.isEmpty()) { + return null; + } + return INDEX_TYPES.get(index); } - public static JobTransportType getJobTransportType(Integer index) { + public static TransportType getTransportType(Integer index) { if (index == null || index < 0 || index >= TYPES.length) { return null; } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSource.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSource.java new file mode 100644 index 0000000000..7af3812f24 --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSource.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.remote.datasource; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; + +import lombok.Getter; + +@Getter +public class DataSource { + private final DataSourceType type; + private String desc; + @JsonTypeInfo(use = JsonTypeInfo.Id.CLASS) + @JsonSubTypes({ + @JsonSubTypes.Type(value = MySqlIncDataSourceSourceConf.class, name = "MySqlIncDataSourceSourceConf") + }) + private final DataSourceConf conf; + private final Class confClazz; + + public DataSource(DataSourceType type, DataSourceConf conf) { + this.type = type; + this.conf = conf; + this.confClazz = conf.getConfClass(); + } + + +} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/job/DataSourceClassify.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSourceClassify.java similarity index 91% rename from eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/job/DataSourceClassify.java rename to eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSourceClassify.java index 36f4064e70..8cb01c9204 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/job/DataSourceClassify.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSourceClassify.java @@ -15,11 +15,12 @@ * limitations under the License. */ -package org.apache.eventmesh.common.remote.job; +package org.apache.eventmesh.common.remote.datasource; public enum DataSourceClassify { // relationship db RDB, MQ, - CACHE; + CACHE, + TUNNEL; } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/Job.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSourceConf.java similarity index 83% rename from eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/Job.java rename to eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSourceConf.java index b65517bece..9701a9fa11 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/Job.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSourceConf.java @@ -15,11 +15,9 @@ * limitations under the License. */ -package org.apache.eventmesh.common.remote; +package org.apache.eventmesh.common.remote.datasource; -public class Job { - private long id; - private long taskID; - private JobType type; - private JobState state; + +public abstract class DataSourceConf { + public abstract Class getConfClass(); } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/job/DataSourceDriverType.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSourceDriverType.java similarity index 91% rename from eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/job/DataSourceDriverType.java rename to eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSourceDriverType.java index 52bc811195..4429bee5a9 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/job/DataSourceDriverType.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSourceDriverType.java @@ -15,10 +15,11 @@ * limitations under the License. */ -package org.apache.eventmesh.common.remote.job; +package org.apache.eventmesh.common.remote.datasource; public enum DataSourceDriverType { MYSQL, REDIS, - ROCKETMQ; + ROCKETMQ, + HTTP; } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/job/DataSourceType.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSourceType.java similarity index 72% rename from eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/job/DataSourceType.java rename to eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSourceType.java index e41865d24a..985f311b92 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/job/DataSourceType.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSourceType.java @@ -15,12 +15,29 @@ * limitations under the License. */ -package org.apache.eventmesh.common.remote.job; +package org.apache.eventmesh.common.remote.datasource; +import java.util.HashMap; +import java.util.Map; + +import lombok.Getter; +import lombok.ToString; + +@Getter +@ToString public enum DataSourceType { MYSQL("MySQL", DataSourceDriverType.MYSQL, DataSourceClassify.RDB), REDIS("Redis", DataSourceDriverType.REDIS, DataSourceClassify.CACHE), - ROCKETMQ("RocketMQ", DataSourceDriverType.ROCKETMQ, DataSourceClassify.MQ); + ROCKETMQ("RocketMQ", DataSourceDriverType.ROCKETMQ, DataSourceClassify.MQ), + HTTP("HTTP", DataSourceDriverType.HTTP, DataSourceClassify.TUNNEL); + private static final Map INDEX_TYPES = new HashMap<>(); + private static final DataSourceType[] TYPES = DataSourceType.values(); + static { + for (DataSourceType type : TYPES) { + INDEX_TYPES.put(type.name(), type); + } + } + private final String name; private final DataSourceDriverType driverType; private final DataSourceClassify classify; @@ -31,20 +48,13 @@ public enum DataSourceType { this.classify = classify; } - public String getName() { - return name; - } - - public DataSourceDriverType getDriverType() { - return driverType; - } - - public DataSourceClassify getClassify() { - return classify; + public static DataSourceType getDataSourceType(String index) { + if (index == null || index.isEmpty()) { + return null; + } + return INDEX_TYPES.get(index); } - private static final DataSourceType[] TYPES = DataSourceType.values(); - public static DataSourceType getDataSourceType(Integer index) { if (index == null || index < 0 || index >= TYPES.length) { return null; diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/MySqlIncDataSourceSourceConf.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/MySqlIncDataSourceSourceConf.java new file mode 100644 index 0000000000..f8c825e963 --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/MySqlIncDataSourceSourceConf.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.remote.datasource; + +import org.apache.eventmesh.common.config.connector.rdb.canal.SourceConnectorConfig; +import org.apache.eventmesh.common.remote.job.SyncConsistency; +import org.apache.eventmesh.common.remote.job.SyncMode; +import org.apache.eventmesh.common.remote.offset.RecordPosition; + +import java.util.List; + +public class MySqlIncDataSourceSourceConf extends DataSourceConf { + @Override + public Class getConfClass() { + return MySqlIncDataSourceSourceConf.class; + } + + private String destination; + + private Long canalInstanceId; + + private String desc; + + private boolean ddlSync = true; + + private boolean filterTableError = false; + + private Long slaveId; + + private Short clientId; + + private String serverUUID; + + private boolean isMariaDB = true; + + private boolean isGTIDMode = true; + + private Integer batchSize = 10000; + + private Long batchTimeout = -1L; + + private String tableFilter; + + private String fieldFilter; + + private List recordPositions; + + // ================================= channel parameter + // ================================ + + // enable remedy + private Boolean enableRemedy = false; + + // sync mode: field/row + private SyncMode syncMode; + + // sync consistency + private SyncConsistency syncConsistency; + + // ================================= system parameter + // ================================ + + // Column name of the bidirectional synchronization mark + private String needSyncMarkTableColumnName = "needSync"; + + // Column value of the bidirectional synchronization mark + private String needSyncMarkTableColumnValue = "needSync"; + + private SourceConnectorConfig sourceConnectorConfig; +} diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshJobDetail.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/job/JobConnectorConfig.java similarity index 69% rename from eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshJobDetail.java rename to eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/job/JobConnectorConfig.java index 849a90a883..14e8178cf3 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshJobDetail.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/job/JobConnectorConfig.java @@ -15,26 +15,17 @@ * limitations under the License. */ -package org.apache.eventmesh.admin.server.web.db.entity; +package org.apache.eventmesh.common.remote.job; -import org.apache.eventmesh.common.remote.JobState; -import org.apache.eventmesh.common.remote.job.JobTransportType; -import org.apache.eventmesh.common.remote.offset.RecordPosition; - -import java.util.List; import java.util.Map; import lombok.Data; +/** + * Description: + */ @Data -public class EventMeshJobDetail { - - private Integer id; - - private String name; - - private JobTransportType transportType; - +public class JobConnectorConfig { private Map sourceConnectorConfig; private String sourceConnectorDesc; @@ -42,8 +33,4 @@ public class EventMeshJobDetail { private Map sinkConnectorConfig; private String sinkConnectorDesc; - - private List position; - - private JobState state; } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/job/JobType.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/job/JobType.java new file mode 100644 index 0000000000..83d2f56964 --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/job/JobType.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.remote.job; + +import java.util.HashMap; +import java.util.Map; + +public enum JobType { + FULL, + INCREASE, + CHECK; + + private static final JobType[] STATES_NUM_INDEX = JobType.values(); + private static final Map STATES_NAME_INDEX = new HashMap<>(); + static { + for (JobType jobType : STATES_NUM_INDEX) { + STATES_NAME_INDEX.put(jobType.name(), jobType); + } + } + + public static JobType fromIndex(Integer index) { + if (index == null || index < 0 || index > STATES_NUM_INDEX.length) { + return null; + } + + return STATES_NUM_INDEX[index]; + } + + public static JobType fromIndex(String index) { + if (index == null || index.isEmpty()) { + return null; + } + + return STATES_NAME_INDEX.get(index); + } +} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/BaseRemoteRequest.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/BaseRemoteRequest.java index 3eba07836a..b8c4c06207 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/BaseRemoteRequest.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/BaseRemoteRequest.java @@ -27,7 +27,7 @@ @Getter public abstract class BaseRemoteRequest implements IPayload { - private Map header = new HashMap<>(); + private final Map header = new HashMap<>(); public void addHeader(String key, String value) { if (key == null || value == null) { diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateOrUpdateDataSourceReq.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateOrUpdateDataSourceReq.java new file mode 100644 index 0000000000..4ecf9b4527 --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateOrUpdateDataSourceReq.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.remote.request; + +import org.apache.eventmesh.common.remote.datasource.DataSource; +import org.apache.eventmesh.common.remote.datasource.DataSourceType; + +import lombok.Data; +import lombok.EqualsAndHashCode; + +/** + * create or update datasource with custom data source config + */ +@Data +@EqualsAndHashCode(callSuper = true) +public class CreateOrUpdateDataSourceReq extends BaseRemoteRequest { + private Integer id; + private DataSourceType type; + private String desc; + private DataSource config; + private String region; + private String operator; +} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateTaskRequest.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateTaskRequest.java new file mode 100644 index 0000000000..ce24e03416 --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateTaskRequest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.remote.request; + +import org.apache.eventmesh.common.remote.TransportType; +import org.apache.eventmesh.common.remote.datasource.DataSource; +import org.apache.eventmesh.common.remote.job.JobType; + +import java.util.List; + +import lombok.Data; + +/** + * Description: create task without task id, otherwise update task + */ +@Data +public class CreateTaskRequest { + private String name; + private String desc; + private String uid; + private List jobs; + private String region; + + @Data + public static class JobDetail { + private String desc; + + private JobType jobType; + + private DataSource sourceDataSource; + + private String sourceConnectorDesc; + + private DataSource sinkDataSource; + + private String sinkConnectorDesc; + + private TransportType transportType; + } +} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/FetchPositionRequest.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/FetchPositionRequest.java index db9d0ced2a..90563251ab 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/FetchPositionRequest.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/FetchPositionRequest.java @@ -17,7 +17,7 @@ package org.apache.eventmesh.common.remote.request; -import org.apache.eventmesh.common.remote.job.DataSourceType; +import org.apache.eventmesh.common.remote.datasource.DataSourceType; import org.apache.eventmesh.common.remote.offset.RecordPosition; import lombok.Data; diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportPositionRequest.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportPositionRequest.java index 79b05607f0..42694d5675 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportPositionRequest.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportPositionRequest.java @@ -17,24 +17,26 @@ package org.apache.eventmesh.common.remote.request; -import org.apache.eventmesh.common.remote.JobState; -import org.apache.eventmesh.common.remote.job.DataSourceType; +import org.apache.eventmesh.common.remote.TaskState; +import org.apache.eventmesh.common.remote.datasource.DataSourceType; import org.apache.eventmesh.common.remote.offset.RecordPosition; import java.util.List; import lombok.Data; import lombok.EqualsAndHashCode; +import lombok.ToString; @Data @EqualsAndHashCode(callSuper = true) +@ToString public class ReportPositionRequest extends BaseRemoteRequest { private String jobID; private List recordPositionList; - private JobState state; + private TaskState state; private String address; diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportVerifyRequest.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportVerifyRequest.java index 87f4581eb5..cd541949f4 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportVerifyRequest.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportVerifyRequest.java @@ -19,9 +19,11 @@ import lombok.Data; import lombok.EqualsAndHashCode; +import lombok.ToString; @Data @EqualsAndHashCode(callSuper = true) +@ToString public class ReportVerifyRequest extends BaseRemoteRequest { private String taskID; diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/BaseRemoteResponse.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/BaseRemoteResponse.java index b6f5daa565..3ea8401535 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/BaseRemoteResponse.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/BaseRemoteResponse.java @@ -27,8 +27,6 @@ @Getter public abstract class BaseRemoteResponse implements IPayload { - - public static final int UNKNOWN = -1; @Setter private boolean success = true; @Setter diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/EmptyAckResponse.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/CreateTaskResponse.java similarity index 88% rename from eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/EmptyAckResponse.java rename to eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/CreateTaskResponse.java index e51091fe94..a6f5628d6f 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/EmptyAckResponse.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/CreateTaskResponse.java @@ -17,9 +17,5 @@ package org.apache.eventmesh.common.remote.response; -/** - * empty, just mean remote received request - */ -public class EmptyAckResponse extends BaseRemoteResponse { - +public class CreateTaskResponse extends BaseRemoteResponse { } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchJobResponse.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchJobResponse.java index a51cb32b9c..95d2d157e0 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchJobResponse.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchJobResponse.java @@ -17,13 +17,14 @@ package org.apache.eventmesh.common.remote.response; -import org.apache.eventmesh.common.remote.JobState; +import org.apache.eventmesh.common.remote.TaskState; +import org.apache.eventmesh.common.remote.TransportType; import org.apache.eventmesh.common.remote.exception.ErrorCode; -import org.apache.eventmesh.common.remote.job.JobTransportType; +import org.apache.eventmesh.common.remote.job.JobConnectorConfig; +import org.apache.eventmesh.common.remote.job.JobType; import org.apache.eventmesh.common.remote.offset.RecordPosition; import java.util.List; -import java.util.Map; import lombok.Data; import lombok.EqualsAndHashCode; @@ -32,23 +33,17 @@ @EqualsAndHashCode(callSuper = true) public class FetchJobResponse extends BaseRemoteResponse { - private Integer id; + private String id; - private String name; + private TransportType transportType; - private JobTransportType transportType; - - private Map sourceConnectorConfig; - - private String sourceConnectorDesc; - - private Map sinkConnectorConfig; - - private String sinkConnectorDesc; + private JobConnectorConfig connectorConfig; private List position; - private JobState state; + private TaskState state; + + private JobType type; public static FetchJobResponse successResponse() { FetchJobResponse response = new FetchJobResponse(); diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FailResponse.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/SimpleResponse.java similarity index 67% rename from eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FailResponse.java rename to eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/SimpleResponse.java index d1d01dc59c..a4cdd52f99 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FailResponse.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/SimpleResponse.java @@ -17,9 +17,18 @@ package org.apache.eventmesh.common.remote.response; -public class FailResponse extends BaseRemoteResponse { - public static FailResponse build(int errorCode, String msg) { - FailResponse response = new FailResponse(); +import org.apache.eventmesh.common.remote.exception.ErrorCode; + +public class SimpleResponse extends BaseRemoteResponse { + /** + * just mean remote received or process success + */ + public static SimpleResponse success() { + return new SimpleResponse(); + } + + public static SimpleResponse fail(int errorCode, String msg) { + SimpleResponse response = new SimpleResponse(); response.setErrorCode(errorCode); response.setDesc(msg); response.setSuccess(false); @@ -33,7 +42,7 @@ public static FailResponse build(int errorCode, String msg) { * @param exception exception * @return response */ - public static FailResponse build(Throwable exception) { - return build(BaseRemoteResponse.UNKNOWN, exception.getMessage()); + public static SimpleResponse fail(Throwable exception) { + return fail(ErrorCode.INTERNAL_ERR, exception.getMessage()); } } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java index 7fa762d67b..bf91957032 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java @@ -62,6 +62,14 @@ public static T mapToObject(Map map, Class beanClass) { return beanClass.cast(obj); } + public static Map objectToMap(Object obj) { + if (obj == null) { + return null; + } + return OBJECT_MAPPER.convertValue(obj, new TypeReference>() { + }); + } + /** * Serialize object to json string. * diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/table/RdbTableMgr.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/table/RdbTableMgr.java index 1aebcf6364..de7a45dc99 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/table/RdbTableMgr.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/table/RdbTableMgr.java @@ -24,6 +24,7 @@ import org.apache.eventmesh.common.config.connector.rdb.canal.RdbTableDefinition; import org.apache.eventmesh.common.config.connector.rdb.canal.mysql.MySQLColumnDef; import org.apache.eventmesh.common.config.connector.rdb.canal.mysql.MySQLTableDef; +import org.apache.eventmesh.common.exception.EventMeshException; import org.apache.eventmesh.connector.canal.SqlUtils; import java.sql.JDBCType; @@ -88,6 +89,7 @@ protected void run() { } if (columns == null || columns.isEmpty() || columns.get(table.getTableName()) == null) { log.warn("init db [{}] table [{}] info, and columns are empty", db.getSchemaName(), table.getTableName()); + throw new EventMeshException("db [{}] table [{}] columns are empty"); } else { LinkedHashMap cols = new LinkedHashMap<>(); columns.get(table.getTableName()).forEach(x -> cols.put(x.getName(), x)); @@ -95,8 +97,9 @@ protected void run() { } this.tables.put(new RdbSimpleTable(db.getSchemaName(), table.getTableName()), mysqlTable); - } catch (Exception e) { + } catch (SQLException e) { log.error("init rdb table schema [{}] table [{}] fail", db.getSchemaName(), table.getTableName(), e); + throw new EventMeshException(e); } } diff --git a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java index c784069f97..08270fc024 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java +++ b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java @@ -23,8 +23,8 @@ import org.apache.eventmesh.common.protocol.grpc.adminserver.AdminServiceGrpc.AdminServiceStub; import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; import org.apache.eventmesh.common.protocol.grpc.adminserver.Payload; -import org.apache.eventmesh.common.remote.JobState; -import org.apache.eventmesh.common.remote.job.DataSourceType; +import org.apache.eventmesh.common.remote.TaskState; +import org.apache.eventmesh.common.remote.datasource.DataSourceType; import org.apache.eventmesh.common.remote.offset.RecordOffset; import org.apache.eventmesh.common.remote.offset.RecordPartition; import org.apache.eventmesh.common.remote.offset.RecordPosition; @@ -71,7 +71,7 @@ public class AdminOffsetService implements OffsetManagementService { private String jobId; - private JobState jobState; + private TaskState jobState; private DataSourceType dataSourceType; @@ -271,7 +271,7 @@ public void onCompleted() { log.info("init record offset {}", initialRecordOffsetMap); positionStore.putAll(initialRecordOffsetMap); } - this.jobState = JobState.RUNNING; + this.jobState = TaskState.RUNNING; this.jobId = offsetStorageConfig.getExtensions().get("jobId"); } } diff --git a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java index b13a5b35c5..dd248bf04d 100644 --- a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java +++ b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java @@ -196,12 +196,12 @@ private void initConnectorService() throws Exception { } connectorRuntimeConfig.setSourceConnectorType(jobResponse.getTransportType().getSrc().getName()); - connectorRuntimeConfig.setSourceConnectorDesc(jobResponse.getSourceConnectorDesc()); - connectorRuntimeConfig.setSourceConnectorConfig(jobResponse.getSourceConnectorConfig()); + connectorRuntimeConfig.setSourceConnectorDesc(jobResponse.getConnectorConfig().getSourceConnectorDesc()); + connectorRuntimeConfig.setSourceConnectorConfig(jobResponse.getConnectorConfig().getSourceConnectorConfig()); connectorRuntimeConfig.setSinkConnectorType(jobResponse.getTransportType().getDst().getName()); - connectorRuntimeConfig.setSinkConnectorDesc(jobResponse.getSinkConnectorDesc()); - connectorRuntimeConfig.setSinkConnectorConfig(jobResponse.getSinkConnectorConfig()); + connectorRuntimeConfig.setSinkConnectorDesc(jobResponse.getConnectorConfig().getSinkConnectorDesc()); + connectorRuntimeConfig.setSinkConnectorConfig(jobResponse.getConnectorConfig().getSinkConnectorConfig()); ConnectorCreateService sourceConnectorCreateService = ConnectorPluginFactory.createConnector(connectorRuntimeConfig.getSourceConnectorType() + "-Source"); diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/util/TraceUtilsTest.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/util/TraceUtilsTest.java new file mode 100644 index 0000000000..34989b6442 --- /dev/null +++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/util/TraceUtilsTest.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.util; + +import org.apache.eventmesh.runtime.boot.EventMeshServer; +import org.apache.eventmesh.runtime.mock.MockCloudEvent; +import org.apache.eventmesh.runtime.trace.Trace; + +import java.util.Map; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; +import org.mockito.Mockito; + +import io.cloudevents.SpecVersion; +import io.opentelemetry.api.trace.Span; + +public class TraceUtilsTest { + @Test + public void testShouldPrepareClientSpan() throws Exception { + Map cloudEventExtensionMap = EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), new MockCloudEvent()); + try (MockedStatic dummyStatic = Mockito.mockStatic(EventMeshServer.class)) { + Trace trace = Trace.getInstance("zipkin", true); + trace.init(); + dummyStatic.when(EventMeshServer::getTrace).thenReturn(trace); + Span testClientSpan = TraceUtils.prepareClientSpan( + cloudEventExtensionMap, + "test client span", + false + ); + Assertions.assertNotNull(testClientSpan); + } + } + + @Test + public void testShouldPrepareServerSpan() throws Exception { + Map cloudEventExtensionMap = EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), new MockCloudEvent()); + try (MockedStatic dummyStatic = Mockito.mockStatic(EventMeshServer.class)) { + Trace trace = Trace.getInstance("zipkin", true); + trace.init(); + dummyStatic.when(EventMeshServer::getTrace).thenReturn(trace); + TraceUtils.prepareClientSpan( + cloudEventExtensionMap, + "test client span", + false + ); + Span testServerSpan = TraceUtils.prepareServerSpan( + cloudEventExtensionMap, + "test server span", + false + ); + Assertions.assertNotNull(testServerSpan); + } + } + + @Test + public void testShouldFinishSpan() throws Exception { + MockCloudEvent cloudEvent = new MockCloudEvent(); + Map cloudEventExtensionMap = EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), cloudEvent); + try (MockedStatic dummyStatic = Mockito.mockStatic(EventMeshServer.class)) { + Trace trace = Trace.getInstance("zipkin", true); + trace.init(); + dummyStatic.when(EventMeshServer::getTrace).thenReturn(trace); + Span testClientSpan = TraceUtils.prepareClientSpan( + cloudEventExtensionMap, + "test client span", + false + ); + + TraceUtils.finishSpan(testClientSpan, cloudEvent); + Assertions.assertFalse(testClientSpan.isRecording()); + } + } +}