Skip to content

Commit

Permalink
update code style
Browse files Browse the repository at this point in the history
  • Loading branch information
liukang committed Oct 11, 2024
1 parent 85e27ea commit 271b405
Showing 1 changed file with 7 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,12 @@
import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
import org.apache.eventmesh.retry.api.conf.RetryConfiguration;
import org.apache.eventmesh.retry.api.strategy.RetryStrategy;

import java.util.Objects;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -47,22 +50,21 @@ private void sendMessageBack(final RetryConfiguration configuration) {
String bizSeqNo = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.BIZSEQNO.getKey())).toString();
String uniqueId = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.UNIQUEID.getKey())).toString();
CloudEvent retryEvent = CloudEventBuilder.from(event)
// .withExtension(ProtocolKey.TOPIC, topic)
.withSubject(topic)
.build();
.withSubject(topic)
.build();
Producer producer = configuration.getProducer();
producer.publish(retryEvent, new SendCallback() {

@Override
public void onSuccess(SendResult sendResult) {
log.info("consumer:{} consume success,, bizSeqno:{}, uniqueId:{}",
consumerGroupName, bizSeqNo, uniqueId);
consumerGroupName, bizSeqNo, uniqueId);
}

@Override
public void onException(OnExceptionContext context) {
log.warn("consumer:{} consume fail, sendMessageBack, bizSeqno:{}, uniqueId:{}",
consumerGroupName, bizSeqNo, uniqueId, context.getException());
consumerGroupName, bizSeqNo, uniqueId, context.getException());
}
});
}
Expand Down

0 comments on commit 271b405

Please sign in to comment.