159 lines
5.5 KiB
Python
159 lines
5.5 KiB
Python
import os
|
|
from subprocess import Popen, PIPE, STDOUT
|
|
|
|
import dirutil
|
|
import regex
|
|
from buffer.emul import XTermBuffer
|
|
from method import Method, arg
|
|
from term import XTerm
|
|
from window import Window
|
|
|
|
class Exec(Method):
|
|
'''Execute a command in a shell and put the output in a new buffer'''
|
|
show_success = True
|
|
args = [arg('cmd', dt='shell', p="Exec: ")]
|
|
def _doit(self, w, path, cmd, cmdname=None, bufname=None, cmddir=None, opts={}):
|
|
if cmddir: cmd = "cd %r && %s" % (cmddir, cmd)
|
|
d = dict(opts)
|
|
if path: d['path'] = path
|
|
|
|
try:
|
|
cmd = cmd % d
|
|
except:
|
|
pass
|
|
|
|
if bufname is None: bufname = '*%s*' % self.name.title()
|
|
if cmdname is None: cmdname = cmd.split(None, 1)[0]
|
|
|
|
p = Popen(cmd, shell=True, stdout=PIPE, stderr=STDOUT)
|
|
output = p.stdout.read()
|
|
result = p.wait()
|
|
status = os.WEXITSTATUS(result)
|
|
|
|
if not os.WIFEXITED(result):
|
|
err = True
|
|
msg = "%s: killed by signal %r" % (cmdname, os.WTERMSIG(result))
|
|
elif status != 0:
|
|
err = True
|
|
msg = "%s: failed with status %r" % (cmdname, status)
|
|
else:
|
|
err = False
|
|
msg = "%s: ok" % (cmdname,)
|
|
|
|
if output:
|
|
switch_to = err or self.show_success
|
|
w.application.data_buffer(bufname, output, switch_to=switch_to)
|
|
w.set_error(msg)
|
|
|
|
def _execute(self, w, **vargs):
|
|
if w.buffer.btype == 'dir':
|
|
name, path = dirutil.resolve_name_path(w)
|
|
self._doit(w, path, vargs['cmd'])
|
|
dirutil.find_name(w, name)
|
|
elif hasattr(w.buffer, 'path'):
|
|
path = w.buffer.path
|
|
self._doit(w, path, vargs['cmd'])
|
|
else:
|
|
self._doit(w, None, vargs['cmd'])
|
|
|
|
class Man(Exec):
|
|
'''Execute a command in a shell and put the output in a new buffer'''
|
|
args = [arg('name', p="Program: ")]
|
|
def _execute(self, w, **vargs):
|
|
name = vargs['name']
|
|
cmd = 'man %r' % name
|
|
p = Popen(cmd, shell=True, stdout=PIPE, stderr=PIPE)
|
|
output = p.stdout.read()
|
|
result = p.wait()
|
|
status = os.WEXITSTATUS(result)
|
|
if not os.WIFEXITED(result):
|
|
err = True
|
|
errmsg = "man: killed by signal %r" % os.WTERMSIG(result)
|
|
elif status != 0:
|
|
err = True
|
|
errmsg = "man: failed with status %r" % status
|
|
else:
|
|
err = False
|
|
errmsg = "man: ok"
|
|
if output:
|
|
xterm = XTerm(cbuf=True)
|
|
s = xterm.term_filter(output)
|
|
switch = err or self.show_success
|
|
w.application.color_data_buffer('*Manpage*', s, switch_to=switch)
|
|
w.set_error(errmsg)
|
|
|
|
class Pipe(Method):
|
|
'''Pipe the buffer's contents through the command, and display the output in a new buffer'''
|
|
args = [arg('cmd', dt="shell", p="Pipe: ")]
|
|
def _parse(self, w, **vargs):
|
|
# return 3 things: prog name, cmd, and whether to use the shell
|
|
m = regex.shell_command.match(vargs['cmd'])
|
|
if m:
|
|
prog = m.group(0)
|
|
return (prog, vargs['cmd'], True)
|
|
else:
|
|
return (None, None, False)
|
|
|
|
def _display(self, w, data, status, prog, cmd, shell):
|
|
lines = data.split('\n')
|
|
if lines and lines[-1] == '':
|
|
lines = lines[:-1]
|
|
if status == 0 and len(lines) == 1:
|
|
w.set_error("%s output: %r" % (prog, lines[0]))
|
|
else:
|
|
bufname = '*%s*' % self.name.title()
|
|
w.application.data_buffer(bufname, data, switch_to=True)
|
|
w.set_error("%s exited with status %d" % (prog, status))
|
|
|
|
def _execute(self, w, **vargs):
|
|
(prog, cmd, shell) = self._parse(w, **vargs)
|
|
if prog is None or not cmd:
|
|
return
|
|
self._dopipe(w, prog, cmd, shell)
|
|
|
|
def _dopipe(self, w, prog, cmd, shell):
|
|
pipe = Popen(cmd, shell=shell, stdin=PIPE, stdout=PIPE, stderr=STDOUT)
|
|
pid = pipe.pid
|
|
|
|
indata = w.buffer.make_string()
|
|
pipe.stdin.write(indata)
|
|
pipe.stdin.close()
|
|
|
|
outdata = pipe.stdout.read()
|
|
status = pipe.wait() >> 8
|
|
|
|
self._display(w, outdata, status, prog, cmd, shell)
|
|
|
|
class Grep(Pipe):
|
|
'''Grep the buffer's contents for instances of a pattern, and display them in a new buffer'''
|
|
args = [arg('pattern', dt="str", p="Pattern: ")]
|
|
def _parse(self, w, **vargs):
|
|
return ('grep', ('grep', '-E', '-n', vargs['pattern']), False)
|
|
class Sed(Pipe):
|
|
'''Push the buffer's contents through a sed expression'''
|
|
args = [arg('expression', dt="str", p="Expression: ")]
|
|
def _parse(self, w, **vargs):
|
|
return ('grep', ('sed', '-r', '-e', vargs['expression']), False)
|
|
|
|
class Interact(Method):
|
|
'''Interact with a program via a PTY'''
|
|
args = [arg('bname', dt="str", p="Buffer Name: ", dv=lambda w: '*Interact*'),
|
|
arg('cmd', dt="shell", p="Command: ", dv=lambda w: 'bash')]
|
|
modename = None
|
|
reuse = False
|
|
def _execute(self, w, **vargs):
|
|
bname = vargs['bname']
|
|
cmd = vargs['cmd']
|
|
a = w.application
|
|
if self.reuse and a.has_buffer_name(bname):
|
|
a.switch_buffer(a.get_buffer_by_name(bname))
|
|
return
|
|
|
|
a.close_buffer_by_name(bname)
|
|
b = XTermBuffer(a, 'bash', ['-c', cmd], name=bname,
|
|
modename=self.modename)
|
|
a.add_buffer(b)
|
|
Window(b, a)
|
|
if a.window().buffer is not b:
|
|
a.switch_buffer(b)
|