From 0406ee05295b4d8f0d302a10ff19dcc995d986ab Mon Sep 17 00:00:00 2001 From: proddy Date: Tue, 9 Jun 2026 23:04:41 +0200 Subject: [PATCH] Scheduler to sync, Commands to async --- src/web/WebCommandService.cpp | 152 ++++++++++++++++++++++++++++++-- src/web/WebCommandService.h | 38 ++++++++ src/web/WebSchedulerService.cpp | 25 +----- src/web/WebSchedulerService.h | 14 --- src/web/WebStatusService.cpp | 2 +- 5 files changed, 187 insertions(+), 44 deletions(-) diff --git a/src/web/WebCommandService.cpp b/src/web/WebCommandService.cpp index 5a62c6da3..1ead8a808 100644 --- a/src/web/WebCommandService.cpp +++ b/src/web/WebCommandService.cpp @@ -23,6 +23,10 @@ namespace emsesp { +#ifndef EMSESP_STANDALONE +QueueHandle_t WebCommandService::commandQueue_ = nullptr; +#endif + WebCommandService::WebCommandService(AsyncWebServer * server, FS * fs, SecurityManager * securityManager) : _httpEndpoint(WebCommands::read, WebCommands::update, this, server, EMSESP_COMMANDS_SERVICE_PATH, securityManager, AuthenticationPredicates::IS_AUTHENTICATED) , _fsPersistence(WebCommands::read, WebCommands::update, this, fs, EMSESP_COMMANDS_FILE) { @@ -38,11 +42,134 @@ void WebCommandService::begin() { snprintf(topic, sizeof(topic), "%s/#", F_(commands)); Mqtt::subscribe(EMSdevice::DeviceType::COMMAND, topic, nullptr); +#ifndef EMSESP_STANDALONE + // when PSRAM is available, run command execution (potentially blocking, e.g. HTTP) in its own task + // so it never stalls the main loop. Without PSRAM we execute inline (see queueCommand()). + if (EMSESP::system_.PSram()) { + commandQueue_ = xQueueCreate(EMSESP_COMMAND_QUEUE_SIZE, sizeof(CommandJob *)); + if (commandQueue_ != nullptr) { +#if defined(CONFIG_FREERTOS_UNICORE) || (EMSESP_COMMAND_RUNNING_CORE < 0) + xTaskCreate((TaskFunction_t)command_task, "command_task", EMSESP_COMMAND_STACKSIZE, NULL, EMSESP_COMMAND_PRIORITY, NULL); +#else + xTaskCreatePinnedToCore( + (TaskFunction_t)command_task, "command_task", EMSESP_COMMAND_STACKSIZE, NULL, EMSESP_COMMAND_PRIORITY, NULL, EMSESP_COMMAND_RUNNING_CORE); +#endif + } + } +#endif + #if defined(EMSESP_TEST) load_test_data(); #endif } +// enqueue a command for the worker task. Fire-and-forget: returns true when the job was queued +// (or executed inline when no worker exists). It does NOT report the command's success/failure. +bool WebCommandService::queueCommand(const char * name, const char * value) { +#ifndef EMSESP_STANDALONE + if (commandQueue_ != nullptr) { + CommandJob * job = new CommandJob(); + if (job == nullptr) { + return false; + } + job->name = name ? name : ""; + job->has_value = (value != nullptr); + job->value = value ? value : ""; + if (xQueueSend(commandQueue_, &job, 0) != pdPASS) { + EMSESP::logger().warning("Command queue full, dropping '%s'", job->name.c_str()); + delete job; + return false; + } + return true; + } +#endif + // no worker task available (no PSRAM or standalone build) - run synchronously + return executeCommand(name, value); +} + +// true if the command definition is a HTTP/URL command (JSON with a http(s):// url), +// i.e. one that will do a blocking TCP/TLS request when executed +bool WebCommandService::isUrlCommand(const std::string & command) { + JsonDocument doc; + if (deserializeJson(doc, command) != DeserializationError::Ok) { + return false; + } + std::string url = doc["url"] | ""; + auto lower_url = Helpers::toLower(url.c_str()); + return lower_url.starts_with("http://") || lower_url.starts_with("https://"); +} + +// true if a value expression contains an embedded {"url":...} JSON snippet, which compute() +// will resolve with a blocking HTTP request. Mirrors the scan compute() does in shuntingYard.cpp +bool WebCommandService::valueContainsUrl(const std::string & value) { + auto f = value.find_first_of('{'); + while (f != std::string::npos) { + // find the matching closing brace, like compute() does + auto e = f + 1; + for (uint8_t i = 1; i > 0; e++) { + if (e >= value.length()) { + return false; // unbalanced braces, compute() will give up too + } else if (value[e] == '}') { + i--; + } else if (value[e] == '{') { + i++; + } + } + JsonDocument doc; + if (deserializeJson(doc, value.substr(f, e - f)) == DeserializationError::Ok) { + for (JsonPairConst p : doc.as()) { + if (Helpers::toLower(p.key().c_str()) == "url") { + return true; + } + } + } + f = value.find_first_of('{', e); + } + return false; +} + +// smart dispatch: commands that will do a blocking HTTP/TLS request - either a URL command or an +// internal command whose value embeds a {url} fetch - are offloaded to the worker task so they +// can't stall the caller (main loop for MQTT, async_tcp task for the web API). Everything else +// runs synchronously, so the caller still gets the command's real success/failure. +// for queued commands the return value only means "dispatched". +bool WebCommandService::dispatchCommand(const char * name, const char * value) { +#ifndef EMSESP_STANDALONE + if (commandQueue_ != nullptr) { + const CommandItem * ci = find(name); + if (ci != nullptr) { + if (isUrlCommand(ci->cmd.c_str())) { + return queueCommand(name, value); + } + // system/message defers evaluation of its value (via the scheduler's raw_value), + // so executing it never blocks - keep it synchronous even if the value has a {url} + if (Helpers::toLower(ci->cmd.c_str()) != "system/message") { + // the effective value is the override if given, else the command's stored default + const std::string effective_value = value ? value : std::string(ci->value.c_str()); + if (valueContainsUrl(effective_value)) { + return queueCommand(name, value); + } + } + } + } +#endif + return executeCommand(name, value); +} + +#ifndef EMSESP_STANDALONE +// worker task: blocks on the queue and executes each command in turn, off the main loop +void WebCommandService::command_task(void * pvParameters) { + CommandJob * job = nullptr; + while (1) { + if (xQueueReceive(commandQueue_, &job, portMAX_DELAY) == pdPASS && job != nullptr) { + EMSESP::webCommandService.executeCommand(job->name.c_str(), job->has_value ? job->value.c_str() : nullptr); + delete job; + job = nullptr; + } + } +} +#endif + void WebCommands::read(WebCommands & webCommands, JsonObject root) { JsonArray items = root["commands"].to(); uint8_t counter = 1; @@ -71,7 +198,7 @@ StateUpdateResult WebCommands::update(JsonObject root, WebCommands & webCommands EMSdevice::DeviceType::COMMAND, webCommands.commandItems.back().name, [name = std::string(webCommands.commandItems.back().name)](const char * value, const int8_t id) { - return EMSESP::webCommandService.executeCommand(name.c_str(), value); // value is optional + return EMSESP::webCommandService.dispatchCommand(name.c_str(), value); // value is optional }, FL_(command_cmd), CommandFlag::ADMIN_ONLY); @@ -110,6 +237,20 @@ bool WebCommandService::executeCommand(const char * name, const char * value) { bool WebCommandService::executeCommand(const char * name, const std::string & command, const std::string & data) { std::string cmd = Helpers::toLower(command); + // run the value through the shunting-yard calculator so expressions like "custom/heatcnt + 1" + // are resolved (entity references replaced by their values, then computed). Plain values pass + // through unchanged. Applies to both URL and internal commands, like the old scheduler code + // which computed the value before executing. system/message evaluates its own argument later + // (deferred via the scheduler's raw_value), so pre-computing it would run it twice - pass raw. + std::string computed_data = data; + if (!data.empty() && cmd != "system/message") { + computed_data = compute(data); + if (computed_data.empty()) { + EMSESP::logger().warning("Command '%s': cannot compute value '%s'", name, data.c_str()); + return false; + } + } + // handle HTTP commands (JSON with url/method/value) JsonDocument doc; if (deserializeJson(doc, cmd) == DeserializationError::Ok) { @@ -121,7 +262,8 @@ bool WebCommandService::executeCommand(const char * name, const std::string & co commands(s, false); url.replace(q + 1, l, s); } - std::string value = doc["value"] | data; + // the cmd's embedded value only gets entity substitution (commands), the passed value is fully computed + std::string value = doc["value"] | computed_data; std::string method = doc["method"] | "GET"; commands(value, false); auto lower_url = Helpers::toLower(url.c_str()); @@ -142,8 +284,8 @@ bool WebCommandService::executeCommand(const char * name, const std::string & co // handle internal API commands doc.clear(); JsonObject input = doc.to(); - if (!data.empty()) { - input["data"] = data; + if (!computed_data.empty()) { + input["data"] = computed_data; } JsonDocument doc_output; @@ -301,7 +443,7 @@ void WebCommandService::load_test_data() { EMSdevice::DeviceType::COMMAND, item.name, [name = std::string(item.name)](const char * value, const int8_t id) { - return EMSESP::webCommandService.executeCommand(name.c_str(), value); + return EMSESP::webCommandService.dispatchCommand(name.c_str(), value); }, FL_(command_cmd), CommandFlag::ADMIN_ONLY); diff --git a/src/web/WebCommandService.h b/src/web/WebCommandService.h index c4d54cbf4..6de4ad429 100644 --- a/src/web/WebCommandService.h +++ b/src/web/WebCommandService.h @@ -18,12 +18,31 @@ #include +#ifndef EMSESP_STANDALONE +#include +#include +#include +#endif + #ifndef WebCommandService_h #define WebCommandService_h #define EMSESP_COMMANDS_FILE "/config/emsespCommands.json" #define EMSESP_COMMANDS_SERVICE_PATH "/rest/commands" // GET and POST +#ifndef EMSESP_COMMAND_RUNNING_CORE +#define EMSESP_COMMAND_RUNNING_CORE 1 +#endif +#ifndef EMSESP_COMMAND_STACKSIZE +#define EMSESP_COMMAND_STACKSIZE 8192 +#endif +#ifndef EMSESP_COMMAND_PRIORITY +#define EMSESP_COMMAND_PRIORITY 1 +#endif +#ifndef EMSESP_COMMAND_QUEUE_SIZE +#define EMSESP_COMMAND_QUEUE_SIZE 10 +#endif + namespace emsesp { class CommandItem { @@ -33,6 +52,14 @@ class CommandItem { char name[20]; }; +// a single unit of work handed to the command worker task +class CommandJob { + public: + std::string name; + std::string value; + bool has_value; +}; + class WebCommands { public: std::list> commandItems; @@ -53,6 +80,12 @@ class WebCommandService : public StatefulService { bool executeCommand(const char * name, const char * value = nullptr); bool executeCommand(const char * name, const std::string & cmd, const std::string & value); + bool queueCommand(const char * name, const char * value = nullptr); + bool dispatchCommand(const char * name, const char * value = nullptr); + + static bool isUrlCommand(const std::string & command); // true if the command definition is a HTTP/URL command + static bool valueContainsUrl(const std::string & value); // true if a value embeds a {"url":...} compute() will fetch + const CommandItem * find(const char * name); uint8_t count_entities(); @@ -64,6 +97,11 @@ class WebCommandService : public StatefulService { #endif private: +#ifndef EMSESP_STANDALONE + static void command_task(void * pvParameters); + static QueueHandle_t commandQueue_; +#endif + HttpEndpoint _httpEndpoint; FSPersistence _fsPersistence; diff --git a/src/web/WebSchedulerService.cpp b/src/web/WebSchedulerService.cpp index 4741e3344..df30da412 100644 --- a/src/web/WebSchedulerService.cpp +++ b/src/web/WebSchedulerService.cpp @@ -39,16 +39,6 @@ void WebSchedulerService::begin() { char topic[Mqtt::MQTT_TOPIC_MAX_SIZE]; snprintf(topic, sizeof(topic), "%s/#", F_(scheduler)); Mqtt::subscribe(EMSdevice::DeviceType::SCHEDULER, topic, nullptr); // use empty function callback -#ifndef EMSESP_STANDALONE - if (EMSESP::system_.PSram()) { -#if defined(CONFIG_FREERTOS_UNICORE) || (EMSESP_SCHEDULER_RUNNING_CORE < 0) - xTaskCreate((TaskFunction_t)scheduler_task, "scheduler_task", EMSESP_SCHEDULER_STACKSIZE, NULL, EMSESP_SCHEDULER_PRIORITY, NULL); -#else - xTaskCreatePinnedToCore( - (TaskFunction_t)scheduler_task, "scheduler_task", EMSESP_SCHEDULER_STACKSIZE, NULL, EMSESP_SCHEDULER_PRIORITY, NULL, EMSESP_SCHEDULER_RUNNING_CORE); -#endif - } -#endif #if defined(EMSESP_TEST) load_test_data(); @@ -339,7 +329,7 @@ bool WebSchedulerService::runScheduleCommand(const ScheduleItem & si) { EMSESP::logger().warning("Schedule '%s': no command assigned", si.name); return false; } - return EMSESP::webCommandService.executeCommand(si.cmd_name.c_str()); + return EMSESP::webCommandService.dispatchCommand(si.cmd_name.c_str()); } // queue schedules to be handled executed in WebSchedulerService::loop() called from emsesp.cpp @@ -443,19 +433,6 @@ void WebSchedulerService::loop() { } } -// process schedules async -void WebSchedulerService::scheduler_task(void * pvParameters) { - while (1) { - delay(10); - if (EMSESP::system_.systemStatus() == SYSTEM_STATUS::SYSTEM_STATUS_NORMAL) { - EMSESP::webSchedulerService.loop(); - } - } -#ifndef EMSESP_STANDALONE - vTaskDelete(NULL); -#endif -} - #if defined(EMSESP_TEST) void WebSchedulerService::load_test_data() { Command::erase_device_commands(EMSdevice::DeviceType::SCHEDULER); diff --git a/src/web/WebSchedulerService.h b/src/web/WebSchedulerService.h index cf2e74ce4..db21f38cf 100644 --- a/src/web/WebSchedulerService.h +++ b/src/web/WebSchedulerService.h @@ -24,18 +24,6 @@ #define EMSESP_SCHEDULER_FILE "/config/emsespScheduler.json" #define EMSESP_SCHEDULER_SERVICE_PATH "/rest/schedule" // GET and POST -#ifndef EMSESP_SCHEDULER_RUNNING_CORE -#define EMSESP_SCHEDULER_RUNNING_CORE 1 -#endif - -#ifndef EMSESP_SCHEDULER_STACKSIZE -#define EMSESP_SCHEDULER_STACKSIZE 5120 -#endif - -#ifndef EMSESP_SCHEDULER_PRIORITY -#define EMSESP_SCHEDULER_PRIORITY 1 -#endif - // bit flags for the schedule items. Matches those in interface/src/app/main/SchedulerDialog.tsx // 0-127 (0->0x7F) is day schedule // 128 (0x80) is timer @@ -98,8 +86,6 @@ class WebSchedulerService : public StatefulService { #ifndef EMSESP_STANDALONE private: #endif - static void scheduler_task(void * pvParameters); - bool runScheduleCommand(const ScheduleItem & si); void condition(); diff --git a/src/web/WebStatusService.cpp b/src/web/WebStatusService.cpp index 4ae37bb47..c351eb771 100644 --- a/src/web/WebStatusService.cpp +++ b/src/web/WebStatusService.cpp @@ -233,7 +233,7 @@ void WebStatusService::action(AsyncWebServerRequest * request, JsonVariant json) } else if (action == "upgradeImportantMessages") { root["upgradeImportantMessageType"] = upgradeImportantMessages(param); } else if (action == "executeCommand") { - ok = EMSESP::webCommandService.executeCommand(param.c_str()); + ok = EMSESP::webCommandService.dispatchCommand(param.c_str()); // command worker task (ok = dispatched); fast internal commands run inline (ok = real result) } #if defined(EMSESP_STANDALONE) && !defined(EMSESP_UNITY)