Compare commits

..

1 Commits

Author SHA1 Message Date
Barry Xu
df2ac887ed Add support check for content filter feature in subscription (#3089)
Signed-off-by: Barry Xu <barry.xu@sony.com>
2026-03-17 10:49:53 +09:00
6 changed files with 79 additions and 83 deletions

View File

@@ -80,7 +80,7 @@ public:
node_base,
*rclcpp::get_message_typesupport_handle(topic_type, "rosidl_typesupport_cpp", *ts_lib),
topic_name,
force_cpu_buffer_backend_(options).to_rcl_subscription_options(qos),
options.to_rcl_subscription_options(qos),
options.event_callbacks,
options.use_default_callbacks,
DeliveredMessageKind::SERIALIZED_MESSAGE),
@@ -182,17 +182,6 @@ public:
private:
RCLCPP_DISABLE_COPY(GenericSubscription)
template<typename AllocatorT>
static rclcpp::SubscriptionOptionsWithAllocator<AllocatorT>
force_cpu_buffer_backend_(
const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & options)
{
auto opts = options;
opts.acceptable_buffer_backends = "cpu";
return opts;
}
AnySubscriptionCallback<rclcpp::SerializedMessage, std::allocator<void>> any_callback_;
// The type support library should stay loaded, so it is stored in the GenericSubscription
std::shared_ptr<rcpputils::SharedLibrary> ts_lib_;

View File

@@ -551,6 +551,15 @@ public:
event_handlers_[event_type]->clear_on_ready_callback();
}
/// Check if content filtered topic feature of the subscription instance is supported.
/**
* \return boolean flag indicating if the content filtered topic of this subscription is
* supported.
*/
RCLCPP_PUBLIC
bool
is_cft_supported() const;
/// Check if content filtered topic feature of the subscription instance is enabled.
/**
* \return boolean flag indicating if the content filtered topic of this subscription is enabled.

View File

@@ -89,15 +89,6 @@ struct SubscriptionOptionsBase
QosOverridingOptions qos_overriding_options;
ContentFilterOptions content_filter_options;
/// Acceptable buffer backend names for this subscription.
/**
* Empty string or "cpu" means CPU-only (default for backward compatibility).
* "any" means all installed backends are acceptable.
* Comma-separated for specific backends, e.g. "cuda,demo".
* CPU is always implicitly acceptable regardless of this value.
*/
std::string acceptable_buffer_backends{"cpu"};
};
/// Structure containing optional configuration for Subscriptions.
@@ -154,11 +145,6 @@ struct SubscriptionOptionsWithAllocator : public SubscriptionOptionsBase
}
}
if (!acceptable_buffer_backends.empty()) {
result.rmw_subscription_options.acceptable_buffer_backends =
acceptable_buffer_backends.c_str();
}
return result;
}

View File

@@ -77,6 +77,10 @@ ParameterEventHandler::add_parameter_callback(
bool
ParameterEventHandler::configure_nodes_filter(const std::vector<std::string> & node_names)
{
if (!event_subscription_->is_cft_supported()) {
return false;
}
if (node_names.empty()) {
// Clear content filter
event_subscription_->set_content_filter("");

View File

@@ -480,6 +480,12 @@ SubscriptionBase::set_on_new_message_callback(
}
}
bool
SubscriptionBase::is_cft_supported() const
{
return rcl_subscription_is_cft_supported(subscription_handle_.get());
}
bool
SubscriptionBase::is_cft_enabled() const
{

View File

@@ -108,6 +108,21 @@ bool operator==(const test_msgs::msg::BasicTypes & m1, const test_msgs::msg::Bas
m1.uint64_value == m2.uint64_value;
}
TEST_F(TestContentFilterSubscription, is_cft_supported)
{
{
auto mock = mocking_utils::patch_and_return(
"lib:rclcpp", rcl_subscription_is_cft_supported, false);
EXPECT_FALSE(sub->is_cft_supported());
}
{
auto mock = mocking_utils::patch_and_return(
"lib:rclcpp", rcl_subscription_is_cft_supported, true);
EXPECT_TRUE(sub->is_cft_supported());
}
}
TEST_F(TestContentFilterSubscription, is_cft_enabled)
{
{
@@ -165,7 +180,7 @@ TEST_F(TestContentFilterSubscription, get_content_filter)
TEST_F(TestContentFilterSubscription, set_content_filter)
{
if (sub->is_cft_enabled()) {
if (sub->is_cft_supported()) {
EXPECT_NO_THROW(
sub->set_content_filter(filter_expression_init, expression_parameters_2));
} else {
@@ -178,8 +193,10 @@ TEST_F(TestContentFilterSubscription, set_content_filter)
TEST_F(TestContentFilterSubscription, content_filter_get_begin)
{
std::string rmw_implementation_str = std::string(rmw_get_implementation_identifier());
if (rmw_implementation_str == "rmw_zenoh_cpp") {
GTEST_SKIP();
// Run test only if rmw implementation supports content filter, otherwise skip it.
if (!sub->is_cft_supported()) {
GTEST_SKIP() << rmw_implementation_str << " doesn't support content filter";
}
using namespace std::chrono_literals;
@@ -206,28 +223,27 @@ TEST_F(TestContentFilterSubscription, content_filter_get_begin)
EXPECT_TRUE(receive);
EXPECT_EQ(original_message, output_message);
if (sub->is_cft_enabled()) {
EXPECT_NO_THROW(
sub->set_content_filter(filter_expression_init, expression_parameters_2));
// waiting to allow for filter propagation
std::this_thread::sleep_for(std::chrono::seconds(10));
EXPECT_NO_THROW(
sub->set_content_filter(filter_expression_init, expression_parameters_2));
// waiting to allow for filter propagation
std::this_thread::sleep_for(std::chrono::seconds(10));
test_msgs::msg::BasicTypes original_message;
original_message.int32_value = 3;
pub->publish(original_message);
test_msgs::msg::BasicTypes message_to_be_filtered;
message_to_be_filtered.int32_value = original_message.int32_value;
pub->publish(message_to_be_filtered);
test_msgs::msg::BasicTypes output_message;
bool receive = wait_for_message(output_message, sub, context, 10s);
EXPECT_FALSE(receive);
}
test_msgs::msg::BasicTypes output_message2;
receive = wait_for_message(output_message2, sub, context, 10s);
EXPECT_FALSE(receive);
}
}
TEST_F(TestContentFilterSubscription, content_filter_get_later)
{
std::string rmw_implementation_str = std::string(rmw_get_implementation_identifier());
if (rmw_implementation_str == "rmw_zenoh_cpp") {
GTEST_SKIP();
// Run test only if rmw implementation supports content filter, otherwise skip it.
if (!sub->is_cft_supported()) {
GTEST_SKIP() << rmw_implementation_str << " doesn't support content filter";
}
using namespace std::chrono_literals;
@@ -251,36 +267,30 @@ TEST_F(TestContentFilterSubscription, content_filter_get_later)
test_msgs::msg::BasicTypes output_message;
bool receive = wait_for_message(output_message, sub, context, 10s);
if (sub->is_cft_enabled()) {
EXPECT_FALSE(receive);
} else {
EXPECT_TRUE(receive);
EXPECT_EQ(original_message, output_message);
}
EXPECT_FALSE(receive);
if (sub->is_cft_enabled()) {
EXPECT_NO_THROW(
sub->set_content_filter(filter_expression_init, expression_parameters_2));
// waiting to allow for filter propagation
std::this_thread::sleep_for(std::chrono::seconds(10));
EXPECT_NO_THROW(
sub->set_content_filter(filter_expression_init, expression_parameters_2));
// waiting to allow for filter propagation
std::this_thread::sleep_for(std::chrono::seconds(10));
test_msgs::msg::BasicTypes original_message;
original_message.int32_value = 4;
pub->publish(original_message);
test_msgs::msg::BasicTypes matching_message;
matching_message.int32_value = original_message.int32_value;
pub->publish(matching_message);
test_msgs::msg::BasicTypes output_message;
bool receive = wait_for_message(output_message, sub, context, 10s);
EXPECT_TRUE(receive);
EXPECT_EQ(original_message, output_message);
}
test_msgs::msg::BasicTypes output_message2;
receive = wait_for_message(output_message2, sub, context, 10s);
EXPECT_TRUE(receive);
EXPECT_EQ(matching_message, output_message2);
}
}
TEST_F(TestContentFilterSubscription, content_filter_reset)
{
std::string rmw_implementation_str = std::string(rmw_get_implementation_identifier());
if (rmw_implementation_str == "rmw_zenoh_cpp") {
GTEST_SKIP();
// Run test only if rmw implementation supports content filter, otherwise skip it.
if (!sub->is_cft_supported()) {
GTEST_SKIP() << rmw_implementation_str << " doesn't support content filter";
}
using namespace std::chrono_literals;
@@ -304,28 +314,20 @@ TEST_F(TestContentFilterSubscription, content_filter_reset)
test_msgs::msg::BasicTypes output_message;
bool receive = wait_for_message(output_message, sub, context, 10s);
if (sub->is_cft_enabled()) {
EXPECT_FALSE(receive);
} else {
EXPECT_TRUE(receive);
EXPECT_EQ(original_message, output_message);
}
EXPECT_FALSE(receive);
if (sub->is_cft_enabled()) {
EXPECT_NO_THROW(
sub->set_content_filter(""));
// waiting to allow for filter propagation
std::this_thread::sleep_for(std::chrono::seconds(10));
EXPECT_NO_THROW(sub->set_content_filter(""));
// waiting to allow for filter propagation
std::this_thread::sleep_for(std::chrono::seconds(10));
test_msgs::msg::BasicTypes original_message;
original_message.int32_value = 4;
pub->publish(original_message);
test_msgs::msg::BasicTypes unfiltered_message;
unfiltered_message.int32_value = original_message.int32_value;
pub->publish(unfiltered_message);
test_msgs::msg::BasicTypes output_message;
bool receive = wait_for_message(output_message, sub, context, 10s);
EXPECT_TRUE(receive);
EXPECT_EQ(original_message, output_message);
}
test_msgs::msg::BasicTypes output_message2;
receive = wait_for_message(output_message2, sub, context, 10s);
EXPECT_TRUE(receive);
EXPECT_EQ(unfiltered_message, output_message2);
}
}