pmacs3/method/shell.py

156 lines
5.6 KiB
Python
Raw Normal View History

2008-11-08 10:30:04 -05:00
import os, commands, re, tempfile
from subprocess import Popen, PIPE, STDOUT
2008-10-19 20:01:58 -04:00
import buffer, default, dirutil, regex, term, util, window
from point import Point
from method import Method, Argument
class Exec(Method):
    '''Execute a command in a shell and put the output in a new buffer'''
    # when true, switch to the output buffer even if the command succeeded
    show_success = True
    args = [Argument('cmd', prompt="Exec: ", datatype='shell')]

    def _doit(self, w, path, cmd, cmdname=None, bufname=None, cmddir=None, opts=None):
        '''Run cmd through the shell and report its output and exit status.

        w       -- window; output goes to w.application.data_buffer() and
                   the result message to w.set_error()
        path    -- if true, made available to cmd as %(path)s
        cmd     -- shell command line; may contain %(key)s placeholders
                   which are filled in from opts (and path)
        cmdname -- name used in the result message (defaults to the first
                   word of the final command line)
        bufname -- name of the output buffer (defaults to the title-cased
                   method name wrapped in asterisks)
        cmddir  -- if true, a directory to cd into before running cmd
        opts    -- optional mapping of extra %(key)s substitutions
        '''
        if cmddir:
            # NOTE(review): repr() is used as a rough form of shell quoting
            cmd = "cd %r && %s" % (cmddir, cmd)

        # copy so the caller's mapping is never modified
        d = dict(opts or {})
        if path:
            d['path'] = path
        try:
            cmd = cmd % d
        except (KeyError, TypeError, ValueError):
            # best effort: a command containing literal '%' characters or
            # unknown keys is run exactly as it was given
            pass

        if bufname is None:
            bufname = '*%s*' % self.name.title()
        if cmdname is None:
            words = cmd.split(None, 1)
            if words:
                cmdname = words[0]
            else:
                # empty command line: there is no first word to use
                cmdname = ''

        p = Popen(cmd, shell=True, stdout=PIPE, stderr=STDOUT)
        output = p.communicate()[0]
        # Popen reports an already-decoded return code: the exit status
        # for a normal exit, or the negated signal number if the process
        # was killed.  It must not be passed to os.WIFEXITED and friends.
        returncode = p.returncode

        if returncode < 0:
            err = True
            errmsg = "%s: killed by signal %r" % (cmdname, -returncode)
        elif returncode != 0:
            err = True
            errmsg = "%s: failed with status %r" % (cmdname, returncode)
        else:
            err = False
            errmsg = "%s: ok" % (cmdname,)

        if output:
            switch_to = err or self.show_success
            w.application.data_buffer(bufname, output, switch_to=switch_to)
        w.set_error(errmsg)

    def _execute(self, w, **vargs):
        '''Run the 'cmd' argument, passing the current buffer's path if any.'''
        cmd = vargs['cmd']
        if w.buffer.btype == 'dir':
            # the name is resolved before running the command and handed
            # back to dirutil.find_name() afterwards
            name = dirutil.resolve_name(w)
            path = dirutil.resolve_path(w)
            self._doit(w, path, cmd)
            dirutil.find_name(w, name)
        elif hasattr(w.buffer, 'path'):
            self._doit(w, w.buffer.path, cmd)
        else:
            self._doit(w, None, cmd)
2008-10-19 20:01:58 -04:00
class Man(Exec):
    '''Display the manual page for a program in a new buffer'''
    args = [Argument('name', prompt="Program: ")]

    def _execute(self, w, **vargs):
        '''Run man on the 'name' argument and show the filtered output.

        Non-empty output is placed in the '*Manpage*' buffer; the outcome
        is always reported through w.set_error().
        '''
        name = vargs['name']
        try:
            # an argument list (no shell) passes the name to man verbatim,
            # so quotes, '$' or backticks in it are never interpreted
            p = Popen(['man', name], stdout=PIPE, stderr=STDOUT)
        except OSError:
            # without a shell a missing man binary raises instead of
            # producing exit status 127
            w.set_error("man: could not be executed")
            return
        output = p.communicate()[0]
        # Popen reports an already-decoded return code: the exit status
        # for a normal exit, or the negated signal number if the process
        # was killed.  It must not be passed to os.WIFEXITED and friends.
        returncode = p.returncode

        if returncode < 0:
            err = True
            errmsg = "man: killed by signal %r" % -returncode
        elif returncode != 0:
            err = True
            errmsg = "man: failed with status %r" % returncode
        else:
            err = False
            errmsg = "man: ok"

        if output:
            # NOTE(review): term_filter presumably strips or interprets the
            # terminal control sequences man emits -- confirm in term.XTerm
            xterm = term.XTerm()
            output = xterm.term_filter(output)
            switch_to = err or self.show_success
            w.application.data_buffer('*Manpage*', output, switch_to=switch_to)
        w.set_error(errmsg)
class Pipe(Method):
    '''Pipe the buffer's contents through the command, and display the output in a new buffer'''
    args = [Argument('cmd', datatype="shell", prompt="Pipe: ")]

    def _parse(self, w, **vargs):
        '''Return a (prog, cmd, shell) tuple describing what to run.

        prog is the name used in messages, cmd is what gets handed to
        Popen, and shell says whether Popen should run it via the shell.
        (None, None, False) is returned when the 'cmd' argument does not
        match regex.shell_command.
        '''
        m = regex.shell_command.match(vargs['cmd'])
        if m:
            prog = m.group(0)
            return (prog, vargs['cmd'], True)
        else:
            return (None, None, False)

    def _display(self, w, data, status, prog, cmd, shell):
        '''Show the command's output and exit status to the user.

        A successful run producing exactly one line is reported via
        w.set_error(); anything else goes to a new buffer along with a
        message giving the exit status.
        '''
        lines = data.split('\n')
        # a trailing newline leaves an empty last element; ignore it
        if lines and lines[-1] == '':
            lines = lines[:-1]
        if status == 0 and len(lines) == 1:
            w.set_error("%s output: %r" % (prog, lines[0]))
        else:
            bufname = '*%s*' % self.name.title()
            w.application.data_buffer(bufname, data, switch_to=True)
            w.set_error("%s exited with status %d" % (prog, status))

    def _execute(self, w, **vargs):
        '''Parse the arguments and, if they yield a command, run the pipe.'''
        (prog, cmd, shell) = self._parse(w, **vargs)
        if prog is None or not cmd:
            return
        self._dopipe(w, prog, cmd, shell)

    def _dopipe(self, w, prog, cmd, shell):
        '''Feed the buffer's contents to cmd and display what comes back.'''
        pipe = Popen(cmd, shell=shell, stdin=PIPE, stdout=PIPE, stderr=STDOUT)
        indata = w.buffer.make_string()
        # communicate() writes stdin and reads stdout concurrently; writing
        # everything first and reading afterwards can deadlock once the
        # child's output fills the pipe
        outdata = pipe.communicate(indata)[0]
        # Popen reports an already-decoded return code (negative if killed
        # by a signal), so no shifting of a raw wait status is needed
        status = pipe.returncode
        self._display(w, outdata, status, prog, cmd, shell)
class Grep(Pipe):
    '''Grep the buffer's contents for instances of a pattern, and display them in a new buffer'''
    args = [Argument('pattern', datatype="str", prompt="Pattern: ")]

    def _parse(self, w, **vargs):
        '''Return (prog, argv, shell) for an extended-regex, line-numbered grep.'''
        # -e marks the next argument as the pattern, so a pattern that
        # starts with '-' is not mistaken for a grep option
        argv = ('grep', '-E', '-n', '-e', vargs['pattern'])
        return ('grep', argv, False)
class Sed(Pipe):
    '''Push the buffer's contents through a sed expression'''
    args = [Argument('expression', datatype="str", prompt="Expression: ")]

    def _parse(self, w, **vargs):
        '''Return (prog, argv, shell) for running sed with the expression.'''
        # the first element is the name shown in result messages; it must
        # say 'sed' so the user is not told that grep ran
        argv = ('sed', '-r', '-e', vargs['expression'])
        return ('sed', argv, False)
class Interact(Method):
    '''Interact with a program via a PTY'''
    args = [
        Argument('bname', datatype="str", prompt="Buffer Name: ",
                 default=default.build_constant('*Interact*')),
        Argument('cmd', datatype="shell", prompt="Command: ",
                 default=default.build_constant('bash')),
    ]

    def _execute(self, w, **vargs):
        '''Open an XTermBuffer running the command and make it current.'''
        buffer_name = vargs['bname']
        command = vargs['cmd']
        app = w.application

        # close any buffer already using this name before creating a new one
        app.close_buffer_by_name(buffer_name)

        # the command is handed to 'bash -c' rather than executed directly
        term_buffer = buffer.emul.XTermBuffer(app, 'bash', ['-c', command],
                                              name=buffer_name)
        app.add_buffer(term_buffer)
        window.Window(term_buffer, app)

        # switch only if the new buffer is not already the one displayed
        if app.window().buffer is term_buffer:
            return
        app.switch_buffer(term_buffer)