41 #include "DataFlowS.h" 43 #include "DataFlowC.h" 45 #include "../../base/PortInterface.hpp" 46 #include "../../Logger.hpp" 49 #include "../../InputPort.hpp" 50 #include "../../OutputPort.hpp" 66 CDataFlowInterface_i::ServantMap CDataFlowInterface_i::s_servant_map;
69 : mdf(interface), mpoa(PortableServer::POA::_duplicate(poa))
85 s_servant_map.push_back(ServantInfo(objref, servant));
89 for (ServantMap::iterator it = s_servant_map.begin();
90 it != s_servant_map.end(); ++it)
92 if (it->getDataFlowInterface() == obj)
94 log(
Debug) <<
"deregistered servant for data flow interface" << endlog();
96 PortableServer::ObjectId_var oid = servant->mpoa->servant_to_id(it->servant);
97 servant->mpoa->deactivate_object(oid);
98 s_servant_map.erase(it);
106 while (!s_servant_map.empty())
108 ServantMap::iterator it = s_servant_map.begin();
115 for (ServantMap::const_iterator it = s_servant_map.begin();
116 it != s_servant_map.end(); ++it)
118 if (it->objref->_is_equivalent(objref))
119 return it->getDataFlowInterface();
126 for (ServantMap::const_iterator it = s_servant_map.begin();
127 it != s_servant_map.end(); ++it)
129 if (it->getDataFlowInterface() == dfi)
133 CDataFlowInterface_ptr server = servant->_this();
134 servant->_remove_ref();
141 return PortableServer::POA::_duplicate(mpoa);
145 CORBA::SystemException
151 pn->length( ports.size() );
153 for (
unsigned int i=0; i != ports.size(); ++i )
154 pn[i] = CORBA::string_dup( ports[i].c_str() );
160 CORBA::SystemException
165 result->length( ports.size() );
168 for (
unsigned int i = 0; i < ports.size(); ++i)
173 port_desc.
name = CORBA::string_dup(ports[i].c_str());
178 log(
Warning) <<
"the type of port " << ports[i] <<
" is not registered into the Orocos type system. It is ignored by the CORBA layer." << endlog();
183 if (dynamic_cast<InputPortInterface*>(port))
188 result[j++] = port_desc;
191 return result._retn();
195 CORBA::SystemException
203 if (dynamic_cast<InputPortInterface*>(p))
208 char* CDataFlowInterface_i::getDataType(
const char * port_name)
ACE_THROW_SPEC ((
209 CORBA::SystemException
219 CORBA::Boolean CDataFlowInterface_i::isConnected(
const char * port_name)
ACE_THROW_SPEC ((
220 CORBA::SystemException
233 ChannelList::iterator it=channel_list.begin();
234 for (; it != channel_list.end(); ++it) {
235 if (it->first->_is_equivalent (channel) ) {
236 channel_list.erase(it);
242 CORBA::Boolean CDataFlowInterface_i::channelReady(
const char * reader_port_name, CChannelElement_ptr channel,
CConnPolicy const& policy )
ACE_THROW_SPEC ((
243 CORBA::SystemException
259 ChannelList::iterator it=channel_list.begin();
260 for (; it != channel_list.end(); ++it) {
261 if (it->first->_is_equivalent (channel) ) {
267 catch(std::exception
const& e)
269 log(
Error) <<
"call to channelReady threw " << e.what() << endlog();
275 log(
Error) <<
"Invalid CORBA channel given for port " << reader_port_name <<
": could not match it to a local C++ channel." <<endlog();
279 void CDataFlowInterface_i::disconnectPort(
const char * port_name)
ACE_THROW_SPEC ((
280 CORBA::SystemException
286 log(
Error) <<
"disconnectPort: No such port: "<< port_name <<endlog();
293 bool CDataFlowInterface_i::removeConnection(
294 const char* local_port,
295 CDataFlowInterface_ptr remote_interface,
const char* remote_port)
ACE_THROW_SPEC ((
296 CORBA::SystemException
303 log(
Error) <<
"disconnectPort: No such port: "<< local_port <<endlog();
306 if (dynamic_cast<OutputPortInterface*>(port) == 0) {
307 log(
Error) <<
"disconnectPort: "<< local_port <<
" is an input port" << endlog();
331 ::CORBA::Boolean CDataFlowInterface_i::createStream(
const char* port,
333 CORBA::SystemException
339 log(
Error) <<
"createStream: No such port: "<< p->
getName() <<endlog();
352 void CDataFlowInterface_i::removeStream(
const char* port,
const char* stream_name)
ACE_THROW_SPEC ((
353 CORBA::SystemException
359 log(
Error) <<
"createStream: No such port: "<< p->
getName() <<endlog();
368 CChannelElement_ptr CDataFlowInterface_i::buildChannelOutput(
370 CORBA::SystemException
375 Logger::In in(
"CDataFlowInterface_i::buildChannelOutput");
405 log(
Error) <<
"Could not create out-of-band transport for port "<< port_name <<
" with transport id " << corba_policy.
transport <<endlog();
406 log(
Error) <<
"No such transport registered. Check your corba_policy.transport settings or add the transport for type "<< type_info->
getTypeName() <<endlog();
407 return RTT::corba::CChannelElement::_nil();
411 if ( strlen( corba_policy.
name_id.in()) == 0 )
412 corba_policy.
name_id = CORBA::string_dup( policy2.
name_id.c_str() );
418 ceb->setOutput( buf );
420 log(
Info) <<
"Receiving data for port "<< policy2.
name_id <<
" from out-of-band protocol "<< corba_policy.
transport <<endlog();
422 log(
Error) <<
"The type transporter for type "<<type_info->
getTypeName()<<
" failed to create an out-of-band endpoint for port " << port_name<<endlog();
423 return RTT::corba::CChannelElement::_nil();
428 if ( !corba_policy.
pull ) {
437 this_element->_remove_ref();
441 channel_list.push_back( ChannelList::value_type(this_element->_this(), end->getOutputEndPoint()));
444 CRemoteChannelElement_var proxy = this_element->_this();
445 return proxy._retn();
451 CChannelElement_ptr CDataFlowInterface_i::buildChannelInput(
453 CORBA::SystemException
458 Logger::In in(
"CDataFlowInterface_i::buildChannelInput");
486 assert( dynamic_cast<ChannelElementBase*>(this_element) );
487 start->getOutputEndPoint()->setOutput( dynamic_cast<ChannelElementBase*>(this_element));
496 log(
Error) <<
"Could not create out-of-band transport for port "<< port_name <<
" with transport id " << corba_policy.
transport <<endlog();
497 log(
Error) <<
"No such transport registered. Check your corba_policy.transport settings or add the transport for type "<< type_info->
getTypeName() <<endlog();
502 if ( strlen( corba_policy.
name_id.in()) == 0 )
503 corba_policy.
name_id = CORBA::string_dup( policy2.
name_id.c_str() );
507 start->getOutputEndPoint()->setOutput( ceb );
508 log(
Info) <<
"Sending data from port "<< policy2.
name_id <<
" to out-of-band protocol "<< corba_policy.
transport <<endlog();
510 log(
Error) <<
"The type transporter for type "<<type_info->
getTypeName()<<
" failed to create an out-of-band endpoint for port " << port_name<<endlog();
516 start->setOutput(buf);
517 buf->setOutput( dynamic_cast<ChannelElementBase*>(this_element) );
529 channel_list.push_back( ChannelList::value_type(this_element->_this(), start->getInputEndPoint()));
532 return this_element->_this();
536 ::CORBA::Boolean CDataFlowInterface_i::createConnection(
537 const char* writer_port, CDataFlowInterface_ptr reader_interface,
539 CORBA::SystemException
543 Logger::In in(
"CDataFlowInterface_i::createConnection");
552 if (local_interface && policy.
transport == 0)
558 log(
Warning) <<
"CORBA: createConnection() target is not an input port" << endlog();
563 log(
Debug) <<
"CORBA: createConnection() is creating a LOCAL connection between " <<
564 writer_port <<
" and " << reader_port << endlog();
568 log(
Debug) <<
"CORBA: createConnection() is creating a REMOTE connection between " <<
569 writer_port <<
" and " << reader_port << endlog();
572 if (reader_interface->getPortType(reader_port) !=
corba::CInput) {
573 log(
Error) <<
"Could not create connection: " << reader_port <<
" is not an input port."<<endlog();
584 catch(CORBA::COMM_FAILURE&) {
throw; }
585 catch(CORBA::TRANSIENT&) {
throw; }
586 catch(...) {
throw; }
592 PortableServer::POA_ptr poa)
593 : transport(transport)
594 , mpoa(PortableServer::POA::_duplicate(poa))
599 {
return PortableServer::POA::_duplicate(
mpoa); }
600 void CRemoteChannelElement_i::setRemoteSide(CRemoteChannelElement_ptr remote)
ACE_THROW_SPEC ((
601 CORBA::SystemException
603 { this->
remote_side = RTT::corba::CRemoteChannelElement::_duplicate(remote); }
static CDataFlowInterface_ptr getRemoteInterface(DataFlowInterface *dfi, PortableServer::POA_ptr poa)
Returns an object reference to a remote interface.
virtual const types::TypeInfo * getTypeInfo() const =0
Returns the types::TypeInfo object for the port's type.
The Interface of a TaskContext which exposes its data-flow ports.
virtual ~CRemoteChannelElement_i()
base::ChannelElementBase::shared_ptr buildChannelInput(base::OutputPortInterface &port) const
const std::string & getTypeName() const
Return the type name which was first registered.
PortNames getPortNames() const
Get all port names of this interface.
Represents a Stream connection created by the ConnFactory.
Classes for typekits for describing and handling user data types.
Emitted when information is requested on a port that does not exist.
base::ChannelElementBase::shared_ptr buildDataStorage(ConnPolicy const &policy) const
Creates single data or buffered storage for this type.
void deregisterChannel(CChannelElement_ptr channel)
Deregisters the given channel from the channel list.
const std::string & getName() const
Get the name of this Port.
CRemoteChannelElement_i(corba::CorbaTypeTransporter const &transport, PortableServer::POA_ptr poa)
Base class for CORBA channel servers.
A connection policy object describes how a given connection should behave.
The base class of each OutputPort.
sequence< CPortDescription > CPortDescriptions
RTT::ConnPolicy toRTT(RTT::corba::CConnPolicy const &corba_policy)
Converts a Corba CConnPolicy object to a RTT ConPolicy object.
static void clearServants()
Emitted during connections, when there is no CORBA transport defined for the data type of the given p...
virtual void disconnect()=0
Removes any connection that either go to or come from this port.
static void registerServant(CDataFlowInterface_ptr objref, CDataFlowInterface_i *servant)
base::PortInterface * getPort(const std::string &name) const
Get an added port.
DataFlowInterface * getDataFlowInterface() const
#define ACE_THROW_SPEC(x)
PortableServer::POA_ptr _default_POA()
bool createConnection(InputPortInterface &sink)
Connects this write port to the given read port, using as policy the default policy of the sink port...
virtual base::ChannelElementBase::shared_ptr createStream(base::PortInterface *port, const ConnPolicy &policy, bool is_sender) const =0
Creates a streaming channel element for reading or writing over this transport.
Represents a connection to a remote CORBA port.
Classes which contain all implementation code for the RTT.
A class for representing a user type, and which can build instances of that type. ...
virtual bool addConnection(internal::ConnID *port_id, ChannelElementBase::shared_ptr channel_input, ConnPolicy const &policy)
Adds a new connection to this output port and initializes the connection if required by policy...
void setOutput(shared_ptr output)
Sets the output of this channel element to output and sets the input of output to this...
virtual CRemoteChannelElement_i * createChannelElement_i(DataFlowInterface *sender,::PortableServer::POA *poa, bool is_pull) const =0
Builds a channel element for remote transport in both directions.
boost::intrusive_ptr< ChannelElementBase > shared_ptr
virtual bool removeConnection(internal::ConnID *cid)=0
Removes a user created connection from this port.
static void deregisterServant(DataFlowInterface *obj)
RTT::corba::CConnPolicy toCORBA(RTT::ConnPolicy const &policy)
Converts a RTT ConnPolicy object to a Corba CConPolicy object.
virtual ~CDataFlowInterface_i()
Base classes of RTT classes.
std::vector< std::string > PortNames
A sequence of names of ports.
TypeTransporter * getProtocol(int protocol_id) const
Returns this type's transport for a given protocol.
A simplistic id that is only same with its own clones (and clones of clones).
CORBA (OmniORB/TAO) code for network data transport.
Notify the Logger in which 'module' the message occured.
#define ORO_CORBA_PROTOCOL_ID
void setInterface(DataFlowInterface *iface)
Once a port is added to a DataFlowInterface, it gets a pointer to that interface. ...
CRemoteChannelElement_var remote_side
static DataFlowInterface * getLocalInterface(CDataFlowInterface_ptr objref)
sequence< string > CPortNames
CDataFlowInterface_i(DataFlowInterface *interface, PortableServer::POA_ptr poa)
virtual bool createStream(ConnPolicy const &policy)=0
Creates a data stream from or to this port using connection-less transports.
#define CORBA_CHECK_THREAD()
In the data flow implementation, a channel is created by chaining ChannelElementBase objects...
virtual bool connected() const =0
Returns true if this port is connected.
The base class of every data flow port.
std::string name_id
The name of this connection.
PortableServer::POA_ptr _default_POA()
Represents a remote data flow interface.
PortableServer::POA_var mpoa
Extends the TypeTransporter in order to allow the creation of channel elements or output halves for a...
base::ChannelElementBase::shared_ptr buildChannelOutput(base::InputPortInterface &port) const
MutexLock is a scope based Monitor, protecting critical sections with a Mutex object through locking ...
void setCDataFlowInterface(CDataFlowInterface_i *dataflow)