
    yjQ                     8   d dl Z d dlmZmZ d dlmZmZmZ d dlm	Z	 d dl
mZmZmZ d dlmZ d dlmZ d dlmZ d d	lmZ d d
lmZ d dlmZmZ d dlmZ d dlmZmZm Z  d dl!m"Z"  e j#        e$          Z%de&e'e(ef                  ddfdZ)de	de&e'e(ef                  de(de(de&e'e(ef                  f
dZ*de	de(de(de'e(e&e'e(ef                  f         fdZ+	 d1de'e(ef         dede(dz  de&e(         de(de'e(ef         fdZ,de'e(ef         dede(de-de'e(ef         f
dZ.de(d e'e(e&e'e(ef                  f         de/fd!Z0de	de'e(ef         d e'e(e&e'e(ef                  f         de(dede&e'e(ef                  fd"Z1dddddd#de(d$e(de(d%ej2        de(dz  d&e(dz  d'e(dz  d(e-dz  d)e-dz  de'e(ef         fd*Z3	 	 	 	 	 d2de(d$e(de(d%ej2        de(dz  d&e(dz  d'e(dz  d(e-dz  d)e-dz  ddfd+Z4de(d,ed-         d.e(de'e(ef         fd/Z5	 d1de(d,ed-         d.e(de	dz  ddf
d0Z6dS )3    N)AnyLiteral)existsinsertselect)AsyncSession)crudmodelsschemas)settings)
tracked_db)get_dream_scheduler)ValidationException)	QueueItem)MessageConfigurationResolvedConfiguration)get_configuration)create_deletion_payloadcreate_dream_payloadcreate_payload)construct_work_unit_keypayloadreturnc                   K   t                      }|r| rt                      }| D ]a}|                    d          }|                    d          }|r3|r1|                    ||           d{V }|                    |           b|r+t
                              dt          |           d           t          d          4 d{V }	 | s	 ddd          d{V  dS | d         d         }| d         d         }||t          d	          t          || ||           d{V }	|	rbt          t                                        t                    }
|                    |
|	           d{V  |                                 d{V  n[# t           $ rN}t
                              d
           t$          j        j        rddl}|                    |           Y d}~nd}~ww xY wddd          d{V  dS # 1 d{V swxY w Y   dS )z~
    Add message(s) to the deriver queue for processing.

    Args:
        payload: List of message payload dictionaries
    workspace_name	peer_nameNz
Cancelled z# pending dreams due to new activitymessage_enqueuer   session_namez"Session and workspace are requiredzFailed to enqueue message(s)!)r   setgetcancel_dreams_for_observedupdateloggerinfolenr   r   handle_sessionr   r   	returningexecutecommit	Exception	exceptionr   SENTRYENABLED
sentry_sdkcapture_exception)r   dream_schedulercancelled_dreamsmessager   r   	cancelled
db_sessionr   queue_recordsstmter.   s                ;/DATA/AppData/hermes/projects/honcho/src/deriver/enqueue.pyenqueuer9      sZ      *++O 7 %(UU 	3 	3G$[[)9::NK00I 3) 3"1"L"L"I# #      	 !''	222 	KKWS!122WWW   +,, 0 0 0 0 0 0 0
	0 	0 0 0 0 0 0 0 0 0 0 0 0 0 0
 %QZ(89N"1:n5L#~'=)*NOOO"0G^\# #      M  *i((229== ((}========= ''))))))))) 	0 	0 	0<===& 0!!!!,,Q///	0)0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0sD   	G;F!B+FG;
G%AG G; G%%G;;
HHr4   r   r   c           
        K   t          j        | t          j        |          |           d{V j        }t          j        | |           d{V }t          d||          }t          | ||           d{V }g }|D ]^}	|	                    d          }
|
t          |
||          }n|}|	                    t          | |	||j        |           d{V            _|S )a[  
    Handle enqueueing for normal session cases, creating appropriate queue items based on configurations.

    Args:
        db_session: The database session
        payload: List of message payloads
        workspace_name: Name of the workspace
        session_name: Name of the session

    Returns:
        List of queue records to insert
    )name)sessionr   N)r   configuration)r	   get_or_create_sessionr   SessionCreateresourceget_workspacer   get_peers_with_configurationr    extendgenerate_queue_recordsid)r4   r   r   r   r<   	workspacesession_level_configurationpeers_with_configurationr5   r2   message_configmessage_level_configurations               r8   r&   r&   Q   s     & ()|<<<)
 
 
 	
 	
 	
 	
 	
 	

   (NSSSSSSSSSI #4D'9"M"M%ANL& &             +-M 
 
6=kk/6R6R%*;+ +'' +F'((
+       	
 	
 	
 	
     c                    K   t          j        ||           d{V }|                     |           d{V }|                                }d |D             S )a  
    Retrieve peers with their configurations for a given session.

    Args:
        db_session: The database session
        workspace_name: Name of the workspace
        session_name: Name of the session

    Returns:
        Dictionary mapping peer names to their configurations
    )r   r   Nc                 B    i | ]}|j         |j        |j        |j        gS  )r   peer_configurationsession_peer_configuration	is_active).0rows     r8   
<dictcomp>z0get_peers_with_configuration.<locals>.<dictcomp>   sB         	"*M
  rK   )r	   get_session_peer_configurationr(   all)r4   r   r   configuration_querypeers_with_configuration_resultpeers_with_configuration_lists         r8   rB   rB      s       !% C%L! ! !       -7,>,>?R,S,S&S&S&S&S&S&S#$C$G$G$I$I!  1   rK   r2   conf
session_id	observersobservedc                <   |                      d          }|                      d          }t          |t                    st          d          t          |t                    st          d          t          | |d||          }t          ||          ||d||dS )a  
    Create a queue record for representation task.

    Args:
        message: The message payload
        conf: Resolved configuration for this particular message
        session_id: Optional session ID
        observers: List of observer peer names
        observed: Name of the sender

    Returns:
        Queue record dictionary with workspace_name and message_id as separate fields
    r   
message_id/workspace_name is required and must be a string-message_id is required and must be an integerrepresentation)r2   r=   	task_typer\   r]   work_unit_keyr   r[   rc   r   r_   )r    
isinstancestr	TypeErrorintr   r   )r2   rZ   r[   r\   r]   r   r_   processed_payloads           r8   create_representation_recordrk      s    * [[!122N\**Jnc** KIJJJj#&& IGHHH(6") ) ) 1ARSS$ %(   rK   r=   message_seq_in_sessionc                 :   |                      d          }|                      d          }t          |t                    st          d          t          |t                    st          d          t          | |d|          }t          ||          ||d||dS )a6  
    Create a queue record for summary task.

    Args:
        message: The message payload
        session_id: Session ID
        message_seq_in_session: The sequence number of the message in the session

    Returns:
        Queue record dictionary with workspace_name and message_id as separate fields
    r   r_   r`   ra   summary)r2   r=   rc   rl   rd   )r    rf   rg   
ValueErrorri   r   r   )r2   r=   r[   rl   r   r_   rj   s          r8   create_summary_recordrp      s    " [[!122N\**Jnc** LJKKKj#&& JHIII&#5	   1ARSS$ (   rK   rH   c                    |                     | i i g          }|d         rt          j        di |d         nd}|d         rt          j        di |d         nt          j                    }|r|j        |j        S |j        |j        ndS )a(  
    Determine the effective observe_me setting for a sender, considering session and peer configurations.

    Args:
        observed: Name of the sender
        peers_with_configuration: Dictionary of peer configurations

    Returns:
        True if observe_me is enabled, False otherwise
       Nr   TrN   )r    r   SessionPeerConfig
PeerConfig
observe_me)r]   rH   r=   sender_session_peer_configsender_peer_configs        r8   get_effective_observe_merx      s       8;;Hr2hOOM9Fq9IS!55M!$4555t 
 	"..]1-...!!  " 5&@&K&W)44
 (4 	%%rK   c           	        K   |d         }|d         }t          |                    d          pd          }|dk    r*t          j        | |d         |d         |           d{V }g }|j        j        rL||j        j        z  dk    s||j        j        z  dk    r&|                    t          ||||	                     t          ||          }	|j        j        s|S g }
|	r|
                    |           |                                D ]U\  }}||k    r|d
         s|d         rt          j        di |d         nd}||j        s@|
                    |           V|
r'|                    t!          ||||
|                     t"                              d||t'          |          t'          |
                     |S )a  
    Process a single message and generate queue records based on configurations.

    Args:
        db_session: The database session
        message: The message payload
        peers_with_configuration: Dictionary of peer configurations
        session_id: Session ID
        configuration: Resolved configuration for this particular message

    Returns:
        List of queue records for this message
    r   r_   seq_in_sessionr   r   r   )r   r   r_   N)r=   r[   rl      rr   )r]   r\   r[   z;message %s from %s created %s queue items with %s observersrN   )ri   r    r	   get_message_seq_in_sessionrn   enabledmessages_per_short_summarymessages_per_long_summaryappendrp   rx   	reasoningitemsr   rs   observe_othersrk   r#   debugr%   )r4   r2   rH   r[   rZ   r]   r_   rl   recordsshould_observer\   r   	peer_confsession_peer_configs                 r8   rD   rD   %  sd     ( {#Hl+J !-=!>!>!C!DD""'+'F"#34 0!	(
 (
 (
 "
 "
 "
 "
 "
 "
 %'G| 
!HHAMM!DL$JJaOO!"%'=	  	
 	
 	
 .h8PQQN>!  I (""" %=$B$B$D$D 	( 	( IyH$$ Q<  >Gq\S)99IaL999t   #*2E2T*Y''''  	
(!#%  	
 	
 	
 LLEGI   NrK   )r   trigger_reasondelay_reason&documents_since_last_dream_at_scheduledocument_thresholdobserver
dream_typer   r   r   r   c          
      ^    t          ||||||||          }	t          | |	          |	dd| ddS )a  
    Create a queue record for a dream task.

    Args:
        workspace_name: Name of the workspace
        observer: Name of the observer peer
        observed: Name of the observed peer
        dream_type: Type of dream to execute
        session_name: Name of the session to scope the dream to if specified
        trigger_reason: what tripped the schedule
        delay_reason: what governed when it fires
        documents_since_last_dream_at_schedule: count snapshot at schedule time
        document_threshold: DOCUMENT_THRESHOLD snapshot at schedule time

    Returns:
        Queue record dictionary with workspace_name and other fields
    )r   r]   r   r   r   r   r   Ndreamrd   )r   r   )
r   r   r]   r   r   r   r   r   r   dream_payloads
             r8   create_dream_recordr     s]    : )!%!/U-	 	 	M 1OO (  rK   c	                 P  K   t          d          4 d{V }		 t          | ||||||||	  	        }
|
d         }t          t          t          t          j        j                                      t          j        j        |k                                  }|		                    |           d{V }|r7t                              d| |||j                   	 ddd          d{V  dS t          t          t          t          j                                      t          j        |k    t          j        dk                                  }|		                    |           d{V }|r7t                              d| |||j                   	 ddd          d{V  dS t          t                                        t                    }|	                    ||
g           d{V  |	                                 d{V  t                              d| |||j                   nW# t&          $ rJ}t                              d	           t*          j        j        rd
dl}|                    |            d}~ww xY w	 ddd          d{V  dS # 1 d{V swxY w Y   dS )u  
    Enqueue a dream task for immediate processing by the deriver.

    Does not touch collection.internal_metadata["dream"] — both guard fields
    are written atomically in process_dream on successful completion.

    Deduplication: If a dream with the same work_unit_key is already in-progress
    (has an ActiveQueueSession) or pending in the queue, the enqueue is skipped.

    Args:
        workspace_name: Name of the workspace
        observer: Name of the observer peer
        observed: Name of the observed peer
        dream_type: Type of dream to execute
        session_name: Name of the session to scope the dream to if specified
    dream_enqueueN)r   r]   r   r   r   r   r   r   re   zASkipping dream enqueue - already in progress: %s/%s/%s (type: %s)Fz3Dream already pending in queue: %s/%s/%s (type: %s)z+Enqueued dream task for %s/%s/%s (type: %s)zFailed to enqueue dream task!r   )r   r   r   r   r
   ActiveQueueSessionrE   wherere   scalarr#   r$   valuer   	processedr   r'   r(   r)   r*   r+   r   r,   r-   r.   r/   )r   r   r]   r   r   r   r   r   r   r4   dream_recordre   in_progress_checkis_in_progresspending_check
is_pendingr6   r7   r.   s                      r8   enqueue_dreamr     s=     6 /** I I I I I I IjH	.!!%)-)7]#5
 
 
L )9M &64788>>1?=P  ! ! $.#4#45F#G#GGGGGGGN W"$   CI I I I I I I I I I I I I IF #9<((..!/=@!+u4   M  *00????????J I"$   kI I I I I I I I I I I I I In )$$..y99D$$TL>:::::::::##%%%%%%%%%KK=      	 	 	<===& 0!!!!,,Q///	wI I I I I I I I I I I I I I I I I I I I I I I I I I I I I IsD   JB>H*,B#H*#BH*)J*
I>4AI99I>>J
J"Jdeletion_type)r<   observationrF   resource_idc                 R    t          ||          }t          | |          |dd| ddS )a9  
    Create a queue record for a deletion task.

    Args:
        workspace_name: Name of the workspace
        deletion_type: Type of resource to delete ("session" or "observation")
        resource_id: ID of the resource to delete

    Returns:
        Queue record dictionary for insertion into the queue
    )r   r   Ndeletionrd   )r   r   )r   r   r   deletion_payloads       r8   create_deletion_recordr   $  sM      /#   1AQRR#(  rK   c                    K   dt           dt          ddf fd}	 | ||d           d{V  dS t          d          4 d{V } ||d	           d{V  ddd          d{V  dS # 1 d{V swxY w Y   dS # t          $ rJ}t                              d
           t          j        j        rddl	}|
                    |            d}~ww xY w)a:  
    Enqueue a deletion task for processing by the deriver.

    This function adds a deletion task to the queue for asynchronous processing.
    The deletion will be handled by the queue consumer with retry support.

    Args:
        workspace_name: Name of the workspace
        deletion_type: Type of resource to delete ("session" or "observation")
        resource_id: ID of the resource to delete
        db_session: Optional database session. If provided, uses this session
            instead of creating a new one. The caller is responsible for committing.
    r<   should_commitr   Nc                 2  K   t                    }t          t                                        t                    }|                     ||g           d {V  |r|                                  d {V  t                              d           d S )Nz=Enqueued deletion task: type=%s, resource_id=%s, workspace=%s)r   r   r   r'   r(   r)   r#   r$   )r<   r   deletion_recordr6   r   r   r   s       r8   _do_enqueuez%enqueue_deletion.<locals>._do_enqueueW  s      0
 
 i  **955ood_$5666666666 	#.."""""""""K		
 	
 	
 	
 	
rK   F)r   deletion_enqueueTz Failed to enqueue deletion task!r   )r   boolr   r*   r#   r+   r   r,   r-   r.   r/   )r   r   r   r4   r   new_sessionr7   r.   s   ```     r8   enqueue_deletionr   C  s     (
< 
 
 
 
 
 
 
 
 
 
(!+j>>>>>>>>>>>> ""455 C C C C C C C!k+TBBBBBBBBBBC C C C C C C C C C C C C C C C C C C C C C C C C C C C C C    ;<<<?" 	,((+++sA   B B A2B 2
A<<B ?A< B 
CACC)N)NNNNN)7loggingtypingr   r   
sqlalchemyr   r   r   sqlalchemy.ext.asyncior   srcr	   r
   r   
src.configr   src.dependenciesr   src.dreamer.dream_schedulerr   src.exceptionsr   
src.modelsr   src.schemasr   r   src.utils.config_helpersr   src.utils.queue_payloadr   r   r   src.utils.work_unitr   	getLogger__name__r#   listdictrg   r9   r&   rB   rk   ri   rp   r   rx   rD   	DreamTyper   r   r   r   rN   rK   r8   <module>r      s            - - - - - - - - - - / / / / / / % % % % % % % % % %       ' ' ' ' ' ' ; ; ; ; ; ; . . . . . .             C C C C C C C C 6 6 6 6 6 6         
 8 7 7 7 7 7		8	$	$504S#X/ 50D 50 50 50 50p77$sCx.!7 7 	7
 
$sCx.7 7 7 7t.1AD	#tDcN#
#$   @ "+ +#s(^+
+ d
+
 Cy+ + 
#s(^+ + + +\&#s(^&(& &  	&
 
#s(^& & & &R##-1#tDcN7K2K-L#	# # # #Lcc#s(^c #3T#s(^(<#<=c 	c
  c 
$sCx.c c c cX  $!%#9=%)/ / // / 	/
 !/ */ $J/ */ -0$J/ d
/ 
#s(^/ / / /n  $!%#9=%)d ddd d !	d
 *d $Jd *d -0$Jd d
d 
d d d dN@A  
#s(^	   F '+	7 77@A7 7 t#	7
 
7 7 7 7 7 7rK   