FluentdとRiakの話

Fluentdは、Ruby製のログコレクタだ。コードは公開されている。
様々なログを構造化して一元管理することができ、収集と解析へのハードルを大きく下げてくれる。
インストールもプラグイン開発も簡単。日本語の資料も多い。
その資料も様々あるが、プラグインを見るならこれが最良だと思う。必要な情報がよくまとまっており、必読といえる。


データの確実な転送を実現するバッファ機能については、池田大輔さんのブログが詳しい。

さて、Fluentdはデータを収集してくれるが、保存はしてくれない。
永続化にはデータベースが必要だ。
そこで、Riak。
f:id:saisa6153:20130817155009p:plain
Basho社がスポンサードするErlang製分散型KVS。これもOSSだが、契約によって商用サービスが受けられる。
これがまたエッジ立ちまくってて紹介しきれないのだが、最大の特徴は、マスターが無いためシングルポイントになり得ないということだと思う。
例えばMySQLに代表されるRDBの場合マスター・スレーブ構成をとることが多いと思うが、その場合マスターが停止した場合はDBにアクセスするシステム全体が停止してしまう。
それを防ぐためどうにかしてマスターを冗長化するわけだが、シングルポイントである事実は変わらない。
しかしRiakは違う。全ノードが対等であり、どれかのノードが停止しても他のノードがカバーする。すぐにシステムが停止する可能性は低い(もちろんゼロではない)。
深夜にHW障害アラートを受けたシステム管理者は、翌朝出社してからインフラチームに対応を依頼すれば良い。
KVSとはKey-Value Storeの略称だが、KeyがStringであればValueはなんでも良い。
文字列でも、写真や動画といったバイナリでも受け入れられる。
もちろん、構造化されたドキュメントもOK。
f:id:saisa6153:20130817160937p:plain
RiakにはRiak Searchという機能があり、有効にしておくとデータのコミット時にValueを読んで、自動的にインデックスを作成してくれる。
そうするとクエリが利用可能になる。(ただこれはまだあまり試していない。)
書籍"7つのデータベース7つの世界"によると、基本的な文字列検索を中心として、ワイルドカード複数条件のand/or, グループ化、範囲検索が利用可能なようである。
複雑な計算などは簡単には実現できないようなので、データを集計処理するアプリケーションを書いても良いかもしれない。
分散型という特徴を活かしてのMapReduceも簡単に実装できる。
KVSといったらmemcachedやTokyo Cabinet/Kyoto Cabinetの印象が強いと思うが、それよりはRedisが近い。
ストレージエンジンを有し、ディスクへの書き込みが可能だからだ。
しかし、Redisとは異なりレプリケーション設定の必要はない。
まだユーザは多くないかもしれないが、明確な特徴(利点)を持つおもしろくて有用なデータベースである。

そんなRiakにも欠点がある。
まず、RDBのような厳密なトランザクションは原理的に実装が難しい。
もちろん限りなくそれに近いことはできるが、課金情報のような整合性第一の分野では採用するべきではないと思う。
また、Riak Searchで実行可能なクエリの表現力は、RDBやMongoDBなどに比べると劣るという印象。
基本的にKVSなので仕方ないのかもしれないが、データ処理が重要であれば、アプリケーションを書いてカバーする必要がある。
なお、コミット前の値の修正は、Riakの標準機能で実現可能だ(プレコミットフック)。
そして、限界まで性能を引き出したいならErlangを読み書きする必要があるということ。
基本的な機能やMapReduceはJSや各言語のクライアントライブラリを通して記述可能だが、速度や安定性を最大まで求めるならやはりErlangで書いたほうが良いらしい。

このように、Riakは可用性に特化している。
また、スケーラビリティも非常に高く、スケールアウト(スケールアップではない)が容易らしい。
どう容易なのかというと、

  1. レプリケーション設定無しで、数回のコマンドで新規ノードを稼働中クラスタへ投入可能
  2. データの規模に対して負荷がリニアに増大していくので、必要なリソースを予測しやすい

ということらしい。

で、Fluentdを思い出すと、日々増大するログの格納先にはRiakって良い選択肢なのではないかと思った。
その根拠は以下のとおり。

  • 多くの場合、ログデータ自体の不具合によってユーザからのクレームが発生することは少ない(完全な整合性が求められない)
  • ログデータはサービスのスケールに伴って増大する
  • 上記の理由より、分散処理が求められる

つまり、データひとつあたりの価値が高くなく、運用の簡便さが必要であるため、Riakが適任なのではないか、と考えた。

イメージはこんな感じ。
f:id:saisa6153:20130817170907p:plain
これをMongoDBで実現しようとすると多分こうなる。
f:id:saisa6153:20130817170922p:plain
Riak簡単!

ということで、FluentdやRiakをいじってみる。
Fluentd(td-agent)のインストールは簡単なので調べてもらうとして、その設定を行おう。
あ、その前にRiakのインストール。

[root@saisa-pc ~]# yum install http://yum.basho.com/gpg/basho-release-6-1.noarch.rpm
[root@saisa-pc ~]# yum install riak
[root@saisa-pc ~]# service riak start
Starting riak: !!!!
!!!! WARNING: ulimit -n is 1024; 4096 is the recommended minimum.
!!!!
                                                           [  OK  ]

Riak公式ドキュメントにも書いてあるけど、ファイルディスクリプタ数の制限を解除して起動。

[root@saisa-pc ~]# ulimit -n 4096
[root@saisa-pc ~]# service riak restart
Shutting down riak: ok
                                                           [  OK  ]
Starting riak:                                             [  OK  ]

※fluent-plugin-riakが必要
[root@saisa-pc ~]# /usr/lib64/fluent/ruby/bin/fluent-gem install fluent-plugin-riak
[root@saisa-pc ~]# service td-agent restart
Shutting down td-agent:                                    [  OK  ]
Starting td-agent:                                         [  OK  ]

このfluent-plugin-riakだが、内部でriakのruby clientを使用しているため、REST APIとは異なるポートをListenしていることに注意。
これに気づかなくてかなりハマった。

同一マシン内でApacheアクセスログをRiakに流す場合は、Fluentdの設定は以下のようになる。

sourceディレクティブ
      type tail
      format apache
      path /var/log/httpd/access_log
      tag riak.apache
    
matchディレクティブ
      type riak
      buffer_type memory
      flush_interval 10s
      retry_limit 5
      retry_wait 1s
      buffer_chunk_limit 256m
      buffer_queue_limit 8096
      nodes 127.0.0.1:8087

なぜか<>が表示されないので適当な表記にしてしまった...
access_logをfluentdが読めるように権限を設定しておく必要がある。

riakとfluentdが起動していれば、これで連携は開始されるはずだ。
試しにcurlして、access_logがriakに格納されるかを見てみよう。

なお、今回はRiakのRESTインタフェースに対してcurlすることでHTTPで値の出し入れを行う。
Apache連携(80番へのcurl)との区別をしっかり。

※死活確認
[root@saisa-pc ~]# curl -v http://127.0.0.1:8098/ping
 * About to connect() to 127.0.0.1 port 8098 (#0)
 *   Trying 127.0.0.1... connected
 * Connected to 127.0.0.1 (127.0.0.1) port 8098 (#0)
 > GET /ping HTTP/1.1
 > User-Agent: curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.13.1.0 zlib/1.2.3 libidn/1.18 libssh2/1.2.2
 > Host: 127.0.0.1:8098
 > Accept: */*
 >
 < HTTP/1.1 200 OK
 < Server: MochiWeb/1.1 WebMachine/1.10.0 (never breaks eye contact)
 < Date: Thu, 25 Jul 2013 05:55:00 GMT
 < Content-Type: text/html
 < Content-Length: 2
 <
 * Connection #0 to host 127.0.0.1 left intact
 * Closing connection #0
OK
 
[root@saisa-pc ~]# service httpd status でapacheの起動を確認

サンプルのログを入れるためにcurl
[root@saisa-pc ~]# curl http://127.0.0.1

キーを確認してみる
[root@saisa-pc ~]# curl http://localhost:8098/buckets/fluentlog/keys?keys=true
{"keys":["2013-07-25-f22c8f31-ec5f-4cd9-b76d-960d7d7a00db"]}

確認したキーで値を見てみる
[root@saisa-pc ~]# curl http://localhost:8098/buckets/fluentlog/keys/2013-07-25-f22c8f31-ec5f-4cd9-b76d-960d7d7a00db
[{"host":"127.0.0.1","user":"-","method":"GET","path":"/","code":"403","size":"5039","referer":"-","agent":"curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.13.1.0 zlib/1.2.3 libidn/1.18 libssh2/1.2.2","time":"2013-07-25T05:57:07Z","tag":"riak.apache"}]

これで、fluentdとriakの連携を確認できた。
以上は、この2つのブログと公式ドキュメントを参考にした。

さて、問題の集計クエリである。
Apache連携なしでコミットして試してみる。今回はRiak Searchを有効にしていない(デフォルトで無効)ので、コミット時に2i(セカンダリ・インデックス)を指定して、そのインデックスをもとに集計をかける。
まずはriakの設定ファイル(app.config)の該当箇所を変更して、ストレージエンジンをeLevelDBに変更する。
riakにもストレージエンジンが複数用意されているが、2iを使うにはデフォルトのBitcaskでなくeLevelDBでなければならない。

80行目あたり
%% Riak KV config
  {riak_kv, [
             %% Storage_backend specifies the Erlang module defining the storage
             %% mechanism that will be used on this node.
             {storage_backend, riak_kv_eleveldb_backend},

設定を変更したら、保存してriakを再起動。
これで準備完了だ。

2iの設定には、x-riak-indexを用いる。
※なお現状、fluent-plugin-riakではインデックスが固定ではられるので、予めRiak Searchを有効にするか、Pull Requestを出す必要がある。

➜  ~  curl -v -XPUT \
    -d '{"time":"2013-07-29T00:11:22"}' \
    -H "x-riak-index-type_bin: date" \
    -H "x-riak-index-date_int: 20130729" \
    http://localhost:8098/riak/fluentlog/20130729testtest1
    
➜  ~  curl -v -XPUT \
    -d '{"time":"2013-07-30T11:22:33"}' \
    -H "x-riak-index-type_bin: date" \
    -H "x-riak-index-date_int: 20130730" \
    http://localhost:8098/riak/fluentlog/20130730testtest4

こんなかんじで値を変えてたくさんデータ作ったら、下みたいにRiakにcurlしてデータが帰ってくるかを確かめる
    
➜  ~  curl http://localhost:8098/riak/fluentlog/20130729testtest1
{"time":"2013-07-29T00:11:22"}
                                                                                      
➜  ~  curl http://localhost:8098/riak/fluentlog/20130729testtest2
{"time":"2013-07-29T00:22:33"}
                                                                                        
➜  ~  curl http://localhost:8098/riak/fluentlog/20130729testtest3
{"time":"2013-07-29T00:33:44"}
                                                                                        
➜  ~  curl http://localhost:8098/riak/fluentlog/20130729testtest4
{"time":"2013-07-30T00:11:22"}
                                                                                    
➜  ~  curl http://localhost:8098/riak/fluentlog/20130729testtest5
{"time":"2013-07-30T00:22:33"}

これでテストデータができた。
簡単な操作で集計処理を試してみたいが、以下の注意点がある。

  • keyはuniqでなければならない(fluent-plugin-riakで付けられるkey名も乱数のようなもの付けて準拠している)
  • x-riak-index(ヘッダプレフィックス)にはhoge_binとfuga_intをつける。binにはインデックス種別(date)、intには値(日付-無し)を指定 末尾の_*(suffics)は必要みたい
  • -d’{}’内にはapacheログが幾つかjson形式で入るが、この内部はriak_searchでなければ立ち入れない
➜  ~  curl -XPOST http://localhost:8098/mapred \
       -H 'Content-Type: application/json' \
       -d '{"inputs":{
                "bucket":"fluentlog",
                "index":"date_int",
                "key":"20130730"
            },
            "query":[{"reduce":{"language":"erlang",
                                "module":"riak_kv_mapreduce",
                                "function":"reduce_count_inputs", 
                                "arg":{"do_prereduce":true}
                               }
                    }]
            }'

[3] ←出力される件数

これは、RiakのRESTインタフェースを通して組み込みの集計クエリを呼び出している。


実際にはfluentd-fluentd間の通信だったり他に考慮することがいくつかあるけど、今回はfluentdとriakがメインだから割愛する。
これであとはRiak Searchで、Valueに格納したJSONに対してクエリをかけることができれば、いよいよRiakをFluentdの出力先として選定する材料が揃うのではないだろうか。

ちなみにfluentdの話だが、一般的なApacheのログフォーマットから外れるカスタムログを送信する場合、正規表現でフォーマットをApache側のfluentd.confに記述する必要がある。
以下のブログを参考にすると、fluentdの動作が意図したものかを確認して正規表現が記述できるのでオススメだ。

Riakは、新しい。
まだ前例は多くない。
しかし、だからこそ、挑む価値と楽しさがあるのではないか?
検討すべき課題はまだあるけど、できれば実際に使ってみたいと思う。