From 5d8fe89e5a8ab310270da3239bc5a85e661e7d6a Mon Sep 17 00:00:00 2001 From: proddy Date: Mon, 3 Nov 2025 18:01:09 +0100 Subject: [PATCH] python optimizations --- scripts/upload.py | 89 +++++++++-------- scripts/upload_cli.py | 225 ++++++++++++++++++++++++------------------ 2 files changed, 173 insertions(+), 141 deletions(-) diff --git a/scripts/upload.py b/scripts/upload.py index f0d7322d7..19be74710 100644 --- a/scripts/upload.py +++ b/scripts/upload.py @@ -36,8 +36,34 @@ except ImportError: from termcolor import cprint -def print_success(x): return cprint(x, 'green') -def print_fail(x): return cprint('Error: '+x, 'red') +def print_success(x): + cprint(x, 'green') + + +def print_fail(x): + cprint(f'Error: {x}', 'red') + + +def build_headers(host_ip, emsesp_url, content_type='application/json', access_token=None, extra_headers=None): + """Build common HTTP headers with optional overrides.""" + headers = { + 'Host': host_ip, + 'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/118.0', + 'Accept': '*/*', + 'Accept-Language': 'en-US', + 'Accept-Encoding': 'gzip, deflate', + 'Referer': emsesp_url, + 'Content-Type': content_type, + 'Connection': 'keep-alive' + } + + if access_token: + headers['Authorization'] = f'Bearer {access_token}' + + if extra_headers: + headers.update(extra_headers) + + return headers def on_upload(source, target, env): @@ -53,26 +79,16 @@ def on_upload(source, target, env): username = env.GetProjectOption('custom_username') password = env.GetProjectOption('custom_password') emsesp_ip = env.GetProjectOption('custom_emsesp_ip') - except: - print_fail('Missing settings. Add these to your pio_local.ini file: \n\ncustom_username=username\ncustom_password=password\ncustom_emsesp_ip=ems-esp.local\n') + except Exception as e: + print_fail(f'Missing settings. Add these to your pio_local.ini file:\n\ncustom_username=username\ncustom_password=password\ncustom_emsesp_ip=ems-esp.local\n') return - emsesp_url = "http://" + env.GetProjectOption('custom_emsesp_ip') + emsesp_url = f"http://{emsesp_ip}" parsed_url = urlparse(emsesp_url) host_ip = parsed_url.netloc signon_url = f"{emsesp_url}/rest/signIn" - - signon_headers = { - 'Host': host_ip, - 'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/118.0', - 'Accept': '*/*', - 'Accept-Language': 'en-US', - 'Accept-Encoding': 'gzip, deflate', - 'Referer': f'{emsesp_url}', - 'Content-Type': 'application/json', - 'Connection': 'keep-alive' - } + signon_headers = build_headers(host_ip, emsesp_url) username_password = { "username": username, @@ -114,19 +130,16 @@ def on_upload(source, target, env): monitor = MultipartEncoderMonitor( encoder, lambda monitor: bar.update(monitor.bytes_read - bar.n)) - post_headers = { - 'Host': host_ip, - 'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/118.0', - 'Accept': '*/*', - 'Accept-Language': 'en-US', - 'Accept-Encoding': 'gzip, deflate', - 'Referer': f'{emsesp_url}', - 'Connection': 'keep-alive', - 'Content-Type': monitor.content_type, - 'Content-Length': str(monitor.len), - 'Origin': f'{emsesp_url}', - 'Authorization': 'Bearer ' + f'{access_token}' - } + post_headers = build_headers( + host_ip, + emsesp_url, + content_type=monitor.content_type, + access_token=access_token, + extra_headers={ + 'Content-Length': str(monitor.len), + 'Origin': emsesp_url + } + ) upload_url = f"{emsesp_url}/rest/uploadFile" @@ -139,25 +152,15 @@ def on_upload(source, target, env): print() if response.status_code != 200: - print_fail("Upload failed (code " + response.status.code + ").") + print_fail(f"Upload failed (code {response.status_code}).") else: print_success("Upload successful. Rebooting device.") - restart_headers = { - 'Host': host_ip, - 'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/118.0', - 'Accept': '*/*', - 'Accept-Language': 'en-US', - 'Accept-Encoding': 'gzip, deflate', - 'Referer': f'{emsesp_url}', - 'Content-Type': 'application/json', - 'Connection': 'keep-alive', - 'Authorization': 'Bearer ' + f'{access_token}' - } + restart_headers = build_headers( + host_ip, emsesp_url, access_token=access_token) restart_url = f"{emsesp_url}/api/system/restart" response = requests.get(restart_url, headers=restart_headers) if response.status_code != 200: - print_fail("Restart failed (code " + - str(response.status_code) + ")") + print_fail(f"Restart failed (code {response.status_code})") print() diff --git a/scripts/upload_cli.py b/scripts/upload_cli.py index c4e066e3a..a6ed77681 100644 --- a/scripts/upload_cli.py +++ b/scripts/upload_cli.py @@ -11,125 +11,154 @@ # python3 upload_cli.py -i 10.10.10.175 -f ../build/firmware/EMS-ESP-3_7_0-dev_34-ESP32S3-16MB+.bin import argparse -import requests import hashlib -from urllib.parse import urlparse +import sys import time +from pathlib import Path + +import requests from requests_toolbelt import MultipartEncoder, MultipartEncoderMonitor -from tqdm import tqdm from termcolor import cprint +from tqdm import tqdm + +# Constants +USER_AGENT = 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/118.0' +CHUNK_SIZE = 8192 # 8KB chunks for MD5 calculation -def print_success(x): return cprint(x, 'green') -def print_fail(x): return cprint(x, 'red') +def print_success(x): + return cprint(x, 'green') + + +def print_fail(x): + return cprint(x, 'red') + + +def calculate_md5(file_path): + """Calculate MD5 hash of a file in chunks for memory efficiency.""" + md5_hash = hashlib.md5() + with open(file_path, 'rb') as f: + for chunk in iter(lambda: f.read(CHUNK_SIZE), b''): + md5_hash.update(chunk) + return md5_hash.hexdigest() + + +def create_base_headers(host_ip, emsesp_url): + """Create base headers used across all requests.""" + return { + 'Host': host_ip, + 'User-Agent': USER_AGENT, + 'Accept': '*/*', + 'Accept-Language': 'en-US', + 'Accept-Encoding': 'gzip, deflate', + 'Referer': emsesp_url, + 'Connection': 'keep-alive' + } def upload(file, ip, username, password): - + """Upload firmware to EMS-ESP device.""" # Print welcome message print() print("EMS-ESP Firmware Upload") - # first check authentication - emsesp_url = "http://" + f'{ip}' - parsed_url = urlparse(emsesp_url) - host_ip = parsed_url.netloc - - signon_url = f"{emsesp_url}/rest/signIn" - - signon_headers = { - 'Host': host_ip, - 'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/118.0', - 'Accept': '*/*', - 'Accept-Language': 'en-US', - 'Accept-Encoding': 'gzip, deflate', - 'Referer': f'{emsesp_url}', - 'Content-Type': 'application/json', - 'Connection': 'keep-alive' - } - - username_password = { - "username": username, - "password": password - } - - response = requests.post( - signon_url, json=username_password, headers=signon_headers, auth=None) - - if response.status_code != 200: - print_fail("Authentication failed (code " + - str(response.status_code) + ")") + # Validate file exists + file_path = Path(file) + if not file_path.exists(): + print_fail(f"File not found: {file}") return - print_success("Authentication successful") - access_token = response.json().get('access_token') + # Setup URLs and headers + emsesp_url = f"http://{ip}" + host_ip = ip + + # Use a session for connection pooling and persistence + session = requests.Session() + + try: + # Authentication + signon_url = f"{emsesp_url}/rest/signIn" + signon_headers = create_base_headers(host_ip, emsesp_url) + signon_headers['Content-Type'] = 'application/json' - # start the upload - with open(file, 'rb') as firmware: - md5 = hashlib.md5(firmware.read()).hexdigest() - - firmware.seek(0) - - encoder = MultipartEncoder(fields={ - 'MD5': md5, - 'file': (file, firmware, 'application/octet-stream')} - ) - - bar = tqdm(desc='Upload Progress', - total=encoder.len, - dynamic_ncols=True, - unit='B', - unit_scale=True, - unit_divisor=1024 - ) - - monitor = MultipartEncoderMonitor( - encoder, lambda monitor: bar.update(monitor.bytes_read - bar.n)) - - post_headers = { - 'Host': host_ip, - 'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/118.0', - 'Accept': '*/*', - 'Accept-Language': 'en-US', - 'Accept-Encoding': 'gzip, deflate', - 'Referer': f'{emsesp_url}', - 'Connection': 'keep-alive', - 'Content-Type': monitor.content_type, - 'Content-Length': str(monitor.len), - 'Origin': f'{emsesp_url}', - 'Authorization': 'Bearer ' + f'{access_token}' + username_password = { + "username": username, + "password": password } - upload_url = f"{emsesp_url}/rest/uploadFile" - - response = requests.post( - upload_url, data=monitor, headers=post_headers, auth=None) - - bar.close() - time.sleep(0.1) + response = session.post(signon_url, json=username_password, headers=signon_headers) if response.status_code != 200: - print_fail("Upload failed (code " + response.status.code + ").") - else: - print_success("Upload successful. Rebooting device.") - restart_headers = { - 'Host': host_ip, - 'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/118.0', - 'Accept': '*/*', - 'Accept-Language': 'en-US', - 'Accept-Encoding': 'gzip, deflate', - 'Referer': f'{emsesp_url}', - 'Content-Type': 'application/json', - 'Connection': 'keep-alive', - 'Authorization': 'Bearer ' + f'{access_token}' - } - restart_url = f"{emsesp_url}/api/system/restart" - response = requests.get(restart_url, headers=restart_headers) - if response.status_code != 200: - print_fail("Restart failed (code " + - str(response.status_code) + ")") + print_fail(f"Authentication failed (code {response.status_code})") + return - print() + print_success("Authentication successful") + access_token = response.json().get('access_token') + + # Calculate MD5 hash + print("Calculating MD5 hash...") + md5 = calculate_md5(file_path) + + # Start the upload + with open(file_path, 'rb') as firmware: + encoder = MultipartEncoder(fields={ + 'MD5': md5, + 'file': (file, firmware, 'application/octet-stream') + }) + + bar = tqdm( + desc='Upload Progress', + total=encoder.len, + dynamic_ncols=True, + unit='B', + unit_scale=True, + unit_divisor=1024 + ) + + monitor = MultipartEncoderMonitor( + encoder, lambda monitor: bar.update(monitor.bytes_read - bar.n)) + + post_headers = create_base_headers(host_ip, emsesp_url) + post_headers.update({ + 'Content-Type': monitor.content_type, + 'Content-Length': str(monitor.len), + 'Origin': emsesp_url, + 'Authorization': f'Bearer {access_token}' + }) + + upload_url = f"{emsesp_url}/rest/uploadFile" + response = session.post(upload_url, data=monitor, headers=post_headers) + + bar.close() + time.sleep(0.1) + + if response.status_code != 200: + print_fail(f"Upload failed (code {response.status_code})") + else: + print_success("Upload successful. Rebooting device.") + restart_headers = create_base_headers(host_ip, emsesp_url) + restart_headers.update({ + 'Content-Type': 'application/json', + 'Authorization': f'Bearer {access_token}' + }) + restart_url = f"{emsesp_url}/api/system/restart" + response = session.get(restart_url, headers=restart_headers) + if response.status_code != 200: + print_fail(f"Restart failed (code {response.status_code})") + + except requests.RequestException as e: + print_fail(f"Network error: {e}") + sys.exit(1) + except IOError as e: + print_fail(f"File error: {e}") + sys.exit(1) + except Exception as e: + print_fail(f"Unexpected error: {e}") + sys.exit(1) + finally: + session.close() + + print() # main