Compare commits

...

12 Commits

Author SHA1 Message Date
Chris Lalancette
295849dce3 Add in dispatch_intra_process for SubscribedTypes.
Not used yet, but at least the general infrastructure is there.

Signed-off-by: Chris Lalancette <clalancette@openrobotics.org>
2021-11-19 12:22:39 -05:00
Chris Lalancette
d3c0db1d59 Start pushing down the subscription side.
Signed-off-by: Chris Lalancette <clalancette@openrobotics.org>
2021-11-19 11:19:43 -05:00
Chris Lalancette
2730af14ad Get rid of unnecessary return values in favor of 'typename'
Signed-off-by: Chris Lalancette <clalancette@openrobotics.org>
2021-11-19 10:56:29 -05:00
Chris Lalancette
da5786933a More fixes in the intra-process manager.
This should now be ready for the next step, which is putting
the type into the subscription buffers.

Signed-off-by: Chris Lalancette <clalancette@openrobotics.org>
2021-11-17 09:40:00 -05:00
Chris Lalancette
30f8ebd940 Refactor for less duplicated code.
Signed-off-by: Chris Lalancette <clalancette@openrobotics.org>
2021-11-17 09:40:00 -05:00
Chris Lalancette
df1ba66fdd WIP: Move ROS conversion down to add_{owned,shared}_msg_to_buffers.
This is still a work-in-progress because we are currently doing
extra conversions in the return_shared case.

Signed-off-by: Chris Lalancette <clalancette@openrobotics.org>
2021-11-17 08:37:15 -05:00
Chris Lalancette
149a8238fc Push the conversion down into the intra_process_manager.
This now compiles, but the type adapt tests are failing because
somehow the wrong allocator is being used.

Signed-off-by: Chris Lalancette <clalancette@openrobotics.org>
2021-11-17 08:37:15 -05:00
Chris Lalancette
6d68132567 Push conversion down one level into do_intra_process_publish_and_return_shared.
Signed-off-by: Chris Lalancette <clalancette@openrobotics.org>
2021-11-17 08:37:15 -05:00
Chris Lalancette
f55d29dada Cleanup the test_{publisher,subscription}_with_type_adapter tests.
Make them spew less errors and be a little cleaner:

1.  Remove the parameterization on various RMWs.  This wasn't
being used and just complicated the signatures.
2.  Because of 1, there is now only a single fixture for each
of the tests.  Add the rclcpp::init/rclcpp::shutdown into the
SetupTestCase/TearDownTestCase fixture.
3.  Add some additional error checking to tests.

Signed-off-by: Chris Lalancette <clalancette@openrobotics.org>
2021-11-17 08:37:15 -05:00
Chris Lalancette
2803e17931 Cleanup includes.
Remove ones that aren't needed, and add in ones that are
actually needed in the respective files.

Signed-off-by: Chris Lalancette <clalancette@openrobotics.org>
2021-11-17 08:37:15 -05:00
Chris Lalancette
222820eb80 Make sure to check the return value of rcl_trigger_guard_condition.
Signed-off-by: Chris Lalancette <clalancette@openrobotics.org>
2021-11-16 10:47:12 -05:00
Chris Lalancette
1b790ddf4a Revamp the IntraProcessManager publish logic.
There is no change here, the new way is just easier to read.

Signed-off-by: Chris Lalancette <clalancette@openrobotics.org>
2021-11-16 10:47:09 -05:00
17 changed files with 906 additions and 289 deletions

View File

@@ -476,7 +476,22 @@ public:
}
}
// Dispatch when input is a ros message and the output could be anything.
std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter>
convert_custom_type_message_to_ros_message_unique_ptr(const SubscribedType & msg)
{
if constexpr (rclcpp::TypeAdapter<MessageT>::is_specialized::value) {
auto ptr = ROSMessageTypeAllocatorTraits::allocate(ros_message_type_allocator_, 1);
ROSMessageTypeAllocatorTraits::construct(ros_message_type_allocator_, ptr);
rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(msg, *ptr);
return std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter>(ptr, ros_message_type_deleter_);
} else {
throw std::runtime_error(
"convert_custom_type_message_to_ros_message_unique_ptr "
"unexpectedly called without TypeAdapter");
}
}
// Dispatch when input is a ROS message and the output could be anything.
void
dispatch(
std::shared_ptr<ROSMessageType> message,
@@ -529,7 +544,7 @@ public:
{
callback(convert_ros_message_to_custom_type_unique_ptr(*message), message_info);
}
// conditions for output is ros message
// conditions for output is ROS message
else if constexpr (std::is_same_v<T, ConstRefROSMessageCallback>) { // NOLINT
callback(*message);
} else if constexpr (std::is_same_v<T, ConstRefWithInfoROSMessageCallback>) {
@@ -656,7 +671,12 @@ public:
TRACEPOINT(callback_end, static_cast<const void *>(this));
}
void
template<typename T>
typename
std::enable_if_t<
rosidl_generator_traits::is_message<T>::value &&
std::is_same<T, ROSMessageType>::value
>
dispatch_intra_process(
std::shared_ptr<const ROSMessageType> message,
const rclcpp::MessageInfo & message_info)
@@ -672,83 +692,83 @@ public:
// Dispatch.
std::visit(
[&message, &message_info, this](auto && callback) {
using T = std::decay_t<decltype(callback)>;
using CallbackT = std::decay_t<decltype(callback)>;
static constexpr bool is_ta = rclcpp::TypeAdapter<MessageT>::is_specialized::value;
// conditions for custom type
if constexpr (is_ta && std::is_same_v<T, ConstRefCallback>) {
if constexpr (is_ta && std::is_same_v<CallbackT, ConstRefCallback>) {
auto local_message = convert_ros_message_to_custom_type_unique_ptr(*message);
callback(*local_message);
} else if constexpr (is_ta && std::is_same_v<T, ConstRefWithInfoCallback>) { // NOLINT
} else if constexpr (is_ta && std::is_same_v<CallbackT, ConstRefWithInfoCallback>) { // NOLINT
auto local_message = convert_ros_message_to_custom_type_unique_ptr(*message);
callback(*local_message, message_info);
} else if constexpr ( // NOLINT[readability/braces]
is_ta && (
std::is_same_v<T, UniquePtrCallback>||
std::is_same_v<T, SharedPtrCallback>
std::is_same_v<CallbackT, UniquePtrCallback>||
std::is_same_v<CallbackT, SharedPtrCallback>
))
{
callback(convert_ros_message_to_custom_type_unique_ptr(*message));
} else if constexpr ( // NOLINT[readability/braces]
is_ta && (
std::is_same_v<T, UniquePtrWithInfoCallback>||
std::is_same_v<T, SharedPtrWithInfoCallback>
std::is_same_v<CallbackT, UniquePtrWithInfoCallback>||
std::is_same_v<CallbackT, SharedPtrWithInfoCallback>
))
{
callback(convert_ros_message_to_custom_type_unique_ptr(*message), message_info);
} else if constexpr ( // NOLINT[readability/braces]
is_ta && (
std::is_same_v<T, SharedConstPtrCallback>||
std::is_same_v<T, ConstRefSharedConstPtrCallback>
std::is_same_v<CallbackT, SharedConstPtrCallback>||
std::is_same_v<CallbackT, ConstRefSharedConstPtrCallback>
))
{
callback(convert_ros_message_to_custom_type_unique_ptr(*message));
} else if constexpr ( // NOLINT[readability/braces]
is_ta && (
std::is_same_v<T, SharedConstPtrWithInfoCallback>||
std::is_same_v<T, ConstRefSharedConstPtrWithInfoCallback>
std::is_same_v<CallbackT, SharedConstPtrWithInfoCallback>||
std::is_same_v<CallbackT, ConstRefSharedConstPtrWithInfoCallback>
))
{
callback(convert_ros_message_to_custom_type_unique_ptr(*message), message_info);
}
// conditions for ros message type
else if constexpr (std::is_same_v<T, ConstRefROSMessageCallback>) { // NOLINT
// conditions for ROS message type
else if constexpr (std::is_same_v<CallbackT, ConstRefROSMessageCallback>) { // NOLINT
callback(*message);
} else if constexpr (std::is_same_v<T, ConstRefWithInfoROSMessageCallback>) {
} else if constexpr (std::is_same_v<CallbackT, ConstRefWithInfoROSMessageCallback>) {
callback(*message, message_info);
} else if constexpr ( // NOLINT[readability/braces]
std::is_same_v<T, UniquePtrROSMessageCallback>||
std::is_same_v<T, SharedPtrROSMessageCallback>)
std::is_same_v<CallbackT, UniquePtrROSMessageCallback>||
std::is_same_v<CallbackT, SharedPtrROSMessageCallback>)
{
callback(create_ros_unique_ptr_from_ros_shared_ptr_message(message));
} else if constexpr ( // NOLINT[readability/braces]
std::is_same_v<T, UniquePtrWithInfoROSMessageCallback>||
std::is_same_v<T, SharedPtrWithInfoROSMessageCallback>)
std::is_same_v<CallbackT, UniquePtrWithInfoROSMessageCallback>||
std::is_same_v<CallbackT, SharedPtrWithInfoROSMessageCallback>)
{
callback(create_ros_unique_ptr_from_ros_shared_ptr_message(message), message_info);
} else if constexpr ( // NOLINT[readability/braces]
std::is_same_v<T, SharedConstPtrROSMessageCallback>||
std::is_same_v<T, ConstRefSharedConstPtrROSMessageCallback>)
std::is_same_v<CallbackT, SharedConstPtrROSMessageCallback>||
std::is_same_v<CallbackT, ConstRefSharedConstPtrROSMessageCallback>)
{
callback(message);
} else if constexpr ( // NOLINT[readability/braces]
std::is_same_v<T, SharedConstPtrWithInfoROSMessageCallback>||
std::is_same_v<T, ConstRefSharedConstPtrWithInfoROSMessageCallback>)
std::is_same_v<CallbackT, SharedConstPtrWithInfoROSMessageCallback>||
std::is_same_v<CallbackT, ConstRefSharedConstPtrWithInfoROSMessageCallback>)
{
callback(message, message_info);
}
// condition to catch SerializedMessage types
else if constexpr ( // NOLINT[readability/braces]
std::is_same_v<T, ConstRefSerializedMessageCallback>||
std::is_same_v<T, ConstRefSerializedMessageWithInfoCallback>||
std::is_same_v<T, UniquePtrSerializedMessageCallback>||
std::is_same_v<T, UniquePtrSerializedMessageWithInfoCallback>||
std::is_same_v<T, SharedConstPtrSerializedMessageCallback>||
std::is_same_v<T, SharedConstPtrSerializedMessageWithInfoCallback>||
std::is_same_v<T, ConstRefSharedConstPtrSerializedMessageCallback>||
std::is_same_v<T, ConstRefSharedConstPtrSerializedMessageWithInfoCallback>||
std::is_same_v<T, SharedPtrSerializedMessageCallback>||
std::is_same_v<T, SharedPtrSerializedMessageWithInfoCallback>)
std::is_same_v<CallbackT, ConstRefSerializedMessageCallback>||
std::is_same_v<CallbackT, ConstRefSerializedMessageWithInfoCallback>||
std::is_same_v<CallbackT, UniquePtrSerializedMessageCallback>||
std::is_same_v<CallbackT, UniquePtrSerializedMessageWithInfoCallback>||
std::is_same_v<CallbackT, SharedConstPtrSerializedMessageCallback>||
std::is_same_v<CallbackT, SharedConstPtrSerializedMessageWithInfoCallback>||
std::is_same_v<CallbackT, ConstRefSharedConstPtrSerializedMessageCallback>||
std::is_same_v<CallbackT, ConstRefSharedConstPtrSerializedMessageWithInfoCallback>||
std::is_same_v<CallbackT, SharedPtrSerializedMessageCallback>||
std::is_same_v<CallbackT, SharedPtrSerializedMessageWithInfoCallback>)
{
throw std::runtime_error(
"Cannot dispatch std::shared_ptr<const ROSMessageType> message "
@@ -756,13 +776,18 @@ public:
}
// condition to catch unhandled callback types
else { // NOLINT[readability/braces]
static_assert(always_false_v<T>, "unhandled callback type");
static_assert(always_false_v<CallbackT>, "unhandled callback type");
}
}, callback_variant_);
TRACEPOINT(callback_end, static_cast<const void *>(this));
}
void
template<typename T>
typename
std::enable_if_t<
rosidl_generator_traits::is_message<T>::value &&
std::is_same<T, ROSMessageType>::value
>
dispatch_intra_process(
std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter> message,
const rclcpp::MessageInfo & message_info)
@@ -778,83 +803,83 @@ public:
// Dispatch.
std::visit(
[&message, &message_info, this](auto && callback) {
using T = std::decay_t<decltype(callback)>;
using CallbackT = std::decay_t<decltype(callback)>;
static constexpr bool is_ta = rclcpp::TypeAdapter<MessageT>::is_specialized::value;
// conditions for custom type
if constexpr (is_ta && std::is_same_v<T, ConstRefCallback>) {
if constexpr (is_ta && std::is_same_v<CallbackT, ConstRefCallback>) {
auto local_message = convert_ros_message_to_custom_type_unique_ptr(*message);
callback(*local_message);
} else if constexpr (is_ta && std::is_same_v<T, ConstRefWithInfoCallback>) { // NOLINT
} else if constexpr (is_ta && std::is_same_v<CallbackT, ConstRefWithInfoCallback>) { // NOLINT
auto local_message = convert_ros_message_to_custom_type_unique_ptr(*message);
callback(*local_message, message_info);
} else if constexpr ( // NOLINT[readability/braces]
is_ta && (
std::is_same_v<T, UniquePtrCallback>||
std::is_same_v<T, SharedPtrCallback>
std::is_same_v<CallbackT, UniquePtrCallback>||
std::is_same_v<CallbackT, SharedPtrCallback>
))
{
callback(convert_ros_message_to_custom_type_unique_ptr(*message));
} else if constexpr ( // NOLINT[readability/braces]
is_ta && (
std::is_same_v<T, UniquePtrWithInfoCallback>||
std::is_same_v<T, SharedPtrWithInfoCallback>
std::is_same_v<CallbackT, UniquePtrWithInfoCallback>||
std::is_same_v<CallbackT, SharedPtrWithInfoCallback>
))
{
callback(convert_ros_message_to_custom_type_unique_ptr(*message), message_info);
} else if constexpr ( // NOLINT[readability/braces]
is_ta && (
std::is_same_v<T, SharedConstPtrCallback>||
std::is_same_v<T, ConstRefSharedConstPtrCallback>
std::is_same_v<CallbackT, SharedConstPtrCallback>||
std::is_same_v<CallbackT, ConstRefSharedConstPtrCallback>
))
{
callback(convert_ros_message_to_custom_type_unique_ptr(*message));
} else if constexpr ( // NOLINT[readability/braces]
is_ta && (
std::is_same_v<T, SharedConstPtrWithInfoCallback>||
std::is_same_v<T, ConstRefSharedConstPtrWithInfoCallback>
std::is_same_v<CallbackT, SharedConstPtrWithInfoCallback>||
std::is_same_v<CallbackT, ConstRefSharedConstPtrWithInfoCallback>
))
{
callback(convert_ros_message_to_custom_type_unique_ptr(*message), message_info);
}
// conditions for ros message type
else if constexpr (std::is_same_v<T, ConstRefROSMessageCallback>) { // NOLINT
// conditions for ROS message type
else if constexpr (std::is_same_v<CallbackT, ConstRefROSMessageCallback>) { // NOLINT
callback(*message);
} else if constexpr (std::is_same_v<T, ConstRefWithInfoROSMessageCallback>) {
} else if constexpr (std::is_same_v<CallbackT, ConstRefWithInfoROSMessageCallback>) {
callback(*message, message_info);
} else if constexpr ( // NOLINT[readability/braces]
std::is_same_v<T, UniquePtrROSMessageCallback>||
std::is_same_v<T, SharedPtrROSMessageCallback>)
std::is_same_v<CallbackT, UniquePtrROSMessageCallback>||
std::is_same_v<CallbackT, SharedPtrROSMessageCallback>)
{
callback(std::move(message));
} else if constexpr ( // NOLINT[readability/braces]
std::is_same_v<T, UniquePtrWithInfoROSMessageCallback>||
std::is_same_v<T, SharedPtrWithInfoROSMessageCallback>)
std::is_same_v<CallbackT, UniquePtrWithInfoROSMessageCallback>||
std::is_same_v<CallbackT, SharedPtrWithInfoROSMessageCallback>)
{
callback(std::move(message), message_info);
} else if constexpr ( // NOLINT[readability/braces]
std::is_same_v<T, SharedConstPtrROSMessageCallback>||
std::is_same_v<T, ConstRefSharedConstPtrROSMessageCallback>)
std::is_same_v<CallbackT, SharedConstPtrROSMessageCallback>||
std::is_same_v<CallbackT, ConstRefSharedConstPtrROSMessageCallback>)
{
callback(std::move(message));
} else if constexpr ( // NOLINT[readability/braces]
std::is_same_v<T, SharedConstPtrWithInfoROSMessageCallback>||
std::is_same_v<T, ConstRefSharedConstPtrWithInfoROSMessageCallback>)
std::is_same_v<CallbackT, SharedConstPtrWithInfoROSMessageCallback>||
std::is_same_v<CallbackT, ConstRefSharedConstPtrWithInfoROSMessageCallback>)
{
callback(std::move(message), message_info);
}
// condition to catch SerializedMessage types
else if constexpr ( // NOLINT[readability/braces]
std::is_same_v<T, ConstRefSerializedMessageCallback>||
std::is_same_v<T, ConstRefSerializedMessageWithInfoCallback>||
std::is_same_v<T, UniquePtrSerializedMessageCallback>||
std::is_same_v<T, UniquePtrSerializedMessageWithInfoCallback>||
std::is_same_v<T, SharedConstPtrSerializedMessageCallback>||
std::is_same_v<T, SharedConstPtrSerializedMessageWithInfoCallback>||
std::is_same_v<T, ConstRefSharedConstPtrSerializedMessageCallback>||
std::is_same_v<T, ConstRefSharedConstPtrSerializedMessageWithInfoCallback>||
std::is_same_v<T, SharedPtrSerializedMessageCallback>||
std::is_same_v<T, SharedPtrSerializedMessageWithInfoCallback>)
std::is_same_v<CallbackT, ConstRefSerializedMessageCallback>||
std::is_same_v<CallbackT, ConstRefSerializedMessageWithInfoCallback>||
std::is_same_v<CallbackT, UniquePtrSerializedMessageCallback>||
std::is_same_v<CallbackT, UniquePtrSerializedMessageWithInfoCallback>||
std::is_same_v<CallbackT, SharedConstPtrSerializedMessageCallback>||
std::is_same_v<CallbackT, SharedConstPtrSerializedMessageWithInfoCallback>||
std::is_same_v<CallbackT, ConstRefSharedConstPtrSerializedMessageCallback>||
std::is_same_v<CallbackT, ConstRefSharedConstPtrSerializedMessageWithInfoCallback>||
std::is_same_v<CallbackT, SharedPtrSerializedMessageCallback>||
std::is_same_v<CallbackT, SharedPtrSerializedMessageWithInfoCallback>)
{
throw std::runtime_error(
"Cannot dispatch std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter> message "
@@ -862,7 +887,227 @@ public:
}
// condition to catch unhandled callback types
else { // NOLINT[readability/braces]
static_assert(always_false_v<T>, "unhandled callback type");
static_assert(always_false_v<CallbackT>, "unhandled callback type");
}
}, callback_variant_);
TRACEPOINT(callback_end, static_cast<const void *>(this));
}
template<typename T>
typename std::enable_if_t<
rclcpp::TypeAdapter<MessageT>::is_specialized::value &&
std::is_same<T, SubscribedType>::value
>
dispatch_intra_process(
std::shared_ptr<const T> message,
const rclcpp::MessageInfo & message_info)
{
TRACEPOINT(callback_start, static_cast<const void *>(this), true);
// Check if the variant is "unset", throw if it is.
if (callback_variant_.index() == 0) {
if (std::get<0>(callback_variant_) == nullptr) {
// This can happen if it is default initialized, or if it is assigned nullptr.
throw std::runtime_error("dispatch called on an unset AnySubscriptionCallback");
}
}
// Dispatch.
std::visit(
[&message, &message_info, this](auto && callback) {
using CallbackT = std::decay_t<decltype(callback)>;
static constexpr bool is_ta = rclcpp::TypeAdapter<MessageT>::is_specialized::value;
// conditions for custom type
if constexpr (is_ta && std::is_same_v<CallbackT, ConstRefCallback>) {
callback(*message);
} else if constexpr (is_ta && std::is_same_v<CallbackT, ConstRefWithInfoCallback>) { // NOLINT
callback(*message);
} else if constexpr ( // NOLINT[readability/braces]
is_ta && (
std::is_same_v<CallbackT, UniquePtrCallback>||
std::is_same_v<CallbackT, SharedPtrCallback>
))
{
callback(create_custom_unique_ptr_from_custom_shared_ptr_message(message));
} else if constexpr ( // NOLINT[readability/braces]
is_ta && (
std::is_same_v<CallbackT, UniquePtrWithInfoCallback>||
std::is_same_v<CallbackT, SharedPtrWithInfoCallback>
))
{
callback(create_custom_unique_ptr_from_custom_shared_ptr_message(message), message_info);
} else if constexpr ( // NOLINT[readability/braces]
is_ta && (
std::is_same_v<CallbackT, SharedConstPtrCallback>||
std::is_same_v<CallbackT, ConstRefSharedConstPtrCallback>
))
{
callback(std::unique_ptr<SubscribedType, SubscribedTypeDeleter>(message.get(), subscribed_type_deleter_));
} else if constexpr ( // NOLINT[readability/braces]
is_ta && (
std::is_same_v<CallbackT, SharedConstPtrWithInfoCallback>||
std::is_same_v<CallbackT, ConstRefSharedConstPtrWithInfoCallback>
))
{
callback(std::unique_ptr<SubscribedType, SubscribedTypeDeleter>(message.get(), subscribed_type_deleter_), message_info);
}
// conditions for ROS message type
else if constexpr (std::is_same_v<CallbackT, ConstRefROSMessageCallback>) { // NOLINT
auto local_message = convert_custom_type_message_to_ros_message_unique_ptr(*message);
callback(*local_message);
} else if constexpr (std::is_same_v<CallbackT, ConstRefWithInfoROSMessageCallback>) {
auto local_message = convert_custom_type_message_to_ros_message_unique_ptr(*message);
callback(*local_message, message_info);
} else if constexpr ( // NOLINT[readability/braces]
std::is_same_v<CallbackT, UniquePtrROSMessageCallback>||
std::is_same_v<CallbackT, SharedPtrROSMessageCallback>)
{
callback(convert_custom_type_message_to_ros_message_unique_ptr(*message));
} else if constexpr ( // NOLINT[readability/braces]
std::is_same_v<CallbackT, UniquePtrWithInfoROSMessageCallback>||
std::is_same_v<CallbackT, SharedPtrWithInfoROSMessageCallback>)
{
callback(convert_custom_type_message_to_ros_message_unique_ptr(*message), message_info);
} else if constexpr ( // NOLINT[readability/braces]
std::is_same_v<CallbackT, SharedConstPtrROSMessageCallback>||
std::is_same_v<CallbackT, ConstRefSharedConstPtrROSMessageCallback>)
{
callback(convert_custom_type_message_to_ros_message_unique_ptr(*message));
} else if constexpr ( // NOLINT[readability/braces]
std::is_same_v<CallbackT, SharedConstPtrWithInfoROSMessageCallback>||
std::is_same_v<CallbackT, ConstRefSharedConstPtrWithInfoROSMessageCallback>)
{
callback(convert_custom_type_message_to_ros_message_unique_ptr(*message), message_info);
}
// condition to catch SerializedMessage types
else if constexpr ( // NOLINT[readability/braces]
std::is_same_v<CallbackT, ConstRefSerializedMessageCallback>||
std::is_same_v<CallbackT, ConstRefSerializedMessageWithInfoCallback>||
std::is_same_v<CallbackT, UniquePtrSerializedMessageCallback>||
std::is_same_v<CallbackT, UniquePtrSerializedMessageWithInfoCallback>||
std::is_same_v<CallbackT, SharedConstPtrSerializedMessageCallback>||
std::is_same_v<CallbackT, SharedConstPtrSerializedMessageWithInfoCallback>||
std::is_same_v<CallbackT, ConstRefSharedConstPtrSerializedMessageCallback>||
std::is_same_v<CallbackT, ConstRefSharedConstPtrSerializedMessageWithInfoCallback>||
std::is_same_v<CallbackT, SharedPtrSerializedMessageCallback>||
std::is_same_v<CallbackT, SharedPtrSerializedMessageWithInfoCallback>)
{
throw std::runtime_error(
"Cannot dispatch std::shared_ptr<const ROSMessageType> message "
"to rclcpp::SerializedMessage");
}
// condition to catch unhandled callback types
else { // NOLINT[readability/braces]
static_assert(always_false_v<CallbackT>, "unhandled callback type");
}
}, callback_variant_);
TRACEPOINT(callback_end, static_cast<const void *>(this));
}
template<typename T>
typename std::enable_if_t<
rclcpp::TypeAdapter<MessageT>::is_specialized::value &&
std::is_same<T, SubscribedType>::value
>
dispatch_intra_process(
std::unique_ptr<T, SubscribedTypeDeleter> message,
const rclcpp::MessageInfo & message_info)
{
TRACEPOINT(callback_start, static_cast<const void *>(this), true);
// Check if the variant is "unset", throw if it is.
if (callback_variant_.index() == 0) {
if (std::get<0>(callback_variant_) == nullptr) {
// This can happen if it is default initialized, or if it is assigned nullptr.
throw std::runtime_error("dispatch called on an unset AnySubscriptionCallback");
}
}
// Dispatch.
std::visit(
[&message, &message_info, this](auto && callback) {
using CallbackT = std::decay_t<decltype(callback)>;
static constexpr bool is_ta = rclcpp::TypeAdapter<MessageT>::is_specialized::value;
// conditions for custom type
if constexpr (is_ta && std::is_same_v<CallbackT, ConstRefCallback>) {
callback(*message);
} else if constexpr (is_ta && std::is_same_v<CallbackT, ConstRefWithInfoCallback>) { // NOLINT
callback(*message);
} else if constexpr ( // NOLINT[readability/braces]
is_ta && (
std::is_same_v<CallbackT, UniquePtrCallback>||
std::is_same_v<CallbackT, SharedPtrCallback>
))
{
callback(std::move(message));
} else if constexpr ( // NOLINT[readability/braces]
is_ta && (
std::is_same_v<CallbackT, UniquePtrWithInfoCallback>||
std::is_same_v<CallbackT, SharedPtrWithInfoCallback>
))
{
callback(std::move(message), message_info);
} else if constexpr ( // NOLINT[readability/braces]
is_ta && (
std::is_same_v<CallbackT, SharedConstPtrCallback>||
std::is_same_v<CallbackT, ConstRefSharedConstPtrCallback>
))
{
callback(std::move(message));
} else if constexpr ( // NOLINT[readability/braces]
is_ta && (
std::is_same_v<CallbackT, SharedConstPtrWithInfoCallback>||
std::is_same_v<CallbackT, ConstRefSharedConstPtrWithInfoCallback>
))
{
callback(std::move(message), message_info);
}
// conditions for ROS message type
else if constexpr (std::is_same_v<CallbackT, ConstRefROSMessageCallback>) { // NOLINT
auto local_message = convert_custom_type_to_ros_message_unique_ptr(*message);
callback(*local_message);
} else if constexpr (std::is_same_v<CallbackT, ConstRefWithInfoROSMessageCallback>) {
auto local_message = convert_custom_type_to_ros_message_unique_ptr(*message);
callback(*local_message, message_info);
} else if constexpr ( // NOLINT[readability/braces]
std::is_same_v<CallbackT, UniquePtrROSMessageCallback>||
std::is_same_v<CallbackT, SharedPtrROSMessageCallback>)
{
callback(convert_custom_type_to_ros_message_unique_ptr(*message));
} else if constexpr ( // NOLINT[readability/braces]
std::is_same_v<CallbackT, UniquePtrWithInfoROSMessageCallback>||
std::is_same_v<CallbackT, SharedPtrWithInfoROSMessageCallback>)
{
callback(convert_custom_type_to_ros_message_unique_ptr(*message), message_info);
} else if constexpr ( // NOLINT[readability/braces]
std::is_same_v<CallbackT, SharedConstPtrROSMessageCallback>||
std::is_same_v<CallbackT, ConstRefSharedConstPtrROSMessageCallback>)
{
callback(convert_custom_type_to_ros_message_unique_ptr(*message));
} else if constexpr ( // NOLINT[readability/braces]
std::is_same_v<CallbackT, SharedConstPtrWithInfoROSMessageCallback>||
std::is_same_v<CallbackT, ConstRefSharedConstPtrWithInfoROSMessageCallback>)
{
callback(convert_custom_type_to_ros_message_unique_ptr(*message), message_info);
}
// condition to catch SerializedMessage types
else if constexpr ( // NOLINT[readability/braces]
std::is_same_v<CallbackT, ConstRefSerializedMessageCallback>||
std::is_same_v<CallbackT, ConstRefSerializedMessageWithInfoCallback>||
std::is_same_v<CallbackT, UniquePtrSerializedMessageCallback>||
std::is_same_v<CallbackT, UniquePtrSerializedMessageWithInfoCallback>||
std::is_same_v<CallbackT, SharedConstPtrSerializedMessageCallback>||
std::is_same_v<CallbackT, SharedConstPtrSerializedMessageWithInfoCallback>||
std::is_same_v<CallbackT, ConstRefSharedConstPtrSerializedMessageCallback>||
std::is_same_v<CallbackT, ConstRefSharedConstPtrSerializedMessageWithInfoCallback>||
std::is_same_v<CallbackT, SharedPtrSerializedMessageCallback>||
std::is_same_v<CallbackT, SharedPtrSerializedMessageWithInfoCallback>)
{
throw std::runtime_error(
"Cannot dispatch std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter> message "
"to rclcpp::SerializedMessage");
}
// condition to catch unhandled callback types
else { // NOLINT[readability/braces]
static_assert(always_false_v<CallbackT>, "unhandled callback type");
}
}, callback_variant_);
TRACEPOINT(callback_end, static_cast<const void *>(this));

View File

@@ -16,6 +16,7 @@
#define RCLCPP__EXPERIMENTAL__BUFFERS__INTRA_PROCESS_BUFFER_HPP_
#include <memory>
#include <stdexcept>
#include <type_traits>
#include <utility>

View File

@@ -15,10 +15,6 @@
#ifndef RCLCPP__EXPERIMENTAL__BUFFERS__RING_BUFFER_IMPLEMENTATION_HPP_
#define RCLCPP__EXPERIMENTAL__BUFFERS__RING_BUFFER_IMPLEMENTATION_HPP_
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <utility>

View File

@@ -16,11 +16,9 @@
#define RCLCPP__EXPERIMENTAL__CREATE_INTRA_PROCESS_BUFFER_HPP_
#include <memory>
#include <type_traits>
#include <stdexcept>
#include <utility>
#include "rcl/subscription.h"
#include "rclcpp/experimental/buffers/intra_process_buffer.hpp"
#include "rclcpp/experimental/buffers/ring_buffer_implementation.hpp"
#include "rclcpp/intra_process_buffer_type.hpp"

View File

@@ -15,7 +15,6 @@
#ifndef RCLCPP__EXPERIMENTAL__EXECUTABLE_LIST_HPP_
#define RCLCPP__EXPERIMENTAL__EXECUTABLE_LIST_HPP_
#include <memory>
#include <vector>
#include "rclcpp/client.hpp"

View File

@@ -19,17 +19,16 @@
#include <shared_mutex>
#include <algorithm>
#include <atomic>
#include <cstdint>
#include <exception>
#include <map>
#include <iterator>
#include <memory>
#include <string>
#include <stdexcept>
#include <type_traits>
#include <unordered_map>
#include <utility>
#include <vector>
#include "rosidl_runtime_cpp/traits.hpp"
#include "rclcpp/allocator/allocator_deleter.hpp"
#include "rclcpp/experimental/subscription_intra_process.hpp"
#include "rclcpp/experimental/subscription_intra_process_base.hpp"
@@ -38,6 +37,7 @@
#include "rclcpp/logging.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/publisher_base.hpp"
#include "rclcpp/type_adapter.hpp"
#include "rclcpp/visibility_control.hpp"
namespace rclcpp
@@ -175,17 +175,32 @@ public:
* \param message the message that is being stored.
*/
template<
typename T,
typename DeleterT,
typename MessageT,
typename Alloc = std::allocator<void>,
typename Deleter = std::default_delete<MessageT>>
void
typename AllocatorT = std::allocator<void>,
typename ROSMessageType = typename rclcpp::TypeAdapter<MessageT>::ros_message_type,
typename PublishedType = typename rclcpp::TypeAdapter<MessageT>::custom_type,
typename ROSMessageTypeAllocatorTraits = allocator::AllocRebind<ROSMessageType, AllocatorT>,
typename ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type,
typename ROSMessageTypeDeleter = allocator::Deleter<ROSMessageTypeAllocator, ROSMessageType>,
typename PublishedTypeAllocatorTraits = allocator::AllocRebind<PublishedType, AllocatorT>,
typename PublishedTypeAllocator = typename PublishedTypeAllocatorTraits::allocator_type
>
typename
std::enable_if_t<
rosidl_generator_traits::is_message<T>::value &&
std::is_same<T, ROSMessageType>::value
>
do_intra_process_publish(
uint64_t intra_process_publisher_id,
std::unique_ptr<MessageT, Deleter> message,
typename allocator::AllocRebind<MessageT, Alloc>::allocator_type & allocator)
std::unique_ptr<T, DeleterT> message,
ROSMessageTypeAllocator & ros_message_type_allocator,
ROSMessageTypeDeleter & ros_message_type_deleter,
PublishedTypeAllocator & published_type_allocator)
{
using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
using MessageAllocatorT = typename MessageAllocTraits::allocator_type;
(void)ros_message_type_deleter;
(void)published_type_allocator;
std::shared_lock<std::shared_timed_mutex> lock(mutex_);
@@ -201,53 +216,152 @@ public:
if (sub_ids.take_ownership_subscriptions.empty()) {
// None of the buffers require ownership, so we promote the pointer
std::shared_ptr<MessageT> msg = std::move(message);
std::shared_ptr<ROSMessageType> msg = std::move(message);
this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter>(
this->template add_shared_msg_to_buffers<ROSMessageType, AllocatorT>(
msg, sub_ids.take_shared_subscriptions);
} else if (!sub_ids.take_ownership_subscriptions.empty() && // NOLINT
sub_ids.take_shared_subscriptions.size() <= 1)
{
// There is at maximum 1 buffer that does not require ownership.
// So this case is equivalent to all the buffers requiring ownership
} else {
if (sub_ids.take_shared_subscriptions.size() <= 1) {
// There is at maximum 1 buffer that does not require ownership.
// So this case is equivalent to all the buffers requiring ownership
// Merge the two vector of ids into a unique one
std::vector<uint64_t> concatenated_vector(sub_ids.take_shared_subscriptions);
concatenated_vector.insert(
concatenated_vector.end(),
sub_ids.take_ownership_subscriptions.begin(),
sub_ids.take_ownership_subscriptions.end());
// Merge the two vector of ids into a unique one
std::vector<uint64_t> concatenated_vector(sub_ids.take_shared_subscriptions);
concatenated_vector.insert(
concatenated_vector.end(),
sub_ids.take_ownership_subscriptions.begin(),
sub_ids.take_ownership_subscriptions.end());
this->template add_owned_msg_to_buffers<MessageT, Alloc, Deleter>(
std::move(message),
concatenated_vector,
allocator);
} else if (!sub_ids.take_ownership_subscriptions.empty() && // NOLINT
sub_ids.take_shared_subscriptions.size() > 1)
{
// Construct a new shared pointer from the message
// for the buffers that do not require ownership
auto shared_msg = std::allocate_shared<MessageT, MessageAllocatorT>(allocator, *message);
this->template add_owned_msg_to_buffers<ROSMessageType, AllocatorT>(
std::move(message),
concatenated_vector,
ros_message_type_allocator);
} else {
// Construct a new shared pointer from the message
// for the buffers that do not require ownership
auto shared_msg = std::allocate_shared<ROSMessageType, ROSMessageTypeAllocator>(
ros_message_type_allocator, *message);
this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter>(
shared_msg, sub_ids.take_shared_subscriptions);
this->template add_owned_msg_to_buffers<MessageT, Alloc, Deleter>(
std::move(message), sub_ids.take_ownership_subscriptions, allocator);
this->template add_shared_msg_to_buffers<ROSMessageType, AllocatorT>(
shared_msg, sub_ids.take_shared_subscriptions);
this->template add_owned_msg_to_buffers<ROSMessageType, AllocatorT>(
std::move(message), sub_ids.take_ownership_subscriptions, ros_message_type_allocator);
}
}
}
template<
typename T,
typename DeleterT,
typename MessageT,
typename Alloc = std::allocator<void>,
typename Deleter = std::default_delete<MessageT>>
std::shared_ptr<const MessageT>
typename AllocatorT = std::allocator<void>,
typename ROSMessageType = typename rclcpp::TypeAdapter<MessageT>::ros_message_type,
typename PublishedType = typename rclcpp::TypeAdapter<MessageT>::custom_type,
typename ROSMessageTypeAllocatorTraits = allocator::AllocRebind<ROSMessageType, AllocatorT>,
typename ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type,
typename ROSMessageTypeDeleter = allocator::Deleter<ROSMessageTypeAllocator, ROSMessageType>,
typename PublishedTypeAllocatorTraits = allocator::AllocRebind<PublishedType, AllocatorT>,
typename PublishedTypeAllocator = typename PublishedTypeAllocatorTraits::allocator_type
>
typename
std::enable_if_t<
rclcpp::TypeAdapter<MessageT>::is_specialized::value &&
std::is_same<T, PublishedType>::value
>
do_intra_process_publish(
uint64_t intra_process_publisher_id,
std::unique_ptr<T, DeleterT> message,
ROSMessageTypeAllocator & ros_message_type_allocator,
ROSMessageTypeDeleter & ros_message_type_deleter,
PublishedTypeAllocator & published_type_allocator)
{
std::shared_lock<std::shared_timed_mutex> lock(mutex_);
auto publisher_it = pub_to_subs_.find(intra_process_publisher_id);
if (publisher_it == pub_to_subs_.end()) {
// Publisher is either invalid or no longer exists.
RCLCPP_WARN(
rclcpp::get_logger("rclcpp"),
"Calling do_intra_process_publish for invalid or no longer existing publisher id");
return;
}
const auto & sub_ids = publisher_it->second;
if (sub_ids.take_ownership_subscriptions.empty()) {
// None of the buffers require ownership, so we promote the pointer
std::shared_ptr<T> msg = std::move(message);
this->template add_blah_shared_msg_to_buffers<MessageT, T, AllocatorT>(
msg,
sub_ids.take_shared_subscriptions,
ros_message_type_allocator,
ros_message_type_deleter);
} else {
if (sub_ids.take_shared_subscriptions.size() <= 1) {
// There is at maximum 1 buffer that does not require ownership.
// So this case is equivalent to all the buffers requiring ownership
// Merge the two vector of ids into a unique one
std::vector<uint64_t> concatenated_vector(sub_ids.take_shared_subscriptions);
concatenated_vector.insert(
concatenated_vector.end(),
sub_ids.take_ownership_subscriptions.begin(),
sub_ids.take_ownership_subscriptions.end());
this->template add_blah_owned_msg_to_buffers<MessageT, T, AllocatorT>(
std::move(message),
concatenated_vector,
ros_message_type_allocator,
ros_message_type_deleter);
} else {
// Construct a new shared pointer from the message
// for the buffers that do not require ownership
auto shared_msg = std::allocate_shared<T, PublishedTypeAllocator>(
published_type_allocator,
*message);
this->template add_blah_shared_msg_to_buffers<MessageT, T, AllocatorT>(
shared_msg,
sub_ids.take_shared_subscriptions,
ros_message_type_allocator,
ros_message_type_deleter);
this->template add_blah_owned_msg_to_buffers<MessageT, T, AllocatorT>(
std::move(message),
sub_ids.take_ownership_subscriptions,
ros_message_type_allocator,
ros_message_type_deleter);
}
}
}
template<
typename T,
typename DeleterT,
typename MessageT,
typename AllocatorT,
typename ROSMessageType = typename rclcpp::TypeAdapter<MessageT>::ros_message_type,
typename PublishedType = typename rclcpp::TypeAdapter<MessageT>::custom_type,
typename ROSMessageTypeAllocatorTraits = allocator::AllocRebind<ROSMessageType, AllocatorT>,
typename ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type,
typename ROSMessageTypeDeleter = allocator::Deleter<ROSMessageTypeAllocator, ROSMessageType>,
typename PublishedTypeAllocatorTraits = allocator::AllocRebind<PublishedType, AllocatorT>,
typename PublishedTypeAllocator = typename PublishedTypeAllocatorTraits::allocator_type
>
typename
std::enable_if_t<
rosidl_generator_traits::is_message<T>::value &&
std::is_same<T, ROSMessageType>::value,
std::shared_ptr<const ROSMessageType>
>
do_intra_process_publish_and_return_shared(
uint64_t intra_process_publisher_id,
std::unique_ptr<MessageT, Deleter> message,
typename allocator::AllocRebind<MessageT, Alloc>::allocator_type & allocator)
std::unique_ptr<T, DeleterT> message,
ROSMessageTypeAllocator & ros_message_type_allocator,
ROSMessageTypeDeleter & ros_message_type_deleter,
PublishedTypeAllocator & published_type_allocator)
{
using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
using MessageAllocatorT = typename MessageAllocTraits::allocator_type;
(void)ros_message_type_deleter;
(void)published_type_allocator;
std::shared_lock<std::shared_timed_mutex> lock(mutex_);
@@ -263,33 +377,111 @@ public:
if (sub_ids.take_ownership_subscriptions.empty()) {
// If there are no owning, just convert to shared.
std::shared_ptr<MessageT> shared_msg = std::move(message);
std::shared_ptr<ROSMessageType> shared_msg = std::move(message);
if (!sub_ids.take_shared_subscriptions.empty()) {
this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter>(
this->template add_shared_msg_to_buffers<ROSMessageType, AllocatorT>(
shared_msg, sub_ids.take_shared_subscriptions);
}
return shared_msg;
} else {
// Construct a new shared pointer from the message for the buffers that
// do not require ownership and to return.
auto shared_msg = std::allocate_shared<MessageT, MessageAllocatorT>(allocator, *message);
auto shared_msg = std::allocate_shared<ROSMessageType, ROSMessageTypeAllocator>(
ros_message_type_allocator, *message);
if (!sub_ids.take_shared_subscriptions.empty()) {
this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter>(
this->template add_shared_msg_to_buffers<ROSMessageType, AllocatorT>(
shared_msg,
sub_ids.take_shared_subscriptions);
}
if (!sub_ids.take_ownership_subscriptions.empty()) {
this->template add_owned_msg_to_buffers<MessageT, Alloc, Deleter>(
std::move(message),
sub_ids.take_ownership_subscriptions,
allocator);
}
this->template add_owned_msg_to_buffers<ROSMessageType, AllocatorT>(
std::move(message),
sub_ids.take_ownership_subscriptions,
ros_message_type_allocator);
return shared_msg;
}
}
template<
typename T,
typename DeleterT,
typename MessageT,
typename AllocatorT,
typename ROSMessageType = typename rclcpp::TypeAdapter<MessageT>::ros_message_type,
typename PublishedType = typename rclcpp::TypeAdapter<MessageT>::custom_type,
typename ROSMessageTypeAllocatorTraits = allocator::AllocRebind<ROSMessageType, AllocatorT>,
typename ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type,
typename ROSMessageTypeDeleter = allocator::Deleter<ROSMessageTypeAllocator, ROSMessageType>,
typename PublishedTypeAllocatorTraits = allocator::AllocRebind<PublishedType, AllocatorT>,
typename PublishedTypeAllocator = typename PublishedTypeAllocatorTraits::allocator_type
>
typename
std::enable_if_t<
rclcpp::TypeAdapter<MessageT>::is_specialized::value &&
std::is_same<T, PublishedType>::value, std::shared_ptr<const ROSMessageType>
>
do_intra_process_publish_and_return_shared(
uint64_t intra_process_publisher_id,
std::unique_ptr<T, DeleterT> message,
ROSMessageTypeAllocator & ros_message_type_allocator,
ROSMessageTypeDeleter & ros_message_type_deleter,
PublishedTypeAllocator & published_type_allocator)
{
auto ptr = ROSMessageTypeAllocatorTraits::allocate(ros_message_type_allocator, 1);
ROSMessageTypeAllocatorTraits::construct(ros_message_type_allocator, ptr);
auto shared_ros_msg = std::shared_ptr<ROSMessageType>(ptr, ros_message_type_deleter);
rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(*message, *shared_ros_msg);
std::shared_lock<std::shared_timed_mutex> lock(mutex_);
auto publisher_it = pub_to_subs_.find(intra_process_publisher_id);
if (publisher_it == pub_to_subs_.end()) {
// Publisher is either invalid or no longer exists.
RCLCPP_WARN(
rclcpp::get_logger("rclcpp"),
"Calling do_intra_process_publish for invalid or no longer existing publisher id");
return nullptr;
}
const auto & sub_ids = publisher_it->second;
if (sub_ids.take_ownership_subscriptions.empty()) {
// If there are no owning, just convert to shared.
std::shared_ptr<T> shared_msg = std::move(message);
if (!sub_ids.take_shared_subscriptions.empty()) {
this->template add_blah_shared_msg_to_buffers<MessageT, T, AllocatorT>(
shared_msg,
sub_ids.take_shared_subscriptions,
ros_message_type_allocator,
ros_message_type_deleter);
}
return shared_ros_msg;
} else {
// Construct a new shared pointer from the message for the buffers that
// do not require ownership and to return.
auto shared_msg = std::allocate_shared<T, PublishedTypeAllocator>(
published_type_allocator,
*message);
if (!sub_ids.take_shared_subscriptions.empty()) {
this->template add_blah_shared_msg_to_buffers<MessageT, T, AllocatorT>(
shared_msg,
sub_ids.take_shared_subscriptions,
ros_message_type_allocator,
ros_message_type_deleter);
}
this->template add_blah_owned_msg_to_buffers<MessageT, T, AllocatorT>(
std::move(message),
sub_ids.take_ownership_subscriptions,
ros_message_type_allocator,
ros_message_type_deleter);
return shared_ros_msg;
}
}
/// Return true if the given rmw_gid_t matches any stored Publishers.
RCLCPP_PUBLIC
bool
@@ -337,9 +529,14 @@ private:
template<
typename MessageT,
typename Alloc,
typename Deleter>
void
typename Alloc = std::allocator<void>,
typename Deleter = std::default_delete<MessageT>,
typename ROSMessageType = typename rclcpp::TypeAdapter<MessageT>::ros_message_type>
typename
std::enable_if_t<
rosidl_generator_traits::is_message<MessageT>::value &&
std::is_same<MessageT, ROSMessageType>::value
>
add_shared_msg_to_buffers(
std::shared_ptr<const MessageT> message,
std::vector<uint64_t> subscription_ids)
@@ -369,11 +566,69 @@ private:
}
}
template<
typename MessageT,
typename T,
typename AllocatorT,
typename ROSMessageType = typename rclcpp::TypeAdapter<MessageT>::ros_message_type,
typename PublishedType = typename rclcpp::TypeAdapter<MessageT>::custom_type,
typename ROSMessageTypeAllocatorTraits = allocator::AllocRebind<ROSMessageType, AllocatorT>,
typename ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type,
typename ROSMessageTypeDeleter = allocator::Deleter<ROSMessageTypeAllocator, ROSMessageType>
>
typename
std::enable_if_t<
rclcpp::TypeAdapter<MessageT>::is_specialized::value &&
std::is_same<T, PublishedType>::value
>
add_blah_shared_msg_to_buffers(
std::shared_ptr<const T> message,
std::vector<uint64_t> subscription_ids,
ROSMessageTypeAllocator & allocator,
ROSMessageTypeDeleter & deleter)
{
// TODO(clalancette): This goes away once the subscription buffer can take PublishedTypes
auto ptr = ROSMessageTypeAllocatorTraits::allocate(allocator, 1);
ROSMessageTypeAllocatorTraits::construct(allocator, ptr);
auto shared_ros_msg = std::shared_ptr<ROSMessageType>(ptr, deleter);
rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(*message, *shared_ros_msg);
for (auto id : subscription_ids) {
auto subscription_it = subscriptions_.find(id);
if (subscription_it == subscriptions_.end()) {
throw std::runtime_error("subscription has unexpectedly gone out of scope");
}
auto subscription_base = subscription_it->second.lock();
if (subscription_base) {
auto subscription = std::dynamic_pointer_cast<
rclcpp::experimental::SubscriptionIntraProcessBuffer<ROSMessageType,
AllocatorT, ROSMessageTypeDeleter>
>(subscription_base);
if (nullptr == subscription) {
throw std::runtime_error(
"failed to dynamic cast SubscriptionIntraProcessBase to "
"SubscriptionIntraProcessBuffer<MessageT, Alloc, Deleter>, which "
"can happen when the publisher and subscription use different "
"allocator types, which is not supported");
}
subscription->provide_intra_process_message(shared_ros_msg);
} else {
subscriptions_.erase(id);
}
}
}
template<
typename MessageT,
typename Alloc = std::allocator<void>,
typename Deleter = std::default_delete<MessageT>>
void
typename Deleter = std::default_delete<MessageT>,
typename ROSMessageType = typename rclcpp::TypeAdapter<MessageT>::ros_message_type>
typename
std::enable_if_t<
rosidl_generator_traits::is_message<MessageT>::value &&
std::is_same<MessageT, ROSMessageType>::value
>
add_owned_msg_to_buffers(
std::unique_ptr<MessageT, Deleter> message,
std::vector<uint64_t> subscription_ids,
@@ -419,6 +674,74 @@ private:
}
}
template<
typename MessageT,
typename T,
typename AllocatorT,
typename ROSMessageType = typename rclcpp::TypeAdapter<MessageT>::ros_message_type,
typename PublishedType = typename rclcpp::TypeAdapter<MessageT>::custom_type,
typename ROSMessageTypeAllocatorTraits = allocator::AllocRebind<ROSMessageType, AllocatorT>,
typename ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type,
typename ROSMessageTypeDeleter = allocator::Deleter<ROSMessageTypeAllocator, ROSMessageType>
>
typename
std::enable_if_t<
rclcpp::TypeAdapter<MessageT>::is_specialized::value &&
std::is_same<T, PublishedType>::value
>
add_blah_owned_msg_to_buffers(
std::unique_ptr<T> message,
std::vector<uint64_t> subscription_ids,
ROSMessageTypeAllocator & allocator,
ROSMessageTypeDeleter & deleter)
{
std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter> unique_ros_msg{nullptr};
for (auto it = subscription_ids.begin(); it != subscription_ids.end(); it++) {
auto subscription_it = subscriptions_.find(*it);
if (subscription_it == subscriptions_.end()) {
throw std::runtime_error("subscription has unexpectedly gone out of scope");
}
auto subscription_base = subscription_it->second.lock();
if (subscription_base) {
auto subscription = std::dynamic_pointer_cast<
rclcpp::experimental::SubscriptionIntraProcessBuffer<ROSMessageType,
AllocatorT, ROSMessageTypeDeleter>
>(subscription_base);
if (nullptr == subscription) {
throw std::runtime_error(
"failed to dynamic cast SubscriptionIntraProcessBase to "
"SubscriptionIntraProcessBuffer<MessageT, Alloc, Deleter>, which "
"can happen when the publisher and subscription use different "
"allocator types, which is not supported");
}
if (unique_ros_msg == nullptr) {
auto ptr = ROSMessageTypeAllocatorTraits::allocate(allocator, 1);
ROSMessageTypeAllocatorTraits::construct(allocator, ptr);
unique_ros_msg = std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter>(ptr, deleter);
rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(*message, *unique_ros_msg);
}
if (std::next(it) == subscription_ids.end()) {
// If this is the last subscription, give up ownership
subscription->provide_intra_process_message(std::move(unique_ros_msg));
} else {
// Copy the message since we have additional subscriptions to serve
using MessageUniquePtr = std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter>;
MessageUniquePtr copy_message;
auto ptr = ROSMessageTypeAllocatorTraits::allocate(allocator, 1);
ROSMessageTypeAllocatorTraits::construct(allocator, ptr, *unique_ros_msg);
copy_message = MessageUniquePtr(ptr, deleter);
subscription->provide_intra_process_message(std::move(copy_message));
}
} else {
subscriptions_.erase(subscription_it);
}
}
}
PublisherToSubscriptionIdsMap pub_to_subs_;
SubscriptionMap subscriptions_;
PublisherMap publishers_;

View File

@@ -15,25 +15,22 @@
#ifndef RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_HPP_
#define RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_HPP_
#include <rmw/rmw.h>
#include <rmw/types.h>
#include <functional>
#include <map>
#include <memory>
#include <stdexcept>
#include <string>
#include <type_traits>
#include <utility>
#include "rcl/error_handling.h"
#include "rcl/types.h"
#include "rclcpp/any_subscription_callback.hpp"
#include "rclcpp/context.hpp"
#include "rclcpp/experimental/buffers/intra_process_buffer.hpp"
#include "rclcpp/experimental/create_intra_process_buffer.hpp"
#include "rclcpp/experimental/subscription_intra_process_base.hpp"
#include "rclcpp/experimental/subscription_intra_process_buffer.hpp"
#include "rclcpp/qos.hpp"
#include "rclcpp/type_support_decl.hpp"
#include "rclcpp/waitable.hpp"
#include "tracetools/tracetools.h"
namespace rclcpp
@@ -146,10 +143,10 @@ protected:
if (any_callback_.use_take_shared_method()) {
ConstMessageSharedPtr shared_msg = shared_ptr->first;
any_callback_.dispatch_intra_process(shared_msg, msg_info);
any_callback_.template dispatch_intra_process<T>(shared_msg, msg_info);
} else {
MessageUniquePtr unique_msg = std::move(shared_ptr->second);
any_callback_.dispatch_intra_process(std::move(unique_msg), msg_info);
any_callback_.template dispatch_intra_process<T>(std::move(unique_msg), msg_info);
}
shared_ptr.reset();
}

View File

@@ -15,15 +15,12 @@
#ifndef RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BASE_HPP_
#define RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BASE_HPP_
#include <rmw/rmw.h>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <utility>
#include "rcl/error_handling.h"
#include "rcl/guard_condition.h"
#include "rcl/wait.h"
#include "rclcpp/qos.hpp"
#include "rclcpp/type_support_decl.hpp"

View File

@@ -15,25 +15,20 @@
#ifndef RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BUFFER_HPP_
#define RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BUFFER_HPP_
#include <rmw/rmw.h>
#include <functional>
#include <map>
#include <memory>
#include <stdexcept>
#include <string>
#include <utility>
#include "rcl/error_handling.h"
#include "rcl/guard_condition.h"
#include "rcl/wait.h"
#include "rclcpp/any_subscription_callback.hpp"
#include "rclcpp/experimental/buffers/intra_process_buffer.hpp"
#include "rclcpp/experimental/create_intra_process_buffer.hpp"
#include "rclcpp/experimental/subscription_intra_process_base.hpp"
#include "rclcpp/qos.hpp"
#include "rclcpp/type_support_decl.hpp"
#include "rclcpp/waitable.hpp"
#include "tracetools/tracetools.h"
namespace rclcpp
{
@@ -131,7 +126,11 @@ protected:
trigger_guard_condition()
{
rcl_ret_t ret = rcl_trigger_guard_condition(&gc_);
(void)ret;
if (ret != RCL_RET_OK) {
rclcpp::exceptions::throw_from_rcl_error(
ret,
"Failed to trigger guard condition in intra-process buffer");
}
}
BufferUniquePtr buffer_;

View File

@@ -20,12 +20,16 @@
#include "rcl/publisher.h"
#include "rcl/subscription.h"
#include "rclcpp/callback_group.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/node_interfaces/node_base_interface.hpp"
#include "rclcpp/node_interfaces/node_timers_interface.hpp"
#include "rclcpp/node_interfaces/node_topics_interface.hpp"
#include "rclcpp/publisher.hpp"
#include "rclcpp/publisher_base.hpp"
#include "rclcpp/publisher_factory.hpp"
#include "rclcpp/qos.hpp"
#include "rclcpp/subscription_base.hpp"
#include "rclcpp/subscription_factory.hpp"
#include "rclcpp/visibility_control.hpp"
namespace rclcpp

View File

@@ -20,12 +20,14 @@
#include <memory>
#include <sstream>
#include <string>
#include <type_traits>
#include <utility>
#include "rcl/error_handling.h"
#include "rcl/publisher.h"
#include "rmw/error_handling.h"
#include "rmw/rmw.h"
#include "rosidl_runtime_cpp/traits.hpp"
#include "rclcpp/allocator/allocator_common.hpp"
#include "rclcpp/allocator/allocator_deleter.hpp"
@@ -253,7 +255,7 @@ public:
this->do_inter_process_publish(*msg);
return;
}
// If an interprocess subscription exist, then the unique_ptr is promoted
// If an interprocess subscription exists, then the unique_ptr is promoted
// to a shared_ptr and published.
// This allows doing the intraprocess publish first and then doing the
// interprocess publish, resulting in lower publish-to-subscribe latency.
@@ -263,10 +265,11 @@ public:
get_subscription_count() > get_intra_process_subscription_count();
if (inter_process_publish_needed) {
auto shared_msg = this->do_intra_process_publish_and_return_shared(std::move(msg));
auto shared_msg = this->do_intra_process_publish_and_return_shared<T, ROSMessageTypeDeleter>(
std::move(msg));
this->do_inter_process_publish(*shared_msg);
} else {
this->do_intra_process_publish(std::move(msg));
this->do_intra_process_publish<T, ROSMessageTypeDeleter>(std::move(msg));
}
}
@@ -319,13 +322,30 @@ public:
>
publish(std::unique_ptr<T, PublishedTypeDeleter> msg)
{
// TODO(wjwwood): later update this to give the unique_ptr to the intra
// process manager and let it decide if it needs to be converted or not.
// For now, convert it unconditionally and pass it the ROSMessageType
// publish function specialization.
auto unique_ros_msg = this->create_ros_message_unique_ptr();
rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(*msg, *unique_ros_msg);
this->publish(std::move(unique_ros_msg));
if (!intra_process_is_enabled_) {
// If we aren't using intraprocess at all, do the type conversion
// immediately and publish interprocess.
auto unique_ros_msg = this->create_ros_message_unique_ptr();
rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(*msg, *unique_ros_msg);
this->do_inter_process_publish(*unique_ros_msg);
return;
}
// If an interprocess subscription exists, then the unique_ptr is promoted
// to a shared_ptr and published.
// This allows doing the intraprocess publish first and then doing the
// interprocess publish, resulting in lower publish-to-subscribe latency.
// It's not possible to do that with an unique_ptr,
// as do_intra_process_publish takes the ownership of the message.
bool inter_process_publish_needed =
get_subscription_count() > get_intra_process_subscription_count();
if (inter_process_publish_needed) {
auto shared_msg = this->do_intra_process_publish_and_return_shared<T, PublishedTypeDeleter>(
std::move(msg));
this->do_inter_process_publish(*shared_msg);
} else {
this->do_intra_process_publish<T, PublishedTypeDeleter>(std::move(msg));
}
}
/// Publish a message on the topic.
@@ -346,12 +366,7 @@ public:
>
publish(const T & msg)
{
// TODO(wjwwood): later update this to give the unique_ptr to the intra
// process manager and let it decide if it needs to be converted or not.
// For now, convert it unconditionally and pass it the ROSMessageType
// publish function specialization.
// Avoid allocating when not using intra process.
// Avoid double allocation when not using intra process.
if (!intra_process_is_enabled_) {
// Convert to the ROS message equivalent and publish it.
ROSMessageType ros_msg;
@@ -359,13 +374,11 @@ public:
// In this case we're not using intra process.
return this->do_inter_process_publish(ros_msg);
}
// Otherwise we have to allocate memory in a unique_ptr, convert it,
// and pass it along.
// Otherwise we have to allocate memory in a unique_ptr and pass it along.
// As the message is not const, a copy should be made.
// A shared_ptr<const MessageT> could also be constructed here.
auto unique_ros_msg = this->create_ros_message_unique_ptr();
rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(msg, *unique_ros_msg);
this->publish(std::move(unique_ros_msg));
auto unique_msg = this->duplicate_type_adapt_message_as_unique_ptr(msg);
this->publish(std::move(unique_msg));
}
void
@@ -490,8 +503,9 @@ protected:
}
}
template<typename T, typename DeleterT>
void
do_intra_process_publish(std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter> msg)
do_intra_process_publish(std::unique_ptr<T, DeleterT> msg)
{
auto ipm = weak_ipm_.lock();
if (!ipm) {
@@ -502,15 +516,17 @@ protected:
throw std::runtime_error("cannot publish msg which is a null pointer");
}
ipm->template do_intra_process_publish<ROSMessageType, AllocatorT>(
ipm->template do_intra_process_publish<T, DeleterT, MessageT, AllocatorT>(
intra_process_publisher_id_,
std::move(msg),
ros_message_type_allocator_);
ros_message_type_allocator_,
ros_message_type_deleter_,
published_type_allocator_);
}
template<typename T, typename DeleterT>
std::shared_ptr<const ROSMessageType>
do_intra_process_publish_and_return_shared(
std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter> msg)
do_intra_process_publish_and_return_shared(std::unique_ptr<T, DeleterT> msg)
{
auto ipm = weak_ipm_.lock();
if (!ipm) {
@@ -521,11 +537,13 @@ protected:
throw std::runtime_error("cannot publish msg which is a null pointer");
}
return ipm->template do_intra_process_publish_and_return_shared<ROSMessageType,
return ipm->template do_intra_process_publish_and_return_shared<T, DeleterT, MessageT,
AllocatorT>(
intra_process_publisher_id_,
std::move(msg),
ros_message_type_allocator_);
ros_message_type_allocator_,
ros_message_type_deleter_,
published_type_allocator_);
}
/// Return a new unique_ptr using the ROSMessageType of the publisher.
@@ -537,7 +555,7 @@ protected:
return std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter>(ptr, ros_message_type_deleter_);
}
/// Duplicate a given ros message as a unique_ptr.
/// Duplicate a given ROS message as a unique_ptr.
std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter>
duplicate_ros_message_as_unique_ptr(const ROSMessageType & msg)
{
@@ -546,6 +564,15 @@ protected:
return std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter>(ptr, ros_message_type_deleter_);
}
/// Duplicate a given type adapted message as a unique_ptr.
std::unique_ptr<PublishedType, PublishedTypeDeleter>
duplicate_type_adapt_message_as_unique_ptr(const PublishedType & msg)
{
auto ptr = PublishedTypeAllocatorTraits::allocate(published_type_allocator_, 1);
PublishedTypeAllocatorTraits::construct(published_type_allocator_, ptr, msg);
return std::unique_ptr<PublishedType, PublishedTypeDeleter>(ptr, published_type_deleter_);
}
/// Copy of original options passed during construction.
/**
* It is important to save a copy of this so that the rmw payload which it

View File

@@ -26,14 +26,13 @@
#include "rclcpp/any_subscription_callback.hpp"
#include "rclcpp/get_message_type_support_handle.hpp"
#include "rclcpp/intra_process_buffer_type.hpp"
#include "rclcpp/node_interfaces/node_base_interface.hpp"
#include "rclcpp/qos.hpp"
#include "rclcpp/subscription.hpp"
#include "rclcpp/subscription_options.hpp"
#include "rclcpp/subscription_traits.hpp"
#include "rclcpp/visibility_control.hpp"
#include "rclcpp/topic_statistics/subscription_topic_statistics.hpp"
#include "rclcpp/visibility_control.hpp"
namespace rclcpp
{

View File

@@ -14,9 +14,18 @@
#include "rclcpp/node_interfaces/node_topics.hpp"
#include <stdexcept>
#include <string>
#include "rclcpp/callback_group.hpp"
#include "rclcpp/exceptions.hpp"
#include "rclcpp/node_interfaces/node_base_interface.hpp"
#include "rclcpp/node_interfaces/node_timers_interface.hpp"
#include "rclcpp/publisher_base.hpp"
#include "rclcpp/publisher_factory.hpp"
#include "rclcpp/subscription_base.hpp"
#include "rclcpp/subscription_factory.hpp"
#include "rclcpp/qos.hpp"
using rclcpp::exceptions::throw_from_rcl_error;

View File

@@ -86,10 +86,12 @@ TEST_F(TestAnySubscriptionCallback, unset_dispatch_throw) {
any_subscription_callback_.dispatch(msg_shared_ptr_, message_info_),
std::runtime_error);
EXPECT_THROW(
any_subscription_callback_.dispatch_intra_process(msg_shared_ptr_, message_info_),
any_subscription_callback_.template dispatch_intra_process<test_msgs::msg::Empty>(
msg_shared_ptr_, message_info_),
std::runtime_error);
EXPECT_THROW(
any_subscription_callback_.dispatch_intra_process(get_unique_ptr_msg(), message_info_),
any_subscription_callback_.template dispatch_intra_process<test_msgs::msg::Empty>(
get_unique_ptr_msg(), message_info_),
std::runtime_error);
}
@@ -203,13 +205,15 @@ format_parameter_with_ta(const ::testing::TestParamInfo<DispatchTestsWithTA::Par
/* Testing dispatch with shared_ptr<const MessageT> as input */ \
TEST_P(DispatchTests_name, test_intra_shared_dispatch) { \
auto any_subscription_callback_to_test = GetParam().get_any_subscription_callback_to_test(); \
any_subscription_callback_to_test.dispatch_intra_process(msg_shared_ptr_, message_info_); \
any_subscription_callback_to_test.template dispatch_intra_process<test_msgs::msg::Empty>( \
msg_shared_ptr_, message_info_); \
} \
\
/* Testing dispatch with unique_ptr<MessageT> as input */ \
TEST_P(DispatchTests_name, test_intra_unique_dispatch) { \
auto any_subscription_callback_to_test = GetParam().get_any_subscription_callback_to_test(); \
any_subscription_callback_to_test.dispatch_intra_process(get_unique_ptr_msg(), message_info_); \
any_subscription_callback_to_test.template dispatch_intra_process<test_msgs::msg::Empty>( \
get_unique_ptr_msg(), message_info_); \
}
PARAMETERIZED_TESTS(DispatchTests)

View File

@@ -116,6 +116,7 @@ public:
{
auto allocator = std::make_shared<Alloc>();
message_allocator_ = std::make_shared<MessageAlloc>(*allocator.get());
message_deleter_ = std::make_shared<MessageDeleter>();
}
// The following functions use the IntraProcessManager
@@ -123,6 +124,7 @@ public:
void publish(MessageUniquePtr msg);
std::shared_ptr<MessageAlloc> message_allocator_;
std::shared_ptr<MessageDeleter> message_deleter_;
};
} // namespace mock
@@ -328,9 +330,15 @@ void Publisher<T, Alloc>::publish(MessageUniquePtr msg)
throw std::runtime_error("cannot publish msg which is a null pointer");
}
ipm->template do_intra_process_publish<T, Alloc>(
using MessageAllocTraits = allocator::AllocRebind<T, Alloc>;
using MessageAlloc = typename MessageAllocTraits::allocator_type;
using MessageDeleter = allocator::Deleter<MessageAlloc, T>;
ipm->template do_intra_process_publish<T, MessageDeleter, T, Alloc>(
intra_process_publisher_id_,
std::move(msg),
*message_allocator_,
*message_deleter_,
*message_allocator_);
}

View File

@@ -34,14 +34,6 @@
#include "rclcpp/msg/string.hpp"
#ifdef RMW_IMPLEMENTATION
# define CLASSNAME_(NAME, SUFFIX) NAME ## __ ## SUFFIX
# define CLASSNAME(NAME, SUFFIX) CLASSNAME_(NAME, SUFFIX)
#else
# define CLASSNAME(NAME, SUFFIX) NAME
#endif
using namespace std::chrono_literals;
static const int g_max_loops = 200;
@@ -50,15 +42,17 @@ static const std::chrono::milliseconds g_sleep_per_loop(10);
class TestPublisher : public ::testing::Test
{
public:
protected:
static void SetUpTestCase()
{
if (!rclcpp::ok()) {
rclcpp::init(0, nullptr);
}
rclcpp::init(0, nullptr);
}
static void TearDownTestCase()
{
rclcpp::shutdown();
}
protected:
void initialize(const rclcpp::NodeOptions & node_options = rclcpp::NodeOptions())
{
node = std::make_shared<rclcpp::Node>("my_node", "/ns", node_options);
@@ -72,20 +66,6 @@ protected:
rclcpp::Node::SharedPtr node;
};
class CLASSNAME (test_intra_process_within_one_node, RMW_IMPLEMENTATION) : public ::testing::Test
{
public:
static void SetUpTestCase()
{
rclcpp::init(0, nullptr);
}
static void TearDownTestCase()
{
rclcpp::shutdown();
}
};
namespace rclcpp
{
@@ -146,44 +126,88 @@ struct TypeAdapter<int, rclcpp::msg::String>
/*
* Testing publisher creation signatures with a type adapter.
*/
TEST_F(TestPublisher, various_creation_signatures) {
for (auto is_intra_process : {true, false}) {
rclcpp::NodeOptions options;
options.use_intra_process_comms(is_intra_process);
initialize(options);
{
using StringTypeAdapter = rclcpp::TypeAdapter<std::string, rclcpp::msg::String>;
auto publisher = node->create_publisher<StringTypeAdapter>("topic", 42);
(void)publisher;
}
{
using StringTypeAdapter = rclcpp::adapt_type<std::string>::as<rclcpp::msg::String>;
auto publisher = node->create_publisher<StringTypeAdapter>("topic", 42);
(void)publisher;
}
TEST_F(TestPublisher, creation_signatures_with_intra) {
rclcpp::NodeOptions options;
options.use_intra_process_comms(true);
initialize(options);
{
using StringTypeAdapter = rclcpp::TypeAdapter<std::string, rclcpp::msg::String>;
auto publisher = node->create_publisher<StringTypeAdapter>("topic", 42);
(void)publisher;
ASSERT_EQ(node->count_publishers("topic"), 1u);
}
{
using StringTypeAdapter = rclcpp::adapt_type<std::string>::as<rclcpp::msg::String>;
auto publisher = node->create_publisher<StringTypeAdapter>("topic", 42);
(void)publisher;
ASSERT_EQ(node->count_publishers("topic"), 1u);
}
}
/*
* Testing publisher creation signatures with a type adapter.
*/
TEST_F(TestPublisher, creation_signatures_without_intra) {
rclcpp::NodeOptions options;
options.use_intra_process_comms(false);
initialize(options);
{
using StringTypeAdapter = rclcpp::TypeAdapter<std::string, rclcpp::msg::String>;
auto publisher = node->create_publisher<StringTypeAdapter>("topic", 42);
(void)publisher;
ASSERT_EQ(node->count_publishers("topic"), 1u);
}
{
using StringTypeAdapter = rclcpp::adapt_type<std::string>::as<rclcpp::msg::String>;
auto publisher = node->create_publisher<StringTypeAdapter>("topic", 42);
(void)publisher;
ASSERT_EQ(node->count_publishers("topic"), 1u);
}
}
/*
* Testing that conversion errors are passed up.
*/
TEST_F(TestPublisher, conversion_exception_is_passed_up) {
TEST_F(TestPublisher, conversion_exception_is_passed_up_with_intra) {
using BadStringTypeAdapter = rclcpp::TypeAdapter<int, rclcpp::msg::String>;
for (auto is_intra_process : {true, false}) {
rclcpp::NodeOptions options;
options.use_intra_process_comms(is_intra_process);
initialize(options);
auto pub = node->create_publisher<BadStringTypeAdapter>("topic_name", 1);
EXPECT_THROW(pub->publish(1), std::runtime_error);
}
rclcpp::NodeOptions options;
options.use_intra_process_comms(true);
initialize(options);
auto pub = node->create_publisher<BadStringTypeAdapter>("topic_name", 1);
auto callback =
[](
const rclcpp::msg::String::ConstSharedPtr msg,
const rclcpp::MessageInfo & message_info
) -> void
{
(void)msg;
(void)message_info;
};
// When we are doing type adaptation with intra-process comms, we have to have a subscriber
// for the 'convert_to_ros_msg' method in the TypeAdapter to be called. That's because the
// intra-process manager delays doing the type conversion until it actually adds the message
// to the subscription's internal buffers. No subscriptions -> no conversions. We add a
// dummy subscription here just to trigger the conversion.
auto sub = node->create_subscription<rclcpp::msg::String>("topic_name", 1, callback);
EXPECT_THROW(pub->publish(1), std::runtime_error);
}
TEST_F(TestPublisher, conversion_exception_is_passed_up_without_intra) {
using BadStringTypeAdapter = rclcpp::TypeAdapter<int, rclcpp::msg::String>;
rclcpp::NodeOptions options;
options.use_intra_process_comms(false);
initialize(options);
auto pub = node->create_publisher<BadStringTypeAdapter>("topic_name", 1);
EXPECT_THROW(pub->publish(1), std::runtime_error);
}
/*
* Testing that publisher sends type adapted types and ROS message types with intra proccess communications.
*/
TEST_F(
CLASSNAME(test_intra_process_within_one_node, RMW_IMPLEMENTATION),
check_type_adapted_message_is_sent_and_received_intra_process) {
TEST_F(TestPublisher, check_type_adapted_message_is_sent_and_received_intra_process) {
using StringTypeAdapter = rclcpp::TypeAdapter<std::string, rclcpp::msg::String>;
const std::string message_data = "Message Data";
const std::string topic_name = "topic_name";
@@ -200,13 +224,14 @@ TEST_F(
ASSERT_TRUE(message_info.get_rmw_message_info().from_intra_process);
};
auto node = rclcpp::Node::make_shared(
"test_intra_process",
rclcpp::NodeOptions().use_intra_process_comms(true));
rclcpp::NodeOptions options;
options.use_intra_process_comms(true);
initialize(options);
auto pub = node->create_publisher<StringTypeAdapter>(topic_name, 10);
auto sub = node->create_subscription<rclcpp::msg::String>(topic_name, 1, callback);
auto wait_for_message_to_be_received = [&is_received, &node]() {
auto wait_for_message_to_be_received = [this, &is_received]() {
rclcpp::executors::SingleThreadedExecutor executor;
int i = 0;
executor.add_node(node);

View File

@@ -50,15 +50,17 @@ static const std::chrono::milliseconds g_sleep_per_loop(10);
class TestSubscription : public ::testing::Test
{
public:
protected:
static void SetUpTestCase()
{
if (!rclcpp::ok()) {
rclcpp::init(0, nullptr);
}
rclcpp::init(0, nullptr);
}
static void TearDownTestCase()
{
rclcpp::shutdown();
}
protected:
void initialize(const rclcpp::NodeOptions & node_options = rclcpp::NodeOptions())
{
node = std::make_shared<rclcpp::Node>("my_node", "/ns", node_options);
@@ -72,22 +74,6 @@ protected:
rclcpp::Node::SharedPtr node;
};
class CLASSNAME (test_intra_process_within_one_node, RMW_IMPLEMENTATION) : public ::testing::Test
{
public:
static void SetUpTestCase()
{
if (!rclcpp::ok()) {
rclcpp::init(0, nullptr);
}
}
static void TearDownTestCase()
{
rclcpp::shutdown();
}
};
namespace rclcpp
{
@@ -154,28 +140,29 @@ TEST_F(TestSubscription, various_creation_signatures) {
auto sub =
node->create_subscription<StringTypeAdapter>("topic", 42, [](const std::string &) {});
(void)sub;
ASSERT_EQ(node->count_subscribers("topic"), 1u);
}
{
using StringTypeAdapter = rclcpp::adapt_type<std::string>::as<rclcpp::msg::String>;
auto sub =
node->create_subscription<StringTypeAdapter>("topic", 42, [](const std::string &) {});
(void)sub;
ASSERT_EQ(node->count_subscribers("topic"), 1u);
}
}
/*
* Testing that subscriber receives type adapted types and ROS message types with intra proccess communications.
*/
TEST_F(
CLASSNAME(test_intra_process_within_one_node, RMW_IMPLEMENTATION),
check_type_adapted_messages_are_received_by_intra_process_subscription) {
TEST_F(TestSubscription, check_type_adapted_messages_are_received_by_intra_process_subscription) {
using StringTypeAdapter = rclcpp::TypeAdapter<std::string, rclcpp::msg::String>;
const std::string message_data = "Message Data";
const std::string topic_name = "topic_name";
auto node = rclcpp::Node::make_shared(
"test_intra_process",
rclcpp::NodeOptions().use_intra_process_comms(true));
rclcpp::NodeOptions options;
options.use_intra_process_comms(true);
initialize(options);
auto pub = node->create_publisher<rclcpp::msg::String>(topic_name, 1);
rclcpp::msg::String msg;
@@ -385,16 +372,15 @@ TEST_F(
/*
* Testing that subscriber receives type adapted types and ROS message types with inter proccess communications.
*/
TEST_F(
CLASSNAME(test_intra_process_within_one_node, RMW_IMPLEMENTATION),
check_type_adapted_messages_are_received_by_inter_process_subscription) {
TEST_F(TestSubscription, check_type_adapted_messages_are_received_by_inter_process_subscription) {
using StringTypeAdapter = rclcpp::TypeAdapter<std::string, rclcpp::msg::String>;
const std::string message_data = "Message Data";
const std::string topic_name = "topic_name";
auto node = rclcpp::Node::make_shared(
"test_intra_process",
rclcpp::NodeOptions().use_intra_process_comms(false));
rclcpp::NodeOptions options;
options.use_intra_process_comms(false);
initialize(options);
auto pub = node->create_publisher<rclcpp::msg::String>(topic_name, 1);
rclcpp::msg::String msg;