#prestodb のクエリ履歴を保持する
TL;DR
- prestoのクエリログを全て保持したい
- 簡易に行うならAPI経由で取得できる
- きちんと環境を準備するなら
event-listener
を使う方法がよく、自分のケースではpresto-fluentd
を使用してfluentdを経由しデータストアに送信している
背景
prestoはFacebookが開発した、Hadoop上で動作する分散SQLエンジンである。 Hiveと比較すると、オンメモリで動作するのでCPU効率が良く高速に処理されるので、アドホックな環境に向いている。 このprestoを運用する上で欠かせないであろう実行クエリの履歴管理に関して、きちんとまとまっているエントリは少ないと感じたので書く。
クエリログの取得
方法1: 簡易(API経由)
手っ取り早く情報を取得するには、 http://<master node ip>/v1/query
APIが使用できる。
[ { "memoryPool": "general", "query": "SELECT ...", "queryId": "20171117_163751_00480_7vfi8", "queryStats": { "blockedReasons": [], "completedDrivers": 83, "createTime": "2017-11-17T16:37:51.152Z", "cumulativeMemory": 355882963.0, "elapsedTime": "1.79s", "endTime": "2017-11-17T16:37:52.943Z", "executionTime": "1.79s", "fullyBlocked": true, "peakMemoryReservation": "0B", "progressPercentage": 100.0, "queuedDrivers": 0, "runningDrivers": 0, "totalCpuTime": "86.63ms", "totalDrivers": 83, "totalMemoryReservation": "0B" }, "scheduled": true, "self": "http://172.xx.xx.xx:8889/v1/query/20171117_163751_00480_7vfi8", "session": { "catalogProperties": {}, "clientTransactionSupport": false, "locale": "en_US", "preparedStatements": {}, "queryId": "20171117_163751_00480_7vfi8", "remoteUserAddress": "172.xx.xx.xx", "startTime": 1510936671152, "systemProperties": {}, "timeZoneKey": 0, "transactionId": "26a22b5a-0c8d-4413-9b07-13ab4412e822", "user": "hadoop", "userAgent": "presto-ruby/0.5.2" }, "state": "FINISHED" }, { // } ]
上記のようなフォーマットで実行されたクエリ最新N件の情報が取得できる。 このAPIを定期的に叩いてパースしてデータストアに突っ込むだけでよいので、容易に実装ができる。 ただし 最新N件取得する という点が問題で、例えば1分起きにAPIを叩くスクリプトを実行したとして、深夜帯など一切クエリが更新されない環境の場合APIを叩くたびに同じクエリIDの情報を格納することになってしまうので、大量の重複データがデータストアに溜まることになる。 このため、重複を許容するか、何らかの方法で重複させない実装を挟む必要がある。 即解決可能な策としてはデータストアにRDBを用いてクエリIDをuniqueにする方法が考えられるが、ここではfluentdを使った方法を紹介する。
拙作ではあるが fluentd-plugin-comparison-filter
というfluentd filter gemがある。
このgemは、 column_key
で指定したカラムの値で、次に処理するレコードが最新のものか判断し、最新のレコードだけ出力する プラグインだ。
細かい値や制御を無視して要点だけ実装すると、このようになる。
require 'json' require 'yaml' require 'erb' require 'presto/metrics' stage = ENV["STAGE"] || "development" config = YAML.load(ERB.new(File.read(File.expand_path("../init_config.yml", __FILE__))).result(binding))[stage] client = Presto::Metrics::Client.new(host: config["host"], port: config["port"]) metrics = JSON.parse(client.get('/v1/query')) metrics.select{|m| m["state"] == "FINISHED" || m["state"] == "FAILED"}.sort_by{|m| m["queryStats"]["endTime"] }.select do |m| query_id = m["queryId"] start_at = DateTime.parse(m["queryStats"]['createTime']).to_s end_at = DateTime.parse(m["queryStats"]['endTime']).to_s query = m["query"].gsub(/(\r\n|\r|\n)/, '\\\\n') status = m["state"] puts JSON.dump({query_id: query_id, query: query, start_at: start_at, end_at: end_at, status: status)) end
- in_execを用いてデータを受け取るfluentdの設定の例
<source> @type exec tag presto.query_stats command ruby /opt/presto/fluentd_put_query_stats.rb run_interval 1m <parse> @type json </parse> </source> <filter presto.query_stats> @type comparison <comparison> column_key end_at column_key_type time time_type string time_format %Y-%m-%dT%H:%M:%S %z </comparison> </filter> <match presto.query_stats> @type copy <store> @type relabel @label @bigquery-out </store> </match> # 後続の処理
APIを1度叩いて得たreponseに関して実行された順にfluentdで処理を行い、fluentdではクエリの実行完了時間を用いて「レコードが最新かどうか」判断することで、再びAPIを叩いて全く同じレスポンスだったとしても前回のレスポンスにより最新のレコードまで取り込んであるので、取り込み済のレコードはフィルタリングされるという挙動になる。 これでクエリの重複は回避することができた。
しかし、実はこれでもまだ問題がある。 APIを叩く頻度を1分毎であるとする。また、1回のAPIで得られるクエリ件数は40件とする。 この状況で、例えば高負荷により分間40回を超えるクエリが実行された場合、今度はクエリの取り逃しが発生する。
取り逃しを発生させないために頻度を上げて例えば10秒に1回実行するなどの選択肢はあるが、そもそも不要な処理を行い負荷をかけるのは良い判断とはいえない。
方法2: event-listener経由
方法1では、クエリ情報取得の重複・漏れの可能性があったが、実はprestoには event-listener
と呼ばれる機構があり、クエリ完了時に任意のプログラムを走らせることが可能である。
これを用いてクエリ完了時に各メトリクスをデータストアに保持するようにすれば、無駄なく漏れなくクエリログの取得が可能である。
観測範囲では2つのプラグインがあり、 一つはクエリ結果をファイルに書き出すpluginpresto-audit、もう一つはfluentdに送信するpresto-fluentdだ。 個人的にはfluentdはretry処理や複数のデータストアへの送信を柔軟に行える点で便利だと考え後者を採用した。
fluentdが非常に容易に各メトリクスを処理できるので、
- メトリクスはdatadogに
- クエリログはbigqueryに
それぞれ送信し、日々の監視/運用に役立てている。
event-listener.properties
event-listener.name=presto-fluentd event-listener.fluentd-port=24224 event-listener.fluentd-tag=presto.query event-listener.fluentd-host=<送信対象のfluentd host>"
fluentd.conf
<match presto.query> @type copy <store> @type relabel @label @presto-query-stats </store> <store> @type relabel @label @presto-query-storage </store> </match> <label @presto-query-stats> <match **> @type copy # elapsed_time <store> @label @presto-datadog-out @type record_reformer enable_ruby true renew_record true tag presto.query_stats.elapsed_time <record> type gauge metric presto-observer-<%= ENV["STAGE"] %>.query_stats.elapsed_time value ${(record["endTime"] - record["createTime"]) / 1000.0} tag ${record["user"]} </record> </store> # 略: その他各メトリクス </match> </label> <label @presto-query-storage> <match **> @label @presto-bigquery-out @type record_reformer renew_record true tag presto.query_storage.big_query <record> query_id ${record["queryId"]} user_name ${record["user"]} elapsed_time ${(record["endTime"] - record["createTime"]) / 1000.0} start_at ${Time.at(record["executionStartTime"]/1000).utc.strftime("%Y-%m-%d %H:%M:%S.%3N")} end_at ${Time.at(record["endTime"]/1000).utc.strftime("%Y-%m-%d %H:%M:%S")} query ${record["query"]} status ${record["state"]} </record> </match> </label> <label @presto-datadog-out> <match **> @type dd dd_api_key <%= ENV['DATADOG_API_KEY'] %> <buffer> # 略 </buffer> </match> </label> <label @presto-bigquery-out> <match presto.query_storage.**> @id bigquery-partitioned @type bigquery method load auth_method json_key json_key /fluentd/env/googleauth_key.json <buffer> # 略 </buffer> </match> </label>
結論
event-listenerを用いて、重複・漏れなく効率的にクエリの履歴を残すことができる。 fluentdを使うことで複数のデータストアに結果を送信することができるので、柔軟に監視環境を構築することができる。