Compare commits

...

14 Commits

Author SHA1 Message Date
Pedro Pena
5408101dc1 fixed uncrustify issues 2020-07-07 17:57:52 -04:00
Pedro Pena
86daeb5ba5 fixed uncrustify 2020-07-07 17:37:11 -04:00
Pedro Pena
b5740d566b check function has been assigned already 2020-07-07 17:03:42 -04:00
Pedro Pena
22615ed50c remove vertical whitespace 2020-07-07 15:58:34 -04:00
Pedro Pena
40de3cdd2d alphabetic order includes 2020-07-07 15:58:34 -04:00
Pedro Pena
69dcfd0d57 added more comments and changed func sigs 2020-07-07 15:58:34 -04:00
Pedro Pena
1fb22363b4 added descriptive doc block for get_wake_after_execute_flag 2020-07-07 15:58:34 -04:00
Pedro Pena
6c4cbeb0ec added function from exec to node and cbg 2020-07-07 15:58:34 -04:00
Pedro Pena
03cb422085 added std function in cbg level 2020-07-07 15:58:34 -04:00
Pedro Pena
c10c900b36 Update rclcpp/include/rclcpp/callback_group.hpp
Co-authored-by: William Woodall <william@osrfoundation.org>
2020-07-07 15:58:34 -04:00
Pedro Pena
05ad418b3b Update rclcpp/include/rclcpp/callback_group.hpp
Co-authored-by: William Woodall <william@osrfoundation.org>
2020-07-07 15:58:34 -04:00
Pedro Pena
e6e7da1203 added verification of add/remove items at the cbg/memory strategy level 2020-07-07 15:58:34 -04:00
Pedro Pena
bd1ef80605 check in add and remove node 2020-07-07 15:58:34 -04:00
Pedro Pena
822c9ab491 wake after execute feature
Signed-off-by: Pedro Pena <peter.a.pena@gmail.com>
2020-07-07 15:58:34 -04:00
11 changed files with 280 additions and 2 deletions

View File

@@ -94,6 +94,28 @@ public:
return _find_ptrs_if_impl<rclcpp::Waitable, Function>(func, waitable_ptrs_);
}
/// Return the total number of entities in the callback group.
/**
* The size of the callback group includes subscriptions, timers,
* clients, services, and waitables.
*
* \return size of the callback group
*/
RCLCPP_PUBLIC
size_t
size() const
{
return subscription_ptrs_.size() + timer_ptrs_.size() +
client_ptrs_.size() + service_ptrs_.size() + waitable_ptrs_.size();
}
RCLCPP_PUBLIC
void
set_executor_function(std::function<void()> call_executor_function)
{
call_executor_function_ = call_executor_function;
}
RCLCPP_PUBLIC
std::atomic_bool &
can_be_taken_from();
@@ -142,6 +164,7 @@ protected:
std::vector<rclcpp::ClientBase::WeakPtr> client_ptrs_;
std::vector<rclcpp::Waitable::WeakPtr> waitable_ptrs_;
std::atomic_bool can_be_taken_from_;
std::function<void()> call_executor_function_;
private:
template<typename TypeT, typename Function>

View File

@@ -280,6 +280,20 @@ public:
void
set_memory_strategy(memory_strategy::MemoryStrategy::SharedPtr memory_strategy);
/// Returns true if guard condition should be triggered after executing executable.
/**
* After a thread executes an executable, the thread triggers the guard condition
* to wake any thread sleeping so that it can check for pending executables.
* If this is not done, a thread can find itself sleeping indefinitely. This will
* only be true when there is at least one non-empty mutually exclusive callback group.
*
* \return wake_after_execute_ flag, returns true if the guard condition should be
* triggered after a thread executes an executable.
*/
RCLCPP_PUBLIC
bool
get_wake_after_execute_flag() {return wake_after_execute_.load();}
protected:
RCLCPP_PUBLIC
void
@@ -301,6 +315,22 @@ protected:
void
execute_any_executable(AnyExecutable & any_exec);
/// Wake after executing when the executor is multi-threaded and has
/// at least one non-empty mutually exclusive group
/**
* After executing an executable, this function determines
* if it should wake the wait in rcl_wait so that the executor
* can process any pending executable. This only has to be done
* when the executor is multi-threaded and has at least one
* non-empty mutually exclusive callback group.
*
* \return true if there is a non-empty mutually exclusive
* callback group in a multithreaded executor and false otherwise
*/
RCLCPP_PUBLIC
virtual bool
determine_wake_after_execute() {return false;}
RCLCPP_PUBLIC
static void
execute_subscription(
@@ -346,6 +376,12 @@ protected:
/// Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins.
std::atomic_bool spinning;
/// boolean to control whether guard condition is triggered after executing
std::atomic_bool wake_after_execute_;
// a callback group was added in the node or an item was removed or added to the callbacks.
std::atomic_bool exec_added_or_removed_;
/// Guard condition for signaling the rmw layer to wake up for special events.
rcl_guard_condition_t interrupt_guard_condition_ = rcl_get_zero_initialized_guard_condition();

View File

@@ -78,6 +78,22 @@ protected:
void
run(size_t this_thread_number);
/// Wake after executing when the executor is multi-threaded and has
/// at least one non-empty mutually exclusive group
/**
* After executing an executable, this function determines
* if it should wake the wait in rcl_wait so that the executor
* can process any pending executable. This only has to be done
* when the executor is multi-threaded and has at least one
* non-empty mutually exclusive callback group.
*
* \return true if there is a non-empty mutually exclusive
* callback group in a multithreaded executor and false otherwise
*/
RCLCPP_PUBLIC
bool
determine_wake_after_execute();
private:
RCLCPP_DISABLE_COPY(MultiThreadedExecutor)

View File

@@ -116,6 +116,13 @@ public:
bool
get_enable_topic_statistics_default() const override;
RCLCPP_PUBLIC
void
set_executor_function(std::function<void()> call_executor_function) override
{
call_executor_function_ = call_executor_function;
}
private:
RCLCPP_DISABLE_COPY(NodeBase)
@@ -129,6 +136,7 @@ private:
std::vector<rclcpp::CallbackGroup::WeakPtr> callback_groups_;
std::atomic_bool associated_with_executor_;
std::function<void()> call_executor_function_;
/// Guard condition for notifying the Executor of changes to this node.
mutable std::recursive_mutex notify_guard_condition_mutex_;

View File

@@ -161,6 +161,11 @@ public:
virtual
bool
get_enable_topic_statistics_default() const = 0;
RCLCPP_PUBLIC
virtual
void
set_executor_function(std::function<void()> call_executor_function) = 0;
};
} // namespace node_interfaces

View File

@@ -48,6 +48,9 @@ CallbackGroup::add_subscription(
subscription_ptrs_.end(),
[](rclcpp::SubscriptionBase::WeakPtr x) {return x.expired();}),
subscription_ptrs_.end());
if (call_executor_function_ != nullptr) {
call_executor_function_();
}
}
void
@@ -61,6 +64,9 @@ CallbackGroup::add_timer(const rclcpp::TimerBase::SharedPtr timer_ptr)
timer_ptrs_.end(),
[](rclcpp::TimerBase::WeakPtr x) {return x.expired();}),
timer_ptrs_.end());
if (call_executor_function_ != nullptr) {
call_executor_function_();
}
}
void
@@ -74,6 +80,9 @@ CallbackGroup::add_service(const rclcpp::ServiceBase::SharedPtr service_ptr)
service_ptrs_.end(),
[](rclcpp::ServiceBase::WeakPtr x) {return x.expired();}),
service_ptrs_.end());
if (call_executor_function_ != nullptr) {
call_executor_function_();
}
}
void
@@ -87,6 +96,9 @@ CallbackGroup::add_client(const rclcpp::ClientBase::SharedPtr client_ptr)
client_ptrs_.end(),
[](rclcpp::ClientBase::WeakPtr x) {return x.expired();}),
client_ptrs_.end());
if (call_executor_function_ != nullptr) {
call_executor_function_();
}
}
void
@@ -100,6 +112,9 @@ CallbackGroup::add_waitable(const rclcpp::Waitable::SharedPtr waitable_ptr)
waitable_ptrs_.end(),
[](rclcpp::Waitable::WeakPtr x) {return x.expired();}),
waitable_ptrs_.end());
if (call_executor_function_ != nullptr) {
call_executor_function_();
}
}
void
@@ -113,4 +128,7 @@ CallbackGroup::remove_waitable(const rclcpp::Waitable::SharedPtr waitable_ptr) n
break;
}
}
if (call_executor_function_ != nullptr) {
call_executor_function_();
}
}

View File

@@ -38,6 +38,8 @@ using rclcpp::FutureReturnCode;
Executor::Executor(const rclcpp::ExecutorOptions & options)
: spinning(false),
wake_after_execute_(false),
exec_added_or_removed_(false),
memory_strategy_(options.memory_strategy)
{
rcl_guard_condition_options_t guard_condition_options = rcl_guard_condition_get_default_options();
@@ -131,6 +133,18 @@ Executor::add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_pt
throw std::runtime_error("Cannot add node to executor, node already added.");
}
}
node_ptr->set_executor_function(
[this]
{this->exec_added_or_removed_.store(true);});
for (auto & weak_group : node_ptr->get_callback_groups()) {
auto callback_group = weak_group.lock();
if (!callback_group) {
continue;
}
callback_group->set_executor_function(
[this]
{this->exec_added_or_removed_.store(true);});
}
weak_nodes_.push_back(node_ptr);
guard_conditions_.push_back(node_ptr->get_notify_guard_condition());
if (notify) {
@@ -139,6 +153,11 @@ Executor::add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_pt
throw std::runtime_error(rcl_get_error_string().str);
}
}
// Check whether triggering a guard condition is necessary
// (will depend on the type of executor and callback groups)
determine_wake_after_execute();
// Add the node's notify condition to the guard condition handles
std::unique_lock<std::mutex> lock(memory_strategy_mutex_);
memory_strategy_->add_guard_condition(node_ptr->get_notify_guard_condition());
@@ -179,6 +198,11 @@ Executor::remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node
}
}
}
// Check whether triggering a guard condition is necessary
// (will depend on the type of executor and callback groups)
determine_wake_after_execute();
std::unique_lock<std::mutex> lock(memory_strategy_mutex_);
memory_strategy_->remove_guard_condition(node_ptr->get_notify_guard_condition());
}
@@ -325,9 +349,22 @@ Executor::execute_any_executable(AnyExecutable & any_exec)
}
// Reset the callback_group, regardless of type
any_exec.callback_group->can_be_taken_from().store(true);
// check whether callback groups were added or removed or
// an exec was added or removed from callback groups
// if it was, check whether we should trigger
// guard condition after waking up
if (exec_added_or_removed_.exchange(false)) {
determine_wake_after_execute();
}
// Check whether triggering a guard condition is necessary
// (will depend on the type of executor and callback groups)
// Wake the wait, because it may need to be recalculated or work that
// was previously blocked is now available.
if (rcl_trigger_guard_condition(&interrupt_guard_condition_) != RCL_RET_OK) {
// was previously blocked is now available
if (get_wake_after_execute_flag() &&
rcl_trigger_guard_condition(&interrupt_guard_condition_) != RCL_RET_OK)
{
throw std::runtime_error(rcl_get_error_string().str);
}
}

View File

@@ -70,6 +70,42 @@ MultiThreadedExecutor::get_number_of_threads()
return number_of_threads_;
}
// Iterate through all the callback groups of
// all nodes in the executor and verify
// there is a non-empty callback group
// that is "Mutually Exclusive". If there is,
// then set trigger guard condition to true
// so that it avoids a thread getting stuck
// in rcl_wait.
bool
MultiThreadedExecutor::determine_wake_after_execute()
{
// Initialize as false in the case
// it is true and the executor has
// zero nodes, nodes with empty
// callback groups, or nodes with
// all reentrant callback groups
wake_after_execute_.store(false);
for (auto & weak_node : weak_nodes_) {
auto node = weak_node.lock();
if (!node) {
continue;
}
for (auto & weak_group : node->get_callback_groups()) {
auto callback_group = weak_group.lock();
// Skip over callback groups that are empty
if (!callback_group || callback_group->size() == 0) {
continue;
}
if (callback_group->type() == rclcpp::callback_group::CallbackGroupType::MutuallyExclusive) {
wake_after_execute_.store(true);
break;
}
}
}
return wake_after_execute_.load();
}
void
MultiThreadedExecutor::run(size_t)
{

View File

@@ -222,6 +222,9 @@ NodeBase::create_callback_group(rclcpp::CallbackGroupType group_type)
using rclcpp::CallbackGroupType;
auto group = CallbackGroup::SharedPtr(new CallbackGroup(group_type));
callback_groups_.push_back(group);
if (call_executor_function_ != nullptr) {
call_executor_function_();
}
return group;
}

View File

@@ -458,6 +458,12 @@ if(TARGET test_multi_threaded_executor)
target_link_libraries(test_multi_threaded_executor ${PROJECT_NAME})
endif()
ament_add_gtest(test_wake_after_execute_flag rclcpp/test_wake_after_execute_flag.cpp
APPEND_LIBRARY_DIRS "${append_library_dirs}")
if(TARGET test_wake_after_execute_flag)
target_link_libraries(test_wake_after_execute_flag ${PROJECT_NAME})
endif()
ament_add_gtest(test_guard_condition rclcpp/test_guard_condition.cpp
APPEND_LIBRARY_DIRS "${append_library_dirs}")
if(TARGET test_guard_condition)

View File

@@ -0,0 +1,90 @@
// Copyright 2020 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <gtest/gtest.h>
#include <chrono>
#include <memory>
#include <string>
#include "rclcpp/exceptions.hpp"
#include "rclcpp/executors.hpp"
#include "rclcpp/executor.hpp"
#include "rclcpp/node.hpp"
#include "rclcpp/rclcpp.hpp"
using namespace std::chrono_literals;
class TestWakeAfterExecuteFlag : public ::testing::Test
{
protected:
static void SetUpTestCase()
{
rclcpp::init(0, nullptr);
}
};
constexpr int EXECUTION_COUNT = 5;
/*
Test guard condition trigger is set when a node is added and resetted when it is removed
*/
TEST_F(TestWakeAfterExecuteFlag, determine_wake_after_execute_flag_multi_threaded) {
rclcpp::executors::MultiThreadedExecutor executor;
std::shared_ptr<rclcpp::Node> node =
std::make_shared<rclcpp::Node>("test_wake_after_execute_flag_multi_threaded");
auto cbg = node->create_callback_group(rclcpp::CallbackGroupType::Reentrant);
std::atomic_int timer_count {0};
auto timer_callback = [&executor, &timer_count]() {
printf("Timer executed!");
if (timer_count > 0) {
ASSERT_EQ(executor.get_wake_after_execute_flag(), true);
}
timer_count++;
if (timer_count > EXECUTION_COUNT) {
executor.cancel();
}
};
auto timer_ = node->create_wall_timer(
2s, timer_callback, cbg);
executor.add_node(node);
executor.spin();
}
TEST_F(TestWakeAfterExecuteFlag, determine_wake_after_execute_flag_single_threaded) {
rclcpp::executors::SingleThreadedExecutor executor;
std::shared_ptr<rclcpp::Node> node =
std::make_shared<rclcpp::Node>("test_wake_after_execute_flag_single_threaded");
std::atomic_int timer_count {0};
auto timer_callback = [&executor, &timer_count]() {
printf("Timer executed!");
if (timer_count > 0) {
ASSERT_EQ(executor.get_wake_after_execute_flag(), false);
}
timer_count++;
if (timer_count > EXECUTION_COUNT) {
executor.cancel();
}
};
auto timer_ = node->create_wall_timer(
2s, timer_callback);
executor.add_node(node);
executor.spin();
}