import os from subprocess import Popen, PIPE, STDOUT import dirutil import regex from buffer.emul import XTermBuffer from method import Method, arg from term import XTerm from window import Window class Exec(Method): '''Execute a command in a shell and put the output in a new buffer''' show_success = True args = [arg('cmd', dt='shell', p="Exec: ")] def _doit(self, w, path, cmd, cmdname=None, bufname=None, cmddir=None, opts={}): if cmddir: cmd = "cd %r && %s" % (cmddir, cmd) d = dict(opts) if path: d['path'] = path try: cmd = cmd % d except: pass if bufname is None: bufname = '*%s*' % self.name.title() if cmdname is None: cmdname = cmd.split(None, 1)[0] p = Popen(cmd, shell=True, stdout=PIPE, stderr=STDOUT) output = p.stdout.read() result = p.wait() status = os.WEXITSTATUS(result) if not os.WIFEXITED(result): err = True msg = "%s: killed by signal %r" % (cmdname, os.WTERMSIG(result)) elif status != 0: err = True msg = "%s: failed with status %r" % (cmdname, status) else: err = False msg = "%s: ok" % (cmdname,) if output: switch_to = err or self.show_success w.application.data_buffer(bufname, output, switch_to=switch_to) w.set_error(msg) def _execute(self, w, **vargs): if w.buffer.btype == 'dir': name, path = dirutil.resolve_name_path(w) self._doit(w, path, vargs['cmd']) dirutil.find_name(w, name) elif hasattr(w.buffer, 'path'): path = w.buffer.path self._doit(w, path, vargs['cmd']) else: self._doit(w, None, vargs['cmd']) class Man(Exec): '''Execute a command in a shell and put the output in a new buffer''' args = [arg('name', p="Program: ")] def _execute(self, w, **vargs): name = vargs['name'] cmd = 'man %r' % name p = Popen(cmd, shell=True, stdout=PIPE, stderr=PIPE) output = p.stdout.read() result = p.wait() status = os.WEXITSTATUS(result) if not os.WIFEXITED(result): err = True errmsg = "man: killed by signal %r" % os.WTERMSIG(result) elif status != 0: err = True errmsg = "man: failed with status %r" % status else: err = False errmsg = "man: ok" if output: xterm = XTerm(cbuf=True) s = xterm.term_filter(output) switch = err or self.show_success w.application.color_data_buffer('*Manpage*', s, switch_to=switch) w.set_error(errmsg) class Pipe(Method): '''Pipe the buffer's contents through the command, and display the output in a new buffer''' args = [arg('cmd', dt="shell", p="Pipe: ")] def _parse(self, w, **vargs): # return 3 things: prog name, cmd, and whether to use the shell m = regex.shell_command.match(vargs['cmd']) if m: prog = m.group(0) return (prog, vargs['cmd'], True) else: return (None, None, False) def _display(self, w, data, status, prog, cmd, shell): lines = data.split('\n') if lines and lines[-1] == '': lines = lines[:-1] if status == 0 and len(lines) == 1: w.set_error("%s output: %r" % (prog, lines[0])) else: bufname = '*%s*' % self.name.title() w.application.data_buffer(bufname, data, switch_to=True) w.set_error("%s exited with status %d" % (prog, status)) def _execute(self, w, **vargs): (prog, cmd, shell) = self._parse(w, **vargs) if prog is None or not cmd: return self._dopipe(w, prog, cmd, shell) def _dopipe(self, w, prog, cmd, shell): pipe = Popen(cmd, shell=shell, stdin=PIPE, stdout=PIPE, stderr=STDOUT) pid = pipe.pid indata = w.buffer.make_string() pipe.stdin.write(indata) pipe.stdin.close() outdata = pipe.stdout.read() status = pipe.wait() >> 8 self._display(w, outdata, status, prog, cmd, shell) class Grep(Pipe): '''Grep the buffer's contents for instances of a pattern, and display them in a new buffer''' args = [arg('pattern', dt="str", p="Pattern: ")] def _parse(self, w, **vargs): return ('grep', ('grep', '-E', '-n', vargs['pattern']), False) class Sed(Pipe): '''Push the buffer's contents through a sed expression''' args = [arg('expression', dt="str", p="Expression: ")] def _parse(self, w, **vargs): return ('grep', ('sed', '-r', '-e', vargs['expression']), False) class Interact(Method): '''Interact with a program via a PTY''' args = [arg('bname', dt="str", p="Buffer Name: ", dv=lambda w: '*Interact*'), arg('cmd', dt="shell", p="Command: ", dv=lambda w: 'bash')] modename = None reuse = False def _execute(self, w, **vargs): bname = vargs['bname'] cmd = vargs['cmd'] a = w.application if self.reuse and a.has_buffer_name(bname): a.switch_buffer(a.get_buffer_by_name(bname)) return a.close_buffer_by_name(bname) b = XTermBuffer(a, 'bash', ['-c', cmd], name=bname, modename=self.modename) a.add_buffer(b) Window(b, a) if a.window().buffer is not b: a.switch_buffer(b)