代码拉取完成,页面将自动刷新
同步操作将从 src-openEuler/gazelle 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
From 814b66143605ad409be0f8aace468386f4fd891e Mon Sep 17 00:00:00 2001
From: wu-changsheng <[email protected]>
Date: Mon, 5 Sep 2022 15:39:12 +0800
Subject: [PATCH] sync examples code
---
examples/CMakeLists.txt | 2 +-
examples/README.md | 178 +++++++++++-
examples/inc/bussiness.h | 122 +++++++++
examples/inc/client.h | 121 ++++++++
examples/inc/parameter.h | 11 +-
examples/inc/server.h | 231 ++++++++++++++++
examples/inc/utilities.h | 82 +++++-
examples/main.c | 8 +
examples/src/bussiness.c | 234 ++++++++++++++++
examples/src/client.c | 387 ++++++++++++++++++++++++++
examples/src/parameter.c | 214 ++++++++-------
examples/src/server.c | 578 +++++++++++++++++++++++++++++++++++++++
examples/src/utilities.c | 128 +++++++++
13 files changed, 2185 insertions(+), 111 deletions(-)
create mode 100644 examples/inc/bussiness.h
create mode 100644 examples/inc/client.h
create mode 100644 examples/inc/server.h
create mode 100644 examples/src/bussiness.c
create mode 100644 examples/src/client.c
create mode 100644 examples/src/server.c
create mode 100644 examples/src/utilities.c
diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt
index b1c2b07..c38e6cb 100644
--- a/examples/CMakeLists.txt
+++ b/examples/CMakeLists.txt
@@ -18,7 +18,7 @@ project(${PROJECT_NAME})
message(STATUS "PROJECT_SOURCE_DIR: " ${PROJECT_SOURCE_DIR})
message(STATUS "PROJECT_BINARY_DIR: " ${PROJECT_BINARY_DIR})
-set(CMAKE_C_FLAGS "-O2 -g -fstack-protector-strong -Wall -Werror -pthread")
+set(CMAKE_C_FLAGS "-O2 -g -fstack-protector-strong -Wall -Werror -pthread -lboundscheck")
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -D EXAMPLE_COMPILE")
include_directories(${PROJECT_SOURCE_DIR}/inc)
diff --git a/examples/README.md b/examples/README.md
index 6f82bb2..4e165a4 100644
--- a/examples/README.md
+++ b/examples/README.md
@@ -1,4 +1,6 @@
-# gazzle 示例程序
+# gazzlle 示例程序
+
+## 功能
* 支持 TCP 、 unix 非阻塞通讯。
* 支持多线程网络 IO 复用模型,线程之间相互独立。TCP 的 `listen` 、`epoll` 、`read` 、`write` 、`connect` 等接口都在同一线程内。`connect` 连接数可配。
@@ -9,9 +11,93 @@
## 网络模型
* **单线程非阻塞**:采用同步非阻塞 IO 模型,在单线程中采用非阻塞的方式监听并发起 IO 请求,当内核中数据到达后读取数据、执行业务逻辑并发送。
+
+```
+ 单线程非阻塞模型
+ |
+ 创建套接字并监听
+ | <-------+
+ 读取数据 |
+ | |
+ 业务逻辑 |
+ | |
+ 发送数据 |
+ | |
+ +---------+
+```
+
* **多线程非阻塞IO复用**:基于 `epoll` 实现多线程非阻塞 IO 模型。每个线程之间互不干涉。通过 `epoll` 监控多个当前线程负责的 fd ,当任何一个数据状态准备就绪时,返回并执行读写操作和对应的业务逻辑。
+
+```
+ 多线程非阻塞IO复用模型
+ |
+ 创建套接字并监听
+ |
+ 创建若干个线程
+ |
+ +------------+------------+------------+------------+
+ | | | | |
+ 创建套接字并监听 ... ... 创建套接字并监听 ...
+ | |
+ 线程内部初始化 线程内部初始化
+ epoll,并注册 epoll,并注册
+ 套接字监听事件 套接字监听事件
+ | <---------+ | <----------+
+ +-----+-----+ | +-----+-----+ |
+ | | | | | |
+ (新连接) (新报文) | (新连接) (新报文) |
+ 建连并注 读取数据 | 建连并注 读取数据 |
+ 册新连接 业务逻辑 | 册新连接 业务逻辑 |
+ 监听事件 发送数据 | 监听事件 发送数据 |
+ | | | | | |
+ +-----+-----+ | +-----+-----+ |
+ | | | |
+ +-----------+ +-----------+
+```
+
* **多线程非阻塞非对称**:采用基于 `epoll` 的单线程多路 IO 复用监听连接事件,并采用多线程的方式完成后续读写监听业务。 server 在启动监听之前,开辟一定数量的线程,用线程池管理。主线程创建监听 `fd` 之后,采用多路 IO 复用机制 (`epoll`) 进行 IO 状态监控。当监听到客户端的连接请求时,建立连接并将相关 `fd` 分发给线程池的某个线程进行监听。线程池中的每个线程都采用多路 IO 复用机制 (`epoll`) ,用来监听主线程中建立成功并分发下来的 `socket` 。
+```
+ 多线程非阻塞非对称模型 +------------------------+
+ | | |
+ 创建监听线程 | +-------------+--- ... -----+
+ | | | | |
+ 创建套接字,初始化 | 初始化epoll ... ... 初始化epoll
+ eopll并且并注册套 | 并注册事件 并注册事件
+ 接字监听事件 | | <-- + | <-- +
+ | | 读取数据 | 读取数据 |
+ 当有新连接时,创建工作线程 | 业务逻辑 | 业务逻辑 |
+ | | 发送数据 | 发送数据 |
+ +----------------+ | | | |
+ +-----+ +-----+
+```
+
+* **客户端**:创建若干线程,每个线程创建若干 `socket` 与客户端建立连接,并使用 `epoll` 进行状态监控,建连后向服务端发送数据并等待服务端数据传回,当接受到服务端传回数据后进行校验,校验无误再次发送数据。
+
+```
+ 多线程非阻塞IO复用模型
+ |
+ 创建若干个线程
+ +------------+------------+------------+------------+
+ | | | | |
+ 线程内部初始化 线程内部初始化
+ epoll ... ... epoll ...
+ | |
+ 依次创建套接字, 依次创建套接字,
+ 建连并注册事件 建连并注册事件
+ | <---------+ | <---------+
+ 发送数据 | 发送数据 |
+ 接收数据并校验 | 接收数据并校验 |
+ | | | |
+ +------------+ | +------------+ |
+ | | | | | |
+ 成功 失败 | 成功 失败 |
+ | | | | | |
+ 发送数据 终止 | 发送数据 终止 |
+ | | | |
+ +-----------------+ +-----------------+
+```
+
## 程序接口
* `-a, --as [server | client]`:作为服务端还是客户端。
@@ -24,21 +110,103 @@
* `mud (multi thread, unblock, dissymmetric)`:多线程非阻塞非对称。
* `-t, --threadnum`:线程数设置。
* `-c, --connectnum`:连接数设置。
-* `-A, --api [unix | posix]`:内部实现的接口类型。
- * `unix`:基于 unix 接口实现。
- * `posix`:基于 posix 接口实现。
+* `-D, --domain [unix | posix]`:通信协议。
+ * `unix`:基于 unix 协议实现。
+ * `posix`:基于 posix 协议实现。
+* `-A, --api [readwrite | recvsend | recvsendmsg]`:内部实现的接口类型。
+ * `readwrite` :使用 `read` 和 `write` 接口。
+ * `recvsend` :使用 `recv` 和 `send` 接口。
+ * `recvsendmsg` :使用 `recvmsg` 和 `sendmsg` 接口。
* `-P, --pktlen [xxxx]`:报文长度配置。
* `-v, --verify`:是否校验报文。
* `-r, --ringpmd`:是否基于dpdk ring PMD 收发环回。
+* `-d, --debug`:是否打印调试信息。
* `-h, --help`:获得帮助信息。
## 使用
+ * **环境配置**
+ * 参考 https://gitee.com/openeuler/libboundscheck 。
+
+ * **编译**
+
```
cd build
mkdir examples
cd examples
cmake ../../examples
make
-./examples --help
```
+
+ * **查看帮助信息**
+
+ ```
+ ./examples --help
+
+ -a, --as [server | client]: set programas server or client.
+ server: as server.
+ client: as client.
+-i, --ip [???.???.???.???]: set ip address.
+-p, --port [????]: set port number in range of 1024 - 65535.
+-m, --model [mum | mud]: set the network model.
+ mum: multi thread, unblock, multiplexing IO network model.
+ mud: multi thread, unblock, dissymmetric network model.
+-t, --threadnum [???]: set thread number in range of 1 - 1000.
+-c, --connectnum [???]: set connection number of each thread.
+-D, --domain [unix | posix]: set domain type is server or client.
+ unix: use unix's api.
+ posix: use posix api.
+-A, --api [readwrite | recvsend | recvsendmsg]: set api type is server or client.
+ readwrite: use `read` and `write`.
+ recvsend: use `recv and `send`.
+ recvsendmsg: use `recvmsg` and `sendmsg`.
+-P, --pktlen [????]: set packet length in range of 2 - 10485760.
+-v, --verify: set to verifying the message packet.
+-r, --ringpmd: set to use ringpmd.
+-d, --debug: set to print the debug information.
+-h, --help: see helps.
+ ```
+
+ * 创建服务端
+
+```
+./example --as server --verify
+
+[program parameters]:
+--> [as]: server
+--> [server ip]: 127.0.0.1
+--> [server port]: 5050
+--> [model]: mum
+--> [thread number]: 8
+--> [domain]: posix
+--> [api]: read & write
+--> [packet length]: 1024
+--> [verify]: on
+--> [ringpmd]: off
+--> [debug]: off
+
+[program informations]:
+--> <server>: [connect num]: 0, [receive]: 0.000 B/s
+```
+
+ * 创建客户端
+
+```
+./example --as client --verify
+
+[program parameters]:
+--> [as]: client
+--> [server ip]: 127.0.0.1
+--> [server port]: 5050
+--> [thread number]: 8
+--> [connection number]: 10
+--> [domain]: posix
+--> [api]: read & write
+--> [packet length]: 1024
+--> [verify]: on
+--> [ringpmd]: off
+--> [debug]: off
+
+[program informations]:
+--> <client>: [connect num]: 80, [send]: 357.959 MB/s
+```
\ No newline at end of file
diff --git a/examples/inc/bussiness.h b/examples/inc/bussiness.h
new file mode 100644
index 0000000..f16d771
--- /dev/null
+++ b/examples/inc/bussiness.h
@@ -0,0 +1,122 @@
+/*
+* Copyright (c) 2022-2023. yyangoO.
+* gazelle is licensed under the Mulan PSL v2.
+* You can use this software according to the terms and conditions of the Mulan PSL v2.
+* You may obtain a copy of Mulan PSL v2 at:
+* http://license.coscl.org.cn/MulanPSL2
+* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
+* IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
+* PURPOSE.
+* See the Mulan PSL v2 for more details.
+*/
+
+
+#ifndef __EXAMPLES_BUSSINESS_H__
+#define __EXAMPLES_BUSSINESS_H__
+
+
+#include "utilities.h"
+#include "parameter.h"
+
+
+#define BUSSINESS_MESSAGE_SIZE 26 ///< the size of business message
+
+
+/**
+ * @brief server handler
+ * The server handler.
+ */
+struct ServerHandler
+{
+ int32_t fd; ///< socket file descriptor
+};
+
+/**
+ * @brief client handler
+ * The client handler.
+ */
+struct ClientHandler
+{
+ int32_t fd; ///< socket file descriptor
+ uint32_t msg_idx; ///< the start charactors index of message
+};
+
+
+/**
+ * @brief read by specify api
+ * This function processes the reading by specify api.
+ * @param fd the file descriptor
+ * @param buffer_in the input buffer
+ * @param length the length
+ * @param api the type of api
+ * @return the result
+ */
+ int32_t read_api(int32_t fd, char *buffer_in, const uint32_t length, const char *api);
+
+/**
+ * @brief write by specify api
+ * This function processes the writing by specify api.
+ * @param fd the file descriptor
+ * @param buffer_out the output buffer
+ * @param length the length
+ * @param api the type of api
+ * @return the result
+ */
+ int32_t write_api(int32_t fd, char *buffer_out, const uint32_t length, const char *api);
+
+/**
+ * @brief the business processsing of server
+ * This function processes the business of server.
+ * @param out the output string
+ * @param in the input string
+ * @param size the size of input and output
+ * @param verify if we verify or not
+ * @return the result
+ */
+void server_bussiness(char *out, const char *in, uint32_t size);
+
+/**
+ * @brief the business processsing of client
+ * This function processes the business of client.
+ * @param out the output string
+ * @param in the input string
+ * @param size the size of input and output
+ * @param verify if we verify or not
+ * @param msg_idx the start charactors index of message
+ * @return the result
+ */
+int32_t client_bussiness(char *out, const char *in, uint32_t size, bool verify, uint32_t *msg_idx);
+
+/**
+ * @brief server checks the information and answers
+ * This function checks the information and answers.
+ * @param server_handler server handler
+ * @param pktlen the length of package
+ * @param api the api
+ * @return the result
+ */
+int32_t server_ans(struct ServerHandler *server_handler, uint32_t pktlen, const char* api);
+
+/**
+ * @brief client asks server
+ * This function asks server.
+ * @param client_handler client handler
+ * @param pktlen the length of package
+ * @param api the api
+ * @return the result
+ */
+int32_t client_ask(struct ClientHandler *client_handler, uint32_t pktlen, const char* api);
+
+/**
+ * @brief client checks the information and answers
+ * This function checks the information and answers.
+ * @param client_handler client handler
+ * @param pktlen the length of package
+ * @param verify verify or not
+ * @param api the api
+ * @return the result
+ */
+int32_t client_chkans(struct ClientHandler *client_handler, uint32_t pktlen, bool verify, const char* api);
+
+
+#endif // __EXAMPLES_BUSSINESS_H__
diff --git a/examples/inc/client.h b/examples/inc/client.h
new file mode 100644
index 0000000..d3ae017
--- /dev/null
+++ b/examples/inc/client.h
@@ -0,0 +1,121 @@
+/*
+* Copyright (c) 2022-2023. yyangoO.
+* gazelle is licensed under the Mulan PSL v2.
+* You can use this software according to the terms and conditions of the Mulan PSL v2.
+* You may obtain a copy of Mulan PSL v2 at:
+* http://license.coscl.org.cn/MulanPSL2
+* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
+* IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
+* PURPOSE.
+* See the Mulan PSL v2 for more details.
+*/
+
+
+#ifndef __EXAMPLES_CLIENT_H__
+#define __EXAMPLES_CLIENT_H__
+
+
+#include "utilities.h"
+#include "parameter.h"
+#include "bussiness.h"
+
+
+/**
+ * @brief client unit
+ * The information of each thread of client.
+ */
+struct ClientUnit
+{
+ struct ClientHandler *handlers; ///< the handlers
+ int32_t epfd; ///< the connect epoll file descriptor
+ struct epoll_event *epevs; ///< the epoll events
+ uint32_t curr_connect; ///< current connection number
+ uint64_t send_bytes; ///< total send bytes
+ in_addr_t ip; ///< server ip
+ uint16_t port; ///< server port
+ uint32_t connect_num; ///< total connection number
+ uint32_t pktlen; ///< the length of peckage
+ bool verify; ///< if we verify or not
+ char* domain; ///< the communication domain
+ char* api; ///< the type of api
+ bool debug; ///< if we print the debug information
+ struct ClientUnit *next; ///< next pointer
+};
+
+/**
+ * @brief client
+ * The information of client.
+ */
+struct Client
+{
+ struct ClientUnit *uints; ///< the server mum unit
+ bool debug; ///< if we print the debug information
+};
+
+
+/**
+ * @brief the single thread, client prints informations
+ * The single thread, client prints informations.
+ * @param ch_str the charactor string
+ * @param act_str the action string
+ * @param ip the ip address
+ * @param port the port
+ * @param debug if debug or not
+ * @return the result pointer
+ */
+void client_debug_print(const char *ch_str, const char *act_str, in_addr_t ip, uint16_t port, bool debug);
+
+/**
+ * @brief the client prints informations
+ * The client prints informations.
+ * @param client the client information
+ */
+void client_info_print(struct Client *client);
+
+/**
+ * @brief the single thread, client try to connect to server, register to epoll
+ * The single thread, client try to connect to server, register to epoll.
+ * @param client_handler the client handler
+ * @param epoll_fd the epoll file descriptor
+ * @param ip ip address
+ * @param port port
+ * @param domain domain
+ * @return the result pointer
+ */
+int32_t client_thread_try_connect(struct ClientHandler *client_handler, int32_t epoll_fd, in_addr_t ip, uint16_t port, const char *api);
+
+/**
+ * @brief the single thread, client retry to connect to server, register to epoll
+ * The single thread, client retry to connect to server, register to epoll.
+ * @param client_unit the client unit
+ * @param client_handler the client handler
+ * @return the result pointer
+ */
+int32_t client_thread_retry_connect(struct ClientUnit *client_unit, struct ClientHandler *client_handler);
+
+/**
+ * @brief the single thread, client connects and gets epoll feature descriptors
+ * The single thread, client connects and gets epoll feature descriptors.
+ * @param client_unit the client unit
+ * @return the result pointer
+ */
+int32_t client_thread_create_epfd_and_reg(struct ClientUnit *client_unit);
+
+/**
+ * @brief create client of single thread and run
+ * This function creates client of single thread and run.
+ * @param arg each thread's information of server
+ * @return the result pointer
+ */
+void *client_s_create_and_run(void *arg);
+
+/**
+ * @brief create client and run
+ * This function create the client and run.
+ * @param params the parameters pointer
+ * @return the result
+ */
+int32_t client_create_and_run(struct ProgramParams *params);
+
+
+#endif // __EXAMPLES_CLIENT_H__
diff --git a/examples/inc/parameter.h b/examples/inc/parameter.h
index d25a13a..ee8fe4e 100644
--- a/examples/inc/parameter.h
+++ b/examples/inc/parameter.h
@@ -24,9 +24,11 @@
#define PARAM_DEFAULT_MODEL ("mum") ///< default model type
#define PARAM_DEFAULT_CONNECT_NUM (10) ///< default connection number
#define PARAM_DEFAULT_THREAD_NUM (8) ///< default thread number
-#define PARAM_DEFAULT_API ("posix") ///< default API type
+#define PARAM_DEFAULT_DOMAIN ("posix") ///< default communication domain
+#define PARAM_DEFAULT_API ("readwrite") ///< default API type
#define PARAM_DEFAULT_PKTLEN (1024) ///< default packet length of message
#define PARAM_DEFAULT_VERIFY (false) ///< default flag of message verifying
+#define PARAM_DEFAULT_DEBUG (false) ///< default flag of debug
#define PARAM_DEFAULT_RINGPMD (false) ///< default flag of ring PMD of dpdk
enum {
@@ -42,6 +44,8 @@ enum {
PARAM_NUM_CONNECT_NUM = 'c',
#define PARAM_NAME_THREAD_NUM ("threadnum") ///< name of parameter thread number
PARAM_NUM_THREAD_NUM = 't',
+#define PARAM_NAME_DOMAIN ("domain") ///< name of parameter domain
+ PARAM_NUM_DOMAIN = 'D',
#define PARAM_NAME_API ("api") ///< name of parameter API type
PARAM_NUM_API = 'A',
#define PARAM_NAME_PKTLEN ("pktlen") ///< name of parameter packet length of message
@@ -50,6 +54,8 @@ enum {
PARAM_NUM_VERIFY = 'v',
#define PARAM_NAME_RINGPMD ("ringpmd") ///< name of parameter flag of ring PMD of dpdk
PARAM_NUM_RINGPMD = 'r',
+#define PARAM_NAME_DEBUG ("debug") ///< name of parameter flag of debug
+ PARAM_NUM_DEBUG = 'd',
#define PARAM_NAME_HELP ("help") ///< name of parameter help
PARAM_NUM_HELP = 'h',
};
@@ -81,13 +87,14 @@ struct ProgramParams {
char* model; ///< model type
uint32_t thread_num; ///< the number of threads
uint32_t connect_num; ///< the connection number
+ char* domain; ///< the communication dimain
char* api; ///< the type of api
uint32_t pktlen; ///< the packet length
bool verify; ///< if we verify the message or not
+ bool debug; ///< if we print the debug information or not
bool ringpmd; ///< if we use ring PMD or not
};
-
/**
* @brief initialize the parameters
* This function initializes the parameters of main function.
diff --git a/examples/inc/server.h b/examples/inc/server.h
new file mode 100644
index 0000000..fa9096b
--- /dev/null
+++ b/examples/inc/server.h
@@ -0,0 +1,231 @@
+/*
+* Copyright (c) 2022-2023. yyangoO.
+* gazelle is licensed under the Mulan PSL v2.
+* You can use this software according to the terms and conditions of the Mulan PSL v2.
+* You may obtain a copy of Mulan PSL v2 at:
+* http://license.coscl.org.cn/MulanPSL2
+* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
+* IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
+* PURPOSE.
+* See the Mulan PSL v2 for more details.
+*/
+
+
+#ifndef __EXAMPLES_SERVER_H__
+#define __EXAMPLES_SERVER_H__
+
+
+#include "utilities.h"
+#include "parameter.h"
+#include "bussiness.h"
+
+
+/**
+ * @brief server unit of model mum
+ * The information of each thread of server of model mum.
+ */
+struct ServerMumUnit
+{
+ struct ServerHandler listener; ///< the listen handler
+ int32_t epfd; ///< the listen epoll file descriptor
+ struct epoll_event *epevs; ///< the epoll events
+ uint32_t curr_connect; ///< current connection number
+ uint64_t recv_bytes; ///< total receive bytes
+ in_addr_t ip; ///< server ip
+ uint16_t port; ///< server port
+ uint32_t pktlen; ///< the length of peckage
+ char* domain; ///< communication domain
+ char* api; ///< the type of api
+ bool debug; ///< if we print the debug information
+ struct ServerMumUnit *next; ///< next pointer
+};
+
+/**
+ * @brief server model mum
+ * The information of server model mum.
+ */
+struct ServerMum
+{
+ struct ServerMumUnit *uints; ///< the server mum unit
+ bool debug; ///< if we print the debug information
+};
+
+/**
+ * @brief server unit of model mud worker unit
+ * The information of worker unit of server of model mud.
+ */
+struct ServerMudWorker
+{
+ struct ServerHandler worker; ///< the worker handler
+ int32_t epfd; ///< the worker epoll file descriptor
+ struct epoll_event *epevs; ///< the epoll events
+ uint64_t recv_bytes; ///< total receive bytes
+ uint32_t pktlen; ///< the length of peckage
+ in_addr_t ip; ///< client ip
+ uint16_t port; ///< client port
+ char* api; ///< the type of api
+ bool debug; ///< if we print the debug information
+ struct ServerMudWorker *next; ///< next pointer
+};
+
+/**
+ * @brief server model mud
+ * The information of server model mud.
+ */
+struct ServerMud
+{
+ struct ServerHandler listener; ///< the listen handler
+ struct ServerMudWorker *workers; ///< the workers
+ int32_t epfd; ///< the listen epoll file descriptor
+ struct epoll_event *epevs; ///< the epoll events
+ uint32_t curr_connect; ///< current connection number
+ in_addr_t ip; ///< server ip
+ uint16_t port; ///< server port
+ uint32_t pktlen; ///< the length of peckage
+ char* domain; ///< communication domain
+ char* api; ///< the type of api
+ bool debug; ///< if we print the debug information
+};
+
+
+/**
+ * @brief the worker thread, unblock, dissymmetric server prints debug informations
+ * The worker thread, unblock, dissymmetric server prints debug informations.
+ * @param ch_str the charactor string
+ * @param act_str the action string
+ * @param ip the ip address
+ * @param port the port
+ * @param debug if debug or not
+ * @return the result pointer
+ */
+void server_debug_print(const char *ch_str, const char *act_str, in_addr_t ip, uint16_t port, bool debug);
+
+/**
+ * @brief the multi thread, unblock, dissymmetric server prints informations
+ * The multi thread, unblock, dissymmetric server prints informations.
+ * @param server_mud the server information
+ */
+void sermud_info_print(struct ServerMud *server_mud);
+
+/**
+ * @brief the worker thread, unblock, dissymmetric server listens and gets epoll feature descriptors
+ * The worker thread, unblock, dissymmetric server listens and gets epoll feature descriptors.
+ * @param worker_unit the server worker
+ * @return the result pointer
+ */
+int32_t sermud_worker_create_epfd_and_reg(struct ServerMudWorker *worker_unit);
+
+/**
+ * @brief the listener thread, unblock, dissymmetric server listens and gets epoll feature descriptors
+ * The listener thread, unblock, dissymmetric server listens and gets epoll feature descriptors.
+ * @param server_mud the server unit
+ * @return the result pointer
+ */
+int32_t sermud_listener_create_epfd_and_reg(struct ServerMud *server_mud);
+
+/**
+ * @brief the listener thread, unblock, dissymmetric server accepts the connections
+ * The listener thread, unblock, dissymmetric server accepts the connections.
+ * @param server_mud the server unit
+ * @return the result pointer
+ */
+int32_t sermud_listener_accept_connects(struct ServerMud *server_mud);
+
+/**
+ * @brief the worker thread, unblock, dissymmetric server processes the events
+ * The worker thread, unblock, dissymmetric server processes the events.
+ * @param worker_unit the server worker
+ * @return the result pointer
+ */
+int32_t sermud_worker_proc_epevs(struct ServerMudWorker *worker_unit);
+
+/**
+ * @brief the listener thread, unblock, dissymmetric server processes the events
+ * The listener thread, unblock, dissymmetric server processes the events.
+ * @param server_mud the server unit
+ * @return the result pointer
+ */
+int32_t sermud_listener_proc_epevs(struct ServerMud *server_mud);
+
+/**
+ * @brief create the worker thread, unblock, dissymmetric server and run
+ * This function creates the worker thread, unblock, dissymmetric server and run.
+ * @param arg each thread's information of server
+ * @return the result pointer
+ */
+void *sermud_worker_create_and_run(void *arg);
+
+/**
+ * @brief create the listener thread, unblock, dissymmetric server and run
+ * This function creates the listener thread, unblock, dissymmetric server and run.
+ * @param arg each thread's information of server
+ * @return the result pointer
+ */
+void *sermud_listener_create_and_run(void *arg);
+
+/**
+ * @brief create the multi thread, unblock, dissymmetric server and run
+ * This function creates the multi thread, unblock, dissymmetric server and run.
+ * @param params the parameters pointer
+ * @return the result
+ */
+int32_t sermud_create_and_run(struct ProgramParams *params);
+
+/**
+ * @brief the multi thread, unblock, mutliplexing IO server prints informations
+ * The multi thread, unblock, mutliplexing IO server prints informations.
+ * @param server_mum the server information
+ */
+void sermum_info_print(struct ServerMum *server_mum);
+
+/**
+ * @brief the single thread, unblock, mutliplexing IO server listens and gets epoll feature descriptors
+ * The single thread, unblock, mutliplexing IO server listens and gets epoll feature descriptors.
+ * @param server_unit the server unit
+ * @return the result pointer
+ */
+int32_t sersum_create_epfd_and_reg(struct ServerMumUnit *server_unit);
+
+/**
+ * @brief the single thread, unblock, mutliplexing IO server accepts the connections
+ * The single thread, unblock, mutliplexing IO server accepts the connections.
+ * @param server_unit the server unit
+ * @param server_handler the server handler
+ * @return the result pointer
+ */
+int32_t sersum_accept_connects(struct ServerMumUnit *server_unit, struct ServerHandler *server_handler);
+
+/**
+ * @brief the single thread, unblock, mutliplexing IO server processes the events
+ * The single thread, unblock, mutliplexing IO server processes the events.
+ * @param server_unit the server unit
+ * @return the result pointer
+ */
+int32_t sersum_proc_epevs(struct ServerMumUnit *server_unit);
+
+/**
+ * @brief create the single thread, unblock, mutliplexing IO server
+ * This function creates the single thread, unblock, mutliplexing IO server.
+ * @param arg each thread's information of server
+ * @return the result pointer
+ */
+void *sersum_create_and_run(void *arg);
+
+/**
+ * @brief create the multi thread, unblock, mutliplexing IO server and run
+ * This function creates the multi thread, unblock, mutliplexing IO server and run.
+ * @param params the parameters pointer
+ * @return the result
+ */
+int32_t sermum_create_and_run(struct ProgramParams *params);
+
+/**
+ * @brief create server and run
+ * This function create the specify server and run.
+ * @param params the parameters pointer
+ * @return the result
+ */
+int32_t server_create_and_run(struct ProgramParams *params);
+
+
+#endif // __EXAMPLES_SERVER_H__
diff --git a/examples/inc/utilities.h b/examples/inc/utilities.h
index f9064c5..a684d35 100644
--- a/examples/inc/utilities.h
+++ b/examples/inc/utilities.h
@@ -24,16 +24,23 @@
#include <unistd.h>
#include <ctype.h>
+#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
+#include <sys/select.h>
+#include <sys/time.h>
+#include <sys/un.h>
#include <netinet/in.h>
#include <arpa/inet.h>
+#include "securec.h"
+#include "securectype.h"
+
#define PRINT_ERROR(format, ...) do \
{ \
@@ -59,19 +66,76 @@
printf(format, ##__VA_ARGS__); \
printf("\n"); \
} while(0)
+#define PRINT_SERVER_DATAFLOW(format, ...) do \
+ { \
+ printf("\033[?25l\033[A\033[K"); \
+ printf("--> <server>: "); \
+ printf(format, ##__VA_ARGS__); \
+ printf("\033[?25h\n"); \
+ } while(0)
+#define PRINT_CLIENT_DATAFLOW(format, ...) do \
+ { \
+ printf("\033[?25l\033[A\033[K"); \
+ printf("--> <client>: "); \
+ printf(format, ##__VA_ARGS__); \
+ printf("\033[?25h\n"); \
+ } while(0)
#define LIMIT_VAL_RANGE(val, min, max) ((val) < (min) ? (min) : ((val) > (max) ? (max) : (val)))
#define CHECK_VAL_RANGE(val, min, max) ((val) < (min) ? (false) : ((val) > (max) ? (false) : (true)))
-#define PROGRAM_OK (0) ///< program ok flag
-#define PROGRAM_ABORT (1) ///< program abort flag
-#define PROGRAM_FAULT (-1) ///< program fault flag
+#define PROGRAM_OK (0) ///< program ok flag
+#define PROGRAM_ABORT (1) ///< program abort flag
+#define PROGRAM_FAULT (-1) ///< program fault flag
+#define PROGRAM_INPROGRESS (-2) ///< program in progress flag
+
+#define UNIX_TCP_PORT_MIN (1024) ///< TCP minimum port number in unix
+#define UNIX_TCP_PORT_MAX (65535) ///< TCP minimum port number in unix
+#define THREAD_NUM_MIN (1) ///< minimum number of thead
+#define THREAD_NUM_MAX (1000) ///< maximum number of thead
+#define MESSAGE_PKTLEN_MIN (2) ///< minimum length of message (1 byte)
+#define MESSAGE_PKTLEN_MAX (1024 * 1024 * 10) ///< maximum length of message (10 Mb)
+
+#define SERVER_SOCKET_LISTEN_BACKLOG (128) ///< the queue of socket
+#define SERVER_EPOLL_SIZE_MAX (10000) ///< the max wait event of epoll
+#define SERVER_EPOLL_WAIT_TIMEOUT (-1) ///< the timeout value of epoll
+
+#define CLIENT_EPOLL_SIZE_MAX (10000) ///< the max wait event of epoll
+#define CLIENT_EPOLL_WAIT_TIMEOUT (-1) ///< the timeout value of epoll
+
+#define TERMINAL_REFRESH_MS (100) ///< the time cut off between of terminal refresh
+
+#define SOCKET_UNIX_DOMAIN_FILE "unix_domain_file" ///< socket unix domain file
+
+
+/**
+ * @brief create the socket and listen
+ * Thi function creates the socket and listen.
+ * @param socket_fd the socket file descriptor
+ * @param ip ip address
+ * @param port port number
+ * @param domain domain
+ * @return the result
+ */
+int32_t create_socket_and_listen(int32_t *socket_fd, in_addr_t ip, uint16_t port, const char *domain);
+
+/**
+ * @brief create the socket and connect
+ * Thi function creates the socket and connect.
+ * @param socket_fd the socket file descriptor
+ * @param ip ip address
+ * @param port port number
+ * @param domain domain
+ * @return the result
+ */
+int32_t create_socket_and_connect(int32_t *socket_fd, in_addr_t ip, uint16_t port, const char *domain);
-#define UNIX_TCP_PORT_MIN (1024) ///< TCP minimum port number in unix
-#define UNIX_TCP_PORT_MAX (65535) ///< TCP minimum port number in unix
-#define THREAD_NUM_MIN (1) ///< minimum number of thead
-#define THREAD_NUM_MAX (1000) ///< maximum number of thead
-#define MESSAGE_PKTLEN_MIN (1) ///< minimum length of message (1 byte)
-#define MESSAGE_PKTLEN_MAX (10485760) ///< maximum length of message (10 Mb)
+/**
+ * @brief set the socket to unblock
+ * Thi function sets the socket to unblock.
+ * @param socket_fd the socket file descriptor
+ * @return the result
+ */
+int32_t set_socket_unblock(int32_t socket_fd);
#endif // __EXAMPLES_UTILITIES_H__
diff --git a/examples/main.c b/examples/main.c
index f050dc5..5338572 100644
--- a/examples/main.c
+++ b/examples/main.c
@@ -12,6 +12,8 @@
#include "utilities.h"
#include "parameter.h"
+#include "server.h"
+#include "client.h"
static struct ProgramParams prog_params;
@@ -27,5 +29,11 @@ int32_t main(int argc, char *argv[])
}
program_params_print(&prog_params);
+ if (strcmp(prog_params.as, "server") == 0) {
+ server_create_and_run(&prog_params);
+ } else {
+ client_create_and_run(&prog_params);
+ }
+
return ret;
}
diff --git a/examples/src/bussiness.c b/examples/src/bussiness.c
new file mode 100644
index 0000000..f55a37b
--- /dev/null
+++ b/examples/src/bussiness.c
@@ -0,0 +1,234 @@
+/*
+* Copyright (c) 2022-2023. yyangoO.
+* gazelle is licensed under the Mulan PSL v2.
+* You can use this software according to the terms and conditions of the Mulan PSL v2.
+* You may obtain a copy of Mulan PSL v2 at:
+* http://license.coscl.org.cn/MulanPSL2
+* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
+* IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
+* PURPOSE.
+* See the Mulan PSL v2 for more details.
+*/
+
+
+#include "bussiness.h"
+
+
+static const char bussiness_messages_low[] = "abcdefghijklmnopqrstuvwxyz"; // the lower charactors of business message
+static const char bussiness_messages_cap[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; // the capital charactors of business message
+
+
+// read by specify api
+ int32_t read_api(int32_t fd, char *buffer_in, const uint32_t length, const char *api)
+ {
+ if (strcmp(api, "readwrite") == 0) {
+ return read(fd, buffer_in, length);
+ } else if (strcmp(api, "recvsend") == 0) {
+ return recv(fd, buffer_in, length, 0);
+ } else {
+ struct msghdr msg_recv;
+ struct iovec iov;
+
+ msg_recv.msg_name = NULL;
+ msg_recv.msg_namelen = 0;
+ msg_recv.msg_iov = &iov;
+ msg_recv.msg_iovlen = 1;
+ msg_recv.msg_iov->iov_base = buffer_in;
+ msg_recv.msg_iov->iov_len = length;
+ msg_recv.msg_control = 0;
+ msg_recv.msg_controllen = 0;
+ msg_recv.msg_flags = 0;
+
+ return recvmsg(fd, &msg_recv, 0);
+ }
+ }
+
+// write by specify api
+ int32_t write_api(int32_t fd, char *buffer_out, const uint32_t length, const char *api)
+ {
+ if (strcmp(api, "readwrite") == 0) {
+ return write(fd, buffer_out, length);
+ } else if (strcmp(api, "recvsend") == 0) {
+ return send(fd, buffer_out, length, 0);
+ } else {
+ struct msghdr msg_send;
+ struct iovec iov;
+
+ msg_send.msg_name = NULL;
+ msg_send.msg_namelen = 0;
+ msg_send.msg_iov = &iov;
+ msg_send.msg_iovlen = 1;
+ msg_send.msg_iov->iov_base = buffer_out;
+ msg_send.msg_iov->iov_len = length;
+ msg_send.msg_control = 0;
+ msg_send.msg_controllen = 0;
+ msg_send.msg_flags = 0;
+
+ return sendmsg(fd, &msg_send, 0);
+ }
+ }
+
+// the business processsing of server
+void server_bussiness(char *out, const char *in, uint32_t size)
+{
+ char diff = 'a' - 'A';
+ for (uint32_t i = 0; i < size; ++i) {
+ if (in[i] != '\0') {
+ out[i] = in[i] - diff;
+ } else {
+ out[i] = '\0';
+ }
+ }
+}
+
+// the business processsing of client
+int32_t client_bussiness(char *out, const char *in, uint32_t size, bool verify, uint32_t *msg_idx)
+{
+ if (verify == false) {
+ for (uint32_t i = 0; i < (size - 1); ++i) {
+ out[i] = bussiness_messages_low[(*msg_idx + i) % BUSSINESS_MESSAGE_SIZE];
+ }
+ } else {
+ uint32_t verify_start_idx = (*msg_idx == 0) ? (BUSSINESS_MESSAGE_SIZE - 1) : (*msg_idx - 1);
+ for (uint32_t i = 0; i < (size - 1); ++i) {
+ if (in[i] != bussiness_messages_cap[(verify_start_idx + i) % BUSSINESS_MESSAGE_SIZE]) {
+ return PROGRAM_FAULT;
+ }
+ out[i] = bussiness_messages_low[(*msg_idx + i) % BUSSINESS_MESSAGE_SIZE];
+ }
+ }
+ out[size - 1] = '\0';
+
+ ++(*msg_idx);
+ *msg_idx = (*msg_idx) % BUSSINESS_MESSAGE_SIZE;
+
+ return PROGRAM_OK;
+}
+
+// server answers
+int32_t server_ans(struct ServerHandler *server_handler, uint32_t pktlen, const char* api)
+{
+ const uint32_t length = pktlen;
+ char *buffer_in = (char *)malloc(length * sizeof(char));
+ char *buffer_out = (char *)malloc(length * sizeof(char));
+
+ int32_t cread = 0;
+ int32_t sread = length;
+ while (cread < sread) {
+ int32_t nread = read_api(server_handler->fd, buffer_in, length, api);
+ if (nread == 0) {
+ return PROGRAM_ABORT;
+ } else if (nread < 0) {
+ if (errno != EINTR && errno != EWOULDBLOCK && errno != EAGAIN) {
+ return PROGRAM_FAULT;
+ }
+ } else {
+ cread += nread;
+ continue;
+ }
+ }
+
+ server_bussiness(buffer_out, buffer_in, length);
+
+ int32_t cwrite = 0;
+ int32_t swrite = length;
+ while (cwrite < swrite) {
+ int32_t nwrite = write_api(server_handler->fd, buffer_out, length, api);
+ if (nwrite == 0) {
+ return PROGRAM_ABORT;
+ } else if (nwrite < 0) {
+ if (errno != EINTR && errno != EWOULDBLOCK && errno != EAGAIN) {
+ return PROGRAM_FAULT;
+ }
+ } else {
+ cwrite += nwrite;
+ continue;
+ }
+ }
+
+ free(buffer_in);
+ free(buffer_out);
+
+ return PROGRAM_OK;
+}
+
+// client asks
+int32_t client_ask(struct ClientHandler *client_handler, uint32_t pktlen, const char* api)
+{
+ const uint32_t length = pktlen;
+ char *buffer_in = (char *)malloc(length * sizeof(char));
+ char *buffer_out = (char *)malloc(length * sizeof(char));
+
+ client_bussiness(buffer_out, buffer_in, length, false, &(client_handler->msg_idx));
+
+ int32_t cwrite = 0;
+ int32_t swrite = length;
+ while (cwrite < swrite) {
+ int32_t nwrite = write_api(client_handler->fd, buffer_out, length, api);
+ if (nwrite == 0) {
+ return PROGRAM_ABORT;
+ } else if (nwrite < 0) {
+ if (errno != EINTR && errno != EWOULDBLOCK && errno != EAGAIN) {
+ return PROGRAM_FAULT;
+ }
+ } else {
+ cwrite += nwrite;
+ continue;
+ }
+ }
+
+ free(buffer_in);
+ free(buffer_out);
+
+ return PROGRAM_OK;
+}
+
+// client checks
+int32_t client_chkans(struct ClientHandler *client_handler, uint32_t pktlen, bool verify, const char* api)
+{
+ const uint32_t length = pktlen;
+ char *buffer_in = (char *)malloc(length * sizeof(char));
+ char *buffer_out = (char *)malloc(length * sizeof(char));
+
+ int32_t cread = 0;
+ int32_t sread = length;
+ while (cread < sread) {
+ int32_t nread = read_api(client_handler->fd, buffer_in, length, api);
+ if (nread == 0) {
+ return PROGRAM_ABORT;
+ } else if (nread < 0) {
+ if (errno != EINTR && errno != EWOULDBLOCK && errno != EAGAIN) {
+ return PROGRAM_FAULT;
+ }
+ } else {
+ cread += nread;
+ continue;
+ }
+ }
+
+ if (client_bussiness(buffer_out, buffer_in, length, verify, &(client_handler->msg_idx)) < 0) {
+ PRINT_ERROR("message verify fault! ");
+ getchar();
+ }
+
+ int32_t cwrite = 0;
+ int32_t swrite = length;
+ while (cwrite < swrite) {
+ int32_t nwrite = write_api(client_handler->fd, buffer_out, length, api);
+ if (nwrite == 0) {
+ return PROGRAM_ABORT;
+ } else if (nwrite < 0) {
+ if (errno != EINTR && errno != EWOULDBLOCK && errno != EAGAIN) {
+ return PROGRAM_FAULT;
+ }
+ } else {
+ cwrite += nwrite;
+ continue;
+ }
+ }
+
+ free(buffer_in);
+ free(buffer_out);
+
+ return PROGRAM_OK;
+}
\ No newline at end of file
diff --git a/examples/src/client.c b/examples/src/client.c
new file mode 100644
index 0000000..aafcd00
--- /dev/null
+++ b/examples/src/client.c
@@ -0,0 +1,387 @@
+/*
+* Copyright (c) 2022-2023. yyangoO.
+* gazelle is licensed under the Mulan PSL v2.
+* You can use this software according to the terms and conditions of the Mulan PSL v2.
+* You may obtain a copy of Mulan PSL v2 at:
+* http://license.coscl.org.cn/MulanPSL2
+* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
+* IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
+* PURPOSE.
+* See the Mulan PSL v2 for more details.
+*/
+
+
+#include "client.h"
+
+
+static pthread_mutex_t client_debug_mutex; // the client mutex for printf
+
+
+// the single thread, client prints informations
+void client_debug_print(const char *ch_str, const char *act_str, in_addr_t ip, uint16_t port, bool debug)
+{
+ if (debug == true) {
+ pthread_mutex_lock(&client_debug_mutex);
+ struct in_addr sin_addr;
+ sin_addr.s_addr = ip;
+ PRINT_CLIENT("[%s] [pid: %d] [tid: %ld] [%s <- %s:%d]. ", \
+ ch_str, \
+ getpid(), \
+ pthread_self(), \
+ act_str, \
+ inet_ntoa(sin_addr), \
+ ntohs(port));
+ pthread_mutex_unlock(&client_debug_mutex);
+ }
+}
+
+// the client prints informations
+void client_info_print(struct Client *client)
+{
+ if (client->debug == false) {
+ struct timeval begin;
+ gettimeofday(&begin, NULL);
+ uint64_t begin_time = (uint64_t)begin.tv_sec * 1000 + (uint64_t)begin.tv_usec / 1000;
+
+ uint32_t curr_connect = 0;
+ double bytes_ps = 0;
+ uint64_t begin_send_bytes = 0;
+ struct ClientUnit *begin_uint = client->uints;
+ while (begin_uint != NULL) {
+ curr_connect += begin_uint->curr_connect;
+ begin_send_bytes += begin_uint->send_bytes;
+ begin_uint = begin_uint->next;
+ }
+
+ struct timeval delay;
+ delay.tv_sec = 0;
+ delay.tv_usec = TERMINAL_REFRESH_MS * 1000;
+ select(0, NULL, NULL, NULL, &delay);
+
+ uint64_t end_send_bytes = 0;
+ struct ClientUnit *end_uint = client->uints;
+ while (end_uint != NULL) {
+ end_send_bytes += end_uint->send_bytes;
+ end_uint = end_uint->next;
+ }
+
+ struct timeval end;
+ gettimeofday(&end, NULL);
+ uint64_t end_time = (uint64_t)end.tv_sec * 1000 + (uint64_t)end.tv_usec / 1000;
+
+ double bytes_sub = end_send_bytes > begin_send_bytes ? (double)(end_send_bytes - begin_send_bytes) : 0;
+ double time_sub = end_time > begin_time ? (double)(end_time - begin_time) / 1000 : 0;
+
+ bytes_ps = bytes_sub / time_sub;
+
+ if (bytes_ps < 1024) {
+ PRINT_CLIENT_DATAFLOW("[connect num]: %d, [send]: %.3f B/s", curr_connect, bytes_ps);
+ } else if (bytes_ps < (1024 * 1024)) {
+ PRINT_CLIENT_DATAFLOW("[connect num]: %d, [send]: %.3f KB/s", curr_connect, bytes_ps / 1024);
+ } else {
+ PRINT_CLIENT_DATAFLOW("[connect num]: %d, [send]: %.3f MB/s", curr_connect, bytes_ps / (1024 * 1024));
+ }
+ }
+}
+
+// the single thread, client try to connect to server, register to epoll
+int32_t client_thread_try_connect(struct ClientHandler *client_handler, int32_t epoll_fd, in_addr_t ip, uint16_t port, const char *domain)
+{
+ int32_t create_socket_and_connect_ret = create_socket_and_connect(&(client_handler->fd), ip, port, domain);
+ if (create_socket_and_connect_ret == PROGRAM_INPROGRESS) {
+ struct epoll_event ep_ev;
+ ep_ev.events = EPOLLOUT;
+ ep_ev.data.ptr = (void *)client_handler;
+ if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client_handler->fd, &ep_ev) < 0) {
+ PRINT_ERROR("client cant't set epoll %d! ", errno);
+ return PROGRAM_FAULT;
+ }
+ }
+ return PROGRAM_OK;
+}
+
+// the single thread, client retry to connect to server, register to epoll
+int32_t client_thread_retry_connect(struct ClientUnit *client_unit, struct ClientHandler *client_handler)
+{
+ int32_t clithd_try_cnntask_ret = client_thread_try_connect(client_handler, client_unit->epfd, client_unit->ip, client_unit->port, client_unit->domain);
+ if (clithd_try_cnntask_ret < 0) {
+ if (clithd_try_cnntask_ret == PROGRAM_INPROGRESS) {
+ return PROGRAM_OK;
+ }
+ return PROGRAM_FAULT;
+ }
+ struct epoll_event ep_ev;
+ ep_ev.events = EPOLLIN | EPOLLET;
+ ep_ev.data.ptr = (void *)client_handler;
+ if (epoll_ctl(client_unit->epfd, EPOLL_CTL_ADD, client_handler->fd, &ep_ev) < 0) {
+ PRINT_ERROR("client cant't set epoll %d! ", errno);
+ return PROGRAM_FAULT;
+ }
+
+ ++(client_unit->curr_connect);
+
+ struct sockaddr_in server_addr;
+ socklen_t server_addr_len = sizeof(server_addr);
+ if (getpeername(client_handler->fd, (struct sockaddr *)&server_addr, &server_addr_len) < 0) {
+ PRINT_ERROR("client can't socket peername %d! ", errno);
+ return PROGRAM_FAULT;
+ }
+ client_debug_print("client unit", "connect", server_addr.sin_addr.s_addr, server_addr.sin_port, client_unit->debug);
+
+ int32_t client_ask_ret = client_ask(client_handler, client_unit->pktlen, client_unit->api);
+ if (client_ask_ret == PROGRAM_FAULT) {
+ --client_unit->curr_connect;
+ struct epoll_event ep_ev;
+ if (epoll_ctl(client_unit->epfd, EPOLL_CTL_DEL, client_handler->fd, &ep_ev) < 0) {
+ PRINT_ERROR("client can't delete socket '%d' to control epoll %d! ", client_handler->fd, errno);
+ return PROGRAM_FAULT;
+ }
+ } else if (client_ask_ret == PROGRAM_ABORT) {
+ --client_unit->curr_connect;
+ if (close(client_handler->fd) < 0) {
+ PRINT_ERROR("client can't close the socket %d! ", errno);
+ return PROGRAM_FAULT;
+ }
+ client_debug_print("client unit", "close", server_addr.sin_addr.s_addr, server_addr.sin_port, client_unit->debug);
+ } else {
+ client_unit->send_bytes += client_unit->pktlen;
+ client_debug_print("client unit", "send", server_addr.sin_addr.s_addr, server_addr.sin_port, client_unit->debug);
+ }
+
+ return PROGRAM_OK;
+}
+
+// the single thread, client connects and gets epoll feature descriptors
+int32_t client_thread_create_epfd_and_reg(struct ClientUnit *client_unit)
+{
+ const uint32_t connect_num = client_unit->connect_num;
+
+ client_unit->epfd = epoll_create(CLIENT_EPOLL_SIZE_MAX);
+ if (client_unit->epfd < 0) {
+ PRINT_ERROR("client can't create epoll %d! ", errno);
+ return PROGRAM_FAULT;
+ }
+
+ for (uint32_t i = 0; i < connect_num; ++i) {
+ int32_t clithd_try_cnntask_ret = client_thread_try_connect(client_unit->handlers + i, client_unit->epfd, client_unit->ip, client_unit->port, client_unit->domain);
+ if (clithd_try_cnntask_ret < 0) {
+ if (clithd_try_cnntask_ret == PROGRAM_INPROGRESS) {
+ continue;
+ }
+ return PROGRAM_FAULT;
+ } else {
+ struct epoll_event ep_ev;
+ ep_ev.events = EPOLLIN | EPOLLET;
+ ep_ev.data.ptr = (struct ClientHandler *)(client_unit->handlers + i);
+ if (epoll_ctl(client_unit->epfd, EPOLL_CTL_ADD, (client_unit->handlers + i)->fd, &ep_ev) < 0) {
+ PRINT_ERROR("client cant't set epoll %d! ", errno);
+ return PROGRAM_FAULT;
+ }
+
+ ++(client_unit->curr_connect);
+
+ struct sockaddr_in server_addr;
+ socklen_t server_addr_len = sizeof(server_addr);
+ if (getpeername((client_unit->handlers + i)->fd, (struct sockaddr *)&server_addr, &server_addr_len) < 0) {
+ PRINT_ERROR("client can't socket peername %d! ", errno);
+ return PROGRAM_FAULT;
+ }
+ client_debug_print("client unit", "connect", server_addr.sin_addr.s_addr, server_addr.sin_port, client_unit->debug);
+
+ int32_t client_ask_ret = client_ask(client_unit->handlers + i, client_unit->pktlen, client_unit->api);
+ if (client_ask_ret == PROGRAM_FAULT) {
+ --client_unit->curr_connect;
+ struct epoll_event ep_ev;
+ if (epoll_ctl(client_unit->epfd, EPOLL_CTL_DEL, (client_unit->handlers + i)->fd, &ep_ev) < 0) {
+ PRINT_ERROR("client can't delete socket '%d' to control epoll %d! ", client_unit->epevs[i].data.fd, errno);
+ return PROGRAM_FAULT;
+ }
+ } else if (client_ask_ret == PROGRAM_ABORT) {
+ --client_unit->curr_connect;
+ if (close((client_unit->handlers + i)->fd) < 0) {
+ PRINT_ERROR("client can't close the socket! ");
+ return PROGRAM_FAULT;
+ }
+ client_debug_print("client unit", "close", server_addr.sin_addr.s_addr, server_addr.sin_port, client_unit->debug);
+ } else {
+ client_unit->send_bytes += client_unit->pktlen;
+ client_debug_print("client unit", "send", server_addr.sin_addr.s_addr, server_addr.sin_port, client_unit->debug);
+ }
+ }
+ }
+
+ return PROGRAM_OK;
+}
+
+// the single thread, client processes epoll events
+int32_t clithd_proc_epevs(struct ClientUnit *client_unit)
+{
+ int32_t epoll_nfds = epoll_wait(client_unit->epfd, client_unit->epevs, CLIENT_EPOLL_SIZE_MAX, CLIENT_EPOLL_WAIT_TIMEOUT);
+ if (epoll_nfds < 0) {
+ PRINT_ERROR("client epoll wait error %d! ", errno);
+ return PROGRAM_FAULT;
+ }
+
+ for (int32_t i = 0; i < epoll_nfds; ++i) {
+ struct epoll_event *curr_epev = client_unit->epevs + i;
+
+ if (curr_epev->events == EPOLLERR) {
+ PRINT_ERROR("client epoll wait error! %d", curr_epev->events);
+ return PROGRAM_FAULT;
+ } else if (curr_epev->events == EPOLLOUT) {
+ int32_t connect_error = 0;
+ socklen_t connect_error_len = sizeof(connect_error);
+ struct ClientHandler *client_handler = (struct ClientHandler *)curr_epev->data.ptr;
+ if (getsockopt(client_handler->fd, SOL_SOCKET, SO_ERROR, (void *)(&connect_error), &connect_error_len) < 0) {
+ PRINT_ERROR("client can't get socket option %d! ", errno);
+ return PROGRAM_FAULT;
+ }
+ if (connect_error < 0) {
+ if (connect_error == ETIMEDOUT) {
+ if (client_thread_retry_connect(client_unit, client_handler) < 0) {
+ return PROGRAM_FAULT;
+ }
+ continue;
+ }
+ PRINT_ERROR("client connect error %d! ", connect_error);
+ return PROGRAM_FAULT;
+ } else {
+ ++(client_unit->curr_connect);
+
+ struct sockaddr_in server_addr;
+ socklen_t server_addr_len = sizeof(server_addr);
+ if (getpeername(client_handler->fd, (struct sockaddr *)&server_addr, &server_addr_len) < 0) {
+ PRINT_ERROR("client can't socket peername %d! ", errno);
+ return PROGRAM_FAULT;
+ }
+ client_debug_print("client unit", "connect", server_addr.sin_addr.s_addr, server_addr.sin_port, client_unit->debug);
+
+ int32_t client_ask_ret = client_ask(client_handler, client_unit->pktlen, client_unit->api);
+ if (client_ask_ret == PROGRAM_FAULT) {
+ --client_unit->curr_connect;
+ struct epoll_event ep_ev;
+ if (epoll_ctl(client_unit->epfd, EPOLL_CTL_DEL, curr_epev->data.fd, &ep_ev) < 0) {
+ PRINT_ERROR("client can't delete socket '%d' to control epoll %d! ", curr_epev->data.fd, errno);
+ return PROGRAM_FAULT;
+ }
+ } else if (client_ask_ret == PROGRAM_ABORT) {
+ --client_unit->curr_connect;
+ if (close(curr_epev->data.fd) < 0) {
+ PRINT_ERROR("client can't close the socket! ");
+ return PROGRAM_FAULT;
+ }
+ client_debug_print("client unit", "close", server_addr.sin_addr.s_addr, server_addr.sin_port, client_unit->debug);
+ } else {
+ client_unit->send_bytes += client_unit->pktlen;
+ client_debug_print("client unit", "send", server_addr.sin_addr.s_addr, server_addr.sin_port, client_unit->debug);
+ }
+ }
+ } else if (curr_epev->events == EPOLLIN) {
+ struct sockaddr_in server_addr;
+ socklen_t server_addr_len = sizeof(server_addr);
+ struct ClientHandler *client_handler = (struct ClientHandler *)curr_epev->data.ptr;
+ if (getpeername(client_handler->fd, (struct sockaddr *)&server_addr, &server_addr_len) < 0) {
+ PRINT_ERROR("client can't socket peername %d! ", errno);
+ return PROGRAM_FAULT;
+ }
+ int32_t client_chkans_ret = client_chkans((struct ClientHandler *)curr_epev->data.ptr, client_unit->pktlen, client_unit->verify, client_unit->api);
+ if (client_chkans_ret == PROGRAM_FAULT) {
+ --client_unit->curr_connect;
+ struct epoll_event ep_ev;
+ if (epoll_ctl(client_unit->epfd, EPOLL_CTL_DEL, curr_epev->data.fd, &ep_ev) < 0) {
+ PRINT_ERROR("client can't delete socket '%d' to control epoll %d! ", curr_epev->data.fd, errno);
+ return PROGRAM_FAULT;
+ }
+ } else if (client_chkans_ret == PROGRAM_ABORT) {
+ --client_unit->curr_connect;
+ if (close(curr_epev->data.fd) < 0) {
+ PRINT_ERROR("client can't close the socket %d! ", errno);
+ return PROGRAM_FAULT;
+ }
+ client_debug_print("client unit", "close", server_addr.sin_addr.s_addr, server_addr.sin_port, client_unit->debug);
+ } else {
+ client_unit->send_bytes += client_unit->pktlen;
+ client_debug_print("client unit", "receive", server_addr.sin_addr.s_addr, server_addr.sin_port, client_unit->debug);
+ }
+ }
+ }
+
+ return PROGRAM_OK;
+}
+
+// create client of single thread and run
+void *client_s_create_and_run(void *arg)
+{
+ struct ClientUnit *client_unit = (struct ClientUnit *)arg;
+
+ if (client_thread_create_epfd_and_reg(client_unit) < 0) {
+ exit(PROGRAM_FAULT);
+ }
+ while (true) {
+ if (clithd_proc_epevs(client_unit) < 0) {
+ exit(PROGRAM_FAULT);
+ }
+ }
+ for (int i = 0; i < client_unit->connect_num; ++i) {
+ close((client_unit->handlers + i)->fd);
+ }
+ close(client_unit->epfd);
+
+ return (void *)PROGRAM_OK;
+}
+
+// create client and run
+int32_t client_create_and_run(struct ProgramParams *params)
+{
+ const uint32_t connect_num = params->connect_num;
+ const uint32_t thread_num = params->thread_num;
+ pthread_t *tids = (pthread_t *)malloc(thread_num * sizeof(pthread_t));
+ struct Client *client = (struct Client *)malloc(sizeof(struct Client));
+ struct ClientUnit *client_unit = (struct ClientUnit *)malloc(sizeof(struct ClientUnit));
+
+ if (pthread_mutex_init(&client_debug_mutex, NULL) < 0) {
+ PRINT_ERROR("client can't init posix mutex %d! ", errno);
+ return PROGRAM_FAULT;
+ }
+
+ client->uints = client_unit;
+ client->debug = params->debug;
+
+ for (uint32_t i = 0; i < thread_num; ++i) {
+ client_unit->handlers = (struct ClientHandler *)malloc(connect_num * sizeof(struct ClientHandler));
+ for (uint32_t j = 0; j < connect_num; ++j) {
+ client_unit->handlers[j].fd = -1;
+ client_unit->handlers[j].msg_idx = 0;
+ }
+ client_unit->epfd = -1;
+ client_unit->epevs = (struct epoll_event *)malloc(CLIENT_EPOLL_SIZE_MAX * sizeof(struct epoll_event));
+ client_unit->curr_connect = 0;
+ client_unit->send_bytes = 0;
+ client_unit->ip = inet_addr(params->ip);
+ client_unit->port = htons(params->port);
+ client_unit->connect_num = params->connect_num;
+ client_unit->pktlen = params->pktlen;
+ client_unit->verify = params->verify;
+ client_unit->domain = params->domain;
+ client_unit->api = params->api;
+ client_unit->debug = params->debug;
+ client_unit->next = (struct ClientUnit *)malloc(sizeof(struct ClientUnit));
+
+ if (pthread_create((tids + i), NULL, client_s_create_and_run, client_unit) < 0) {
+ PRINT_ERROR("client can't create thread of poisx %d! ", errno);
+ return PROGRAM_FAULT;
+ }
+ client_unit = client_unit->next;
+ }
+
+ if (client->debug == false) {
+ printf("[program informations]: \n\n");
+ }
+ while (true) {
+ client_info_print(client);
+ }
+
+ pthread_mutex_destroy(&client_debug_mutex);
+
+ return PROGRAM_OK;
+}
diff --git a/examples/src/parameter.c b/examples/src/parameter.c
index 996188b..100ee11 100644
--- a/examples/src/parameter.c
+++ b/examples/src/parameter.c
@@ -22,148 +22,141 @@ const char prog_short_opts[] = \
"m:" // model
"t:" // thread number
"c:" // connect number
+ "D:" // communication domain
"A:" // api
"P:" // pktlen
"v" // verify
"r" // ringpmd
+ "d" // debug
"h" // help
;
// program long options
-const struct ProgramOption prog_long_opts[] = { \
+const struct ProgramOption prog_long_opts[] = \
+{
{PARAM_NAME_AS, REQUIRED_ARGUMETN, NULL, PARAM_NUM_AS},
{PARAM_NAME_IP, REQUIRED_ARGUMETN, NULL, PARAM_NUM_IP},
{PARAM_NAME_PORT, REQUIRED_ARGUMETN, NULL, PARAM_NUM_PORT},
{PARAM_NAME_MODEL, REQUIRED_ARGUMETN, NULL, PARAM_NUM_MODEL},
{PARAM_NAME_THREAD_NUM, REQUIRED_ARGUMETN, NULL, PARAM_NUM_THREAD_NUM},
{PARAM_NAME_CONNECT_NUM, REQUIRED_ARGUMETN, NULL, PARAM_NUM_CONNECT_NUM},
+ {PARAM_NAME_DOMAIN, REQUIRED_ARGUMETN, NULL, PARAM_NUM_DOMAIN},
{PARAM_NAME_API, REQUIRED_ARGUMETN, NULL, PARAM_NUM_API},
{PARAM_NAME_PKTLEN, REQUIRED_ARGUMETN, NULL, PARAM_NUM_PKTLEN},
{PARAM_NAME_VERIFY, NO_ARGUMENT, NULL, PARAM_NUM_VERIFY},
{PARAM_NAME_RINGPMD, NO_ARGUMENT, NULL, PARAM_NUM_RINGPMD},
+ {PARAM_NAME_DEBUG, NO_ARGUMENT, NULL, PARAM_NUM_DEBUG},
{PARAM_NAME_HELP, NO_ARGUMENT, NULL, PARAM_NUM_HELP},
};
// get long options
-int getopt_long(int argc, char * const argv[], const char *optstring, const struct ProgramOption *long_opts,
- int *long_idx);
+int getopt_long(int argc, char * const argv[], const char *optstring, const struct ProgramOption *long_opts, int *long_idx);
// set `as` parameter
-int32_t program_param_prase_as(struct ProgramParams *params, char *arg, const char *name)
+void program_param_parse_as(struct ProgramParams *params)
{
- if (strcmp(arg, "server") == 0 || strcmp(arg, "client") == 0) {
- params->as = arg;
- }
- else {
- PRINT_ERROR("illigal argument -- %s \n", name);
- return PROGRAM_ABORT;
+ if (strcmp(optarg, "server") == 0 || strcmp(optarg, "client") == 0) {
+ params->as = optarg;
+ } else {
+ PRINT_ERROR("illigal argument -- %s \n", optarg);
+ exit(PROGRAM_ABORT);
}
-
- return PROGRAM_OK;
}
// set `ip` parameter
-int32_t program_param_prase_ip(struct ProgramParams *params, char *arg, const char *name)
+void program_param_parse_ip(struct ProgramParams *params)
{
- if (inet_addr(arg) != INADDR_NONE) {
- params->ip = arg;
- }
- else {
- PRINT_ERROR("illigal argument -- %s \n", name);
- return PROGRAM_ABORT;
+ if (inet_addr(optarg) != INADDR_NONE) {
+ params->ip = optarg;
+ } else {
+ PRINT_ERROR("illigal argument -- %s \n", optarg);
+ exit(PROGRAM_ABORT);
}
-
- return PROGRAM_OK;
}
// set `port` parameter
-int32_t program_param_prase_port(struct ProgramParams *params, char *arg, const char *name)
+void program_param_parse_port(struct ProgramParams *params)
{
- int32_t port_arg = atoi(optarg);
+ int32_t port_arg = strtol(optarg, NULL, 0);
+ printf("%d\n", port_arg);
if (CHECK_VAL_RANGE(port_arg, UNIX_TCP_PORT_MIN, UNIX_TCP_PORT_MAX) == true) {
params->port = (uint32_t)port_arg;
+ } else {
+ PRINT_ERROR("illigal argument -- %s \n", optarg);
+ exit(PROGRAM_ABORT);
}
- else {
- PRINT_ERROR("illigal argument -- %s \n", name);
- return PROGRAM_ABORT;
- }
-
- return PROGRAM_OK;
}
// set `model` parameter
-int32_t program_param_prase_model(struct ProgramParams *params, char *arg, const char *name)
+void program_param_parse_model(struct ProgramParams *params)
{
if (strcmp(optarg, "mum") == 0 || strcmp(optarg, "mud") == 0) {
params->model = optarg;
+ } else {
+ PRINT_ERROR("illigal argument -- %s \n", optarg);
+ exit(PROGRAM_ABORT);
}
- else {
- PRINT_ERROR("illigal argument -- %s \n", name);
- return PROGRAM_ABORT;
- }
-
- return PROGRAM_OK;
}
// set `connect_num` parameter
-int32_t program_param_prase_connectnum(struct ProgramParams *params, char *arg, const char *name)
+void program_param_parse_connectnum(struct ProgramParams *params)
{
- int32_t connectnum_arg = atoi(optarg);
+ int32_t connectnum_arg = strtol(optarg, NULL, 0);
if (connectnum_arg > 0) {
params->connect_num = (uint32_t)connectnum_arg;
+ } else {
+ PRINT_ERROR("illigal argument -- %s \n", optarg);
+ exit(PROGRAM_ABORT);
}
- else {
- PRINT_ERROR("illigal argument -- %s \n", name);
- return PROGRAM_ABORT;
- }
-
- return PROGRAM_OK;
}
// set `thread_num` parameter
-int32_t program_param_prase_threadnum(struct ProgramParams *params, char *arg, const char *name)
+void program_param_parse_threadnum(struct ProgramParams *params)
{
- int32_t threadnum_arg = atoi(optarg);
+ int32_t threadnum_arg = strtol(optarg, NULL, 0);
if (CHECK_VAL_RANGE(threadnum_arg, THREAD_NUM_MIN, THREAD_NUM_MAX) == true) {
params->thread_num = (uint32_t)threadnum_arg;
+ } else {
+ PRINT_ERROR("illigal argument -- %s \n", optarg);
+ exit(PROGRAM_ABORT);
}
- else {
- PRINT_ERROR("illigal argument -- %s \n", name);
- return PROGRAM_ABORT;
- }
+}
- return PROGRAM_OK;
+// set `domain` parameter
+void program_param_parse_domain(struct ProgramParams *params)
+{
+ if (strcmp(optarg, "unix") == 0 || strcmp(optarg, "posix") == 0) {
+ params->domain = optarg;
+ } else {
+ PRINT_ERROR("illigal argument -- %s \n", optarg);
+ exit(PROGRAM_ABORT);
+ }
}
// set `api` parameter
-int32_t program_param_prase_api(struct ProgramParams *params, char *arg, const char *name)
+void program_param_parse_api(struct ProgramParams *params)
{
- if (strcmp(optarg, "unix") == 0 || strcmp(optarg, "posix") == 0) {
+ printf("aaaaaa %s\n", optarg);
+ if (strcmp(optarg, "readwrite") == 0 || strcmp(optarg, "recvsend") == 0 || strcmp(optarg, "recvsendmsg") == 0) {
params->api = optarg;
+ } else {
+ PRINT_ERROR("illigal argument -- %s \n", optarg);
+ exit(PROGRAM_ABORT);
}
- else {
- PRINT_ERROR("illigal argument -- %s \n", name);
- return PROGRAM_ABORT;
- }
-
- return PROGRAM_OK;
}
// set `pktlen` parameter
-int32_t program_param_prase_pktlen(struct ProgramParams *params, char *arg, const char *name)
+void program_param_parse_pktlen(struct ProgramParams *params)
{
- int32_t pktlen_arg = atoi(optarg);
+ int32_t pktlen_arg = strtol(optarg, NULL, 0);
if (CHECK_VAL_RANGE(pktlen_arg, MESSAGE_PKTLEN_MIN, MESSAGE_PKTLEN_MAX) == true) {
params->pktlen = (uint32_t)pktlen_arg;
+ } else {
+ PRINT_ERROR("illigal argument -- %s \n", optarg);
+ exit(PROGRAM_ABORT);
}
- else {
- PRINT_ERROR("illigal argument -- %s \n", name);
- return PROGRAM_ABORT;
- }
-
- return PROGRAM_OK;
}
// initialize the parameters
@@ -175,10 +168,12 @@ void program_params_init(struct ProgramParams *params)
params->model = PARAM_DEFAULT_MODEL;
params->thread_num = PARAM_DEFAULT_THREAD_NUM;
params->connect_num = PARAM_DEFAULT_CONNECT_NUM;
+ params->domain = PARAM_DEFAULT_DOMAIN;
params->api = PARAM_DEFAULT_API;
params->pktlen = PARAM_DEFAULT_PKTLEN;
params->verify = PARAM_DEFAULT_VERIFY;
params->ringpmd = PARAM_DEFAULT_RINGPMD;
+ params->debug = PARAM_DEFAULT_DEBUG;
}
// print program helps
@@ -188,19 +183,24 @@ void program_params_help(void)
printf("-a, --as [server | client]: set programas server or client. \n");
printf(" server: as server. \n");
printf(" client: as client. \n");
- printf("-i, --ip [xxx.xxx.xxx.xxx]: set ip address. \n");
- printf("-p, --port [xxxx]: set port number in range of %d - %d. \n", UNIX_TCP_PORT_MIN, UNIX_TCP_PORT_MAX);
+ printf("-i, --ip [???.???.???.???]: set ip address. \n");
+ printf("-p, --port [????]: set port number in range of %d - %d. \n", UNIX_TCP_PORT_MIN, UNIX_TCP_PORT_MAX);
printf("-m, --model [mum | mud]: set the network model. \n");
printf(" mum: multi thread, unblock, multiplexing IO network model. \n");
printf(" mud: multi thread, unblock, dissymmetric network model. \n");
- printf("-t, --threadnum [xxxx]: set thread number in range of %d - %d. \n", THREAD_NUM_MIN, THREAD_NUM_MAX);
- printf("-c, --connectnum [xxxx]: set thread number of connection. \n");
- printf("-A, --api [unix | posix]: set api type is server or client. \n");
+ printf("-t, --threadnum [???]: set thread number in range of %d - %d. \n", THREAD_NUM_MIN, THREAD_NUM_MAX);
+ printf("-c, --connectnum [???]: set connection number of each thread. \n");
+ printf("-D, --domain [unix | posix]: set domain type is server or client. \n");
printf(" unix: use unix's api. \n");
printf(" posix: use posix api. \n");
- printf("-P, --pktlen [xxxx]: set packet length in range of %d - %d. \n", MESSAGE_PKTLEN_MIN, MESSAGE_PKTLEN_MAX);
+ printf("-A, --api [readwrite | recvsend | recvsendmsg]: set api type is server or client. \n");
+ printf(" readwrite: use `read` and `write`. \n");
+ printf(" recvsend: use `recv and `send`. \n");
+ printf(" recvsendmsg: use `recvmsg` and `sendmsg`. \n");
+ printf("-P, --pktlen [????]: set packet length in range of %d - %d. \n", MESSAGE_PKTLEN_MIN, MESSAGE_PKTLEN_MAX);
printf("-v, --verify: set to verifying the message packet. \n");
- printf("-r, --ringpmd: set use ringpmd. \n");
+ printf("-r, --ringpmd: set to use ringpmd. \n");
+ printf("-d, --debug: set to print the debug information. \n");
printf("-h, --help: see helps. \n");
printf("\n");
}
@@ -208,40 +208,44 @@ void program_params_help(void)
// parse the parameters
int32_t program_params_parse(struct ProgramParams *params, uint32_t argc, char *argv[])
{
- int32_t ret = PROGRAM_OK;
+ int32_t c;
- while (ret == PROGRAM_OK) {
+ while (true) {
int32_t opt_idx = 0;
- int32_t c = getopt_long(argc, argv, prog_short_opts, prog_long_opts, &opt_idx);
+ c = getopt_long(argc, argv, prog_short_opts, prog_long_opts, &opt_idx);
+
if (c == -1) {
break;
}
switch (c) {
case (PARAM_NUM_AS):
- ret = program_param_prase_as(params, optarg, prog_long_opts[opt_idx].name);
+ program_param_parse_as(params);
break;
case (PARAM_NUM_IP):
- ret = program_param_prase_ip(params, optarg, prog_long_opts[opt_idx].name);
+ program_param_parse_ip(params);
break;
case (PARAM_NUM_PORT):
- ret = program_param_prase_port(params, optarg, prog_long_opts[opt_idx].name);
+ program_param_parse_port(params);
break;
case (PARAM_NUM_MODEL):
- ret = program_param_prase_model(params, optarg, prog_long_opts[opt_idx].name);
+ program_param_parse_model(params);
break;
case (PARAM_NUM_CONNECT_NUM):
- ret = program_param_prase_connectnum(params, optarg, prog_long_opts[opt_idx].name);
+ program_param_parse_connectnum(params);
break;
case (PARAM_NUM_THREAD_NUM):
- ret = program_param_prase_threadnum(params, optarg, prog_long_opts[opt_idx].name);
+ program_param_parse_threadnum(params);
+ break;
+ case (PARAM_NUM_DOMAIN):
+ program_param_parse_domain(params);
break;
case (PARAM_NUM_API):
- ret = program_param_prase_api(params, optarg, prog_long_opts[opt_idx].name);
+ program_param_parse_api(params);
break;
case (PARAM_NUM_PKTLEN):
- ret = program_param_prase_pktlen(params, optarg, prog_long_opts[opt_idx].name);
+ program_param_parse_pktlen(params);
break;
case (PARAM_NUM_VERIFY):
params->verify = true;
@@ -249,6 +253,9 @@ int32_t program_params_parse(struct ProgramParams *params, uint32_t argc, char *
case (PARAM_NUM_RINGPMD):
params->ringpmd = true;
break;
+ case (PARAM_NUM_DEBUG):
+ params->debug = true;
+ break;
case (PARAM_NUM_HELP):
program_params_help();
return PROGRAM_ABORT;
@@ -260,7 +267,12 @@ int32_t program_params_parse(struct ProgramParams *params, uint32_t argc, char *
}
}
- return ret;
+ if (strcmp(params->domain, "unix") == 0) {
+ params->thread_num = 1;
+ params->connect_num = 1;
+ }
+
+ return PROGRAM_OK;
}
// print the parameters
@@ -269,14 +281,28 @@ void program_params_print(struct ProgramParams *params)
printf("\n");
printf("[program parameters]: \n");
printf("--> [as]: %s \n", params->as);
- printf("--> [ip]: %s \n", params->ip);
- printf("--> [port]: %u \n", params->port);
- printf("--> [model]: %s \n", params->model);
- printf("--> [thread number]: %u \n", params->thread_num);
- printf("--> [connection number]: %u \n", params->connect_num);
- printf("--> [api]: %s \n", params->api);
+ printf("--> [server ip]: %s \n", params->ip);
+ printf("--> [server port]: %u \n", params->port);
+ if (strcmp(params->as, "server") == 0) {
+ printf("--> [model]: %s \n", params->model);
+ }
+ if ((strcmp(params->as, "server") == 0 && strcmp(params->model, "mum") == 0) || strcmp(params->as, "client") == 0) {
+ printf("--> [thread number]: %u \n", params->thread_num);
+ }
+ if (strcmp(params->as, "client") == 0) {
+ printf("--> [connection number]: %u \n", params->connect_num);
+ }
+ printf("--> [domain]: %s \n", params->domain);
+ if (strcmp(params->api, "readwrite") == 0) {
+ printf("--> [api]: read & write \n");
+ } else if (strcmp(params->api, "recvsend") == 0) {
+ printf("--> [api]: recv & send \n");
+ } else {
+ printf("--> [api]: recvmsg & sendmsg \n");
+ }
printf("--> [packet length]: %u \n", params->pktlen);
- printf("--> [verify]: %s \n", (true == params->verify) ? "on" : "off");
- printf("--> [ringpmd]: %s \n", (true == params->ringpmd) ? "on" : "off");
+ printf("--> [verify]: %s \n", (params->verify == true) ? "on" : "off");
+ printf("--> [ringpmd]: %s \n", (params->ringpmd == true) ? "on" : "off");
+ printf("--> [debug]: %s \n", (params->debug == true) ? "on" : "off");
printf("\n");
}
diff --git a/examples/src/server.c b/examples/src/server.c
new file mode 100644
index 0000000..d1dab72
--- /dev/null
+++ b/examples/src/server.c
@@ -0,0 +1,578 @@
+/*
+* Copyright (c) 2022-2023. yyangoO.
+* gazelle is licensed under the Mulan PSL v2.
+* You can use this software according to the terms and conditions of the Mulan PSL v2.
+* You may obtain a copy of Mulan PSL v2 at:
+* http://license.coscl.org.cn/MulanPSL2
+* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
+* IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
+* PURPOSE.
+* See the Mulan PSL v2 for more details.
+*/
+
+
+#include "server.h"
+
+
+static pthread_mutex_t server_debug_mutex; // the server mutex for debug
+
+// server debug information print
+void server_debug_print(const char *ch_str, const char *act_str, in_addr_t ip, uint16_t port, bool debug)
+{
+ if (debug == true) {
+ pthread_mutex_lock(&server_debug_mutex);
+ struct in_addr sin_addr;
+ sin_addr.s_addr = ip;
+ PRINT_SERVER("[%s] [pid: %d] [tid: %ld] [%s <- %s:%d]. ", \
+ ch_str, \
+ getpid(), \
+ pthread_self(), \
+ act_str, \
+ inet_ntoa(sin_addr), \
+ ntohs(port));
+ pthread_mutex_unlock(&server_debug_mutex);
+ }
+}
+
+// the multi thread, unblock, dissymmetric server prints informations
+void sermud_info_print(struct ServerMud *server_mud)
+{
+ if (server_mud->debug == false) {
+ uint32_t curr_connect = server_mud->curr_connect;
+
+ struct timeval begin;
+ gettimeofday(&begin, NULL);
+ uint64_t begin_time = (uint64_t)begin.tv_sec * 1000 + (uint64_t)begin.tv_usec / 1000;
+
+ double bytes_ps = 0;
+ uint64_t begin_recv_bytes = 0;
+ struct ServerMudWorker *begin_uint = server_mud->workers;
+ while (begin_uint != NULL) {
+ begin_recv_bytes += begin_uint->recv_bytes;
+ begin_uint = begin_uint->next;
+ }
+
+ struct timeval delay;
+ delay.tv_sec = 0;
+ delay.tv_usec = TERMINAL_REFRESH_MS * 1000;
+ select(0, NULL, NULL, NULL, &delay);
+
+ uint64_t end_recv_bytes = 0;
+ struct ServerMudWorker *end_uint = server_mud->workers;
+ while (end_uint != NULL) {
+ end_recv_bytes += end_uint->recv_bytes;
+ end_uint = end_uint->next;
+ }
+
+ struct timeval end;
+ gettimeofday(&end, NULL);
+ uint64_t end_time = (uint64_t)end.tv_sec * 1000 + (uint64_t)end.tv_usec / 1000;
+
+ double bytes_sub = end_recv_bytes > begin_recv_bytes ? (double)(end_recv_bytes - begin_recv_bytes) : 0;
+ double time_sub = end_time > begin_time ? (double)(end_time - begin_time) / 1000 : 0;
+
+ bytes_ps = bytes_sub / time_sub;
+
+ if (bytes_ps < 1024) {
+ PRINT_SERVER_DATAFLOW("[connect num]: %d, [receive]: %.3f B/s", curr_connect, bytes_ps);
+ } else if (bytes_ps < (1024 * 1024)) {
+ PRINT_SERVER_DATAFLOW("[connect num]: %d, [receive]: %.3f KB/s", curr_connect, bytes_ps / 1024);
+ } else {
+ PRINT_SERVER_DATAFLOW("[connect num]: %d, [receive]: %.3f MB/s", curr_connect, bytes_ps / (1024 * 1024));
+ }
+ }
+}
+
+// the worker thread, unblock, dissymmetric server listens and gets epoll feature descriptors
+int32_t sermud_worker_create_epfd_and_reg(struct ServerMudWorker *worker_unit)
+{
+ worker_unit->epfd = epoll_create(SERVER_EPOLL_SIZE_MAX);
+ if (worker_unit->epfd < 0) {
+ PRINT_ERROR("server can't create epoll %d! ", worker_unit->epfd);
+ return PROGRAM_FAULT;
+ }
+
+ struct epoll_event ep_ev;
+ ep_ev.data.ptr = (void *)&(worker_unit->worker);
+ ep_ev.events = EPOLLIN | EPOLLET;
+ if (epoll_ctl(worker_unit->epfd, EPOLL_CTL_ADD, worker_unit->worker.fd, &ep_ev) < 0) {
+ PRINT_ERROR("server can't control epoll %d! ", errno);
+ return PROGRAM_FAULT;
+ }
+
+ return PROGRAM_OK;
+}
+
+// the listener thread, unblock, dissymmetric server listens and gets epoll feature descriptors
+int32_t sermud_listener_create_epfd_and_reg(struct ServerMud *server_mud)
+{
+ server_mud->epfd = epoll_create(SERVER_EPOLL_SIZE_MAX);
+ if (server_mud->epfd < 0) {
+ PRINT_ERROR("server can't create epoll %d! ", server_mud->epfd);
+ return PROGRAM_FAULT;
+ }
+
+ struct epoll_event ep_ev;
+ ep_ev.data.ptr = (void *)&(server_mud->listener);
+ ep_ev.events = EPOLLIN | EPOLLET;
+ if (epoll_ctl(server_mud->epfd, EPOLL_CTL_ADD, server_mud->listener.fd, &ep_ev) < 0) {
+ PRINT_ERROR("server can't control epoll %d! ", errno);
+ return PROGRAM_FAULT;
+ }
+
+ server_debug_print("server mud listener", "waiting", server_mud->ip, server_mud->port, server_mud->debug);
+
+ return PROGRAM_OK;
+}
+
+// the listener thread, unblock, dissymmetric server accepts the connections
+int32_t sermud_listener_accept_connects(struct ServerMud *server_mud)
+{
+ while (true) {
+ struct sockaddr_in accept_addr;
+ uint32_t sockaddr_in_len = sizeof(struct sockaddr_in);
+ int32_t accept_fd = accept(server_mud->listener.fd, (struct sockaddr *)&accept_addr, &sockaddr_in_len);
+ if (accept_fd < 0) {
+ break;
+ }
+
+ if (set_socket_unblock(accept_fd) < 0) {
+ PRINT_ERROR("server can't set the connect socket to unblock! ");
+ return PROGRAM_FAULT;
+ }
+
+ ++(server_mud->curr_connect);
+
+ pthread_t *tid = (pthread_t *)malloc(sizeof(pthread_t));
+ struct ServerMudWorker *worker = (struct ServerMudWorker *)malloc(sizeof(struct ServerMudWorker));
+ worker->worker.fd = accept_fd;
+ worker->epfd = -1;
+ worker->epevs = (struct epoll_event *)malloc(sizeof(struct epoll_event));
+ worker->recv_bytes = 0;
+ worker->pktlen = server_mud->pktlen;
+ worker->ip = accept_addr.sin_addr.s_addr;
+ worker->port = accept_addr.sin_port;
+ worker->api = server_mud->api;
+ worker->debug = server_mud->debug;
+ worker->next = server_mud->workers;
+
+ server_mud->workers = worker;
+
+ if (pthread_create(tid, NULL, sermud_worker_create_and_run, server_mud->workers) < 0) {
+ PRINT_ERROR("server can't create poisx thread %d! ", errno);
+ return PROGRAM_FAULT;
+ }
+
+ server_debug_print("server mud listener", "accept", accept_addr.sin_addr.s_addr, accept_addr.sin_port, server_mud->debug);
+ }
+
+ return PROGRAM_OK;
+}
+
+// the worker thread, unblock, dissymmetric server processes the events
+int32_t sermud_worker_proc_epevs(struct ServerMudWorker *worker_unit)
+{
+ int32_t epoll_nfds = epoll_wait(worker_unit->epfd, worker_unit->epevs, SERVER_EPOLL_SIZE_MAX, SERVER_EPOLL_WAIT_TIMEOUT);
+ if (epoll_nfds < 0) {
+ PRINT_ERROR("server epoll wait error %d! ", errno);
+ return PROGRAM_FAULT;
+ }
+
+ for (int32_t i = 0; i < epoll_nfds; ++i) {
+ struct epoll_event *curr_epev = worker_unit->epevs + i;
+
+ if (curr_epev->events == EPOLLERR || curr_epev->events == EPOLLHUP || curr_epev->events == EPOLLRDHUP) {
+ PRINT_ERROR("server epoll wait error %d! ", curr_epev->events);
+ return PROGRAM_FAULT;
+ }
+
+ if (curr_epev->events == EPOLLIN) {
+ struct ServerHandler *server_handler = (struct ServerHandler *)curr_epev->data.ptr;
+
+ int32_t server_ans_ret = server_ans(server_handler, worker_unit->pktlen, worker_unit->api);
+ if (server_ans_ret == PROGRAM_FAULT) {
+ struct epoll_event ep_ev;
+ if (epoll_ctl(worker_unit->epfd, EPOLL_CTL_DEL, server_handler->fd, &ep_ev) < 0) {
+ PRINT_ERROR("server can't delete socket '%d' to control epoll %d! ", server_handler->fd, errno);
+ return PROGRAM_FAULT;
+ }
+ } else if (server_ans_ret == PROGRAM_ABORT) {
+ if (close(server_handler->fd) < 0) {
+ PRINT_ERROR("server can't close the socket %d! ", errno);
+ return PROGRAM_FAULT;
+ }
+ server_debug_print("server mud worker", "close", worker_unit->ip, worker_unit->port, worker_unit->debug);
+ } else {
+ worker_unit->recv_bytes += worker_unit->pktlen;
+ server_debug_print("server mud worker", "receive", worker_unit->ip, worker_unit->port, worker_unit->debug);
+ }
+ }
+ }
+
+ return PROGRAM_OK;
+}
+
+// the listener thread, unblock, dissymmetric server processes the events
+int32_t sermud_listener_proc_epevs(struct ServerMud *server_mud)
+{
+ int32_t epoll_nfds = epoll_wait(server_mud->epfd, server_mud->epevs, SERVER_EPOLL_SIZE_MAX, SERVER_EPOLL_WAIT_TIMEOUT);
+ if (epoll_nfds < 0) {
+ PRINT_ERROR("server epoll wait error %d! ", errno);
+ return PROGRAM_FAULT;
+ }
+
+ for (int32_t i = 0; i < epoll_nfds; ++i) {
+ struct epoll_event *curr_epev = server_mud->epevs + i;
+
+ if (curr_epev->events == EPOLLERR || curr_epev->events == EPOLLHUP || curr_epev->events == EPOLLRDHUP) {
+ PRINT_ERROR("server epoll wait error %d! ", curr_epev->events);
+ return PROGRAM_FAULT;
+ }
+
+ if (curr_epev->events == EPOLLIN) {
+ int32_t sermud_listener_accept_connects_ret = sermud_listener_accept_connects(server_mud);
+ if (sermud_listener_accept_connects_ret < 0) {
+ PRINT_ERROR("server try accept error %d! ", sermud_listener_accept_connects_ret);
+ return PROGRAM_FAULT;
+ }
+ }
+ }
+
+ return PROGRAM_OK;
+}
+
+// create the worker thread, unblock, dissymmetric server and run
+void *sermud_worker_create_and_run(void *arg)
+{
+ pthread_detach(pthread_self());
+
+ struct ServerMudWorker *worker_unit = (struct ServerMudWorker *)arg;
+
+ if (sermud_worker_create_epfd_and_reg(worker_unit) < 0) {
+ exit(PROGRAM_FAULT);
+ }
+ while (true) {
+ if (sermud_worker_proc_epevs(worker_unit) < 0) {
+ exit(PROGRAM_FAULT);
+ }
+ }
+
+ close(worker_unit->worker.fd);
+ close(worker_unit->epfd);
+
+ return (void *)PROGRAM_OK;
+}
+
+// create the listener thread, unblock, dissymmetric server and run
+void *sermud_listener_create_and_run(void *arg)
+{
+ struct ServerMud *server_mud = (struct ServerMud *)arg;
+
+ if (create_socket_and_listen(&(server_mud->listener.fd), server_mud->ip, server_mud->port, server_mud->domain) < 0) {
+ exit(PROGRAM_FAULT);
+ }
+ if (sermud_listener_create_epfd_and_reg(server_mud) < 0) {
+ exit(PROGRAM_FAULT);
+ }
+ while (true) {
+ if (sermud_listener_proc_epevs(server_mud) < 0) {
+ exit(PROGRAM_FAULT);
+ }
+ }
+ if (close(server_mud->listener.fd) < 0 || close(server_mud->epfd) < 0) {
+ exit(PROGRAM_FAULT);
+ }
+
+ return (void *)PROGRAM_OK;
+}
+
+// create the multi thread, unblock, dissymmetric server and run
+int32_t sermud_create_and_run(struct ProgramParams *params)
+{
+ pthread_t *tid = (pthread_t *)malloc(sizeof(pthread_t));
+ struct ServerMud *server_mud = (struct ServerMud *)malloc(sizeof(struct ServerMud));
+
+ if (pthread_mutex_init(&server_debug_mutex, NULL) < 0) {
+ PRINT_ERROR("server can't init posix mutex %d! ", errno);
+ return PROGRAM_FAULT;
+ }
+
+ server_mud->listener.fd = -1;
+ server_mud->workers = NULL;
+ server_mud->epfd = -1;
+ server_mud->epevs = (struct epoll_event *)malloc(SERVER_EPOLL_SIZE_MAX * sizeof(struct epoll_event));
+ server_mud->curr_connect = 0;
+ server_mud->ip = inet_addr(params->ip);
+ server_mud->port = htons(params->port);
+ server_mud->pktlen = params->pktlen;
+ server_mud->domain = params->domain;
+ server_mud->api = params->api;
+ server_mud->debug = params->debug;
+
+ if (pthread_create(tid, NULL, sermud_listener_create_and_run, server_mud) < 0) {
+ PRINT_ERROR("server can't create poisx thread %d! ", errno);
+ return PROGRAM_FAULT;
+ }
+
+ if (server_mud->debug == false) {
+ printf("[program informations]: \n\n");
+ }
+ while (true) {
+ sermud_info_print(server_mud);
+ }
+
+ pthread_mutex_destroy(&server_debug_mutex);
+
+ return PROGRAM_OK;
+}
+
+// the multi thread, unblock, mutliplexing IO server prints informations
+void sermum_info_print(struct ServerMum *server_mum)
+{
+ if (server_mum->debug == false) {
+ struct timeval begin;
+ gettimeofday(&begin, NULL);
+ uint64_t begin_time = (uint64_t)begin.tv_sec * 1000 + (uint64_t)begin.tv_usec / 1000;
+
+ uint32_t curr_connect = 0;
+ double bytes_ps = 0;
+ uint64_t begin_recv_bytes = 0;
+ struct ServerMumUnit *begin_uint = server_mum->uints;
+ while (begin_uint != NULL) {
+ curr_connect += begin_uint->curr_connect;
+ begin_recv_bytes += begin_uint->recv_bytes;
+ begin_uint = begin_uint->next;
+ }
+
+ struct timeval delay;
+ delay.tv_sec = 0;
+ delay.tv_usec = TERMINAL_REFRESH_MS * 1000;
+ select(0, NULL, NULL, NULL, &delay);
+
+ uint64_t end_recv_bytes = 0;
+ struct ServerMumUnit *end_uint = server_mum->uints;
+ while (end_uint != NULL) {
+ end_recv_bytes += end_uint->recv_bytes;
+ end_uint = end_uint->next;
+ }
+
+ struct timeval end;
+ gettimeofday(&end, NULL);
+ uint64_t end_time = (uint64_t)end.tv_sec * 1000 + (uint64_t)end.tv_usec / 1000;
+
+ double bytes_sub = end_recv_bytes > begin_recv_bytes ? (double)(end_recv_bytes - begin_recv_bytes) : 0;
+ double time_sub = end_time > begin_time ? (double)(end_time - begin_time) / 1000 : 0;
+
+ bytes_ps = bytes_sub / time_sub;
+
+ if (bytes_ps < 1024) {
+ PRINT_SERVER_DATAFLOW("[connect num]: %d, [receive]: %.3f B/s", curr_connect, bytes_ps);
+ } else if (bytes_ps < (1024 * 1024)) {
+ PRINT_SERVER_DATAFLOW("[connect num]: %d, [receive]: %.3f KB/s", curr_connect, bytes_ps / 1024);
+ } else {
+ PRINT_SERVER_DATAFLOW("[connect num]: %d, [receive]: %.3f MB/s", curr_connect, bytes_ps / (1024 * 1024));
+ }
+ }
+}
+
+// the single thread, unblock, mutliplexing IO server listens and gets epoll feature descriptors
+int32_t sersum_create_epfd_and_reg(struct ServerMumUnit *server_unit)
+{
+ server_unit->epfd = epoll_create(SERVER_EPOLL_SIZE_MAX);
+ if (server_unit->epfd < 0) {
+ PRINT_ERROR("server can't create epoll %d! ", server_unit->epfd);
+ return PROGRAM_FAULT;
+ }
+
+ struct epoll_event ep_ev;
+ ep_ev.data.ptr = (void *)&(server_unit->listener);
+ ep_ev.events = EPOLLIN | EPOLLET;
+ if (epoll_ctl(server_unit->epfd, EPOLL_CTL_ADD, server_unit->listener.fd, &ep_ev) < 0) {
+ PRINT_ERROR("server can't control epoll %d! ", errno);
+ return PROGRAM_FAULT;
+ }
+
+ server_debug_print("server mum unit", "waiting", server_unit->ip, server_unit->port, server_unit->debug);
+
+ return PROGRAM_OK;
+}
+
+// the single thread, unblock, mutliplexing IO server accepts the connections
+int32_t sersum_accept_connects(struct ServerMumUnit *server_unit, struct ServerHandler *server_handler)
+{
+ while (true) {
+ struct sockaddr_in accept_addr;
+ uint32_t sockaddr_in_len = sizeof(struct sockaddr_in);
+ int32_t accept_fd = accept(server_unit->listener.fd, (struct sockaddr *)&accept_addr, &sockaddr_in_len);
+ if (accept_fd < 0) {
+ break;
+ }
+
+ if (set_socket_unblock(accept_fd) < 0) {
+ PRINT_ERROR("server can't set the connect socket to unblock! ");
+ return PROGRAM_FAULT;
+ }
+
+ struct ServerHandler *server_handler = (struct ServerHandler *)malloc(sizeof(struct ServerHandler));
+ server_handler->fd = accept_fd;
+ struct epoll_event ep_ev;
+ ep_ev.data.ptr = (void *)server_handler;
+ ep_ev.events = EPOLLIN | EPOLLET;
+ if (epoll_ctl(server_unit->epfd, EPOLL_CTL_ADD, accept_fd, &ep_ev) < 0) {
+ PRINT_ERROR("server can't add socket '%d' to control epoll %d! ", accept_fd, errno);
+ return PROGRAM_FAULT;
+ }
+
+ ++server_unit->curr_connect;
+
+ server_debug_print("server mum unit", "accept", accept_addr.sin_addr.s_addr, accept_addr.sin_port, server_unit->debug);
+ }
+
+ return PROGRAM_OK;
+}
+
+// the single thread, unblock, mutliplexing IO server processes the events
+int32_t sersum_proc_epevs(struct ServerMumUnit *server_unit)
+{
+ int32_t epoll_nfds = epoll_wait(server_unit->epfd, server_unit->epevs, SERVER_EPOLL_SIZE_MAX, SERVER_EPOLL_WAIT_TIMEOUT);
+ if (epoll_nfds < 0) {
+ PRINT_ERROR("server epoll wait error %d! ", errno);
+ return PROGRAM_FAULT;
+ }
+
+ for (int32_t i = 0; i < epoll_nfds; ++i) {
+ struct epoll_event *curr_epev = server_unit->epevs + i;
+
+ if (curr_epev->events == EPOLLERR || curr_epev->events == EPOLLHUP || curr_epev->events == EPOLLRDHUP) {
+ PRINT_ERROR("server epoll wait error %d! ", curr_epev->events);
+ return PROGRAM_FAULT;
+ }
+
+ if (curr_epev->events == EPOLLIN) {
+ if (curr_epev->data.ptr == (void *)&(server_unit->listener)) {
+ int32_t sersum_accept_connects_ret = sersum_accept_connects(server_unit, &(server_unit->listener));
+ if (sersum_accept_connects_ret < 0) {
+ PRINT_ERROR("server try accept error %d! ", sersum_accept_connects_ret);
+ return PROGRAM_FAULT;
+ }
+ continue;
+ } else {
+ struct ServerHandler *server_handler = (struct ServerHandler *)curr_epev->data.ptr;
+ struct sockaddr_in connect_addr;
+ socklen_t connect_addr_len = sizeof(connect_addr);
+ if (getpeername(server_handler->fd, (struct sockaddr *)&connect_addr, &connect_addr_len) < 0) {
+ PRINT_ERROR("server can't socket peername %d! ", errno);
+ return PROGRAM_FAULT;
+ }
+
+ int32_t server_ans_ret = server_ans(server_handler, server_unit->pktlen, server_unit->api);
+ if (server_ans_ret == PROGRAM_FAULT) {
+ --server_unit->curr_connect;
+ struct epoll_event ep_ev;
+ if (epoll_ctl(server_unit->epfd, EPOLL_CTL_DEL, server_handler->fd, &ep_ev) < 0) {
+ PRINT_ERROR("server can't delete socket '%d' to control epoll %d! ", server_handler->fd, errno);
+ return PROGRAM_FAULT;
+ }
+ } else if (server_ans_ret == PROGRAM_ABORT) {
+ --server_unit->curr_connect;
+ if (close(server_handler->fd) < 0) {
+ PRINT_ERROR("server can't close the socket %d! ", errno);
+ return PROGRAM_FAULT;
+ }
+ server_debug_print("server mum unit", "close", connect_addr.sin_addr.s_addr, connect_addr.sin_port, server_unit->debug);
+ } else {
+ server_unit->recv_bytes += server_unit->pktlen;
+ server_debug_print("server mum unit", "receive", connect_addr.sin_addr.s_addr, connect_addr.sin_port, server_unit->debug);
+ }
+ }
+ }
+ }
+
+ return PROGRAM_OK;
+}
+
+// create the single thread, unblock, mutliplexing IO server
+void *sersum_create_and_run(void *arg)
+{
+ struct ServerMumUnit *server_unit = (struct ServerMumUnit *)arg;
+
+ if (create_socket_and_listen(&(server_unit->listener.fd), server_unit->ip, server_unit->port, server_unit->domain) < 0) {
+ exit(PROGRAM_FAULT);
+ }
+ if (sersum_create_epfd_and_reg(server_unit) < 0) {
+ exit(PROGRAM_FAULT);
+ }
+ while (true) {
+ if (sersum_proc_epevs(server_unit) < 0) {
+ exit(PROGRAM_FAULT);
+ }
+ }
+
+ close(server_unit->listener.fd);
+ close(server_unit->epfd);
+
+ return (void *)PROGRAM_OK;
+}
+
+// create the multi thread, unblock, mutliplexing IO server
+int32_t sermum_create_and_run(struct ProgramParams *params)
+{
+ const uint32_t thread_num = params->thread_num;
+ pthread_t *tids = (pthread_t *)malloc(thread_num * sizeof(pthread_t));
+ struct ServerMum *server_mum = (struct ServerMum *)malloc(sizeof(struct ServerMum));
+ struct ServerMumUnit *server_unit = (struct ServerMumUnit *)malloc(sizeof(struct ServerMumUnit));
+
+ if (pthread_mutex_init(&server_debug_mutex, NULL) < 0) {
+ PRINT_ERROR("server can't init posix mutex %d! ", errno);
+ return PROGRAM_FAULT;
+ }
+
+ server_mum->uints = server_unit;
+ server_mum->debug = params->debug;
+
+ for (uint32_t i = 0; i < thread_num; ++i) {
+ server_unit->listener.fd = -1;
+ server_unit->epfd = -1;
+ server_unit->epevs = (struct epoll_event *)malloc(SERVER_EPOLL_SIZE_MAX * sizeof(struct epoll_event));
+ server_unit->curr_connect = 0;
+ server_unit->recv_bytes = 0;
+ server_unit->ip = inet_addr(params->ip);
+ server_unit->port = htons(params->port);
+ server_unit->pktlen = params->pktlen;
+ server_unit->domain = params->domain;
+ server_unit->api = params->api;
+ server_unit->debug = params->debug;
+ server_unit->next = (struct ServerMumUnit *)malloc(sizeof(struct ServerMumUnit));
+
+ if (pthread_create((tids + i), NULL, sersum_create_and_run, server_unit) < 0) {
+ PRINT_ERROR("server can't create poisx thread %d! ", errno);
+ return PROGRAM_FAULT;
+ }
+ server_unit = server_unit->next;
+ }
+
+ if (server_mum->debug == false) {
+ printf("[program informations]: \n\n");
+ }
+ while (true) {
+ sermum_info_print(server_mum);
+ }
+
+ pthread_mutex_destroy(&server_debug_mutex);
+
+ return PROGRAM_OK;
+}
+
+// create server and run
+int32_t server_create_and_run(struct ProgramParams *params)
+{
+ int32_t ret = PROGRAM_OK;
+
+ if (strcmp(params->model, "mum") == 0) {
+ ret = sermum_create_and_run(params);
+ } else {
+ ret = sermud_create_and_run(params);
+ }
+
+ return ret;
+}
diff --git a/examples/src/utilities.c b/examples/src/utilities.c
new file mode 100644
index 0000000..b6ed269
--- /dev/null
+++ b/examples/src/utilities.c
@@ -0,0 +1,128 @@
+/*
+* Copyright (c) 2022-2023. yyangoO.
+* gazelle is licensed under the Mulan PSL v2.
+* You can use this software according to the terms and conditions of the Mulan PSL v2.
+* You may obtain a copy of Mulan PSL v2 at:
+* http://license.coscl.org.cn/MulanPSL2
+* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
+* IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
+* PURPOSE.
+* See the Mulan PSL v2 for more details.
+*/
+
+
+#include "utilities.h"
+
+
+// create the socket and listen
+int32_t create_socket_and_listen(int32_t *socket_fd, in_addr_t ip, uint16_t port, const char *domain)
+{
+ if (strcmp(domain, "posix") == 0) {
+ *socket_fd = socket(AF_INET, SOCK_STREAM, 0);
+ if (*socket_fd < 0) {
+ PRINT_ERROR("can't create socket %d! ", errno);
+ return PROGRAM_FAULT;
+ }
+ } else {
+ *socket_fd = socket(AF_UNIX, SOCK_STREAM, 0);
+ if (*socket_fd < 0) {
+ PRINT_ERROR("can't create socket %d! ", errno);
+ return PROGRAM_FAULT;
+ }
+ }
+
+ int32_t port_multi = 1;
+ if (setsockopt(*socket_fd, SOL_SOCKET, SO_REUSEPORT, (void *)&port_multi, sizeof(int32_t)) < 0) {
+ PRINT_ERROR("can't set the option of socket %d! ", errno);
+ return PROGRAM_FAULT;
+ }
+
+ if (set_socket_unblock(*socket_fd) < 0) {
+ PRINT_ERROR("can't set the socket to unblock! ");
+ return PROGRAM_FAULT;
+ }
+
+ if (strcmp(domain, "posix") == 0) {
+ struct sockaddr_in socket_addr;
+ memset_s(&socket_addr, sizeof(socket_addr), 0, sizeof(socket_addr));
+ socket_addr.sin_family = AF_INET;
+ socket_addr.sin_addr.s_addr = ip;
+ socket_addr.sin_port = port;
+ if (bind(*socket_fd, (struct sockaddr *)&socket_addr, sizeof(struct sockaddr_in)) < 0) {
+ PRINT_ERROR("can't bind the address to socket %d! ", errno);
+ return PROGRAM_FAULT;
+ }
+
+ if (listen(*socket_fd, SERVER_SOCKET_LISTEN_BACKLOG) < 0) {
+ PRINT_ERROR("server socket can't lisiten %d! ", errno);
+ return PROGRAM_FAULT;
+ }
+ } else {
+ struct sockaddr_un socket_addr;
+ unlink(SOCKET_UNIX_DOMAIN_FILE);
+ socket_addr.sun_family = AF_UNIX;
+ strcpy_s(socket_addr.sun_path, sizeof(socket_addr.sun_path), SOCKET_UNIX_DOMAIN_FILE);
+ if (bind(*socket_fd, (struct sockaddr *)&socket_addr, sizeof(struct sockaddr_un)) < 0) {
+ PRINT_ERROR("can't bind the address to socket %d! ", errno);
+ return PROGRAM_FAULT;
+ }
+
+ if (listen(*socket_fd, SERVER_SOCKET_LISTEN_BACKLOG) < 0) {
+ PRINT_ERROR("server socket can't lisiten %d! ", errno);
+ return PROGRAM_FAULT;
+ }
+ }
+
+ return PROGRAM_OK;
+}
+
+// create the socket and connect
+int32_t create_socket_and_connect(int32_t *socket_fd, in_addr_t ip, uint16_t port, const char *domain)
+{
+ if (strcmp(domain, "posix") == 0) {
+ *socket_fd = socket(AF_INET, SOCK_STREAM, 0);
+ if (*socket_fd < 0) {
+ PRINT_ERROR("client can't create socket %d! ", errno);
+ return PROGRAM_FAULT;
+ }
+
+ struct sockaddr_in server_addr;
+ memset_s(&server_addr, sizeof(server_addr), 0, sizeof(server_addr));
+ server_addr.sin_family = AF_INET;
+ server_addr.sin_addr.s_addr = ip;
+ server_addr.sin_port = port;
+ if (connect(*socket_fd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr_in)) < 0) {
+ if (errno == EINPROGRESS) {
+ return PROGRAM_INPROGRESS;
+ } else {
+ PRINT_ERROR("client can't connect to the server %d! ", errno);
+ return PROGRAM_FAULT;
+ }
+ }
+ } else {
+ *socket_fd = socket(AF_UNIX, SOCK_STREAM, 0);
+ if (*socket_fd < 0) {
+ PRINT_ERROR("client can't create socket %d! ", errno);
+ return PROGRAM_FAULT;
+ }
+
+ struct sockaddr_un server_addr;
+ server_addr.sun_family = AF_UNIX;
+ strcpy_s(server_addr.sun_path, sizeof(server_addr.sun_path), SOCKET_UNIX_DOMAIN_FILE);
+ if (connect(*socket_fd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr_un)) < 0) {
+ if (errno == EINPROGRESS) {
+ return PROGRAM_INPROGRESS;
+ } else {
+ PRINT_ERROR("client can't connect to the server %d! ", errno);
+ return PROGRAM_FAULT;
+ }
+ }
+ }
+ return PROGRAM_OK;
+}
+
+// set the socket to unblock
+int32_t set_socket_unblock(int32_t socket_fd)
+{
+ return fcntl(socket_fd, F_SETFL, fcntl(socket_fd, F_GETFD, 0) | O_NONBLOCK);
+}
--
2.23.0
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。