Изображение


Содержание


1. Введение: Зачем нужен Fluentd и почему именно он
2. Архитектура Fluentd: как работает коллектор логов
3. Установка Fluentd: все способы для Linux, Docker и Kubernetes
4. Базовая конфигурация: файл fluent.conf от А до Я
5. Input-плагины: источники логов и их настройка
6. Filter-плагины: парсинг, трансформация и обогащение данных
7. Output-плагины: куда отправлять логи
8. Буферизация и надёжность: как не потерять логи
9. Fluentd в Kubernetes: Daemonset, Sidecar и Fluent Bit
10. Интеграция с EFK стеком: Elasticsearch, Fluentd, Kibana
11. Продвинутые техники: маршрутизация, агрегация и мультитенантность
12. Мониторинг и отладка Fluentd в production
13. Производительность и оптимизация: настройка под нагрузку
14. Безопасность: TLS, аутентификация и разграничение доступа
15. Часто задаваемые вопросы (FAQ)
16. Заключение: Fluentd в 2026 году — итоги и перспективы



Введение: Зачем нужен Fluentd и почему именно он


Представьте ситуацию: ваш сервис в Kubernetes упал в три часа ночи. Дежурный инженер заходит в систему, чтобы понять причину, — и видит разрозненные логи из десятков подов, записанные в разных форматах, часть которых уже потерялась при перезапуске контейнеров. Поиск причины занимает часы вместо минут. Это классическая проблема, с которой сталкивается каждая команда, не выстроившая нормальный pipeline сбора логов.

Fluentd — это open-source коллектор данных, который решает именно эту задачу. Он разработан компанией Treasure Data и передан в управление Cloud Native Computing Foundation (CNCF), что означает его статус зрелого, поддерживаемого проекта уровня enterprise. Fluentd объединяет логи из множества источников, преобразует их в единый формат и направляет в нужные системы хранения и анализа.

Почему Fluentd стал стандартом де-факто для контейнерных сред? Во-первых, он изначально проектировался с учётом требований облачных архитектур: поддержка Docker, нативная интеграция с Kubernetes, возможность работы в режиме DaemonSet. Во-вторых, экосистема плагинов насчитывает более 500 расширений, которые позволяют подключиться к любому мыслимому источнику данных и направить логи в любую систему хранения. В-третьих, Fluentd написан на Ruby с горячими путями на C, что обеспечивает достаточную производительность при разумном потреблении ресурсов.

Конкуренты у Fluentd есть — это Logstash из стека ELK, Filebeat, Vector и более легковесный Fluent Bit (который тоже создан командой Fluentd). Каждый из них занимает свою нишу. Logstash мощнее в части обработки данных, но значительно тяжелее. Filebeat легче, но беднее функционально. Fluent Bit оптимален для встраивания в контейнер или на IoT-устройство. Fluentd же находится на золотой середине: достаточно мощный для enterprise-задач и достаточно лёгкий для запуска на каждом узле кластера.

В этом руководстве мы пройдём весь путь: от понимания архитектуры до настройки production-ready pipeline в Kubernetes. Мы рассмотрим реальные конфигурации, разберём типичные ошибки и дадим конкретные рекомендации по оптимизации. Материал подойдёт как инженерам, которые только начинают работать с централизованным логированием, так и опытным DevOps-специалистам, которые хотят систематизировать свои знания о Fluentd.

Руководство актуально для Fluentd версии 1.16.x, которая является актуальной стабильной веткой в 2026 году. Все примеры конфигураций проверены на практике и готовы к использованию с минимальными изменениями.



Архитектура Fluentd: как работает коллектор логов


Прежде чем переходить к установке и настройке, необходимо понять, как Fluentd устроен внутри. Без понимания архитектуры сложно правильно спроектировать pipeline и диагностировать проблемы в production.

#### Базовая модель: Input → Filter → Output

Вся работа Fluentd строится на трёх ключевых концепциях: источники данных (Input), обработчики (Filter) и назначения (Output). Каждый входящий лог-запись проходит через этот конвейер последовательно.

Input-плагины отвечают за получение данных. Они могут читать файлы с диска, слушать сетевые соединения, подписываться на очереди сообщений или опрашивать API. Примеры: tail (чтение файла), forward (получение данных по сети от другого Fluentd или Fluent Bit), http (HTTP endpoint), syslog.

Filter-плагины обрабатывают данные в потоке без их сохранения. Они могут парсить текст в структурированные поля, добавлять метаданные, фильтровать нежелательные записи, маскировать персональные данные. Примеры: parser, record_transformer, grep, geoip.

Output-плагины отвечают за отправку обработанных данных в целевые системы. Примеры: elasticsearch, s3, kafka, file, stdout (для отладки).

#### Теги и маршрутизация

Каждая запись в Fluentd имеет тег — строку, которая определяет её маршрут через систему. Тег — это не просто метка, это механизм маршрутизации. На основе тегов Fluentd решает, какие фильтры применить к записи и в какой output её направить.

Теги имеют иерархическую структуру, разделённую точками: app.backend.error, kubernetes.pod.nginx. Это позволяет использовать шаблоны в директивах match: match app. подхватит все теги, начинающиеся с app.

Пример базовой схемы маршрутизации:

text
<source>
@type tail
tag app.access
path /var/log/nginx/access.log
</source>

<filter app.>
@type parser
key_name log
<parse>
@type nginx
</parse>
</filter>

<match app.>
@type elasticsearch
host elasticsearch
port 9200
</match>


В этом примере все записи с тегом, начинающимся на app., проходят через парсер nginx и затем отправляются в Elasticsearch.

#### Событие (Event) как единица данных

Единица данных в Fluentd называется событием (event). Каждое событие состоит из трёх компонентов:

- tag — строка маршрутизации
- time — временная метка в формате Unix timestamp
- record — ключ-значение пары с данными (хэш в терминологии Ruby)

Например, одна строка из лога Nginx превращается в событие вида:

tag
: app.access
time: 1704067200
record: {
"remote": "192.168.1.1",
"method": "GET",
"path": "/api/users",
"status": 200,
"size": 1234
}


Именно работа с событиями, а не с сырыми строками, делает Fluentd мощным инструментом обработки данных.

#### Многопоточная модель и жизненный цикл

Fluentd использует многопоточную модель обработки. Каждый input-плагин работает в своём потоке, что позволяет одновременно читать из множества источников без блокировки. Buffer-система обеспечивает асинхронную передачу данных в output, что позволяет переживать временные недоступности целевых систем без потери данных.

Жизненный цикл записи в Fluentd:
1. Input-плагин получает данные и создаёт событие с тегом
2. Событие передаётся в движок маршрутизации
3. Применяются фильтры, соответствующие тегу
4. Событие попадает в буфер output-плагина
5. Output-плагин сбрасывает буфер в целевую систему

#### Отличие Fluentd от Fluent Bit

Fluent Bit — это более лёгкий форвардер, написанный на C, который часто используется в связке с Fluentd. Типичная архитектура: Fluent Bit работает как агент на каждом узле (или в каждом поде), собирает логи и пересылает их в централизованный агрегатор Fluentd, который выполняет тяжёлую обработку и отправляет данные в хранилища.

Fluent Bit потребляет примерно 450 KB RAM в минимальной конфигурации против 40+ MB у Fluentd. Это делает Fluent Bit идеальным для edge-узлов и контейнеров-sidecar. Но количество плагинов у Fluent Bit значительно меньше, и сложная обработка данных там затруднена.

#### Worker-процессы и параллелизм

Начиная с версии 1.0, Fluentd поддерживает режим multi-process workers. Это означает, что можно запустить несколько worker-процессов, каждый из которых обрабатывает свой набор input/output. Это особенно важно для высоконагруженных систем, где один процесс Ruby не успевает обрабатывать поток событий из-за Global Interpreter Lock (GIL).

text
<system>
workers 4
</system>


При использовании нескольких workers необходимо убедиться, что плагины безопасны для работы в многопроцессном режиме, и правильно распределить порты для input-плагинов, слушающих сетевые соединения.



Установка Fluentd: все способы для Linux, Docker и Kubernetes


Существует несколько способов установки Fluentd в зависимости от вашей среды. Рассмотрим каждый из них подробно, с реальными командами и пояснениями типичных проблем.

#### Установка через пакетный менеджер (td-agent / fluent-package)

Исторически Fluentd распространялся в виде пакета td-agent, который включал собственную сборку Ruby, изолированную от системной. Это решало проблему конфликта зависимостей, но создавало путаницу в терминологии. В 2023 году td-agent был переименован в fluent-package, и именно это название используется в 2026 году.

Установка на Ubuntu/Debian:

bash
<h2 id="dobavlenie-repozitoriya">Добавление репозитория</h2>
curl -fsSL https://toolbelt.treasuredata.com/sh/install-ubuntu-jammy-fluent-package5-lts.sh | sh

<h2 id="zapusk-i-dobavlenie-v-avtozagruzku">Запуск и добавление в автозагрузку</h2>
sudo systemctl enable --now fluentd

<h2 id="proverka-statusa">Проверка статуса</h2>
sudo systemctl status fluentd


Установка на CentOS/RHEL/Rocky Linux:

bash
<h2 id="dobavlenie-repozitoriya">Добавление репозитория</h2>
curl -fsSL https://toolbelt.treasuredata.com/sh/install-redhat-fluent-package5-lts.sh | sh

<h2 id="zapusk">Запуск</h2>
sudo systemctl enable --now fluentd


После установки через пакетный менеджер:
- Конфигурационный файл располагается по пути: `/etc/fluent/fluentd.conf`
- Логи самого Fluentd: `/var/log/fluent/fluentd.log`
- Исполняемый файл: `/usr/sbin/fluentd`
- Директория для плагинов: `/etc/fluent/plugin/`

Важно: fluent-package устанавливает собственный Ruby (обычно 3.2.x) в `/opt/fluent/`. Не смешивайте его с системным Ruby — это источник множества проблем с зависимостями плагинов.

#### Установка через RubyGems

Если у вас уже есть Ruby 3.1+ и вы хотите минимальную установку:

bash
<h2 id="ustanovka-fluentd">Установка Fluentd</h2>
gem install fluentd

<h2 id="sozdanie-bazovoy-konfiguratsii">Создание базовой конфигурации</h2>
fluentd --setup ./fluent

<h2 id="zapusk">Запуск</h2>
fluentd -c ./fluent/fluent.conf -vv


Этот способ подходит для разработки и тестирования. Для production рекомендуется использовать пакетную установку или Docker, так как она более предсказуема и воспроизводима.

#### Запуск Fluentd в Docker

Docker-образы Fluentd публикуются на Docker Hub в репозитории fluent/fluentd. Рекомендуется использовать образы с суффиксом -debian вместо -alpine, поскольку часть нативных расширений плагинов требует glibc.

bash
<h2 id="zapusk-bazovogo-obraza">Запуск базового образа</h2>
docker run -d \
--name fluentd \
-p 24224:24224 \
-p 24224:24224/udp \
-v $(pwd)/fluent.conf:/fluentd/etc/fluent.conf \
-v $(pwd)/logs:/fluentd/log \
fluent/fluentd:v1.16-debian-1

<h2 id="proverka-logov">Проверка логов</h2>
docker logs fluentd


Dockerfile для кастомного образа с плагинами:

dockerfile
FROM fluent/fluentd:v1.16-debian-1

<h2 id="pereklyuchaemsya-na-root-dlya-ustanovki-plaginov">Переключаемся на root для установки плагинов</h2>
USER root

<h2 id="ustanavlivaem-nuzhnye-plaginy">Устанавливаем нужные плагины</h2>
RUN fluent-gem install \
fluent-plugin-elasticsearch \
fluent-plugin-kafka \
fluent-plugin-s3 \
--no-document

<h2 id="vozvraschaem-polzovatelya-fluent">Возвращаем пользователя fluent</h2>
USER fluent


Использование кастомного образа — правильный подход для production: вы фиксируете версии плагинов и можете воспроизвести окружение в любой момент.

#### Установка в Kubernetes через Helm

Самый распространённый способ деплоя Fluentd в Kubernetes — Helm Chart.

bash
<h2 id="dobavlenie-repozitoriya">Добавление репозитория</h2>
helm repo add fluent https://fluent.github.io/helm-charts
helm repo update

<h2 id="ustanovka-fluentd-kak-daemonset">Установка Fluentd как DaemonSet</h2>
helm install fluentd fluent/fluentd \
--namespace logging \
--create-namespace \
--set kind=DaemonSet \
--values values.yaml


Пример минимального values.yaml:

yaml
kind: DaemonSet

image:
repository: fluent/fluentd-kubernetes-daemonset
tag: v1.16-debian-elasticsearch8-1

env:
- name: FLUENT_ELASTICSEARCH_HOST
value: "elasticsearch-master"
- name: FLUENT_ELASTICSEARCH_PORT
value: "9200"

resources:
limits:
memory: 512Mi
requests:
cpu: 100m
memory: 200Mi

tolerations:
- key: node-role.kubernetes.io/control-plane
operator: Exists
effect: NoSchedule


Обратите внимание на tolerations — без этой секции Fluentd не будет запущен на control-plane нодах, и их логи не будут собираться.

#### Проверка установки

После любого способа установки необходимо убедиться, что Fluentd работает корректно:

bash
<h2 id="test-konfiguratsii-bez-zapuska">Тест конфигурации без запуска</h2>
fluentd --dry-run -c /etc/fluent/fluentd.conf

<h2 id="proverka-versii-i-dostupnyh-plaginov">Проверка версии и доступных плагинов</h2>
fluentd --version
fluentd -e 'exit 0' -c /dev/null 2>&1 | grep -i plugin

<h2 id="otpravka-testovogo-sobytiya-cherez-http">Отправка тестового события через HTTP</h2>
curl -X POST -d 'json={"action":"test","user":1}' \
http://localhost:9880/test.tag




Базовая конфигурация: файл fluent.conf от А до Я


Конфигурационный файл fluent.conf — это центральное место управления всем поведением Fluentd. Важно понять его синтаксис и основные директивы прежде, чем переходить к сложным сценариям.

#### Синтаксис конфигурационного файла

Fluentd использует собственный синтаксис, напоминающий XML. Конфигурация состоит из блоков (директив), заключённых в угловые скобки. Каждый блок имеет тип, указанный после @type.

Основные директивы верхнего уровня:
- `` — определяет источник данных
- `` — определяет правило для обработки и отправки событий
- `` — определяет правило фильтрации/трансформации
- `` — системные настройки Fluentd
- `` — группировка директив для изоляции потоков

Пример минимальной рабочей конфигурации:

text
<h2 id="sistemnye-nastroyki">Системные настройки</h2>
<system>
log_level info
<log>
format json
time_format %Y-%m-%dT%H:%M:%S%z
</log>
</system>

<h2 id="istochnik-chtenie-stdout-konteynerov-docker">Источник: чтение stdout контейнеров Docker</h2>
<source>
@type forward
port 24224
bind 0.0.0.0
</source>

<h2 id="vyvod-v-stdout-dlya-otladki">Вывод в stdout для отладки</h2>
<match >
@type stdout
</match>


#### Директива system

Директива `` задаёт глобальные параметры работы Fluentd. Это первое, что нужно настроить в production:

text
<system>
# Количество worker-процессов (рекомендуется = числу CPU)
workers 2

# Директория для временных файлов буферов
root_dir /var/log/fluentd

# Уровень логирования самого Fluentd
log_level warn

# Формат логов Fluentd
<log>
format json
time_format %Y-%m-%dT%H:%M:%S%z
# Ротация лог-файла самого Fluentd
rotate_age 7
rotate_size 1073741824
</log>

# Настройки RPC-сервера для управления
rpc_endpoint 127.0.0.1:24444
enable_get_dump true
</system>


#### Директива source

Директива `` определяет, откуда Fluentd будет получать данные. Самые часто используемые типы:

tail — чтение файлов по мере их дополнения:

text
<source>
@type tail

# Путь к файлу или glob-шаблон
path /var/log/app/*.log

# Файл для хранения позиции чтения (обязателен!)
pos_file /var/log/fluentd/app.pos

# Тег для событий из этого источника
tag app.log

# Читать файл с начала при первом запуске
read_from_head true

# Обновление через каждые N секунд
refresh_interval 5

# Формат парсинга
<parse>
@type none
</parse>
</source>


forward — получение данных от другого Fluentd или Fluent Bit по протоколу MessagePack:

text
<source>
@type forward
port 24224
bind 0.0.0.0

# Аутентификация (рекомендуется для production)
<security>
self_hostname aggregator.example.com
shared_key my-secret-key
</security>
</source>


http — HTTP endpoint для приёма данных:

text
<source>
@type http
port 9880
bind 0.0.0.0
body_size_limit 32m
keepalive_timeout 10

<parse>
@type json
</parse>
</source>


#### Директива match

Директива `` определяет, что делать с событиями, соответствующими указанному паттерну тега. Порядок директив match важен — Fluentd применяет первую подходящую:

text
<h2 id="oshibki-prilozheniya-v-otdelnyy-indeks-elasticsearch">Ошибки приложения — в отдельный индекс Elasticsearch</h2>
<match app.error>
@type elasticsearch
host elasticsearch
port 9200
index_name app-errors
type_name _doc
</match>

<h2 id="ostalnye-logi-prilozheniya-v-obschiy-indeks">Остальные логи приложения — в общий индекс</h2>
<match app.>
@type elasticsearch
host elasticsearch
port 9200
index_name app-logs
type_name _doc
</match>

<h2 id="vsyo-ostalnoe-v-stdout-dlya-otladki">Всё остальное — в stdout для отладки</h2>
<match >
@type stdout
</match>


#### Использование @label для изоляции потоков

Labels позволяют создавать изолированные pipeline'ы обработки, что удобно при сложной маршрутизации:

text
<source>
@type tail
path /var/log/app/error.log
tag raw.error
@label @ERROR_PROCESSING
<parse>
@type none
</parse>
</source>

<label @ERROR_PROCESSING>
<filter raw.error>
@type parser
key_name message
<parse>
@type json
</parse>
</filter>

<match raw.error>
@type elasticsearch
host elasticsearch
port 9200
index_name errors
</match>
</label>


События, помеченные @label, не попадают в общий поток обработки и обрабатываются только внутри соответствующего блока ``.

#### Переменные окружения и инклюды

В production правильно использовать переменные окружения для конфиденциальных данных:

text
<match >
@type elasticsearch
host "#{ENV['ELASTICSEARCH_HOST']}"
port "#{ENV['ELASTICSEARCH_PORT']}"
user "#{ENV['ELASTICSEARCH_USER']}"
password "#{ENV['ELASTICSEARCH_PASSWORD']}"
</match>


Для удобства поддержки большие конфигурации можно разбить на файлы и использовать директиву @include:

text
@include /etc/fluent/conf.d/*.conf




Input-плагины: источники логов и их настройка


Input-плагины — это точки входа данных в Fluentd. Правильный выбор и настройка input-плагина критически важны для надёжности всего pipeline.

#### Плагин tail: чтение лог-файлов

Плагин tail является самым используемым. Он реализует механизм inotify на Linux (и polling на других ОС) для отслеживания изменений в файлах.

Полная конфигурация с пояснениями:

text
<source>
@type tail

# Один файл или glob-шаблон
path /var/log/nginx/access.log,/var/log/nginx/error.log

# Исключения из glob
exclude_path ["/var/log/nginx/*.gz"]

# Файл позиции — ОБЯЗАТЕЛЕН для надёжного чтения
pos_file /var/log/fluentd/nginx.pos

# Читать с начала при первом запуске или после потери pos_file
read_from_head true

# Интервал проверки новых файлов при использовании glob (секунды)
refresh_interval 5

# Чтение уже ротированных файлов (важно для logrotate)
follow_inodes true

# Тег для событий
tag nginx.access

# Парсинг
<parse>
@type nginx
</parse>
</source>


Типичная ошибка: отсутствие pos_file. Без него при перезапуске Fluentd начнёт читать файл с начала, что приведёт к дублированию событий в downstream-системах.

Многострочные логи:

Для логов Java stack traces и других многострочных форматов нужна специальная настройка:

text
<source>
@type tail
path /var/log/app/app.log
pos_file /var/log/fluentd/app.pos
tag app.java

<parse>
@type multiline
format_firstline /^\d{4}-\d{2}-\d{2}/
format1 /^(?<time>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) (?<level>[A-Z]+) (?<message>.*)/
</parse>
</source>


#### Плагин forward: приём данных по сети

Плагин forward используется для построения иерархии агрегаторов. Он работает по протоколу MessagePack поверх TCP.

text
<source>
@type forward
port 24224
bind 0.0.0.0

# Максимальный размер принимаемых данных
chunk_size_limit 256m

# Timeout для неактивных соединений
deny_keepalive false

# TLS (рекомендуется в production)
<transport tls>
cert_path /etc/ssl/certs/fluentd.crt
private_key_path /etc/ssl/private/fluentd.key
client_cert_auth false
</transport>

<security>
self_hostname aggregator.prod.example.com
shared_key "#{ENV['FORWARD_SHARED_KEY']}"
</security>
</source>


#### Плагин kubernetes_metadata (специфичен для Kubernetes)

Этот плагин автоматически добавляет к событиям метаданные Kubernetes: имя пода, namespace, labels, annotations. Без него логи из Kubernetes лишены контекста.

Однако важно понимать: плагин kubernetes_metadata реализован как filter, а не как input. Он обогащает уже собранные события метаданными из Kubernetes API.

text
<filter kubernetes.>
@type kubernetes_metadata

# Использовать кэш для уменьшения запросов к API
cache_size 1000
cache_ttl 3600

# Добавлять данные из аннотаций
annotation_match ["fluentd.io/.*"]

# Исключить ненужные поля для экономии места
de_dot false
watch true
</filter>


#### Плагин syslog

Для приёма syslog-сообщений от сетевого оборудования и серверов:

text
<source>
@type syslog
port 5140
bind 0.0.0.0
tag system.syslog

<parse>
@type syslog
with_priority true
</parse>
</source>


#### Плагин http_healthcheck

В Kubernetes Fluentd часто нужна liveness probe. Для этого подойдёт встроенный HTTP endpoint:

text
<source>
@type http
port 9880

<parse>
@type json
</parse>
</source>


Liveness probe в манифесте Kubernetes:

yaml
livenessProbe:
httpGet:
path: /fluentd.healthcheck?json=%7B%22log%22%3A+%22health+check%22%7D
port: 9880
initialDelaySeconds: 5
periodSeconds: 30




Filter-плагины: парсинг, трансформация и обогащение данных


Filter-плагины — это инструменты преобразования данных на лету. Именно здесь происходит основная логика обработки: структурирование сырых строк, обогащение метаданными, фильтрация шума, маскирование чувствительных данных.

#### Плагин parser: структурирование сырых данных

Парсер преобразует сырую строку лога в структурированные поля:

text
<filter app.>
@type parser

# Поле, содержащее сырую строку для парсинга
key_name log

# Что делать с исходным полем после парсинга
reserve_data true # сохранить исходные поля
remove_key_name_field true # удалить поле 'log' после парсинга

<parse>
@type json
time_key timestamp
time_format %Y-%m-%dT%H:%M:%S.%N%z
keep_time_key false
</parse>
</filter>


Встроенные парсеры Fluentd:

- `json` — парсинг JSON
- `nginx` — стандартный формат Nginx access.log
- `apache2` — стандартный формат Apache Combined Log
- `apache_error` — формат Apache error.log
- `syslog` — формат syslog RFC3164 и RFC5424
- `regexp` — кастомное регулярное выражение
- `multiline` — многострочные логи
- `none` — не парсить, сохранить как поле `message`
- `csv` — CSV-формат
- `tsv` — TSV-формат

Кастомный парсер через regexp:

text
<parse>
@type regexp
expression /^\[(?<time>[^\]]*)\] (?<level>[A-Z]+) (?<service>[^ ]+) - (?<message>.*)$/
time_key time
time_format %Y-%m-%d %H:%M:%S
</parse>


#### Плагин record_transformer: трансформация полей

Этот плагин позволяет добавлять, изменять и удалять поля в событии:

text
<filter app.>
@type record_transformer
enable_ruby true

<record>
# Добавить имя хоста
hostname "#{Socket.gethostname}"

# Добавить метку окружения из переменной окружения
environment "#{ENV['APP_ENV'] || 'unknown'}"

# Вычислить поле на основе существующих данных
response_time_ms ${record["duration"].to_f * 1000}

# Нормализовать поле
level ${record["level"].upcase}

# Добавить временную метку приёма
received_at ${Time.now.utc.iso8601(3)}
</record>

# Удалить ненужные поля
remove_keys debug_info, internal_id
</filter>


#### Плагин grep: фильтрация событий

Grep позволяет отфильтровать события по значению полей:

text
<filter app.>
@type grep

<regexp>
# Пропускать только события с уровнем ERROR или WARN
key level
pattern /^(ERROR|WARN)$/
</regexp>

<exclude>
# Исключить события от healthcheck endpoint
key path
pattern /\/health/
</exclude>

<and>
<regexp>
key status
pattern /^5\d{2}$/
</regexp>
<regexp>
key method
pattern /^POST$/
</regexp>
</and>
</filter>


#### Плагин concat: сборка многострочных логов

Для логов, которые поступают построчно, но должны быть собраны в одно событие:

text
<filter java.>
@type concat
key message

# Строка, начинающая новое событие
multiline_start_regexp /^\d{4}-\d{2}-\d{2}/

# Разделитель строк в собранном событии
separator "\n"

# Timeout для незакрытых событий
flush_interval 5

# Максимальное количество строк
limit_bytes 10485760
</filter>


#### Маскирование персональных данных

В compliance-требовательных средах важно маскировать PII (personally identifiable information) прямо в pipeline до сохранения в индексе:

text
<filter app.>
@type record_transformer
enable_ruby true

<record>
# Маскировать номера карт (оставлять только последние 4 цифры)
message ${record["message"].gsub(/\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?(\d{4})\b/, '---\1')}

# Маскировать email
user_email ${record["user_email"] ? record["user_email"].sub(/^(.{2}).*(@.*)$/, '\1*\2') : nil}
</record>
</filter>




Output-плагины: куда отправлять логи


Output-плагины определяют конечные пункты назначения для логов. Выбор output зависит от требований к хранению, поиску и анализу данных.

#### Вывод в Elasticsearch

Elasticsearch — наиболее распространённый output для логов в enterprise-среде:

text
<match app.>
@type elasticsearch

host "#{ENV['ELASTICSEARCH_HOST']}"
port "#{ENV['ELASTICSEARCH_PORT']}"
scheme https

# Аутентификация
user "#{ENV['ELASTICSEARCH_USER']}"
password "#{ENV['ELASTICSEARCH_PASSWORD']}"
ssl_verify true
ca_file /etc/ssl/certs/ca-bundle.crt

# Имя индекса с датой для ротации
index_name fluentd-app-%Y.%m.%d

# Логотип типа записи (для ES 8.x используйте _doc)
type_name _doc

# Использовать ILM (Index Lifecycle Management)
use_legacy_template false
ilm_policy_id fluentd-policy

# Bulk API — отправлять батчами
bulk_message_request_threshold 20MB

# Настройки буфера
<buffer tag,time>
@type file
path /var/log/fluentd/buffer/elasticsearch
timekey 1h
timekey_wait 10m
flush_mode interval
flush_interval 30s
chunk_limit_size 8MB
retry_max_interval 30s
retry_forever true
</buffer>
</match>


#### Вывод в Apache Kafka

Kafka часто используется как промежуточное звено между Fluentd и системами хранения:

text
<match app.>
@type kafka2

brokers "#{ENV['KAFKA_BROKERS']}"

# Тема Kafka — можно использовать тег
default_topic fluentd-logs

# Использовать тег как имя топика
use_default_for_unknown false

# Сериализация
output_data_type json
output_include_tag true
output_include_time true

# Партиционирование
get_kafka_client_log false

# SSL
ssl_ca_cert /etc/ssl/certs/kafka-ca.crt
ssl_client_cert /etc/ssl/certs/kafka-client.crt
ssl_client_cert_key /etc/ssl/private/kafka-client.key

<buffer topic>
@type file
path /var/log/fluentd/buffer/kafka
flush_mode interval
flush_interval 5s
chunk_limit_size 10MB
</buffer>
</match>


#### Вывод в Amazon S3

Для долгосрочного хранения и batch-анализа:

text
<match archive.>
@type s3

aws_key_id "#{ENV['AWS_ACCESS_KEY_ID']}"
aws_sec_key "#{ENV['AWS_SECRET_ACCESS_KEY']}"
s3_bucket "#{ENV['S3_BUCKET']}"
s3_region "#{ENV['AWS_REGION']}"

# Структура пути в S3
path logs/%Y/%m/%d/
s3_object_key_format %{path}%{time_slice}_%{index}.%{file_extension}

# Сжатие
store_as gzip

# Временной слайс (один файл = один час)
time_slice_format %Y%m%d%H
time_slice_wait 10m

<buffer time>
@type file
path /var/log/fluentd/buffer/s3
timekey 1h
timekey_wait 10m
timekey_use_utc true
chunk_limit_size 256MB
</buffer>

<format>
@type json
</format>
</match>


#### Вывод в Grafana Loki

Loki — легковесная альтернатива Elasticsearch для хранения логов, особенно популярная в Kubernetes-окружениях с Grafana:

text
<match kubernetes.>
@type loki

url "#{ENV['LOKI_URL']}"

# Аутентификация для Grafana Cloud
username "#{ENV['LOKI_USERNAME']}"
password "#{ENV['LOKI_PASSWORD']}"

# Labels для Loki (должны быть низкой кардинальности)
extra_labels {"cluster": "prod-eu", "env": "production"}

# Извлечь labels из полей записи
<label>
namespace $.kubernetes.namespace_name
pod_name $.kubernetes.pod_name
container_name $.kubernetes.container_name
app $.kubernetes.labels.app
</label>

<buffer>
flush_interval 5s
flush_at_shutdown true
chunk_limit_size 1MB
</buffer>
</match>


#### Вывод в несколько назначений одновременно (copy)

Плагин copy позволяет отправить одно событие в несколько output:

text
<match app.>
@type copy

<store>
@type elasticsearch
host elasticsearch
port 9200
index_name app-logs
</store>

<store>
@type s3
s3_bucket my-logs-archive
s3_region eu-west-1
path logs/
</store>

# Третья копия — в stdout для отладки в dev-среде
<store>
@type stdout
</store>
</match>




Буферизация и надёжность: как не потерять логи


Буферизация — один из ключевых механизмов Fluentd, обеспечивающих надёжность доставки данных. Без правильной настройки буферов при временной недоступности Elasticsearch или Kafka логи будут потеряны.

#### Как работает буфер Fluentd

Буфер в Fluentd работает по принципу write-ahead log. Когда output-плагин не может доставить данные (например, Elasticsearch недоступен), события сохраняются в буфере. Как только соединение восстановлено, Fluentd автоматически сбрасывает буфер.

Буфер состоит из чанков (chunks). Каждый чанк — это порция событий, объединённых по тегу и/или времени. Чанки переходят из состояния "staging" (формирование) в "queued" (ожидание отправки) когда достигается лимит по размеру или времени.

#### Типы буферов

Буфер в памяти (memory):
text
<buffer>
@type memory
chunk_limit_size 8MB
total_limit_size 512MB
flush_interval 5s
</buffer>

Быстрый, но данные теряются при перезапуске Fluentd. Подходит для некритичных логов.

Буфер на диске (file):
text
<buffer>
@type file
path /var/log/fluentd/buffer/app
chunk_limit_size 8MB
total_limit_size 10GB
flush_interval 30s
flush_at_shutdown true
</buffer>

Медленнее, но данные сохраняются при перезапуске. Обязателен в production.

#### Полная конфигурация буфера для production

text
<match app.>
@type elasticsearch
host elasticsearch
port 9200

<buffer tag,time>
@type file
path /var/log/fluentd/buffer/elasticsearch

# Ключи партиционирования буфера
# tag — разные чанки для разных тегов
# time — разные чанки для разных временных слайсов

# Размер одного чанка (до достижения — не сбрасывать)
chunk_limit_size 8MB

# Максимальный суммарный размер буфера
total_limit_size 10GB

# Ожидание перед записью нового чанка в очередь
flush_mode interval
flush_interval 30s

# Ждать N секунд после закрытия временного слайса
timekey 1h
timekey_wait 10m

# Количество потоков для сброса буфера
flush_thread_count 4

# Политика повторных попыток
retry_type exponential_backoff
retry_wait 1s
retry_max_interval 300s
retry_randomize true

# Повторять бесконечно (не терять данные)
retry_forever true

# Сбрасывать буфер при остановке Fluentd
flush_at_shutdown true

# Сжатие чанков в буфере
compress gzip
</buffer>
</match>


#### Мониторинг состояния буфера

Переполненный буфер — критическая ситуация. Если `total_limit_size` достигнут, Fluentd начнёт отбрасывать новые события. Необходимо мониторить состояние буфера:

bash
<h2 id="proverit-razmer-direktorii-bufera">Проверить размер директории буфера</h2>
du -sh /var/log/fluentd/buffer/

<h2 id="smotret-logi-fluentd-na-predmet-buffer-overflow">Смотреть логи Fluentd на предмет buffer overflow</h2>
tail -f /var/log/fluent/fluentd.log | grep -i "buffer\|retry\|overflow"


В Prometheus-метриках (если настроен плагин prometheus) важные показатели:
- `fluentd_output_status_buffer_total_bytes` — текущий размер буфера
- `fluentd_output_status_retry_count` — количество повторных попыток
- `fluentd_output_status_num_errors` — количество ошибок



Fluentd в Kubernetes: Daemonset, Sidecar и Fluent Bit


Kubernetes — основная среда работы Fluentd в 2026 году. Существует несколько паттернов развёртывания, каждый со своими плюсами и минусами.

#### Паттерн DaemonSet: один агент на узел

DaemonSet гарантирует, что один экземпляр Fluentd запущен на каждом узле кластера. Это наиболее распространённый подход для сбора системных логов и логов контейнеров.

Fluentd в DaemonSet читает логи контейнеров, которые Docker/containerd сохраняет в /var/log/containers/ в виде симлинков на /var/log/pods/.

Манифест DaemonSet:

yaml
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: fluentd
namespace: logging
labels:
app: fluentd
spec:
selector:
matchLabels:
app: fluentd
template:
metadata:
labels:
app: fluentd
spec:
serviceAccountName: fluentd
tolerations:
- key: node-role.kubernetes.io/control-plane
operator: Exists
effect: NoSchedule
- key: node-role.kubernetes.io/master
operator: Exists
effect: NoSchedule
containers:
- name: fluentd
image: fluent/fluentd-kubernetes-daemonset:v1.16-debian-elasticsearch8-1
env:
- name: FLUENT_ELASTICSEARCH_HOST
value: "elasticsearch-master.logging.svc.cluster.local"
- name: FLUENT_ELASTICSEARCH_PORT
value: "9200"
- name: FLUENT_ELASTICSEARCH_SCHEME
value: "http"
- name: K8S_NODE_NAME
valueFrom:
fieldRef:
fieldPath: spec.nodeName
resources:
limits:
memory: 512Mi
requests:
cpu: 100m
memory: 200Mi
volumeMounts:
- name: varlog
mountPath: /var/log
- name: dockercontainerlogdirectory
mountPath: /var/log/pods
readOnly: true
- name: config-volume
mountPath: /fluentd/etc
volumes:
- name: varlog
hostPath:
path: /var/log
- name: dockercontainerlogdirectory
hostPath:
path: /var/log/pods
- name: config-volume
configMap:
name: fluentd-config


#### RBAC для Fluentd в Kubernetes

Fluentd нужны права для чтения метаданных подов через Kubernetes API:

yaml
apiVersion: v1
kind: ServiceAccount
metadata:
name: fluentd
namespace: logging


apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: fluentd
rules:
- apiGroups: [""]
resources: ["pods", "namespaces"]
verbs: ["get", "list", "watch"]


apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: fluentd
roleRef:
kind: ClusterRole
name: fluentd
apiGroup: rbac.authorization.k8s.io
subjects:
- kind: ServiceAccount
name: fluentd
namespace: logging


#### ConfigMap с конфигурацией Fluentd для Kubernetes

yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: fluentd-config
namespace: logging
data:
fluent.conf: |
<system>
log_level warn
</system>

# Чтение логов контейнеров
<source>
@type tail
path /var/log/containers/*.log
pos_file /var/log/fluentd-containers.log.pos
tag raw.kubernetes.*
read_from_head true
<parse>
@type multi_format
<pattern>
format json
time_key time
time_format %Y-%m-%dT%H:%M:%S.%NZ
</pattern>
<pattern>
format /^(?<time>.+) (?<stream>stdout|stderr) [^ ]* (?<log>.*)$/
time_format %Y-%m-%dT%H:%M:%S.%N%:z
</pattern>
</parse>
</source>

# Обогащение метаданными Kubernetes
<filter raw.kubernetes.>
@type kubernetes_metadata
watch true
cache_size 1000
</filter>

# Парсинг JSON логов приложений
<filter raw.kubernetes.>
@type parser
key_name log
reserve_data true
remove_key_name_field true
<parse>
@type json
</parse>
</filter>

<match raw.kubernetes.>
@type elasticsearch
host "#{ENV['FLUENT_ELASTICSEARCH_HOST']}"
port "#{ENV['FLUENT_ELASTICSEARCH_PORT']}"
logstash_format true
logstash_prefix kubernetes
include_timestamp true
<buffer>
flush_thread_count 8
flush_interval 5s
chunk_limit_size 2M
queue_limit_length 32
retry_max_interval 30
retry_forever true
</buffer>
</match>


#### Паттерн Sidecar: отдельный контейнер на под

Sidecar-паттерн используется когда приложение пишет логи в файл (не в stdout), или когда нужна изолированная конфигурация для конкретного приложения.

yaml
spec:
containers:
- name: app
image: my-app:latest
volumeMounts:
- name: log-volume
mountPath: /var/log/app

- name: fluentd-sidecar
image: fluent/fluentd:v1.16-debian-1
volumeMounts:
- name: log-volume
mountPath: /var/log/app
readOnly: true
- name: fluentd-config
mountPath: /fluentd/etc
resources:
limits:
memory: 128Mi
requests:
cpu: 50m
memory: 64Mi

volumes:
- name: log-volume
emptyDir: {}
- name: fluentd-config
configMap:
name: app-fluentd-config


#### Паттерн Aggregator: централизованный агрегатор

В крупных кластерах рекомендуется двухуровневая архитектура: Fluent Bit на каждом узле собирает логи и пересылает в центральный Fluentd-агрегатор, который выполняет тяжёлую обработку.

Конфигурация Fluent Bit (агент) для пересылки в Fluentd:

text
[OUTPUT]
Name forward
Match *
Host fluentd-aggregator.logging.svc.cluster.local
Port 24224
Self_Hostname fluent-bit
Shared_Key my-secret-key
tls on
tls.verify on
tls.ca_file /etc/ssl/certs/fluentd-ca.crt


Конфигурация Fluentd-агрегатора (принимает от Fluent Bit):

text
<source>
@type forward
port 24224
bind 0.0.0.0
<security>
self_hostname aggregator.logging
shared_key my-secret-key
</security>
</source>

<h2 id="tyazhyolaya-obrabotka-parsing-obogaschenie-marshrutizatsiya">Тяжёлая обработка — парсинг, обогащение, маршрутизация</h2>
<filter >
@type record_transformer
<record>
cluster_name prod-eu-west
</record>
</filter>

<match >
@type elasticsearch
...
</match>




Интеграция с EFK стеком: Elasticsearch, Fluentd, Kibana


EFK (Elasticsearch, Fluentd, Kibana) — наиболее распространённый стек для централизованного логирования в Kubernetes. Рассмотрим полное развёртывание.

#### Развёртывание Elasticsearch в Kubernetes

bash
<h2 id="dobavlenie-repozitoriya-elastic">Добавление репозитория Elastic</h2>
helm repo add elastic https://helm.elastic.co
helm repo update

<h2 id="ustanovka-elasticsearch">Установка Elasticsearch</h2>
helm install elasticsearch elastic/elasticsearch \
--namespace logging \
--set replicas=3 \
--set minimumMasterNodes=2 \
--set resources.requests.cpu=1000m \
--set resources.requests.memory=2Gi \
--set resources.limits.cpu=2000m \
--set resources.limits.memory=4Gi \
--set volumeClaimTemplate.storageClassName=standard \
--set volumeClaimTemplate.resources.requests.storage=100Gi


#### Настройка index template в Elasticsearch

Перед началом записи данных Fluentd необходимо создать index template с правильным маппингом:

bash
curl -X PUT "http://elasticsearch:9200/_index_template/kubernetes-logs" \
-H 'Content-Type: application/json' \
-d '{
"index_patterns": ["kubernetes-*"],
"template": {
"settings": {
"number_of_shards": 2,
"number_of_replicas": 1,
"index.lifecycle.name": "fluentd-policy",
"index.lifecycle.rollover_alias": "kubernetes-logs"
},
"mappings": {
"properties": {
"@timestamp": {"type": "date"},
"kubernetes": {
"properties": {
"namespace_name": {"type": "keyword"},
"pod_name": {"type": "keyword"},
"container_name": {"type": "keyword"},
"labels": {"type": "object"}
}
},
"level": {"type": "keyword"},
"message": {"type": "text", "fields": {"keyword": {"type": "keyword", "ignore_above": 256}}}
}
}
}
}'


#### Настройка ILM (Index Lifecycle Management)

ILM позволяет автоматически управлять жизненным циклом индексов: ротировать, переводить на холодное хранение, удалять:

bash
curl -X PUT "http://elasticsearch:9200/_ilm/policy/fluentd-policy" \
-H 'Content-Type: application/json' \
-d '{
"policy": {
"phases": {
"hot": {
"actions": {
"rollover": {
"max_primary_shard_size": "50GB",
"max_age": "1d"
}
}
},
"warm": {
"min_age": "7d",
"actions": {
"shrink": {"number_of_shards": 1},
"forcemerge": {"max_num_segments": 1}
}
},
"cold": {
"min_age": "30d",
"actions": {
"freeze": {}
}
},
"delete": {
"min_age": "90d",
"actions": {
"delete": {}
}
}
}
}
}'


#### Установка Kibana

bash
helm install kibana elastic/kibana \
--namespace logging \
--set elasticsearchHosts="http://elasticsearch-master:9200" \
--set resources.requests.cpu=500m \
--set resources.requests.memory=1Gi


#### Создание Data View в Kibana

После того как Fluentd начнёт писать данные в Elasticsearch, в Kibana нужно создать Data View (ранее называлось Index Pattern):

1. Открыть Kibana → Stack Management → Data Views
2. Нажать "Create data view"
3. Указать паттерн индекса: `kubernetes-*`
4. Выбрать поле времени: `@timestamp`
5. Нажать "Save data view to Kibana"

После этого логи станут доступны в Kibana Discover для поиска и в Kibana Lens для построения визуализаций.

#### Полезные KQL-запросы для Kibana

text
<h2 id="nayti-vse-oshibki-v-konkretnom-namespace">Найти все ошибки в конкретном namespace</h2>
kubernetes.namespace_name: "production" AND level: "ERROR"

<h2 id="nayti-logi-konkretnogo-poda-za-posledniy-chas">Найти логи конкретного пода за последний час</h2>
kubernetes.pod_name: "api-gateway-*" AND @timestamp > now-1h

<h2 id="nayti-5xx-otvety">Найти 5xx ответы</h2>
http.response.status_code >= 500

<h2 id="nayti-medlennye-zaprosy-bolee-1-sekundy">Найти медленные запросы (более 1 секунды)</h2>
duration > 1000 AND kubernetes.labels.app: "api"




Продвинутые техники: маршрутизация, агрегация и мультитенантность


После освоения базовых концепций можно переходить к продвинутым паттернам, которые позволяют решать сложные задачи enterprise-уровня.

#### Динамическая маршрутизация по содержимому

Иногда нужно направить события в разные destinations в зависимости от содержимого полей:

text
<h2 id="razbit-potok-po-urovnyu-kritichnosti">Разбить поток по уровню критичности</h2>
<match app.>
@type rewrite_tag_filter

# Критические ошибки — в срочный топик
<rule>
key level
pattern /^CRITICAL$/
tag urgent.${tag}
</rule>

# Ошибки — в стандартный error-поток
<rule>
key level
pattern /^ERROR$/
tag error.${tag}
</rule>

# Всё остальное — в общий поток
<rule>
key level
pattern /.*/
tag normal.${tag}
</rule>
</match>

<match urgent.>
@type kafka2
brokers kafka:9092
default_topic alerts-urgent
</match>

<match error.>
@type elasticsearch
index_name errors
</match>

<match normal.>
@type elasticsearch
index_name app-logs
</match>


#### Агрегация метрик из логов

Fluentd умеет не только пересылать логи, но и вычислять метрики на лету:

text
<filter app.>
@type prometheus

<metric>
name fluentd_http_requests_total
type counter
desc HTTP requests total
key status
<labels>
method $.method
status $.status
service app
</labels>
</metric>

<metric>
name fluentd_http_request_duration_seconds
type histogram
desc HTTP request duration
key duration
buckets 0.01, 0.05, 0.1, 0.5, 1, 5
<labels>
service app
</labels>
</metric>
</filter>


#### Мультитенантное логирование

В мультиарендных Kubernetes-кластерах логи разных команд/проектов должны быть изолированы:

text
<h2 id="razdelenie-po-namespace">Разделение по namespace</h2>
<match raw.kubernetes.>
@type rewrite_tag_filter

<rule>
key $.kubernetes.namespace_name
pattern /^team-alpha/
tag tenant.alpha.${tag}
</rule>

<rule>
key $.kubernetes.namespace_name
pattern /^team-beta/
tag tenant.beta.${tag}
</rule>

<rule>
key $.kubernetes.namespace_name
pattern /.*/
tag tenant.shared.${tag}
</rule>
</match>

<match tenant.alpha.>
@type elasticsearch
host elasticsearch-alpha
index_name alpha-logs-%Y.%m.%d
user alpha-user
password "#{ENV['ALPHA_ES_PASSWORD']}"
</match>

<match tenant.beta.>
@type elasticsearch
host elasticsearch-shared
index_name beta-logs-%Y.%m.%d
user beta-user
password "#{ENV['BETA_ES_PASSWORD']}"
</match>


#### Семплирование логов для снижения нагрузки

При высоком объёме логов debug-уровня можно использовать семплирование:

text
<filter app.>
@type sampling

# Пропускать только каждое N-ое событие
sample_unit second
sample_threshold 1000 # если более 1000 событий в секунду
sample_rate 10 # пропускать 1 из 10

# Не семплировать ошибки
<except>
key level
pattern /^(ERROR|CRITICAL)$/
</except>
</filter>


#### Дедупликация событий

При использовании нескольких агентов или при проблемах с сетью возможны дубликаты:

text
<filter >
@type dedot
de_dot false

# Использовать поле unique_id для дедупликации
# Это поле должно генерироваться на стороне приложения
</filter>


#### Обогащение геоданными

Для логов с IP-адресами можно добавить геолокацию:

text
<filter access.>
@type geoip

geoip_lookup_keys remote_addr

<record>
city ${location.city['remote_addr']}
country_code ${location.country_code['remote_addr']}
latitude ${location.latitude['remote_addr']}
longitude ${location.longitude['remote_addr']}
</record>

skip_adding_null_record true
</filter>




Мониторинг и отладка Fluentd в production


Fluentd сам по себе является критическим компонентом инфраструктуры, и его необходимо мониторить не менее тщательно, чем приложения.

#### Встроенный HTTP-мониторинг

Fluentd предоставляет REST API для получения статистики:

text
<source>
@type monitor_agent
bind 0.0.0.0
port 24220
</source>


После этого можно получать данные:

bash
<h2 id="obschaya-statistika-plaginov">Общая статистика плагинов</h2>
curl http://localhost:24220/api/plugins.json | python3 -m json.tool

<h2 id="statistika-konkretnogo-plagina-po-id">Статистика конкретного плагина по ID</h2>
curl http://localhost:24220/api/plugins.json?plugin_id=object:3fc...

<h2 id="konfiguratsiya-polezno-dlya-otladki">Конфигурация (полезно для отладки)</h2>
curl http://localhost:24220/api/config.json


#### Интеграция с Prometheus

Плагин fluent-plugin-prometheus экспортирует метрики в формате Prometheus:

text
<source>
@type prometheus
bind 0.0.0.0
port 24231
metrics_path /metrics
</source>

<source>
@type prometheus_monitor
interval 5
</source>

<source>
@type prometheus_output_monitor
interval 5
</source>


Ключевые метрики для мониторинга:

text
<h2 id="kolichestvo-sobytiy-v-bufere">Количество событий в буфере</h2>
fluentd_output_status_buffer_queue_length

<h2 id="tekuschiy-razmer-bufera-v-baytah">Текущий размер буфера в байтах</h2>
fluentd_output_status_buffer_total_bytes

<h2 id="kolichestvo-povtornyh-popytok-dolzhno-byt-0">Количество повторных попыток (должно быть ~0)</h2>
fluentd_output_status_retry_count

<h2 id="kolichestvo-sobytiy-v-sekundu">Количество событий в секунду</h2>
rate(fluentd_input_status_num_records_total[5m])

<h2 id="lag-otstavanie-ot-realnogo-vremeni">Lag — отставание от реального времени</h2>
fluentd_output_status_emit_records - fluentd_input_status_num_records_total


#### Алерты для Prometheus/Alertmanager

yaml
groups:
- name: fluentd
rules:
- alert: FluentdBufferFull
expr: fluentd_output_status_buffer_total_bytes / fluentd_output_status_buffer_total_bytes_limit > 0.8
for: 5m
labels:
severity: warning
annotations:
summary: "Fluentd buffer is 80% full"

- alert: FluentdRetryLoop
expr: fluentd_output_status_retry_count > 10
for: 5m
labels:
severity: critical
annotations:
summary: "Fluentd has more than 10 retries"

- alert: FluentdNoEvents
expr: rate(fluentd_input_status_num_records_total[5m]) == 0
for: 10m
labels:
severity: warning
annotations:
summary: "Fluentd is not receiving any events"


#### Отладка конфигурации

Режим verbose:
bash
fluentd -c /etc/fluent/fluentd.conf -vv 2>&1 | head -100


Dry-run для проверки синтаксиса:
bash
fluentd --dry-run -c /etc/fluent/fluentd.conf


Отправка тестового события:
bash
<h2 id="cherez-fluent-cat">Через fluent-cat</h2>
echo '{"message": "test event", "level": "INFO"}' | \
fluent-cat --host localhost --port 24224 app.test

<h2 id="cherez-curl-esli-vklyuchen-http-input">Через curl (если включен HTTP input)</h2>
curl -X POST -d 'json={"action":"login","user":1}' \
http://localhost:9880/test.tag


Анализ логов самого Fluentd:
bash
<h2 id="oshibki">Ошибки</h2>
grep -i error /var/log/fluent/fluentd.log | tail -50

<h2 id="preduprezhdeniya-o-bufere">Предупреждения о буфере</h2>
grep -i "buffer\|retry\|drop" /var/log/fluent/fluentd.log | tail -20


#### Диагностика потери событий

Если события теряются, проверьте следующее:

1. Размер буфера: `du -sh /var/log/fluentd/buffer/` — если буфер заполнен, события отбрасываются
2. Retry count: в логах Fluentd должны быть записи о повторных попытках
3. Pos-file: проверьте, что pos-файл существует и обновляется
4. Права доступа: Fluentd должен иметь права на чтение логов и запись в буфер
5. Сеть: проверьте доступность Elasticsearch/Kafka с узла, где запущен Fluentd



Производительность и оптимизация: настройка под нагрузку


В высоконагруженных средах неправильная конфигурация Fluentd может привести к значительному отставанию или потере событий. Рассмотрим методы оптимизации.

#### Оценка текущей производительности

Перед оптимизацией необходимо понять текущие показатели:

bash
<h2 id="kolichestvo-sobytiy-v-sekundu-cherez-prometheus">Количество событий в секунду через Prometheus</h2>
curl -s http://localhost:24231/metrics | grep fluentd_input_status_num_records_total

<h2 id="zaderzhka-obrabotki-lag">Задержка обработки (lag)</h2>
<h2 id="raznitsa-mezhdu-vremenem-sobytiya-i-vremenem-ego-zapisi-v-elasticsearch">Разница между временем события и временем его записи в Elasticsearch</h2>


Типичная производительность Fluentd (один worker):
- Простая конфигурация (tail → elasticsearch): ~10,000-20,000 событий/сек
- С heavy filtering и парсингом: ~3,000-8,000 событий/сек
- С записью в файловый буфер: overhead ~10-15%

#### Настройка workers для параллелизма

text
<system>
workers 4

# Каждый worker получает свой диапазон портов
# Input-плагины с портами должны это учитывать
</system>

<source>
@type forward
port 24224
# Fluentd автоматически назначит порты 24224-24227 для 4 workers
</source>


#### Оптимизация парсинга

Парсинг регулярными выражениями — самая ресурсоёмкая операция. Рекомендации:

1. Используйте нативные парсеры (json, nginx, apache2) вместо regexp, когда возможно
2. Если нужен regexp — делайте его максимально конкретным, избегайте .* в середине паттерна
3. Выносите тяжёлый парсинг на уровень агрегатора, а не агента

text
<h2 id="ploho-medlennyy-regexp">Плохо — медленный regexp</h2>
<parse>
@type regexp
expression /^(?<time>.*?) (?<level>.*?) (?<message>.*)$/
</parse>

<h2 id="horosho-konkretnyy-regexp">Хорошо — конкретный regexp</h2>
<parse>
@type regexp
expression /^(?<time>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z) (?<level>DEBUG|INFO|WARN|ERROR|FATAL) (?<message>.+)$/
time_format %Y-%m-%dT%H:%M:%S.%LZ
</parse>


#### Настройка параметров буфера для производительности

text
<buffer>
@type file
path /var/log/fluentd/buffer

# Больше потоков для сброса = выше пропускная способность
flush_thread_count 8

# Интервал сброса — баланс между латентностью и производительностью
flush_interval 5s

# Крупные чанки = меньше запросов к Elasticsearch
chunk_limit_size 16MB

# Очередь — сколько чанков ждут отправки
queue_limit_length 128

# Сжатие уменьшает I/O на диск и сеть
compress gzip
</buffer>


#### Использование Fluent Bit как легковесного агента

В среде с тысячами подов каждый байт RAM на агенте имеет значение. Fluent Bit потребляет ~450 KB против ~40 MB у Fluentd. Поэтому рекомендуется:

- На каждом узле — Fluent Bit как DaemonSet: сбор и форвардинг логов
- Centralised — Fluentd как Deployment: парсинг, обогащение, маршрутизация

Это позволяет сэкономить значительные ресурсы при сохранении мощных возможностей обработки.

#### Ограничение потребления CPU

В Kubernetes Fluentd может занимать слишком много CPU во время парсинга больших объёмов логов. Используйте resource limits и правильно настройте лимиты:

yaml
resources:
requests:
cpu: 200m
memory: 256Mi
limits:
cpu: 1000m
memory: 512Mi


Важно: не ставьте слишком жёсткий CPU limit — при CPU throttling Fluentd начнёт отставать, буфер заполнится, и события начнут теряться.



Безопасность: TLS, аутентификация и разграничение доступа


Безопасность pipeline логирования часто недооценивают. Между тем, логи содержат чувствительную информацию: токены, пароли, персональные данные пользователей.

#### Шифрование трафика между Fluentd-агентами

Если Fluent Bit или другой Fluentd пересылают данные через сеть — используйте TLS:

Генерация сертификатов:
bash
<h2 id="ca">CA</h2>
openssl genrsa -out ca.key 4096
openssl req -new -x509 -days 3650 -key ca.key -out ca.crt \
-subj "/CN=Fluentd CA"

<h2 id="sertifikat-servera-agregatora">Сертификат сервера (агрегатора)</h2>
openssl genrsa -out server.key 2048
openssl req -new -key server.key -out server.csr \
-subj "/CN=fluentd-aggregator.logging.svc.cluster.local"
openssl x509 -req -days 365 -CA ca.crt -CAkey ca.key \
-CAcreateserial -in server.csr -out server.crt


Конфигурация TLS на агрегаторе:
text
<source>
@type forward
port 24224

<transport tls>
cert_path /etc/fluentd/ssl/server.crt
private_key_path /etc/fluentd/ssl/server.key
ca_path /etc/fluentd/ssl/ca.crt
client_cert_auth true # Взаимная аутентификация
</transport>

<security>
self_hostname aggregator.logging
shared_key "#{ENV['FORWARD_SHARED_KEY']}"
</security>
</source>


Конфигурация TLS на агенте (Fluent Bit → Fluentd):
text
<match >
@type forward

<server>
host fluentd-aggregator.logging.svc.cluster.local
port 24224
</server>

<security>
self_hostname fluent-bit-agent
shared_key "#{ENV['FORWARD_SHARED_KEY']}"
</security>

tls true
tls_cert_path /etc/ssl/certs/fluentd-ca.crt
</match>


#### Хранение секретов в Kubernetes

Никогда не храните пароли и ключи в ConfigMap. Используйте Secret:

yaml
apiVersion: v1
kind: Secret
metadata:
name: fluentd-secrets
namespace: logging
type: Opaque
stringData:
elasticsearch-password: "my-secure-password"
forward-shared-key: "my-shared-key"
aws-secret-key: "my-aws-secret"


Использование в Pod:

yaml
env:
- name: ELASTICSEARCH_PASSWORD
valueFrom:
secretKeyRef:
name: fluentd-secrets
key: elasticsearch-password


#### Маскирование PII в логах

Для соответствия GDPR и другим регуляторным требованиям:

text
<filter >
@type record_transformer
enable_ruby true

<record>
# Маскировать email
message ${
record["message"]
.gsub(/[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}/, '[EMAIL_REDACTED]')
.gsub(/\b\d{3}[-.]?\d{3}[-.]?\d{4}\b/, '[PHONE_REDACTED]')
.gsub(/\b(?:\d[ -]*?){13,16}\b/, '[CARD_REDACTED]')
}
</record>
</filter>


#### Network Policy для Fluentd в Kubernetes

Ограничьте сетевой доступ Fluentd:

yaml
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: fluentd-network-policy
namespace: logging
spec:
podSelector:
matchLabels:
app: fluentd
policyTypes:
- Ingress
- Egress
ingress:
- from:
- namespaceSelector: {}
ports:
- protocol: TCP
port: 24224
egress:
- to:
- podSelector:
matchLabels:
app: elasticsearch
ports:
- protocol: TCP
port: 9200
- to: [] # DNS
ports:
- protocol: UDP
port: 53




Часто задаваемые вопросы (FAQ)


#### Вопрос 1: В чём разница между Fluentd и Fluent Bit?

Fluentd — это полнофункциональный коллектор данных на Ruby, с более чем 500 плагинами и богатыми возможностями обработки данных. Fluent Bit — облегчённая версия на C, созданная той же командой, оптимизированная для встроенных систем и контейнеров. Fluent Bit потребляет ~450 KB RAM против 40+ MB у Fluentd. На практике их часто используют вместе: Fluent Bit как агент на каждом узле, Fluentd как централизованный агрегатор. Выбирайте Fluent Bit если нужен минимальный footprint, Fluentd — если нужна богатая обработка данных и широкий выбор плагинов.

#### Вопрос 2: Как не потерять логи при перезапуске Fluentd?

Используйте файловый буфер (`@type file`) с параметром `flush_at_shutdown true`. Также убедитесь, что для плагина tail указан `pos_file` — файл позиции, в котором Fluentd запоминает, до какого места в файле он дочитал. При перезапуске Fluentd продолжит читать файл с последней сохранённой позиции, а не с начала. Для событий в буфере файловый buffer гарантирует их сохранность при перезапуске.

#### Вопрос 3: Как правильно настроить Fluentd для Kubernetes?

Используйте готовый образ `fluent/fluentd-kubernetes-daemonset`, который включает плагин `kubernetes_metadata` и предварительно настроен для работы в Kubernetes. Разверните Fluentd как DaemonSet с tolerations для control-plane нод. Добавьте RBAC с правами на чтение pods и namespaces. Используйте ConfigMap для конфигурации и Secret для паролей. Настройте resource limits, но не слишком жёсткие — CPU throttling может вызвать потерю событий.

#### Вопрос 4: Почему Fluentd потребляет много CPU?

Основные причины: тяжёлый парсинг регулярными выражениями, отсутствие workers (весь поток обрабатывается одним Ruby-процессом), очень короткий flush_interval (частые операции записи), отсутствие сжатия в буфере. Решения: использовать нативные парсеры вместо regexp, увеличить количество workers, настроить оптимальный flush_interval (5-30 секунд), включить compress gzip в буфере.

#### Вопрос 5: Как отлаживать конфигурацию Fluentd?

Первый шаг — добавить `@type stdout` как временный output для вывода событий в консоль. Затем проверить конфигурацию командой `fluentd --dry-run -c fluent.conf`. Запустить с уровнем логирования `-vv` для подробного вывода. Отправить тестовое событие через `fluent-cat` или HTTP endpoint. Проверить логи самого Fluentd на наличие ошибок и предупреждений.

#### Вопрос 6: Как настроить Fluentd для многострочных Java stack traces?

Используйте плагин `concat` или тип парсера `multiline`. Ключевой параметр — `multiline_start_regexp`, который определяет регулярное выражение для начала нового события. Для Java это обычно строка с датой/временем: `/^\d{4}-\d{2}-\d{2}/`. Также важно настроить `flush_interval` для освобождения незакрытых событий — иначе последний stack trace может зависнуть в ожидании следующей строки.

#### Вопрос 7: Как ограничить объём логов, поступающих в Elasticsearch?

Используйте несколько подходов: семплирование с плагином `sampling` (пропускать каждый N-й debug-лог), фильтрацию с плагином `grep` (отбрасывать ненужные логи), агрегацию (считать количество похожих событий вместо записи каждого), настройку retention в Elasticsearch через ILM. Также рекомендуется настроить приложение на вывод структурированных JSON-логов с правильными уровнями (не логировать verbose-данные в production).

#### Вопрос 8: Что делать, если Fluentd отстаёт и буфер переполняется?

Диагностика: проверьте `fluentd_output_status_buffer_total_bytes` — если близко к `total_limit_size`, это критическая ситуация. Немедленные меры: увеличить `total_limit_size` буфера, добавить workers, увеличить `flush_thread_count`. Долгосрочные меры: перейти на архитектуру Fluent Bit + Fluentd-агрегатор, оптимизировать парсинг, настроить семплирование для высокочастотных debug-логов. Если производительность Elasticsearch является узким местом — рассмотреть Kafka как промежуточный буфер.

#### Вопрос 9: Как мигрировать с Logstash на Fluentd?

Основные отличия: синтаксис конфигурации (Ruby DSL vs XML-подобный), модель данных (Events в Fluentd против Documents в Logstash), подход к плагинам. При миграции: составьте список всех input/filter/output плагинов Logstash, найдите аналоги для Fluentd (большинство имеют прямые аналоги), адаптируйте конфигурацию. Можно запустить оба коллектора параллельно для проверки корректности. Fluentd обычно потребляет меньше ресурсов, чем Logstash на JVM.

#### Вопрос 10: Как настроить высокую доступность для Fluentd-агрегатора?

Разверните несколько экземпляров Fluentd-агрегатора как Deployment с replicas ≥ 2. Используйте Kubernetes Service для балансировки нагрузки между ними. Агент (Fluent Bit или Fluentd в DaemonSet) должен иметь в конфигурации несколько серверов с указанием `weight` для равномерного распределения нагрузки. Файловый буфер в каждом агрегаторе обеспечит надёжность при падении одного из них.

#### Вопрос 11: Как интегрировать Fluentd с Grafana Loki вместо Elasticsearch?

Установите плагин `fluent-plugin-loki`: `fluent-gem install fluent-plugin-loki`. Настройте output:
text
<match kubernetes.>
@type loki
url http://loki:3100
<label>
namespace $.kubernetes.namespace_name
app $.kubernetes.labels.app
</label>
</match>

Loki значительно дешевле Elasticsearch по ресурсам, но менее функционален для полнотекстового поиска. Выбирайте Loki если у вас уже развёрнут Grafana и бюджет на Elasticsearch ограничен.

#### Вопрос 12: Как проверить, что логи действительно доходят до Elasticsearch?

Несколько способов: просмотреть счётчик `fluentd_output_status_emit_records` в Prometheus-метриках, проверить индексы в Elasticsearch (`GET _cat/indices?v`), временно добавить `@type stdout` в copy-output для просмотра событий в реальном времени, использовать Kibana Discover с фильтром по последним 5 минутам.



Заключение: Fluentd в 2026 году — итоги и перспективы


Fluentd прошёл долгий путь от инструмента для стартапов до de-facto стандарта сбора логов в корпоративных Kubernetes-средах. В 2026 году он остаётся зрелым, стабильным и активно развивающимся проектом под управлением CNCF.

#### Что мы разобрали в этом руководстве

На протяжении всей статьи мы последовательно рассмотрели все аспекты работы с Fluentd. Архитектура Input → Filter → Output — это фундаментальная концепция, понимание которой позволяет правильно проектировать любой pipeline. Установка через пакетный менеджер, Docker и Helm в Kubernetes — выбор метода зависит от вашей среды и требований к воспроизводимости.

Конфигурационный файл fluent.conf с его директивами source, filter, match и label — это инструментарий для построения логики обработки. Богатая экосистема плагинов позволяет собирать данные практически из любого источника и отправлять в любое хранилище.

Буферизация — ключевой механизм надёжности. Файловый буфер с правильными параметрами retry гарантирует, что события не потеряются даже при временных сбоях downstream-систем.

Мониторинг самого Fluentd через Prometheus и правильно настроенные алерты — обязательная часть production-ready конфигурации. Fluentd без мониторинга — это чёрный ящик, который может тихо терять данные.