42 #include "../../DataFlowInterface.hpp" 47 #include "../../internal/ConnID.hpp" 48 #include "../../rtt-detail-fwd.hpp" 53 template<
typename BaseClass>
55 CDataFlowInterface_ptr dataflow,
56 std::string
const& name,
57 PortableServer::POA_ptr poa)
59 , type_info(type_info)
61 , mpoa(PortableServer::POA::_duplicate(poa)) { }
63 template<
typename BaseClass>
65 {
return CDataFlowInterface::_duplicate(
dataflow); }
66 template<
typename BaseClass>
68 template<
typename BaseClass>
70 template<
typename BaseClass>
73 return dataflow->isConnected(this->getName().c_str());
75 template<
typename BaseClass>
78 dataflow->disconnectPort(this->getName().c_str());
82 template<
typename BaseClass>
84 {
return PortableServer::POA::_duplicate(
mpoa); }
86 template<
typename BaseClass>
90 template<
typename BaseClass>
93 log(
Error) <<
"Can't create a data stream on a remote port !" <<endlog();
97 template<
typename BaseClass>
100 assert(
false &&
"Can/Should not add connection to remote port object !");
106 CDataFlowInterface_ptr
dataflow, std::string
const& reader_port,
107 PortableServer::POA_ptr poa)
112 {
throw std::runtime_error(
"InputPort::getDataSource() is not supported in CORBA port proxies"); }
121 Logger::In in(
"RemoteInputPort::buildRemoteChannelOutput");
125 CRemoteChannelElement_var remote;
129 CChannelElement_var ret =
dataflow->buildChannelOutput(
getName().c_str(), cpolicy);
130 if ( CORBA::is_nil(ret) ) {
133 remote = CRemoteChannelElement::_narrow( ret.in() );
136 catch(CORBA::Exception& e)
138 log(
Error) <<
"Caught CORBA exception while creating a remote channel output:" << endlog();
149 CRemoteChannelElement_var proxy = local->_this();
150 local->setRemoteSide(remote);
151 remote->setRemoteSide(proxy.in());
152 local->_remove_ref();
164 log(
Error) <<
"Could not create out-of-band transport for port "<< name <<
" with transport id " << policy.
transport <<endlog();
165 log(
Error) <<
"No such transport registered. Check your policy.transport settings or add the transport for type "<< type->
getTypeName() <<endlog();
170 ceb->setOutput( corba_ceb );
172 log(
Info) <<
"Redirecting data for port "<<name <<
" to out-of-band protocol "<< policy.
transport << endlog();
174 log(
Error) <<
"The type transporter for type "<<type->
getTypeName()<<
" failed to create a dual channel for port " << name<<endlog();
180 buf->setOutput( corba_ceb );
186 channel_map[ corba_ceb->getOutputEndPoint().get() ] = CChannelElement::_duplicate( remote );
205 if (! channel_map.count( channel.get() ) ) {
206 log(
Error) <<
"No such channel found in "<<
getName() <<
".channelReady( channel ): aborting connection."<<endlog();
210 CChannelElement_ptr cce = channel_map[ channel.get() ];
213 return dataflow->channelReady( this->
getName().c_str(), cce, cpolicy );
215 catch(CORBA::Exception& e)
217 log(
Error) <<
"Remote call to "<<
getName() <<
".channelReady( channel ) failed with a CORBA exception: aborting connection."<<endlog();
224 CDataFlowInterface_ptr
dataflow, std::string
const& reader_port,
225 PortableServer::POA_ptr poa)
233 {
throw std::runtime_error(
"OutputPort::keepLastWrittenValue() is not supported in CORBA port proxies"); }
247 Logger::In in(
"RemoteOutputPort::disconnect(PortInterface& port)");
248 log(
Error) <<
"Port: " << port->
getName() <<
" could not be disconnected from: " << this->
getName()
249 <<
" because it could not be casted to a RemoteInputPort type!" << nlog()
250 <<
"Only disconnect of two remote ports supported by corba layer, yet!" << endlog();
268 if (
dataflow->createConnection( this->getName().c_str(), cdfi.in() , sink.
getName().c_str(), cpolicy ) ) {
277 log(
Error)<<
"RemotePort connection is only possible if the local port '"<<sink.
getName()<<
"' is added to a DataFlowInterface. Use addPort for this."<<endlog();
281 if (
dataflow->createConnection( this->getName().c_str(), cdfi , sink.
getName().c_str(), cpolicy ) ) {
286 catch(CORBA::Exception& e)
288 log(
Error) <<
"Remote call to "<<
getName() <<
".createConnection() failed with a CORBA exception: aborting connection."<<endlog();
static CDataFlowInterface_ptr getRemoteInterface(DataFlowInterface *dfi, PortableServer::POA_ptr poa)
Returns an object reference to a remote interface.
bool createStream(const ConnPolicy &policy)
An interface to access the dataflow of a CControlTask object.
virtual base::DataSourceBase::shared_ptr getDataSource() const
Returns a Data source that stores the last written value, or a null pointer if this port does not kee...
const std::string & getTypeName() const
Return the type name which was first registered.
The base class for all internal data representations.
CDataFlowInterface_var dataflow
types::TypeInfo const * type_info
base::PortInterface * antiClone() const
Create a local clone of this port with the same name.
base::ChannelElementBase::shared_ptr buildDataStorage(ConnPolicy const &policy) const
Creates single data or buffered storage for this type.
#define CORBA_EXCEPTION_INFO(x)
const std::string & getName() const
Get the name of this Port.
bool keepsLastWrittenValue() const
Returns true if this port records the last written value.
virtual bool addConnection(internal::ConnID *port_id, base::ChannelElementBase::shared_ptr channel_input, ConnPolicy const &policy)
Base class for CORBA channel servers.
A connection policy object describes how a given connection should behave.
The base class of each OutputPort.
RTT::ConnPolicy toRTT(RTT::corba::CConnPolicy const &corba_policy)
Converts a Corba CConnPolicy object to a RTT ConPolicy object.
types::TypeInfo const * getTypeInfo() const
DataFlowInterface * getInterface() const
Returns the DataFlowInterface this port belongs to or null if it was not added to such an interface...
virtual void disconnect()
void keepLastWrittenValue(bool new_flag)
Change the setting for keeping the last written value.
bool pull
If true, then the sink will have to pull data.
virtual void disconnect()=0
Removes any connection that either go to or come from this port.
Convenient short notation for every sub-namespace of RTT.
Contains the common CORBA management code for proxy port objects representing ports available through...
PortableServer::POA_ptr _default_POA()
bool createConnection(base::InputPortInterface &sink, ConnPolicy const &policy)
Connects this write port to the given read port, using the given connection policy.
virtual base::ChannelElementBase::shared_ptr createStream(base::PortInterface *port, const ConnPolicy &policy, bool is_sender) const =0
Creates a streaming channel element for reading or writing over this transport.
Represents a connection to a remote CORBA port.
A class for representing a user type, and which can build instances of that type. ...
internal::ConnID * getPortID() const
PortableServer::POA_var mpoa
RemoteOutputPort(types::TypeInfo const *type_info, CDataFlowInterface_ptr dataflow, std::string const &name, PortableServer::POA_ptr poa)
CDataFlowInterface_ptr getDataFlowInterface() const
boost::intrusive_ptr< ChannelElementBase > shared_ptr
RTT::corba::CConnPolicy toCORBA(RTT::ConnPolicy const &policy)
Converts a RTT ConnPolicy object to a Corba CConPolicy object.
TypeTransporter * getProtocol(int protocol_id) const
Returns this type's transport for a given protocol.
int serverProtocol() const
Notify the Logger in which 'module' the message occured.
#define ORO_CORBA_PROTOCOL_ID
base::OutputPortInterface * outputPort(std::string const &name) const
Returns a new OutputPort<T> object where T is the type represented by this TypeInfo object...
int transport
The prefered transport used.
base::PortInterface * clone() const
Create a local clone of this port with the same name.
This class is used in places where a permanent representation of a reference to a connection is neede...
boost::intrusive_ptr< DataSourceBase > shared_ptr
Use this type to store a pointer to a DataSourceBase.
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute.
In the data flow implementation, a channel is created by chaining ChannelElementBase objects...
The base class of every data flow port.
std::string name_id
The name of this connection.
base::InputPortInterface * inputPort(std::string const &name) const
Returns a new InputPort<T> object where T is the type represented by this TypeInfo object...
Extends the TypeTransporter in order to allow the creation of channel elements or output halves for a...