
    yj%                     x   U d Z ddlZddlZddlZddlmZmZmZ ddlZddlm	Z	 ddl
mZmZ ddlmZ ddlmZ ddlmZ dd	lmZ dd
lmZ  ej        e          Z G d de	          ZdZ eddej        j                   edde          dZee ef         e!d<   da"de!d<   ddZ#ddZ$ G d d          Z%dS )a:  
Reconciler scheduler for self-healing background tasks.

This module provides a scheduler for running reconciliation and cleanup tasks
like vector store sync and soft-delete cleanup. It ensures only one task of each
type runs at a time across multiple deriver instances by using the queue table
for coordination.
    N)datetime	timedeltatimezone)	BaseModel)existsselect)IntegrityError)models)settings)
tracked_db)	QueueItemc                   2    e Zd ZU dZeed<   eed<   eed<   dS )ReconcilerTaskz Definition of a reconciler task.namework_unit_keyinterval_secondsN)__name__
__module____qualname____doc__str__annotations__int     @/DATA/AppData/hermes/projects/honcho/src/reconciler/scheduler.pyr   r      s9         **
IIIr   r   i  sync_vectorszreconciler:sync_vectors)r   r   r   cleanup_queuezreconciler:cleanup_queue)r   r   RECONCILER_TASKSReconcilerScheduler | None_reconciler_scheduler	schedulerReconcilerSchedulerreturnc                 
    | a dS )z.Set the global reconciler scheduler reference.Nr!   )r"   s    r   set_reconciler_schedulerr'   9   s     &r   c                      t           S )z.Get the global reconciler scheduler reference.r&   r   r   r   get_reconciler_schedulerr)   ?   s      r   c                        e Zd ZU dZdZded<   dZeed<    fdZd Z	e
dd
            ZddZddZddZded	efdZ xZS )r#   a  
    Scheduler for self-healing reconciliation and cleanup tasks.

    Ensures only one task of each type runs at a time across multiple deriver
    instances by using the queue table for coordination. This provides:
    - Vector store synchronization (syncing pending documents/embeddings)
    - Soft-delete cleanup (removing soft-deleted documents from vector stores)
    - Self-healing behavior (retrying failed syncs)
    - Extensible task registry for adding new maintenance tasks

    Each task type has its own interval and work_unit_key, allowing multiple
    different tasks to be queued simultaneously while preventing duplicates
    of the same task type.
    Nr    	_instanceF_initializedc                 l    | j         &t                                          |           | _         | j         S )N)r+   super__new__)cls	__class__s    r   r/   zReconcilerScheduler.__new__W   s*    = !GGOOC00CM}r   c                     t           j        s4d | _        t          j                    | _        i | _        dt           _        d S d S )NT)r#   r,   _scheduler_taskasyncioEvent_shutdown_event	_next_runselfs    r   __init__zReconcilerScheduler.__init__\   sC    "/ 	4>BD 29-//D 24DN/3,,,	4 	4r   r$   c                 "    d| _         d| _        dS )z5Reset the singleton instance. Only use this in tests.NF)r+   r,   )r0   s    r   reset_singletonz#ReconcilerScheduler.reset_singletone   s      r   c                 6  K   | j         t                              d           dS | j                                         t          j        t          j                  }t          
                                D ]%\  }}|t          |j                  z   | j        |<   &t          j        |                                           | _         t                              dt%          t                    t'          t                                                               dS )z$Start the reconciler scheduler loop.Nz#ReconcilerScheduler already runningsecondsz-ReconcilerScheduler started with %d tasks: %s)r3   loggerwarningr6   clearr   nowr   utcr   itemsr   r   r7   r4   create_task_scheduler_loopinfolenlistkeys)r9   rC   	task_nametasks       r   startzReconcilerScheduler.startk   s      +NN@AAAF""$$$l8<((/5577 	W 	WOIt(+i@U.V.V.V(VDN9%%&243G3G3I3IJJ; !!!&&(())	
 	
 	
 	
 	
r   c                 R  K   | j         dS t                              d           | j                                         	 t          j        | j         d           d{V  n# t
          j        $ ry t                              d           | j         	                                 t          j        t
          j                  5  | j          d{V  ddd           n# 1 swxY w Y   Y nw xY wd| _         | j                                         t                              d           dS )zStop the reconciler scheduler.Nz$Shutting down ReconcilerScheduler...g      @timeoutz7ReconcilerScheduler shutdown timed out, cancelling taskzReconcilerScheduler stopped)r3   r@   rH   r6   setr4   wait_forTimeoutErrorrA   cancel
contextlibsuppressCancelledErrorr7   rB   r8   s    r   shutdownzReconcilerScheduler.shutdown~   sr     'F:;;;  """	+"4#7EEEEEEEEEEE# 	+ 	+ 	+NNTUUU '')))$W%;<< + +********+ + + + + + + + + + + + + + +	+  $122222s7    !A" "A C*CC*C 	 C*#C 	$C*)C*c                 j  K   	 | j                                         st          j        t          j                  }t                                          D ]\  }}| j        	                    ||          }||k    r	 | 
                    |           d{V }|rt                              d|           nW# t          $ rJ}t                              d|           t          j        j        rt%          j        |           Y d}~nd}~ww xY w|t)          |j                  z   | j        |<   | j        rht-          | j                                                  }t1          d|t          j        t          j                  z
                                            }nd}	 t5          j        | j                                         |           d{V  dS # t4          j        $ r Y nw xY w| j                                         dS dS # t4          j        $ r t                              d            w xY w)	z
        Main scheduler loop that enqueues tasks based on their intervals.

        Each task has its own interval and the loop checks all tasks on each
        iteration, enqueueing any that are due.
        NzEnqueued task: %szError enqueueing task %sr>   g      ?g      N@rP   z"ReconcilerScheduler loop cancelled)r6   is_setr   rC   r   rD   r   rE   r7   get_try_enqueue_taskr@   debug	Exception	exceptionr   SENTRYENABLED
sentry_sdkcapture_exceptionr   r   minvaluesmaxtotal_secondsr4   rS   waitrT   rX   )	r9   rC   rL   rM   next_runenqueuedenext_task_timesleep_secondss	            r   rG   z#ReconcilerScheduler._scheduler_loop   s     -	*1133 (l8<00 (8'='='?'?  OIt#~11)SAAHh@-1-C-CD-I-I'I'I'I'I'I'IH' M &-@) L L L( @ @ @",,-GSSS'6 @ * <Q ? ? ?@ 58)$($9; ; ; 5y1
 > )%()>)>)@)@%A%AN$''(,x|*D*DDSSUU% %MM
 %)M!*,1133 -          E+   DM *1133 ( ( ( ( (T % 	 	 	LL=>>>	sV   A7H <8B54H 5
D	?A D?H D		BH 3G G&#H %G&&H +H2rM   c           
        K   t          d          4 d{V }t          t          t          t          j        j                                      t          j        j        |j        k                                  }|                    |           d{V }|r4t          
                    d|j                   	 ddd          d{V  dS t          t          t          t          j                                      t          j        |j        k    t          j        dk                                  }|                    |           d{V }|r4t          
                    d|j                   	 ddd          d{V  dS t          |j        d|j        idddd          }|                    |           	 |                                 d{V  n\# t           $ rO |                                 d{V  t          
                    d	|j                   Y ddd          d{V  dS w xY wt                              d
|j                   	 ddd          d{V  dS # 1 d{V swxY w Y   dS )aY  
        Attempt to enqueue a reconciler task.

        This is idempotent - if the task is already in-progress (has an
        ActiveQueueSession) or pending in the queue, the enqueue is skipped.

        Args:
            task: The task definition to enqueue

        Returns:
            True if a task was enqueued, False if skipped
        reconciler_enqueueNz-Task %s already in progress, skipping enqueueFz2Task %s already pending in queue, skipping enqueuereconciler_type
reconciler)r   payload
session_id	task_typeworkspace_name
message_idz6Task %s already enqueued by another instance, skippingzEnqueued reconciler task: %sT)r   r   r   r
   ActiveQueueSessionidwherer   scalarr@   r^   r   r   	processedaddcommitr	   rollbackrH   )r9   rM   dbin_progress_checkis_in_progresspending_check
is_pending
queue_items           r   r]   z%ReconcilerScheduler._try_enqueue_task   s      233 7	 7	 7	 7	 7	 7	 7	r &64788>>1?4CUU  ! ! $&99->#?#???????N LdiXXX7	 7	 7	 7	 7	 7	 7	 7	 7	 7	 7	 7	 7	 7	  #9<((..!/43EE!+u4   M  "yy77777777J H$)   =7	 7	 7	 7	 7	 7	 7	 7	 7	 7	 7	 7	 7	 7	B #"0%ty  &#	 	 	J FF:iikk!!!!!!!!!   kkmm#######Ldi   i7	 7	 7	 7	 7	 7	 7	 7	 7	 7	 7	 7	 7	 7	\ KK6	BBBo7	 7	 7	 7	 7	 7	 7	 7	 7	 7	 7	 7	 7	 7	 7	 7	 7	 7	 7	 7	 7	 7	 7	 7	 7	 7	 7	 7	 7	 7	sD   B"I(B&I(6I(=GI(AH1I(0H11$I((
I25I2)r$   N)r   r   r   r   r+   r   r,   boolr/   r:   classmethodr<   rN   rY   rG   r   r]   __classcell__)r1   s   @r   r#   r#   D   s           /3I+222L$    
4 4 4 ! ! ! [!

 
 
 
&3 3 3 3(4 4 4 4lDN Dt D D D D D D D Dr   )r"   r#   r$   N)r$   r    )&r   r4   rV   loggingr   r   r   rc   pydanticr   
sqlalchemyr   r   sqlalchemy.excr	   srcr
   
src.configr   src.dependenciesr   
src.modelsr   	getLoggerr   r@   r   QUEUE_CLEANUP_INTERVAL_SECONDSVECTOR_STORERECONCILIATION_INTERVAL_SECONDSr   dictr   r   r!   r'   r)   r#   r   r   r   <module>r      s
           2 2 2 2 2 2 2 2 2 2           % % % % % % % % ) ) ) ) ) )             ' ' ' ' ' '            		8	$	$    Y    "+  #N/!.N  
 $^07  / / $sN*+    7; 3 : : :& & & &! ! ! !
H H H H H H H H H Hr   