Skip to content

Commit

Permalink
Added more diagnostics (#271, #290)
Browse files Browse the repository at this point in the history
  • Loading branch information
climategadgets committed Oct 9, 2023
1 parent 7e03c30 commit 1bb1757
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,18 @@ protected final void initFluxes(Flux<Signal<Double, Void>> ambientFlux) {
// Let the transmission layer figure out the dupes, they have a better idea about what to do with them
.flatMap(demandDevice::setState)

.doOnError(t -> logger.error("{}: errored out", getAddress(), t))
.doOnComplete(() -> logger.debug("{}: completed", getAddress()))

// VT: NOTE: Careful when testing, this will consume everything thrown at it immediately
.subscribe();

ambientFlux
.doOnNext(this::recordAmbient)

.doOnError(t -> logger.error("{}: errored out", getAddress(), t))
.doOnComplete(() -> logger.debug("{}: completed", getAddress()))

// VT: NOTE: Careful when testing, this will consume everything thrown at it immediately
.subscribe();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ private synchronized void guard() {

private void generateTimeoutSignal(Instant now) {

logger.warn("{}: timeout of {} is exceeded, inTimeout={}, repeat={}", marker, timeout, inTimeout, repeat);
logger.info("{}: timeout of {} is exceeded, inTimeout={}, repeat={}", marker, timeout, inTimeout, repeat);

timeoutFluxSink.next(new Signal<>(
now,
Expand All @@ -119,10 +119,13 @@ public Flux<Signal<T, P>> compute(Flux<Signal<T, P>> in) {
var actual = in
.doOnNext(s -> touch(s.timestamp))
.doOnNext(ignored -> inTimeout = false)
.doOnError(t -> logger.error("{}: errored out", marker, t))
.doOnComplete(this::close);

return Flux.merge(actual, timeoutFlux)
.doOnNext(s -> logger.trace("{}: compute={}", marker, s));
.doOnNext(s -> logger.trace("{}: compute={}", marker, s))
.doOnError(t -> logger.error("{}: errored out", marker, t))
.doOnComplete(() -> logger.debug("{}: completed", marker));
}

private synchronized void touch(Instant timestamp) {
Expand Down

0 comments on commit 1bb1757

Please sign in to comment.