
    yjbM                     z   U d Z ddlZddlZddlZddlZddlmZ ddlmZm	Z	 ddl
Z
ddlmZ ddlmZ ddlmZ erddlmZ  ej        e          Zdd	d
ddededz  defdZdedefdZ G d d          Zdaedz  ed<   dedz  fdZ	 	 	 	 	 	 	 	 d&dedz  de eef         dz  de!de"d e!d!e!d"e!d#edefd$Z#d'd%Z$dS )(a/  
Buffered CloudEvents HTTP emitter with retry logic.

This module provides a TelemetryEmitter class that:
- Buffers events in memory for efficient batching
- Sends events as CloudEvents to a configurable HTTP endpoint
- Implements exponential backoff retry logic
- Supports graceful shutdown with flush
    N)deque)TYPE_CHECKINGAny)to_json)
CloudEvent)HONCHO_VERSION)	BaseEventevent_ideventr	   rater   returnc                   t          |t          t          z            sdS t          |          }|dk    rdS |dk    rdS t          | dd          }t          |t                    r|r|}n||}n|                                 }t                              t          |          dd         d          d	z  }|t          |d	z            k     S )
uk  Trace-coherent deterministic sampler for high-volume events.

    `rate` is typed as `object` (rather than `float`) because the caller
    reads it straight from `settings.TELEMETRY.HIGH_VOLUME_SAMPLE_RATE`,
    which in tests gets MagicMock'd. A MagicMock comparison against 1.0
    raises TypeError, so we validate at the boundary and fall back to
    passthrough on anything non-numeric.

    When the event carries a `run_id`, sampling decisions hash on that id —
    so every event in an agent run either passes or fails the sampler, and
    join queries downstream don't see half-traces. Events without `run_id`
    (summarizer, deriver — non-agentic call sites) sample independently per
    event using the event's deterministic id. Callers that have already
    computed `event.generate_id()` can pass it as `event_id` to avoid the
    redundant sha256 hash.
    T      ?g        Frun_idN   big'  )
isinstanceintfloatgetattrstrgenerate_id
from_bytes_stable_hash)r   r   r   rate_fr   keybuckets          =/DATA/AppData/hermes/projects/honcho/src/telemetry/emitter.py_should_sampler!      s    & dC%K(( t4[[F}}t}}uUHd++F&# "6 "		!!^^L--bqb1599FBFC((((    valuec                 ~    dd l }|                    |                     d                                                    S )Nr   zutf-8)hashlibsha256encodedigest)r#   r%   s     r    r   r   D   s5    NNN>>%,,w//0077999r"   c                      e Zd ZU dZedz  ed<   eeef         ed<   eed<   eed<   eed<   eed<   eed	<   e	ed
<   e
e         ed<   ej        d         dz  ed<   ej        dz  ed<   e	ed<   ej        ed<   e	ed<   eej        d                  ed<   	 	 	 	 	 	 	 	 d&dedz  deeef         dz  dedededed	ed
e	fdZd'dZd'dZd(dZd'd Zd!ee         de	fd"Zd'd#Zedefd$            Zede	fd%            ZdS ))TelemetryEmittera0  Buffered, async CloudEvents emitter with retry logic.

    This emitter queues events and sends them in batches to reduce
    network overhead. It supports automatic periodic flushing and
    exponential backoff retry on failure.

    Usage:
        emitter = TelemetryEmitter(
            endpoint="https://telemetry.example.com/v1/events",
            headers={"X-API-Key": "..."},
        )
        await emitter.start()

        # Queue events (non-blocking)
        emitter.emit(my_event)

        # Graceful shutdown
        await emitter.shutdown()
    Nendpointheaders
batch_sizeflush_intervalflush_thresholdmax_retriesmax_buffer_sizeenabled_buffer_flush_task_client_running_lock_capacity_warning_active_pending_flush_tasksd   r   2      r   Tflush_interval_secondsc	                 :   || _         |pi | _        || _        || _        || _        || _        || _        |o|du| _        t          |          | _	        d| _
        d| _        d| _        t          j                    | _        d| _        t#                      | _        dS )a&  Initialize the telemetry emitter.

        Args:
            endpoint: CloudEvents HTTP endpoint URL
            headers: Optional HTTP headers for authentication
            batch_size: Maximum events per batch
            flush_interval_seconds: How often to flush the buffer
            flush_threshold: Trigger flush when buffer reaches this size
            max_retries: Maximum retry attempts on failure
            max_buffer_size: Maximum events to buffer (oldest dropped if exceeded)
            enabled: Whether emission is enabled
        N)maxlenF)r+   r,   r-   r.   r/   r0   r1   r2   r   r3   r4   r5   r6   asyncioLockr7   r8   setr9   )	selfr+   r,   r-   r=   r/   r0   r1   r2   s	            r    __init__zTelemetryEmitter.__init__o   s    . !}"$4.&.784#7O444\^^
(-%$'EE!!!r"   r   c                 <  K   | j         st                              d           dS t          j        dddi| j                  | _        d| _        t          j	        | 
                                          | _        t                              d| j                   dS )	zqStart the emitter background tasks.

        Creates the HTTP client and starts the periodic flush task.
        z4Telemetry emitter disabled (no endpoint or disabled)Ng      >@Content-Typezapplication/cloudevents+json)timeoutr,   Tz'Telemetry emitter started, endpoint: %s)r2   loggerinfohttpxAsyncClientr,   r5   r6   r@   create_task_periodic_flushr4   r+   rC   s    r    startzTelemetryEmitter.start   s      
 | 	KKNOOOF( >,
 
 
 ".t/C/C/E/EFF=t}MMMMMr"   c                   K   | j         sdS d| _        | j        \| j                                         t	          j        t          j                  5  | j         d{V  ddd           n# 1 swxY w Y   t          | j	                  }|rt          j
        |ddi d{V  |                                  d{V  | j        &| j                                         d{V  d| _        t                              d           dS )u
  Gracefully shutdown the emitter.

        Stops the periodic flush task, drains any in-flight threshold
        flushes, flushes remaining events, and closes the HTTP client.

        Threshold flushes are spawned from emit() and pop their batch
        under lock before releasing it for the HTTP send. If we don't
        await those tasks first, the final flush() can see an empty
        buffer and return while the in-flight task is still mid-send —
        closing the HTTP client then orphans that batch.
        NFreturn_exceptionsTz#Telemetry emitter shutdown complete)r2   r6   r4   cancel
contextlibsuppressr@   CancelledErrorlistr9   gatherflushr5   acloserH   rI   )rC   pendings     r    shutdownzTelemetryEmitter.shutdown   sv      | 	F '##%%%$W%;<< ' '&&&&&&&&' ' ' ' ' ' ' ' ' ' ' ' ' ' '
 t011 	C.'BTBBBBBBBBB jjll <#,%%'''''''''DL9:::::s   A++A/2A/r   r	   c           	      *   | j         sdS ddlm} ddlm} d}|                                dk    rt          |dd          }t          |t                    ot          |          }|s|
                                }t          ||j        j        |          s*|                    |                                           dS ||
                                }|j        j        }d	|                                 }|rd	| d
|                                 }|||                                |j                                        d|                                 d|                                 d}	|                    d          }
t,          |
d<   t/          | j                  | j        k    }t5          |	|
          }|r|                    d           | j                            |           t/          | j                  }|                    |                                           |                    |           t>                               d||           || j        z  }|dk    r4| j!        s,d| _!        t>          "                    d|dz  || j                   nd| _!        t>                               d|           || j#        k    r| j$        rt>                               d|           	 tK          j&                     tK          j'        | (                                          }| j)        *                    |           |+                    | j)        j,                   dS # tZ          $ r t>                               d           Y dS w xY wdS dS )a>  Queue an event for emission. Non-blocking.

        The event is converted to a CloudEvent and added to the buffer.
        If the buffer is full, the oldest events are dropped.
        Triggers an immediate flush when buffer reaches flush_threshold.

        Args:
            event: The Honcho event to emit
        Nr   )settingsprometheus_metricshigh_volumer   r
   )
event_typez/honcho//zhttps://honcho.dev/schemas/z/v)idsourcetypetime
dataschemajson)modehoncho_versionbuffer_fullreason)sizez!Queued event %s (buffer size: %d)g?Tz2Telemetry buffer at %.0f%% capacity (%d/%d events)r:   Fz(Event added to emitter (buffer size: %d)z"Triggering flush (buffer size: %d)zEemit() called outside an event loop; deferring flush to periodic task).r2   
src.configr]    src.telemetry.prometheus.metricsr_   volume_classr   r   r   boolr   r!   	TELEMETRYHIGH_VOLUME_SAMPLE_RATE"record_telemetry_event_sampled_outra   	NAMESPACEcategory	timestamp	isoformatschema_version
model_dumpr   lenr3   r1   r   record_telemetry_event_droppedappendrecord_telemetry_event_emittedset_telemetry_buffer_sizerH   debugr8   warningr/   r6   r@   get_running_looprL   rX   r9   addadd_done_callbackdiscardRuntimeError)rC   r   r]   r_   r   r   
has_run_id	namespacerd   
attributesbodywill_drop_oldestcloud_eventbuffer_sizecapacity_ratio
flush_tasks                   r    emitzTelemetryEmitter.emit   s    | 	F''''''GGGGGG(  $=00UHd33F#FC00AT&\\J / ,,..!":!   
 #EE$//11 F    ((**H &0	.ENN,,.. 	?>	>>ENN,<,<>>F $$&&O--//f8H8H8J8JffeNbNbNdNdff&
 &

  %//V/<<!/
 t|,,0DD !T22 	T==]=SSSK((($,''99UEUEUEWEW9XXX44+4FFF8(KPPP
 %t';;S  0 04-H"S((	   -2D)?MMM $...4=.LL={KKK(***$0>>
)--j999,,T-F-NOOOOO   [      /...s   2A2M& &$NNc                 n  K   | j         r| j        r| j        dS 	 | j        4 d{V  | j        s	 ddd          d{V  dS g }| j        rct	          |          | j        k     rK|                    | j                                                   | j        rt	          |          | j        k     K	 ddd          d{V  n# 1 d{V swxY w Y   |sdS |                     |           d{V }|rddl	m
} | j        4 d{V  t          |          D ]O}t	          | j                  | j        k    r|                    d           | j                            |           P	 ddd          d{V  n# 1 d{V swxY w Y   t                              dt	          |                     dS )u  Flush buffered events to the endpoint.

        Sends events in batches up to batch_size. Uses exponential
        backoff retry on failure. Events are returned to the buffer
        on permanent failure.

        The lock is held only for buffer mutations (pop batch / restore on
        failure) — never across the HTTP send. A failing endpoint can spend
        tens of seconds in retry + backoff; keeping that out of the lock
        lets concurrent flushers make progress on disjoint batches.
        NTr   r^   send_failedrl   z5Failed to send batch of %d events, returned to buffer)r2   r3   r5   r7   r|   r-   r~   popleft_send_batchrp   r_   reversedr1   r}   
appendleftrH   r   )rC   batchsuccessr_   r   s        r    rX   zTelemetryEmitter.flushS  s=      | 	4< 	4<3GF	z 9 9 9 9 9 9 9 9| 9 9 9 9 9 9 9 9 9 9 9 9 9 9 +-l 9s5zzDO'C'CLL!5!5!7!7888 l 9s5zzDO'C'C'C	9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9   ,,U33333333G KKKKKKz 	3 	3 	3 	3 	3 	3 	3 	3 &e__ 3 3E4<((D,@@@*II#0 J    L++E22223		3 	3 	3 	3 	3 	3 	3 	3 	3 	3 	3 	3 	3 	3 	3 	3 	3 	3 	3 	3 	3 	3 	3 	3 	3 	3 	3 NNGE

   Fs+   	CA,C
CC	A E<<
F	Fr   c                 p  K   | j         | j        dS t          | j                  D ]}	 t	          |          dk    rt          |d                   }n2d |D             }t          j        |                                          }| j         	                    | j        |t	          |          dk    rddind           d{V }|
                                 t                              d	t	          |          |j                    d
S # t          j        $ r/}t                              d|dz   | j        |           Y d}~nDd}~wt          j        $ r/}t                              d|dz   | j        |           Y d}~nd}~ww xY w|| j        dz
  k     rd|z  }t%          j        |           d{V  dS )zSend a batch of events to the endpoint with retry logic.

        Args:
            batch: List of CloudEvents to send

        Returns:
            True if successful, False if all retries failed
        NF   r   c                 P    g | ]#}t          j        t          |                    $S  )rh   loadsr   ).0es     r    
<listcomp>z0TelemetryEmitter._send_batch.<locals>.<listcomp>  s(    "I"I"Ia4:gajj#9#9"I"I"Ir"   rF   z"application/cloudevents-batch+json)contentr,   z$Sent batch of %d events (status: %d)Tz-HTTP error sending events (attempt %d/%d): %sz0Request error sending events (attempt %d/%d): %s   )r5   r+   ranger0   r|   r   rh   dumpsr'   postraise_for_statusrH   r   status_coderJ   HTTPStatusErrorr   RequestErrorr@   sleep)rC   r   attemptr   events_jsonresponser   delays           r    r   zTelemetryEmitter._send_batch  s@      <4=#85T-.. 0	+ 0	+G* u::??"58,,DD #J"I5"I"I"IK:k2299;;D!%!2!2M  u::>> ()MNN! "3 " "       ))+++:JJ(  
 tt(   CaK$	        %   FaK$	        )A---7
mE*********us$   CD

F%EF%E??Fc                 4  K   | j         r	 t          j        | j                   d{V  | j        r|                                  d{V  nC# t          j        $ r Y dS t          $ r%}t          	                    d|           Y d}~nd}~ww xY w| j         dS dS )z5Background task that periodically flushes the buffer.NzError in periodic flush: %s)
r6   r@   r   r.   r3   rX   rU   	ExceptionrH   	exception)rC   r   s     r    rM   z TelemetryEmitter._periodic_flush  s      m 	CCmD$7888888888< '**,,&&&&&&&)    C C C  !>BBBBBBBBC m 	C 	C 	C 	C 	Cs   A A B	B'BBc                 *    t          | j                  S )z-Return the current number of buffered events.)r|   r3   rN   s    r    r   zTelemetryEmitter.buffer_size  s     4<   r"   c                     | j         S )z&Return whether the emitter is running.)r6   rN   s    r    
is_runningzTelemetryEmitter.is_running  s     }r"   NNr:   r   r;   r<   r   Tr   N)r   r	   r   N)__name__
__module____qualname____doc__r   __annotations__dictr   r   rr   r   r   r@   TaskrJ   rK   rA   rB   rD   rO   r[   r   rX   rV   r   rM   propertyr   r   r   r"   r    r*   r*   J   s|         ( Dj#s(^OOOMMM:d#d****%%%%NNN<""""gl401111  $)-(+!$&* &**&* c3h$&&* 	&*
 !&&* &* &* &* &* &* &* &*PN N N N(%; %; %; %;N   B. . . .`>tJ'7 >D > > > >@
C 
C 
C 
C !S ! ! ! X! D    X  r"   r*   _emitterc                      t           S )z*Get the global telemetry emitter instance.)r   r   r"   r    get_emitterr     s    Or"   r:   r   r;   r<   r   Tr+   r,   r-   r=   r/   r0   r1   r2   c           
         K   t          | |||||||          at                                           d{V  t          S )a)  Initialize and start the global telemetry emitter.

    Args:
        endpoint: CloudEvents HTTP endpoint URL
        headers: Optional HTTP headers for authentication
        batch_size: Maximum events per batch
        flush_interval_seconds: How often to flush the buffer
        flush_threshold: Trigger flush when buffer reaches this size
        max_retries: Maximum retry attempts on failure
        max_buffer_size: Maximum events to buffer
        enabled: Whether emission is enabled

    Returns:
        The initialized emitter instance
    r+   r,   r-   r=   r/   r0   r1   r2   N)r*   r   rO   r   s           r    initialize_emitterr     s]      6  5''	 	 	H ..

Or"   c                  ^   K   t           #t                                            d{V  da dS dS )z&Shutdown the global telemetry emitter.N)r   r[   r   r"   r    shutdown_emitterr     sC       !!!!!!!!! r"   r   r   )%r   r@   rS   rh   loggingcollectionsr   typingr   r   rJ   cloudevents.conversionr   cloudevents.httpr   src._versionr   src.telemetry.events.baser	   	getLoggerr   rH   objectr   rr   r!   bytesr   r*   r   r   r   r   r   r   r   r   r   r"   r    <module>r      sa                  % % % % % % % %  * * * * * * ' ' ' ' ' ' ' ' ' ' ' ' 4333333		8	$	$ AE#) #) #)#)$#)36:#)	#) #) #) #)L: : : : : :M M M M M M M Mb %)
T
! ( ( (%,      %)$' & &Dj&#s(^d"& & "	&
 & & & & & & & &R     r"   