linux內(nèi)核工作隊(duì)列講解和源碼詳細(xì)注釋
p = kthread_create(worker_thread, cwq, %s, wq->name);else p = kthread_create(worker_thread, cwq, %s/%d, wq->name, cpu);if (IS_ERR(p))
return NULL;// 保存線程指針cwq->thread = p;return p;} static int worker_thread(void *__cwq)
{ struct cpu_workqueue_struct *cwq = __cwq;// 聲明一個(gè)等待隊(duì)列DECLARE_WAITQUEUE(wait, current);// 信號(hào)struct k_sigaction sa;sigset_t blocked;current->flags |= PF_NOFREEZE;// 降低進(jìn)程優(yōu)先級(jí), 工作進(jìn)程不是個(gè)很緊急的進(jìn)程,不和其他進(jìn)程搶占CPU,通常在系統(tǒng)空閑時(shí)運(yùn)行set_user_nice(current, -5);/* Block and flush all signals */ // 阻塞所有信號(hào)sigfillset(blocked);sigprocmask(SIG_BLOCK, blocked, NULL);flush_signals(current);/* * We inherited MPOL_INTERLEAVE from the booting kernel. * Set MPOL_DEFAULT to insure node local allocations. */ numa_default_policy();/* SIG_IGN makes children autoreap: see do_notify_parent()。 */ // 信號(hào)處理都是忽略sa.sa.sa_handler = SIG_IGN;sa.sa.sa_flags = 0;siginitset(sa.sa.sa_mask, sigmask(SIGCHLD));do_sigaction(SIGCHLD, sa, (struct k_sigaction *)0);// 進(jìn)程可中斷set_current_state(TASK_INTERRUPTIBLE);// 進(jìn)入循環(huán), 沒(méi)明確停止該進(jìn)程就一直運(yùn)行while (!kthread_should_stop()) { // 設(shè)置more_work等待隊(duì)列, 當(dāng)有新work結(jié)構(gòu)鏈入隊(duì)列中時(shí)會(huì)激發(fā)此等待隊(duì)列add_wait_queue(cwq->more_work, wait);if (list_empty(cwq->worklist))
// 工作隊(duì)列為空, 睡眠schedule();else // 進(jìn)行運(yùn)行狀態(tài)__set_current_state(TASK_RUNNING);// 刪除等待隊(duì)列remove_wait_queue(cwq->more_work, wait);// 按鏈表遍歷執(zhí)行工作任務(wù)if (!list_empty(cwq->worklist))
run_workqueue(cwq);// 執(zhí)行完工作, 設(shè)置進(jìn)程是可中斷的, 重新循環(huán)等待工作set_current_state(TASK_INTERRUPTIBLE);} __set_current_state(TASK_RUNNING);return 0;}
// 運(yùn)行工作結(jié)構(gòu)static void run_workqueue(struct cpu_workqueue_struct *cwq)
{ unsigned long flags;/* * Keep taking off work from the queue until * done. */ // 加鎖spin_lock_irqsave(cwq->lock, flags);// 統(tǒng)計(jì)已經(jīng)遞歸調(diào)用了多少次了cwq->run_depth++;if (cwq->run_depth > 3) { // 遞歸調(diào)用此時(shí)太多/* morton gets to eat his hat */ printk(%s: recursion depth exceeded: %dn,__FUNCTION__, cwq->run_depth);dump_stack();} // 遍歷工作鏈表while (!list_empty(cwq->worklist)) { // 獲取的是next節(jié)點(diǎn)的struct work_struct *work = list_entry(cwq->worklist.next,struct work_struct, entry);void (*f) (void *) = work->func;void *data = work->data;// 刪除節(jié)點(diǎn), 同時(shí)節(jié)點(diǎn)中的list參數(shù)清空l(shuí)ist_del_init(cwq->worklist.next);// 解鎖// 現(xiàn)在在執(zhí)行以下代碼時(shí)可以中斷,run_workqueue本身可能會(huì)重新被調(diào)用, 所以要判斷遞歸深度spin_unlock_irqrestore(cwq->lock, flags);BUG_ON(work->wq_data != cwq);// 工作結(jié)構(gòu)已經(jīng)不在鏈表中clear_bit(0, work->pending);// 執(zhí)行工作函數(shù)f(data);// 重新加鎖spin_lock_irqsave(cwq->lock, flags);// 執(zhí)行完的工作序列號(hào)遞增cwq->remove_sequence++;// 喚醒工作完成等待隊(duì)列, 供釋放工作隊(duì)列wake_up(cwq->work_done);} // 減少遞歸深度cwq->run_depth——;// 解鎖spin_unlock_irqrestore(cwq->lock, flags);}
4.2 釋放工作隊(duì)列
/** * destroy_workqueue - safely terminate a workqueue * @wq: target workqueue * * Safely destroy a workqueue. All work currently pending will be done first. */ void destroy_workqueue(struct workqueue_struct *wq)
{ int cpu;// 清除當(dāng)前工作隊(duì)列中的所有工作flush_workqueue(wq);/* We don't need the distraction of CPUs appearing and vanishing. */ mutex_lock(workqueue_mutex);// 結(jié)束該工作隊(duì)列的線程if (is_single_threaded(wq))
cleanup_workqueue_thread(wq, singlethread_cpu);else { for_each_online_cpu(cpu)
cleanup_workqueue_thread(wq, cpu);list_del(wq->list);} mutex_unlock(workqueue_mutex);// 釋放工作隊(duì)列中對(duì)應(yīng)每個(gè)CPU的工作隊(duì)列數(shù)據(jù)free_percpu(wq->cpu_wq);kfree(wq);} EXPORT_SYMBOL_GPL(destroy_workqueue);
/** * flush_workqueue - ensure that any scheduled work has run to completion. * @wq: workqueue to flush * * Forces execution of the workqueue and blocks until its completion. * This is typically used in driver shutdown handlers. * * This function will sample each workqueue's current insert_sequence number and * will sleep until the head sequence is greater than or equal to that. This * means that we sleep until all works which were queued on entry have been * handled, but we are not livelocked by new incoming ones. * * This function used to run the workqueues itself. Now we just wait for the * helper threads to do it. */ void fastcall flush_workqueue(struct workqueue_struct *wq)
{ // 該進(jìn)程可以睡眠might_sleep();// 清空每個(gè)CPU上的工作隊(duì)列if (is_single_threaded(wq)) { /* Always use first cpu's area. */ flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, singlethread_cpu));} else { int cpu;mutex_lock(workqueue_mutex);for_each_online_cpu(cpu)
flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu));mutex_unlock(workqueue_mutex);} EXPORT_SYMBOL_GPL(flush_workqueue);
評(píng)論