Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | ||||||
2 | 3 | 4 | 5 | 6 | 7 | 8 |
9 | 10 | 11 | 12 | 13 | 14 | 15 |
16 | 17 | 18 | 19 | 20 | 21 | 22 |
23 | 24 | 25 | 26 | 27 | 28 |
Tags
- CustomTab
- web
- design pattern
- SHA512
- Functional
- Android
- Clojure
- traits
- sha256
- program
- Chrono
- ranges
- coroutines
- template
- c++
- type_traits
- RAII
- SHA1
- stringprintf
- async
- WebView
- ChromeTab
- go
- Reflect
- kotlin
- haskell
- Observer
- sprintf
- AES
- Scala
Archives
- Today
- Total
프로그래밍 검색 블로그
고정크기 스레드 풀 본문
이전에 만든
blocking_channel을 사용
만들긴 했지만 반환되는 std::future<>를 굳이 받을 필요가 없다는 점을 제외하면
대부분의 경우에는 std::async 로 더 간단하게 할 수 있어서
실제로 쓸일은 없을듯 하다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 | #include <functional> #include <type_traits> #include <future> #include <mutex> #include <queue> #include <deque> enum class thread_pool_status{ PREPARING, RUNNING, STOPPING, STOP }; template<typename _Retn> class fixed_thread_pool{ using _MyT = fixed_thread_pool; using func_t = std::function<_Retn()>; using task_t = std::packaged_task<_Retn()>; using synchronized = std::lock_guard<std::mutex>; const std::size_t mTotalThreadCount; std::size_t mCurrentThreadCount; std::mutex mPoolLock; thread_pool_status mPoolStatus = thread_pool_status::PREPARING; blocking_channel<task_t> mChannel; std::vector<std::thread> mThreads; public: fixed_thread_pool(int threadSize) : mTotalThreadCount(threadSize), mThreads(threadSize), mCurrentThreadCount(0){ mPoolStatus = thread_pool_status::RUNNING; } ~fixed_thread_pool(){ shutdown(); } template<typename _Func, typename... Args> std::future<void> submit(_Func&& f, Args&&... args) { task_t task(std::bind(f, std::forward<Args>(args)...)); auto future = task.get_future(); synchronized lock(mPoolLock); onTaskAdded(std::move(task)); return future; } thread_pool_status getStatus(){ return mPoolStatus; } /** * 모든 스레드를 join 시키고 더이상 실행할수 없게함 */ void shutdown(){ synchronized lock(mPoolLock); mPoolStatus = thread_pool_status::STOPPING; mChannel.shutdown(); for(auto& thread : mThreads){ if(thread.joinable()){ thread.join(); } } mPoolStatus = thread_pool_status::STOP; } private: /** * 새로운 작업이 할당되었을때 * 만약 스레드를 아직 mTotalThreadCount만큼 만들지 않았다면 새로 만든다. * 만드는 동시에 첫번째 작업을 넣고 실행 * 스레드가 충분하다면 channel에 push */ void onTaskAdded(task_t&& task){ if(mPoolStatus != thread_pool_status::RUNNING){ throw std::runtime_error("thread already stopped"); } if(mCurrentThreadCount < mTotalThreadCount) { mThreads[mCurrentThreadCount] = std::thread(&_MyT::runWorker, this, std::move(task)); mCurrentThreadCount += 1; } else{ mChannel.push(std::move(task)); } } /** * 스레드에서 할일 * 이 함수를 스레드에서 실행 */ void runWorker(task_t&& first_task){ first_task(); task_t task; while(mPoolStatus == thread_pool_status::RUNNING){ try{ mChannel.blocking_get(task); if(task.valid()){ task(); } }catch(channel_closed_error&){} } } }; |
테스트
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | fixed_thread_pool<void> p(32); void func(){ auto r = p.submit([](int a){ printf("in thread\n"); },1); r.get(); } int main(int argc, const char * argv[]) { puts("before func"); func(); func(); func(); func(); func(); puts("after func"); getchar(); } | cs |
Comments