41 #include "DataFlowS.h"    43 #include "DataFlowC.h"    45 #include "../../base/PortInterface.hpp"    46 #include "../../Logger.hpp"    49 #include "../../InputPort.hpp"    50 #include "../../OutputPort.hpp"    66 CDataFlowInterface_i::ServantMap CDataFlowInterface_i::s_servant_map;
    69     : mdf(interface), mpoa(PortableServer::POA::_duplicate(poa))
    85     s_servant_map.push_back(ServantInfo(objref, servant));
    89     for (ServantMap::iterator it = s_servant_map.begin();
    90             it != s_servant_map.end(); ++it)
    92         if (it->getDataFlowInterface() == obj)
    94             log(
Debug) << 
"deregistered servant for data flow interface" << endlog();
    96             PortableServer::ObjectId_var oid = servant->mpoa->servant_to_id(it->servant);
    97             servant->mpoa->deactivate_object(oid);
    98             s_servant_map.erase(it);
   106     while (!s_servant_map.empty())
   108         ServantMap::iterator it = s_servant_map.begin();
   115     for (ServantMap::const_iterator it = s_servant_map.begin();
   116             it != s_servant_map.end(); ++it)
   118         if (it->objref->_is_equivalent(objref))
   119             return it->getDataFlowInterface();
   126     for (ServantMap::const_iterator it = s_servant_map.begin();
   127             it != s_servant_map.end(); ++it)
   129         if (it->getDataFlowInterface() == dfi)
   133     CDataFlowInterface_ptr server = servant->_this();
   134     servant->_remove_ref();
   141     return PortableServer::POA::_duplicate(mpoa);
   145           CORBA::SystemException
   151     pn->length( ports.size() );
   153     for (
unsigned int i=0; i != ports.size(); ++i )
   154         pn[i] = CORBA::string_dup( ports[i].c_str() );
   160           CORBA::SystemException
   165     result->length( ports.size() );
   168     for (
unsigned int i = 0; i < ports.size(); ++i)
   173         port_desc.
name = CORBA::string_dup(ports[i].c_str());
   178             log(
Warning) << 
"the type of port " << ports[i] << 
" is not registered into the Orocos type system. It is ignored by the CORBA layer." << endlog();
   183         if (dynamic_cast<InputPortInterface*>(port))
   188         result[j++] = port_desc;
   191     return result._retn();
   195           CORBA::SystemException
   203     if (dynamic_cast<InputPortInterface*>(p))
   208 char* CDataFlowInterface_i::getDataType(
const char * port_name) 
ACE_THROW_SPEC ((
   209           CORBA::SystemException
   219 CORBA::Boolean CDataFlowInterface_i::isConnected(
const char * port_name) 
ACE_THROW_SPEC ((
   220           CORBA::SystemException
   233     ChannelList::iterator it=channel_list.begin();
   234     for (; it != channel_list.end(); ++it) {
   235         if (it->first->_is_equivalent (channel) ) {
   236             channel_list.erase(it); 
   242 CORBA::Boolean CDataFlowInterface_i::channelReady(
const char * reader_port_name, CChannelElement_ptr channel, 
CConnPolicy const& policy ) 
ACE_THROW_SPEC ((
   243           CORBA::SystemException
   259         ChannelList::iterator it=channel_list.begin();
   260         for (; it != channel_list.end(); ++it) {
   261             if (it->first->_is_equivalent (channel) ) {
   267                 catch(std::exception 
const& e)
   269                     log(
Error) << 
"call to channelReady threw " << e.what() << endlog();
   275     log(
Error) << 
"Invalid CORBA channel given for port " << reader_port_name << 
": could not match it to a local C++ channel." <<endlog();
   279 void CDataFlowInterface_i::disconnectPort(
const char * port_name) 
ACE_THROW_SPEC ((
   280           CORBA::SystemException
   286         log(
Error) << 
"disconnectPort: No such port: "<< port_name <<endlog();
   293 bool CDataFlowInterface_i::removeConnection(
   294         const char* local_port,
   295         CDataFlowInterface_ptr remote_interface, 
const char* remote_port) 
ACE_THROW_SPEC ((
   296                   CORBA::SystemException
   303         log(
Error) << 
"disconnectPort: No such port: "<< local_port <<endlog();
   306     if (dynamic_cast<OutputPortInterface*>(port) == 0) {
   307         log(
Error) << 
"disconnectPort: "<< local_port << 
" is an input port" << endlog();
   331 ::CORBA::Boolean CDataFlowInterface_i::createStream( 
const char* port,
   333                                       CORBA::SystemException
   339         log(
Error) << 
"createStream: No such port: "<< p->
getName() <<endlog();
   352 void CDataFlowInterface_i::removeStream( 
const char* port, 
const char* stream_name) 
ACE_THROW_SPEC ((
   353           CORBA::SystemException
   359         log(
Error) << 
"createStream: No such port: "<< p->
getName() <<endlog();
   368 CChannelElement_ptr CDataFlowInterface_i::buildChannelOutput(
   370                   CORBA::SystemException
   375     Logger::In in(
"CDataFlowInterface_i::buildChannelOutput");
   405             log(
Error) << 
"Could not create out-of-band transport for port "<< port_name << 
" with transport id " << corba_policy.
transport <<endlog();
   406             log(
Error) << 
"No such transport registered. Check your corba_policy.transport settings or add the transport for type "<< type_info->
getTypeName() <<endlog();
   407             return RTT::corba::CChannelElement::_nil();
   411         if ( strlen( corba_policy.
name_id.in()) == 0 )
   412             corba_policy.
name_id = CORBA::string_dup( policy2.
name_id.c_str() );
   418             ceb->setOutput( buf );
   420             log(
Info) <<
"Receiving data for port "<< policy2.
name_id << 
" from out-of-band protocol "<< corba_policy.
transport <<endlog();
   422             log(
Error) << 
"The type transporter for type "<<type_info->
getTypeName()<< 
" failed to create an out-of-band endpoint for port " << port_name<<endlog();
   423             return RTT::corba::CChannelElement::_nil();
   428         if ( !corba_policy.
pull ) {
   437     this_element->_remove_ref();
   441         channel_list.push_back( ChannelList::value_type(this_element->_this(), end->getOutputEndPoint()));
   444     CRemoteChannelElement_var proxy = this_element->_this();
   445     return proxy._retn();
   451 CChannelElement_ptr CDataFlowInterface_i::buildChannelInput(
   453                   CORBA::SystemException
   458     Logger::In in(
"CDataFlowInterface_i::buildChannelInput");
   486     assert( dynamic_cast<ChannelElementBase*>(this_element) );
   487     start->getOutputEndPoint()->setOutput( dynamic_cast<ChannelElementBase*>(this_element));
   496             log(
Error) << 
"Could not create out-of-band transport for port "<< port_name << 
" with transport id " << corba_policy.
transport <<endlog();
   497             log(
Error) << 
"No such transport registered. Check your corba_policy.transport settings or add the transport for type "<< type_info->
getTypeName() <<endlog();
   502         if ( strlen( corba_policy.
name_id.in()) == 0 )
   503             corba_policy.
name_id = CORBA::string_dup( policy2.
name_id.c_str() );
   507             start->getOutputEndPoint()->setOutput( ceb );
   508             log(
Info) <<
"Sending data from port "<< policy2.
name_id << 
" to out-of-band protocol "<< corba_policy.
transport <<endlog();
   510             log(
Error) << 
"The type transporter for type "<<type_info->
getTypeName()<< 
" failed to create an out-of-band endpoint for port " << port_name<<endlog();
   516         start->setOutput(buf);
   517         buf->setOutput( dynamic_cast<ChannelElementBase*>(this_element) );
   529         channel_list.push_back( ChannelList::value_type(this_element->_this(), start->getInputEndPoint()));
   532     return this_element->_this();
   536 ::CORBA::Boolean CDataFlowInterface_i::createConnection(
   537         const char* writer_port, CDataFlowInterface_ptr reader_interface,
   539                   CORBA::SystemException
   543     Logger::In in(
"CDataFlowInterface_i::createConnection");
   552     if (local_interface && policy.
transport == 0)
   558             log(
Warning) << 
"CORBA: createConnection() target is not an input port" << endlog();
   563         log(
Debug) << 
"CORBA: createConnection() is creating a LOCAL connection between " <<
   564            writer_port << 
" and " << reader_port << endlog();
   568         log(
Debug) << 
"CORBA: createConnection() is creating a REMOTE connection between " <<
   569            writer_port << 
" and " << reader_port << endlog();
   572         if (reader_interface->getPortType(reader_port) != 
corba::CInput) {
   573             log(
Error) << 
"Could not create connection: " << reader_port <<
" is not an input port."<<endlog();
   584     catch(CORBA::COMM_FAILURE&) { 
throw; }
   585     catch(CORBA::TRANSIENT&) { 
throw; }
   586     catch(...) { 
throw; }
   592       PortableServer::POA_ptr poa)
   593     : transport(transport)
   594     , mpoa(PortableServer::POA::_duplicate(poa))
   599 { 
return PortableServer::POA::_duplicate(
mpoa); }
   600 void CRemoteChannelElement_i::setRemoteSide(CRemoteChannelElement_ptr remote) 
ACE_THROW_SPEC ((
   601           CORBA::SystemException
   603 { this->
remote_side = RTT::corba::CRemoteChannelElement::_duplicate(remote); }
 
static CDataFlowInterface_ptr getRemoteInterface(DataFlowInterface *dfi, PortableServer::POA_ptr poa)
Returns an object reference to a remote interface. 
virtual const types::TypeInfo * getTypeInfo() const =0
Returns the types::TypeInfo object for the port's type. 
The Interface of a TaskContext which exposes its data-flow ports. 
virtual ~CRemoteChannelElement_i()
base::ChannelElementBase::shared_ptr buildChannelInput(base::OutputPortInterface &port) const 
const std::string & getTypeName() const 
Return the type name which was first registered. 
PortNames getPortNames() const 
Get all port names of this interface. 
Represents a Stream connection created by the ConnFactory. 
Classes for typekits for describing and handling user data types. 
Emitted when information is requested on a port that does not exist. 
base::ChannelElementBase::shared_ptr buildDataStorage(ConnPolicy const &policy) const 
Creates single data or buffered storage for this type. 
void deregisterChannel(CChannelElement_ptr channel)
Deregisters the given channel from the channel list. 
const std::string & getName() const 
Get the name of this Port. 
CRemoteChannelElement_i(corba::CorbaTypeTransporter const &transport, PortableServer::POA_ptr poa)
Base class for CORBA channel servers. 
A connection policy object describes how a given connection should behave. 
The base class of each OutputPort. 
sequence< CPortDescription > CPortDescriptions
RTT::ConnPolicy toRTT(RTT::corba::CConnPolicy const &corba_policy)
Converts a Corba CConnPolicy object to a RTT ConPolicy object. 
static void clearServants()
Emitted during connections, when there is no CORBA transport defined for the data type of the given p...
virtual void disconnect()=0
Removes any connection that either go to or come from this port. 
static void registerServant(CDataFlowInterface_ptr objref, CDataFlowInterface_i *servant)
base::PortInterface * getPort(const std::string &name) const 
Get an added port. 
DataFlowInterface * getDataFlowInterface() const 
#define ACE_THROW_SPEC(x)
PortableServer::POA_ptr _default_POA()
bool createConnection(InputPortInterface &sink)
Connects this write port to the given read port, using as policy the default policy of the sink port...
virtual base::ChannelElementBase::shared_ptr createStream(base::PortInterface *port, const ConnPolicy &policy, bool is_sender) const =0
Creates a streaming channel element for reading or writing over this transport. 
Represents a connection to a remote CORBA port. 
Classes which contain all implementation code for the RTT. 
A class for representing a user type, and which can build instances of that type. ...
virtual bool addConnection(internal::ConnID *port_id, ChannelElementBase::shared_ptr channel_input, ConnPolicy const &policy)
Adds a new connection to this output port and initializes the connection if required by policy...
void setOutput(shared_ptr output)
Sets the output of this channel element to output and sets the input of output to this...
virtual CRemoteChannelElement_i * createChannelElement_i(DataFlowInterface *sender,::PortableServer::POA *poa, bool is_pull) const =0
Builds a channel element for remote transport in both directions. 
boost::intrusive_ptr< ChannelElementBase > shared_ptr
virtual bool removeConnection(internal::ConnID *cid)=0
Removes a user created connection from this port. 
static void deregisterServant(DataFlowInterface *obj)
RTT::corba::CConnPolicy toCORBA(RTT::ConnPolicy const &policy)
Converts a RTT ConnPolicy object to a Corba CConPolicy object. 
virtual ~CDataFlowInterface_i()
Base classes of RTT classes. 
std::vector< std::string > PortNames
A sequence of names of ports. 
TypeTransporter * getProtocol(int protocol_id) const 
Returns this type's transport for a given protocol. 
A simplistic id that is only same with its own clones (and clones of clones). 
CORBA (OmniORB/TAO) code for network data transport. 
Notify the Logger in which 'module' the message occured. 
#define ORO_CORBA_PROTOCOL_ID
void setInterface(DataFlowInterface *iface)
Once a port is added to a DataFlowInterface, it gets a pointer to that interface. ...
CRemoteChannelElement_var remote_side
static DataFlowInterface * getLocalInterface(CDataFlowInterface_ptr objref)
sequence< string > CPortNames
CDataFlowInterface_i(DataFlowInterface *interface, PortableServer::POA_ptr poa)
virtual bool createStream(ConnPolicy const &policy)=0
Creates a data stream from or to this port using connection-less transports. 
#define CORBA_CHECK_THREAD()
In the data flow implementation, a channel is created by chaining ChannelElementBase objects...
virtual bool connected() const =0
Returns true if this port is connected. 
The base class of every data flow port. 
std::string name_id
The name of this connection. 
PortableServer::POA_ptr _default_POA()
Represents a remote data flow interface. 
PortableServer::POA_var mpoa
Extends the TypeTransporter in order to allow the creation of channel elements or output halves for a...
base::ChannelElementBase::shared_ptr buildChannelOutput(base::InputPortInterface &port) const 
MutexLock is a scope based Monitor, protecting critical sections with a Mutex object through locking ...
void setCDataFlowInterface(CDataFlowInterface_i *dataflow)