Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/ex/thread_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,15 @@ class thread_pool::impl
cv_.notify_one();
}

void
join() noexcept
{
stop();
for(auto& t : threads_)
if(t.joinable())
t.join();
}

void
stop() noexcept
{
Expand Down Expand Up @@ -160,6 +169,7 @@ class thread_pool::impl
thread_pool::
~thread_pool()
{
impl_->join();
shutdown();
destroy();
delete impl_;
Expand Down
83 changes: 83 additions & 0 deletions test/unit/when_all.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,15 @@

#include <boost/capy/ex/async_event.hpp>
#include <boost/capy/ex/run_async.hpp>
#include <boost/capy/ex/strand.hpp>
#include <boost/capy/ex/thread_pool.hpp>
#include <boost/capy/io_task.hpp>
#include <boost/capy/task.hpp>

#include "test_helpers.hpp"

#include <atomic>
#include <latch>
#include <stdexcept>
#include <string>
#include <vector>
Expand Down Expand Up @@ -1047,5 +1050,85 @@ TEST_SUITE(
when_all_io_awaitable_test,
"boost.capy.when_all_io_awaitable");

struct when_all_strand_test
{
// Regression for #131: executor_ref::dispatch() formerly
// returned void, discarding the symmetric transfer handle
// from strand::dispatch(). This caused when_all child
// runners to never resume, deadlocking the caller.
void
testStrandWhenAll()
{
thread_pool pool(2);
strand s{pool.get_executor()};
std::latch done(1);
bool completed = false;

auto outer = [&]() -> task<> {
co_await when_all(
[]() -> task<> { co_return; }(),
[]() -> task<> { co_return; }()
);
};

run_async(s,
[&](auto&&...) {
completed = true;
done.count_down();
},
[&](auto) {
done.count_down();
}
)(outer());

done.wait();
BOOST_TEST(completed);
}

// Verify strand + when_all propagates values correctly
void
testStrandWhenAllWithValues()
{
thread_pool pool(2);
strand s{pool.get_executor()};
std::latch done(1);
bool completed = false;
int result = 0;

auto outer = [&]() -> task<std::tuple<int, int>> {
co_return co_await when_all(
returns_int(10),
returns_int(20));
};

run_async(s,
[&](std::tuple<int, int> t) {
auto [a, b] = t;
completed = true;
result = a + b;
done.count_down();
},
[&](auto) {
done.count_down();
}
)(outer());

done.wait();
BOOST_TEST(completed);
BOOST_TEST_EQ(result, 30);
}

void
run()
{
testStrandWhenAll();
testStrandWhenAllWithValues();
}
};

TEST_SUITE(
when_all_strand_test,
"boost.capy.when_all_strand");

} // capy
} // boost
Loading