Compare commits

...

8 Commits

Author SHA1 Message Date
Dirk Thomas
69f7bca85d Merge pull request #193 from ros2/issue_192
potential fix for issue 192
2016-02-16 14:19:02 -08:00
Jackie Kay
4a04fe8b4a potential fix for issue 192 2016-02-16 13:44:38 -08:00
Jackie Kay
6fbf3f8c5f Merge pull request #194 from ros2/get_rclcpp_info_cleanup
Cleanup in get_rclcpp_information
2016-02-11 15:07:29 -08:00
Jackie Kay
9f84273467 Cleanup in get_rclcpp_information 2016-02-11 15:07:08 -08:00
Jackie Kay
9d754a70a2 Merge pull request #166 from ros2/waitset_handle
Store handle of rmw_waitset_t in Executor
2016-01-12 17:46:35 -08:00
Jackie Kay
4b0ad21b3d Adjust for new rmw_waitset_t API 2016-01-12 17:42:34 -08:00
gerkey
6ec5e8e974 Merge pull request #191 from ros2/fix_rate_test
fix rate test
2015-12-31 16:17:22 -08:00
Jenkins @ ROS 2
be0be759ec fix rate test 2015-12-31 16:16:51 -08:00
18 changed files with 127 additions and 67 deletions

View File

@@ -65,6 +65,7 @@ endmacro()
call_for_each_rmw_implementation(target GENERATE_DEFAULT)
ament_export_dependencies(ament_cmake)
ament_export_dependencies(rcl_interfaces)
ament_export_dependencies(rmw)
ament_export_dependencies(rmw_implementation)

View File

@@ -13,7 +13,7 @@
# limitations under the License.
#
# Get all information abut rclcpp for a specific RMW implementation.
# Get all information about rclcpp for a specific RMW implementation.
#
# It sets the common variables _DEFINITIONS, _INCLUDE_DIRS and _LIBRARIES
# with the given prefix.
@@ -29,7 +29,7 @@ macro(get_rclcpp_information rmw_implementation var_prefix)
set(${var_prefix}_FOUND TRUE)
# include directories
set(${var_prefix}_INCLUDE_DIRS
normalize_path(${var_prefix}_INCLUDE_DIRS
"${rclcpp_DIR}/../../../include")
# libraries
@@ -49,7 +49,7 @@ macro(get_rclcpp_information rmw_implementation var_prefix)
)
if(NOT _lib)
# warn about not existing library and ignore it
message(FATAL_ERROR "Package 'rclcpp' doesn't contain the library '${_library_target}'")
message(WARNING "Package 'rclcpp' doesn't contain the library '${_library_target}'")
elseif(NOT IS_ABSOLUTE "${_lib}")
# the found library must be an absolute path
message(FATAL_ERROR "Package 'rclcpp' found the library '${_library_target}' at '${_lib}' which is not an absolute path")

View File

@@ -66,7 +66,7 @@ public:
get_service_ptrs() const;
RCLCPP_PUBLIC
const std::vector<rclcpp::client::ClientBase::SharedPtr> &
const std::vector<rclcpp::client::ClientBase::WeakPtr> &
get_client_ptrs() const;
RCLCPP_PUBLIC
@@ -100,7 +100,7 @@ private:
std::vector<rclcpp::subscription::SubscriptionBase::WeakPtr> subscription_ptrs_;
std::vector<rclcpp::timer::TimerBase::WeakPtr> timer_ptrs_;
std::vector<rclcpp::service::ServiceBase::SharedPtr> service_ptrs_;
std::vector<rclcpp::client::ClientBase::SharedPtr> client_ptrs_;
std::vector<rclcpp::client::ClientBase::WeakPtr> client_ptrs_;
std::atomic_bool can_be_taken_from_;
};

View File

@@ -60,7 +60,7 @@ public:
virtual std::shared_ptr<void> create_response() = 0;
virtual std::shared_ptr<void> create_request_header() = 0;
virtual void handle_response(
std::shared_ptr<void> & request_header, std::shared_ptr<void> & response) = 0;
std::shared_ptr<void> request_header, std::shared_ptr<void> response) = 0;
private:
RCLCPP_DISABLE_COPY(ClientBase);
@@ -111,13 +111,17 @@ public:
return std::shared_ptr<void>(new rmw_request_id_t);
}
void handle_response(std::shared_ptr<void> & request_header, std::shared_ptr<void> & response)
void handle_response(std::shared_ptr<void> request_header, std::shared_ptr<void> response)
{
std::lock_guard<std::mutex> lock(pending_requests_mutex_);
auto typed_request_header = std::static_pointer_cast<rmw_request_id_t>(request_header);
auto typed_response = std::static_pointer_cast<typename ServiceT::Response>(response);
int64_t sequence_number = typed_request_header->sequence_number;
// TODO(esteve) this must check if the sequence_number is valid otherwise the
// call_promise will be null
// TODO(esteve) this should throw instead since it is not expected to happen in the first place
if (this->pending_requests_.count(sequence_number) == 0) {
fprintf(stderr, "Received invalid sequence number. Ignoring...\n");
return;
}
auto tuple = this->pending_requests_[sequence_number];
auto call_promise = std::get<0>(tuple);
auto callback = std::get<1>(tuple);
@@ -143,6 +147,7 @@ public:
>
SharedFuture async_send_request(SharedRequest request, CallbackT && cb)
{
std::lock_guard<std::mutex> lock(pending_requests_mutex_);
int64_t sequence_number;
if (RMW_RET_OK != rmw_send_request(get_client_handle(), request.get(), &sequence_number)) {
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
@@ -187,6 +192,7 @@ private:
RCLCPP_DISABLE_COPY(Client);
std::map<int64_t, std::tuple<SharedPromise, CallbackType, SharedFuture>> pending_requests_;
std::mutex pending_requests_mutex_;
};
} // namespace client

View File

@@ -44,6 +44,24 @@ namespace executor
*/
enum FutureReturnCode {SUCCESS, INTERRUPTED, TIMEOUT};
///
/**
* Options to be passed to the executor constructor.
*/
struct ExecutorArgs
{
memory_strategy::MemoryStrategy::SharedPtr memory_strategy;
size_t max_conditions = 0;
};
static inline ExecutorArgs create_default_executor_arguments()
{
ExecutorArgs args;
args.memory_strategy = memory_strategies::create_default_strategy();
args.max_conditions = 0;
return args;
}
/// Coordinate the order and timing of available communication tasks.
/**
* Executor provides spin functions (including spin_node_once and spin_some).
@@ -62,8 +80,7 @@ public:
/// Default constructor.
// \param[in] ms The memory strategy to be used with this executor.
RCLCPP_PUBLIC
explicit Executor(
memory_strategy::MemoryStrategy::SharedPtr ms = memory_strategies::create_default_strategy());
explicit Executor(const ExecutorArgs & args = create_default_executor_arguments());
/// Default destructor.
RCLCPP_PUBLIC
@@ -262,9 +279,14 @@ protected:
/// Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins.
std::atomic_bool spinning;
rmw_guard_conditions_t fixed_guard_conditions_;
/// Guard condition for signaling the rmw layer to wake up for special events.
rmw_guard_condition_t * interrupt_guard_condition_;
/// Waitset for managing entities that the rmw layer waits on.
rmw_waitset_t * waitset_;
/// The memory strategy: an interface for handling user-defined memory allocation strategies.
memory_strategy::MemoryStrategy::SharedPtr memory_strategy_;

View File

@@ -38,7 +38,7 @@ public:
RCLCPP_PUBLIC
MultiThreadedExecutor(
memory_strategy::MemoryStrategy::SharedPtr ms = memory_strategies::create_default_strategy());
const executor::ExecutorArgs & args = rclcpp::executor::create_default_executor_arguments());
RCLCPP_PUBLIC
virtual ~MultiThreadedExecutor();

View File

@@ -47,7 +47,7 @@ public:
/// Default constructor. See the default constructor for Executor.
RCLCPP_PUBLIC
SingleThreadedExecutor(
memory_strategy::MemoryStrategy::SharedPtr ms = memory_strategies::create_default_strategy());
const executor::ExecutorArgs & args = rclcpp::executor::create_default_executor_arguments());
/// Default destrcutor.
RCLCPP_PUBLIC

View File

@@ -252,6 +252,8 @@ public:
const CallbackGroupWeakPtrList &
get_callback_groups() const;
std::atomic_bool has_executor;
private:
RCLCPP_DISABLE_COPY(Node);

View File

@@ -151,7 +151,8 @@ public:
services_.push_back(service);
}
}
for (auto & client : group->get_client_ptrs()) {
for (auto & weak_client : group->get_client_ptrs()) {
auto client = weak_client.lock();
if (client) {
clients_.push_back(client);
}

View File

@@ -18,6 +18,8 @@
<depend>rmw_implementation</depend>
<exec_depend>ament_cmake</exec_depend>
<test_depend>ament_cmake_gtest</test_depend>
<test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend>

View File

@@ -41,7 +41,7 @@ CallbackGroup::get_service_ptrs() const
return service_ptrs_;
}
const std::vector<rclcpp::client::ClientBase::SharedPtr> &
const std::vector<rclcpp::client::ClientBase::WeakPtr> &
CallbackGroup::get_client_ptrs() const
{
return client_ptrs_;

View File

@@ -19,15 +19,57 @@
using rclcpp::executor::AnyExecutable;
using rclcpp::executor::Executor;
using rclcpp::executor::ExecutorArgs;
Executor::Executor(rclcpp::memory_strategy::MemoryStrategy::SharedPtr ms)
: spinning(false), interrupt_guard_condition_(rmw_create_guard_condition()),
memory_strategy_(ms)
Executor::Executor(const ExecutorArgs & args)
: spinning(false),
memory_strategy_(args.memory_strategy)
{
interrupt_guard_condition_ = rmw_create_guard_condition();
if (!interrupt_guard_condition_) {
throw std::runtime_error("Failed to create interrupt guard condition in Executor constructor");
}
// The number of guard conditions is fixed at 2: 1 for the ctrl-c guard cond,
// and one for the executor's guard cond (interrupt_guard_condition_)
// These guard conditions are permanently attached to the waitset.
const size_t number_of_guard_conds = 2;
fixed_guard_conditions_.guard_condition_count = number_of_guard_conds;
fixed_guard_conditions_.guard_conditions = static_cast<void **>(guard_cond_handles_.data());
// Put the global ctrl-c guard condition in
assert(fixed_guard_conditions_.guard_condition_count > 1);
fixed_guard_conditions_.guard_conditions[0] = \
rclcpp::utilities::get_global_sigint_guard_condition()->data;
// Put the executor's guard condition in
fixed_guard_conditions_.guard_conditions[1] = interrupt_guard_condition_->data;
// The waitset adds the fixed guard conditions to the middleware waitset on initialization,
// and removes the guard conditions in rmw_destroy_waitset.
waitset_ = rmw_create_waitset(&fixed_guard_conditions_, args.max_conditions);
if (!waitset_) {
fprintf(stderr,
"[rclcpp::error] failed to create waitset: %s\n", rmw_get_error_string_safe());
rmw_ret_t status = rmw_destroy_guard_condition(interrupt_guard_condition_);
if (status != RMW_RET_OK) {
fprintf(stderr,
"[rclcpp::error] failed to destroy guard condition: %s\n", rmw_get_error_string_safe());
}
throw std::runtime_error("Failed to create waitset in Executor constructor");
}
}
Executor::~Executor()
{
// Try to deallocate the waitset.
if (waitset_) {
rmw_ret_t status = rmw_destroy_waitset(waitset_);
if (status != RMW_RET_OK) {
fprintf(stderr,
"[rclcpp::error] failed to destroy waitset: %s\n", rmw_get_error_string_safe());
}
}
// Try to deallocate the interrupt guard condition.
if (interrupt_guard_condition_ != nullptr) {
rmw_ret_t status = rmw_destroy_guard_condition(interrupt_guard_condition_);
@@ -41,6 +83,10 @@ Executor::~Executor()
void
Executor::add_node(rclcpp::node::Node::SharedPtr node_ptr, bool notify)
{
// If the node already has an executor
if (node_ptr->has_executor.exchange(true)) {
throw std::runtime_error("Node has already been added to an executor.");
}
// Check to ensure node not already added
for (auto & weak_node : weak_nodes_) {
auto node = weak_node.lock();
@@ -76,6 +122,7 @@ Executor::remove_node(rclcpp::node::Node::SharedPtr node_ptr, bool notify)
// *INDENT-ON*
)
);
node_ptr->has_executor.store(false);
if (notify) {
// If the node was matched and removed, interrupt waiting
if (node_removed) {
@@ -318,25 +365,7 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout)
client_handles.client_count =
memory_strategy_->fill_client_handles(client_handles.clients);
// The number of guard conditions is fixed at 2: 1 for the ctrl-c guard cond,
// and one for the executor's guard cond (interrupt_guard_condition_)
size_t number_of_guard_conds = 2;
rmw_guard_conditions_t guard_condition_handles;
guard_condition_handles.guard_condition_count = number_of_guard_conds;
guard_condition_handles.guard_conditions = static_cast<void **>(guard_cond_handles_.data());
if (guard_condition_handles.guard_conditions == NULL &&
number_of_guard_conds > 0)
{
// TODO(wjwwood): Use a different error here? maybe std::bad_alloc?
throw std::runtime_error("Could not malloc for guard condition pointers.");
}
// Put the global ctrl-c guard condition in
assert(guard_condition_handles.guard_condition_count > 1);
guard_condition_handles.guard_conditions[0] = \
rclcpp::utilities::get_global_sigint_guard_condition()->data;
// Put the executor's guard condition in
guard_condition_handles.guard_conditions[1] = \
interrupt_guard_condition_->data;
// Don't pass guard conditions to rmw_wait; they are permanent fixtures of the waitset
rmw_time_t * wait_timeout = NULL;
rmw_time_t rmw_timeout;
@@ -364,19 +393,14 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout)
// Now wait on the waitable subscriptions and timers
rmw_ret_t status = rmw_wait(
&subscriber_handles,
&guard_condition_handles,
nullptr,
&service_handles,
&client_handles,
waitset_,
wait_timeout);
if (status != RMW_RET_OK && status != RMW_RET_TIMEOUT) {
throw std::runtime_error(rmw_get_error_string_safe());
}
// If ctrl-c guard condition, return directly
if (guard_condition_handles.guard_conditions[0] != 0) {
// Make sure to free or clean memory
memory_strategy_->clear_handles();
return;
}
memory_strategy_->remove_null_handles();
}

View File

@@ -27,4 +27,5 @@ rclcpp::spin(node::Node::SharedPtr node_ptr)
rclcpp::executors::SingleThreadedExecutor exec;
exec.add_node(node_ptr);
exec.spin();
exec.remove_node(node_ptr);
}

View File

@@ -23,8 +23,8 @@
using rclcpp::executors::multi_threaded_executor::MultiThreadedExecutor;
MultiThreadedExecutor::MultiThreadedExecutor(rclcpp::memory_strategy::MemoryStrategy::SharedPtr ms)
: executor::Executor(ms)
MultiThreadedExecutor::MultiThreadedExecutor(const rclcpp::executor::ExecutorArgs & args)
: executor::Executor(args)
{
number_of_threads_ = std::thread::hardware_concurrency();
if (number_of_threads_ == 0) {

View File

@@ -17,10 +17,8 @@
using rclcpp::executors::single_threaded_executor::SingleThreadedExecutor;
SingleThreadedExecutor::SingleThreadedExecutor(
rclcpp::memory_strategy::MemoryStrategy::SharedPtr ms)
: executor::Executor(ms) {}
SingleThreadedExecutor::SingleThreadedExecutor(const rclcpp::executor::ExecutorArgs & args)
: executor::Executor(args) {}
SingleThreadedExecutor::~SingleThreadedExecutor() {}

View File

@@ -84,8 +84,9 @@ MemoryStrategy::get_client_by_handle(void * client_handle, const WeakNodeVector
if (!group) {
continue;
}
for (auto & client : group->get_client_ptrs()) {
if (client->get_client_handle()->data == client_handle) {
for (auto & weak_client : group->get_client_ptrs()) {
auto client = weak_client.lock();
if (client && client->get_client_handle()->data == client_handle) {
return client;
}
}
@@ -182,8 +183,9 @@ MemoryStrategy::get_group_by_client(
if (!group) {
continue;
}
for (auto & cli : group->get_client_ptrs()) {
if (cli == client) {
for (auto & weak_client : group->get_client_ptrs()) {
auto cli = weak_client.lock();
if (cli && cli == client) {
return group;
}
}

View File

@@ -38,6 +38,7 @@ Node::Node(
number_of_subscriptions_(0), number_of_timers_(0), number_of_services_(0),
use_intra_process_comms_(use_intra_process_comms)
{
has_executor.store(false);
size_t domain_id = 0;
char * ros_domain_id = nullptr;
const char * env_var = "ROS_DOMAIN_ID";

View File

@@ -39,9 +39,9 @@ TEST(TestRate, rate_basics) {
rclcpp::utilities::sleep_for(offset);
ASSERT_TRUE(r.sleep());
auto two = std::chrono::system_clock::now();
delta = two - one;
ASSERT_TRUE(period < delta + epsilon);
ASSERT_TRUE(period * overrun_ratio > delta);
delta = two - start;
ASSERT_TRUE(2 * period < delta);
ASSERT_TRUE(2 * period * overrun_ratio > delta);
rclcpp::utilities::sleep_for(offset);
auto two_offset = std::chrono::system_clock::now();
@@ -66,35 +66,35 @@ TEST(TestRate, wall_rate_basics) {
auto epsilon = std::chrono::milliseconds(1);
double overrun_ratio = 1.5;
auto start = std::chrono::system_clock::now();
auto start = std::chrono::steady_clock::now();
rclcpp::rate::WallRate r(period);
ASSERT_TRUE(r.is_steady());
ASSERT_TRUE(r.sleep());
auto one = std::chrono::system_clock::now();
auto one = std::chrono::steady_clock::now();
auto delta = one - start;
ASSERT_TRUE(period < delta);
ASSERT_TRUE(period * overrun_ratio > delta);
rclcpp::utilities::sleep_for(offset);
ASSERT_TRUE(r.sleep());
auto two = std::chrono::system_clock::now();
delta = two - one;
ASSERT_TRUE(period < delta + epsilon);
ASSERT_TRUE(period * overrun_ratio > delta);
auto two = std::chrono::steady_clock::now();
delta = two - start;
ASSERT_TRUE(2 * period < delta + epsilon);
ASSERT_TRUE(2 * period * overrun_ratio > delta);
rclcpp::utilities::sleep_for(offset);
auto two_offset = std::chrono::system_clock::now();
auto two_offset = std::chrono::steady_clock::now();
r.reset();
ASSERT_TRUE(r.sleep());
auto three = std::chrono::system_clock::now();
auto three = std::chrono::steady_clock::now();
delta = three - two_offset;
ASSERT_TRUE(period < delta);
ASSERT_TRUE(period * overrun_ratio > delta);
rclcpp::utilities::sleep_for(offset + period);
auto four = std::chrono::system_clock::now();
auto four = std::chrono::steady_clock::now();
ASSERT_FALSE(r.sleep());
auto five = std::chrono::system_clock::now();
auto five = std::chrono::steady_clock::now();
delta = five - four;
ASSERT_TRUE(epsilon > delta);
}