线程池技术个人理解以及c语言的简单实现

发布时间:2017-1-19 6:07:46 编辑:www.fx114.net 分享查询网我要评论
本篇文章主要介绍了"线程池技术个人理解以及c语言的简单实现",主要涉及到线程池技术个人理解以及c语言的简单实现方面的内容,对于线程池技术个人理解以及c语言的简单实现感兴趣的同学可以参考一下。

     这几天闲来无事,网上无意中看到了关于线程池的东西,发现挺有意思的,找了挺多资料,研究一下,线程池技术,个人理解,线程池是个集合(概念上的,当然是线程的集合),假设这个集合中有3个线程A , B, C 这三个线程初始化的时候就是等待的状态,等待任务的到来,假设有任务1, 2, 3, 4, 5(任务处理的内容是一样的),线程池会怎么处理呢 ①:A会处理1任务(任务其实就是函数),B会处理2任务,C处理3任务 ②:当1任务结束之后,A会处理4任务;2任务结束后B会处理5任务,3任务结束后,C会等待(因为没有任务了)③:当所有任务处理完之后,A,B,C都会等待状态,等待新任务的到来上面陈述的是一般简单线程池的具体事例,那么如何实现,下面贴出代码(linux环境) #include <stdio.h> #include <stdlib.h> #include <pthread.h> #include <unistd.h> #include <sys/types.h> #include <assert.h> #include <string.h> typedef void* (*TaskFun)(void *arg); typedef struct _TskNode{ TaskFun TaskDmd; /*task节点任务处理函数*/ void *arg; /*传入任务处理函数的参数*/ struct _TskNode *pPre ; /*前一个任务节点*/ struct _TskNode *pNext; /*后一个任务节点*/ } TskNode; /*队列节点*/ typedef struct _tskQueueManage{ int tskCurWaitNum; /*当前任务队列的任务数量*/ struct _TskNode *pTskHead; /*当前任务队列的首节点*/ struct _TskNode *pTskTail; /*当前任务队列的尾节点*/ } TskQueueManage; /*任务队列描述符*/ typedef struct _threadManage{ int thdMaxNum; /*线程池容纳最大线程数量*/ pthread_t *pth; /*线程指针*/ pthread_mutex_t mutex; /*线程锁*/ pthread_cond_t cond; /*线程条件变量*/ } ThreadManage; /*线程描述符*/ typedef struct _thredPoolManage{ int shutFlag; /*线程池摧毁标识*/ ThreadManage *pThdManage; /*线程描述符指针*/ TskQueueManage *pTskQueueManage; /*任务队列描述符指针*/ } ThdPoolManage; /*线程池描述符*/ /*初始化,上述描述符*/ static int mainDmdInit(int thdMaxNum); /*线程的创建*/ static void thdPoolCreat(); /*线程池中的线程启动后处理*/ static void threadCreatdmd(); /*任务添加*/ static int tskAddDmd(TaskFun TaskDmd, void* arg); /*线程池的销毁*/ static void thdPoolDestroy(); /*线程池描述符指针*/ ThdPoolManage* pThdPoolManage = NULL; static int mainDmdInit(int thdMaxNum) { int flag = 0; /*线程池描述符的创建*/ pThdPoolManage = (ThdPoolManage *)malloc(sizeof(struct _thredPoolManage)); if(pThdPoolManage != NULL){ /*线程描述符的创建*/ pThdPoolManage->pThdManage = (ThreadManage *)malloc(sizeof(struct _threadManage)); if(pThdPoolManage->pThdManage != NULL){ /*线程互斥锁于条件变量的初始化*/ pthread_mutex_init(&(pThdPoolManage->pThdManage->mutex), NULL); pthread_cond_init(&(pThdPoolManage->pThdManage->cond), NULL); /*将线程池中允许的最大线程数赋值给线程池描述符成员*/ pThdPoolManage->pThdManage->thdMaxNum = thdMaxNum; /*线程pthread_t的创建*/ pThdPoolManage->pThdManage->pth = (pthread_t*)malloc(thdMaxNum*sizeof(pthread_t)); if(pThdPoolManage->pThdManage->pth != NULL){ /*工作队列描述符的创建*/ pThdPoolManage->pTskQueueManage = (TskQueueManage *)malloc(sizeof(struct _tskQueueManage)); if(pThdPoolManage->pTskQueueManage != NULL){ /*初始队列工作描述符*/ pThdPoolManage->pTskQueueManage->tskCurWaitNum = 0; pThdPoolManage->pTskQueueManage->pTskHead = NULL; pThdPoolManage->pTskQueueManage->pTskTail = NULL; /*线程池中所有线程的创建*/ thdPoolCreat(); } else { /*注意: 如果malloc不成功,一定要free掉之前的malloc申请*/ free(pThdPoolManage->pThdManage->pth); free(pThdPoolManage->pThdManage); free(pThdPoolManage); /*如果malloc失败,说明错误 flag 赋值为 = 1*/ flag = 1; printf("[%s]:[%d] Warning: failed.\n", __func__, __LINE__); } } else { free(pThdPoolManage->pThdManage); free(pThdPoolManage); flag = 1; printf("[%s]:[%d] Warning: failed.\n", __func__, __LINE__); } } else { free(pThdPoolManage); flag = 1; printf("[%s]:[%d] Warning: failed.\n", __func__, __LINE__); } } else { flag = 1; printf("[%s]:[%d] Warning: failed.\n", __func__, __LINE__); } return flag; } static void thdPoolCreat() { ThreadManage *pThreadManage = NULL; int thdNum; pThreadManage = pThdPoolManage->pThdManage; for(thdNum = 0; thdNum < pThreadManage->thdMaxNum; thdNum++){ pthread_create(&(pThreadManage->pth[thdNum]), NULL, (void*)threadCreatdmd, NULL); } } static void threadCreatdmd(void *arg) { /*注意指针赋值之前要初始化为NULL,以免发生后续出现野指针的情况*/ TskQueueManage* pTskQueueManage = NULL; ThreadManage *pThreadManage = NULL; TskNode* pCurTsk = NULL; printf("threadCreatdmd_creat_success:[ThreadId]=[%x]\n", pthread_self()); pTskQueueManage = pThdPoolManage->pTskQueueManage; pThreadManage = pThdPoolManage->pThdManage; while(1){ /*注意,因为会创建很多个threadCreatdmd函数,由于每个函数都要访问临界代码:即对工作队列的操作,所以必须要枷锁 以保证每一个处理函数(threadCreatdmd),在访问工作队列的时候,此工作队列不会被其他的处理函数修改*/ pthread_mutex_lock(&(pThreadManage->mutex)); /*最开始创建线程池中的线程需要等待的两种情况,即,while循环条件成立的情况, 1.线程池初始化时候(即没添加任务之前), 2. 工作队列没有任务了(即任务都执行完了)*/ while((pTskQueueManage->tskCurWaitNum == 0)&&(pThdPoolManage->shutFlag == 0)){ printf("[ThreadId]=[%x]_waiting... ... ...\n", pthread_self()); /*这时此线程会在这里阻塞*/ pthread_cond_wait(&(pThreadManage->cond), &(pThreadManage->mutex)); } if(pThdPoolManage->shutFlag == 1){ pthread_mutex_unlock(&(pThreadManage->mutex)); printf("[ThreadId]=[%x]_exit\n", pthread_self()); pthread_exit(NULL); } printf("[ThreadId]=[%x]_starting_work!!\n", pthread_self()); assert(pTskQueueManage->tskCurWaitNum != 0); assert(pTskQueueManage->pTskHead != NULL); (pTskQueueManage->tskCurWaitNum)--; /*取工作队列头部节点*/ pCurTsk = pTskQueueManage->pTskHead; /*取头之后,将新头赋给下个元素*/ pTskQueueManage->pTskHead = pTskQueueManage->pTskHead->pNext; /* 注意:如果最后一个元素 这时候 pTskQueueManage->pTskHead 是空,空的话是没有pPre的*/ if(pTskQueueManage->pTskHead != NULL){ pTskQueueManage->pTskHead->pPre = NULL; } pthread_mutex_unlock(&(pThreadManage->mutex)); /*执行头部任务节点的任务函数(即上面取出的节点)*/ (pCurTsk->TaskDmd)(pCurTsk->arg); free(pCurTsk); pCurTsk = NULL; } } static int tskAddDmd(TaskFun TaskDmd, void* arg) { TskNode* pTskNode = NULL; TskQueueManage* pTskQueueManage = NULL; ThreadManage* pThdManage = NULL; int flag = 0; pTskQueueManage = pThdPoolManage->pTskQueueManage; pThdManage = pThdPoolManage->pThdManage; pthread_mutex_lock(&(pThdManage->mutex)); /*任务添加,创建一个工作节点*/ pTskNode = (TskNode*)malloc(sizeof(struct _TskNode)); if(pTskNode != NULL){ /*将任务(函数赋值给节点)*/ pTskNode->TaskDmd = TaskDmd; pTskNode->pNext = NULL; /*赋值参数*/ pTskNode->arg = arg; if(pTskQueueManage->tskCurWaitNum == 0){ pTskQueueManage->pTskHead = pTskNode; pTskQueueManage->pTskTail = pTskNode; } else { pTskQueueManage->pTskTail->pNext = pTskNode; pTskNode->pPre = pTskQueueManage->pTskTail; pTskQueueManage->pTskTail = pTskNode; } (pTskQueueManage->tskCurWaitNum)++; } else { flag = 1; printf("[%s]:[%d] Warning: failed.\n", __func__, __LINE__); } pthread_mutex_unlock(&(pThdManage->mutex)); pthread_cond_signal(&(pThdManage->cond)); return flag; } static void thdPoolDestroy() { int thdNum; pThdPoolManage->shutFlag = 1; TskQueueManage *pTskQueueManage = NULL; ThreadManage *pThdManage = NULL; TskNode* pTskNode = NULL; pTskQueueManage = pThdPoolManage->pTskQueueManage; pThdManage = pThdPoolManage->pThdManage; pthread_cond_broadcast(&(pThdPoolManage->pThdManage->cond)); for(thdNum = 0; thdNum < pThdManage -> thdMaxNum; thdNum++){ pthread_join(pThdManage->pth[thdNum], NULL); } while(pTskQueueManage->pTskHead != NULL){ pTskNode = pTskQueueManage->pTskHead; pTskQueueManage->pTskHead = pTskQueueManage->pTskHead->pNext; free(pTskNode); } pthread_mutex_destroy(&(pThdManage->mutex)); pthread_cond_destroy(&(pThdManage->cond)); free(pThdPoolManage->pThdManage->pth); free(pThdPoolManage->pThdManage); free(pThdPoolManage->pTskQueueManage); free(pThdPoolManage); pThdPoolManage = NULL; } void TaskDmd(void *arg) { printf("[ThreadId]=[%x] working on task[%d]\n", pthread_self(), *((int *)arg)); sleep(1); } /*测试代码*/ int main() { int flag; int taskAdd; int *taskArg; int taskNum = 10; int thdMaxNum = 3; flag = mainDmdInit(thdMaxNum); /*保险起见两秒,因为可能会造成添加任务在线程等待之前执行*/ sleep(2); taskArg = (int *)malloc(sizeof(int) * taskNum); memset(taskArg, 0x00, sizeof(int) * taskNum); if(flag != 1){ for(taskAdd = 0; taskAdd < taskNum; taskAdd++){ taskArg[taskAdd] = taskAdd; flag = tskAddDmd((void*)TaskDmd, &(taskArg[taskAdd])); if(flag == 1){ printf("jobAdd error Num=[%d]\n", taskAdd); } else { printf("jobAdd success Num = [%d]\n", taskAdd); } } } else { printf("[%s]:[%d] Warning: failed.\n", __func__, __LINE__); } sleep(10); thdPoolDestroy(); return 0; }  

上一篇:
下一篇:黑马程序员_IO流整理

相关文章

相关评论