Skip to main content

Command Palette

Search for a command to run...

Aumenta el Valor de tus logs a través de la IA

Vamos a crear un servidor MCP para interactuar con Wazuh y dejar que la Inteligencia Artificial haga su trabajo.

Updated
12 min read
Aumenta el Valor de tus logs a través de la IA

Hace un tiempo escribí sobre Wazuh y Active Directory, y la verdad es que recibí muchos mensajes. Seguro que escribiré más sobre eso, pero ahora me voy a centrar en algo que estuve investigando en internet. Me pregunté: ¿Y si conectamos Wazuh a una IA? Como siempre, mucha gente ya estaba trabajando en eso, así que vamos a hacer el paso a paso para lograrlo nosotros mismos.

Antes que nada, les recomiendo leer los artículos de referencia. En esta Prueba de Concepto, vamos a crear nuestro propio servidor MCP para que se vincule con la API de Wazuh, utilizando las siguientes herramientas. Luego, puedes agregar lo que consideres necesario.

📋 Herramientas disponibles:

  1. agents_summary - Resumen rápido de todos los agentes

  2. active_agents - Lista solo agentes activos

  3. search_agent - Buscar agentes por nombre o IP

  4. manager_info - Información del manager Wazuh

  5. agent_details - Detalles específicos de un agente

🛟 Arquitectura

Vamos a tener un servidor SIEM con dos maquinas reportando. Un servidor Ubuntu y uno Microsoft. Luego tendremos que configurar el cliente de Claude.io para que se conecte a mi servidor MCP que sera el encargado de dialogar con Wazuh y responder en base los datos que tenemos en nuestro Index.

🛜 Servidor MCP

Esta version, en Python, es algo muy acotado. Recomiendo ver las referencias, al pie del articulo, para tener todo el poder de esta clase de implementaciones.

#!/usr/bin/env python3
"""
Wazuh MCP Server
Servidor MCP (Model Context Protocol) para integración con Wazuh
"""

import asyncio
import json
import logging
import sys
import os
import argparse
import signal
import time
from typing import Any, Dict, List, Optional, Union
from datetime import datetime
from abc import ABC, abstractmethod

# Verificar dependencias
try:
    import httpx
except ImportError:
    print("Error: httpx no está instalado. Ejecuta: pip install httpx", file=sys.stderr)
    sys.exit(1)

# ============================================================================
# CONFIGURACIÓN Y UTILIDADES
# ============================================================================

def setup_unbuffered_streams():
    """Configurar streams sin buffer para mejor logging"""
    sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', buffering=1)
    sys.stderr = os.fdopen(sys.stderr.fileno(), 'w', buffering=1)


def setup_logging(debug: bool = False, log_name: str = "wazuh-mcp") -> logging.Logger:
    """
    Configurar sistema de logging

    Args:
        debug: Habilitar modo debug
        log_name: Nombre del logger

    Returns:
        Logger configurado
    """
    setup_unbuffered_streams()

    # Directorio y archivo de log
    log_dir = os.path.dirname(os.path.abspath(__file__))
    timestamp = datetime.now().strftime("%Y%m%d_%H%M")
    log_file = os.path.join(log_dir, f'{log_name}_{timestamp}.log')

    # Formatters
    file_formatter = logging.Formatter(
        '%(asctime)s | %(levelname)-8s | %(name)-15s | %(message)s',
        datefmt='%H:%M:%S'
    )
    console_formatter = logging.Formatter(
        '%(asctime)s | %(levelname)s | %(message)s',
        datefmt='%H:%M:%S'
    )

    # Handlers
    file_handler = logging.FileHandler(log_file)
    file_handler.setFormatter(file_formatter)

    console_handler = logging.StreamHandler(sys.stderr)
    console_handler.setFormatter(console_formatter)

    # Logger principal
    logger = logging.getLogger(log_name)
    logger.setLevel(logging.DEBUG if debug else logging.INFO)
    logger.addHandler(file_handler)
    logger.addHandler(console_handler)

    logger.info(f"Logging inicializado - Archivo: {log_file}")
    return logger


# ============================================================================
# EXCEPCIONES PERSONALIZADAS
# ============================================================================

class WazuhMCPException(Exception):
    """Excepción base para Wazuh MCP Server"""
    pass


class AuthenticationError(WazuhMCPException):
    """Error de autenticación"""
    pass


class APIError(WazuhMCPException):
    """Error en petición API"""
    def __init__(self, message: str, status_code: int = None, response_text: str = None):
        super().__init__(message)
        self.status_code = status_code
        self.response_text = response_text


class ToolExecutionError(WazuhMCPException):
    """Error en ejecución de herramienta"""
    pass


# ============================================================================
# CLIENTE WAZUH
# ============================================================================

class WazuhClient:
    """Cliente asíncrono para la API de Wazuh"""

    def __init__(self, wazuh_url: str, username: str, password: str):
        self.wazuh_url = wazuh_url.rstrip('/')
        self.username = username
        self.password = password
        self.token: Optional[str] = None
        self.logger = logging.getLogger("wazuh-client")

        # Cliente HTTP optimizado
        self.client = httpx.AsyncClient(
            verify=False,
            timeout=httpx.Timeout(15.0, connect=5.0),
            limits=httpx.Limits(max_keepalive_connections=5, max_connections=10)
        )

        self.logger.info(f"WazuhClient inicializado para {self.wazuh_url}")

    async def authenticate(self) -> bool:
        """
        Autenticar con la API de Wazuh

        Returns:
            True si la autenticación es exitosa

        Raises:
            AuthenticationError: Si la autenticación falla
        """
        try:
            start_time = time.time()
            auth_url = f"{self.wazuh_url}/security/user/authenticate"
            self.logger.info(f"Autenticando en: {auth_url}")

            auth = httpx.BasicAuth(self.username, self.password)
            response = await self.client.get(
                auth_url,
                auth=auth,
                headers={"Content-Type": "application/json"}
            )

            elapsed = (time.time() - start_time) * 1000
            self.logger.info(f"Respuesta de auth en {elapsed:.0f}ms - Status: {response.status_code}")

            if response.status_code != 200:
                error_msg = f"Autenticación fallida: {response.text[:200]}"
                self.logger.error(error_msg)
                raise AuthenticationError(error_msg)

            data = response.json()
            self.token = data['data']['token']
            self.logger.info("Autenticación exitosa")
            return True

        except httpx.RequestError as e:
            error_msg = f"Error en petición de auth: {e}"
            self.logger.error(error_msg)
            raise AuthenticationError(error_msg)
        except Exception as e:
            error_msg = f"Error de autenticación: {e}"
            self.logger.error(error_msg)
            raise AuthenticationError(error_msg)

    async def make_request(
        self, 
        endpoint: str, 
        method: str = "GET", 
        params: Optional[Dict] = None,
        retry_auth: bool = True
    ) -> Dict[str, Any]:
        """
        Realizar petición autenticada a la API de Wazuh

        Args:
            endpoint: Endpoint de la API
            method: Método HTTP
            params: Parámetros de la petición
            retry_auth: Si reintentar con nueva auth en 401

        Returns:
            Datos de respuesta

        Raises:
            APIError: Si la petición falla
        """
        if not self.token:
            self.logger.info("Sin token, autenticando...")
            await self.authenticate()

        headers = {"Authorization": f"Bearer {self.token}"}
        url = f"{self.wazuh_url}{endpoint}"
        start_time = time.time()

        self.logger.debug(f"{method} {endpoint}")

        try:
            response = await self._execute_request(method, url, headers, params)
            elapsed = (time.time() - start_time) * 1000
            self.logger.info(f"{endpoint} - {elapsed:.0f}ms")

            response.raise_for_status()
            return response.json()

        except httpx.HTTPStatusError as e:
            if e.response.status_code == 401 and retry_auth:
                self.logger.warning("Token expirado, re-autenticando...")
                await self.authenticate()
                headers = {"Authorization": f"Bearer {self.token}"}
                response = await self._execute_request(method, url, headers, params)
                response.raise_for_status()
                return response.json()

            error_msg = f"HTTP Error {e.response.status_code}: {e}"
            self.logger.error(error_msg)
            raise APIError(error_msg, e.response.status_code, e.response.text)

        except httpx.RequestError as e:
            error_msg = f"Error de petición: {e}"
            self.logger.error(error_msg)
            raise APIError(error_msg)

    async def _execute_request(
        self, 
        method: str, 
        url: str, 
        headers: Dict, 
        params: Optional[Dict]
    ) -> httpx.Response:
        """Ejecutar petición HTTP"""
        if method.upper() == "GET":
            return await self.client.get(url, headers=headers, params=params)
        elif method.upper() == "POST":
            return await self.client.post(url, headers=headers, json=params)
        else:
            return await self.client.request(method, url, headers=headers, params=params)

    async def close(self):
        """Cerrar el cliente HTTP"""
        await self.client.aclose()
        self.logger.info("Cliente cerrado")

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()


# ============================================================================
# HERRAMIENTAS BASE
# ============================================================================

class BaseTool(ABC):
    """Clase base para herramientas MCP"""

    def __init__(self, wazuh_client: WazuhClient):
        self.wazuh_client = wazuh_client
        self.logger = logging.getLogger(f"tool-{self.name}")

    @property
    @abstractmethod
    def name(self) -> str:
        """Nombre de la herramienta"""
        pass

    @property
    @abstractmethod
    def description(self) -> str:
        """Descripción de la herramienta"""
        pass

    @property
    @abstractmethod
    def input_schema(self) -> Dict[str, Any]:
        """Schema de entrada de la herramienta"""
        pass

    @abstractmethod
    async def execute(self, arguments: Dict[str, Any]) -> str:
        """
        Ejecutar la herramienta

        Args:
            arguments: Argumentos de la herramienta

        Returns:
            Salida de la herramienta como string
        """
        pass

    def to_dict(self) -> Dict[str, Any]:
        """Convertir herramienta a definición MCP"""
        return {
            "name": self.name,
            "description": self.description,
            "inputSchema": self.input_schema
        }


# ============================================================================
# HERRAMIENTAS ESPECÍFICAS
# ============================================================================

class AgentsSummaryTool(BaseTool):
    """Obtener resumen rápido de todos los agentes Wazuh"""

    @property
    def name(self) -> str:
        return "agents_summary"

    @property
    def description(self) -> str:
        return "Obtener resumen rápido de todos los agentes Wazuh"

    @property
    def input_schema(self) -> Dict[str, Any]:
        return {
            "type": "object",
            "properties": {},
            "additionalProperties": False
        }

    async def execute(self, arguments: Dict[str, Any]) -> str:
        result = await self.wazuh_client.make_request(
            "/agents",
            params={"limit": 100, "select": "id,name,status,ip,os.name"}
        )

        agents = result.get('data', {}).get('affected_items', [])
        total = result.get('data', {}).get('total_affected_items', 0)

        # Contar por estado
        active = sum(1 for a in agents if a.get('status') == 'active')
        disconnected = sum(1 for a in agents if a.get('status') == 'disconnected')
        never = sum(1 for a in agents if a.get('status') == 'never_connected')

        response = "🔍 **Resumen de Agentes Wazuh**\n\n"
        response += f"**Total:** {total} agentes\n\n"
        response += f"**Estado:**\n"
        response += f"• Activos: {active}\n"
        response += f"• Desconectados: {disconnected}\n"
        response += f"• Nunca conectados: {never}\n"

        if active > 0:
            response += f"\n**Muestra de Agentes Activos:**\n"
            for agent in [a for a in agents if a.get('status') == 'active'][:5]:
                response += f"• {agent['name']} ({agent['id']}) - {agent['ip']}\n"

        return response


class ActiveAgentsTool(BaseTool):
    """Listar solo agentes activos"""

    @property
    def name(self) -> str:
        return "active_agents"

    @property
    def description(self) -> str:
        return "Listar solo agentes activos"

    @property
    def input_schema(self) -> Dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "limit": {
                    "type": "integer",
                    "description": "Máximo número de agentes a retornar (default 10)",
                    "default": 10
                }
            },
            "additionalProperties": False
        }

    async def execute(self, arguments: Dict[str, Any]) -> str:
        limit = arguments.get('limit', 10)
        result = await self.wazuh_client.make_request(
            "/agents",
            params={"status": "active", "limit": limit}
        )

        agents = result.get('data', {}).get('affected_items', [])

        response = f"🟢 **Agentes Activos** ({len(agents)} mostrados)\n\n"
        for agent in agents:
            response += f"**{agent.get('name')}** (ID: {agent.get('id')})\n"
            response += f"• IP: {agent.get('ip')}\n"
            response += f"• OS: {agent.get('os', {}).get('name', 'Desconocido')}\n"
            response += f"• Versión: {agent.get('version')}\n\n"

        return response


class SearchAgentTool(BaseTool):
    """Buscar agentes por nombre o IP"""

    @property
    def name(self) -> str:
        return "search_agent"

    @property
    def description(self) -> str:
        return "Buscar agentes por nombre o IP"

    @property
    def input_schema(self) -> Dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "Término de búsqueda"
                }
            },
            "required": ["query"],
            "additionalProperties": False
        }

    async def execute(self, arguments: Dict[str, Any]) -> str:
        query = arguments.get('query')
        result = await self.wazuh_client.make_request(
            "/agents",
            params={"search": query}
        )

        agents = result.get('data', {}).get('affected_items', [])

        if agents:
            response = f"🔍 **Resultados de búsqueda para '{query}':**\n\n"
            for agent in agents:
                status_emoji = "🟢" if agent.get('status') == 'active' else "🔴"
                response += f"{status_emoji} **{agent.get('name')}**\n"
                response += f"• ID: {agent.get('id')}\n"
                response += f"• Estado: {agent.get('status')}\n"
                response += f"• IP: {agent.get('ip')}\n\n"
        else:
            response = f"❌ No se encontraron agentes que coincidan con '{query}'"

        return response


class ManagerInfoTool(BaseTool):
    """Obtener información del manager Wazuh"""

    @property
    def name(self) -> str:
        return "manager_info"

    @property
    def description(self) -> str:
        return "Obtener información del manager Wazuh"

    @property
    def input_schema(self) -> Dict[str, Any]:
        return {
            "type": "object",
            "properties": {},
            "additionalProperties": False
        }

    async def execute(self, arguments: Dict[str, Any]) -> str:
        result = await self.wazuh_client.make_request("/manager/info")
        info = result.get('data', {}).get('affected_items', [{}])[0]

        response = "ℹ️ **Información del Manager Wazuh**\n\n"
        response += f"**Versión:** {info.get('version', 'N/A')}\n"
        response += f"**Instalación:** {info.get('installation_date', 'N/A')}\n"
        response += f"**Tipo:** {info.get('type', 'N/A')}\n"

        # Intentar obtener estado también
        try:
            status_result = await self.wazuh_client.make_request("/manager/status")
            status = status_result.get('data', {}).get('affected_items', [{}])[0]
            response += f"\n**Estado:**\n"
            for key, value in status.items():
                response += f"• {key}: {value}\n"
        except Exception:
            pass

        return response


class AgentDetailsTool(BaseTool):
    """Obtener información detallada de un agente específico"""

    @property
    def name(self) -> str:
        return "agent_details"

    @property
    def description(self) -> str:
        return "Obtener información detallada de un agente específico"

    @property
    def input_schema(self) -> Dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "agent_id": {
                    "type": "string",
                    "description": "ID del agente"
                }
            },
            "required": ["agent_id"],
            "additionalProperties": False
        }

    async def execute(self, arguments: Dict[str, Any]) -> str:
        agent_id = arguments.get('agent_id')
        result = await self.wazuh_client.make_request(f"/agents/{agent_id}")

        if result.get('data', {}).get('affected_items'):
            agent = result['data']['affected_items'][0]
            status_emoji = "🟢" if agent.get('status') == 'active' else "🔴"

            response = f"{status_emoji} **Detalles del Agente {agent_id}**\n\n"
            response += f"**Nombre:** {agent.get('name')}\n"
            response += f"**Estado:** {agent.get('status')}\n"
            response += f"**IP:** {agent.get('ip')}\n"
            response += f"**OS:** {agent.get('os', {}).get('name', 'N/A')} {agent.get('os', {}).get('version', '')}\n"
            response += f"**Versión:** {agent.get('version')}\n"
            response += f"**Registro:** {agent.get('dateAdd')}\n"
            response += f"**Última conexión:** {agent.get('lastKeepAlive')}\n"
        else:
            response = f"❌ Agente {agent_id} no encontrado"

        return response


# ============================================================================
# REGISTRO DE HERRAMIENTAS
# ============================================================================

class ToolRegistry:
    """Registro para gestionar herramientas"""

    def __init__(self):
        self._tools: Dict[str, BaseTool] = {}
        self.logger = logging.getLogger("tool-registry")

    def register(self, tool: BaseTool):
        """Registrar una herramienta"""
        self._tools[tool.name] = tool
        self.logger.info(f"Herramienta registrada: {tool.name}")

    def get_tool(self, name: str) -> BaseTool:
        """Obtener herramienta por nombre"""
        if name not in self._tools:
            raise ToolExecutionError(f"Herramienta desconocida: {name}")
        return self._tools[name]

    def list_tools(self) -> List[Dict[str, Any]]:
        """Listar todas las herramientas registradas"""
        return [tool.to_dict() for tool in self._tools.values()]

    async def execute_tool(self, name: str, arguments: Dict[str, Any]) -> str:
        """Ejecutar herramienta por nombre"""
        tool = self.get_tool(name)
        try:
            self.logger.info(f"Ejecutando herramienta: {name}")
            result = await tool.execute(arguments)
            self.logger.info(f"Herramienta {name} completada exitosamente")
            return result
        except Exception as e:
            self.logger.error(f"Herramienta {name} falló: {e}")
            raise ToolExecutionError(f"Herramienta {name} falló: {e}")


# ============================================================================
# SERVIDOR MCP
# ============================================================================

class WazuhMCPServer:
    """Servidor MCP para Wazuh"""

    def __init__(self, wazuh_client: WazuhClient):
        self.wazuh_client = wazuh_client
        self.message_count = 0
        self.tool_registry = ToolRegistry()
        self.logger = logging.getLogger("mcp-server")

        # Registrar herramientas
        self._register_tools()

        self.logger.info("Servidor MCP inicializado")

    def _register_tools(self):
        """Registrar todas las herramientas disponibles"""
        tools = [
            AgentsSummaryTool(self.wazuh_client),
            ActiveAgentsTool(self.wazuh_client),
            SearchAgentTool(self.wazuh_client),
            ManagerInfoTool(self.wazuh_client),
            AgentDetailsTool(self.wazuh_client)
        ]

        for tool in tools:
            self.tool_registry.register(tool)

    async def handle_message(self, message: Dict) -> Optional[Dict]:
        """
        Manejar mensajes MCP entrantes

        Args:
            message: Mensaje MCP

        Returns:
            Respuesta MCP o None
        """
        self.message_count += 1
        method = message.get("method")
        params = message.get("params", {})
        msg_id = message.get("id")

        self.logger.info(f"[{self.message_count}] Procesando: {method}")

        try:
            if method == "initialize":
                self.logger.info("Inicializando servidor...")
                return {
                    "jsonrpc": "2.0",
                    "id": msg_id,
                    "result": {
                        "protocolVersion": "2024-11-05",
                        "capabilities": {"tools": {}},
                        "serverInfo": {
                            "name": "wazuh-mcp",
                            "version": "2.0.0"
                        }
                    }
                }

            elif method == "notifications/initialized":
                self.logger.info("Servidor inicializado")
                return None

            elif method == "tools/list":
                self.logger.info("Listando herramientas...")
                return {
                    "jsonrpc": "2.0",
                    "id": msg_id,
                    "result": {
                        "tools": self.tool_registry.list_tools()
                    }
                }

            elif method == "tools/call":
                return await self.handle_tool_call(params, msg_id)

            else:
                self.logger.warning(f"Método desconocido: {method}")
                if msg_id:
                    return {
                        "jsonrpc": "2.0",
                        "id": msg_id,
                        "error": {
                            "code": -32601,
                            "message": f"Método no encontrado: {method}"
                        }
                    }
                return None

        except Exception as e:
            self.logger.error(f"Error en {method}: {e}")
            if msg_id:
                return {
                    "jsonrpc": "2.0",
                    "id": msg_id,
                    "error": {
                        "code": -32603,
                        "message": str(e)
                    }
                }
            return None

    async def handle_tool_call(self, params: Dict, msg_id: str) -> Dict:
        """
        Manejar llamadas a herramientas

        Args:
            params: Parámetros de la llamada
            msg_id: ID del mensaje

        Returns:
            Respuesta MCP
        """
        tool_name = params.get("name")
        arguments = params.get("arguments", {})

        self.logger.info(f"Herramienta: {tool_name}")

        try:
            response = await self.tool_registry.execute_tool(tool_name, arguments)

            return {
                "jsonrpc": "2.0",
                "id": msg_id,
                "result": {
                    "content": [
                        {
                            "type": "text",
                            "text": response
                        }
                    ]
                }
            }

        except ToolExecutionError as e:
            return {
                "jsonrpc": "2.0",
                "id": msg_id,
                "error": {
                    "code": -32601,
                    "message": str(e)
                }
            }
        except Exception as e:
            self.logger.error(f"Error en herramienta: {e}")
            return {
                "jsonrpc": "2.0",
                "id": msg_id,
                "error": {
                    "code": -32603,
                    "message": str(e)
                }
            }


# ============================================================================
# FUNCIÓN PRINCIPAL
# ============================================================================

async def main():
    """Función principal del servidor"""
    parser = argparse.ArgumentParser(description="Wazuh MCP Server v2.0 - Organizado")
    parser.add_argument("--wazuh-url", required=True, help="URL de la API de Wazuh")
    parser.add_argument("--username", required=True, help="Usuario de Wazuh")
    parser.add_argument("--password", required=True, help="Contraseña de Wazuh")
    parser.add_argument("--debug", action="store_true", help="Habilitar logging debug")

    args = parser.parse_args()

    # Configurar logging
    logger = setup_logging(args.debug)

    logger.info("=" * 60)
    logger.info(" WAZUH MCP SERVER v2.0 - ORGANIZADO")
    logger.info("=" * 60)
    logger.info(f" URL: {args.wazuh_url}")
    logger.info(f" Usuario: {args.username}")
    logger.info("=" * 60)

    # Inicializar cliente Wazuh
    wazuh_client = WazuhClient(args.wazuh_url, args.username, args.password)

    # Probar autenticación
    logger.info("Probando autenticación...")
    try:
        if not await wazuh_client.authenticate():
            logger.error("¡Autenticación fallida!")
            await wazuh_client.close()
            return 1
    except Exception as e:
        logger.error(f"Error de autenticación: {e}")
        await wazuh_client.close()
        return 1

    # Inicializar servidor MCP
    mcp_server = WazuhMCPServer(wazuh_client)

    logger.info("¡Servidor listo! Esperando mensajes...")
    logger.info("=" * 60)

    # Manejadores de señales
    def signal_handler(signum, frame):
        logger.info(f"Señal {signum} recibida, cerrando...")
        sys.exit(0)

    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)

    # Bucle principal de mensajes
    try:
        reader = asyncio.StreamReader()
        protocol = asyncio.StreamReaderProtocol(reader)
        await asyncio.get_event_loop().connect_read_pipe(lambda: protocol, sys.stdin)

        while True:
            try:
                line_bytes = await reader.readline()
                if not line_bytes:
                    logger.info("EOF - Cerrando...")
                    break

                line = line_bytes.decode('utf-8').strip()
                if not line:
                    continue

                try:
                    message = json.loads(line)
                except json.JSONDecodeError as e:
                    logger.error(f"JSON inválido: {e}")
                    continue

                # Manejar mensaje
                response = await mcp_server.handle_message(message)

                # Enviar respuesta si existe
                if response is not None:
                    print(json.dumps(response), flush=True)

            except Exception as e:
                logger.error(f"Error en mensaje: {e}")

    except KeyboardInterrupt:
        logger.info("Interrumpido por usuario")
    except Exception as e:
        logger.error(f"Error del servidor: {e}")
    finally:
        logger.info("Cerrando conexiones...")
        await wazuh_client.close()
        logger.info("Servidor detenido")


if __name__ == "__main__":
    sys.exit(asyncio.run(main()) or 0)

Les dejo las dependencias, para instalar.

anyio==4.10.0
certifi==2025.8.3
h11==0.16.0
httpcore==1.0.9
httpx==0.28.1
idna==3.10
sniffio==1.3.1
typing_extensions==4.15.0

⚙️ Configuración Cliente

La configuración en Claude.io es sencilla. Una vez que tenemos el cliente instalado, vamos a Configuración → Desarrollador y ahí editamos la configuración. En mi caso, es esta:

{
  "mcpServers": {
    "wazuh": {
      "command": "/Users/santiago/Proyects/Wazuh-POC/venv/bin/python3",
      "args": [
        "-u",
        "/Users/santiago/Proyects/Wazuh-POC/wazuh_poc.py",
        "--wazuh-url", "https://IP_TUSERVER:55000",
        "--username", "wazuh",
        "--password", "CONTRASENA_API"
      ],
      "env": {
        "PYTHONUNBUFFERED": "1"
      }
    }
  }
}
💡
En mi caso cree un entorno virtual, para la instalación de dependencias. Por eso ese path.

Reiniciamos el cliente de Claude y verificamos si se logró la conexión al servidor MCP y ¡Voilá!

Primero vamos a revisar nuestro servidor Wazuh, que tiene los dos clientes. Cabe destacar que el mismo servidor de Wazuh se autoreporta.

Ahora haremos búsquedas basadas en nuestras herramientas en el Servidor MCP. Le voy a consultar: ¿Cuántos agentes tengo conectados? y luego: ¿Me das información sobre el AD01?

El poder es ilimitado, solo queda probar y empezar a jugar con nuestro servidor MCP. Si quieres agregar más métodos, basta con revisar la documentación de la API de Wazuh. Espero que disfruten el proceso, como lo hice yo.

💼 Referencias

https://github.com/gbrigandi/mcp-server-wazuh

https://github.com/gensecaihq/Wazuh-MCP-Server

https://www.youtube.com/watch?v=b7aqfI8eLOI&t=657s