import os, commands, re, tempfile
from subprocess import Popen, PIPE, STDOUT
import buffer, default, dirutil, regex, term, util, window
from point import Point
from method import Method, Argument


def _run_shell(cmd):
    '''Run cmd through the shell and return (output, returncode).

    stderr is merged into stdout.  returncode is the value subprocess
    reports: the exit code, or -N if the child was killed by signal N.
    '''
    p = Popen(cmd, shell=True, stdout=PIPE, stderr=STDOUT)
    output = p.communicate()[0]
    return output, p.returncode


def _exit_summary(cmdname, returncode):
    '''Turn a subprocess returncode into an (err, errmsg) pair.

    Popen already decodes the raw wait status, so the os.W* macros must
    not be applied to it: a negative value means "killed by signal",
    a positive value is the exit code, and zero is success.
    '''
    if returncode < 0:
        return True, "%s: killed by signal %r" % (cmdname, -returncode)
    if returncode != 0:
        return True, "%s: failed with status %r" % (cmdname, returncode)
    return False, "%s: ok" % (cmdname,)


class Exec(Method):
    '''Execute a command in a shell and put the output in a new buffer'''
    # when true, switch to the output buffer even if the command succeeded
    show_success = True
    args = [Argument('cmd', prompt="Exec: ", datatype='shell')]

    def _doit(self, w, path, cmd, cmdname=None, bufname=None, cmddir=None,
              opts=None):
        '''Run cmd in a shell, show its output and report its exit status.

        w       -- window whose application receives the output buffer
        path    -- if truthy, made available to cmd as %(path)s
        cmd     -- shell command; %-formatted against opts (best effort)
        cmdname -- name used in the status message (default: the first
                   word of cmd)
        bufname -- name of the output buffer (default: '*<Method name>*')
        cmddir  -- if truthy, directory to cd into before running cmd
        opts    -- extra values for the %-substitution
        '''
        d = dict(opts or {})
        if path:
            d['path'] = path
        # Best effort: commands containing a literal '%' (e.g. "date +%Y")
        # or referring to unknown keys are run unmodified.
        try:
            cmd = cmd % d
        except (KeyError, TypeError, ValueError):
            pass

        if bufname is None:
            bufname = '*%s*' % self.name.title()
        if cmdname is None:
            # Taken before the "cd" prefix is added, so the status message
            # names the user's command; an empty command falls back to the
            # method name instead of raising IndexError.
            words = cmd.split(None, 1)
            cmdname = words[0] if words else self.name

        if cmddir:
            # NOTE(review): %r is repr(), not a real shell quoter; paths
            # containing quote characters may not survive -- confirm.
            cmd = "cd %r && %s" % (cmddir, cmd)

        output, returncode = _run_shell(cmd)
        err, errmsg = _exit_summary(cmdname, returncode)

        if output:
            switch_to = err or self.show_success
            w.application.data_buffer(bufname, output, switch_to=switch_to)
        w.set_error(errmsg)

    def _execute(self, w, **vargs):
        '''Run vargs['cmd'], passing the current buffer's path if it has one.'''
        if w.buffer.btype == 'dir':
            # remember the selected entry so it can be found again after
            # the command has run
            name = dirutil.resolve_name(w)
            path = dirutil.resolve_path(w)
            self._doit(w, path, vargs['cmd'])
            dirutil.find_name(w, name)
        elif hasattr(w.buffer, 'path'):
            path = w.buffer.path
            self._doit(w, path, vargs['cmd'])
        else:
            self._doit(w, None, vargs['cmd'])


class Man(Exec):
    '''Run man for a program and put the manual page in a new buffer'''
    args = [Argument('name', prompt="Program: ")]

    def _execute(self, w, **vargs):
        '''Run "man <name>" and show the filtered output in *Manpage*.'''
        name = vargs['name']
        # NOTE(review): %r is repr(), not a real shell quoter; names
        # containing quote characters may not survive -- confirm.
        cmd = 'man %r' % name
        output, returncode = _run_shell(cmd)
        err, errmsg = _exit_summary('man', returncode)

        if output:
            # pass the raw man output through the terminal filter before
            # displaying it
            xterm = term.XTerm()
            output = xterm.term_filter(output)
            switch_to = err or self.show_success
            w.application.data_buffer('*Manpage*', output, switch_to=switch_to)
        w.set_error(errmsg)


class Pipe(Method):
    '''Pipe the buffer's contents through the command, and display the output in a new buffer'''
    args = [Argument('cmd', datatype="str", prompt="Command: ")]

    def _parse(self, w, **vargs):
        '''Return (prog name, cmd, use-shell flag) for vargs['cmd'].

        Returns (None, None, False) when the command does not match
        regex.shell_command.
        '''
        m = regex.shell_command.match(vargs['cmd'])
        if m:
            prog = m.group(0)
            return (prog, vargs['cmd'], True)
        else:
            return (None, None, False)

    def _display(self, w, data, status, prog, cmd, shell):
        '''Show the command's output.

        A successful command that produced exactly one line is reported in
        the status line; anything else goes to a new buffer.  status is the
        subprocess returncode (negative if killed by a signal).
        '''
        lines = data.split('\n')
        # ignore the empty element produced by a trailing newline
        if lines and lines[-1] == '':
            lines = lines[:-1]

        if status == 0 and len(lines) == 1:
            w.set_error("%s output: %r" % (prog, lines[0]))
        else:
            bufname = '*%s*' % self.name.title()
            w.application.data_buffer(bufname, data, switch_to=True)
            w.set_error("%s exited with status %d" % (prog, status))

    def _execute(self, w, **vargs):
        '''Parse the arguments and run the pipe; do nothing if unparsable.'''
        (prog, cmd, shell) = self._parse(w, **vargs)
        if prog is None or not cmd:
            return
        self._dopipe(w, prog, cmd, shell)

    def _dopipe(self, w, prog, cmd, shell):
        '''Feed the buffer's contents to cmd and display what it prints.'''
        pipe = Popen(cmd, shell=shell, stdin=PIPE, stdout=PIPE, stderr=STDOUT)
        indata = w.buffer.make_string()
        # communicate() writes stdin and reads stdout concurrently; writing
        # everything first and reading afterwards can deadlock once the
        # child's output fills the pipe.
        outdata = pipe.communicate(indata)[0]
        # returncode is already the decoded exit code; it must not be
        # shifted like a raw wait status.
        status = pipe.returncode
        self._display(w, outdata, status, prog, cmd, shell)


class Grep(Pipe):
    '''Grep the buffer's contents for instances of a pattern, and display them in a new buffer'''
    args = [Argument('pattern', datatype="str", prompt="Pattern: ")]

    def _parse(self, w, **vargs):
        '''Build a "grep -E -n <pattern>" argv, run without a shell.'''
        return ('grep', ('grep', '-E', '-n', vargs['pattern']), False)


class Sed(Pipe):
    '''Push the buffer's contents through a sed expression'''
    args = [Argument('expression', datatype="str", prompt="Expression: ")]

    def _parse(self, w, **vargs):
        '''Build a "sed -r -e <expression>" argv, run without a shell.'''
        return ('sed', ('sed', '-r', '-e', vargs['expression']), False)