diff --git a/python/socketd/setup.py b/python/socketd/setup.py index 98298eec7205049cb81f2b0400b39384febc2200..66a5eaf9e3cffe3249d55159250a842daf42b335 100644 --- a/python/socketd/setup.py +++ b/python/socketd/setup.py @@ -1,10 +1,14 @@ #! /usr/bin/env python # -*- coding: utf-8 -*_ -from distutils.core import setup -import setuptools +import sys + +from setuptools import find_packages with open("README.md", "r", encoding="utf-8") as fh: long_description = fh.read() - +if sys.version_info <= (3, 10): + from distutils.core import setup +elif sys.version_info >= (3, 12): + from setuptools import setup setup( name='socketD', # 包的名字 version='0.0.1', # 版本号 @@ -12,7 +16,7 @@ setup( author='bai,noear', # 作者 author_email='loserbai@foxmail.com,9979331@qq.com', # 你的邮箱** url='https://socketd.noear.org/', # 可以写github上的地址,或者其他地址 - packages=setuptools.find_packages(exclude=['test']), # 包内不需要引用的文件夹 + packages=find_packages(exclude=['test']), # 包内不需要引用的文件夹 # 依赖包 install_requires=[ @@ -30,5 +34,5 @@ setup( 'Topic :: Software Development :: Libraries' ], zip_safe=True, - python_requires='>=3.10', + python_requires='>=3.10', # 建议使用3.12及以上 ) diff --git a/python/socketd/socketd/transport/core/config/ConfigBase.py b/python/socketd/socketd/transport/core/config/ConfigBase.py index 6c2f1fac28d71f08d1a789de5499245463baa0a2..8d4ed82c9ead54e2551d4966f2f226961db8b917 100644 --- a/python/socketd/socketd/transport/core/config/ConfigBase.py +++ b/python/socketd/socketd/transport/core/config/ConfigBase.py @@ -37,7 +37,7 @@ class ConfigBase(Config, ABC): self._max_udp_size = 2048 # ws最大传输大小 # socket.d 分片最小16m - self._ws_max_size = 2 ** 20 * 17 + self._ws_max_size = (2 << 19) * 17 self.__is_thread = False self.__logger_level = "INFO" @@ -69,10 +69,10 @@ class ConfigBase(Config, ABC): self._fragmentHandler = fragmentHandler return self - def get_id_generator(self) -> Callable[[None], Any]: + def get_id_generator(self) -> Callable[[], Any]: return self._idGenerator - def id_generator(self, _idGenerator: Callable[[None], Any]): + def id_generator(self, _idGenerator: Callable[[], Any]): # assert _idGenerator is None self._idGenerator = _idGenerator return self diff --git a/python/socketd/socketd/transport/core/impl/ProcessorDefault.py b/python/socketd/socketd/transport/core/impl/ProcessorDefault.py index b11d377d90943527d949bad38cd9f654f18e6f71..ff59c98d7dfa26431adce148ead942a1c00a0acf 100644 --- a/python/socketd/socketd/transport/core/impl/ProcessorDefault.py +++ b/python/socketd/socketd/transport/core/impl/ProcessorDefault.py @@ -134,7 +134,7 @@ class ProcessorDefault(Processor, ABC): async def on_message(self, channel: ChannelInternal, message: Message): AsyncUtil.thread_loop(self.listener.on_message( channel.get_session(), message), - thread=channel.get_config().get_executor()) + pool=channel.get_config().get_executor()) def on_close(self, channel: ChannelInternal): self.listener.on_close(channel.get_session()) diff --git a/python/socketd/socketd/transport/stream/RequestStream.py b/python/socketd/socketd/transport/stream/RequestStream.py index b617e818d573e71215539e51dd659a9156d4e1e4..a738ff9ba11fe0978349f6cf3b037ff4ab05ce36 100644 --- a/python/socketd/socketd/transport/stream/RequestStream.py +++ b/python/socketd/socketd/transport/stream/RequestStream.py @@ -25,11 +25,13 @@ class RequestStream(StreamBase): except Exception as _e: raise SocketDException(f"Request failed, sid= sid={self.get_sid()} {str(_e)}") - def await_result(self): - return self.__await__() + async def await_result(self): + if self._future.done(): + return self._future.get_result() + return await self.__await__() async def on_reply(self, message: Reply): - return self._future.set_result(message) + return await self._future.set_result(message) def then_reply(self, onReply: Callable[[Reply], None]): self._future.then_callback(onReply) diff --git a/python/socketd/socketd/transport/stream/StreamBase.py b/python/socketd/socketd/transport/stream/StreamBase.py index 4d93b964cb5b3e706872469cd514bcd3585ca45f..847cd38e680d8d9dfb06f87222f1c7cc93a34231 100644 --- a/python/socketd/socketd/transport/stream/StreamBase.py +++ b/python/socketd/socketd/transport/stream/StreamBase.py @@ -11,6 +11,7 @@ class StreamBase(StreamInternal, ABC): """流接收器基类""" def __init__(self, sid: str, demands: int, timeout: int): + self.__onError: Callable[[Exception], None] = None self.__sid = sid self.__timeout = timeout self.__demands = demands diff --git a/python/socketd/socketd/transport/stream/StreamMangerDefault.py b/python/socketd/socketd/transport/stream/StreamMangerDefault.py index f7db11f9010bc26b0d527b233633440040b6d24a..4190c13b6f023f7bce793d6a4e05ab2547837980 100644 --- a/python/socketd/socketd/transport/stream/StreamMangerDefault.py +++ b/python/socketd/socketd/transport/stream/StreamMangerDefault.py @@ -1,6 +1,6 @@ import logging -from socketd.transport.stream.StreamManger import StreamManger +from socketd.transport.stream.StreamManger import StreamManger, StreamInternal logger = logging.getLogger(__name__) @@ -8,7 +8,7 @@ logger = logging.getLogger(__name__) class StreamMangerDefault(StreamManger): def __init__(self, config): self.config = config - self.stream_map = {} + self.stream_map:[str, StreamInternal] = {} def get_stream(self, sid): return self.stream_map.get(sid) @@ -26,7 +26,7 @@ class StreamMangerDefault(StreamManger): stream.insurance_start(self, stream_timeout) def remove_stream(self, sid): - stream = self.stream_map.pop(sid, None) + stream: StreamInternal = self.stream_map.pop(sid, None) if stream: stream.insurance_cancel() logger.debug(f"{self.config.get_role_name()} stream removed, sid={sid}") diff --git a/python/socketd/socketd/transport/utils/CompletableFuture.py b/python/socketd/socketd/transport/utils/CompletableFuture.py index 6ba4e3e430179b8d3ae3a2ae6f1ed6aeac60a125..3fbf9cb0241dbc242faaf0ae9974b99f59d2d0d4 100644 --- a/python/socketd/socketd/transport/utils/CompletableFuture.py +++ b/python/socketd/socketd/transport/utils/CompletableFuture.py @@ -18,10 +18,10 @@ class CompletableFuture(Generic[T]): self._lock = Lock() def get(self, timeout): - async def _get(): - await asyncio.wait_for(self._future, timeout) - return self._future.result() - + with self._lock: + async def _get(): + await asyncio.wait_for(self._future, timeout) + return self._future.result() return _get() def accept(self, result: T): @@ -46,9 +46,10 @@ class CompletableFuture(Generic[T]): except Exception as e: await callback(self._future.result(), e) - def set_result(self, t: T): + async def set_result(self, t: T): with self._lock: self._future.set_result(t) + await self._future def set_e(self, e: Exception): self._future.set_exception(e) diff --git a/python/socketd/socketd_websocket/impl/AIOWebSocketClientImpl.py b/python/socketd/socketd_websocket/impl/AIOWebSocketClientImpl.py index a2fa983e5baaac3c22b7c33e03a52edd30e3fa99..bb628786dd6a3f4de28342ef2ca99f61af32ae40 100644 --- a/python/socketd/socketd_websocket/impl/AIOWebSocketClientImpl.py +++ b/python/socketd/socketd_websocket/impl/AIOWebSocketClientImpl.py @@ -8,7 +8,7 @@ from websockets.uri import WebSocketURI from socketd.exception.SocketdExecption import SocketDConnectionException from socketd.transport.client.ClientHandshakeResult import ClientHandshakeResult from socketd.transport.core.impl.ChannelDefault import ChannelDefault -from websockets import WebSocketClientProtocol, Origin, Subprotocol, HeadersLike +from websockets import WebSocketClientProtocol, Origin, Subprotocol, HeadersLike, ConnectionClosedOK from socketd.transport.core.Costants import Flag from socketd.transport.core.Frame import Frame @@ -96,8 +96,8 @@ class AIOWebSocketClientImpl(WebSocketClientProtocol): return # frame: Frame = self.client.get_assistant().read(message) frame: Frame = await _loop.run_in_executor(None, - lambda _message: self.client.get_assistant().read(_message), - message) + lambda _message: self.client.get_assistant().read(_message), + message) if frame is not None: self.status_state = frame.get_flag() if frame.get_flag() == Flag.Connack: @@ -122,9 +122,10 @@ class AIOWebSocketClientImpl(WebSocketClientProtocol): except SocketDConnectionException as s: self.handshake_future.accept(ClientHandshakeResult(self.channel, s)) logger.warning(s) + except ConnectionClosedOK as e: + pass except Exception as e: self.on_error(e) - raise e def on_close(self): self.client.get_processor().on_close(self.channel) diff --git a/python/socketd/socketd_websocket/impl/AIOWebSocketServerImpl.py b/python/socketd/socketd_websocket/impl/AIOWebSocketServerImpl.py index 53f57124729b0b8fda098d55efce045888c9283a..6c5932144ae11cdd7c9806f96eafd411ed1fab7d 100644 --- a/python/socketd/socketd_websocket/impl/AIOWebSocketServerImpl.py +++ b/python/socketd/socketd_websocket/impl/AIOWebSocketServerImpl.py @@ -2,7 +2,7 @@ import asyncio from threading import Thread from typing import Optional, Union -from websockets import ConnectionClosedError +from websockets import ConnectionClosedError, ConnectionClosedOK from websockets.server import WebSocketServer, WebSocketServerProtocol from socketd.transport.core.Channel import Channel @@ -85,6 +85,9 @@ class AIOWebSocketServerImpl(WebSocketServerProtocol, IWebSocketServer): except asyncio.CancelledError as c: logger.warning(c) break + except ConnectionClosedOK as e: + # received 1000 (OK); then sent 1000 (OK) 或者 1001 成功直接忽略 + break except ConnectionClosedError as e: # 客户端异常关闭 log.error(e) diff --git a/python/socketd/test/TestCase01.py b/python/socketd/test/TestCase01.py index 8c9499e7a750d734f468eb787f5da9df70fb45a6..fb74cca7e5b5331ed83547a33a0d8d1b4562f265 100644 --- a/python/socketd/test/TestCase01.py +++ b/python/socketd/test/TestCase01.py @@ -18,6 +18,7 @@ from test.cases.TestCase12_sendAndSubscribe2rep import TestCase12_sendAndSubscri from test.cases.TestCase13_ssl import TestCase13_ssl from test.cases.TestCase14_timeout import TestCase14_timeout from test.cases.TestCase15_bigString import TestCase15_bigString +from test.cases.TestCase15_connect import TestCase15_connect class TestCase01(unittest.TestCase): @@ -171,6 +172,16 @@ class TestCase01(unittest.TestCase): def test_Case15_bigString(self): for i in range(len(TestCase01.schemas)): t = TestCase15_bigString(TestCase01.schemas[i], 9100 + i) + try: + t.start() + t.stop() + except Exception as e: + t.on_error() + raise e + + def test_Case15_connect(self): + for i in range(len(TestCase01.schemas)): + t = TestCase15_connect(TestCase01.schemas[i], 9100 + i) try: t.start() t.stop() diff --git a/python/socketd/test/cases/TestCase05_file.py b/python/socketd/test/cases/TestCase05_file.py index ece6166e74490a119ac6adadb923d4f39459574a..e796d945bc72e7106bd7d73b6172e75ae5862114 100644 --- a/python/socketd/test/cases/TestCase05_file.py +++ b/python/socketd/test/cases/TestCase05_file.py @@ -22,7 +22,7 @@ from socketd.transport.server.Server import Server def config_handler(config: ServerConfig | ClientConfig) -> ServerConfig | ClientConfig: config.set_is_thread(False) - config.set_ws_max_size(2 ** 20 * 17) + config.set_ws_max_size((2 << 19) * 17) return config.id_generator(uuid.uuid4) diff --git a/python/socketd/test/cases/TestCase11_sendAndRequest2rep.py b/python/socketd/test/cases/TestCase11_sendAndRequest2rep.py index 9b24d68e31c1054ce4d4b6ccc374c3b76760a4ab..35b31a4148ae47bcf5b9cba7df6b4bb24653a7cb 100644 --- a/python/socketd/test/cases/TestCase11_sendAndRequest2rep.py +++ b/python/socketd/test/cases/TestCase11_sendAndRequest2rep.py @@ -1,6 +1,7 @@ import asyncio import uuid +from threading import current_thread from socketd.SocketD import SocketD from socketd.transport.client.ClientConfig import ClientConfig from socketd.transport.core.Message import Message @@ -23,7 +24,7 @@ def config_handler(config: ServerConfig | ClientConfig) -> ServerConfig | Client config.set_is_thread(True) config.set_idle_timeout(10) config.set_logger_level("DEBUG") - config.id_generator(uuid.uuid4) + config.id_generator(lambda: str(uuid.uuid4())) return config @@ -43,9 +44,11 @@ class SimpleListenerTest(Listener): logger.info(f"server::{message.get_data_as_string()} :: {message}") if message.is_request(): req: RequestStream = await session.send_and_request("demo", StringEntity("今天不好"), 100) - # todo 开启单独线程后,在open确认连接后,会停留10s(线程可见性不佳),但是可以解决线程阻塞问题 - await asyncio.sleep(1) + # todo await_result会进行强阻塞线程,导致无法监听到其他线程修改的值,线程可见性,这里就停止0.1等待 + await asyncio.sleep(0.1) + logger.debug(f"开始等待::s::{current_thread().name} eventLoop: {id(asyncio.get_running_loop())}") entity = await req.await_result() + logger.debug(f"等待结束::s::{current_thread().name} eventLoop: {id(asyncio.get_running_loop())}") await session.reply_end(message, entity) logger.info(f"server::res::: {entity}") with self.message_counter: @@ -79,6 +82,7 @@ class ClientListenerTest(Listener): async def on_message(self, session: Session, message: Message): logger.info(f"client: {message} {message.get_data_as_string()}") if message.is_request(): + logger.debug(f"运行::c:: {current_thread().name} eventLoop: {id(asyncio.get_running_loop())}") await session.reply_end(message, StringEntity("很好")) @@ -103,14 +107,17 @@ class TestCase11_sendAndRequest2rep(BaseTestCase): .listen(ClientListenerTest()) \ .config(config_handler) self.client_session: Session = await self.client.open() - await asyncio.sleep(1) + # await asyncio.sleep(1) req: RequestStream = await self.client_session.send_and_request("demo", StringEntity("你好"), 100) + logger.debug(f"开始等待 main {current_thread().name} eventLoop: {id(asyncio.get_running_loop())}") entity = await req.await_result() + logger.debug(f"等待结束 main {current_thread().name} eventLoop: {id(asyncio.get_running_loop())}") logger.info(f"c: res{entity} {entity.get_data_as_string()}") await asyncio.sleep(1) def start(self): super().start() + # self.loop.set_debug(True) self.loop.run_until_complete(self._start()) async def _stop(self): diff --git a/python/socketd/test/cases/TestCase12_sendAndSubscribe2rep.py b/python/socketd/test/cases/TestCase12_sendAndSubscribe2rep.py index 688e358c3bba932f9638ad2357eca5408cdab130..7b1628fedd73249f3f52050e8b173c309c690f9c 100644 --- a/python/socketd/test/cases/TestCase12_sendAndSubscribe2rep.py +++ b/python/socketd/test/cases/TestCase12_sendAndSubscribe2rep.py @@ -109,7 +109,7 @@ class TestCase12_sendAndSubscribe2rep(BaseTestCase): req: SubscribeStream = await self.client_session.send_and_subscribe("demo", StringEntity("hi"), 100) req.then_reply(send_and_subscribe_test) - await asyncio.sleep(1) + await asyncio.sleep(3) def start(self): super().start() diff --git a/python/socketd/test/cases/TestCase15_connect.py b/python/socketd/test/cases/TestCase15_connect.py new file mode 100644 index 0000000000000000000000000000000000000000..2f193b61f0ab5bb344e8712235d2d8cb6d58ade1 --- /dev/null +++ b/python/socketd/test/cases/TestCase15_connect.py @@ -0,0 +1,70 @@ +import asyncio + +from socketd.SocketD import SocketD +from test.modelu.BaseTestCase import BaseTestCase + +from websockets.legacy.server import WebSocketServer + +from socketd.transport.core.Session import Session +from socketd.transport.server.ServerConfig import ServerConfig +from socketd.transport.core.entity.StringEntity import StringEntity +from socketd.transport.server.Server import Server +from test.modelu.SimpleListenerTest import SimpleListenerTest, config_handler +from loguru import logger + + +class TestCase15_connect(BaseTestCase): + + def __init__(self, schema, port): + super().__init__(schema, port) + self.server: Server = None + self.server_session: WebSocketServer = None + self.loop = asyncio.get_event_loop() + self.client_session_queue = asyncio.Queue() + + async def _start(self): + s = SimpleListenerTest() + self.server: Server = SocketD.create_server(ServerConfig(self.schema).set_port(self.port)) + self.server_session: WebSocketServer = await self.server \ + .config(config_handler) \ + .listen(s) \ + .start() + await asyncio.sleep(1) + serverUrl = self.schema + "://127.0.0.1:" + str(self.port) + "/path?u=a&p=2" + async def _main(): + for _ in range(10): + client_session: Session = await SocketD.create_client(serverUrl) \ + .config(config_handler).open() + await client_session.send_and_request("demo", StringEntity("test"), 100) + + for _ in range(3): + await client_session.send("demo", StringEntity("test")) + await self.client_session_queue.put(client_session) + # 并发连接到服务器 10 * 10 + await asyncio.gather(*[_main() for _ in range(10)]) + await asyncio.sleep(30) + logger.info( + f" message {s.server_counter.get()}") + + def start(self): + super().start() + self.loop.run_until_complete(self._start()) + + async def _stop(self): + if not self.client_session_queue.empty(): + for _ in range(self.client_session_queue.qsize()): + client_session = await self.client_session_queue.get() + await client_session.close() + + if self.server_session: + self.server_session.close() + if self.server: + await self.server.stop() + + def stop(self): + super().stop() + + self.loop.run_until_complete(self._stop()) + + def on_error(self): + super().on_error() diff --git a/python/socketd/test/modelu/CompletableFutureTest.py b/python/socketd/test/modelu/CompletableFutureTest.py index 336993c838146ca7cce779f9ada2d224d98a6839..047b9f887c9d797448d4704a540546717b067512 100644 --- a/python/socketd/test/modelu/CompletableFutureTest.py +++ b/python/socketd/test/modelu/CompletableFutureTest.py @@ -1,8 +1,11 @@ import asyncio +import concurrent import unittest +from concurrent.futures import ThreadPoolExecutor from loguru import logger +from socketd.transport.utils.AsyncUtil import AsyncUtil from socketd.transport.utils.CompletableFuture import CompletableFuture @@ -38,3 +41,18 @@ class CompletableFutureTest(unittest.TestCase): print(await c.get(100)) asyncio.run(_test()) + + def test_thread_wait(self): + c = CompletableFuture() + + async def _test(future: CompletableFuture): + future.accept(1) + + async def _main(): + with ThreadPoolExecutor() as executor: + loop = asyncio.get_event_loop() + AsyncUtil.thread_loop(c.get(100), executor) + await loop.run_in_executor(executor, lambda c: asyncio.run(_test(c)), c) + asyncio.run(_main()) + + diff --git a/python/socketd/test/modelu/FurtureTest.py b/python/socketd/test/modelu/FurtureTest.py index 7a2873394b8824a15595f1eaf2bdd15eaa8f721b..f16e3ab42fad4d931e697318eed47812f1023346 100644 --- a/python/socketd/test/modelu/FurtureTest.py +++ b/python/socketd/test/modelu/FurtureTest.py @@ -162,8 +162,7 @@ class FutureTest(unittest.TestCase): task_B = asyncio.create_task(function_B()) print(" await ...") # 等待两个函数完成 - await task_A - await task_B + await asyncio.gather(task_A, task_B) asyncio.run(main()) def test_warp(self):