from celery_app import celery_app
from celery.result import AsyncResult
import os
import pandas as pd
from flask import Blueprint, jsonify, request
from datetime import datetime
import math
import gc
from flask_login import login_required, current_user


from functools import lru_cache
import pyarrow as pa
import pyarrow.parquet as pq
import sys

from tasks_backtestingidea2 import run_backtesting_idea

import redis as redis_lib
import datetime
import pytz
import exchange_calendars as xcals
# Redis client (localhost, db 0) that stores the per-user rate-limit counters.
_rate_redis = redis_lib.Redis(host='localhost', port=6379, db=0, decode_responses=True)

# ── Rate limiting ─────────────────────────────────────────────
RATE_LIMIT_WINDOW        = 60         # seconds
RATE_LIMIT_MAX_MERCADO   = 5          # restrictive hours — anti-bot
RATE_LIMIT_MAX_LIBRE     = 50         # unrestricted hours — permissive
RATE_LIMIT_WHITELIST     = {9}        # user_ids with no limit (to add more: {9, 15, 23})
# Restrictive zone: Mon-Fri 7:30am–6:00pm NY, excluding NYSE holidays
RATE_LIMIT_HORA_ON       = (7, 30)    # 7:30am NY — 2h before the open
RATE_LIMIT_HORA_OFF      = (18, 0)    # 6:00pm NY — 2h after the close
# ─────────────────────────────────────────────────────────────

# NYSE ("XNYS") calendar, queried to find out whether a date is a trading session.
_nyse_cal = xcals.get_calendar("XNYS")

def _en_horario_restrictivo() -> bool:
    """
    Report whether the current New York time is inside the restrictive window.

    The window runs Monday to Friday from RATE_LIMIT_HORA_ON (inclusive) to
    RATE_LIMIT_HORA_OFF (exclusive), New York time. Weekends and dates that
    the NYSE calendar does not list as a session are never restrictive.
    """
    ahora = datetime.datetime.now(pytz.timezone('America/New_York'))

    # Saturday (5) and Sunday (6) are always unrestricted.
    es_fin_de_semana = ahora.weekday() >= 5
    if es_fin_de_semana:
        return False

    # Dates that are not an NYSE session (holidays) are unrestricted too.
    if not _nyse_cal.is_session(ahora.date().isoformat()):
        return False

    # Work in minutes since midnight so the ":30" boundary is handled.
    hora_on, minuto_on = RATE_LIMIT_HORA_ON
    hora_off, minuto_off = RATE_LIMIT_HORA_OFF
    inicio = hora_on * 60 + minuto_on       # 450
    fin = hora_off * 60 + minuto_off        # 1080
    actual = ahora.hour * 60 + ahora.minute
    return inicio <= actual < fin

def _check_rate_limit(user_id, endpoint, max_calls=8, window=60):
    """Return True when the user has exceeded the rate limit for an endpoint.

    Each call increments a Redis counter keyed by endpoint and user. The
    allowed number of calls per RATE_LIMIT_WINDOW seconds is
    RATE_LIMIT_MAX_MERCADO during the restrictive window and
    RATE_LIMIT_MAX_LIBRE otherwise. Users listed in RATE_LIMIT_WHITELIST are
    never limited and are not counted.

    Args:
        user_id: Identifier of the calling user.
        endpoint: Short endpoint label used as part of the Redis key.
        max_calls: Accepted for backward compatibility; not used. The limit
            comes from the module-level constants.
        window: Accepted for backward compatibility; not used. The window
            comes from RATE_LIMIT_WINDOW.

    Returns:
        True if the call must be rejected, False otherwise.
    """
    if user_id in RATE_LIMIT_WHITELIST:
        return False

    restrictivo = _en_horario_restrictivo()
    limite = RATE_LIMIT_MAX_MERCADO if restrictivo else RATE_LIMIT_MAX_LIBRE

    key = f"rl:{endpoint}:{user_id}"
    count = _rate_redis.incr(key)
    # INCR and EXPIRE are two separate commands. If an earlier request was
    # interrupted between them, the counter exists without a TTL (TTL == -1)
    # and would block the user forever once it passes the limit, so the
    # expiry is re-armed whenever it is found missing.
    if count == 1 or _rate_redis.ttl(key) == -1:
        _rate_redis.expire(key, RATE_LIMIT_WINDOW)
    if count > limite:
        print(f"[RATE_LIMIT] endpoint={endpoint} user_id={user_id} count={count} limite={limite} restrictivo={restrictivo}")
        return True
    return False


# Flask blueprint on which this module's routes are registered.
backtestingIdea2_bp = Blueprint('backtestingIdea2', __name__, template_folder='templates')

# Base path for data files
PATH_UBUNTU_CHAINS = "/var/www/html/flask_project/"


def _load_chain(symbol: str, fecha: str, base: str) -> pd.DataFrame:
    folder = os.path.join(base, "chains")
    prefix = "$" if symbol in ["SPX", "RUT", "XSP"] else ""
    pq_path = os.path.join(folder, f"optionChain_{prefix}{symbol}_{fecha}.parquet")

    # print(f"[DEBUG] Intentando cargar {symbol} para {fecha}")

    if os.path.exists(pq_path):
        # print(f"[DEBUG] Leyendo PARQUET desde {pq_path}")
        df = pq.read_table(pq_path).to_pandas()
    else:
        # print(f"[ERROR] No existe PARQUET para {fecha} (ni CSV)")
        return pd.DataFrame()

    df["avg_call"] = (df["bid_call"] + df["ask_call"]) / 2
    df["avg_put"] = (df["bid_put"] + df["ask_put"]) / 2
    df.set_index(["timestamp", "strike"], inplace=True)

    return df


# ============================ FUNCIONES VERTICALES ============================


def calcular_credito_y_underlying_desde_archivo_csv(archivo_option_chain, strike1, strike2, option_type, timeHour):
    """Compute a vertical's credit and the underlying price from a chain CSV.

    Only the first nine columns of the CSV are read. Rows with
    ``timestamp >= timeHour`` are kept, and the first remaining row of each
    strike is used. The credit is the bid/ask midpoint of *strike1* minus the
    midpoint of *strike2* on the side given by *option_type*.

    Args:
        archivo_option_chain: Path of the option chain CSV.
        strike1: Strike whose midpoint is the minuend; its row also provides
            the underlying price.
        strike2: Strike whose midpoint is subtracted.
        option_type: ``'PUT'`` or ``'CALL'``.
        timeHour: Earliest timestamp to consider.

    Returns:
        ``(credit, underlying_price)``, or ``(0, None)`` when nothing can be
        computed (unreadable file, no rows after *timeHour*, missing strike
        or unknown *option_type*).
    """
    try:
        option_chain_df = pd.read_csv(archivo_option_chain, usecols=range(9))
        option_chain_df['timestamp'] = pd.to_datetime(option_chain_df['timestamp'])
        filtered_chain = option_chain_df[option_chain_df['timestamp'] >= timeHour]
        if filtered_chain.empty:
            return 0, None

        if option_type == 'PUT':
            bid_col, ask_col = 'bid_put', 'ask_put'
        elif option_type == 'CALL':
            bid_col, ask_col = 'bid_call', 'ask_call'
        else:
            # Unknown side: there is no price column to read.
            return 0, None

        # First available quote of each leg at or after timeHour.
        strike1_data = filtered_chain[filtered_chain['strike'] == strike1].iloc[0]
        strike2_data = filtered_chain[filtered_chain['strike'] == strike2].iloc[0]

        strike1_avg = (strike1_data[bid_col] + strike1_data[ask_col]) / 2
        strike2_avg = (strike2_data[bid_col] + strike2_data[ask_col]) / 2

        return strike1_avg - strike2_avg, strike1_data['underlying_price']
    except Exception:
        # Best effort: any read/lookup problem is reported as "no credit".
        # `Exception` (not a bare except) lets KeyboardInterrupt/SystemExit through.
        return 0, None


# ========================= CREDIT TARGET HELPERS =========================

def _buscar_credito_con_target_vertical(df_chain, strike1, strike2, option_type, entry_time, credit_target):
    """
    Vectorizado: busca el primer timestamp con crédito >= credit_target sostenido 3 ticks consecutivos.
    - Primer tick del día ya cumple: devuelve crédito real.
    - Cumple más tarde: devuelve credit_target / 100.
    - Ninguno cumple: devuelve (None, None, None) → NO_ENTRY.
    """
    try:
        df = df_chain.reset_index()
        entry_time = entry_time.replace(second=30, microsecond=0)
        col = f'avg_{option_type.lower()}'

        df_filtered = df[
            (df['timestamp'] >= entry_time) &
            (df['strike'].isin([strike1, strike2]))
        ][['timestamp', 'strike', col, 'underlying_price']]

        if df_filtered.empty:
            return None, None, None

        pivot = df_filtered.pivot_table(index='timestamp', columns='strike', values=col, aggfunc='first')

        if strike1 not in pivot.columns or strike2 not in pivot.columns:
            return None, None, None

        credits = (pivot[strike1] - pivot[strike2]) * 100
        underlying_ts = df_filtered.drop_duplicates('timestamp').set_index('timestamp')['underlying_price']

        cumple = (credits >= credit_target).fillna(False)

        # Si el primer tick ya cumple → entrar directo, sin esperar 3 ticks
        if bool(cumple.iloc[0]):
            primer_ts = cumple.index[0]
            credit_real = credits.loc[primer_ts] / 100
            if credit_real > abs(strike1 - strike2):
                return None, None, None
            underlying   = underlying_ts.loc[primer_ts] if primer_ts in underlying_ts.index else None
            entry_ts_str = pd.Timestamp(primer_ts).strftime('%H:%M')
            return credit_real, underlying, entry_ts_str

        # Si no cumple desde el inicio → buscar 3 ticks consecutivos
        confirmado = (cumple.astype(int).rolling(3).sum() >= 3)

        if not confirmado.any():
            return None, None, None

        primer_ts_confirmado = confirmado[confirmado].index[0]
        credit_real = credits.loc[primer_ts_confirmado] / 100

        # Validar crédito máximo
        if credit_real > abs(strike1 - strike2):
            return None, None, None

        underlying   = underlying_ts.loc[primer_ts_confirmado] if primer_ts_confirmado in underlying_ts.index else None
        entry_ts_str = pd.Timestamp(primer_ts_confirmado).strftime('%H:%M')
        return credit_target / 100, underlying, entry_ts_str

    except Exception as e:
        print(f"[ERROR] _buscar_credito_con_target_vertical(): {e}")
        return None, None, None


def _buscar_credito_con_target_ic(df_chain, strikes, entry_time, credit_target):
    """
    Vectorizado: busca el primer timestamp con crédito IC >= credit_target sostenido 3 ticks consecutivos.
    """
    try:
        df = df_chain.reset_index()
        entry_time = entry_time.replace(second=30, microsecond=0)

        all_strikes = [
            strikes['Call_Strike1'], strikes['Call_Strike2'],
            strikes['Put_Strike1'],  strikes['Put_Strike2']
        ]

        df_filtered = df[
            (df['timestamp'] >= entry_time) &
            (df['strike'].isin(all_strikes))
        ][['timestamp', 'strike', 'avg_call', 'avg_put', 'underlying_price']]

        if df_filtered.empty:
            return None, None, None

        pivot_call = df_filtered.pivot_table(index='timestamp', columns='strike', values='avg_call', aggfunc='first')
        pivot_put  = df_filtered.pivot_table(index='timestamp', columns='strike', values='avg_put',  aggfunc='first')

        cs1, cs2 = strikes['Call_Strike1'], strikes['Call_Strike2']
        ps1, ps2 = strikes['Put_Strike1'],  strikes['Put_Strike2']

        if not all(s in pivot_call.columns for s in [cs1, cs2]):
            return None, None, None
        if not all(s in pivot_put.columns for s in [ps1, ps2]):
            return None, None, None

        idx = pivot_call.index.intersection(pivot_put.index)
        pivot_call = pivot_call.loc[idx]
        pivot_put  = pivot_put.loc[idx]

        credit_call   = pivot_call[cs1] - pivot_call[cs2]
        credit_put    = pivot_put[ps1]  - pivot_put[ps2]
        credits_total = (credit_call + credit_put) * 100

        underlying_ts = df_filtered.drop_duplicates('timestamp').set_index('timestamp')['underlying_price']

        cumple = credits_total >= credit_target
        cumple = cumple.fillna(False)

        # Si el primer tick ya cumple → entrar directo, sin esperar 3 ticks
        if bool(cumple.iloc[0]):
            primer_ts = cumple.index[0]
            credit_call_val  = credit_call.loc[primer_ts]
            credit_put_val   = credit_put.loc[primer_ts]
            credit_total_val = credit_call_val + credit_put_val
            put_spread  = strikes['Put_Strike1']  - strikes['Put_Strike2']
            call_spread = strikes['Call_Strike2'] - strikes['Call_Strike1']
            if credit_total_val > min(put_spread, call_spread):
                return None, None, None
            underlying   = underlying_ts.loc[primer_ts] if primer_ts in underlying_ts.index else None
            entry_ts_str = pd.Timestamp(primer_ts).strftime('%H:%M')
            return credit_total_val, underlying, entry_ts_str

        # Si no cumple desde el inicio → buscar 3 ticks consecutivos
        confirmado = (cumple.astype(int).rolling(3).sum() >= 3)

        if not confirmado.any():
            return None, None, None

        primer_ts_confirmado = confirmado[confirmado].index[0]

        credit_call_val  = credit_call.loc[primer_ts_confirmado]
        credit_put_val   = credit_put.loc[primer_ts_confirmado]
        credit_total_val = credit_call_val + credit_put_val

        # Validar crédito máximo
        put_spread  = strikes['Put_Strike1']  - strikes['Put_Strike2']
        call_spread = strikes['Call_Strike2'] - strikes['Call_Strike1']
        max_credit  = min(put_spread, call_spread)
        if credit_total_val > max_credit:
            return None, None, None

        underlying   = underlying_ts.loc[primer_ts_confirmado] if primer_ts_confirmado in underlying_ts.index else None
        entry_ts_str = pd.Timestamp(primer_ts_confirmado).strftime('%H:%M')
        return credit_target / 100, underlying, entry_ts_str

    except Exception as e:
        print(f"[ERROR] _buscar_credito_con_target_ic(): {e}")
        return None, None, None


def calcular_precios_verticales(vertical_strikes_df, path_csv_base, desplazamiento, desde, hasta, credit_target=0):
    import gc
    resultados = []
    symbol = vertical_strikes_df['Symbol'].iloc[0]
    vertical_strikes_df['Day'] = pd.to_datetime(vertical_strikes_df['Day'])

    # Filtrar rango de fechas
    vertical_strikes_df = vertical_strikes_df[
        (vertical_strikes_df['Day'] >= pd.to_datetime(desde)) &
        (vertical_strikes_df['Day'] <= pd.to_datetime(hasta))
    ].copy()

    # Aplicar desplazamientos
    vertical_strikes_df['Strike1_Desplazado'] = vertical_strikes_df.apply(
        lambda row: row['Strike1'] + desplazamiento if row['Option_Type'] == 'CALL' else row['Strike1'] - desplazamiento,
        axis=1)
    vertical_strikes_df['Strike2_Desplazado'] = vertical_strikes_df.apply(
        lambda row: row['Strike2'] + desplazamiento if row['Option_Type'] == 'CALL' else row['Strike2'] - desplazamiento,
        axis=1)

    df_cache = {}  # Cache optionChain

    for _, row in vertical_strikes_df.iterrows():
        fecha = pd.to_datetime(row['Day']).strftime('%Y-%m-%d')
        hora = row['Hour']
        specific_time = pd.to_datetime(f"{row['Day']} {hora}")
        strike1 = row['Strike1_Desplazado']
        strike2 = row['Strike2_Desplazado']
        option_type = row['Option_Type']

        # Load optionChain
        if fecha not in df_cache:
            try:
                df_cache[fecha] = _load_chain(symbol, fecha, path_csv_base)
            except:
                df_cache[fecha] = pd.DataFrame()

        df_chain = df_cache[fecha]

        credit_result, underlying_price_specific_time = calcular_credito_y_underlying_desde_df(
            df_chain, strike1, strike2, option_type, specific_time)

        # ✅ Validar ambos: strikes faltantes o underlying vacío
        if credit_result is None or underlying_price_specific_time is None:

            continue

        underlying_price_16 = obtener_ultimo_underlying_price_desde_df(df_chain)
        spreadDiff = abs(strike1 - strike2)

        # 🚫 Validar crédito máximo
        if credit_result > spreadDiff:
            continue

        # 🎯 Credit Target: iterar timestamps buscando el primero que cumpla
        entry_ts = hora  # default: hora fija/dinámica
        if credit_target > 0:
            credit_result, underlying_price_specific_time, entry_ts_found = _buscar_credito_con_target_vertical(
                df_chain, strike1, strike2, option_type, specific_time, credit_target
            )
            if credit_result is None:
                # Ningún timestamp del día cumplió el target → NO_ENTRY
                resultados.append({
                    'Day': fecha,
                    'Time': hora,
                    'Strikes': f"{strike1}/{strike2}",
                    'Option': option_type,
                    'Credit': None,
                    'Price': underlying_price_specific_time,
                    'Close': underlying_price_16,
                    'P/L': 0.0,
                    'Diff': round(underlying_price_16 - strike1, 2) if underlying_price_16 else None,
                    'itmOtm': 'NO_ENTRY',
                    "movimiento_esperado": row["movimiento_esperado"],
                    "score_30min": row["score_30min"],
                })
                if fecha in df_cache:
                    del df_cache[fecha]
                    gc.collect()
                continue
            entry_ts = entry_ts_found

        if underlying_price_16 is not None:
            if option_type == 'CALL':
                if underlying_price_16 > max(strike1, strike2):
                    ganancia_dia = (credit_result - spreadDiff) * 100
                elif underlying_price_16 < min(strike1, strike2):
                    ganancia_dia = credit_result * 100
                else:
                    valor_itm = (underlying_price_16 - min(strike1, strike2)) * 100
                    ganancia_dia = credit_result * 100 - valor_itm
            else:  # PUT
                if underlying_price_16 < min(strike1, strike2):
                    ganancia_dia = (credit_result - spreadDiff) * 100
                elif underlying_price_16 > max(strike1, strike2):
                    ganancia_dia = credit_result * 100
                else:
                    valor_itm = (max(strike1, strike2) - underlying_price_16) * 100
                    ganancia_dia = credit_result * 100 - valor_itm
        else:
            ganancia_dia = None

        # ITM/OTM status
        if underlying_price_16 is not None:
            if option_type == 'CALL':
                if underlying_price_16 > max(strike1, strike2):
                    itmOtm = "ITM"
                elif underlying_price_16 < min(strike1, strike2):
                    itmOtm = "OTM"
                else:
                    itmOtm = "ITM P"
            else:
                if underlying_price_16 < min(strike1, strike2):
                    itmOtm = "ITM"
                elif underlying_price_16 > max(strike1, strike2):
                    itmOtm = "OTM"
                else:
                    itmOtm = "ITM P"
        else:
            itmOtm = "ERROR"

        resultados.append({
            'Day': fecha,
            'Time': entry_ts,
            'Strikes': f"{strike1}/{strike2}",
            'Option': option_type,
            'Credit': round(credit_result * 100, 2),
            'Price': underlying_price_specific_time,
            'Close': underlying_price_16,
            'P/L': round(ganancia_dia, 2) if ganancia_dia is not None else None,
            'Diff': round(underlying_price_16 - strike1, 2) if underlying_price_16 else None,
            'itmOtm': itmOtm,
            "movimiento_esperado": row["movimiento_esperado"],
            "score_30min": row["score_30min"],

        })

        # Liberar memoria si es necesario
        if fecha in df_cache:
            del df_cache[fecha]
            gc.collect()

    df_resultados = pd.DataFrame(resultados)

    if df_resultados.empty:
        return df_resultados, 0.0, 0, 0

    # ✅ Filtro de respaldo por si algo se cuela
    df_resultados = df_resultados[
        df_resultados['Credit'].notnull() &
        df_resultados['Price'].notnull()
    ].copy()



    profit_total = df_resultados['P/L'].sum(skipna=True)
    positivos = df_resultados[df_resultados['P/L'] >= 0].shape[0]
    negativos = df_resultados[df_resultados['P/L'] < 0].shape[0]
    
    return df_resultados, profit_total, positivos, negativos


def calcular_credito_y_underlying_desde_df(df, strike1, strike2, option_type, timeHour):
    """Compute a vertical's credit and underlying price from a loaded chain.

    Rows at or after *timeHour* (with seconds forced to 15) are kept and the
    first row of each strike is used. The credit is ``avg_<side>`` of
    *strike1* minus that of *strike2*; the underlying price is read from the
    *strike1* row.

    Args:
        df: Option chain whose index levels (or columns, after
            ``reset_index``) include ``timestamp`` and ``strike``.
        strike1: First leg.
        strike2: Second leg.
        option_type: ``'CALL'`` or ``'PUT'``; selects ``avg_call``/``avg_put``.
        timeHour: Earliest timestamp to consider.

    Returns:
        ``(credit, underlying_price)``; ``(0, None)`` when both strikes are
        not present; ``(None, None)`` on any error.
    """
    try:
        corte = timeHour.replace(second=15, microsecond=0)
        plano = df.reset_index()  # parquet chains arrive indexed by ["timestamp", "strike"]

        desde_corte = plano[plano['timestamp'] >= corte]
        patas = desde_corte[desde_corte['strike'].isin([strike1, strike2])]

        hay_ambos = (not patas.empty) and len(patas['strike'].unique()) >= 2
        if not hay_ambos:
            print("[WARNING] → No se encontraron ambos strikes.")
            return 0, None

        # First row of each leg at or after the cut-off.
        fila1 = patas[patas['strike'] == strike1].iloc[0]
        fila2 = patas[patas['strike'] == strike2].iloc[0]

        medio1 = fila1[f'avg_{option_type.lower()}']
        medio2 = fila2[f'avg_{option_type.lower()}']
        return medio1 - medio2, fila1['underlying_price']
    except Exception as e:
        print(f"[ERROR] calcular_credito_y_underlying_desde_df(): {e}")
        return None, None


def obtener_ultimo_underlying_price_desde_df(df):
    """Return the ``underlying_price`` of the latest timestamp in *df*.

    *df* is sorted by ``timestamp`` (a column or an index level) and the last
    ``underlying_price`` value is returned.

    Returns:
        The last underlying price, or None when it cannot be obtained (empty
        frame, missing ``timestamp``/``underlying_price``, *df* is None, ...).
    """
    try:
        return df.sort_values(by='timestamp')['underlying_price'].iloc[-1]
    except Exception:
        # Best effort. `Exception` instead of a bare except so that
        # KeyboardInterrupt/SystemExit are not swallowed.
        return None

# ============================ FUNCIONES IRON CONDOR ============================


def calcular_ganancia_perdida_dia_iron_condor(credit_result, underlying_price_16, strikes):
    """Return the day's P/L (in dollars, x100) of an iron condor at the close.

    Args:
        credit_result: Credit received per share.
        underlying_price_16: Closing underlying price.
        strikes: Dict with ``Put_Strike1``/``Call_Strike1`` (short legs) and
            ``Put_Strike2``/``Call_Strike2`` (long legs).

    Returns:
        - full credit when the close is between the short strikes;
        - credit minus the intrinsic value of the breached spread when the
          close is between a short and a long strike;
        - credit minus the width of the breached wing when the close is
          beyond a long strike;
        - None when an input is None or the close fits no range.
    """
    if underlying_price_16 is None or credit_result is None:
        return None

    put_short, put_long = strikes['Put_Strike1'], strikes['Put_Strike2']
    call_short, call_long = strikes['Call_Strike1'], strikes['Call_Strike2']
    diff_put = put_short - put_long
    diff_call = call_long - call_short

    # Beyond a long strike only that side's spread is fully in the money, so
    # the loss is capped by the width of that wing (not by the wider of the
    # two, which overstated the loss on asymmetric condors).
    if underlying_price_16 < put_long:
        return (credit_result - diff_put) * 100
    if underlying_price_16 > call_long:
        return (credit_result - diff_call) * 100

    if put_short <= underlying_price_16 <= call_short:
        return credit_result * 100
    if put_long <= underlying_price_16 < put_short:
        return (credit_result - min(put_short - underlying_price_16, diff_put)) * 100
    if call_short < underlying_price_16 <= call_long:
        return (credit_result - min(underlying_price_16 - call_short, diff_call)) * 100

    return None


def calcular_precios_iron_condor(df, path_csv_base, desplazamiento, desde, hasta, credit_target=0):
    """Backtest one iron condor per row of *df*.

    For each row whose ``Day`` lies in ``[desde, hasta]`` the call strikes are
    shifted up and the put strikes down by *desplazamiento*, the day's option
    chain is loaded, the entry credit is computed at ``Day`` + ``Hour`` and
    the P/L is evaluated against the last underlying price of the day.

    Args:
        df: Frame with columns ``Symbol``, ``Day``, ``Hour``, ``Call_Strike1``,
            ``Call_Strike2``, ``Put_Strike1`` and ``Put_Strike2``;
            ``movimiento_esperado`` and ``score_30min`` are copied to the
            output when present. The frame is not modified.
        path_csv_base: Base directory passed to ``_load_chain``.
        desplazamiento: Amount added to call strikes and subtracted from put
            strikes.
        desde: First day to include (anything ``pd.to_datetime`` accepts).
        hasta: Last day to include.
        credit_target: When > 0, the entry is searched with
            ``_buscar_credito_con_target_ic`` instead of using the credit at
            the row's hour.

    Returns:
        ``(df_resultados, profit_total, positivos, negativos)``. For empty
        input or no results: an empty DataFrame, ``0.0``, ``0``, ``0``.
    """
    import gc

    # Empty input: nothing to read the symbol from.
    if df.empty:
        return pd.DataFrame(), 0.0, 0, 0

    symbol = df['Symbol'].iloc[0]

    # Parse the dates on the side and filter into a new frame, so the
    # caller's DataFrame is left untouched.
    dias = pd.to_datetime(df['Day'])
    en_rango = (dias >= pd.to_datetime(desde)) & (dias <= pd.to_datetime(hasta))
    filas = df.assign(Day=dias)[en_rango].copy()

    resultados = []

    # Only the chain of the day being processed is kept in memory; it is
    # released as soon as a row for another day shows up.
    fecha_en_cache = None
    df_chain = pd.DataFrame()

    for _, row in filas.iterrows():
        fecha = row['Day'].strftime('%Y-%m-%d')
        hora = row['Hour']
        specific_time = pd.to_datetime(f"{fecha} {hora}")

        strikes = {
            'Call_Strike1': row['Call_Strike1'] + desplazamiento,
            'Call_Strike2': row['Call_Strike2'] + desplazamiento,
            'Put_Strike1': row['Put_Strike1'] - desplazamiento,
            'Put_Strike2': row['Put_Strike2'] - desplazamiento
        }

        # Structure check: the short put must not be above the short call.
        if strikes['Put_Strike1'] > strikes['Call_Strike1']:
            continue

        if fecha != fecha_en_cache:
            df_chain = None
            gc.collect()
            try:
                df_chain = _load_chain(symbol, fecha, path_csv_base)
            except Exception:
                # Unreadable chain: treated like a missing one.
                df_chain = pd.DataFrame()
            fecha_en_cache = fecha

        credit_result, underlying_price, _, _ = calcular_credito_y_underlying_iron_condor_df(
            df_chain, strikes, specific_time)

        # Skip when strikes are missing or there is no underlying price.
        if credit_result is None or underlying_price is None:
            continue

        # A credit above the narrower wing is not a valid quote.
        put_spread = strikes['Put_Strike1'] - strikes['Put_Strike2']
        call_spread = strikes['Call_Strike2'] - strikes['Call_Strike1']
        if credit_result > min(put_spread, call_spread):
            continue

        underlying_price_16 = obtener_ultimo_underlying_price_desde_df(df_chain)
        strikes_label = f"{strikes['Put_Strike2']}/{strikes['Put_Strike1']} {strikes['Call_Strike1']}/{strikes['Call_Strike2']}"

        # Credit target: look for the first timestamp that satisfies it.
        entry_ts = hora  # default: the row's own hour
        if credit_target > 0:
            credit_result, underlying_price, entry_ts_found = _buscar_credito_con_target_ic(
                df_chain, strikes, specific_time, credit_target
            )
            if credit_result is None:
                # No timestamp of the day met the target → NO_ENTRY.
                resultados.append({
                    'Day': fecha,
                    'Time': hora,
                    'Strikes': strikes_label,
                    'Price': None,
                    'Close': underlying_price_16,
                    'Credit': None,
                    'P/L': 0.0,
                    'Dif': None,
                    'itmOtm': 'NO_ENTRY',
                    "movimiento_esperado": row.get("movimiento_esperado", None),
                    "score_30min": row.get("score_30min", None),
                })
                continue
            entry_ts = entry_ts_found

        ganancia_dia = calcular_ganancia_perdida_dia_iron_condor(
            credit_result, underlying_price_16, strikes
        )

        if underlying_price_16 is None:
            itmOtm = "N/A"
            dif_call = None
            dif_put = None
        else:
            if underlying_price_16 < strikes['Put_Strike2'] or underlying_price_16 > strikes['Call_Strike2']:
                itmOtm = "ITM"
            elif strikes['Put_Strike1'] <= underlying_price_16 <= strikes['Call_Strike1']:
                itmOtm = "OTM"
            else:
                itmOtm = "ITM P"
            dif_call = f"{(underlying_price_16 - strikes['Call_Strike1']):.2f}"
            dif_put = f"{(underlying_price_16 - strikes['Put_Strike1']):.2f}"

        resultados.append({
            'Day': fecha,
            'Time': entry_ts,
            'Strikes': strikes_label,
            'Price': underlying_price,
            'Close': underlying_price_16,
            'Credit': round(credit_result * 100, 2),
            'P/L': round(ganancia_dia, 2) if isinstance(ganancia_dia, (int, float)) else None,
            'Dif': f"{dif_put}(P) / {dif_call}(C)",
            'itmOtm': itmOtm,
            "movimiento_esperado": row.get("movimiento_esperado", None),
            "score_30min": row.get("score_30min", None),
        })

    # Release the last chain before building the output.
    df_chain = None
    gc.collect()

    df_resultados = pd.DataFrame(resultados)

    if df_resultados.empty:
        return df_resultados, 0.0, 0, 0

    # Safety filter: drop rows without Credit or Price.
    # NOTE(review): NO_ENTRY rows carry Credit=None and Price=None, so this
    # filter removes all of them from the output — confirm whether intended.
    df_resultados = df_resultados[
        df_resultados['Credit'].notnull() &
        df_resultados['Price'].notnull()
    ].copy()

    profit_total = df_resultados['P/L'].sum(skipna=True)
    positivos = df_resultados[df_resultados['P/L'] >= 0].shape[0]
    negativos = df_resultados[df_resultados['P/L'] < 0].shape[0]

    return df_resultados, profit_total, positivos, negativos


def calcular_credito_y_underlying_iron_condor_df(df, strikes, timeHour):
    """Compute an iron condor's credit from a loaded chain.

    Rows of the four strikes at or after *timeHour* (seconds forced to 30)
    are selected, and only the earliest timestamp among them is used. All
    four legs must be quoted at that timestamp.

    Args:
        df: Option chain whose index levels (or columns, after
            ``reset_index``) include ``timestamp`` and ``strike``.
        strikes: Dict with ``Call_Strike1``, ``Call_Strike2``, ``Put_Strike1``
            and ``Put_Strike2``.
        timeHour: Earliest timestamp to consider.

    Returns:
        ``(credit_total, underlying_price, credit_calls, credit_puts)``, or
        four Nones when a leg is missing or an error occurs.
    """
    sin_datos = (None, None, None, None)
    try:
        corte = timeHour.replace(second=30, microsecond=0)
        plano = df.reset_index()

        en_ventana = plano['timestamp'] >= corte
        es_pata = plano['strike'].isin([
            strikes['Call_Strike1'], strikes['Call_Strike2'],
            strikes['Put_Strike1'], strikes['Put_Strike2']
        ])
        candidatos = plano[en_ventana & es_pata]
        if candidatos.empty:
            return sin_datos

        # Snapshot of the first available timestamp.
        primer_ts = candidatos['timestamp'].min()
        foto = candidatos[candidatos['timestamp'] == primer_ts]

        # Price series of each leg inside the snapshot.
        legs = {}
        for nombre, columna in (
            ('Call_Strike1', 'avg_call'),
            ('Call_Strike2', 'avg_call'),
            ('Put_Strike1', 'avg_put'),
            ('Put_Strike2', 'avg_put'),
        ):
            legs[nombre] = foto[foto['strike'] == strikes[nombre]][columna]

        if any(serie.empty for serie in legs.values()):
            return sin_datos

        credito_calls = legs['Call_Strike1'].iloc[0] - legs['Call_Strike2'].iloc[0]
        credito_puts = legs['Put_Strike1'].iloc[0] - legs['Put_Strike2'].iloc[0]
        credito_total = credito_calls + credito_puts
        precio = foto['underlying_price'].iloc[0]
        return credito_total, precio, credito_calls, credito_puts

    except Exception as e:
        print(f"[ERROR] calcular_credito_y_underlying_iron_condor_df(): {e}")
        return None, None, None, None


# ============================ MEJORES HORARIOS ============================

CSV_TOP3_BASE = "/var/www/html/backtestingmarket/backtestingSemanal/top3"


def obtener_hora_desde_csv(symbol, estrategia, risk, fecha_dia):
    """
    Look up the rank-1 entry hour for the week that contains *fecha_dia*.

    Reads ``top3_horarios_<symbol>_<estrategia>_<suffix>.csv`` under
    CSV_TOP3_BASE (the suffix is derived from *risk*) and selects the row
    with ``rank == 1`` whose ``end_date`` is the Friday of the week before
    *fecha_dia*.

    Returns:
        The hour as a string without decimals (e.g. '1320'), or None when the
        file or the row does not exist, or on any error.
    """
    try:
        # Risk profile → file-name suffix; unknown values are used as-is.
        sufijos_riesgo = {
            'conservador': 'cons',
            'intermedio': 'inte',
            'agresivo': 'agre',
            'ultra_agresivo': 'ultr'
        }
        riesgo = risk.lower()
        sufijo = sufijos_riesgo.get(riesgo, riesgo)
        ruta_csv = os.path.join(CSV_TOP3_BASE, f"top3_horarios_{symbol}_{estrategia}_{sufijo}.csv")

        if not os.path.exists(ruta_csv):
            print(f"[MEJORES_HORARIOS] CSV no encontrado: {ruta_csv}")
            return None

        top = pd.read_csv(ruta_csv)
        top['end_date'] = pd.to_datetime(top['end_date'])

        # Monday of the current week (weekday(): Monday=0) minus 3 days is
        # the Friday of the previous week.
        dia = pd.to_datetime(fecha_dia)
        viernes_previo = dia - pd.Timedelta(days=dia.weekday() + 3)

        es_viernes_previo = top['end_date'] == viernes_previo
        es_rank_1 = top['rank'] == 1
        candidatos = top[es_viernes_previo & es_rank_1]

        if candidatos.empty:
            print(f"[MEJORES_HORARIOS] Sin registro rank=1 para end_date={viernes_previo.date()} ({fecha_dia})")
            return None

        # Values such as 1320.0 become '1320'.
        return str(int(float(candidatos.iloc[0]['hora'])))

    except Exception as e:
        print(f"[ERROR] obtener_hora_desde_csv(): {e}")
        return None


# ============================ RUTA PRINCIPAL ============================

def _param_es_seguro_para_ruta(valor):
    """Return True when *valor* has no path separator or ``..`` and so cannot leave its directory."""
    texto = str(valor)
    return '/' not in texto and '\\' not in texto and '..' not in texto


@backtestingIdea2_bp.route('/get_backtesting_idea', methods=['GET'])
@login_required
def get_backtesting_idea():
    """Queue a backtesting job and return its Celery task id.

    Query parameters: ``desde``, ``hasta``, ``symbol`` (default 'SPX'),
    ``estrategia`` (default 'Vertical'), ``hora`` (default '1340'), ``risk``
    (default 'Intermedio', lower-cased), ``modo`` ('HORA_FIJA' by default, or
    'MEJORES_HORARIOS') and ``credit_target`` (default 0).

    Responses:
        200 with ``{"status": "queued", "task_id": ..., "modo": ...}``;
        400 when a parameter is invalid;
        404 when, in HORA_FIJA mode, the strikes file does not exist;
        429 when the caller is rate limited.
    """
    print(f"[AUDIT] get_backtesting_idea user_id={current_user.id} email={current_user.email}")
    if _check_rate_limit(current_user.id, 'backtesting_idea'):
        return jsonify({"error": "Too many requests. Please wait a moment."}), 429
    from tasks_backtestingidea2 import run_backtesting_idea

    desde = request.args.get('desde')
    hasta = request.args.get('hasta')
    symbol = request.args.get('symbol', 'SPX')
    estrategia = request.args.get('estrategia', 'Vertical')
    timeHour = request.args.get('hora', '1340')
    risk = request.args.get('risk', 'Intermedio').lower()
    modo = request.args.get('modo', 'HORA_FIJA')  # HORA_FIJA | MEJORES_HORARIOS

    # A non-numeric credit_target is a client error, not a server error.
    try:
        credit_target = float(request.args.get('credit_target', 0) or 0)
    except ValueError:
        return jsonify({'error': 'credit_target inválido'}), 400
    if not math.isfinite(credit_target):
        return jsonify({'error': 'credit_target inválido'}), 400

    # symbol/estrategia come from the client and end up inside file paths,
    # so values that could point outside the data directory are rejected.
    if not (_param_es_seguro_para_ruta(symbol) and _param_es_seguro_para_ruta(estrategia)):
        return jsonify({'error': 'Parámetros inválidos'}), 400

    if modo == 'MEJORES_HORARIOS':
        # Dynamic mode: no strikes file is checked for a fixed hour here;
        # the hour is resolved week by week inside the task.
        hora_task, modo_task = None, 'MEJORES_HORARIOS'
    else:
        # HORA_FIJA mode: the strikes file for that hour must exist.
        if not _param_es_seguro_para_ruta(timeHour):
            return jsonify({'error': 'Parámetros inválidos'}), 400
        archivo = f"/var/www/html/backtestingmarket/predictor_data/makekos/{symbol}/{symbol}_{estrategia}_strikes_{timeHour}.csv"
        if not os.path.exists(archivo):
            return jsonify({'error': 'Archivo de strikes no encontrado'}), 404
        hora_task, modo_task = timeHour, 'HORA_FIJA'

    job = run_backtesting_idea.apply_async(
        kwargs=dict(
            desde=desde,
            hasta=hasta,
            symbol=symbol,
            estrategia=estrategia,
            timeHour=hora_task,
            risk=risk,
            modo=modo_task,
            credit_target=credit_target,
        ),
        queue="backtesting_idea2"
    )

    return jsonify({
        "status": "queued",
        "task_id": job.id,
        "modo": modo
    })


def obtener_ultimo_underlying_price(archivo_option_chain):
    """Return the ``underlying_price`` of the latest row of an option-chain CSV.

    The CSV is read with pandas, ordered by its ``timestamp`` column and the
    ``underlying_price`` of the last row is returned.

    Args:
        archivo_option_chain: Path (or anything ``pd.read_csv`` accepts) of
            the option-chain CSV.

    Returns:
        The last ``underlying_price`` value, or ``None`` when the file cannot
        be read, has no rows, or lacks the ``timestamp`` /
        ``underlying_price`` columns.
    """
    try:
        option_chain_df = pd.read_csv(archivo_option_chain)
        if 'underlying_price' not in option_chain_df.columns:
            return None
        option_chain_df['timestamp'] = pd.to_datetime(option_chain_df['timestamp'])
        option_chain_df = option_chain_df.sort_values(by='timestamp')
        return option_chain_df['underlying_price'].iloc[-1]
    except Exception:
        # Best-effort lookup: any read/parse problem simply means "no price".
        # ``Exception`` (rather than a bare ``except``) lets KeyboardInterrupt
        # and SystemExit propagate.
        return None


@backtestingIdea2_bp.route("/procesar_fila_vertical", methods=["POST"])
@login_required
def procesar_fila_vertical():
    """Replay the intraday credit / P&L evolution of one vertical spread.

    Reads a JSON body with the keys ``symbol``, ``Day``, ``Time``, ``Option``
    ("CALL" or "PUT") and ``Strikes`` (two strikes separated by ``/``).

    Returns:
        * 200 with ``status="success"``, the normalised strikes, the initial
          credit (multiplied by 100) and an ``evolution`` list with one entry
          per chain timestamp at or after the requested time.
        * 400 when the body is not a JSON object, the strikes cannot be
          parsed, the option chain is empty or the initial credit cannot be
          computed.
        * 429 when the caller is rate limited.
        * 500 on any other unexpected error.
    """
    print(f"[AUDIT] procesar_fila_vertical user_id={current_user.id} email={current_user.email}")
    if _check_rate_limit(current_user.id, 'procesar_vertical'):
        return jsonify({"error": "Too many requests. Please wait a moment."}), 429
    import time as _time
    _start = _time.time()

    # A missing / malformed body is a client error, not a server failure.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"status": "error", "message": "Se esperaba un cuerpo JSON"}), 400

    try:
        symbol = data.get("symbol")
        fecha = data.get("Day")
        hora = data.get("Time")
        tipo = data.get("Option")  # "CALL" or "PUT"
        print(fecha)

        print("📌 [DEBUG] Day recibido del cliente:", fecha)
        print("simbolo:", symbol)
        print("fecha:", fecha)

        strikes_raw = data.get("Strikes", "")
        try:
            strike1, strike2 = [float(s) for s in strikes_raw.split("/")]
        except (AttributeError, TypeError, ValueError):
            # Same response the iron-condor endpoint gives for bad strikes.
            return jsonify({"status": "error", "message": f"Strikes inválidos: {strikes_raw}"}), 400

        # Normalise leg order: for a PUT the higher strike goes first, for a
        # CALL the lower one does. The credit below is price(1) - price(2).
        if tipo == "PUT" and strike1 < strike2:
            strike1, strike2 = strike2, strike1
        if tipo == "CALL" and strike1 > strike2:
            strike1, strike2 = strike2, strike1

        df_chain = _load_chain(symbol, fecha, PATH_UBUNTU_CHAINS)
        if df_chain.empty:
            return jsonify({"status": "error", "message": "No se pudo cargar option chain"}), 400

        time_obj = pd.to_datetime(f"{fecha} {hora}")
        credit_inicial, _ = calcular_credito_y_underlying_desde_df(df_chain, strike1, strike2, tipo, time_obj)
        if not isinstance(credit_inicial, (int, float)):
            return jsonify({"status": "error", "message": "No se pudo calcular el crédito inicial"}), 400

        # Only snapshots from the entry time onwards matter.
        df_filtered = df_chain.reset_index()
        df_filtered = df_filtered[df_filtered["timestamp"] >= time_obj]

        # Loop-invariant price column ("avg_call" / "avg_put"). ``str()``
        # keeps an unexpected non-string ``Option`` on the per-row skip path
        # below instead of failing here.
        columna_avg = f"avg_{str(tipo).lower()}"

        evolution = []
        for ts, group in df_filtered.groupby("timestamp"):
            try:
                avg1 = group[group["strike"] == strike1][columna_avg].iloc[0]
                avg2 = group[group["strike"] == strike2][columna_avg].iloc[0]
                underlying_price = group["underlying_price"].iloc[0]

                credit_actual = avg1 - avg2
                profit_loss = (credit_inicial - credit_actual) * 100
                evolution.append({
                    "timestamp": ts.strftime("%Y-%m-%d %H:%M:%S"),
                    "credit": float(round(credit_actual * 100, 2)),
                    "profit_loss": float(round(profit_loss, 2)),
                    "underlying_price": float(round(underlying_price, 2))
                })
            except Exception:
                # Best-effort: a snapshot missing either strike (or with
                # unusable values) is skipped rather than failing the request.
                continue

        print(f"[TIMING] procesar_fila_vertical user_id={current_user.id} elapsed={round(_time.time()-_start,2)}s")
        return jsonify({
            "status": "success",
            "symbol": symbol,
            "fecha": fecha,
            "strikes": f"{strike1}/{strike2}",
            "tipo": tipo,
            "initial_credit": round(credit_inicial * 100, 2),
            "evolution": evolution,
        })

    except Exception as e:
        return jsonify({"status": "error", "message": str(e)}), 500


@backtestingIdea2_bp.route("/procesar_fila_ironcondor", methods=["POST"])
@login_required
def procesar_fila_ironcondor():
    """Replay the intraday credit / P&L evolution of one iron condor.

    Reads a JSON body with the keys ``symbol``, ``Day``, ``Time`` and
    ``Strikes``; the latter is ``"<put>/<put> <call>/<call>"`` (two blocks
    separated by a single space).

    Returns:
        * 200 with ``status="success"``, the normalised strikes, the initial
          credit (multiplied by 100) and an ``evolution`` list with one entry
          per chain timestamp at or after the requested time.
        * 400 when the body is not a JSON object, the strikes cannot be
          parsed, the option chain is empty or the initial credit cannot be
          computed.
        * 429 when the caller is rate limited.
        * 500 on any other unexpected error.
    """
    print(f"[AUDIT] procesar_fila_ironcondor user_id={current_user.id} email={current_user.email}")
    if _check_rate_limit(current_user.id, 'procesar_ironcondor'):
        return jsonify({"error": "Too many requests. Please wait a moment."}), 429
    import time as _time
    _start = _time.time()

    # A missing / malformed body is a client error, not a server failure.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"status": "error", "message": "Se esperaba un cuerpo JSON"}), 400

    try:
        symbol = data.get("symbol")
        fecha = data.get("Day")
        hora = data.get("Time")
        strikes_raw = data.get("Strikes", "")
        if isinstance(strikes_raw, str):
            strikes_raw = strikes_raw.strip()

        # Split the PUT and CALL blocks. Non-string input fails on ``split``
        # and is reported as invalid strikes as well.
        try:
            put_block, call_block = strikes_raw.split(" ")
            put1, put2 = [float(s) for s in put_block.split("/")]
            call1, call2 = [float(s) for s in call_block.split("/")]
        except (AttributeError, TypeError, ValueError):
            return jsonify({"status": "error", "message": f"Strikes inválidos: {strikes_raw}"}), 400

        # Inner strikes (higher put, lower call) are the "sell" legs and the
        # outer strikes the "buy" legs.
        sell_put, buy_put = max(put1, put2), min(put1, put2)
        sell_call, buy_call = min(call1, call2), max(call1, call2)

        df_chain = _load_chain(symbol, fecha, PATH_UBUNTU_CHAINS)
        if df_chain.empty:
            return jsonify({"status": "error", "message": "No se pudo cargar option chain"}), 400

        time_obj = pd.to_datetime(f"{fecha} {hora}")
        df_filtered = df_chain.reset_index()
        df_filtered = df_filtered[df_filtered["timestamp"] >= time_obj]

        def credito_neto(df_ts):
            """Net credit of the four legs inside a single-timestamp frame."""
            c1 = df_ts[df_ts["strike"] == sell_call]["avg_call"].iloc[0]
            c2 = df_ts[df_ts["strike"] == buy_call]["avg_call"].iloc[0]
            p1 = df_ts[df_ts["strike"] == sell_put]["avg_put"].iloc[0]
            p2 = df_ts[df_ts["strike"] == buy_put]["avg_put"].iloc[0]
            return (c1 - c2) + (p1 - p2)

        def obtener_credito(df, ts):
            """Credit at the first timestamp >= ``ts``; ``(None, None)`` on failure."""
            try:
                df_momento = df[df["timestamp"] >= ts]
                ts_primero = df_momento["timestamp"].min()
                return credito_neto(df[df["timestamp"] == ts_primero]), ts_primero
            except Exception as e:
                print(f"[ERROR crédito inicial]: {e}")
                return None, None

        # Initial credit
        credit_inicial, _ = obtener_credito(df_filtered, time_obj)
        if credit_inicial is None:
            return jsonify({"status": "error", "message": "No se pudo calcular el crédito inicial"}), 400

        # Evolution, one point per snapshot
        evolution = []
        for ts, group in df_filtered.groupby("timestamp"):
            try:
                credit_actual = credito_neto(group)
                underlying_price = group["underlying_price"].iloc[0]
                profit_loss = (credit_inicial - credit_actual) * 100

                evolution.append({
                    "timestamp": ts.strftime("%Y-%m-%d %H:%M:%S"),
                    "credit": float(round(credit_actual * 100, 2)),
                    "profit_loss": float(round(profit_loss, 2)),
                    "underlying_price": float(round(underlying_price, 2))
                })
            except Exception:
                # Best-effort: a snapshot missing any of the four legs is
                # skipped rather than failing the whole request.
                continue

        print(f"[TIMING] procesar_fila_ironcondor user_id={current_user.id} elapsed={round(_time.time()-_start,2)}s")
        return jsonify({
            "status": "success",
            "symbol": symbol,
            "fecha": fecha,
            "strikes": f"{sell_put}/{buy_put} {sell_call}/{buy_call}",
            "initial_credit": round(credit_inicial * 100, 2),
            "evolution": evolution,
        })

    except Exception as e:
        return jsonify({"status": "error", "message": str(e)}), 500


@backtestingIdea2_bp.route("/get_mejores_horarios", methods=["GET"])
@login_required
def get_mejores_horarios():
    """Return the weekly top-ranked entry times stored in a ``top3_horarios`` CSV.

    Query parameters: ``symbol`` (default ``SPX``), ``estrategia`` (default
    ``IronCondor``) and ``risk`` (default ``agresivo``; mapped to a short
    suffix, unknown values are used verbatim).

    Returns:
        * 200 with ``{"semanas": [...]}``, most recent ``end_date`` first.
          Each item carries ``semana`` (label of the week that starts 3 days
          after ``end_date``), ``end_date`` and ``rankings`` (``rank``,
          ``hora``, ``profit`` ordered by rank).
        * 400 when the parameters would produce a name containing a path
          separator.
        * 404 when the CSV does not exist.
        * 429 when the caller is rate limited.
    """
    print(f"[AUDIT] get_mejores_horarios user_id={current_user.id} email={current_user.email}")
    if _check_rate_limit(current_user.id, 'mejores_horarios'):
        return jsonify({"error": "Too many requests. Please wait a moment."}), 429
    symbol = request.args.get('symbol', 'SPX')
    estrategia = request.args.get('estrategia', 'IronCondor')
    risk = request.args.get('risk', 'agresivo').lower()

    riesgo_map = {
        'conservador': 'cons',
        'intermedio': 'inte',
        'agresivo': 'agre',
        'ultra_agresivo': 'ultr'
    }
    sufijo = riesgo_map.get(risk, risk)
    nombre_csv = f"top3_horarios_{symbol}_{estrategia}_{sufijo}.csv"

    # All three parts come straight from the query string: refuse anything
    # containing a path separator so the lookup cannot leave CSV_TOP3_BASE.
    if os.path.basename(nombre_csv) != nombre_csv:
        return jsonify({'error': 'Parámetros inválidos'}), 400

    csv_path = os.path.join(CSV_TOP3_BASE, nombre_csv)
    if not os.path.exists(csv_path):
        # Report only the file name; the absolute server path is not exposed.
        return jsonify({'error': f'CSV no encontrado: {nombre_csv}'}), 404

    df = pd.read_csv(csv_path)
    df['end_date'] = pd.to_datetime(df['end_date'])

    semanas = []
    for end_date, grupo in df.groupby('end_date'):
        # Application week = the Monday following end_date (a Friday),
        # through that week's Friday.
        lunes_aplicacion = end_date + pd.Timedelta(days=3)
        viernes_aplicacion = lunes_aplicacion + pd.Timedelta(days=4)
        label_semana = f"{lunes_aplicacion.strftime('%d/%m')} - {viernes_aplicacion.strftime('%d/%m/%Y')}"

        rankings = [
            {
                'rank': int(row['rank']),
                # The CSV may store the hour as a float (e.g. 1340.0).
                'hora': str(int(float(row['hora']))),
                'profit': float(row['profit']) if pd.notna(row.get('profit')) else None
            }
            for _, row in grupo.sort_values('rank').iterrows()
        ]

        semanas.append({
            'semana': label_semana,
            'end_date': end_date.strftime('%Y-%m-%d'),
            'rankings': rankings
        })

    # Most recent end_date first (ISO strings sort chronologically).
    semanas.sort(key=lambda x: x['end_date'], reverse=True)

    return jsonify({'semanas': semanas})


@backtestingIdea2_bp.route("/task_result/<task_id>", methods=["GET"])
@login_required
def task_result(task_id):
    """Report the state of a Celery task, or its result once it succeeded.

    Args:
        task_id: Id of the Celery task, taken from the URL.

    Returns:
        * ``{"state": "REVOKED", "result_ready": False}`` (200) for a task
          revoked by the user.
        * ``{"state": "FAILURE", "error": ...}`` (500) for a failed task.
        * ``{"state": <state>, "result_ready": False}`` while the task has
          not finished.
        * The task's own result payload once it succeeded; this is what the
          HTML front end expects.
    """
    task = AsyncResult(task_id, app=celery_app)

    # Read the state once and branch on that snapshot. Calling ``state``,
    # ``ready()`` and ``failed()`` separately can observe different states if
    # the task is revoked in between, which would end in ``jsonify`` being
    # handed an exception object.
    state = task.state

    # Task revoked by the user
    if state == 'REVOKED':
        return jsonify({"state": "REVOKED", "result_ready": False}), 200

    if state == 'FAILURE':
        return jsonify({
            "state": "FAILURE",
            "error": str(task.result)
        }), 500

    # Anything other than SUCCESS at this point is still in flight.
    if state != 'SUCCESS':
        return jsonify({
            "state": state,
            "result_ready": False
        })

    return jsonify(task.result)


@backtestingIdea2_bp.route("/cancel_task/<task_id>", methods=["POST"])
@login_required
def cancel_task(task_id):
    """Revoke a Celery task, asking the worker to terminate it with SIGTERM.

    Args:
        task_id: Id of the Celery task, taken from the URL.

    Returns:
        ``{"status": "cancelled", "task_id": ...}`` when the revoke request
        was sent, or ``{"status": "error", "message": ...}`` with HTTP 500
        when sending it raised.
    """
    try:
        celery_app.control.revoke(task_id, terminate=True, signal='SIGTERM')
    except Exception as exc:
        failure = {"status": "error", "message": str(exc)}
        return jsonify(failure), 500

    confirmation = {"status": "cancelled", "task_id": task_id}
    return jsonify(confirmation)