
import os
import re
import math
import asyncio
from io import BytesIO
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

import pandas as pd
import httpx
import aiofiles
from fastapi import UploadFile, HTTPException
from starlette.datastructures import UploadFile as StarletteUploadFile, Headers
from dotenv import load_dotenv
from tabulate import tabulate  # opcional (debug/prints bonitos)

# .env
load_dotenv()

# 🔗 Conexión asíncrona (tu módulo final)
# Debe exponer: async def get_connection(), async def queries(sql, params=None), async def execute(sql, params=None)
from conexion_db.conexion import get_connection, execute, queries


# ────────────────────────────────────────────────────────────────────────────────
# Helpers de BD (listas blancas, etc.)
# ────────────────────────────────────────────────────────────────────────────────

async def traer_tablas_permitidas() -> List[str]:
    """
    Devuelve la lista de tablas permitidas desde listado_tablas_prospectos.
    """
    try:
        filas = await queries("SELECT * FROM listado_tablas_prospectos")
        return [f["nombre_tabla"] for f in filas] if filas else []
    except Exception:
        return []


# ────────────────────────────────────────────────────────────────────────────────
# Consultas generales
# ────────────────────────────────────────────────────────────────────────────────

async def getListaTablas() -> Dict[str, Any]:
    try:
        filas = await queries(
            "SELECT * FROM listado_tablas_prospectos WHERE estado_contacto = 'Prospectos'"
        )
        return {
            "success": 1,
            "message": "tablas obtenidas correctamente",
            "data": filas or [],
        }
    except Exception as e:
        return {"success": 0, "message": f"Error: {str(e)}"}


async def datos_tabla_seleccionada(data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Paginado + join con estados_etapas. Valida tabla por whitelist.
    Filtra estado_etapa IN (1,5) para los datos y cuenta solo estado_etapa=1, tal como tu lógica original.
    """
    cnx = None
    cur = None
    try:
        tabla = data.get("tablaSeleccionada")
        page = int(data.get("page", 1))
        page_size = int(data.get("pageSize", 5))
        offset = max(0, (page - 1) * page_size)

        tablas_validas = await traer_tablas_permitidas()
        if not tablas_validas:
            return {"success": False, "message": "No se encontraron tablas permitidas"}
        if tabla not in tablas_validas:
            return {"success": False, "message": "Tabla no permitida"}

        # Query principal con join
        query = f"""
            SELECT 
                t.*, e.estado_etapa AS estado_etapa_desc
            FROM 
                {tabla} t
            LEFT JOIN 
                estados_etapas e ON t.estado_etapa = e.id
            WHERE 
                t.estado_etapa IN (1, 5)
            ORDER BY 
                t.id DESC
            LIMIT %s OFFSET %s
        """
        registros = await queries(query, (page_size, offset))

        # Conteo (solo estado_etapa = 1)
        count_query = f"SELECT COUNT(*) AS total FROM {tabla} t WHERE t.estado_etapa = 1"
        total_rows = await queries(count_query)
        total = int(total_rows[0]["total"]) if total_rows else 0
        total_pages = (total // page_size) + (1 if total % page_size else 0)

        # Reemplazar ID por la descripción
        for r in registros:
            r["estado_etapa"] = r.pop("estado_etapa_desc", None)

        return {
            "success": True,
            "message": "Datos obtenidos correctamente",
            "data": registros,
            "totalRecords": total,
            "totalPages": total_pages,
            "currentPage": page,
        }

    except Exception as e:
        return {"success": False, "message": f"Error: {str(e)}"}
    finally:
        if cur:
            await cur.close()
        if cnx:
            await cnx.close()


async def actualizar_voluntario_db(id_voluntario: int, data: Dict[str, Any]) -> Dict[str, Any]:
    """
    UPDATE sobre varias tablas soportadas. Preserva fecha_registro en prospectos_manual.
    """
    cnx = await get_connection()
    cur = None
    if cnx is None:
        return {"success": False, "message": "Error de conexión a la base de datos"}

    try:
        nombre_tabla = data.get("nombre_tabla", "")
        if nombre_tabla not in [
            "inscripciones_voluntarios",
            "union_voluntarios_antiguos",
            "base_datos_10k",
            "registros_crm_antiguo",
            "prospectos_manual",
        ]:
            return {"success": False, "message": "Tabla no válida especificada en la solicitud"}

        # Datos comunes
        tipo_documento = data.get("tipo_documento")
        documento      = data.get("documento")
        nombres        = data.get("nombres")
        apellidos      = data.get("apellidos")
        correos        = data.get("correos")
        telefono       = data.get("telefono")
        pais           = data.get("pais")
        departamento   = data.get("departamento")
        ciudad         = data.get("ciudad")
        direccion      = data.get("direccion")
        genero         = data.get("genero")
        recomendado    = data.get("recomendado") or "No"
        nombre_recom   = data.get("nombre_recomendador") or "No recomendado"
        grupo_wp       = data.get("grupo_wp")
        grupo_wp1      = data.get("grupo_wp1")
        como_ayudar    = data.get("como_ayudar")
        pasion         = data.get("pasion")
        nombres_comp   = data.get("nombres_completos")
        intereses      = data.get("intereses")
        autorizacion   = data.get("autorizacion")
        barrio         = data.get("barrio")
        fecha_cumple   = data.get("fecha_cumpleanos")
        localidad      = data.get("localidad")
        tipo_sangre    = data.get("tipo_sangre")
        fecha_registro = data.get("fecha_registro")

        cur = await cnx.cursor(dictionary=True)
        query = None
        params: List[Any] = []

        if nombre_tabla == "inscripciones_voluntarios":
            query = """
                UPDATE inscripciones_voluntarios
                   SET tipo_documento=%s, documento=%s, nombres=%s, apellidos=%s, correos=%s,
                       telefono=%s, pais=%s, departamento=%s, ciudad=%s, direccion=%s,
                       genero=%s, recomendado=%s, nombre_recomendador=%s, grupo_wp=%s,
                       grupo_wp1=%s, como_ayudar=%s, pasion=%s
                 WHERE id=%s
            """
            params = [tipo_documento, documento, nombres, apellidos, correos, telefono,
                      pais, departamento, ciudad, direccion, genero, recomendado,
                      nombre_recom, grupo_wp, grupo_wp1, como_ayudar, pasion, id_voluntario]

        elif nombre_tabla == "base_datos_10k":
            query = """
                UPDATE base_datos_10k
                   SET fecha_registro=%s, nombres_completos=%s, documento=%s, correos=%s,
                       telefono=%s, departamento=%s, ciudad=%s, intereses=%s, autorizacion=%s
                 WHERE id=%s
            """
            params = [fecha_registro, nombres_comp, documento, correos,
                      telefono, departamento, ciudad, intereses, autorizacion, id_voluntario]

        elif nombre_tabla == "registros_crm_antiguo":
            query = """
                UPDATE registros_crm_antiguo
                   SET nombres=%s, apellidos=%s, correos=%s, telefono=%s,
                       tipo_documento=%s, documento=%s, departamento=%s, ciudad=%s, localidad=%s,
                       barrio=%s, direccion=%s, fecha_cumpleanos=%s, genero=%s, tipo_sangre=%s
                 WHERE id=%s
            """
            params = [nombres, apellidos, correos, telefono,
                      tipo_documento, documento, departamento, ciudad, localidad,
                      barrio, direccion, fecha_cumple, genero, tipo_sangre, id_voluntario]

        elif nombre_tabla == "prospectos_manual":
            # Preservar fecha_registro actual
            await cur.execute("SELECT fecha_registro FROM prospectos_manual WHERE id=%s", (id_voluntario,))
            row = await cur.fetchone()
            fecha_registro_actual = row["fecha_registro"] if row else None

            query = """
                UPDATE prospectos_manual
                   SET nombres=%s, apellidos=%s, tipo_documento=%s, documento=%s, correos=%s,
                       telefono=%s, pais=%s, departamento=%s, ciudad=%s, localidad=%s,
                       barrio=%s, direccion=%s, fecha_cumpleanos=%s, genero=%s, tipo_sangre=%s,
                       recomendado=%s, nombre_recomendador=%s, grupo_wp=%s, grupo_wp1=%s,
                       como_ayudar=%s, pasion=%s, intereses=%s, autorizacion=%s, fecha_registro=%s
                 WHERE id=%s
            """
            params = [nombres, apellidos, tipo_documento, documento, correos, telefono,
                      pais, departamento, ciudad, localidad, barrio, direccion, fecha_cumple,
                      genero, tipo_sangre, recomendado, nombre_recom, grupo_wp, grupo_wp1,
                      como_ayudar, pasion, intereses, autorizacion, fecha_registro_actual, id_voluntario]

        if not query:
            return {"success": False, "message": "Tabla no válida especificada en la solicitud"}

        await cur.execute(query, params)
        await cnx.commit()

        await cur.execute(f"SELECT * FROM {nombre_tabla} WHERE id=%s", (id_voluntario,))
        voluntario_actualizado = await cur.fetchone()

        return {"success": True, "message": "Voluntario actualizado", "data": voluntario_actualizado}
    except Exception as e:
        return {"success": False, "message": f"Error al actualizar los datos: {str(e)}"}
    finally:
        if cur:
            await cur.close()
        if cnx:
            await cnx.close()


async def createNotes(data: Dict[str, Any]) -> Dict[str, Any]:
    try:
        id_empleado = data.get("id_empleado")
        id_contacto_origen = data.get("id_contacto_origen")
        id_contacto_union = data.get("id_contacto_union")
        origen_datos = data.get("origen_datos")
        id_estado_etapa = data.get("id_estado_etapa")
        nota = data.get("nota")

        if id_estado_etapa in [None, "", "undefined", "null"]:
            id_estado_etapa = None

        if any(x is None for x in [id_empleado, id_contacto_origen, id_contacto_union, origen_datos, nota]):
            return {"success": 0, "message": "Faltan datos requeridos"}

        affected = await execute(
            """
            INSERT INTO notas_contacto
            (fecha_hora, id_empleado, id_contacto_origen, id_contacto_union, origen_datos, id_estado_etapa, nota)
            VALUES (NOW(), %s, %s, %s, %s, %s, %s)
            """,
            (id_empleado, id_contacto_origen, id_contacto_union, origen_datos, id_estado_etapa, nota),
        )
        return {"success": 1 if affected else 0, "message": "Nota creada correctamente" if affected else "No se insertó la nota"}
    except Exception as e:
        return {"success": 0, "message": f"Error: {str(e)}"}


async def getNotesContact(data: Dict[str, Any]) -> Dict[str, Any]:
    try:
        id_contacto_origen = data.get("id_contacto_origen")
        if not id_contacto_origen:
            return {"success": 0, "message": "Falta el ID del contacto"}

        notas = await queries(
            """
            SELECT nc.*,
                   CONCAT(e.nombres,' ',e.apellidos) AS nombre_empleado,
                   ee.estado_etapa AS estado_etapa
              FROM notas_contacto nc
              JOIN empleados e ON nc.id_empleado = e.id
         LEFT JOIN estados_etapas ee ON nc.id_estado_etapa = ee.id
             WHERE nc.id_contacto_origen = %s
          ORDER BY nc.fecha_hora DESC
            """,
            (id_contacto_origen,),
        )
        return {"success": 1, "message": "Notas obtenidas correctamente", "data": notas}
    except Exception as e:
        return {"success": 0, "message": f"Error: {str(e)}"}


async def verificacion_persona_contactada(data: Dict[str, Any]) -> Dict[str, Any]:
    try:
        idOrigenDato = data.get("idOrigenDato")
        nombreTabla = data.get("nombreTabla", "")

        filas = await queries(
            """
            SELECT id
              FROM union_contactos_personas
             WHERE id_origen_datos = %s
               AND origen_datos     = %s
            """,
            (idOrigenDato, nombreTabla),
        )
        if filas:
            return {
                "success": True,
                "message": "Registro encontrado en tabla de unión",
                "data": {"campo_encontrado": filas[0]["id"], "exist": True},
            }
        return {
            "success": True,
            "message": "Registro no encontrado en la tabla unión",
            "data": {"campo_encontrado": None, "exist": False},
        }
    except Exception as e:
        return {"success": False, "message": f"Error en la consulta: {str(e)}", "data": {}}


async def actualizar_estado_etapa_verificado(data: Dict[str, Any]) -> Dict[str, Any]:
    try:
        origen_datos = data.get("origen_datos")
        estado_etapa = data.get("estado_etapa")
        id_registro  = data.get("id")

        tablas_validas = [
            "inscripciones_voluntarios",
            "registros_crm_antiguo",
            "base_datos_10k",
            "prospectos_manual",
        ]
        if origen_datos not in tablas_validas:
            return {"success": False, "message": "La tabla especificada no es válida", "data": {}}

        affected = await execute(
            f"UPDATE {origen_datos} SET estado_etapa=%s WHERE id=%s",
            (estado_etapa, id_registro),
        )
        if affected:
            return {
                "success": True,
                "message": "Estado de la etapa del contacto actualizado a primer contacto",
                "data": {"id": id_registro},
            }
        return {"success": False, "message": "No se encontró el registro con el id proporcionado", "data": {}}
    except Exception as e:
        return {"success": False, "message": f"Error en la actualización: {str(e)}", "data": {}}


async def agregar_prospecto_nuevo(data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Inserta en tabla validada por whitelist. Verifica duplicados por documento/correos/telefono.
    Devuelve el ID insertado.
    """
    cnx = None
    cur = None
    try:
        nombreTabla = data.get("nombreTabla", "")
        tablas_validas = [
            "inscripciones_voluntarios",
            "registros_crm_antiguo",
            "base_datos_10k",
            "prospectos_manual",
        ]
        if nombreTabla not in tablas_validas:
            return {"success": False, "message": "Tabla no válida", "data": {}}

        # Campos
        nombres           = data.get("nombres", "")
        apellidos         = data.get("apellidos", "")
        documento         = data.get("documento", "")
        correos           = data.get("correos", "")
        telefono          = data.get("telefono", "")
        tipo_documento    = data.get("tipo_documento")
        departamento      = data.get("departamento")
        ciudad            = data.get("ciudad")
        estado_etapa      = data.get("estado_etapa")
        genero            = data.get("genero")
        fecha_registro    = data.get("fecha_registro")
        intereses         = data.get("intereses")
        autorizacion      = data.get("autorizacion")
        pais              = data.get("pais")
        direccion         = data.get("direccion")
        recomendado       = data.get("recomendado")
        nombre_recom      = data.get("nombre_recomendador")
        grupo_wp          = data.get("grupo_wp")
        grupo_wp1         = data.get("grupo_wp1")
        como_ayudar       = data.get("como_ayudar")
        pasion            = data.get("pasion")
        localidad         = data.get("localidad")
        barrio            = data.get("barrio")
        fecha_cumple      = data.get("fecha_cumpleanos")
        tipo_sangre       = data.get("tipo_sangre")

        # Duplicados (documento, correos, telefono)
        if documento:
            dup = await queries(f"SELECT id FROM {nombreTabla} WHERE documento=%s", (documento,))
            if dup:
                return {"success": False, "message": "Ya existe un registro con el documento proporcionado", "data": {}}
        if correos and not dup:
            dup = await queries(f"SELECT id FROM {nombreTabla} WHERE correos=%s", (correos,))
            if dup:
                return {"success": False, "message": "Ya existe un registro con el correos proporcionado", "data": {}}
        if telefono and not dup:
            dup = await queries(f"SELECT id FROM {nombreTabla} WHERE telefono=%s", (telefono,))
            if dup:
                return {"success": False, "message": "Ya existe un registro con el telefono proporcionado", "data": {}}

        cnx = await get_connection()
        cur = await cnx.cursor(dictionary=True)

        if nombreTabla == "inscripciones_voluntarios":
            sql = """
                INSERT INTO inscripciones_voluntarios (
                    tipo_documento, documento, nombres, apellidos, correos, telefono,
                    pais, departamento, ciudad, direccion, genero, recomendado,
                    nombre_recomendador, grupo_wp, grupo_wp1, como_ayudar, pasion,
                    fecha_registro, estado_etapa, fecha_estado
                ) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,NOW())
            """
            vals = (tipo_documento, documento, nombres, apellidos, correos, telefono,
                    pais, departamento, ciudad, direccion, genero, recomendado,
                    nombre_recom, grupo_wp, grupo_wp1, como_ayudar, pasion,
                    fecha_registro, estado_etapa)

        elif nombreTabla == "base_datos_10k":
            nombres_completos = f"{nombres} {apellidos}".strip()
            sql = """
                INSERT INTO base_datos_10k (
                    fecha_registro, nombres_completos, documento, correos, telefono,
                    departamento, ciudad, intereses, autorizacion, estado_etapa, fecha_estado
                ) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,NOW())
            """
            vals = (fecha_registro, nombres_completos, documento, correos, telefono,
                    departamento, ciudad, intereses, autorizacion, estado_etapa)

        elif nombreTabla == "registros_crm_antiguo":
            sql = """
                INSERT INTO registros_crm_antiguo (
                    nombres, apellidos, correos, telefono, tipo_documento, documento,
                    departamento, ciudad, localidad, barrio, direccion, fecha_cumpleanos,
                    genero, tipo_sangre, estado_etapa, fecha_estado
                ) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,NOW())
            """
            vals = (nombres, apellidos, correos, telefono, tipo_documento, documento,
                    departamento, ciudad, localidad, barrio, direccion, fecha_cumple,
                    genero, tipo_sangre, estado_etapa)

        else:  # prospectos_manual
            sql = """
                INSERT INTO prospectos_manual (
                    nombres, apellidos, tipo_documento, documento, correos, telefono,
                    pais, departamento, ciudad, localidad, barrio, direccion, fecha_cumpleanos,
                    genero, tipo_sangre, recomendado, nombre_recomendador, grupo_wp, grupo_wp1,
                    como_ayudar, pasion, intereses, autorizacion, fecha_registro, estado_etapa, fecha_estado
                ) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,NOW())
            """
            vals = (nombres, apellidos, tipo_documento, documento, correos, telefono,
                    pais, departamento, ciudad, localidad, barrio, direccion, fecha_cumple,
                    genero, tipo_sangre, recomendado, nombre_recom, grupo_wp, grupo_wp1,
                    como_ayudar, pasion, intereses, autorizacion, fecha_registro, estado_etapa)

        await cur.execute(sql, vals)
        await cnx.commit()
        new_id = cur.lastrowid

        return {"success": True, "message": "Prospecto agregado exitosamente", "data": {"id": new_id}}
    except Exception as e:
        # Si necesitas la última statement, mysql-connector expone cur.statement
        try:
            last = getattr(cur, "statement", None)
            if last:
                print("Última consulta:", last)
        except Exception:
            pass
        return {"success": False, "message": f"Error en la inserción: {str(e)}", "data": {}}
    finally:
        if cur:
            await cur.close()
        if cnx:
            await cnx.close()


# ────────────────────────────────────────────────────────────────────────────────
# Manejo de archivos (Excel/CSV) y pasos de procesamiento
# ────────────────────────────────────────────────────────────────────────────────

async def archivo_a_dataframe(archivo: UploadFile, procesamiento_ligero: bool = False) -> pd.DataFrame:
    """
    Convierte un UploadFile (Excel o CSV) en un DataFrame de pandas.
    Usa asyncio.to_thread para que pandas no bloquee el event loop.
    """
    TIPOS_EXCEL = {
        "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        "application/vnd.ms-excel",
        "application/vnd.ms-excel.sheet.macroEnabled.12",
    }
    nombre_archivo = (archivo.filename or "").lower()
    tipo_contenido = archivo.content_type or ""

    def es_excel() -> bool:
        return nombre_archivo.endswith((".xlsx", ".xls", ".xlsm", ".xlsb")) or tipo_contenido in TIPOS_EXCEL

    if procesamiento_ligero:
        try:
            archivo.file.seek(0)
        except Exception:
            pass
        if es_excel():
            df = await asyncio.to_thread(pd.read_excel, archivo.file, sheet_name=0, dtype=str, nrows=15)
        else:
            df = await asyncio.to_thread(pd.read_csv, archivo.file, dtype=str, sep=None, engine="python", nrows=15)
        try:
            archivo.file.seek(0)
        except Exception:
            pass
        return df

    raw = await archivo.read()
    try:
        archivo.file.seek(0)
    except Exception:
        pass

    if es_excel():
        return await asyncio.to_thread(pd.read_excel, BytesIO(raw), sheet_name=0, dtype=str)
    return await asyncio.to_thread(pd.read_csv, BytesIO(raw), dtype=str, sep=None, engine="python")


async def subir_archivo_excel(archivo: UploadFile) -> Dict[str, Any]:
    """
    Guarda el archivo en disco de forma asíncrona, leyendo en chunks.
    """
    try:
        carpeta_destino = "prospectos/archivo"
        os.makedirs(carpeta_destino, exist_ok=True)

        _, extension = os.path.splitext(archivo.filename or "")
        current_time = datetime.now().strftime("%Y%m%d_%H%M%S")
        nombre_archivo = f"{current_time}{extension}"
        ruta_archivo = os.path.join(carpeta_destino, nombre_archivo)

        async with aiofiles.open(ruta_archivo, "wb") as f:
            while True:
                chunk = await archivo.read(1024 * 1024)  # 1MB
                if not chunk:
                    break
                await f.write(chunk)

        try:
            await archivo.seek(0)
        except Exception:
            pass

        return {
            "success": True,
            "message": f"Archivo guardado exitosamente en {ruta_archivo}",
            "data": {"ruta": ruta_archivo, "nombre": nombre_archivo},
        }
    except Exception as e:
        return {"success": False, "message": f"Error al guardar el archivo: {str(e)}", "data": {}}


def excel_a_uploadfile(nombre_excel: str) -> StarletteUploadFile:
    """
    Crea un UploadFile desde un archivo en disco (sync es OK aquí).
    """
    carpeta = "prospectos/archivo"
    ruta = os.path.join(carpeta, nombre_excel)
    if not os.path.exists(ruta):
        raise FileNotFoundError(f"No existe el archivo: {ruta}")

    extension = os.path.splitext(nombre_excel)[1].lower()
    if extension in [".xlsx", ".xlsm", ".xlsb", ".xls"]:
        content_type = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
    elif extension == ".csv":
        content_type = "text/csv"
    else:
        content_type = "application/octet-stream"

    headers = Headers({"content-type": content_type})
    return StarletteUploadFile(filename=nombre_excel, file=open(ruta, "rb"), headers=headers)


async def procesamiento_principal_paso_1_validacion_formato(excel: UploadFile) -> Dict[str, Any]:
    subir_excel = await subir_archivo_excel(excel)
    if not subir_excel.get("success"):
        return {"success": False, "message": "Error al subir el archivo", "data": {}}

    df = await archivo_a_dataframe(excel, procesamiento_ligero=True)
    if df is None:
        return {"success": False, "message": "Error al leer el archivo", "data": {}}

    if not validar_headers_en_primera_fila(df):
        return {"success": False, "message": "Encabezados inválidos en el archivo"}
    return {"success": True, "message": "Encabezados válidos", "id": subir_excel.get("data", {}).get("nombre", "")}


async def procesamiento_principal_paso_2_obtener_columnas_excel(excel: str) -> Dict[str, Any]:
    up = excel_a_uploadfile(excel)
    df = await archivo_a_dataframe(up, procesamiento_ligero=True)
    if df is None:
        return {"success": False, "message": "Error al leer el archivo", "data": {}}
    columnas = obtener_columnas_df(df)
    if not columnas:
        return {"success": False, "message": "No se encontraron columnas en el archivo", "data": {"columnas": []}}
    return {"success": True, "message": "Columnas obtenidas", "data": {"columnas": columnas, "nombre_archivo": up.filename}}


async def procesamiento_principal_paso_3_obtener_columnas_obligatorias_base_de_datos():
    return await obtener_columnas_obligatorias_prospectos_manual()


async def procesamiento_principal_paso_4_obtener_columnas_opcionales_base_de_datos():
    return await obtener_columnas_opcionales_prospectos_manual()


async def procesamiento_principal_paso_5_validacion_openai(nombre_excel: str):
    up = excel_a_uploadfile(nombre_excel)
    df = await archivo_a_dataframe(up, procesamiento_ligero=True)
    respuesta_openai = await validar_datos_con_openai(df, timeout_seconds=60)
    return respuesta_openai


async def procesamiento_principal_paso_6_procesar_excel_y_guardar_en_base_de_datos(
    nombre_excel: str,
    mapeo_columnas: Dict[str, str],
) -> Dict[str, Any]:
    print("=== Paso 6: Inicio ===")
    print(f"📌 Mapeo Columnas recibido: {mapeo_columnas}")

    up = excel_a_uploadfile(nombre_excel)
    df = await archivo_a_dataframe(up)
    if df is None:
        print("❌ Error: no se pudo leer el Excel")
        return {"success": False, "message": "Error al leer el archivo", "data": {}}

    try:
        print(f"✅ DataFrame leído con {len(df)} filas y {len(df.columns)} columnas")
        print(f"📋 Columnas originales del Excel: {list(df.columns)}")

        faltantes = [col for col in mapeo_columnas.keys() if col not in df.columns]
        if faltantes:
            print(f"⚠️ Columnas faltantes: {faltantes}")
            return {
                "success": False,
                "message": f"Columnas no encontradas en Excel: {faltantes}",
                "data": {"faltantes": faltantes, "columns_excel": list(df.columns)},
            }

        print("👉 Aplicando mapeo_de_columnas...")
        df_mapeado = mapeo_de_columnas(df.copy(), mapeo_columnas)
        print(f"✅ df_mapeado listo. Columnas: {list(df_mapeado.columns)}")

        print("👉 Aplicando incluir_solo_columnas_necesarias...")
        df_necesarias = await incluir_solo_columnas_necesarias(df_mapeado.copy())
        print(f"✅ df_con_columnas_necesarias listo. Columnas: {list(df_necesarias.columns)}")

        print("👉 Aplicando normalizar_columnas_obligatorias...")
        df_obl = normalizar_columnas_obligatorias(df_necesarias.copy())
        print(f"✅ df_columnas_obligatorias listo. Columnas: {list(df_obl.columns)}")

        print("👉 Aplicando normalizar_todas_columnas...")
        df_clean = normalizar_todas_columnas(df_obl.copy())
        print(f"✅ df_sin_strings_vacios listo. Columnas: {list(df_clean.columns)}")

        print("👉 Aplicando categorizacion_prospectos...")
        df_cat = categorizacion_prospectos(df_clean.copy())
        print(f"✅ df_categorizado listo. Columnas: {list(df_cat.columns)}")

        print("Entrando a insertar_prospectos...")
        insercion = await insertar_prospectos(df_cat.copy())
        print(f"Resultado de la inserción: {insercion}")
        print("✅ Inserción masiva finalizada.")
        print("=== Paso 6: Finalizado correctamente ===")

        return {
            "success": True,
            "data": {
                "cantidad_total_de_filas": len(df_cat),
                "cantidad_con_validacion_total": insercion.get("prospectos_manual_total", 0),
                "cantidad_con_validacion_parcial": insercion.get("prospectos_manual_parcial", 0),
                "cantidad_con_validacion_descartados": insercion.get("descartados_contactabilidad", 0),
            },
        }
    except Exception as e:
        import traceback
        traceback.print_exc()
        raise HTTPException(status_code=400, detail=str(e))


# ────────────────────────────────────────────────────────────────────────────────
# Utilidades de DataFrame / OpenAI
# ────────────────────────────────────────────────────────────────────────────────

def validar_headers_en_primera_fila(df: pd.DataFrame) -> bool:
    columnas = [str(c).strip().lower() for c in df.columns]
    headers_validos = [c for c in columnas if c and not c.startswith("unnamed")]
    return len(headers_validos) > 0


async def obtener_columnas_obligatorias_prospectos_manual():
    filas = await queries(
        "SELECT columna FROM columnas_obligatorias WHERE es_obligatoria=1 AND tabla='prospectos_manual'"
    )
    data = [c["columna"] for c in (filas or []) if c["columna"] != "id"]
    return {"success": True, "message": "Columnas obligatorias obtenidas", "data": data}


async def obtener_todas_las_columnas_prospectos_manual():
    filas = await queries(
        "SELECT columna FROM columnas_obligatorias WHERE tabla='prospectos_manual'"
    )
    data = [c["columna"] for c in (filas or []) if c["columna"] != "id"]
    return {"success": True, "message": "Todas las columnas obtenidas", "data": data}


async def obtener_columnas_opcionales_prospectos_manual():
    filas = await queries(
        "SELECT columna FROM columnas_obligatorias WHERE es_obligatoria=0 AND tabla='prospectos_manual'"
    )
    data = [c["columna"] for c in (filas or []) if c["columna"] != "id"]
    return {"success": True, "message": "Columnas opcionales obtenidas", "data": data}


def obtener_columnas_df(df: pd.DataFrame) -> List[str]:
    return [str(c).strip() for c in df.columns.tolist()]


def obtener_primeras_10_filas_df(df: pd.DataFrame) -> List[Dict[str, Any]]:
    df_muestra = df.head(10).copy().fillna("")
    return df_muestra.to_dict(orient="records")


async def validar_datos_con_openai(
    df: pd.DataFrame,
    openai_api_key: Optional[str] = None,
    model: str = "gpt-4o",
    timeout_seconds: int = 30,
) -> Dict[str, Any]:
    columnas_resp = await obtener_columnas_obligatorias_prospectos_manual()
    columnas = columnas_resp.get("data", [])
    filas_texto = str(obtener_primeras_10_filas_df(df))
    prompt = f"""
Eres un validador de coherencia de datos tabulares. 
Tu tarea es revisar si las columnas especificadas tienen sentido lógico y son consistentes, 
basándote en la semántica de los valores y no únicamente en el formato.

COLUMNAS A VALIDAR:
{columnas}

MUESTRA DE DATOS (lista de diccionarios que mapea la tabla):
{filas_texto}

Criterios de validación:
- Documento: puede ser cédula, tarjeta de identidad, pasaporte, NIT u otro identificador. 
  Debe parecer razonable (solo dígitos o combinación alfanumérica típica de documentos). 
  Ejemplos válidos: "1032456789", "TI12345", "NIT 900123456". 
  Ejemplos inválidos: "ABCXYZ", "123hola", "!!!!!".
- Nombres/Apellidos: deben contener letras (mínimo 2 caracteres alfabéticos). 
  Se permite que tengan espacios, tildes o caracteres especiales comunes en nombres. 
  Son inválidos si contienen solo números, símbolos o letras aleatorias sin sentido.
- Teléfonos: deben ser cadenas numéricas razonables (7–15 dígitos aprox.), 
  se aceptan con prefijos internacionales (+57, etc.). 
- En general: un valor se considera coherente si corresponde con el tipo esperado para la columna. 
  No te fijes en mayúsculas/minúsculas o pequeños errores de formato.
- Los valores nulos o vacíos NO cuentan como incoherentes.

Regla de decisión:
- Evalúa fila por fila las columnas especificadas. 
- Si en una fila la mayoría de las columnas evaluadas son coherentes → la fila es aceptable. 
- Si alguna fila tiene la mayoría de sus columnas incoherentes → toda la validación se considera fallida.

Regla de salida:
- Si TODAS las filas son aceptables → responde ÚNICAMENTE: exitosamenteexitoso
- Si ALGUNA fila es fallida → responde ÚNICAMENTE: fracasadamentefracasado

No des explicaciones, no muestres porcentajes ni repitas datos. 
Responde SOLO con la palabra clave fracasadamentefracasado o exitosamenteexitoso.
"""
    api_key = (openai_api_key or os.getenv("OPENAI_API_KEY") or "").strip().strip('"').strip("'")
    if not api_key:
        return {"ai_validation": "fracasadamentefracasado", "success": False}

    headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
    payload = {
        "model": model,
        "messages": [
            {"role": "system", "content": "Eres un verificador de coherencia de datos tabulares."},
            {"role": "user", "content": prompt},
        ],
        "temperature": 0,
    }

    ai_text = "fracasadamentefracasado"
    try:
        async with httpx.AsyncClient(timeout=timeout_seconds) as client:
            resp = await client.post("https://api.openai.com/v1/chat/completions", headers=headers, json=payload)
            resp.raise_for_status()
            data = resp.json()
            ai_text = (data.get("choices", [{}])[0].get("message", {}).get("content", "") or ai_text).strip().lower()
    except Exception as e:
        print(f"Error en la llamada a OpenAI: {e}")

    return {"ai_validation": ai_text, "success": ai_text == "exitosamenteexitoso"}


# ────────────────────────────────────────────────────────────────────────────────
# Transformaciones de DataFrame
# ────────────────────────────────────────────────────────────────────────────────

def mapeo_de_columnas(df: pd.DataFrame, mapeo: Dict[str, str]) -> pd.DataFrame:
    return df.rename(columns=mapeo)


async def incluir_solo_columnas_necesarias(df: pd.DataFrame) -> pd.DataFrame:
    """
    Trae columnas de la BD (async), agrega faltantes (None) y ordena.
    """
    columnas_tabla = (await obtener_todas_las_columnas_prospectos_manual()).get("data", [])
    for col in columnas_tabla:
        if col not in df.columns:
            df[col] = None
    return df[columnas_tabla].copy()


def normalizar_columnas_obligatorias(df: pd.DataFrame) -> pd.DataFrame:
    def es_vacio(v) -> bool:
        return pd.isna(v) or str(v).strip() == ""

    def limpiar_texto(v):
        return None if es_vacio(v) else str(v).strip().lower()

    def limpiar_documento(v):
        if es_vacio(v):
            return None
        solo = re.sub(r"\D", "", str(v))
        return solo if solo else None

    def limpiar_correo(v):
        if es_vacio(v):
            return None
        correo = str(v).strip().lower()
        return correo if "@" in correo else None

    def limpiar_telefono(v):
        if es_vacio(v):
            return None
        tel = re.sub(r"[^0-9 +]", "", str(v)).strip().lower()
        return tel if tel else None

    if "nombres"   in df.columns: df["nombres"]   = df["nombres"].apply(limpiar_texto)
    if "apellidos" in df.columns: df["apellidos"] = df["apellidos"].apply(limpiar_texto)
    if "documento" in df.columns: df["documento"] = df["documento"].apply(limpiar_documento)
    if "correos"   in df.columns: df["correos"]   = df["correos"].apply(limpiar_correo)
    if "telefono"  in df.columns: df["telefono"]  = df["telefono"].apply(limpiar_telefono)
    return df


def normalizar_todas_columnas(df: pd.DataFrame) -> pd.DataFrame:
    def es_vacio(v) -> bool:
        return pd.isna(v) or str(v).strip() == ""

    def limpiar(v):
        return None if es_vacio(v) else str(v).strip().lower()

    for col in df.columns:
        df[col] = df[col].apply(limpiar)
    return df


def categorizacion_prospectos(df: pd.DataFrame) -> pd.DataFrame:
    def es_valido(valor: Optional[str]) -> bool:
        if not valor or pd.isna(valor):
            return False
        v = str(valor).strip().lower()
        return v not in ["no ingresado", "mal digitado"]

    categorias: List[str] = []
    for _, fila in df.iterrows():
        correo_ok = es_valido(fila.get("correos"))
        telefono_ok = es_valido(fila.get("telefono"))
        if correo_ok and telefono_ok:
            categorias.append("total")
        elif correo_ok or telefono_ok:
            categorias.append("parcial")
        else:
            categorias.append("descartado")
    df = df.copy()
    df["categoria_prospecto"] = categorias
    return df


# ────────────────────────────────────────────────────────────────────────────────
# Inserciones masivas (async)
# ────────────────────────────────────────────────────────────────────────────────

async def insercion_masiva_dataframe(
    df: pd.DataFrame,
    nombre_tabla: str,
    tamano_lote: int = 500,  # ↓ baja de 5000 → 100 para locks más cortos
) -> int:
    """
    Inserta un DataFrame en MySQL por lotes (async) usando execute() con chunking interno.
    Agrega `estado_etapa`=1 y `fecha_estado`=NOW() en VALUES.
    Devuelve la cantidad de filas insertadas.
    """
    if df.empty:
        return 0

    # Columnas originales + extras fijas
    columnas = list(df.columns)
    columnas_sql = ", ".join(f"`{c}`" for c in columnas) + ", `estado_etapa`, `fecha_estado`"
    placeholders = ", ".join(["%s"] * len(columnas)) + ", 1, NOW()"

    sql = f"INSERT INTO `{nombre_tabla}` ({columnas_sql}) VALUES ({placeholders})"

    # Convierte NaN → None y arma lista de tuplas
    params: List[Tuple[Any, ...]] = [
        tuple(None if pd.isna(v) else v for v in fila)  # valores del DF
        for fila in df.itertuples(index=False, name=None)
    ]

    # Delega chunking+retry+commit por lote a execute()
    insertados = await execute(sql, params, chunk_size=tamano_lote)
    return insertados

async def insertar_prospectos(df: pd.DataFrame) -> Dict[str, Any]:
    """
    total/parcial → prospectos_manual
    descartado   → descartados_contactabilidad
    """
    resumen = {
        "success": False,
        "prospectos_manual_total": 0,
        "prospectos_manual_parcial": 0,
        "descartados_contactabilidad": 0,
        "errors": [],
    }
    try:
        if df.empty:
            resumen["success"] = True
            return resumen

        if "categoria_prospecto" not in df.columns:
            raise ValueError("El DataFrame debe incluir la columna 'categoria_prospecto'.")

        df = df.copy()
        df["categoria_prospecto"] = df["categoria_prospecto"].astype(str).str.strip().str.lower()

        columnas_comunes = [
            "nombres","apellidos","tipo_documento","documento","correos","telefono","pais",
            "departamento","ciudad","localidad","barrio","direccion","fecha_cumpleanos",
            "genero","tipo_sangre","recomendado","nombre_recomendador","grupo_wp","grupo_wp1",
            "como_ayudar","pasion","intereses","autorizacion","fecha_registro",
            "validacion_contactabilidad",
        ]

        def cols_presentes(cdf: pd.DataFrame, cols: List[str]) -> List[str]:
            return [c for c in cols if c in cdf.columns]

        # TOTAL
        try:
            df_total = df[df["categoria_prospecto"] == "total"].copy()
            if not df_total.empty:
                df_total["validacion_contactabilidad"] = 2
                cols = cols_presentes(df_total, columnas_comunes)
                resumen["prospectos_manual_total"] = await insercion_masiva_dataframe(
                    df_total[cols], "prospectos_manual"
                )
        except Exception as e:
            resumen["errors"].append(f"Error insertando prospectos TOTAL: {str(e)}")

        # PARCIAL
        try:
            df_parcial = df[df["categoria_prospecto"] == "parcial"].copy()
            if not df_parcial.empty:
                df_parcial["validacion_contactabilidad"] = 1
                cols = cols_presentes(df_parcial, columnas_comunes)
                resumen["prospectos_manual_parcial"] = await insercion_masiva_dataframe(
                    df_parcial[cols], "prospectos_manual"
                )
        except Exception as e:
            resumen["errors"].append(f"Error insertando prospectos PARCIAL: {str(e)}")

        # DESCARTADOS
        try:
            df_desc = df[df["categoria_prospecto"] == "descartado"].copy()
            if not df_desc.empty:
                df_desc["validacion_contactabilidad"] = 0
                cols = cols_presentes(df_desc, columnas_comunes)
                resumen["descartados_contactabilidad"] = await insercion_masiva_dataframe(
                    df_desc[cols], "descartados_contactabilidad"
                )
        except Exception as e:
            resumen["errors"].append(f"Error insertando prospectos DESCARTADOS: {str(e)}")

        resumen["success"] = len(resumen["errors"]) == 0
        return resumen
    except Exception as e:
        resumen["errors"].append(f"Error general: {str(e)}")
        resumen["success"] = False
        return resumen