How can I get multiple historical data requests from Interactive Brokers (IB) C# sample app. (threading UI problems, background threads)

huangapple go评论62阅读模式
英文:

How can I get multiple historical data requests from Interactive Brokers (IB) C# sample app. (threading UI problems, background threads)

问题

我认为这个问题需要了解Interactive Broker API附带的C#示例应用程序的工作原理。我试图修改IB API附带的C#示例应用程序以请求多个历史数据系列,但遇到了多个问题。似乎我需要让程序等待当前数据请求完成后再提交第二个、第三个等等请求。如果我添加以下代码(假设我添加了一个按钮来启动该过程),第二个请求似乎会“覆盖”第一个请求,破坏了应用程序中每个系列生成的图表。也会因为太多请求导致IB开始拒绝数据请求的竞态条件。这种情况下的代码是:

private void button7_Click(object sender, EventArgs e)
{
    if (IsConnected)
    {
        srInputTickerFile = new StreamReader(stInputTickerFileName);
        if (!inputTickerFileOpenFlag)
        {
            srInputTickerFile = new StreamReader(stInputTickerFileName);
            inputTickerFileOpenFlag = true;
        }

        while (srInputTickerFile.Peek() >= 0)
        {
            // ... 代码继续 ...
        }
        srInputTickerFile.Close();
    }
}

我尝试了其他方法,例如在函数调用后添加以下代码:

historicalDataManager.AddRequest

System.Threading.Thread.Sleep(3000);

这里的问题似乎是应用程序将控制权传递回UI线程以处理图表,从而阻止图表显示。

如果尝试使用后台工作线程,并将代码的“while”部分放在其中,即:

new Thread(() =>
{
    Thread.CurrentThread.IsBackground = true;
    // ... 代码继续 ...
}).Start();

这段代码会在historicalDataManager.AddRequest处引发错误,因为它位于UI线程上,而不是工作线程上。如果在while循环中添加代码以更新应用程序中的文本框,它也会生成错误,因为它们位于UI线程上。

所以,问题归结为:

  1. 如何正确地限制数据请求。
  2. 如果使用后台工作线程,如何在IB应用程序中访问UI线程上的函数调用。
英文:

I think this problem requires and understanding of how Interactive Broker's C# sample app that comes with the API works. I am trying to modify IB's sample C# app that comes with the API to request multiple historical data series, but I have run into multiple problems. It seems that I need to have the program wait until the current data request is completed before submitting the second, third.. etc etc etc. If I add the following code (assuming I added a button to start the process), the second request seems to "run over" the first request, messing up the charts that each series produces in the app. That is the data is merged into one chart. Also too many requests causes a racing condition that IB starts rejecting the data requests. The code in this case is:

private void button7_Click(object sender, EventArgs e)
{
    if (IsConnected)
    {
        srInputTickerFile = new StreamReader(stInputTickerFileName);
        if (!inputTickerFileOpenFlag)
        {
            srInputTickerFile = new StreamReader(stInputTickerFileName);
            inputTickerFileOpenFlag = true;
        }

        while (srInputTickerFile.Peek() >= 0)
        {
            String line = srInputTickerFile.ReadLine();
            String[] lineSplit = line.Split(',');
            string ticker = lineSplit[0];
            string timeDateTo = lineSplit[1];
            string myDuration = lineSplit[2];
            string myBarSize = lineSplit[3];
            string myWhatToShow = lineSplit[4];
            int myRTH = Int32.Parse(lineSplit[5]);
            Contract contract = GetMDContractForHistorical(line);
            contract.ConId = histCounter + 1;
            string endTime = hdRequest_EndTime.Text.Trim();
            string duration = hdRequest_Duration.Text.Trim() + " " +
                hdRequest_TimeUnit.Text.Trim();
            string barSize = hdRequest_BarSize.Text.Trim();
            string whatToShow = hdRequest_WhatToShow.Text.Trim();
            int outsideRTH = contractMDRTH.Checked ? 1 : 0;
            historicalDataManager.AddRequest(contract, timeDateTo, myDuration, 
            myBarSize, myWhatToShow, myRTH, 1, cbKeepUpToDate.Checked);
            historicalDataTab.Text = Utils.ContractToString(contract) + " (HD)";
            ShowTab(marketData_MDT, historicalDataTab);
        }
        srInputTickerFile.Close();
    }
}

I have tried other things such as adding after the function call

 historicalDataManager.AddRequest

the following code so that it can process the request.

  System.Threading.Thread.Sleep(3000);

The problem here seems to be that the app passes control back to the UI thread to process the graphs, thereby preventing the chart to be displayed.

If I try to use a background worker thread, and put the "while" part of the code inside it. That is,

new Thread(() =>
{
    Thread.CurrentThread.IsBackground = true;
    //while code above goes here
}).Start();

This code give an error at historicalDataManager.AddRequest because it resides on the UI thread, not the worker thread. It will also generate an error if I add code in the while loop to update the text boxes in the app because they reside on the UI thread.

So I guess it boils down to:

  1. How to properly throttle the data requests.
  2. If a background worker thread is used, how to access function calls on the UI thread in the IB app.

答案1

得分: 0

  1. 如何正确地限制数据请求的速率。
    这取决于您想要达到的目标。如果您需要一个简单的解决方案,可以使用以下代码:
// 在UI线程上,非阻塞
Task.Delay(1000).ContinueWith(() => {
    // 发起请求2
});

为了完全控制所有请求,并保持历史数据请求的50/秒 API 消息限制和6/分钟(软限制)或短期突发限制,我建议至少使用两个基于生产者/消费者模型的单独线程。以下是一个基本的大纲,可以帮助您入门(替代上面的 Task.Delay):

// 用于排队请求的数据类
class Request
{
    public int? reqId;
    public ReqType? reqType, reqFrom;
    public bool snapshot = false, regulatorySnaphsot = false;
    public string? genericTickList;

    // 对于历史数据
    public string? endDateTime, duration, barSize, show;
    public int? useRTH, formatDate;
    public bool update = false;

    public Request(int reqId, ReqType reqType, Contract contract, string endDateTime, string duration, string barSize, string show, int useRTH, int formatDate, bool update)
    {
        // 历史数据
        this.reqId = reqId;
        this.reqType = reqType;
        this.contract = contract;
        this.endDateTime = endDateTime;
        this.duration = duration;
        this.barSize = barSize;
        this.show = show;
        this.useRTH = useRTH;
        this.formatDate = formatDate;
        this.update = update;
    }
}

/*
 * 此类接收所有请求,将其排队并带有延迟处理,以避免超过消息速率限制
 * 还需要处理所有未完成的请求,以便可以在一个地方取消数据请求
 */
class Requests<T> : IDisposable where T : Request
{
    // 省略部分内容以节省空间
}

// 在UI表单中使用
Requests = new(iBclient);
requests.Add(new HistoricalData(......));
  1. 如果使用后台工作线程,如何在 IB 应用程序中访问 UI 线程上的函数调用。

iBclient 类在单独的线程上运行,并将事件和数据发送回 UI 线程。这意味着您的 UI 线程仅在接收到数据时工作,永远不需要在 UI 线程上使用 Thread.Sleep 或 DoEvents。

您可以通过以下方式在 UI 线程上接收这些事件:

private readonly EReaderMonitorSignal signal;
private readonly IBClient ibClient;

partial class MyForm : Form
{
    public MyForm()
    {
        // 省略部分内容以节省空间
    }

    private void Recv_ConnectedAndReady(ConnectionStatusMessage msg)
    {
        // 我们现在已连接并可以发起请求
        // 旧版本的 API 使用 NextOrderId(int)
        ibClient.ClientSocket.reqHistoricalData(reqId, contract, endDateTime, durationStr, "5 min", "Trades", 0, 0, new List<TagValue>());
    }

    // 省略其他事件处理函数的内容以节省空间
}

请注意,上述代码中的事件处理函数会在 UI 线程上接收事件和数据。

英文:

> So I guess it boils down to:
> 1) How to properly throttle the data requests.
> 2) If a background worker thread is used, how to access function calls on the UI thread in the IB app.

1) How to properly throttle the data requests.
It depends how far you want to go.
If you want a simple solution you can use...

// On UI thread, non-blocking
Task.Delay(1000).ContinueWith(() =&gt; {
    // make request 2
}

For complete control of all requests, and to maintain both the 50/sec api message limit and the 6/min (soft) or short term burst limits of historical data requests I would recommend at least 2 separate threads based on a producer/consumer model.

A basic outline to get you started follows... (alternative to Task.Delay above)

// The data class used to queue a request
class Request
{
	public int? reqId;
	public ReqType? reqType, reqFrom;
	public bool snapshot = false, regulatorySnaphsot = false;
	public string? genericTickList;

	// For historical data
	public string? endDateTime, duration, barSize, show;
	public int? useRTH, formatDate;
	public bool update = false;

	public Request(int reqId, ReqType reqType, Contract contract, string endDateTime, string duration, string barSize, string show, int useRTH, int formatDate, bool update)
	{
		// Historical data
		this.reqId = reqId;
		this.reqType = reqType;
		this.contract = contract;
		this.endDateTime = endDateTime;
		this.duration = duration;
		this.barSize = barSize;
		this.show = show;
		this.useRTH = useRTH;
		this.formatDate = formatDate;
		this.update = update;
	}
}

/*
	* This class receives all requests, queues them and process with delay to avoid exceeding message rate limitations
	* It also needs to take care of all outstanding requests, so requests for data can be cancelled in one place.
*/
class Requests&lt;T&gt;:IDisposable where T : Request
{
	private readonly IBClient ibClient;
	private readonly object msgLock = new();
	private readonly object historyLock = new();
	private readonly Thread msgThread, contractThread, mktDataThread, historyThread;
	private readonly Queue&lt;Request&gt; msgQueue = new();
	private readonly Queue&lt;Request&gt; historyQueue = new();
	private readonly static AutoResetEvent waitHistory = new(true);
	private readonly Dictionary&lt;int, Request&gt; historyReqs = new();


	public Requests(IBClient ibClient)
	{
		this.ibClient = ibClient;
		msgThread = new Thread(ConsumeMsg);
		msgThread.Start();

		historyThread = new Thread(ConsumeHistory);
		historyThread.Start();
	}

	private void EnqueueMsg(T req)
	{
		lock(msgLock)
		{
			msgQueue.Enqueue(req);
			Monitor.PulseAll(msgLock);
		}
	}
	private void ConsumeMsg()
	{
		/*
			* The message queue does not wait other than ~25ms for rate limitation of messages (50/sec max).
			* Other queues are responsible for limiting request rates depending on request type.
			* We do not increment any counters here, that is done in the respective queue that rate limits requests by type
			* EVERY counter is decremented here!
		*/
		while(true)
		{
			Request req;
			lock(msgLock)
			{
				while(msgQueue.Count == 0)
					Monitor.Wait(msgLock);    // Wait for next Task in queue
				req = msgQueue.Dequeue();
				if(req == null)
					return;      // This signals our exit
			}

			switch(req.reqType)
			{
				case ReqType.History:
					ibClient.ClientSocket.reqHistoricalData((int)req.reqId, req.contract, req.endDateTime, req.duration, req.barSize, req.show, req.useRTH ?? 1, req.formatDate ?? 1, req.update, new List&lt;TagValue&gt;());
					break;

				case ReqType.CancelHistory:
					ibClient.ClientSocket.cancelHistoricalData((int)req.reqId);
					historyReqs.Remove((int)req.reqId);
					break;
			}
			Thread.Sleep(20); // This prevents over 50 msg per second.
		}
	}


	public void HistoricalData(int reqId, Contract contract, string endDateTime, string duration, string barSize, string show, int useRTH, int formatDate, bool update)
	{
		EnqueueHistoryRequest((T)new Request(reqId, ReqType.History, contract, endDateTime, duration, barSize, show, useRTH, formatDate, update));
	}
	public void CancelHistoryData(int reqId)
	{
		EnqueueMsg((T)new Request(reqId, ReqType.CancelHistory));
	}
	private void EnqueueHistoryRequest(T req)
	{
		lock(historyLock)
		{
			historyQueue.Enqueue(req);
			Monitor.PulseAll(historyLock);
		}
	}
	private void ConsumeHistory()
	{
		while(true)
		{
			Request req;
			lock(historyLock)
			{
				while(historyQueue.Count == 0)
					Monitor.Wait(historyLock);      // Wait for next Task in queue
				req = historyQueue.Dequeue();
			}
			if(req == null) return;         // This signals our exit
			

			EnqueueMsg((T)req);
			// We actually have a soft 6/min limit on hist data.
			// This delay does not follow that, we&#39;re limiting the number to 50 instead.
			Thread.Sleep(800);
		}
	}
	public void HistDataRecieved(int reqID)
	{
		historyReqs.Remove(reqID);
		if(!waitHistory.SafeWaitHandle.IsClosed &amp;&amp; ReqCntHistory &lt; MaxReqHistory)
			waitHistory.Set();     // We can proceed if less than maxHistory requests outstanding
	}
}

To use, in a UI form

Requests = new(iBclient);
requests.Add(new HistoricalData(......));

2) If a background worker thread is used, how to access function calls on the UI thread in the IB app.

The iBclient class runs on a separate thread, and sends events and data back to the UI thread.
This means your UI thread only works when data is received, there is never any need to Thread.Sleep or DoEvents on the UI thread.

You can receive these events on the UI thread by registering such as below...

private readonly EReaderMonitorSignal signal;
private readonly IBClient ibClient;


partial class MyForm:Form
{
	public MyForm()
	{
		InitializeComponent();
		signal = new EReaderMonitorSignal();
		ibClient = new(signal);

		// Register for events

		// Depending on your API version use one of the following to know when connected.
		ibClient.ConnectedAndReady += Recv_ConnectedAndReady;
		ibClient.NextOrderId += Recv_NextOrderId;
		
		ibClient.Error += Recv_Error;
		ibClient.HistoricalData += Recv_HistoricalData;
		ibClient.HistoricalDataEnd += Recv_HistoricalDataEnd;


		// Connect and start API thread.
		ibClient.ClientId = 0;
		ibClient.ClientSocket.eConnect(&quot;127.0.0.1&quot;, 7496, 0); // (ClientId = 0) This is the master client that will receive all orders (even from TWS)
		var reader = new EReader(ibClient.ClientSocket, signal);
		reader.Start();

		new Thread(() =&gt;
		{
			while(ibClient.ClientSocket.IsConnected())
			{
				signal.waitForSignal();
				reader.processMsgs();
			}
		})
		{ IsBackground = true }.Start();
	}
		
		
	private void Recv_ConnectedAndReady(ConnectionStatusMessage msg)
	{
		// We are now connected and can make requests
		// Older API versions use NextOrderId(int)
		ibClient.ClientSocket.reqHistoricalData(reqId, contract, endDateTime, durationStr, &quot;5 min&quot;, &quot;Trades&quot;, 0, 0, new List&lt;TagValue&gt;())
	}
	
	private void Recv_Error(int reqId, int errorCode, string? str, string? ordRejectJson, Exception? ex)
	{
		// Possible combinations of parameters
		// reqId, errorCode, str, ex
		//
		// 0,0,null,Exception
		// 0,0,string,null
		// -1,int,string,null
		// int,int,string,null
	}
	
	private void Recv_HistoricalData(HistoricalDataMessage msg)
	{
		//msg.RequestId is the same Id you used to make the request. This is how you can separate data to different charts.
	}
	
	
	private void Recv_HistoricalDataEnd(HistoricalDataEndMessage msg)
	{
		//request complete.
	}
}

huangapple
  • 本文由 发表于 2023年4月20日 04:58:22
  • 转载请务必保留本文链接:https://go.coder-hub.com/76058760.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定