Skip to content

Commit

Permalink
update canal source connector
Browse files Browse the repository at this point in the history
  • Loading branch information
xwm1992 committed May 16, 2024
1 parent d1c4689 commit 76ead29
Show file tree
Hide file tree
Showing 7 changed files with 216 additions and 24 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package org.apache.eventmesh.common.config.connector.rdb.canal;

import org.apache.eventmesh.common.config.connector.SourceConfig;
import org.apache.eventmesh.common.remote.offset.RecordPosition;

import java.util.List;

import lombok.Data;
import lombok.EqualsAndHashCode;
Expand All @@ -23,9 +26,11 @@ public class CanalSourceConfig extends SourceConfig {

private Short clientId;

private Integer batchSize;
private Integer batchSize = 10000;

private Long batchTimeout = -1L;

private Long batchTimeout;
private List<RecordPosition> recordPositions;

private SourceConnectorConfig sourceConnectorConfig;
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,11 @@

public class RecordPartition {

// private Map<String, ?> partitionMap = new HashMap<>();

private Class<? extends RecordPartition> clazz;

public RecordPartition() {

}
// public RecordPartition(Map<String, ?> partition) {
// this.partitionMap = partition;
// }
//
// public Map<String, ?> getPartitionMap() {
// return partitionMap;
// }

public Class<? extends RecordPartition> getRecordPartitionClass() {
return RecordPartition.class;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,31 @@

import org.apache.eventmesh.common.config.connector.Config;
import org.apache.eventmesh.common.config.connector.rdb.canal.CanalSourceConfig;
import org.apache.eventmesh.common.remote.offset.RecordPosition;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.connector.canal.DatabaseConnection;
import org.apache.eventmesh.openconnect.api.ConnectorCreateService;
import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext;
import org.apache.eventmesh.openconnect.api.source.Source;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.canal.CanalRecordOffset;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.canal.CanalRecordPartition;
import org.apache.eventmesh.openconnect.offsetmgmt.api.storage.OffsetStorageReader;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;


import org.springframework.util.CollectionUtils;

import com.alibaba.otter.canal.common.CanalException;
import com.alibaba.otter.canal.instance.core.CanalInstance;
import com.alibaba.otter.canal.instance.core.CanalInstanceGenerator;
Expand All @@ -50,10 +61,14 @@
import com.alibaba.otter.canal.parse.ha.CanalHAController;
import com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser;
import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.ClientIdentity;
import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
import com.alibaba.otter.canal.sink.AbstractCanalEventSink;
import com.alibaba.otter.canal.sink.CanalEventSink;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;

import lombok.extern.slf4j.Slf4j;

Expand All @@ -72,6 +87,8 @@ public class CanalSourceConnector implements Source, ConnectorCreateService<Sour

private volatile boolean running = false;

private static final int maxEmptyTimes = 10;

@Override
public Class<? extends Config> configClass() {
return CanalSourceConfig.class;
Expand All @@ -93,6 +110,7 @@ public void init(ConnectorContext connectorContext) throws Exception {
DatabaseConnection.initSourceConnection();

canalServer = CanalServerWithEmbedded.instance();

canalServer.setCanalInstanceGenerator(new CanalInstanceGenerator() {
@Override
public CanalInstance generate(String destination) {
Expand Down Expand Up @@ -153,9 +171,24 @@ private Canal buildCanal(CanalSourceConfig sourceConfig) {
sourceConfig.getSourceConnectorConfig().getDbPort())));
parameter.setDbUsername(sourceConfig.getSourceConnectorConfig().getUserName());
parameter.setDbPassword(sourceConfig.getSourceConnectorConfig().getPassWord());
// parameter.setPositions();
// Arrays.asList("{\"journalName\":\"mysql-bin.000001\",\"position\":6163L,\"timestamp\":1322803601000L}",
// "{\"journalName\":\"mysql-bin.000001\",\"position\":6163L,\"timestamp\":1322803601000L}")

// check positions
// example: Arrays.asList("{\"journalName\":\"mysql-bin.000001\",\"position\":6163L,\"timestamp\":1322803601000L}",
// "{\"journalName\":\"mysql-bin.000001\",\"position\":6163L,\"timestamp\":1322803601000L}")
if (sourceConfig.getRecordPositions() != null && !sourceConfig.getRecordPositions().isEmpty()) {
List<RecordPosition> recordPositions = sourceConfig.getRecordPositions();
List<String> positions = new ArrayList<>();
recordPositions.forEach(recordPosition -> {
Map<String, Object> recordPositionMap = new HashMap<>();
CanalRecordPartition canalRecordPartition = (CanalRecordPartition)(recordPosition.getPartition());
CanalRecordOffset canalRecordOffset = (CanalRecordOffset)(recordPosition.getOffset());
recordPositionMap.put("journalName", canalRecordPartition.getJournalName());
recordPositionMap.put("timestamp", canalRecordPartition.getTimeStamp());
recordPositionMap.put("position", canalRecordOffset.getOffset());
positions.add(JsonUtils.toJSONString(recordPositionMap));
});
parameter.setPositions(positions);
}

parameter.setSlaveId(slaveId);

Expand Down Expand Up @@ -204,14 +237,85 @@ public String name() {

@Override
public void stop() {

if (!running) {
return;
}
running = false;
canalServer.stop(sourceConfig.getDestination());
canalServer.stop();
}

@Override
public List<ConnectRecord> poll() {
int emptyTimes = 0;
com.alibaba.otter.canal.protocol.Message message = null;
if (sourceConfig.getBatchTimeout() < 0) {// perform polling
while (running) {
message = canalServer.getWithoutAck(clientIdentity, sourceConfig.getBatchSize());
if (message == null || message.getId() == -1L) { // empty
applyWait(emptyTimes++);
} else {
break;
}
}
} else { // perform with timeout
while (running) {
message = canalServer.getWithoutAck(clientIdentity, sourceConfig.getBatchSize(), sourceConfig.getBatchTimeout(), TimeUnit.MILLISECONDS);
if (message == null || message.getId() == -1L) { // empty
continue;
}
break;
}
}

List<Entry> entries;
assert message != null;
if (message.isRaw()) {
entries = new ArrayList<>(message.getRawEntries().size());
for (ByteString entry : message.getRawEntries()) {
try {
entries.add(CanalEntry.Entry.parseFrom(entry));
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
}
} else {
entries = message.getEntries();
}

// List<EventData> eventDatas = messageParser.parse(pipelineId, entries); // 过滤事务头/尾和回环数据
// Message<EventData> result = new Message<EventData>(message.getId(), eventDatas);
// // 更新一下最后的entry时间,包括被过滤的数据
// if (!CollectionUtils.isEmpty(entries)) {
// long lastEntryTime = entries.get(entries.size() - 1).getHeader().getExecuteTime();
// if (lastEntryTime > 0) {// oracle的时间可能为0
// this.lastEntryTime = lastEntryTime;
// }
// }
//
// if (dump && logger.isInfoEnabled()) {
// String startPosition = null;
// String endPosition = null;
// if (!CollectionUtils.isEmpty(entries)) {
// startPosition = buildPositionForDump(entries.get(0));
// endPosition = buildPositionForDump(entries.get(entries.size() - 1));
// }
//
// dumpMessages(result, startPosition, endPosition, entries.size());// 记录一下,方便追查问题
// }
return null;
}

// 处理无数据的情况,避免空循环挂死
private void applyWait(int emptyTimes) {
int newEmptyTimes = Math.min(emptyTimes, maxEmptyTimes);
if (emptyTimes <= 3) { // 3次以内
Thread.yield();
} else { // 超过3次,最多只sleep 10ms
LockSupport.parkNanos(1000 * 1000L * newEmptyTimes);
}
}


@Override
public Source create() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.openconnect.offsetmgmt.api.data.canal;

import org.apache.eventmesh.common.remote.offset.RecordOffset;

import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;

@EqualsAndHashCode(callSuper = true)
@Data
@ToString
public class CanalRecordOffset extends RecordOffset {

private Long offset;

public CanalRecordOffset() {

}

@Override
public Class<? extends RecordOffset> getRecordOffsetClass() {
return CanalRecordOffset.class;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.openconnect.offsetmgmt.api.data.canal;

import org.apache.eventmesh.common.remote.offset.RecordPartition;

import java.util.Objects;

import lombok.Data;
import lombok.ToString;


@Data
@ToString
public class CanalRecordPartition extends RecordPartition {

private String journalName;

private Long timeStamp;

public CanalRecordPartition() {
super();
}

public Class<? extends RecordPartition> getRecordPartitionClass() {
return CanalRecordPartition.class;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
CanalRecordPartition that = (CanalRecordPartition) o;
return Objects.equals(journalName, that.journalName) && Objects.equals(timeStamp, that.timeStamp);
}

@Override
public int hashCode() {
return Objects.hash(journalName, timeStamp);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,6 @@ public RocketMQRecordOffset() {

}

// public RocketMQRecordOffset(Map<String, ?> offset) {
// super(offset);
// }

@Override
public Class<? extends RecordOffset> getRecordOffsetClass() {
return RocketMQRecordOffset.class;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import lombok.ToString;


@EqualsAndHashCode(callSuper = true)
@Data
@ToString
public class RocketMQRecordPartition extends RecordPartition {
Expand All @@ -49,10 +48,6 @@ public RocketMQRecordPartition() {
super();
}

// public RocketMQRecordPartition(Map<String, ?> partition) {
// super(partition);
// }

public Class<? extends RecordPartition> getRecordPartitionClass() {
return RocketMQRecordPartition.class;
}
Expand Down

0 comments on commit 76ead29

Please sign in to comment.