La Batalla perdida de la Clasificación de la Información, primera parte.
Te muestro como buscar PII, PCI & mas en nuestra infra.

Introducción
La clasificación de datos, esa práctica noble de etiquetar documentos, bases de datos y buckets según su sensibilidad, fue pensada para dar control. Pero en la era de la nube y el desarrollo ágil, la clasificación se volvió una ilusión. Los equipos crean recursos efímeros; los pipelines generan snapshots y backups; los scripts y lambdas almacenan datos temporales. El resultado: PII y PCI dispersas en lugares que nadie revisó.
En lugar de intentar imponer una clasificación perfecta (una batalla perdida), proponemos otro enfoque: encontrar lo que importa. Descubrir automáticamente las superficies que contienen datos sensibles, priorizar por riesgo y aplicar controles proporcionalmente. Esto no solo es pragmático: es la única estrategia viable para reducir la exposición real en ambientes dinámicos.
En la prueba de concepto que desarrollé, combiné LocalStack para emular AWS S3, un contenedor MySQL y un escáner como Hawk Eye. Una manera de evidenciar archivos y tablas con PII/PCI que no figuraban en ningún inventario. A grandes rasgos, Hawk Eye realiza el descubrimiento, luego los hallazgos son clasificados, se revisa si es algo ya evidenciado o no y eso se impacta en The Hive para un seguimiento.
Arquitectura

Aca esta nuestro docker compose, todo nuestro stack.
services:
localstack:
image: localstack/localstack:2.2
container_name: localstack
environment:
- SERVICES=s3
- DEFAULT_REGION=us-east-1
- DEBUG=1
ports:
- "4566:4566"
volumes:
- "./localstack_data:/tmp/localstack"
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:4566/_localstack/health"]
interval: 10s
timeout: 5s
retries: 5
networks:
- hawk-network
hawk-mysql:
image: mysql:8.0
container_name: hawk-mysql
environment:
MYSQL_ROOT_PASSWORD: rootpassword
MYSQL_DATABASE: pocdb
ports:
- "3306:3306"
volumes:
- mysql_data:/var/lib/mysql
healthcheck:
test: ["CMD", "mysqladmin", "ping", "-h", "localhost", "-u", "root", "-prootpassword"]
interval: 10s
timeout: 5s
retries: 5
networks:
- hawk-network
cassandra:
image: cassandra:4.1
container_name: cassandra
environment:
- MAX_HEAP_SIZE=1G
- HEAP_NEWSIZE=256M
- CASSANDRA_CLUSTER_NAME=thehive
volumes:
- cassandra_data:/var/lib/cassandra
networks:
- hawk-network
healthcheck:
test: ["CMD-SHELL", "cqlsh -e 'describe cluster' || exit 1"]
interval: 30s
timeout: 10s
retries: 10
start_period: 120s
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:7.17.9
container_name: elasticsearch
environment:
- discovery.type=single-node
- xpack.security.enabled=false
- cluster.name=thehive
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
volumes:
- elasticsearch_data:/usr/share/elasticsearch/data
networks:
- hawk-network
healthcheck:
test: ["CMD-SHELL", "curl -f http://localhost:9200/_cluster/health || exit 1"]
interval: 30s
timeout: 10s
retries: 5
start_period: 60s
thehive:
image: strangebee/thehive:5.0
platform: linux/amd64
container_name: thehive
depends_on:
cassandra:
condition: service_healthy
elasticsearch:
condition: service_healthy
ports:
- "9000:9000"
environment:
- JVM_OPTS=-Xms1G -Xmx1G
volumes:
- thehive_data:/opt/thehive/data
- ./thehive-config/application.conf:/etc/thehive/application.conf:ro
networks:
- hawk-network
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9000/api/v1/status"]
interval: 30s
timeout: 10s
retries: 5
start_period: 180s
hawk-scanner:
build: .
container_name: hawk-scanner
platform: linux/amd64
depends_on:
hawk-mysql:
condition: service_healthy
localstack:
condition: service_healthy
thehive:
condition: service_started
environment:
- AWS_ACCESS_KEY_ID=test
- AWS_SECRET_ACCESS_KEY=test
- AWS_ENDPOINT_URL=http://localstack:4566
- AWS_DEFAULT_REGION=us-east-1
- PYTHONIOENCODING=utf-8
- LANG=C.UTF-8
- LC_ALL=C.UTF-8
volumes:
- ./alerts:/app/alerts
- ./hawk-scanner/connection.yml:/app/connection.yml
- ./hawk-scanner/fingerprint.yml:/app/fingerprint.yml
- ./hawk-scanner/data:/app/data
command: tail -f /dev/null
networks:
- hawk-network
networks:
hawk-network:
driver: bridge
volumes:
mysql_data:
cassandra_data:
elasticsearch_data:
thehive_data:
Hawk Scanner
Primero el Dockerfile para la creación del contenedor que contendrá Hawk Eye.
# Dockerfile corregido
FROM python:3.11-slim
ENV DEBIAN_FRONTEND=noninteractive \
PYTHONUNBUFFERED=1
WORKDIR /app
# Dependencias del sistema
RUN apt-get update \
&& apt-get install -y --no-install-recommends \
curl \
git \
netcat-openbsd \
build-essential \
libgl1 \
libglx-mesa0 \
libglib2.0-0 \
&& rm -rf /var/lib/apt/lists/*
# Instalar dependencias Python
COPY requirements.txt /app/requirements.txt
RUN pip install --no-cache-dir -r /app/requirements.txt
# Copiar todo el directorio del scanner
COPY hawk-scanner /app/
# Crear carpetas necesarias
RUN mkdir -p /app/alerts /app/data \
&& chmod -R a+rX /app
# WORKDIR ya es /app (donde están los archivos yml)
CMD ["python", "run_hawk_scanner.py"]
Vamos a configurar las conexiones para que Hawk Eye pueda hacer la investigación. En mi caso me propuse investigar AWS S3 y una BBDD MySQL.
# connection.yml
# Configuración de conexiones para Hawk-eye Scanner
# Este archivo define las fuentes de datos a escanear
notify:
redacted: true # Redactar datos sensibles en las notificaciones
suppress_duplicates: true # Suprimir alertas duplicadas
# Opcional: Configurar webhook de Slack para notificaciones
# slack:
# webhook_url: https://hooks.slack.com/services/YOUR/WEBHOOK/URL
# mention: "<@USERID>" # Opcional: mencionar usuario/bot en alertas
sources:
# ==========================================
# CONFIGURACIÓN MYSQL
# ==========================================
mysql:
poc_mysql:
host: hawk-mysql
port: 3306
user: pocuser
password: pocpassword
database: pocdb
limit_start: 0
limit_end: 10000
# Opcional: especificar tablas específicas
# tables:
# - payments
# - users
# Opcional: excluir columnas
# exclude_columns:
# - id
# - created_at
# ==========================================
# CONFIGURACIÓN S3 (LOCALSTACK)
# ==========================================
s3:
poc_s3:
access_key: test
secret_key: test
bucket_name: poc-bucket
endpoint_url: http://localstack:4566 # LocalStack endpoint
cache: false
# Opcional: patrones a excluir
# exclude_patterns:
# - .log
# - .tmp
# ==========================================
# CONFIGURACIÓN FILESYSTEM (Opcional)
# ==========================================
# fs:
# local_scan:
# path: /app/test-data
# exclude_patterns:
# - .git
# - node_modules
# - venv
# - __pycache__
# ==========================================
# CONFIGURACIÓN REDIS (Opcional)
# ==========================================
# redis:
# poc_redis:
# host: redis
# port: 6379
# password: your_redis_password
# ==========================================
# CONFIGURACIÓN POSTGRESQL (Opcional)
# ==========================================
# postgresql:
# poc_postgres:
# host: postgres
# port: 5432
# user: postgres
# password: postgres
# database: testdb
# limit_start: 0
# limit_end: 1000
Y ahora lo mas importante ¿Que estamos buscando? Para ello generamos fingerprint.yml.
"Credit Card - Visa": '\b4[0-9]{12}(?:[0-9]{3})?\b'
"Credit Card - Mastercard": '\b5[1-5][0-9]{14}\b'
"Credit Card - American Express": '\b3[47][0-9]{13}\b'
"Credit Card - Discover": '\b6(?:011|5[0-9]{2})[0-9]{12}\b'
"Social Security Number (SSN)": '\b\d{3}-\d{2}-\d{4}\b'
"Email Address": '\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'
"Phone Number - US": '\b(?:\+?1[-.]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}\b'
"Phone Number - International": '\b\+\d{1,3}[-.\s]?\d{1,4}[-.\s]?\d{1,4}[-.\s]?\d{1,9}\b'
"AWS Access Key": '\b(AKIA|A3T|AGPA|AIDA|AROA|AIPA|ANPA|ANVA|ASIA)[A-Z0-9]{16}\b'
"AWS Secret Key": '\b[A-Za-z0-9/+=]{40}\b'
"Private Key": '-----BEGIN (RSA|DSA|EC|OPENSSH|PGP) PRIVATE KEY-----'
"Generic Password": "(?i)(password|pwd|passwd)\\s*[=:]\\s*\\S{4,}"
"API Key": "(?i)(api[_-]?key|apikey)\\s*[=:]\\s*[A-Za-z0-9_\\-]{20,}"
"JWT Token": '\beyJ[A-Za-z0-9_-]*\.eyJ[A-Za-z0-9_-]*\.[A-Za-z0-9_-]*\b'
"IP Address - Private": '\b(?:10\.\d{1,3}\.\d{1,3}\.\d{1,3}|172\.(?:1[6-9]|2[0-9]|3[01])\.\d{1,3}\.\d{1,3}|192\.168\.\d{1,3}\.\d{1,3})\b'
"IBAN": '\b[A-Z]{2}\d{2}[A-Z0-9]{4}\d{7}([A-Z0-9]?){0,16}\b'
"Bitcoin Address": '\b[13][a-km-zA-HJ-NP-Z1-9]{25,34}\b'
"URL with Credentials": '(?i)https?://[^:]+:[^@]+@[^\s]+'
¿Qué hace?
Define 15+ patrones regex para detectar datos sensibles
Incluye: tarjetas (Visa, MC, Amex), SSN, emails, teléfonos, AWS keys, JWT, passwords, IPs, etc.Cada
Generación Datos Dummies
Vamos a crear un entorno virtual para ejecutar el generador de datos. Lo encontrarás en la carpeta data.
python3 -m venv env
source env/bin/activate
# Instalar dependencias
pip3 install -r requirements.txt
# Correr el generador
> python3 generar_datos.py
============================================================
Generando datos de prueba PCI/PII
============================================================
[1] Conectando a MySQL...
[2] Creando tabla de pagos...
[3] Insertando datos PCI (tarjetas de crédito)...
✓ 4 tarjetas insertadas
[4] Generando PDF con información PII...
✓ PDF generado
[5] Conectando a S3 (LocalStack)...
[6] Creando bucket...
✓ Bucket 'poc-bucket' creado
[7] Subiendo PDF con datos sensibles...
✓ PDF subido: s3://poc-bucket/hr/empleados_confidencial.pdf
[8] Subiendo archivo de texto con más PII...
✓ Archivo TXT subido: s3://poc-bucket/contacts/internal_directory.txt
============================================================
✅ DATOS DE PRUEBA GENERADOS EXITOSAMENTE
============================================================
Resumen:
• MySQL tabla 'payments': 4 registros con tarjetas
• S3 archivo PDF: 3 empleados con SSN y tarjetas
• S3 archivo TXT: 3 contactos con datos sensibles
💡 Ahora ejecuta el scanner con: docker exec -it hawk-scanner python run_hawk_scanner.py
Vamos a revisar la data en MySQL

Perfecto, ahora listamos el S3 Bucket donde vemos las dos carpetas, con información, que habíamos generado.

Vamos a revisar nuestro run_hawk_scanner.py. Script orquestador que automatiza el proceso completo de escaneo de seguridad, consolidación de resultados y generación de reportes. Ahora vamos a correr nuestra Aguila. En mi caso desde el docker.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import subprocess
import json
import os
import sys
from datetime import datetime
from collections import Counter
from severity_classifier import reclassify_findings, get_critical_findings
from alert_manager import AlertManager
from thehive_integration import TheHiveIntegration
# Directorios
ALERTS_DIR = "/app/alerts"
RESULTS_DIR = "/app/alerts"
os.makedirs(ALERTS_DIR, exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
def run_scan(source_type, output_file):
print(f"🔍 Escaneando {source_type}...")
cmd = [
"hawk_scanner",
source_type,
"--connection", "connection.yml",
"--fingerprint", "fingerprint.yml",
"--json", output_file
]
try:
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
print(f"✅ {source_type} completado: {output_file}")
return True
else:
print(f"❌ Error en {source_type}:")
print(result.stderr)
return False
except Exception as e:
print(f"❌ Excepción en {source_type}: {e}")
return False
def consolidate_results(mysql_file, s3_file, output_file):
all_results = []
if os.path.exists(mysql_file):
with open(mysql_file, 'r') as f:
mysql_data = json.load(f)
if isinstance(mysql_data, dict):
for key, findings in mysql_data.items():
if isinstance(findings, list):
all_results.extend(findings)
elif isinstance(mysql_data, list):
all_results.extend(mysql_data)
if os.path.exists(s3_file):
with open(s3_file, 'r') as f:
s3_data = json.load(f)
if isinstance(s3_data, dict):
for key, findings in s3_data.items():
if isinstance(findings, list):
all_results.extend(findings)
elif isinstance(s3_data, list):
all_results.extend(s3_data)
all_results = reclassify_findings(all_results)
with open(output_file, 'w') as f:
json.dump(all_results, f, indent=2)
print(f"📊 Resultados consolidados: {len(all_results)} hallazgos")
return all_results
def display_findings(results):
if not results:
print("\n✅ No se detectaron hallazgos de seguridad")
return
print(f"\n{'='*70}")
print(f"🔍 HALLAZGOS DETECTADOS")
print(f"{'='*70}")
by_severity = {}
for r in results:
severity = r.get('severity', 'Unknown')
if severity not in by_severity:
by_severity[severity] = []
by_severity[severity].append(r)
severity_order = ['CRITICAL', 'HIGH', 'MEDIUM', 'LOW', 'Unknown']
severity_icons = {
'CRITICAL': '🔴',
'HIGH': '🟠',
'MEDIUM': '🟡',
'LOW': '🟢',
'Unknown': '⚪'
}
for severity in severity_order:
if severity not in by_severity:
continue
findings = by_severity[severity]
icon = severity_icons.get(severity, '⚪')
print(f"\n{icon} {severity} - {len(findings)} hallazgos")
print("-" * 70)
max_display = len(findings) if severity == 'CRITICAL' else min(5, len(findings))
for i, finding in enumerate(findings[:max_display], 1):
print(f"\n [{i}] {finding.get('pattern_name', 'Unknown Pattern')}")
print(f" Fuente: {finding.get('data_source', 'unknown')}")
if finding.get('data_source') == 'mysql':
print(f" Base de datos: {finding.get('database', 'N/A')}")
print(f" Tabla: {finding.get('table', 'N/A')}")
print(f" Columna: {finding.get('column', 'N/A')}")
elif finding.get('data_source') == 's3':
print(f" Bucket: {finding.get('bucket', 'N/A')}")
print(f" Archivo: {finding.get('file_path', 'N/A')}")
matches = finding.get('matches', [])
if matches:
match_preview = matches[:3]
print(f" Matches: {', '.join(match_preview)}")
if len(matches) > 3:
print(f" ... y {len(matches) - 3} más")
if len(findings) > max_display:
print(f"\n ... y {len(findings) - max_display} hallazgos más de severidad {severity}")
def generate_summary(results, output_file):
valid_results = [r for r in results if isinstance(r, dict) and 'pattern_name' in r]
summary = {
"scan_date": datetime.now().isoformat(),
"total_findings": len(valid_results),
"by_severity": dict(Counter([r.get('severity', 'unknown') for r in valid_results])),
"by_pattern": dict(Counter([r.get('pattern_name', 'unknown') for r in valid_results])),
"by_source": dict(Counter([r.get('data_source', 'unknown') for r in valid_results])),
"findings": valid_results
}
with open(output_file, 'w') as f:
json.dump(summary, f, indent=2, default=str)
print(f"\n{'='*70}")
print(f"📈 RESUMEN ESTADÍSTICO")
print(f"{'='*70}")
print(f" 📊 Total de hallazgos: {summary['total_findings']}")
print(f"\n 🚨 Por severidad:")
severity_display_order = ['CRITICAL', 'HIGH', 'MEDIUM', 'LOW']
for severity in severity_display_order:
count = summary['by_severity'].get(severity, 0)
if count > 0:
print(f" {severity}: {count}")
print(f"\n 📁 Por fuente:")
for source, count in summary['by_source'].items():
print(f" {source}: {count}")
print(f"\n 🔍 Top 5 patrones más detectados:")
top_patterns = sorted(summary['by_pattern'].items(),
key=lambda x: x[1], reverse=True)[:5]
for pattern, count in top_patterns:
print(f" {pattern}: {count}")
critical = get_critical_findings(valid_results)
if critical:
print(f"\n ⚠️ ATENCIÓN: {len(critical)} hallazgos CRÍTICOS detectados")
print(f" Requieren acción inmediata")
return summary
if __name__ == "__main__":
print("=" * 70)
print("🦅 HAWK-EYE SCANNER - Automated Security Scan")
print("=" * 70)
mysql_output = f"{RESULTS_DIR}/mysql_{timestamp}.json"
s3_output = f"{RESULTS_DIR}/s3_{timestamp}.json"
consolidated_output = f"{RESULTS_DIR}/consolidated_{timestamp}.json"
summary_output = f"{RESULTS_DIR}/summary_{timestamp}.json"
latest_output = f"{RESULTS_DIR}/latest.json"
mysql_success = run_scan("mysql", mysql_output)
s3_success = run_scan("s3", s3_output)
if mysql_success or s3_success:
results = consolidate_results(mysql_output, s3_output, consolidated_output)
# SISTEMA DE TRACKING
print(f"\n{'='*70}")
print("🔄 Procesando con sistema de tracking...")
print(f"{'='*70}")
alert_mgr = AlertManager()
new_alerts = []
duplicate_count = 0
for finding in results:
processed = alert_mgr.process_finding(finding)
if processed['is_new']:
new_alerts.append(processed)
else:
duplicate_count += 1
print(f"\n📊 Resultados del tracking:")
print(f" • Total de hallazgos: {len(results)}")
print(f" • Alertas NUEVAS: {len(new_alerts)}")
print(f" • Ya vistos: {duplicate_count}")
stats = alert_mgr.get_stats()
if stats['critical_pending'] > 0:
print(f"\n ⚠️ {stats['critical_pending']} alertas CRÍTICAS pendientes")
# INTEGRACIÓN CON THEHIVE
thehive = TheHiveIntegration()
if thehive.test_connection():
print(f"\n{'='*70}")
print("🎯 Enviando alertas críticas a TheHive...")
print(f"{'='*70}")
cases_created = 0
for alert in new_alerts:
finding = alert['finding']
if finding.get('severity') in ['CRITICAL', 'HIGH']:
case_id = thehive.create_case(finding, alert['alert_hash'])
if case_id:
cases_created += 1
print(f"\n📋 Casos creados en TheHive: {cases_created}")
print(f"🌐 Accede al dashboard: http://localhost:9000")
else:
print("\n⚠️ TheHive no está disponible (casos no enviados)")
display_findings(results)
generate_summary(results, summary_output)
with open(latest_output, 'w') as f:
json.dump(results, f, indent=2)
print(f"\n{'='*70}")
print(f"✅ Escaneo completado exitosamente")
print(f"📁 Resultados guardados en: {ALERTS_DIR}/")
print(f"{'='*70}\n")
else:
print("\n❌ Escaneo falló")
exit(1)
docker exec -it hawk-scanner python run_hawk_scanner.py
¡Voilà, aquí están los registros marcados por severidad!

Ahora sabemos dónde están las joyas de la corona, para poder actuar en consecuencia. Vamos un poco mas adelante e integremos The Hive para poder hacer el seguimiento.
The Hive
Es la plataforma donde llevaremos adelante el seguimiento de los hallazgos. La misma esta en http://localhost:9000.
Usuario:
admin@thehive.localPassword:
secret
Es importante que obtengamos una API KEY para poder integrarla.
Crear organización (si no existe):
Haz clic en "Admin" (arriba a la derecha)
"Organizations" → "+ New Organization"
Nombre:
hawk-securityHaz clic en "Create"
Crear usuario con permisos:
"Users" → "+ New User"
Nombre:
hawk-scannerLogin:
hawk-scanner@hawk-securityPerfil:
org-admin(¡importante!)Organización:
hawk-securityContraseña:
HawkScanner2024!Haz clic en "Create"
Crear API Key:
Haz clic en el usuario
hawk-scannerque acabas de crearPestaña "API Keys"
Haz clic en "+ Create API Key"
Nombre:
hawk-scanner-apiHaz clic en "Create"
⚠️ COPIA LA KEY
Estos son datos dummies, yo lo cree asi.

La sumamos la API Key a thehive_integration.py, antes de ejecutar. Una vez que el Hawk Eye se ejecute, veremos en The Hive:
5 casos creados:
🔴 4 CRITICAL: Credit Cards (Visa, Mastercard, Amex, Discover)
🟠 1 HIGH: Social Security Number (SSN)

Listo, tenemos todo para darle seguimiento. Te dejo el repositorio para que puedas hacerlo en tu laboratorio.



