{G_Tc@sLddlZddlmZmZddlmZddlZddlZddlZej j dZ ej j dZ ddl Z ddlZddlZddlZddlZyddlZWnek reZnXddlmZdefdYZd e jfd YZd ejfd YZd efdYZdefdYZdefdYZdejfdYZdej fdYZ dej!fdYZ!dej fdYZ"dej#fdYZ#dej$fdYZ$dej%fd YZ%d!Z&e'd"krHe&ndS(#iN(tverboset cpython_only(tassert_python_oktthreadt threading(t lock_teststCountercBs,eZdZdZdZdZRS(cCs d|_dS(Ni(tvalue(tself((s//usr/local/lib/python2.7/test/test_threading.pyt__init__scCs|jd7_dS(Ni(R(R((s//usr/local/lib/python2.7/test/test_threading.pytincscCs|jd8_dS(Ni(R(R((s//usr/local/lib/python2.7/test/test_threading.pytdecscCs|jS(N(R(R((s//usr/local/lib/python2.7/test/test_threading.pytget s(t__name__t __module__R R R R (((s//usr/local/lib/python2.7/test/test_threading.pyRs   t TestThreadcBseZdZdZRS(cCs>tjj|d|||_||_||_||_dS(Ntname(RtThreadR ttestcasetsematmutextnrunning(RRRRRR((s//usr/local/lib/python2.7/test/test_threading.pyR $s    c Cs tjd}tr/d|j|dfGHn|j|jL|jjtrk|jjGdGHn|jj |jjdkWdQXt j |trdG|jGdGHn|jU|jj |jj |jjdktrd |j|jjfGHnWdQXWdQXdS( Ng@stask %s will run for %.1f usecg.Astasks are runningittasktdoneis$%s is finished. %d tasks are running( trandomRRRRRR R Rt assertTruettimetsleepR (Rtdelay((s//usr/local/lib/python2.7/test/test_threading.pytrun+s&   %   (R RR R(((s//usr/local/lib/python2.7/test/test_threading.pyR#s t BaseTestCasecBseZdZdZRS(cCstjj|_dS(N(ttestt test_supporttthreading_setupt_threads(R((s//usr/local/lib/python2.7/test/test_threading.pytsetUpDscCs$tjj|jtjjdS(N(RR tthreading_cleanupR"t reap_children(R((s//usr/local/lib/python2.7/test/test_threading.pyttearDownGs(R RR#R&(((s//usr/local/lib/python2.7/test/test_threading.pyRCs t ThreadTestscBseZdZdZdZdZdZdZdZdZ dZ d Z d Z d Z ejeed d dZejeed ddZdZRS(cCs_d}tjdd}tj}t}g}xyt|D]k}td|||||}|j||j|jd|j t j dt ||jq@WtrdGHnxw|D]o}|j||j |j |j|jd|j|jdk|j t j dt |qWtrEd GHn|j|jddS( Ni Ris ss!waiting for all tasks to completeissall tasks done(RtBoundedSemaphoretRLockRtrangeRtappendt assertEqualtidenttNoneRtretmatchtreprtstartRtjointis_alivetassertNotEqualt assertFalseR (RtNUMTASKSRRt numrunningtthreadstitt((s//usr/local/lib/python2.7/test/test_threading.pyttest_various_opsPs,     #cs|jtjjdkfd}tjgtj|dj|jddktj d=dS(Ncs$jtjjjdS(N(R+Rt currentThreadR-tset((RR-(s//usr/local/lib/python2.7/test/test_threading.pytfrsi(( R6RR=R-R.tEventRtstart_new_threadtwaitt_active(RR?((RR-s//usr/local/lib/python2.7/test/test_threading.pyt"test_ident_of_no_threading_threadsos  cCs^trdGHnytjdWn!tjk rB|jdnX|jtjddS(Nswith 256kB thread stack size...is4platform does not support changing thread stack sizei(RRt stack_sizeRterrortskipTestR<(R((s//usr/local/lib/python2.7/test/test_threading.pyttest_various_ops_small_stack~s cCs^trdGHnytjdWn!tjk rB|jdnX|jtjddS(Nswith 1MB thread stack size...is4platform does not support changing thread stack sizei(RRRERRFRGR<(R((s//usr/local/lib/python2.7/test/test_threading.pyttest_various_ops_large_stacks cCsyd}tj}|jtj||f}|j|j|tj|jtj|tjtj|=dS(NcSstj|jdS(N(Rtcurrent_threadtrelease(R((s//usr/local/lib/python2.7/test/test_threading.pyR?s ( RtLocktacquireRRAtassertInRCtassertIsInstancet _DummyThread(RR?Rttid((s//usr/local/lib/python2.7/test/test_threading.pyttest_foreign_threads    c sRyddl}Wntk r0|jdnX|jj}dtfdY|j}tj}y)||j ||}x t rqWWnk rnX|j dy|j |dWnt k rnXtjtjdtjffdY}|}t |_|jtrGd GHntrUd GHn||j d|}|j |d trd GHnj}|j|trd GHn|j|j trdGHn||j |j|}|j |dtr dGHnjdd|j|jtr8dGHn|jrN|jndS(Nisrequires ctypestAsyncExccBseZRS((R R(((s//usr/local/lib/python2.7/test/test_threading.pyRSssAsyncExc not raiseditWorkercseZfdZRS(csktj|_t|_y(x!tr>jtjdqWWn$k rft|_jnXdS(Ng?( Rt get_identtidtFalsetfinishedtTrueR>RR(R(RStworker_saw_exceptiontworker_started(s//usr/local/lib/python2.7/test/test_threading.pyRs     (R RR((RSRZR[(s//usr/local/lib/python2.7/test/test_threading.pyRTss started worker threads trying nonsensical thread idis, waiting for worker thread to get starteds" verifying worker hasn't exiteds2 attempting to raise asynch exception in workers5 waiting for worker to say it caught the exceptionttimeouti s all OK -- joining worker(tctypest ImportErrorRGt pythonapitPyThreadState_SetAsyncExct Exceptiont py_objectRRUtc_longRYtfailR,tUnboundLocalErrorRR@RtdaemonR2RRBRRXRVR3( RR]t set_async_exct exceptionRQtresultRTR;tret((RSRZR[s//usr/local/lib/python2.7/test/test_threading.pyttest_PyThreadState_SetAsyncExcsd         %      cCstd}tj}|t_zHtjdd}|jtj|j|j|tjkdWd|t_XdS(NcWstjdS(N(RRF(targs((s//usr/local/lib/python2.7/test/test_threading.pytfail_new_threadsttargetcSsdS(N(R.(((s//usr/local/lib/python2.7/test/test_threading.pytss:Failed to cleanup _limbo map on failure of Thread.start().( Rt_start_new_threadRt assertRaisesRRFR2R6t_limbo(RRmRpR;((s//usr/local/lib/python2.7/test/test_threading.pyttest_limbo_cleanups     cCs`yddl}Wntk r0|jdnXtjtjddg}|j|ddS(Nisrequires ctypess-csKif 1: import ctypes, sys, time, thread # This lock is used as a simple event variable. ready = thread.allocate_lock() ready.acquire() # Module globals are cleared before __del__ is run # So we save the functions in class dict class C: ensure = ctypes.pythonapi.PyGILState_Ensure release = ctypes.pythonapi.PyGILState_Release def __del__(self): state = self.ensure() self.release(state) def waitingThread(): x = C() ready.release() time.sleep(100) thread.start_new_thread(waitingThread, ()) ready.acquire() # Be sure the other thread is waiting. sys.exit(42) i*(R]R^RGt subprocesstcalltsyst executableR,(RR]trc((s//usr/local/lib/python2.7/test/test_threading.pyttest_finalize_runnning_thread s  cCstjtjddgdtjdtj}|j|jj|j|jj|j \}}|j }|j |dkd|j |dkdt |dS( Ns-csOif 1: import sys, threading # A deadlock-killer, to prevent the # testsuite to hang forever def killer(): import os, time time.sleep(2) print 'program blocked; aborting' os._exit(2) t = threading.Thread(target=killer) t.daemon = True t.start() # This is the trace function def func(frame, event, arg): threading.current_thread() return func sys.settrace(func) tstdouttstderrisinterpreted was blockedisUnexpected error: (RttPopenRvRwtPIPEt addCleanupRztcloseR{t communicatet returncodeR6RR1(RtpRzR{Rx((s//usr/local/lib/python2.7/test/test_threading.pyttest_finalize_with_trace1s    cCstjtjddgdtjdtj}|j|jj|j|jj|j \}}|j |j dt j dd|t jj }|j |ddS(Ns-csif 1: import threading from time import sleep def child(): sleep(1) # As a non-daemon thread we SHOULD wake up and nothing # should be torn down yet print "Woke up, sleep function is:", sleep threading.Thread(target=child).start() raise SystemExit RzR{s5Woke up, sleep function is: s ^\[\d+ refs\]t(RtR|RvRwR}R~RzRR{RR,tstripR/tsubt MULTILINE(RRRzR{((s//usr/local/lib/python2.7/test/test_threading.pyttest_join_nondaemon_on_shutdownSs   !cCstj}tj}z~xwtddD]f}tj|dtjdd}|j|j|}|j ||d||fq(WWdtj|XdS(NiidiRncSsdS(N(R.(((s//usr/local/lib/python2.7/test/test_threading.pyRowss&#1703448 triggered after %d trials: %s( Rt enumerateRvtgetcheckintervaltxrangetsetcheckintervalRR2R3t assertNotIn(Rtenumt old_intervalR:R;tl((s//usr/local/lib/python2.7/test/test_threading.pyttest_enumerate_after_joinms      cCsdtfdY}|dt}tj|}|jj~|jd|ddtj ||dt }tj|}|jj~|jd|ddtj |dS(NtRunSelfFunctioncBseZdZdZRS(c SsH||_tjd|jd|fdi|d6|_|jjdS(NRnRltkwargst yet_another(t should_raiseRRt_runRR2(RR((s//usr/local/lib/python2.7/test/test_threading.pyR s   cSs|jrtndS(N(Rt SystemExit(Rt other_refR((s//usr/local/lib/python2.7/test/test_threading.pyRs (R RR R(((s//usr/local/lib/python2.7/test/test_threading.pyRs Rtmsgs%d references still around( tobjectRWtweakreftrefRR3R,R.Rvt getrefcountRY(RRt cyclic_objecttweak_cyclic_objecttraising_cyclic_objecttweak_raising_cyclic_object((s//usr/local/lib/python2.7/test/test_threading.pyttest_no_refcycle_through_targets  tforkstest needs fork()cCsBd}td|\}}}|j|d|j|ddS(Nsif 1: import thread, threading, os, time def background_thread(evt): # Creates and registers the _DummyThread instance threading.current_thread() evt.set() time.sleep(10) evt = threading.Event() thread.start_new_thread(background_thread, (evt,)) evt.wait() assert threading.active_count() == 2, threading.active_count() if os.fork() == 0: assert threading.active_count() == 1, threading.active_count() os._exit(0) else: os.wait() s-cR(RR,(Rtcodet_toutterr((s//usr/local/lib/python2.7/test/test_threading.pyttest_dummy_thread_after_forkssneeds os.fork()cCstj}tjdzxtdD]}tjdd}|jtj}|dkrtj |j r~dndq)|j tj |d\}}|j d|q)WWdtj|XdS(Ni iRncSsdS(N(R.(((s//usr/local/lib/python2.7/test/test_threading.pyRosii(RvRRR*RRR2tosRt_exitR4R3twaitpidR,(RRR:R;tpidtstatus((s//usr/local/lib/python2.7/test/test_threading.pyttest_is_alive_after_forks     " cCsxtddD]}tj|}gt|D]}tjd|j^q2}x|D]}|jqZWx|D]}|jquWgt|D]}tjd|j^q}x|D]}|jqWx|D]}|jqW|jt |jqWdS(Nii Rn( R*RR(RRMR2R3RKRqt ValueError(RtlimittbsRR9R;((s//usr/local/lib/python2.7/test/test_threading.pyttest_BoundedSemaphore_limits+  +  (R RR<RDRHRIRRRkRsRyRRRRtunittestt skipUnlessthasattrRRRR(((s//usr/local/lib/python2.7/test/test_threading.pyR'Ls    X  $ "   $$tThreadJoinOnShutdowncBseZdZdZdZejeeddej e j ekd d Z ejeeddej e j ekd d Z d Zejeeddej e j ekd d Zejeeddej e j ekd dZej e j ekd dZejeeddej e j ekd dZeej edkddZRS(tfreebsd4tfreebsd5tfreebsd6tnetbsd5tos2emxcCsd|}tjtjd|gdtj}|j}|jjjdd}|jj |j |d|j |dkd|j |d kd dS( Nsif 1: import sys, os, time, threading # a thread, which waits for the main program to terminate def joiningfunc(mainthread): mainthread.join() print 'end of thread' s-cRzs Rsend of main end of thread isinterpreter was blockedisUnexpected error( RtR|RvRwR}RBRztreadtreplaceRR,R6R(RtscriptRRxtdata((s//usr/local/lib/python2.7/test/test_threading.pyt _run_and_joins $  cCsd}|j|dS(Nsif 1: import os t = threading.Thread(target=joiningfunc, args=(threading.current_thread(),)) t.start() time.sleep(0.1) print 'end of main' (R(RR((s//usr/local/lib/python2.7/test/test_threading.pyttest_1_join_on_shutdowns Rsneeds os.fork()sdue to known OS bugcCsd}|j|dS(NsFif 1: childpid = os.fork() if childpid != 0: os.waitpid(childpid, 0) sys.exit(0) t = threading.Thread(target=joiningfunc, args=(threading.current_thread(),)) t.start() print 'end of main' (R(RR((s//usr/local/lib/python2.7/test/test_threading.pyttest_2_join_in_forked_process scCsd}|j|dS(Ns9if 1: main_thread = threading.current_thread() def worker(): childpid = os.fork() if childpid != 0: os.waitpid(childpid, 0) sys.exit(0) t = threading.Thread(target=joiningfunc, args=(main_thread,)) print 'end of main' t.start() t.join() # Should not block: main_thread is already stopped w = threading.Thread(target=worker) w.start() (R(RR((s//usr/local/lib/python2.7/test/test_threading.pyt!test_3_join_in_forked_from_threadscCsxtjtjd|gdtj}|j}|jjjj dd}|j |dd|j ||dS(Ns-cRzs RisUnexpected error( RtR|RvRwR}RBRzRtdecodeRR,(RRtexpected_outputRRxR((s//usr/local/lib/python2.7/test/test_threading.pytassertScriptHasOutput4s   !cCsd}|j|ddS(NsDif 1: import os, time, threading finish_join = False start_fork = False def worker(): # Wait until this thread's lock is acquired before forking to # create the deadlock. global finish_join while not start_fork: time.sleep(0.01) # LOCK HELD: Main thread holds lock across this call. childpid = os.fork() finish_join = True if childpid != 0: # Parent process just waits for child. os.waitpid(childpid, 0) # Child process should just return. w = threading.Thread(target=worker) # Stub out the private condition variable's lock acquire method. # This acquires the lock and then waits until the child has forked # before returning, which will release the lock soon after. If # someone else tries to fix this test case by acquiring this lock # before forking instead of resetting it, the test case will # deadlock when it shouldn't. condition = w._block orig_acquire = condition.acquire call_count_lock = threading.Lock() call_count = 0 def my_acquire(): global call_count global start_fork orig_acquire() # LOCK ACQUIRED HERE start_fork = True if call_count == 0: while not finish_join: time.sleep(0.01) # WORKER THREAD FORKS HERE with call_count_lock: call_count += 1 condition.acquire = my_acquire w.start() w.join() print('end of main') s end of main (R(RR((s//usr/local/lib/python2.7/test/test_threading.pyt+test_4_joining_across_fork_in_worker_thread<sFcCs d}d}|j||dS(Nsif True: import os, time, threading start_fork = False def worker(): # Wait until the main thread has attempted to join this thread # before continuing. while not start_fork: time.sleep(0.01) childpid = os.fork() if childpid != 0: # Parent process just waits for child. (cpid, rc) = os.waitpid(childpid, 0) assert cpid == childpid assert rc == 0 print('end of worker thread') else: # Child process should just return. pass w = threading.Thread(target=worker) # Stub out the private condition variable's _release_save method. # This releases the condition's lock and flips the global that # causes the worker to fork. At this point, the problematic waiter # lock has been acquired once by the waiter and has been put onto # the waiters list. condition = w._block orig_release_save = condition._release_save def my_release_save(): global start_fork orig_release_save() # Waiter lock held here, condition lock released. start_fork = True condition._release_save = my_release_save w.start() w.join() print('end of main thread') s(end of worker thread end of main thread (R(RRtoutput((s//usr/local/lib/python2.7/test/test_threading.pyt(test_5_clear_waiter_locks_to_avoid_crashs6cCs/d}td|\}}}|j|dS(Nsif True: import os import random import sys import time import threading thread_has_run = set() def random_io(): '''Loop for a while sleeping random tiny amounts and doing some I/O.''' while True: in_f = open(os.__file__, 'rb') stuff = in_f.read(200) null_f = open(os.devnull, 'wb') null_f.write(stuff) time.sleep(random.random() / 1995) null_f.close() in_f.close() thread_has_run.add(threading.current_thread()) def main(): count = 0 for _ in range(40): new_thread = threading.Thread(target=random_io) new_thread.daemon = True new_thread.start() count += 1 while len(thread_has_run) < count: time.sleep(0.001) # Trigger process shutdown sys.exit(0) main() s-c(RR6(RRRxRR((s//usr/local/lib/python2.7/test/test_threading.pyttest_6_daemon_threadss'cCsnd}g}x=tdD]/}tjd|}|j||jqWx|D]}|jqVWdS(NcSs<tj}|dkr+tj|dn tjddS(Ni(RRRR(R((s//usr/local/lib/python2.7/test/test_threading.pytdo_fork_and_waits  iRn(R*RRR+R2R3(RRR9R:R;((s//usr/local/lib/python2.7/test/test_threading.pyttest_reinit_tls_after_forks   sneed _testcapi modulecsfddfdd_tj}tjz<tjtjxtdD] }qvWWdtj|XdS(NcsS(N((tframeteventtarg(t noop_trace(s//usr/local/lib/python2.7/test/test_threading.pyR scssxdVqdS(Nt genereator((((s//usr/local/lib/python2.7/test/test_threading.pyt generatorscs+jdkr_ntjS(N(tgenR.tnext((tcallbackR(s//usr/local/lib/python2.7/test/test_threading.pyRsi( R.RRvtgettracetsettraceRt _testcapitcall_in_temporary_c_threadR*(Rt old_traceR((RRRs//usr/local/lib/python2.7/test/test_threading.pyttest_frame_tstate_tracings       (RRRRsos2emxN(R Rtplatforms_to_skipRRRRRRtskipIfRvtplatformRRRRRRRRRR.R(((s//usr/local/lib/python2.7/test/test_threading.pyRs$  '' 'H'9$+'tThreadingExceptionTestscBs,eZdZdZdZdZRS(cCs-tj}|j|jt|jdS(N(RRR2Rqt RuntimeError(RR((s//usr/local/lib/python2.7/test/test_threading.pyttest_start_thread_again.s  cCs#tj}|jt|jdS(N(RRJRqRR3(RRJ((s//usr/local/lib/python2.7/test/test_threading.pyttest_joining_current_thread3s cCs#tj}|jt|jdS(N(RRRqRR3(RR((s//usr/local/lib/python2.7/test/test_threading.pyttest_joining_inactive_thread7s cCs3tj}|j|jtt|dtdS(NRf(RRR2RqRtsetattrRY(RR((s//usr/local/lib/python2.7/test/test_threading.pyttest_daemonize_active_thread;s  (R RRRRR(((s//usr/local/lib/python2.7/test/test_threading.pyR+s   t LockTestscBseZeejZRS((R Rt staticmethodRRLtlocktype(((s//usr/local/lib/python2.7/test/test_threading.pyRAst RLockTestscBseZeejZRS((R RRRR)R(((s//usr/local/lib/python2.7/test/test_threading.pyRDst EventTestscBseZeejZRS((R RRRR@t eventtype(((s//usr/local/lib/python2.7/test/test_threading.pyRGstConditionAsRLockTestscBseZeejZRS((R RRRt ConditionR(((s//usr/local/lib/python2.7/test/test_threading.pyRJstConditionTestscBseZeejZRS((R RRRRtcondtype(((s//usr/local/lib/python2.7/test/test_threading.pyRNstSemaphoreTestscBseZeejZRS((R RRRt Semaphoretsemtype(((s//usr/local/lib/python2.7/test/test_threading.pyRQstBoundedSemaphoreTestscBs;eZeejZejej dkddZ RS(tdarwinstest macosx problemcCsd}d}tjtjd|gdtj}|j\}}|jjdd}|j|j dd|j||dS( Ns}if True: import threading def recurse(): return recurse() def outer(): try: recurse() except RuntimeError: pass w = threading.Thread(target=outer) w.start() w.join() print('end of main thread') send of main thread s-cRzs RisUnexpected error( RtR|RvRwR}RRRR,R(RRRRRzR{R((s//usr/local/lib/python2.7/test/test_threading.pyttest_recursion_limitWs ( R RRRR(RRRRvRR(((s//usr/local/lib/python2.7/test/test_threading.pyRTsc Cs/tjjttttttt t t t dS(N( RR t run_unittestRRRRRRRR'RR(((s//usr/local/lib/python2.7/test/test_threading.pyt test_mainws t__main__((ttest.test_supportRRRttest.script_helperRRR/RvR t import_moduleRRRRRRRtRR^R.RRRRRtTestCaseRR'RRRRRRRRRRR (((s//usr/local/lib/python2.7/test/test_threading.pytsF              I#