3using System.Collections.Generic;
7using System.Threading.Tasks;
17 ManualResetEvent m_evtQueryEnabled =
new ManualResetEvent(
false);
18 ManualResetEvent m_evtPaused =
new ManualResetEvent(
false);
19 bool m_bQueryEnd =
false;
20 object m_objSync =
new object();
21 Queue<double[]> m_rgDataQueue =
new Queue<double[]>();
28 int m_nSegmentSize = 1;
40 m_nSegmentSize = nSegmentSize;
41 m_nMaxCount = nMaxCount;
46 m_queryTask = Task.Factory.StartNew(
new Action(queryThread));
80 if (m_evtQueryEnabled.WaitOne(0))
88 m_evtQueryEnabled.Set();
90 m_evtQueryEnabled.Reset();
99 get {
return m_rgDataQueue.Count; }
109 if (m_rgDataQueue.Count < nCount)
131 if (nIdx >= m_rgDataQueue.Count)
134 return m_rgDataQueue.ElementAt(nIdx);
145 double[] rg = m_rgDataQueue.ElementAt(nIdx);
146 return rg[nFieldIdx];
157 if (m_rgDataQueue.Count == 0)
160 return m_rgDataQueue.Dequeue();
170 m_evtQueryEnabled.Reset();
171 m_evtPaused.WaitOne();
176 if (nStartOffset != 0)
177 m_dt += TimeSpan.FromMilliseconds(nStartOffset * m_tsInc.TotalMilliseconds);
179 m_rgDataQueue.Clear();
180 m_evtQueryEnabled.Set();
186 private void queryThread()
194 while (!m_evtCancel.
WaitOne(nWait))
196 if (!m_evtQueryEnabled.WaitOne(0))
205 if (m_rgDataQueue.Count >= m_nMaxCount)
211 double[] rgData = m_iquery.
QueryByTime(m_dt, m_tsInc, m_nSegmentSize);
222 int nItemCount = rgData.Length / m_nSegmentSize;
227 for (
int i = 0; i < m_nSegmentSize; i++)
229 double[] rgItem =
new double[nItemCount];
230 Array.Copy(rgData, nSrcIdx, rgItem, 0, nItemCount);
231 nSrcIdx += nItemCount;
233 m_rgDataQueue.Enqueue(rgItem);
237 m_dt += TimeSpan.FromMilliseconds(m_nSegmentSize * m_tsInc.TotalMilliseconds);
240 catch (Exception excpt)
The CancelEvent provides an extension to the manual cancel event that allows for overriding the manua...
bool WaitOne(int nMs=int.MaxValue)
Waits for the signal state to occur.
void Set()
Sets the event to the signaled state.
The DataQuery manages a custom query interface and queues data from the custom query via an internal ...
bool EnableQueueThread
Enable/disable the internal query thread.
double[] PeekDataAt(int nIdx)
Returns data at an index within the queue without removing it, or null if no data exists at the index...
int Count
Returns the number of items in the data queue.
bool DataReady(int nCount)
Returns true when data is ready, false otherwise.
void Dispose()
Release all resources used and shutdown.
int FieldCount
Returns the number of fields (including the sync field) that this query manages.
DataQuery(IXCustomQuery iquery, DateTime dtStart, TimeSpan tsInc, int nSegmentSize, int nMaxCount)
The constructor.
double PeekDataAt(int nIdx, int nFieldIdx)
Returns data at an index and field within the queue without removing it.
double[] GetNextData()
Returns the next data and removes it from the queue.
void Reset(int nStartOffset)
Reset the data query to and offset from the start date.
void Shutdown()
Stop the internal query thread.
bool DataDone()
Returns true when there is no more data to query.
The custom query interface defines the functions implemented by each Custom Query object used to spec...
int FieldCount
Returns the field count for this query.
void Open()
Open a connection to the underlying database using the connection string specified.
double[] QueryByTime(DateTime dt, TimeSpan ts, int nCount)
Query the fields specified (in the Open function) starting from the date-time specified.
void Reset()
Reset the custom query.
void Close()
Close a currently open connection.
The MyCaffe.basecode contains all generic types used throughout MyCaffe.
The MyCaffe.db.stream namespace contains all data streaming related classes.
The MyCaffe namespace contains the main body of MyCaffe code that closesly tracks the C++ Caffe open-...