aeroastroの日記

Over the Sky, Into the Future.

ElasticsearchのIndexを本当の意味で無停止再構築する手法

※ この記事は Elastic stack (Elasticsearch) Advent Calendar 2017 の2日目の記事になります。

Elasticsearchを利用したサービスを運用している場合、既存機能の改修や仕様変更、新規機能の実装に伴い、運用中であってもindexや検索クエリを変更していくことは日々発生します。この際に、インデックスを再構築する必要性も出てきますが、インデックスを再構築し、そこに整合性のあるデータセットを準備するのには時間がかかります。その都度メンテナンスを行うのは非現実的であり、特に大規模サービスであれば準備にかかる時間自体も非現実的なものになりかねません。

このような問題への対処方法として、 Index Aliases を利用することで、透過的に無停止再構築を行うことが出来るというのは複数のブログ記事で紹介されています。これらはアプリケーションから直接indexを利用するのではなく、aliasを経由して利用することで、無停止再構築を行おうという手法です。

たしかに、shard数の変更であったり、 Analyzer や Tokenizer レベルの修正であれば、 Index Aliasを利用した無停止再構築は利便性の高い手法です。Index Alias が持つ透過性により、アプリケーションロジックを変更せずに、インデックスを再構築することが出来ます。

しかしながら、新規fieldの追加や既存fieldの変更/破棄を行い、検索クエリも変更するといったレベルでの開発を行う場合、Index Aliases を利用した無停止再構築は最善の手法ではなくなります。この記事においては、エンドユーザーにスムーズな価値提供を行うために、高い開発効率を維持しつつも、インデックスを無停止で再構築する手法について提案したいと思います。

TL;DR

  • 設計
    • RDBMSなどの他レイヤーでマスターとしてのデータ永続化層を実現し、Elasticsearchはデータを検索するための永続化層として責務を分担させる
    • アプリケーションでは、ModelやEntityに対して、Elasticsearchへのデータ保存・検索を担うRepositoryを定義する
    • Modelに対して m : n で Repository が定義でき、Repositoryは保存用・検索用で別のものが利用できるようにしておく (最低限の開発であれば 1 : n で良い)
    • Repositoryとindexは 1 : 1 の関係で利用できるようにしておく
  • インデックス再構築オペレーション
    • Elasticsearch上に新しいindexを構築し、新/旧2つのindexが利用可能な状態にする
    • 保存用のRepositoryを、新/旧2つのindexに対応する2つのRepositoryにする。この時点以降に操作されたデータは新indexにも保存され始める
    • データが充足されるまでは、検索用のRepositoryは旧indexのものを利用しつづける
    • バッチなどからBulk API を利用して、未操作のデータの同期を行う
    • この際、Operation Type を利用した楽観ロックを行うことにより、サービス経由で更新されるデータとの整合性を保ちつつ、未処理のデータについてのみ同期を行う
    • データ同期完了後、検索ロジックを変えたいタイミングで、検索用Repositoryを新index用のものに変更する
    • 任意のタイミングで旧indexへの保存を停止し、旧indexを破棄する

満たすべき無停止再構築の要件について

本記事で無停止再構築と言った場合に何を示しているかについて、以下に記述します。

  • 旧インデックスを利用するアプリケーションは、完全にアプリケーションが切り替わるまで、コードの修正なく正常に動作すること
  • 旧インデックスはインデックス再構築中も常に最新のデータで更新され続けること
  • 旧から新へのアプリケーションの切り替えはプロセス単位でアトミックに行われ、旧インデックスを参照している旧プロセスも正常に動作すること
  • 新インデックスはインデックス再構築中に更新されたデータも含めて最新の状態で投入されること
  • 新・旧それぞれのインデックス上に存在するデータは必要十分な量に限定され、移行に伴うオーバーヘッドやリスクが最小になること
  • 上記を満たした上で、開発・運用をより効率化すること

具体的な例について

抽象的な話ばかりでは分かりづらいということもあるため、以下のシチュエーションを簡単な例として取り上げます。

  • ユーザー user をアカウント名 account_name で検索するためにElasticsearchを利用していた
  • account_name の検索だけでは、非アクティブなユーザーもヒットしてしまうため、最終ログイン日 last_login が1ヶ月以内のユーザーに絞る機能が必要になった

例に限って言えば、一般的な無停止再構築の手法ではなく、別の解決策が適しているという点もありますが、この記事では一般的な解決策を紹介致します。 また、実装サンプルとして Ruby on Rails + elasticsearch-rails の組み合わせを取り上げたいと思います。

どうして Index Aliases が最善ではない場合があるのか?

Index Aliases が最善の選択肢ではなくなるのは、その透過性が無意味となる場合があるときです。 アプリケーションから参照しているエイリアス名は同一だが、Elasticsearchの内部的にはAliasが指しているIndexは異なるという点がIndex Aliasesのもともとの利点です。 mappingの外部構造にほとんど変化がなければ、aliasを通して保存されたドキュメントは登録されている全てのindexに保存され、検索クエリは登録されている全てのindexから行われます。

逆に言えば、新・旧のindexでfieldの型が大きく異なる場合や、新・旧のindexで存在しないfieldがある場合はこれらの利点は失われます。aliasを通したもう一方へのindexへの保存は、余分なデータの生成、誤ったmappingの自動作成、エラーなどにつながります。また、index と 検索クエリが一致しなければ、想定しない結果(つまりバグ)につながります。

ユーザー検索の例の場合、新・旧のindex中のmappings中のpropertiesは以下のようになります。

旧 index

{
  "user" : {
    "properties" : {
      "account_name" : {
        "type" : "keyword"
      }
    }
  }
}

新 index

{
  "user" : {
    "properties" : {
      "account_name" : {
        "type" : "keyword"
      },
      "last_login" : {
        "type" : "date"
      }
    }
  }
}

そして新たに検索クエリの filter に付け加える節は以下になります。

{ "range": { "last_login": { "gt": "2017/11/02" } } }

上記を見ていただくとわかるように、旧indexには last_login は存在しません。 その為、旧indexに last_login を保存しようとすれば、 dynamic の設定に依存しますが、性能上のオーバーヘッドやデータ不整合、エラーにつながります。 また、新 index に投げるはずのクエリが 旧index に投げられた場合は last_login がそもそも存在しないため、検索結果が0件になってしまいます。

一般的に、Aliasの切り替えを同時並列で実行されているアプリケーションでアトミックにハンドリングすることは難しく、Index Aliases を使った無停止再構築は破綻してしまいます。

設計

そこで、アプリケーションレイヤーで対象のindexを明示的に指定して扱う手法を本記事では提案します。

責務分担について

Elasticsearch を用いた検索を行う場合、システムとしてのデータ永続化は RDBMS などのElasticsearch外で行い、Elasticsearchは検索のための永続化層として責務を持たせます。これはElasticsearchを検索用のシステムとして最適化し、破壊的な変更や本来のデータ構造を示していないmappingであっても容易に作成可能にするためです。

Repositoryの設計について

責務分担を行った上で、以下のようなRepositoryの導入を行います。

f:id:aeroastro:20171202182707p:plain

Repositoryはアプリケーションで利用するModelもしくはEntityをElasticsearchの特定のindexに保存/検索するという責務を持ちます。このようにすることによって、旧indexと新indexに関わるロジックを疎結合に分け、それぞれを高い凝集度で実装することが可能になります。

また、Model と Repository を m : n の関係にすることによって、一つのModel をどちらの index から取得するか、どちらに保存するかという選択がアプリケーションレイヤーで簡単に操作できるようになります。

ユーザー検索の例に対して、Ruby on RailsElasticsearch::Persistence の Repository Pattern を利用したコード例を示します。 Elasticsearch::ModelRailsとの相性は良く充実した機能を持っていますが、無停止再構築を行う上では Elasticsearch::Persistence の方がおすすめです。

簡単の為に 1 : n の関係にしていますが、Repositoryクラスにindexに関する操作が集約されていることがわかります。

# Model class
class User < ActiveRecord::Base
  def self.repositories_to_save
    @repositories_to_save ||= [
      ::ElasticRepository::UserV1.new,
      # ::ElasticRepository::UserV2.new,
    ]
  end

  after_save do
    self.class.repositories_to_save.each do |repository|
      repository.save(self)
    end
  end
end

# Repository Class
module ElasticRepository
  # This unofficial inheritance usage increases flexibility especially when instance objects have their own customized settings
  class UserV1 < Elasticsearch::Persistence::Repository::Class
    def initialize
      client Elasticsearch::Client.new url: 'localhost:9200', log: true

      index 'user_v1'
      type 'user'

      settings number_of_shards: 1 do
        mapping do
          indexes :account_name, type: :keyword
        end
      end

      klass User
    end

    def deserialize(document)
      hash = document['_source']
      klass.new(account_name: hash['account_name']).tap(&:readonly!)
    end

    def serialize(document)
      {
        id: document.id,
        account_name: document.account_name,
      }
    end

    def search_by_account_name(account_name)
      search(
        query: {
          bool: {
            filter: [
              { term: { account_name: account_name } }
            ]
          }
        }
      )
    end
  end
end

# Save (Save Record to DB and Index Documents to Elasticsearch)
user = User.create!(account_name: 'foo')

# When searching with old logic and old index
ElasticRepository::UserV1.new.search_by_account_name('foo')
# When searching with new logic and new index
ElasticRepository::UserV2.new.search_by_account_name('foo', last_login: 1.month.ago)

このようにすることで、新/旧のロジックをアプリケーションレイヤーで適切に扱うことができるようになります。

インデックス再構築のオペレーション

上述したRepositoryを設計すれば、インデックスの再構築作業は容易かつアトミックに行えます。

新インデックスの作成

まずは新しいindexを作成し書き込みを可能な状態にします。Elasticsearchでのindex作成は非常に軽いオペレーションであるため、定義済みのものについては、deploy時などに自動的に反映されるようにする形が楽でしょう。

f:id:aeroastro:20171202220406p:plain

ユーザー検索の例ではCapistranoなどでのdeploy時に以下のコードを常に実行するようにしているだけでこのステップは完了です。

User.repositories_to_save.each(&:create_index!) # This internally checks if the index already exists or not.

新・旧両インデックスへの書き込み開始

つぎに、Modelクラスの保存時に利用されるRepositoryを新・旧の両方にします。

f:id:aeroastro:20171202205932p:plain

このアプリケーションコードがdeployされた後は、新indexについても最新のデータが保存されるようになります。

ユーザー検索の例では、 User.repositories_to_save の中にあるコメントアウトを外すだけで完了します。

全データの同期

上記アプリケーションコードがdeployされた後に操作されたデータに関しては、基本的には最新の情報が新indexにも保存されるようになります。 一方、アプリケーションでアクセスされていない休眠レコードについては、バッチ処理などでの同期が必要になります。

この際には Elasticsearch の Bulk API を利用して複数レコードを同時に保存することで高い効率を得ることが可能になります。

しかし、ここで一つの問題を解決する必要があります。それは通常のアプリケーション側での更新とバッチによる更新が同時に行われた際に、最新のデータを整合性のある状態で保存しなければならないということです。幸いなことにElasticsearchには Operation Type の設定が可能です。バッチからの更新には op_type=create を与えることによって、アプリケーション側の操作で新しくなったデータを古いデータで誤って上書きするということが避けられます。実際の運用においては、全てのレコードに対してこの処理を行っていくことで簡単に準備することが可能です。

また、Bulkによる更新速度が早すぎる場合にはrejectされる場合もあるので、適宜調整を行ってください。

検索用インデックスの切り替え

全データ同期用のバッチ処理が完了すれば、新インデックスにも完全なデータデータセットが揃っているはずです。 アプリケーション中での検索部分を新Repositoryを利用したものに置き換えれば、それがdeployされた瞬間に、新しいindex, 新しいlogicの組み合わせで検索が行われるようになります。

Index Aliases と異なり、旧アプリケーションは旧indexと旧logicの組み合わせで動き続けているので、deploy時に問題が発生することもありません。

f:id:aeroastro:20171202210739p:plain

ユーザー検索の例においては、上述したコードの最後の部分にある新ロジック用のコードに置換していただくだけでこの作業は完了です。

旧インデックスへの保存停止および削除

完全に旧アプリケーションが停止した後は旧インデックスへの保存停止、および、インデックスの削除を行います。

f:id:aeroastro:20171202220129p:plain

自動的に削除されるような仕組みにしておくのが運用コストを削減するために良い手法だと思われます。

ユーザー検索の例においては、User.repositories_to_save に該当しないものをdeploy時に削除してしまう程度のコードでも問題なく、より安全を期すなら、削除についてはしばらく放置しても良いでしょう。QAを行う前提であれば旧インデックスへの保存を停止するタイミングで、deploy時に削除してしまう方がバグの再現性が高いのでおすすめです。

以上の手順によって、ElasticsearchのIndexを本当の意味で無停止再構築することができました。

ユーザー検索の例で言えば、新indexへの保存を開始するタイミングと、新indexへ切り替えを行うタイミングでの都合2回のdeployが必要になりますが、ロジックとindexはそれぞれのRepository内部に閉じ込められ、開発効率をより高めていくことが可能だと思います。

まとめ

本記事では、Elasticsearchのインデックスを無停止再構築するための手法を紹介しました。 アプリケーションレイヤーやデプロイのフローまで絡めた手法にはなりますが、開発・運用のどちらの面からも適している手法だと考えています。

Elasticsearchはまだまだ勉強中なので、より適切な手法などがあれば是非教えてください。

ということで、 Elastic stack (Elasticsearch) Advent Calendar 2017 の2日目でした。

次は st_1tさん の 「Kibanaのハンズオン的なもの」になります!お楽しみに!

参考

www.elastic.co

github.com

github.com