
    yjB                        d dl Z d dlZd dl mZ d dlmZ d dlmZmZ d dlmZm	Z	m
Z
 d dlmZ d dlmZmZmZ d dlZd dlmZ d d	lmZ d d
lmZ d dlmZmZmZmZmZ d dlm Z  d dl!m"Z" d dl#m$Z$ d dl%m&Z& d dl'm(Z( d dl)m*Z*m+Z+ d dl,m-Z- d dl.m/Z/ d dl0m1Z1m2Z2 d dl3m4Z4m5Z5m6Z6 d dl7m8Z8 d dl9m:Z:m;Z;m<Z< d dl=m>Z> d dl?m@Z@ d dlAmBZB d dlCmDZD d dlEmFZFmGZG  eeH          ZI ed            G d d e          ZJ ed!           G d" d#                      ZKd$e$d%eLe(jM                 d&eLe8         d'dfd(ZNd&eLe8         d'eOeLe8         e>dz  f         fd)ZP G d* d+          ZQd, ZRdS )-    N)Task)Sequence)	dataclassfield)datetime	timedeltatimezone)	getLogger)Any
NamedTuplecast)load_dotenv)generate)AsyncioIntegration)and_deleteor_selectupdate)insert)CursorResult)AsyncSession)func)models)close_cache
init_cache)settings)
tracked_db)process_itemprocess_representation_batch)DreamSchedulerget_dream_schedulerset_dream_scheduler)	QueueItem)ReconcilerSchedulerget_reconciler_schedulerset_reconciler_scheduler)ResolvedConfiguration)prometheus_metrics)initialize_sentry)parse_work_unit_key)QueueEmptyEventpublish_webhook_eventT)overridec                   (    e Zd ZU dZeed<   eed<   dS )WorkerOwnershipzCRepresents the instance of a work unit that a worker is processing.work_unit_keyaqs_idN)__name__
__module____qualname____doc__str__annotations__     A/DATA/AppData/hermes/projects/honcho/src/deriver/queue_manager.pyr0   r0   5   s+         MMKKKKKr:   r0   )frozenc                       e Zd ZU dZ ee          Zeej                 e	d<    ee          Z
ed         e	d<   dZedz  e	d<   dZee	d	<   dZee	d
<   dZee	d<   dS )QueueBatchResultau  Result of `QueueManager.get_queue_item_batch`.

    telemetry needs to know two things in addition to the batch
    contents: whether the cumulative-token cap clamped the batch, and what
    the configured cap was. These flags feed `RepresentationCompletedEvent`
    so analytics can detect "we under-batched because of a flush" vs
    "we hit the cap and kept going".
    )default_factorymessages_contextr$   items_to_processNconfigurationFhit_batch_token_capwas_flush_enabledr   batch_max_tokens)r3   r4   r5   r6   r   listr@   r   Messager8   rA   rB   r(   rC   boolrD   rE   intr9   r:   r;   r>   r>   <   s           .3U4-H-H-Hd6>*HHH*/%*E*E*Ed;'EEE26M(4/666 %%%%#t###cr:   r>   dbr@   rA   returnc                     t                      }g ||D ]@}t          |          }||v r|                     |           |                    |           AdS )zIDetach loaded batch objects so they remain usable after tracked_db exits.N)setidexpungeadd)rJ   r@   rA   seenobjobj_ids         r;   _detach_queue_batch_objectsrT   O   sm     UUD5!5$45  CT>>


3 r:   c                 2   | sg dfS | d         j                             d          }|dnt          j        |          }g }| D ]Q}|j                             d          }|dnt          j        |          }||k    r n|                    |           R||fS )zCKeep only the initial homogeneous configuration prefix for a batch.Nr   rB   )payloadgetr(   model_validateappend)rA   
raw_configresolved_configvalid_itemsitemitem_raw_configitem_configs          r;   _resolve_batch_configurationr`   ^   s      4x!!$,00AAJ"(=(LZ(X(X  $&K  	! 	!,**?;; & D&5oFF 	
 /))E4    ''r:   c            
          e Zd Zd Zdej        d         ddfdZdedededdfd	Zdededdfd
Z	defdZ
defdZd$dZdej        ddfdZd$dZd$dZdeeef         fdZdedee         deeef         fdZd$dZdedee         dededdf
dZdededdfdZej        dededededz  fd            Z ej        dedededdfd            Z!dee         deddfd Z"d!edededdfd"Z#dedede$fd#Z%dS )%QueueManagerc                 ^   t          j                    | _        t                      | _        i | _        t          j                    | _        t          j        j	        | _
        t          j        | j
                  | _        t                      }|(t                      | _        t!          | j                   n|| _        t#                      }|(t%                      | _        t)          | j                   n|| _        t          j        j        rt/          t1                      g           d S d S )N)integrations)asyncioEventshutdown_eventrM   active_tasksworker_ownershipqueue_empty_flagr   DERIVERWORKERSworkers	Semaphore	semaphorer"   r!   dream_schedulerr#   r&   r%   reconciler_schedulerr'   SENTRYENABLEDr*   r   )selfexisting_schedulerexisting_reconcilers      r;   __init__zQueueManager.__init__z   s   -4]__58UU<>/6} %,4,3,=dl,K,K 122%3A3C3CD  45555#5D  788&=P=R=RD%$T%>????(;D% ?" 	C,>,@,@+ABBBBBB	C 	Cr:   taskNrK   c                 x    | j                             |           |                    | j         j                   dS )zTrack a new taskN)rh   rP   add_done_callbackdiscard)rt   rx   s     r;   add_taskzQueueManager.add_task   s9    d###t0899999r:   	worker_idr1   r2   c                 6    t          ||          | j        |<   dS )z,Track a work unit owned by a specific workerN)r0   ri   )rt   r}   r1   r2   s       r;   track_worker_work_unitz#QueueManager.track_worker_work_unit   s!     ,;=&+Q+Qi(((r:   c                 l    | j                             |          }|r|j        |k    r| j         |= dS dS dS )z'Remove a work unit from worker trackingN)ri   rW   r1   )rt   r}   r1   	ownerships       r;   untrack_worker_work_unitz%QueueManager.untrack_worker_work_unit   sN    )--i88	 	10MAA%i000	1 	1AAr:   c                     t                      S )z4Generate a unique worker ID for this processing task)generate_nanoidrt   s    r;   create_worker_idzQueueManager.create_worker_id   s       r:   c                 *    t          | j                  S )z7Get the total number of work units owned by all workers)lenri   r   s    r;   get_total_owned_work_unitsz'QueueManager.get_total_owned_work_units   s    4()))r:   c                    K   t                               d j         d           t          j                    }t
          j        t
          j        f}|D ]}|                    ||f fd	           t                               d           	  j	        
                                 d{V  n*# t          $ r t                               d           Y nw xY wt                               d           	                                   d{V                                    d{V  dS #                                   d{V  w xY w)zISetup signal handlers, initialize client, and start the main polling loopzInitializing QueueManager with z workersc                 R    t          j                            |                     S N)re   create_taskshutdown)srt   s    r;   <lambda>z)QueueManager.initialize.<locals>.<lambda>   s    7#6t}}Q7G7G#H#H r:   zSignal handlers registeredNz$Failed to start reconciler schedulerzStarting polling loop directly)loggerdebugrm   re   get_running_loopsignalSIGTERMSIGINTadd_signal_handlerrq   start	Exception	exceptionpolling_loopcleanup)rt   loopsignalssigs   `   r;   
initializezQueueManager.initialize   s     Mt|MMMNNN '))>6=1 	 	C##cHHHHH    	1222	E+113333333333 	E 	E 	ECDDDDD	E 	5666	!##%%%%%%%%%,,..         $,,..        s   B- -$CC2D( (Er   c                   K   t                               d|j         d           | j                                         | j                                         d{V  | j                                         d{V  | j        rNt                               dt          | j                   d           t          j        | j        ddi d{V  dS dS )zHandle graceful shutdownzReceived exit signal z...NzWaiting for z active tasks to complete...return_exceptionsT)r   infonamerg   rM   rp   r   rq   rh   r   re   gather)rt   r   s     r;   r   zQueueManager.shutdown   s     9CH999:::!!! "++--------- '00222222222 	MKKSs4#455SSS   .$"3LtLLLLLLLLLLL		M 	Mr:   c                 |  K   |                                  }|dk    rt                              d| d           	 t          d          4 d{V }d | j                                        D             }|rg|                    t          t          j	                  
                    t          j	        j                            |                               d{V  |                                 d{V  ddd          d{V  n# 1 d{V swxY w Y   nf# t          $ rY}t                              dt!          |                      t"          j        j        rt)          j        |           Y d}~nd}~ww xY w| j                                         dS # | j                                         w xY wdS )zClean up owned work unitsr   zCleaning up z owned work units...queue_cleanupNc                     g | ]	}|j         
S r9   )r2   ).0r   s     r;   
<listcomp>z(QueueManager.cleanup.<locals>.<listcomp>   s(       -6	(  r:   zError during cleanup: )r   r   r   r   ri   valuesexecuter   r   ActiveQueueSessionwhererN   in_commitr   errorr7   r   rr   rs   
sentry_sdkcapture_exceptionclear)rt   total_work_unitsrJ   aqs_idses        r;   r   zQueueManager.cleanup   si     ::<<aLLN(8NNNOOO.%o66 
& 
& 
& 
& 
& 
& 
&" :>:O:V:V:X:X  G   jj"6#<==CC & 9 < @ @ I I         
 ))++%%%%%%%
& 
& 
& 
& 
& 
& 
& 
& 
& 
& 
& 
& 
& 
& 
& 
& 
& 
& 
& 
& 
& 
& 
& 
& 
& 
& 
&  4 4 4>c!ff>>????* 403334
 %++-----%++----)  sU   D B'D9D 
DD DD F 
F 'AE;6F ;F  F F9c                 n  K   t          d          4 d{V }t          j        t          j                  t          t          j        j                  z
  }|	                    t          t          j        j                                      t          j        j        |k                                   t          j        j                                      d                     d{V                                                                 }|rg|	                    t)          t          j                                      t          j        j                            |                               d{V  |                                 d{V  ddd          d{V  dS # 1 d{V swxY w Y   dS )zClean up stale work unitscleanup_stale_work_unitsN)minutesT)skip_locked)r   r   nowr	   utcr   r   rk   STALE_SESSION_TIMEOUT_MINUTESr   r   r   r   rN   r   last_updatedorder_bywith_for_updatescalarsallr   r   r   )rt   rJ   cutoff	stale_idss       r;   r   z%QueueManager.cleanup_stale_work_units   s*     899 	 	 	 	 	 	 	R\(,//) (F3 3 3 F **v8;<<v8ENOO!&";"HII(T::	           jj6455;;1488CC         
 ))++3	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	s   E9F$$
F.1F.c           
        K   t          d| j        |                                 z
            }|dk    ri S t          j        j        }t          d          4 d{V }d}t          t          j	        j
        t          j        t          j        j                                      d                                        t          j        t          j	        j        t          j        j        k                                  t          j	        j                                       t          j	        j
                            |                                        t          j	        j
                                                  }t          t          j	        j
                                      t          j	        j                                       t          j	        j
                                                  }t          |j        j
                                      |                              ||j        j
        |j        j
        k                                  t          t          j        j                                      t          j        j
        |j        j
        k                                                         }t          j        j        sh|dk    rb|                    t=          |j        j
                            |           t          j        |j        j         d          |k                        }|!                    |           d{V }|"                                #                                }	|	s.|$                                 d{V  i cddd          d{V  S | %                    ||	           d{V }
|$                                 d{V  |
cddd          d{V  S # 1 d{V swxY w Y   dS )a-  
        Get available work units that aren't being processed.
        For representation tasks, only returns work units with accumulated tokens
        >= REPRESENTATION_BATCH_MAX_TOKENS (forced batching), unless FLUSH_ENABLED is True.
        Returns a dict mapping work_unit_key to aqs_id.
        r   get_available_work_unitsNzrepresentation:total_tokens)&maxrm   r   r   rk   REPRESENTATION_BATCH_MAX_TOKENSr   r   r   r$   r1   r   sumrG   token_countlabeljoin
message_idrN   r   	processed
startswithgroup_bysubqueryclimit	outerjoinr   existsFLUSH_ENABLEDr   coalescer   r   r   r   r   claim_work_units)rt   r   rE   rJ   representation_prefixtoken_stats_subqwork_units_subqqueryresultavailable_unitsclaimed_mappings              r;   get_and_claim_work_unitsz%QueueManager.get_and_claim_work_units  s6      DL4+J+J+L+LLMMA::I#+K899 >	# >	# >	# >	# >	# >	# >	#R$5!$2HV^788>>~NN  N$/6>3DD  (2233v'5@@AVWWXX&*899   v'566(2233&*899	  (677u$#%37G7I7WW  F5899U1?*,:;  VXX  $ #1 	6F6J6J(*8CC1   &6&8&EqII+,	   ::e,,,,,,,,F$nn..2244O" iikk!!!!!!!s>	# >	# >	# >	# >	# >	# >	# >	# >	# >	# >	# >	# >	# >	#v %)$9$9"o$N$NNNNNNNO))++"}>	# >	# >	# >	# >	# >	# >	# >	# >	# >	# >	# >	# >	# >	# >	# >	# >	# >	# >	# >	# >	# >	# >	# >	# >	# >	# >	# >	# >	# >	#s   L8O0&7O00
O:=O:rJ   work_unit_keysc           	        K   d |D             }t          t          j                                      |                                                              t          j        j        t          j        j                  }|                    |           d{V }|	                                }d |D             }t                              dt          |           dt          |                                                      |S )z
        Claim work units and return a mapping of work_unit_key to aqs_id.
        Returns only the work units that were successfully claimed.
        c                     g | ]}d |iS )r1   r9   )r   keys     r;   r   z1QueueManager.claim_work_units.<locals>.<listcomp>j  s    CCCS?C(CCCr:   Nc                 ,    i | ]}|d          |d         S )r      r9   )r   rows     r;   
<dictcomp>z1QueueManager.claim_work_units.<locals>.<dictcomp>w  s"    BBBc3q63q6BBBr:   zClaimed z work units: )r   r   r   r   on_conflict_do_nothing	returningr1   rN   r   r   r   r   r   rF   keys)rt   rJ   r   r   stmtr   claimed_rowsr   s           r;   r   zQueueManager.claim_work_unitsc  s       DCNCCC 6,--VF^^##%%Y)79R9U 	 	 zz$''''''''zz||BB\BBBXs?++XX$?S?S?U?U:V:VXX	
 	
 	
 r:   c                   K   t                               d           	 | j                                        sN| j                                        rCt          j        t          j        j	                   d{V  | j        
                                 v| j                                        r*t          j        t          j        j	                   d{V  	 |                                  d{V  |                                  d{V }|r|                                D ]\  }}| j                                        sh|                                 }|                     |||           t          j        |                     ||                    }|                     |           nB| j                                         t          j        t          j        j	                   d{V  n# t,          $ rr}t                               d           t          j        j        rt5          j        |           t          j        t          j        j	                   d{V  Y d}~nd}~ww xY w| j                                        Nt                               d           dS # t                               d           w xY w)z4Main polling loop to find and process new work unitszStarting polling loopNzError in polling loopzPolling loop stopped)r   r   rg   is_setrj   re   sleepr   rk   POLLING_SLEEP_INTERVAL_SECONDSr   ro   lockedr   r   itemsr   r   r   process_work_unitr|   rM   r   r   rr   rs   r   r   r   )rt   claimed_work_unitsr1   r2   r}   rx   r   s          r;   r   zQueueManager.polling_loop}  s      ,---+	0)0022 (Y(//11 !-(8(WXXXXXXXXX)//111 >((** !-(8(WXXXXXXXXXY77999999999/3/L/L/N/N)N)N)N)N)N)N&) 5G5M5M5O5O 4 41M6#'#6#=#=#?#? 
4,0,A,A,C,C	 $ ; ;$-}f!" !" !" 4;3F$($:$:=)$T$T4" 4" !%d 3 3 34 -11333%m$,K         ! Y Y Y$$%<===. 8"4Q777!-(8(WXXXXXXXXXXXXXXYG )0022 (YT KK./////FKK.////s8   B9J" DG- ,J" -
I)7A(I$J" $I))J" "J>r   r   contextc                   K   |j         j         dt          |           }	 |r#|                     |d         ||           d{V  n9# t          $ r,}t
                              d| d| d           Y d}~nd}~ww xY wt
                              d| d| d| d           t          j        j	        rt          j        |           dS dS )	au  
        Handle processing errors by marking queue items as errored, logging, and forwarding to Sentry.
        We only mark the first queue item as errored so we don't potentially throw away a batch. This allows us
        to incrementally attempt to process the batch while still maintaining progress in a work unit.

        Args:
            error: The exception that occurred
            items: The queue items that were being processed
            work_unit_key: The work unit key for the queue items
            context: Context string describing what was being processed (e.g., "processing representation batch")
        : r   Nz4Failed to mark queue items as errored for work unit Texc_infozError z for work unit )	__class__r3   r7   mark_queue_item_as_erroredr   r   r   r   rr   rs   r   r   )rt   r   r   r1   r   	error_msg
mark_errors          r;   _handle_processing_errorz%QueueManager._handle_processing_error  sL     $ /??3u::??			 55!HmY          	 	 	LLd}ddXbdd         	 	EWEE]EEeEE 	 	
 	
 	
 ?" 	0(/////	0 	0s   %A 
A>"A99A>c                 b  K   t                               d|            t          |          }| j        4 d{V  d}	 | j                                        sV| j                            |          }|r|j        |k    r%t           	                    d| d| d|            n
	 |j
        dk    r|                     |j
        ||j                   d{V }|j        }|j        }|j        }	t                               d| dt!          |           d	t!          |           d
| d|j         d           |s"t                               d| d|            nT	 |d         j        }
|
                    d          }||
                    d          }|r|g}ng }d |D             }t%          ||	||j        ||j        |j        |j                   d{V  |                     ||           d{V  |t!          |          z  }n# t0          $ r1}|                     |||d|j
         d           d{V  Y d}~nd}~ww xY w|                     |j
        ||j                   d{V }|s"t                               d| d|            n	 t7          |           d{V  |                     |g|           d{V  |dz  }n6# t0          $ r)}|                     ||g|d           d{V  Y d}~nd}~ww xY wn^# t0          $ rQ}t                               d| d| d           t:          j        j        rtA          j!        |           Y d}~nd}~ww xY w| j                                        rt                               d|           n| j                                        V| j                            |          }|r-|j        |k    r"| "                    |j        |           d{V }nd}| #                    ||           |r|dk    r	 |j
        dv rl|j$        et                               d| d|j$                    tK          tM          |j$        |j
        |j'        |j(        |j                              d{V  nz# t0          $ r t           )                    d!           Y nSw xY wt                               d"| d#           n/# | j                            |          }|r-|j        |k    r"| "                    |j        |           d{V }nd}| #                    ||           |r|dk    r	 |j
        dv rm|j$        gt                               d| d|j$                    tK          tM          |j$        |j
        |j'        |j(        |j                              d{V  w w w # t0          $ r t           )                    d!           Y w w xY wt                               d"| d#           w xY wddd          d{V  dS # 1 d{V swxY w Y   dS )$zSProcess all queue items for a specific work unit by routing to the correct handler.zStarting to process work unit Nr   zWorker z lost ownership of work unit z, stopping processing representationz retrieved z messages and z queue items for work unit z
 (AQS ID: )z-No more queue items to process for work unit z for worker 	observersobserverc                 *    g | ]}|j         	|j         S r   r   r   r]   s     r;   r   z2QueueManager.process_work_unit.<locals>.<listcomp>  s,     :" :" :"(,'+'B %)O'B'B'Br:   )r  observedqueue_item_message_idsrC   rD   rE   zprocessing z batchr   zprocessing queue itemz'Error in processing loop for work unit r   Tr   z8Shutdown requested, stopping processing for work unit %sFr  summaryz!Publishing queue.empty event for z in workspace )workspace_id
queue_type
session_idr  r
  z$Error triggering queue_empty webhookz
Work unit z7 already cleaned up by another worker, skipping webhook)*r   r   r+   ro   rg   r   ri   rW   r1   warning	task_typeget_queue_item_batchr2   r@   rA   rB   r   rV   r    r
  rC   rD   rE   mark_queue_items_as_processedr   r  get_next_queue_itemr   r   r   rr   rs   r   r   _cleanup_work_unitr   workspace_namer-   r,   session_namer  r   )rt   r1   r}   	work_unitqueue_item_countr   batch_resultr@   rA   message_level_configurationrV   r  legacy_observerr  r   
queue_itemremoveds                    r;   r   zQueueManager.process_work_unit  sD
     EmEEFFF'66	> N	 N	 N	 N	 N	 N	 N	 N	 L-4466 f $ 5 9 9) D DI$ 	(?=(P(P Ci  C  Cm  C  C  tA  C  C   V<$.2BBB151J1J ) 3]IDT2 2 , , , , , ,L 0</L,/;/L,:F:T7"LL !D)  !D  !DDT@U@U  !D  !Dehiyezez  !D  !D  Xe  !D  !D  qz  qA  !D  !D  !D   $4 & &$zTa$z$zox$z$z!" !" !" !&%"*:1*=*E,3KK,D,D	#,#46=kk*6M6MO'6 %75D4E		46	:" :"0@:" :" :" 6
 'C$4$?.7-6-?;Q8D8X6B6T5A5R	'" 	'" 	'" 	!" 	!" 	!" 	!" 	!" 	!" 	!" '+&H&H$4m'" '" !" !" !" !" !" !" !" !1C8H4I4I I 0 0#, " " "&*&C&C$%$4$1$M)2E$M$M$M	'" '" !" !" !" !" !" !" !" !" !" !" !" !"" 04/G/G ) 3]IDT0 0 * * * * * *J $. & &$zTa$z$zox$z$z!" !" !" !&"&2:&>&> > > > > > > >&*&H&H%/L-'" '" !" !" !" !" !" !" !" !1A 5 0 0#, " " "&*&C&C$%%/L$1$;	'" '" !" !" !" !" !" !" !" !" !" !" !" !"" % < < <ZmZZWXZZ%) %    $?2 <&8;;;< *1133 V)   M -4466 fT 594I4M4Mi4X4X	 $!8M!I!I$($;$;!(-% %      GG $G--iGGG /!33Q%/3PPP ) 8 D"LL {M { {ajay { {   #8 /1:1I/8/B/8/E-6-?-6-?!" !" !"# #        % Q Q Q(()OPPPPPQ LLk]kkk   ? 594I4M4Mi4X4X	 $!8M!I!I$($;$;!(-% %      GG $G--iGGG /!33Q%/3PPP ) 8 D"LL {M { {ajay { {   #8 /1:1I/8/B/8/E-6-?-6-?!" !" !"# #         Q D % Q Q Q(()OPPPPPQ LLk]kkk   YN	 N	 N	 N	 N	 N	 N	 N	 N	 N	 N	 N	 N	 N	 N	 N	 N	 N	 N	 N	 N	 N	 N	 N	 N	 N	 N	 N	 N	 N	s  X A&S'B3LSB,H
	L

I'I ;L IALS7KL
K?K:5L:K??LS
MAMSMAS0A)XA5RX$R95X8R99#XA*XA5V?<X?$W&	#X%W&	&"XX
X(+X(r  c                 N  K   |dk    rt          d          t          d          4 d{V }t          j        j        |k    t          j        j        |k    g} t          t          j                                      t          j        t          j        j        t          j        j        k              	                    t          j        j        |k              	                    t          j        j
                   j	        |                     t          j        j                                      d          }|                    |           d{V }|                                }|                                 d{V  |cddd          d{V  S # 1 d{V swxY w Y   dS )z<Get the next queue item to process for a specific work unit.r  z>representation tasks are not supported for get_next_queue_itemr  Nr   )
ValueErrorr   r   r   r1   rN   r   r$   r   r   r   r   r   r   scalar_one_or_noner   )	rt   r  r1   r2   rJ   aqs_conditionsr   r   r  s	            r;   r  z QueueManager.get_next_queue_itemj  s     
 (((P   344 	 	 	 	 	 	 	 )7=H),6Nv'((-$20>? 
 v'5FGG(2233( &*-..q  ::e,,,,,,,,F2244J ))++5	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	s   EF
F!Fr>   c                   K   |dk    rt          | d          t          j        j        }t          j        j        }t          |          }g }g }t          d          4 d{V }	|	                    t          t          j
        j                                      t          j
        j        |k                                  t          j
        j        |k                         d{V }
|
                                s#t          ||          cddd          d{V  S t          t!          j        t          j        j                                                t          j                                      t          j        t          j        j        t          j        j        k                                  t          j        j                                       t          j        j        |j        k                                  t          j        j        |j        k                                  t          j        j        |k                                              }t          t!          j        t          j        j                                                t          j        j        |j        k                                  t          j        j        |j        k                                  t          j        j        |k                                               }t          t          j        j                                      t          j        j        |k                                  t          j        j        |j        k                                              }t!          j        ||          }t          t          j        j                            d          t          j        j                             d          t          j        j                            d          t!          j!        t          j        j                   "                    t          j        j        	                              d
                                        t          j        j        |j        k                                  t          j        j        |j        k                                  t          j        j        |k              #                                }t          |j$        j        |j$        j         |j$        j        |j$        j%        t!          j&        |j$        j%        |k              "                                                    d                    '                    |j$        j                  (                                }|j$        j%        |k    |j$        j        |k    z  }t          t          j        t          j        |j$        j)                            d                                        |                              t          j        t          j        j        |j$        j        k              *                    t          j        tW          t          j        j        |k    t          j        j         t          j        j        t          j        j        k                                            |          '                    t          j        j        t          j        j                  }|	                    |           d{V }|,                                }|s#t          ||          cddd          d{V  S |r!|dk    rt[          |d         d                   nd}t]                      }|D ]U\  }}}|j        |vr/|/                    |           |0                    |j                   ||/                    |           Vtc          |	||           |rt7          d |D                       nd}te          |          \  }}|r't7          d |D                       fd|D             }|rt7          d |D                       nd}|dk    r|	||k    r|}nd}	 ddd          d{V  n# 1 d{V swxY w Y   t          ||||||          S )aj  
        Batch processing for representation and agent tasks.

        Returns a `QueueBatchResult` carrying:
        - messages_context: unique Message rows (conversation turns) forming the context window
        - items_to_process: QueueItems for the current work_unit_key within that window
        - configuration: Resolved configuration for the batch
        - hit_batch_token_cap: True when the cumulative-token window clamped the batch
        - was_flush_enabled: snapshot of `settings.DERIVER.FLUSH_ENABLED` at fetch time
        - batch_max_tokens: snapshot of the cap actually applied to this batch
        r  z1 tasks are not supported for get_queue_item_batchr  N)rD   rE   r   r   	peer_name)r   cumulative_token_countcap_exceededr      Fc              3   2   K   | ]}|j         	|j         V  d S r   r  r   qis     r;   	<genexpr>z4QueueManager.get_queue_item_batch.<locals>.<genexpr>O  8        }0 M0000 r:   c              3   2   K   | ]}|j         	|j         V  d S r   r  r*  s     r;   r,  z4QueueManager.get_queue_item_batch.<locals>.<genexpr>\  s8       0 0}0 M00000 0r:   c                 *    g | ]}|j         k    |S r9   rN   )r   mmax_queue_item_message_ids     r;   r   z5QueueManager.get_queue_item_batch.<locals>.<listcomp>a  s-     $ $ $14;T3T3TA3T3T3Tr:   c              3   2   K   | ]}|j         	|j         V  d S r   r  r*  s     r;   r,  z4QueueManager.get_queue_item_batch.<locals>.<genexpr>f  r-  r:   )r@   rA   rB   rC   rD   rE   )3r!  r   rk   r   r   r+   r   r   r   r   r   rN   r   r1   r"  r>   r   minrG   select_fromr$   r   r   r   r  r  scalar_subqueryr   r%  r
  r   r   r   r   overr   r   r&  bool_orr   cter'  r   r   r   rH   rM   rY   rP   rT   r`   )rt   r  r1   r2   rE   rD   
parsed_keyr@   rA   rJ   ownership_checkmin_unprocessed_message_id_subqimmediately_preceding_id_subqpreceding_message_id_subqeffective_start_id	inner_cter9  allowed_conditionr   r   rowscap_exceeded_from_queryseen_messagesr1  r+  _caplast_queued_id_beforer[   last_queued_id_afterrC   r2  s                                 @r;   r  z!QueueManager.get_queue_item_batch  s	     $ (((OOO   $+K$,:(77
13,.455 ]	, ]	, ]	, ]	, ]	, ]	, ]	, %'JJv0344v0>-OPPv03v=>>% %      O
 #5577 '&7%5  ]	, ]	, ]	, ]	, ]	, ]	, ]	, ]	, ]	, ]	, ]	, ]	, ]	, ]	,0 tx 12233V-..N$/6>3DD  (2233v~2j6MMNNv~4
8QQRRv'5FGG "" ,  tx 12233v~2j6MMNNv~4
8QQRRv~(+JJKK "" * v~())v~(,IIJJv~/:3FFGG ""	 & "&)+J" " N%++L99N.44]CCN,22;??HV^788T6>#4T55U344  v~2j6MMNNv~4
8QQRRv~(,>>??   K*K+K)K6L!CFV!VWWTVVU>**  )+011  -1AAE$(GG  N$E&,,^<< 
 S!!fnfn&735;K&KLL$(6-G)33(3v~7HH   ())&.+V-=-@AA# ( ::e,,,,,,,,F::<<D '&7%5  C]	, ]	, ]	, ]	, ]	, ]	, ]	, ]	, ]	, ]	, ]	, ]	, ]	, ]	,T %)L-=-A-AT!WQZ   u $ '*eeM# 0 02t4},,$++A...!%%ad+++>$++B///
 (,<>NOOO  $  .       " 1M 1 1-o   ,/ 0 0.0 0 0 - -)
$ $ $ $/$ $ $  $  .       !< !1$$)5)-AAA&=##&+#+{]	, ]	, ]	, ]	, ]	, ]	, ]	, ]	, ]	, ]	, ]	, ]	, ]	, ]	, ]	, ]	, ]	, ]	, ]	, ]	, ]	, ]	, ]	, ]	, ]	, ]	, ]	,~  --) 3/-
 
 
 	
s!   &B(b2!Yb2;D$b22
b<?b<c                   K   |sd S t          d          4 d {V }t          |          }d |D             }|                    t          t          j                                      t          j        j                            |                                        t          j        j	        |k              
                    d                     d {V  |                    t          t          j                                      t          j        j	        |k              
                    t          j                                         d {V  |                                 d {V  |j        dv rF|j        ?t"          j        j        r.t)          j        t-          |          |j        |j                   d d d           d {V  d S # 1 d {V swxY w Y   d S )Nprocess_queue_item_batchc                     g | ]	}|j         
S r9   r0  r	  s     r;   r   z>QueueManager.mark_queue_items_as_processed.<locals>.<listcomp>  s    222D222r:   T)r   r   r  )countr  r  )r   r+   r   r   r   r$   r   rN   r   r1   r   r   r   r   r   r  r  r   METRICSrs   r)   record_deriver_queue_itemr   )rt   r   r1   rJ   r  item_idss         r;   r  z*QueueManager.mark_queue_items_as_processed  su       	F899 	 	 	 	 	 	 	R+M::I22E222H**v'((v'*..x8899v'5FGG$''	         **v011v0>-OPPTXZZ00        
 ))++ #'DDD,8$, 9 #<e**#,#;'1   +	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	s   F#G
GGr]   c           	        K   |sdS t          d          4 d{V }|                    t          t          j                                      t          j        j        |j        k                                  t          j        j        |k                                  d|dd                              d{V  |                    t          t          j	                                      t          j	        j        |k                                  t          j                                         d{V  |                                 d{V  ddd          d{V  dS # 1 d{V swxY w Y   dS )z*Mark queue item as processed with an errorNr   Ti  )r   r   rK  )r   r   r   r   r$   r   rN   r1   r   r   r   r   r   )rt   r]   r1   r   rJ   s        r;   r   z'QueueManager.mark_queue_item_as_errored  s       	F:;; 	 	 	 	 	 	 	r**v'((v'*dg566v'5FGG$eFUFm<<	         **v011v0>-OPPTXZZ00        
 ))++	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	s   D8E''
E14E1c           
        K   t          d          4 d{V }t          t          t                   |                    t          t          j                                      t          j        j	        |k                                  t          j        j
        |k                         d{V           }|                                 d{V  |j        dk    cddd          d{V  S # 1 d{V swxY w Y   dS )zY
        Clean up a specific work unit session by both work_unit_key and AQS ID.
        cleanup_work_unitNr   )r   r   r   r   r   r   r   r   r   rN   r1   r   rowcount)rt   r2   r1   rJ   r   s        r;   r  zQueueManager._cleanup_work_unit  s      122 
	' 
	' 
	' 
	' 
	' 
	' 
	'bS!jj6455U6476ABBU64BmSTT        F ))++?Q&
	' 
	' 
	' 
	' 
	' 
	' 
	' 
	' 
	' 
	' 
	' 
	' 
	' 
	' 
	' 
	' 
	' 
	' 
	' 
	' 
	' 
	' 
	' 
	' 
	' 
	' 
	' 
	' 
	' 
	's   CC,,
C69C6)rK   N)&r3   r4   r5   rw   re   r   r|   r7   r   r   r   rI   r   r   r   Signalsr   r   r   dictr   r   r   r   r   r   rF   r$   r  r   r   tracer  r  r  r   rH   r  r9   r:   r;   rb   rb   y   sr       C C C<:W\$/ :D : : : :
RR-0R:=R	R R R R1# 1c 1d 1 1 1 1!# ! ! ! !*C * * * *! ! ! !4M&. MT M M M M". . . .:   :K#S#X K# K# K# K#Z08	c3h   4.0 .0 .0 .0h#0#0 I#0 	#0
 #0 
#0 #0 #0 #0JRS RS RT R R R Rh ""-0":="	T	" " " "H B
B
 B
 	B

 
B
 B
 B
 B
H)_58	   @.1:=	   ('' ' 
	' ' ' ' ' 'r:   rb   c                    K   t                               d           	 t                       d {V  n2# t          $ r%} t                               d|            Y d } ~ nd } ~ ww xY wt                      }	 |                                 d {V  nU# t          $ rH} t                               dt          |                       t          j
        |            Y d } ~ nd } ~ ww xY wt                       d {V  t                               d           d S # t                       d {V  t                               d           w xY w)NzStarting queue managerzGError initializing cache in queue manager; proceeding without cache: %szError in main: zMain function exiting)r   r   r   r   r  rb   r   r   r7   r   r   r   )r   managers     r;   mainrY    s     
LL)***
ll 
 
 
UWX	
 	
 	
 	
 	
 	
 	
 	


 nnG.  """""""""" ( ( (/s1vv//000$Q''''''''( mm,----- mm,----sD   3 
A"AA"4B D 
C!>CD C!!D 0E)Sre   r   r   collections.abcr   dataclassesr   r   r   r   r	   loggingr
   typingr   r   r   r   dotenvr   nanoidr   r   sentry_sdk.integrations.asyncior   
sqlalchemyr   r   r   r   r   sqlalchemy.dialects.postgresqlr   sqlalchemy.enginer   sqlalchemy.ext.asyncior   sqlalchemy.sqlr   srcr   src.cache.clientr   r   
src.configr   src.dependenciesr   src.deriver.consumerr   r    src.dreamer.dream_schedulerr!   r"   r#   
src.modelsr$   src.reconcilerr%   r&   r'   src.schemasr(   src.telemetryr)   src.telemetry.sentryr*   src.utils.work_unitr+   src.webhooks.eventsr,   r-   r3   r   r0   r>   rF   rG   rT   tupler`   rb   rY  r9   r:   r;   <module>rt     s           $ $ $ $ $ $ ( ( ( ( ( ( ( ( 2 2 2 2 2 2 2 2 2 2       ( ( ( ( ( ( ( ( ( (           . . . . . . > > > > > > 8 8 8 8 8 8 8 8 8 8 8 8 8 8 1 1 1 1 1 1 * * * * * * / / / / / /             4 4 4 4 4 4 4 4       ' ' ' ' ' '                
 !                   
 . - - - - - , , , , , , 2 2 2 2 2 2 3 3 3 3 3 3       
 
8		 T        j    $       $6>* 9o 
	   (9o(
4	?1D889( ( ( (6a' a' a' a' a' a' a' a'H. . . . .r:   