# tasks.py
import glob
import os
import pandas as pd
import numpy as np
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
from celery import Celery, group
import json

# Configuración de Celery (asegúrate de que coincida con la configuración de app.py)
celery = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

# Función auxiliar para impresión de debug (ajusta DEBUG_MODE según necesites)
DEBUG_MODE = False


def debug_print(*args, **kwargs):
    if DEBUG_MODE:
        print(*args, **kwargs)

# Función auxiliar para cargar un CSV


def load_csv(file_info):
    file_date, file_path = file_info
    try:
        return file_date, pd.read_csv(file_path)
    except Exception as e:
        print(f"⚠ Error cargando {file_path}: {e}")
        return file_date, None

# Función para encontrar las filas con el timestamp más cercano o posterior al tiempo objetivo


def find_next_closest_time_rows(df, target_time_str, call_delta, put_delta, call_spread, put_spread, takeProfit, stopLoss, unit, desplazamiento):
    """
    Encuentra las filas con el timestamp más cercano o posterior al tiempo objetivo, selecciona los strikes del Iron Condor y calcula el crédito inicial.
    Se asegura de retornar valores convertidos a tipos nativos (cadenas, float) para que sean serializables a JSON.
    """
    from datetime import datetime
    
    # Convertir la columna 'timestamp' a datetime
    df['Time'] = pd.to_datetime(df['timestamp'], format='%Y-%m-%d %H:%M:%S')
    
    # Convertir target_time_str a objeto time
    target_time = datetime.strptime(target_time_str, '%H:%M:%S').time()
    
    # Extraer solo la hora de los timestamps
    df['only_time'] = df['Time'].dt.time
    
    # Filtrar los timestamps iguales o posteriores al target
    future_times = df[df['only_time'] >= target_time].copy()
    if future_times.empty:
        debug_print(f"⚠ No hay timestamps iguales o posteriores a {target_time_str}")
        return None
    
    # Calcular la diferencia en segundos y obtener el mínimo
    future_times['time_diff'] = future_times['only_time'].apply(
        lambda x: abs((datetime.combine(datetime.min, x) - datetime.combine(datetime.min, target_time)).total_seconds())
    )
    min_diff = future_times['time_diff'].min()
    
    # Seleccionar las filas con la mínima diferencia
    closest_rows = future_times[future_times['time_diff'] == min_diff].copy()
    
    # Obtener el timestamp de apertura (de la columna original 'timestamp')
    opening_timestamp = closest_rows.iloc[0]['timestamp']
    
    debug_print("Timestamp más cercano encontrado:")
    debug_print(closest_rows[['timestamp']].drop_duplicates())
    debug_print("\nFilas completas con el timestamp más cercano:")
    debug_print(closest_rows)
    
    # Calcular la diferencia en delta
    closest_rows['delta_call_diff'] = abs(closest_rows['delta_call'] - float(call_delta))
    closest_rows['delta_put_diff'] = abs(closest_rows['delta_put'] - float(put_delta))
    
    idx_min_call_delta = closest_rows['delta_call_diff'].idxmin()
    idx_min_put_delta = closest_rows['delta_put_diff'].idxmin()
    
    sell_call = closest_rows.loc[idx_min_call_delta, 'strike']
    sell_put = closest_rows.loc[idx_min_put_delta, 'strike']
    
    # Determinar los strikes de compra basados en los spreads
    buy_call = sell_call + float(call_spread)
    buy_put = sell_put - float(put_spread)
    
    # Calcular el crédito inicial del Iron Condor
    credit = calculate_iron_condor_credit(closest_rows, sell_call, buy_call, sell_put, buy_put)
    spx_price = closest_rows.iloc[0]["underlying_price"]
    
    if credit is None:
        debug_print(f"🚨 Día descartado: {opening_timestamp} - No se pudo calcular el crédito.")
        return None
    
    # Ajustar el crédito según el desplazamiento
    credit = credit - (desplazamiento / 100)
    
    # Evaluar el desempeño del Iron Condor
    resultado, final_credit, occur_time = evaluate_iron_condor_performance(
        df, sell_call, buy_call, sell_put, buy_put, credit, takeProfit, stopLoss, opening_timestamp
    )
    
    debug_print(f"📌 Resultado final del Iron Condor: {resultado}")
    
    # Armar el diccionario final con conversiones explícitas
    result_dict = {
        "Time": pd.to_datetime(opening_timestamp).strftime('%Y-%m-%d %H:%M:%S'),
        "SPX Price": float(spx_price),
        "Outcome": str(resultado),
        "Initial Credit": float(credit),
        "Credit at Occurrence": float(final_credit),
        "Occur Time": occur_time if isinstance(occur_time, str) else pd.to_datetime(occur_time).strftime('%H:%M:%S'),
        "Stop Loss": str(stopLoss),
        "Take Profit": str(takeProfit),
        "Sell Call Strike": float(sell_call),
        "Buy Call Strike": float(buy_call),
        "Sell Put Strike": float(sell_put),
        "Buy Put Strike": float(buy_put)
    }
    
    # Depuración: imprimir el tipo y valor de cada clave
    for key, value in result_dict.items():
        print(f"{key}: {value} ({type(value)})")
    
    return result_dict

@celery.task
def process_day_task(args):
    # args: (file_date_str, file_path, horario, call_delta, put_delta, call_spread, put_spread, takeProfit, stopLoss, unit, desplazamiento)
    from datetime import datetime
    file_date_str, file_path, horario, call_delta, put_delta, call_spread, put_spread, takeProfit, stopLoss, unit, desplazamiento = args
    # Si es necesario, reconvierte la fecha
    file_date = datetime.strptime(file_date_str, '%Y-%m-%d')
    df = pd.read_csv(file_path)
    return process_day(df, horario, call_delta, put_delta, call_spread, put_spread, takeProfit, stopLoss, unit, desplazamiento)


# Función que procesa un solo día; esta es la que usa la función anterior
def process_day(day_df, horario, call_delta, put_delta, call_spread, put_spread, takeProfit, stopLoss, unit, desplazamiento):
    result = find_next_closest_time_rows(day_df, horario, call_delta, put_delta, call_spread, put_spread, takeProfit, stopLoss, unit, desplazamiento)
    return result if result else None

# Wrapper para desempaquetar argumentos


def process_day_wrapper(args):
    (file_date, day_df), horario, call_delta, put_delta, call_spread, put_spread, takeProfit, stopLoss, unit, desplazamiento = args
    return process_day(day_df, horario, call_delta, put_delta, call_spread, put_spread, takeProfit, stopLoss, unit, desplazamiento)

# Función para calcular el crédito inicial del Iron Condor


def calculate_iron_condor_credit(df, sell_call, buy_call, sell_put, buy_put):
    sell_call, buy_call, sell_put, buy_put = map(float, [sell_call, buy_call, sell_put, buy_put])
    df['strike'] = df['strike'].astype(float)
    missing_strikes = [strike for strike in [sell_call, buy_call, sell_put, buy_put] if strike not in df['strike'].values]
    if missing_strikes:
        debug_print(f"⚠ Strikes faltantes en el DataFrame: {missing_strikes}")
        debug_print(f"📌 Strikes disponibles en el df: {df['strike'].unique()}")
        debug_print("❌ Día descartado debido a datos insuficientes para el Iron Condor.")
        return None
    try:
        sell_call_row = df[df['strike'] == sell_call].iloc[0]
        buy_call_row = df[df['strike'] == buy_call].iloc[0]
        sell_put_row = df[df['strike'] == sell_put].iloc[0]
        buy_put_row = df[df['strike'] == buy_put].iloc[0]
        sell_call_price = (sell_call_row['bid_call'] + sell_call_row['ask_call']) / 2
        buy_call_price = (buy_call_row['bid_call'] + buy_call_row['ask_call']) / 2
        sell_put_price = (sell_put_row['bid_put'] + sell_put_row['ask_put']) / 2
        buy_put_price = (buy_put_row['bid_put'] + buy_put_row['ask_put']) / 2
        credit = (sell_call_price - buy_call_price) + (sell_put_price - buy_put_price)
        debug_print("\n🔹 Valores de precios:")
        debug_print(f"📌 Sell Call ({sell_call}): {sell_call_price:.2f}")
        debug_print(f"📌 Buy Call ({buy_call}): {buy_call_price:.2f}")
        debug_print(f"📌 Sell Put ({sell_put}): {sell_put_price:.2f}")
        debug_print(f"📌 Buy Put ({buy_put}): {buy_put_price:.2f}")
        return credit
    except IndexError as e:
        debug_print(f"⚠ Error al seleccionar filas del DataFrame: {e}")
        return None

# Función para evaluar el desempeño del Iron Condor


def evaluate_iron_condor_performance(df, sell_call, buy_call, sell_put, buy_put, initial_credit, take_profit, stop_loss, opening_timestamp):
    take_profit = float(take_profit)
    stop_loss = float(stop_loss)
    initial_credit = float(initial_credit) * 100
    debug_print("\n📊 Evaluando el desempeño del Iron Condor...")
    df["timestamp"] = pd.to_datetime(df["timestamp"])
    df = df[df["timestamp"] >= opening_timestamp].sort_values("timestamp")
    strikes_needed = {sell_call, buy_call, sell_put, buy_put}
    df = df[df["strike"].isin(strikes_needed)]
    if df.empty:
        debug_print("⚠ No hay datos suficientes después del timestamp de apertura.")
        return "Neutross", initial_credit, opening_timestamp.strftime('%H:%M:%S')
    df["mid_call"] = (df["bid_call"] + df["ask_call"]) / 2
    df["mid_put"] = (df["bid_put"] + df["ask_put"]) / 2
    pivot_df = df.pivot(index="timestamp", columns="strike", values=["mid_call", "mid_put"])
    pivot_df = pivot_df.rolling(window=3, min_periods=1).mean()
    pivot_df["credit"] = (
        (pivot_df["mid_call", sell_call] - pivot_df["mid_call", buy_call]) +
        (pivot_df["mid_put", sell_put] - pivot_df["mid_put", buy_put])
    ) * 100
    for timestamp, row in pivot_df.iterrows():
        current_credit = row["credit"]
        if isinstance(current_credit, pd.Series):
            current_credit = current_credit.iloc[0]
        debug_print(f"⏳ {timestamp} | Crédito: {current_credit:.2f} | P&L: {(initial_credit - current_credit):.2f}")
        if current_credit <= initial_credit - take_profit:
            debug_print(f"✅ Take Profit alcanzado en {timestamp} con crédito de {current_credit:.2f}")
            return "Ganancia", current_credit, timestamp.strftime('%H:%M:%S')
        if current_credit >= initial_credit + stop_loss:
            debug_print(f"❌ Stop Loss alcanzado en {timestamp} con crédito de {current_credit:.2f}")
            return "Pérdida", current_credit, timestamp.strftime('%H:%M:%S')
 
 
   #  # Evaluar el resultado parcial al cierre
    pnl = initial_credit - current_credit
    if pnl >= 0:
        resultado = "Ganancia parcial"
    elif pnl < 0:
        resultado = "Pérdida parcial"
    else:
        resultado = "Neutron"

    debug_print("📌 Iron Condor llegó al cierre sin alcanzar TP ni SL")
    return resultado, current_credit, timestamp.strftime('%H:%M:%S')

    # debug_print("📌 Iron Condor llegó al final sin alcanzar TP ni SL")
    # return "Neutro", current_credit, timestamp.strftime('%H:%M:%S')

# Tarea para procesar un solo día (se ejecuta de forma independiente)


@celery.task
def process_day_task(args):
    # args: (file_date_str, file_path, horario, call_delta, put_delta, call_spread, put_spread, takeProfit, stopLoss, unit, desplazamiento)
    from datetime import datetime
    file_date_str, file_path, horario, call_delta, put_delta, call_spread, put_spread, takeProfit, stopLoss, unit, desplazamiento = args
    # Si es necesario, reconvierte la fecha
    file_date = datetime.strptime(file_date_str, '%Y-%m-%d')
    df = pd.read_csv(file_path)
    return process_day(df, horario, call_delta, put_delta, call_spread, put_spread, takeProfit, stopLoss, unit, desplazamiento)


@celery.task
def aggregate_results(results):
    # Filtrar resultados válidos
    results = [res for res in results if res is not None]
    if not results:
        return {'status': 'error', 'message': 'No se encontraron datos procesables.', 'data': []}

    import pandas as pd
    csv_path_gap = '/var/www/html/flask_project/spx_history_with_gap.csv'
    df_gap = pd.read_csv(csv_path_gap)
    gap_dict = dict(zip(df_gap['date'], df_gap['gap']))

    # Agregar la información de gap a cada resultado
    for res in results:
        date_str = res['Time'].split(" ")[0]
        res['Gap'] = gap_dict.get(date_str)

    return {
        'status': 'success',
        'data': results,
        'message': 'Datos procesados correctamente'
    }

@celery.task
def process_backtesting_task(data):
    print("Iniciando tarea de backtesting...")
    # Extraer parámetros del JSON recibido
    symbol = data.get('symbol')
    if symbol in ['SPX', 'XSP', 'RUT']:
        symbol = '$' + symbol

    fecha_desde = data.get('fechaDesde')
    fecha_hasta = data.get('fechaHasta')
    horario = data.get('horario') + ":00"
    call_delta = data['deltas']['call']
    put_delta = data['deltas']['put']
    takeProfit = data.get('takeProfit')
    stopLoss = data.get('stopLoss')
    unit = data.get('unit')
    call_spread = data['spreads']['call']
    put_spread = data['spreads']['put']
    desplazamiento = float(data['desplazamiento'])

    from datetime import datetime
    # Convertir las fechas a objetos datetime
    fecha_desde_dt = datetime.strptime(fecha_desde, '%Y-%m-%d')
    fecha_hasta_dt = datetime.strptime(fecha_hasta, '%Y-%m-%d')

    import glob, os
    directory_path = '/var/www/html/flask_project/chains'
    pattern = f"{directory_path}/optionChain_{symbol}_*.csv"
    files_with_dates = []

    for file_path in glob.glob(pattern):
        base_name = os.path.basename(file_path)
        parts = base_name.replace('.csv', '').split('_')
        if len(parts) < 3:
            print(f"⚠ Advertencia: Nombre inesperado en {base_name}, archivo ignorado.")
            continue
        file_date_str = parts[-1]
        try:
            file_date_dt = datetime.strptime(file_date_str, '%Y-%m-%d')
        except ValueError:
            print(f"⚠ Error: Fecha inválida en {base_name}, archivo ignorado.")
            continue
        if fecha_desde_dt <= file_date_dt <= fecha_hasta_dt:
            files_with_dates.append((file_date_dt, file_path))
    files_with_dates.sort(key=lambda x: x[0])

    if not files_with_dates:
        return {'status': 'error', 'message': 'No se encontraron archivos en el rango seleccionado.', 'data': []}

    # Construir args_list usando rutas y convirtiendo la fecha a cadena (serializable)
    args_list = [
        (file_date.strftime('%Y-%m-%d'), file_path, horario, call_delta, put_delta, call_spread, put_spread, takeProfit, stopLoss, unit, desplazamiento)
        for (file_date, file_path) in files_with_dates
    ]

    # Para depuración: imprimir tipos de argumentos
    for args in args_list:
        print("Tipo del primer elemento (fecha):", type(args[0]))   # Debe ser str
        print("Tipo del segundo elemento (ruta):", type(args[1]))    # Debe ser str

    from celery import chord, group
    # Crear un chord: cada subtarea procesa un archivo (un día)
    job = chord([process_day_task.s(args) for args in args_list])(aggregate_results.s())
    
    # En lugar de bloquear con .get() dentro de la tarea (lo cual no se debe hacer),
    # retornamos el id del chord para que se consulte el resultado final desde fuera.
    return job.id