体はドクペで出来ている

インフラ、Goの割合が多い技術ブログ

大量に登録されているDatastoreのEntityをElasticsearchに投入する

背景

とあるWebアプリがAppEngine (Golang)で稼働しており、Datastoreに大量の記事(約500,000件)が蓄積されている。
これはRSS Feedから取得した記事でOGPのタイトルや概要が含まれている。
記事のタイトルと概要に対し全文検索を行う要件が新たに発生したため、記事取得時にElasticsearch (ElasticCloud) へ同期する動作に改修した。

今後新規に取得する記事についてはこれで問題無いが、既存データは別途Elasticsearchへまとめて投入を行う必要があり、これを実装した際に得た知見を記事にする。
なお基本的に1度しか使わない機能なので少々雑な仕組み、コードであっても簡便迅速に作成することを重視して作成した。

また、ElasticCloudが現状では日本リージョンに対応していないためAppEngineは日本、Elasticsearchはオレゴンという構成になっている。
スペックは下記画像の通り(最小構成)

f:id:ryo-yamaoka:20180116010106p:plain

用語

  • Elasticsearch
    • 日本語にも対応しているOSS全文検索エンジン
    • Indexing
      • Elasticsearchへ記事を登録すること
    • Document
      • Elasticsearchに登録された記事(RDBのレコードに類似する概念)
  • Bulk API
    • ElasticsearchのDocumentを複数まとめて登録・削除するためのAPI
    • 大量のDocumentを処理する際にはこれを使うことが推奨されている
    • 独立したJSONが2行で1セットとなる独特の書式を使う
      • JSONを途中で改行してはならない
      • Delete処理のみは1行1Document

手法

基本的な動作

Datastoreの当該Kindに含まれるEntity全てを一定数毎にページングしながら取得し、それをBulk APIの仕様に沿ったJSONに成形して投入する。
AppEngineにバッチ機能を作成し所定のURLにGETリクエストを送ることをトリガーに動作を開始する。
開始されるとDatastoreから取得したEntityをIndexingするTaskQueueを順次起動し非同期にIndexingを行い、何らかの理由で失敗した場合は3回までリトライを行う。

考慮した点、発生した問題、対策

Entity数が大量であること

大量のIndexingを行う際はBulk APIを使うことが推奨されている。
しかし全Entityを1つにまとめるとデータが大き過ぎてAppEngineでOOMが発生したりElasticsearchが受け付けられない可能性があるため、最終的に128個のEntityを1つとしてIndexingを行った。

Datastoreからの取得に時間がかかる

単純にAppEngineで処理を行おうとすると1リクエスト30秒以内の制限に抵触して中断されてしまうことが予想されたため、IndexingするTaskQueueを起動するプロセスもTaskQueue化しある程度長時間稼働できるようにした。

IndexingするTaskQueueを発行するプロセスがクラッシュする

当初はもっと多量(256個や512個)のEntityをまとめてIndexingしようとしたが、TaskQueueを発行するプロセスがクラッシュしてしまう事象が発生した。
これを解析し対策を行うことも検討したが、ログに何も出力されておらず(HTTP500が発生した旨のみが記録されていた)解析に時間がかかりそうだったこと、128個で試したところ安定動作しパフォーマンスも十分であったため無視した。

結果

全Entity約500,000件を凡そ5分15秒(約1,600Index/sec)でエラー無く登録完了した。
TaskQueueの同時実行数は最大でも3,4個程度(目視確認)。
Indexing中にElasticsearchのパフォーマンスを確認したところ最大CPU使用率は20%程度、JVM Heapも50%前後で特に問題は発生しなかった。

00:32(赤い線の少し右側)に投入処理を開始している
f:id:ryo-yamaoka:20180116010137p:plain
f:id:ryo-yamaoka:20180116010152p:plain

所感

AutoscalingのTaskQueueは10分の制限があるため、今回の処理に要した時間から計算するとこれ以上のEntity数では厳しそうです。
Datastoreからの読み出し(とJSON生成)処理がボトルネックであり小手先での高速化が難しく、根本的にはTaskQueueのワーカーサービスを別モジュールとして切り出し最大24時間まで処理できるBasic or Manual Scalingを使用する必要があると思います。
このままの仕組みで24時間まで処理が続行できるとすると、単純計算で 1,600Index/sec × 86,400sec = 138,240,000Entity までは何とかなります。

大陸間通信だったのでネットワークで何か問題が起きるのではと危惧していましたが、実際にやってみるとTaskQueueの滞留等は見られませんでした。
流石Google様の自前ネットワークといったところでしょうか?

Bulk APIに投入するJSONの書式はとてもキモくて独特で困惑しましたが、JSONのパース負担を削減するためというのが理由とのことでした。

Multidocument Patterns | Elasticsearch: The Definitive Guide [2.x] | Elastic

ソースコードサンプル

5つのDocumentを生成してBulk API用のJSONを生成するサンプルです

The Go Playground

参考資料

Bulk API | Elasticsearch Reference [6.1] | Elastic

elasticsearchのBulk APIでは改行してはならない - sora_sakakiのブログ

【elasticsearch】idを指定せずに、bulk-insert - Qiita

Push Queues in Go  |  App Engine standard environment for Go  |  Google Cloud Platform