Compare commits
7 Commits
mjcarroll/
...
irobot/eve
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c6d873d5fe | ||
|
|
5e1fc89175 | ||
|
|
3799e31088 | ||
|
|
4f33b20248 | ||
|
|
dabe7ad0b3 | ||
|
|
9b0752fe97 | ||
|
|
c7121d1375 |
@@ -51,6 +51,9 @@ set(${PROJECT_NAME}_SRCS
|
||||
src/rclcpp/duration.cpp
|
||||
src/rclcpp/event.cpp
|
||||
src/rclcpp/exceptions/exceptions.cpp
|
||||
src/rclcpp/experimental/executors/events_executor/events_executor_entities_collector.cpp
|
||||
src/rclcpp/experimental/executors/events_executor/events_executor.cpp
|
||||
src/rclcpp/experimental/timers_manager.cpp
|
||||
src/rclcpp/executable_list.cpp
|
||||
src/rclcpp/executor.cpp
|
||||
src/rclcpp/executors.cpp
|
||||
|
||||
@@ -21,6 +21,7 @@
|
||||
#include "rclcpp/executors/multi_threaded_executor.hpp"
|
||||
#include "rclcpp/executors/single_threaded_executor.hpp"
|
||||
#include "rclcpp/executors/static_single_threaded_executor.hpp"
|
||||
#include "rclcpp/experimental/executors/events_executor/events_executor.hpp"
|
||||
#include "rclcpp/node.hpp"
|
||||
#include "rclcpp/utilities.hpp"
|
||||
#include "rclcpp/visibility_control.hpp"
|
||||
|
||||
@@ -0,0 +1,70 @@
|
||||
// Copyright 2023 iRobot Corporation.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#ifndef RCLCPP__EXPERIMENTAL__EXECUTORS__EVENTS_EXECUTOR__EVENT_WAITABLE_HPP_
|
||||
#define RCLCPP__EXPERIMENTAL__EXECUTORS__EVENTS_EXECUTOR__EVENT_WAITABLE_HPP_
|
||||
|
||||
#include "rclcpp/waitable.hpp"
|
||||
|
||||
namespace rclcpp
|
||||
{
|
||||
namespace experimental
|
||||
{
|
||||
namespace executors
|
||||
{
|
||||
|
||||
/**
|
||||
* @brief This class provides a wrapper around the waitable object, that is
|
||||
* meant to be used with the EventsExecutor.
|
||||
* The waitset related methods are stubbed out as they should not be called.
|
||||
* This class is abstract as the execute method of rclcpp::Waitable is not implemented.
|
||||
* Nodes who want to implement a custom EventWaitable, can derive from this class and override
|
||||
* the execute method.
|
||||
*/
|
||||
class EventWaitable : public rclcpp::Waitable
|
||||
{
|
||||
public:
|
||||
// Constructor
|
||||
RCLCPP_PUBLIC
|
||||
EventWaitable() = default;
|
||||
|
||||
// Destructor
|
||||
RCLCPP_PUBLIC
|
||||
virtual ~EventWaitable() = default;
|
||||
|
||||
// Stub API: not used by EventsExecutor
|
||||
RCLCPP_PUBLIC
|
||||
bool
|
||||
is_ready(rcl_wait_set_t * wait_set) final
|
||||
{
|
||||
(void)wait_set;
|
||||
throw std::runtime_error("EventWaitable can't be checked if it's ready");
|
||||
return false;
|
||||
}
|
||||
|
||||
// Stub API: not used by EventsExecutor
|
||||
RCLCPP_PUBLIC
|
||||
void
|
||||
add_to_wait_set(rcl_wait_set_t * wait_set) final
|
||||
{
|
||||
(void)wait_set;
|
||||
throw std::runtime_error("EventWaitable can't be added to a wait_set");
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace executors
|
||||
} // namespace experimental
|
||||
} // namespace rclcpp
|
||||
|
||||
#endif // RCLCPP__EXPERIMENTAL__EXECUTORS__EVENTS_EXECUTOR__EVENT_WAITABLE_HPP_
|
||||
@@ -0,0 +1,237 @@
|
||||
// Copyright 2023 iRobot Corporation.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#ifndef RCLCPP__EXPERIMENTAL__EXECUTORS__EVENTS_EXECUTOR__EVENTS_EXECUTOR_HPP_
|
||||
#define RCLCPP__EXPERIMENTAL__EXECUTORS__EVENTS_EXECUTOR__EVENTS_EXECUTOR_HPP_
|
||||
|
||||
#include <chrono>
|
||||
#include <memory>
|
||||
#include <queue>
|
||||
#include <vector>
|
||||
|
||||
#include "rclcpp/executor.hpp"
|
||||
#include "rclcpp/experimental/executors/events_executor/events_executor_entities_collector.hpp"
|
||||
#include "rclcpp/experimental/executors/events_executor/events_executor_event_types.hpp"
|
||||
#include "rclcpp/experimental/executors/events_executor/events_executor_notify_waitable.hpp"
|
||||
#include "rclcpp/experimental/executors/events_executor/events_queue.hpp"
|
||||
#include "rclcpp/experimental/executors/events_executor/simple_events_queue.hpp"
|
||||
#include "rclcpp/experimental/timers_manager.hpp"
|
||||
#include "rclcpp/node.hpp"
|
||||
|
||||
namespace rclcpp
|
||||
{
|
||||
namespace experimental
|
||||
{
|
||||
namespace executors
|
||||
{
|
||||
|
||||
/// Events executor implementation
|
||||
/**
|
||||
* This executor uses an events queue and a timers manager to execute entities from its
|
||||
* associated nodes and callback groups.
|
||||
* The RMW listener APIs are used to collect new events.
|
||||
*
|
||||
* This executor tries to reduce as much as possible the amount of maintenance operations.
|
||||
* This allows to use customized `EventsQueue` classes to achieve different goals such
|
||||
* as very low CPU usage, bounded memory requirement, determinism, etc.
|
||||
*
|
||||
* The executor uses a weak ownership model and it locks entities only while executing
|
||||
* their related events.
|
||||
*
|
||||
* To run this executor:
|
||||
* rclcpp::experimental::executors::EventsExecutor executor;
|
||||
* executor.add_node(node);
|
||||
* executor.spin();
|
||||
* executor.remove_node(node);
|
||||
*/
|
||||
class EventsExecutor : public rclcpp::Executor
|
||||
{
|
||||
friend class EventsExecutorEntitiesCollector;
|
||||
|
||||
public:
|
||||
RCLCPP_SMART_PTR_DEFINITIONS(EventsExecutor)
|
||||
|
||||
/// Default constructor. See the default constructor for Executor.
|
||||
/**
|
||||
* \param[in] events_queue The queue used to store events.
|
||||
* \param[in] execute_timers_separate_thread If true, timers are executed in a separate
|
||||
* thread. If false, timers are executed in the same thread as all other entities.
|
||||
* \param[in] options Options used to configure the executor.
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
explicit EventsExecutor(
|
||||
rclcpp::experimental::executors::EventsQueue::UniquePtr events_queue = std::make_unique<
|
||||
rclcpp::experimental::executors::SimpleEventsQueue>(),
|
||||
bool execute_timers_separate_thread = false,
|
||||
const rclcpp::ExecutorOptions & options = rclcpp::ExecutorOptions());
|
||||
|
||||
/// Default destructor.
|
||||
RCLCPP_PUBLIC
|
||||
virtual ~EventsExecutor();
|
||||
|
||||
/// Events executor implementation of spin.
|
||||
/**
|
||||
* This function will block until work comes in, execute it, and keep blocking.
|
||||
* It will only be interrupted by a CTRL-C (managed by the global signal handler).
|
||||
* \throws std::runtime_error when spin() called while already spinning
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
void
|
||||
spin() override;
|
||||
|
||||
/// Events executor implementation of spin some
|
||||
/**
|
||||
* This non-blocking function will execute the timers and events
|
||||
* that were ready when this API was called, until timeout or no
|
||||
* more work available. New ready-timers/events arrived while
|
||||
* executing work, won't be taken into account here.
|
||||
*
|
||||
* Example:
|
||||
* while(condition) {
|
||||
* spin_some();
|
||||
* sleep(); // User should have some sync work or
|
||||
* // sleep to avoid a 100% CPU usage
|
||||
* }
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
void
|
||||
spin_some(std::chrono::nanoseconds max_duration = std::chrono::nanoseconds(0)) override;
|
||||
|
||||
/// Events executor implementation of spin all
|
||||
/**
|
||||
* This non-blocking function will execute timers and events
|
||||
* until timeout or no more work available. If new ready-timers/events
|
||||
* arrive while executing work available, they will be executed
|
||||
* as long as the timeout hasn't expired.
|
||||
*
|
||||
* Example:
|
||||
* while(condition) {
|
||||
* spin_all();
|
||||
* sleep(); // User should have some sync work or
|
||||
* // sleep to avoid a 100% CPU usage
|
||||
* }
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
void
|
||||
spin_all(std::chrono::nanoseconds max_duration) override;
|
||||
|
||||
/// Add a node to the executor.
|
||||
/**
|
||||
* \sa rclcpp::Executor::add_node
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
void
|
||||
add_node(
|
||||
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
|
||||
bool notify = true) override;
|
||||
|
||||
/// Convenience function which takes Node and forwards NodeBaseInterface.
|
||||
/**
|
||||
* \sa rclcpp::EventsExecutor::add_node
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
void
|
||||
add_node(std::shared_ptr<rclcpp::Node> node_ptr, bool notify = true) override;
|
||||
|
||||
/// Remove a node from the executor.
|
||||
/**
|
||||
* \sa rclcpp::Executor::remove_node
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
void
|
||||
remove_node(
|
||||
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
|
||||
bool notify = true) override;
|
||||
|
||||
/// Convenience function which takes Node and forwards NodeBaseInterface.
|
||||
/**
|
||||
* \sa rclcpp::Executor::remove_node
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
void
|
||||
remove_node(std::shared_ptr<rclcpp::Node> node_ptr, bool notify = true) override;
|
||||
|
||||
/// Add a callback group to an executor.
|
||||
/**
|
||||
* \sa rclcpp::Executor::add_callback_group
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
void
|
||||
add_callback_group(
|
||||
rclcpp::CallbackGroup::SharedPtr group_ptr,
|
||||
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
|
||||
bool notify = true) override;
|
||||
|
||||
/// Remove callback group from the executor
|
||||
/**
|
||||
* \sa rclcpp::Executor::remove_callback_group
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
void
|
||||
remove_callback_group(
|
||||
rclcpp::CallbackGroup::SharedPtr group_ptr,
|
||||
bool notify = true) override;
|
||||
|
||||
RCLCPP_PUBLIC
|
||||
std::vector<rclcpp::CallbackGroup::WeakPtr>
|
||||
get_all_callback_groups() override;
|
||||
|
||||
/// Get callback groups that belong to executor.
|
||||
/**
|
||||
* \sa rclcpp::Executor::get_manually_added_callback_groups()
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
std::vector<rclcpp::CallbackGroup::WeakPtr>
|
||||
get_manually_added_callback_groups() override;
|
||||
|
||||
/// Get callback groups that belong to executor.
|
||||
/**
|
||||
* \sa rclcpp::Executor::get_automatically_added_callback_groups_from_nodes()
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
std::vector<rclcpp::CallbackGroup::WeakPtr>
|
||||
get_automatically_added_callback_groups_from_nodes() override;
|
||||
|
||||
protected:
|
||||
RCLCPP_PUBLIC
|
||||
void
|
||||
spin_once_impl(std::chrono::nanoseconds timeout) override;
|
||||
|
||||
RCLCPP_PUBLIC
|
||||
void
|
||||
spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive);
|
||||
|
||||
private:
|
||||
RCLCPP_DISABLE_COPY(EventsExecutor)
|
||||
|
||||
// Execute a single event
|
||||
RCLCPP_PUBLIC
|
||||
void
|
||||
execute_event(const ExecutorEvent & event);
|
||||
|
||||
// Queue where entities can push events
|
||||
rclcpp::experimental::executors::EventsQueue::UniquePtr events_queue_;
|
||||
|
||||
EventsExecutorEntitiesCollector::SharedPtr entities_collector_;
|
||||
EventsExecutorNotifyWaitable::SharedPtr executor_notifier_;
|
||||
|
||||
// Timers manager
|
||||
std::shared_ptr<rclcpp::experimental::TimersManager> timers_manager_;
|
||||
};
|
||||
|
||||
} // namespace executors
|
||||
} // namespace experimental
|
||||
} // namespace rclcpp
|
||||
|
||||
#endif // RCLCPP__EXPERIMENTAL__EXECUTORS__EVENTS_EXECUTOR__EVENTS_EXECUTOR_HPP_
|
||||
@@ -0,0 +1,348 @@
|
||||
// Copyright 2023 iRobot Corporation.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#ifndef RCLCPP__EXPERIMENTAL__EXECUTORS__EVENTS_EXECUTOR__EVENTS_EXECUTOR_ENTITIES_COLLECTOR_HPP_
|
||||
#define RCLCPP__EXPERIMENTAL__EXECUTORS__EVENTS_EXECUTOR__EVENTS_EXECUTOR_ENTITIES_COLLECTOR_HPP_
|
||||
|
||||
#include <chrono>
|
||||
#include <list>
|
||||
#include <map>
|
||||
#include <memory>
|
||||
#include <unordered_map>
|
||||
#include <vector>
|
||||
|
||||
#include "rcl/guard_condition.h"
|
||||
#include "rcl/wait.h"
|
||||
|
||||
#include "rclcpp/callback_group.hpp"
|
||||
#include "rclcpp/experimental/executors/events_executor/events_executor_event_types.hpp"
|
||||
#include "rclcpp/experimental/timers_manager.hpp"
|
||||
#include "rclcpp/macros.hpp"
|
||||
#include "rclcpp/node_interfaces/node_base_interface.hpp"
|
||||
#include "rclcpp/visibility_control.hpp"
|
||||
#include "rclcpp/waitable.hpp"
|
||||
|
||||
namespace rclcpp
|
||||
{
|
||||
namespace experimental
|
||||
{
|
||||
namespace executors
|
||||
{
|
||||
// forward declaration of EventsExecutor to avoid circular dependency
|
||||
class EventsExecutor;
|
||||
|
||||
typedef std::map<rclcpp::CallbackGroup::WeakPtr,
|
||||
rclcpp::node_interfaces::NodeBaseInterface::WeakPtr,
|
||||
std::owner_less<rclcpp::CallbackGroup::WeakPtr>> WeakCallbackGroupsToNodesMap;
|
||||
|
||||
/**
|
||||
* @brief This class provides a waitable object that is used for managing the
|
||||
* entities (i.e. nodes and their subscriptions, timers, services, etc)
|
||||
* added to an EventsExecutor.
|
||||
* The add/remove node APIs are used when a node is added/removed from
|
||||
* the associated EventsExecutor and result in setting/unsetting the
|
||||
* events callbacks and adding timers to the timers manager.
|
||||
*
|
||||
* Being this class derived from Waitable, it can be used to wake up an
|
||||
* executor thread while it's spinning.
|
||||
* When this occurs, the execute API takes care of handling changes
|
||||
* in the entities currently used by the executor.
|
||||
*/
|
||||
class EventsExecutorEntitiesCollector final
|
||||
: public rclcpp::Waitable,
|
||||
public std::enable_shared_from_this<EventsExecutorEntitiesCollector>
|
||||
{
|
||||
public:
|
||||
RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(EventsExecutorEntitiesCollector)
|
||||
|
||||
// Constructor
|
||||
RCLCPP_PUBLIC
|
||||
EventsExecutorEntitiesCollector(
|
||||
EventsExecutor * executor);
|
||||
|
||||
// Destructor
|
||||
RCLCPP_PUBLIC
|
||||
~EventsExecutorEntitiesCollector() override;
|
||||
|
||||
// Initialize entities collector
|
||||
RCLCPP_PUBLIC
|
||||
void init();
|
||||
|
||||
/// Execute the waitable.
|
||||
RCLCPP_PUBLIC
|
||||
void
|
||||
execute(std::shared_ptr<void> & data) override;
|
||||
|
||||
/// Function to add_handles_to_wait_set and wait for work and
|
||||
/**
|
||||
* block until the wait set is ready or until the timeout has been exceeded.
|
||||
* \throws std::runtime_error if wait set couldn't be cleared or filled.
|
||||
* \throws any rcl errors from rcl_wait, \see rclcpp::exceptions::throw_from_rcl_error()
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
void
|
||||
refresh_wait_set(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
|
||||
|
||||
/**
|
||||
* \throws std::runtime_error if it couldn't add guard condition to wait set
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
void
|
||||
add_to_wait_set(rcl_wait_set_t * wait_set) override;
|
||||
|
||||
/// Complete all available queued work without blocking.
|
||||
/**
|
||||
* This function checks if after the guard condition was triggered
|
||||
* (or a spurious wakeup happened) we are really ready to execute
|
||||
* i.e. re-collect entities
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
bool
|
||||
is_ready(rcl_wait_set_t * wait_set) override;
|
||||
|
||||
RCLCPP_PUBLIC
|
||||
std::shared_ptr<void>
|
||||
take_data() override;
|
||||
|
||||
RCLCPP_PUBLIC
|
||||
std::shared_ptr<void>
|
||||
take_data_by_entity_id(size_t id) override;
|
||||
|
||||
/// Add a callback group to an executor.
|
||||
/**
|
||||
* \see rclcpp::Executor::add_callback_group
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
bool
|
||||
add_callback_group(
|
||||
rclcpp::CallbackGroup::SharedPtr group_ptr,
|
||||
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr);
|
||||
|
||||
/// Add a callback group to an executor.
|
||||
/**
|
||||
* \see rclcpp::Executor::add_callback_group
|
||||
* \return boolean whether the node from the callback group is new
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
bool
|
||||
add_callback_group(
|
||||
rclcpp::CallbackGroup::SharedPtr group_ptr,
|
||||
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
|
||||
WeakCallbackGroupsToNodesMap & weak_groups_to_nodes);
|
||||
|
||||
/// Remove a callback group from the executor.
|
||||
/**
|
||||
* \see rclcpp::Executor::remove_callback_group
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
bool
|
||||
remove_callback_group(
|
||||
rclcpp::CallbackGroup::SharedPtr group_ptr);
|
||||
|
||||
/// Remove a callback group from the executor.
|
||||
/**
|
||||
* \see rclcpp::Executor::remove_callback_group_from_map
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
bool
|
||||
remove_callback_group_from_map(
|
||||
rclcpp::CallbackGroup::SharedPtr group_ptr,
|
||||
WeakCallbackGroupsToNodesMap & weak_groups_to_nodes);
|
||||
|
||||
/**
|
||||
* \see rclcpp::Executor::add_node()
|
||||
* \throw std::runtime_error if node was already added
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
bool
|
||||
add_node(
|
||||
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr);
|
||||
|
||||
/**
|
||||
* \see rclcpp::Executor::remove_node()
|
||||
* \throw std::runtime_error if no guard condition is associated with node.
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
bool
|
||||
remove_node(
|
||||
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr);
|
||||
|
||||
RCLCPP_PUBLIC
|
||||
std::vector<rclcpp::CallbackGroup::WeakPtr>
|
||||
get_all_callback_groups();
|
||||
|
||||
/// Get callback groups that belong to executor.
|
||||
/**
|
||||
* \see rclcpp::Executor::get_manually_added_callback_groups()
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
std::vector<rclcpp::CallbackGroup::WeakPtr>
|
||||
get_manually_added_callback_groups();
|
||||
|
||||
/// Get callback groups that belong to executor.
|
||||
/**
|
||||
* \see rclcpp::Executor::get_automatically_added_callback_groups_from_nodes()
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
std::vector<rclcpp::CallbackGroup::WeakPtr>
|
||||
get_automatically_added_callback_groups_from_nodes();
|
||||
|
||||
///
|
||||
/**
|
||||
* Get the subscription shared pointer corresponding
|
||||
* to a subscription identifier
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
rclcpp::SubscriptionBase::SharedPtr
|
||||
get_subscription(const void * subscription_id);
|
||||
|
||||
///
|
||||
/**
|
||||
* Get the client shared pointer corresponding
|
||||
* to a client identifier
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
rclcpp::ClientBase::SharedPtr
|
||||
get_client(const void * client_id);
|
||||
|
||||
///
|
||||
/**
|
||||
* Get the service shared pointer corresponding
|
||||
* to a service identifier
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
rclcpp::ServiceBase::SharedPtr
|
||||
get_service(const void * service_id);
|
||||
|
||||
///
|
||||
/**
|
||||
* Get the waitable shared pointer corresponding
|
||||
* to a waitable identifier
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
rclcpp::Waitable::SharedPtr
|
||||
get_waitable(const void * waitable_id);
|
||||
|
||||
///
|
||||
/**
|
||||
* Add a weak pointer to a waitable
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
void
|
||||
add_waitable(rclcpp::Waitable::SharedPtr waitable);
|
||||
|
||||
private:
|
||||
/// Return true if the node belongs to the collector
|
||||
/**
|
||||
* \param[in] node_ptr a node base interface shared pointer
|
||||
* \param[in] weak_groups_to_nodes map to nodes to lookup
|
||||
* \return boolean whether a node belongs the collector
|
||||
*/
|
||||
bool
|
||||
has_node(
|
||||
const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
|
||||
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) const;
|
||||
|
||||
/// Add all callback groups that can be automatically added by any executor
|
||||
/// and is not already associated with an executor from nodes
|
||||
/// that are associated with executor
|
||||
/**
|
||||
* \see rclcpp::Executor::add_callback_groups_from_nodes_associated_to_executor()
|
||||
*/
|
||||
void
|
||||
add_callback_groups_from_nodes_associated_to_executor();
|
||||
|
||||
void
|
||||
callback_group_added_impl(
|
||||
rclcpp::CallbackGroup::SharedPtr group);
|
||||
|
||||
void
|
||||
node_added_impl(
|
||||
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node);
|
||||
|
||||
void
|
||||
callback_group_removed_impl(
|
||||
rclcpp::CallbackGroup::SharedPtr group);
|
||||
|
||||
void
|
||||
node_removed_impl(
|
||||
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node);
|
||||
|
||||
void
|
||||
set_entities_event_callbacks_from_map(
|
||||
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes);
|
||||
|
||||
void
|
||||
set_callback_group_entities_callbacks(rclcpp::CallbackGroup::SharedPtr group);
|
||||
|
||||
void
|
||||
unset_callback_group_entities_callbacks(rclcpp::CallbackGroup::SharedPtr group);
|
||||
|
||||
void
|
||||
set_guard_condition_callback(rclcpp::GuardCondition * guard_condition);
|
||||
|
||||
void
|
||||
unset_guard_condition_callback(rclcpp::GuardCondition * guard_condition);
|
||||
|
||||
std::function<void(size_t)>
|
||||
create_entity_callback(void * exec_entity_id, ExecutorEventType type);
|
||||
|
||||
std::function<void(size_t, int)>
|
||||
create_waitable_callback(void * waitable_id);
|
||||
|
||||
typedef std::map<rclcpp::CallbackGroup::WeakPtr,
|
||||
rclcpp::GuardCondition *,
|
||||
std::owner_less<rclcpp::CallbackGroup::WeakPtr>>
|
||||
WeakCallbackGroupsToGuardConditionsMap;
|
||||
|
||||
typedef std::map<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr,
|
||||
const rclcpp::GuardCondition *,
|
||||
std::owner_less<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>>
|
||||
WeakNodesToGuardConditionsMap;
|
||||
|
||||
/// maps callback groups to guard conditions
|
||||
WeakCallbackGroupsToGuardConditionsMap weak_groups_to_guard_conditions_;
|
||||
|
||||
WeakNodesToGuardConditionsMap weak_nodes_to_guard_conditions_;
|
||||
|
||||
// maps callback groups to nodes.
|
||||
WeakCallbackGroupsToNodesMap weak_groups_associated_with_executor_to_nodes_;
|
||||
// maps callback groups to nodes.
|
||||
WeakCallbackGroupsToNodesMap weak_groups_to_nodes_associated_with_executor_;
|
||||
|
||||
/// List of weak nodes registered in the static executor
|
||||
std::list<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr> weak_nodes_;
|
||||
|
||||
// Maps: entity identifiers to weak pointers from the entities registered in the executor
|
||||
// so in the case of an event providing and ID, we can retrieve and own the corresponding
|
||||
// entity while it performs work
|
||||
std::unordered_map<const void *, rclcpp::SubscriptionBase::WeakPtr> weak_subscriptions_map_;
|
||||
std::unordered_map<const void *, rclcpp::ServiceBase::WeakPtr> weak_services_map_;
|
||||
std::unordered_map<const void *, rclcpp::ClientBase::WeakPtr> weak_clients_map_;
|
||||
std::unordered_map<const void *, rclcpp::Waitable::WeakPtr> weak_waitables_map_;
|
||||
|
||||
/// Executor using this entities collector object
|
||||
EventsExecutor * associated_executor_ = nullptr;
|
||||
/// Instance of the timers manager used by the associated executor
|
||||
rclcpp::experimental::TimersManager::SharedPtr timers_manager_;
|
||||
|
||||
// Mutex to protect vector of new nodes.
|
||||
std::recursive_mutex reentrant_mutex_;
|
||||
};
|
||||
|
||||
} // namespace executors
|
||||
} // namespace experimental
|
||||
} // namespace rclcpp
|
||||
|
||||
#endif // RCLCPP__EXPERIMENTAL__EXECUTORS__EVENTS_EXECUTOR__EVENTS_EXECUTOR_ENTITIES_COLLECTOR_HPP_
|
||||
@@ -0,0 +1,46 @@
|
||||
// Copyright 2023 iRobot Corporation.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#ifndef RCLCPP__EXPERIMENTAL__EXECUTORS__EVENTS_EXECUTOR__EVENTS_EXECUTOR_EVENT_TYPES_HPP_
|
||||
#define RCLCPP__EXPERIMENTAL__EXECUTORS__EVENTS_EXECUTOR__EVENTS_EXECUTOR_EVENT_TYPES_HPP_
|
||||
|
||||
namespace rclcpp
|
||||
{
|
||||
namespace experimental
|
||||
{
|
||||
namespace executors
|
||||
{
|
||||
|
||||
enum ExecutorEventType
|
||||
{
|
||||
CLIENT_EVENT,
|
||||
SUBSCRIPTION_EVENT,
|
||||
SERVICE_EVENT,
|
||||
TIMER_EVENT,
|
||||
WAITABLE_EVENT
|
||||
};
|
||||
|
||||
struct ExecutorEvent
|
||||
{
|
||||
const void * exec_entity_id;
|
||||
int gen_entity_id;
|
||||
ExecutorEventType type;
|
||||
size_t num_events;
|
||||
};
|
||||
|
||||
} // namespace executors
|
||||
} // namespace experimental
|
||||
} // namespace rclcpp
|
||||
|
||||
#endif // RCLCPP__EXPERIMENTAL__EXECUTORS__EVENTS_EXECUTOR__EVENTS_EXECUTOR_EVENT_TYPES_HPP_
|
||||
@@ -0,0 +1,103 @@
|
||||
// Copyright 2023 iRobot Corporation.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#ifndef RCLCPP__EXPERIMENTAL__EXECUTORS__EVENTS_EXECUTOR__EVENTS_EXECUTOR_NOTIFY_WAITABLE_HPP_
|
||||
#define RCLCPP__EXPERIMENTAL__EXECUTORS__EVENTS_EXECUTOR__EVENTS_EXECUTOR_NOTIFY_WAITABLE_HPP_
|
||||
|
||||
#include <list>
|
||||
#include <memory>
|
||||
|
||||
#include "rcl/guard_condition.h"
|
||||
#include "rclcpp/experimental/executors/events_executor/event_waitable.hpp"
|
||||
|
||||
namespace rclcpp
|
||||
{
|
||||
namespace experimental
|
||||
{
|
||||
namespace executors
|
||||
{
|
||||
|
||||
/**
|
||||
* @brief This class provides an EventWaitable that allows to
|
||||
* wake up an EventsExecutor when a guard condition is notified.
|
||||
*/
|
||||
class EventsExecutorNotifyWaitable final : public EventWaitable
|
||||
{
|
||||
public:
|
||||
RCLCPP_SMART_PTR_DEFINITIONS(EventsExecutorNotifyWaitable)
|
||||
|
||||
// Constructor
|
||||
RCLCPP_PUBLIC
|
||||
EventsExecutorNotifyWaitable() = default;
|
||||
|
||||
// Destructor
|
||||
RCLCPP_PUBLIC
|
||||
virtual ~EventsExecutorNotifyWaitable() = default;
|
||||
|
||||
// The function is a no-op, since we only care of waking up the executor
|
||||
RCLCPP_PUBLIC
|
||||
void
|
||||
execute(std::shared_ptr<void> & data) override
|
||||
{
|
||||
(void)data;
|
||||
}
|
||||
|
||||
RCLCPP_PUBLIC
|
||||
void
|
||||
add_guard_condition(rclcpp::GuardCondition * guard_condition)
|
||||
{
|
||||
notify_guard_conditions_.push_back(guard_condition);
|
||||
}
|
||||
|
||||
RCLCPP_PUBLIC
|
||||
void
|
||||
set_on_ready_callback(std::function<void(size_t, int)> callback) override
|
||||
{
|
||||
// The second argument of the callback should identify which guard condition
|
||||
// triggered the event. However it's not relevant here as we only
|
||||
// care about waking up the executor, so we pass a 0.
|
||||
auto gc_callback = [callback](size_t count) {
|
||||
callback(count, 0);
|
||||
};
|
||||
|
||||
for (auto gc : notify_guard_conditions_) {
|
||||
gc->set_on_trigger_callback(gc_callback);
|
||||
}
|
||||
}
|
||||
|
||||
RCLCPP_PUBLIC
|
||||
std::shared_ptr<void>
|
||||
take_data() override
|
||||
{
|
||||
// This waitable doesn't handle any data
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
RCLCPP_PUBLIC
|
||||
std::shared_ptr<void>
|
||||
take_data_by_entity_id(size_t id) override
|
||||
{
|
||||
(void) id;
|
||||
return take_data();
|
||||
}
|
||||
|
||||
private:
|
||||
std::list<rclcpp::GuardCondition *> notify_guard_conditions_;
|
||||
};
|
||||
|
||||
} // namespace executors
|
||||
} // namespace experimental
|
||||
} // namespace rclcpp
|
||||
|
||||
#endif // RCLCPP__EXPERIMENTAL__EXECUTORS__EVENTS_EXECUTOR__EVENTS_EXECUTOR_NOTIFY_WAITABLE_HPP_
|
||||
@@ -0,0 +1,100 @@
|
||||
// Copyright 2023 iRobot Corporation.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#ifndef RCLCPP__EXPERIMENTAL__EXECUTORS__EVENTS_EXECUTOR__EVENTS_QUEUE_HPP_
|
||||
#define RCLCPP__EXPERIMENTAL__EXECUTORS__EVENTS_EXECUTOR__EVENTS_QUEUE_HPP_
|
||||
|
||||
#include <queue>
|
||||
|
||||
#include "rclcpp/macros.hpp"
|
||||
#include "rclcpp/visibility_control.hpp"
|
||||
|
||||
#include "rclcpp/experimental/executors/events_executor/events_executor_event_types.hpp"
|
||||
|
||||
namespace rclcpp
|
||||
{
|
||||
namespace experimental
|
||||
{
|
||||
namespace executors
|
||||
{
|
||||
|
||||
/**
|
||||
* @brief This abstract class can be used to implement different types of queues
|
||||
* where `ExecutorEvent` can be stored.
|
||||
* The derived classes should choose which underlying container to use and
|
||||
* the strategy for pushing and popping events.
|
||||
* For example a queue implementation may be bounded or unbounded and have
|
||||
* different pruning strategies.
|
||||
* Implementations may or may not check the validity of events and decide how to handle
|
||||
* the situation where an event is not valid anymore (e.g. a subscription history cache overruns)
|
||||
*/
|
||||
class EventsQueue
|
||||
{
|
||||
public:
|
||||
RCLCPP_SMART_PTR_ALIASES_ONLY(EventsQueue)
|
||||
|
||||
RCLCPP_PUBLIC
|
||||
EventsQueue() = default;
|
||||
|
||||
/**
|
||||
* @brief Destruct the object.
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
virtual ~EventsQueue() = default;
|
||||
|
||||
/**
|
||||
* @brief push event into the queue
|
||||
* @param event The event to push into the queue
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
virtual
|
||||
void
|
||||
enqueue(const rclcpp::experimental::executors::ExecutorEvent & event) = 0;
|
||||
|
||||
/**
|
||||
* @brief Extracts an event from the queue, eventually waiting until timeout
|
||||
* if none is available.
|
||||
* @return true if event has been found, false if timeout
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
virtual
|
||||
bool
|
||||
dequeue(
|
||||
rclcpp::experimental::executors::ExecutorEvent & event,
|
||||
std::chrono::nanoseconds timeout = std::chrono::nanoseconds::max()) = 0;
|
||||
|
||||
/**
|
||||
* @brief Test whether queue is empty
|
||||
* @return true if the queue's size is 0, false otherwise.
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
virtual
|
||||
bool
|
||||
empty() const = 0;
|
||||
|
||||
/**
|
||||
* @brief Returns the number of elements in the queue.
|
||||
* @return the number of elements in the queue.
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
virtual
|
||||
size_t
|
||||
size() const = 0;
|
||||
};
|
||||
|
||||
} // namespace executors
|
||||
} // namespace experimental
|
||||
} // namespace rclcpp
|
||||
|
||||
#endif // RCLCPP__EXPERIMENTAL__EXECUTORS__EVENTS_EXECUTOR__EVENTS_QUEUE_HPP_
|
||||
@@ -0,0 +1,134 @@
|
||||
// Copyright 2023 iRobot Corporation.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#ifndef RCLCPP__EXPERIMENTAL__EXECUTORS__EVENTS_EXECUTOR__SIMPLE_EVENTS_QUEUE_HPP_
|
||||
#define RCLCPP__EXPERIMENTAL__EXECUTORS__EVENTS_EXECUTOR__SIMPLE_EVENTS_QUEUE_HPP_
|
||||
|
||||
#include <condition_variable>
|
||||
#include <mutex>
|
||||
#include <queue>
|
||||
#include <utility>
|
||||
|
||||
#include "rclcpp/experimental/executors/events_executor/events_queue.hpp"
|
||||
|
||||
namespace rclcpp
|
||||
{
|
||||
namespace experimental
|
||||
{
|
||||
namespace executors
|
||||
{
|
||||
|
||||
/**
|
||||
* @brief This class implements an EventsQueue as a simple wrapper around a std::queue.
|
||||
* It does not perform any checks about the size of queue, which can grow
|
||||
* unbounded without being pruned.
|
||||
* The simplicity of this implementation makes it suitable for optimizing CPU usage.
|
||||
*/
|
||||
class SimpleEventsQueue : public EventsQueue
|
||||
{
|
||||
public:
|
||||
RCLCPP_PUBLIC
|
||||
~SimpleEventsQueue() override = default;
|
||||
|
||||
/**
|
||||
* @brief enqueue event into the queue
|
||||
* Thread safe
|
||||
* @param event The event to enqueue into the queue
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
void
|
||||
enqueue(const rclcpp::experimental::executors::ExecutorEvent & event) override
|
||||
{
|
||||
rclcpp::experimental::executors::ExecutorEvent single_event = event;
|
||||
single_event.num_events = 1;
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(mutex_);
|
||||
for (size_t ev = 0; ev < event.num_events; ev++) {
|
||||
event_queue_.push(single_event);
|
||||
}
|
||||
}
|
||||
events_queue_cv_.notify_one();
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief waits for an event until timeout, gets a single event
|
||||
* Thread safe
|
||||
* @return true if event, false if timeout
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
bool
|
||||
dequeue(
|
||||
rclcpp::experimental::executors::ExecutorEvent & event,
|
||||
std::chrono::nanoseconds timeout = std::chrono::nanoseconds::max()) override
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(mutex_);
|
||||
|
||||
// Initialize to true because it's only needed if we have a valid timeout
|
||||
bool has_data = true;
|
||||
if (timeout != std::chrono::nanoseconds::max()) {
|
||||
has_data =
|
||||
events_queue_cv_.wait_for(lock, timeout, [this]() {return !event_queue_.empty();});
|
||||
} else {
|
||||
events_queue_cv_.wait(lock, [this]() {return !event_queue_.empty();});
|
||||
}
|
||||
|
||||
if (has_data) {
|
||||
event = event_queue_.front();
|
||||
event_queue_.pop();
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Test whether queue is empty
|
||||
* Thread safe
|
||||
* @return true if the queue's size is 0, false otherwise.
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
bool
|
||||
empty() const override
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(mutex_);
|
||||
return event_queue_.empty();
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Returns the number of elements in the queue.
|
||||
* Thread safe
|
||||
* @return the number of elements in the queue.
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
size_t
|
||||
size() const override
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(mutex_);
|
||||
return event_queue_.size();
|
||||
}
|
||||
|
||||
private:
|
||||
// The underlying queue implementation
|
||||
std::queue<rclcpp::experimental::executors::ExecutorEvent> event_queue_;
|
||||
// Mutex to protect the insertion/extraction of events in the queue
|
||||
mutable std::mutex mutex_;
|
||||
// Variable used to notify when an event is added to the queue
|
||||
std::condition_variable events_queue_cv_;
|
||||
};
|
||||
|
||||
} // namespace executors
|
||||
} // namespace experimental
|
||||
} // namespace rclcpp
|
||||
|
||||
#endif // RCLCPP__EXPERIMENTAL__EXECUTORS__EVENTS_EXECUTOR__SIMPLE_EVENTS_QUEUE_HPP_
|
||||
539
rclcpp/include/rclcpp/experimental/timers_manager.hpp
Normal file
539
rclcpp/include/rclcpp/experimental/timers_manager.hpp
Normal file
@@ -0,0 +1,539 @@
|
||||
// Copyright 2023 iRobot Corporation.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#ifndef RCLCPP__EXPERIMENTAL__TIMERS_MANAGER_HPP_
|
||||
#define RCLCPP__EXPERIMENTAL__TIMERS_MANAGER_HPP_
|
||||
|
||||
#include <algorithm>
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
#include <condition_variable>
|
||||
#include <functional>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <thread>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
#include "rclcpp/context.hpp"
|
||||
#include "rclcpp/timer.hpp"
|
||||
|
||||
namespace rclcpp
|
||||
{
|
||||
namespace experimental
|
||||
{
|
||||
|
||||
/**
|
||||
* @brief This class provides a way for storing and executing timer objects.
|
||||
* It provides APIs to suit the needs of different applications and execution models.
|
||||
* All public APIs provided by this class are thread-safe.
|
||||
*
|
||||
* Timers management
|
||||
* This class provides APIs to add/remove timers to/from an internal storage.
|
||||
* It keeps a list of weak pointers from added timers, and locks them only when
|
||||
* they need to be executed or modified.
|
||||
* Timers are kept ordered in a binary-heap priority queue.
|
||||
* Calls to add/remove APIs will temporarily block the execution of the timers and
|
||||
* will require to reorder the internal priority queue.
|
||||
* Because of this, they have a not-negligible impact on the performance.
|
||||
*
|
||||
* Timers execution
|
||||
* The most efficient implementation consists in letting a TimersManager object
|
||||
* to spawn a thread where timers are monitored and periodically executed.
|
||||
* Besides this, other APIs allow to either execute a single timer or all the
|
||||
* currently ready ones.
|
||||
* This class assumes that the `execute_callback()` API of the stored timers is never
|
||||
* called by other entities, but it can only be called from here.
|
||||
* If this assumption is not respected, the heap property may be invalidated,
|
||||
* so timers may be executed out of order, without this object noticing it.
|
||||
*
|
||||
*/
|
||||
class TimersManager
|
||||
{
|
||||
public:
|
||||
RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(TimersManager)
|
||||
|
||||
/**
|
||||
* @brief Construct a new TimersManager object
|
||||
*
|
||||
* @param context custom context to be used.
|
||||
* Shared ownership of the context is held until destruction.
|
||||
* @param on_ready_callback The timers on ready callback. if not callable,
|
||||
* this object will directly execute timers when they are ready.
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
TimersManager(
|
||||
std::shared_ptr<rclcpp::Context> context,
|
||||
std::function<void(void *)> on_ready_callback = nullptr);
|
||||
|
||||
/**
|
||||
* @brief Destruct the TimersManager object making sure to stop thread and release memory.
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
~TimersManager();
|
||||
|
||||
/**
|
||||
* @brief Adds a new timer to the storage, maintaining weak ownership of it.
|
||||
* Function is thread safe and it can be called regardless of the state of the timers thread.
|
||||
*
|
||||
* @param timer the timer to add.
|
||||
* @throws std::invalid_argument if timer is a nullptr.
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
void add_timer(rclcpp::TimerBase::SharedPtr timer);
|
||||
|
||||
/**
|
||||
* @brief Remove a single timer from the object storage.
|
||||
* Will do nothing if the timer was not being stored here.
|
||||
* Function is thread safe and it can be called regardless of the state of the timers thread.
|
||||
*
|
||||
* @param timer the timer to remove.
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
void remove_timer(rclcpp::TimerBase::SharedPtr timer);
|
||||
|
||||
/**
|
||||
* @brief Remove all the timers stored in the object.
|
||||
* Function is thread safe and it can be called regardless of the state of the timers thread.
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
void clear();
|
||||
|
||||
/**
|
||||
* @brief Starts a thread that takes care of executing the timers stored in this object.
|
||||
* Function will throw an error if the timers thread was already running.
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
void start();
|
||||
|
||||
/**
|
||||
* @brief Stops the timers thread.
|
||||
* Will do nothing if the timer thread was not running.
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
void stop();
|
||||
|
||||
/**
|
||||
* @brief Executes all the timers currently ready when the function was invoked.
|
||||
* This function will lock all the stored timers throughout its duration.
|
||||
* This function is thread safe.
|
||||
* @throws std::runtime_error if the timers thread was already running.
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
void execute_ready_timers();
|
||||
|
||||
/**
|
||||
* @brief Get the number of timers that are currently ready.
|
||||
* This function is thread safe.
|
||||
*
|
||||
* @return size_t number of ready timers.
|
||||
* @throws std::runtime_error if the timers thread was already running.
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
size_t get_number_ready_timers();
|
||||
|
||||
/**
|
||||
* @brief Executes head timer if ready.
|
||||
* This function is thread safe.
|
||||
*
|
||||
* @return true if head timer was ready.
|
||||
* @throws std::runtime_error if the timers thread was already running.
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
bool execute_head_timer();
|
||||
|
||||
/**
|
||||
* @brief Executes timer identified by its ID.
|
||||
* This function is thread safe.
|
||||
*
|
||||
* @param timer_id the timer ID of the timer to execute
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
void execute_ready_timer(const void * timer_id);
|
||||
|
||||
/**
|
||||
* @brief Get the amount of time before the next timer triggers.
|
||||
* This function is thread safe.
|
||||
*
|
||||
* @return std::chrono::nanoseconds to wait,
|
||||
* the returned value could be negative if the timer is already expired
|
||||
* or std::chrono::nanoseconds::max() if there are no timers stored in the object.
|
||||
* @throws std::runtime_error if the timers thread was already running.
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
std::chrono::nanoseconds get_head_timeout();
|
||||
|
||||
private:
|
||||
RCLCPP_DISABLE_COPY(TimersManager)
|
||||
|
||||
using TimerPtr = rclcpp::TimerBase::SharedPtr;
|
||||
using WeakTimerPtr = rclcpp::TimerBase::WeakPtr;
|
||||
|
||||
// Forward declaration
|
||||
class TimersHeap;
|
||||
|
||||
/**
|
||||
* @brief This class allows to store weak pointers to timers in a heap-like data structure.
|
||||
* The root of the heap is the timer that triggers first.
|
||||
* Since this class uses weak ownership, it is not guaranteed that it represents a valid heap
|
||||
* at any point in time as timers could go out of scope, thus invalidating it.
|
||||
* The "validate_and_lock" API allows to restore the heap property and also returns a locked version
|
||||
* of the timers heap.
|
||||
* This class is not thread safe and requires external mutexes to protect its usage.
|
||||
*/
|
||||
class WeakTimersHeap
|
||||
{
|
||||
public:
|
||||
/**
|
||||
* @brief Add a new timer to the heap. After the addition, the heap property is enforced.
|
||||
*
|
||||
* @param timer new timer to add.
|
||||
* @return true if timer has been added, false if it was already there.
|
||||
*/
|
||||
bool add_timer(TimerPtr timer)
|
||||
{
|
||||
TimersHeap locked_heap = this->validate_and_lock();
|
||||
bool added = locked_heap.add_timer(std::move(timer));
|
||||
|
||||
if (added) {
|
||||
// Re-create the weak heap with the new timer added
|
||||
this->store(locked_heap);
|
||||
}
|
||||
|
||||
return added;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Remove a timer from the heap. After the removal, the heap property is enforced.
|
||||
*
|
||||
* @param timer timer to remove.
|
||||
* @return true if timer has been removed, false if it was not there.
|
||||
*/
|
||||
bool remove_timer(TimerPtr timer)
|
||||
{
|
||||
TimersHeap locked_heap = this->validate_and_lock();
|
||||
bool removed = locked_heap.remove_timer(std::move(timer));
|
||||
|
||||
if (removed) {
|
||||
// Re-create the weak heap with the timer removed
|
||||
this->store(locked_heap);
|
||||
}
|
||||
|
||||
return removed;
|
||||
}
|
||||
|
||||
TimerPtr get_timer(const void * timer_id)
|
||||
{
|
||||
for (auto & weak_timer : weak_heap_) {
|
||||
auto timer = weak_timer.lock();
|
||||
if (timer.get() == timer_id) {
|
||||
return timer;
|
||||
}
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Returns a const reference to the front element.
|
||||
*/
|
||||
const WeakTimerPtr & front() const
|
||||
{
|
||||
return weak_heap_.front();
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Returns whether the heap is empty or not.
|
||||
*/
|
||||
bool empty() const
|
||||
{
|
||||
return weak_heap_.empty();
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief This function restores the current object as a valid heap
|
||||
* and it returns a locked version of it.
|
||||
* Timers that went out of scope are removed from the container.
|
||||
* It is the only public API to access and manipulate the stored timers.
|
||||
*
|
||||
* @return TimersHeap owned timers corresponding to the current object
|
||||
*/
|
||||
TimersHeap validate_and_lock()
|
||||
{
|
||||
TimersHeap locked_heap;
|
||||
bool any_timer_destroyed = false;
|
||||
|
||||
for (auto weak_timer : weak_heap_) {
|
||||
auto timer = weak_timer.lock();
|
||||
if (timer) {
|
||||
// This timer is valid, so add it to the locked heap
|
||||
// Note: we access friend private `owned_heap_` member field.
|
||||
locked_heap.owned_heap_.push_back(std::move(timer));
|
||||
} else {
|
||||
// This timer went out of scope, so we don't add it to locked heap
|
||||
// and we mark the corresponding flag.
|
||||
// It's not needed to erase it from weak heap, as we are going to re-heapify.
|
||||
// Note: we can't exit from the loop here, as we need to find all valid timers.
|
||||
any_timer_destroyed = true;
|
||||
}
|
||||
}
|
||||
|
||||
// If a timer has gone out of scope, then the remaining elements do not represent
|
||||
// a valid heap anymore. We need to re-heapify the timers heap.
|
||||
if (any_timer_destroyed) {
|
||||
locked_heap.heapify();
|
||||
// Re-create the weak heap now that elements have been heapified again
|
||||
this->store(locked_heap);
|
||||
}
|
||||
|
||||
return locked_heap;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief This function allows to recreate the heap of weak pointers
|
||||
* from an heap of owned pointers.
|
||||
* It is required to be called after a locked TimersHeap generated from this object
|
||||
* has been modified in any way (e.g. timers triggered, added, removed).
|
||||
*
|
||||
* @param heap timers heap to store as weak pointers
|
||||
*/
|
||||
void store(const TimersHeap & heap)
|
||||
{
|
||||
weak_heap_.clear();
|
||||
// Note: we access friend private `owned_heap_` member field.
|
||||
for (auto t : heap.owned_heap_) {
|
||||
weak_heap_.push_back(t);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Remove all timers from the heap.
|
||||
*/
|
||||
void clear()
|
||||
{
|
||||
weak_heap_.clear();
|
||||
}
|
||||
|
||||
private:
|
||||
std::vector<WeakTimerPtr> weak_heap_;
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief This class is the equivalent of WeakTimersHeap but with ownership of the timers.
|
||||
* It can be generated by locking the weak version.
|
||||
* It provides operations to manipulate the heap.
|
||||
* This class is not thread safe and requires external mutexes to protect its usage.
|
||||
*/
|
||||
class TimersHeap
|
||||
{
|
||||
public:
|
||||
/**
|
||||
* @brief Try to add a new timer to the heap.
|
||||
* After the addition, the heap property is preserved.
|
||||
* @param timer new timer to add.
|
||||
* @return true if timer has been added, false if it was already there.
|
||||
*/
|
||||
bool add_timer(TimerPtr timer)
|
||||
{
|
||||
// Nothing to do if the timer is already stored here
|
||||
auto it = std::find(owned_heap_.begin(), owned_heap_.end(), timer);
|
||||
if (it != owned_heap_.end()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
owned_heap_.push_back(std::move(timer));
|
||||
std::push_heap(owned_heap_.begin(), owned_heap_.end(), timer_greater);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Try to remove a timer from the heap.
|
||||
* After the removal, the heap property is preserved.
|
||||
* @param timer timer to remove.
|
||||
* @return true if timer has been removed, false if it was not there.
|
||||
*/
|
||||
bool remove_timer(TimerPtr timer)
|
||||
{
|
||||
// Nothing to do if the timer is not stored here
|
||||
auto it = std::find(owned_heap_.begin(), owned_heap_.end(), timer);
|
||||
if (it == owned_heap_.end()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
owned_heap_.erase(it);
|
||||
this->heapify();
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Returns a reference to the front element.
|
||||
* @return reference to front element.
|
||||
*/
|
||||
TimerPtr & front()
|
||||
{
|
||||
return owned_heap_.front();
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Returns a const reference to the front element.
|
||||
* @return const reference to front element.
|
||||
*/
|
||||
const TimerPtr & front() const
|
||||
{
|
||||
return owned_heap_.front();
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Returns whether the heap is empty or not.
|
||||
* @return true if the heap is empty.
|
||||
*/
|
||||
bool empty() const
|
||||
{
|
||||
return owned_heap_.empty();
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Returns the size of the heap.
|
||||
* @return the number of valid timers in the heap.
|
||||
*/
|
||||
size_t size() const
|
||||
{
|
||||
return owned_heap_.size();
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Get the number of timers that are currently ready.
|
||||
* @return size_t number of ready timers.
|
||||
*/
|
||||
size_t get_number_ready_timers() const
|
||||
{
|
||||
size_t ready_timers = 0;
|
||||
|
||||
for (TimerPtr t : owned_heap_) {
|
||||
if (t->is_ready()) {
|
||||
ready_timers++;
|
||||
}
|
||||
}
|
||||
|
||||
return ready_timers;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Restore a valid heap after the root value has been replaced (e.g. timer triggered).
|
||||
*/
|
||||
void heapify_root()
|
||||
{
|
||||
// The following code is a more efficient version than doing
|
||||
// pop_heap, pop_back, push_back, push_heap
|
||||
// as it removes the need for the last push_heap
|
||||
|
||||
// Push the modified element (i.e. the current root) at the bottom of the heap
|
||||
owned_heap_.push_back(owned_heap_[0]);
|
||||
// Exchange first and last-1 elements and reheapify
|
||||
std::pop_heap(owned_heap_.begin(), owned_heap_.end(), timer_greater);
|
||||
// Remove last element
|
||||
owned_heap_.pop_back();
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Completely restores the structure to a valid heap
|
||||
*/
|
||||
void heapify()
|
||||
{
|
||||
std::make_heap(owned_heap_.begin(), owned_heap_.end(), timer_greater);
|
||||
}
|
||||
|
||||
void clear_callbacks()
|
||||
{
|
||||
for (TimerPtr & t : owned_heap_) {
|
||||
t->clear_on_reset_callback();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Friend declaration to allow the `validate_and_lock()` function to access the
|
||||
* underlying heap container
|
||||
*/
|
||||
friend TimersHeap WeakTimersHeap::validate_and_lock();
|
||||
|
||||
/**
|
||||
* @brief Friend declaration to allow the `store()` function to access the
|
||||
* underlying heap container
|
||||
*/
|
||||
friend void WeakTimersHeap::store(const TimersHeap & heap);
|
||||
|
||||
private:
|
||||
/**
|
||||
* @brief Comparison function between timers.
|
||||
* @return true if `a` triggers after `b`.
|
||||
*/
|
||||
static bool timer_greater(TimerPtr a, TimerPtr b)
|
||||
{
|
||||
// FIXME!
|
||||
return a->time_until_trigger() > b->time_until_trigger();
|
||||
}
|
||||
|
||||
std::vector<TimerPtr> owned_heap_;
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Implements a loop that keeps executing ready timers.
|
||||
* This function is executed in the timers thread.
|
||||
*/
|
||||
void run_timers();
|
||||
|
||||
/**
|
||||
* @brief Get the amount of time before the next timer triggers.
|
||||
* This function is not thread safe, acquire a mutex before calling it.
|
||||
*
|
||||
* @return std::chrono::nanoseconds to wait,
|
||||
* the returned value could be negative if the timer is already expired
|
||||
* or std::chrono::nanoseconds::max() if the heap is empty.
|
||||
* This function is not thread safe, acquire the timers_mutex_ before calling it.
|
||||
*/
|
||||
std::chrono::nanoseconds get_head_timeout_unsafe();
|
||||
|
||||
/**
|
||||
* @brief Executes all the timers currently ready when the function is invoked
|
||||
* while keeping the heap correctly sorted.
|
||||
* This function is not thread safe, acquire the timers_mutex_ before calling it.
|
||||
*/
|
||||
void execute_ready_timers_unsafe();
|
||||
|
||||
// Callback to be called when timer is ready
|
||||
std::function<void(void *)> on_ready_callback_ = nullptr;
|
||||
|
||||
// Thread used to run the timers execution task
|
||||
std::thread timers_thread_;
|
||||
// Protects access to timers
|
||||
std::mutex timers_mutex_;
|
||||
// Protects access to stop()
|
||||
std::mutex stop_mutex_;
|
||||
// Notifies the timers thread whenever timers are added/removed
|
||||
std::condition_variable timers_cv_;
|
||||
// Flag used as predicate by timers_cv_ that denotes one or more timers being added/removed
|
||||
bool timers_updated_ {false};
|
||||
// Indicates whether the timers thread is currently running or not
|
||||
std::atomic<bool> running_ {false};
|
||||
// Parent context used to understand if ROS is still active
|
||||
std::shared_ptr<rclcpp::Context> context_;
|
||||
// Timers heap storage with weak ownership
|
||||
WeakTimersHeap weak_timers_heap_;
|
||||
};
|
||||
|
||||
} // namespace experimental
|
||||
} // namespace rclcpp
|
||||
|
||||
#endif // RCLCPP__EXPERIMENTAL__TIMERS_MANAGER_HPP_
|
||||
@@ -0,0 +1,327 @@
|
||||
// Copyright 2023 iRobot Corporation.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include "rclcpp/experimental/executors/events_executor/events_executor.hpp"
|
||||
|
||||
#include <memory>
|
||||
#include <queue>
|
||||
#include <string>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
#include "rcpputils/scope_exit.hpp"
|
||||
|
||||
#include "rclcpp/exceptions/exceptions.hpp"
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
using rclcpp::experimental::executors::EventsExecutor;
|
||||
|
||||
EventsExecutor::EventsExecutor(
|
||||
rclcpp::experimental::executors::EventsQueue::UniquePtr events_queue,
|
||||
bool execute_timers_separate_thread,
|
||||
const rclcpp::ExecutorOptions & options)
|
||||
: rclcpp::Executor(options)
|
||||
{
|
||||
// Get ownership of the queue used to store events.
|
||||
if (!events_queue) {
|
||||
throw std::invalid_argument("events_queue can't be a null pointer");
|
||||
}
|
||||
events_queue_ = std::move(events_queue);
|
||||
|
||||
// Create timers manager
|
||||
std::function<void(void *)> timer_on_ready_cb = nullptr;
|
||||
if (execute_timers_separate_thread) {
|
||||
timer_on_ready_cb = [this](const void * timer_id) {
|
||||
ExecutorEvent event = {timer_id, -1, ExecutorEventType::TIMER_EVENT, 1};
|
||||
this->events_queue_->enqueue(event);
|
||||
};
|
||||
}
|
||||
timers_manager_ =
|
||||
std::make_shared<rclcpp::experimental::TimersManager>(context_, timer_on_ready_cb);
|
||||
|
||||
// Create entities collector
|
||||
entities_collector_ = std::make_shared<EventsExecutorEntitiesCollector>(this);
|
||||
entities_collector_->init();
|
||||
|
||||
// Setup the executor notifier to wake up the executor when some guard conditions are tiggered.
|
||||
// The added guard conditions are guaranteed to not go out of scope before the executor itself.
|
||||
executor_notifier_ = std::make_shared<EventsExecutorNotifyWaitable>();
|
||||
executor_notifier_->add_guard_condition(shutdown_guard_condition_.get());
|
||||
executor_notifier_->add_guard_condition(&interrupt_guard_condition_);
|
||||
|
||||
entities_collector_->add_waitable(executor_notifier_);
|
||||
}
|
||||
|
||||
EventsExecutor::~EventsExecutor()
|
||||
{
|
||||
spinning.store(false);
|
||||
}
|
||||
|
||||
void
|
||||
EventsExecutor::spin()
|
||||
{
|
||||
if (spinning.exchange(true)) {
|
||||
throw std::runtime_error("spin() called while already spinning");
|
||||
}
|
||||
RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );
|
||||
|
||||
timers_manager_->start();
|
||||
RCPPUTILS_SCOPE_EXIT(timers_manager_->stop(); );
|
||||
|
||||
while (rclcpp::ok(context_) && spinning.load()) {
|
||||
// Wait until we get an event
|
||||
ExecutorEvent event;
|
||||
bool has_event = events_queue_->dequeue(event);
|
||||
if (has_event) {
|
||||
this->execute_event(event);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
EventsExecutor::spin_some(std::chrono::nanoseconds max_duration)
|
||||
{
|
||||
return this->spin_some_impl(max_duration, false);
|
||||
}
|
||||
|
||||
void
|
||||
EventsExecutor::spin_all(std::chrono::nanoseconds max_duration)
|
||||
{
|
||||
if (max_duration <= 0ns) {
|
||||
throw std::invalid_argument("max_duration must be positive");
|
||||
}
|
||||
return this->spin_some_impl(max_duration, true);
|
||||
}
|
||||
|
||||
void
|
||||
EventsExecutor::spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive)
|
||||
{
|
||||
if (spinning.exchange(true)) {
|
||||
throw std::runtime_error("spin_some() called while already spinning");
|
||||
}
|
||||
|
||||
RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );
|
||||
|
||||
auto start = std::chrono::steady_clock::now();
|
||||
|
||||
auto max_duration_not_elapsed = [max_duration, start]() {
|
||||
if (std::chrono::nanoseconds(0) == max_duration) {
|
||||
// told to spin forever if need be
|
||||
return true;
|
||||
} else if (std::chrono::steady_clock::now() - start < max_duration) {
|
||||
// told to spin only for some maximum amount of time
|
||||
return true;
|
||||
}
|
||||
// spun too long
|
||||
return false;
|
||||
};
|
||||
|
||||
// Get the number of events and timers ready at start
|
||||
const size_t ready_events_at_start = events_queue_->size();
|
||||
size_t executed_events = 0;
|
||||
const size_t ready_timers_at_start = timers_manager_->get_number_ready_timers();
|
||||
size_t executed_timers = 0;
|
||||
|
||||
while (rclcpp::ok(context_) && spinning.load() && max_duration_not_elapsed()) {
|
||||
// Execute first ready event from queue if exists
|
||||
if (exhaustive || (executed_events < ready_events_at_start)) {
|
||||
bool has_event = !events_queue_->empty();
|
||||
|
||||
if (has_event) {
|
||||
ExecutorEvent event;
|
||||
bool ret = events_queue_->dequeue(event, std::chrono::nanoseconds(0));
|
||||
if (ret) {
|
||||
this->execute_event(event);
|
||||
executed_events++;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Execute first timer if it is ready
|
||||
if (exhaustive || (executed_timers < ready_timers_at_start)) {
|
||||
bool timer_executed = timers_manager_->execute_head_timer();
|
||||
if (timer_executed) {
|
||||
executed_timers++;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
// If there's no more work available, exit
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
EventsExecutor::spin_once_impl(std::chrono::nanoseconds timeout)
|
||||
{
|
||||
// In this context a negative input timeout means no timeout
|
||||
if (timeout < 0ns) {
|
||||
timeout = std::chrono::nanoseconds::max();
|
||||
}
|
||||
|
||||
// Select the smallest between input timeout and timer timeout
|
||||
bool is_timer_timeout = false;
|
||||
auto next_timer_timeout = timers_manager_->get_head_timeout();
|
||||
if (next_timer_timeout < timeout) {
|
||||
timeout = next_timer_timeout;
|
||||
is_timer_timeout = true;
|
||||
}
|
||||
|
||||
ExecutorEvent event;
|
||||
bool has_event = events_queue_->dequeue(event, timeout);
|
||||
|
||||
// If we wake up from the wait with an event, it means that it
|
||||
// arrived before any of the timers expired.
|
||||
if (has_event) {
|
||||
this->execute_event(event);
|
||||
} else if (is_timer_timeout) {
|
||||
timers_manager_->execute_head_timer();
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
EventsExecutor::add_node(
|
||||
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify)
|
||||
{
|
||||
// This field is unused because we don't have to wake up the executor when a node is added.
|
||||
(void) notify;
|
||||
|
||||
// Add node to entities collector
|
||||
entities_collector_->add_node(node_ptr);
|
||||
}
|
||||
|
||||
void
|
||||
EventsExecutor::add_node(std::shared_ptr<rclcpp::Node> node_ptr, bool notify)
|
||||
{
|
||||
this->add_node(node_ptr->get_node_base_interface(), notify);
|
||||
}
|
||||
|
||||
void
|
||||
EventsExecutor::remove_node(
|
||||
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify)
|
||||
{
|
||||
// This field is unused because we don't have to wake up the executor when a node is removed.
|
||||
(void)notify;
|
||||
|
||||
// Remove node from entities collector.
|
||||
// This will result in un-setting all the event callbacks from its entities.
|
||||
// After this function returns, this executor will not receive any more events associated
|
||||
// to these entities.
|
||||
entities_collector_->remove_node(node_ptr);
|
||||
}
|
||||
|
||||
void
|
||||
EventsExecutor::remove_node(std::shared_ptr<rclcpp::Node> node_ptr, bool notify)
|
||||
{
|
||||
this->remove_node(node_ptr->get_node_base_interface(), notify);
|
||||
}
|
||||
|
||||
void
|
||||
EventsExecutor::execute_event(const ExecutorEvent & event)
|
||||
{
|
||||
switch (event.type) {
|
||||
case ExecutorEventType::CLIENT_EVENT:
|
||||
{
|
||||
auto client = entities_collector_->get_client(event.exec_entity_id);
|
||||
|
||||
if (client) {
|
||||
for (size_t i = 0; i < event.num_events; i++) {
|
||||
execute_client(client);
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
case ExecutorEventType::SUBSCRIPTION_EVENT:
|
||||
{
|
||||
auto subscription = entities_collector_->get_subscription(event.exec_entity_id);
|
||||
|
||||
if (subscription) {
|
||||
for (size_t i = 0; i < event.num_events; i++) {
|
||||
execute_subscription(subscription);
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
case ExecutorEventType::SERVICE_EVENT:
|
||||
{
|
||||
auto service = entities_collector_->get_service(event.exec_entity_id);
|
||||
|
||||
if (service) {
|
||||
for (size_t i = 0; i < event.num_events; i++) {
|
||||
execute_service(service);
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
case ExecutorEventType::TIMER_EVENT:
|
||||
{
|
||||
timers_manager_->execute_ready_timer(event.exec_entity_id);
|
||||
break;
|
||||
}
|
||||
case ExecutorEventType::WAITABLE_EVENT:
|
||||
{
|
||||
auto waitable = entities_collector_->get_waitable(event.exec_entity_id);
|
||||
|
||||
if (waitable) {
|
||||
for (size_t i = 0; i < event.num_events; i++) {
|
||||
auto data = waitable->take_data_by_entity_id(event.gen_entity_id);
|
||||
waitable->execute(data);
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
EventsExecutor::add_callback_group(
|
||||
rclcpp::CallbackGroup::SharedPtr group_ptr,
|
||||
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
|
||||
bool notify)
|
||||
{
|
||||
// This field is unused because we don't have to wake up
|
||||
// the executor when a callback group is added.
|
||||
(void)notify;
|
||||
entities_collector_->add_callback_group(group_ptr, node_ptr);
|
||||
}
|
||||
|
||||
void
|
||||
EventsExecutor::remove_callback_group(
|
||||
rclcpp::CallbackGroup::SharedPtr group_ptr, bool notify)
|
||||
{
|
||||
// This field is unused because we don't have to wake up
|
||||
// the executor when a callback group is removed.
|
||||
(void)notify;
|
||||
entities_collector_->remove_callback_group(group_ptr);
|
||||
}
|
||||
|
||||
std::vector<rclcpp::CallbackGroup::WeakPtr>
|
||||
EventsExecutor::get_all_callback_groups()
|
||||
{
|
||||
return entities_collector_->get_all_callback_groups();
|
||||
}
|
||||
|
||||
std::vector<rclcpp::CallbackGroup::WeakPtr>
|
||||
EventsExecutor::get_manually_added_callback_groups()
|
||||
{
|
||||
return entities_collector_->get_manually_added_callback_groups();
|
||||
}
|
||||
|
||||
std::vector<rclcpp::CallbackGroup::WeakPtr>
|
||||
EventsExecutor::get_automatically_added_callback_groups_from_nodes()
|
||||
{
|
||||
return entities_collector_->get_automatically_added_callback_groups_from_nodes();
|
||||
}
|
||||
@@ -0,0 +1,699 @@
|
||||
// Copyright 2023 iRobot Corporation.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include "rclcpp/experimental/executors/events_executor/events_executor_entities_collector.hpp"
|
||||
|
||||
#include <algorithm>
|
||||
#include <memory>
|
||||
#include <stdexcept>
|
||||
#include <string>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
#include "rclcpp/memory_strategy.hpp"
|
||||
#include "rclcpp/detail/add_guard_condition_to_rcl_wait_set.hpp"
|
||||
#include "rclcpp/experimental/executors/events_executor/events_executor.hpp"
|
||||
|
||||
using rclcpp::experimental::executors::ExecutorEvent;
|
||||
using rclcpp::experimental::executors::ExecutorEventType;
|
||||
using rclcpp::experimental::executors::EventsExecutorEntitiesCollector;
|
||||
|
||||
EventsExecutorEntitiesCollector::EventsExecutorEntitiesCollector(
|
||||
rclcpp::experimental::executors::EventsExecutor * executor)
|
||||
{
|
||||
if (executor == nullptr) {
|
||||
throw std::runtime_error("Received nullptr executor in EventsExecutorEntitiesCollector.");
|
||||
}
|
||||
|
||||
associated_executor_ = executor;
|
||||
timers_manager_ = associated_executor_->timers_manager_;
|
||||
}
|
||||
|
||||
void
|
||||
EventsExecutorEntitiesCollector::init()
|
||||
{
|
||||
std::lock_guard<std::recursive_mutex> lock(reentrant_mutex_);
|
||||
// Add the EventsExecutorEntitiesCollector shared_ptr to waitables map
|
||||
weak_waitables_map_.emplace(this, this->shared_from_this());
|
||||
}
|
||||
|
||||
EventsExecutorEntitiesCollector::~EventsExecutorEntitiesCollector()
|
||||
{
|
||||
std::lock_guard<std::recursive_mutex> lock(reentrant_mutex_);
|
||||
|
||||
// Disassociate all callback groups and thus nodes.
|
||||
for (const auto & pair : weak_groups_associated_with_executor_to_nodes_) {
|
||||
auto group = pair.first.lock();
|
||||
if (group) {
|
||||
std::atomic_bool & has_executor = group->get_associated_with_executor_atomic();
|
||||
has_executor.store(false);
|
||||
callback_group_removed_impl(group);
|
||||
}
|
||||
}
|
||||
for (const auto & pair : weak_groups_to_nodes_associated_with_executor_) {
|
||||
auto group = pair.first.lock();
|
||||
if (group) {
|
||||
std::atomic_bool & has_executor = group->get_associated_with_executor_atomic();
|
||||
has_executor.store(false);
|
||||
callback_group_removed_impl(group);
|
||||
}
|
||||
}
|
||||
// Disassociate all nodes
|
||||
for (const auto & weak_node : weak_nodes_) {
|
||||
auto node = weak_node.lock();
|
||||
if (node) {
|
||||
std::atomic_bool & has_executor = node->get_associated_with_executor_atomic();
|
||||
has_executor.store(false);
|
||||
node_removed_impl(node);
|
||||
}
|
||||
}
|
||||
|
||||
// Unset callback group notify guard condition executor callback
|
||||
for (const auto & pair : weak_groups_to_guard_conditions_) {
|
||||
auto group = pair.first.lock();
|
||||
if (group) {
|
||||
auto & group_gc = pair.second;
|
||||
unset_guard_condition_callback(group_gc);
|
||||
}
|
||||
}
|
||||
|
||||
weak_clients_map_.clear();
|
||||
weak_services_map_.clear();
|
||||
weak_waitables_map_.clear();
|
||||
weak_subscriptions_map_.clear();
|
||||
weak_nodes_to_guard_conditions_.clear();
|
||||
weak_groups_to_guard_conditions_.clear();
|
||||
|
||||
weak_groups_associated_with_executor_to_nodes_.clear();
|
||||
weak_groups_to_nodes_associated_with_executor_.clear();
|
||||
weak_nodes_.clear();
|
||||
}
|
||||
|
||||
void
|
||||
EventsExecutorEntitiesCollector::execute(std::shared_ptr<void> & data)
|
||||
{
|
||||
// This function is called when the associated executor is notified that something changed.
|
||||
// We do not know if an entity has been added or removed so we have to rebuild everything.
|
||||
(void)data;
|
||||
|
||||
std::lock_guard<std::recursive_mutex> lock(reentrant_mutex_);
|
||||
timers_manager_->clear();
|
||||
|
||||
// If a registered node has a new callback group, register the group.
|
||||
add_callback_groups_from_nodes_associated_to_executor();
|
||||
|
||||
// For all groups registered in the executor, set their event callbacks.
|
||||
set_entities_event_callbacks_from_map(weak_groups_associated_with_executor_to_nodes_);
|
||||
set_entities_event_callbacks_from_map(weak_groups_to_nodes_associated_with_executor_);
|
||||
}
|
||||
|
||||
std::shared_ptr<void>
|
||||
EventsExecutorEntitiesCollector::take_data()
|
||||
{
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
std::shared_ptr<void>
|
||||
EventsExecutorEntitiesCollector::take_data_by_entity_id(size_t id)
|
||||
{
|
||||
(void)id;
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
void
|
||||
EventsExecutorEntitiesCollector::add_to_wait_set(rcl_wait_set_t * wait_set)
|
||||
{
|
||||
(void)wait_set;
|
||||
}
|
||||
|
||||
bool
|
||||
EventsExecutorEntitiesCollector::is_ready(rcl_wait_set_t * p_wait_set)
|
||||
{
|
||||
(void)p_wait_set;
|
||||
return false;
|
||||
}
|
||||
|
||||
bool
|
||||
EventsExecutorEntitiesCollector::add_node(
|
||||
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr)
|
||||
{
|
||||
bool is_new_node = false;
|
||||
// If the node already has an executor
|
||||
std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic();
|
||||
if (has_executor.exchange(true)) {
|
||||
throw std::runtime_error("Node has already been added to an executor.");
|
||||
}
|
||||
node_ptr->for_each_callback_group(
|
||||
[this, node_ptr, &is_new_node](rclcpp::CallbackGroup::SharedPtr group_ptr)
|
||||
{
|
||||
if (
|
||||
!group_ptr->get_associated_with_executor_atomic().load() &&
|
||||
group_ptr->automatically_add_to_executor_with_node())
|
||||
{
|
||||
is_new_node = (add_callback_group(
|
||||
group_ptr,
|
||||
node_ptr,
|
||||
weak_groups_to_nodes_associated_with_executor_) ||
|
||||
is_new_node);
|
||||
}
|
||||
});
|
||||
weak_nodes_.push_back(node_ptr);
|
||||
return is_new_node;
|
||||
}
|
||||
|
||||
bool
|
||||
EventsExecutorEntitiesCollector::add_callback_group(
|
||||
rclcpp::CallbackGroup::SharedPtr group_ptr,
|
||||
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
|
||||
rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & weak_groups_to_nodes)
|
||||
{
|
||||
// If the callback_group already has an executor
|
||||
std::atomic_bool & has_executor = group_ptr->get_associated_with_executor_atomic();
|
||||
if (has_executor.exchange(true)) {
|
||||
throw std::runtime_error("Callback group has already been added to an executor.");
|
||||
}
|
||||
bool is_new_node = !has_node(node_ptr, weak_groups_associated_with_executor_to_nodes_) &&
|
||||
!has_node(node_ptr, weak_groups_to_nodes_associated_with_executor_);
|
||||
rclcpp::CallbackGroup::WeakPtr weak_group_ptr = group_ptr;
|
||||
auto insert_info = weak_groups_to_nodes.insert(
|
||||
std::make_pair(weak_group_ptr, node_ptr));
|
||||
bool was_inserted = insert_info.second;
|
||||
if (!was_inserted) {
|
||||
throw std::runtime_error("Callback group was already added to executor.");
|
||||
}
|
||||
|
||||
if (is_new_node) {
|
||||
node_added_impl(node_ptr);
|
||||
}
|
||||
|
||||
if (node_ptr->get_context()->is_valid()) {
|
||||
auto callback_group_guard_condition =
|
||||
group_ptr->get_notify_guard_condition(node_ptr->get_context());
|
||||
|
||||
rclcpp::CallbackGroup::WeakPtr weak_group_ptr = group_ptr;
|
||||
weak_groups_to_guard_conditions_[weak_group_ptr] = callback_group_guard_condition.get();
|
||||
}
|
||||
|
||||
callback_group_added_impl(group_ptr);
|
||||
|
||||
return is_new_node;
|
||||
}
|
||||
|
||||
bool
|
||||
EventsExecutorEntitiesCollector::add_callback_group(
|
||||
rclcpp::CallbackGroup::SharedPtr group_ptr,
|
||||
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr)
|
||||
{
|
||||
return add_callback_group(group_ptr, node_ptr, weak_groups_associated_with_executor_to_nodes_);
|
||||
}
|
||||
|
||||
bool
|
||||
EventsExecutorEntitiesCollector::remove_callback_group(
|
||||
rclcpp::CallbackGroup::SharedPtr group_ptr)
|
||||
{
|
||||
return this->remove_callback_group_from_map(
|
||||
group_ptr,
|
||||
weak_groups_associated_with_executor_to_nodes_);
|
||||
}
|
||||
|
||||
bool
|
||||
EventsExecutorEntitiesCollector::remove_callback_group_from_map(
|
||||
rclcpp::CallbackGroup::SharedPtr group_ptr,
|
||||
rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & weak_groups_to_nodes)
|
||||
{
|
||||
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr;
|
||||
rclcpp::CallbackGroup::WeakPtr weak_group_ptr = group_ptr;
|
||||
auto iter = weak_groups_to_nodes.find(weak_group_ptr);
|
||||
if (iter != weak_groups_to_nodes.end()) {
|
||||
node_ptr = iter->second.lock();
|
||||
if (node_ptr == nullptr) {
|
||||
throw std::runtime_error("Node must not be deleted before its callback group(s).");
|
||||
}
|
||||
weak_groups_to_nodes.erase(iter);
|
||||
callback_group_removed_impl(group_ptr);
|
||||
} else {
|
||||
throw std::runtime_error("Callback group needs to be associated with executor.");
|
||||
}
|
||||
// If the node was matched and removed, interrupt waiting.
|
||||
bool node_removed = false;
|
||||
if (!has_node(node_ptr, weak_groups_associated_with_executor_to_nodes_) &&
|
||||
!has_node(node_ptr, weak_groups_to_nodes_associated_with_executor_))
|
||||
{
|
||||
node_removed_impl(node_ptr);
|
||||
node_removed = true;
|
||||
}
|
||||
|
||||
weak_groups_to_guard_conditions_.erase(weak_group_ptr);
|
||||
|
||||
return node_removed;
|
||||
}
|
||||
|
||||
bool
|
||||
EventsExecutorEntitiesCollector::remove_node(
|
||||
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr)
|
||||
{
|
||||
if (!node_ptr->get_associated_with_executor_atomic().load()) {
|
||||
return false;
|
||||
}
|
||||
bool node_found = false;
|
||||
auto node_it = weak_nodes_.begin();
|
||||
while (node_it != weak_nodes_.end()) {
|
||||
bool matched = (node_it->lock() == node_ptr);
|
||||
if (matched) {
|
||||
weak_nodes_.erase(node_it);
|
||||
node_found = true;
|
||||
break;
|
||||
}
|
||||
++node_it;
|
||||
}
|
||||
if (!node_found) {
|
||||
return false;
|
||||
}
|
||||
std::vector<rclcpp::CallbackGroup::SharedPtr> found_group_ptrs;
|
||||
std::for_each(
|
||||
weak_groups_to_nodes_associated_with_executor_.begin(),
|
||||
weak_groups_to_nodes_associated_with_executor_.end(),
|
||||
[&found_group_ptrs, node_ptr](std::pair<rclcpp::CallbackGroup::WeakPtr,
|
||||
rclcpp::node_interfaces::NodeBaseInterface::WeakPtr> key_value_pair) {
|
||||
auto & weak_node_ptr = key_value_pair.second;
|
||||
auto shared_node_ptr = weak_node_ptr.lock();
|
||||
auto group_ptr = key_value_pair.first.lock();
|
||||
if (shared_node_ptr == node_ptr) {
|
||||
found_group_ptrs.push_back(group_ptr);
|
||||
}
|
||||
});
|
||||
std::for_each(
|
||||
found_group_ptrs.begin(), found_group_ptrs.end(), [this]
|
||||
(rclcpp::CallbackGroup::SharedPtr group_ptr) {
|
||||
this->remove_callback_group_from_map(
|
||||
group_ptr,
|
||||
weak_groups_to_nodes_associated_with_executor_);
|
||||
});
|
||||
std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic();
|
||||
has_executor.store(false);
|
||||
return true;
|
||||
}
|
||||
|
||||
// Returns true iff the weak_groups_to_nodes map has node_ptr as the value in any of its entry.
|
||||
bool
|
||||
EventsExecutorEntitiesCollector::has_node(
|
||||
const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
|
||||
const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap &
|
||||
weak_groups_to_nodes) const
|
||||
{
|
||||
return std::find_if(
|
||||
weak_groups_to_nodes.begin(),
|
||||
weak_groups_to_nodes.end(),
|
||||
[&](const WeakCallbackGroupsToNodesMap::value_type & other) -> bool {
|
||||
auto other_ptr = other.second.lock();
|
||||
return other_ptr == node_ptr;
|
||||
}) != weak_groups_to_nodes.end();
|
||||
}
|
||||
|
||||
void
|
||||
EventsExecutorEntitiesCollector::add_callback_groups_from_nodes_associated_to_executor()
|
||||
{
|
||||
for (const auto & weak_node : weak_nodes_) {
|
||||
auto node = weak_node.lock();
|
||||
if (node) {
|
||||
node->for_each_callback_group(
|
||||
[this, node](rclcpp::CallbackGroup::SharedPtr shared_group_ptr)
|
||||
{
|
||||
if (shared_group_ptr->automatically_add_to_executor_with_node() &&
|
||||
!shared_group_ptr->get_associated_with_executor_atomic().load())
|
||||
{
|
||||
add_callback_group(
|
||||
shared_group_ptr,
|
||||
node,
|
||||
weak_groups_to_nodes_associated_with_executor_);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<rclcpp::CallbackGroup::WeakPtr>
|
||||
EventsExecutorEntitiesCollector::get_all_callback_groups()
|
||||
{
|
||||
std::vector<rclcpp::CallbackGroup::WeakPtr> groups;
|
||||
for (const auto & group_node_ptr : weak_groups_associated_with_executor_to_nodes_) {
|
||||
groups.push_back(group_node_ptr.first);
|
||||
}
|
||||
for (const auto & group_node_ptr : weak_groups_to_nodes_associated_with_executor_) {
|
||||
groups.push_back(group_node_ptr.first);
|
||||
}
|
||||
return groups;
|
||||
}
|
||||
|
||||
std::vector<rclcpp::CallbackGroup::WeakPtr>
|
||||
EventsExecutorEntitiesCollector::get_manually_added_callback_groups()
|
||||
{
|
||||
std::vector<rclcpp::CallbackGroup::WeakPtr> groups;
|
||||
for (const auto & group_node_ptr : weak_groups_associated_with_executor_to_nodes_) {
|
||||
groups.push_back(group_node_ptr.first);
|
||||
}
|
||||
return groups;
|
||||
}
|
||||
|
||||
std::vector<rclcpp::CallbackGroup::WeakPtr>
|
||||
EventsExecutorEntitiesCollector::get_automatically_added_callback_groups_from_nodes()
|
||||
{
|
||||
std::vector<rclcpp::CallbackGroup::WeakPtr> groups;
|
||||
for (const auto & group_node_ptr : weak_groups_to_nodes_associated_with_executor_) {
|
||||
groups.push_back(group_node_ptr.first);
|
||||
}
|
||||
return groups;
|
||||
}
|
||||
|
||||
void
|
||||
EventsExecutorEntitiesCollector::callback_group_added_impl(
|
||||
rclcpp::CallbackGroup::SharedPtr group)
|
||||
{
|
||||
std::lock_guard<std::recursive_mutex> lock(reentrant_mutex_);
|
||||
|
||||
rclcpp::CallbackGroup::WeakPtr weak_group_ptr = group;
|
||||
|
||||
auto iter = weak_groups_to_guard_conditions_.find(weak_group_ptr);
|
||||
if (iter != weak_groups_to_guard_conditions_.end()) {
|
||||
// Set an event callback for the group's notify guard condition, so if new entities are added
|
||||
// or removed to this node we will receive an event.
|
||||
set_guard_condition_callback(iter->second);
|
||||
}
|
||||
// For all entities in the callback group, set their event callback
|
||||
set_callback_group_entities_callbacks(group);
|
||||
}
|
||||
|
||||
void
|
||||
EventsExecutorEntitiesCollector::node_added_impl(
|
||||
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node)
|
||||
{
|
||||
std::lock_guard<std::recursive_mutex> lock(reentrant_mutex_);
|
||||
|
||||
auto notify_guard_condition = &(node->get_notify_guard_condition());
|
||||
// Set an event callback for the node's notify guard condition, so if new entities are added
|
||||
// or removed to this node we will receive an event.
|
||||
set_guard_condition_callback(notify_guard_condition);
|
||||
|
||||
// Store node's notify guard condition
|
||||
weak_nodes_to_guard_conditions_[node] = notify_guard_condition;
|
||||
}
|
||||
|
||||
void
|
||||
EventsExecutorEntitiesCollector::callback_group_removed_impl(
|
||||
rclcpp::CallbackGroup::SharedPtr group)
|
||||
{
|
||||
std::lock_guard<std::recursive_mutex> lock(reentrant_mutex_);
|
||||
// For all the entities in the group, unset their callbacks
|
||||
unset_callback_group_entities_callbacks(group);
|
||||
}
|
||||
|
||||
void
|
||||
EventsExecutorEntitiesCollector::node_removed_impl(
|
||||
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node)
|
||||
{
|
||||
std::lock_guard<std::recursive_mutex> lock(reentrant_mutex_);
|
||||
// Node doesn't have more callback groups associated to the executor.
|
||||
// Unset the event callback for the node's notify guard condition, to stop
|
||||
// receiving events if entities are added or removed to this node.
|
||||
unset_guard_condition_callback(&(node->get_notify_guard_condition()));
|
||||
|
||||
// Remove guard condition from list
|
||||
rclcpp::node_interfaces::NodeBaseInterface::WeakPtr weak_node_ptr(node);
|
||||
weak_nodes_to_guard_conditions_.erase(weak_node_ptr);
|
||||
}
|
||||
|
||||
void
|
||||
EventsExecutorEntitiesCollector::set_entities_event_callbacks_from_map(
|
||||
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes)
|
||||
{
|
||||
for (const auto & pair : weak_groups_to_nodes) {
|
||||
auto group = pair.first.lock();
|
||||
auto node = pair.second.lock();
|
||||
if (!node || !group || !group->can_be_taken_from().load()) {
|
||||
continue;
|
||||
}
|
||||
set_callback_group_entities_callbacks(group);
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
EventsExecutorEntitiesCollector::set_callback_group_entities_callbacks(
|
||||
rclcpp::CallbackGroup::SharedPtr group)
|
||||
{
|
||||
// Timers are handled by the timers manager
|
||||
group->find_timer_ptrs_if(
|
||||
[this](const rclcpp::TimerBase::SharedPtr & timer) {
|
||||
if (timer) {
|
||||
timers_manager_->add_timer(timer);
|
||||
}
|
||||
return false;
|
||||
});
|
||||
|
||||
// Set callbacks for all other entity types
|
||||
group->find_subscription_ptrs_if(
|
||||
[this](const rclcpp::SubscriptionBase::SharedPtr & subscription) {
|
||||
if (subscription) {
|
||||
weak_subscriptions_map_.emplace(subscription.get(), subscription);
|
||||
|
||||
subscription->set_on_new_message_callback(
|
||||
create_entity_callback(subscription.get(), ExecutorEventType::SUBSCRIPTION_EVENT));
|
||||
}
|
||||
return false;
|
||||
});
|
||||
group->find_service_ptrs_if(
|
||||
[this](const rclcpp::ServiceBase::SharedPtr & service) {
|
||||
if (service) {
|
||||
weak_services_map_.emplace(service.get(), service);
|
||||
|
||||
service->set_on_new_request_callback(
|
||||
create_entity_callback(service.get(), ExecutorEventType::SERVICE_EVENT));
|
||||
}
|
||||
return false;
|
||||
});
|
||||
group->find_client_ptrs_if(
|
||||
[this](const rclcpp::ClientBase::SharedPtr & client) {
|
||||
if (client) {
|
||||
weak_clients_map_.emplace(client.get(), client);
|
||||
|
||||
client->set_on_new_response_callback(
|
||||
create_entity_callback(client.get(), ExecutorEventType::CLIENT_EVENT));
|
||||
}
|
||||
return false;
|
||||
});
|
||||
group->find_waitable_ptrs_if(
|
||||
[this](const rclcpp::Waitable::SharedPtr & waitable) {
|
||||
if (waitable) {
|
||||
weak_waitables_map_.emplace(waitable.get(), waitable);
|
||||
|
||||
waitable->set_on_ready_callback(
|
||||
create_waitable_callback(waitable.get()));
|
||||
}
|
||||
return false;
|
||||
});
|
||||
}
|
||||
|
||||
void
|
||||
EventsExecutorEntitiesCollector::unset_callback_group_entities_callbacks(
|
||||
rclcpp::CallbackGroup::SharedPtr group)
|
||||
{
|
||||
auto iter = weak_groups_to_guard_conditions_.find(group);
|
||||
|
||||
if (iter != weak_groups_to_guard_conditions_.end()) {
|
||||
unset_guard_condition_callback(iter->second);
|
||||
}
|
||||
|
||||
// Timers are handled by the timers manager
|
||||
group->find_timer_ptrs_if(
|
||||
[this](const rclcpp::TimerBase::SharedPtr & timer) {
|
||||
if (timer) {
|
||||
timers_manager_->remove_timer(timer);
|
||||
}
|
||||
return false;
|
||||
});
|
||||
|
||||
// Unset callbacks for all other entity types
|
||||
group->find_subscription_ptrs_if(
|
||||
[this](const rclcpp::SubscriptionBase::SharedPtr & subscription) {
|
||||
if (subscription) {
|
||||
subscription->clear_on_new_message_callback();
|
||||
weak_subscriptions_map_.erase(subscription.get());
|
||||
}
|
||||
return false;
|
||||
});
|
||||
group->find_service_ptrs_if(
|
||||
[this](const rclcpp::ServiceBase::SharedPtr & service) {
|
||||
if (service) {
|
||||
service->clear_on_new_request_callback();
|
||||
weak_services_map_.erase(service.get());
|
||||
}
|
||||
return false;
|
||||
});
|
||||
group->find_client_ptrs_if(
|
||||
[this](const rclcpp::ClientBase::SharedPtr & client) {
|
||||
if (client) {
|
||||
client->clear_on_new_response_callback();
|
||||
weak_clients_map_.erase(client.get());
|
||||
}
|
||||
return false;
|
||||
});
|
||||
group->find_waitable_ptrs_if(
|
||||
[this](const rclcpp::Waitable::SharedPtr & waitable) {
|
||||
if (waitable) {
|
||||
waitable->clear_on_ready_callback();
|
||||
weak_waitables_map_.erase(waitable.get());
|
||||
}
|
||||
return false;
|
||||
});
|
||||
}
|
||||
|
||||
void
|
||||
EventsExecutorEntitiesCollector::set_guard_condition_callback(
|
||||
rclcpp::GuardCondition * guard_condition)
|
||||
{
|
||||
auto gc_callback = [this](size_t num_events) {
|
||||
// Override num events (we don't care more than a single event)
|
||||
num_events = 1;
|
||||
int gc_id = -1;
|
||||
ExecutorEvent event = {this, gc_id, ExecutorEventType::WAITABLE_EVENT, num_events};
|
||||
associated_executor_->events_queue_->enqueue(event);
|
||||
};
|
||||
|
||||
guard_condition->set_on_trigger_callback(gc_callback);
|
||||
}
|
||||
|
||||
void
|
||||
EventsExecutorEntitiesCollector::unset_guard_condition_callback(
|
||||
rclcpp::GuardCondition * guard_condition)
|
||||
{
|
||||
guard_condition->set_on_trigger_callback(nullptr);
|
||||
}
|
||||
|
||||
rclcpp::SubscriptionBase::SharedPtr
|
||||
EventsExecutorEntitiesCollector::get_subscription(const void * subscription_id)
|
||||
{
|
||||
std::lock_guard<std::recursive_mutex> lock(reentrant_mutex_);
|
||||
|
||||
auto it = weak_subscriptions_map_.find(subscription_id);
|
||||
|
||||
if (it != weak_subscriptions_map_.end()) {
|
||||
auto subscription_weak_ptr = it->second;
|
||||
auto subscription_shared_ptr = subscription_weak_ptr.lock();
|
||||
|
||||
if (subscription_shared_ptr) {
|
||||
return subscription_shared_ptr;
|
||||
}
|
||||
|
||||
// The subscription expired, remove from map
|
||||
weak_subscriptions_map_.erase(it);
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
rclcpp::ClientBase::SharedPtr
|
||||
EventsExecutorEntitiesCollector::get_client(const void * client_id)
|
||||
{
|
||||
std::lock_guard<std::recursive_mutex> lock(reentrant_mutex_);
|
||||
|
||||
auto it = weak_clients_map_.find(client_id);
|
||||
|
||||
if (it != weak_clients_map_.end()) {
|
||||
auto client_weak_ptr = it->second;
|
||||
auto client_shared_ptr = client_weak_ptr.lock();
|
||||
|
||||
if (client_shared_ptr) {
|
||||
return client_shared_ptr;
|
||||
}
|
||||
|
||||
// The client expired, remove from map
|
||||
weak_clients_map_.erase(it);
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
rclcpp::ServiceBase::SharedPtr
|
||||
EventsExecutorEntitiesCollector::get_service(const void * service_id)
|
||||
{
|
||||
std::lock_guard<std::recursive_mutex> lock(reentrant_mutex_);
|
||||
|
||||
auto it = weak_services_map_.find(service_id);
|
||||
|
||||
if (it != weak_services_map_.end()) {
|
||||
auto service_weak_ptr = it->second;
|
||||
auto service_shared_ptr = service_weak_ptr.lock();
|
||||
|
||||
if (service_shared_ptr) {
|
||||
return service_shared_ptr;
|
||||
}
|
||||
|
||||
// The service expired, remove from map
|
||||
weak_services_map_.erase(it);
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
rclcpp::Waitable::SharedPtr
|
||||
EventsExecutorEntitiesCollector::get_waitable(const void * waitable_id)
|
||||
{
|
||||
std::lock_guard<std::recursive_mutex> lock(reentrant_mutex_);
|
||||
|
||||
auto it = weak_waitables_map_.find(waitable_id);
|
||||
|
||||
if (it != weak_waitables_map_.end()) {
|
||||
auto waitable_weak_ptr = it->second;
|
||||
auto waitable_shared_ptr = waitable_weak_ptr.lock();
|
||||
|
||||
if (waitable_shared_ptr) {
|
||||
return waitable_shared_ptr;
|
||||
}
|
||||
|
||||
// The waitable expired, remove from map
|
||||
weak_waitables_map_.erase(it);
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
void
|
||||
EventsExecutorEntitiesCollector::add_waitable(rclcpp::Waitable::SharedPtr waitable)
|
||||
{
|
||||
std::lock_guard<std::recursive_mutex> lock(reentrant_mutex_);
|
||||
|
||||
weak_waitables_map_.emplace(waitable.get(), waitable);
|
||||
|
||||
waitable->set_on_ready_callback(
|
||||
create_waitable_callback(waitable.get()));
|
||||
}
|
||||
|
||||
std::function<void(size_t)>
|
||||
EventsExecutorEntitiesCollector::create_entity_callback(
|
||||
void * exec_entity_id, ExecutorEventType event_type)
|
||||
{
|
||||
std::function<void(size_t)>
|
||||
callback = [this, exec_entity_id, event_type](size_t num_events) {
|
||||
ExecutorEvent event = {exec_entity_id, -1, event_type, num_events};
|
||||
associated_executor_->events_queue_->enqueue(event);
|
||||
};
|
||||
return callback;
|
||||
}
|
||||
|
||||
std::function<void(size_t, int)>
|
||||
EventsExecutorEntitiesCollector::create_waitable_callback(void * exec_entity_id)
|
||||
{
|
||||
std::function<void(size_t, int)>
|
||||
callback = [this, exec_entity_id](size_t num_events, int gen_entity_id) {
|
||||
ExecutorEvent event =
|
||||
{exec_entity_id, gen_entity_id, ExecutorEventType::WAITABLE_EVENT, num_events};
|
||||
associated_executor_->events_queue_->enqueue(event);
|
||||
};
|
||||
return callback;
|
||||
}
|
||||
311
rclcpp/src/rclcpp/experimental/timers_manager.cpp
Normal file
311
rclcpp/src/rclcpp/experimental/timers_manager.cpp
Normal file
@@ -0,0 +1,311 @@
|
||||
// Copyright 2023 iRobot Corporation.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include "rclcpp/experimental/timers_manager.hpp"
|
||||
|
||||
#include <inttypes.h>
|
||||
|
||||
#include <ctime>
|
||||
#include <iostream>
|
||||
#include <memory>
|
||||
#include <stdexcept>
|
||||
|
||||
using rclcpp::experimental::TimersManager;
|
||||
|
||||
TimersManager::TimersManager(
|
||||
std::shared_ptr<rclcpp::Context> context,
|
||||
std::function<void(void *)> on_ready_callback)
|
||||
{
|
||||
context_ = context;
|
||||
on_ready_callback_ = on_ready_callback;
|
||||
}
|
||||
|
||||
TimersManager::~TimersManager()
|
||||
{
|
||||
// Remove all timers
|
||||
this->clear();
|
||||
|
||||
// Make sure timers thread is stopped before destroying this object
|
||||
this->stop();
|
||||
}
|
||||
|
||||
void TimersManager::add_timer(rclcpp::TimerBase::SharedPtr timer)
|
||||
{
|
||||
if (!timer) {
|
||||
throw std::invalid_argument("TimersManager::add_timer() trying to add nullptr timer");
|
||||
}
|
||||
|
||||
bool added = false;
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(timers_mutex_);
|
||||
added = weak_timers_heap_.add_timer(timer);
|
||||
timers_updated_ = timers_updated_ || added;
|
||||
}
|
||||
|
||||
timer->set_on_reset_callback(
|
||||
[this](size_t arg) {
|
||||
{
|
||||
(void)arg;
|
||||
std::unique_lock<std::mutex> lock(timers_mutex_);
|
||||
timers_updated_ = true;
|
||||
}
|
||||
timers_cv_.notify_one();
|
||||
});
|
||||
|
||||
if (added) {
|
||||
// Notify that a timer has been added
|
||||
timers_cv_.notify_one();
|
||||
}
|
||||
}
|
||||
|
||||
void TimersManager::start()
|
||||
{
|
||||
// Make sure that the thread is not already running
|
||||
if (running_.exchange(true)) {
|
||||
throw std::runtime_error("TimersManager::start() can't start timers thread as already running");
|
||||
}
|
||||
|
||||
timers_thread_ = std::thread(&TimersManager::run_timers, this);
|
||||
}
|
||||
|
||||
void TimersManager::stop()
|
||||
{
|
||||
// Lock stop() function to prevent race condition in destructor
|
||||
std::unique_lock<std::mutex> lock(stop_mutex_);
|
||||
running_ = false;
|
||||
|
||||
// Notify the timers manager thread to wake up
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(timers_mutex_);
|
||||
timers_updated_ = true;
|
||||
}
|
||||
timers_cv_.notify_one();
|
||||
|
||||
// Join timers thread if it's running
|
||||
if (timers_thread_.joinable()) {
|
||||
timers_thread_.join();
|
||||
}
|
||||
}
|
||||
|
||||
std::chrono::nanoseconds TimersManager::get_head_timeout()
|
||||
{
|
||||
// Do not allow to interfere with the thread running
|
||||
if (running_) {
|
||||
throw std::runtime_error(
|
||||
"get_head_timeout() can't be used while timers thread is running");
|
||||
}
|
||||
|
||||
std::unique_lock<std::mutex> lock(timers_mutex_);
|
||||
return this->get_head_timeout_unsafe();
|
||||
}
|
||||
|
||||
size_t TimersManager::get_number_ready_timers()
|
||||
{
|
||||
// Do not allow to interfere with the thread running
|
||||
if (running_) {
|
||||
throw std::runtime_error(
|
||||
"get_number_ready_timers() can't be used while timers thread is running");
|
||||
}
|
||||
|
||||
std::unique_lock<std::mutex> lock(timers_mutex_);
|
||||
TimersHeap locked_heap = weak_timers_heap_.validate_and_lock();
|
||||
return locked_heap.get_number_ready_timers();
|
||||
}
|
||||
|
||||
void TimersManager::execute_ready_timers()
|
||||
{
|
||||
// Do not allow to interfere with the thread running
|
||||
if (running_) {
|
||||
throw std::runtime_error(
|
||||
"execute_ready_timers() can't be used while timers thread is running");
|
||||
}
|
||||
|
||||
std::unique_lock<std::mutex> lock(timers_mutex_);
|
||||
this->execute_ready_timers_unsafe();
|
||||
}
|
||||
|
||||
bool TimersManager::execute_head_timer()
|
||||
{
|
||||
// Do not allow to interfere with the thread running
|
||||
if (running_) {
|
||||
throw std::runtime_error(
|
||||
"execute_head_timer() can't be used while timers thread is running");
|
||||
}
|
||||
|
||||
std::unique_lock<std::mutex> lock(timers_mutex_);
|
||||
|
||||
TimersHeap timers_heap = weak_timers_heap_.validate_and_lock();
|
||||
|
||||
// Nothing to do if we don't have any timer
|
||||
if (timers_heap.empty()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
TimerPtr head_timer = timers_heap.front();
|
||||
|
||||
const bool timer_ready = head_timer->is_ready();
|
||||
if (timer_ready) {
|
||||
head_timer->call();
|
||||
if (on_ready_callback_) {
|
||||
on_ready_callback_(head_timer.get());
|
||||
} else {
|
||||
head_timer->execute_callback();
|
||||
}
|
||||
timers_heap.heapify_root();
|
||||
weak_timers_heap_.store(timers_heap);
|
||||
}
|
||||
|
||||
return timer_ready;
|
||||
}
|
||||
|
||||
void TimersManager::execute_ready_timer(const void * timer_id)
|
||||
{
|
||||
TimerPtr ready_timer;
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(timers_mutex_);
|
||||
ready_timer = weak_timers_heap_.get_timer(timer_id);
|
||||
}
|
||||
if (ready_timer) {
|
||||
ready_timer->execute_callback();
|
||||
}
|
||||
}
|
||||
|
||||
std::chrono::nanoseconds TimersManager::get_head_timeout_unsafe()
|
||||
{
|
||||
// If we don't have any weak pointer, then we just return maximum timeout
|
||||
if (weak_timers_heap_.empty()) {
|
||||
return std::chrono::nanoseconds::max();
|
||||
}
|
||||
// Weak heap is not empty, so try to lock the first element.
|
||||
// If it is still a valid pointer, it is guaranteed to be the correct head
|
||||
TimerPtr head_timer = weak_timers_heap_.front().lock();
|
||||
|
||||
if (!head_timer) {
|
||||
// The first element has expired, we can't make other assumptions on the heap
|
||||
// and we need to entirely validate it.
|
||||
TimersHeap locked_heap = weak_timers_heap_.validate_and_lock();
|
||||
// NOTE: the following operations will not modify any element in the heap, so we
|
||||
// don't have to call `weak_timers_heap_.store(locked_heap)` at the end.
|
||||
|
||||
if (locked_heap.empty()) {
|
||||
return std::chrono::nanoseconds::max();
|
||||
}
|
||||
head_timer = locked_heap.front();
|
||||
}
|
||||
|
||||
return head_timer->time_until_trigger();
|
||||
}
|
||||
|
||||
void TimersManager::execute_ready_timers_unsafe()
|
||||
{
|
||||
// We start by locking the timers
|
||||
TimersHeap locked_heap = weak_timers_heap_.validate_and_lock();
|
||||
|
||||
// Nothing to do if we don't have any timer
|
||||
if (locked_heap.empty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Keep executing timers until they are ready and they were already ready when we started.
|
||||
// The two checks prevent this function from blocking indefinitely if the
|
||||
// time required for executing the timers is longer than their period.
|
||||
|
||||
TimerPtr head_timer = locked_heap.front();
|
||||
const size_t number_ready_timers = locked_heap.get_number_ready_timers();
|
||||
size_t executed_timers = 0;
|
||||
while (executed_timers < number_ready_timers && head_timer->is_ready()) {
|
||||
head_timer->call();
|
||||
if (on_ready_callback_) {
|
||||
on_ready_callback_(head_timer.get());
|
||||
} else {
|
||||
head_timer->execute_callback();
|
||||
}
|
||||
|
||||
executed_timers++;
|
||||
// Executing a timer will result in updating its time_until_trigger, so re-heapify
|
||||
locked_heap.heapify_root();
|
||||
// Get new head timer
|
||||
head_timer = locked_heap.front();
|
||||
}
|
||||
|
||||
// After having performed work on the locked heap we reflect the changes to weak one.
|
||||
// Timers will be already sorted the next time we need them if none went out of scope.
|
||||
weak_timers_heap_.store(locked_heap);
|
||||
}
|
||||
|
||||
void TimersManager::run_timers()
|
||||
{
|
||||
while (rclcpp::ok(context_) && running_) {
|
||||
// Lock mutex
|
||||
std::unique_lock<std::mutex> lock(timers_mutex_);
|
||||
|
||||
std::chrono::nanoseconds time_to_sleep = get_head_timeout_unsafe();
|
||||
|
||||
// No need to wait if a timer is already available
|
||||
if (time_to_sleep > std::chrono::nanoseconds::zero()) {
|
||||
if (time_to_sleep != std::chrono::nanoseconds::max()) {
|
||||
// Wait until timeout or notification that timers have been updated
|
||||
timers_cv_.wait_for(lock, time_to_sleep, [this]() {return timers_updated_;});
|
||||
} else {
|
||||
// Wait until notification that timers have been updated
|
||||
timers_cv_.wait(lock, [this]() {return timers_updated_;});
|
||||
}
|
||||
}
|
||||
|
||||
// Reset timers updated flag
|
||||
timers_updated_ = false;
|
||||
|
||||
// Execute timers
|
||||
this->execute_ready_timers_unsafe();
|
||||
}
|
||||
|
||||
// Make sure the running flag is set to false when we exit from this function
|
||||
// to allow restarting the timers thread.
|
||||
running_ = false;
|
||||
}
|
||||
|
||||
void TimersManager::clear()
|
||||
{
|
||||
{
|
||||
// Lock mutex and then clear all data structures
|
||||
std::unique_lock<std::mutex> lock(timers_mutex_);
|
||||
|
||||
TimersHeap locked_heap = weak_timers_heap_.validate_and_lock();
|
||||
locked_heap.clear_callbacks();
|
||||
|
||||
weak_timers_heap_.clear();
|
||||
|
||||
timers_updated_ = true;
|
||||
}
|
||||
|
||||
// Notify timers thread such that it can re-compute its timeout
|
||||
timers_cv_.notify_one();
|
||||
}
|
||||
|
||||
void TimersManager::remove_timer(TimerPtr timer)
|
||||
{
|
||||
bool removed = false;
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(timers_mutex_);
|
||||
removed = weak_timers_heap_.remove_timer(timer);
|
||||
|
||||
timers_updated_ = timers_updated_ || removed;
|
||||
}
|
||||
|
||||
if (removed) {
|
||||
// Notify timers thread such that it can re-compute its timeout
|
||||
timers_cv_.notify_one();
|
||||
timer->clear_on_reset_callback();
|
||||
}
|
||||
}
|
||||
@@ -618,6 +618,14 @@ if(TARGET test_timer)
|
||||
target_link_libraries(test_timer ${PROJECT_NAME} mimick)
|
||||
endif()
|
||||
|
||||
ament_add_gtest(test_timers_manager test_timers_manager.cpp
|
||||
APPEND_LIBRARY_DIRS "${append_library_dirs}")
|
||||
if(TARGET test_timers_manager)
|
||||
ament_target_dependencies(test_timers_manager
|
||||
"rcl")
|
||||
target_link_libraries(test_timers_manager ${PROJECT_NAME} mimick)
|
||||
endif()
|
||||
|
||||
ament_add_gtest(test_time_source test_time_source.cpp
|
||||
APPEND_LIBRARY_DIRS "${append_library_dirs}")
|
||||
if(TARGET test_time_source)
|
||||
@@ -688,6 +696,22 @@ if(TARGET test_static_executor_entities_collector)
|
||||
target_link_libraries(test_static_executor_entities_collector ${PROJECT_NAME} mimick)
|
||||
endif()
|
||||
|
||||
ament_add_gtest(test_events_executor executors/test_events_executor.cpp
|
||||
APPEND_LIBRARY_DIRS "${append_library_dirs}")
|
||||
if(TARGET test_events_executor)
|
||||
ament_target_dependencies(test_events_executor
|
||||
"test_msgs")
|
||||
target_link_libraries(test_events_executor ${PROJECT_NAME})
|
||||
endif()
|
||||
|
||||
ament_add_gtest(test_events_queue executors/test_events_queue.cpp
|
||||
APPEND_LIBRARY_DIRS "${append_library_dirs}")
|
||||
if(TARGET test_events_queue)
|
||||
ament_target_dependencies(test_events_queue
|
||||
"test_msgs")
|
||||
target_link_libraries(test_events_queue ${PROJECT_NAME})
|
||||
endif()
|
||||
|
||||
ament_add_gtest(test_guard_condition test_guard_condition.cpp
|
||||
APPEND_LIBRARY_DIRS "${append_library_dirs}")
|
||||
if(TARGET test_guard_condition)
|
||||
|
||||
499
rclcpp/test/rclcpp/executors/test_events_executor.cpp
Normal file
499
rclcpp/test/rclcpp/executors/test_events_executor.cpp
Normal file
@@ -0,0 +1,499 @@
|
||||
// Copyright 2023 iRobot Corporation.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include <chrono>
|
||||
#include <memory>
|
||||
#include <string>
|
||||
|
||||
#include "rclcpp/experimental/executors/events_executor/events_executor.hpp"
|
||||
|
||||
#include "test_msgs/srv/empty.hpp"
|
||||
#include "test_msgs/msg/empty.hpp"
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
using rclcpp::experimental::executors::EventsExecutor;
|
||||
|
||||
class TestEventsExecutor : public ::testing::Test
|
||||
{
|
||||
public:
|
||||
void SetUp()
|
||||
{
|
||||
rclcpp::init(0, nullptr);
|
||||
}
|
||||
|
||||
void TearDown()
|
||||
{
|
||||
rclcpp::shutdown();
|
||||
}
|
||||
};
|
||||
|
||||
TEST_F(TestEventsExecutor, run_clients_servers)
|
||||
{
|
||||
// rmw_connextdds doesn't support events-executor
|
||||
if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) {
|
||||
GTEST_SKIP();
|
||||
}
|
||||
|
||||
auto node = std::make_shared<rclcpp::Node>("node");
|
||||
|
||||
bool request_received = false;
|
||||
bool response_received = false;
|
||||
auto service =
|
||||
node->create_service<test_msgs::srv::Empty>(
|
||||
"service",
|
||||
[&request_received](
|
||||
const test_msgs::srv::Empty::Request::SharedPtr,
|
||||
test_msgs::srv::Empty::Response::SharedPtr)
|
||||
{
|
||||
request_received = true;
|
||||
});
|
||||
auto client = node->create_client<test_msgs::srv::Empty>("service");
|
||||
|
||||
EventsExecutor executor;
|
||||
executor.add_node(node);
|
||||
|
||||
bool spin_exited = false;
|
||||
std::thread spinner([&spin_exited, &executor, this]() {
|
||||
executor.spin();
|
||||
spin_exited = true;
|
||||
});
|
||||
|
||||
auto request = std::make_shared<test_msgs::srv::Empty::Request>();
|
||||
client->async_send_request(
|
||||
request,
|
||||
[&response_received](rclcpp::Client<test_msgs::srv::Empty>::SharedFuture result_future) {
|
||||
(void)result_future;
|
||||
response_received = true;
|
||||
});
|
||||
|
||||
// Wait some time for the client-server to be invoked
|
||||
auto start = std::chrono::steady_clock::now();
|
||||
while (
|
||||
!response_received &&
|
||||
!spin_exited &&
|
||||
(std::chrono::steady_clock::now() - start < 1s))
|
||||
{
|
||||
std::this_thread::sleep_for(5ms);
|
||||
}
|
||||
|
||||
executor.cancel();
|
||||
spinner.join();
|
||||
executor.remove_node(node);
|
||||
|
||||
EXPECT_TRUE(request_received);
|
||||
EXPECT_TRUE(response_received);
|
||||
EXPECT_TRUE(spin_exited);
|
||||
}
|
||||
|
||||
TEST_F(TestEventsExecutor, spin_once_max_duration_timeout)
|
||||
{
|
||||
// rmw_connextdds doesn't support events-executor
|
||||
if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) {
|
||||
GTEST_SKIP();
|
||||
}
|
||||
|
||||
auto node = std::make_shared<rclcpp::Node>("node");
|
||||
|
||||
EventsExecutor executor;
|
||||
executor.add_node(node);
|
||||
|
||||
// Consume previous events so we have a fresh start
|
||||
executor.spin_all(1s);
|
||||
|
||||
size_t t_runs = 0;
|
||||
auto t = node->create_wall_timer(
|
||||
10s,
|
||||
[&]() {
|
||||
t_runs++;
|
||||
});
|
||||
|
||||
// This first spin_once takes care of the waitable event
|
||||
// generated by the addition of the timer to the node
|
||||
executor.spin_once(1s);
|
||||
EXPECT_EQ(0u, t_runs);
|
||||
|
||||
auto start = std::chrono::steady_clock::now();
|
||||
|
||||
// This second spin_once should take care of the timer,
|
||||
executor.spin_once(10ms);
|
||||
|
||||
// but doesn't spin the time enough to call the timer callback.
|
||||
EXPECT_EQ(0u, t_runs);
|
||||
EXPECT_TRUE(std::chrono::steady_clock::now() - start < 200ms);
|
||||
}
|
||||
|
||||
// FIX THIS TEST! The entities collector is being called too many times!
|
||||
/*
|
||||
TEST_F(TestEventsExecutor, spin_once_max_duration_timer)
|
||||
{
|
||||
// rmw_connextdds doesn't support events-executor
|
||||
if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) {
|
||||
GTEST_SKIP();
|
||||
}
|
||||
|
||||
auto node = std::make_shared<rclcpp::Node>("node");
|
||||
|
||||
EventsExecutor executor;
|
||||
executor.add_node(node);
|
||||
|
||||
// Consume previous events so we have a fresh start
|
||||
executor.spin_all(1s);
|
||||
|
||||
size_t t_runs = 0;
|
||||
auto t = node->create_wall_timer(
|
||||
10ms,
|
||||
[&]() {
|
||||
t_runs++;
|
||||
});
|
||||
|
||||
// This first spin_once takes care of the waitable event
|
||||
// generated by the addition of the timer to the node
|
||||
executor.spin_once(1s);
|
||||
EXPECT_EQ(0u, t_runs);
|
||||
|
||||
auto start = std::chrono::steady_clock::now();
|
||||
|
||||
// This second spin_once should take care of the timer
|
||||
executor.spin_once(11ms);
|
||||
|
||||
EXPECT_EQ(1u, t_runs);
|
||||
EXPECT_TRUE(std::chrono::steady_clock::now() - start < 200ms);
|
||||
}
|
||||
*/
|
||||
|
||||
TEST_F(TestEventsExecutor, spin_some_max_duration)
|
||||
{
|
||||
// rmw_connextdds doesn't support events-executor
|
||||
if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) {
|
||||
GTEST_SKIP();
|
||||
}
|
||||
|
||||
{
|
||||
auto node = std::make_shared<rclcpp::Node>("node");
|
||||
|
||||
size_t t_runs = 0;
|
||||
auto t = node->create_wall_timer(
|
||||
10s,
|
||||
[&]() {
|
||||
t_runs++;
|
||||
});
|
||||
|
||||
EventsExecutor executor;
|
||||
executor.add_node(node);
|
||||
|
||||
auto start = std::chrono::steady_clock::now();
|
||||
executor.spin_some(10ms);
|
||||
|
||||
EXPECT_EQ(0u, t_runs);
|
||||
EXPECT_TRUE(std::chrono::steady_clock::now() - start < 200ms);
|
||||
}
|
||||
|
||||
{
|
||||
auto node = std::make_shared<rclcpp::Node>("node");
|
||||
|
||||
size_t t_runs = 0;
|
||||
auto t = node->create_wall_timer(
|
||||
10ms,
|
||||
[&]() {
|
||||
t_runs++;
|
||||
});
|
||||
|
||||
// Sleep some time for the timer to be ready when spin
|
||||
std::this_thread::sleep_for(10ms);
|
||||
|
||||
EventsExecutor executor;
|
||||
executor.add_node(node);
|
||||
|
||||
auto start = std::chrono::steady_clock::now();
|
||||
executor.spin_some(10s);
|
||||
|
||||
EXPECT_EQ(1u, t_runs);
|
||||
EXPECT_TRUE(std::chrono::steady_clock::now() - start < 200ms);
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(TestEventsExecutor, spin_some_zero_duration)
|
||||
{
|
||||
// rmw_connextdds doesn't support events-executor
|
||||
if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) {
|
||||
GTEST_SKIP();
|
||||
}
|
||||
|
||||
auto node = std::make_shared<rclcpp::Node>("node");
|
||||
|
||||
size_t t_runs = 0;
|
||||
auto t = node->create_wall_timer(
|
||||
20ms,
|
||||
[&]() {
|
||||
t_runs++;
|
||||
});
|
||||
|
||||
// Sleep some time for the timer to be ready when spin
|
||||
std::this_thread::sleep_for(20ms);
|
||||
|
||||
EventsExecutor executor;
|
||||
executor.add_node(node);
|
||||
executor.spin_some(0ms);
|
||||
|
||||
EXPECT_EQ(1u, t_runs);
|
||||
}
|
||||
|
||||
TEST_F(TestEventsExecutor, spin_all_max_duration)
|
||||
{
|
||||
// rmw_connextdds doesn't support events-executor
|
||||
if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) {
|
||||
GTEST_SKIP();
|
||||
}
|
||||
|
||||
{
|
||||
auto node = std::make_shared<rclcpp::Node>("node");
|
||||
|
||||
size_t t_runs = 0;
|
||||
auto t = node->create_wall_timer(
|
||||
10s,
|
||||
[&]() {
|
||||
t_runs++;
|
||||
});
|
||||
|
||||
EventsExecutor executor;
|
||||
executor.add_node(node);
|
||||
|
||||
auto start = std::chrono::steady_clock::now();
|
||||
executor.spin_all(10ms);
|
||||
|
||||
EXPECT_EQ(0u, t_runs);
|
||||
EXPECT_TRUE(std::chrono::steady_clock::now() - start < 200ms);
|
||||
}
|
||||
|
||||
{
|
||||
auto node = std::make_shared<rclcpp::Node>("node");
|
||||
|
||||
size_t t_runs = 0;
|
||||
auto t = node->create_wall_timer(
|
||||
10ms,
|
||||
[&]() {
|
||||
t_runs++;
|
||||
});
|
||||
|
||||
// Sleep some time for the timer to be ready when spin
|
||||
std::this_thread::sleep_for(10ms);
|
||||
|
||||
EventsExecutor executor;
|
||||
executor.add_node(node);
|
||||
|
||||
auto start = std::chrono::steady_clock::now();
|
||||
executor.spin_all(10s);
|
||||
|
||||
EXPECT_EQ(1u, t_runs);
|
||||
EXPECT_TRUE(std::chrono::steady_clock::now() - start < 200ms);
|
||||
}
|
||||
|
||||
EventsExecutor executor;
|
||||
EXPECT_THROW(executor.spin_all(0ms), std::invalid_argument);
|
||||
EXPECT_THROW(executor.spin_all(-5ms), std::invalid_argument);
|
||||
}
|
||||
|
||||
TEST_F(TestEventsExecutor, cancel_while_timers_running)
|
||||
{
|
||||
// rmw_connextdds doesn't support events-executor
|
||||
if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) {
|
||||
GTEST_SKIP();
|
||||
}
|
||||
|
||||
auto node = std::make_shared<rclcpp::Node>("node");
|
||||
|
||||
EventsExecutor executor;
|
||||
executor.add_node(node);
|
||||
|
||||
// Take care of previous events for a fresh start
|
||||
executor.spin_all(1s);
|
||||
|
||||
size_t t1_runs = 0;
|
||||
auto t1 = node->create_wall_timer(
|
||||
1ms,
|
||||
[&]() {
|
||||
t1_runs++;
|
||||
std::this_thread::sleep_for(50ms);
|
||||
});
|
||||
|
||||
size_t t2_runs = 0;
|
||||
auto t2 = node->create_wall_timer(
|
||||
1ms,
|
||||
[&]() {
|
||||
t2_runs++;
|
||||
std::this_thread::sleep_for(50ms);
|
||||
});
|
||||
|
||||
|
||||
std::thread spinner([&executor, this]() {executor.spin();});
|
||||
|
||||
std::this_thread::sleep_for(10ms);
|
||||
// Call cancel while t1 callback is still being executed
|
||||
executor.cancel();
|
||||
spinner.join();
|
||||
|
||||
// Depending on the latency on the system, t2 may start to execute before cancel is signaled
|
||||
EXPECT_GE(1u, t1_runs);
|
||||
EXPECT_GE(1u, t2_runs);
|
||||
}
|
||||
|
||||
TEST_F(TestEventsExecutor, cancel_while_timers_waiting)
|
||||
{
|
||||
// rmw_connextdds doesn't support events-executor
|
||||
if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) {
|
||||
GTEST_SKIP();
|
||||
}
|
||||
|
||||
auto node = std::make_shared<rclcpp::Node>("node");
|
||||
|
||||
size_t t1_runs = 0;
|
||||
auto t1 = node->create_wall_timer(
|
||||
100s,
|
||||
[&]() {
|
||||
t1_runs++;
|
||||
});
|
||||
|
||||
EventsExecutor executor;
|
||||
executor.add_node(node);
|
||||
|
||||
auto start = std::chrono::steady_clock::now();
|
||||
std::thread spinner([&executor, this]() {executor.spin();});
|
||||
|
||||
std::this_thread::sleep_for(10ms);
|
||||
executor.cancel();
|
||||
spinner.join();
|
||||
|
||||
EXPECT_EQ(0u, t1_runs);
|
||||
EXPECT_TRUE(std::chrono::steady_clock::now() - start < 1s);
|
||||
}
|
||||
|
||||
TEST_F(TestEventsExecutor, destroy_entities)
|
||||
{
|
||||
// rmw_connextdds doesn't support events-executor
|
||||
if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) {
|
||||
GTEST_SKIP();
|
||||
}
|
||||
|
||||
// Create a publisher node and start publishing messages
|
||||
auto node_pub = std::make_shared<rclcpp::Node>("node_pub");
|
||||
auto publisher = node_pub->create_publisher<test_msgs::msg::Empty>("topic", rclcpp::QoS(10));
|
||||
auto timer = node_pub->create_wall_timer(
|
||||
2ms, [&]() {publisher->publish(std::make_unique<test_msgs::msg::Empty>());});
|
||||
EventsExecutor executor_pub;
|
||||
executor_pub.add_node(node_pub);
|
||||
std::thread spinner([&executor_pub, this]() {executor_pub.spin();});
|
||||
|
||||
// Create a node with two different subscriptions to the topic
|
||||
auto node_sub = std::make_shared<rclcpp::Node>("node_sub");
|
||||
size_t callback_count_1 = 0;
|
||||
auto subscription_1 =
|
||||
node_sub->create_subscription<test_msgs::msg::Empty>(
|
||||
"topic", rclcpp::QoS(10), [&](test_msgs::msg::Empty::ConstSharedPtr) {callback_count_1++;});
|
||||
size_t callback_count_2 = 0;
|
||||
auto subscription_2 =
|
||||
node_sub->create_subscription<test_msgs::msg::Empty>(
|
||||
"topic", rclcpp::QoS(10), [&](test_msgs::msg::Empty::ConstSharedPtr) {callback_count_2++;});
|
||||
EventsExecutor executor_sub;
|
||||
executor_sub.add_node(node_sub);
|
||||
|
||||
// Wait some time while messages are published
|
||||
std::this_thread::sleep_for(10ms);
|
||||
|
||||
// Destroy one of the two subscriptions
|
||||
subscription_1.reset();
|
||||
|
||||
// Let subscriptions executor spin
|
||||
executor_sub.spin_some(10ms);
|
||||
|
||||
// The callback count of the destroyed subscription remained at 0
|
||||
EXPECT_EQ(0u, callback_count_1);
|
||||
EXPECT_LT(0u, callback_count_2);
|
||||
|
||||
executor_pub.cancel();
|
||||
spinner.join();
|
||||
}
|
||||
|
||||
/*
|
||||
Testing construction of a subscriptions with QoS event callback functions.
|
||||
*/
|
||||
std::string * g_pub_log_msg;
|
||||
std::string * g_sub_log_msg;
|
||||
std::promise<void> * g_log_msgs_promise;
|
||||
TEST_F(TestEventsExecutor, test_default_incompatible_qos_callbacks)
|
||||
{
|
||||
// rmw_connextdds doesn't support events-executor
|
||||
if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) {
|
||||
GTEST_SKIP();
|
||||
}
|
||||
|
||||
auto node = std::make_shared<rclcpp::Node>("node");
|
||||
rcutils_logging_output_handler_t original_output_handler = rcutils_logging_get_output_handler();
|
||||
|
||||
std::string pub_log_msg;
|
||||
std::string sub_log_msg;
|
||||
std::promise<void> log_msgs_promise;
|
||||
g_pub_log_msg = &pub_log_msg;
|
||||
g_sub_log_msg = &sub_log_msg;
|
||||
g_log_msgs_promise = &log_msgs_promise;
|
||||
auto logger_callback = [](
|
||||
const rcutils_log_location_t * /*location*/,
|
||||
int /*level*/, const char * /*name*/, rcutils_time_point_value_t /*timestamp*/,
|
||||
const char * format, va_list * args) -> void {
|
||||
char buffer[1024];
|
||||
vsnprintf(buffer, sizeof(buffer), format, *args);
|
||||
const std::string msg = buffer;
|
||||
if (msg.rfind("New subscription discovered on topic '/test_topic'", 0) == 0) {
|
||||
*g_pub_log_msg = buffer;
|
||||
} else if (msg.rfind("New publisher discovered on topic '/test_topic'", 0) == 0) {
|
||||
*g_sub_log_msg = buffer;
|
||||
}
|
||||
|
||||
if (!g_pub_log_msg->empty() && !g_sub_log_msg->empty()) {
|
||||
g_log_msgs_promise->set_value();
|
||||
}
|
||||
};
|
||||
rcutils_logging_set_output_handler(logger_callback);
|
||||
|
||||
std::shared_future<void> log_msgs_future = log_msgs_promise.get_future();
|
||||
|
||||
rclcpp::QoS qos_profile_publisher(10);
|
||||
qos_profile_publisher.durability(RMW_QOS_POLICY_DURABILITY_VOLATILE);
|
||||
auto publisher = node->create_publisher<test_msgs::msg::Empty>(
|
||||
"test_topic", qos_profile_publisher);
|
||||
|
||||
rclcpp::QoS qos_profile_subscription(10);
|
||||
qos_profile_subscription.durability(RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL);
|
||||
auto subscription = node->create_subscription<test_msgs::msg::Empty>(
|
||||
"test_topic", qos_profile_subscription, [&](test_msgs::msg::Empty::ConstSharedPtr) {});
|
||||
|
||||
EventsExecutor ex;
|
||||
ex.add_node(node->get_node_base_interface());
|
||||
|
||||
const auto timeout = std::chrono::seconds(10);
|
||||
ex.spin_until_future_complete(log_msgs_future, timeout);
|
||||
|
||||
EXPECT_EQ(
|
||||
"New subscription discovered on topic '/test_topic', requesting incompatible QoS. "
|
||||
"No messages will be sent to it. Last incompatible policy: DURABILITY_QOS_POLICY",
|
||||
pub_log_msg);
|
||||
EXPECT_EQ(
|
||||
"New publisher discovered on topic '/test_topic', offering incompatible QoS. "
|
||||
"No messages will be sent to it. Last incompatible policy: DURABILITY_QOS_POLICY",
|
||||
sub_log_msg);
|
||||
|
||||
rcutils_logging_set_output_handler(original_output_handler);
|
||||
}
|
||||
82
rclcpp/test/rclcpp/executors/test_events_queue.cpp
Normal file
82
rclcpp/test/rclcpp/executors/test_events_queue.cpp
Normal file
@@ -0,0 +1,82 @@
|
||||
// Copyright 2023 iRobot Corporation.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include <memory>
|
||||
|
||||
#include "rclcpp/experimental/executors/events_executor/events_executor_event_types.hpp"
|
||||
#include "rclcpp/experimental/executors/events_executor/simple_events_queue.hpp"
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
TEST(TestEventsQueue, SimpleQueueTest)
|
||||
{
|
||||
// Create a SimpleEventsQueue and a local queue
|
||||
auto simple_queue = std::make_unique<rclcpp::experimental::executors::SimpleEventsQueue>();
|
||||
rclcpp::experimental::executors::ExecutorEvent event {};
|
||||
bool ret = false;
|
||||
|
||||
// Make sure the queue is empty at startup
|
||||
EXPECT_TRUE(simple_queue->empty());
|
||||
EXPECT_EQ(simple_queue->size(), 0u);
|
||||
|
||||
// Push 11 messages
|
||||
for (uint32_t i = 1; i < 11; i++) {
|
||||
rclcpp::experimental::executors::ExecutorEvent stub_event {};
|
||||
stub_event.num_events = 1;
|
||||
simple_queue->enqueue(stub_event);
|
||||
|
||||
EXPECT_FALSE(simple_queue->empty());
|
||||
EXPECT_EQ(simple_queue->size(), i);
|
||||
}
|
||||
|
||||
// Pop one message
|
||||
ret = simple_queue->dequeue(event);
|
||||
EXPECT_TRUE(ret);
|
||||
EXPECT_FALSE(simple_queue->empty());
|
||||
EXPECT_EQ(simple_queue->size(), 9u);
|
||||
|
||||
// Pop one message
|
||||
ret = simple_queue->dequeue(event, std::chrono::nanoseconds(0));
|
||||
EXPECT_TRUE(ret);
|
||||
EXPECT_FALSE(simple_queue->empty());
|
||||
EXPECT_EQ(simple_queue->size(), 8u);
|
||||
|
||||
while (!simple_queue->empty()) {
|
||||
ret = simple_queue->dequeue(event);
|
||||
EXPECT_TRUE(ret);
|
||||
}
|
||||
|
||||
EXPECT_TRUE(simple_queue->empty());
|
||||
EXPECT_EQ(simple_queue->size(), 0u);
|
||||
|
||||
ret = simple_queue->dequeue(event, std::chrono::nanoseconds(0));
|
||||
EXPECT_FALSE(ret);
|
||||
|
||||
// Lets push an event into the queue and get it back
|
||||
rclcpp::experimental::executors::ExecutorEvent push_event = {
|
||||
simple_queue.get(),
|
||||
99,
|
||||
rclcpp::experimental::executors::ExecutorEventType::SUBSCRIPTION_EVENT,
|
||||
1};
|
||||
|
||||
simple_queue->enqueue(push_event);
|
||||
ret = simple_queue->dequeue(event);
|
||||
EXPECT_TRUE(ret);
|
||||
EXPECT_EQ(push_event.exec_entity_id, event.exec_entity_id);
|
||||
EXPECT_EQ(push_event.gen_entity_id, event.gen_entity_id);
|
||||
EXPECT_EQ(push_event.type, event.type);
|
||||
EXPECT_EQ(push_event.num_events, event.num_events);
|
||||
}
|
||||
@@ -92,7 +92,8 @@ using ExecutorTypes =
|
||||
::testing::Types<
|
||||
rclcpp::executors::SingleThreadedExecutor,
|
||||
rclcpp::executors::MultiThreadedExecutor,
|
||||
rclcpp::executors::StaticSingleThreadedExecutor>;
|
||||
rclcpp::executors::StaticSingleThreadedExecutor,
|
||||
rclcpp::experimental::executors::EventsExecutor>;
|
||||
|
||||
class ExecutorTypeNames
|
||||
{
|
||||
@@ -113,6 +114,10 @@ public:
|
||||
return "StaticSingleThreadedExecutor";
|
||||
}
|
||||
|
||||
if (std::is_same<T, rclcpp::experimental::executors::EventsExecutor>()) {
|
||||
return "EventsExecutor";
|
||||
}
|
||||
|
||||
return "";
|
||||
}
|
||||
};
|
||||
@@ -126,12 +131,22 @@ TYPED_TEST_SUITE(TestExecutors, ExecutorTypes, ExecutorTypeNames);
|
||||
using StandardExecutors =
|
||||
::testing::Types<
|
||||
rclcpp::executors::SingleThreadedExecutor,
|
||||
rclcpp::executors::MultiThreadedExecutor>;
|
||||
rclcpp::executors::MultiThreadedExecutor,
|
||||
rclcpp::experimental::executors::EventsExecutor>;
|
||||
TYPED_TEST_SUITE(TestExecutorsStable, StandardExecutors, ExecutorTypeNames);
|
||||
|
||||
// Make sure that executors detach from nodes when destructing
|
||||
TYPED_TEST(TestExecutors, detachOnDestruction) {
|
||||
TYPED_TEST(TestExecutors, detachOnDestruction)
|
||||
{
|
||||
using ExecutorType = TypeParam;
|
||||
// rmw_connextdds doesn't support events-executor
|
||||
if (
|
||||
std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
|
||||
std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
|
||||
{
|
||||
GTEST_SKIP();
|
||||
}
|
||||
|
||||
{
|
||||
ExecutorType executor;
|
||||
executor.add_node(this->node);
|
||||
@@ -145,8 +160,17 @@ TYPED_TEST(TestExecutors, detachOnDestruction) {
|
||||
// Make sure that the executor can automatically remove expired nodes correctly
|
||||
// Currently fails for StaticSingleThreadedExecutor so it is being skipped, see:
|
||||
// https://github.com/ros2/rclcpp/issues/1231
|
||||
TYPED_TEST(TestExecutorsStable, addTemporaryNode) {
|
||||
TYPED_TEST(TestExecutorsStable, addTemporaryNode)
|
||||
{
|
||||
using ExecutorType = TypeParam;
|
||||
// rmw_connextdds doesn't support events-executor
|
||||
if (
|
||||
std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
|
||||
std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
|
||||
{
|
||||
GTEST_SKIP();
|
||||
}
|
||||
|
||||
ExecutorType executor;
|
||||
|
||||
{
|
||||
@@ -164,8 +188,17 @@ TYPED_TEST(TestExecutorsStable, addTemporaryNode) {
|
||||
}
|
||||
|
||||
// Check executor throws properly if the same node is added a second time
|
||||
TYPED_TEST(TestExecutors, addNodeTwoExecutors) {
|
||||
TYPED_TEST(TestExecutors, addNodeTwoExecutors)
|
||||
{
|
||||
using ExecutorType = TypeParam;
|
||||
// rmw_connextdds doesn't support events-executor
|
||||
if (
|
||||
std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
|
||||
std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
|
||||
{
|
||||
GTEST_SKIP();
|
||||
}
|
||||
|
||||
ExecutorType executor1;
|
||||
ExecutorType executor2;
|
||||
EXPECT_NO_THROW(executor1.add_node(this->node));
|
||||
@@ -174,8 +207,17 @@ TYPED_TEST(TestExecutors, addNodeTwoExecutors) {
|
||||
}
|
||||
|
||||
// Check simple spin example
|
||||
TYPED_TEST(TestExecutors, spinWithTimer) {
|
||||
TYPED_TEST(TestExecutors, spinWithTimer)
|
||||
{
|
||||
using ExecutorType = TypeParam;
|
||||
// rmw_connextdds doesn't support events-executor
|
||||
if (
|
||||
std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
|
||||
std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
|
||||
{
|
||||
GTEST_SKIP();
|
||||
}
|
||||
|
||||
ExecutorType executor;
|
||||
|
||||
bool timer_completed = false;
|
||||
@@ -196,8 +238,17 @@ TYPED_TEST(TestExecutors, spinWithTimer) {
|
||||
executor.remove_node(this->node, true);
|
||||
}
|
||||
|
||||
TYPED_TEST(TestExecutors, spinWhileAlreadySpinning) {
|
||||
TYPED_TEST(TestExecutors, spinWhileAlreadySpinning)
|
||||
{
|
||||
using ExecutorType = TypeParam;
|
||||
// rmw_connextdds doesn't support events-executor
|
||||
if (
|
||||
std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
|
||||
std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
|
||||
{
|
||||
GTEST_SKIP();
|
||||
}
|
||||
|
||||
ExecutorType executor;
|
||||
executor.add_node(this->node);
|
||||
|
||||
@@ -222,8 +273,17 @@ TYPED_TEST(TestExecutors, spinWhileAlreadySpinning) {
|
||||
}
|
||||
|
||||
// Check executor exits immediately if future is complete.
|
||||
TYPED_TEST(TestExecutors, testSpinUntilFutureComplete) {
|
||||
TYPED_TEST(TestExecutors, testSpinUntilFutureComplete)
|
||||
{
|
||||
using ExecutorType = TypeParam;
|
||||
// rmw_connextdds doesn't support events-executor
|
||||
if (
|
||||
std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
|
||||
std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
|
||||
{
|
||||
GTEST_SKIP();
|
||||
}
|
||||
|
||||
ExecutorType executor;
|
||||
executor.add_node(this->node);
|
||||
|
||||
@@ -244,8 +304,17 @@ TYPED_TEST(TestExecutors, testSpinUntilFutureComplete) {
|
||||
}
|
||||
|
||||
// Same test, but uses a shared future.
|
||||
TYPED_TEST(TestExecutors, testSpinUntilSharedFutureComplete) {
|
||||
TYPED_TEST(TestExecutors, testSpinUntilSharedFutureComplete)
|
||||
{
|
||||
using ExecutorType = TypeParam;
|
||||
// rmw_connextdds doesn't support events-executor
|
||||
if (
|
||||
std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
|
||||
std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
|
||||
{
|
||||
GTEST_SKIP();
|
||||
}
|
||||
|
||||
ExecutorType executor;
|
||||
executor.add_node(this->node);
|
||||
|
||||
@@ -267,8 +336,17 @@ TYPED_TEST(TestExecutors, testSpinUntilSharedFutureComplete) {
|
||||
}
|
||||
|
||||
// For a longer running future that should require several iterations of spin_once
|
||||
TYPED_TEST(TestExecutors, testSpinUntilFutureCompleteNoTimeout) {
|
||||
TYPED_TEST(TestExecutors, testSpinUntilFutureCompleteNoTimeout)
|
||||
{
|
||||
using ExecutorType = TypeParam;
|
||||
// rmw_connextdds doesn't support events-executor
|
||||
if (
|
||||
std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
|
||||
std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
|
||||
{
|
||||
GTEST_SKIP();
|
||||
}
|
||||
|
||||
ExecutorType executor;
|
||||
executor.add_node(this->node);
|
||||
|
||||
@@ -313,8 +391,17 @@ TYPED_TEST(TestExecutors, testSpinUntilFutureCompleteNoTimeout) {
|
||||
}
|
||||
|
||||
// Check spin_until_future_complete timeout works as expected
|
||||
TYPED_TEST(TestExecutors, testSpinUntilFutureCompleteWithTimeout) {
|
||||
TYPED_TEST(TestExecutors, testSpinUntilFutureCompleteWithTimeout)
|
||||
{
|
||||
using ExecutorType = TypeParam;
|
||||
// rmw_connextdds doesn't support events-executor
|
||||
if (
|
||||
std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
|
||||
std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
|
||||
{
|
||||
GTEST_SKIP();
|
||||
}
|
||||
|
||||
ExecutorType executor;
|
||||
executor.add_node(this->node);
|
||||
|
||||
@@ -380,6 +467,13 @@ public:
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
std::shared_ptr<void>
|
||||
take_data_by_entity_id(size_t id) override
|
||||
{
|
||||
(void) id;
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
void
|
||||
execute(std::shared_ptr<void> & data) override
|
||||
{
|
||||
@@ -388,6 +482,21 @@ public:
|
||||
std::this_thread::sleep_for(3ms);
|
||||
}
|
||||
|
||||
void
|
||||
set_on_ready_callback(std::function<void(size_t, int)> callback) override
|
||||
{
|
||||
auto gc_callback = [callback](size_t count) {
|
||||
callback(count, 0);
|
||||
};
|
||||
gc_.set_on_trigger_callback(gc_callback);
|
||||
}
|
||||
|
||||
void
|
||||
clear_on_ready_callback() override
|
||||
{
|
||||
gc_.set_on_trigger_callback(nullptr);
|
||||
}
|
||||
|
||||
size_t
|
||||
get_number_of_ready_guard_conditions() override {return 1;}
|
||||
|
||||
@@ -402,8 +511,17 @@ private:
|
||||
rclcpp::GuardCondition gc_;
|
||||
};
|
||||
|
||||
TYPED_TEST(TestExecutors, spinAll) {
|
||||
TYPED_TEST(TestExecutors, spinAll)
|
||||
{
|
||||
using ExecutorType = TypeParam;
|
||||
// rmw_connextdds doesn't support events-executor
|
||||
if (
|
||||
std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
|
||||
std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
|
||||
{
|
||||
GTEST_SKIP();
|
||||
}
|
||||
|
||||
ExecutorType executor;
|
||||
auto waitable_interfaces = this->node->get_node_waitables_interface();
|
||||
auto my_waitable = std::make_shared<TestWaitable>();
|
||||
@@ -443,8 +561,17 @@ TYPED_TEST(TestExecutors, spinAll) {
|
||||
spinner.join();
|
||||
}
|
||||
|
||||
TYPED_TEST(TestExecutors, spinSome) {
|
||||
TYPED_TEST(TestExecutors, spinSome)
|
||||
{
|
||||
using ExecutorType = TypeParam;
|
||||
// rmw_connextdds doesn't support events-executor
|
||||
if (
|
||||
std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
|
||||
std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
|
||||
{
|
||||
GTEST_SKIP();
|
||||
}
|
||||
|
||||
ExecutorType executor;
|
||||
auto waitable_interfaces = this->node->get_node_waitables_interface();
|
||||
auto my_waitable = std::make_shared<TestWaitable>();
|
||||
@@ -483,8 +610,17 @@ TYPED_TEST(TestExecutors, spinSome) {
|
||||
}
|
||||
|
||||
// Check spin_node_until_future_complete with node base pointer
|
||||
TYPED_TEST(TestExecutors, testSpinNodeUntilFutureCompleteNodeBasePtr) {
|
||||
TYPED_TEST(TestExecutors, testSpinNodeUntilFutureCompleteNodeBasePtr)
|
||||
{
|
||||
using ExecutorType = TypeParam;
|
||||
// rmw_connextdds doesn't support events-executor
|
||||
if (
|
||||
std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
|
||||
std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
|
||||
{
|
||||
GTEST_SKIP();
|
||||
}
|
||||
|
||||
ExecutorType executor;
|
||||
|
||||
std::promise<bool> promise;
|
||||
@@ -498,8 +634,17 @@ TYPED_TEST(TestExecutors, testSpinNodeUntilFutureCompleteNodeBasePtr) {
|
||||
}
|
||||
|
||||
// Check spin_node_until_future_complete with node pointer
|
||||
TYPED_TEST(TestExecutors, testSpinNodeUntilFutureCompleteNodePtr) {
|
||||
TYPED_TEST(TestExecutors, testSpinNodeUntilFutureCompleteNodePtr)
|
||||
{
|
||||
using ExecutorType = TypeParam;
|
||||
// rmw_connextdds doesn't support events-executor
|
||||
if (
|
||||
std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
|
||||
std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
|
||||
{
|
||||
GTEST_SKIP();
|
||||
}
|
||||
|
||||
ExecutorType executor;
|
||||
|
||||
std::promise<bool> promise;
|
||||
@@ -513,8 +658,17 @@ TYPED_TEST(TestExecutors, testSpinNodeUntilFutureCompleteNodePtr) {
|
||||
}
|
||||
|
||||
// Check spin_until_future_complete can be properly interrupted.
|
||||
TYPED_TEST(TestExecutors, testSpinUntilFutureCompleteInterrupted) {
|
||||
TYPED_TEST(TestExecutors, testSpinUntilFutureCompleteInterrupted)
|
||||
{
|
||||
using ExecutorType = TypeParam;
|
||||
// rmw_connextdds doesn't support events-executor
|
||||
if (
|
||||
std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
|
||||
std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
|
||||
{
|
||||
GTEST_SKIP();
|
||||
}
|
||||
|
||||
ExecutorType executor;
|
||||
executor.add_node(this->node);
|
||||
|
||||
@@ -556,7 +710,8 @@ TYPED_TEST(TestExecutors, testSpinUntilFutureCompleteInterrupted) {
|
||||
}
|
||||
|
||||
// Check spin_until_future_complete with node base pointer (instantiates its own executor)
|
||||
TEST(TestExecutors, testSpinUntilFutureCompleteNodeBasePtr) {
|
||||
TEST(TestExecutors, testSpinUntilFutureCompleteNodeBasePtr)
|
||||
{
|
||||
rclcpp::init(0, nullptr);
|
||||
|
||||
{
|
||||
@@ -576,7 +731,8 @@ TEST(TestExecutors, testSpinUntilFutureCompleteNodeBasePtr) {
|
||||
}
|
||||
|
||||
// Check spin_until_future_complete with node pointer (instantiates its own executor)
|
||||
TEST(TestExecutors, testSpinUntilFutureCompleteNodePtr) {
|
||||
TEST(TestExecutors, testSpinUntilFutureCompleteNodePtr)
|
||||
{
|
||||
rclcpp::init(0, nullptr);
|
||||
|
||||
{
|
||||
|
||||
@@ -46,11 +46,15 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
template<typename T>
|
||||
class TestAddCallbackGroupsToExecutorStable : public TestAddCallbackGroupsToExecutor<T> {};
|
||||
|
||||
using ExecutorTypes =
|
||||
::testing::Types<
|
||||
rclcpp::executors::SingleThreadedExecutor,
|
||||
rclcpp::executors::MultiThreadedExecutor,
|
||||
rclcpp::executors::StaticSingleThreadedExecutor>;
|
||||
rclcpp::executors::StaticSingleThreadedExecutor,
|
||||
rclcpp::experimental::executors::EventsExecutor>;
|
||||
|
||||
class ExecutorTypeNames
|
||||
{
|
||||
@@ -71,17 +75,38 @@ public:
|
||||
return "StaticSingleThreadedExecutor";
|
||||
}
|
||||
|
||||
if (std::is_same<T, rclcpp::experimental::executors::EventsExecutor>()) {
|
||||
return "EventsExecutor";
|
||||
}
|
||||
|
||||
return "";
|
||||
}
|
||||
};
|
||||
|
||||
TYPED_TEST_SUITE(TestAddCallbackGroupsToExecutor, ExecutorTypes, ExecutorTypeNames);
|
||||
|
||||
// StaticSingleThreadedExecutor is not included in these tests for now
|
||||
using StandardExecutors =
|
||||
::testing::Types<
|
||||
rclcpp::executors::SingleThreadedExecutor,
|
||||
rclcpp::executors::MultiThreadedExecutor,
|
||||
rclcpp::experimental::executors::EventsExecutor>;
|
||||
TYPED_TEST_SUITE(TestAddCallbackGroupsToExecutorStable, StandardExecutors, ExecutorTypeNames);
|
||||
|
||||
/*
|
||||
* Test adding callback groups.
|
||||
*/
|
||||
TYPED_TEST(TestAddCallbackGroupsToExecutor, add_callback_groups) {
|
||||
TYPED_TEST(TestAddCallbackGroupsToExecutor, add_callback_groups)
|
||||
{
|
||||
using ExecutorType = TypeParam;
|
||||
// rmw_connextdds doesn't support events-executor
|
||||
if (
|
||||
std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
|
||||
std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
|
||||
{
|
||||
GTEST_SKIP();
|
||||
}
|
||||
|
||||
auto node = std::make_shared<rclcpp::Node>("my_node", "/ns");
|
||||
auto timer_callback = []() {};
|
||||
rclcpp::CallbackGroup::SharedPtr cb_grp = node->create_callback_group(
|
||||
@@ -127,8 +152,17 @@ TYPED_TEST(TestAddCallbackGroupsToExecutor, add_callback_groups) {
|
||||
/*
|
||||
* Test removing callback groups.
|
||||
*/
|
||||
TYPED_TEST(TestAddCallbackGroupsToExecutor, remove_callback_groups) {
|
||||
TYPED_TEST(TestAddCallbackGroupsToExecutor, remove_callback_groups)
|
||||
{
|
||||
using ExecutorType = TypeParam;
|
||||
// rmw_connextdds doesn't support events-executor
|
||||
if (
|
||||
std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
|
||||
std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
|
||||
{
|
||||
GTEST_SKIP();
|
||||
}
|
||||
|
||||
auto node = std::make_shared<rclcpp::Node>("my_node", "/ns");
|
||||
auto timer_callback = []() {};
|
||||
rclcpp::CallbackGroup::SharedPtr cb_grp = node->create_callback_group(
|
||||
@@ -158,7 +192,16 @@ TYPED_TEST(TestAddCallbackGroupsToExecutor, remove_callback_groups) {
|
||||
*/
|
||||
TYPED_TEST(TestAddCallbackGroupsToExecutor, add_duplicate_callback_groups)
|
||||
{
|
||||
rclcpp::executors::MultiThreadedExecutor executor;
|
||||
using ExecutorType = TypeParam;
|
||||
// rmw_connextdds doesn't support events-executor
|
||||
if (
|
||||
std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
|
||||
std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
|
||||
{
|
||||
GTEST_SKIP();
|
||||
}
|
||||
|
||||
ExecutorType executor;
|
||||
auto node = std::make_shared<rclcpp::Node>("my_node", "/ns");
|
||||
auto timer_callback = []() {};
|
||||
rclcpp::CallbackGroup::SharedPtr cb_grp = node->create_callback_group(
|
||||
@@ -176,7 +219,16 @@ TYPED_TEST(TestAddCallbackGroupsToExecutor, add_duplicate_callback_groups)
|
||||
*/
|
||||
TYPED_TEST(TestAddCallbackGroupsToExecutor, add_callback_groups_after_add_node_to_executor)
|
||||
{
|
||||
rclcpp::executors::MultiThreadedExecutor executor;
|
||||
using ExecutorType = TypeParam;
|
||||
// rmw_connextdds doesn't support events-executor
|
||||
if (
|
||||
std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
|
||||
std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
|
||||
{
|
||||
GTEST_SKIP();
|
||||
}
|
||||
|
||||
ExecutorType executor;
|
||||
auto node = std::make_shared<rclcpp::Node>("my_node", "/ns");
|
||||
executor.add_node(node->get_node_base_interface());
|
||||
ASSERT_EQ(executor.get_all_callback_groups().size(), 1u);
|
||||
@@ -210,13 +262,22 @@ TYPED_TEST(TestAddCallbackGroupsToExecutor, add_callback_groups_after_add_node_t
|
||||
*/
|
||||
TYPED_TEST(TestAddCallbackGroupsToExecutor, add_unallowable_callback_groups)
|
||||
{
|
||||
using ExecutorType = TypeParam;
|
||||
// rmw_connextdds doesn't support events-executor
|
||||
if (
|
||||
std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
|
||||
std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
|
||||
{
|
||||
GTEST_SKIP();
|
||||
}
|
||||
|
||||
ExecutorType executor;
|
||||
auto node = std::make_shared<rclcpp::Node>("my_node", "/ns");
|
||||
auto timer_callback = []() {};
|
||||
rclcpp::CallbackGroup::SharedPtr cb_grp = node->create_callback_group(
|
||||
rclcpp::CallbackGroupType::MutuallyExclusive, false);
|
||||
rclcpp::TimerBase::SharedPtr timer_ = node->create_wall_timer(
|
||||
2s, timer_callback, cb_grp);
|
||||
rclcpp::executors::MultiThreadedExecutor executor;
|
||||
executor.add_callback_group(cb_grp, node->get_node_base_interface());
|
||||
ASSERT_EQ(executor.get_all_callback_groups().size(), 1u);
|
||||
|
||||
@@ -245,14 +306,23 @@ TYPED_TEST(TestAddCallbackGroupsToExecutor, add_unallowable_callback_groups)
|
||||
*/
|
||||
TYPED_TEST(TestAddCallbackGroupsToExecutor, one_node_many_callback_groups_many_executors)
|
||||
{
|
||||
using ExecutorType = TypeParam;
|
||||
// rmw_connextdds doesn't support events-executor
|
||||
if (
|
||||
std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
|
||||
std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
|
||||
{
|
||||
GTEST_SKIP();
|
||||
}
|
||||
|
||||
ExecutorType timer_executor;
|
||||
ExecutorType sub_executor;
|
||||
auto node = std::make_shared<rclcpp::Node>("my_node", "/ns");
|
||||
auto timer_callback = []() {};
|
||||
rclcpp::CallbackGroup::SharedPtr cb_grp = node->create_callback_group(
|
||||
rclcpp::CallbackGroupType::MutuallyExclusive, false);
|
||||
rclcpp::TimerBase::SharedPtr timer_ = node->create_wall_timer(
|
||||
2s, timer_callback, cb_grp);
|
||||
rclcpp::executors::MultiThreadedExecutor timer_executor;
|
||||
rclcpp::executors::MultiThreadedExecutor sub_executor;
|
||||
timer_executor.add_callback_group(cb_grp, node->get_node_base_interface());
|
||||
const rclcpp::QoS qos(10);
|
||||
auto options = rclcpp::SubscriptionOptions();
|
||||
@@ -282,14 +352,23 @@ TYPED_TEST(TestAddCallbackGroupsToExecutor, one_node_many_callback_groups_many_e
|
||||
* because the executor can't be triggered while a subscriber created, see
|
||||
* https://github.com/ros2/rclcpp/issues/1611
|
||||
*/
|
||||
TYPED_TEST(TestAddCallbackGroupsToExecutor, subscriber_triggered_to_receive_message)
|
||||
TYPED_TEST(TestAddCallbackGroupsToExecutorStable, subscriber_triggered_to_receive_message)
|
||||
{
|
||||
using ExecutorType = TypeParam;
|
||||
// rmw_connextdds doesn't support events-executor
|
||||
if (
|
||||
std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
|
||||
std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
|
||||
{
|
||||
GTEST_SKIP();
|
||||
}
|
||||
|
||||
auto node = std::make_shared<rclcpp::Node>("my_node", "/ns");
|
||||
|
||||
// create a thread running an executor with a new callback group for a coming subscriber
|
||||
rclcpp::CallbackGroup::SharedPtr cb_grp = node->create_callback_group(
|
||||
rclcpp::CallbackGroupType::MutuallyExclusive, false);
|
||||
rclcpp::executors::SingleThreadedExecutor cb_grp_executor;
|
||||
ExecutorType cb_grp_executor;
|
||||
|
||||
std::promise<bool> received_message_promise;
|
||||
auto received_message_future = received_message_promise.get_future();
|
||||
@@ -329,7 +408,7 @@ TYPED_TEST(TestAddCallbackGroupsToExecutor, subscriber_triggered_to_receive_mess
|
||||
timer_promise.set_value();
|
||||
};
|
||||
|
||||
rclcpp::executors::SingleThreadedExecutor timer_executor;
|
||||
ExecutorType timer_executor;
|
||||
timer = node->create_wall_timer(100ms, timer_callback);
|
||||
timer_executor.add_node(node);
|
||||
auto future = timer_promise.get_future();
|
||||
@@ -346,8 +425,17 @@ TYPED_TEST(TestAddCallbackGroupsToExecutor, subscriber_triggered_to_receive_mess
|
||||
* because the executor can't be triggered while a subscriber created, see
|
||||
* https://github.com/ros2/rclcpp/issues/2067
|
||||
*/
|
||||
TYPED_TEST(TestAddCallbackGroupsToExecutor, callback_group_create_after_spin)
|
||||
TYPED_TEST(TestAddCallbackGroupsToExecutorStable, callback_group_create_after_spin)
|
||||
{
|
||||
using ExecutorType = TypeParam;
|
||||
// rmw_connextdds doesn't support events-executor
|
||||
if (
|
||||
std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
|
||||
std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
|
||||
{
|
||||
GTEST_SKIP();
|
||||
}
|
||||
|
||||
auto node = std::make_shared<rclcpp::Node>("my_node", "/ns");
|
||||
|
||||
// create a publisher to send data
|
||||
@@ -357,7 +445,7 @@ TYPED_TEST(TestAddCallbackGroupsToExecutor, callback_group_create_after_spin)
|
||||
publisher->publish(test_msgs::msg::Empty());
|
||||
|
||||
// create a thread running an executor
|
||||
rclcpp::executors::SingleThreadedExecutor executor;
|
||||
ExecutorType executor;
|
||||
executor.add_node(node);
|
||||
std::promise<bool> received_message_promise;
|
||||
auto received_message_future = received_message_promise.get_future();
|
||||
@@ -392,7 +480,16 @@ TYPED_TEST(TestAddCallbackGroupsToExecutor, callback_group_create_after_spin)
|
||||
*/
|
||||
TYPED_TEST(TestAddCallbackGroupsToExecutor, remove_callback_group)
|
||||
{
|
||||
rclcpp::executors::MultiThreadedExecutor executor;
|
||||
using ExecutorType = TypeParam;
|
||||
// rmw_connextdds doesn't support events-executor
|
||||
if (
|
||||
std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
|
||||
std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
|
||||
{
|
||||
GTEST_SKIP();
|
||||
}
|
||||
|
||||
ExecutorType executor;
|
||||
auto node = std::make_shared<rclcpp::Node>("my_node", "/ns");
|
||||
auto timer_callback = []() {};
|
||||
rclcpp::CallbackGroup::SharedPtr cb_grp = node->create_callback_group(
|
||||
|
||||
426
rclcpp/test/rclcpp/test_timers_manager.cpp
Normal file
426
rclcpp/test/rclcpp/test_timers_manager.cpp
Normal file
@@ -0,0 +1,426 @@
|
||||
// Copyright 2023 iRobot Corporation.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include <chrono>
|
||||
#include <memory>
|
||||
#include <utility>
|
||||
|
||||
#include "rclcpp/contexts/default_context.hpp"
|
||||
#include "rclcpp/experimental/timers_manager.hpp"
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
using rclcpp::experimental::TimersManager;
|
||||
|
||||
using CallbackT = std::function<void ()>;
|
||||
using TimerT = rclcpp::WallTimer<CallbackT>;
|
||||
|
||||
class TestTimersManager : public ::testing::Test
|
||||
{
|
||||
public:
|
||||
void SetUp()
|
||||
{
|
||||
rclcpp::init(0, nullptr);
|
||||
}
|
||||
|
||||
void TearDown()
|
||||
{
|
||||
rclcpp::shutdown();
|
||||
}
|
||||
};
|
||||
|
||||
TEST_F(TestTimersManager, empty_manager)
|
||||
{
|
||||
auto timers_manager = std::make_shared<TimersManager>(
|
||||
rclcpp::contexts::get_global_default_context());
|
||||
|
||||
EXPECT_EQ(std::chrono::nanoseconds::max(), timers_manager->get_head_timeout());
|
||||
EXPECT_FALSE(timers_manager->execute_head_timer());
|
||||
EXPECT_NO_THROW(timers_manager->execute_ready_timers());
|
||||
EXPECT_NO_THROW(timers_manager->clear());
|
||||
EXPECT_NO_THROW(timers_manager->start());
|
||||
EXPECT_NO_THROW(timers_manager->stop());
|
||||
}
|
||||
|
||||
TEST_F(TestTimersManager, add_run_remove_timer)
|
||||
{
|
||||
size_t t_runs = 0;
|
||||
auto t = TimerT::make_shared(
|
||||
1ms,
|
||||
[&t_runs]() {
|
||||
t_runs++;
|
||||
},
|
||||
rclcpp::contexts::get_global_default_context());
|
||||
std::weak_ptr<TimerT> t_weak = t;
|
||||
|
||||
// Add the timer to the timers manager
|
||||
auto timers_manager = std::make_shared<TimersManager>(
|
||||
rclcpp::contexts::get_global_default_context());
|
||||
timers_manager->add_timer(t);
|
||||
|
||||
// Sleep for more 3 times the timer period
|
||||
std::this_thread::sleep_for(3ms);
|
||||
|
||||
// The timer is executed only once, even if we slept 3 times the period
|
||||
timers_manager->execute_ready_timers();
|
||||
EXPECT_EQ(1u, t_runs);
|
||||
|
||||
// Remove the timer from the manager
|
||||
timers_manager->remove_timer(t);
|
||||
|
||||
t.reset();
|
||||
// The timer is now not valid anymore
|
||||
EXPECT_FALSE(t_weak.lock() != nullptr);
|
||||
}
|
||||
|
||||
TEST_F(TestTimersManager, clear)
|
||||
{
|
||||
auto timers_manager = std::make_shared<TimersManager>(
|
||||
rclcpp::contexts::get_global_default_context());
|
||||
|
||||
auto t1 = TimerT::make_shared(1ms, CallbackT(), rclcpp::contexts::get_global_default_context());
|
||||
std::weak_ptr<TimerT> t1_weak = t1;
|
||||
auto t2 = TimerT::make_shared(1ms, CallbackT(), rclcpp::contexts::get_global_default_context());
|
||||
std::weak_ptr<TimerT> t2_weak = t2;
|
||||
|
||||
timers_manager->add_timer(t1);
|
||||
timers_manager->add_timer(t2);
|
||||
|
||||
EXPECT_TRUE(t1_weak.lock() != nullptr);
|
||||
EXPECT_TRUE(t2_weak.lock() != nullptr);
|
||||
|
||||
timers_manager->clear();
|
||||
|
||||
t1.reset();
|
||||
t2.reset();
|
||||
|
||||
EXPECT_FALSE(t1_weak.lock() != nullptr);
|
||||
EXPECT_FALSE(t2_weak.lock() != nullptr);
|
||||
}
|
||||
|
||||
TEST_F(TestTimersManager, remove_not_existing_timer)
|
||||
{
|
||||
auto timers_manager = std::make_shared<TimersManager>(
|
||||
rclcpp::contexts::get_global_default_context());
|
||||
|
||||
// Try to remove a nullptr timer
|
||||
EXPECT_NO_THROW(timers_manager->remove_timer(nullptr));
|
||||
|
||||
auto t = TimerT::make_shared(1ms, CallbackT(), rclcpp::contexts::get_global_default_context());
|
||||
timers_manager->add_timer(t);
|
||||
|
||||
// Remove twice the same timer
|
||||
timers_manager->remove_timer(t);
|
||||
EXPECT_NO_THROW(timers_manager->remove_timer(t));
|
||||
}
|
||||
|
||||
TEST_F(TestTimersManager, timers_thread_exclusive_usage)
|
||||
{
|
||||
auto timers_manager = std::make_shared<TimersManager>(
|
||||
rclcpp::contexts::get_global_default_context());
|
||||
|
||||
timers_manager->start();
|
||||
|
||||
EXPECT_THROW(timers_manager->start(), std::exception);
|
||||
EXPECT_THROW(timers_manager->get_head_timeout(), std::exception);
|
||||
EXPECT_THROW(timers_manager->execute_ready_timers(), std::exception);
|
||||
EXPECT_THROW(timers_manager->execute_head_timer(), std::exception);
|
||||
|
||||
timers_manager->stop();
|
||||
|
||||
EXPECT_NO_THROW(timers_manager->get_head_timeout());
|
||||
EXPECT_NO_THROW(timers_manager->execute_ready_timers());
|
||||
EXPECT_NO_THROW(timers_manager->execute_head_timer());
|
||||
}
|
||||
|
||||
TEST_F(TestTimersManager, add_timer_twice)
|
||||
{
|
||||
auto timers_manager = std::make_shared<TimersManager>(
|
||||
rclcpp::contexts::get_global_default_context());
|
||||
|
||||
auto t = TimerT::make_shared(1ms, CallbackT(), rclcpp::contexts::get_global_default_context());
|
||||
|
||||
timers_manager->add_timer(t);
|
||||
EXPECT_NO_THROW(timers_manager->add_timer(t));
|
||||
}
|
||||
|
||||
TEST_F(TestTimersManager, add_nullptr)
|
||||
{
|
||||
auto timers_manager = std::make_shared<TimersManager>(
|
||||
rclcpp::contexts::get_global_default_context());
|
||||
|
||||
EXPECT_THROW(timers_manager->add_timer(nullptr), std::exception);
|
||||
}
|
||||
|
||||
TEST_F(TestTimersManager, head_not_ready)
|
||||
{
|
||||
auto timers_manager = std::make_shared<TimersManager>(
|
||||
rclcpp::contexts::get_global_default_context());
|
||||
|
||||
size_t t_runs = 0;
|
||||
auto t = TimerT::make_shared(
|
||||
10s,
|
||||
[&t_runs]() {
|
||||
t_runs++;
|
||||
},
|
||||
rclcpp::contexts::get_global_default_context());
|
||||
|
||||
timers_manager->add_timer(t);
|
||||
|
||||
// Timer will take 10s to get ready, so nothing to execute here
|
||||
bool ret = timers_manager->execute_head_timer();
|
||||
EXPECT_FALSE(ret);
|
||||
EXPECT_EQ(0u, t_runs);
|
||||
}
|
||||
|
||||
TEST_F(TestTimersManager, timers_order)
|
||||
{
|
||||
auto timers_manager = std::make_shared<TimersManager>(
|
||||
rclcpp::contexts::get_global_default_context());
|
||||
|
||||
size_t t1_runs = 0;
|
||||
auto t1 = TimerT::make_shared(
|
||||
10ms,
|
||||
[&t1_runs]() {
|
||||
t1_runs++;
|
||||
},
|
||||
rclcpp::contexts::get_global_default_context());
|
||||
|
||||
size_t t2_runs = 0;
|
||||
auto t2 = TimerT::make_shared(
|
||||
30ms,
|
||||
[&t2_runs]() {
|
||||
t2_runs++;
|
||||
},
|
||||
rclcpp::contexts::get_global_default_context());
|
||||
|
||||
size_t t3_runs = 0;
|
||||
auto t3 = TimerT::make_shared(
|
||||
100ms,
|
||||
[&t3_runs]() {
|
||||
t3_runs++;
|
||||
},
|
||||
rclcpp::contexts::get_global_default_context());
|
||||
|
||||
// Add timers in a random order
|
||||
timers_manager->add_timer(t2);
|
||||
timers_manager->add_timer(t3);
|
||||
timers_manager->add_timer(t1);
|
||||
|
||||
std::this_thread::sleep_for(10ms);
|
||||
timers_manager->execute_ready_timers();
|
||||
EXPECT_EQ(1u, t1_runs);
|
||||
EXPECT_EQ(0u, t2_runs);
|
||||
EXPECT_EQ(0u, t3_runs);
|
||||
|
||||
std::this_thread::sleep_for(30ms);
|
||||
timers_manager->execute_ready_timers();
|
||||
EXPECT_EQ(2u, t1_runs);
|
||||
EXPECT_EQ(1u, t2_runs);
|
||||
EXPECT_EQ(0u, t3_runs);
|
||||
|
||||
std::this_thread::sleep_for(100ms);
|
||||
timers_manager->execute_ready_timers();
|
||||
EXPECT_EQ(3u, t1_runs);
|
||||
EXPECT_EQ(2u, t2_runs);
|
||||
EXPECT_EQ(1u, t3_runs);
|
||||
|
||||
timers_manager->remove_timer(t1);
|
||||
|
||||
std::this_thread::sleep_for(30ms);
|
||||
timers_manager->execute_ready_timers();
|
||||
EXPECT_EQ(3u, t1_runs);
|
||||
EXPECT_EQ(3u, t2_runs);
|
||||
EXPECT_EQ(1u, t3_runs);
|
||||
}
|
||||
|
||||
TEST_F(TestTimersManager, start_stop_timers_thread)
|
||||
{
|
||||
auto timers_manager = std::make_shared<TimersManager>(
|
||||
rclcpp::contexts::get_global_default_context());
|
||||
|
||||
auto t = TimerT::make_shared(1ms, []() {}, rclcpp::contexts::get_global_default_context());
|
||||
timers_manager->add_timer(t);
|
||||
|
||||
// Calling start multiple times will throw an error
|
||||
EXPECT_NO_THROW(timers_manager->start());
|
||||
EXPECT_THROW(timers_manager->start(), std::exception);
|
||||
|
||||
// Calling stop multiple times does not throw an error
|
||||
EXPECT_NO_THROW(timers_manager->stop());
|
||||
EXPECT_NO_THROW(timers_manager->stop());
|
||||
}
|
||||
|
||||
TEST_F(TestTimersManager, timers_thread)
|
||||
{
|
||||
auto timers_manager = std::make_shared<TimersManager>(
|
||||
rclcpp::contexts::get_global_default_context());
|
||||
|
||||
size_t t1_runs = 0;
|
||||
auto t1 = TimerT::make_shared(
|
||||
1ms,
|
||||
[&t1_runs]() {
|
||||
t1_runs++;
|
||||
},
|
||||
rclcpp::contexts::get_global_default_context());
|
||||
|
||||
size_t t2_runs = 0;
|
||||
auto t2 = TimerT::make_shared(
|
||||
1ms,
|
||||
[&t2_runs]() {
|
||||
t2_runs++;
|
||||
},
|
||||
rclcpp::contexts::get_global_default_context());
|
||||
|
||||
// Add timers
|
||||
timers_manager->add_timer(t1);
|
||||
timers_manager->add_timer(t2);
|
||||
|
||||
// Run timers thread for a while
|
||||
timers_manager->start();
|
||||
std::this_thread::sleep_for(5ms);
|
||||
timers_manager->stop();
|
||||
|
||||
EXPECT_LT(1u, t1_runs);
|
||||
EXPECT_LT(1u, t2_runs);
|
||||
EXPECT_EQ(t1_runs, t2_runs);
|
||||
}
|
||||
|
||||
TEST_F(TestTimersManager, destructor)
|
||||
{
|
||||
size_t t_runs = 0;
|
||||
auto t = TimerT::make_shared(
|
||||
1ms,
|
||||
[&t_runs]() {
|
||||
t_runs++;
|
||||
},
|
||||
rclcpp::contexts::get_global_default_context());
|
||||
std::weak_ptr<TimerT> t_weak = t;
|
||||
|
||||
// When the timers manager is destroyed, it will stop the thread
|
||||
// and clear the timers
|
||||
{
|
||||
auto timers_manager = std::make_shared<TimersManager>(
|
||||
rclcpp::contexts::get_global_default_context());
|
||||
|
||||
timers_manager->add_timer(t);
|
||||
|
||||
timers_manager->start();
|
||||
std::this_thread::sleep_for(100ms);
|
||||
|
||||
EXPECT_LT(1u, t_runs);
|
||||
}
|
||||
|
||||
// The thread is not running anymore, so this value does not increase
|
||||
size_t runs = t_runs;
|
||||
std::this_thread::sleep_for(100ms);
|
||||
EXPECT_EQ(runs, t_runs);
|
||||
t.reset();
|
||||
EXPECT_FALSE(t_weak.lock() != nullptr);
|
||||
}
|
||||
|
||||
TEST_F(TestTimersManager, add_remove_while_thread_running)
|
||||
{
|
||||
auto timers_manager = std::make_shared<TimersManager>(
|
||||
rclcpp::contexts::get_global_default_context());
|
||||
|
||||
size_t t1_runs = 0;
|
||||
auto t1 = TimerT::make_shared(
|
||||
1ms,
|
||||
[&t1_runs]() {
|
||||
t1_runs++;
|
||||
},
|
||||
rclcpp::contexts::get_global_default_context());
|
||||
|
||||
size_t t2_runs = 0;
|
||||
auto t2 = TimerT::make_shared(
|
||||
1ms,
|
||||
[&t2_runs]() {
|
||||
t2_runs++;
|
||||
},
|
||||
rclcpp::contexts::get_global_default_context());
|
||||
|
||||
// Add timers
|
||||
timers_manager->add_timer(t1);
|
||||
|
||||
// Start timers thread
|
||||
timers_manager->start();
|
||||
|
||||
// After a while remove t1 and add t2
|
||||
std::this_thread::sleep_for(5ms);
|
||||
timers_manager->remove_timer(t1);
|
||||
size_t tmp_t1 = t1_runs;
|
||||
timers_manager->add_timer(t2);
|
||||
|
||||
// Wait some more time and then stop
|
||||
std::this_thread::sleep_for(5ms);
|
||||
timers_manager->stop();
|
||||
|
||||
// t1 has stopped running
|
||||
EXPECT_EQ(tmp_t1, t1_runs);
|
||||
// t2 is correctly running
|
||||
EXPECT_LT(1u, t2_runs);
|
||||
}
|
||||
|
||||
TEST_F(TestTimersManager, infinite_loop)
|
||||
{
|
||||
// This test makes sure that even if timers have a period shorter than the duration
|
||||
// of their callback the functions never block indefinitely.
|
||||
|
||||
auto timers_manager = std::make_shared<TimersManager>(
|
||||
rclcpp::contexts::get_global_default_context());
|
||||
|
||||
size_t t1_runs = 0;
|
||||
auto t1 = TimerT::make_shared(
|
||||
1ms,
|
||||
[&t1_runs]() {
|
||||
t1_runs++;
|
||||
std::this_thread::sleep_for(5ms);
|
||||
},
|
||||
rclcpp::contexts::get_global_default_context());
|
||||
|
||||
size_t t2_runs = 0;
|
||||
auto t2 = TimerT::make_shared(
|
||||
1ms,
|
||||
[&t2_runs]() {
|
||||
t2_runs++;
|
||||
std::this_thread::sleep_for(5ms);
|
||||
},
|
||||
rclcpp::contexts::get_global_default_context());
|
||||
|
||||
timers_manager->add_timer(t1);
|
||||
timers_manager->add_timer(t2);
|
||||
|
||||
// Sleep for enough time to trigger timers
|
||||
std::this_thread::sleep_for(3ms);
|
||||
timers_manager->execute_ready_timers();
|
||||
EXPECT_EQ(1u, t1_runs);
|
||||
EXPECT_EQ(1u, t2_runs);
|
||||
|
||||
// Due to the long execution of timer callbacks, timers are already ready
|
||||
bool ret = timers_manager->execute_head_timer();
|
||||
EXPECT_TRUE(ret);
|
||||
EXPECT_EQ(3u, t1_runs + t2_runs);
|
||||
|
||||
// Start a timers thread
|
||||
timers_manager->start();
|
||||
std::this_thread::sleep_for(10ms);
|
||||
timers_manager->stop();
|
||||
|
||||
EXPECT_LT(3u, t1_runs + t2_runs);
|
||||
EXPECT_LT(1u, t1_runs);
|
||||
EXPECT_LT(1u, t2_runs);
|
||||
}
|
||||
Reference in New Issue
Block a user