Enjoy Architecting

Twitter: @taisho6339

2023年の振り返り

今年も仕事納めをしたので一年間を振り返りたいと思います。

2023年の仕事を振り返って

今年も全社モニタリング基盤の時系列データベースの開発・運用に携わってきました。

その中でも今年は「出来るか出来ないか分からないけど頑張って乗り越えて結果を出さないといけない仕事」がとても多い年でした。

内製のIn-Memoryデータベースのメモリフラグメンテーション問題を解決したり、

既存の構成にObject Storageを活用した独自のmetrics保存・クエリ処理レイヤを実装して加えることで、年間運用費を1億円弱削るための取り組みをしたり、

自分史上過去一難易度が高い仕事を必死に頭を振り絞って取り組んできました。

特に後者はCNDT2023の登壇では話す時間が全然ありませんでしたが、 データ保存よりクエリ処理の方が100倍大変で、 キャッシュをチューニングして高速化したり、 転置インデックスのインデックスを作ってインデックスのデコードと検索を部分処理できるようにして負荷を下げたり、 自分とチームメイトの技術力の集大成を詰め込みました。 せっかくなのでどこかの場で発表させていただけないかと画策しています。

自分は就職前の大学生の頃から一貫して「プラットフォーム」を自ら実装したいと言っており、ようやく夢がかなってずっとやりたかった仕事が出来ている感覚があり、幸い評価もしていただけているので、かなりのプレッシャーを感じつつもメンタルを安定させてやって来られました。

多国籍チームで働くということ

日本人が一人しかいないチームでそろそろ2年ほど働いていることになります。

率直に言ってしまうと今のチームのカルチャーがすごく好きでとても働きやすいです。 「自分は自分のままでいい」という個人が肯定される風土が特に気に入っていて、 日本の組織でよく観測される「社会人ならxxでなくてはいけない」「エンジニアならxxができないといけない、yyを知っていないといけない」みたいな無意識の規範意識が強制されることがありません。

苦手なことを無理に頑張らなくていいし、無理に周りに合わせる必要がなく、個々の性質を最大限尊重されているように感じますし、自分も意識的に尊重するようにしています。 知らないことはお互いに気軽に教え合いますし、些細なことでもお礼を言い合っていて、X(Twitter)で悪口を書かれてたりとか、高圧的に接されたりとか、人格否定のようなこともありません。

レイオフが当たり前な文化圏から人が来ているからなのか、たまたまなのか分からないですが、この先いつか転職するとしても似たような風土の場所が良いなと思います。

英語

毎日英語学習を続けているお陰でだいぶ英語でストレスを受けることがなくなってきました。 今年は、TOEFLとかIELTSを受けるための登竜門と言われている英検準一級を取ってみました。 無事に受かったので、来年もなにかしら受けたいと思っています。

今の課題としては、とにかくリスニングをもっと伸ばしたいと思っていて、そのためにシャドーイングと多読を毎日やっています。 シャドーイングはシャドテンというサブスクで毎日添削してもらいながら行っていて、

多読はデータベースや分散システム関連の論文を気の向くままに読んでいます。 特に出てくる語彙や理論も仕事で馴染み深いので比較的スラスラ読めますし、知らない単語・熟語も覚えてすぐに使い道があるので楽しくやれています。

シャドーイングと多読を始めてからグッと英語力が伸びた感覚があるので、今後もずっと続けていく予定です。

来年の抱負

来年も引き続き英語・分散システム・データベースをテーマに好きなことで生きていこうと思います

2022年の振り返り

今年ももうすぐ終わるので去年までと同様、一年間を振り返りたいと思います。

時系列データベースの開発チームへ異動

今年の4月から、Private CloudのSREチームから時系列データベース開発チームへジョブチェンジしました。 いきなり日本人が自分一人だけのチームに移り、すべての業務が突然英語になったことで戸惑うことも多かったものの、 ようやくチームにも慣れ、なんとか業務を回せています。

直近では、数億件のデータをどう効率的に分散処理し、どうスケーラブルかつ運用コストをあまりかけずに保存するか、 といったテーマでアーキテクチャ改善のプロジェクトを主導させてもらえており、とても楽しく働くことができています。

異動についての公式インタビュー記事

英語について

今年一番苦労し、力を入れてきたのは英語でした。 チームにJOINしたばかりの頃はCEFRでいうとB1のレベルで、自分だけ議論の内容についていけない、突然話を振られてパニクってしまうなどたくさん辛い経験をしました。 オーバーラッピング、ボキャブラリ習得、発音練習、スピーキング練習をひたすら毎日2時間程度やり続けることで現在はCEFR B2レベルまで上がり、 なんとか業務上は問題にならない程度の英語力を身につけることができました。 とはいえまだなんの苦もなく自己表現できるには至っていないので、来年も引き続き練習していこうと思っています。

※単語帳はDistinction 2000、文法書はGrammer In Use一億人の英文法、発音は英語耳ELSA Speak、オーバラッピングはVoiceTubeに非常に助けられました

SREからSWEへのキャリアチェンジ

2021年の振り返りで、

そして今回のGrafana Lokiでの活動を通して、 自分もLokiのような分散アーキテクチャなソフトウェアを作る仕事をしてみたいという気持ちをとても強くしました。

ということを書いているのですが、この思いが諦めきれず思い切って異動してみました。 SREでは新しいソフトウェア、システムを作るというよりは、適切にツール、ソフトウェアを要件に合わせて選択し、運用に乗せるのが仕事で、 オリジナルな仕組み、ツールなどはオーバーエンジニアリングや負債になりやすいので、避けられるのであれば避けるというスタンスでした。 これはこれで面白い仕事だと感じつつも、潜在的にそういったものをしがらみなく自分自身で開発してみたいという気持ちがとても強かったというのが大きな理由です。

結果この異動は大成功で、自分はプロダクト側にいたほうが仕事のスコープや要件、関係者、ユーザのニーズをイメージしやすく、 自分からちゃんと導入まで持っていける具体的な提案もしやすいため、向いているのだなということに気がつくことができました。

また、大抵の場合、時系列データベースそのものを作るみたいな仕事はビジネス上妥当な理由がつけにくく、 無理やり作ったとしても負債になりがちですが、今の会社だと思いっきり気にせず打ち込めるのが嬉しいポイントです。

今年のアウトプット

現在担当しているプロダクトでも使われているRaftについて、論文を読んで自分なりに実装してみました。 https://github.com/taisho6339/rs-raft

今後のキャリアについて思うこと

引き続き自分が楽しめて、市場価値も高い分野という軸で開拓していきたいと思っています。 その軸で考えると今現在自分が関わっている分散システムの開発運用という分野は、 日本の会社だとあまりポジションはないですが、海外を見ればたくさんあるので引き続き英語と一緒に頑張っていこうと思っています。

一方で、オンプレの会社だし、世のクラウドサービスへの知見がどんどん風化していくことに不安を覚えることもあります。 ただ、今の仕事でしっかり気合を入れて取り組むことで、分散システム設計の根本の知識や経験が得られるので、 こういった基盤となるスキルセットさえあれば、後から必要に応じてキャッチアップして良いアウトプットができると信じています。 また、クラウドであれば詳しい人はどの会社にもいるだろうし、ソリューションアーキテクトのようなロールの方々もプロバイダ側にいたりするので、 気負う必要はないし、むしろ棲み分けているおかげで一緒に組んで良いシナジーが出せるのではと考えています。

来年の抱負

ということで、来年はもう少しアウトプットの比重をあげつつ、 英語、分散システム、データベース分野に引き続き力を入れてがんばります。

2021年の振り返り

今年は結婚、LINEへの転職を経て、自分にとって大きな転換期となる年でした。 今年も例年と同様に振り返り、来年の糧としたいと思います。

LINEでの仕事のスタート

今年1月からLINEに入社してPrivate Cloudを作る部署のSREとして仕事をはじめました。 背景についてはこちら。 https://taisho6339.hatenablog.com/entry/2020/12/31/191912

Private Cloud開発者用の横断Platformを開発、運用する、というミッションを担い、 専らログ基盤の構築に邁進してきた一年でした。 Managed Fluentdの提供や、Fluentdを管理するためのKubernetes Operatorなどを作ったり、 Grafana Lokiを使った20 TB / day規模の大規模ログモニタリング基盤を構築したり、 なかなかにアグレッシブな仕事ができました。

※具体的な内容については下記にて発表させていただきました。

Reliable Log Aggregation System in Multi-Tenant Kubernetes cluster

Grafana Lokiで構築する大規模ログモニタリング基盤

フリーランスから転身してみて

結論から言うとLINEに入社して良かったと思っています。 現在の僕のロールはシニアソフトウェアエンジニアなので、 部署を横断した価値提供や、よりハイレベルな要件下でのシステムデザイン、開発、運用が求められます。 そのため要件整理から各所への合意形成、そして開発リリースまで一貫して、裁量を与えられて仕事をさせていただきました。 技術的にも、ただ既存のミドルウェアやライブラリなどを使えば済むような要件はなく、 内部動作や場合によってはソースコードレベルで理解していないといけないようなレベルの仕事も多く、 ソフトスキル、技術スキル両面で大きく前進できた一年だと思っています。

最もOSS活動に勤しんだ一年

そして今年は、人生で最もOSSプロダクトに真剣に向き合った年になりました。

今回LINEの仕事でGrafana Lokiを使うに辺り、 かなり気合いを入れて、検証、内部動作理解に努めてきました。 結果、のめり込むほど面白さを感じ、 本体のコードに10個ほどContributeしたり、

https://github.com/grafana/loki/pulls?q=is%3Apr+author%3Ataisho6339+is%3Aclosed

いろんな国のユーザとSlackやIssue上で議論したりしていました。

クエリシャーディング、レプリケーション、キャッシュ、クラスタリング、Consistent Hashなど、 分散システムの基本的な仕組みを愚直に組み合わせることで、 S3のようなほどほどなレイテンシの安価なストレージ上で、 高速にリアルタイムなデータ検索を実現しているアーキテクチャにロマンを感じました。

Lokiのことが好きになりすぎてファンブックのようなものまで作ってしまいました。

Grafana Loki Deep Dive

今後のキャリアについて思うこと

もともと「コンピュータサイエンスをゴリゴリに意識しないと成果を出せないエンジニアリング」にとても強い関心があり、 そのためアプリケーションレイヤから徐々に下周りのレイヤに降りるようにキャリアを開拓してきました。 特に最近はそういったレイヤはうまくクラウドやライブラリ、フレームワークで抽象化されているため、 いっそそれらを作ることに関心を持ち始めました。 そして今回のGrafana Lokiでの活動を通して、 自分もLokiのような分散アーキテクチャなソフトウェアを作る仕事をしてみたいという気持ちをとても強くしました。

日本でいうとやはりポジションは少ないのですが、 英語さえクリアできればそういったポジションもたくさんあるのだということが実感として感じることができ、 もっと気合を入れて長い目で継続して英語をやっていくぞ、という気持ちになりました。

今年のアウトプットまとめ

登壇

OSS活動

Grafana LokiへのContribution

Grafana Loki Deep Dive

ブログ

https://taisho6339.hatenablog.com/archive/2021

来年に向けて

来年は今年以上に、英語、OSS活動、低レイヤの開発をテーマに邁進していこうと思います。

高負荷環境でFluentdを安定運用するための3つの観点

本記事について

Fluentdは機能としてはシンプルですが、 高負荷環境で安定的に運用するためにはある程度の知識が求められます。

そこで、本記事ではそれなりにログ流量の高い環境下で私が考慮した観点をまとめました。

本記事では、KubernetesでFluentdの信頼性を担保するための3つの観点に加え、

「高負荷時の安定運用」に焦点を当て、

  1. 負荷分散
  2. 適切なモニタリング
  3. トラブルシューティングとチューニング

の3つの観点について整理しています。

前提となるアーキテクチャ

アーキテクチャとしては実際に私が構築した図の構成を前提とします。

f:id:taisho6339:20210422151430p:plain

アーキテクチャの特徴

1つのKubernetesクラスタに、FluentdがForwarderとAggregatorという2つのロールで存在しています。

  • Forwarder

    • DaemonSetでデプロイされる
    • 各コンテナの出力ログをあつめ、Aggregatorに送信することだけが唯一の責務
    • Aggregatorの名前解決はServiceリソースで行う
  • Aggregator

    • StatefulSetでデプロイされ、バッファファイルやposfileの永続化のためにPersistentVolumeがアタッチされる
    • ForwarderからTCPでログを受け取る
    • filterを用いた加工処理、最終的なデータストアへのログ送信を担う

1. 負荷分散

負荷分散として考慮することは、

  • Aggregatorの水平スケール手段の確立
  • 複数のAggregatorに均等にログが送られること

です。 AggregatorはStatefulSetなので、HPAを使った自動スケールや手動操作によって簡単に水平スケールすることができます。 しかし、均等にログを送るためには踏み込んだ考慮が必要です。 Aggregatorの前段にL4ロードバランサーやKafkaなどのメッセージングキューを置くことでより精密な分散ができますが、 今回はあくまでもDNSによる名前解決の構成で考えます。

※近い将来にKafkaの導入を検討しています

DNSキャッシュについて考慮する

FluentdはデフォルトでDNSで名前解決したIPを永続的にキャッシュします。 よって、このままだと仮にAggregatorがスケールしたとしても新しく作成されたPodにルーティングされてくれません。 そこでDNSキャッシュの時間を短く or 無効にすることで水平スケールに対応できるようにします。

※その分DNS側に負荷がかかるので許容できない場合はLBやメッセージキューを検討することになります

expire_dns_cache

Client-side DNS round robin

DNSラウンドロビンでリクエストが偏る場合は、クライアントサイドの分散設定を考慮するのが有効です。 forwarder pluginには、dns_round_robin設定があり、 これを有効にすると、DNSから返却されたIPリストからランダムで一つピックアップして使用する、という挙動になります。

2. モニタリングで備える

Fluentdで負荷に備えるためには、モニタリングで見るべきポイントを明確にしておく必要があります。 そこで大前提として意識しなければならないのは、「ログの経路」と、各経路での「単位時間辺りのログが流れる量」です。

ログの経路とログの流量の大原則

ログはForwarder、Aggregatorのそれぞれのポイントで以下のような経路をたどっていきます。

  1. In Forwarder: 監視対象のログファイルから読み取り、加工処理(filter, match)をしてバッファに書き出し
  2. In Forwarder: バッファに溜まったログを、チャンク単位でFlush Queueにキューイング
  3. In Forwarder: Flush QueueからAggregatorへ送信
  4. In Aggregator: Chunk単位でログを受信し、加工処理(filter, match)をしてバッファに書き出し
  5. In Aggregator: バッファに溜まったログを、チャンク単位でFlush Queueにキューイング
  6. In Aggregator: Flush Queueから各送信先へ送信

つまり大原則として、Forwarder、Aggregator両サイドにおいて、

  • 単位時間あたりのログからバッファに書き込まれる量 < 単位時間あたりのバッファからFlush Queueuにキューイングされる量
  • 単位時間あたりのFlush Queueにキューイングされる量 < 単位時間あたりのFlush Queueから宛先に送られる量

である状態をなるべくの間維持しておかなくてはなりません。 そうでなければ、常に一定量のバッファがたまった状態になり一定時間遅れで集計され続けるか、最悪の場合バッファが溢れて容量限界を迎えてしまいます。

以上の点を踏まえて監視を行っていきます。

監視すべきメトリクス

まず他のコンテナと同様に、

  • CPU使用率
  • メモリ使用率
  • プロセスの死活監視
  • ファイルディスクリプタの使用率(※取得可能な場合)

などはモニタリングします。

それに加えてFluentdは、monitoring-prometheusに記載されている通り、Prometheus形式のメトリクスをexportすることができ、

記載されているmetricsのように、output時に発生したエラー数や、Slow Flushの数、Retryの数などもエラー検知に役に立ちます。

また、負荷計測の観点では下記のメトリクスをチェックしておくことで適切なログ流量設定の参考にすることができます。

ログの流量に関する重要メトリクス

メトリクス名 説明 目的
fluentd_output_status_buffer_total_bytes buffer stage + buffer queueを合計したログのデータ量 全体的なバッファのたまる速度、はける速度をモニタリング
fluentd_output_status_buffer_stage_byte_size buffer stage領域に溜まっているログのデータ量 バッファにたまる速度、Flush Queueにキューイングされる速度をモニタリング
fluentd_output_status_queue_byte_size buffer queue領域に溜まっているログのデータ量 Flush Queueにキューイングされる速度、実際にFlushされる速度をモニタリング
buffer_available_buffer_space_ratios bufferの有効な残りスペースの割合 バッファが足りそうかをモニタリング

私の推進するプロジェクトでは、上記メトリクスを使ってダッシュボードを作り常に可視化しています。

3. トラブルシューティングに対応する

最後のセクションではトラブルシューティングに役立つコマンド、ノウハウと、おまけとして私が実際に遭遇した障害ケースを紹介します。

前提1: FluentdのThreadモデル

Fluentdの過負荷をトラブルシュートするにあたって、Threadモデルを理解しておく必要があります。

Fluentdで抑えておくべきThreadの種類は以下の3つです。

  • event_thread

    • input pluginが作るevent_loop用のスレッド
    • リクエストを受けて加工し、バッファに書き込むまでが仕事
  • enqueue_thread

    • flushできるChunkをflush queueにenqueueするスレッド
  • flush_thread

    • キューイングされたChunkを実際の宛先に送信するスレッド

前提2: トラブルシューティングの流れ

基本的にFluentdが過負荷になったり、ログが送られなくなった場合、私は下記のような流れでトラブルシュートしています。

  • top -H コマンドで過負荷になっているFluentdのスレッドを特定する
  • netstat, lsofを使い、接続は確立しているかを確認する
  • tcpdumpを使い、データは実際に送られているかなどを確認する
  • straceを使ってstuckしているsyscallを確認する
  • SIGCONT signalを飛ばしてsigdumpを吐き出し、どこでstuckしているのかを確認する

Fluentdはsigdumpが組み込まれており、SIGCONT signalを飛ばすことで特定の場所にこのようなスレッドダンプのファイルを吐き出してくれます。 つまり、stuckしている場合はこのdumpファイルを吐き出すことで実際にRubyコードのどの部分でstuckしているのかを可視化することができます。

Case1: 1つのChunkサイズが大きすぎてAggregatorが過負荷になった

事象としては下記の状態になっていました。

  • 一定時間ごとにAggregatorのCPU使用率が100%近くで張り付く
  • Forwarderへtimeout時間までにackを返せずにログ送信が失敗するエラーがたくさん出ている
  • ずっとCPU使用率が張り付いているわけはなく、定期的にバラバラのaggregatorに負荷が集中して収まるのを繰り返している

よって、負荷が高まっているコンテナ内で

top -H

を実行したところ、前述した1つのevent_threadだけが高負荷なことが分かりました。 つまり、受け取るところ or 加工するところ or バッファに書き込むところのどこかがボトルネックになっていることになります。

また、

netstat

を実行したところForwarderと接続確立していたのは1つだけだったため、リクエストの数が高負荷になっているわけでもなさそうでした。 またバッファに書き込むところが詰まっているならIO待ちの割合が増えそうですがそんなこともなくCPUも高負荷なので、消去法で加工するところが重いと仮説を立てました。

そこでForwarderから送るChunkあたりのサイズを小さくしたところ、無事に解決しました。

参考: Buffering Parameters

Case2: Flush Threadが常時CPU使用率100%近くの過負荷になった

事象としては、

  • Aggregatorの送信先のElasticsearchクラスタに全くログが送られなくなった
  • AggregatorコンテナのCPU使用率が常時100%近くで張り付いている

ということが発生しました。 topコマンドの結果、flush threadが張り付いていることが分かりました。 また、netstatしてもESへの接続がなく、tcpdumpで確認しても一切データが送られていない状態であることも分かりました。 そこでflush threadに対してstraceすると、getrandomというsyscallを無限に呼び続けていることが確認できました。 つまり確実にどこかの処理でstuckしているので、SIGCONT signalを投げてスレッドダンプをとったところ、下記のようなダンプが取れました。

Thread #<Thread:0x0000562c82ebbf90@flush_thread_2 /srv/fluentd/lib/ruby/gems/2.7.0/gems/fluentd-1.12.3/lib/fluent/plugin_helper/thread.rb:70 run> status=run priority=0
      /srv/fluentd/lib/ruby/2.7.0/securerandom.rb:123:in `urandom'
      /srv/fluentd/lib/ruby/2.7.0/securerandom.rb:123:in `gen_random_urandom'
      /srv/fluentd/lib/ruby/2.7.0/securerandom.rb:73:in `bytes'
      /srv/fluentd/lib/ruby/2.7.0/resolv.rb:606:in `random_number'
      /srv/fluentd/lib/ruby/2.7.0/resolv.rb:606:in `random'
      /srv/fluentd/lib/ruby/2.7.0/resolv.rb:625:in `block in allocate_request_id'
      /srv/fluentd/lib/ruby/2.7.0/resolv.rb:622:in `synchronize'
      /srv/fluentd/lib/ruby/2.7.0/resolv.rb:622:in `allocate_request_id'
      /srv/fluentd/lib/ruby/2.7.0/resolv.rb:842:in `sender'
      /srv/fluentd/lib/ruby/2.7.0/resolv.rb:527:in `block in fetch_resource'
      /srv/fluentd/lib/ruby/2.7.0/resolv.rb:1124:in `block (3 levels) in resolv'
      /srv/fluentd/lib/ruby/2.7.0/resolv.rb:1122:in `each'
      /srv/fluentd/lib/ruby/2.7.0/resolv.rb:1122:in `block (2 levels) in resolv'
      /srv/fluentd/lib/ruby/2.7.0/resolv.rb:1121:in `each'
      /srv/fluentd/lib/ruby/2.7.0/resolv.rb:1121:in `block in resolv'
      /srv/fluentd/lib/ruby/2.7.0/resolv.rb:1119:in `each'
      /srv/fluentd/lib/ruby/2.7.0/resolv.rb:1119:in `resolv'
      /srv/fluentd/lib/ruby/2.7.0/resolv.rb:521:in `fetch_resource'
      /srv/fluentd/lib/ruby/2.7.0/resolv.rb:507:in `each_resource'
      /srv/fluentd/lib/ruby/2.7.0/resolv.rb:402:in `each_address'
      /srv/fluentd/lib/ruby/2.7.0/resolv.rb:116:in `block in each_address'
      /srv/fluentd/lib/ruby/2.7.0/resolv.rb:115:in `each'
      /srv/fluentd/lib/ruby/2.7.0/resolv.rb:115:in `each_address'
      /srv/fluentd/lib/ruby/2.7.0/resolv.rb:58:in `each_address'
      /srv/fluentd/lib/ruby/gems/2.7.0/gems/excon-0.81.0/lib/excon/socket.rb:110:in `connect'
      /srv/fluentd/lib/ruby/gems/2.7.0/gems/excon-0.81.0/lib/excon/socket.rb:49:in `initialize'
      /srv/fluentd/lib/ruby/gems/2.7.0/gems/excon-0.81.0/lib/excon/connection.rb:463:in `new'
      /srv/fluentd/lib/ruby/gems/2.7.0/gems/excon-0.81.0/lib/excon/connection.rb:463:in `socket'
      /srv/fluentd/lib/ruby/gems/2.7.0/gems/excon-0.81.0/lib/excon/connection.rb:118:in `request_call'
      /srv/fluentd/lib/ruby/gems/2.7.0/gems/excon-0.81.0/lib/excon/middlewares/mock.rb:57:in `request_call'
      /srv/fluentd/lib/ruby/gems/2.7.0/gems/excon-0.81.0/lib/excon/middlewares/instrumentor.rb:34:in `request_call'
      /srv/fluentd/lib/ruby/gems/2.7.0/gems/excon-0.81.0/lib/excon/middlewares/idempotent.rb:19:in `request_call'
      /srv/fluentd/lib/ruby/gems/2.7.0/gems/excon-0.81.0/lib/excon/middlewares/base.rb:22:in `request_call'
      /srv/fluentd/lib/ruby/gems/2.7.0/gems/excon-0.81.0/lib/excon/middlewares/base.rb:22:in `request_call'
      /srv/fluentd/lib/ruby/gems/2.7.0/gems/excon-0.81.0/lib/excon/connection.rb:273:in `request'
      /srv/fluentd/lib/ruby/gems/2.7.0/gems/faraday-excon-1.1.0/lib/faraday/adapter/excon.rb:31:in `block in call'
      /srv/fluentd/lib/ruby/gems/2.7.0/gems/faraday-1.4.1/lib/faraday/adapter.rb:55:in `connection'
      /srv/fluentd/lib/ruby/gems/2.7.0/gems/faraday-excon-1.1.0/lib/faraday/adapter/excon.rb:31:in `call'
      /srv/fluentd/lib/ruby/gems/2.7.0/gems/faraday-1.4.1/lib/faraday/rack_builder.rb:154:in `build_response'
      /srv/fluentd/lib/ruby/gems/2.7.0/gems/faraday-1.4.1/lib/faraday/connection.rb:492:in `run_request'
      /srv/fluentd/lib/ruby/gems/2.7.0/gems/elasticsearch-transport-7.12.0/lib/elasticsearch/transport/transport/http/faraday.rb:48:in `block in perform_request'
      /srv/fluentd/lib/ruby/gems/2.7.0/gems/elasticsearch-transport-7.12.0/lib/elasticsearch/transport/transport/base.rb:288:in `perform_request'
      /srv/fluentd/lib/ruby/gems/2.7.0/gems/elasticsearch-transport-7.12.0/lib/elasticsearch/transport/transport/http/faraday.rb:37:in `perform_request'
      /srv/fluentd/lib/ruby/gems/2.7.0/gems/elasticsearch-transport-7.12.0/lib/elasticsearch/transport/client.rb:191:in `perform_request'
      /srv/fluentd/lib/ruby/gems/2.7.0/gems/elasticsearch-api-7.12.0/lib/elasticsearch/api/actions/bulk.rb:69:in `bulk'
      /srv/fluentd/lib/ruby/gems/2.7.0/gems/fluent-plugin-elasticsearch-5.0.3/lib/fluent/plugin/out_elasticsearch.rb:1062:in `send_bulk'
      /srv/fluentd/lib/ruby/gems/2.7.0/gems/fluent-plugin-elasticsearch-5.0.3/lib/fluent/plugin/out_elasticsearch.rb:875:in `block in write'
      /srv/fluentd/lib/ruby/gems/2.7.0/gems/fluent-plugin-elasticsearch-5.0.3/lib/fluent/plugin/out_elasticsearch.rb:874:in `each'
      /srv/fluentd/lib/ruby/gems/2.7.0/gems/fluent-plugin-elasticsearch-5.0.3/lib/fluent/plugin/out_elasticsearch.rb:874:in `write'
      /srv/fluentd/lib/ruby/gems/2.7.0/gems/fluentd-1.12.3/lib/fluent/plugin/output.rb:1138:in `try_flush'
      /srv/fluentd/lib/ruby/gems/2.7.0/gems/fluentd-1.12.3/lib/fluent/plugin/output.rb:1450:in `flush_thread_run'
      /srv/fluentd/lib/ruby/gems/2.7.0/gems/fluentd-1.12.3/lib/fluent/plugin/output.rb:462:in `block (2 levels) in start'
      /srv/fluentd/lib/ruby/gems/2.7.0/gems/fluentd-1.12.3/lib/fluent/plugin_helper/thread.rb:78:in `block in thread_create'

見る限りfluentd側に問題がありそうなので、issueを掘ってみると全く同じ事象が報告されていたため、 一旦事象が再現しないバージョンに変えてデプロイすることで解決しました。

まとめ

Fluentdを安定運用するために考慮した観点と実際のトラブルケースなどについて紹介してきました。 今回はクライアント側の設定で負荷分散したりと、シンプルさを優先していますが、 将来的にはForwarderとAggregatorの間にKafkaを導入することで、より効率的かつ安全な負荷&障害対策ができることを期待しています。

最短で理解して運用するGrafana Loki

本記事について

Lokiについてまったく知識のない状態の人にとって、1からキャッチアップしていくのは とても大変なことです。

特にLokiはマイクロサービスで構成されているため、何を知るべきなのかの全体像が見えにくいと思っています。

そのため、Lokiをまったく知らない状態から実際に運用検証を開始するために必要なインプットを体系的にまとめました。

具体的には下記の項目で整理します。

  1. Lokiの機能
  2. Lokiを構成するアーキテクチャ
  3. Lokiを構成するプロセス
  4. Lokiのモニタリング
  5. Lokiでのログのリテンション管理
  6. Lokiのデプロイ
  7. Lokiでのデータキャッシュ
  8. Lokiのベストプラクティス

※前提として、Prometheusについての基本的な知識があれば本記事についてもすぐに理解できるかと思います。

1. Lokiの機能

Grafana Lokiとは?

Lokiは3大監視項目である、メトリクス、ログ、トレースのうち、ログを担当するモニタリングツールです。

メトリクス収集のPrometheus、時系列データベースのCortexのアーキテクチャを参考に作られた分散システムの構成になっています。

Grafana Lokiでは何ができるのか?

Lokiの機能としては主には下記のようなことができます。

  • Grafanaと連携してダッシュボードでのログの可視化や検索
  • ログベースでのアラートルールの設定
  • ログデータのマルチテナント管理

注意点としてはマルチテナント前提に設計されてはいるものの、Lokiそのものにテナント認証機能はありません。 よってLokiに保存したり検索をかける前に、テナント認証のプロキシをはさみ、テナント識別用のHTTPリクエストヘッダーを埋め込む、といったことが必要になります。

Authentication

Lokiの主な特徴

Lokiは下記の様な特徴を持ったツールです。

  • Prometheusと同様1つ1つのログデータがラベルを持つ

  • Cortexと同様、書き込み、読み込み、アラーティング、データ圧縮など複数の役割を持った分散システム構成になっている

  • Cortexと同様自分ではデータストレージを持たず、AWSGCPのObject StorageやBigtable、DynamoDB、他のOSSソフトウェアなどを使ってデータを管理する

役割ごとにプロセスを分割しているため柔軟にスケールでき、可用性や信頼性もクラウドプロバイダーなどに移譲できるというのが大きな特徴です。

2. Lokiのアーキテクチャ

Lokiのアーキテクチャは図のようになっています。 データには、転置インデックスである「Index」と実データである「Chunk」の2種類が存在し、そのデータを書き込んだり、読み込んだり、定期的にチェックしてアラートを飛ばしたり、圧縮したりリテンションを管理するプロセスが存在しているという構成になります。 また、データにはキャッシュ機構も存在しています。

Lokiが扱うデータ

Lokiは受け取ったログデータから、Chunk(実ログデータ)とIndex(検索用転置インデックス)を生成し、 設定、連携されたログデータストレージに保存します。 ただ、ログデータストレージにはなんでも指定できるわけではなく、サポートされているものが決まっています。 また、ChunkとIndexで指定できるストレージも異なります。

Loki Storage

以前はIndexに関してはObject Storageへの保存がサポートされていませんでしたが、現在はBoltDBというローカルDBに一旦保存し、BoltDB Shipperという仕組みを使うことで、自動的にObject Storageへ同期してくれるようになりました。

3. Lokiを構成するプロセス

Lokiの主要なプロセスとして下記が挙げられます。

  • Distributor
  • Ingester
  • Querier
  • Querier Frontend
  • Ruler
  • Compactor
  • Table Manager

このうち、Lokiを最低限動かすのに必要なプロセスはDistributor、Ingester、Querierです。 他のプロセスは、機能的に必要な場合やパフォーマンス改善で必要な場合に足していく形になります。

では実際に各プロセスについて役割を追っていきます。

Distributor

書き込みリクエストを最初にハンドリングするコンポーネントで、受け取ったログを適切なIngesterへつなぎます。 IngesterへのルーティングはConsistent Hashアルゴリズムを用いてルーティングします。 ハッシュの計算にはlogのラベルとtenant IDを用い、計算されたハッシュ値より大きくて一番近い値を持つIngesterへルーティングされます。

また、バリデーション、データ加工、Rate Limitの役割も担っており、 特にRate Limitは全体のRate LimitをDistributorの台数で割った値を一台あたりのRate Limitに設定します。 よってロードバランサーを置いて、均等にトラフィックを分散するのが有効です。

レプリケーション

データ保護、及びIngesterの入れ替わりに対応するため、 通常複数のIngesterに複製してログを送ります。(デフォルトでは3台に送る) また、データの一貫性を保つため、書き込み完了の判定にはquorumを用いています。 quorumは、

floor(replication_factor / 2) + 1

で計算され、例えば送信先が3台あったら2台に書き込みが成功しないと書き込み失敗になります。 また複製として使用するIngesterは、最初にハッシュ値がヒットしたIngesterから、Consistent HashのRing上を時計回りに順番にレプリカ数分ピックアップします。

バリデーション

バリデーションプロセスでは以下のような内容をチェックしており、バリデーションに失敗すると書き込みエラーを返します。

  • ログの時系列はあっているか?
  • ログは大きすぎないか?
  • Prometheus形式のラベルになっているか?

特に時系列の概念は重要で、最後に受け取ったログのtimestampより前のtimestampのログは受け取ることができません。

詳しくこちらに記載されています。

Logs must be in increasing time order per stream

データ加工

ラベルを元にハッシュ値を作るので、同じ構成のラベルは同じハッシュ値になるようラベルの並び順をソートして正規化しています。

たとえば、これらのラベルが同じハッシュ値になるように並び順を揃えます。

{job="syslog",env="dev"}
{env="dev",job="syslog"}

Rate Limit

Rate Limitでは受け取る書き込みリクエストを制限します。

リクエストの制限は1つのテナントごとに設定され、 1つのDistributorが制限するRate Limitは、そのテナントに対するLimitをDistributorの数で割ったものになります。

これを実現するためには、Distributorが全体で何台いるのかを各メンバーが知る必要があり、 そのためにクラスタリングしています。

クラスタ情報の保管、管理には、Consul, etcd, memberlist, inmemoryのオプションを選択できます。

memberlistはHashCorp製のクラスタ管理用ライブラリで、Consul等と同様にgossip-protocolを用いてクラスタメンバー間でクラスタ情報の更新を行っています。

memberlist

Ingester

ログを実際に保存する役割を担っています。

受け取ったログのラベルセット + テナントのIDを見て、対応するChunkにログを追加します。 また、もしChunkが存在しない場合は新規で作成します。

Chunkに仕分けられたログは一定時間メモリにバッファされ、一定のタイミングで永続化ストレージにflushされていきます。

この構成だと一定期間ログデータをメモリに置いた状態なので、この間にプロセスがダウンするとデータが揮発してしまいます。 よってWrite Ahead Logという仕組みを用いることで復旧できるようにしています。

WAL

Ingesterは書き込みリクエストを受け取ると、永続化領域にまずログを記録します。

flushしたものはflushしたことがわかるように更新されるため、もしプロセスダウンで復活したときでも、 どのログがflushされていないのかを判別することができ、復旧することができます。

Write Ahead Log

しかし注意点があります。

ログ書き込み用のディスクがfullになり、WALに書き込めない状態でもログの書き込みリクエストはエラーにならずに受信できてしまいます。

つまりWALに書き込まれずにログを処理する時間が発生する可能性があり、 この時間でプロセス停止などが意図せず起こればログデータを失う可能性があります。

ディスク使用量や、WALへの書き込み失敗数などをモニタリングし、検知できるようにしておくことが大切です。

クラスタリング

IngesterはDistributorからConsitent Hashによるルーティングがされるため、 Consistent HashのRingを保存しておく共通のデータストアなどが必要になります。

実際にクラスタリングに使えるのはDistributerで列挙したものと同様です。

実際の書き込みフロー

Distributorがリクエストを受け付けてから、Ingesterが処理するまでの流れはここに詳細に記載されています。

Write Path

Querier

LogQLの形式でクエリを受け取り、検索処理するコンポーネントになります。 具体的に、検索処理がどのようなフローで行われるのかは下記がわかりやすいです。

Read Path

Querier Frontend

Querierの前にProxyとして置くことができるOptionalなコンポーネントです。 主な役割として、パフォーマンス向上のために存在しています。

機能としては、検索結果のキャッシュ、クエリ処理のキューイング、大きなクエリの分割を提供します。

Querier Frontend

巨大なクエリを実行したときに困るのはOOMです。 よってQuerier Frontendのレイヤで小さなクエリに分割し、キューイングして別々のQuerierに分散させることで、一つのQuerierに大きな負荷がかかることを回避することができます。 Querierから返ってきた結果はこのレイヤで統合されて、最終的なレスポンスを返します。

Ruler

Querierに定期的にクエリ発行して、AlertManagerにアラートを飛ばすことのできるコンポーネントです。

クエリそのものはLogQLという形式ですが、Prometheusと同じようなフォーマットでルールを記載することができます。

Alerting

metricsの出力機能がなかったり、exporterが提供されていないようなプロダクトに対してもログベースで気軽にアラート設定を行うことができます。

Compactor

Compactorについてはあまり情報がありませんでしたが、一定周期でIndexデータを最適に圧縮してくれるコンポーネントのようです。

compactor_config

Table Manager

Lokiはストレージのバックエンドとして、DynamoDBやBigTableなどの、テーブルベースのDBをサポートしています。 Table ManagerはそういったテーブルベースのDBに対して、スキーマの変更、バージョン管理や、データのリテンション管理を行うことができます。

Table Manager

注意点として、S3などのObject Storageを使う場合は、Table Managerのスコープ外なので、S3側でLifecycle設定を通してリテンション管理を行うなどが別途必要になります。

Lokiのプロセス実行モードについて

Lokiはすべてのプロセスを一つのバイナリでまとめて実行するモード(モノリシックモード)と、マルチプロセスに構成するモード(マイクロサービスモード)があります。

モノリシックモードは、簡単にプロセスを立ち上げてすぐに機能を検証することが可能ですが、柔軟なスケールができません。

すべて一緒にスケールしないといけないので、例えば読み込み用のプロセスだけスケールするなどできず、リソース上の無駄が発生したりします。

そのため、モノリシックモードは検証やスモールスタート用に用いるのが推奨されており、ある程度の規模の本番環境ではマイクロサービスモードで運用するのがStandardのようです。

StatelessなプロセスとStatefulなプロセス

LokiにおいてStatefulなプロセスは、IngesterとQuerierです。 IngesterはWALやChunkを一定期間ローカルにバッファする性質上、Statefulなのは明白ですが、Querierに関してはboltdb-shipperを利用してIndexを保存している場合にStatefulになるようです。 ※この辺りの理由は調査中

Upgrade Guide

4. Lokiのモニタリング

Lokiの各プロセスはPrometheus形式のmetricsを出力します。 よって、汎用的なプロセスの死活監視などを行いつつ、Lokiのmetricsを見て詳細な機能に関するモニタリングを行うことになります。

前述したWALに関するメトリクスは、Observabilityに記載されてはいませんでしたが、実装には記述されていました。

5. Lokiでのログのリテンション管理

前述した通り、TableベースのDBをバックエンドにする場合は、TableManagerを使ってリテンション管理を設定するのが良いです。

そうでなければストレージそのものに備わっている機能を使って管理するか、自前で仕組みを作る必要があります。

6. Lokiのデプロイ

幸いHelmチャートが用意されているので、基本的にはこれを使うと良いです。

マイクロサービスモード

モノリシックモード

7. Lokiのデータキャッシュ

Lokiではアーキテクチャ図にもある通り、Ingester、Querier、Querier Frontend、Rulerのレイヤでそれぞれでキャッシュを保持します。 例えば、Ingesterは前述したとおり、データを一時的にメモリに置き、期限が切れたタイミングでWriteBackします。 また、Querier Frontendは検索結果のキャッシュを保持しています。 このキャッシュのバックエンドには、Redis、memcache、in-memoryを指定でき、 キャッシュの影響でパフォーマンスが大きく変わるため、チューニングの余地があります。

8. Lokiのベストプラクティス

ラベルのカーディナリティに配慮する

Lokiのログデータも、Prometheusと同様にラベルをつけることで検索に役立てることができますが、 同様にカーディナリティに配慮する必要があります。 カーディナリティとは「何種類の値を取りうるか」での数値で、これがあまりにも膨大、もしくは予測できない場合、チャンクのサイズも膨大になってしまい、ストレージ容量の消費とロードにかかる時間がボトルネックになってしまいます。 Lokiでは、経験則的に、1桁 ~ 10台の値に押さえておくように推奨されています。 これ以上にカーディナリティの高い、もしくは予測ができない無制限のパラメータはラベルではなく、文字列一致や、正規表現のパターンマッチなどを使うことが推奨されています。

まとめ

ここまでの内容を踏まえることで、Lokiは実際に自分たちの環境、要件にフィットするのかを検証することができるようになったはずです。

まだまだ情報不足であり、運用が難しいプロダクトだとは思いますが、本記事がお役に立てれば幸いです。

追記

こちらのkubenews 第20回でDemoを交えつつ、紹介させていただきました www.youtube.com

また、動画内のデモはこちらのmanifestを使うことで再現することができます github.com

Fluentdのバッファリングで抑えておくべき大事なポイント

概要

Fluentdで障害設計をする上でバッファリングの概念は切っても切り離せません。

本記事では、ドキュメントだけでは拾いきれないものも踏まえ、

Fluentdのバッファリングで抑えておくべき情報を体系的にまとめます。

バッファリングとは?

Fluentdではログをバッファリングしてまとめて送信するための仕組みが用意されています。

これは下記のような用途に用いることができます。

  • 送信先がダウンしていたときに一時的に保管しておく
  • 送信先のキャパシティに合わせて送信流量を制限する

Fluentdにはメモリ上、もしくは永続化ディスク上にバッファを保管しておく仕組みが用意されています。

バッファの構造

バッファの構造は下記のようになっています。

引用: https://docs.fluentd.org/buffer

Output Pluginごとに一つバッファ領域を持っており「stage」と「queue」という2つのフェーズを持ちます。

まずログが取り込まれるときは、「stage」へと書き込まれ、 これが成功することでInput/Filter Pluginの処理は完了とみなされます。

※例えば、forward pluginのrequire_ack_responseを用いる場合、Aggregator側で、受け取ったログがstageに書き込まれたタイミングでACKを返します。

そして特定のタイミングでqueueにenqueueされ、順番にPluginが扱う送信先に送信されていきます。

stageの構造

stageに書き込まれる時、ログはchunkという単位でグルーピングされてまとめられます。 このchunkは、chunk keyという指定されたkeyをもとに、同じ値を持つグループでまとめられます。 chunk keyには、時間、タグ、特定のレコードを指定することができます。

stageからenqueueする処理

Fluentdのプロセスは、1つ専用のスレッドを作り、 そのスレッドで、stageにいるchunkをすべて走査し、 条件にあてはまるchunkをenqueueしていくという処理をintervalごとにループして繰り返し実行しています。

この辺りの処理

flush modeについて

stageからqueueに追加されるときの動作には、4つのモードがあります。

これらはflush_modeと呼ばれ、bufferのconfigで指定可能です。

flushと呼ばれていますが、この文脈ではstageからqueueへの追加されるまでの処理を「flush」としています。

lazy mode

時刻と期間をベースにchunkを分けてflushしていくmodeです。

timekeyパラメータに指定されている期間と時刻をもとにchunkを分割します。

例えばtimekeyに1hを指定した場合、 12:00 ~ 12:59の間のログ, 13:00 ~ 13:59の間のログのようにグループ分けされ、1hごとにenqueueされます。

interval mode

flush intervalごとにenqueueしていくmodeです。

lazyと違って時刻の考慮はなく、シンプルに一定時間ごとにenqueueします。

flush_invervalという名前なのでわかりにくいですが、ログの送信先に送信されるまでのintervalではなく、あくまでもenqueueされるまでのintervalです。

immediate mode

chunkに書き込まれた瞬間にenqueueされるモードになります。

default mode

defaultでは、timekeyが指定されていればlazy、そうでなければintervalで実行されます。

chunkがenqueueされるタイミング

chunkがenqueueされるタイミングは2つあります。

  1. flush intervalもしくはtimekeyの時間ごとのタイミング
  2. chunk sizeが指定したサイズ以上になったタイミング

buffering parametersのように、chunkはsize limitやrecord数を指定でき、その制限を超えたタイミングでenqueueされます。

Buffer Overflowした時の挙動

buffering parametersに書かれているように、bufferの領域にはサイズ上限が設けられています。

これ以上のサイズのデータがたまるとBuffer Overflow Errorを起こします。

これが発生した時、どういう挙動になるかはInput Pluginのハンドリング次第になります。

よく使われるtail pluginの場合は、Buffer Overflowが収まるまで何度も同じ行の読み込みを繰り返し(1秒ごと、もしくはファイルの変更ごと)、成功するまではpos fileの更新を止めます。

この辺りの処理

設定次第ですが、ログがrotateされる前にバッファが復旧される必要があります。

あまりにもBuffer Overflowが発生する場合は、スループットが追いついていないのでスケールアップするなり、バッファサイズの見直しなりが必要になります。

ちなみにBuffer Overflow ErrorはRecoverableなErrorなため、secondaryなどをバックアップ先として指定していてもこのケースでは使用されません。

queueのサイズが制限に達した時の挙動

enqueueされたchunkの数が、queued_chunks_limit_sizeで指定された数に達している場合、chunkの数がはけるまでenqueueされません。

しかしデータが失われるわけではなく、前述したループ処理で、単にenqueueが見送られるだけになります。

この辺りの処理

ただflush_at_shutdownをtrueにしている場合、shutdown時のenqueue処理ではqueued_chunks_limit_sizeは無視されます。

ログの送信が失敗したときの挙動

ここに書かれている通り、成功するまで、もしくはリトライの制限に引っかかるまでリトライを行います。

リトライにはバックオフがあり、retry_max_intervalに達するまでリトライの間隔が延び続けます。

回数制限をつけたり、無限にリトライするように指定することができます。

また、この間にもstageにはどんどん書き込みが入るのでバッファの使用量には注意が必要です。

※prometheus形式のmetrics exportを有効にしている場合、使用可能な残りのバッファサイズを監視できます

まとめ

バッファを使うにあたって、知っておくべき概念と、細かい挙動についてまとめました。

コードを読まないと分からない点もふくめてまとめたので、KubernetesでFluentdの信頼性を担保するための3つの観点と合わせて、実際にFluentdをProduction運用にする際に役立てられれば幸いです。

KubernetesでFluentdの信頼性を担保するための3つの観点

概要

GKEなどを使えば自動的に標準出力のログが集計&集約され、Cloud Loggingなどを通して可視化されますが、 オンプレミス環境でKubernetesクラスタを構築する場合そうはいきません。 また単純なアプリケーションログの集計以外にも、 Kubernetesを使ってログ、データ集計をしている人はFluentdを運用しなくてはならない人は多いと思います。 本記事では、ログの集計、集約のデファクトスタンダードであるFluentdをKubernetes上に展開する上で、 信頼性を担保するための観点を整理します。

想定アーキテクチャ

想定アーキテクチャとしては現場でよく構築されている、図のような構成を用います。

f:id:taisho6339:20210422151430p:plain

アーキテクチャの特徴

クラスタに、FluentdがForwarderとAggregatorという2つのロールでそれぞれ存在しています。

  • Forwarder

    • DaemonSetでデプロイされる
    • 各コンテナの出力ログをあつめ、Aggregatorに送信することだけが唯一の責務
  • Aggregator

    • Deployment(もしくはStatefulSet)でデプロイされる
    • ForwarderからTCPでログを受け取る
    • filterを用いた加工処理や、最終的なデータストアへのログ送信を担う
  • ForwarderはAggregatorへServiceリソース経由でアクセスする

担保すべき信頼性

今回は「どこかに障害が発生したとしてもできる限りログを損失しないこと」を目標とし、 Forwarder, Aggregatorそれぞれで下記3つの観点をチェックしていきます。

  1. Podのクラッシュへの対応
  2. Podの退避への対応
  3. ログの宛先のダウンへの対応

※またログの損失を予防する代わりに、重複するログは多少許容することとします。(at-least-once)

1.Podのクラッシュへの対応

Forwarderの場合

f:id:taisho6339:20210422151446p:plain

Forwarderの役割は実際のログファイルから少しずつログを読み取りAggregatorへ送信することです。 また、Fluentdはパフォーマンス向上や、ログの送信先がダウンしていても問題ないようにバッファリング機構を持っています。 この前提から、FowarderのPodがクラッシュするときに想定したい注意点として以下の2つがあげられます。

(1) クラッシュからの復帰後、以前読んだところから読み取りを再開できるようにする

「ログの読み取り済みの位置を記録する」ことで対応します。 ログをファイルから読み取るとき、通常tail pluginを用いますが、 tail pluginにはpos fileという機能があります。

pos file

これを使うことで、読み取ってからバッファ済みになったログファイルの位置を記録しておくことができます。

(2) バッファされた未送信のログの損失を防ぐ

こちらは「未送信のバッファを永続化しておく」ことが必要です。 バッファリングにはメモリバッファとファイルバッファがありますが、 ファイルバッファを使っておくとクラッシュ時に損失を防ぐことができます。

file type buffer

Aggregatorの場合

f:id:taisho6339:20210422151532p:plain

Aggregatorの役割は、ForwarderからTCPで受け取ったログを加工、フィルタ処理を行い、最終的なデータストアへ送信することです。 受け取った時点で、加工処理が走り、バッファに書き込みがされてからForwarderへACKが返ります。 なので、Aggregatorとしては受け取ったログのbufferをPodがクラッシュしたとしても保持し続けることが大事です。

これは、Forwarderで用いたfile bufferを指定しておけばOKです。

2. Podの退避への対応

Podの退避、移動や削除は以下のような様々なタイミングで訪れます。

  • NodeのShutdown(Scale Inなど)、Replace、メンテナンス
  • DaemonSet, DeploymentのUpdate

つまり、Podの削除とノード間移動を前提に考えなくてはいけません。

Forwarderの場合

f:id:taisho6339:20210422170640p:plain

ForwarderはDaemonSetなので、 DaemonSetのUpdate発生時、Nodeが特にShutdownしないようなケースであれば、 hostPathに前述のpos fileとfile bufferをおいていればPodがいかに入れ替わろうと復旧可能です。 ※ただしemptyDirはPodが退避されると一緒に削除されるのでNG

しかし、NodeもShutdownするケースは、Nodeのディスクにデータを残しておけば済む話ではないため注意が必要です。 よって、以下の2点を考慮する必要があります。

(1) Nodeがshutdownしてしまうので、プロセス終了時にバッファをflushする

flush_at_shutdownというパラメータがbufferingの設定の中にあるので、これを有効にしておけばOKです。

flust_at_shutdown

また、ちゃんとflushするのに必要な時間を確保するため、terminationGracePeriodSecondsは十分に取っておきます。

(2) プロセス終了時のflushのタイミングでAggregatorがダウンしているケースを考える

このケースは完全にログの送信を担保することはできません。 プロセスshutdown時にflushが失敗した場合、 secondaryを設定していればsecondaryにバックアップしてくれるような実装になっていれば回避できますが、 現状コードを読む限りそうはなっていません。

この辺りの実装

つまりどうしても損失させられないようなログ(監査ログなど)は、 はじめからsecondaryではなくcopyプラグインを活用して、複数箇所に同時に保存しておく、 といった対応が必要になります。

Aggregatorの場合

f:id:taisho6339:20210422151544p:plain

DaemonSetと違うのは、すべてのNodeに1台だけ存在する構成にはなっていないので、 Node配置のAffinity設定や、一気にPodがダウンしないように気を使ってあげる必要があります。 またDaemonSetと違い、TCPでリクエストを受ける構成なので、 プロセスshutdown時にSIGTERMとbuffer flushのタイミングを考えなくてはいけません。 それを踏まえ、下記の2点を検討します。

(1) Pod削除時にbufferをflushする

Forwarderのときと同じく、flush_at_shutdownを行います。 AggregatorはForwarderと違って、TCPで通信を受け付けるので、 新規のリクエストを止めてからflushするようにする必要があります。 具体的にはPodのpreStopでsleepさせて、 しっかりServiceへのリクエストを止めてからSIGTERM => flush処理に移るようにするのが安全です。

また、flush時に宛先がdownしているとForwarderと同じようにデータが損失してしまうため、 Volumeをアタッチしておき、バッファの保存先をそこに指定おけば復旧可能になります。

(2) 一気に複数のPodが同時にUnavailableにならないようにする

特定NodeにPodが集中しないよう、Affinity設定を入れます。

Pod Affinity

また、PodDisruptionBudgetを活用し、UnavailableなPodの数を制限するようにします。

PDB

3. ログの宛先のダウンへの対応

Forwarderの場合

f:id:taisho6339:20210422151501p:plain

宛先がダウンしている場合でもログが失われないようにするために、前述したfile bufferを用います。 この際、2つ注意点があります。

(1) Aggregatorがバッファに書き込み完了したことを保証する

AggregatorがTCPでログを受け取っても、バッファにちゃんと書き込まれているとは限りません。 よって、Aggregatorがデータを受け取った直後、バッファに書き込まれる前にプロセスが終了してしまった場合データが失われます。

これに対応するため、 forwardのpluginには、Aggregatorがbufferに書き込み完了しACKを返すまで、送信が完了したとみなさないようにするパラメータがあります。

require_ack_response

このパラメータを有効にすることで、Aggregator到達時点まではat-least-onceを保証することができます。

(2) Aggregatorへの送信失敗に備える

Aggregatorへの送信が失敗に備えるには、バッファの永続化、リトライ、secondaryを意識する必要があります。 そして、送信失敗したとしても復旧可能にしておく方法として2つ手段があります。

  1. リトライを無限にし、バッファをHostのディスク領域に永続化しておく

  2. リトライ回数に制限を設けておき、制限に達したらsecondaryの送信先に送信する

バッファサイズは、ディスク容量、オープンできる最大ファイルディスクリプタ数、時間辺りのログ流量 * 障害許容時間などを加味して決定します。 またsecondaryは、リトライが制限に達するか、回復不可能なエラーが発生した場合に送信先として使われることになります。

Aggregatorの場合

f:id:taisho6339:20210422151557p:plain

Aggregatorの場合は、Forwarderと違い、require_ack_responseなどのパラメータは使えません。 到達保証は宛先データストアと宛先へのoutput pluginの実装次第になります。

Datastoreへの送信失敗に備える

こちらもForwaderからAggregatorへの送信失敗ケースと同様の観点で考えます。

  1. リトライを無限にし、PersistentVolumeをAttachしてそこでバッファのファイルを永続化する

  2. リトライ回数に制限を設けておき、制限に達したらsecondaryの送信先に送信する

Aggregatorの場合は、DaemonSetではないので必ず同じノードで起動してくれる保証はありません。 そこでDeploymentではなくStatefulSetでデプロイしておき、VolumeをAttachすることでバッファを永続化しておくことができます。

まとめ

まとめるとこれまで、述べた3つの観点に対し、下記のように対応すればOKです。

  • Forwarder

    • input
      • tail ではpos fileを設定する
    • output
      • file bufferを使う
      • require_ack_responseを設定する
      • hostPathにfile bufferを永続化しておく
      • flush_at_shutdownを設定する
    • pos fileやbufferingにはemptyDirを使わない
    • どうしても失いたくないログはcopy pluginで二重に保存しておくと安心
  • Aggregator

    • affinityで分散しておく
    • PDBで一気にunavailableにならないよう気をつける
    • リクエストを止めてからflushするようSIGTERMのタイミングを調整する
    • outputでは、
      • file bufferを使う
      • VolumeをAttachしてそこにfile bufferを永続化しておく
      • flush_at_shutdownを設定する
    • bufferingにはemptyDirを使わない
    • どうしても失いたくないログはcopy pluginで二重に保存しておくと安心