diff --git a/docker/postgres/init.sql b/docker/postgres/init.sql index eb2440c..b83c800 100644 --- a/docker/postgres/init.sql +++ b/docker/postgres/init.sql @@ -86,6 +86,8 @@ CREATE TABLE IF NOT EXISTS node_configurations -- Configuration (MQTT) mqtt_encryption_enabled BOOLEAN DEFAULT FALSE, mqtt_json_enabled BOOLEAN DEFAULT FALSE, + mqtt_json_message_timestamp TIMESTAMP DEFAULT NOW(), + mqtt_configured_root_topic TEXT DEFAULT '', mqtt_info_last_timestamp TIMESTAMP DEFAULT NOW(), diff --git a/exporter/metric/node_configuration_metrics.py b/exporter/metric/node_configuration_metrics.py index 001ee03..36fb613 100644 --- a/exporter/metric/node_configuration_metrics.py +++ b/exporter/metric/node_configuration_metrics.py @@ -187,11 +187,28 @@ class NodeConfigurationMetrics(metaclass=Singleton): self.db.execute_db_operation(db_operation) def process_mqtt_update(self, node_id: str, mqtt_encryption_enabled=None, mqtt_json_enabled=None, - mqtt_configured_root_topic=None): + mqtt_configured_root_topic=None, is_json_message=False): if not self.report: return def db_operation(cur, conn): + # Update the last MQTT message timestamp for every message + cur.execute(""" + UPDATE node_configurations + SET mqtt_info_last_timestamp = NOW() + WHERE node_id = %s + """, (node_id,)) + + # If it's a JSON message, update the JSON message timestamp + if is_json_message: + cur.execute(""" + UPDATE node_configurations + SET mqtt_json_message_timestamp = NOW(), + mqtt_json_enabled = TRUE + WHERE node_id = %s + """, (node_id,)) + + # Perform the main update cur.execute(""" INSERT INTO node_configurations ( node_id, @@ -203,9 +220,14 @@ class NodeConfigurationMetrics(metaclass=Singleton): ON CONFLICT(node_id) DO UPDATE SET mqtt_encryption_enabled = COALESCE(EXCLUDED.mqtt_encryption_enabled, node_configurations.mqtt_encryption_enabled), - mqtt_json_enabled = COALESCE(EXCLUDED.mqtt_json_enabled, node_configurations.mqtt_json_enabled), + mqtt_json_enabled = CASE + WHEN (node_configurations.mqtt_info_last_timestamp - node_configurations.mqtt_json_message_timestamp) > INTERVAL '1 hour' + THEN FALSE + ELSE COALESCE(EXCLUDED.mqtt_json_enabled, node_configurations.mqtt_json_enabled) + END, mqtt_configured_root_topic = COALESCE(EXCLUDED.mqtt_configured_root_topic, node_configurations.mqtt_configured_root_topic), mqtt_info_last_timestamp = NOW() + RETURNING mqtt_json_enabled """, (node_id, mqtt_encryption_enabled, mqtt_json_enabled, mqtt_configured_root_topic)) conn.commit() diff --git a/exporter/processor/processor_base.py b/exporter/processor/processor_base.py index 65358e1..7f53d02 100644 --- a/exporter/processor/processor_base.py +++ b/exporter/processor/processor_base.py @@ -153,6 +153,7 @@ class MessageProcessor: gateway_node_id = str(int(json_packet['sender'][1:], 16)) NodeConfigurationMetrics().process_mqtt_update( node_id=gateway_node_id, + mqtt_json_enabled=True, mqtt_encryption_enabled=json_packet.get('encrypted', False), mqtt_configured_root_topic=topic )