ó {G_Tc@s5ddlZddlZddlZddlZddlZddlmZddlmZejdƒZ ej Z e ƒZ dd„Zdefd„ƒYZd„Zd „Zd efd „ƒYZd e fd „ƒYZeZdefd„ƒYZdefd„ƒYZdd„Zedkr1eƒndS(iÿÿÿÿN(tTestCase(t test_supportt threadingc Csû|jdƒ|jƒzÒy·|jƒ\}}|rÆd}|jtdƒ}|jƒxl|D]a}|tkrtPnt|ƒtt gkrœt j |ƒn ||7}|j |ƒ}||}q^Wn|j ƒWntjk rçnXWd|j ƒXdS(sÎ Open a tcp server in three steps 1) set evt to true to let the parent know we are ready 2) [optional] if is not False, write the list of data from dataq.get() to the socket. itgà?N(tlistentsettaccepttgettTruet task_donet EOF_sigilttypetinttfloatttimetsleeptsendtclosetsocketttimeout( tevttservtdataqtconntaddrtdatatnew_datatitemtwritten((s//usr/local/lib/python2.7/test/test_telnetlib.pytservers*      t GeneralTestscBsPeZd„Zd„Zd„Zd„Zd„Zd„Zd„Zd„Z RS(cCs¤tjƒ|_tjtjtjƒ|_|jjdƒtj |jƒ|_ tj dt d|j|jfƒ|_ |j jtƒ|j jƒ|jjƒdS(Ni<ttargettargs(RtEventRRtAF_INETt SOCK_STREAMtsockt settimeoutRt bind_porttporttThreadRtthreadt setDaemonRtstarttwait(tself((s//usr/local/lib/python2.7/test/test_telnetlib.pytsetUp-s' cCs|jjƒdS(N(R)tjoin(R-((s//usr/local/lib/python2.7/test/test_telnetlib.pyttearDown7scCs&tjt|jƒ}|jjƒdS(N(t telnetlibtTelnettHOSTR'R$R(R-ttelnet((s//usr/local/lib/python2.7/test/test_telnetlib.pyt testBasic:scCsz|jtjƒdkƒtjdƒztjt|jƒ}WdtjdƒX|j |j j ƒdƒ|j j ƒdS(Ni( t assertTrueRtgetdefaulttimeouttNonetsetdefaulttimeoutR1R2R3R't assertEqualR$t gettimeoutR(R-R4((s//usr/local/lib/python2.7/test/test_telnetlib.pyttestTimeoutDefault?s cCsƒ|jtjƒdkƒtjdƒztjt|jddƒ}WdtjdƒX|j|j j ƒdkƒ|j j ƒdS(NiR( R6RR7R8R9R1R2R3R'R$R;R(R-R4((s//usr/local/lib/python2.7/test/test_telnetlib.pyttestTimeoutNoneIs cCsEtjt|jddƒ}|j|jjƒdƒ|jjƒdS(NRi(R1R2R3R'R:R$R;R(R-R4((s//usr/local/lib/python2.7/test/test_telnetlib.pyttestTimeoutValueTscCsOtjƒ}|jt|jddƒ|j|jjƒdƒ|jjƒdS(NRi( R1R2topenR3R'R:R$R;R(R-R4((s//usr/local/lib/python2.7/test/test_telnetlib.pyttestTimeoutOpenYs cCsgtjt|jddƒ}|j}|j|jƒ|ƒ|j|jƒ|jƒƒ|jjƒdS(NRi( R1R2R3R'R$R:t get_sockettfilenoR(R-R4tt_sock((s//usr/local/lib/python2.7/test/test_telnetlib.pyt testGetters_s  ( t__name__t __module__R.R0R5R<R=R>R@RD(((s//usr/local/lib/python2.7/test/test_telnetlib.pyR+s    cCs©tjƒ|_tjƒ|_tjtjtjƒ|_|jj dƒt j |jƒ|_ tj dtd|j|j|jfƒ|_|jjƒ|jjƒdS(Ni RR (RR!RtQueueRRR"R#R$R%RR&R'R(RR)R+R,(R-((s//usr/local/lib/python2.7/test/test_telnetlib.pyt _read_setUpgs- cCs|jjƒdS(N(R)R/(R-((s//usr/local/lib/python2.7/test/test_telnetlib.pyt_read_tearDownqst ReadTestscBsæeZeZeZdZdZd„Zd„Z d„Z d„Z d„Z d„Z d„Zd „Zd „Zd „Zd „Zd „Zd„Zd„Zd„Zd„Zd„Zd„Zd„Zd„Zd„Zd„ZRS(g333333ã?g333333Ó?cCstddd tg}|jj|ƒtjt|jƒ}|jjƒ|jdƒ}|j |dj|d ƒƒdS( s« read_until(expected, [timeout]) Read until the expected string has been seen, or a timeout is hit (default is no timeout); may block. txi tmatchtyRiþÿÿÿNt xxxxxxxxxxt yyyyyyyyyy( R RtputR1R2R3R'R/t read_untilR:(R-twantR4R((s//usr/local/lib/python2.7/test/test_telnetlib.pyttest_read_until_A|s  cCsŠd|jdtg}|jj|ƒtjt|jƒ}|jjƒ|j d|j ƒ}|j ||dƒ|j |j ƒdƒdS(Nthellosnot seeni( t block_longR RRPR1R2R3R'R/RQt block_shortR:tread_all(R-RRR4R((s//usr/local/lib/python2.7/test/test_telnetlib.pyttest_read_until_B‰s cCs˜d dd tg}|jj|ƒtjt|jƒ}|jsRtj dƒ‚nt |_|jj ƒ|j dƒ}|j |dj |d ƒƒdS( s3Use select.poll() to implement telnet.read_until().RKi RLRMsselect.poll() is requiredRiþÿÿÿNRNRO(R RRPR1R2R3R't _has_polltunittesttSkipTestRR/RQR:(R-RRR4R((s//usr/local/lib/python2.7/test/test_telnetlib.pyttest_read_until_with_poll“s   cCs}ddd tg}|jj|ƒtjt|jƒ}t|_|jj ƒ|j dƒ}|j |dj |d ƒƒdS( s5Use select.select() to implement telnet.read_until().RKi RLRMRiþÿÿÿNRNRO( R RRPR1R2R3R'tFalseRYR/RQR:(R-RRR4R((s//usr/local/lib/python2.7/test/test_telnetlib.pyttest_read_until_with_selectŸs  cCs}ddddddtg}|jj|ƒtjt|jƒ}|jjƒ|jƒ}|j |dj|d ƒƒdS(sJ read_all() Read all data until EOF; may block. RKiôRMtzRiÿÿÿÿN( R RRPR1R2R3R'R/RWR:(R-RRR4R((s//usr/local/lib/python2.7/test/test_telnetlib.pyttest_read_all_A©s   cCs_|jj|jtgƒ|jjƒtjƒ}|ƒ}|j|jtjƒ|kƒdS(N(RRPRUR R/RR6RV(R-tfuncR+R((s//usr/local/lib/python2.7/test/test_telnetlib.pyt_test_blockingµs    cCs#|jtjt|jƒjƒdS(N(RbR1R2R3R'RW(R-((s//usr/local/lib/python2.7/test/test_telnetlib.pyttest_read_all_B¼scCsM|jjtgƒtjt|jƒ}|jjƒ|jƒ|jƒdS(N( RRPR R1R2R3R'R/RW(R-R4((s//usr/local/lib/python2.7/test/test_telnetlib.pyttest_read_all_C¿s   cCskddtg}|jj|ƒtjt|jƒ}|jjƒ|jƒ}|j t |ƒdkƒdS(sQ read_some() Read at least one byte or EOF; may block. RKiôiN( R RRPR1R2R3R'R/RWR6tlen(R-RRR4R((s//usr/local/lib/python2.7/test/test_telnetlib.pyttest_read_some_AÆs   cCsO|jjtgƒtjt|jƒ}|jjƒ|jd|j ƒƒdS(NR( RRPR R1R2R3R'R/R:t read_some(R-R4((s//usr/local/lib/python2.7/test/test_telnetlib.pyttest_read_some_BÓs cCs#|jtjt|jƒjƒdS(N(RbR1R2R3R'Rg(R-((s//usr/local/lib/python2.7/test/test_telnetlib.pyttest_read_some_CÚscCsÓ|jddddtg}|d|d}|jj|ƒtjt|jƒ}|jjƒt ||ƒ}d}xFt r¾y'||ƒ7}|j |j |ƒƒWqyt k rºPqyXqyW|j||ƒdS(s‚ read_very_eager() Read all data available already queued or on the socket, without blocking. RKidRMiiRN(RUR RRPR1R2R3R'R/tgetattrRR6t startswithtEOFErrorR:(R-t func_nameRRtexpectsR4RaR((s//usr/local/lib/python2.7/test/test_telnetlib.pyt_test_read_any_eager_AÝs     cCsh|jjtgƒtjt|jƒ}|jjƒtj |j ƒt ||ƒ}|j t |ƒdS(N(RRPR R1R2R3R'R/RRRVRjt assertRaisesRl(R-RmR4Ra((s//usr/local/lib/python2.7/test/test_telnetlib.pyt_test_read_any_eager_Bòs  cCs|jdƒdS(Ntread_very_eager(Ro(R-((s//usr/local/lib/python2.7/test/test_telnetlib.pyttest_read_very_eager_AýscCs|jdƒdS(NRr(Rq(R-((s//usr/local/lib/python2.7/test/test_telnetlib.pyttest_read_very_eager_BÿscCs|jdƒdS(Nt read_eager(Ro(R-((s//usr/local/lib/python2.7/test/test_telnetlib.pyttest_read_eager_AscCs|jdƒdS(NRu(Rq(R-((s//usr/local/lib/python2.7/test/test_telnetlib.pyttest_read_eager_BscCsb|jjtgƒtjt|jƒ}|jjƒt||ƒ}|j ƒ|j t |ƒdS(N( RRPR R1R2R3R'R/Rjt fill_rawqRpRl(R-RmR4Ra((s//usr/local/lib/python2.7/test/test_telnetlib.pyt_test_read_any_lazy_Bs   cCsïddtg}|jj|ƒtjt|jƒ}|jjƒtj |j ƒ|j d|j ƒƒd}xft rÖy-|j ƒ}||7}|s£|jƒnWntk r¸PnX|j|dj|ƒƒqqW|j ||dƒdS(NRKidRi(R RRPR1R2R3R'R/RRRVR:t read_lazyRRxRlR6Rk(R-RRR4Rt read_data((s//usr/local/lib/python2.7/test/test_telnetlib.pyttest_read_lazy_As"     cCs|jdƒdS(NRz(Ry(R-((s//usr/local/lib/python2.7/test/test_telnetlib.pyttest_read_lazy_B#scCs ddtg}|jj|ƒtjt|jƒ}|jjƒtj |j ƒ|j d|j ƒƒd}xƒt róy|j ƒ}Wntk r›PnX||7}|sÖ|jƒ|j d|jƒ|jƒn|j|dj|ƒƒqqW|j ||dƒdS(NRKidRi(R RRPR1R2R3R'R/RRRVR:tread_very_lazyRRlRxtcookedqt process_rawqR6Rk(R-RRR4RR{((s//usr/local/lib/python2.7/test/test_telnetlib.pyttest_read_very_lazy_A&s&      cCs|jdƒdS(NR~(Ry(R-((s//usr/local/lib/python2.7/test/test_telnetlib.pyttest_read_very_lazy_B;s(RERFRHR.RIR0RURVRSRXR\R^R`RbRcRdRfRhRiRoRqRsRtRvRwRyR|R}RR‚(((s//usr/local/lib/python2.7/test/test_telnetlib.pyRJts4              tnego_collectorcBseZdd„Zd„ZRS(cCsd|_||_d|_dS(NR(tseent sb_gettertsb_seen(R-R…((s//usr/local/lib/python2.7/test/test_telnetlib.pyt__init__?s  cCsM|j||7_|tjkrI|jrI|jƒ}|j|7_ndS(N(R„ttltSER…R†(R-R$tcmdtopttsb_data((s//usr/local/lib/python2.7/test/test_telnetlib.pytdo_negoDs N(RERFR8R‡R(((s//usr/local/lib/python2.7/test/test_telnetlib.pyRƒ>s t OptionTestscBseeZeZeZejejej ej ej ej ej ejgZd„Zd„Zd„ZRS(cCsõ|jƒ|jj|ƒtjt|jƒ}|jjƒtƒ}|j |j ƒ|j ƒ}|j }|j t|ƒdkƒ|j|d|jƒ|j|dtjƒ|jtdj|d ƒƒt||ƒƒd|_|jƒdS(s helper for testing IAC + cmd iiRiÿÿÿÿN(R.RRPR1R2R3R'R/Rƒtset_option_negotiation_callbackRRWR„R6RetassertIntcmdsR:RˆtNOOPTR8R…R0(R-RR4tnegottxtRŠ((s//usr/local/lib/python2.7/test/test_telnetlib.pyt _test_commandQs     - cCs|jjtgƒtjt|jƒ}|jjƒ|jƒxs|j D]h}|j ddt j |ddtgƒ|j dt j |dtgƒ|j t j |tgƒqIW|j g|j D]}t j |^qÅtgƒ|j d|jƒƒdS(NRKidRMi RRNRO(RRPR R1R2R3R'R/R0R‘R•RˆtIACR:t read_sb_data(R-R4RŠ((s//usr/local/lib/python2.7/test/test_telnetlib.pyttest_IAC_commandsbs  ( 1cCs¥tjtjtjtjtjtjtjtjtjtjtjtjtjtjdtjtjtjtjdtjtjtjtjtjtjdtjtjdtjtjtg}|jj|ƒtjt |j ƒ}|jj ƒt |j ƒ}|j|jƒ|jƒ}|j|dƒtjtjdtjdtjd}|j|j|ƒ|jd|j ƒƒd|_dS(NtaatbbtcctddRtaabb(RˆR–tSBR‰R RRPR1R2R3R'R/RƒR—RRRWR:R†R8R…(R-RR4R“R”t want_sb_data((s//usr/local/lib/python2.7/test/test_telnetlib.pyttest_SB_commandsqs")--1   *(RERFRHR.RIR0RˆtAOtAYTtBRKtECtELtGAtIPtNOPR‘R•R˜R (((s//usr/local/lib/python2.7/test/test_telnetlib.pyRŽKs 6  t ExpectTestscBsJeZd„Zd„ZdZdZd„Zd„Zd„Zd„Z RS(cCs©tjƒ|_tjƒ|_tjtjtjƒ|_|jj dƒt j |jƒ|_ tj dtd|j|j|jfƒ|_|jjƒ|jjƒdS(Ni RR (RR!RRGRRR"R#R$R%RR&R'R(RR)R+R,(R-((s//usr/local/lib/python2.7/test/test_telnetlib.pyR.ˆs cCs|jjƒdS(N(R)R/(R-((s//usr/local/lib/python2.7/test/test_telnetlib.pyR0“sg333333ã?g333333Ó?cCs€ddd tg}|jj|ƒtjt|jƒ}|jjƒ|jdgƒ\}}}|j |dj|d ƒƒdS( s§ expect(expected, [timeout]) Read until the expected string has been seen, or a timeout is hit (default is no timeout); may block. RKi RLRMRiþÿÿÿNRNRO( R RRPR1R2R3R'R/texpectR:(R-RRR4t_R((s//usr/local/lib/python2.7/test/test_telnetlib.pyt test_expect_Ašs  cCs–d|jdtg}|jj|ƒtjt|jƒ}|jjƒ|j dg|j ƒ\}}}|j ||dƒ|j |j ƒdƒdS(NRTsnot seeni( RUR RRPR1R2R3R'R/RªRVR:RW(R-RRR4R«R((s//usr/local/lib/python2.7/test/test_telnetlib.pyt test_expect_B§s !cCs¤d dd tg}|jj|ƒtjt|jƒ}|jsRtj dƒ‚nt |_|jj ƒ|j dgƒ\}}}|j |dj |d ƒƒdS( s/Use select.poll() to implement telnet.expect().RKi RLRMsselect.poll() is requiredRiþÿÿÿNRNRO(R RRPR1R2R3R'RYRZR[RR/RªR:(R-RRR4R«R((s//usr/local/lib/python2.7/test/test_telnetlib.pyttest_expect_with_poll±s   cCs‰ddd tg}|jj|ƒtjt|jƒ}t|_|jj ƒ|j dgƒ\}}}|j |dj |d ƒƒdS( s1Use select.select() to implement telnet.expect().RKi RLRMRiþÿÿÿNRNRO( R RRPR1R2R3R'R]RYR/RªR:(R-RRR4R«R((s//usr/local/lib/python2.7/test/test_telnetlib.pyttest_expect_with_select½s  ( RERFR.R0RURVR¬R­R®R¯(((s//usr/local/lib/python2.7/test/test_telnetlib.pyR©‡s  cCstjttttƒdS(N(Rt run_unittestRRJRŽR©(tverbose((s//usr/local/lib/python2.7/test/test_telnetlib.pyt test_mainÈst__main__(RR1RRGRZRttestRt import_moduleRR3tobjectR R8RRRHRIRJRƒRˆRŽR©R²RE(((s//usr/local/lib/python2.7/test/test_telnetlib.pyts*        < Ê <A