Files
EMS-ESP32/scripts/upload.py
2025-11-03 18:01:09 +01:00

170 lines
5.0 KiB
Python

# Modified from https://github.com/ayushsharma82/ElegantOTA
#
# This is called during the PlatformIO upload process, when the target is 'upload'.
# Use the file upload_cli.py for manual uploads outside PIO.
#
# To use create a pio_local.ini file in the project root and add the following:
# [env]
# upload_protocol = custom
# custom_emsesp_ip = ems-esp.local
# custom_username = admin
# custom_password = admin
#
# and
# extra_scripts = scripts/upload.py
#
import requests
import hashlib
from urllib.parse import urlparse
import time
import os
Import("env")
try:
from requests_toolbelt import MultipartEncoder, MultipartEncoderMonitor
from tqdm import tqdm
from termcolor import cprint
except ImportError:
env.Execute("$PYTHONEXE -m pip install requests_toolbelt")
env.Execute("$PYTHONEXE -m pip install tqdm")
env.Execute("$PYTHONEXE -m pip install termcolor")
from requests_toolbelt import MultipartEncoder, MultipartEncoderMonitor
from tqdm import tqdm
from termcolor import cprint
def print_success(x):
cprint(x, 'green')
def print_fail(x):
cprint(f'Error: {x}', 'red')
def build_headers(host_ip, emsesp_url, content_type='application/json', access_token=None, extra_headers=None):
"""Build common HTTP headers with optional overrides."""
headers = {
'Host': host_ip,
'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/118.0',
'Accept': '*/*',
'Accept-Language': 'en-US',
'Accept-Encoding': 'gzip, deflate',
'Referer': emsesp_url,
'Content-Type': content_type,
'Connection': 'keep-alive'
}
if access_token:
headers['Authorization'] = f'Bearer {access_token}'
if extra_headers:
headers.update(extra_headers)
return headers
def on_upload(source, target, env):
# make sure we have set the upload_protocol to custom
if env.get('UPLOAD_PROTOCOL') != 'custom':
print_fail(
"Please set upload_protocol = custom in your pio_local.ini file when using upload.py")
return
# first check authentication
try:
username = env.GetProjectOption('custom_username')
password = env.GetProjectOption('custom_password')
emsesp_ip = env.GetProjectOption('custom_emsesp_ip')
except Exception as e:
print_fail(f'Missing settings. Add these to your pio_local.ini file:\n\ncustom_username=username\ncustom_password=password\ncustom_emsesp_ip=ems-esp.local\n')
return
emsesp_url = f"http://{emsesp_ip}"
parsed_url = urlparse(emsesp_url)
host_ip = parsed_url.netloc
signon_url = f"{emsesp_url}/rest/signIn"
signon_headers = build_headers(host_ip, emsesp_url)
username_password = {
"username": username,
"password": password
}
response = requests.post(
signon_url, json=username_password, headers=signon_headers)
if response.status_code != 200:
print_fail("Authentication with EMS-ESP failed (code " +
str(response.status_code) + ")")
return
print_success("Authentication with EMS-ESP successful")
access_token = response.json().get('access_token')
# start the upload
firmware_path = str(source[0])
with open(firmware_path, 'rb') as firmware:
md5 = hashlib.md5(firmware.read()).hexdigest()
firmware.seek(0)
encoder = MultipartEncoder(fields={
'MD5': md5,
'file': (firmware_path, firmware, 'application/octet-stream')}
)
bar = tqdm(desc='Upload Progress',
total=encoder.len,
dynamic_ncols=True,
unit='B',
unit_scale=True,
unit_divisor=1024
)
monitor = MultipartEncoderMonitor(
encoder, lambda monitor: bar.update(monitor.bytes_read - bar.n))
post_headers = build_headers(
host_ip,
emsesp_url,
content_type=monitor.content_type,
access_token=access_token,
extra_headers={
'Content-Length': str(monitor.len),
'Origin': emsesp_url
}
)
upload_url = f"{emsesp_url}/rest/uploadFile"
response = requests.post(
upload_url, data=monitor, headers=post_headers)
bar.close()
time.sleep(0.1)
print()
if response.status_code != 200:
print_fail(f"Upload failed (code {response.status_code}).")
else:
print_success("Upload successful. Rebooting device.")
restart_headers = build_headers(
host_ip, emsesp_url, access_token=access_token)
restart_url = f"{emsesp_url}/api/system/restart"
response = requests.get(restart_url, headers=restart_headers)
if response.status_code != 200:
print_fail(f"Restart failed (code {response.status_code})")
print()
if env.get('UPLOAD_PROTOCOL') == 'custom':
env.Replace(UPLOADCMD=on_upload)