o
    
j7                     @  s   d Z ddlmZ ddlZddlZddlmZ ddlmZ ddl	m
Z
 ddlmZ ddlZddlmZ dd	lmZ dd
lmZ ddlmZmZmZ ddlmZ ddlmZmZmZ ddlm Z  ddl!m"Z"m#Z#m$Z$ e%e&Z'G dd dZ(dS )z/StreamableHTTP Session Manager for MCP servers.    )annotationsN)AsyncIterator)
HTTPStatus)Any)uuid4)
TaskStatus)Request)Response)ReceiveScopeSend)Server)MCP_SESSION_ID_HEADER
EventStoreStreamableHTTPServerTransport)TransportSecuritySettings)INVALID_REQUEST	ErrorDataJSONRPCErrorc                   @  sV   e Zd ZdZ						d$d%ddZejd&ddZd'ddZd'd d!Z	d'd"d#Z
dS )(StreamableHTTPSessionManagera  
    Manages StreamableHTTP sessions with optional resumability via event store.

    This class abstracts away the complexity of session management, event storage,
    and request handling for StreamableHTTP transports. It handles:

    1. Session tracking for clients
    2. Resumability via an optional event store
    3. Connection management and lifecycle
    4. Request handling and transport setup
    5. Idle session cleanup via optional timeout

    Important: Only one StreamableHTTPSessionManager instance should be created
    per application. The instance cannot be reused after its run() context has
    completed. If you need to restart the manager, create a new instance.

    Args:
        app: The MCP server instance
        event_store: Optional event store for resumability support. If provided, enables resumable connections
            where clients can reconnect and receive missed events. If None, sessions are still tracked but not
            resumable.
        json_response: Whether to use JSON responses instead of SSE streams
        stateless: If True, creates a completely fresh transport for each request with no session tracking or
            state persistence between requests.
        security_settings: Optional transport security settings.
        retry_interval: Retry interval in milliseconds to suggest to clients in SSE retry field. Used for SSE
            polling behavior.
        session_idle_timeout: Optional idle timeout in seconds for stateful sessions. If set, sessions that
            receive no HTTP requests for this duration will be automatically terminated and removed. When
            retry_interval is also configured, ensure the idle timeout comfortably exceeds the retry interval to
            avoid reaping sessions during normal SSE polling gaps. Default is None (no timeout). A value of 1800
            (30 minutes) is recommended for most deployments.
    NFappMCPServer[Any, Any]event_storeEventStore | Nonejson_responsebool	statelesssecurity_settings TransportSecuritySettings | Noneretry_interval
int | Nonesession_idle_timeoutfloat | Nonec                 C  s   |d ur|dkrt d|r|d urtd|| _|| _|| _|| _|| _|| _|| _t	
 | _i | _d | _t	
 | _d| _d S )Nr   z9session_idle_timeout must be a positive number of secondsz7session_idle_timeout is not supported in stateless modeF)
ValueErrorRuntimeErrorr   r   r   r   r   r   r!   anyioLock_session_creation_lock_server_instances_task_group	_run_lock_has_started)selfr   r   r   r   r   r   r!    r-   U/home/kuhnn/.local/lib/python3.10/site-packages/mcp/server/streamable_http_manager.py__init__A   s    



z%StreamableHTTPSessionManager.__init__returnAsyncIterator[None]c              
   C s   | j 4 I dH  | jrtdd| _W d  I dH  n1 I dH s#w   Y  t 4 I dH ?}|| _td zdV  W td |j	  d| _| j
  ntd |j	  d| _| j
  w W d  I dH  dS 1 I dH suw   Y  dS )aw  
        Run the session manager with proper lifecycle management.

        This creates and manages the task group for all session operations.

        Important: This method can only be called once per instance. The same
        StreamableHTTPSessionManager instance cannot be reused after this
        context manager exits. Create a new instance if you need to restart.

        Use this in the lifespan context manager of your Starlette app:

        @contextlib.asynccontextmanager
        async def lifespan(app: Starlette) -> AsyncIterator[None]:
            async with session_manager.run():
                yield
        NzyStreamableHTTPSessionManager .run() can only be called once per instance. Create a new instance if you need to run again.Tz&StreamableHTTP session manager startedz,StreamableHTTP session manager shutting down)r*   r+   r$   r%   create_task_groupr)   loggerinfocancel_scopecancelr(   clear)r,   tgr-   r-   r.   runb   s,   (




.z StreamableHTTPSessionManager.runscoper   receiver
   sendr   Nonec                   sJ   | j du r
td| jr| |||I dH  dS | |||I dH  dS )a  
        Process ASGI request with proper session handling and transport setup.

        Dispatches to the appropriate handler based on stateless mode.

        Args:
            scope: ASGI scope
            receive: ASGI receive function
            send: ASGI send function
        Nz6Task group is not initialized. Make sure to use run().)r)   r$   r   _handle_stateless_request_handle_stateful_request)r,   r:   r;   r<   r-   r-   r.   handle_request   s   
z+StreamableHTTPSessionManager.handle_requestc                   s~   t d tdjdjd tjdd	 fdd}jdus#J j|I dH   	|||I dH   
 I dH  dS )
z
        Process request in stateless mode - creating a new transport for each request.

        Args:
            scope: ASGI scope
            receive: ASGI receive function
            send: ASGI send function
        z7Stateless mode: Creating new transport for this requestN)mcp_session_idis_json_response_enabledr   r   task_statusrD   TaskStatus[None]c              	     s      4 I d H @}|\}}|   zjj||j ddI d H  W n ty2   td Y nw W d   I d H  d S W d   I d H  d S 1 I d H sOw   Y  d S )NTr   zStateless session crashed)connectstartedr   r9   create_initialization_options	Exceptionr3   	exception)rD   streamsread_streamwrite_streamhttp_transportr,   r-   r.   run_stateless_server   s$   .zTStreamableHTTPSessionManager._handle_stateless_request.<locals>.run_stateless_server)rD   rE   )r3   debugr   r   r   r%   TASK_STATUS_IGNOREDr)   startr@   	terminate)r,   r:   r;   r<   rQ   r-   rO   r.   r>      s   
z6StreamableHTTPSessionManager._handle_stateless_requestc              	     s  t ||}|jt}|dur>|jv r>j| }td |jdur2jdur2t	
 j |j_||||I dH  dS |du rtd j4 I dH X t j}t|jjjjd  jdusgJ  j j< td|  t	jdd fdd}jdusJ j|I dH   |||I dH  W d  I dH  dS 1 I dH sw   Y  dS tddttddd}	t|	jdddtj dd}
|
|||I dH  dS )z
        Process request in stateful mode - maintaining session state between requests.

        Args:
            scope: ASGI scope
            receive: ASGI receive function
            send: ASGI send function
        Nz1Session already exists, handling request directlyzCreating new transport)rA   rB   r   r   r   z'Created new transport with session ID: rC   rD   rE   r0   r=   c                   s     4 I d H }|\}}|   zz[t }jd ur(t j |_| _| jj	||j
 ddI d H  W d    n1 sEw   Y  |jrm jd usTJ td j d j jd    I d H  W n ty   td j d Y nw W  jr jjv r jstd j d j j= n jr jjv r jstd j d j j= w W d   I d H  d S 1 I d H sw   Y  d S )NFrF   zSession z idle timeoutz crashedzCleaning up crashed session z from active instances.)rG   rH   r%   CancelScoper!   current_timedeadline
idle_scoper   r9   rI   cancelled_caughtrA   r3   r4   r(   poprU   rJ   rK   is_terminated)rD   rL   rM   rN   rY   rO   r-   r.   
run_server  sf   

.zIStreamableHTTPSessionManager._handle_stateful_request.<locals>.run_serverz2.0zserver-errorzSession not found)codemessage)jsonrpciderrorT)by_aliasexclude_nonezapplication/json)contentstatus_code
media_type)rD   rE   r0   r=   )!r   headersgetr   r(   r3   rR   rY   r!   r%   rW   rX   r@   r'   r   hexr   r   r   r   r   rA   r4   rS   r)   rT   r   r   r   r	   model_dump_jsonr   	NOT_FOUND)r,   r:   r;   r<   requestrequest_mcp_session_id	transportnew_session_idr]   error_responseresponser-   rO   r.   r?      sV   



+.Dz5StreamableHTTPSessionManager._handle_stateful_request)NFFNNN)r   r   r   r   r   r   r   r   r   r   r   r    r!   r"   )r0   r1   )r:   r   r;   r
   r<   r   r0   r=   )__name__
__module____qualname____doc__r/   
contextlibasynccontextmanagerr9   r@   r>   r?   r-   r-   r-   r.   r      s    %!
(
1r   ))rv   
__future__r   rw   loggingcollections.abcr   httpr   typingr   uuidr   r%   	anyio.abcr   starlette.requestsr   starlette.responsesr	   starlette.typesr
   r   r   mcp.server.lowlevel.serverr   	MCPServermcp.server.streamable_httpr   r   r   mcp.server.transport_securityr   	mcp.typesr   r   r   	getLoggerrs   r3   r   r-   r-   r-   r.   <module>   s&    
