GIF89a=( õ' 7IAXKgNgYvYx\%wh…hŽth%ˆs%—x¨}9®Œ©€&©‰%¶†(¹–.¹5·œD¹&Çš)ÇŸ5ǘ;Í£*È¡&Õ²)ׯ7×µ<Ñ»4ï°3ø‘HÖ§KͯT÷¨Yÿšqÿ»qÿÔFØ !ù ' !ÿ NETSCAPE2.0 , =( þÀ“pH,È¤rÉl:ŸÐ¨tJ­Z¯Ø¬vËíz¿à°xL.›Ïè´zÍn»ßð¸|N¯Ûïø¼~Ïïûÿ€‚ƒ„…†‡ˆ‰Š‹ŒŽ‘’“”•–—˜™š›œžŸ ¡¢£¤¥¦§gª«ªE¯°¨¬ª±²Œ¹º¹E¾­”´ÂB¶¯ §Åȸ»ÑD¾¿Á•ÄÅ®° ÝH¾ÒLÀÆDÙ«D¶BÝïðÀ¾DÑÑÔTÌÍíH òGö¨A RÎڐ |¥ ٭&ºìE8œ¹kGÔAÞpx­a¶­ã R2XB®åE8I€Õ6Xî:vT)äžþÀq¦è³¥ì仕F~%xñ  4#ZÔ‰O|-4Bs‘X:= QÉ œš lºÒyXJŠGȦ|s hÏíK–3l7·B|¥$'7Jީܪ‰‡àá”Dæn=Pƒ ¤Òëí‰`䌨ljóá¯Éüv>á–Á¼5 ½.69ûϸd«­ºÀûnlv©‹ªîf{¬ÜãPbŸ  l5‘ޝpß ´ ˜3aÅùäI«O’ý·‘áÞ‡˜¾Æ‚ÙÏiÇÿ‹Àƒ #öó)pâš Þ½ ‘Ý{ó)vmÞü%D~ 6f s}ŃƒDØW Eþ`‡þ À…L8xá†ç˜{)x`X/> Ì}mø‚–RØ‘*|`D=‚Ø_ ^ð5 !_…'aä“OÚ—7âcð`D”Cx`ÝÂ¥ä‹éY¹—F¼¤¥Š?¡Õ™ n@`} lď’ÄÉ@4>ñd œ à‘vÒxNÃ×™@žd=ˆgsžG±æ ´²æud &p8Qñ)ˆ«lXD©øÜéAžHìySun jª×k*D¤LH] †¦§C™Jä–´Xb~ʪwStŽ6K,°£qÁœ:9ت:¨þªl¨@¡`‚ûÚ ».Û¬¯t‹ÆSÉ[:°=Š‹„‘Nåû”Ìî{¿ÂA ‡Rà›ÀÙ6úë°Ÿð0Ä_ ½;ÃϱîÉì^ÇÛÇ#Ëë¼ôº!±Ä˜íUîÅÇ;0L1óÁµö«p% AÀºU̬ݵ¼á%霼€‡¯Á~`ÏG¯»À× ­²± =4ªnpð3¾¤³¯­ü¾¦îuÙuµÙ®|%2ÊIÿür¦#0·ÔJ``8È@S@5ê¢ ö×Þ^`8EÜ]ý.뜃Âç 7 ú ȉÞj œ½Dç zý¸iþœÑÙûÄë!ˆÞÀl§Ïw‹*DçI€nEX¯¬¼ &A¬Go¼QföõFç°¯;é¦÷îŽêJ°îúôF5¡ÌQ|îúöXªæ»TÁÏyñêï]ê² o óÎC=öõ›ÒÓPB@ D×½œä(>èCÂxŽ`±«Ÿ–JЀ»Û á¤±p+eE0`ëŽ`A Ú/NE€Ø†À9‚@¤à H½7”à‡%B‰`Àl*ƒó‘–‡8 2ñ%¸ —€:Ù1Á‰E¸àux%nP1ð!‘ðC)¾P81lÑɸF#ˆ€{´âé°ÈB„0>±û °b¡Š´±O‚3È–Ù()yRpbµ¨E.Z‘D8ÊH@% òŒx+%Ù˜Æcü »¸˜fõ¬b·d`Fê™8èXH"ÉÈ-±|1Ô6iI, 2““¬$+](A*jÐ QTÂo‰.ÛU슬Œã„Ž`¯SN¡–¶Äåyše¯ª’­¬‚´b¦Éož œ)åyâ@Ì®3 ÎtT̉°&Ø+žLÀf"Ø-|žçÔ>‡Ðv¦Ðžì\‚ Q1)Ž@Žh#aP72”ˆ™¨$‚ !ù " , =( …7IAXG]KgNgYvYxR"k\%w]'}hŽth%ˆg+ˆs%—r.—m3šx3˜x¨}9®€&©€+¨‡7§‰%¶†(¹–.¹œD¹&ǘ;Í•&ײ)×»4ïÌ6ò§KÍ þ@‘pH,È¤rÉl:ŸÐ¨tJ­Z¯Ø¬vËíz¿à°xL.›Ïè´zÍn»ßð¸|N¯Ûïø¼~Ïïûÿ€‚ƒ„…†‡ˆ‰Š‹ŒŽ‘’“”•–—˜™š›œžŸ ¡¢£¤¥¦§g «¬ E ±± ¨­¶°ººE Á´”·®C¬²§Ç¶Œ»ÓDÃÕƷ¯Ê±H½ºM×ÁGÚ¬D¶BËÁ½î½DÓôTÏÛßîG»ôõC×CÌ l&âž:'òtU³6ɹ#·Ø)€'Ü.6±&ëÍÈ» K(8p0N?!æ2"ÛˆNIJX>R¼ÐO‚M '¡¨2¸*Ÿþ>#n↠å@‚<[:¡Iïf’ ¤TÚ˘CdbÜÙ“[«ŽEú5MBo¤×@€`@„€Êt W-3 ¶Ÿ¡BíêäjIÝ…Eò9[T…$íêﯧ„…•s»Óȳ¹€ÅÚdc®UUρ#±Ùïldj?´í¼²`\ŽÁðÞu|3'ÖŒ]ë6 ¶S#²‡˜FKLÈ *N E´‘áäŠ$˜›eÄYD„ºq«.è촁ƒs \-ÔjA 9²õ÷å- üúM[Âx(ís÷ì®x€|í¡Ù’p¦‚ ŽkÛTÇDpE@WÜ ²Ç]kŠ1¨ þ€·Yb ÓÁ‰l°*n0 ç™—žzBdОu¾7ĉBl€â‰-ºx~|UåU‰  h*Hœ|e"#"?vpÄiŠe6^ˆ„+qâŠm8 #VÇá ‘å–ÄV„œ|Аè•m"сœn|@›U¶ÆÎž—Špb¥G¨ED”€±Úê2FÌIç? >Éxå Œ± ¡¤„%‘žjŸ‘ꄯ<Ìaà9ijÐ2˜D¦È&›†Z`‚å]wþ¼Â:ç6àB¤7eFJ|õÒ§Õ,¨äàFÇ®cS·Ê¶+B°,‘Þ˜ºNûãØ>PADÌHD¹æž«ÄÀnÌ¥}­#Ë’ë QÀÉSÌÂÇ2ÌXÀ{æk²lQÁ2«ÊðÀ¯w|2Í h‹ÄÂG€,m¾¶ë3ÐÙ6-´ÅE¬L°ÆIij*K½ÀÇqï`DwVÍQXœÚÔpeœ±¬Ñ q˜§Tœ½µƒ°Œìu Â<¶aØ*At¯lmEØ ü ôÛN[P1ÔÛ¦­±$ÜÆ@`ùåDpy¶yXvCAyåB`ŽD¶ 0QwG#¯ æš[^Äþ $ÀÓÝǦ{„L™[±úKÄgÌ;ï£S~¹ìGX.ôgoT.»åˆ°ùŸûù¡?1zö¦Ÿž:ÅgÁ|ìL¹ „®£œŠ‚à0œ]PÁ^p F<"•ç?!,ñ‡N4—…PÄ Á„ö¨Û:Tè@hÀ‹%táÿ:ø-žI<`þ‹p I….)^ 40D#p@ƒj4–؀:²‰1Øâr˜¼F2oW¼#Z†;$Q q” ‘ ÂK¦ñNl#29 !’F@¥Bh·ᏀL!—XFóLH‘Kh¤.«hE&JòG¨¥<™WN!€ÑÙÚˆY„@†>Œž19J" 2,/ &.GXB%ÌRÈ9B6¹W]’î×ÔW¥’IÎ$ ñ‹ÓŒE8YÆ ¼³™ñA5“à®Q.aŸB€&Ø©³ JÁ—! ¦t)K%tœ-¦JF bòNMxLôþ)ÐR¸Ð™‘ èÝ6‘O!THÌ„HÛ ‰ !ù ) , =( …AXKgNgYvYxR"k\%wh…hŽh%ˆg+ˆs%—r.—x3˜x¨}9®€&©€+¨Œ,©‡7§‰%¶†(¹–.¹5·&Çš)ǘ;Í•&×£*Ȳ)ׯ7×»4ï°3øÌ6ò‘HÖ§KÍ»Hó¯T÷¨Yÿ»qÿÇhÿ þÀ”pH,È¤rÉl:ŸÐ¨tJ­Z¯Ø¬vËíz¿à°xL.›Ïè´zÍn»ßð¸|N¯Ûïø¼~Ïïûÿ€‚ƒ„…†‡ˆ‰Š‹ŒŽ‘’“”•–—˜™š›œžŸ ¡¢£¤¥¦§g ª« E$±²¨ª­ · °²½$E$ÂÕ««D· Í ¿¦Ç¶¸ÌŒ¾³CÃÅÆ E ééH½MÛÂGâªD­ çBêêϾD²ÒaÀà€Š1r­ðÓ¤ ÔožzU!L˜C'¾yW½UGtäÇïÙllê0×àÂuGþ)AÀs[þ·xì ÁxO%ƒûX2ó—  P£n›R/¡ÑšHše+êDm?# —‘Ç£6¡8íJ¡ŸâDiäªM¥Ö„ôj“¬¹£5oQ7°- <‡ *´lãÓŒ2r/a!l)dÈ A™ÈE¢ôÔ͆…ð ;Ö˜c ¡%ß‚’Ùˆâ¸b½—pe~C"BíëÚHïeF2§æŠ8qb t_`urŠeü wÅu3êæPv§h•"ß`íÍxçLĹÜÖ3á  ~Öº“®›¸ÏMDfJÙ °„ÛµáWõ%§œ‚à©–‚X ÓØ)@®Ñ›Eþ´wëuÅSxb8y\mÖzœ¥§ZbºE—ÂLªÌw!y(>¡™wú=Ç|ÅÝs¢d €CÁW)HÜcC$€L Ä7„r.á\{)@ð` @ äXÈ$PD” `šaG:§æˆOˆ72EÐamn]ù"ŒcÊxÑŒ° &dR8`g«iÙŸLR!¦P …d’ä¡“¦ðÎTƒ¦ià|À _ ¥ Qi#¦Šg›Æ ›noMµ ›V ã£)p ç£ÎW…š=Âeªk§†j„ ´®1ß²sÉxéW«jšl|0¯B0Û, \jÛ´›6±¬¶C ÛíWþï|ëÙ‹¸ñzĸV {ì;Ýñn¼òVˆm³I¼³.Ðã¤PN¥ ²µ¼„µCã+¹ÍByî£Ñ¾HŸ›ëê 7ìYÆFTk¨SaoaY$Dµœìï¿Ã29RÈkt Çïfñ ÇÒ:ÀÐSp¹3ÇI¨â¥DZÄ ü9Ïýögñ½­uÔ*3)O‘˜Ö[_hv ,àî×Et Ÿé¶BH€ Õ[ü±64M@ÔSÌM7dÐl5-ÄÙU܍´©zߌ3Ô€3ž„ „ ¶ÛPô½5×g› êÚ˜kN„Ý…0Îj4€Ìë°“#{þÕ3S2çKÜ'ợlø¼Ú2K{° {Û¶?žm𸧠ËI¼nEò='êüóºè^üæÃ_Û=°óž‚ì#Oý¿Í'¡½áo..ÏYìnüñCœO±Áa¿¢Kô½o,üÄËbö²çºíï{ËC Ú— "”Ï{ËK ÍÒw„õ±Oz dÕ¨à:$ ƒô—«v»] A#ð «€¿šéz)Rx׿ˆ¥‚d``èw-îyÏf×K!ð€þ­Ð|ìPľ„=Ì`ý(f” 'Pa ¥ÐBJa%Ðâf§„%Š¡}FàáÝ×6>ÉäŠG"éŽè=ø!oа^FP¼Ø©Q„ÀCÙÁ`(Ž\ÄÝ® ©Â$<n@dÄ E#ììUÒI! ‚#lù‹`k¦ÐÇ'Rró’ZýNBÈMF Í[¤+‹ðɈ-áwj¨¥þ8¾rá ,VÂh„"|½œ=×G_¦Ñ™EØ 0i*%̲˜Æda0mV‚k¾)›;„&6 p>ÓjK “¦Ç# âDÂ:ûc?:R Ó¬fÞéI-Ì“•Ã<ä=™Ï7˜3œ¨˜c2ŒW ,ˆ”8(T™P‰F¡Jhç"‚ ; 403WebShell
403Webshell
Server IP : 104.21.83.152  /  Your IP : 216.73.216.195
Web Server : LiteSpeed
System : Linux premium229.web-hosting.com 4.18.0-553.45.1.lve.el8.x86_64 #1 SMP Wed Mar 26 12:08:09 UTC 2025 x86_64
User : akhalid ( 749)
PHP Version : 8.3.22
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /lib64/python2.7/multiprocessing/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /lib64/python2.7/multiprocessing/pool.py
#
# Module providing the `Pool` class for managing a process pool
#
# multiprocessing/pool.py
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
# 3. Neither the name of author nor the names of any contributors may be
#    used to endorse or promote products derived from this software
#    without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#

__all__ = ['Pool']

#
# Imports
#

import threading
import Queue
import itertools
import collections
import time

from multiprocessing import Process, cpu_count, TimeoutError
from multiprocessing.util import Finalize, debug

#
# Constants representing the state of a pool
#

RUN = 0
CLOSE = 1
TERMINATE = 2

#
# Miscellaneous
#

job_counter = itertools.count()

def mapstar(args):
    return map(*args)

#
# Code run by worker processes
#

class MaybeEncodingError(Exception):
    """Wraps possible unpickleable errors, so they can be
    safely sent through the socket."""

    def __init__(self, exc, value):
        self.exc = repr(exc)
        self.value = repr(value)
        super(MaybeEncodingError, self).__init__(self.exc, self.value)

    def __str__(self):
        return "Error sending result: '%s'. Reason: '%s'" % (self.value,
                                                             self.exc)

    def __repr__(self):
        return "<MaybeEncodingError: %s>" % str(self)


def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
    assert maxtasks is None or (type(maxtasks) in (int, long) and maxtasks > 0)
    put = outqueue.put
    get = inqueue.get
    if hasattr(inqueue, '_writer'):
        inqueue._writer.close()
        outqueue._reader.close()

    if initializer is not None:
        initializer(*initargs)

    completed = 0
    while maxtasks is None or (maxtasks and completed < maxtasks):
        try:
            task = get()
        except (EOFError, IOError):
            debug('worker got EOFError or IOError -- exiting')
            break

        if task is None:
            debug('worker got sentinel -- exiting')
            break

        job, i, func, args, kwds = task
        try:
            result = (True, func(*args, **kwds))
        except Exception, e:
            result = (False, e)
        try:
            put((job, i, result))
        except Exception as e:
            wrapped = MaybeEncodingError(e, result[1])
            debug("Possible encoding error while sending result: %s" % (
                wrapped))
            put((job, i, (False, wrapped)))

        task = job = result = func = args = kwds = None
        completed += 1
    debug('worker exiting after %d tasks' % completed)

#
# Class representing a process pool
#

class Pool(object):
    '''
    Class which supports an async version of the `apply()` builtin
    '''
    Process = Process

    def __init__(self, processes=None, initializer=None, initargs=(),
                 maxtasksperchild=None):
        self._setup_queues()
        self._taskqueue = Queue.Queue()
        self._cache = {}
        self._state = RUN
        self._maxtasksperchild = maxtasksperchild
        self._initializer = initializer
        self._initargs = initargs

        if processes is None:
            try:
                processes = cpu_count()
            except NotImplementedError:
                processes = 1
        if processes < 1:
            raise ValueError("Number of processes must be at least 1")

        if initializer is not None and not hasattr(initializer, '__call__'):
            raise TypeError('initializer must be a callable')

        self._processes = processes
        self._pool = []
        self._repopulate_pool()

        self._worker_handler = threading.Thread(
            target=Pool._handle_workers,
            args=(self, )
            )
        self._worker_handler.daemon = True
        self._worker_handler._state = RUN
        self._worker_handler.start()


        self._task_handler = threading.Thread(
            target=Pool._handle_tasks,
            args=(self._taskqueue, self._quick_put, self._outqueue,
                  self._pool, self._cache)
            )
        self._task_handler.daemon = True
        self._task_handler._state = RUN
        self._task_handler.start()

        self._result_handler = threading.Thread(
            target=Pool._handle_results,
            args=(self._outqueue, self._quick_get, self._cache)
            )
        self._result_handler.daemon = True
        self._result_handler._state = RUN
        self._result_handler.start()

        self._terminate = Finalize(
            self, self._terminate_pool,
            args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
                  self._worker_handler, self._task_handler,
                  self._result_handler, self._cache),
            exitpriority=15
            )

    def _join_exited_workers(self):
        """Cleanup after any worker processes which have exited due to reaching
        their specified lifetime.  Returns True if any workers were cleaned up.
        """
        cleaned = False
        for i in reversed(range(len(self._pool))):
            worker = self._pool[i]
            if worker.exitcode is not None:
                # worker exited
                debug('cleaning up worker %d' % i)
                worker.join()
                cleaned = True
                del self._pool[i]
        return cleaned

    def _repopulate_pool(self):
        """Bring the number of pool processes up to the specified number,
        for use after reaping workers which have exited.
        """
        for i in range(self._processes - len(self._pool)):
            w = self.Process(target=worker,
                             args=(self._inqueue, self._outqueue,
                                   self._initializer,
                                   self._initargs, self._maxtasksperchild)
                            )
            self._pool.append(w)
            w.name = w.name.replace('Process', 'PoolWorker')
            w.daemon = True
            w.start()
            debug('added worker')

    def _maintain_pool(self):
        """Clean up any exited workers and start replacements for them.
        """
        if self._join_exited_workers():
            self._repopulate_pool()

    def _setup_queues(self):
        from .queues import SimpleQueue
        self._inqueue = SimpleQueue()
        self._outqueue = SimpleQueue()
        self._quick_put = self._inqueue._writer.send
        self._quick_get = self._outqueue._reader.recv

    def apply(self, func, args=(), kwds={}):
        '''
        Equivalent of `apply()` builtin
        '''
        assert self._state == RUN
        return self.apply_async(func, args, kwds).get()

    def map(self, func, iterable, chunksize=None):
        '''
        Equivalent of `map()` builtin
        '''
        assert self._state == RUN
        return self.map_async(func, iterable, chunksize).get()

    def imap(self, func, iterable, chunksize=1):
        '''
        Equivalent of `itertools.imap()` -- can be MUCH slower than `Pool.map()`
        '''
        assert self._state == RUN
        if chunksize == 1:
            result = IMapIterator(self._cache)
            self._taskqueue.put((((result._job, i, func, (x,), {})
                         for i, x in enumerate(iterable)), result._set_length))
            return result
        else:
            assert chunksize > 1
            task_batches = Pool._get_tasks(func, iterable, chunksize)
            result = IMapIterator(self._cache)
            self._taskqueue.put((((result._job, i, mapstar, (x,), {})
                     for i, x in enumerate(task_batches)), result._set_length))
            return (item for chunk in result for item in chunk)

    def imap_unordered(self, func, iterable, chunksize=1):
        '''
        Like `imap()` method but ordering of results is arbitrary
        '''
        assert self._state == RUN
        if chunksize == 1:
            result = IMapUnorderedIterator(self._cache)
            self._taskqueue.put((((result._job, i, func, (x,), {})
                         for i, x in enumerate(iterable)), result._set_length))
            return result
        else:
            assert chunksize > 1
            task_batches = Pool._get_tasks(func, iterable, chunksize)
            result = IMapUnorderedIterator(self._cache)
            self._taskqueue.put((((result._job, i, mapstar, (x,), {})
                     for i, x in enumerate(task_batches)), result._set_length))
            return (item for chunk in result for item in chunk)

    def apply_async(self, func, args=(), kwds={}, callback=None):
        '''
        Asynchronous equivalent of `apply()` builtin
        '''
        assert self._state == RUN
        result = ApplyResult(self._cache, callback)
        self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
        return result

    def map_async(self, func, iterable, chunksize=None, callback=None):
        '''
        Asynchronous equivalent of `map()` builtin
        '''
        assert self._state == RUN
        if not hasattr(iterable, '__len__'):
            iterable = list(iterable)

        if chunksize is None:
            chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
            if extra:
                chunksize += 1
        if len(iterable) == 0:
            chunksize = 0

        task_batches = Pool._get_tasks(func, iterable, chunksize)
        result = MapResult(self._cache, chunksize, len(iterable), callback)
        self._taskqueue.put((((result._job, i, mapstar, (x,), {})
                              for i, x in enumerate(task_batches)), None))
        return result

    @staticmethod
    def _handle_workers(pool):
        thread = threading.current_thread()

        # Keep maintaining workers until the cache gets drained, unless the pool
        # is terminated.
        while thread._state == RUN or (pool._cache and thread._state != TERMINATE):
            pool._maintain_pool()
            time.sleep(0.1)
        # send sentinel to stop workers
        pool._taskqueue.put(None)
        debug('worker handler exiting')

    @staticmethod
    def _handle_tasks(taskqueue, put, outqueue, pool, cache):
        thread = threading.current_thread()

        for taskseq, set_length in iter(taskqueue.get, None):
            task = None
            i = -1
            try:
                for i, task in enumerate(taskseq):
                    if thread._state:
                        debug('task handler found thread._state != RUN')
                        break
                    try:
                        put(task)
                    except Exception as e:
                        job, ind = task[:2]
                        try:
                            cache[job]._set(ind, (False, e))
                        except KeyError:
                            pass
                else:
                    if set_length:
                        debug('doing set_length()')
                        set_length(i+1)
                    continue
                break
            except Exception as ex:
                job, ind = task[:2] if task else (0, 0)
                if job in cache:
                    cache[job]._set(ind + 1, (False, ex))
                if set_length:
                    debug('doing set_length()')
                    set_length(i+1)
            finally:
                task = taskseq = job = None
        else:
            debug('task handler got sentinel')

        try:
            # tell result handler to finish when cache is empty
            debug('task handler sending sentinel to result handler')
            outqueue.put(None)

            # tell workers there is no more work
            debug('task handler sending sentinel to workers')
            for p in pool:
                put(None)
        except IOError:
            debug('task handler got IOError when sending sentinels')

        debug('task handler exiting')

    @staticmethod
    def _handle_results(outqueue, get, cache):
        thread = threading.current_thread()

        while 1:
            try:
                task = get()
            except (IOError, EOFError):
                debug('result handler got EOFError/IOError -- exiting')
                return

            if thread._state:
                assert thread._state == TERMINATE
                debug('result handler found thread._state=TERMINATE')
                break

            if task is None:
                debug('result handler got sentinel')
                break

            job, i, obj = task
            try:
                cache[job]._set(i, obj)
            except KeyError:
                pass
            task = job = obj = None

        while cache and thread._state != TERMINATE:
            try:
                task = get()
            except (IOError, EOFError):
                debug('result handler got EOFError/IOError -- exiting')
                return

            if task is None:
                debug('result handler ignoring extra sentinel')
                continue
            job, i, obj = task
            try:
                cache[job]._set(i, obj)
            except KeyError:
                pass
            task = job = obj = None

        if hasattr(outqueue, '_reader'):
            debug('ensuring that outqueue is not full')
            # If we don't make room available in outqueue then
            # attempts to add the sentinel (None) to outqueue may
            # block.  There is guaranteed to be no more than 2 sentinels.
            try:
                for i in range(10):
                    if not outqueue._reader.poll():
                        break
                    get()
            except (IOError, EOFError):
                pass

        debug('result handler exiting: len(cache)=%s, thread._state=%s',
              len(cache), thread._state)

    @staticmethod
    def _get_tasks(func, it, size):
        it = iter(it)
        while 1:
            x = tuple(itertools.islice(it, size))
            if not x:
                return
            yield (func, x)

    def __reduce__(self):
        raise NotImplementedError(
              'pool objects cannot be passed between processes or pickled'
              )

    def close(self):
        debug('closing pool')
        if self._state == RUN:
            self._state = CLOSE
            self._worker_handler._state = CLOSE

    def terminate(self):
        debug('terminating pool')
        self._state = TERMINATE
        self._worker_handler._state = TERMINATE
        self._terminate()

    def join(self):
        debug('joining pool')
        assert self._state in (CLOSE, TERMINATE)
        self._worker_handler.join()
        self._task_handler.join()
        self._result_handler.join()
        for p in self._pool:
            p.join()

    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        # task_handler may be blocked trying to put items on inqueue
        debug('removing tasks from inqueue until task handler finished')
        inqueue._rlock.acquire()
        while task_handler.is_alive() and inqueue._reader.poll():
            inqueue._reader.recv()
            time.sleep(0)

    @classmethod
    def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
                        worker_handler, task_handler, result_handler, cache):
        # this is guaranteed to only be called once
        debug('finalizing pool')

        worker_handler._state = TERMINATE
        task_handler._state = TERMINATE

        debug('helping task handler/workers to finish')
        cls._help_stuff_finish(inqueue, task_handler, len(pool))

        assert result_handler.is_alive() or len(cache) == 0

        result_handler._state = TERMINATE
        outqueue.put(None)                  # sentinel

        # We must wait for the worker handler to exit before terminating
        # workers because we don't want workers to be restarted behind our back.
        debug('joining worker handler')
        if threading.current_thread() is not worker_handler:
            worker_handler.join(1e100)

        # Terminate workers which haven't already finished.
        if pool and hasattr(pool[0], 'terminate'):
            debug('terminating workers')
            for p in pool:
                if p.exitcode is None:
                    p.terminate()

        debug('joining task handler')
        if threading.current_thread() is not task_handler:
            task_handler.join(1e100)

        debug('joining result handler')
        if threading.current_thread() is not result_handler:
            result_handler.join(1e100)

        if pool and hasattr(pool[0], 'terminate'):
            debug('joining pool workers')
            for p in pool:
                if p.is_alive():
                    # worker has not yet exited
                    debug('cleaning up worker %d' % p.pid)
                    p.join()

#
# Class whose instances are returned by `Pool.apply_async()`
#

class ApplyResult(object):

    def __init__(self, cache, callback):
        self._cond = threading.Condition(threading.Lock())
        self._job = job_counter.next()
        self._cache = cache
        self._ready = False
        self._callback = callback
        cache[self._job] = self

    def ready(self):
        return self._ready

    def successful(self):
        assert self._ready
        return self._success

    def wait(self, timeout=None):
        self._cond.acquire()
        try:
            if not self._ready:
                self._cond.wait(timeout)
        finally:
            self._cond.release()

    def get(self, timeout=None):
        self.wait(timeout)
        if not self._ready:
            raise TimeoutError
        if self._success:
            return self._value
        else:
            raise self._value

    def _set(self, i, obj):
        self._success, self._value = obj
        if self._callback and self._success:
            self._callback(self._value)
        self._cond.acquire()
        try:
            self._ready = True
            self._cond.notify()
        finally:
            self._cond.release()
        del self._cache[self._job]

AsyncResult = ApplyResult       # create alias -- see #17805

#
# Class whose instances are returned by `Pool.map_async()`
#

class MapResult(ApplyResult):

    def __init__(self, cache, chunksize, length, callback):
        ApplyResult.__init__(self, cache, callback)
        self._success = True
        self._value = [None] * length
        self._chunksize = chunksize
        if chunksize <= 0:
            self._number_left = 0
            self._ready = True
            del cache[self._job]
        else:
            self._number_left = length//chunksize + bool(length % chunksize)

    def _set(self, i, success_result):
        success, result = success_result
        if success:
            self._value[i*self._chunksize:(i+1)*self._chunksize] = result
            self._number_left -= 1
            if self._number_left == 0:
                if self._callback:
                    self._callback(self._value)
                del self._cache[self._job]
                self._cond.acquire()
                try:
                    self._ready = True
                    self._cond.notify()
                finally:
                    self._cond.release()

        else:
            self._success = False
            self._value = result
            del self._cache[self._job]
            self._cond.acquire()
            try:
                self._ready = True
                self._cond.notify()
            finally:
                self._cond.release()

#
# Class whose instances are returned by `Pool.imap()`
#

class IMapIterator(object):

    def __init__(self, cache):
        self._cond = threading.Condition(threading.Lock())
        self._job = job_counter.next()
        self._cache = cache
        self._items = collections.deque()
        self._index = 0
        self._length = None
        self._unsorted = {}
        cache[self._job] = self

    def __iter__(self):
        return self

    def next(self, timeout=None):
        self._cond.acquire()
        try:
            try:
                item = self._items.popleft()
            except IndexError:
                if self._index == self._length:
                    raise StopIteration
                self._cond.wait(timeout)
                try:
                    item = self._items.popleft()
                except IndexError:
                    if self._index == self._length:
                        raise StopIteration
                    raise TimeoutError
        finally:
            self._cond.release()

        success, value = item
        if success:
            return value
        raise value

    __next__ = next                    # XXX

    def _set(self, i, obj):
        self._cond.acquire()
        try:
            if self._index == i:
                self._items.append(obj)
                self._index += 1
                while self._index in self._unsorted:
                    obj = self._unsorted.pop(self._index)
                    self._items.append(obj)
                    self._index += 1
                self._cond.notify()
            else:
                self._unsorted[i] = obj

            if self._index == self._length:
                del self._cache[self._job]
        finally:
            self._cond.release()

    def _set_length(self, length):
        self._cond.acquire()
        try:
            self._length = length
            if self._index == self._length:
                self._cond.notify()
                del self._cache[self._job]
        finally:
            self._cond.release()

#
# Class whose instances are returned by `Pool.imap_unordered()`
#

class IMapUnorderedIterator(IMapIterator):

    def _set(self, i, obj):
        self._cond.acquire()
        try:
            self._items.append(obj)
            self._index += 1
            self._cond.notify()
            if self._index == self._length:
                del self._cache[self._job]
        finally:
            self._cond.release()

#
#
#

class ThreadPool(Pool):

    from .dummy import Process

    def __init__(self, processes=None, initializer=None, initargs=()):
        Pool.__init__(self, processes, initializer, initargs)

    def _setup_queues(self):
        self._inqueue = Queue.Queue()
        self._outqueue = Queue.Queue()
        self._quick_put = self._inqueue.put
        self._quick_get = self._outqueue.get

    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        # put sentinels at head of inqueue to make workers finish
        inqueue.not_empty.acquire()
        try:
            inqueue.queue.clear()
            inqueue.queue.extend([None] * size)
            inqueue.not_empty.notify_all()
        finally:
            inqueue.not_empty.release()

Youez - 2016 - github.com/yon3zu
LinuXploit