mirror of
https://github.com/emsesp/EMS-ESP32.git
synced 2025-12-06 15:59:52 +03:00
python optimizations
This commit is contained in:
@@ -11,125 +11,154 @@
|
||||
# python3 upload_cli.py -i 10.10.10.175 -f ../build/firmware/EMS-ESP-3_7_0-dev_34-ESP32S3-16MB+.bin
|
||||
|
||||
import argparse
|
||||
import requests
|
||||
import hashlib
|
||||
from urllib.parse import urlparse
|
||||
import sys
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
import requests
|
||||
from requests_toolbelt import MultipartEncoder, MultipartEncoderMonitor
|
||||
from tqdm import tqdm
|
||||
from termcolor import cprint
|
||||
from tqdm import tqdm
|
||||
|
||||
# Constants
|
||||
USER_AGENT = 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/118.0'
|
||||
CHUNK_SIZE = 8192 # 8KB chunks for MD5 calculation
|
||||
|
||||
|
||||
def print_success(x): return cprint(x, 'green')
|
||||
def print_fail(x): return cprint(x, 'red')
|
||||
def print_success(x):
|
||||
return cprint(x, 'green')
|
||||
|
||||
|
||||
def print_fail(x):
|
||||
return cprint(x, 'red')
|
||||
|
||||
|
||||
def calculate_md5(file_path):
|
||||
"""Calculate MD5 hash of a file in chunks for memory efficiency."""
|
||||
md5_hash = hashlib.md5()
|
||||
with open(file_path, 'rb') as f:
|
||||
for chunk in iter(lambda: f.read(CHUNK_SIZE), b''):
|
||||
md5_hash.update(chunk)
|
||||
return md5_hash.hexdigest()
|
||||
|
||||
|
||||
def create_base_headers(host_ip, emsesp_url):
|
||||
"""Create base headers used across all requests."""
|
||||
return {
|
||||
'Host': host_ip,
|
||||
'User-Agent': USER_AGENT,
|
||||
'Accept': '*/*',
|
||||
'Accept-Language': 'en-US',
|
||||
'Accept-Encoding': 'gzip, deflate',
|
||||
'Referer': emsesp_url,
|
||||
'Connection': 'keep-alive'
|
||||
}
|
||||
|
||||
|
||||
def upload(file, ip, username, password):
|
||||
|
||||
"""Upload firmware to EMS-ESP device."""
|
||||
# Print welcome message
|
||||
print()
|
||||
print("EMS-ESP Firmware Upload")
|
||||
|
||||
# first check authentication
|
||||
emsesp_url = "http://" + f'{ip}'
|
||||
parsed_url = urlparse(emsesp_url)
|
||||
host_ip = parsed_url.netloc
|
||||
|
||||
signon_url = f"{emsesp_url}/rest/signIn"
|
||||
|
||||
signon_headers = {
|
||||
'Host': host_ip,
|
||||
'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/118.0',
|
||||
'Accept': '*/*',
|
||||
'Accept-Language': 'en-US',
|
||||
'Accept-Encoding': 'gzip, deflate',
|
||||
'Referer': f'{emsesp_url}',
|
||||
'Content-Type': 'application/json',
|
||||
'Connection': 'keep-alive'
|
||||
}
|
||||
|
||||
username_password = {
|
||||
"username": username,
|
||||
"password": password
|
||||
}
|
||||
|
||||
response = requests.post(
|
||||
signon_url, json=username_password, headers=signon_headers, auth=None)
|
||||
|
||||
if response.status_code != 200:
|
||||
print_fail("Authentication failed (code " +
|
||||
str(response.status_code) + ")")
|
||||
# Validate file exists
|
||||
file_path = Path(file)
|
||||
if not file_path.exists():
|
||||
print_fail(f"File not found: {file}")
|
||||
return
|
||||
|
||||
print_success("Authentication successful")
|
||||
access_token = response.json().get('access_token')
|
||||
# Setup URLs and headers
|
||||
emsesp_url = f"http://{ip}"
|
||||
host_ip = ip
|
||||
|
||||
# Use a session for connection pooling and persistence
|
||||
session = requests.Session()
|
||||
|
||||
try:
|
||||
# Authentication
|
||||
signon_url = f"{emsesp_url}/rest/signIn"
|
||||
signon_headers = create_base_headers(host_ip, emsesp_url)
|
||||
signon_headers['Content-Type'] = 'application/json'
|
||||
|
||||
# start the upload
|
||||
with open(file, 'rb') as firmware:
|
||||
md5 = hashlib.md5(firmware.read()).hexdigest()
|
||||
|
||||
firmware.seek(0)
|
||||
|
||||
encoder = MultipartEncoder(fields={
|
||||
'MD5': md5,
|
||||
'file': (file, firmware, 'application/octet-stream')}
|
||||
)
|
||||
|
||||
bar = tqdm(desc='Upload Progress',
|
||||
total=encoder.len,
|
||||
dynamic_ncols=True,
|
||||
unit='B',
|
||||
unit_scale=True,
|
||||
unit_divisor=1024
|
||||
)
|
||||
|
||||
monitor = MultipartEncoderMonitor(
|
||||
encoder, lambda monitor: bar.update(monitor.bytes_read - bar.n))
|
||||
|
||||
post_headers = {
|
||||
'Host': host_ip,
|
||||
'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/118.0',
|
||||
'Accept': '*/*',
|
||||
'Accept-Language': 'en-US',
|
||||
'Accept-Encoding': 'gzip, deflate',
|
||||
'Referer': f'{emsesp_url}',
|
||||
'Connection': 'keep-alive',
|
||||
'Content-Type': monitor.content_type,
|
||||
'Content-Length': str(monitor.len),
|
||||
'Origin': f'{emsesp_url}',
|
||||
'Authorization': 'Bearer ' + f'{access_token}'
|
||||
username_password = {
|
||||
"username": username,
|
||||
"password": password
|
||||
}
|
||||
|
||||
upload_url = f"{emsesp_url}/rest/uploadFile"
|
||||
|
||||
response = requests.post(
|
||||
upload_url, data=monitor, headers=post_headers, auth=None)
|
||||
|
||||
bar.close()
|
||||
time.sleep(0.1)
|
||||
response = session.post(signon_url, json=username_password, headers=signon_headers)
|
||||
|
||||
if response.status_code != 200:
|
||||
print_fail("Upload failed (code " + response.status.code + ").")
|
||||
else:
|
||||
print_success("Upload successful. Rebooting device.")
|
||||
restart_headers = {
|
||||
'Host': host_ip,
|
||||
'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/118.0',
|
||||
'Accept': '*/*',
|
||||
'Accept-Language': 'en-US',
|
||||
'Accept-Encoding': 'gzip, deflate',
|
||||
'Referer': f'{emsesp_url}',
|
||||
'Content-Type': 'application/json',
|
||||
'Connection': 'keep-alive',
|
||||
'Authorization': 'Bearer ' + f'{access_token}'
|
||||
}
|
||||
restart_url = f"{emsesp_url}/api/system/restart"
|
||||
response = requests.get(restart_url, headers=restart_headers)
|
||||
if response.status_code != 200:
|
||||
print_fail("Restart failed (code " +
|
||||
str(response.status_code) + ")")
|
||||
print_fail(f"Authentication failed (code {response.status_code})")
|
||||
return
|
||||
|
||||
print()
|
||||
print_success("Authentication successful")
|
||||
access_token = response.json().get('access_token')
|
||||
|
||||
# Calculate MD5 hash
|
||||
print("Calculating MD5 hash...")
|
||||
md5 = calculate_md5(file_path)
|
||||
|
||||
# Start the upload
|
||||
with open(file_path, 'rb') as firmware:
|
||||
encoder = MultipartEncoder(fields={
|
||||
'MD5': md5,
|
||||
'file': (file, firmware, 'application/octet-stream')
|
||||
})
|
||||
|
||||
bar = tqdm(
|
||||
desc='Upload Progress',
|
||||
total=encoder.len,
|
||||
dynamic_ncols=True,
|
||||
unit='B',
|
||||
unit_scale=True,
|
||||
unit_divisor=1024
|
||||
)
|
||||
|
||||
monitor = MultipartEncoderMonitor(
|
||||
encoder, lambda monitor: bar.update(monitor.bytes_read - bar.n))
|
||||
|
||||
post_headers = create_base_headers(host_ip, emsesp_url)
|
||||
post_headers.update({
|
||||
'Content-Type': monitor.content_type,
|
||||
'Content-Length': str(monitor.len),
|
||||
'Origin': emsesp_url,
|
||||
'Authorization': f'Bearer {access_token}'
|
||||
})
|
||||
|
||||
upload_url = f"{emsesp_url}/rest/uploadFile"
|
||||
response = session.post(upload_url, data=monitor, headers=post_headers)
|
||||
|
||||
bar.close()
|
||||
time.sleep(0.1)
|
||||
|
||||
if response.status_code != 200:
|
||||
print_fail(f"Upload failed (code {response.status_code})")
|
||||
else:
|
||||
print_success("Upload successful. Rebooting device.")
|
||||
restart_headers = create_base_headers(host_ip, emsesp_url)
|
||||
restart_headers.update({
|
||||
'Content-Type': 'application/json',
|
||||
'Authorization': f'Bearer {access_token}'
|
||||
})
|
||||
restart_url = f"{emsesp_url}/api/system/restart"
|
||||
response = session.get(restart_url, headers=restart_headers)
|
||||
if response.status_code != 200:
|
||||
print_fail(f"Restart failed (code {response.status_code})")
|
||||
|
||||
except requests.RequestException as e:
|
||||
print_fail(f"Network error: {e}")
|
||||
sys.exit(1)
|
||||
except IOError as e:
|
||||
print_fail(f"File error: {e}")
|
||||
sys.exit(1)
|
||||
except Exception as e:
|
||||
print_fail(f"Unexpected error: {e}")
|
||||
sys.exit(1)
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
print()
|
||||
|
||||
|
||||
# main
|
||||
|
||||
Reference in New Issue
Block a user