Compare commits

...

1 Commits

Author SHA1 Message Date
Pedro Pena
61e37dfae1 scheduled timer mutex 2020-06-15 18:48:23 -04:00
4 changed files with 142 additions and 13 deletions

View File

@@ -82,6 +82,7 @@ private:
RCLCPP_DISABLE_COPY(MultiThreadedExecutor)
std::mutex wait_mutex_;
std::mutex scheduled_timers_mutex_;
size_t number_of_threads_;
bool yield_before_execute_;
std::chrono::nanoseconds next_exec_timeout_;

View File

@@ -83,19 +83,22 @@ MultiThreadedExecutor::run(size_t)
if (!get_next_executable(any_exec, next_exec_timeout_)) {
continue;
}
if (any_exec.timer) {
// Guard against multiple threads getting the same timer.
if (scheduled_timers_.count(any_exec.timer) != 0) {
// Make sure that any_exec's callback group is reset before
// the lock is released.
if (any_exec.callback_group) {
any_exec.callback_group->can_be_taken_from().store(true);
}
continue;
}
scheduled_timers_.insert(any_exec.timer);
}
}
if (any_exec.timer) {
std::lock_guard<std::mutex> wait_lock(scheduled_timers_mutex_);
// Guard against multiple threads getting the same timer.
if (scheduled_timers_.count(any_exec.timer) != 0) {
// Make sure that any_exec's callback group is reset before
// the lock is released.
if (any_exec.callback_group) {
any_exec.callback_group->can_be_taken_from().store(true);
}
continue;
}
scheduled_timers_.insert(any_exec.timer);
}
if (yield_before_execute_) {
std::this_thread::yield();
}
@@ -103,7 +106,7 @@ MultiThreadedExecutor::run(size_t)
execute_any_executable(any_exec);
if (any_exec.timer) {
std::lock_guard<std::mutex> wait_lock(wait_mutex_);
std::lock_guard<std::mutex> wait_lock(scheduled_timers_mutex_);
auto it = scheduled_timers_.find(any_exec.timer);
if (it != scheduled_timers_.end()) {
scheduled_timers_.erase(it);

View File

@@ -397,6 +397,12 @@ if(TARGET test_guard_condition)
target_link_libraries(test_guard_condition ${PROJECT_NAME})
endif()
ament_add_gtest(test_timer_count rclcpp/test_timer_call_count.cpp
APPEND_LIBRARY_DIRS "${append_library_dirs}")
if(TARGET test_timer_count)
target_link_libraries(test_timer_count ${PROJECT_NAME})
endif()
ament_add_gtest(test_wait_set rclcpp/test_wait_set.cpp
APPEND_LIBRARY_DIRS "${append_library_dirs}")
if(TARGET test_wait_set)

View File

@@ -0,0 +1,119 @@
// Copyright 2020 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <gtest/gtest.h>
#include <chrono>
#include <string>
#include <memory>
#include "rclcpp/rclcpp.hpp"
#include "rclcpp/exceptions.hpp"
#include "rclcpp/node.hpp"
#include "rclcpp/rclcpp.hpp"
#include "rclcpp/executors.hpp"
#include "rclcpp/executor.hpp"
using namespace std::chrono_literals;
class TestTimerCount: public ::testing::Test
{
protected:
static void SetUpTestCase()
{
rclcpp::init(0, nullptr);
}
};
constexpr int EXECUTION_COUNT = 5;
constexpr double TIME_ELAPSED = 12.0;
/*
Test timer wait mutex with multithreaded executor.
After 5 call
*/
TEST_F(TestTimerCount, timer_call_count_multi_threaded) {
rclcpp::executors::MultiThreadedExecutor executor;
std::shared_ptr<rclcpp::Node> node =
std::make_shared<rclcpp::Node>("timer_call_count_multi_threaded");
auto cbg = node->create_callback_group(rclcpp::CallbackGroupType::Reentrant);
rclcpp::Clock system_clock(RCL_STEADY_TIME);
std::mutex last_mutex;
rclcpp::Time initial = system_clock.now();
std::atomic_int timer_count {0};
auto timer_callback = [&timer_count, &executor, &system_clock, &last_mutex, &initial]() {
rclcpp::Time now = system_clock.now();
timer_count++;
{
std::lock_guard<std::mutex> lock(last_mutex);
double diff = std::abs((now - initial).nanoseconds()) / 1.0e9;
if (diff > TIME_ELAPSED) {
executor.cancel();
ASSERT_GT(timer_count, EXECUTION_COUNT);
}
}
};
auto timer_ = node->create_wall_timer(
2s, timer_callback, cbg);
executor.add_node(node);
executor.spin();
}
/*
Test timer wait mutex with singlethreaded executor
*/
TEST_F(TestTimerCount, timer_call_count_single_threaded) {
rclcpp::executors::SingleThreadedExecutor executor;
std::shared_ptr<rclcpp::Node> node =
std::make_shared<rclcpp::Node>("timer_call_count_single_threaded");
rclcpp::Clock system_clock(RCL_STEADY_TIME);
std::mutex last_mutex;
rclcpp::Time initial = system_clock.now();
std::atomic_int timer_count {0};
auto timer_callback = [&timer_count, &executor, &system_clock, &last_mutex, &initial]() {
rclcpp::Time now = system_clock.now();
timer_count++;
{
std::lock_guard<std::mutex> lock(last_mutex);
double diff = std::abs((now - initial).nanoseconds()) / 1.0e9;
if (diff > TIME_ELAPSED) {
executor.cancel();
ASSERT_GT(timer_count, EXECUTION_COUNT);
}
}
};
auto timer_ = node->create_wall_timer(
2s, timer_callback);
executor.add_node(node);
executor.spin();
}