Skip to content

Commit

Permalink
Refactor handlers.
Browse files Browse the repository at this point in the history
  • Loading branch information
ddubyk committed Aug 28, 2024
1 parent b585404 commit 841e74f
Show file tree
Hide file tree
Showing 17 changed files with 1,230 additions and 842 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,48 +31,47 @@ public class PrebidServerResponseBuilder {
private static final String HEADER_CONNECTION_KEEPALIVE = "keep-alive";
private static final String HEADER_CONNECTION_CLOSE = "close";

public Mono<ServerResponse> createResponseMono(final ServerRequest request,
final MediaType mediaType,
final PayloadWrapper wrapper) {
public Mono<ServerResponse> createResponseMono(ServerRequest request,
MediaType mediaType,
PayloadWrapper wrapper) {

return ok(request, mediaType).body(fromValue(wrapper.getPayload().getValue()));
}

public Mono<ServerResponse> createResponseMono(final ServerRequest request,
final MediaType mediaType,
final ResponseObject response) {
public Mono<ServerResponse> createResponseMono(ServerRequest request,
MediaType mediaType,
ResponseObject response) {

return ok(request, mediaType).body(fromValue(response));
}

private ServerResponse.BodyBuilder ok(final ServerRequest request, final MediaType mediaType) {
private ServerResponse.BodyBuilder ok(ServerRequest request, MediaType mediaType) {
final String now = ZonedDateTime.now().format(DateTimeFormatter.RFC_1123_DATE_TIME);
ServerResponse.BodyBuilder builder = ServerResponse.ok()
.contentType(mediaType)
.header(HttpHeaders.DATE, now)
.varyBy(HttpHeaders.ACCEPT_ENCODING)
.cacheControl(CacheControl.noCache());
.contentType(mediaType)
.header(HttpHeaders.DATE, now)
.varyBy(HttpHeaders.ACCEPT_ENCODING)
.cacheControl(CacheControl.noCache());
applyHeaders(builder, request);
return builder;
}

public <T extends Throwable> Mono<ServerResponse> error(final Mono<T> monoError,
final ServerRequest request) {
public <T extends Throwable> Mono<ServerResponse> error(Mono<T> monoError, ServerRequest request) {
return monoError.transform(ThrowableTranslator::translate)
.flatMap(translation ->
addHeaders(status(translation.getHttpStatus()), request)
.body(Mono.just(
ErrorResponse.builder()
.error(translation.getHttpStatus().getReasonPhrase())
.status(translation.getHttpStatus().value())
.path(request.path())
.message(translation.getErrorMessage())
.timestamp(new Date())
.build()),
ErrorResponse.class)
);
ErrorResponse.builder()
.error(translation.getHttpStatus().getReasonPhrase())
.status(translation.getHttpStatus().value())
.path(request.path())
.message(translation.getErrorMessage())
.timestamp(new Date())
.build()),
ErrorResponse.class));
}

private static ServerResponse.BodyBuilder addHeaders(final ServerResponse.BodyBuilder builder,
final ServerRequest request) {
private static ServerResponse.BodyBuilder addHeaders(ServerResponse.BodyBuilder builder, ServerRequest request) {
ServerResponse.BodyBuilder headers =
builder.header(HttpHeaders.DATE, ZonedDateTime.now().format(DateTimeFormatter.RFC_1123_DATE_TIME))
.varyBy(HttpHeaders.ACCEPT_ENCODING)
Expand All @@ -81,9 +80,7 @@ private static ServerResponse.BodyBuilder addHeaders(final ServerResponse.BodyBu
return applyHeaders(headers, request);
}

private static ServerResponse.BodyBuilder applyHeaders(final ServerResponse.BodyBuilder builder,
final ServerRequest request) {

private static ServerResponse.BodyBuilder applyHeaders(ServerResponse.BodyBuilder builder, ServerRequest request) {
final List<String> connectionHeaders = request.headers().header(HttpHeaders.CONNECTION);
if (hasConnectionValue(connectionHeaders, HEADER_CONNECTION_KEEPALIVE)) {
builder.header(HttpHeaders.CONNECTION, HEADER_CONNECTION_KEEPALIVE);
Expand All @@ -96,8 +93,7 @@ private static ServerResponse.BodyBuilder applyHeaders(final ServerResponse.Body

private static boolean hasConnectionValue(List<String> connectionHeaders, String value) {
return !connectionHeaders.isEmpty() && connectionHeaders.stream()
.map(String::toLowerCase)
.allMatch(Predicate.isEqual(value));
.map(String::toLowerCase)
.allMatch(Predicate.isEqual(value));
}

}
14 changes: 14 additions & 0 deletions src/main/java/org/prebid/cache/config/ObjectMapperConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package org.prebid.cache.config;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ObjectMapperConfig {

@Bean
public ObjectMapper objectMapper() {
return new ObjectMapper();
}
}
16 changes: 11 additions & 5 deletions src/main/java/org/prebid/cache/handlers/ErrorHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,24 @@
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;

import java.util.Objects;

@Component
@Slf4j
public class ErrorHandler extends MetricsHandler {
public class ErrorHandler {

private static final String RESOURCE_NOT_FOUND_BAD_URL = "Resource Not Found - Bad URL.";
private static final String RESOURCE_NOT_FOUND = "Resource Not Found: uuid %s";
private static final String INVALID_PARAMETERS = "Invalid Parameter(s): uuid not found.";
private static final String NO_ELEMENTS_FOUND = "No Elements Found.";

private final MetricsRecorder metricsRecorder;
private final PrebidServerResponseBuilder builder;

@Autowired
public ErrorHandler(final MetricsRecorder metricsRecorder, final PrebidServerResponseBuilder builder) {
this.metricsRecorder = metricsRecorder;
this.builder = builder;
public ErrorHandler(MetricsRecorder metricsRecorder, PrebidServerResponseBuilder builder) {
this.metricsRecorder = Objects.requireNonNull(metricsRecorder);
this.builder = Objects.requireNonNull(builder);
}

public static Mono<ServerResponse> createResourceNotFound(String uuid) {
Expand All @@ -37,7 +43,7 @@ public static Mono<ServerResponse> createNoElementsFound() {
return Mono.error(new BadRequestException(NO_ELEMENTS_FOUND));
}

public Mono<ServerResponse> invalidRequest(final ServerRequest request) {
public Mono<ServerResponse> invalidRequest(ServerRequest request) {
metricsRecorder.getInvalidRequestMeter().increment();
return builder.error(Mono.just(new ResourceNotFoundException(RESOURCE_NOT_FOUND_BAD_URL)), request);
}
Expand Down
9 changes: 0 additions & 9 deletions src/main/java/org/prebid/cache/handlers/MetricsHandler.java

This file was deleted.

2 changes: 1 addition & 1 deletion src/main/java/org/prebid/cache/handlers/PayloadType.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public enum PayloadType {
@JsonValue
private final String text;

PayloadType(final String text) {
PayloadType(String text) {
this.text = text;
}

Expand Down
123 changes: 66 additions & 57 deletions src/main/java/org/prebid/cache/handlers/cache/CacheHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,114 +2,123 @@

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.prebid.cache.builders.PrebidServerResponseBuilder;
import org.prebid.cache.exceptions.BadRequestException;
import org.prebid.cache.exceptions.DuplicateKeyException;
import org.prebid.cache.exceptions.RepositoryException;
import org.prebid.cache.exceptions.RequestParsingException;
import org.prebid.cache.exceptions.ResourceNotFoundException;
import org.prebid.cache.exceptions.UnsupportedMediaTypeException;
import org.prebid.cache.handlers.MetricsHandler;
import org.prebid.cache.handlers.ServiceType;
import org.prebid.cache.log.ConditionalLogger;
import org.prebid.cache.metrics.MetricsRecorder;
import org.prebid.cache.metrics.MetricsRecorder.MetricsRecorderTimer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.codec.DecodingException;
import org.springframework.core.io.buffer.DataBufferLimitException;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import org.springframework.web.server.UnsupportedMediaTypeStatusException;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Signal;

import java.util.Objects;
import java.util.concurrent.TimeoutException;

@Component
@Slf4j
public abstract class CacheHandler extends MetricsHandler {
public class CacheHandler {

private static final ConditionalLogger CONDITIONAL_LOGGER = new ConditionalLogger(log);

public static final String ID_KEY = "uuid";
public static final String CACHE_HOST_KEY = "ch";
private static final int UNKNOWN_SIZE_VALUE = 1;
ServiceType type;
static final String ID_KEY = "uuid";
static final String CACHE_HOST_KEY = "ch";
private static final String UUID_DUPLICATION = "UUID duplication.";

protected String metricTagPrefix;
private final PrebidServerResponseBuilder builder;
private final MetricsRecorder metricsRecorder;
private final double samplingRate;

private final ConditionalLogger conditionalLogger;
private final Double samplingRate;
@Autowired
public CacheHandler(PrebidServerResponseBuilder builder,
MetricsRecorder metricsRecorder,
@Value("${sampling.rate:0.01}") double samplingRate) {

protected CacheHandler(Double samplingRate) {
this.builder = Objects.requireNonNull(builder);
this.metricsRecorder = Objects.requireNonNull(metricsRecorder);
this.samplingRate = samplingRate;
this.conditionalLogger = new ConditionalLogger(log);
}

public <T> Mono<T> validateErrorResult(final Mono<T> mono) {
return mono.doOnSuccess(v -> log.debug("{}: {}", type, v))
public MetricsRecorder.MetricsRecorderTimer timerContext(ServiceType type, String metricTagPrefix) {
metricsRecorder.markMeterForTag(metricTagPrefix, MetricsRecorder.MeasurementTag.REQUEST);
return metricsRecorder.createRequestTimerForServiceType(type);
}

public <T> Mono<T> validateErrorResult(ServiceType type, Mono<T> mono) {
return mono
.doOnSuccess(v -> log.debug("{}: {}", type, v))
.onErrorMap(DuplicateKeyException.class, error -> {
metricsRecorder.getExistingKeyError().increment();
return new BadRequestException(UUID_DUPLICATION);
return new BadRequestException("UUID duplication.");
})
.onErrorMap(org.springframework.core.codec.DecodingException.class, error ->
new RequestParsingException(error.toString()))
.onErrorMap(org.springframework.web.server.UnsupportedMediaTypeStatusException.class, error ->
.onErrorMap(DecodingException.class, error -> new RequestParsingException(error.toString()))
.onErrorMap(UnsupportedMediaTypeStatusException.class, error ->
new UnsupportedMediaTypeException(error.toString()));
}

Mono<ServerResponse> finalizeResult(final Mono<ServerResponse> mono,
final ServerRequest request,
final MetricsRecorderTimer timerContext) {
public Mono<ServerResponse> finalizeResult(Mono<ServerResponse> mono,
ServerRequest request,
MetricsRecorderTimer timerContext,
String metricTagPrefix) {

// transform to error, if needed and send metrics
return mono
.onErrorResume(throwable -> handleErrorMetrics(throwable, request))
.doOnEach(signal -> {
if (timerContext != null)
.onErrorResume(throwable -> handleErrorMetrics(throwable, request, metricTagPrefix))
.doOnNext(ignored -> {
if (timerContext != null) {
timerContext.stop();
}
});
}

private Mono<ServerResponse> handleErrorMetrics(final Throwable error, final ServerRequest request) {
if (error instanceof RepositoryException) {
recordMetric(MetricsRecorder.MeasurementTag.ERROR_DB);
} else if (error instanceof ResourceNotFoundException) {
conditionalLogger.info(
private Mono<ServerResponse> handleErrorMetrics(Throwable error, ServerRequest request, String metricTagPrefix) {
switch (error) {
case RepositoryException repositoryException ->
metricsRecorder.markMeterForTag(metricTagPrefix, MetricsRecorder.MeasurementTag.ERROR_DB);
case ResourceNotFoundException resourceNotFoundException -> CONDITIONAL_LOGGER.info(
error.getMessage()
+ ". Refererring URLs: " + request.headers().header(HttpHeaders.REFERER)
+ ". Request URI: " + request.uri(),
samplingRate);
} else if (error instanceof BadRequestException) {
log.error(error.getMessage());
} else if (error instanceof TimeoutException) {
metricsRecorder.markMeterForTag(this.metricTagPrefix, MetricsRecorder.MeasurementTag.ERROR_TIMEDOUT);
} else if (error instanceof DataBufferLimitException) {
final long contentLength = request.headers().contentLength().orElse(UNKNOWN_SIZE_VALUE);
conditionalLogger.error(
"Request length: `" + contentLength + "` exceeds maximum size limit",
samplingRate);
} else {
log.error("Error occurred while processing the request: '{}', cause: '{}'",
ExceptionUtils.getMessage(error), ExceptionUtils.getMessage(error));
case BadRequestException badRequestException -> log.error(error.getMessage());
case TimeoutException timeoutException ->
metricsRecorder.markMeterForTag(metricTagPrefix, MetricsRecorder.MeasurementTag.ERROR_TIMEDOUT);
case DataBufferLimitException dataBufferLimitException -> {
final long contentLength = request.headers().contentLength().orElse(UNKNOWN_SIZE_VALUE);
CONDITIONAL_LOGGER.error(
"Request length: `" + contentLength + "` exceeds maximum size limit",
samplingRate);
}
default -> log.error(
"Error occurred while processing the request: '{}', cause: '{}'",
ExceptionUtils.getMessage(error),
ExceptionUtils.getMessage(error));
}

return builder.error(Mono.just(error), request)
.doOnEach(signal -> handleErrorStatusCodes(request, signal));
.doOnSuccess(response -> handleErrorStatusCodes(response, metricTagPrefix));
}

private void handleErrorStatusCodes(ServerRequest request, Signal<ServerResponse> signal) {
final var response = signal.get();
HttpMethod method = request.method();
if (method == null || signal.isOnError() || response == null) {
recordMetric(MetricsRecorder.MeasurementTag.ERROR_UNKNOWN);
} else if (response.statusCode() == HttpStatus.INTERNAL_SERVER_ERROR) {
recordMetric(MetricsRecorder.MeasurementTag.ERROR_UNKNOWN);
private void handleErrorStatusCodes(ServerResponse response, String metricTagPrefix) {
if (response == null || response.statusCode() == HttpStatus.INTERNAL_SERVER_ERROR) {
metricsRecorder.markMeterForTag(metricTagPrefix, MetricsRecorder.MeasurementTag.ERROR_UNKNOWN);
} else if (response.statusCode() == HttpStatus.BAD_REQUEST) {
recordMetric(MetricsRecorder.MeasurementTag.ERROR_BAD_REQUEST);
metricsRecorder.markMeterForTag(metricTagPrefix, MetricsRecorder.MeasurementTag.ERROR_BAD_REQUEST);
} else if (response.statusCode() == HttpStatus.NOT_FOUND) {
recordMetric(MetricsRecorder.MeasurementTag.ERROR_MISSINGID);
metricsRecorder.markMeterForTag(metricTagPrefix, MetricsRecorder.MeasurementTag.ERROR_MISSINGID);
}
}

private void recordMetric(MetricsRecorder.MeasurementTag tag) {
metricsRecorder.markMeterForTag(this.metricTagPrefix, tag);
}

}
Loading

0 comments on commit 841e74f

Please sign in to comment.