Aumenta el Valor de tus logs a través de la IA
Vamos a crear un servidor MCP para interactuar con Wazuh y dejar que la Inteligencia Artificial haga su trabajo.

Hace un tiempo escribí sobre Wazuh y Active Directory, y la verdad es que recibí muchos mensajes. Seguro que escribiré más sobre eso, pero ahora me voy a centrar en algo que estuve investigando en internet. Me pregunté: ¿Y si conectamos Wazuh a una IA? Como siempre, mucha gente ya estaba trabajando en eso, así que vamos a hacer el paso a paso para lograrlo nosotros mismos.
Antes que nada, les recomiendo leer los artículos de referencia. En esta Prueba de Concepto, vamos a crear nuestro propio servidor MCP para que se vincule con la API de Wazuh, utilizando las siguientes herramientas. Luego, puedes agregar lo que consideres necesario.
📋 Herramientas disponibles:
agents_summary- Resumen rápido de todos los agentesactive_agents- Lista solo agentes activossearch_agent- Buscar agentes por nombre o IPmanager_info- Información del manager Wazuhagent_details- Detalles específicos de un agente
🛟 Arquitectura
Vamos a tener un servidor SIEM con dos maquinas reportando. Un servidor Ubuntu y uno Microsoft. Luego tendremos que configurar el cliente de Claude.io para que se conecte a mi servidor MCP que sera el encargado de dialogar con Wazuh y responder en base los datos que tenemos en nuestro Index.

🛜 Servidor MCP
Esta version, en Python, es algo muy acotado. Recomiendo ver las referencias, al pie del articulo, para tener todo el poder de esta clase de implementaciones.
#!/usr/bin/env python3
"""
Wazuh MCP Server
Servidor MCP (Model Context Protocol) para integración con Wazuh
"""
import asyncio
import json
import logging
import sys
import os
import argparse
import signal
import time
from typing import Any, Dict, List, Optional, Union
from datetime import datetime
from abc import ABC, abstractmethod
# Verificar dependencias
try:
import httpx
except ImportError:
print("Error: httpx no está instalado. Ejecuta: pip install httpx", file=sys.stderr)
sys.exit(1)
# ============================================================================
# CONFIGURACIÓN Y UTILIDADES
# ============================================================================
def setup_unbuffered_streams():
"""Configurar streams sin buffer para mejor logging"""
sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', buffering=1)
sys.stderr = os.fdopen(sys.stderr.fileno(), 'w', buffering=1)
def setup_logging(debug: bool = False, log_name: str = "wazuh-mcp") -> logging.Logger:
"""
Configurar sistema de logging
Args:
debug: Habilitar modo debug
log_name: Nombre del logger
Returns:
Logger configurado
"""
setup_unbuffered_streams()
# Directorio y archivo de log
log_dir = os.path.dirname(os.path.abspath(__file__))
timestamp = datetime.now().strftime("%Y%m%d_%H%M")
log_file = os.path.join(log_dir, f'{log_name}_{timestamp}.log')
# Formatters
file_formatter = logging.Formatter(
'%(asctime)s | %(levelname)-8s | %(name)-15s | %(message)s',
datefmt='%H:%M:%S'
)
console_formatter = logging.Formatter(
'%(asctime)s | %(levelname)s | %(message)s',
datefmt='%H:%M:%S'
)
# Handlers
file_handler = logging.FileHandler(log_file)
file_handler.setFormatter(file_formatter)
console_handler = logging.StreamHandler(sys.stderr)
console_handler.setFormatter(console_formatter)
# Logger principal
logger = logging.getLogger(log_name)
logger.setLevel(logging.DEBUG if debug else logging.INFO)
logger.addHandler(file_handler)
logger.addHandler(console_handler)
logger.info(f"Logging inicializado - Archivo: {log_file}")
return logger
# ============================================================================
# EXCEPCIONES PERSONALIZADAS
# ============================================================================
class WazuhMCPException(Exception):
"""Excepción base para Wazuh MCP Server"""
pass
class AuthenticationError(WazuhMCPException):
"""Error de autenticación"""
pass
class APIError(WazuhMCPException):
"""Error en petición API"""
def __init__(self, message: str, status_code: int = None, response_text: str = None):
super().__init__(message)
self.status_code = status_code
self.response_text = response_text
class ToolExecutionError(WazuhMCPException):
"""Error en ejecución de herramienta"""
pass
# ============================================================================
# CLIENTE WAZUH
# ============================================================================
class WazuhClient:
"""Cliente asíncrono para la API de Wazuh"""
def __init__(self, wazuh_url: str, username: str, password: str):
self.wazuh_url = wazuh_url.rstrip('/')
self.username = username
self.password = password
self.token: Optional[str] = None
self.logger = logging.getLogger("wazuh-client")
# Cliente HTTP optimizado
self.client = httpx.AsyncClient(
verify=False,
timeout=httpx.Timeout(15.0, connect=5.0),
limits=httpx.Limits(max_keepalive_connections=5, max_connections=10)
)
self.logger.info(f"WazuhClient inicializado para {self.wazuh_url}")
async def authenticate(self) -> bool:
"""
Autenticar con la API de Wazuh
Returns:
True si la autenticación es exitosa
Raises:
AuthenticationError: Si la autenticación falla
"""
try:
start_time = time.time()
auth_url = f"{self.wazuh_url}/security/user/authenticate"
self.logger.info(f"Autenticando en: {auth_url}")
auth = httpx.BasicAuth(self.username, self.password)
response = await self.client.get(
auth_url,
auth=auth,
headers={"Content-Type": "application/json"}
)
elapsed = (time.time() - start_time) * 1000
self.logger.info(f"Respuesta de auth en {elapsed:.0f}ms - Status: {response.status_code}")
if response.status_code != 200:
error_msg = f"Autenticación fallida: {response.text[:200]}"
self.logger.error(error_msg)
raise AuthenticationError(error_msg)
data = response.json()
self.token = data['data']['token']
self.logger.info("Autenticación exitosa")
return True
except httpx.RequestError as e:
error_msg = f"Error en petición de auth: {e}"
self.logger.error(error_msg)
raise AuthenticationError(error_msg)
except Exception as e:
error_msg = f"Error de autenticación: {e}"
self.logger.error(error_msg)
raise AuthenticationError(error_msg)
async def make_request(
self,
endpoint: str,
method: str = "GET",
params: Optional[Dict] = None,
retry_auth: bool = True
) -> Dict[str, Any]:
"""
Realizar petición autenticada a la API de Wazuh
Args:
endpoint: Endpoint de la API
method: Método HTTP
params: Parámetros de la petición
retry_auth: Si reintentar con nueva auth en 401
Returns:
Datos de respuesta
Raises:
APIError: Si la petición falla
"""
if not self.token:
self.logger.info("Sin token, autenticando...")
await self.authenticate()
headers = {"Authorization": f"Bearer {self.token}"}
url = f"{self.wazuh_url}{endpoint}"
start_time = time.time()
self.logger.debug(f"{method} {endpoint}")
try:
response = await self._execute_request(method, url, headers, params)
elapsed = (time.time() - start_time) * 1000
self.logger.info(f"{endpoint} - {elapsed:.0f}ms")
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
if e.response.status_code == 401 and retry_auth:
self.logger.warning("Token expirado, re-autenticando...")
await self.authenticate()
headers = {"Authorization": f"Bearer {self.token}"}
response = await self._execute_request(method, url, headers, params)
response.raise_for_status()
return response.json()
error_msg = f"HTTP Error {e.response.status_code}: {e}"
self.logger.error(error_msg)
raise APIError(error_msg, e.response.status_code, e.response.text)
except httpx.RequestError as e:
error_msg = f"Error de petición: {e}"
self.logger.error(error_msg)
raise APIError(error_msg)
async def _execute_request(
self,
method: str,
url: str,
headers: Dict,
params: Optional[Dict]
) -> httpx.Response:
"""Ejecutar petición HTTP"""
if method.upper() == "GET":
return await self.client.get(url, headers=headers, params=params)
elif method.upper() == "POST":
return await self.client.post(url, headers=headers, json=params)
else:
return await self.client.request(method, url, headers=headers, params=params)
async def close(self):
"""Cerrar el cliente HTTP"""
await self.client.aclose()
self.logger.info("Cliente cerrado")
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close()
# ============================================================================
# HERRAMIENTAS BASE
# ============================================================================
class BaseTool(ABC):
"""Clase base para herramientas MCP"""
def __init__(self, wazuh_client: WazuhClient):
self.wazuh_client = wazuh_client
self.logger = logging.getLogger(f"tool-{self.name}")
@property
@abstractmethod
def name(self) -> str:
"""Nombre de la herramienta"""
pass
@property
@abstractmethod
def description(self) -> str:
"""Descripción de la herramienta"""
pass
@property
@abstractmethod
def input_schema(self) -> Dict[str, Any]:
"""Schema de entrada de la herramienta"""
pass
@abstractmethod
async def execute(self, arguments: Dict[str, Any]) -> str:
"""
Ejecutar la herramienta
Args:
arguments: Argumentos de la herramienta
Returns:
Salida de la herramienta como string
"""
pass
def to_dict(self) -> Dict[str, Any]:
"""Convertir herramienta a definición MCP"""
return {
"name": self.name,
"description": self.description,
"inputSchema": self.input_schema
}
# ============================================================================
# HERRAMIENTAS ESPECÍFICAS
# ============================================================================
class AgentsSummaryTool(BaseTool):
"""Obtener resumen rápido de todos los agentes Wazuh"""
@property
def name(self) -> str:
return "agents_summary"
@property
def description(self) -> str:
return "Obtener resumen rápido de todos los agentes Wazuh"
@property
def input_schema(self) -> Dict[str, Any]:
return {
"type": "object",
"properties": {},
"additionalProperties": False
}
async def execute(self, arguments: Dict[str, Any]) -> str:
result = await self.wazuh_client.make_request(
"/agents",
params={"limit": 100, "select": "id,name,status,ip,os.name"}
)
agents = result.get('data', {}).get('affected_items', [])
total = result.get('data', {}).get('total_affected_items', 0)
# Contar por estado
active = sum(1 for a in agents if a.get('status') == 'active')
disconnected = sum(1 for a in agents if a.get('status') == 'disconnected')
never = sum(1 for a in agents if a.get('status') == 'never_connected')
response = "🔍 **Resumen de Agentes Wazuh**\n\n"
response += f"**Total:** {total} agentes\n\n"
response += f"**Estado:**\n"
response += f"• Activos: {active}\n"
response += f"• Desconectados: {disconnected}\n"
response += f"• Nunca conectados: {never}\n"
if active > 0:
response += f"\n**Muestra de Agentes Activos:**\n"
for agent in [a for a in agents if a.get('status') == 'active'][:5]:
response += f"• {agent['name']} ({agent['id']}) - {agent['ip']}\n"
return response
class ActiveAgentsTool(BaseTool):
"""Listar solo agentes activos"""
@property
def name(self) -> str:
return "active_agents"
@property
def description(self) -> str:
return "Listar solo agentes activos"
@property
def input_schema(self) -> Dict[str, Any]:
return {
"type": "object",
"properties": {
"limit": {
"type": "integer",
"description": "Máximo número de agentes a retornar (default 10)",
"default": 10
}
},
"additionalProperties": False
}
async def execute(self, arguments: Dict[str, Any]) -> str:
limit = arguments.get('limit', 10)
result = await self.wazuh_client.make_request(
"/agents",
params={"status": "active", "limit": limit}
)
agents = result.get('data', {}).get('affected_items', [])
response = f"🟢 **Agentes Activos** ({len(agents)} mostrados)\n\n"
for agent in agents:
response += f"**{agent.get('name')}** (ID: {agent.get('id')})\n"
response += f"• IP: {agent.get('ip')}\n"
response += f"• OS: {agent.get('os', {}).get('name', 'Desconocido')}\n"
response += f"• Versión: {agent.get('version')}\n\n"
return response
class SearchAgentTool(BaseTool):
"""Buscar agentes por nombre o IP"""
@property
def name(self) -> str:
return "search_agent"
@property
def description(self) -> str:
return "Buscar agentes por nombre o IP"
@property
def input_schema(self) -> Dict[str, Any]:
return {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Término de búsqueda"
}
},
"required": ["query"],
"additionalProperties": False
}
async def execute(self, arguments: Dict[str, Any]) -> str:
query = arguments.get('query')
result = await self.wazuh_client.make_request(
"/agents",
params={"search": query}
)
agents = result.get('data', {}).get('affected_items', [])
if agents:
response = f"🔍 **Resultados de búsqueda para '{query}':**\n\n"
for agent in agents:
status_emoji = "🟢" if agent.get('status') == 'active' else "🔴"
response += f"{status_emoji} **{agent.get('name')}**\n"
response += f"• ID: {agent.get('id')}\n"
response += f"• Estado: {agent.get('status')}\n"
response += f"• IP: {agent.get('ip')}\n\n"
else:
response = f"❌ No se encontraron agentes que coincidan con '{query}'"
return response
class ManagerInfoTool(BaseTool):
"""Obtener información del manager Wazuh"""
@property
def name(self) -> str:
return "manager_info"
@property
def description(self) -> str:
return "Obtener información del manager Wazuh"
@property
def input_schema(self) -> Dict[str, Any]:
return {
"type": "object",
"properties": {},
"additionalProperties": False
}
async def execute(self, arguments: Dict[str, Any]) -> str:
result = await self.wazuh_client.make_request("/manager/info")
info = result.get('data', {}).get('affected_items', [{}])[0]
response = "ℹ️ **Información del Manager Wazuh**\n\n"
response += f"**Versión:** {info.get('version', 'N/A')}\n"
response += f"**Instalación:** {info.get('installation_date', 'N/A')}\n"
response += f"**Tipo:** {info.get('type', 'N/A')}\n"
# Intentar obtener estado también
try:
status_result = await self.wazuh_client.make_request("/manager/status")
status = status_result.get('data', {}).get('affected_items', [{}])[0]
response += f"\n**Estado:**\n"
for key, value in status.items():
response += f"• {key}: {value}\n"
except Exception:
pass
return response
class AgentDetailsTool(BaseTool):
"""Obtener información detallada de un agente específico"""
@property
def name(self) -> str:
return "agent_details"
@property
def description(self) -> str:
return "Obtener información detallada de un agente específico"
@property
def input_schema(self) -> Dict[str, Any]:
return {
"type": "object",
"properties": {
"agent_id": {
"type": "string",
"description": "ID del agente"
}
},
"required": ["agent_id"],
"additionalProperties": False
}
async def execute(self, arguments: Dict[str, Any]) -> str:
agent_id = arguments.get('agent_id')
result = await self.wazuh_client.make_request(f"/agents/{agent_id}")
if result.get('data', {}).get('affected_items'):
agent = result['data']['affected_items'][0]
status_emoji = "🟢" if agent.get('status') == 'active' else "🔴"
response = f"{status_emoji} **Detalles del Agente {agent_id}**\n\n"
response += f"**Nombre:** {agent.get('name')}\n"
response += f"**Estado:** {agent.get('status')}\n"
response += f"**IP:** {agent.get('ip')}\n"
response += f"**OS:** {agent.get('os', {}).get('name', 'N/A')} {agent.get('os', {}).get('version', '')}\n"
response += f"**Versión:** {agent.get('version')}\n"
response += f"**Registro:** {agent.get('dateAdd')}\n"
response += f"**Última conexión:** {agent.get('lastKeepAlive')}\n"
else:
response = f"❌ Agente {agent_id} no encontrado"
return response
# ============================================================================
# REGISTRO DE HERRAMIENTAS
# ============================================================================
class ToolRegistry:
"""Registro para gestionar herramientas"""
def __init__(self):
self._tools: Dict[str, BaseTool] = {}
self.logger = logging.getLogger("tool-registry")
def register(self, tool: BaseTool):
"""Registrar una herramienta"""
self._tools[tool.name] = tool
self.logger.info(f"Herramienta registrada: {tool.name}")
def get_tool(self, name: str) -> BaseTool:
"""Obtener herramienta por nombre"""
if name not in self._tools:
raise ToolExecutionError(f"Herramienta desconocida: {name}")
return self._tools[name]
def list_tools(self) -> List[Dict[str, Any]]:
"""Listar todas las herramientas registradas"""
return [tool.to_dict() for tool in self._tools.values()]
async def execute_tool(self, name: str, arguments: Dict[str, Any]) -> str:
"""Ejecutar herramienta por nombre"""
tool = self.get_tool(name)
try:
self.logger.info(f"Ejecutando herramienta: {name}")
result = await tool.execute(arguments)
self.logger.info(f"Herramienta {name} completada exitosamente")
return result
except Exception as e:
self.logger.error(f"Herramienta {name} falló: {e}")
raise ToolExecutionError(f"Herramienta {name} falló: {e}")
# ============================================================================
# SERVIDOR MCP
# ============================================================================
class WazuhMCPServer:
"""Servidor MCP para Wazuh"""
def __init__(self, wazuh_client: WazuhClient):
self.wazuh_client = wazuh_client
self.message_count = 0
self.tool_registry = ToolRegistry()
self.logger = logging.getLogger("mcp-server")
# Registrar herramientas
self._register_tools()
self.logger.info("Servidor MCP inicializado")
def _register_tools(self):
"""Registrar todas las herramientas disponibles"""
tools = [
AgentsSummaryTool(self.wazuh_client),
ActiveAgentsTool(self.wazuh_client),
SearchAgentTool(self.wazuh_client),
ManagerInfoTool(self.wazuh_client),
AgentDetailsTool(self.wazuh_client)
]
for tool in tools:
self.tool_registry.register(tool)
async def handle_message(self, message: Dict) -> Optional[Dict]:
"""
Manejar mensajes MCP entrantes
Args:
message: Mensaje MCP
Returns:
Respuesta MCP o None
"""
self.message_count += 1
method = message.get("method")
params = message.get("params", {})
msg_id = message.get("id")
self.logger.info(f"[{self.message_count}] Procesando: {method}")
try:
if method == "initialize":
self.logger.info("Inicializando servidor...")
return {
"jsonrpc": "2.0",
"id": msg_id,
"result": {
"protocolVersion": "2024-11-05",
"capabilities": {"tools": {}},
"serverInfo": {
"name": "wazuh-mcp",
"version": "2.0.0"
}
}
}
elif method == "notifications/initialized":
self.logger.info("Servidor inicializado")
return None
elif method == "tools/list":
self.logger.info("Listando herramientas...")
return {
"jsonrpc": "2.0",
"id": msg_id,
"result": {
"tools": self.tool_registry.list_tools()
}
}
elif method == "tools/call":
return await self.handle_tool_call(params, msg_id)
else:
self.logger.warning(f"Método desconocido: {method}")
if msg_id:
return {
"jsonrpc": "2.0",
"id": msg_id,
"error": {
"code": -32601,
"message": f"Método no encontrado: {method}"
}
}
return None
except Exception as e:
self.logger.error(f"Error en {method}: {e}")
if msg_id:
return {
"jsonrpc": "2.0",
"id": msg_id,
"error": {
"code": -32603,
"message": str(e)
}
}
return None
async def handle_tool_call(self, params: Dict, msg_id: str) -> Dict:
"""
Manejar llamadas a herramientas
Args:
params: Parámetros de la llamada
msg_id: ID del mensaje
Returns:
Respuesta MCP
"""
tool_name = params.get("name")
arguments = params.get("arguments", {})
self.logger.info(f"Herramienta: {tool_name}")
try:
response = await self.tool_registry.execute_tool(tool_name, arguments)
return {
"jsonrpc": "2.0",
"id": msg_id,
"result": {
"content": [
{
"type": "text",
"text": response
}
]
}
}
except ToolExecutionError as e:
return {
"jsonrpc": "2.0",
"id": msg_id,
"error": {
"code": -32601,
"message": str(e)
}
}
except Exception as e:
self.logger.error(f"Error en herramienta: {e}")
return {
"jsonrpc": "2.0",
"id": msg_id,
"error": {
"code": -32603,
"message": str(e)
}
}
# ============================================================================
# FUNCIÓN PRINCIPAL
# ============================================================================
async def main():
"""Función principal del servidor"""
parser = argparse.ArgumentParser(description="Wazuh MCP Server v2.0 - Organizado")
parser.add_argument("--wazuh-url", required=True, help="URL de la API de Wazuh")
parser.add_argument("--username", required=True, help="Usuario de Wazuh")
parser.add_argument("--password", required=True, help="Contraseña de Wazuh")
parser.add_argument("--debug", action="store_true", help="Habilitar logging debug")
args = parser.parse_args()
# Configurar logging
logger = setup_logging(args.debug)
logger.info("=" * 60)
logger.info(" WAZUH MCP SERVER v2.0 - ORGANIZADO")
logger.info("=" * 60)
logger.info(f" URL: {args.wazuh_url}")
logger.info(f" Usuario: {args.username}")
logger.info("=" * 60)
# Inicializar cliente Wazuh
wazuh_client = WazuhClient(args.wazuh_url, args.username, args.password)
# Probar autenticación
logger.info("Probando autenticación...")
try:
if not await wazuh_client.authenticate():
logger.error("¡Autenticación fallida!")
await wazuh_client.close()
return 1
except Exception as e:
logger.error(f"Error de autenticación: {e}")
await wazuh_client.close()
return 1
# Inicializar servidor MCP
mcp_server = WazuhMCPServer(wazuh_client)
logger.info("¡Servidor listo! Esperando mensajes...")
logger.info("=" * 60)
# Manejadores de señales
def signal_handler(signum, frame):
logger.info(f"Señal {signum} recibida, cerrando...")
sys.exit(0)
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
# Bucle principal de mensajes
try:
reader = asyncio.StreamReader()
protocol = asyncio.StreamReaderProtocol(reader)
await asyncio.get_event_loop().connect_read_pipe(lambda: protocol, sys.stdin)
while True:
try:
line_bytes = await reader.readline()
if not line_bytes:
logger.info("EOF - Cerrando...")
break
line = line_bytes.decode('utf-8').strip()
if not line:
continue
try:
message = json.loads(line)
except json.JSONDecodeError as e:
logger.error(f"JSON inválido: {e}")
continue
# Manejar mensaje
response = await mcp_server.handle_message(message)
# Enviar respuesta si existe
if response is not None:
print(json.dumps(response), flush=True)
except Exception as e:
logger.error(f"Error en mensaje: {e}")
except KeyboardInterrupt:
logger.info("Interrumpido por usuario")
except Exception as e:
logger.error(f"Error del servidor: {e}")
finally:
logger.info("Cerrando conexiones...")
await wazuh_client.close()
logger.info("Servidor detenido")
if __name__ == "__main__":
sys.exit(asyncio.run(main()) or 0)
Les dejo las dependencias, para instalar.
anyio==4.10.0
certifi==2025.8.3
h11==0.16.0
httpcore==1.0.9
httpx==0.28.1
idna==3.10
sniffio==1.3.1
typing_extensions==4.15.0
⚙️ Configuración Cliente
La configuración en Claude.io es sencilla. Una vez que tenemos el cliente instalado, vamos a Configuración → Desarrollador y ahí editamos la configuración. En mi caso, es esta:
{
"mcpServers": {
"wazuh": {
"command": "/Users/santiago/Proyects/Wazuh-POC/venv/bin/python3",
"args": [
"-u",
"/Users/santiago/Proyects/Wazuh-POC/wazuh_poc.py",
"--wazuh-url", "https://IP_TUSERVER:55000",
"--username", "wazuh",
"--password", "CONTRASENA_API"
],
"env": {
"PYTHONUNBUFFERED": "1"
}
}
}
}
Reiniciamos el cliente de Claude y verificamos si se logró la conexión al servidor MCP y ¡Voilá!

Primero vamos a revisar nuestro servidor Wazuh, que tiene los dos clientes. Cabe destacar que el mismo servidor de Wazuh se autoreporta.

Ahora haremos búsquedas basadas en nuestras herramientas en el Servidor MCP. Le voy a consultar: ¿Cuántos agentes tengo conectados? y luego: ¿Me das información sobre el AD01?

El poder es ilimitado, solo queda probar y empezar a jugar con nuestro servidor MCP. Si quieres agregar más métodos, basta con revisar la documentación de la API de Wazuh. Espero que disfruten el proceso, como lo hice yo.
💼 Referencias
https://github.com/gbrigandi/mcp-server-wazuh




