프로그래밍 검색 블로그

고정크기 스레드 풀 본문

C++자료구조

고정크기 스레드 풀

코딩조무사 2017. 10. 5. 00:53

이전에 만든

blocking_channel을 사용

http://psbs.tistory.com/11



만들긴 했지만 반환되는 std::future<>를 굳이 받을 필요가 없다는 점을 제외하면 

대부분의 경우에는 std::async 로 더 간단하게 할 수 있어서 

실제로 쓸일은 없을듯 하다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
 
#include <functional>
#include <type_traits>
#include <future>
#include <mutex>
#include <queue>
#include <deque>
enum class thread_pool_status{
    PREPARING,
    RUNNING,
    STOPPING,
    STOP
};
template<typename _Retn>
class fixed_thread_pool{
    using _MyT = fixed_thread_pool;
    using func_t = std::function<_Retn()>;
    using task_t = std::packaged_task<_Retn()>;
    using synchronized = std::lock_guard<std::mutex>;
    
    const std::size_t mTotalThreadCount;
    std::size_t mCurrentThreadCount;
    std::mutex mPoolLock;
    thread_pool_status mPoolStatus = thread_pool_status::PREPARING;
    
    blocking_channel<task_t> mChannel;
    std::vector<std::thread> mThreads;
    
public:
    fixed_thread_pool(int threadSize) :
        mTotalThreadCount(threadSize),
        mThreads(threadSize),
    mCurrentThreadCount(0){ mPoolStatus = thread_pool_status::RUNNING; }
    
    ~fixed_thread_pool(){
        shutdown();
    }
    
    template<typename _Func, typename... Args>
    std::future<void> submit(_Func&& f, Args&&... args) {
        task_t task(std::bind(f, std::forward<Args>(args)...));
        auto future = task.get_future();
        
        synchronized lock(mPoolLock);
        onTaskAdded(std::move(task));
        return future;
    }
    
    thread_pool_status getStatus(){
        return mPoolStatus;
    }
 
    /**
     * 모든 스레드를 join 시키고 더이상 실행할수 없게함
     */
    void shutdown(){
        synchronized lock(mPoolLock);
        mPoolStatus = thread_pool_status::STOPPING;
        
        mChannel.shutdown();
        for(auto& thread : mThreads){
            if(thread.joinable()){
                thread.join();
                
            }
        }
        
        mPoolStatus = thread_pool_status::STOP;
    }
private:
    
    /**
     * 새로운 작업이 할당되었을때
     * 만약 스레드를 아직 mTotalThreadCount만큼 만들지 않았다면 새로 만든다.
     * 만드는 동시에 첫번째 작업을 넣고 실행
     * 스레드가 충분하다면 channel에 push
     */
    void onTaskAdded(task_t&& task){
        if(mPoolStatus != thread_pool_status::RUNNING){
            throw std::runtime_error("thread already stopped");
        }
        
        if(mCurrentThreadCount < mTotalThreadCount) {
            mThreads[mCurrentThreadCount] = std::thread(&_MyT::runWorker, thisstd::move(task));
            mCurrentThreadCount += 1;
        }
        else{
           mChannel.push(std::move(task));
        }
    }
    
    /**
     * 스레드에서 할일
     * 이 함수를 스레드에서 실행
     */
    void runWorker(task_t&& first_task){
        first_task();
        
        task_t task;
        while(mPoolStatus == thread_pool_status::RUNNING){
            try{
                mChannel.blocking_get(task);
                if(task.valid()){
                    task();
                }
            }catch(channel_closed_error&){}
        }
    }
};
 
 

cs



테스트


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
fixed_thread_pool<void> p(32);
void func(){
    auto r = p.submit([](int a){
        printf("in thread\n");
    },1);
 
    r.get();
}
int main(int argc, const char * argv[]) {
    puts("before func");
    func();
    func();
    func();
    func();
    func();
    puts("after func");
    getchar();
}
 
 
 
cs


Comments