38 #ifndef ORO_BUFFER_LOCK_FREE_HPP 39 #define ORO_BUFFER_LOCK_FREE_HPP 41 #include "../os/oro_arch.h" 42 #include "../os/Atomic.hpp" 43 #include "../os/CAS.hpp" 45 #include "../internal/AtomicMWSRQueue.hpp" 46 #include "../internal/AtomicMWMRQueue.hpp" 47 #include "../internal/TsPool.hpp" 50 #ifdef ORO_PRAGMA_INTERFACE 101 : MAX_THREADS(options.max_threads())
102 , mcircular(options.circular()), initialized(false)
103 , bufs((!options.circular() && !options.multiple_readers()) ?
106 , mpool(new
internal::
TsPool<Item>(bufsize + options.max_threads()))
117 : MAX_THREADS(options.max_threads())
118 , mcircular(options.circular()), initialized(false)
119 , bufs((!options.circular() && !options.multiple_readers()) ?
122 , mpool(new
internal::
TsPool<Item>(bufsize + options.max_threads()))
138 if (!initialized || reset) {
189 return droppedSamples.
read();
194 if (!mcircular && (
capacity() == (size_type)bufs->
size() )) {
195 droppedSamples.
inc();
202 droppedSamples.
inc();
206 if (bufs->
dequeue( mitem ) == false ) {
207 droppedSamples.
inc();
216 if (bufs->
enqueue( mitem ) == false ) {
222 droppedSamples.
inc();
230 droppedSamples.
inc();
238 }
while ( bufs->
enqueue( mitem ) == false );
244 size_type
Push(
const std::vector<value_t>& items)
247 int towrite = items.size();
248 size_type written = 0;
249 typename std::vector<value_t>::const_iterator it;
250 for( it = items.begin(); it != items.end(); ++it) {
251 if ( this->
Push( *it ) == false ) {
256 droppedSamples.
add(towrite - written);
264 if (bufs->
dequeue( ipop ) == false )
272 size_type
Pop(std::vector<value_t>& items )
277 items.push_back( *ipop );
287 if (bufs->
dequeue( ipop ) == false )
A Lock-free buffer implementation to read and write data of type T in a FIFO way. ...
bool empty() const
Check if this buffer is empty.
void data_sample(const T &sample)
Initializes every element of the pool with the given sample and clears the pool.
size_type Push(const std::vector< value_t > &items)
Write a sequence of values to the buffer.
boost::call_traits< T >::reference reference_t
bool full() const
Check if this buffer is full.
C++ abstraction of atomic integer operations.
A Buffer is an object into which values are stored (Push) and from which they are retrieved (Pop).
virtual bool data_sample(param_t sample, bool reset=true)
Initializes this buffer with a data sample, such that for dynamically allocated types T...
FlowStatus
Returns the status of a data flow read operation.
size_type size() const
Returns the actual number of items that are stored in the buffer.
virtual size_type capacity() const =0
Return the maximum number of items this queue can contain.
boost::call_traits< T >::param_type param_t
void clear()
Clears all contents of this buffer.
size_type Pop(std::vector< value_t > &items)
Read the whole buffer.
FlowStatus Pop(reference_t item)
Read the oldest value from the buffer.
virtual bool isEmpty() const =0
Inspect if the Queue is empty.
virtual bool dequeue(T &result)=0
Dequeue an item.
BufferBase::size_type size_type
Create an atomic, non-blocking single ended queue (FIFO) for storing a pointer to T...
BufferInterface< T >::size_type size_type
virtual bool enqueue(const T &value)=0
Enqueue an item.
BufferLockFree(unsigned int bufsize, param_t initial_value, const Options &options=Options())
Create a lock-free buffer which can store bufsize elements.
const unsigned int MAX_THREADS
The maximum number of threads.
bool deallocate(T *Value)
BufferLockFree(unsigned int bufsize, const Options &options=Options())
Create an uninitialized lock-free buffer which can store bufsize elements.
virtual value_t data_sample() const
Reads back a data sample.
BufferBase::Options Options
bool Push(param_t item)
Write a single value to the buffer.
int read() const
Read the current value of the integer.
virtual size_type size() const =0
Return the exact number of elements in the queue.
size_type capacity() const
Returns the maximum number of items that can be stored in the buffer.
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute.
BufferInterface< T >::param_t param_t
void Release(value_t *item)
Releases the pointer.
virtual bool isFull() const =0
Inspect if the Queue is full.
Create an atomic, non-blocking Multi-Writer Single-Reader FIFO for storing a pointer T by value...
BufferInterface< T >::reference_t reference_t
value_t * PopWithoutRelease()
Returns a pointer to the first element in the buffer.
virtual size_type dropped() const
Returns the number of samples that were dropped because the buffer was full.