MyCaffe  1.12.2.41
Deep learning software for Windows C# programmers.
TemporalSet.cs
1using Microsoft.VisualBasic.Devices;
2using MyCaffe.basecode;
4using MyCaffe.db.image;
5using SimpleGraphing;
6using System;
7using System.Collections.Generic;
8using System.Diagnostics;
9using System.Linq;
10using System.Text;
11using System.Threading;
12using System.Threading.Tasks;
13
14namespace MyCaffe.db.temporal
15{
19 [Serializable]
20 public class TemporalSet : IDisposable
21 {
22 Exception m_loadException = null;
23 int m_nHistoricSteps;
24 int m_nFutureSteps;
25 int m_nTotalSteps;
26 DateTime? m_dtStart;
27 DateTime? m_dtEnd;
28 ManualResetEvent m_evtCancel = new ManualResetEvent(false);
29 AutoResetEvent m_evtDone = new AutoResetEvent(false);
30 List<ItemSet> m_rgItems = new List<ItemSet>();
31 int m_nLoadLimit = 0;
32 double m_dfReplacementPct;
33 int m_nRefreshUpdateMs;
34 DB_LOAD_METHOD m_loadMethod;
36 CryptoRandom m_random = null;
37 SourceDescriptor m_src;
38 Log m_log;
39 Thread m_loadThread = null;
40 int m_nChunks = 1024;
41 int m_nItemIdx = 0;
42 bool m_bNormalizeData = false;
43 double m_dfLoadPct = 0;
44 object m_objSync = new object();
45
60 public TemporalSet(Log log, DatabaseTemporal db, SourceDescriptor src, DB_LOAD_METHOD loadMethod, int nLoadLimit, double dfReplacementPct, int nRefreshUpdateMs, CryptoRandom random, int nHistoricSteps, int nFutureSteps, int nChunks)
61 {
62 m_log = log;
63 m_random = random;
64 m_db = db;
65 m_src = src;
66 m_loadMethod = loadMethod;
67 m_nLoadLimit = nLoadLimit;
68 m_nHistoricSteps = nHistoricSteps;
69 m_nFutureSteps = nFutureSteps;
70 m_nTotalSteps = m_nHistoricSteps + m_nFutureSteps;
71 m_nChunks = nChunks;
72 m_dfReplacementPct = dfReplacementPct;
73 m_nRefreshUpdateMs = nRefreshUpdateMs;
74
75 if (m_nLoadLimit < 0)
76 m_nLoadLimit = 0;
77
78 if (m_nLoadLimit > 0 && m_nLoadLimit < 1000)
79 m_nLoadLimit = 1000;
80
81 if (m_dfReplacementPct > 0.9)
82 m_dfReplacementPct = 0.9;
83
84 if (m_nRefreshUpdateMs < 250)
85 m_nRefreshUpdateMs = 250;
86 }
87
91 public void Dispose()
92 {
93 m_evtCancel.Set();
94 }
95
99 public void CleanUp()
100 {
101 if (m_loadThread != null)
102 {
103 m_evtCancel.Set();
104 m_loadThread = null;
105 }
106
107 m_rgItems.Clear();
108 }
109
113 public void Reset()
114 {
115 m_nItemIdx = 0;
116
117 foreach (ItemSet item in m_rgItems)
118 {
119 item.Reset();
120 }
121 }
122
129 public bool Initialize(bool bNormalizedData, EventWaitHandle evtCancel)
130 {
131 m_bNormalizeData = bNormalizedData;
132
133 if (m_loadThread == null)
134 {
135 m_loadThread = new Thread(new ThreadStart(loadThread));
136 m_loadThread.Start();
137 }
138
139 if (m_loadMethod == DB_LOAD_METHOD.LOAD_ALL)
140 return WaitForLoadingToComplete(evtCancel);
141
142 return true;
143 }
144
145 private void removeItem(List<ItemSet> rgItems)
146 {
147 lock (m_objSync)
148 {
149 if (m_rgItems == null)
150 return;
151
152 if (m_rgItems.Count == 0)
153 return;
154
155 ItemSet item = rgItems[0];
156 rgItems.RemoveAt(0);
157 item.CleanUp();
158 }
159 }
160
161 private void loadThread()
162 {
163 DatabaseLoader loader = new DatabaseLoader();
164 List<ValueItem> rgItems = m_db.GetAllValueItems(m_src.ID);
165 double dfMemMin = 1000.0;
166
167 try
168 {
169 m_dfLoadPct = 0;
170
171 TemporalDescriptor td = loader.LoadTemporalFromDb(m_src.ID);
173
174 m_dtStart = td.StartDate;
175 m_dtEnd = td.EndDate;
176
177 if (m_dtEnd.HasValue && m_dtStart.HasValue && m_dtEnd <= m_dtStart)
178 throw new Exception("The end date must be greater than the start date.");
179
180 // Load the data.
181 DateTime? dt = m_dtStart;
182 bool bEOD = false;
183
184 Stopwatch sw = new Stopwatch();
185 sw.Start();
186 m_dfLoadPct = 0;
187 int nIdx = 0;
188
189 ComputerInfo info = new ComputerInfo();
190 Stopwatch swReplacement = new Stopwatch();
191
192 swReplacement.Start();
193
194 while (!m_evtCancel.WaitOne(0))
195 {
196 bool bEOD1;
197
198 // Load one chunk for each item.
199 while (m_rgItems.Count < rgItems.Count && (m_nLoadLimit <= 0 || m_rgItems.Count < m_nLoadLimit))
200 {
201 ItemSet item = new ItemSet(m_random, m_db, rgItems[nIdx], rgStrm);
202 item.Load(out bEOD1);
203
204 lock (m_objSync)
205 {
206 m_rgItems.Add(item);
207 }
208
209 m_dfLoadPct = (double)m_rgItems.Count / rgItems.Count;
210 if (bEOD1)
211 bEOD = true;
212
213 if (sw.Elapsed.TotalMilliseconds > 1000)
214 {
215 sw.Restart();
216 m_log.Progress = m_dfLoadPct;
217 m_log.WriteLine("Loading '" + m_src.Name + "' data for item " + item.Item.Name + " (" + m_dfLoadPct.ToString("P") + ")...", true);
218 }
219
220 if (m_evtCancel.WaitOne(0))
221 break;
222
223 nIdx++;
224
225 if (nIdx >= rgItems.Count)
226 nIdx = 0;
227
228 if (nIdx % m_nChunks == 0)
229 break;
230 }
231
232 if (m_nLoadLimit > 0)
233 {
234 int nLoadLimit = m_nLoadLimit;
235
236 if ((int)swReplacement.Elapsed.TotalMilliseconds > m_nRefreshUpdateMs)
237 nLoadLimit = (int)(m_nLoadLimit * (1 - m_dfReplacementPct));
238
239 if (nLoadLimit > 1000)
240 {
241 while (m_rgItems.Count > nLoadLimit)
242 {
243 removeItem(m_rgItems);
244 }
245
246 GC.Collect(2, GCCollectionMode.Forced);
247 }
248 }
249
250 ulong nMem = info.AvailablePhysicalMemory;
251 double dfMb = (double)nMem / (1024.0 * 1024.0);
252 int nRetryCount = 0;
253
254 while (dfMb < dfMemMin && nRetryCount < 10 && m_rgItems.Count > 1000)
255 {
256 m_log.WriteLine("Waiting for memory to free up...", true);
257
258 for (int i=0; i<1000 && m_rgItems.Count > 1000; i++)
259 {
260 removeItem(m_rgItems);
261 }
262
263 GC.Collect(2, GCCollectionMode.Forced);
264 nMem = info.AvailablePhysicalMemory;
265 dfMb = (double)nMem / (1024.0 * 1024.0);
266
267 nRetryCount++;
268 Thread.Sleep(250);
269 }
270
271 if (dfMb < dfMemMin)
272 return;
273
274 if (m_rgItems.Count == rgItems.Count && m_nLoadLimit <= 0)
275 break;
276 }
277 }
278 catch (Exception excpt)
279 {
280 m_loadException = excpt;
281 return;
282 }
283 finally
284 {
285 m_evtDone.Set();
286 }
287 }
288
295 public bool WaitForLoadingToComplete(EventWaitHandle evtCancel, int nWaitMs = int.MaxValue)
296 {
297 WaitHandle[] rgWait = new WaitHandle[] { m_evtCancel, evtCancel, m_evtDone };
298 int nWait = WaitHandle.WaitAny(rgWait, nWaitMs);
299
300 if (nWait <= 1)
301 return false;
302
303 return true;
304 }
305
309 public double LoadPercent
310 {
311 get { return m_dfLoadPct; }
312 }
313
318 {
319 get { return m_loadMethod; }
320 }
321
338 public SimpleTemporalDatumCollection GetData(int nQueryIdx, ref int? nItemIdx, ref int? nValueIdx, DB_LABEL_SELECTION_METHOD itemSelectionMethod, DB_ITEM_SELECTION_METHOD valueSelectionMethod, int nValueStepOffset = 1, bool bEnableDebug = false, string strDebugPath = null)
339 {
341
342 lock (m_objSync)
343 {
344 if (itemSelectionMethod == DB_LABEL_SELECTION_METHOD.RANDOM)
345 {
346 m_nItemIdx = m_random.Next(m_rgItems.Count);
347 }
348 else
349 {
350 if (m_nItemIdx >= m_rgItems.Count)
351 m_nItemIdx = 0;
352 }
353
354 if (nItemIdx.HasValue)
355 m_nItemIdx = nItemIdx.Value;
356 nItemIdx = m_nItemIdx;
357
358 if (m_nItemIdx >= m_rgItems.Count)
359 return null;
360
361 data = m_rgItems[m_nItemIdx].GetData(nQueryIdx, ref nValueIdx, valueSelectionMethod, m_nHistoricSteps, m_nFutureSteps, nValueStepOffset, bEnableDebug, strDebugPath);
362
363 int nRetryCount = 0;
364 while (data == null && nRetryCount < 40)
365 {
366 if (itemSelectionMethod == DB_LABEL_SELECTION_METHOD.RANDOM)
367 {
368 m_nItemIdx = m_random.Next(m_rgItems.Count);
369 }
370 else
371 {
372 m_nItemIdx++;
373
374 if (m_nItemIdx >= m_rgItems.Count)
375 m_nItemIdx = 0;
376 }
377
378 nItemIdx = m_nItemIdx;
379 data = m_rgItems[m_nItemIdx].GetData(nQueryIdx, ref nValueIdx, valueSelectionMethod, m_nHistoricSteps, m_nFutureSteps, nValueStepOffset, bEnableDebug, strDebugPath);
380 nRetryCount++;
381 }
382 }
383
384 return data;
385 }
386
391 public int GetCount()
392 {
393 lock (m_objSync)
394 {
395 return m_rgItems.Sum(p => p.GetCount(m_nHistoricSteps + m_nFutureSteps));
396 }
397 }
398
402 public int Count
403 {
404 get
405 {
406 lock (m_objSync)
407 {
408 return m_rgItems.Count;
409 }
410 }
411 }
412 }
413}
The CryptoRandom is a random number generator that can use either the standard .Net Random objec or t...
Definition: CryptoRandom.cs:14
int Next(int nMinVal, int nMaxVal, bool bMaxInclusive=true)
Returns a random int within the range
The Log class provides general output in text form.
Definition: Log.cs:13
void WriteLine(string str, bool bOverrideEnabled=false, bool bHeader=false, bool bError=false, bool bDisable=false)
Write a line of output.
Definition: Log.cs:80
double Progress
Get/set the progress associated with the Log.
Definition: Log.cs:147
The SimpleTemporalDatumCollection manages a collection of SimpleTemporalDatum objects.
Definition: SimpleDatum.cs:91
int ID
Get/set the database ID of the item.
string Name
Get/set the name of the item.
The ordered value stream descriptor set is used to order the value stream descriptors by class and va...
The SourceDescriptor class contains all information describing a data source.
The TemporalDescriptor is used to describe a temporal aspects of the data source.
OrderedValueStreamDescriptorSet OrderedValueStreamDescriptors
Retunrs the ordered set of stream descriptors.
The DatabaseTemporal is used to manage all temporal specific database objects.
List< ValueItem > GetAllValueItems(int nSrcID)
Returns a list of all value items associated with a SourceID.
The ItemSet manages the data for a single item (e.g., customer, station, stock symbol,...
Definition: ItemSet.cs:21
void CleanUp()
Release all resources used.
Definition: ItemSet.cs:52
void Reset()
Reset the value index.
Definition: ItemSet.cs:60
The TemporalSet manages a set of temporal data for a given data source.
Definition: TemporalSet.cs:21
SimpleTemporalDatumCollection GetData(int nQueryIdx, ref int? nItemIdx, ref int? nValueIdx, DB_LABEL_SELECTION_METHOD itemSelectionMethod, DB_ITEM_SELECTION_METHOD valueSelectionMethod, int nValueStepOffset=1, bool bEnableDebug=false, string strDebugPath=null)
Get a data set consisting of the static, historical, and future data for a selected item where the st...
Definition: TemporalSet.cs:338
bool WaitForLoadingToComplete(EventWaitHandle evtCancel, int nWaitMs=int.MaxValue)
Wait for the image set to complete loading.
Definition: TemporalSet.cs:295
void Dispose()
Release all resources used.
Definition: TemporalSet.cs:91
double LoadPercent
Returns the percentage of the load completed.
Definition: TemporalSet.cs:310
void Reset()
Reset all indexes to their starting locations.
Definition: TemporalSet.cs:113
DB_LOAD_METHOD LoadMethod
Get the image load method used on initialization.
Definition: TemporalSet.cs:318
void CleanUp()
Release all resources used by the temporal set and shut down all internal threads.
Definition: TemporalSet.cs:99
TemporalSet(Log log, DatabaseTemporal db, SourceDescriptor src, DB_LOAD_METHOD loadMethod, int nLoadLimit, double dfReplacementPct, int nRefreshUpdateMs, CryptoRandom random, int nHistoricSteps, int nFutureSteps, int nChunks)
The constructor.
Definition: TemporalSet.cs:60
int Count
Return the total number of queries available in the temporal set.
Definition: TemporalSet.cs:403
bool Initialize(bool bNormalizedData, EventWaitHandle evtCancel)
Load the data based on the data load method.
Definition: TemporalSet.cs:129
int GetCount()
Return the total number of queries available in the temporal set.
Definition: TemporalSet.cs:391
The descriptors namespace contains all descriptor used to describe various items stored within the da...
The MyCaffe.basecode contains all generic types used throughout MyCaffe.
Definition: Annotation.cs:12
DB_ITEM_SELECTION_METHOD
Defines the item (e.g., image or temporal item) selection method.
Definition: Interfaces.cs:278
DB_LOAD_METHOD
Defines how to laod the items into the in-memory database.
Definition: Interfaces.cs:154
DB_LABEL_SELECTION_METHOD
Defines the label selection method.
Definition: Interfaces.cs:333
The MyCaffe.db.image namespace contains all image database related classes.
Definition: Database.cs:18
The MyCaffe.db.temporal namespace contains all classes used to create the MyCaffeTemporalDatabase in-...
The MyCaffe namespace contains the main body of MyCaffe code that closesly tracks the C++ Caffe open-...
Definition: Annotation.cs:12