我正在为我的ubuntu服务器(针对我的多客户端匿名聊天程序)实现一种简单的线程池机制,并且需要使我的工作线程进入睡眠状态,直到需要执行一项工作(以函数指针和参数的形式) 。
我当前的系统即将关闭。我(工人线程正在)问经理是否有工作可用,以及是否有5毫秒没有睡眠。如果存在,请将作业添加到工作队列中并运行该函数。糟糕的循环浪费。
什么我 喜欢 做的是做一个简单的事件性的系统。我正在考虑有一个互斥量向量(每个工人一个),并在创建时将互斥量的句柄作为参数传入。然后在我的经理类(保存并分发作业)中,每当创建线程时,锁定互斥体。当需要执行作业时,请解锁下一个互斥锁,等待其锁定和解锁,然后重新锁定。但是我想知道是否有更好的方法可以达到这个目的。
tldr; 所以我的问题是这个。使线程等待管理类中的工作的最有效,最有效和最安全的方法是什么?轮询是我什至应该考虑的技术(一次有1000个以上的客户端),互斥锁是否不错?还是还有其他技术?
您需要的是条件变量。 所有工作线程都调用wait(),这将暂停它们。
然后,父线程将工作项放在队列中,并在条件变量上调用signal。这将唤醒正在hibernate的一个线程。它可以从队列中删除该作业,然后执行该作业,然后在条件变量上调用wait以返回睡眠状态。
尝试:
#include <pthread.h> #include <memory> #include <list> // Use RAII to do the lock/unlock struct MutexLock { MutexLock(pthread_mutex_t& m) : mutex(m) { pthread_mutex_lock(&mutex); } ~MutexLock() { pthread_mutex_unlock(&mutex); } private: pthread_mutex_t& mutex; }; // The base class of all work we want to do. struct Job { virtual void doWork() = 0; }; // pthreads is a C library the call back must be a C function. extern "C" void* threadPoolThreadStart(void*); // The very basre minimal part of a thread pool // It does not create the workers. You need to create the work threads // then make them call workerStart(). I leave that as an exercise for you. class ThreadPool { public: ThreadPool(unsigned int threadCount=1); ~ThreadPool(); void addWork(std::auto_ptr<Job> job); private: friend void* threadPoolThreadStart(void*); void workerStart(); std::auto_ptr<Job> getJob(); bool finished; // Threads will re-wait while this is true. pthread_mutex_t mutex; // A lock so that we can sequence accesses. pthread_cond_t cond; // The condition variable that is used to hold worker threads. std::list<Job*> workQueue; // A queue of jobs. std::vector<pthread_t>threads; }; // Create the thread pool ThreadPool::ThreadPool(int unsigned threadCount) : finished(false) , threads(threadCount) { // If we fail creating either pthread object than throw a fit. if (pthread_mutex_init(&mutex, NULL) != 0) { throw int(1); } if (pthread_cond_init(&cond, NULL) != 0) { pthread_mutex_destroy(&mutex); throw int(2); } for(unsigned int loop=0; loop < threadCount;++loop) { if (pthread_create(threads[loop], NULL, threadPoolThreadStart, this) != 0) { // One thread failed: clean up for(unsigned int kill = loop -1; kill < loop /*unsigned will wrap*/;--kill) { pthread_kill(threads[kill], 9); } throw int(3); } } } // Cleanup any left overs. // Note. This does not deal with worker threads. // You need to add a method to flush all worker threads // out of this pobject before you let the destructor destroy it. ThreadPool::~ThreadPool() { finished = true; for(std::vector<pthread_t>::iterator loop = threads.begin();loop != threads.end(); ++loop) { // Send enough signals to free all threads. pthread_cond_signal(&cond); } for(std::vector<pthread_t>::iterator loop = threads.begin();loop != threads.end(); ++loop) { // Wait for all threads to exit (they will as finished is true and // we sent enough signals to make sure // they are running). void* result; pthread_join(*loop, &result); } // Destroy the pthread objects. pthread_cond_destroy(&cond); pthread_mutex_destroy(&mutex); // Delete all re-maining jobs. // Notice how we took ownership of the jobs. for(std::list<Job*>::const_iterator loop = workQueue.begin(); loop != workQueue.end();++loop) { delete *loop; } } // Add a new job to the queue // Signal the condition variable. This will flush a waiting worker // otherwise the job will wait for a worker to finish processing its current job. void ThreadPool::addWork(std::auto_ptr<Job> job) { MutexLock lock(mutex); workQueue.push_back(job.release()); pthread_cond_signal(&cond); } // Start a thread. // Make sure no exceptions escape as that is bad. void* threadPoolThreadStart(void* data) { ThreadPool* pool = reinterpret_cast<ThreadPool*>(workerStart); try { pool->workerStart(); } catch(...){} return NULL; } // This is the main worker loop. void ThreadPool::workerStart() { while(!finished) { std::auto_ptr<Job> job = getJob(); if (job.get() != NULL) { job->doWork(); } } } // The workers come here to get a job. // If there are non in the queue they are suspended waiting on cond // until a new job is added above. std::auto_ptr<Job> ThreadPool::getJob() { MutexLock lock(mutex); while((workQueue.empty()) && (!finished)) { pthread_cond_wait(&cond, &mutex); // The wait releases the mutex lock and suspends the thread (until a signal). // When a thread wakes up it is help until it can acquire the mutex so when we // get here the mutex is again locked. // // Note: You must use while() here. This is because of the situation. // Two workers: Worker A processing job A. // Worker B suspended on condition variable. // Parent adds a new job and calls signal. // This wakes up thread B. But it is possible for Worker A to finish its // work and lock the mutex before the Worker B is released from the above call. // // If that happens then Worker A will see that the queue is not empty // and grab the work item in the queue and start processing. Worker B will // then lock the mutext and proceed here. If the above is not a while then // it would try and remove an item from an empty queue. With a while it sees // that the queue is empty and re-suspends on the condition variable above. } std::auto_ptr<Job> result; if (!finished) { result.reset(workQueue.front()); workQueue.pop_front(); } return result; }