Skip to main content

Command Palette

Search for a command to run...

La Batalla perdida de la Clasificación de la Información, primera parte.

Te muestro como buscar PII, PCI & mas en nuestra infra.

Updated
10 min read
La Batalla perdida de la Clasificación de la Información, primera parte.

Introducción

La clasificación de datos, esa práctica noble de etiquetar documentos, bases de datos y buckets según su sensibilidad, fue pensada para dar control. Pero en la era de la nube y el desarrollo ágil, la clasificación se volvió una ilusión. Los equipos crean recursos efímeros; los pipelines generan snapshots y backups; los scripts y lambdas almacenan datos temporales. El resultado: PII y PCI dispersas en lugares que nadie revisó.

En lugar de intentar imponer una clasificación perfecta (una batalla perdida), proponemos otro enfoque: encontrar lo que importa. Descubrir automáticamente las superficies que contienen datos sensibles, priorizar por riesgo y aplicar controles proporcionalmente. Esto no solo es pragmático: es la única estrategia viable para reducir la exposición real en ambientes dinámicos.

En la prueba de concepto que desarrollé, combiné LocalStack para emular AWS S3, un contenedor MySQL y un escáner como Hawk Eye. Una manera de evidenciar archivos y tablas con PII/PCI que no figuraban en ningún inventario. A grandes rasgos, Hawk Eye realiza el descubrimiento, luego los hallazgos son clasificados, se revisa si es algo ya evidenciado o no y eso se impacta en The Hive para un seguimiento.

Arquitectura

Aca esta nuestro docker compose, todo nuestro stack.

services:
  localstack:
    image: localstack/localstack:2.2
    container_name: localstack
    environment:
      - SERVICES=s3
      - DEFAULT_REGION=us-east-1
      - DEBUG=1
    ports:
      - "4566:4566"
    volumes:
      - "./localstack_data:/tmp/localstack"
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:4566/_localstack/health"]
      interval: 10s
      timeout: 5s
      retries: 5
    networks:
      - hawk-network

  hawk-mysql:
    image: mysql:8.0
    container_name: hawk-mysql 
    environment:
      MYSQL_ROOT_PASSWORD: rootpassword
      MYSQL_DATABASE: pocdb
    ports:
      - "3306:3306"
    volumes:
      - mysql_data:/var/lib/mysql
    healthcheck:
      test: ["CMD", "mysqladmin", "ping", "-h", "localhost", "-u", "root", "-prootpassword"]
      interval: 10s
      timeout: 5s
      retries: 5
    networks:
      - hawk-network

  cassandra:
    image: cassandra:4.1
    container_name: cassandra
    environment:
      - MAX_HEAP_SIZE=1G
      - HEAP_NEWSIZE=256M
      - CASSANDRA_CLUSTER_NAME=thehive
    volumes:
      - cassandra_data:/var/lib/cassandra
    networks:
      - hawk-network
    healthcheck:
      test: ["CMD-SHELL", "cqlsh -e 'describe cluster' || exit 1"]
      interval: 30s
      timeout: 10s
      retries: 10
      start_period: 120s

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.17.9
    container_name: elasticsearch
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
      - cluster.name=thehive
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - elasticsearch_data:/usr/share/elasticsearch/data
    networks:
      - hawk-network
    healthcheck:
      test: ["CMD-SHELL", "curl -f http://localhost:9200/_cluster/health || exit 1"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 60s

  thehive:
    image: strangebee/thehive:5.0
    platform: linux/amd64   
    container_name: thehive
    depends_on:
      cassandra:
        condition: service_healthy
      elasticsearch:
        condition: service_healthy
    ports:
      - "9000:9000"
    environment:
      - JVM_OPTS=-Xms1G -Xmx1G
    volumes:
      - thehive_data:/opt/thehive/data
      - ./thehive-config/application.conf:/etc/thehive/application.conf:ro
    networks:
      - hawk-network
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:9000/api/v1/status"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 180s

  hawk-scanner:
    build: .
    container_name: hawk-scanner
    platform: linux/amd64
    depends_on:
      hawk-mysql:
        condition: service_healthy
      localstack:
        condition: service_healthy
      thehive:
        condition: service_started
    environment:
      - AWS_ACCESS_KEY_ID=test
      - AWS_SECRET_ACCESS_KEY=test
      - AWS_ENDPOINT_URL=http://localstack:4566
      - AWS_DEFAULT_REGION=us-east-1
      - PYTHONIOENCODING=utf-8
      - LANG=C.UTF-8
      - LC_ALL=C.UTF-8
    volumes:
      - ./alerts:/app/alerts
      - ./hawk-scanner/connection.yml:/app/connection.yml
      - ./hawk-scanner/fingerprint.yml:/app/fingerprint.yml
      - ./hawk-scanner/data:/app/data
    command: tail -f /dev/null
    networks:
      - hawk-network

networks:
  hawk-network:
    driver: bridge

volumes:
  mysql_data:
  cassandra_data:
  elasticsearch_data:
  thehive_data:

Hawk Scanner

Primero el Dockerfile para la creación del contenedor que contendrá Hawk Eye.

# Dockerfile corregido
FROM python:3.11-slim

ENV DEBIAN_FRONTEND=noninteractive \
    PYTHONUNBUFFERED=1

WORKDIR /app

# Dependencias del sistema
RUN apt-get update \
 && apt-get install -y --no-install-recommends \
    curl \
    git \
    netcat-openbsd \
    build-essential \
    libgl1 \
    libglx-mesa0 \
    libglib2.0-0 \
 && rm -rf /var/lib/apt/lists/*

# Instalar dependencias Python
COPY requirements.txt /app/requirements.txt
RUN pip install --no-cache-dir -r /app/requirements.txt

# Copiar todo el directorio del scanner
COPY hawk-scanner /app/

# Crear carpetas necesarias
RUN mkdir -p /app/alerts /app/data \
 && chmod -R a+rX /app

# WORKDIR ya es /app (donde están los archivos yml)
CMD ["python", "run_hawk_scanner.py"]

Vamos a configurar las conexiones para que Hawk Eye pueda hacer la investigación. En mi caso me propuse investigar AWS S3 y una BBDD MySQL.

# connection.yml
# Configuración de conexiones para Hawk-eye Scanner
# Este archivo define las fuentes de datos a escanear

notify:
  redacted: true  # Redactar datos sensibles en las notificaciones
  suppress_duplicates: true  # Suprimir alertas duplicadas

  # Opcional: Configurar webhook de Slack para notificaciones
  # slack:
  #   webhook_url: https://hooks.slack.com/services/YOUR/WEBHOOK/URL
  #   mention: "<@USERID>"  # Opcional: mencionar usuario/bot en alertas

sources:
  # ==========================================
  # CONFIGURACIÓN MYSQL
  # ==========================================
  mysql:
    poc_mysql:
      host: hawk-mysql
      port: 3306
      user: pocuser
      password: pocpassword
      database: pocdb
      limit_start: 0
      limit_end: 10000
      # Opcional: especificar tablas específicas
      # tables:
      #   - payments
      #   - users
      # Opcional: excluir columnas
      # exclude_columns:
      #   - id
      #   - created_at

  # ==========================================
  # CONFIGURACIÓN S3 (LOCALSTACK)
  # ==========================================
  s3:
    poc_s3:
      access_key: test
      secret_key: test
      bucket_name: poc-bucket
      endpoint_url: http://localstack:4566  # LocalStack endpoint
      cache: false
      # Opcional: patrones a excluir
      # exclude_patterns:
      #   - .log
      #   - .tmp

  # ==========================================
  # CONFIGURACIÓN FILESYSTEM (Opcional)
  # ==========================================
  # fs:
  #   local_scan:
  #     path: /app/test-data
  #     exclude_patterns:
  #       - .git
  #       - node_modules
  #       - venv
  #       - __pycache__

  # ==========================================
  # CONFIGURACIÓN REDIS (Opcional)
  # ==========================================
  # redis:
  #   poc_redis:
  #     host: redis
  #     port: 6379
  #     password: your_redis_password

  # ==========================================
  # CONFIGURACIÓN POSTGRESQL (Opcional)
  # ==========================================
  # postgresql:
  #   poc_postgres:
  #     host: postgres
  #     port: 5432
  #     user: postgres
  #     password: postgres
  #     database: testdb
  #     limit_start: 0
  #     limit_end: 1000

Y ahora lo mas importante ¿Que estamos buscando? Para ello generamos fingerprint.yml.

"Credit Card - Visa": '\b4[0-9]{12}(?:[0-9]{3})?\b'
"Credit Card - Mastercard": '\b5[1-5][0-9]{14}\b'
"Credit Card - American Express": '\b3[47][0-9]{13}\b'
"Credit Card - Discover": '\b6(?:011|5[0-9]{2})[0-9]{12}\b'
"Social Security Number (SSN)": '\b\d{3}-\d{2}-\d{4}\b'
"Email Address": '\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'
"Phone Number - US": '\b(?:\+?1[-.]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}\b'
"Phone Number - International": '\b\+\d{1,3}[-.\s]?\d{1,4}[-.\s]?\d{1,4}[-.\s]?\d{1,9}\b'
"AWS Access Key": '\b(AKIA|A3T|AGPA|AIDA|AROA|AIPA|ANPA|ANVA|ASIA)[A-Z0-9]{16}\b'
"AWS Secret Key": '\b[A-Za-z0-9/+=]{40}\b'
"Private Key": '-----BEGIN (RSA|DSA|EC|OPENSSH|PGP) PRIVATE KEY-----'
"Generic Password": "(?i)(password|pwd|passwd)\\s*[=:]\\s*\\S{4,}"
"API Key": "(?i)(api[_-]?key|apikey)\\s*[=:]\\s*[A-Za-z0-9_\\-]{20,}"
"JWT Token": '\beyJ[A-Za-z0-9_-]*\.eyJ[A-Za-z0-9_-]*\.[A-Za-z0-9_-]*\b'
"IP Address - Private": '\b(?:10\.\d{1,3}\.\d{1,3}\.\d{1,3}|172\.(?:1[6-9]|2[0-9]|3[01])\.\d{1,3}\.\d{1,3}|192\.168\.\d{1,3}\.\d{1,3})\b'
"IBAN": '\b[A-Z]{2}\d{2}[A-Z0-9]{4}\d{7}([A-Z0-9]?){0,16}\b'
"Bitcoin Address": '\b[13][a-km-zA-HJ-NP-Z1-9]{25,34}\b'
"URL with Credentials": '(?i)https?://[^:]+:[^@]+@[^\s]+'

¿Qué hace?

  • Define 15+ patrones regex para detectar datos sensibles

  • Incluye: tarjetas (Visa, MC, Amex), SSN, emails, teléfonos, AWS keys, JWT, passwords, IPs, etc.Cada

Generación Datos Dummies

Vamos a crear un entorno virtual para ejecutar el generador de datos. Lo encontrarás en la carpeta data.

python3 -m venv env
source env/bin/activate
# Instalar dependencias
pip3 install -r requirements.txt
# Correr el generador
> python3 generar_datos.py
============================================================
Generando datos de prueba PCI/PII
============================================================

[1] Conectando a MySQL...
[2] Creando tabla de pagos...
[3] Insertando datos PCI (tarjetas de crédito)...
    4 tarjetas insertadas

[4] Generando PDF con información PII...
    PDF generado

[5] Conectando a S3 (LocalStack)...
[6] Creando bucket...
    Bucket 'poc-bucket' creado
[7] Subiendo PDF con datos sensibles...
    PDF subido: s3://poc-bucket/hr/empleados_confidencial.pdf
[8] Subiendo archivo de texto con más PII...
    Archivo TXT subido: s3://poc-bucket/contacts/internal_directory.txt

============================================================
 DATOS DE PRUEBA GENERADOS EXITOSAMENTE
============================================================

Resumen:
   MySQL tabla 'payments': 4 registros con tarjetas
   S3 archivo PDF: 3 empleados con SSN y tarjetas
   S3 archivo TXT: 3 contactos con datos sensibles

💡 Ahora ejecuta el scanner con: docker exec -it hawk-scanner python run_hawk_scanner.py

Vamos a revisar la data en MySQL

Perfecto, ahora listamos el S3 Bucket donde vemos las dos carpetas, con información, que habíamos generado.

Vamos a revisar nuestro run_hawk_scanner.py. Script orquestador que automatiza el proceso completo de escaneo de seguridad, consolidación de resultados y generación de reportes. Ahora vamos a correr nuestra Aguila. En mi caso desde el docker.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import subprocess
import json
import os
import sys
from datetime import datetime
from collections import Counter
from severity_classifier import reclassify_findings, get_critical_findings
from alert_manager import AlertManager
from thehive_integration import TheHiveIntegration

# Directorios
ALERTS_DIR = "/app/alerts"
RESULTS_DIR = "/app/alerts"

os.makedirs(ALERTS_DIR, exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

def run_scan(source_type, output_file):
    print(f"🔍 Escaneando {source_type}...")
    cmd = [
        "hawk_scanner",
        source_type,
        "--connection", "connection.yml",
        "--fingerprint", "fingerprint.yml",
        "--json", output_file
    ]

    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode == 0:
            print(f"✅ {source_type} completado: {output_file}")
            return True
        else:
            print(f"❌ Error en {source_type}:")
            print(result.stderr)
            return False
    except Exception as e:
        print(f"❌ Excepción en {source_type}: {e}")
        return False

def consolidate_results(mysql_file, s3_file, output_file):
    all_results = []

    if os.path.exists(mysql_file):
        with open(mysql_file, 'r') as f:
            mysql_data = json.load(f)
            if isinstance(mysql_data, dict):
                for key, findings in mysql_data.items():
                    if isinstance(findings, list):
                        all_results.extend(findings)
            elif isinstance(mysql_data, list):
                all_results.extend(mysql_data)

    if os.path.exists(s3_file):
        with open(s3_file, 'r') as f:
            s3_data = json.load(f)
            if isinstance(s3_data, dict):
                for key, findings in s3_data.items():
                    if isinstance(findings, list):
                        all_results.extend(findings)
            elif isinstance(s3_data, list):
                all_results.extend(s3_data)

    all_results = reclassify_findings(all_results)

    with open(output_file, 'w') as f:
        json.dump(all_results, f, indent=2)

    print(f"📊 Resultados consolidados: {len(all_results)} hallazgos")
    return all_results

def display_findings(results):
    if not results:
        print("\n✅ No se detectaron hallazgos de seguridad")
        return

    print(f"\n{'='*70}")
    print(f"🔍 HALLAZGOS DETECTADOS")
    print(f"{'='*70}")

    by_severity = {}
    for r in results:
        severity = r.get('severity', 'Unknown')
        if severity not in by_severity:
            by_severity[severity] = []
        by_severity[severity].append(r)

    severity_order = ['CRITICAL', 'HIGH', 'MEDIUM', 'LOW', 'Unknown']
    severity_icons = {
        'CRITICAL': '🔴',
        'HIGH': '🟠', 
        'MEDIUM': '🟡',
        'LOW': '🟢',
        'Unknown': '⚪'
    }

    for severity in severity_order:
        if severity not in by_severity:
            continue

        findings = by_severity[severity]
        icon = severity_icons.get(severity, '⚪')

        print(f"\n{icon} {severity} - {len(findings)} hallazgos")
        print("-" * 70)

        max_display = len(findings) if severity == 'CRITICAL' else min(5, len(findings))

        for i, finding in enumerate(findings[:max_display], 1):
            print(f"\n  [{i}] {finding.get('pattern_name', 'Unknown Pattern')}")
            print(f"      Fuente: {finding.get('data_source', 'unknown')}")

            if finding.get('data_source') == 'mysql':
                print(f"      Base de datos: {finding.get('database', 'N/A')}")
                print(f"      Tabla: {finding.get('table', 'N/A')}")
                print(f"      Columna: {finding.get('column', 'N/A')}")
            elif finding.get('data_source') == 's3':
                print(f"      Bucket: {finding.get('bucket', 'N/A')}")
                print(f"      Archivo: {finding.get('file_path', 'N/A')}")

            matches = finding.get('matches', [])
            if matches:
                match_preview = matches[:3]
                print(f"      Matches: {', '.join(match_preview)}")
                if len(matches) > 3:
                    print(f"      ... y {len(matches) - 3} más")

        if len(findings) > max_display:
            print(f"\n  ... y {len(findings) - max_display} hallazgos más de severidad {severity}")

def generate_summary(results, output_file):
    valid_results = [r for r in results if isinstance(r, dict) and 'pattern_name' in r]

    summary = {
        "scan_date": datetime.now().isoformat(),
        "total_findings": len(valid_results),
        "by_severity": dict(Counter([r.get('severity', 'unknown') for r in valid_results])),
        "by_pattern": dict(Counter([r.get('pattern_name', 'unknown') for r in valid_results])),
        "by_source": dict(Counter([r.get('data_source', 'unknown') for r in valid_results])),
        "findings": valid_results
    }

    with open(output_file, 'w') as f:
        json.dump(summary, f, indent=2, default=str)

    print(f"\n{'='*70}")
    print(f"📈 RESUMEN ESTADÍSTICO")
    print(f"{'='*70}")
    print(f"   📊 Total de hallazgos: {summary['total_findings']}")

    print(f"\n   🚨 Por severidad:")
    severity_display_order = ['CRITICAL', 'HIGH', 'MEDIUM', 'LOW']
    for severity in severity_display_order:
        count = summary['by_severity'].get(severity, 0)
        if count > 0:
            print(f"      {severity}: {count}")

    print(f"\n   📁 Por fuente:")
    for source, count in summary['by_source'].items():
        print(f"      {source}: {count}")

    print(f"\n   🔍 Top 5 patrones más detectados:")
    top_patterns = sorted(summary['by_pattern'].items(), 
                         key=lambda x: x[1], reverse=True)[:5]
    for pattern, count in top_patterns:
        print(f"      {pattern}: {count}")

    critical = get_critical_findings(valid_results)
    if critical:
        print(f"\n   ⚠️  ATENCIÓN: {len(critical)} hallazgos CRÍTICOS detectados")
        print(f"      Requieren acción inmediata")

    return summary

if __name__ == "__main__":
    print("=" * 70)
    print("🦅 HAWK-EYE SCANNER - Automated Security Scan")
    print("=" * 70)

    mysql_output = f"{RESULTS_DIR}/mysql_{timestamp}.json"
    s3_output = f"{RESULTS_DIR}/s3_{timestamp}.json"
    consolidated_output = f"{RESULTS_DIR}/consolidated_{timestamp}.json"
    summary_output = f"{RESULTS_DIR}/summary_{timestamp}.json"
    latest_output = f"{RESULTS_DIR}/latest.json"

    mysql_success = run_scan("mysql", mysql_output)
    s3_success = run_scan("s3", s3_output)

    if mysql_success or s3_success:
        results = consolidate_results(mysql_output, s3_output, consolidated_output)

        # SISTEMA DE TRACKING
        print(f"\n{'='*70}")
        print("🔄 Procesando con sistema de tracking...")
        print(f"{'='*70}")

        alert_mgr = AlertManager()

        new_alerts = []
        duplicate_count = 0

        for finding in results:
            processed = alert_mgr.process_finding(finding)
            if processed['is_new']:
                new_alerts.append(processed)
            else:
                duplicate_count += 1

        print(f"\n📊 Resultados del tracking:")
        print(f"   • Total de hallazgos: {len(results)}")
        print(f"   • Alertas NUEVAS: {len(new_alerts)}")
        print(f"   • Ya vistos: {duplicate_count}")

        stats = alert_mgr.get_stats()
        if stats['critical_pending'] > 0:
            print(f"\n   ⚠️  {stats['critical_pending']} alertas CRÍTICAS pendientes")

        # INTEGRACIÓN CON THEHIVE
        thehive = TheHiveIntegration()

        if thehive.test_connection():
            print(f"\n{'='*70}")
            print("🎯 Enviando alertas críticas a TheHive...")
            print(f"{'='*70}")

            cases_created = 0
            for alert in new_alerts:
                finding = alert['finding']
                if finding.get('severity') in ['CRITICAL', 'HIGH']:
                    case_id = thehive.create_case(finding, alert['alert_hash'])
                    if case_id:
                        cases_created += 1

            print(f"\n📋 Casos creados en TheHive: {cases_created}")
            print(f"🌐 Accede al dashboard: http://localhost:9000")
        else:
            print("\n⚠️  TheHive no está disponible (casos no enviados)")

        display_findings(results)
        generate_summary(results, summary_output)

        with open(latest_output, 'w') as f:
            json.dump(results, f, indent=2)

        print(f"\n{'='*70}")
        print(f"✅ Escaneo completado exitosamente")
        print(f"📁 Resultados guardados en: {ALERTS_DIR}/")
        print(f"{'='*70}\n")
    else:
        print("\n❌ Escaneo falló")
        exit(1)
docker exec -it hawk-scanner python run_hawk_scanner.py

¡Voilà, aquí están los registros marcados por severidad!

Ahora sabemos dónde están las joyas de la corona, para poder actuar en consecuencia. Vamos un poco mas adelante e integremos The Hive para poder hacer el seguimiento.

The Hive

Es la plataforma donde llevaremos adelante el seguimiento de los hallazgos. La misma esta en http://localhost:9000.

  • Usuario: admin@thehive.local

  • Password: secret

Es importante que obtengamos una API KEY para poder integrarla.

  1. Crear organización (si no existe):

    • Haz clic en "Admin" (arriba a la derecha)

    • "Organizations""+ New Organization"

    • Nombre: hawk-security

    • Haz clic en "Create"

  2. Crear usuario con permisos:

    • "Users""+ New User"

    • Nombre: hawk-scanner

    • Login: hawk-scanner@hawk-security

    • Perfil: org-admin (¡importante!)

    • Organización: hawk-security

    • Contraseña: HawkScanner2024!

    • Haz clic en "Create"

  3. Crear API Key:

    • Haz clic en el usuario hawk-scanner que acabas de crear

    • Pestaña "API Keys"

    • Haz clic en "+ Create API Key"

    • Nombre: hawk-scanner-api

    • Haz clic en "Create"

    • ⚠️ COPIA LA KEY

Estos son datos dummies, yo lo cree asi.

La sumamos la API Key a thehive_integration.py, antes de ejecutar. Una vez que el Hawk Eye se ejecute, veremos en The Hive:

5 casos creados:

  • 🔴 4 CRITICAL: Credit Cards (Visa, Mastercard, Amex, Discover)

  • 🟠 1 HIGH: Social Security Number (SSN)

Listo, tenemos todo para darle seguimiento. Te dejo el repositorio para que puedas hacerlo en tu laboratorio.

Referencias

https://github.com/rohitcoder/hawk-eye