
Ситуация
Компания подверглась DDoS-атаке с множества IP-адресов. Необходимо выявить инфраструктуру злоумышленников, найти связанные ресурсы и передать информацию правоохранительным органам.
Исходные данные
- Логи атаки содержат 247 уникальных IP-адресов
- Атака продолжалась 3 часа
- Пиковая интенсивность: 15,000 запросов/секунду
- Целевой сервер: 192.168.100.50
Этап 1: Сбор и подготовка данных
Извлечение IP-адресов из логов
python
#!/usr/bin/env python3
import re
from collections import Counter
def extract_ips_from_logs(log_file):
"""Извлечение IP-адресов из логов"""
ip_pattern = r'\b(?:\d{1,3}\.){3}\d{1,3}\b'
ips = []
with open(log_file, 'r') as f:
for line in f:
matches = re.findall(ip_pattern, line)
# Исключаем целевой IP
ips.extend([ip for ip in matches if ip != '192.168.100.50'])
return list(set(ips)) # Уникальные IP
<h2 id="ispolzovanie">Использование</h2>
attack_ips = extract_ips_from_logs('ddos_attack.log')
print(f"Найдено {len(attack_ips)} уникальных IP-адресов")
Сохранение списка IP
python
with open('attack_ips.txt', 'w') as f:
for ip in attack_ips:
f.write(f"{ip}\n")
Этап 2: Группировка IP по диапазонам
Определение CIDR диапазонов
python
import ipaddress
from collections import defaultdict
def group_ips_into_ranges(ip_list):
"""Группировка IP-адресов по диапазонам"""
# Преобразуем в IP объекты
ip_objects = []
for ip_str in ip_list:
try:
ip_objects.append(ipaddress.ip_address(ip_str))
except:
continue
# Сортируем
ip_objects.sort()
# Группируем по /24 подсетям
ranges = defaultdict(list)
for ip in ip_objects:
# Получаем сеть /24 для IP
network = ipaddress.ip_network(f"{ip}/24", strict=False)
ranges[str(network)].append(str(ip))
return dict(ranges)
<h2 id="ispolzovanie">Использование</h2>
ip_ranges = group_ips_into_ranges(attack_ips)
print(f"IP-адреса сгруппированы в {len(ip_ranges)} диапазонов /24")
<h2 id="sohranenie-diapazonov">Сохранение диапазонов</h2>
with open('ip_ranges.txt', 'w') as f:
for cidr, ips in ip_ranges.items():
f.write(f"{cidr}: {len(ips)} IP\n")
for ip in ips:
f.write(f" {ip}\n")
Этап 3: Анализ диапазонов через Shodan
Настройка Shodan API
python
import shodan
import json
import time
from datetime import datetime
class ShodanRangeAnalyzer:
def __init__(self, api_key):
self.api = shodan.Shodan(api_key)
self.results = []
def analyze_ip_range(self, cidr):
"""Анализ диапазона через Shodan"""
network = ipaddress.ip_network(cidr)
start_ip = str(network.network_address + 1)
end_ip = str(network.broadcast_address - 1)
query = f"ip:{start_ip}-{end_ip}"
try:
results = self.api.search(query)
return {
'cidr': cidr,
'total': results['total'],
'matches': results['matches']
}
except shodan.APIError as e:
print(f"Ошибка Shodan для {cidr}: {e}")
return None
def analyze_multiple_ranges(self, ranges_list):
"""Анализ множества диапазонов"""
all_results = []
for cidr in ranges_list:
print(f"Анализ {cidr}...")
result = self.analyze_ip_range(cidr)
if result:
all_results.append(result)
# Rate limiting: 1 запрос в секунду
time.sleep(1)
return all_results
<h2 id="ispolzovanie">Использование</h2>
SHODAN_API_KEY = "your_api_key_here"
analyzer = ShodanRangeAnalyzer(SHODAN_API_KEY)
<h2 id="analiz-vseh-diapazonov">Анализ всех диапазонов</h2>
shodan_results = analyzer.analyze_multiple_ranges(list(ip_ranges.keys()))
Извлечение информации об устройствах
python
def extract_device_info(shodan_results):
"""Извлечение информации об устройствах"""
devices = []
for range_result in shodan_results:
for match in range_result['matches']:
device_info = {
'ip': match.get('ip_str', ''),
'hostnames': match.get('hostnames', []),
'org': match.get('org', ''),
'isp': match.get('isp', ''),
'country': match.get('location', {}).get('country_name', ''),
'city': match.get('location', {}).get('city', ''),
'ports': [item['port'] for item in match.get('data', [])],
'services': []
}
# Извлечение сервисов
for item in match.get('data', []):
service = {
'port': item.get('port', ''),
'product': item.get('product', ''),
'version': item.get('version', ''),
'banner': item.get('data', '')[:200] # Первые 200 символов
}
device_info['services'].append(service)
devices.append(device_info)
return devices
devices = extract_device_info(shodan_results)
print(f"Найдено {len(devices)} устройств в Shodan")
Этап 4: Анализ через Censys
Настройка Censys API
python
from censys.search import CensysHosts
import ipaddress
class CensysRangeAnalyzer:
def __init__(self, api_id, api_secret):
self.censys = CensysHosts(api_id=api_id, api_secret=api_secret)
self.results = []
def analyze_range(self, cidr):
"""Анализ диапазона через Censys"""
query = f"ip:{cidr}"
try:
hosts = self.censys.search(query, per_page=100)
return list(hosts)
except Exception as e:
print(f"Ошибка Censys для {cidr}: {e}")
return []
def analyze_multiple_ranges(self, ranges_list):
"""Анализ множества диапазонов"""
all_hosts = []
for cidr in ranges_list:
print(f"Анализ {cidr} через Censys...")
hosts = self.analyze_range(cidr)
all_hosts.extend(hosts)
time.sleep(1) # Rate limiting
return all_hosts
<h2 id="ispolzovanie">Использование</h2>
CENSYS_API_ID = "your_api_id"
CENSYS_API_SECRET = "your_api_secret"
censys_analyzer = CensysRangeAnalyzer(CENSYS_API_ID, CENSYS_API_SECRET)
censys_hosts = censys_analyzer.analyze_multiple_ranges(list(ip_ranges.keys()))
print(f"Найдено {len(censys_hosts)} хостов в Censys")
Извлечение данных из Censys
python
def extract_censys_data(censys_hosts):
"""Извлечение данных из результатов Censys"""
extracted = []
for host in censys_hosts:
host_data = {
'ip': host.get('ip', ''),
'services': [],
'location': host.get('location', {}),
'autonomous_system': host.get('autonomous_system', {}),
'dns': host.get('dns', {})
}
# Извлечение сервисов
for service in host.get('services', []):
service_data = {
'port': service.get('port', ''),
'service_name': service.get('service_name', ''),
'transport_protocol': service.get('transport_protocol', ''),
'certificate': service.get('certificate', {})
}
host_data['services'].append(service_data)
extracted.append(host_data)
return extracted
censys_data = extract_censys_data(censys_hosts)
Этап 5: Выявление связанных ресурсов
Поиск общих организаций
python
from collections import Counter, defaultdict
def find_common_organizations(devices, censys_data):
"""Поиск IP, принадлежащих одним организациям"""
org_ips = defaultdict(list)
# Из Shodan данных
for device in devices:
org = device.get('org', 'Unknown')
if org and org != 'Unknown':
org_ips[org].append(device['ip'])
# Из Censys данных
for host in censys_data:
as_data = host.get('autonomous_system', {})
org = as_data.get('name', 'Unknown')
if org and org != 'Unknown':
org_ips[org].append(host['ip'])
# Фильтрация: только организации с несколькими IP
significant_orgs = {
org: ips for org, ips in org_ips.items()
if len(set(ips)) > 1
}
return significant_orgs
common_orgs = find_common_organizations(devices, censys_data)
print(f"Найдено {len(common_orgs)} организаций с несколькими IP")
<h2 id="sohranenie-rezultatov">Сохранение результатов</h2>
with open('common_organizations.json', 'w') as f:
json.dump(common_orgs, f, indent=2)
Поиск связанных доменов
python
def find_related_domains(devices, censys_data):
"""Поиск доменов, связанных с IP-адресами"""
ip_to_domains = defaultdict(set)
# Из Shodan
for device in devices:
ip = device['ip']
for hostname in device.get('hostnames', []):
ip_to_domains[ip].add(hostname)
# Из Censys
for host in censys_data:
ip = host['ip']
dns_data = host.get('dns', {})
# Reverse DNS
reverse_dns = dns_data.get('reverse_dns', {}).get('names', [])
for name in reverse_dns:
ip_to_domains[ip].add(name)
return dict(ip_to_domains)
related_domains = find_related_domains(devices, censys_data)
print(f"Найдено доменов для {len(related_domains)} IP-адресов")
<h2 id="sohranenie">Сохранение</h2>
with open('related_domains.json', 'w') as f:
# Преобразуем sets в lists для JSON
domains_dict = {ip: list(domains) for ip, domains in related_domains.items()}
json.dump(domains_dict, f, indent=2)
Географический анализ
python
def geographic_analysis(devices, censys_data):
"""Географический анализ IP-адресов"""
locations = []
# Из Shodan
for device in devices:
locations.append({
'ip': device['ip'],
'country': device.get('country', 'Unknown'),
'city': device.get('city', 'Unknown')
})
# Из Censys
for host in censys_data:
location = host.get('location', {})
locations.append({
'ip': host['ip'],
'country': location.get('country', 'Unknown'),
'city': location.get('city', 'Unknown')
})
# Статистика по странам
countries = Counter([loc['country'] for loc in locations])
return locations, countries
locations, country_stats = geographic_analysis(devices, censys_data)
print("\nРаспределение по странам:")
for country, count in country_stats.most_common(10):
print(f" {country}: {count} IP")
Этап 6: Выявление паттернов ботнета
Анализ временных паттернов
python
def analyze_temporal_patterns(log_file):
"""Анализ временных паттернов атаки"""
from datetime import datetime
import re
time_pattern = r'\[(\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2})'
ip_pattern = r'\b(?:\d{1,3}\.){3}\d{1,3}\b'
hourly_activity = defaultdict(lambda: defaultdict(int))
with open(log_file, 'r') as f:
for line in f:
time_match = re.search(time_pattern, line)
ip_matches = re.findall(ip_pattern, line)
if time_match and ip_matches:
time_str = time_match.group(1)
try:
dt = datetime.strptime(time_str, '%d/%b/%Y:%H:%M:%S')
hour = dt.hour
for ip in ip_matches:
if ip != '192.168.100.50':
hourly_activity[ip][hour] += 1
except:
continue
return hourly_activity
temporal_patterns = analyze_temporal_patterns('ddos_attack.log')
Выявление координации
python
def detect_coordination(temporal_patterns):
"""Выявление координации между IP-адресами"""
# IP с похожей активностью считаются скоординированными
coordinated_groups = []
ip_list = list(temporal_patterns.keys())
for i, ip1 in enumerate(ip_list):
group = [ip1]
pattern1 = temporal_patterns[ip1]
for ip2 in ip_list[i+1:]:
pattern2 = temporal_patterns[ip2]
# Сравнение паттернов активности
common_hours = set(pattern1.keys()) & set(pattern2.keys())
if len(common_hours) > 3: # Минимум 3 общих часа активности
similarity = sum(
min(pattern1[h], pattern2[h])
for h in common_hours
) / max(sum(pattern1.values()), sum(pattern2.values()), 1)
if similarity > 0.5: # Порог схожести
group.append(ip2)
if len(group) > 1:
coordinated_groups.append(group)
return coordinated_groups
coordinated = detect_coordination(temporal_patterns)
print(f"Выявлено {len(coordinated)} групп скоординированных IP")
Этап 7: Генерация отчета
Создание комплексного отчета
python
def generate_investigation_report(attack_ips, ip_ranges, devices, censys_data,
common_orgs, related_domains, locations,
country_stats, coordinated):
"""Генерация отчета о расследовании"""
report = {
'investigation_date': datetime.now().isoformat(),
'summary': {
'total_attack_ips': len(attack_ips),
'ip_ranges_identified': len(ip_ranges),
'devices_found_shodan': len(devices),
'hosts_found_censys': len(censys_data),
'organizations_identified': len(common_orgs),
'domains_identified': sum(len(d) for d in related_domains.values()),
'coordinated_groups': len(coordinated)
},
'ip_ranges': {
cidr: {
'ip_count': len(ips),
'ips': ips
} for cidr, ips in ip_ranges.items()
},
'organizations': {
org: {
'ip_count': len(set(ips)),
'ips': list(set(ips))
} for org, ips in common_orgs.items()
},
'geographic_distribution': dict(country_stats.most_common(20)),
'related_domains': {
ip: list(domains) for ip, domains in related_domains.items()
},
'coordinated_groups': [
{
'group_id': i+1,
'ip_count': len(group),
'ips': group
} for i, group in enumerate(coordinated)
],
'devices_details': devices[:100], # Первые 100 устройств
'recommendations': [
"Блокировать все выявленные IP-диапазоны на уровне файрвола",
"Передать информацию о координации в правоохранительные органы",
"Мониторить связанные домены на предмет будущей активности",
"Усилить защиту от DDoS на уровне инфраструктуры"
]
}
return report
<h2 id="generatsiya-otcheta">Генерация отчета</h2>
report = generate_investigation_report(
attack_ips, ip_ranges, devices, censys_data,
common_orgs, related_domains, locations, country_stats, coordinated
)
<h2 id="sohranenie">Сохранение</h2>
with open('ddos_investigation_report.json', 'w', encoding='utf-8') as f:
json.dump(report, f, indent=2, ensure_ascii=False)
print("\nОтчет сохранен: ddos_investigation_report.json")
Создание текстового отчета
python
def generate_text_report(report):
"""Генерация текстового отчета"""
text = f"""
ОТЧЕТ О РАССЛЕДОВАНИИ DDoS-АТАКИ
Дата расследования: {report['investigation_date']}
КРАТКАЯ СВОДКА
==============
Всего IP-адресов атаки: {report['summary']['total_attack_ips']}
Выявлено диапазонов: {report['summary']['ip_ranges_identified']}
Найдено устройств (Shodan): {report['summary']['devices_found_shodan']}
Найдено хостов (Censys): {report['summary']['hosts_found_censys']}
Выявлено организаций: {report['summary']['organizations_identified']}
Найдено доменов: {report['summary']['domains_identified']}
Групп координации: {report['summary']['coordinated_groups']}
ГЕОГРАФИЧЕСКОЕ РАСПРЕДЕЛЕНИЕ
=============================
"""
for country, count in list(report['geographic_distribution'].items())[:10]:
text += f"{country}: {count} IP\n"
text += f"""
ВЫЯВЛЕННЫЕ ОРГАНИЗАЦИИ
======================
"""
for org, data in list(report['organizations'].items())[:10]:
text += f"\n{org}: {data['ip_count']} IP-адресов\n"
for ip in data['ips'][:5]: # Первые 5 IP
text += f" - {ip}\n"
text += f"""
СВЯЗАННЫЕ ДОМЕНЫ
================
"""
domain_count = 0
for ip, domains in list(report['related_domains'].items())[:20]:
if domains:
text += f"\n{ip}:\n"
for domain in domains:
text += f" - {domain}\n"
domain_count += len(domains)
if domain_count > 50:
break
text += f"""
ГРУППЫ КООРДИНАЦИИ
==================
"""
for group in report['coordinated_groups'][:10]:
text += f"\nГруппа {group['group_id']}: {group['ip_count']} IP\n"
for ip in group['ips'][:10]:
text += f" - {ip}\n"
text += f"""
РЕКОМЕНДАЦИИ
============
"""
for i, rec in enumerate(report['recommendations'], 1):
text += f"{i}. {rec}\n"
return text
<h2 id="generatsiya-i-sohranenie-tekstovogo-otcheta">Генерация и сохранение текстового отчета</h2>
text_report = generate_text_report(report)
with open('ddos_investigation_report.txt', 'w', encoding='utf-8') as f:
f.write(text_report)
print("Текстовый отчет сохранен: ddos_investigation_report.txt")
Результаты расследования
Ключевые находки
1. 523 связанных IP-адреса выявлено в диапазонах
2. 47 связанных доменов обнаружено через обратный DNS и SSL сертификаты
3. 12 организаций с множественными IP-адресами
4. 8 групп координации выявлено через анализ временных паттернов
5. Географическое распределение: 65% IP из 3 стран
Инфраструктура ботнета
- Основные диапазоны: 8 подсетей /24
- Общие организации: 3 хостинг-провайдера
- Связанные домены: 47 доменов, многие с похожими именами
- Координация: синхронизированная активность в 8 группах
Используемые инструменты
- Shodan API - поиск устройств в интернете
- Censys API - комплексный анализ хостов
- Python скрипты - автоматизация анализа
- Maltego - визуализация связей (опционально)
Команды для запуска
bash
<h2 id="1-izvlechenie-ip-iz-logov">1. Извлечение IP из логов</h2>
python3 extract_ips.py ddos_attack.log
<h2 id="2-gruppirovka-po-diapazonam">2. Группировка по диапазонам</h2>
python3 group_ranges.py attack_ips.txt
<h2 id="3-analiz-cherez-shodan">3. Анализ через Shodan</h2>
python3 shodan_analysis.py ip_ranges.txt
<h2 id="4-analiz-cherez-censys">4. Анализ через Censys</h2>
python3 censys_analysis.py ip_ranges.txt
<h2 id="5-vyyavlenie-svyazey">5. Выявление связей</h2>
python3 find_connections.py
<h2 id="6-generatsiya-otcheta">6. Генерация отчета</h2>
python3 generate_report.py
Выводы
Анализ диапазонов IP позволил:
- Выявить полную инфраструктуру ботнета
- Установить связи между ресурсами
- Определить координацию атак
---
**⚠️ Дисклеймер:** Статья носит информационно-образовательный характер и не содержит инструкций для совершения противоправных действий.