
    +j                         d dl Z d dlZddlmZmZ g dZdZde j        ddfdZdd	Z	d
 Z
ddZddZ G d d          Zd ZdS )    N   )	is_dunder
is_private)
StateQueuecreate_state_queue_monitor_taskcreate_taskdecorate_taskhandle_task_resulttaskSTOPr   returnc                     	 |                                   d S # t          j        $ r Y d S t          $ r t	          j        d|            Y d S w xY w)NzException raised by task = %r)resultasyncioCancelledError	Exceptionlogging	exceptionr   s    Y/DATA/AppData/hermes/venv/lib/python3.11/site-packages/trame_server/utils/asynchronous.pyr
   r
      sq    A!    A A A94@@@@@@As    AAAc                 p    |t          j                    }t          |                    |                     S )a  
    Create a task from a coroutine while also attaching a done callback so any
    exception or error could be caught and reported.

    :param coroutine: A coroutine to execute as an independent task
    :param loop: Optionally provide the loop on which the task should be
                 scheduled on. By default we will use the current running loop.

    :return: The decorated task
    :rtype: asyncio.Task
    )r   get_event_loopr	   r   )	coroutineloops     r   r   r      s3     |%'')))44555    c                 :    |                      t                     | S )a  
    Decorate a task by attaching a done callback so any exception or error could
    be caught and reported.

    :param task: A coroutine to execute as an independent task
    :type task: asyncio.Task

    :return: The same task object
    :rtype: asyncio.Task
    )add_done_callbackr
   r   s    r   r	   r	   -   s     	-...Kr   c                 X  K   d}|r|                                 rt          j        |           d {V  np|                                }t	          |t
                    r|t          k    rd}n9| j        5  | j                            |           d d d            n# 1 swxY w Y   |d S d S )NTF)	emptyr   sleep
get_nowait
isinstancestr
QUEUE_EXITstateupdate)serverqueuedelay_monitor_queuemsgs        r   _queue_update_stater,   <   s     N
 
-;;== 		--&&&&&&&&&&""$$C#s## -*$$%*N\ - -L'',,,- - - - - - - - - - - - - - -  
- 
- 
- 
- 
-s   4BB"Bc                 @    t          t          | ||                    S )a  
    Create and schedule a task to watch over the provided queue
    to update a server state.
    This is especially useful when using a multiprocess executor
    and you want to report progress into your current server.

    :param server: A coroutine to execute as an independent task
    :type server: trame_server.core.Server

    :param queue: A queue instance meant to exchange state from
                  the parallel process to the given server
    :type queue: multiprocessing.Queue

    :param delay: Time to sleep in seconds before processing
                  the queue once emptied
    :type delay: float

    :return: The monitoring task
    :rtype: asyncio.Task
    )r)   )r   r,   )r'   r(   r)   s      r   r   r   K   s"    * *65FFFGGGr   c                   f    e Zd ZdZddZed             Zd Zd Zd Z	d Z
d	 Zd
 Zd Zd Zd ZdS )r   a  
    Class use to decorate a multiprocessing.Queue inside your external
    process to simulate your server state object.

    :param queue: A queue instance meant to exchange state from the parallel
                  process to the given server
    :type queue: multiprocessing.Queue

    :param auto_flush: Should you manage the state update phase or just
                       propagate as soon as you update a property
    :type auto_flush: Boolean
    Tc                 L    || _         i | _        i | _        || _        d| _        d S )Nr   )_queue_pending_update_pushed_state_auto_flush
_ctx_count)selfr(   
auto_flushs      r   __init__zStateQueue.__init__q   s,    !%r   c                     | j         S )z%Provide access to the decorated queue)r0   r5   s    r   r(   zStateQueue.queuex   s     {r   c                 h    | j                             || j                            |                    S N)r1   getr2   r5   keys     r   __getitem__zStateQueue.__getitem__}   s,    #''T-?-C-CC-H-HIIIr   c                 T    || j         |<   | j        r|                                  d S d S r;   )r1   r3   flushr5   r>   values      r   __setitem__zStateQueue.__setitem__   s5    $)S! 	JJLLLLL	 	r   c                     t          |          rt          t          |          S t          |          r| j                            |          S |                     |          S r;   )r   getattrobjectr   __dict__r<   r?   r=   s     r   __getattr__zStateQueue.__getattr__   sX    S>> 	(63'''c?? 	*=$$S)))$$$r   c                 h    t          |          r|| j        |<   d S |                     ||           d S r;   )r   rH   rD   rB   s      r   __setattr__zStateQueue.__setattr__   s>    c?? 	)!&DM#S%(((((r   c                 t    | j                             |           | j        r|                                  dS dS )z
        Update the distributed state from a set of key/value pair

        :param _dict: A dict containing one or many key/value pair
        :type _dict: dict
        N)r1   r&   r3   rA   )r5   _dicts     r   r&   zStateQueue.update   sA     	##E*** 	JJLLLLL	 	r   c                     t          | j                  rG| j                            | j                   | j                            | j                   i | _        dS dS )z.Explicitly push any local change to the queue.N)lenr1   r0   
put_nowaitr2   r&   r9   s    r   rA   zStateQueue.flush   s`    t#$$ 	&K""4#7888%%d&:;;;#%D   	& 	&r   c                 D    | j                             t                     dS )z8Release the monitoring task as we are done with our workN)r0   rP   r$   r9   s    r   exitzStateQueue.exit   s    z*****r   c                 &    | xj         dz  c_         | S )Nr   )r4   r9   s    r   	__enter__zStateQueue.__enter__   s    1r   c                     | xj         dz  c_         | j         dk    r*|                                  |                                  d S d S )Nr   r   )r4   rA   rR   )r5   exc_type	exc_valueexc_tracebacks       r   __exit__zStateQueue.__exit__   sE    1?aJJLLLIIKKKKK  r   N)T)__name__
__module____qualname____doc__r7   propertyr(   r?   rD   rI   rK   r&   rA   rR   rT   rY    r   r   r   r   c   s               XJ J J  
% % %) ) )	 	 	& & &+ + +      r   r   c                       fd}|S )z<Function decorator to make its async execution within a taskc                  2    t           | i |           d S r;   )r   )argskwargsfuncs     r   wrapperztask.<locals>.wrapper   s&    DD$)&))*****r   r_   )rd   re   s   ` r   r   r      s#    + + + + + Nr   r;   )r   )r   r    r   r   __all__r$   Taskr
   r   r	   r,   r   r   r   r_   r   r   <module>ri      s
     # # # # # # # #   
AW\ Ad A A A A6 6 6 6$  - - - -H H H H0Q Q Q Q Q Q Q Qh    r   