
    yj.V              
       @   d Z ddlZddlZddlZddlmZ ddlmZ ddlm	Z	m
Z
mZmZmZ ddlmZ ddlmZ ddlmZ dd	lmZ dd
lmZ ddlmZ ddlmZ ddlmZ ddlmZ ddl m!Z! ddl"m#Z# ddl$m%Z%m&Z&m'Z'  ej(        e)          Z*dZ+dZ,dZ- ej.        d          Z/deej        dz           dee0         fdZ1e G d d                      Z2e+fdede3de4ej5                 fdZ6e+fdede3de4ej7                 fdZ8ded e4ej5                 ddfd!Z9ded"e4ej7                 ddfd#Z:ded e4ej5                 d$e&de;e3e3f         fd%Z<ded"e4ej7                 d$e&de;e3e3f         fd&Z=e+d'fdede3d(e3de3fd)Z>d$e&d*e2de0fd+Z?d$e&d*e2de0fd,Z@d$e&d*e2de0fd-ZAd*e2de0fd.ZBde2fd/ZCdS )0z
Vector store reconciliation job.

This module provides a periodic reconciliation job that syncs documents and message
embeddings to the vector store on a rolling basis, healing any missed writes.
    N)	dataclass)cast)and_deleteor_selectupdate)AsyncSession)InstrumentedAttribute)ColumnElement)func)models)settings)
tracked_db)embedding_client)VectorStoreError)EmbeddingCallPurpose)embedding_call_purpose)VectorRecordVectorStoreget_external_vector_store2         
   minuteslast_sync_atreturnc                     t          |                     d          | t          j                    t          z
  k               S )zIRows are eligible for sync if never attempted or past the backoff window.N)r   is_r   nowSYNC_BACKOFF)r   s    C/DATA/AppData/hermes/projects/honcho/src/reconciler/sync_vectors.py_backoff_eligibler%   (   s:     txzzL00      c                       e Zd ZU dZdZeed<   dZeed<   dZeed<   dZ	eed<   dZ
eed<   edefd	            Zedefd
            Zedefd            ZdS )ReconciliationMetricsz#Metrics for a reconciliation cycle.r   documents_synceddocuments_faileddocuments_cleanedmessage_embeddings_syncedmessage_embeddings_failedr   c                      | j         | j        z   S N)r)   r,   selfs    r$   total_syncedz"ReconciliationMetrics.total_synced<       $t'EEEr&   c                      | j         | j        z   S r/   )r*   r-   r0   s    r$   total_failedz"ReconciliationMetrics.total_failed@   r3   r&   c                     | j         S r/   )r+   r0   s    r$   total_cleanedz#ReconciliationMetrics.total_cleanedD   s    %%r&   N)__name__
__module____qualname____doc__r)   int__annotations__r*   r+   r,   r-   propertyr2   r5   r7    r&   r$   r(   r(   2   s         --ccs%&s&&&%&s&&&Fc F F F XF Fc F F F XF &s & & & X& & &r&   r(   db
batch_sizec           	        K   t          t          j                                      t	          t          j        j                            d          t          j        j        dk    t          t          j        j	                                      
                    t          j        j	                                                                                                      |                              d          }|                     |           d{V }t!          |                                                                          S )ac  
    Get documents that need to be synced to the vector store.

    Finds documents where:
    - not soft-deleted (deleted_at is NULL)
    - sync_state is "pending" (never synced or retry needed)
    - Note: "synced" = done forever, "failed" = permanent failure (manual intervention)

    Uses FOR UPDATE SKIP LOCKED to prevent concurrent processing.
    NpendingTskip_locked)r   r   Documentwherer   
deleted_atr!   
sync_stater%   r   order_byasc
nullsfirstlimitwith_for_updateexecutelistscalarsallr@   rA   stmtresults       r$   _get_documents_needing_syncrV   I   s       	v	*..t44*i7!&/">?? 

 

 
&/.2244??AA	B	B	z			T	*	* 	 ::d########F  $$&&'''r&   c                 N  K   t          t          j                                      t	          t          j        j        dk    t          t          j        j                                                          t          j        j        	                                
                                                              |                              d          }|                     |           d{V }t          |                                                                          S )a  
    Get pending message embeddings that need to be synced to the vector store.

    Returns only pending embeddings (with full data including embedding vectors).
    The batch_size limits the number of embeddings returned.

    Uses FOR UPDATE SKIP LOCKED to prevent concurrent processing and
    orders by last_sync_at (nulls first) to prioritize never-synced records.

    Note: "synced" = done forever, "failed" = permanent failure (manual intervention)
    rC   TrD   N)r   r   MessageEmbeddingrG   r   rI   r%   r   rJ   rK   rL   rM   rN   rO   rP   rQ   rR   rS   s       r$   $_get_message_embeddings_needing_syncrY   i   s        	v&''	'2i?!&"9"FGG 

 

 
&)6::<<GGII	J	J	z			T	*	* 	 ::d########F  $$&&'''r&   	documentsc           	      V  K   |sd S |D ]}|j         dz   }|t          k    rdnd}|                     t          t          j                                      t          j        j        |j        k                                  ||t          j
                                         d {V  d S N   failedrC   )rI   sync_attemptsr   )r_   MAX_SYNC_ATTEMPTSrO   r	   r   rF   rG   idvaluesr   r"   )r@   rZ   docnew_attempts	new_states        r$   _bump_document_sync_attemptsrf      s         
 
(1, ,0A A AHHy	jj6?##U6?%/00V$*!XZZ   
 
 	
 	
 	
 	
 	
 	
 	
 	

 
r&   
embeddingsc           	      V  K   |sd S |D ]}|j         dz   }|t          k    rdnd}|                     t          t          j                                      t          j        j        |j        k                                  ||t          j
                                         d {V  d S r\   )r_   r`   rO   r	   r   rX   rG   ra   rb   r   r"   )r@   rg   embrd   re   s        r$   %_bump_message_embedding_sync_attemptsrj      s         
 
(1, ,0A A AHHy	jj6*++U6*-788V$*!XZZ   
 
 	
 	
 	
 	
 	
 	
 	
 	
	
 
r&   external_vector_storec                 f  K   |sdS d}d}t           j        j        dk    pt           j        j         }d |D             }i |r	 d |D             }t	          t
          j        j        d          5  t          j	        |           d{V }ddd           n# 1 swxY w Y   t          |          t          |          k    r6t                              d	t          |          t          |                     t          ||d
          D ]\  }	}
|
|	j        <   |r|
|	_        n8# t           $ r+ t                              dt          |                     Y nw xY wfd|D             }|r(t%          | |           d{V  |t          |          z  }i }|D ]R}	|                    d|	j        |	j        |	j                  }|                    |g                               |	           S|                                D ]-\  }}g }g }|D ]}	t5          t6          t8                   dz  |	j                  }||n                    |	j                  }|K|                    t=          |	j        d |D             |	j        |	j        |	j        |	j        |	j         d                     |                    |	           |s	 |!                    ||           d{V  | "                    tG          tH          j%                  &                    tH          j%        j        '                    d |D                                 (                    dtS          j*                    d                     d{V  |t          |          z  }# tV          $ rG t                              d|           t%          | |           d{V  |t          |          z  }Y t           $ rG t                              d|           t%          | |           d{V  |t          |          z  }Y +w xY w||fS )u  
    Sync a batch of pending documents to the external vector store.

    Handles three cases for each document:
    1. Embedding exists in postgres → use it for external upsert
    2. Embedding missing + need postgres storage → re-embed, write to both stores
    3. Embedding missing + external-only mode → re-embed, write to external only

    Returns (synced_count, failed_count).
    r   r   r   pgvectorc                 b    g | ],}t          t          t                   d z  |j                  *|-S r/   )r   rP   float	embedding.0rc   s     r$   
<listcomp>z#_sync_documents.<locals>.<listcomp>   s9       Det);S]$K$K$S$S$S$Sr&   c                     g | ]	}|j         
S r?   contentrr   s     r$   rt   z#_sync_documents.<locals>.<listcomp>       BBBBBBr&   reconciliationparent_categoryNz6Re-embedded %s/%s documents; remaining will be retriedFstrictzFailed to re-embed %s documentsc                 &    g | ]}|j         v|S r?   ra   )rs   rc   freshly_embeddeds     r$   rt   z#_sync_documents.<locals>.<listcomp>   s-       SV;K-K-K-K-K-Kr&   documentc                 ,    g | ]}t          |          S r?   rp   rs   xs     r$   rt   z#_sync_documents.<locals>.<listcomp>      ;;;AuQxx;;;r&   )workspace_nameobserverobservedsession_namelevelra   rq   metadatac                     g | ]	}|j         
S r?   r   )rs   ds     r$   rt   z#_sync_documents.<locals>.<listcomp>  s    .J.J.Jqt.J.J.Jr&   syncedrI   r   r_   z3Vector store unavailable while syncing namespace %sz2Unexpected error syncing documents to namespace %s),r   VECTOR_STORETYPEMIGRATEDr   r   VECTOR_SYNCvaluer   simple_batch_embedlenloggerwarningzipra   rq   	Exception	exceptionrf   get_vector_namespacer   r   r   
setdefaultappenditemsr   rP   rp   getr   r   r   upsert_manyrO   r	   r   rF   rG   in_rb   r   r"   r   )r@   rZ   rk   synced_countfailed_countstore_in_postgresdocs_needing_embedcontentsnew_embeddingsrc   ri   failed_to_embedby_namespacens	namespacedocsdocs_to_syncvector_recordsexistingrq   r   s                       @r$   _sync_documentsr      s       tLL 	"j0V8M8V4V 
     02 Y	YBB/ABBBH'$06 0   U U (8'J8'T'T!T!T!T!T!T!T	U U U U U U U U U U U U U U U >""c*<&=&===L''*++     2N5QQQ ( (S+. ($ ($'CM(  	Y 	Y 	Y>DV@W@WXXXXX	Y   )  O  -*2?????????O,,, 68L 4 4"77*CL#,
 
 	B''..s3333'--// 2. 2.	4.0-/ 	% 	%CDK$.>>H$06F6J6J366R6R   !!v;;;;;*-*<$'L$'L(+(8!$ 
 
 
   $$$$ 		.'33I~NNNNNNNNN**v''v)--.J.J\.J.J.JKKLL8$(**TUVV        
 C---LL 	. 	. 	.NNEy   /r<@@@@@@@@@C---LLL 	. 	. 	.Di   /r<@@@@@@@@@C---LLL	. %%sR   ,D% 1BD% BD% B BD% %2EECNAP,AP,+P,c                 
  K   |sdS d}d}t           j        j        dk    pt           j        j         }d |D             }i |r	 d |D             }t	          t
          j        j        d          5  t          j	        |           d{V }ddd           n# 1 swxY w Y   t          |          t          |          k    r6t                              d	t          |          t          |                     t          ||d
          D ]\  }	}
|
|	j        <   |r|
|	_        n8# t           $ r+ t                              dt          |                     Y nw xY wfd|D             }|r(t%          | |           d{V  |t          |          z  }t'          d |D                       }t)          t*          j        j        t*          j        j                                      t*          j        j                            |                                        t*          j        j        t*          j        j                  }|                     |           d{V                                 }i }|D ].\  }}|                    |g                               |           /i }|                                D ]}tA          |          D ]
\  }}|||<   i }|D ]F}	|!                    d|	j"                  }|                    |g                               |	           G|#                                D ]\  }}g }g }|D ]}	|	j        }||n$                    |	j                  }|*|                    tK          |	j         d||	j                  d |D             |	j        |	j&        |	j'        d                     |                    |	           |s	 |(                    ||           d{V  |                     tS          t*          j                                      t*          j        j                            d |D                                                     dtU          j+                    d                     d{V  |t          |          z  }o# tX          $ rG t                              d|           t%          | |           d{V  |t          |          z  }Y t           $ rG t                              d|           t%          | |           d{V  |t          |          z  }Y w xY w||fS )u  
    Sync a batch of pending message embeddings to the external vector store.

    Handles three cases for each embedding:
    1. Embedding exists in postgres → use it for external upsert
    2. Embedding missing + need postgres storage → re-embed, write to both stores
    3. Embedding missing + external-only mode → re-embed, write to external only

    Returns (synced_count, failed_count).
    rm   r   rn   c                      g | ]}|j         	|S r/   )rq   rs   ri   s     r$   rt   z,_sync_message_embeddings.<locals>.<listcomp>M  s&     9 9 9S]%:%:%:%:r&   c                     g | ]	}|j         
S r?   rv   r   s     r$   rt   z,_sync_message_embeddings.<locals>.<listcomp>T  rx   r&   ry   rz   Nz?Re-embedded %s/%s message embeddings; remaining will be retriedFr|   z(Failed to re-embed %s message embeddingsc                 &    g | ]}|j         v|S r?   r   )rs   ri   r   s     r$   rt   z,_sync_message_embeddings.<locals>.<listcomp>l  s-     6 6 6SV;K-K-K-K-K-Kr&   c                     h | ]	}|j         
S r?   )
message_idr   s     r$   	<setcomp>z+_sync_message_embeddings.<locals>.<setcomp>~  s    ===3===r&   message_c                 ,    g | ]}t          |          S r?   r   r   s     r$   rt   z,_sync_message_embeddings.<locals>.<listcomp>  r   r&   )r   r   	peer_namer   c                     g | ]	}|j         
S r?   r   )rs   es     r$   rt   z,_sync_message_embeddings.<locals>.<listcomp>  s    6R6R6Rqt6R6R6Rr&   r   r   zIVector store unavailable while syncing message embeddings to namespace %sz;Unexpected error syncing message embeddings to namespace %s)-r   r   r   r   r   r   r   r   r   r   r   r   r   r   ra   rq   r   r   rj   rP   r   r   rX   r   rG   r   rJ   rO   rR   r   r   rb   	enumerater   r   r   r   r   r   r   r   r	   r   r"   r   )r@   rg   rk   r   r   r   embs_needing_embedr   r   ri   new_embr   message_idssibling_stmtsibling_rowsembs_by_messageemb_idmsg_idchunk_positionemb_idsposr   r   r   embsembs_to_syncr   r   rq   r   s                                @r$   _sync_message_embeddingsr   2  s       tLL 	"j0V8M8V4V 
9 9!9 9 9 02 	BB/ABBBH'$06 0   U U (8'J8'T'T!T!T!T!T!T!T	U U U U U U U U U U U U U U U >""c*<&=&===U''*++   !$$6u U U U , ,W+2 ($ ,$+CM,  	 	 	:C@R<S<S    	6 6 6 6)6 6 6O  -3BHHHHHHHHHO,,, ==*===>>Kv&)6+B+MNN	v&155kBB	C	C	&)4f6M6P	Q	Q 
 **\222222227799L,.O& > >""62..55f====%'N"))++ ) )$W-- 	) 	)KC%(N6""	) >@L 4 4"77	3CUVVB''..s3333'--// 2. 2.	468-/ 	% 	%C}H$06F6J6J366R6R   !!.CC>#&+ACC;;;;;&)n(+(8%(]   
 
 
 $$$$ 		.'33I~NNNNNNNNN**v.//v.1556R6R\6R6R6RSSTT8$(**TUVV        
 C---LL 	. 	. 	.NN[   8LIIIIIIIIIC---LLL 	. 	. 	.M   8LIIIIIIIIIC---LLL	. %%sR   ,D% 1BD% BD% B BD% %2EECR$$AU4AUU   older_than_minutesc                   K   t           j                             t           j        j                  t          j        |          z
  }t          t          j        j                  	                    t          j        j
                            d                    	                    t          j        j
        |k                                   |                              d          }|                     |           d{V }d |                                D             }|sdS |                     t!          t          j                  	                    t          j        j                            |                               d{V  t$                              dt)          |           d           t)          |          S )	z(
    Cleanup soft-deleted documents
    r   NTrD   c                     g | ]
}|d          S )r   r?   )rs   rows     r$   rt   z<_cleanup_soft_deleted_documents_pgvector.<locals>.<listcomp>  s    ...#s1v...r&   r   zCleaned up z' soft-deleted documents (pgvector mode))datetimer"   timezoneutc	timedeltar   r   rF   ra   rG   rH   is_notrM   rN   rO   rR   r   r   r   debugr   )r@   rA   r   cutoffrT   rU   doc_idss          r$   (_cleanup_soft_deleted_documents_pgvectorr     sx      ""8#4#899H<N"= = = F 	v!""	v)0066	7	7	v)F2	3	3	z			T	*	* 	 ::d########F.....G q **VFO,,226?3E3I3I'3R3RSS
T
TTTTTTTT
LLTs7||TTTUUUw<<r&   metricsc                 ~  K   t          d          4 d{V }t          |           d{V }|s	 ddd          d{V  dS t          |||            d{V \  }}|xj        |z  c_        |xj        |z  c_        |                                 d{V  	 ddd          d{V  dS # 1 d{V swxY w Y   dS )ze
    Reconcile a single batch of documents.

    Returns True if work was done, False otherwise.
    reconciliation_docsNFT)r   rV   r   r)   r*   commit)rk   r   r@   r   r   r^   s         r$   _reconcile_documents_batchr     s      /00 	 	 	 	 	 	 	B044444444 		 	 	 	 	 	 	 	 	 	 	 	 	 	
  /r49NOOOOOOOO  F*    F*  iikk	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	   B,AB,,
B69B6c                 ~  K   t          d          4 d{V }t          |           d{V }|s	 ddd          d{V  dS t          |||            d{V \  }}|xj        |z  c_        |xj        |z  c_        |                                 d{V  	 ddd          d{V  dS # 1 d{V swxY w Y   dS )zn
    Reconcile a single batch of message embeddings.

    Returns True if work was done, False otherwise.
    reconciliation_embsNFT)r   rY   r   r,   r-   r   )rk   r   r@   r   r   r^   s         r$   #_reconcile_message_embeddings_batchr     s      /00 	 	 	 	 	 	 	B9"======== 		 	 	 	 	 	 	 	 	 	 	 	 	 	
  8DBWXXXXXXXX))V3))))V3))iikk	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	r   c                 >  K   ddl m} t          d          4 d{V } ||| t                     d{V }|s	 ddd          d{V  dS |xj        |z  c_        |                                 d{V  	 ddd          d{V  dS # 1 d{V swxY w Y   dS )zq
    Clean up a single batch of soft-deleted documents.

    Returns True if work was done, False otherwise.
    r   )cleanup_soft_deleted_documentsreconciliation_cleanupNrA   FT)src.crud.documentr   r   RECONCILIATION_BATCH_SIZEr+   r   )rk   r   r   r@   cleaneds        r$   _cleanup_documents_batchr     s      A@@@@@233       r66!0
 
 
 
 
 
 
 
 

  	              	!!W,!!iikk                             s   B+B
BBc                 8  K   t          d          4 d{V }t          |t                     d{V }|s	 ddd          d{V  dS | xj        |z  c_        |                                 d{V  	 ddd          d{V  dS # 1 d{V swxY w Y   dS )z
    Clean up a single batch of soft-deleted documents in pgvector-only mode.

    Returns True if work was done, False otherwise.
    reconciliation_pgvector_cleanupNr   FT)r   r   r   r+   r   )r   r@   r   s      r$   _cleanup_pgvector_batchr   0  s      ;<< 	 	 	 	 	 	 	@4
 
 
 
 
 
 
 
 
  		 	 	 	 	 	 	 	 	 	 	 	 	 	 	!!W,!!iikk	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	s    B	+B		
BBc                    K   t                      } t                      }t          j                    t          z   }|bt          j                    |k     r/t          |            d{V }|snt          j                    |k     /t                              d           | S t          j                    |k     rt          ||            d{V }t          j                    |k    rn|t          ||            d{V }t          j                    |k    rnNt          ||            d{V }|s|s|st                              d           nt          j                    |k     t                              d           | S )at  
    Run a complete reconciliation cycle.

    Runs a rolling sweep to reconcile missing vectors and clean up soft deletes.
    Uses batching and FOR UPDATE SKIP LOCKED for safe concurrent operation.
    Each batch operation uses its own database session to avoid holding
    connections open for the entire cycle duration.

    Returns metrics about what was synced.
    Nz5Vector reconciliation cycle completed (pgvector mode)z*No work done, breaking reconciliation loopz%Vector reconciliation cycle completed)r(   r   time	monotonic"RECONCILIATION_TIME_BUDGET_SECONDSr   r   infor   r   r   r   )r   rk   deadlinedid_work	docs_work	embs_workcleanup_works          r$   run_vector_reconciliation_cycler   D  s      $%%G577~"DDH $n))4W========H  n)) 	KLLL .

X
%
%45JGTTTTTTTT	>x'' >!7
 
 
 
 
 
 
 
	 >x'' 66KWUUUUUUUU  	Y 	, 	LLEFFF+ .

X
%
%. KK7888Nr&   )Dr;   r   loggingr   dataclassesr   typingr   
sqlalchemyr   r   r   r   r	   sqlalchemy.ext.asyncior
   sqlalchemy.orm.attributesr   sqlalchemy.sqlr   sqlalchemy.sql.functionsr   srcr   
src.configr   src.dependenciesr   src.embedding_clientr   src.exceptionsr   src.telemetry.eventsr   src.utils.typesr   src.vector_storer   r   r   	getLoggerr8   r   r   r   r`   r   r#   boolr%   r(   r<   rP   rF   rV   rX   rY   rf   rj   tupler   r   r   r   r   r   r   r   r?   r&   r$   <module>r     s       ! ! ! ! ! !       8 8 8 8 8 8 8 8 8 8 8 8 8 8 / / / / / / ; ; ; ; ; ; ( ( ( ( ( ( ) ) ) ) ) )             ' ' ' ' ' ' 1 1 1 1 1 1 + + + + + + 5 5 5 5 5 5 2 2 2 2 2 2 Q Q Q Q Q Q Q Q Q Q		8	$	$  %( "  "x!"---'(9D(@A4    & & & & & & & &0 0( ((( 
&/( ( ( (D 0( ((( 
&
!"( ( ( (@

FO$
 

 
 
 
*

V,-
 

 
 
 
,{&{&FO${& '{& 38_	{& {& {& {&|W&W&V,-W& 'W& 38_	W& W& W& W&x 0   		   B&" 
   *&" 
   *&" 
   2"	   (1/D 1 1 1 1 1 1r&   