Skip to content

Commit

Permalink
Merge branch 'gh271-heatpump' into gh47 (#271)
Browse files Browse the repository at this point in the history
  • Loading branch information
climategadgets committed Sep 8, 2023
2 parents 6b5b5b7 + a79e047 commit 5661b57
Show file tree
Hide file tree
Showing 19 changed files with 963 additions and 647 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package net.sf.dz3r.device.actuator;

import net.sf.dz3r.device.esphome.v1.ESPHomeSwitch;
import net.sf.dz3r.model.HvacMode;
import net.sf.dz3r.signal.Signal;
import net.sf.dz3r.signal.hvac.HvacCommand;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
import reactor.core.publisher.Flux;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;

import static org.assertj.core.api.Assertions.assertThatCode;

/**
* Test MQTT switch race conditions when controlled by a heat pump.
*
* VT: NOTE: Careful, this works on real hardware, disconnect before testing.
*/
@EnabledIfEnvironmentVariable(
named = "TEST_DZ_ESPHOME_HEATPUMP",
matches = "safe",
disabledReason = "Only execute this test if a suitable MQTT broker and ESPHome switch device are available"
)class ESPHomeHeatPumpTest {

private final Logger logger = LogManager.getLogger();

private final String MQTT_BROKER = "mqtt-esphome";

private final String ESPHOME_SWITCH_TOPIC_ROOT = "/esphome/0156AC/switch/";

@Test
void cycle() throws IOException {

var switchFan = new ESPHomeSwitch(MQTT_BROKER, ESPHOME_SWITCH_TOPIC_ROOT + "t-relay-2-r0-fan");
var switchRunning = new ESPHomeSwitch(MQTT_BROKER, ESPHOME_SWITCH_TOPIC_ROOT + "t-relay-2-r1-condenser");
var switchMode = new ESPHomeSwitch(MQTT_BROKER, ESPHOME_SWITCH_TOPIC_ROOT + "t-relay-2-r2-mode");
var delay = Duration.ofSeconds(1);

// This should throw a wrench into everything...
Flux
.just(true, true, true)
.delayElements(delay)
.map(switchMode::setState)
.blockLast();

logger.info("set mode to 1");

try (var hp = new HeatPump(
"heatpump",
switchMode, true,
switchRunning, false,
switchFan, false,
Duration.ofSeconds(3))) {

var commands = Flux
.just(
new HvacCommand(HvacMode.COOLING, 0d, 0d),
new HvacCommand(null, 1d, 1d),
new HvacCommand(HvacMode.HEATING, 0d, 0d),
new HvacCommand(null, 1d, 1d),
new HvacCommand(null, 0d, 0d))
.delayElements(delay)
.doOnNext(command -> logger.info("command: {}", command))
.map(c -> new Signal<HvacCommand, Void>(Instant.now(), c));

assertThatCode(() -> {
hp
.compute(commands)
.doOnNext(s -> logger.info("output: {}", s))
.doOnNext(s -> {
if (s.isError()) {
throw new IllegalStateException("Failure output received: " + s);
}
})
.blockLast();

logger.info("done");
})
.doesNotThrowAnyException();

// Should close() here automatically
}
}
}
13 changes: 13 additions & 0 deletions dz3r-bootstrap/src/test/resources/log4j2.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
<Appenders>
<Console name="CONSOLE" target="SYSTEM_OUT">
<PatternLayout pattern="%highlight{%d{HH:mm:ss,SSS} %level %class{1} %t %NDC %message%n}"/>
</Console>
</Appenders>
<Loggers>
<Root level="DEBUG">
<AppenderRef ref="CONSOLE"/>
</Root>
</Loggers>
</Configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,12 @@ public void connect(UnitDirector.Feed feed) {
// Zones and zone controller have no business knowing about HVAC mode; inject it
var modeFlux = feed.hvacDeviceFlux
.doOnNext(s -> {
if (s.getValue().requested.mode == null) {
if (s.getValue().command.mode == null) {
logger.debug("null hvacMode (normal on startup): {}", s);
}
})
.filter(s -> s.getValue().requested.mode != null)
.map(s -> new Signal<HvacMode, Void>(s.timestamp, s.getValue().requested.mode, null, s.status, s.error));
.filter(s -> s.getValue().command.mode != null)
.map(s -> new Signal<HvacMode, Void>(s.timestamp, s.getValue().command.mode, null, s.status, s.error));
zoneRenderer.subscribeMode(modeFlux);

// Zone ("thermostat" in its terminology) status feed is the only one supported
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,10 @@ private Point convert(Signal<HvacDeviceStatus, Void> signal) {

if (status != null) {

b.tag("kind", status.kind.toString());
b.addField("demand", status.requested.demand);
b.addField("fanSpeed", status.requested.fanSpeed);
b.addField("demand", status.command.demand);
b.addField("fanSpeed", status.command.fanSpeed);
b.addField("uptimeMillis", Optional.ofNullable(status.uptime).map(Duration::toMillis).orElse(0L));
Optional.ofNullable(status.requested.mode).ifPresent(m -> b.tag("mode", m.toString()));
Optional.ofNullable(status.command.mode).ifPresent(m -> b.tag("mode", m.toString()));
}

if (signal.error != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
import net.sf.dz3r.signal.hvac.HvacDeviceStatus;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.ThreadContext;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

import java.io.IOException;
import java.time.Clock;
Expand All @@ -25,7 +25,8 @@ public abstract class AbstractHvacDevice implements HvacDevice {

private final String name;

private Flux<Signal<HvacDeviceStatus, Void>> statusFlux;
private final Sinks.Many<Signal<HvacDeviceStatus, Void>> statusSink;
private final Flux<Signal<HvacDeviceStatus, Void>> statusFlux;

/**
* The moment this device turned on, {@code null} if currently off.
Expand All @@ -41,6 +42,9 @@ protected AbstractHvacDevice(String name) {
protected AbstractHvacDevice(Clock clock, String name) {
this.clock = clock;
this.name = name;

statusSink = Sinks.many().multicast().onBackpressureBuffer();
statusFlux = statusSink.asFlux();
}

@Override
Expand All @@ -55,48 +59,13 @@ protected void check(Switch<?> s, String purpose) {
}

@Override
public final synchronized Flux<Signal<HvacDeviceStatus, Void>> getFlux() {

// VT: NOTE: This whole synchronized thing must be eliminated. + bucket list.

ThreadContext.push("getFlux#" + Integer.toHexString(hashCode()));

try {
logger.debug("getFlux(): name={} waiting...", getAddress());

while (statusFlux == null) {
try {
wait();
logger.debug("getFlux(): name={} flux={}", getAddress(), statusFlux);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new IllegalStateException("This shouldn't have happened", ex);
}
}

logger.debug("getFlux(): name={} DONE", getAddress());

// VT: NOTE: Be careful when refactoring this, need correct sharing option here
return statusFlux;

} finally {
ThreadContext.pop();
}
public final Flux<Signal<HvacDeviceStatus, Void>> getFlux() {
return statusFlux;
}

protected final synchronized Flux<Signal<HvacDeviceStatus, Void>> setFlux(Flux<Signal<HvacDeviceStatus, Void>> source) {

// VT: NOTE: This whole synchronized thing must be eliminated. + bucket list.

ThreadContext.push("getFlux#" + Integer.toHexString(hashCode()));
try {
statusFlux = source;
notifyAll();
logger.debug("setFlux(): name={} notified", getAddress());
return source;
} finally {
ThreadContext.pop();
}
protected final void broadcast(Signal<HvacDeviceStatus, Void> signal) {
logger.debug("{}: broadcast: {}", getAddress(), signal);
statusSink.tryEmitNext(signal);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.ThreadContext;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.annotation.NonNull;
Expand Down Expand Up @@ -36,15 +36,15 @@ public abstract class AbstractSwitch<A extends Comparable<A>> implements Switch<
* Minimum delay between two subsequent calls to {@link #setStateSync(boolean)} if the value hasn't changed.
* See <a href="https://github.com/home-climate-control/dz/issues/253">Pipeline overrun with slow actuators</a>.
*/
private final Duration minDelay;
private final Duration pace;

/**
* Clock to use. Doesn't make sense to use a non-default clock other than for testing purposes.
*/
private final Clock clock;

private Sinks.Many<Signal<State, String>> stateSink;
private Flux<Signal<State, String>> stateFlux;
private FluxSink<Signal<State, String>> stateSink;
private Boolean lastKnownState;

/**
Expand All @@ -59,27 +59,30 @@ public abstract class AbstractSwitch<A extends Comparable<A>> implements Switch<
* @param address Switch address.
*/
protected AbstractSwitch(@NonNull A address) {
this(address, null, null, null);
this(address, Schedulers.newSingle("switch:" + address, true), null, null);
}

/**
* Create an instance with a given scheduler.
*
* @param address Switch address.
* @param scheduler Scheduler to use. {@code null} means using {@link Schedulers#newSingle(String, boolean)}.
* @param minDelay Minimum delay between sending identical commands to hardware.
* @param pace Issue identical control commands to this switch at most this often.
* @param clock Clock to use. Pass {@code null} except when testing.
*/
protected AbstractSwitch(@NonNull A address, @Nullable Scheduler scheduler, @Nullable Duration minDelay, @Nullable Clock clock) {
protected AbstractSwitch(@NonNull A address, @Nullable Scheduler scheduler, @Nullable Duration pace, @Nullable Clock clock) {

// VT: NOTE: @NonNull seems to have no effect, what enforces it?
this.address = HCCObjects.requireNonNull(address,"address can't be null");

this.scheduler = scheduler == null ? Schedulers.newSingle("switch:" + address, true) : scheduler;
this.minDelay = minDelay;
this.scheduler = scheduler;
this.pace = pace;
this.clock = clock == null ? Clock.systemUTC() : clock;

logger.info("{}: created AbstractSwitch({}) with minDelay={}", Integer.toHexString(hashCode()), getAddress(), minDelay);
stateSink = Sinks.many().multicast().onBackpressureBuffer();
stateFlux = stateSink.asFlux();

logger.info("{}: created AbstractSwitch({}) with pace={}", Integer.toHexString(hashCode()), getAddress(), pace);
}

@Override
Expand All @@ -89,27 +92,9 @@ public final A getAddress() {

@Override
public final synchronized Flux<Signal<State, String>> getFlux() {

if (stateFlux != null) {
return stateFlux;
}

logger.debug("{}: creating stateFlux:{}", Integer.toHexString(hashCode()), address);

stateFlux = Flux
.create(this::connect)
.doOnSubscribe(s -> logger.debug("stateFlux:{} subscribed", address))
.publishOn(Schedulers.boundedElastic())
.publish()
.autoConnect();

return stateFlux;
}

private void connect(FluxSink<Signal<State, String>> sink) {
this.stateSink = sink;
}

@Override
public final Mono<Boolean> setState(boolean state) {

Expand Down Expand Up @@ -142,7 +127,7 @@ private Boolean limitRate(boolean state) {

try {

if (minDelay == null) {
if (pace == null) {
return null;
}

Expand All @@ -156,8 +141,8 @@ private Boolean limitRate(boolean state) {

var delay = Duration.between(lastSetAt, clock.instant());

if (delay.compareTo(minDelay) < 0) {
logger.debug("{}: skipping setState({}), too close ({} of {})", Integer.toHexString(hashCode()), state, delay, minDelay);
if (delay.compareTo(pace) < 0) {
logger.debug("{}: skipping setState({}), too close ({} of {})", Integer.toHexString(hashCode()), state, delay, pace);
return lastSetState;
}

Expand All @@ -171,17 +156,7 @@ private Boolean limitRate(boolean state) {
}

private void reportState(Signal<State, String> signal) {

if (stateSink == null) {

// Unless something subscribes, this will be flooding the log - enable for troubleshooting
// logger.warn("stateSink:{} is still null, skipping: {}", address, signal); // NOSONAR

getFlux();
return;
}

stateSink.next(signal);
stateSink.tryEmitNext(signal);
}

@Override
Expand All @@ -202,8 +177,12 @@ public Mono<Boolean> getState() {
} finally {
ThreadContext.pop();
}
})
.subscribeOn(scheduler);
});

// VT: NOTE: Having this here breaks stuff, but now that it's gone,
// need a thorough review because likely something else is now broken.
// More: https://github.com/home-climate-control/dz/issues/271
// .subscribeOn(scheduler);
}

protected Scheduler getScheduler() {
Expand Down
Loading

0 comments on commit 5661b57

Please sign in to comment.