Orocos Real-Time Toolkit  2.8.3
ListLockFree.hpp
Go to the documentation of this file.
1 /***************************************************************************
2  tag: Peter Soetens Wed Jan 18 14:11:39 CET 2006 ListLockFree.hpp
3 
4  ListLockFree.hpp - description
5  -------------------
6  begin : Wed January 18 2006
7  copyright : (C) 2006 Peter Soetens
8  email : peter.soetens@mech.kuleuven.be
9 
10  ***************************************************************************
11  * This library is free software; you can redistribute it and/or *
12  * modify it under the terms of the GNU General Public *
13  * License as published by the Free Software Foundation; *
14  * version 2 of the License. *
15  * *
16  * As a special exception, you may use this file as part of a free *
17  * software library without restriction. Specifically, if other files *
18  * instantiate templates or use macros or inline functions from this *
19  * file, or you compile this file and link it with other files to *
20  * produce an executable, this file does not by itself cause the *
21  * resulting executable to be covered by the GNU General Public *
22  * License. This exception does not however invalidate any other *
23  * reasons why the executable file might be covered by the GNU General *
24  * Public License. *
25  * *
26  * This library is distributed in the hope that it will be useful, *
27  * but WITHOUT ANY WARRANTY; without even the implied warranty of *
28  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
29  * Lesser General Public License for more details. *
30  * *
31  * You should have received a copy of the GNU General Public *
32  * License along with this library; if not, write to the Free Software *
33  * Foundation, Inc., 59 Temple Place, *
34  * Suite 330, Boston, MA 02111-1307 USA *
35  * *
36  ***************************************************************************/
37 
38 
39 #ifndef ORO_LIST_LOCK_FREE_HPP
40 #define ORO_LIST_LOCK_FREE_HPP
41 
42 #include <vector>
43 #include "../os/oro_arch.h"
44 #include "../os/CAS.hpp"
45 #include <boost/intrusive_ptr.hpp>
46 #include "../rtt-config.h"
47 
48 #ifdef ORO_PRAGMA_INTERFACE
49 #pragma interface
50 #endif
51 
52 namespace RTT
53 {
54  namespace internal {
56  {
// Virtual destructor; only the declaration is visible in this listing.
// ListLockFree::StorageImpl derives from IntrusiveStorage.
59  virtual ~IntrusiveStorage();
60  };
61 
64 
65  }
66 }
67 
68 
69 namespace RTT
70 { namespace internal {
71 
84  template< class T>
86  {
87  public:
// Thread count given to the constructor; BufNum() returns twice this value.
93  const unsigned int MAX_THREADS;
94 
// The element type stored in the list.
95  typedef T value_t;
96  private:
// Every Item keeps its elements in a std::vector of value_t.
97  typedef std::vector<value_t> BufferType;
98  typedef typename BufferType::iterator Iterator;
99  typedef typename BufferType::const_iterator CIterator;
100  struct Item {
101  Item() {
102  //ORO_ATOMIC_INIT(count);
103  oro_atomic_set(&count,-1);
104  }
105  mutable oro_atomic_t count; // refcount
106  BufferType data;
107  };
108 
109  struct StorageImpl : public IntrusiveStorage
110  {
111  Item* items;
112  StorageImpl(size_t alloc) : items( new Item[alloc] ) {
113  }
114  ~StorageImpl() {
115  delete[] items;
116  }
117  Item& operator[](int i) {
118  return items[i];
119  }
120  };
121 
// Intrusive smart pointer to a StorageImpl block.
126  typedef boost::intrusive_ptr<StorageImpl> Storage;
127 
128  Storage newStorage(size_t alloc, size_t items, bool init = true)
129  {
130  Storage st( new StorageImpl(alloc) );
131  for (unsigned int i=0; i < alloc; ++i) {
132  (*st)[i].data.reserve( items ); // pre-allocate
133  }
134  // bootstrap the first list :
135  if (init) {
136  active = &(*st)[0];
137  oro_atomic_inc( &active->count );
138  }
139 
140  return st;
141  }
142 
// The storage block from which lockAndGetActive() and lockAndGetBlank()
// take their Storage reference; reserve() replaces it with a larger block.
143  Storage bufs;
// The buffer holding the current list contents; the modifying methods
// switch it to a new buffer with os::CAS.
144  Item* volatile active;
// Set by apply_and_blank() to the copy it iterates over and reset to 0
// afterwards; read by lockAndGetBlank().
145  Item* volatile blankp;
146 
147  // each thread has one 'working' buffer, and one 'active' buffer
148  // lock. Thus we require to allocate twice as much buffers as threads,
149  // for all the locks to succeed in a worst case scenario.
150  inline size_t BufNum() const {
151  return MAX_THREADS * 2;
152  }
153 
// Element count requested so far: set by the constructor, increased by
// grow() and decreased by shrink().
154  size_t required;
155  public:
164  ListLockFree(unsigned int lsize, unsigned int threads = ORONUM_OS_MAX_THREADS )
165  : MAX_THREADS( threads ), blankp(0), required(lsize)
166  {
167  const unsigned int BUF_NUM = BufNum();
168  bufs = newStorage( BUF_NUM, lsize );
169  }
170 
172  }
173 
178  size_t capacity() const
179  {
180  size_t res;
181  Storage st;
182  Item* orig = lockAndGetActive(st);
183  res = orig->data.capacity();
184  oro_atomic_dec( &orig->count ); // lockAndGetActive
185  return res;
186  }
187 
192  size_t size() const
193  {
194  size_t res;
195  Storage st;
196  Item* orig = lockAndGetActive(st);
197  res = orig->data.size();
198  oro_atomic_dec( &orig->count ); // lockAndGetActive
199  return res;
200  }
201 
206  bool empty() const
207  {
208  bool res;
209  Storage st;
210  Item* orig = lockAndGetActive(st);
211  res = orig->data.empty();
212  oro_atomic_dec( &orig->count ); // lockAndGetActive
213  return res;
214  }
215 
224  void grow(size_t items = 1) {
225  required += items;
226  if (required > this->capacity()) {
227  this->reserve( required*2 );
228  }
229  }
238  void shrink(size_t items = 1) {
239  required -= items;
240  }
241 
// Make room for at least lsize elements. Returns immediately when the
// current capacity already suffices. Otherwise a new storage block is
// allocated, the active contents are copied into its first buffer, and
// 'active' is switched to that buffer with os::CAS; the copy is redone
// whenever the CAS fails.
// NOTE(review): bufs is replaced with a plain assignment below; whether
// several threads may call reserve() at the same time is not evident
// from this code - confirm before relying on it.
252  void reserve(size_t lsize)
253  {
254  if (lsize <= this->capacity() )
255  return;
256 
257  const unsigned int BUF_NUM = BufNum();
258  Storage res( newStorage(BUF_NUM, lsize, false) );
259 
260  // init the future 'active' buffer.
261  Item* nextbuf = &(*res)[0];
262  oro_atomic_inc( &nextbuf->count );
263 
264  // temporary for current active buffer.
265  Item* orig = 0;
266 
267  // prevent current bufs from deletion.
268  // will free upon return.
269  Storage save = bufs;
270  // active points at old, bufs points at new:
271  // first the refcount is added to res, then
272  // bufs' pointer is switched to res' pointer,
273  // and stored in a temporary. Then the temp
274  // is destructed and decrements bufs' old reference.
275  bufs = res;
276  // from now on, any findEmptyBuf will use the new bufs,
277  // unless the algorithm was entered before the switch.
278  // then, it will write the result to the old buf.
279  // if it detects we updated active, it will find an
280  // empty buf in the new buf. If it gets there before
281  // our CAS, our CAS will fail and we try to recopy
282  // everything. This retry may be unnecessary
283  // if the data already is in the new buf, but for this
284  // corner case, we must be sure.
285 
286  // copy active into new:
287  do {
// A previous attempt failed: drop the reference taken on the old 'orig'.
288  if (orig)
289  oro_atomic_dec(&orig->count);
290  orig = lockAndGetActive(); // active is guaranteed to point in valid buffer ( save or bufs )
291  nextbuf->data.clear();
292  Iterator it( orig->data.begin() );
293  while ( it != orig->data.end() ) {
294  nextbuf->data.push_back( *it );
295  ++it;
296  }
297  // see explanation above: active could have changed,
298  // and still point in old buffer. we could check this
299  // with pointer arithmetic, but this is not a performant
300  // method.
301  } while ( os::CAS(&active, orig, nextbuf ) == false);
302  // now,
303  // active is guaranteed to point into bufs.
304  assert( pointsTo( active, bufs ) );
305 
306  oro_atomic_dec( &orig->count ); // lockAndGetActive
307  oro_atomic_dec( &orig->count ); // ref count
308  }
309 
316  void clear()
317  {
318  Storage bufptr;
319  Item* orig(0);
320  Item* nextbuf(0);
321  do {
322  if (orig) {
323  oro_atomic_dec(&orig->count);
324  oro_atomic_dec(&nextbuf->count);
325  }
326  orig = lockAndGetActive(bufptr);
327  orig->data.size();
328  nextbuf = findEmptyBuf(bufptr); // find unused Item in bufs
329  nextbuf->data.clear();
330  } while ( os::CAS(&active, orig, nextbuf ) == false );
331  oro_atomic_dec( &orig->count ); // lockAndGetActive
332  oro_atomic_dec( &orig->count ); // ref count
333  }
334 
344  bool append( value_t item )
345  {
346  Item* orig=0;
347  Storage bufptr;
348  Item* usingbuf(0);
349  do {
350  if (orig) {
351  oro_atomic_dec(&orig->count);
352  oro_atomic_dec(&usingbuf->count);
353  }
354  orig = lockAndGetActive( bufptr );
355  if ( orig->data.size() == orig->data.capacity() ) { // check for full
356  oro_atomic_dec( &orig->count );
357  return false;
358  }
359  usingbuf = findEmptyBuf( bufptr ); // find unused Item in bufs
360  usingbuf->data = orig->data;
361  usingbuf->data.push_back( item );
362  } while ( os::CAS(&active, orig, usingbuf ) ==false);
363  oro_atomic_dec( &orig->count ); // lockAndGetActive()
364  oro_atomic_dec( &orig->count ); // set list free
365  return true;
366  }
367 
373  value_t front() const
374  {
375  Storage bufptr;
376  Item* orig = lockAndGetActive(bufptr);
377  value_t ret(orig->data.front());
378  oro_atomic_dec( &orig->count ); //lockAndGetActive
379  return ret;
380  }
381 
385  value_t back() const
386  {
387  Storage bufptr;
388  Item* orig = lockAndGetActive(bufptr);
389  value_t ret(orig->data.back());
390  oro_atomic_dec( &orig->count ); //lockAndGetActive
391  return ret;
392  }
393 
401  size_t append(const std::vector<T>& items)
402  {
403  Item* usingbuf(0);
404  Item* orig=0;
405  int towrite = items.size();
406  Storage bufptr;
407  do {
408  if (orig) {
409  oro_atomic_dec(&orig->count);
410  oro_atomic_dec(&usingbuf->count);
411  }
412 
413  orig = lockAndGetActive( bufptr );
414  int maxwrite = orig->data.capacity() - orig->data.size();
415  if ( maxwrite == 0 ) {
416  oro_atomic_dec( &orig->count ); // lockAndGetActive()
417  return 0;
418  }
419  if ( towrite > maxwrite )
420  towrite = maxwrite;
421  usingbuf = findEmptyBuf( bufptr ); // find unused Item in bufs
422  usingbuf->data = orig->data;
423  usingbuf->data.insert( usingbuf->data.end(), items.begin(), items.begin() + towrite );
424  } while ( os::CAS(&active, orig, usingbuf ) ==false );
425  oro_atomic_dec( &orig->count ); // lockAndGetActive()
426  oro_atomic_dec( &orig->count ); // set list free
427  return towrite;
428  }
429 
430 
438  bool erase( value_t item )
439  {
440  Item* orig=0;
441  Item* nextbuf(0);
442  Storage bufptr;
443  do {
444  if (orig) {
445  oro_atomic_dec(&orig->count);
446  oro_atomic_dec(&nextbuf->count);
447  }
448  orig = lockAndGetActive( bufptr ); // find active in bufptr
449  // we do this in the loop because bufs can change.
450  nextbuf = findEmptyBuf( bufptr ); // find unused Item in same buf.
451  Iterator it( orig->data.begin() );
452  while (it != orig->data.end() && !( *it == item ) ) {
453  nextbuf->data.push_back( *it );
454  ++it;
455  }
456  if ( it == orig->data.end() ) {
457  oro_atomic_dec( &orig->count );
458  oro_atomic_dec( &nextbuf->count );
459  return false; // item not found.
460  }
461  ++it; // skip item.
462  while ( it != orig->data.end() ) {
463  nextbuf->data.push_back( *it );
464  ++it;
465  }
466  } while ( os::CAS(&active, orig, nextbuf ) ==false );
467  oro_atomic_dec( &orig->count ); // lockAndGetActive
468  oro_atomic_dec( &orig->count ); // ref count
469  return true;
470  }
471 
479  template<typename Pred>
480  bool delete_if(Pred pred)
481  {
482  Item* orig=0;
483  Item* nextbuf(0);
484  bool removed_sth = false;
485  Storage bufptr;
486  do {
487  removed_sth = false;
488  if (orig) {
489  oro_atomic_dec(&orig->count);
490  oro_atomic_dec(&nextbuf->count);
491  }
492  orig = lockAndGetActive( bufptr ); // find active in bufptr
493  // we do this in the loop because bufs can change.
494  nextbuf = findEmptyBuf( bufptr ); // find unused Item in same buf.
495 
496  Iterator it(orig->data.begin());
497  while (it != orig->data.end()) {
498  if (!pred(*it))
499  nextbuf->data.push_back( *it );
500  else
501  removed_sth = true;
502 
503  ++it;
504  }
505 
506  if (!removed_sth) {
507  oro_atomic_dec( &orig->count );
508  oro_atomic_dec( &nextbuf->count );
509  return false; // no matching item found.
510  }
511  } while ( os::CAS(&active, orig, nextbuf ) == false );
512  oro_atomic_dec( &orig->count ); // lockAndGetActive
513  oro_atomic_dec( &orig->count ); // ref count
514  return true;
515  }
516 
517 
523  template<class Function>
524  void apply(Function func )
525  {
526  Storage st;
527  Item* orig = lockAndGetActive(st);
528  Iterator it( orig->data.begin() );
529  while ( it != orig->data.end() ) {
530  func( *it );
531  ++it;
532  }
533  oro_atomic_dec( &orig->count ); //lockAndGetActive
534  }
535 
// Call func on every element that does not compare equal to blank.
// The active list is first copied into a spare buffer, which is published
// through blankp while it is iterated so that erase_and_blank() can
// overwrite entries in it; blankp is reset to 0 afterwards.
// func receives a copy of each element.
551  template<class Function>
552  void apply_and_blank(Function func, value_t blank )
553  {
554  Storage st;
555  Item* orig = lockAndGetActive(st);
556  Item* newp = findEmptyBuf(st);
557  Iterator it( orig->data.begin() );
558  // first copy the whole list.
559  while ( it != orig->data.end() ) {
560  newp->data.push_back( *it );
561  ++it;
562  }
// Publish the copy so lockAndGetBlank() can find it.
563  blankp = newp;
564  it = blankp->data.begin();
565  // iterate over copy and skip blanks.
566  while ( it != blankp->data.end() ) {
567  // XXX Race condition: 'it' can be blanked after
568  // comparison or even during func.
569  value_t a = *it;
570  if ( !(a == blank) )
571  func( a );
572  ++it;
573  }
574  blankp = 0;
575 
576  oro_atomic_dec( &orig->count ); //lockAndGetActive
577  oro_atomic_dec( &newp->count ); //findEmptyBuf
578  }
579 
597  bool erase_and_blank(value_t item, value_t blank )
598  {
599  Storage st;
600  bool res = this->erase(item);
601  Item* orig = lockAndGetBlank(st);
602  if (orig) {
603  Iterator it( orig->data.begin() );
604  // item may still not be present in the blank-list.
605  while ( *it != item ) {
606  ++it;
607  if (it == orig->data.end() ) {
608  oro_atomic_dec( &orig->count ); //lockAndGetBlank
609  return res;
610  }
611  }
612  (*it) = blank;
613  oro_atomic_dec( &orig->count ); //lockAndGetBlank
614  }
615  return res;
616  }
617 
625  template<class Function>
626  value_t find_if( Function func, value_t blank = value_t() )
627  {
628  Storage st;
629  Item* orig = lockAndGetActive(st);
630  Iterator it( orig->data.begin() );
631  while ( it != orig->data.end() ) {
632  if (func( *it ) == true ) {
633  oro_atomic_dec( &orig->count ); //lockAndGetActive
634  return *it;
635  }
636  ++it;
637  }
638  oro_atomic_dec( &orig->count ); //lockAndGetActive
639  return blank;
640  }
641  private:
// Claim an unused buffer in bufptr and return it emptied.
// The buffers are scanned from the first one; a buffer is claimed when
// incrementing its count yields zero (counts start at -1). Otherwise the
// increment is undone and the next buffer is tried, wrapping around after
// BufNum() buffers. The loop does not end until a buffer is claimed.
649  Item* findEmptyBuf(Storage& bufptr) {
650  // These two functions are copy/pasted from BufferLockFree.
651  // If MAX_THREADS is large enough, this will always succeed :
652  Item* start = &(*bufptr)[0];
653  while( true ) {
654  if ( oro_atomic_inc_and_test( &start->count ) )
655  break;
// Buffer was in use: undo our increment and move on.
656  oro_atomic_dec( &start->count );
657  ++start;
658  if (start == &(*bufptr)[0] + BufNum() )
659  start = &(*bufptr)[0]; // in case of races, rewind
660  }
661  assert( pointsTo(start, bufptr) );
662  start->data.clear(); // this calls the destructors of T.
663  return start; // unique pointer across all threads
664  }
665 
// Take a reference on the active buffer and return it.
// bufptr is set to the storage block the returned buffer belongs to.
// The caller must decrement the returned buffer's count when done.
670  Item* lockAndGetActive(Storage& bufptr) const {
671  // This is a kind-of smart-pointer implementation
672  // We could move it into Item itself and overload operator=
673  Item* orig=0;
674  do {
// The previous attempt locked a buffer that is no longer active: release it.
675  if (orig)
676  oro_atomic_dec( &orig->count );
677  bufptr = bufs;
678  orig = active;
679  // also check that orig points into bufptr.
680  if ( pointsTo(orig, bufptr) )
681  oro_atomic_inc( &orig->count );
682  else {
// active lies outside the storage just read; force another attempt.
683  orig = 0;
684  }
685  // this synchronisation point is 'aggressive' (a _sufficient_ condition)
686  // if active is still equal to orig, the increase of orig->count is
687  // surely valid, since no contention (change of active) occurred.
688  } while ( active != orig );
689  assert( pointsTo(orig, bufptr) );
690  return orig;
691  }
692 
// Take a reference on the active buffer and return it, without reading
// bufs or checking which storage block the buffer belongs to.
// The caller must decrement the returned buffer's count when done.
698  Item* lockAndGetActive() const {
699  // only operates on active's refcount.
700  Item* orig=0;
701  do {
// The previous attempt locked a buffer that is no longer active: release it.
702  if (orig)
703  oro_atomic_dec( &orig->count );
704  orig = active;
705  oro_atomic_inc( &orig->count );
706  // this synchronisation point is 'aggressive' (a _sufficient_ condition)
707  // if active is still equal to orig, the increase of orig->count is
708  // surely valid, since no contention (change of active) occurred.
709  } while ( active != orig );
710  return orig;
711  }
712 
// Take a reference on the buffer published in blankp and return it, or
// return 0 when blankp is 0 at the moment it is read.
// bufptr is set to the storage block read from bufs.
// The caller must decrement a non-null result's count when done.
// NOTE(review): if blankp lies outside bufptr and then becomes 0 before
// the loop condition is evaluated, the loop exits with orig == 0 and the
// assert below is evaluated with a null pointer - confirm whether this
// can occur.
716  Item* lockAndGetBlank(Storage& bufptr) const {
717  Item* orig=0;
718  do {
// The previous attempt locked a buffer that blankp no longer points to: release it.
719  if (orig)
720  oro_atomic_dec( &orig->count );
721  bufptr = bufs;
722  orig = blankp;
723  if (orig == 0)
724  return 0; // no blankp.
725  // also check that orig points into bufptr.
726  if ( pointsTo(orig, bufptr) )
727  oro_atomic_inc( &orig->count );
728  else {
729  orig = 0;
730  }
731  // this synchronisation point is 'aggressive' (a _sufficient_ condition)
732  // if blankp is still equal to orig, the increase of orig->count is
733  // surely valid, since no contention (change of blankp) occurred.
734  } while ( blankp != orig );
735  assert( pointsTo(orig, bufptr) );
736  return orig;
737  }
738 
739  inline bool pointsTo( Item* p, const Storage& bf ) const {
740  return p >= &(*bf)[0] && p <= &(*bf)[ BufNum() - 1 ];
741  }
742 
743  };
744  }
745 }
746 
747 #endif
value_t front() const
Returns the first element of the list.
const unsigned int MAX_THREADS
The maximum number of threads.
void reserve(size_t lsize)
Reserve a capacity for this list.
bool empty() const
Returns true if this list is empty.
size_t append(const std::vector< T > &items)
Append a sequence of values to the list.
#define ORONUM_OS_MAX_THREADS
void RTT_API intrusive_ptr_add_ref(RTT::internal::IntrusiveStorage *p)
int oro_atomic_inc_and_test(oro_atomic_t *a)
Increment a atomically and test for zero.
size_t capacity() const
Returns the maximum number of elements this list can hold.
#define RTT_API
Definition: rtt-config.h:97
bool erase_and_blank(value_t item, value_t blank)
Erase an element from the list and blank it if possible.
void RTT_API intrusive_ptr_release(RTT::internal::IntrusiveStorage *p)
void clear()
Clears all elements in the list.
bool CAS(volatile T *addr, const V &expected, const W &value)
Compare And Swap.
Definition: CAS.hpp:54
void apply(Function func)
Apply a function to the elements of the whole list.
void oro_atomic_inc(oro_atomic_t *a)
Increment a atomically.
void shrink(size_t items=1)
Shrink the capacity with at most n items.
void apply_and_blank(Function func, value_t blank)
Apply a function to the non-blanked elements of the list.
ListLockFree(unsigned int lsize, unsigned int threads=ORONUM_OS_MAX_THREADS)
Create a lock-free list which can store lsize elements.
value_t find_if(Function func, value_t blank=value_t())
Find an item in the list such that func( item ) == true.
size_t size() const
Returns the current number of elements in this list.
void oro_atomic_set(oro_atomic_t *a, int n)
Sets the current counter value of the atomic structure a to n.
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute.
Definition: Activity.cpp:51
void grow(size_t items=1)
Grow the capacity to contain at least n additional items.
A simple lock-free list implementation to append or erase data of type T.
AtomicInt threads(0)
The number of threads in addition to the main() thread.
Definition: threads.hpp:54
void oro_atomic_dec(oro_atomic_t *a)
Decrement a atomically.
bool append(value_t item)
Append a single value to the list.
bool erase(value_t item)
Erase a value from the list.
value_t back() const
Returns the last element of the list.
Structure that contains an int for atomic operations.
Definition: oro_arch.h:10
bool delete_if(Pred pred)
Erase all values from the list for which pred returns true.