-
Notifications
You must be signed in to change notification settings - Fork 327
/
Copy pathDataQueue.h
120 lines (110 loc) · 5.43 KB
/
DataQueue.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
#ifndef OPENSIM_DATA_QUEUE_H_
#define OPENSIM_DATA_QUEUE_H_
/* -------------------------------------------------------------------------- *
* OpenSim: DataQueue.h *
* -------------------------------------------------------------------------- *
* The OpenSim API is a toolkit for musculoskeletal modeling and simulation. *
* See http://opensim.stanford.edu and the NOTICE file for more information. *
* OpenSim is developed at Stanford University and supported by the US *
* National Institutes of Health (U54 GM072970, R24 HD065690) and by DARPA *
* through the Warrior Web program. *
* *
* Copyright (c) 2005-2020 Stanford University and the Authors *
* Author(s): Ayman Habib *
* *
* Licensed under the Apache License, Version 2.0 (the "License"); you may *
* not use this file except in compliance with the License. You may obtain a *
* copy of the License at http://www.apache.org/licenses/LICENSE-2.0. *
* *
* Unless required by applicable law or agreed to in writing, software *
* distributed under the License is distributed on an "AS IS" BASIS, *
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
* See the License for the specific language governing permissions and *
* limitations under the License. *
* -------------------------------------------------------------------------- */
#include <condition_variable>
#include <mutex>
#include <queue>
#include <SimTKcommon.h>
#include <OpenSim/Common/osimCommonDLL.h>
namespace OpenSim {
//=============================================================================
//=============================================================================
/**
* This base class defines the interface for a DataQueue: a data structure
* that maintains a queue of data to be passed between computations that potentially
* differ in processing speed, decoupling the producers (e.g. file or live stream)
* from consumers. A synchronization mechanism is implemented to allow handling of
* multiple threads or significant differences in speed.
*
* @author Ayman Habib
*/
/**
 * One element of a data queue: a time stamp paired with the row of data that
 * belongs to it.
 *
 * @tparam U element type of the stored row.
 */
template <class U>
class DataQueueEntry_ {
public:
    /**
     * Builds an entry from a time stamp and its row of data.
     *
     * @param timeStamp time associated with the row.
     * @param data      row used to copy-construct the stored
     *                  SimTK::RowVectorView_ member.
     *                  NOTE(review): if copy-constructing a RowVectorView_
     *                  is shallow, the entry refers to the caller's storage
     *                  and must not outlive it -- confirm against Simbody.
     */
    DataQueueEntry_(double timeStamp, const SimTK::RowVectorView_<U>& data)
            : _timeStamp(timeStamp), _data(data) {}

    /** Copy-constructs both the time stamp and the data member from `other`. */
    DataQueueEntry_(const DataQueueEntry_& other)
            : _timeStamp(other._timeStamp), _data(other._data) {}

    DataQueueEntry_(DataQueueEntry_&&) = default;

    /**
     * Copy assignment is a no-op: nothing is copied from the right-hand side
     * and the entry keeps its current time stamp and data.
     * NOTE(review): presumably this avoids assigning through the stored view
     * -- confirm that silently ignoring the source is intended.
     */
    DataQueueEntry_& operator=(const DataQueueEntry_&) { return *this; }

    virtual ~DataQueueEntry_() = default;

    /** @return the time stamp given at construction. */
    double getTimeStamp() const { return _timeStamp; }

    /** @return a copy of the stored row view. */
    SimTK::RowVectorView_<U> getData() const { return _data; }

private:
    double _timeStamp;
    SimTK::RowVectorView_<U> _data;
};
/**
* DataQueue is a wrapper around the std::queue customized to handle data processing
* and synchronization, and limiting the interface to only the subset of operations
* needed for this use case. Synchronization is experimental as of now.
*/
template<class T> class DataQueue_ {
//=============================================================================
// METHODS
//=============================================================================
public:
//--------------------------------------------------------------------------
// CONSTRUCTION
//--------------------------------------------------------------------------
virtual ~DataQueue_() {}
DataQueue_() = default;
DataQueue_(const DataQueue_&){};
DataQueue_(DataQueue_&&){};
DataQueue_& operator=(const DataQueue_&) {
return (*this);
};
//--------------------------------------------------------------------------
// DataQueue Interface
//--------------------------------------------------------------------------
void push_back(const double time, const SimTK::RowVectorView_<T>& data) {
std::unique_lock<std::mutex> mlock(m_mutex);
m_data_queue.push(DataQueueEntry_<T>(time, data));
mlock.unlock(); // unlock before notificiation to minimize mutex con
m_cond.notify_one();
}
void pop_front(double& time, SimTK::RowVector_<T>& data) {
std::unique_lock<std::mutex> mlock(m_mutex);
while (empty()) { m_cond.wait(mlock); }
DataQueueEntry_<SimTK::Rotation> frontEntry = m_data_queue.front();
m_data_queue.pop();
mlock.unlock();
time = frontEntry.getTimeStamp();
data = frontEntry.getData();
}
bool empty() const {
return m_data_queue.empty();
}
private:
// As of now we use std::queue but other data structures could be used as well
std::queue<DataQueueEntry_<T>> m_data_queue;
std::mutex m_mutex;
std::condition_variable m_cond;
//=============================================================================
}; // END of class templatized DataQueue_<T>
//=============================================================================
}
#endif // OPENSIM_DATA_QUEUE_H_