You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
I want that each time the inner function in foreachL crashes with exception, the offset to stay the same. It appears it doesn't work like this, changing the consumer group to something else will throw an error first time I run this code(with just one message in topic topic), as expected, but second time will work because foreachL is not called anymore since the record is already consumed.
What am I doing wrong?
Thank You,
object Main extends App {
private val consumerCfg = KafkaConsumerConfig.default.copy(
bootstrapServers = List("localhost:9092"),
groupId = "foo-7",
enableAutoCommit = false,
autoOffsetReset = AutoOffsetReset.Earliest,
observableCommitOrder = ObservableCommitOrder.AfterAck,
observableCommitType = ObservableCommitType.Sync
)
val f = KafkaConsumerObservable[String, String](consumerCfg, List("topic"))
.timeoutOnSlowUpstream(5.seconds)
.foreachL { _ ⇒ throw new Exception("crash") }
.runAsync(Scheduler.io())
Await.result(f, Duration.Inf)
}
The text was updated successfully, but these errors were encountered:
Hi,
I want that each time the inner function in
foreachL
crashes with exception, the offset to stay the same. It appears it doesn't work like this, changing the consumer group to something else will throw an error first time I run this code(with just one message in topictopic
), as expected, but second time will work becauseforeachL
is not called anymore since the record is alreadyconsumed
.What am I doing wrong?
Thank You,
The text was updated successfully, but these errors were encountered: