Compare commits

...

45 Commits

Author SHA1 Message Date
Karsten Knese
1cd811a412 make it compile on osx
Signed-off-by: Karsten Knese <karsten@openrobotics.org>
2020-04-17 15:02:13 -07:00
Joshua Hampp
241164204d fixed include order
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
1f8226a6f3 line break
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
511476c445 fixed check
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
408343fb96 renamed variable
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
ff7a81f5dd reuse constructor
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
4fa5f090ed fixed memory loss
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
ab73a62297 updated loop signature
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
5472d363b1 added comment
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
35fd4908f8 fixed year in header
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
93dc82699b added original constructor to LifcyclePublisher
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
b482164b97 moved serialization and serialized message out of "experimental"
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
a2fc8bd867 enabled publishing of rcl_serialized_message_t
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Fabian König
9b436fa777 fix include guards
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
2147026fbf initializing variables
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
2cefdca297 added backwards compatibility for publishing serialized messages
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
4c7506f5aa uncrustify
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
18181f5d8d changes due to review of PR
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
9b65b7cefd * default deleter
* initiazlize variables

Co-Authored-By: Karsten Knese <Karsten1987@users.noreply.github.com>
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
DensoADAS
657d9a0be4 Update rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp
Co-Authored-By: Karsten Knese <Karsten1987@users.noreply.github.com>
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
DensoADAS
7cff1aa85b Update rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp
Co-Authored-By: Karsten Knese <Karsten1987@users.noreply.github.com>
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
d4536b3036 updated to rclcpp::ok
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
d74c4edac1 removed unnecessary include
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
d8f1da9c88 adapted to multi waitables for ipm
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
4e1e744624 removed unnecessary dependency
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
2ddbe32469 added missing member from rebase
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
4c80594a6f fixed code style
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
bc228ee032 uncrustified file
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
0c83e4981d updated test_intra_process_communication:
* fixed allocation counter
 * updated data type naming

Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
af14ee76be * fixed code style
* updated error messages
* added const modifier

Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
d7a84eaa66 changed timing
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
a3db0069b6 beautified error output
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
5faf8f40e7 changed include notation
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
02921427f7 added unit test for serialized intra process communication
* test memory (de)allocation
 * test communication channels between serialized and unserialized content

Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
56b520eb95 extended "create_subscription" and "create_subscriptioncreate_publisher_factory" to pass message type
* backwards compatible
 * allows creation of publisher with type "rcl_serialized_message_t"

Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
f9ef2c2fd5 extended "Subscription" for serialized messages:
* implemented second communication channel for serialized intra process messages

Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
29fea926af extended "create_publisher" and "create_publisher_factory" to pass message type
* backwards compatible
 * allows creation of publisher with type "rcl_serialized_message_t"

Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
c8522b3cd6 updated "LifcecylePublisher" for serialized messages
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
9d7ef52885 extended "Publisher" for serialized messages:
* implemented second communication channel for serialized intra process messages
 * extend publish to handle "rcl_serialized_message_t"
 * changed behaviour of serialized message publishing by taking ownership of message (for deletion)

Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
86bd3a3c80 extended "Publisher" for serialized messages:
* implemented second communication channel for serialized intra process messages
extended "Publisher" for serialized messages:
 * pass message type by argument
 * added constructor for backwards compatibility (moved common code in separate methods "init_setup")
 * added allocator for serialized messages

Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
a300c2f4ae added secon communication channel for intra process communication for serialized communication in publisher base and subscriber base (adapted waitables)
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:11 -07:00
Joshua Hampp
9f1c10f1c4 extended "SubscriptionIntraProcess" for serialized communication by:
* distinguish between content type and callback type
 * use "SerializedContainer" for serialized messages for memory deletion
 * added specialized methods for combinations of (un)serialized content/callback
 * automatically (de)serialize messages
 * allowed communication types are now (MessageT==CallbackMessageT || MessageT==SerializedContainer || CallbackMessageT==rcl_serialized_message_t)

Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:11 -07:00
Joshua Hampp
f3aab4f6c5 extended "IntraProcessManager" for serialized messages:
* added flag for serialized communication
 * check for flag in "can_communicate"

Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:11 -07:00
Joshua Hampp
c8162ba861 added "SerializedContainer" as container of rcl_serialized_message_t, for addind a deleter to ease up memory handling
further features:
 * copy constructor (allowing static memory allocation, e.g. if for static memory allocation in device driver)
 * destructor

Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:11 -07:00
Joshua Hampp
9b73845f89 added "SerializationBase" and "Serialization" to convert a ROS2 message to rcl_serialized_message_t and vice versa
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:11 -07:00
21 changed files with 1134 additions and 143 deletions

View File

@@ -245,6 +245,18 @@ if(BUILD_TESTING)
"test_msgs"
)
target_link_libraries(test_loaned_message ${PROJECT_NAME})
ament_add_gtest(test_intra_process_communication test/test_intra_process_communication.cpp
TIMEOUT 120)
if(TARGET test_intra_process_communication)
ament_target_dependencies(test_intra_process_communication
"rcl"
"rcl_interfaces"
"rmw"
"rosidl_typesupport_cpp"
"test_msgs"
)
endif()
target_link_libraries(test_intra_process_communication ${PROJECT_NAME})
ament_add_gtest(test_node test/test_node.cpp TIMEOUT 240)
if(TARGET test_node)

View File

@@ -43,6 +43,7 @@ std::shared_ptr<PublisherT>
create_publisher(
NodeT & node,
const std::string & topic_name,
const rosidl_message_type_support_t & type_support,
const rclcpp::QoS & qos,
const rclcpp::PublisherOptionsWithAllocator<AllocatorT> & options = (
rclcpp::PublisherOptionsWithAllocator<AllocatorT>()
@@ -56,7 +57,7 @@ create_publisher(
// Create the publisher.
auto pub = node_topics->create_publisher(
topic_name,
rclcpp::create_publisher_factory<MessageT, AllocatorT, PublisherT>(options),
rclcpp::create_publisher_factory<MessageT, AllocatorT, PublisherT>(options, type_support),
qos
);
@@ -66,6 +67,28 @@ create_publisher(
return std::dynamic_pointer_cast<PublisherT>(pub);
}
template<
typename MessageT,
typename AllocatorT = std::allocator<void>,
typename PublisherT = rclcpp::Publisher<MessageT, AllocatorT>,
typename NodeT>
std::shared_ptr<PublisherT>
create_publisher(
NodeT & node,
const std::string & topic_name,
const rclcpp::QoS & qos,
const rclcpp::PublisherOptionsWithAllocator<AllocatorT> & options = (
rclcpp::PublisherOptionsWithAllocator<AllocatorT>()
)
)
{
const auto type_support = *rosidl_typesupport_cpp::get_message_type_support_handle<MessageT>();
return create_publisher<MessageT, AllocatorT, PublisherT, NodeT>(
node, topic_name, type_support,
qos, options);
}
} // namespace rclcpp
#endif // RCLCPP__CREATE_PUBLISHER_HPP_

View File

@@ -51,14 +51,13 @@ typename std::shared_ptr<SubscriptionT>
create_subscription(
NodeT && node,
const std::string & topic_name,
const rosidl_message_type_support_t & type_support,
const rclcpp::QoS & qos,
CallbackT && callback,
const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & options = (
rclcpp::SubscriptionOptionsWithAllocator<AllocatorT>()
),
typename MessageMemoryStrategyT::SharedPtr msg_mem_strat = (
const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & options =
rclcpp::SubscriptionOptionsWithAllocator<AllocatorT>(),
typename MessageMemoryStrategyT::SharedPtr msg_mem_strat =
MessageMemoryStrategyT::create_default()
)
)
{
using rclcpp::node_interfaces::get_node_topics_interface;
@@ -67,7 +66,8 @@ create_subscription(
auto factory = rclcpp::create_subscription_factory<MessageT>(
std::forward<CallbackT>(callback),
options,
msg_mem_strat
msg_mem_strat,
type_support
);
auto sub = node_topics->create_subscription(topic_name, factory, qos);
@@ -76,6 +76,39 @@ create_subscription(
return std::dynamic_pointer_cast<SubscriptionT>(sub);
}
template<
typename MessageT,
typename CallbackT,
typename AllocatorT = std::allocator<void>,
typename CallbackMessageT =
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type,
typename SubscriptionT = rclcpp::Subscription<CallbackMessageT, AllocatorT>,
typename MessageMemoryStrategyT = rclcpp::message_memory_strategy::MessageMemoryStrategy<
CallbackMessageT,
AllocatorT
>,
typename NodeT>
typename std::shared_ptr<SubscriptionT>
create_subscription(
NodeT && node,
const std::string & topic_name,
const rclcpp::QoS & qos,
CallbackT && callback,
const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & options =
rclcpp::SubscriptionOptionsWithAllocator<AllocatorT>(),
typename MessageMemoryStrategyT::SharedPtr msg_mem_strat =
MessageMemoryStrategyT::create_default()
)
{
const auto type_support = *rosidl_typesupport_cpp::get_message_type_support_handle<MessageT>();
return create_subscription<MessageT, CallbackT, AllocatorT, CallbackMessageT, SubscriptionT,
MessageMemoryStrategyT>(
std::forward<NodeT>(
node), topic_name, type_support, qos, std::forward<CallbackT>(
callback), options, msg_mem_strat);
}
} // namespace rclcpp
#endif // RCLCPP__CREATE_SUBSCRIPTION_HPP_

View File

@@ -110,11 +110,14 @@ public:
* In addition this generates a unique intra process id for the subscription.
*
* \param subscription the SubscriptionIntraProcess to register.
* \param is_serialized true if the buffer expects serialized messages
* \return an unsigned 64-bit integer which is the subscription's unique id.
*/
RCLCPP_PUBLIC
uint64_t
add_subscription(rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription);
add_subscription(
rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription,
const bool is_serialized = false);
/// Unregister a subscription using the subscription's unique id.
/**
@@ -134,11 +137,12 @@ public:
* In addition this generates a unique intra process id for the publisher.
*
* \param publisher publisher to be registered with the manager.
* \param is_serialized true if the buffer expects serialized messages
* \return an unsigned 64-bit integer which is the publisher's unique id.
*/
RCLCPP_PUBLIC
uint64_t
add_publisher(rclcpp::PublisherBase::SharedPtr publisher);
add_publisher(rclcpp::PublisherBase::SharedPtr publisher, const bool is_serialized = false);
/// Unregister a publisher using the publisher's unique id.
/**
@@ -311,6 +315,7 @@ private:
rmw_qos_profile_t qos;
const char * topic_name;
bool use_take_shared_method;
bool is_serialized;
};
struct PublisherInfo
@@ -320,6 +325,7 @@ private:
rclcpp::PublisherBase::WeakPtr publisher;
rmw_qos_profile_t qos;
const char * topic_name;
bool is_serialized;
};
struct SplittedSubscriptions

View File

@@ -28,6 +28,8 @@
#include "rclcpp/experimental/buffers/intra_process_buffer.hpp"
#include "rclcpp/experimental/create_intra_process_buffer.hpp"
#include "rclcpp/experimental/subscription_intra_process_base.hpp"
#include "rclcpp/serialization.hpp"
#include "rclcpp/serialized_message.hpp"
#include "rclcpp/type_support_decl.hpp"
#include "rclcpp/waitable.hpp"
#include "tracetools/tracetools.h"
@@ -37,6 +39,8 @@ namespace rclcpp
namespace experimental
{
class SerializedMessage;
template<
typename MessageT,
typename Alloc = std::allocator<void>,
@@ -51,6 +55,10 @@ public:
using MessageAlloc = typename MessageAllocTraits::allocator_type;
using ConstMessageSharedPtr = std::shared_ptr<const MessageT>;
using MessageUniquePtr = std::unique_ptr<MessageT, Deleter>;
using CallbackMessageAllocTraits = allocator::AllocRebind<CallbackMessageT, Alloc>;
using CallbackMessageAlloc = typename CallbackMessageAllocTraits::allocator_type;
using CallbackMessageUniquePtr = std::unique_ptr<CallbackMessageT>;
using CallbackMessageSharedPtr = std::shared_ptr<CallbackMessageT>;
using BufferUniquePtr = typename rclcpp::experimental::buffers::IntraProcessBuffer<
MessageT,
@@ -64,11 +72,15 @@ public:
rclcpp::Context::SharedPtr context,
const std::string & topic_name,
rmw_qos_profile_t qos_profile,
rclcpp::IntraProcessBufferType buffer_type)
rclcpp::IntraProcessBufferType buffer_type,
std::shared_ptr<SerializationBase> serializer)
: SubscriptionIntraProcessBase(topic_name, qos_profile),
any_callback_(callback)
any_callback_(callback), serializer_(serializer)
{
if (!std::is_same<MessageT, CallbackMessageT>::value) {
if (!std::is_same<MessageT, CallbackMessageT>::value &&
!std::is_same<MessageT, rclcpp::SerializedMessage>::value &&
!std::is_same<CallbackMessageT, rcl_serialized_message_t>::value)
{
throw std::runtime_error("SubscriptionIntraProcess wrong callback type");
}
@@ -142,18 +154,47 @@ private:
(void)ret;
}
// convert from ROS2 message to rcl_serialized_message_t (serilizatino needed)
template<typename T>
typename std::enable_if<std::is_same<T, rcl_serialized_message_t>::value, void>::type
typename std::enable_if<
std::is_same<T, rcl_serialized_message_t>::value &&
!std::is_same<MessageT, rclcpp::SerializedMessage>::value,
void>::type
execute_impl()
{
throw std::runtime_error("Subscription intra-process can't handle serialized messages");
if (nullptr == serializer_) {
throw std::runtime_error("Subscription intra-process can't handle serialized messages");
}
rmw_message_info_t msg_info = {};
msg_info.from_intra_process = true;
ConstMessageSharedPtr msg = buffer_->consume_shared();
auto serialized_msg =
serializer_->serialize_message(reinterpret_cast<const void *>(msg.get()));
if (nullptr == serialized_msg) {
throw std::runtime_error("Subscription intra-process could not serialize message");
}
if (any_callback_.use_take_shared_method()) {
any_callback_.dispatch_intra_process(serialized_msg, msg_info);
} else {
throw std::runtime_error(
"Subscription intra-process for serialized "
"messages does not support unique pointers.");
}
}
// forward from ROS2 message to ROS2 message (same type)
template<class T>
typename std::enable_if<!std::is_same<T, rcl_serialized_message_t>::value, void>::type
typename std::enable_if<
!std::is_same<T, rcl_serialized_message_t>::value &&
!std::is_same<MessageT, rclcpp::SerializedMessage>::value,
void>::type
execute_impl()
{
rmw_message_info_t msg_info;
rmw_message_info_t msg_info = {};
msg_info.publisher_gid = {0, {0}};
msg_info.from_intra_process = true;
@@ -166,8 +207,80 @@ private:
}
}
// forward from rcl_serialized_message_t to SerializationMessage (no conversion needed)
template<typename T>
typename std::enable_if<
std::is_same<T, rcl_serialized_message_t>::value &&
std::is_same<MessageT, rclcpp::SerializedMessage>::value,
void>::type
execute_impl()
{
rmw_message_info_t msg_info = {};
msg_info.from_intra_process = true;
if (any_callback_.use_take_shared_method()) {
ConstMessageSharedPtr msg = buffer_->consume_shared();
if (msg == nullptr) {
throw std::runtime_error("Subscription intra-process could not get serialized message");
}
any_callback_.dispatch_intra_process(msg, msg_info);
} else {
throw std::runtime_error(
"Subscription intra-process for serialized "
"messages does not support unique pointers.");
}
}
// convert from rcl_serialized_message_t to ROS2 message (deserialization needed)
template<class T>
typename std::enable_if<
!std::is_same<T, rcl_serialized_message_t>::value &&
std::is_same<MessageT, rclcpp::SerializedMessage>::value,
void>::type
execute_impl()
{
if (nullptr == serializer_) {
throw std::runtime_error("Subscription intra-process can't handle unserialized messages");
}
ConstMessageSharedPtr serialized_container = buffer_->consume_shared();
if (nullptr == serialized_container) {
throw std::runtime_error("Subscription intra-process could not get serialized message");
}
rmw_message_info_t msg_info = {};
msg_info.from_intra_process = true;
if (any_callback_.use_take_shared_method()) {
CallbackMessageSharedPtr msg = construct_unique();
serializer_->deserialize_message(
serialized_container.get(),
reinterpret_cast<void *>(msg.get()));
any_callback_.dispatch_intra_process(msg, msg_info);
} else {
CallbackMessageUniquePtr msg = construct_unique();
serializer_->deserialize_message(
serialized_container.get(),
reinterpret_cast<void *>(msg.get()));
any_callback_.dispatch_intra_process(std::move(msg), msg_info);
}
}
CallbackMessageUniquePtr construct_unique()
{
CallbackMessageUniquePtr unique_msg;
auto ptr = CallbackMessageAllocTraits::allocate(*message_allocator_.get(), 1);
CallbackMessageAllocTraits::construct(*message_allocator_.get(), ptr);
unique_msg = CallbackMessageUniquePtr(ptr);
return unique_msg;
}
AnySubscriptionCallback<CallbackMessageT, Alloc> any_callback_;
BufferUniquePtr buffer_;
std::shared_ptr<SerializationBase> serializer_;
std::shared_ptr<CallbackMessageAlloc> message_allocator_ =
std::make_shared<CallbackMessageAlloc>();
};
} // namespace experimental

View File

@@ -56,6 +56,12 @@ public:
using MessageDeleter = allocator::Deleter<MessageAllocator, MessageT>;
using MessageUniquePtr = std::unique_ptr<MessageT, MessageDeleter>;
using MessageSharedPtr = std::shared_ptr<const MessageT>;
using SerializedMessageAllocatorTraits =
allocator::AllocRebind<rclcpp::SerializedMessage,
AllocatorT>;
using SerializedMessageAllocator = typename SerializedMessageAllocatorTraits::allocator_type;
using SerializedMessageDeleter = allocator::Deleter<SerializedMessageAllocator,
rclcpp::SerializedMessage>;
RCLCPP_SMART_PTR_DEFINITIONS(Publisher<MessageT, AllocatorT>)
@@ -70,37 +76,28 @@ public:
*rosidl_typesupport_cpp::get_message_type_support_handle<MessageT>(),
options.template to_rcl_publisher_options<MessageT>(qos)),
options_(options),
message_allocator_(new MessageAllocator(*options.get_allocator().get()))
message_allocator_(new MessageAllocator(*options.get_allocator().get())),
message_allocator_serialized_(new SerializedMessageAllocator(*options.get_allocator().get()))
{
allocator::set_allocator_for_deleter(&message_deleter_, message_allocator_.get());
init_setup();
}
if (options_.event_callbacks.deadline_callback) {
this->add_event_handler(
options_.event_callbacks.deadline_callback,
RCL_PUBLISHER_OFFERED_DEADLINE_MISSED);
}
if (options_.event_callbacks.liveliness_callback) {
this->add_event_handler(
options_.event_callbacks.liveliness_callback,
RCL_PUBLISHER_LIVELINESS_LOST);
}
if (options_.event_callbacks.incompatible_qos_callback) {
this->add_event_handler(
options_.event_callbacks.incompatible_qos_callback,
RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS);
} else if (options_.use_default_callbacks) {
// Register default callback when not specified
try {
this->add_event_handler(
[this](QOSOfferedIncompatibleQoSInfo & info) {
this->default_incompatible_qos_callback(info);
},
RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS);
} catch (UnsupportedEventTypeException & /*exc*/) {
// pass
}
}
// Setup continues in the post construction method, post_init_setup().
Publisher(
rclcpp::node_interfaces::NodeBaseInterface * node_base,
const std::string & topic,
const rclcpp::QoS & qos,
const rclcpp::PublisherOptionsWithAllocator<AllocatorT> & options,
const rosidl_message_type_support_t & type_support)
: PublisherBase(
node_base,
topic,
type_support,
options.template to_rcl_publisher_options<MessageT>(qos)),
options_(options),
message_allocator_(new MessageAllocator(*options.get_allocator().get())),
message_allocator_serialized_(new SerializedMessageAllocator(*options.get_allocator().get()))
{
init_setup();
}
/// Called post construction, so that construction may continue after shared_from_this() works.
@@ -135,8 +132,11 @@ public:
"intraprocess communication allowed only with volatile durability");
}
uint64_t intra_process_publisher_id = ipm->add_publisher(this->shared_from_this());
uint64_t intra_process_publisher_id_serialized = ipm->add_publisher(
this->shared_from_this(), true);
this->setup_intra_process(
intra_process_publisher_id,
intra_process_publisher_id_serialized,
ipm);
}
}
@@ -173,6 +173,11 @@ public:
virtual void
publish(std::unique_ptr<MessageT, MessageDeleter> msg)
{
if (std::is_same<MessageT, rcl_serialized_message_t>::value) {
this->template publish<MessageDeleter>(std::move(msg));
return;
}
if (!intra_process_is_enabled_) {
this->do_inter_process_publish(*msg);
return;
@@ -187,34 +192,41 @@ public:
get_subscription_count() > get_intra_process_subscription_count();
if (inter_process_publish_needed) {
auto shared_msg = this->do_intra_process_publish_and_return_shared(std::move(msg));
auto shared_msg = this->do_intra_process_publish_and_return_shared(
std::move(msg), message_allocator_);
this->do_inter_process_publish(*shared_msg);
} else {
this->do_intra_process_publish(std::move(msg));
this->do_intra_process_publish(std::move(msg), message_allocator_);
}
}
virtual void
publish(const MessageT & msg)
{
// Avoid allocating when not using intra process.
if (!intra_process_is_enabled_) {
// In this case we're not using intra process.
return this->do_inter_process_publish(msg);
}
// Otherwise we have to allocate memory in a unique_ptr and pass it along.
// As the message is not const, a copy should be made.
// A shared_ptr<const MessageT> could also be constructed here.
auto ptr = MessageAllocatorTraits::allocate(*message_allocator_.get(), 1);
MessageAllocatorTraits::construct(*message_allocator_.get(), ptr, msg);
MessageUniquePtr unique_msg(ptr, message_deleter_);
this->publish(std::move(unique_msg));
this->do_publish_message(msg);
}
void
template<class T = MessageT>
typename std::enable_if<!std::is_same<T, rcl_serialized_message_t>::value>::type
publish(const rcl_serialized_message_t & serialized_msg)
{
return this->do_serialized_publish(&serialized_msg);
this->do_publish_message<rcl_serialized_message_t>(serialized_msg);
}
/// Publish a serialized message. Non specialized version to prevent compiling errors.
template<typename TDeleter, typename T>
void publish(std::unique_ptr<T, TDeleter> serialized_msg)
{
(void)serialized_msg;
throw std::runtime_error(
"publishing unique_ptr with custom deleter only supported for serialized messages");
}
/// Publish a serialized message.
template<typename TDeleter>
void publish(std::unique_ptr<rcl_serialized_message_t, TDeleter> serialized_msg)
{
this->do_serialized_publish(*serialized_msg);
}
/// Publish an instance of a LoanedMessage.
@@ -259,6 +271,69 @@ public:
}
protected:
void init_setup()
{
allocator::set_allocator_for_deleter(&message_deleter_, message_allocator_.get());
if (options_.event_callbacks.deadline_callback) {
this->add_event_handler(
options_.event_callbacks.deadline_callback,
RCL_PUBLISHER_OFFERED_DEADLINE_MISSED);
}
if (options_.event_callbacks.liveliness_callback) {
this->add_event_handler(
options_.event_callbacks.liveliness_callback,
RCL_PUBLISHER_LIVELINESS_LOST);
}
if (options_.event_callbacks.incompatible_qos_callback) {
this->add_event_handler(
options_.event_callbacks.incompatible_qos_callback,
RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS);
} else if (options_.use_default_callbacks) {
// Register default callback when not specified
try {
this->add_event_handler(
[this](QOSOfferedIncompatibleQoSInfo & info) {
this->default_incompatible_qos_callback(info);
},
RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS);
} catch (UnsupportedEventTypeException & /*exc*/) {
RCLCPP_WARN_ONCE(
rclcpp::get_logger(rcl_node_get_logger_name(rcl_node_handle_.get())),
"This rmw implementation does not support ON_OFFERED_INCOMPATIBLE_QOS "
"events, you will not be notified when Publishers offer an incompatible "
"QoS profile to Subscriptions on the same topic.");
}
}
// Setup continues in the post construction method, post_init_setup().
}
template<class T = MessageT>
typename std::enable_if<!std::is_same<T, rcl_serialized_message_t>::value>::type
do_publish_message(const T & msg)
{
// Avoid allocating when not using intra process.
if (!intra_process_is_enabled_) {
// In this case we're not using intra process.
return this->do_inter_process_publish(msg);
}
// Otherwise we have to allocate memory in a unique_ptr and pass it along.
// As the message is not const, a copy should be made.
// A shared_ptr<const MessageT> could also be constructed here.
auto ptr = MessageAllocatorTraits::allocate(*message_allocator_.get(), 1);
MessageAllocatorTraits::construct(*message_allocator_.get(), ptr, msg);
MessageUniquePtr unique_msg(ptr, message_deleter_);
this->publish(std::move(unique_msg));
}
template<class T = MessageT>
typename std::enable_if<std::is_same<T, rcl_serialized_message_t>::value>::type
do_publish_message(const T & msg)
{
// Kept for backwards compatibility. Copies compelete memory!
this->publish(std::make_unique<rclcpp::SerializedMessage>(msg));
}
void
do_inter_process_publish(const MessageT & msg)
{
@@ -280,15 +355,24 @@ protected:
}
void
do_serialized_publish(const rcl_serialized_message_t * serialized_msg)
do_serialized_publish(rcl_serialized_message_t serialized_msg)
{
if (intra_process_is_enabled_) {
// TODO(Karsten1987): support serialized message passed by intraprocess
throw std::runtime_error("storing serialized messages in intra process is not supported yet");
bool inter_process_publish_needed =
get_subscription_count() > get_intra_process_subscription_count();
if (inter_process_publish_needed) {
// declare here to avoid deletion before returning method
auto status = rcl_publish_serialized_message(&publisher_handle_, &serialized_msg, nullptr);
if (RCL_RET_OK != status) {
rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish serialized message");
}
}
auto status = rcl_publish_serialized_message(&publisher_handle_, serialized_msg, nullptr);
if (RCL_RET_OK != status) {
rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish serialized message");
auto msg = std::make_unique<rclcpp::SerializedMessage>(
std::move(serialized_msg));
if (intra_process_is_enabled_) {
do_intra_process_publish(std::move(msg), message_allocator_serialized_);
}
}
@@ -312,8 +396,11 @@ protected:
}
}
template<typename T, typename Deleter, typename Allocator>
void
do_intra_process_publish(std::unique_ptr<MessageT, MessageDeleter> msg)
do_intra_process_publish(
std::unique_ptr<T, Deleter> msg,
std::shared_ptr<Allocator> & message_allocator)
{
auto ipm = weak_ipm_.lock();
if (!ipm) {
@@ -324,14 +411,21 @@ protected:
throw std::runtime_error("cannot publish msg which is a null pointer");
}
ipm->template do_intra_process_publish<MessageT, AllocatorT>(
intra_process_publisher_id_,
const uint64_t intra_process_publisher_id = std::is_same<T,
rclcpp::SerializedMessage>::value ?
intra_process_publisher_id_serialized_ : intra_process_publisher_id_;
ipm->template do_intra_process_publish<T, AllocatorT>(
intra_process_publisher_id,
std::move(msg),
message_allocator_);
message_allocator);
}
std::shared_ptr<const MessageT>
do_intra_process_publish_and_return_shared(std::unique_ptr<MessageT, MessageDeleter> msg)
template<typename T, typename Deleter, typename Allocator>
std::shared_ptr<const T>
do_intra_process_publish_and_return_shared(
std::unique_ptr<T, Deleter> msg,
std::shared_ptr<Allocator> & message_allocator)
{
auto ipm = weak_ipm_.lock();
if (!ipm) {
@@ -342,10 +436,14 @@ protected:
throw std::runtime_error("cannot publish msg which is a null pointer");
}
return ipm->template do_intra_process_publish_and_return_shared<MessageT, AllocatorT>(
intra_process_publisher_id_,
const uint64_t intra_process_publisher_id = std::is_same<T,
rclcpp::SerializedMessage>::value ?
intra_process_publisher_id_serialized_ : intra_process_publisher_id_;
return ipm->template do_intra_process_publish_and_return_shared<T, AllocatorT>(
intra_process_publisher_id,
std::move(msg),
message_allocator_);
message_allocator);
}
/// Copy of original options passed during construction.
@@ -356,6 +454,7 @@ protected:
const rclcpp::PublisherOptionsWithAllocator<AllocatorT> options_;
std::shared_ptr<MessageAllocator> message_allocator_;
std::shared_ptr<SerializedMessageAllocator> message_allocator_serialized_;
MessageDeleter message_deleter_;
};

View File

@@ -191,6 +191,7 @@ public:
void
setup_intra_process(
uint64_t intra_process_publisher_id,
uint64_t intra_process_publisher_id_serialized,
IntraProcessManagerSharedPtr ipm);
protected:
@@ -222,6 +223,7 @@ protected:
bool intra_process_is_enabled_;
IntraProcessManagerWeakPtr weak_ipm_;
uint64_t intra_process_publisher_id_;
uint64_t intra_process_publisher_id_serialized_;
rmw_gid_t rmw_gid_;
};

View File

@@ -63,17 +63,21 @@ struct PublisherFactory
/// Return a PublisherFactory with functions setup for creating a PublisherT<MessageT, AllocatorT>.
template<typename MessageT, typename AllocatorT, typename PublisherT>
PublisherFactory
create_publisher_factory(const rclcpp::PublisherOptionsWithAllocator<AllocatorT> & options)
create_publisher_factory(
const rclcpp::PublisherOptionsWithAllocator<AllocatorT> & options,
const rosidl_message_type_support_t & type_support)
{
PublisherFactory factory {
// factory function that creates a MessageT specific PublisherT
[options](
[options, type_support](
rclcpp::node_interfaces::NodeBaseInterface * node_base,
const std::string & topic_name,
const rclcpp::QoS & qos
) -> std::shared_ptr<PublisherT>
{
auto publisher = std::make_shared<PublisherT>(node_base, topic_name, qos, options);
auto publisher = std::make_shared<PublisherT>(
node_base, topic_name, qos, options,
type_support);
// This is used for setting up things like intra process comms which
// require this->shared_from_this() which cannot be called from
// the constructor.

View File

@@ -0,0 +1,123 @@
// Copyright 2020 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef RCLCPP__SERIALIZATION_HPP_
#define RCLCPP__SERIALIZATION_HPP_
#include <rmw/rmw.h>
#include <memory>
#include <string>
#include "rcl/error_handling.h"
namespace rclcpp
{
/// Interface to (de)serialize a message
class SerializationBase
{
public:
virtual ~SerializationBase() = default;
/// Serialize a ROS2 message to a serialized stream
/**
* \param[in] message The ROS2 message which is read and serialized by rmw.
*/
virtual std::shared_ptr<rcl_serialized_message_t> serialize_message(const void * message) = 0;
/// Deserialize a serialized stream to a ROS message
/**
* \param[in] serialized_message The serialized message to be converted to ROS2 by rmw.
* \param[out] message The deserialized ROS2 message.
*/
virtual void deserialize_message(
const rcl_serialized_message_t * serialized_message,
void * msg) = 0;
};
/// Default implementation to (de)serialize a message by using rmw_(de)serialize
class Serialization : public SerializationBase
{
public:
Serialization(
const rosidl_message_type_support_t & type_support,
const rcutils_allocator_t allocator = rcutils_get_default_allocator())
: type_support_(type_support), rcutils_allocator_(allocator)
{}
std::shared_ptr<rcl_serialized_message_t> serialize_message(const void * message) override
{
auto serialized_message = new rcl_serialized_message_t;
*serialized_message = rmw_get_zero_initialized_serialized_message();
const auto ret = rmw_serialized_message_init(serialized_message, 0, &rcutils_allocator_);
if (ret != RCUTILS_RET_OK) {
throw std::runtime_error(
"Error allocating resources for serialized message: " +
std::string(rcutils_get_error_string().str));
}
if (nullptr == message) {
delete serialized_message;
throw std::runtime_error("Message is nullpointer while serialization.");
}
const auto error = rmw_serialize(
message,
&type_support_,
serialized_message);
if (error != RCL_RET_OK) {
delete serialized_message;
throw std::runtime_error("Failed to serialize.");
}
auto shared_serialized_msg = std::shared_ptr<rcl_serialized_message_t>(
serialized_message,
[](rcl_serialized_message_t * msg) {
auto fini_ret = rmw_serialized_message_fini(msg);
delete msg;
if (fini_ret != RCL_RET_OK) {
RCUTILS_LOG_ERROR_NAMED(
"rclcpp",
"Failed to destroy serialized message: %s", rcl_get_error_string().str);
}
});
return shared_serialized_msg;
}
void deserialize_message(const rcl_serialized_message_t * serialized_message, void * msg) override
{
if (nullptr == serialized_message ||
serialized_message->buffer_capacity == 0 ||
serialized_message->buffer_length == 0 ||
!serialized_message->buffer)
{
throw std::runtime_error("Failed to deserialize nullptr serialized message.");
}
const auto ret = rmw_deserialize(serialized_message, &type_support_, msg);
if (ret != RMW_RET_OK) {
throw std::runtime_error("Failed to deserialize serialized message.");
}
}
private:
rosidl_message_type_support_t type_support_;
rcutils_allocator_t rcutils_allocator_;
};
} // namespace rclcpp
#endif // RCLCPP__SERIALIZATION_HPP_

View File

@@ -0,0 +1,80 @@
// Copyright 2020 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef RCLCPP__SERIALIZED_MESSAGE_HPP_
#define RCLCPP__SERIALIZED_MESSAGE_HPP_
#include <rclcpp/exceptions.hpp>
#include <cstring>
#include "rcutils/logging_macros.h"
#include "rmw/serialized_message.h"
namespace rclcpp
{
/// Object oriented version of rcl_serialized_message_t with destructor to avoid memory leaks
class SerializedMessage : public rcl_serialized_message_t
{
public:
SerializedMessage()
: rcl_serialized_message_t(rmw_get_zero_initialized_serialized_message())
{}
explicit SerializedMessage(const SerializedMessage & serialized_message)
: SerializedMessage(static_cast<const rcl_serialized_message_t>(serialized_message))
{}
explicit SerializedMessage(const rcl_serialized_message_t & serialized_message)
: rcl_serialized_message_t(rmw_get_zero_initialized_serialized_message())
{
const auto ret = rmw_serialized_message_init(
this, serialized_message.buffer_length,
&serialized_message.allocator);
if (ret != RCL_RET_OK) {
rclcpp::exceptions::throw_from_rcl_error(ret);
}
// do not call memcpy if the pointer is "static"
if (buffer != serialized_message.buffer) {
std::memcpy(buffer, serialized_message.buffer, serialized_message.buffer_length);
}
buffer_length = serialized_message.buffer_length;
}
explicit SerializedMessage(rcl_serialized_message_t && msg)
: rcl_serialized_message_t(msg)
{
// reset buffer to prevent double free
msg = rmw_get_zero_initialized_serialized_message();
}
~SerializedMessage()
{
if (nullptr != buffer) {
const auto fini_ret = rmw_serialized_message_fini(this);
if (fini_ret != RCL_RET_OK) {
RCUTILS_LOG_ERROR_NAMED(
"rclcpp",
"Failed to destroy serialized message: %s", rcl_get_error_string().str);
}
}
}
};
} // namespace rclcpp
#endif // RCLCPP__SERIALIZED_MESSAGE_HPP_

View File

@@ -41,6 +41,7 @@
#include "rclcpp/message_info.hpp"
#include "rclcpp/message_memory_strategy.hpp"
#include "rclcpp/node_interfaces/node_base_interface.hpp"
#include "rclcpp/serialized_message.hpp"
#include "rclcpp/subscription_base.hpp"
#include "rclcpp/subscription_options.hpp"
#include "rclcpp/subscription_traits.hpp"
@@ -155,29 +156,84 @@ public:
"intraprocess communication allowed only with volatile durability");
}
// First create a SubscriptionIntraProcess which will be given to the intra-process manager.
auto context = node_base->get_context();
using SubscriptionIntraProcessT = rclcpp::experimental::SubscriptionIntraProcess<
CallbackMessageT,
AllocatorT,
typename MessageUniquePtr::deleter_type>;
auto subscription_intra_process = std::make_shared<SubscriptionIntraProcessT>(
callback,
options.get_allocator(),
context,
this->get_topic_name(), // important to get like this, as it has the fully-qualified name
qos_profile,
resolve_intra_process_buffer_type(options.intra_process_buffer_type, callback));
TRACEPOINT(
rclcpp_subscription_init,
(const void *)get_subscription_handle().get(),
(const void *)subscription_intra_process.get());
// Add it to the intra process manager.
using rclcpp::experimental::IntraProcessManager;
uint64_t intra_process_subscription_id;
uint64_t intra_process_subscription_id_serialized;
auto context = node_base->get_context();
auto ipm = context->get_sub_context<IntraProcessManager>();
uint64_t intra_process_subscription_id = ipm->add_subscription(subscription_intra_process);
this->setup_intra_process(intra_process_subscription_id, ipm);
{
// First create a SubscriptionIntraProcess which will be given to the intra-process manager.
auto subscription_intra_process = std::make_shared<
rclcpp::experimental::SubscriptionIntraProcess<
CallbackMessageT,
AllocatorT,
typename MessageUniquePtr::deleter_type
>>(
callback,
options.get_allocator(),
context,
this->get_topic_name(), // important to get it by the fully-qualified name
qos.get_rmw_qos_profile(),
resolve_intra_process_buffer_type(options.intra_process_buffer_type, callback),
std::make_shared<rclcpp::Serialization>(
type_support_handle,
options.template to_rcl_subscription_options<CallbackMessageT>(qos).allocator)
);
TRACEPOINT(
rclcpp_subscription_init,
(const void *)get_subscription_handle().get(),
(const void *)subscription_intra_process.get());
// Add it to the intra process manager.
intra_process_subscription_id = ipm->add_subscription(subscription_intra_process);
}
{
using SerializedMessageAllocatorTraits =
allocator::AllocRebind<rclcpp::SerializedMessage,
AllocatorT>;
using SerializedMessageAllocator =
typename SerializedMessageAllocatorTraits::allocator_type;
using SerializedMessageDeleter = allocator::Deleter<SerializedMessageAllocator,
rclcpp::SerializedMessage>;
using SerializedMessageUniquePtr =
std::unique_ptr<rclcpp::SerializedMessage,
SerializedMessageDeleter>;
// First create a SubscriptionIntraProcess which will be given to the intra-process manager.
auto subscription_intra_process = std::make_shared<
rclcpp::experimental::SubscriptionIntraProcess<
rclcpp::SerializedMessage,
AllocatorT,
typename SerializedMessageUniquePtr::deleter_type,
CallbackMessageT
>>(
callback,
options.get_allocator(),
context,
this->get_topic_name(), // important to get it by the fully-qualified name
qos.get_rmw_qos_profile(),
resolve_intra_process_buffer_type(options.intra_process_buffer_type, callback),
std::make_shared<rclcpp::Serialization>(
type_support_handle,
options.template to_rcl_subscription_options<CallbackMessageT>(qos).allocator)
);
TRACEPOINT(
rclcpp_subscription_init,
(const void *)get_subscription_handle().get(),
(const void *)subscription_intra_process.get());
// Add it to the intra process manager.
intra_process_subscription_id_serialized = ipm->add_subscription(
subscription_intra_process,
true);
}
this->setup_intra_process(
{intra_process_subscription_id,
intra_process_subscription_id_serialized}, ipm);
}
TRACEPOINT(

View File

@@ -228,13 +228,13 @@ public:
RCLCPP_PUBLIC
void
setup_intra_process(
uint64_t intra_process_subscription_id,
const std::vector<uint64_t> & intra_process_subscription_ids,
IntraProcessManagerWeakPtr weak_ipm);
/// Return the waitable for intra-process, or nullptr if intra-process is not setup.
RCLCPP_PUBLIC
rclcpp::Waitable::SharedPtr
get_intra_process_waitable() const;
std::vector<rclcpp::Waitable::SharedPtr>
get_intra_process_waitables() const;
/// Exchange state of whether or not a part of the subscription is used by a wait set.
/**
@@ -286,7 +286,7 @@ protected:
bool use_intra_process_;
IntraProcessManagerWeakPtr weak_ipm_;
uint64_t intra_process_subscription_id_;
std::vector<uint64_t> intra_process_subscription_ids_;
private:
RCLCPP_DISABLE_COPY(SubscriptionBase)

View File

@@ -78,7 +78,8 @@ SubscriptionFactory
create_subscription_factory(
CallbackT && callback,
const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & options,
typename MessageMemoryStrategyT::SharedPtr msg_mem_strat)
typename MessageMemoryStrategyT::SharedPtr msg_mem_strat,
const rosidl_message_type_support_t & type_support)
{
auto allocator = options.get_allocator();
@@ -88,7 +89,7 @@ create_subscription_factory(
SubscriptionFactory factory {
// factory function that creates a MessageT specific SubscriptionT
[options, msg_mem_strat, any_subscription_callback](
[options, msg_mem_strat, any_subscription_callback, type_support](
rclcpp::node_interfaces::NodeBaseInterface * node_base,
const std::string & topic_name,
const rclcpp::QoS & qos
@@ -99,7 +100,7 @@ create_subscription_factory(
auto sub = Subscription<CallbackMessageT, AllocatorT>::make_shared(
node_base,
*rosidl_typesupport_cpp::get_message_type_support_handle<MessageT>(),
type_support,
topic_name,
qos,
any_subscription_callback,

View File

@@ -18,6 +18,7 @@
#include <memory>
#include "rclcpp/function_traits.hpp"
#include "rclcpp/subscription_options.hpp"
#include "rcl/types.h"
namespace rclcpp
@@ -75,6 +76,7 @@ struct extract_message_type<std::unique_ptr<MessageT, Deleter>>: extract_message
template<
typename CallbackT,
typename AllocatorT = std::allocator<void>,
// Do not attempt if CallbackT is an integer (mistaken for depth)
typename = std::enable_if_t<!std::is_integral<
std::remove_cv_t<std::remove_reference_t<CallbackT>>>::value>,
@@ -85,6 +87,10 @@ template<
// Do not attempt if CallbackT is a rmw_qos_profile_t (mistaken for qos profile)
typename = std::enable_if_t<!std::is_same<
rmw_qos_profile_t,
std::remove_cv_t<std::remove_reference_t<CallbackT>>>::value>,
// Do not attempt if CallbackT is a rclcpp::SubscriptionOptionsWithAllocator
typename = std::enable_if_t<!std::is_same<
rclcpp::SubscriptionOptionsWithAllocator<AllocatorT>,
std::remove_cv_t<std::remove_reference_t<CallbackT>>>::value>
>
struct has_message_type : extract_message_type<

View File

@@ -165,18 +165,19 @@ public:
}
if (mask.include_intra_process_waitable) {
auto local_subscription = inner_subscription;
auto waitable = inner_subscription->get_intra_process_waitable();
if (nullptr != waitable) {
bool already_in_use = local_subscription->exchange_in_use_by_wait_set_state(
waitable.get(),
true);
if (already_in_use) {
throw std::runtime_error(
"subscription intra-process waitable already associated with a wait set");
for (auto waitable : inner_subscription->get_intra_process_waitables()) {
if (nullptr != waitable) {
bool already_in_use = local_subscription->exchange_in_use_by_wait_set_state(
waitable.get(),
true);
if (already_in_use) {
throw std::runtime_error(
"subscription intra-process waitable already associated with a wait set");
}
this->storage_add_waitable(
std::move(waitable),
std::move(local_subscription));
}
this->storage_add_waitable(
std::move(inner_subscription->get_intra_process_waitable()),
std::move(local_subscription));
}
}
});
@@ -230,11 +231,12 @@ public:
}
}
if (mask.include_intra_process_waitable) {
auto local_waitable = inner_subscription->get_intra_process_waitable();
inner_subscription->exchange_in_use_by_wait_set_state(local_waitable.get(), false);
if (nullptr != local_waitable) {
// This is the case when intra process is disabled for the subscription.
this->storage_remove_waitable(std::move(local_waitable));
for (auto local_waitable : inner_subscription->get_intra_process_waitables()) {
inner_subscription->exchange_in_use_by_wait_set_state(local_waitable.get(), false);
if (nullptr != local_waitable) {
// This is the case when intra process is disabled for the subscription.
this->storage_remove_waitable(std::move(local_waitable));
}
}
}
});

View File

@@ -32,7 +32,9 @@ IntraProcessManager::~IntraProcessManager()
{}
uint64_t
IntraProcessManager::add_publisher(rclcpp::PublisherBase::SharedPtr publisher)
IntraProcessManager::add_publisher(
rclcpp::PublisherBase::SharedPtr publisher,
const bool is_serialized)
{
std::unique_lock<std::shared_timed_mutex> lock(mutex_);
@@ -41,6 +43,7 @@ IntraProcessManager::add_publisher(rclcpp::PublisherBase::SharedPtr publisher)
publishers_[id].publisher = publisher;
publishers_[id].topic_name = publisher->get_topic_name();
publishers_[id].qos = publisher->get_actual_qos().get_rmw_qos_profile();
publishers_[id].is_serialized = is_serialized;
// Initialize the subscriptions storage for this publisher.
pub_to_subs_[id] = SplittedSubscriptions();
@@ -56,7 +59,9 @@ IntraProcessManager::add_publisher(rclcpp::PublisherBase::SharedPtr publisher)
}
uint64_t
IntraProcessManager::add_subscription(SubscriptionIntraProcessBase::SharedPtr subscription)
IntraProcessManager::add_subscription(
SubscriptionIntraProcessBase::SharedPtr subscription,
const bool is_serialized)
{
std::unique_lock<std::shared_timed_mutex> lock(mutex_);
@@ -66,6 +71,7 @@ IntraProcessManager::add_subscription(SubscriptionIntraProcessBase::SharedPtr su
subscriptions_[id].topic_name = subscription->get_topic_name();
subscriptions_[id].qos = subscription->get_actual_qos();
subscriptions_[id].use_take_shared_method = subscription->use_take_shared_method();
subscriptions_[id].is_serialized = is_serialized;
// adds the subscription id to all the matchable publishers
for (auto & pair : publishers_) {
@@ -220,6 +226,13 @@ IntraProcessManager::can_communicate(
return false;
}
// a publisher and a subscription with different content type can't communicate
// if is_serialized is true, the expected message typ is rcl_serialized_message_t
// otherwise the templated ROS2 message type
if (sub_info.is_serialized != pub_info.is_serialized) {
return false;
}
return true;
}

View File

@@ -99,10 +99,12 @@ NodeTopics::add_subscription(
callback_group->add_waitable(subscription_event);
}
auto intra_process_waitable = subscription->get_intra_process_waitable();
if (nullptr != intra_process_waitable) {
// Add to the callback group to be notified about intra-process msgs.
callback_group->add_waitable(intra_process_waitable);
const auto intra_process_waitables = subscription->get_intra_process_waitables();
for (auto & intra_process_waitable : intra_process_waitables) {
if (nullptr != intra_process_waitable) {
// Add to the callback group to be notified about intra-process msgs.
callback_group->add_waitable(intra_process_waitable);
}
}
// Notify the executor that a new subscription was created using the parent Node.

View File

@@ -44,7 +44,8 @@ PublisherBase::PublisherBase(
const rosidl_message_type_support_t & type_support,
const rcl_publisher_options_t & publisher_options)
: rcl_node_handle_(node_base->get_shared_rcl_node_handle()),
intra_process_is_enabled_(false), intra_process_publisher_id_(0)
intra_process_is_enabled_(false), intra_process_publisher_id_(0),
intra_process_publisher_id_serialized_(0)
{
rcl_ret_t ret = rcl_publisher_init(
&publisher_handle_,
@@ -105,6 +106,7 @@ PublisherBase::~PublisherBase()
return;
}
ipm->remove_publisher(intra_process_publisher_id_);
ipm->remove_publisher(intra_process_publisher_id_serialized_);
}
const char *
@@ -238,9 +240,11 @@ PublisherBase::operator==(const rmw_gid_t * gid) const
void
PublisherBase::setup_intra_process(
uint64_t intra_process_publisher_id,
uint64_t intra_process_publisher_id_serialized,
IntraProcessManagerSharedPtr ipm)
{
intra_process_publisher_id_ = intra_process_publisher_id;
intra_process_publisher_id_serialized_ = intra_process_publisher_id_serialized;
weak_ipm_ = ipm;
intra_process_is_enabled_ = true;
}

View File

@@ -39,7 +39,6 @@ SubscriptionBase::SubscriptionBase(
: node_base_(node_base),
node_handle_(node_base_->get_shared_rcl_node_handle()),
use_intra_process_(false),
intra_process_subscription_id_(0),
type_support_(type_support_handle),
is_serialized_(is_serialized)
{
@@ -93,7 +92,9 @@ SubscriptionBase::~SubscriptionBase()
"Intra process manager died before than a subscription.");
return;
}
ipm->remove_subscription(intra_process_subscription_id_);
for (auto intra_process_subscription_id : intra_process_subscription_ids_) {
ipm->remove_subscription(intra_process_subscription_id);
}
}
const char *
@@ -204,10 +205,10 @@ SubscriptionBase::get_publisher_count() const
void
SubscriptionBase::setup_intra_process(
uint64_t intra_process_subscription_id,
const std::vector<uint64_t> & intra_process_subscription_ids,
IntraProcessManagerWeakPtr weak_ipm)
{
intra_process_subscription_id_ = intra_process_subscription_id;
intra_process_subscription_ids_ = intra_process_subscription_ids;
weak_ipm_ = weak_ipm;
use_intra_process_ = true;
}
@@ -218,23 +219,29 @@ SubscriptionBase::can_loan_messages() const
return rcl_subscription_can_loan_messages(subscription_handle_.get());
}
rclcpp::Waitable::SharedPtr
SubscriptionBase::get_intra_process_waitable() const
std::vector<rclcpp::Waitable::SharedPtr>
SubscriptionBase::get_intra_process_waitables() const
{
// If not using intra process, shortcut to nullptr.
if (!use_intra_process_) {
return nullptr;
return std::vector<rclcpp::Waitable::SharedPtr>();
}
// Get the intra process manager.
auto ipm = weak_ipm_.lock();
if (!ipm) {
throw std::runtime_error(
"SubscriptionBase::get_intra_process_waitable() called "
"SubscriptionBase::get_intra_process_waitables() called "
"after destruction of intra process manager");
}
// Use the id to retrieve the subscription intra-process from the intra-process manager.
return ipm->get_subscription_intra_process(intra_process_subscription_id_);
std::vector<rclcpp::Waitable::SharedPtr> waitables(intra_process_subscription_ids_.size());
for (auto i = 0u; i < intra_process_subscription_ids_.size(); ++i) {
// Use the id to retrieve the subscription intra-process from the intra-process manager.
waitables[i] = ipm->get_subscription_intra_process(intra_process_subscription_ids_[i]);
}
return waitables;
}
void
@@ -274,8 +281,10 @@ SubscriptionBase::exchange_in_use_by_wait_set_state(
if (this == pointer_to_subscription_part) {
return subscription_in_use_by_wait_set_.exchange(in_use_state);
}
if (get_intra_process_waitable().get() == pointer_to_subscription_part) {
return intra_process_subscription_waitable_in_use_by_wait_set_.exchange(in_use_state);
for (auto & waitable : get_intra_process_waitables()) {
if (waitable.get() == pointer_to_subscription_part) {
return intra_process_subscription_waitable_in_use_by_wait_set_.exchange(in_use_state);
}
}
for (const auto & qos_event : event_handlers_) {
if (qos_event.get() == pointer_to_subscription_part) {

View File

@@ -0,0 +1,367 @@
// Copyright 2020 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <gtest/gtest.h>
#include <test_msgs/message_fixtures.hpp>
#include <iostream>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "rclcpp/exceptions.hpp"
#include "rclcpp/publisher.hpp"
#include "rclcpp/rclcpp.hpp"
#include "rclcpp/serialized_message.hpp"
int32_t & get_test_allocation_counter()
{
static int32_t counter = 0;
return counter;
}
void * custom_allocate(size_t size, void * state)
{
static auto m_allocator = rcutils_get_default_allocator();
++get_test_allocation_counter();
auto r = m_allocator.allocate(size, state);
return r;
}
void * custom_zero_allocate(size_t number_of_elements, size_t size_of_element, void * state)
{
static auto m_allocator = rcutils_get_default_allocator();
++get_test_allocation_counter();
auto r = m_allocator.zero_allocate(number_of_elements, size_of_element, state);
return r;
}
void * custom_reallocate(void * pointer, size_t size, void * state)
{
static auto m_allocator = rcutils_get_default_allocator();
if (pointer == nullptr) {
++get_test_allocation_counter();
}
auto r = m_allocator.reallocate(pointer, size, state);
return r;
}
void custom_deallocate(void * pointer, void * state)
{
static auto m_allocator = rcutils_get_default_allocator();
--get_test_allocation_counter();
m_allocator.deallocate(pointer, state);
}
rcl_serialized_message_t make_serialized_string_msg(
const std::shared_ptr<test_msgs::msg::Strings> & stringMsg)
{
auto m_allocator = rcutils_get_default_allocator();
// add custom (de)allocator to count the references to the object
m_allocator.allocate = &custom_allocate;
m_allocator.deallocate = &custom_deallocate;
m_allocator.reallocate = &custom_reallocate;
m_allocator.zero_allocate = &custom_zero_allocate;
rcl_serialized_message_t msg = rmw_get_zero_initialized_serialized_message();
auto ret = rmw_serialized_message_init(&msg, 0, &m_allocator);
if (ret != RCL_RET_OK) {
rclcpp::exceptions::throw_from_rcl_error(ret);
}
static auto type =
rosidl_typesupport_cpp::get_message_type_support_handle
<test_msgs::msg::Strings>();
auto error = rmw_serialize(stringMsg.get(), type, &msg);
if (error != RMW_RET_OK) {
RCUTILS_LOG_ERROR_NAMED(
"test_intra_process_communication",
"Something went wrong preparing the serialized message");
}
return msg;
}
/**
* Parameterized test.
* The first param are the NodeOptions used to create the nodes.
* The second param are the expected intraprocess count results.
*/
struct TestParameters
{
rclcpp::NodeOptions node_options[2];
uint64_t intraprocess_count_results[2];
size_t runs;
std::string description;
};
std::ostream & operator<<(std::ostream & out, const TestParameters & params)
{
out << params.description;
return out;
}
class TestPublisherSubscriptionSerialized : public ::testing::TestWithParam<TestParameters>
{
public:
static void SetUpTestCase()
{
if (!rclcpp::ok()) {
rclcpp::init(0, nullptr);
}
}
protected:
static std::chrono::milliseconds offset;
};
std::chrono::milliseconds TestPublisherSubscriptionSerialized::offset = std::chrono::milliseconds(
250);
std::array<uint32_t, 2> counts;
void OnMessageSerialized(const std::shared_ptr<const rcl_serialized_message_t> msg)
{
EXPECT_NE(msg->buffer, nullptr);
EXPECT_GT(msg->buffer_capacity, 0u);
++counts[0];
}
void OnMessageConst(std::shared_ptr<const test_msgs::msg::Strings> msg)
{
EXPECT_EQ(msg->string_value.back(), '9');
++counts[1];
}
void OnMessageUniquePtr(std::unique_ptr<test_msgs::msg::Strings> msg)
{
EXPECT_EQ(msg->string_value.back(), '9');
++counts[1];
}
void OnMessage(std::shared_ptr<test_msgs::msg::Strings> msg)
{
EXPECT_EQ(msg->string_value.back(), '9');
++counts[1];
}
TEST_P(TestPublisherSubscriptionSerialized, publish_serialized)
{
get_test_allocation_counter() = 0;
TestParameters parameters = GetParam();
{
rclcpp::Node::SharedPtr node = std::make_shared<rclcpp::Node>(
"my_node",
"/ns",
parameters.node_options[0]);
auto publisher = node->create_publisher<test_msgs::msg::Strings>("/topic", 10);
auto sub_shared = node->create_subscription<test_msgs::msg::Strings>(
"/topic", 10,
&OnMessage);
auto sub_unique = node->create_subscription<test_msgs::msg::Strings>(
"/topic", 10,
&OnMessageUniquePtr);
auto sub_const_shared = node->create_subscription<test_msgs::msg::Strings>(
"/topic", 10,
&OnMessageConst);
auto sub_serialized = node->create_subscription<test_msgs::msg::Strings>(
"/topic", 10,
&OnMessageSerialized);
rclcpp::sleep_for(offset);
counts.fill(0);
auto stringMsg = get_messages_strings()[3];
for (size_t i = 0; i < parameters.runs; i++) {
auto msg0 = make_serialized_string_msg(stringMsg);
std::unique_ptr<test_msgs::msg::Strings> stringMsgU(
new test_msgs::msg::Strings(
*stringMsg));
publisher->publish(std::make_unique<rcl_serialized_message_t>(msg0));
publisher->publish(*stringMsg);
publisher->publish(std::move(stringMsgU));
}
for (uint32_t i = 0; i < 3; ++i) {
rclcpp::spin_some(node);
rclcpp::sleep_for(offset);
}
rclcpp::spin_some(node);
}
if (parameters.runs == 1) {
EXPECT_EQ(counts[0], 3u);
EXPECT_EQ(counts[1], 9u);
}
EXPECT_EQ(get_test_allocation_counter(), 0);
}
TEST_P(TestPublisherSubscriptionSerialized, publish_serialized_generic)
{
get_test_allocation_counter() = 0;
TestParameters parameters = GetParam();
{
rclcpp::Node::SharedPtr node = std::make_shared<rclcpp::Node>(
"my_node",
"/ns",
parameters.node_options[0]);
auto publisher = rclcpp::create_publisher<rcl_serialized_message_t>(
node,
"/topic",
*rosidl_typesupport_cpp::get_message_type_support_handle<test_msgs::msg::Strings>(),
rclcpp::QoS(10));
auto sub_gen_serialized = rclcpp::create_subscription<rcl_serialized_message_t>(
node,
"/topic",
*rosidl_typesupport_cpp::get_message_type_support_handle<test_msgs::msg::Strings>(),
rclcpp::QoS(10),
&OnMessageSerialized);
auto sub_shared = node->create_subscription<test_msgs::msg::Strings>(
"/topic", 10,
&OnMessage);
auto sub_unique = node->create_subscription<test_msgs::msg::Strings>(
"/topic", 10,
&OnMessageUniquePtr);
auto sub_const_shared = node->create_subscription<test_msgs::msg::Strings>(
"/topic", 10,
&OnMessageConst);
auto sub_serialized = node->create_subscription<test_msgs::msg::Strings>(
"/topic", 10,
&OnMessageSerialized);
rclcpp::sleep_for(offset);
counts.fill(0);
auto stringMsg = get_messages_strings()[3];
for (size_t i = 0; i < parameters.runs; i++) {
auto msg0 = make_serialized_string_msg(stringMsg);
publisher->publish(std::make_unique<rcl_serialized_message_t>(msg0));
}
rclcpp::spin_some(node);
rclcpp::sleep_for(offset);
rclcpp::spin_some(node);
}
if (parameters.runs == 1) {
EXPECT_EQ(counts[0], 2u);
EXPECT_EQ(counts[1], 3u);
}
EXPECT_EQ(get_test_allocation_counter(), 0);
}
auto get_new_context()
{
auto context = rclcpp::Context::make_shared();
context->init(0, nullptr);
return context;
}
std::vector<TestParameters> parameters = {
/*
Testing publisher subscription count api and internal process subscription count.
Two subscriptions in the same topic, both using intraprocess comm.
*/
{
{
rclcpp::NodeOptions().use_intra_process_comms(true),
rclcpp::NodeOptions().use_intra_process_comms(true)
},
{1u, 2u},
1,
"two_subscriptions_intraprocess_comm"
},
/*
Testing publisher subscription count api and internal process subscription count.
Two subscriptions, one using intra-process comm and the other not using it.
*/
{
{
rclcpp::NodeOptions().use_intra_process_comms(true),
rclcpp::NodeOptions().use_intra_process_comms(false)
},
{1u, 1u},
1,
"two_subscriptions_one_intraprocess_one_not"
},
/*
Testing publisher subscription count api and internal process subscription count.
Two contexts, both using intra-process.
*/
{
{
rclcpp::NodeOptions().use_intra_process_comms(true),
rclcpp::NodeOptions().context(get_new_context()).use_intra_process_comms(true)
},
{1u, 1u},
1,
"two_subscriptions_in_two_contexts_with_intraprocess_comm"
},
/*
Testing publisher subscription count api and internal process subscription count.
Two contexts, both of them not using intra-process comm.
*/
{
{
rclcpp::NodeOptions().use_intra_process_comms(false),
rclcpp::NodeOptions().context(get_new_context()).use_intra_process_comms(false)
},
{0u, 0u},
1,
"two_subscriptions_in_two_contexts_without_intraprocess_comm"
}
};
std::vector<TestParameters> setRuns(const std::vector<TestParameters> & in, const size_t runs)
{
std::vector<TestParameters> out = in;
for (auto & p : out) {
p.runs = runs;
}
return out;
}
INSTANTIATE_TEST_CASE_P(
TestWithDifferentNodeOptions, TestPublisherSubscriptionSerialized,
::testing::ValuesIn(parameters),
::testing::PrintToStringParamName());
INSTANTIATE_TEST_CASE_P(
TestWithDifferentNodeOptions1000Runs, TestPublisherSubscriptionSerialized,
::testing::ValuesIn(setRuns(parameters, 1000)),
::testing::PrintToStringParamName());

View File

@@ -68,6 +68,18 @@ public:
{
}
LifecyclePublisher(
rclcpp::node_interfaces::NodeBaseInterface * node_base,
const std::string & topic,
const rclcpp::QoS & qos,
const rclcpp::PublisherOptionsWithAllocator<Alloc> & options,
const rosidl_message_type_support_t & type_support)
: rclcpp::Publisher<MessageT, Alloc>(node_base, topic, qos, options, type_support),
enabled_(false),
logger_(rclcpp::get_logger("LifecyclePublisher"))
{
}
~LifecyclePublisher() {}
/// LifecyclePublisher publish function
@@ -110,6 +122,30 @@ public:
rclcpp::Publisher<MessageT, Alloc>::publish(msg);
}
/// Publish a serialized message. Non specialized version to prevent compiling errors.
template<typename TDeleter, typename T>
void publish(std::unique_ptr<T, TDeleter> serialized_msg)
{
(void)serialized_msg;
throw std::runtime_error(
"publishing unique_ptr with custom deleter only supported for serialized messages");
}
/// Publish a serialized message.
template<typename TDeleter>
void publish(std::unique_ptr<rcl_serialized_message_t, TDeleter> serialized_msg)
{
if (!enabled_) {
RCLCPP_WARN(
logger_,
"Trying to publish message on the topic '%s', but the publisher is not activated",
this->get_topic_name());
return;
}
this->do_serialized_publish(*serialized_msg);
}
virtual void
on_activate()
{