Изображение

Ситуация


Компания подверглась DDoS-атаке с множества IP-адресов. Необходимо выявить инфраструктуру злоумышленников, найти связанные ресурсы и передать информацию правоохранительным органам.

Исходные данные


- Логи атаки содержат 247 уникальных IP-адресов
- Атака продолжалась 3 часа
- Пиковая интенсивность: 15,000 запросов/секунду
- Целевой сервер: 192.168.100.50

Этап 1: Сбор и подготовка данных


Извлечение IP-адресов из логов


python
#!/usr/bin/env python3
import re
from collections import Counter

def extract_ips_from_logs(log_file):
"""Извлечение IP-адресов из логов"""
ip_pattern = r'\b(?:\d{1,3}\.){3}\d{1,3}\b'
ips = []

with open(log_file, 'r') as f:
for line in f:
matches = re.findall(ip_pattern, line)
# Исключаем целевой IP
ips.extend([ip for ip in matches if ip != '192.168.100.50'])

return list(set(ips)) # Уникальные IP

<h2 id="ispolzovanie">Использование</h2>
attack_ips = extract_ips_from_logs('ddos_attack.log')
print(f"Найдено {len(attack_ips)} уникальных IP-адресов")


Сохранение списка IP


python
with open('attack_ips.txt', 'w') as f:
for ip in attack_ips:
f.write(f"{ip}\n")


Этап 2: Группировка IP по диапазонам


Определение CIDR диапазонов


python
import ipaddress
from collections import defaultdict

def group_ips_into_ranges(ip_list):
"""Группировка IP-адресов по диапазонам"""
# Преобразуем в IP объекты
ip_objects = []
for ip_str in ip_list:
try:
ip_objects.append(ipaddress.ip_address(ip_str))
except:
continue

# Сортируем
ip_objects.sort()

# Группируем по /24 подсетям
ranges = defaultdict(list)
for ip in ip_objects:
# Получаем сеть /24 для IP
network = ipaddress.ip_network(f"{ip}/24", strict=False)
ranges[str(network)].append(str(ip))

return dict(ranges)

<h2 id="ispolzovanie">Использование</h2>
ip_ranges = group_ips_into_ranges(attack_ips)
print(f"IP-адреса сгруппированы в {len(ip_ranges)} диапазонов /24")

<h2 id="sohranenie-diapazonov">Сохранение диапазонов</h2>
with open('ip_ranges.txt', 'w') as f:
for cidr, ips in ip_ranges.items():
f.write(f"{cidr}: {len(ips)} IP\n")
for ip in ips:
f.write(f" {ip}\n")


Этап 3: Анализ диапазонов через Shodan


Настройка Shodan API


python
import shodan
import json
import time
from datetime import datetime

class ShodanRangeAnalyzer:
def __init__(self, api_key):
self.api = shodan.Shodan(api_key)
self.results = []

def analyze_ip_range(self, cidr):
"""Анализ диапазона через Shodan"""
network = ipaddress.ip_network(cidr)
start_ip = str(network.network_address + 1)
end_ip = str(network.broadcast_address - 1)

query = f"ip:{start_ip}-{end_ip}"

try:
results = self.api.search(query)
return {
'cidr': cidr,
'total': results['total'],
'matches': results['matches']
}
except shodan.APIError as e:
print(f"Ошибка Shodan для {cidr}: {e}")
return None

def analyze_multiple_ranges(self, ranges_list):
"""Анализ множества диапазонов"""
all_results = []

for cidr in ranges_list:
print(f"Анализ {cidr}...")
result = self.analyze_ip_range(cidr)
if result:
all_results.append(result)

# Rate limiting: 1 запрос в секунду
time.sleep(1)

return all_results

<h2 id="ispolzovanie">Использование</h2>
SHODAN_API_KEY = "your_api_key_here"
analyzer = ShodanRangeAnalyzer(SHODAN_API_KEY)

<h2 id="analiz-vseh-diapazonov">Анализ всех диапазонов</h2>
shodan_results = analyzer.analyze_multiple_ranges(list(ip_ranges.keys()))


Извлечение информации об устройствах


python
def extract_device_info(shodan_results):
"""Извлечение информации об устройствах"""
devices = []

for range_result in shodan_results:
for match in range_result['matches']:
device_info = {
'ip': match.get('ip_str', ''),
'hostnames': match.get('hostnames', []),
'org': match.get('org', ''),
'isp': match.get('isp', ''),
'country': match.get('location', {}).get('country_name', ''),
'city': match.get('location', {}).get('city', ''),
'ports': [item['port'] for item in match.get('data', [])],
'services': []
}

# Извлечение сервисов
for item in match.get('data', []):
service = {
'port': item.get('port', ''),
'product': item.get('product', ''),
'version': item.get('version', ''),
'banner': item.get('data', '')[:200] # Первые 200 символов
}
device_info['services'].append(service)

devices.append(device_info)

return devices

devices = extract_device_info(shodan_results)
print(f"Найдено {len(devices)} устройств в Shodan")


Этап 4: Анализ через Censys


Настройка Censys API


python
from censys.search import CensysHosts
import ipaddress

class CensysRangeAnalyzer:
def __init__(self, api_id, api_secret):
self.censys = CensysHosts(api_id=api_id, api_secret=api_secret)
self.results = []

def analyze_range(self, cidr):
"""Анализ диапазона через Censys"""
query = f"ip:{cidr}"

try:
hosts = self.censys.search(query, per_page=100)
return list(hosts)
except Exception as e:
print(f"Ошибка Censys для {cidr}: {e}")
return []

def analyze_multiple_ranges(self, ranges_list):
"""Анализ множества диапазонов"""
all_hosts = []

for cidr in ranges_list:
print(f"Анализ {cidr} через Censys...")
hosts = self.analyze_range(cidr)
all_hosts.extend(hosts)
time.sleep(1) # Rate limiting

return all_hosts

<h2 id="ispolzovanie">Использование</h2>
CENSYS_API_ID = "your_api_id"
CENSYS_API_SECRET = "your_api_secret"
censys_analyzer = CensysRangeAnalyzer(CENSYS_API_ID, CENSYS_API_SECRET)

censys_hosts = censys_analyzer.analyze_multiple_ranges(list(ip_ranges.keys()))
print(f"Найдено {len(censys_hosts)} хостов в Censys")


Извлечение данных из Censys


python
def extract_censys_data(censys_hosts):
"""Извлечение данных из результатов Censys"""
extracted = []

for host in censys_hosts:
host_data = {
'ip': host.get('ip', ''),
'services': [],
'location': host.get('location', {}),
'autonomous_system': host.get('autonomous_system', {}),
'dns': host.get('dns', {})
}

# Извлечение сервисов
for service in host.get('services', []):
service_data = {
'port': service.get('port', ''),
'service_name': service.get('service_name', ''),
'transport_protocol': service.get('transport_protocol', ''),
'certificate': service.get('certificate', {})
}
host_data['services'].append(service_data)

extracted.append(host_data)

return extracted

censys_data = extract_censys_data(censys_hosts)


Этап 5: Выявление связанных ресурсов


Поиск общих организаций


python
from collections import Counter, defaultdict

def find_common_organizations(devices, censys_data):
"""Поиск IP, принадлежащих одним организациям"""
org_ips = defaultdict(list)

# Из Shodan данных
for device in devices:
org = device.get('org', 'Unknown')
if org and org != 'Unknown':
org_ips[org].append(device['ip'])

# Из Censys данных
for host in censys_data:
as_data = host.get('autonomous_system', {})
org = as_data.get('name', 'Unknown')
if org and org != 'Unknown':
org_ips[org].append(host['ip'])

# Фильтрация: только организации с несколькими IP
significant_orgs = {
org: ips for org, ips in org_ips.items()
if len(set(ips)) > 1
}

return significant_orgs

common_orgs = find_common_organizations(devices, censys_data)
print(f"Найдено {len(common_orgs)} организаций с несколькими IP")

<h2 id="sohranenie-rezultatov">Сохранение результатов</h2>
with open('common_organizations.json', 'w') as f:
json.dump(common_orgs, f, indent=2)


Поиск связанных доменов


python
def find_related_domains(devices, censys_data):
"""Поиск доменов, связанных с IP-адресами"""
ip_to_domains = defaultdict(set)

# Из Shodan
for device in devices:
ip = device['ip']
for hostname in device.get('hostnames', []):
ip_to_domains[ip].add(hostname)

# Из Censys
for host in censys_data:
ip = host['ip']
dns_data = host.get('dns', {})

# Reverse DNS
reverse_dns = dns_data.get('reverse_dns', {}).get('names', [])
for name in reverse_dns:
ip_to_domains[ip].add(name)

return dict(ip_to_domains)

related_domains = find_related_domains(devices, censys_data)
print(f"Найдено доменов для {len(related_domains)} IP-адресов")

<h2 id="sohranenie">Сохранение</h2>
with open('related_domains.json', 'w') as f:
# Преобразуем sets в lists для JSON
domains_dict = {ip: list(domains) for ip, domains in related_domains.items()}
json.dump(domains_dict, f, indent=2)


Географический анализ


python
def geographic_analysis(devices, censys_data):
"""Географический анализ IP-адресов"""
locations = []

# Из Shodan
for device in devices:
locations.append({
'ip': device['ip'],
'country': device.get('country', 'Unknown'),
'city': device.get('city', 'Unknown')
})

# Из Censys
for host in censys_data:
location = host.get('location', {})
locations.append({
'ip': host['ip'],
'country': location.get('country', 'Unknown'),
'city': location.get('city', 'Unknown')
})

# Статистика по странам
countries = Counter([loc['country'] for loc in locations])

return locations, countries

locations, country_stats = geographic_analysis(devices, censys_data)
print("\nРаспределение по странам:")
for country, count in country_stats.most_common(10):
print(f" {country}: {count} IP")


Этап 6: Выявление паттернов ботнета


Анализ временных паттернов


python
def analyze_temporal_patterns(log_file):
"""Анализ временных паттернов атаки"""
from datetime import datetime
import re

time_pattern = r'\[(\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2})'
ip_pattern = r'\b(?:\d{1,3}\.){3}\d{1,3}\b'

hourly_activity = defaultdict(lambda: defaultdict(int))

with open(log_file, 'r') as f:
for line in f:
time_match = re.search(time_pattern, line)
ip_matches = re.findall(ip_pattern, line)

if time_match and ip_matches:
time_str = time_match.group(1)
try:
dt = datetime.strptime(time_str, '%d/%b/%Y:%H:%M:%S')
hour = dt.hour

for ip in ip_matches:
if ip != '192.168.100.50':
hourly_activity[ip][hour] += 1
except:
continue

return hourly_activity

temporal_patterns = analyze_temporal_patterns('ddos_attack.log')


Выявление координации


python
def detect_coordination(temporal_patterns):
"""Выявление координации между IP-адресами"""
# IP с похожей активностью считаются скоординированными
coordinated_groups = []

ip_list = list(temporal_patterns.keys())

for i, ip1 in enumerate(ip_list):
group = [ip1]
pattern1 = temporal_patterns[ip1]

for ip2 in ip_list[i+1:]:
pattern2 = temporal_patterns[ip2]

# Сравнение паттернов активности
common_hours = set(pattern1.keys()) & set(pattern2.keys())
if len(common_hours) > 3: # Минимум 3 общих часа активности
similarity = sum(
min(pattern1[h], pattern2[h])
for h in common_hours
) / max(sum(pattern1.values()), sum(pattern2.values()), 1)

if similarity > 0.5: # Порог схожести
group.append(ip2)

if len(group) > 1:
coordinated_groups.append(group)

return coordinated_groups

coordinated = detect_coordination(temporal_patterns)
print(f"Выявлено {len(coordinated)} групп скоординированных IP")


Этап 7: Генерация отчета


Создание комплексного отчета


python
def generate_investigation_report(attack_ips, ip_ranges, devices, censys_data, 
common_orgs, related_domains, locations,
country_stats, coordinated):
"""Генерация отчета о расследовании"""

report = {
'investigation_date': datetime.now().isoformat(),
'summary': {
'total_attack_ips': len(attack_ips),
'ip_ranges_identified': len(ip_ranges),
'devices_found_shodan': len(devices),
'hosts_found_censys': len(censys_data),
'organizations_identified': len(common_orgs),
'domains_identified': sum(len(d) for d in related_domains.values()),
'coordinated_groups': len(coordinated)
},
'ip_ranges': {
cidr: {
'ip_count': len(ips),
'ips': ips
} for cidr, ips in ip_ranges.items()
},
'organizations': {
org: {
'ip_count': len(set(ips)),
'ips': list(set(ips))
} for org, ips in common_orgs.items()
},
'geographic_distribution': dict(country_stats.most_common(20)),
'related_domains': {
ip: list(domains) for ip, domains in related_domains.items()
},
'coordinated_groups': [
{
'group_id': i+1,
'ip_count': len(group),
'ips': group
} for i, group in enumerate(coordinated)
],
'devices_details': devices[:100], # Первые 100 устройств
'recommendations': [
"Блокировать все выявленные IP-диапазоны на уровне файрвола",
"Передать информацию о координации в правоохранительные органы",
"Мониторить связанные домены на предмет будущей активности",
"Усилить защиту от DDoS на уровне инфраструктуры"
]
}

return report

<h2 id="generatsiya-otcheta">Генерация отчета</h2>
report = generate_investigation_report(
attack_ips, ip_ranges, devices, censys_data,
common_orgs, related_domains, locations, country_stats, coordinated
)

<h2 id="sohranenie">Сохранение</h2>
with open('ddos_investigation_report.json', 'w', encoding='utf-8') as f:
json.dump(report, f, indent=2, ensure_ascii=False)

print("\nОтчет сохранен: ddos_investigation_report.json")


Создание текстового отчета


python
def generate_text_report(report):
"""Генерация текстового отчета"""

text = f"""
ОТЧЕТ О РАССЛЕДОВАНИИ DDoS-АТАКИ
Дата расследования: {report['investigation_date']}

КРАТКАЯ СВОДКА
==============
Всего IP-адресов атаки: {report['summary']['total_attack_ips']}
Выявлено диапазонов: {report['summary']['ip_ranges_identified']}
Найдено устройств (Shodan): {report['summary']['devices_found_shodan']}
Найдено хостов (Censys): {report['summary']['hosts_found_censys']}
Выявлено организаций: {report['summary']['organizations_identified']}
Найдено доменов: {report['summary']['domains_identified']}
Групп координации: {report['summary']['coordinated_groups']}

ГЕОГРАФИЧЕСКОЕ РАСПРЕДЕЛЕНИЕ
=============================
"""

for country, count in list(report['geographic_distribution'].items())[:10]:
text += f"{country}: {count} IP\n"

text += f"""

ВЫЯВЛЕННЫЕ ОРГАНИЗАЦИИ
======================
"""

for org, data in list(report['organizations'].items())[:10]:
text += f"\n{org}: {data['ip_count']} IP-адресов\n"
for ip in data['ips'][:5]: # Первые 5 IP
text += f" - {ip}\n"

text += f"""

СВЯЗАННЫЕ ДОМЕНЫ
================
"""

domain_count = 0
for ip, domains in list(report['related_domains'].items())[:20]:
if domains:
text += f"\n{ip}:\n"
for domain in domains:
text += f" - {domain}\n"
domain_count += len(domains)
if domain_count > 50:
break

text += f"""

ГРУППЫ КООРДИНАЦИИ
==================
"""

for group in report['coordinated_groups'][:10]:
text += f"\nГруппа {group['group_id']}: {group['ip_count']} IP\n"
for ip in group['ips'][:10]:
text += f" - {ip}\n"

text += f"""

РЕКОМЕНДАЦИИ
============
"""

for i, rec in enumerate(report['recommendations'], 1):
text += f"{i}. {rec}\n"

return text

<h2 id="generatsiya-i-sohranenie-tekstovogo-otcheta">Генерация и сохранение текстового отчета</h2>
text_report = generate_text_report(report)
with open('ddos_investigation_report.txt', 'w', encoding='utf-8') as f:
f.write(text_report)

print("Текстовый отчет сохранен: ddos_investigation_report.txt")


Результаты расследования


Ключевые находки


1. 523 связанных IP-адреса выявлено в диапазонах
2. 47 связанных доменов обнаружено через обратный DNS и SSL сертификаты
3. 12 организаций с множественными IP-адресами
4. 8 групп координации выявлено через анализ временных паттернов
5. Географическое распределение: 65% IP из 3 стран

Инфраструктура ботнета


- Основные диапазоны: 8 подсетей /24
- Общие организации: 3 хостинг-провайдера
- Связанные домены: 47 доменов, многие с похожими именами
- Координация: синхронизированная активность в 8 группах

Используемые инструменты


- Shodan API - поиск устройств в интернете
- Censys API - комплексный анализ хостов
- Python скрипты - автоматизация анализа
- Maltego - визуализация связей (опционально)

Команды для запуска


bash
<h2 id="1-izvlechenie-ip-iz-logov">1. Извлечение IP из логов</h2>
python3 extract_ips.py ddos_attack.log

<h2 id="2-gruppirovka-po-diapazonam">2. Группировка по диапазонам</h2>
python3 group_ranges.py attack_ips.txt

<h2 id="3-analiz-cherez-shodan">3. Анализ через Shodan</h2>
python3 shodan_analysis.py ip_ranges.txt

<h2 id="4-analiz-cherez-censys">4. Анализ через Censys</h2>
python3 censys_analysis.py ip_ranges.txt

<h2 id="5-vyyavlenie-svyazey">5. Выявление связей</h2>
python3 find_connections.py

<h2 id="6-generatsiya-otcheta">6. Генерация отчета</h2>
python3 generate_report.py


Выводы


Анализ диапазонов IP позволил:
- Выявить полную инфраструктуру ботнета
- Установить связи между ресурсами
- Определить координацию атак

---

**⚠️ Дисклеймер:** Статья носит информационно-образовательный характер и не содержит инструкций для совершения противоправных действий.