diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/DatabaseConnection.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/DatabaseConnection.java index 524ce51265..2b64f2eccd 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/DatabaseConnection.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/DatabaseConnection.java @@ -11,32 +11,32 @@ public class DatabaseConnection { - private static DruidDataSource sourceDataSource; + public static DruidDataSource sourceDataSource; - private static DruidDataSource sinkDataSource; + public static DruidDataSource sinkDataSource; public static CanalSourceConfig sourceConfig; public static CanalSinkConfig sinkConfig; -// public static void initSourceConnection() { -// sourceDataSource = new DruidDataSource(); -// sourceDataSource.setUrl(sourceConfig.getSourceConnectorConfig().getUrl()); -// sourceDataSource.setUsername(sourceConfig.getSourceConnectorConfig().getUserName()); -// sourceDataSource.setPassword(sourceConfig.getSourceConnectorConfig().getPassWord()); -// sourceDataSource.setInitialSize(5); -// sourceDataSource.setMinIdle(5); -// sourceDataSource.setMaxActive(20); -// sourceDataSource.setMaxWait(60000); -// sourceDataSource.setTimeBetweenEvictionRunsMillis(60000); -// sourceDataSource.setMinEvictableIdleTimeMillis(300000); -// sourceDataSource.setValidationQuery("SELECT 1"); -// sourceDataSource.setTestWhileIdle(true); -// sourceDataSource.setTestOnBorrow(false); -// sourceDataSource.setTestOnReturn(false); -// sourceDataSource.setPoolPreparedStatements(true); -// sourceDataSource.setMaxPoolPreparedStatementPerConnectionSize(20); -// } + public static void initSourceConnection() { + sourceDataSource = new DruidDataSource(); + sourceDataSource.setUrl(sourceConfig.getSourceConnectorConfig().getUrl()); + sourceDataSource.setUsername(sourceConfig.getSourceConnectorConfig().getUserName()); + sourceDataSource.setPassword(sourceConfig.getSourceConnectorConfig().getPassWord()); + sourceDataSource.setInitialSize(5); + sourceDataSource.setMinIdle(5); + sourceDataSource.setMaxActive(20); + sourceDataSource.setMaxWait(60000); + sourceDataSource.setTimeBetweenEvictionRunsMillis(60000); + sourceDataSource.setMinEvictableIdleTimeMillis(300000); + sourceDataSource.setValidationQuery("SELECT 1"); + sourceDataSource.setTestWhileIdle(true); + sourceDataSource.setTestOnBorrow(false); + sourceDataSource.setTestOnReturn(false); + sourceDataSource.setPoolPreparedStatements(true); + sourceDataSource.setMaxPoolPreparedStatementPerConnectionSize(20); + } public static void initSinkConnection() { sinkDataSource = new DruidDataSource(); diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java index cf16598947..7911b73fa8 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java @@ -23,6 +23,7 @@ import org.apache.eventmesh.common.config.connector.rdb.canal.CanalSinkConfig; import org.apache.eventmesh.connector.canal.CanalConnectRecord; +import org.apache.eventmesh.connector.canal.DatabaseConnection; import org.apache.eventmesh.openconnect.api.ConnectorCreateService; import org.apache.eventmesh.openconnect.api.connector.ConnectorContext; import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext; @@ -30,8 +31,17 @@ import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.exception.ExceptionUtils; + +import java.sql.SQLException; +import java.sql.Statement; import java.util.List; +import org.springframework.dao.DataAccessException; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.core.StatementCallback; + import lombok.extern.slf4j.Slf4j; @Slf4j @@ -39,6 +49,8 @@ public class CanalSinkConnector implements Sink, ConnectorCreateService { private CanalSinkConfig sinkConfig; + private JdbcTemplate jdbcTemplate; + @Override public Class configClass() { return CanalSinkConfig.class; @@ -47,14 +59,17 @@ public Class configClass() { @Override public void init(Config config) throws Exception { // init config for canal source connector - this.sinkConfig = (CanalSinkConfig)config; + this.sinkConfig = (CanalSinkConfig) config; } @Override public void init(ConnectorContext connectorContext) throws Exception { // init config for canal source connector SinkConnectorContext sinkConnectorContext = (SinkConnectorContext) connectorContext; - this.sinkConfig = (CanalSinkConfig)sinkConnectorContext.getSinkConfig(); + this.sinkConfig = (CanalSinkConfig) sinkConnectorContext.getSinkConfig(); + DatabaseConnection.sinkConfig = this.sinkConfig; + DatabaseConnection.initSinkConnection(); + jdbcTemplate = new JdbcTemplate(DatabaseConnection.sinkDataSource); } @Override @@ -80,14 +95,19 @@ public void stop() { @Override public void put(List sinkRecords) { for (ConnectRecord connectRecord : sinkRecords) { - List canalConnectRecordList = (List)connectRecord.getData(); - for (CanalConnectRecord canalConnectRecord : canalConnectRecordList) { - if (sinkConfig.getSinkConnectorConfig().getSchemaName().equals(canalConnectRecord.getSchemaName()) && + List canalConnectRecordList = (List) connectRecord.getData(); + if (isDdlDatas(canalConnectRecordList)) { + doDdl(canalConnectRecordList); + } else { + for (CanalConnectRecord canalConnectRecord : canalConnectRecordList) { + if (sinkConfig.getSinkConnectorConfig().getSchemaName().equals(canalConnectRecord.getSchemaName()) && sinkConfig.getSinkConnectorConfig().getTableName().equals(canalConnectRecord.getTableName())) { + } } } + } } @@ -111,4 +131,40 @@ private boolean isDdlDatas(List canalConnectRecordList) { } return result; } + + /** + * 执行ddl的调用,处理逻辑比较简单: 串行调用 + * + * @param canalConnectRecordList + */ + private void doDdl(List canalConnectRecordList) { + for (final CanalConnectRecord record : canalConnectRecordList) { + if (!sinkConfig.getSinkConnectorConfig().getSchemaName().equals(record.getSchemaName()) || + !sinkConfig.getSinkConnectorConfig().getTableName().equals(record.getTableName())) { + continue; + } + try { + Boolean result = jdbcTemplate.execute(new StatementCallback() { + + public Boolean doInStatement(Statement stmt) throws SQLException, DataAccessException { + boolean result = true; + if (StringUtils.isNotEmpty(record.getDdlSchemaName())) { + // 如果mysql,执行ddl时,切换到在源库执行的schema上 + // result &= stmt.execute("use " + + // data.getDdlSchemaName()); + + // 解决当数据库名称为关键字如"Order"的时候,会报错,无法同步 + result &= stmt.execute("use `" + record.getDdlSchemaName() + "`"); + } + result &= stmt.execute(record.getSql()); + return result; + } + }); + + } catch (Throwable e) { + throw new RuntimeException(e); + } + + } + } }