From 91d55286440923e060f0beb84c98b9e1d90c02e5 Mon Sep 17 00:00:00 2001 From: Josep Boix Requesens Date: Thu, 7 Nov 2024 15:36:55 +0100 Subject: [PATCH] feat: add alias creation and refactor setup process Resolves #15 by introducing an alias in the OpenSearch setup to filter out robot-generated events, allowing Grafana charts to focus on human-generated events. Changes made: - Added alias creation as a dedicated setup task, enabling filtering by default in Grafana of robot generated events. - Refactored the setup process into single-responsibility tasks: policy creation, index creation, and alias creation. Tasks are injected in the execution order using the `@Order` annotation. - Created an `ApplicationRunner` to orchestrate the startup process, handling both OpenSearch setup and event dispatcher client initialization. - Added `MockWebServer` as a test dependency to facilitate testing of OpenSearch setup. - Renamed `SseClient` to `EventDispatcherClient` to align with service naming conventions. - Added jacoco coverage report for pull requests. --- .github/workflows/quality.yml | 7 + build.gradle.kts | 17 ++ .../DataTransferApplicationRunner.kt | 71 +++++++++ .../PillarboxDataTransferApplication.kt | 11 +- ...{SseClient.kt => EventDispatcherClient.kt} | 32 ++-- .../monitoring/event/SetupService.kt | 147 ------------------ .../monitoring/event/setup/AliasSetupTask.kt | 89 +++++++++++ .../event/setup/ISMPolicySetupTask.kt | 80 ++++++++++ .../monitoring/event/setup/IndexSetupTask.kt | 80 ++++++++++ .../setup/OpenSearchSetupConfiguration.kt | 24 +++ .../event/setup/OpenSearchSetupService.kt | 69 ++++++++ .../event/setup/OpenSearchSetupTask.kt | 15 ++ .../pillarbox/monitoring/io/ResourceUtils.kt | 16 ++ src/main/resources/opensearch/alias.json | 19 +++ .../event/setup/AliasSetupTaskTest.kt | 84 ++++++++++ .../event/setup/ISMPolicySetupTaskTest.kt | 84 ++++++++++ .../event/setup/IndexSetupTaskTest.kt | 84 ++++++++++ .../event/setup/OpenSearchSetupServiceTest.kt | 92 +++++++++++ .../monitoring/test/MockWebServerUtils.kt | 14 ++ .../PillarboxMonitoringTestConfiguration.kt | 35 +++++ 20 files changed, 892 insertions(+), 178 deletions(-) create mode 100644 src/main/kotlin/ch/srgssr/pillarbox/monitoring/DataTransferApplicationRunner.kt rename src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/{SseClient.kt => EventDispatcherClient.kt} (74%) delete mode 100644 src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/SetupService.kt create mode 100644 src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/AliasSetupTask.kt create mode 100644 src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/ISMPolicySetupTask.kt create mode 100644 src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/IndexSetupTask.kt create mode 100644 src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/OpenSearchSetupConfiguration.kt create mode 100644 src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/OpenSearchSetupService.kt create mode 100644 src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/OpenSearchSetupTask.kt create mode 100644 src/main/kotlin/ch/srgssr/pillarbox/monitoring/io/ResourceUtils.kt create mode 100644 src/main/resources/opensearch/alias.json create mode 100644 src/test/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/AliasSetupTaskTest.kt create mode 100644 src/test/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/ISMPolicySetupTaskTest.kt create mode 100644 src/test/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/IndexSetupTaskTest.kt create mode 100644 src/test/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/OpenSearchSetupServiceTest.kt create mode 100644 src/test/kotlin/ch/srgssr/pillarbox/monitoring/test/MockWebServerUtils.kt create mode 100644 src/test/kotlin/ch/srgssr/pillarbox/monitoring/test/PillarboxMonitoringTestConfiguration.kt diff --git a/.github/workflows/quality.yml b/.github/workflows/quality.yml index 8c73f01..4b6f825 100644 --- a/.github/workflows/quality.yml +++ b/.github/workflows/quality.yml @@ -24,6 +24,13 @@ jobs: - name: Run Build run: ./gradlew build + - name: Add coverage to PR + id: jacoco + uses: madrapps/jacoco-report@v1.7.1 + with: + paths: ${{ github.workspace }}/**/build/reports/jacoco/test/jacocoTestReport.xml + token: ${{ secrets.GITHUB_TOKEN }} + - name: Set up Docker Buildx uses: docker/setup-buildx-action@v3 diff --git a/build.gradle.kts b/build.gradle.kts index 77e1a41..a6b08b2 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -8,6 +8,7 @@ plugins { id("io.spring.dependency-management") version "1.1.6" id("io.gitlab.arturbosch.detekt") version "1.23.7" id("org.jlleitschuh.gradle.ktlint") version "12.1.1" + id("jacoco") } group = "ch.srgssr.pillarbox" @@ -37,6 +38,9 @@ dependencies { testImplementation("org.springframework.boot:spring-boot-starter-test") testImplementation("io.kotest.extensions:kotest-extensions-spring:1.3.0") testImplementation("org.jetbrains.kotlin:kotlin-test-junit5") + testImplementation("io.mockk:mockk:1.13.13") + testImplementation("com.squareup.okhttp3:mockwebserver") + testImplementation("com.squareup.okhttp3:okhttp") testRuntimeOnly("org.junit.platform:junit-platform-launcher") } @@ -65,6 +69,18 @@ ktlint { } } +jacoco { + toolVersion = "0.8.12" +} + +tasks.jacocoTestReport { + dependsOn(tasks.test) + reports { + xml.required.set(true) + html.required.set(false) + } +} + tasks.jar.configure { enabled = false } @@ -80,6 +96,7 @@ tasks.bootJar.configure { tasks.withType { useJUnitPlatform() + finalizedBy("jacocoTestReport") } val updateVersion by tasks.registering { diff --git a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/DataTransferApplicationRunner.kt b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/DataTransferApplicationRunner.kt new file mode 100644 index 0000000..d063546 --- /dev/null +++ b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/DataTransferApplicationRunner.kt @@ -0,0 +1,71 @@ +package ch.srgssr.pillarbox.monitoring + +import ch.srgssr.pillarbox.monitoring.event.EventDispatcherClient +import ch.srgssr.pillarbox.monitoring.event.setup.OpenSearchSetupService +import ch.srgssr.pillarbox.monitoring.exception.RetryExhaustedException +import ch.srgssr.pillarbox.monitoring.log.logger +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch +import org.springframework.boot.ApplicationArguments +import org.springframework.boot.ApplicationRunner +import org.springframework.context.annotation.Profile +import org.springframework.stereotype.Component + +/** + * DataTransferApplicationRunner is responsible for initializing the OpenSearch setup and + * starting the Event dispatcher client upon application start-up. + * + * If either the OpenSearch setup or the SSE connection fails irrecoverably, the application + * is terminated via [TerminationService]. + * + * @property openSearchSetupService Service responsible for initializing and validating the OpenSearch setup. + * @property eventDispatcherClient The client responsible for establishing a connection to the event dispatcher. + * @property terminationService Responsible for gracefully terminating the application if critical failures occur + * during either OpenSearch setup or SSE connection. + */ +@Component +@Profile("!test") +class DataTransferApplicationRunner( + private val openSearchSetupService: OpenSearchSetupService, + private val eventDispatcherClient: EventDispatcherClient, + private val terminationService: TerminationService, +) : ApplicationRunner { + private companion object { + private val logger = logger() + } + + /** + * Executes the OpenSearch setup task when the application starts. + * + * Upon successful setup, it initiates the [EventDispatcherClient]. If the setup fails, + * an error is logged and the application is terminated. + * + * @param args Application arguments. + */ + override fun run(args: ApplicationArguments?) { + openSearchSetupService.start().subscribe( + { this.startSseClient() }, + { + logger.error("Failed to connect to OpenSearch:", it) + CoroutineScope(Dispatchers.IO).launch { + terminationService.terminateApplication() + } + }, + ) + } + + private fun startSseClient() { + eventDispatcherClient.start().subscribe( + { }, + { + if (it is RetryExhaustedException) { + logger.error("Failed to connect to SSE after retries, terminating application.", it) + CoroutineScope(Dispatchers.IO).launch { + terminationService.terminateApplication() + } + } + }, + ) + } +} diff --git a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/PillarboxDataTransferApplication.kt b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/PillarboxDataTransferApplication.kt index f43bb06..dc690d4 100644 --- a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/PillarboxDataTransferApplication.kt +++ b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/PillarboxDataTransferApplication.kt @@ -1,7 +1,5 @@ package ch.srgssr.pillarbox.monitoring -import ch.srgssr.pillarbox.monitoring.event.SetupService -import jakarta.annotation.PostConstruct import org.springframework.boot.autoconfigure.SpringBootApplication import org.springframework.boot.autoconfigure.data.elasticsearch.ElasticsearchDataAutoConfiguration import org.springframework.boot.context.properties.ConfigurationPropertiesScan @@ -12,14 +10,7 @@ import org.springframework.boot.runApplication */ @SpringBootApplication(exclude = [ElasticsearchDataAutoConfiguration::class]) @ConfigurationPropertiesScan -class PillarboxDataTransferApplication( - private val setupService: SetupService, -) { - @PostConstruct - fun init() { - setupService.start() - } -} +class PillarboxDataTransferApplication /** * The main function that starts the Spring Boot application. diff --git a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/SseClient.kt b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/EventDispatcherClient.kt similarity index 74% rename from src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/SseClient.kt rename to src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/EventDispatcherClient.kt index cea83d2..b9bad95 100644 --- a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/SseClient.kt +++ b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/EventDispatcherClient.kt @@ -1,6 +1,5 @@ package ch.srgssr.pillarbox.monitoring.event -import ch.srgssr.pillarbox.monitoring.TerminationService import ch.srgssr.pillarbox.monitoring.concurrent.LockManager import ch.srgssr.pillarbox.monitoring.event.config.SseClientConfigurationProperties import ch.srgssr.pillarbox.monitoring.event.model.EventRequest @@ -15,18 +14,17 @@ import org.springframework.web.reactive.function.client.WebClient import org.springframework.web.reactive.function.client.bodyToFlux /** - * Service responsible for managing a Server-Sent Events (SSE) client. The client connects to an SSE endpoint, - * handles incoming events, and manages retry behavior in case of connection failures. + * Service responsible for managing a Server-Sent Events (SSE) connection to the event dispatcher service. + * It handles incoming events, and manages retry behavior in case of connection failures. * * @property eventService The service used to handle incoming events. * @property properties The SSE client configuration containing the URI and retry settings. - * @property terminationService The service responsible for terminating the application in case of critical failures. + * @property lockManager The session based lock manager. */ @Service -class SseClient( +class EventDispatcherClient( private val eventService: EventService, private val properties: SseClientConfigurationProperties, - private val terminationService: TerminationService, private val lockManager: LockManager, ) { private companion object { @@ -40,7 +38,7 @@ class SseClient( * Starts the SSE client, connecting to the configured SSE endpoint. It handles incoming events by * delegating to the appropriate event handling methods and manages retries in case of connection failures. */ - fun start() { + fun start() = WebClient .create(properties.uri) .get() @@ -57,20 +55,12 @@ class SseClient( retrySignal.failure(), ) }, - ).subscribe( - { CoroutineScope(Dispatchers.IO).launch { handleEvent(it) } }, - { CoroutineScope(Dispatchers.IO).launch { terminateApplication(it) } }, - ) - } - - private fun terminateApplication(error: Throwable) { - if (error is RetryExhaustedException) { - logger.error("Failed to connect after retries, exiting application.", error) - terminationService.terminateApplication() - } else { - logger.error("An error occurred while processing the event.", error) - } - } + ).doOnNext { CoroutineScope(Dispatchers.IO).launch { handleEvent(it) } } + .doOnError { error -> + if (error !is RetryExhaustedException) { + logger.error("An error occurred while processing the event.", error) + } + } private suspend fun handleEvent(eventRequest: EventRequest) { lockManager[eventRequest.sessionId].withLock { diff --git a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/SetupService.kt b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/SetupService.kt deleted file mode 100644 index 8baca55..0000000 --- a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/SetupService.kt +++ /dev/null @@ -1,147 +0,0 @@ -package ch.srgssr.pillarbox.monitoring.event - -import ch.srgssr.pillarbox.monitoring.TerminationService -import ch.srgssr.pillarbox.monitoring.event.repository.OpenSearchConfigurationProperties -import ch.srgssr.pillarbox.monitoring.log.error -import ch.srgssr.pillarbox.monitoring.log.logger -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.launch -import org.springframework.core.io.ResourceLoader -import org.springframework.http.HttpHeaders -import org.springframework.http.HttpStatusCode -import org.springframework.http.MediaType -import org.springframework.http.ResponseEntity -import org.springframework.stereotype.Service -import org.springframework.web.reactive.function.client.WebClient -import reactor.core.publisher.Mono - -/** - * Service responsible for setting up the OpenSearch environment and ensuring - * the necessary configurations are in place before the application starts - * processing Server-Sent Events (SSE). This service performs health checks, - * applies Index State Management (ISM) policies, and creates indices if they do not exist. - * - * @property properties OpenSearch configuration properties including the URI and retry settings. - * @property resourceLoader Loads resources such as JSON files for OpenSearch index templates and ISM policies. - * @property terminationService Responsible for terminating the application in case of failure. - * @property sseClient The client used to connect to the SSE endpoint once OpenSearch setup is complete. - */ -@Service -class SetupService( - private val properties: OpenSearchConfigurationProperties, - private val resourceLoader: ResourceLoader, - private val terminationService: TerminationService, - private val sseClient: SseClient, -) { - private val webClient: WebClient = WebClient.create(properties.uri.toString()) - - private companion object { - /** - * Logger instance for logging within this service. - */ - private val logger = logger() - - /** - * Path for creating the OpenSearch index. - */ - private const val INDEX_CREATION_PATH = "/actions-000001" - - /** - * Path for the Index State Management (ISM) policy in OpenSearch. - */ - private const val ISM_POLICY_PATH = "/_plugins/_ism/policies/actions_policy" - } - - /** - * Starts the OpenSearch setup process. This method checks the health of the OpenSearch cluster, - * applies the ISM policy, and creates the index if necessary. Once all setup tasks are complete, - * it starts the SSE client to begin receiving events. - */ - fun start() { - checkOpenSearchHealth() - .retryWhen( - properties.retry.create().doBeforeRetry { - logger.info("Retrying OpenSearch health check...") - }, - ).doOnSuccess { logger.info("OpenSearch is healthy, proceeding with setup...") } - .then(checkAndApplyISMPolicy()) - .then(checkAndCreateIndex()) - .doOnSuccess { logger.info("All setup tasks are completed, starting SSE client...") } - .subscribe( - { sseClient.start() }, - { CoroutineScope(Dispatchers.IO).launch { terminateApplication(it) } }, - ) - } - - private fun terminateApplication(error: Throwable) { - logger.error("Failed to connect to OpenSearch:", error) - terminationService.terminateApplication() - } - - // Check OpenSearch health using WebClient - private fun checkOpenSearchHealth(): Mono> = - webClient - .get() - .uri("/") - .retrieve() - .toBodilessEntity() - - private fun checkAndCreateIndex(): Mono> = - webClient - .head() - .uri(INDEX_CREATION_PATH) - .retrieve() - .onStatus(HttpStatusCode::is4xxClientError) { - logger.info("Index does not exist, creating index...") - createIndex().then(Mono.empty()) - }.onStatus(HttpStatusCode::is2xxSuccessful) { - logger.info("Index already exists, skipping creation.") - Mono.empty() - }.toBodilessEntity() - - private fun createIndex(): Mono> { - val indexTemplateJson = loadResource("classpath:opensearch/index_template.json") - return webClient - .put() - .uri(INDEX_CREATION_PATH) - .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) - .bodyValue(indexTemplateJson) - .retrieve() - .toBodilessEntity() - .doOnSuccess { logger.info("Index created successfully") } - .doOnError { e -> logger.error { "Failed to create index: ${e.message}" } } - } - - private fun checkAndApplyISMPolicy(): Mono> = - webClient - .get() - .uri(ISM_POLICY_PATH) - .retrieve() - .onStatus(HttpStatusCode::is4xxClientError) { - logger.info("ISM policy does not exist, creating new ISM policy...") - applyISMPolicy().then(Mono.empty()) - }.onStatus(HttpStatusCode::is2xxSuccessful) { - logger.info("ISM policy already exists, skipping creation.") - Mono.empty() - }.toBodilessEntity() - - private fun applyISMPolicy(): Mono> { - val ismPolicyJson = loadResource("classpath:opensearch/ism_policy.json") - - return webClient - .put() - .uri(ISM_POLICY_PATH) - .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) - .bodyValue(ismPolicyJson) - .retrieve() - .toBodilessEntity() - .doOnSuccess { logger.info("ISM Policy applied successfully") } - .doOnError { e -> logger.error { "Failed to apply ISM Policy: ${e.message}" } } - } - - private fun loadResource(location: String): String { - val resource = resourceLoader.getResource(location) - return resource.inputStream.bufferedReader().use { it.readText() } - } -} diff --git a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/AliasSetupTask.kt b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/AliasSetupTask.kt new file mode 100644 index 0000000..a183d8e --- /dev/null +++ b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/AliasSetupTask.kt @@ -0,0 +1,89 @@ +package ch.srgssr.pillarbox.monitoring.event.setup + +import ch.srgssr.pillarbox.monitoring.io.loadResourceContent +import ch.srgssr.pillarbox.monitoring.log.error +import ch.srgssr.pillarbox.monitoring.log.logger +import org.springframework.beans.factory.annotation.Qualifier +import org.springframework.core.annotation.Order +import org.springframework.core.io.ResourceLoader +import org.springframework.http.HttpHeaders +import org.springframework.http.HttpStatusCode +import org.springframework.http.MediaType +import org.springframework.http.ResponseEntity +import org.springframework.stereotype.Component +import org.springframework.web.reactive.function.client.WebClient +import reactor.core.publisher.Mono + +/** + * Task responsible for setting up a filtered alias in OpenSearch. + * + * This task checks if the specified alias exists in OpenSearch. If it does not, it loads the alias + * configuration from `resources/opensearch/alias.json` and creates it. + * + * @property webClient WebClient instance used to interact with the OpenSearch API. + * @property resourceLoader Resource loader used to access the alias configuration JSON file. + */ +@Component +@Order(3) +class AliasSetupTask( + @Qualifier("openSearchWebClient") + private val webClient: WebClient, + private val resourceLoader: ResourceLoader, +) : OpenSearchSetupTask { + companion object { + /** + * Logger instance for logging within this task. + */ + private val logger = logger() + + /** + * Name of the filtered alias. + */ + private const val ALIAS_NAME = "filtered_actions" + + /** + * Path to check for the existence of the alias in OpenSearch. + */ + private const val ALIAS_CHECK_PATH = "/_alias/${ALIAS_NAME}" + + /** + * Path to create the alias in OpenSearch. + */ + private const val ALIAS_CREATION_PATH = "/_aliases/${ALIAS_NAME}" + } + + /** + * Runs the alias setup task. + * + * Checks if the alias exists: If not, creates the alias. + * + * @return Mono indicating the completion of the task. + */ + override fun run(): Mono<*> = checkAndCreateAlias() + + private fun checkAndCreateAlias(): Mono> = + webClient + .get() + .uri(ALIAS_CHECK_PATH) + .retrieve() + .onStatus(HttpStatusCode::is4xxClientError) { + logger.info("Alias '$ALIAS_NAME' does not exist, creating alias...") + createAlias().then(Mono.empty()) + }.onStatus(HttpStatusCode::is2xxSuccessful) { + logger.info("Alias '$ALIAS_NAME' already exists, skipping creation.") + Mono.empty() + }.toBodilessEntity() + + private fun createAlias(): Mono> { + val indexTemplateJson = resourceLoader.loadResourceContent("classpath:opensearch/alias.json") + return webClient + .put() + .uri(ALIAS_CREATION_PATH) + .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) + .bodyValue(indexTemplateJson) + .retrieve() + .toBodilessEntity() + .doOnSuccess { logger.info("Alias ${ALIAS_NAME} created successfully") } + .doOnError { e -> logger.error { "Failed to create alias: ${e.message}" } } + } +} diff --git a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/ISMPolicySetupTask.kt b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/ISMPolicySetupTask.kt new file mode 100644 index 0000000..200bcf1 --- /dev/null +++ b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/ISMPolicySetupTask.kt @@ -0,0 +1,80 @@ +package ch.srgssr.pillarbox.monitoring.event.setup + +import ch.srgssr.pillarbox.monitoring.io.loadResourceContent +import ch.srgssr.pillarbox.monitoring.log.error +import ch.srgssr.pillarbox.monitoring.log.logger +import org.springframework.beans.factory.annotation.Qualifier +import org.springframework.core.annotation.Order +import org.springframework.core.io.ResourceLoader +import org.springframework.http.HttpHeaders +import org.springframework.http.HttpStatusCode +import org.springframework.http.MediaType +import org.springframework.http.ResponseEntity +import org.springframework.stereotype.Component +import org.springframework.web.reactive.function.client.WebClient +import reactor.core.publisher.Mono + +/** + * Task responsible for setting up the Index State Management (ISM) policy in OpenSearch. + * + * This task checks if the ISM policy already exists in OpenSearch. If it does not, it loads + * the ISM policy configuration from `resources/opensearch/ism_policy.json`. + * + * @property webClient WebClient instance for interacting with the OpenSearch API. + * @property resourceLoader Resource loader for accessing the ISM policy configuration file. + */ +@Component +@Order(1) +class ISMPolicySetupTask( + @Qualifier("openSearchWebClient") + private val webClient: WebClient, + private val resourceLoader: ResourceLoader, +) : OpenSearchSetupTask { + private companion object { + /** + * Logger instance for logging within this task. + */ + private val logger = logger() + + /** + * Path for the Index State Management (ISM) policy in OpenSearch. + */ + private const val ISM_POLICY_PATH = "/_plugins/_ism/policies/actions_policy" + } + + /** + * Runs the ISM policy setup task. + * + * Checks if the ISM policy is already present; if not, applies the default policy, + * + * @return Mono indicating the completion of the task. + */ + override fun run(): Mono<*> = checkAndApplyISMPolicy() + + private fun checkAndApplyISMPolicy(): Mono> = + webClient + .get() + .uri(ISM_POLICY_PATH) + .retrieve() + .onStatus(HttpStatusCode::is4xxClientError) { + logger.info("ISM policy does not exist, creating new ISM policy...") + applyISMPolicy().then(Mono.empty()) + }.onStatus(HttpStatusCode::is2xxSuccessful) { + logger.info("ISM policy already exists, skipping creation.") + Mono.empty() + }.toBodilessEntity() + + private fun applyISMPolicy(): Mono> { + val ismPolicyJson = resourceLoader.loadResourceContent("classpath:opensearch/ism_policy.json") + + return webClient + .put() + .uri(ISM_POLICY_PATH) + .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) + .bodyValue(ismPolicyJson) + .retrieve() + .toBodilessEntity() + .doOnSuccess { logger.info("ISM Policy applied successfully") } + .doOnError { e -> logger.error { "Failed to apply ISM Policy: ${e.message}" } } + } +} diff --git a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/IndexSetupTask.kt b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/IndexSetupTask.kt new file mode 100644 index 0000000..ba8c146 --- /dev/null +++ b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/IndexSetupTask.kt @@ -0,0 +1,80 @@ +package ch.srgssr.pillarbox.monitoring.event.setup + +import ch.srgssr.pillarbox.monitoring.io.loadResourceContent +import ch.srgssr.pillarbox.monitoring.log.error +import ch.srgssr.pillarbox.monitoring.log.logger +import org.springframework.beans.factory.annotation.Qualifier +import org.springframework.core.annotation.Order +import org.springframework.core.io.ResourceLoader +import org.springframework.http.HttpHeaders +import org.springframework.http.HttpStatusCode +import org.springframework.http.MediaType +import org.springframework.http.ResponseEntity +import org.springframework.stereotype.Component +import org.springframework.web.reactive.function.client.WebClient +import reactor.core.publisher.Mono + +/** + * Task responsible for setting up an OpenSearch index if it does not already exist. + * + * This task checks for the existence of the OpenSearch index If the index + * does not exist, it creates the index from the template stored + * in `resources/opensearch/index_template.json`. + * + * @property webClient WebClient instance used to interact with the OpenSearch API. + * @property resourceLoader Resource loader used to access the index template JSON file. + */ +@Component +@Order(2) +class IndexSetupTask( + @Qualifier("openSearchWebClient") + private val webClient: WebClient, + private val resourceLoader: ResourceLoader, +) : OpenSearchSetupTask { + private companion object { + /** + * Logger instance for logging within this task. + */ + private val logger = logger() + + /** + * Path for creating the OpenSearch index. + */ + private const val INDEX_CREATION_PATH = "/actions-000001" + } + + /** + * Runs the index setup task. + * + * Checks if the OpenSearch index exists; if not, creates the index. + * + * @return Mono indicating the completion of the task. + */ + override fun run(): Mono<*> = checkAndCreateIndex() + + private fun checkAndCreateIndex(): Mono> = + webClient + .head() + .uri(INDEX_CREATION_PATH) + .retrieve() + .onStatus(HttpStatusCode::is4xxClientError) { + logger.info("Index does not exist, creating index...") + createIndex().then(Mono.empty()) + }.onStatus(HttpStatusCode::is2xxSuccessful) { + logger.info("Index already exists, skipping creation.") + Mono.empty() + }.toBodilessEntity() + + private fun createIndex(): Mono> { + val indexTemplateJson = resourceLoader.loadResourceContent("classpath:opensearch/index_template.json") + return webClient + .put() + .uri(INDEX_CREATION_PATH) + .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) + .bodyValue(indexTemplateJson) + .retrieve() + .toBodilessEntity() + .doOnSuccess { logger.info("Index created successfully") } + .doOnError { e -> logger.error { "Failed to create index: ${e.message}" } } + } +} diff --git a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/OpenSearchSetupConfiguration.kt b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/OpenSearchSetupConfiguration.kt new file mode 100644 index 0000000..f959108 --- /dev/null +++ b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/OpenSearchSetupConfiguration.kt @@ -0,0 +1,24 @@ +package ch.srgssr.pillarbox.monitoring.event.setup + +import ch.srgssr.pillarbox.monitoring.event.repository.OpenSearchConfigurationProperties +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.web.reactive.function.client.WebClient + +/** + * Configuration class for OpenSearch setup. + * + * Provides a WebClient bean configured with the OpenSearch URI. + */ +@Configuration +class OpenSearchSetupConfiguration { + /** + * Creates a WebClient bean for OpenSearch using the specified URI from the properties. + * + * @param properties OpenSearch configuration properties containing the URI. + * @return Configured WebClient instance for OpenSearch. + */ + @Bean("openSearchWebClient") + fun openSearchWebClient(properties: OpenSearchConfigurationProperties): WebClient = + WebClient.create(properties.uri.toString()) +} diff --git a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/OpenSearchSetupService.kt b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/OpenSearchSetupService.kt new file mode 100644 index 0000000..e454844 --- /dev/null +++ b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/OpenSearchSetupService.kt @@ -0,0 +1,69 @@ +package ch.srgssr.pillarbox.monitoring.event.setup + +import ch.srgssr.pillarbox.monitoring.event.repository.OpenSearchConfigurationProperties +import ch.srgssr.pillarbox.monitoring.log.logger +import org.springframework.beans.factory.annotation.Qualifier +import org.springframework.http.ResponseEntity +import org.springframework.stereotype.Service +import org.springframework.web.reactive.function.client.WebClient +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono + +/** + * Service responsible for setting up the OpenSearch environment and ensuring + * the necessary configurations are in place before the application starts + * processing Server-Sent Events (SSE). + * + * Discovers all [OpenSearchSetupTask] in the context and executes them sequentially. + * + * @property webClient The web client instance configured for OpenSearch. + * @property tasks The list of setup tasks that must be executed to prepare the OpenSearch environment. + * @property properties OpenSearch configuration properties including the URI and retry settings. + */ +@Service +class OpenSearchSetupService( + @Qualifier("openSearchWebClient") + private val webClient: WebClient, + private val tasks: List, + private val properties: OpenSearchConfigurationProperties, +) { + private companion object { + private val logger = logger() + } + + /** + * Starts the OpenSearch setup process. + * + * This function begins by checking the health of the OpenSearch instance. If + * the health check passes, it proceeds to run all setup tasks in sequence. Once + * all tasks are complete, the SSE client is started. + * + * If the health check or any setup task fails, the process will retry based on + * the retry settings defined in [OpenSearchConfigurationProperties]. If retries + * are exhausted, the application will be terminated. + */ + fun start() = + checkOpenSearchHealth() + .retryWhen( + properties.retry.create().doBeforeRetry { + logger.info("Retrying OpenSearch health check...") + }, + ).doOnSuccess { logger.info("OpenSearch is healthy, proceeding with setup...") } + .then(runSetupTasks()) + .doOnSuccess { logger.info("All setup tasks are completed, starting SSE client...") } + + private fun checkOpenSearchHealth(): Mono> = + webClient + .get() + .uri("/") + .retrieve() + .toBodilessEntity() + + private fun runSetupTasks(): Mono<*> = + Flux + .fromIterable(tasks) + .concatMap { task -> + logger.info("Running setup task: ${task::class.simpleName}") + task.run() + }.last() +} diff --git a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/OpenSearchSetupTask.kt b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/OpenSearchSetupTask.kt new file mode 100644 index 0000000..c286a28 --- /dev/null +++ b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/OpenSearchSetupTask.kt @@ -0,0 +1,15 @@ +package ch.srgssr.pillarbox.monitoring.event.setup + +import reactor.core.publisher.Mono + +/** + * An interface marker for an OpenSearch setup class. + */ +interface OpenSearchSetupTask { + /** + * Executes the setup task. + * + * @return A Mono that completes when the task is done. + */ + fun run(): Mono<*> +} diff --git a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/io/ResourceUtils.kt b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/io/ResourceUtils.kt new file mode 100644 index 0000000..74b3056 --- /dev/null +++ b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/io/ResourceUtils.kt @@ -0,0 +1,16 @@ +package ch.srgssr.pillarbox.monitoring.io + +import org.springframework.core.io.ResourceLoader + +/** + * Retrieves the contents of a specified resource file as a String. + * + * @param location The path to the resource (e.g., "classpath:opensearch/index_template.json"). + * @return A String containing the full contents of the resource file. + * + * @throws java.io.IOException if there was a problem retrieving the resource. + */ +fun ResourceLoader.loadResourceContent(location: String): String { + val resource = this.getResource(location) + return resource.inputStream.bufferedReader().use { it.readText() } +} diff --git a/src/main/resources/opensearch/alias.json b/src/main/resources/opensearch/alias.json new file mode 100644 index 0000000..6f47a53 --- /dev/null +++ b/src/main/resources/opensearch/alias.json @@ -0,0 +1,19 @@ +{ + "actions": [ + { + "add": { + "index": "actions-*", + "alias": "filtered_actions", + "filter": { + "bool": { + "must_not": [ + { "term": { "data.robot": true } }, + { "term": { "session.robot": true } } + ] + } + } + } + } + ] +} + diff --git a/src/test/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/AliasSetupTaskTest.kt b/src/test/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/AliasSetupTaskTest.kt new file mode 100644 index 0000000..d898118 --- /dev/null +++ b/src/test/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/AliasSetupTaskTest.kt @@ -0,0 +1,84 @@ +package ch.srgssr.pillarbox.monitoring.event.setup + +import ch.srgssr.pillarbox.monitoring.event.repository.OpenSearchConfigurationProperties +import ch.srgssr.pillarbox.monitoring.test.PillarboxMonitoringTestConfiguration +import ch.srgssr.pillarbox.monitoring.test.createDispatcher +import com.fasterxml.jackson.databind.ObjectMapper +import io.kotest.assertions.throwables.shouldNotThrow +import io.kotest.core.spec.style.ShouldSpec +import io.kotest.matchers.shouldBe +import okhttp3.mockwebserver.MockResponse +import okhttp3.mockwebserver.MockWebServer +import org.springframework.boot.test.context.SpringBootTest +import org.springframework.test.context.ActiveProfiles +import org.springframework.test.context.ContextConfiguration + +@SpringBootTest +@ContextConfiguration(classes = [PillarboxMonitoringTestConfiguration::class]) +@ActiveProfiles("test") +class AliasSetupTaskTest( + private val aliasSetupTask: AliasSetupTask, + private val openSearchProperties: OpenSearchConfigurationProperties, + private val objectMapper: ObjectMapper, +) : ShouldSpec({ + + var mockWebServer = MockWebServer() + + beforeTest { + mockWebServer = MockWebServer() + mockWebServer.start(openSearchProperties.uri.port) + } + + afterTest { + mockWebServer.shutdown() + } + + should("should skip creation if alias exists") { + // Given: The alias is already created in opensearch + mockWebServer.dispatcher = + createDispatcher( + mapOf( + "GET" to "/_alias/filtered_actions" to MockResponse().setResponseCode(200), + ), + ) + + // When: The alias setup task is run + aliasSetupTask.run().block() + + // Then: The alias creation endpoint shouldn't have been invoked + mockWebServer.requestCount shouldBe 1 + + mockWebServer.takeRequest().apply { + path shouldBe "/_alias/filtered_actions" + method shouldBe "GET" + } + } + + should("should not skip creation if alias doesn't exists") { + // Given: The alias is already created in opensearch + mockWebServer.dispatcher = + createDispatcher( + mapOf( + "GET" to "/_alias/filtered_actions" to MockResponse().setResponseCode(404), + "PUT" to "/_aliases/filtered_actions" to MockResponse().setResponseCode(201), + ), + ) + + // When: The alias setup task is run + aliasSetupTask.run().block() + + // Then: The alias creation endpoint should have been invoked + mockWebServer.requestCount shouldBe 2 + + mockWebServer.takeRequest().apply { + path shouldBe "/_alias/filtered_actions" + method shouldBe "GET" + } + + mockWebServer.takeRequest().apply { + path shouldBe "/_aliases/filtered_actions" + method shouldBe "PUT" + shouldNotThrow { objectMapper.readTree(body.readUtf8()) } + } + } + }) diff --git a/src/test/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/ISMPolicySetupTaskTest.kt b/src/test/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/ISMPolicySetupTaskTest.kt new file mode 100644 index 0000000..70fc12a --- /dev/null +++ b/src/test/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/ISMPolicySetupTaskTest.kt @@ -0,0 +1,84 @@ +package ch.srgssr.pillarbox.monitoring.event.setup + +import ch.srgssr.pillarbox.monitoring.event.repository.OpenSearchConfigurationProperties +import ch.srgssr.pillarbox.monitoring.test.PillarboxMonitoringTestConfiguration +import ch.srgssr.pillarbox.monitoring.test.createDispatcher +import com.fasterxml.jackson.databind.ObjectMapper +import io.kotest.assertions.throwables.shouldNotThrow +import io.kotest.core.spec.style.ShouldSpec +import io.kotest.matchers.shouldBe +import okhttp3.mockwebserver.MockResponse +import okhttp3.mockwebserver.MockWebServer +import org.springframework.boot.test.context.SpringBootTest +import org.springframework.test.context.ActiveProfiles +import org.springframework.test.context.ContextConfiguration + +@SpringBootTest +@ContextConfiguration(classes = [PillarboxMonitoringTestConfiguration::class]) +@ActiveProfiles("test") +class ISMPolicySetupTaskTest( + private val ismPolicySetupTask: ISMPolicySetupTask, + private val openSearchProperties: OpenSearchConfigurationProperties, + private val objectMapper: ObjectMapper, +) : ShouldSpec({ + + var mockWebServer = MockWebServer() + + beforeTest { + mockWebServer = MockWebServer() + mockWebServer.start(openSearchProperties.uri.port) + } + + afterTest { + mockWebServer.shutdown() + } + + should("should skip creation if the ISM policy exists") { + // Given: The ISM policy is already created in opensearch + mockWebServer.dispatcher = + createDispatcher( + mapOf( + "GET" to "/_plugins/_ism/policies/actions_policy" to MockResponse().setResponseCode(200), + ), + ) + + // When: The ISM policy setup task is run + ismPolicySetupTask.run().block() + + // Then: The ISM policy creation endpoint shouldn't have been invoked + mockWebServer.requestCount shouldBe 1 + + mockWebServer.takeRequest().apply { + path shouldBe "/_plugins/_ism/policies/actions_policy" + method shouldBe "GET" + } + } + + should("should not skip creation if the ISM policy doesn't exists") { + // Given: The ISM policy is not created in opensearch + mockWebServer.dispatcher = + createDispatcher( + mapOf( + "GET" to "/_plugins/_ism/policies/actions_policy" to MockResponse().setResponseCode(404), + "PUT" to "/_plugins/_ism/policies/actions_policy" to MockResponse().setResponseCode(201), + ), + ) + + // When: The ISM policy setup task is run + ismPolicySetupTask.run().block() + + // Then: The ISM policy creation endpoint should have been invoked + mockWebServer.requestCount shouldBe 2 + + mockWebServer.takeRequest().apply { + path shouldBe "/_plugins/_ism/policies/actions_policy" + method shouldBe "GET" + } + + mockWebServer.takeRequest().apply { + path shouldBe "/_plugins/_ism/policies/actions_policy" + method shouldBe "PUT" + shouldNotThrow { objectMapper.readTree(body.readUtf8()) } + } + } + }) diff --git a/src/test/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/IndexSetupTaskTest.kt b/src/test/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/IndexSetupTaskTest.kt new file mode 100644 index 0000000..7a9bd92 --- /dev/null +++ b/src/test/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/IndexSetupTaskTest.kt @@ -0,0 +1,84 @@ +package ch.srgssr.pillarbox.monitoring.event.setup + +import ch.srgssr.pillarbox.monitoring.event.repository.OpenSearchConfigurationProperties +import ch.srgssr.pillarbox.monitoring.test.PillarboxMonitoringTestConfiguration +import ch.srgssr.pillarbox.monitoring.test.createDispatcher +import com.fasterxml.jackson.databind.ObjectMapper +import io.kotest.assertions.throwables.shouldNotThrow +import io.kotest.core.spec.style.ShouldSpec +import io.kotest.matchers.shouldBe +import okhttp3.mockwebserver.MockResponse +import okhttp3.mockwebserver.MockWebServer +import org.springframework.boot.test.context.SpringBootTest +import org.springframework.test.context.ActiveProfiles +import org.springframework.test.context.ContextConfiguration + +@SpringBootTest +@ContextConfiguration(classes = [PillarboxMonitoringTestConfiguration::class]) +@ActiveProfiles("test") +class IndexSetupTaskTest( + private val indexSetupTask: IndexSetupTask, + private val openSearchProperties: OpenSearchConfigurationProperties, + private val objectMapper: ObjectMapper, +) : ShouldSpec({ + + var mockWebServer = MockWebServer() + + beforeTest { + mockWebServer = MockWebServer() + mockWebServer.start(openSearchProperties.uri.port) + } + + afterTest { + mockWebServer.shutdown() + } + + should("should skip creation if index exists") { + // Given: The index is already created in opensearch + mockWebServer.dispatcher = + createDispatcher( + mapOf( + "HEAD" to "/actions-000001" to MockResponse().setResponseCode(200), + ), + ) + + // When: The index setup task is run + indexSetupTask.run().block() + + // Then: The index creation endpoint shouldn't have been invoked + mockWebServer.requestCount shouldBe 1 + + mockWebServer.takeRequest().apply { + path shouldBe "/actions-000001" + method shouldBe "HEAD" + } + } + + should("should not skip creation if index doesn't exists") { + // Given: The index is not created in opensearch + mockWebServer.dispatcher = + createDispatcher( + mapOf( + "HEAD" to "/actions-000001" to MockResponse().setResponseCode(404), + "PUT" to "/actions-000001" to MockResponse().setResponseCode(201), + ), + ) + + // When: The index setup task is run + indexSetupTask.run().block() + + // Then: The index creation endpoint should have been invoked + mockWebServer.requestCount shouldBe 2 + + mockWebServer.takeRequest().apply { + path shouldBe "/actions-000001" + method shouldBe "HEAD" + } + + mockWebServer.takeRequest().apply { + path shouldBe "/actions-000001" + method shouldBe "PUT" + shouldNotThrow { objectMapper.readTree(body.readUtf8()) } + } + } + }) diff --git a/src/test/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/OpenSearchSetupServiceTest.kt b/src/test/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/OpenSearchSetupServiceTest.kt new file mode 100644 index 0000000..888ddaa --- /dev/null +++ b/src/test/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/OpenSearchSetupServiceTest.kt @@ -0,0 +1,92 @@ +package ch.srgssr.pillarbox.monitoring.event.setup + +import ch.srgssr.pillarbox.monitoring.event.repository.OpenSearchConfigurationProperties +import ch.srgssr.pillarbox.monitoring.test.PillarboxMonitoringTestConfiguration +import ch.srgssr.pillarbox.monitoring.test.createDispatcher +import io.kotest.assertions.throwables.shouldThrow +import io.kotest.core.spec.style.ShouldSpec +import io.kotest.matchers.shouldBe +import okhttp3.mockwebserver.MockResponse +import okhttp3.mockwebserver.MockWebServer +import org.springframework.boot.test.context.SpringBootTest +import org.springframework.test.context.ActiveProfiles +import org.springframework.test.context.ContextConfiguration + +@SpringBootTest +@ContextConfiguration(classes = [PillarboxMonitoringTestConfiguration::class]) +@ActiveProfiles("test") +class OpenSearchSetupServiceTest( + private val openSearchSetupService: OpenSearchSetupService, + private val openSearchProperties: OpenSearchConfigurationProperties, +) : ShouldSpec({ + var mockWebServer = MockWebServer() + + beforeTest { + mockWebServer = MockWebServer() + mockWebServer.start(openSearchProperties.uri.port) + } + + afterTest { + mockWebServer.shutdown() + } + + should("should fail if opensearch is unavailable") { + // Given: opensearch is unavailable + mockWebServer.dispatcher = + createDispatcher( + mapOf( + "GET" to "/" to MockResponse().setResponseCode(500), + ), + ) + + // When: The index setup task is run + shouldThrow { openSearchSetupService.start().block() } + + // Then: The index creation endpoint shouldn't have been invoked + mockWebServer.requestCount shouldBe 1 + + mockWebServer.takeRequest().apply { + path shouldBe "/" + method shouldBe "GET" + } + } + + should("should execute tasks in order: create ism policy, create index and create alias") { + // Given: opensearch is already running and setup + mockWebServer.dispatcher = + createDispatcher( + mapOf( + "GET" to "/" to MockResponse().setResponseCode(200), + "GET" to "/_plugins/_ism/policies/actions_policy" to MockResponse().setResponseCode(200), + "HEAD" to "/actions-000001" to MockResponse().setResponseCode(200), + "GET" to "/_alias/filtered_actions" to MockResponse().setResponseCode(200), + ), + ) + + // When: The opensearch setup service is started + openSearchSetupService.start().block() + + // Then: Tasks should have been executed in order + mockWebServer.requestCount shouldBe 4 + + mockWebServer.takeRequest().apply { + path shouldBe "/" + method shouldBe "GET" + } + + mockWebServer.takeRequest().apply { + path shouldBe "/_plugins/_ism/policies/actions_policy" + method shouldBe "GET" + } + + mockWebServer.takeRequest().apply { + path shouldBe "/actions-000001" + method shouldBe "HEAD" + } + + mockWebServer.takeRequest().apply { + path shouldBe "/_alias/filtered_actions" + method shouldBe "GET" + } + } + }) diff --git a/src/test/kotlin/ch/srgssr/pillarbox/monitoring/test/MockWebServerUtils.kt b/src/test/kotlin/ch/srgssr/pillarbox/monitoring/test/MockWebServerUtils.kt new file mode 100644 index 0000000..0ef3d69 --- /dev/null +++ b/src/test/kotlin/ch/srgssr/pillarbox/monitoring/test/MockWebServerUtils.kt @@ -0,0 +1,14 @@ +package ch.srgssr.pillarbox.monitoring.test + +import okhttp3.mockwebserver.Dispatcher +import okhttp3.mockwebserver.MockResponse +import okhttp3.mockwebserver.RecordedRequest + +fun createDispatcher(responseMap: Map, MockResponse>): Dispatcher { + return object : Dispatcher() { + override fun dispatch(request: RecordedRequest): MockResponse { + val key = request.method to request.path + return responseMap[key] ?: MockResponse().setResponseCode(404) + } + } +} diff --git a/src/test/kotlin/ch/srgssr/pillarbox/monitoring/test/PillarboxMonitoringTestConfiguration.kt b/src/test/kotlin/ch/srgssr/pillarbox/monitoring/test/PillarboxMonitoringTestConfiguration.kt new file mode 100644 index 0000000..5430514 --- /dev/null +++ b/src/test/kotlin/ch/srgssr/pillarbox/monitoring/test/PillarboxMonitoringTestConfiguration.kt @@ -0,0 +1,35 @@ +package ch.srgssr.pillarbox.monitoring.test + +import org.springframework.boot.test.context.TestConfiguration +import java.net.ServerSocket + +// Suppress warning: class is required by Spring's @TestConfiguration even though it only +// contains utility functions. Remove suppression when adding beans to this configuration. +@Suppress("UtilityClassWithPublicConstructor") +@TestConfiguration +class PillarboxMonitoringTestConfiguration { + companion object { + private var openSearchPort: Int = 0 + + init { + openSearchPort = getAvailablePort() + System.setProperty("pillarbox.monitoring.opensearch.uri", "http://localhost:$openSearchPort") + System.setProperty("pillarbox.monitoring.opensearch.retry.max-attempts", "0") + System.setProperty("pillarbox.monitoring.opensearch.retry.initial-interval", "0") + System.setProperty("pillarbox.monitoring.opensearch.retry.max-interval", "0") + + // Register a shutdown hook to release resources if needed + Runtime.getRuntime().addShutdownHook( + Thread { + System.clearProperty("pillarbox.monitoring.opensearch.uri") + }, + ) + } + + private fun getAvailablePort(): Int { + ServerSocket(0).use { socket -> + return socket.localPort + } + } + } +}