Изображение


Содержание


1. Введение: Зачем автоматизировать цифровую криминалистику
2. Инструментарий: pytsk3, pefile, pyew и экосистема DFIR-Python
3. Установка и настройка рабочей среды
4. pytsk3: работа с образами дисков и файловыми системами
5. Массовое извлечение артефактов с pytsk3
6. pefile: глубокий анализ PE-файлов Windows
7. Автоматическая классификация подозрительных PE-файлов
8. pyew: статический анализ и дизассемблирование малвари
9. Построение forensic pipeline: от образа до отчёта
10. Интеграция с YARA: автоматическое сигнатурное сканирование
11. Интеграция с VirusTotal API и MISP
12. Параллельная обработка: multiprocessing и asyncio в DFIR
13. Анализ артефактов Windows: реестр, журналы событий, префетч
14. Продвинутые техники: детекция упаковщиков, антианализ и обфускация
15. Генерация отчётов: HTML, JSON и STIX 2.1
16. Реальные сценарии: forensic pipeline для IR-команды
17. Часто задаваемые вопросы (FAQ)
18. Заключение: Python как основа современной криминалистики



Введение: Зачем автоматизировать цифровую криминалистику


Представьте типичный инцидент реагирования в 2026 году. Корпоративная сеть из 500 рабочих станций. Подозрение на компрометацию — индикатор угрозы пришёл из SIEM в 3 часа ночи. У аналитика есть образы дисков с 12 потенциально скомпрометированных машин, каждый объёмом 256 GB. Нужно за 8 часов определить вектор атаки, найти закреплённые импланты, восстановить временну́ю шкалу событий и подготовить отчёт для руководства.

Без автоматизации это невозможно. Ручной анализ одного образа диска занимает 4–8 часов у опытного аналитика. Двенадцать образов — это неделя работы. Автоматизированный pipeline на Python способен выполнить первичный анализ всех двенадцати образов за 2–3 часа: извлечь все исполняемые файлы, проверить их по YARA-правилам, рассчитать хэши и проверить по VirusTotal, проанализировать PE-заголовки на аномалии, извлечь журналы событий и артефакты закрепления, построить временну́ю шкалу. После этого аналитику остаётся изучить уже отфильтрованный и структурированный материал — вместо того чтобы тонуть в терабайтах сырых данных.

Python стал языком де-факто для цифровой криминалистики и incident response. Причин несколько. Богатейшая экосистема библиотек: pytsk3 для работы с образами дисков и файловыми системами, pefile для анализа PE-формата, volatility3 для анализа памяти, python-evtx для разбора журналов Windows, python-registry для работы с реестром. Простота написания скриптов — forensic-аналитик с базовыми навыками Python может автоматизировать рутинные задачи за часы. Кросс-платформенность — скрипты работают на Linux, macOS и Windows.

В этом руководстве мы сосредоточимся на трёх ключевых библиотеках: pytsk3 (The Sleuth Kit Python bindings) для работы с образами дисков и файловыми системами, pefile для глубокого анализа PE-исполняемых файлов Windows, pyew для статического анализа и базового дизассемблирования. Но не ограничимся ими — покажем как интегрировать эти инструменты в полный pipeline с YARA, VirusTotal, обработкой реестра и журналов Windows.

Руководство рассчитано на практикующих DFIR-аналитиков, пентестеров и специалистов по безопасности с базовыми знаниями Python и понимания концепций цифровой криминалистики. Все представленные скрипты — рабочие, проверены на реальных кейсах и готовы к использованию с минимальными адаптациями.

Важная оговорка: все техники, описанные в этом руководстве, предназначены для легальной защитной деятельности — анализа собственной инфраструктуры, реагирования на инциденты, исследования малвари в изолированных средах. Использование этих знаний для несанкционированного доступа к чужим системам является незаконным.



Инструментарий: pytsk3, pefile, pyew и экосистема DFIR-Python


Прежде чем писать код, необходимо разобраться с инструментами: что каждый из них умеет, для каких задач предназначен и как они взаимодействуют в едином pipeline.

#### pytsk3 — Python-биндинги к The Sleuth Kit

The Sleuth Kit (TSK) — это зрелый open-source фреймворк для анализа файловых систем и образов дисков, написанный на C. pytsk3 предоставляет Python-обёртку над TSK, позволяя работать с образами дисков на Python.

Что умеет pytsk3: читать образы дисков в форматах RAW (dd), AFF, E01 (EnCase), EWF без предварительного монтирования системы; работать с файловыми системами NTFS, FAT12/16/32, exFAT, ext2/3/4, HFS+, APFS; получать метаданные файлов включая удалённые; читать содержимое файлов; обходить каталоги рекурсивно; анализировать MFT (Master File Table) NTFS; работать с альтернативными потоками данных (ADS) NTFS.

Ключевое преимущество pytsk3 перед монтированием образа — безопасность и независимость. Монтирование образа потенциально заражённой системы в Linux может скомпрометировать аналитическую машину через автозапуск и другие механизмы. pytsk3 читает образ как обычный файл, не монтируя файловую систему в операционную систему — это криминалистически чистый подход.

#### pefile — анализ PE-формата

PE (Portable Executable) — формат исполняемых файлов Windows: .exe, .dll, .sys, .ocx, .drv. pefile — Python-библиотека для полного разбора PE-формата без запуска файла.

Что умеет pefile: разбирать DOS-заголовок и PE-заголовок; анализировать секции (.text, .data, .rsrc и кастомные); читать таблицы импортов и экспортов; анализировать ресурсы (иконки, манифесты, версионная информация); извлекать цифровые подписи Authenticode; анализировать TLS-колбэки; вычислять энтропию секций (высокая энтропия = возможная упаковка или шифрование); обнаруживать аномалии заголовка; работать с PE-оверлеями (данными после последней секции).

pefile — незаменимый инструмент для статического анализа малвари. Большинство малвари является PE-файлами, и анализ заголовка без запуска позволяет обнаружить сотни признаков вредоносности: подозрительные импорты, нестандартные имена секций, аномальные точки входа, встроенные ресурсы, высокую энтропию указывающую на упаковку.

#### pyew — статический анализатор и дизассемблер

pyew — Python-фреймворк для интерактивного и скриптового статического анализа исполняемых файлов. Предоставляет возможности дизассемблирования, поиска паттернов, анализа строк и базовой деобфускации.

В отличие от pefile, который анализирует структуру PE, pyew позволяет работать с содержимым — анализировать код, искать паттерны, извлекать строки, выполнять базовый анализ потока управления. pyew поддерживает архитектуры x86 и x86-64 через библиотеку distorm3.

Важное замечание о статусе pyew: оригинальный проект pyew активно не развивается с примерно 2015 года. В 2026 году для аналогичных задач чаще используются: radare2 с Python-биндингами (r2pipe), capstone для дизассемблирования, angr для символьного выполнения. Мы рассмотрим pyew как концептуальную основу и покажем современные альтернативы там, где это уместно.

#### Дополнительная экосистема DFIR-Python

Полный forensic pipeline невозможен без вспомогательных библиотек:

python-evtx — разбор журналов событий Windows (.evtx). Позволяет извлекать события из журналов без Windows-системы.

python-registry — чтение и анализ реестра Windows (.reg, NTUSER.DAT, SYSTEM, SOFTWARE). Незаменим для поиска артефактов закрепления.

libscca-python — анализ файлов префетча Windows (.pf). Позволяет восстановить список запускавшихся программ.

yara-python — применение YARA-правил к файлам и памяти. Основа сигнатурного обнаружения.

volatility3 — анализ дампов памяти. Де-факто стандарт memory forensics.

dfvfs — Digital Forensics Virtual File System от Google. Более высокоуровневая альтернатива pytsk3.

python-artifacts — библиотека Google для работы с forensic artifacts.



Установка и настройка рабочей среды


Правильная изолированная рабочая среда критически важна для forensic-работы. Мы никогда не анализируем потенциально вредоносные файлы на рабочей машине без изоляции.

#### Рекомендуемая конфигурация рабочей среды

Для forensic-анализа рекомендуется:

- Хост-машина: Linux (Ubuntu 22.04/24.04 или Debian 12) или Windows с WSL2
- Изоляция: виртуальная машина (VMware/VirtualBox) или контейнер Docker с ограниченными правами
- Сетевая изоляция: анализ малвари только в сети без доступа к корпоративным ресурсам
- Снапшоты: перед анализом сделать снапшот VM для быстрого восстановления

#### Установка системных зависимостей

pytsk3 требует системных библиотек The Sleuth Kit:

bash
<h2 id="ubuntu-debian-22-04">Ubuntu/Debian 22.04+</h2>
sudo apt update && sudo apt install -y \
python3 python3-pip python3-venv \
build-essential \
libewf-dev \
libtsk-dev \
libafflib-dev \
sleuthkit \
git \
libssl-dev \
libffi-dev \
libmagic1 \
libmagic-dev \
yara \
libyara-dev

<h2 id="proverka-versii-tsk">Проверка версии TSK</h2>
tsk_version

<h2 id="centos-rhel-rocky-linux-9">CentOS/RHEL/Rocky Linux 9</h2>
sudo dnf install -y \
python3 python3-pip \
gcc gcc-c++ make \
sleuthkit-devel \
openssl-devel \
libffi-devel


#### Создание изолированного виртуального окружения

bash
<h2 id="sozdat-rabochuyu-direktoriyu">Создать рабочую директорию</h2>
mkdir -p ~/forensic-lab && cd ~/forensic-lab

<h2 id="sozdat-virtualnoe-okruzhenie">Создать виртуальное окружение</h2>
python3 -m venv venv
source venv/bin/activate

<h2 id="obnovit-pip">Обновить pip</h2>
pip install --upgrade pip setuptools wheel

<h2 id="ustanovit-osnovnye-biblioteki">Установить основные библиотеки</h2>
pip install pytsk3
pip install pefile
pip install yara-python
pip install python-evtx
pip install python-registry
pip install requests
pip install tqdm # прогресс-бар для длительных операций
pip install colorama # цветной вывод в терминале
pip install tabulate # красивые таблицы в отчётах
pip install jinja2 # шаблонизатор для HTML-отчётов
pip install pymisp # интеграция с MISP
pip install vt-py # официальный VirusTotal Python SDK

<h2 id="ustanovit-pyew-iz-ishodnikov-tak-kak-ne-v-pypi-v-aktualnoy-versii">Установить pyew (из исходников, так как не в PyPI в актуальной версии)</h2>
pip install pyew

<h2 id="dlya-dizassemblirovaniya-sovremennaya-alternativa-pyew">Для дизассемблирования (современная альтернатива pyew)</h2>
pip install capstone # дизассемблер
pip install r2pipe # radare2 Python bindings

<h2 id="dopolnitelnye-utility">Дополнительные утилиты</h2>
pip install python-magic # определение типов файлов
pip install ssdeep # нечёткое хэширование
pip install tlsh # TLSH хэширование для похожести
pip install entropy # вычисление энтропии


#### Проверка установки

bash
<h2 id="proverit-pytsk3">Проверить pytsk3</h2>
python3 -c "import pytsk3; print('pytsk3 OK:', pytsk3.TSK_VERSION_STR)"

<h2 id="proverit-pefile">Проверить pefile</h2>
python3 -c "import pefile; print('pefile OK:', pefile.__version__)"

<h2 id="proverit-yara">Проверить yara</h2>
python3 -c "import yara; print('yara-python OK:', yara.__version__)"

<h2 id="proverit-capstone">Проверить capstone</h2>
python3 -c "import capstone; print('capstone OK:', capstone.__version__)"

<h2 id="bystryy-test-pytsk3-na-realnom-obraze">Быстрый тест pytsk3 на реальном образе</h2>
python3 -c "
import pytsk3
<h2 id="otkryt-obraz-diska">Открыть образ диска</h2>
img = pytsk3.Img_Info('/path/to/disk.dd')
print('Размер образа:', img.get_size(), 'байт')
"


#### Docker-контейнер для forensic-анализа

Для полной изоляции используйте Docker:

dockerfile
<h2 id="forensic-lab-dockerfile">forensic-lab/Dockerfile</h2>
FROM ubuntu:22.04

ENV DEBIAN_FRONTEND=noninteractive
ENV TZ=UTC

RUN apt-get update && apt-get install -y \
python3 python3-pip python3-venv \
build-essential \
libewf-dev libtsk-dev \
sleuthkit \
libssl-dev libffi-dev \
libmagic1 libmagic-dev \
yara libyara-dev \
git curl wget \
&& rm -rf /var/lib/apt/lists/*

WORKDIR /forensic

COPY requirements.txt .
RUN pip3 install --no-cache-dir -r requirements.txt

<h2 id="sozdat-direktorii-dlya-artefaktov">Создать директории для артефактов</h2>
RUN mkdir -p /forensic/{images,output,yara_rules,scripts}

CMD ["/bin/bash"]


bash
<h2 id="docker-compose-yml-dlya-forensic-lab">docker-compose.yml для forensic lab</h2>
version: "3.9"
services:
forensic-lab:
build: .
container_name: forensic-lab
volumes:
- ./images:/forensic/images:ro # образы дисков (только чтение!)
- ./output:/forensic/output # результаты анализа
- ./scripts:/forensic/scripts # скрипты
- ./yara_rules:/forensic/yara_rules # YARA правила
network_mode: none # полная сетевая изоляция для малвари
security_opt:
- no-new-privileges:true




pytsk3: работа с образами дисков и файловыми системами


pytsk3 предоставляет три основных класса для работы с образами: Img_Info (образ диска), Volume_Info (таблица разделов), FS_Info (файловая система). Понимание иерархии этих классов критично для правильного использования библиотеки.

#### Открытие образа диска

python
#!/usr/bin/env python3
"""
forensic_disk.py — базовые операции с образами дисков через pytsk3
"""
import pytsk3
import sys
from pathlib import Path


def open_image(image_path: str) -> pytsk3.Img_Info:
"""Открыть образ диска, определить формат автоматически."""
path = Path(image_path)
if not path.exists():
raise FileNotFoundError(f"Образ не найден: {image_path}")

# pytsk3 автоматически определяет формат (RAW, E01, AFF)
try:
img = pytsk3.Img_Info(image_path)
print(f"[+] Образ открыт: {image_path}")
print(f"[+] Размер: {img.get_size():,} байт ({img.get_size() / (10243):.2f} GB)")
return img
except Exception as e:
raise RuntimeError(f"Ошибка открытия образа: {e}")


def list_partitions(img: pytsk3.Img_Info) -> list:
"""Получить список разделов из таблицы разделов."""
partitions = []

try:
volume = pytsk3.Volume_Info(img)
part_type = volume.info.vstype
print(f"\n[+] Тип таблицы разделов: {part_type}")
print(f"{'Индекс':<8} {'Смещение (сект)':<20} {'Длина (сект)':<18} {'Описание'}")
print("-" * 70)

for part in volume:
desc = part.desc.decode("utf-8", errors="replace").strip()
print(f"{part.addr:<8} {part.start:<20} {part.len:<18} {desc}")
partitions.append({
"addr": part.addr,
"start": part.start,
"length": part.len,
"desc": desc,
"flags": part.flags,
})

except OSError as e:
# Нет таблицы разделов — возможно, образ одного раздела
print(f"[!] Таблица разделов не найдена: {e}")
print("[*] Попытка открыть как единый раздел...")

return partitions


def open_filesystem(img: pytsk3.Img_Info,
offset: int = 0,
sector_size: int = 512) -> pytsk3.FS_Info:
"""Открыть файловую систему по смещению."""
byte_offset = offset * sector_size
try:
fs = pytsk3.FS_Info(img, offset=byte_offset)
fs_type_map = {
pytsk3.TSK_FS_TYPE_NTFS: "NTFS",
pytsk3.TSK_FS_TYPE_FAT12: "FAT12",
pytsk3.TSK_FS_TYPE_FAT16: "FAT16",
pytsk3.TSK_FS_TYPE_FAT32: "FAT32",
pytsk3.TSK_FS_TYPE_EXT2: "ext2",
pytsk3.TSK_FS_TYPE_EXT3: "ext3",
pytsk3.TSK_FS_TYPE_EXT4: "ext4",
}
fs_name = fs_type_map.get(fs.info.ftype, f"Unknown ({fs.info.ftype})")
print(f"[+] Файловая система: {fs_name}")
print(f"[+] Размер блока: {fs.info.block_size} байт")
print(f"[+] Всего inodes: {fs.info.inum_count:,}")
return fs
except Exception as e:
raise RuntimeError(f"Ошибка открытия файловой системы: {e}")


def get_file_metadata(file_entry) -> dict:
"""Извлечь метаданные файла из записи файловой системы."""
import datetime

meta = file_entry.info.meta
name = file_entry.info.name

def ts_to_str(timestamp) -> str:
if timestamp and timestamp > 0:
try:
return datetime.datetime.utcfromtimestamp(timestamp).strftime(
"%Y-%m-%d %H:%M:%S UTC"
)
except (OSError, OverflowError):
return "Invalid"
return "N/A"

result = {
"name": name.name.decode("utf-8", errors="replace") if name else "N/A",
"inode": meta.addr if meta else None,
"size": meta.size if meta else 0,
"type": "DIR" if (meta and meta.type == pytsk3.TSK_FS_META_TYPE_DIR) else "FILE",
"mtime": ts_to_str(meta.mtime) if meta else "N/A",
"atime": ts_to_str(meta.atime) if meta else "N/A",
"ctime": ts_to_str(meta.ctime) if meta else "N/A",
"crtime": ts_to_str(meta.crtime) if meta else "N/A", # NTFS: время создания
"uid": meta.uid if meta else 0,
"gid": meta.gid if meta else 0,
"mode": oct(meta.mode) if meta else "N/A",
"allocated": bool(
meta.flags & pytsk3.TSK_FS_META_FLAG_ALLOC
) if meta else False,
}
return result


if __name__ == "__main__":
if len(sys.argv) < 2:
print(f"Использование: {sys.argv[0]} <образ_диска>")
sys.exit(1)

image_path = sys.argv[1]
img = open_image(image_path)
partitions = list_partitions(img)

# Открыть первый NTFS/ext раздел (пропустить служебные)
for part in partitions:
if part["flags"] == pytsk3.TSK_VS_PART_FLAG_ALLOC and part["length"] > 2048:
try:
fs = open_filesystem(img, part["start"])
break
except RuntimeError:
continue


#### Рекурсивный обход файловой системы

python
#!/usr/bin/env python3
"""
fs_walker.py — рекурсивный обход файловой системы с фильтрацией
"""
import pytsk3
import os
from typing import Generator, Optional, Callable
from dataclasses import dataclass, field


@dataclass
class FileEntry:
"""Представление файла из образа диска."""
path: str
name: str
inode: int
size: int
is_dir: bool
is_allocated: bool
mtime: str
atime: str
ctime: str
crtime: str
extension: str = ""

def __post_init__(self):
self.extension = os.path.splitext(self.name)[1].lower()


def walk_filesystem(
fs: pytsk3.FS_Info,
path: str = "/",
recursive: bool = True,
include_deleted: bool = False,
file_filter: Optional[Callable] = None,
max_depth: int = 20,
_depth: int = 0,
) -> Generator[FileEntry, None, None]:
"""
Рекурсивный генератор для обхода файловой системы.

Args:
fs: объект файловой системы pytsk3
path: начальный путь для обхода
recursive: обходить поддиректории
include_deleted: включать удалённые файлы
file_filter: функция-фильтр, принимает FileEntry, возвращает bool
max_depth: максимальная глубина рекурсии
"""
if _depth > max_depth:
return

try:
directory = fs.open_dir(path=path)
except OSError:
return

for entry in directory:
try:
name = entry.info.name.name.decode("utf-8", errors="replace")
except AttributeError:
continue

# Пропустить системные записи
if name in (".", ".."):
continue

# Пропустить служебные записи без метаданных
if entry.info.meta is None:
continue

meta = entry.info.meta
is_allocated = bool(meta.flags & pytsk3.TSK_FS_META_FLAG_ALLOC)
is_dir = (meta.type == pytsk3.TSK_FS_META_TYPE_DIR)

# Фильтр удалённых файлов
if not include_deleted and not is_allocated:
continue

# Построить полный путь
full_path = f"{path.rstrip('/')}/{name}"

# Создать объект FileEntry
file_entry = FileEntry(
path=full_path,
name=name,
inode=meta.addr,
size=meta.size,
is_dir=is_dir,
is_allocated=is_allocated,
mtime=_ts(meta.mtime),
atime=_ts(meta.atime),
ctime=_ts(meta.ctime),
crtime=_ts(meta.crtime),
)

# Применить пользовательский фильтр
if file_filter and not file_filter(file_entry):
if not is_dir:
continue

if not is_dir:
yield file_entry
else:
# Рекурсивный обход поддиректорий
if recursive:
yield from walk_filesystem(
fs, full_path, recursive, include_deleted,
file_filter, max_depth, _depth + 1
)


def _ts(timestamp) -> str:
"""Конвертировать Unix timestamp в строку."""
import datetime
if timestamp and timestamp > 0:
try:
return datetime.datetime.utcfromtimestamp(timestamp).strftime(
"%Y-%m-%d %H:%M:%S"
)
except (OSError, OverflowError):
return "Invalid"
return ""


def read_file_content(
fs: pytsk3.FS_Info,
file_path: str,
max_bytes: int = 50 * 1024 * 1024 # 50 MB лимит
) -> Optional[bytes]:
"""Прочитать содержимое файла из образа диска."""
try:
f = fs.open(file_path)
size = f.info.meta.size

if size == 0:
return b""

if size > max_bytes:
print(f"[!] Файл слишком большой ({size:,} байт), читаем первые {max_bytes:,}")
size = max_bytes

# Читать файл чанками для больших файлов
chunks = []
offset = 0
chunk_size = 1024 * 1024 # 1 MB чанки

while offset < size:
available = min(chunk_size, size - offset)
data = f.read_random(offset, available)
if not data:
break
chunks.append(data)
offset += len(data)

return b"".join(chunks)

except OSError as e:
# Файл не может быть прочитан (повреждён, особые права и т.д.)
return None




Массовое извлечение артефактов с pytsk3


Практическая задача forensic-аналитика — не просто прочитать файловую систему, а эффективно извлечь интересующие артефакты из образа. Рассмотрим скрипты для решения типичных задач.

#### Извлечение всех исполняемых файлов

python
#!/usr/bin/env python3
"""
extract_executables.py — извлечение всех PE-файлов из образа диска
"""
import pytsk3
import hashlib
import os
from pathlib import Path
from tqdm import tqdm
from typing import Optional
import json
import datetime

<h2 id="pe-signatura-mz-header">PE-сигнатура (MZ header)</h2>
PE_MAGIC = b"MZ"

<h2 id="rasshireniya-ispolnyaemyh-faylov-windows">Расширения исполняемых файлов Windows</h2>
EXEC_EXTENSIONS = {
".exe", ".dll", ".sys", ".ocx", ".scr",
".drv", ".cpl", ".ax", ".acm", ".mui",
".com", ".bat", ".cmd", ".ps1", ".vbs",
}


def is_pe_file(data: bytes) -> bool:
"""Быстрая проверка PE-сигнатуры."""
if len(data) < 2:
return False
return data[:2] == PE_MAGIC


def hash_data(data: bytes) -> dict:
"""Вычислить MD5, SHA1, SHA256 хэши."""
return {
"md5": hashlib.md5(data).hexdigest(),
"sha1": hashlib.sha1(data).hexdigest(),
"sha256": hashlib.sha256(data).hexdigest(),
}


def extract_executables(
image_path: str,
output_dir: str,
partition_offset: int = 0,
include_deleted: bool = False,
verify_pe_header: bool = True,
) -> list:
"""
Извлечь все исполняемые файлы из образа диска.

Returns:
Список словарей с метаданными извлечённых файлов
"""
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)

img = pytsk3.Img_Info(image_path)
fs = pytsk3.FS_Info(img, offset=partition_offset * 512)

extracted = []
errors = []
total_size = 0

print(f"[*] Начинаем извлечение исполняемых файлов из {image_path}")
print(f"[*] Вывод в: {output_dir}")
print(f"[*] Включая удалённые файлы: {include_deleted}")

# Генератор с фильтром по расширению
file_filter = lambda e: (
not e.is_dir and
e.extension in EXEC_EXTENSIONS and
e.size > 0 and
e.size < 500 * 1024 * 1024 # не больше 500 MB
)

# Сначала подсчитаем файлы для прогресс-бара
print("[*] Сканирование файловой системы...")
all_entries = list(walk_filesystem(
fs,
file_filter=file_filter,
include_deleted=include_deleted,
))
print(f"[+] Найдено файлов для извлечения: {len(all_entries):,}")

for entry in tqdm(all_entries, desc="Извлечение", unit="файл"):
try:
# Читать содержимое файла
data = read_file_content(fs, entry.path)
if data is None:
errors.append({"path": entry.path, "error": "Не удалось прочитать"})
continue

# Проверить PE-сигнатуру если нужно
if verify_pe_header and not is_pe_file(data):
# Файл имеет расширение PE но не является PE — подозрительно
tqdm.write(f"[!] Не PE-файл несмотря на расширение: {entry.path}")
continue

# Вычислить хэши
hashes = hash_data(data)

# Создать безопасное имя файла для сохранения
safe_name = entry.path.replace("/", "_").replace("\\", "_").lstrip("_")
save_name = f"{hashes['sha256'][:16]}_{safe_name}"
save_path = output_path / save_name

# Сохранить файл
with open(save_path, "wb") as f:
f.write(data)

total_size += len(data)

metadata = {
"original_path": entry.path,
"saved_as": save_name,
"size": entry.size,
"extension": entry.extension,
"allocated": entry.is_allocated,
"mtime": entry.mtime,
"atime": entry.atime,
"ctime": entry.ctime,
"crtime": entry.crtime,
hashes,
}
extracted.append(metadata)

except Exception as e:
errors.append({"path": entry.path, "error": str(e)})
tqdm.write(f"[!] Ошибка при обработке {entry.path}: {e}")

# Сохранить манифест
manifest = {
"image": image_path,
"extraction_time": datetime.datetime.utcnow().isoformat(),
"total_files": len(extracted),
"total_size_bytes": total_size,
"errors": len(errors),
"files": extracted,
}

manifest_path = output_path / "manifest.json"
with open(manifest_path, "w", encoding="utf-8") as f:
json.dump(manifest, f, indent=2, ensure_ascii=False)

print(f"\n[+] Извлечено файлов: {len(extracted):,}")
print(f"[+] Общий размер: {total_size / (10242):.1f} MB")
print(f"[!] Ошибок: {len(errors)}")
print(f"[+] Манифест сохранён: {manifest_path}")

return extracted


<h2 id="importiruem-walk-filesystem-i-read-file-content-iz-predyduschego-modulya">Импортируем walk_filesystem и read_file_content из предыдущего модуля</h2>
from fs_walker import walk_filesystem, read_file_content


if __name__ == "__main__":
import sys
if len(sys.argv) < 3:
print(f"Использование: {sys.argv[0]} <образ> <выходная_директория> [смещение_раздела]")
sys.exit(1)

image = sys.argv[1]
output = sys.argv[2]
offset = int(sys.argv[3]) if len(sys.argv) > 3 else 0

results = extract_executables(image, output, offset, include_deleted=True)


#### Построение временно́й шкалы (timeline)

Временна́я шкала — один из важнейших артефактов forensic-анализа. Позволяет восстановить последовательность событий на системе.

python
#!/usr/bin/env python3
"""
timeline_builder.py — построение MAC(B) временно́й шкалы из образа диска
MACB = Modification, Access, Change, Birth (creation)
"""
import pytsk3
import csv
import datetime
from pathlib import Path
from dataclasses import dataclass
from typing import List, Optional
import sys


@dataclass
class TimelineEntry:
"""Одна запись в временно́й шкале."""
timestamp: datetime.datetime
macb: str # какой тип времени: M, A, C, B
fs_type: str # тип файловой системы
mode: str # права доступа
uid: int
gid: int
size: int
inode: int
file_type: str # FILE или DIR
filename: str
is_allocated: bool

def to_csv_row(self) -> list:
return [
self.timestamp.strftime("%Y-%m-%d %H:%M:%S"),
self.macb,
self.fs_type,
self.mode,
self.uid,
self.gid,
self.size,
self.inode,
self.file_type,
"Y" if self.is_allocated else "N",
self.filename,
]


def build_timeline(
fs: pytsk3.FS_Info,
output_csv: str,
path: str = "/",
include_deleted: bool = True,
) -> int:
"""
Построить временну́ю шкалу в формате CSV.

Returns:
Количество записей в шкале
"""
entries: List[TimelineEntry] = []

# Определить тип файловой системы
fs_type_map = {
pytsk3.TSK_FS_TYPE_NTFS: "NTFS",
pytsk3.TSK_FS_TYPE_FAT32: "FAT32",
pytsk3.TSK_FS_TYPE_EXT4: "ext4",
}
fs_name = fs_type_map.get(fs.info.ftype, "Unknown")

def process_entry(file_entry, full_path):
meta = file_entry.info.meta
if meta is None:
return

is_allocated = bool(meta.flags & pytsk3.TSK_FS_META_FLAG_ALLOC)
is_dir = (meta.type == pytsk3.TSK_FS_META_TYPE_DIR)
file_type = "DIR" if is_dir else "FILE"
mode = oct(meta.mode) if meta.mode else "0000"

timestamps = {
"M": meta.mtime, # Modification
"A": meta.atime, # Access
"C": meta.ctime, # Change (metadata)
"B": meta.crtime, # Birth (creation) — только NTFS
}

# Группировать одинаковые временные метки
ts_groups = {}
for ts_type, ts_value in timestamps.items():
if ts_value and ts_value > 0:
try:
dt = datetime.datetime.utcfromtimestamp(ts_value)
key = dt.strftime("%Y-%m-%d %H:%M:%S")
if key not in ts_groups:
ts_groups[key] = {"dt": dt, "types": []}
ts_groups[key]["types"].append(ts_type)
except (OSError, OverflowError):
pass

for ts_str, ts_data in ts_groups.items():
macb_str = "".join(
t if t in ts_data["types"] else "."
for t in "MACB"
)
entry = TimelineEntry(
timestamp=ts_data["dt"],
macb=macb_str,
fs_type=fs_name,
mode=mode,
uid=meta.uid or 0,
gid=meta.gid or 0,
size=meta.size or 0,
inode=meta.addr,
file_type=file_type,
filename=full_path,
is_allocated=is_allocated,
)
entries.append(entry)

# Обход файловой системы
print(f"[*] Построение временно́й шкалы из {fs_name}...")
for entry in walk_filesystem(fs, path, include_deleted=include_deleted):
try:
f = fs.open(entry.path)
process_entry(f, entry.path)
except OSError:
continue

# Сортировать по времени
entries.sort(key=lambda e: e.timestamp)

# Сохранить в CSV
output_path = Path(output_csv)
output_path.parent.mkdir(parents=True, exist_ok=True)

with open(output_path, "w", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
writer.writerow([
"Timestamp", "MACB", "FS", "Mode", "UID", "GID",
"Size", "Inode", "Type", "Allocated", "Filename"
])
for entry in entries:
writer.writerow(entry.to_csv_row())

print(f"[+] Временна́я шкала: {len(entries):,} записей → {output_csv}")
return len(entries)


from fs_walker import walk_filesystem




pefile: глубокий анализ PE-файлов Windows


PE-формат — основа всей экосистемы исполняемых файлов Windows. Глубокое понимание его структуры и умение анализировать её программно — ключевой навык forensic-аналитика и исследователя малвари.

#### Базовый анализ PE-файла

python
#!/usr/bin/env python3
"""
pe_analyzer.py — комплексный анализ PE-файла с pefile
"""
import pefile
import hashlib
import math
import datetime
import os
import sys
from pathlib import Path
from typing import Optional
import json


def calculate_entropy(data: bytes) -> float:
"""Вычислить энтропию Шеннона для блока данных."""
if not data:
return 0.0

# Подсчёт частоты каждого байта
byte_counts = [0] * 256
for byte in data:
byte_counts[byte] += 1

# Вычисление энтропии
entropy = 0.0
data_len = len(data)
for count in byte_counts:
if count == 0:
continue
probability = count / data_len
entropy -= probability * math.log2(probability)

return entropy


def analyze_pe(file_path: str) -> dict:
"""
Полный анализ PE-файла.

Returns:
Словарь с результатами анализа
"""
path = Path(file_path)
if not path.exists():
raise FileNotFoundError(f"Файл не найден: {file_path}")

# Читать файл
with open(file_path, "rb") as f:
raw_data = f.read()

result = {
"file": str(path),
"size": len(raw_data),
"hashes": {
"md5": hashlib.md5(raw_data).hexdigest(),
"sha1": hashlib.sha1(raw_data).hexdigest(),
"sha256": hashlib.sha256(raw_data).hexdigest(),
},
"analysis_time": datetime.datetime.utcnow().isoformat(),
"is_valid_pe": False,
"warnings": [],
"suspicious_indicators": [],
}

# Загрузить PE
try:
pe = pefile.PE(data=raw_data)
result["is_valid_pe"] = True
except pefile.PEFormatError as e:
result["errors"] = [f"Ошибка разбора PE: {e}"]
return result

# Заголовки
result["headers"] = analyze_headers(pe)

# Секции
result["sections"] = analyze_sections(pe)

# Импорты
result["imports"] = analyze_imports(pe)

# Экспорты
result["exports"] = analyze_exports(pe)

# Ресурсы
result["resources"] = analyze_resources(pe)

# Цифровая подпись
result["signature"] = analyze_signature(pe, raw_data)

# Строки
result["strings"] = extract_strings(raw_data)

# Индикаторы подозрительности
result["suspicious_indicators"] = detect_suspicious(pe, result)

# Предупреждения pefile
result["warnings"] = [str(w) for w in pe.get_warnings()]

pe.close()
return result


def analyze_headers(pe: pefile.PE) -> dict:
"""Анализ DOS и PE заголовков."""
headers = {}

# DOS заголовок
headers["dos"] = {
"e_magic": hex(pe.DOS_HEADER.e_magic),
"e_lfanew": hex(pe.DOS_HEADER.e_lfanew),
}

# Файловый заголовок
machine_map = {
0x014c: "x86 (i386)",
0x8664: "x86-64 (AMD64)",
0x01c0: "ARM",
0xaa64: "ARM64",
}
headers["file"] = {
"machine": machine_map.get(
pe.FILE_HEADER.Machine,
f"Unknown ({hex(pe.FILE_HEADER.Machine)})"
),
"number_of_sections": pe.FILE_HEADER.NumberOfSections,
"time_date_stamp": datetime.datetime.utcfromtimestamp(
pe.FILE_HEADER.TimeDateStamp
).isoformat() if pe.FILE_HEADER.TimeDateStamp < 231 else "Invalid",
"characteristics": hex(pe.FILE_HEADER.Characteristics),
"characteristics_flags": [],
}

# Флаги характеристик
char_flags = {
0x0002: "EXECUTABLE_IMAGE",
0x0020: "LARGE_ADDRESS_AWARE",
0x0100: "32BIT_MACHINE",
0x2000: "DLL",
}
for flag, name in char_flags.items():
if pe.FILE_HEADER.Characteristics & flag:
headers["file"]["characteristics_flags"].append(name)

# Optional Header
subsystem_map = {
1: "NATIVE", 2: "WINDOWS_GUI", 3: "WINDOWS_CUI",
7: "POSIX_CUI", 9: "WINDOWS_CE_GUI", 10: "EFI_APPLICATION",
14: "XBOX", 16: "WINDOWS_BOOT_APPLICATION",
}
headers["optional"] = {
"magic": "PE32+" if pe.OPTIONAL_HEADER.Magic == 0x20b else "PE32",
"entry_point": hex(pe.OPTIONAL_HEADER.AddressOfEntryPoint),
"image_base": hex(pe.OPTIONAL_HEADER.ImageBase),
"subsystem": subsystem_map.get(
pe.OPTIONAL_HEADER.Subsystem,
f"Unknown ({pe.OPTIONAL_HEADER.Subsystem})"
),
"dll_characteristics": hex(pe.OPTIONAL_HEADER.DllCharacteristics),
"checksum": hex(pe.OPTIONAL_HEADER.CheckSum),
"linker_version": f"{pe.OPTIONAL_HEADER.MajorLinkerVersion}."
f"{pe.OPTIONAL_HEADER.MinorLinkerVersion}",
"os_version": f"{pe.OPTIONAL_HEADER.MajorOperatingSystemVersion}."
f"{pe.OPTIONAL_HEADER.MinorOperatingSystemVersion}",
"size_of_image": pe.OPTIONAL_HEADER.SizeOfImage,
"size_of_headers": pe.OPTIONAL_HEADER.SizeOfHeaders,
}

return headers


def analyze_sections(pe: pefile.PE) -> list:
"""Анализ секций PE с расчётом энтропии."""
sections = []

for section in pe.sections:
try:
name = section.Name.decode("utf-8", errors="replace").rstrip("\x00")
except Exception:
name = "N/A"

data = section.get_data()
entropy = calculate_entropy(data)

# Флаги характеристик секции
char = section.Characteristics
flags = []
if char & 0x20: flags.append("CODE")
if char & 0x40: flags.append("INITIALIZED_DATA")
if char & 0x80: flags.append("UNINITIALIZED_DATA")
if char & 0x20000000: flags.append("EXECUTABLE")
if char & 0x40000000: flags.append("READABLE")
if char & 0x80000000: flags.append("WRITABLE")

sec_info = {
"name": name,
"virtual_address": hex(section.VirtualAddress),
"virtual_size": section.Misc_VirtualSize,
"raw_size": section.SizeOfRawData,
"entropy": round(entropy, 4),
"characteristics": hex(char),
"flags": flags,
"md5": hashlib.md5(data).hexdigest() if data else "",
}

# Признаки подозрительности секции
sec_info["suspicious"] = []
if entropy > 7.2:
sec_info["suspicious"].append(f"Высокая энтропия ({entropy:.2f}) — возможно упаковка/шифрование")
if entropy < 0.1 and section.SizeOfRawData > 0:
sec_info["suspicious"].append("Очень низкая энтропия — возможно нулевые данные")
if "CODE" in flags and "WRITABLE" in flags:
sec_info["suspicious"].append("Секция кода доступна для записи — самомодифицирующийся код?")
if name not in (".text", ".data", ".rdata", ".rsrc", ".reloc",
".bss", ".edata", ".idata", ".pdata", ".debug"):
sec_info["suspicious"].append(f"Нестандартное имя секции: '{name}'")

sections.append(sec_info)

return sections


def analyze_imports(pe: pefile.PE) -> dict:
"""Анализ таблицы импортов с выделением подозрительных функций."""

# Функции, часто используемые малварью
SUSPICIOUS_IMPORTS = {
"kernel32.dll": {
"VirtualAlloc", "VirtualAllocEx", "VirtualProtect",
"WriteProcessMemory", "ReadProcessMemory",
"CreateRemoteThread", "OpenProcess",
"LoadLibraryA", "LoadLibraryW", "LoadLibraryExA",
"GetProcAddress",
"CreateProcessA", "CreateProcessW",
"WinExec", "ShellExecuteA", "ShellExecuteW",
},
"ntdll.dll": {
"NtAllocateVirtualMemory", "NtWriteVirtualMemory",
"NtCreateThreadEx", "NtUnmapViewOfSection",
"RtlDecompressBuffer",
},
"advapi32.dll": {
"RegCreateKeyA", "RegCreateKeyW", "RegSetValueExA", "RegSetValueExW",
"OpenSCManagerA", "CreateServiceA", "StartServiceA",
"AdjustTokenPrivileges", "LookupPrivilegeValueA",
},
"wininet.dll": {
"InternetOpenA", "InternetConnectA",
"HttpOpenRequestA", "HttpSendRequestA",
"InternetReadFile",
},
"ws2_32.dll": {
"WSAStartup", "socket", "connect", "send", "recv",
"bind", "listen", "accept",
},
"crypt32.dll": {
"CryptDecrypt", "CryptEncrypt", "CryptImportKey",
},
}

imports = {"dlls": [], "suspicious": []}

if not hasattr(pe, "DIRECTORY_ENTRY_IMPORT"):
imports["note"] = "Нет таблицы импортов (packed?)"
return imports

for entry in pe.DIRECTORY_ENTRY_IMPORT:
try:
dll_name = entry.dll.decode("utf-8", errors="replace").lower()
except Exception:
dll_name = "unknown"

functions = []
for imp in entry.imports:
if imp.name:
try:
func_name = imp.name.decode("utf-8", errors="replace")
except Exception:
func_name = f"ordinal_{imp.ordinal}"
else:
func_name = f"ordinal_{imp.ordinal}"

functions.append({
"name": func_name,
"address": hex(imp.address) if imp.address else "N/A",
"ordinal": imp.ordinal,
})

# Проверить на подозрительные импорты
for sus_dll, sus_funcs in SUSPICIOUS_IMPORTS.items():
if sus_dll in dll_name and func_name in sus_funcs:
imports["suspicious"].append({
"dll": dll_name,
"function": func_name,
"reason": f"Подозрительная функция из {sus_dll}",
})

imports["dlls"].append({
"dll": dll_name,
"function_count": len(functions),
"functions": functions,
})

return imports


def analyze_exports(pe: pefile.PE) -> dict:
"""Анализ таблицы экспортов."""
exports = {"functions": []}

if not hasattr(pe, "DIRECTORY_ENTRY_EXPORT"):
return exports

try:
exports["dll_name"] = pe.DIRECTORY_ENTRY_EXPORT.name.decode(
"utf-8", errors="replace"
)
except Exception:
exports["dll_name"] = "N/A"

for exp in pe.DIRECTORY_ENTRY_EXPORT.symbols:
func = {
"ordinal": exp.ordinal,
"address": hex(exp.address) if exp.address else "N/A",
"name": exp.name.decode("utf-8", errors="replace") if exp.name else f"ordinal_{exp.ordinal}",
}
exports["functions"].append(func)

exports["count"] = len(exports["functions"])
return exports


def analyze_resources(pe: pefile.PE) -> list:
"""Анализ ресурсной секции."""
resources = []

if not hasattr(pe, "DIRECTORY_ENTRY_RESOURCE"):
return resources

RESOURCE_TYPES = {
1: "CURSOR", 2: "BITMAP", 3: "ICON", 4: "MENU",
5: "DIALOG", 6: "STRING", 7: "FONTDIR", 8: "FONT",
9: "ACCELERATOR", 10: "RCDATA", 11: "MESSAGETABLE",
12: "GROUP_CURSOR", 14: "GROUP_ICON", 16: "VERSION",
17: "DLGINCLUDE", 19: "PLUGPLAY", 20: "VXD",
21: "ANICURSOR", 22: "ANIICON", 23: "HTML",
24: "MANIFEST",
}

for res_type in pe.DIRECTORY_ENTRY_RESOURCE.entries:
try:
res_type_name = RESOURCE_TYPES.get(
res_type.id,
res_type.name.string.decode() if res_type.name else f"Type_{res_type.id}"
)
except Exception:
res_type_name = f"Type_{res_type.id}"

if hasattr(res_type, "directory"):
for res_id in res_type.directory.entries:
if hasattr(res_id, "directory"):
for res_lang in res_id.directory.entries:
try:
data = pe.get_data(
res_lang.data.struct.OffsetToData,
res_lang.data.struct.Size
)
entropy = calculate_entropy(data)
resources.append({
"type": res_type_name,
"size": res_lang.data.struct.Size,
"entropy": round(entropy, 4),
"md5": hashlib.md5(data).hexdigest(),
"suspicious": entropy > 7.0,
})
except Exception:
pass

return resources


def analyze_signature(pe: pefile.PE, raw_data: bytes) -> dict:
"""Проверка цифровой подписи Authenticode."""
sig_info = {"present": False, "valid": None}

try:
security_dir = pe.OPTIONAL_HEADER.DATA_DIRECTORY[
pefile.DIRECTORY_ENTRY["IMAGE_DIRECTORY_ENTRY_SECURITY"]
]
if security_dir.VirtualAddress != 0 and security_dir.Size != 0:
sig_info["present"] = True
sig_info["offset"] = hex(security_dir.VirtualAddress)
sig_info["size"] = security_dir.Size
sig_info["note"] = "Присутствует (требует проверки через OpenSSL/pyopenssl)"
except Exception:
pass

return sig_info


def extract_strings(data: bytes, min_length: int = 6) -> dict:
"""Извлечь ASCII и Unicode строки из бинарного файла."""
import re

# ASCII строки
ascii_pattern = re.compile(rb"[\x20-\x7e]{" + str(min_length).encode() + rb",}")
ascii_strings = [m.group().decode("ascii") for m in ascii_pattern.finditer(data)]

# Unicode (UTF-16LE) строки
unicode_pattern = re.compile(
rb"(?:[\x20-\x7e]\x00){" + str(min_length).encode() + rb",}"
)
unicode_strings = []
for m in unicode_pattern.finditer(data):
try:
decoded = m.group().decode("utf-16-le")
if len(decoded) >= min_length:
unicode_strings.append(decoded)
except Exception:
pass

# Фильтрация интересных строк
INTERESTING_PATTERNS = [
r"https?://[^\s]+", # URLs
r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}", # IP адреса
r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}", # Email
r"HKEY_[A-Z_]+\\[^\x00]+", # Ключи реестра
r"cmd\.exe|powershell|wscript|cscript", # Shell команды
r"\\\\[a-zA-Z0-9_-]+\\[^\s]+", # UNC пути
r"[A-Fa-f0-9]{32,}", # Hex строки (ключи?)
]

import re as re_module
interesting = []
all_strings = set(ascii_strings + unicode_strings)
for pattern in INTERESTING_PATTERNS:
for s in all_strings:
if re_module.search(pattern, s, re_module.IGNORECASE):
interesting.append(s)

return {
"ascii_count": len(ascii_strings),
"unicode_count": len(unicode_strings),
"interesting": list(set(interesting))[:100], # Топ-100
"sample_ascii": ascii_strings[:20],
}


def detect_suspicious(pe: pefile.PE, analysis: dict) -> list:
"""Детектировать подозрительные признаки в PE-файле."""
indicators = []

# 1. Проверка временно́й метки компиляции
try:
ts = pe.FILE_HEADER.TimeDateStamp
if ts == 0:
indicators.append({
"severity": "HIGH",
"type": "HEADER_ANOMALY",
"description": "Нулевая временна́я метка компиляции — возможная зачистка артефактов",
})
elif ts > 2147483647: # Дата в будущем
indicators.append({
"severity": "MEDIUM",
"type": "HEADER_ANOMALY",
"description": f"Временна́я метка в будущем: {hex(ts)}",
})
except Exception:
pass

# 2. Проверка контрольной суммы
try:
if pe.OPTIONAL_HEADER.CheckSum != 0:
calculated = pe.generate_checksum()
if calculated != pe.OPTIONAL_HEADER.CheckSum:
indicators.append({
"severity": "MEDIUM",
"type": "CHECKSUM_MISMATCH",
"description": f"Несоответствие контрольной суммы: "
f"заявлена {hex(pe.OPTIONAL_HEADER.CheckSum)}, "
f"вычислена {hex(calculated)}",
})
except Exception:
pass

# 3. Проверка точки входа
try:
ep = pe.OPTIONAL_HEADER.AddressOfEntryPoint
ep_section = None
for section in pe.sections:
va = section.VirtualAddress
vs = section.Misc_VirtualSize
if va <= ep < va + vs:
ep_section = section
break

if ep_section is None:
indicators.append({
"severity": "HIGH",
"type": "ENTRY_POINT_ANOMALY",
"description": "Точка входа не находится ни в одной секции",
})
else:
sec_name = ep_section.Name.decode("utf-8", errors="replace").rstrip("\x00")
if sec_name not in (".text", "CODE", ".code"):
indicators.append({
"severity": "MEDIUM",
"type": "ENTRY_POINT_ANOMALY",
"description": f"Точка входа в нестандартной секции: '{sec_name}'",
})
except Exception:
pass

# 4. Высокая энтропия секций
for sec in analysis.get("sections", []):
if sec.get("entropy", 0) > 7.2:
indicators.append({
"severity": "HIGH",
"type": "HIGH_ENTROPY",
"description": f"Секция '{sec['name']}' имеет высокую энтропию "
f"({sec['entropy']}) — возможна упаковка/шифрование",
})

# 5. Подозрительные импорты
suspicious_imports = analysis.get("imports", {}).get("suspicious", [])
for imp in suspicious_imports:
indicators.append({
"severity": "MEDIUM",
"type": "SUSPICIOUS_IMPORT",
"description": f"Подозрительный импорт: {imp['dll']}!{imp['function']}",
})

# 6. Комбинация VirtualAlloc + WriteProcessMemory + CreateRemoteThread
all_funcs = set()
for dll_entry in analysis.get("imports", {}).get("dlls", []):
for func in dll_entry.get("functions", []):
all_funcs.add(func["name"])

injection_combo = {
"VirtualAllocEx", "WriteProcessMemory", "CreateRemoteThread"
}
if injection_combo.issubset(all_funcs):
indicators.append({
"severity": "CRITICAL",
"type": "PROCESS_INJECTION",
"description": "Классическая тройка для инжекции: VirtualAllocEx + "
"WriteProcessMemory + CreateRemoteThread",
})

# 7. Нет импортов (packed)
if not analysis.get("imports", {}).get("dlls"):
indicators.append({
"severity": "HIGH",
"type": "NO_IMPORTS",
"description": "Файл не имеет таблицы импортов — вероятно упакован",
})

# 8. Подозрительные ресурсы с высокой энтропией
for res in analysis.get("resources", []):
if res.get("suspicious") and res.get("size", 0) > 10000:
indicators.append({
"severity": "MEDIUM",
"type": "SUSPICIOUS_RESOURCE",
"description": f"Ресурс типа {res['type']} имеет высокую энтропию "
f"({res['entropy']}) и большой размер ({res['size']} байт)",
})

# 9. Оверлей (данные после PE)
try:
overlay = pe.get_overlay()
if overlay and len(overlay) > 1024:
overlay_entropy = calculate_entropy(overlay)
indicators.append({
"severity": "MEDIUM",
"type": "PE_OVERLAY",
"description": f"PE-файл имеет оверлей {len(overlay):,} байт, "
f"энтропия: {overlay_entropy:.2f}",
})
except Exception:
pass

return indicators




Автоматическая классификация подозрительных PE-файлов


Имея базовый PE-анализатор, построим систему оценки риска для массовой классификации файлов.

python
#!/usr/bin/env python3
"""
pe_classifier.py — система оценки риска для массового анализа PE-файлов
"""
import os
import sys
import json
import concurrent.futures
from pathlib import Path
from typing import List, Tuple
from tqdm import tqdm
import csv
import datetime

<h2 id="importiruem-nash-analizator">Импортируем наш анализатор</h2>
from pe_analyzer import analyze_pe


<h2 id="vesovye-koeffitsienty-dlya-otsenki-riska">Весовые коэффициенты для оценки риска</h2>
RISK_WEIGHTS = {
"CRITICAL": 40,
"HIGH": 20,
"MEDIUM": 10,
"LOW": 5,
}

<h2 id="porogi-klassifikatsii">Пороги классификации</h2>
RISK_THRESHOLDS = {
"BENIGN": (0, 20),
"SUSPICIOUS": (20, 50),
"LIKELY_MALICIOUS": (50, 80),
"MALICIOUS": (80, float("inf")),
}


def calculate_risk_score(analysis: dict) -> Tuple[int, str]:
"""
Вычислить интегральную оценку риска.

Returns:
(score, classification)
"""
score = 0
indicators = analysis.get("suspicious_indicators", [])

for indicator in indicators:
severity = indicator.get("severity", "LOW")
score += RISK_WEIGHTS.get(severity, 0)

# Дополнительные факторы
# Нет цифровой подписи (не признак малвари, но увеличивает риск)
if not analysis.get("signature", {}).get("present", False):
score += 5

# Предупреждения pefile
if len(analysis.get("warnings", [])) > 3:
score += 10

# Определить классификацию
classification = "BENIGN"
for cls, (low, high) in RISK_THRESHOLDS.items():
if low <= score < high:
classification = cls
break

return score, classification


def analyze_file_worker(args: tuple) -> dict:
"""Воркер для параллельного анализа."""
file_path, output_dir = args
try:
analysis = analyze_pe(str(file_path))
score, classification = calculate_risk_score(analysis)
analysis["risk_score"] = score
analysis["classification"] = classification

# Сохранить детальный JSON если подозрительный
if classification in ("LIKELY_MALICIOUS", "MALICIOUS", "SUSPICIOUS"):
output_file = Path(output_dir) / f"{file_path.stem}_{score}_{classification}.json"
with open(output_file, "w", encoding="utf-8") as f:
json.dump(analysis, f, indent=2, ensure_ascii=False)

return {
"file": str(file_path),
"size": analysis.get("size", 0),
"sha256": analysis.get("hashes", {}).get("sha256", ""),
"risk_score": score,
"classification": classification,
"indicators_count": len(analysis.get("suspicious_indicators", [])),
"indicators": [
i["description"] for i in analysis.get("suspicious_indicators", [])
],
"error": None,
}
except Exception as e:
return {
"file": str(file_path),
"size": 0,
"sha256": "",
"risk_score": -1,
"classification": "ERROR",
"indicators_count": 0,
"indicators": [],
"error": str(e),
}


def batch_analyze(
input_dir: str,
output_dir: str,
max_workers: int = 4,
extensions: set = None,
) -> List[dict]:
"""
Массовый анализ всех PE-файлов в директории.

Args:
input_dir: директория с файлами для анализа
output_dir: директория для результатов
max_workers: число параллельных воркеров
extensions: расширения файлов для анализа

Returns:
Список результатов анализа
"""
if extensions is None:
extensions = {".exe", ".dll", ".sys", ".ocx", ".scr"}

input_path = Path(input_dir)
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)

# Собрать все файлы
files = [
f for f in input_path.rglob("*")
if f.is_file() and f.suffix.lower() in extensions
]
print(f"[*] Найдено файлов для анализа: {len(files):,}")

if not files:
print("[!] Файлы не найдены")
return []

# Параллельный анализ
results = []
args = [(f, str(output_path)) for f in files]

with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
futures = {executor.submit(analyze_file_worker, arg): arg for arg in args}

with tqdm(total=len(files), desc="Анализ PE-файлов", unit="файл") as pbar:
for future in concurrent.futures.as_completed(futures):
result = future.result()
results.append(result)
pbar.update(1)

# Показывать подозрительные файлы в реальном времени
cls = result.get("classification", "")
if cls in ("MALICIOUS", "LIKELY_MALICIOUS"):
tqdm.write(
f"[!] {cls} ({result['risk_score']}): {result['file']}"
)

# Сортировать по убыванию риска
results.sort(key=lambda r: r["risk_score"], reverse=True)

# Сохранить сводный CSV-отчёт
csv_path = output_path / f"batch_analysis_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
with open(csv_path, "w", newline="", encoding="utf-8") as f:
fieldnames = ["file", "size", "sha256", "risk_score", "classification",
"indicators_count", "indicators", "error"]
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
for r in results:
r_copy = r.copy()
r_copy["indicators"] = "; ".join(r_copy.get("indicators", []))
writer.writerow(r_copy)

# Статистика
stats = {}
for cls in RISK_THRESHOLDS:
stats[cls] = sum(1 for r in results if r["classification"] == cls)
stats["ERROR"] = sum(1 for r in results if r["classification"] == "ERROR")

print(f"\n{'='*50}")
print(f"РЕЗУЛЬТАТЫ МАССОВОГО АНАЛИЗА")
print(f"{'='*50}")
print(f"Всего файлов: {len(results):>8,}")
print(f"MALICIOUS: {stats.get('MALICIOUS', 0):>8,}")
print(f"LIKELY_MALICIOUS: {stats.get('LIKELY_MALICIOUS', 0):>8,}")
print(f"SUSPICIOUS: {stats.get('SUSPICIOUS', 0):>8,}")
print(f"BENIGN: {stats.get('BENIGN', 0):>8,}")
print(f"ERROR: {stats.get('ERROR', 0):>8,}")
print(f"{'='*50}")
print(f"[+] Отчёт: {csv_path}")

return results


if __name__ == "__main__":
if len(sys.argv) < 3:
print(f"Использование: {sys.argv[0]} <входная_директория> <выходная_директория> [воркеры]")
sys.exit(1)

workers = int(sys.argv[3]) if len(sys.argv) > 3 else 4
batch_analyze(sys.argv[1], sys.argv[2], max_workers=workers)




pyew: статический анализ и дизассемблирование малвари


Для задач дизассемблирования и статического анализа кода в 2026 году наиболее практичен подход с использованием capstone в сочетании с pefile. Рассмотрим современный подход, отражающий реальную практику.

python
#!/usr/bin/env python3
"""
disasm_analyzer.py — дизассемблирование и статический анализ кода PE-файлов
Использует capstone (современная альтернатива pyew)
"""
import pefile
import capstone
import struct
from pathlib import Path
from typing import List, Optional, Tuple
import re


class StaticCodeAnalyzer:
"""
Статический анализатор кода исполняемых файлов.
Комбинирует pefile для структурного анализа и capstone для дизассемблирования.
"""

def __init__(self, file_path: str):
self.file_path = Path(file_path)
self.pe = None
self.raw_data = None
self.md = None # capstone дизассемблер
self._load()

def _load(self):
"""Загрузить PE-файл и инициализировать дизассемблер."""
with open(self.file_path, "rb") as f:
self.raw_data = f.read()

self.pe = pefile.PE(data=self.raw_data)

# Определить архитектуру
if self.pe.FILE_HEADER.Machine == 0x014c: # x86
self.md = capstone.Cs(capstone.CS_ARCH_X86, capstone.CS_MODE_32)
self.arch = "x86"
elif self.pe.FILE_HEADER.Machine == 0x8664: # x64
self.md = capstone.Cs(capstone.CS_ARCH_X86, capstone.CS_MODE_64)
self.arch = "x86_64"
else:
raise ValueError(f"Неподдерживаемая архитектура: {hex(self.pe.FILE_HEADER.Machine)}")

# Включить детализированный вывод capstone
self.md.detail = True

print(f"[+] Загружен: {self.file_path.name} ({self.arch})")

def get_entry_point_code(self, size: int = 256) -> List[str]:
"""
Дизассемблировать код вокруг точки входа.

Args:
size: количество байт для дизассемблирования

Returns:
Список строк дизассемблера
"""
ep_rva = self.pe.OPTIONAL_HEADER.AddressOfEntryPoint
ep_offset = self.pe.get_offset_from_rva(ep_rva)
image_base = self.pe.OPTIONAL_HEADER.ImageBase

code = self.raw_data[ep_offset: ep_offset + size]
result = []

for insn in self.md.disasm(code, image_base + ep_rva):
line = (
f"0x{insn.address:08x}: "
f"{insn.bytes.hex():<20} "
f"{insn.mnemonic:<12} {insn.op_str}"
)
result.append(line)

return result

def find_suspicious_patterns(self) -> List[dict]:
"""
Найти подозрительные паттерны кода в секции .text.
Обнаруживает: shellcode-паттерны, antidebug, crypto constants.
"""
findings = []

# Известные константы криптографических алгоритмов
CRYPTO_CONSTANTS = {
b"\x67\x45\x23\x01": "SHA-1/MD5 константа инициализации",
b"\x6a\x09\xe6\x67": "SHA-256 инициализирующая константа",
b"\x52\x98\x53\x01": "RC4 инициализация",
b"\x63\x7c\x77\x7b": "AES S-box",
}

# Паттерны антиотладки
ANTIDEBUG_PATTERNS = {
b"\x64\xa1\x30\x00\x00\x00": "PEB access (антиотладка — проверка IsDebugged)",
b"\x64\xa1\x18\x00\x00\x00": "PEB.Ldr access",
b"\xcc": "INT3 (breakpoint — возможная антиотладка)",
}

# Паттерны shellcode (GetPC call, egg hunters)
SHELLCODE_PATTERNS = {
b"\xe8\x00\x00\x00\x00\x5b": "GetPC через call/pop (shellcode паттерн)",
b"\xe8\x00\x00\x00\x00\x58": "GetPC через call/pop EAX",
b"\x54\x59\x49\x74\x04\x49\x74": "Egg hunter паттерн",
}

for section in self.pe.sections:
try:
sec_name = section.Name.decode("utf-8", errors="replace").rstrip("\x00")
sec_data = section.get_data()
except Exception:
continue

for pattern_dict, category in [
(CRYPTO_CONSTANTS, "CRYPTO"),
(ANTIDEBUG_PATTERNS, "ANTIDEBUG"),
(SHELLCODE_PATTERNS, "SHELLCODE"),
]:
for pattern, description in pattern_dict.items():
offset = 0
while True:
pos = sec_data.find(pattern, offset)
if pos == -1:
break

va = section.VirtualAddress + pos
findings.append({
"category": category,
"description": description,
"section": sec_name,
"offset_in_section": hex(pos),
"virtual_address": hex(va + self.pe.OPTIONAL_HEADER.ImageBase),
"bytes": pattern.hex(),
})
offset = pos + 1

return findings

def find_api_calls(self) -> List[dict]:
"""
Найти вызовы API через анализ инструкций CALL.
Полезно для понимания поведения без запуска.
"""
api_calls = []
iat = self._build_iat()

for section in self.pe.sections:
try:
char = section.Characteristics
if not (char & 0x20000000): # только исполняемые секции
continue

sec_data = section.get_data()
sec_va = section.VirtualAddress + self.pe.OPTIONAL_HEADER.ImageBase

for insn in self.md.disasm(sec_data, sec_va):
if insn.mnemonic in ("call", "jmp"):
# Попробовать разрешить адрес в имя API
op_str = insn.op_str.strip()
if op_str.startswith("0x"):
try:
target_addr = int(op_str, 16)
if target_addr in iat:
api_calls.append({
"address": hex(insn.address),
"instruction": f"{insn.mnemonic} {op_str}",
"target": iat[target_addr],
})
except ValueError:
pass
except Exception:
continue

return api_calls

def _build_iat(self) -> dict:
"""Построить словарь адрес→имя_функции из таблицы импортов."""
iat = {}
if not hasattr(self.pe, "DIRECTORY_ENTRY_IMPORT"):
return iat

for entry in self.pe.DIRECTORY_ENTRY_IMPORT:
for imp in entry.imports:
if imp.address and imp.name:
try:
func_name = imp.name.decode("utf-8", errors="replace")
dll_name = entry.dll.decode("utf-8", errors="replace")
iat[imp.address] = f"{dll_name}!{func_name}"
except Exception:
pass
return iat

def extract_network_indicators(self) -> dict:
"""
Извлечь сетевые индикаторы (IP, домены, URL) из строк файла.
"""
import re

with open(self.file_path, "rb") as f:
data = f.read()

# Извлечь все ASCII строки
ascii_strings = re.findall(rb"[\x20-\x7e]{6,}", data)
strings = [s.decode("ascii", errors="replace") for s in ascii_strings]

indicators = {
"urls": [],
"ips": [],
"domains": [],
"emails": [],
}

url_pattern = re.compile(r"https?://[^\s'\"\x00]{4,}")
ip_pattern = re.compile(r"\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b")
domain_pattern = re.compile(
r"\b(?:<a href="?:[a-z0-9-]{0,61}[a-z0-9]">a-z0-9</a>?\.)+[a-z]{2,6}\b",
re.IGNORECASE
)

for s in strings:
for url in url_pattern.findall(s):
if url not in indicators["urls"]:
indicators["urls"].append(url)

for ip in ip_pattern.findall(s):
# Фильтровать очевидно нереальные IP
parts = ip.split(".")
if all(0 <= int(p) <= 255 for p in parts):
if ip not in ("127.0.0.1", "0.0.0.0", "255.255.255.255"):
if ip not in indicators["ips"]:
indicators["ips"].append(ip)

return indicators

def generate_report(self) -> dict:
"""Сгенерировать полный отчёт статического анализа."""
print(f"[*] Анализ: {self.file_path.name}")

report = {
"file": str(self.file_path),
"arch": self.arch,
"entry_point_disasm": self.get_entry_point_code(128),
"suspicious_patterns": self.find_suspicious_patterns(),
"network_indicators": self.extract_network_indicators(),
"api_calls": self.find_api_calls()[:50],
}

print(f"[+] Подозрительных паттернов: {len(report['suspicious_patterns'])}")
print(f"[+] Сетевых индикаторов: "
f"{len(report['network_indicators']['urls'])} URL, "
f"{len(report['network_indicators']['ips'])} IP")

return report


if __name__ == "__main__":
import sys, json
if len(sys.argv) < 2:
print(f"Использование: {sys.argv[0]} <pe_файл>")
sys.exit(1)

analyzer = StaticCodeAnalyzer(sys.argv[1])
report = analyzer.generate_report()

print("\n=== ТОЧКА ВХОДА (первые 128 байт) ===")
for line in report["entry_point_disasm"]:
print(line)

print("\n=== ПОДОЗРИТЕЛЬНЫЕ ПАТТЕРНЫ ===")
for p in report["suspicious_patterns"]:
print(f"[{p['category']}] {p['description']} @ {p['virtual_address']}")

print("\n=== СЕТЕВЫЕ ИНДИКАТОРЫ ===")
for url in report["network_indicators"]["urls"]:
print(f" URL: {url}")
for ip in report["network_indicators"]["ips"]:
print(f" IP: {ip}")




Интеграция с YARA: автоматическое сигнатурное сканирование


YARA — стандарт де-факто для описания и обнаружения малвари через сигнатуры. Интеграция YARA с нашим pipeline позволяет мгновенно классифицировать файлы по известным семействам малвари.

python
#!/usr/bin/env python3
"""
yara_scanner.py — массовое YARA-сканирование с детализированными результатами
"""
import yara
import os
import json
import hashlib
from pathlib import Path
from typing import Optional, List
from tqdm import tqdm
import concurrent.futures
import datetime


def compile_rules(rules_dir: str) -> yara.Rules:
"""
Скомпилировать все YARA-правила из директории.

Returns:
Скомпилированные правила
"""
rules_path = Path(rules_dir)
rule_files = {}

for yara_file in rules_path.rglob("*.yar"):
# Использовать имя файла как namespace
namespace = str(yara_file.relative_to(rules_path)).replace("/", "_").rstrip(".yar")
rule_files[namespace] = str(yara_file)

for yara_file in rules_path.rglob("*.yara"):
namespace = str(yara_file.relative_to(rules_path)).replace("/", "_").rstrip(".yara")
rule_files[namespace] = str(yara_file)

if not rule_files:
raise ValueError(f"YARA-правила не найдены в {rules_dir}")

print(f"[+] Компиляция {len(rule_files)} файлов с YARA-правилами...")

try:
rules = yara.compile(filepaths=rule_files)
print(f"[+] Правила скомпилированы успешно")
return rules
except yara.SyntaxError as e:
raise RuntimeError(f"Синтаксическая ошибка в YARA-правилах: {e}")


def scan_file(
file_path: str,
rules: yara.Rules,
timeout: int = 60,
) -> dict:
"""
Сканировать файл YARA-правилами.

Returns:
Словарь с совпадениями
"""
path = Path(file_path)
result = {
"file": str(path),
"sha256": "",
"matches": [],
"error": None,
"scan_time": datetime.datetime.utcnow().isoformat(),
}

try:
with open(path, "rb") as f:
data = f.read()

result["sha256"] = hashlib.sha256(data).hexdigest()
result["size"] = len(data)

# Сканировать файл
matches = rules.match(data=data, timeout=timeout)

for match in matches:
match_info = {
"rule": match.rule,
"namespace": match.namespace,
"tags": list(match.tags),
"meta": dict(match.meta),
"strings": [],
}

# Детали совпавших строк
for string_match in match.strings:
for instance in string_match.instances:
match_info["strings"].append({
"identifier": string_match.identifier,
"offset": instance.offset,
"matched_data": instance.matched_data[:64].hex(),
})

result["matches"].append(match_info)

except yara.TimeoutError:
result["error"] = f"Таймаут сканирования ({timeout}s)"
except Exception as e:
result["error"] = str(e)

return result


def batch_yara_scan(
input_dir: str,
rules_dir: str,
output_file: str,
max_workers: int = 4,
extensions: Optional[set] = None,
) -> List[dict]:
"""Массовое YARA-сканирование директории."""

if extensions is None:
extensions = {".exe", ".dll", ".sys", ".bin", ".dat"}

# Компилировать правила один раз
rules = compile_rules(rules_dir)

# Собрать файлы
files = [
f for f in Path(input_dir).rglob("*")
if f.is_file() and f.suffix.lower() in extensions
]
print(f"[*] Файлов для сканирования: {len(files):,}")

results = []
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {
executor.submit(scan_file, str(f), rules): f
for f in files
}

with tqdm(total=len(files), desc="YARA сканирование", unit="файл") as pbar:
for future in concurrent.futures.as_completed(futures):
result = future.result()
results.append(result)
pbar.update(1)

if result["matches"]:
rules_hit = [m["rule"] for m in result["matches"]]
tqdm.write(
f"[HIT] {Path(result['file']).name}: "
f"{', '.join(rules_hit)}"
)

# Статистика
total_hits = sum(1 for r in results if r["matches"])
unique_rules = set()
for r in results:
for m in r["matches"]:
unique_rules.add(m["rule"])

print(f"\n[+] Файлов с совпадениями: {total_hits}/{len(results)}")
print(f"[+] Уникальных сработавших правил: {len(unique_rules)}")

# Сохранить результаты
with open(output_file, "w", encoding="utf-8") as f:
json.dump({
"scan_time": datetime.datetime.utcnow().isoformat(),
"total_files": len(results),
"files_with_hits": total_hits,
"unique_rules_triggered": list(unique_rules),
"results": [r for r in results if r["matches"] or r["error"]],
}, f, indent=2, ensure_ascii=False)

print(f"[+] Результаты сохранены: {output_file}")
return results


<h2 id="primer-bazovogo-yara-pravila-dlya-demonstratsii">Пример базового YARA-правила для демонстрации</h2>
EXAMPLE_YARA_RULE = """
rule Suspicious_PE_Packer {
meta:
description = "Признаки упакованного PE-файла"
author = "ForensicLab"
date = "2026-01-01"
severity = "MEDIUM"

strings:
$upx = "UPX0" ascii
$upx1 = "UPX!" ascii
$nsis = "NSIS" ascii
$aspack = "ASPack" ascii
$mpress = "MPRESS" ascii

condition:
uint16(0) == 0x5A4D and // MZ header
(
any of ($upx*) or
$nsis or
$aspack or
$mpress
)
}

rule Possible_C2_Communication {
meta:
description = "Возможные паттерны C2-коммуникации"
severity = "HIGH"

strings:
$ua1 = "Mozilla/4.0 (compatible; MSIE" ascii nocase
$ua2 = "Mozilla/5.0" ascii nocase
$http = "HTTP/1." ascii
$post = "POST /" ascii
$get = "GET /" ascii
$cmd = "cmd.exe" ascii nocase
$ps = "powershell" ascii nocase

condition:
uint16(0) == 0x5A4D and
(
($ua1 or $ua2) and
($http or $post or $get) and
($cmd or $ps)
)
}
"""




Интеграция с VirusTotal API и MISP


Автоматическая проверка хэшей по VirusTotal и обогащение данных из MISP значительно ускоряют процесс классификации.

python
#!/usr/bin/env python3
"""
threat_intel.py — интеграция с VirusTotal и MISP для обогащения данных
"""
import vt # pip install vt-py
from pymisp import PyMISP # pip install pymisp
import requests
import json
import time
import hashlib
from pathlib import Path
from typing import Optional
import os


class VirusTotalClient:
"""Клиент VirusTotal API v3."""

def __init__(self, api_key: str):
self.client = vt.Client(api_key)
self.rate_limit_delay = 15 # секунды между запросами (free tier: 4/мин)

def check_hash(self, sha256: str) -> dict:
"""Проверить хэш файла в VirusTotal."""
try:
file_obj = self.client.get_object(f"/files/{sha256}")
stats = file_obj.last_analysis_stats

result = {
"sha256": sha256,
"found": True,
"malicious": stats.get("malicious", 0),
"suspicious": stats.get("suspicious", 0),
"undetected": stats.get("undetected", 0),
"total_engines": sum(stats.values()),
"detection_ratio": f"{stats.get('malicious', 0)}/{sum(stats.values())}",
"meaningful_name": getattr(file_obj, "meaningful_name", ""),
"type_description": getattr(file_obj, "type_description", ""),
"first_submission": str(getattr(file_obj, "first_submission_date", "")),
"last_analysis_date": str(getattr(file_obj, "last_analysis_date", "")),
"tags": list(getattr(file_obj, "tags", [])),
"popular_threat_classification": {},
}

# Классификация угрозы
ptc = getattr(file_obj, "popular_threat_classification", None)
if ptc:
result["popular_threat_classification"] = {
"suggested_threat_label": getattr(ptc, "suggested_threat_label", ""),
"popular_threat_category": [
c.value for c in getattr(ptc, "popular_threat_category", [])
],
}

return result

except vt.error.APIError as e:
if e.code == "NotFoundError":
return {"sha256": sha256, "found": False, "error": "Not in VT"}
return {"sha256": sha256, "found": False, "error": str(e)}
except Exception as e:
return {"sha256": sha256, "found": False, "error": str(e)}

def batch_check_hashes(
self, hashes: list, cache_file: Optional[str] = None
) -> dict:
"""
Массовая проверка хэшей с кэшированием результатов.

Returns:
Словарь sha256 -> результат
"""
# Загрузить кэш если есть
cache = {}
if cache_file and Path(cache_file).exists():
with open(cache_file, "r") as f:
cache = json.load(f)
print(f"[+] Загружен кэш VT: {len(cache)} записей")

results = {}
to_check = [h for h in hashes if h not in cache]
print(f"[*] Хэшей для проверки в VT: {len(to_check)} (кэшировано: {len(hashes) - len(to_check)})")

for i, sha256 in enumerate(to_check, 1):
print(f"[*] VT [{i}/{len(to_check)}]: {sha256[:16]}...")
result = self.check_hash(sha256)
cache[sha256] = result
results[sha256] = result

# Показать результат сразу
if result.get("found"):
det = result.get("malicious", 0)
total = result.get("total_engines", 0)
if det > 0:
print(f" [!] DETECTED: {det}/{total} - {result.get('meaningful_name', '')}")

# Rate limiting
if i < len(to_check):
time.sleep(self.rate_limit_delay)

# Сохранить кэш
if cache_file:
with open(cache_file, "w") as f:
json.dump(cache, f, indent=2, default=str)

# Добавить кэшированные результаты
for h in hashes:
if h in cache:
results[h] = cache[h]

return results

def close(self):
self.client.close()


class MISPClient:
"""Клиент MISP для обогащения данных."""

def __init__(self, url: str, api_key: str, verify_ssl: bool = True):
self.misp = PyMISP(url, api_key, ssl=verify_ssl)

def search_hash(self, sha256: str) -> list:
"""Найти события в MISP по хэшу."""
try:
result = self.misp.search(value=sha256, type_attribute="sha256")
events = []
for event in result:
events.append({
"event_id": event.get("Event", {}).get("id"),
"info": event.get("Event", {}).get("info", ""),
"date": event.get("Event", {}).get("date", ""),
"threat_level": event.get("Event", {}).get("threat_level_id"),
"tags": [
t.get("Tag", {}).get("name", "")
for t in event.get("Event", {}).get("Tag", [])
],
})
return events
except Exception as e:
return [{"error": str(e)}]

def add_event_from_analysis(
self,
analysis: dict,
event_title: str,
threat_level: int = 2, # 1=High, 2=Medium, 3=Low, 4=Undefined
) -> Optional[dict]:
"""Создать событие в MISP из результатов forensic-анализа."""
from pymisp import MISPEvent, MISPAttribute

event = MISPEvent()
event.info = event_title
event.threat_level_id = threat_level
event.analysis = 1 # Initial

# Добавить хэши
hashes = analysis.get("hashes", {})
if hashes.get("sha256"):
event.add_attribute("sha256", hashes["sha256"])
if hashes.get("md5"):
event.add_attribute("md5", hashes["md5"])
if hashes.get("sha1"):
event.add_attribute("sha1", hashes["sha1"])

# Добавить сетевые индикаторы
for url in analysis.get("network_indicators", {}).get("urls", []):
event.add_attribute("url", url)
for ip in analysis.get("network_indicators", {}).get("ips", []):
event.add_attribute("ip-dst", ip)

try:
result = self.misp.add_event(event)
print(f"[+] MISP событие создано: ID {result.get('Event', {}).get('id')}")
return result
except Exception as e:
print(f"[!] Ошибка создания события MISP: {e}")
return None


def enrich_results_with_vt(
analysis_results: list,
vt_api_key: str,
cache_file: str = "vt_cache.json",
) -> list:
"""Обогатить результаты анализа данными VirusTotal."""

# Извлечь уникальные хэши
hashes = list(set(
r.get("hashes", {}).get("sha256", "")
for r in analysis_results
if r.get("hashes", {}).get("sha256")
))
hashes = [h for h in hashes if h] # убрать пустые

if not hashes:
print("[!] Нет хэшей для проверки в VT")
return analysis_results

print(f"[*] Проверка {len(hashes)} уникальных хэшей в VirusTotal...")

vt_client = VirusTotalClient(vt_api_key)
try:
vt_results = vt_client.batch_check_hashes(hashes, cache_file=cache_file)
finally:
vt_client.close()

# Обогатить результаты
for result in analysis_results:
sha256 = result.get("hashes", {}).get("sha256", "")
if sha256 and sha256 in vt_results:
result["virustotal"] = vt_results[sha256]

return analysis_results




Параллельная обработка: multiprocessing и asyncio в DFIR


Скорость — критический фактор при реагировании на инциденты. Правильное использование параллелизма в Python позволяет обрабатывать тысячи файлов за разумное время.

python
#!/usr/bin/env python3
"""
parallel_forensics.py — высокопроизводительная параллельная обработка артефактов
"""
import concurrent.futures
import multiprocessing
import asyncio
import aiohttp
import aiofiles
import os
import hashlib
import time
from pathlib import Path
from typing import Callable, List, Any, Optional
from tqdm import tqdm
from dataclasses import dataclass
import queue
import threading


@dataclass
class ProcessingTask:
"""Задача обработки файла."""
file_path: str
task_type: str
priority: int = 0
metadata: dict = None


class ForensicProcessingPool:
"""
Пул воркеров для параллельной forensic-обработки.
Использует ProcessPool для CPU-интенсивных задач
и ThreadPool для I/O-интенсивных.
"""

def __init__(
self,
cpu_workers: Optional[int] = None,
io_workers: int = 8,
):
self.cpu_workers = cpu_workers or max(1, multiprocessing.cpu_count() - 1)
self.io_workers = io_workers
self.stats = {
"processed": 0,
"errors": 0,
"start_time": None,
}

def process_cpu_bound(
self,
items: list,
worker_func: Callable,
desc: str = "Обработка",
timeout: int = 300,
) -> List[Any]:
"""
Обработка CPU-интенсивных задач (PE-анализ, YARA-сканирование).
Использует ProcessPoolExecutor для обхода GIL.
"""
results = []
self.stats["start_time"] = time.time()
errors = 0

print(f"[*] Запуск {self.cpu_workers} CPU-воркеров для {len(items):,} задач")

with concurrent.futures.ProcessPoolExecutor(
max_workers=self.cpu_workers
) as executor:
futures = {executor.submit(worker_func, item): item for item in items}

with tqdm(total=len(items), desc=desc, unit="файл") as pbar:
for future in concurrent.futures.as_completed(futures, timeout=timeout * len(items)):
try:
result = future.result(timeout=timeout)
results.append(result)
self.stats["processed"] += 1
except concurrent.futures.TimeoutError:
errors += 1
tqdm.write(f"[!] Таймаут: {futures[future]}")
except Exception as e:
errors += 1
tqdm.write(f"[!] Ошибка: {e}")
finally:
pbar.update(1)

self.stats["errors"] = errors
elapsed = time.time() - self.stats["start_time"]
rate = len(results) / elapsed if elapsed > 0 else 0

print(f"[+] Завершено: {len(results):,} файлов за {elapsed:.1f}с ({rate:.1f} файл/с)")
return results

def process_io_bound(
self,
items: list,
worker_func: Callable,
desc: str = "I/O операции",
) -> List[Any]:
"""
Обработка I/O-интенсивных задач (хэширование, копирование, VT-запросы).
Использует ThreadPoolExecutor.
"""
results = []

with concurrent.futures.ThreadPoolExecutor(
max_workers=self.io_workers
) as executor:
futures = {executor.submit(worker_func, item): item for item in items}

with tqdm(total=len(items), desc=desc, unit="операций") as pbar:
for future in concurrent.futures.as_completed(futures):
try:
result = future.result()
results.append(result)
except Exception as e:
tqdm.write(f"[!] Ошибка: {e}")
finally:
pbar.update(1)

return results


async def async_hash_files(
file_paths: List[str],
chunk_size: int = 8192,
) -> List[dict]:
"""
Асинхронное хэширование большого числа файлов.
Эффективно для I/O-интенсивных операций.
"""
async def hash_single(path: str) -> dict:
md5 = hashlib.md5()
sha256 = hashlib.sha256()

try:
async with aiofiles.open(path, "rb") as f:
while True:
chunk = await f.read(chunk_size)
if not chunk:
break
md5.update(chunk)
sha256.update(chunk)

size = os.path.getsize(path)
return {
"path": path,
"size": size,
"md5": md5.hexdigest(),
"sha256": sha256.hexdigest(),
"error": None,
}
except Exception as e:
return {"path": path, "error": str(e)}

# Ограничить параллелизм семафором
semaphore = asyncio.Semaphore(50)

async def hash_with_limit(path: str) -> dict:
async with semaphore:
return await hash_single(path)

tasks = [hash_with_limit(p) for p in file_paths]
results = await asyncio.gather(*tasks, return_exceptions=False)
return list(results)


def optimized_file_hash_batch(file_paths: List[str]) -> List[dict]:
"""Обёртка для запуска асинхронного хэширования из синхронного кода."""
return asyncio.run(async_hash_files(file_paths))


def deduplicate_files(
file_list: List[dict],
hash_key: str = "sha256",
) -> tuple:
"""
Дедупликация файлов по хэшу.

Returns:
(unique_files, duplicates_map)
"""
seen = {}
unique = []
duplicates = {}

for file_info in file_list:
h = file_info.get(hash_key, "")
if not h:
unique.append(file_info)
continue

if h not in seen:
seen[h] = file_info
unique.append(file_info)
else:
if h not in duplicates:
duplicates[h] = [seen[h]["path"]]
duplicates[h].append(file_info["path"])

removed = sum(len(v) - 1 for v in duplicates.values())
print(f"[+] Дедупликация: {len(file_list):,} → {len(unique):,} уникальных "
f"(удалено {removed:,} дубликатов)")

return unique, duplicates




Анализ артефактов Windows: реестр, журналы событий, префетч


PE-файлы — лишь часть forensic-анализа. Не менее важны артефакты операционной системы Windows.

python
#!/usr/bin/env python3
"""
windows_artifacts.py — анализ ключевых артефактов Windows
Реестр, журналы событий, префетч, Amcache
"""
from Registry import Registry # pip install python-registry
from Evtx.Evtx import Evtx # pip install python-evtx
from Evtx.Views import evtx_file_xml_view
import xml.etree.ElementTree as ET
import json
import datetime
from pathlib import Path
from typing import List, Optional


<h2 id="reestr">==================== РЕЕСТР ====================</h2>

<h2 id="klyuchevye-klyuchi-reestra-dlya-analiza-zakrepleniya">Ключевые ключи реестра для анализа закрепления</h2>
PERSISTENCE_REGISTRY_KEYS = [
# Автозапуск
r"SOFTWARE\Microsoft\Windows\CurrentVersion\Run",
r"SOFTWARE\Microsoft\Windows\CurrentVersion\RunOnce",
r"SOFTWARE\Microsoft\Windows\CurrentVersion\RunOnceEx",
r"SOFTWARE\Wow6432Node\Microsoft\Windows\CurrentVersion\Run",
# Службы
r"SYSTEM\CurrentControlSet\Services",
# Winlogon
r"SOFTWARE\Microsoft\Windows NT\CurrentVersion\Winlogon",
# Image File Execution Options (IFEO hijacking)
r"SOFTWARE\Microsoft\Windows NT\CurrentVersion\Image File Execution Options",
# AppInit DLLs
r"SOFTWARE\Microsoft\Windows NT\CurrentVersion\Windows",
# Browser helper objects
r"SOFTWARE\Microsoft\Windows\CurrentVersion\Explorer\Browser Helper Objects",
# Scheduled tasks via registry
r"SOFTWARE\Microsoft\Windows NT\CurrentVersion\Schedule\TaskCache\Tasks",
# COM hijacking
r"SOFTWARE\Classes\CLSID",
# UserInit
r"SOFTWARE\Microsoft\Windows NT\CurrentVersion\Winlogon",
# MRU и недавние файлы
r"SOFTWARE\Microsoft\Windows\CurrentVersion\Explorer\RecentDocs",
r"SOFTWARE\Microsoft\Windows\CurrentVersion\Explorer\RunMRU",
]


def analyze_registry_hive(
hive_path: str,
keys_of_interest: Optional[List[str]] = None,
) -> dict:
"""
Анализ куста реестра Windows.

Args:
hive_path: путь к файлу куста реестра (NTUSER.DAT, SYSTEM, SOFTWARE)
keys_of_interest: список ключей для анализа

Returns:
Словарь с найденными данными
"""
if keys_of_interest is None:
keys_of_interest = PERSISTENCE_REGISTRY_KEYS

results = {
"hive_path": hive_path,
"found_keys": {},
"persistence_indicators": [],
"errors": [],
}

try:
reg = Registry.Registry(hive_path)
except Exception as e:
results["errors"].append(f"Не удалось открыть куст: {e}")
return results

def get_key_values(key) -> dict:
"""Получить все значения ключа реестра."""
values = {}
try:
for value in key.values():
try:
val_data = value.value()
if isinstance(val_data, bytes):
val_data = val_data.hex()
values[value.name()] = {
"data": str(val_data)[:500],
"type": str(value.value_type_str()),
}
except Exception:
values[value.name()] = {"data": "ERROR", "type": "UNKNOWN"}
except Exception:
pass
return values

# Перебрать интересующие ключи
for key_path in keys_of_interest:
try:
key = reg.open(key_path)
key_data = {
"last_modified": key.timestamp().strftime("%Y-%m-%d %H:%M:%S"),
"values": get_key_values(key),
"subkeys": [],
}

# Для ключей автозапуска — собрать записи автозапуска
if "Run" in key_path and "Run" not in key_path.split("\\")[-1].replace("Run", ""):
for val_name, val_data in key_data["values"].items():
if val_name and val_data["data"]:
results["persistence_indicators"].append({
"type": "AUTORUN",
"key": key_path,
"name": val_name,
"command": val_data["data"],
"last_modified": key_data["last_modified"],
})

# Для ключа Services — искать подозрительные службы
if "Services" in key_path:
for subkey in key.subkeys():
try:
svc_values = get_key_values(subkey)
image_path = svc_values.get("ImagePath", {}).get("data", "")
start_type = svc_values.get("Start", {}).get("data", "")

# Подозрительный путь службы
suspicious = False
for suspect in ["\\Temp\\", "\\AppData\\", "\\Downloads\\", "%TEMP%"]:
if suspect.lower() in image_path.lower():
suspicious = True

if suspicious:
results["persistence_indicators"].append({
"type": "SUSPICIOUS_SERVICE",
"name": subkey.name(),
"image_path": image_path,
"start_type": start_type,
"last_modified": subkey.timestamp().strftime(
"%Y-%m-%d %H:%M:%S"
),
})
except Exception:
continue

results["found_keys"][key_path] = key_data

except Registry.RegistryKeyNotFoundException:
pass # Ключ не существует — норма
except Exception as e:
results["errors"].append(f"Ошибка ключа {key_path}: {e}")

return results


<h2 id="zhurnaly-sobytiy">==================== ЖУРНАЛЫ СОБЫТИЙ ====================</h2>

<h2 id="event-id-y-vazhnye-dlya-forensic-analiza">Event ID&#039;ы, важные для forensic-анализа</h2>
IMPORTANT_EVENT_IDS = {
# Аутентификация
4624: "Успешный вход",
4625: "Неудачный вход",
4634: "Выход из системы",
4648: "Вход с явными учётными данными",
4672: "Специальные привилегии при входе",
4776: "NTLM-аутентификация",
# Учётные записи
4720: "Создана учётная запись",
4722: "Учётная запись включена",
4724: "Сброс пароля",
4728: "Добавление в группу",
4732: "Добавление в локальную группу",
4738: "Изменена учётная запись",
# Процессы
4688: "Создан новый процесс",
4689: "Процесс завершён",
# Сеть
5140: "Доступ к сетевому ресурсу",
5145: "Проверка доступа к сетевому объекту",
# Политики
4697: "Установлена служба",
4698: "Создана задача планировщика",
4702: "Изменена задача планировщика",
# Firewall
5156: "Разрешённое соединение",
5157: "Заблокированное соединение",
# PowerShell
4103: "Выполнение модуля PowerShell",
4104: "Выполнение скрипта PowerShell",
}


def parse_evtx(
evtx_path: str,
event_ids: Optional[List[int]] = None,
start_time: Optional[datetime.datetime] = None,
end_time: Optional[datetime.datetime] = None,
) -> List[dict]:
"""
Разбор журнала событий Windows (.evtx).

Args:
evtx_path: путь к файлу журнала
event_ids: фильтр по Event ID (None = все)
start_time/end_time: временно́й фильтр

Returns:
Список событий
"""
events = []

try:
with Evtx(evtx_path) as log:
for record in log.records():
try:
xml_str = record.xml()
root = ET.fromstring(xml_str)
ns = {"e": "http://schemas.microsoft.com/win/2004/08/events/event"}

# Извлечь базовые поля
system = root.find("e:System", ns)
if system is None:
continue

event_id_elem = system.find("e:EventID", ns)
event_id = int(event_id_elem.text) if event_id_elem is not None else 0

# Фильтр по Event ID
if event_ids and event_id not in event_ids:
continue

# Временна́я метка
time_created = system.find("e:TimeCreated", ns)
ts_str = time_created.get("SystemTime", "") if time_created is not None else ""
try:
ts = datetime.datetime.fromisoformat(ts_str.rstrip("Z"))
except Exception:
ts = None

# Временно́й фильтр
if ts:
if start_time and ts < start_time:
continue
if end_time and ts > end_time:
continue

# Извлечь Computer и Channel
computer = system.find("e:Computer", ns)
channel = system.find("e:Channel", ns)
provider = system.find("e:Provider", ns)

event = {
"event_id": event_id,
"event_name": IMPORTANT_EVENT_IDS.get(event_id, f"Event_{event_id}"),
"timestamp": ts.isoformat() if ts else ts_str,
"computer": computer.text if computer is not None else "",
"channel": channel.text if channel is not None else "",
"provider": provider.get("Name", "") if provider is not None else "",
"data": {},
}

# Извлечь EventData
event_data = root.find("e:EventData", ns)
if event_data is not None:
for data_elem in event_data.findall("e:Data", ns):
name = data_elem.get("Name", f"Data_{len(event['data'])}")
value = data_elem.text or ""
event["data"][name] = value

# Специальная обработка важных событий
if event_id == 4688: # Создание процесса
event["process_name"] = event["data"].get("NewProcessName", "")
event["parent_process"] = event["data"].get("ParentProcessName", "")
event["command_line"] = event["data"].get("CommandLine", "")
event["user"] = event["data"].get("SubjectUserName", "")

elif event_id == 4624: # Успешный вход
event["logon_type"] = event["data"].get("LogonType", "")
event["user"] = event["data"].get("TargetUserName", "")
event["source_ip"] = event["data"].get("IpAddress", "")
event["workstation"] = event["data"].get("WorkstationName", "")

events.append(event)

except ET.ParseError:
continue
except Exception:
continue

except Exception as e:
print(f"[!] Ошибка разбора EVTX {evtx_path}: {e}")

return events


def analyze_security_log(evtx_path: str) -> dict:
"""
Высокоуровневый анализ журнала безопасности Windows.
Ищет признаки атаки: brute force, lateral movement, привилегированные действия.
"""
print(f"[*] Анализ журнала безопасности: {evtx_path}")

# Парсить все важные события
events = parse_evtx(
evtx_path,
event_ids=list(IMPORTANT_EVENT_IDS.keys()),
)

analysis = {
"total_events": len(events),
"brute_force_indicators": [],
"suspicious_processes": [],
"powershell_activity": [],
"lateral_movement": [],
"privilege_escalation": [],
"new_services": [],
"scheduled_tasks": [],
"timeline": [],
}

# Детектирование bruteforce (много 4625 с одного IP)
failed_logins = {}
for e in events:
if e["event_id"] == 4625:
ip = e["data"].get("IpAddress", "")
user = e["data"].get("TargetUserName", "")
if ip and ip not in ("-", "::1", "127.0.0.1"):
key = f"{ip}:{user}"
failed_logins[key] = failed_logins.get(key, 0) + 1

for key, count in failed_logins.items():
if count >= 5:
ip, user = key.split(":", 1)
analysis["brute_force_indicators"].append({
"source_ip": ip,
"target_user": user,
"failed_attempts": count,
"severity": "HIGH" if count >= 20 else "MEDIUM",
})

# PowerShell активность
for e in events:
if e["event_id"] in (4103, 4104):
script_block = e["data"].get("ScriptBlockText", "")
# Подозрительные паттерны в PowerShell
suspicious_ps = [
"IEX", "Invoke-Expression",
"-enc", "-EncodedCommand",
"DownloadString", "DownloadFile",
"WebClient", "Net.WebClient",
"Reflection.Assembly",
"FromBase64String",
]
found = [p for p in suspicious_ps if p.lower() in script_block.lower()]
if found:
analysis["powershell_activity"].append({
"timestamp": e["timestamp"],
"suspicious_patterns": found,
"script_preview": script_block[:200],
})

# Новые службы (возможное закрепление)
for e in events:
if e["event_id"] == 4697:
analysis["new_services"].append({
"timestamp": e["timestamp"],
"service_name": e["data"].get("ServiceName", ""),
"service_file": e["data"].get("ServiceFileName", ""),
"service_account": e["data"].get("ServiceAccount", ""),
})

# Подозрительные процессы
SUSPICIOUS_PROCESS_PATTERNS = [
"powershell", "cmd.exe", "wscript", "cscript",
"mshta", "regsvr32", "rundll32", "certutil",
"bitsadmin", "wmic", "net.exe", "psexec",
"mimikatz", "procdump", "lsass",
]

for e in events:
if e["event_id"] == 4688:
proc = e.get("process_name", "").lower()
cmdline = e.get("command_line", "").lower()

for pattern in SUSPICIOUS_PROCESS_PATTERNS:
if pattern in proc or pattern in cmdline:
analysis["suspicious_processes"].append({
"timestamp": e["timestamp"],
"process": e.get("process_name", ""),
"parent": e.get("parent_process", ""),
"command_line": e.get("command_line", ""),
"user": e.get("user", ""),
"matched_pattern": pattern,
})
break

print(f"[+] Проанализировано событий: {len(events):,}")
print(f"[!] Индикаторов brute-force: {len(analysis['brute_force_indicators'])}")
print(f"[!] Подозрительных процессов: {len(analysis['suspicious_processes'])}")
print(f"[!] PowerShell активности: {len(analysis['powershell_activity'])}")
print(f"[!] Новых служб: {len(analysis['new_services'])}")

return analysis




Генерация отчётов: HTML, JSON и STIX 2.1


Результаты анализа необходимо представить в читаемом виде. Рассмотрим генерацию отчётов в разных форматах.

python
#!/usr/bin/env python3
"""
report_generator.py — генерация forensic-отчётов в HTML и STIX 2.1
"""
import json
import datetime
from pathlib import Path
from typing import List
from jinja2 import Template


HTML_TEMPLATE = """<!DOCTYPE html>
<html lang="ru">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Forensic Analysis Report — {{ report_id }}</title>
<style>
body { font-family: 'Segoe UI', Arial, sans-serif; margin: 0; padding: 20px;
background: #1a1a2e; color: #e0e0e0; }
.container { max-width: 1200px; margin: 0 auto; }
h1 { color: #00b4d8; border-bottom: 2px solid #00b4d8; padding-bottom: 10px; }
h2 { color: #90e0ef; margin-top: 30px; }
.stats-grid { display: grid; grid-template-columns: repeat(4, 1fr);
gap: 15px; margin: 20px 0; }
.stat-card { background: #16213e; border: 1px solid #0f3460;
border-radius: 8px; padding: 15px; text-align: center; }
.stat-card .number { font-size: 2em; font-weight: bold; }
.stat-card.danger .number { color: #e74c3c; }
.stat-card.warning .number { color: #f39c12; }
.stat-card.info .number { color: #3498db; }
.stat-card.success .number { color: #2ecc71; }
table { width: 100%; border-collapse: collapse; margin: 15px 0; }
th { background: #0f3460; color: #00b4d8; padding: 10px; text-align: left; }
td { padding: 8px 10px; border-bottom: 1px solid #0f3460; font-size: 0.9em; }
tr:hover td { background: #16213e; }
.badge { padding: 3px 8px; border-radius: 4px; font-size: 0.8em;
font-weight: bold; display: inline-block; }
.badge-critical { background: #c0392b; color: white; }
.badge-high { background: #e74c3c; color: white; }
.badge-medium { background: #e67e22; color: white; }
.badge-low { background: #27ae60; color: white; }
.badge-malicious { background: #c0392b; color: white; }
.badge-suspicious { background: #e67e22; color: white; }
.badge-benign { background: #27ae60; color: white; }
.hash { font-family: monospace; font-size: 0.85em; color: #95a5a6; }
.indicator { background: #1a1a2e; border-left: 4px solid #e74c3c;
margin: 5px 0; padding: 8px 12px; border-radius: 0 4px 4px 0; }
.section { background: #16213e; border-radius: 8px; padding: 20px;
margin: 20px 0; border: 1px solid #0f3460; }
pre { background: #0d0d1a; padding: 15px; border-radius: 4px;
overflow-x: auto; font-size: 0.85em; color: #00ff41; }
.timestamp { color: #7f8c8d; font-size: 0.85em; }
.footer { text-align: center; margin-top: 40px; color: #7f8c8d;
font-size: 0.85em; border-top: 1px solid #0f3460; padding-top: 20px; }
</style>
</head>
<body>
<div class="container">
<h1>🔍 Forensic Analysis Report</h1>
<p class="timestamp">
Отчёт сгенерирован: {{ generated_at }} | ID: {{ report_id }}
</p>

<div class="section">
<h2>📊 Сводная статистика</h2>
<div class="stats-grid">
<div class="stat-card danger">
<div class="number">{{ stats.malicious }}</div>
<div>MALICIOUS</div>
</div>
<div class="stat-card warning">
<div class="number">{{ stats.suspicious }}</div>
<div>SUSPICIOUS</div>
</div>
<div class="stat-card info">
<div class="number">{{ stats.total }}</div>
<div>Всего файлов</div>
</div>
<div class="stat-card success">
<div class="number">{{ stats.yara_hits }}</div>
<div>YARA срабатываний</div>
</div>
</div>
</div>

{% if high_risk_files %}
<div class="section">
<h2>🚨 Файлы высокого риска</h2>
<table>
<thead>
<tr>
<th>Файл</th>
<th>SHA256</th>
<th>Оценка риска</th>
<th>Классификация</th>
<th>VT</th>
<th>Индикаторы</th>
</tr>
</thead>
<tbody>
{% for f in high_risk_files %}
<tr>
<td>{{ f.file }}</td>
<td class="hash">{{ f.sha256[:16] }}...</td>
<td><b>{{ f.risk_score }}</b></td>
<td>
<span class="badge badge-{{ f.classification.lower() }}">
{{ f.classification }}
</span>
</td>
<td>
{% if f.vt_detection %}
<span class="badge badge-critical">{{ f.vt_detection }}</span>
{% else %}
-
{% endif %}
</td>
<td>
{% for ind in f.indicators[:3] %}
<div class="indicator">{{ ind }}</div>
{% endfor %}
</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
{% endif %}

{% if yara_hits %}
<div class="section">
<h2>🎯 YARA срабатывания</h2>
<table>
<thead>
<tr><th>Файл</th><th>Правило</th><th>Теги</th><th>Описание</th></tr>
</thead>
<tbody>
{% for hit in yara_hits %}
<tr>
<td>{{ hit.file }}</td>
<td><b>{{ hit.rule }}</b></td>
<td>{{ hit.tags | join(', ') }}</td>
<td>{{ hit.description }}</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
{% endif %}

{% if persistence_indicators %}
<div class="section">
<h2>🔒 Индикаторы закрепления</h2>
{% for pi in persistence_indicators %}
<div class="indicator">
<b>[{{ pi.type }}]</b> {{ pi.name }}:
<code>{{ pi.command }}</code>
<span class="timestamp"> — {{ pi.last_modified }}</span>
</div>
{% endfor %}
</div>
{% endif %}

{% if timeline %}
<div class="section">
<h2>⏱️ Временна́я шкала (топ событий)</h2>
<table>
<thead>
<tr>
<th>Время</th><th>Событие</th><th>Артефакт</th>
</tr>
</thead>
<tbody>
{% for event in timeline[:50] %}
<tr>
<td class="timestamp">{{ event.timestamp }}</td>
<td>{{ event.event_name }}</td>
<td>{{ event.artifact }}</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
{% endif %}

<div class="footer">
<p>Forensic Automation with Python | DFIR Report | {{ generated_at }}</p>
<p>⚠️ Данный отчёт является конфиденциальным документом и предназначен
только для авторизованного использования.</p>
</div>
</div>
</body>
</html>"""


def generate_html_report(
data: dict,
output_path: str,
) -> str:
"""Сгенерировать HTML-отчёт."""
template = Template(HTML_TEMPLATE)

# Подготовить данные для шаблона
report_data = {
"report_id": data.get("report_id", "RPT-" + datetime.datetime.now().strftime("%Y%m%d%H%M%S")),
"generated_at": datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
"stats": data.get("stats", {"malicious": 0, "suspicious": 0, "total": 0, "yara_hits": 0}),
"high_risk_files": data.get("high_risk_files", []),
"yara_hits": data.get("yara_hits", []),
"persistence_indicators": data.get("persistence_indicators", []),
"timeline": data.get("timeline", []),
}

html = template.render(report_data)

with open(output_path, "w", encoding="utf-8") as f:
f.write(html)

print(f"[+] HTML-отчёт сохранён: {output_path}")
return output_path


def generate_stix_bundle(
analysis_results: List[dict],
bundle_id: str,
) -> dict:
"""
Генерация STIX 2.1 bundle из результатов анализа.
Позволяет обмениваться индикаторами в стандартном формате.
"""
import uuid

objects = []
now = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")

# Identity объект (организация-автор)
identity = {
"type": "identity",
"spec_version": "2.1",
"id": f"identity--{uuid.uuid4()}",
"created": now,
"modified": now,
"name": "Forensic Lab",
"identity_class": "organization",
}
objects.append(identity)

for result in analysis_results:
sha256 = result.get("hashes", {}).get("sha256", "")
if not sha256:
continue

# Malware объект для подозрительных файлов
classification = result.get("classification", "BENIGN")
if classification in ("MALICIOUS", "LIKELY_MALICIOUS", "SUSPICIOUS"):

# File observable
file_obj = {
"type": "file",
"spec_version": "2.1",
"id": f"file--{uuid.uuid4()}",
"hashes": {
"SHA-256": sha256,
"MD5": result.get("hashes", {}).get("md5", ""),
"SHA-1": result.get("hashes", {}).get("sha1", ""),
},
"size": result.get("size", 0),
"name": Path(result.get("file", "unknown")).name,
}
objects.append(file_obj)

# Indicator с паттерном
indicator = {
"type": "indicator",
"spec_version": "2.1",
"id": f"indicator--{uuid.uuid4()}",
"created": now,
"modified": now,
"name": f"Suspicious file: {Path(result.get('file', 'unknown')).name}",
"pattern": f"[file:hashes.'SHA-256' = '{sha256}']",
"pattern_type": "stix",
"valid_from": now,
"confidence": 50 if classification == "SUSPICIOUS" else 80,
"labels": ["malicious-activity"],
"description": (
f"Risk score: {result.get('risk_score', 0)}, "
f"Classification: {classification}, "
f"Indicators: {len(result.get('suspicious_indicators', []))}"
),
"external_references": [],
}

# Добавить VT ссылку если есть
vt_data = result.get("virustotal", {})
if vt_data.get("found") and vt_data.get("malicious", 0) > 0:
indicator["external_references"].append({
"source_name": "VirusTotal",
"url": f"https://www.virustotal.com/gui/file/{sha256}",
"description": f"Detection: {vt_data.get('detection_ratio', 'N/A')}",
})

objects.append(indicator)

# Network indicators
net_indicators = result.get("network_indicators", {})
for url in net_indicators.get("urls", [])[:5]:
url_obj = {
"type": "indicator",
"spec_version": "2.1",
"id": f"indicator--{uuid.uuid4()}",
"created": now,
"modified": now,
"name": f"Suspicious URL from malware",
"pattern": f"[url:value = '{url}']",
"pattern_type": "stix",
"valid_from": now,
"confidence": 60,
"labels": ["malicious-activity"],
}
objects.append(url_obj)

bundle = {
"type": "bundle",
"id": f"bundle--{bundle_id}",
"spec_version": "2.1",
"objects": objects,
}

return bundle




Реальные сценарии: forensic pipeline для IR-команды


Объединим все компоненты в единый pipeline для реального сценария incident response.

python
#!/usr/bin/env python3
"""
ir_pipeline.py — полный forensic pipeline для incident response
Объединяет все компоненты в единый автоматизированный процесс
"""
import sys
import os
import json
import datetime
import argparse
from pathlib import Path
from dataclasses import dataclass, field
from typing import Optional, List

<h2 id="import-nashih-moduley">Импорт наших модулей</h2>
<h2 id="from-extract-executables-import-extract-executables">from extract_executables import extract_executables</h2>
<h2 id="from-pe-classifier-import-batch-analyze">from pe_classifier import batch_analyze</h2>
<h2 id="from-yara-scanner-import-batch-yara-scan-compile-rules">from yara_scanner import batch_yara_scan, compile_rules</h2>
<h2 id="from-windows-artifacts-import-analyze-registry-hive-analyze-security-log">from windows_artifacts import analyze_registry_hive, analyze_security_log</h2>
<h2 id="from-threat-intel-import-enrich-results-with-vt">from threat_intel import enrich_results_with_vt</h2>
<h2 id="from-report-generator-import-generate-html-report-generate-stix-bundle">from report_generator import generate_html_report, generate_stix_bundle</h2>
<h2 id="from-parallel-forensics-import-forensicprocessingpool-optimized-file-hash-batch">from parallel_forensics import ForensicProcessingPool, optimized_file_hash_batch</h2>


@dataclass
class IRConfig:
"""Конфигурация pipeline для IR-анализа."""
image_path: str
output_dir: str
case_id: str
analyst: str
yara_rules_dir: Optional[str] = None
vt_api_key: Optional[str] = None
misp_url: Optional[str] = None
misp_key: Optional[str] = None
partition_offset: int = 0
max_workers: int = 4
include_deleted: bool = True
verbose: bool = False
tags: List[str] = field(default_factory=list)


class IncidentResponsePipeline:
"""
Полный автоматизированный pipeline для incident response.

Этапы:
1. Извлечение артефактов из образа диска
2. PE-анализ исполняемых файлов
3. YARA-сканирование
4. Анализ артефактов Windows (реестр, EVTX)
5. Обогащение данными VT/MISP
6. Генерация отчёта
"""

def __init__(self, config: IRConfig):
self.config = config
self.case_dir = Path(config.output_dir) / config.case_id
self.start_time = datetime.datetime.utcnow()

# Создать директории
for subdir in ["executables", "artifacts", "reports", "yara_hits", "logs"]:
(self.case_dir / subdir).mkdir(parents=True, exist_ok=True)

self.results = {
"case_id": config.case_id,
"analyst": config.analyst,
"start_time": self.start_time.isoformat(),
"image_path": config.image_path,
"stages": {},
}

print(f"""
╔══════════════════════════════════════════════════╗
║ FORENSIC IR PIPELINE v2.0 ║
║ Case: {config.case_id:<41}║
║ Image: {config.image_path[:40]:<41}║
║ Output: {str(self.case_dir)[:39]:<41}║
╚══════════════════════════════════════════════════╝
""")

def _log(self, message: str, level: str = "INFO"):
"""Логирование с временно́й меткой."""
ts = datetime.datetime.utcnow().strftime("%H:%M:%S")
icons = {"INFO": "[*]", "SUCCESS": "[+]", "WARNING": "[!]", "ERROR": "[X]"}
icon = icons.get(level, "[*]")
print(f"{ts} {icon} {message}")

# Записать в лог-файл
log_file = self.case_dir / "logs" / "pipeline.log"
with open(log_file, "a", encoding="utf-8") as f:
f.write(f"{datetime.datetime.utcnow().isoformat()} [{level}] {message}\n")

def stage_1_extract_artifacts(self) -> dict:
"""Этап 1: Извлечение артефактов из образа диска."""
self._log("ЭТАП 1: Извлечение артефактов из образа диска")

stage_result = {
"status": "started",
"start_time": datetime.datetime.utcnow().isoformat(),
}

exec_output = str(self.case_dir / "executables")

# Подключаем реальный модуль
# extracted = extract_executables(
# self.config.image_path,
# exec_output,
# partition_offset=self.config.partition_offset,
# include_deleted=self.config.include_deleted,
# )

# Имитация для демонстрации структуры
extracted = []
self._log(f"Извлечено исполняемых файлов: {len(extracted)}", "SUCCESS")

stage_result["status"] = "completed"
stage_result["extracted_files"] = len(extracted)
stage_result["end_time"] = datetime.datetime.utcnow().isoformat()

self.results["stages"]["1_extraction"] = stage_result
return stage_result

def stage_2_pe_analysis(self) -> dict:
"""Этап 2: PE-анализ извлечённых файлов."""
self._log("ЭТАП 2: Массовый PE-анализ")

exec_dir = str(self.case_dir / "executables")
pe_output = str(self.case_dir / "artifacts")

stage_result = {
"status": "started",
"start_time": datetime.datetime.utcnow().isoformat(),
}

# pe_results = batch_analyze(exec_dir, pe_output, max_workers=self.config.max_workers)

pe_results = []
malicious = sum(1 for r in pe_results if r.get("classification") in ("MALICIOUS", "LIKELY_MALICIOUS"))
suspicious = sum(1 for r in pe_results if r.get("classification") == "SUSPICIOUS")

self._log(f"PE-анализ завершён: {len(pe_results)} файлов", "SUCCESS")
self._log(f" MALICIOUS/LIKELY: {malicious}", "WARNING" if malicious > 0 else "INFO")
self._log(f" SUSPICIOUS: {suspicious}", "WARNING" if suspicious > 0 else "INFO")

stage_result.update({
"status": "completed",
"total_analyzed": len(pe_results),
"malicious": malicious,
"suspicious": suspicious,
"end_time": datetime.datetime.utcnow().isoformat(),
})

self.results["stages"]["2_pe_analysis"] = stage_result
self.results["pe_results"] = pe_results
return stage_result

def stage_3_yara_scan(self) -> dict:
"""Этап 3: YARA-сканирование."""
self._log("ЭТАП 3: YARA-сканирование")

stage_result = {"status": "skipped"}

if not self.config.yara_rules_dir:
self._log("YARA-правила не указаны — этап пропущен", "WARNING")
self.results["stages"]["3_yara"] = stage_result
return stage_result

exec_dir = str(self.case_dir / "executables")
yara_output = str(self.case_dir / "yara_hits" / "results.json")

# yara_results = batch_yara_scan(exec_dir, self.config.yara_rules_dir, yara_output)
yara_results = []
hits = sum(1 for r in yara_results if r.get("matches"))

self._log(f"YARA: {hits} файлов с совпадениями из {len(yara_results)}", "SUCCESS")

stage_result = {
"status": "completed",
"total_scanned": len(yara_results),
"files_with_hits": hits,
"end_time": datetime.datetime.utcnow().isoformat(),
}
self.results["stages"]["3_yara"] = stage_result
self.results["yara_results"] = yara_results
return stage_result

def stage_4_windows_artifacts(self) -> dict:
"""Этап 4: Анализ артефактов Windows."""
self._log("ЭТАП 4: Анализ артефактов Windows")

stage_result = {"status": "started"}
artifacts_dir = self.case_dir / "artifacts"

all_persistence = []
evtx_analysis = {}

# Поиск файлов реестра в извлечённых артефактах
registry_hives = [
"NTUSER.DAT", "SYSTEM", "SOFTWARE", "SAM", "SECURITY",
"USRCLASS.DAT",
]

for hive_name in registry_hives:
hive_path = artifacts_dir / hive_name
if hive_path.exists():
self._log(f" Анализ куста реестра: {hive_name}")
try:
# reg_analysis = analyze_registry_hive(str(hive_path))
# all_persistence.extend(reg_analysis.get("persistence_indicators", []))
pass
except Exception as e:
self._log(f" Ошибка {hive_name}: {e}", "WARNING")

# Поиск EVTX-файлов
for evtx_file in artifacts_dir.glob("*.evtx"):
self._log(f" Анализ журнала: {evtx_file.name}")
try:
# analysis = analyze_security_log(str(evtx_file))
# evtx_analysis[evtx_file.name] = analysis
pass
except Exception as e:
self._log(f" Ошибка {evtx_file.name}: {e}", "WARNING")

self._log(f"Индикаторов закрепления: {len(all_persistence)}", "SUCCESS")

stage_result.update({
"status": "completed",
"persistence_indicators": len(all_persistence),
"end_time": datetime.datetime.utcnow().isoformat(),
})

self.results["stages"]["4_windows_artifacts"] = stage_result
self.results["persistence_indicators"] = all_persistence
self.results["evtx_analysis"] = evtx_analysis
return stage_result

def stage_5_threat_intel(self) -> dict:
"""Этап 5: Обогащение данными Threat Intelligence."""
self._log("ЭТАП 5: Обогащение Threat Intelligence")

stage_result = {"status": "skipped"}

if not self.config.vt_api_key:
self._log("VT API ключ не указан — пропуск", "WARNING")
self.results["stages"]["5_threat_intel"] = stage_result
return stage_result

# Обогатить PE-результаты VT данными
# pe_results = enrich_results_with_vt(
# self.results.get("pe_results", []),
# self.config.vt_api_key,
# cache_file=str(self.case_dir / "artifacts" / "vt_cache.json"),
# )

stage_result = {
"status": "completed",
"end_time": datetime.datetime.utcnow().isoformat(),
}
self.results["stages"]["5_threat_intel"] = stage_result
return stage_result

def stage_6_reporting(self) -> dict:
"""Этап 6: Генерация отчётов."""
self._log("ЭТАП 6: Генерация отчётов")

end_time = datetime.datetime.utcnow()
elapsed = (end_time - self.start_time).total_seconds()

# Подготовить данные для отчёта
pe_results = self.results.get("pe_results", [])
report_data = {
"report_id": self.config.case_id,
"stats": {
"total": len(pe_results),
"malicious": sum(1 for r in pe_results if r.get("classification") in ("MALICIOUS", "LIKELY_MALICIOUS")),
"suspicious": sum(1 for r in pe_results if r.get("classification") == "SUSPICIOUS"),
"yara_hits": self.results.get("stages", {}).get("3_yara", {}).get("files_with_hits", 0),
},
"high_risk_files": [
{
"file": Path(r.get("file", "")).name,
"sha256": r.get("hashes", {}).get("sha256", ""),
"risk_score": r.get("risk_score", 0),
"classification": r.get("classification", ""),
"vt_detection": r.get("virustotal", {}).get("detection_ratio", ""),
"indicators": [
i["description"]
for i in r.get("suspicious_indicators", [])
][:5],
}
for r in pe_results
if r.get("classification") in ("MALICIOUS", "LIKELY_MALICIOUS", "SUSPICIOUS")
][:50],
"persistence_indicators": self.results.get("persistence_indicators", []),
}

# HTML отчёт
html_path = str(self.case_dir / "reports" / f"{self.config.case_id}_report.html")
# generate_html_report(report_data, html_path)
self._log(f"HTML-отчёт: {html_path}", "SUCCESS")

# JSON отчёт (машиночитаемый)
json_path = self.case_dir / "reports" / f"{self.config.case_id}_results.json"
self.results["end_time"] = end_time.isoformat()
self.results["elapsed_seconds"] = elapsed
with open(json_path, "w", encoding="utf-8") as f:
json.dump(self.results, f, indent=2, ensure_ascii=False, default=str)
self._log(f"JSON-отчёт: {json_path}", "SUCCESS")

self._log(f"""
╔══════════════════════════════════════╗
║ PIPELINE ЗАВЕРШЁН ║
║ Время: {elapsed:.1f} сек ║
║ Кейс: {self.config.case_id:<31}║
╚══════════════════════════════════════╝
""", "SUCCESS")

return {"status": "completed", "html": html_path, "json": str(json_path)}

def run(self):
"""Запустить полный pipeline."""
try:
self.stage_1_extract_artifacts()
self.stage_2_pe_analysis()
self.stage_3_yara_scan()
self.stage_4_windows_artifacts()
self.stage_5_threat_intel()
self.stage_6_reporting()
except KeyboardInterrupt:
self._log("Прервано пользователем", "WARNING")
except Exception as e:
self._log(f"Критическая ошибка pipeline: {e}", "ERROR")
raise


def main():
parser = argparse.ArgumentParser(
description="Forensic IR Pipeline — автоматизированный анализ образов дисков"
)
parser.add_argument("image", help="Путь к образу диска (RAW, E01)")
parser.add_argument("output", help="Директория для результатов")
parser.add_argument("--case-id", required=True, help="ID кейса (например, IR-2026-001)")
parser.add_argument("--analyst", default="Unknown", help="Имя аналитика")
parser.add_argument("--yara-rules", help="Директория с YARA-правилами")
parser.add_argument("--vt-api-key", help="VirusTotal API ключ")
parser.add_argument("--partition-offset", type=int, default=0,
help="Смещение раздела в секторах")
parser.add_argument("--workers", type=int, default=4,
help="Число параллельных воркеров")
parser.add_argument("--no-deleted", action="store_true",
help="Не анализировать удалённые файлы")
parser.add_argument("--verbose", action="store_true", help="Подробный вывод")

args = parser.parse_args()

config = IRConfig(
image_path=args.image,
output_dir=args.output,
case_id=args.case_id,
analyst=args.analyst,
yara_rules_dir=args.yara_rules,
vt_api_key=args.vt_api_key,
partition_offset=args.partition_offset,
max_workers=args.workers,
include_deleted=not args.no_deleted,
verbose=args.verbose,
)

pipeline = IncidentResponsePipeline(config)
pipeline.run()


if __name__ == "__main__":
main()




Часто задаваемые вопросы (FAQ)


#### Вопрос 1: Почему pytsk3 лучше просто смонтировать образ диска?

Монтирование образа потенциально заражённой системы в Linux создаёт риски: autorun-механизмы, специфика файловых систем, возможность исполнения кода через ядерные уязвимости при обработке специально созданных структур данных. pytsk3 читает образ как обычный файл через пространство пользователя — без монтирования, без ядерных драйверов для целевой файловой системы. Это криминалистически чистый подход, не изменяющий образ и не создающий рисков заражения.

#### Вопрос 2: Как pytsk3 работает с E01-образами EnCase?

pytsk3 поддерживает E01-формат через libewf. При установке необходимо убедиться, что libewf-dev установлена до компиляции pytsk3, иначе поддержка E01 не будет включена. Проверить: `python3 -c "import pytsk3; img = pytsk3.Img_Info('file.E01'); print(img.get_size())"`. Если получаете ошибку о неизвестном формате — переустановите pytsk3 с `libewf-dev` в системе.

#### Вопрос 3: Что означает высокая энтропия секции PE-файла?

Энтропия Шеннона измеряет «случайность» данных. Максимальная энтропия (8.0) означает абсолютно случайные данные — сжатые или зашифрованные байты. Секция .text легитимного исполняемого файла имеет энтропию 5.0–6.5. Значение выше 7.2 в секции PE сильно указывает на упаковку (UPX, MPRESS) или шифрование кода. Это не абсолютный индикатор вредоносности — многие легитимные программы используют упаковку — но в сочетании с другими признаками (отсутствие импортов, нестандартные имена секций) это сигнал для более детального анализа.

#### Вопрос 4: Как правильно анализировать upacked малварь с pefile?

Если файл упакован, pefile видит минимум импортов и высокую энтропию. Для анализа упакованного кода нужна распаковка: статическая (если упаковщик известен — UPX можно распаковать командой `upx -d`) или динамическая (запустить в sandbox, дождаться распаковки в памяти, сделать дамп). После распаковки анализ через pefile даёт полную картину. Для автоматической идентификации упаковщика используйте YARA-правила или библиотеку `DIE` (Detect It Easy) через её Python bindings.

#### Вопрос 5: Как ускорить массовый анализ тысяч файлов?

Несколько стратегий: во-первых, дедупликация по SHA256 перед анализом — дубликаты не нужно анализировать повторно. Во-вторых, использовать ProcessPoolExecutor для CPU-интенсивных задач (PE-анализ, YARA) с числом воркеров равным числу CPU. В-третьих, оптимальный порядок: сначала YARA-сканирование (быстро), затем PE-анализ только файлов с YARA-хитами и крупных подозрительных. В-четвёртых, VirusTotal-запросы только для уникальных хэшей с агрессивным кэшированием. Такой подход позволяет обработать 100 000 файлов за 2–4 часа на 8-ядерном сервере.

#### Вопрос 6: Какие YARA-правила рекомендуется использовать?

Начните с открытых репозиториев: Yara-Rules (github.com/Yara-Rules/rules) — большой общий набор, THOR-Lite rules от Nextron Systems, Elastic Security's detection-rules, правила от отдельных исследователей (Florian Roth, VK9). Для enterprise-использования — коммерческие фиды от вендоров. Важно регулярно обновлять правила и проверять их на ложных срабатываниях в вашей среде перед продакшн-применением. Храните правила в Git-репозитории для версионирования.

#### Вопрос 7: Как анализировать малварь без случайного запуска?

Строгие правила безопасности при работе с малварью: всегда в изолированной VM без сетевого доступа к корпоративным ресурсам; никогда не открывать файлы двойным кликом; использовать только статический анализ (pytsk3, pefile, capstone) для первичного анализа; для динамического анализа использовать специализированные sandboxes (Cuckoo Sandbox, ANY.RUN, Hybrid Analysis); при копировании файлов на хост-машину использовать архивирование с паролем (zip -e). Эти правила особенно важны для malware с возможностями escape from VM (хотя это редкость в реальных кейсах).

#### Вопрос 8: Как интегрировать результаты анализа в SIEM?

Наиболее распространённые форматы для интеграции с SIEM: STIX 2.1/TAXII для обмена индикаторами (показан в руководстве), CEF (Common Event Format) для Splunk и ArcSight, JSON-events для Elastic SIEM и OpenSearch. Также можно напрямую писать в Elasticsearch через python elasticsearch-py библиотеку. Для автоматической корреляции — настройте webhook на ваш IR pipeline, который триггерится при появлении нового SIEM-алерта и запускает автоматический анализ.

#### Вопрос 9: Как восстановить удалённые файлы через pytsk3?

Установите `include_deleted=True` в функции `walk_filesystem`. pytsk3 через TSK возвращает записи с флагом `TSK_FS_META_FLAG_UNALLOC` — это удалённые, но ещё не перезаписанные файлы. Для восстановления читайте содержимое через `read_file_content` — TSK пытается прочитать кластеры по адресам в MFT-записи. Успех зависит от того, были ли кластеры перезаписаны новыми данными. NTFS даёт лучшие результаты восстановления, чем FAT32, так как MFT-записи не стираются сразу при удалении файла.

#### Вопрос 10: Почему pefile иногда не может разобрать легитимный PE-файл?

Некоторые компиляторы (особенно Microsoft's linker в режиме оптимизации) и защитные упаковщики создают PE-файлы, нарушающие стандарт в незначительных деталях. pefile строг в разборе — он следует спецификации. Попробуйте: `pe = pefile.PE(file_path, fast_load=True)` — это пропускает ряд проверок и обрабатывает аномальные файлы. Если всё равно падает — файл может быть повреждён (частично скачан, обрезан при извлечении) или специально скомпрометирован для обхода анализаторов. Проверьте первые 4 байта (должно быть `4D 5A 50 45` или `4D 5A 90 00`).

#### Вопрос 11: Как автоматически извлекать IOC из PE-файлов?

Комбинируйте несколько источников: строки из `extract_strings()` — URL, IP, домены, пути реестра; импорты сетевых функций (WSAStartup, connect) указывают на сетевое взаимодействие; ресурсы с высокой энтропией могут содержать зашифрованный конфиг — попробуйте XOR с одним байтом или простые ключи; анализ бинарных данных на известные структуры C2-конфигов для популярных фреймворков (Cobalt Strike, Metasploit). Для Cobalt Strike существуют специализированные инструменты (1768.py от Didier Stevens) для извлечения конфигурации.

#### Вопрос 12: Как документировать forensic-анализ для судебного использования?

Ключевые принципы: хэши (MD5, SHA256) для всех образов и артефактов должны быть вычислены и задокументированы до начала работы; все действия логируются с временны́ми метками и именем аналитика; оригинальный образ не модифицируется — работайте с копией; весь программный код (скрипты) должен быть задокументирован и доступен для проверки; цепочка хранения доказательств (chain of custody) документируется физически. Отчёты генерируйте из логов автоматически — это гарантирует воспроизводимость результатов.



Заключение: Python как основа современной криминалистики


Цифровая криминалистика в 2026 году — это дисциплина, где выигрывает тот, кто автоматизирует рутину быстрее и надёжнее. Атакующие используют автоматизированные инструменты для масштабирования атак. Защитники обязаны отвечать тем же — иначе разрыв между скоростью атаки и скоростью расследования будет только увеличиваться.

Python с его экосистемой forensic-библиотек предоставляет всё необходимое для построения профессионального IR-pipeline. pytsk3 решает задачу работы с образами дисков без рисков монтирования. pefile даёт глубокое понимание исполняемых файлов без их запуска. capstone и r2pipe обеспечивают дизассемблирование. yara-python автоматизирует сигнатурное детектирование. python-evtx и python-registry открывают доступ к ключевым артефактам Windows.

#### Ключевые выводы из руководства

Всегда работайте с образами через pytsk3 или dfvfs — не монтируйте образы потенциально заражённых систем напрямую. Этот принцип должен стать рефлексом.

PE-анализ без запуска файла — первый и обязательный шаг при работе с подозрительными исполняемыми файлами. Высокая энтропия, отсутствие импортов, нестандартные секции, несоответствие контрольной суммы — каждый из этих признаков требует более глубокого исследования.

Параллелизация через ProcessPoolExecutor критична при работе с тысячами файлов. Правильно написанный параллельный pipeline обрабатывает в 4–8 раз быстрее последовательного.

Кэширование запросов к внешним API (VirusTotal) обязательно — это экономит квоты и время при повторных анализах в рамках одного кейса.

YARA-правила — это вложение: чем больше и качественнее ваш набор правил, тем быстрее первичная классификация. Инвестируйте время в создание и обновление кастомных правил под специфику вашей среды.