import os, commands, popen2, re, sets import buffer2, default, dirutil, regex, util, window2 from point2 import Point WHITESPACE = [' ', '\n'] DATATYPES = { "path": None, "buffer": None, "method": None, "command": None, "shell": None, "shellcommand": None, } class MethodError(Exception): pass class Argument: def __init__(self, name, type=type(""), datatype=None, prompt=None, help="", default=default.none, load_default=False): self.name = name self.type = type self.datatype = datatype if prompt is None: self.prompt = "%s: " % (name) else: self.prompt = prompt self.help = help self.load_default = load_default self.default = default def coerce_to_type(self, value): if self.type == type(0): try: return int(value, 0) except: raise Exception, "expected int; got %s" % (repr(value)) else: return value def ask_for_value(self, method, w, **vargs): app = w.application assert app.mini_buffer_is_open() is False, "Recursive minibuffer antics" vargs2 = vargs.copy() assert callable(self.default), "default value func must be callable" if self.load_default: d = None starting_value = self.default(w) else: d = self.default(w) starting_value = None def return_value(v): if d is not None and v == "": v = d vargs2[self.name] = self.coerce_to_type(v) app.close_mini_buffer() method.execute(w, **vargs2) tabber = DATATYPES.get(self.datatype, None) if d is not None: p = self.prompt + "(%s) " % (d) else: p = self.prompt app.open_mini_buffer(p, return_value, method, tabber) if starting_value: app.mini_buffer.set_data(starting_value) class Method: _is_method = True args = [] def __init__(self): self.name = self._name() self.help = self.__doc__ def _name(cls): s = cls.__name__ s2 = s[0].lower() for c in s[1:]: if c.isupper(): s2 += '-' + c.lower() elif c == '_': s2 += '-' else: s2 += c return s2 _name = classmethod(_name) def _pre_execute(self, w, **vargs): pass def execute(self, w, **vargs): try: self._pre_execute(w, **vargs) except MethodError, e: w.application.set_error(str(e)) return for arg in self.args: if arg.name not in vargs: self.old_window = w arg.ask_for_value(self, w, **vargs) return self._execute(w, **vargs) def _execute(self, w, **vargs): raise Exception, "Unimplemented Method: %s %r" % (self.name, vargs) class GotoChar(Method): '''Jump to the specified character''' args = [Argument("charno", type=type(0), prompt="Goto char: ")] def _execute(self, w, **vargs): w.goto_char(vargs["charno"]) class ForwardChars(Method): '''Move forward the specified number of characters''' args = [Argument("charno", type=type(0), prompt="Forward chars: ")] def _execute(self, w, **vargs): w.forward_chars(vargs["charno"]) class GotoLine(Method): '''Jump to the specified line number''' args = [Argument("lineno", type=type(0), prompt="Goto line: ")] def _execute(self, w, **vargs): n = vargs["lineno"] if n < 0: n = len(w.buffer.lines) + n + 1 if n > len(w.buffer.lines): n = len(w.buffer.lines) elif n < 1: n = 1 w.goto_line(n) class ForwardLines(Method): '''Move forward the specified number of characters''' args = [Argument("lineno", type=type(0), prompt="Forward lines: ")] def _execute(self, w, **vargs): w.forward_lines(vargs["lineno"]) # search and replace class Search(Method): '''Interactive search; finds next occurance of text in buffer''' is_literal = True direction = 'next' prompt = 'I-Search: ' def execute(self, w, **vargs): self.old_cursor = w.logical_cursor() self.old_window = w w.application.open_mini_buffer(self.prompt, lambda x: None, self, None, 'search') class ReverseSearch(Search): '''Interactive search; finds previous occurance of text in buffer''' direction = 'previous' class RegexSearch(Search): '''Interactive search; finds next occurance of regex in buffer''' is_literal = False prompt = 'I-RegexSearch: ' class RegexReverseSearch(RegexSearch): '''Interactive search; finds prevoius occurance of regex in buffer''' direction = 'previous' class Replace(Method): '''Replace occurances of string X with string Y''' is_literal = True args = [Argument('before', prompt="Replace String: ", default=default.last_replace_before, load_default=True), Argument('after', prompt="Replace With: ", default=default.last_replace_after, load_default=True)] def _execute(self, w, **vargs): a = w.application a.last_replace_before = self.before = vargs['before'] a.last_replace_after = self.after = vargs['after'] self.old_window = w a.open_mini_buffer('I-Replace: ', lambda x: None, self, None, 'replace') class RegexReplace(Method): '''Replace occurances of string X with string Y''' is_literal = False args = [Argument('before', prompt="Replace Regex: ", default=default.last_replace_before, load_default=True), Argument('after', prompt="Replace With: ", default=default.last_replace_after, load_default=True)] def _execute(self, w, **vargs): w.application.last_replace_before = self.before = vargs['before'] w.application.last_replace_after = self.after = vargs['after'] self.old_window = w f = lambda x: None w.application.open_mini_buffer('I-RegexReplace: ', f, self, None, 'replace') # navigating between buffers class OpenFile(Method): '''Open file in a new buffer, or go to file's open buffer''' args = [Argument('filename', datatype="path", prompt="Open File: ")] def _execute(self, w, **vargs): b = w.application.open_path(vargs['filename']) SwitchBuffer().execute(w, buffername=b.name()) class OpenAesFile(Method): '''Open AES encrypted file in a new buffer, or go to file's open buffer''' args = [Argument('filename', datatype="path", prompt="Open AES File: "), Argument('password', prompt="Use AES Password: ")] def _execute(self, w, **vargs): b = w.application.open_path(vargs['filename'], 'aes', vargs['password']) SwitchBuffer().execute(w, buffername=b.name()) return class ViewBufferParent(Method): def _execute(self, w, **vargs): b = w.buffer if not hasattr(b, 'path'): w.set_error('Buffer has no path') elif b.path == '/': w.set_error("Root directory has no parent") else: path = os.path.dirname(b.path) w.application.methods['open-file'].execute(w, filename=path) class SwitchBuffer(Method): '''Switch to a different''' args = [Argument('buffername', datatype="buffer", prompt="Switch To Buffer: ", default=default.last_buffer)] def _pre_execute(self, w, **vargs): a = w.application if len(a.bufferlist.buffers) < 1: raise Exception, "No other buffers" def _execute(self, w, **vargs): name = vargs['buffername'] buf = None if w.application.has_buffer_name(name): b = w.application.bufferlist.get_buffer_by_name(name) w.application.switch_buffer(b) else: w.application.set_error("buffer %r was not found" % name) class KillBuffer(Method): '''Close the current buffer''' force=False args = [Argument('buffername', datatype="buffer", prompt="Kill Buffer: ", default=default.current_buffer)] def _execute(self, w, **vargs): name = vargs['buffername'] a = w.application assert name in a.bufferlist.buffer_names, "Buffer %r does not exist" % name assert name != '*Scratch*', "Can't kill scratch buffer" self._to_kill = a.bufferlist.buffer_names[name] self._old_window = w if self.force or not self._to_kill.changed(): self._doit() else: self._prompt = "Buffer has unsaved changes; kill anyway? " a.open_mini_buffer(self._prompt, self._callback) def _doit(self): a = self._old_window.application b = self._to_kill if a.bufferlist.is_buffer_visible(b): a.bufferlist.set_slot(a.active_slot, a.bufferlist.hidden_buffers[0]) a.bufferlist.remove_buffer(b) b.close() def _callback(self, v): a = self._old_window.application if v == 'yes': self._doit() a.close_mini_buffer() elif v == 'no': a.close_mini_buffer() else: a.close_mini_buffer() a.set_error('Please type "yes" or "no"') class ForceKillBuffer(KillBuffer): force=True args = [Argument('buffername', datatype="buffer", prompt="Force Kill Buffer: ", default=default.current_buffer)] class ListBuffers(Method): '''List all open buffers in a new buffer''' def _execute(self, w, **vargs): bl = w.application.bufferlist bnames = [b.name() for b in bl.buffers] bnames.sort() data = '\n'.join(bnames) w.application.data_buffer("*Buffers*", data, switch_to=True) class SaveBufferAs(Method): '''Save the contents of a buffer to the specified path''' args = [Argument('path', datatype="path", prompt="Write file: ", default=default.current_working_dir, load_default=True)] def _execute(self, w, **vargs): curr_buffer = w.buffer curr_buffer_name = curr_buffer.name() data = curr_buffer.make_string() path = os.path.realpath(os.path.expanduser(vargs['path'])) w.application.set_error("got %r (%d)" % (path, len(data))) if w.application.has_buffer_name(path): w.application.set_error("buffer for %r is already open" % path) return w.application.file_buffer(path, data, switch_to=True) if curr_buffer_name != '*Scratch*': w.application.methods['kill-buffer'].execute(w, buffername=curr_buffer_name) else: curr_buffer.set_data('') w.application.set_error('Wrote %r' % path) class SaveBuffer(Method): '''Save the contents of a buffer''' def _execute(self, w, **vargs): if w.buffer.changed(): w.buffer.save() w.application.set_error("Wrote %s" % (w.buffer.path)) else: w.application.set_error("(No changes need to be saved)") class RelexBuffer(Method): '''Relex the buffer; this resets syntax highlighting''' def _execute(self, w, **vargs): h = w.get_highlighter() if h is None: w.application.set_error("No lexer for buffer.") else: h.highlight(w.buffer.lines) w.application.set_error("Buffer relexed.") class ToggleWindow(Method): '''Move between visible windows''' def _execute(self, w, **vargs): w.application.toggle_window() # complex text maniuplation class TransposeWords(Method): '''Switch the place of the two words nearest the cursor''' pass # you wanna quit right? class Exit(Method): '''Exit the program, unless there are unsaved changes''' def _execute(self, w, **vargs): a = w.application assert a.mini_buffer_is_open() is False, "Recursive minibuffer antics" changed = False for b in w.application.bufferlist.buffers: changed = b.changed() if changed: break if not changed: w.application.exit() return else: self._old_window = w self._prompt = "There are buffers with unsaved changes; exit anyway? " a.open_mini_buffer(self._prompt, self._callback) def _callback(self, v): a = self._old_window.application if v == 'yes': a.exit() a.close_mini_buffer() if v == 'no': return a.open_mini_buffer(self._prompt, self._callback) a.set_error('Please type "yes" or "no"') # insert text class InsertString(Method): _is_method = False def __init__(self, s): self.name = "insert-string-%s" % s self.args = [] self.help = "Insert %r into the current buffer." % s self.string = s def _execute(self, w, **vargs): w.insert_string_at_cursor(self.string) class OverwriteChar(Method): _is_method = False def __init__(self, c): self.name = 'overwrite-char-%s' % c self.args = [] self.help = "Overwrite %r into the current buffer." % c self.char = c def _execute(self, w, **vargs): w.overwrite_char_at_cursor(self.char) # killing/copying/etc. class Kill(Method): '''Kill the contents of the current line''' def _execute(self, w, **vargs): w.kill_line() class KillRegion(Method): '''Kill the region between the mark and the cursor''' def _execute(self, w, **vargs): w.kill_region() w.application.set_error("Region killed by %s" % self.name) class Copy(Method): '''Copy the contents of the current line''' def _execute(self, w, **vargs): w.copy_line() class CopyRegion(Method): '''Copy the region between the mark and the cursor''' def _execute(self, w, **vargs): w.copy_region() w.set_active_point(w.mark) w.application.set_error("Region copied") class Yank(Method): '''Paste the top item in the kill ring into the buffer''' def _execute(self, w, **vargs): if w.application.has_kill(): w.yank() else: w.application.set_error("Kill ring is empty") class ShowKill(Method): '''Display the top item in the kill ring''' def _execute(self, w, **vargs): if w.application.has_kill(): s = w.application.get_kill() x = w.application.x if len(s) > x - 40: s = s[:x - 40] + "..." w.application.set_error("Kill ring contains %r" % s) else: w.application.set_error("Kill ring is empty") class PopKill(Method): '''Pop the top item in the kill ring off''' def _execute(self, w, **vargs): if w.application.has_kill(): s = w.pop_kill() x = w.application.x if len(s) > x - 40: s = s[:x - 40] + "..." w.application.set_error("Removed %r from Kill ring" % s) else: w.application.set_error("Kill ring is empty") # delete class DeleteLeft(Method): '''Delete the character to the left of the cursor''' def _execute(self, w, **vargs): (x, y) = w.logical_cursor().xy() line = w.buffer.lines[y] tabwidth = w.mode.tabwidth if x >= tabwidth and x % tabwidth == 0 and line[0:x].isspace(): w.kill(Point(x - tabwidth, y), Point(x, y)) else: w.left_delete() class DeleteRight(Method): '''Delete the character under the cursor''' def _execute(self, w, **vargs): cursor = w.logical_cursor() line = w.buffer.lines[cursor.y] if len(line[cursor.x:]) >= 4 and line[:cursor.x + 4].isspace(): w.kill(Point(cursor.x, cursor.y), Point(cursor.x + 4, cursor.y)) else: w.right_delete() class DeleteLeftWord(Method): '''Delete the from the cursor left to the end of the word''' def _execute(self, w, **vargs): w.kill_left_word() class DeleteRightWord(Method): '''Delete the from under cursor right to the end of the word''' def _execute(self, w, **vargs): w.kill_right_word() class DeleteLeftWhitespace(Method): '''Delete all contiguous of whitespace left of the cursor''' def _execute(self, w, **vargs): c = w.logical_cursor() p = c l = w.point_left(p) if l is None: return while l is not None and w.point_char(l) in WHITESPACE: p = l l = w.point_left(p) if p < c: w.kill(p, c) class DeleteRightWhitespace(Method): '''Delete all contiguous of whitespace under and right of the cursor''' def _execute(self, w, **vargs): c = w.logical_cursor() p = c while w.point_char(p) in WHITESPACE: r = w.point_right(p) if r is None: break p = r if p > c: w.kill(c, p) # random stuff class DumpRegions(Method): '''debug region highlighting''' def _execute(self, w, **vargs): lines = [] for (w, p1, p2) in w.application.highlighted_ranges: lines.append("%r %s %s" % (w, p1, p2)) output = "\n".join(lines) w.application.data_buffer("region-dump", output, switch_to=True) class DumpMarkers(Method): '''Dump all tab markers (tab debugging)''' def _execute(self, w, **vargs): lines = [] if w.mode.tabber: keys = w.mode.tabber.lines.keys() keys.sort() for i in keys: line = w.mode.tabber.lines[i] lines.append("LINE %d: %r" % (i, line)) lines.append(" %s" % repr(w.mode.tabber.record[i])) else: lines.append("no tokens") output = "\n".join(lines) w.application.data_buffer("marker-dump", output, switch_to=True) class DumpTokens(Method): '''Dump all lexical tokens (syntax highlighting debugging)''' def _execute(self, w, **vargs): modename = w.mode.name() lines = [] if modename in w.buffer.highlights: tokens = w.buffer.highlights[modename].tokens for i in range(0, len(tokens)): lines.append("LINE %d" % i) group = tokens[i] for token in group: fqname = token.fqname() p1 = Point(token.x, token.y) if token.parent is None: pcoord = '' else: pcoord = '[%d, %d]' % (token.parent.x, token.parent.y) if fqname in w.mode.ghist and p1 in w.mode.ghist[fqname]: g = '[' + w.mode.ghist[fqname][p1].name() + ']' else: g = '' fields = (str(p1), pcoord, token.fqname(), g, token.string) lines.append(' %-10s %-10s %-20s %-10s %r' % fields) else: lines.append("no tokens") output = "\n".join(lines) w.application.data_buffer("token-dump", output, switch_to=True) class MetaX(Method): '''Invoke commands by name''' args = [Argument('method', datatype="method", prompt="M-x ")] def _execute(self, w, **vargs): name = vargs['method'] if name in w.application.methods: w.application.methods[name].execute(w) else: w.application.set_error('no method named %r found' % name) class ToggleMargins(Method): '''Show or hide column margins''' def _execute(self, w, **vargs): w.margins_visible = not w.margins_visible class CenterView(Method): '''Move view to center on cursor''' def _execute(self, w, **vargs): w.center_view() class SetMark(Method): '''Set the mark to the current cursor location''' def _execute(self, w, **vargs): w.set_mark() class SwitchMark(Method): '''Switch the mark and the cursor locations''' def _execute(self, w, **vargs): w.switch_mark() # insertion methods class InsertNewline(Method): '''Insert newline into buffer at the cursor''' def _execute(self, w, **vargs): w.insert_string_at_cursor('\n') class InsertSpace(Method): '''Insert space into buffer at the cursor''' def _execute(self, w, **vargs): w.insert_string_at_cursor(' ') class GetIndentionLevel(Method): '''Calculate the indention level for this line''' def _execute(self, w, **vargs): cursor = w.logical_cursor() if not w.mode.tabber: w.application.set_error('No tabber available') return else: i = w.mode.tabber.get_level(cursor.y) w.application.set_error('Indention level: %r' % i) class InsertTab(Method): '''Insert tab into buffer, or tabbify line, depending on mode''' def _execute(self, w, **vargs): cursor = w.logical_cursor() if w.mode.tabber: i = w.mode.tabber.get_level(cursor.y) else: i = None if i is None: #w.insert_string_at_cursor(' ') w.insert_string_at_cursor(' ' * w.mode.tabwidth) else: j = w.buffer.count_leading_whitespace(cursor.y) if i != j: KillWhitespace().execute(w) w.insert_string(Point(0, cursor.y), ' ' * i) else: w.goto(Point(j, cursor.y)) class KillWhitespace(Method): '''Delete leading whitespace on current line''' def _execute(self, w, **vargs): cursor = w.logical_cursor() i = w.buffer.count_leading_whitespace(cursor.y) if i > 0: w.kill(Point(0, cursor.y), Point(i, cursor.y)) # tabification class TabBuffer(Method): '''Tabbify every line in the current buffer''' def _execute(self, w, **vargs): y = w.logical_cursor().y it = InsertTab() for i in range(0, len(w.buffer.lines)): w.goto_line(i) it.execute(w) w.goto_line(y) # commenting class CommentRegion(Method): '''Prepend a comment to every line in the current buffer''' def _execute(self, w, **vargs): cursor = w.logical_cursor() if cursor < w.mark: p1 = cursor p2 = w.mark elif w.mark < cursor: p1 = w.mark p2 = cursor else: w.input_line = "Empty kill region" return for y in range(p1.y, p2.y): w.buffer.insert_string(Point(0, y), "#") class UncommentRegion(Method): '''Remove a comment from every line in the current buffer''' def _execute(self, w, **vargs): cursor = w.logical_cursor() if cursor < w.mark: p1 = cursor p2 = w.mark elif w.mark < cursor: p1 = w.mark p2 = cursor else: w.input_line = "Empty kill region" return for y in range(p1.y, p2.y): if w.buffer.lines[y].startswith("#"): w.buffer.delete(Point(0, y), Point(1, y)) # wrapping/justifying/etc class WrapLine(Method): '''Wrap the current line at 80 characters by word''' limit = 80 prefix_re = None def _execute(self, w, **vargs): cursor = w.logical_cursor() old_cursor = cursor i = cursor.y move_old_cursor = old_cursor.x > self.limit while len(w.buffer.lines[i]) > self.limit: if ' ' in w.buffer.lines[i][:self.limit]: j = w.buffer.lines[i][:self.limit].rindex(' ') elif ' ' in w.buffer.lines[i][self.limit:]: j = w.buffer.lines[i][self.limit:].index(' ') else: break if move_old_cursor: move_old_cursor = False old_cursor.x -= j + 1 old_cursor.y += 1 w.goto(Point(j, i)) w.right_delete() w.insert_string_at_cursor('\n') i += 1 l = len(w.buffer.lines[old_cursor.y]) if l > old_cursor.x: w.goto(old_cursor) else: w.goto(Point(l, old_cursor.y)) class WrapParagraph(Method): limit = 80 wrapper = WrapLine empty_re = regex.whitespace prefix_re = None def _execute(self, w, **vargs): old_cursor = w.logical_cursor() i = old_cursor.y while i < len(w.buffer.lines) - 1: if i < len(w.buffer.lines) and \ self.empty_re.match(w.buffer.lines[i + 1]): break EndOfLine().execute(w) InsertSpace().execute(w) DeleteRightWhitespace().execute(w) w.goto(old_cursor) self.wrapper().execute(w) class JustifyRight(Method): '''Justify text with the previous line right from the cursor by whitespace''' def _execute(self, w, **vargs): DeleteLeftWhitespace().execute(w) cursor = w.logical_cursor() prev_line = w.buffer.lines[cursor.y-1] this_line = w.buffer.lines[cursor.y] if cursor.y <= 0: return if cursor.x >= len(prev_line): return i = cursor.x while prev_line[i] != ' ': i += 1 if i >= len(prev_line): return while prev_line[i] == ' ': i += 1 if i >= len(prev_line): return s = ' ' * (i - cursor.x) w.insert_string_at_cursor(s) class JustifyLeft(Method): '''Justify text with the previous line left from the cursor by whitespace''' def _execute(self, w, **vargs): DeleteRightWhitespace().execute(w) cursor = w.logical_cursor() prev_line = w.buffer.lines[cursor.y-1] this_line = w.buffer.lines[cursor.y] if cursor.y <= 0: return if cursor.x <= 0: return i = cursor.x while i >= len(prev_line): i -= 1 if i <= 0: return if this_line[i] != ' ': return while prev_line[i] != ' ': i -= 1 if i <= 0: return if this_line[i] != ' ': return while prev_line[i] == ' ': i -= 1 if i >= len(prev_line): return if this_line[i] != ' ': return w.buffer.delete(Point(i, cursor.y), cursor) # undo/redo class Undo(Method): '''Undo last action''' def _execute(self, w, **vargs): try: w.undo() except Exception, e: w.application.set_error("%s" % (e)) class Redo(Method): '''Redo last undone action''' def _execute(self, w, **vargs): try: w.redo() except Exception, e: w.application.set_error("%s" % (e)) # w navigation methods class StartOfLine(Method): '''Move the cursor to the start of the current line''' def _execute(self, w, **vargs): w.start_of_line() class EndOfLine(Method): '''Move the cursor to the end of the current line''' def _execute(self, w, **vargs): w.end_of_line() class Forward(Method): '''Move the cursor right one character''' def _execute(self, w, **vargs): w.forward() class Backward(Method): '''Move the cursor left one character''' def _execute(self, w, **vargs): w.backward() class NextLine(Method): '''Move the cursor down one line''' def _execute(self, w, **vargs): w.next_line() class PreviousLine(Method): '''Move the cursor up one line''' def _execute(self, w, **vargs): w.previous_line() class PageUp(Method): '''Move the cursor up one page''' def _execute(self, w, **vargs): w.page_up() class PageDown(Method): '''Move the cursor down one page''' def _execute(self, w, **vargs): w.page_down() class GotoBeginning(Method): '''Move the cursor to the beginning of the buffer''' def _execute(self, w, **vargs): w.goto_beginning() class GotoEnd(Method): '''Move the cursor to the end of the buffer''' def _execute(self, w, **vargs): w.goto_end() class RightWord(Method): '''Move the cursor to the start of the word to the right''' def _execute(self, w, **vargs): w.right_word() class LeftWord(Method): '''Move the cursor to the start of the word to the left''' def _execute(self, w, **vargs): w.left_word() class NextSection(Method): '''Move the cursor to the next section''' def _execute(self, w, **vargs): cursor = w.logical_cursor() i = cursor.y + 1 seen_null_line = False while i < len(w.buffer.lines): if seen_null_line: w.goto_line(i) break seen_null_line = regex.whitespace.match(w.buffer.lines[i]) i += 1 class PreviousSection(Method): '''Move the cursor to the previous section''' def _execute(self, w, **vargs): cursor = w.logical_cursor() i = cursor.y - 1 seen_null_line = False while i >= 0: if seen_null_line: w.goto_line(i) break seen_null_line = regex.whitespace.match(w.buffer.lines[i]) i -= 1 class UnindentBlock(Method): '''Prepend 4 spaces to each line in region''' def _execute(self, w, **vargs): cursor = w.logical_cursor() if cursor < w.mark: p1 = cursor p2 = w.mark elif w.mark < cursor: p1 = w.mark p2 = cursor else: w.input_line = "Empty kill region" return lines = w.buffer.lines[p1.y:p2.y] for i in range(0, len(lines)): if lines[i].startswith(' '): lines[i] = lines[i][4:] w.buffer.delete(Point(0, p1.y), Point(0, p2.y)) w.buffer.insert_string(Point(0, p1.y), '\n'.join(lines) + '\n') class IndentBlock(Method): '''Add 4 spaces to each line in region''' def _execute(self, w, **vargs): cursor = w.logical_cursor() if cursor < w.mark: p1 = cursor p2 = w.mark elif w.mark < cursor: p1 = w.mark p2 = cursor else: w.input_line = "Empty kill region" return lines = w.buffer.lines[p1.y:p2.y] tstr = ' ' * w.mode.tabwidth for i in range(0, len(lines)): lines[i] = tstr + lines[i] w.buffer.delete(Point(0, p1.y), Point(0, p2.y)) w.buffer.insert_string(Point(0, p1.y), '\n'.join(lines) + '\n') class CodeComplete(Method): '''Complete based on tokenized strings''' def execute(self, w, **vargs): cursor = w.logical_cursor() if len(w.buffer.lines[cursor.y]) == 0: return bounds = w.get_word_bounds() if bounds is None: return (p1, p2) = bounds buffer = w.buffer word = buffer.get_substring(p1, p2) app = w.application highlighter = buffer.highlights[w.mode.name()] tokens = highlighter.tokens seen = {} sofar = None for group in tokens: for token in group: s = token.string if s == word: continue elif s.startswith(word): seen[s] = True if sofar is None: sofar = s else: l = min(len(s), len(sofar)) i = len(word) while i < l: if s[i] == sofar[i]: i += 1 else: break sofar = s[:i] seen_keys = seen.keys() num_seen = len(seen_keys) if word == sofar: w.application.set_error('No completion possible: %r' % word) pass elif sofar: w.buffer.delete(p1, p2) w.buffer.insert_string(p1, sofar) if num_seen == 1: w.application.set_error('Unique!') else: w.application.set_error('Ambiguous: %r' % seen_keys) else: w.application.set_error('No completion found: %r' % word) pass class OpenConsole(Method): '''Evaluate python expressions (for advanced use and debugging only)''' def execute(self, w, **vargs): a = w.application if not a.has_buffer_name('*Console*'): a.add_buffer(buffer2.ConsoleBuffer()) b = a.bufferlist.get_buffer_by_name('*Console*') if a.window().buffer is not b: a.switch_buffer(b) f = lambda x: None w.application.open_mini_buffer('>>> ', f, self, None, 'consolemini') class ShellCmd(Method): '''Run a command in a shell and put the output in a new buffer''' args = [Argument("cmd", type=type(""), prompt="$ ", datatype='shell')] def _execute(self, w, **vargs): cmd = "PBUF='%s'; %s" % (w.buffer.name(), vargs['cmd']) (status, data) = commands.getstatusoutput(cmd) if status == 0: mesg = 'ok' else: mesg = 'error' data += "\nprocess exited with status %d (%s)" % (status, mesg) w.application.data_buffer("*Shell*", data, switch_to=True) class FileDiff(Method): '''diff the buffer's contents with the given file''' args = [Argument("path", type=type(""), prompt="Filename: ", datatype='path')] def _execute(self, w, **vargs): cmd = ("/usr/bin/diff", '-u', '-', vargs['path']) pipe = popen2.Popen3(cmd, capturestderr=True) pid = pipe.pid indata = w.buffer.make_string() pipe.tochild.write(indata) pipe.tochild.close() outdata = pipe.fromchild.read() errdata = pipe.childerr.read() status = pipe.wait() >> 8 if status == 0: w.application.set_error("No difference found") elif status == 1: w.application.data_buffer("*Diff*", outdata, switch_to=True, modename='diff') w.application.set_error("Differences were found") else: w.application.data_buffer("*Diff*", errdata, switch_to=True) w.application.set_error("There was an error: %d exited with status %s" % (pid, status)) class SvnDiff(Method): '''diff the current file with the version in SVN''' def _execute(self, w, **vargs): if not hasattr(w.buffer, 'path'): w.application.set_error("Buffer has no corresponding file") return cmd = "svn diff %r" % w.buffer.path (status, data) = commands.getstatusoutput(cmd) if status == 0: if data: w.application.data_buffer("*Diff*", data, switch_to=True, modename='diff') w.application.set_error("Differences were found") else: w.application.set_error("No difference found") else: w.application.set_error("There was an error (%s)" % (status)) class SvnBlame(Method): '''show blame output for the current version in SVN''' line_re = re.compile('^ *(\d+) *([a-zA-Z0-9_]+) *([-0-9]+) *([:0-9]+) *(-\d{4}) *\(([^\)]+)\) (.*)$') def _execute(self, w, **vargs): if not hasattr(w.buffer, 'path'): w.application.set_error("Buffer has no corresponding file") return cmd = ("/usr/bin/svn", 'blame', '-v', w.buffer.path) pipe = popen2.Popen3(cmd) lines = [] for line in pipe.fromchild: m = self.line_re.match(line) if not m: raise Exception, line (rev, user, date, t, tz, vdate, content) = m.groups() lines.append("%-4s %-10s %10s %s\n" % (rev, user, date, content)) data = ''.join(lines) status = pipe.wait() >> 8 if status == 0: w.application.data_buffer("*Blame*", data, switch_to=True, modename='blame') else: w.application.set_error("There was an error (%s)" % (status)) class CvsStatus(Method): regex1 = re.compile('^File: (.+?) +?\tStatus: (.*?)$') regex2 = re.compile('^ Working revision:\t([0-9\.]+)$') regex3 = re.compile('^ Repository revision:\t([0-9\.]+)\t(.*)$') regex4 = re.compile('^ Sticky Tag:\t\t\((.*)\)$') regex5 = re.compile('^ Sticky Date:\t\t\((.*)\)$') regex6 = re.compile('^ Sticky Options:\t\((.*)\)$') def _execute(self, w, **vargs): if not hasattr(w.buffer, 'path'): w.application.set_error("Buffer has no corresponding file") return cwd = os.getcwd() + os.path.sep path = w.buffer.path if path.startswith(cwd): path = path[len(cwd):] cmd = "cvs status %r" % path (status, data) = commands.getstatusoutput(cmd) status = status >> 8 if status != 0: w.application.set_error("Problems with CVS status: %d" % status) return lines = data.split('\n') if lines[0].startswith('cvs status: nothing known about '): w.application.set_error('File is not under CVS control') return m = self.regex1.match(lines[1]) assert m, "regex1 %r" % lines[1] ffile = m.group(1) fstatus = m.group(2) m = self.regex2.match(lines[3]) assert m, "regex2 %r" % lines[3] wrev = m.group(1) m = self.regex3.match(lines[4]) assert m, "regex3 %r" % lines[4] rrev = m.group(1) rpath = m.group(2) m = self.regex4.match(lines[5]) assert m, "regex4 %r" % lines[5] stag = m.group(1) m = self.regex5.match(lines[6]) assert m, "regex5 %r" % lines[6] sdate = m.group(1) m = self.regex6.match(lines[7]) assert m, "regex6 %r" % lines[7] soptions = m.group(1) w.application.set_error('%s %s %s/%s [%s|%s|%s]' % (ffile, fstatus, wrev, rrev, stag, sdate, soptions)) class CvsDiff(Method): '''diff the current file with the version in CVS''' def _execute(self, w, **vargs): if not hasattr(w.buffer, 'path'): w.application.set_error("Buffer has no corresponding file") return cwd = os.getcwd() + os.path.sep path = w.buffer.path if path.startswith(cwd): path = path[len(cwd):] cmd = "cvs diff -u %r" % path (status, data) = commands.getstatusoutput(cmd) status = status >> 8 if status == 0: w.application.set_error("No difference found") else: w.application.data_buffer("*Diff*", data, switch_to=True, modename='diff') w.application.set_error("Differences were found") class CvsDiff2(Method): '''diff the current file with the version in CVS''' rev_regex = re.compile('^[0-9]+\.[0-9]+$') args = [Argument("revision", type=type(""), prompt="Old Revision: ")] def _execute(self, w, **vargs): if not hasattr(w.buffer, 'path'): w.application.set_error("Buffer has no corresponding file") return rev = vargs['revision'] if not self.rev_regex.match(rev): w.application.set_error("Could not parse revision: %r" % rev) return cwd = os.getcwd() + os.path.sep path = w.buffer.path if path.startswith(cwd): path = path[len(cwd):] cmd = "cvs diff -r %s -u %r" % (rev, path) (status, data) = commands.getstatusoutput(cmd) status = status >> 8 if status == 0: w.application.set_error("No difference found") else: w.application.data_buffer("*Diff*", data, switch_to=True, modename='diff') w.application.set_error("Differences were found") class CvsDiff3(Method): '''diff the current file with the version in CVS''' rev_regex = re.compile('^[0-9]+\.[0-9]+$') args = [Argument("revision1", type=type(""), prompt="Old Revision: "), Argument("revision2", type=type(""), prompt="New Revision: ")] def _execute(self, w, **vargs): if not hasattr(w.buffer, 'path'): w.application.set_error("Buffer has no corresponding file") return rev1 = vargs['revision1'] if not self.rev_regex.match(rev1): w.application.set_error("Could not parse revision1: %r" % rev) return rev2 = vargs['revision2'] if not self.rev_regex.match(rev2): w.application.set_error("Could not parse revision2: %r" % rev) return cwd = os.getcwd() + os.path.sep path = w.buffer.path if path.startswith(cwd): path = path[len(cwd):] cmd = "cvs diff -r %s -r %s -u %r" % (rev1, rev2, path) (status, data) = commands.getstatusoutput(cmd) status = status >> 8 if status == 0: w.application.set_error("No difference found") else: w.application.data_buffer("*Diff*", data, switch_to=True, modename='diff') w.application.set_error("Differences were found") class CvsBlame(Method): '''show blame output for the current version in SVN''' line_re = re.compile('^([0-9.]+) +\(*([a-zA-Z0-9_]+) +([-0-9A-Za-z]+)\): (.*)$') def _execute(self, w, **vargs): if not hasattr(w.buffer, 'path'): w.application.set_error("Buffer has no corresponding file") return cwd = os.getcwd() + os.path.sep path = w.buffer.path if path.startswith(cwd): path = path[len(cwd):] cmd = ("/usr/bin/cvs", 'annotate', path) pipe = popen2.Popen3(cmd, capturestderr=True) tokens = [] max_rev = 0 max_user = 0 for line in pipe.fromchild: m = self.line_re.match(line) if not m: raise Exception, line (rev, user, date, content) = m.groups() max_rev = max(max_rev, len(rev)) max_user = max(max_user, len(user)) tokens.append((rev, user, date, content)) lines = [] fmt = "%%-%ds %%-%ds %%9s %%s\n" % (max_rev, max_user) for (rev, user, date, content) in tokens: lines.append(fmt % (rev, user, date, content)) data = ''.join(lines) status = pipe.wait() >> 8 if status == 0: w.application.data_buffer("*Blame*", data, switch_to=True, modename='blame') else: w.application.set_error("There was an error (%s)" % (status)) class ShowBindingsBuffer(Method): '''Dump all keybindings for current mode into a new buffer''' def _execute(self, w, **vargs): lines = [] mode_name = w.mode.name() lines.append('Key bindings for mode %r:' % (mode_name)) lines.append('') names_to_sequences = {} seq_len = len('BINDINGS') name_len = len('ACTION') for seq in w.mode.bindings: name = w.mode.bindings[seq] if name.startswith('insert-string-'): # we aren't going to show all the generic keypress actions continue # determine this for formatting seq_len = max(seq_len, len(seq)) name_len = max(name_len, len(name)) # set up our new data structure names_to_sequences.setdefault(name, []) names_to_sequences[name].append(seq) # generate the format string (note the 'meta formatting') format_str = '%%-%ds %%-%ds %%s' % (seq_len, name_len) lines.append(format_str % ('BINDINGS', 'ACTIONS', 'HELP')) names = names_to_sequences.keys() names.sort() for name in names: sequences = names_to_sequences[name] sequences.sort() seq = sequences[0] help = w.application.methods[name].help if help is None: help = '' lines.append(format_str % (seq, name, help)) for seq2 in sequences[1:]: lines.append(format_str % (seq2, '', '')) data = '\n'.join(lines) w.application.data_buffer("*Bindings-Help*", data, switch_to=True) class CmdHelpBuffer(Method): '''Get help with the specified command''' args = [Argument('method', datatype="method", prompt="Help for command: ")] def _execute(self, w, **vargs): lines = [] name = vargs['method'] if name not in w.application.methods: err = "No command called %r in mode %r" % (name, w.mode.name) raise Exception, err m = w.application.methods[name] lines.append('HELP FOR %r' % name) lines.append('') # sequences sequences = [] for seq in w.mode.bindings: if w.mode.bindings[seq] == name: sequences.append(seq) sequences.sort() lines.append('Keys bound to this command:') for seq in sequences: lines.append(' %s' % (seq)) lines.append('') # arguments if m.args: lines.append('Arguments for this command:') for arg in m.args: if arg.datatype is None: if arg.type == type(""): t = 'str' elif arg.type == type(0): t = 'int' elif arg.type == type(0.0): t = 'float' else: t = 'str' else: t = arg.datatype if arg.help: lines.append(' %s %r: %s' % (t, arg.name, arg.help)) else: lines.append(' %s %r' % (t, arg.name)) lines.append('') # help text lines.append('Help text for this command:') h = m.help if not h: h = 'No help available' lines.append(' %s' % h) data = '\n'.join(lines) w.application.data_buffer("*Command-Help*", data, switch_to=True) class SetMode(Method): '''Set the mode of the current buffer''' args = [Argument('mode', datatype='mode', prompt="Enter new mode: ")] def _execute(self, w, **vargs): mode_name = vargs['mode'] m = w.application.modes[mode_name](w) w.set_mode(m) w.application.set_error('Set mode to %r' % (mode_name)) class WhichCommand(Method): '''Display which command is run for a given key-sequence''' def _execute(self, w, **vargs): self.old_window = w w.application.open_mini_buffer('Enter a key sequence to be explained: ', lambda x: None, self, None, 'which') class Cancel(Method): '''Cancel command in-progress, and return to the main buffer''' def execute(self, w, **vargs): w.application.close_mini_buffer() w.application.set_error('Cancel') class SplitWindow(Method): '''Split the main window horizontally into upper and lower windows''' def execute(self, w, **vargs): a = w.application a.add_slot() if not w.cursor_is_visible(): p = w.first w.goto(p) n = len(a.bufferlist.slots) a.set_error('Window has been split into %d windows!' % n) class UnsplitWindow(Method): '''Maximize the current window to fill the screen''' def execute(self, w, **vargs): w.application.single_slot() w.application.set_error('Window has been unsplit back to one window!') class SomethingCrazy(Method): def _execute(self, w, **vargs): pass class CloseTag(Method): mytag = ')' def _execute(self, w, **vargs): # first, de-reference some variables and actually do the insertion # NOTE: we derence the cursor *before* inserting the character, so it is # expecected that the cursor variable should be the point the new # character is on. (x, y) = w.logical_cursor().xy() w.insert_string_at_cursor(self.mytag) app = w.application buffer = w.buffer highlighter = buffer.highlights[w.mode.name()] tokens = highlighter.tokens # REFACTOR: we have methods in window to do this now i = 0 while i < len(tokens[y]): token = tokens[y][i] if token.x == x and token.string == self.mytag: break elif token.x <= x and token.end_x() > x: return i += 1 if i >= len(tokens[y]): return tag_stack = [] while y >= 0: while i >= 0 and i < len(tokens[y]): token = tokens[y][i] n = token.name s = token.string if n in w.mode.closetokens and s in w.mode.closetags: tag_stack.append(s) elif n in w.mode.opentokens and s in w.mode.opentags: if tag_stack[-1] == w.mode.opentags[s]: del tag_stack[-1] else: app.set_error("tag mismatch; got %r expected %r" % (s, w.mode.closetags[tag_stack[-1]])) return if len(tag_stack) == 0: p = Point(token.x, y) s = w.buffer.lines[p.y][:p.x+1] if len(s) > 60: s = "..." + s[-60:] msg = 'matches %r' % s w.set_active_point(p, msg) return i -= 1 y -= 1 i = len(tokens[y]) - 1 class CloseParen(CloseTag): mytag = ')' class CloseBrace(CloseTag): mytag = '}' class CloseBracket(CloseTag): mytag = ']' class GetToken(Method): def _execute(self, w, **vargs): token = w.get_token() if token is None: w.application.set_error('No Token') else: w.application.set_error('Token: %r' % token.string) class RegisterSave(Method): MAX_TXT = 30 MAX_REG = 20 '''Save the top item of the kill stack into the named register''' args = [Argument('name', datatype="str", prompt="Register name: ")] def _pre_execute(self, w, **vargs): if not w.has_kill(): raise MethodError, "No text on the kill stack" def _execute(self, w, **vargs): name = vargs['name'] text = w.get_kill() w.application.registers[name] = text if len(name) > self.MAX_REG: name = name[:self.MAX_REG] + '...' if len(text) > self.MAX_TXT: text = text[:self.MAX_TXT] + '...' w.set_error('Saved %r into register %r' % (text, name)) class RegisterRestore(Method): MAX_TXT = 30 MAX_REG = 18 '''Push the value saved in the named register onto the kill stack''' args = [Argument('name', datatype="str", prompt="Register name: ")] def _execute(self, w, **vargs): name = vargs['name'] if name not in w.application.registers: w.set_error('Register %r does not exist' % name) return text = app.registers[name] w.push_kill(text) if len(text) > self.MAX_TXT: text = text[0:self.MAX_TXT] + '...' if len(name) > self.MAX_REG: name = name[0:self.MAX_REG] + '...' w.set_error('Restored %r from register %r' % (text2, name2)) class Pipe(Method): '''Pipe the buffer's contents through the command, and display the output in a new buffer''' args = [Argument('cmd', datatype="str", prompt="Command: ")] def _parse(self, w, **vargs): m = regex.shell_command.match(vargs['cmd']) if m: prog = m.group(0) return (prog, vargs['cmd']) else: return (None, None) def _execute(self, w, **vargs): (prog, cmd) = self._parse(w, **vargs) if prog is None: return pipe = popen2.Popen4(cmd) pid = pipe.pid indata = w.buffer.make_string() pipe.tochild.write(indata) pipe.tochild.close() outdata = pipe.fromchild.read() status = pipe.wait() >> 8 bufname = '*%s*' % self.name.title() w.application.data_buffer(bufname, outdata, switch_to=True) w.set_error("%s exited with status %d" % (prog, status)) class Grep(Pipe): '''Grep the buffer's contents for instances of a pattern, and display them in a new buffer''' args = [Argument('pattern', datatype="str", prompt="Pattern: ")] def _parse(self, w, **vargs): return ('grep', ('/usr/bin/grep', '-E', '-n', vargs['pattern'])) class Exec(Method): args = [Argument('cmd', datatype="str", prompt="Exec: ")] def _doit(self, w, path, cmd): try: cmd = cmd % path except: w.set_error("Malformed command: %r" % cmd) return (status, output) = commands.getstatusoutput(cmd) bufname = '*%s*' % self.name.title() w.application.data_buffer(bufname, output, switch_to=True) w.set_error("Shell exited with %d" % status) def _execute(self, w, **vargs): if w.buffer.btype == 'dir': name = dirutil.resolve_name(w) path = dirutil.resolve_path(w) self._doit(w, path, vargs['cmd']) dirutil.find_name(w, name) elif hasattr(w.buffer, 'path'): path = w.buffer.path self._doit(w, path, vargs['cmd']) else: w.set_error("Don't know how to exec: %r" % w.buffer) return class TokenComplete(Method): '''Complete token names based on other tokens in the buffer''' name_overrides = {} def _min_completion(self, w, t): h = w.get_highlighter() minlen = None if t.name in self.name_overrides: ok = name_overrides[t.name] else: ok = (t.name,) strings = {} for line in h.tokens: for t2 in line: if t2 is t: continue elif False and t2.name not in ok: continue elif t2.string.startswith(t.string): strings[t2.string] = 1 if minlen is None: minlen = len(t2.string) else: minlen = min(minlen, len(t2.string)) strings = strings.keys() if not strings: return ([], t.string) i = len(t.string) while i < minlen: c = strings[0][i] for s in strings: if s[i] != c: return (strings, strings[0][:i]) i += 1 return (strings, strings[0][:minlen]) def _execute(self, w, **vargs): t = w.get_token2() if t is None: w.set_error("No token to complete!") return elif regex.reserved_token_names.match(t.name): w.set_error("Will not complete reserved token") return (candidates, result) = self._min_completion(w, t) if candidates: p1 = Point(t.x, t.y) p2 = Point(t.end_x(), t.y) w.buffer.delete(p1, p2) w.insert_string(p1, result) if not candidates: w.set_error("No completion: %r" % result) elif len(candidates) == 1: w.set_error("Unique completion: %r" % result) elif result in candidates: w.set_error("Ambiguous completion: %r" % candidates) else: w.set_error("Partial completion: %r" % candidates)