代码拉取完成,页面将自动刷新
同步操作将从 src-openEuler/gazelle 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
From aa422a75961523de411ec849fd1f4e45da4477ac Mon Sep 17 00:00:00 2001
From: wuchangsheng <[email protected]>
Date: Mon, 14 Mar 2022 20:32:39 +0800
Subject: [PATCH 25/34] fix event miss
---
src/common/gazelle_dfx_msg.h | 1 +
src/lstack/api/lstack_epoll.c | 6 ++--
src/lstack/core/lstack_lwip.c | 54 ++++++++++++++++++++++++++----
src/lstack/core/lstack_protocol_stack.c | 31 +++++++++++++++++
src/lstack/core/lstack_stack_stat.c | 27 +++++++++++----
src/lstack/core/lstack_thread_rpc.c | 38 ++++++---------------
src/lstack/include/lstack_lwip.h | 2 ++
src/lstack/include/lstack_protocol_stack.h | 1 +
src/lstack/include/lstack_thread_rpc.h | 1 +
src/lstack/include/lstack_weakup.h | 2 +-
src/ltran/ltran_dfx.c | 3 +-
11 files changed, 120 insertions(+), 46 deletions(-)
diff --git a/src/common/gazelle_dfx_msg.h b/src/common/gazelle_dfx_msg.h
index ae20436..de669f5 100644
--- a/src/common/gazelle_dfx_msg.h
+++ b/src/common/gazelle_dfx_msg.h
@@ -95,6 +95,7 @@ struct gazelle_stat_pkts {
uint64_t epoll_pending_call;
uint64_t epoll_self_call;
uint64_t epoll_self_event;
+ uint64_t send_list;
};
/* same as define in lwip/stats.h - struct stats_mib2 */
diff --git a/src/lstack/api/lstack_epoll.c b/src/lstack/api/lstack_epoll.c
index a686ddb..cf072b0 100644
--- a/src/lstack/api/lstack_epoll.c
+++ b/src/lstack/api/lstack_epoll.c
@@ -61,7 +61,7 @@ static inline bool report_events(struct lwip_sock *sock, uint32_t event)
return true;
}
- if (sock->have_event) {
+ if (__atomic_load_n(&sock->have_event, __ATOMIC_ACQUIRE)) {
return false;
}
@@ -92,7 +92,7 @@ void add_epoll_event(struct netconn *conn, uint32_t event)
list_add_node(&sock->stack->event_list, &sock->event_list);
}
} else {
- sock->have_event = true;
+ __atomic_store_n(&sock->have_event, true, __ATOMIC_RELEASE);
sock->stack->stats.weakup_events++;
}
}
@@ -302,7 +302,7 @@ static int32_t get_lwip_events(struct weakup_poll *weakup, void *out, uint32_t m
if (sock->stack == NULL) {
return true;
}
- sock->have_event = false;
+ __atomic_store_n(&sock->have_event, false, __ATOMIC_RELEASE);
if (remove_event(etype, weakup->sock_list, event_num, sock)) {
sock->stack->stats.remove_event++;
diff --git a/src/lstack/core/lstack_lwip.c b/src/lstack/core/lstack_lwip.c
index d55f1e6..d35a217 100644
--- a/src/lstack/core/lstack_lwip.c
+++ b/src/lstack/core/lstack_lwip.c
@@ -112,6 +112,7 @@ void gazelle_init_sock(int32_t fd)
init_list_node(&sock->listen_list);
init_list_node(&sock->event_list);
init_list_node(&sock->wakeup_list);
+ init_list_node(&sock->send_list);
}
void gazelle_clean_sock(int32_t fd)
@@ -130,6 +131,7 @@ void gazelle_clean_sock(int32_t fd)
list_del_node_init(&sock->listen_list);
list_del_node_init(&sock->event_list);
list_del_node_init(&sock->wakeup_list);
+ list_del_node_init(&sock->send_list);
}
void gazelle_free_pbuf(struct pbuf *pbuf)
@@ -280,12 +282,12 @@ void add_self_event(struct lwip_sock *sock, uint32_t events)
sock->events |= events;
- if (sock->have_event) {
+ if (__atomic_load_n(&sock->have_event, __ATOMIC_ACQUIRE)) {
return;
}
if (rte_ring_mp_enqueue(wakeup->self_ring, (void *)sock) == 0) {
- sock->have_event = true;
+ __atomic_store_n(&sock->have_event, true, __ATOMIC_RELEASE);
sem_post(&sock->weakup->event_sem);
stack->stats.epoll_self_event++;
} else {
@@ -346,6 +348,34 @@ ssize_t write_stack_data(struct lwip_sock *sock, const void *buf, size_t len)
return send_len;
}
+void stack_send(struct rpc_msg *msg)
+{
+ int32_t fd = msg->args[MSG_ARG_0].i;
+ int32_t flags = msg->args[MSG_ARG_2].i;
+
+ struct protocol_stack *stack = get_protocol_stack();
+ struct lwip_sock *sock = get_socket(fd);
+ if (sock == NULL) {
+ msg->result = -1;
+ return;
+ }
+
+ msg->result = write_lwip_data(sock, fd, flags);
+ __atomic_store_n(&sock->have_rpc_send, false, __ATOMIC_RELEASE);
+
+ if (msg->result >= 0 && rte_ring_count(sock->send_ring)) {
+ if (list_is_empty(&sock->send_list)) {
+ __atomic_store_n(&sock->have_rpc_send, true, __ATOMIC_RELEASE);
+ list_add_node(&stack->send_list, &sock->send_list);
+ sock->stack->stats.send_self_rpc++;
+ }
+ }
+
+ if (rte_ring_free_count(sock->send_ring)) {
+ add_epoll_event(sock->conn, EPOLLOUT);
+ }
+}
+
ssize_t read_lwip_data(struct lwip_sock *sock, int32_t flags, u8_t apiflags)
{
if (sock->conn->recvmbox == NULL) {
@@ -448,14 +478,19 @@ ssize_t gazelle_send(int32_t fd, const void *buf, size_t len, int32_t flags)
GAZELLE_RETURN(EINVAL);
}
+ sock->send_flags = flags;
ssize_t send = write_stack_data(sock, buf, len);
- if (send < 0 || sock->have_rpc_send) {
- return send;
+
+ ssize_t ret = 0;
+ if (!__atomic_load_n(&sock->have_rpc_send, __ATOMIC_ACQUIRE)) {
+ __atomic_store_n(&sock->have_rpc_send, true, __ATOMIC_RELEASE);
+ ret = rpc_call_send(fd, buf, len, flags);
}
- sock->have_rpc_send = true;
- ssize_t ret = rpc_call_send(fd, buf, len, flags);
- return (ret < 0) ? ret : send;
+ if (send <= 0 || ret < 0) {
+ GAZELLE_RETURN(EAGAIN);
+ }
+ return send;
}
ssize_t sendmsg_to_stack(int32_t s, const struct msghdr *message, int32_t flags)
@@ -743,6 +778,11 @@ void stack_eventlist_count(struct rpc_msg *msg)
msg->result = get_list_count(&get_protocol_stack()->event_list);
}
+void stack_sendlist_count(struct rpc_msg *msg)
+{
+ msg->result = get_list_count(&get_protocol_stack()->send_list);
+}
+
void stack_recvlist_count(struct rpc_msg *msg)
{
msg->result = get_list_count(&get_protocol_stack()->recv_list);
diff --git a/src/lstack/core/lstack_protocol_stack.c b/src/lstack/core/lstack_protocol_stack.c
index 45649fe..4ba851a 100644
--- a/src/lstack/core/lstack_protocol_stack.c
+++ b/src/lstack/core/lstack_protocol_stack.c
@@ -194,6 +194,7 @@ int32_t init_protocol_stack(void)
init_list_node(&stack->recv_list);
init_list_node(&stack->listen_list);
init_list_node(&stack->event_list);
+ init_list_node(&stack->send_list);
stack_group->stacks[i] = stack;
}
@@ -324,6 +325,7 @@ static void report_stack_event(struct protocol_stack *stack)
sock = container_of(node, struct lwip_sock, event_list);
if (weakup_enqueue(stack->weakup_ring, sock) == 0) {
+ __atomic_store_n(&sock->have_event, true, __ATOMIC_RELEASE);
list_del_node_init(&sock->event_list);
stack->stats.weakup_events++;
} else {
@@ -332,6 +334,33 @@ static void report_stack_event(struct protocol_stack *stack)
}
}
+static void send_stack_list(struct protocol_stack *stack)
+{
+ struct list_node *list = &(stack->send_list);
+ struct list_node *node, *temp;
+ struct lwip_sock *sock;
+
+ list_for_each_safe(node, temp, list) {
+ sock = container_of(node, struct lwip_sock, send_list);
+
+ if (sock->conn == NULL) {
+ continue;
+ }
+
+ ssize_t ret = write_lwip_data(sock, sock->conn->socket, sock->send_flags);
+ __atomic_store_n(&sock->have_rpc_send, false, __ATOMIC_RELEASE);
+ if (ret >= 0 && rte_ring_count(sock->send_ring)) {
+ __atomic_store_n(&sock->have_rpc_send, true, __ATOMIC_RELEASE);
+ } else {
+ list_del_node_init(&sock->send_list);
+ }
+
+ if (rte_ring_free_count(sock->send_ring)) {
+ add_epoll_event(sock->conn, EPOLLOUT);
+ }
+ }
+}
+
static void* gazelle_stack_thread(void *arg)
{
struct protocol_stack *stack = (struct protocol_stack *)arg;
@@ -348,6 +377,8 @@ static void* gazelle_stack_thread(void *arg)
sys_timer_run();
report_stack_event(stack);
+
+ send_stack_list(stack);
}
return NULL;
diff --git a/src/lstack/core/lstack_stack_stat.c b/src/lstack/core/lstack_stack_stat.c
index b7b94e2..9a8fd08 100644
--- a/src/lstack/core/lstack_stack_stat.c
+++ b/src/lstack/core/lstack_stack_stat.c
@@ -107,11 +107,22 @@ static void get_stack_stats(struct gazelle_stack_dfx_data *dfx, struct protocol_
dfx->data.pkts.call_alloc_fail = stack_group->call_alloc_fail;
dfx->data.pkts.weakup_ring_cnt = rte_ring_count(stack->weakup_ring);
dfx->data.pkts.send_idle_ring_cnt = rte_ring_count(stack->send_idle_ring);
- dfx->data.pkts.call_msg_cnt = rpc_call_msgcnt(stack);
- dfx->data.pkts.recv_list = rpc_call_recvlistcnt(stack);
- dfx->data.pkts.event_list = rpc_call_eventlistcnt(stack);
+
+ int32_t rpc_call_result = rpc_call_msgcnt(stack);
+ dfx->data.pkts.call_msg_cnt = (rpc_call_result < 0) ? 0 : rpc_call_result;
+
+ rpc_call_result = rpc_call_recvlistcnt(stack);
+ dfx->data.pkts.recv_list = (rpc_call_result < 0) ? 0 : rpc_call_result;
+
+ rpc_call_result = rpc_call_eventlistcnt(stack);
+ dfx->data.pkts.event_list = (rpc_call_result < 0) ? 0 : rpc_call_result;
+
+ rpc_call_result = rpc_call_sendlistcnt(stack);
+ dfx->data.pkts.send_list = (rpc_call_result < 0) ? 0 : rpc_call_result;
+
if (stack->wakeup_list) {
- dfx->data.pkts.wakeup_list = rpc_call_eventlistcnt(stack);
+ rpc_call_result = rpc_call_eventlistcnt(stack);
+ dfx->data.pkts.wakeup_list = (rpc_call_result < 0) ? 0 : rpc_call_result;
}
dfx->data.pkts.conn_num = stack->conn_num;
}
@@ -119,6 +130,8 @@ static void get_stack_stats(struct gazelle_stack_dfx_data *dfx, struct protocol_
static void get_stack_dfx_data(struct gazelle_stack_dfx_data *dfx, struct protocol_stack *stack,
enum GAZELLE_STAT_MODE stat_mode)
{
+ int32_t rpc_call_result;
+
switch (stat_mode) {
case GAZELLE_STAT_LSTACK_SHOW:
case GAZELLE_STAT_LSTACK_SHOW_RATE:
@@ -129,8 +142,10 @@ static void get_stack_dfx_data(struct gazelle_stack_dfx_data *dfx, struct protoc
sizeof(stack->lwip_stats->mib2));
break;
case GAZELLE_STAT_LSTACK_SHOW_CONN:
- dfx->data.conn.conn_num = rpc_call_conntable(stack, dfx->data.conn.conn_list, GAZELLE_LSTACK_MAX_CONN);
- dfx->data.conn.total_conn_num = rpc_call_connnum(stack);
+ rpc_call_result = rpc_call_conntable(stack, dfx->data.conn.conn_list, GAZELLE_LSTACK_MAX_CONN);
+ dfx->data.conn.conn_num = (rpc_call_result < 0) ? 0 : rpc_call_result;
+ rpc_call_result = rpc_call_connnum(stack);
+ dfx->data.conn.total_conn_num = (rpc_call_result < 0) ? 0 : rpc_call_result;
break;
case GAZELLE_STAT_LSTACK_SHOW_LATENCY:
memcpy_s(&dfx->data.latency, sizeof(dfx->data.latency), &stack->latency, sizeof(stack->latency));
diff --git a/src/lstack/core/lstack_thread_rpc.c b/src/lstack/core/lstack_thread_rpc.c
index 2fb24b4..c95f2c0 100644
--- a/src/lstack/core/lstack_thread_rpc.c
+++ b/src/lstack/core/lstack_thread_rpc.c
@@ -197,6 +197,16 @@ int32_t rpc_call_eventlistcnt(struct protocol_stack *stack)
return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg);
}
+int32_t rpc_call_sendlistcnt(struct protocol_stack *stack)
+{
+ struct rpc_msg *msg = rpc_msg_alloc(stack, stack_sendlist_count);
+ if (msg == NULL) {
+ return -1;
+ }
+
+ return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg);
+}
+
int32_t rpc_call_recvlistcnt(struct protocol_stack *stack)
{
struct rpc_msg *msg = rpc_msg_alloc(stack, stack_recvlist_count);
@@ -442,34 +452,6 @@ int32_t rpc_call_ioctl(int fd, long cmd, void *argp)
return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg);
}
-static void stack_send(struct rpc_msg *msg)
-{
- int32_t fd = msg->args[MSG_ARG_0].i;
- int32_t flags = msg->args[MSG_ARG_2].i;
-
- struct protocol_stack *stack = get_protocol_stack();
- struct lwip_sock *sock = get_socket(fd);
- if (sock == NULL) {
- msg->result = -1;
- msg->self_release = 0;
- return;
- }
-
- msg->result = write_lwip_data(sock, fd, flags);
- sock->have_rpc_send = false;
-
- if (msg->result >= 0 && rte_ring_count(sock->send_ring)) {
- sock->have_rpc_send = true;
- sock->stack->stats.send_self_rpc++;
- msg->self_release = 1;
- rpc_call(&stack->rpc_queue, msg);
- }
-
- if (rte_ring_free_count(sock->send_ring)) {
- add_epoll_event(sock->conn, EPOLLOUT);
- }
-}
-
ssize_t rpc_call_send(int fd, const void *buf, size_t len, int flags)
{
struct protocol_stack *stack = get_protocol_stack_by_fd(fd);
diff --git a/src/lstack/include/lstack_lwip.h b/src/lstack/include/lstack_lwip.h
index 87442cd..cfd454d 100644
--- a/src/lstack/include/lstack_lwip.h
+++ b/src/lstack/include/lstack_lwip.h
@@ -33,11 +33,13 @@ ssize_t read_stack_data(int32_t fd, void *buf, size_t len, int32_t flags);
ssize_t read_lwip_data(struct lwip_sock *sock, int32_t flags, u8_t apiflags);
void read_recv_list(void);
void add_recv_list(int32_t fd);
+void stack_sendlist_count(struct rpc_msg *msg);
void stack_eventlist_count(struct rpc_msg *msg);
void stack_wakeuplist_count(struct rpc_msg *msg);
void get_lwip_conntable(struct rpc_msg *msg);
void get_lwip_connnum(struct rpc_msg *msg);
void stack_recvlist_count(struct rpc_msg *msg);
+void stack_send(struct rpc_msg *msg);
void stack_replenish_send_idlembuf(struct protocol_stack *stack);
int32_t gazelle_alloc_pktmbuf(struct rte_mempool *pool, struct rte_mbuf **mbufs, uint32_t num);
void gazelle_free_pbuf(struct pbuf *pbuf);
diff --git a/src/lstack/include/lstack_protocol_stack.h b/src/lstack/include/lstack_protocol_stack.h
index dd7633b..5b95dc9 100644
--- a/src/lstack/include/lstack_protocol_stack.h
+++ b/src/lstack/include/lstack_protocol_stack.h
@@ -52,6 +52,7 @@ struct protocol_stack {
struct list_node recv_list;
struct list_node listen_list;
struct list_node event_list;
+ struct list_node send_list;
struct list_node *wakeup_list;
struct gazelle_stat_pkts stats;
diff --git a/src/lstack/include/lstack_thread_rpc.h b/src/lstack/include/lstack_thread_rpc.h
index cffb273..76ba36a 100644
--- a/src/lstack/include/lstack_thread_rpc.h
+++ b/src/lstack/include/lstack_thread_rpc.h
@@ -55,6 +55,7 @@ int32_t rpc_call_msgcnt(struct protocol_stack *stack);
int32_t rpc_call_shadow_fd(struct protocol_stack *stack, int32_t fd, const struct sockaddr *addr, socklen_t addrlen);
int32_t rpc_call_recvlistcnt(struct protocol_stack *stack);
int32_t rpc_call_eventlistcnt(struct protocol_stack *stack);
+int32_t rpc_call_sendlistcnt(struct protocol_stack *stack);
int32_t rpc_call_wakeuplistcnt(struct protocol_stack *stack);
int32_t rpc_call_thread_regphase1(struct protocol_stack *stack, void *conn);
int32_t rpc_call_thread_regphase2(struct protocol_stack *stack, void *conn);
diff --git a/src/lstack/include/lstack_weakup.h b/src/lstack/include/lstack_weakup.h
index 8f7fca2..4f6321e 100644
--- a/src/lstack/include/lstack_weakup.h
+++ b/src/lstack/include/lstack_weakup.h
@@ -16,7 +16,7 @@
#include <rte_ring.h>
#include "lstack_dpdk.h"
-#define EPOLL_MAX_EVENTS 256
+#define EPOLL_MAX_EVENTS 512
struct weakup_poll {
sem_t event_sem;
diff --git a/src/ltran/ltran_dfx.c b/src/ltran/ltran_dfx.c
index 66d6053..a575c33 100644
--- a/src/ltran/ltran_dfx.c
+++ b/src/ltran/ltran_dfx.c
@@ -582,6 +582,7 @@ static void show_lstack_stats(struct gazelle_stack_dfx_data *lstack_stat)
printf("call_msg: %-19"PRIu64" ", lstack_stat->data.pkts.call_msg_cnt);
printf("call_alloc_fail: %-12"PRIu64" ", lstack_stat->data.pkts.call_alloc_fail);
printf("call_null: %-18"PRIu64" \n", lstack_stat->data.pkts.call_null);
+ printf("send_list: %-18"PRIu64" \n", lstack_stat->data.pkts.send_list);
}
static void gazelle_print_lstack_stat_detail(struct gazelle_stack_dfx_data *lstack_stat,
@@ -884,7 +885,7 @@ static void gazelle_print_lstack_stat_conn(void *buf, const struct gazelle_stat_
do {
printf("\n------ stack tid: %6u ------\n", stat->tid);
printf("No. Proto recv_cnt recv_ring in_send send_ring event self_event Local Address"
- " Foreign Address State\n");
+ " Foreign Address State\n");
uint32_t unread_pkts = 0;
uint32_t unsend_pkts = 0;
for (i = 0; i < conn->conn_num && i < GAZELLE_LSTACK_MAX_CONN; i++) {
--
1.8.3.1
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。