#!/usr/bin/env python
"""Start process; wait 2 seconds; kill the process; print all process output."""
import subprocess
import tempfile
import time
def main(args=None, timeout=2):
    """Run a command for at most *timeout* seconds, then print all its output.

    The command's stdout is redirected to an anonymous temporary file.  Once
    the command has exited on its own, or has been terminated at the
    deadline, the file content is copied verbatim to this script's stdout.

    Args:
        args: Command line to run, as a sequence of strings.  Defaults to
            ``["top"]``.
        timeout: Maximum number of seconds the command is allowed to run.
    """
    import sys  # local import: this script's import block does not provide it

    if not args:
        args = ["top"]
    # open temporary file (it is automatically deleted when it is closed)
    # `Popen` requires `f.fileno()` so `SpooledTemporaryFile` adds nothing here
    with tempfile.TemporaryFile() as f:
        # start process, redirect stdout
        p = subprocess.Popen(list(args), stdout=f)
        try:
            # wait up to `timeout` seconds; stop early if the process exits
            deadline = time.time() + timeout
            while p.poll() is None and time.time() < deadline:
                time.sleep(0.05)
        finally:
            # kill process (also on errors, so that no child is left behind)
            # NOTE: if it doesn't kill the process then `p.wait()` blocks forever
            if p.poll() is None:
                p.terminate()
            p.wait()  # wait for the process to terminate otherwise the output is garbled
        # print saved output
        f.seek(0)  # rewind to the beginning of the file
        # The file holds raw bytes: Python 3 needs the binary layer of stdout,
        # Python 2's stdout accepts byte strings directly.
        out = getattr(sys.stdout, "buffer", sys.stdout)
        out.write(f.read())
        out.flush()


if __name__ == "__main__":
    main()
Tail-like Solutions that print only the portion of the output
You could read the process output in another thread and save the required number of the last lines in a queue:
import collections
import subprocess
import time
import threading
def read_output(process, append):
    """Feed every line of the process' stdout to *append* until end-of-file.

    Args:
        process: Object whose ``stdout`` attribute has a ``readline()``
            method, e.g. a ``subprocess.Popen`` created with
            ``stdout=subprocess.PIPE``.
        append: Callable invoked once per line, in order.
    """
    readline = process.stdout.readline
    while True:
        line = readline()
        # End-of-file is an empty chunk: "" for text streams, b"" for binary
        # ones.  Testing truthiness covers both; comparing against "" alone
        # would never match b"" and would loop forever on a binary pipe.
        if not line:
            break
        append(line)
def main(args=None, timeout=2, number_of_lines=200):
    """Run a command for at most *timeout* seconds and print the tail of its output.

    The command's stdout is consumed by a background thread running
    `read_output`, which keeps only the last *number_of_lines* lines.

    Args:
        args: Command line to run, as a sequence of strings.  Defaults to
            ``["top"]``.
        timeout: Maximum number of seconds the command is allowed to run.
        number_of_lines: How many trailing lines of output to keep and print.
    """
    import sys  # local import: this script's import block does not provide it

    if not args:
        args = ["top"]
    # start process, redirect stdout
    process = subprocess.Popen(list(args), stdout=subprocess.PIPE, close_fds=True)
    # save last `number_of_lines` lines of the process output
    q = collections.deque(maxlen=number_of_lines)  # atomic .append()
    reader = threading.Thread(target=read_output, args=(process, q.append))
    reader.daemon = True  # a stuck reader must never keep the interpreter alive
    try:
        reader.start()
        # The reader returns at end-of-file, so this waits until the process
        # closes its stdout or until the timeout expires, whichever is first.
        reader.join(timeout)
    finally:
        if process.poll() is None:
            process.terminate()  # NOTE: it doesn't ensure the process termination
    # Let the reader drain what is left in the pipe before the lines are
    # printed; the wait is bounded so a process that ignores the termination
    # request cannot hang the script.
    reader.join(timeout)
    process.poll()  # reap the child if it has exited by now
    # print saved lines (snapshot first: the reader may still be appending)
    lines = list(q)
    # The pipe yields raw bytes: Python 3 needs the binary layer of stdout,
    # Python 2's stdout accepts byte strings directly.
    out = getattr(sys.stdout, "buffer", sys.stdout)
    out.write(b"".join(lines) + b"\n")
    out.flush()


if __name__ == "__main__":
    main()
This variant requires q.append()
to be an atomic operation. Otherwise the output might be corrupted.
signal.alarm()
solution
You could use signal.alarm()
to call the process.terminate()
after specified timeout instead of reading in another thread. Though it might not interact very well with the subprocess
module. Based on @Alex Martelli’s answer:
import collections
import signal
import subprocess
class Alarm(Exception):
    """Raised by the SIGALRM handler to report that the timeout has expired."""
def alarm_handler(signum, frame):
    """Signal handler that converts the delivered signal into an `Alarm`.

    Both arguments are required by the signal-handler calling convention and
    are not used.
    """
    raise Alarm()
def main(args=None, timeout=2, number_of_lines=200):
    """Run a command for at most *timeout* seconds and print the tail of its output.

    A SIGALRM is scheduled for *timeout* seconds ahead; `alarm_handler` turns
    it into an `Alarm` exception that interrupts the reading loop, after
    which the command is terminated.  Works only where SIGALRM exists.

    Args:
        args: Command line to run, as a sequence of strings.  Defaults to
            ``["top"]``.
        timeout: Whole number of seconds the command is allowed to run.
        number_of_lines: How many trailing lines of output to keep and print.
    """
    import sys  # local import: this script's import block does not provide it

    if not args:
        args = ["top"]
    # save last `number_of_lines` lines of the process output; created up
    # front so that it is always bound when the lines are printed
    q = collections.deque(maxlen=number_of_lines)
    # start process, redirect stdout
    process = subprocess.Popen(list(args), stdout=subprocess.PIPE, close_fds=True)
    # set signal handler, remembering the one it replaces
    previous_handler = signal.signal(signal.SIGALRM, alarm_handler)
    signal.alarm(timeout)  # produce SIGALRM in `timeout` seconds
    try:
        # b"" marks end-of-file on a binary pipe under both Python 2 and 3
        for line in iter(process.stdout.readline, b""):
            q.append(line)
        signal.alarm(0)  # cancel alarm
    except Alarm:
        process.terminate()
    finally:
        signal.alarm(0)  # make sure no alarm stays pending on other errors
        # `signal.signal()` returns None for handlers not installed from
        # Python; those cannot be reinstalled.
        if previous_handler is not None:
            signal.signal(signal.SIGALRM, previous_handler)
        # print saved lines
        # The pipe yields raw bytes: Python 3 needs the binary layer of
        # stdout, Python 2's stdout accepts byte strings directly.
        out = getattr(sys.stdout, "buffer", sys.stdout)
        out.write(b"".join(q) + b"\n")
        out.flush()
    # Release the pipe and reap the child.
    # NOTE: this blocks if the process ignores the termination request.
    process.stdout.close()
    process.wait()


if __name__ == "__main__":
    main()
This approach works only on *nix systems. It might block if process.stdout.readline()
doesn’t return.
threading.Timer
solution
import collections
import subprocess
import threading
def main(args=None, timeout=2, number_of_lines=200):
    """Run a command for at most *timeout* seconds and print the tail of its output.

    A `threading.Timer` terminates the command when the timeout expires;
    meanwhile the main thread reads the command's stdout until end-of-file,
    keeping only the last *number_of_lines* lines.

    Args:
        args: Command line to run, as a sequence of strings.  Defaults to
            ``["top"]``.
        timeout: Maximum number of seconds the command is allowed to run.
        number_of_lines: How many trailing lines of output to keep and print.
    """
    import sys  # local import: this script's import block does not provide it

    if not args:
        args = ["top"]
    # start process, redirect stdout
    process = subprocess.Popen(list(args), stdout=subprocess.PIPE, close_fds=True)
    # terminate process in timeout seconds
    timer = threading.Timer(timeout, process.terminate)
    timer.start()
    try:
        # save last `number_of_lines` lines of the process output
        # (reads until end-of-file, i.e. until the process closes its stdout)
        q = collections.deque(process.stdout, maxlen=number_of_lines)
    finally:
        # Cancel before reaping so the timer cannot signal a finished
        # process, and so it is not left running if reading failed.
        timer.cancel()
    # Release the pipe and reap the child.
    # NOTE: this blocks if the process ignores the termination request.
    process.stdout.close()
    process.wait()
    # print saved lines
    # The pipe yields raw bytes: Python 3 needs the binary layer of stdout,
    # Python 2's stdout accepts byte strings directly.
    out = getattr(sys.stdout, "buffer", sys.stdout)
    out.write(b"".join(q))
    out.flush()


if __name__ == "__main__":
    main()
This approach should also work on Windows. Here I’ve used process.stdout
as an iterable; it might introduce an additional output buffering, you could switch to the iter(process.stdout.readline, "")
approach if it is not desirable. If the process doesn’t terminate on process.terminate()
then the script hangs.
No threads, no signals solution
import collections
import subprocess
import sys
import time
def main(args=None, timeout=2, number_of_lines=200):
    """Run a command for at most *timeout* seconds and print the tail of its output.

    The command's stdout is read line by line in the calling thread; the
    elapsed time is checked before each read.  A read that blocks therefore
    delays the timeout check until the next line arrives.

    Args:
        args: Command line to run, as a sequence of strings.  Defaults to the
            script's command-line arguments, or ``['top']`` if there are none.
        timeout: Maximum number of seconds the command is allowed to run.
        number_of_lines: How many trailing lines of output to keep and print.
    """
    if args is None:
        args = sys.argv[1:]
    if not args:
        args = ['top']
    # start process, redirect stdout
    process = subprocess.Popen(list(args), stdout=subprocess.PIPE, close_fds=True)
    # save last `number_of_lines` lines of the process output
    q = collections.deque(maxlen=number_of_lines)
    # Prefer a clock that is immune to system time adjustments when the
    # running Python provides one.
    clock = getattr(time, "monotonic", time.time)
    start = clock()
    while (clock() - start) < timeout:
        line = process.stdout.readline()
        if not line:  # end-of-file: the process closed its stdout
            break
        q.append(line)
    else:  # on timeout
        process.terminate()
    # Release the pipe and reap the child.
    # NOTE: this blocks if the process ignores the termination request.
    process.stdout.close()
    process.wait()
    # print saved lines
    # The pipe yields raw bytes: Python 3 needs the binary layer of stdout,
    # Python 2's stdout accepts byte strings directly.
    out = getattr(sys.stdout, "buffer", sys.stdout)
    out.write(b"".join(q))
    out.flush()


if __name__ == "__main__":
    main()
This variant uses neither threads nor signals, but it produces garbled output in the terminal. It will block if process.stdout.readline()
blocks.