ó {G_Tc@s÷ddlZddlZddlZddlZddlZddlmZddlZddlZddl m Z ej ej dkdƒdej fd„ƒYƒZdefd „ƒYZd efd „ƒYZd „Zed króeƒndS(iÿÿÿÿN(t run_unittest(tFileIOtposixstests requires a posix system.tTestFileIOSignalInterruptcBs_eZd„Zd„Zd„Zdded„Zd„ZdZd„Z d„Z d „Z RS( cCs d|_dS(N(tNonet_process(tself((s0/usr/local/lib/python2.7/test/test_file_eintr.pytsetUpscCsJ|jrF|jjƒdkrFy|jjƒWqFtk rBqFXndS(N(RtpollRtkilltOSError(R((s0/usr/local/lib/python2.7/test/test_file_eintr.pyttearDown s  cCsdS(sReturns the infile = ... line of code for the reader process. subclasseses should override this to test different IO objects. s9import _io ;infile = _io.FileIO(sys.stdin.fileno(), "rb")((R((s0/usr/local/lib/python2.7/test/test_file_eintr.pyt_generate_infile_setup_code'stcCs¦|jjƒdkrJtjdƒy|jjƒWqJtk rFqJXn|r||jjƒ\}}||7}||7}n|jd||j ƒ|j ƒfƒdS(s;A common way to cleanup and fail with useful debug output. Kills the process if it is still running, collects remaining output and fails the test with an error message including the output. Args: why: Text to go after "Error from IO process" in the message. stdout, stderr: standard output and error from the process so far to include in the error message. communicate: bool, when True we call communicate() on the process after killing it to gather additional output. gš™™™™™¹?s/Error from IO process %s: STDOUT: %sSTDERR: %s N( RRRttimetsleept terminateR t communicatetfailtdecode(RtwhytstdouttstderrRt stdout_endt stderr_end((s0/usr/local/lib/python2.7/test/test_file_eintr.pytfail_with_process_info/s     c CsÜ|jƒ}t|ƒdks*tdƒ‚tjtjddd|dd|ddgd tjd tjd tjƒ|_|jj j td ƒƒ}|d kr»|j d d |ƒn|jj j |ƒd}g}xz|sVtj|jj gdddƒ\}}}|jjtjƒ|d7}|dkrÝ|jjƒ|jdƒqÝqÝW|jj jƒ}|dkr‹|j dd |ƒn|jjddƒ\} } |jjrØ|j d|jj| | dtƒndS(snGeneric buffered read method test harness to validate EINTR behavior. Also validates that Python signal handlers are run during the read. Args: data_to_write: String to write to the child process for reading before sending it a signal, confirming the signal was handled, writing a final newline and closing the infile pipe. read_and_verify_code: Single "line" of code to read from a file object named 'infile' and validate the result. This will be executed as part of a python subprocess fed data_to_write. is#data_to_write must fit in pipe buf.s-us-cs\import io, signal, sys ;signal.signal(signal.SIGINT, lambda s, f: sys.stderr.write("$\n")) ;s ;s"sys.stderr.write("Worm Sign!\n") ;sinfile.close()tstdinRRs Worm Sign! swhile awaiting a signigš™™™™™©?iiÈs,reader process failed to handle our signals.s$ swhile awaiting signaltinputs s exited rc=%dRN(((R tlentAssertionErrort subprocesstPopentsyst executabletPIPERRtreadRRtwritetselectt send_signaltsignaltSIGINTR RtreadlineRt returncodetFalse( Rt data_to_writetread_and_verify_codetinfile_setup_codet worm_signt signals_senttrlistt_t signal_lineRR((s0/usr/local/lib/python2.7/test/test_file_eintr.pyt _test_readingJs>       *        s­got = infile.{read_method_name}() ;expected = {expected!r} ;assert got == expected, ("{read_method_name} returned wrong data.\n""got data %r\nexpected %r" % (got, expected))c Cs/|jddd|jjddddƒƒdS( s1readline() must handle signals and not lose data.R,s hello, world!R-tread_method_nameR)texpectedshello, world! N(R4t_READING_CODE_TEMPLATEtformat(R((s0/usr/local/lib/python2.7/test/test_file_eintr.pyt test_readlinešs   c Cs5|jddd|jjdddddgƒƒd S( s2readlines() must handle signals and not lose data.R,s hello world!R-R5t readlinesR6shello sworld! N(R4R7R8(R((s0/usr/local/lib/python2.7/test/test_file_eintr.pyttest_readlines¢s   c CsZ|jddd|jjddddƒƒ|jddd|jjddddƒƒd S( s0readall() must handle signals and not lose data.R,s hello world!R-R5treadallR6s hello world! R#N(R4R7R8(R((s0/usr/local/lib/python2.7/test/test_file_eintr.pyt test_readallªs     ( t__name__t __module__RR R tTrueRR4R7R9R;R=(((s0/usr/local/lib/python2.7/test/test_file_eintr.pyRs     I  tTestBufferedIOSignalInterruptcBseZd„Zd„ZRS(cCsdS(s?Returns the infile = ... line of code to make a BufferedReader.seinfile = io.open(sys.stdin.fileno(), "rb") ;import _io ;assert isinstance(infile, _io.BufferedReader)((R((s0/usr/local/lib/python2.7/test/test_file_eintr.pyR ºsc Cs/|jddd|jjddddƒƒdS( s<BufferedReader.read() must handle signals and not lose data.R,s hello world!R-R5R#R6s hello world! N(R4R7R8(R((s0/usr/local/lib/python2.7/test/test_file_eintr.pyR=¿s   (R>R?R R=(((s0/usr/local/lib/python2.7/test/test_file_eintr.pyRA¹s tTestTextIOSignalInterruptcBs,eZd„Zd„Zd„Zd„ZRS(cCsdS(s>Returns the infile = ... line of code to make a TextIOWrapper.srinfile = io.open(sys.stdin.fileno(), "rt", newline=None) ;import _io ;assert isinstance(infile, _io.TextIOWrapper)((R((s0/usr/local/lib/python2.7/test/test_file_eintr.pyR Ésc Cs/|jddd|jjddddƒƒdS( s1readline() must handle signals and not lose data.R,s hello, world!R-R5R)R6shello, world! N(R4R7R8(R((s0/usr/local/lib/python2.7/test/test_file_eintr.pyR9Îs   c Cs5|jddd|jjdddddgƒƒd S( s2readlines() must handle signals and not lose data.R,s hello world!R-R5R:R6shello sworld! N(R4R7R8(R((s0/usr/local/lib/python2.7/test/test_file_eintr.pyR;Ös   c Cs/|jddd|jjddddƒƒdS( s-read() must handle signals and not lose data.R,s hello world!R-R5R#R6s hello world! N(R4R7R8(R((s0/usr/local/lib/python2.7/test/test_file_eintr.pyR=Þs   (R>R?R R9R;R=(((s0/usr/local/lib/python2.7/test/test_file_eintr.pyRBÈs   cCsQgtƒjƒD]-}t|tƒrt|tjƒr|^q}t|ŒdS(N(tglobalstvaluest isinstancettypet issubclasstunittesttTestCaseR(ttct test_cases((s0/usr/local/lib/python2.7/test/test_file_eintr.pyt test_mainçs-t__main__(tosR%R'RR ttest.test_supportRRRHt_ioRt skipUnlesstnameRIRRARBRLR>(((s0/usr/local/lib/python2.7/test/test_file_eintr.pyts