Linux工作队列workqueue实现分析

发布时间:2014-10-22 14:20:03编辑:www.fx114.net 分享查询网我要评论
本篇文章主要介绍了"Linux工作队列workqueue实现分析",主要涉及到Linux工作队列workqueue实现分析方面的内容,对于Linux工作队列workqueue实现分析感兴趣的同学可以参考一下。

本文档的Copyleft归rosetta所有,使用GPL发布,可以自由拷贝、转载,转载时请保持文档的完整性。 参考资料:《Linux内核设计与实现》第3版 LKD3e、linux-2.6.27         工作队列子系统是一个用于调用创建内核线程的接口,通过它创建的线程负责执行由内核其它部分排到队列里的任务。这些内核线程称为工作者线程。工作队列子系统提供了一个缺省的工作都线程来处理工作。一般使用缺省线程即可,但当处理密集型和性能要求严格的任务时,创建拥有自己的工作者线程比较好。(引至LKD3e)         这个接口就是create_workqueue(),它返回一个struct workqueue_struct 结构指针。         /*          * The externally visible workqueue abstraction is an array of          * per-CPU workqueues:          */         struct workqueue_struct {             struct cpu_workqueue_struct *cpu_wq;             struct list_head list;             const char *name;             int singlethread;             int freezeable;     /* Freeze threads during suspend */         #ifdef CONFIG_LOCKDEP             struct lockdep_map lockdep_map;         #endif         };         上面注释:外部可见的工作队列抽象是由per-CPU的工作队列组成的数组,这个数组为结构体 cpu_workqueue_struct 。所以每种任务,它有一个自己的工作队列(struct workqueue_struct ),这个工作队列如果需要的话会为每个CPU创建对应的工作者线程。也就是说每个CPU,每个工作者线程对应一个cpu_workqueue_struct。         /*          * The per-CPU workqueue (if single thread, we always use the first          * possible cpu).          */         struct cpu_workqueue_struct {                      spinlock_t lock;                      struct list_head worklist;             wait_queue_head_t more_work;             struct work_struct *current_work;                      struct workqueue_struct *wq;             struct task_struct *thread;                      int run_depth;      /* Detect run_workqueue() recursion depth */         } ____cacheline_aligned;         刚才说了,每种任务,它都有一个自己的工作队列,这种任务的抽象就是为每个CPU创建一个处理这种任务的工作者线程(当然这是需要的情况下,如果不需要则会使用默认的工作者线程events/n,n为CPU编号),那么这个wq就是关联到自己的工作队列workqueue_struct。所有的工作者线程都是用普通的内核线程实现的,由worker_thread()函数完成。 当为一个CPU创建完一个线程后,这个线程执行死循环开始休眠,当有操作插入到队列时,线程被唤醒并执行。(LKD3e)         对应的具体工作由work_struct结构:         struct work_struct {             atomic_long_t data;             struct list_head entry;             work_func_t func;         };         由list_head可知,它是个双向链表,每个结点为一个work_struct结构类型。每个CPU上的每种类型的队列都对应这样一个链表。当一个工作线程被唤醒时,它会执行这个链表上的所有工作;工作执行完毕后就从链表上先移除相应的work_struct;当链表上不再有对象时就继续休眠。         总得来说就是每种任务(可以理解成为处理不同数据结构),有一个workqueue_struct。每个CPU有多个工作者线程,         每个线程处理相应的任务。处理过程最终调用的是func。至于func是怎么赋值的可参考下面代码实现。         下面再看下linux内核具体实现,看一下create_workqueue是怎样创建工作者线程的,代码有很多内核同步机制,暂时不关注。         #define create_workqueue(name) __create_workqueue((name), 0, 0)         #define __create_workqueue(name, singlethread, freezeable)  \//singlethread为0         ({                              \             static struct lock_class_key __key;         \             const char *__lock_name;                \                                         \             if (__builtin_constant_p(name))             \                 __lock_name = (name);               \             else                            \                 __lock_name = #name;                \                                         \             __create_workqueue_key((name), (singlethread),      \                            (freezeable), &__key,        \                            __lock_name);            \         })         struct workqueue_struct *__create_workqueue_key(const char *name,                                 int singlethread,                                 int freezeable,                                 struct lock_class_key *key,                                 const char *lock_name)         {             struct workqueue_struct *wq;             struct cpu_workqueue_struct *cwq;             int err = 0, cpu;                      wq = kzalloc(sizeof(*wq), GFP_KERNEL);             if (!wq)                 return NULL;                      wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);//为每个CPU分配cpu_workqueue_struct结构内存         //per-CPU可参考《Linux per-CPU实现分析》             if (!wq->cpu_wq) {                 kfree(wq);                 return NULL;             }                      wq->name = name;             lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);             wq->singlethread = singlethread;             wq->freezeable = freezeable;             INIT_LIST_HEAD(&wq->list);                      if (singlethread) {                 cwq = init_cpu_workqueue(wq, singlethread_cpu);                 err = create_workqueue_thread(cwq, singlethread_cpu);                 start_workqueue_thread(cwq, -1);             } else {//__create_workqueue传进来的参数singlethread为0                 cpu_maps_update_begin();                 /*                  * We must place this wq on list even if the code below fails.                  * cpu_down(cpu) can remove cpu from cpu_populated_map before                  * destroy_workqueue() takes the lock, in that case we leak                  * cwq[cpu]->thread.                  */                 spin_lock(&workqueue_lock);                 list_add(&wq->list, &workqueues);                 spin_unlock(&workqueue_lock);                 /*                  * We must initialize cwqs for each possible cpu even if we                  * are going to call destroy_workqueue() finally. Otherwise                  * cpu_up() can hit the uninitialized cwq once we drop the                  * lock.                  */                 for_each_possible_cpu(cpu) {//遍历所有CPU                     cwq = init_cpu_workqueue(wq, cpu);                     if (err || !cpu_online(cpu))                         continue;                     err = create_workqueue_thread(cwq, cpu);//为每个CPU创建一个内核线程                     start_workqueue_thread(cwq, cpu);                 }                 cpu_maps_update_done();             }                      if (err) {                 destroy_workqueue(wq);                 wq = NULL;             }             return wq;         }         static int create_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)         {             struct workqueue_struct *wq = cwq->wq;             const char *fmt = is_single_threaded(wq) ? "%s" : "%s/%d";             struct task_struct *p;                      p = kthread_create(worker_thread, cwq, fmt, wq->name, cpu);//为每个CPU创建一个线程,每个线程的数据对应于 //结构体cpu_workqueue_struct             /*              * Nobody can add the work_struct to this cwq,              *  if (caller is __create_workqueue)              *      nobody should see this wq              *  else // caller is CPU_UP_PREPARE              *      cpu is not on cpu_online_map              * so we can abort safely.              */             if (IS_ERR(p))                 return PTR_ERR(p);                      cwq->thread = p;                      return 0;         }         static int worker_thread(void *__cwq) //最终创建完成的工作者线程         {                struct cpu_workqueue_struct *cwq = __cwq;             DEFINE_WAIT(wait);                      if (cwq->wq->freezeable)                 set_freezable();                      set_user_nice(current, -5);                          for (;;) {                 prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE);                 if (!freezing(current) &&                     !kthread_should_stop() &&                     list_empty(&cwq->worklist))//如果cpu_workqueue_struct队列为空                     schedule();//进入睡眠状态                 finish_wait(&cwq->more_work, &wait);                          try_to_freeze();                          if (kthread_should_stop())                     break;                          run_workqueue(cwq);//真正干活在这里干的。             }                      return 0;         }         static void run_workqueue(struct cpu_workqueue_struct *cwq)         {             spin_lock_irq(&cwq->lock);             cwq->run_depth++;             if (cwq->run_depth > 3) {                 /* morton gets to eat his hat */                 printk("%s: recursion depth exceeded: %d\n",                     __func__, cwq->run_depth);                 dump_stack();             }             while (!list_empty(&cwq->worklist)) {//遍历当前CPU上所有类型的任务,因为每个cpu_workqueue_struct对应一种任务。                 struct work_struct *work = list_entry(cwq->worklist.next,                                 struct work_struct, entry);//遍历当前任务上的work_struct链表。                 work_func_t f = work->func;//给执行函数赋值。         #ifdef CONFIG_LOCKDEP                 /*                  * It is permissible to free the struct work_struct                  * from inside the function that is called from it,                  * this we need to take into account for lockdep too.                  * To avoid bogus "held lock freed" warnings as well                  * as problems when looking into work->lockdep_map,                  * make a copy and use that here.                  */                 struct lockdep_map lockdep_map = work->lockdep_map;         #endif                          cwq->current_work = work;                 list_del_init(cwq->worklist.next);                 spin_unlock_irq(&cwq->lock);                          BUG_ON(get_wq_data(work) != cwq);                 work_clear_pending(work);                 lock_map_acquire(&cwq->wq->lockdep_map);                 lock_map_acquire(&lockdep_map);                 f(work);//最终执行工作函数。                 lock_map_release(&lockdep_map);                 lock_map_release(&cwq->wq->lockdep_map);                          if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {                     printk(KERN_ERR "BUG: workqueue leaked lock or atomic: "                             "%s/0x%08x/%d\n",                             current->comm, preempt_count(),                                 task_pid_nr(current));                     printk(KERN_ERR "    last function: ");                     print_symbol("%s\n", (unsigned long)f);                     debug_show_held_locks(current);                     dump_stack();                 }                          spin_lock_irq(&cwq->lock);                 cwq->current_work = NULL;             }             cwq->run_depth--;             spin_unlock_irq(&cwq->lock);         }     


上一篇:http常见状态
下一篇:查找算法总结(一)

相关文章

相关评论

本站评论功能暂时取消,后续此功能例行通知。

一、不得利用本站危害国家安全、泄露国家秘密,不得侵犯国家社会集体的和公民的合法权益,不得利用本站制作、复制和传播不法有害信息!

二、互相尊重,对自己的言论和行为负责。

好贷网好贷款