ニートの言葉

元ニートがやってみたこと・その過程で学んだこと・考えたこと・技術メモあたりを主に書いています。情報革命が起きた後に訪れるであろう「一億総ニート時代」の生き方を考え中です。

【Kubernetes】1週間かかる処理を1.5時間で終わらせた【並列処理】

こんにちはあんどう(@t_andou)です。

今回はKubernetesを使って並列処理させた記録です。

まだ「とりあえずそれっぽく動くまで試してみた」という段階で、kubernetesを理解できてはいないので自分用のメモを公開しているという認識でご覧ください。

間違っている部分や、よりスマートなやり方がありましたらご指摘いただけると幸いです。

この記事の概要

機械学習に使う特徴量の作成で1週間かかりそうな処理を10分くらいで終わらせられないかと考え、GKE(=GoogleのKubernetes環境)を使い試行錯誤した記録です。

今回は一部失敗して完了時間が1.5時間になったものの、設定を上手く出来れば15分程度で終わる見込みです。

 

対象読者

・Kubernetesの概要は知っているくらいのレベルの人
・KubernetesのJobを使った並列処理をしたい人

目次

背景

機械学習で使う学習用データ(特徴量)の作成にかかる時間がとても長かったので短縮したいと考えていました。

どのくらい時間がかかっていたのか

約10年分のデータで15万件近くのデータセットがあり、1件あたり処理するのに早くて1秒・遅いと1分近くかかっていました。

平均すると4秒/件くらいだったので、4秒*15万件=600,000秒 = 6.94日。ほぼ1週間かかる想定。

一度きりの処理ならまだ良いかもしれないですが、今後も試行錯誤しながら特徴量作成を行いたいため、毎回この時間を待つのは厳しい…

どうやって処理時間を短縮するのか

すごく極端な考えとしては、15万件それぞれを1件ずつ独立して処理出来るようにして、Kubernetesで並列処理を行えば、オーバーヘッドの時間(NodeやPodの準備などで数分)+処理時間(最長1分)として10分くらいで完了するのでは?と考えました。

ちなみにGKEの上限を見ると、1Node Poolあたり最大1000台のNodeまで立てることが出来て、それぞれに110Podまで乗せることが出来るようなのでNodePoolを二つ用意すればなんとか実現できそうです。

ただし、詳細は後述しますが「やってみたこと その1」のような構成だと15万podがMySQLへ同時接続するのは多すぎると判断し、今回は1Podあたり1日分(30~50件)のデータ処理することにしました。
本記事執筆時には2019年は10月分までしかデータが無いため、正確には10年分ではないですが、約3500~3600程度のPodで並列処理をすることになります。

なぜKubernetesなのか

流行っているので触りたかったからです。

SparkやHadoop(どちらも聞いたことあるだけ)などもあるようですが、Kubernetesを触ってみることも目的の一つだったため今回はk8sを選択しました。

やってみたこと その1:処理部分を独立させた(失敗)

まずはこちらの記事を参考に処理する部分を切り出してキューを使って並列処理するようにしました。

もともとデータベースをConohaに置いているので、k8sの部分はGKE(=GoogleのKubernetes環境)を利用し、データはConohaのMySQLから持ってくるような構成にしました。

構成

処理の流れ

1.処理したい日付の一覧をキューに入れる(ここはシェルスクリプトを作成して、手で実行しました)
2.処理をするPodはキューから日付を受け取り、該当するデータをMySQLから取得
3.MySQLから取得したデータをもとに処理をして、MySQLの特徴量テーブルへinsert

困ったこと

MySQLへの接続部分

想定していた通り、MySQLへの同時接続数が問題になりました。

3000以上の同時アクセスがあり、さらにそれらのPodがJOINなどの処理を要求してくるためメモリ2GBのVPS上に建てているDBでは耐えられなかったようです。

でも、特徴量作成のためだけにMySQLのチューニングを頑張ったりサーバーのスペックを上げるのは嫌でした。

なので、別の方法を考えました。 

やってみたこと その2:データをGCSにダンプした(失敗)

MySQLへの同時接続がボトルネックになっていると分かったので、データをpandasのDataFrame形式でzipでダンプし、GCS(=Googleのストレージ)におきました。

構成

処理の流れ

1.処理したい日付の一覧をキューに入れる(ここはシェルスクリプトを作成して、手で実行しました)

2.各Podは全データをGCSからダウンロード
3.Podはキューから受け取った日付に応じてデータの処理し、DataFrameのzipとして保存
4.GCSへzipをアップロード

困ったこと

デフォルトではGCSへの書き込み権限が無い

クラスタ作成時に各Node→GCSへのアクセス権がデフォルトだと読み込みのみになっているため、node pool作成時に書き込み権限が必要だった。

→クラスタ作成時に --scopes=gke-default,storage-rw を付けることで解決

Nodeの容量不足

各PodがGCSからデータをダウンロードしてくる仕様にしていたため各Nodeのストレージ容量(10GB)を一杯に使ってしまい、容量不足で止まってしまった。 

やってみたこと その3:永続ボリュームにデータを配置した(成功)

(ここから先は本当によく分からないまま進めました。)

その2の時にはデータを各Podにダウンロードする仕様にしていましたが、実際には全てのコンテナで同じデータを読み込んでいるので、わざわざPod内にダウンロードしてくる必要は無いと判断しました。

そこで、Kubernetesの永続ボリュームというものを使い、マウントして各Podからはそこを参照するようにしました。(そうすることで各Podからは起動時に既にローカルにデータがあるような状態になっているはず)

構成

処理の流れ

1.処理したい日付の一覧をキューに入れる(ここはシェルスクリプトを作成して、手で実行しました)

2.永続ボリューム用にディスクを作成、VMにマウントしてデータを配置した(詳細は後述)

3.Podはキューから受け取った日付に応じてデータの処理し、DataFrameのzipとして保存

4.GCSへzipをアップロード

困ったこと

永続ボリュームの読み書き権限周り

GKEでは複数のNodeからの読み書き(ReadWriteMany)が出来ず、一つのNodeだけから読み書きが出来るReadWriteOnceか、複数のNodeから読み込みのみ可能なReadOnlyManyしか選べませんでした。

今回は初回のみデータを書き込んで、その後は複数Nodeから読み込みだけを行うという流れにしたかったのですが、永続ボリュームを作った後に設定の変更の仕方が分からず苦戦しました。(未解決)

結局、今回はこちらの記事を参考に、ディスクを作成し手作業でマウントしてデータを配置。そのディスクを各PodへReadOnlyManyでマウントさせることで対応しました。

job側でも readOnly: true を設定する必要がありました

使えるvm数に制限がかけられていたのでリクエストが必要だった

GCPではリソース(借りられるVMインスタンスの数・CPUの数)に初期値で上限が設定されていました。

詳細をメモし忘れたのですが、ロケーションごとに色々と設定されているので、適切に上限緩和のリクエストを送らないと数十台のVMをオートスケールしようとすると用意されずにPodが起動待機の状態になります。

上限緩和リクエストの際に理由を書く必要があるのですが「GKEで3000Podの並列処理をしたいので、n1-standard-96が100台必要だから」みたいな曖昧な理由でも通りました。

詳細はこちらのページに記載されています

リソースの割り当て  |  Compute Engine ドキュメント  |  Google Cloud

完成したymlとシェルスクリプト

その3の構成まで試した結果、なんとかそれっぽく動く状態になったので、ymlを残しておきます。

クラスター作成

gist.github.com

永続ボリューム

gist.github.com

ジョブ実行

gist.github.com

かかった料金

クラウドを使う際に気になる料金関係をまとめます。

結論としては、今回色々と試行錯誤したのですが、トータルで1600円程度でした。こまめにクラスタを削除していたからかもしれないですが、だいぶ安く済みました。

Kubernetesのマスター管理料金:無料

AKE(AWSのKubernetes環境)ではKubernetesのマスターを稼働させておくだけで$0.2/hが課金されるようですが、GKEでは無料でした。安心。

https://cloud.google.com/kubernetes-engine/pricing?hl=ja

データ転送料:ほぼ無料

コンテナレジストリからイメージをpullする時やGCSからPodへデータを転送する際に3600PodでGB単位データが行き来するので、ネットワークの転送料が結構かかるかもしれないと思っていましたが、調べたところ同じロケーション内であれば無料でした。

今回はGKEもGCSも全てus-westで統一しましたが、もしもここのロケーションが違っていたら1回の試行で3~4千円くらいのネットワーク料金がかかるのではないかと思います。

データのストレージへの保管料金:数円

細かくは見ていないですが、他のプロジェクトで数GBのデータを1ヶ月置いても100円もいかない程度ですので、今回の試行錯誤に使ったデータだけでいうと10円とかその程度だと思います。

VMの料金

今回かかった費用はほとんどここです。

まず、ロケーションによってだいぶ値段が違いました。(東京だと$1.2720/hのマシンがオレゴンだと$0.9600/hと25%くらい割引で使えます。)

レスポンスの速度が重要なWebサービスなどは国内に置きたいですが、今回のような処理であれば海外でも良いと判断してオレゴンのVMを使いました。  

VMの料金見積もり

1Podあたり1vCPU/メモリ2GBだとして、3600Podを動かしたい。

n1-standard-96だと1nodeで96個のPodを動かせそう*1なので、38台のマシンを借りることで3600Podでの並列処理が出来ると判断。

また、プリエンプティブルVM(いつ落ちるか分からないけれど安いVM)を使えば1時間あたり$0.96。全体の処理で20分くらいかかったとすると

1/3時間 * $0.96 * 38 =12.16 US$

日本円にすると1300~1400円くらいでしょうか。できればワンコインで処理完了させたかったですが、このくらいなら許容範囲。

実際にかかったのは961円でした。

合計金額

何度か小さく試行錯誤をしたものも含め、合計で1600円でした。

分からないこと 

いまだに分かっていないことをまとめておきます。

もしも解決策をご存知の方がいらっしゃいましたら教えていただけると幸いです。

ReadOnlyManyでPVCを作成する時に最初のデータ配置をどうするのか

今回はインスタンスを作って手作業で置きましたが、Kubernetesで完結させたい場合はどうすれば良いのかが分かっていません。

Init Containerという仕組みを使えないかと思ったのですが、PVCの権限関係とは無関係っぽいのでよく分かっていません。

根本的に理解がズレてる気がしますが…

jobsのCOMPLETIONSが途中で減ることがあった

処理が完了したPod数を示す「COMPLETIONS」ですが、途中から増えなくなるどころか減っていきました。

プリエンプティブルVMを使っていたので、途中でNodeが落とされて完了情報が落ちたのかも?完了情報はk8sマスターが管理してそうな気がしますが…よく分からないです。

処理から抜けているものが複数あった

データの質の問題でエラーが起きていた日付がいくつかあるのは確認できていた(処理に失敗したものはslackへ通知するようにしていた)のですが、それ以外にエラー通知が来ていないのに処理されていない日付が複数ありました。

原因は不明ですが、キューを持った状態でプリエンプティブルノードが落ちてしまったらそうなるのかもしれないです。(未調査)

今回は抜けた日付を再度回すことで対応可能ですが、厳格な管理が必要な場合はプリエンプティブルVMは使っちゃダメということになりそう?

キューの設定の仕方次第で対策できるのでしょうか。

GKEのアップデートが始まって時々接続できなくなった

クラスタを作成してしばらくは操作できるのですが、jobを開始して少しするとkubectlコマンドが効かなくなり、接続できないというエラーが出ました。

GKEのコントロールパネルを見ると次のような表示になっていたので、自動でアップグレードが始まってしまったようです。

 

この画面になっている数分間はnodeの削除もクラスタの削除もできません。 

数分間とは言えn1-standard-96が数台立ち上がった状態だと1000円くらいすぐにいっちゃうので困りました。

おそらくクラスター作成時に「自動アップグレードしない」という選択肢や「最新のマスターを使う」という選択があるんだと思いますが、今のところ回避策は不明です。 

追記

クラスター作成時に下記のWARNINGが出てました。--no-enable-autoupgradeを付けることで自動アップデートを回避できるようです。

WARNING: In November 2019, node auto-upgrade will be enabled by default for newly created clusters and node pools. To disable it, use the `--no-enable-autoupgrade` flag.

参考にした資料

下記の資料にすごく助けられました。ありがとうございます。

Webページ

キューの仕組み
Kubernetes 作業キューを使った並列ジョブ - Qiita

ディスクの作成・マウント
Google Cloudで追加ディスクのマウントを行う - Tug-uca’s blog

永続ボリュームの利用

【kubernetes】Podから永続ボリュームを利用する【GKE】 | taketiyo.log

プリエンプティブルVMを使ったクラスター作成
Preemptible InstanceでGKEクラスターのオートスケーリング - Ian Lewis

書籍

15Stepで習得 Dockerから入るKubernetes コンテナ開発からK8s本番運用まで (StepUp!選書)

Docker/Kubernetes 実践コンテナ開発入門

最後に

まだ理解できていないので完璧に動く状態ではないですが、一通りそれらしい動きができるようになりました。

処理時間について

当初はほぼ1週間=168時間かかる見込みだったので、1.5時間で終わったということは約110倍くらいの高速化になります。

実は今回はVMの上限制限をうまく緩和出来ておらず、38台借りたかったところが6台しか借りることが出来ませんでした。その状態で完了時間が1.5時間でしたので、もしも正しく緩和出来ていたら15分程度で完了するかと思います。

用途について

今回は機械学習で使う特徴量の作成に使いましたが、分散処理が出来るということはOptunaなどでも使えるため、機械学習の学習するフェーズでも使えそうです。

近いうちにLightGBMのハイパーパラメータ選択をする際にも使ってみたいと思っています。

おまけ:試行錯誤のツイート

一連の試行錯誤ツイートを貼っておきます

*1:正確には動かせる数が少しだけズレていましたが、誤差の範囲なので詳細は割愛