Skip to content

Commit

Permalink
update canal sink connector
Browse files Browse the repository at this point in the history
  • Loading branch information
xwm1992 committed May 21, 2024
1 parent 832ddae commit c457fbf
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,32 +11,32 @@

public class DatabaseConnection {

private static DruidDataSource sourceDataSource;
public static DruidDataSource sourceDataSource;

private static DruidDataSource sinkDataSource;
public static DruidDataSource sinkDataSource;

public static CanalSourceConfig sourceConfig;

public static CanalSinkConfig sinkConfig;

// public static void initSourceConnection() {
// sourceDataSource = new DruidDataSource();
// sourceDataSource.setUrl(sourceConfig.getSourceConnectorConfig().getUrl());
// sourceDataSource.setUsername(sourceConfig.getSourceConnectorConfig().getUserName());
// sourceDataSource.setPassword(sourceConfig.getSourceConnectorConfig().getPassWord());
// sourceDataSource.setInitialSize(5);
// sourceDataSource.setMinIdle(5);
// sourceDataSource.setMaxActive(20);
// sourceDataSource.setMaxWait(60000);
// sourceDataSource.setTimeBetweenEvictionRunsMillis(60000);
// sourceDataSource.setMinEvictableIdleTimeMillis(300000);
// sourceDataSource.setValidationQuery("SELECT 1");
// sourceDataSource.setTestWhileIdle(true);
// sourceDataSource.setTestOnBorrow(false);
// sourceDataSource.setTestOnReturn(false);
// sourceDataSource.setPoolPreparedStatements(true);
// sourceDataSource.setMaxPoolPreparedStatementPerConnectionSize(20);
// }
public static void initSourceConnection() {
sourceDataSource = new DruidDataSource();
sourceDataSource.setUrl(sourceConfig.getSourceConnectorConfig().getUrl());
sourceDataSource.setUsername(sourceConfig.getSourceConnectorConfig().getUserName());
sourceDataSource.setPassword(sourceConfig.getSourceConnectorConfig().getPassWord());
sourceDataSource.setInitialSize(5);
sourceDataSource.setMinIdle(5);
sourceDataSource.setMaxActive(20);
sourceDataSource.setMaxWait(60000);
sourceDataSource.setTimeBetweenEvictionRunsMillis(60000);
sourceDataSource.setMinEvictableIdleTimeMillis(300000);
sourceDataSource.setValidationQuery("SELECT 1");
sourceDataSource.setTestWhileIdle(true);
sourceDataSource.setTestOnBorrow(false);
sourceDataSource.setTestOnReturn(false);
sourceDataSource.setPoolPreparedStatements(true);
sourceDataSource.setMaxPoolPreparedStatementPerConnectionSize(20);
}

public static void initSinkConnection() {
sinkDataSource = new DruidDataSource();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,34 @@

import org.apache.eventmesh.common.config.connector.rdb.canal.CanalSinkConfig;
import org.apache.eventmesh.connector.canal.CanalConnectRecord;
import org.apache.eventmesh.connector.canal.DatabaseConnection;
import org.apache.eventmesh.openconnect.api.ConnectorCreateService;
import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext;
import org.apache.eventmesh.openconnect.api.sink.Sink;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;


import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;

import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;

import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.StatementCallback;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class CanalSinkConnector implements Sink, ConnectorCreateService<Sink> {

private CanalSinkConfig sinkConfig;

private JdbcTemplate jdbcTemplate;

@Override
public Class<? extends Config> configClass() {
return CanalSinkConfig.class;
Expand All @@ -47,14 +59,17 @@ public Class<? extends Config> configClass() {
@Override
public void init(Config config) throws Exception {
// init config for canal source connector
this.sinkConfig = (CanalSinkConfig)config;
this.sinkConfig = (CanalSinkConfig) config;
}

@Override
public void init(ConnectorContext connectorContext) throws Exception {
// init config for canal source connector
SinkConnectorContext sinkConnectorContext = (SinkConnectorContext) connectorContext;
this.sinkConfig = (CanalSinkConfig)sinkConnectorContext.getSinkConfig();
this.sinkConfig = (CanalSinkConfig) sinkConnectorContext.getSinkConfig();
DatabaseConnection.sinkConfig = this.sinkConfig;
DatabaseConnection.initSinkConnection();
jdbcTemplate = new JdbcTemplate(DatabaseConnection.sinkDataSource);
}

@Override
Expand All @@ -80,14 +95,19 @@ public void stop() {
@Override
public void put(List<ConnectRecord> sinkRecords) {
for (ConnectRecord connectRecord : sinkRecords) {
List<CanalConnectRecord> canalConnectRecordList = (List<CanalConnectRecord>)connectRecord.getData();
for (CanalConnectRecord canalConnectRecord : canalConnectRecordList) {
if (sinkConfig.getSinkConnectorConfig().getSchemaName().equals(canalConnectRecord.getSchemaName()) &&
List<CanalConnectRecord> canalConnectRecordList = (List<CanalConnectRecord>) connectRecord.getData();
if (isDdlDatas(canalConnectRecordList)) {
doDdl(canalConnectRecordList);
} else {
for (CanalConnectRecord canalConnectRecord : canalConnectRecordList) {
if (sinkConfig.getSinkConnectorConfig().getSchemaName().equals(canalConnectRecord.getSchemaName()) &&
sinkConfig.getSinkConnectorConfig().getTableName().equals(canalConnectRecord.getTableName())) {


}
}
}

}
}

Expand All @@ -111,4 +131,40 @@ private boolean isDdlDatas(List<CanalConnectRecord> canalConnectRecordList) {
}
return result;
}

/**
* 执行ddl的调用,处理逻辑比较简单: 串行调用
*
* @param canalConnectRecordList
*/
private void doDdl(List<CanalConnectRecord> canalConnectRecordList) {
for (final CanalConnectRecord record : canalConnectRecordList) {
if (!sinkConfig.getSinkConnectorConfig().getSchemaName().equals(record.getSchemaName()) ||
!sinkConfig.getSinkConnectorConfig().getTableName().equals(record.getTableName())) {
continue;
}
try {
Boolean result = jdbcTemplate.execute(new StatementCallback<Boolean>() {

public Boolean doInStatement(Statement stmt) throws SQLException, DataAccessException {
boolean result = true;
if (StringUtils.isNotEmpty(record.getDdlSchemaName())) {
// 如果mysql,执行ddl时,切换到在源库执行的schema上
// result &= stmt.execute("use " +
// data.getDdlSchemaName());

// 解决当数据库名称为关键字如"Order"的时候,会报错,无法同步
result &= stmt.execute("use `" + record.getDdlSchemaName() + "`");
}
result &= stmt.execute(record.getSql());
return result;
}
});

} catch (Throwable e) {
throw new RuntimeException(e);
}

}
}
}

0 comments on commit c457fbf

Please sign in to comment.