diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/enums/ConnectorStage.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/enums/ConnectorStage.java new file mode 100644 index 0000000000..90265fba4a --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/enums/ConnectorStage.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.enums; + +public enum ConnectorStage { + SOURCE, + SINK +} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportVerifyRequest.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportVerifyRequest.java new file mode 100644 index 0000000000..f02b8aaf35 --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportVerifyRequest.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.remote.request; + +import lombok.Data; +import lombok.EqualsAndHashCode; + +@Data +@EqualsAndHashCode(callSuper = true) +public class ReportVerifyRequest extends BaseRemoteRequest{ + private String taskID; + + private String recordID; + + private String recordSig; + + private String connectorName; + + private String connectorStage; + + private String position; +} diff --git a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java index 65676903dd..1605319862 100644 --- a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java +++ b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java @@ -25,6 +25,7 @@ import org.apache.eventmesh.common.config.connector.SinkConfig; import org.apache.eventmesh.common.config.connector.SourceConfig; import org.apache.eventmesh.common.config.connector.offset.OffsetStorageConfig; +import org.apache.eventmesh.common.enums.ConnectorStage; import org.apache.eventmesh.common.protocol.grpc.adminserver.AdminServiceGrpc; import org.apache.eventmesh.common.protocol.grpc.adminserver.AdminServiceGrpc.AdminServiceBlockingStub; import org.apache.eventmesh.common.protocol.grpc.adminserver.AdminServiceGrpc.AdminServiceStub; @@ -32,6 +33,7 @@ import org.apache.eventmesh.common.protocol.grpc.adminserver.Payload; import org.apache.eventmesh.common.remote.request.FetchJobRequest; import org.apache.eventmesh.common.remote.request.ReportHeartBeatRequest; +import org.apache.eventmesh.common.remote.request.ReportVerifyRequest; import org.apache.eventmesh.common.remote.response.FetchJobResponse; import org.apache.eventmesh.common.utils.IPUtils; import org.apache.eventmesh.common.utils.JsonUtils; @@ -55,10 +57,13 @@ import org.apache.commons.collections4.CollectionUtils; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; import java.util.ArrayList; import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -281,8 +286,9 @@ public void start() throws Exception { try { this.stop(); } catch (Exception ex) { - throw new RuntimeException(ex); + log.error("Failed to stop after exception", ex); } + throw new RuntimeException(e); } }); // start @@ -294,8 +300,9 @@ public void start() throws Exception { try { this.stop(); } catch (Exception ex) { - throw new RuntimeException(ex); + log.error("Failed to stop after exception", ex); } + throw new RuntimeException(e); } }); } @@ -304,6 +311,8 @@ public void start() throws Exception { public void stop() throws Exception { sourceConnector.stop(); sinkConnector.stop(); + sourceService.shutdown(); + sinkService.shutdown(); heartBeatExecutor.shutdown(); requestObserver.onCompleted(); if (channel != null && !channel.isShutdown()) { @@ -318,6 +327,11 @@ private void startSourceConnector() throws Exception { // TODO: use producer pub record to storage replace below if (connectorRecordList != null && !connectorRecordList.isEmpty()) { for (ConnectRecord record : connectorRecordList) { + // if enabled incremental data reporting consistency check + if (connectorRuntimeConfig.enableIncrementalDataConsistencyCheck) { + reportVerifyRequest(record, connectorRuntimeConfig, ConnectorStage.SOURCE); + } + queue.put(record); Optional submittedRecordPosition = prepareToUpdateRecordOffset(record); Optional callback = @@ -336,6 +350,43 @@ private void startSourceConnector() throws Exception { } } + private void reportVerifyRequest(ConnectRecord record, ConnectorRuntimeConfig connectorRuntimeConfig, ConnectorStage connectorStage) { + UUID uuid = UUID.randomUUID(); + String recordId = uuid.toString(); + String md5Str = md5(record.toString()); + ReportVerifyRequest reportVerifyRequest = new ReportVerifyRequest(); + reportVerifyRequest.setTaskID(connectorRuntimeConfig.getTaskID()); + reportVerifyRequest.setRecordID(recordId); + reportVerifyRequest.setRecordSig(md5Str); + reportVerifyRequest.setConnectorName( + IPUtils.getLocalAddress() + "_" + connectorRuntimeConfig.getJobID() + "_" + connectorRuntimeConfig.getRegion()); + reportVerifyRequest.setConnectorStage(connectorStage.name()); + reportVerifyRequest.setPosition(JsonUtils.toJSONString(record.getPosition())); + + Metadata metadata = Metadata.newBuilder().setType(ReportVerifyRequest.class.getSimpleName()).build(); + + Payload request = Payload.newBuilder().setMetadata(metadata) + .setBody(Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(Objects.requireNonNull(JsonUtils.toJSONBytes(reportVerifyRequest)))) + .build()) + .build(); + + requestObserver.onNext(request); + } + + private String md5(String input) { + try { + MessageDigest md = MessageDigest.getInstance("MD5"); + byte[] messageDigest = md.digest(input.getBytes()); + StringBuilder sb = new StringBuilder(); + for (byte b : messageDigest) { + sb.append(String.format("%02x", b)); + } + return sb.toString(); + } catch (NoSuchAlgorithmException e) { + throw new RuntimeException(e); + } + } + public Optional prepareToUpdateRecordOffset(ConnectRecord record) { return Optional.of(this.offsetManagement.submitRecord(record.getPosition())); } @@ -426,6 +477,10 @@ private void startSinkConnector() throws Exception { List connectRecordList = new ArrayList<>(); connectRecordList.add(connectRecord); sinkConnector.put(connectRecordList); + // if enabled incremental data reporting consistency check + if (connectorRuntimeConfig.enableIncrementalDataConsistencyCheck) { + reportVerifyRequest(connectRecord, connectorRuntimeConfig, ConnectorStage.SINK); + } } } } diff --git a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntimeConfig.java b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntimeConfig.java index 901defc47d..5a58cce08e 100644 --- a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntimeConfig.java +++ b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntimeConfig.java @@ -31,8 +31,12 @@ public class ConnectorRuntimeConfig { private String connectorRuntimeInstanceId; + private String taskID; + private String jobID; + private String region; + private String sourceConnectorType; private String sourceConnectorDesc; @@ -45,4 +49,6 @@ public class ConnectorRuntimeConfig { private Map sinkConnectorConfig; + public boolean enableIncrementalDataConsistencyCheck = true; + } diff --git a/eventmesh-runtime-v2/src/main/resources/connector.yaml b/eventmesh-runtime-v2/src/main/resources/connector.yaml index bc7bc20756..bf7f58028b 100644 --- a/eventmesh-runtime-v2/src/main/resources/connector.yaml +++ b/eventmesh-runtime-v2/src/main/resources/connector.yaml @@ -15,4 +15,6 @@ # limitations under the License. # +taskID: 1 jobID: 1 +region: region1