41 #include "DataFlowS.h" 43 #include "DataFlowC.h" 45 #include "../../base/PortInterface.hpp" 46 #include "../../Logger.hpp" 49 #include "../../InputPort.hpp" 50 #include "../../OutputPort.hpp" 66 CDataFlowInterface_i::ServantMap CDataFlowInterface_i::s_servant_map;
69 : mdf(interface), mpoa(PortableServer::POA::_duplicate(poa))
85 s_servant_map.push_back(ServantInfo(objref, servant));
89 for (ServantMap::iterator it = s_servant_map.begin();
90 it != s_servant_map.end(); ++it)
92 if (it->getDataFlowInterface() == obj)
94 log(
Debug) <<
"deregistered servant for data flow interface" << endlog();
96 PortableServer::ObjectId_var oid = servant->mpoa->servant_to_id(it->servant);
97 servant->mpoa->deactivate_object(oid);
98 s_servant_map.erase(it);
106 while (!s_servant_map.empty())
108 ServantMap::iterator it = s_servant_map.begin();
115 for (ServantMap::const_iterator it = s_servant_map.begin();
116 it != s_servant_map.end(); ++it)
118 if (it->objref->_is_equivalent(objref))
119 return it->getDataFlowInterface();
126 for (ServantMap::const_iterator it = s_servant_map.begin();
127 it != s_servant_map.end(); ++it)
129 if (it->getDataFlowInterface() == dfi)
133 CDataFlowInterface_ptr server = servant->_this();
134 servant->_remove_ref();
141 return PortableServer::POA::_duplicate(mpoa);
145 CORBA::SystemException
151 pn->length( ports.size() );
153 for (
unsigned int i=0; i != ports.size(); ++i )
154 pn[i] = CORBA::string_dup( ports[i].c_str() );
160 CORBA::SystemException
165 result->length( ports.size() );
168 for (
unsigned int i = 0; i < ports.size(); ++i)
173 port_desc.
name = CORBA::string_dup(ports[i].c_str());
179 log(
Warning) <<
"the type of port " << ports[i] <<
" is not registered into the Orocos type system. It is ignored by the CORBA layer." << endlog();
183 port_desc.
type_name = CORBA::string_dup(type_info->getTypeName().c_str());
184 if (dynamic_cast<InputPortInterface*>(port))
189 result[j++] = port_desc;
192 return result._retn();
196 CORBA::SystemException
204 if (dynamic_cast<InputPortInterface*>(p))
209 char* CDataFlowInterface_i::getDataType(
const char * port_name)
ACE_THROW_SPEC ((
210 CORBA::SystemException
220 CORBA::Boolean CDataFlowInterface_i::isConnected(
const char * port_name)
ACE_THROW_SPEC ((
221 CORBA::SystemException
234 ChannelList::iterator it=channel_list.begin();
235 for (; it != channel_list.end(); ++it) {
236 if (it->first->_is_equivalent (channel) ) {
237 channel_list.erase(it);
243 void CDataFlowInterface_i::disconnectPort(
const char * port_name)
ACE_THROW_SPEC ((
244 CORBA::SystemException
250 log(
Error) <<
"disconnectPort: No such port: "<< port_name <<endlog();
257 bool CDataFlowInterface_i::removeConnection(
258 const char* local_port,
259 CDataFlowInterface_ptr remote_interface,
const char* remote_port)
ACE_THROW_SPEC ((
260 CORBA::SystemException
267 log(
Error) <<
"disconnectPort: No such port: "<< local_port <<endlog();
270 if (dynamic_cast<OutputPortInterface*>(port) == 0) {
271 log(
Error) <<
"disconnectPort: "<< local_port <<
" is an input port" << endlog();
295 ::CORBA::Boolean CDataFlowInterface_i::createStream(
const char* port,
297 CORBA::SystemException
303 log(
Error) <<
"createStream: No such port: "<< p->
getName() <<endlog();
316 void CDataFlowInterface_i::removeStream(
const char* port,
const char* stream_name)
ACE_THROW_SPEC ((
317 CORBA::SystemException
323 log(
Error) <<
"createStream: No such port: "<< p->
getName() <<endlog();
332 CChannelElement_ptr CDataFlowInterface_i::buildChannelOutput(
334 CORBA::SystemException
340 Logger::In in(
"CDataFlowInterface_i::buildChannelOutput");
362 if (!shared_connection) {
367 if ( strlen( corba_policy.
name_id.in()) == 0 )
368 corba_policy.
name_id = CORBA::string_dup( shared_connection->getName().c_str() );
372 return RTT::corba::CChannelElement::_nil();
375 end = shared_connection;
397 log(
Error) <<
"Could not create out-of-band transport for port "<< port_name <<
" with transport id " << corba_policy.
transport <<endlog();
398 log(
Error) <<
"No such transport registered. Check your corba_policy.transport settings or add the transport for type "<< type_info->
getTypeName() <<endlog();
399 return RTT::corba::CChannelElement::_nil();
403 if ( strlen( corba_policy.
name_id.in()) == 0 )
404 corba_policy.
name_id = CORBA::string_dup( policy2.
name_id.c_str() );
410 log(
Info) <<
"Receiving data for port "<< policy2.
name_id <<
" from out-of-band protocol "<< corba_policy.
transport <<endlog();
412 log(
Error) <<
"The type transporter for type "<<type_info->
getTypeName()<<
" failed to create an out-of-band endpoint for port " << port_name<<endlog();
413 return RTT::corba::CChannelElement::_nil();
421 this_element->_remove_ref();
425 channel_list.push_back( ChannelList::value_type(this_element->_this(), end) );
428 CRemoteChannelElement_var proxy = this_element->_this();
429 return proxy._retn();
435 CChannelElement_ptr CDataFlowInterface_i::buildChannelInput(
437 CORBA::SystemException
443 Logger::In in(
"CDataFlowInterface_i::buildChannelInput");
470 PortableServer::ServantBase_var servant = this_element = transporter->
createChannelElement_i(mdf, mpoa, policy2);
475 assert( dynamic_cast<ChannelElementBase*>(this_element) );
484 log(
Error) <<
"Could not create out-of-band transport for port "<< port_name <<
" with transport id " << corba_policy.
transport <<endlog();
485 log(
Error) <<
"No such transport registered. Check your corba_policy.transport settings or add the transport for type "<< type_info->
getTypeName() <<endlog();
490 if ( strlen( corba_policy.
name_id.in()) == 0 )
491 corba_policy.
name_id = CORBA::string_dup( policy2.
name_id.c_str() );
495 start->connectTo( dynamic_cast<ChannelElementBase*>(this_element), policy2.
mandatory );
497 log(
Info) <<
"Sending data from port "<< policy2.
name_id <<
" to out-of-band protocol "<< corba_policy.
transport <<endlog();
499 log(
Error) <<
"The type transporter for type "<<type_info->
getTypeName()<<
" failed to create an out-of-band endpoint for port " << port_name<<endlog();
506 start->connectTo(buf, policy2.
mandatory);
507 buf->connectTo( dynamic_cast<ChannelElementBase*>(this_element) );
519 channel_list.push_back( ChannelList::value_type(this_element->_this(), start->getInputEndPoint()));
522 return this_element->_this();
526 CORBA::SystemException
531 Logger::In in(
"CDataFlowInterface_i::createSharedConnection");
554 if ( strlen( corba_policy.
name_id.in()) == 0 )
555 corba_policy.
name_id = CORBA::string_dup( shared_connection->getName().c_str() );
561 ::CORBA::Boolean CDataFlowInterface_i::createConnection(
562 const char* writer_port, CDataFlowInterface_ptr reader_interface,
564 CORBA::SystemException
568 Logger::In in(
"CDataFlowInterface_i::createConnection");
577 if (local_interface && policy.
transport == 0)
583 log(
Warning) <<
"CORBA: createConnection() target is not an input port" << endlog();
588 log(
Debug) <<
"CORBA: createConnection() is creating a LOCAL connection between " <<
589 writer_port <<
" and " << reader_port << endlog();
593 log(
Debug) <<
"CORBA: createConnection() is creating a REMOTE connection between " <<
594 writer_port <<
" and " << reader_port << endlog();
597 if (reader_interface->getPortType(reader_port) !=
corba::CInput) {
598 log(
Error) <<
"Could not create connection: " << reader_port <<
" is not an input port."<<endlog();
612 catch(CORBA::COMM_FAILURE&) {
throw; }
613 catch(CORBA::TRANSIENT&) {
throw; }
614 catch(...) {
throw; }
620 PortableServer::POA_ptr poa)
621 : transport(transport)
622 , mpoa(PortableServer::POA::_duplicate(poa))
629 return PortableServer::POA::_duplicate(
mpoa);
632 void CRemoteChannelElement_i::setRemoteSide(CRemoteChannelElement_ptr remote)
ACE_THROW_SPEC ((
633 CORBA::SystemException
636 this->
remote_side = RTT::corba::CRemoteChannelElement::_duplicate(remote);
static CDataFlowInterface_ptr getRemoteInterface(DataFlowInterface *dfi, PortableServer::POA_ptr poa)
Returns an object reference to a remote interface.
virtual const types::TypeInfo * getTypeInfo() const =0
Returns the types::TypeInfo object for the port's type.
The Interface of a TaskContext which exposes its data-flow ports.
virtual ~CRemoteChannelElement_i()
boost::intrusive_ptr< SharedConnectionBase > shared_ptr
base::ChannelElementBase::shared_ptr buildChannelOutput(base::InputPortInterface &port, ConnPolicy const &policy) const
const std::string & getTypeName() const
Return the type name which was first registered.
virtual CRemoteChannelElement_i * createChannelElement_i(DataFlowInterface *sender,::PortableServer::POA *poa, const ConnPolicy &policy) const =0
Builds a channel element for remote transport in both directions.
PortNames getPortNames() const
Get all port names of this interface.
Represents a Stream connection created by the ConnFactory.
Classes for typekits for describing and handling user data types.
Emitted when information is requested on a port that does not exist.
base::ChannelElementBase::shared_ptr buildDataStorage(ConnPolicy const &policy) const
Creates single data or buffered storage for this type.
void deregisterChannel(CChannelElement_ptr channel)
Deregisters the given channel from the channel list.
const std::string & getName() const
Get the name of this Port.
CRemoteChannelElement_i(corba::CorbaTypeTransporter const &transport, PortableServer::POA_ptr poa)
base::ChannelElementBase::shared_ptr buildChannelInput(base::OutputPortInterface &port, ConnPolicy const &policy) const
Base class for CORBA channel servers.
A connection policy object describes how a given connection should behave.
The base class of each OutputPort.
sequence< CPortDescription > CPortDescriptions
CBufferPolicy buffer_policy
RTT::ConnPolicy toRTT(RTT::corba::CConnPolicy const &corba_policy)
Converts a Corba CConnPolicy object to a RTT ConPolicy object.
static void clearServants()
Emitted during connections, when there is no CORBA transport defined for the data type of the given p...
virtual void disconnect()=0
Removes any connection that either go to or come from this port.
bool mandatory
Whether the connection described by this connection policy is mandatory, which means that write opera...
static void registerServant(CDataFlowInterface_ptr objref, CDataFlowInterface_i *servant)
virtual bool removeConnection(internal::ConnID *cid)
Removes a user created connection from this port.
base::PortInterface * getPort(const std::string &name) const
Get an added port.
DataFlowInterface * getDataFlowInterface() const
#define ACE_THROW_SPEC(x)
PortableServer::POA_ptr _default_POA()
const std::string & getDescription() const
Get the documentation of this port.
bool createConnection(InputPortInterface &sink)
Connects this write port to the given read port, using as policy the default policy of the sink port...
virtual base::ChannelElementBase::shared_ptr createStream(base::PortInterface *port, const ConnPolicy &policy, bool is_sender) const =0
Creates a streaming channel element for reading or writing over this transport.
Represents a connection to a remote CORBA port.
Classes which contain all implementation code for the RTT.
A class for representing a user type, and which can build instances of that type. ...
virtual bool addConnection(internal::ConnID *port_id, ChannelElementBase::shared_ptr channel_input, ConnPolicy const &policy)
Adds a new connection to this output port and initializes the connection if required by policy...
boost::intrusive_ptr< ChannelElementBase > shared_ptr
static void deregisterServant(DataFlowInterface *obj)
RTT::corba::CConnPolicy toCORBA(RTT::ConnPolicy const &policy)
Converts a RTT ConnPolicy object to a Corba CConPolicy object.
virtual ~CDataFlowInterface_i()
Base classes of RTT classes.
std::vector< std::string > PortNames
A sequence of names of ports.
TypeTransporter * getProtocol(int protocol_id) const
Returns this type's transport for a given protocol.
A simplistic id that is only same with its own clones (and clones of clones).
CORBA (OmniORB/TAO) code for network data transport.
Notify the Logger in which 'module' the message occured.
#define ORO_CORBA_PROTOCOL_ID
void setInterface(DataFlowInterface *iface)
Once a port is added to a DataFlowInterface, it gets a pointer to that interface. ...
CRemoteChannelElement_var remote_side
Emitted during connections, when it is not possible to build new connections with the given arguments...
static DataFlowInterface * getLocalInterface(CDataFlowInterface_ptr objref)
int buffer_policy
The policy on how buffer elements will be installed for this connection, which influences the behavio...
sequence< string > CPortNames
CDataFlowInterface_i(DataFlowInterface *interface, PortableServer::POA_ptr poa)
virtual bool createStream(ConnPolicy const &policy)=0
Creates a data stream from or to this port using connection-less transports.
#define CORBA_CHECK_THREAD()
In the data flow implementation, a channel is created by chaining ChannelElementBase objects...
virtual bool connected() const =0
Returns true if this port is connected.
The base class of every data flow port.
std::string name_id
The name of this connection.
virtual shared_ptr getOutputEndPoint()
Returns the last output channel element of this connection.
internal::SharedConnectionBase::shared_ptr buildSharedConnection(base::OutputPortInterface *output_port, base::InputPortInterface *input_port, ConnPolicy const &policy) const
static bool findSharedConnection(base::OutputPortInterface *output_port, base::InputPortInterface *input_port, ConnPolicy const &policy, SharedConnectionBase::shared_ptr &shared_connection)
Tries to find an existing or creates a new shared connection object for the given output port...
PortableServer::POA_ptr _default_POA()
Represents a remote data flow interface.
PortableServer::POA_var mpoa
Extends the TypeTransporter in order to allow the creation of channel elements or output halves for a...
MutexLock is a scope based Monitor, protecting critical sections with a Mutex object through locking ...
void setCDataFlowInterface(CDataFlowInterface_i *dataflow)