
    yj1<                        U d dl Z d dlZd dlmZmZ d dlmZ d dlZd dlmZm	Z	m
Z
 d dlmZ d dlmZ d dlmZ d dlmZ d d	lmZ d d
lmZmZ  ee          Zdaded<   ddZddZ G d d          Zdedej        de fdZ!dS )    N)datetimetimezone)	getLogger)existsfuncselect)AsyncSession)models)settings)
tracked_db)	DreamType)construct_work_unit_keyparse_work_unit_keyDreamScheduler | None_dream_schedulerdream_schedulerDreamSchedulerreturnc                 
    | a dS )z)Set the global dream scheduler reference.Nr   )r   s    C/DATA/AppData/hermes/projects/honcho/src/dreamer/dream_scheduler.pyset_dream_schedulerr      s     '    c                      t           S )z)Get the global dream scheduler reference.r    r   r   get_dream_schedulerr      s    r   c                       e Zd ZU dZded<   dZeed<    fdZd Ze	dd	            Z
ddddd
dedededededededz  dedz  dedz  dedz  ddfdZdedefdZdededee         fdZddddd
dedededededededz  dedz  dedz  dedz  ddfdZddddd
dedededededz  dedz  dedz  dedz  ddfdZddZ xZS )r   Nr   	_instanceF_initializedc                 l    | j         &t                                          |           | _         | j         S N)r   super__new__)cls	__class__s    r   r#   zDreamScheduler.__new__%   s*    = !GGOOC00CM}r   c                 H    t           j        si | _        dt           _        d S d S )NT)r   r   pending_dreams)selfs    r   __init__zDreamScheduler.__init__*   s-    * 	/ACD*.N'''	/ 	/r   r   c                 "    d| _         d| _        dS )z5Reset the singleton instance. Only use this in tests.NF)r   r   )r$   s    r   reset_singletonzDreamScheduler.reset_singleton0   s      r   )trigger_reasondelay_reason&documents_since_last_dream_at_scheduledocument_thresholdwork_unit_keyworkspace_namedelay_minutes
dream_typeobserverobservedr,   r-   r.   r/   c                   K   t           j        j        sdS                                 d{V  t	          j                             ||||||||	|

  
                  }| j        <   |                     fd           dS )zSchedule a dream for a collection after a delay.

        telemetry kwargs are captured at schedule time and threaded
        through the queue payload so DreamRunEvent can attribute the dream
        back to its scheduling context.
        Nr4   r5   r,   r-   r.   r/   c                 :    j                             d           S r!   )r'   pop)tr(   r0   s    r   <lambda>z/DreamScheduler.schedule_dream.<locals>.<lambda>_   s    )<)@)@PT)U)U r   )	r   DREAMENABLEDcancel_dreamasynciocreate_task_delayed_dreamr'   add_done_callback)r(   r0   r1   r2   r3   r4   r5   r,   r-   r.   r/   tasks   ``          r   schedule_dreamzDreamScheduler.schedule_dream6   s      ( ~% 	F ........."!!-)7]#5    
 
 .2M*UUUUUVVVVVr   c                    K   || j         v rn| j                             |          }|                                 t          j        t
          j                  5  | d{V  ddd           n# 1 swxY w Y   dS dS )z>Cancel a pending dream. Returns True if a dream was cancelled.NTF)r'   r9   cancel
contextlibsuppressr?   CancelledError)r(   r0   rC   s      r   r>   zDreamScheduler.cancel_dreama   s      D///&**=99DKKMMM$W%;<<  






              4us   	A--A14A1c                   K   t                      }g }| j        D ]<}t          |          }|j        |k    r |j        |k    r|                    |           =|D ]2}|                     |           d{V r|                    |           3|S )a  
        Cancel all pending dreams where the observed peer matches.

        This handles both self-observation (observer=observed) and peer-to-peer
        observation (observer!=observed) dreams.

        Args:
            workspace_name: The workspace to match
            observed: The observed peer name to match

        Returns:
            Set of work_unit_keys that were cancelled
        N)setr'   r   r1   r5   appendr>   add)r(   r1   r5   	cancelledkeys_to_cancelr0   parsedkeys           r   cancel_dreams_for_observedz)DreamScheduler.cancel_dreams_for_observedl   s        "ee	 %'!0 	5 	5M(77F$666?h;V;V%%m444 " 	# 	#C&&s++++++++ #c"""r   c          
        K   	 t          j        |dz             d {V  |                     |||||||	|
           d {V  t                              d|           d S # t           j        $ r t                              d|           Y d S t          $ rR}t                              d||           t          j
        j        rt          j        |           Y d }~d S Y d }~d S d }~ww xY w)N<   r7   zExecuted dream for %szDream task cancelled for %sz!Error in delayed dream for %s: %s)r?   sleepexecute_dreamloggerinforI   debug	Exceptionerrorr   SENTRYr=   
sentry_sdkcapture_exception)r(   r0   r1   r2   r3   r4   r5   r,   r-   r.   r/   es               r   rA   zDreamScheduler._delayed_dream   s\     	0- 2333333333$$!!-)7]#5 % 	 	 	 	 	 	 	 	 	 KK/?????% 	G 	G 	GLL6FFFFFF 	0 	0 	0LL<mQOOO& 0,Q/////////0 0 0 0 0 0	0s   AA! !*C)	C)AC$$C)c                   K   ddl m}	 ddlm}
 ddlm} t          d          4 d{V }t          t          j	        j
                                      t          j	        j        |k    t          j	        j        |k    t          j	        j        |k    t          j	        j        dk                                  t          j	        j                                                                      d          }|                    |           d{V }|s8t*                              d	| d
| d
| d           	 ddd          d{V  dS |	                    |||           d{V }|	                    ||           d{V } |d||          }|j        j        s5t*                              d| d
| d           	 ddd          d{V  dS 	 ddd          d{V  n# 1 d{V swxY w Y    |
|||||||||	  	         d{V  dS )z#Execute the dream by enqueueing it.r   )crud)enqueue_dream)get_configurationdream_session_lookupNexplicit   zNo documents found for /z, skipping dream)r1   session_name)r1   zDreams disabled for )r4   r5   r3   rh   r,   r-   r.   r/   )srcra   src.deriver.enqueuerb   src.utils.config_helpersrc   r   r   r
   Documentrh   wherer1   r4   r5   levelorder_by
created_atdesclimitscalarrW   warningget_sessionget_workspacedreamenabledrX   )r(   r1   r3   r4   r5   r,   r-   r.   r/   ra   rb   rc   dbstmtrh   session	workspaceconfigurations                     r   rV   zDreamScheduler.execute_dream   s      	555555>>>>>>455 	 	 	 	 	 	 	v344O2nDO,8O,8O)Z7	  &/499;;<<q  "$4000000L dnddxdd(ddd   %	 	 	 	 	 	 	 	 	 	 	 	 	 	( !,,> -        G #00N0SSSSSSSSI--dGYGGM &. Z>ZZLZZZ   ?	 	 	 	 	 	 	 	 	 	 	 	 	 	67	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	B m!%)%3Y1

 

 

 
	
 
	
 
	
 
	
 
	
 
	
 
	
 
	
 
	
s   D
G#A6G##
G-0G-c                 j  K   | j         rt                              dt          | j                    d           | j                                         D ]}|                                 t          j        | j                                         ddi d{V  | j                                          dS dS )z*Cancel all pending dreams during shutdown.zCancelling z pending dreams...return_exceptionsTN)	r'   rW   rX   lenvaluesrF   r?   gatherclear)r(   rC   s     r   shutdownzDreamScheduler.shutdown   s       	(KKRc$*=&>&>RRRSSS+2244  .$"5"<"<">">WRVWWWWWWWWW%%'''''	( 	(r   )r   N)__name__
__module____qualname__r   __annotations__r   boolr#   r)   classmethodr+   strintr   rD   r>   rK   rR   rA   rV   r   __classcell__)r%   s   @r   r   r   !   s        )-I&---L$    
/ / / ! ! ! [! &*#'=A)-)W )W )W)W )W 	)W
 )W )W )W d
)W Dj)W 14d
)W  $J)W 
)W )W )W )WV	 	 	 	 	 	!-0	S   R &*#'=A)-"0 "0 "0"0 "0 	"0
 "0 "0 "0 d
"0 Dj"0 14d
"0  $J"0 
"0 "0 "0 "0V &*#'=A)-<
 <
 <
<
 <

 <
 <
 d
<
 Dj<
 14d
<
  $J<
 
<
 <
 <
 <
|( ( ( ( ( ( ( (r   ry   
collectionc                   K   t           j        j        sdS j                            di           }|                    dd          }|                    d          }t          t          j        t          j	        j
                                                t          j	        j        j        k    t          j	        j        j        k    t          j	        j        j        k    t          j	        j        dk              }t!          |                     |           d{V pd          }||z
  }t$                              dj        j        j        |||t           j        j        d	
           |t           j        j        k    rd}t           j        j        dk    rdnd}	|r	 t-          j        |          }
t-          j        t2          j                  |
z
                                  dz  }|t           j        j        k     rGt$                              dj         dj         d|dddt           j        j         dz              dS n># t<          t>          f$ r*}t$                               d| d|            Y d}~nd}~ww xY wt           j        j!        }fd|D             }|                     t          tE          t          t          j#        j
                                      t          j#        j$        dk    t          j#        j%        dk    t          j#        j&        '                    |                                                   d{V }|r(t$                              dj        j                   dS tQ                      }|r|D ]}tS          j        dj        j        |d          }|*                    |j        t           j        j        tW          |          j        j        ||	|t           j        j        
  
         d{V  t$                              dj        j        j        |t           j        j        |d
           dS dS )u`  
    From the moment a dream is scheduled until it completes or fails, no second
    dream may be enqueued for the same (workspace, observer, observed) — and the
    baseline count advances only when consolidation actually happened.

    Check if a collection has reached the explicit-observation threshold and schedule a timer-based dream.

    This function only schedules a timer-based dream if:
    1. Dreams are enabled
    2. Explicit-observation threshold is reached (dreamer output does not count)
    3. Minimum hours between dreams have passed
    4. No dream is already pending in the queue for this collection (in-flight check)
    5. No dream is already scheduled for this collection

    Args:
        db: Database session
        collection: Collection model to check

    Returns:
        True if a dream timer was scheduled, False otherwise
    Frw   last_dream_document_countr   last_dream_atre   NzDream check)r1   r4   r5   current_explicit_countr   documents_since_last_dreamr/   )extrar/   idle_timeout	immediatei  zSkipping dream for rg   z: only z.1fz hours zsince last dream (minimum: )z!Invalid last_dream_at timestamp: z	, error: c           
      X    g | ]&}t          j        d j        j        |d          'S )rw   	task_typer4   r5   r3   )r   r1   r4   r5   ).0r3   r   s     r   
<listcomp>z,check_and_schedule_dream.<locals>.<listcomp>R  sW     
 
 
  $)!( * 3 * 3",	  
 
 
r   zASkipping dream schedule for %s/%s: pending dream already in queuer   )r3   r4   r5   r,   r-   r.   r/   zScheduled dream)r1   r4   r5   r   r/   r3   T),r   r<   r=   internal_metadatagetr   r   countr
   rl   idrm   r1   r4   r5   rn   r   rs   rW   rY   DOCUMENT_THRESHOLDIDLE_TIMEOUT_MINUTESr   fromisoformatnowr   utctotal_secondsMIN_HOURS_BETWEEN_DREAMSrX   
ValueError	TypeErrorrt   ENABLED_TYPESr   	QueueItemr   	processedr0   in_r   r   rD   r   )ry   r   dream_metadatar   r   
count_stmtr   r   r,   r-   last_dream_timehours_since_last_dreamr_   enabled_dream_typespending_keyspending_existsr   r3   dream_work_unit_keys    `                 r   check_and_schedule_dreamr      s     2 >! u155grBBN . 2 23NPQ R R"&&77M 
6?#56677==&**CC J$77 J$77+	 J !ryy'<'<!<!<!<!<!<!<!ABB!7:S!S
LL(7"+"+&<)B*D"*."C
 
     "X^%FFF
 .&nAAEENN; 	  	"*"8"G"GL..@-//D*)& *HN,SSSKK Dj.A  D  DJDW  D  D`v  D  D  D  Db8_bbbc   !5 T 	*   SSSPQSS        'n:
 
 
 
 2
 
 
  "yy6+.//55(2g=(2e;(6::<HH   
 
 
 
 

 

 

 

 

 

  	KKS##  
 5-// "	1    
&=-%,$.$7$.$7&0	 ' '# &44'-N7(44'0'0#1!-;U'/~'H 5          %*4*C$.$7$.$76P.6n.O&0   
 
 
 
 45s   #B$I
 
J J  J)r   r   r   N)r   r   )"r?   rG   r   r   loggingr   r]   
sqlalchemyr   r   r   sqlalchemy.ext.asyncior	   ri   r
   
src.configr   src.dependenciesr   src.schemasr   src.utils.work_unitr   r   r   rW   r   r   r   r   r   
Collectionr   r   r   r   r   <module>r      s        ' ' ' ' ' ' ' '           + + + + + + + + + + / / / / / /             ' ' ' ' ' ' ! ! ! ! ! ! L L L L L L L L	8		 -1 ) 0 0 0' ' ' '   
T( T( T( T( T( T( T( T(n^^!^ 
^ ^ ^ ^ ^ ^r   