Compare commits
14 Commits
runtime_in
...
peterpena/
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5408101dc1 | ||
|
|
86daeb5ba5 | ||
|
|
b5740d566b | ||
|
|
22615ed50c | ||
|
|
40de3cdd2d | ||
|
|
69dcfd0d57 | ||
|
|
1fb22363b4 | ||
|
|
6c4cbeb0ec | ||
|
|
03cb422085 | ||
|
|
c10c900b36 | ||
|
|
05ad418b3b | ||
|
|
e6e7da1203 | ||
|
|
bd1ef80605 | ||
|
|
822c9ab491 |
@@ -94,6 +94,28 @@ public:
|
||||
return _find_ptrs_if_impl<rclcpp::Waitable, Function>(func, waitable_ptrs_);
|
||||
}
|
||||
|
||||
/// Return the total number of entities in the callback group.
|
||||
/**
|
||||
* The size of the callback group includes subscriptions, timers,
|
||||
* clients, services, and waitables.
|
||||
*
|
||||
* \return size of the callback group
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
size_t
|
||||
size() const
|
||||
{
|
||||
return subscription_ptrs_.size() + timer_ptrs_.size() +
|
||||
client_ptrs_.size() + service_ptrs_.size() + waitable_ptrs_.size();
|
||||
}
|
||||
|
||||
RCLCPP_PUBLIC
|
||||
void
|
||||
set_executor_function(std::function<void()> call_executor_function)
|
||||
{
|
||||
call_executor_function_ = call_executor_function;
|
||||
}
|
||||
|
||||
RCLCPP_PUBLIC
|
||||
std::atomic_bool &
|
||||
can_be_taken_from();
|
||||
@@ -142,6 +164,7 @@ protected:
|
||||
std::vector<rclcpp::ClientBase::WeakPtr> client_ptrs_;
|
||||
std::vector<rclcpp::Waitable::WeakPtr> waitable_ptrs_;
|
||||
std::atomic_bool can_be_taken_from_;
|
||||
std::function<void()> call_executor_function_;
|
||||
|
||||
private:
|
||||
template<typename TypeT, typename Function>
|
||||
|
||||
@@ -280,6 +280,20 @@ public:
|
||||
void
|
||||
set_memory_strategy(memory_strategy::MemoryStrategy::SharedPtr memory_strategy);
|
||||
|
||||
/// Returns true if guard condition should be triggered after executing executable.
|
||||
/**
|
||||
* After a thread executes an executable, the thread triggers the guard condition
|
||||
* to wake any thread sleeping so that it can check for pending executables.
|
||||
* If this is not done, a thread can find itself sleeping indefinitely. This will
|
||||
* only be true when there is at least one non-empty mutually exclusive callback group.
|
||||
*
|
||||
* \return wake_after_execute_ flag, returns true if the guard condition should be
|
||||
* triggered after a thread executes an executable.
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
bool
|
||||
get_wake_after_execute_flag() {return wake_after_execute_.load();}
|
||||
|
||||
protected:
|
||||
RCLCPP_PUBLIC
|
||||
void
|
||||
@@ -301,6 +315,22 @@ protected:
|
||||
void
|
||||
execute_any_executable(AnyExecutable & any_exec);
|
||||
|
||||
/// Wake after executing when the executor is multi-threaded and has
|
||||
/// at least one non-empty mutually exclusive group
|
||||
/**
|
||||
* After executing an executable, this function determines
|
||||
* if it should wake the wait in rcl_wait so that the executor
|
||||
* can process any pending executable. This only has to be done
|
||||
* when the executor is multi-threaded and has at least one
|
||||
* non-empty mutually exclusive callback group.
|
||||
*
|
||||
* \return true if there is a non-empty mutually exclusive
|
||||
* callback group in a multithreaded executor and false otherwise
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
virtual bool
|
||||
determine_wake_after_execute() {return false;}
|
||||
|
||||
RCLCPP_PUBLIC
|
||||
static void
|
||||
execute_subscription(
|
||||
@@ -346,6 +376,12 @@ protected:
|
||||
/// Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins.
|
||||
std::atomic_bool spinning;
|
||||
|
||||
/// boolean to control whether guard condition is triggered after executing
|
||||
std::atomic_bool wake_after_execute_;
|
||||
|
||||
// a callback group was added in the node or an item was removed or added to the callbacks.
|
||||
std::atomic_bool exec_added_or_removed_;
|
||||
|
||||
/// Guard condition for signaling the rmw layer to wake up for special events.
|
||||
rcl_guard_condition_t interrupt_guard_condition_ = rcl_get_zero_initialized_guard_condition();
|
||||
|
||||
|
||||
@@ -78,6 +78,22 @@ protected:
|
||||
void
|
||||
run(size_t this_thread_number);
|
||||
|
||||
/// Wake after executing when the executor is multi-threaded and has
|
||||
/// at least one non-empty mutually exclusive group
|
||||
/**
|
||||
* After executing an executable, this function determines
|
||||
* if it should wake the wait in rcl_wait so that the executor
|
||||
* can process any pending executable. This only has to be done
|
||||
* when the executor is multi-threaded and has at least one
|
||||
* non-empty mutually exclusive callback group.
|
||||
*
|
||||
* \return true if there is a non-empty mutually exclusive
|
||||
* callback group in a multithreaded executor and false otherwise
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
bool
|
||||
determine_wake_after_execute();
|
||||
|
||||
private:
|
||||
RCLCPP_DISABLE_COPY(MultiThreadedExecutor)
|
||||
|
||||
|
||||
@@ -116,6 +116,13 @@ public:
|
||||
bool
|
||||
get_enable_topic_statistics_default() const override;
|
||||
|
||||
RCLCPP_PUBLIC
|
||||
void
|
||||
set_executor_function(std::function<void()> call_executor_function) override
|
||||
{
|
||||
call_executor_function_ = call_executor_function;
|
||||
}
|
||||
|
||||
private:
|
||||
RCLCPP_DISABLE_COPY(NodeBase)
|
||||
|
||||
@@ -129,6 +136,7 @@ private:
|
||||
std::vector<rclcpp::CallbackGroup::WeakPtr> callback_groups_;
|
||||
|
||||
std::atomic_bool associated_with_executor_;
|
||||
std::function<void()> call_executor_function_;
|
||||
|
||||
/// Guard condition for notifying the Executor of changes to this node.
|
||||
mutable std::recursive_mutex notify_guard_condition_mutex_;
|
||||
|
||||
@@ -161,6 +161,11 @@ public:
|
||||
virtual
|
||||
bool
|
||||
get_enable_topic_statistics_default() const = 0;
|
||||
|
||||
RCLCPP_PUBLIC
|
||||
virtual
|
||||
void
|
||||
set_executor_function(std::function<void()> call_executor_function) = 0;
|
||||
};
|
||||
|
||||
} // namespace node_interfaces
|
||||
|
||||
@@ -48,6 +48,9 @@ CallbackGroup::add_subscription(
|
||||
subscription_ptrs_.end(),
|
||||
[](rclcpp::SubscriptionBase::WeakPtr x) {return x.expired();}),
|
||||
subscription_ptrs_.end());
|
||||
if (call_executor_function_ != nullptr) {
|
||||
call_executor_function_();
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
@@ -61,6 +64,9 @@ CallbackGroup::add_timer(const rclcpp::TimerBase::SharedPtr timer_ptr)
|
||||
timer_ptrs_.end(),
|
||||
[](rclcpp::TimerBase::WeakPtr x) {return x.expired();}),
|
||||
timer_ptrs_.end());
|
||||
if (call_executor_function_ != nullptr) {
|
||||
call_executor_function_();
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
@@ -74,6 +80,9 @@ CallbackGroup::add_service(const rclcpp::ServiceBase::SharedPtr service_ptr)
|
||||
service_ptrs_.end(),
|
||||
[](rclcpp::ServiceBase::WeakPtr x) {return x.expired();}),
|
||||
service_ptrs_.end());
|
||||
if (call_executor_function_ != nullptr) {
|
||||
call_executor_function_();
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
@@ -87,6 +96,9 @@ CallbackGroup::add_client(const rclcpp::ClientBase::SharedPtr client_ptr)
|
||||
client_ptrs_.end(),
|
||||
[](rclcpp::ClientBase::WeakPtr x) {return x.expired();}),
|
||||
client_ptrs_.end());
|
||||
if (call_executor_function_ != nullptr) {
|
||||
call_executor_function_();
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
@@ -100,6 +112,9 @@ CallbackGroup::add_waitable(const rclcpp::Waitable::SharedPtr waitable_ptr)
|
||||
waitable_ptrs_.end(),
|
||||
[](rclcpp::Waitable::WeakPtr x) {return x.expired();}),
|
||||
waitable_ptrs_.end());
|
||||
if (call_executor_function_ != nullptr) {
|
||||
call_executor_function_();
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
@@ -113,4 +128,7 @@ CallbackGroup::remove_waitable(const rclcpp::Waitable::SharedPtr waitable_ptr) n
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (call_executor_function_ != nullptr) {
|
||||
call_executor_function_();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -38,6 +38,8 @@ using rclcpp::FutureReturnCode;
|
||||
|
||||
Executor::Executor(const rclcpp::ExecutorOptions & options)
|
||||
: spinning(false),
|
||||
wake_after_execute_(false),
|
||||
exec_added_or_removed_(false),
|
||||
memory_strategy_(options.memory_strategy)
|
||||
{
|
||||
rcl_guard_condition_options_t guard_condition_options = rcl_guard_condition_get_default_options();
|
||||
@@ -131,6 +133,18 @@ Executor::add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_pt
|
||||
throw std::runtime_error("Cannot add node to executor, node already added.");
|
||||
}
|
||||
}
|
||||
node_ptr->set_executor_function(
|
||||
[this]
|
||||
{this->exec_added_or_removed_.store(true);});
|
||||
for (auto & weak_group : node_ptr->get_callback_groups()) {
|
||||
auto callback_group = weak_group.lock();
|
||||
if (!callback_group) {
|
||||
continue;
|
||||
}
|
||||
callback_group->set_executor_function(
|
||||
[this]
|
||||
{this->exec_added_or_removed_.store(true);});
|
||||
}
|
||||
weak_nodes_.push_back(node_ptr);
|
||||
guard_conditions_.push_back(node_ptr->get_notify_guard_condition());
|
||||
if (notify) {
|
||||
@@ -139,6 +153,11 @@ Executor::add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_pt
|
||||
throw std::runtime_error(rcl_get_error_string().str);
|
||||
}
|
||||
}
|
||||
|
||||
// Check whether triggering a guard condition is necessary
|
||||
// (will depend on the type of executor and callback groups)
|
||||
determine_wake_after_execute();
|
||||
|
||||
// Add the node's notify condition to the guard condition handles
|
||||
std::unique_lock<std::mutex> lock(memory_strategy_mutex_);
|
||||
memory_strategy_->add_guard_condition(node_ptr->get_notify_guard_condition());
|
||||
@@ -179,6 +198,11 @@ Executor::remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Check whether triggering a guard condition is necessary
|
||||
// (will depend on the type of executor and callback groups)
|
||||
determine_wake_after_execute();
|
||||
|
||||
std::unique_lock<std::mutex> lock(memory_strategy_mutex_);
|
||||
memory_strategy_->remove_guard_condition(node_ptr->get_notify_guard_condition());
|
||||
}
|
||||
@@ -325,9 +349,22 @@ Executor::execute_any_executable(AnyExecutable & any_exec)
|
||||
}
|
||||
// Reset the callback_group, regardless of type
|
||||
any_exec.callback_group->can_be_taken_from().store(true);
|
||||
|
||||
// check whether callback groups were added or removed or
|
||||
// an exec was added or removed from callback groups
|
||||
// if it was, check whether we should trigger
|
||||
// guard condition after waking up
|
||||
if (exec_added_or_removed_.exchange(false)) {
|
||||
determine_wake_after_execute();
|
||||
}
|
||||
|
||||
// Check whether triggering a guard condition is necessary
|
||||
// (will depend on the type of executor and callback groups)
|
||||
// Wake the wait, because it may need to be recalculated or work that
|
||||
// was previously blocked is now available.
|
||||
if (rcl_trigger_guard_condition(&interrupt_guard_condition_) != RCL_RET_OK) {
|
||||
// was previously blocked is now available
|
||||
if (get_wake_after_execute_flag() &&
|
||||
rcl_trigger_guard_condition(&interrupt_guard_condition_) != RCL_RET_OK)
|
||||
{
|
||||
throw std::runtime_error(rcl_get_error_string().str);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -70,6 +70,42 @@ MultiThreadedExecutor::get_number_of_threads()
|
||||
return number_of_threads_;
|
||||
}
|
||||
|
||||
// Iterate through all the callback groups of
|
||||
// all nodes in the executor and verify
|
||||
// there is a non-empty callback group
|
||||
// that is "Mutually Exclusive". If there is,
|
||||
// then set trigger guard condition to true
|
||||
// so that it avoids a thread getting stuck
|
||||
// in rcl_wait.
|
||||
bool
|
||||
MultiThreadedExecutor::determine_wake_after_execute()
|
||||
{
|
||||
// Initialize as false in the case
|
||||
// it is true and the executor has
|
||||
// zero nodes, nodes with empty
|
||||
// callback groups, or nodes with
|
||||
// all reentrant callback groups
|
||||
wake_after_execute_.store(false);
|
||||
for (auto & weak_node : weak_nodes_) {
|
||||
auto node = weak_node.lock();
|
||||
if (!node) {
|
||||
continue;
|
||||
}
|
||||
for (auto & weak_group : node->get_callback_groups()) {
|
||||
auto callback_group = weak_group.lock();
|
||||
// Skip over callback groups that are empty
|
||||
if (!callback_group || callback_group->size() == 0) {
|
||||
continue;
|
||||
}
|
||||
if (callback_group->type() == rclcpp::callback_group::CallbackGroupType::MutuallyExclusive) {
|
||||
wake_after_execute_.store(true);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return wake_after_execute_.load();
|
||||
}
|
||||
|
||||
void
|
||||
MultiThreadedExecutor::run(size_t)
|
||||
{
|
||||
|
||||
@@ -222,6 +222,9 @@ NodeBase::create_callback_group(rclcpp::CallbackGroupType group_type)
|
||||
using rclcpp::CallbackGroupType;
|
||||
auto group = CallbackGroup::SharedPtr(new CallbackGroup(group_type));
|
||||
callback_groups_.push_back(group);
|
||||
if (call_executor_function_ != nullptr) {
|
||||
call_executor_function_();
|
||||
}
|
||||
return group;
|
||||
}
|
||||
|
||||
|
||||
@@ -458,6 +458,12 @@ if(TARGET test_multi_threaded_executor)
|
||||
target_link_libraries(test_multi_threaded_executor ${PROJECT_NAME})
|
||||
endif()
|
||||
|
||||
ament_add_gtest(test_wake_after_execute_flag rclcpp/test_wake_after_execute_flag.cpp
|
||||
APPEND_LIBRARY_DIRS "${append_library_dirs}")
|
||||
if(TARGET test_wake_after_execute_flag)
|
||||
target_link_libraries(test_wake_after_execute_flag ${PROJECT_NAME})
|
||||
endif()
|
||||
|
||||
ament_add_gtest(test_guard_condition rclcpp/test_guard_condition.cpp
|
||||
APPEND_LIBRARY_DIRS "${append_library_dirs}")
|
||||
if(TARGET test_guard_condition)
|
||||
|
||||
90
rclcpp/test/rclcpp/test_wake_after_execute_flag.cpp
Normal file
90
rclcpp/test/rclcpp/test_wake_after_execute_flag.cpp
Normal file
@@ -0,0 +1,90 @@
|
||||
// Copyright 2020 Open Source Robotics Foundation, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include <chrono>
|
||||
#include <memory>
|
||||
#include <string>
|
||||
|
||||
#include "rclcpp/exceptions.hpp"
|
||||
#include "rclcpp/executors.hpp"
|
||||
#include "rclcpp/executor.hpp"
|
||||
#include "rclcpp/node.hpp"
|
||||
#include "rclcpp/rclcpp.hpp"
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
class TestWakeAfterExecuteFlag : public ::testing::Test
|
||||
{
|
||||
protected:
|
||||
static void SetUpTestCase()
|
||||
{
|
||||
rclcpp::init(0, nullptr);
|
||||
}
|
||||
};
|
||||
|
||||
constexpr int EXECUTION_COUNT = 5;
|
||||
|
||||
/*
|
||||
Test guard condition trigger is set when a node is added and resetted when it is removed
|
||||
*/
|
||||
TEST_F(TestWakeAfterExecuteFlag, determine_wake_after_execute_flag_multi_threaded) {
|
||||
rclcpp::executors::MultiThreadedExecutor executor;
|
||||
|
||||
std::shared_ptr<rclcpp::Node> node =
|
||||
std::make_shared<rclcpp::Node>("test_wake_after_execute_flag_multi_threaded");
|
||||
auto cbg = node->create_callback_group(rclcpp::CallbackGroupType::Reentrant);
|
||||
std::atomic_int timer_count {0};
|
||||
|
||||
auto timer_callback = [&executor, &timer_count]() {
|
||||
printf("Timer executed!");
|
||||
if (timer_count > 0) {
|
||||
ASSERT_EQ(executor.get_wake_after_execute_flag(), true);
|
||||
}
|
||||
timer_count++;
|
||||
if (timer_count > EXECUTION_COUNT) {
|
||||
executor.cancel();
|
||||
}
|
||||
};
|
||||
|
||||
auto timer_ = node->create_wall_timer(
|
||||
2s, timer_callback, cbg);
|
||||
executor.add_node(node);
|
||||
executor.spin();
|
||||
}
|
||||
|
||||
TEST_F(TestWakeAfterExecuteFlag, determine_wake_after_execute_flag_single_threaded) {
|
||||
rclcpp::executors::SingleThreadedExecutor executor;
|
||||
|
||||
std::shared_ptr<rclcpp::Node> node =
|
||||
std::make_shared<rclcpp::Node>("test_wake_after_execute_flag_single_threaded");
|
||||
std::atomic_int timer_count {0};
|
||||
|
||||
auto timer_callback = [&executor, &timer_count]() {
|
||||
printf("Timer executed!");
|
||||
if (timer_count > 0) {
|
||||
ASSERT_EQ(executor.get_wake_after_execute_flag(), false);
|
||||
}
|
||||
timer_count++;
|
||||
if (timer_count > EXECUTION_COUNT) {
|
||||
executor.cancel();
|
||||
}
|
||||
};
|
||||
|
||||
auto timer_ = node->create_wall_timer(
|
||||
2s, timer_callback);
|
||||
executor.add_node(node);
|
||||
executor.spin();
|
||||
}
|
||||
Reference in New Issue
Block a user