ó {G_Tc@sÉdZddladdlZddlZddlZddlZddlmZda dej fd„ƒYZ dej fd„ƒYZ d ej fd „ƒYZ dd „Zed krÅeƒndS( sÕGeneric thread tests. Meant to be used by dummy_thread and thread. To allow for different modules to be used, test_main() can be called with the module to use as the thread implementation as its sole argument. iÿÿÿÿN(t test_supportit LockTestscBs_eZdZd„Zd„Zd„Zd„Zd„Zd„Zd„Z d„Z d „Z RS( sTest lock objects.cCstjƒ|_dS(N(t_threadt allocate_locktlock(tself((s2/usr/local/lib/python2.7/test/test_dummy_thread.pytsetUpscCs|j|jjƒ dƒdS(Ns(Lock object is not initialized unlocked.(t assertTrueRtlocked(R((s2/usr/local/lib/python2.7/test/test_dummy_thread.pyt test_initlockscCs8|jjƒ|jjƒ|j|jjƒ dƒdS(Ns%Lock object did not release properly.(RtacquiretreleaseRR(R((s2/usr/local/lib/python2.7/test/test_dummy_thread.pyt test_releases  cCs|jtj|jjƒdS(N(t assertRaisesRterrorRR (R((s2/usr/local/lib/python2.7/test/test_dummy_thread.pyttest_improper_release%scCs |j|jjdƒdƒdS(Nis)Conditional acquiring of the lock failed.(RRR (R((s2/usr/local/lib/python2.7/test/test_dummy_thread.pyttest_cond_acquire_success)scCs1|jjdƒ|j|jjdƒ dƒdS(Nis=Conditional acquiring of a locked lock incorrectly succeeded.(RR R(R((s2/usr/local/lib/python2.7/test/test_dummy_thread.pyttest_cond_acquire_fail.scCs*|jjƒ|j|jjƒdƒdS(NsUncondional locking failed.(RR RR(R((s2/usr/local/lib/python2.7/test/test_dummy_thread.pyttest_uncond_acquire_success5s cCsB|j|jjdƒtkdƒ|j|jjƒtkƒdS(Nis*Unconditional locking did not return True.(RRR tTrue(R((s2/usr/local/lib/python2.7/test/test_dummy_thread.pyttest_uncond_acquire_return_val;scCs¥d„}|jjƒttjƒƒ}tj||jtfƒtjrWHdtGHn|jjƒttjƒƒ}tjr‡dGHn|j ||tkdƒdS(NcSstj|ƒ|jƒdS(s:Hold on to lock for a set amount of time before unlocking.N(ttimetsleepR (t to_unlocktdelay((s2/usr/local/lib/python2.7/test/test_dummy_thread.pyt delay_unlockCs s@*** Waiting for thread to release the lock (approx. %s sec.) ***tdones+Blocking by unconditional acquiring failed.( RR tintRRtstart_new_threadtDELAYRtverboseR(RRt start_timetend_time((s2/usr/local/lib/python2.7/test/test_dummy_thread.pyttest_uncond_acquire_blockingAs      ( t__name__t __module__t__doc__RR R RRRRRR!(((s2/usr/local/lib/python2.7/test/test_dummy_thread.pyRs        t MiscTestscBs;eZdZd„Zd„Zd„Zd„Zd„ZRS(sMiscellaneous tests.cCs|jttjƒdS(N(R t SystemExitRtexit(R((s2/usr/local/lib/python2.7/test/test_dummy_thread.pyt test_exitYscCs9|jtjƒtdƒ|jtjƒdkdƒdS(Ns*_thread.get_ident() returned a non-integeris_thread.get_ident() returned 0(tassertIsInstanceRt get_identRR(R((s2/usr/local/lib/python2.7/test/test_dummy_thread.pyt test_ident]scCs |jtjƒtjdƒdS(NsR_thread.LockType is not an instance of what is returned by _thread.allocate_lock()(R)RRtLockType(R((s2/usr/local/lib/python2.7/test/test_dummy_thread.pyt test_LockTypedscCs)d„}|jttj|tƒƒdS(NcSstjƒdS(N(Rtinterrupt_main(((s2/usr/local/lib/python2.7/test/test_dummy_thread.pytcall_interruptms(R tKeyboardInterruptRRttuple(RR/((s2/usr/local/lib/python2.7/test/test_dummy_thread.pyttest_interrupt_mainjs cCs|jttjƒdS(N(R R0RR.(R((s2/usr/local/lib/python2.7/test/test_dummy_thread.pyttest_interrupt_in_mainrs(R"R#R$R(R+R-R2R3(((s2/usr/local/lib/python2.7/test/test_dummy_thread.pyR%Vs     t ThreadTestscBs eZdZd„Zd„ZRS(sTest thread creation.cCsttd„}tjdƒ}tj||ttfƒ|jƒ}|j|doZ|ddƒtj|tƒi|d6td6td6ƒ|jƒ}|j|do¯|ddƒtj||tfitd6ƒ|jƒ}|j|doù|dd ƒdS( NcSs|j||fƒdS(s<Use to test _thread.start_new_thread() passes args properly.N(tput(tqueuetarg1targ2((s2/usr/local/lib/python2.7/test/test_dummy_thread.pyt arg_tester|siis7Argument passing for thread creation using tuple failedR6R7R8s8Argument passing for thread creation using kwargs failedsGArgument passing for thread creation using both tuple and kwargs failed(tFalsetQueueRRRtgetRR1(RR9t testing_queuetresult((s2/usr/local/lib/python2.7/test/test_dummy_thread.pyttest_arg_passingzs    cCsÓd„}d}tj|ƒ}tjr:Hdt|fGHnxNt|ƒD]@}trkttjƒdƒ}nd}tj|||fƒqGWt j tƒtjr©dGHn|j |j ƒ|kd|tfƒdS(NcSs$tj|ƒ|jtjƒƒdS(s@Wait for ``delay`` seconds and then put something into ``queue``N(RRR5RR*(R6R((s2/usr/local/lib/python2.7/test/test_dummy_thread.pyt queue_mark’s isJ*** Testing multiple thread creation (will take approx. %s to %s sec.) ***iiRs2Not all %s threads executed properly after %s sec.( R;RRRtxrangetroundtrandomRRRRRtqsize(RR@t thread_countR=tcountt local_delay((s2/usr/local/lib/python2.7/test/test_dummy_thread.pyttest_multi_creations&     (R"R#R$R?RH(((s2/usr/local/lib/python2.7/test/test_dummy_thread.pyR4ws cCsB|r|adantjr+HdtGHntjtttƒdS(Nis"*** Using %s as _thread module ***(RRRRt run_unittestRR%R4(timported_module((s2/usr/local/lib/python2.7/test/test_dummy_thread.pyt test_main«s   t__main__(R$t dummy_threadRRR;RCtunittestttestRRtTestCaseRR%R4tNoneRKR"(((s2/usr/local/lib/python2.7/test/test_dummy_thread.pyts     D!4