pmacs3/method/shell.py

158 lines
5.3 KiB
Python
Raw Normal View History

2009-03-21 15:41:33 -04:00
import os
from subprocess import Popen, PIPE, STDOUT
2009-03-21 15:41:33 -04:00
import dirutil
import regex
from buffer.emul import XTermBuffer
from method import Method, arg
from term import XTerm
from window import Window
class Exec(Method):
'''Execute a command in a shell and put the output in a new buffer'''
show_success = True
2009-03-21 15:41:33 -04:00
args = [arg('cmd', dt='shell', p="Exec: ")]
2008-10-04 12:09:15 -04:00
def _doit(self, w, path, cmd, cmdname=None, bufname=None, cmddir=None, opts={}):
2009-03-21 15:41:33 -04:00
if cmddir: cmd = "cd %r && %s" % (cmddir, cmd)
2008-10-04 12:09:15 -04:00
d = dict(opts)
2009-03-21 15:41:33 -04:00
if path: d['path'] = path
2008-10-04 12:09:15 -04:00
try:
cmd = cmd % d
except:
pass
if bufname is None: bufname = '*%s*' % self.name.title()
if cmdname is None: cmdname = cmd.split(None, 1)[0]
p = Popen(cmd, shell=True, stdout=PIPE, stderr=STDOUT)
output = p.stdout.read()
result = p.wait()
status = os.WEXITSTATUS(result)
try:
output2 = output.decode('utf-8')
except:
output2 = output.decode('latin-1')
if not os.WIFEXITED(result) or status != 0:
err = True
msg = "failed to exec %r" % cmdname
else:
err = False
msg = "exec(%r): ok" % (cmdname,)
if output2:
switch_to = err or self.show_success
w.application.data_buffer(bufname, output2, switch_to=switch_to)
w.set_error(msg)
def _execute(self, w, **vargs):
if w.buffer.btype == 'dir':
2009-03-21 15:41:33 -04:00
name, path = dirutil.resolve_name_path(w)
self._doit(w, path, vargs['cmd'])
dirutil.find_name(w, name)
elif hasattr(w.buffer, 'path'):
path = w.buffer.path
self._doit(w, path, vargs['cmd'])
else:
self._doit(w, None, vargs['cmd'])
2008-10-19 20:01:58 -04:00
class Man(Exec):
'''Execute a command in a shell and put the output in a new buffer'''
2009-03-21 15:41:33 -04:00
args = [arg('name', p="Program: ")]
2008-10-19 20:01:58 -04:00
def _execute(self, w, **vargs):
name = vargs['name']
cmd = 'man %r' % name
2009-03-14 19:50:16 -04:00
p = Popen(cmd, shell=True, stdout=PIPE, stderr=PIPE)
2008-10-19 20:01:58 -04:00
output = p.stdout.read()
result = p.wait()
status = os.WEXITSTATUS(result)
if not os.WIFEXITED(result) or status != 0:
2008-10-19 20:01:58 -04:00
err = True
errmsg = "found no page for %r" % name
2008-10-19 20:01:58 -04:00
else:
err = False
errmsg = "man(%r): ok" % name
2008-10-19 20:01:58 -04:00
if output:
2009-03-21 15:41:33 -04:00
xterm = XTerm(cbuf=True)
s = xterm.term_filter(output)
switch = err or self.show_success
w.application.color_data_buffer('*Manpage*', s, switch_to=switch)
2008-10-19 20:01:58 -04:00
w.set_error(errmsg)
class Pipe(Method):
'''Pipe the buffer's contents through the command, and display the output in a new buffer'''
2009-03-21 15:41:33 -04:00
args = [arg('cmd', dt="shell", p="Pipe: ")]
def _parse(self, w, **vargs):
# return 3 things: prog name, cmd, and whether to use the shell
m = regex.shell_command.match(vargs['cmd'])
if m:
prog = m.group(0)
return (prog, vargs['cmd'], True)
else:
return (None, None, False)
def _display(self, w, data, status, prog, cmd, shell):
2008-03-20 09:31:43 -04:00
lines = data.split('\n')
if lines and lines[-1] == '':
lines = lines[:-1]
2009-04-26 22:13:57 -04:00
bufname = '*%s*' % self.name.title()
b = w.application.data_buffer(bufname, data, switch_to=True, modename='error')
b.orig_path = w.buffer.path
w.set_error("%s exited with status %d" % (prog, status))
def _execute(self, w, **vargs):
(prog, cmd, shell) = self._parse(w, **vargs)
if prog is None or not cmd:
return
2009-04-26 22:13:57 -04:00
2008-10-06 00:55:38 -04:00
self._dopipe(w, prog, cmd, shell)
2008-10-06 00:55:38 -04:00
def _dopipe(self, w, prog, cmd, shell):
pipe = Popen(cmd, shell=shell, stdin=PIPE, stdout=PIPE, stderr=STDOUT)
pid = pipe.pid
indata = w.buffer.make_string()
pipe.stdin.write(indata)
pipe.stdin.close()
outdata = pipe.stdout.read()
status = pipe.wait() >> 8
self._display(w, outdata, status, prog, cmd, shell)
class Grep(Pipe):
'''Grep the buffer's contents for instances of a pattern, and display them in a new buffer'''
2009-03-21 15:41:33 -04:00
args = [arg('pattern', dt="str", p="Pattern: ")]
def _parse(self, w, **vargs):
return ('grep', ('grep', '-E', '-n', vargs['pattern']), False)
2009-04-26 22:13:57 -04:00
class Sed(Pipe):
'''Push the buffer's contents through a sed expression'''
2009-03-21 15:41:33 -04:00
args = [arg('expression', dt="str", p="Expression: ")]
def _parse(self, w, **vargs):
2009-04-26 22:13:57 -04:00
return ('sed', ('sed', '-r', '-e', vargs['expression']), False)
class Interact(Method):
'''Interact with a program via a PTY'''
2009-03-21 15:41:33 -04:00
args = [arg('bname', dt="str", p="Buffer Name: ", dv=lambda w: '*Interact*'),
arg('cmd', dt="shell", p="Command: ", dv=lambda w: 'bash')]
2009-03-11 23:53:56 -04:00
modename = None
2009-04-06 02:23:41 -04:00
reuse = False
def _execute(self, w, **vargs):
bname = vargs['bname']
cmd = vargs['cmd']
a = w.application
2009-04-06 02:23:41 -04:00
if self.reuse and a.has_buffer_name(bname):
a.switch_buffer(a.get_buffer_by_name(bname))
return
a.close_buffer_by_name(bname)
2009-04-06 02:20:43 -04:00
b = XTermBuffer(a, 'bash', ['-c', cmd], name=bname,
modename=self.modename)
a.add_buffer(b)
2009-03-21 15:41:33 -04:00
Window(b, a)
if a.window().buffer is not b:
a.switch_buffer(b)