2 Star 0 Fork 0

于佳耕/AIoTDemo

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
process_test.c 9.15 KB
一键复制 编辑 原始数据 按行查看 历史
zq 提交于 2021-09-15 17:42 . 0915
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <net/if.h>
#include <arpa/inet.h>
#include <sys/select.h>
#include <pthread.h>
#include <fcntl.h>
#include "ucom_process.h"
#include "common.h"
#define TERM_X_PORT 57000
#define TERM_Y_PORT 57001
#define SERVER_PORT 8888
lockFreeQueue_t cntocm_queue[4]; //定义连接到命令的消息队列
lockFreeQueue_t ser_queue[3]; //定义命令到业务的消息队列,ser_queue[0]是发往softap的队列
lockFreeQueue_t cmtocn_queue; //定义命令到连接的消息队列
#define SIZE 8
//#define PRINT
typedef struct cmd_to_async
{
void (* method)(struct cmd_to_async *);
uint16_t udp_port;
ucom_async_id_t async_id;
ucom_info_to_async_t info_to_async;
} cmd_to_async_t;
typedef struct cmd
{
struct cmd *next;
cmd_to_async_t to_async;
} cmd_t;
cmd_t *cmd_list = NULL;
ucom_async_id_t global_async_id = 1;
int server_fd = -1;
static int sdk_ev_set_io_nonblock(int fd)
{
int flags = 0;
if ((flags=fcntl(fd, F_GETFL, 0)) < 0) {
return (-1);
} else if (fcntl(fd, F_SETFL, flags | O_NONBLOCK)) {
return (-1);
}
return (0);
}
static int udp_create(uint16_t udp_port)
{
int fd, ret;
struct sockaddr_in addr;
int opt;
/*create udp sock*/
fd = socket(AF_INET, SOCK_DGRAM, 0);
if (fd < 0)
{
printf("socket fail %d\n", errno);
return (-1);
}
opt = 1;
ret = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (void *)&opt, sizeof(opt));
if (ret < 0)
{
printf("SO_REUSEADDR fail %d\n", errno);
goto fail;
}
/*bind*/
memset(&addr, 0, sizeof(addr));
addr.sin_addr.s_addr = htonl(INADDR_ANY);
addr.sin_port = htons(udp_port);
addr.sin_family = AF_INET;
ret = bind(fd, (struct sockaddr *)&addr, sizeof(addr));
if (ret < 0)
{
printf("bind fail %d\n", errno);
goto fail;
}
return (fd);
fail:
close(fd);
return (-1);
}
static void connect_process(uint16_t port)
{
uint8_t buf[BUFF_LEN];
fd_set read_fd_set;
int ret;
int max_fd;
struct timeval timeout;
command_msg_t *in_msg;
socklen_t from_len;
struct sockaddr_in from ;
uint8_t cmd;
uint32_t size;
int i ;
from_len = sizeof(from);
printf("enter %s\n", __func__);
/*create udp sock*/
server_fd = udp_create(port);
if (server_fd < 0)
{
printf("socket fail %d\n", errno);
return ;
}
sdk_ev_set_io_nonblock(server_fd);
max_fd = server_fd;
max_fd += 1;
memset(&in_msg,0,sizeof(in_msg));
for (;;)
{
FD_ZERO(&read_fd_set);
FD_SET(server_fd, &read_fd_set);
timeout.tv_sec = 20;
timeout.tv_usec = 0;
ret = select(max_fd, &read_fd_set, NULL, NULL, &timeout);
if (ret < 0)
{
printf("select error %d\n", errno);
break;
}
if (ret == 0)
{
printf("select timeout\n");
continue;
}
//receive
if (FD_ISSET(server_fd, &read_fd_set))
{
ret = recvfrom(server_fd, buf, BUFF_LEN, 0, (struct sockaddr *)&from, &from_len);
if (ret < 0)
printf("receive err\n");
else if (ret == 0)
printf("receive null\n");
else
{
cmd = ((command_msg_t*)buf)->command;
size = ((command_msg_t*)buf)->size;
printf("connect receive cmd %d size %d\n",cmd,size);
in_msg = (command_msg_t *)shared_memory_malloc(&(cntocm_queue[cmd].mem));
if(in_msg == NULL)
{
printf("alloc err\n");
return;
}
in_msg->command = cmd;
in_msg->size = size;
in_msg->data = (uint8_t*)in_msg + sizeof(command_msg_t);
memcpy(in_msg->data, buf+sizeof(command_msg_t),size);
#ifdef PRINT
for(i = 0 ; i < size ;i++)
printf("data %d\t",(in_msg->data)[i]);
printf("\n");
#endif
if(!push_lock_free_queue(&(cntocm_queue[cmd]),in_msg))
printf("connect push command data cg\n");
}
}
//...
}
}
static void command_process(void )
{
uint8_t *buf;
command_msg_t *out_msg,*in_msg;
uint8_t i,j;
printf("enter %s\n", __func__);
while (1)
{
//pop queue
for( i = 0 ; i < 4; i++)
{
if(out_msg = (command_msg_t *)pop_lock_free_queue(&(cntocm_queue[i])))
{
printf("command pop cmd %d size %d\n",out_msg->command,out_msg->size);
buf = (uint8_t*)out_msg +sizeof(command_msg_t) ;
#ifdef PRINT
for(j = 0 ; j < out_msg->size ;j++)
{
printf("out data %d\t",buf[j]);
}
printf("\n");
#endif
//'a' indicate soft ap command
if((out_msg->data)[0] == 'a')
{
in_msg = (command_msg_t *)shared_memory_malloc(&(ser_queue[0].mem));
if(in_msg == NULL)
{
printf("alloc err\n");
return;
}
in_msg->command = out_msg->command;
in_msg->size = out_msg->size;
in_msg->data = (uint8_t*)in_msg + sizeof(command_msg_t);
memcpy(in_msg->data, buf,out_msg->size);
#ifdef PRINT
for(j = 0 ; j < out_msg->size ;j++)
{
printf("in data %d\t",in_msg->data[j]);
}
printf("\n");
#endif
if(!push_lock_free_queue(&(ser_queue[0]),in_msg))
printf("command push softap data cg\n");
}
}
}
}
}
static int softap_service_process(void)
{
ucom_async_id_t async_id;
ucom_info_to_async_t info_to_async;
memset(&info_to_async, 0, sizeof(info_to_async));
info_to_async.cb = NULL;
printf("enter %s\n", __func__);
//协程初始化
ucom_process_init(NULL);
for (;;)
{
UCOM_PROCESS_CALL(my_example, &async_id, &info_to_async );
}
ucom_process_deinit();
return (0);
}
int main(int argc, char * argv[])
{
#if 1
int ret, i;
pthread_t thread_cd,thread_ct, thread_process;
for( i = 0 ; i < 4 ;i++)
init_lock_free_queue(&cntocm_queue[i], sizeof(command_msg_t)+SIZE);
for( i = 0 ; i < 3 ;i++)
init_lock_free_queue(&ser_queue[i], sizeof(command_msg_t)+SIZE);
init_lock_free_queue(&cmtocn_queue, sizeof(command_msg_t)+SIZE);
ret = pthread_create(&thread_ct, NULL, (void * (*)(void *))connect_process, (void *)SERVER_PORT);
if (ret != 0)
return (-1);
ret = pthread_create(&thread_cd, NULL, (void * (*)(void *))command_process, NULL);
if (ret != 0)
return (-1);
sleep(1);
ret = pthread_create(&thread_process, NULL, (void * (*)(void *))softap_service_process, NULL);
if (ret != 0)
return (-1);
while (1)
sleep(1);
#endif
return (0);
}
ucom_process_error_t ucom_process_softap_cmd(ucom_async_id_t *async_id, ucom_info_to_async_t *info_to_async )
{
uint8_t *buf;
command_msg_t *out_msg;
int j ;
uint8_t cmd;
if(out_msg = (command_msg_t *)pop_lock_free_queue(&(ser_queue[0])))
{
printf("softap pop cmd %d size %d\n",out_msg->command,out_msg->size);
buf = (uint8_t*)out_msg +sizeof(command_msg_t) ;
#ifdef PRINT
for(j = 0 ; j < out_msg->size ;j++)
{
printf("ap out data %d\t",buf[j]);
}
printf("\n");
#endif
cmd = buf[1];
switch(cmd)
{
case COM_GET_SESSION:
printf("softap get session\n");
break;
case COM_GET_PARA:
printf("softap get para \n");
break;
case COM_SET_PARA:
printf("softap get para \n");
break;
}
printf("\n\n");
}
}
static void send_cmd_to_term_internal(cmd_to_async_t *to_async)
{
}
static ucom_process_error_t send_cmd_to_term(
ucom_async_id_t *async_id, ucom_info_to_async_t *info_to_async, uint16_t udp_port)
{
return (0);
}
ucom_process_error_t send_cmd_0_to_term_x(ucom_async_id_t *async_id, ucom_info_to_async_t *info_to_async)
{
printf("enter %s\n", __func__);
return send_cmd_to_term(async_id, info_to_async, TERM_X_PORT);
}
ucom_process_error_t send_cmd_1_to_term_x(ucom_async_id_t *async_id, ucom_info_to_async_t *info_to_async)
{
printf("enter %s\n", __func__);
return send_cmd_to_term(async_id, info_to_async, TERM_X_PORT);
}
ucom_process_error_t send_cmd_1_to_term_y(ucom_async_id_t *async_id, ucom_info_to_async_t *info_to_async)
{
printf("enter %s\n", __func__);
return send_cmd_to_term(async_id, info_to_async, TERM_Y_PORT);
}
void cancel_cmd_to_term_x(ucom_async_id_t async_id)
{
printf("enter %s\n", __func__);
}
void cancel_cmd_to_term_y(ucom_async_id_t async_id)
{
printf("enter %s\n", __func__);
}
void build_cmd_0_to_term_x(ucom_async_data_t *async_data)
{
printf("enter %s\n", __func__);
}
void build_cmd_1_to_term_x(ucom_async_data_t *async_data)
{
printf("enter %s\n", __func__);
}
void build_cmd_1_to_term_y(ucom_async_data_t *async_data)
{
async_data->data = calloc(1, 16);
if (async_data->data)
{
strcpy(async_data->data, "cmd_1_to_term_y");
async_data->free = free;
}
else
{
async_data->free = NULL;
}
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/yu_jia_geng/aiotdemo.git
[email protected]:yu_jia_geng/aiotdemo.git
yu_jia_geng
aiotdemo
AIoTDemo
master

搜索帮助