42 #include "../../DataFlowInterface.hpp" 47 #include "../../internal/ConnID.hpp" 48 #include "../../rtt-detail-fwd.hpp" 53 template<
typename BaseClass>
55 CDataFlowInterface_ptr dataflow,
56 std::string
const& name,
57 PortableServer::POA_ptr poa)
59 , type_info(type_info)
61 , mpoa(PortableServer::POA::_duplicate(poa)) { }
63 template<
typename BaseClass>
65 {
return CDataFlowInterface::_duplicate(
dataflow); }
66 template<
typename BaseClass>
68 template<
typename BaseClass>
70 template<
typename BaseClass>
73 return dataflow->isConnected(this->getName().c_str());
75 template<
typename BaseClass>
78 dataflow->disconnectPort(this->getName().c_str());
82 template<
typename BaseClass>
84 {
return PortableServer::POA::_duplicate(
mpoa); }
86 template<
typename BaseClass>
90 template<
typename BaseClass>
93 log(
Error) <<
"Can't create a data stream on a remote port !" <<endlog();
97 template<
typename BaseClass>
100 assert(
false &&
"Can/Should not add connection to remote port object !");
104 template<
typename BaseClass>
111 CDataFlowInterface_ptr
dataflow, std::string
const& reader_port,
112 PortableServer::POA_ptr poa)
120 {
throw std::runtime_error(
"InputPort::getDataSource() is not supported in CORBA port proxies"); }
129 Logger::In in(
"RemoteInputPort::buildRemoteChannelOutput");
133 CRemoteChannelElement_var remote;
137 CChannelElement_var ret =
dataflow->buildChannelOutput(
getName().c_str(), cpolicy);
138 if ( CORBA::is_nil(ret) ) {
141 remote = CRemoteChannelElement::_narrow( ret.in() );
145 catch(CORBA::Exception& e)
147 log(
Error) <<
"Caught CORBA exception while creating a remote channel output:" << endlog();
158 CRemoteChannelElement_var proxy = local->_this();
159 local->setRemoteSide(remote);
160 remote->setRemoteSide(proxy.in());
161 local->_remove_ref();
173 log(
Error) <<
"Could not create out-of-band transport for port "<< name <<
" with transport id " << policy.
transport <<endlog();
174 log(
Error) <<
"No such transport registered. Check your policy.transport settings or add the transport for type "<< type->
getTypeName() <<endlog();
179 ceb->connectTo( corba_ceb, policy.
mandatory );
181 log(
Info) <<
"Redirecting data for port "<<name <<
" to out-of-band protocol "<< policy.
transport << endlog();
183 log(
Error) <<
"The type transporter for type "<<type->
getTypeName()<<
" failed to create a dual channel for port " << name<<endlog();
191 buf->connectTo( corba_ceb, policy.
mandatory );
202 Logger::In in(
"RemoteInputPort::createConnection");
206 cpolicy.
name_id = CORBA::string_dup( shared_connection->getName().c_str() );
207 if (
dataflow->createSharedConnection( this->getName().c_str(), cpolicy ) ) {
213 catch(CORBA::Exception& e)
215 log(
Error) <<
"Caught CORBA exception while trying to add an input port to an existing connection:" << endlog();
220 log(
Error) <<
"Failed to connect remote InputPort '" <<
getName() <<
"' to shared connection '" << shared_connection->getName() <<
"', " 221 <<
"most likely because you tried to connect input ports in different processes." << endlog();
239 CDataFlowInterface_ptr
dataflow, std::string
const& reader_port,
240 PortableServer::POA_ptr poa)
248 {
throw std::runtime_error(
"OutputPort::keepLastWrittenValue() is not supported in CORBA port proxies"); }
262 Logger::In in(
"RemoteOutputPort::disconnect(PortInterface& port)");
263 log(
Error) <<
"Port: " << port->
getName() <<
" could not be disconnected from: " << this->
getName()
264 <<
" because it could not be casted to a RemoteInputPort type!" << nlog()
265 <<
"Only disconnect of two remote ports supported by corba layer, yet!" << endlog();
283 if (
dataflow->createConnection( this->getName().c_str(), cdfi.in() , sink.
getName().c_str(), cpolicy ) ) {
293 log(
Error)<<
"RemotePort connection is only possible if the local port '"<<sink.
getName()<<
"' is added to a DataFlowInterface. Use addPort for this."<<endlog();
297 if (
dataflow->createConnection( this->getName().c_str(), cdfi , sink.
getName().c_str(), cpolicy ) ) {
303 catch(CORBA::Exception& e)
305 log(
Error) <<
"Remote call to "<<
getName() <<
".createConnection() failed with a CORBA exception: aborting connection."<<endlog();
static CDataFlowInterface_ptr getRemoteInterface(DataFlowInterface *dfi, PortableServer::POA_ptr poa)
Returns an object reference to a remote interface.
bool createStream(const ConnPolicy &policy)
An interface to access the dataflow of a CControlTask object.
boost::intrusive_ptr< SharedConnectionBase > shared_ptr
virtual base::DataSourceBase::shared_ptr getDataSource() const
Returns a Data source that stores the last written value, or a null pointer if this port does not kee...
const std::string & getTypeName() const
Return the type name which was first registered.
The base class for all internal data representations.
CDataFlowInterface_var dataflow
types::TypeInfo const * type_info
base::PortInterface * antiClone() const
Create a local clone of this port with the same name.
int data_size
Suggest the payload size of the data sent over this channel.
base::ChannelElementBase::shared_ptr buildDataStorage(ConnPolicy const &policy) const
Creates single data or buffered storage for this type.
#define CORBA_EXCEPTION_INFO(x)
const std::string & getName() const
Get the name of this Port.
bool keepsLastWrittenValue() const
Returns true if this port records the last written value.
virtual bool addConnection(internal::ConnID *port_id, base::ChannelElementBase::shared_ptr channel_input, ConnPolicy const &policy)
Base class for CORBA channel servers.
A connection policy object describes how a given connection should behave.
The base class of each OutputPort.
types::TypeInfo const * getTypeInfo() const
DataFlowInterface * getInterface() const
Returns the DataFlowInterface this port belongs to or null if it was not added to such an interface...
virtual void disconnect()
void keepLastWrittenValue(bool new_flag)
Change the setting for keeping the last written value.
bool pull
If true, then the sink will have to pull data.
virtual void disconnect()=0
Removes any connection that either goes to or comes from this port.
bool mandatory
Whether the connection described by this connection policy is mandatory, which means that write opera...
Convenient short notation for every sub-namespace of RTT.
Contains the common CORBA management code for proxy port objects representing ports available through...
PortableServer::POA_ptr _default_POA()
bool createConnection(base::InputPortInterface &sink, ConnPolicy const &policy)
Connects this write port to the given read port, using the given connection policy.
base::ChannelElementBase * getEndpoint() const
virtual base::ChannelElementBase::shared_ptr createStream(base::PortInterface *port, const ConnPolicy &policy, bool is_sender) const =0
Creates a streaming channel element for reading or writing over this transport.
Represents a connection to a remote CORBA port.
A class for representing a user type, and which can build instances of that type. ...
internal::ConnID * getPortID() const
PortableServer::POA_var mpoa
RemoteOutputPort(types::TypeInfo const *type_info, CDataFlowInterface_ptr dataflow, std::string const &name, PortableServer::POA_ptr poa)
CDataFlowInterface_ptr getDataFlowInterface() const
boost::intrusive_ptr< ChannelElementBase > shared_ptr
RTT::corba::CConnPolicy toCORBA(RTT::ConnPolicy const &policy)
Converts an RTT ConnPolicy object to a CORBA CConnPolicy object.
TypeTransporter * getProtocol(int protocol_id) const
Returns this type's transport for a given protocol.
int serverProtocol() const
Notify the Logger in which 'module' the message occurred.
#define ORO_CORBA_PROTOCOL_ID
base::OutputPortInterface * outputPort(std::string const &name) const
Returns a new OutputPort<T> object where T is the type represented by this TypeInfo object...
int transport
The preferred transport used.
base::PortInterface * clone() const
Create a local clone of this port with the same name.
This class is used in places where a permanent representation of a reference to a connection is neede...
boost::intrusive_ptr< DataSourceBase > shared_ptr
Use this type to store a pointer to a DataSourceBase.
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute.
In the data flow implementation, a channel is created by chaining ChannelElementBase objects...
The base class of every data flow port.
std::string name_id
The name of this connection.
base::InputPortInterface * inputPort(std::string const &name) const
Returns a new InputPort<T> object where T is the type represented by this TypeInfo object...
Extends the TypeTransporter in order to allow the creation of channel elements or output halves for a...