ó {G_Tc@sdZddlZddlZddlZddlZddlZddlZddlZddlZddlZddl Z ddl Z ddl Z ddl m Z mZmZyddlZWnek rÛdZnXe jjdƒdZe jjZeedƒZe jedƒZeedƒo:ejd kZe jed ƒZd „ZejZd d „Z er´de j!e j"fd„ƒYZ#de j!e j$fd„ƒYZ%nej&d„ƒZ'e jedƒde j(fd„ƒYƒZ)d„Z*e+dkre*ƒndS(s! Test suite for SocketServer.py. iÿÿÿÿN(t reap_childrent reap_threadstverbosetnetworks hello world tAF_UNIXsrequires Unix socketstforktos2srequires forkingcCs#ttdƒrtj|ƒndS(s7Call signal.alarm when it exists (i.e. not on Windows).talarmN(thasattrtsignalR(tn((s2/usr/local/lib/python2.7/test/test_socketserver.pyt signal_alarm#sicCsNt|ggg|ƒ\}}}||kr:|j|ƒStd|f‚dS(Nstimed out on %r(t _real_selecttrecvt RuntimeError(tsockR ttimeouttrtwtx((s2/usr/local/lib/python2.7/test/test_socketserver.pytreceive+s!  tForkingUnixStreamServercBseZRS((t__name__t __module__(((s2/usr/local/lib/python2.7/test/test_socketserver.pyR3stForkingUnixDatagramServercBseZRS((RR(((s2/usr/local/lib/python2.7/test/test_socketserver.pyR7sccsitjƒ}|dkr(tjdƒndVtj|dƒ\}}|j||ƒ|jd|ƒdS(NiiHiiH(tosRt_exittNonetwaitpidt assertEqual(ttestcasetpidtpid2tstatus((s2/usr/local/lib/python2.7/test/test_socketserver.pytsimple_subprocess<s  s!Threading required for this test.tSocketServerTestcBsòeZdZd„Zd„Zd„Zd„Zed„ƒZd„Z d„Z d„Z d „Z e d „ƒZed „ƒZed „ƒZee d „ƒƒZd„Zd„Ze d„ƒZejd„ƒZd„Zed„ƒZRS(sTest all socket servers.cCs tdƒd|_g|_dS(Ni<i(R t port_seedt test_files(tself((s2/usr/local/lib/python2.7/test/test_socketserver.pytsetUpLs  cCs[tdƒtƒx9|jD].}ytj|ƒWqtjk rHqXqWg|j(dS(Ni(R RR%Rtremoveterror(R&tfn((s2/usr/local/lib/python2.7/test/test_socketserver.pyttearDownQs cCs|tjkrtdfSd}tjdkr7d}ntjddd|ƒ}tjdkré|ddkr{|d }n|dtjtj fkr¤|d}ntjd krÎ|j tjtj ƒ}qé|j tj tjƒ}n|j j |ƒ|SdS( NiRs\sockettprefixs unix_socket.tdirit:it/( tsockettAF_INETtHOSTRRtnamettempfiletmktemptseptaltseptreplaceR%tappend(R&tprotoR-R*((s2/usr/local/lib/python2.7/test/test_socketserver.pytpickaddr\s     cCsid|fd„ƒY}d|fd„ƒY}tr:dGHn|||ƒ}|j|j|jjƒƒ|S(NtMyServercBseZd„ZRS(cSs|j|ƒ|jƒ‚dS(N(t close_requestt server_close(R&trequesttclient_address((s2/usr/local/lib/python2.7/test/test_socketserver.pyt handle_errorws  (RRRA(((s2/usr/local/lib/python2.7/test/test_socketserver.pyR<vst MyHandlercBseZd„ZRS(cSs#|jjƒ}|jj|ƒdS(N(trfiletreadlinetwfiletwrite(R&tline((s2/usr/local/lib/python2.7/test/test_socketserver.pythandle}s(RRRH(((s2/usr/local/lib/python2.7/test/test_socketserver.pyRB|sscreating server(RRtserver_addressR0t getsockname(R&taddrtsvrclsthdlrbaseR<RBtserver((s2/usr/local/lib/python2.7/test/test_socketserver.pyt make_serverusc Cs|j|j|jƒ||ƒ}|j}trJdGHdG|GHdG|GHntjdd|d|jdidd 6ƒ}t|_ |j ƒtr—d GHnx6t d ƒD](}tr¼d G|GHn||j|ƒq¤WtrÞd GHn|j ƒ|j ƒtrdGHndS(Nsserver createdsADDR =sCLASS =R3s %s servingttargettkwargsg{®Gáz„?t poll_intervalsserver runningis test clientswaiting for servertdone(ROR;taddress_familyRIRt threadingtThreadt serve_forevertTruetdaemontstarttrangetshutdowntjoin(R&RLRMttestfuncRNRKttti((s2/usr/local/lib/python2.7/test/test_socketserver.pyt run_server†s2            cCs’tj|tjƒ}|j|ƒ|jtƒt|dƒ}}x/|rsd|krst|dƒ}||7}qEW|j|tƒ|jƒdS(Nids (R0t SOCK_STREAMtconnecttsendalltTEST_STRRRtclose(R&R:RKtstbuftdata((s2/usr/local/lib/python2.7/test/test_socketserver.pytstream_examine£s  cCsˆtj|tjƒ}|jt|ƒt|dƒ}}x/|rid|krit|dƒ}||7}q;W|j|tƒ|jƒdS(Nids (R0t SOCK_DGRAMtsendtoReRRRf(R&R:RKRgRhRi((s2/usr/local/lib/python2.7/test/test_socketserver.pyt dgram_examine®scCs |jtjtj|jƒdS(N(Rat SocketServert TCPServertStreamRequestHandlerRj(R&((s2/usr/local/lib/python2.7/test/test_socketserver.pyttest_TCPServer¸s cCs |jtjtj|jƒdS(N(RaRntThreadingTCPServerRpRj(R&((s2/usr/local/lib/python2.7/test/test_socketserver.pyttest_ThreadingTCPServer½s cCs3t|ƒ!|jtjtj|jƒWdQXdS(N(R"RaRntForkingTCPServerRpRj(R&((s2/usr/local/lib/python2.7/test/test_socketserver.pyttest_ForkingTCPServerÂs  cCs |jtjtj|jƒdS(N(RaRntUnixStreamServerRpRj(R&((s2/usr/local/lib/python2.7/test/test_socketserver.pyttest_UnixStreamServerÉs cCs |jtjtj|jƒdS(N(RaRntThreadingUnixStreamServerRpRj(R&((s2/usr/local/lib/python2.7/test/test_socketserver.pyttest_ThreadingUnixStreamServerÏs cCs0t|ƒ|jttj|jƒWdQXdS(N(R"RaRRnRpRj(R&((s2/usr/local/lib/python2.7/test/test_socketserver.pyttest_ForkingUnixStreamServerÕs  cCs |jtjtj|jƒdS(N(RaRnt UDPServertDatagramRequestHandlerRm(R&((s2/usr/local/lib/python2.7/test/test_socketserver.pyttest_UDPServerÝs cCs |jtjtj|jƒdS(N(RaRntThreadingUDPServerR|Rm(R&((s2/usr/local/lib/python2.7/test/test_socketserver.pyttest_ThreadingUDPServerâs cCs3t|ƒ!|jtjtj|jƒWdQXdS(N(R"RaRntForkingUDPServerR|Rm(R&((s2/usr/local/lib/python2.7/test/test_socketserver.pyttest_ForkingUDPServerçs  c#sKtj‰dd‡fd†ƒY}|ƒt_z tjVWdˆt_XdS(s<Mocks the select.select() call to raise EINTR for first callt MockSelectcs eZd„Z‡fd†ZRS(cSs d|_dS(Ni(tcalled(R&((s2/usr/local/lib/python2.7/test/test_socketserver.pyt__init__ôscsP|jd7_|jdkrBtjtjtjtjƒƒ‚n ˆ|ŒSdS(Ni(RƒtselectR)terrnotEINTRRtstrerror(R&targs(t old_select(s2/usr/local/lib/python2.7/test/test_socketserver.pyt__call__÷s$(RRR„R‹((RŠ(s2/usr/local/lib/python2.7/test/test_socketserver.pyR‚ós N((R…(R&R‚((RŠs2/usr/local/lib/python2.7/test/test_socketserver.pytmocked_select_moduleîs    cCsJ|jƒ8}|jtjtj|jƒ}|j|jdƒWdQXdS(Ni(RŒRaRnRoRpRjt assertGreaterRƒ(R&t mock_selectR((s2/usr/local/lib/python2.7/test/test_socketserver.pyttest_InterruptServerSelectCalls   c Csødtjfd„ƒY}dtjfd„ƒY}g}xmtdƒD]_}|tdf|ƒ}tjddd |jd id d 6ƒ}t|_ |j ||fƒqEWx(|D] \}}|j ƒ|j ƒq¯Wx|D]\}}|j ƒqÚWdS( NR<cBseZRS((RR(((s2/usr/local/lib/python2.7/test/test_socketserver.pyR<(sRBcBseZRS((RR(((s2/usr/local/lib/python2.7/test/test_socketserver.pyRB+siiR3sMyServer servingRPRQg{®Gáz„?RR(RnRoRpR[R2RURVRWRXRYR9RZR\R](R&R<RBtthreadsR`RgR_((s2/usr/local/lib/python2.7/test/test_socketserver.pyt test_shutdown$s     (RRt__doc__R'R+R;RORRaRjRmRqRstrequires_forkingRutrequires_unix_socketsRwRyRzR}RRt contextlibtcontextmanagerRŒRR‘(((s2/usr/local/lib/python2.7/test/test_socketserver.pyR#Hs*        cCs2tjƒrtjdƒ‚ntjjtƒdS(Ns"can't run when import lock is held(timpt lock_heldtunittesttSkipTestttestt test_supportt run_unittestR#(((s2/usr/local/lib/python2.7/test/test_socketserver.pyt test_main>s t__main__(,R’R•R—RR…R R0R†R4R™Rnttest.test_supportR›RRRRUt ImportErrorRRœtrequiresReR2RtHAVE_UNIX_SOCKETSt skipUnlessR”R3t HAVE_FORKINGR“R R Rt ForkingMixInRvRtUnixDatagramServerRR–R"tTestCaseR#RžR(((s2/usr/local/lib/python2.7/test/test_socketserver.pytsN                       õ