Skip to content

Commit

Permalink
[ISSUE apache#5048] Add report verify request to admin for connector …
Browse files Browse the repository at this point in the history
…runtime
  • Loading branch information
xwm1992 committed Jul 29, 2024
1 parent 33bbbe4 commit 2d45966
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.common.enums;

public enum ConnectorStage {
SOURCE,
SINK
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.common.remote.request;

import lombok.Data;
import lombok.EqualsAndHashCode;

@Data
@EqualsAndHashCode(callSuper = true)
public class ReportVerifyRequest extends BaseRemoteRequest{
private String taskID;

private String recordID;

private String recordSig;

private String connectorName;

private String connectorStage;

private String position;
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@
import org.apache.eventmesh.common.config.connector.SinkConfig;
import org.apache.eventmesh.common.config.connector.SourceConfig;
import org.apache.eventmesh.common.config.connector.offset.OffsetStorageConfig;
import org.apache.eventmesh.common.enums.ConnectorStage;
import org.apache.eventmesh.common.protocol.grpc.adminserver.AdminServiceGrpc;
import org.apache.eventmesh.common.protocol.grpc.adminserver.AdminServiceGrpc.AdminServiceBlockingStub;
import org.apache.eventmesh.common.protocol.grpc.adminserver.AdminServiceGrpc.AdminServiceStub;
import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata;
import org.apache.eventmesh.common.protocol.grpc.adminserver.Payload;
import org.apache.eventmesh.common.remote.request.FetchJobRequest;
import org.apache.eventmesh.common.remote.request.ReportHeartBeatRequest;
import org.apache.eventmesh.common.remote.request.ReportVerifyRequest;
import org.apache.eventmesh.common.remote.response.FetchJobResponse;
import org.apache.eventmesh.common.utils.IPUtils;
import org.apache.eventmesh.common.utils.JsonUtils;
Expand All @@ -55,10 +57,13 @@

import org.apache.commons.collections4.CollectionUtils;

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -281,8 +286,9 @@ public void start() throws Exception {
try {
this.stop();
} catch (Exception ex) {
throw new RuntimeException(ex);
log.error("Failed to stop after exception", ex);
}
throw new RuntimeException(e);
}
});
// start
Expand All @@ -294,8 +300,9 @@ public void start() throws Exception {
try {
this.stop();
} catch (Exception ex) {
throw new RuntimeException(ex);
log.error("Failed to stop after exception", ex);
}
throw new RuntimeException(e);
}
});
}
Expand All @@ -304,6 +311,8 @@ public void start() throws Exception {
public void stop() throws Exception {
sourceConnector.stop();
sinkConnector.stop();
sourceService.shutdown();
sinkService.shutdown();
heartBeatExecutor.shutdown();
requestObserver.onCompleted();
if (channel != null && !channel.isShutdown()) {
Expand All @@ -318,6 +327,11 @@ private void startSourceConnector() throws Exception {
// TODO: use producer pub record to storage replace below
if (connectorRecordList != null && !connectorRecordList.isEmpty()) {
for (ConnectRecord record : connectorRecordList) {
// if enabled incremental data reporting consistency check
if (connectorRuntimeConfig.enableIncrementalDataConsistencyCheck) {
reportVerifyRequest(record, connectorRuntimeConfig, ConnectorStage.SOURCE);
}

queue.put(record);
Optional<RecordOffsetManagement.SubmittedPosition> submittedRecordPosition = prepareToUpdateRecordOffset(record);
Optional<SendMessageCallback> callback =
Expand All @@ -336,6 +350,43 @@ private void startSourceConnector() throws Exception {
}
}

private void reportVerifyRequest(ConnectRecord record, ConnectorRuntimeConfig connectorRuntimeConfig, ConnectorStage connectorStage) {
UUID uuid = UUID.randomUUID();
String recordId = uuid.toString();
String md5Str = md5(record.toString());
ReportVerifyRequest reportVerifyRequest = new ReportVerifyRequest();
reportVerifyRequest.setTaskID(connectorRuntimeConfig.getTaskID());
reportVerifyRequest.setRecordID(recordId);
reportVerifyRequest.setRecordSig(md5Str);
reportVerifyRequest.setConnectorName(
IPUtils.getLocalAddress() + "_" + connectorRuntimeConfig.getJobID() + "_" + connectorRuntimeConfig.getRegion());
reportVerifyRequest.setConnectorStage(connectorStage.name());
reportVerifyRequest.setPosition(JsonUtils.toJSONString(record.getPosition()));

Metadata metadata = Metadata.newBuilder().setType(ReportVerifyRequest.class.getSimpleName()).build();

Payload request = Payload.newBuilder().setMetadata(metadata)
.setBody(Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(Objects.requireNonNull(JsonUtils.toJSONBytes(reportVerifyRequest))))
.build())
.build();

requestObserver.onNext(request);
}

private String md5(String input) {
try {
MessageDigest md = MessageDigest.getInstance("MD5");
byte[] messageDigest = md.digest(input.getBytes());
StringBuilder sb = new StringBuilder();
for (byte b : messageDigest) {
sb.append(String.format("%02x", b));
}
return sb.toString();
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException(e);
}
}

public Optional<RecordOffsetManagement.SubmittedPosition> prepareToUpdateRecordOffset(ConnectRecord record) {
return Optional.of(this.offsetManagement.submitRecord(record.getPosition()));
}
Expand Down Expand Up @@ -426,6 +477,10 @@ private void startSinkConnector() throws Exception {
List<ConnectRecord> connectRecordList = new ArrayList<>();
connectRecordList.add(connectRecord);
sinkConnector.put(connectRecordList);
// if enabled incremental data reporting consistency check
if (connectorRuntimeConfig.enableIncrementalDataConsistencyCheck) {
reportVerifyRequest(connectRecord, connectorRuntimeConfig, ConnectorStage.SINK);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,12 @@ public class ConnectorRuntimeConfig {

private String connectorRuntimeInstanceId;

private String taskID;

private String jobID;

private String region;

private String sourceConnectorType;

private String sourceConnectorDesc;
Expand All @@ -45,4 +49,6 @@ public class ConnectorRuntimeConfig {

private Map<String, Object> sinkConnectorConfig;

public boolean enableIncrementalDataConsistencyCheck = true;

}
2 changes: 2 additions & 0 deletions eventmesh-runtime-v2/src/main/resources/connector.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,6 @@
# limitations under the License.
#

taskID: 1
jobID: 1
region: region1

0 comments on commit 2d45966

Please sign in to comment.