This commit is contained in:
MichaelDvP
2025-10-25 11:35:50 +02:00
7 changed files with 6250 additions and 6231 deletions

View File

@@ -7,9 +7,11 @@ on:
jobs:
github-releases-to-discord:
runs-on: ubuntu-latest
permissions:
contents: read
steps:
- name: Checkout
uses: actions/checkout@v4
uses: actions/checkout@v5
- name: GitHub Releases To Discord
uses: SethCohen/github-releases-to-discord@v1.13.1

View File

@@ -1,4 +1,6 @@
name: 'Pre-check on PR'
permissions:
contents: read
on:
workflow_dispatch:
@@ -32,6 +34,6 @@ jobs:
pip install wheel
pip install -U platformio
- name: Build standalone
- name: Run unit tests
run: |
platformio run -e standalone
platformio run -e native-test -t exec

File diff suppressed because it is too large Load Diff

View File

@@ -291,11 +291,11 @@ build_flags =
; 8. create the dump_telegrams.csv file
;
; To run this in pio use the steps
; pio run -e build_modbus
; pio run -e build_modbus -t clean -t build
; pio run -e build_standalone -t clean -t build
# builds the modbus_entity_parameters.hpp header file
# pio run -e build_modbus
# pio run -e build_modbus -t clean -t build
[env:build_modbus]
extends = env:native
targets = build
@@ -317,5 +317,5 @@ extra_scripts =
build_flags = -DEMSESP_STANDALONE
custom_test_command = entity_dump
custom_output_file = dump_entities.csv
custom_post_script = scripts/build_modbus_generate_doc_post.py
custom_post_script = scripts/build_standalone_post.py

View File

@@ -4,34 +4,39 @@ import sys
import shutil
from pathlib import Path
# This creates the files
# - dump_entities.csv
# - Modbus-Entity-Registers.md
# - dump_telegrams.csv
# Import the streaming function from the separate module
from run_executable import run_with_streaming_input
def get_python_executable():
"""Get the appropriate Python executable for the current platform."""
# Try different Python executable names
python_names = ['python3', 'python', 'py']
for name in python_names:
if shutil.which(name):
return name
# Fallback to sys.executable if available
return sys.executable
def csv_to_md(csv_file_path, output_file_path, script_path):
# Ensure the output directory exists
# Ensure the output directory exists and remove it
Path(output_file_path).parent.mkdir(parents=True, exist_ok=True)
# delete the output file if it exists
if os.path.exists(output_file_path):
os.remove(output_file_path)
# Read CSV file and pipe to Python script to generate header
python_exe = get_python_executable()
with open(csv_file_path, 'r') as csv_file:
with open(output_file_path, 'w') as output_file:
subprocess.run(
@@ -40,24 +45,27 @@ def csv_to_md(csv_file_path, output_file_path, script_path):
stdout=output_file,
check=True
)
print(f"Generated MD file: {output_file_path} ({os.path.getsize(output_file_path)} bytes)")
print(
f"Generated MD file: {output_file_path} ({os.path.getsize(output_file_path)} bytes)")
def main(program_path="./emsesp"):
csv_file = os.path.join("docs", "dump_entities.csv")
output_file = os.path.join("docs", "Modbus-Entity-Registers.md")
script_file = os.path.join("scripts", "generate-modbus-register-doc.py")
# generate the MD file
csv_to_md(csv_file, output_file, script_file)
# final step is to run the telegram_dump test command and generate the dump_telegrams.csv file
# run the test command and generate the dump_telegrams.csv file
test_command = "test telegram_dump"
telegram_output_file = os.path.join("docs", "dump_telegrams.csv")
print(f"Running test command: telegram_dump > {telegram_output_file}")
run_with_streaming_input(program_path, test_command, telegram_output_file)
if __name__ == "__main__":
# Get program path from command line argument or use default
program_path = sys.argv[1] if len(sys.argv) > 1 else "./emsesp"

View File

@@ -1,138 +1,145 @@
import fileinput
import csv
import sys
from itertools import groupby
from collections import defaultdict
#
# This is used to build the contents of the `Modbus-Entity-Registers.md` file used in the emsesp.org documentation.
#
# static data
tag_to_tagtype = {
-1: "TAG_TYPE_NONE",
0: "DEVICE_DATA",
1: "HC",
2: "HC",
3: "HC",
4: "HC",
5: "HC",
6: "HC",
7: "HC",
8: "HC",
9: "DHW",
10: "DHW",
11: "DHW",
12: "DHW",
13: "DHW",
14: "DHW",
15: "DHW",
16: "DHW",
17: "DHW",
18: "DHW",
19: "AHS",
20: "HS",
21: "HS",
22: "HS",
23: "HS",
24: "HS",
25: "HS",
26: "HS",
27: "HS",
28: "HS",
29: "HS",
30: "HS",
31: "HS",
32: "HS",
33: "HS",
34: "HS",
35: "HS",
36: "SRC",
37: "SRC",
38: "SRC",
39: "SRC",
40: "SRC",
41: "SRC",
42: "SRC",
43: "SRC",
44: "SRC",
45: "SRC",
46: "SRC",
47: "SRC",
48: "SRC",
49: "SRC",
50: "SRC",
52: "SRC"
}
def get_tag_type(modbus_block):
"""Convert modbus block number to tag type using lookup."""
block = int(modbus_block)
# read entities csv from stdin
# Handle special cases first
if block == -1:
return "TAG_TYPE_NONE"
if block == 0:
return "DEVICE_DATA"
if block == 19:
return "AHS"
entities = []
# Use ranges for efficiency
if 1 <= block <= 8:
return "HC"
if 9 <= block <= 18:
return "DHW"
if 20 <= block <= 35:
return "HS"
if 36 <= block <= 51:
return "SRC"
with fileinput.input() as f_input:
entities_reader = csv.reader(f_input, delimiter=',', quotechar='"')
headers = next(entities_reader)
for row in entities_reader:
entity = {}
for i, val in enumerate(row):
entity[headers[i]] = val
entities.append(entity)
# Default fallback
return "UNKNOWN"
def device_name_key(e): return e["device name"]
def read_entities():
"""Read and parse CSV entities from stdin with error handling."""
entities = []
try:
with fileinput.input() as f_input:
entities_reader = csv.reader(f_input, delimiter=',', quotechar='"')
headers = next(entities_reader)
# Validate required headers
required_headers = {'device name', 'device type', 'shortname', 'fullname',
'type [options...] \\| (min/max)', 'uom', 'writeable',
'modbus block', 'modbus offset', 'modbus count', 'modbus scale factor'}
missing_headers = required_headers - set(headers)
if missing_headers:
raise ValueError(
f"Missing required headers: {missing_headers}")
for row_num, row in enumerate(entities_reader, start=2):
if len(row) != len(headers):
print(
f"Warning: Row {row_num} has {len(row)} columns, expected {len(headers)}", file=sys.stderr)
continue
entity = dict(zip(headers, row))
entities.append(entity)
except Exception as e:
print(f"Error reading CSV data: {e}", file=sys.stderr)
sys.exit(1)
return entities
def device_type_key(e): return e["device type"]
def group_entities_by_device_type(entities):
"""Group entities by device type efficiently using defaultdict."""
grouped = defaultdict(list)
for entity in entities:
grouped[entity["device type"]].append(entity)
return grouped
def grouped_by(list, key): return groupby(sorted(list, key=key), key)
# entities_by_device_type = grouped_by(entities, device_type_key)
def printDeviceEntities(device_name, device_entities):
print("### " + device_name)
def print_device_entities(device_name, device_entities):
"""Print device entities table using f-strings for better performance."""
print(f"### {device_name}")
print("| shortname | fullname | type | uom | writeable | tag type | register offset | register count | scale factor |")
print("|-|-|-|-|-|-|-|-|-|")
for de in device_entities:
print("| " + de["shortname"] + " | " + de["fullname"] + " | " + de["type [options...] \\| (min/max)"] + " | " + de["uom"] + " | " + de["writeable"] +
" | " + tag_to_tagtype[int(de["modbus block"])] + " | " + de["modbus offset"] + " | " + de["modbus count"] + " | " + de["modbus scale factor"] + " | ")
for entity in device_entities:
tag_type = get_tag_type(entity["modbus block"])
print(f"| {entity['shortname']} | {entity['fullname']} | {entity['type [options...] \\| (min/max)']} | "
f"{entity['uom']} | {entity['writeable']} | {tag_type} | {entity['modbus offset']} | "
f"{entity['modbus count']} | {entity['modbus scale factor']} |")
print()
def printDeviceTypeDevices(device_type, devices):
print("## Devices of type *" + device_type + "*")
for device_name, device_entities in grouped_by(devices, device_name_key):
printDeviceEntities(device_name, device_entities)
def print_device_type_devices(device_type, devices):
"""Print all devices of a specific type."""
print(f"## Devices of type *{device_type}*")
# Group devices by name
device_groups = defaultdict(list)
for device in devices:
device_groups[device["device name"]].append(device)
for device_name, device_entities in device_groups.items():
print_device_entities(device_name, device_entities)
# write header
print("<!-- Use full browser width for this page, the tables are wide -->")
print("<style>")
print(".md-grid {")
print(" max-width: 100%; /* or 100%, if you want to stretch to full-width */")
print("}")
print("</style>")
print()
print("# Entity/Register Mapping")
print()
print("!!! note")
print()
print(" This file has been auto-generated. Do not modify.")
print()
for device_type, devices in grouped_by(entities, device_type_key):
printDeviceTypeDevices(device_type, devices)
# def printGroupedData(groupedData):
# for k, v in groupedData:
# # print("Group {} {}".format(k, list(v)))
# print(k)
def print_header():
"""Print the markdown document header."""
print("<!-- Use full browser width for this page, the tables are wide -->")
print("<style>")
print(".md-grid {")
print(" max-width: 100%; /* or 100%, if you want to stretch to full-width */")
print("}")
print("</style>")
print()
print("# Entity/Register Mapping")
print()
print("!!! note")
print()
print(" This file has been auto-generated. Do not modify.")
print()
# printGroupedData(grouped_entities)
def main():
"""Main function to process entities and generate documentation."""
# Read entities from stdin
entities = read_entities()
# for e in entities:
# print(e)
if not entities:
print("No entities found in input data.", file=sys.stderr)
sys.exit(1)
# Print header
print_header()
# Group entities by device type and process
grouped_entities = group_entities_by_device_type(entities)
# Print documentation for each device type
for device_type, devices in grouped_entities.items():
print_device_type_devices(device_type, devices)
if __name__ == "__main__":
main()

View File

@@ -158,15 +158,15 @@ cpp_entry_template = Template(
# read translations
listNames = {}
transre = re.compile(r'^MAKE_TRANSLATION\(([^,\s]+)\s*,\s*\"([^\"]+)\"')
transf = open('./src/core/locale_translations.h', 'r')
while True:
line = transf.readline()
if not line:
break
m = transre.match(line)
if m is not None:
listNames[m.group(2)] = m.group(1)
transf.close()
try:
with open('./src/core/locale_translations.h', 'r') as transf:
for line in transf:
m = transre.match(line)
if m is not None:
listNames[m.group(2)] = m.group(1)
except FileNotFoundError:
# Handle case where file doesn't exist
pass
entities = []
@@ -175,10 +175,8 @@ with fileinput.input() as f_input:
headers = next(entities_reader)
for row in entities_reader:
entity = {}
for i, val in enumerate(row):
# print(headers[i] + ": " + val)
entity[headers[i]] = val
# Use dict comprehension for better performance
entity = {headers[i]: val for i, val in enumerate(row)}
entities.append(entity)
# print(json.dumps(entities, indent=" "))
@@ -234,30 +232,28 @@ for entity in entities:
for device_type_name, device_type in device_types.items():
for tag, entities in device_type.items():
total_registers = 0
# Pre-calculate all register info to avoid repeated int() conversions
register_info = []
next_free_offset = 0
for entity_name, modbus_info in entities.items():
register_offset = int(modbus_info['modbus offset'])
register_count = int(modbus_info['modbus count'])
total_registers += register_count
register_info.append(
(entity_name, modbus_info, register_offset, register_count))
if register_offset >= 0 and register_offset + register_count > next_free_offset:
next_free_offset = register_offset + register_count
# print(device_type_name + "/" + tag + ": total_registers=" + str(total_registers) + "; next_free_offset=" + str(
# next_free_offset))
for entity_name, modbus_info in entities.items():
register_offset = int(modbus_info['modbus offset'])
register_count = int(modbus_info['modbus count'])
# Assign registers for unassigned entities
for entity_name, modbus_info, register_offset, register_count in register_info:
if register_offset < 0 and register_count > 0:
# assign register
# print("assign " + entity_name + " -> " + str(next_free_offset))
modbus_info['modbus offset'] = str(next_free_offset)
next_free_offset += register_count
# OUTPUT
cpp_entries = ""
cpp_entries = []
# traverse all elements in correct order so they are correctly sorted
for device_type_name in device_type_names:
@@ -267,18 +263,22 @@ for device_type_name in device_type_names:
tag = str(ntag)
if tag in device_type:
entities = device_type[tag]
for entity_name, modbus_info in sorted(entities.items(), key=lambda x: int(x[1]["modbus offset"])):
# Sort once and reuse
sorted_entities = sorted(
entities.items(), key=lambda x: int(x[1]["modbus offset"]))
for entity_name, modbus_info in sorted_entities:
params = {
'devtype': "dt::" + device_type_name,
# re.sub(r"[0-9]+", "*", tag),
"tagtype": tag_to_tagtype[int(tag)],
"shortname": 'FL_(' + listNames[entity_name] + ")",
"entity_name": entity_name,
'registeroffset': modbus_info["modbus offset"],
'registercount': modbus_info["modbus count"]
}
# print(entitypath + ": " + str(modbus_info))
cpp_entries += cpp_entry_template.substitute(params)
cpp_entries.append(cpp_entry_template.substitute(params))
cpp_src = cpp_file_template.substitute({'entries': cpp_entries})
# Join all entries at once
cpp_entries_str = "".join(cpp_entries)
cpp_src = cpp_file_template.substitute({'entries': cpp_entries_str})
print(cpp_src)