Bibliotek for enkelt å kunne lage mikrotjenester som bruker konseptet rapids and rivers til @fredgeorge. For mer info kan man se denne videoen https://vimeo.com/79866979
- Alle publiserer på rapid. Kan lese fra flere topics, men publiserer kun på rapid-topic
- Rivers filtrerer meldinger etter hvilke kriterier de har
isalive
er true så snart rapids connection er startetisready
er true så snartonStartup
-lytterne er ferdige. KafkaRapid vil ikke begynne å polle meldinger før etter onStartup-lytterne er ferdige, og vil dermed ikke bli assignet partisjoner av brokerne.- Rivers vil kun få packets i
onPacket
nårMessageProblems
er fri for feilmeldinger (errors og severe) - Rivers kan bruke
require*()
-funksjoner for å akkumulere errors i etMessageProblems
-objekt som sendes tilonError
- Rivers kan bruke
demand*()
-funksjoner for å stoppe parsing ved feil. Exception sendes tilonSevere
Man kan bruke en kombinasjon av demand*()
og require*()
. For eksempel om alle meldingene har et @event_name
, så kan man bruke
demandValue("@event_name", "my_event")
for å avbryte parsing når event-navnet ikke er som forventet. Dersom man har alle andre former
for validering med require*()
, så kan man f.eks. logge innholdet i pakken i onError
i lag med en feilmelding som sier noe sånn som klarte ikke å parse my_event
.
Dersom man ikke benytter seg av demand*()
så er det umulig å vite i onError()
hvorvidt @event_name
var forventet verdi eller ikke, og logging vil dermed ende opp med å spamme
med alle meldinger på rapiden som riveren ikke forstår.
- Kjør migreringer i
onStartup
- Bruk rollout strategy
Recreate
. Ellers vil du ha én pod som leser meldinger og skriver til db, mens den andre holder på med migreringer
- Samme kjøreregler som over, bare at du vil få nedetid på api-et
- Rest-api-delen av appen bør skilles ut som egen app som har readonly-connection mot databasen. Dersom migreringene er bakover-kompatible så kan man unngå nedetid, og man kan migrere en "live" database
- Tut og kjør. Rollout strategy
RollingUpdate
vil fungere helt utmerket
fun main() {
val env = System.getenv()
val dataSourceBuilder = DataSourceBuilder(env)
val dataSource = dataSourceBuilder.getDataSource()
RapidApplication.create(env).apply {
MyCoolApp(this, MyDao(dataSource))
}.apply {
register(object : RapidsConnection.StatusListener {
override fun onStartup(rapidsConnection: RapidsConnection) {
// migrate database before consuming messages, but after rapids have started (and isalive returns OK)
dataSourceBuilder.migrate()
}
})
}.start()
}
internal class MyCoolApp(
rapidsConnection: RapidsConnection,
private val myDao: MyDao
) : River.PacketListener {
init {
River(rapidsConnection).apply {
precondition { it.requireValue("@event_name", "my_event") }
validate { it.requireKey("a_required_key") }
// nested objects can be chained using "."
validate { it.requireValue("nested.key", "works_as_well") }
}.register(this)
}
override fun onError(problems: MessageProblems, context: MessageContext, metadata: MessageMetadata) {
/* fordi vi bruker precondition() på event_name kan vi trygt anta at meldingen
er "my_event", og at det er minst én av de ulike validate() som har feilet */
}
override fun onPacket(packet: JsonMessage, context: MessageContext, metadata: MessageMetadata, meterRegistry: MeterRegistry) {
println(packet["a_required_key"].asText())
// nested objects can be chained using "."
println(packet["nested.key"].asText())
}
}
- Servicebruker mountes inn på
/var/run/secrets/nais.io/service_user
- Bootstrap servers angis ved miljøvariabel
KAFKA_BOOTSTRAP_SERVERS
- Consumer group angis med miljøvariabel
KAFKA_CONSUMER_GROUP_ID
- Rapid topic angis med miljøvariabel
KAFKA_RAPID_TOPIC
- Rivers angis med miljøvariabel
KAFKA_EXTRA_TOPIC
(Kommaseparert liste hvis flere rivers.) - For å bruke SSL-autentisering (Aiven) må man angi miljøvariablene
KAFKA_KEYSTORE_PATH
ogKAFKA_KEYSTORE_PASSWORD
Rapids-biblioteket bundler egen logback.xml
så det trengs ikke spesifiseres i mikrotjenestene.
Den bundlede logback.xml
har konfigurasjon for secureLogs (men husk å enable secureLogs i nais.yaml!), tilgjengelig med:
LoggerFactory.getLogger("tjenestekall")
Alle commits på main
gren vil lage en Github release og bygge en ny artifakt mot Jitpack.
Versjonen vil har formatet:
YYYYmmDDMMss.<git sha>
For å "skippe" en release kan en legge til melding [ci skip]
på git commit melding.
Spørsmål knyttet til koden eller prosjektet kan stilles som issues her på GitHub.
Interne henvendelser kan sendes via Slack i kanalen #rapids-and-rivers.