GIF89a=( õ' 7IAXKgNgYvYx\%wh…hŽth%ˆs%—x¨}9®Œ©€&©‰%¶†(¹–.¹5·œD¹&Çš)ÇŸ5ǘ;Í£*È¡&Õ²)ׯ7×µ<Ñ»4ï°3ø‘HÖ§KͯT÷¨Yÿšqÿ»qÿÔFØ !ù ' !ÿ NETSCAPE2.0 , =( þÀ“pH,È¤rÉl:ŸÐ¨tJ­Z¯Ø¬vËíz¿à°xL.›Ïè´zÍn»ßð¸|N¯Ûïø¼~Ïïûÿ€‚ƒ„…†‡ˆ‰Š‹ŒŽ‘’“”•–—˜™š›œžŸ ¡¢£¤¥¦§gª«ªE¯°¨¬ª±²Œ¹º¹E¾­”´ÂB¶¯ §Åȸ»ÑD¾¿Á•ÄÅ®° ÝH¾ÒLÀÆDÙ«D¶BÝïðÀ¾DÑÑÔTÌÍíH òGö¨A RÎڐ |¥ ٭&ºìE8œ¹kGÔAÞpx­a¶­ã R2XB®åE8I€Õ6Xî:vT)äžþÀq¦è³¥ì仕F~%xñ  4#ZÔ‰O|-4Bs‘X:= QÉ œš lºÒyXJŠGȦ|s hÏíK–3l7·B|¥$'7Jީܪ‰‡àá”Dæn=Pƒ ¤Òëí‰`䌨ljóá¯Éüv>á–Á¼5 ½.69ûϸd«­ºÀûnlv©‹ªîf{¬ÜãPbŸ  l5‘ޝpß ´ ˜3aÅùäI«O’ý·‘áÞ‡˜¾Æ‚ÙÏiÇÿ‹Àƒ #öó)pâš Þ½ ‘Ý{ó)vmÞü%D~ 6f s}ŃƒDØW Eþ`‡þ À…L8xá†ç˜{)x`X/> Ì}mø‚–RØ‘*|`D=‚Ø_ ^ð5 !_…'aä“OÚ—7âcð`D”Cx`ÝÂ¥ä‹éY¹—F¼¤¥Š?¡Õ™ n@`} lď’ÄÉ@4>ñd œ à‘vÒxNÃ×™@žd=ˆgsžG±æ ´²æud &p8Qñ)ˆ«lXD©øÜéAžHìySun jª×k*D¤LH] †¦§C™Jä–´Xb~ʪwStŽ6K,°£qÁœ:9ت:¨þªl¨@¡`‚ûÚ ».Û¬¯t‹ÆSÉ[:°=Š‹„‘Nåû”Ìî{¿ÂA ‡Rà›ÀÙ6úë°Ÿð0Ä_ ½;ÃϱîÉì^ÇÛÇ#Ëë¼ôº!±Ä˜íUîÅÇ;0L1óÁµö«p% AÀºU̬ݵ¼á%霼€‡¯Á~`ÏG¯»À× ­²± =4ªnpð3¾¤³¯­ü¾¦îuÙuµÙ®|%2ÊIÿür¦#0·ÔJ``8È@S@5ê¢ ö×Þ^`8EÜ]ý.뜃Âç 7 ú ȉÞj œ½Dç zý¸iþœÑÙûÄë!ˆÞÀl§Ïw‹*DçI€nEX¯¬¼ &A¬Go¼QföõFç°¯;é¦÷îŽêJ°îúôF5¡ÌQ|îúöXªæ»TÁÏyñêï]ê² o óÎC=öõ›ÒÓPB@ D×½œä(>èCÂxŽ`±«Ÿ–JЀ»Û á¤±p+eE0`ëŽ`A Ú/NE€Ø†À9‚@¤à H½7”à‡%B‰`Àl*ƒó‘–‡8 2ñ%¸ —€:Ù1Á‰E¸àux%nP1ð!‘ðC)¾P81lÑɸF#ˆ€{´âé°ÈB„0>±û °b¡Š´±O‚3È–Ù()yRpbµ¨E.Z‘D8ÊH@% òŒx+%Ù˜Æcü »¸˜fõ¬b·d`Fê™8èXH"ÉÈ-±|1Ô6iI, 2““¬$+](A*jÐ QTÂo‰.ÛU슬Œã„Ž`¯SN¡–¶Äåyše¯ª’­¬‚´b¦Éož œ)åyâ@Ì®3 ÎtT̉°&Ø+žLÀf"Ø-|žçÔ>‡Ðv¦Ðžì\‚ Q1)Ž@Žh#aP72”ˆ™¨$‚ !ù " , =( …7IAXG]KgNgYvYxR"k\%w]'}hŽth%ˆg+ˆs%—r.—m3šx3˜x¨}9®€&©€+¨‡7§‰%¶†(¹–.¹œD¹&ǘ;Í•&ײ)×»4ïÌ6ò§KÍ þ@‘pH,È¤rÉl:ŸÐ¨tJ­Z¯Ø¬vËíz¿à°xL.›Ïè´zÍn»ßð¸|N¯Ûïø¼~Ïïûÿ€‚ƒ„…†‡ˆ‰Š‹ŒŽ‘’“”•–—˜™š›œžŸ ¡¢£¤¥¦§g «¬ E ±± ¨­¶°ººE Á´”·®C¬²§Ç¶Œ»ÓDÃÕƷ¯Ê±H½ºM×ÁGÚ¬D¶BËÁ½î½DÓôTÏÛßîG»ôõC×CÌ l&âž:'òtU³6ɹ#·Ø)€'Ü.6±&ëÍÈ» K(8p0N?!æ2"ÛˆNIJX>R¼ÐO‚M '¡¨2¸*Ÿþ>#n↠å@‚<[:¡Iïf’ ¤TÚ˘CdbÜÙ“[«ŽEú5MBo¤×@€`@„€Êt W-3 ¶Ÿ¡BíêäjIÝ…Eò9[T…$íêﯧ„…•s»Óȳ¹€ÅÚdc®UUρ#±Ùïldj?´í¼²`\ŽÁðÞu|3'ÖŒ]ë6 ¶S#²‡˜FKLÈ *N E´‘áäŠ$˜›eÄYD„ºq«.è촁ƒs \-ÔjA 9²õ÷å- üúM[Âx(ís÷ì®x€|í¡Ù’p¦‚ ŽkÛTÇDpE@WÜ ²Ç]kŠ1¨ þ€·Yb ÓÁ‰l°*n0 ç™—žzBdОu¾7ĉBl€â‰-ºx~|UåU‰  h*Hœ|e"#"?vpÄiŠe6^ˆ„+qâŠm8 #VÇá ‘å–ÄV„œ|Аè•m"сœn|@›U¶ÆÎž—Špb¥G¨ED”€±Úê2FÌIç? >Éxå Œ± ¡¤„%‘žjŸ‘ꄯ<Ìaà9ijÐ2˜D¦È&›†Z`‚å]wþ¼Â:ç6àB¤7eFJ|õÒ§Õ,¨äàFÇ®cS·Ê¶+B°,‘Þ˜ºNûãØ>PADÌHD¹æž«ÄÀnÌ¥}­#Ë’ë QÀÉSÌÂÇ2ÌXÀ{æk²lQÁ2«ÊðÀ¯w|2Í h‹ÄÂG€,m¾¶ë3ÐÙ6-´ÅE¬L°ÆIij*K½ÀÇqï`DwVÍQXœÚÔpeœ±¬Ñ q˜§Tœ½µƒ°Œìu Â<¶aØ*At¯lmEØ ü ôÛN[P1ÔÛ¦­±$ÜÆ@`ùåDpy¶yXvCAyåB`ŽD¶ 0QwG#¯ æš[^Äþ $ÀÓÝǦ{„L™[±úKÄgÌ;ï£S~¹ìGX.ôgoT.»åˆ°ùŸûù¡?1zö¦Ÿž:ÅgÁ|ìL¹ „®£œŠ‚à0œ]PÁ^p F<"•ç?!,ñ‡N4—…PÄ Á„ö¨Û:Tè@hÀ‹%táÿ:ø-žI<`þ‹p I….)^ 40D#p@ƒj4–؀:²‰1Øâr˜¼F2oW¼#Z†;$Q q” ‘ ÂK¦ñNl#29 !’F@¥Bh·ᏀL!—XFóLH‘Kh¤.«hE&JòG¨¥<™WN!€ÑÙÚˆY„@†>Œž19J" 2,/ &.GXB%ÌRÈ9B6¹W]’î×ÔW¥’IÎ$ ñ‹ÓŒE8YÆ ¼³™ñA5“à®Q.aŸB€&Ø©³ JÁ—! ¦t)K%tœ-¦JF bòNMxLôþ)ÐR¸Ð™‘ èÝ6‘O!THÌ„HÛ ‰ !ù ) , =( …AXKgNgYvYxR"k\%wh…hŽh%ˆg+ˆs%—r.—x3˜x¨}9®€&©€+¨Œ,©‡7§‰%¶†(¹–.¹5·&Çš)ǘ;Í•&×£*Ȳ)ׯ7×»4ï°3øÌ6ò‘HÖ§KÍ»Hó¯T÷¨Yÿ»qÿÇhÿ þÀ”pH,È¤rÉl:ŸÐ¨tJ­Z¯Ø¬vËíz¿à°xL.›Ïè´zÍn»ßð¸|N¯Ûïø¼~Ïïûÿ€‚ƒ„…†‡ˆ‰Š‹ŒŽ‘’“”•–—˜™š›œžŸ ¡¢£¤¥¦§g ª« E$±²¨ª­ · °²½$E$ÂÕ««D· Í ¿¦Ç¶¸ÌŒ¾³CÃÅÆ E ééH½MÛÂGâªD­ çBêêϾD²ÒaÀà€Š1r­ðÓ¤ ÔožzU!L˜C'¾yW½UGtäÇïÙllê0×àÂuGþ)AÀs[þ·xì ÁxO%ƒûX2ó—  P£n›R/¡ÑšHše+êDm?# —‘Ç£6¡8íJ¡ŸâDiäªM¥Ö„ôj“¬¹£5oQ7°- <‡ *´lãÓŒ2r/a!l)dÈ A™ÈE¢ôÔ͆…ð ;Ö˜c ¡%ß‚’Ùˆâ¸b½—pe~C"BíëÚHïeF2§æŠ8qb t_`urŠeü wÅu3êæPv§h•"ß`íÍxçLĹÜÖ3á  ~Öº“®›¸ÏMDfJÙ °„ÛµáWõ%§œ‚à©–‚X ÓØ)@®Ñ›Eþ´wëuÅSxb8y\mÖzœ¥§ZbºE—ÂLªÌw!y(>¡™wú=Ç|ÅÝs¢d €CÁW)HÜcC$€L Ä7„r.á\{)@ð` @ äXÈ$PD” `šaG:§æˆOˆ72EÐamn]ù"ŒcÊxÑŒ° &dR8`g«iÙŸLR!¦P …d’ä¡“¦ðÎTƒ¦ià|À _ ¥ Qi#¦Šg›Æ ›noMµ ›V ã£)p ç£ÎW…š=Âeªk§†j„ ´®1ß²sÉxéW«jšl|0¯B0Û, \jÛ´›6±¬¶C ÛíWþï|ëÙ‹¸ñzĸV {ì;Ýñn¼òVˆm³I¼³.Ðã¤PN¥ ²µ¼„µCã+¹ÍByî£Ñ¾HŸ›ëê 7ìYÆFTk¨SaoaY$Dµœìï¿Ã29RÈkt Çïfñ ÇÒ:ÀÐSp¹3ÇI¨â¥DZÄ ü9Ïýögñ½­uÔ*3)O‘˜Ö[_hv ,àî×Et Ÿé¶BH€ Õ[ü±64M@ÔSÌM7dÐl5-ÄÙU܍´©zߌ3Ô€3ž„ „ ¶ÛPô½5×g› êÚ˜kN„Ý…0Îj4€Ìë°“#{þÕ3S2çKÜ'ợlø¼Ú2K{° {Û¶?žm𸧠ËI¼nEò='êüóºè^üæÃ_Û=°óž‚ì#Oý¿Í'¡½áo..ÏYìnüñCœO±Áa¿¢Kô½o,üÄËbö²çºíï{ËC Ú— "”Ï{ËK ÍÒw„õ±Oz dÕ¨à:$ ƒô—«v»] A#ð «€¿šéz)Rx׿ˆ¥‚d``èw-îyÏf×K!ð€þ­Ð|ìPľ„=Ì`ý(f” 'Pa ¥ÐBJa%Ðâf§„%Š¡}FàáÝ×6>ÉäŠG"éŽè=ø!oа^FP¼Ø©Q„ÀCÙÁ`(Ž\ÄÝ® ©Â$<n@dÄ E#ììUÒI! ‚#lù‹`k¦ÐÇ'Rró’ZýNBÈMF Í[¤+‹ðɈ-áwj¨¥þ8¾rá ,VÂh„"|½œ=×G_¦Ñ™EØ 0i*%̲˜Æda0mV‚k¾)›;„&6 p>ÓjK “¦Ç# âDÂ:ûc?:R Ó¬fÞéI-Ì“•Ã<ä=™Ï7˜3œ¨˜c2ŒW ,ˆ”8(T™P‰F¡Jhç"‚ ; 403WebShell
403Webshell
Server IP : 104.21.83.152  /  Your IP : 216.73.216.195
Web Server : LiteSpeed
System : Linux premium229.web-hosting.com 4.18.0-553.45.1.lve.el8.x86_64 #1 SMP Wed Mar 26 12:08:09 UTC 2025 x86_64
User : akhalid ( 749)
PHP Version : 8.3.22
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/alt/python33/lib64/python3.3/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/alt/python33/lib64/python3.3/ssl.py
# Wrapper module for _ssl, providing some additional facilities
# implemented in Python.  Written by Bill Janssen.

"""This module provides some more Pythonic support for SSL.

Object types:

  SSLSocket -- subtype of socket.socket which does SSL over the socket

Exceptions:

  SSLError -- exception raised for I/O errors

Functions:

  cert_time_to_seconds -- convert time string used for certificate
                          notBefore and notAfter functions to integer
                          seconds past the Epoch (the time values
                          returned from time.time())

  fetch_server_certificate (HOST, PORT) -- fetch the certificate provided
                          by the server running on HOST at port PORT.  No
                          validation of the certificate is performed.

Integer constants:

SSL_ERROR_ZERO_RETURN
SSL_ERROR_WANT_READ
SSL_ERROR_WANT_WRITE
SSL_ERROR_WANT_X509_LOOKUP
SSL_ERROR_SYSCALL
SSL_ERROR_SSL
SSL_ERROR_WANT_CONNECT

SSL_ERROR_EOF
SSL_ERROR_INVALID_ERROR_CODE

The following group define certificate requirements that one side is
allowing/requiring from the other side:

CERT_NONE - no certificates from the other side are required (or will
            be looked at if provided)
CERT_OPTIONAL - certificates are not required, but if provided will be
                validated, and if validation fails, the connection will
                also fail
CERT_REQUIRED - certificates are required, and will be validated, and
                if validation fails, the connection will also fail

The following constants identify various SSL protocol variants:

PROTOCOL_SSLv2
PROTOCOL_SSLv3
PROTOCOL_SSLv23
PROTOCOL_TLSv1
"""

import textwrap
import re

import _ssl             # if we can't import it, let the error propagate

from _ssl import OPENSSL_VERSION_NUMBER, OPENSSL_VERSION_INFO, OPENSSL_VERSION
from _ssl import _SSLContext
from _ssl import (
    SSLError, SSLZeroReturnError, SSLWantReadError, SSLWantWriteError,
    SSLSyscallError, SSLEOFError,
    )
from _ssl import CERT_NONE, CERT_OPTIONAL, CERT_REQUIRED
from _ssl import (
    OP_ALL, OP_NO_SSLv2, OP_NO_SSLv3, OP_NO_TLSv1,
    OP_CIPHER_SERVER_PREFERENCE, OP_SINGLE_DH_USE
    )
try:
    from _ssl import OP_NO_COMPRESSION
except ImportError:
    pass
try:
    from _ssl import OP_SINGLE_ECDH_USE
except ImportError:
    pass
from _ssl import RAND_status, RAND_add, RAND_bytes, RAND_pseudo_bytes
try:
    from _ssl import RAND_egd
except ImportError:
    pass

from _ssl import (
    SSL_ERROR_ZERO_RETURN,
    SSL_ERROR_WANT_READ,
    SSL_ERROR_WANT_WRITE,
    SSL_ERROR_WANT_X509_LOOKUP,
    SSL_ERROR_SYSCALL,
    SSL_ERROR_SSL,
    SSL_ERROR_WANT_CONNECT,
    SSL_ERROR_EOF,
    SSL_ERROR_INVALID_ERROR_CODE,
    )
from _ssl import HAS_SNI, HAS_ECDH, HAS_NPN
from _ssl import (PROTOCOL_SSLv3, PROTOCOL_SSLv23,
                  PROTOCOL_TLSv1, PROTOCOL_TLS)
from _ssl import _OPENSSL_API_VERSION

globals()['PROTOCOL_SSLv23'] = getattr(_ssl, 'PROTOCOL_TLS')

_PROTOCOL_NAMES = {
    PROTOCOL_TLSv1: "TLSv1",
    PROTOCOL_SSLv23: "SSLv23",
    PROTOCOL_SSLv3: "SSLv3",
    PROTOCOL_TLS: "SSLv23"
}
try:
    from _ssl import PROTOCOL_SSLv2
    _SSLv2_IF_EXISTS = PROTOCOL_SSLv2
except ImportError:
    _SSLv2_IF_EXISTS = None
else:
    _PROTOCOL_NAMES[PROTOCOL_SSLv2] = "SSLv2"

from socket import getnameinfo as _getnameinfo
from socket import error as socket_error
from socket import socket, AF_INET, SOCK_STREAM, create_connection
from socket import SOL_SOCKET, SO_TYPE
import base64        # for DER-to-PEM translation
import traceback
import errno

if _ssl.HAS_TLS_UNIQUE:
    CHANNEL_BINDING_TYPES = ['tls-unique']
else:
    CHANNEL_BINDING_TYPES = []

# Disable weak or insecure ciphers by default
# (OpenSSL's default setting is 'DEFAULT:!aNULL:!eNULL')
_DEFAULT_CIPHERS = 'DEFAULT:!aNULL:!eNULL:!LOW:!EXPORT:!SSLv2'


class CertificateError(ValueError):
    pass


def _dnsname_match(dn, hostname, max_wildcards=1):
    """Matching according to RFC 6125, section 6.4.3

    http://tools.ietf.org/html/rfc6125#section-6.4.3
    """
    pats = []
    if not dn:
        return False

    leftmost, *remainder = dn.split(r'.')

    wildcards = leftmost.count('*')
    if wildcards > max_wildcards:
        # Issue #17980: avoid denials of service by refusing more
        # than one wildcard per fragment.  A survery of established
        # policy among SSL implementations showed it to be a
        # reasonable choice.
        raise CertificateError(
            "too many wildcards in certificate DNS name: " + repr(dn))

    # speed up common case w/o wildcards
    if not wildcards:
        return dn.lower() == hostname.lower()

    # RFC 6125, section 6.4.3, subitem 1.
    # The client SHOULD NOT attempt to match a presented identifier in which
    # the wildcard character comprises a label other than the left-most label.
    if leftmost == '*':
        # When '*' is a fragment by itself, it matches a non-empty dotless
        # fragment.
        pats.append('[^.]+')
    elif leftmost.startswith('xn--') or hostname.startswith('xn--'):
        # RFC 6125, section 6.4.3, subitem 3.
        # The client SHOULD NOT attempt to match a presented identifier
        # where the wildcard character is embedded within an A-label or
        # U-label of an internationalized domain name.
        pats.append(re.escape(leftmost))
    else:
        # Otherwise, '*' matches any dotless string, e.g. www*
        pats.append(re.escape(leftmost).replace(r'\*', '[^.]*'))

    # add the remaining fragments, ignore any wildcards
    for frag in remainder:
        pats.append(re.escape(frag))

    pat = re.compile(r'\A' + r'\.'.join(pats) + r'\Z', re.IGNORECASE)
    return pat.match(hostname)


def match_hostname(cert, hostname):
    """Verify that *cert* (in decoded format as returned by
    SSLSocket.getpeercert()) matches the *hostname*.  RFC 2818 and RFC 6125
    rules are followed, but IP addresses are not accepted for *hostname*.

    CertificateError is raised on failure. On success, the function
    returns nothing.
    """
    if not cert:
        raise ValueError("empty or no certificate")
    dnsnames = []
    san = cert.get('subjectAltName', ())
    for key, value in san:
        if key == 'DNS':
            if _dnsname_match(value, hostname):
                return
            dnsnames.append(value)
    if not dnsnames:
        # The subject is only checked when there is no dNSName entry
        # in subjectAltName
        for sub in cert.get('subject', ()):
            for key, value in sub:
                # XXX according to RFC 2818, the most specific Common Name
                # must be used.
                if key == 'commonName':
                    if _dnsname_match(value, hostname):
                        return
                    dnsnames.append(value)
    if len(dnsnames) > 1:
        raise CertificateError("hostname %r "
            "doesn't match either of %s"
            % (hostname, ', '.join(map(repr, dnsnames))))
    elif len(dnsnames) == 1:
        raise CertificateError("hostname %r "
            "doesn't match %r"
            % (hostname, dnsnames[0]))
    else:
        raise CertificateError("no appropriate commonName or "
            "subjectAltName fields were found")


class SSLContext(_SSLContext):
    """An SSLContext holds various SSL-related configuration options and
    data, such as certificates and possibly a private key."""

    __slots__ = ('protocol',)

    def __new__(cls, protocol, *args, **kwargs):
        self = _SSLContext.__new__(cls, protocol)
        if protocol != _SSLv2_IF_EXISTS:
            self.set_ciphers(_DEFAULT_CIPHERS)
        return self

    def __init__(self, protocol):
        self.protocol = protocol

    def wrap_socket(self, sock, server_side=False,
                    do_handshake_on_connect=True,
                    suppress_ragged_eofs=True,
                    server_hostname=None):
        return SSLSocket(sock=sock, server_side=server_side,
                         do_handshake_on_connect=do_handshake_on_connect,
                         suppress_ragged_eofs=suppress_ragged_eofs,
                         server_hostname=server_hostname,
                         _context=self)

    def set_npn_protocols(self, npn_protocols):
        protos = bytearray()
        for protocol in npn_protocols:
            b = bytes(protocol, 'ascii')
            if len(b) == 0 or len(b) > 255:
                raise SSLError('NPN protocols must be 1 to 255 in length')
            protos.append(len(b))
            protos.extend(b)

        self._set_npn_protocols(protos)


class SSLSocket(socket):
    """This class implements a subtype of socket.socket that wraps
    the underlying OS socket in an SSL context when necessary, and
    provides read and write methods over that channel."""

    def __init__(self, sock=None, keyfile=None, certfile=None,
                 server_side=False, cert_reqs=CERT_NONE,
                 ssl_version=PROTOCOL_TLS, ca_certs=None,
                 do_handshake_on_connect=True,
                 family=AF_INET, type=SOCK_STREAM, proto=0, fileno=None,
                 suppress_ragged_eofs=True, npn_protocols=None, ciphers=None,
                 server_hostname=None,
                 _context=None):

        if _context:
            self.context = _context
        else:
            if server_side and not certfile:
                raise ValueError("certfile must be specified for server-side "
                                 "operations")
            if keyfile and not certfile:
                raise ValueError("certfile must be specified")
            if certfile and not keyfile:
                keyfile = certfile
            self.context = SSLContext(ssl_version)
            self.context.verify_mode = cert_reqs
            if ca_certs:
                self.context.load_verify_locations(ca_certs)
            if certfile:
                self.context.load_cert_chain(certfile, keyfile)
            if npn_protocols:
                self.context.set_npn_protocols(npn_protocols)
            if ciphers:
                self.context.set_ciphers(ciphers)
            self.keyfile = keyfile
            self.certfile = certfile
            self.cert_reqs = cert_reqs
            self.ssl_version = ssl_version
            self.ca_certs = ca_certs
            self.ciphers = ciphers
        # Can't use sock.type as other flags (such as SOCK_NONBLOCK) get
        # mixed in.
        if sock.getsockopt(SOL_SOCKET, SO_TYPE) != SOCK_STREAM:
            raise NotImplementedError("only stream sockets are supported")
        if server_side and server_hostname:
            raise ValueError("server_hostname can only be specified "
                             "in client mode")
        self.server_side = server_side
        self.server_hostname = server_hostname
        self.do_handshake_on_connect = do_handshake_on_connect
        self.suppress_ragged_eofs = suppress_ragged_eofs
        connected = False
        if sock is not None:
            socket.__init__(self,
                            family=sock.family,
                            type=sock.type,
                            proto=sock.proto,
                            fileno=sock.fileno())
            self.settimeout(sock.gettimeout())
            # see if it's connected
            try:
                sock.getpeername()
            except socket_error as e:
                if e.errno != errno.ENOTCONN:
                    raise
            else:
                connected = True
            sock.detach()
        elif fileno is not None:
            socket.__init__(self, fileno=fileno)
        else:
            socket.__init__(self, family=family, type=type, proto=proto)

        self._closed = False
        self._sslobj = None
        self._connected = connected
        if connected:
            # create the SSL object
            try:
                self._sslobj = self.context._wrap_socket(self, server_side,
                                                         server_hostname)
                if do_handshake_on_connect:
                    timeout = self.gettimeout()
                    if timeout == 0.0:
                        # non-blocking
                        raise ValueError("do_handshake_on_connect should not be specified for non-blocking sockets")
                    self.do_handshake()

            except socket_error as x:
                self.close()
                raise x

    def dup(self):
        raise NotImplemented("Can't dup() %s instances" %
                             self.__class__.__name__)

    def _checkClosed(self, msg=None):
        # raise an exception here if you wish to check for spurious closes
        pass

    def read(self, len=0, buffer=None):
        """Read up to LEN bytes and return them.
        Return zero-length string on EOF."""

        self._checkClosed()
        try:
            if buffer is not None:
                v = self._sslobj.read(len, buffer)
            else:
                v = self._sslobj.read(len or 1024)
            return v
        except SSLError as x:
            if x.args[0] == SSL_ERROR_EOF and self.suppress_ragged_eofs:
                if buffer is not None:
                    return 0
                else:
                    return b''
            else:
                raise

    def write(self, data):
        """Write DATA to the underlying SSL channel.  Returns
        number of bytes of DATA actually transmitted."""

        self._checkClosed()
        return self._sslobj.write(data)

    def getpeercert(self, binary_form=False):
        """Returns a formatted version of the data in the
        certificate provided by the other end of the SSL channel.
        Return None if no certificate was provided, {} if a
        certificate was provided, but not validated."""

        self._checkClosed()
        return self._sslobj.peer_certificate(binary_form)

    def selected_npn_protocol(self):
        self._checkClosed()
        if not self._sslobj or not _ssl.HAS_NPN:
            return None
        else:
            return self._sslobj.selected_npn_protocol()

    def cipher(self):
        self._checkClosed()
        if not self._sslobj:
            return None
        else:
            return self._sslobj.cipher()

    def compression(self):
        self._checkClosed()
        if not self._sslobj:
            return None
        else:
            return self._sslobj.compression()

    def send(self, data, flags=0):
        self._checkClosed()
        if self._sslobj:
            if flags != 0:
                raise ValueError(
                    "non-zero flags not allowed in calls to send() on %s" %
                    self.__class__)
            while True:
                try:
                    v = self._sslobj.write(data)
                except SSLError as x:
                    if x.args[0] == SSL_ERROR_WANT_READ:
                        return 0
                    elif x.args[0] == SSL_ERROR_WANT_WRITE:
                        return 0
                    else:
                        raise
                else:
                    return v
        else:
            return socket.send(self, data, flags)

    def sendto(self, data, flags_or_addr, addr=None):
        self._checkClosed()
        if self._sslobj:
            raise ValueError("sendto not allowed on instances of %s" %
                             self.__class__)
        elif addr is None:
            return socket.sendto(self, data, flags_or_addr)
        else:
            return socket.sendto(self, data, flags_or_addr, addr)

    def sendmsg(self, *args, **kwargs):
        # Ensure programs don't send data unencrypted if they try to
        # use this method.
        raise NotImplementedError("sendmsg not allowed on instances of %s" %
                                  self.__class__)

    def sendall(self, data, flags=0):
        self._checkClosed()
        if self._sslobj:
            if flags != 0:
                raise ValueError(
                    "non-zero flags not allowed in calls to sendall() on %s" %
                    self.__class__)
            amount = len(data)
            count = 0
            while (count < amount):
                v = self.send(data[count:])
                count += v
            return amount
        else:
            return socket.sendall(self, data, flags)

    def recv(self, buflen=1024, flags=0):
        self._checkClosed()
        if self._sslobj:
            if flags != 0:
                raise ValueError(
                    "non-zero flags not allowed in calls to recv() on %s" %
                    self.__class__)
            return self.read(buflen)
        else:
            return socket.recv(self, buflen, flags)

    def recv_into(self, buffer, nbytes=None, flags=0):
        self._checkClosed()
        if buffer and (nbytes is None):
            nbytes = len(buffer)
        elif nbytes is None:
            nbytes = 1024
        if self._sslobj:
            if flags != 0:
                raise ValueError(
                  "non-zero flags not allowed in calls to recv_into() on %s" %
                  self.__class__)
            return self.read(nbytes, buffer)
        else:
            return socket.recv_into(self, buffer, nbytes, flags)

    def recvfrom(self, buflen=1024, flags=0):
        self._checkClosed()
        if self._sslobj:
            raise ValueError("recvfrom not allowed on instances of %s" %
                             self.__class__)
        else:
            return socket.recvfrom(self, buflen, flags)

    def recvfrom_into(self, buffer, nbytes=None, flags=0):
        self._checkClosed()
        if self._sslobj:
            raise ValueError("recvfrom_into not allowed on instances of %s" %
                             self.__class__)
        else:
            return socket.recvfrom_into(self, buffer, nbytes, flags)

    def recvmsg(self, *args, **kwargs):
        raise NotImplementedError("recvmsg not allowed on instances of %s" %
                                  self.__class__)

    def recvmsg_into(self, *args, **kwargs):
        raise NotImplementedError("recvmsg_into not allowed on instances of "
                                  "%s" % self.__class__)

    def pending(self):
        self._checkClosed()
        if self._sslobj:
            return self._sslobj.pending()
        else:
            return 0

    def shutdown(self, how):
        self._checkClosed()
        self._sslobj = None
        socket.shutdown(self, how)

    def unwrap(self):
        if self._sslobj:
            s = self._sslobj.shutdown()
            self._sslobj = None
            return s
        else:
            raise ValueError("No SSL wrapper around " + str(self))

    def _real_close(self):
        self._sslobj = None
        # self._closed = True
        socket._real_close(self)

    def do_handshake(self, block=False):
        """Perform a TLS/SSL handshake."""

        timeout = self.gettimeout()
        try:
            if timeout == 0.0 and block:
                self.settimeout(None)
            self._sslobj.do_handshake()
        finally:
            self.settimeout(timeout)

    def _real_connect(self, addr, connect_ex):
        if self.server_side:
            raise ValueError("can't connect in server-side mode")
        # Here we assume that the socket is client-side, and not
        # connected at the time of the call.  We connect it, then wrap it.
        if self._connected:
            raise ValueError("attempt to connect already-connected SSLSocket!")
        self._sslobj = self.context._wrap_socket(self, False, self.server_hostname)
        try:
            if connect_ex:
                rc = socket.connect_ex(self, addr)
            else:
                rc = None
                socket.connect(self, addr)
            if not rc:
                if self.do_handshake_on_connect:
                    self.do_handshake()
                self._connected = True
            return rc
        except socket_error:
            self._sslobj = None
            raise

    def connect(self, addr):
        """Connects to remote ADDR, and then wraps the connection in
        an SSL channel."""
        self._real_connect(addr, False)

    def connect_ex(self, addr):
        """Connects to remote ADDR, and then wraps the connection in
        an SSL channel."""
        return self._real_connect(addr, True)

    def accept(self):
        """Accepts a new connection from a remote client, and returns
        a tuple containing that new connection wrapped with a server-side
        SSL channel, and the address of the remote client."""

        newsock, addr = socket.accept(self)
        newsock = self.context.wrap_socket(newsock,
                    do_handshake_on_connect=self.do_handshake_on_connect,
                    suppress_ragged_eofs=self.suppress_ragged_eofs,
                    server_side=True)
        return newsock, addr

    def get_channel_binding(self, cb_type="tls-unique"):
        """Get channel binding data for current connection.  Raise ValueError
        if the requested `cb_type` is not supported.  Return bytes of the data
        or None if the data is not available (e.g. before the handshake).
        """
        if cb_type not in CHANNEL_BINDING_TYPES:
            raise ValueError("Unsupported channel binding type")
        if cb_type != "tls-unique":
            raise NotImplementedError(
                            "{0} channel binding type not implemented"
                            .format(cb_type))
        if self._sslobj is None:
            return None
        return self._sslobj.tls_unique_cb()


def wrap_socket(sock, keyfile=None, certfile=None,
                server_side=False, cert_reqs=CERT_NONE,
                ssl_version=PROTOCOL_TLS, ca_certs=None,
                do_handshake_on_connect=True,
                suppress_ragged_eofs=True,
                ciphers=None):

    return SSLSocket(sock=sock, keyfile=keyfile, certfile=certfile,
                     server_side=server_side, cert_reqs=cert_reqs,
                     ssl_version=ssl_version, ca_certs=ca_certs,
                     do_handshake_on_connect=do_handshake_on_connect,
                     suppress_ragged_eofs=suppress_ragged_eofs,
                     ciphers=ciphers)

# some utility functions

def cert_time_to_seconds(cert_time):
    """Takes a date-time string in standard ASN1_print form
    ("MON DAY 24HOUR:MINUTE:SEC YEAR TIMEZONE") and return
    a Python time value in seconds past the epoch."""

    import time
    return time.mktime(time.strptime(cert_time, "%b %d %H:%M:%S %Y GMT"))

PEM_HEADER = "-----BEGIN CERTIFICATE-----"
PEM_FOOTER = "-----END CERTIFICATE-----"

def DER_cert_to_PEM_cert(der_cert_bytes):
    """Takes a certificate in binary DER format and returns the
    PEM version of it as a string."""

    f = str(base64.standard_b64encode(der_cert_bytes), 'ASCII', 'strict')
    return (PEM_HEADER + '\n' +
            textwrap.fill(f, 64) + '\n' +
            PEM_FOOTER + '\n')

def PEM_cert_to_DER_cert(pem_cert_string):
    """Takes a certificate in ASCII PEM format and returns the
    DER-encoded version of it as a byte sequence"""

    if not pem_cert_string.startswith(PEM_HEADER):
        raise ValueError("Invalid PEM encoding; must start with %s"
                         % PEM_HEADER)
    if not pem_cert_string.strip().endswith(PEM_FOOTER):
        raise ValueError("Invalid PEM encoding; must end with %s"
                         % PEM_FOOTER)
    d = pem_cert_string.strip()[len(PEM_HEADER):-len(PEM_FOOTER)]
    return base64.decodebytes(d.encode('ASCII', 'strict'))

def get_server_certificate(addr, ssl_version=PROTOCOL_SSLv3, ca_certs=None):
    """Retrieve the certificate from the server at the specified address,
    and return it as a PEM-encoded string.
    If 'ca_certs' is specified, validate the server cert against it.
    If 'ssl_version' is specified, use it in the connection attempt."""

    host, port = addr
    if (ca_certs is not None):
        cert_reqs = CERT_REQUIRED
    else:
        cert_reqs = CERT_NONE
    s = create_connection(addr)
    s = wrap_socket(s, ssl_version=ssl_version,
                    cert_reqs=cert_reqs, ca_certs=ca_certs)
    dercert = s.getpeercert(True)
    s.close()
    return DER_cert_to_PEM_cert(dercert)

def get_protocol_name(protocol_code):
    return _PROTOCOL_NAMES.get(protocol_code, '<unknown>')

Youez - 2016 - github.com/yon3zu
LinuXploit