Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Support multiple MQTT availability topics (#37418)
* Support multiple MQTT availability topics

* Make availability list and availability_topic exclusive

* Make availability list and availability_topic exclusive

* Add missing abbreviation
  • Loading branch information
emontnemery authored and balloob committed Jul 14, 2020
1 parent 0d58048 commit 5d26f5d
Show file tree
Hide file tree
Showing 4 changed files with 260 additions and 18 deletions.
79 changes: 61 additions & 18 deletions homeassistant/components/mqtt/__init__.py
Expand Up @@ -88,6 +88,8 @@
CONF_TLS_VERSION = "tls_version"

CONF_COMMAND_TOPIC = "command_topic"
CONF_TOPIC = "topic"
CONF_AVAILABILITY = "availability"
CONF_AVAILABILITY_TOPIC = "availability_topic"
CONF_PAYLOAD_AVAILABLE = "payload_available"
CONF_PAYLOAD_NOT_AVAILABLE = "payload_not_available"
Expand Down Expand Up @@ -203,9 +205,9 @@ def embedded_broker_deprecated(value):

SCHEMA_BASE = {vol.Optional(CONF_QOS, default=DEFAULT_QOS): _VALID_QOS_SCHEMA}

MQTT_AVAILABILITY_SCHEMA = vol.Schema(
MQTT_AVAILABILITY_SINGLE_SCHEMA = vol.Schema(
{
vol.Optional(CONF_AVAILABILITY_TOPIC): valid_subscribe_topic,
vol.Exclusive(CONF_AVAILABILITY_TOPIC, "availability"): valid_subscribe_topic,
vol.Optional(
CONF_PAYLOAD_AVAILABLE, default=DEFAULT_PAYLOAD_AVAILABLE
): cv.string,
Expand All @@ -215,6 +217,30 @@ def embedded_broker_deprecated(value):
}
)

MQTT_AVAILABILITY_LIST_SCHEMA = vol.Schema(
{
vol.Exclusive(CONF_AVAILABILITY, "availability"): vol.All(
cv.ensure_list,
[
{
vol.Optional(CONF_TOPIC): valid_subscribe_topic,
vol.Optional(
CONF_PAYLOAD_AVAILABLE, default=DEFAULT_PAYLOAD_AVAILABLE
): cv.string,
vol.Optional(
CONF_PAYLOAD_NOT_AVAILABLE,
default=DEFAULT_PAYLOAD_NOT_AVAILABLE,
): cv.string,
}
],
),
}
)

MQTT_AVAILABILITY_SCHEMA = MQTT_AVAILABILITY_SINGLE_SCHEMA.extend(
MQTT_AVAILABILITY_LIST_SCHEMA.schema
)

MQTT_ENTITY_DEVICE_INFO_SCHEMA = vol.All(
cv.deprecated(CONF_DEPRECATED_VIA_HUB, CONF_VIA_DEVICE),
vol.Schema(
Expand Down Expand Up @@ -989,8 +1015,7 @@ def __init__(self, config: dict) -> None:
"""Initialize the availability mixin."""
self._availability_sub_state = None
self._available = False

self._avail_config = config
self._availability_setup_from_config(config)

async def async_added_to_hass(self) -> None:
"""Subscribe MQTT events."""
Expand All @@ -1004,33 +1029,52 @@ async def async_added_to_hass(self) -> None:

async def availability_discovery_update(self, config: dict):
"""Handle updated discovery message."""
self._avail_config = config
self._availability_setup_from_config(config)
await self._availability_subscribe_topics()

def _availability_setup_from_config(self, config):
"""(Re)Setup."""
self._avail_topics = {}
if CONF_AVAILABILITY_TOPIC in config:
self._avail_topics[config[CONF_AVAILABILITY_TOPIC]] = {
CONF_PAYLOAD_AVAILABLE: config[CONF_PAYLOAD_AVAILABLE],
CONF_PAYLOAD_NOT_AVAILABLE: config[CONF_PAYLOAD_NOT_AVAILABLE],
}

if CONF_AVAILABILITY in config:
for avail in config[CONF_AVAILABILITY]:
self._avail_topics[avail[CONF_TOPIC]] = {
CONF_PAYLOAD_AVAILABLE: avail[CONF_PAYLOAD_AVAILABLE],
CONF_PAYLOAD_NOT_AVAILABLE: avail[CONF_PAYLOAD_NOT_AVAILABLE],
}

self._avail_config = config

async def _availability_subscribe_topics(self):
"""(Re)Subscribe to topics."""

@callback
@log_messages(self.hass, self.entity_id)
def availability_message_received(msg: Message) -> None:
"""Handle a new received MQTT availability message."""
if msg.payload == self._avail_config[CONF_PAYLOAD_AVAILABLE]:
topic = msg.topic
if msg.payload == self._avail_topics[topic][CONF_PAYLOAD_AVAILABLE]:
self._available = True
elif msg.payload == self._avail_config[CONF_PAYLOAD_NOT_AVAILABLE]:
elif msg.payload == self._avail_topics[topic][CONF_PAYLOAD_NOT_AVAILABLE]:
self._available = False

self.async_write_ha_state()

topics = {}
for topic in self._avail_topics:
topics[f"availability_{topic}"] = {
"topic": topic,
"msg_callback": availability_message_received,
"qos": self._avail_config[CONF_QOS],
}

self._availability_sub_state = await async_subscribe_topics(
self.hass,
self._availability_sub_state,
{
"availability_topic": {
"topic": self._avail_config.get(CONF_AVAILABILITY_TOPIC),
"msg_callback": availability_message_received,
"qos": self._avail_config[CONF_QOS],
}
},
self.hass, self._availability_sub_state, topics,
)

@callback
Expand All @@ -1048,10 +1092,9 @@ async def async_will_remove_from_hass(self):
@property
def available(self) -> bool:
"""Return if the device is available."""
availability_topic = self._avail_config.get(CONF_AVAILABILITY_TOPIC)
if not self.hass.data[DATA_MQTT].connected:
return False
return availability_topic is None or self._available
return not self._avail_topics or self._available


async def cleanup_device_registry(hass, device_id):
Expand Down
1 change: 1 addition & 0 deletions homeassistant/components/mqtt/abbreviations.py
Expand Up @@ -7,6 +7,7 @@
"aux_cmd_t": "aux_command_topic",
"aux_stat_tpl": "aux_state_template",
"aux_stat_t": "aux_state_topic",
"avty": "availability",
"avty_t": "availability_topic",
"away_mode_cmd_t": "away_mode_command_topic",
"away_mode_stat_tpl": "away_mode_state_template",
Expand Down
174 changes: 174 additions & 0 deletions tests/components/mqtt/test_common.py
Expand Up @@ -104,6 +104,98 @@ async def help_test_default_availability_payload(
assert state.state != STATE_UNAVAILABLE


async def help_test_default_availability_list_payload(
hass,
mqtt_mock,
domain,
config,
no_assumed_state=False,
state_topic=None,
state_message=None,
):
"""Test availability by default payload with defined topic.
This is a test helper for the MqttAvailability mixin.
"""
# Add availability settings to config
config = copy.deepcopy(config)
config[domain]["availability"] = [
{"topic": "availability-topic1"},
{"topic": "availability-topic2"},
]
assert await async_setup_component(hass, domain, config,)
await hass.async_block_till_done()

state = hass.states.get(f"{domain}.test")
assert state.state == STATE_UNAVAILABLE

async_fire_mqtt_message(hass, "availability-topic1", "online")

state = hass.states.get(f"{domain}.test")
assert state.state != STATE_UNAVAILABLE
if no_assumed_state:
assert not state.attributes.get(ATTR_ASSUMED_STATE)

async_fire_mqtt_message(hass, "availability-topic1", "offline")

state = hass.states.get(f"{domain}.test")
assert state.state == STATE_UNAVAILABLE

async_fire_mqtt_message(hass, "availability-topic2", "online")

state = hass.states.get(f"{domain}.test")
assert state.state != STATE_UNAVAILABLE
if no_assumed_state:
assert not state.attributes.get(ATTR_ASSUMED_STATE)

async_fire_mqtt_message(hass, "availability-topic2", "offline")

state = hass.states.get(f"{domain}.test")
assert state.state == STATE_UNAVAILABLE

if state_topic:
async_fire_mqtt_message(hass, state_topic, state_message)

state = hass.states.get(f"{domain}.test")
assert state.state == STATE_UNAVAILABLE

async_fire_mqtt_message(hass, "availability-topic1", "online")

state = hass.states.get(f"{domain}.test")
assert state.state != STATE_UNAVAILABLE


async def help_test_default_availability_list_single(
hass,
mqtt_mock,
caplog,
domain,
config,
no_assumed_state=False,
state_topic=None,
state_message=None,
):
"""Test availability list and availability_topic are mutually exclusive.
This is a test helper for the MqttAvailability mixin.
"""
# Add availability settings to config
config = copy.deepcopy(config)
config[domain]["availability"] = [
{"topic": "availability-topic1"},
]
config[domain]["availability_topic"] = "availability-topic"
assert await async_setup_component(hass, domain, config,)
await hass.async_block_till_done()

state = hass.states.get(f"{domain}.test")
assert state is None
assert (
"Invalid config for [sensor.mqtt]: two or more values in the same group of exclusion 'availability'"
in caplog.text
)


async def help_test_custom_availability_payload(
hass,
mqtt_mock,
Expand Down Expand Up @@ -152,6 +244,88 @@ async def help_test_custom_availability_payload(
assert state.state != STATE_UNAVAILABLE


async def help_test_discovery_update_availability(
hass,
mqtt_mock,
domain,
config,
no_assumed_state=False,
state_topic=None,
state_message=None,
):
"""Test update of discovered MQTTAvailability.
This is a test helper for the MQTTAvailability mixin.
"""
# Add availability settings to config
config1 = copy.deepcopy(config)
config1[domain]["availability_topic"] = "availability-topic1"
config2 = copy.deepcopy(config)
config2[domain]["availability"] = [
{"topic": "availability-topic2"},
{"topic": "availability-topic3"},
]
config3 = copy.deepcopy(config)
config3[domain]["availability_topic"] = "availability-topic4"
data1 = json.dumps(config1[domain])
data2 = json.dumps(config2[domain])
data3 = json.dumps(config3[domain])

entry = hass.config_entries.async_entries(mqtt.DOMAIN)[0]
await async_start(hass, "homeassistant", entry)
async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", data1)
await hass.async_block_till_done()

state = hass.states.get(f"{domain}.test")
assert state.state == STATE_UNAVAILABLE

async_fire_mqtt_message(hass, "availability-topic1", "online")
state = hass.states.get(f"{domain}.test")
assert state.state != STATE_UNAVAILABLE

async_fire_mqtt_message(hass, "availability-topic1", "offline")
state = hass.states.get(f"{domain}.test")
assert state.state == STATE_UNAVAILABLE

# Change availability_topic
async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", data2)
await hass.async_block_till_done()

# Verify we are no longer subscribing to the old topic
async_fire_mqtt_message(hass, "availability-topic1", "online")
state = hass.states.get(f"{domain}.test")
assert state.state == STATE_UNAVAILABLE

# Verify we are subscribing to the new topic
async_fire_mqtt_message(hass, "availability-topic2", "online")
state = hass.states.get(f"{domain}.test")
assert state.state != STATE_UNAVAILABLE

# Verify we are subscribing to the new topic
async_fire_mqtt_message(hass, "availability-topic3", "offline")
state = hass.states.get(f"{domain}.test")
assert state.state == STATE_UNAVAILABLE

# Change availability_topic
async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", data3)
await hass.async_block_till_done()

# Verify we are no longer subscribing to the old topic
async_fire_mqtt_message(hass, "availability-topic2", "online")
state = hass.states.get(f"{domain}.test")
assert state.state == STATE_UNAVAILABLE

# Verify we are no longer subscribing to the old topic
async_fire_mqtt_message(hass, "availability-topic3", "online")
state = hass.states.get(f"{domain}.test")
assert state.state == STATE_UNAVAILABLE

# Verify we are subscribing to the new topic
async_fire_mqtt_message(hass, "availability-topic4", "online")
state = hass.states.get(f"{domain}.test")
assert state.state != STATE_UNAVAILABLE


async def help_test_setting_attribute_via_mqtt_json_message(
hass, mqtt_mock, domain, config
):
Expand Down
24 changes: 24 additions & 0 deletions tests/components/mqtt/test_sensor.py
Expand Up @@ -16,11 +16,14 @@
help_test_availability_when_connection_lost,
help_test_availability_without_topic,
help_test_custom_availability_payload,
help_test_default_availability_list_payload,
help_test_default_availability_list_single,
help_test_default_availability_payload,
help_test_discovery_broken,
help_test_discovery_removal,
help_test_discovery_update,
help_test_discovery_update_attr,
help_test_discovery_update_availability,
help_test_entity_debug_info,
help_test_entity_debug_info_max_messages,
help_test_entity_debug_info_message,
Expand Down Expand Up @@ -250,13 +253,34 @@ async def test_default_availability_payload(hass, mqtt_mock):
)


async def test_default_availability_list_payload(hass, mqtt_mock):
"""Test availability by default payload with defined topic."""
await help_test_default_availability_list_payload(
hass, mqtt_mock, sensor.DOMAIN, DEFAULT_CONFIG
)


async def test_default_availability_list_single(hass, mqtt_mock, caplog):
"""Test availability list and availability_topic are mutually exclusive."""
await help_test_default_availability_list_single(
hass, mqtt_mock, caplog, sensor.DOMAIN, DEFAULT_CONFIG
)


async def test_custom_availability_payload(hass, mqtt_mock):
"""Test availability by custom payload with defined topic."""
await help_test_custom_availability_payload(
hass, mqtt_mock, sensor.DOMAIN, DEFAULT_CONFIG
)


async def test_discovery_update_availability(hass, mqtt_mock):
"""Test availability discovery update."""
await help_test_discovery_update_availability(
hass, mqtt_mock, sensor.DOMAIN, DEFAULT_CONFIG
)


async def test_invalid_device_class(hass, mqtt_mock):
"""Test device_class option with invalid value."""
assert await async_setup_component(
Expand Down

0 comments on commit 5d26f5d

Please sign in to comment.