From af6ce8a2fb670c8d985d2b37e189ab41c648b790 Mon Sep 17 00:00:00 2001 From: bai <1145000687@qq.com> Date: Tue, 19 Mar 2024 15:26:31 +0800 Subject: [PATCH 1/6] =?UTF-8?q?py:=20=E4=BF=AE=E6=94=B9=E8=B7=A8=E7=BA=BF?= =?UTF-8?q?=E7=A8=8B=E6=93=8D=E4=BD=9C=EF=BC=8C=E4=BF=AE=E6=94=B9=E6=B5=8B?= =?UTF-8?q?=E8=AF=95=E7=94=A8=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../socketd/cluster/ClusterClientSession.py | 10 ++-- .../transport/client/ClientHandshakeResult.py | 2 +- .../socketd/transport/client/ClientSession.py | 4 +- .../socketd/transport/core/Listener.py | 8 +-- .../transport/core/listener/PathListener.py | 6 ++- .../socketd/transport/utils/AsyncUtil.py | 30 ++++++++--- .../socketd_websocket/WsAioClientConnector.py | 18 +++---- .../impl/AIOWebSocketClientImpl.py | 50 +++++++++---------- .../impl/AIOWebSocketServerImpl.py | 2 +- python/socketd/test/TestCase01.py | 4 +- .../test/base_test/01_applictionTest.py | 39 +++++++++++---- .../test/cases/TestCase01_client_send.py | 6 +-- .../test/cases/TestCase02_auto_reconnect.py | 5 +- python/socketd/test/cases/TestCase05_file.py | 4 +- python/socketd/test/cases/TestCase13_ssl.py | 1 - python/socketd/test/modelu/FurtureTest.py | 46 ++++++++++++----- 16 files changed, 149 insertions(+), 86 deletions(-) diff --git a/python/socketd/socketd/cluster/ClusterClientSession.py b/python/socketd/socketd/cluster/ClusterClientSession.py index db68cc60..d1033556 100644 --- a/python/socketd/socketd/cluster/ClusterClientSession.py +++ b/python/socketd/socketd/cluster/ClusterClientSession.py @@ -17,8 +17,8 @@ class ClusterClientSession(ClientSession): def get_session_all(self) -> list[ClientSession]: return self.__sessionSet - def get_session_any(self, diversionOrNull:str) -> ClientSession: - session:ClientSession + def get_session_any(self, diversionOrNull: str | None) -> ClientSession: + session: ClientSession if diversionOrNull: session = LoadBalancer.get_any_by_poll(self.__sessionSet) @@ -36,7 +36,7 @@ class ClusterClientSession(ClientSession): return True return False - def is_closing(self) ->bool: + def is_closing(self) -> bool: for session in self.__sessionSet: if session.is_closing(): return True @@ -74,7 +74,7 @@ class ClusterClientSession(ClientSession): except RuntimeError as e: pass - def reconnect(self): + async def reconnect(self): for session in self.__sessionSet: if not session.is_valid(): - session.reconnect() + await session.reconnect() diff --git a/python/socketd/socketd/transport/client/ClientHandshakeResult.py b/python/socketd/socketd/transport/client/ClientHandshakeResult.py index 925c1431..338dbea0 100644 --- a/python/socketd/socketd/transport/client/ClientHandshakeResult.py +++ b/python/socketd/socketd/transport/client/ClientHandshakeResult.py @@ -4,7 +4,7 @@ from socketd.transport.core.ChannelInternal import ChannelInternal class ClientHandshakeResult: - def __init__(self, __channel: ChannelInternal, __throwable: Exception): + def __init__(self, __channel: ChannelInternal, __throwable: Exception | None): self._channel = __channel self._throwable = __throwable diff --git a/python/socketd/socketd/transport/client/ClientSession.py b/python/socketd/socketd/transport/client/ClientSession.py index e9656a54..2e56c15e 100644 --- a/python/socketd/socketd/transport/client/ClientSession.py +++ b/python/socketd/socketd/transport/client/ClientSession.py @@ -37,9 +37,9 @@ class ClientSession: ... @abc.abstractmethod - def close(self): + async def close(self): ... @abc.abstractmethod - def reconnect(self): + async def reconnect(self): ... diff --git a/python/socketd/socketd/transport/core/Listener.py b/python/socketd/socketd/transport/core/Listener.py index b3d11422..c9c0a865 100644 --- a/python/socketd/socketd/transport/core/Listener.py +++ b/python/socketd/socketd/transport/core/Listener.py @@ -6,17 +6,17 @@ from socketd.transport.core.Session import Session class Listener(abc.ABC): @abc.abstractmethod - async def on_open(self, session:Session): + async def on_open(self, session: Session): pass @abc.abstractmethod - async def on_message(self, session:Session, message:Message): + async def on_message(self, session: Session, message: Message): pass @abc.abstractmethod - def on_close(self, session:Session): + def on_close(self, session: Session): pass @abc.abstractmethod - def on_error(self, session:Session, error): + def on_error(self, session: Session, error): pass diff --git a/python/socketd/socketd/transport/core/listener/PathListener.py b/python/socketd/socketd/transport/core/listener/PathListener.py index fc773afc..f5c3cfdd 100644 --- a/python/socketd/socketd/transport/core/listener/PathListener.py +++ b/python/socketd/socketd/transport/core/listener/PathListener.py @@ -1,3 +1,5 @@ +from __future__ import annotations +from typing import Dict from socketd.transport.core.Listener import Listener from socketd.transport.core.Session import Session @@ -7,9 +9,9 @@ from socketd.transport.core.listener.PathMapper import PathMapperDefault class PathListener(Listener): def __init__(self, mapper: PathMapperDefault): - self._mapper: PathMapperDefault = mapper + self._mapper: Dict[str, Listener] | PathMapperDefault = mapper - def of(self, path: str, listener: Listener) -> 'PathListener': + def of(self, path: str, listener: Listener) -> PathListener: self._mapper[path] = listener return self diff --git a/python/socketd/socketd/transport/utils/AsyncUtil.py b/python/socketd/socketd/transport/utils/AsyncUtil.py index 64a55612..482cd479 100644 --- a/python/socketd/socketd/transport/utils/AsyncUtil.py +++ b/python/socketd/socketd/transport/utils/AsyncUtil.py @@ -23,7 +23,17 @@ class AsyncUtil(object): return _loop.run_until_complete(fn) @staticmethod - def run_forever(loop: asyncio.AbstractEventLoop) -> typing.Optional[asyncio.Future]: + def stop(loop: asyncio.AbstractEventLoop): + loop.stop() + + @staticmethod + def run_forever(loop: asyncio.AbstractEventLoop, daemon=False) -> typing.Optional[asyncio.Future]: + """ + 使用一个单独的线程运行事件循环 + :param loop: 事件循环 + :param daemon: 是否为守护线程 + :return: future + """ future = loop.create_future() def _main(_loop: asyncio.AbstractEventLoop, _future: asyncio.Future): @@ -33,9 +43,11 @@ class AsyncUtil(object): watcher = asyncio.get_child_watcher() # 给一个事件循环绑定监视器。 # 如果监视器之前已绑定另一个事件循环,那么在绑定新循环前会先解绑原来的事件循环。 - watcher.attach_loop(loop) + watcher.attach_loop(_loop) + try: - future.add_done_callback(lambda f: _loop.stop()) + future.add_done_callback( + lambda f: AsyncUtil.stop(_loop)) _loop.run_forever() finally: try: @@ -45,12 +57,19 @@ class AsyncUtil(object): _loop.close() t: Thread = Thread(target=_main, args=(loop, future)) - # t.daemon = True + t.daemon = daemon t.start() return future @staticmethod def thread_loop(core: typing.Coroutine, thread=None, pool: Executor = None) -> asyncio.AbstractEventLoop: + """ + 才用线程或者线程池运行异步函数 + :param core: 异步函数 + :param thread: 是否是线程 + :param pool: 线程池 + :return: 新的事件循环 + """ loop = asyncio.new_event_loop() async def _run(): @@ -64,7 +83,6 @@ class AsyncUtil(object): if thread: t = Thread(target=AsyncUtil.thread_handler, args=(loop, loop.create_task(_run()))) t.start() - if pool: + elif pool: pool.submit(lambda x: AsyncUtil.thread_handler(*x), (loop, loop.create_task(_run()))) return loop - diff --git a/python/socketd/socketd_websocket/WsAioClientConnector.py b/python/socketd/socketd_websocket/WsAioClientConnector.py index 85609d94..a6822c8f 100644 --- a/python/socketd/socketd_websocket/WsAioClientConnector.py +++ b/python/socketd/socketd_websocket/WsAioClientConnector.py @@ -8,7 +8,7 @@ from socketd.transport.client.Client import ClientInternal from socketd.transport.client.ClientHandshakeResult import ClientHandshakeResult from socketd.transport.core.Channel import Channel from socketd.transport.core.Costants import Constants -from socketd.transport.core.impl.LogConfig import logger +from socketd.transport.core.impl.LogConfig import log, logger from socketd.transport.client.ClientConnectorBase import ClientConnectorBase from socketd.transport.utils.AsyncUtil import AsyncUtil from socketd_websocket.impl.AIOConnect import AIOConnect @@ -34,7 +34,7 @@ class WsAioClientConnector(ClientConnectorBase): if self.client.get_config().get_ssl_context() is not None: ws_url = ws_url.replace("ws", "wss") self._loop = asyncio.new_event_loop() - self._top = AsyncUtil.run_forever(self._loop) + self._top = AsyncUtil.run_forever(self._loop, daemon=True) try: self.__con: AIOConnect = AIOConnect(ws_url, client=self.client, ssl=self.client.get_config().get_ssl_context(), @@ -67,15 +67,15 @@ class WsAioClientConnector(ClientConnectorBase): return try: await self.__real.close() - self.__real.on_close() + await self.__real.on_close() await self.stop() except Exception as e: - logger.debug(e) + log.debug(e) async def stop(self): if self._top: - _top = self._top - if not _top.done(): - _top.set_result(1) - self._loop.stop() - logger.debug(f"Stopping WebSocket::{self._loop.is_running()}") + if not self._top.done(): + self._top.set_result(1) + if self._loop: + self._loop.stop() + log.debug(f"Stopping WebSocket::{self._loop.is_running()}") diff --git a/python/socketd/socketd_websocket/impl/AIOWebSocketClientImpl.py b/python/socketd/socketd_websocket/impl/AIOWebSocketClientImpl.py index 38282d3d..bf571930 100644 --- a/python/socketd/socketd_websocket/impl/AIOWebSocketClientImpl.py +++ b/python/socketd/socketd_websocket/impl/AIOWebSocketClientImpl.py @@ -1,7 +1,6 @@ import asyncio -import concurrent.futures from asyncio import CancelledError -from typing import Optional, Sequence +from typing import Optional, Sequence, List from loguru import logger from websockets.extensions import ClientExtensionFactory from websockets.uri import WebSocketURI @@ -31,7 +30,8 @@ class AIOWebSocketClientImpl(WebSocketClientProtocol): else: self._loop = asyncio.get_running_loop() self.handshake_future: Optional[CompletableFuture] = None - self._handler_future: Optional[concurrent.futures.Future] = None + self._handler_future: Optional[asyncio.Future] = None + self.__on_receive_tasks: List[asyncio.Task] = [] def get_channel(self): return self.channel @@ -46,32 +46,32 @@ class AIOWebSocketClientImpl(WebSocketClientProtocol): self.handshake_future: Optional[CompletableFuture] = CompletableFuture() return return_data + async def _handler(self): + """ + 异步处理函数,用于处理握手完成后的消息处理逻辑。 + """ + await self.on_open() + while True: + await asyncio.sleep(0) + if self.closed or self.status_state == Flags.Close: + break + try: + await self.on_message() + except Exception as e: + log.warning(e) + break + def connection_open(self) -> None: """ 打开握手完成回调函数。 :return: 无返回值 """ super().connection_open() - - async def _handler(): - """ - 异步处理函数,用于处理握手完成后的消息处理逻辑。 - """ - await self.on_open() - while True: - await asyncio.sleep(0) - if self.closed or self.status_state == Flags.Close: - break - try: - await self.on_message() - except Exception as e: - log.warning(e) - break - # 挂在到self.loop 上 - self._handler_future = asyncio.run_coroutine_threadsafe(_handler(), self.loop) + self._handler_future = self.loop.create_task(self._handler()) @logger.catch async def on_open(self): + """开始建立连接""" try: log.info("Client:Websocket onOpen...") await self.channel.send_connect(self.client.get_config().get_url(), self.client.get_config().get_meta_map()) @@ -90,7 +90,6 @@ class AIOWebSocketClientImpl(WebSocketClientProtocol): if message is None: # 结束握手 return - # frame: Frame = self.client.get_assistant().read(message) frame: Frame = await self.loop.run_in_executor(self.client.get_config().get_exchange_executor(), lambda _message: self.client.get_assistant().read(_message), message) @@ -103,10 +102,9 @@ class AIOWebSocketClientImpl(WebSocketClientProtocol): self.handshake_future.cancel() else: self.handshake_future.accept(ClientHandshakeResult(self.channel, None)) - await self.channel.on_open_future(__future) # 将on_receive 让新的事件循环进行回调,不阻塞当前read循环 - asyncio.run_coroutine_threadsafe(self.client.get_processor().on_receive(self.channel, frame), self.loop) + self.__on_receive_tasks.append(self.loop.create_task(self.client.get_processor().on_receive(self.channel, frame))) if frame.flag() == Flags.Close: """服务端主动关闭""" log.debug("{sessionId} 服务端主动关闭", @@ -123,9 +121,11 @@ class AIOWebSocketClientImpl(WebSocketClientProtocol): except Exception as e: self.on_error(e) - def on_close(self): + async def on_close(self): self.client.get_processor().on_close(self.channel) - self._handler_future.cancel() + if self.handshake_future is not None: + await asyncio.wait(self.__on_receive_tasks, timeout=10) + await asyncio.wait([self._handler_future], timeout=10) def on_error(self, e): self.client.get_processor().on_error(self.channel, e) diff --git a/python/socketd/socketd_websocket/impl/AIOWebSocketServerImpl.py b/python/socketd/socketd_websocket/impl/AIOWebSocketServerImpl.py index 3032953b..76c7e177 100644 --- a/python/socketd/socketd_websocket/impl/AIOWebSocketServerImpl.py +++ b/python/socketd/socketd_websocket/impl/AIOWebSocketServerImpl.py @@ -98,7 +98,7 @@ class AIOWebSocketServerImpl(WebSocketServerProtocol, IWebSocketServer): await self.on_error(conn, e) try: # 等待未完成任务 - await asyncio.gather(*tasks) + await asyncio.wait(tasks, timeout=10) except asyncio.CancelledError as c: pass except TimeoutError as e: diff --git a/python/socketd/test/TestCase01.py b/python/socketd/test/TestCase01.py index acbeac8a..3edbcb30 100644 --- a/python/socketd/test/TestCase01.py +++ b/python/socketd/test/TestCase01.py @@ -23,8 +23,8 @@ from test.cases.TestCase16_openAnTry import TestCase16_openAnTry class TestCase01(unittest.TestCase): schemas = [ - # "std:ws", - "std:tcp" + "std:ws", + # "std:tcp" ] def __init__(self, *args, **kwargs): diff --git a/python/socketd/test/base_test/01_applictionTest.py b/python/socketd/test/base_test/01_applictionTest.py index e3857e68..ea90a8f3 100644 --- a/python/socketd/test/base_test/01_applictionTest.py +++ b/python/socketd/test/base_test/01_applictionTest.py @@ -1,10 +1,12 @@ import asyncio +import sys from websockets.legacy.server import WebSocketServer from socketd import SocketD from socketd.transport.core import Entity from socketd.transport.core.Session import Session +from socketd.transport.core.impl.LogConfig import log from socketd.transport.server.ServerConfig import ServerConfig from socketd.transport.core.entity.StringEntity import StringEntity from socketd.transport.server.Server import Server @@ -12,7 +14,11 @@ from socketd.transport.stream.RequestStream import RequestStream from socketd.transport.stream.SubscribeStream import SubscribeStream from test.modelu.SimpleListenerTest import SimpleListenerTest from test.uitls import calc_async_time -from loguru import logger + +COUNT = 1000 + +log.remove() +log.add(sys.stderr, level="INFO") @calc_async_time @@ -24,18 +30,33 @@ async def application_test(): client_session: Session = await SocketD.create_client("std:ws://127.0.0.1:9999").open() # 单向发送 - await client_session.send("demo", StringEntity("test.png")) + @calc_async_time + async def _send(): + for _ in range(COUNT): + await client_session.send("demo", StringEntity("test")) + + await _send() + # 发送并请求(且,等待一个答复) - req: RequestStream = await client_session.send_and_request("demo", StringEntity("你好"), 100) - entity: Entity = await req.await_result() - print(entity.data_as_string()) + @calc_async_time + async def _send_and_request(): + for _ in range(COUNT): + req: RequestStream = await client_session.send_and_request("demo", StringEntity("你好"), 100) + entity: Entity = await req.await_result() - async def send_and_subscribe_test(_entity: Entity): - logger.debug(f"c::subscribe::{_entity.data_as_string()} {_entity}") + # await _send_and_request() # 发送并订阅(且,接收零个或多个答复流) - req: SubscribeStream = await client_session.send_and_subscribe("demo", StringEntity("hi"), 100) - req.then_reply(send_and_subscribe_test) + @calc_async_time + async def _send_and_subscribe(): + async def send_and_subscribe_test(_entity: Entity): + log.debug(f"c::subscribe::{_entity.data_as_string()} {_entity}") + + for _ in range(COUNT): + req: SubscribeStream = await client_session.send_and_subscribe("demo", StringEntity("hi"), 100) + req.then_reply(send_and_subscribe_test) + + await _send_and_subscribe() await asyncio.sleep(3) # 关闭客户端会话 await client_session.close() diff --git a/python/socketd/test/cases/TestCase01_client_send.py b/python/socketd/test/cases/TestCase01_client_send.py index bb80ad6f..9d7d91c2 100644 --- a/python/socketd/test/cases/TestCase01_client_send.py +++ b/python/socketd/test/cases/TestCase01_client_send.py @@ -20,12 +20,12 @@ class TestCase01_client_send(BaseTestCase): self.server: Server = None self.server_session: WebSocketServer = None self.client_session: Session = None - self.loop = asyncio.get_event_loop() + self.loop = asyncio.new_event_loop() async def _start(self): s = SimpleListenerTest() self.server: Server = SocketD.create_server(ServerConfig(self.schema).port(self.port)) - self.server_session: Server = await self.server.config(config_handler) .listen(s) .start() + self.server_session: Server = await self.server.config(config_handler).listen(s).start() await asyncio.sleep(1) serverUrl = self.schema + "://127.0.0.1:" + str(self.port) + "/path?u=a&p=2" self.client_session: Session = await SocketD.create_client(serverUrl) \ @@ -54,8 +54,8 @@ class TestCase01_client_send(BaseTestCase): def stop(self): super().stop() - self.loop.run_until_complete(self._stop()) + self.loop.stop() def on_error(self): super().on_error() diff --git a/python/socketd/test/cases/TestCase02_auto_reconnect.py b/python/socketd/test/cases/TestCase02_auto_reconnect.py index 91f48045..554110fb 100644 --- a/python/socketd/test/cases/TestCase02_auto_reconnect.py +++ b/python/socketd/test/cases/TestCase02_auto_reconnect.py @@ -20,7 +20,7 @@ class TestCase02_auto_reconnect(BaseTestCase): self.server: Server self.server_session: WebSocketServer self.client_session: Session - self.loop = asyncio.get_event_loop() + self.loop = asyncio.new_event_loop() async def _start(self): self.server: Server = SocketD.create_server(ServerConfig(self.schema).port(self.port)) @@ -31,9 +31,10 @@ class TestCase02_auto_reconnect(BaseTestCase): serverUrl = self.schema + "://127.0.0.1:" + str(self.port) + "/path?u=a&p=2" self.client_session: Session = await SocketD.create_client(serverUrl) \ .config(config_handler).open() - await self.client_session.send_and_request("demo", StringEntity("test"), 100) + await self.client_session.send("demo", StringEntity("test")) await self.server.stop() + # self.client_session.close() del self.server_session await asyncio.sleep(10) self.server_session = await _server.start() diff --git a/python/socketd/test/cases/TestCase05_file.py b/python/socketd/test/cases/TestCase05_file.py index c2034992..48153281 100644 --- a/python/socketd/test/cases/TestCase05_file.py +++ b/python/socketd/test/cases/TestCase05_file.py @@ -75,13 +75,13 @@ class TestCase05_file(BaseTestCase): .config(config_handler).open() await asyncio.sleep(1) try: - with open(r"C:\Users\bai\Pictures\飞书20230728-180708.mp4", + with open(r"C:\Users\11450\Pictures\飞书20230728-180708.mp4", "rb") as f: await self.client_session.send("/path?u=a&p=2", FileEntity(f, "test.png")) except Exception as e: logger.error(e) raise e - await asyncio.sleep(10) + await asyncio.sleep(20) def start(self): super().start() diff --git a/python/socketd/test/cases/TestCase13_ssl.py b/python/socketd/test/cases/TestCase13_ssl.py index eac40a8b..3b67084b 100644 --- a/python/socketd/test/cases/TestCase13_ssl.py +++ b/python/socketd/test/cases/TestCase13_ssl.py @@ -1,6 +1,5 @@ import asyncio import ssl -import uuid from socketd import SocketD from socketd.transport.client.ClientConfig import ClientConfig diff --git a/python/socketd/test/modelu/FurtureTest.py b/python/socketd/test/modelu/FurtureTest.py index bd232509..a68569e7 100644 --- a/python/socketd/test/modelu/FurtureTest.py +++ b/python/socketd/test/modelu/FurtureTest.py @@ -58,17 +58,15 @@ class FutureTest(unittest.TestCase): print(results) def test_loop(self): - lock = asyncio.Lock() + def _run(_loop: asyncio.AbstractEventLoop): asyncio.set_event_loop(_loop) _loop.run_forever() - async def _handle(_loop): for _ in range(10): print("_") _loop.stop() - lock.release() return True loop = asyncio.new_event_loop() @@ -77,8 +75,6 @@ class FutureTest(unittest.TestCase): Future = asyncio.run_coroutine_threadsafe(_handle(loop), loop) for _ in range(10): print(_) - time.sleep(0.1) - # print(Future.result()) def test_loop1(self): logger.info("test_loop1") @@ -169,6 +165,7 @@ class FutureTest(unittest.TestCase): print(" await ...") # 等待两个函数完成 await asyncio.gather(task_A, task_B) + asyncio.run(main()) def test_warp(self): @@ -181,7 +178,6 @@ class FutureTest(unittest.TestCase): asyncio.run(_main(get)) - def test_send(self): async def send(): await asyncio.sleep(2) @@ -197,12 +193,38 @@ class FutureTest(unittest.TestCase): asyncio.run(_main()) def testTask(self): + """ + 多线程混合运行异步函数 + :return: + """ + loop = asyncio.new_event_loop() + stop = AsyncUtil.run_forever(loop, daemon=True) + async def task(): - async def send(): + num = AtomicRefer(0) + + async def send(_num): await asyncio.sleep(1) + logger.info(1) + async with _num as _n: + await _num.set(_n + 1) return 1 - loop = asyncio.get_running_loop() - task = asyncio.run_coroutine_threadsafe(send(), loop) - print(task.result(timeout=3)) - print(2) - asyncio.run(task()) \ No newline at end of file + + tasks = [] + for t in range(1000): + # asyncio.run_coroutine_threadsafe(send(num), loop) + task = loop.create_task(send(num)) + tasks.append(task) + f = asyncio.run_coroutine_threadsafe(asyncio.wait(tasks), loop) + await asyncio.sleep(1) + try: + f.result() + except TimeoutError as e: + f.cancel() + logger.error(e) + loop.stop() + logger.info("num : {}".format(await num.get())) + assert await num.get() == 1000 + stop.set_result(True) + + asyncio.run(task()) -- Gitee From 1a9136eb07f3145e1b793241cea8e0e8fec3ac31 Mon Sep 17 00:00:00 2001 From: bai <1145000687@qq.com> Date: Thu, 21 Mar 2024 18:34:32 +0800 Subject: [PATCH 2/6] =?UTF-8?q?py:=20=E4=BF=AE=E6=94=B9=EF=BC=8C=E4=BC=98?= =?UTF-8?q?=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../socketd/transport/client/ClientChannel.py | 8 +- .../transport/core/ChannelAssistant.py | 10 +-- .../socketd/transport/core/Listener.py | 2 +- .../socketd/transport/core/Processor.py | 2 +- .../transport/core/impl/ProcessorDefault.py | 16 ++-- .../transport/core/listener/PathListener.py | 4 +- .../transport/core/listener/SimpleListener.py | 2 +- .../socketd/transport/server/Server.py | 12 +-- .../socketd/transport/utils/AsyncUtil.py | 12 +++ .../socketd/socketd_aio_tcp/TCPAIOServer.py | 90 ++++++++----------- .../socketd_aio_tcp/TcpAIOChannelAssistant.py | 42 ++++----- .../socketd_aio_tcp/TcpAioClientConnector.py | 88 ++++++++++-------- .../impl/AIOWebSocketClientImpl.py | 4 + .../impl/AIOWebSocketServerImpl.py | 4 + python/socketd/subs_root/__init__.py | 0 python/socketd/subs_root/socketd/Channel.pyi | 88 ------------------ python/socketd/subs_root/socketd/Client.pyi | 18 ---- .../subs_root/socketd/ClientFactory.pyi | 14 --- python/socketd/subs_root/socketd/Server.pyi | 17 ---- .../subs_root/socketd/ServerFactory.pyi | 9 -- python/socketd/subs_root/socketd/__init__.py | 0 python/socketd/test/TestCase01.py | 4 +- .../test/base_test/01_applictionTest.py | 32 ++++--- .../socketd/test/modelu/SimpleListenerTest.py | 4 +- 24 files changed, 184 insertions(+), 298 deletions(-) delete mode 100644 python/socketd/subs_root/__init__.py delete mode 100644 python/socketd/subs_root/socketd/Channel.pyi delete mode 100644 python/socketd/subs_root/socketd/Client.pyi delete mode 100644 python/socketd/subs_root/socketd/ClientFactory.pyi delete mode 100644 python/socketd/subs_root/socketd/Server.pyi delete mode 100644 python/socketd/subs_root/socketd/ServerFactory.pyi delete mode 100644 python/socketd/subs_root/socketd/__init__.py diff --git a/python/socketd/socketd/transport/client/ClientChannel.py b/python/socketd/socketd/transport/client/ClientChannel.py index b8ec6abf..9c5d6857 100644 --- a/python/socketd/socketd/transport/client/ClientChannel.py +++ b/python/socketd/socketd/transport/client/ClientChannel.py @@ -44,7 +44,7 @@ class ClientChannel(ChannelBase): async def __heartbeatScheduled(self) -> None: while True: - await asyncio.sleep(self.__client.get_heartbeat_interval() / 100) + await asyncio.sleep(self.__client.get_heartbeat_interval() / 1000) await self.heartbeat_handle() def init_heartbeat(self): @@ -118,7 +118,7 @@ class ClientChannel(ChannelBase): raise s except Exception as e: if self.__connector.auto_reconnect(): - self.internalCloseIfError() + await self.internalCloseIfError() raise SocketDChannelException(f"Client channel send failed {e}") @@ -173,9 +173,9 @@ class ClientChannel(ChannelBase): finally: self.__isConnecting.set(False) - def internalCloseIfError(self): + async def internalCloseIfError(self): if self.__real: - self.__real.close(Constants.CLOSE2001_ERROR) + await self.__real.close(Constants.CLOSE2001_ERROR) self.__real = None async def internalCheck(self): diff --git a/python/socketd/socketd/transport/core/ChannelAssistant.py b/python/socketd/socketd/transport/core/ChannelAssistant.py index 939d465a..f792faf2 100644 --- a/python/socketd/socketd/transport/core/ChannelAssistant.py +++ b/python/socketd/socketd/transport/core/ChannelAssistant.py @@ -9,23 +9,23 @@ S = typing.TypeVar("S") class ChannelAssistant(typing.Generic[S], ABC): @abstractmethod async def write(self, target: Any, frame: Frame) -> None: - pass + ... @abstractmethod def read(self, target: Any) -> Frame: ... @abstractmethod def is_valid(self, target: Any) -> bool: - pass + ... @abstractmethod async def close(self, target: Any) -> None: - pass + ... @abstractmethod def get_remote_address(self, target: Any) -> str: - pass + ... @abstractmethod def get_local_address(self, target: Any) -> str: - pass + ... diff --git a/python/socketd/socketd/transport/core/Listener.py b/python/socketd/socketd/transport/core/Listener.py index c9c0a865..c94fefa4 100644 --- a/python/socketd/socketd/transport/core/Listener.py +++ b/python/socketd/socketd/transport/core/Listener.py @@ -14,7 +14,7 @@ class Listener(abc.ABC): pass @abc.abstractmethod - def on_close(self, session: Session): + async def on_close(self, session: Session): pass @abc.abstractmethod diff --git a/python/socketd/socketd/transport/core/Processor.py b/python/socketd/socketd/transport/core/Processor.py index 4dcc9971..a6cd93c9 100644 --- a/python/socketd/socketd/transport/core/Processor.py +++ b/python/socketd/socketd/transport/core/Processor.py @@ -25,7 +25,7 @@ class Processor(ABC): pass @abstractmethod - def on_close(self, channel: ChannelInternal): + async def on_close(self, channel: ChannelInternal): pass @abstractmethod diff --git a/python/socketd/socketd/transport/core/impl/ProcessorDefault.py b/python/socketd/socketd/transport/core/impl/ProcessorDefault.py index 8d9961c9..f22a1484 100644 --- a/python/socketd/socketd/transport/core/impl/ProcessorDefault.py +++ b/python/socketd/socketd/transport/core/impl/ProcessorDefault.py @@ -49,7 +49,7 @@ class ProcessorDefault(Processor, ABC): else: # 如果还有效,则关闭通道 await channel.close(Constants.CLOSE1002_PROTOCOL_ILLEGAL) - self.on_close_internal(channel) + await self.on_close_internal(channel) await channel.on_open_future(_future) await self.on_open(channel) @@ -72,7 +72,7 @@ class ProcessorDefault(Processor, ABC): pass elif frame.flag() == Flags.Close: await channel.close(Constants.CLOSE1001_PROTOCOL_CLOSE) - self.on_close(channel) + await self.on_close(channel) elif frame.flag() == Flags.Alarm: e = SocketDAlarmException(frame.message()) stream: Union[StreamInternal, Stream] = channel.get_config().get_stream_manger() \ @@ -87,8 +87,8 @@ class ProcessorDefault(Processor, ABC): elif frame.flag() in [Flags.Reply, Flags.ReplyEnd]: await self.on_receive_do(channel, frame, True) else: - await channel.close(Constants.CLOSE12_PROTOCOL_ILLEGAL) - self.on_close(channel) + await channel.close(Constants.CLOSE1002_PROTOCOL_ILLEGAL) + await self.on_close(channel) except Exception as e: logger.error(e) self.on_error(channel, e) @@ -143,11 +143,11 @@ class ProcessorDefault(Processor, ABC): # 异步运行 task = asyncio.create_task(self.listener.on_message(channel.get_session(), message)) - def on_close(self, channel: ChannelInternal): - self.listener.on_close(channel.get_session()) + async def on_close(self, channel: ChannelInternal): + await self.listener.on_close(channel.get_session()) def on_error(self, channel: ChannelInternal, error): self.listener.on_error(channel.get_session(), error) - def on_close_internal(self, channel: ChannelInternal): - self.listener.on_close(channel.get_session()) + async def on_close_internal(self, channel: ChannelInternal): + await self.listener.on_close(channel.get_session()) diff --git a/python/socketd/socketd/transport/core/listener/PathListener.py b/python/socketd/socketd/transport/core/listener/PathListener.py index f5c3cfdd..2b833af6 100644 --- a/python/socketd/socketd/transport/core/listener/PathListener.py +++ b/python/socketd/socketd/transport/core/listener/PathListener.py @@ -23,9 +23,9 @@ class PathListener(Listener): if l := self._mapper.get(session.path()): await l.on_message(session) - def on_close(self, session): + async def on_close(self, session): if l := self._mapper.get(session.path()): - l.on_close(session) + await l.on_close(session) def on_error(self, session, error): if l := self._mapper.get(session.path()): diff --git a/python/socketd/socketd/transport/core/listener/SimpleListener.py b/python/socketd/socketd/transport/core/listener/SimpleListener.py index 71782218..5180eddc 100644 --- a/python/socketd/socketd/transport/core/listener/SimpleListener.py +++ b/python/socketd/socketd/transport/core/listener/SimpleListener.py @@ -11,7 +11,7 @@ class SimpleListener(Listener): async def on_message(self, session: Session, message: Message): pass - def on_close(self, session: Session): + async def on_close(self, session: Session): pass def on_error(self, session: Session, error): diff --git a/python/socketd/socketd/transport/server/Server.py b/python/socketd/socketd/transport/server/Server.py index 1497c9a2..d0d2c77d 100644 --- a/python/socketd/socketd/transport/server/Server.py +++ b/python/socketd/socketd/transport/server/Server.py @@ -1,3 +1,5 @@ +from __future__ import annotations + from typing import Callable, Coroutine from websockets.sync.server import WebSocketServer @@ -7,15 +9,13 @@ from socketd.transport.core.Listener import Listener class Server: - def get_title(self): - ... + def get_title(self): ... - def get_config(self) -> ServerConfig: - ... + def get_config(self) -> ServerConfig: ... - def config(self, consumer: Callable[[ServerConfig], None]) -> 'Server': ... + def config(self, consumer: Callable[[ServerConfig], None]) -> Server: ... - def listen(self, listener: Listener) -> 'Server': ... + def listen(self, listener: Listener) -> Server: ... def start(self) -> WebSocketServer | Coroutine: ... diff --git a/python/socketd/socketd/transport/utils/AsyncUtil.py b/python/socketd/socketd/transport/utils/AsyncUtil.py index 482cd479..cf448ce5 100644 --- a/python/socketd/socketd/transport/utils/AsyncUtil.py +++ b/python/socketd/socketd/transport/utils/AsyncUtil.py @@ -86,3 +86,15 @@ class AsyncUtil(object): elif pool: pool.submit(lambda x: AsyncUtil.thread_handler(*x), (loop, loop.create_task(_run()))) return loop + + @staticmethod + async def gather_concurrent(coros: typing.List[typing.Coroutine], limit=10): + """并发执行多个协程任务,并限制同时执行的数量""" + + async def worker(semaphore: asyncio.Semaphore, coro): + async with semaphore: + return await coro + + semaphore = asyncio.Semaphore(limit) + tasks = [worker(semaphore, coro) for coro in coros] + return await asyncio.gather(*tasks, return_exceptions=True) diff --git a/python/socketd/socketd_aio_tcp/TCPAIOServer.py b/python/socketd/socketd_aio_tcp/TCPAIOServer.py index ccdff60c..76d0fc88 100644 --- a/python/socketd/socketd_aio_tcp/TCPAIOServer.py +++ b/python/socketd/socketd_aio_tcp/TCPAIOServer.py @@ -1,19 +1,21 @@ import asyncio -from concurrent.futures import Future import socket +from asyncio import StreamReader, StreamReaderProtocol, StreamWriter +from concurrent.futures import Future from typing import Optional, List from socketd.exception.SocketDExecption import SocketDTimeoutException from socketd.transport.core.ChannelSupporter import ChannelSupporter +from socketd.transport.core.Costants import Constants from socketd.transport.core.Flags import Flags from socketd.transport.core.Frame import Frame from socketd.transport.core.impl.ChannelDefault import ChannelDefault from socketd.transport.core.impl.LogConfig import log from socketd.transport.server.ServerBase import ServerBase from socketd.transport.server.ServerConfig import ServerConfig -from socketd.transport.utils.AsyncUtil import AsyncUtil from socketd.transport.utils.async_api.AtomicRefer import AtomicRefer +from .TCPStreamIO import TCPStreamIO from .TcpAIOChannelAssistant import TcpAIOChannelAssistant @@ -21,87 +23,73 @@ from .TcpAIOChannelAssistant import TcpAIOChannelAssistant class TCPAIOServer(ServerBase, ChannelSupporter): def __init__(self, config: ServerConfig): - self.__loop = asyncio.new_event_loop() - super().__init__(config, TcpAIOChannelAssistant(config, self.__loop)) - self._server: Optional[socket.socket] = None - self.__top: Optional[asyncio.Future] = None + super().__init__(config, TcpAIOChannelAssistant(config, asyncio.get_running_loop())) + self._server: Optional[asyncio.Server] = None self._is_close: AtomicRefer = AtomicRefer(False) - self._sock_future_list: List[asyncio.Future] = [] self._server_forever_future: Optional[asyncio.Future | Future] = None - self._sock_list: List[socket.socket] = [] + self._on_receive_tasks: List[asyncio.Task] = [] # 服务器的回调函数 - async def handler(self, loop: asyncio.AbstractEventLoop, sock: socket.socket, - addr, channel: ChannelDefault): # reader和writer参数是asyncio.start_server生成异步服务器后自动传入进来的 - while True: # 循环接受数据,直到套接字关闭 + async def server_forever(self, reader: StreamReader, writer: StreamWriter): + + channel = ChannelDefault(TCPStreamIO(self._server, reader, writer), self) + while True: try: + if self._on_receive_tasks: + task = self._on_receive_tasks[0] + if task.done(): + self._on_receive_tasks.pop(0) if await self._is_close.get(): - self.get_processor().on_close(channel) + await self.get_processor().on_close(channel) break - frame: Frame = await loop.create_task(self.get_assistant().read(sock)) + frame: Frame = await self.get_assistant().read(reader) if frame is not None: - await self.get_processor().on_receive(channel, frame) + self._on_receive_tasks.append(asyncio.create_task(self.get_processor().on_receive(channel, frame))) if frame.flag() == Flags.Close: """客户端主动关闭""" log.debug("{sessionId} 主动退出", sessionId=channel.get_session().session_id()) break except SocketDTimeoutException as e: - await channel.send_close() + await channel.send_close(Constants.CLOSE1001_PROTOCOL_CLOSE) log.error("server handler {e}", e=e) break except Exception as e: self.get_processor().on_error(channel, e) - self.get_processor().on_close(channel) + await self.get_processor().on_close(channel) log.error("server handler {e}", e=e) break - sock.close() - - async def server_forever(self, loop: asyncio.AbstractEventLoop, listener: socket.socket): - while True: - try: - if await self._is_close.get(): - break - sock, addr = await loop.sock_accept(listener) - channel = ChannelDefault(sock, self) - self._sock_future_list.append(loop.create_task(self.handler(loop, sock, addr, channel))) - self._sock_list.append(sock) - except asyncio.CancelledError as e: - log.warning("Server asyncio cancelled {e}", e=e) - break - except Exception as e: - log.warning("Server accept error {e}", e=e) - break - listener.close() async def start(self): + loop = asyncio.get_running_loop() + + def factory(): + reader = StreamReader(limit=self.get_config().get_read_buffer_size(), loop=loop) + protocol = StreamReaderProtocol(reader, self.server_forever, + loop=loop) + return protocol + _sock: socket.socket = socket.socket(socket.AF_INET) + _sock.settimeout(self.get_config().get_idle_timeout() / 1000) # 生成一个服务器 - self._server: socket.socket = socket.create_server((self.get_config().get_host(), - self.get_config().get_port())) - self._server.setblocking(False) - if self.__top is None or not self.__loop.is_running(): - self.__top = AsyncUtil.run_forever(self.__loop) - self._server_forever_future = asyncio.run_coroutine_threadsafe(self.server_forever(self.__loop, self._server), self.__loop) + self._server: asyncio.Server = await loop.create_server(factory, + self.get_config().get_host(), + self.get_config().get_port(), + start_serving=True, + ) + # self._server_forever_future = loop.create_task(self._server.serve_forever()) return self._server async def close_wait(self): - # asyncio.run_coroutine_threadsafe(asyncio.wait(self._sock_future_list), self.__loop).result() - await asyncio.wait(self._sock_future_list, timeout=10) - for sock in self._sock_list: - sock.detach() - sock.close() try: if self._server_forever_future: - self._server_forever_future.result(10) + await asyncio.wait_for(self._server_forever_future, timeout=10) except asyncio.TimeoutError: - self._server_forever_future.cancel() + pass async def stop(self): log.info("TcpAioServer stop...") # 等等执行完成 await self._is_close.set(True) - await self.close_wait() - if not self.__top.done(): - self.__top.set_result(True) self._server.close() - self.__top = None + # await self._server.wait_closed() + await self.close_wait() diff --git a/python/socketd/socketd_aio_tcp/TcpAIOChannelAssistant.py b/python/socketd/socketd_aio_tcp/TcpAIOChannelAssistant.py index fc7d7825..8ea8c9a6 100644 --- a/python/socketd/socketd_aio_tcp/TcpAIOChannelAssistant.py +++ b/python/socketd/socketd_aio_tcp/TcpAIOChannelAssistant.py @@ -1,5 +1,6 @@ import asyncio import socket +from asyncio import StreamReader from socketd.transport.core import Config from socketd.transport.core.ChannelAssistant import ChannelAssistant @@ -10,10 +11,7 @@ from socketd.transport.core.codec.Buffer import Buffer from socketd.transport.core.codec.ByteBufferCodecReader import ByteBufferCodecReader from socketd.transport.core.codec.ByteBufferCodecWriter import ByteBufferCodecWriter from socketd.transport.core.impl.LogConfig import log - -SHUT_RD = 0 # 断开输入流 -SHUT_WR = 1 # 断开输出流 -SHUT_RDWR = 2 # 同时断开 I/O 流 +from socketd_aio_tcp.TCPStreamIO import TCPStreamIO class TcpAIOChannelAssistant(ChannelAssistant): @@ -21,39 +19,43 @@ class TcpAIOChannelAssistant(ChannelAssistant): self.config = config self.loop = loop - async def write(self, source: socket.socket, frame: Frame) -> None: + async def write(self, stream_io: TCPStreamIO, frame: Frame) -> None: writer: CodecWriter = self.config.get_codec().write(frame, lambda size: ByteBufferCodecWriter(Buffer(limit=size))) if writer is not None: _data = writer.get_buffer().getvalue() _len = len(_data) - await self.loop.sock_sendall(source, _len.to_bytes(length=4, byteorder='big', signed=False)) - await self.loop.sock_sendall(source, _data) + stream_io.writer.write(_len.to_bytes(length=4, byteorder='big', signed=False)) + stream_io.writer.write(_data) + # Flush the write buffer + await stream_io.writer.drain() writer.close() - def is_valid(self, target: socket.socket) -> bool: - _closed = not getattr(target, '_closed', True) - log.debug(_closed) - return _closed + def is_valid(self, stream_io: TCPStreamIO) -> bool: + if isinstance(stream_io.server, asyncio.Transport): + return not stream_io.server.is_closing() + elif isinstance(stream_io.server, asyncio.Server): + return stream_io.server.is_serving() - async def close(self, target: socket.socket) -> None: - target.shutdown(SHUT_RDWR) - target.close() + async def close(self, stream_io: TCPStreamIO) -> None: + stream_io.writer.close() + stream_io.server.close() - def get_remote_address(self, target: socket.socket) -> str: + def get_remote_address(self, stream_io: TCPStreamIO) -> str: + target: socket.socket = stream_io.writer.get_extra_info('socket') return target.getpeername() - def get_local_address(self, target: socket.socket) -> str: + def get_local_address(self, stream_io: TCPStreamIO) -> str: + target: socket.socket = stream_io.writer.get_extra_info('socket') return target.getsockname() - async def read(self, sock: socket.socket) -> Frame | None: - loop = asyncio.get_running_loop() - lenBt = await loop.sock_recv(sock, 4) + async def read(self, reader: StreamReader) -> Frame | None: + lenBt = await reader.read(4) if lenBt is None: return None _len = bytes_to_int32(lenBt) - _buffer = await loop.sock_recv(sock, _len) + _buffer = await reader.read(_len) if _buffer is None: return None buffer = Buffer(len(_buffer), _buffer) diff --git a/python/socketd/socketd_aio_tcp/TcpAioClientConnector.py b/python/socketd/socketd_aio_tcp/TcpAioClientConnector.py index 4b07e715..d8920b4e 100644 --- a/python/socketd/socketd_aio_tcp/TcpAioClientConnector.py +++ b/python/socketd/socketd_aio_tcp/TcpAioClientConnector.py @@ -1,6 +1,6 @@ import asyncio -import socket -from typing import Optional, AsyncGenerator +from asyncio import StreamReader, StreamReaderProtocol, StreamWriter +from typing import Optional, AsyncGenerator, List from socketd.exception.SocketDExecption import SocketDTimeoutException, SocketDConnectionException from socketd.transport.client.Client import ClientInternal @@ -13,7 +13,7 @@ from socketd.transport.core.impl.LogConfig import log from socketd.transport.utils.AsyncUtil import AsyncUtil from socketd.transport.utils.CompletableFuture import CompletableFuture -from socketd.transport.utils.async_api.AtomicRefer import AtomicRefer +from socketd_aio_tcp.TCPStreamIO import TCPStreamIO class TcpAioClientConnector(ClientConnectorBase): @@ -21,30 +21,35 @@ class TcpAioClientConnector(ClientConnectorBase): def __init__(self, client: ClientInternal): super().__init__(client) self.__top: Optional[asyncio.Future] = None - self.__real: Optional[socket.socket] = None - self.__loop = asyncio.new_event_loop() + self.__real: Optional[asyncio.Transport] = None self.__message: asyncio.Queue = asyncio.Queue() + self.__message_future: Optional[asyncio.Future] = None self._handshakeFuture = CompletableFuture() self._handshakeTask: Optional[AsyncGenerator] = None self._receiveTask: Optional[asyncio.Task] = None self.transfer_dataTask: Optional[asyncio.Task] = None + self._on_receive_tasks: List[asyncio.Task] = [] + self._loop: Optional[asyncio.AbstractEventLoop] = None async def connect(self): # 处理自定义架构的影响 tcp_url = self.client.get_config().get_url().replace("std:", "").replace("-python", "") _sch, _host, _port = tcp_url.replace("//", "").split(":") - if self.__top is None: - self.__top = AtomicRefer(AsyncUtil.run_forever(self.__loop)) - if not self.__loop.is_running(): - self.__top = AtomicRefer(AsyncUtil.run_forever(self.__loop)) _port = _port.split("/")[0] + if self.__top is None: + self._loop = asyncio.new_event_loop() + self.__top = AsyncUtil.run_forever(self._loop, daemon=True) try: - self.__real: socket.socket = socket.create_connection((_host, _port), - timeout=self.client.get_config().get_idle_timeout()) - channel = ChannelDefault(self.__real, self.client) - - self._receiveTask = asyncio.create_task(self.receive(channel, self.__real, self._handshakeFuture)) - self.transfer_dataTask = asyncio.create_task(self.transfer_data(self.__real)) + loop = asyncio.get_running_loop() + reader = StreamReader(limit=self.get_config().get_read_buffer_size(), loop=loop) + protocol = StreamReaderProtocol(reader, loop=loop) + transport, _ = await loop.create_connection( + lambda: protocol, _host, _port) + writer = StreamWriter(transport, protocol, reader, loop) + self.__real: asyncio.Transport = transport + channel = ChannelDefault(TCPStreamIO(self.__real, reader, writer), self.client) + self._receiveTask = loop.create_task(self.receive(channel, self._handshakeFuture)) + self.transfer_dataTask = loop.create_task(self.transfer_data(reader)) await channel.send_connect(tcp_url, self.client.get_config().get_meta_map()) handshakeResult: ClientHandshakeResult = await self._handshakeFuture.get( @@ -63,68 +68,73 @@ class TcpAioClientConnector(ClientConnectorBase): await self.close() raise SocketDTimeoutException(f"Connection timeout: {self.client.get_config().get_link_url()} {e}") - async def transfer_data(self, _sock: socket.socket) -> None: - + async def transfer_data(self, _reader: asyncio.StreamReader) -> None: while True: try: if await self.is_closed(): break - _frame: Frame = await self.client.get_assistant().read(_sock) - if _frame is None: - break - await self.__message.put(_frame) - if _frame.flag() == Flags.Close: - break + _frame: Frame = await self.client.get_assistant().read(_reader) + if _frame is not None: + await self.__message.put(_frame) + if _frame.flag() == Flags.Close: + break + break except asyncio.CancelledError as e: break - except ConnectionAbortedError as e: + except Exception as e: break - await self.close() - async def receive(self, channel: ChannelDefault, _socket: socket.socket, + async def receive(self, channel: ChannelDefault, handshake_future: CompletableFuture) -> None: + loop = asyncio.get_running_loop() while True: try: + if self._on_receive_tasks: + task = self._on_receive_tasks[0] + if task.done(): + self._on_receive_tasks.pop(0) + self.__message_future = loop.create_task(self.__message.get()) if await self.is_closed(): break - frame: Frame = await self.__message.get() + frame: Frame = await self.__message_future + self.__message.task_done() if frame is not None: if frame.flag() == Flags.Connack: async def future(b, _e): handshake_future.accept(ClientHandshakeResult(channel, _e)) - await channel.on_open_future(future) - await self.client.get_processor().on_receive(channel, frame) + self._on_receive_tasks.append(loop.create_task(self.client.get_processor().on_receive(channel, frame))) if frame.flag() == Flags.Close: break + except asyncio.TimeoutError as e: + break + except asyncio.CancelledError as e: + break except SocketDConnectionException as e: handshake_future.accept(ClientHandshakeResult(channel, e)) break except Exception as e: self.client.get_processor().on_error(channel, e) break - await self.close() async def is_closed(self): - return getattr(self.__real, '_closed') + return self.__real.is_closing() async def close(self): log.debug("TcpAioClientConnector stop... ") if self.__real is None: return try: - self._receiveTask.cancel() - self.transfer_dataTask.cancel() self.__real.close() await self.stop() except Exception as e: log.debug(e) async def stop(self): + self.__message_future.cancel() + self.transfer_dataTask.cancel() + await asyncio.wait([self._receiveTask], timeout=5) if self.__top: - async with self.__top as _top: - if not _top.done(): - _top.cancel() - if self.__loop.is_running(): - self.__loop.stop() - log.debug(f"Stopping TCP::{self.__loop.is_running()}") + self.__top.set_result(None) + self._loop.stop() + self.__top = None diff --git a/python/socketd/socketd_websocket/impl/AIOWebSocketClientImpl.py b/python/socketd/socketd_websocket/impl/AIOWebSocketClientImpl.py index bf571930..5fa2a7e2 100644 --- a/python/socketd/socketd_websocket/impl/AIOWebSocketClientImpl.py +++ b/python/socketd/socketd_websocket/impl/AIOWebSocketClientImpl.py @@ -86,6 +86,10 @@ class AIOWebSocketClientImpl(WebSocketClientProtocol): if self.status_state == Flags.Close: return try: + if self.__on_receive_tasks: + task = self.__on_receive_tasks[0] + if task.done(): + self.__on_receive_tasks.pop(0) message = await self.recv() if message is None: # 结束握手 diff --git a/python/socketd/socketd_websocket/impl/AIOWebSocketServerImpl.py b/python/socketd/socketd_websocket/impl/AIOWebSocketServerImpl.py index 76c7e177..435ce4e1 100644 --- a/python/socketd/socketd_websocket/impl/AIOWebSocketServerImpl.py +++ b/python/socketd/socketd_websocket/impl/AIOWebSocketServerImpl.py @@ -65,6 +65,10 @@ class AIOWebSocketServerImpl(WebSocketServerProtocol, IWebSocketServer): if conn.closed: break try: + if tasks: + task = tasks[0] + if task.done(): + tasks.pop(0) message = await self.recv() # frame: Frame = self.ws_aio_server.get_assistant().read( # message) diff --git a/python/socketd/subs_root/__init__.py b/python/socketd/subs_root/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/python/socketd/subs_root/socketd/Channel.pyi b/python/socketd/subs_root/socketd/Channel.pyi deleted file mode 100644 index 2b620e4c..00000000 --- a/python/socketd/subs_root/socketd/Channel.pyi +++ /dev/null @@ -1,88 +0,0 @@ -from typing import Any -from abc import ABC, abstractmethod - - -class Channel(ABC): - @abstractmethod - def get_attachment(self, name: str) -> Any: - pass - - @abstractmethod - def set_attachment(self, name: str, val: Any) -> None: - pass - - @abstractmethod - def remove_acceptor(self, sid: str) -> None: - pass - - @abstractmethod - def is_valid(self) -> bool: - pass - - @abstractmethod - def is_closed(self) -> bool: - pass - - @abstractmethod - def get_config(self) -> 'Config': - pass - - @abstractmethod - def get_requests(self) -> int: - pass - - @abstractmethod - def set_handshake(self, handshake: 'HandshakeDefault') -> None: - pass - - @abstractmethod - def get_handshake(self) -> 'HandshakeDefault': - pass - - @abstractmethod - def get_remote_address(self) -> str: - pass - - @abstractmethod - def get_local_address(self) -> str: - pass - - @abstractmethod - def set_live_time(self) -> None: - pass - - @abstractmethod - def get_live_time(self) -> int: - pass - - @abstractmethod - def send_connect(self, url: str) -> None: - pass - - @abstractmethod - def send_connack(self, connect_message: 'Message') -> None: - pass - - @abstractmethod - def send_ping(self) -> None: - pass - - @abstractmethod - def send_pong(self) -> None: - pass - - @abstractmethod - def send_close(self) -> None: - pass - - @abstractmethod - def send(self, frame: 'Frame', acceptor: 'Acceptor') -> None: - pass - - @abstractmethod - def retrieve(self, frame: 'Frame') -> None: - pass - - @abstractmethod - def get_session(self) -> 'Session': - pass \ No newline at end of file diff --git a/python/socketd/subs_root/socketd/Client.pyi b/python/socketd/subs_root/socketd/Client.pyi deleted file mode 100644 index f00bfb22..00000000 --- a/python/socketd/subs_root/socketd/Client.pyi +++ /dev/null @@ -1,18 +0,0 @@ -#客户端 - -from typing import Callable - -from socketd.transport.client.ClientConfig import ClientConfig - - -class Client: - def heartbeatHandler(self, handler: Callable) -> 'Client':... - - def config(self, consumer: Callable[['ClientConfig'], None]) -> 'Client':... - - def process(self, processor: Callable) -> 'Client':... - - def listen(self, listener: Callable) -> 'Client':... - - def open(self):... - diff --git a/python/socketd/subs_root/socketd/ClientFactory.pyi b/python/socketd/subs_root/socketd/ClientFactory.pyi deleted file mode 100644 index ec14d70a..00000000 --- a/python/socketd/subs_root/socketd/ClientFactory.pyi +++ /dev/null @@ -1,14 +0,0 @@ -from socketd.transport.client.ClientConfig import ClientConfig - - -class ClientFactory: - - """ - 协议架构 - """ - def schema(self) -> str: ... - - """ - 创建客户端 - """ - def create_client(self, clientConfig: ClientConfig) -> 'IClient': ... diff --git a/python/socketd/subs_root/socketd/Server.pyi b/python/socketd/subs_root/socketd/Server.pyi deleted file mode 100644 index 45b17c95..00000000 --- a/python/socketd/subs_root/socketd/Server.pyi +++ /dev/null @@ -1,17 +0,0 @@ -from typing import Callable - -from socketd.transport.core.Listener import Listener -from socketd.transport.core.Processor import Processor -from socketd.transport.server.ServerConfig import ServerConfig - - -class Server: - def config(self, consumer: Callable[[ServerConfig], None]) -> Server:... - - def process(self, processor: Processor) -> Server:... - - def listen(self, listener: Listener) -> Server:... - - def start(self) -> None:... - - def stop(self) -> None:... diff --git a/python/socketd/subs_root/socketd/ServerFactory.pyi b/python/socketd/subs_root/socketd/ServerFactory.pyi deleted file mode 100644 index 11fa06e8..00000000 --- a/python/socketd/subs_root/socketd/ServerFactory.pyi +++ /dev/null @@ -1,9 +0,0 @@ -from socketd.transport.server.Server import Server -from socketd.transport.server.ServerConfig import ServerConfig - - -class ServerFactory: - - def schema(self) -> list[str]: ... - - def create_server(self, serverConfig: ServerConfig) -> Server: ... diff --git a/python/socketd/subs_root/socketd/__init__.py b/python/socketd/subs_root/socketd/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/python/socketd/test/TestCase01.py b/python/socketd/test/TestCase01.py index 3edbcb30..acbeac8a 100644 --- a/python/socketd/test/TestCase01.py +++ b/python/socketd/test/TestCase01.py @@ -23,8 +23,8 @@ from test.cases.TestCase16_openAnTry import TestCase16_openAnTry class TestCase01(unittest.TestCase): schemas = [ - "std:ws", - # "std:tcp" + # "std:ws", + "std:tcp" ] def __init__(self, *args, **kwargs): diff --git a/python/socketd/test/base_test/01_applictionTest.py b/python/socketd/test/base_test/01_applictionTest.py index ea90a8f3..a576d8b7 100644 --- a/python/socketd/test/base_test/01_applictionTest.py +++ b/python/socketd/test/base_test/01_applictionTest.py @@ -10,12 +10,12 @@ from socketd.transport.core.impl.LogConfig import log from socketd.transport.server.ServerConfig import ServerConfig from socketd.transport.core.entity.StringEntity import StringEntity from socketd.transport.server.Server import Server -from socketd.transport.stream.RequestStream import RequestStream -from socketd.transport.stream.SubscribeStream import SubscribeStream from test.modelu.SimpleListenerTest import SimpleListenerTest from test.uitls import calc_async_time -COUNT = 1000 + +# 超过一定数量会导致异步并发发生异常 +COUNT = 10000 log.remove() log.add(sys.stderr, level="INFO") @@ -23,28 +23,36 @@ log.add(sys.stderr, level="INFO") @calc_async_time async def application_test(): + loop = asyncio.get_running_loop() server: Server = SocketD.create_server(ServerConfig("ws").port(9999)) server_session: WebSocketServer = await server.listen( SimpleListenerTest()).start() await asyncio.sleep(1) client_session: Session = await SocketD.create_client("std:ws://127.0.0.1:9999").open() + log.info(f"client send count: {COUNT} ...") # 单向发送 @calc_async_time async def _send(): + tasks = [] for _ in range(COUNT): - await client_session.send("demo", StringEntity("test")) + tasks.append(loop.create_task(client_session.send("demo", StringEntity("test")))) + await asyncio.wait(tasks) await _send() # 发送并请求(且,等待一个答复) @calc_async_time async def _send_and_request(): + tasks = [] for _ in range(COUNT): - req: RequestStream = await client_session.send_and_request("demo", StringEntity("你好"), 100) - entity: Entity = await req.await_result() - - # await _send_and_request() + tasks.append(loop.create_task(client_session.send_and_request("demo", StringEntity("你好"), 100))) + data, _ = await asyncio.wait(tasks) + tasks.clear() + for req in data: + tasks.append(req.result().await_result()) + await asyncio.gather(*tasks) + await _send_and_request() # 发送并订阅(且,接收零个或多个答复流) @calc_async_time @@ -52,11 +60,15 @@ async def application_test(): async def send_and_subscribe_test(_entity: Entity): log.debug(f"c::subscribe::{_entity.data_as_string()} {_entity}") + tasks = [] for _ in range(COUNT): - req: SubscribeStream = await client_session.send_and_subscribe("demo", StringEntity("hi"), 100) - req.then_reply(send_and_subscribe_test) + tasks.append(loop.create_task(client_session.send_and_subscribe("demo", StringEntity("hi"), 100))) + data, _ = await asyncio.wait(tasks) + for req in data: + tasks.append(req.result().then_reply(send_and_subscribe_test)) await _send_and_subscribe() + await asyncio.sleep(3) # 关闭客户端会话 await client_session.close() diff --git a/python/socketd/test/modelu/SimpleListenerTest.py b/python/socketd/test/modelu/SimpleListenerTest.py index c2b4df58..98306766 100644 --- a/python/socketd/test/modelu/SimpleListenerTest.py +++ b/python/socketd/test/modelu/SimpleListenerTest.py @@ -36,7 +36,7 @@ class SimpleListenerTest(Listener, ABC): await session.reply_end(message, StringEntity("ok test")) await session.reply(message, StringEntity("reply")) - def on_close(self, session): + async def on_close(self, session): with self.close_counter: self.close_counter.set(self.close_counter.get() + 1) @@ -46,7 +46,7 @@ class SimpleListenerTest(Listener, ABC): def config_handler(config: ServerConfig | ClientConfig): config.is_thread(False) - config.idle_timeout(10000) + config.idle_timeout(1500) # config.set_logger_level("DEBUG") -- Gitee From aaf11bda43a42b404023cd137b24b41068980ace Mon Sep 17 00:00:00 2001 From: bai <1145000687@qq.com> Date: Thu, 21 Mar 2024 18:56:40 +0800 Subject: [PATCH 3/6] =?UTF-8?q?py:=20=E4=BF=AE=E6=94=B9=E8=B6=85=E6=97=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- python/socketd/socketd_aio_tcp/TCPAIOServer.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/python/socketd/socketd_aio_tcp/TCPAIOServer.py b/python/socketd/socketd_aio_tcp/TCPAIOServer.py index 76d0fc88..c90b3edd 100644 --- a/python/socketd/socketd_aio_tcp/TCPAIOServer.py +++ b/python/socketd/socketd_aio_tcp/TCPAIOServer.py @@ -69,11 +69,12 @@ class TCPAIOServer(ServerBase, ChannelSupporter): loop=loop) return protocol _sock: socket.socket = socket.socket(socket.AF_INET) + _sock.bind(("127.0.0.1" if not self.get_config().get_host() else self.get_config().get_host(), + self.get_config().get_port())) _sock.settimeout(self.get_config().get_idle_timeout() / 1000) # 生成一个服务器 self._server: asyncio.Server = await loop.create_server(factory, - self.get_config().get_host(), - self.get_config().get_port(), + sock=_sock, start_serving=True, ) # self._server_forever_future = loop.create_task(self._server.serve_forever()) -- Gitee From 48eb2f1ca0704929bc2cc6a27743af0c7f92db2a Mon Sep 17 00:00:00 2001 From: bai <1145000687@qq.com> Date: Fri, 22 Mar 2024 12:27:29 +0800 Subject: [PATCH 4/6] =?UTF-8?q?py:=20=E6=96=B0=E5=A2=9ETCP=E6=B5=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../socketd/socketd/transport/core/Channel.py | 2 +- .../socketd/socketd_aio_tcp/TCPAIOServer.py | 34 ++++++++----------- python/socketd/socketd_aio_tcp/TCPStreamIO.py | 24 +++++++++++++ .../socketd_aio_tcp/TcpAIOChannelAssistant.py | 12 +++---- .../socketd_aio_tcp/TcpAioClientConnector.py | 25 +++++++++----- 5 files changed, 61 insertions(+), 36 deletions(-) create mode 100644 python/socketd/socketd_aio_tcp/TCPStreamIO.py diff --git a/python/socketd/socketd/transport/core/Channel.py b/python/socketd/socketd/transport/core/Channel.py index 1fc24328..9c990619 100644 --- a/python/socketd/socketd/transport/core/Channel.py +++ b/python/socketd/socketd/transport/core/Channel.py @@ -30,7 +30,7 @@ class Channel(ABC): ... @abstractmethod - def is_closed(self) -> bool: + def is_closed(self) -> int: ... @abstractmethod diff --git a/python/socketd/socketd_aio_tcp/TCPAIOServer.py b/python/socketd/socketd_aio_tcp/TCPAIOServer.py index c90b3edd..a82aee7c 100644 --- a/python/socketd/socketd_aio_tcp/TCPAIOServer.py +++ b/python/socketd/socketd_aio_tcp/TCPAIOServer.py @@ -1,7 +1,6 @@ import asyncio import socket from asyncio import StreamReader, StreamReaderProtocol, StreamWriter -from concurrent.futures import Future from typing import Optional, List from socketd.exception.SocketDExecption import SocketDTimeoutException @@ -23,16 +22,17 @@ from .TcpAIOChannelAssistant import TcpAIOChannelAssistant class TCPAIOServer(ServerBase, ChannelSupporter): def __init__(self, config: ServerConfig): - super().__init__(config, TcpAIOChannelAssistant(config, asyncio.get_running_loop())) + self._loop: asyncio.AbstractEventLoop = asyncio.get_running_loop() + super().__init__(config, TcpAIOChannelAssistant(config, self._loop)) self._server: Optional[asyncio.Server] = None + self._sock: Optional[socket.socket] = None self._is_close: AtomicRefer = AtomicRefer(False) - self._server_forever_future: Optional[asyncio.Future | Future] = None self._on_receive_tasks: List[asyncio.Task] = [] # 服务器的回调函数 async def server_forever(self, reader: StreamReader, writer: StreamWriter): - channel = ChannelDefault(TCPStreamIO(self._server, reader, writer), self) + channel = ChannelDefault(TCPStreamIO(self._sock, reader, writer), self) while True: try: if self._on_receive_tasks: @@ -44,7 +44,7 @@ class TCPAIOServer(ServerBase, ChannelSupporter): break frame: Frame = await self.get_assistant().read(reader) if frame is not None: - self._on_receive_tasks.append(asyncio.create_task(self.get_processor().on_receive(channel, frame))) + self._on_receive_tasks.append(self._loop.create_task(self.get_processor().on_receive(channel, frame))) if frame.flag() == Flags.Close: """客户端主动关闭""" log.debug("{sessionId} 主动退出", @@ -68,29 +68,25 @@ class TCPAIOServer(ServerBase, ChannelSupporter): protocol = StreamReaderProtocol(reader, self.server_forever, loop=loop) return protocol - _sock: socket.socket = socket.socket(socket.AF_INET) - _sock.bind(("127.0.0.1" if not self.get_config().get_host() else self.get_config().get_host(), - self.get_config().get_port())) - _sock.settimeout(self.get_config().get_idle_timeout() / 1000) + + self._sock: socket.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._sock.bind(("127.0.0.1" if not self.get_config().get_host() else self.get_config().get_host(), + self.get_config().get_port())) + self._sock.settimeout(self.get_config().get_idle_timeout() / 1000) # 生成一个服务器 self._server: asyncio.Server = await loop.create_server(factory, - sock=_sock, + sock=self._sock, start_serving=True, ) - # self._server_forever_future = loop.create_task(self._server.serve_forever()) return self._server - async def close_wait(self): - try: - if self._server_forever_future: - await asyncio.wait_for(self._server_forever_future, timeout=10) - except asyncio.TimeoutError: - pass - async def stop(self): log.info("TcpAioServer stop...") # 等等执行完成 await self._is_close.set(True) self._server.close() + if self._sock is not None and not getattr(self._sock, "_closed"): + self._sock.close() # await self._server.wait_closed() - await self.close_wait() + + diff --git a/python/socketd/socketd_aio_tcp/TCPStreamIO.py b/python/socketd/socketd_aio_tcp/TCPStreamIO.py new file mode 100644 index 00000000..d1feda44 --- /dev/null +++ b/python/socketd/socketd_aio_tcp/TCPStreamIO.py @@ -0,0 +1,24 @@ +import socket +from asyncio import StreamReader, StreamWriter + + +class TCPStreamIO: + + __slots__ = ["_sock", "_reader", "_writer"] + + def __init__(self, server: socket.socket, reader: StreamReader, writer: StreamWriter): + self._sock: socket.socket = server + self._reader = reader + self._writer = writer + + @property + def sock(self): + return self._sock + + @property + def reader(self): + return self._reader + + @property + def writer(self): + return self._writer diff --git a/python/socketd/socketd_aio_tcp/TcpAIOChannelAssistant.py b/python/socketd/socketd_aio_tcp/TcpAIOChannelAssistant.py index 8ea8c9a6..8611d80f 100644 --- a/python/socketd/socketd_aio_tcp/TcpAIOChannelAssistant.py +++ b/python/socketd/socketd_aio_tcp/TcpAIOChannelAssistant.py @@ -10,7 +10,6 @@ from socketd.transport.core.codec import bytes_to_int32 from socketd.transport.core.codec.Buffer import Buffer from socketd.transport.core.codec.ByteBufferCodecReader import ByteBufferCodecReader from socketd.transport.core.codec.ByteBufferCodecWriter import ByteBufferCodecWriter -from socketd.transport.core.impl.LogConfig import log from socketd_aio_tcp.TCPStreamIO import TCPStreamIO @@ -32,21 +31,18 @@ class TcpAIOChannelAssistant(ChannelAssistant): writer.close() def is_valid(self, stream_io: TCPStreamIO) -> bool: - if isinstance(stream_io.server, asyncio.Transport): - return not stream_io.server.is_closing() - elif isinstance(stream_io.server, asyncio.Server): - return stream_io.server.is_serving() + return not getattr(stream_io.sock, "_closed") async def close(self, stream_io: TCPStreamIO) -> None: + # stream_io.sock.close() stream_io.writer.close() - stream_io.server.close() def get_remote_address(self, stream_io: TCPStreamIO) -> str: - target: socket.socket = stream_io.writer.get_extra_info('socket') + target: socket.socket = stream_io.sock return target.getpeername() def get_local_address(self, stream_io: TCPStreamIO) -> str: - target: socket.socket = stream_io.writer.get_extra_info('socket') + target: socket.socket = stream_io.sock return target.getsockname() async def read(self, reader: StreamReader) -> Frame | None: diff --git a/python/socketd/socketd_aio_tcp/TcpAioClientConnector.py b/python/socketd/socketd_aio_tcp/TcpAioClientConnector.py index d8920b4e..99df5204 100644 --- a/python/socketd/socketd_aio_tcp/TcpAioClientConnector.py +++ b/python/socketd/socketd_aio_tcp/TcpAioClientConnector.py @@ -1,4 +1,5 @@ import asyncio +import socket from asyncio import StreamReader, StreamReaderProtocol, StreamWriter from typing import Optional, AsyncGenerator, List @@ -20,6 +21,7 @@ class TcpAioClientConnector(ClientConnectorBase): def __init__(self, client: ClientInternal): super().__init__(client) + self._sock: Optional[socket.socket] = None self.__top: Optional[asyncio.Future] = None self.__real: Optional[asyncio.Transport] = None self.__message: asyncio.Queue = asyncio.Queue() @@ -33,21 +35,26 @@ class TcpAioClientConnector(ClientConnectorBase): async def connect(self): # 处理自定义架构的影响 + loop = asyncio.get_running_loop() tcp_url = self.client.get_config().get_url().replace("std:", "").replace("-python", "") _sch, _host, _port = tcp_url.replace("//", "").split(":") - _port = _port.split("/")[0] + _port = int(_port.split("/")[0]) if self.__top is None: self._loop = asyncio.new_event_loop() self.__top = AsyncUtil.run_forever(self._loop, daemon=True) try: - loop = asyncio.get_running_loop() + reader = StreamReader(limit=self.get_config().get_read_buffer_size(), loop=loop) protocol = StreamReaderProtocol(reader, loop=loop) + self._sock: socket.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._sock.connect((_host, _port)) transport, _ = await loop.create_connection( - lambda: protocol, _host, _port) + lambda: protocol, sock=self._sock) writer = StreamWriter(transport, protocol, reader, loop) + self.__real: asyncio.Transport = transport - channel = ChannelDefault(TCPStreamIO(self.__real, reader, writer), self.client) + + channel = ChannelDefault(TCPStreamIO(self._sock, reader, writer), self.client) self._receiveTask = loop.create_task(self.receive(channel, self._handshakeFuture)) self.transfer_dataTask = loop.create_task(self.transfer_data(reader)) @@ -78,11 +85,8 @@ class TcpAioClientConnector(ClientConnectorBase): await self.__message.put(_frame) if _frame.flag() == Flags.Close: break - break except asyncio.CancelledError as e: break - except Exception as e: - break async def receive(self, channel: ChannelDefault, handshake_future: CompletableFuture) -> None: @@ -126,6 +130,8 @@ class TcpAioClientConnector(ClientConnectorBase): return try: self.__real.close() + if self._sock is not None and not getattr(self._sock, "_closed"): + self._sock.close() await self.stop() except Exception as e: log.debug(e) @@ -133,7 +139,10 @@ class TcpAioClientConnector(ClientConnectorBase): async def stop(self): self.__message_future.cancel() self.transfer_dataTask.cancel() - await asyncio.wait([self._receiveTask], timeout=5) + try: + await asyncio.wait([self._receiveTask], timeout=5) + except asyncio.CancelledError: + log.debug("_receiveTask Cancelling") if self.__top: self.__top.set_result(None) self._loop.stop() -- Gitee From b82f01fb4d9141d00f40194a437127ea240f64bb Mon Sep 17 00:00:00 2001 From: bai <1145000687@qq.com> Date: Mon, 25 Mar 2024 16:45:23 +0800 Subject: [PATCH 5/6] =?UTF-8?q?py:=20=E4=BF=AE=E6=94=B9=E7=B1=BB=E5=9E=8B?= =?UTF-8?q?=E6=A0=87=E6=B3=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- python/socketd/socketd/transport/core/codec/CodecDefault.py | 2 +- .../socketd/socketd/transport/core/entity/EntityDefault.py | 4 ++-- python/socketd/socketd/transport/core/entity/FileEntity.py | 6 +++--- python/socketd/socketd_aio_tcp/TcpAioProvider.py | 2 +- .../socketd_websocket/impl/AIOWebSocketClientImpl.py | 2 +- 5 files changed, 8 insertions(+), 8 deletions(-) diff --git a/python/socketd/socketd/transport/core/codec/CodecDefault.py b/python/socketd/socketd/transport/core/codec/CodecDefault.py index 7f8e91b1..f6118835 100644 --- a/python/socketd/socketd/transport/core/codec/CodecDefault.py +++ b/python/socketd/socketd/transport/core/codec/CodecDefault.py @@ -90,7 +90,6 @@ class CodecDefault(Codec): sid = self.decodeString(_reader, by, Constants.MAX_SIZE_SID) event = self.decodeString(_reader, by, Constants.MAX_SIZE_EVENT) metaString = self.decodeString(_reader, by, Constants.MAX_SIZE_META_STRING) - # 2. decode body dataRealSize = len0 - _reader.position() data: Optional[bytearray] = None @@ -106,6 +105,7 @@ class CodecDefault(Codec): message = MessageBuilder().flag(Flags.of(flag)).sid(sid).event(event).entity( EntityDefault().meta_string_set(metaString).data_set(data) ).build() + by.close() _reader.close() return Frame(message.flag(), message) diff --git a/python/socketd/socketd/transport/core/entity/EntityDefault.py b/python/socketd/socketd/transport/core/entity/EntityDefault.py index 9241e54a..ff8c2688 100644 --- a/python/socketd/socketd/transport/core/entity/EntityDefault.py +++ b/python/socketd/socketd/transport/core/entity/EntityDefault.py @@ -1,7 +1,7 @@ import os from io import BytesIO, TextIOWrapper, BufferedReader -from typing import Any, Optional +from typing import Any, Optional, BinaryIO from socketd.transport.core.Entity import Entity from socketd.transport.core.Costants import Constants @@ -93,7 +93,7 @@ class EntityDefault(Entity): def del_meta(self, name:str): self.meta_del(name) - def data_set(self, data: bytes | bytearray | memoryview | BytesIO | BufferedReader): + def data_set(self, data: bytes | bytearray | memoryview | BytesIO | BinaryIO): _type = type(data) if _type == BytesIO: self._data = data diff --git a/python/socketd/socketd/transport/core/entity/FileEntity.py b/python/socketd/socketd/transport/core/entity/FileEntity.py index e81258cb..2c0c0df0 100644 --- a/python/socketd/socketd/transport/core/entity/FileEntity.py +++ b/python/socketd/socketd/transport/core/entity/FileEntity.py @@ -1,4 +1,4 @@ -from io import BufferedReader +from io import BufferedReader, BytesIO from typing import BinaryIO from socketd.exception.SocketDExecption import SocketDCodecException @@ -8,9 +8,9 @@ from socketd.transport.core.entity.EntityDefault import EntityDefault class FileEntity(EntityDefault): - def __init__(self, file: BufferedReader, filename: str): + def __init__(self, file: BinaryIO, filename: str): super().__init__() - self._file: BufferedReader = file + self._file: BinaryIO = file self.data_set(file) self.meta_put(EntityMetas.META_DATA_DISPOSITION_FILENAME, filename) diff --git a/python/socketd/socketd_aio_tcp/TcpAioProvider.py b/python/socketd/socketd_aio_tcp/TcpAioProvider.py index 7385212d..0a38d2c0 100644 --- a/python/socketd/socketd_aio_tcp/TcpAioProvider.py +++ b/python/socketd/socketd_aio_tcp/TcpAioProvider.py @@ -10,7 +10,7 @@ from socketd_aio_tcp import TCPAIOServer, TcpAioClient class TcpAioProvider(ClientProvider, ServerProvider): def schema(self) -> list[str]: - return ["tcp", "tcp-python", "std:tcp"] + return ["tcp", "tcp-python", "std:tcp", "sd:tcp"] def create_server(self, serverConfig: ServerConfig) -> Server: return TCPAIOServer(serverConfig) diff --git a/python/socketd/socketd_websocket/impl/AIOWebSocketClientImpl.py b/python/socketd/socketd_websocket/impl/AIOWebSocketClientImpl.py index 5fa2a7e2..c497877a 100644 --- a/python/socketd/socketd_websocket/impl/AIOWebSocketClientImpl.py +++ b/python/socketd/socketd_websocket/impl/AIOWebSocketClientImpl.py @@ -126,7 +126,7 @@ class AIOWebSocketClientImpl(WebSocketClientProtocol): self.on_error(e) async def on_close(self): - self.client.get_processor().on_close(self.channel) + await self.client.get_processor().on_close(self.channel) if self.handshake_future is not None: await asyncio.wait(self.__on_receive_tasks, timeout=10) await asyncio.wait([self._handler_future], timeout=10) -- Gitee From f70e806543d8097131acbd94980918d6833c7bd0 Mon Sep 17 00:00:00 2001 From: bai <1145000687@qq.com> Date: Tue, 26 Mar 2024 12:02:40 +0800 Subject: [PATCH 6/6] =?UTF-8?q?py:=20tcp=E5=85=B3=E9=97=ADsock?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- python/socketd/socketd_aio_tcp/TcpAIOChannelAssistant.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/socketd/socketd_aio_tcp/TcpAIOChannelAssistant.py b/python/socketd/socketd_aio_tcp/TcpAIOChannelAssistant.py index 8611d80f..ccdb7dc2 100644 --- a/python/socketd/socketd_aio_tcp/TcpAIOChannelAssistant.py +++ b/python/socketd/socketd_aio_tcp/TcpAIOChannelAssistant.py @@ -34,7 +34,7 @@ class TcpAIOChannelAssistant(ChannelAssistant): return not getattr(stream_io.sock, "_closed") async def close(self, stream_io: TCPStreamIO) -> None: - # stream_io.sock.close() + stream_io.sock.close() stream_io.writer.close() def get_remote_address(self, stream_io: TCPStreamIO) -> str: -- Gitee