MyCaffe  1.12.2.41
Deep learning software for Windows C# programmers.
DataQuery.cs
1using MyCaffe.basecode;
2using System;
3using System.Collections.Generic;
4using System.Linq;
5using System.Text;
6using System.Threading;
7using System.Threading.Tasks;
8
9namespace MyCaffe.db.stream
10{
14 public class DataQuery : IDisposable
15 {
16 CancelEvent m_evtCancel = new CancelEvent();
17 ManualResetEvent m_evtQueryEnabled = new ManualResetEvent(false);
18 ManualResetEvent m_evtPaused = new ManualResetEvent(false);
19 bool m_bQueryEnd = false;
20 object m_objSync = new object();
21 Queue<double[]> m_rgDataQueue = new Queue<double[]>();
22 Task m_queryTask;
23 IXCustomQuery m_iquery;
24 DateTime m_dtStart;
25 DateTime m_dt;
26 TimeSpan m_tsInc;
27 int m_nMaxCount = 0;
28 int m_nSegmentSize = 1;
29
38 public DataQuery(IXCustomQuery iquery, DateTime dtStart, TimeSpan tsInc, int nSegmentSize, int nMaxCount)
39 {
40 m_nSegmentSize = nSegmentSize;
41 m_nMaxCount = nMaxCount;
42 m_dtStart = dtStart;
43 m_dt = dtStart;
44 m_tsInc = tsInc;
45 m_iquery = iquery;
46 m_queryTask = Task.Factory.StartNew(new Action(queryThread));
47 }
48
52 public void Dispose()
53 {
54 Shutdown();
55 }
56
60 public void Shutdown()
61 {
62 m_evtCancel.Set();
63 }
64
68 public int FieldCount
69 {
70 get { return m_iquery.FieldCount; }
71 }
72
77 {
78 get
79 {
80 if (m_evtQueryEnabled.WaitOne(0))
81 return true;
82 else
83 return false;
84 }
85 set
86 {
87 if (value)
88 m_evtQueryEnabled.Set();
89 else
90 m_evtQueryEnabled.Reset();
91 }
92 }
93
97 public int Count
98 {
99 get { return m_rgDataQueue.Count; }
100 }
101
107 public bool DataReady(int nCount)
108 {
109 if (m_rgDataQueue.Count < nCount)
110 return false;
111
112 return true;
113 }
114
119 public bool DataDone()
120 {
121 return m_bQueryEnd;
122 }
123
129 public double[] PeekDataAt(int nIdx)
130 {
131 if (nIdx >= m_rgDataQueue.Count)
132 return null;
133
134 return m_rgDataQueue.ElementAt(nIdx);
135 }
136
143 public double PeekDataAt(int nIdx, int nFieldIdx)
144 {
145 double[] rg = m_rgDataQueue.ElementAt(nIdx);
146 return rg[nFieldIdx];
147 }
148
153 public double[] GetNextData()
154 {
155 lock (m_objSync)
156 {
157 if (m_rgDataQueue.Count == 0)
158 return null;
159
160 return m_rgDataQueue.Dequeue();
161 }
162 }
163
168 public void Reset(int nStartOffset)
169 {
170 m_evtQueryEnabled.Reset();
171 m_evtPaused.WaitOne();
172
173 m_iquery.Reset();
174 m_dt = m_dtStart;
175
176 if (nStartOffset != 0)
177 m_dt += TimeSpan.FromMilliseconds(nStartOffset * m_tsInc.TotalMilliseconds);
178
179 m_rgDataQueue.Clear();
180 m_evtQueryEnabled.Set();
181 }
182
186 private void queryThread()
187 {
188 try
189 {
190 int nWait = 0;
191
192 m_iquery.Open();
193
194 while (!m_evtCancel.WaitOne(nWait))
195 {
196 if (!m_evtQueryEnabled.WaitOne(0))
197 {
198 m_evtPaused.Set();
199 nWait = 250;
200 continue;
201 }
202
203 m_evtPaused.Reset();
204
205 if (m_rgDataQueue.Count >= m_nMaxCount)
206 {
207 nWait = 10;
208 continue;
209 }
210
211 double[] rgData = m_iquery.QueryByTime(m_dt, m_tsInc, m_nSegmentSize);
212 if (rgData == null)
213 {
214 m_bQueryEnd = true;
215 nWait = 10;
216 continue;
217 }
218
219 nWait = 0;
220 m_bQueryEnd = false;
221
222 int nItemCount = rgData.Length / m_nSegmentSize;
223 int nSrcIdx = 0;
224
225 lock (m_objSync)
226 {
227 for (int i = 0; i < m_nSegmentSize; i++)
228 {
229 double[] rgItem = new double[nItemCount];
230 Array.Copy(rgData, nSrcIdx, rgItem, 0, nItemCount);
231 nSrcIdx += nItemCount;
232
233 m_rgDataQueue.Enqueue(rgItem);
234 }
235 }
236
237 m_dt += TimeSpan.FromMilliseconds(m_nSegmentSize * m_tsInc.TotalMilliseconds);
238 }
239 }
240 catch (Exception excpt)
241 {
242 throw excpt;
243 }
244 finally
245 {
246 m_iquery.Close();
247 }
248 }
249 }
250}
The CancelEvent provides an extension to the manual cancel event that allows for overriding the manua...
Definition: CancelEvent.cs:17
bool WaitOne(int nMs=int.MaxValue)
Waits for the signal state to occur.
Definition: CancelEvent.cs:290
void Set()
Sets the event to the signaled state.
Definition: CancelEvent.cs:270
The DataQuery manages a custom query interface and queues data from the custom query via an internal ...
Definition: DataQuery.cs:15
bool EnableQueueThread
Enable/disable the internal query thread.
Definition: DataQuery.cs:77
double[] PeekDataAt(int nIdx)
Returns data at an index within the queue without removing it, or null if no data exists at the index...
Definition: DataQuery.cs:129
int Count
Returns the number of items in the data queue.
Definition: DataQuery.cs:98
bool DataReady(int nCount)
Returns true when data is ready, false otherwise.
Definition: DataQuery.cs:107
void Dispose()
Release all resources used and shutdown.
Definition: DataQuery.cs:52
int FieldCount
Returns the number of fields (including the sync field) that this query manages.
Definition: DataQuery.cs:69
DataQuery(IXCustomQuery iquery, DateTime dtStart, TimeSpan tsInc, int nSegmentSize, int nMaxCount)
The constructor.
Definition: DataQuery.cs:38
double PeekDataAt(int nIdx, int nFieldIdx)
Returns data at an index and field within the queue without removing it.
Definition: DataQuery.cs:143
double[] GetNextData()
Returns the next data and removes it from the queue.
Definition: DataQuery.cs:153
void Reset(int nStartOffset)
Reset the data query to and offset from the start date.
Definition: DataQuery.cs:168
void Shutdown()
Stop the internal query thread.
Definition: DataQuery.cs:60
bool DataDone()
Returns true when there is no more data to query.
Definition: DataQuery.cs:119
The custom query interface defines the functions implemented by each Custom Query object used to spec...
Definition: Interfaces.cs:168
int FieldCount
Returns the field count for this query.
Definition: Interfaces.cs:180
void Open()
Open a connection to the underlying database using the connection string specified.
double[] QueryByTime(DateTime dt, TimeSpan ts, int nCount)
Query the fields specified (in the Open function) starting from the date-time specified.
void Reset()
Reset the custom query.
void Close()
Close a currently open connection.
The MyCaffe.basecode contains all generic types used throughout MyCaffe.
Definition: Annotation.cs:12
The MyCaffe.db.stream namespace contains all data streaming related classes.
The MyCaffe namespace contains the main body of MyCaffe code that closesly tracks the C++ Caffe open-...
Definition: Annotation.cs:12