Scheduler to sync, Commands to async

This commit is contained in:
proddy
2026-06-09 23:04:41 +02:00
parent 423515ec37
commit 0406ee0529
5 changed files with 187 additions and 44 deletions

View File

@@ -23,6 +23,10 @@
namespace emsesp {
#ifndef EMSESP_STANDALONE
QueueHandle_t WebCommandService::commandQueue_ = nullptr;
#endif
WebCommandService::WebCommandService(AsyncWebServer * server, FS * fs, SecurityManager * securityManager)
: _httpEndpoint(WebCommands::read, WebCommands::update, this, server, EMSESP_COMMANDS_SERVICE_PATH, securityManager, AuthenticationPredicates::IS_AUTHENTICATED)
, _fsPersistence(WebCommands::read, WebCommands::update, this, fs, EMSESP_COMMANDS_FILE) {
@@ -38,11 +42,134 @@ void WebCommandService::begin() {
snprintf(topic, sizeof(topic), "%s/#", F_(commands));
Mqtt::subscribe(EMSdevice::DeviceType::COMMAND, topic, nullptr);
#ifndef EMSESP_STANDALONE
// when PSRAM is available, run command execution (potentially blocking, e.g. HTTP) in its own task
// so it never stalls the main loop. Without PSRAM we execute inline (see queueCommand()).
if (EMSESP::system_.PSram()) {
commandQueue_ = xQueueCreate(EMSESP_COMMAND_QUEUE_SIZE, sizeof(CommandJob *));
if (commandQueue_ != nullptr) {
#if defined(CONFIG_FREERTOS_UNICORE) || (EMSESP_COMMAND_RUNNING_CORE < 0)
xTaskCreate((TaskFunction_t)command_task, "command_task", EMSESP_COMMAND_STACKSIZE, NULL, EMSESP_COMMAND_PRIORITY, NULL);
#else
xTaskCreatePinnedToCore(
(TaskFunction_t)command_task, "command_task", EMSESP_COMMAND_STACKSIZE, NULL, EMSESP_COMMAND_PRIORITY, NULL, EMSESP_COMMAND_RUNNING_CORE);
#endif
}
}
#endif
#if defined(EMSESP_TEST)
load_test_data();
#endif
}
// enqueue a command for the worker task. Fire-and-forget: returns true when the job was queued
// (or executed inline when no worker exists). It does NOT report the command's success/failure.
bool WebCommandService::queueCommand(const char * name, const char * value) {
#ifndef EMSESP_STANDALONE
if (commandQueue_ != nullptr) {
CommandJob * job = new CommandJob();
if (job == nullptr) {
return false;
}
job->name = name ? name : "";
job->has_value = (value != nullptr);
job->value = value ? value : "";
if (xQueueSend(commandQueue_, &job, 0) != pdPASS) {
EMSESP::logger().warning("Command queue full, dropping '%s'", job->name.c_str());
delete job;
return false;
}
return true;
}
#endif
// no worker task available (no PSRAM or standalone build) - run synchronously
return executeCommand(name, value);
}
// true if the command definition is a HTTP/URL command (JSON with a http(s):// url),
// i.e. one that will do a blocking TCP/TLS request when executed
bool WebCommandService::isUrlCommand(const std::string & command) {
JsonDocument doc;
if (deserializeJson(doc, command) != DeserializationError::Ok) {
return false;
}
std::string url = doc["url"] | "";
auto lower_url = Helpers::toLower(url.c_str());
return lower_url.starts_with("http://") || lower_url.starts_with("https://");
}
// true if a value expression contains an embedded {"url":...} JSON snippet, which compute()
// will resolve with a blocking HTTP request. Mirrors the scan compute() does in shuntingYard.cpp
bool WebCommandService::valueContainsUrl(const std::string & value) {
auto f = value.find_first_of('{');
while (f != std::string::npos) {
// find the matching closing brace, like compute() does
auto e = f + 1;
for (uint8_t i = 1; i > 0; e++) {
if (e >= value.length()) {
return false; // unbalanced braces, compute() will give up too
} else if (value[e] == '}') {
i--;
} else if (value[e] == '{') {
i++;
}
}
JsonDocument doc;
if (deserializeJson(doc, value.substr(f, e - f)) == DeserializationError::Ok) {
for (JsonPairConst p : doc.as<JsonObjectConst>()) {
if (Helpers::toLower(p.key().c_str()) == "url") {
return true;
}
}
}
f = value.find_first_of('{', e);
}
return false;
}
// smart dispatch: commands that will do a blocking HTTP/TLS request - either a URL command or an
// internal command whose value embeds a {url} fetch - are offloaded to the worker task so they
// can't stall the caller (main loop for MQTT, async_tcp task for the web API). Everything else
// runs synchronously, so the caller still gets the command's real success/failure.
// for queued commands the return value only means "dispatched".
bool WebCommandService::dispatchCommand(const char * name, const char * value) {
#ifndef EMSESP_STANDALONE
if (commandQueue_ != nullptr) {
const CommandItem * ci = find(name);
if (ci != nullptr) {
if (isUrlCommand(ci->cmd.c_str())) {
return queueCommand(name, value);
}
// system/message defers evaluation of its value (via the scheduler's raw_value),
// so executing it never blocks - keep it synchronous even if the value has a {url}
if (Helpers::toLower(ci->cmd.c_str()) != "system/message") {
// the effective value is the override if given, else the command's stored default
const std::string effective_value = value ? value : std::string(ci->value.c_str());
if (valueContainsUrl(effective_value)) {
return queueCommand(name, value);
}
}
}
}
#endif
return executeCommand(name, value);
}
#ifndef EMSESP_STANDALONE
// worker task: blocks on the queue and executes each command in turn, off the main loop
void WebCommandService::command_task(void * pvParameters) {
CommandJob * job = nullptr;
while (1) {
if (xQueueReceive(commandQueue_, &job, portMAX_DELAY) == pdPASS && job != nullptr) {
EMSESP::webCommandService.executeCommand(job->name.c_str(), job->has_value ? job->value.c_str() : nullptr);
delete job;
job = nullptr;
}
}
}
#endif
void WebCommands::read(WebCommands & webCommands, JsonObject root) {
JsonArray items = root["commands"].to<JsonArray>();
uint8_t counter = 1;
@@ -71,7 +198,7 @@ StateUpdateResult WebCommands::update(JsonObject root, WebCommands & webCommands
EMSdevice::DeviceType::COMMAND,
webCommands.commandItems.back().name,
[name = std::string(webCommands.commandItems.back().name)](const char * value, const int8_t id) {
return EMSESP::webCommandService.executeCommand(name.c_str(), value); // value is optional
return EMSESP::webCommandService.dispatchCommand(name.c_str(), value); // value is optional
},
FL_(command_cmd),
CommandFlag::ADMIN_ONLY);
@@ -110,6 +237,20 @@ bool WebCommandService::executeCommand(const char * name, const char * value) {
bool WebCommandService::executeCommand(const char * name, const std::string & command, const std::string & data) {
std::string cmd = Helpers::toLower(command);
// run the value through the shunting-yard calculator so expressions like "custom/heatcnt + 1"
// are resolved (entity references replaced by their values, then computed). Plain values pass
// through unchanged. Applies to both URL and internal commands, like the old scheduler code
// which computed the value before executing. system/message evaluates its own argument later
// (deferred via the scheduler's raw_value), so pre-computing it would run it twice - pass raw.
std::string computed_data = data;
if (!data.empty() && cmd != "system/message") {
computed_data = compute(data);
if (computed_data.empty()) {
EMSESP::logger().warning("Command '%s': cannot compute value '%s'", name, data.c_str());
return false;
}
}
// handle HTTP commands (JSON with url/method/value)
JsonDocument doc;
if (deserializeJson(doc, cmd) == DeserializationError::Ok) {
@@ -121,7 +262,8 @@ bool WebCommandService::executeCommand(const char * name, const std::string & co
commands(s, false);
url.replace(q + 1, l, s);
}
std::string value = doc["value"] | data;
// the cmd's embedded value only gets entity substitution (commands), the passed value is fully computed
std::string value = doc["value"] | computed_data;
std::string method = doc["method"] | "GET";
commands(value, false);
auto lower_url = Helpers::toLower(url.c_str());
@@ -142,8 +284,8 @@ bool WebCommandService::executeCommand(const char * name, const std::string & co
// handle internal API commands
doc.clear();
JsonObject input = doc.to<JsonObject>();
if (!data.empty()) {
input["data"] = data;
if (!computed_data.empty()) {
input["data"] = computed_data;
}
JsonDocument doc_output;
@@ -301,7 +443,7 @@ void WebCommandService::load_test_data() {
EMSdevice::DeviceType::COMMAND,
item.name,
[name = std::string(item.name)](const char * value, const int8_t id) {
return EMSESP::webCommandService.executeCommand(name.c_str(), value);
return EMSESP::webCommandService.dispatchCommand(name.c_str(), value);
},
FL_(command_cmd),
CommandFlag::ADMIN_ONLY);

View File

@@ -18,12 +18,31 @@
#include <esp32-psram.h>
#ifndef EMSESP_STANDALONE
#include <freertos/FreeRTOS.h>
#include <freertos/queue.h>
#include <freertos/task.h>
#endif
#ifndef WebCommandService_h
#define WebCommandService_h
#define EMSESP_COMMANDS_FILE "/config/emsespCommands.json"
#define EMSESP_COMMANDS_SERVICE_PATH "/rest/commands" // GET and POST
#ifndef EMSESP_COMMAND_RUNNING_CORE
#define EMSESP_COMMAND_RUNNING_CORE 1
#endif
#ifndef EMSESP_COMMAND_STACKSIZE
#define EMSESP_COMMAND_STACKSIZE 8192
#endif
#ifndef EMSESP_COMMAND_PRIORITY
#define EMSESP_COMMAND_PRIORITY 1
#endif
#ifndef EMSESP_COMMAND_QUEUE_SIZE
#define EMSESP_COMMAND_QUEUE_SIZE 10
#endif
namespace emsesp {
class CommandItem {
@@ -33,6 +52,14 @@ class CommandItem {
char name[20];
};
// a single unit of work handed to the command worker task
class CommandJob {
public:
std::string name;
std::string value;
bool has_value;
};
class WebCommands {
public:
std::list<CommandItem, AllocatorPSRAM<CommandItem>> commandItems;
@@ -53,6 +80,12 @@ class WebCommandService : public StatefulService<WebCommands> {
bool executeCommand(const char * name, const char * value = nullptr);
bool executeCommand(const char * name, const std::string & cmd, const std::string & value);
bool queueCommand(const char * name, const char * value = nullptr);
bool dispatchCommand(const char * name, const char * value = nullptr);
static bool isUrlCommand(const std::string & command); // true if the command definition is a HTTP/URL command
static bool valueContainsUrl(const std::string & value); // true if a value embeds a {"url":...} compute() will fetch
const CommandItem * find(const char * name);
uint8_t count_entities();
@@ -64,6 +97,11 @@ class WebCommandService : public StatefulService<WebCommands> {
#endif
private:
#ifndef EMSESP_STANDALONE
static void command_task(void * pvParameters);
static QueueHandle_t commandQueue_;
#endif
HttpEndpoint<WebCommands> _httpEndpoint;
FSPersistence<WebCommands> _fsPersistence;

View File

@@ -39,16 +39,6 @@ void WebSchedulerService::begin() {
char topic[Mqtt::MQTT_TOPIC_MAX_SIZE];
snprintf(topic, sizeof(topic), "%s/#", F_(scheduler));
Mqtt::subscribe(EMSdevice::DeviceType::SCHEDULER, topic, nullptr); // use empty function callback
#ifndef EMSESP_STANDALONE
if (EMSESP::system_.PSram()) {
#if defined(CONFIG_FREERTOS_UNICORE) || (EMSESP_SCHEDULER_RUNNING_CORE < 0)
xTaskCreate((TaskFunction_t)scheduler_task, "scheduler_task", EMSESP_SCHEDULER_STACKSIZE, NULL, EMSESP_SCHEDULER_PRIORITY, NULL);
#else
xTaskCreatePinnedToCore(
(TaskFunction_t)scheduler_task, "scheduler_task", EMSESP_SCHEDULER_STACKSIZE, NULL, EMSESP_SCHEDULER_PRIORITY, NULL, EMSESP_SCHEDULER_RUNNING_CORE);
#endif
}
#endif
#if defined(EMSESP_TEST)
load_test_data();
@@ -339,7 +329,7 @@ bool WebSchedulerService::runScheduleCommand(const ScheduleItem & si) {
EMSESP::logger().warning("Schedule '%s': no command assigned", si.name);
return false;
}
return EMSESP::webCommandService.executeCommand(si.cmd_name.c_str());
return EMSESP::webCommandService.dispatchCommand(si.cmd_name.c_str());
}
// queue schedules to be handled executed in WebSchedulerService::loop() called from emsesp.cpp
@@ -443,19 +433,6 @@ void WebSchedulerService::loop() {
}
}
// process schedules async
void WebSchedulerService::scheduler_task(void * pvParameters) {
while (1) {
delay(10);
if (EMSESP::system_.systemStatus() == SYSTEM_STATUS::SYSTEM_STATUS_NORMAL) {
EMSESP::webSchedulerService.loop();
}
}
#ifndef EMSESP_STANDALONE
vTaskDelete(NULL);
#endif
}
#if defined(EMSESP_TEST)
void WebSchedulerService::load_test_data() {
Command::erase_device_commands(EMSdevice::DeviceType::SCHEDULER);

View File

@@ -24,18 +24,6 @@
#define EMSESP_SCHEDULER_FILE "/config/emsespScheduler.json"
#define EMSESP_SCHEDULER_SERVICE_PATH "/rest/schedule" // GET and POST
#ifndef EMSESP_SCHEDULER_RUNNING_CORE
#define EMSESP_SCHEDULER_RUNNING_CORE 1
#endif
#ifndef EMSESP_SCHEDULER_STACKSIZE
#define EMSESP_SCHEDULER_STACKSIZE 5120
#endif
#ifndef EMSESP_SCHEDULER_PRIORITY
#define EMSESP_SCHEDULER_PRIORITY 1
#endif
// bit flags for the schedule items. Matches those in interface/src/app/main/SchedulerDialog.tsx
// 0-127 (0->0x7F) is day schedule
// 128 (0x80) is timer
@@ -98,8 +86,6 @@ class WebSchedulerService : public StatefulService<WebScheduler> {
#ifndef EMSESP_STANDALONE
private:
#endif
static void scheduler_task(void * pvParameters);
bool runScheduleCommand(const ScheduleItem & si);
void condition();

View File

@@ -233,7 +233,7 @@ void WebStatusService::action(AsyncWebServerRequest * request, JsonVariant json)
} else if (action == "upgradeImportantMessages") {
root["upgradeImportantMessageType"] = upgradeImportantMessages(param);
} else if (action == "executeCommand") {
ok = EMSESP::webCommandService.executeCommand(param.c_str());
ok = EMSESP::webCommandService.dispatchCommand(param.c_str()); // command worker task (ok = dispatched); fast internal commands run inline (ok = real result)
}
#if defined(EMSESP_STANDALONE) && !defined(EMSESP_UNITY)