pmacs3/method/shell.py

158 lines
5.3 KiB
Python

import os
from subprocess import Popen, PIPE, STDOUT
import dirutil
import regex
from buffer.emul import XTermBuffer
from method import Method, arg
from term import XTerm
from window import Window
class Exec(Method):
    '''Execute a command in a shell and put the output in a new buffer'''
    # When true, switch to the output buffer even if the command succeeded.
    show_success = True
    args = [arg('cmd', dt='shell', p="Exec: ")]

    def _doit(self, w, path, cmd, cmdname=None, bufname=None, cmddir=None,
              opts=None):
        '''Run cmd through the shell and report the result on window w.

        w       -- window whose application receives the output buffer and
                   whose status line receives the result message.
        path    -- if truthy, made available to cmd as %(path)s.
        cmd     -- shell command line; may contain %(key)s placeholders that
                   are filled from opts (plus 'path').
        cmdname -- name used in the status message; defaults to the first
                   whitespace-separated word of the final command.
        bufname -- name of the output buffer; defaults to '*<Method name>*'.
        cmddir  -- if truthy, the command is prefixed with a cd into it.
        opts    -- optional mapping of extra placeholder values; it is copied,
                   never modified.

        stderr is merged into stdout.  Output is decoded as UTF-8, falling
        back to Latin-1.  A buffer is only created when there is output.
        '''
        if cmddir:
            cmd = "cd %r && %s" % (cmddir, cmd)

        d = dict(opts) if opts else {}
        if path:
            d['path'] = path
        try:
            cmd = cmd % d
        except (KeyError, TypeError, ValueError):
            # Best effort: a command with a literal '%' (e.g. date +%Y) or an
            # unknown placeholder is run unmodified.
            pass

        if bufname is None:
            bufname = '*%s*' % self.name.title()
        if cmdname is None:
            # An empty/blank command has no first word; fall back to the
            # command itself instead of raising IndexError.
            words = cmd.split(None, 1)
            cmdname = words[0] if words else cmd

        p = Popen(cmd, shell=True, stdout=PIPE, stderr=STDOUT)
        # communicate() reads to EOF, closes the pipe and reaps the child.
        output = p.communicate()[0]
        try:
            output2 = output.decode('utf-8')
        except UnicodeDecodeError:
            output2 = output.decode('latin-1')

        # Popen.returncode is already the exit code (negative when killed by
        # a signal), not a raw wait() status, so it must not be passed through
        # os.WEXITSTATUS/os.WIFEXITED.
        err = p.returncode != 0
        if err:
            msg = "failed to exec %r" % cmdname
        else:
            msg = "exec(%r): ok" % (cmdname,)

        if output2:
            switch_to = err or self.show_success
            w.application.data_buffer(bufname, output2, switch_to=switch_to)
        w.set_error(msg)

    def _execute(self, w, **vargs):
        '''Run vargs['cmd'], passing along the current buffer's path if any.'''
        if w.buffer.btype == 'dir':
            # In a directory buffer the path comes from dirutil; the name is
            # handed back to dirutil.find_name() once the command has run
            # (presumably to re-select that entry -- confirm in dirutil).
            name, path = dirutil.resolve_name_path(w)
            self._doit(w, path, vargs['cmd'])
            dirutil.find_name(w, name)
        elif hasattr(w.buffer, 'path'):
            self._doit(w, w.buffer.path, vargs['cmd'])
        else:
            self._doit(w, None, vargs['cmd'])
class Man(Exec):
    '''Look up the manual page for a program and show it in a new buffer'''
    args = [arg('name', p="Program: ")]

    def _execute(self, w, **vargs):
        '''Run man on vargs['name'] and display the page in '*Manpage*'.

        The page is passed through XTerm.term_filter() before being shown
        with color_data_buffer().  The status line reports whether a page
        was found.
        '''
        name = vargs['name']
        try:
            # Pass an argv list instead of a shell string: the name is user
            # input and Python's repr() is not a safe shell quoting.
            p = Popen(['man', name], stdout=PIPE, stderr=PIPE)
            # communicate() drains both pipes, so a chatty stderr cannot
            # block the child, and both pipes get closed.
            output = p.communicate()[0]
            # returncode is the exit code itself, not a raw wait() status.
            err = p.returncode != 0
        except OSError:
            # man could not be started at all; report it like a missing page.
            output = ''
            err = True

        if err:
            errmsg = "found no page for %r" % name
        else:
            errmsg = "man(%r): ok" % name

        if output:
            xterm = XTerm(cbuf=True)
            s = xterm.term_filter(output)
            switch = err or self.show_success
            w.application.color_data_buffer('*Manpage*', s, switch_to=switch)
        w.set_error(errmsg)
class Pipe(Method):
    '''Pipe the buffer's contents through the command, and display the output in a new buffer'''
    args = [arg('cmd', dt="shell", p="Pipe: ")]

    def _parse(self, w, **vargs):
        '''Return (prog, cmd, shell) for the user's command.

        prog is the text matched by regex.shell_command, cmd is the command
        to run and shell says whether it must go through the shell.  When
        the command does not match, (None, None, False) is returned.
        '''
        m = regex.shell_command.match(vargs['cmd'])
        if m:
            return (m.group(0), vargs['cmd'], True)
        return (None, None, False)

    def _display(self, w, data, status, prog, cmd, shell):
        '''Show data in the '*<Method name>*' buffer and report the status.'''
        bufname = '*%s*' % self.name.title()
        b = w.application.data_buffer(bufname, data, switch_to=True, modename='error')
        # Not every buffer has a path; record None rather than raising.
        b.orig_path = getattr(w.buffer, 'path', None)
        w.set_error("%s exited with status %d" % (prog, status))

    def _execute(self, w, **vargs):
        '''Parse the arguments and run the pipe; do nothing if unparsable.'''
        (prog, cmd, shell) = self._parse(w, **vargs)
        if prog is None or not cmd:
            return
        self._dopipe(w, prog, cmd, shell)

    def _dopipe(self, w, prog, cmd, shell):
        '''Feed the buffer's text to cmd and hand its output to _display().

        stderr is merged into stdout.  The status passed on is the child's
        exit code (negative if it was killed by a signal).
        '''
        pipe = Popen(cmd, shell=shell, stdin=PIPE, stdout=PIPE, stderr=STDOUT)

        indata = w.buffer.make_string()
        # The pipe is binary: encode text before writing it.
        if not isinstance(indata, bytes):
            indata = indata.encode('utf-8')

        # communicate() writes and reads concurrently, so a large buffer
        # cannot deadlock against a full output pipe, and a child that exits
        # without reading all of its input is tolerated.
        outdata = pipe.communicate(indata)[0]
        if isinstance(outdata, bytes):
            try:
                outdata = outdata.decode('utf-8')
            except UnicodeDecodeError:
                outdata = outdata.decode('latin-1')

        # returncode is already the exit code; shifting it right by 8 (as for
        # a raw wait() status) would turn every normal exit into 0.
        self._display(w, outdata, pipe.returncode, prog, cmd, shell)
class Grep(Pipe):
    '''Grep the buffer's contents for instances of a pattern, and display them in a new buffer'''
    args = [arg('pattern', dt="str", p="Pattern: ")]

    def _parse(self, w, **vargs):
        '''Return the (prog, cmd, shell) triple for grepping vargs['pattern'].'''
        # The command is an argv tuple run without a shell, so the pattern
        # needs no quoting.  -E: extended regex, -n: show line numbers.
        argv = ('grep', '-E', '-n', vargs['pattern'])
        use_shell = False
        return ('grep', argv, use_shell)
class Sed(Pipe):
    '''Push the buffer's contents through a sed expression'''
    args = [arg('expression', dt="str", p="Expression: ")]

    def _parse(self, w, **vargs):
        '''Return the (prog, cmd, shell) triple for vargs['expression'].'''
        # The command is an argv tuple run without a shell, so the
        # expression needs no quoting.  -r: extended regex (GNU sed option),
        # -e: the script to apply.
        argv = ('sed', '-r', '-e', vargs['expression'])
        use_shell = False
        return ('sed', argv, use_shell)
class Interact(Method):
    '''Interact with a program via a PTY'''
    args = [
        arg('bname', dt="str", p="Buffer Name: ", dv=lambda w: '*Interact*'),
        arg('cmd', dt="shell", p="Command: ", dv=lambda w: 'bash'),
    ]
    # Mode name handed to the new XTermBuffer.
    modename = None
    # When true, an existing buffer with the requested name is switched to
    # instead of being closed and replaced.
    reuse = False

    def _execute(self, w, **vargs):
        '''Open (or, with reuse set, revisit) a buffer running vargs['cmd'].'''
        buffer_name = vargs['bname']
        command = vargs['cmd']
        app = w.application

        if self.reuse and app.has_buffer_name(buffer_name):
            existing = app.get_buffer_by_name(buffer_name)
            app.switch_buffer(existing)
            return

        # Start from scratch: drop the buffer of that name, then run the
        # command as `bash -c <command>` in a new XTermBuffer.
        app.close_buffer_by_name(buffer_name)
        term_buffer = XTermBuffer(app, 'bash', ['-c', command],
                                  name=buffer_name, modename=self.modename)
        app.add_buffer(term_buffer)
        Window(term_buffer, app)

        # Switch to the new buffer unless the current window already shows it.
        if not (app.window().buffer is term_buffer):
            app.switch_buffer(term_buffer)