# tasks.py
import glob
import os
import pandas as pd
from datetime import datetime
from celery import Celery, chord



# Celery configuration (make sure it matches the configuration in app.py)
celery = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')


# Task that processes a single day (runs independently)
@celery.task(queue='opcionsigma_queue')
def process_day_task(args):
    file_date_str, file_path, horario, call_delta, put_delta, call_spread, put_spread, takeProfit, stopLoss, unit, desplazamiento = args
    # file_date_str travels in args for traceability; the parsed date is not needed here
    try:
        df = pd.read_parquet(file_path)
    except Exception as e:
        return {
            'status': 'error',
            'message': f'Error reading {file_path}: {e}',
            'data': [],
            'file': file_path
        }
    return process_day(df, horario, call_delta, put_delta, call_spread, put_spread, takeProfit, stopLoss, unit, desplazamiento)



@celery.task(queue='opcionsigma_queue')  # Only for tasks.py
def aggregate_results(results):
    # Keep only valid results (drop None and error dicts that carry no trade data)
    results = [res for res in results if res is not None and 'Time' in res]
    if not results:
        return {'status': 'error', 'message': 'No processable data was found.', 'data': []}

    # Load gaps
    csv_path_gap = '/var/www/html/flask_project/spx_history_with_gap.csv'
    df_gap = pd.read_csv(csv_path_gap)
    gap_dict = dict(zip(df_gap['date'], df_gap['gap']))

    # Load events / news
    csv_path_eventos = '/var/www/html/flask_project/holidays.csv'
    df_eventos = pd.read_csv(csv_path_eventos, header=None, names=["date", "event"])
    df_eventos['date'] = pd.to_datetime(df_eventos['date']).dt.strftime('%Y-%m-%d')
    eventos_dict = dict(zip(df_eventos['date'], df_eventos['event']))
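
    # Expected shapes (an assumption inferred from the column access here):
    # spx_history_with_gap.csv exposes `date` (YYYY-MM-DD) and `gap` columns;
    # holidays.csv is headerless with two columns: date and event description.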

    # Attach the gap and event to each result
    for res in results:
        date_str = res['Time'].split(" ")[0]
        res['Gap'] = gap_dict.get(date_str)
        res['News'] = eventos_dict.get(date_str)

    return {
        'status': 'success',
        'data': results,
        'message': 'Data processed with gap and events'
    }


@celery.task(queue='opcionsigma_queue')
def process_backtesting_task(data):
    symbol = data.get('symbol')
    if symbol in ['SPX', 'XSP', 'RUT']:
        symbol = '$' + symbol

    fecha_desde = data.get('fechaDesde')
    fecha_hasta = data.get('fechaHasta')
    horario = data.get('horario') + ":00"
    call_delta = data['deltas']['call']
    put_delta = data['deltas']['put']
    takeProfit = data.get('takeProfit')
    stopLoss = data.get('stopLoss')
    unit = data.get('unit')
    call_spread = data['spreads']['call']
    put_spread = data['spreads']['put']
    desplazamiento = float(data['desplazamiento'])

    fecha_desde_dt = datetime.strptime(fecha_desde, '%Y-%m-%d')
    fecha_hasta_dt = datetime.strptime(fecha_hasta, '%Y-%m-%d')

    pattern = f"/var/www/html/flask_project/chains/optionChain_{symbol}_*.csv"
    files_with_dates = []

    for file_path in glob.glob(pattern):
        base_name = os.path.basename(file_path)
        parts = base_name.replace('.csv', '').split('_')
        if len(parts) < 3:
            continue
        try:
            file_date_dt = datetime.strptime(parts[-1], '%Y-%m-%d')
        except ValueError:
            continue
        if fecha_desde_dt <= file_date_dt <= fecha_hasta_dt:
            parquet_path = file_path.replace(".csv", ".parquet")
            if not os.path.exists(parquet_path):
                try:
                    df = pd.read_csv(file_path)
                    df.to_parquet(parquet_path, index=False, compression='snappy')
                except Exception:
                    # Skip files that fail CSV -> parquet conversion
                    continue
            files_with_dates.append((file_date_dt, parquet_path))

    files_with_dates.sort(key=lambda x: x[0])
    if not files_with_dates:
        return {'status': 'error', 'message': 'No files were found in the selected range.', 'data': []}

    args_list = [
        (file_date.strftime('%Y-%m-%d'), file_path, horario, call_delta, put_delta, call_spread, put_spread, takeProfit, stopLoss, unit, desplazamiento)
        for (file_date, file_path) in files_with_dates
    ]

    job = chord([process_day_task.s(args) for args in args_list])(aggregate_results.s())
    return job.id
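

# Minimal usage sketch (hypothetical helper, not called anywhere in this
# module): how a client could kick off the backtest and then read the chord's
# aggregated output. The payload values are illustrative assumptions only.
def _example_dispatch():
    from celery.result import AsyncResult
    payload = {
        'symbol': 'SPX',
        'fechaDesde': '2024-01-02',
        'fechaHasta': '2024-01-31',
        'horario': '10:30',
        'deltas': {'call': 0.10, 'put': 0.10},
        'spreads': {'call': 10, 'put': 10},
        'takeProfit': 100,
        'stopLoss': 200,
        'unit': 'USD',
        'desplazamiento': '0.5',
    }
    chord_id = process_backtesting_task.delay(payload).get()  # the task returns the chord id
    return AsyncResult(chord_id, app=celery).get()            # aggregate_results payload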


# Find the rows whose timestamp is closest to, or after, the target time
def find_next_closest_time_rows(df, target_time_str, call_delta, put_delta, call_spread, put_spread, takeProfit, stopLoss, unit, desplazamiento):
    """
    Finds the rows whose timestamp is closest to (or after) the target time, selects the Iron Condor strikes and computes the initial credit.
    If the required strikes are not available, it tries the next timestamp.
    """
    df['Time'] = pd.to_datetime(df['timestamp'], format='%Y-%m-%d %H:%M:%S')
    target_time = datetime.strptime(target_time_str, '%H:%M:%S').time()
    df['only_time'] = df['Time'].dt.time
    # All unique times at or after the target, sorted ascending
    future_times = sorted(set(t for t in df['only_time'].unique() if t >= target_time))

    max_intentos = 10
    intentos = 0
    for t in future_times:
        if intentos >= max_intentos:
            print("❌ No valid set of 4 strikes was found within the first 10 attempts.")
            return None

        rows_at_time = df[df['only_time'] == t].copy()
        if rows_at_time.empty:
            intentos += 1
            continue
        
        rows_at_time['delta_call_diff'] = abs(rows_at_time['delta_call'] - float(call_delta))
        rows_at_time['delta_put_diff'] = abs(rows_at_time['delta_put'] - float(put_delta))

        idx_min_call = rows_at_time['delta_call_diff'].idxmin()
        idx_min_put = rows_at_time['delta_put_diff'].idxmin()

        sell_call = rows_at_time.loc[idx_min_call, 'strike']
        sell_put = rows_at_time.loc[idx_min_put, 'strike']

        buy_call = sell_call + float(call_spread)
        buy_put = sell_put - float(put_spread)

        # Check that all 4 strikes exist at this timestamp
        strikes_needed = {sell_call, buy_call, sell_put, buy_put}
        available_strikes = set(rows_at_time['strike'].unique())

        if not strikes_needed.issubset(available_strikes):
            intentos += 1
            continue

        credit = calculate_iron_condor_credit(rows_at_time, sell_call, buy_call, sell_put, buy_put)
        if credit is None:
            intentos += 1
            continue

        credit -= (desplazamiento / 100)
        spx_price = rows_at_time.iloc[0]['underlying_price']
        opening_timestamp = rows_at_time.iloc[0]['timestamp']

        resultado, final_credit, occur_time, evolution = evaluate_iron_condor_performance(
            df, sell_call, buy_call, sell_put, buy_put, credit, takeProfit, stopLoss, opening_timestamp
        )

        return {
            "Time": pd.to_datetime(opening_timestamp).strftime('%Y-%m-%d %H:%M:%S'),
            "SPX Price": float(spx_price),
            "Outcome": str(resultado),
            "Initial Credit": float(credit),
            "Credit at Occurrence": float(final_credit),
            "Occur Time": occur_time if isinstance(occur_time, str) else pd.to_datetime(occur_time).strftime('%H:%M:%S'),
            "Stop Loss": str(stopLoss),
            "Take Profit": str(takeProfit),
            "Sell Call Strike": float(sell_call),
            "Buy Call Strike": float(buy_call),
            "Sell Put Strike": float(sell_put),
            "Buy Put Strike": float(buy_put)
        }

    return None
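
# Worked example of the selection loop above (assumed inputs): with
# call_delta=0.10 the short call is the strike whose delta_call is nearest
# 0.10, and with call_spread=10 the long call sits 10 points above it. If any
# of the four legs is missing at a snapshot, the loop advances to the next
# timestamp, giving up after max_intentos tries.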


# Process a single day; this is the wrapper around the function above
def process_day(day_df, horario, call_delta, put_delta, call_spread, put_spread, takeProfit, stopLoss, unit, desplazamiento):
    result = find_next_closest_time_rows(day_df, horario, call_delta, put_delta, call_spread, put_spread, takeProfit, stopLoss, unit, desplazamiento)
    return result


# Compute the initial credit of the Iron Condor
def calculate_iron_condor_credit(df, sell_call, buy_call, sell_put, buy_put):
    sell_call, buy_call, sell_put, buy_put = map(float, [sell_call, buy_call, sell_put, buy_put])
    df['strike'] = df['strike'].astype(float)
    missing_strikes = [strike for strike in [sell_call, buy_call, sell_put, buy_put] if strike not in df['strike'].values]
    if missing_strikes:
        return None
    try:
        sell_call_row = df[df['strike'] == sell_call].iloc[0]
        buy_call_row = df[df['strike'] == buy_call].iloc[0]
        sell_put_row = df[df['strike'] == sell_put].iloc[0]
        buy_put_row = df[df['strike'] == buy_put].iloc[0]
        sell_call_price = (sell_call_row['bid_call'] + sell_call_row['ask_call']) / 2
        buy_call_price = (buy_call_row['bid_call'] + buy_call_row['ask_call']) / 2
        sell_put_price = (sell_put_row['bid_put'] + sell_put_row['ask_put']) / 2
        buy_put_price = (buy_put_row['bid_put'] + buy_put_row['ask_put']) / 2
        credit = (sell_call_price - buy_call_price) + (sell_put_price - buy_put_price)

        return credit
    except IndexError:
        return None
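
# Minimal sketch on assumed toy quotes, exercising the credit formula above.
# With the mids implied below, the expected credit is
# (1.20 - 0.40) + (1.10 - 0.35) = 1.55.
def _example_credit():
    toy = pd.DataFrame({
        'strike':   [5000, 5010, 4900, 4890],
        'bid_call': [1.15, 0.35, 0.05, 0.05],
        'ask_call': [1.25, 0.45, 0.15, 0.15],
        'bid_put':  [0.05, 0.05, 1.05, 0.30],
        'ask_put':  [0.15, 0.15, 1.15, 0.40],
    })
    return calculate_iron_condor_credit(toy, sell_call=5000, buy_call=5010,
                                        sell_put=4900, buy_put=4890)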

def evaluate_iron_condor_performance(df, sell_call, buy_call, sell_put, buy_put, initial_credit, take_profit, stop_loss, opening_timestamp):
    take_profit = float(take_profit)
    stop_loss = float(stop_loss)
    initial_credit = round(float(initial_credit) * 100)
    opening_timestamp = pd.to_datetime(opening_timestamp)  # may arrive as a string
    df["timestamp"] = pd.to_datetime(df["timestamp"])
    df = df[df["timestamp"] >= opening_timestamp].sort_values("timestamp")

    strikes_needed = {sell_call, buy_call, sell_put, buy_put}
    df = df[df["strike"].isin(strikes_needed)]
    if df.empty:
        # Return four values so the caller's unpacking does not break
        return "Neutro", initial_credit, opening_timestamp.strftime('%H:%M:%S'), []

    df["mid_call"] = (df["bid_call"] + df["ask_call"]) / 2
    df["mid_put"] = (df["bid_put"] + df["ask_put"]) / 2

    pivot_df = df.pivot(index="timestamp", columns="strike", values=["mid_call", "mid_put"])

    pivot_df["credit"] = (
        (pivot_df["mid_call", sell_call] - pivot_df["mid_call", buy_call]) +
        (pivot_df["mid_put", sell_put] - pivot_df["mid_put", buy_put])
    ) * 100

    pivot_df["credit"] = pivot_df["credit"].rolling(window=5, min_periods=1).mean()

    # Build the credit evolution table
    credit_evolution = []
    for timestamp, row in pivot_df.iterrows():
        current_credit = row["credit"]
        if isinstance(current_credit, pd.Series):
            current_credit = current_credit.iloc[0]
        credit_evolution.append({
            "timestamp": timestamp.strftime('%Y-%m-%d %H:%M:%S'),
            "credit": float(current_credit)
        })

        if current_credit < initial_credit - take_profit:
            return "Ganancia", current_credit, timestamp.strftime('%H:%M:%S'), credit_evolution
        if current_credit > initial_credit + stop_loss:
            return "Pérdida", current_credit, timestamp.strftime('%H:%M:%S'), credit_evolution

    # Compute the final P&L if neither TP nor SL was hit
    last_underlying_price = df["underlying_price"].iloc[-1]
    # Intrinsic loss of each vertical is capped at its spread width
    call_spread_intrinsic = min(buy_call - sell_call, max(0, last_underlying_price - sell_call))
    put_spread_intrinsic = min(sell_put - buy_put, max(0, sell_put - last_underlying_price))
    total_loss_points = call_spread_intrinsic + put_spread_intrinsic
    total_loss = total_loss_points * 100
    pnl = initial_credit - total_loss

    if pnl > 0:
        resultado = "Ganancia" if pnl > take_profit else "Ganancia Parcial"
        pnl = min(pnl, take_profit)
    elif pnl < 0:
        resultado = "Pérdida" if pnl < -stop_loss else "Pérdida Parcial"
        pnl = max(pnl, -stop_loss)
    else:
        resultado = "Neutro"

    return resultado, pnl, df["timestamp"].iloc[-1].strftime('%H:%M:%S'), credit_evolution
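
# Worked settlement example (assumed numbers): with sell_call=5000,
# buy_call=5010 and the underlying finishing at 5025, the call side's
# intrinsic loss is capped at the 10-point width, so total_loss is
# 10 * 100 = 1000 however far past the long strike the index closes.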

# sudo systemctl daemon-reload
# celery -A tasks.celery worker --loglevel=info
# sudo systemctl start celery