28 #include "ReportingComponent.hpp" 29 #include <rtt/Logger.hpp> 32 #include "EmptyMarshaller.hpp" 33 #include <rtt/marsh/PropertyDemarshaller.hpp> 34 #include <rtt/marsh/PropertyMarshaller.hpp> 38 #include <boost/algorithm/string.hpp> 41 #include <rtt/types/PropertyDecomposition.hpp> 42 #include <boost/lexical_cast.hpp> 44 ORO_CREATE_COMPONENT_TYPE()
58 DataSource<int>::shared_ptr mds;
59 DataSource<bool>::shared_ptr mupstream;
61 CheckSizeDataSource(
int size, DataSource<int>::shared_ptr ds, DataSource<bool>::shared_ptr upstream)
62 : msize(size), mds(ds), mupstream(upstream)
73 result = (mupstream->get() && msize == mds->get());
75 result = (msize == mds->get());
87 bool memberDecomposition( base::DataSourceBase::shared_ptr dsb, PropertyBag& targetbag, DataSource<bool>::shared_ptr& resized)
91 vector<string> parts = dsb->getMemberNames();
92 if ( parts.empty() ) {
96 targetbag.setType( dsb->getTypeName() );
99 auto_ptr< Property<PropertyBag> > recurse_bag(
new Property<PropertyBag>(
"recurse_bag",
"") );
101 for(vector<string>::iterator it = parts.begin(); it != parts.end(); ++it ) {
103 DataSourceBase::shared_ptr part = dsb->getMember( *it );
105 log(Error) <<
"memberDecomposition: Inconsistent type info for "<< dsb->getTypeName() <<
": reported to have part '"<<*it<<
"' but failed to return it."<<endlog();
108 if ( !part->isAssignable() ) {
110 log(Debug)<<
"memberDecomposition: Part "<< *it <<
":"<< part->getTypeName() <<
" is not changeable."<<endlog();
114 DataSourceBase::shared_ptr ref = part->getTypeInfo()->buildReference( 0 );
115 dsb->getTypeInfo()->getMember( dynamic_cast<Reference*>(ref.get() ), dsb, *it);
117 PropertyBase* newpb = part->getTypeInfo()->buildProperty(*it,
"Part", ref);
119 log(Error)<<
"Decomposition failed because Part '"<<*it<<
"' is not known to type system."<<endlog();
124 assert( recurse_bag->value().empty() );
126 base::DataSourceBase::shared_ptr converted = newpb->getTypeInfo()->convertType( dsb );
127 if ( converted && converted != dsb ) {
129 targetbag.add( converted->getTypeInfo()->buildProperty(*it,
"", converted) );
132 targetbag.ownProperty( newpb );
134 recurse_bag->setName(*it);
136 targetbag.ownProperty( recurse_bag.release() );
137 recurse_bag.reset(
new Property<PropertyBag>(
"recurse_bag",
"") );
145 DataSource<int>::shared_ptr size = DataSource<int>::narrow( dsb->getMember(
"size").get() );
147 int msize = size->get();
148 for (
int i=0; i < msize; ++i) {
149 string indx = boost::lexical_cast<
string>( i );
150 DataSourceBase::shared_ptr item = dsb->getMember(indx);
153 if ( !item->isAssignable() ) {
155 log(Warning)<<
"memberDecomposition: Item '"<< indx <<
"' of type "<< dsb->getTypeName() <<
" is not changeable."<<endlog();
159 PropertyBase* newpb = item->getTypeInfo()->buildProperty( indx,
"",item);
161 targetbag.ownProperty( newpb );
164 recurse_bag->setName( indx );
166 targetbag.ownProperty( recurse_bag.release() );
167 recurse_bag.reset(
new Property<PropertyBag>(
"recurse_bag",
"") );
172 if (targetbag.empty() )
173 log(Debug) <<
"memberDecomposition: "<< dsb->getTypeName() <<
" returns an empty property bag." << endlog();
177 ReportingComponent::ReportingComponent( std::string name )
178 : TaskContext( name ),
179 report(
"Report"), snapshotted(false),
180 writeHeader(
"WriteHeader",
"Set to true to start each report with a header.", true),
181 decompose(
"Decompose",
"Set to false in order to not decompose the port data. The marshaller must be able to handle this itself for this to work.", true),
182 insnapshot(
"Snapshot",
"Set to true to enable snapshot mode. This will cause a non-periodic reporter to only report data upon the snapshot() operation.",false),
183 synchronize_with_logging(
"Synchronize",
"Set to true if the timestamp should be synchronized with the logging",false),
184 report_data(
"ReportData",
"A PropertyBag which defines which ports or components to report."),
185 report_policy( ConnPolicy::data(ConnPolicy::LOCK_FREE,true,false) ),
188 timestamp(
"TimeStamp",
"The time at which the data was read.",0.0)
190 this->provides()->doc(
"Captures data on data ports. A periodic reporter will sample each added port according to its period, a non-periodic reporter will write out data as it comes in, or only during a snapshot() if the Snapshot property is true.");
192 this->properties()->addProperty( writeHeader );
193 this->properties()->addProperty( decompose );
194 this->properties()->addProperty( insnapshot );
195 this->properties()->addProperty( synchronize_with_logging);
196 this->properties()->addProperty( report_data);
197 this->properties()->addProperty(
"ReportPolicy", report_policy).doc(
"The ConnPolicy for the reporter's port connections.");
198 this->properties()->addProperty(
"ReportOnlyNewData", onlyNewData).doc(
"Turn on in order to only write out NewData on ports and omit unchanged ports. Turn off in order to sample and write out all ports (even old data).");
202 this->addOperation(
"snapshot", &
ReportingComponent::snapshot ,
this, RTT::OwnThread).doc(
"Take a new shapshot of all data and cause them to be written out.");
203 this->addOperation(
"screenComponent", &
ReportingComponent::screenComponent ,
this, RTT::ClientThread).doc(
"Display the variables and ports of a Component.").arg(
"Component",
"Name of the Component");
204 this->addOperation(
"reportComponent", &
ReportingComponent::reportComponent ,
this, RTT::ClientThread).doc(
"Add a peer Component and report all its data ports").arg(
"Component",
"Name of the Component");
205 this->addOperation(
"unreportComponent", &
ReportingComponent::unreportComponent ,
this, RTT::ClientThread).doc(
"Remove all Component's data ports from reporting.").arg(
"Component",
"Name of the Component");
206 this->addOperation(
"reportData", &
ReportingComponent::reportData ,
this, RTT::ClientThread).doc(
"Add a Component's Property or attribute for reporting.").arg(
"Component",
"Name of the Component").arg(
"Data",
"Name of the Data to report. A property's or attribute's name.");
207 this->addOperation(
"unreportData", &
ReportingComponent::unreportData ,
this, RTT::ClientThread).doc(
"Remove a Data object from reporting.").arg(
"Component",
"Name of the Component").arg(
"Data",
"Name of the property or attribute.");
208 this->addOperation(
"reportPort", &
ReportingComponent::reportPort ,
this, RTT::ClientThread).doc(
"Add a Component's OutputPort for reporting.").arg(
"Component",
"Name of the Component").arg(
"Port",
"Name of the Port.");
209 this->addOperation(
"unreportPort", &
ReportingComponent::unreportPort ,
this, RTT::ClientThread).doc(
"Remove a Port from reporting.").arg(
"Component",
"Name of the Component").arg(
"Port",
"Name of the Port.");
213 ReportingComponent::~ReportingComponent() {}
218 boost::shared_ptr<marsh::MarshallInterface> header(headerM);
219 boost::shared_ptr<marsh::MarshallInterface> body(bodyM);
220 if ( !header && !body)
227 marshallers.push_back( std::make_pair( header, body ) );
240 deletePropertyBag( report );
245 Logger::In in(
"ReportingComponent");
248 PropertyBag bag = report_data.value();
251 log(Error) <<
"No port or component configuration loaded."<<endlog();
252 log(Error) <<
"Please use marshalling.loadProperties(), reportComponent() (scripting) or LoadProperties (XML) in order to fill in ReportData." <<endlog();
257 PropertyBag::const_iterator it = bag.getProperties().begin();
258 while ( it != bag.getProperties().end() )
260 Property<std::string>* compName =
dynamic_cast<Property<std::string>*
>( *it );
262 log(Error) <<
"Expected Property \"" 263 << (*it)->getName() <<
"\" to be of type string."<< endlog();
264 else if ( compName->getName() ==
"Component" ) {
265 std::string name = compName->value();
269 else if ( compName->getName() ==
"Port" ) {
270 string cname = compName->value().substr(0, compName->value().find(
"."));
271 string pname = compName->value().substr( compName->value().find(
".")+1, string::npos);
272 if (cname.empty() || pname.empty() ) {
273 log(Error) <<
"The Port value '"<<compName->getName()<<
"' must at least consist of a component name followed by a dot and the port name." <<endlog();
280 else if ( compName->getName() ==
"Data" ) {
281 string cname = compName->value().substr(0, compName->value().find(
"."));
282 string pname = compName->value().substr( compName->value().find(
".")+1, string::npos);
283 if (cname.empty() || pname.empty() ) {
284 log(Error) <<
"The Data value '"<<compName->getName()<<
"' must at least consist of a component name followed by a dot and the property/attribute name." <<endlog();
292 log(Error) <<
"Expected \"Component\", \"Port\" or \"Data\", got " 293 << compName->getName() << endlog();
303 Logger::In in(
"ReportingComponent::screenComponent");
304 log(Error) <<
"not implemented." <<comp<<endlog();
310 Logger::In in(
"ReportingComponent");
311 TaskContext* c = this->getPeer(comp);
313 log(Error) <<
"Unknown Component: " <<comp<<endlog();
316 output <<
"Screening Component '"<< comp <<
"' : "<< endl << endl;
317 PropertyBag* bag = c->properties();
319 output <<
"Properties :" << endl;
320 for (PropertyBag::iterator it= bag->begin(); it != bag->end(); ++it)
321 output <<
" " << (*it)->getName() <<
" : " << (*it)->getDataSource() << endl;
323 ConfigurationInterface::AttributeNames atts = c->provides()->getAttributeNames();
324 if ( !atts.empty() ) {
325 output <<
"Attributes :" << endl;
326 for (ConfigurationInterface::AttributeNames::iterator it= atts.begin(); it != atts.end(); ++it)
327 output <<
" " << *it <<
" : " << c->provides()->getValue(*it)->getDataSource() << endl;
330 vector<string> ports = c->ports()->getPortNames();
331 if ( !ports.empty() ) {
332 output <<
"Ports :" << endl;
333 for (vector<string>::iterator it= ports.begin(); it != ports.end(); ++it) {
334 output <<
" " << *it <<
" : ";
335 if (c->ports()->getPort(*it)->connected() )
336 output <<
"(connected)" << endl;
338 output <<
"(not connected)" << endl;
345 Logger::In in(
"ReportingComponent");
348 TaskContext* comp = this->getPeer(component);
350 log(Error) <<
"Could not report Component " << component <<
" : no such peer."<<endlog();
353 if ( !report_data.value().findValue<
string>(component) )
354 report_data.value().ownProperty(
new Property<string>(
"Component",
"",component) );
355 Ports ports = comp->ports()->getPorts();
356 for (Ports::iterator it = ports.begin(); it != ports.end() ; ++it) {
357 log(Debug) <<
"Checking port " << (*it)->getName()<<
"."<<endlog();
358 this->
reportPort( component, (*it)->getName() );
365 TaskContext* comp = this->getPeer(component);
367 log(Error) <<
"Could not unreport Component " << component <<
" : no such peer."<<endlog();
370 Ports ports = comp->ports()->getPorts();
371 for (Ports::iterator it = ports.begin(); it != ports.end() ; ++it) {
372 this->unreportDataSource( component +
"." + (*it)->getName() );
375 base::PropertyBase* pb = report_data.value().findValue<
string>(component);
377 report_data.value().removeProperty( pb );
383 Logger::In in(
"ReportingComponent");
384 TaskContext* comp = this->getPeer(component);
385 if ( this->ports()->getPort(component +
"_"+port) ) {
386 log(Warning) <<
"Already reporting "<<component<<
"."<<port<<
": removing old port first."<<endlog();
390 log(Error) <<
"Could not report Component " << component <<
" : no such peer."<<endlog();
393 std::vector<std::string> strs;
394 boost::split(strs, port, boost::is_any_of(
"."));
397 if (strs.empty())
return false;
399 Service::shared_ptr service=comp->provides();
400 while ( strs.size() != 1 && service) {
401 service = service->getService( strs.front() );
403 strs.erase( strs.begin() );
406 log(Error) <<
"No such service: '"<< strs.front() <<
"' while looking for port '"<< port<<
"'"<<endlog();
409 base::PortInterface* porti = 0;
410 porti = service->getPort(strs.front());
412 log(Error) <<
"Could not report Port " << port
413 <<
" : no such port on Component "<<component<<
"."<<endlog();
417 base::InputPortInterface* ipi =
dynamic_cast<base::InputPortInterface*
>(porti);
419 log(Error) <<
"Can not report InputPort "<< porti->getName() <<
" of Component " << component <<endlog();
425 base::PortInterface* ourport = porti->antiClone();
427 ourport->setName(component +
"_" + port);
428 ipi =
dynamic_cast<base::InputPortInterface*
> (ourport);
431 if (report_policy.type == ConnPolicy::DATA ) {
432 log(Info) <<
"Not buffering of data flow connections. You may miss samples." <<endlog();
434 log(Info) <<
"Buffering ports with size "<< report_policy.size <<
", as set in ReportPolicy property." <<endlog();
437 this->ports()->addEventPort( *ipi );
438 if (porti->connectTo(ourport, report_policy ) ==
false)
440 log(Error) <<
"Could not connect to OutputPort " << porti->getName() << endlog();
441 this->ports()->removePort(ourport->getName());
446 if (this->reportDataSource(component +
"." + port,
"Port",
447 ipi->getDataSource(),ipi,
true) ==
false)
449 log(Error) <<
"Failed reporting port " << port << endlog();
450 this->ports()->removePort(ourport->getName());
455 log(Info) <<
"Monitoring OutputPort " << port <<
" : ok." << endlog();
457 if ( !report_data.value().findValue<
string>(component) && !report_data.value().findValue<
string>( component+
"."+port) )
458 report_data.value().ownProperty(
new Property<string>(
"Port",
"",component+
"."+port));
463 base::PortInterface* ourport = this->ports()->getPort(component +
"_" + port);
464 if ( this->unreportDataSource( component +
"." + port ) && report_data.value().removeProperty( report_data.value().findValue<
string>(component+
"."+port))) {
465 this->ports()->removePort(ourport->getName());
475 Logger::In in(
"ReportingComponent");
476 TaskContext* comp = this->getPeer(component);
478 log(Error) <<
"Could not report Component " << component <<
" : no such peer."<<endlog();
482 if ( comp->provides()->getValue( dataname ) ) {
483 if (this->reportDataSource( component +
"." + dataname,
"Data",
484 comp->provides()->getValue( dataname )->getDataSource(), 0, false ) ==
false) {
485 log(Error) <<
"Failed reporting data " << dataname <<endlog();
491 if ( comp->properties() && comp->properties()->find( dataname ) ) {
492 if (this->reportDataSource( component +
"." + dataname,
"Data",
493 comp->properties()->find( dataname )->getDataSource(), 0, false ) ==
false) {
494 log(Error) <<
"Failed reporting data " << dataname <<endlog();
500 if ( !report_data.value().findValue<
string>( component+
"."+dataname) )
501 report_data.value().ownProperty(
new Property<string>(
"Data",
"",component+
"."+dataname));
506 return this->unreportDataSource( component +
"." + datasource) && report_data.value().removeProperty( report_data.value().findValue<
string>(component+
"."+datasource));
509 bool ReportingComponent::reportDataSource(std::string tag, std::string type, base::DataSourceBase::shared_ptr orig, base::InputPortInterface* ipi,
bool track)
512 for (Reports::iterator it = root.begin();
513 it != root.end(); ++it)
514 if ( it->get<T_QualName>() == tag ) {
520 base::DataSourceBase::shared_ptr clone = orig->getTypeInfo()->buildValue();
522 log(Error) <<
"Could not report '"<< tag <<
"' : unknown type." << endlog();
525 PropertyBase* prop = 0;
526 root.push_back( boost::make_tuple( tag, orig, type, prop, ipi,
false, track ) );
530 bool ReportingComponent::unreportDataSource(std::string tag)
532 for (Reports::iterator it = root.begin();
533 it != root.end(); ++it)
534 if ( it->get<T_QualName>() == tag ) {
541 bool ReportingComponent::startHook() {
542 Logger::In in(
"ReportingComponent");
543 if (marshallers.begin() == marshallers.end()) {
544 log(Error) <<
"Need at least one marshaller to write reports." <<endlog();
548 if(synchronize_with_logging.get())
549 starttime = Logger::Instance()->getReferenceTime();
551 starttime = os::TimeService::Instance()->getTicks();
558 if (writeHeader.get()) {
560 for(Marshallers::iterator it=marshallers.begin(); it != marshallers.end(); ++it) {
561 it->first->serialize( report );
567 if ( getActivity()->isPeriodic() ) {
568 for(Marshallers::iterator it=marshallers.begin(); it != marshallers.end(); ++it) {
569 it->second->serialize( report );
576 for(Reports::iterator it = root.begin(); it != root.end(); ++it )
577 if ( it->get<T_Port>() ) {
578 #ifndef ORO_SIGNALLING_PORTS 579 it->get<T_Port>()->signalInterface( !insnapshot.get() );
581 it->get<T_Port>()->clear();
591 if ( getActivity()->isPeriodic() )
598 timestamp = os::TimeService::Instance()->secondsSince( starttime );
603 for(Reports::iterator it = root.begin(); it != root.end(); ++it ) {
604 it->get<T_NewData>() = (it->get<T_PortDS>())->evaluate();
606 result = result || ( it->get<T_NewData>() && it->get<T_Tracked>() );
611 void ReportingComponent::makeReport2()
614 assert( report.empty() );
616 report.add( timestamp.getTypeInfo()->buildProperty( timestamp.getName(),
"", timestamp.getDataSource() ) );
617 DataSource<bool>::shared_ptr checker;
618 for(Reports::iterator it = root.begin(); it != root.end(); ++it ) {
619 Property<PropertyBag>* subbag =
new Property<PropertyBag>( it->get<T_QualName>(),
"");
620 if ( decompose.get() &&
memberDecomposition( it->get<T_PortDS>(), subbag->value(), checker ) ) {
621 report.add( subbag );
622 it->get<T_Property>() = subbag;
625 base::DataSourceBase::shared_ptr converted = it->get<T_PortDS>()->getTypeInfo()->convertType( it->get<T_PortDS>() );
626 if ( converted && converted != it->get<T_PortDS>() ) {
628 PropertyBase* convProp = converted->getTypeInfo()->buildProperty(it->get<T_QualName>(),
"", converted);
629 it->get<T_Property>() = convProp;
630 report.add(convProp);
632 PropertyBase* origProp = it->get<T_PortDS>()->getTypeInfo()->buildProperty(it->get<T_QualName>(),
"", it->get<T_PortDS>());
633 it->get<T_Property>() = origProp;
634 report.add(origProp);
646 deletePropertyBag( report );
651 if( !getActivity()->isPeriodic() && insnapshot.get() && !
snapshotted)
667 for(Marshallers::iterator it=marshallers.begin(); it != marshallers.end(); ++it) {
670 it->second->serialize( *report.begin() );
671 for (Reports::const_iterator i = root.begin();
675 if ( i->get<T_NewData>() )
676 it->second->serialize( i->get<T_Property>() );
680 it->second->serialize( report );
684 }
while( !getActivity()->isPeriodic() && !insnapshot.get() &&
copydata() );
687 void ReportingComponent::stopHook() {
689 for(Marshallers::iterator it=marshallers.begin(); it != marshallers.end(); ++it) {
bool snapshotted
Used to communicate between snapshot() and updateHook() if updateHook needs to make a copy...
virtual bool configureHook()
Implementation of base::TaskCore::configureHook().
void cleanReport()
Implementation of base::TaskCore::configureHook().
Helper data source to check if two sizes are still equal and check an upstream comparison as well...
bool addMarshaller(RTT::marsh::MarshallInterface *headerM, RTT::marsh::MarshallInterface *bodyM)
Adds a Plugin to receive incoming data.
bool unreportPort(const std::string &component, const std::string &port)
Unreport a specific data port of a component.
virtual void updateHook()
This non-real-time function processes the copied data.
A Dummy Empty MarshallInterface.
RTT::internal::DataSource< bool >::shared_ptr mchecker
If false, a sequence size has changed.
void snapshot()
Copy the reported data and trigger the generation of a sampling line.
This file contains the macros and definitions to create dynamically loadable components.
bool removeMarshallers()
Remove and delete all added Marshallers.
The Orocos Component Library.
virtual bool screenComponent(const std::string &comp)
Write state information of a component.
bool reportPort(const std::string &component, const std::string &port)
Report a specific data port of a component.
bool unreportData(const std::string &component, const std::string &datasource)
Unreport a specific data source of a component.
bool reportComponent(const std::string &component)
Report all the data ports of a component.
bool screenImpl(const std::string &comp, std::ostream &output)
This method writes out the status of a component's interface.
bool unreportComponent(const std::string &component)
Unreport the data ports of a component.
bool memberDecomposition(base::DataSourceBase::shared_ptr dsb, PropertyBag &targetbag, DataSource< bool >::shared_ptr &resized)
Decompose a given type using getMember() into a property tree.
bool copydata()
This real-time function makes copies of the data to be reported.
bool reportData(const std::string &component, const std::string &dataname)
Report a specific data source of a component.
virtual void cleanupHook()
Implementation of base::TaskCore::cleanupHook().