ó {G_Tc@s»dZddlZddlZddlZddlZddlZddlZddlmZe edƒsej dƒ‚ndej fd„ƒYZ d„Z ed kr·e ƒndS( s Tests for kqueue wrapper. iÿÿÿÿN(t test_supporttkqueuestest works only on BSDt TestKQueuecBs,eZd„Zd„Zd„Zd„ZRS(cCsptjƒ}|j|jƒdk|jƒƒ|j|j ƒ|jƒ|j|jƒ|jt|jƒdS(Ni(tselectRt assertTruetfilenotclosedtcloset assertRaisest ValueError(tselftkq((s,/usr/local/lib/python2.7/test/test_kqueue.pyttest_create_queues  " cCstjjƒ}tj|ƒ}tjdƒ}|j|j|ƒ|j|jtjƒ|j|j tj ƒ|j|j dƒ|j|j dƒ|j|j dƒ|j||ƒ|j||ƒ|jt||ƒdƒ|j||kƒ|j||kƒ|jtt|dƒ|jtt|dƒ|jtt|dƒtj|tjƒ}|j|j|ƒ|j|jtjƒ|j|j tj ƒ|j|j dƒ|j|j dƒ|j|j dƒ|j||ƒ|j||ƒtj|tjtjƒ}|j|j|ƒ|j|jtjƒ|j|j tjƒ|j|j dƒ|j|j dƒ|j|j dƒ|j||ƒ|j||ƒtjddddd d ƒ}|j|jdƒ|j|jdƒ|j|j dƒ|j|j dƒ|j|j d ƒ|j|j d ƒ|j||ƒ|j||ƒd }tj|ddd|d|ƒ}|j|j|ƒ|j|jdƒ|j|j dƒ|j|j dƒ|j|j |dƒ|j|j |ƒ|j||ƒ|j||ƒdS( Nièiiÿÿÿÿiteviiiiiiÿ(tsyststderrRRtkeventt assertEqualtidenttfiltertKQ_FILTER_READtflagst KQ_EV_ADDtfflagstdatatudatatassertNotEqualtcmpRRt TypeErrortNonetKQ_FILTER_WRITEt KQ_EV_ONESHOT(R tfdR tothertbignum((s,/usr/local/lib/python2.7/test/test_kqueue.pyttest_create_eventsl"c Cstjƒ}|jdƒ|jdƒtjƒ}|jtƒy!|jd|jƒdfƒWn0tjk r’}|j|j dt j ƒnX|j ƒ\}}t jƒ}t jj|jƒƒ}t j|jƒt jt jt jBƒ}|j|gdƒt j|jƒt jt jt jBƒ}|j|gdƒt j|jƒt jt jt jBƒ}|j|gdƒt j|jƒt jt jt jBƒ}|j|gdƒ|jdddƒ} td„| Dƒƒ} |j| t|jƒt jf|jƒt jfgƒƒ|jdƒ|jdƒxYtdƒD]>} |jdddƒ} t| ƒdkrrPntjd ƒqAW|jd ƒtd „| Dƒƒ} |j| t|jƒt jf|jƒt jf|jƒt jf|jƒt jfgƒƒt j|jƒt jt jƒ}|j|gdƒt j|jƒt jt jƒ}|j|gdƒt j|jƒt jt jƒ}|j|gddƒ|jgdd ƒ} td „| Dƒƒ} |j| t|jƒt jfgƒƒ|j ƒ|j ƒ|j ƒdS(Ns 127.0.0.1iiicss!|]}|j|jfVqdS(N(RR(t.0te((s,/usr/local/lib/python2.7/test/test_kqueue.pys xssHello!sworld!!!i gð?s'timeout waiting for event notificationscss!|]}|j|jfVqdS(N(RR(R$R%((s,/usr/local/lib/python2.7/test/test_kqueue.pys ‰sg®Gáz®ï?css!|]}|j|jfVqdS(N(RR(R$R%((s,/usr/local/lib/python2.7/test/test_kqueue.pys Ÿs(s 127.0.0.1i(!tsockettbindtlistent setblockingtFalsetconnectt getsocknameterrorRtargsterrnot EINPROGRESStacceptRRtfromfdRRRRt KQ_EV_ENABLEtcontrolRRtsettsendtrangetlenttimetsleeptfailt KQ_EV_DELETER( R t serverSockettclientR%tservertaddrR tkq2R teventsti((s,/usr/local/lib/python2.7/test/test_kqueue.pyttest_queue_eventTs„     !            cCsôtjƒ}tjƒ\}}|jdƒtj|tjtjtjBƒ}tj|tjtjtjBƒ}|j ||gddƒ}|j |ƒ|j |dj tj @ƒ|j|j|djƒdƒ|jƒ|jƒ|jƒdS(Ntfooii(RRR&t socketpairR6RRRR3R4Rt assertFalseRt KQ_EV_ERRORRtrecvRR(R R tatbtevent1tevent2tr((s,/usr/local/lib/python2.7/test/test_kqueue.pyttestPair§s  ""    (t__name__t __module__R R#RDRO(((s,/usr/local/lib/python2.7/test/test_kqueue.pyRs  < ScCstjtƒdS(N(Rt run_unittestR(((s,/usr/local/lib/python2.7/test/test_kqueue.pyt test_main·st__main__(t__doc__R&R/R9RRtunittestttestRthasattrtSkipTesttTestCaseRRSRP(((s,/usr/local/lib/python2.7/test/test_kqueue.pyts      ¨