Skip to main content

Command Palette

Search for a command to run...

La Batalla perdida de la Clasificación de la Información, primera parte.

Te muestro como buscar PII, PCI & mas en nuestra infra.

Updated
10 min read
La Batalla perdida de la Clasificación de la Información, primera parte.
S

I have a bachelor's degree in Technology from the University of Palermo, a Master in Information Security from the University of Murcia and different certifications such as CISSP | CISM | CDPSE | CCSK | CSX | MCSA | SMAC™️ | DSOE | DEPC | CSFPC | CSFPC | 5x AWS Certified.

He is currently CISO at Klar, a Mexican Fintech. He was fortunate to be awarded as CISO of the Year in Argentina in 2021 and was among the Top 100 CISO's in the World in 2022.

A lover of new technologies, he has developed a career in DevSecOps and Cloud Security at Eko Party, the largest security conference in Latin America.

Introducción

La clasificación de datos, esa práctica noble de etiquetar documentos, bases de datos y buckets según su sensibilidad, fue pensada para dar control. Pero en la era de la nube y el desarrollo ágil, la clasificación se volvió una ilusión. Los equipos crean recursos efímeros; los pipelines generan snapshots y backups; los scripts y lambdas almacenan datos temporales. El resultado: PII y PCI dispersas en lugares que nadie revisó.

En lugar de intentar imponer una clasificación perfecta (una batalla perdida), proponemos otro enfoque: encontrar lo que importa. Descubrir automáticamente las superficies que contienen datos sensibles, priorizar por riesgo y aplicar controles proporcionalmente. Esto no solo es pragmático: es la única estrategia viable para reducir la exposición real en ambientes dinámicos.

En la prueba de concepto que desarrollé, combiné LocalStack para emular AWS S3, un contenedor MySQL y un escáner como Hawk Eye. Una manera de evidenciar archivos y tablas con PII/PCI que no figuraban en ningún inventario. A grandes rasgos, Hawk Eye realiza el descubrimiento, luego los hallazgos son clasificados, se revisa si es algo ya evidenciado o no y eso se impacta en The Hive para un seguimiento.

Arquitectura

Aca esta nuestro docker compose, todo nuestro stack.

services:
  localstack:
    image: localstack/localstack:2.2
    container_name: localstack
    environment:
      - SERVICES=s3
      - DEFAULT_REGION=us-east-1
      - DEBUG=1
    ports:
      - "4566:4566"
    volumes:
      - "./localstack_data:/tmp/localstack"
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:4566/_localstack/health"]
      interval: 10s
      timeout: 5s
      retries: 5
    networks:
      - hawk-network

  hawk-mysql:
    image: mysql:8.0
    container_name: hawk-mysql 
    environment:
      MYSQL_ROOT_PASSWORD: rootpassword
      MYSQL_DATABASE: pocdb
    ports:
      - "3306:3306"
    volumes:
      - mysql_data:/var/lib/mysql
    healthcheck:
      test: ["CMD", "mysqladmin", "ping", "-h", "localhost", "-u", "root", "-prootpassword"]
      interval: 10s
      timeout: 5s
      retries: 5
    networks:
      - hawk-network

  cassandra:
    image: cassandra:4.1
    container_name: cassandra
    environment:
      - MAX_HEAP_SIZE=1G
      - HEAP_NEWSIZE=256M
      - CASSANDRA_CLUSTER_NAME=thehive
    volumes:
      - cassandra_data:/var/lib/cassandra
    networks:
      - hawk-network
    healthcheck:
      test: ["CMD-SHELL", "cqlsh -e 'describe cluster' || exit 1"]
      interval: 30s
      timeout: 10s
      retries: 10
      start_period: 120s

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.17.9
    container_name: elasticsearch
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
      - cluster.name=thehive
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - elasticsearch_data:/usr/share/elasticsearch/data
    networks:
      - hawk-network
    healthcheck:
      test: ["CMD-SHELL", "curl -f http://localhost:9200/_cluster/health || exit 1"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 60s

  thehive:
    image: strangebee/thehive:5.0
    platform: linux/amd64   
    container_name: thehive
    depends_on:
      cassandra:
        condition: service_healthy
      elasticsearch:
        condition: service_healthy
    ports:
      - "9000:9000"
    environment:
      - JVM_OPTS=-Xms1G -Xmx1G
    volumes:
      - thehive_data:/opt/thehive/data
      - ./thehive-config/application.conf:/etc/thehive/application.conf:ro
    networks:
      - hawk-network
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:9000/api/v1/status"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 180s

  hawk-scanner:
    build: .
    container_name: hawk-scanner
    platform: linux/amd64
    depends_on:
      hawk-mysql:
        condition: service_healthy
      localstack:
        condition: service_healthy
      thehive:
        condition: service_started
    environment:
      - AWS_ACCESS_KEY_ID=test
      - AWS_SECRET_ACCESS_KEY=test
      - AWS_ENDPOINT_URL=http://localstack:4566
      - AWS_DEFAULT_REGION=us-east-1
      - PYTHONIOENCODING=utf-8
      - LANG=C.UTF-8
      - LC_ALL=C.UTF-8
    volumes:
      - ./alerts:/app/alerts
      - ./hawk-scanner/connection.yml:/app/connection.yml
      - ./hawk-scanner/fingerprint.yml:/app/fingerprint.yml
      - ./hawk-scanner/data:/app/data
    command: tail -f /dev/null
    networks:
      - hawk-network

networks:
  hawk-network:
    driver: bridge

volumes:
  mysql_data:
  cassandra_data:
  elasticsearch_data:
  thehive_data:

Hawk Scanner

Primero el Dockerfile para la creación del contenedor que contendrá Hawk Eye.

# Dockerfile corregido
FROM python:3.11-slim

ENV DEBIAN_FRONTEND=noninteractive \
    PYTHONUNBUFFERED=1

WORKDIR /app

# Dependencias del sistema
RUN apt-get update \
 && apt-get install -y --no-install-recommends \
    curl \
    git \
    netcat-openbsd \
    build-essential \
    libgl1 \
    libglx-mesa0 \
    libglib2.0-0 \
 && rm -rf /var/lib/apt/lists/*

# Instalar dependencias Python
COPY requirements.txt /app/requirements.txt
RUN pip install --no-cache-dir -r /app/requirements.txt

# Copiar todo el directorio del scanner
COPY hawk-scanner /app/

# Crear carpetas necesarias
RUN mkdir -p /app/alerts /app/data \
 && chmod -R a+rX /app

# WORKDIR ya es /app (donde están los archivos yml)
CMD ["python", "run_hawk_scanner.py"]

Vamos a configurar las conexiones para que Hawk Eye pueda hacer la investigación. En mi caso me propuse investigar AWS S3 y una BBDD MySQL.

# connection.yml
# Configuración de conexiones para Hawk-eye Scanner
# Este archivo define las fuentes de datos a escanear

notify:
  redacted: true  # Redactar datos sensibles en las notificaciones
  suppress_duplicates: true  # Suprimir alertas duplicadas

  # Opcional: Configurar webhook de Slack para notificaciones
  # slack:
  #   webhook_url: https://hooks.slack.com/services/YOUR/WEBHOOK/URL
  #   mention: "<@USERID>"  # Opcional: mencionar usuario/bot en alertas

sources:
  # ==========================================
  # CONFIGURACIÓN MYSQL
  # ==========================================
  mysql:
    poc_mysql:
      host: hawk-mysql
      port: 3306
      user: pocuser
      password: pocpassword
      database: pocdb
      limit_start: 0
      limit_end: 10000
      # Opcional: especificar tablas específicas
      # tables:
      #   - payments
      #   - users
      # Opcional: excluir columnas
      # exclude_columns:
      #   - id
      #   - created_at

  # ==========================================
  # CONFIGURACIÓN S3 (LOCALSTACK)
  # ==========================================
  s3:
    poc_s3:
      access_key: test
      secret_key: test
      bucket_name: poc-bucket
      endpoint_url: http://localstack:4566  # LocalStack endpoint
      cache: false
      # Opcional: patrones a excluir
      # exclude_patterns:
      #   - .log
      #   - .tmp

  # ==========================================
  # CONFIGURACIÓN FILESYSTEM (Opcional)
  # ==========================================
  # fs:
  #   local_scan:
  #     path: /app/test-data
  #     exclude_patterns:
  #       - .git
  #       - node_modules
  #       - venv
  #       - __pycache__

  # ==========================================
  # CONFIGURACIÓN REDIS (Opcional)
  # ==========================================
  # redis:
  #   poc_redis:
  #     host: redis
  #     port: 6379
  #     password: your_redis_password

  # ==========================================
  # CONFIGURACIÓN POSTGRESQL (Opcional)
  # ==========================================
  # postgresql:
  #   poc_postgres:
  #     host: postgres
  #     port: 5432
  #     user: postgres
  #     password: postgres
  #     database: testdb
  #     limit_start: 0
  #     limit_end: 1000

Y ahora lo mas importante ¿Que estamos buscando? Para ello generamos fingerprint.yml.

"Credit Card - Visa": '\b4[0-9]{12}(?:[0-9]{3})?\b'
"Credit Card - Mastercard": '\b5[1-5][0-9]{14}\b'
"Credit Card - American Express": '\b3[47][0-9]{13}\b'
"Credit Card - Discover": '\b6(?:011|5[0-9]{2})[0-9]{12}\b'
"Social Security Number (SSN)": '\b\d{3}-\d{2}-\d{4}\b'
"Email Address": '\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'
"Phone Number - US": '\b(?:\+?1[-.]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}\b'
"Phone Number - International": '\b\+\d{1,3}[-.\s]?\d{1,4}[-.\s]?\d{1,4}[-.\s]?\d{1,9}\b'
"AWS Access Key": '\b(AKIA|A3T|AGPA|AIDA|AROA|AIPA|ANPA|ANVA|ASIA)[A-Z0-9]{16}\b'
"AWS Secret Key": '\b[A-Za-z0-9/+=]{40}\b'
"Private Key": '-----BEGIN (RSA|DSA|EC|OPENSSH|PGP) PRIVATE KEY-----'
"Generic Password": "(?i)(password|pwd|passwd)\\s*[=:]\\s*\\S{4,}"
"API Key": "(?i)(api[_-]?key|apikey)\\s*[=:]\\s*[A-Za-z0-9_\\-]{20,}"
"JWT Token": '\beyJ[A-Za-z0-9_-]*\.eyJ[A-Za-z0-9_-]*\.[A-Za-z0-9_-]*\b'
"IP Address - Private": '\b(?:10\.\d{1,3}\.\d{1,3}\.\d{1,3}|172\.(?:1[6-9]|2[0-9]|3[01])\.\d{1,3}\.\d{1,3}|192\.168\.\d{1,3}\.\d{1,3})\b'
"IBAN": '\b[A-Z]{2}\d{2}[A-Z0-9]{4}\d{7}([A-Z0-9]?){0,16}\b'
"Bitcoin Address": '\b[13][a-km-zA-HJ-NP-Z1-9]{25,34}\b'
"URL with Credentials": '(?i)https?://[^:]+:[^@]+@[^\s]+'

¿Qué hace?

  • Define 15+ patrones regex para detectar datos sensibles

  • Incluye: tarjetas (Visa, MC, Amex), SSN, emails, teléfonos, AWS keys, JWT, passwords, IPs, etc.Cada

Generación Datos Dummies

Vamos a crear un entorno virtual para ejecutar el generador de datos. Lo encontrarás en la carpeta data.

python3 -m venv env
source env/bin/activate
# Instalar dependencias
pip3 install -r requirements.txt
# Correr el generador
> python3 generar_datos.py
============================================================
Generando datos de prueba PCI/PII
============================================================

[1] Conectando a MySQL...
[2] Creando tabla de pagos...
[3] Insertando datos PCI (tarjetas de crédito)...
    4 tarjetas insertadas

[4] Generando PDF con información PII...
    PDF generado

[5] Conectando a S3 (LocalStack)...
[6] Creando bucket...
    Bucket 'poc-bucket' creado
[7] Subiendo PDF con datos sensibles...
    PDF subido: s3://poc-bucket/hr/empleados_confidencial.pdf
[8] Subiendo archivo de texto con más PII...
    Archivo TXT subido: s3://poc-bucket/contacts/internal_directory.txt

============================================================
 DATOS DE PRUEBA GENERADOS EXITOSAMENTE
============================================================

Resumen:
   MySQL tabla 'payments': 4 registros con tarjetas
   S3 archivo PDF: 3 empleados con SSN y tarjetas
   S3 archivo TXT: 3 contactos con datos sensibles

💡 Ahora ejecuta el scanner con: docker exec -it hawk-scanner python run_hawk_scanner.py

Vamos a revisar la data en MySQL

Perfecto, ahora listamos el S3 Bucket donde vemos las dos carpetas, con información, que habíamos generado.

Vamos a revisar nuestro run_hawk_scanner.py. Script orquestador que automatiza el proceso completo de escaneo de seguridad, consolidación de resultados y generación de reportes. Ahora vamos a correr nuestra Aguila. En mi caso desde el docker.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import subprocess
import json
import os
import sys
from datetime import datetime
from collections import Counter
from severity_classifier import reclassify_findings, get_critical_findings
from alert_manager import AlertManager
from thehive_integration import TheHiveIntegration

# Directorios
ALERTS_DIR = "/app/alerts"
RESULTS_DIR = "/app/alerts"

os.makedirs(ALERTS_DIR, exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

def run_scan(source_type, output_file):
    print(f"🔍 Escaneando {source_type}...")
    cmd = [
        "hawk_scanner",
        source_type,
        "--connection", "connection.yml",
        "--fingerprint", "fingerprint.yml",
        "--json", output_file
    ]

    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode == 0:
            print(f"✅ {source_type} completado: {output_file}")
            return True
        else:
            print(f"❌ Error en {source_type}:")
            print(result.stderr)
            return False
    except Exception as e:
        print(f"❌ Excepción en {source_type}: {e}")
        return False

def consolidate_results(mysql_file, s3_file, output_file):
    all_results = []

    if os.path.exists(mysql_file):
        with open(mysql_file, 'r') as f:
            mysql_data = json.load(f)
            if isinstance(mysql_data, dict):
                for key, findings in mysql_data.items():
                    if isinstance(findings, list):
                        all_results.extend(findings)
            elif isinstance(mysql_data, list):
                all_results.extend(mysql_data)

    if os.path.exists(s3_file):
        with open(s3_file, 'r') as f:
            s3_data = json.load(f)
            if isinstance(s3_data, dict):
                for key, findings in s3_data.items():
                    if isinstance(findings, list):
                        all_results.extend(findings)
            elif isinstance(s3_data, list):
                all_results.extend(s3_data)

    all_results = reclassify_findings(all_results)

    with open(output_file, 'w') as f:
        json.dump(all_results, f, indent=2)

    print(f"📊 Resultados consolidados: {len(all_results)} hallazgos")
    return all_results

def display_findings(results):
    if not results:
        print("\n✅ No se detectaron hallazgos de seguridad")
        return

    print(f"\n{'='*70}")
    print(f"🔍 HALLAZGOS DETECTADOS")
    print(f"{'='*70}")

    by_severity = {}
    for r in results:
        severity = r.get('severity', 'Unknown')
        if severity not in by_severity:
            by_severity[severity] = []
        by_severity[severity].append(r)

    severity_order = ['CRITICAL', 'HIGH', 'MEDIUM', 'LOW', 'Unknown']
    severity_icons = {
        'CRITICAL': '🔴',
        'HIGH': '🟠', 
        'MEDIUM': '🟡',
        'LOW': '🟢',
        'Unknown': '⚪'
    }

    for severity in severity_order:
        if severity not in by_severity:
            continue

        findings = by_severity[severity]
        icon = severity_icons.get(severity, '⚪')

        print(f"\n{icon} {severity} - {len(findings)} hallazgos")
        print("-" * 70)

        max_display = len(findings) if severity == 'CRITICAL' else min(5, len(findings))

        for i, finding in enumerate(findings[:max_display], 1):
            print(f"\n  [{i}] {finding.get('pattern_name', 'Unknown Pattern')}")
            print(f"      Fuente: {finding.get('data_source', 'unknown')}")

            if finding.get('data_source') == 'mysql':
                print(f"      Base de datos: {finding.get('database', 'N/A')}")
                print(f"      Tabla: {finding.get('table', 'N/A')}")
                print(f"      Columna: {finding.get('column', 'N/A')}")
            elif finding.get('data_source') == 's3':
                print(f"      Bucket: {finding.get('bucket', 'N/A')}")
                print(f"      Archivo: {finding.get('file_path', 'N/A')}")

            matches = finding.get('matches', [])
            if matches:
                match_preview = matches[:3]
                print(f"      Matches: {', '.join(match_preview)}")
                if len(matches) > 3:
                    print(f"      ... y {len(matches) - 3} más")

        if len(findings) > max_display:
            print(f"\n  ... y {len(findings) - max_display} hallazgos más de severidad {severity}")

def generate_summary(results, output_file):
    valid_results = [r for r in results if isinstance(r, dict) and 'pattern_name' in r]

    summary = {
        "scan_date": datetime.now().isoformat(),
        "total_findings": len(valid_results),
        "by_severity": dict(Counter([r.get('severity', 'unknown') for r in valid_results])),
        "by_pattern": dict(Counter([r.get('pattern_name', 'unknown') for r in valid_results])),
        "by_source": dict(Counter([r.get('data_source', 'unknown') for r in valid_results])),
        "findings": valid_results
    }

    with open(output_file, 'w') as f:
        json.dump(summary, f, indent=2, default=str)

    print(f"\n{'='*70}")
    print(f"📈 RESUMEN ESTADÍSTICO")
    print(f"{'='*70}")
    print(f"   📊 Total de hallazgos: {summary['total_findings']}")

    print(f"\n   🚨 Por severidad:")
    severity_display_order = ['CRITICAL', 'HIGH', 'MEDIUM', 'LOW']
    for severity in severity_display_order:
        count = summary['by_severity'].get(severity, 0)
        if count > 0:
            print(f"      {severity}: {count}")

    print(f"\n   📁 Por fuente:")
    for source, count in summary['by_source'].items():
        print(f"      {source}: {count}")

    print(f"\n   🔍 Top 5 patrones más detectados:")
    top_patterns = sorted(summary['by_pattern'].items(), 
                         key=lambda x: x[1], reverse=True)[:5]
    for pattern, count in top_patterns:
        print(f"      {pattern}: {count}")

    critical = get_critical_findings(valid_results)
    if critical:
        print(f"\n   ⚠️  ATENCIÓN: {len(critical)} hallazgos CRÍTICOS detectados")
        print(f"      Requieren acción inmediata")

    return summary

if __name__ == "__main__":
    print("=" * 70)
    print("🦅 HAWK-EYE SCANNER - Automated Security Scan")
    print("=" * 70)

    mysql_output = f"{RESULTS_DIR}/mysql_{timestamp}.json"
    s3_output = f"{RESULTS_DIR}/s3_{timestamp}.json"
    consolidated_output = f"{RESULTS_DIR}/consolidated_{timestamp}.json"
    summary_output = f"{RESULTS_DIR}/summary_{timestamp}.json"
    latest_output = f"{RESULTS_DIR}/latest.json"

    mysql_success = run_scan("mysql", mysql_output)
    s3_success = run_scan("s3", s3_output)

    if mysql_success or s3_success:
        results = consolidate_results(mysql_output, s3_output, consolidated_output)

        # SISTEMA DE TRACKING
        print(f"\n{'='*70}")
        print("🔄 Procesando con sistema de tracking...")
        print(f"{'='*70}")

        alert_mgr = AlertManager()

        new_alerts = []
        duplicate_count = 0

        for finding in results:
            processed = alert_mgr.process_finding(finding)
            if processed['is_new']:
                new_alerts.append(processed)
            else:
                duplicate_count += 1

        print(f"\n📊 Resultados del tracking:")
        print(f"   • Total de hallazgos: {len(results)}")
        print(f"   • Alertas NUEVAS: {len(new_alerts)}")
        print(f"   • Ya vistos: {duplicate_count}")

        stats = alert_mgr.get_stats()
        if stats['critical_pending'] > 0:
            print(f"\n   ⚠️  {stats['critical_pending']} alertas CRÍTICAS pendientes")

        # INTEGRACIÓN CON THEHIVE
        thehive = TheHiveIntegration()

        if thehive.test_connection():
            print(f"\n{'='*70}")
            print("🎯 Enviando alertas críticas a TheHive...")
            print(f"{'='*70}")

            cases_created = 0
            for alert in new_alerts:
                finding = alert['finding']
                if finding.get('severity') in ['CRITICAL', 'HIGH']:
                    case_id = thehive.create_case(finding, alert['alert_hash'])
                    if case_id:
                        cases_created += 1

            print(f"\n📋 Casos creados en TheHive: {cases_created}")
            print(f"🌐 Accede al dashboard: http://localhost:9000")
        else:
            print("\n⚠️  TheHive no está disponible (casos no enviados)")

        display_findings(results)
        generate_summary(results, summary_output)

        with open(latest_output, 'w') as f:
            json.dump(results, f, indent=2)

        print(f"\n{'='*70}")
        print(f"✅ Escaneo completado exitosamente")
        print(f"📁 Resultados guardados en: {ALERTS_DIR}/")
        print(f"{'='*70}\n")
    else:
        print("\n❌ Escaneo falló")
        exit(1)
docker exec -it hawk-scanner python run_hawk_scanner.py

¡Voilà, aquí están los registros marcados por severidad!

Ahora sabemos dónde están las joyas de la corona, para poder actuar en consecuencia. Vamos un poco mas adelante e integremos The Hive para poder hacer el seguimiento.

The Hive

Es la plataforma donde llevaremos adelante el seguimiento de los hallazgos. La misma esta en http://localhost:9000.

  • Usuario: admin@thehive.local

  • Password: secret

Es importante que obtengamos una API KEY para poder integrarla.

  1. Crear organización (si no existe):

    • Haz clic en "Admin" (arriba a la derecha)

    • "Organizations""+ New Organization"

    • Nombre: hawk-security

    • Haz clic en "Create"

  2. Crear usuario con permisos:

    • "Users""+ New User"

    • Nombre: hawk-scanner

    • Login: hawk-scanner@hawk-security

    • Perfil: org-admin (¡importante!)

    • Organización: hawk-security

    • Contraseña: HawkScanner2024!

    • Haz clic en "Create"

  3. Crear API Key:

    • Haz clic en el usuario hawk-scanner que acabas de crear

    • Pestaña "API Keys"

    • Haz clic en "+ Create API Key"

    • Nombre: hawk-scanner-api

    • Haz clic en "Create"

    • ⚠️ COPIA LA KEY

Estos son datos dummies, yo lo cree asi.

La sumamos la API Key a thehive_integration.py, antes de ejecutar. Una vez que el Hawk Eye se ejecute, veremos en The Hive:

5 casos creados:

  • 🔴 4 CRITICAL: Credit Cards (Visa, Mastercard, Amex, Discover)

  • 🟠 1 HIGH: Social Security Number (SSN)

Listo, tenemos todo para darle seguimiento. Te dejo el repositorio para que puedas hacerlo en tu laboratorio.

Referencias

https://github.com/rohitcoder/hawk-eye