2 Star 1 Fork 1

云金杞/ibapi

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
comm.py 2.15 KB
一键复制 编辑 原始数据 按行查看 历史
云金杞 提交于 2021-12-25 13:27 . add ibapi to this repo
"""
Copyright (C) 2019 Interactive Brokers LLC. All rights reserved. This code is subject to the terms
and conditions of the IB API Non-Commercial License or the IB API Commercial License, as applicable.
"""
"""
This module has tools for implementing the IB low level messaging.
"""
import struct
import logging
import sys
from ibapi.common import UNSET_INTEGER, UNSET_DOUBLE
from ibapi.utils import ClientException
from ibapi.utils import isAsciiPrintable
from ibapi.errors import INVALID_SYMBOL
logger = logging.getLogger(__name__)
def make_msg(text) -> bytes:
""" adds the length prefix """
msg = struct.pack("!I%ds" % len(text), len(text), str.encode(text))
return msg
def make_field(val) -> str:
""" adds the NULL string terminator """
if val is None:
raise ValueError("Cannot send None to TWS")
# if string is not empty and contains invalid symbols
if val is not None and type(val) == str and val and not isAsciiPrintable(val):
raise ClientException(INVALID_SYMBOL.code(), INVALID_SYMBOL.msg(), val.encode(sys.stdout.encoding, errors='ignore').decode(sys.stdout.encoding))
# bool type is encoded as int
if type(val) is bool:
val = int(val)
field = str(val) + '\0'
return field
def make_field_handle_empty(val) -> str:
if val is None:
raise ValueError("Cannot send None to TWS")
if UNSET_INTEGER == val or UNSET_DOUBLE == val:
val = ""
return make_field(val)
def read_msg(buf:bytes) -> tuple:
""" first the size prefix and then the corresponding msg payload """
if len(buf) < 4:
return (0, "", buf)
size = struct.unpack("!I", buf[0:4])[0]
logger.debug("read_msg: size: %d", size)
if len(buf) - 4 >= size:
text = struct.unpack("!%ds" % size, buf[4:4+size])[0]
return (size, text, buf[4+size:])
else:
return (size, "", buf)
def read_fields(buf:bytes) -> tuple:
if isinstance(buf, str):
buf = buf.encode()
""" msg payload is made of fields terminated/separated by NULL chars """
fields = buf.split(b"\0")
return tuple(fields[0:-1]) #last one is empty; this may slow dow things though, TODO
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/yunjinqi/ibapi.git
[email protected]:yunjinqi/ibapi.git
yunjinqi
ibapi
ibapi
master

搜索帮助