Apache Beam & Dataflow 撞牆筆記

Apache Beam & Google Cloud Dataflow 撞牆筆記

介紹

Apache beam 是一套開發工具,可以用來設計並描述平行化資料處理的 pipeline。描述好的 pipeline 則會用到分散式處理的後端來執行,例如 Google Cloud Dataflow,它可以依據設定分派 Google Cloud 上的運算資源來完成運算,還會依據流量來自動調節資源使用的多寡。若你的程式包含了資料量龐大且互不相依的資料處理,使用 Apache Beam + Dataflow 來達成分散式處理進而縮短程式執行時間,會是個很好的選擇。

Apache Beam 與 Google Cloud Dataflow 都有詳盡且親切的官方文件可以參考,甚至還有手把手教你的 Example walk-through,因此這篇不再贅述如何使用這兩項工具。這篇將會著重在我自己使用這套工具時所遇到的問題,以及我的解決方式。其實這兩項工具並非主流,網路上的討論度也不算高,所以滿多解法都是我自己硬幹出來的,我沒辦法保證這是最好的做法,如果有更好的做法歡迎指正,我也會趕緊更新的。

入門學習資源

這份文件介紹如何使用 Python 或 Java 編寫 pipeline:Apache Beam Programing Guide

這篇則示範如何建立一個 pipeline 用來計算某文件內每個字的出現頻率:Wordcount Example
(統計文件字數可以把文件分成數個 batch 互不相依地平行統計,最後再加總就行,所以適合用 pipeline 處理)

使用 Apache Beam 建立好的 pipeline 已經可以在你的 local 電腦正確地執行了,但沒有運用到分散式運算資源的話當然不可能縮短執行的時間。使用 Google Cloud Dataflow 則可以讓它支配 Google Cloud 上的運算資源執行你的 pipeline

這份文件介紹如何使用 Dataflow,包括部署、執行、監控 pipeline 等等:Dataflow How-to Guideline

這篇則示範用 Apache Beam, Dataflow 配上 Tensorflow 做機器學習:Molecules Walkthrough
(最主要是在第二步驟的 preprocess 部分,它示範了更複雜的 pipeline 架構以及涵蓋了讀檔寫檔)

如果要使用 GCP 上的運算資源的話,記得要先購置好足夠的 CPU、memory 等 quota 在你的 GCP project 上,並且開啟權限讓 Dataflow 支配。

撞牆經驗分享

其實用 Apache Beam 編寫好的 pipeline 很容易就能正確地在 local 執行,會出問題的都在於連接遠端的分散式的機器上面。使用 Dataflow 等於是要奴役那些在 GCP 上面的 worker virtual machine,而那些 worker VM 都是為了跑 pipeline 而臨時組出的空機器,且我們沒有機會預先連上去安裝什麼套件或上傳什麼檔案。因此這些都要特別處理。

Dependency

舉例來說,我們的程式可能會用到 numpy 以及 tensorflow,因此在本地我們會先 pip install 這些套件,然而雲端上的 worker VM 裡面沒有這些套件,因此在執行我們所寫的 import numpy 時就會出問題。要讓雲端上的 worker VM 能夠順利 import 這些套件,我們需要準備一份 setup.py,並且把程式裡面所有會用到的 python 套件都羅列進去。這樣一來,只要在執行程式時加入參數就行,例如

python main.py --setup_file ./setup.py

關於 setup.py 裡的格式,可以參考 Molecules Walkthrough 的 setup.py ,以及詳細的 Apache Beam 文件 - dependencies。建議在羅列套件時可以在每個套件後面加上和自己本地相同的版本號,雖然如果不加版本號的話,VM 會預設是最新的版本,但如果 VM 上裝的版本和你本地的版本不同,就可能產生不相容的問題,例如你依據 a 版本的 API 寫出的 code 從 b 版本的角度是錯誤的 code 等等。

至於有些套件必須要用 apt-get install 安裝,那就比較棘手了。Apache Beam 的文件裡面有寫道,這些額外的安裝指令必須寫在 setup.py 的 Custom command 裡,這份 Juliaset 的 setup.py 裡面就有詳細的範例和解說。有些 python 套件確實要先安裝 non-python 的套件才能安裝,例如 openslide 就是個例子。

鬼畜的是 Dataflow 若在這一步出問題,它不會顯示任何可辨認的錯誤訊息,而是會讓程式一直零 throughput 地空轉,我當時花了好幾天抽絲剝繭才發現是我沒安裝某個 non-python 套件的問題,真是太崩潰了。

檔案共享交換

如果我們的 pipeline 中間有涉及檔案讀寫,就必須把檔案存放在 worker VM 們能夠存取的地方。雖然 pipeline 在本地執行時可以自由存取本地的檔案,但放上遠端執行時,遠端的機器沒辦法讀本地的檔案,運算完的結果也沒辦法存回本地。因此我們可以利用雲端的空間來達成檔案交換,例如 Google Cloud Storage 就是個常用的選擇。

關於如何使用 Google Cloud Storage,包含如何建立 bucket 以及上傳檔案,可以參考 Google Cloud Storage 的文件。由 bucket 名稱以及檔案在 bucket 裡面的路徑組成的 URL 就可以讓你的程式在本地和遠端都能存取這個檔案了,記得要設定好權限。

以學習資源 section 提到的 Wordcount Example 為例,他的程式會讀取預先上傳到雲端的 gs://dataflow-samples/shakespeare/kinglear.txt 作為 input,中間做了 pipeline 一系列的計算字數的運算後,再把結果存到 gs://my-bucket/counts.txt 供人瀏覽或下載。再以 Molecules Walkthrough 的 preprocess.py 為例,他的程式也會讀取 Cloud Storage 裡的 Tensorflow 檔案,經過 pipeline 一系列的處理,再把處理好的訓練資料以 Tensorflow 檔案存在 Cloud Storage。

Apache Beam 有提供方便的API,可以直接讀取 Cloud Storage 的檔案成為 pipeline 物件 (也就是所謂的 PCollection) 的型態,可以參考 apache_beam.io 的官方文件,例如 apache_beam.io.ReadFromTextapache_beam.io.WriteToText 可以用來存取文字檔案;而 apache_beam.io.tfrecordio.ReadFromTFRecordapache_beam.io.tfrecordio.WriteToTFRecord 可以用來存取 TFRecord 格式的檔案。

除了使用 Apache Beam 的 API 來直接把 Cloud Storage 的檔案讀成 PCollection 型態以外,也有一些套件會提供 API 來讀取 Cloud Storage 的檔案,例如 Tensorflow 就有tf.gfile.Open 等 API 可以用來讀取 Cloud Storage 上面的檔案。

然而除了文字檔或 TF 檔之外,大部分格式的檔案都沒有什麼 python 的 API 可供直接讀寫 Cloud Storage 的檔案,例如處理圖片常需要用到的 imread 就沒有辦法,只能讀取本地的檔案。這種情況只好特地寫程式來下載或上傳檔案,幸好 python 有一套名為 google-cloud-storage 的套件,提供了各種 API 來對 Cloud Storage 做操作,包含建立 Client、依照路徑建立 Blob、上傳和下載等等,可以參考 google-cloud-storage 的文件。這樣一來,儘管不能直接讀取 Cloud Storage 的檔案,也可以用程式先下載到機器上再用正常方法讀取;儘管不能把檔案直接寫入 Cloud Storage,也可以先用正常方法存在機器上,再用程式上傳回 Cloud Storage。

再回到 worker VM 需要下載檔案來用的問題。worker VM 被使喚去處理 pipeline 的工作時是依據我們所寫的 DoFn (用來描述 pipeline 的 stage) 來工作,因此我們可以把「下載檔案」的動作也寫在 DoFn 裡面。Apache Beam 的文件 - ParDo with DoFn 就介紹了 DoFn 裡面有哪些 function 可以 override,以及適用於哪些操作。而下載檔案的程式就適合寫在 DoFn.setup() 裡面,因為 setup 這個 function 會在 DoFn 初始化時被呼叫,所以可以保證「下載」進行在「開始工作」前,而且只會下載一次。白話一點可以想成 worker VM 準備好要開始處理 pipeline 的某個 stage 的時候,先把該 stage 會用到的檔案都先下載好,就能開始不斷地處理該 stage 的工作。

禁用 Public IP

Dataflow 在奴役 worker VM 時,不只會消耗我們的 CPU 和記憶體 quota,還預設會占用 Public IP,這是因為奴隸們和主人需要有聯絡的管道。因此假設我們購置了 200 個 CPU、指定 Dataflow 參數為每台 worker 一個 CPU,並且期望看到總共有 200 個 worker 為我們工作,然而如果我們的 Public IP quota 只有 100 個,那麼依然只會有 100 個 worker 能夠工作。但這其實是不該發生且不合邏輯的,因為奴隸們只需要和主人溝通就好,根本不會需要對外溝通,或說我們根本不會需要 SSH 進這些 worker VM 做任何操作,因此它們不該占用 Public IP。

要解決這個問題,我們可以在 GCP 上面新增 VPC network,並且取消掉 Dataflow 的 Public IP,讓機器們使用這個 network 來溝通,可以參考 Dataflow 的文件 - Network,而如何指定 Dataflow 的各種參數則可以參考 Dataflow 的文件 - Parameters ,包含 disable public IP 以及指定 network 或 subnetwork。