Skip to content

Commit

Permalink
feat: add alias creation and refactor setup process
Browse files Browse the repository at this point in the history
- Added alias creation as a dedicated setup task, allowing filtering in Grafana.
- Refactored setup process into single-responsibility tasks: policy, index and alias creation.
- Added MockWebServer as a test dependency to test opensearch setup.
- Renamed `SseClient` to `EventDispatcherClient` to match the service naming.
- Added jacoco coverage report for pull requests.
  • Loading branch information
jboix committed Nov 8, 2024
1 parent 4bb5955 commit d655bc3
Show file tree
Hide file tree
Showing 20 changed files with 902 additions and 178 deletions.
7 changes: 7 additions & 0 deletions .github/workflows/quality.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ jobs:
- name: Run Build
run: ./gradlew build

- name: Add coverage to PR
id: jacoco
uses: madrapps/jacoco-report@v1.7.1
with:
paths: ${{ github.workspace }}/**/build/reports/jacoco/test/jacocoTestReport.xml.xml
token: ${{ secrets.GITHUB_TOKEN }}

- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3

Expand Down
17 changes: 17 additions & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ plugins {
id("io.spring.dependency-management") version "1.1.6"
id("io.gitlab.arturbosch.detekt") version "1.23.7"
id("org.jlleitschuh.gradle.ktlint") version "12.1.1"
id("jacoco")
}

group = "ch.srgssr.pillarbox"
Expand Down Expand Up @@ -37,6 +38,9 @@ dependencies {
testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("io.kotest.extensions:kotest-extensions-spring:1.3.0")
testImplementation("org.jetbrains.kotlin:kotlin-test-junit5")
testImplementation("io.mockk:mockk:1.13.13")
testImplementation("com.squareup.okhttp3:mockwebserver")
testImplementation("com.squareup.okhttp3:okhttp")
testRuntimeOnly("org.junit.platform:junit-platform-launcher")
}

Expand Down Expand Up @@ -65,6 +69,18 @@ ktlint {
}
}

jacoco {
toolVersion = "0.8.12"
}

tasks.jacocoTestReport {
dependsOn(tasks.test)
reports {
xml.required.set(true)
html.required.set(false)
}
}

tasks.jar.configure {
enabled = false
}
Expand All @@ -80,6 +96,7 @@ tasks.bootJar.configure {

tasks.withType<Test> {
useJUnitPlatform()
finalizedBy("jacocoTestReport")
}

val updateVersion by tasks.registering {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package ch.srgssr.pillarbox.monitoring

import ch.srgssr.pillarbox.monitoring.event.EventDispatcherClient
import ch.srgssr.pillarbox.monitoring.event.setup.OpenSearchSetupService
import ch.srgssr.pillarbox.monitoring.exception.RetryExhaustedException
import ch.srgssr.pillarbox.monitoring.log.logger
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import org.springframework.boot.ApplicationArguments
import org.springframework.boot.ApplicationRunner
import org.springframework.context.annotation.Profile
import org.springframework.stereotype.Component

/**
* DataTransferApplicationRunner is responsible for initializing the OpenSearch setup and
* starting the Event dispatcher client upon application start-up.
*
* If either the OpenSearch setup or the SSE connection fails irrecoverably, the application
* is terminated via [TerminationService].
*
* @property openSearchSetupService Service responsible for initializing and validating the OpenSearch setup.
* @property eventDispatcherClient The client responsible for establishing a connection to the event dispatcher.
* @property terminationService Responsible for gracefully terminating the application if critical failures occur
* during either OpenSearch setup or SSE connection.
*/
@Component
@Profile("!test")
class DataTransferApplicationRunner(
private val openSearchSetupService: OpenSearchSetupService,
private val eventDispatcherClient: EventDispatcherClient,
private val terminationService: TerminationService,
) : ApplicationRunner {
private companion object {
private val logger = logger()
}

/**
* Executes the OpenSearch setup task when the application starts.
*
* Upon successful setup, it initiates the [EventDispatcherClient]. If the setup fails,
* an error is logged and the application is terminated.
*
* @param args Application arguments.
*/
override fun run(args: ApplicationArguments?) {
openSearchSetupService.start().subscribe(
{ this.startSseClient() },
{
logger.error("Failed to connect to OpenSearch:", it)
CoroutineScope(Dispatchers.IO).launch {
terminationService.terminateApplication()
}
},
)
}

private fun startSseClient() {
eventDispatcherClient.start().subscribe(
{ },
{
if (it is RetryExhaustedException) {
logger.error("Failed to connect to SSE after retries, terminating application.", it)
CoroutineScope(Dispatchers.IO).launch {
terminationService.terminateApplication()
}
}
},
)
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package ch.srgssr.pillarbox.monitoring

import ch.srgssr.pillarbox.monitoring.event.SetupService
import jakarta.annotation.PostConstruct
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.autoconfigure.data.elasticsearch.ElasticsearchDataAutoConfiguration
import org.springframework.boot.context.properties.ConfigurationPropertiesScan
Expand All @@ -12,14 +10,7 @@ import org.springframework.boot.runApplication
*/
@SpringBootApplication(exclude = [ElasticsearchDataAutoConfiguration::class])
@ConfigurationPropertiesScan
class PillarboxDataTransferApplication(
private val setupService: SetupService,
) {
@PostConstruct
fun init() {
setupService.start()
}
}
class PillarboxDataTransferApplication

/**
* The main function that starts the Spring Boot application.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package ch.srgssr.pillarbox.monitoring.event

import ch.srgssr.pillarbox.monitoring.TerminationService
import ch.srgssr.pillarbox.monitoring.concurrent.LockManager
import ch.srgssr.pillarbox.monitoring.event.config.SseClientConfigurationProperties
import ch.srgssr.pillarbox.monitoring.event.model.EventRequest
Expand All @@ -15,18 +14,17 @@ import org.springframework.web.reactive.function.client.WebClient
import org.springframework.web.reactive.function.client.bodyToFlux

/**
* Service responsible for managing a Server-Sent Events (SSE) client. The client connects to an SSE endpoint,
* handles incoming events, and manages retry behavior in case of connection failures.
* Service responsible for managing a Server-Sent Events (SSE) connection to the event dispatcher service.
* It handles incoming events, and manages retry behavior in case of connection failures.
*
* @property eventService The service used to handle incoming events.
* @property properties The SSE client configuration containing the URI and retry settings.
* @property terminationService The service responsible for terminating the application in case of critical failures.
* @property lockManager The session based lock manager.
*/
@Service
class SseClient(
class EventDispatcherClient(
private val eventService: EventService,
private val properties: SseClientConfigurationProperties,
private val terminationService: TerminationService,
private val lockManager: LockManager,
) {
private companion object {
Expand All @@ -40,7 +38,7 @@ class SseClient(
* Starts the SSE client, connecting to the configured SSE endpoint. It handles incoming events by
* delegating to the appropriate event handling methods and manages retries in case of connection failures.
*/
fun start() {
fun start() =
WebClient
.create(properties.uri)
.get()
Expand All @@ -57,20 +55,12 @@ class SseClient(
retrySignal.failure(),
)
},
).subscribe(
{ CoroutineScope(Dispatchers.IO).launch { handleEvent(it) } },
{ CoroutineScope(Dispatchers.IO).launch { terminateApplication(it) } },
)
}

private fun terminateApplication(error: Throwable) {
if (error is RetryExhaustedException) {
logger.error("Failed to connect after retries, exiting application.", error)
terminationService.terminateApplication()
} else {
logger.error("An error occurred while processing the event.", error)
}
}
).doOnNext { CoroutineScope(Dispatchers.IO).launch { handleEvent(it) } }
.doOnError { error ->
if (error !is RetryExhaustedException) {
logger.error("An error occurred while processing the event.", error)
}
}

private suspend fun handleEvent(eventRequest: EventRequest) {
lockManager[eventRequest.sessionId].withLock {
Expand Down
147 changes: 0 additions & 147 deletions src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/SetupService.kt

This file was deleted.

Loading

0 comments on commit d655bc3

Please sign in to comment.