例子1: follow.py 可以使用生成器完成 tail -f 的功能, 也就是跟蹤輸出的功能。
例子2: 生成器用作程式管道(類似unix pipe)
pipeline.py
理解pipeline.py
在pipeline中, follow函數和grep函數相當於程式鏈,
Yield作為表達【我們開始說協程了~】:
grep.py
yield最重要的問題在於yield的值是多少。
yield的值需要使用coroutine協程這個概念 相對于僅僅生成值, 函數可以動態處理傳送進去的值, 而最後值通過yield返回。
協程的執行:
協程的執行和生成器的執行很相似。 當你初始化一個協程,
協程啟動:
所有的協程都需要調用.next( )函數。 調用的next( )函數將要執行到第一個yield運算式的位置。 在yield運算式的位置上, 很容易去執行就可以。 協程使用next()啟動。
使用協程的修飾器:
由【協程啟動】中我們知道, 啟動一個協程需要記得調用next( )來開始協程, 而這個啟動器容易忘記使用。 使用修飾器包一層, 來讓我們啟動協程。
使用GeneratorExit這個異常類型
拋出一個異常:
在一個協程中, 可以拋出一個異常
第三部分, 管道篩檢程式:
叫篩檢程式其實並不貼切, 應該叫中間人Intermediate:其兩端都是send()函數。
(協程的中間層) 典型的中間層如下:
協程和生成器的對比
不同處:生成器使用了反覆運算器拉取資料,協程使用send()壓入資料。
變得多分支:(上一個協程發送資料去多個下一段協程)
圖示:
使用協程,你可以發送資料 給 多個 協程篩檢程式/協程終了。但是請注意,協程源只是用來傳遞資料的,過多的在協程源中傳遞資料是令人困惑並且複雜的。
一個例子
從文章中分別列印出含有’python‘ ’ply‘ ’swig‘ 關鍵字的句子。使用了一個協程佇列向所有printer協程 送出 接收到的資料。 圖示:
或者這樣Hook them up:
第三部分:協程,事件分發
事件處理
協程可以用在寫各種各樣處理事件流的元件。
介紹一個例子【這個例子會貫穿這個第三部分始終】要求做一個即時的公車GPS位置監控。編寫程式的主要目的是處理一份檔。傳統上,使用SAX進行處理。【SAX處理可以減少記憶體空間的使用,但SAX事件驅動的特性會讓它笨重和低效】。把SAX和協程組合在一起
我們可以使用協程分發SAX事件,比如:
【最終的組合】
比如,把xml改成json最後從中篩選的出固定資訊. buses.py
協程的一個有趣的事情是,您可以將初始資料來源推送到低級別的語言,而不需要重寫所有處理階段。比如,PPT 中69-73頁介紹的,可以通過協程和低級別的語言進行聯動,從而達成非常好的優化效果。如Expat模組或者cxmlparse模組。 ps: ElementTree具有快速的遞增xml句法分析
第四部分:從資料處理到併發程式設計
複習一下上面學的特點:
協程有以下特點。
協程和生成器非常像。
我們可以用協程,去組合各種簡單的小元件。
我們可以使用創建進程管道,資料流程圖的方法去處理資料。
你可以使用伴有複雜資料處理代碼的協程。
一個相似的主題:
我們往協程內傳送資料,向執行緒內傳送資料,也向進程內傳送資料。那麼,協程自然很容易和執行緒和分散式系統聯繫起來。
基礎的併發:
我們可以通過添加一個額外的層,從而封裝協程進入執行緒或者子進程。這描繪了幾個基本的概念。
目標!協程+執行緒【沒有蛀牙。
下面看一個執行緒的例子。 cothread.py
例子解析:第一部分:先新建一個佇列。然後定義一個永久迴圈的執行緒;這個執行緒可以將其中的元素拉出訊息佇列,然後發送到目標裡面。第二部分:接受上面送來的元素,並通過佇列,將他們傳送進執行緒裡面。其中用到了GeneratorExit ,使得執行緒可以正確的關閉。
Hook up:cothread.py
但是:添加執行緒讓這個例子慢了50%
目標!協程+子進程
我們知道,進程之間是不共用系統資源的,所以要進行兩個子進程之間的通信,我們需要通過一個檔橋接兩個協程。
程式通過sendto()和recvfrom()傳遞檔。
和環境結合的協程:
使用協程,我們可以從一個任務的執行環境中剝離出他的實現。並且,協程就是那個實現。執行環境是你選擇的執行緒,子進程,網路等。
需要注意的警告 :
創建大量的協同程式,執行緒和進程可能是創建 不可維護 應用程式的一個好方法,並且會減慢你程式的速度。需要學習哪些是良好的使用協程的習慣。
在協程裡send()方法需要被適當的同步。
如果你對已經正在執行了的協程使用send()方法,那麼你的程式會發生崩潰。如:多個執行緒發送資料進入同一個協程。
同樣的不能創造迴圈的協程:
堆疊發送正在構建一種調用堆疊(send()函數不返回,直到目標產生)。
如果調用一個正在發送進程的協程,將會拋出一個錯誤。
send() 函數不會掛起任何一個協程的執行。
第五部分:任務一樣的協程
Task的概念
在併發程式設計中,通常將問題細分為“任務”。 “任務”有下面幾個經典的特點: * 擁有獨立的控制流。 * 擁有內在的狀態。 * 可以被安排規劃/掛起/恢復。 * 可與其他的任務通信。 協程也是任務的一種。
協程是任務的一種:
下面的部分 來告訴你協程有他自己的控制流,這裡 if 的控制就是控制流。
需要解決的問題(還在複習微嵌知識)
CPU執行的是應用程式,而不是你的作業系統,那 沒有被CPU執行的作業系統 是怎麼控制 正在運行的應用程式 中斷的呢。
中斷(interrupts)和陷阱(Traps)
作業系統只能通過兩個機制去獲得對應用程式的控制:中斷和陷阱。 * 中斷:和硬體有關的balabala。 * 陷阱:一個軟體發出的信號。 在兩種狀況下,CPU都會掛起正在做的,然後執行OS的代碼(這個時候,OS的代碼成功插入了應用程式的執行),此時,OS來切換了程式。
中斷的底層實現(略…碼字員微嵌只有70分��♀️)
中斷的高級表現:
* 中斷(Traps)使得OS的代碼可以實現。* 在程式運行遇到中斷(Traps)時,OS強制在CPU上停止你的程式。* 程式掛起,然後OS運行。表現如下圖:
每次中斷(Traps)程式都會執行另一個不同的任務。
任務調度(非常簡單):
為了執行很多工,添加一簇任務佇列。
啟示(很重要):
BB了這麼多微嵌的內容,得到的是什麼結論呢。類比任務調度,協程中yield聲明可以理解為中斷(Traps)。當一個生成器函數碰到了yield聲明,那函數將立即掛起。而執行被傳給生成器函數運行的任何代碼。如果你把yield聲明看成了一個中斷,那麼你就可以元件一個多工執行的作業系統了。
第七部分:讓我們建一個作業系統。【起飛了,請握好扶手
目標:滿足以下條件建成一個作業系統。
1. 用純python語句。2. 不用執行緒。3. 不用子進程。4. 使用生成器和協程器。我們用python去構建作業系統的一些動機:
* 尤其在存在執行緒鎖(GIL)的條件下,在執行緒間切換會變得非常重要。我要高併發!* 不阻塞和非同步I/O。我要高併發!* 在實戰中可能會遇到:伺服器要同時處理上千條用戶端的連接。我要高併發!* 大量的工作 致力於實現 事件驅動 或者說 回應式模型。我要組件化!* 綜上,python構建作業系統,有利於瞭解現在高併發,組件化的趨勢。第一步:定義任務
定義一個任務類:任務像一個協程的殼,協程函數傳入target;任務類僅僅有一個run()函數。 pyos1.py
在foo中,yield就像中斷(Traps)一樣,每次執行run(),任務就會執行到下一個yield(一個中斷)。
第二步:構建調度者
下面是調度者類,兩個屬性分別是Task佇列和task_id與Task類對應的map。schedule()向佇列裡面添加Task。new()用來初始化目標函數(協程函數),將目標函數包裝在Task,進而裝入Scheduler。最後mainloop會從佇列裡面拉出task然後執行到task的target函數的yield為止,執行完以後再把task放回佇列。這樣下一次會從下一個yield開始執行。 pyos2.py
第三步:確定任務的停止條件
如果,target函數裡面不是閉環,那麼上面的代碼就會出錯。所以我們對Scheduler做改進。添加一個從任務佇列中刪除的操作,和對於StopIteration的驗證。 【對scheduler做改進的原因是任務的性質:可以被安排規劃/掛起/恢復。】
第四步:添加系統調用基類。
在OS中,中斷是應用程式請求系統服務的方式。在我們的代碼中,OS是調度者(scheduler),而中斷是yield。為了請求調度者服務,任務需要帶值使用yield聲明。 pyos4.py
代碼解析: 1. 如果taskmap裡面存在task,就從ready佇列裡面拿任務出來,如果沒有就結束mainloop。 2. 【就是傳說中的系統調運部分】ready佇列裡面的task被拿出來以後,執行task,返回一個result物件,並初始化這個result物件。如果佇列裡面的task要停止反覆運算了(終止yield這個過程)就從佇列裡刪除這個任務。 3. 最後再通過schedule函數把執行後的task放回佇列裡面。 4. 系統調用基類,之後所有的系統調用都要從這個基類繼承。
第4.5步:添加第一個系統調用
這個系統調用想返回任務的id。 Task的sendval屬性就像一個系統調用的返回值。當task重新運行的是後,sendval將會傳入這個系統調用。 pyos4.py
理解這段代碼的前提: (非常重要) 1. send()函數有返回值的,返回值是yield運算式右邊的值。在本段代碼中,result的返回值是yield GetTid()的GetTid的實例或者是yield後面的None。 2. 執行send(sendval)以後,sendval被傳入了yield運算式。並賦給了mytid,返回GetTid()給ruselt。
執行順序: 先創建一個調度者(Scheduler),然後在調度者裡面添加兩個協程函數:foo(), bar(),最後觸發mainloop進行協程的調度執行。
系統調用原理: 系統調用是基於系統調用類實現的,如GetTid類,其目的是傳出自己的tid。傳出自己的tid之後,再將task放回佇列。
第五步:任務管理
上面我們搞定了一個GetTid系統調用。我們現在搞定更多的系統調用: * 創建一個新的任務。 * 殺掉一個已經存在的任務。 * 等待一個任務結束。 這些細小的相同的操作會與執行緒,進程配合。
1. *創建一個新的系統調用*:通過系統調用加入一個task。網路服務器的搭建:
現在已經完成了: * 多工。 * 開啟新的進程。 * 進行新任務的管理。 這些特點都非常符合一個web伺服器的各種特點。下面做一個Echo Server的嘗試。
但問題是這個網路服務器是I / O阻塞的。整個python的解譯器需要掛起,一直到I/O操作結束。
非阻塞的I/O
先額外介紹一個叫Select的模組。select模組可以用來監視一組socket連結的活躍狀態。用法如下:
下面實現一個非阻塞I/O的網路服務器,所用的思想就是之前所實現的Task waiting 思想。
源碼解析:__init__裡面的是兩個字典。用來存儲阻塞的IO的任務。waitforread()和waitforwrite()將需要等待寫入和等待讀取的task放在dict裡面。這裡的iopoll():使用select()去決定使用哪個檔描述器,並且能夠不阻塞任意一個和I/O才做有關係的任務。poll這個東西也可以放在mainloop裡面,但是這樣會帶來線性的開銷增長。 詳情請見: Python Select 解析 - 金角大王 - 博客園
添加新的系統調用:
更多請見開頭那個連接裡面的代碼:pyos8.py
這樣我們就完成了一個多工處理的OS。這個OS可以併發執行,可以創建、銷毀、等待任務。任務可以進行I/O操作。並且最後我們實現了併發伺服器。
第八部分:協程棧的一些問題的研究。
我們可能在使用yield的時候會遇到一些問題:
讓我們來看看這個叫蹦床的奇淫巧技。
代碼:trampoline.py
整個控制流如下:
我們看到,上圖中左側為統一的scheduler,如果我們想調用一個子執行緒,我們都用通過上面的scheduler進行調度。
千萬不要一個函數裡面包含兩個或多個以上的功能,比如函數是generator就是generator,是一個coroutine就是一個coroutin。
謝謝閱讀!
如有侵權請聯繫小編刪除哦!
協程和生成器的對比
不同處:生成器使用了反覆運算器拉取資料,協程使用send()壓入資料。
變得多分支:(上一個協程發送資料去多個下一段協程)
圖示:
使用協程,你可以發送資料 給 多個 協程篩檢程式/協程終了。但是請注意,協程源只是用來傳遞資料的,過多的在協程源中傳遞資料是令人困惑並且複雜的。
一個例子
從文章中分別列印出含有’python‘ ’ply‘ ’swig‘ 關鍵字的句子。使用了一個協程佇列向所有printer協程 送出 接收到的資料。 圖示:
或者這樣Hook them up:
第三部分:協程,事件分發
事件處理
協程可以用在寫各種各樣處理事件流的元件。
介紹一個例子【這個例子會貫穿這個第三部分始終】要求做一個即時的公車GPS位置監控。編寫程式的主要目的是處理一份檔。傳統上,使用SAX進行處理。【SAX處理可以減少記憶體空間的使用,但SAX事件驅動的特性會讓它笨重和低效】。把SAX和協程組合在一起
我們可以使用協程分發SAX事件,比如:
【最終的組合】
比如,把xml改成json最後從中篩選的出固定資訊. buses.py
協程的一個有趣的事情是,您可以將初始資料來源推送到低級別的語言,而不需要重寫所有處理階段。比如,PPT 中69-73頁介紹的,可以通過協程和低級別的語言進行聯動,從而達成非常好的優化效果。如Expat模組或者cxmlparse模組。 ps: ElementTree具有快速的遞增xml句法分析
第四部分:從資料處理到併發程式設計
複習一下上面學的特點:
協程有以下特點。
協程和生成器非常像。
我們可以用協程,去組合各種簡單的小元件。
我們可以使用創建進程管道,資料流程圖的方法去處理資料。
你可以使用伴有複雜資料處理代碼的協程。
一個相似的主題:
我們往協程內傳送資料,向執行緒內傳送資料,也向進程內傳送資料。那麼,協程自然很容易和執行緒和分散式系統聯繫起來。
基礎的併發:
我們可以通過添加一個額外的層,從而封裝協程進入執行緒或者子進程。這描繪了幾個基本的概念。
目標!協程+執行緒【沒有蛀牙。
下面看一個執行緒的例子。 cothread.py
例子解析:第一部分:先新建一個佇列。然後定義一個永久迴圈的執行緒;這個執行緒可以將其中的元素拉出訊息佇列,然後發送到目標裡面。第二部分:接受上面送來的元素,並通過佇列,將他們傳送進執行緒裡面。其中用到了GeneratorExit ,使得執行緒可以正確的關閉。
Hook up:cothread.py
但是:添加執行緒讓這個例子慢了50%
目標!協程+子進程
我們知道,進程之間是不共用系統資源的,所以要進行兩個子進程之間的通信,我們需要通過一個檔橋接兩個協程。
程式通過sendto()和recvfrom()傳遞檔。
和環境結合的協程:
使用協程,我們可以從一個任務的執行環境中剝離出他的實現。並且,協程就是那個實現。執行環境是你選擇的執行緒,子進程,網路等。
需要注意的警告 :
創建大量的協同程式,執行緒和進程可能是創建 不可維護 應用程式的一個好方法,並且會減慢你程式的速度。需要學習哪些是良好的使用協程的習慣。
在協程裡send()方法需要被適當的同步。
如果你對已經正在執行了的協程使用send()方法,那麼你的程式會發生崩潰。如:多個執行緒發送資料進入同一個協程。
同樣的不能創造迴圈的協程:
堆疊發送正在構建一種調用堆疊(send()函數不返回,直到目標產生)。
如果調用一個正在發送進程的協程,將會拋出一個錯誤。
send() 函數不會掛起任何一個協程的執行。
第五部分:任務一樣的協程
Task的概念
在併發程式設計中,通常將問題細分為“任務”。 “任務”有下面幾個經典的特點: * 擁有獨立的控制流。 * 擁有內在的狀態。 * 可以被安排規劃/掛起/恢復。 * 可與其他的任務通信。 協程也是任務的一種。
協程是任務的一種:
下面的部分 來告訴你協程有他自己的控制流,這裡 if 的控制就是控制流。
需要解決的問題(還在複習微嵌知識)
CPU執行的是應用程式,而不是你的作業系統,那 沒有被CPU執行的作業系統 是怎麼控制 正在運行的應用程式 中斷的呢。
中斷(interrupts)和陷阱(Traps)
作業系統只能通過兩個機制去獲得對應用程式的控制:中斷和陷阱。 * 中斷:和硬體有關的balabala。 * 陷阱:一個軟體發出的信號。 在兩種狀況下,CPU都會掛起正在做的,然後執行OS的代碼(這個時候,OS的代碼成功插入了應用程式的執行),此時,OS來切換了程式。
中斷的底層實現(略…碼字員微嵌只有70分��♀️)
中斷的高級表現:
* 中斷(Traps)使得OS的代碼可以實現。* 在程式運行遇到中斷(Traps)時,OS強制在CPU上停止你的程式。* 程式掛起,然後OS運行。表現如下圖:
每次中斷(Traps)程式都會執行另一個不同的任務。
任務調度(非常簡單):
為了執行很多工,添加一簇任務佇列。
啟示(很重要):
BB了這麼多微嵌的內容,得到的是什麼結論呢。類比任務調度,協程中yield聲明可以理解為中斷(Traps)。當一個生成器函數碰到了yield聲明,那函數將立即掛起。而執行被傳給生成器函數運行的任何代碼。如果你把yield聲明看成了一個中斷,那麼你就可以元件一個多工執行的作業系統了。
第七部分:讓我們建一個作業系統。【起飛了,請握好扶手
目標:滿足以下條件建成一個作業系統。
1. 用純python語句。2. 不用執行緒。3. 不用子進程。4. 使用生成器和協程器。我們用python去構建作業系統的一些動機:
* 尤其在存在執行緒鎖(GIL)的條件下,在執行緒間切換會變得非常重要。我要高併發!* 不阻塞和非同步I/O。我要高併發!* 在實戰中可能會遇到:伺服器要同時處理上千條用戶端的連接。我要高併發!* 大量的工作 致力於實現 事件驅動 或者說 回應式模型。我要組件化!* 綜上,python構建作業系統,有利於瞭解現在高併發,組件化的趨勢。第一步:定義任務
定義一個任務類:任務像一個協程的殼,協程函數傳入target;任務類僅僅有一個run()函數。 pyos1.py
在foo中,yield就像中斷(Traps)一樣,每次執行run(),任務就會執行到下一個yield(一個中斷)。
第二步:構建調度者
下面是調度者類,兩個屬性分別是Task佇列和task_id與Task類對應的map。schedule()向佇列裡面添加Task。new()用來初始化目標函數(協程函數),將目標函數包裝在Task,進而裝入Scheduler。最後mainloop會從佇列裡面拉出task然後執行到task的target函數的yield為止,執行完以後再把task放回佇列。這樣下一次會從下一個yield開始執行。 pyos2.py
第三步:確定任務的停止條件
如果,target函數裡面不是閉環,那麼上面的代碼就會出錯。所以我們對Scheduler做改進。添加一個從任務佇列中刪除的操作,和對於StopIteration的驗證。 【對scheduler做改進的原因是任務的性質:可以被安排規劃/掛起/恢復。】
第四步:添加系統調用基類。
在OS中,中斷是應用程式請求系統服務的方式。在我們的代碼中,OS是調度者(scheduler),而中斷是yield。為了請求調度者服務,任務需要帶值使用yield聲明。 pyos4.py
代碼解析: 1. 如果taskmap裡面存在task,就從ready佇列裡面拿任務出來,如果沒有就結束mainloop。 2. 【就是傳說中的系統調運部分】ready佇列裡面的task被拿出來以後,執行task,返回一個result物件,並初始化這個result物件。如果佇列裡面的task要停止反覆運算了(終止yield這個過程)就從佇列裡刪除這個任務。 3. 最後再通過schedule函數把執行後的task放回佇列裡面。 4. 系統調用基類,之後所有的系統調用都要從這個基類繼承。
第4.5步:添加第一個系統調用
這個系統調用想返回任務的id。 Task的sendval屬性就像一個系統調用的返回值。當task重新運行的是後,sendval將會傳入這個系統調用。 pyos4.py
理解這段代碼的前提: (非常重要) 1. send()函數有返回值的,返回值是yield運算式右邊的值。在本段代碼中,result的返回值是yield GetTid()的GetTid的實例或者是yield後面的None。 2. 執行send(sendval)以後,sendval被傳入了yield運算式。並賦給了mytid,返回GetTid()給ruselt。
執行順序: 先創建一個調度者(Scheduler),然後在調度者裡面添加兩個協程函數:foo(), bar(),最後觸發mainloop進行協程的調度執行。
系統調用原理: 系統調用是基於系統調用類實現的,如GetTid類,其目的是傳出自己的tid。傳出自己的tid之後,再將task放回佇列。
第五步:任務管理
上面我們搞定了一個GetTid系統調用。我們現在搞定更多的系統調用: * 創建一個新的任務。 * 殺掉一個已經存在的任務。 * 等待一個任務結束。 這些細小的相同的操作會與執行緒,進程配合。
1. *創建一個新的系統調用*:通過系統調用加入一個task。網路服務器的搭建:
現在已經完成了: * 多工。 * 開啟新的進程。 * 進行新任務的管理。 這些特點都非常符合一個web伺服器的各種特點。下面做一個Echo Server的嘗試。
但問題是這個網路服務器是I / O阻塞的。整個python的解譯器需要掛起,一直到I/O操作結束。
非阻塞的I/O
先額外介紹一個叫Select的模組。select模組可以用來監視一組socket連結的活躍狀態。用法如下:
下面實現一個非阻塞I/O的網路服務器,所用的思想就是之前所實現的Task waiting 思想。
源碼解析:__init__裡面的是兩個字典。用來存儲阻塞的IO的任務。waitforread()和waitforwrite()將需要等待寫入和等待讀取的task放在dict裡面。這裡的iopoll():使用select()去決定使用哪個檔描述器,並且能夠不阻塞任意一個和I/O才做有關係的任務。poll這個東西也可以放在mainloop裡面,但是這樣會帶來線性的開銷增長。 詳情請見: Python Select 解析 - 金角大王 - 博客園
添加新的系統調用:
更多請見開頭那個連接裡面的代碼:pyos8.py
這樣我們就完成了一個多工處理的OS。這個OS可以併發執行,可以創建、銷毀、等待任務。任務可以進行I/O操作。並且最後我們實現了併發伺服器。
第八部分:協程棧的一些問題的研究。
我們可能在使用yield的時候會遇到一些問題:
讓我們來看看這個叫蹦床的奇淫巧技。
代碼:trampoline.py
整個控制流如下:
我們看到,上圖中左側為統一的scheduler,如果我們想調用一個子執行緒,我們都用通過上面的scheduler進行調度。
千萬不要一個函數裡面包含兩個或多個以上的功能,比如函數是generator就是generator,是一個coroutine就是一個coroutin。
謝謝閱讀!
如有侵權請聯繫小編刪除哦!