From 5cba92571791b8e8c1ace475fe54a5bd5f02bd05 Mon Sep 17 00:00:00 2001 From: proddy Date: Wed, 25 Sep 2024 17:32:31 +0200 Subject: [PATCH] formatting --- scripts/build_interface.py | 1 + scripts/clang-format.py | 10 ++-- scripts/clang-tidy.py | 14 ++++-- scripts/generate-modbus-register-doc.py | 1 + scripts/helpers.py | 22 +++++++-- scripts/refresh_module_library_native.py | 6 ++- scripts/rename_fw.py | 47 ++++++++++-------- scripts/run_memory_test.py | 63 ++++++++++++++++-------- scripts/run_native.py | 4 +- scripts/strip_csv.py | 2 +- scripts/update_modbus_registers.py | 9 ++-- scripts/upload.py | 40 +++++++++------ scripts/upload_cli.py | 39 +++++++++------ 13 files changed, 169 insertions(+), 89 deletions(-) diff --git a/scripts/build_interface.py b/scripts/build_interface.py index 229b54c3b..eb9f991fc 100755 --- a/scripts/build_interface.py +++ b/scripts/build_interface.py @@ -3,6 +3,7 @@ import os Import("env") + def buildWeb(): os.chdir("interface") print("Building web interface...") diff --git a/scripts/clang-format.py b/scripts/clang-format.py index 185094457..044f2bc77 100755 --- a/scripts/clang-format.py +++ b/scripts/clang-format.py @@ -3,6 +3,7 @@ # copied from esphome # run from Linux using ./scripts/clang-forrmat.py +from helpers import get_output, src_files, filter_changed import argparse import multiprocessing import os @@ -14,7 +15,7 @@ import threading import click sys.path.append(os.path.dirname(__file__)) -from helpers import get_output, src_files, filter_changed + def run_format(args, queue, lock, failed_files): """Takes filenames out of queue and runs clang-format on them.""" @@ -27,11 +28,13 @@ def run_format(args, queue, lock, failed_files): invocation.extend(['--dry-run', '-Werror']) invocation.append(path) - proc = subprocess.run(invocation, capture_output=True, encoding='utf-8') + proc = subprocess.run( + invocation, capture_output=True, encoding='utf-8') if proc.returncode != 0: with lock: print() - print("\033[0;32m************* File \033[1;32m{}\033[0m".format(path)) + print( + "\033[0;32m************* File \033[1;32m{}\033[0m".format(path)) print(proc.stdout) print(proc.stderr) print() @@ -42,6 +45,7 @@ def run_format(args, queue, lock, failed_files): def progress_bar_show(value): return value if value is not None else '' + def main(): parser = argparse.ArgumentParser() parser.add_argument('-j', '--jobs', type=int, diff --git a/scripts/clang-tidy.py b/scripts/clang-tidy.py index 6a86f01d3..0e397bff2 100755 --- a/scripts/clang-tidy.py +++ b/scripts/clang-tidy.py @@ -3,6 +3,8 @@ # copied from esphome # run from Linux using ./scripts/clang-forrmat.py +from helpers import shlex_quote, get_output, \ + build_all_include, temp_header_file, filter_changed, load_idedata, src_files import argparse import multiprocessing import os @@ -16,8 +18,7 @@ import click import pexpect sys.path.append(os.path.dirname(__file__)) -from helpers import shlex_quote, get_output, \ - build_all_include, temp_header_file, filter_changed, load_idedata, src_files + def clang_options(idedata): cmd = [ @@ -46,6 +47,7 @@ def clang_options(idedata): return cmd + def run_tidy(args, options, tmpdir, queue, lock, failed_files): while True: path = queue.get() @@ -73,20 +75,24 @@ def run_tidy(args, options, tmpdir, queue, lock, failed_files): if rc != 0: with lock: print() - print("\033[0;32m************* File \033[1;32m{}\033[0m".format(path)) + print( + "\033[0;32m************* File \033[1;32m{}\033[0m".format(path)) print(output) print() failed_files.append(path) queue.task_done() + def progress_bar_show(value): if value is None: return '' + def split_list(a, n): k, m = divmod(len(a), n) return [a[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n)] + def main(): parser = argparse.ArgumentParser() parser.add_argument('-j', '--jobs', type=int, @@ -187,4 +193,4 @@ def main(): if __name__ == '__main__': - main() \ No newline at end of file + main() diff --git a/scripts/generate-modbus-register-doc.py b/scripts/generate-modbus-register-doc.py index b43ef0c79..5d735542a 100644 --- a/scripts/generate-modbus-register-doc.py +++ b/scripts/generate-modbus-register-doc.py @@ -59,6 +59,7 @@ with fileinput.input() as f_input: entity[headers[i]] = val entities.append(entity) + def device_name_key(e): return e["device name"] diff --git a/scripts/helpers.py b/scripts/helpers.py index efdce500d..1dc858091 100755 --- a/scripts/helpers.py +++ b/scripts/helpers.py @@ -4,7 +4,8 @@ import subprocess import json from pathlib import Path -root_path = os.path.abspath(os.path.normpath(os.path.join(__file__, "..", ".."))) +root_path = os.path.abspath(os.path.normpath( + os.path.join(__file__, "..", ".."))) basepath = os.path.join(root_path, "src") temp_folder = os.path.join(root_path, ".temp") temp_header_file = os.path.join(temp_folder, "all-include.cpp") @@ -18,6 +19,7 @@ def shlex_quote(s): return "'" + s.replace("'", "'\"'\"'") + "'" + def build_all_include(): # Build a cpp file that includes all header files in this repo. # Otherwise header-only integrations would not be tested by clang-tidy @@ -36,6 +38,7 @@ def build_all_include(): p.parent.mkdir(exist_ok=True) p.write_text(content) + def src_files(filetypes=None): file_list = [] for path in walk_files(basepath): @@ -45,24 +48,31 @@ def src_files(filetypes=None): file_list.append(path) return file_list + def walk_files(path): for root, _, files in os.walk(path): for name in files: yield os.path.join(root, name) + def get_output(*args): - proc = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + proc = subprocess.Popen( + args, stdout=subprocess.PIPE, stderr=subprocess.PIPE) output, err = proc.communicate() return output.decode("utf-8") + def get_err(*args): - proc = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + proc = subprocess.Popen( + args, stdout=subprocess.PIPE, stderr=subprocess.PIPE) output, err = proc.communicate() return err.decode("utf-8") + def splitlines_no_ends(string): return [s.strip() for s in string.splitlines()] + def changed_files(): check_remotes = ["upstream", "origin"] check_remotes.extend(splitlines_no_ends(get_output("git", "remote"))) @@ -81,6 +91,7 @@ def changed_files(): changed.sort() return changed + def filter_changed(files): changed = changed_files() files = [f for f in files if f in changed] @@ -91,6 +102,7 @@ def filter_changed(files): print(f" {c}") return files + def git_ls_files(patterns=None): command = ["git", "ls-files", "-s"] if patterns is not None: @@ -100,6 +112,7 @@ def git_ls_files(patterns=None): lines = [x.split() for x in output.decode("utf-8").splitlines()] return {s[3].strip(): int(s[0]) for s in lines} + def load_idedata(environment): platformio_ini = Path(root_path) / "platformio.ini" temp_idedata = Path(temp_folder) / f"idedata-{environment}.json" @@ -113,7 +126,8 @@ def load_idedata(environment): if not changed: return json.loads(temp_idedata.read_text()) - stdout = subprocess.check_output(["pio", "run", "-t", "idedata", "-e", environment]) + stdout = subprocess.check_output( + ["pio", "run", "-t", "idedata", "-e", environment]) match = re.search(r'{\s*".*}', stdout.decode("utf-8")) data = json.loads(match.group()) diff --git a/scripts/refresh_module_library_native.py b/scripts/refresh_module_library_native.py index bb647ebf6..0ede598c1 100644 --- a/scripts/refresh_module_library_native.py +++ b/scripts/refresh_module_library_native.py @@ -2,10 +2,12 @@ import os Import("env") + def refresh_module(): print("Fetching latest module for Native target...") - os.system("pio pkg install -f -s -e native -l file://../../modules/EMS-ESP-Modules") + os.system( + "pio pkg install -f -s -e native -l file://../../modules/EMS-ESP-Modules") + if not (env.IsCleanTarget()): refresh_module() - diff --git a/scripts/rename_fw.py b/scripts/rename_fw.py index ca32657ed..4e5261358 100755 --- a/scripts/rename_fw.py +++ b/scripts/rename_fw.py @@ -1,11 +1,12 @@ +import hashlib import shutil import re import os Import("env") -import hashlib OUTPUT_DIR = "build{}".format(os.path.sep) + def bin_copy(source, target, env): # get the application version from version.h @@ -19,14 +20,14 @@ def bin_copy(source, target, env): bag[var] = m.group(1) app_version = bag.get('app_version') - + # print(env.Dump()) # get the chip type, in uppercase mcu = env.get('BOARD_MCU').upper() # alternatively take platform from the pio target # platform = str(target[0]).split(os.path.sep)[2] - + # work out the flash memory from the PIO env name (sloppy but works) # unfortunately the board_upload.flash_size is not passed down flash_mem = "4MB" @@ -37,21 +38,21 @@ def bin_copy(source, target, env): index = -2 else: index = -1 - - # if doesn't have an M at the end + + # if doesn't have an M at the end if parts[index].endswith("M"): flash_mem = parts[index] + "B" - + # find if BOARD_HAS_PSRAM is in the cppdefines cppdefines = env.get("CPPDEFINES") if 'BOARD_HAS_PSRAM' in cppdefines: psram = True else: psram = False - + print("*********************************************") print("EMS-ESP version: " + app_version) - + # show psram as Yes or No psram_status = "Yes" if psram else "No" print("Has PSRAM: " + psram_status) @@ -60,7 +61,9 @@ def bin_copy(source, target, env): # convert . to _ so Windows doesn't complain # Format is EMS-ESP--- with + at the end if it has PSRAM - variant = "EMS-ESP-" + app_version.replace(".", "_") + "-" + mcu + "-" + flash_mem + ("+" if psram else "") + variant = "EMS-ESP-" + \ + app_version.replace(".", "_") + "-" + mcu + "-" + \ + flash_mem + ("+" if psram else "") # check if output directories exist and create if necessary if not os.path.isdir(OUTPUT_DIR): @@ -89,13 +92,13 @@ def bin_copy(source, target, env): # copy firmware.bin to firmware/.bin shutil.copy(str(target[0]), bin_file) - with open(bin_file,"rb") as f: + with open(bin_file, "rb") as f: result = hashlib.md5(f.read()) print("MD5: "+result.hexdigest()) file1 = open(md5_file, 'w') file1.write(result.hexdigest()) file1.close() - + # make a copy using the old 3.6.x filename format for backwards compatibility with the WebUI version check, e.g. # create a EMS-ESP--ESP32_S3.bin if target is ci_s3_16M_P (16MB, PSRAM) # create a EMS-ESP--ESP32.bin if target is ci_s_4M (4MB, no PSRAM) @@ -105,24 +108,28 @@ def bin_copy(source, target, env): # extra_variant = "" if env.get('PIOENV') == "ci_s3_16M_P": - extra_variant = "EMS-ESP-" + app_version.replace(".", "_") + "-ESP32_S3" + extra_variant = "EMS-ESP-" + \ + app_version.replace(".", "_") + "-ESP32_S3" elif env.get('PIOENV') == "ci_s_4M": extra_variant = "EMS-ESP-" + app_version.replace(".", "_") + "-ESP32" - - if extra_variant: - extra_bin_file = "{}firmware{}{}.bin".format(OUTPUT_DIR, os.path.sep, extra_variant) + + if extra_variant: + extra_bin_file = "{}firmware{}{}.bin".format( + OUTPUT_DIR, os.path.sep, extra_variant) if os.path.isfile(extra_bin_file): os.remove(extra_bin_file) - - extra_md5_file = "{}firmware{}{}.md5".format(OUTPUT_DIR, os.path.sep, extra_variant) + + extra_md5_file = "{}firmware{}{}.md5".format( + OUTPUT_DIR, os.path.sep, extra_variant) if os.path.isfile(extra_md5_file): - os.remove(extra_md5_file) - + os.remove(extra_md5_file) + shutil.copy(bin_file, extra_bin_file) shutil.copy(md5_file, extra_md5_file) print("Filename copy for 3.6.x: "+extra_bin_file) - + print("*********************************************") + env.AddPostAction("$BUILD_DIR/${PROGNAME}.bin", [bin_copy]) env.AddPostAction("$BUILD_DIR/${PROGNAME}.md5", [bin_copy]) diff --git a/scripts/run_memory_test.py b/scripts/run_memory_test.py index fb826d70c..6fdafcba5 100755 --- a/scripts/run_memory_test.py +++ b/scripts/run_memory_test.py @@ -14,13 +14,15 @@ import platform # For getting the operating system type import subprocess # For executing a shell command from termcolor import cprint + def print_success(x): return cprint(x, 'green') def print_fail(x): return cprint(x, 'red') + def ping_until_up(ip, text): print(text + "...", flush=True, end="") time.sleep(1) - param = '-n' if platform.system().lower()=='windows' else '-c' + param = '-n' if platform.system().lower() == 'windows' else '-c' command = ["ping", param, "2", ip] while True: if (subprocess.run(args=command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL).returncode == 0): @@ -30,13 +32,15 @@ def ping_until_up(ip, text): print(".", flush=True, end="") time.sleep(1) + def run_test(ip, wait, name, token): BASE_URL = "http://" + str(ip) INFO_URL = BASE_URL + "/api/system/info" RESTART_URL = BASE_URL + "/api/system/restart" TEST_URL = BASE_URL + "/api?device=system&cmd=test&data=" + name - GET_HEADERS = { 'Content-Type': 'application/json'} - GET_HEADERS_SECURE = { 'Content-Type': 'application/json', 'Authorization': 'Bearer ' + str(token) } + GET_HEADERS = {'Content-Type': 'application/json'} + GET_HEADERS_SECURE = {'Content-Type': 'application/json', + 'Authorization': 'Bearer ' + str(token)} # BODY = json.dumps({ "value": 22.5 }) start = timer() @@ -51,12 +55,15 @@ def run_test(ip, wait, name, token): end = timer() # check if IP exists - ping_until_up(ip, "(" + str(round(end - start, 1)) + ")\t1. Checking if EMS-ESP is reachable") + ping_until_up(ip, "(" + str(round(end - start, 1)) + + ")\t1. Checking if EMS-ESP is reachable") end = timer() # Restart EMS-ESP - print("(" + str(round(end - start, 1)) + ")\t2. Doing a cold restart...", end="") - response = requests.get(RESTART_URL, headers=GET_HEADERS_SECURE, verify=False) + print("(" + str(round(end - start, 1)) + + ")\t2. Doing a cold restart...", end="") + response = requests.get( + RESTART_URL, headers=GET_HEADERS_SECURE, verify=False) if (response.status_code != 200): print_fail("Failed") return @@ -64,20 +71,24 @@ def run_test(ip, wait, name, token): end = timer() # Wait for EMS-ESP to come back up and reconnect to WiFi - ping_until_up(ip, "(" + str(round(end - start, 1)) + ")\t3. Waiting for EMS-ESP to come back online") + ping_until_up(ip, "(" + str(round(end - start, 1)) + + ")\t3. Waiting for EMS-ESP to come back online") end = timer() - print("(" + str(round(end - start, 1)) + ")\t4. Getting initial memory stats...", flush=True, end="") + print("(" + str(round(end - start, 1)) + + ")\t4. Getting initial memory stats...", flush=True, end="") time.sleep(1) response = requests.get(INFO_URL, headers=GET_HEADERS, verify=False) uptime_a = response.json()['System Info']['uptime (seconds)'] freemem_a = response.json()['System Info']['free mem'] maxalloc_a = response.json()['System Info']['max alloc'] - print_success("Uptime is " + str(uptime_a) + " secs, Free mem/Max alloc before=" + str(freemem_a) + "/" + str(maxalloc_a) ) + print_success("Uptime is " + str(uptime_a) + + " secs, Free mem/Max alloc before=" + str(freemem_a) + "/" + str(maxalloc_a)) end = timer() # run test - print("(" + str(round(end - start, 1)) + ")\t5. Running test called '" + name + "'...", end="") + print("(" + str(round(end - start, 1)) + + ")\t5. Running test called '" + name + "'...", end="") response = requests.get(TEST_URL, headers=GET_HEADERS, verify=False) test_output = response.json()['message'] if (test_output != 'OK'): @@ -87,37 +98,47 @@ def run_test(ip, wait, name, token): end = timer() # wait n seconds - print("(" + str(round(end - start, 1)) + ")\t6. Waiting for " + str(wait) + " seconds...", flush=True, end="") + print("(" + str(round(end - start, 1)) + ")\t6. Waiting for " + + str(wait) + " seconds...", flush=True, end="") time.sleep(wait) print_success("Done") end = timer() # get latest stats - print("(" + str(round(end - start, 1)) + ")\t7. Getting latest memory stats...", end="") + print("(" + str(round(end - start, 1)) + + ")\t7. Getting latest memory stats...", end="") response = requests.get(INFO_URL, headers=GET_HEADERS, verify=False) uptime_b = response.json()['System Info']['uptime (seconds)'] freemem_b = response.json()['System Info']['free mem'] maxalloc_b = response.json()['System Info']['max alloc'] - print_success("Uptime is " + str(uptime_b) + " secs, Free mem/Max alloc after=" + str(freemem_b) + "/" + str(maxalloc_b) ) + print_success("Uptime is " + str(uptime_b) + + " secs, Free mem/Max alloc after=" + str(freemem_b) + "/" + str(maxalloc_b)) print() # check if it worked and report back if (uptime_b <= uptime_a): print(" Error! EMS-ESP crashed and restarted :-(") else: - print("In the " + str(uptime_b - uptime_a) + " seconds elapsed, we have Free mem/Max alloc: ", end="") + print("In the " + str(uptime_b - uptime_a) + + " seconds elapsed, we have Free mem/Max alloc: ", end="") cprint("before=" + str(freemem_a) + "/" + str(maxalloc_a) + - " after=" + str(freemem_b) + "/" + str(maxalloc_b) + - " diff=" + str(freemem_a - freemem_b) + "/" + str(maxalloc_a - maxalloc_b), "cyan", attrs=["bold"]) + " after=" + str(freemem_b) + "/" + str(maxalloc_b) + + " diff=" + str(freemem_a - freemem_b) + "/" + str(maxalloc_a - maxalloc_b), "cyan", attrs=["bold"]) # finish print() + # main -parser = argparse.ArgumentParser(description="Benchmark EMS-ESP, memory profiler") -parser.add_argument("-i", "--ip", metavar="IP", type=str, default="ems-esp.local", help="IP address of EMS-ESP") -parser.add_argument("-w", "--wait", metavar="WAIT", type=int, default="10", help="time to wait between test") -parser.add_argument("-n", "--name", metavar="NAME", type=str, default="memory", help="Name of test to run") -parser.add_argument("-t", "--token", metavar="TOKEN", type=str, help="Bearer Token") +parser = argparse.ArgumentParser( + description="Benchmark EMS-ESP, memory profiler") +parser.add_argument("-i", "--ip", metavar="IP", type=str, + default="ems-esp.local", help="IP address of EMS-ESP") +parser.add_argument("-w", "--wait", metavar="WAIT", type=int, + default="10", help="time to wait between test") +parser.add_argument("-n", "--name", metavar="NAME", type=str, + default="memory", help="Name of test to run") +parser.add_argument("-t", "--token", metavar="TOKEN", + type=str, help="Bearer Token") args = parser.parse_args() run_test(**vars(args)) diff --git a/scripts/run_native.py b/scripts/run_native.py index 2dd01ebea..ed741a5e0 100644 --- a/scripts/run_native.py +++ b/scripts/run_native.py @@ -5,6 +5,7 @@ Import("env") OUTPUT_DIR = "build{}".format(os.path.sep) + def move_file(source, target, env): # get the build info @@ -53,8 +54,9 @@ def move_file(source, target, env): print("Renaming file to "+bin_file) shutil.copy(str(target[0]), bin_file) - + print("Executing file") os.system(bin_file) + env.AddPostAction("$BUILD_DIR/${PROGNAME}", [move_file]) diff --git a/scripts/strip_csv.py b/scripts/strip_csv.py index ed0486663..8d07ef522 100755 --- a/scripts/strip_csv.py +++ b/scripts/strip_csv.py @@ -10,4 +10,4 @@ with fileinput.input() as f_input: elif line.startswith('---- CSV END ----'): inRecordingMode = False else: - print(line, end="") + print(line, end="") diff --git a/scripts/update_modbus_registers.py b/scripts/update_modbus_registers.py index 05faf96de..3df1c78f5 100644 --- a/scripts/update_modbus_registers.py +++ b/scripts/update_modbus_registers.py @@ -192,10 +192,12 @@ for entity in entities: # set size for string entities if entity["modbus count"] == "0" and entity_dev_name in string_sizes: - entity["modbus count"] = -(-string_sizes[entity_dev_name] // 2) # divide and round up + entity["modbus count"] = - \ + (-string_sizes[entity_dev_name] // 2) # divide and round up if int(entity["modbus count"]) <= 0: - raise Exception('Entity "' + entity_shortname + '" does not have a size - string sizes need to be added manually to update_modbus_registers.py') + raise Exception('Entity "' + entity_shortname + + '" does not have a size - string sizes need to be added manually to update_modbus_registers.py') # if entity["modbus count"] == "0": # print("ignoring " + entity_dev_name + " - it has a register length of zero") @@ -254,7 +256,8 @@ for device_type_name in device_type_names: for entity_name, modbus_info in sorted(entities.items(), key=lambda x: int(x[1]["modbus offset"])): params = { 'devtype': "dt::" + device_type_name, - "tagtype": tag_to_tagtype[int(tag)], # re.sub(r"[0-9]+", "*", tag), + # re.sub(r"[0-9]+", "*", tag), + "tagtype": tag_to_tagtype[int(tag)], "shortname": 'FL_(' + listNames[entity_name] + ")", "entity_name": entity_name, 'registeroffset': modbus_info["modbus offset"], diff --git a/scripts/upload.py b/scripts/upload.py index 69d27d7f3..a191f2abc 100644 --- a/scripts/upload.py +++ b/scripts/upload.py @@ -10,7 +10,7 @@ # custom_emsesp_ip = ems-esp.local # custom_username = admin # custom_password = admin -# +# # and # extra_scripts = scripts/upload.py # @@ -33,16 +33,19 @@ except ImportError: from tqdm import tqdm from termcolor import cprint + def print_success(x): return cprint(x, 'green') def print_fail(x): return cprint('Error: '+x, 'red') + def on_upload(source, target, env): # make sure we have set the upload_protocol to custom if env.get('UPLOAD_PROTOCOL') != 'custom': - print_fail("Please set upload_protocol = custom in your pio_local.ini file when using upload.py") + print_fail( + "Please set upload_protocol = custom in your pio_local.ini file when using upload.py") return - + # first check authentication try: username = env.GetProjectOption('custom_username') @@ -51,12 +54,11 @@ def on_upload(source, target, env): except: print_fail('Missing settings. Add these to your pio_local.ini file: \n\ncustom_username=username\ncustom_password=password\ncustom_emsesp_ip=ems-esp.local\n') return - emsesp_url = "http://" + env.GetProjectOption('custom_emsesp_ip') parsed_url = urlparse(emsesp_url) host_ip = parsed_url.netloc - + signon_url = f"{emsesp_url}/rest/signIn" signon_headers = { @@ -75,10 +77,12 @@ def on_upload(source, target, env): "password": password } - response = requests.post(signon_url, json=username_password, headers=signon_headers) - + response = requests.post( + signon_url, json=username_password, headers=signon_headers) + if response.status_code != 200: - print_fail("Authentication with EMS-ESP failed (code " + str(response.status_code) + ")") + print_fail("Authentication with EMS-ESP failed (code " + + str(response.status_code) + ")") return print_success("Authentication with EMS-ESP successful") @@ -89,7 +93,7 @@ def on_upload(source, target, env): with open(firmware_path, 'rb') as firmware: md5 = hashlib.md5(firmware.read()).hexdigest() - + firmware.seek(0) encoder = MultipartEncoder(fields={ @@ -105,7 +109,8 @@ def on_upload(source, target, env): unit_divisor=1024 ) - monitor = MultipartEncoderMonitor(encoder, lambda monitor: bar.update(monitor.bytes_read - bar.n)) + monitor = MultipartEncoderMonitor( + encoder, lambda monitor: bar.update(monitor.bytes_read - bar.n)) post_headers = { 'Host': host_ip, @@ -123,13 +128,14 @@ def on_upload(source, target, env): upload_url = f"{emsesp_url}/rest/uploadFile" - response = requests.post(upload_url, data=monitor, headers=post_headers) - + response = requests.post( + upload_url, data=monitor, headers=post_headers) + bar.close() time.sleep(0.1) - + print() - + if response.status_code != 200: print_fail("Upload failed (code " + response.status.code + ").") else: @@ -148,8 +154,10 @@ def on_upload(source, target, env): restart_url = f"{emsesp_url}/api/system/restart" response = requests.get(restart_url, headers=restart_headers) if response.status_code != 200: - print_fail("Restart failed (code " + str(response.status_code) + ")") + print_fail("Restart failed (code " + + str(response.status_code) + ")") print() -env.Replace(UPLOADCMD=on_upload) \ No newline at end of file + +env.Replace(UPLOADCMD=on_upload) diff --git a/scripts/upload_cli.py b/scripts/upload_cli.py index d407983a2..a827ddd77 100644 --- a/scripts/upload_cli.py +++ b/scripts/upload_cli.py @@ -19,9 +19,11 @@ from requests_toolbelt import MultipartEncoder, MultipartEncoderMonitor from tqdm import tqdm from termcolor import cprint + def print_success(x): return cprint(x, 'green') def print_fail(x): return cprint(x, 'red') + def upload(file, ip, username, password): # Print welcome message @@ -32,7 +34,7 @@ def upload(file, ip, username, password): emsesp_url = "http://" + f'{ip}' parsed_url = urlparse(emsesp_url) host_ip = parsed_url.netloc - + signon_url = f"{emsesp_url}/rest/signIn" signon_headers = { @@ -51,10 +53,12 @@ def upload(file, ip, username, password): "password": password } - response = requests.post(signon_url, json=username_password, headers=signon_headers, auth=None) - + response = requests.post( + signon_url, json=username_password, headers=signon_headers, auth=None) + if response.status_code != 200: - print_fail("Authentication failed (code " + str(response.status_code) + ")") + print_fail("Authentication failed (code " + + str(response.status_code) + ")") return print_success("Authentication successful") @@ -63,7 +67,7 @@ def upload(file, ip, username, password): # start the upload with open(file, 'rb') as firmware: md5 = hashlib.md5(firmware.read()).hexdigest() - + firmware.seek(0) encoder = MultipartEncoder(fields={ @@ -79,7 +83,8 @@ def upload(file, ip, username, password): unit_divisor=1024 ) - monitor = MultipartEncoderMonitor(encoder, lambda monitor: bar.update(monitor.bytes_read - bar.n)) + monitor = MultipartEncoderMonitor( + encoder, lambda monitor: bar.update(monitor.bytes_read - bar.n)) post_headers = { 'Host': host_ip, @@ -97,8 +102,9 @@ def upload(file, ip, username, password): upload_url = f"{emsesp_url}/rest/uploadFile" - response = requests.post(upload_url, data=monitor, headers=post_headers, auth=None) - + response = requests.post( + upload_url, data=monitor, headers=post_headers, auth=None) + bar.close() time.sleep(0.1) @@ -120,16 +126,21 @@ def upload(file, ip, username, password): restart_url = f"{emsesp_url}/api/system/restart" response = requests.get(restart_url, headers=restart_headers) if response.status_code != 200: - print_fail("Restart failed (code " + str(response.status_code) + ")") + print_fail("Restart failed (code " + + str(response.status_code) + ")") print() + # main parser = argparse.ArgumentParser(description="EMS-ESP Firmware Upload") -parser.add_argument("-f", "--file", metavar="FILE", required=True, type=str, help="firmware file") -parser.add_argument("-i", "--ip", metavar="IP", type=str, default="ems-esp.local", help="IP address of EMS-ESP") -parser.add_argument("-u", "--username", metavar="USERNAME", type=str, default="admin", help="admin user") -parser.add_argument("-p", "--password", metavar="PASSWORD", type=str, default="admin", help="admin password") +parser.add_argument("-f", "--file", metavar="FILE", + required=True, type=str, help="firmware file") +parser.add_argument("-i", "--ip", metavar="IP", type=str, + default="ems-esp.local", help="IP address of EMS-ESP") +parser.add_argument("-u", "--username", metavar="USERNAME", + type=str, default="admin", help="admin user") +parser.add_argument("-p", "--password", metavar="PASSWORD", + type=str, default="admin", help="admin password") args = parser.parse_args() upload(**vars(args)) -