
    tj                         d dl Z d dlZddlmZmZ g dZdZde j                  ddfdZdd	Z	d
 Z
ddZddZ G d d      Zd Zy)    N   )	is_dunder
is_private)
StateQueuecreate_state_queue_monitor_taskcreate_taskdecorate_taskhandle_task_resulttaskSTOPr   returnc                     	 | j                          y # t        j                  $ r Y y t        $ r t	        j
                  d|        Y y w xY w)NzException raised by task = %r)resultasyncioCancelledError	Exceptionlogging	exceptionr   s    L/DATA/.local/lib/python3.12/site-packages/trame_server/utils/asynchronous.pyr
   r
      sD    A!!  A94@As    A	A	A	c                 b    |t        j                         }t        |j                  |             S )a  
    Create a task from a coroutine while also attaching a done callback so any
    exception or error could be caught and reported.

    :param coroutine: A coroutine to execute as an independent task
    :param loop: Optionally provide the loop on which the task should be
                 scheduled on. By default we will use the current running loop.

    :return: The decorated task
    :rtype: asyncio.Task
    )r   get_event_loopr	   r   )	coroutineloops     r   r   r      s-     |%%')))455    c                 0    | j                  t               | S )a  
    Decorate a task by attaching a done callback so any exception or error could
    be caught and reported.

    :param task: A coroutine to execute as an independent task
    :type task: asyncio.Task

    :return: The same task object
    :rtype: asyncio.Task
    )add_done_callbackr
   r   s    r   r	   r	   -   s     	-.Kr   c                 L  K   d}|r|j                         rt        j                  |       d {    n\|j                         }t	        |t
              r|t        k(  r3d}n0| j                  5  | j                  j                  |       d d d        |ry y 7 f# 1 sw Y   xY ww)NTF)	emptyr   sleep
get_nowait
isinstancestr
QUEUE_EXITstateupdate)serverqueuedelay_monitor_queuemsgs        r   _queue_update_stater,   <   s|     N
;;=--&&&""$C#s#*$%*N\\LL'', " & "\s-   -B$B=B$-B	
B$B$B!B$c                 0    t        t        | ||            S )a  
    Create and schedule a task to watch over the provided queue
    to update a server state.
    This is especially useful when using a multiprocess executor
    and you want to report progress into your current server.

    :param server: A coroutine to execute as an independent task
    :type server: trame_server.core.Server

    :param queue: A queue instance meant to exchange state from
                  the parallel process to the given server
    :type queue: multiprocessing.Queue

    :param delay: Time to sleep in seconds before processing
                  the queue once emptied
    :type delay: float

    :return: The monitoring task
    :rtype: asyncio.Task
    )r)   )r   r,   )r'   r(   r)   s      r   r   r   K   s    * *65FGGr   c                   ^    e Zd ZdZddZed        Zd Zd Zd Z	d Z
d Zd	 Zd
 Zd Zd Zy)r   a  
    Class use to decorate a multiprocessing.Queue inside your external
    process to simulate your server state object.

    :param queue: A queue instance meant to exchange state from the parallel
                  process to the given server
    :type queue: multiprocessing.Queue

    :param auto_flush: Should you manage the state update phase or just
                       propagate as soon as you update a property
    :type auto_flush: Boolean
    c                 J    || _         i | _        i | _        || _        d| _        y )Nr   )_queue_pending_update_pushed_state_auto_flush
_ctx_count)selfr(   
auto_flushs      r   __init__zStateQueue.__init__q   s(    !%r   c                     | j                   S )z%Provide access to the decorated queue)r0   r5   s    r   r(   zStateQueue.queuex   s     {{r   c                 l    | j                   j                  || j                  j                  |            S N)r1   getr2   r5   keys     r   __getitem__zStateQueue.__getitem__}   s,    ##''T-?-?-C-CC-HIIr   c                 \    || j                   |<   | j                  r| j                          y y r;   )r1   r3   flushr5   r>   values      r   __setitem__zStateQueue.__setitem__   s)    $)S!JJL r   c                     t        |      rt        t        |      S t        |      r| j                  j                  |      S | j                  |      S r;   )r   getattrobjectr   __dict__r<   r?   r=   s     r   __getattr__zStateQueue.__getattr__   sC    S>63''c?==$$S))$$r   c                 ^    t        |      r|| j                  |<   y | j                  ||       y r;   )r   rH   rD   rB   s      r   __setattr__zStateQueue.__setattr__   s'    c?!&DMM#S%(r   c                 t    | j                   j                  |       | j                  r| j                          yy)z
        Update the distributed state from a set of key/value pair

        :param _dict: A dict containing one or many key/value pair
        :type _dict: dict
        N)r1   r&   r3   rA   )r5   _dicts     r   r&   zStateQueue.update   s/     	##E*JJL r   c                     t        | j                        rR| j                  j                  | j                         | j                  j                  | j                         i | _        yy)z.Explicitly push any local change to the queue.N)lenr1   r0   
put_nowaitr2   r&   r9   s    r   rA   zStateQueue.flush   sO    t##$KK""4#7#78%%d&:&:;#%D  %r   c                 B    | j                   j                  t               y)z8Release the monitoring task as we are done with our workN)r0   rP   r$   r9   s    r   exitzStateQueue.exit   s    z*r   c                 0    | xj                   dz  c_         | S )Nr   )r4   r9   s    r   	__enter__zStateQueue.__enter__   s    1r   c                     | xj                   dz  c_         | j                   dk(  r!| j                          | j                          y y )Nr   r   )r4   rA   rR   )r5   exc_type	exc_valueexc_tracebacks       r   __exit__zStateQueue.__exit__   s3    1??aJJLIIK  r   N)T)__name__
__module____qualname____doc__r7   propertyr(   r?   rD   rI   rK   r&   rA   rR   rT   rY    r   r   r   r   c   sN      J
%)	&+r   r   c                       fd}|S )z<Function decorator to make its async execution within a taskc                  (    t         | i |       y r;   )r   )argskwargsfuncs     r   wrapperztask.<locals>.wrapper   s    D$)&)*r   r_   )rd   re   s   ` r   r   r      s    + Nr   r;   )r   )r   r    r   r   __all__r$   Taskr
   r   r	   r,   r   r   r   r_   r   r   <module>ri      s]      # 
AW\\ Ad A6$-H0Q Qhr   