Orocos Real-Time Toolkit  2.9.0
RemoteChannelElement.hpp
Go to the documentation of this file.
1 /***************************************************************************
2  tag: Peter Soetens Thu Oct 22 11:59:07 CEST 2009 RemoteChannelElement.hpp
3 
4  RemoteChannelElement.hpp - description
5  -------------------
6  begin : Thu October 22 2009
7  copyright : (C) 2009 Peter Soetens
8  email : peter@thesourcworks.com
9 
10  ***************************************************************************
11  * This library is free software; you can redistribute it and/or *
12  * modify it under the terms of the GNU General Public *
13  * License as published by the Free Software Foundation; *
14  * version 2 of the License. *
15  * *
16  * As a special exception, you may use this file as part of a free *
17  * software library without restriction. Specifically, if other files *
18  * instantiate templates or use macros or inline functions from this *
19  * file, or you compile this file and link it with other files to *
20  * produce an executable, this file does not by itself cause the *
21  * resulting executable to be covered by the GNU General Public *
22  * License. This exception does not however invalidate any other *
23  * reasons why the executable file might be covered by the GNU General *
24  * Public License. *
25  * *
26  * This library is distributed in the hope that it will be useful, *
27  * but WITHOUT ANY WARRANTY; without even the implied warranty of *
28  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
29  * Lesser General Public License for more details. *
30  * *
31  * You should have received a copy of the GNU General Public *
32  * License along with this library; if not, write to the Free Software *
33  * Foundation, Inc., 59 Temple Place, *
34  * Suite 330, Boston, MA 02111-1307 USA *
35  * *
36  ***************************************************************************/
37 
38 
39 #ifndef CORBA_REMOTE_CHANNEL_ELEMENT_H
40 #define CORBA_REMOTE_CHANNEL_ELEMENT_H
41 
42 #include "DataFlowI.h"
43 #include "CorbaTypeTransporter.hpp"
44 #include "CorbaDispatcher.hpp"
45 #include "CorbaConnPolicy.hpp"
46 #include "ApplicationServer.hpp"
47 
48 namespace RTT {
49 
50  namespace corba {
51 
59  template<typename T>
62  , public base::ChannelElement<T>
63  {
64 
68  bool valid;
69 
70  DataFlowInterface* msender;
71 
72  PortableServer::ObjectId_var oid;
73 
74  std::string localUri;
75 
76  ConnPolicy policy;
77 
78  public:
/**
 * Create a channel element for remote data exchange.
 *
 * Takes one reference on itself, activates itself as a servant on mpoa
 * (keeping the returned object id in oid) and caches the stringified
 * object reference of _this() in localUri.
 *
 * @param transport Forwarded to the CRemoteChannelElement_i base.
 * @param sender    Stored in msender; signal() hands it to CorbaDispatcher::Instance().
 * @param poa       Forwarded to the CRemoteChannelElement_i base.
 * @param policy    Copied into the policy member; transferSamples() checks its pull flag.
 */
84  RemoteChannelElement(CorbaTypeTransporter const& transport, DataFlowInterface* sender, PortableServer::POA_ptr poa, const ConnPolicy &policy)
85  : CRemoteChannelElement_i(transport, poa)
86  , valid(true)
87  , msender(sender)
88  , policy(policy)
89  {
90  // Big note about cleanup: The RTT will dispose this object through
91  // the ChannelElement<T> refcounting. So we only need to inform the
92  // POA that our object is dead in disconnect().
93  // CORBA refcount-managed servants must start with a refcount of
94  // 1
95  this->ref();
96  oid = mpoa->activate_object(this);
97  // Force creation of dispatcher.
99 
100  localUri = ApplicationServer::orb->object_to_string(_this());
101  }
102 
104  {
105  }
106 
/** Increase the reference count, called from the CORBA side. Forwards to ref(). */
108  void _add_ref()
109  { this->ref(); }
/** Decrease the reference count, called from the CORBA side. Forwards to deref(). */
111  void _remove_ref()
112  { this->deref(); }
113 
114 
119  CORBA::SystemException
120  ))
122 
/**
 * Signals that new data is available on this channel.
 *
 * When remote_side is nil the signal is intercepted and true is returned.
 * Otherwise this element is handed to the CorbaDispatcher obtained for
 * msender and the current value of valid is returned.
 *
 * @return true when no remote side is set, otherwise the valid flag.
 */
123  bool signal()
124  {
125  // forward too.
127  // intercept signal if no remote side set.
128  if ( CORBA::is_nil(remote_side.in()) )
129  return true;
130  // Remember that signal() is called in the context of the one
131  // that wrote the data, so we must decouple here to keep hard-RT happy.
132  // the dispatch thread must read the data and send it over by calling transferSamples().
133  CorbaDispatcher::Instance(msender)->dispatchChannel( this );
134 
135  return valid;
136  }
137 
/**
 * Forwards pending data, or just a signal, to the remote side.
 *
 * Does nothing when valid is false. With a pull policy, remoteSignal() is
 * called once on remote_side (unless RTT_CORBA_PORTS_DISABLE_SIGNAL is
 * defined). Otherwise every sample for which read() reports NewData is
 * passed to write(). A CORBA exception while signalling, or a NotConnected
 * result from write(), clears valid.
 */
138  virtual void transferSamples() {
139  if (!valid)
140  return;
141  //log(Debug) <<"transfering..." <<endlog();
142  // in push mode, transfer all data, in pull mode, only signal once for each sample.
143  if ( policy.pull == ConnPolicy::PULL ) {
144  try
145  {
146 #ifndef RTT_CORBA_PORTS_DISABLE_SIGNAL
147  remote_side->remoteSignal();
148 #endif
149  }
150 #ifdef CORBA_IS_OMNIORB
     // More specific handler first: it additionally logs NP_minorString().
151  catch(CORBA::SystemException& e)
152  {
153  log(Error) << "caught CORBA exception while signalling our remote endpoint: " << e._name() << " " << e.NP_minorString() << endlog();
154  valid = false;
155  }
156 #endif
157  catch(CORBA::Exception& e)
158  {
159  log(Error) << "caught CORBA exception while signalling our remote endpoint: " << e._name() << endlog();
160  valid = false;
161  }
162  } else {
164  typename base::ChannelElement<T>::value_t sample;
165 
166  //log(Debug) <<"...read..."<<endlog();
     // Drain new samples only (copy_old_data is false); stop once valid is cleared.
167  while ( this->read(sample, false) == NewData && valid) {
168  //log(Debug) <<"...write..."<<endlog();
169  if ( this->write(sample) == NotConnected )
170  valid = false;
171  //log(Debug) <<"...next read?..."<<endlog();
172  }
173  }
174  //log(Debug) <<"... done." <<endlog();
175 
176  }
177 
/**
 * Disconnects both the remote and the local side.
 *
 * Calls remoteDisconnect(true) on remote_side when it is non-nil, then
 * calls remoteDisconnect(true) on this element. CORBA exceptions thrown by
 * either call are swallowed.
 */
178  void disconnect() {
179  // disconnect both local and remote side.
180  // !!!THIS RELIES ON BEHAVIOR OF REMOTEDISCONNECT BELOW doing both forward and !forward !!!
181  try {
182  if ( ! CORBA::is_nil(remote_side.in()) )
183  remote_side->remoteDisconnect(true);
184  }
185  catch(CORBA::Exception&) {}
186 
187  try { this->remoteDisconnect(true); }
188  catch(CORBA::Exception&) {}
189  }
190 
/**
 * CORBA IDL function.
 *
 * In the lines visible here: deregisters this element from mdataflow (when
 * mdataflow is set) and deactivates the servant's object id on mpoa. CORBA
 * exceptions raised by that cleanup are swallowed.
 *
 * NOTE(review): listing lines 198 and 202 are absent from this extract, so
 * the statements the comments below refer to are not shown; confirm against
 * the original header.
 *
 * @param forward Not referenced in the lines visible in this listing.
 */
194  void remoteDisconnect(bool forward) ACE_THROW_SPEC ((
195  CORBA::SystemException
196  ))
197  {
199 
200  // Because we support out-of-band transports, we must cleanup more thoroughly.
201  // an oob channel may be sitting at our other end. If not, this is a nop.
203 
204  // Will fail at shutdown if all objects are already deactivated
205  try {
206  if (mdataflow)
207  mdataflow->deregisterChannel(_this());
208  mpoa->deactivate_object(oid);
209  }
210  catch(CORBA::Exception&) {}
211  }
212 
/**
 * Performs a disconnection of a single input or output endpoint.
 *
 * When remote_side is non-nil, remoteDisconnect(forward) is first called on
 * it (CORBA exceptions swallowed), then the base-class disconnect() runs and
 * its result replaces the success flag. On success the element is
 * deregistered from mdataflow (when set) and its object id is deactivated
 * on mpoa.
 *
 * @param channel Passed through to base::ChannelElement<T>::disconnect().
 * @param forward Passed to both the remote and the base-class disconnect.
 * @return false when remote_side is nil; otherwise the result of the
 *         base-class disconnect().
 */
213  bool disconnect(const base::ChannelElementBase::shared_ptr& channel, bool forward)
214  {
215  bool success = false;
216 
217  try {
218  if ( ! CORBA::is_nil(remote_side.in()) ) {
219  remote_side->remoteDisconnect(forward);
220  success = true;
221  }
222  }
223  catch(CORBA::Exception&) {}
224 
     // The local result overwrites whatever the remote attempt recorded above.
225  if ( ! CORBA::is_nil(remote_side.in()) ) {
226  success = base::ChannelElement<T>::disconnect(channel, forward);
227  }
228 
229  // Will fail at shutdown if all objects are already deactivated
230  if (success) {
231  try {
232  if (mdataflow)
233  mdataflow->deregisterChannel(_this());
234  mpoa->deactivate_object(oid);
235  }
236  catch(CORBA::Exception&) {}
237  }
238 
239  return success;
240  }
241 
/**
 * Reads a sample, trying the local chain first and falling back to the
 * remote side over CORBA.
 *
 * @param sample        Receives the sample. On the remote path it is only
 *                      updated for CNewData, or for COldData when
 *                      copy_old_data is set.
 * @param copy_old_data Passed to both the local and the remote read.
 * @return NoData when valid is false, when remote_side is nil, when the
 *         remote read yields a zero status, or when a CORBA exception is
 *         caught (which also clears valid). Otherwise the non-zero local
 *         status, or the remote status cast to FlowStatus.
 */
242  FlowStatus read(typename base::ChannelElement<T>::reference_t sample, bool copy_old_data)
243  {
244  if (!valid)
245  return NoData;
246 
247  // try to read locally first
248  FlowStatus fs;
249  CFlowStatus cfs;
250  if ( (fs = base::ChannelElement<T>::read(sample, copy_old_data)) )
251  return fs;
252 
253  // can only read through corba if remote_side is known
254  if ( CORBA::is_nil(remote_side.in()) ) {
255  return NoData;
256  }
257 
258  // go through corba
259  CORBA::Any_var remote_value;
260  try
261  {
262  if ( remote_side && (cfs = remote_side->read(remote_value, copy_old_data) ) )
263  {
264  if (cfs == CNewData || (cfs == COldData && copy_old_data)) {
     // Stack-allocated data source wrapping &sample; ref() is called before
     // its address is handed to updateFromAny().
265  internal::LateReferenceDataSource<T> ref_data_source(&sample);
266  ref_data_source.ref();
267  transport.updateFromAny(&remote_value.in(), &ref_data_source);
268  }
269  return (FlowStatus)cfs;
270  }
271  else
272  return NoData;
273  }
274 #ifdef CORBA_IS_OMNIORB
275  catch(CORBA::SystemException& e)
276  {
277  log(Error) << "caught CORBA exception while reading a remote channel: " << e._name() << " " << e.NP_minorString() << endlog();
278  valid = false;
279  return NoData;
280  }
281 #endif
282  catch(CORBA::Exception& e)
283  {
284  log(Error) << "caught CORBA exception while reading a remote channel: " << e._name() << endlog();
285  valid = false;
286  return NoData;
287  }
288  }
289 
/**
 * CORBA IDL function.
 *
 * Reads from the local chain into a temporary ValueDataSource. For NewData
 * (or OldData with copy_old_data set) the value is converted to an Any via
 * transport.createAny(). sample is always assigned: when nothing was read,
 * or the conversion returned null, it receives a default-constructed
 * CORBA::Any.
 *
 * @param sample        Out parameter receiving the Any.
 * @param copy_old_data Passed to the local read; also decides whether
 *                      OldData is converted.
 * @return The local flow status cast to CFlowStatus.
 */
293  CFlowStatus read(::CORBA::Any_out sample, bool copy_old_data) ACE_THROW_SPEC ((
294  CORBA::SystemException
295  ))
296  {
297 
298  FlowStatus fs;
299  typename internal::ValueDataSource<T> value_data_source;
300  value_data_source.ref();
301  fs = base::ChannelElement<T>::read(value_data_source.set(), copy_old_data);
302  if (fs == NewData || (fs == OldData && copy_old_data)) {
303  sample = transport.createAny(&value_data_source);
304  if ( sample != 0) {
305  return (CFlowStatus)fs;
306  }
307  // this is a programmatic error and should never happen during run-time.
308  log(Error) << "CORBA Transport failed to create Any for " << value_data_source.getTypeName() << " while it should have!" <<endlog();
309  }
310  // we *must* return something in sample.
311  sample = new CORBA::Any();
312  return (CFlowStatus)fs;
313  }
314 
316  {
317  WriteStatus result;
318 
319  // try to write locally first
320  result = base::ChannelElement<T>::write(sample);
321  if (result != NotConnected)
322  return result;
323 
324  // can only write through corba if remote_side is known
325  if ( CORBA::is_nil(remote_side.in()) ) {
326  return NotConnected;
327  }
328 
329  // go through corba
330  assert( remote_side.in() != 0 && "Got write() without remote side. Need buffer OR remote side but neither was present.");
331  try
332  {
333  // This is used on the writing side, to avoid allocating an Any for
334  // each write
335  CORBA::Any write_any;
336  internal::LateConstReferenceDataSource<T> const_ref_data_source(&sample);
337  // There is a trick. We allocate on the stack, but need to
338  // provide shared pointers. Manually increment refence count
339  // (the stack "owns" the object)
340  const_ref_data_source.ref();
341 
342  if (!transport.updateAny(&const_ref_data_source, write_any)) {
343  return WriteFailure;
344  }
345 
346 #ifndef RTT_CORBA_PORTS_WRITE_ONEWAY
347  CWriteStatus cfs = remote_side->write(write_any);
348  return (WriteStatus)cfs;
349 #else
350  remote_side->writeOneway(write_any);
351  return WriteSuccess;
352 #endif
353  }
354 #ifdef CORBA_IS_OMNIORB
355  catch(CORBA::SystemException& e)
356  {
357  log(Error) << "caught CORBA exception while marshalling: " << e._name() << " " << e.NP_minorString() << endlog();
358  return NotConnected;
359  }
360 #endif
361  catch(CORBA::Exception& e)
362  {
363  log(Error) << "caught CORBA exception while marshalling: " << e._name() << endlog();
364  return NotConnected;
365  }
366  }
367 
/**
 * CORBA IDL function.
 *
 * Unpacks the Any into a temporary ValueDataSource via
 * transport.updateFromAny() and writes the value to the local chain.
 *
 * @param sample The Any received from the remote side.
 * @return CWriteFailure when the conversion fails, otherwise the local
 *         write status cast to CWriteStatus.
 */
371  CWriteStatus write(const ::CORBA::Any& sample) ACE_THROW_SPEC ((
372  CORBA::SystemException
373  ))
374  {
375  typename internal::ValueDataSource<T> value_data_source;
376  value_data_source.ref();
377  if (!transport.updateFromAny(&sample, &value_data_source)) {
378  return CWriteFailure;
379  }
380  WriteStatus fs = base::ChannelElement<T>::write(value_data_source.rvalue());
381  return (CWriteStatus)fs;
382  }
383 
/**
 * CORBA IDL function.
 *
 * Calls write(sample) and discards its result.
 *
 * @param sample The Any received from the remote side.
 */
387  void writeOneway(const ::CORBA::Any& sample) ACE_THROW_SPEC ((
388  CORBA::SystemException
389  ))
390  {
391  (void) write(sample);
392  }
393 
395  {
396  // we don't pass it on through CORBA (yet).
397  // If an oob transport is used, that one will send it through.
399  }
400 
402  {
403  // try locally first
405  return true;
406  }
407 
408  // if we do not have a reference to the remote side, assume that it's alright.
409  if ( CORBA::is_nil(remote_side.in()) ) return true;
410 
411  // go through corba
412  assert( remote_side.in() != 0 && "Got inputReady() without remote side.");
413  try {
414  return remote_side->inputReady();
415  }
416 #ifdef CORBA_IS_OMNIORB
417  catch(CORBA::SystemException& e)
418  {
419  log(Error) << "caught CORBA exception while checking a remote channel: " << e._name() << " " << e.NP_minorString() << endlog();
420  return false;
421  }
422 #endif
423  catch(CORBA::Exception& e)
424  {
425  log(Error) << "caught CORBA exception while checking a remote channel: " << e._name() << endlog();
426  return false;
427  }
428  }
429 
433  virtual bool inputReady()
434  {
435  // signal to oob transport if any.
437  this->getInput();
438  if (input)
440  return true;
441  }
442 
/**
 * Notifies the channel that it is ready, trying the local chain first and
 * then the remote side.
 *
 * When the base-class channelReady() returns false, conn_id is deleted
 * here and the policy is converted with toCORBA() and forwarded to
 * remote_side->channelReady().
 *
 * @param caller  Passed through to the base-class channelReady().
 * @param policy  Connection policy, used locally and forwarded remotely.
 * @param conn_id Passed to the base-class call; deleted by this function
 *                when that call returns false.
 * @return true when the local call succeeds; otherwise the remote result,
 *         or false when a CORBA exception is caught (and logged).
 */
443  virtual bool channelReady(base::ChannelElementBase::shared_ptr const& caller, ConnPolicy const& policy, internal::ConnID *conn_id)
444  {
445  // try to forward locally first
446  if (base::ChannelElement<T>::channelReady(caller, policy, conn_id))
447  return true;
448 
449  // we are not using the ConnID on the remote side, so we clean it up here
450  delete conn_id;
451 
452  // go through corba
453  assert( remote_side.in() != 0 && "Got channelReady() request without remote side.");
454 
455  try
456  {
457  return remote_side->channelReady(toCORBA(policy));
458  }
459 #ifdef CORBA_IS_OMNIORB
460  catch(CORBA::SystemException& e)
461  {
462  log(Error) << "caught CORBA exception while marshalling: " << e._name() << " " << e.NP_minorString() << endlog();
463  return false;
464  }
465 #endif
466  catch(CORBA::Exception& e)
467  {
468  log(Error) << "caught CORBA exception while marshalling: " << e._name() << endlog();
469  return false;
470  }
471  }
472 
/**
 * CORBA IDL function.
 *
 * Converts the CORBA policy with toRTT() and calls the base-class
 * channelReady() with this element as the caller.
 *
 * @param cp Connection policy in its CORBA representation.
 * @return The result of the base-class channelReady().
 */
476  virtual bool channelReady(const CConnPolicy& cp) ACE_THROW_SPEC ((
477  CORBA::SystemException
478  ))
479  {
480  ConnPolicy policy = toRTT(cp);
481  return base::ChannelElement<T>::channelReady(this, policy);
482  }
483 
/**
 * Identifies this element as one that uses a network transport to send
 * data to the next element in the logical chain.
 *
 * @return Always true.
 */
484  virtual bool isRemoteElement() const
485  {
486  return true;
487  }
488 
489  virtual std::string getRemoteURI() const
490  {
491  //check for output element case
493  if(base->getOutput())
495 
496  std::string uri = ApplicationServer::orb->object_to_string(remote_side);
497  return uri;
498  }
499 
500  virtual std::string getLocalURI() const
501  {
502  //check for input element case
504  if(base->getInput())
506 
507  return localUri;
508  }
509 
/**
 * Returns the class name of this element.
 *
 * @return Always the string "CorbaRemoteChannelElement".
 */
510  virtual std::string getElementName() const
511  {
512  return "CorbaRemoteChannelElement";
513  }
514  };
515  }
516 }
517 
518 #endif
519 
virtual std::string getRemoteURI() const
This function returns the URI of the next channel element in the logical chain.
virtual std::string getLocalURI() const
This function returns the URI of this element.
boost::call_traits< T >::param_type param_t
boost::intrusive_ptr< ChannelElement< T > > shared_ptr
The Interface of a TaskContext which exposes its data-flow ports.
virtual bool channelReady(ChannelElementBase::shared_ptr const &caller, ConnPolicy const &policy, internal::ConnID *conn_id=0)
This is called on the output half of a new connection by the connection factory in order to notify th...
virtual bool isRemoteElement() const
This function may be used to identify, if the current element uses a network transport, to send the data to the next Element in the logical chain.
virtual bool channelReady(const CConnPolicy &cp) ACE_THROW_SPEC((CORBA
CORBA IDL function.
A DataSource which is used to manipulate a reference to an external value, by means of a pointer...
virtual bool channelReady(base::ChannelElementBase::shared_ptr const &caller, ConnPolicy const &policy, internal::ConnID *conn_id)
This is called on the output half of a new connection by the connection factory in order to notify th...
static const bool PULL
Definition: ConnPolicy.hpp:120
FlowStatus
Returns the status of a data flow read operation.
Definition: FlowStatus.hpp:56
void deregisterChannel(CChannelElement_ptr channel)
Deregisters the given channel from the channel list.
Definition: DataFlowI.cpp:232
virtual CORBA::Any_ptr createAny(base::DataSourceBase::shared_ptr source) const =0
Evaluate source and create an any which contains the value of source.
void _remove_ref()
Decrease the reference count, called from the CORBA side.
virtual WriteStatus data_sample(typename base::ChannelElement< T >::param_t sample)
CWriteStatus write(const ::CORBA::Any &sample) ACE_THROW_SPEC((CORBA
CORBA IDL function.
CFlowStatus read(::CORBA::Any_out sample, bool copy_old_data) ACE_THROW_SPEC((CORBA
CORBA IDL function.
RTT::corba::CorbaTypeTransporter const & transport
Definition: DataFlowI.h:75
Base class for CORBA channel servers.
Definition: DataFlowI.h:69
A connection policy object describes how a given connection should behave.
Definition: ConnPolicy.hpp:107
void set(typename AssignableDataSource< T >::param_t t)
Definition: DataSources.inl:32
RTT::ConnPolicy toRTT(RTT::corba::CConnPolicy const &corba_policy)
Converts a CORBA CConnPolicy object to an RTT ConnPolicy object.
A DataSource which is used to manipulate a const reference to an external value, by means of a pointe...
bool pull
If true, then the sink will have to pull data.
Definition: ConnPolicy.hpp:209
void _add_ref()
Increase the reference count, called from the CORBA side.
virtual std::string getRemoteURI() const
This function returns the URI of the next channel element in the logical chain.
virtual bool signal()
Signals that there is new data available on this channel. By default, the channel element forwards the...
shared_ptr getInput()
Returns the current input channel element.
virtual void disconnect(bool forward)
Performs a disconnection of this channel's endpoints.
Implements the CRemoteChannelElement of the CORBA IDL interface.
A typed version of ChannelElementBase.
virtual bool inputReady(base::ChannelElementBase::shared_ptr const &caller)
This is called by an input port when it is ready to receive data.
#define ACE_THROW_SPEC(x)
Definition: corba.h:65
void remoteSignal() ACE_THROW_SPEC((CORBA
CORBA IDL function.
void dispatchChannel(base::ChannelElementBase::shared_ptr chan)
bool disconnect(const base::ChannelElementBase::shared_ptr &channel, bool forward)
Performs a disconnection of a single input or output endpoint.
virtual std::string getLocalURI() const
This function returns the URI of this element.
RemoteChannelElement(CorbaTypeTransporter const &transport, DataFlowInterface *sender, PortableServer::POA_ptr poa, const ConnPolicy &policy)
Create a channel element for remote data exchange.
virtual bool inputReady()
CORBA IDL function.
virtual FlowStatus read(reference_t sample, bool copy_old_data=true)
Reads a sample from the connection.
Definition: corba.h:61
CDataFlowInterface_i * mdataflow
Definition: DataFlowI.h:77
static CorbaDispatcher * Instance(DataFlowInterface *iface, int scheduler=defaultScheduler, int priority=defaultPriority)
Create a new dispatcher for a given data flow interface.
void writeOneway(const ::CORBA::Any &sample) ACE_THROW_SPEC((CORBA
CORBA IDL function.
boost::call_traits< T >::reference reference_t
virtual value_t data_sample()
boost::intrusive_ptr< ChannelElementBase > shared_ptr
void deref()
Decreases the reference count, and deletes the object if it is zero.
RTT::corba::CConnPolicy toCORBA(RTT::ConnPolicy const &policy)
Converts an RTT ConnPolicy object to a CORBA CConnPolicy object.
virtual std::string getElementName() const
Returns the class name of this element.
FlowStatus read(typename base::ChannelElement< T >::reference_t sample, bool copy_old_data)
CRemoteChannelElement_var remote_side
Definition: DataFlowI.h:74
virtual bool updateFromAny(const CORBA::Any *blob, base::DataSourceBase::shared_ptr target) const =0
Update an assignable datasource target with the contents of blob.
This class is used in places where a permanent representation of a reference to a connection is neede...
Definition: ConnID.hpp:58
virtual WriteStatus write(param_t sample)
Writes a new sample on this connection.
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute.
Definition: Activity.cpp:52
AssignableDataSource< T >::const_reference_t rvalue() const
Get a const reference to the value of this DataSource.
Definition: DataSources.hpp:95
In the data flow implementation, a channel is created by chaining ChannelElementBase objects...
void ref() const
Increase the reference count by one.
Definition: DataSource.cpp:80
A simple, yet very useful DataSource, which keeps a value, and returns it in its get() method...
Definition: DataSources.hpp:60
bool signal()
Signals that there is new data available on this channel. By default, the channel element forwards the...
shared_ptr getOutput()
Returns the next channel element in the channel's propagation direction.
virtual bool updateAny(base::DataSourceBase::shared_ptr source, CORBA::Any &any) const =0
Evaluate source and update an any which contains the value of source.
PortableServer::POA_var mpoa
Definition: DataFlowI.h:76
void remoteDisconnect(bool forward) ACE_THROW_SPEC((CORBA
CORBA IDL function.
virtual std::string getTypeName() const
Return the Orocos type name, without const, pointer or reference qualifiers.
Definition: DataSource.inl:26
void ref()
Increases the reference count.
WriteStatus
Returns the status of a data flow write operation.
Definition: FlowStatus.hpp:66
static CORBA::ORB_var orb
The orb of this process.
WriteStatus write(typename base::ChannelElement< T >::param_t sample)
Extends the TypeTransporter in order to allow the creation of channel elements or output halves for a...