Skip to content

Commit

Permalink
[ISSUE apache#4979]Canal Connector supports bidirectional data synchr…
Browse files Browse the repository at this point in the history
…onization
  • Loading branch information
xwm1992 committed Jun 5, 2024
1 parent bfafbfb commit 3550ac3
Show file tree
Hide file tree
Showing 8 changed files with 109 additions and 129 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,20 @@
@EqualsAndHashCode(callSuper = true)
public class CanalSinkConfig extends SinkConfig {

private Integer batchSize = 50; // batchSize
// batchSize
private Integer batchSize = 50;

private Boolean useBatch = true; // enable batch
// enable batch
private Boolean useBatch = true;

private Integer poolSize = 5; // sink thread size for single channel
// sink thread size for single channel
private Integer poolSize = 5;

private SyncMode syncMode; // sync mode: field/row
// sync mode: field/row
private SyncMode syncMode;

private Boolean skipException = false; // skip sink process exception
// skip sink process exception
private Boolean skipException = false;

public SinkConnectorConfig sinkConnectorConfig;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,27 +54,23 @@ public class CanalSourceConfig extends SourceConfig {
// ================================= channel parameter
// ================================

private Boolean enableRemedy = false; // enable remedy
// enable remedy
private Boolean enableRemedy = false;

private SyncMode syncMode; // sync mode: field/row
// sync mode: field/row
private SyncMode syncMode;

private SyncConsistency syncConsistency; // sync consistency
// sync consistency
private SyncConsistency syncConsistency;

// ================================= system parameter
// ================================

private String systemSchema; // Default is retl
// Column name of the bidirectional synchronization mark
private String needSyncMarkTableColumnName = "needSync";

private String systemMarkTable; // Bidirectional synchronization mark table

private String systemMarkTableColumn; // Column name of the bidirectional synchronization mark

private String systemMarkTableInfo;
// nfo information of the bidirectional synchronization mark, similar to BI_SYNC

private String systemBufferTable; // sync buffer table

private String systemDualTable; // sync heartbeat table
// Column value of the bidirectional synchronization mark
private String needSyncMarkTableColumnValue = "needSync";

private SourceConnectorConfig sourceConnectorConfig;
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,11 @@

public class MysqlDialect extends AbstractDbDialect {

private Map<List<String>, String> shardColumns;

public MysqlDialect(JdbcTemplate jdbcTemplate, LobHandler lobHandler) {
super(jdbcTemplate, lobHandler);
sqlTemplate = new MysqlSqlTemplate();
}

public MysqlDialect(JdbcTemplate jdbcTemplate, LobHandler lobHandler, String name, String databaseVersion,
int majorVersion, int minorVersion) {
super(jdbcTemplate, lobHandler, name, majorVersion, minorVersion);
sqlTemplate = new MysqlSqlTemplate();
}

public boolean isCharSpacePadded() {
return false;
}
Expand All @@ -66,16 +58,8 @@ public boolean isDRDS() {
return false;
}

public String getShardColumns(String schema, String table) {
if (isDRDS()) {
return shardColumns.get(Arrays.asList(schema, table));
} else {
return null;
}
}

public String getDefaultCatalog() {
return (String) jdbcTemplate.queryForObject("select database()", String.class);
return jdbcTemplate.queryForObject("select database()", String.class);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,16 @@

import org.springframework.util.CollectionUtils;

import lombok.Getter;
import lombok.Setter;

/**
* compute latest sql
*/
public class SqlBuilderLoadInterceptor {

@Getter
@Setter
private DbDialect dbDialect;

public boolean before(CanalSinkConfig sinkConfig, CanalConnectRecord record) {
Expand Down Expand Up @@ -128,12 +133,4 @@ private String[] buildColumnNames(List<EventColumn> columns1, List<EventColumn>
}
return result;
}

public DbDialect getDbDialect() {
return dbDialect;
}

public void setDbDialect(DbDialect dbDialect) {
this.dbDialect = dbDialect;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.eventmesh.connector.canal.model.EventColumnIndexComparable;
import org.apache.eventmesh.connector.canal.model.EventType;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.StringUtils;

import java.util.ArrayList;
import java.util.LinkedHashMap;
Expand All @@ -49,57 +49,67 @@ public class EntryParser {
public List<CanalConnectRecord> parse(CanalSourceConfig sourceConfig, List<Entry> datas) {
List<CanalConnectRecord> recordList = new ArrayList<>();
List<Entry> transactionDataBuffer = new ArrayList<>();
// need check weather the entry is loopback
boolean needSync;
try {
for (Entry entry : datas) {
switch (entry.getEntryType()) {
case TRANSACTIONBEGIN:
break;
case ROWDATA:
transactionDataBuffer.add(entry);
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
needSync = checkNeedSync(sourceConfig, rowChange.getRowDatas(0));
if (needSync) {
transactionDataBuffer.add(entry);
}
break;
case TRANSACTIONEND:
for (Entry bufferEntry : transactionDataBuffer) {
List<CanalConnectRecord> recordParsedList = internParse(sourceConfig, bufferEntry);
if (CollectionUtils.isEmpty(recordParsedList)) {
continue;
}
long totalSize = bufferEntry.getHeader().getEventLength();
long eachSize = totalSize / recordParsedList.size();
for (CanalConnectRecord record : recordParsedList) {
if (record == null) {
continue;
}
record.setSize(eachSize);
recordList.add(record);
}
}
parseRecordListWithEntryBuffer(sourceConfig, recordList, transactionDataBuffer);
transactionDataBuffer.clear();
break;
default:
break;
}
}
parseRecordListWithEntryBuffer(sourceConfig, recordList, transactionDataBuffer);
} catch (Exception e) {
throw new RuntimeException(e);
}
return recordList;
}

for (Entry bufferEntry : transactionDataBuffer) {
List<CanalConnectRecord> recordParsedList = internParse(sourceConfig, bufferEntry);
if (CollectionUtils.isEmpty(recordParsedList)) {
private void parseRecordListWithEntryBuffer(CanalSourceConfig sourceConfig, List<CanalConnectRecord> recordList,
List<Entry> transactionDataBuffer) {
for (Entry bufferEntry : transactionDataBuffer) {
List<CanalConnectRecord> recordParsedList = internParse(sourceConfig, bufferEntry);
if (CollectionUtils.isEmpty(recordParsedList)) {
continue;
}
long totalSize = bufferEntry.getHeader().getEventLength();
long eachSize = totalSize / recordParsedList.size();
for (CanalConnectRecord record : recordParsedList) {
if (record == null) {
continue;
}
record.setSize(eachSize);
recordList.add(record);
}
}
}

long totalSize = bufferEntry.getHeader().getEventLength();
long eachSize = totalSize / recordParsedList.size();
for (CanalConnectRecord record : recordParsedList) {
if (record == null) {
continue;
}
record.setSize(eachSize);
recordList.add(record);
}
private boolean checkNeedSync(CanalSourceConfig sourceConfig, RowData rowData) {
Column markedColumn = getColumnIgnoreCase(rowData.getAfterColumnsList(), sourceConfig.getNeedSyncMarkTableColumnName());
if (markedColumn != null) {
return StringUtils.equalsIgnoreCase(markedColumn.getValue(), sourceConfig.getNeedSyncMarkTableColumnValue());
}
return false;
}

private Column getColumnIgnoreCase(List<Column> columns, String columName) {
for (Column column : columns) {
if (column.getName().equalsIgnoreCase(columName)) {
return column;
}
} catch (Exception e) {
throw new RuntimeException(e);
}
return recordList;
return null;
}

private List<CanalConnectRecord> internParse(CanalSourceConfig sourceConfig, Entry entry) {
Expand Down Expand Up @@ -127,20 +137,9 @@ private List<CanalConnectRecord> internParse(CanalSourceConfig sourceConfig, Ent
return null;
}

if (StringUtils.equalsIgnoreCase(sourceConfig.getSystemSchema(), schemaName)) {
// do noting
if (eventType.isDdl()) {
return null;
}

if (StringUtils.equalsIgnoreCase(sourceConfig.getSystemDualTable(), tableName)) {
return null;
}
} else {
if (eventType.isDdl()) {
log.warn("unsupported ddl event type: {}", eventType);
return null;
}
if (eventType.isDdl()) {
log.warn("unsupported ddl event type: {}", eventType);
return null;
}

List<CanalConnectRecord> recordList = new ArrayList<>();
Expand All @@ -164,13 +163,12 @@ private CanalConnectRecord internParse(CanalSourceConfig canalSourceConfig, Entr

List<Column> beforeColumns = rowData.getBeforeColumnsList();
List<Column> afterColumns = rowData.getAfterColumnsList();
String tableName = canalConnectRecord.getSchemaName() + "." + canalConnectRecord.getTableName();

boolean isRowMode = canalSourceConfig.getSyncMode().isRow();

Map<String, EventColumn> keyColumns = new LinkedHashMap<String, EventColumn>();
Map<String, EventColumn> oldKeyColumns = new LinkedHashMap<String, EventColumn>();
Map<String, EventColumn> notKeyColumns = new LinkedHashMap<String, EventColumn>();
Map<String, EventColumn> keyColumns = new LinkedHashMap<>();
Map<String, EventColumn> oldKeyColumns = new LinkedHashMap<>();
Map<String, EventColumn> notKeyColumns = new LinkedHashMap<>();

if (eventType.isInsert()) {
for (Column column : afterColumns) {
Expand All @@ -195,7 +193,7 @@ private CanalConnectRecord internParse(CanalSourceConfig canalSourceConfig, Entr
keyColumns.put(column.getName(), copyEventColumn(column, true));
} else {
if (isRowMode && entry.getHeader().getSourceType() == CanalEntry.Type.ORACLE) {
notKeyColumns.put(column.getName(), copyEventColumn(column, isRowMode));
notKeyColumns.put(column.getName(), copyEventColumn(column, true));
}
}
}
Expand Down Expand Up @@ -233,7 +231,7 @@ private CanalConnectRecord internParse(CanalSourceConfig canalSourceConfig, Entr
}
canalConnectRecord.setColumns(columns);
} else {
throw new RuntimeException("this row data has no pks , entry: " + entry.toString() + " and rowData: "
throw new RuntimeException("this row data has no pks , entry: " + entry + " and rowData: "
+ rowData);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public String getSelectSql(String schemaName, String tableName, String[] pkNames
}

sql.append(" from ").append(getFullName(schemaName, tableName)).append(" where ( ");
appendColumnEquals(sql, pkNames, "and");
appendColumnEquals(sql, pkNames);
sql.append(" ) ");
return sql.toString().intern();
}
Expand All @@ -41,7 +41,7 @@ public String getUpdateSql(String schemaName, String tableName, String[] pkNames
StringBuilder sql = new StringBuilder("update " + getFullName(schemaName, tableName) + " set ");
appendExcludeSingleShardColumnEquals(sql, columnNames, ",", updatePks, shardColumn);
sql.append(" where (");
appendColumnEquals(sql, pkNames, "and");
appendColumnEquals(sql, pkNames);
sql.append(")");
return sql.toString().intern();
}
Expand All @@ -65,7 +65,7 @@ public String getInsertSql(String schemaName, String tableName, String[] pkNames

public String getDeleteSql(String schemaName, String tableName, String[] pkNames) {
StringBuilder sql = new StringBuilder("delete from " + getFullName(schemaName, tableName) + " where ");
appendColumnEquals(sql, pkNames, "and");
appendColumnEquals(sql, pkNames);
return sql.toString().intern();
}

Expand All @@ -91,12 +91,12 @@ protected void appendColumnQuestions(StringBuilder sql, String[] columns) {
}
}

protected void appendColumnEquals(StringBuilder sql, String[] columns, String separator) {
protected void appendColumnEquals(StringBuilder sql, String[] columns) {
int size = columns.length;
for (int i = 0; i < size; i++) {
sql.append(" ").append(appendEscape(columns[i])).append(" = ").append("? ");
if (i != size - 1) {
sql.append(separator);
sql.append("and");
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public String getMergeSql(String schemaName, String tableName, String[] pkNames,

size = columnNames.length;
for (int i = 0; i < size; i++) {
if (!includePks && shardColumn != null && columnNames[i].equals(shardColumn)) {
if (!includePks && columnNames[i].equals(shardColumn)) {
continue;
}

Expand Down
Loading

0 comments on commit 3550ac3

Please sign in to comment.