GIF89a=( õ' 7IAXKgNgYvYx\%wh…hŽth%ˆs%—x¨}9®Œ©€&©‰%¶†(¹–.¹5·œD¹&Çš)ÇŸ5ǘ;Í£*È¡&Õ²)ׯ7×µ<Ñ»4ï°3ø‘HÖ§KͯT÷¨Yÿšqÿ»qÿÔFØ !ù ' !ÿ NETSCAPE2.0 , =( þÀ“pH,È¤rÉl:ŸÐ¨tJ­Z¯Ø¬vËíz¿à°xL.›Ïè´zÍn»ßð¸|N¯Ûïø¼~Ïïûÿ€‚ƒ„…†‡ˆ‰Š‹ŒŽ‘’“”•–—˜™š›œžŸ ¡¢£¤¥¦§gª«ªE¯°¨¬ª±²Œ¹º¹E¾­”´ÂB¶¯ §Åȸ»ÑD¾¿Á•ÄÅ®° ÝH¾ÒLÀÆDÙ«D¶BÝïðÀ¾DÑÑÔTÌÍíH òGö¨A RÎڐ |¥ ٭&ºìE8œ¹kGÔAÞpx­a¶­ã R2XB®åE8I€Õ6Xî:vT)äžþÀq¦è³¥ì仕F~%xñ  4#ZÔ‰O|-4Bs‘X:= QÉ œš lºÒyXJŠGȦ|s hÏíK–3l7·B|¥$'7Jީܪ‰‡àá”Dæn=Pƒ ¤Òëí‰`䌨ljóá¯Éüv>á–Á¼5 ½.69ûϸd«­ºÀûnlv©‹ªîf{¬ÜãPbŸ  l5‘ޝpß ´ ˜3aÅùäI«O’ý·‘áÞ‡˜¾Æ‚ÙÏiÇÿ‹Àƒ #öó)pâš Þ½ ‘Ý{ó)vmÞü%D~ 6f s}ŃƒDØW Eþ`‡þ À…L8xá†ç˜{)x`X/> Ì}mø‚–RØ‘*|`D=‚Ø_ ^ð5 !_…'aä“OÚ—7âcð`D”Cx`ÝÂ¥ä‹éY¹—F¼¤¥Š?¡Õ™ n@`} lď’ÄÉ@4>ñd œ à‘vÒxNÃ×™@žd=ˆgsžG±æ ´²æud &p8Qñ)ˆ«lXD©øÜéAžHìySun jª×k*D¤LH] †¦§C™Jä–´Xb~ʪwStŽ6K,°£qÁœ:9ت:¨þªl¨@¡`‚ûÚ ».Û¬¯t‹ÆSÉ[:°=Š‹„‘Nåû”Ìî{¿ÂA ‡Rà›ÀÙ6úë°Ÿð0Ä_ ½;ÃϱîÉì^ÇÛÇ#Ëë¼ôº!±Ä˜íUîÅÇ;0L1óÁµö«p% AÀºU̬ݵ¼á%霼€‡¯Á~`ÏG¯»À× ­²± =4ªnpð3¾¤³¯­ü¾¦îuÙuµÙ®|%2ÊIÿür¦#0·ÔJ``8È@S@5ê¢ ö×Þ^`8EÜ]ý.뜃Âç 7 ú ȉÞj œ½Dç zý¸iþœÑÙûÄë!ˆÞÀl§Ïw‹*DçI€nEX¯¬¼ &A¬Go¼QföõFç°¯;é¦÷îŽêJ°îúôF5¡ÌQ|îúöXªæ»TÁÏyñêï]ê² o óÎC=öõ›ÒÓPB@ D×½œä(>èCÂxŽ`±«Ÿ–JЀ»Û á¤±p+eE0`ëŽ`A Ú/NE€Ø†À9‚@¤à H½7”à‡%B‰`Àl*ƒó‘–‡8 2ñ%¸ —€:Ù1Á‰E¸àux%nP1ð!‘ðC)¾P81lÑɸF#ˆ€{´âé°ÈB„0>±û °b¡Š´±O‚3È–Ù()yRpbµ¨E.Z‘D8ÊH@% òŒx+%Ù˜Æcü »¸˜fõ¬b·d`Fê™8èXH"ÉÈ-±|1Ô6iI, 2““¬$+](A*jÐ QTÂo‰.ÛU슬Œã„Ž`¯SN¡–¶Äåyše¯ª’­¬‚´b¦Éož œ)åyâ@Ì®3 ÎtT̉°&Ø+žLÀf"Ø-|žçÔ>‡Ðv¦Ðžì\‚ Q1)Ž@Žh#aP72”ˆ™¨$‚ !ù " , =( …7IAXG]KgNgYvYxR"k\%w]'}hŽth%ˆg+ˆs%—r.—m3šx3˜x¨}9®€&©€+¨‡7§‰%¶†(¹–.¹œD¹&ǘ;Í•&ײ)×»4ïÌ6ò§KÍ þ@‘pH,È¤rÉl:ŸÐ¨tJ­Z¯Ø¬vËíz¿à°xL.›Ïè´zÍn»ßð¸|N¯Ûïø¼~Ïïûÿ€‚ƒ„…†‡ˆ‰Š‹ŒŽ‘’“”•–—˜™š›œžŸ ¡¢£¤¥¦§g «¬ E ±± ¨­¶°ººE Á´”·®C¬²§Ç¶Œ»ÓDÃÕƷ¯Ê±H½ºM×ÁGÚ¬D¶BËÁ½î½DÓôTÏÛßîG»ôõC×CÌ l&âž:'òtU³6ɹ#·Ø)€'Ü.6±&ëÍÈ» K(8p0N?!æ2"ÛˆNIJX>R¼ÐO‚M '¡¨2¸*Ÿþ>#n↠å@‚<[:¡Iïf’ ¤TÚ˘CdbÜÙ“[«ŽEú5MBo¤×@€`@„€Êt W-3 ¶Ÿ¡BíêäjIÝ…Eò9[T…$íêﯧ„…•s»Óȳ¹€ÅÚdc®UUρ#±Ùïldj?´í¼²`\ŽÁðÞu|3'ÖŒ]ë6 ¶S#²‡˜FKLÈ *N E´‘áäŠ$˜›eÄYD„ºq«.è촁ƒs \-ÔjA 9²õ÷å- üúM[Âx(ís÷ì®x€|í¡Ù’p¦‚ ŽkÛTÇDpE@WÜ ²Ç]kŠ1¨ þ€·Yb ÓÁ‰l°*n0 ç™—žzBdОu¾7ĉBl€â‰-ºx~|UåU‰  h*Hœ|e"#"?vpÄiŠe6^ˆ„+qâŠm8 #VÇá ‘å–ÄV„œ|Аè•m"сœn|@›U¶ÆÎž—Špb¥G¨ED”€±Úê2FÌIç? >Éxå Œ± ¡¤„%‘žjŸ‘ꄯ<Ìaà9ijÐ2˜D¦È&›†Z`‚å]wþ¼Â:ç6àB¤7eFJ|õÒ§Õ,¨äàFÇ®cS·Ê¶+B°,‘Þ˜ºNûãØ>PADÌHD¹æž«ÄÀnÌ¥}­#Ë’ë QÀÉSÌÂÇ2ÌXÀ{æk²lQÁ2«ÊðÀ¯w|2Í h‹ÄÂG€,m¾¶ë3ÐÙ6-´ÅE¬L°ÆIij*K½ÀÇqï`DwVÍQXœÚÔpeœ±¬Ñ q˜§Tœ½µƒ°Œìu Â<¶aØ*At¯lmEØ ü ôÛN[P1ÔÛ¦­±$ÜÆ@`ùåDpy¶yXvCAyåB`ŽD¶ 0QwG#¯ æš[^Äþ $ÀÓÝǦ{„L™[±úKÄgÌ;ï£S~¹ìGX.ôgoT.»åˆ°ùŸûù¡?1zö¦Ÿž:ÅgÁ|ìL¹ „®£œŠ‚à0œ]PÁ^p F<"•ç?!,ñ‡N4—…PÄ Á„ö¨Û:Tè@hÀ‹%táÿ:ø-žI<`þ‹p I….)^ 40D#p@ƒj4–؀:²‰1Øâr˜¼F2oW¼#Z†;$Q q” ‘ ÂK¦ñNl#29 !’F@¥Bh·ᏀL!—XFóLH‘Kh¤.«hE&JòG¨¥<™WN!€ÑÙÚˆY„@†>Œž19J" 2,/ &.GXB%ÌRÈ9B6¹W]’î×ÔW¥’IÎ$ ñ‹ÓŒE8YÆ ¼³™ñA5“à®Q.aŸB€&Ø©³ JÁ—! ¦t)K%tœ-¦JF bòNMxLôþ)ÐR¸Ð™‘ èÝ6‘O!THÌ„HÛ ‰ !ù ) , =( …AXKgNgYvYxR"k\%wh…hŽh%ˆg+ˆs%—r.—x3˜x¨}9®€&©€+¨Œ,©‡7§‰%¶†(¹–.¹5·&Çš)ǘ;Í•&×£*Ȳ)ׯ7×»4ï°3øÌ6ò‘HÖ§KÍ»Hó¯T÷¨Yÿ»qÿÇhÿ þÀ”pH,È¤rÉl:ŸÐ¨tJ­Z¯Ø¬vËíz¿à°xL.›Ïè´zÍn»ßð¸|N¯Ûïø¼~Ïïûÿ€‚ƒ„…†‡ˆ‰Š‹ŒŽ‘’“”•–—˜™š›œžŸ ¡¢£¤¥¦§g ª« E$±²¨ª­ · °²½$E$ÂÕ««D· Í ¿¦Ç¶¸ÌŒ¾³CÃÅÆ E ééH½MÛÂGâªD­ çBêêϾD²ÒaÀà€Š1r­ðÓ¤ ÔožzU!L˜C'¾yW½UGtäÇïÙllê0×àÂuGþ)AÀs[þ·xì ÁxO%ƒûX2ó—  P£n›R/¡ÑšHše+êDm?# —‘Ç£6¡8íJ¡ŸâDiäªM¥Ö„ôj“¬¹£5oQ7°- <‡ *´lãÓŒ2r/a!l)dÈ A™ÈE¢ôÔ͆…ð ;Ö˜c ¡%ß‚’Ùˆâ¸b½—pe~C"BíëÚHïeF2§æŠ8qb t_`urŠeü wÅu3êæPv§h•"ß`íÍxçLĹÜÖ3á  ~Öº“®›¸ÏMDfJÙ °„ÛµáWõ%§œ‚à©–‚X ÓØ)@®Ñ›Eþ´wëuÅSxb8y\mÖzœ¥§ZbºE—ÂLªÌw!y(>¡™wú=Ç|ÅÝs¢d €CÁW)HÜcC$€L Ä7„r.á\{)@ð` @ äXÈ$PD” `šaG:§æˆOˆ72EÐamn]ù"ŒcÊxÑŒ° &dR8`g«iÙŸLR!¦P …d’ä¡“¦ðÎTƒ¦ià|À _ ¥ Qi#¦Šg›Æ ›noMµ ›V ã£)p ç£ÎW…š=Âeªk§†j„ ´®1ß²sÉxéW«jšl|0¯B0Û, \jÛ´›6±¬¶C ÛíWþï|ëÙ‹¸ñzĸV {ì;Ýñn¼òVˆm³I¼³.Ðã¤PN¥ ²µ¼„µCã+¹ÍByî£Ñ¾HŸ›ëê 7ìYÆFTk¨SaoaY$Dµœìï¿Ã29RÈkt Çïfñ ÇÒ:ÀÐSp¹3ÇI¨â¥DZÄ ü9Ïýögñ½­uÔ*3)O‘˜Ö[_hv ,àî×Et Ÿé¶BH€ Õ[ü±64M@ÔSÌM7dÐl5-ÄÙU܍´©zߌ3Ô€3ž„ „ ¶ÛPô½5×g› êÚ˜kN„Ý…0Îj4€Ìë°“#{þÕ3S2çKÜ'ợlø¼Ú2K{° {Û¶?žm𸧠ËI¼nEò='êüóºè^üæÃ_Û=°óž‚ì#Oý¿Í'¡½áo..ÏYìnüñCœO±Áa¿¢Kô½o,üÄËbö²çºíï{ËC Ú— "”Ï{ËK ÍÒw„õ±Oz dÕ¨à:$ ƒô—«v»] A#ð «€¿šéz)Rx׿ˆ¥‚d``èw-îyÏf×K!ð€þ­Ð|ìPľ„=Ì`ý(f” 'Pa ¥ÐBJa%Ðâf§„%Š¡}FàáÝ×6>ÉäŠG"éŽè=ø!oа^FP¼Ø©Q„ÀCÙÁ`(Ž\ÄÝ® ©Â$<n@dÄ E#ììUÒI! ‚#lù‹`k¦ÐÇ'Rró’ZýNBÈMF Í[¤+‹ðɈ-áwj¨¥þ8¾rá ,VÂh„"|½œ=×G_¦Ñ™EØ 0i*%̲˜Æda0mV‚k¾)›;„&6 p>ÓjK “¦Ç# âDÂ:ûc?:R Ó¬fÞéI-Ì“•Ã<ä=™Ï7˜3œ¨˜c2ŒW ,ˆ”8(T™P‰F¡Jhç"‚ ; 403WebShell
403Webshell
Server IP : 104.21.83.152  /  Your IP : 216.73.216.195
Web Server : LiteSpeed
System : Linux premium229.web-hosting.com 4.18.0-553.45.1.lve.el8.x86_64 #1 SMP Wed Mar 26 12:08:09 UTC 2025 x86_64
User : akhalid ( 749)
PHP Version : 8.3.22
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/hc_python/lib64/python3.12/site-packages/sentry_sdk/integrations/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/hc_python/lib64/python3.12/site-packages/sentry_sdk/integrations/starlette.py
import asyncio
import functools
import warnings
from collections.abc import Set
from copy import deepcopy
from json import JSONDecodeError

import sentry_sdk
from sentry_sdk.consts import OP
from sentry_sdk.integrations import (
    DidNotEnable,
    Integration,
    _DEFAULT_FAILED_REQUEST_STATUS_CODES,
)
from sentry_sdk.integrations._wsgi_common import (
    DEFAULT_HTTP_METHODS_TO_CAPTURE,
    HttpCodeRangeContainer,
    _is_json_content_type,
    request_body_within_bounds,
)
from sentry_sdk.integrations.asgi import SentryAsgiMiddleware
from sentry_sdk.scope import should_send_default_pii
from sentry_sdk.tracing import (
    SOURCE_FOR_STYLE,
    TransactionSource,
)
from sentry_sdk.utils import (
    AnnotatedValue,
    capture_internal_exceptions,
    ensure_integration_enabled,
    event_from_exception,
    logger,
    parse_version,
    transaction_from_function,
)

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from typing import Any, Awaitable, Callable, Container, Dict, Optional, Tuple, Union

    from sentry_sdk._types import Event, HttpStatusCodeRange

try:
    import starlette  # type: ignore
    from starlette import __version__ as STARLETTE_VERSION
    from starlette.applications import Starlette  # type: ignore
    from starlette.datastructures import UploadFile  # type: ignore
    from starlette.middleware import Middleware  # type: ignore
    from starlette.middleware.authentication import (  # type: ignore
        AuthenticationMiddleware,
    )
    from starlette.requests import Request  # type: ignore
    from starlette.routing import Match  # type: ignore
    from starlette.types import ASGIApp, Receive, Scope as StarletteScope, Send  # type: ignore
except ImportError:
    raise DidNotEnable("Starlette is not installed")

try:
    # Starlette 0.20
    from starlette.middleware.exceptions import ExceptionMiddleware  # type: ignore
except ImportError:
    # Startlette 0.19.1
    from starlette.exceptions import ExceptionMiddleware  # type: ignore

try:
    # Optional dependency of Starlette to parse form data.
    try:
        # python-multipart 0.0.13 and later
        import python_multipart as multipart  # type: ignore
    except ImportError:
        # python-multipart 0.0.12 and earlier
        import multipart  # type: ignore
except ImportError:
    multipart = None


_DEFAULT_TRANSACTION_NAME = "generic Starlette request"

TRANSACTION_STYLE_VALUES = ("endpoint", "url")


class StarletteIntegration(Integration):
    identifier = "starlette"
    origin = f"auto.http.{identifier}"

    transaction_style = ""

    def __init__(
        self,
        transaction_style="url",  # type: str
        failed_request_status_codes=_DEFAULT_FAILED_REQUEST_STATUS_CODES,  # type: Union[Set[int], list[HttpStatusCodeRange], None]
        middleware_spans=True,  # type: bool
        http_methods_to_capture=DEFAULT_HTTP_METHODS_TO_CAPTURE,  # type: tuple[str, ...]
    ):
        # type: (...) -> None
        if transaction_style not in TRANSACTION_STYLE_VALUES:
            raise ValueError(
                "Invalid value for transaction_style: %s (must be in %s)"
                % (transaction_style, TRANSACTION_STYLE_VALUES)
            )
        self.transaction_style = transaction_style
        self.middleware_spans = middleware_spans
        self.http_methods_to_capture = tuple(map(str.upper, http_methods_to_capture))

        if isinstance(failed_request_status_codes, Set):
            self.failed_request_status_codes = (
                failed_request_status_codes
            )  # type: Container[int]
        else:
            warnings.warn(
                "Passing a list or None for failed_request_status_codes is deprecated. "
                "Please pass a set of int instead.",
                DeprecationWarning,
                stacklevel=2,
            )

            if failed_request_status_codes is None:
                self.failed_request_status_codes = _DEFAULT_FAILED_REQUEST_STATUS_CODES
            else:
                self.failed_request_status_codes = HttpCodeRangeContainer(
                    failed_request_status_codes
                )

    @staticmethod
    def setup_once():
        # type: () -> None
        version = parse_version(STARLETTE_VERSION)

        if version is None:
            raise DidNotEnable(
                "Unparsable Starlette version: {}".format(STARLETTE_VERSION)
            )

        patch_middlewares()
        patch_asgi_app()
        patch_request_response()

        if version >= (0, 24):
            patch_templates()


def _enable_span_for_middleware(middleware_class):
    # type: (Any) -> type
    old_call = middleware_class.__call__

    async def _create_span_call(app, scope, receive, send, **kwargs):
        # type: (Any, Dict[str, Any], Callable[[], Awaitable[Dict[str, Any]]], Callable[[Dict[str, Any]], Awaitable[None]], Any) -> None
        integration = sentry_sdk.get_client().get_integration(StarletteIntegration)
        if integration is None or not integration.middleware_spans:
            return await old_call(app, scope, receive, send, **kwargs)

        middleware_name = app.__class__.__name__

        # Update transaction name with middleware name
        name, source = _get_transaction_from_middleware(app, scope, integration)
        if name is not None:
            sentry_sdk.get_current_scope().set_transaction_name(
                name,
                source=source,
            )

        with sentry_sdk.start_span(
            op=OP.MIDDLEWARE_STARLETTE,
            name=middleware_name,
            origin=StarletteIntegration.origin,
        ) as middleware_span:
            middleware_span.set_tag("starlette.middleware_name", middleware_name)

            # Creating spans for the "receive" callback
            async def _sentry_receive(*args, **kwargs):
                # type: (*Any, **Any) -> Any
                with sentry_sdk.start_span(
                    op=OP.MIDDLEWARE_STARLETTE_RECEIVE,
                    name=getattr(receive, "__qualname__", str(receive)),
                    origin=StarletteIntegration.origin,
                ) as span:
                    span.set_tag("starlette.middleware_name", middleware_name)
                    return await receive(*args, **kwargs)

            receive_name = getattr(receive, "__name__", str(receive))
            receive_patched = receive_name == "_sentry_receive"
            new_receive = _sentry_receive if not receive_patched else receive

            # Creating spans for the "send" callback
            async def _sentry_send(*args, **kwargs):
                # type: (*Any, **Any) -> Any
                with sentry_sdk.start_span(
                    op=OP.MIDDLEWARE_STARLETTE_SEND,
                    name=getattr(send, "__qualname__", str(send)),
                    origin=StarletteIntegration.origin,
                ) as span:
                    span.set_tag("starlette.middleware_name", middleware_name)
                    return await send(*args, **kwargs)

            send_name = getattr(send, "__name__", str(send))
            send_patched = send_name == "_sentry_send"
            new_send = _sentry_send if not send_patched else send

            return await old_call(app, scope, new_receive, new_send, **kwargs)

    not_yet_patched = old_call.__name__ not in [
        "_create_span_call",
        "_sentry_authenticationmiddleware_call",
        "_sentry_exceptionmiddleware_call",
    ]

    if not_yet_patched:
        middleware_class.__call__ = _create_span_call

    return middleware_class


@ensure_integration_enabled(StarletteIntegration)
def _capture_exception(exception, handled=False):
    # type: (BaseException, **Any) -> None
    event, hint = event_from_exception(
        exception,
        client_options=sentry_sdk.get_client().options,
        mechanism={"type": StarletteIntegration.identifier, "handled": handled},
    )

    sentry_sdk.capture_event(event, hint=hint)


def patch_exception_middleware(middleware_class):
    # type: (Any) -> None
    """
    Capture all exceptions in Starlette app and
    also extract user information.
    """
    old_middleware_init = middleware_class.__init__

    not_yet_patched = "_sentry_middleware_init" not in str(old_middleware_init)

    if not_yet_patched:

        def _sentry_middleware_init(self, *args, **kwargs):
            # type: (Any, Any, Any) -> None
            old_middleware_init(self, *args, **kwargs)

            # Patch existing exception handlers
            old_handlers = self._exception_handlers.copy()

            async def _sentry_patched_exception_handler(self, *args, **kwargs):
                # type: (Any, Any, Any) -> None
                integration = sentry_sdk.get_client().get_integration(
                    StarletteIntegration
                )

                exp = args[0]

                if integration is not None:
                    is_http_server_error = (
                        hasattr(exp, "status_code")
                        and isinstance(exp.status_code, int)
                        and exp.status_code in integration.failed_request_status_codes
                    )
                    if is_http_server_error:
                        _capture_exception(exp, handled=True)

                # Find a matching handler
                old_handler = None
                for cls in type(exp).__mro__:
                    if cls in old_handlers:
                        old_handler = old_handlers[cls]
                        break

                if old_handler is None:
                    return

                if _is_async_callable(old_handler):
                    return await old_handler(self, *args, **kwargs)
                else:
                    return old_handler(self, *args, **kwargs)

            for key in self._exception_handlers.keys():
                self._exception_handlers[key] = _sentry_patched_exception_handler

        middleware_class.__init__ = _sentry_middleware_init

        old_call = middleware_class.__call__

        async def _sentry_exceptionmiddleware_call(self, scope, receive, send):
            # type: (Dict[str, Any], Dict[str, Any], Callable[[], Awaitable[Dict[str, Any]]], Callable[[Dict[str, Any]], Awaitable[None]]) -> None
            # Also add the user (that was eventually set by be Authentication middle
            # that was called before this middleware). This is done because the authentication
            # middleware sets the user in the scope and then (in the same function)
            # calls this exception middelware. In case there is no exception (or no handler
            # for the type of exception occuring) then the exception bubbles up and setting the
            # user information into the sentry scope is done in auth middleware and the
            # ASGI middleware will then send everything to Sentry and this is fine.
            # But if there is an exception happening that the exception middleware here
            # has a handler for, it will send the exception directly to Sentry, so we need
            # the user information right now.
            # This is why we do it here.
            _add_user_to_sentry_scope(scope)
            await old_call(self, scope, receive, send)

        middleware_class.__call__ = _sentry_exceptionmiddleware_call


@ensure_integration_enabled(StarletteIntegration)
def _add_user_to_sentry_scope(scope):
    # type: (Dict[str, Any]) -> None
    """
    Extracts user information from the ASGI scope and
    adds it to Sentry's scope.
    """
    if "user" not in scope:
        return

    if not should_send_default_pii():
        return

    user_info = {}  # type: Dict[str, Any]
    starlette_user = scope["user"]

    username = getattr(starlette_user, "username", None)
    if username:
        user_info.setdefault("username", starlette_user.username)

    user_id = getattr(starlette_user, "id", None)
    if user_id:
        user_info.setdefault("id", starlette_user.id)

    email = getattr(starlette_user, "email", None)
    if email:
        user_info.setdefault("email", starlette_user.email)

    sentry_scope = sentry_sdk.get_isolation_scope()
    sentry_scope.user = user_info


def patch_authentication_middleware(middleware_class):
    # type: (Any) -> None
    """
    Add user information to Sentry scope.
    """
    old_call = middleware_class.__call__

    not_yet_patched = "_sentry_authenticationmiddleware_call" not in str(old_call)

    if not_yet_patched:

        async def _sentry_authenticationmiddleware_call(self, scope, receive, send):
            # type: (Dict[str, Any], Dict[str, Any], Callable[[], Awaitable[Dict[str, Any]]], Callable[[Dict[str, Any]], Awaitable[None]]) -> None
            await old_call(self, scope, receive, send)
            _add_user_to_sentry_scope(scope)

        middleware_class.__call__ = _sentry_authenticationmiddleware_call


def patch_middlewares():
    # type: () -> None
    """
    Patches Starlettes `Middleware` class to record
    spans for every middleware invoked.
    """
    old_middleware_init = Middleware.__init__

    not_yet_patched = "_sentry_middleware_init" not in str(old_middleware_init)

    if not_yet_patched:

        def _sentry_middleware_init(self, cls, *args, **kwargs):
            # type: (Any, Any, Any, Any) -> None
            if cls == SentryAsgiMiddleware:
                return old_middleware_init(self, cls, *args, **kwargs)

            span_enabled_cls = _enable_span_for_middleware(cls)
            old_middleware_init(self, span_enabled_cls, *args, **kwargs)

            if cls == AuthenticationMiddleware:
                patch_authentication_middleware(cls)

            if cls == ExceptionMiddleware:
                patch_exception_middleware(cls)

        Middleware.__init__ = _sentry_middleware_init


def patch_asgi_app():
    # type: () -> None
    """
    Instrument Starlette ASGI app using the SentryAsgiMiddleware.
    """
    old_app = Starlette.__call__

    async def _sentry_patched_asgi_app(self, scope, receive, send):
        # type: (Starlette, StarletteScope, Receive, Send) -> None
        integration = sentry_sdk.get_client().get_integration(StarletteIntegration)
        if integration is None:
            return await old_app(self, scope, receive, send)

        middleware = SentryAsgiMiddleware(
            lambda *a, **kw: old_app(self, *a, **kw),
            mechanism_type=StarletteIntegration.identifier,
            transaction_style=integration.transaction_style,
            span_origin=StarletteIntegration.origin,
            http_methods_to_capture=(
                integration.http_methods_to_capture
                if integration
                else DEFAULT_HTTP_METHODS_TO_CAPTURE
            ),
        )

        middleware.__call__ = middleware._run_asgi3
        return await middleware(scope, receive, send)

    Starlette.__call__ = _sentry_patched_asgi_app


# This was vendored in from Starlette to support Starlette 0.19.1 because
# this function was only introduced in 0.20.x
def _is_async_callable(obj):
    # type: (Any) -> bool
    while isinstance(obj, functools.partial):
        obj = obj.func

    return asyncio.iscoroutinefunction(obj) or (
        callable(obj) and asyncio.iscoroutinefunction(obj.__call__)
    )


def patch_request_response():
    # type: () -> None
    old_request_response = starlette.routing.request_response

    def _sentry_request_response(func):
        # type: (Callable[[Any], Any]) -> ASGIApp
        old_func = func

        is_coroutine = _is_async_callable(old_func)
        if is_coroutine:

            async def _sentry_async_func(*args, **kwargs):
                # type: (*Any, **Any) -> Any
                integration = sentry_sdk.get_client().get_integration(
                    StarletteIntegration
                )
                if integration is None:
                    return await old_func(*args, **kwargs)

                request = args[0]

                _set_transaction_name_and_source(
                    sentry_sdk.get_current_scope(),
                    integration.transaction_style,
                    request,
                )

                sentry_scope = sentry_sdk.get_isolation_scope()
                extractor = StarletteRequestExtractor(request)
                info = await extractor.extract_request_info()

                def _make_request_event_processor(req, integration):
                    # type: (Any, Any) -> Callable[[Event, dict[str, Any]], Event]
                    def event_processor(event, hint):
                        # type: (Event, Dict[str, Any]) -> Event

                        # Add info from request to event
                        request_info = event.get("request", {})
                        if info:
                            if "cookies" in info:
                                request_info["cookies"] = info["cookies"]
                            if "data" in info:
                                request_info["data"] = info["data"]
                        event["request"] = deepcopy(request_info)

                        return event

                    return event_processor

                sentry_scope._name = StarletteIntegration.identifier
                sentry_scope.add_event_processor(
                    _make_request_event_processor(request, integration)
                )

                return await old_func(*args, **kwargs)

            func = _sentry_async_func

        else:

            @functools.wraps(old_func)
            def _sentry_sync_func(*args, **kwargs):
                # type: (*Any, **Any) -> Any
                integration = sentry_sdk.get_client().get_integration(
                    StarletteIntegration
                )
                if integration is None:
                    return old_func(*args, **kwargs)

                current_scope = sentry_sdk.get_current_scope()
                if current_scope.transaction is not None:
                    current_scope.transaction.update_active_thread()

                sentry_scope = sentry_sdk.get_isolation_scope()
                if sentry_scope.profile is not None:
                    sentry_scope.profile.update_active_thread_id()

                request = args[0]

                _set_transaction_name_and_source(
                    sentry_scope, integration.transaction_style, request
                )

                extractor = StarletteRequestExtractor(request)
                cookies = extractor.extract_cookies_from_request()

                def _make_request_event_processor(req, integration):
                    # type: (Any, Any) -> Callable[[Event, dict[str, Any]], Event]
                    def event_processor(event, hint):
                        # type: (Event, dict[str, Any]) -> Event

                        # Extract information from request
                        request_info = event.get("request", {})
                        if cookies:
                            request_info["cookies"] = cookies

                        event["request"] = deepcopy(request_info)

                        return event

                    return event_processor

                sentry_scope._name = StarletteIntegration.identifier
                sentry_scope.add_event_processor(
                    _make_request_event_processor(request, integration)
                )

                return old_func(*args, **kwargs)

            func = _sentry_sync_func

        return old_request_response(func)

    starlette.routing.request_response = _sentry_request_response


def patch_templates():
    # type: () -> None

    # If markupsafe is not installed, then Jinja2 is not installed
    # (markupsafe is a dependency of Jinja2)
    # In this case we do not need to patch the Jinja2Templates class
    try:
        from markupsafe import Markup
    except ImportError:
        return  # Nothing to do

    from starlette.templating import Jinja2Templates  # type: ignore

    old_jinja2templates_init = Jinja2Templates.__init__

    not_yet_patched = "_sentry_jinja2templates_init" not in str(
        old_jinja2templates_init
    )

    if not_yet_patched:

        def _sentry_jinja2templates_init(self, *args, **kwargs):
            # type: (Jinja2Templates, *Any, **Any) -> None
            def add_sentry_trace_meta(request):
                # type: (Request) -> Dict[str, Any]
                trace_meta = Markup(
                    sentry_sdk.get_current_scope().trace_propagation_meta()
                )
                return {
                    "sentry_trace_meta": trace_meta,
                }

            kwargs.setdefault("context_processors", [])

            if add_sentry_trace_meta not in kwargs["context_processors"]:
                kwargs["context_processors"].append(add_sentry_trace_meta)

            return old_jinja2templates_init(self, *args, **kwargs)

        Jinja2Templates.__init__ = _sentry_jinja2templates_init


class StarletteRequestExtractor:
    """
    Extracts useful information from the Starlette request
    (like form data or cookies) and adds it to the Sentry event.
    """

    request = None  # type: Request

    def __init__(self, request):
        # type: (StarletteRequestExtractor, Request) -> None
        self.request = request

    def extract_cookies_from_request(self):
        # type: (StarletteRequestExtractor) -> Optional[Dict[str, Any]]
        cookies = None  # type: Optional[Dict[str, Any]]
        if should_send_default_pii():
            cookies = self.cookies()

        return cookies

    async def extract_request_info(self):
        # type: (StarletteRequestExtractor) -> Optional[Dict[str, Any]]
        client = sentry_sdk.get_client()

        request_info = {}  # type: Dict[str, Any]

        with capture_internal_exceptions():
            # Add cookies
            if should_send_default_pii():
                request_info["cookies"] = self.cookies()

            # If there is no body, just return the cookies
            content_length = await self.content_length()
            if not content_length:
                return request_info

            # Add annotation if body is too big
            if content_length and not request_body_within_bounds(
                client, content_length
            ):
                request_info["data"] = AnnotatedValue.removed_because_over_size_limit()
                return request_info

            # Add JSON body, if it is a JSON request
            json = await self.json()
            if json:
                request_info["data"] = json
                return request_info

            # Add form as key/value pairs, if request has form data
            form = await self.form()
            if form:
                form_data = {}
                for key, val in form.items():
                    is_file = isinstance(val, UploadFile)
                    form_data[key] = (
                        val
                        if not is_file
                        else AnnotatedValue.removed_because_raw_data()
                    )

                request_info["data"] = form_data
                return request_info

            # Raw data, do not add body just an annotation
            request_info["data"] = AnnotatedValue.removed_because_raw_data()
            return request_info

    async def content_length(self):
        # type: (StarletteRequestExtractor) -> Optional[int]
        if "content-length" in self.request.headers:
            return int(self.request.headers["content-length"])

        return None

    def cookies(self):
        # type: (StarletteRequestExtractor) -> Dict[str, Any]
        return self.request.cookies

    async def form(self):
        # type: (StarletteRequestExtractor) -> Any
        if multipart is None:
            return None

        # Parse the body first to get it cached, as Starlette does not cache form() as it
        # does with body() and json() https://github.com/encode/starlette/discussions/1933
        # Calling `.form()` without calling `.body()` first will
        # potentially break the users project.
        await self.request.body()

        return await self.request.form()

    def is_json(self):
        # type: (StarletteRequestExtractor) -> bool
        return _is_json_content_type(self.request.headers.get("content-type"))

    async def json(self):
        # type: (StarletteRequestExtractor) -> Optional[Dict[str, Any]]
        if not self.is_json():
            return None
        try:
            return await self.request.json()
        except JSONDecodeError:
            return None


def _transaction_name_from_router(scope):
    # type: (StarletteScope) -> Optional[str]
    router = scope.get("router")
    if not router:
        return None

    for route in router.routes:
        match = route.matches(scope)
        if match[0] == Match.FULL:
            try:
                return route.path
            except AttributeError:
                # routes added via app.host() won't have a path attribute
                return scope.get("path")

    return None


def _set_transaction_name_and_source(scope, transaction_style, request):
    # type: (sentry_sdk.Scope, str, Any) -> None
    name = None
    source = SOURCE_FOR_STYLE[transaction_style]

    if transaction_style == "endpoint":
        endpoint = request.scope.get("endpoint")
        if endpoint:
            name = transaction_from_function(endpoint) or None

    elif transaction_style == "url":
        name = _transaction_name_from_router(request.scope)

    if name is None:
        name = _DEFAULT_TRANSACTION_NAME
        source = TransactionSource.ROUTE

    scope.set_transaction_name(name, source=source)
    logger.debug(
        "[Starlette] Set transaction name and source on scope: %s / %s", name, source
    )


def _get_transaction_from_middleware(app, asgi_scope, integration):
    # type: (Any, Dict[str, Any], StarletteIntegration) -> Tuple[Optional[str], Optional[str]]
    name = None
    source = None

    if integration.transaction_style == "endpoint":
        name = transaction_from_function(app.__class__)
        source = TransactionSource.COMPONENT
    elif integration.transaction_style == "url":
        name = _transaction_name_from_router(asgi_scope)
        source = TransactionSource.ROUTE

    return name, source

Youez - 2016 - github.com/yon3zu
LinuXploit