o
    geY                     @   sZ   d Z ddlZddlZddlZddlZddlZddlZddlmZ ddl	ZG dd dZ
dS )z}
This file contains functions to stream data
Coded by Tyler Bowers
Github: https://github.com/tylerebowers/Schwab-API-Python
    N)sleepc                	   @   s  e Zd Zdd ZefddZedfdefddZed	d	dfdefd
dZde	fddZ
dee	B fddZdee	B fddZdAdefddZdBdedede	fddZedeeB fddZdCd!eeB d"eeB ded#e	fd$d%ZdCd!eeB d"eeB ded#e	fd&d'ZdCd!eeB d"eeB ded#e	fd(d)ZdCd!eeB d"eeB ded#e	fd*d+ZdCd!eeB d"eeB ded#e	fd,d-ZdCd!eeB d"eeB ded#e	fd.d/ZdCd!eeB d"eeB ded#e	fd0d1ZdCd!eeB d"eeB ded#e	fd2d3ZdCd!eeB d"eeB ded#e	fd4d5ZdCd!eeB d"eeB ded#e	fd6d7ZdCd!eeB d"eeB ded#e	fd8d9ZdCd!eeB d"eeB ded#e	fd:d;ZdDded#e	fd?d@ZdS )EStreamc                    sD   d _ d _d _d _d _| _i  _ fdd}t| dS )z
        Initialize the stream object to stream data from Schwab Streamer
        :param client: Client object
        :type client: Client
        Nr   Fc                      s    j r	   d S d S N)activestop selfr   //var/www/html/StreammingVPS/schwabdev/stream.pystop_atexit"   s   z$Stream.__init__.<locals>.stop_atexit)	
_websocket_streamer_info_request_idr   _thread_clientsubscriptionsatexitregister)r	   clientr   r   r   r
   __init__   s   zStream.__init__c                    s  | j  }|jr| ddd | _ntd dS tjtj	j
}	 ztjtj	j
}| j jr5td tj| jddd4 I dH | _| j jrNtd	 | jd
d| j jj| jd| jddd}| jt|I dH  || j I dH fi | d| _| j D ]>\}}g }| D ]\}	}
|| j|d|	t|
dd q|r| jtd|iI dH  || j I dH fi | q	 || j I dH fi | q1 I dH sw   Y  W nt tyV } zgd| _|tjju st|dkr| j jrtd W Y d}~dS |tjju st|dkr'td|  W Y d}~dS tjtj	j
| j dkrDtd| d W Y d}~dS td| d W Y d}~nd}~ww q$)z
        Start the streamer
        :param receiver_func: function to call when data is received
        :type receiver_func: function
        streamerInfoNr   z&[Schwabdev] Could not get streamerInfoTz-[Schwabdev] Connecting to streaming server...streamerSocketUrl)ping_intervalz*[Schwabdev] Connected to streaming server.ADMINLOGINschwabClientChannelschwabClientFunctionId)AuthorizationSchwabClientChannelSchwabClientFunctionId)servicecommand
parametersADDkeysfieldsrequestsFz'received 1000 (OK); then sent 1000 (OK)z%[Schwabdev] Stream connection closed.zno close frame received or sentz@[Schwabdev] Stream connection closed (likely no subscriptions): Z   z2[Schwabdev] Stream has crashed within 90 seconds (zO), likely no subscriptions, invalid login, or lost connection (not restarting).z.[Schwabdev] Stream connection lost to server (z), reconnecting...)!r   preferencesokjsongetr   printdatetimenowtimezoneutcverbose
websocketsconnectr   basic_requesttokensaccess_tokensenddumpsrecvr   r   itemsappendr   _list_to_string	Exception
exceptionsConnectionClosedOKstrConnectionClosedErrorseconds)r	   receiver_funckwargsresponse
start_timelogin_payloadr    subsreqskeyr&   er   r   r
   _start_streamer(   sn   
"



zStream._start_streamerTdaemonc                    sL   j s fdd}tj||d_j  dS jjr$td dS dS )z
        Start the stream
        :param receiver: function to call when data is received
        :type receiver: function
        :param daemon: whether to run the thread in the background (as a daemon)
        :type daemon: bool
        c                      s   t jfi   d S r   )asynciorunrM   r   rE   receiverr	   r   r
   _start_asynco   s   z"Stream.start.<locals>._start_asynctargetrN   z"[Schwabdev] Stream already active.N)r   	threadingThreadr   startr   r2   r-   )r	   rR   rN   rE   rS   r   rQ   r
   rX   f   s
   zStream.startFc                    s   t jdddt jjdt jdddt jjd|r#t jdddt jjd|r/t jjjt jjd fdd	}tj| d
  t j 	t jj jt jjd  kr[ksbn t
d dS dS )ar  
        Start the stream automatically at market open and close, will NOT erase subscriptions
        :param receiver: function to call when data is received
        :type receiver: function
        :param after_hours: include after hours trading
        :type after_hours: bool
        :param pre_hours: include pre hours trading
        :type pre_hours: bool
              r   tzinfo   
   ;   c                     s   	 t j t jj} |  jt jjd  kokn  o*d|    ko(dkn  }|rKjsKtj	dkr?j
jr?td jd d n|s^jr^j
jrXtd jdd	 td
 q)NTr[   r      z6[Schwabdev] No subscriptions, starting stream anyways.)rR   rN   z[Schwabdev] Stopping Stream.F)clear_subscriptions   r   )r.   r/   r0   r1   timereplaceweekdayr   lenr   r   r2   r-   rX   r   r   )r/   in_hoursrN   endrE   rR   r	   rX   r   r
   checker   s   D

z"Stream.start_auto.<locals>.checkerrT   zU[Schwabdev] Stream was started outside of active hours and will launch when in hours.N)r.   rc   r0   r1   maxrd   rV   rW   rX   r/   r-   )r	   rR   after_hours	pre_hoursrN   rE   rj   r   rh   r
   
start_autox   s   
2zStream.start_autorequestc           	      C   s\  dd }| dd}| dd}| dd}|dur|| dg }|| dg }|| jvr4i | j|< |d	krb|D ]%}|| j| vrK|| j| |< q:tt|t| j| | B | j| |< q:dS |d
kryi | j|< |D ]	}|| j| |< qmdS |dkr|D ]}|| j| v r| j| | qdS |dkr| j|  D ]}|| j| |< qdS dS dS )z
        Record the request into self.subscriptions (for the event of crashes)
        :param request: request
        :type request: dict
        c                 S   s*   t | tu r| dS t | tu r| S d S )N,)typerA   splitlist)str   r   r
   str_to_list   s   z+Stream._record_request.<locals>.str_to_listr    Nr!   r"   r%   r&   r#   SUBSUNSUBSVIEW)r,   r   rs   setpopr%   )	r	   ro   ru   r    r!   r"   r%   r&   rK   r   r   r
   _record_request   s@   

*
zStream._record_requestr'   c                    sp    fdd}t |tur|g}|D ]} | q jr,td|i}t|| dS  jj	r6t
d dS dS )z
        Send a request to the stream
        :param requests: list of requests or a single request
        :type requests: list | dict
        c                    s    j | I d H  d S r   )r   r8   )to_sendr   r   r
   _send   s   zStream.send.<locals>._sendr'   1[Schwabdev] Stream is not active, request queued.N)rq   rs   r{   r   r+   r9   rO   rP   r   r2   r-   )r	   r'   r}   ro   r|   r   r   r
   r8      s   zStream.sendc                    sj   t |tur
|g}|D ]}| | q| jr)td|i}| j|I dH  dS | jj	r3t
d dS dS )z
        Send an async (must be awaited) request to the stream (functionally equivalent to send)
        :param requests: list of requests or a single request
        :type requests: list | dict
        r'   Nr~   )rq   rs   r{   r   r+   r9   r   r8   r   r2   r-   )r	   r'   ro   r|   r   r   r
   
send_async   s   zStream.send_asyncra   c                 C   s6   |ri | _ |  jd7  _| | jddd d| _dS )z{
        Stop the stream
        :param clear_subscriptions: clear records
        :type clear_subscriptions: bool
           r   LOGOUT)r    r!   FN)r   r   r8   r5   r   )r	   ra   r   r   r
   r      s
   
zStream.stopNr    r!   r"   c                 C   s   | j du r| j }|jr| ddd | _ ntd i S |dur3| D ]}|| du r2||= q'|  jd7  _|	 |	 | j| j d| j dd}|dur]t
|dkr]||d	< |S )
a  
        Create a basic request (all requests follow this format)
        :param service: service to use
        :type service: str
        :param command: command to use ("SUBS"|"ADD"|"UNSUBS"|"VIEW"|"LOGIN"|"LOGOUT")
        :type command: str
        :param parameters: parameters to use
        :type parameters: dict
        :return: stream request
        :rtype: dict
        Nr   r   z*[Schwabdev] Could not use/get streamerInfor   schwabClientCustomerIdschwabClientCorrelId)r    r!   	requestidSchwabClientCustomerIdSchwabClientCorrelIdr"   )r   r   r)   r*   r+   r,   r-   r%   r   upperrf   )r	   r    r!   r"   rF   rK   ro   r   r   r
   r5      s&   



zStream.basic_requestlsc                 C   s0   t | tu r| S t | tu rdtt| S dS )z
        Convert a list to a string (e.g. [1, "B", 3] -> "1,B,3"), or passthrough if already a string
        :param ls: list to convert
        :type ls: list | str
        :return: converted string
        :rtype: str
        rp   N)rq   rA   rs   joinmap)r   r   r   r
   r=      s   	 zStream._list_to_stringr#   r%   r&   returnc                 C   "   | j d|t|t|ddS )ag  
        Level one equities
        :param keys: list of keys to use (e.g. ["AMD", "INTC"])
        :type keys: list | str
        :param fields: list of fields to use
        :type fields: list | str
        :param command: command to use ("SUBS"|"ADD"|"UNSUBS"|"VIEW")
        :type command: str
        :return: stream request
        :rtype: dict
        LEVELONE_EQUITIESr$   r"   r5   r   r=   r	   r%   r&   r!   r   r   r
   level_one_equities,     "zStream.level_one_equitiesc                 C   r   )a!  
        Level one options, key format: [Underlying Symbol (6 characters including spaces) | Expiration (6 characters) | Call/Put (1 character) | Strike Price (5+3=8 characters)]
        :param keys: list of keys to use (e.g. ["GOOG  240809C00095000", "AAPL  240517P00190000"])
        :type keys: list | str
        :param fields: list of fields to use
        :type fields: list | str
        :param command: command to use ("SUBS"|"ADD"|"UNSUBS"|"VIEW")
        :type command: str
        :return: stream request
        :rtype: dict
        LEVELONE_OPTIONSr$   r   r   r   r   r   r
   level_one_options:  r   zStream.level_one_optionsc                 C   r   )aR  
        Level one futures, key format: '/' + 'root symbol' + 'month code' + 'year code'; month code is 1 character: (F: Jan, G: Feb, H: Mar, J: Apr, K: May, M: Jun, N: Jul, Q: Aug, U: Sep, V: Oct, X: Nov, Z: Dec), year code is 2 characters (i.e. 2024 = 24)
        :param keys: list of keys to use (e.g. ["/ESF24", "/GCG24"])
        :type keys: list | str
        :param fields: list of fields to use
        :type fields: list | str
        :param command: command to use ("SUBS"|"ADD"|"UNSUBS"|"VIEW")
        :type command: str
        :return: stream request
        :rtype: dict
        LEVELONE_FUTURESr$   r   r   r   r   r   r
   level_one_futuresH  r   zStream.level_one_futuresc                 C   r   )a  
        Level one futures options, key format: '.' + '/' + 'root symbol' + 'month code' + 'year code' + 'Call/Put code' + 'Strike Price'
        :param keys: list of keys to use (e.g. ["./OZCZ23C565"])
        :type keys: list | str
        :param fields: list of fields to use
        :type fields: list | str
        :param command: command to use ("SUBS"|"ADD"|"UNSUBS"|"VIEW")
        :type command: str
        :return: stream request
        :rtype: dict
        LEVELONE_FUTURES_OPTIONSr$   r   r   r   r   r   r
   level_one_futures_optionsV  r   z Stream.level_one_futures_optionsc                 C   r   )a  
        Level one forex, key format: 'from currency' + '/' + 'to currency'
        :param keys: list of keys to use (e.g. ["EUR/USD", "JPY/USD"])
        :type keys: list | str
        :param fields: list of fields to use
        :type fields: list | str
        :param command: command to use ("SUBS"|"ADD"|"UNSUBS"|"VIEW")
        :type command: str
        :return: stream request
        :rtype: dict
        LEVELONE_FOREXr$   r   r   r   r   r   r
   level_one_forexd  r   zStream.level_one_forexc                 C   r   )ab  
        NYSE book orders
        :param keys: list of keys to use (e.g. ["NIO", "F"])
        :type keys: list | str
        :param fields: list of fields to use
        :type fields: list | str
        :param command: command to use ("SUBS"|"ADD"|"UNSUBS"|"VIEW")
        :type command: str
        :return: stream request
        :rtype: dict
        	NYSE_BOOKr$   r   r   r   r   r   r
   	nyse_bookr  r   zStream.nyse_bookc                 C   r   )ag  
        NASDAQ book orders
        :param keys: list of keys to use (e.g. ["AMD", "CRWD"])
        :type keys: list | str
        :param fields: list of fields to use
        :type fields: list | str
        :param command: command to use ("SUBS"|"ADD"|"UNSUBS"|"VIEW")
        :type command: str
        :return: stream request
        :rtype: dict
        NASDAQ_BOOKr$   r   r   r   r   r   r
   nasdaq_book  r   zStream.nasdaq_bookc                 C   r   )a  
        Options book orders
        :param keys: list of keys to use (e.g. ["GOOG  240809C00095000", "AAPL  240517P00190000"])
        :type keys: list | str
        :param fields: list of fields to use
        :type fields: list | str
        :param command: command to use ("SUBS"|"ADD"|"UNSUBS"|"VIEW")
        :type command: str
        :return: stream request
        :rtype: dict
        OPTIONS_BOOKr$   r   r   r   r   r   r
   options_book  r   zStream.options_bookc                 C   r   )ab  
        Chart equity
        :param keys: list of keys to use (e.g. ["GOOG", "AAPL"])
        :type keys: list | str
        :param fields: list of fields to use
        :type fields: list | str
        :param command: command to use ("SUBS"|"ADD"|"UNSUBS"|"VIEW")
        :type command: str
        :return: stream request
        :rtype: dict
        CHART_EQUITYr$   r   r   r   r   r   r
   chart_equity  r   zStream.chart_equityc                 C   r   )aN  
        Chart futures, key format: '/' + 'root symbol' + 'month code' + 'year code'; month code is 1 character: (F: Jan, G: Feb, H: Mar, J: Apr, K: May, M: Jun, N: Jul, Q: Aug, U: Sep, V: Oct, X: Nov, Z: Dec), year code is 2 characters (i.e. 2024 = 24)
        :param keys: list of keys to use (e.g. ["/ESF24", "/GCG24"])
        :type keys: list | str
        :param fields: list of fields to use
        :type fields: list | str
        :param command: command to use ("SUBS"|"ADD"|"UNSUBS"|"VIEW")
        :type command: str
        :return: stream request
        :rtype: dict
        CHART_FUTURESr$   r   r   r   r   r   r
   chart_futures  r   zStream.chart_futuresc                 C   r   )a  
        Screener equity, key format: (PREFIX)_(SORTFIELD)_(FREQUENCY); Prefix: ($COMPX, $DJI, $SPX.X, INDEX_AL, NYSE, NASDAQ, OTCBB, EQUITY_ALL); Sortfield: (VOLUME, TRADES, PERCENT_CHANGE_UP, PERCENT_CHANGE_DOWN, AVERAGE_PERCENT_VOLUME), Frequency: (0 (all day), 1, 5, 10, 30 60)
        :param keys: list of keys to use (e.g. ["$DJI_PERCENT_CHANGE_UP_60", "NASDAQ_VOLUME_30"])
        :type keys: list | str
        :param fields: list of fields to use
        :type fields: list | str
        :param command: command to use ("SUBS"|"ADD"|"UNSUBS"|"VIEW")
        :type command: str
        :return: stream request
        :rtype: dict
        SCREENER_EQUITYr$   r   r   r   r   r   r
   screener_equity  r   zStream.screener_equityc                 C   r   )au  
        Screener option key format: (PREFIX)_(SORTFIELD)_(FREQUENCY); Prefix: (OPTION_PUT, OPTION_CALL, OPTION_ALL); Sortfield: (VOLUME, TRADES, PERCENT_CHANGE_UP, PERCENT_CHANGE_DOWN, AVERAGE_PERCENT_VOLUME), Frequency: (0 (all day), 1, 5, 10, 30 60)
        :param keys: list of keys to use (e.g. ["OPTION_PUT_PERCENT_CHANGE_UP_60", "OPTION_CALL_TRADES_30"])
        :type keys: list | str
        :param fields: list of fields to use
        :type fields: list | str
        :param command: command to use ("SUBS"|"ADD"|"UNSUBS"|"VIEW")
        :type command: str
        :return: stream request
        :rtype: dict
        SCREENER_OPTIONr$   r   r   r   r   r   r
   screener_options  r   zStream.screener_optionsAccount Activity0,1,2,3rv   c                 C   r   )ap  
        Account activity
        :param keys: list of keys to use (e.g. ["Account Activity"])
        :type keys: list | str
        :param fields: list of fields to use (e.g. ["0,1,2,3"])
        :type fields: list | str
        :param command: command to use ("SUBS"|"UNSUBS")
        :type command: str
        :return: stream request
        :rtype: dict
        ACCT_ACTIVITYr$   r   r   r   r   r   r
   account_activity  r   zStream.account_activity)Tr   )r#   )r   r   rv   ) __name__
__module____qualname__r   r-   rM   boolrX   rn   dictr{   rs   r8   r   r   rA   r5   staticmethodr=   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r
   r      s2    >#&"$$$$$$$$$$$$r   )__doc__r+   r   rO   r.   rV   r3   rc   r   websockets.exceptionsr   r   r   r   r
   <module>   s    