Compare commits

...

53 Commits

Author SHA1 Message Date
Karsten Knese
82fa5bb5ed wip with multiple intra process ids
Signed-off-by: Karsten Knese <karsten@openrobotics.org>
2020-04-18 21:03:14 -07:00
Karsten Knese
4dfcc5f7a7 wip make serilize to serialize communication work
Signed-off-by: Karsten Knese <karsten@openrobotics.org>
2020-04-18 20:33:12 -07:00
Karsten Knese
e0fb0a4f5c more tests
Signed-off-by: Karsten Knese <karsten@openrobotics.org>
2020-04-18 15:09:07 -07:00
Karsten Knese
dd6564518d test for rclcpp::serializedmessage
Signed-off-by: Karsten Knese <karsten@openrobotics.org>
2020-04-18 12:52:47 -07:00
Karsten Knese
d89ef5098b use is_serialized from pub/sub base
Signed-off-by: Karsten Knese <karsten@openrobotics.org>
2020-04-18 12:06:22 -07:00
Karsten Knese
17f2d32250 remove instantiate_p
Signed-off-by: Karsten Knese <karsten@openrobotics.org>
2020-04-17 23:04:55 -07:00
Karsten Knese
7f04a19cb8 wip: make test run successfully
Signed-off-by: Karsten Knese <karsten@openrobotics.org>
2020-04-17 21:56:44 -07:00
Karsten Knese
d6bf9833b3 make serialized message a class
Signed-off-by: Karsten Knese <karsten@openrobotics.org>
2020-04-17 19:46:10 -07:00
Karsten Knese
1cd811a412 make it compile on osx
Signed-off-by: Karsten Knese <karsten@openrobotics.org>
2020-04-17 15:02:13 -07:00
Joshua Hampp
241164204d fixed include order
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
1f8226a6f3 line break
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
511476c445 fixed check
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
408343fb96 renamed variable
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
ff7a81f5dd reuse constructor
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
4fa5f090ed fixed memory loss
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
ab73a62297 updated loop signature
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
5472d363b1 added comment
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
35fd4908f8 fixed year in header
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
93dc82699b added original constructor to LifcyclePublisher
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
b482164b97 moved serialization and serialized message out of "experimental"
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
a2fc8bd867 enabled publishing of rcl_serialized_message_t
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Fabian König
9b436fa777 fix include guards
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
2147026fbf initializing variables
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
2cefdca297 added backwards compatibility for publishing serialized messages
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
4c7506f5aa uncrustify
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
18181f5d8d changes due to review of PR
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
9b65b7cefd * default deleter
* initiazlize variables

Co-Authored-By: Karsten Knese <Karsten1987@users.noreply.github.com>
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
DensoADAS
657d9a0be4 Update rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp
Co-Authored-By: Karsten Knese <Karsten1987@users.noreply.github.com>
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
DensoADAS
7cff1aa85b Update rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp
Co-Authored-By: Karsten Knese <Karsten1987@users.noreply.github.com>
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
d4536b3036 updated to rclcpp::ok
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
d74c4edac1 removed unnecessary include
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
d8f1da9c88 adapted to multi waitables for ipm
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
4e1e744624 removed unnecessary dependency
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
2ddbe32469 added missing member from rebase
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
4c80594a6f fixed code style
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
bc228ee032 uncrustified file
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
0c83e4981d updated test_intra_process_communication:
* fixed allocation counter
 * updated data type naming

Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
af14ee76be * fixed code style
* updated error messages
* added const modifier

Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
d7a84eaa66 changed timing
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
a3db0069b6 beautified error output
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
5faf8f40e7 changed include notation
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
02921427f7 added unit test for serialized intra process communication
* test memory (de)allocation
 * test communication channels between serialized and unserialized content

Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
56b520eb95 extended "create_subscription" and "create_subscriptioncreate_publisher_factory" to pass message type
* backwards compatible
 * allows creation of publisher with type "rcl_serialized_message_t"

Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
f9ef2c2fd5 extended "Subscription" for serialized messages:
* implemented second communication channel for serialized intra process messages

Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
29fea926af extended "create_publisher" and "create_publisher_factory" to pass message type
* backwards compatible
 * allows creation of publisher with type "rcl_serialized_message_t"

Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
c8522b3cd6 updated "LifcecylePublisher" for serialized messages
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
9d7ef52885 extended "Publisher" for serialized messages:
* implemented second communication channel for serialized intra process messages
 * extend publish to handle "rcl_serialized_message_t"
 * changed behaviour of serialized message publishing by taking ownership of message (for deletion)

Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
86bd3a3c80 extended "Publisher" for serialized messages:
* implemented second communication channel for serialized intra process messages
extended "Publisher" for serialized messages:
 * pass message type by argument
 * added constructor for backwards compatibility (moved common code in separate methods "init_setup")
 * added allocator for serialized messages

Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:44 -07:00
Joshua Hampp
a300c2f4ae added secon communication channel for intra process communication for serialized communication in publisher base and subscriber base (adapted waitables)
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:11 -07:00
Joshua Hampp
9f1c10f1c4 extended "SubscriptionIntraProcess" for serialized communication by:
* distinguish between content type and callback type
 * use "SerializedContainer" for serialized messages for memory deletion
 * added specialized methods for combinations of (un)serialized content/callback
 * automatically (de)serialize messages
 * allowed communication types are now (MessageT==CallbackMessageT || MessageT==SerializedContainer || CallbackMessageT==rcl_serialized_message_t)

Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:11 -07:00
Joshua Hampp
f3aab4f6c5 extended "IntraProcessManager" for serialized messages:
* added flag for serialized communication
 * check for flag in "can_communicate"

Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:11 -07:00
Joshua Hampp
c8162ba861 added "SerializedContainer" as container of rcl_serialized_message_t, for addind a deleter to ease up memory handling
further features:
 * copy constructor (allowing static memory allocation, e.g. if for static memory allocation in device driver)
 * destructor

Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:11 -07:00
Joshua Hampp
9b73845f89 added "SerializationBase" and "Serialization" to convert a ROS2 message to rcl_serialized_message_t and vice versa
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
2020-04-17 13:20:11 -07:00
30 changed files with 1578 additions and 151 deletions

View File

@@ -77,6 +77,8 @@ set(${PROJECT_NAME}_SRCS
src/rclcpp/publisher_base.cpp
src/rclcpp/qos.cpp
src/rclcpp/qos_event.cpp
src/rclcpp/serialization.cpp
src/rclcpp/serialized_message.cpp
src/rclcpp/service.cpp
src/rclcpp/signal_handler.cpp
src/rclcpp/subscription_base.cpp
@@ -245,6 +247,18 @@ if(BUILD_TESTING)
"test_msgs"
)
target_link_libraries(test_loaned_message ${PROJECT_NAME})
ament_add_gtest(test_intra_process_communication test/test_intra_process_communication.cpp
TIMEOUT 120)
if(TARGET test_intra_process_communication)
ament_target_dependencies(test_intra_process_communication
"rcl"
"rcl_interfaces"
"rmw"
"rosidl_typesupport_cpp"
"test_msgs"
)
endif()
target_link_libraries(test_intra_process_communication ${PROJECT_NAME})
ament_add_gtest(test_node test/test_node.cpp TIMEOUT 240)
if(TARGET test_node)
@@ -393,6 +407,15 @@ if(BUILD_TESTING)
${PROJECT_NAME}
)
endif()
ament_add_gtest(test_serialized_message test/test_serialized_message.cpp)
if(TARGET test_serialized_message)
ament_target_dependencies(test_serialized_message
test_msgs
)
target_link_libraries(test_serialized_message
${PROJECT_NAME}
)
endif()
ament_add_gtest(test_service test/test_service.cpp)
if(TARGET test_service)
ament_target_dependencies(test_service

View File

@@ -43,6 +43,7 @@ std::shared_ptr<PublisherT>
create_publisher(
NodeT & node,
const std::string & topic_name,
const rosidl_message_type_support_t & type_support,
const rclcpp::QoS & qos,
const rclcpp::PublisherOptionsWithAllocator<AllocatorT> & options = (
rclcpp::PublisherOptionsWithAllocator<AllocatorT>()
@@ -56,7 +57,7 @@ create_publisher(
// Create the publisher.
auto pub = node_topics->create_publisher(
topic_name,
rclcpp::create_publisher_factory<MessageT, AllocatorT, PublisherT>(options),
rclcpp::create_publisher_factory<MessageT, AllocatorT, PublisherT>(options, type_support),
qos
);
@@ -66,6 +67,27 @@ create_publisher(
return std::dynamic_pointer_cast<PublisherT>(pub);
}
template<
typename MessageT,
typename AllocatorT = std::allocator<void>,
typename PublisherT = rclcpp::Publisher<MessageT, AllocatorT>,
typename NodeT>
std::shared_ptr<PublisherT>
create_publisher(
NodeT & node,
const std::string & topic_name,
const rclcpp::QoS & qos,
const rclcpp::PublisherOptionsWithAllocator<AllocatorT> & options = (
rclcpp::PublisherOptionsWithAllocator<AllocatorT>()
)
)
{
const auto type_support = *rosidl_typesupport_cpp::get_message_type_support_handle<MessageT>();
return create_publisher<MessageT, AllocatorT, PublisherT, NodeT>(
node, topic_name, type_support, qos, options);
}
} // namespace rclcpp
#endif // RCLCPP__CREATE_PUBLISHER_HPP_

View File

@@ -51,14 +51,13 @@ typename std::shared_ptr<SubscriptionT>
create_subscription(
NodeT && node,
const std::string & topic_name,
const rosidl_message_type_support_t & type_support,
const rclcpp::QoS & qos,
CallbackT && callback,
const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & options = (
rclcpp::SubscriptionOptionsWithAllocator<AllocatorT>()
),
typename MessageMemoryStrategyT::SharedPtr msg_mem_strat = (
const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & options =
rclcpp::SubscriptionOptionsWithAllocator<AllocatorT>(),
typename MessageMemoryStrategyT::SharedPtr msg_mem_strat =
MessageMemoryStrategyT::create_default()
)
)
{
using rclcpp::node_interfaces::get_node_topics_interface;
@@ -67,7 +66,8 @@ create_subscription(
auto factory = rclcpp::create_subscription_factory<MessageT>(
std::forward<CallbackT>(callback),
options,
msg_mem_strat
msg_mem_strat,
type_support
);
auto sub = node_topics->create_subscription(topic_name, factory, qos);
@@ -76,6 +76,39 @@ create_subscription(
return std::dynamic_pointer_cast<SubscriptionT>(sub);
}
template<
typename MessageT,
typename CallbackT,
typename AllocatorT = std::allocator<void>,
typename CallbackMessageT =
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type,
typename SubscriptionT = rclcpp::Subscription<CallbackMessageT, AllocatorT>,
typename MessageMemoryStrategyT = rclcpp::message_memory_strategy::MessageMemoryStrategy<
CallbackMessageT,
AllocatorT
>,
typename NodeT>
typename std::shared_ptr<SubscriptionT>
create_subscription(
NodeT && node,
const std::string & topic_name,
const rclcpp::QoS & qos,
CallbackT && callback,
const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & options =
rclcpp::SubscriptionOptionsWithAllocator<AllocatorT>(),
typename MessageMemoryStrategyT::SharedPtr msg_mem_strat =
MessageMemoryStrategyT::create_default()
)
{
const auto type_support = *rosidl_typesupport_cpp::get_message_type_support_handle<MessageT>();
return create_subscription<MessageT, CallbackT, AllocatorT, CallbackMessageT, SubscriptionT,
MessageMemoryStrategyT>(
std::forward<NodeT>(
node), topic_name, type_support, qos, std::forward<CallbackT>(
callback), options, msg_mem_strat);
}
} // namespace rclcpp
#endif // RCLCPP__CREATE_SUBSCRIPTION_HPP_

View File

@@ -138,7 +138,7 @@ public:
*/
RCLCPP_PUBLIC
uint64_t
add_publisher(rclcpp::PublisherBase::SharedPtr publisher);
add_publisher(rclcpp::PublisherBase::SharedPtr publisher, bool is_serialized = false);
/// Unregister a publisher using the publisher's unique id.
/**
@@ -311,6 +311,7 @@ private:
rmw_qos_profile_t qos;
const char * topic_name;
bool use_take_shared_method;
bool is_serialized;
};
struct PublisherInfo
@@ -320,6 +321,7 @@ private:
rclcpp::PublisherBase::WeakPtr publisher;
rmw_qos_profile_t qos;
const char * topic_name;
bool is_serialized;
};
struct SplittedSubscriptions

View File

@@ -28,6 +28,8 @@
#include "rclcpp/experimental/buffers/intra_process_buffer.hpp"
#include "rclcpp/experimental/create_intra_process_buffer.hpp"
#include "rclcpp/experimental/subscription_intra_process_base.hpp"
#include "rclcpp/serialization.hpp"
#include "rclcpp/serialized_message.hpp"
#include "rclcpp/type_support_decl.hpp"
#include "rclcpp/waitable.hpp"
#include "tracetools/tracetools.h"
@@ -37,6 +39,8 @@ namespace rclcpp
namespace experimental
{
class SerializedMessage;
template<
typename MessageT,
typename Alloc = std::allocator<void>,
@@ -51,6 +55,10 @@ public:
using MessageAlloc = typename MessageAllocTraits::allocator_type;
using ConstMessageSharedPtr = std::shared_ptr<const MessageT>;
using MessageUniquePtr = std::unique_ptr<MessageT, Deleter>;
using CallbackMessageAllocTraits = allocator::AllocRebind<CallbackMessageT, Alloc>;
using CallbackMessageAlloc = typename CallbackMessageAllocTraits::allocator_type;
using CallbackMessageUniquePtr = std::unique_ptr<CallbackMessageT>;
using CallbackMessageSharedPtr = std::shared_ptr<CallbackMessageT>;
using BufferUniquePtr = typename rclcpp::experimental::buffers::IntraProcessBuffer<
MessageT,
@@ -64,11 +72,15 @@ public:
rclcpp::Context::SharedPtr context,
const std::string & topic_name,
rmw_qos_profile_t qos_profile,
rclcpp::IntraProcessBufferType buffer_type)
: SubscriptionIntraProcessBase(topic_name, qos_profile),
any_callback_(callback)
rclcpp::IntraProcessBufferType buffer_type,
std::shared_ptr<SerializationBase> serializer,
bool is_serialized = false)
: SubscriptionIntraProcessBase(topic_name, qos_profile, is_serialized),
any_callback_(callback), serializer_(serializer)
{
if (!std::is_same<MessageT, CallbackMessageT>::value) {
if (!std::is_same<MessageT, CallbackMessageT>::value &&
!std::is_base_of<rcl_serialized_message_t, MessageT>::value)
{
throw std::runtime_error("SubscriptionIntraProcess wrong callback type");
}
@@ -142,18 +154,49 @@ private:
(void)ret;
}
// convert from ROS2 message to rcl_serialized_message_t (serilizatino needed)
template<typename T>
typename std::enable_if<std::is_same<T, rcl_serialized_message_t>::value, void>::type
typename std::enable_if<
!std::is_base_of<rcl_serialized_message_t, MessageT>::value && // tx not a SerializedMessage
std::is_base_of<rcl_serialized_message_t, T>::value, // rx a SerializedMessage
void>::type
execute_impl()
{
throw std::runtime_error("Subscription intra-process can't handle serialized messages");
fprintf(stderr, "tx ROS2, rx serialized\n");
if (nullptr == serializer_) {
throw std::runtime_error("Subscription intra-process can't handle serialized messages");
}
rmw_message_info_t msg_info = {};
msg_info.from_intra_process = true;
ConstMessageSharedPtr msg = buffer_->consume_shared();
auto serialized_msg = std::make_shared<rclcpp::SerializedMessage>();
serializer_->serialize_message(reinterpret_cast<const void *>(msg.get()), serialized_msg.get());
if (0u == serialized_msg->buffer_length) {
throw std::runtime_error("Subscription intra-process could not serialize message");
}
if (any_callback_.use_take_shared_method()) {
any_callback_.dispatch_intra_process(std::static_pointer_cast<rcl_serialized_message_t>(serialized_msg), msg_info);
} else {
throw std::runtime_error(
"Subscription intra-process for serialized "
"messages does not support unique pointers.");
}
}
// forward from ROS2 message to ROS2 message (same type)
template<class T>
typename std::enable_if<!std::is_same<T, rcl_serialized_message_t>::value, void>::type
typename std::enable_if<
!std::is_base_of<rcl_serialized_message_t, MessageT>::value && // tx not a SerializedMessage
!std::is_base_of<rcl_serialized_message_t, T>::value, // rx not a SerializedMessage
void>::type
execute_impl()
{
rmw_message_info_t msg_info;
fprintf(stderr, "tx ROS2, rx ROS2\n");
rmw_message_info_t msg_info = {};
msg_info.publisher_gid = {0, {0}};
msg_info.from_intra_process = true;
@@ -166,8 +209,83 @@ private:
}
}
// forward from rcl_serialized_message_t to rcl_serialized_message_t (no conversion needed)
template<typename T>
typename std::enable_if<
std::is_base_of<rcl_serialized_message_t, MessageT>::value && // tx a SerializedMessage
std::is_base_of<rcl_serialized_message_t, T>::value, // rx a SerializedMessage
void>::type
execute_impl()
{
fprintf(stderr, "tx serialize, rx serialize\n");
rmw_message_info_t msg_info = {};
msg_info.from_intra_process = true;
if (any_callback_.use_take_shared_method()) {
ConstMessageSharedPtr msg = buffer_->consume_shared();
if (msg == nullptr) {
throw std::runtime_error("Subscription intra-process could not get serialized message");
}
any_callback_.dispatch_intra_process(msg, msg_info);
} else {
throw std::runtime_error(
"Subscription intra-process for serialized "
"messages does not support unique pointers.");
}
}
// convert from rcl_serialized_message_t to ROS2 message (deserialization needed)
template<class T>
typename std::enable_if<
std::is_base_of<rcl_serialized_message_t, MessageT>::value && // tx a SerializedMessage
!std::is_base_of<rcl_serialized_message_t, T>::value, // rx not a SerializedMessage
void>::type
execute_impl()
{
fprintf(stderr, "tx rcl, rx ROS2\n");
if (nullptr == serializer_) {
throw std::runtime_error("Subscription intra-process can't handle unserialized messages");
}
ConstMessageSharedPtr serialized_message = buffer_->consume_shared();
if (nullptr == serialized_message) {
throw std::runtime_error("Subscription intra-process could not get serialized message");
}
rmw_message_info_t msg_info = {};
msg_info.from_intra_process = true;
if (any_callback_.use_take_shared_method()) {
CallbackMessageSharedPtr msg = construct_unique();
serializer_->deserialize_message(
serialized_message.get(),
reinterpret_cast<void *>(msg.get()));
any_callback_.dispatch_intra_process(msg, msg_info);
} else {
// construct unique might not initialize the ros message?
CallbackMessageUniquePtr msg = construct_unique();
serializer_->deserialize_message(
serialized_message.get(),
reinterpret_cast<void *>(msg.get()));
any_callback_.dispatch_intra_process(std::move(msg), msg_info);
}
}
CallbackMessageUniquePtr construct_unique()
{
CallbackMessageUniquePtr unique_msg;
auto ptr = CallbackMessageAllocTraits::allocate(*message_allocator_.get(), 1);
CallbackMessageAllocTraits::construct(*message_allocator_.get(), ptr);
unique_msg = CallbackMessageUniquePtr(ptr);
return unique_msg;
}
AnySubscriptionCallback<CallbackMessageT, Alloc> any_callback_;
BufferUniquePtr buffer_;
std::shared_ptr<SerializationBase> serializer_;
std::shared_ptr<CallbackMessageAlloc> message_allocator_ =
std::make_shared<CallbackMessageAlloc>();
};
} // namespace experimental

View File

@@ -39,8 +39,11 @@ public:
RCLCPP_SMART_PTR_ALIASES_ONLY(SubscriptionIntraProcessBase)
RCLCPP_PUBLIC
SubscriptionIntraProcessBase(const std::string & topic_name, rmw_qos_profile_t qos_profile)
: topic_name_(topic_name), qos_profile_(qos_profile)
SubscriptionIntraProcessBase(
const std::string & topic_name,
rmw_qos_profile_t qos_profile,
bool is_serialized)
: topic_name_(topic_name), qos_profile_(qos_profile), is_serialized_(is_serialized)
{}
virtual ~SubscriptionIntraProcessBase() = default;
@@ -70,6 +73,10 @@ public:
rmw_qos_profile_t
get_actual_qos() const;
RCLCPP_PUBLIC
bool
is_serialized() const;
protected:
std::recursive_mutex reentrant_mutex_;
rcl_guard_condition_t gc_;
@@ -80,6 +87,7 @@ private:
std::string topic_name_;
rmw_qos_profile_t qos_profile_;
bool is_serialized_;
};
} // namespace experimental

View File

@@ -56,6 +56,12 @@ public:
using MessageDeleter = allocator::Deleter<MessageAllocator, MessageT>;
using MessageUniquePtr = std::unique_ptr<MessageT, MessageDeleter>;
using MessageSharedPtr = std::shared_ptr<const MessageT>;
using SerializedMessageAllocatorTraits =
allocator::AllocRebind<rclcpp::SerializedMessage,
AllocatorT>;
using SerializedMessageAllocator = typename SerializedMessageAllocatorTraits::allocator_type;
using SerializedMessageDeleter = allocator::Deleter<SerializedMessageAllocator,
rclcpp::SerializedMessage>;
RCLCPP_SMART_PTR_DEFINITIONS(Publisher<MessageT, AllocatorT>)
@@ -63,44 +69,39 @@ public:
rclcpp::node_interfaces::NodeBaseInterface * node_base,
const std::string & topic,
const rclcpp::QoS & qos,
const rclcpp::PublisherOptionsWithAllocator<AllocatorT> & options)
const rclcpp::PublisherOptionsWithAllocator<AllocatorT> & options,
bool is_serialized = false)
: PublisherBase(
node_base,
topic,
*rosidl_typesupport_cpp::get_message_type_support_handle<MessageT>(),
options.template to_rcl_publisher_options<MessageT>(qos)),
options.template to_rcl_publisher_options<MessageT>(qos),
is_serialized),
options_(options),
message_allocator_(new MessageAllocator(*options.get_allocator().get()))
message_allocator_(new MessageAllocator(*options.get_allocator().get())),
message_allocator_serialized_(new SerializedMessageAllocator(*options.get_allocator().get()))
{
allocator::set_allocator_for_deleter(&message_deleter_, message_allocator_.get());
init_setup();
}
if (options_.event_callbacks.deadline_callback) {
this->add_event_handler(
options_.event_callbacks.deadline_callback,
RCL_PUBLISHER_OFFERED_DEADLINE_MISSED);
}
if (options_.event_callbacks.liveliness_callback) {
this->add_event_handler(
options_.event_callbacks.liveliness_callback,
RCL_PUBLISHER_LIVELINESS_LOST);
}
if (options_.event_callbacks.incompatible_qos_callback) {
this->add_event_handler(
options_.event_callbacks.incompatible_qos_callback,
RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS);
} else if (options_.use_default_callbacks) {
// Register default callback when not specified
try {
this->add_event_handler(
[this](QOSOfferedIncompatibleQoSInfo & info) {
this->default_incompatible_qos_callback(info);
},
RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS);
} catch (UnsupportedEventTypeException & /*exc*/) {
// pass
}
}
// Setup continues in the post construction method, post_init_setup().
Publisher(
rclcpp::node_interfaces::NodeBaseInterface * node_base,
const std::string & topic,
const rclcpp::QoS & qos,
const rclcpp::PublisherOptionsWithAllocator<AllocatorT> & options,
const rosidl_message_type_support_t & type_support,
bool is_serialized = false)
: PublisherBase(
node_base,
topic,
type_support,
options.template to_rcl_publisher_options<MessageT>(qos),
is_serialized),
options_(options),
message_allocator_(new MessageAllocator(*options.get_allocator().get())),
message_allocator_serialized_(new SerializedMessageAllocator(*options.get_allocator().get()))
{
init_setup();
}
/// Called post construction, so that construction may continue after shared_from_this() works.
@@ -134,10 +135,9 @@ public:
throw std::invalid_argument(
"intraprocess communication allowed only with volatile durability");
}
uint64_t intra_process_publisher_id = ipm->add_publisher(this->shared_from_this());
this->setup_intra_process(
intra_process_publisher_id,
ipm);
uint64_t intra_process_publisher_id = ipm->add_publisher(this->shared_from_this(), false);
uint64_t intra_process_publisher_id_serialized = ipm->add_publisher(this->shared_from_this(), true);
this->setup_intra_process(intra_process_publisher_id, intra_process_publisher_id_serialized, ipm);
}
}
@@ -173,6 +173,11 @@ public:
virtual void
publish(std::unique_ptr<MessageT, MessageDeleter> msg)
{
//if (std::is_same<MessageT, rcl_serialized_message_t>::value) {
// this->template publish<MessageDeleter>(std::move(msg));
// return;
//}
if (!intra_process_is_enabled_) {
this->do_inter_process_publish(*msg);
return;
@@ -187,34 +192,48 @@ public:
get_subscription_count() > get_intra_process_subscription_count();
if (inter_process_publish_needed) {
auto shared_msg = this->do_intra_process_publish_and_return_shared(std::move(msg));
auto shared_msg = this->do_intra_process_publish_and_return_shared(
std::move(msg), message_allocator_);
this->do_inter_process_publish(*shared_msg);
} else {
this->do_intra_process_publish(std::move(msg));
this->do_intra_process_publish(std::move(msg), message_allocator_);
}
}
virtual void
publish(const MessageT & msg)
{
// Avoid allocating when not using intra process.
if (!intra_process_is_enabled_) {
// In this case we're not using intra process.
return this->do_inter_process_publish(msg);
}
// Otherwise we have to allocate memory in a unique_ptr and pass it along.
// As the message is not const, a copy should be made.
// A shared_ptr<const MessageT> could also be constructed here.
auto ptr = MessageAllocatorTraits::allocate(*message_allocator_.get(), 1);
MessageAllocatorTraits::construct(*message_allocator_.get(), ptr, msg);
MessageUniquePtr unique_msg(ptr, message_deleter_);
this->publish(std::move(unique_msg));
this->do_publish_message(msg);
}
void
template<class T = MessageT>
typename std::enable_if<!std::is_base_of<rcl_serialized_message_t, T>::value, void>::type
publish(const rcl_serialized_message_t & serialized_msg)
{
return this->do_serialized_publish(&serialized_msg);
this->do_publish_message<rcl_serialized_message_t>(serialized_msg);
}
/// Publish a serialized message. Non specialized version to prevent compiling errors.
template<typename TDeleter, typename T>
void publish(std::unique_ptr<T, TDeleter> serialized_msg)
{
(void)serialized_msg;
throw std::runtime_error(
"publishing unique_ptr with custom deleter only supported for serialized messages");
}
/// Publish a serialized message.
//template<typename TDeleter>
//void publish(std::unique_ptr<rcl_serialized_message_t, TDeleter> serialized_msg)
//{
// this->do_serialized_publish(*serialized_msg);
//}
/// Publish a serialized message.
template<typename TDeleter>
void publish(std::unique_ptr<SerializedMessage, TDeleter> serialized_msg)
{
this->do_serialized_publish(*serialized_msg);
}
/// Publish an instance of a LoanedMessage.
@@ -259,6 +278,69 @@ public:
}
protected:
void init_setup()
{
allocator::set_allocator_for_deleter(&message_deleter_, message_allocator_.get());
if (options_.event_callbacks.deadline_callback) {
this->add_event_handler(
options_.event_callbacks.deadline_callback,
RCL_PUBLISHER_OFFERED_DEADLINE_MISSED);
}
if (options_.event_callbacks.liveliness_callback) {
this->add_event_handler(
options_.event_callbacks.liveliness_callback,
RCL_PUBLISHER_LIVELINESS_LOST);
}
if (options_.event_callbacks.incompatible_qos_callback) {
this->add_event_handler(
options_.event_callbacks.incompatible_qos_callback,
RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS);
} else if (options_.use_default_callbacks) {
// Register default callback when not specified
try {
this->add_event_handler(
[this](QOSOfferedIncompatibleQoSInfo & info) {
this->default_incompatible_qos_callback(info);
},
RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS);
} catch (UnsupportedEventTypeException & /*exc*/) {
RCLCPP_WARN_ONCE(
rclcpp::get_logger(rcl_node_get_logger_name(rcl_node_handle_.get())),
"This rmw implementation does not support ON_OFFERED_INCOMPATIBLE_QOS "
"events, you will not be notified when Publishers offer an incompatible "
"QoS profile to Subscriptions on the same topic.");
}
}
// Setup continues in the post construction method, post_init_setup().
}
template<class T = MessageT>
typename std::enable_if<!std::is_base_of<rcl_serialized_message_t, T>::value>::type
do_publish_message(const T & msg)
{
// Avoid allocating when not using intra process.
if (!intra_process_is_enabled_) {
// In this case we're not using intra process.
return this->do_inter_process_publish(msg);
}
// Otherwise we have to allocate memory in a unique_ptr and pass it along.
// As the message is not const, a copy should be made.
// A shared_ptr<const MessageT> could also be constructed here.
auto ptr = MessageAllocatorTraits::allocate(*message_allocator_.get(), 1);
MessageAllocatorTraits::construct(*message_allocator_.get(), ptr, msg);
MessageUniquePtr unique_msg(ptr, message_deleter_);
this->publish(std::move(unique_msg));
}
template<class T = MessageT>
typename std::enable_if<std::is_base_of<rcl_serialized_message_t, T>::value>::type
do_publish_message(const T & msg)
{
// Kept for backwards compatibility. Copies compelete memory!
this->publish(std::make_unique<rclcpp::SerializedMessage>(msg));
}
void
do_inter_process_publish(const MessageT & msg)
{
@@ -280,15 +362,24 @@ protected:
}
void
do_serialized_publish(const rcl_serialized_message_t * serialized_msg)
do_serialized_publish(const SerializedMessage & serialized_msg)
{
if (intra_process_is_enabled_) {
// TODO(Karsten1987): support serialized message passed by intraprocess
throw std::runtime_error("storing serialized messages in intra process is not supported yet");
bool inter_process_publish_needed =
get_subscription_count() > get_intra_process_subscription_count();
if (inter_process_publish_needed) {
// declare here to avoid deletion before returning method
auto status = rcl_publish_serialized_message(&publisher_handle_, &serialized_msg, nullptr);
if (RCL_RET_OK != status) {
rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish serialized message");
}
}
auto status = rcl_publish_serialized_message(&publisher_handle_, serialized_msg, nullptr);
if (RCL_RET_OK != status) {
rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish serialized message");
if (intra_process_is_enabled_) {
auto msg = std::make_unique<rclcpp::SerializedMessage>();
*msg = serialized_msg;
do_intra_process_publish(std::move(msg), message_allocator_serialized_);
}
}
@@ -312,8 +403,11 @@ protected:
}
}
template<typename T, typename Deleter, typename Allocator>
void
do_intra_process_publish(std::unique_ptr<MessageT, MessageDeleter> msg)
do_intra_process_publish(
std::unique_ptr<T, Deleter> msg,
std::shared_ptr<Allocator> & message_allocator)
{
auto ipm = weak_ipm_.lock();
if (!ipm) {
@@ -324,14 +418,17 @@ protected:
throw std::runtime_error("cannot publish msg which is a null pointer");
}
ipm->template do_intra_process_publish<MessageT, AllocatorT>(
ipm->template do_intra_process_publish<T, AllocatorT>(
intra_process_publisher_id_,
std::move(msg),
message_allocator_);
message_allocator);
}
std::shared_ptr<const MessageT>
do_intra_process_publish_and_return_shared(std::unique_ptr<MessageT, MessageDeleter> msg)
template<typename T, typename Deleter, typename Allocator>
std::shared_ptr<const T>
do_intra_process_publish_and_return_shared(
std::unique_ptr<T, Deleter> msg,
std::shared_ptr<Allocator> & message_allocator)
{
auto ipm = weak_ipm_.lock();
if (!ipm) {
@@ -342,10 +439,10 @@ protected:
throw std::runtime_error("cannot publish msg which is a null pointer");
}
return ipm->template do_intra_process_publish_and_return_shared<MessageT, AllocatorT>(
return ipm->template do_intra_process_publish_and_return_shared<T, AllocatorT>(
intra_process_publisher_id_,
std::move(msg),
message_allocator_);
message_allocator);
}
/// Copy of original options passed during construction.
@@ -356,6 +453,7 @@ protected:
const rclcpp::PublisherOptionsWithAllocator<AllocatorT> options_;
std::shared_ptr<MessageAllocator> message_allocator_;
std::shared_ptr<SerializedMessageAllocator> message_allocator_serialized_;
MessageDeleter message_deleter_;
};

View File

@@ -73,7 +73,8 @@ public:
rclcpp::node_interfaces::NodeBaseInterface * node_base,
const std::string & topic,
const rosidl_message_type_support_t & type_support,
const rcl_publisher_options_t & publisher_options);
const rcl_publisher_options_t & publisher_options,
bool is_serializd = false);
RCLCPP_PUBLIC
virtual ~PublisherBase();
@@ -191,8 +192,13 @@ public:
void
setup_intra_process(
uint64_t intra_process_publisher_id,
uint64_t intra_process_publisher_id_serialized,
IntraProcessManagerSharedPtr ipm);
RCLCPP_PUBLIC
bool
is_serialized() const;
protected:
template<typename EventCallbackT>
void
@@ -222,8 +228,12 @@ protected:
bool intra_process_is_enabled_;
IntraProcessManagerWeakPtr weak_ipm_;
uint64_t intra_process_publisher_id_;
uint64_t intra_process_publisher_id_serialized_;
rmw_gid_t rmw_gid_;
private:
bool is_serialized_;
};
} // namespace rclcpp

View File

@@ -63,17 +63,21 @@ struct PublisherFactory
/// Return a PublisherFactory with functions setup for creating a PublisherT<MessageT, AllocatorT>.
template<typename MessageT, typename AllocatorT, typename PublisherT>
PublisherFactory
create_publisher_factory(const rclcpp::PublisherOptionsWithAllocator<AllocatorT> & options)
create_publisher_factory(
const rclcpp::PublisherOptionsWithAllocator<AllocatorT> & options,
const rosidl_message_type_support_t & type_support)
{
PublisherFactory factory {
// factory function that creates a MessageT specific PublisherT
[options](
[options, type_support](
rclcpp::node_interfaces::NodeBaseInterface * node_base,
const std::string & topic_name,
const rclcpp::QoS & qos
) -> std::shared_ptr<PublisherT>
{
auto publisher = std::make_shared<PublisherT>(node_base, topic_name, qos, options);
auto is_serialized = std::is_base_of<rcl_serialized_message_t, MessageT>::value;
auto publisher = std::make_shared<PublisherT>(
node_base, topic_name, qos, options, type_support, is_serialized);
// This is used for setting up things like intra process comms which
// require this->shared_from_this() which cannot be called from
// the constructor.

View File

@@ -0,0 +1,68 @@
// Copyright 2020 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef RCLCPP__SERIALIZATION_HPP_
#define RCLCPP__SERIALIZATION_HPP_
#include <memory>
#include <string>
#include "rosidl_runtime_c/message_type_support_struct.h"
namespace rclcpp
{
class SerializedMessage;
/// Interface to (de)serialize a message
class SerializationBase
{
public:
virtual ~SerializationBase() = default;
/// Serialize a ROS2 message to a serialized stream
/**
* \param[in] message The ROS2 message which is read and serialized by rmw.
*/
virtual void serialize_message(
const void * ros_message, SerializedMessage * serialized_message) const = 0;
/// Deserialize a serialized stream to a ROS message
/**
* \param[in] serialized_message The serialized message to be converted to ROS2 by rmw.
* \param[out] message The deserialized ROS2 message.
*/
virtual void deserialize_message(
const SerializedMessage * serialized_message, void * ros_message) const = 0;
};
/// Default implementation to (de)serialize a message by using rmw_(de)serialize
class Serialization : public SerializationBase
{
public:
Serialization(const rosidl_message_type_support_t & type_support);
void serialize_message(
const void * ros_message, SerializedMessage * serialized_message) const override;
void deserialize_message(
const SerializedMessage * serialized_message, void * ros_message) const override;
private:
rosidl_message_type_support_t type_support_;
};
} // namespace rclcpp
#endif // RCLCPP__SERIALIZATION_HPP_

View File

@@ -0,0 +1,76 @@
// Copyright 2020 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef RCLCPP__SERIALIZED_MESSAGE_HPP_
#define RCLCPP__SERIALIZED_MESSAGE_HPP_
#include "rcl/allocator.h"
#include "rcl/types.h"
namespace rclcpp
{
/// Object oriented version of rcl_serialized_message_t with destructor to avoid memory leaks
class SerializedMessage : public rcl_serialized_message_t
{
public:
/// Default constructor for a SerializedMessage
/**
* Default constructs a serialized message and initalizes its
* capacity with 0.
*
* \param[in] allocator The allocator to be used for the initialzation.
*/
explicit SerializedMessage(
const rcl_allocator_t & allocator = rcl_get_default_allocator());
/// Default constructor for a SerializedMessage
/**
* Default constructs a serialized message and initalizes its
* capacity with 0.
*
* \param[in] initial_capacity The amount of memory to be allocated.
* \param[in] allocator The allocator to be used for the initialzation.
*/
SerializedMessage(
size_t initial_capacity,
const rcl_allocator_t & allocator = rcl_get_default_allocator());
/// Copy Constructor for a SerializedMessage
SerializedMessage(const SerializedMessage & serialized_message);
/// Copy Constructor for a SerializedMessage from a rcl_serialized_message_t
SerializedMessage(const rcl_serialized_message_t & serialized_message);
/// Move Constructor for a SerializedMessage
SerializedMessage(SerializedMessage && serialized_message);
/// Move Constructor for a SerializedMessage from a rcl_serialized_message_t
SerializedMessage(rcl_serialized_message_t && serialized_message);
SerializedMessage & operator=(const SerializedMessage & other);
SerializedMessage & operator=(const rcl_serialized_message_t & other);
SerializedMessage & operator=(SerializedMessage && other);
SerializedMessage & operator=(rcl_serialized_message_t && other);
/// Destructor for a SerializedMessage
~SerializedMessage();
};
} // namespace rclcpp
#endif // RCLCPP__SERIALIZED_MESSAGE_HPP_

View File

@@ -25,7 +25,6 @@
#include <string>
#include <utility>
#include "rcl/error_handling.h"
#include "rcl/subscription.h"
@@ -41,6 +40,7 @@
#include "rclcpp/message_info.hpp"
#include "rclcpp/message_memory_strategy.hpp"
#include "rclcpp/node_interfaces/node_base_interface.hpp"
#include "rclcpp/serialized_message.hpp"
#include "rclcpp/subscription_base.hpp"
#include "rclcpp/subscription_options.hpp"
#include "rclcpp/subscription_traits.hpp"
@@ -155,29 +155,80 @@ public:
"intraprocess communication allowed only with volatile durability");
}
// First create a SubscriptionIntraProcess which will be given to the intra-process manager.
auto context = node_base->get_context();
using SubscriptionIntraProcessT = rclcpp::experimental::SubscriptionIntraProcess<
CallbackMessageT,
AllocatorT,
typename MessageUniquePtr::deleter_type>;
auto subscription_intra_process = std::make_shared<SubscriptionIntraProcessT>(
callback,
options.get_allocator(),
context,
this->get_topic_name(), // important to get like this, as it has the fully-qualified name
qos_profile,
resolve_intra_process_buffer_type(options.intra_process_buffer_type, callback));
TRACEPOINT(
rclcpp_subscription_init,
(const void *)get_subscription_handle().get(),
(const void *)subscription_intra_process.get());
// Add it to the intra process manager.
using rclcpp::experimental::IntraProcessManager;
uint64_t intra_process_subscription_id;
uint64_t intra_process_subscription_id_serialized;
auto context = node_base->get_context();
auto ipm = context->get_sub_context<IntraProcessManager>();
uint64_t intra_process_subscription_id = ipm->add_subscription(subscription_intra_process);
this->setup_intra_process(intra_process_subscription_id, ipm);
{
// First create a SubscriptionIntraProcess which will be given to the intra-process manager.
auto subscription_intra_process = std::make_shared<
rclcpp::experimental::SubscriptionIntraProcess<
CallbackMessageT,
AllocatorT,
typename MessageUniquePtr::deleter_type
>>(
callback,
options.get_allocator(),
context,
this->get_topic_name(), // important to get it by the fully-qualified name
qos.get_rmw_qos_profile(),
resolve_intra_process_buffer_type(options.intra_process_buffer_type, callback),
std::make_shared<rclcpp::Serialization>(type_support_handle)
);
TRACEPOINT(
rclcpp_subscription_init,
(const void *)get_subscription_handle().get(),
(const void *)subscription_intra_process.get());
// Add it to the intra process manager.
intra_process_subscription_id = ipm->add_subscription(subscription_intra_process);
}
{
using SerializedMessageAllocatorTraits =
allocator::AllocRebind<rclcpp::SerializedMessage,
AllocatorT>;
using SerializedMessageAllocator =
typename SerializedMessageAllocatorTraits::allocator_type;
using SerializedMessageDeleter = allocator::Deleter<SerializedMessageAllocator,
rclcpp::SerializedMessage>;
using SerializedMessageUniquePtr =
std::unique_ptr<rclcpp::SerializedMessage,
SerializedMessageDeleter>;
// First create a SubscriptionIntraProcess which will be given to the intra-process manager.
auto subscription_intra_process = std::make_shared<
rclcpp::experimental::SubscriptionIntraProcess<
rclcpp::SerializedMessage,
AllocatorT,
typename SerializedMessageUniquePtr::deleter_type,
CallbackMessageT
>>(
callback,
options.get_allocator(),
context,
this->get_topic_name(), // important to get it by the fully-qualified name
qos.get_rmw_qos_profile(),
resolve_intra_process_buffer_type(options.intra_process_buffer_type, callback),
std::make_shared<rclcpp::Serialization>(type_support_handle),
is_serialized()
);
TRACEPOINT(
rclcpp_subscription_init,
(const void *)get_subscription_handle().get(),
(const void *)subscription_intra_process.get());
// Add it to the intra process manager.
intra_process_subscription_id_serialized = ipm->add_subscription(
subscription_intra_process);
}
this->setup_intra_process(
{intra_process_subscription_id,
intra_process_subscription_id_serialized}, ipm);
}
TRACEPOINT(

View File

@@ -228,13 +228,13 @@ public:
RCLCPP_PUBLIC
void
setup_intra_process(
uint64_t intra_process_subscription_id,
const std::vector<uint64_t> & intra_process_subscription_ids,
IntraProcessManagerWeakPtr weak_ipm);
/// Return the waitable for intra-process, or nullptr if intra-process is not setup.
RCLCPP_PUBLIC
rclcpp::Waitable::SharedPtr
get_intra_process_waitable() const;
std::vector<rclcpp::Waitable::SharedPtr>
get_intra_process_waitables() const;
/// Exchange state of whether or not a part of the subscription is used by a wait set.
/**
@@ -286,7 +286,7 @@ protected:
bool use_intra_process_;
IntraProcessManagerWeakPtr weak_ipm_;
uint64_t intra_process_subscription_id_;
std::vector<uint64_t> intra_process_subscription_ids_;
private:
RCLCPP_DISABLE_COPY(SubscriptionBase)

View File

@@ -78,7 +78,8 @@ SubscriptionFactory
create_subscription_factory(
CallbackT && callback,
const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & options,
typename MessageMemoryStrategyT::SharedPtr msg_mem_strat)
typename MessageMemoryStrategyT::SharedPtr msg_mem_strat,
const rosidl_message_type_support_t & type_support)
{
auto allocator = options.get_allocator();
@@ -88,7 +89,7 @@ create_subscription_factory(
SubscriptionFactory factory {
// factory function that creates a MessageT specific SubscriptionT
[options, msg_mem_strat, any_subscription_callback](
[options, msg_mem_strat, any_subscription_callback, type_support](
rclcpp::node_interfaces::NodeBaseInterface * node_base,
const std::string & topic_name,
const rclcpp::QoS & qos
@@ -99,7 +100,7 @@ create_subscription_factory(
auto sub = Subscription<CallbackMessageT, AllocatorT>::make_shared(
node_base,
*rosidl_typesupport_cpp::get_message_type_support_handle<MessageT>(),
type_support,
topic_name,
qos,
any_subscription_callback,

View File

@@ -18,6 +18,8 @@
#include <memory>
#include "rclcpp/function_traits.hpp"
#include "rclcpp/serialized_message.hpp"
#include "rclcpp/subscription_options.hpp"
#include "rcl/types.h"
namespace rclcpp
@@ -49,6 +51,15 @@ struct is_serialized_subscription_argument<std::shared_ptr<rcl_serialized_messag
: std::true_type
{};
template<>
struct is_serialized_subscription_argument<rclcpp::SerializedMessage>: std::true_type
{};
template<>
struct is_serialized_subscription_argument<std::shared_ptr<rclcpp::SerializedMessage>>
: std::true_type
{};
template<typename T>
struct is_serialized_subscription : is_serialized_subscription_argument<T>
{};
@@ -75,6 +86,7 @@ struct extract_message_type<std::unique_ptr<MessageT, Deleter>>: extract_message
template<
typename CallbackT,
typename AllocatorT = std::allocator<void>,
// Do not attempt if CallbackT is an integer (mistaken for depth)
typename = std::enable_if_t<!std::is_integral<
std::remove_cv_t<std::remove_reference_t<CallbackT>>>::value>,
@@ -85,6 +97,10 @@ template<
// Do not attempt if CallbackT is a rmw_qos_profile_t (mistaken for qos profile)
typename = std::enable_if_t<!std::is_same<
rmw_qos_profile_t,
std::remove_cv_t<std::remove_reference_t<CallbackT>>>::value>,
// Do not attempt if CallbackT is a rclcpp::SubscriptionOptionsWithAllocator
typename = std::enable_if_t<!std::is_same<
rclcpp::SubscriptionOptionsWithAllocator<AllocatorT>,
std::remove_cv_t<std::remove_reference_t<CallbackT>>>::value>
>
struct has_message_type : extract_message_type<

View File

@@ -165,18 +165,19 @@ public:
}
if (mask.include_intra_process_waitable) {
auto local_subscription = inner_subscription;
auto waitable = inner_subscription->get_intra_process_waitable();
if (nullptr != waitable) {
bool already_in_use = local_subscription->exchange_in_use_by_wait_set_state(
waitable.get(),
true);
if (already_in_use) {
throw std::runtime_error(
"subscription intra-process waitable already associated with a wait set");
for (auto waitable : inner_subscription->get_intra_process_waitables()) {
if (nullptr != waitable) {
bool already_in_use = local_subscription->exchange_in_use_by_wait_set_state(
waitable.get(),
true);
if (already_in_use) {
throw std::runtime_error(
"subscription intra-process waitable already associated with a wait set");
}
this->storage_add_waitable(
std::move(waitable),
std::move(local_subscription));
}
this->storage_add_waitable(
std::move(inner_subscription->get_intra_process_waitable()),
std::move(local_subscription));
}
}
});
@@ -230,11 +231,12 @@ public:
}
}
if (mask.include_intra_process_waitable) {
auto local_waitable = inner_subscription->get_intra_process_waitable();
inner_subscription->exchange_in_use_by_wait_set_state(local_waitable.get(), false);
if (nullptr != local_waitable) {
// This is the case when intra process is disabled for the subscription.
this->storage_remove_waitable(std::move(local_waitable));
for (auto local_waitable : inner_subscription->get_intra_process_waitables()) {
inner_subscription->exchange_in_use_by_wait_set_state(local_waitable.get(), false);
if (nullptr != local_waitable) {
// This is the case when intra process is disabled for the subscription.
this->storage_remove_waitable(std::move(local_waitable));
}
}
}
});

View File

@@ -32,7 +32,8 @@ IntraProcessManager::~IntraProcessManager()
{}
uint64_t
IntraProcessManager::add_publisher(rclcpp::PublisherBase::SharedPtr publisher)
IntraProcessManager::add_publisher(
rclcpp::PublisherBase::SharedPtr publisher, bool is_serialized)
{
std::unique_lock<std::shared_timed_mutex> lock(mutex_);
@@ -41,6 +42,7 @@ IntraProcessManager::add_publisher(rclcpp::PublisherBase::SharedPtr publisher)
publishers_[id].publisher = publisher;
publishers_[id].topic_name = publisher->get_topic_name();
publishers_[id].qos = publisher->get_actual_qos().get_rmw_qos_profile();
publishers_[id].is_serialized = is_serialized;
// Initialize the subscriptions storage for this publisher.
pub_to_subs_[id] = SplittedSubscriptions();
@@ -66,6 +68,7 @@ IntraProcessManager::add_subscription(SubscriptionIntraProcessBase::SharedPtr su
subscriptions_[id].topic_name = subscription->get_topic_name();
subscriptions_[id].qos = subscription->get_actual_qos();
subscriptions_[id].use_take_shared_method = subscription->use_take_shared_method();
subscriptions_[id].is_serialized = subscription->is_serialized();
// adds the subscription id to all the matchable publishers
for (auto & pair : publishers_) {
@@ -220,6 +223,13 @@ IntraProcessManager::can_communicate(
return false;
}
// a publisher and a subscription with different content type can't communicate
// if is_serialized is true, the expected message typ is rcl_serialized_message_t
// otherwise the templated ROS2 message type
if (sub_info.is_serialized != pub_info.is_serialized) {
return false;
}
return true;
}

View File

@@ -99,10 +99,12 @@ NodeTopics::add_subscription(
callback_group->add_waitable(subscription_event);
}
auto intra_process_waitable = subscription->get_intra_process_waitable();
if (nullptr != intra_process_waitable) {
// Add to the callback group to be notified about intra-process msgs.
callback_group->add_waitable(intra_process_waitable);
const auto intra_process_waitables = subscription->get_intra_process_waitables();
for (auto & intra_process_waitable : intra_process_waitables) {
if (nullptr != intra_process_waitable) {
// Add to the callback group to be notified about intra-process msgs.
callback_group->add_waitable(intra_process_waitable);
}
}
// Notify the executor that a new subscription was created using the parent Node.

View File

@@ -42,9 +42,12 @@ PublisherBase::PublisherBase(
rclcpp::node_interfaces::NodeBaseInterface * node_base,
const std::string & topic,
const rosidl_message_type_support_t & type_support,
const rcl_publisher_options_t & publisher_options)
const rcl_publisher_options_t & publisher_options,
bool is_serialized)
: rcl_node_handle_(node_base->get_shared_rcl_node_handle()),
intra_process_is_enabled_(false), intra_process_publisher_id_(0)
intra_process_is_enabled_(false),
intra_process_publisher_id_(0),
is_serialized_(is_serialized)
{
rcl_ret_t ret = rcl_publisher_init(
&publisher_handle_,
@@ -238,13 +241,21 @@ PublisherBase::operator==(const rmw_gid_t * gid) const
void
PublisherBase::setup_intra_process(
uint64_t intra_process_publisher_id,
uint64_t intra_process_publisher_id_serialized,
IntraProcessManagerSharedPtr ipm)
{
intra_process_publisher_id_ = intra_process_publisher_id;
intra_process_publisher_id_serialized_ = intra_process_publisher_id_serialized;
weak_ipm_ = ipm;
intra_process_is_enabled_ = true;
}
bool
PublisherBase::is_serialized() const
{
return is_serialized_;
}
void
PublisherBase::default_incompatible_qos_callback(QOSOfferedIncompatibleQoSInfo & event) const
{

View File

@@ -0,0 +1,66 @@
// Copyright 2020 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "rclcpp/serialization.hpp"
#include <memory>
#include <string>
#include "rclcpp/exceptions.hpp"
#include "rclcpp/serialized_message.hpp"
#include "rcpputils/asserts.hpp"
#include "rmw/rmw.h"
namespace rclcpp
{
Serialization::Serialization(const rosidl_message_type_support_t & type_support)
: type_support_(type_support)
{}
void Serialization::serialize_message(
const void * ros_message, SerializedMessage * serialized_message) const
{
rcpputils::check_true(ros_message != nullptr, "ROS message is nullpointer.");
rcpputils::check_true(serialized_message != nullptr, "Serialized message is nullpointer.");
const auto ret = rmw_serialize(
ros_message,
&type_support_,
serialized_message);
if (ret != RMW_RET_OK) {
rclcpp::exceptions::throw_from_rcl_error(ret, "Failed to serialize ROS message.");
}
}
void
Serialization::deserialize_message(
const SerializedMessage * serialized_message, void * ros_message) const
{
rcpputils::check_true(serialized_message != nullptr, "Serialized message is nullpointer.");
rcpputils::check_true(
serialized_message->buffer_capacity != 0 &&
serialized_message->buffer_length != 0 &&
serialized_message->buffer != nullptr, "Serialized message is wrongly initialized.");
rcpputils::check_true(ros_message != nullptr, "ROS message is a nullpointer.");
const auto ret = rmw_deserialize(serialized_message, &type_support_, ros_message);
if (ret != RMW_RET_OK) {
rclcpp::exceptions::throw_from_rcl_error(ret, "Failed to deserialize ROS message.");
}
}
} // namespace rclcpp

View File

@@ -0,0 +1,135 @@
// Copyright 2020 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "rclcpp/serialized_message.hpp"
#include <cstring>
#include "rclcpp/exceptions.hpp"
#include "rclcpp/logging.hpp"
#include "rmw/types.h"
namespace rclcpp
{
/// Object oriented version of rcl_serialized_message_t with destructor to avoid memory leaks
SerializedMessage::SerializedMessage(const rcl_allocator_t & allocator)
: SerializedMessage(0u, allocator)
{}
SerializedMessage::SerializedMessage(
size_t initial_capacity, const rcl_allocator_t & allocator)
: rcl_serialized_message_t(rmw_get_zero_initialized_serialized_message())
{
const auto ret = rmw_serialized_message_init(
this, initial_capacity, &allocator);
if (ret != RCL_RET_OK) {
rclcpp::exceptions::throw_from_rcl_error(ret);
}
}
SerializedMessage::SerializedMessage(const SerializedMessage & serialized_message)
: SerializedMessage(static_cast<const rcl_serialized_message_t &>(serialized_message))
{}
SerializedMessage::SerializedMessage(const rcl_serialized_message_t & serialized_message)
: rcl_serialized_message_t(rmw_get_zero_initialized_serialized_message())
{
fprintf(stderr, "copy constructor called\n");
const auto ret = rmw_serialized_message_init(
this, serialized_message.buffer_capacity, &serialized_message.allocator);
if (ret != RCL_RET_OK) {
rclcpp::exceptions::throw_from_rcl_error(ret);
}
// do not call memcpy if the pointer is "static"
if (buffer != serialized_message.buffer) {
std::memcpy(buffer, serialized_message.buffer, serialized_message.buffer_length);
}
buffer_length = serialized_message.buffer_length;
}
SerializedMessage::SerializedMessage(SerializedMessage && serialized_message)
: SerializedMessage(
std::forward<rcl_serialized_message_t>(
static_cast<rcl_serialized_message_t &&>(serialized_message)))
{}
SerializedMessage::SerializedMessage(rcl_serialized_message_t && serialized_message)
: rcl_serialized_message_t(serialized_message)
{
// reset buffer to prevent double free
serialized_message = rmw_get_zero_initialized_serialized_message();
}
SerializedMessage & SerializedMessage::operator=(const SerializedMessage & other)
{
*this = static_cast<const rcl_serialized_message_t &>(other);
return *this;
}
SerializedMessage & SerializedMessage::operator=(const rcl_serialized_message_t & other)
{
*this = static_cast<SerializedMessage>(rmw_get_zero_initialized_serialized_message());
const auto ret = rmw_serialized_message_init(
this, other.buffer_capacity, &other.allocator);
if (ret != RCL_RET_OK) {
rclcpp::exceptions::throw_from_rcl_error(ret);
}
// do not call memcpy if the pointer is "static"
if (buffer != other.buffer) {
std::memcpy(buffer, other.buffer, other.buffer_length);
}
buffer_length = other.buffer_length;
return *this;
}
SerializedMessage & SerializedMessage::operator=(SerializedMessage && other)
{
*this = static_cast<rcl_serialized_message_t &&>(other);
return *this;
}
SerializedMessage & SerializedMessage::operator=(rcl_serialized_message_t && other)
{
this->buffer = other.buffer;
this->buffer_capacity = other.buffer_length;
this->buffer_length = other.buffer_length;
this->allocator = other.allocator;
// reset original to prevent double free
other = rmw_get_zero_initialized_serialized_message();
return *this;
}
SerializedMessage::~SerializedMessage()
{
if (nullptr != buffer) {
const auto fini_ret = rmw_serialized_message_fini(this);
if (fini_ret != RCL_RET_OK) {
RCLCPP_ERROR(
get_logger("rclcpp"),
"Failed to destroy serialized message: %s", rcl_get_error_string().str);
}
}
}
} // namespace rclcpp

View File

@@ -39,7 +39,6 @@ SubscriptionBase::SubscriptionBase(
: node_base_(node_base),
node_handle_(node_base_->get_shared_rcl_node_handle()),
use_intra_process_(false),
intra_process_subscription_id_(0),
type_support_(type_support_handle),
is_serialized_(is_serialized)
{
@@ -93,7 +92,9 @@ SubscriptionBase::~SubscriptionBase()
"Intra process manager died before than a subscription.");
return;
}
ipm->remove_subscription(intra_process_subscription_id_);
for (auto intra_process_subscription_id : intra_process_subscription_ids_) {
ipm->remove_subscription(intra_process_subscription_id);
}
}
const char *
@@ -204,10 +205,10 @@ SubscriptionBase::get_publisher_count() const
void
SubscriptionBase::setup_intra_process(
uint64_t intra_process_subscription_id,
const std::vector<uint64_t> & intra_process_subscription_ids,
IntraProcessManagerWeakPtr weak_ipm)
{
intra_process_subscription_id_ = intra_process_subscription_id;
intra_process_subscription_ids_ = intra_process_subscription_ids;
weak_ipm_ = weak_ipm;
use_intra_process_ = true;
}
@@ -218,23 +219,29 @@ SubscriptionBase::can_loan_messages() const
return rcl_subscription_can_loan_messages(subscription_handle_.get());
}
rclcpp::Waitable::SharedPtr
SubscriptionBase::get_intra_process_waitable() const
std::vector<rclcpp::Waitable::SharedPtr>
SubscriptionBase::get_intra_process_waitables() const
{
// If not using intra process, shortcut to nullptr.
if (!use_intra_process_) {
return nullptr;
return std::vector<rclcpp::Waitable::SharedPtr>();
}
// Get the intra process manager.
auto ipm = weak_ipm_.lock();
if (!ipm) {
throw std::runtime_error(
"SubscriptionBase::get_intra_process_waitable() called "
"SubscriptionBase::get_intra_process_waitables() called "
"after destruction of intra process manager");
}
// Use the id to retrieve the subscription intra-process from the intra-process manager.
return ipm->get_subscription_intra_process(intra_process_subscription_id_);
std::vector<rclcpp::Waitable::SharedPtr> waitables(intra_process_subscription_ids_.size());
for (auto i = 0u; i < intra_process_subscription_ids_.size(); ++i) {
// Use the id to retrieve the subscription intra-process from the intra-process manager.
waitables[i] = ipm->get_subscription_intra_process(intra_process_subscription_ids_[i]);
}
return waitables;
}
void
@@ -274,8 +281,10 @@ SubscriptionBase::exchange_in_use_by_wait_set_state(
if (this == pointer_to_subscription_part) {
return subscription_in_use_by_wait_set_.exchange(in_use_state);
}
if (get_intra_process_waitable().get() == pointer_to_subscription_part) {
return intra_process_subscription_waitable_in_use_by_wait_set_.exchange(in_use_state);
for (auto & waitable : get_intra_process_waitables()) {
if (waitable.get() == pointer_to_subscription_part) {
return intra_process_subscription_waitable_in_use_by_wait_set_.exchange(in_use_state);
}
}
for (const auto & qos_event : event_handlers_) {
if (qos_event.get() == pointer_to_subscription_part) {

View File

@@ -36,3 +36,9 @@ SubscriptionIntraProcessBase::get_actual_qos() const
{
return qos_profile_;
}
bool
SubscriptionIntraProcessBase::is_serialized() const
{
return is_serialized_;
}

View File

@@ -0,0 +1,388 @@
// Copyright 2020 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <gtest/gtest.h>
#include <iostream>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "rclcpp/exceptions.hpp"
#include "rclcpp/publisher.hpp"
#include "rclcpp/rclcpp.hpp"
#include "rclcpp/serialization.hpp"
#include "rclcpp/serialized_message.hpp"
#include "test_msgs/message_fixtures.hpp"
int32_t & get_test_allocation_counter()
{
static int32_t counter = 0;
return counter;
}
void * custom_allocate(size_t size, void * state)
{
fprintf(stderr, "calling custom allocate\n");
static auto m_allocator = rcutils_get_default_allocator();
++get_test_allocation_counter();
auto r = m_allocator.allocate(size, state);
return r;
}
void * custom_zero_allocate(size_t number_of_elements, size_t size_of_element, void * state)
{
fprintf(stderr, "calling custom zero allocate\n");
static auto m_allocator = rcutils_get_default_allocator();
++get_test_allocation_counter();
auto r = m_allocator.zero_allocate(number_of_elements, size_of_element, state);
return r;
}
void * custom_reallocate(void * pointer, size_t size, void * state)
{
fprintf(stderr, "calling custom reallocate\n");
static auto m_allocator = rcutils_get_default_allocator();
if (pointer == nullptr) {
++get_test_allocation_counter();
}
auto r = m_allocator.reallocate(pointer, size, state);
return r;
}
void custom_deallocate(void * pointer, void * state)
{
fprintf(stderr, "calling custom deallocate\n");
static auto m_allocator = rcutils_get_default_allocator();
--get_test_allocation_counter();
m_allocator.deallocate(pointer, state);
}
rclcpp::SerializedMessage make_serialized_string_msg(
const std::shared_ptr<test_msgs::msg::Strings> & string_msg)
{
auto m_allocator = rcutils_get_default_allocator();
// add custom (de)allocator to count the references to the object
m_allocator.allocate = &custom_allocate;
m_allocator.deallocate = &custom_deallocate;
m_allocator.reallocate = &custom_reallocate;
m_allocator.zero_allocate = &custom_zero_allocate;
rclcpp::SerializedMessage msg(m_allocator);
static auto type_support =
rosidl_typesupport_cpp::get_message_type_support_handle<test_msgs::msg::Strings>();
rclcpp::Serialization serializer(*type_support);
EXPECT_NO_THROW(serializer.serialize_message(string_msg.get(), &msg));
return msg;
}
/**
* Parameterized test.
* The first param are the NodeOptions used to create the nodes.
* The second param are the expected intraprocess count results.
*/
struct TestParameters
{
rclcpp::NodeOptions node_options[2];
uint64_t intraprocess_count_results[2];
size_t runs;
std::string description;
};
std::ostream & operator<<(std::ostream & out, const TestParameters & params)
{
out << params.description;
return out;
}
auto get_new_context()
{
auto context = rclcpp::Context::make_shared();
context->init(0, nullptr);
return context;
}
class TestPublisherSubscriptionSerialized : public ::testing::Test
{
public:
static void SetUpTestCase()
{
rclcpp::init(0, nullptr);
}
static void TearDownTestCase()
{
rclcpp::shutdown();
}
protected:
std::chrono::milliseconds offset_ = std::chrono::milliseconds(250);
std::vector<TestParameters> parameters_ = {
/*
Testing publisher subscription count api and internal process subscription count.
Two subscriptions in the same topic, both using intraprocess comm.
*/
{
{
rclcpp::NodeOptions().use_intra_process_comms(false).start_parameter_services(false).enable_rosout(false),
rclcpp::NodeOptions().use_intra_process_comms(false).start_parameter_services(false).enable_rosout(false)
},
{1u, 2u},
1,
"two_subscriptions_intraprocess_comm"
},
{
{
rclcpp::NodeOptions().use_intra_process_comms(true).start_parameter_services(false).enable_rosout(false),
rclcpp::NodeOptions().use_intra_process_comms(true).start_parameter_services(false).enable_rosout(false)
},
{1u, 2u},
1,
"two_subscriptions_intraprocess_comm"
},
///*
// Testing publisher subscription count api and internal process subscription count.
// Two subscriptions, one using intra-process comm and the other not using it.
// */
//{
// {
// rclcpp::NodeOptions().use_intra_process_comms(true),
// rclcpp::NodeOptions().use_intra_process_comms(false)
// },
// {1u, 1u},
// 1,
// "two_subscriptions_one_intraprocess_one_not"
//},
///*
// Testing publisher subscription count api and internal process subscription count.
// Two contexts, both using intra-process.
// */
//{
// {
// rclcpp::NodeOptions().use_intra_process_comms(true),
// rclcpp::NodeOptions().context(get_new_context()).use_intra_process_comms(true)
// },
// {1u, 1u},
// 1,
// "two_subscriptions_in_two_contexts_with_intraprocess_comm"
//},
///*
// Testing publisher subscription count api and internal process subscription count.
// Two contexts, both of them not using intra-process comm.
// */
//{
// {
// rclcpp::NodeOptions().use_intra_process_comms(false),
// rclcpp::NodeOptions().context(get_new_context()).use_intra_process_comms(false)
// },
// {0u, 0u},
// 1,
// "two_subscriptions_in_two_contexts_without_intraprocess_comm"
//}
};
};
std::array<uint32_t, 2> counts;
void OnMessageSerialized(const std::shared_ptr<const rcl_serialized_message_t> msg)
{
EXPECT_NE(msg->buffer, nullptr);
EXPECT_GT(msg->buffer_capacity, 0u);
static auto type_support =
rosidl_typesupport_cpp::get_message_type_support_handle<test_msgs::msg::Strings>();
test_msgs::msg::Strings ros_msg;
rclcpp::Serialization serializer(*type_support);
EXPECT_NO_THROW(
serializer.deserialize_message(
static_cast<const rclcpp::SerializedMessage *>(msg.get()), &ros_msg));
EXPECT_EQ(ros_msg.string_value.front(), '0');
EXPECT_EQ(ros_msg.string_value[6], '6');
EXPECT_EQ(ros_msg.string_value.back(), '9');
++counts[0];
}
void OnMessageConst(std::shared_ptr<const test_msgs::msg::Strings> msg)
{
EXPECT_EQ(msg->string_value[6], '6');
++counts[1];
}
void OnMessageUniquePtr(std::unique_ptr<test_msgs::msg::Strings> msg)
{
EXPECT_EQ(msg->string_value.back(), '9');
++counts[1];
}
void OnMessage(std::shared_ptr<test_msgs::msg::Strings> msg)
{
EXPECT_EQ(msg->string_value.front(), '0');
EXPECT_EQ(msg->string_value[6], '6');
EXPECT_EQ(msg->string_value.back(), '\0');
++counts[1];
}
TEST_F(TestPublisherSubscriptionSerialized, publish_serialized)
{
get_test_allocation_counter() = 0;
for (const auto & parameter : parameters_) {
rclcpp::Node::SharedPtr node = std::make_shared<rclcpp::Node>(
"my_node",
"/ns",
parameter.node_options[0]);
auto publisher = node->create_publisher<test_msgs::msg::Strings>("/topic", 10);
auto sub_shared = node->create_subscription<test_msgs::msg::Strings>(
"/topic", 10,
&OnMessage);
//auto sub_unique = node->create_subscription<test_msgs::msg::Strings>(
// "/topic", 10,
// &OnMessageUniquePtr);
//auto sub_const_shared = node->create_subscription<test_msgs::msg::Strings>(
// "/topic", 10,
// &OnMessageConst);
auto sub_serialized = node->create_subscription<test_msgs::msg::Strings>(
"/topic", 10,
&OnMessageSerialized);
rclcpp::sleep_for(offset_);
counts.fill(0);
std::shared_ptr<test_msgs::msg::Strings> string_msg = get_messages_strings()[3];
for (size_t i = 0; i < parameter.runs; i++) {
auto msg0 = make_serialized_string_msg(string_msg);
auto unique_string_msg = std::make_unique<test_msgs::msg::Strings>(*string_msg);
{
auto unique_serialized_msg = std::make_unique<rclcpp::SerializedMessage>(std::move(msg0));
publisher->publish(std::move(unique_serialized_msg));
}
//publisher->publish(*string_msg);
//publisher->publish(std::move(unique_string_msg));
}
for (uint32_t i = 0; i < 3; ++i) {
rclcpp::spin_some(node);
rclcpp::sleep_for(offset_);
}
rclcpp::spin_some(node);
if (parameter.runs == 1) {
EXPECT_EQ(counts[0], 3u); // count serialized message callbacks
EXPECT_EQ(counts[1], 9u); // count unique + shared message callbacks
}
}
EXPECT_EQ(get_test_allocation_counter(), 0);
}
//TEST_P(TestPublisherSubscriptionSerialized, publish_serialized_generic)
//{
// get_test_allocation_counter() = 0;
//
// TestParameters parameters = GetParam();
// {
// rclcpp::Node::SharedPtr node = std::make_shared<rclcpp::Node>(
// "my_node",
// "/ns",
// parameters.node_options[0]);
// auto publisher = rclcpp::create_publisher<rcl_serialized_message_t>(
// node,
// "/topic",
// *rosidl_typesupport_cpp::get_message_type_support_handle<test_msgs::msg::Strings>(),
// rclcpp::QoS(10));
//
// auto sub_gen_serialized = rclcpp::create_subscription<rcl_serialized_message_t>(
// node,
// "/topic",
// *rosidl_typesupport_cpp::get_message_type_support_handle<test_msgs::msg::Strings>(),
// rclcpp::QoS(10),
// &OnMessageSerialized);
//
// auto sub_shared = node->create_subscription<test_msgs::msg::Strings>(
// "/topic", 10,
// &OnMessage);
// auto sub_unique = node->create_subscription<test_msgs::msg::Strings>(
// "/topic", 10,
// &OnMessageUniquePtr);
// auto sub_const_shared = node->create_subscription<test_msgs::msg::Strings>(
// "/topic", 10,
// &OnMessageConst);
// auto sub_serialized = node->create_subscription<test_msgs::msg::Strings>(
// "/topic", 10,
// &OnMessageSerialized);
//
// rclcpp::sleep_for(offset);
//
// counts.fill(0);
// auto stringMsg = get_messages_strings()[3];
//
// for (size_t i = 0; i < parameters.runs; i++) {
// auto msg0 = make_serialized_string_msg(stringMsg);
//
// publisher->publish(std::make_unique<rcl_serialized_message_t>(msg0));
// }
// rclcpp::spin_some(node);
// rclcpp::sleep_for(offset);
//
// rclcpp::spin_some(node);
// }
//
// if (parameters.runs == 1) {
// EXPECT_EQ(counts[0], 2u);
// EXPECT_EQ(counts[1], 3u);
// }
//
// EXPECT_EQ(get_test_allocation_counter(), 0);
//}
std::vector<TestParameters> setRuns(const std::vector<TestParameters> & in, const size_t runs)
{
std::vector<TestParameters> out = in;
for (auto & p : out) {
p.runs = runs;
}
return out;
}
//INSTANTIATE_TEST_CASE_P(
// TestWithDifferentNodeOptions, TestPublisherSubscriptionSerialized,
// ::testing::ValuesIn(parameters),
// ::testing::PrintToStringParamName());
//INSTANTIATE_TEST_CASE_P(
// TestWithDifferentNodeOptions1000Runs, TestPublisherSubscriptionSerialized,
// ::testing::ValuesIn(setRuns(parameters, 1000)),
// ::testing::PrintToStringParamName());

View File

@@ -93,6 +93,12 @@ public:
return false;
}
bool
is_serialized() const
{
return false;
}
rclcpp::QoS qos;
std::string topic_name;
uint64_t intra_process_publisher_id_;
@@ -212,6 +218,12 @@ public:
return topic_name;
}
bool
is_serialized() const
{
return false;
}
rmw_qos_profile_t qos_profile;
const char * topic_name;
};

View File

@@ -20,6 +20,7 @@
#include "rclcpp/exceptions.hpp"
#include "rclcpp/rclcpp.hpp"
#include "rclcpp/serialized_message.hpp"
#include "test_msgs/msg/empty.hpp"
@@ -143,6 +144,34 @@ TEST_F(TestPublisher, various_creation_signatures) {
}
}
/*
Testing for serialized publishers.
*/
TEST_F(TestPublisher, test_is_serialized) {
initialize();
using test_msgs::msg::Empty;
{
auto publisher = node->create_publisher<Empty>("topic", 42);
EXPECT_FALSE(publisher->is_serialized());
}
{
auto publisher = rclcpp::create_publisher<Empty>(node, "topic", 42, rclcpp::PublisherOptions());
EXPECT_FALSE(publisher->is_serialized());
}
{
auto ts = *rosidl_typesupport_cpp::get_message_type_support_handle<Empty>();
auto publisher = rclcpp::create_publisher<rcl_serialized_message_t>(
node, "topic", ts, 42, rclcpp::PublisherOptions());
EXPECT_TRUE(publisher->is_serialized());
}
{
auto ts = *rosidl_typesupport_cpp::get_message_type_support_handle<Empty>();
auto publisher = rclcpp::create_publisher<rclcpp::SerializedMessage>(
node, "topic", ts, 42, rclcpp::PublisherOptions());
EXPECT_TRUE(publisher->is_serialized());
}
}
/*
Testing publisher with intraprocess enabled and invalid QoS
*/

View File

@@ -0,0 +1,137 @@
// Copyright 2019 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <gtest/gtest.h>
#include <memory>
#include "rclcpp/serialization.hpp"
#include "rclcpp/serialized_message.hpp"
#include "rclcpp/rclcpp.hpp"
#include "rcpputils/asserts.hpp"
#include "test_msgs/message_fixtures.hpp"
#include "test_msgs/msg/basic_types.hpp"
TEST(TestSerializedMessage, empty_initialize) {
rclcpp::SerializedMessage serialized_message;
EXPECT_TRUE(serialized_message.buffer == nullptr);
EXPECT_EQ(0u, serialized_message.buffer_length);
EXPECT_EQ(0u, serialized_message.buffer_capacity);
}
TEST(TestSerializedMessage, initialize_with_capacity) {
rclcpp::SerializedMessage serialized_message(13);
EXPECT_TRUE(serialized_message.buffer != nullptr);
EXPECT_EQ(0u, serialized_message.buffer_length);
EXPECT_EQ(13u, serialized_message.buffer_capacity);
}
TEST(TestSerializedMessage, various_constructors) {
std::string content = "Hello World";
auto content_size = content.size() + 1; // accounting for null terminator
rclcpp::SerializedMessage serialized_message(content_size);
// manually copy some content
std::memcpy(serialized_message.buffer, content.c_str(), content.size());
serialized_message.buffer[content.size()] = '\0';
serialized_message.buffer_length = content_size;
EXPECT_STREQ(content.c_str(), reinterpret_cast<char *>(serialized_message.buffer));
EXPECT_EQ(content_size, serialized_message.buffer_capacity);
// Copy Constructor
rclcpp::SerializedMessage other_serialized_message(serialized_message);
EXPECT_EQ(content_size, other_serialized_message.buffer_capacity);
EXPECT_EQ(content_size, other_serialized_message.buffer_length);
EXPECT_STREQ(
reinterpret_cast<char *>(serialized_message.buffer),
reinterpret_cast<char *>(other_serialized_message.buffer));
// Move Constructor
rclcpp::SerializedMessage yet_another_serialized_message(std::move(other_serialized_message));
EXPECT_TRUE(other_serialized_message.buffer == nullptr);
EXPECT_EQ(0u, other_serialized_message.buffer_capacity);
EXPECT_EQ(0u, other_serialized_message.buffer_length);
auto default_allocator = rcl_get_default_allocator();
auto rcl_serialized_msg = rmw_get_zero_initialized_serialized_message();
auto ret = rmw_serialized_message_init(&rcl_serialized_msg, 13, &default_allocator);
ASSERT_EQ(RCL_RET_OK, ret);
// manually copy some content
std::memcpy(rcl_serialized_msg.buffer, content.c_str(), content.size());
rcl_serialized_msg.buffer[content.size()] = '\0';
rcl_serialized_msg.buffer_length = content_size;
EXPECT_EQ(13u, rcl_serialized_msg.buffer_capacity);
// Copy Constructor from rcl_serialized_message_t
rclcpp::SerializedMessage from_rcl_msg(rcl_serialized_msg);
EXPECT_EQ(13u, from_rcl_msg.buffer_capacity);
EXPECT_EQ(content_size, from_rcl_msg.buffer_length);
// Verify that despite being fini'd, the copy is real
ret = rmw_serialized_message_fini(&rcl_serialized_msg);
ASSERT_EQ(RCL_RET_OK, ret);
EXPECT_EQ(nullptr, rcl_serialized_msg.buffer);
EXPECT_EQ(0u, rcl_serialized_msg.buffer_capacity);
EXPECT_EQ(0u, rcl_serialized_msg.buffer_length);
EXPECT_TRUE(nullptr != from_rcl_msg.buffer);
EXPECT_EQ(13u, from_rcl_msg.buffer_capacity);
EXPECT_EQ(content_size, from_rcl_msg.buffer_length);
}
TEST(TestSerializedMessage, serialization) {
auto type_support =
rosidl_typesupport_cpp::get_message_type_support_handle<test_msgs::msg::BasicTypes>();
rclcpp::Serialization serializer(*type_support);
auto basic_type_ros_msgs = get_messages_basic_types();
for (const auto & ros_msg : basic_type_ros_msgs) {
// convert ros msg to serialized msg
rclcpp::SerializedMessage serialized_msg;
serializer.serialize_message(ros_msg.get(), &serialized_msg);
// convert serialized msg back to ros msg
test_msgs::msg::BasicTypes deserialized_ros_msg;
serializer.deserialize_message(&serialized_msg, &deserialized_ros_msg);
EXPECT_EQ(*ros_msg, deserialized_ros_msg);
}
}
TEST(TestSerializedMessage, serialization_into_nullptr) {
auto type_support =
rosidl_typesupport_cpp::get_message_type_support_handle<test_msgs::msg::BasicTypes>();
rclcpp::Serialization serializer(*type_support);
auto basic_type_ros_msgs = get_messages_basic_types();
for (const auto & ros_msg : basic_type_ros_msgs) {
rclcpp::SerializedMessage serialized_msg;
test_msgs::msg::BasicTypes deserialized_ros_msg;
EXPECT_THROW(
serializer.serialize_message(ros_msg.get(), nullptr),
rcpputils::IllegalStateException);
EXPECT_THROW(
serializer.serialize_message(nullptr, &serialized_msg),
rcpputils::IllegalStateException);
EXPECT_THROW(
serializer.deserialize_message(&serialized_msg, nullptr),
rcpputils::IllegalStateException);
EXPECT_THROW(
serializer.deserialize_message(nullptr, &deserialized_ros_msg),
rcpputils::IllegalStateException);
}
}

View File

@@ -231,6 +231,40 @@ TEST_F(TestSubscription, various_creation_signatures) {
}
}
/*
Testing for serialized subscriptions
*/
TEST_F(TestSubscription, test_is_serialized) {
initialize();
using test_msgs::msg::Empty;
auto cb = [](test_msgs::msg::Empty::SharedPtr) {};
auto cb_rcl_serialized = [](std::shared_ptr<rcl_serialized_message_t>) {};
auto cb_rclcpp_serialized = [](std::shared_ptr<rclcpp::SerializedMessage>) {};
{
auto sub = node->create_subscription<Empty>("topic", 1, cb);
EXPECT_FALSE(sub->is_serialized());
}
{
auto sub = rclcpp::create_subscription<Empty>(node, "topic", rclcpp::QoS(1), cb);
EXPECT_FALSE(sub->is_serialized());
}
{
auto sub = rclcpp::create_subscription<Empty>(node, "topic", rclcpp::QoS(1), cb_rcl_serialized);
EXPECT_TRUE(sub->is_serialized());
}
{
auto ts = *rosidl_typesupport_cpp::get_message_type_support_handle<Empty>();
auto sub = rclcpp::create_subscription<rcl_serialized_message_t>(
node, "topic", ts, rclcpp::QoS(1), cb_rcl_serialized);
EXPECT_TRUE(sub->is_serialized());
}
{
auto ts = *rosidl_typesupport_cpp::get_message_type_support_handle<Empty>();
auto sub = rclcpp::create_subscription<rclcpp::SerializedMessage>(
node, "topic", ts, rclcpp::QoS(1), cb_rclcpp_serialized);
EXPECT_TRUE(sub->is_serialized());
}
}
/*
Testing subscriptions using std::bind.
*/

View File

@@ -52,6 +52,16 @@ void not_serialized_unique_ptr_callback(
(void) unused;
}
void rclcpp_serialized_callback_copy(rclcpp::SerializedMessage unused)
{
(void) unused;
}
void rclcpp_serialized_callback_shared_ptr(std::shared_ptr<rclcpp::SerializedMessage> unused)
{
(void) unused;
}
TEST(TestSubscriptionTraits, is_serialized_callback) {
// Test regular functions
auto cb1 = &serialized_callback_copy;
@@ -97,6 +107,16 @@ TEST(TestSubscriptionTraits, is_serialized_callback) {
static_assert(
rclcpp::subscription_traits::is_serialized_callback<decltype(cb7)>::value == false,
"passing a fancy unique_ptr of test_msgs::msg::Empty is not a serialized callback");
auto cb8 = &rclcpp_serialized_callback_copy;
static_assert(
rclcpp::subscription_traits::is_serialized_callback<decltype(cb8)>::value == true,
"rclcpp::SerializedMessage in a first argument callback makes it a serialized callback");
auto cb9 = &rclcpp_serialized_callback_shared_ptr;
static_assert(
rclcpp::subscription_traits::is_serialized_callback<decltype(cb9)>::value == true,
"std::shared_ptr<rclcpp::SerializedMessage> in a callback makes it a serialized callback");
}
TEST(TestSubscriptionTraits, callback_messages) {

View File

@@ -68,6 +68,18 @@ public:
{
}
LifecyclePublisher(
rclcpp::node_interfaces::NodeBaseInterface * node_base,
const std::string & topic,
const rclcpp::QoS & qos,
const rclcpp::PublisherOptionsWithAllocator<Alloc> & options,
const rosidl_message_type_support_t & type_support)
: rclcpp::Publisher<MessageT, Alloc>(node_base, topic, qos, options, type_support),
enabled_(false),
logger_(rclcpp::get_logger("LifecyclePublisher"))
{
}
~LifecyclePublisher() {}
/// LifecyclePublisher publish function
@@ -110,6 +122,30 @@ public:
rclcpp::Publisher<MessageT, Alloc>::publish(msg);
}
/// Publish a serialized message. Non specialized version to prevent compiling errors.
template<typename TDeleter, typename T>
void publish(std::unique_ptr<T, TDeleter> serialized_msg)
{
(void)serialized_msg;
throw std::runtime_error(
"publishing unique_ptr with custom deleter only supported for serialized messages");
}
/// Publish a serialized message.
template<typename TDeleter>
void publish(std::unique_ptr<rcl_serialized_message_t, TDeleter> serialized_msg)
{
if (!enabled_) {
RCLCPP_WARN(
logger_,
"Trying to publish message on the topic '%s', but the publisher is not activated",
this->get_topic_name());
return;
}
this->do_serialized_publish(*serialized_msg);
}
virtual void
on_activate()
{