# from celery_app import celery_app
# from celery.result import AsyncResult
# import os
# import pandas as pd
# from flask import Blueprint, jsonify, request
# from datetime import datetime
# import math
# import gc
# from flask_login import login_required, current_user


# from functools import lru_cache
# import pyarrow as pa
# import pyarrow.parquet as pq
# import sys

# from tasks_backtestingidea2 import run_backtesting_idea

# import redis as redis_lib
# import datetime
# import pytz
# _rate_redis = redis_lib.Redis(host='localhost', port=6379, db=0, decode_responses=True)

# # ── Rate limiting ─────────────────────────────────────────────
# RATE_LIMIT_WINDOW      = 60   # segundos
# RATE_LIMIT_MAX_DIA     = 5    # 8am-6pm NY — uso normal, anti-bot
# RATE_LIMIT_MAX_NOCHE   = 30   # 6pm-8am NY — más permisivo
# RATE_LIMIT_WHITELIST   = {9}  # user_ids sin límite siempre (agregar con comas: {9, 15, 23})
# RATE_LIMIT_HORA_ON     = 8    # 8am NY — empieza horario restrictivo
# RATE_LIMIT_HORA_OFF    = 18   # 6pm NY — termina horario restrictivo
# # ─────────────────────────────────────────────────────────────

# def _check_rate_limit(user_id, endpoint, max_calls=8, window=60):
#     """Devuelve True si el usuario superó el límite."""
#     # NOTE(review): the max_calls/window parameters are never used — the
#     # effective window is the module-level RATE_LIMIT_WINDOW and the limit
#     # is chosen from the NY day/night schedule below. Either honor the
#     # parameters or drop them to avoid confusing callers.
#     if user_id in RATE_LIMIT_WHITELIST:
#         return False  # sin límite siempre

#     # Horario NY (maneja EST/EDT automáticamente)
#     tz_ny = pytz.timezone('America/New_York')
#     hora_ny = datetime.datetime.now(tz_ny).hour

#     # Seleccionar límite según horario
#     if RATE_LIMIT_HORA_ON <= hora_ny < RATE_LIMIT_HORA_OFF:
#         limite = RATE_LIMIT_MAX_DIA    # 8am-6pm: límite estricto
#     else:
#         limite = RATE_LIMIT_MAX_NOCHE  # 6pm-8am: límite permisivo

#     key = f"rl:{endpoint}:{user_id}"
#     count = _rate_redis.incr(key)
#     # TTL is only set on the first increment of the window; if that expire()
#     # call were ever lost the key would persist without a TTL — TODO confirm
#     # this is acceptable for the deployment.
#     if count == 1:
#         _rate_redis.expire(key, RATE_LIMIT_WINDOW)
#     if count > limite:
#         print(f"[RATE_LIMIT] endpoint={endpoint} user_id={user_id} count={count} limite={limite}")
#         return True
#     return False


# backtestingIdea2_bp = Blueprint('backtestingIdea2', __name__, template_folder='templates')

# # Ruta base para archivos
# PATH_UBUNTU_CHAINS = "/var/www/html/flask_project/"


# def _load_chain(symbol: str, fecha: str, base: str) -> pd.DataFrame:
#     # Load the option-chain parquet for one symbol/date from <base>/chains/.
#     # Index products (SPX/RUT/XSP) are stored with a "$" filename prefix.
#     # Returns an empty DataFrame when the parquet file does not exist.
#     folder = os.path.join(base, "chains")
#     prefix = "$" if symbol in ["SPX", "RUT", "XSP"] else ""
#     pq_path = os.path.join(folder, f"optionChain_{prefix}{symbol}_{fecha}.parquet")

#     # print(f"[DEBUG] Intentando cargar {symbol} para {fecha}")

#     if os.path.exists(pq_path):
#         # print(f"[DEBUG] Leyendo PARQUET desde {pq_path}")
#         df = pq.read_table(pq_path).to_pandas()
#     else:
#         # print(f"[ERROR] No existe PARQUET para {fecha} (ni CSV)")
#         return pd.DataFrame()

#     # Precompute bid/ask mid-prices used by every downstream credit calc.
#     df["avg_call"] = (df["bid_call"] + df["ask_call"]) / 2
#     df["avg_put"] = (df["bid_put"] + df["ask_put"]) / 2
#     # Downstream helpers call reset_index() and rely on this MultiIndex.
#     df.set_index(["timestamp", "strike"], inplace=True)

#     return df


# # ============================ FUNCIONES VERTICALES ============================


# def calcular_credito_y_underlying_desde_archivo_csv(archivo_option_chain, strike1, strike2, option_type, timeHour):
#     # Legacy CSV variant of the credit/underlying lookup: reads the chain
#     # from CSV, filters rows at/after timeHour, and returns
#     # (credit_received, underlying_price) from the first matching rows of
#     # strike1 and strike2.
#     # NOTE(review): the bare except returns (0, None) and silently swallows
#     # every error (missing file, missing strike, bad columns). The df-based
#     # variant returns (None, None) on error — the two sentinels are
#     # inconsistent; callers must know which one they are dealing with.
#     try:
#         option_chain_df = pd.read_csv(archivo_option_chain, usecols=range(9))
#         option_chain_df['timestamp'] = pd.to_datetime(option_chain_df['timestamp'])
#         filtered_chain = option_chain_df[option_chain_df['timestamp'] >= timeHour]
#         if filtered_chain.empty:
#             return 0, None

#         strikes_data = filtered_chain[(filtered_chain['strike'].isin([strike1, strike2]))]
#         if option_type == 'PUT':
#             strikes_data = strikes_data[["strike", "bid_put", "ask_put", "underlying_price"]]
#         elif option_type == 'CALL':
#             strikes_data = strikes_data[["strike", "bid_call", "ask_call", "underlying_price"]]

#         strike1_data = strikes_data[strikes_data['strike'] == strike1].iloc[0]
#         strike2_data = strikes_data[strikes_data['strike'] == strike2].iloc[0]

#         if option_type == 'PUT':
#             strike1_avg = (strike1_data['bid_put'] + strike1_data['ask_put']) / 2
#             strike2_avg = (strike2_data['bid_put'] + strike2_data['ask_put']) / 2
#         elif option_type == 'CALL':
#             strike1_avg = (strike1_data['bid_call'] + strike1_data['ask_call']) / 2
#             strike2_avg = (strike2_data['bid_call'] + strike2_data['ask_call']) / 2

#         credit_received = strike1_avg - strike2_avg
#         underlying_price = strike1_data['underlying_price']

#         return credit_received, underlying_price
#     except:
#         return 0, None


# # ========================= CREDIT TARGET HELPERS =========================

# def _buscar_credito_con_target_vertical(df_chain, strike1, strike2, option_type, entry_time, credit_target):
#     """
#     Itera timestamps desde entry_time buscando el primero cuyo crédito >= credit_target.
#     - Primer timestamp que cumple: devuelve el crédito real.
#     - Timestamps posteriores que cumplen: devuelve credit_target / 100 (igual que backtestingCreditLimit).
#     - Ninguno cumple: devuelve (None, None, None) → NO_ENTRY.
#     """
#     # NOTE(review): credit_target is expressed in dollars (compared against
#     # credit * 100) while the returned credit is per-share — hence the
#     # credit_target / 100 on the late-fill branch. Keep units in mind when
#     # editing.
#     try:
#         df = df_chain.reset_index()
#         # Snap the entry time to second 30 so comparisons align with the
#         # chain's snapshot timestamps — TODO confirm snapshot cadence.
#         entry_time = entry_time.replace(second=30, microsecond=0)
#         df_filtered = df[df['timestamp'] >= entry_time]
#         col = f'avg_{option_type.lower()}'

#         es_primer_ts = True
#         for ts, snap in df_filtered.groupby('timestamp'):
#             s1 = snap[snap['strike'] == strike1]
#             s2 = snap[snap['strike'] == strike2]
#             if s1.empty or s2.empty:
#                 continue

#             credit = s1[col].iloc[0] - s2[col].iloc[0]
#             credit_dollars = credit * 100

#             if credit_dollars >= credit_target:
#                 underlying = s1['underlying_price'].iloc[0]
#                 entry_ts = pd.Timestamp(ts).strftime('%H:%M')
#                 if es_primer_ts:
#                     return credit, underlying, entry_ts
#                 else:
#                     return credit_target / 100, underlying, entry_ts

#             es_primer_ts = False

#         return None, None, None
#     except Exception as e:
#         print(f"[ERROR] _buscar_credito_con_target_vertical(): {e}")
#         return None, None, None


# def _buscar_credito_con_target_ic(df_chain, strikes, entry_time, credit_target):
#     """
#     Igual que _buscar_credito_con_target_vertical pero para Iron Condor.
#     """
#     try:
#         df = df_chain.reset_index()
#         entry_time = entry_time.replace(second=30, microsecond=0)
#         df_filtered = df[df['timestamp'] >= entry_time]

#         all_strikes = [
#             strikes['Call_Strike1'], strikes['Call_Strike2'],
#             strikes['Put_Strike1'],  strikes['Put_Strike2']
#         ]

#         es_primer_ts = True
#         for ts, snap in df_filtered.groupby('timestamp'):
#             rows = snap[snap['strike'].isin(all_strikes)]
#             # All four legs must be present in this snapshot to price it.
#             if len(rows['strike'].unique()) < 4:
#                 continue

#             def mid(s, col): return rows[rows['strike'] == s][col].iloc[0]

#             credit_call = mid(strikes['Call_Strike1'], 'avg_call') - mid(strikes['Call_Strike2'], 'avg_call')
#             credit_put  = mid(strikes['Put_Strike1'],  'avg_put')  - mid(strikes['Put_Strike2'],  'avg_put')
#             credit_total = (credit_call + credit_put) * 100

#             if credit_total >= credit_target:
#                 # NOTE(review): underlying is taken from the first matching
#                 # strike row of the snapshot, whichever leg that happens to be.
#                 underlying = rows.iloc[0]['underlying_price']
#                 entry_ts = pd.Timestamp(ts).strftime('%H:%M')
#                 if es_primer_ts:
#                     return credit_call + credit_put, underlying, entry_ts
#                 else:
#                     return credit_target / 100, underlying, entry_ts

#             es_primer_ts = False

#         return None, None, None
#     except Exception as e:
#         print(f"[ERROR] _buscar_credito_con_target_ic(): {e}")
#         return None, None, None


# def calcular_precios_verticales(vertical_strikes_df, path_csv_base, desplazamiento, desde, hasta, credit_target=0):
#     # Backtests vertical credit spreads day by day over [desde, hasta]:
#     # applies the strike displacement, prices the entry credit, computes the
#     # end-of-day P/L and ITM/OTM status, and returns
#     # (df_resultados, profit_total, positivos, negativos).
#     import gc
#     resultados = []
#     symbol = vertical_strikes_df['Symbol'].iloc[0]
#     vertical_strikes_df['Day'] = pd.to_datetime(vertical_strikes_df['Day'])

#     # Filtrar rango de fechas
#     vertical_strikes_df = vertical_strikes_df[
#         (vertical_strikes_df['Day'] >= pd.to_datetime(desde)) &
#         (vertical_strikes_df['Day'] <= pd.to_datetime(hasta))
#     ].copy()

#     # Aplicar desplazamientos
#     # CALLs shift up, PUTs shift down — displacement moves the spread away
#     # from the money on both sides.
#     vertical_strikes_df['Strike1_Desplazado'] = vertical_strikes_df.apply(
#         lambda row: row['Strike1'] + desplazamiento if row['Option_Type'] == 'CALL' else row['Strike1'] - desplazamiento,
#         axis=1)
#     vertical_strikes_df['Strike2_Desplazado'] = vertical_strikes_df.apply(
#         lambda row: row['Strike2'] + desplazamiento if row['Option_Type'] == 'CALL' else row['Strike2'] - desplazamiento,
#         axis=1)

#     df_cache = {}  # Cache optionChain
#     # NOTE(review): the cache entry for each date is deleted at the end of
#     # every iteration (memory control), so the cache never actually serves a
#     # second row with the same date — confirm that is intended.

#     for _, row in vertical_strikes_df.iterrows():
#         fecha = pd.to_datetime(row['Day']).strftime('%Y-%m-%d')
#         hora = row['Hour']
#         specific_time = pd.to_datetime(f"{row['Day']} {hora}")
#         strike1 = row['Strike1_Desplazado']
#         strike2 = row['Strike2_Desplazado']
#         option_type = row['Option_Type']

#         # Load optionChain
#         if fecha not in df_cache:
#             try:
#                 df_cache[fecha] = _load_chain(symbol, fecha, path_csv_base)
#             except:
#                 df_cache[fecha] = pd.DataFrame()

#         df_chain = df_cache[fecha]

#         credit_result, underlying_price_specific_time = calcular_credito_y_underlying_desde_df(
#             df_chain, strike1, strike2, option_type, specific_time)

#         # ✅ Validar ambos: strikes faltantes o underlying vacío
#         if credit_result is None or underlying_price_specific_time is None:

#             continue

#         underlying_price_16 = obtener_ultimo_underlying_price_desde_df(df_chain)
#         spreadDiff = abs(strike1 - strike2)

#         # 🚫 Validar crédito máximo
#         # A credit larger than the spread width is impossible — bad data.
#         if credit_result > spreadDiff:
#             continue

#         # 🎯 Credit Target: iterar timestamps buscando el primero que cumpla
#         entry_ts = hora  # default: hora fija/dinámica
#         if credit_target > 0:
#             credit_result, underlying_price_specific_time, entry_ts_found = _buscar_credito_con_target_vertical(
#                 df_chain, strike1, strike2, option_type, specific_time, credit_target
#             )
#             if credit_result is None:
#                 # Ningún timestamp del día cumplió el target → NO_ENTRY
#                 resultados.append({
#                     'Day': fecha,
#                     'Time': hora,
#                     'Strikes': f"{strike1}/{strike2}",
#                     'Option': option_type,
#                     'Credit': None,
#                     'Price': underlying_price_specific_time,
#                     'Close': underlying_price_16,
#                     'P/L': 0.0,
#                     'Diff': round(underlying_price_16 - strike1, 2) if underlying_price_16 else None,
#                     'itmOtm': 'NO_ENTRY',
#                     "movimiento_esperado": row["movimiento_esperado"],
#                     "score_30min": row["score_30min"],
#                 })
#                 if fecha in df_cache:
#                     del df_cache[fecha]
#                     gc.collect()
#                 continue
#             entry_ts = entry_ts_found

#         # End-of-day P/L: full loss beyond the long strike, full credit kept
#         # when OTM, partial otherwise (credit minus intrinsic value).
#         if underlying_price_16 is not None:
#             if option_type == 'CALL':
#                 if underlying_price_16 > max(strike1, strike2):
#                     ganancia_dia = (credit_result - spreadDiff) * 100
#                 elif underlying_price_16 < min(strike1, strike2):
#                     ganancia_dia = credit_result * 100
#                 else:
#                     valor_itm = (underlying_price_16 - min(strike1, strike2)) * 100
#                     ganancia_dia = credit_result * 100 - valor_itm
#             else:  # PUT
#                 if underlying_price_16 < min(strike1, strike2):
#                     ganancia_dia = (credit_result - spreadDiff) * 100
#                 elif underlying_price_16 > max(strike1, strike2):
#                     ganancia_dia = credit_result * 100
#                 else:
#                     valor_itm = (max(strike1, strike2) - underlying_price_16) * 100
#                     ganancia_dia = credit_result * 100 - valor_itm
#         else:
#             ganancia_dia = None

#         # ITM/OTM status
#         if underlying_price_16 is not None:
#             if option_type == 'CALL':
#                 if underlying_price_16 > max(strike1, strike2):
#                     itmOtm = "ITM"
#                 elif underlying_price_16 < min(strike1, strike2):
#                     itmOtm = "OTM"
#                 else:
#                     itmOtm = "ITM P"
#             else:
#                 if underlying_price_16 < min(strike1, strike2):
#                     itmOtm = "ITM"
#                 elif underlying_price_16 > max(strike1, strike2):
#                     itmOtm = "OTM"
#                 else:
#                     itmOtm = "ITM P"
#         else:
#             itmOtm = "ERROR"

#         resultados.append({
#             'Day': fecha,
#             'Time': entry_ts,
#             'Strikes': f"{strike1}/{strike2}",
#             'Option': option_type,
#             'Credit': round(credit_result * 100, 2),
#             'Price': underlying_price_specific_time,
#             'Close': underlying_price_16,
#             'P/L': round(ganancia_dia, 2) if ganancia_dia is not None else None,
#             # NOTE(review): `if underlying_price_16` is a truthiness test, so
#             # a (theoretical) close of 0.0 would be treated as missing.
#             'Diff': round(underlying_price_16 - strike1, 2) if underlying_price_16 else None,
#             'itmOtm': itmOtm,
#             "movimiento_esperado": row["movimiento_esperado"],
#             "score_30min": row["score_30min"],

#         })

#         # Liberar memoria si es necesario
#         if fecha in df_cache:
#             del df_cache[fecha]
#             gc.collect()

#     df_resultados = pd.DataFrame(resultados)

#     if df_resultados.empty:
#         return df_resultados, 0.0, 0, 0

#     # ✅ Filtro de respaldo por si algo se cuela
#     # NOTE(review): this also drops the NO_ENTRY rows appended above, since
#     # those carry Credit=None — confirm that is the intended final output.
#     df_resultados = df_resultados[
#         df_resultados['Credit'].notnull() &
#         df_resultados['Price'].notnull()
#     ].copy()



#     profit_total = df_resultados['P/L'].sum(skipna=True)
#     positivos = df_resultados[df_resultados['P/L'] >= 0].shape[0]
#     negativos = df_resultados[df_resultados['P/L'] < 0].shape[0]

#     return df_resultados, profit_total, positivos, negativos


# def calcular_credito_y_underlying_desde_df(df, strike1, strike2, option_type, timeHour):
#     # Prices a vertical at the first snapshot at/after timeHour using the
#     # precomputed avg_{call,put} mid columns from _load_chain.
#     # Returns (credit_received, underlying_price).
#     # NOTE(review): failure sentinels are inconsistent — missing strikes
#     # return (0, None) while an exception returns (None, None); callers in
#     # this file only check for None.
#     try:
#         # Asegura segundos en 00 si es necesario
#         timeHour = timeHour.replace(second=15, microsecond=0)
#         df_debug = df.reset_index()  # en parquet ya tiene index = ["timestamp", "strike"]

#         filtered_chain = df_debug[df_debug['timestamp'] >= timeHour]

#         strikes_data = filtered_chain[filtered_chain['strike'].isin([strike1, strike2])]

#         # print ("strikes_data:", strikes_data)

#         if strikes_data.empty or len(strikes_data['strike'].unique()) < 2:
#             print("[WARNING] → No se encontraron ambos strikes.")
#             return 0, None

#         strike1_data = strikes_data[strikes_data['strike'] == strike1].iloc[0]
#         strike2_data = strikes_data[strikes_data['strike'] == strike2].iloc[0]

#         # MOSTRAR EL TIMESTAMP REAL DE LAS FILAS USADAS
#         # print(f"[DEBUG] → Timestamp usado strike1: {strike1_data['timestamp']}")
#         # print(f"[DEBUG] → Timestamp usado strike2: {strike2_data['timestamp']}")

#         avg1 = strike1_data[f'avg_{option_type.lower()}']
#         avg2 = strike2_data[f'avg_{option_type.lower()}']
#         credit_received = avg1 - avg2
#         underlying_price = strike1_data['underlying_price']
#         return credit_received, underlying_price
#     except Exception as e:
#         print(f"[ERROR] calcular_credito_y_underlying_desde_df(): {e}")
#         return None, None


# def obtener_ultimo_underlying_price_desde_df(df):
#     # Last underlying price of the day (rows sorted by timestamp).
#     # Returns None on any failure (e.g. empty df) via the bare except.
#     try:
#         return df.sort_values(by='timestamp')['underlying_price'].iloc[-1]
#     except:
#         return None

# # ============================ FUNCIONES IRON CONDOR ============================


# def calcular_ganancia_perdida_dia_iron_condor(credit_result, underlying_price_16, strikes):
#     # End-of-day P/L (in dollars) for an iron condor given the closing
#     # underlying price. Returns None when either input is missing.
#     if underlying_price_16 is None or credit_result is None:
#         return None

#     diff_put = strikes['Put_Strike1'] - strikes['Put_Strike2']
#     diff_call = strikes['Call_Strike2'] - strikes['Call_Strike1']

#     # Beyond either long wing: max loss (credit minus the wider spread).
#     if underlying_price_16 < strikes['Put_Strike2'] or underlying_price_16 > strikes['Call_Strike2']:
#         return (credit_result - max(diff_put, diff_call)) * 100
#     # Between the short strikes: full credit kept.
#     if strikes['Put_Strike1'] <= underlying_price_16 <= strikes['Call_Strike1']:
#         return credit_result * 100
#     # Inside a wing: credit minus intrinsic, capped at the spread width.
#     if strikes['Put_Strike2'] <= underlying_price_16 < strikes['Put_Strike1']:
#         return (credit_result - min(strikes['Put_Strike1'] - underlying_price_16, diff_put)) * 100
#     if strikes['Call_Strike1'] < underlying_price_16 <= strikes['Call_Strike2']:
#         return (credit_result - min(underlying_price_16 - strikes['Call_Strike1'], diff_call)) * 100

#     # Presumably unreachable for a well-ordered condor
#     # (Put2 <= Put1 <= Call1 <= Call2) — kept as a defensive fallback.
#     return None


# def calcular_precios_iron_condor(df, path_csv_base, desplazamiento, desde, hasta, credit_target=0):
#     # Iron-condor counterpart of calcular_precios_verticales: per-day entry
#     # credit, end-of-day P/L and ITM/OTM status over [desde, hasta].
#     # Returns (df_resultados, profit_total, positivos, negativos).
#     import gc

#     resultados = []
#     symbol = df['Symbol'].iloc[0]
#     df['Day'] = pd.to_datetime(df['Day'])
#     df = df[(df['Day'] >= pd.to_datetime(desde)) & (df['Day'] <= pd.to_datetime(hasta))].copy()

#     df_cache = {}
#     # NOTE(review): as in the vertical version, the cache entry is deleted at
#     # the end of every iteration, so it never serves a repeated date.

#     for _, row in df.iterrows():
#         fecha = row['Day'].strftime('%Y-%m-%d')
#         hora = row['Hour']
#         specific_time = pd.to_datetime(f"{fecha} {hora}")

#         # Displacement widens the condor: calls shift up, puts shift down.
#         strikes = {
#             'Call_Strike1': row['Call_Strike1'] + desplazamiento,
#             'Call_Strike2': row['Call_Strike2'] + desplazamiento,
#             'Put_Strike1': row['Put_Strike1'] - desplazamiento,
#             'Put_Strike2': row['Put_Strike2'] - desplazamiento
#         }

#         # 🔍 DEBUG TEMPORAL


#         # 🚫 Validar estructura Iron Condor: short put <= short call
#         short_put  = strikes['Put_Strike1']   # pata corta del put spread
#         short_call = strikes['Call_Strike1']  # pata corta del call spread
#         if short_put > short_call:

#             continue

#         if fecha not in df_cache:
#             try:
#                 df_cache[fecha] = _load_chain(symbol, fecha, path_csv_base)
#             except:
#                 df_cache[fecha] = pd.DataFrame()

#         df_chain = df_cache[fecha]

#         credit_result, underlying_price, _, _ = calcular_credito_y_underlying_iron_condor_df(
#             df_chain, strikes, specific_time)

#         # ✅ Validar ambos: si no hay strikes o underlying → SKIP
#         if credit_result is None or underlying_price is None:

#             continue

#         # 🚫 Validar crédito máximo
#         # Credit cannot exceed the narrower wing — otherwise the data is bad.
#         put_spread = strikes['Put_Strike1'] - strikes['Put_Strike2']
#         call_spread = strikes['Call_Strike2'] - strikes['Call_Strike1']
#         max_credit = min(put_spread, call_spread)
#         if credit_result > max_credit:

#             continue

#         # 🎯 Credit Target: iterar timestamps buscando el primero que cumpla
#         underlying_price_16 = obtener_ultimo_underlying_price_desde_df(df_chain)
#         entry_ts = hora  # default: hora fija/dinámica
#         if credit_target > 0:
#             credit_result, underlying_price, entry_ts_found = _buscar_credito_con_target_ic(
#                 df_chain, strikes, specific_time, credit_target
#             )
#             if credit_result is None:
#                 # Ningún timestamp del día cumplió el target → NO_ENTRY
#                 resultados.append({
#                     'Day': fecha,
#                     'Time': hora,
#                     'Strikes': f"{strikes['Put_Strike2']}/{strikes['Put_Strike1']} {strikes['Call_Strike1']}/{strikes['Call_Strike2']}",
#                     'Price': None,
#                     'Close': underlying_price_16,
#                     'Credit': None,
#                     'P/L': 0.0,
#                     'Dif': None,
#                     'itmOtm': 'NO_ENTRY',
#                     "movimiento_esperado": row.get("movimiento_esperado", None),
#                     "score_30min": row.get("score_30min", None),
#                 })
#                 # NOTE(review): unguarded delete (vs. the `if fecha in
#                 # df_cache` guard used in the vertical version) — safe here
#                 # because the key was just inserted above, but inconsistent.
#                 del df_cache[fecha]
#                 gc.collect()
#                 continue
#             entry_ts = entry_ts_found

#         ganancia_dia = calcular_ganancia_perdida_dia_iron_condor(
#             credit_result, underlying_price_16, strikes
#         )

#         if underlying_price_16 is not None:
#             if underlying_price_16 < strikes['Put_Strike2'] or underlying_price_16 > strikes['Call_Strike2']:
#                 itmOtm = "ITM"
#             elif strikes['Put_Strike1'] <= underlying_price_16 <= strikes['Call_Strike1']:
#                 itmOtm = "OTM"
#             else:
#                 itmOtm = "ITM P"
#         else:
#             itmOtm = "N/A"

#         dif_call = f"{(underlying_price_16 - strikes['Call_Strike1']):.2f}" if underlying_price_16 else None
#         dif_put = f"{(underlying_price_16 - strikes['Put_Strike1']):.2f}" if underlying_price_16 else None

#         resultados.append({
#             'Day': fecha,
#             'Time': entry_ts,
#             'Strikes': f"{strikes['Put_Strike2']}/{strikes['Put_Strike1']} {strikes['Call_Strike1']}/{strikes['Call_Strike2']}",
#             'Price': underlying_price,
#             'Close': underlying_price_16,
#             'Credit': round(credit_result * 100, 2),
#             'P/L': round(ganancia_dia, 2) if isinstance(ganancia_dia, (int, float)) else None,
#             'Dif': f"{dif_put}(P) / {dif_call}(C)",
#             'itmOtm': itmOtm,
#             "movimiento_esperado": row.get("movimiento_esperado", None),
#             "score_30min": row.get("score_30min", None),
#         })

#         del df_cache[fecha]
#         gc.collect()

#     df_resultados = pd.DataFrame(resultados)

#     if df_resultados.empty:
#         return df_resultados, 0.0, 0, 0

#     # ✅ Filtro de respaldo: nada de Credit o Price nulos
#     # NOTE(review): this also drops the NO_ENTRY rows (Credit=None) appended
#     # above — confirm that is the intended final output.
#     df_resultados = df_resultados[
#         df_resultados['Credit'].notnull() &
#         df_resultados['Price'].notnull()
#     ].copy()



#     profit_total = df_resultados['P/L'].sum(skipna=True)
#     positivos = df_resultados[df_resultados['P/L'] >= 0].shape[0]
#     negativos = df_resultados[df_resultados['P/L'] < 0].shape[0]

#     return df_resultados, profit_total, positivos, negativos


# def calcular_credito_y_underlying_iron_condor_df(df, strikes, timeHour):
#     # Prices all four condor legs at the first snapshot at/after timeHour.
#     # Returns (credit_received, underlying_price, credit_calls, credit_puts),
#     # or four Nones when data is missing or an error occurs.
#     try:
#         df_debug = df.reset_index()  # porque el parquet ya está indexado por ["timestamp", "strike"]
#         filtered_chain = df_debug[df_debug['timestamp'] >= timeHour]
#         if filtered_chain.empty:
#             print("[WARNING] No hay datos después del timeHour")
#             return None, None, None, None

#         strikes_data = filtered_chain[filtered_chain['strike'].isin([
#             strikes['Call_Strike1'], strikes['Call_Strike2'],
#             strikes['Put_Strike1'], strikes['Put_Strike2']
#         ])]

#         # Pull the mid price of each leg; key prefix decides call vs put side.
#         strike_values = {}
#         for key, strike in strikes.items():
#             row = strikes_data[strikes_data['strike'] == strike]
#             if row.empty:
#                 strike_values[key] = None
#             elif key.startswith("Call"):
#                 strike_values[key] = row["avg_call"].iloc[0]
#             else:
#                 strike_values[key] = row["avg_put"].iloc[0]

#         if any(v is None for v in strike_values.values()):
#             print("[WARNING] Faltan strikes necesarios para calcular crédito.")
#             return None, None, None, None

#         credit_calls = strike_values['Call_Strike1'] - strike_values['Call_Strike2']
#         credit_puts = strike_values['Put_Strike1'] - strike_values['Put_Strike2']
#         credit_received = credit_calls + credit_puts
#         # NOTE(review): underlying comes from the first matching strike row,
#         # not necessarily the same timestamp as every priced leg.
#         underlying_price = strikes_data['underlying_price'].iloc[0]
#         return credit_received, underlying_price, credit_calls, credit_puts

#     except Exception as e:
#         print(f"[ERROR] calcular_credito_y_underlying_iron_condor_df(): {e}")
#         return None, None, None, None


# # ============================ MEJORES HORARIOS ============================

# CSV_TOP3_BASE = "/var/www/html/backtestingmarket"

# def obtener_hora_desde_csv(symbol, estrategia, risk, fecha_dia):
#     """
#     Dado un día de backtesting, busca en el CSV de top3_horarios el registro
#     con rank=1 cuyo end_date corresponde al viernes de la semana anterior.
#     Retorna el horario como string (ej: '1320') o None si no hay registro.
#     """
#     try:
#         # Mapear riesgo al sufijo del archivo
#         riesgo_map = {
#             'conservador': 'cons',
#             'intermedio': 'inte',
#             'agresivo': 'agre',
#             'ultra_agresivo': 'ultr'
#         }
#         # Unknown risk levels fall through to the raw lowercase value.
#         sufijo = riesgo_map.get(risk.lower(), risk.lower())
#         csv_path = os.path.join(CSV_TOP3_BASE, f"top3_horarios_{symbol}_{estrategia}_{sufijo}.csv")

#         if not os.path.exists(csv_path):
#             print(f"[MEJORES_HORARIOS] CSV no encontrado: {csv_path}")
#             return None

#         df_top = pd.read_csv(csv_path)
#         df_top['end_date'] = pd.to_datetime(df_top['end_date'])

#         # El viernes de la semana anterior al día actual
#         # Monday of the current week minus 3 days lands on last week's Friday.
#         fecha = pd.to_datetime(fecha_dia)
#         dias_hasta_lunes = fecha.weekday()  # lunes=0, viernes=4
#         lunes_semana_actual = fecha - pd.Timedelta(days=dias_hasta_lunes)
#         viernes_semana_anterior = lunes_semana_actual - pd.Timedelta(days=3)  # viernes = lunes - 3

#         # Buscar rank=1 con ese end_date
#         fila = df_top[(df_top['end_date'] == viernes_semana_anterior) & (df_top['rank'] == 1)]

#         if fila.empty:
#             print(f"[MEJORES_HORARIOS] Sin registro rank=1 para end_date={viernes_semana_anterior.date()} ({fecha_dia})")
#             return None

#         hora_raw = fila.iloc[0]['hora']
#         # Convertir float como 1320.0 → '1320'
#         return str(int(float(hora_raw)))

#     except Exception as e:
#         print(f"[ERROR] obtener_hora_desde_csv(): {e}")
#         return None


# # ============================ RUTA PRINCIPAL ============================

# @backtestingIdea2_bp.route('/get_backtesting_idea', methods=['GET'])
# @login_required
# def get_backtesting_idea():
#     # Queues a Celery backtesting task (queue "backtesting_idea2") and
#     # returns its task_id. Two modes: HORA_FIJA validates the per-hour
#     # strikes file up front; MEJORES_HORARIOS resolves hours inside the task.
#     print(f"[AUDIT] get_backtesting_idea user_id={current_user.id} email={current_user.email}")
#     if _check_rate_limit(current_user.id, 'backtesting_idea'):
#         return jsonify({"error": "Too many requests. Please wait a moment."}), 429
#     from tasks_backtestingidea2 import run_backtesting_idea

#     desde = request.args.get('desde')
#     hasta = request.args.get('hasta')
#     symbol = request.args.get('symbol', 'SPX')
#     estrategia = request.args.get('estrategia', 'Vertical')
#     timeHour = request.args.get('hora', '1340')
#     risk = request.args.get('risk', 'Intermedio').lower()
#     modo = request.args.get('modo', 'HORA_FIJA')  # HORA_FIJA | MEJORES_HORARIOS
#     credit_target = float(request.args.get('credit_target', 0) or 0)

#     if modo == 'MEJORES_HORARIOS':
#         # En modo dinámico no validamos archivo por hora fija,
#         # la hora se resolverá semana a semana en el task
#         job = run_backtesting_idea.apply_async(
#             kwargs=dict(
#                 desde=desde,
#                 hasta=hasta,
#                 symbol=symbol,
#                 estrategia=estrategia,
#                 timeHour=None,
#                 risk=risk,
#                 modo='MEJORES_HORARIOS',
#                 credit_target=credit_target,
#             ),
#             queue="backtesting_idea2"
#         )
#     else:
#         # Modo HORA_FIJA: comportamiento original
#         # NOTE(review): symbol/estrategia/timeHour come straight from the
#         # query string and are interpolated into a filesystem path — consider
#         # whitelisting the allowed values to rule out path traversal.
#         archivo = f"/var/www/html/backtestingmarket/predictor_data/makekos/{symbol}/{symbol}_{estrategia}_strikes_{timeHour}.csv"
#         if not os.path.exists(archivo):
#             return jsonify({'error': 'Archivo de strikes no encontrado'}), 404

#         job = run_backtesting_idea.apply_async(
#             kwargs=dict(
#                 desde=desde,
#                 hasta=hasta,
#                 symbol=symbol,
#                 estrategia=estrategia,
#                 timeHour=timeHour,
#                 risk=risk,
#                 modo='HORA_FIJA',
#                 credit_target=credit_target,
#             ),
#             queue="backtesting_idea2"
#         )

#     return jsonify({
#         "status": "queued",
#         "task_id": job.id,
#         "modo": modo
#     })


# def obtener_ultimo_underlying_price(archivo_option_chain):
#     # CSV variant of obtener_ultimo_underlying_price_desde_df: last
#     # underlying price of the file, or None on any failure (bare except).
#     try:
#         option_chain_df = pd.read_csv(archivo_option_chain)
#         option_chain_df['timestamp'] = pd.to_datetime(option_chain_df['timestamp'])
#         option_chain_df = option_chain_df.sort_values(by='timestamp')
#         return option_chain_df['underlying_price'].iloc[-1] if 'underlying_price' in option_chain_df.columns else None
#     except:
#         return None


# @backtestingIdea2_bp.route("/procesar_fila_vertical", methods=["POST"])
# @login_required
# def procesar_fila_vertical():
#     # Replays one backtested vertical row intraday: recomputes the credit at
#     # every snapshot from the entry time onward and returns the P/L
#     # evolution as JSON.
#     print(f"[AUDIT] procesar_fila_vertical user_id={current_user.id} email={current_user.email}")
#     if _check_rate_limit(current_user.id, 'procesar_vertical'):
#         return jsonify({"error": "Too many requests. Please wait a moment."}), 429
#     import time as _time
#     _start = _time.time()
#     data = request.get_json()
#     try:
#         symbol = data.get("symbol")
#         fecha = data.get("Day")
#         hora = data.get("Time")
#         tipo = data.get("Option")  # "CALL" o "PUT"
#         print(fecha)

#         print("📌 [DEBUG] Day recibido del cliente:", fecha)
#         # print("📌 [DEBUG] Time recibido del cliente:", hora)
#         print("simbolo:", symbol)
#         print("fecha:", fecha)
#         strikes_raw = data.get("Strikes", "")
#         strike1, strike2 = [float(s) for s in strikes_raw.split("/")]

#         # Normalize leg order: short leg first (higher strike for PUT,
#         # lower strike for CALL).
#         if tipo == "PUT" and strike1 < strike2:
#             strike1, strike2 = strike2, strike1
#         if tipo == "CALL" and strike1 > strike2:
#             strike1, strike2 = strike2, strike1

#         df_chain = _load_chain(symbol, fecha, PATH_UBUNTU_CHAINS)
#         if df_chain.empty:
#             return jsonify({"status": "error", "message": "No se pudo cargar option chain"}), 400

#         time_obj = pd.to_datetime(f"{fecha} {hora}")
#         credit_inicial, _ = calcular_credito_y_underlying_desde_df(df_chain, strike1, strike2, tipo, time_obj)
#         if not isinstance(credit_inicial, (int, float)):
#             return jsonify({"status": "error", "message": "No se pudo calcular el crédito inicial"}), 400

#         evolution = []
#         df_filtered = df_chain.reset_index()
#         df_filtered = df_filtered[df_filtered["timestamp"] >= time_obj]

#         for ts, group in df_filtered.groupby("timestamp"):
#             try:
#                 avg1 = group[group["strike"] == strike1][f"avg_{tipo.lower()}"].iloc[0]
#                 avg2 = group[group["strike"] == strike2][f"avg_{tipo.lower()}"].iloc[0]
#                 underlying_price = group["underlying_price"].iloc[0]

#                 credit_actual = avg1 - avg2
#                 profit_loss = (credit_inicial - credit_actual) * 100
#                 evolution.append({
#                     "timestamp": ts.strftime("%Y-%m-%d %H:%M:%S"),
#                     "credit": float(round(credit_actual * 100, 2)),
#                     "profit_loss": float(round(profit_loss, 2)),
#                     "underlying_price": float(round(underlying_price, 2))

#                 })
#             # Bare except: snapshots missing either strike are skipped, but
#             # any other error in the loop is silently swallowed too.
#             except:
#                 continue

#         print(f"[TIMING] procesar_fila_vertical user_id={current_user.id} elapsed={round(_time.time()-_start,2)}s")
#         return jsonify({
#             "status": "success",
#             "symbol": symbol,
#             "fecha": fecha,
#             "strikes": f"{strike1}/{strike2}",
#             "tipo": tipo,
#             "initial_credit": round(credit_inicial * 100, 2),
#             "evolution": evolution,

#         })

#     except Exception as e:
#         return jsonify({"status": "error", "message": str(e)}), 500


# @backtestingIdea2_bp.route("/procesar_fila_ironcondor", methods=["POST"])
# @login_required
# def procesar_fila_ironcondor():
#     print(f"[AUDIT] procesar_fila_ironcondor user_id={current_user.id} email={current_user.email}")
#     if _check_rate_limit(current_user.id, 'procesar_ironcondor'):
#         return jsonify({"error": "Too many requests. Please wait a moment."}), 429
#     import time as _time
#     _start = _time.time()
#     data = request.get_json()
#     try:
#         symbol = data.get("symbol")
#         fecha = data.get("Day")
#         hora = data.get("Time")
#         strikes_raw = data.get("Strikes", "").strip()

#         # ✅ Separar bloques PUT y CALL
#         try:
#             put_block, call_block = strikes_raw.split(" ")
#             put1, put2 = [float(s) for s in put_block.split("/")]
#             call1, call2 = [float(s) for s in call_block.split("/")]

#             sell_put, buy_put = max(put1, put2), min(put1, put2)
#             sell_call, buy_call = min(call1, call2), max(call1, call2)
#         except Exception:
#             return jsonify({"status": "error", "message": f"Strikes inválidos: {strikes_raw}"}), 400

#         df_chain = _load_chain(symbol, fecha, PATH_UBUNTU_CHAINS)
#         if df_chain.empty:
#             return jsonify({"status": "error", "message": "No se pudo cargar option chain"}), 400

#         time_obj = pd.to_datetime(f"{fecha} {hora}")
#         df_filtered = df_chain.reset_index()
#         df_filtered = df_filtered[df_filtered["timestamp"] >= time_obj]

#         # ✅ Calcular crédito inicial
#         def obtener_credito(df, ts):
#             try:
#                 df_momento = df[df["timestamp"] >= ts]
#                 ts_primero = df_momento["timestamp"].min()
#                 df_ts = df[df["timestamp"] == ts_primero]

#                 c1 = df_ts[df_ts["strike"] == sell_call]["avg_call"].iloc[0]
#                 c2 = df_ts[df_ts["strike"] == buy_call]["avg_call"].iloc[0]
#                 p1 = df_ts[df_ts["strike"] == sell_put]["avg_put"].iloc[0]
#                 p2 = df_ts[df_ts["strike"] == buy_put]["avg_put"].iloc[0]
#                 return (c1 - c2) + (p1 - p2), ts_primero
#             except Exception as e:
#                 print(f"[ERROR crédito inicial]: {e}")
#                 return None, None

#         credit_inicial, timestamp_inicial = obtener_credito(df_filtered, time_obj)
#         if credit_inicial is None:
#             return jsonify({"status": "error", "message": "No se pudo calcular el crédito inicial"}), 400

#         # ✅ Generar evolución
#         evolution = []
#         for ts, group in df_filtered.groupby("timestamp"):
#             try:
#                 c1 = group[group["strike"] == sell_call]["avg_call"].iloc[0]
#                 c2 = group[group["strike"] == buy_call]["avg_call"].iloc[0]
#                 p1 = group[group["strike"] == sell_put]["avg_put"].iloc[0]
#                 p2 = group[group["strike"] == buy_put]["avg_put"].iloc[0]
#                 underlying_price = group["underlying_price"].iloc[0]

#                 credit_actual = (c1 - c2) + (p1 - p2)
#                 profit_loss = (credit_inicial - credit_actual) * 100

#                 evolution.append({
#                     "timestamp": ts.strftime("%Y-%m-%d %H:%M:%S"),
#                     "credit": float(round(credit_actual * 100, 2)),
#                     "profit_loss": float(round(profit_loss, 2)),
#                     "underlying_price": float(round(underlying_price, 2))

#                 })
#             except:
#                 continue

#         print(f"[TIMING] procesar_fila_ironcondor user_id={current_user.id} elapsed={round(_time.time()-_start,2)}s")
#         return jsonify({
#             "status": "success",
#             "symbol": symbol,
#             "fecha": fecha,
#             "strikes": f"{sell_put}/{buy_put} {sell_call}/{buy_call}",
#             "initial_credit": round(credit_inicial * 100, 2),
#             "evolution": evolution,

#         })

#     except Exception as e:
#         return jsonify({"status": "error", "message": str(e)}), 500


# @backtestingIdea2_bp.route("/get_mejores_horarios", methods=["GET"])
# @login_required
# def get_mejores_horarios():
#     print(f"[AUDIT] get_mejores_horarios user_id={current_user.id} email={current_user.email}")
#     if _check_rate_limit(current_user.id, 'mejores_horarios'):
#         return jsonify({"error": "Too many requests. Please wait a moment."}), 429
#     symbol = request.args.get('symbol', 'SPX')
#     estrategia = request.args.get('estrategia', 'IronCondor')
#     risk = request.args.get('risk', 'agresivo').lower()

#     riesgo_map = {
#         'conservador': 'cons',
#         'intermedio': 'inte',
#         'agresivo': 'agre',
#         'ultra_agresivo': 'ultr'
#     }
#     sufijo = riesgo_map.get(risk, risk)
#     csv_path = os.path.join(CSV_TOP3_BASE, f"top3_horarios_{symbol}_{estrategia}_{sufijo}.csv")

#     if not os.path.exists(csv_path):
#         return jsonify({'error': f'CSV no encontrado: {csv_path}'}), 404

#     df = pd.read_csv(csv_path)
#     df['end_date'] = pd.to_datetime(df['end_date'])

#     semanas = []
#     for end_date, grupo in df.groupby('end_date'):
#         # Semana de aplicación = lunes siguiente al end_date (viernes)
#         lunes_aplicacion = end_date + pd.Timedelta(days=3)
#         viernes_aplicacion = lunes_aplicacion + pd.Timedelta(days=4)
#         label_semana = f"{lunes_aplicacion.strftime('%d/%m')} - {viernes_aplicacion.strftime('%d/%m/%Y')}"

#         rankings = []
#         for _, row in grupo.sort_values('rank').iterrows():
#             hora_raw = str(int(float(row['hora'])))
#             rankings.append({
#                 'rank': int(row['rank']),
#                 'hora': hora_raw,
#                 'profit': float(row['profit']) if pd.notna(row.get('profit')) else None
#             })

#         semanas.append({
#             'semana': label_semana,
#             'end_date': end_date.strftime('%Y-%m-%d'),
#             'rankings': rankings
#         })

#     # Ordenar por end_date descendente (más reciente primero)
#     semanas.sort(key=lambda x: x['end_date'], reverse=True)

#     return jsonify({'semanas': semanas})


# @backtestingIdea2_bp.route("/task_result/<task_id>", methods=["GET"])
# @login_required
# def task_result(task_id):
#     task = AsyncResult(task_id, app=celery_app)

#     # Task revocado por el usuario
#     if task.state == 'REVOKED':
#         return jsonify({"state": "REVOKED", "result_ready": False}), 200

#     if not task.ready():
#         return jsonify({
#             "state": task.state,
#             "result_ready": False
#         })

#     if task.failed():
#         return jsonify({
#             "state": "FAILURE",
#             "error": str(task.result)
#         }), 500

#     # 🔥 ESTO es lo que el HTML espera
#     return jsonify(task.result)


# @backtestingIdea2_bp.route("/cancel_task/<task_id>", methods=["POST"])
# @login_required
# def cancel_task(task_id):
#     try:
#         celery_app.control.revoke(task_id, terminate=True, signal='SIGTERM')
#         return jsonify({"status": "cancelled", "task_id": task_id})
#     except Exception as e:
#         return jsonify({"status": "error", "message": str(e)}), 500







from celery_app import celery_app
from celery.result import AsyncResult
import os
import pandas as pd
from flask import Blueprint, jsonify, request
from datetime import datetime
import math
import gc
from flask_login import login_required, current_user


from functools import lru_cache
import pyarrow as pa
import pyarrow.parquet as pq
import sys

from tasks_backtestingidea2 import run_backtesting_idea

import redis as redis_lib
import datetime
import pytz
# Shared Redis client used only for the per-user rate-limit counters (db 0).
_rate_redis = redis_lib.Redis(host='localhost', port=6379, db=0, decode_responses=True)

# ── Rate limiting ─────────────────────────────────────────────
RATE_LIMIT_WINDOW      = 60   # seconds per counting window
RATE_LIMIT_MAX_DIA     = 5    # 8am-4pm NY — normal use, anti-bot cap
RATE_LIMIT_MAX_NOCHE   = 50   # 4pm-8am NY — more permissive cap
RATE_LIMIT_WHITELIST   = {9}  # user_ids never limited (comma-separate to add: {9, 15, 23})
RATE_LIMIT_HORA_ON     = 8    # 8am NY — restrictive window starts
RATE_LIMIT_HORA_OFF    = 16   # 4pm NY — restrictive window ends
# ─────────────────────────────────────────────────────────────

def _check_rate_limit(user_id, endpoint, max_calls=8, window=60):
    """Return True when *user_id* has exceeded the rate limit for *endpoint*.

    A Redis counter keyed ``rl:{endpoint}:{user_id}`` is incremented on every
    call and expires *window* seconds after its first hit. The per-window cap
    depends on New-York local time: RATE_LIMIT_MAX_DIA inside the restrictive
    daytime window [RATE_LIMIT_HORA_ON, RATE_LIMIT_HORA_OFF), otherwise
    RATE_LIMIT_MAX_NOCHE. Whitelisted users are never limited.

    Note: *max_calls* is kept for backward compatibility but ignored — the
    effective cap always comes from the time-of-day constants above.
    """
    if user_id in RATE_LIMIT_WHITELIST:
        return False  # always unlimited

    # NY local hour (pytz handles the EST/EDT transition automatically).
    tz_ny = pytz.timezone('America/New_York')
    hora_ny = datetime.datetime.now(tz_ny).hour

    # Pick the cap for the current time of day.
    if RATE_LIMIT_HORA_ON <= hora_ny < RATE_LIMIT_HORA_OFF:
        limite = RATE_LIMIT_MAX_DIA    # restrictive daytime window
    else:
        limite = RATE_LIMIT_MAX_NOCHE  # permissive overnight window

    key = f"rl:{endpoint}:{user_id}"
    count = _rate_redis.incr(key)
    if count == 1:
        # First hit of the window: start the TTL. Fix: honor the *window*
        # parameter (was hard-coded to RATE_LIMIT_WINDOW, leaving the
        # parameter silently ignored; the default 60 matches the constant,
        # so existing callers behave identically).
        _rate_redis.expire(key, window)
    if count > limite:
        print(f"[RATE_LIMIT] endpoint={endpoint} user_id={user_id} count={count} limite={limite}")
        return True
    return False


# Flask blueprint grouping all backtesting-idea-2 endpoints.
backtestingIdea2_bp = Blueprint('backtestingIdea2', __name__, template_folder='templates')

# Base path for data files; option-chain parquet files live under <base>/chains.
PATH_UBUNTU_CHAINS = "/var/www/html/flask_project/"


def _load_chain(symbol: str, fecha: str, base: str) -> pd.DataFrame:
    """Load the option-chain parquet for one symbol/day.

    Returns a DataFrame indexed by (timestamp, strike) with mid-price columns
    ``avg_call``/``avg_put`` added, or an empty DataFrame when no parquet
    file exists for that date.
    """
    # Index symbols are stored with a leading "$" in the file name.
    ticker = f"${symbol}" if symbol in ("SPX", "RUT", "XSP") else symbol
    parquet_file = os.path.join(base, "chains", f"optionChain_{ticker}_{fecha}.parquet")

    if not os.path.exists(parquet_file):
        return pd.DataFrame()

    chain = pq.read_table(parquet_file).to_pandas()

    # Mid price between bid and ask for each side.
    chain["avg_call"] = (chain["bid_call"] + chain["ask_call"]) / 2
    chain["avg_put"] = (chain["bid_put"] + chain["ask_put"]) / 2
    return chain.set_index(["timestamp", "strike"])


# ============================ FUNCIONES VERTICALES ============================


def calcular_credito_y_underlying_desde_archivo_csv(archivo_option_chain, strike1, strike2, option_type, timeHour):
    """Compute the vertical-spread credit and underlying price from a raw CSV chain.

    Parameters
    ----------
    archivo_option_chain : path to a CSV whose first 9 columns include
        timestamp, strike, bid/ask for calls and puts, and underlying_price.
    strike1, strike2 : the two leg strikes; credit is avg(strike1) - avg(strike2).
    option_type : 'PUT' or 'CALL'.
    timeHour : cutoff timestamp; the first quote at/after it is used per strike.

    Returns
    -------
    (credit, underlying_price) on success; (0, None) when the file cannot be
    read, the time window is empty, the option type is unknown, or either
    strike is missing.
    """
    try:
        option_chain_df = pd.read_csv(archivo_option_chain, usecols=range(9))
        option_chain_df['timestamp'] = pd.to_datetime(option_chain_df['timestamp'])
        filtered_chain = option_chain_df[option_chain_df['timestamp'] >= timeHour]
        if filtered_chain.empty:
            return 0, None

        # Fix: select the bid/ask columns explicitly. The original fell
        # through to an UnboundLocalError on unknown option types, silently
        # swallowed by a bare except.
        if option_type == 'PUT':
            bid_col, ask_col = 'bid_put', 'ask_put'
        elif option_type == 'CALL':
            bid_col, ask_col = 'bid_call', 'ask_call'
        else:
            return 0, None  # unknown option type

        strikes_data = filtered_chain[filtered_chain['strike'].isin([strike1, strike2])]
        # .iloc[0] raises IndexError when a strike is absent → handled below.
        strike1_data = strikes_data[strikes_data['strike'] == strike1].iloc[0]
        strike2_data = strikes_data[strikes_data['strike'] == strike2].iloc[0]

        strike1_avg = (strike1_data[bid_col] + strike1_data[ask_col]) / 2
        strike2_avg = (strike2_data[bid_col] + strike2_data[ask_col]) / 2

        credit_received = strike1_avg - strike2_avg
        underlying_price = strike1_data['underlying_price']

        return credit_received, underlying_price
    except Exception:
        # Fix: was a bare `except:` (also caught SystemExit/KeyboardInterrupt).
        # Missing strikes / unreadable file → the sentinel callers expect.
        return 0, None


# ========================= CREDIT TARGET HELPERS =========================

def _buscar_credito_con_target_vertical(df_chain, strike1, strike2, option_type, entry_time, credit_target):
    """
    Vectorized search for the first timestamp whose vertical-spread credit
    (dollar terms, i.e. (avg1 - avg2) * 100) reaches ``credit_target`` and
    is sustained for 3 consecutive ticks.

    ``df_chain`` is a chain DataFrame indexed by (timestamp, strike) with
    ``avg_call``/``avg_put`` columns.

    Returns ``(credit, underlying, entry_ts_str)``:
    - first tick at/after ``entry_time`` already meets the target → the
      *real* credit at that tick (per share, /100);
    - met later, confirmed by 3 consecutive ticks → ``credit_target / 100``;
    - never met, strikes missing, or credit wider than the spread →
      ``(None, None, None)`` → caller records NO_ENTRY.
    """
    try:
        df = df_chain.reset_index()
        # Pin seconds to :30 so the comparison aligns with the tick grid.
        entry_time = entry_time.replace(second=30, microsecond=0)
        col = f'avg_{option_type.lower()}'

        df_filtered = df[
            (df['timestamp'] >= entry_time) &
            (df['strike'].isin([strike1, strike2]))
        ][['timestamp', 'strike', col, 'underlying_price']]

        if df_filtered.empty:
            return None, None, None

        # One row per timestamp, one column per strike.
        pivot = df_filtered.pivot_table(index='timestamp', columns='strike', values=col, aggfunc='first')

        if strike1 not in pivot.columns or strike2 not in pivot.columns:
            return None, None, None

        # Credit series in dollars (per contract).
        credits = (pivot[strike1] - pivot[strike2]) * 100
        underlying_ts = df_filtered.drop_duplicates('timestamp').set_index('timestamp')['underlying_price']

        cumple = (credits >= credit_target).fillna(False)

        # First tick already meets the target → enter immediately, no 3-tick wait.
        if bool(cumple.iloc[0]):
            primer_ts = cumple.index[0]
            credit_real = credits.loc[primer_ts] / 100
            # Sanity check: a credit above the spread width is impossible.
            if credit_real > abs(strike1 - strike2):
                return None, None, None
            underlying   = underlying_ts.loc[primer_ts] if primer_ts in underlying_ts.index else None
            entry_ts_str = pd.Timestamp(primer_ts).strftime('%H:%M')
            return credit_real, underlying, entry_ts_str

        # Otherwise require 3 consecutive qualifying ticks (rolling window).
        confirmado = (cumple.astype(int).rolling(3).sum() >= 3)

        if not confirmado.any():
            return None, None, None

        primer_ts_confirmado = confirmado[confirmado].index[0]
        credit_real = credits.loc[primer_ts_confirmado] / 100

        # Sanity check: a credit above the spread width is impossible.
        if credit_real > abs(strike1 - strike2):
            return None, None, None

        underlying   = underlying_ts.loc[primer_ts_confirmado] if primer_ts_confirmado in underlying_ts.index else None
        entry_ts_str = pd.Timestamp(primer_ts_confirmado).strftime('%H:%M')
        # NOTE(review): the *target* (not the real credit at confirmation) is
        # returned here — the fill is assumed at exactly the requested
        # credit. Confirm this is intended.
        return credit_target / 100, underlying, entry_ts_str

    except Exception as e:
        print(f"[ERROR] _buscar_credito_con_target_vertical(): {e}")
        return None, None, None


def _buscar_credito_con_target_ic(df_chain, strikes, entry_time, credit_target):
    """
    Vectorized search for the first timestamp whose iron-condor credit
    (dollar terms) reaches ``credit_target`` sustained over 3 consecutive
    ticks.

    ``strikes`` maps Call_Strike1/2 and Put_Strike1/2, where *_Strike1 are
    the short legs.

    Returns ``(credit, underlying, entry_ts_str)``:
    - first tick already meets the target → the real combined credit (per share);
    - met later (3-tick confirmation) → ``credit_target / 100``;
    - never met, legs missing, or credit above the narrower wing width →
      ``(None, None, None)`` → caller records NO_ENTRY.
    """
    try:
        df = df_chain.reset_index()
        # Pin seconds to :30 so the comparison aligns with the tick grid.
        entry_time = entry_time.replace(second=30, microsecond=0)

        all_strikes = [
            strikes['Call_Strike1'], strikes['Call_Strike2'],
            strikes['Put_Strike1'],  strikes['Put_Strike2']
        ]

        df_filtered = df[
            (df['timestamp'] >= entry_time) &
            (df['strike'].isin(all_strikes))
        ][['timestamp', 'strike', 'avg_call', 'avg_put', 'underlying_price']]

        if df_filtered.empty:
            return None, None, None

        # One row per timestamp, one column per strike, for each side.
        pivot_call = df_filtered.pivot_table(index='timestamp', columns='strike', values='avg_call', aggfunc='first')
        pivot_put  = df_filtered.pivot_table(index='timestamp', columns='strike', values='avg_put',  aggfunc='first')

        cs1, cs2 = strikes['Call_Strike1'], strikes['Call_Strike2']
        ps1, ps2 = strikes['Put_Strike1'],  strikes['Put_Strike2']

        if not all(s in pivot_call.columns for s in [cs1, cs2]):
            return None, None, None
        if not all(s in pivot_put.columns for s in [ps1, ps2]):
            return None, None, None

        # Keep only timestamps where both sides were quoted.
        idx = pivot_call.index.intersection(pivot_put.index)
        pivot_call = pivot_call.loc[idx]
        pivot_put  = pivot_put.loc[idx]

        credit_call   = pivot_call[cs1] - pivot_call[cs2]
        credit_put    = pivot_put[ps1]  - pivot_put[ps2]
        credits_total = (credit_call + credit_put) * 100  # dollar terms

        underlying_ts = df_filtered.drop_duplicates('timestamp').set_index('timestamp')['underlying_price']

        cumple = credits_total >= credit_target
        cumple = cumple.fillna(False)

        # First tick already meets the target → enter immediately, no 3-tick wait.
        if bool(cumple.iloc[0]):
            primer_ts = cumple.index[0]
            credit_call_val  = credit_call.loc[primer_ts]
            credit_put_val   = credit_put.loc[primer_ts]
            credit_total_val = credit_call_val + credit_put_val
            put_spread  = strikes['Put_Strike1']  - strikes['Put_Strike2']
            call_spread = strikes['Call_Strike2'] - strikes['Call_Strike1']
            # Sanity check: credit cannot exceed the narrower wing width.
            if credit_total_val > min(put_spread, call_spread):
                return None, None, None
            underlying   = underlying_ts.loc[primer_ts] if primer_ts in underlying_ts.index else None
            entry_ts_str = pd.Timestamp(primer_ts).strftime('%H:%M')
            return credit_total_val, underlying, entry_ts_str

        # Otherwise require 3 consecutive qualifying ticks (rolling window).
        confirmado = (cumple.astype(int).rolling(3).sum() >= 3)

        if not confirmado.any():
            return None, None, None

        primer_ts_confirmado = confirmado[confirmado].index[0]

        credit_call_val  = credit_call.loc[primer_ts_confirmado]
        credit_put_val   = credit_put.loc[primer_ts_confirmado]
        credit_total_val = credit_call_val + credit_put_val

        # Sanity check: credit cannot exceed the narrower wing width.
        put_spread  = strikes['Put_Strike1']  - strikes['Put_Strike2']
        call_spread = strikes['Call_Strike2'] - strikes['Call_Strike1']
        max_credit  = min(put_spread, call_spread)
        if credit_total_val > max_credit:
            return None, None, None

        underlying   = underlying_ts.loc[primer_ts_confirmado] if primer_ts_confirmado in underlying_ts.index else None
        entry_ts_str = pd.Timestamp(primer_ts_confirmado).strftime('%H:%M')
        # NOTE(review): the *target* (not the real credit at confirmation) is
        # returned here, mirroring the vertical variant — confirm intended.
        return credit_target / 100, underlying, entry_ts_str

    except Exception as e:
        print(f"[ERROR] _buscar_credito_con_target_ic(): {e}")
        return None, None, None


def calcular_precios_verticales(vertical_strikes_df, path_csv_base, desplazamiento, desde, hasta, credit_target=0):
    """Backtest vertical credit spreads, one row of ``vertical_strikes_df`` per day.

    Parameters
    ----------
    vertical_strikes_df : DataFrame with Symbol, Day, Hour, Strike1, Strike2,
        Option_Type, movimiento_esperado and score_30min columns.
        NOTE(review): the 'Day' column is converted in place *before* the
        range filter copies the frame — confirm callers don't rely on the
        argument being untouched.
    path_csv_base : base folder containing the ``chains`` parquet directory.
    desplazamiento : strike offset applied away from the money
        (+ for CALL legs, - for PUT legs).
    desde, hasta : inclusive date range to backtest.
    credit_target : when > 0, entry waits for the first timestamp whose
        credit (dollar terms) reaches this value; days that never reach it
        are recorded as NO_ENTRY.

    Returns
    -------
    (df_resultados, profit_total, positivos, negativos)
    """
    import gc
    resultados = []
    symbol = vertical_strikes_df['Symbol'].iloc[0]
    vertical_strikes_df['Day'] = pd.to_datetime(vertical_strikes_df['Day'])

    # Restrict to the requested date range.
    vertical_strikes_df = vertical_strikes_df[
        (vertical_strikes_df['Day'] >= pd.to_datetime(desde)) &
        (vertical_strikes_df['Day'] <= pd.to_datetime(hasta))
    ].copy()

    # Shift both strikes away from the money by `desplazamiento`.
    vertical_strikes_df['Strike1_Desplazado'] = vertical_strikes_df.apply(
        lambda row: row['Strike1'] + desplazamiento if row['Option_Type'] == 'CALL' else row['Strike1'] - desplazamiento,
        axis=1)
    vertical_strikes_df['Strike2_Desplazado'] = vertical_strikes_df.apply(
        lambda row: row['Strike2'] + desplazamiento if row['Option_Type'] == 'CALL' else row['Strike2'] - desplazamiento,
        axis=1)

    df_cache = {}  # per-date option-chain cache (entries are dropped right after use)

    for _, row in vertical_strikes_df.iterrows():
        fecha = pd.to_datetime(row['Day']).strftime('%Y-%m-%d')
        hora = row['Hour']
        specific_time = pd.to_datetime(f"{row['Day']} {hora}")
        strike1 = row['Strike1_Desplazado']
        strike2 = row['Strike2_Desplazado']
        option_type = row['Option_Type']

        # Load the option chain for this date (empty frame on failure).
        if fecha not in df_cache:
            try:
                df_cache[fecha] = _load_chain(symbol, fecha, path_csv_base)
            except:
                df_cache[fecha] = pd.DataFrame()

        df_chain = df_cache[fecha]

        credit_result, underlying_price_specific_time = calcular_credito_y_underlying_desde_df(
            df_chain, strike1, strike2, option_type, specific_time)

        # Skip the day when strikes are missing or the underlying is unknown.
        if credit_result is None or underlying_price_specific_time is None:

            continue

        # Closing underlying price (last tick of the day).
        underlying_price_16 = obtener_ultimo_underlying_price_desde_df(df_chain)
        spreadDiff = abs(strike1 - strike2)

        # A credit above the spread width is impossible → bad data, skip.
        if credit_result > spreadDiff:
            continue

        # Credit-target mode: scan timestamps for the first one meeting it.
        entry_ts = hora  # default entry time (fixed/dynamic hour)
        if credit_target > 0:
            credit_result, underlying_price_specific_time, entry_ts_found = _buscar_credito_con_target_vertical(
                df_chain, strike1, strike2, option_type, specific_time, credit_target
            )
            if credit_result is None:
                # No timestamp of the day reached the target → NO_ENTRY row.
                # NOTE(review): these rows are later dropped by the
                # Credit/Price not-null backstop filter below — confirm
                # whether NO_ENTRY rows should survive into the result.
                resultados.append({
                    'Day': fecha,
                    'Time': hora,
                    'Strikes': f"{strike1}/{strike2}",
                    'Option': option_type,
                    'Credit': None,
                    'Price': underlying_price_specific_time,
                    'Close': underlying_price_16,
                    'P/L': 0.0,
                    'Diff': round(underlying_price_16 - strike1, 2) if underlying_price_16 else None,
                    'itmOtm': 'NO_ENTRY',
                    "movimiento_esperado": row["movimiento_esperado"],
                    "score_30min": row["score_30min"],
                })
                if fecha in df_cache:
                    del df_cache[fecha]
                    gc.collect()
                continue
            entry_ts = entry_ts_found

        # Expiration P/L: full loss, full credit, or partial depending on
        # where the close lands relative to the strikes.
        if underlying_price_16 is not None:
            if option_type == 'CALL':
                if underlying_price_16 > max(strike1, strike2):
                    ganancia_dia = (credit_result - spreadDiff) * 100
                elif underlying_price_16 < min(strike1, strike2):
                    ganancia_dia = credit_result * 100
                else:
                    valor_itm = (underlying_price_16 - min(strike1, strike2)) * 100
                    ganancia_dia = credit_result * 100 - valor_itm
            else:  # PUT
                if underlying_price_16 < min(strike1, strike2):
                    ganancia_dia = (credit_result - spreadDiff) * 100
                elif underlying_price_16 > max(strike1, strike2):
                    ganancia_dia = credit_result * 100
                else:
                    valor_itm = (max(strike1, strike2) - underlying_price_16) * 100
                    ganancia_dia = credit_result * 100 - valor_itm
        else:
            ganancia_dia = None

        # ITM/OTM label at the close ("ITM P" = partially in the money).
        if underlying_price_16 is not None:
            if option_type == 'CALL':
                if underlying_price_16 > max(strike1, strike2):
                    itmOtm = "ITM"
                elif underlying_price_16 < min(strike1, strike2):
                    itmOtm = "OTM"
                else:
                    itmOtm = "ITM P"
            else:
                if underlying_price_16 < min(strike1, strike2):
                    itmOtm = "ITM"
                elif underlying_price_16 > max(strike1, strike2):
                    itmOtm = "OTM"
                else:
                    itmOtm = "ITM P"
        else:
            itmOtm = "ERROR"

        resultados.append({
            'Day': fecha,
            'Time': entry_ts,
            'Strikes': f"{strike1}/{strike2}",
            'Option': option_type,
            'Credit': round(credit_result * 100, 2),
            'Price': underlying_price_specific_time,
            'Close': underlying_price_16,
            'P/L': round(ganancia_dia, 2) if ganancia_dia is not None else None,
            'Diff': round(underlying_price_16 - strike1, 2) if underlying_price_16 else None,
            'itmOtm': itmOtm,
            "movimiento_esperado": row["movimiento_esperado"],
            "score_30min": row["score_30min"],

        })

        # Drop the cached chain to cap memory while iterating many dates.
        if fecha in df_cache:
            del df_cache[fecha]
            gc.collect()

    df_resultados = pd.DataFrame(resultados)

    if df_resultados.empty:
        return df_resultados, 0.0, 0, 0

    # Backstop filter: drop any row lacking Credit or Price.
    df_resultados = df_resultados[
        df_resultados['Credit'].notnull() &
        df_resultados['Price'].notnull()
    ].copy()



    profit_total = df_resultados['P/L'].sum(skipna=True)
    positivos = df_resultados[df_resultados['P/L'] >= 0].shape[0]
    negativos = df_resultados[df_resultados['P/L'] < 0].shape[0]

    return df_resultados, profit_total, positivos, negativos


def calcular_credito_y_underlying_desde_df(df, strike1, strike2, option_type, timeHour):
    """Credit (avg1 - avg2) and underlying price from a pre-loaded chain frame.

    Uses the first quote at/after ``timeHour`` (seconds pinned to :15) for
    each strike. Returns (0, None) when both strikes cannot be found and
    (None, None) on unexpected errors.
    """
    try:
        # Pin seconds so the comparison lands on the tick grid.
        cutoff = timeHour.replace(second=15, microsecond=0)
        flat = df.reset_index()  # parquet frames are indexed by (timestamp, strike)

        candidates = flat[(flat['timestamp'] >= cutoff) & flat['strike'].isin([strike1, strike2])]

        if candidates.empty or len(candidates['strike'].unique()) < 2:
            print("[WARNING] → No se encontraron ambos strikes.")
            return 0, None

        # First available row per strike.
        # NOTE(review): the two legs may come from different timestamps.
        leg1 = candidates[candidates['strike'] == strike1].iloc[0]
        leg2 = candidates[candidates['strike'] == strike2].iloc[0]

        price_col = f'avg_{option_type.lower()}'
        credit_received = leg1[price_col] - leg2[price_col]
        return credit_received, leg1['underlying_price']
    except Exception as e:
        print(f"[ERROR] calcular_credito_y_underlying_desde_df(): {e}")
        return None, None


def obtener_ultimo_underlying_price_desde_df(df):
    """Return the chronologically last underlying_price in the chain, or None.

    Sorts by 'timestamp' (column or index level) and takes the final row's
    price. Returns None for empty frames or frames missing those fields.
    """
    try:
        return df.sort_values(by='timestamp')['underlying_price'].iloc[-1]
    except Exception:
        # Fix: was a bare `except:` (also caught SystemExit/KeyboardInterrupt).
        # Empty frame / missing columns → no close price available.
        return None

# ============================ FUNCIONES IRON CONDOR ============================


def calcular_ganancia_perdida_dia_iron_condor(credit_result, underlying_price_16, strikes):
    """End-of-day P/L (dollars) of an iron condor settled at ``underlying_price_16``.

    ``credit_result`` is the per-share credit received; ``strikes`` maps
    Put_Strike2 < Put_Strike1 <= Call_Strike1 < Call_Strike2. Returns None
    when either input is missing or the price matches no region.
    """
    if underlying_price_16 is None or credit_result is None:
        return None

    price = underlying_price_16
    put_width = strikes['Put_Strike1'] - strikes['Put_Strike2']
    call_width = strikes['Call_Strike2'] - strikes['Call_Strike1']

    # Beyond either long strike: max loss (credit minus the wider wing).
    if price < strikes['Put_Strike2'] or price > strikes['Call_Strike2']:
        return (credit_result - max(put_width, call_width)) * 100
    # Between both short strikes: keep the full credit.
    if strikes['Put_Strike1'] <= price <= strikes['Call_Strike1']:
        return credit_result * 100
    # Inside the put wing: partial loss on the put side.
    if strikes['Put_Strike2'] <= price < strikes['Put_Strike1']:
        return (credit_result - min(strikes['Put_Strike1'] - price, put_width)) * 100
    # Inside the call wing: partial loss on the call side.
    if strikes['Call_Strike1'] < price <= strikes['Call_Strike2']:
        return (credit_result - min(price - strikes['Call_Strike1'], call_width)) * 100

    return None


def calcular_precios_iron_condor(df, path_csv_base, desplazamiento, desde, hasta, credit_target=0):
    """Backtest iron condors, one row of ``df`` per day.

    Parameters mirror ``calcular_precios_verticales``; ``desplazamiento``
    widens the condor symmetrically (call strikes shifted up, put strikes
    shifted down). When ``credit_target`` > 0, entry waits for the first
    timestamp whose combined credit (dollar terms) reaches the target;
    otherwise the row's fixed ``Hour`` is used.

    Returns (df_resultados, profit_total, positivos, negativos).
    """
    import gc

    resultados = []
    symbol = df['Symbol'].iloc[0]
    df['Day'] = pd.to_datetime(df['Day'])
    df = df[(df['Day'] >= pd.to_datetime(desde)) & (df['Day'] <= pd.to_datetime(hasta))].copy()

    df_cache = {}  # per-date option-chain cache (entries are dropped right after use)

    for _, row in df.iterrows():
        fecha = row['Day'].strftime('%Y-%m-%d')
        hora = row['Hour']
        specific_time = pd.to_datetime(f"{fecha} {hora}")

        # Shift the whole condor outward by `desplazamiento`.
        strikes = {
            'Call_Strike1': row['Call_Strike1'] + desplazamiento,
            'Call_Strike2': row['Call_Strike2'] + desplazamiento,
            'Put_Strike1': row['Put_Strike1'] - desplazamiento,
            'Put_Strike2': row['Put_Strike2'] - desplazamiento
        }



        # Structural sanity: the short put must sit at/below the short call.
        short_put  = strikes['Put_Strike1']   # short leg of the put spread
        short_call = strikes['Call_Strike1']  # short leg of the call spread
        if short_put > short_call:

            continue

        # Load the option chain for this date (empty frame on failure).
        if fecha not in df_cache:
            try:
                df_cache[fecha] = _load_chain(symbol, fecha, path_csv_base)
            except:
                df_cache[fecha] = pd.DataFrame()

        df_chain = df_cache[fecha]

        credit_result, underlying_price, _, _ = calcular_credito_y_underlying_iron_condor_df(
            df_chain, strikes, specific_time)

        # Skip the day when legs are missing or the underlying is unknown.
        if credit_result is None or underlying_price is None:

            continue

        # A credit above the narrower wing width is impossible → bad data.
        put_spread = strikes['Put_Strike1'] - strikes['Put_Strike2']
        call_spread = strikes['Call_Strike2'] - strikes['Call_Strike1']
        max_credit = min(put_spread, call_spread)
        if credit_result > max_credit:

            continue

        # Credit-target mode: scan timestamps for the first one meeting it.
        underlying_price_16 = obtener_ultimo_underlying_price_desde_df(df_chain)
        entry_ts = hora  # default entry time (fixed/dynamic hour)
        if credit_target > 0:
            credit_result, underlying_price, entry_ts_found = _buscar_credito_con_target_ic(
                df_chain, strikes, specific_time, credit_target
            )
            if credit_result is None:
                # No timestamp of the day reached the target → NO_ENTRY row.
                # NOTE(review): these rows are later dropped by the
                # Credit/Price not-null backstop filter below — confirm
                # whether NO_ENTRY rows should survive into the result.
                resultados.append({
                    'Day': fecha,
                    'Time': hora,
                    'Strikes': f"{strikes['Put_Strike2']}/{strikes['Put_Strike1']} {strikes['Call_Strike1']}/{strikes['Call_Strike2']}",
                    'Price': None,
                    'Close': underlying_price_16,
                    'Credit': None,
                    'P/L': 0.0,
                    'Dif': None,
                    'itmOtm': 'NO_ENTRY',
                    "movimiento_esperado": row.get("movimiento_esperado", None),
                    "score_30min": row.get("score_30min", None),
                })
                del df_cache[fecha]
                gc.collect()
                continue
            entry_ts = entry_ts_found

        ganancia_dia = calcular_ganancia_perdida_dia_iron_condor(
            credit_result, underlying_price_16, strikes
        )

        # ITM/OTM label at the close ("ITM P" = partially in the money).
        if underlying_price_16 is not None:
            if underlying_price_16 < strikes['Put_Strike2'] or underlying_price_16 > strikes['Call_Strike2']:
                itmOtm = "ITM"
            elif strikes['Put_Strike1'] <= underlying_price_16 <= strikes['Call_Strike1']:
                itmOtm = "OTM"
            else:
                itmOtm = "ITM P"
        else:
            itmOtm = "N/A"

        # Distance of the close from each short strike, for display.
        dif_call = f"{(underlying_price_16 - strikes['Call_Strike1']):.2f}" if underlying_price_16 else None
        dif_put = f"{(underlying_price_16 - strikes['Put_Strike1']):.2f}" if underlying_price_16 else None

        resultados.append({
            'Day': fecha,
            'Time': entry_ts,
            'Strikes': f"{strikes['Put_Strike2']}/{strikes['Put_Strike1']} {strikes['Call_Strike1']}/{strikes['Call_Strike2']}",
            'Price': underlying_price,
            'Close': underlying_price_16,
            'Credit': round(credit_result * 100, 2),
            'P/L': round(ganancia_dia, 2) if isinstance(ganancia_dia, (int, float)) else None,
            'Dif': f"{dif_put}(P) / {dif_call}(C)",
            'itmOtm': itmOtm,
            "movimiento_esperado": row.get("movimiento_esperado", None),
            "score_30min": row.get("score_30min", None),
        })

        # Drop the cached chain to cap memory while iterating many dates.
        del df_cache[fecha]
        gc.collect()

    df_resultados = pd.DataFrame(resultados)

    if df_resultados.empty:
        return df_resultados, 0.0, 0, 0

    # Backstop filter: drop any row lacking Credit or Price.
    df_resultados = df_resultados[
        df_resultados['Credit'].notnull() &
        df_resultados['Price'].notnull()
    ].copy()



    profit_total = df_resultados['P/L'].sum(skipna=True)
    positivos = df_resultados[df_resultados['P/L'] >= 0].shape[0]
    negativos = df_resultados[df_resultados['P/L'] < 0].shape[0]

    return df_resultados, profit_total, positivos, negativos


def calcular_credito_y_underlying_iron_condor_df(df, strikes, timeHour):
    """Compute the iron-condor entry credit from an option-chain DataFrame.

    Uses the earliest snapshot at or after *timeHour* (normalized to the
    :30-second mark) that contains the four configured strikes.

    Returns a 4-tuple ``(total_credit, underlying_price, call_credit,
    put_credit)``, or ``(None, None, None, None)`` when the data is
    unavailable or an error occurs.
    """
    try:
        # Normalize the entry time to the :30-second mark used by the feed.
        entry_ts = timeHour.replace(second=30, microsecond=0)

        wanted_strikes = [
            strikes['Call_Strike1'], strikes['Call_Strike2'],
            strikes['Put_Strike1'], strikes['Put_Strike2'],
        ]

        rows = df.reset_index()
        candidates = rows[
            (rows['timestamp'] >= entry_ts) & rows['strike'].isin(wanted_strikes)
        ]
        if candidates.empty:
            return None, None, None, None

        # Snapshot = earliest available timestamp at/after the entry time.
        snap = candidates[candidates['timestamp'] == candidates['timestamp'].min()]

        def _leg(strike, col):
            # First quote for this strike in the snapshot, or None if absent.
            series = snap.loc[snap['strike'] == strike, col]
            return None if series.empty else series.iloc[0]

        sell_call = _leg(strikes['Call_Strike1'], 'avg_call')
        buy_call = _leg(strikes['Call_Strike2'], 'avg_call')
        sell_put = _leg(strikes['Put_Strike1'], 'avg_put')
        buy_put = _leg(strikes['Put_Strike2'], 'avg_put')

        if sell_call is None or buy_call is None or sell_put is None or buy_put is None:
            return None, None, None, None

        credit_calls = sell_call - buy_call
        credit_puts = sell_put - buy_put
        underlying = snap['underlying_price'].iloc[0]
        return credit_calls + credit_puts, underlying, credit_calls, credit_puts

    except Exception as e:
        print(f"[ERROR] calcular_credito_y_underlying_iron_condor_df(): {e}")
        return None, None, None, None


# ============================ MEJORES HORARIOS ============================

# Base directory holding the weekly top3_horarios_<symbol>_<strategy>_<risk>.csv files.
CSV_TOP3_BASE = "/var/www/html/backtestingmarket"

def obtener_hora_desde_csv(symbol, estrategia, risk, fecha_dia):
    """
    Given a backtesting day, look up in the top3_horarios CSV the rank=1
    record whose end_date is the Friday of the previous week.
    Returns the time as a string (e.g. '1320') or None when there is no
    matching record or the CSV cannot be read.
    """
    # Map risk level to the file-name suffix.
    suffix_by_risk = {
        'conservador': 'cons',
        'intermedio': 'inte',
        'agresivo': 'agre',
        'ultra_agresivo': 'ultr'
    }
    try:
        risk_key = risk.lower()
        suffix = suffix_by_risk.get(risk_key, risk_key)
        csv_path = os.path.join(CSV_TOP3_BASE, f"top3_horarios_{symbol}_{estrategia}_{suffix}.csv")

        if not os.path.exists(csv_path):
            print(f"[MEJORES_HORARIOS] CSV no encontrado: {csv_path}")
            return None

        ranking = pd.read_csv(csv_path)
        ranking['end_date'] = pd.to_datetime(ranking['end_date'])

        # Previous week's Friday = Monday of the current week minus 3 days.
        day = pd.to_datetime(fecha_dia)
        monday_this_week = day - pd.Timedelta(days=day.weekday())  # Monday=0 ... Friday=4
        viernes_semana_anterior = monday_this_week - pd.Timedelta(days=3)

        # rank=1 row for that end_date.
        match = ranking[(ranking['end_date'] == viernes_semana_anterior) & (ranking['rank'] == 1)]
        if match.empty:
            print(f"[MEJORES_HORARIOS] Sin registro rank=1 para end_date={viernes_semana_anterior.date()} ({fecha_dia})")
            return None

        # Normalize floats like 1320.0 -> '1320'.
        return str(int(float(match.iloc[0]['hora'])))

    except Exception as e:
        print(f"[ERROR] obtener_hora_desde_csv(): {e}")
        return None


# ============================ RUTA PRINCIPAL ============================

@backtestingIdea2_bp.route('/get_backtesting_idea', methods=['GET'])
@login_required
def get_backtesting_idea():
    """Queue a Celery backtesting task for the requested symbol/strategy.

    Query params: desde, hasta, symbol, estrategia, hora, risk, modo
    (HORA_FIJA | MEJORES_HORARIOS) and credit_target. In HORA_FIJA mode the
    per-hour strikes CSV must exist (404 otherwise); in MEJORES_HORARIOS the
    hour is resolved week by week inside the task. Returns the queued task id.
    """
    print(f"[AUDIT] get_backtesting_idea user_id={current_user.id} email={current_user.email}")
    if _check_rate_limit(current_user.id, 'backtesting_idea'):
        return jsonify({"error": "Too many requests. Please wait a moment."}), 429
    from tasks_backtestingidea2 import run_backtesting_idea

    desde = request.args.get('desde')
    hasta = request.args.get('hasta')
    symbol = request.args.get('symbol', 'SPX')
    estrategia = request.args.get('estrategia', 'Vertical')
    timeHour = request.args.get('hora', '1340')
    risk = request.args.get('risk', 'Intermedio').lower()
    modo = request.args.get('modo', 'HORA_FIJA')  # HORA_FIJA | MEJORES_HORARIOS
    credit_target = float(request.args.get('credit_target', 0) or 0)

    if modo == 'MEJORES_HORARIOS':
        # Dynamic mode: no fixed-hour strikes file to validate here; the hour
        # is resolved week by week inside the task.
        mode_kwargs = dict(timeHour=None, modo='MEJORES_HORARIOS')
    else:
        # HORA_FIJA mode: validate the per-hour strikes file up front.
        archivo = f"/var/www/html/backtestingmarket/predictor_data/makekos/{symbol}/{symbol}_{estrategia}_strikes_{timeHour}.csv"
        if not os.path.exists(archivo):
            return jsonify({'error': 'Archivo de strikes no encontrado'}), 404
        mode_kwargs = dict(timeHour=timeHour, modo='HORA_FIJA')

    # Single dispatch point — the two modes previously duplicated this call.
    job = run_backtesting_idea.apply_async(
        kwargs=dict(
            desde=desde,
            hasta=hasta,
            symbol=symbol,
            estrategia=estrategia,
            risk=risk,
            credit_target=credit_target,
            **mode_kwargs,
        ),
        queue="backtesting_idea2"
    )

    return jsonify({
        "status": "queued",
        "task_id": job.id,
        "modo": modo
    })


def obtener_ultimo_underlying_price(archivo_option_chain):
    """Return the most recent underlying_price from an option-chain CSV.

    Rows are sorted by timestamp and the last underlying_price value is
    returned; None when the column is missing or the file cannot be read.
    """
    try:
        option_chain_df = pd.read_csv(archivo_option_chain)
        option_chain_df['timestamp'] = pd.to_datetime(option_chain_df['timestamp'])
        option_chain_df = option_chain_df.sort_values(by='timestamp')
        return option_chain_df['underlying_price'].iloc[-1] if 'underlying_price' in option_chain_df.columns else None
    except Exception:
        # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit propagate.
        return None


@backtestingIdea2_bp.route("/procesar_fila_vertical", methods=["POST"])
@login_required
def procesar_fila_vertical():
    """Compute the intraday credit/P&L evolution of a vertical spread.

    Expects JSON with symbol, Day, Time, Option ("CALL"/"PUT") and Strikes
    ("s1/s2"). Returns the initial credit plus a per-timestamp evolution
    series; 400 when the chain or initial credit is unavailable, 500 on
    unexpected errors.
    """
    print(f"[AUDIT] procesar_fila_vertical user_id={current_user.id} email={current_user.email}")
    if _check_rate_limit(current_user.id, 'procesar_vertical'):
        return jsonify({"error": "Too many requests. Please wait a moment."}), 429
    import time as _time
    _start = _time.time()
    data = request.get_json()
    try:
        symbol = data.get("symbol")
        fecha = data.get("Day")
        hora = data.get("Time")
        tipo = data.get("Option")  # "CALL" or "PUT"

        # Single debug line (replaces four redundant prints of the same values).
        print("📌 [DEBUG] Day recibido del cliente:", fecha, "simbolo:", symbol)

        strikes_raw = data.get("Strikes", "")
        strike1, strike2 = [float(s) for s in strikes_raw.split("/")]

        # Normalize leg order: short strike first (higher for PUT, lower for CALL).
        if tipo == "PUT" and strike1 < strike2:
            strike1, strike2 = strike2, strike1
        if tipo == "CALL" and strike1 > strike2:
            strike1, strike2 = strike2, strike1

        df_chain = _load_chain(symbol, fecha, PATH_UBUNTU_CHAINS)
        if df_chain.empty:
            return jsonify({"status": "error", "message": "No se pudo cargar option chain"}), 400

        time_obj = pd.to_datetime(f"{fecha} {hora}")
        credit_inicial, _ = calcular_credito_y_underlying_desde_df(df_chain, strike1, strike2, tipo, time_obj)
        if not isinstance(credit_inicial, (int, float)):
            return jsonify({"status": "error", "message": "No se pudo calcular el crédito inicial"}), 400

        evolution = []
        df_filtered = df_chain.reset_index()
        df_filtered = df_filtered[df_filtered["timestamp"] >= time_obj]

        # Hoist the column name out of the loop (loop-invariant).
        col_avg = f"avg_{tipo.lower()}"
        for ts, group in df_filtered.groupby("timestamp"):
            try:
                avg1 = group[group["strike"] == strike1][col_avg].iloc[0]
                avg2 = group[group["strike"] == strike2][col_avg].iloc[0]
                underlying_price = group["underlying_price"].iloc[0]

                credit_actual = avg1 - avg2
                profit_loss = (credit_inicial - credit_actual) * 100
                evolution.append({
                    "timestamp": ts.strftime("%Y-%m-%d %H:%M:%S"),
                    "credit": float(round(credit_actual * 100, 2)),
                    "profit_loss": float(round(profit_loss, 2)),
                    "underlying_price": float(round(underlying_price, 2))
                })
            except Exception:
                # Was a bare `except:`; skip snapshots where a leg has no quote.
                continue

        print(f"[TIMING] procesar_fila_vertical user_id={current_user.id} elapsed={round(_time.time()-_start,2)}s")
        return jsonify({
            "status": "success",
            "symbol": symbol,
            "fecha": fecha,
            "strikes": f"{strike1}/{strike2}",
            "tipo": tipo,
            "initial_credit": round(credit_inicial * 100, 2),
            "evolution": evolution,
        })

    except Exception as e:
        return jsonify({"status": "error", "message": str(e)}), 500


@backtestingIdea2_bp.route("/procesar_fila_ironcondor", methods=["POST"])
@login_required
def procesar_fila_ironcondor():
    """Compute the intraday credit/P&L evolution of an iron condor.

    Expects JSON with symbol, Day, Time and Strikes formatted as
    "put2/put1 call1/call2". Returns the initial credit plus a per-timestamp
    evolution series; 400 when strikes/chain/initial credit are unavailable,
    500 on unexpected errors.
    """
    print(f"[AUDIT] procesar_fila_ironcondor user_id={current_user.id} email={current_user.email}")
    if _check_rate_limit(current_user.id, 'procesar_ironcondor'):
        return jsonify({"error": "Too many requests. Please wait a moment."}), 429
    import time as _time
    _start = _time.time()
    data = request.get_json()
    try:
        symbol = data.get("symbol")
        fecha = data.get("Day")
        hora = data.get("Time")
        strikes_raw = data.get("Strikes", "").strip()

        # Split the PUT and CALL leg blocks; the short legs are the inner strikes.
        try:
            put_block, call_block = strikes_raw.split(" ")
            put1, put2 = [float(s) for s in put_block.split("/")]
            call1, call2 = [float(s) for s in call_block.split("/")]

            sell_put, buy_put = max(put1, put2), min(put1, put2)
            sell_call, buy_call = min(call1, call2), max(call1, call2)
        except Exception:
            return jsonify({"status": "error", "message": f"Strikes inválidos: {strikes_raw}"}), 400

        df_chain = _load_chain(symbol, fecha, PATH_UBUNTU_CHAINS)
        if df_chain.empty:
            return jsonify({"status": "error", "message": "No se pudo cargar option chain"}), 400

        time_obj = pd.to_datetime(f"{fecha} {hora}")
        df_filtered = df_chain.reset_index()
        df_filtered = df_filtered[df_filtered["timestamp"] >= time_obj]

        def obtener_credito(df, ts):
            """Four-leg credit at the first snapshot at/after *ts* (or (None, None))."""
            try:
                df_momento = df[df["timestamp"] >= ts]
                ts_primero = df_momento["timestamp"].min()
                df_ts = df[df["timestamp"] == ts_primero]

                c1 = df_ts[df_ts["strike"] == sell_call]["avg_call"].iloc[0]
                c2 = df_ts[df_ts["strike"] == buy_call]["avg_call"].iloc[0]
                p1 = df_ts[df_ts["strike"] == sell_put]["avg_put"].iloc[0]
                p2 = df_ts[df_ts["strike"] == buy_put]["avg_put"].iloc[0]
                return (c1 - c2) + (p1 - p2), ts_primero
            except Exception as e:
                print(f"[ERROR crédito inicial]: {e}")
                return None, None

        credit_inicial, timestamp_inicial = obtener_credito(df_filtered, time_obj)
        if credit_inicial is None:
            return jsonify({"status": "error", "message": "No se pudo calcular el crédito inicial"}), 400

        # Per-snapshot credit and P&L evolution.
        evolution = []
        for ts, group in df_filtered.groupby("timestamp"):
            try:
                c1 = group[group["strike"] == sell_call]["avg_call"].iloc[0]
                c2 = group[group["strike"] == buy_call]["avg_call"].iloc[0]
                p1 = group[group["strike"] == sell_put]["avg_put"].iloc[0]
                p2 = group[group["strike"] == buy_put]["avg_put"].iloc[0]
                underlying_price = group["underlying_price"].iloc[0]

                credit_actual = (c1 - c2) + (p1 - p2)
                profit_loss = (credit_inicial - credit_actual) * 100

                evolution.append({
                    "timestamp": ts.strftime("%Y-%m-%d %H:%M:%S"),
                    "credit": float(round(credit_actual * 100, 2)),
                    "profit_loss": float(round(profit_loss, 2)),
                    "underlying_price": float(round(underlying_price, 2))
                })
            except Exception:
                # Was a bare `except:`; skip snapshots where a leg has no quote.
                continue

        print(f"[TIMING] procesar_fila_ironcondor user_id={current_user.id} elapsed={round(_time.time()-_start,2)}s")
        return jsonify({
            "status": "success",
            "symbol": symbol,
            "fecha": fecha,
            "strikes": f"{sell_put}/{buy_put} {sell_call}/{buy_call}",
            "initial_credit": round(credit_inicial * 100, 2),
            "evolution": evolution,
        })

    except Exception as e:
        return jsonify({"status": "error", "message": str(e)}), 500


@backtestingIdea2_bp.route("/get_mejores_horarios", methods=["GET"])
@login_required
def get_mejores_horarios():
    """Return, per week, the ranked best entry hours from the top3 CSV.

    Weeks are labelled by the application window (the Monday after each
    end_date through that Friday) and sorted most-recent first.
    """
    print(f"[AUDIT] get_mejores_horarios user_id={current_user.id} email={current_user.email}")
    if _check_rate_limit(current_user.id, 'mejores_horarios'):
        return jsonify({"error": "Too many requests. Please wait a moment."}), 429
    symbol = request.args.get('symbol', 'SPX')
    estrategia = request.args.get('estrategia', 'IronCondor')
    risk = request.args.get('risk', 'agresivo').lower()

    # Risk level -> file-name suffix.
    sufijo = {
        'conservador': 'cons',
        'intermedio': 'inte',
        'agresivo': 'agre',
        'ultra_agresivo': 'ultr'
    }.get(risk, risk)
    csv_path = os.path.join(CSV_TOP3_BASE, f"top3_horarios_{symbol}_{estrategia}_{sufijo}.csv")

    if not os.path.exists(csv_path):
        return jsonify({'error': f'CSV no encontrado: {csv_path}'}), 404

    top_df = pd.read_csv(csv_path)
    top_df['end_date'] = pd.to_datetime(top_df['end_date'])

    semanas = []
    for end_date, week_rows in top_df.groupby('end_date'):
        # Application week = Monday after end_date (a Friday) through that Friday.
        monday = end_date + pd.Timedelta(days=3)
        friday = monday + pd.Timedelta(days=4)

        rankings = [
            {
                'rank': int(row['rank']),
                'hora': str(int(float(row['hora']))),  # e.g. 1320.0 -> '1320'
                'profit': float(row['profit']) if pd.notna(row.get('profit')) else None
            }
            for _, row in week_rows.sort_values('rank').iterrows()
        ]

        semanas.append({
            'semana': f"{monday.strftime('%d/%m')} - {friday.strftime('%d/%m/%Y')}",
            'end_date': end_date.strftime('%Y-%m-%d'),
            'rankings': rankings
        })

    # Most recent week first.
    semanas.sort(key=lambda item: item['end_date'], reverse=True)

    return jsonify({'semanas': semanas})


@backtestingIdea2_bp.route("/task_result/<task_id>", methods=["GET"])
@login_required
def task_result(task_id):
    """Poll a Celery task: report its state or, when finished, its result."""
    task = AsyncResult(task_id, app=celery_app)

    # Task revoked by the user.
    if task.state == 'REVOKED':
        return jsonify({"state": "REVOKED", "result_ready": False}), 200

    # Still pending or running.
    if not task.ready():
        return jsonify({"state": task.state, "result_ready": False})

    # Finished with an error.
    if task.failed():
        return jsonify({"state": "FAILURE", "error": str(task.result)}), 500

    # Finished OK — the frontend consumes the raw result payload.
    return jsonify(task.result)


@backtestingIdea2_bp.route("/cancel_task/<task_id>", methods=["POST"])
@login_required
def cancel_task(task_id):
    """Revoke (and terminate via SIGTERM) a running Celery task by id."""
    try:
        celery_app.control.revoke(task_id, terminate=True, signal='SIGTERM')
    except Exception as e:
        return jsonify({"status": "error", "message": str(e)}), 500
    return jsonify({"status": "cancelled", "task_id": task_id})