大量に登録されているDatastoreのEntityをElasticsearchに投入する
背景
とあるWebアプリがAppEngine (Golang)で稼働しており、Datastoreに大量の記事(約500,000件)が蓄積されている。
これはRSS Feedから取得した記事でOGPのタイトルや概要が含まれている。
記事のタイトルと概要に対し全文検索を行う要件が新たに発生したため、記事取得時にElasticsearch (ElasticCloud) へ同期する動作に改修した。
今後新規に取得する記事についてはこれで問題無いが、既存データは別途Elasticsearchへまとめて投入を行う必要があり、これを実装した際に得た知見を記事にする。
なお基本的に1度しか使わない機能なので少々雑な仕組み、コードであっても簡便迅速に作成することを重視して作成した。
また、ElasticCloudが現状では日本リージョンに対応していないためAppEngineは日本、Elasticsearchはオレゴンという構成になっている。
スペックは下記画像の通り(最小構成)
用語
- Elasticsearch
- Bulk API
手法
基本的な動作
Datastoreの当該Kindに含まれるEntity全てを一定数毎にページングしながら取得し、それをBulk APIの仕様に沿ったJSONに成形して投入する。
AppEngineにバッチ機能を作成し所定のURLにGETリクエストを送ることをトリガーに動作を開始する。
開始されるとDatastoreから取得したEntityをIndexingするTaskQueueを順次起動し非同期にIndexingを行い、何らかの理由で失敗した場合は3回までリトライを行う。
考慮した点、発生した問題、対策
Entity数が大量であること
大量のIndexingを行う際はBulk APIを使うことが推奨されている。
しかし全Entityを1つにまとめるとデータが大き過ぎてAppEngineでOOMが発生したりElasticsearchが受け付けられない可能性があるため、最終的に128個のEntityを1つとしてIndexingを行った。
Datastoreからの取得に時間がかかる
単純にAppEngineで処理を行おうとすると1リクエスト30秒以内の制限に抵触して中断されてしまうことが予想されたため、IndexingするTaskQueueを起動するプロセスもTaskQueue化しある程度長時間稼働できるようにした。
IndexingするTaskQueueを発行するプロセスがクラッシュする
当初はもっと多量(256個や512個)のEntityをまとめてIndexingしようとしたが、TaskQueueを発行するプロセスがクラッシュしてしまう事象が発生した。
これを解析し対策を行うことも検討したが、ログに何も出力されておらず(HTTP500が発生した旨のみが記録されていた)解析に時間がかかりそうだったこと、128個で試したところ安定動作しパフォーマンスも十分であったため無視した。
結果
全Entity約500,000件を凡そ5分15秒(約1,600Index/sec)でエラー無く登録完了した。
TaskQueueの同時実行数は最大でも3,4個程度(目視確認)。
Indexing中にElasticsearchのパフォーマンスを確認したところ最大CPU使用率は20%程度、JVM Heapも50%前後で特に問題は発生しなかった。
所感
AutoscalingのTaskQueueは10分の制限があるため、今回の処理に要した時間から計算するとこれ以上のEntity数では厳しそうです。
Datastoreからの読み出し(とJSON生成)処理がボトルネックであり小手先での高速化が難しく、根本的にはTaskQueueのワーカーサービスを別モジュールとして切り出し最大24時間まで処理できるBasic or Manual Scalingを使用する必要があると思います。
このままの仕組みで24時間まで処理が続行できるとすると、単純計算で 1,600Index/sec × 86,400sec = 138,240,000Entity までは何とかなります。
大陸間通信だったのでネットワークで何か問題が起きるのではと危惧していましたが、実際にやってみるとTaskQueueの滞留等は見られませんでした。
流石Google様の自前ネットワークといったところでしょうか?
Bulk APIに投入するJSONの書式はとてもキモくて独特で困惑しましたが、JSONのパース負担を削減するためというのが理由とのことでした。
Multidocument Patterns | Elasticsearch: The Definitive Guide [2.x] | Elastic
ソースコードサンプル
5つのDocumentを生成してBulk API用のJSONを生成するサンプルです
参考資料
Bulk API | Elasticsearch Reference [6.1] | Elastic
elasticsearchのBulk APIでは改行してはならない - sora_sakakiのブログ
【elasticsearch】idを指定せずに、bulk-insert - Qiita
Push Queues in Go | App Engine standard environment for Go | Google Cloud Platform