您的位置:首頁>正文

不會協程你想拿高薪?那是不存在的!微軟工程師帶來最全協程乾貨

例子1: follow.py 可以使用生成器完成 tail -f 的功能, 也就是跟蹤輸出的功能。

例子2: 生成器用作程式管道(類似unix pipe)

pipeline.py

理解pipeline.py

在pipeline中, follow函數和grep函數相當於程式鏈,

這樣就能鏈式處理常式。

Yield作為表達【我們開始說協程了~】:

grep.py

yield最重要的問題在於yield的值是多少。

yield的值需要使用coroutine協程這個概念 相對于僅僅生成值, 函數可以動態處理傳送進去的值, 而最後值通過yield返回。

協程的執行:

協程的執行和生成器的執行很相似。 當你初始化一個協程,

不會返回任何東西。 協程只能回應run和send函數。 協程的執行依賴run和send函數。 如果你缺志同道合的朋友!缺學習Python的氛圍!缺入門資料和視頻!缺書籍的PDF!缺遇到問題沒人解答?那就加這個群:103456743 你想要一起學習的朋友?資料免費提供的學習環境?好比圖書館!不收你一分錢, 只為提供一個良好的交流平臺!程式設計貴在多交流!

協程啟動:

所有的協程都需要調用.next( )函數。 調用的next( )函數將要執行到第一個yield運算式的位置。 在yield運算式的位置上, 很容易去執行就可以。 協程使用next()啟動。

使用協程的修飾器:

由【協程啟動】中我們知道, 啟動一個協程需要記得調用next( )來開始協程, 而這個啟動器容易忘記使用。 使用修飾器包一層, 來讓我們啟動協程。

【以後所有的協程器都會先有@coroutine

使用GeneratorExit這個異常類型

拋出一個異常:

在一個協程中, 可以拋出一個異常

第三部分, 管道篩檢程式:

叫篩檢程式其實並不貼切, 應該叫中間人Intermediate:其兩端都是send()函數。

(協程的中間層) 典型的中間層如下:

協程和生成器的對比

不同處:生成器使用了反覆運算器拉取資料,協程使用send()壓入資料。

變得多分支:(上一個協程發送資料去多個下一段協程)

圖示:

使用協程,你可以發送資料 給 多個 協程篩檢程式/協程終了。但是請注意,協程源只是用來傳遞資料的,過多的在協程源中傳遞資料是令人困惑並且複雜的。

一個例子

從文章中分別列印出含有’python‘ ’ply‘ ’swig‘ 關鍵字的句子。使用了一個協程佇列向所有printer協程 送出 接收到的資料。 圖示:

或者這樣Hook them up:

第三部分:協程,事件分發

事件處理

協程可以用在寫各種各樣處理事件流的元件。

介紹一個例子【這個例子會貫穿這個第三部分始終】要求做一個即時的公車GPS位置監控。編寫程式的主要目的是處理一份檔。傳統上,使用SAX進行處理。【SAX處理可以減少記憶體空間的使用,但SAX事件驅動的特性會讓它笨重和低效】。

把SAX和協程組合在一起

我們可以使用協程分發SAX事件,比如:

【最終的組合】

比如,把xml改成json最後從中篩選的出固定資訊. buses.py

協程的一個有趣的事情是,您可以將初始資料來源推送到低級別的語言,而不需要重寫所有處理階段。比如,PPT 中69-73頁介紹的,可以通過協程和低級別的語言進行聯動,從而達成非常好的優化效果。如Expat模組或者cxmlparse模組。 ps: ElementTree具有快速的遞增xml句法分析

第四部分:從資料處理到併發程式設計

複習一下上面學的特點:

協程有以下特點。

協程和生成器非常像。

我們可以用協程,去組合各種簡單的小元件。

我們可以使用創建進程管道,資料流程圖的方法去處理資料。

你可以使用伴有複雜資料處理代碼的協程。

一個相似的主題:

我們往協程內傳送資料,向執行緒內傳送資料,也向進程內傳送資料。那麼,協程自然很容易和執行緒和分散式系統聯繫起來。

基礎的併發:

我們可以通過添加一個額外的層,從而封裝協程進入執行緒或者子進程。這描繪了幾個基本的概念。

目標!協程+執行緒【沒有蛀牙。

下面看一個執行緒的例子。 cothread.py

例子解析:第一部分:先新建一個佇列。然後定義一個永久迴圈的執行緒;這個執行緒可以將其中的元素拉出訊息佇列,然後發送到目標裡面。第二部分:接受上面送來的元素,並通過佇列,將他們傳送進執行緒裡面。其中用到了GeneratorExit ,使得執行緒可以正確的關閉。

Hook up:cothread.py

但是:添加執行緒讓這個例子慢了50%

目標!協程+子進程

我們知道,進程之間是不共用系統資源的,所以要進行兩個子進程之間的通信,我們需要通過一個檔橋接兩個協程。

程式通過sendto()和recvfrom()傳遞檔。

和環境結合的協程:

使用協程,我們可以從一個任務的執行環境中剝離出他的實現。並且,協程就是那個實現。執行環境是你選擇的執行緒,子進程,網路等。

需要注意的警告 :

創建大量的協同程式,執行緒和進程可能是創建 不可維護 應用程式的一個好方法,並且會減慢你程式的速度。需要學習哪些是良好的使用協程的習慣。

在協程裡send()方法需要被適當的同步。

如果你對已經正在執行了的協程使用send()方法,那麼你的程式會發生崩潰。如:多個執行緒發送資料進入同一個協程。

同樣的不能創造迴圈的協程:

堆疊發送正在構建一種調用堆疊(send()函數不返回,直到目標產生)。

如果調用一個正在發送進程的協程,將會拋出一個錯誤。

send() 函數不會掛起任何一個協程的執行。

第五部分:任務一樣的協程

Task的概念

在併發程式設計中,通常將問題細分為“任務”。 “任務”有下面幾個經典的特點: * 擁有獨立的控制流。 * 擁有內在的狀態。 * 可以被安排規劃/掛起/恢復。 * 可與其他的任務通信。 協程也是任務的一種。

協程是任務的一種:

下面的部分 來告訴你協程有他自己的控制流,這裡 if 的控制就是控制流。

需要解決的問題(還在複習微嵌知識)

CPU執行的是應用程式,而不是你的作業系統,那 沒有被CPU執行的作業系統 是怎麼控制 正在運行的應用程式 中斷的呢。

中斷(interrupts)和陷阱(Traps)

作業系統只能通過兩個機制去獲得對應用程式的控制:中斷和陷阱。 * 中斷:和硬體有關的balabala。 * 陷阱:一個軟體發出的信號。 在兩種狀況下,CPU都會掛起正在做的,然後執行OS的代碼(這個時候,OS的代碼成功插入了應用程式的執行),此時,OS來切換了程式。

中斷的底層實現(略…碼字員微嵌只有70分��‍♀️)

中斷的高級表現:

* 中斷(Traps)使得OS的代碼可以實現。* 在程式運行遇到中斷(Traps)時,OS強制在CPU上停止你的程式。* 程式掛起,然後OS運行。

表現如下圖:

每次中斷(Traps)程式都會執行另一個不同的任務。

任務調度(非常簡單):

為了執行很多工,添加一簇任務佇列。

啟示(很重要):

BB了這麼多微嵌的內容,得到的是什麼結論呢。類比任務調度,協程中yield聲明可以理解為中斷(Traps)。當一個生成器函數碰到了yield聲明,那函數將立即掛起。而執行被傳給生成器函數運行的任何代碼。如果你把yield聲明看成了一個中斷,那麼你就可以元件一個多工執行的作業系統了。

第七部分:讓我們建一個作業系統。【起飛了,請握好扶手

目標:滿足以下條件建成一個作業系統。

1. 用純python語句。2. 不用執行緒。3. 不用子進程。4. 使用生成器和協程器。

我們用python去構建作業系統的一些動機:

* 尤其在存在執行緒鎖(GIL)的條件下,在執行緒間切換會變得非常重要。我要高併發!* 不阻塞和非同步I/O。我要高併發!* 在實戰中可能會遇到:伺服器要同時處理上千條用戶端的連接。我要高併發!* 大量的工作 致力於實現 事件驅動 或者說 回應式模型。我要組件化!* 綜上,python構建作業系統,有利於瞭解現在高併發,組件化的趨勢。

第一步:定義任務

定義一個任務類:任務像一個協程的殼,協程函數傳入target;任務類僅僅有一個run()函數。 pyos1.py

在foo中,yield就像中斷(Traps)一樣,每次執行run(),任務就會執行到下一個yield(一個中斷)。

第二步:構建調度者

下面是調度者類,兩個屬性分別是Task佇列和task_id與Task類對應的map。schedule()向佇列裡面添加Task。new()用來初始化目標函數(協程函數),將目標函數包裝在Task,進而裝入Scheduler。最後mainloop會從佇列裡面拉出task然後執行到task的target函數的yield為止,執行完以後再把task放回佇列。這樣下一次會從下一個yield開始執行。 pyos2.py

第三步:確定任務的停止條件

如果,target函數裡面不是閉環,那麼上面的代碼就會出錯。所以我們對Scheduler做改進。添加一個從任務佇列中刪除的操作,和對於StopIteration的驗證。 【對scheduler做改進的原因是任務的性質:可以被安排規劃/掛起/恢復。】

第四步:添加系統調用基類。

在OS中,中斷是應用程式請求系統服務的方式。在我們的代碼中,OS是調度者(scheduler),而中斷是yield。為了請求調度者服務,任務需要帶值使用yield聲明。 pyos4.py

代碼解析: 1. 如果taskmap裡面存在task,就從ready佇列裡面拿任務出來,如果沒有就結束mainloop。 2. 【就是傳說中的系統調運部分】ready佇列裡面的task被拿出來以後,執行task,返回一個result物件,並初始化這個result物件。如果佇列裡面的task要停止反覆運算了(終止yield這個過程)就從佇列裡刪除這個任務。 3. 最後再通過schedule函數把執行後的task放回佇列裡面。 4. 系統調用基類,之後所有的系統調用都要從這個基類繼承。

第4.5步:添加第一個系統調用

這個系統調用想返回任務的id。 Task的sendval屬性就像一個系統調用的返回值。當task重新運行的是後,sendval將會傳入這個系統調用。 pyos4.py

理解這段代碼的前提: (非常重要) 1. send()函數有返回值的,返回值是yield運算式右邊的值。在本段代碼中,result的返回值是yield GetTid()的GetTid的實例或者是yield後面的None。 2. 執行send(sendval)以後,sendval被傳入了yield運算式。並賦給了mytid,返回GetTid()給ruselt。

執行順序: 先創建一個調度者(Scheduler),然後在調度者裡面添加兩個協程函數:foo(), bar(),最後觸發mainloop進行協程的調度執行。

系統調用原理: 系統調用是基於系統調用類實現的,如GetTid類,其目的是傳出自己的tid。傳出自己的tid之後,再將task放回佇列。

第五步:任務管理

上面我們搞定了一個GetTid系統調用。我們現在搞定更多的系統調用: * 創建一個新的任務。 * 殺掉一個已經存在的任務。 * 等待一個任務結束。 這些細小的相同的操作會與執行緒,進程配合。

1. *創建一個新的系統調用*:通過系統調用加入一個task。

網路服務器的搭建:

現在已經完成了: * 多工。 * 開啟新的進程。 * 進行新任務的管理。 這些特點都非常符合一個web伺服器的各種特點。下面做一個Echo Server的嘗試。

但問題是這個網路服務器是I / O阻塞的。整個python的解譯器需要掛起,一直到I/O操作結束。

非阻塞的I/O

先額外介紹一個叫Select的模組。select模組可以用來監視一組socket連結的活躍狀態。用法如下:

下面實現一個非阻塞I/O的網路服務器,所用的思想就是之前所實現的Task waiting 思想。

源碼解析:__init__裡面的是兩個字典。用來存儲阻塞的IO的任務。waitforread()和waitforwrite()將需要等待寫入和等待讀取的task放在dict裡面。這裡的iopoll():使用select()去決定使用哪個檔描述器,並且能夠不阻塞任意一個和I/O才做有關係的任務。poll這個東西也可以放在mainloop裡面,但是這樣會帶來線性的開銷增長。 詳情請見: Python Select 解析 - 金角大王 - 博客園

添加新的系統調用:

更多請見開頭那個連接裡面的代碼:pyos8.py

這樣我們就完成了一個多工處理的OS。這個OS可以併發執行,可以創建、銷毀、等待任務。任務可以進行I/O操作。並且最後我們實現了併發伺服器。

第八部分:協程棧的一些問題的研究。

我們可能在使用yield的時候會遇到一些問題:

讓我們來看看這個叫蹦床的奇淫巧技。

代碼:trampoline.py

整個控制流如下:

我們看到,上圖中左側為統一的scheduler,如果我們想調用一個子執行緒,我們都用通過上面的scheduler進行調度。

千萬不要一個函數裡面包含兩個或多個以上的功能,比如函數是generator就是generator,是一個coroutine就是一個coroutin。

謝謝閱讀!

如有侵權請聯繫小編刪除哦!

協程和生成器的對比

不同處:生成器使用了反覆運算器拉取資料,協程使用send()壓入資料。

變得多分支:(上一個協程發送資料去多個下一段協程)

圖示:

使用協程,你可以發送資料 給 多個 協程篩檢程式/協程終了。但是請注意,協程源只是用來傳遞資料的,過多的在協程源中傳遞資料是令人困惑並且複雜的。

一個例子

從文章中分別列印出含有’python‘ ’ply‘ ’swig‘ 關鍵字的句子。使用了一個協程佇列向所有printer協程 送出 接收到的資料。 圖示:

或者這樣Hook them up:

第三部分:協程,事件分發

事件處理

協程可以用在寫各種各樣處理事件流的元件。

介紹一個例子【這個例子會貫穿這個第三部分始終】要求做一個即時的公車GPS位置監控。編寫程式的主要目的是處理一份檔。傳統上,使用SAX進行處理。【SAX處理可以減少記憶體空間的使用,但SAX事件驅動的特性會讓它笨重和低效】。

把SAX和協程組合在一起

我們可以使用協程分發SAX事件,比如:

【最終的組合】

比如,把xml改成json最後從中篩選的出固定資訊. buses.py

協程的一個有趣的事情是,您可以將初始資料來源推送到低級別的語言,而不需要重寫所有處理階段。比如,PPT 中69-73頁介紹的,可以通過協程和低級別的語言進行聯動,從而達成非常好的優化效果。如Expat模組或者cxmlparse模組。 ps: ElementTree具有快速的遞增xml句法分析

第四部分:從資料處理到併發程式設計

複習一下上面學的特點:

協程有以下特點。

協程和生成器非常像。

我們可以用協程,去組合各種簡單的小元件。

我們可以使用創建進程管道,資料流程圖的方法去處理資料。

你可以使用伴有複雜資料處理代碼的協程。

一個相似的主題:

我們往協程內傳送資料,向執行緒內傳送資料,也向進程內傳送資料。那麼,協程自然很容易和執行緒和分散式系統聯繫起來。

基礎的併發:

我們可以通過添加一個額外的層,從而封裝協程進入執行緒或者子進程。這描繪了幾個基本的概念。

目標!協程+執行緒【沒有蛀牙。

下面看一個執行緒的例子。 cothread.py

例子解析:第一部分:先新建一個佇列。然後定義一個永久迴圈的執行緒;這個執行緒可以將其中的元素拉出訊息佇列,然後發送到目標裡面。第二部分:接受上面送來的元素,並通過佇列,將他們傳送進執行緒裡面。其中用到了GeneratorExit ,使得執行緒可以正確的關閉。

Hook up:cothread.py

但是:添加執行緒讓這個例子慢了50%

目標!協程+子進程

我們知道,進程之間是不共用系統資源的,所以要進行兩個子進程之間的通信,我們需要通過一個檔橋接兩個協程。

程式通過sendto()和recvfrom()傳遞檔。

和環境結合的協程:

使用協程,我們可以從一個任務的執行環境中剝離出他的實現。並且,協程就是那個實現。執行環境是你選擇的執行緒,子進程,網路等。

需要注意的警告 :

創建大量的協同程式,執行緒和進程可能是創建 不可維護 應用程式的一個好方法,並且會減慢你程式的速度。需要學習哪些是良好的使用協程的習慣。

在協程裡send()方法需要被適當的同步。

如果你對已經正在執行了的協程使用send()方法,那麼你的程式會發生崩潰。如:多個執行緒發送資料進入同一個協程。

同樣的不能創造迴圈的協程:

堆疊發送正在構建一種調用堆疊(send()函數不返回,直到目標產生)。

如果調用一個正在發送進程的協程,將會拋出一個錯誤。

send() 函數不會掛起任何一個協程的執行。

第五部分:任務一樣的協程

Task的概念

在併發程式設計中,通常將問題細分為“任務”。 “任務”有下面幾個經典的特點: * 擁有獨立的控制流。 * 擁有內在的狀態。 * 可以被安排規劃/掛起/恢復。 * 可與其他的任務通信。 協程也是任務的一種。

協程是任務的一種:

下面的部分 來告訴你協程有他自己的控制流,這裡 if 的控制就是控制流。

需要解決的問題(還在複習微嵌知識)

CPU執行的是應用程式,而不是你的作業系統,那 沒有被CPU執行的作業系統 是怎麼控制 正在運行的應用程式 中斷的呢。

中斷(interrupts)和陷阱(Traps)

作業系統只能通過兩個機制去獲得對應用程式的控制:中斷和陷阱。 * 中斷:和硬體有關的balabala。 * 陷阱:一個軟體發出的信號。 在兩種狀況下,CPU都會掛起正在做的,然後執行OS的代碼(這個時候,OS的代碼成功插入了應用程式的執行),此時,OS來切換了程式。

中斷的底層實現(略…碼字員微嵌只有70分��‍♀️)

中斷的高級表現:

* 中斷(Traps)使得OS的代碼可以實現。* 在程式運行遇到中斷(Traps)時,OS強制在CPU上停止你的程式。* 程式掛起,然後OS運行。

表現如下圖:

每次中斷(Traps)程式都會執行另一個不同的任務。

任務調度(非常簡單):

為了執行很多工,添加一簇任務佇列。

啟示(很重要):

BB了這麼多微嵌的內容,得到的是什麼結論呢。類比任務調度,協程中yield聲明可以理解為中斷(Traps)。當一個生成器函數碰到了yield聲明,那函數將立即掛起。而執行被傳給生成器函數運行的任何代碼。如果你把yield聲明看成了一個中斷,那麼你就可以元件一個多工執行的作業系統了。

第七部分:讓我們建一個作業系統。【起飛了,請握好扶手

目標:滿足以下條件建成一個作業系統。

1. 用純python語句。2. 不用執行緒。3. 不用子進程。4. 使用生成器和協程器。

我們用python去構建作業系統的一些動機:

* 尤其在存在執行緒鎖(GIL)的條件下,在執行緒間切換會變得非常重要。我要高併發!* 不阻塞和非同步I/O。我要高併發!* 在實戰中可能會遇到:伺服器要同時處理上千條用戶端的連接。我要高併發!* 大量的工作 致力於實現 事件驅動 或者說 回應式模型。我要組件化!* 綜上,python構建作業系統,有利於瞭解現在高併發,組件化的趨勢。

第一步:定義任務

定義一個任務類:任務像一個協程的殼,協程函數傳入target;任務類僅僅有一個run()函數。 pyos1.py

在foo中,yield就像中斷(Traps)一樣,每次執行run(),任務就會執行到下一個yield(一個中斷)。

第二步:構建調度者

下面是調度者類,兩個屬性分別是Task佇列和task_id與Task類對應的map。schedule()向佇列裡面添加Task。new()用來初始化目標函數(協程函數),將目標函數包裝在Task,進而裝入Scheduler。最後mainloop會從佇列裡面拉出task然後執行到task的target函數的yield為止,執行完以後再把task放回佇列。這樣下一次會從下一個yield開始執行。 pyos2.py

第三步:確定任務的停止條件

如果,target函數裡面不是閉環,那麼上面的代碼就會出錯。所以我們對Scheduler做改進。添加一個從任務佇列中刪除的操作,和對於StopIteration的驗證。 【對scheduler做改進的原因是任務的性質:可以被安排規劃/掛起/恢復。】

第四步:添加系統調用基類。

在OS中,中斷是應用程式請求系統服務的方式。在我們的代碼中,OS是調度者(scheduler),而中斷是yield。為了請求調度者服務,任務需要帶值使用yield聲明。 pyos4.py

代碼解析: 1. 如果taskmap裡面存在task,就從ready佇列裡面拿任務出來,如果沒有就結束mainloop。 2. 【就是傳說中的系統調運部分】ready佇列裡面的task被拿出來以後,執行task,返回一個result物件,並初始化這個result物件。如果佇列裡面的task要停止反覆運算了(終止yield這個過程)就從佇列裡刪除這個任務。 3. 最後再通過schedule函數把執行後的task放回佇列裡面。 4. 系統調用基類,之後所有的系統調用都要從這個基類繼承。

第4.5步:添加第一個系統調用

這個系統調用想返回任務的id。 Task的sendval屬性就像一個系統調用的返回值。當task重新運行的是後,sendval將會傳入這個系統調用。 pyos4.py

理解這段代碼的前提: (非常重要) 1. send()函數有返回值的,返回值是yield運算式右邊的值。在本段代碼中,result的返回值是yield GetTid()的GetTid的實例或者是yield後面的None。 2. 執行send(sendval)以後,sendval被傳入了yield運算式。並賦給了mytid,返回GetTid()給ruselt。

執行順序: 先創建一個調度者(Scheduler),然後在調度者裡面添加兩個協程函數:foo(), bar(),最後觸發mainloop進行協程的調度執行。

系統調用原理: 系統調用是基於系統調用類實現的,如GetTid類,其目的是傳出自己的tid。傳出自己的tid之後,再將task放回佇列。

第五步:任務管理

上面我們搞定了一個GetTid系統調用。我們現在搞定更多的系統調用: * 創建一個新的任務。 * 殺掉一個已經存在的任務。 * 等待一個任務結束。 這些細小的相同的操作會與執行緒,進程配合。

1. *創建一個新的系統調用*:通過系統調用加入一個task。

網路服務器的搭建:

現在已經完成了: * 多工。 * 開啟新的進程。 * 進行新任務的管理。 這些特點都非常符合一個web伺服器的各種特點。下面做一個Echo Server的嘗試。

但問題是這個網路服務器是I / O阻塞的。整個python的解譯器需要掛起,一直到I/O操作結束。

非阻塞的I/O

先額外介紹一個叫Select的模組。select模組可以用來監視一組socket連結的活躍狀態。用法如下:

下面實現一個非阻塞I/O的網路服務器,所用的思想就是之前所實現的Task waiting 思想。

源碼解析:__init__裡面的是兩個字典。用來存儲阻塞的IO的任務。waitforread()和waitforwrite()將需要等待寫入和等待讀取的task放在dict裡面。這裡的iopoll():使用select()去決定使用哪個檔描述器,並且能夠不阻塞任意一個和I/O才做有關係的任務。poll這個東西也可以放在mainloop裡面,但是這樣會帶來線性的開銷增長。 詳情請見: Python Select 解析 - 金角大王 - 博客園

添加新的系統調用:

更多請見開頭那個連接裡面的代碼:pyos8.py

這樣我們就完成了一個多工處理的OS。這個OS可以併發執行,可以創建、銷毀、等待任務。任務可以進行I/O操作。並且最後我們實現了併發伺服器。

第八部分:協程棧的一些問題的研究。

我們可能在使用yield的時候會遇到一些問題:

讓我們來看看這個叫蹦床的奇淫巧技。

代碼:trampoline.py

整個控制流如下:

我們看到,上圖中左側為統一的scheduler,如果我們想調用一個子執行緒,我們都用通過上面的scheduler進行調度。

千萬不要一個函數裡面包含兩個或多個以上的功能,比如函數是generator就是generator,是一個coroutine就是一個coroutin。

謝謝閱讀!

如有侵權請聯繫小編刪除哦!

同類文章
Next Article
喜欢就按个赞吧!!!
点击关闭提示