当前仓库属于暂停状态,部分功能使用受限,详情请查阅 仓库状态说明
1 Star 0 Fork 9

Zhouji0212/ctp_quote_mongo
暂停

forked from 海风/ctp_quote_mongo
暂停
 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
Tick2Mongo.cs 14.29 KB
一键复制 编辑 原始数据 按行查看 历史
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using HaiFeng;
using MongoDB.Bson;
using MongoDB.Driver;
using MongoDB.Driver.Builders;
using QueryDBData;
using MarketData = QueryDBData.MarketData;
using Newtonsoft.Json;
using System.IO;
namespace RealQuote
{
public class Tick2Mongo
{
// Trade-side interface; null while no session is running (created in Start, cleared in Release).
Trade _t = null;
// Quote (market data) interface; created in Start, cleared in Release.
Quote _q = null;
// Thread that runs Process(); created and started by the constructor.
Thread _trdOnOff = null;
// Console cursor position: captured at construction, re-captured after the quote
// subscription, and used by SaveBar to rewrite the status line in place.
int l = Console.CursorLeft;
int t = Console.CursorTop;
//private int _err = 0;
// Login credentials and front addresses, filled in by the constructor.
private string _investor;
private string _pwd;
private string _broker;
private string _trade;
private string _quote;
private readonly ConcurrentDictionary<string, MarketData> _dicTick = new ConcurrentDictionary<string, MarketData>(); //latest tick per instrument id, used to build the "000" data
ConcurrentDictionary<string, double> _dicInstRate = new ConcurrentDictionary<string, double>(); //share of each contract in its "000" value
QueryDB _qry = null;//created in the constructor from mongo_url, e.g. "mongodb://user:password@host:27017/?authenticationDatabase=admin"
// Trading day taken from the trade interface when the trade login succeeds.
private string tradingDay = string.Empty;
// Instrument ids read from _qry.Instrument888 on trade login; only ticks of these ids trigger a "000" calculation.
private List<string> _inst888 = null;
//private bool _nigh_tick_exists = false; //whether the file holding the night-session ticks exists
/// <summary>
/// Prints a message to the console, preceded by the current local time
/// left-aligned in a 20-character column.
/// </summary>
/// <param name="pMsg">Text to print after the timestamp.</param>
private void Log(string pMsg)
{
    var stamp = DateTime.Now;
    Console.WriteLine($"{stamp,-20}{pMsg}");
}
/// <summary>
/// Stores the connection settings, opens the database helper and starts the
/// scheduling thread that runs <see cref="Process"/>.
/// </summary>
/// <param name="front">Trade and quote front addresses separated by '|', trade address first, e.g. tcp://192.168.105.72:53213|tcp://192.168.105.72:53205</param>
/// <param name="broker">Broker id passed to ReqUserLogin.</param>
/// <param name="investor">Investor id passed to ReqUserLogin.</param>
/// <param name="pwd">Password passed to ReqUserLogin.</param>
/// <param name="mongo_url">Connection string handed to the QueryDB constructor.</param>
/// <exception cref="ArgumentException"><paramref name="front"/> does not contain two '|'-separated addresses.</exception>
public Tick2Mongo(string front, string broker, string investor, string pwd, string mongo_url)
{
    // Split once and validate up front so that a malformed value fails with a
    // clear message before the database helper or the thread is created.
    var fronts = (front ?? string.Empty).Split('|');
    if (fronts.Length < 2)
        throw new ArgumentException("front must be \"<trade address>|<quote address>\"", nameof(front));

    _qry = new QueryDB(mongo_url);
    _trade = fronts[0];
    _quote = fronts[1];
    _investor = investor;
    _pwd = pwd;
    _broker = broker;

    _trdOnOff = new Thread(() => Process());
    _trdOnOff.Start();
}
/// <summary>
/// Scheduling loop, run on its own thread. While no session is active it waits
/// for a trading day and a session window and then calls <see cref="Start"/>.
/// While a session is active it polls the exchange statuses once a minute and,
/// when the session is over, does the end-of-day work, releases the interfaces
/// and terminates the process with exit code 0.
/// </summary>
void Process()
{
    // Window boundaries are parsed once instead of on every loop iteration.
    var dayOpen = TimeSpan.Parse("08:50:00");
    var dayClose = TimeSpan.Parse("15:00:00");
    var nightOpen = TimeSpan.Parse("20:50:00");
    var nightClose = TimeSpan.Parse("02:00:00");
    var exitWindowEnd = TimeSpan.Parse("03:00:00");
    var nextDayWakeUp = TimeSpan.Parse("08:30:00");

    var list = _qry.TradeDate;
    Console.WriteLine($"get trading dates: {list.Count}");
    while (true)
    {
        if (_t == null)
        {
            // --- not running: decide whether to start ---
            // 20170611: fixes the wrong decision on weekends (today being Saturday's date).
            var today = DateTime.Today.ToString("yyyyMMdd");
            if (list.IndexOf(today) < 0)
            {
                Log($"{today} 非交易日");
                // Sleep until 08:30 of the next day, so that a start on a non-trading
                // day does not wrongly launch the interfaces in the small hours of Monday.
                Thread.Sleep(DateTime.Today.AddDays(1).Add(nextDayWakeUp) - DateTime.Now);
                list = _qry.TradeDate; // re-read: needed when the calendar rolls over to a new year
                continue;
            }

            // Read the clock once so that all comparisons see the same instant.
            var now = DateTime.Now.TimeOfDay;
            bool inDaySession = now > dayOpen && now < dayClose;
            bool inNightSession = now > nightOpen || now < nightClose;
            if (inDaySession || inNightSession)
            {
                Start();
                Log("接口启动");
            }
        }
        else
        {
            // --- running: decide whether to stop ---
            // Exit when 1. everything is closed, or 2. between 02:00 and 03:00 nothing is trading.
            // One snapshot is used for every test below so they cannot disagree.
            var statuses = _t.DicExcStatus.ToArray();
            var now = DateTime.Now.TimeOfDay;
            bool allClosed = statuses.All(n => n.Value == ExchangeStatusType.Closed);
            bool noneTrading = statuses.All(n => n.Value != ExchangeStatusType.Trading);
            bool inExitWindow = now > nightClose && now < exitWindowEnd;
            if (allClosed || (inExitWindow && noneTrading))
            {
                // End-of-day work requires that at least one status was actually
                // received: on an empty table "all closed" is only vacuously true
                // (e.g. the login never completed) and must not wipe the real-time data.
                if (allClosed && statuses.Length > 0)
                {
                    UpdateMainContracts();
                    // Drop the previous real-time database.
                    if (_qry.DBServer.DatabaseExists("future_real"))
                        _qry.GetDatabase("future_real").Drop();
                    Log("清除昨日数据");
                }
                Thread.Sleep(1000 * 5);
                Release();
                Log("接口退出");
                Environment.Exit(0);
            }
        }
        Thread.Sleep(TimeSpan.FromMinutes(1));
    }
}

/// <summary>
/// Re-evaluates the "888" entry of every product in future_config.future_888:
/// the entry is written when it is missing, or when the contract with the
/// largest open interest has a greater id than the stored one and more than
/// 1.1 times the stored contract's open interest.
/// </summary>
private void UpdateMainContracts()
{
    var coll = _qry.GetDatabase("future_config").GetCollection("future_888");
    // Read the stored entries once; enumerating the cursor inside the loop
    // would go back to the server for every product.
    var stored = coll.FindAll().ToList();

    // Ticks whose instrument is not in the instrument table are skipped,
    // the same way _q_OnRtnTick ignores them, instead of throwing.
    var byProduct = _q.DicTick
        .GroupBy(n => _t.DicInstrumentField.TryGetValue(n.Key, out InstrumentField info) ? info.ProductID : null)
        .Where(g => g.Key != null);

    foreach (var g in byProduct)
    {
        var maxOpenInterest = g.Max(m => m.Value.OpenInterest); // computed once per product
        var instMax = g.First(n => n.Value.OpenInterest == maxOpenInterest).Value.InstrumentID;
        var inst = g.Key + "888";
        var doc = stored.FirstOrDefault(n => n["_id"].AsString == inst);

        bool replace = doc == null;
        if (!replace)
        {
            var current = doc["value"].AsString;
            replace = current.CompareTo(instMax) < 0
                && _q.DicTick.TryGetValue(current, out HaiFeng.MarketData preTick)
                && _q.DicTick[instMax].OpenInterest > preTick.OpenInterest * 1.1;
        }

        if (replace)
        {
            var bd = new BsonDocument
            {
                new BsonElement("_id", inst),
                new BsonElement("value", instMax)
            };
            coll.Save(bd);
        }
    }
}
/// <summary>
/// Creates the quote and trade interfaces, attaches their event handlers and
/// connects the trade front. The quote front is not connected here.
/// </summary>
void Start()
{
    // Quote side: handlers only.
    _q = new CTPQuote();
    _q.OnFrontConnected += (sender, args) =>
    {
        Log("行情登录");
        _q.ReqUserLogin(_investor, _pwd, _broker);
    };
    _q.OnRspUserLogout += (sender, args) => { Log("行情退出" + args.Value); };
    _q.OnRspUserLogin += _q_OnRspUserLogin;
    _q.OnRtnTick += _q_OnRtnTick;

    // Trade side: handlers, then connect.
    _t = new CTPTrade();
    _t.OnFrontConnected += (sender, args) => { _t.ReqUserLogin(_investor, _pwd, _broker); };
    _t.OnRspUserLogout += (sender, args) => { Log("交易退出" + args.Value); };
    _t.OnRspUserLogin += _t_OnRspUserLogin;
    _t.OnRtnExchangeStatus += _t_OnRtnExchangeStatus;
    _t.ReqConnect(_trade);
}
/// <summary>
/// Quote login response: on success subscribes to every instrument in
/// _dicInstRate and remembers the console cursor position that SaveBar
/// later writes to.
/// </summary>
/// <param name="sender">Event source (not used).</param>
/// <param name="e">Login result; a non-zero Value is treated as a failure, as in _t_OnRspUserLogin.</param>
private void _q_OnRspUserLogin(object sender, IntEventArgs e)
{
    // Same error handling as the trade login: do not subscribe on a failed login.
    if (e.Value != 0)
    {
        Log("quote login error:" + e.Value);
        return;
    }
    _q.ReqSubscribeMarketData(_dicInstRate.Keys.ToArray());
    Log("行情订阅");
    l = Console.CursorLeft;
    t = Console.CursorTop;
}
/// <summary>
/// Trade login response. On success it saves the product and instrument
/// tables to the future_config database, reloads the 888 list and the 000
/// rates from the database, and then, after a short delay on a separate
/// thread, subscribes on the quote interface or connects it if it is not
/// logged in yet.
/// </summary>
/// <param name="sender">The Trade instance that raised the event; its TradingDay is stored.</param>
/// <param name="e">Login result; a non-zero Value is logged as an error and nothing else is done.</param>
private void _t_OnRspUserLogin(object sender, IntEventArgs e)
{
    if (e.Value != 0)
    {
        Log("trade login error:" + e.Value);
        //_err = ea.Value;
        return;
    }
    Log("trade login success." + _t.TradingDay);

    var config = _qry.GetDatabase("future_config");
    // Group once; the same grouping feeds the product table and the "000" entries.
    var products = _t.DicInstrumentField.Values.GroupBy(n => n.ProductID).ToList();

    // Save the product information, taken from the last instrument of each product.
    var productColl = config.GetCollection("ProductInfo");
    foreach (var p in products)
    {
        var last = p.Last();
        var v = new Product
        {
            _id = p.Key,
            ExchangeID = last.ExchangeID.ToString(),
            PriceTick = last.PriceTick,
            ProductType = last.ProductClass.ToString(),
            VolumeTuple = last.VolumeMultiple,
        };
        productColl.Save(v);
    }
    Log($"保存品种信息");

    var instrumentColl = config.GetCollection("InstrumentInfo");
    //c.RemoveAll();
    foreach (var p in _t.DicInstrumentField.Values)
    {
        var v = new Instrument
        {
            _id = p.InstrumentID,
            ProductID = p.ProductID,
        };
        instrumentColl.Save(v);
    }
    // Add an "xx000" instrument entry for every product.
    foreach (var p in products)
    {
        var v = new Instrument
        {
            _id = p.Key + "000",
            ProductID = p.Key,
        };
        instrumentColl.Save(v);
    }
    Log($"保存合约信息");

    this.tradingDay = ((Trade)sender).TradingDay;
    _inst888 = _qry.Instrument888.Values.Select(n => n.value).ToList();
    _dicInstRate = _qry.Rate000;

    new Thread(() =>
    {
        Thread.Sleep(500);
        // Work on a local copy: Release() sets the field to null from another
        // thread, which could otherwise happen between the check and the call.
        var q = _q;
        if (q == null)
            return;
        if (q.IsLogin)
        {
            q.ReqSubscribeMarketData(_dicInstRate.Keys.ToArray());
        }
        else
        {
            q.ReqConnect(_quote);
        }
    }).Start();
}
/// <summary>
/// Exchange status notification. When the status is NoTrading, removes from
/// _dicTick every cached tick whose instrument is found in the instrument
/// table with a ProductID equal to e.Exchange.
/// </summary>
/// <param name="sender">Event source (not used).</param>
/// <param name="e">Status change; Status and Exchange are read.</param>
private void _t_OnRtnExchangeStatus(object sender, StatusEventArgs e)
{
    // Only a break in trading clears the data used for the 000 calculation.
    if (e.Status != ExchangeStatusType.NoTrading)
        return;

    foreach (var key in _dicTick.Keys)
    {
        bool sameProduct = _t.DicInstrumentField.TryGetValue(key, out InstrumentField info)
            && info.ProductID == e.Exchange;
        if (sameProduct)
        {
            _dicTick.TryRemove(key, out MarketData removed);
        }
    }
}
/// <summary>
/// Requests logout on the trade and quote interfaces (whichever exist),
/// clears both references and empties the tick cache.
/// </summary>
void Release()
{
    // Trade first, then quote; logout is requested without checking IsLogin.
    _t?.ReqUserLogout();
    _t = null;

    // Original author's note: is the quote side really logged out?
    _q?.ReqUserLogout();
    _q = null;

    _dicTick.Clear();
}
/// <summary>
/// Tick handler. Converts the incoming tick to the database tick type, saves
/// its bar, and from the second tick of an instrument onwards, while the
/// product is in Trading status and the instrument is in the 888 list,
/// recomputes and saves the product's 000 tick. The tick cache entry for the
/// instrument is refreshed last.
/// </summary>
/// <param name="sender">Event source (not used).</param>
/// <param name="e">Carries the received tick.</param>
void _q_OnRtnTick(object sender, TickEventArgs e)
{
    var src = e.Tick;

    // Ignore ticks of instruments or products the trade interface does not know.
    if (!_t.DicInstrumentField.TryGetValue(src.InstrumentID, out InstrumentField instField))
        return;
    if (!_t.DicExcStatus.TryGetValue(instField.ProductID, out ExchangeStatusType excStatus))
        return;

    var md = new MarketData
    {
        AskPrice = src.AskPrice,
        AskVolume = src.AskVolume,
        AveragePrice = src.AveragePrice,
        BidPrice = src.BidPrice,
        BidVolume = src.BidVolume,
        InstrumentID = src.InstrumentID,
        LastPrice = src.LastPrice,
        LowerLimitPrice = src.LowerLimitPrice,
        OpenInterest = src.OpenInterest,
        UpdateMillisec = src.UpdateMillisec,
        UpdateTime = src.UpdateTime,
        UpperLimitPrice = src.UpperLimitPrice,
        Volume = src.Volume,
    };

    if (!SaveBar(md))
        return;

    // The first tick of an instrument is only cached; 000 handling starts with the second.
    bool firstTick = _dicTick.TryAdd(md.InstrumentID, md);
    if (firstTick)
        return;

    // Data is only kept while the product is trading.
    if (excStatus != ExchangeStatusType.Trading)
        return;

    // Only ticks of instruments in the 888 list drive the 000 calculation.
    if (_inst888.Contains(md.InstrumentID) && Make000Double(md, out MarketData tick000))
    {
        // NOTE(review): the 000 tick is saved before its time fields are set from
        // this tick; order kept as in the original - confirm whether that is intended.
        SaveBar(tick000);
        tick000.UpdateTime = md.UpdateTime;
        tick000.UpdateMillisec = md.UpdateMillisec;
    }

    // Refresh the cache only after the 000 calculation has used the previous value.
    _dicTick[md.InstrumentID] = md;
}
/// <summary>
/// Recomputes the "000" tick of the product that <paramref name="f888"/>
/// belongs to, from the cached ticks of that product's contracts.
/// LastPrice is a weighted sum, Volume and OpenInterest are plain sums.
/// The weights come from _dicInstRate when a tick is cached for as many
/// contracts as the product has rate entries; when exactly two contracts are
/// cached, each is weighted by its share of their combined open interest.
/// </summary>
/// <param name="f888">Tick that triggers the calculation; supplies the product and the initial time fields of a newly created 000 tick.</param>
/// <param name="tick000">The cached 000 tick of the product (created on first use). Its LastPrice, Volume and OpenInterest are reset even when the method returns false.</param>
/// <returns>true when the 000 values were computed; false when the cached contracts match neither case or their combined open interest is zero.</returns>
bool Make000Double(MarketData f888, out MarketData tick000)
{
    // The time should already have been fixed upstream: if (!_qry.FixTime(tradingDay, f)) return false;
    var instInfos = _qry.InstrumentInfo;
    var productId = instInfos[f888.InstrumentID].ProductID;
    var indexId = productId + "000";

    // Factory overload: a new tick is only allocated when the entry is missing.
    tick000 = _dicTick.GetOrAdd(indexId, id => new MarketData
    {
        InstrumentID = id,
        UpdateTime = f888.UpdateTime,
        UpdateMillisec = f888.UpdateMillisec, // the triggering tick is the time base
    });

    // Take one snapshot of the product's cached contract ticks, so that counting
    // and summing see the same set even if the cache changes meanwhile.
    var members = _dicTick
        .Where(n => n.Key != indexId && instInfos[n.Key].ProductID == productId)
        .Select(n => n.Value)
        .ToList();

    tick000.LastPrice = 0;
    tick000.Volume = 0;
    tick000.OpenInterest = 0;

    int expected = _dicInstRate.Count(n => instInfos[n.Key].ProductID == productId);
    if (members.Count == expected)
    {
        // Ticks are cached for as many contracts as there are rate entries: use the stored rates.
        foreach (var member in members)
        {
            var rate = _dicInstRate[member.InstrumentID];
            tick000.LastPrice += member.LastPrice * rate;
            tick000.Volume += member.Volume;
            tick000.OpenInterest += member.OpenInterest;
        }
    }
    else if (members.Count == 2)
    {
        // Only two contracts received: weight them by open interest.
        var totalOpenInterest = members.Sum(m => m.OpenInterest);
        if (totalOpenInterest == 0)
            return false; // no open interest at all: the weights would be a division by zero
        foreach (var member in members)
        {
            var rate = member.OpenInterest / totalOpenInterest;
            tick000.LastPrice += member.LastPrice * rate;
            tick000.Volume += member.Volume;
            tick000.OpenInterest += member.OpenInterest;
        }
    }
    else
    {
        return false;
    }

    // Snap the price to a multiple of the product's price tick.
    // NOTE(review): the cast truncates, so a quotient that lands just below a whole
    // number through floating-point error gives one tick less - confirm truncation is intended.
    var priceTick = _qry.ProductInfo[productId].PriceTick;
    tick000.LastPrice = (int)(tick000.LastPrice / priceTick) * priceTick;
    return true;
}
/// <summary>
/// Converts a tick to a bar through _qry.Tick2Bar and, when that succeeds,
/// saves the bar to the collection named after the instrument in the
/// future_real database and rewrites the console status line.
/// </summary>
/// <param name="tick">Tick to convert and store.</param>
/// <returns>true when a bar was produced and saved; false when Tick2Bar returned false.</returns>
private bool SaveBar(QueryDBData.MarketData tick)
{
    Bar bar;
    bool converted = _qry.Tick2Bar(tick, _t.TradingDay, out bar);
    if (converted)
    {
        _qry.GetDatabase("future_real")
            .GetCollection<BsonDocument>(tick.InstrumentID)
            .Save(bar);

        // Overwrite the status line at the remembered cursor position.
        Console.SetCursorPosition(l, t);
        Console.Write($"{DateTime.Now.ToString("HH:mm:ss")}\t{tick.UpdateTime}\t{tick.InstrumentID,-20}");
        return true;
    }
    return false;
}
}
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
C#
1
https://gitee.com/zhouji0212/ctp_quote_mongo.git
[email protected]:zhouji0212/ctp_quote_mongo.git
zhouji0212
ctp_quote_mongo
ctp_quote_mongo
master

搜索帮助