diff --git a/scripts/generate-modbus-register-doc.py b/scripts/generate-modbus-register-doc.py
index 416de6175..76478a4ea 100644
--- a/scripts/generate-modbus-register-doc.py
+++ b/scripts/generate-modbus-register-doc.py
@@ -1,122 +1,145 @@
import fileinput
import csv
+import sys
from itertools import groupby
+from collections import defaultdict
#
# This is used to build the contents of the `Modbus-Entity-Registers.md` file used in the emsesp.org documentation.
#
-# static data
-tag_to_tagtype = {
- -1: "TAG_TYPE_NONE",
- 0: "DEVICE_DATA",
- 1: "HC",
- 2: "HC",
- 3: "HC",
- 4: "HC",
- 5: "HC",
- 6: "HC",
- 7: "HC",
- 8: "HC",
- 9: "DHW",
- 10: "DHW",
- 11: "DHW",
- 12: "DHW",
- 13: "DHW",
- 14: "DHW",
- 15: "DHW",
- 16: "DHW",
- 17: "DHW",
- 18: "DHW",
- 19: "AHS",
- 20: "HS",
- 21: "HS",
- 22: "HS",
- 23: "HS",
- 24: "HS",
- 25: "HS",
- 26: "HS",
- 27: "HS",
- 28: "HS",
- 29: "HS",
- 30: "HS",
- 31: "HS",
- 32: "HS",
- 33: "HS",
- 34: "HS",
- 35: "HS"
-}
+def get_tag_type(modbus_block):
+ """Convert modbus block number to tag type using lookup."""
+ block = int(modbus_block)
-# read entities csv from stdin
+ # Handle special cases first
+ if block == -1:
+ return "TAG_TYPE_NONE"
+ if block == 0:
+ return "DEVICE_DATA"
+ if block == 19:
+ return "AHS"
-entities = []
+ # Use ranges for efficiency
+ if 1 <= block <= 8:
+ return "HC"
+ if 9 <= block <= 18:
+ return "DHW"
+ if 20 <= block <= 35:
+ return "HS"
+ if 36 <= block <= 50 or block == 52:
+ return "SRC"
-with fileinput.input() as f_input:
- entities_reader = csv.reader(f_input, delimiter=',', quotechar='"')
- headers = next(entities_reader)
-
- for row in entities_reader:
- entity = {}
- for i, val in enumerate(row):
- entity[headers[i]] = val
- entities.append(entity)
+ # Default fallback
+ return "UNKNOWN"
-def device_name_key(e): return e["device name"]
+def read_entities():
+ """Read and parse CSV entities from stdin with error handling."""
+ entities = []
+
+ try:
+ with fileinput.input() as f_input:
+ entities_reader = csv.reader(f_input, delimiter=',', quotechar='"')
+ headers = next(entities_reader)
+
+ # Validate required headers
+ required_headers = {'device name', 'device type', 'shortname', 'fullname',
+ 'type [options...] \\| (min/max)', 'uom', 'writeable',
+ 'modbus block', 'modbus offset', 'modbus count', 'modbus scale factor'}
+ missing_headers = required_headers - set(headers)
+ if missing_headers:
+ raise ValueError(
+ f"Missing required headers: {missing_headers}")
+
+ for row_num, row in enumerate(entities_reader, start=2):
+ if len(row) != len(headers):
+ print(
+ f"Warning: Row {row_num} has {len(row)} columns, expected {len(headers)}", file=sys.stderr)
+ continue
+
+ entity = dict(zip(headers, row))
+ entities.append(entity)
+
+ except Exception as e:
+ print(f"Error reading CSV data: {e}", file=sys.stderr)
+ sys.exit(1)
+
+ return entities
-def device_type_key(e): return e["device type"]
+def group_entities_by_device_type(entities):
+ """Group entities by device type efficiently using defaultdict."""
+ grouped = defaultdict(list)
+ for entity in entities:
+ grouped[entity["device type"]].append(entity)
+ return grouped
-def grouped_by(list, key): return groupby(sorted(list, key=key), key)
-
-
-# entities_by_device_type = grouped_by(entities, device_type_key)
-
-
-def printDeviceEntities(device_name, device_entities):
- print("### " + device_name)
+def print_device_entities(device_name, device_entities):
+ """Print device entities table using f-strings for better performance."""
+ print(f"### {device_name}")
print("| shortname | fullname | type | uom | writeable | tag type | register offset | register count | scale factor |")
print("|-|-|-|-|-|-|-|-|-|")
- for de in device_entities:
- print("| " + de["shortname"] + " | " + de["fullname"] + " | " + de["type [options...] \\| (min/max)"] + " | " + de["uom"] + " | " + de["writeable"] +
- " | " + tag_to_tagtype[int(de["modbus block"])] + " | " + de["modbus offset"] + " | " + de["modbus count"] + " | " + de["modbus scale factor"] + " | ")
+
+ for entity in device_entities:
+ tag_type = get_tag_type(entity["modbus block"])
+ print(f"| {entity['shortname']} | {entity['fullname']} | {entity['type [options...] \\| (min/max)']} | "
+ f"{entity['uom']} | {entity['writeable']} | {tag_type} | {entity['modbus offset']} | "
+ f"{entity['modbus count']} | {entity['modbus scale factor']} |")
print()
-def printDeviceTypeDevices(device_type, devices):
- print("## Devices of type *" + device_type + "*")
- for device_name, device_entities in grouped_by(devices, device_name_key):
- printDeviceEntities(device_name, device_entities)
+def print_device_type_devices(device_type, devices):
+ """Print all devices of a specific type."""
+ print(f"## Devices of type *{device_type}*")
+
+ # Group devices by name
+ device_groups = defaultdict(list)
+ for device in devices:
+ device_groups[device["device name"]].append(device)
+
+ for device_name, device_entities in device_groups.items():
+ print_device_entities(device_name, device_entities)
-# write header
-
-print("")
-print("")
-print()
-print("# Entity/Register Mapping")
-print()
-print("!!! note")
-print()
-print(" This file has been auto-generated. Do not modify.")
-print()
-
-for device_type, devices in grouped_by(entities, device_type_key):
- printDeviceTypeDevices(device_type, devices)
-
-# def printGroupedData(groupedData):
-# for k, v in groupedData:
-# # print("Group {} {}".format(k, list(v)))
-# print(k)
+def print_header():
+ """Print the markdown document header."""
+ print("")
+ print("")
+ print()
+ print("# Entity/Register Mapping")
+ print()
+ print("!!! note")
+ print()
+ print(" This file has been auto-generated. Do not modify.")
+ print()
-# printGroupedData(grouped_entities)
+def main():
+ """Main function to process entities and generate documentation."""
+ # Read entities from stdin
+ entities = read_entities()
-# for e in entities:
-# print(e)
+ if not entities:
+ print("No entities found in input data.", file=sys.stderr)
+ sys.exit(1)
+
+ # Print header
+ print_header()
+
+ # Group entities by device type and process
+ grouped_entities = group_entities_by_device_type(entities)
+
+ # Print documentation for each device type
+ for device_type, devices in grouped_entities.items():
+ print_device_type_devices(device_type, devices)
+
+
+if __name__ == "__main__":
+ main()
diff --git a/scripts/update_modbus_registers.py b/scripts/update_modbus_registers.py
index ded0c7260..e8ee2ceb6 100644
--- a/scripts/update_modbus_registers.py
+++ b/scripts/update_modbus_registers.py
@@ -158,15 +158,15 @@ cpp_entry_template = Template(
# read translations
listNames = {}
transre = re.compile(r'^MAKE_TRANSLATION\(([^,\s]+)\s*,\s*\"([^\"]+)\"')
-transf = open('./src/core/locale_translations.h', 'r')
-while True:
- line = transf.readline()
- if not line:
- break
- m = transre.match(line)
- if m is not None:
- listNames[m.group(2)] = m.group(1)
-transf.close()
+try:
+ with open('./src/core/locale_translations.h', 'r') as transf:
+ for line in transf:
+ m = transre.match(line)
+ if m is not None:
+ listNames[m.group(2)] = m.group(1)
+except FileNotFoundError:
+ # Handle case where file doesn't exist
+ pass
entities = []
@@ -175,10 +175,8 @@ with fileinput.input() as f_input:
headers = next(entities_reader)
for row in entities_reader:
- entity = {}
- for i, val in enumerate(row):
- # print(headers[i] + ": " + val)
- entity[headers[i]] = val
+ # Use dict comprehension for better performance
+ entity = {headers[i]: val for i, val in enumerate(row)}
entities.append(entity)
# print(json.dumps(entities, indent=" "))
@@ -234,30 +232,28 @@ for entity in entities:
for device_type_name, device_type in device_types.items():
for tag, entities in device_type.items():
- total_registers = 0
+ # Pre-calculate all register info to avoid repeated int() conversions
+ register_info = []
next_free_offset = 0
+
for entity_name, modbus_info in entities.items():
register_offset = int(modbus_info['modbus offset'])
register_count = int(modbus_info['modbus count'])
- total_registers += register_count
+ register_info.append(
+ (entity_name, modbus_info, register_offset, register_count))
+
if register_offset >= 0 and register_offset + register_count > next_free_offset:
next_free_offset = register_offset + register_count
- # print(device_type_name + "/" + tag + ": total_registers=" + str(total_registers) + "; next_free_offset=" + str(
- # next_free_offset))
-
- for entity_name, modbus_info in entities.items():
- register_offset = int(modbus_info['modbus offset'])
- register_count = int(modbus_info['modbus count'])
+ # Assign registers for unassigned entities
+ for entity_name, modbus_info, register_offset, register_count in register_info:
if register_offset < 0 and register_count > 0:
- # assign register
- # print("assign " + entity_name + " -> " + str(next_free_offset))
modbus_info['modbus offset'] = str(next_free_offset)
next_free_offset += register_count
# OUTPUT
-cpp_entries = ""
+cpp_entries = []
# traverse all elements in correct order so they are correctly sorted
for device_type_name in device_type_names:
@@ -267,18 +263,22 @@ for device_type_name in device_type_names:
tag = str(ntag)
if tag in device_type:
entities = device_type[tag]
- for entity_name, modbus_info in sorted(entities.items(), key=lambda x: int(x[1]["modbus offset"])):
+ # Sort once and reuse
+ sorted_entities = sorted(
+ entities.items(), key=lambda x: int(x[1]["modbus offset"]))
+ for entity_name, modbus_info in sorted_entities:
params = {
'devtype': "dt::" + device_type_name,
- # re.sub(r"[0-9]+", "*", tag),
"tagtype": tag_to_tagtype[int(tag)],
"shortname": 'FL_(' + listNames[entity_name] + ")",
"entity_name": entity_name,
'registeroffset': modbus_info["modbus offset"],
'registercount': modbus_info["modbus count"]
}
- # print(entitypath + ": " + str(modbus_info))
- cpp_entries += cpp_entry_template.substitute(params)
+ cpp_entries.append(cpp_entry_template.substitute(params))
-cpp_src = cpp_file_template.substitute({'entries': cpp_entries})
+# Join all entries at once
+cpp_entries_str = "".join(cpp_entries)
+
+cpp_src = cpp_file_template.substitute({'entries': cpp_entries_str})
print(cpp_src)