From 2ba54c7751a83348a41fc4eaa25898faaee11951 Mon Sep 17 00:00:00 2001 From: mike_xwm Date: Thu, 1 Aug 2024 14:35:25 +0800 Subject: [PATCH] [ISSUE #5052] Enhancement for source\sink connector (#5066) * [ISSUE #5040] Support gtid mode for sync data with mysql * fix conflicts with master * fix checkstyle error * [ISSUE #5044] Data synchronization strong verification in mariadb gtid mode * fix checkstyle error * [ISSUE #5048] Add report verify request to admin for connector runtime * fix checkstyle error * [ISSUE #5052] Enhancement for source\sink connector * fix checkstyle error * fix checkstyle error --- eventmesh-admin-server/conf/eventmesh.sql | 52 ++++++----------- .../web/service/job/JobInfoBizService.java | 1 - .../sink/connector/CanalSinkConnector.java | 41 ++++++++++++- .../connector/CanalSinkFullConnector.java | 5 ++ .../connector/CanalSourceConnector.java | 5 ++ .../connector/CanalSourceFullConnector.java | 5 ++ .../connector/ChatGPTSourceConnector.java | 5 ++ .../sink/connector/DingDingSinkConnector.java | 5 ++ .../sink/connector/FileSinkConnector.java | 5 ++ .../source/connector/FileSourceConnector.java | 5 ++ .../http/sink/HttpSinkConnector.java | 5 ++ .../http/source/HttpSourceConnector.java | 5 ++ .../jdbc/sink/JdbcSinkConnector.java | 5 ++ .../jdbc/source/JdbcSourceConnector.java | 5 ++ .../sink/connector/KafkaSinkConnector.java | 5 ++ .../connector/KafkaSourceConnector.java | 5 ++ .../sink/connector/KnativeSinkConnector.java | 5 ++ .../connector/KnativeSourceConnector.java | 5 ++ .../sink/connector/LarkSinkConnector.java | 5 ++ .../sink/connector/MongodbSinkConnector.java | 5 ++ .../connector/MongodbSourceConnector.java | 5 ++ .../connector/OpenFunctionSinkConnector.java | 5 ++ .../OpenFunctionSourceConnector.java | 5 ++ .../sink/connector/PravegaSinkConnector.java | 5 ++ .../connector/PravegaSourceConnector.java | 5 ++ .../connector/PrometheusSourceConnector.java | 5 ++ .../sink/connector/PulsarSinkConnector.java | 5 ++ .../connector/PulsarSourceConnector.java | 5 ++ .../sink/connector/RabbitMQSinkConnector.java | 5 ++ .../connector/RabbitMQSourceConnector.java | 5 ++ .../sink/connector/RedisSinkConnector.java | 5 ++ .../connector/RedisSourceConnector.java | 5 ++ .../sink/connector/RocketMQSinkConnector.java | 5 ++ .../connector/RocketMQSourceConnector.java | 5 ++ .../source/connector/S3SourceConnector.java | 5 ++ .../sink/connector/SlackSinkConnector.java | 5 ++ .../sink/connector/SpringSinkConnector.java | 5 ++ .../source/MessageSendingOperations.java | 2 +- .../connector/SpringSourceConnector.java | 14 +++-- .../sink/connector/WeChatSinkConnector.java | 5 ++ .../sink/connector/WeComSinkConnector.java | 5 ++ .../spring/pub/SpringPubController.java | 10 ++-- .../eventmesh/openconnect/SourceWorker.java | 10 ++-- .../openconnect/api/connector/Connector.java | 11 +++- .../api/connector/SourceConnectorContext.java | 3 + .../api/callback/SendExceptionContext.java} | 6 +- .../api/callback/SendMessageCallback.java | 4 +- .../offsetmgmt}/api/callback/SendResult.java | 2 +- .../offsetmgmt/api/data/ConnectRecord.java | 25 +++++++- .../runtime/connector/ConnectorRuntime.java | 57 ++++++++++++++----- .../connector/ConnectorRuntimeConfig.java | 2 + .../src/main/resources/connector.yaml | 1 + 52 files changed, 344 insertions(+), 77 deletions(-) rename eventmesh-openconnect/{eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendExcepionContext.java => eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/callback/SendExceptionContext.java} (90%) rename eventmesh-openconnect/{eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect => eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt}/api/callback/SendMessageCallback.java (87%) rename eventmesh-openconnect/{eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect => eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt}/api/callback/SendResult.java (95%) diff --git a/eventmesh-admin-server/conf/eventmesh.sql b/eventmesh-admin-server/conf/eventmesh.sql index 586ab1c266..82d5c53317 100644 --- a/eventmesh-admin-server/conf/eventmesh.sql +++ b/eventmesh-admin-server/conf/eventmesh.sql @@ -23,11 +23,11 @@ /*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */; --- 导出 eventmesh 的数据库结构 +-- export eventmesh database CREATE DATABASE IF NOT EXISTS `eventmesh` /*!40100 DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci */ /*!80016 DEFAULT ENCRYPTION='N' */; USE `eventmesh`; --- 导出 表 eventmesh.event_mesh_data_source 结构 +-- export table eventmesh.event_mesh_data_source structure CREATE TABLE IF NOT EXISTS `event_mesh_data_source` ( `id` int unsigned NOT NULL AUTO_INCREMENT, `dataType` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', @@ -39,11 +39,9 @@ CREATE TABLE IF NOT EXISTS `event_mesh_data_source` ( `createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, `updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`id`) USING BTREE -) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci; - --- 数据导出被取消选择。 +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci; --- 导出 表 eventmesh.event_mesh_job_info 结构 +-- export table eventmesh.event_mesh_job_info structure CREATE TABLE IF NOT EXISTS `event_mesh_job_info` ( `id` int unsigned NOT NULL AUTO_INCREMENT, `jobID` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, @@ -61,11 +59,9 @@ CREATE TABLE IF NOT EXISTS `event_mesh_job_info` ( `updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`id`) USING BTREE, UNIQUE KEY `jobID` (`jobID`) -) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci; - --- 数据导出被取消选择。 +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci; --- 导出 表 eventmesh.event_mesh_mysql_position 结构 +-- export table eventmesh.event_mesh_mysql_position structure CREATE TABLE IF NOT EXISTS `event_mesh_mysql_position` ( `id` int unsigned NOT NULL AUTO_INCREMENT, `jobID` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', @@ -80,11 +76,9 @@ CREATE TABLE IF NOT EXISTS `event_mesh_mysql_position` ( `updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`id`), UNIQUE KEY `jobID` (`jobID`) -) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC; - --- 数据导出被取消选择。 +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC; --- 导出 表 eventmesh.event_mesh_position_reporter_history 结构 +-- export table eventmesh.event_mesh_position_reporter_history structure CREATE TABLE IF NOT EXISTS `event_mesh_position_reporter_history` ( `id` bigint NOT NULL AUTO_INCREMENT, `job` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', @@ -94,27 +88,23 @@ CREATE TABLE IF NOT EXISTS `event_mesh_position_reporter_history` ( PRIMARY KEY (`id`), KEY `job` (`job`), KEY `address` (`address`) -) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='记录position上报者变更时,老记录'; - --- 数据导出被取消选择。 +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='record position reporter changes'; --- 导出 表 eventmesh.event_mesh_runtime_heartbeat 结构 +-- export table eventmesh.event_mesh_runtime_heartbeat structure CREATE TABLE IF NOT EXISTS `event_mesh_runtime_heartbeat` ( `id` bigint unsigned NOT NULL AUTO_INCREMENT, `adminAddr` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, `runtimeAddr` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, `jobID` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL, - `reportTime` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'runtime本地上报时间', + `reportTime` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'runtime local report time', `updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, `createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (`id`), UNIQUE KEY `runtimeAddr` (`runtimeAddr`), KEY `jobID` (`jobID`) -) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci; - --- 数据导出被取消选择。 +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci; --- 导出 表 eventmesh.event_mesh_runtime_history 结构 +-- export table eventmesh.event_mesh_runtime_history structure CREATE TABLE IF NOT EXISTS `event_mesh_runtime_history` ( `id` bigint NOT NULL AUTO_INCREMENT, `job` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', @@ -122,17 +112,15 @@ CREATE TABLE IF NOT EXISTS `event_mesh_runtime_history` ( `createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (`id`), KEY `address` (`address`) -) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC COMMENT='记录runtime上运行任务的变更'; - --- 数据导出被取消选择。 +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC COMMENT='record runtime task change history'; --- 导出 表 eventmesh.event_mesh_task_info 结构 +-- export table eventmesh.event_mesh_task_info structure CREATE TABLE IF NOT EXISTS `event_mesh_task_info` ( `id` int unsigned NOT NULL AUTO_INCREMENT, `taskID` varchar(50) COLLATE utf8mb4_general_ci NOT NULL, `name` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, `desc` varchar(50) COLLATE utf8mb4_general_ci NOT NULL, - `state` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT 'TaskState', + `state` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT 'taskstate', `fromRegion` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', `createUid` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', `updateUid` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', @@ -140,11 +128,9 @@ CREATE TABLE IF NOT EXISTS `event_mesh_task_info` ( `updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`id`) USING BTREE, UNIQUE KEY `taskID` (`taskID`) -) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci; - --- 数据导出被取消选择。 +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci; --- 导出 表 eventmesh.event_mesh_verify 结构 +-- export table eventmesh.event_mesh_verify structure CREATE TABLE IF NOT EXISTS `event_mesh_verify` ( `id` int NOT NULL, `taskID` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL, @@ -157,8 +143,6 @@ CREATE TABLE IF NOT EXISTS `event_mesh_verify` ( PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci; --- 数据导出被取消选择。 - /*!40101 SET SQL_MODE=IFNULL(@OLD_SQL_MODE, '') */; /*!40014 SET FOREIGN_KEY_CHECKS=IFNULL(@OLD_FOREIGN_KEY_CHECKS, 1) */; /*!40101 SET CHARACTER_SET_CLIENT=@OLD_CHARACTER_SET_CLIENT */; diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java index 357cf5d992..9affa10e60 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java @@ -50,7 +50,6 @@ /** * for table 'event_mesh_job_info' db operation - * 2024-05-09 15:51:45 */ @Service @Slf4j diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java index 8ecda8e125..2ecb2384ac 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java @@ -38,6 +38,8 @@ import org.apache.eventmesh.openconnect.api.connector.ConnectorContext; import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext; import org.apache.eventmesh.openconnect.api.sink.Sink; +import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendExceptionContext; +import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendResult; import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; import org.apache.commons.lang.StringUtils; @@ -146,6 +148,11 @@ public String name() { return this.sinkConfig.getSinkConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() { executor.shutdown(); @@ -159,7 +166,7 @@ public void put(List sinkRecords) { List canalConnectRecordList = (List) connectRecord.getData(); canalConnectRecordList = filterRecord(canalConnectRecordList); if (isDdlDatas(canalConnectRecordList)) { - doDdl(context, canalConnectRecordList); + doDdl(context, canalConnectRecordList, connectRecord); } else if (sinkConfig.isGTIDMode()) { doLoadWithGtid(context, sinkConfig, connectRecord); } else { @@ -197,7 +204,7 @@ private List filterRecord(List canalConn .collect(Collectors.toList()); } - private void doDdl(DbLoadContext context, List canalConnectRecordList) { + private void doDdl(DbLoadContext context, List canalConnectRecordList, ConnectRecord connectRecord) { for (final CanalConnectRecord record : canalConnectRecordList) { try { Boolean result = jdbcTemplate.execute(new StatementCallback() { @@ -217,9 +224,30 @@ public Boolean doInStatement(Statement stmt) throws SQLException, DataAccessExce context.getFailedRecords().add(record); } } catch (Throwable e) { + connectRecord.getCallback().onException(buildSendExceptionContext(connectRecord, e)); throw new RuntimeException(e); } } + connectRecord.getCallback().onSuccess(convertToSendResult(connectRecord)); + } + + private SendExceptionContext buildSendExceptionContext(ConnectRecord record, Throwable e) { + SendExceptionContext sendExceptionContext = new SendExceptionContext(); + sendExceptionContext.setMessageId(record.getRecordId()); + sendExceptionContext.setCause(e); + if (org.apache.commons.lang3.StringUtils.isNotEmpty(record.getExtension("topic"))) { + sendExceptionContext.setTopic(record.getExtension("topic")); + } + return sendExceptionContext; + } + + private SendResult convertToSendResult(ConnectRecord record) { + SendResult result = new SendResult(); + result.setMessageId(record.getRecordId()); + if (org.apache.commons.lang3.StringUtils.isNotEmpty(record.getExtension("topic"))) { + result.setTopic(record.getExtension("topic")); + } + return result; } private void doBefore(List canalConnectRecordList, final DbLoadData loadData) { @@ -291,6 +319,9 @@ private void doLoadWithGtid(DbLoadContext context, CanalSinkConfig sinkConfig, C Exception ex = null; try { ex = result.get(); + if (ex == null) { + connectRecord.getCallback().onSuccess(convertToSendResult(connectRecord)); + } } catch (Exception e) { ex = e; } @@ -298,14 +329,16 @@ private void doLoadWithGtid(DbLoadContext context, CanalSinkConfig sinkConfig, C if (skipException != null && skipException) { if (ex != null) { // do skip - log.warn("skip exception for data : {} , caused by {}", + log.warn("skip exception will ack data : {} , caused by {}", filteredRows, ExceptionUtils.getFullStackTrace(ex)); GtidBatchManager.removeGtidBatch(gtid); + connectRecord.getCallback().onSuccess(convertToSendResult(connectRecord)); } } else { if (ex != null) { log.error("sink connector will shutdown by " + ex.getMessage(), ExceptionUtils.getFullStackTrace(ex)); + connectRecord.getCallback().onException(buildSendExceptionContext(connectRecord, ex)); gtidSingleExecutor.shutdown(); System.exit(1); } else { @@ -314,6 +347,8 @@ private void doLoadWithGtid(DbLoadContext context, CanalSinkConfig sinkConfig, C } } else { log.info("Batch received, waiting for other batches."); + // ack this record + connectRecord.getCallback().onSuccess(convertToSendResult(connectRecord)); } } diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkFullConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkFullConnector.java index 36c03b156c..2b4c9d7a94 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkFullConnector.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkFullConnector.java @@ -109,6 +109,11 @@ public String name() { return null; } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void put(List sinkRecords) { if (sinkRecords == null || sinkRecords.isEmpty() || sinkRecords.get(0) == null) { diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java index f3f8b2e160..ea5ccdeed0 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java @@ -267,6 +267,11 @@ public String name() { return this.sourceConfig.getSourceConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() { if (!running) { diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceFullConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceFullConnector.java index df3c7571c2..97730463b5 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceFullConnector.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceFullConnector.java @@ -159,6 +159,11 @@ public String name() { return this.config.getConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public List poll() { while (flag.get()) { diff --git a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnector.java b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnector.java index 4d54cb2191..6b122087e5 100644 --- a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnector.java @@ -224,6 +224,11 @@ public String name() { return this.sourceConfig.getConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() { Throwable t = this.server.close().cause(); diff --git a/eventmesh-connectors/eventmesh-connector-dingtalk/src/main/java/org/apache/eventmesh/connector/dingtalk/sink/connector/DingDingSinkConnector.java b/eventmesh-connectors/eventmesh-connector-dingtalk/src/main/java/org/apache/eventmesh/connector/dingtalk/sink/connector/DingDingSinkConnector.java index 417d9cef36..8c5a1e6611 100644 --- a/eventmesh-connectors/eventmesh-connector-dingtalk/src/main/java/org/apache/eventmesh/connector/dingtalk/sink/connector/DingDingSinkConnector.java +++ b/eventmesh-connectors/eventmesh-connector-dingtalk/src/main/java/org/apache/eventmesh/connector/dingtalk/sink/connector/DingDingSinkConnector.java @@ -103,6 +103,11 @@ public String name() { return this.sinkConfig.getSinkConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() { isRunning = false; diff --git a/eventmesh-connectors/eventmesh-connector-file/src/main/java/org/apache/eventmesh/connector/file/sink/connector/FileSinkConnector.java b/eventmesh-connectors/eventmesh-connector-file/src/main/java/org/apache/eventmesh/connector/file/sink/connector/FileSinkConnector.java index 89222b35b0..fabae0d43a 100644 --- a/eventmesh-connectors/eventmesh-connector-file/src/main/java/org/apache/eventmesh/connector/file/sink/connector/FileSinkConnector.java +++ b/eventmesh-connectors/eventmesh-connector-file/src/main/java/org/apache/eventmesh/connector/file/sink/connector/FileSinkConnector.java @@ -103,6 +103,11 @@ public String name() { return this.sinkConfig.getConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() { outputStream.flush(); diff --git a/eventmesh-connectors/eventmesh-connector-file/src/main/java/org/apache/eventmesh/connector/file/source/connector/FileSourceConnector.java b/eventmesh-connectors/eventmesh-connector-file/src/main/java/org/apache/eventmesh/connector/file/source/connector/FileSourceConnector.java index 6ea0a0d33b..68b1a50989 100644 --- a/eventmesh-connectors/eventmesh-connector-file/src/main/java/org/apache/eventmesh/connector/file/source/connector/FileSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-file/src/main/java/org/apache/eventmesh/connector/file/source/connector/FileSourceConnector.java @@ -86,6 +86,11 @@ public String name() { return this.sourceConfig.getConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() { try { diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java index 6d38b45306..8a14756372 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java @@ -107,6 +107,11 @@ public String name() { return this.httpSinkConfig.connectorConfig.getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() throws Exception { this.sinkHandler.stop(); diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/HttpSourceConnector.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/HttpSourceConnector.java index 4155aff910..1ca325b18d 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/HttpSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/HttpSourceConnector.java @@ -144,6 +144,11 @@ public String name() { return this.sourceConfig.getConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() { if (this.server != null) { diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/sink/JdbcSinkConnector.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/sink/JdbcSinkConnector.java index 39681bf179..cc00f1e142 100644 --- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/sink/JdbcSinkConnector.java +++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/sink/JdbcSinkConnector.java @@ -139,6 +139,11 @@ public String name() { return this.sinkConfig.getSinkConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + /** * Stops the Connector. * diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/JdbcSourceConnector.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/JdbcSourceConnector.java index 2b2efcbef2..810a59e723 100644 --- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/JdbcSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/JdbcSourceConnector.java @@ -192,6 +192,11 @@ public String name() { return "JDBC Source Connector"; } + @Override + public void onException(ConnectRecord record) { + + } + /** * Stops the Connector. * diff --git a/eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/sink/connector/KafkaSinkConnector.java b/eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/sink/connector/KafkaSinkConnector.java index b257cd0f44..0adafc1ce6 100644 --- a/eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/sink/connector/KafkaSinkConnector.java +++ b/eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/sink/connector/KafkaSinkConnector.java @@ -94,6 +94,11 @@ public String name() { return this.sinkConfig.getConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() { producer.close(); diff --git a/eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/source/connector/KafkaSourceConnector.java b/eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/source/connector/KafkaSourceConnector.java index a3be1cbf93..d573126934 100644 --- a/eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/source/connector/KafkaSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/source/connector/KafkaSourceConnector.java @@ -94,6 +94,11 @@ public String name() { return this.sourceConfig.getConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() { kafkaConsumer.unsubscribe(); diff --git a/eventmesh-connectors/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/sink/connector/KnativeSinkConnector.java b/eventmesh-connectors/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/sink/connector/KnativeSinkConnector.java index a12a1c7461..b14f77ecd4 100644 --- a/eventmesh-connectors/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/sink/connector/KnativeSinkConnector.java +++ b/eventmesh-connectors/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/sink/connector/KnativeSinkConnector.java @@ -82,6 +82,11 @@ public String name() { return this.sinkConfig.getConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() { started.compareAndSet(true, false); diff --git a/eventmesh-connectors/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/source/connector/KnativeSourceConnector.java b/eventmesh-connectors/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/source/connector/KnativeSourceConnector.java index 537c1ad4d9..1b0c033e8f 100644 --- a/eventmesh-connectors/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/source/connector/KnativeSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/source/connector/KnativeSourceConnector.java @@ -65,6 +65,11 @@ public String name() { return this.sourceConfig.getConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() { started.compareAndSet(true, false); diff --git a/eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/connector/LarkSinkConnector.java b/eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/connector/LarkSinkConnector.java index d340dffd13..9981322e8f 100644 --- a/eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/connector/LarkSinkConnector.java +++ b/eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/connector/LarkSinkConnector.java @@ -110,6 +110,11 @@ public String name() { return this.sinkConfig.getSinkConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() { if (!started.compareAndSet(true, false)) { diff --git a/eventmesh-connectors/eventmesh-connector-mongodb/src/main/java/org/apache/eventmesh/connector/mongodb/sink/connector/MongodbSinkConnector.java b/eventmesh-connectors/eventmesh-connector-mongodb/src/main/java/org/apache/eventmesh/connector/mongodb/sink/connector/MongodbSinkConnector.java index 776ea8d71f..1001ffa584 100644 --- a/eventmesh-connectors/eventmesh-connector-mongodb/src/main/java/org/apache/eventmesh/connector/mongodb/sink/connector/MongodbSinkConnector.java +++ b/eventmesh-connectors/eventmesh-connector-mongodb/src/main/java/org/apache/eventmesh/connector/mongodb/sink/connector/MongodbSinkConnector.java @@ -87,6 +87,11 @@ public String name() { return this.sinkConfig.getConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() throws Exception { this.client.stop(); diff --git a/eventmesh-connectors/eventmesh-connector-mongodb/src/main/java/org/apache/eventmesh/connector/mongodb/source/connector/MongodbSourceConnector.java b/eventmesh-connectors/eventmesh-connector-mongodb/src/main/java/org/apache/eventmesh/connector/mongodb/source/connector/MongodbSourceConnector.java index e57c396719..df3f66d6a6 100644 --- a/eventmesh-connectors/eventmesh-connector-mongodb/src/main/java/org/apache/eventmesh/connector/mongodb/source/connector/MongodbSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-mongodb/src/main/java/org/apache/eventmesh/connector/mongodb/source/connector/MongodbSourceConnector.java @@ -93,6 +93,11 @@ public String name() { return this.sourceConfig.connectorConfig.getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() throws Exception { this.client.stop(); diff --git a/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/sink/connector/OpenFunctionSinkConnector.java b/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/sink/connector/OpenFunctionSinkConnector.java index 63444efe28..0f00a7e381 100644 --- a/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/sink/connector/OpenFunctionSinkConnector.java +++ b/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/sink/connector/OpenFunctionSinkConnector.java @@ -74,6 +74,11 @@ public String name() { return this.sinkConfig.getSinkConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() { } diff --git a/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/source/connector/OpenFunctionSourceConnector.java b/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/source/connector/OpenFunctionSourceConnector.java index b66bf9b18c..534ecfb79d 100644 --- a/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/source/connector/OpenFunctionSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/source/connector/OpenFunctionSourceConnector.java @@ -76,6 +76,11 @@ public String name() { return this.sourceConfig.getSourceConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() { diff --git a/eventmesh-connectors/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/sink/connector/PravegaSinkConnector.java b/eventmesh-connectors/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/sink/connector/PravegaSinkConnector.java index e5f09e4350..e089ef6760 100644 --- a/eventmesh-connectors/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/sink/connector/PravegaSinkConnector.java +++ b/eventmesh-connectors/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/sink/connector/PravegaSinkConnector.java @@ -109,6 +109,11 @@ public String name() { return this.sinkConfig.getConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() { writerMap.forEach((topic, writer) -> writer.close()); diff --git a/eventmesh-connectors/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/source/connector/PravegaSourceConnector.java b/eventmesh-connectors/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/source/connector/PravegaSourceConnector.java index 2611617d8f..836779dbcf 100644 --- a/eventmesh-connectors/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/source/connector/PravegaSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/source/connector/PravegaSourceConnector.java @@ -148,6 +148,11 @@ public String name() { return this.sourceConfig.getConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() { sourceHandlerMap.forEach((topic, handler) -> { diff --git a/eventmesh-connectors/eventmesh-connector-prometheus/src/main/java/org/apache/eventmesh/connector/prometheus/source/connector/PrometheusSourceConnector.java b/eventmesh-connectors/eventmesh-connector-prometheus/src/main/java/org/apache/eventmesh/connector/prometheus/source/connector/PrometheusSourceConnector.java index 5c78c718e3..0cafed73f3 100644 --- a/eventmesh-connectors/eventmesh-connector-prometheus/src/main/java/org/apache/eventmesh/connector/prometheus/source/connector/PrometheusSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-prometheus/src/main/java/org/apache/eventmesh/connector/prometheus/source/connector/PrometheusSourceConnector.java @@ -145,6 +145,11 @@ public String name() { return this.sourceConfig.getConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() { log.info("prometheus source connector stop."); diff --git a/eventmesh-connectors/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/sink/connector/PulsarSinkConnector.java b/eventmesh-connectors/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/sink/connector/PulsarSinkConnector.java index 9ff1f22a29..3f90c6c1be 100644 --- a/eventmesh-connectors/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/sink/connector/PulsarSinkConnector.java +++ b/eventmesh-connectors/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/sink/connector/PulsarSinkConnector.java @@ -85,6 +85,11 @@ public String name() { return this.sinkConfig.getConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() { try { diff --git a/eventmesh-connectors/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/source/connector/PulsarSourceConnector.java b/eventmesh-connectors/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/source/connector/PulsarSourceConnector.java index 212d3eb487..0bc576221e 100644 --- a/eventmesh-connectors/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/source/connector/PulsarSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/source/connector/PulsarSourceConnector.java @@ -87,6 +87,11 @@ public String name() { return this.sourceConfig.getConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() { try { diff --git a/eventmesh-connectors/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/sink/connector/RabbitMQSinkConnector.java b/eventmesh-connectors/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/sink/connector/RabbitMQSinkConnector.java index 4a94a2cb1f..08d1cefbac 100644 --- a/eventmesh-connectors/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/sink/connector/RabbitMQSinkConnector.java +++ b/eventmesh-connectors/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/sink/connector/RabbitMQSinkConnector.java @@ -95,6 +95,11 @@ public String name() { return this.sinkConfig.getConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() { if (started) { diff --git a/eventmesh-connectors/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/source/connector/RabbitMQSourceConnector.java b/eventmesh-connectors/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/source/connector/RabbitMQSourceConnector.java index 655c20d9b9..0b7e726bda 100644 --- a/eventmesh-connectors/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/source/connector/RabbitMQSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/source/connector/RabbitMQSourceConnector.java @@ -117,6 +117,11 @@ public String name() { return this.sourceConfig.getConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() { if (started) { diff --git a/eventmesh-connectors/eventmesh-connector-redis/src/main/java/org/apache/eventmesh/connector/redis/sink/connector/RedisSinkConnector.java b/eventmesh-connectors/eventmesh-connector-redis/src/main/java/org/apache/eventmesh/connector/redis/sink/connector/RedisSinkConnector.java index 83c3498a99..5b7d27c3ba 100644 --- a/eventmesh-connectors/eventmesh-connector-redis/src/main/java/org/apache/eventmesh/connector/redis/sink/connector/RedisSinkConnector.java +++ b/eventmesh-connectors/eventmesh-connector-redis/src/main/java/org/apache/eventmesh/connector/redis/sink/connector/RedisSinkConnector.java @@ -85,6 +85,11 @@ public String name() { return this.sinkConfig.getConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() throws Exception { this.redissonClient.shutdown(); diff --git a/eventmesh-connectors/eventmesh-connector-redis/src/main/java/org/apache/eventmesh/connector/redis/source/connector/RedisSourceConnector.java b/eventmesh-connectors/eventmesh-connector-redis/src/main/java/org/apache/eventmesh/connector/redis/source/connector/RedisSourceConnector.java index 70adce59e2..868639c205 100644 --- a/eventmesh-connectors/eventmesh-connector-redis/src/main/java/org/apache/eventmesh/connector/redis/source/connector/RedisSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-redis/src/main/java/org/apache/eventmesh/connector/redis/source/connector/RedisSourceConnector.java @@ -94,6 +94,11 @@ public String name() { return this.sourceConfig.getConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() throws Exception { this.topic.removeAllListeners(); diff --git a/eventmesh-connectors/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/sink/connector/RocketMQSinkConnector.java b/eventmesh-connectors/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/sink/connector/RocketMQSinkConnector.java index ae9d4824e5..31d45a28f4 100644 --- a/eventmesh-connectors/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/sink/connector/RocketMQSinkConnector.java +++ b/eventmesh-connectors/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/sink/connector/RocketMQSinkConnector.java @@ -78,6 +78,11 @@ public String name() { return this.sinkConfig.getConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() { producer.shutdown(); diff --git a/eventmesh-connectors/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/source/connector/RocketMQSourceConnector.java b/eventmesh-connectors/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/source/connector/RocketMQSourceConnector.java index 8ccb84acce..410f927d75 100644 --- a/eventmesh-connectors/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/source/connector/RocketMQSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/source/connector/RocketMQSourceConnector.java @@ -206,6 +206,11 @@ public String name() { return this.sourceConfig.getConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() { consumer.unsubscribe(sourceConfig.getConnectorConfig().getTopic()); diff --git a/eventmesh-connectors/eventmesh-connector-s3/src/main/java/org/apache/eventmesh/connector/s3/source/connector/S3SourceConnector.java b/eventmesh-connectors/eventmesh-connector-s3/src/main/java/org/apache/eventmesh/connector/s3/source/connector/S3SourceConnector.java index d0dc30c15e..078ed7691a 100644 --- a/eventmesh-connectors/eventmesh-connector-s3/src/main/java/org/apache/eventmesh/connector/s3/source/connector/S3SourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-s3/src/main/java/org/apache/eventmesh/connector/s3/source/connector/S3SourceConnector.java @@ -121,6 +121,11 @@ public String name() { return this.sourceConfig.getSourceConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() throws Exception { diff --git a/eventmesh-connectors/eventmesh-connector-slack/src/main/java/org/apache/eventmesh/connector/slack/sink/connector/SlackSinkConnector.java b/eventmesh-connectors/eventmesh-connector-slack/src/main/java/org/apache/eventmesh/connector/slack/sink/connector/SlackSinkConnector.java index e48760d506..836409af71 100644 --- a/eventmesh-connectors/eventmesh-connector-slack/src/main/java/org/apache/eventmesh/connector/slack/sink/connector/SlackSinkConnector.java +++ b/eventmesh-connectors/eventmesh-connector-slack/src/main/java/org/apache/eventmesh/connector/slack/sink/connector/SlackSinkConnector.java @@ -84,6 +84,11 @@ public String name() { return this.sinkConfig.getSinkConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() { isRunning = false; diff --git a/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/sink/connector/SpringSinkConnector.java b/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/sink/connector/SpringSinkConnector.java index 94c40eea50..9ba99cd547 100644 --- a/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/sink/connector/SpringSinkConnector.java +++ b/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/sink/connector/SpringSinkConnector.java @@ -77,6 +77,11 @@ public String name() { return this.sinkConfig.getSinkConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() throws Exception { diff --git a/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/MessageSendingOperations.java b/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/MessageSendingOperations.java index a337c1cd81..5f38914bb1 100644 --- a/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/MessageSendingOperations.java +++ b/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/MessageSendingOperations.java @@ -17,7 +17,7 @@ package org.apache.eventmesh.connector.spring.source; -import org.apache.eventmesh.openconnect.api.callback.SendMessageCallback; +import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendMessageCallback; /** * Operations for sending messages. diff --git a/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/connector/SpringSourceConnector.java b/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/connector/SpringSourceConnector.java index 2ab5a3a3c0..db286eb609 100644 --- a/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/connector/SpringSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/connector/SpringSourceConnector.java @@ -25,10 +25,10 @@ import org.apache.eventmesh.common.remote.offset.spring.SpringRecordPartition; import org.apache.eventmesh.connector.spring.source.MessageSendingOperations; import org.apache.eventmesh.openconnect.SourceWorker; -import org.apache.eventmesh.openconnect.api.callback.SendMessageCallback; import org.apache.eventmesh.openconnect.api.connector.ConnectorContext; import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext; import org.apache.eventmesh.openconnect.api.source.Source; +import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendMessageCallback; import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; import java.util.ArrayList; @@ -95,6 +95,11 @@ public String name() { return this.sourceConfig.getSourceConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() throws Exception { @@ -123,6 +128,7 @@ public List poll() { /** * Send message. + * * @param message message to send */ @Override @@ -136,9 +142,9 @@ public void send(Object message) { /** * Send message with a callback. - * @param message message to send. - * @param workerCallback After the user sends the message to the Connector, - * the SourceWorker will fetch message and invoke. + * + * @param message message to send. + * @param workerCallback After the user sends the message to the Connector, the SourceWorker will fetch message and invoke. */ @Override public void send(Object message, SendMessageCallback workerCallback) { diff --git a/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnector.java b/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnector.java index dec3f5e5de..6908d119b9 100644 --- a/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnector.java +++ b/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnector.java @@ -115,6 +115,11 @@ public String name() { return this.sinkConfig.getSinkConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() throws IOException { isRunning = false; diff --git a/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/sink/connector/WeComSinkConnector.java b/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/sink/connector/WeComSinkConnector.java index ef6aed58c5..ca628fa590 100644 --- a/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/sink/connector/WeComSinkConnector.java +++ b/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/sink/connector/WeComSinkConnector.java @@ -95,6 +95,11 @@ public String name() { return this.sinkConfig.getSinkConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() throws IOException { isRunning = false; diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/spring/pub/SpringPubController.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/spring/pub/SpringPubController.java index b7ea8890ee..a734bb6efa 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/spring/pub/SpringPubController.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/spring/pub/SpringPubController.java @@ -19,9 +19,9 @@ import org.apache.eventmesh.common.utils.JsonUtils; import org.apache.eventmesh.connector.spring.source.connector.SpringSourceConnector; -import org.apache.eventmesh.openconnect.api.callback.SendExcepionContext; -import org.apache.eventmesh.openconnect.api.callback.SendMessageCallback; -import org.apache.eventmesh.openconnect.api.callback.SendResult; +import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendExceptionContext; +import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendMessageCallback; +import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendResult; import java.util.HashMap; import java.util.Map; @@ -53,8 +53,8 @@ public void onSuccess(SendResult sendResult) { } @Override - public void onException(SendExcepionContext sendExcepionContext) { - log.info("Spring source worker send message to EventMesh failed!", sendExcepionContext.getCause()); + public void onException(SendExceptionContext sendExceptionContext) { + log.info("Spring source worker send message to EventMesh failed!", sendExceptionContext.getCause()); } }); return "success!"; diff --git a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java index 6e48aa1de8..2a2162a7af 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java +++ b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java @@ -32,11 +32,11 @@ import org.apache.eventmesh.common.protocol.tcp.UserAgent; import org.apache.eventmesh.common.utils.JsonUtils; import org.apache.eventmesh.common.utils.SystemUtils; -import org.apache.eventmesh.openconnect.api.callback.SendExcepionContext; -import org.apache.eventmesh.openconnect.api.callback.SendMessageCallback; -import org.apache.eventmesh.openconnect.api.callback.SendResult; import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext; import org.apache.eventmesh.openconnect.api.source.Source; +import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendExceptionContext; +import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendMessageCallback; +import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendResult; import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordOffsetManagement; import org.apache.eventmesh.openconnect.offsetmgmt.api.storage.DefaultOffsetManagementServiceImpl; @@ -264,8 +264,8 @@ private SendResult convertToSendResult(CloudEvent event) { return result; } - private SendExcepionContext convertToExceptionContext(CloudEvent event, Throwable cause) { - SendExcepionContext exceptionContext = new SendExcepionContext(); + private SendExceptionContext convertToExceptionContext(CloudEvent event, Throwable cause) { + SendExceptionContext exceptionContext = new SendExceptionContext(); exceptionContext.setTopic(event.getId()); exceptionContext.setMessageId(event.getId()); exceptionContext.setCause(cause); diff --git a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/Connector.java b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/Connector.java index 8ac09eac38..07e44aea94 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/Connector.java +++ b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/Connector.java @@ -34,8 +34,7 @@ public interface Connector extends ComponentLifeCycle { Class configClass(); /** - * This init method is obsolete. For detailed discussion, - * please see here + * This init method is obsolete. For detailed discussion, please see here *

* Initializes the Connector with the provided configuration. * @@ -67,4 +66,12 @@ public interface Connector extends ComponentLifeCycle { */ String name(); + /** + * This method will be called when an exception occurs while processing a ConnectRecord object. This method can be used to handle the exception, + * such as logging error information, or stopping the connector's operation when an exception occurs. + * + * @param record The ConnectRecord object that was being processed when the exception occurred + */ + void onException(ConnectRecord record); + } diff --git a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/SourceConnectorContext.java b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/SourceConnectorContext.java index 55c88ce55a..f70e77248e 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/SourceConnectorContext.java +++ b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/SourceConnectorContext.java @@ -22,6 +22,7 @@ import org.apache.eventmesh.openconnect.offsetmgmt.api.storage.OffsetStorageReader; import java.util.List; +import java.util.Map; import lombok.Data; @@ -35,6 +36,8 @@ public class SourceConnectorContext implements ConnectorContext { public SourceConfig sourceConfig; + public Map runtimeConfig; + // initial record position public List recordPositionList; diff --git a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendExcepionContext.java b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/callback/SendExceptionContext.java similarity index 90% rename from eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendExcepionContext.java rename to eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/callback/SendExceptionContext.java index 0311ceaef5..974b19a547 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendExcepionContext.java +++ b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/callback/SendExceptionContext.java @@ -15,15 +15,15 @@ * limitations under the License. */ -package org.apache.eventmesh.openconnect.api.callback; +package org.apache.eventmesh.openconnect.offsetmgmt.api.callback; -public class SendExcepionContext { +public class SendExceptionContext { private String messageId; private String topic; private Throwable cause; - public SendExcepionContext() { + public SendExceptionContext() { } public String getMessageId() { diff --git a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendMessageCallback.java b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/callback/SendMessageCallback.java similarity index 87% rename from eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendMessageCallback.java rename to eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/callback/SendMessageCallback.java index fd6baba7ec..8346cf36b4 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendMessageCallback.java +++ b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/callback/SendMessageCallback.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.openconnect.api.callback; +package org.apache.eventmesh.openconnect.offsetmgmt.api.callback; /** * Message sending callback interface. @@ -24,5 +24,5 @@ public interface SendMessageCallback { void onSuccess(SendResult sendResult); - void onException(SendExcepionContext sendExcepionContext); + void onException(SendExceptionContext sendExceptionContext); } diff --git a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendResult.java b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/callback/SendResult.java similarity index 95% rename from eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendResult.java rename to eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/callback/SendResult.java index 8cd861f6de..9afc745f3d 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendResult.java +++ b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/callback/SendResult.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.openconnect.api.callback; +package org.apache.eventmesh.openconnect.offsetmgmt.api.callback; public class SendResult { diff --git a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java index cda57e3758..b3fc4346c4 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java +++ b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java @@ -20,15 +20,19 @@ import org.apache.eventmesh.common.remote.offset.RecordOffset; import org.apache.eventmesh.common.remote.offset.RecordPartition; import org.apache.eventmesh.common.remote.offset.RecordPosition; +import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendMessageCallback; import java.util.Objects; import java.util.Set; +import java.util.UUID; /** * SourceDataEntries are generated by SourceTasks and passed to specific message queue to store. */ public class ConnectRecord { + private final String recordId = UUID.randomUUID().toString(); + private Long timestamp; private Object data; @@ -37,6 +41,8 @@ public class ConnectRecord { private KeyValue extensions; + private SendMessageCallback callback; + public ConnectRecord() { } @@ -57,6 +63,10 @@ public ConnectRecord(RecordPartition recordPartition, RecordOffset recordOffset, this.data = data; } + public String getRecordId() { + return recordId; + } + public Long getTimestamp() { return timestamp; } @@ -127,6 +137,14 @@ public Object getExtensionObj(String key) { return this.extensions.getObject(key); } + public SendMessageCallback getCallback() { + return callback; + } + + public void setCallback(SendMessageCallback callback) { + this.callback = callback; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -136,19 +154,20 @@ public boolean equals(Object o) { return false; } ConnectRecord that = (ConnectRecord) o; - return Objects.equals(timestamp, that.timestamp) && Objects.equals(data, that.data) + return Objects.equals(recordId, that.recordId) && Objects.equals(timestamp, that.timestamp) && Objects.equals(data, that.data) && Objects.equals(position, that.position) && Objects.equals(extensions, that.extensions); } @Override public int hashCode() { - return Objects.hash(timestamp, data, position, extensions); + return Objects.hash(recordId, timestamp, data, position, extensions); } @Override public String toString() { return "ConnectRecord{" - + "timestamp=" + timestamp + + "recordId=" + recordId + + ", timestamp=" + timestamp + ", data=" + data + ", position=" + position + ", extensions=" + extensions diff --git a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java index 0335a09568..6cd0452b83 100644 --- a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java +++ b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java @@ -38,12 +38,14 @@ import org.apache.eventmesh.common.utils.IPUtils; import org.apache.eventmesh.common.utils.JsonUtils; import org.apache.eventmesh.openconnect.api.ConnectorCreateService; -import org.apache.eventmesh.openconnect.api.callback.SendMessageCallback; import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext; import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext; import org.apache.eventmesh.openconnect.api.factory.ConnectorPluginFactory; import org.apache.eventmesh.openconnect.api.sink.Sink; import org.apache.eventmesh.openconnect.api.source.Source; +import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendExceptionContext; +import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendMessageCallback; +import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendResult; import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordOffsetManagement; import org.apache.eventmesh.openconnect.offsetmgmt.api.storage.DefaultOffsetManagementServiceImpl; @@ -56,6 +58,7 @@ import org.apache.eventmesh.spi.EventMeshExtensionFactory; import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; @@ -63,7 +66,6 @@ import java.util.List; import java.util.Objects; import java.util.Optional; -import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -207,6 +209,7 @@ private void initConnectorService() throws Exception { SourceConfig sourceConfig = (SourceConfig) ConfigUtil.parse(connectorRuntimeConfig.getSourceConnectorConfig(), sourceConnector.configClass()); SourceConnectorContext sourceConnectorContext = new SourceConnectorContext(); sourceConnectorContext.setSourceConfig(sourceConfig); + sourceConnectorContext.setRuntimeConfig(connectorRuntimeConfig.getRuntimeConfig()); sourceConnectorContext.setOffsetStorageReader(offsetStorageReader); if (CollectionUtils.isNotEmpty(jobResponse.getPosition())) { sourceConnectorContext.setRecordPositionList(jobResponse.getPosition()); @@ -332,15 +335,36 @@ private void startSourceConnector() throws Exception { reportVerifyRequest(record, connectorRuntimeConfig, ConnectorStage.SOURCE); } + // set a callback for this record + // if used the memory storage callback will be triggered after sink put success + record.setCallback(new SendMessageCallback() { + @Override + public void onSuccess(SendResult result) { + // commit record + sourceConnector.commit(record); + Optional submittedRecordPosition = prepareToUpdateRecordOffset(record); + submittedRecordPosition.ifPresent(RecordOffsetManagement.SubmittedPosition::ack); + Optional callback = + Optional.ofNullable(record.getExtensionObj(CALLBACK_EXTENSION)).map(v -> (SendMessageCallback) v); + callback.ifPresent(cb -> cb.onSuccess(convertToSendResult(record))); + } + + @Override + public void onException(SendExceptionContext sendExceptionContext) { + // handle exception + sourceConnector.onException(record); + log.error("send record to sink callback exception, process will shut down, record: {}", record, + sendExceptionContext.getCause()); + try { + stop(); + } catch (Exception e) { + log.error("Failed to stop after exception", e); + } + } + }); + queue.put(record); - Optional submittedRecordPosition = prepareToUpdateRecordOffset(record); - Optional callback = - Optional.ofNullable(record.getExtensionObj(CALLBACK_EXTENSION)).map(v -> (SendMessageCallback) v); - // commit record - this.sourceConnector.commit(record); - submittedRecordPosition.ifPresent(RecordOffsetManagement.SubmittedPosition::ack); - // TODO:finish the optional callback - // callback.ifPresent(cb -> cb.onSuccess(record)); + offsetManagement.awaitAllMessages(5000, TimeUnit.MILLISECONDS); // update & commit offset updateCommittableOffsets(); @@ -350,13 +374,20 @@ private void startSourceConnector() throws Exception { } } + private SendResult convertToSendResult(ConnectRecord record) { + SendResult result = new SendResult(); + result.setMessageId(record.getRecordId()); + if (StringUtils.isNotEmpty(record.getExtension("topic"))) { + result.setTopic(record.getExtension("topic")); + } + return result; + } + private void reportVerifyRequest(ConnectRecord record, ConnectorRuntimeConfig connectorRuntimeConfig, ConnectorStage connectorStage) { - UUID uuid = UUID.randomUUID(); - String recordId = uuid.toString(); String md5Str = md5(record.toString()); ReportVerifyRequest reportVerifyRequest = new ReportVerifyRequest(); reportVerifyRequest.setTaskID(connectorRuntimeConfig.getTaskID()); - reportVerifyRequest.setRecordID(recordId); + reportVerifyRequest.setRecordID(record.getRecordId()); reportVerifyRequest.setRecordSig(md5Str); reportVerifyRequest.setConnectorName( IPUtils.getLocalAddress() + "_" + connectorRuntimeConfig.getJobID() + "_" + connectorRuntimeConfig.getRegion()); diff --git a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntimeConfig.java b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntimeConfig.java index 5a58cce08e..ab6fc3aaf5 100644 --- a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntimeConfig.java +++ b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntimeConfig.java @@ -37,6 +37,8 @@ public class ConnectorRuntimeConfig { private String region; + private Map runtimeConfig; + private String sourceConnectorType; private String sourceConnectorDesc; diff --git a/eventmesh-runtime-v2/src/main/resources/connector.yaml b/eventmesh-runtime-v2/src/main/resources/connector.yaml index bf7f58028b..2e79e5cedc 100644 --- a/eventmesh-runtime-v2/src/main/resources/connector.yaml +++ b/eventmesh-runtime-v2/src/main/resources/connector.yaml @@ -18,3 +18,4 @@ taskID: 1 jobID: 1 region: region1 +runtimeConfig: # this used for connector runtime config