Merge branch 'dev'

This commit is contained in:
proddy
2025-12-31 21:26:15 +01:00
parent eaa277fef0
commit 28135c225b
385 changed files with 40221 additions and 38187 deletions

View File

@@ -1,29 +1,121 @@
from pathlib import Path
import os
import subprocess
import shutil
Import("env")
def buildWeb():
os.chdir("interface")
print("Building web interface...")
def get_pnpm_executable():
    """Return the name of the pnpm executable to invoke on this platform.

    Probes the PATH for the usual pnpm entry points (POSIX and Windows
    variants) and falls back to the plain name when none is found.
    """
    # Windows installs expose pnpm.cmd / pnpm.exe; POSIX uses plain 'pnpm'.
    candidates = ('pnpm', 'pnpm.cmd', 'pnpm.exe')
    found = next((name for name in candidates if shutil.which(name)), None)
    # Fall back to 'pnpm' and let the caller surface the error if it is missing.
    return found if found is not None else 'pnpm'
def run_command_in_directory(command, directory):
    """Run *command* (a shell string) with *directory* as the working directory.

    Fix: removed interleaved leftover lines from the previous yarn-based
    buildWeb implementation that had corrupted this function's try block.

    stdout/stderr of the command are captured and echoed to our own stdout.

    Args:
        command (str): Shell command line to execute.
        directory: Working directory for the command (str or Path).

    Returns:
        bool: True when the command exits 0, False on failure or launch error.
    """
    try:
        result = subprocess.run(
            command,
            shell=True,
            cwd=directory,
            check=True,
            capture_output=True,
            text=True
        )
        # Echo captured output so the pio build log still shows it
        if result.stdout:
            print(result.stdout)
        if result.stderr:
            print(result.stderr)
        return True
    except subprocess.CalledProcessError as e:
        print(f"Command failed: {command}")
        print(f"Error: {e}")
        if e.stdout:
            print(f"Output: {e.stdout}")
        if e.stderr:
            print(f"Error output: {e.stderr}")
        return False
    except Exception as e:
        # e.g. the working directory does not exist
        print(f"Unexpected error running command '{command}': {e}")
        return False
# Don't build the webUI if called from GitHub Actions (CI sets NO_BUILD_WEBUI)
if "NO_BUILD_WEBUI" in os.environ:
    print("!! Skipping the build of the web interface !!")
else:
    # Also skip when pio runs a clean target — nothing to build then
    if not (env.IsCleanTarget()):
        buildWeb()
def buildWeb():
    """Install the WebUI packages with pnpm and build the web interface.

    Returns True on success, False when a prerequisite is missing or any
    build step fails.
    """
    interface_dir = Path("interface")
    pnpm_exe = get_pnpm_executable()
    # pnpm switches to non-interactive/silent mode when CI is set
    os.environ['CI'] = 'true'
    print("Building web interface...")
    # Preconditions: the interface sources and a pnpm executable must exist
    if not interface_dir.exists():
        print(f"Error: Interface directory '{interface_dir}' not found!")
        return False
    if not shutil.which(pnpm_exe):
        print(f"Error: '{pnpm_exe}' not found in PATH!")
        return False
    try:
        # Install dependencies, then build the WebUI bundle
        for command in (f"{pnpm_exe} install", f"{pnpm_exe} build_webUI"):
            print(f"Running: {command}")
            if not run_command_in_directory(command, interface_dir):
                return False
        # Force the generated i18n helper to default to the English locale
        i18n_file = interface_dir / "src" / "i18n" / "i18n-util.ts"
        if i18n_file.exists():
            with open(i18n_file, 'r') as r:
                text = r.read().replace("Locales = 'pl'", "Locales = 'en'")
            with open(i18n_file, 'w') as w:
                w.write(text)
            print("Setting WebUI locale to 'en'")
        else:
            print(
                f"Warning: {i18n_file} not found, skipping locale modification")
        print("Web interface build completed successfully!")
        return True
    except Exception as e:
        print(f"Error building web interface: {e}")
        return False
def build_webUI(*args, **kwargs):
    """PIO target action: build the WebUI, then terminate the pio run.

    Exits the SCons environment with status 1 on failure and 0 on success,
    so the normal firmware build never continues after this target.
    """
    ok = buildWeb()
    if not ok:
        print("Web interface build failed!")
        env.Exit(1)
    env.Exit(0)
# Create custom target that only runs the script and then exits, without
# continuing with the pio workflow.
env.AddCustomTarget(
    name="build",
    dependencies=None,
    actions=[build_webUI],
    title="build web interface",
    description="installs pnpm packages, updates libraries and builds web UI",
    always_build=True  # re-run even when SCons thinks nothing changed
)

View File

@@ -0,0 +1,55 @@
import subprocess
import os
import sys
import shutil
from pathlib import Path
# This script is used to update the modbus_entity_parameters.hpp file with the data from the dump_entities.csv file.
# It is used in the build_modbus target.
def get_python_executable():
    """Return a Python interpreter name usable on this platform.

    Scans PATH for the common interpreter names and falls back to the
    interpreter currently running this script.
    """
    for candidate in ('python3', 'python', 'py'):
        if shutil.which(candidate) is not None:
            return candidate
    # Nothing on PATH: use the interpreter executing this script.
    return sys.executable
def csv_to_header(csv_file_path, header_file_path, script_path):
    """Regenerate *header_file_path* by piping *csv_file_path* through *script_path*.

    The generator script reads CSV on stdin and writes the C++ header on stdout.
    """
    header_path = Path(header_file_path)
    # Make sure the destination directory exists and start from a clean slate
    header_path.parent.mkdir(parents=True, exist_ok=True)
    if header_path.exists():
        header_path.unlink()
    python_exe = get_python_executable()
    # CSV in -> header out, via the generator script
    with open(csv_file_path, 'r') as csv_file, open(header_file_path, 'w') as header_file:
        subprocess.run(
            [python_exe, script_path],
            stdin=csv_file,
            stdout=header_file,
            check=True
        )
    print(f"Generated header file: {header_file_path} ({os.path.getsize(header_file_path)} bytes)")
def main():
    """Regenerate src/core/modbus_entity_parameters.hpp from docs/dump_entities.csv.

    Paths are relative to the repository root, so run this script from there.
    """
    csv_file = os.path.join("docs", "dump_entities.csv")
    header_file = os.path.join("src", "core", "modbus_entity_parameters.hpp")
    script_file = os.path.join("scripts", "update_modbus_registers.py")
    csv_to_header(csv_file, header_file, script_file)
# Allow running this module directly as a script
if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,43 @@
from pathlib import Path
import os
Import("env")
# This script is used to create a dummy modbus_entity_parameters.hpp so the first pass compiles.
# It is used in the build_modbus target.
def create_dummy_modbus_header():
    """Create a dummy modbus_entity_parameters.hpp so the first pass compiles."""
    # Minimal C++ content: declares an EMPTY register mapping list so the
    # firmware links; the real generator overwrites this file afterwards.
    header_content = '''#include "modbus.h"
#include "emsdevice.h"
/*
 * This file is auto-generated. Do not modify.
 */
// clang-format off
namespace emsesp {
using dt = EMSdevice::DeviceType;
#define REGISTER_MAPPING(device_type, device_value_tag_type, long_name, modbus_register_offset, modbus_register_count) \\
    { device_type, device_value_tag_type, long_name[0], modbus_register_offset, modbus_register_count }
// IMPORTANT: This list MUST be ordered by keys "device_type", "device_value_tag_type" and "modbus_register_offset" in this order.
const std::initializer_list<Modbus::EntityModbusInfo> Modbus::modbus_register_mappings = {};
} // namespace emsesp
// clang-format on
'''
    header_path = Path("src") / "core" / "modbus_entity_parameters.hpp"
    header_path.parent.mkdir(parents=True, exist_ok=True)
    with open(header_path, 'w') as f:
        f.write(header_content)
    print(f"Created dummy header file: {header_path} ({os.path.getsize(header_path)} bytes)")
# Skip when pio is running a clean target — nothing will be compiled then
if not (env.IsCleanTarget()):
    create_dummy_modbus_header()

52
scripts/build_run_test.py Executable file
View File

@@ -0,0 +1,52 @@
import os
import shutil
import subprocess
import sys
Import("env")
# This script is used to build the program and run the test command.
# It is used in the build_modbus and build_standalone targets.
# Reads configuration from platformio.ini (output file path and test command)
# Runs the executable with streaming input, sending it the test command and capturing output to a file
# Optionally runs a post-processing script if defined in the configuration
# Import the streaming function from the separate module
from run_executable import run_with_streaming_input
def get_python_executable():
    """Pick a Python launcher available on PATH, else fall back to sys.executable."""
    candidates = ['python3', 'python', 'py']
    # First candidate found on PATH wins; None when none are installed
    found = next((name for name in candidates if shutil.which(name)), None)
    return found or sys.executable
def build_run_test(source, target, env):
    """PIO action: run the freshly built program, feed it the configured test
    command, capture its CSV output to a file and optionally post-process it.

    Configuration comes from platformio.ini project options
    (custom_output_file, custom_test_command, custom_post_script).
    """
    # Path of the executable produced by the build
    program_path = source[0].get_abspath()
    # Output file and test command from platformio.ini, with sensible defaults
    output_file = os.path.join(
        "docs", env.GetProjectOption("custom_output_file", "dump_default_output.txt"))
    test_command = env.GetProjectOption("custom_test_command", "test entity_dump")
    # Run the test command, streaming its CSV output into output_file
    run_with_streaming_input(program_path, test_command, output_file)
    # Optional post-processing script configured in platformio.ini
    post_script = env.GetProjectOption("custom_post_script", None)
    if not post_script:
        return
    print(f"Running post script: {post_script}")
    python_exe = get_python_executable()
    subprocess.run([python_exe, post_script, program_path], check=True)
# Custom "build" target: depends on the compiled program, then runs
# build_run_test to produce the dump file (and optional post-processing).
env.AddCustomTarget(
    "build",
    "$BUILD_DIR/${PROGNAME}$PROGSUFFIX",
    build_run_test
)

View File

@@ -0,0 +1,73 @@
import subprocess
import os
import sys
import shutil
from pathlib import Path
# This creates the files
# - dump_entities.csv
# - Modbus-Entity-Registers.md
# - dump_telegrams.csv
# Modbus-Entity-Registers.md is used in the emsesp.org documentation. You can also create it manually with the generate-modbus-register-doc.py script.
# cat ./docs/dump_entities.csv | python3 ./scripts/generate-modbus-register-doc.py
# Import the streaming function from the separate module
from run_executable import run_with_streaming_input
def get_python_executable():
    """Return the first Python executable found on PATH, or sys.executable."""
    available = [name for name in ('python3', 'python', 'py') if shutil.which(name)]
    # Fall back to the interpreter running this script when PATH has none
    return available[0] if available else sys.executable
def csv_to_md(csv_file_path, output_file_path, script_path):
    """Generate the markdown file *output_file_path* by piping *csv_file_path*
    through the generator *script_path* (CSV on stdin, markdown on stdout).
    """
    target = Path(output_file_path)
    # Create the destination directory and drop any stale output first
    target.parent.mkdir(parents=True, exist_ok=True)
    if target.exists():
        target.unlink()
    python_exe = get_python_executable()
    with open(csv_file_path, 'r') as csv_file, open(output_file_path, 'w') as output_file:
        subprocess.run(
            [python_exe, script_path],
            stdin=csv_file,
            stdout=output_file,
            check=True
        )
    print(
        f"Generated MD file: {output_file_path} ({os.path.getsize(output_file_path)} bytes)")
def main(program_path="./emsesp"):
    """Generate the modbus register docs, then dump telegrams via the executable.

    program_path: path to the standalone emsesp executable used for the
    telegram_dump test command.
    """
    csv_file = os.path.join("docs", "dump_entities.csv")
    output_file = os.path.join("docs", "Modbus-Entity-Registers.md")
    script_file = os.path.join("scripts", "generate-modbus-register-doc.py")
    # generate the MD file from the entities CSV
    csv_to_md(csv_file, output_file, script_file)
    # final step: run the telegram_dump test command and capture dump_telegrams.csv
    test_command = "test telegram_dump"
    telegram_output_file = os.path.join("docs", "dump_telegrams.csv")
    print(f"Running test command: telegram_dump > {telegram_output_file}")
    run_with_streaming_input(program_path, test_command, telegram_output_file)
if __name__ == "__main__":
    # Get program path from command line argument or use default
    program_path = sys.argv[1] if len(sys.argv) > 1 else "./emsesp"
    main(program_path)

View File

@@ -3,7 +3,7 @@
# and https://github.com/streetsidesoftware/vscode-spell-checker/blob/main/FAQ.md
# go to root top level directory
# install cspell with yarn add --dev cspell@latest
# install cspell with pnpm add --dev cspell@latest
# first time installation
# 1) run `npx cspell "**" --no-progress --unique > words-found-verbose.txt`

14
scripts/force_clean.py Normal file
View File

@@ -0,0 +1,14 @@
Import("env")
import os
import shutil
def force_clean(source, target, env):
    """Remove build directory before building"""
    # Resolve $BUILD_DIR for the active PIO environment
    build_dir = env.subst("$BUILD_DIR")
    if os.path.exists(build_dir):
        print(f"Force cleaning: {build_dir}")
        # Delete the whole build tree so the next build starts from scratch
        shutil.rmtree(build_dir)
# Register the callback to run before building the program binary
env.AddPreAction("$BUILD_DIR/${PROGNAME}$PROGSUFFIX", force_clean)

View File

@@ -1,122 +1,167 @@
import fileinput
import csv
import sys
from itertools import groupby
from collections import defaultdict
#
# This is used to build the contents of the `Modbus-Entity-Registers.md` file used in the emsesp.org documentation.
#
# static data
tag_to_tagtype = {
-1: "TAG_TYPE_NONE",
0: "DEVICE_DATA",
1: "HC",
2: "HC",
3: "HC",
4: "HC",
5: "HC",
6: "HC",
7: "HC",
8: "HC",
9: "DHW",
10: "DHW",
11: "DHW",
12: "DHW",
13: "DHW",
14: "DHW",
15: "DHW",
16: "DHW",
17: "DHW",
18: "DHW",
19: "AHS",
20: "HS",
21: "HS",
22: "HS",
23: "HS",
24: "HS",
25: "HS",
26: "HS",
27: "HS",
28: "HS",
29: "HS",
30: "HS",
31: "HS",
32: "HS",
33: "HS",
34: "HS",
35: "HS"
}
def get_tag_type(modbus_block):
    """Convert a modbus block number to its tag-type name.

    Fix: removed interleaved leftover lines from the old stdin-reading code
    (entities list / fileinput loop) that had corrupted this function.

    Args:
        modbus_block: Block number (int or numeric string).

    Returns:
        str: Tag-type name, or "UNKNOWN" for unmapped blocks.
    """
    block = int(modbus_block)
    # Handle special cases first
    if block == -1:
        return "TAG_TYPE_NONE"
    if block == 0:
        return "DEVICE_DATA"
    if block == 19:
        return "AHS"
    # Contiguous ranges instead of a per-value lookup table
    if 1 <= block <= 8:
        return "HC"
    if 9 <= block <= 18:
        return "DHW"
    if 20 <= block <= 35:
        return "HS"
    if 36 <= block <= 51:
        return "SRC"
    # Default fallback
    return "UNKNOWN"
def device_name_key(e): return e["device name"]
def read_entities():
    """Read and parse CSV entities from stdin with error handling."""
    entities = []
    try:
        with fileinput.input() as stream:
            reader = csv.reader(stream, delimiter=',', quotechar='"')
            headers = next(reader)
            # The generator below depends on these columns being present
            required_headers = {'device name', 'device type', 'shortname', 'fullname',
                                'type [options...] \\| (min/max)', 'uom', 'writeable',
                                'modbus block', 'modbus offset', 'modbus count', 'modbus scale factor'}
            missing_headers = required_headers - set(headers)
            if missing_headers:
                raise ValueError(
                    f"Missing required headers: {missing_headers}")
            # Data rows start at physical line 2 (after the header line)
            for row_num, row in enumerate(reader, start=2):
                if len(row) == len(headers):
                    entities.append(dict(zip(headers, row)))
                else:
                    # Skip malformed rows but tell the operator about them
                    print(
                        f"Warning: Row {row_num} has {len(row)} columns, expected {len(headers)}", file=sys.stderr)
    except Exception as e:
        print(f"Error reading CSV data: {e}", file=sys.stderr)
        sys.exit(1)
    return entities
def device_type_key(e): return e["device type"]
def group_entities_by_device_type(entities):
    """Bucket entities by their "device type" field, preserving input order."""
    buckets = defaultdict(list)
    for item in entities:
        buckets[item["device type"]].append(item)
    return buckets
def grouped_by(items, key):
    """Sort *items* by *key*, then group adjacent equal keys via itertools.groupby.

    Fix: first parameter renamed from ``list`` (which shadowed the builtin)
    to ``items``; callers pass it positionally, so they are unaffected.

    Returns the lazy (key, group-iterator) pairs produced by groupby.
    """
    return groupby(sorted(items, key=key), key)
def print_device_entities(device_name, device_entities):
    """Print one device's entities as a markdown table.

    Fix: removed interleaved leftover lines from the old camelCase
    printDeviceEntities implementation (stale def header and the old
    row-printing loop) that had corrupted this function.
    """
    print(f"### {device_name}")
    print()
    print("| shortname | fullname | type | uom | writeable | tag type | register offset | register count | scale factor |")
    print("|-|-|-|-|-|-|-|-|-|")
    for entity in device_entities:
        tag_type = get_tag_type(entity["modbus block"])
        type_long = entity['type [options...] \\| (min/max)']
        # Split off the base type from the rest (options / range)
        split_type = type_long.split(" ", 1)
        type_base = split_type[0]
        type_suffix = split_type[1] if len(split_type) > 1 else ""
        # For integer types, normalize a range annotation to HTML-escaped >=min<=max
        type_rest_str = type_suffix
        if "int" in type_base and "(" in type_suffix:
            try:
                # Extract inner part of parentheses
                range_inner = type_suffix[type_suffix.index("(")+1:type_suffix.index(")")]
                min_value = max_value = None
                if ">=" in range_inner and "<=" in range_inner:
                    # ">=0<=100" style
                    min_value, max_value = range_inner.replace(">=", "").split("<=")
                elif "/" in range_inner:
                    # "0/100" style
                    min_value, max_value = range_inner.split("/")
                if min_value is not None and max_value is not None:
                    type_rest_str = f"(&gt;={min_value.strip()}&lt;={max_value.strip()})"
                else:
                    type_rest_str = ""
            except Exception:
                # Malformed range annotation: keep the original suffix unchanged
                pass
        print(f"| {entity['shortname']} | {entity['fullname']} | {type_base} {type_rest_str} | "
              f"{entity['uom']} | {entity['writeable']} | {tag_type} | {entity['modbus offset']} | "
              f"{entity['modbus count']} | {entity['modbus scale factor']} |")
    print()
def printDeviceTypeDevices(device_type, devices):
    # NOTE(review): legacy camelCase version — appears superseded by
    # print_device_type_devices in this file; confirm no callers remain
    # before removing it.
    print("## Devices of type *" + device_type + "*")
    for device_name, device_entities in grouped_by(devices, device_name_key):
        printDeviceEntities(device_name, device_entities)
def print_device_type_devices(device_type, devices):
    """Print all devices of a specific type."""
    print(f"## Devices of type \\_{device_type}")
    print()
    # Bucket the devices by their name, preserving input order
    by_name = defaultdict(list)
    for dev in devices:
        by_name[dev["device name"]].append(dev)
    for device_name, device_entities in by_name.items():
        print_device_entities(device_name, device_entities)
# write header
print("<!-- Use full browser width for this page, the tables are wide -->")
print("<style>")
print(".md-grid {")
print(" max-width: 100%; /* or 100%, if you want to stretch to full-width */")
print("}")
print("</style>")
print()
print("# Entity/Register Mapping")
print()
print("!!! note")
print()
print(" This file has been auto-generated. Do not modify.")
print()
for device_type, devices in grouped_by(entities, device_type_key):
printDeviceTypeDevices(device_type, devices)
# def printGroupedData(groupedData):
# for k, v in groupedData:
# # print("Group {} {}".format(k, list(v)))
# print(k)
def print_header():
    """Print the fixed markdown preamble for the generated document."""
    preamble = (
        "# Modbus Entity/Register Mapping",
        "",
        ":::warning",
        "This file has been auto-generated. Do not modify.",
        ":::",
        "",
    )
    for line in preamble:
        print(line)
# printGroupedData(grouped_entities)
def main():
    """Main function to process entities and generate documentation.

    Reads the entity CSV from stdin and writes the markdown document to
    stdout; exits 1 when the input yields no entities.
    """
    # Read entities from stdin (CSV piped in by the caller)
    entities = read_entities()
    if not entities:
        print("No entities found in input data.", file=sys.stderr)
        sys.exit(1)
    # Print header
    print_header()
    # Group entities by device type and process
    grouped_entities = group_entities_by_device_type(entities)
    # Print documentation for each device type
    for device_type, devices in grouped_entities.items():
        print_device_type_devices(device_type, devices)
if __name__ == "__main__":
    main()

9
scripts/generate_csv_and_headers.sh Executable file → Normal file
View File

@@ -1,9 +1,14 @@
#!/bin/sh
#
# Builds the dump_*.csv files, modbus headers and modbus documentation.
# Run as `sh scripts/generate_csv_and_headers.sh` from the root of the repository.
##
## IMPORTANT NOTE!
## This script is not used anymore. It is kept for reference only.
## It has been replaced with two pio targets: build_modbus and build_standalone.
##
# create a dummy modbus_entity_parameters.hpp so the first pass compiles
cat >./src/core/modbus_entity_parameters.hpp <<EOL
#include "modbus.h"
@@ -56,4 +61,4 @@ echo "test telegram_dump" | ./emsesp | python3 ./scripts/strip_csv.py > ./docs/d
ls -al ./src/core/modbus_entity_parameters.hpp
ls -al ./docs/Modbus-Entity-Registers.md
ls -al ./docs/dump_entities.csv
ls -al ./docs/dump_telegrams.csv
ls -al ./docs/dump_telegrams.csv

View File

@@ -2,133 +2,121 @@ import hashlib
import shutil
import re
import os
from pathlib import Path
Import("env")
OUTPUT_DIR = "build{}".format(os.path.sep)
OUTPUT_DIR = Path("build")
def bin_copy(source, target, env):
"""Optimized firmware renaming and copying function."""
# Get the application version from emsesp_version.h
version_file = Path('./src/emsesp_version.h')
if not version_file.exists():
print("Error: emsesp_version.h not found!")
return
app_version = None
version_pattern = re.compile(r'^#define EMSESP_APP_VERSION\s+"(\S+)"')
with version_file.open('r') as f:
for line in f:
match = version_pattern.match(line)
if match:
app_version = match.group(1)
break
if not app_version:
print("Error: Could not find EMSESP_APP_VERSION in emsesp_version.h!")
return
# get the application version from version.h
bag = {}
exprs = [(re.compile(r'^#define EMSESP_APP_VERSION\s+"(\S+)"'), 'app_version')]
with open('./src/version.h', 'r') as f:
for l in f.readlines():
for expr, var in exprs:
m = expr.match(l)
if m and len(m.groups()) > 0:
bag[var] = m.group(1)
# Get the chip type, in uppercase
mcu = env.get('BOARD_MCU', '').upper()
if not mcu:
print("Error: Could not determine MCU type!")
return
app_version = bag.get('app_version')
# Work out the flash memory from the PIO env name
flash_mem = "4MB" # default
pio_env = env.get('PIOENV', '').upper()
if pio_env:
parts = pio_env.split('_')
# If it ends with _P skip (we use this to denote PSRAM)
index = -2 if parts[-1].endswith("P") else -1
# If it has an M at the end, use it
if parts[index].endswith("M"):
flash_mem = parts[index] + "B"
# print(env.Dump())
# Check if BOARD_HAS_PSRAM is in the cppdefines
cppdefines = env.get("CPPDEFINES", [])
psram = 'BOARD_HAS_PSRAM' in cppdefines
# get the chip type, in uppercase
mcu = env.get('BOARD_MCU').upper()
# alternatively take platform from the pio target
# platform = str(target[0]).split(os.path.sep)[2]
print("=" * 90)
print(f"EMS-ESP version: {app_version}")
print(f"Has PSRAM: {'Yes' if psram else 'No'}")
print(f"MCU: {mcu}")
print(f"Flash Mem: {flash_mem}")
# work out the flash memory from the PIO env name (sloppy but works)
# unfortunately the board_upload.flash_size is not passed down
flash_mem = "4MB"
pio_env = env.get('PIOENV').upper()
parts = pio_env.split('_')
# if it ends with a _P skip (we use this to denote PSRAM)
if parts[-1].endswith("P"):
index = -2
else:
index = -1
# if doesn't have an M at the end
if parts[index].endswith("M"):
flash_mem = parts[index] + "B"
# find if BOARD_HAS_PSRAM is in the cppdefines
cppdefines = env.get("CPPDEFINES")
if 'BOARD_HAS_PSRAM' in cppdefines:
psram = True
else:
psram = False
print("*********************************************")
print("EMS-ESP version: " + app_version)
# show psram as Yes or No
psram_status = "Yes" if psram else "No"
print("Has PSRAM: " + psram_status)
print("MCU: "+str(mcu))
print("Flash Mem: " + flash_mem)
# convert . to _ so Windows doesn't complain
# Convert . to _ so Windows doesn't complain
# Format is EMS-ESP-<version>-<mcu>-<flash> with + at the end if it has PSRAM
variant = "EMS-ESP-" + \
app_version.replace(".", "_") + "-" + mcu + "-" + \
flash_mem + ("+" if psram else "")
variant = f"EMS-ESP-{app_version.replace('.', '_')}-{mcu}-{flash_mem}{'+' if psram else ''}"
# check if output directories exist and create if necessary
if not os.path.isdir(OUTPUT_DIR):
os.mkdir(OUTPUT_DIR)
# Create output directories
firmware_dir = OUTPUT_DIR / "firmware"
firmware_dir.mkdir(parents=True, exist_ok=True)
for d in ['firmware']:
if not os.path.isdir("{}{}".format(OUTPUT_DIR, d)):
os.mkdir("{}{}".format(OUTPUT_DIR, d))
# Define file paths
bin_file = firmware_dir / f"{variant}.bin"
md5_file = firmware_dir / f"{variant}.md5"
# create string with location and file names based on variant
bin_file = "{}firmware{}{}.bin".format(OUTPUT_DIR, os.path.sep, variant)
md5_file = "{}firmware{}{}.md5".format(OUTPUT_DIR, os.path.sep, variant)
# Remove existing files if they exist
for file_path in [bin_file, md5_file]:
if file_path.exists():
file_path.unlink()
# check if new target files exist and remove if necessary
for f in [bin_file]:
if os.path.isfile(f):
os.remove(f)
print(f"Filename: {bin_file}")
# check if new target files exist and remove if necessary
for f in [md5_file]:
if os.path.isfile(f):
os.remove(f)
# Copy firmware.bin to firmware/<variant>.bin
shutil.copy2(str(target[0]), str(bin_file))
print("Filename: "+bin_file)
# Calculate and write MD5 hash
with bin_file.open("rb") as f:
md5_hash = hashlib.md5(f.read()).hexdigest()
print(f"MD5: {md5_hash}")
md5_file.write_text(md5_hash)
# copy firmware.bin to firmware/<variant>.bin
shutil.copy(str(target[0]), bin_file)
with open(bin_file, "rb") as f:
result = hashlib.md5(f.read())
print("MD5: "+result.hexdigest())
file1 = open(md5_file, 'w')
file1.write(result.hexdigest())
file1.close()
# make a copy using the old 3.6.x filename format for backwards compatibility with the WebUI version check, e.g.
# create a EMS-ESP-<version>-ESP32_S3.bin if target is s3_16M_P (16MB, PSRAM)
# create a EMS-ESP-<version>-ESP32.bin if target is s_4M (4MB, no PSRAM), compatible only with S32 V1 and E32 V1.0,1.4,1.5
#
# Note: there is a chance newer E32V2s (which use the 16MB partition table and PSRAM) are running a custom build
# of the 3.6.5 firmware as 3.6.5 was released before production of the gateway board. Updating via the WebUI will break the system and require a manual update.
#
extra_variant = ""
if env.get('PIOENV') == "s3_16M_P":
extra_variant = "EMS-ESP-" + \
app_version.replace(".", "_") + "-ESP32_S3"
elif env.get('PIOENV') == "s_4M":
extra_variant = "EMS-ESP-" + app_version.replace(".", "_") + "-ESP32"
# Make a copy using the old 3.6.x filename format for backwards compatibility
# Note: there is a chance newer E32V2s (which use the 16MB partition table and PSRAM)
# are running a custom build of the 3.6.5 firmware as 3.6.5 was released before
# production of the gateway board. Updating via the WebUI will break the system
# and require a manual update.
pio_env = env.get('PIOENV', '')
extra_variant = None
if pio_env == "s3_16M_P":
extra_variant = f"EMS-ESP-{app_version.replace('.', '_')}-ESP32_S3"
elif pio_env == "s_4M":
extra_variant = f"EMS-ESP-{app_version.replace('.', '_')}-ESP32"
if extra_variant:
extra_bin_file = "{}firmware{}{}.bin".format(
OUTPUT_DIR, os.path.sep, extra_variant)
if os.path.isfile(extra_bin_file):
os.remove(extra_bin_file)
extra_bin_file = firmware_dir / f"{extra_variant}.bin"
extra_md5_file = firmware_dir / f"{extra_variant}.md5"
# Remove existing files if they exist
for file_path in [extra_bin_file, extra_md5_file]:
if file_path.exists():
file_path.unlink()
# Copy files
shutil.copy2(str(bin_file), str(extra_bin_file))
shutil.copy2(str(md5_file), str(extra_md5_file))
print(f"Filename copy for 3.6.x: {extra_bin_file}")
extra_md5_file = "{}firmware{}{}.md5".format(
OUTPUT_DIR, os.path.sep, extra_variant)
if os.path.isfile(extra_md5_file):
os.remove(extra_md5_file)
shutil.copy(bin_file, extra_bin_file)
shutil.copy(md5_file, extra_md5_file)
print("Filename copy for 3.6.x: "+extra_bin_file)
print("*********************************************")
print("=" * 90)
env.AddPostAction("$BUILD_DIR/${PROGNAME}.bin", [bin_copy])

137
scripts/run_executable.py Executable file
View File

@@ -0,0 +1,137 @@
#!/usr/bin/env python3
"""
Utility functions for running executables with streaming input and CSV output extraction.
"""
import subprocess
import sys
import os
from pathlib import Path
def run_with_streaming_input(program_path, test_command, output_file=None):
    """
    Run the executable and stream text input to it.

    The process's stdout is scanned for a section delimited by
    "---- CSV START ----" / "---- CSV END ----"; the lines in between are
    optionally written to *output_file*.

    Fix: renamed the mistyped section flag ``in_cvs_section`` to
    ``in_csv_section`` and switched file housekeeping to pathlib.

    Args:
        program_path (str): Path to the executable to run
        test_command (str): Command to send to the executable (a "test "
            prefix is added automatically when missing)
        output_file (str, optional): Path to save CSV output. If None, no file is saved.

    Returns:
        int: Return code of the executed process (1 when it cannot be launched)
    """
    try:
        # Start the process with pipes for stdin, stdout, and stderr
        process = subprocess.Popen(
            [str(program_path)],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            encoding='utf-8',
            errors='replace',  # Replace invalid UTF-8 bytes instead of crashing
            bufsize=1  # Line buffered
        )
        # Normalize the command: the console expects a leading "test "
        if not test_command.startswith("test "):
            test_command = "test " + test_command
        # Stream input line by line
        for line in test_command.strip().split('\n'):
            process.stdin.write(line + '\n')
            process.stdin.flush()
        # Close stdin to signal end of input
        process.stdin.close()
        # Collect only the lines between the CSV START/END markers
        in_csv_section = False
        csv_output = []
        for line in process.stdout:
            if "---- CSV START ----" in line:
                in_csv_section = True
                continue
            elif "---- CSV END ----" in line:
                in_csv_section = False
                continue
            elif in_csv_section:
                csv_output.append(line)
        # Export CSV output to file if output_file is specified
        if output_file:
            out_path = Path(output_file)
            # Ensure the output directory exists and drop any stale copy
            out_path.parent.mkdir(parents=True, exist_ok=True)
            if out_path.exists():
                out_path.unlink()
            with open(output_file, 'w', encoding='utf-8', errors='replace') as f:
                f.writelines(csv_output)
            print(f"CSV file created: {output_file} ({os.path.getsize(output_file)} bytes)")
        # Wait for process to complete
        return_code = process.wait()
        # NOTE(review): stderr is drained only after stdout; a process writing
        # large amounts to stderr before closing stdout could block here —
        # assumed acceptable because the console writes stderr only on errors.
        stderr_output = process.stderr.read()
        if stderr_output:
            print("\nErrors:", file=sys.stderr)
            print(stderr_output, file=sys.stderr)
        return return_code
    except Exception as e:
        print(f"Error running executable: {e}", file=sys.stderr)
        return 1
def run_executable_with_command(program_path, command, output_file=None):
    """
    Simplified interface to run an executable with a command and optionally save output.

    Thin alias for run_with_streaming_input, kept for API compatibility.

    Args:
        program_path (str): Path to the executable to run
        command (str): Command to send to the executable
        output_file (str, optional): Path to save CSV output. If None, no file is saved.

    Returns:
        int: Return code of the executed process
    """
    return run_with_streaming_input(program_path, command, output_file)
def main():
    """Command-line interface for running executables with streaming input.

    argv: <program_path> <command> [output_file]. Exits with the child
    process's return code.
    """
    if len(sys.argv) < 3:
        print("Usage: python3 run_executable.py <program_path> <command> [output_file]")
        print("Example: python3 run_executable.py ./emsesp entity_dump ./output.csv")
        sys.exit(1)
    program_path = sys.argv[1]
    command = sys.argv[2]
    # Output file is optional; without it the CSV section is parsed but not saved
    output_file = sys.argv[3] if len(sys.argv) > 3 else None
    print(f"Running: {program_path}")
    print(f"Command: {command}")
    if output_file:
        print(f"Output file: {output_file}")
    return_code = run_with_streaming_input(program_path, command, output_file)
    if return_code == 0:
        print("Execution completed successfully!")
    else:
        print(f"Execution failed with return code: {return_code}")
    # Propagate the child's exit status to our caller
    sys.exit(return_code)
if __name__ == "__main__":
    main()

View File

@@ -11,7 +11,7 @@ def move_file(source, target, env):
# get the build info
bag = {}
exprs = [(re.compile(r'^#define EMSESP_APP_VERSION\s+"(\S+)"'), 'app_version')]
with open('./src/version.h', 'r') as f:
with open('./src/emsesp_version.h', 'r') as f:
for l in f.readlines():
for expr, var in exprs:
m = expr.match(l)
@@ -36,7 +36,6 @@ def move_file(source, target, env):
print("app version: " + app_version)
print("platform: " + platform)
# TODO do we need to add a .exe extension for windows? - need to test
variant = "native"
# check if output directories exist and create if necessary

View File

@@ -16,6 +16,6 @@ make clean
-Dsonar.organization=proddy \
-Dsonar.projectKey=proddy_EMS-ESP32 \
-Dsonar.projectName=EMS-ESP32 \
-Dsonar.sources="./src, ./lib/framework" \
-Dsonar.sources="./src" \
-Dsonar.cfamily.compile-commands=compile_commands.json \
-Dsonar.host.url=https://sonarcloud.io

View File

@@ -1,31 +1,30 @@
#!/bin/sh
# run from root folder, like `sh ./scripts/update_all.sh`
# make sure ncu is installed globally (https://github.com/raineorshine/npm-check-updates)
# as well as GNUMake (make) and python3
cd interface
rm -rf yarn.lock node_modules
touch yarn.lock
ncu -u
yarn set version stable
yarn
yarn format
yarn lint
rm -rf node_modules
corepack use pnpm@latest-10
pnpm update --latest
pnpm install
pnpm format
pnpm lint
cd ../mock-api
rm -rf yarn.lock node_modules
touch yarn.lock
ncu -u
yarn set version stable
yarn
yarn format
rm -rf node_modules
corepack use pnpm@latest-10
pnpm update --latest
pnpm install
pnpm format
cd ..
cd interface
yarn build; yarn webUI
pnpm build_webUI
cd ..
npx cspell "**"
npx cspell "**"
sh ./scripts/generate_csv_and_headers.sh
# platformio run -e build_modbus
# platformio run -e build_standalone
# platformio run -e native-test -t exec

View File

@@ -1,7 +1,10 @@
#
# Update modbus parameters from entity definitions.
# This script generates c++ code for the modbus parameter definitions.
# Called by /scripts/generate_csv_and_headers.sh
#
# Called by /scripts/generate_csv_and_headers.sh and pio build_modbus target.
# can be called manually with:
# cat ./docs/dump_entities.csv | python3 ./scripts/update_modbus_registers.py > ./src/core/modbus_entity_parameters.hpp
import fileinput
import csv
@@ -42,7 +45,9 @@ string_sizes = {
"thermostat/switchtime2": 16,
"thermostat/switchtime": 16,
"thermostat/switchtimeww": 21,
"controller/datetime": 25
"controller/datetime": 25,
"connect/datetime": 25,
"connect/name": 51
}
tag_to_tagtype = {
@@ -82,7 +87,23 @@ tag_to_tagtype = {
32: "TAG_TYPE_HS",
33: "TAG_TYPE_HS",
34: "TAG_TYPE_HS",
35: "TAG_TYPE_HS"
35: "TAG_TYPE_HS",
36: "TAG_TYPE_SRC",
37: "TAG_TYPE_SRC",
38: "TAG_TYPE_SRC",
39: "TAG_TYPE_SRC",
40: "TAG_TYPE_SRC",
41: "TAG_TYPE_SRC",
42: "TAG_TYPE_SRC",
43: "TAG_TYPE_SRC",
44: "TAG_TYPE_SRC",
45: "TAG_TYPE_SRC",
46: "TAG_TYPE_SRC",
47: "TAG_TYPE_SRC",
48: "TAG_TYPE_SRC",
49: "TAG_TYPE_SRC",
50: "TAG_TYPE_SRC",
51: "TAG_TYPE_SRC"
}
device_type_names = [
@@ -140,27 +161,24 @@ cpp_entry_template = Template(
# read translations
listNames = {}
transre = re.compile(r'^MAKE_TRANSLATION\(([^,\s]+)\s*,\s*\"([^\"]+)\"')
transf = open('./src/core/locale_translations.h', 'r')
while True:
line = transf.readline()
if not line:
break
m = transre.match(line)
if m is not None:
listNames[m.group(2)] = m.group(1)
transf.close()
try:
with open('./src/core/locale_translations.h', 'r', encoding='utf-8', errors='replace') as transf:
for line in transf:
m = transre.match(line)
if m is not None:
listNames[m.group(2)] = m.group(1)
except FileNotFoundError:
# Handle case where file doesn't exist
raise Exception('Error! locale_translations.h not found')
entities = []
with fileinput.input() as f_input:
with fileinput.input(encoding='utf-8', errors='replace') as f_input:
entities_reader = csv.reader(f_input, delimiter=',', quotechar='"')
headers = next(entities_reader)
for row in entities_reader:
entity = {}
for i, val in enumerate(row):
# print(headers[i] + ": " + val)
entity[headers[i]] = val
entity = {headers[i]: val for i, val in enumerate(row)}
entities.append(entity)
# print(json.dumps(entities, indent=" "))
@@ -192,8 +210,8 @@ for entity in entities:
(-string_sizes[entity_dev_name] // 2) # divide and round up
if int(entity["modbus count"]) <= 0:
raise Exception('Entity "' + entity_dev_name + ' (' + entity_shortname + ')' +
'" does not have a size - string sizes need to be added manually to update_modbus_registers.py/string_sizes')
raise Exception('Error! Entity "' + entity_dev_name + ' (' + entity_shortname + ')' +
'" does not have a size - string sizes need to be added manually to update_modbus_registers.py/string_sizes[]')
# if entity["modbus count"] == "0":
# print("ignoring " + entity_dev_name + " - it has a register length of zero")
@@ -216,30 +234,28 @@ for entity in entities:
for device_type_name, device_type in device_types.items():
for tag, entities in device_type.items():
total_registers = 0
# Pre-calculate all register info to avoid repeated int() conversions
register_info = []
next_free_offset = 0
for entity_name, modbus_info in entities.items():
register_offset = int(modbus_info['modbus offset'])
register_count = int(modbus_info['modbus count'])
total_registers += register_count
register_info.append(
(entity_name, modbus_info, register_offset, register_count))
if register_offset >= 0 and register_offset + register_count > next_free_offset:
next_free_offset = register_offset + register_count
# print(device_type_name + "/" + tag + ": total_registers=" + str(total_registers) + "; next_free_offset=" + str(
# next_free_offset))
for entity_name, modbus_info in entities.items():
register_offset = int(modbus_info['modbus offset'])
register_count = int(modbus_info['modbus count'])
# Assign registers for unassigned entities
for entity_name, modbus_info, register_offset, register_count in register_info:
if register_offset < 0 and register_count > 0:
# assign register
# print("assign " + entity_name + " -> " + str(next_free_offset))
modbus_info['modbus offset'] = str(next_free_offset)
next_free_offset += register_count
# OUTPUT
cpp_entries = ""
cpp_entries = []
# traverse all elements in correct order so they are correctly sorted
for device_type_name in device_type_names:
@@ -249,18 +265,28 @@ for device_type_name in device_type_names:
tag = str(ntag)
if tag in device_type:
entities = device_type[tag]
for entity_name, modbus_info in sorted(entities.items(), key=lambda x: int(x[1]["modbus offset"])):
# Sort once and reuse
sorted_entities = sorted(
entities.items(), key=lambda x: int(x[1]["modbus offset"]))
for entity_name, modbus_info in sorted_entities:
# Strip device type prefix (e.g., "dhw.nrg" -> "nrg") for translation lookup
lookup_name = entity_name.split('.')[-1] if '.' in entity_name else entity_name
if lookup_name not in listNames:
raise KeyError(f"Error! Translation not found for '{lookup_name}' (entity: '{entity_name}'). Please add it to locale_translations.h")
params = {
'devtype': "dt::" + device_type_name,
# re.sub(r"[0-9]+", "*", tag),
"tagtype": tag_to_tagtype[int(tag)],
"shortname": 'FL_(' + listNames[entity_name] + ")",
"shortname": 'FL_(' + listNames[lookup_name] + ")",
"entity_name": entity_name,
'registeroffset': modbus_info["modbus offset"],
'registercount': modbus_info["modbus count"]
}
# print(entitypath + ": " + str(modbus_info))
cpp_entries += cpp_entry_template.substitute(params)
cpp_entries.append(cpp_entry_template.substitute(params))
cpp_src = cpp_file_template.substitute({'entries': cpp_entries})
# Join all entries at once
cpp_entries_str = "".join(cpp_entries)
cpp_src = cpp_file_template.substitute({'entries': cpp_entries_str})
print(cpp_src)

View File

@@ -36,8 +36,34 @@ except ImportError:
from termcolor import cprint
def print_success(x): return cprint(x, 'green')
def print_fail(x): return cprint('Error: '+x, 'red')
def print_success(x):
cprint(x, 'green')
def print_fail(x):
cprint(f'Error: {x}', 'red')
def build_headers(host_ip, emsesp_url, content_type='application/json', access_token=None, extra_headers=None):
"""Build common HTTP headers with optional overrides."""
headers = {
'Host': host_ip,
'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/118.0',
'Accept': '*/*',
'Accept-Language': 'en-US',
'Accept-Encoding': 'gzip, deflate',
'Referer': emsesp_url,
'Content-Type': content_type,
'Connection': 'keep-alive'
}
if access_token:
headers['Authorization'] = f'Bearer {access_token}'
if extra_headers:
headers.update(extra_headers)
return headers
def on_upload(source, target, env):
@@ -53,26 +79,16 @@ def on_upload(source, target, env):
username = env.GetProjectOption('custom_username')
password = env.GetProjectOption('custom_password')
emsesp_ip = env.GetProjectOption('custom_emsesp_ip')
except:
print_fail('Missing settings. Add these to your pio_local.ini file: \n\ncustom_username=username\ncustom_password=password\ncustom_emsesp_ip=ems-esp.local\n')
except Exception as e:
print_fail(f'Missing settings. Add these to your pio_local.ini file:\n\ncustom_username=username\ncustom_password=password\ncustom_emsesp_ip=ems-esp.local\n')
return
emsesp_url = "http://" + env.GetProjectOption('custom_emsesp_ip')
emsesp_url = f"http://{emsesp_ip}"
parsed_url = urlparse(emsesp_url)
host_ip = parsed_url.netloc
signon_url = f"{emsesp_url}/rest/signIn"
signon_headers = {
'Host': host_ip,
'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/118.0',
'Accept': '*/*',
'Accept-Language': 'en-US',
'Accept-Encoding': 'gzip, deflate',
'Referer': f'{emsesp_url}',
'Content-Type': 'application/json',
'Connection': 'keep-alive'
}
signon_headers = build_headers(host_ip, emsesp_url)
username_password = {
"username": username,
@@ -114,19 +130,16 @@ def on_upload(source, target, env):
monitor = MultipartEncoderMonitor(
encoder, lambda monitor: bar.update(monitor.bytes_read - bar.n))
post_headers = {
'Host': host_ip,
'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/118.0',
'Accept': '*/*',
'Accept-Language': 'en-US',
'Accept-Encoding': 'gzip, deflate',
'Referer': f'{emsesp_url}',
'Connection': 'keep-alive',
'Content-Type': monitor.content_type,
'Content-Length': str(monitor.len),
'Origin': f'{emsesp_url}',
'Authorization': 'Bearer ' + f'{access_token}'
}
post_headers = build_headers(
host_ip,
emsesp_url,
content_type=monitor.content_type,
access_token=access_token,
extra_headers={
'Content-Length': str(monitor.len),
'Origin': emsesp_url
}
)
upload_url = f"{emsesp_url}/rest/uploadFile"
@@ -139,25 +152,15 @@ def on_upload(source, target, env):
print()
if response.status_code != 200:
print_fail("Upload failed (code " + response.status.code + ").")
print_fail(f"Upload failed (code {response.status_code}).")
else:
print_success("Upload successful. Rebooting device.")
restart_headers = {
'Host': host_ip,
'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/118.0',
'Accept': '*/*',
'Accept-Language': 'en-US',
'Accept-Encoding': 'gzip, deflate',
'Referer': f'{emsesp_url}',
'Content-Type': 'application/json',
'Connection': 'keep-alive',
'Authorization': 'Bearer ' + f'{access_token}'
}
restart_headers = build_headers(
host_ip, emsesp_url, access_token=access_token)
restart_url = f"{emsesp_url}/api/system/restart"
response = requests.get(restart_url, headers=restart_headers)
if response.status_code != 200:
print_fail("Restart failed (code " +
str(response.status_code) + ")")
print_fail(f"Restart failed (code {response.status_code})")
print()

View File

@@ -11,125 +11,154 @@
# python3 upload_cli.py -i 10.10.10.175 -f ../build/firmware/EMS-ESP-3_7_0-dev_34-ESP32S3-16MB+.bin
import argparse
import requests
import hashlib
from urllib.parse import urlparse
import sys
import time
from pathlib import Path
import requests
from requests_toolbelt import MultipartEncoder, MultipartEncoderMonitor
from tqdm import tqdm
from termcolor import cprint
from tqdm import tqdm
# Constants
USER_AGENT = 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/118.0'
CHUNK_SIZE = 8192 # 8KB chunks for MD5 calculation
def print_success(x): return cprint(x, 'green')
def print_fail(x): return cprint(x, 'red')
def print_success(x):
return cprint(x, 'green')
def print_fail(x):
return cprint(x, 'red')
def calculate_md5(file_path):
"""Calculate MD5 hash of a file in chunks for memory efficiency."""
md5_hash = hashlib.md5()
with open(file_path, 'rb') as f:
for chunk in iter(lambda: f.read(CHUNK_SIZE), b''):
md5_hash.update(chunk)
return md5_hash.hexdigest()
def create_base_headers(host_ip, emsesp_url):
"""Create base headers used across all requests."""
return {
'Host': host_ip,
'User-Agent': USER_AGENT,
'Accept': '*/*',
'Accept-Language': 'en-US',
'Accept-Encoding': 'gzip, deflate',
'Referer': emsesp_url,
'Connection': 'keep-alive'
}
def upload(file, ip, username, password):
"""Upload firmware to EMS-ESP device."""
# Print welcome message
print()
print("EMS-ESP Firmware Upload")
# first check authentication
emsesp_url = "http://" + f'{ip}'
parsed_url = urlparse(emsesp_url)
host_ip = parsed_url.netloc
signon_url = f"{emsesp_url}/rest/signIn"
signon_headers = {
'Host': host_ip,
'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/118.0',
'Accept': '*/*',
'Accept-Language': 'en-US',
'Accept-Encoding': 'gzip, deflate',
'Referer': f'{emsesp_url}',
'Content-Type': 'application/json',
'Connection': 'keep-alive'
}
username_password = {
"username": username,
"password": password
}
response = requests.post(
signon_url, json=username_password, headers=signon_headers, auth=None)
if response.status_code != 200:
print_fail("Authentication failed (code " +
str(response.status_code) + ")")
# Validate file exists
file_path = Path(file)
if not file_path.exists():
print_fail(f"File not found: {file}")
return
print_success("Authentication successful")
access_token = response.json().get('access_token')
# Setup URLs and headers
emsesp_url = f"http://{ip}"
host_ip = ip
# Use a session for connection pooling and persistence
session = requests.Session()
try:
# Authentication
signon_url = f"{emsesp_url}/rest/signIn"
signon_headers = create_base_headers(host_ip, emsesp_url)
signon_headers['Content-Type'] = 'application/json'
# start the upload
with open(file, 'rb') as firmware:
md5 = hashlib.md5(firmware.read()).hexdigest()
firmware.seek(0)
encoder = MultipartEncoder(fields={
'MD5': md5,
'file': (file, firmware, 'application/octet-stream')}
)
bar = tqdm(desc='Upload Progress',
total=encoder.len,
dynamic_ncols=True,
unit='B',
unit_scale=True,
unit_divisor=1024
)
monitor = MultipartEncoderMonitor(
encoder, lambda monitor: bar.update(monitor.bytes_read - bar.n))
post_headers = {
'Host': host_ip,
'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/118.0',
'Accept': '*/*',
'Accept-Language': 'en-US',
'Accept-Encoding': 'gzip, deflate',
'Referer': f'{emsesp_url}',
'Connection': 'keep-alive',
'Content-Type': monitor.content_type,
'Content-Length': str(monitor.len),
'Origin': f'{emsesp_url}',
'Authorization': 'Bearer ' + f'{access_token}'
username_password = {
"username": username,
"password": password
}
upload_url = f"{emsesp_url}/rest/uploadFile"
response = requests.post(
upload_url, data=monitor, headers=post_headers, auth=None)
bar.close()
time.sleep(0.1)
response = session.post(signon_url, json=username_password, headers=signon_headers)
if response.status_code != 200:
print_fail("Upload failed (code " + response.status.code + ").")
else:
print_success("Upload successful. Rebooting device.")
restart_headers = {
'Host': host_ip,
'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/118.0',
'Accept': '*/*',
'Accept-Language': 'en-US',
'Accept-Encoding': 'gzip, deflate',
'Referer': f'{emsesp_url}',
'Content-Type': 'application/json',
'Connection': 'keep-alive',
'Authorization': 'Bearer ' + f'{access_token}'
}
restart_url = f"{emsesp_url}/api/system/restart"
response = requests.get(restart_url, headers=restart_headers)
if response.status_code != 200:
print_fail("Restart failed (code " +
str(response.status_code) + ")")
print_fail(f"Authentication failed (code {response.status_code})")
return
print()
print_success("Authentication successful")
access_token = response.json().get('access_token')
# Calculate MD5 hash
print("Calculating MD5 hash...")
md5 = calculate_md5(file_path)
# Start the upload
with open(file_path, 'rb') as firmware:
encoder = MultipartEncoder(fields={
'MD5': md5,
'file': (file, firmware, 'application/octet-stream')
})
bar = tqdm(
desc='Upload Progress',
total=encoder.len,
dynamic_ncols=True,
unit='B',
unit_scale=True,
unit_divisor=1024
)
monitor = MultipartEncoderMonitor(
encoder, lambda monitor: bar.update(monitor.bytes_read - bar.n))
post_headers = create_base_headers(host_ip, emsesp_url)
post_headers.update({
'Content-Type': monitor.content_type,
'Content-Length': str(monitor.len),
'Origin': emsesp_url,
'Authorization': f'Bearer {access_token}'
})
upload_url = f"{emsesp_url}/rest/uploadFile"
response = session.post(upload_url, data=monitor, headers=post_headers)
bar.close()
time.sleep(0.1)
if response.status_code != 200:
print_fail(f"Upload failed (code {response.status_code})")
else:
print_success("Upload successful. Rebooting device.")
restart_headers = create_base_headers(host_ip, emsesp_url)
restart_headers.update({
'Content-Type': 'application/json',
'Authorization': f'Bearer {access_token}'
})
restart_url = f"{emsesp_url}/api/system/restart"
response = session.get(restart_url, headers=restart_headers)
if response.status_code != 200:
print_fail(f"Restart failed (code {response.status_code})")
except requests.RequestException as e:
print_fail(f"Network error: {e}")
sys.exit(1)
except IOError as e:
print_fail(f"File error: {e}")
sys.exit(1)
except Exception as e:
print_fail(f"Unexpected error: {e}")
sys.exit(1)
finally:
session.close()
print()
# main