Skip to content

Commit 39666a4

Browse files
authored
support wait with predicate in bthread's ConditionVariable (#3195)
1 parent 39a3436 commit 39666a4

File tree

3 files changed

+66
-0
lines changed

3 files changed

+66
-0
lines changed

src/bthread/bthread.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
#if defined(__cplusplus)
3131
#include <iostream>
3232
#include "bthread/mutex.h" // use bthread_mutex_t in the RAII way
33+
#include "bthread/condition_variable.h" // use bthread_cond_t in the RAII way
3334
#endif // __cplusplus
3435

3536
#include "bthread/id.h"

src/bthread/condition_variable.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,20 @@ class ConditionVariable {
6363
bthread_cond_wait(&_cond, lock.mutex());
6464
}
6565

66+
template<typename Predicate>
67+
void wait(std::unique_lock<bthread::Mutex>& lock, Predicate p) {
68+
while (!p()) {
69+
bthread_cond_wait(&_cond, lock.mutex()->native_handler());
70+
}
71+
}
72+
73+
template<typename Predicate>
74+
void wait(std::unique_lock<bthread_mutex_t>& lock, Predicate p) {
75+
while (!p()) {
76+
bthread_cond_wait(&_cond, lock.mutex());
77+
}
78+
}
79+
6680
// Unlike std::condition_variable, we return ETIMEDOUT when time expires
6781
// rather than std::timeout
6882
int wait_for(std::unique_lock<bthread::Mutex>& lock,

test/bthread_cond_unittest.cpp

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,10 @@ TEST(CondTest, sanity) {
138138
struct WrapperArg {
139139
bthread::Mutex mutex;
140140
bthread::ConditionVariable cond;
141+
bool ready = false;
142+
static std::atomic<int> wake_time;
141143
};
144+
std::atomic<int> WrapperArg::wake_time{0};
142145

143146
void* cv_signaler(void* void_arg) {
144147
WrapperArg* a = (WrapperArg*)void_arg;
@@ -168,6 +171,23 @@ void* cv_mutex_waiter(void* void_arg) {
168171
return NULL;
169172
}
170173

174+
175+
void* cv_bmutex_waiter_with_pred(void* void_arg) {
176+
WrapperArg* a = (WrapperArg*)void_arg;
177+
std::unique_lock<bthread_mutex_t> lck(*a->mutex.native_handler());
178+
a->cond.wait(lck, [&] { return a->ready; });
179+
WrapperArg::wake_time.fetch_add(1);
180+
return NULL;
181+
}
182+
183+
void* cv_mutex_waiter_with_pred(void* void_arg) {
184+
WrapperArg* a = (WrapperArg*)void_arg;
185+
std::unique_lock<bthread::Mutex> lck(a->mutex);
186+
a->cond.wait(lck, [&] { return a->ready; });
187+
WrapperArg::wake_time.fetch_add(1);
188+
return NULL;
189+
}
190+
171191
#define COND_IN_PTHREAD
172192

173193
#ifndef COND_IN_PTHREAD
@@ -202,6 +222,37 @@ TEST(CondTest, cpp_wrapper) {
202222
}
203223
}
204224

225+
TEST(CondTest, cpp_wrapper2) {
226+
stop = false;
227+
bthread::ConditionVariable cond;
228+
pthread_t bmutex_waiter_threads[8];
229+
pthread_t mutex_waiter_threads[8];
230+
pthread_t signal_thread;
231+
WrapperArg a;
232+
for (size_t i = 0; i < ARRAY_SIZE(bmutex_waiter_threads); ++i) {
233+
ASSERT_EQ(0, pthread_create(&bmutex_waiter_threads[i], NULL,
234+
cv_bmutex_waiter_with_pred, &a));
235+
ASSERT_EQ(0, pthread_create(&mutex_waiter_threads[i], NULL,
236+
cv_mutex_waiter_with_pred, &a));
237+
}
238+
ASSERT_EQ(0, pthread_create(&signal_thread, NULL, cv_signaler, &a));
239+
bthread_usleep(100L * 1000);
240+
ASSERT_EQ(WrapperArg::wake_time, 0);
241+
{
242+
BAIDU_SCOPED_LOCK(a.mutex);
243+
stop = true;
244+
a.ready = true;
245+
246+
}
247+
pthread_join(signal_thread, NULL);
248+
a.cond.notify_all();
249+
for (size_t i = 0; i < ARRAY_SIZE(bmutex_waiter_threads); ++i) {
250+
pthread_join(bmutex_waiter_threads[i], NULL);
251+
pthread_join(mutex_waiter_threads[i], NULL);
252+
}
253+
ASSERT_EQ(WrapperArg::wake_time, 16);
254+
}
255+
205256
#ifndef COND_IN_PTHREAD
206257
#undef pthread_join
207258
#undef pthread_create

0 commit comments

Comments
 (0)