import md5, os, sets, shutil import aes, point, method, regex # set this to 0 or less to have infinite undo/redo REDO_STACK_LIMIT = 1024 UNDO_STACK_LIMIT = 1024 # abstract class class Buffer(object): def __init__(self, nl='\n'): self.lines = [""] self.windows = {} self.undo_stack = [] self.redo_stack = [] assert nl in ('\n', '\r', '\r\n'), "Invalid line ending" self.nl = nl self.modified = False def num_chars(self): n = 0 for line in self.lines[:-1]: n += len(line) + 1 n += len(self.lines[-1]) return n # basic file operation stuff def _open_file_r(self, path): path = os.path.realpath(path) if not os.path.isfile(path): raise Exception, "Path '%s' does not exist" % (path) if not os.access(path, os.R_OK): raise Exception, "Path '%s' cannot be read" % (path) f = open(path, 'r') return f def _open_file_w(self, path): if os.path.isfile(path): raise Exception, "Path '%s' already exists" % (path) d = os.path.dirname(path) if not os.access(d, os.R_OK): raise Exception, "Dir '%s' cannot be read" % (path) if not os.access(d, os.W_OK): raise Exception, "Dir '%s' cannot be written" % (path) f = open(path, 'w') return f def _temp_path(self, path): (dirname, basename) = os.path.split(path) return os.path.join(dirname, ".__%s__pmacs" % (basename)) # undo/redo stack def add_to_stack(self, move, stack="undo"): if stack == "undo": self.redo_stack = [] self.undo_stack.append(move) if UNDO_STACK_LIMIT > 0: while len(self.undo_stack) > UNDO_STACK_LIMIT: self.undo_stack.pop(0) elif stack == "redo": self.redo_stack.append(move) if REDO_STACK_LIMIT > 0: while len(self.redo_stack) > REDO_STACK_LIMIT: self.redo_stack.pop(0) elif stack == "none": self.undo_stack.append(move) if UNDO_STACK_LIMIT > 0: while len(self.undo_stack) > UNDO_STACK_LIMIT: self.undo_stack.pop(0) else: raise Exception, "Invalid stack to add to: %s" % (stack) def restore_move(self, move, stack="redo"): if move[0] == "insert": self.insert_string(move[1], move[2], stack=stack) elif move[0] == "delete": self.delete_string(move[1], move[2], stack=stack) else: raise Exception, "Invalid undo move type: '%s'" % (move[0]) def undo(self): if len(self.undo_stack): move = self.undo_stack.pop(-1) self.restore_move(move, stack="redo") else: raise Exception, "Nothing to Undo!" def redo(self): if len(self.redo_stack): move = self.redo_stack.pop(-1) self.restore_move(move, stack="none") else: raise Exception, "Nothing to Redo!" # window-buffer communication def add_window(self, w, name): assert name not in self.windows, "window %r already exists" % name self.windows[name] = w def remove_window(self, name): del self.windows[name] def _region_added(self, p, xdiff, ydiff, str=None, stack="undo"): y = p.y + ydiff if ydiff == 0: x = p.x + xdiff else: x = xdiff p2 = point.Point(x, y) move = ["delete", p, p2, str] self.add_to_stack(move, stack) for w in self.windows.itervalues(): w._region_added(p, xdiff, ydiff, str) def _region_removed(self, p1, p2, str=None, stack="undo"): move = ["insert", p1, str] self.add_to_stack(move, stack) for w in self.windows.itervalues(): w._region_removed(p1, p2, str) def has_window(self, name): return name in self.windows def get_window(self, name): if name in self.windows: return self.windows[name] else: raise Exception, "uh oh %r" % self.windows # internal validation def _validate_point(self, p): self._validate_xy(p.x, p.y) def _validate_xy(self, x, y): assert y >= 0 and y < len(self.lines), \ "xy1: %d >= 0 and %d < %d" % (y, y, len(self.lines)) assert x >= 0 and x <= len(self.lines[y]), \ "xy2: %d >= 0 and %d <= %d" % (x, x, len(self.lines[y])) def _validate_y(self, y): assert y >= 0 and y < len(self.lines), \ "y: %d >= 0 and %d < %d" % (y, y, len(self.lines)) # internal def make_string(self, start=0, end=None, nl='\n'): assert end is None or start < end if start == 0 and end is None: return nl.join(self.lines) else: lines = [] i = 0 offset = 0 while i < len(self.lines): l = self.lines[i] if offset + len(l) < start: pass elif offset <= start: if end is None or offset + len(l) < end: lines.append(l[start - offset:]) else: lines.append(l[start - offset:end - offset]) elif end is None or offset + len(l) < end: lines.append(l) else: lines.append(l[:end]) offset += len(l) + 1 i += 1 return nl.join(lines) # methods to be overridden by subclasses def name(self): return "Generic" def close(self): pass def open(self): pass def changed(self): return self.modified def reload(self): raise Exception, "%s reload: Unimplemented" % (self.name()) def save_as(self, path, force=False): # check to see if the path exists, and if we're prepared to overwrite it # if yes to both, get its mode so we can preserve the path's permissions mode = None if os.path.exists(path): if force: mode = os.stat(self.path)[0] else: raise Exception, "oh no! %r already exists" % path # create the string that we're going to write into the file data = self.write_filter(self.make_string(nl=self.nl)) # create a safe temporary path to write to, and write out data to it temp_path = self._temp_path() f2 = self._open_file_w(temp_path) f2.write(data) f2.close() # move the temporary file to the actual path; maybe change permissions shutil.move(temp_path, path) if mode: os.chmod(path, mode) # the file has not been modified now self.modified = False def readonly(self): return False def read_filter(self, data): return data def write_filter(self, data): return data # point retrieval def get_buffer_start(self): return point.Point(0, 0, "logical") def get_buffer_end(self): y = len(self.lines) - 1 return point.Point(len(self.lines[y]), y, "logical") def get_line_start(self, y): self._validate_y(y) return Point(0, y, "logical") def get_line_end(self, y): self._validate_y(y) return Point(len(self.lines[y]), y, "logical") def get_point_offset(self, p): '''used to find positions in data string''' self._validate_point(p) offset = 0 for line in self.lines[:p.y]: offset += len(line) + 1 offset += p.x return offset def get_offset_point(self, offset): i = 0 y = 0 for line in self.lines: if i + len(line) + 1 > offset: break else: i += len(line) + 1 y += 1 return point.Point(offset - i, y) # data retrieval def get_character(self, p): self._validate_point(p) if p.x == len(self.lines[p.y]): if p1.y < len(self.lines): return "\n" else: return "" else: return self.lines[p.y][p.x] def get_substring(self, p1, p2): self._validate_point(p1) self._validate_point(p2) assert p1 <= p2, "p1.x (%d) > p2.x (%d)" % (p1.x, p2.x) if p1 == p2: return "" elif p1.y == p2.y: return self.lines[p1.y][p1.x:p2.x] else: if p1.x == 0: text = "%s\n" % (self.lines[p1.y]) else: text = "%s\n" % (self.lines[p1.y][p1.x:]) for i in range(p1.y+1, p2.y): text = "%s%s\n" % (text, self.lines[i]) if p2.x > 0: text = "%s%s" % (text, self.lines[p2.y][:p2.x]) return text def set_data(self, d, force=False): if not force and self.readonly(): raise Exception, "set_data: buffer is readonly" start = self.get_buffer_start() end = self.get_buffer_end() self.delete_string(start, end, force=force) self.insert_string(start, d, force=force) self.modified = True # insertion into buffer def insert_string(self, p, s, stack="undo", force=False): if not force: assert not self.readonly(), "insert_string: buffer is read-only" new_lines = s.split("\n") if len(new_lines) > 1: xdiff = len(new_lines[-1]) - p.x else: xdiff = len(new_lines[-1]) ydiff = len(new_lines) - 1 new_lines[0] = self.lines[p.y][:p.x] + new_lines[0] new_lines[-1] = new_lines[-1] + self.lines[p.y][p.x:] self.lines[p.y:p.y+1] = new_lines self._region_added(p, xdiff, ydiff, s, stack) self.modified = True # deletion from buffer def delete_character(self, p, stack="undo", force=False): """delete character at (x,y) from the buffer""" if not force: assert not self.readonly(), "delete_character: buffer is read-only" self._validate_point(p) x, y = p.x, p.y if p.x < len(self.lines[p.y]): s = self.lines[y][x] self.lines[y] = "%s%s" % (self.lines[y][:x], self.lines[y][x+1:]) self._region_removed(p, p.offset(1, 0, "logical"), str=s, stack=stack) elif p.y < len(self.lines) - 1: s = "\n" self.lines[y:y+2] = ["%s%s" % (self.lines[y], self.lines[y+1])] self._region_removed(p, point.Point(0, p.y + 1, "logical"), str="\n", stack=stack) self.modified = True def delete_string(self, p1, p2, stack="undo", force=False): """delete characters from p1 up to p2 from the buffer""" if not force: assert not self.readonly(), "delete_string: buffer is read-only" self._validate_xy(p1.x, p1.y) self._validate_xy(p2.x, p2.y) if p1 == p2: return assert p1 < p2, "p1 %r > p2 %r" % (p1, p2) s = self.get_substring(p1, p2) if p1.y < p2.y: start_line = self.lines[p1.y][:p1.x] end_line = self.lines[p2.y][p2.x:] self.lines[p1.y:p2.y+1] = ["%s%s" % (start_line, end_line)] elif p1.y == p2.y: if p1.x == p2.x - 1: s = self.lines[p1.y][p1.x] self.delete_character(p1, stack=stack) # make sure we don't call _region_removed twice, so return return elif p1.x < p2.x: s = self.lines[p1.y][p1.x:p2.x] self.lines[p1.y] = "%s%s" % (self.lines[p1.y][:p1.x], self.lines[p1.y][p2.x:]) else: raise Exception, "p1.x (%d) >= p2.x (%d)" % (p1.x, p2.x) else: raise Exception, "p1.y (%d) > p2.y (%d)" % (p1.y, p2.y) self._region_removed(p1, p2, str=s, stack=stack) self.modified = True # random def count_leading_whitespace(self, y): line = self.lines[y] m = regex.leading_whitespace.match(line) if m: return m.end() else: # should not happen raise Exception, "iiiijjjj" return 0 # scratch is a singleton scratch = None class ScratchBuffer(Buffer): def __new__(cls, *args, **kwargs): global scratch if scratch is None: scratch = object.__new__(ScratchBuffer, *args, **kwargs) return scratch def name(self): return "*Scratch*" def close(self): global scratch scratch = None class DataBuffer(Buffer): def __init__(self, name, data, nl='\n'): Buffer.__init__(self, nl) self._name = name self.lines = data.split("\n") def name(self): return self._name def close(self): pass def readonly(self): return True # console is another singleton console = None class ConsoleBuffer(Buffer): def __new__(cls, *args, **kwargs): global console if console is None: b = object.__new__(ConsoleBuffer, *args, **kwargs) console = b return console def __init__(self, nl='\n'): Buffer.__init__(self, nl) lines = ['Python Console\n', "Evaluate python expressions in the editor's context (self)\n", 'Press Control-] to exit\n', '\n'] console.set_data(''.join(lines), force=True) def name(self): return '*Console*' def changed(self): return False def close(self): global console console = None def readonly(self): return True class FileBuffer(Buffer): def __init__(self, path, nl='\n', name=None): '''fb = FileBuffer(path)''' Buffer.__init__(self, nl) self.path = os.path.realpath(path) self.checksum = None if name is None: self._name = os.path.basename(self.path) else: self._name = name if os.path.exists(self.path) and not os.access(self.path, os.W_OK): self._readonly = True else: self._readonly = False def readonly(self): return self._readonly def _open_file_r(self, path=None): if path is None: path = self.path path = os.path.realpath(path) self.path = path if not os.path.isfile(path): raise Exception, "Path '%s' does not exist" % (path) if not os.access(path, os.R_OK): raise Exception, "Path '%s' cannot be read" % (path) f = open(path, 'r') return f def _open_file_w(self, path=None): if path is None: path = self.path if os.path.isfile(path): raise Exception, "Path '%s' already exists" % (path) d = os.path.dirname(path) if not os.access(d, os.R_OK): raise Exception, "Dir '%s' cannot be read" % (path) if not os.access(d, os.W_OK): raise Exception, "Dir '%s' cannot be written" % (path) f = open(path, 'w') return f def _temp_path(self, path=None): if path is None: path = self.path (dirname, basename) = os.path.split(path) return os.path.join(dirname, ".__%s__pmacs" % (basename)) # methods for dealing with the underlying resource, etc. def name(self): #return self.path return self._name def path_exists(self): return os.path.exists(self.path) def store_checksum(self, data): self.checksum = md5.new(data) def read(self): if self.path_exists(): f = self._open_file_r() data = f.read() f.close() self.store_checksum(data) else: data = '' data = self.read_filter(data) #FIXME: this is horrible...but maybe not as horrible as using tabs?? data = data.replace("\t", " ") return data def open(self): data = self.read() self.lines = data.split(self.nl) def reload(self): self.open() def changed_on_disk(self): assert self.checksum is not None f = open(self.path) data = f.read() f.close() m = md5.new(data) return self.checksum.digest() != m.digest() def save(self, force=False): if self.readonly(): raise Exception, "can't save a read-only file" if self.checksum is not None and force is False: # the file already existed and we took a checksum so make sure it's # still the same right now if not self.path_exists(): raise Exception, "oh no! %r disappeared!" % self.path if self.changed_on_disk(): raise Exception, "oh no! %r has changed on-disk!" % self.path temp_path = self._temp_path() data = self.make_string(nl=self.nl) data = self.write_filter(data) f2 = self._open_file_w(temp_path) f2.write(data) f2.close() if self.path_exists(): mode = os.stat(self.path)[0] os.chmod(temp_path, mode) shutil.move(temp_path, self.path) self.store_checksum(data) self.modified = False def save_as(self, path): self.path = path self.save() class AesBuffer(FileBuffer): def __init__(self, path, password, nl='\n'): '''fb = FileBuffer(path)''' FileBuffer.__init__(self, path, nl) self.password = password def read_filter(self, data): return aes.decrypt(data, self.password) def write_filter(self, data): return aes.encrypt(data, self.password)