代码拉取完成,页面将自动刷新
同步操作将从 src-openEuler/gazelle 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
From b87de3f10de1839e8dccc64ba01b3d4b7bd114c3 Mon Sep 17 00:00:00 2001
From: wu-changsheng <[email protected]>
Date: Sat, 8 Oct 2022 19:50:42 +0800
Subject: [PATCH 18/21] merge sendmsg write
---
src/lstack/core/lstack_lwip.c | 92 ++++++++++++++++------
src/lstack/core/lstack_protocol_stack.c | 16 ----
src/lstack/core/lstack_thread_rpc.c | 32 +-------
src/lstack/include/lstack_protocol_stack.h | 2 -
src/lstack/include/lstack_thread_rpc.h | 2 -
5 files changed, 68 insertions(+), 76 deletions(-)
diff --git a/src/lstack/core/lstack_lwip.c b/src/lstack/core/lstack_lwip.c
index bb5a7e5..f4128d7 100644
--- a/src/lstack/core/lstack_lwip.c
+++ b/src/lstack/core/lstack_lwip.c
@@ -259,6 +259,22 @@ static inline void del_data_out_event(struct lwip_sock *sock)
pthread_spin_unlock(&sock->wakeup->event_list_lock);
}
+void write_stack_over(struct lwip_sock *sock)
+{
+ if (sock->send_lastdata) {
+ sock->send_lastdata->tot_len = sock->send_lastdata->len = sock->send_datalen;
+ sock->send_lastdata = NULL;
+ }
+
+ gazelle_ring_read_over(sock->send_ring);
+
+ if (sock->wakeup) {
+ if (sock->wakeup->type == WAKEUP_EPOLL && (sock->events & EPOLLOUT)) {
+ del_data_out_event(sock);
+ }
+ }
+}
+
ssize_t write_stack_data(struct lwip_sock *sock, const void *buf, size_t len)
{
if (sock->errevent > 0) {
@@ -272,31 +288,37 @@ ssize_t write_stack_data(struct lwip_sock *sock, const void *buf, size_t len)
struct pbuf *pbuf = NULL;
ssize_t send_len = 0;
- size_t copy_len;
uint32_t send_pkt = 0;
while (send_len < len && send_pkt < free_count) {
- if (gazelle_ring_read(sock->send_ring, (void **)&pbuf, 1) != 1) {
- if (sock->wakeup) {
- sock->wakeup->stat.app_write_idlefail++;
+ if (sock->send_lastdata) {
+ pbuf = sock->send_lastdata;
+ } else {
+ if (gazelle_ring_read(sock->send_ring, (void **)&pbuf, 1) != 1) {
+ if (sock->wakeup) {
+ sock->wakeup->stat.app_write_idlefail++;
+ }
+ break;
}
- break;
+ sock->send_lastdata = pbuf;
+ sock->send_datalen = 0;
}
- copy_len = (len - send_len > pbuf->len) ? pbuf->len : (len - send_len);
- pbuf_take(pbuf, (char *)buf + send_len, copy_len);
- pbuf->tot_len = pbuf->len = copy_len;
+ uint16_t remian_len = pbuf->len - sock->send_datalen;
+ uint16_t copy_len = (len - send_len > remian_len) ? remian_len : (len - send_len);
+ pbuf_take_at(pbuf, (char *)buf + send_len, copy_len, sock->send_datalen);
+ sock->send_datalen += copy_len;
+ if (sock->send_datalen >= pbuf->len) {
+ sock->send_lastdata = NULL;
+ pbuf->tot_len = pbuf->len = sock->send_datalen;
+ send_pkt++;
+ }
send_len += copy_len;
- send_pkt++;
}
- gazelle_ring_read_over(sock->send_ring);
if (sock->wakeup) {
sock->wakeup->stat.app_write_cnt += send_pkt;
- if (sock->wakeup->type == WAKEUP_EPOLL && (sock->events & EPOLLOUT)) {
- del_data_out_event(sock);
- }
}
return send_len;
@@ -500,6 +522,16 @@ ssize_t recvmsg_from_stack(int32_t s, struct msghdr *message, int32_t flags)
return buflen;
}
+static inline void notice_stack_send(struct lwip_sock *sock, int32_t fd, int32_t len, int32_t flags)
+{
+ if (__atomic_load_n(&sock->in_send, __ATOMIC_ACQUIRE) == 0) {
+ __atomic_store_n(&sock->in_send, 1, __ATOMIC_RELEASE);
+ if (rpc_call_send(fd, NULL, len, flags) != 0) {
+ __atomic_store_n(&sock->in_send, 0, __ATOMIC_RELEASE);
+ }
+ }
+}
+
ssize_t gazelle_send(int32_t fd, const void *buf, size_t len, int32_t flags)
{
if (buf == NULL) {
@@ -516,18 +548,12 @@ ssize_t gazelle_send(int32_t fd, const void *buf, size_t len, int32_t flags)
}
ssize_t send = write_stack_data(sock, buf, len);
- if (send < 0) {
- GAZELLE_RETURN(EAGAIN);
- } else if (send == 0) {
- return 0;
+ if (send <= 0) {
+ return send;
}
+ write_stack_over(sock);
- if (__atomic_load_n(&sock->in_send, __ATOMIC_ACQUIRE) == 0) {
- __atomic_store_n(&sock->in_send, 1, __ATOMIC_RELEASE);
- if (rpc_call_send(fd, NULL, send, flags) != 0) {
- __atomic_store_n(&sock->in_send, 0, __ATOMIC_RELEASE);
- }
- }
+ notice_stack_send(sock, fd, send, flags);
return send;
}
@@ -537,23 +563,37 @@ ssize_t sendmsg_to_stack(int32_t s, const struct msghdr *message, int32_t flags)
int32_t i;
ssize_t buflen = 0;
+ struct lwip_sock *sock = get_socket(s);
+ if (sock == NULL) {
+ GAZELLE_RETURN(EINVAL);
+ }
+
if (check_msg_vaild(message)) {
GAZELLE_RETURN(EINVAL);
}
for (i = 0; i < message->msg_iovlen; i++) {
- ret = gazelle_send(s, message->msg_iov[i].iov_base, message->msg_iov[i].iov_len, flags);
+ if (message->msg_iov[i].iov_len == 0) {
+ continue;
+ }
+
+ ret = write_stack_data(sock, message->msg_iov[i].iov_base, message->msg_iov[i].iov_len);
if (ret < 0) {
- return buflen == 0 ? ret : buflen;
+ buflen = (buflen == 0) ? ret : buflen;
+ break;
}
buflen += ret;
if (ret < message->msg_iov[i].iov_len) {
- return buflen;
+ break;
}
}
+ if (buflen > 0) {
+ write_stack_over(sock);
+ notice_stack_send(sock, s, buflen, flags);
+ }
return buflen;
}
diff --git a/src/lstack/core/lstack_protocol_stack.c b/src/lstack/core/lstack_protocol_stack.c
index 6119975..fbeca62 100644
--- a/src/lstack/core/lstack_protocol_stack.c
+++ b/src/lstack/core/lstack_protocol_stack.c
@@ -608,22 +608,6 @@ void stack_recv(struct rpc_msg *msg)
msg->args[MSG_ARG_3].i);
}
-void stack_sendmsg(struct rpc_msg *msg)
-{
- msg->result = lwip_sendmsg(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].cp, msg->args[MSG_ARG_2].i);
- if (msg->result != 0) {
- LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d fail %ld\n", get_stack_tid(), msg->args[MSG_ARG_0].i, msg->result);
- }
-}
-
-void stack_recvmsg(struct rpc_msg *msg)
-{
- msg->result = lwip_recvmsg(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].p, msg->args[MSG_ARG_2].i);
- if (msg->result != 0) {
- LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d fail %ld\n", get_stack_tid(), msg->args[MSG_ARG_0].i, msg->result);
- }
-}
-
/* any protocol stack thread receives arp packet and sync it to other threads so that it can have the arp table */
void stack_broadcast_arp(struct rte_mbuf *mbuf, struct protocol_stack *cur_stack)
{
diff --git a/src/lstack/core/lstack_thread_rpc.c b/src/lstack/core/lstack_thread_rpc.c
index d1f7580..ad967e9 100644
--- a/src/lstack/core/lstack_thread_rpc.c
+++ b/src/lstack/core/lstack_thread_rpc.c
@@ -236,7 +236,8 @@ int32_t rpc_call_arp(struct protocol_stack *stack, struct rte_mbuf *mbuf)
msg->self_release = 0;
msg->args[MSG_ARG_0].p = mbuf;
- lockless_queue_mpsc_push(&stack->rpc_queue, &msg->queue_node);
+
+ rpc_call(&stack->rpc_queue, msg);
return 0;
}
@@ -446,32 +447,3 @@ int32_t rpc_call_send(int fd, const void *buf, size_t len, int flags)
return 0;
}
-int32_t rpc_call_sendmsg(int fd, const struct msghdr *msghdr, int flags)
-{
- struct protocol_stack *stack = get_protocol_stack_by_fd(fd);
- struct rpc_msg *msg = rpc_msg_alloc(stack, stack_sendmsg);
- if (msg == NULL) {
- return -1;
- }
-
- msg->args[MSG_ARG_0].i = fd;
- msg->args[MSG_ARG_1].cp = msghdr;
- msg->args[MSG_ARG_2].i = flags;
-
- return rpc_sync_call(&stack->rpc_queue, msg);
-}
-
-int32_t rpc_call_recvmsg(int fd, struct msghdr *msghdr, int flags)
-{
- struct protocol_stack *stack = get_protocol_stack_by_fd(fd);
- struct rpc_msg *msg = rpc_msg_alloc(stack, stack_recvmsg);
- if (msg == NULL) {
- return -1;
- }
-
- msg->args[MSG_ARG_0].i = fd;
- msg->args[MSG_ARG_1].p = msghdr;
- msg->args[MSG_ARG_2].i = flags;
-
- return rpc_sync_call(&stack->rpc_queue, msg);
-}
diff --git a/src/lstack/include/lstack_protocol_stack.h b/src/lstack/include/lstack_protocol_stack.h
index a791357..f58ae56 100644
--- a/src/lstack/include/lstack_protocol_stack.h
+++ b/src/lstack/include/lstack_protocol_stack.h
@@ -124,8 +124,6 @@ void stack_listen(struct rpc_msg *msg);
void stack_accept(struct rpc_msg *msg);
void stack_connect(struct rpc_msg *msg);
void stack_recv(struct rpc_msg *msg);
-void stack_sendmsg(struct rpc_msg *msg);
-void stack_recvmsg(struct rpc_msg *msg);
void stack_getpeername(struct rpc_msg *msg);
void stack_getsockname(struct rpc_msg *msg);
void stack_getsockopt(struct rpc_msg *msg);
diff --git a/src/lstack/include/lstack_thread_rpc.h b/src/lstack/include/lstack_thread_rpc.h
index e1223de..175c8c9 100644
--- a/src/lstack/include/lstack_thread_rpc.h
+++ b/src/lstack/include/lstack_thread_rpc.h
@@ -67,8 +67,6 @@ int32_t rpc_call_listen(int s, int backlog);
int32_t rpc_call_accept(int fd, struct sockaddr *addr, socklen_t *addrlen);
int32_t rpc_call_connect(int fd, const struct sockaddr *addr, socklen_t addrlen);
int32_t rpc_call_send(int fd, const void *buf, size_t len, int flags);
-int32_t rpc_call_sendmsg(int fd, const struct msghdr *msg, int flags);
-int32_t rpc_call_recvmsg(int fd, struct msghdr *msg, int flags);
int32_t rpc_call_getpeername(int fd, struct sockaddr *addr, socklen_t *addrlen);
int32_t rpc_call_getsockname(int fd, struct sockaddr *addr, socklen_t *addrlen);
int32_t rpc_call_getsockopt(int fd, int level, int optname, void *optval, socklen_t *optlen);
--
2.23.0
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。