38 #ifndef CORELIB_DATAOBJECT_LOCK_FREE_HPP 39 #define CORELIB_DATAOBJECT_LOCK_FREE_HPP 42 #include "../os/oro_arch.h" 44 #include "../Logger.hpp" 45 #include "../types/Types.hpp" 46 #include "../internal/DataSourceTypeInfo.hpp" 99 const unsigned int BUF_LEN;
110 : data(), status(
NoData), next()
121 typedef DataBuf*
volatile VolPtrType;
122 typedef DataBuf ValueType;
123 typedef DataBuf* PtrType;
126 VolPtrType write_ptr;
142 : MAX_THREADS(options.max_threads()), BUF_LEN( options.max_threads() + 2),
147 data =
new DataBuf[BUF_LEN];
149 write_ptr = &data[1];
159 : MAX_THREADS(options.max_threads()), BUF_LEN( options.max_threads() + 2),
164 data =
new DataBuf[BUF_LEN];
166 write_ptr = &data[1];
181 virtual value_t
Get()
const {
197 virtual FlowStatus Get( reference_t pull,
bool copy_old_data,
bool copy_sample )
const 199 if (!initialized && !copy_sample) {
211 if ( reading != read_ptr )
223 result = reading->status;
227 ((result ==
OldData) && copy_old_data) || copy_sample) {
228 pull = reading->data;
247 return Get( pull, copy_old_data,
false );
255 virtual bool Set( param_t push )
259 <<
"This might not be real-time safe." << endlog();
263 PtrType writing = write_ptr;
277 if ( writing != write_ptr ) {
287 writing->data = push;
292 PtrType next_write_ptr = writing->next;
294 next_write_ptr == read_ptr )
296 next_write_ptr = next_write_ptr->next;
297 if (next_write_ptr == writing) {
305 write_ptr = next_write_ptr;
311 if (!initialized || reset) {
313 for (
unsigned int i = 0; i < BUF_LEN; ++i) {
314 data[i].data = sample;
316 data[i].next = &data[i+1];
318 data[BUF_LEN-1].next = &data[0];
331 (void)
Get(sample,
true,
true);
338 if (!initialized)
return;
348 if ( reading != read_ptr )
360 result = reading->status;
virtual value_t Get() const
Get a copy of the data.
DataObjectInterface< T >::value_t value_t
virtual bool Set(param_t push)
Set the data to a certain value (non blocking).
virtual FlowStatus Get(reference_t pull, bool copy_old_data, bool copy_sample) const
Get a copy of the Data (non allocating).
virtual value_t data_sample() const
Reads back a data sample.
FlowStatus
Returns the status of a data flow read operation.
int oro_atomic_inc_and_test(oro_atomic_t *a)
Increment a atomically and test for zero.
virtual bool data_sample(param_t sample, bool reset=true)
Provides a data sample to initialize this data object.
DataObjectLockFree(param_t initial_value, const Options &options=Options())
Construct a DataObjectLockFree.
int oro_atomic_read(oro_atomic_t *a)
Returns the current counter value of the atomic structure a.
A helper class to pass optional arguments to the constructor of DataObjectLockFree<T> in order to avo...
static const std::string & getType()
Return the qualified type.
bool CAS(volatile T *addr, const V &expected, const W &value)
Compare And Swap.
virtual FlowStatus Get(reference_t pull, bool copy_old_data=true) const
Get a copy of the Data (non allocating).
void oro_atomic_inc(oro_atomic_t *a)
Increment a atomically.
This DataObject is a Lock-Free implementation, such that reads and writes can happen concurrently wit...
DataObjectInterface< T >::reference_t reference_t
DataObjectBase::Options Options
boost::call_traits< T >::param_type param_t
DataObjectInterface< T >::param_t param_t
const unsigned int MAX_THREADS
The maximum number of threads.
A DataObjectInterface implements multi-threaded read/write solutions.
void oro_atomic_set(oro_atomic_t *a, int n)
Sets the current counter value of the atomic structure a to n.
boost::call_traits< T >::reference reference_t
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute.
void oro_atomic_dec(oro_atomic_t *a)
Decrement a atomically.
virtual void clear()
Clears any data stored by this data object, so that any subsequent Get() without a new Set() will ret...
DataObjectLockFree(const Options &options=Options())
Construct an uninitialized DataObjectLockFree.
Structure that contains an int for atomic operations.