1 Star 0 Fork 0

石明强/esp-mqtt

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
mqtt_client.c 75.22 KB
一键复制 编辑 原始数据 按行查看 历史
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961
#include <stdio.h>
#include "platform.h"
#include "esp_event.h"
#include "mqtt_client.h"
#include "mqtt_msg.h"
#include "esp_transport.h"
#include "esp_transport_tcp.h"
#include "esp_transport_ssl.h"
#include "esp_transport_ws.h"
#include "esp_log.h"
#include "mqtt_outbox.h"
#include "freertos/event_groups.h"
#include <errno.h>
#include <string.h>
#include "mqtt_supported_features.h"
/* using uri parser */
#include "http_parser.h"
#ifdef MQTT_DISABLE_API_LOCKS
# define MQTT_API_LOCK(c)
# define MQTT_API_UNLOCK(c)
#else
# define MQTT_API_LOCK(c) xSemaphoreTakeRecursive(c->api_lock, portMAX_DELAY)
# define MQTT_API_UNLOCK(c) xSemaphoreGiveRecursive(c->api_lock)
#endif /* MQTT_USE_API_LOCKS */
_Static_assert(sizeof(uint64_t) == sizeof(outbox_tick_t), "mqtt-client tick type size different from outbox tick type");
#ifdef ESP_EVENT_ANY_ID
_Static_assert(MQTT_EVENT_ANY == ESP_EVENT_ANY_ID, "mqtt-client event enum does not match the global EVENT_ANY_ID");
#endif
static const char *TAG = "MQTT_CLIENT";
#ifdef MQTT_SUPPORTED_FEATURE_EVENT_LOOP
/**
* @brief Define of MQTT Event base
*
*/
ESP_EVENT_DEFINE_BASE(MQTT_EVENTS);
#endif
typedef struct mqtt_state {
mqtt_connect_info_t *connect_info;
uint8_t *in_buffer;
uint8_t *out_buffer;
int in_buffer_length;
int out_buffer_length;
size_t message_length;
size_t in_buffer_read_len;
mqtt_message_t *outbound_message;
mqtt_connection_t mqtt_connection;
uint16_t pending_msg_id;
int pending_msg_type;
int pending_publish_qos;
int pending_msg_count;
} mqtt_state_t;
typedef struct {
mqtt_event_callback_t event_handle;
esp_event_loop_handle_t event_loop_handle;
int task_stack;
int task_prio;
char *uri;
char *host;
char *path;
char *scheme;
int port;
bool auto_reconnect;
void *user_context;
int network_timeout_ms;
int refresh_connection_after_ms;
int reconnect_timeout_ms;
char **alpn_protos;
int num_alpn_protos;
char *clientkey_password;
int clientkey_password_len;
bool use_global_ca_store;
esp_err_t ((*crt_bundle_attach)(void *conf));
const char *cacert_buf;
size_t cacert_bytes;
const char *clientcert_buf;
size_t clientcert_bytes;
const char *clientkey_buf;
size_t clientkey_bytes;
const struct psk_key_hint *psk_hint_key;
bool skip_cert_common_name_check;
bool use_secure_element;
void *ds_data;
int message_retransmit_timeout;
} mqtt_config_storage_t;
typedef enum {
MQTT_STATE_INIT = 0,
MQTT_STATE_DISCONNECTED,
MQTT_STATE_CONNECTED,
MQTT_STATE_WAIT_RECONNECT,
} mqtt_client_state_t;
struct esp_mqtt_client {
esp_transport_list_handle_t transport_list;
esp_transport_handle_t transport;
mqtt_config_storage_t *config;
mqtt_state_t mqtt_state;
mqtt_connect_info_t connect_info;
mqtt_client_state_t state;
uint64_t refresh_connection_tick;
uint64_t keepalive_tick;
uint64_t reconnect_tick;
int wait_timeout_ms;
int auto_reconnect;
esp_mqtt_event_t event;
bool run;
bool wait_for_ping_resp;
outbox_handle_t outbox;
EventGroupHandle_t status_bits;
SemaphoreHandle_t api_lock;
TaskHandle_t task_handle;
};
const static int STOPPED_BIT = (1 << 0);
const static int RECONNECT_BIT = (1 << 1);
const static int DISCONNECT_BIT = (1 << 2);
static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client);
static esp_err_t esp_mqtt_dispatch_event_with_msgid(esp_mqtt_client_handle_t client);
static void esp_mqtt_destroy_config(esp_mqtt_client_handle_t client);
static esp_err_t esp_mqtt_connect(esp_mqtt_client_handle_t client, int timeout_ms);
static void esp_mqtt_abort_connection(esp_mqtt_client_handle_t client);
static esp_err_t esp_mqtt_client_ping(esp_mqtt_client_handle_t client);
static char *create_string(const char *ptr, int len);
static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_timeout_ms);
static void esp_mqtt_client_dispatch_transport_error(esp_mqtt_client_handle_t client);
static esp_err_t send_disconnect_msg(esp_mqtt_client_handle_t client);
#if MQTT_ENABLE_SSL
enum esp_mqtt_ssl_cert_key_api {
MQTT_SSL_DATA_API_CA_CERT,
MQTT_SSL_DATA_API_CLIENT_CERT,
MQTT_SSL_DATA_API_CLIENT_KEY,
MQTT_SSL_DATA_API_MAX,
};
static esp_err_t esp_mqtt_set_cert_key_data(esp_transport_handle_t ssl, enum esp_mqtt_ssl_cert_key_api what, const char *cert_key_data, int cert_key_len)
{
char *data = (char *)cert_key_data;
int ssl_transport_api_id = what;
int len = cert_key_len;
if (!data) {
return ESP_OK;
}
if (len == 0) {
// if length not specified, expect 0-terminated PEM string
// and the original transport_api_id (by convention after the last api_id in the enum)
ssl_transport_api_id += MQTT_SSL_DATA_API_MAX;
len = strlen(data);
}
#ifndef MQTT_SUPPORTED_FEATURE_DER_CERTIFICATES
else {
ESP_LOGE(TAG, "Explicit cert-/key-len is not available in IDF version %s", IDF_VER);
return ESP_ERR_NOT_SUPPORTED;
}
#endif
// option to force the cert/key config to null (i.e. skip validation) when existing config updates
if (0 == strcmp(data, "NULL")) {
data = NULL;
len = 0;
}
switch (ssl_transport_api_id) {
#ifdef MQTT_SUPPORTED_FEATURE_DER_CERTIFICATES
case MQTT_SSL_DATA_API_CA_CERT:
esp_transport_ssl_set_cert_data_der(ssl, data, len);
break;
case MQTT_SSL_DATA_API_CLIENT_CERT:
esp_transport_ssl_set_client_cert_data_der(ssl, data, len);
break;
case MQTT_SSL_DATA_API_CLIENT_KEY:
esp_transport_ssl_set_client_key_data_der(ssl, data, len);
break;
#endif
case MQTT_SSL_DATA_API_CA_CERT + MQTT_SSL_DATA_API_MAX:
esp_transport_ssl_set_cert_data(ssl, data, len);
break;
case MQTT_SSL_DATA_API_CLIENT_CERT + MQTT_SSL_DATA_API_MAX:
esp_transport_ssl_set_client_cert_data(ssl, data, len);
break;
case MQTT_SSL_DATA_API_CLIENT_KEY + MQTT_SSL_DATA_API_MAX:
esp_transport_ssl_set_client_key_data(ssl, data, len);
break;
default:
return ESP_ERR_INVALID_ARG;
}
return ESP_OK;
}
static esp_err_t esp_mqtt_set_ssl_transport_properties(esp_transport_list_handle_t transport_list, mqtt_config_storage_t *cfg)
{
esp_transport_handle_t ssl = esp_transport_list_get_transport(transport_list, "mqtts");
if (cfg->use_global_ca_store == true) {
esp_transport_ssl_enable_global_ca_store(ssl);
} else if (cfg->crt_bundle_attach != NULL) {
#ifdef MQTT_SUPPORTED_FEATURE_CERTIFICATE_BUNDLE
#ifdef CONFIG_MBEDTLS_CERTIFICATE_BUNDLE
esp_transport_ssl_crt_bundle_attach(ssl, cfg->crt_bundle_attach);
#else
ESP_LOGE(TAG, "Certificate bundle is not enabled for mbedTLS in menuconfig");
goto esp_mqtt_set_transport_failed;
#endif /* CONFIG_MBEDTLS_CERTIFICATE_BUNDLE */
#else
ESP_LOGE(TAG, "Certificate bundle feature is not available in IDF version %s", IDF_VER);
goto esp_mqtt_set_transport_failed;
#endif /* MQTT_SUPPORTED_FEATURE_CERTIFICATE_BUNDLE */
} else {
ESP_OK_CHECK(TAG, esp_mqtt_set_cert_key_data(ssl, MQTT_SSL_DATA_API_CA_CERT, cfg->cacert_buf, cfg->cacert_bytes),
goto esp_mqtt_set_transport_failed);
}
if (cfg->use_secure_element) {
#ifdef MQTT_SUPPORTED_FEATURE_SECURE_ELEMENT
#ifdef CONFIG_ESP_TLS_USE_SECURE_ELEMENT
esp_transport_ssl_use_secure_element(ssl);
#else
ESP_LOGE(TAG, "Secure element not enabled for esp-tls in menuconfig");
goto esp_mqtt_set_transport_failed;
#endif /* CONFIG_ESP_TLS_USE_SECURE_ELEMENT */
#else
ESP_LOGE(TAG, "Secure element feature is not available in IDF version %s", IDF_VER);
goto esp_mqtt_set_transport_failed;
#endif /* MQTT_SUPPORTED_FEATURE_SECURE_ELEMENT */
}
if (cfg->ds_data != NULL) {
#ifdef MQTT_SUPPORTED_FEATURE_DIGITAL_SIGNATURE
#ifdef CONFIG_ESP_TLS_USE_DS_PERIPHERAL
esp_transport_ssl_set_ds_data(ssl, cfg->ds_data);
#else
ESP_LOGE(TAG, "Digital Signature not enabled for esp-tls in menuconfig");
goto esp_mqtt_set_transport_failed;
#endif /* CONFIG_ESP_TLS_USE_DS_PERIPHERAL */
#else
ESP_LOGE(TAG, "Digital Signature feature is not available in IDF version %s", IDF_VER);
goto esp_mqtt_set_transport_failed;
#endif
}
ESP_OK_CHECK(TAG, esp_mqtt_set_cert_key_data(ssl, MQTT_SSL_DATA_API_CLIENT_CERT, cfg->clientcert_buf, cfg->clientcert_bytes),
goto esp_mqtt_set_transport_failed);
ESP_OK_CHECK(TAG, esp_mqtt_set_cert_key_data(ssl, MQTT_SSL_DATA_API_CLIENT_KEY, cfg->clientkey_buf, cfg->clientkey_bytes),
goto esp_mqtt_set_transport_failed);
if (cfg->clientkey_password && cfg->clientkey_password_len) {
#if defined(MQTT_SUPPORTED_FEATURE_CLIENT_KEY_PASSWORD) && MQTT_ENABLE_SSL
esp_transport_ssl_set_client_key_password(ssl,
cfg->clientkey_password,
cfg->clientkey_password_len);
#else
ESP_LOGE(TAG, "Password protected keys are not available in IDF version %s", IDF_VER);
goto esp_mqtt_set_transport_failed;
#endif
}
if (cfg->psk_hint_key) {
#if defined(MQTT_SUPPORTED_FEATURE_PSK_AUTHENTICATION) && MQTT_ENABLE_SSL
#ifdef CONFIG_ESP_TLS_PSK_VERIFICATION
esp_transport_ssl_set_psk_key_hint(ssl, cfg->psk_hint_key);
#else
ESP_LOGE(TAG, "PSK authentication configured but not enabled in menuconfig: Please enable ESP_TLS_PSK_VERIFICATION option");
goto esp_mqtt_set_transport_failed;
#endif
#else
ESP_LOGE(TAG, "PSK authentication is not available in IDF version %s", IDF_VER);
goto esp_mqtt_set_transport_failed;
#endif
}
if (cfg->alpn_protos) {
#if defined(MQTT_SUPPORTED_FEATURE_ALPN) && MQTT_ENABLE_SSL
#if defined(CONFIG_MBEDTLS_SSL_ALPN) || defined(CONFIG_WOLFSSL_HAVE_ALPN)
esp_transport_ssl_set_alpn_protocol(ssl, (const char **)cfg->alpn_protos);
#else
ESP_LOGE(TAG, "APLN configured but not enabled in menuconfig: Please enable MBEDTLS_SSL_ALPN or WOLFSSL_HAVE_ALPN option");
goto esp_mqtt_set_transport_failed;
#endif
#else
ESP_LOGE(TAG, "APLN is not available in IDF version %s", IDF_VER);
goto esp_mqtt_set_transport_failed;
#endif
}
if (cfg->skip_cert_common_name_check) {
#if defined(MQTT_SUPPORTED_FEATURE_SKIP_CRT_CMN_NAME_CHECK) && MQTT_ENABLE_SSL
esp_transport_ssl_skip_common_name_check(ssl);
#else
ESP_LOGE(TAG, "Skip certificate common name check is not available in IDF version %s", IDF_VER);
goto esp_mqtt_set_transport_failed;
#endif
}
return ESP_OK;
esp_mqtt_set_transport_failed:
return ESP_FAIL;
}
#endif // MQTT_ENABLE_SSL
/* Checks if the user supplied config values are internally consistent */
static esp_err_t esp_mqtt_check_cfg_conflict(const mqtt_config_storage_t *cfg, const esp_mqtt_client_config_t *user_cfg)
{
esp_err_t ret = ESP_OK;
bool ssl_cfg_enabled = cfg->use_global_ca_store || cfg->cacert_buf || cfg->clientcert_buf || cfg->psk_hint_key || cfg->alpn_protos;
bool is_ssl_scheme = false;
if (cfg->scheme) {
is_ssl_scheme = (strcasecmp(cfg->scheme, "mqtts") == 0) || (strcasecmp(cfg->scheme, "wss") == 0);
}
if (!is_ssl_scheme && ssl_cfg_enabled) {
if (cfg->uri) {
ESP_LOGW(TAG, "SSL related configs set, but the URI scheme specifies a non-SSL scheme, scheme = %s", cfg->scheme);
} else {
ESP_LOGW(TAG, "SSL related configs set, but the transport protocol is a non-SSL scheme, transport = %d", user_cfg->transport);
}
ret = ESP_ERR_INVALID_ARG;
}
if (cfg->uri && user_cfg->transport) {
ESP_LOGW(TAG, "Transport config set, but overridden by scheme from URI: transport = %d, uri scheme = %s", user_cfg->transport, cfg->scheme);
ret = ESP_ERR_INVALID_ARG;
}
return ret;
}
static bool set_if_config(char const *const new_config, char **old_config)
{
if (new_config) {
free(*old_config);
*old_config = strdup(new_config);
if (*old_config == NULL) {
return false;
}
}
return true;
}
esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_client_config_t *config)
{
if (!client) {
ESP_LOGE(TAG, "Client was not initialized");
return ESP_ERR_INVALID_ARG;
}
MQTT_API_LOCK(client);
//Copy user configurations to client context
esp_err_t err = ESP_OK;
if (!client->config) {
client->config = calloc(1, sizeof(mqtt_config_storage_t));
ESP_MEM_CHECK(TAG, client->config, {
MQTT_API_UNLOCK(client);
return ESP_ERR_NO_MEM;
});
}
client->config->message_retransmit_timeout = config->message_retransmit_timeout;
if (config->message_retransmit_timeout <= 0) {
client->config->message_retransmit_timeout = 1000;
}
client->config->task_prio = config->task_prio;
if (client->config->task_prio <= 0) {
client->config->task_prio = MQTT_TASK_PRIORITY;
}
client->config->task_stack = config->task_stack;
if (client->config->task_stack <= 0) {
client->config->task_stack = MQTT_TASK_STACK;
}
if (config->port) {
client->config->port = config->port;
}
err = ESP_ERR_NO_MEM;
ESP_MEM_CHECK(TAG, set_if_config(config->host, &client->config->host), goto _mqtt_set_config_failed);
ESP_MEM_CHECK(TAG, set_if_config(config->path, &client->config->path), goto _mqtt_set_config_failed);
ESP_MEM_CHECK(TAG, set_if_config(config->username, &client->connect_info.username), goto _mqtt_set_config_failed);
ESP_MEM_CHECK(TAG, set_if_config(config->password, &client->connect_info.password), goto _mqtt_set_config_failed);
if (config->client_id) {
ESP_MEM_CHECK(TAG, set_if_config(config->client_id, &client->connect_info.client_id), goto _mqtt_set_config_failed);
} else if (client->connect_info.client_id == NULL) {
client->connect_info.client_id = platform_create_id_string();
}
ESP_MEM_CHECK(TAG, client->connect_info.client_id, goto _mqtt_set_config_failed);
ESP_LOGD(TAG, "MQTT client_id=%s", client->connect_info.client_id);
ESP_MEM_CHECK(TAG, set_if_config(config->uri, &client->config->uri), goto _mqtt_set_config_failed);
ESP_MEM_CHECK(TAG, set_if_config(config->lwt_topic, &client->connect_info.will_topic), goto _mqtt_set_config_failed);
if (config->lwt_msg_len && config->lwt_msg) {
free(client->connect_info.will_message);
client->connect_info.will_message = malloc(config->lwt_msg_len);
ESP_MEM_CHECK(TAG, client->connect_info.will_message, goto _mqtt_set_config_failed);
memcpy(client->connect_info.will_message, config->lwt_msg, config->lwt_msg_len);
client->connect_info.will_length = config->lwt_msg_len;
} else if (config->lwt_msg) {
free(client->connect_info.will_message);
client->connect_info.will_message = strdup(config->lwt_msg);
ESP_MEM_CHECK(TAG, client->connect_info.will_message, goto _mqtt_set_config_failed);
client->connect_info.will_length = strlen(config->lwt_msg);
}
if (config->lwt_qos) {
client->connect_info.will_qos = config->lwt_qos;
}
if (config->lwt_retain) {
client->connect_info.will_retain = config->lwt_retain;
}
if (config->disable_clean_session == client->connect_info.clean_session) {
client->connect_info.clean_session = !config->disable_clean_session;
}
if (config->keepalive) {
client->connect_info.keepalive = config->keepalive;
}
if (client->connect_info.keepalive == 0) {
client->connect_info.keepalive = MQTT_KEEPALIVE_TICK;
}
if (config->disable_keepalive) {
// internal `keepalive` value (in connect_info) is in line with 3.1.2.10 Keep Alive from mqtt spec:
// * keepalive=0: Keep alive mechanism disabled (server not to disconnect the client on its inactivity)
// * period in seconds to send a Control packet if inactive
client->connect_info.keepalive = 0;
}
if (config->protocol_ver) {
client->connect_info.protocol_ver = config->protocol_ver;
}
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_UNDEFINED) {
#ifdef MQTT_PROTOCOL_311
client->connect_info.protocol_ver = MQTT_PROTOCOL_V_3_1_1;
#else
client->connect_info.protocol_ver = MQTT_PROTOCOL_V_3_1;
#endif
}
client->config->network_timeout_ms = config->network_timeout_ms;
if (client->config->network_timeout_ms <= 0) {
client->config->network_timeout_ms = MQTT_NETWORK_TIMEOUT_MS;
}
if (config->user_context) {
client->config->user_context = config->user_context;
}
if (config->event_handle) {
client->config->event_handle = config->event_handle;
}
if (config->refresh_connection_after_ms) {
client->config->refresh_connection_after_ms = config->refresh_connection_after_ms;
}
client->config->auto_reconnect = true;
if (config->disable_auto_reconnect == client->config->auto_reconnect) {
client->config->auto_reconnect = !config->disable_auto_reconnect;
}
if (config->reconnect_timeout_ms) {
client->config->reconnect_timeout_ms = config->reconnect_timeout_ms;
} else {
client->config->reconnect_timeout_ms = MQTT_RECON_DEFAULT_MS;
}
if (config->alpn_protos) {
for (int i = 0; i < client->config->num_alpn_protos; i++) {
free(client->config->alpn_protos[i]);
}
free(client->config->alpn_protos);
client->config->num_alpn_protos = 0;
const char **p;
for (p = config->alpn_protos; *p != NULL; p++ ) {
client->config->num_alpn_protos++;
}
// mbedTLS expects the list to be null-terminated
client->config->alpn_protos = calloc(client->config->num_alpn_protos + 1, sizeof(config->alpn_protos));
ESP_MEM_CHECK(TAG, client->config->alpn_protos, goto _mqtt_set_config_failed);
for (int i = 0; i < client->config->num_alpn_protos; i++) {
client->config->alpn_protos[i] = strdup(config->alpn_protos[i]);
ESP_MEM_CHECK(TAG, client->config->alpn_protos[i], goto _mqtt_set_config_failed);
}
}
// configure ssl related parameters
client->config->use_global_ca_store = config->use_global_ca_store;
client->config->cacert_buf = config->cert_pem;
client->config->cacert_bytes = config->cert_len;
client->config->clientcert_buf = config->client_cert_pem;
client->config->clientcert_bytes = config->client_cert_len;
client->config->clientkey_buf = config->client_key_pem;
client->config->clientkey_bytes = config->client_key_len;
client->config->psk_hint_key = config->psk_hint_key;
client->config->skip_cert_common_name_check = config->skip_cert_common_name_check;
client->config->use_secure_element = config->use_secure_element;
client->config->crt_bundle_attach = config->crt_bundle_attach;
client->config->ds_data = config->ds_data;
if (config->clientkey_password && config->clientkey_password_len) {
client->config->clientkey_password_len = config->clientkey_password_len;
client->config->clientkey_password = malloc(client->config->clientkey_password_len);
ESP_MEM_CHECK(TAG, client->config->clientkey_password, goto _mqtt_set_config_failed);
memcpy(client->config->clientkey_password, config->clientkey_password, client->config->clientkey_password_len);
}
if (config->transport) {
free(client->config->scheme);
if (config->transport == MQTT_TRANSPORT_OVER_TCP) {
client->config->scheme = create_string("mqtt", 4);
ESP_MEM_CHECK(TAG, client->config->scheme, goto _mqtt_set_config_failed);
}
#if MQTT_ENABLE_WS
else if (config->transport == MQTT_TRANSPORT_OVER_WS) {
client->config->scheme = create_string("ws", 2);
ESP_MEM_CHECK(TAG, client->config->scheme, goto _mqtt_set_config_failed);
}
#endif
#if MQTT_ENABLE_SSL
else if (config->transport == MQTT_TRANSPORT_OVER_SSL) {
client->config->scheme = create_string("mqtts", 5);
ESP_MEM_CHECK(TAG, client->config->scheme, goto _mqtt_set_config_failed);
}
#endif
#if MQTT_ENABLE_WSS
else if (config->transport == MQTT_TRANSPORT_OVER_WSS) {
client->config->scheme = create_string("wss", 3);
ESP_MEM_CHECK(TAG, client->config->scheme, goto _mqtt_set_config_failed);
}
#endif
}
// Set uri at the end of config to override separately configured uri elements
if (config->uri) {
if (esp_mqtt_client_set_uri(client, client->config->uri) != ESP_OK) {
err = ESP_FAIL;
goto _mqtt_set_config_failed;
}
}
esp_err_t config_has_conflict = esp_mqtt_check_cfg_conflict(client->config, config);
MQTT_API_UNLOCK(client);
return config_has_conflict;
_mqtt_set_config_failed:
esp_mqtt_destroy_config(client);
MQTT_API_UNLOCK(client);
return err;
}
static void esp_mqtt_destroy_config(esp_mqtt_client_handle_t client)
{
if (client->config == NULL) {
return;
}
free(client->config->host);
free(client->config->uri);
free(client->config->path);
free(client->config->scheme);
for (int i = 0; i < client->config->num_alpn_protos; i++) {
free(client->config->alpn_protos[i]);
}
free(client->config->alpn_protos);
free(client->config->clientkey_password);
free(client->connect_info.will_topic);
free(client->connect_info.will_message);
free(client->connect_info.client_id);
free(client->connect_info.username);
free(client->connect_info.password);
memset(&client->connect_info, 0, sizeof(mqtt_connect_info_t));
#ifdef MQTT_SUPPORTED_FEATURE_EVENT_LOOP
if (client->config->event_loop_handle) {
esp_event_loop_delete(client->config->event_loop_handle);
}
#endif
memset(client->config, 0, sizeof(mqtt_config_storage_t));
free(client->config);
client->config = NULL;
}
static inline esp_err_t esp_mqtt_write(esp_mqtt_client_handle_t client)
{
int wlen = 0, widx = 0, len = client->mqtt_state.outbound_message->length;
while (len > 0) {
wlen = esp_transport_write(client->transport,
(char *)client->mqtt_state.outbound_message->data + widx,
len,
client->config->network_timeout_ms);
if (wlen < 0) {
ESP_LOGE(TAG, "Writing failed: errno=%d", errno);
return ESP_FAIL;
} else if (wlen == 0) {
ESP_LOGE(TAG, "Writing didn't complete in specified timeout: errno=%d", errno);
return ESP_ERR_TIMEOUT;
}
widx += wlen;
len -= wlen;
}
return ESP_OK;
}
static esp_err_t esp_mqtt_connect(esp_mqtt_client_handle_t client, int timeout_ms)
{
int read_len, connect_rsp_code;
client->wait_for_ping_resp = false;
client->mqtt_state.outbound_message = mqtt_msg_connect(&client->mqtt_state.mqtt_connection,
client->mqtt_state.connect_info);
if (client->mqtt_state.outbound_message->length == 0) {
ESP_LOGE(TAG, "Connect message cannot be created");
return ESP_FAIL;
}
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.outbound_message->data,
client->mqtt_state.outbound_message->length);
ESP_LOGD(TAG, "Sending MQTT CONNECT message, type: %d, id: %04X",
client->mqtt_state.pending_msg_type,
client->mqtt_state.pending_msg_id);
if (esp_mqtt_write(client) != ESP_OK) {
return ESP_FAIL;
}
client->mqtt_state.in_buffer_read_len = 0;
client->mqtt_state.message_length = 0;
/* wait configured network timeout for broker connection response */
uint64_t connack_recv_started = platform_tick_get_ms();
do {
read_len = mqtt_message_receive(client, client->config->network_timeout_ms);
} while (read_len == 0 && platform_tick_get_ms() - connack_recv_started < client->config->network_timeout_ms);
if (read_len <= 0) {
ESP_LOGE(TAG, "%s: mqtt_message_receive() returned %d", __func__, read_len);
return ESP_FAIL;
}
if (mqtt_get_type(client->mqtt_state.in_buffer) != MQTT_MSG_TYPE_CONNACK) {
ESP_LOGE(TAG, "Invalid MSG_TYPE response: %d, read_len: %d", mqtt_get_type(client->mqtt_state.in_buffer), read_len);
return ESP_FAIL;
}
client->mqtt_state.in_buffer_read_len = 0;
connect_rsp_code = mqtt_get_connect_return_code(client->mqtt_state.in_buffer);
if (connect_rsp_code == MQTT_CONNECTION_ACCEPTED) {
ESP_LOGD(TAG, "Connected");
return ESP_OK;
}
switch (connect_rsp_code) {
case MQTT_CONNECTION_REFUSE_PROTOCOL:
ESP_LOGW(TAG, "Connection refused, bad protocol");
break;
case MQTT_CONNECTION_REFUSE_SERVER_UNAVAILABLE:
ESP_LOGW(TAG, "Connection refused, server unavailable");
break;
case MQTT_CONNECTION_REFUSE_BAD_USERNAME:
ESP_LOGW(TAG, "Connection refused, bad username or password");
break;
case MQTT_CONNECTION_REFUSE_NOT_AUTHORIZED:
ESP_LOGW(TAG, "Connection refused, not authorized");
break;
default:
ESP_LOGW(TAG, "Connection refused, Unknow reason");
break;
}
/* propagate event with connection refused error */
client->event.event_id = MQTT_EVENT_ERROR;
client->event.error_handle->error_type = MQTT_ERROR_TYPE_CONNECTION_REFUSED;
client->event.error_handle->connect_return_code = connect_rsp_code;
client->event.error_handle->esp_tls_stack_err = 0;
client->event.error_handle->esp_tls_last_esp_err = 0;
client->event.error_handle->esp_tls_cert_verify_flags = 0;
esp_mqtt_dispatch_event_with_msgid(client);
return ESP_FAIL;
}
static void esp_mqtt_abort_connection(esp_mqtt_client_handle_t client)
{
MQTT_API_LOCK(client);
esp_transport_close(client->transport);
client->wait_timeout_ms = client->config->reconnect_timeout_ms;
client->reconnect_tick = platform_tick_get_ms();
client->state = MQTT_STATE_WAIT_RECONNECT;
ESP_LOGD(TAG, "Reconnect after %d ms", client->wait_timeout_ms);
client->event.event_id = MQTT_EVENT_DISCONNECTED;
client->wait_for_ping_resp = false;
esp_mqtt_dispatch_event_with_msgid(client);
MQTT_API_UNLOCK(client);
}
static bool create_client_data(esp_mqtt_client_handle_t client)
{
client->event.error_handle = calloc(1, sizeof(esp_mqtt_error_codes_t));
ESP_MEM_CHECK(TAG, client->event.error_handle, return false)
client->api_lock = xSemaphoreCreateRecursiveMutex();
ESP_MEM_CHECK(TAG, client->api_lock, return false);
client->transport_list = esp_transport_list_init();
ESP_MEM_CHECK(TAG, client->transport_list, return false);
esp_transport_handle_t tcp = esp_transport_tcp_init();
ESP_MEM_CHECK(TAG, tcp, return false);
esp_transport_set_default_port(tcp, MQTT_TCP_DEFAULT_PORT);
esp_transport_list_add(client->transport_list, tcp, "mqtt");
#if MQTT_ENABLE_WS
esp_transport_handle_t ws = esp_transport_ws_init(tcp);
ESP_MEM_CHECK(TAG, ws, return false);
esp_transport_set_default_port(ws, MQTT_WS_DEFAULT_PORT);
#ifdef MQTT_SUPPORTED_FEATURE_WS_SUBPROTOCOL
esp_transport_ws_set_subprotocol(ws, "mqtt");
#endif
esp_transport_list_add(client->transport_list, ws, "ws");
#endif
#if MQTT_ENABLE_SSL
esp_transport_handle_t ssl = esp_transport_ssl_init();
ESP_MEM_CHECK(TAG, ssl, return false);
esp_transport_set_default_port(ssl, MQTT_SSL_DEFAULT_PORT);
esp_transport_list_add(client->transport_list, ssl, "mqtts");
#endif
#if MQTT_ENABLE_WSS
esp_transport_handle_t wss = esp_transport_ws_init(ssl);
ESP_MEM_CHECK(TAG, wss, return false);
#ifdef MQTT_SUPPORTED_FEATURE_WS_SUBPROTOCOL
esp_transport_ws_set_subprotocol(wss, "mqtt");
#endif
esp_transport_set_default_port(wss, MQTT_WSS_DEFAULT_PORT);
esp_transport_list_add(client->transport_list, wss, "wss");
#endif
ESP_MEM_CHECK(TAG, client->transport_list, return false);
return true;
}
esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *config)
{
esp_mqtt_client_handle_t client = calloc(1, sizeof(struct esp_mqtt_client));
ESP_MEM_CHECK(TAG, client, return NULL);
if (!create_client_data(client)) {
goto _mqtt_init_failed;
}
if (esp_mqtt_set_config(client, config) != ESP_OK) {
goto _mqtt_init_failed;
}
#ifdef MQTT_SUPPORTED_FEATURE_EVENT_LOOP
esp_event_loop_args_t no_task_loop = {
.queue_size = 1,
.task_name = NULL,
};
esp_event_loop_create(&no_task_loop, &client->config->event_loop_handle);
#endif
client->keepalive_tick = platform_tick_get_ms();
client->reconnect_tick = platform_tick_get_ms();
client->refresh_connection_tick = platform_tick_get_ms();
client->wait_for_ping_resp = false;
int buffer_size = config->buffer_size;
if (buffer_size <= 0) {
buffer_size = MQTT_BUFFER_SIZE_BYTE;
}
// use separate value for output buffer size if configured
int out_buffer_size = config->out_buffer_size > 0 ? config->out_buffer_size : buffer_size;
client->mqtt_state.in_buffer = (uint8_t *)malloc(buffer_size);
ESP_MEM_CHECK(TAG, client->mqtt_state.in_buffer, goto _mqtt_init_failed);
client->mqtt_state.in_buffer_length = buffer_size;
client->mqtt_state.out_buffer = (uint8_t *)malloc(out_buffer_size);
ESP_MEM_CHECK(TAG, client->mqtt_state.out_buffer, goto _mqtt_init_failed);
client->mqtt_state.out_buffer_length = out_buffer_size;
client->mqtt_state.connect_info = &client->connect_info;
client->outbox = outbox_init();
ESP_MEM_CHECK(TAG, client->outbox, goto _mqtt_init_failed);
client->status_bits = xEventGroupCreate();
ESP_MEM_CHECK(TAG, client->status_bits, goto _mqtt_init_failed);
mqtt_msg_init(&client->mqtt_state.mqtt_connection, client->mqtt_state.out_buffer,
client->mqtt_state.out_buffer_length);
return client;
_mqtt_init_failed:
esp_mqtt_client_destroy(client);
return NULL;
}
esp_err_t esp_mqtt_client_destroy(esp_mqtt_client_handle_t client)
{
if (client == NULL) {
return ESP_ERR_INVALID_ARG;
}
if (client->api_lock) {
esp_mqtt_client_stop(client);
}
esp_mqtt_destroy_config(client);
if (client->transport_list) {
esp_transport_list_destroy(client->transport_list);
}
if (client->outbox) {
outbox_destroy(client->outbox);
}
if (client->status_bits) {
vEventGroupDelete(client->status_bits);
}
free(client->mqtt_state.in_buffer);
free(client->mqtt_state.out_buffer);
if (client->api_lock) {
vSemaphoreDelete(client->api_lock);
}
free(client->event.error_handle);
free(client);
return ESP_OK;
}
static char *create_string(const char *ptr, int len)
{
char *ret;
if (len <= 0) {
return NULL;
}
ret = calloc(1, len + 1);
ESP_MEM_CHECK(TAG, ret, return NULL);
memcpy(ret, ptr, len);
return ret;
}
esp_err_t esp_mqtt_client_set_uri(esp_mqtt_client_handle_t client, const char *uri)
{
struct http_parser_url puri;
http_parser_url_init(&puri);
int parser_status = http_parser_parse_url(uri, strlen(uri), 0, &puri);
if (parser_status != 0) {
ESP_LOGE(TAG, "Error parse uri = %s", uri);
return ESP_FAIL;
}
// This API could be also executed when client is active (need to protect config fields)
MQTT_API_LOCK(client);
// set uri overrides actual scheme, host, path if configured previously
free(client->config->scheme);
free(client->config->host);
free(client->config->path);
client->config->scheme = create_string(uri + puri.field_data[UF_SCHEMA].off, puri.field_data[UF_SCHEMA].len);
client->config->host = create_string(uri + puri.field_data[UF_HOST].off, puri.field_data[UF_HOST].len);
client->config->path = NULL;
if (puri.field_data[UF_PATH].len || puri.field_data[UF_QUERY].len) {
if (puri.field_data[UF_QUERY].len == 0) {
asprintf(&client->config->path, "%.*s", puri.field_data[UF_PATH].len, uri + puri.field_data[UF_PATH].off);
} else if (puri.field_data[UF_PATH].len == 0) {
asprintf(&client->config->path, "/?%.*s", puri.field_data[UF_QUERY].len, uri + puri.field_data[UF_QUERY].off);
} else {
asprintf(&client->config->path, "%.*s?%.*s", puri.field_data[UF_PATH].len, uri + puri.field_data[UF_PATH].off,
puri.field_data[UF_QUERY].len, uri + puri.field_data[UF_QUERY].off);
}
ESP_MEM_CHECK(TAG, client->config->path, {
MQTT_API_UNLOCK(client);
return ESP_ERR_NO_MEM;
});
}
if (client->config->path) {
#if MQTT_ENABLE_WS
esp_transport_handle_t ws_trans = esp_transport_list_get_transport(client->transport_list, "ws");
if (ws_trans) {
esp_transport_ws_set_path(ws_trans, client->config->path);
}
#endif
#if MQTT_ENABLE_WSS
esp_transport_handle_t wss_trans = esp_transport_list_get_transport(client->transport_list, "wss");
if (wss_trans) {
esp_transport_ws_set_path(wss_trans, client->config->path);
}
#endif
}
if (puri.field_data[UF_PORT].len) {
client->config->port = strtol((const char *)(uri + puri.field_data[UF_PORT].off), NULL, 10);
}
char *user_info = create_string(uri + puri.field_data[UF_USERINFO].off, puri.field_data[UF_USERINFO].len);
if (user_info) {
char *pass = strchr(user_info, ':');
if (pass) {
pass[0] = 0; //terminal username
pass ++;
client->connect_info.password = strdup(pass);
}
client->connect_info.username = strdup(user_info);
free(user_info);
}
MQTT_API_UNLOCK(client);
return ESP_OK;
}
static esp_err_t mqtt_write_data(esp_mqtt_client_handle_t client)
{
if (esp_mqtt_write(client) != ESP_OK) {
esp_mqtt_client_dispatch_transport_error(client);
return ESP_FAIL;
}
/* we've just sent a mqtt control packet, update keepalive counter
* [MQTT-3.1.2-23]
*/
client->keepalive_tick = platform_tick_get_ms();
return ESP_OK;
}
static esp_err_t esp_mqtt_dispatch_event_with_msgid(esp_mqtt_client_handle_t client)
{
client->event.msg_id = mqtt_get_id(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length);
return esp_mqtt_dispatch_event(client);
}
static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client)
{
client->event.user_context = client->config->user_context;
client->event.client = client;
if (client->config->event_handle) {
return client->config->event_handle(&client->event);
} else {
#ifdef MQTT_SUPPORTED_FEATURE_EVENT_LOOP
esp_event_post_to(client->config->event_loop_handle, MQTT_EVENTS, client->event.event_id, &client->event, sizeof(client->event), portMAX_DELAY);
return esp_event_loop_run(client->config->event_loop_handle, 0);
#else
return ESP_FAIL;
#endif
}
}
static esp_err_t deliver_publish(esp_mqtt_client_handle_t client)
{
uint8_t *msg_buf = client->mqtt_state.in_buffer;
size_t msg_read_len = client->mqtt_state.in_buffer_read_len;
size_t msg_total_len = client->mqtt_state.message_length;
size_t msg_topic_len = msg_read_len, msg_data_len = msg_read_len;
size_t msg_data_offset = 0;
char *msg_topic = NULL, *msg_data = NULL;
// get topic
msg_topic = mqtt_get_publish_topic(msg_buf, &msg_topic_len);
if (msg_topic == NULL) {
ESP_LOGE(TAG, "%s: mqtt_get_publish_topic() failed", __func__);
return ESP_FAIL;
}
ESP_LOGD(TAG, "%s: msg_topic_len=%zu", __func__, msg_topic_len);
// get payload
msg_data = mqtt_get_publish_data(msg_buf, &msg_data_len);
if (msg_data_len > 0 && msg_data == NULL) {
ESP_LOGE(TAG, "%s: mqtt_get_publish_data() failed", __func__);
return ESP_FAIL;
}
// post data event
client->event.retain = mqtt_get_retain(msg_buf);
client->event.msg_id = mqtt_get_id(msg_buf, msg_data_len);
client->event.qos = mqtt_get_qos(msg_buf);
client->event.dup = mqtt_get_dup(msg_buf);
client->event.total_data_len = msg_data_len + msg_total_len - msg_read_len;
post_data_event:
ESP_LOGD(TAG, "Get data len= %zu, topic len=%zu, total_data: %d offset: %zu", msg_data_len, msg_topic_len,
client->event.total_data_len, msg_data_offset);
client->event.event_id = MQTT_EVENT_DATA;
client->event.data = msg_data_len > 0 ? msg_data : NULL;
client->event.data_len = msg_data_len;
client->event.current_data_offset = msg_data_offset;
client->event.topic = msg_topic;
client->event.topic_len = msg_topic_len;
esp_mqtt_dispatch_event(client);
if (msg_read_len < msg_total_len) {
size_t buf_len = client->mqtt_state.in_buffer_length;
msg_data = (char *)client->mqtt_state.in_buffer;
msg_topic = NULL;
msg_topic_len = 0;
msg_data_offset += msg_data_len;
msg_data_len = esp_transport_read(client-> transport, (char *)client->mqtt_state.in_buffer,
msg_total_len - msg_read_len > buf_len ? buf_len : msg_total_len - msg_read_len,
client->config->network_timeout_ms);
if (msg_data_len <= 0) {
ESP_LOGE(TAG, "Read error or timeout: len_read=%zu, errno=%d", msg_data_len, errno);
return ESP_FAIL;
}
msg_read_len += msg_data_len;
goto post_data_event;
}
return ESP_OK;
}
static esp_err_t deliver_suback(esp_mqtt_client_handle_t client)
{
uint8_t *msg_buf = client->mqtt_state.in_buffer;
size_t msg_data_len = client->mqtt_state.in_buffer_read_len;
char *msg_data = NULL;
msg_data = mqtt_get_suback_data(msg_buf, &msg_data_len);
if (msg_data_len <= 0) {
ESP_LOGE(TAG, "Failed to acquire suback data");
return ESP_FAIL;
}
// post data event
client->event.data_len = msg_data_len;
client->event.total_data_len = msg_data_len;
client->event.event_id = MQTT_EVENT_SUBSCRIBED;
client->event.data = msg_data;
client->event.current_data_offset = 0;
esp_mqtt_dispatch_event_with_msgid(client);
return ESP_OK;
}
static bool is_valid_mqtt_msg(esp_mqtt_client_handle_t client, int msg_type, int msg_id)
{
ESP_LOGD(TAG, "pending_id=%d, pending_msg_count = %d", client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_count);
if (client->mqtt_state.pending_msg_count == 0) {
return false;
}
if (outbox_delete(client->outbox, msg_id, msg_type) == ESP_OK) {
client->mqtt_state.pending_msg_count --;
return true;
}
return false;
}
static outbox_item_handle_t mqtt_enqueue_oversized(esp_mqtt_client_handle_t client, uint8_t *remaining_data, int remaining_len)
{
ESP_LOGD(TAG, "mqtt_enqueue_oversized id: %d, type=%d successful",
client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type);
//lock mutex
outbox_message_t msg = { 0 };
msg.data = client->mqtt_state.outbound_message->data;
msg.len = client->mqtt_state.outbound_message->length;
msg.msg_id = client->mqtt_state.pending_msg_id;
msg.msg_type = client->mqtt_state.pending_msg_type;
msg.msg_qos = client->mqtt_state.pending_publish_qos;
msg.remaining_data = remaining_data;
msg.remaining_len = remaining_len;
//Copy to queue buffer
return outbox_enqueue(client->outbox, &msg, platform_tick_get_ms());
//unlock
}
static outbox_item_handle_t mqtt_enqueue(esp_mqtt_client_handle_t client)
{
ESP_LOGD(TAG, "mqtt_enqueue id: %d, type=%d successful",
client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type);
if (client->mqtt_state.pending_msg_count > 0) {
outbox_message_t msg = { 0 };
msg.data = client->mqtt_state.outbound_message->data;
msg.len = client->mqtt_state.outbound_message->length;
msg.msg_id = client->mqtt_state.pending_msg_id;
msg.msg_type = client->mqtt_state.pending_msg_type;
msg.msg_qos = client->mqtt_state.pending_publish_qos;
//Copy to queue buffer
return outbox_enqueue(client->outbox, &msg, platform_tick_get_ms());
}
return NULL;
}
/*
* Returns:
* -1 in case of failure
* 0 if no message has been received
* 1 if a message has been received and placed to client->mqtt_state:
* message length: client->mqtt_state.message_length
* message content: client->mqtt_state.in_buffer
*/
static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_timeout_ms)
{
int read_len, total_len, fixed_header_len;
uint8_t *buf = client->mqtt_state.in_buffer + client->mqtt_state.in_buffer_read_len;
esp_transport_handle_t t = client->transport;
client->mqtt_state.message_length = 0;
if (client->mqtt_state.in_buffer_read_len == 0) {
/*
* Read first byte of the mqtt packet fixed header, it contains packet
* type and flags.
*/
read_len = esp_transport_read(t, (char *)buf, 1, read_poll_timeout_ms);
if (read_len < 0) {
ESP_LOGE(TAG, "%s: transport_read() error: errno=%d", __func__, errno);
goto err;
}
if (read_len == 0) {
ESP_LOGV(TAG, "%s: transport_read(): no data or EOF", __func__);
return 0;
}
ESP_LOGD(TAG, "%s: first byte: 0x%x", __func__, *buf);
/*
* Verify the flags and act according to MQTT protocol: close connection
* if the flags are set incorrectly.
*/
if (!mqtt_has_valid_msg_hdr(buf, read_len)) {
ESP_LOGE(TAG, "%s: received a message with an invalid header=0x%x", __func__, *buf);
goto err;
}
buf++;
client->mqtt_state.in_buffer_read_len++;
}
if ((client->mqtt_state.in_buffer_read_len == 1) ||
((client->mqtt_state.in_buffer_read_len < 6) && (*(buf - 1) & 0x80))) {
do {
/*
* Read the "remaining length" part of mqtt packet fixed header. It
* starts at second byte and spans up to 4 bytes, but we accept here
* only up to 2 bytes of remaining length, i.e. messages with
* maximal remaining length value = 16383 (maximal total message
* size of 16386 bytes).
*/
read_len = esp_transport_read(t, (char *)buf, 1, read_poll_timeout_ms);
if (read_len < 0) {
ESP_LOGE(TAG, "%s: transport_read() error: errno=%d", __func__, errno);
goto err;
}
if (read_len == 0) {
ESP_LOGD(TAG, "%s: transport_read(): no data or EOF", __func__);
return 0;
}
ESP_LOGD(TAG, "%s: read \"remaining length\" byte: 0x%x", __func__, *buf);
buf++;
client->mqtt_state.in_buffer_read_len++;
} while ((client->mqtt_state.in_buffer_read_len < 6) && (*(buf - 1) & 0x80));
}
total_len = mqtt_get_total_length(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_read_len, &fixed_header_len);
ESP_LOGD(TAG, "%s: total message length: %d (already read: %zu)", __func__, total_len, client->mqtt_state.in_buffer_read_len);
client->mqtt_state.message_length = total_len;
if (client->mqtt_state.in_buffer_length < total_len) {
if (mqtt_get_type(client->mqtt_state.in_buffer) == MQTT_MSG_TYPE_PUBLISH) {
/*
* In case larger publish messages, we only need to read full topic, data can be split to multiple data event.
* Evaluate and correct total_len to read only publish message header, so data can be read separately
*/
if (client->mqtt_state.in_buffer_read_len < fixed_header_len + 2) {
/* read next 2 bytes - topic length to get minimum portion of publish packet */
read_len = esp_transport_read(t, (char *)buf, client->mqtt_state.in_buffer_read_len - fixed_header_len + 2, read_poll_timeout_ms);
ESP_LOGD(TAG, "%s: read_len=%d", __func__, read_len);
if (read_len < 0) {
ESP_LOGE(TAG, "%s: transport_read() error: errno=%d", __func__, errno);
goto err;
} else if (read_len == 0) {
ESP_LOGD(TAG, "%s: transport_read(): no data or EOF", __func__);
return 0;
}
client->mqtt_state.in_buffer_read_len += read_len;
buf += read_len;
if (client->mqtt_state.in_buffer_read_len < fixed_header_len + 2) {
ESP_LOGD(TAG, "%s: transport_read(): message reading left in progress :: total message length: %d (already read: %zu)",
__func__, total_len, client->mqtt_state.in_buffer_read_len);
return 0;
}
}
int topic_len = client->mqtt_state.in_buffer[fixed_header_len] << 8;
topic_len |= client->mqtt_state.in_buffer[fixed_header_len + 1];
total_len = fixed_header_len + topic_len + (mqtt_get_qos(client->mqtt_state.in_buffer) > 0 ? 2 : 0);
ESP_LOGD(TAG, "%s: total len modified to %d as message longer than input buffer", __func__, total_len);
if (client->mqtt_state.in_buffer_length < total_len) {
ESP_LOGE(TAG, "%s: message is too big, insufficient buffer size", __func__);
goto err;
} else {
total_len = client->mqtt_state.in_buffer_length;
}
/* free to continue with reading */
} else {
ESP_LOGE(TAG, "%s: message is too big, insufficient buffer size", __func__);
goto err;
}
}
if (client->mqtt_state.in_buffer_read_len < total_len) {
/* read the rest of the mqtt message */
read_len = esp_transport_read(t, (char *)buf, total_len - client->mqtt_state.in_buffer_read_len, read_poll_timeout_ms);
ESP_LOGD(TAG, "%s: read_len=%d", __func__, read_len);
if (read_len < 0) {
ESP_LOGE(TAG, "%s: transport_read() error: errno=%d", __func__, errno);
goto err;
}
if (read_len == 0) {
ESP_LOGD(TAG, "%s: transport_read(): no data or EOF", __func__);
return 0;
}
client->mqtt_state.in_buffer_read_len += read_len;
if (client->mqtt_state.in_buffer_read_len < total_len) {
ESP_LOGD(TAG, "%s: transport_read(): message reading left in progress :: total message length: %d (already read: %zu)",
__func__, total_len, client->mqtt_state.in_buffer_read_len);
return 0;
}
}
ESP_LOGD(TAG, "%s: transport_read():%zu %zu", __func__, client->mqtt_state.in_buffer_read_len, client->mqtt_state.message_length);
return 1;
err:
esp_mqtt_client_dispatch_transport_error(client);
return -1;
}
static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
{
uint8_t msg_type;
uint8_t msg_qos;
uint16_t msg_id;
/* non-blocking receive in order not to block other tasks */
int recv = mqtt_message_receive(client, 0);
if (recv < 0) {
ESP_LOGE(TAG, "%s: mqtt_message_receive() returned %d", __func__, recv);
return ESP_FAIL;
}
if (recv == 0) {
return ESP_OK;
}
int read_len = client->mqtt_state.message_length;
// If the message was valid, get the type, quality of service and id of the message
msg_type = mqtt_get_type(client->mqtt_state.in_buffer);
msg_qos = mqtt_get_qos(client->mqtt_state.in_buffer);
msg_id = mqtt_get_id(client->mqtt_state.in_buffer, read_len);
ESP_LOGD(TAG, "msg_type=%d, msg_id=%d", msg_type, msg_id);
switch (msg_type) {
case MQTT_MSG_TYPE_SUBACK:
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_SUBSCRIBE, msg_id)) {
ESP_LOGD(TAG, "deliver_suback, message_length_read=%zu, message_length=%zu", client->mqtt_state.in_buffer_read_len, client->mqtt_state.message_length);
if (deliver_suback(client) != ESP_OK) {
ESP_LOGE(TAG, "Failed to deliver suback message id=%d", msg_id);
return ESP_FAIL;
}
}
break;
case MQTT_MSG_TYPE_UNSUBACK:
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_UNSUBSCRIBE, msg_id)) {
ESP_LOGD(TAG, "UnSubscribe successful");
client->event.event_id = MQTT_EVENT_UNSUBSCRIBED;
esp_mqtt_dispatch_event_with_msgid(client);
}
break;
case MQTT_MSG_TYPE_PUBLISH:
ESP_LOGD(TAG, "deliver_publish, message_length_read=%zu, message_length=%zu", client->mqtt_state.in_buffer_read_len, client->mqtt_state.message_length);
if (deliver_publish(client) != ESP_OK) {
ESP_LOGE(TAG, "Failed to deliver publish message id=%d", msg_id);
return ESP_FAIL;
}
if (msg_qos == 1) {
client->mqtt_state.outbound_message = mqtt_msg_puback(&client->mqtt_state.mqtt_connection, msg_id);
} else if (msg_qos == 2) {
client->mqtt_state.outbound_message = mqtt_msg_pubrec(&client->mqtt_state.mqtt_connection, msg_id);
}
if (client->mqtt_state.outbound_message->length == 0) {
ESP_LOGE(TAG, "Publish response message PUBACK or PUBREC cannot be created");
return ESP_FAIL;
}
if (msg_qos == 1 || msg_qos == 2) {
ESP_LOGD(TAG, "Queue response QoS: %d", msg_qos);
if (mqtt_write_data(client) != ESP_OK) {
ESP_LOGE(TAG, "Error write qos msg repsonse, qos = %d", msg_qos);
return ESP_FAIL;
}
}
break;
case MQTT_MSG_TYPE_PUBACK:
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) {
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBACK, finish QoS1 publish");
client->event.event_id = MQTT_EVENT_PUBLISHED;
esp_mqtt_dispatch_event_with_msgid(client);
}
break;
case MQTT_MSG_TYPE_PUBREC:
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREC");
client->mqtt_state.outbound_message = mqtt_msg_pubrel(&client->mqtt_state.mqtt_connection, msg_id);
if (client->mqtt_state.outbound_message->length == 0) {
ESP_LOGE(TAG, "Publish response message PUBREL cannot be created");
return ESP_FAIL;
}
outbox_set_pending(client->outbox, msg_id, ACKNOWLEDGED);
mqtt_write_data(client);
break;
case MQTT_MSG_TYPE_PUBREL:
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREL");
client->mqtt_state.outbound_message = mqtt_msg_pubcomp(&client->mqtt_state.mqtt_connection, msg_id);
if (client->mqtt_state.outbound_message->length == 0) {
ESP_LOGE(TAG, "Publish response message PUBCOMP cannot be created");
return ESP_FAIL;
}
mqtt_write_data(client);
break;
case MQTT_MSG_TYPE_PUBCOMP:
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBCOMP");
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) {
ESP_LOGD(TAG, "Receive MQTT_MSG_TYPE_PUBCOMP, finish QoS2 publish");
client->event.event_id = MQTT_EVENT_PUBLISHED;
esp_mqtt_dispatch_event_with_msgid(client);
}
break;
case MQTT_MSG_TYPE_PINGRESP:
ESP_LOGD(TAG, "MQTT_MSG_TYPE_PINGRESP");
client->wait_for_ping_resp = false;
break;
}
client->mqtt_state.in_buffer_read_len = 0;
return ESP_OK;
}
static esp_err_t mqtt_resend_queued(esp_mqtt_client_handle_t client, outbox_item_handle_t item)
{
// decode queued data
client->mqtt_state.outbound_message->data = outbox_item_get_data(item, &client->mqtt_state.outbound_message->length, &client->mqtt_state.pending_msg_id,
&client->mqtt_state.pending_msg_type, &client->mqtt_state.pending_publish_qos);
// set duplicate flag for QoS-1 and QoS-2 messages
if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && client->mqtt_state.pending_publish_qos > 0) {
mqtt_set_dup(client->mqtt_state.outbound_message->data);
}
// try to resend the data
if (mqtt_write_data(client) != ESP_OK) {
ESP_LOGE(TAG, "Error to resend data ");
esp_mqtt_abort_connection(client);
return ESP_FAIL;
}
// check if it was QoS-0 publish message
if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && client->mqtt_state.pending_publish_qos == 0) {
// delete all qos0 publish messages once we process them
if (outbox_delete_item(client->outbox, item) != ESP_OK) {
ESP_LOGE(TAG, "Failed to remove queued qos0 message from the outbox");
}
}
return ESP_OK;
}
static void mqtt_delete_expired_messages(esp_mqtt_client_handle_t client)
{
// Delete message after OUTBOX_EXPIRED_TIMEOUT_MS milliseconds
#if MQTT_REPORT_DELETED_MESSAGES
// also report the deleted items as MQTT_EVENT_DELETED events if enabled
int deleted_items = 0;
int msg_id = 0;
while ((msg_id = outbox_delete_single_expired(client->outbox, platform_tick_get_ms(), OUTBOX_EXPIRED_TIMEOUT_MS)) > 0) {
client->event.event_id = MQTT_EVENT_DELETED;
client->event.msg_id = msg_id;
if (esp_mqtt_dispatch_event(client) != ESP_OK) {
ESP_LOGE(TAG, "Failed to post event on deleting message id=%d", msg_id);
}
deleted_items ++;
}
#else
int deleted_items = outbox_delete_expired(client->outbox, platform_tick_get_ms(), OUTBOX_EXPIRED_TIMEOUT_MS);
#endif
client->mqtt_state.pending_msg_count -= deleted_items;
if (client->mqtt_state.pending_msg_count < 0) {
client->mqtt_state.pending_msg_count = 0;
}
}
static void esp_mqtt_task(void *pv)
{
esp_mqtt_client_handle_t client = (esp_mqtt_client_handle_t) pv;
uint64_t last_retransmit = 0;
outbox_tick_t msg_tick = 0;
client->run = true;
//get transport by scheme
client->transport = esp_transport_list_get_transport(client->transport_list, client->config->scheme);
if (client->transport == NULL) {
ESP_LOGE(TAG, "There are no transports valid, stop mqtt client, config scheme = %s", client->config->scheme);
client->run = false;
}
//default port
if (client->config->port == 0) {
client->config->port = esp_transport_get_default_port(client->transport);
}
client->state = MQTT_STATE_INIT;
xEventGroupClearBits(client->status_bits, STOPPED_BIT);
while (client->run) {
MQTT_API_LOCK(client);
switch (client->state) {
case MQTT_STATE_DISCONNECTED:
break;
case MQTT_STATE_INIT:
xEventGroupClearBits(client->status_bits, RECONNECT_BIT | DISCONNECT_BIT);
client->event.event_id = MQTT_EVENT_BEFORE_CONNECT;
esp_mqtt_dispatch_event_with_msgid(client);
if (client->transport == NULL) {
ESP_LOGE(TAG, "There is no transport");
client->run = false;
}
#if MQTT_ENABLE_SSL
esp_mqtt_set_ssl_transport_properties(client->transport_list, client->config);
#endif
if (esp_transport_connect(client->transport,
client->config->host,
client->config->port,
client->config->network_timeout_ms) < 0) {
ESP_LOGE(TAG, "Error transport connect");
esp_mqtt_client_dispatch_transport_error(client);
esp_mqtt_abort_connection(client);
break;
}
ESP_LOGD(TAG, "Transport connected to %s://%s:%d", client->config->scheme, client->config->host, client->config->port);
if (esp_mqtt_connect(client, client->config->network_timeout_ms) != ESP_OK) {
ESP_LOGE(TAG, "MQTT connect failed");
esp_mqtt_abort_connection(client);
break;
}
client->event.event_id = MQTT_EVENT_CONNECTED;
client->event.session_present = mqtt_get_connect_session_present(client->mqtt_state.in_buffer);
client->state = MQTT_STATE_CONNECTED;
esp_mqtt_dispatch_event_with_msgid(client);
client->refresh_connection_tick = platform_tick_get_ms();
break;
case MQTT_STATE_CONNECTED:
// check for disconnection request
if (xEventGroupWaitBits(client->status_bits, DISCONNECT_BIT, true, true, 0) & DISCONNECT_BIT) {
send_disconnect_msg(client); // ignore error, if clean disconnect fails, just abort the connection
esp_mqtt_abort_connection(client);
break;
}
// receive and process data
if (mqtt_process_receive(client) == ESP_FAIL) {
esp_mqtt_abort_connection(client);
break;
}
// delete long pending messages
mqtt_delete_expired_messages(client);
// resend all non-transmitted messages first
outbox_item_handle_t item = outbox_dequeue(client->outbox, QUEUED, NULL);
if (item) {
if (mqtt_resend_queued(client, item) == ESP_OK) {
outbox_set_pending(client->outbox, client->mqtt_state.pending_msg_id, TRANSMITTED);
}
// resend other "transmitted" messages after 1s
} else if (platform_tick_get_ms() - last_retransmit > client->config->message_retransmit_timeout) {
last_retransmit = platform_tick_get_ms();
item = outbox_dequeue(client->outbox, TRANSMITTED, &msg_tick);
if (item && (last_retransmit - msg_tick > client->config->message_retransmit_timeout)) {
mqtt_resend_queued(client, item);
}
}
if (client->connect_info.keepalive && // connect_info.keepalive=0 means that the keepslive is disabled
platform_tick_get_ms() - client->keepalive_tick > client->connect_info.keepalive * 1000 / 2) {
//No ping resp from last ping => Disconnected
if (client->wait_for_ping_resp) {
ESP_LOGE(TAG, "No PING_RESP, disconnected");
esp_mqtt_abort_connection(client);
client->wait_for_ping_resp = false;
break;
}
if (esp_mqtt_client_ping(client) == ESP_FAIL) {
ESP_LOGE(TAG, "Can't send ping, disconnected");
esp_mqtt_abort_connection(client);
break;
} else {
client->wait_for_ping_resp = true;
}
ESP_LOGD(TAG, "PING sent");
}
if (client->config->refresh_connection_after_ms &&
platform_tick_get_ms() - client->refresh_connection_tick > client->config->refresh_connection_after_ms) {
ESP_LOGD(TAG, "Refreshing the connection...");
esp_mqtt_abort_connection(client);
client->state = MQTT_STATE_INIT;
}
break;
case MQTT_STATE_WAIT_RECONNECT:
if (!client->config->auto_reconnect && xEventGroupGetBits(client->status_bits)&RECONNECT_BIT) {
xEventGroupClearBits(client->status_bits, RECONNECT_BIT);
client->state = MQTT_STATE_INIT;
client->wait_timeout_ms = MQTT_RECON_DEFAULT_MS;
ESP_LOGD(TAG, "Reconnecting per user request...");
break;
} else if (client->config->auto_reconnect &&
platform_tick_get_ms() - client->reconnect_tick > client->wait_timeout_ms) {
client->state = MQTT_STATE_INIT;
client->reconnect_tick = platform_tick_get_ms();
ESP_LOGD(TAG, "Reconnecting...");
break;
}
MQTT_API_UNLOCK(client);
xEventGroupWaitBits(client->status_bits, RECONNECT_BIT, false, true,
client->wait_timeout_ms / 2 / portTICK_RATE_MS);
// continue the while loop instead of break, as the mutex is unlocked
continue;
default:
ESP_LOGE(TAG, "MQTT client error, client is in an unrecoverable state.");
break;
}
MQTT_API_UNLOCK(client);
if (MQTT_STATE_CONNECTED == client->state) {
if (esp_transport_poll_read(client->transport, MQTT_POLL_READ_TIMEOUT_MS) < 0) {
ESP_LOGE(TAG, "Poll read error: %d, aborting connection", errno);
esp_mqtt_abort_connection(client);
}
}
}
esp_transport_close(client->transport);
outbox_delete_all_items(client->outbox);
xEventGroupSetBits(client->status_bits, STOPPED_BIT);
vTaskDelete(NULL);
}
esp_err_t esp_mqtt_client_start(esp_mqtt_client_handle_t client)
{
if (!client) {
ESP_LOGE(TAG, "Client was not initialized");
return ESP_ERR_INVALID_ARG;
}
MQTT_API_LOCK(client);
if (client->state != MQTT_STATE_INIT && client->state != MQTT_STATE_DISCONNECTED) {
ESP_LOGE(TAG, "Client has started");
MQTT_API_UNLOCK(client);
return ESP_FAIL;
}
esp_err_t err = ESP_OK;
#if MQTT_CORE_SELECTION_ENABLED
ESP_LOGD(TAG, "Core selection enabled on %u", MQTT_TASK_CORE);
if (xTaskCreatePinnedToCore(esp_mqtt_task, "mqtt_task", client->config->task_stack, client, client->config->task_prio, &client->task_handle, MQTT_TASK_CORE) != pdTRUE) {
ESP_LOGE(TAG, "Error create mqtt task");
err = ESP_FAIL;
}
#else
ESP_LOGD(TAG, "Core selection disabled");
if (xTaskCreate(esp_mqtt_task, "mqtt_task", client->config->task_stack, client, client->config->task_prio, &client->task_handle) != pdTRUE) {
ESP_LOGE(TAG, "Error create mqtt task");
err = ESP_FAIL;
}
#endif
MQTT_API_UNLOCK(client);
return err;
}
esp_err_t esp_mqtt_client_disconnect(esp_mqtt_client_handle_t client)
{
if (!client) {
ESP_LOGE(TAG, "Client was not initialized");
return ESP_ERR_INVALID_ARG;
}
ESP_LOGI(TAG, "Client asked to disconnect");
xEventGroupSetBits(client->status_bits, DISCONNECT_BIT);
return ESP_OK;
}
esp_err_t esp_mqtt_client_reconnect(esp_mqtt_client_handle_t client)
{
if (!client) {
ESP_LOGE(TAG, "Client was not initialized");
return ESP_ERR_INVALID_ARG;
}
ESP_LOGI(TAG, "Client force reconnect requested");
if (client->state != MQTT_STATE_WAIT_RECONNECT) {
ESP_LOGD(TAG, "The client is not waiting for reconnection. Ignore the request");
return ESP_FAIL;
}
client->wait_timeout_ms = 0;
xEventGroupSetBits(client->status_bits, RECONNECT_BIT);
return ESP_OK;
}
static esp_err_t send_disconnect_msg(esp_mqtt_client_handle_t client)
{
// Notify the broker we are disconnecting
client->mqtt_state.outbound_message = mqtt_msg_disconnect(&client->mqtt_state.mqtt_connection);
if (client->mqtt_state.outbound_message->length == 0) {
ESP_LOGE(TAG, "Disconnect message cannot be created");
return ESP_FAIL;
}
if (mqtt_write_data(client) != ESP_OK) {
ESP_LOGE(TAG, "Error sending disconnect message");
}
return ESP_OK;
}
esp_err_t esp_mqtt_client_stop(esp_mqtt_client_handle_t client)
{
if (!client) {
ESP_LOGE(TAG, "Client was not initialized");
return ESP_ERR_INVALID_ARG;
}
MQTT_API_LOCK(client);
if (client->run) {
/* A running client cannot be stopped from the MQTT task/event handler */
TaskHandle_t running_task = xTaskGetCurrentTaskHandle();
if (running_task == client->task_handle) {
MQTT_API_UNLOCK(client);
ESP_LOGE(TAG, "Client cannot be stopped from MQTT task");
return ESP_FAIL;
}
// Only send the disconnect message if the client is connected
if (client->state == MQTT_STATE_CONNECTED) {
if (send_disconnect_msg(client) != ESP_OK) {
MQTT_API_UNLOCK(client);
return ESP_FAIL;
}
}
client->run = false;
client->state = MQTT_STATE_DISCONNECTED;
MQTT_API_UNLOCK(client);
xEventGroupWaitBits(client->status_bits, STOPPED_BIT, false, true, portMAX_DELAY);
return ESP_OK;
} else {
ESP_LOGW(TAG, "Client asked to stop, but was not started");
MQTT_API_UNLOCK(client);
return ESP_FAIL;
}
}
static esp_err_t esp_mqtt_client_ping(esp_mqtt_client_handle_t client)
{
client->mqtt_state.outbound_message = mqtt_msg_pingreq(&client->mqtt_state.mqtt_connection);
if (client->mqtt_state.outbound_message->length == 0) {
ESP_LOGE(TAG, "Ping message cannot be created");
return ESP_FAIL;
}
if (mqtt_write_data(client) != ESP_OK) {
ESP_LOGE(TAG, "Error sending ping");
return ESP_FAIL;
}
ESP_LOGD(TAG, "Sent PING successful");
return ESP_OK;
}
int esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client, const char *topic, int qos)
{
if (!client) {
ESP_LOGE(TAG, "Client was not initialized");
return -1;
}
MQTT_API_LOCK(client);
if (client->state != MQTT_STATE_CONNECTED) {
ESP_LOGE(TAG, "Client has not connected");
MQTT_API_UNLOCK(client);
return -1;
}
client->mqtt_state.outbound_message = mqtt_msg_subscribe(&client->mqtt_state.mqtt_connection,
topic, qos,
&client->mqtt_state.pending_msg_id);
if (client->mqtt_state.outbound_message->length == 0) {
ESP_LOGE(TAG, "Subscribe message cannot be created");
MQTT_API_UNLOCK(client);
return -1;
}
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
client->mqtt_state.pending_msg_count ++;
//move pending msg to outbox (if have)
if (!mqtt_enqueue(client)) {
MQTT_API_UNLOCK(client);
return -1;
}
outbox_set_pending(client->outbox, client->mqtt_state.pending_msg_id, TRANSMITTED);// handle error
if (mqtt_write_data(client) != ESP_OK) {
ESP_LOGE(TAG, "Error to subscribe topic=%s, qos=%d", topic, qos);
MQTT_API_UNLOCK(client);
return -1;
}
ESP_LOGD(TAG, "Sent subscribe topic=%s, id: %d, type=%d successful", topic, client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type);
MQTT_API_UNLOCK(client);
return client->mqtt_state.pending_msg_id;
}
int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *topic)
{
if (!client) {
ESP_LOGE(TAG, "Client was not initialized");
return -1;
}
MQTT_API_LOCK(client);
if (client->state != MQTT_STATE_CONNECTED) {
MQTT_API_UNLOCK(client);
ESP_LOGE(TAG, "Client has not connected");
return -1;
}
client->mqtt_state.outbound_message = mqtt_msg_unsubscribe(&client->mqtt_state.mqtt_connection,
topic,
&client->mqtt_state.pending_msg_id);
if (client->mqtt_state.outbound_message->length == 0) {
MQTT_API_UNLOCK(client);
ESP_LOGE(TAG, "Unubscribe message cannot be created");
return -1;
}
ESP_LOGD(TAG, "unsubscribe, topic\"%s\", id: %d", topic, client->mqtt_state.pending_msg_id);
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
client->mqtt_state.pending_msg_count ++;
if (!mqtt_enqueue(client)) {
MQTT_API_UNLOCK(client);
return -1;
}
outbox_set_pending(client->outbox, client->mqtt_state.pending_msg_id, TRANSMITTED); //handle error
if (mqtt_write_data(client) != ESP_OK) {
ESP_LOGE(TAG, "Error to unsubscribe topic=%s", topic);
MQTT_API_UNLOCK(client);
return -1;
}
ESP_LOGD(TAG, "Sent Unsubscribe topic=%s, id: %d, successful", topic, client->mqtt_state.pending_msg_id);
MQTT_API_UNLOCK(client);
return client->mqtt_state.pending_msg_id;
}
static inline int mqtt_client_enqueue_priv(esp_mqtt_client_handle_t client, const char *topic, const char *data,
int len, int qos, int retain, bool store)
{
uint16_t pending_msg_id = 0;
/* Acceptable publish messages:
data == NULL, len == 0: publish null message
data valid, len == 0: publish all data, payload len is determined from string length
data valid, len > 0: publish data with defined length
*/
if (len <= 0 && data != NULL) {
len = strlen(data);
}
mqtt_message_t *publish_msg = mqtt_msg_publish(&client->mqtt_state.mqtt_connection,
topic, data, len,
qos, retain,
&pending_msg_id);
if (publish_msg->length == 0) {
ESP_LOGE(TAG, "Publish message cannot be created");
return -1;
}
/* We have to set as pending all the qos>0 messages */
client->mqtt_state.outbound_message = publish_msg;
if (qos > 0 || store) {
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
client->mqtt_state.pending_msg_id = pending_msg_id;
client->mqtt_state.pending_publish_qos = qos;
client->mqtt_state.pending_msg_count ++;
// by default store as QUEUED (not transmitted yet) only for messages which would fit outbound buffer
if (client->mqtt_state.mqtt_connection.message.fragmented_msg_total_length == 0) {
if (!mqtt_enqueue(client)) {
return -1;
}
} else {
int first_fragment = client->mqtt_state.outbound_message->length - client->mqtt_state.outbound_message->fragmented_msg_data_offset;
if (!mqtt_enqueue_oversized(client, ((uint8_t *)data) + first_fragment, len - first_fragment)) {
return -1;
}
client->mqtt_state.outbound_message->fragmented_msg_total_length = 0;
}
}
return pending_msg_id;
}
int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain)
{
if (!client) {
ESP_LOGE(TAG, "Client was not initialized");
return -1;
}
MQTT_API_LOCK(client);
#if MQTT_SKIP_PUBLISH_IF_DISCONNECTED
if (client->state != MQTT_STATE_CONNECTED) {
ESP_LOGI(TAG, "Publishing skipped: client is not connected");
MQTT_API_UNLOCK(client);
return -1;
}
#endif
int pending_msg_id = mqtt_client_enqueue_priv(client, topic, data, len, qos, retain, false);
if (pending_msg_id < 0) {
MQTT_API_UNLOCK(client);
return -1;
}
int ret = 0;
/* Skip sending if not connected (rely on resending) */
if (client->state != MQTT_STATE_CONNECTED) {
ESP_LOGD(TAG, "Publish: client is not connected");
if (qos > 0) {
ret = pending_msg_id;
}
// delete long pending messages
mqtt_delete_expired_messages(client);
goto cannot_publish;
}
/* Provide support for sending fragmented message if it doesn't fit buffer */
int remaining_len = len;
const char *current_data = data;
bool sending = true;
while (sending) {
if (mqtt_write_data(client) != ESP_OK) {
esp_mqtt_abort_connection(client);
ret = -1;
goto cannot_publish;
}
int data_sent = client->mqtt_state.outbound_message->length - client->mqtt_state.outbound_message->fragmented_msg_data_offset;
client->mqtt_state.outbound_message->fragmented_msg_data_offset = 0;
client->mqtt_state.outbound_message->fragmented_msg_total_length = 0;
remaining_len -= data_sent;
current_data += data_sent;
if (remaining_len > 0) {
mqtt_connection_t *connection = &client->mqtt_state.mqtt_connection;
ESP_LOGD(TAG, "Sending fragmented message, remains to send %d bytes of %d", remaining_len, len);
if (remaining_len > connection->buffer_length) {
// Continue with sending
memcpy(connection->buffer, current_data, connection->buffer_length);
connection->message.length = connection->buffer_length;
sending = true;
} else {
memcpy(connection->buffer, current_data, remaining_len);
connection->message.length = remaining_len;
sending = true;
}
connection->message.data = connection->buffer;
client->mqtt_state.outbound_message = &connection->message;
} else {
// Message was sent correctly
sending = false;
}
}
if (qos > 0) {
//Tick is set after transmit to avoid retransmitting too early due slow network speed / big messages
outbox_set_tick(client->outbox, pending_msg_id, platform_tick_get_ms());
outbox_set_pending(client->outbox, pending_msg_id, TRANSMITTED);
}
MQTT_API_UNLOCK(client);
return pending_msg_id;
cannot_publish:
// clear out possible fragmented publish if failed or skipped
client->mqtt_state.outbound_message->fragmented_msg_total_length = 0;
if (qos == 0) {
ESP_LOGW(TAG, "Publish: Losing qos0 data when client not connected");
}
MQTT_API_UNLOCK(client);
return ret;
}
int esp_mqtt_client_enqueue(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain, bool store)
{
if (!client) {
ESP_LOGE(TAG, "Client was not initialized");
return -1;
}
MQTT_API_LOCK(client);
int ret = mqtt_client_enqueue_priv(client, topic, data, len, qos, retain, store);
MQTT_API_UNLOCK(client);
if (ret == 0 && store == false) {
// messages with qos=0 are not enqueued if not overridden by store_in_outobx -> indicate as error
return -1;
}
return ret;
}
esp_err_t esp_mqtt_client_register_event(esp_mqtt_client_handle_t client, esp_mqtt_event_id_t event, esp_event_handler_t event_handler, void *event_handler_arg)
{
if (client == NULL) {
return ESP_ERR_INVALID_ARG;
}
#ifdef MQTT_SUPPORTED_FEATURE_EVENT_LOOP
if (client->config->event_handle) {
ESP_LOGW(TAG, "Registering event loop while event callback is not null, clearing callback");
client->config->event_handle = NULL;
}
return esp_event_handler_register_with(client->config->event_loop_handle, MQTT_EVENTS, event, event_handler, event_handler_arg);
#else
ESP_LOGE(TAG, "Registering event handler while event loop not available in IDF version %s", IDF_VER);
return ESP_FAIL;
#endif
}
static void esp_mqtt_client_dispatch_transport_error(esp_mqtt_client_handle_t client)
{
client->event.event_id = MQTT_EVENT_ERROR;
client->event.error_handle->error_type = MQTT_ERROR_TYPE_TCP_TRANSPORT;
client->event.error_handle->connect_return_code = 0;
#ifdef MQTT_SUPPORTED_FEATURE_TRANSPORT_ERR_REPORTING
client->event.error_handle->esp_tls_last_esp_err = esp_tls_get_and_clear_last_error(esp_transport_get_error_handle(client->transport),
&client->event.error_handle->esp_tls_stack_err,
&client->event.error_handle->esp_tls_cert_verify_flags);
#ifdef MQTT_SUPPORTED_FEATURE_TRANSPORT_SOCK_ERRNO_REPORTING
client->event.error_handle->esp_transport_sock_errno = esp_transport_get_errno(client->transport);
#endif
#endif
esp_mqtt_dispatch_event_with_msgid(client);
}
int esp_mqtt_client_get_outbox_size(esp_mqtt_client_handle_t client)
{
int outbox_size = 0;
if (client == NULL) {
return 0;
}
MQTT_API_LOCK(client);
if (client->outbox) {
outbox_size = outbox_get_size(client->outbox);
}
MQTT_API_UNLOCK(client);
return outbox_size;
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/sharbang127/esp-mqtt.git
[email protected]:sharbang127/esp-mqtt.git
sharbang127
esp-mqtt
esp-mqtt
master

搜索帮助