Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions rclcpp/src/rclcpp/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -729,13 +729,33 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout)
// Clear any previous wait result
this->wait_result_.reset();

// we need to make sure that callback groups don't get out of scope
// during the wait. As in jazzy, they are not covered by the DynamicStorage,
// we explicitly hold them here as a bugfix
Comment on lines +732 to +734
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i would add TODO if we already have a plan to proper fix in the future not to miss, this does not block the PR.

std::vector<rclcpp::CallbackGroup::SharedPtr> cbgs;

{
std::lock_guard<std::mutex> guard(mutex_);

if (this->entities_need_rebuild_.exchange(false) || current_collection_.empty()) {
this->collect_entities();
}

auto callback_groups = this->collector_.get_all_callback_groups();
cbgs.resize(callback_groups.size());
for(const auto & w_ptr : callback_groups) {
auto shr_ptr = w_ptr.lock();
if(shr_ptr) {
cbgs.push_back(std::move(shr_ptr));
}
}
}

this->wait_result_.emplace(wait_set_.wait(timeout));

// drop references to the callback groups, before trying to execute anything
cbgs.clear();

if (!this->wait_result_ || this->wait_result_->kind() == WaitResultKind::Empty) {
RCUTILS_LOG_WARN_NAMED(
"rclcpp",
Expand Down
19 changes: 19 additions & 0 deletions rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,29 @@ StaticSingleThreadedExecutor::spin_once_impl(std::chrono::nanoseconds timeout)
std::optional<rclcpp::WaitResult<rclcpp::WaitSet>>
StaticSingleThreadedExecutor::collect_and_wait(std::chrono::nanoseconds timeout)
{
// we need to make sure that callback groups don't get out of scope
// during the wait. As in jazzy, they are not covered by the DynamicStorage,
// we explicitly hold them here as a bugfix
std::vector<rclcpp::CallbackGroup::SharedPtr> cbgs;

if (this->entities_need_rebuild_.exchange(false) || current_collection_.empty()) {
this->collect_entities();
}

auto callback_groups = this->collector_.get_all_callback_groups();
cbgs.resize(callback_groups.size());
for(const auto & w_ptr : callback_groups) {
auto shr_ptr = w_ptr.lock();
if(shr_ptr) {
cbgs.push_back(std::move(shr_ptr));
}
}

auto wait_result = wait_set_.wait(std::chrono::nanoseconds(timeout));

// drop references to the callback groups, before trying to execute anything
cbgs.clear();

if (wait_result.kind() == WaitResultKind::Empty) {
RCUTILS_LOG_WARN_NAMED(
"rclcpp",
Expand Down
39 changes: 39 additions & 0 deletions rclcpp/test/rclcpp/executors/test_executors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -843,3 +843,42 @@ TYPED_TEST(TestExecutors, release_ownership_entity_after_spinning_cancel)

EXPECT_EQ(server.use_count(), 1);
}

TYPED_TEST(TestExecutors, testRaceDropCallbackGroupFromSecondThread)
{
using ExecutorType = TypeParam;
// rmw_connextdds doesn't support events-executor
if (
std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
{
GTEST_SKIP();
}

// Create an executor
auto executor = std::make_shared<ExecutorType>();
executor->add_node(this->node);

// Start spinning
auto executor_thread = std::thread(
[executor]() {
executor->spin();
});

// As the problem is a race, we do this multiple times,
// to raise our chances of hitting the problem
for(size_t i = 0; i < 10; i++) {
auto cg = this->node->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);
auto timer = this->node->create_timer(1s, [] {}, cg);
// sleep a bit, so that the spin thread can pick up the callback group
// and add it to the executor
std::this_thread::sleep_for(5ms);

// At this point the callbackgroup should be used within the waitset of the executor
// as we leave the scope, the reference to cg will be dropped.
// If the executor has a race, we will experience a segfault at this point.
}

executor->cancel();
executor_thread.join();
}