39 #ifndef ORO_LIST_LOCK_FREE_HPP 40 #define ORO_LIST_LOCK_FREE_HPP 43 #include "../os/oro_arch.h" 44 #include "../os/CAS.hpp" 45 #include <boost/intrusive_ptr.hpp> 46 #include "../rtt-config.h" 48 #ifdef ORO_PRAGMA_INTERFACE 97 typedef std::vector<value_t> BufferType;
98 typedef typename BufferType::iterator Iterator;
99 typedef typename BufferType::const_iterator CIterator;
112 StorageImpl(
size_t alloc) : items(
new Item[alloc] ) {
117 Item& operator[](
int i) {
126 typedef boost::intrusive_ptr<StorageImpl> Storage;
128 Storage newStorage(
size_t alloc,
size_t items,
bool init =
true)
130 Storage st(
new StorageImpl(alloc) );
131 for (
unsigned int i=0; i < alloc; ++i) {
132 (*st)[i].data.reserve( items );
144 Item*
volatile active;
145 Item*
volatile blankp;
150 inline size_t BufNum()
const {
151 return MAX_THREADS * 2;
165 : MAX_THREADS(
threads ), blankp(0), required(lsize)
167 const unsigned int BUF_NUM = BufNum();
168 bufs = newStorage( BUF_NUM, lsize );
182 Item* orig = lockAndGetActive(st);
183 res = orig->data.capacity();
196 Item* orig = lockAndGetActive(st);
197 res = orig->data.size();
210 Item* orig = lockAndGetActive(st);
211 res = orig->data.empty();
226 if (required > this->capacity()) {
227 this->reserve( required*2 );
254 if (lsize <= this->capacity() )
257 const unsigned int BUF_NUM = BufNum();
258 Storage res( newStorage(BUF_NUM, lsize,
false) );
261 Item* nextbuf = &(*res)[0];
290 orig = lockAndGetActive();
291 nextbuf->data.clear();
292 Iterator it( orig->data.begin() );
293 while ( it != orig->data.end() ) {
294 nextbuf->data.push_back( *it );
301 }
while (
os::CAS(&active, orig, nextbuf ) ==
false);
304 assert( pointsTo( active, bufs ) );
326 orig = lockAndGetActive(bufptr);
328 nextbuf = findEmptyBuf(bufptr);
329 nextbuf->data.clear();
330 }
while (
os::CAS(&active, orig, nextbuf ) ==
false );
354 orig = lockAndGetActive( bufptr );
355 if ( orig->data.size() == orig->data.capacity() ) {
359 usingbuf = findEmptyBuf( bufptr );
360 usingbuf->data = orig->data;
361 usingbuf->data.push_back( item );
362 }
while (
os::CAS(&active, orig, usingbuf ) ==
false);
376 Item* orig = lockAndGetActive(bufptr);
377 value_t ret(orig->data.front());
388 Item* orig = lockAndGetActive(bufptr);
389 value_t ret(orig->data.back());
401 size_t append(
const std::vector<T>& items)
405 int towrite = items.size();
413 orig = lockAndGetActive( bufptr );
414 int maxwrite = orig->data.capacity() - orig->data.size();
415 if ( maxwrite == 0 ) {
419 if ( towrite > maxwrite )
421 usingbuf = findEmptyBuf( bufptr );
422 usingbuf->data = orig->data;
423 usingbuf->data.insert( usingbuf->data.end(), items.begin(), items.begin() + towrite );
424 }
while (
os::CAS(&active, orig, usingbuf ) ==false );
448 orig = lockAndGetActive( bufptr );
450 nextbuf = findEmptyBuf( bufptr );
451 Iterator it( orig->data.begin() );
452 while (it != orig->data.end() && !( *it == item ) ) {
453 nextbuf->data.push_back( *it );
456 if ( it == orig->data.end() ) {
462 while ( it != orig->data.end() ) {
463 nextbuf->data.push_back( *it );
466 }
while (
os::CAS(&active, orig, nextbuf ) ==
false );
479 template<
typename Pred>
484 bool removed_sth =
false;
492 orig = lockAndGetActive( bufptr );
494 nextbuf = findEmptyBuf( bufptr );
496 Iterator it(orig->data.begin());
497 while (it != orig->data.end()) {
499 nextbuf->data.push_back( *it );
511 }
while (
os::CAS(&active, orig, nextbuf ) ==
false );
523 template<
class Function>
527 Item* orig = lockAndGetActive(st);
528 Iterator it( orig->data.begin() );
529 while ( it != orig->data.end() ) {
551 template<
class Function>
555 Item* orig = lockAndGetActive(st);
556 Item* newp = findEmptyBuf(st);
557 Iterator it( orig->data.begin() );
559 while ( it != orig->data.end() ) {
560 newp->data.push_back( *it );
564 it = blankp->data.begin();
566 while ( it != blankp->data.end() ) {
600 bool res = this->erase(item);
601 Item* orig = lockAndGetBlank(st);
603 Iterator it( orig->data.begin() );
605 while ( *it != item ) {
607 if (it == orig->data.end() ) {
625 template<
class Function>
626 value_t
find_if( Function func, value_t blank = value_t() )
629 Item* orig = lockAndGetActive(st);
630 Iterator it( orig->data.begin() );
631 while ( it != orig->data.end() ) {
632 if (func( *it ) == true ) {
649 Item* findEmptyBuf(Storage& bufptr) {
652 Item* start = &(*bufptr)[0];
658 if (start == &(*bufptr)[0] + BufNum() )
659 start = &(*bufptr)[0];
661 assert( pointsTo(start, bufptr) );
670 Item* lockAndGetActive(Storage& bufptr)
const {
680 if ( pointsTo(orig, bufptr) )
688 }
while ( active != orig );
689 assert( pointsTo(orig, bufptr) );
698 Item* lockAndGetActive()
const {
709 }
while ( active != orig );
716 Item* lockAndGetBlank(Storage& bufptr)
const {
726 if ( pointsTo(orig, bufptr) )
734 }
while ( blankp != orig );
735 assert( pointsTo(orig, bufptr) );
739 inline bool pointsTo( Item* p,
const Storage& bf )
const {
740 return p >= &(*bf)[0] && p <= &(*bf)[ BufNum() - 1 ];
value_t front() const
Returns the first element of the list.
const unsigned int MAX_THREADS
The maximum number of threads.
void reserve(size_t lsize)
Reserve a capacity for this list.
bool empty() const
Returns true if this list is empty.
size_t append(const std::vector< T > &items)
Append a sequence of values to the list.
#define ORONUM_OS_MAX_THREADS
void RTT_API intrusive_ptr_add_ref(RTT::internal::IntrusiveStorage *p)
int oro_atomic_inc_and_test(oro_atomic_t *a)
Increment a atomically and test for zero.
size_t capacity() const
Returns the maximum number of elements this list can hold.
bool erase_and_blank(value_t item, value_t blank)
Erase an element from the list and blank it if possible.
void RTT_API intrusive_ptr_release(RTT::internal::IntrusiveStorage *p)
void clear()
Clears all elements in the list.
bool CAS(volatile T *addr, const V &expected, const W &value)
Compare And Swap.
void apply(Function func)
Apply a function to the elements of the whole list.
void oro_atomic_inc(oro_atomic_t *a)
Increment a atomically.
void shrink(size_t items=1)
Shrink the capacity with at most n items.
void apply_and_blank(Function func, value_t blank)
Apply a function to the non-blanked elements of the list.
ListLockFree(unsigned int lsize, unsigned int threads=ORONUM_OS_MAX_THREADS)
Create a lock-free list wich can store lsize elements.
value_t find_if(Function func, value_t blank=value_t())
Find an item in the list such that func( item ) == true.
size_t size() const
Returns the current number of elements in this list.
void oro_atomic_set(oro_atomic_t *a, int n)
Sets the current counter value of the atomic structure a to n.
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute.
void grow(size_t items=1)
Grow the capacity to contain at least n additional items.
A simple lock-free list implementation to append or erase data of type T.
AtomicInt threads(0)
The number of threads in addition to the main() thread.
void oro_atomic_dec(oro_atomic_t *a)
Decrement a atomically.
bool append(value_t item)
Append a single value to the list.
bool erase(value_t item)
Erase a value from the list.
value_t back() const
Returns the last element of the list.
Structure that contains an int for atomic operations.
bool delete_if(Pred pred)
Erase a value from the list.