La observabilidad como aliado, monitorizando y alertando Fortinet con Wazuh.
Vamos a crear reglas, para saber que pasa en nuestros equipos de borde.

Introduccion
Hace un tiempo escribí sobre Wazuh y recibí muchas consultas. Voy a avanzar un poco más y vamos a monitorear un equipo Fortigate para tener alarmas en tiempo real y actuar en consecuencia. La verdad es que este SIEM nos ofrece una gran variedad de opciones para configurar. En este caso en particular, vamos a crear dos tipos de reglas personalizadas, una para las IPs y otra para las conexiones VPN. Queremos que cuando se ejecute una regla, nos envíe un correo. Claro que puedes modificarlo para que envíe un mensaje a Slack o Teams. Y por qué no llevarlo a un N8N, algo en lo que estoy trabajando.
La idea es tener esta clase de alarmas. Dejo estos ejemplos, uno de IPs con un ataque de DoS y uno de VPN con un usuario fuera de horario laboral.


Prerequisitos
Necesitamos tener instalado un Wazuh Server. En mi caso, uso la versión 4.12. Debemos modificar el archivo ossec.conf para que reciba eventos UDP en el puerto 514. De esta manera, podemos dirigir nuestros dispositivos Fortinet a nuestro SIEM para enviar los Syslogs.
# La Ruta es /var/ossec/etc/ossec.conf
<!-- Logs de VPN -->
<remote>
<connection>syslog</connection>
<port>514</port>
<protocol>udp</protocol>
<allowed-ips>any</allowed-ips>
</remote>
Luego, como en cada cambio en el Wazuh Manager.
sudo systemctl restart wazuh-manager
Ya nos deberian llegar los Logs de Fortigate, si configuraste bien el equipo. Aca ten dejo un instructivo, para ello.
Configurar Decoders
Un decoder es el componente encargado de interpretar y extraer información de los logs en bruto que recibe Wazuh. Piensa en él como un traductor que convierte texto plano en datos estructurados que Wazuh puede analizar y sobre los cuales puede crear reglas de detección. Para que tengas una idea te dejo la lista de los que son mantenido por la comunidad.

En nuestro caso vamos a generar una lista propia, creandola aqui /var/ossec/etc/decoders/custom-fortigate-ips.xml
<!--
- Fortigate IPS Decoders Santiago Fernandez
-->
<decoder name="fortigate-ips">
<prematch>type=ips</prematch>
</decoder>
<decoder name="fortigate-ips-details">
<parent>fortigate-ips</parent>
<regex>severity=(\w+)</regex>
<order>severity</order>
</decoder>
<decoder name="fortigate-ips-details">
<parent>fortigate-ips</parent>
<regex>srcip=(\S+)</regex>
<order>srcip</order>
</decoder>
<decoder name="fortigate-ips-details">
<parent>fortigate-ips</parent>
<regex>dstip=(\S+)</regex>
<order>dstip</order>
</decoder>
<decoder name="fortigate-ips-details">
<parent>fortigate-ips</parent>
<regex>dstport=(\d+)</regex>
<order>dstport</order>
</decoder>
<decoder name="fortigate-ips-details">
<parent>fortigate-ips</parent>
<regex>attack="([^"]+)"</regex>
<order>attack</order>
</decoder>
<decoder name="fortigate-ips-details">
<parent>fortigate-ips</parent>
<regex>action=(\w+)</regex>
<order>action</order>
</decoder>
Configurando las Reglas
Una regla es el componente que analiza los datos extraídos por los decoders y determina si un evento es relevante, cuán crítico es, y qué acciones tomar. Las reglas transforman datos en alertas de seguridad. Una explicacion burda seria la siguiente: Se envia el log para que el decoder extraiga los campos, la regla evalua y clasifican para saber si debe o no generar la alerta.
Vamos a crear dos reglas, custom, para fortigate. Ambas deben estar en este path /var/ossec/etc/rules.
<!-- ----------------------- -->
<!-- custom-fortigate-ips.xml -->
<!-- ----------------------- -->
<group name="fortigate,ids,ips,utm,">
<!-- Base rule for Fortigate IPS -->
<rule id="196200" level="3">
<if_sid>81629</if_sid>
<description>Fortigate IPS event detected</description>
<group>fortigate,fortigate_ips</group>
</rule>
<!-- ========== CLASIFICACIÓN POR SEVERIDAD ========== -->
<!-- IPS - Critical Severity -->
<rule id="196201" level="13">
<if_sid>196200</if_sid>
<field name="severity">critical</field>
<description>FGT IPS: CRITICAL attack detected - $(attack) from $(srcip)</description>
<options>no_full_log</options>
<mitre>
<id>T1190</id>
</mitre>
<group>attack,critical,fortigate_ips</group>
</rule>
<!-- IPS - High Severity -->
<rule id="196202" level="10">
<if_sid>196200</if_sid>
<field name="severity">high</field>
<description>FGT IPS: HIGH severity attack - $(attack) from $(srcip)</description>
<options>no_full_log</options>
<mitre>
<id>T1190</id>
</mitre>
<group>attack,high,fortigate_ips</group>
</rule>
<!-- IPS - Medium Severity -->
<rule id="196203" level="7">
<if_sid>196200</if_sid>
<field name="severity">medium</field>
<description>FGT IPS: MEDIUM severity attack - $(attack) from $(srcip)</description>
<options>no_full_log</options>
<group>attack,medium,fortigate_ips</group>
</rule>
<!-- IPS - Low Severity pero repetido -->
<rule id="196204" level="5" frequency="10" timeframe="300">
<if_matched_sid>196200</if_matched_sid>
<field name="severity">low</field>
<same_source_ip />
<description>FGT IPS: Multiple LOW severity attacks from same IP (10+ in 5min)</description>
<group>attack,low,fortigate_ips_repeated</group>
</rule>
<!-- ========== CLASIFICACIÓN POR ACCIÓN ========== -->
<!-- Attack Blocked/Dropped (acción exitosa del IPS) -->
<rule id="196205" level="5">
<if_sid>196200</if_sid>
<match>action="dropped"</match>
<description>FGT IPS: Attack BLOCKED from $(srcip) - $(attack)</description>
<options>no_full_log</options>
<group>attack,blocked,fortigate_ips</group>
</rule>
<!-- Attack Detected but NOT Blocked (PELIGROSO) -->
<rule id="196206" level="9">
<if_sid>196200</if_sid>
<match>action="detected"</match>
<description>FGT IPS: Attack DETECTED but NOT blocked - $(attack) from $(srcip)</description>
<options>no_full_log</options>
<mitre>
<id>T1190</id>
</mitre>
<group>attack,detected,fortigate_ips_unblocked</group>
</rule>
<!-- Critical Attack NOT Blocked (MUY PELIGROSO) -->
<rule id="196207" level="14">
<if_sid>196200</if_sid>
<field name="severity">critical</field>
<match>action="detected"</match>
<description>FGT IPS: CRITICAL attack NOT BLOCKED - $(attack) from $(srcip) to $(dstip)</description>
<mitre>
<id>T1190</id>
</mitre>
<group>attack,critical,fortigate_ips_unblocked,emergency</group>
</rule>
<!-- ========== ATAQUES WEB ESPECÍFICOS ========== -->
<!-- SQL Injection -->
<rule id="196208" level="11">
<if_sid>196200</if_sid>
<match>SQL|sql_injection|SQLi</match>
<description>FGT IPS: SQL Injection attempt from $(srcip) to $(dstip):$(dstport)</description>
<mitre>
<id>T1190</id>
<id>T1505.003</id>
</mitre>
<group>attack,sql_injection,web,fortigate_ips</group>
</rule>
<!-- XSS (Cross-Site Scripting) -->
<rule id="196209" level="10">
<if_sid>196200</if_sid>
<match>XSS|Cross.Site|cross-site</match>
<description>FGT IPS: XSS attack attempt from $(srcip) to $(dstip)</description>
<mitre>
<id>T1189</id>
</mitre>
<group>attack,xss,web,fortigate_ips</group>
</rule>
<!-- Command Injection -->
<rule id="196210" level="11">
<if_sid>196200</if_sid>
<match>command.injection|Command.Execution</match>
<description>FGT IPS: Command Injection attempt from $(srcip)</description>
<mitre>
<id>T1059</id>
</mitre>
<group>attack,command_injection,fortigate_ips</group>
</rule>
<!-- Path Traversal / Directory Traversal -->
<rule id="196211" level="9">
<if_sid>196200</if_sid>
<match>Directory.Traversal|Path.Traversal</match>
<description>FGT IPS: Path Traversal attempt from $(srcip)</description>
<mitre>
<id>T1083</id>
</mitre>
<group>attack,path_traversal,web,fortigate_ips</group>
</rule>
<!-- ========== EXPLOITS Y MALWARE ========== -->
<!-- Buffer Overflow -->
<rule id="196212" level="12">
<if_sid>196200</if_sid>
<match>Buffer.Overflow|buffer-overflow</match>
<description>FGT IPS: Buffer Overflow exploit attempt from $(srcip)</description>
<mitre>
<id>T1203</id>
</mitre>
<group>attack,buffer_overflow,exploit,fortigate_ips</group>
</rule>
<!-- Remote Code Execution (RCE) -->
<rule id="196213" level="13">
<if_sid>196200</if_sid>
<match>Remote.Code|RCE|code.execution</match>
<description>FGT IPS: Remote Code Execution attempt from $(srcip)</description>
<mitre>
<id>T1203</id>
<id>T1059</id>
</mitre>
<group>attack,rce,exploit,fortigate_ips,critical</group>
</rule>
<!-- ========== RECONOCIMIENTO Y ESCANEOS ========== -->
<!-- Nmap Scan -->
<rule id="196214" level="7">
<if_sid>196200</if_sid>
<match>Nmap|nmap|Network.Mapper</match>
<description>FGT IPS: Nmap scan detected from $(srcip)</description>
<mitre>
<id>T1046</id>
</mitre>
<group>attack,reconnaissance,scan,fortigate_ips</group>
</rule>
<!-- Port Scan (múltiples intentos) -->
<rule id="196215" level="7" frequency="5" timeframe="60">
<if_matched_sid>196200</if_matched_sid>
<same_source_ip />
<match>Port.Scan|port.scanning</match>
<description>FGT IPS: Port scanning activity from $(srcip) (5+ in 1min)</description>
<mitre>
<id>T1046</id>
</mitre>
<group>attack,reconnaissance,port_scan,fortigate_ips</group>
</rule>
<!-- Vulnerability Scanner -->
<rule id="196216" level="8">
<if_sid>196200</if_sid>
<match>Vulnerability.Scanner|Nikto|OpenVAS|Nessus</match>
<description>FGT IPS: Vulnerability scanner detected from $(srcip)</description>
<mitre>
<id>T1595</id>
</mitre>
<group>attack,reconnaissance,vuln_scan,fortigate_ips</group>
</rule>
<!-- ========== ATAQUES DoS / DDoS ========== -->
<!-- DoS Attack -->
<rule id="196217" level="11">
<if_sid>196200</if_sid>
<match>DoS|Denial.of.Service</match>
<description>FGT IPS: DoS attack detected from $(srcip) to $(dstip)</description>
<mitre>
<id>T1498</id>
</mitre>
<group>attack,dos,fortigate_ips</group>
</rule>
<!-- DDoS Attack -->
<rule id="196218" level="13">
<if_sid>196200</if_sid>
<match>DDoS|Distributed.Denial</match>
<description>FGT IPS: DDoS attack detected targeting $(dstip)</description>
<mitre>
<id>T1498</id>
</mitre>
<group>attack,ddos,fortigate_ips,critical</group>
</rule>
<!-- SYN Flood -->
<rule id="196219" level="10">
<if_sid>196200</if_sid>
<match>SYN.Flood|syn-flood</match>
<description>FGT IPS: SYN Flood attack from $(srcip)</description>
<mitre>
<id>T1498.001</id>
</mitre>
<group>attack,dos,syn_flood,fortigate_ips</group>
</rule>
<!-- ========== MALWARE Y AMENAZAS AVANZADAS ========== -->
<!-- Malware Detection -->
<rule id="196220" level="12">
<if_sid>196200</if_sid>
<match>Malware|malicious|trojan|virus|worm</match>
<description>FGT IPS: Malware detected from $(srcip) - $(attack)</description>
<mitre>
<id>T1204</id>
</mitre>
<group>attack,malware,fortigate_ips,critical</group>
</rule>
<!-- Exploit Attempt -->
<rule id="196221" level="11">
<if_sid>196200</if_sid>
<match>Exploit|exploit-kit|CVE-</match>
<description>FGT IPS: Exploit attempt detected - $(attack) from $(srcip)</description>
<mitre>
<id>T1203</id>
</mitre>
<group>attack,exploit,fortigate_ips</group>
</rule>
<!-- Ransomware Activity -->
<rule id="196222" level="14">
<if_sid>196200</if_sid>
<match>Ransomware|ransom|crypto-locker</match>
<description>FGT IPS: Ransomware activity detected from $(srcip)</description>
<mitre>
<id>T1486</id>
</mitre>
<group>attack,ransomware,malware,fortigate_ips,emergency</group>
</rule>
<!-- ========== ATAQUES A SERVICIOS ESPECÍFICOS ========== -->
<!-- Apache/Nginx/Web Server Exploit -->
<rule id="196223" level="10">
<if_sid>196200</if_sid>
<match>Apache|Nginx|HTTP.Server</match>
<description>FGT IPS: Web server exploit attempt - $(attack)</description>
<mitre>
<id>T1190</id>
</mitre>
<group>attack,web,apache,fortigate_ips</group>
</rule>
<!-- WordPress Attack -->
<rule id="196224" level="9">
<if_sid>196200</if_sid>
<match>WordPress|wp-admin|wp-login</match>
<description>FGT IPS: WordPress attack from $(srcip)</description>
<mitre>
<id>T1190</id>
</mitre>
<group>attack,web,wordpress,fortigate_ips</group>
</rule>
<!-- PHP Exploit -->
<rule id="196225" level="10">
<if_sid>196200</if_sid>
<match>PHP|php-cgi</match>
<description>FGT IPS: PHP exploit attempt from $(srcip)</description>
<mitre>
<id>T1190</id>
</mitre>
<group>attack,web,php,fortigate_ips</group>
</rule>
<!-- ========== CORRELACIÓN DE ATAQUES ========== -->
<!-- Múltiples ataques desde misma IP (5+ en 2min) -->
<rule id="196226" level="12" frequency="5" timeframe="120">
<if_matched_sid>196200</if_matched_sid>
<same_source_ip />
<description>FGT IPS: Multiple attacks from same IP $(srcip) (5+ in 2min)</description>
<mitre>
<id>T1595</id>
</mitre>
<group>attack,correlated,fortigate_ips_repeated</group>
</rule>
<!-- Ataque a múltiples destinos desde una IP (spray attack) -->
<rule id="196227" level="11" frequency="10" timeframe="300">
<if_matched_sid>196200</if_matched_sid>
<same_source_ip />
<not_same_field>data.dstip</not_same_field>
<description>FGT IPS: Single IP attacking multiple targets (10+ targets in 5min)</description>
<mitre>
<id>T1595</id>
</mitre>
<group>attack,correlated,fortigate_ips_spray</group>
</rule>
<!-- Critical attacks en ráfaga (burst) -->
<rule id="196228" level="14" frequency="3" timeframe="60">
<if_matched_sid>196201</if_matched_sid>
<same_source_ip />
<description>FGT IPS: Multiple CRITICAL attacks in short time from $(srcip)</description>
<mitre>
<id>T1190</id>
</mitre>
<group>attack,critical,fortigate_ips_burst,emergency</group>
</rule>
<!-- ========== FILTROS GEOGRÁFICOS ========== -->
<!-- Ataque desde país restringido -->
<rule id="196229" level="10">
<if_sid>196200</if_sid>
<match>srccountry="China"|srccountry="Russia"|srccountry="Iran"|srccountry="North Korea"</match>
<description>FGT IPS: Attack from restricted country - $(attack)</description>
<mitre>
<id>T1190</id>
</mitre>
<group>attack,geo_restricted,fortigate_ips</group>
</rule>
<!-- ========== ATAQUES A PUERTOS CRÍTICOS ========== -->
<!-- Attack to RDP Port (3389) -->
<rule id="196230" level="10">
<if_sid>196200</if_sid>
<match>dstport=3389</match>
<description>FGT IPS: Attack targeting RDP port (3389) from $(srcip)</description>
<mitre>
<id>T1021.001</id>
</mitre>
<group>attack,rdp,fortigate_ips</group>
</rule>
<!-- Attack to SSH Port (22) -->
<rule id="196231" level="9">
<if_sid>196200</if_sid>
<match>dstport=22</match>
<description>FGT IPS: Attack targeting SSH port (22) from $(srcip)</description>
<mitre>
<id>T1021.004</id>
</mitre>
<group>attack,ssh,fortigate_ips</group>
</rule>
</group>
Y el de VPN.
<group name="fortigate,custom,bruteforce,">
<!-- ========== REGLAS BÁSICAS FUNCIONALES ========== -->
<!-- Admin Brute Force -->
<rule id="196100" level="12" frequency="6" timeframe="300" ignore="300">
<if_matched_sid>81606</if_matched_sid>
<same_source_ip />
<description>FGT: ADMIN bruteforce from same IP (6+ attempts in 5min)</description>
<mitre>
<id>T1110.001</id>
</mitre>
<group>fortigate_admin_bruteforce,critical</group>
</rule>
<!-- VPN SSL Brute Force -->
<rule id="196101" level="12" frequency="10" timeframe="180" ignore="300">
<if_matched_sid>81614</if_matched_sid>
<same_source_ip />
<description>FGT: SSL VPN bruteforce from same IP (10+ attempts in 3min)</description>
<mitre>
<id>T1110.001</id>
<id>T1133</id>
</mitre>
<group>fortigate_vpn_bruteforce,critical</group>
</rule>
<!-- ========== VPN FUERA DE HORARIO ========== -->
<!-- Regla base: VPN fuera de horario (TODOS los usuarios, nivel bajo, sin email) -->
<rule id="196102" level="5" ignore="60">
<if_sid>81622</if_sid>
<time>7:00 pm - 6:00 am</time>
<weekday>monday,tuesday,wednesday,thursday,friday</weekday>
<description>FGT: VPN connection outside business hours (19:00-06:00) - $(user)</description>
<mitre>
<id>T1078</id>
<id>T1133</id>
</mitre>
<group>fortigate_vpn_after_hours</group>
</rule>
<!-- Regla específica: VPN fuera de horario de USUARIOS MONITOREADOS (nivel alto, CON email) -->
<rule id="196104" level="8" ignore="60">
<if_sid>196102</if_sid>
<list field="user" lookup="match_key">etc/lists/vpn-monitored-users</list>
<description>FGT VPN: Usuario monitoreado $(user) conectado fuera de horario laboral</description>
<mitre>
<id>T1078</id>
<id>T1133</id>
</mitre>
<group>fortigate_vpn_suspicious,after_hours,monitored_user</group>
</rule>
<!-- ========== VPN FIN DE SEMANA ========== -->
<!-- Regla base: VPN fin de semana (TODOS los usuarios, nivel bajo, sin email) -->
<rule id="196103" level="5" ignore="60">
<if_sid>81622</if_sid>
<weekday>saturday,sunday</weekday>
<description>FGT: VPN connection during weekend - $(user)</description>
<mitre>
<id>T1078</id>
<id>T1133</id>
</mitre>
<group>fortigate_vpn_weekend</group>
</rule>
<!-- Regla específica: VPN fin de semana de USUARIOS MONITOREADOS (nivel alto, CON email) -->
<rule id="196105" level="7" ignore="60">
<if_sid>196103</if_sid>
<list field="user" lookup="match_key">etc/lists/vpn-monitored-users</list>
<description>FGT VPN: Usuario monitoreado $(user) conectado en fin de semana</description>
<mitre>
<id>T1078</id>
<id>T1133</id>
</mitre>
<group>fortigate_vpn_weekend,monitored_user</group>
</rule>
<!-- Múltiples IPs diferentes para mismo usuario -->
<rule id="196104" level="11" frequency="3" timeframe="3600" ignore="300">
<if_matched_sid>81622</if_matched_sid>
<same_user />
<not_same_source_ip />
<description>FGT: VPN user connecting from multiple IPs (possible account compromise)</description>
<mitre>
<id>T1078</id>
<id>T1090</id>
</mitre>
<group>fortigate_vpn_multiple_ips,critical</group>
</rule>
<!-- VPN desde país restringido -->
<rule id="196112" level="11">
<if_sid>81622</if_sid>
<match>srccountry="China"|srccountry="Russia"|srccountry="Iran"|srccountry="North Korea"</match>
<description>FGT: VPN connection from restricted country (BLOCKED)</description>
<mitre>
<id>T1133</id>
</mitre>
<group>fortigate_geo_blocked,critical</group>
</rule>
<!-- Usuario admin fuera de horario -->
<rule id="196113" level="12">
<if_sid>81605</if_sid>
<user>^admin$</user>
<time>6:00 pm - 8:00 am</time>
<description>FGT: Admin account login outside business hours</description>
<mitre>
<id>T1078.003</id>
</mitre>
<group>fortigate_admin_after_hours,critical</group>
</rule>
</group>
Perfecto, ya tenemos las listas. Como les dije antes, vamos a reinciar Wazuh Manager para saber si tenemos problemas de sintaxis.
sudo systemctl restart wazuh-manager
Tambien podes usar el siguiente comando.
sudo apt-get install libxml2-utils
xmllint --noout /var/ossec/etc/ossec.conf
Integracion con Email
Para que nos lleguen esos bellos correos vamos a crear un script en Python que sera referenciado en el ossec.conf.
{
"smtp_host": "HOST",
"smtp_port": 587,
"use_tls": true,
"username": "wazuh",
"password": "TUPASSWORD",
"from": "Wazuh SIEM <wazuh@TUDOMINIO>",
"to": [
"alertas_wazuh@TUDOMINIO",
"sfernandez@TUDOMINIO"
],
"subject_prefix": "[Wazuh SIEM]",
"insecure_skip_verify": true,
"geoip_db": "/usr/share/GeoIP/dbip-city-lite.mmdb" # Geo
}
Hay que instalar la GeoLocalizacion si desean. Si no eliminarlo.
Le damos los permisos.
sudo chown root:wazuh /var/ossec/etc/email-config.json
sudo chmod 640 /var/ossec/etc/email-config.json
Ahora vamos a crear el Python que hara la magia en la ruta /var/ossec/integrations. Yo lo voy a llamar custom-email.
#!/usr/bin/env python3
"""
Wazuh Unified Email Notifier v4.0
- Maneja Active Directory (reglas 100080-100085 y 60107-60113)
- Maneja FortiGate VPN/Admin (reglas 196100-196113)
- Maneja FortiGate IPS (reglas 196200-196231)
- Template HTML unificado y profesional
- GeoIP integrado
- Logging mejorado
- Configuración centralizada
"""
import sys, json, smtplib, ssl, pathlib, html, logging, re, os
from datetime import datetime
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.header import Header
from typing import Dict, Any, Tuple, Optional
# GeoIP opcional
try:
import geoip2.database
HAS_GEOIP = True
except ImportError:
HAS_GEOIP = False
class UnifiedEmailNotifier:
def __init__(self, config_path: str):
self.config = self._load_config(config_path)
self._setup_logging()
# Grupos privilegiados de AD
self.privileged_groups = {
"Domain Admins", "Enterprise Admins", "Administrators", "Schema Admins",
"Account Operators", "Server Operators", "Backup Operators", "Print Operators",
"DnsAdmins", "Domain Controllers", "Enterprise Key Admins", "Key Admins",
"Cert Publishers", "Group Policy Creator Owners", "RAS and IAS Servers"
}
# Países de alto riesgo (ajustar según política organizacional)
self.high_risk_countries = {
'China', 'Russia', 'North Korea', 'Iran', 'Vietnam', 'Nigeria',
'Romania', 'Ukraine', 'Belarus', 'Myanmar', 'Pakistan'
}
def _setup_logging(self):
"""Configurar logging con rotación"""
log_file = self.config.get("log_file", "/var/ossec/logs/integrations.log")
handlers = [logging.StreamHandler(sys.stderr)]
if log_file and os.path.dirname(log_file):
try:
os.makedirs(os.path.dirname(log_file), exist_ok=True)
file_handler = logging.FileHandler(log_file)
file_handler.setFormatter(logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
))
handlers.append(file_handler)
except Exception:
pass
logging.basicConfig(
level=getattr(logging, self.config.get("log_level", "INFO").upper()),
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=handlers
)
self.logger = logging.getLogger("unified-email-notifier")
def _load_config(self, config_path: str) -> Dict[str, Any]:
"""Cargar configuración con validación"""
try:
cfg = json.loads(pathlib.Path(config_path).read_text(encoding="utf-8"))
required_fields = ["smtp_host", "to"]
for field in required_fields:
if field not in cfg:
raise ValueError(f"Campo requerido '{field}' faltante en configuración")
cfg.setdefault("smtp_port", 587)
cfg.setdefault("use_tls", True)
cfg.setdefault("from", "wazuh@localhost")
cfg.setdefault("subject_prefix", "[Wazuh SIEM]")
cfg.setdefault("timeout", 30)
cfg.setdefault("log_level", "INFO")
return cfg
except Exception as e:
raise RuntimeError(f"Error cargando configuración {config_path}: {e}")
def _load_alert(self, file_path: str) -> Dict[str, Any]:
"""Cargar alerta de Wazuh con manejo robusto de errores"""
try:
if not pathlib.Path(file_path).exists():
raise FileNotFoundError(f"Archivo de alerta no encontrado: {file_path}")
raw = pathlib.Path(file_path).read_text(encoding="utf-8", errors="ignore").strip()
if not raw:
raise ValueError("Archivo de alerta vacío")
candidates = []
if raw.startswith("{"):
candidates.append(raw)
else:
for line in raw.splitlines():
line = line.strip()
if line.startswith("{") and line.endswith("}"):
candidates.append(line)
for candidate in reversed(candidates):
try:
return json.loads(candidate)
except json.JSONDecodeError:
continue
raise ValueError("No se encontró JSON válido en el archivo de alerta")
except Exception as e:
raise RuntimeError(f"Error cargando alerta {file_path}: {e}")
def _get_field(self, data: Dict[str, Any], *paths: str, default: str = "-") -> str:
"""Buscar campo en múltiples rutas con tolerancia a casos"""
for path in paths:
current = data
found = True
for key in path.split("."):
if not isinstance(current, dict):
found = False
break
possible_keys = [key, key.lower(), key.capitalize(), key.upper()]
matched_key = None
for possible_key in possible_keys:
if possible_key in current:
matched_key = possible_key
break
if matched_key is None:
found = False
break
current = current[matched_key]
if found and current is not None:
return str(current).strip()
return default
def _format_timestamp(self, ts: str) -> str:
"""Formatear timestamp de manera consistente"""
if not ts or ts == "-":
return "-"
try:
if "T" in ts:
if ts.endswith("Z"):
dt = datetime.fromisoformat(ts.replace("Z", "+00:00"))
else:
dt = datetime.fromisoformat(ts)
return dt.strftime("%d/%m/%Y %H:%M:%S")
else:
for fmt in ["%Y-%m-%d %H:%M:%S", "%d/%m/%Y %H:%M:%S"]:
try:
dt = datetime.strptime(ts, fmt)
return dt.strftime("%d/%m/%Y %H:%M:%S")
except ValueError:
continue
except Exception:
pass
return ts
def _create_badge(self, text: str, style: str = "default") -> str:
"""Crear badges HTML con estilos predefinidos"""
styles = {
"default": {"bg": "#f1f5f9", "fg": "#334155", "border": "#e2e8f0"},
"success": {"bg": "#dcfce7", "fg": "#166534", "border": "#bbf7d0"},
"warning": {"bg": "#fef3c7", "fg": "#92400e", "border": "#fde68a"},
"danger": {"bg": "#fee2e2", "fg": "#7f1d1d", "border": "#fecaca"},
"info": {"bg": "#dbeafe", "fg": "#1e40af", "border": "#93c5fd"},
"critical": {"bg": "#7f1d1d", "fg": "#ffffff", "border": "#991b1b"},
"geo": {"bg": "#ecfeff", "fg": "#0c4a6e", "border": "#7dd3fc"},
"geo-risk": {"bg": "#fee2e2", "fg": "#7f1d1d", "border": "#fca5a5"}
}
style_config = styles.get(style, styles["default"])
return (f"<span style='display:inline-block;padding:4px 12px;border-radius:16px;"
f"background:{style_config['bg']};color:{style_config['fg']};"
f"border:1px solid {style_config['border']};"
f"font:bold 11px/14px Segoe UI,Roboto,Arial,sans-serif;"
f"text-transform:uppercase;letter-spacing:0.5px;white-space:nowrap;'>"
f"{html.escape(str(text))}</span>")
def _get_country_flag(self, iso_code: str) -> str:
"""Convertir código ISO a emoji de bandera"""
if not iso_code or len(iso_code) != 2:
return ""
try:
return "".join(chr(0x1F1E6 + ord(char) - 65) for char in iso_code.upper())
except Exception:
return ""
def _get_geoip_info(self, ip: str) -> Dict[str, Any]:
"""Obtener información GeoIP con múltiples bases de datos"""
if not ip or not HAS_GEOIP:
return {}
db_paths = [
self.config.get("geoip_db", ""),
"/var/ossec/etc/GeoIP/GeoLite2-City.mmdb",
"/var/ossec/etc/GeoLite2-City.mmdb",
"/usr/share/GeoIP/GeoLite2-City.mmdb",
"/usr/share/GeoIP/dbip-city-lite.mmdb",
"/opt/geoip/GeoLite2-City.mmdb"
]
for db_path in db_paths:
if not db_path or not pathlib.Path(db_path).exists():
continue
try:
with geoip2.database.Reader(db_path) as reader:
response = reader.city(ip)
return {
"country": response.country.name or "",
"iso": response.country.iso_code or "",
"city": response.city.name or "",
"latitude": response.location.latitude,
"longitude": response.location.longitude
}
except Exception as e:
self.logger.debug(f"Error con base GeoIP {db_path}: {e}")
continue
return {}
def _detect_alert_type(self, alert: Dict[str, Any]) -> str:
"""Detectar tipo de alerta basándose en rule_id y contenido"""
rule_id = self._get_field(alert, "rule.id")
# Reglas de Active Directory
ad_rules = {"100080", "100081", "100082", "100083", "100084", "100085",
"60107", "60108", "60109", "60113"}
# Reglas de FortiGate VPN/Admin
fgt_vpn_rules = {"196100", "196101", "196102", "196103", "196104", "196105",
"196106", "196112", "196113"}
# Reglas de FortiGate IPS (196200-196231)
fgt_ips_rules = set(str(x) for x in range(196200, 196232))
if rule_id in ad_rules:
return "active_directory"
elif rule_id in fgt_vpn_rules:
return "fortigate_vpn"
elif rule_id in fgt_ips_rules:
return "fortigate_ips"
else:
# Detectar por contenido si no es por rule_id
rule_desc = self._get_field(alert, "rule.description", default="").lower()
if any(term in rule_desc for term in ["active directory", "windows", "user account", "group"]):
return "active_directory"
elif any(term in rule_desc for term in ["vpn", "admin", "brute"]):
return "fortigate_vpn"
elif any(term in rule_desc for term in ["ips", "attack", "exploit", "malware"]):
return "fortigate_ips"
else:
return "generic"
def _extract_common_data(self, alert: Dict[str, Any]) -> Dict[str, Any]:
"""Extraer datos comunes a todos los tipos de alerta"""
data = {
"rule_id": self._get_field(alert, "rule.id"),
"rule_level": self._get_field(alert, "rule.level"),
"rule_desc": self._get_field(alert, "rule.description"),
"rule_groups": self._get_field(alert, "rule.groups"),
"fired_times": self._get_field(alert, "rule.firedtimes", default="1"),
"agent_name": self._get_field(alert, "agent.name"),
"agent_id": self._get_field(alert, "agent.id"),
"manager_name": self._get_field(alert, "manager.name"),
"timestamp": self._format_timestamp(self._get_field(alert, "timestamp", "data.eventtime")),
"location": self._get_field(alert, "location"),
"decoder": self._get_field(alert, "decoder.name")
}
return data
def _extract_ad_data(self, alert: Dict[str, Any]) -> Dict[str, Any]:
"""Extraer datos específicos de Active Directory"""
data = self._extract_common_data(alert)
data["subject_user"] = self._get_field(alert, "data.win.eventdata.subjectUserName")
data["subject_domain"] = self._get_field(alert, "data.win.eventdata.subjectDomainName")
data["target_user"] = self._get_field(alert, "data.win.eventdata.targetUserName")
data["sam_account"] = self._get_field(alert, "data.win.eventdata.samAccountName")
data["user_principal"] = self._get_field(alert, "data.win.eventdata.userPrincipalName")
data["display_name"] = self._get_field(alert, "data.win.eventdata.displayName")
data["group_name"] = self._get_field(alert, "data.win.eventdata.targetUserName")
data["member_name"] = self._get_field(alert, "data.win.eventdata.memberName")
data["computer"] = self._get_field(alert, "data.win.system.computer")
data["event_id"] = self._get_field(alert, "data.win.system.eventID")
group_name = data.get("group_name", "")
data["is_privileged"] = group_name in self.privileged_groups
data["is_anonymous"] = data.get("subject_user", "") == "ANONYMOUS LOGON"
event_id = data["event_id"]
if event_id == "4720":
data["title"] = "Usuario Creado en Active Directory"
data["color"] = "#059669"
data["priority"] = "MEDIO"
data["pivot"] = data["target_user"]
elif event_id == "4726":
data["title"] = "Usuario Eliminado de Active Directory"
data["color"] = "#dc2626"
data["priority"] = "ALTO"
data["pivot"] = data["target_user"]
elif event_id in ("4732", "4728"):
action = "Alta en Grupo"
group_type = "LOCAL" if event_id == "4732" else "GLOBAL"
data["title"] = f"{action} {group_type}"
data["color"] = "#0284c7" if event_id == "4732" else "#ea580c"
data["priority"] = "CRÍTICO" if data["is_privileged"] else "MEDIO"
data["pivot"] = f"{data['member_name']} → {data['group_name']}"
elif event_id in ("4733", "4729"):
action = "Baja de Grupo"
group_type = "LOCAL" if event_id == "4733" else "GLOBAL"
data["title"] = f"{action} {group_type}"
data["color"] = "#6b7280" if event_id == "4733" else "#9ca3af"
data["priority"] = "MEDIO" if data["is_privileged"] else "BAJO"
data["pivot"] = f"{data['member_name']} ← {data['group_name']}"
else:
data["title"] = f"Evento Active Directory {event_id}"
data["color"] = "#6366f1"
data["priority"] = "MEDIO"
data["pivot"] = data.get("target_user", "N/A")
return data
def _extract_fortigate_vpn_data(self, alert: Dict[str, Any]) -> Dict[str, Any]:
"""Extraer datos específicos de FortiGate VPN/Admin"""
data = self._extract_common_data(alert)
data["dst_user"] = self._get_field(alert, "data.dstuser", "data.user", "data.srcuser")
data["status"] = self._get_field(alert, "data.status")
data["reason"] = self._get_field(alert, "data.reason")
data["log_desc"] = self._get_field(alert, "data.logdesc")
data["ui"] = self._get_field(alert, "data.ui")
data["method"] = self._get_field(alert, "data.method")
data["action"] = self._get_field(alert, "data.action")
data["dev_name"] = self._get_field(alert, "data.devname")
data["dev_id"] = self._get_field(alert, "data.devid")
src_ip = self._get_field(alert, "data.srcip", "data.remip")
if not src_ip or src_ip == "-":
ui_text = data.get("ui", "")
ip_match = re.search(r'(\d{1,3}(?:\.\d{1,3}){3})', ui_text)
src_ip = ip_match.group(1) if ip_match else "-"
data["src_ip"] = src_ip
geo_info = self._get_geoip_info(src_ip)
data["country"] = geo_info.get("country", "")
data["iso"] = geo_info.get("iso", "")
data["city"] = geo_info.get("city", "")
data["is_high_risk"] = data["country"] in self.high_risk_countries
method = data.get("method", "").lower()
ui = data.get("ui", "").lower()
action = data.get("action", "").lower()
if "ssh" in method or "ssh(" in ui:
data["vector"] = "SSH"
elif "https" in method or "http(" in ui or "https(" in ui:
data["vector"] = "Web (HTTPS)"
elif "tunnel" in action or "vpn" in action:
data["vector"] = "VPN"
else:
data["vector"] = "Desconocido"
rule_id = data["rule_id"]
if rule_id == "196100":
data["title"] = "FortiGate ADMIN Brute-Force Attack"
data["color"] = "#dc2626"
data["priority"] = "CRÍTICO"
data["pivot"] = data["src_ip"]
data["recommendation"] = "Bloquear IP inmediatamente. Revisar logs de intentos previos."
elif rule_id == "196101":
data["title"] = "FortiGate VPN SSL Brute-Force Attack"
data["color"] = "#b91c1c"
data["priority"] = "CRÍTICO"
data["pivot"] = f"{data['dst_user']} from {data['src_ip']}"
data["recommendation"] = "Bloquear IP y verificar estado de la cuenta VPN."
elif rule_id == "196102":
data["title"] = "VPN - Conexión Fuera de Horario Laboral"
data["color"] = "#ea580c"
data["priority"] = "MEDIO-ALTO"
data["pivot"] = f"{data['dst_user']} @ {data['timestamp']}"
data["recommendation"] = "Verificar si el acceso es autorizado con el usuario."
elif rule_id == "196103":
data["title"] = "VPN - Conexión Fin de Semana"
data["color"] = "#f59e0b"
data["priority"] = "MEDIO"
data["pivot"] = f"{data['dst_user']} @ {data['timestamp']}"
data["recommendation"] = "Verificar si el acceso de fin de semana es necesario."
elif rule_id == "196104":
data["title"] = "POSIBLE COMPROMISO DE CUENTA VPN"
data["color"] = "#7f1d1d"
data["priority"] = "EMERGENCIA"
data["pivot"] = f"Usuario {data['dst_user']} - Múltiples países"
data["recommendation"] = "DESHABILITAR CUENTA INMEDIATAMENTE. Contactar usuario."
elif rule_id == "196106":
data["title"] = "Conexión desde TOR/Proxy Anónimo"
data["color"] = "#b91c1c"
data["priority"] = "CRÍTICO"
data["pivot"] = f"IP TOR: {data['src_ip']}"
data["recommendation"] = "Bloquear todas las conexiones desde nodos TOR."
else:
level = int(data.get("rule_level", "0"))
if level >= 12:
data["title"] = "FortiGate Critical Security Alert"
data["color"] = "#dc2626"
data["priority"] = "CRÍTICO"
elif level >= 10:
data["title"] = "FortiGate High Security Alert"
data["color"] = "#ea580c"
data["priority"] = "ALTO"
else:
data["title"] = "FortiGate Security Event"
data["color"] = "#f59e0b"
data["priority"] = "MEDIO"
data["pivot"] = f"{data.get('dst_user', 'N/A')} @ {data['src_ip']}"
data["recommendation"] = "Revisar evento para determinar acciones necesarias."
return data
def _extract_fortigate_ips_data(self, alert: Dict[str, Any]) -> Dict[str, Any]:
"""Extraer datos específicos de IPS de FortiGate"""
data = self._extract_common_data(alert)
# Campos específicos de IPS
data["src_ip"] = self._get_field(alert, "data.srcip")
data["dst_ip"] = self._get_field(alert, "data.dstip")
data["dst_port"] = self._get_field(alert, "data.dstport")
data["attack"] = self._get_field(alert, "data.attack", "data.attack_name", "data.signature")
data["severity"] = self._get_field(alert, "data.severity")
data["action"] = self._get_field(alert, "data.action")
data["protocol"] = self._get_field(alert, "data.proto", "data.protocol")
data["dev_name"] = self._get_field(alert, "data.devname")
data["dev_id"] = self._get_field(alert, "data.devid")
# GeoIP para IP origen
geo_info = self._get_geoip_info(data["src_ip"])
data["country"] = geo_info.get("country", "")
data["iso"] = geo_info.get("iso", "")
data["city"] = geo_info.get("city", "")
data["is_high_risk"] = data["country"] in self.high_risk_countries
# Determinar criticidad y tipo
severity = data.get("severity", "").lower()
attack_name = data.get("attack", "").lower()
action = data.get("action", "").lower()
# Configurar por severidad
if severity == "critical":
data["priority"] = "CRÍTICO"
data["color"] = "#dc2626"
data["title"] = f"IPS CRÍTICO: {data['attack']}"
elif severity == "high":
data["priority"] = "ALTO"
data["color"] = "#ea580c"
data["title"] = f"IPS ALTO: {data['attack']}"
elif severity == "medium":
data["priority"] = "MEDIO"
data["color"] = "#f59e0b"
data["title"] = f"IPS MEDIO: {data['attack']}"
else:
data["priority"] = "BAJO"
data["color"] = "#3b82f6"
data["title"] = f"IPS: {data['attack']}"
# Ajustar por acción
if action == "detected" and severity in ["critical", "high"]:
data["priority"] = "EMERGENCIA"
data["color"] = "#7f1d1d"
data["title"] = "ATAQUE CRÍTICO NO BLOQUEADO"
data["recommendation"] = "BLOQUEAR IP INMEDIATAMENTE - Ataque no fue bloqueado por IPS"
# Detectar tipo de ataque específico
if "sql" in attack_name or "injection" in attack_name:
data["attack_type"] = "SQL Injection"
data["recommendation"] = "Revisar WAF y logs de aplicación web. Verificar consultas SQL."
elif "xss" in attack_name or "cross-site" in attack_name:
data["attack_type"] = "Cross-Site Scripting"
data["recommendation"] = "Validar entrada de usuarios. Revisar sanitización de datos."
elif "command" in attack_name and "injection" in attack_name:
data["attack_type"] = "Command Injection"
data["recommendation"] = "Revisar ejecución de comandos en aplicación. Deshabilitar funciones peligrosas."
elif "dos" in attack_name or "ddos" in attack_name or "flood" in attack_name:
data["attack_type"] = "Denial of Service"
data["recommendation"] = "Activar mitigación DDoS. Considerar rate limiting."
elif "malware" in attack_name or "trojan" in attack_name or "virus" in attack_name:
data["attack_type"] = "Malware"
data["recommendation"] = "Escanear sistemas afectados. Aislar hosts comprometidos."
elif "ransomware" in attack_name:
data["attack_type"] = "Ransomware"
data["recommendation"] = "AISLAR RED INMEDIATAMENTE. Verificar backups. Contactar equipo de respuesta."
data["priority"] = "EMERGENCIA"
data["color"] = "#7f1d1d"
elif "exploit" in attack_name or "cve-" in attack_name:
data["attack_type"] = "Exploit Attempt"
data["recommendation"] = "Verificar parches de seguridad. Revisar CVE mencionado."
elif "buffer" in attack_name and "overflow" in attack_name:
data["attack_type"] = "Buffer Overflow"
data["recommendation"] = "Actualizar software vulnerable. Verificar parches disponibles."
elif "scan" in attack_name or "nmap" in attack_name:
data["attack_type"] = "Network Scan"
data["recommendation"] = "Monitorear IP origen para detectar fase de explotación."
elif "wordpress" in attack_name or "wp-" in attack_name:
data["attack_type"] = "WordPress Attack"
data["recommendation"] = "Actualizar WordPress y plugins. Revisar permisos de archivos."
else:
data["attack_type"] = "IPS Detection"
data["recommendation"] = "Revisar logs del firewall para más detalles."
data["pivot"] = f"{data['src_ip']} → {data['dst_ip']}:{data['dst_port']} • {data['attack']}"
return data
def _build_ad_info_rows(self, data: Dict[str, Any]) -> str:
"""Construir filas de información para Active Directory"""
esc = lambda x: html.escape(str(x or "-"))
rows = []
event_id = data.get("event_id", "")
if event_id in ("4720", "4726"):
rows.extend([
f"<tr><td class='label'>Usuario:</td><td class='value'><code>{esc(data.get('target_user', ''))}</code></td></tr>",
f"<tr><td class='label'>SAM Account:</td><td class='value'><code>{esc(data.get('sam_account', ''))}</code></td></tr>",
f"<tr><td class='label'>UPN:</td><td class='value'><code>{esc(data.get('user_principal', ''))}</code></td></tr>",
f"<tr><td class='label'>Nombre:</td><td class='value'>{esc(data.get('display_name', ''))}</td></tr>"
])
elif event_id in ("4732", "4733", "4728", "4729"):
privilege_badge = self._create_badge("GRUPO PRIVILEGIADO", "danger") if data.get("is_privileged") else ""
rows.extend([
f"<tr><td class='label'>Miembro:</td><td class='value'><code>{esc(data.get('member_name', ''))}</code></td></tr>",
f"<tr><td class='label'>Grupo:</td><td class='value'><code>{esc(data.get('group_name', ''))}</code> {privilege_badge}</td></tr>"
])
anon_badge = self._create_badge("ANONYMOUS LOGON", "warning") if data.get("is_anonymous") else ""
agent_name = data.get('agent_name', '')
agent_id = data.get('agent_id', '')
agent_badge = self._create_badge(f"{agent_name} (ID {agent_id})", 'default')
computer_badge = self._create_badge(data.get('computer', ''), 'info')
rows.extend([
f"<tr><td class='label'>Ejecutado por:</td><td class='value'><code>{esc(data.get('subject_user', ''))}</code> [{esc(data.get('subject_domain', ''))}] {anon_badge}</td></tr>",
f"<tr><td class='label'>Equipo/DC:</td><td class='value'>{computer_badge}</td></tr>",
f"<tr><td class='label'>Agente Wazuh:</td><td class='value'>{agent_badge}</td></tr>",
f"<tr><td class='label'>Timestamp:</td><td class='value'><code>{esc(data.get('timestamp', ''))}</code></td></tr>",
f"<tr><td class='label'>Intentos:</td><td class='value'><strong>{esc(data.get('fired_times', '1'))}</strong></td></tr>"
])
return "\n".join(rows)
def _build_fgt_vpn_info_rows(self, data: Dict[str, Any]) -> str:
"""Construir filas de información para FortiGate VPN/Admin"""
esc = lambda x: html.escape(str(x or "-"))
geo_badge = ""
if data.get("country"):
flag = self._get_country_flag(data.get("iso", ""))
geo_text = f"{flag} {data['country']}"
if data.get("city"):
geo_text += f" • {data['city']}"
geo_style = "geo-risk" if data.get("is_high_risk") else "geo"
geo_badge = self._create_badge(geo_text, geo_style)
vector_badge = self._create_badge(data.get('vector', 'Desconocido'), 'info')
dev_name = data.get('dev_name', '')
dev_id = data.get('dev_id', '')
device_badge = self._create_badge(f"{dev_name} / {dev_id}", 'default')
agent_name = data.get('agent_name', '')
agent_id = data.get('agent_id', '')
agent_badge = self._create_badge(f"{agent_name} (ID {agent_id})", 'warning')
status_badge = self._create_badge(data.get('status', ''), 'default')
rows = [
f"<tr><td class='label'>IP Origen:</td><td class='value'><code>{esc(data.get('src_ip', ''))}</code> {geo_badge}</td></tr>",
f"<tr><td class='label'>Usuario:</td><td class='value'><code>{esc(data.get('dst_user', ''))}</code></td></tr>",
f"<tr><td class='label'>Vector:</td><td class='value'>{vector_badge}</td></tr>",
f"<tr><td class='label'>Dispositivo:</td><td class='value'>{device_badge}</td></tr>",
f"<tr><td class='label'>Agente Wazuh:</td><td class='value'>{agent_badge}</td></tr>",
f"<tr><td class='label'>Estado:</td><td class='value'>{status_badge} • {esc(data.get('reason', ''))}</td></tr>",
f"<tr><td class='label'>Descripción:</td><td class='value'>{esc(data.get('log_desc', ''))}</td></tr>",
f"<tr><td class='label'>Intentos:</td><td class='value'><strong>{esc(data.get('fired_times', '1'))}</strong></td></tr>",
f"<tr><td class='label'>Timestamp:</td><td class='value'><code>{esc(data.get('timestamp', ''))}</code></td></tr>"
]
return "\n".join(rows)
def _build_ips_info_rows(self, data: Dict[str, Any]) -> str:
"""Construir filas de información para IPS"""
esc = lambda x: html.escape(str(x or "-"))
# Badge de GeoIP
geo_badge = ""
if data.get("country"):
flag = self._get_country_flag(data.get("iso", ""))
geo_text = f"{flag} {data['country']}"
if data.get("city"):
geo_text += f" • {data['city']}"
geo_style = "geo-risk" if data.get("is_high_risk") else "geo"
geo_badge = self._create_badge(geo_text, geo_style)
# Badges
severity_styles = {
"critical": "critical",
"high": "danger",
"medium": "warning",
"low": "info"
}
severity_badge = self._create_badge(
data.get('severity', 'unknown').upper(),
severity_styles.get(data.get('severity', '').lower(), 'default')
)
action_style = "danger" if data.get('action') == 'detected' else "success"
action_badge = self._create_badge(data.get('action', '').upper(), action_style)
attack_type_badge = self._create_badge(data.get('attack_type', 'IPS Detection'), 'info')
device_badge = self._create_badge(f"{data.get('dev_name', '')} / {data.get('dev_id', '')}", 'default')
agent_badge = self._create_badge(f"{data.get('agent_name', '')} (ID {data.get('agent_id', '')})", 'warning')
rows = [
f"<tr><td class='label'>Tipo de Ataque:</td><td class='value'>{attack_type_badge}</td></tr>",
f"<tr><td class='label'>Firma:</td><td class='value'><code>{esc(data.get('attack', ''))}</code></td></tr>",
f"<tr><td class='label'>IP Origen:</td><td class='value'><code>{esc(data.get('src_ip', ''))}</code> {geo_badge}</td></tr>",
f"<tr><td class='label'>IP Destino:</td><td class='value'><code>{esc(data.get('dst_ip', ''))}:{esc(data.get('dst_port', ''))}</code></td></tr>",
f"<tr><td class='label'>Severidad:</td><td class='value'>{severity_badge}</td></tr>",
f"<tr><td class='label'>Acción IPS:</td><td class='value'>{action_badge}</td></tr>",
f"<tr><td class='label'>Protocolo:</td><td class='value'><code>{esc(data.get('protocol', ''))}</code></td></tr>",
f"<tr><td class='label'>Dispositivo:</td><td class='value'>{device_badge}</td></tr>",
f"<tr><td class='label'>Agente Wazuh:</td><td class='value'>{agent_badge}</td></tr>",
f"<tr><td class='label'>Intentos:</td><td class='value'><strong>{esc(data.get('fired_times', '1'))}</strong></td></tr>",
f"<tr><td class='label'>Timestamp:</td><td class='value'><code>{esc(data.get('timestamp', ''))}</code></td></tr>"
]
return "\n".join(rows)
def _build_generic_info_rows(self, data: Dict[str, Any]) -> str:
"""Construir filas de información para alertas genéricas"""
esc = lambda x: html.escape(str(x or "-"))
agent_name = data.get('agent_name', '')
agent_id = data.get('agent_id', '')
agent_badge = self._create_badge(f"{agent_name} (ID {agent_id})", 'default')
decoder_badge = self._create_badge(data.get('decoder', ''), 'info')
rows = [
f"<tr><td class='label'>Agente:</td><td class='value'>{agent_badge}</td></tr>",
f"<tr><td class='label'>Ubicación:</td><td class='value'><code>{esc(data.get('location', ''))}</code></td></tr>",
f"<tr><td class='label'>Decoder:</td><td class='value'>{decoder_badge}</td></tr>",
f"<tr><td class='label'>Grupos:</td><td class='value'>{esc(data.get('rule_groups', ''))}</td></tr>",
f"<tr><td class='label'>Intentos:</td><td class='value'><strong>{esc(data.get('fired_times', '1'))}</strong></td></tr>",
f"<tr><td class='label'>Timestamp:</td><td class='value'><code>{esc(data.get('timestamp', ''))}</code></td></tr>"
]
return "\n".join(rows)
def _build_html_body(self, data: Dict[str, Any], alert_type: str) -> str:
"""Construir cuerpo HTML del email con template unificado"""
esc = lambda x: html.escape(str(x or "-"))
css_styles = """
<style>
body { font-family: 'Segoe UI', Roboto, Arial, sans-serif; background: #f8fafc; margin: 0; padding: 20px; }
.container { max-width: 800px; margin: 0 auto; background: white; border-radius: 12px; box-shadow: 0 4px 6px rgba(0,0,0,0.1); overflow: hidden; }
.header { padding: 24px; border-left: 8px solid """ + data.get('color', '#6366f1') + """; background: linear-gradient(135deg, #f8fafc 0%, #e2e8f0 100%); }
.title { font-size: 24px; font-weight: 700; margin-bottom: 8px; color: #1f2937; }
.subtitle { font-size: 14px; color: #64748b; margin-bottom: 16px; }
.badges { display: flex; gap: 8px; flex-wrap: wrap; }
.pivot-section { background: #fef2f2; padding: 16px; margin: 20px; border-radius: 8px; border-left: 4px solid """ + data.get('color', '#6366f1') + """; }
.pivot-label { font-weight: bold; color: #374151; margin-bottom: 4px; }
.pivot-value { font-family: 'Courier New', monospace; font-size: 16px; font-weight: bold; color: #1f2937; }
.info-table { width: 100%; border-collapse: collapse; margin: 20px 0; }
.info-table td { padding: 12px 16px; border-bottom: 1px solid #e5e7eb; }
.info-table .label { font-weight: 600; color: #374151; width: 160px; background: #f9fafb; }
.info-table .value { color: #1f2937; }
.recommendation { background: #fef3c7; padding: 16px; margin: 20px; border-radius: 8px; border-left: 4px solid #f59e0b; }
.recommendation-title { font-weight: bold; color: #92400e; margin-bottom: 8px; }
.recommendation-text { color: #78350f; }
.actions { background: #f0f9ff; padding: 16px; margin: 20px; border-radius: 8px; border-left: 4px solid #0284c7; }
.actions-title { font-weight: bold; color: #0c4a6e; margin-bottom: 8px; }
.actions ul { margin: 8px 0; padding-left: 20px; color: #075985; }
.footer { padding: 16px; background: #f8fafc; border-top: 1px solid #e5e7eb; font-size: 12px; color: #64748b; text-align: center; }
</style>
"""
priority_styles = {
"EMERGENCIA": "critical",
"CRÍTICO URGENTE": "critical",
"CRÍTICO": "danger",
"ALTO": "warning",
"MEDIO-ALTO": "warning",
"MEDIO": "info",
"BAJO": "default"
}
priority_style = priority_styles.get(data.get("priority", "MEDIO"), "default")
badges_html = f"""
{self._create_badge(f"RULE {data['rule_id']} • NIVEL {data['rule_level']}", "info")}
{self._create_badge(data.get('priority', 'MEDIO'), priority_style)}
"""
pivot_html = f"""
<div class="pivot-section">
<div class="pivot-label">Información Principal:</div>
<div class="pivot-value">{esc(data.get('pivot', 'N/A'))}</div>
</div>
"""
if alert_type == "active_directory":
info_rows = self._build_ad_info_rows(data)
elif alert_type == "fortigate_vpn":
info_rows = self._build_fgt_vpn_info_rows(data)
elif alert_type == "fortigate_ips":
info_rows = self._build_ips_info_rows(data)
else:
info_rows = self._build_generic_info_rows(data)
recommendation_html = ""
if data.get("recommendation"):
recommendation_html = f"""
<div class="recommendation">
<div class="recommendation-title">Recomendación:</div>
<div class="recommendation-text">{esc(data['recommendation'])}</div>
</div>
"""
actions_html = f"""
<div class="actions">
<div class="actions-title">Acciones Sugeridas:</div>
<ul>
<li>Revisar actividad reciente en los logs del sistema</li>
<li>Verificar la legitimidad del evento con usuarios involucrados</li>
<li>Documentar el incidente en el sistema de tickets</li>
<li>Aplicar medidas correctivas si es necesario</li>
{f"<li><strong>{data['recommendation']}</strong></li>" if data.get('recommendation') else ""}
</ul>
</div>
"""
return f"""<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>{esc(data.get('title', 'Wazuh Alert'))}</title>
{css_styles}
</head>
<body>
<div class="container">
<div class="header">
<div class="title">{data.get('title', 'Alerta de Seguridad')}</div>
<div class="subtitle">{esc(data.get('rule_desc', ''))}</div>
<div class="badges">{badges_html}</div>
</div>
{pivot_html}
<table class="info-table">
{info_rows}
</table>
{recommendation_html}
{actions_html}
<div class="footer">
<strong>Wazuh SIEM Alert</strong> • Unified Email Notifier v4.0<br>
Generado: {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}
</div>
</div>
</body>
</html>"""
def _build_text_body(self, data: Dict[str, Any], alert_type: str) -> str:
"""Construir cuerpo de texto plano del email"""
lines = [
f"{data.get('title', 'Alerta de Seguridad')}",
"=" * 80,
f"PRIORIDAD: {data.get('priority', 'MEDIO')}",
f"Rule: {data.get('rule_id', '')} (Nivel {data.get('rule_level', '')})",
f"Descripción: {data.get('rule_desc', '')}",
"",
f"INFORMACIÓN PRINCIPAL: {data.get('pivot', 'N/A')}",
"-" * 50
]
if alert_type == "active_directory":
lines.extend(self._build_ad_text_info(data))
elif alert_type == "fortigate_vpn":
lines.extend(self._build_fgt_text_info(data))
elif alert_type == "fortigate_ips":
lines.extend(self._build_ips_text_info(data))
else:
lines.extend(self._build_generic_text_info(data))
lines.extend([
"",
f"Agente: {data.get('agent_name', '')} (ID {data.get('agent_id', '')})",
f"Intentos: {data.get('fired_times', '1')}",
f"Timestamp: {data.get('timestamp', '')}",
""
])
if data.get("recommendation"):
lines.extend([
f"RECOMENDACIÓN: {data['recommendation']}",
""
])
lines.extend([
"ACCIONES SUGERIDAS:",
"- Revisar actividad reciente en los logs",
"- Verificar legitimidad con usuarios involucrados",
"- Documentar en sistema de tickets",
"- Aplicar medidas correctivas si es necesario",
"",
"=" * 80
])
return "\n".join(lines)
def _build_ad_text_info(self, data: Dict[str, Any]) -> list:
"""Información de texto para AD"""
lines = []
event_id = data.get("event_id", "")
if event_id in ("4720", "4726"):
lines.extend([
f"Usuario: {data.get('target_user', '')}",
f"SAM: {data.get('sam_account', '')}",
f"UPN: {data.get('user_principal', '')}",
f"Nombre: {data.get('display_name', '')}"
])
elif event_id in ("4732", "4733", "4728", "4729"):
privilege_note = " [GRUPO PRIVILEGIADO]" if data.get("is_privileged") else ""
lines.extend([
f"Miembro: {data.get('member_name', '')}",
f"Grupo: {data.get('group_name', '')}{privilege_note}"
])
anon_note = " [ANONYMOUS LOGON]" if data.get("is_anonymous") else ""
lines.extend([
f"Ejecutado por: {data.get('subject_user', '')} [{data.get('subject_domain', '')}]{anon_note}",
f"Equipo/DC: {data.get('computer', '')}"
])
return lines
def _build_fgt_text_info(self, data: Dict[str, Any]) -> list:
"""Información de texto para FortiGate VPN/Admin"""
geo_info = ""
if data.get("country"):
geo_info = f" ({data['country']}"
if data.get("city"):
geo_info += f" - {data['city']}"
geo_info += ")"
if data.get("is_high_risk"):
geo_info += " [PAÍS DE ALTO RIESGO]"
return [
f"IP Origen: {data.get('src_ip', '')}{geo_info}",
f"Usuario: {data.get('dst_user', '')}",
f"Vector: {data.get('vector', 'Desconocido')}",
f"Dispositivo: {data.get('dev_name', '')} / {data.get('dev_id', '')}",
f"Estado: {data.get('status', '')} Motivo: {data.get('reason', '')}",
f"Log: {data.get('log_desc', '')}"
]
def _build_ips_text_info(self, data: Dict[str, Any]) -> list:
"""Información de texto para IPS"""
geo_info = ""
if data.get("country"):
geo_info = f" ({data['country']}"
if data.get("city"):
geo_info += f" - {data['city']}"
geo_info += ")"
if data.get("is_high_risk"):
geo_info += " [PAÍS DE ALTO RIESGO]"
return [
f"Tipo de Ataque: {data.get('attack_type', 'IPS Detection')}",
f"Firma: {data.get('attack', '')}",
f"IP Origen: {data.get('src_ip', '')}{geo_info}",
f"IP Destino: {data.get('dst_ip', '')}:{data.get('dst_port', '')}",
f"Severidad: {data.get('severity', '').upper()}",
f"Acción IPS: {data.get('action', '').upper()}",
f"Protocolo: {data.get('protocol', '')}",
f"Dispositivo: {data.get('dev_name', '')} / {data.get('dev_id', '')}"
]
def _build_generic_text_info(self, data: Dict[str, Any]) -> list:
"""Información de texto para alertas genéricas"""
return [
f"Ubicación: {data.get('location', '')}",
f"Decoder: {data.get('decoder', '')}",
f"Grupos: {data.get('rule_groups', '')}"
]
def _build_subject(self, data: Dict[str, Any]) -> str:
"""Construir asunto del email"""
prefix = self.config.get("subject_prefix", "[Wazuh SIEM]")
priority = data.get("priority", "MEDIO")
title = data.get("title", "Alerta de Seguridad")
pivot = data.get("pivot", "N/A")
level = data.get("rule_level", "")
max_title_len = 40
max_pivot_len = 30
if len(title) > max_title_len:
title = title[:max_title_len-3] + "..."
if len(pivot) > max_pivot_len:
pivot = pivot[:max_pivot_len-3] + "..."
return f"{prefix} {priority} • {title} • {pivot} • L{level}"
def _send_email(self, message: MIMEMultipart):
"""Enviar email con manejo robusto de errores"""
smtp_config = {
"host": self.config["smtp_host"],
"port": int(self.config.get("smtp_port", 587)),
"use_tls": bool(self.config.get("use_tls", True)),
"username": self.config.get("username"),
"password": self.config.get("password"),
"timeout": int(self.config.get("timeout", 30))
}
recipients = self.config.get("to", [])
if not recipients:
raise ValueError("No hay destinatarios configurados")
self.logger.info(f"Enviando email a {len(recipients)} destinatarios via {smtp_config['host']}:{smtp_config['port']}")
try:
if smtp_config["use_tls"]:
context = ssl.create_default_context()
if self.config.get("insecure_skip_verify", False):
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
self.logger.warning("Verificación SSL deshabilitada")
with smtplib.SMTP(smtp_config["host"], smtp_config["port"],
timeout=smtp_config["timeout"]) as server:
server.ehlo()
server.starttls(context=context)
server.ehlo()
if smtp_config["username"] and smtp_config["password"]:
server.login(smtp_config["username"], smtp_config["password"])
self.logger.debug("Autenticación SMTP exitosa")
server.sendmail(message["From"], recipients, message.as_string())
else:
with smtplib.SMTP(smtp_config["host"], smtp_config["port"],
timeout=smtp_config["timeout"]) as server:
if smtp_config["username"] and smtp_config["password"]:
server.login(smtp_config["username"], smtp_config["password"])
self.logger.debug("Autenticación SMTP exitosa")
server.sendmail(message["From"], recipients, message.as_string())
self.logger.info("Email enviado exitosamente")
except smtplib.SMTPAuthenticationError as e:
raise RuntimeError(f"Error de autenticación SMTP: {e}")
except smtplib.SMTPRecipientsRefused as e:
raise RuntimeError(f"Destinatarios rechazados: {e}")
except smtplib.SMTPException as e:
raise RuntimeError(f"Error SMTP: {e}")
except Exception as e:
raise RuntimeError(f"Error enviando email: {e}")
def build_email(self, alert: Dict[str, Any]) -> Tuple[MIMEMultipart, str]:
"""Construir mensaje de email completo"""
alert_type = self._detect_alert_type(alert)
self.logger.info(f"Procesando alerta tipo: {alert_type}")
if alert_type == "active_directory":
data = self._extract_ad_data(alert)
elif alert_type == "fortigate_vpn":
data = self._extract_fortigate_vpn_data(alert)
elif alert_type == "fortigate_ips":
data = self._extract_fortigate_ips_data(alert)
else:
data = self._extract_common_data(alert)
data["title"] = "Alerta de Seguridad Wazuh"
data["color"] = "#6366f1"
data["priority"] = "MEDIO"
data["pivot"] = f"Rule {data['rule_id']} • {data.get('agent_name', 'N/A')}"
html_body = self._build_html_body(data, alert_type)
text_body = self._build_text_body(data, alert_type)
subject = self._build_subject(data)
message = MIMEMultipart("alternative")
message["Subject"] = Header(subject, "utf-8")
message["From"] = self.config.get("from", "wazuh@localhost")
message["To"] = ", ".join(self.config.get("to", []))
message["X-Priority"] = "1" if data.get("priority") in ["EMERGENCIA", "CRÍTICO URGENTE", "CRÍTICO"] else "3"
message.attach(MIMEText(text_body, "plain", "utf-8"))
message.attach(MIMEText(html_body, "html", "utf-8"))
return message, subject
def process_alert(self, alert_file: str):
"""Procesar alerta y enviar email"""
try:
alert = self._load_alert(alert_file)
self.logger.info(f"Alerta cargada desde: {alert_file}")
message, subject = self.build_email(alert)
self.logger.info(f"Email construido: {subject}")
self._send_email(message)
self.logger.info("Procesamiento completado exitosamente")
except Exception as e:
self.logger.error(f"Error procesando alerta: {e}")
raise
def main():
"""Función principal"""
if len(sys.argv) < 4:
print("Uso: custom-email-unified <alert_file> <api_key|''> <config_file>", file=sys.stderr)
print("", file=sys.stderr)
print("Ejemplos:", file=sys.stderr)
print(" custom-email-unified /tmp/alert.json '' /var/ossec/etc/email-config.json", file=sys.stderr)
print("", file=sys.stderr)
sys.exit(1)
alert_file = sys.argv[1]
config_file = sys.argv[3]
try:
notifier = UnifiedEmailNotifier(config_file)
notifier.process_alert(alert_file)
print("Email enviado exitosamente")
except Exception as e:
print(f"ERROR: {e}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()
Como puedes notar el script tiene la posiblidad de enviar sobre reglas de Active Directory tambien, algo que puedes implementar sin problemas. No dejen de instalar sudo pip3 install geoip2.
Vamos a darle los permisos.
sudo chown root:wazuh /var/ossec/integrations/custom-email-unified
sudo chmod 750 /var/ossec/integrations/custom-email-unified
Por ultimo, vamos a agregar esto al ossec.conf.
<integration>
<name>custom-email</name>
<hook_url>/var/ossec/etc/email-config.json</hook_url>
<level>14</level>
<rule_id>196207,196222,196228</rule_id>
<alert_format>json</alert_format>
</integration>
<!-- Desglose de reglas Level 14:
- 196207: IPS - Ataque crítico NO bloqueado
- 196222: IPS - Ransomware detectado
- 196228: IPS - Múltiples ataques críticos en ráfaga
-->
<!-- ================================================
NIVEL ALTO (Level 10-13): Ataques Críticos
- Ataques bloqueados de alta severidad
- SQL Injection, RCE, malware, DoS/DDoS
- Correlaciones de múltiples ataques
================================================ -->
<integration>
<name>custom-email</name>
<hook_url>/var/ossec/etc/email-config.json</hook_url>
<level>10</level>
<rule_id>196201,196202,196208,196212,196213,196217,196218,196220,196221,196226,196227,196230,196232</rule_id>
<alert_format>json</alert_format>
</integration>
<!-- Desglose de reglas Level 10-13:
- 196201: IPS - Ataque severidad CRITICAL (bloqueado)
- 196202: IPS - Ataque severidad HIGH
- 196208: IPS - SQL Injection
- 196212: IPS - Buffer Overflow
- 196213: IPS - Remote Code Execution (RCE)
- 196217: IPS - DoS Attack
- 196218: IPS - DDoS Attack
- 196220: IPS - Malware detectado
- 196221: IPS - Exploit attempt
- 196226: IPS - Múltiples ataques desde misma IP
- 196227: IPS - IP atacando múltiples objetivos
- 196230: IPS - Ataque a puerto RDP (3389)
- 196232: IPS - [Personalizar según tu entorno]
-->
<!-- ================================================
NIVEL MEDIO-ALTO (Level 7-9): VPN y Eventos Sospechosos
- Brute-force attacks
- Conexiones desde países restringidos
- Actividad fuera de horario
- Posible compromiso de cuentas
================================================ -->
<integration>
<name>custom-email</name>
<hook_url>/var/ossec/etc/email-config.json</hook_url>
<level>7</level>
<rule_id>196100,196101,196104,196105,196112,196113</rule_id>
<alert_format>json</alert_format>
</integration>
<!-- Desglose de reglas Level 7-12:
- 196100: FortiGate - Admin brute-force (6+ intentos)
- 196101: FortiGate - VPN SSL brute-force (10+ intentos)
- 196104: FortiGate - VPN desde múltiples IPs (compromiso)
- 196105: FortiGate - VPN usuario monitoreado fin de semana
- 196112: FortiGate - VPN desde país restringido
- 196113: FortiGate - Admin login fuera de horario
-->
Reiniciamos el manager.
sudo systemctl restart wazuh-manager.service
Prueba
Listo. Ahora si vamos a hacer las pruebas, para que las alarmas se disparen. Generamos la data dummy y ejecutamos.
cat > /tmp/test-vpn-bruteforce.json << 'EOF'
{
"timestamp": "2025-10-05T14:40:00.000Z",
"rule": {
"id": "196101",
"level": 12,
"description": "FGT: SSL VPN bruteforce from same IP (10+ attempts in 3min)",
"groups": ["fortigate", "vpn", "bruteforce", "critical"],
"firedtimes": 15
},
"agent": {
"id": "000",
"name": "firewall-master"
},
"manager": {
"name": "wazuh-manager-prod"
},
"data": {
"srcip": "131.159.24.205",
"remip": "131.159.24.205",
"dstuser": "admin",
"user": "admin",
"status": "error",
"reason": "authentication failed",
"action": "ssl-login",
"logdesc": "SSL VPN login fail",
"method": "https",
"ui": "https(131.159.24.205)",
"devname": "FG-100F-PROD",
"devid": "FG100FTK21002473"
},
"decoder": {
"name": "fortigate-firewall-v5"
},
"location": "firewall->/var/log/fortigate.log"
}
EOF
sudo -u wazuh python3 /var/ossec/integrations/custom-email-unified /tmp/test-vpn-bruteforce.json '' /var/ossec/etc/email-config.json

Voila!

Ya tenemos nuestro SIEM, ejecutando las reglas y avisandonos sobre los diferentes incidentes.
Dashboard
Cree unos Dashboard’s, uno para IPs y otro para VPNs.


Espero que les sirva, fue divertido generar estas alarmas. Si queres los dashboars, escribrime.
Saludos.




