import sys from typing import ( Any, Awaitable, Callable, Dict, Iterable, Optional, Tuple, Type, Union, ) if sys.version_info >= (3, 8): from typing import Literal, Protocol, TypedDict else: from typing_extensions import Literal, Protocol, TypedDict if sys.version_info >= (3, 11): from typing import NotRequired else: from typing_extensions import NotRequired __all__ = ( "ASGIVersions", "HTTPScope", "WebSocketScope", "LifespanScope", "WWWScope", "Scope", "HTTPRequestEvent", "HTTPResponseStartEvent", "HTTPResponseBodyEvent", "HTTPResponseTrailersEvent", "HTTPServerPushEvent", "HTTPDisconnectEvent", "WebSocketConnectEvent", "WebSocketAcceptEvent", "WebSocketReceiveEvent", "WebSocketSendEvent", "WebSocketResponseStartEvent", "WebSocketResponseBodyEvent", "WebSocketDisconnectEvent", "WebSocketCloseEvent", "LifespanStartupEvent", "LifespanShutdownEvent", "LifespanStartupCompleteEvent", "LifespanStartupFailedEvent", "LifespanShutdownCompleteEvent", "LifespanShutdownFailedEvent", "ASGIReceiveEvent", "ASGISendEvent", "ASGIReceiveCallable", "ASGISendCallable", "ASGI2Protocol", "ASGI2Application", "ASGI3Application", "ASGIApplication", ) class ASGIVersions(TypedDict): spec_version: str version: Union[Literal["2.0"], Literal["3.0"]] class HTTPScope(TypedDict): type: Literal["http"] asgi: ASGIVersions http_version: str method: str scheme: str path: str raw_path: bytes query_string: bytes root_path: str headers: Iterable[Tuple[bytes, bytes]] client: Optional[Tuple[str, int]] server: Optional[Tuple[str, Optional[int]]] state: NotRequired[Dict[str, Any]] extensions: Optional[Dict[str, Dict[object, object]]] class WebSocketScope(TypedDict): type: Literal["websocket"] asgi: ASGIVersions http_version: str scheme: str path: str raw_path: bytes query_string: bytes root_path: str headers: Iterable[Tuple[bytes, bytes]] client: Optional[Tuple[str, int]] server: Optional[Tuple[str, Optional[int]]] subprotocols: Iterable[str] state: NotRequired[Dict[str, Any]] extensions: Optional[Dict[str, Dict[object, object]]] class LifespanScope(TypedDict): type: Literal["lifespan"] asgi: ASGIVersions state: NotRequired[Dict[str, Any]] WWWScope = Union[HTTPScope, WebSocketScope] Scope = Union[HTTPScope, WebSocketScope, LifespanScope] class HTTPRequestEvent(TypedDict): type: Literal["http.request"] body: bytes more_body: bool class HTTPResponseDebugEvent(TypedDict): type: Literal["http.response.debug"] info: Dict[str, object] class HTTPResponseStartEvent(TypedDict): type: Literal["http.response.start"] status: int headers: Iterable[Tuple[bytes, bytes]] trailers: bool class HTTPResponseBodyEvent(TypedDict): type: Literal["http.response.body"] body: bytes more_body: bool class HTTPResponseTrailersEvent(TypedDict): type: Literal["http.response.trailers"] headers: Iterable[Tuple[bytes, bytes]] more_trailers: bool class HTTPServerPushEvent(TypedDict): type: Literal["http.response.push"] path: str headers: Iterable[Tuple[bytes, bytes]] class HTTPDisconnectEvent(TypedDict): type: Literal["http.disconnect"] class WebSocketConnectEvent(TypedDict): type: Literal["websocket.connect"] class WebSocketAcceptEvent(TypedDict): type: Literal["websocket.accept"] subprotocol: Optional[str] headers: Iterable[Tuple[bytes, bytes]] class WebSocketReceiveEvent(TypedDict): type: Literal["websocket.receive"] bytes: Optional[bytes] text: Optional[str] class WebSocketSendEvent(TypedDict): type: Literal["websocket.send"] bytes: Optional[bytes] text: Optional[str] class WebSocketResponseStartEvent(TypedDict): type: Literal["websocket.http.response.start"] status: int headers: Iterable[Tuple[bytes, bytes]] class WebSocketResponseBodyEvent(TypedDict): type: Literal["websocket.http.response.body"] body: bytes more_body: bool class WebSocketDisconnectEvent(TypedDict): type: Literal["websocket.disconnect"] code: int class WebSocketCloseEvent(TypedDict): type: Literal["websocket.close"] code: int reason: Optional[str] class LifespanStartupEvent(TypedDict): type: Literal["lifespan.startup"] class LifespanShutdownEvent(TypedDict): type: Literal["lifespan.shutdown"] class LifespanStartupCompleteEvent(TypedDict): type: Literal["lifespan.startup.complete"] class LifespanStartupFailedEvent(TypedDict): type: Literal["lifespan.startup.failed"] message: str class LifespanShutdownCompleteEvent(TypedDict): type: Literal["lifespan.shutdown.complete"] class LifespanShutdownFailedEvent(TypedDict): type: Literal["lifespan.shutdown.failed"] message: str ASGIReceiveEvent = Union[ HTTPRequestEvent, HTTPDisconnectEvent, WebSocketConnectEvent, WebSocketReceiveEvent, WebSocketDisconnectEvent, LifespanStartupEvent, LifespanShutdownEvent, ] ASGISendEvent = Union[ HTTPResponseStartEvent, HTTPResponseBodyEvent, HTTPResponseTrailersEvent, HTTPServerPushEvent, HTTPDisconnectEvent, WebSocketAcceptEvent, WebSocketSendEvent, WebSocketResponseStartEvent, WebSocketResponseBodyEvent, WebSocketCloseEvent, LifespanStartupCompleteEvent, LifespanStartupFailedEvent, LifespanShutdownCompleteEvent, LifespanShutdownFailedEvent, ] ASGIReceiveCallable = Callable[[], Awaitable[ASGIReceiveEvent]] ASGISendCallable = Callable[[ASGISendEvent], Awaitable[None]] class ASGI2Protocol(Protocol): def __init__(self, scope: Scope) -> None: ... async def __call__( self, receive: ASGIReceiveCallable, send: ASGISendCallable ) -> None: ... ASGI2Application = Type[ASGI2Protocol] ASGI3Application = Callable[ [ Scope, ASGIReceiveCallable, ASGISendCallable, ], Awaitable[None], ] ASGIApplication = Union[ASGI2Application, ASGI3Application]