Compare commits

...

1 Commits

Author SHA1 Message Date
Tomoya Fujita
11e6ce2336 use weak_ptr for rcl entities in the memory strategy.
Signed-off-by: Tomoya Fujita <Tomoya.Fujita@sony.com>
2026-01-28 15:47:21 +09:00
2 changed files with 143 additions and 31 deletions

View File

@@ -99,49 +99,92 @@ public:
// Important to use subscription_handles_.size() instead of wait set's size since
// there may be more subscriptions in the wait set due to Waitables added to the end.
// The same logic applies for other entities.
// Mark corresponding weak_ptr as expired for entities that are null in the wait set
size_t valid_subscription_count = 0;
for (size_t i = 0; i < subscription_handles_.size(); ++i) {
if (!wait_set->subscriptions[i]) {
subscription_handles_[i].reset();
if (valid_subscription_count < wait_set->size_of_subscriptions &&
!wait_set->subscriptions[valid_subscription_count])
{
subscription_handles_[i] = std::weak_ptr<const rcl_subscription_t>{};
}
if (subscription_handles_[i].lock()) {
++valid_subscription_count;
}
}
size_t valid_service_count = 0;
for (size_t i = 0; i < service_handles_.size(); ++i) {
if (!wait_set->services[i]) {
service_handles_[i].reset();
if (valid_service_count < wait_set->size_of_services &&
!wait_set->services[valid_service_count])
{
service_handles_[i] = std::weak_ptr<const rcl_service_t>{};
}
if (service_handles_[i].lock()) {
++valid_service_count;
}
}
size_t valid_client_count = 0;
for (size_t i = 0; i < client_handles_.size(); ++i) {
if (!wait_set->clients[i]) {
client_handles_[i].reset();
if (valid_client_count < wait_set->size_of_clients &&
!wait_set->clients[valid_client_count])
{
client_handles_[i] = std::weak_ptr<const rcl_client_t>{};
}
if (client_handles_[i].lock()) {
++valid_client_count;
}
}
size_t valid_timer_count = 0;
for (size_t i = 0; i < timer_handles_.size(); ++i) {
if (!wait_set->timers[i]) {
timer_handles_[i].reset();
if (valid_timer_count < wait_set->size_of_timers &&
!wait_set->timers[valid_timer_count])
{
timer_handles_[i] = std::weak_ptr<const rcl_timer_t>{};
}
if (timer_handles_[i].lock()) {
++valid_timer_count;
}
}
for (size_t i = 0; i < waitable_handles_.size(); ++i) {
if (!waitable_handles_[i]->is_ready(*wait_set)) {
waitable_handles_[i].reset();
}
}
// Remove expired weak_ptr instances
subscription_handles_.erase(
std::remove(subscription_handles_.begin(), subscription_handles_.end(), nullptr),
std::remove_if(subscription_handles_.begin(), subscription_handles_.end(),
[](const std::weak_ptr<const rcl_subscription_t> & weak_ptr) {
return weak_ptr.expired();
}),
subscription_handles_.end()
);
service_handles_.erase(
std::remove(service_handles_.begin(), service_handles_.end(), nullptr),
std::remove_if(service_handles_.begin(), service_handles_.end(),
[](const std::weak_ptr<const rcl_service_t> & weak_ptr) {
return weak_ptr.expired();
}),
service_handles_.end()
);
client_handles_.erase(
std::remove(client_handles_.begin(), client_handles_.end(), nullptr),
std::remove_if(client_handles_.begin(), client_handles_.end(),
[](const std::weak_ptr<const rcl_client_t> & weak_ptr) {
return weak_ptr.expired();
}),
client_handles_.end()
);
timer_handles_.erase(
std::remove(timer_handles_.begin(), timer_handles_.end(), nullptr),
std::remove_if(timer_handles_.begin(), timer_handles_.end(),
[](const std::weak_ptr<const rcl_timer_t> & weak_ptr) {
return weak_ptr.expired();
}),
timer_handles_.end()
);
@@ -196,7 +239,13 @@ public:
bool add_handles_to_wait_set(rcl_wait_set_t * wait_set) override
{
for (const std::shared_ptr<const rcl_subscription_t> & subscription : subscription_handles_) {
for (const std::weak_ptr<const rcl_subscription_t> & weak_subscription :
subscription_handles_)
{
auto subscription = weak_subscription.lock();
if (!subscription) {
continue; // Skip expired handles
}
if (rcl_wait_set_add_subscription(wait_set, subscription.get(), NULL) != RCL_RET_OK) {
RCUTILS_LOG_ERROR_NAMED(
"rclcpp",
@@ -206,7 +255,11 @@ public:
}
}
for (const std::shared_ptr<const rcl_client_t> & client : client_handles_) {
for (const std::weak_ptr<const rcl_client_t> & weak_client : client_handles_) {
auto client = weak_client.lock();
if (!client) {
continue; // Skip expired handles
}
if (rcl_wait_set_add_client(wait_set, client.get(), NULL) != RCL_RET_OK) {
RCUTILS_LOG_ERROR_NAMED(
"rclcpp",
@@ -216,7 +269,11 @@ public:
}
}
for (const std::shared_ptr<const rcl_service_t> & service : service_handles_) {
for (const std::weak_ptr<const rcl_service_t> & weak_service : service_handles_) {
auto service = weak_service.lock();
if (!service) {
continue; // Skip expired handles
}
if (rcl_wait_set_add_service(wait_set, service.get(), NULL) != RCL_RET_OK) {
RCUTILS_LOG_ERROR_NAMED(
"rclcpp",
@@ -226,7 +283,11 @@ public:
}
}
for (const std::shared_ptr<const rcl_timer_t> & timer : timer_handles_) {
for (const std::weak_ptr<const rcl_timer_t> & weak_timer : timer_handles_) {
auto timer = weak_timer.lock();
if (!timer) {
continue; // Skip expired handles
}
if (rcl_wait_set_add_timer(wait_set, timer.get(), NULL) != RCL_RET_OK) {
RCUTILS_LOG_ERROR_NAMED(
"rclcpp",
@@ -253,7 +314,13 @@ public:
{
auto it = subscription_handles_.begin();
while (it != subscription_handles_.end()) {
auto subscription = get_subscription_by_handle(*it, weak_groups_to_nodes);
auto subscription_handle = it->lock();
if (!subscription_handle) {
// Handle expired, remove it and continue
it = subscription_handles_.erase(it);
continue;
}
auto subscription = get_subscription_by_handle(subscription_handle, weak_groups_to_nodes);
if (subscription) {
// Find the group for this handle and see if it can be serviced
auto group = get_group_by_subscription(subscription, weak_groups_to_nodes);
@@ -288,7 +355,13 @@ public:
{
auto it = service_handles_.begin();
while (it != service_handles_.end()) {
auto service = get_service_by_handle(*it, weak_groups_to_nodes);
auto service_handle = it->lock();
if (!service_handle) {
// Handle expired, remove it and continue
it = service_handles_.erase(it);
continue;
}
auto service = get_service_by_handle(service_handle, weak_groups_to_nodes);
if (service) {
// Find the group for this handle and see if it can be serviced
auto group = get_group_by_service(service, weak_groups_to_nodes);
@@ -323,7 +396,13 @@ public:
{
auto it = client_handles_.begin();
while (it != client_handles_.end()) {
auto client = get_client_by_handle(*it, weak_groups_to_nodes);
auto client_handle = it->lock();
if (!client_handle) {
// Handle expired, remove it and continue
it = client_handles_.erase(it);
continue;
}
auto client = get_client_by_handle(client_handle, weak_groups_to_nodes);
if (client) {
// Find the group for this handle and see if it can be serviced
auto group = get_group_by_client(client, weak_groups_to_nodes);
@@ -358,7 +437,13 @@ public:
{
auto it = timer_handles_.begin();
while (it != timer_handles_.end()) {
auto timer = get_timer_by_handle(*it, weak_groups_to_nodes);
auto timer_handle = it->lock();
if (!timer_handle) {
// Handle expired, remove it and continue
it = timer_handles_.erase(it);
continue;
}
auto timer = get_timer_by_handle(timer_handle, weak_groups_to_nodes);
if (timer) {
// Find the group for this handle and see if it can be serviced
auto group = get_group_by_timer(timer, weak_groups_to_nodes);
@@ -435,7 +520,13 @@ public:
size_t number_of_ready_subscriptions() const override
{
size_t number_of_subscriptions = subscription_handles_.size();
size_t number_of_subscriptions = 0;
// Count only non-expired weak_ptr references
for (const auto & weak_subscription : subscription_handles_) {
if (!weak_subscription.expired()) {
++number_of_subscriptions;
}
}
for (const std::shared_ptr<Waitable> & waitable : waitable_handles_) {
number_of_subscriptions += waitable->get_number_of_ready_subscriptions();
}
@@ -444,7 +535,13 @@ public:
size_t number_of_ready_services() const override
{
size_t number_of_services = service_handles_.size();
size_t number_of_services = 0;
// Count only non-expired weak_ptr references
for (const auto & weak_service : service_handles_) {
if (!weak_service.expired()) {
++number_of_services;
}
}
for (const std::shared_ptr<Waitable> & waitable : waitable_handles_) {
number_of_services += waitable->get_number_of_ready_services();
}
@@ -462,7 +559,13 @@ public:
size_t number_of_ready_clients() const override
{
size_t number_of_clients = client_handles_.size();
size_t number_of_clients = 0;
// Count only non-expired weak_ptr references
for (const auto & weak_client : client_handles_) {
if (!weak_client.expired()) {
++number_of_clients;
}
}
for (const std::shared_ptr<Waitable> & waitable : waitable_handles_) {
number_of_clients += waitable->get_number_of_ready_clients();
}
@@ -480,7 +583,13 @@ public:
size_t number_of_ready_timers() const override
{
size_t number_of_timers = timer_handles_.size();
size_t number_of_timers = 0;
// Count only non-expired weak_ptr references
for (const auto & weak_timer : timer_handles_) {
if (!weak_timer.expired()) {
++number_of_timers;
}
}
for (const std::shared_ptr<Waitable> & waitable : waitable_handles_) {
number_of_timers += waitable->get_number_of_ready_timers();
}
@@ -499,10 +608,10 @@ private:
VectorRebind<const rclcpp::GuardCondition *> guard_conditions_;
VectorRebind<std::shared_ptr<const rcl_subscription_t>> subscription_handles_;
VectorRebind<std::shared_ptr<const rcl_service_t>> service_handles_;
VectorRebind<std::shared_ptr<const rcl_client_t>> client_handles_;
VectorRebind<std::shared_ptr<const rcl_timer_t>> timer_handles_;
VectorRebind<std::weak_ptr<const rcl_subscription_t>> subscription_handles_;
VectorRebind<std::weak_ptr<const rcl_service_t>> service_handles_;
VectorRebind<std::weak_ptr<const rcl_client_t>> client_handles_;
VectorRebind<std::weak_ptr<const rcl_timer_t>> timer_handles_;
VectorRebind<std::shared_ptr<Waitable>> waitable_handles_;
std::shared_ptr<VoidAlloc> allocator_;

View File

@@ -808,7 +808,8 @@ TEST_F(TestAllocatorMemoryStrategy, get_next_service_out_of_scope) {
});
allocator_memory_strategy()->collect_entities(weak_groups_to_nodes);
}
EXPECT_EQ(1u, allocator_memory_strategy()->number_of_ready_services());
// service is out of scope here, so should be cleaned up already.
EXPECT_EQ(0u, allocator_memory_strategy()->number_of_ready_services());
rclcpp::AnyExecutable result;
allocator_memory_strategy()->get_next_service(result, weak_groups_to_nodes);
@@ -843,7 +844,8 @@ TEST_F(TestAllocatorMemoryStrategy, get_next_client_out_of_scope) {
allocator_memory_strategy()->collect_entities(weak_groups_to_nodes);
}
EXPECT_EQ(1u, allocator_memory_strategy()->number_of_ready_clients());
// client is out of scope here, so should be cleaned up already.
EXPECT_EQ(0u, allocator_memory_strategy()->number_of_ready_clients());
rclcpp::AnyExecutable result;
allocator_memory_strategy()->get_next_client(result, weak_groups_to_nodes);
@@ -871,7 +873,8 @@ TEST_F(TestAllocatorMemoryStrategy, get_next_timer_out_of_scope) {
});
allocator_memory_strategy()->collect_entities(weak_groups_to_nodes);
}
EXPECT_EQ(1u, allocator_memory_strategy()->number_of_ready_timers());
// timer is out of scope here, so should be cleaned up already.
EXPECT_EQ(0u, allocator_memory_strategy()->number_of_ready_timers());
rclcpp::AnyExecutable result;
allocator_memory_strategy()->get_next_timer(result, weak_groups_to_nodes);