代码拉取完成,页面将自动刷新
"""
Copyright (C) 2019 Interactive Brokers LLC. All rights reserved. This code is subject to the terms
and conditions of the IB API Non-Commercial License or the IB API Commercial License, as applicable.
"""
"""
This module has tools for implementing the IB low level messaging.
"""
import struct
import logging
import sys
from ibapi.common import UNSET_INTEGER, UNSET_DOUBLE
from ibapi.utils import ClientException
from ibapi.utils import isAsciiPrintable
from ibapi.errors import INVALID_SYMBOL
logger = logging.getLogger(__name__)
def make_msg(text) -> bytes:
""" adds the length prefix """
msg = struct.pack("!I%ds" % len(text), len(text), str.encode(text))
return msg
def make_field(val) -> str:
""" adds the NULL string terminator """
if val is None:
raise ValueError("Cannot send None to TWS")
# if string is not empty and contains invalid symbols
if val is not None and type(val) == str and val and not isAsciiPrintable(val):
raise ClientException(INVALID_SYMBOL.code(), INVALID_SYMBOL.msg(), val.encode(sys.stdout.encoding, errors='ignore').decode(sys.stdout.encoding))
# bool type is encoded as int
if type(val) is bool:
val = int(val)
field = str(val) + '\0'
return field
def make_field_handle_empty(val) -> str:
if val is None:
raise ValueError("Cannot send None to TWS")
if UNSET_INTEGER == val or UNSET_DOUBLE == val:
val = ""
return make_field(val)
def read_msg(buf:bytes) -> tuple:
""" first the size prefix and then the corresponding msg payload """
if len(buf) < 4:
return (0, "", buf)
size = struct.unpack("!I", buf[0:4])[0]
logger.debug("read_msg: size: %d", size)
if len(buf) - 4 >= size:
text = struct.unpack("!%ds" % size, buf[4:4+size])[0]
return (size, text, buf[4+size:])
else:
return (size, "", buf)
def read_fields(buf:bytes) -> tuple:
if isinstance(buf, str):
buf = buf.encode()
""" msg payload is made of fields terminated/separated by NULL chars """
fields = buf.split(b"\0")
return tuple(fields[0:-1]) #last one is empty; this may slow dow things though, TODO
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。