import datetime, grp, md5, os, pwd, re, sets, shutil, stat, string import aes, dirutil, regex, highlight from point import Point # undo/redo stack constants ACT_NORM = 0 ACT_UNDO = 1 ACT_REDO = 2 STACK_LIMIT = 1024 # used for undo/redo stacks when text will need to be added back class AddMove: def __init__(self, buffer, p, lines): self.buffer = buffer self.p = p self.lines = lines def restore(self, act=ACT_UNDO): assert act == ACT_UNDO or act == ACT_REDO self.buffer.insert_lines(self.p, self.lines, act) def getpos(self): return self.p # used for undo/redo stacks when text will need to be removed class DelMove: def __init__(self, buffer, p1, p2): self.buffer = buffer self.p1 = p1 self.p2 = p2 def restore(self, act): assert act == ACT_UNDO or act == ACT_REDO self.buffer.delete(self.p1, self.p2, act) def getpos(self): return self.p1 # abstract class class Buffer(object): btype = 'generic' def __init__(self, nl='\n', stack_limit=STACK_LIMIT): assert nl in ('\n', '\r', '\r\n'), "Invalid line ending" self.lines = [""] self.windows = [] self.undo_stack = [] self.redo_stack = [] self.stack_limit = stack_limit self.nl = nl self.modified = False self.highlights = {} # basic file operation stuff def _open_file_r(self, path): path = os.path.realpath(path) if not os.path.isfile(path): raise Exception, "Path '%s' does not exist" % (path) if not os.access(path, os.R_OK): raise Exception, "Path '%s' cannot be read" % (path) f = open(path, 'r') return f def _open_file_w(self, path): if os.path.isfile(path): raise Exception, "Path '%s' already exists" % (path) d = os.path.dirname(path) if not os.access(d, os.R_OK): raise Exception, "Dir '%s' cannot be read" % (path) if not os.access(d, os.W_OK): raise Exception, "Dir '%s' cannot be written" % (path) f = open(path, 'w') return f def _temp_path(self, path): (dirname, basename) = os.path.split(path) return os.path.join(dirname, ".__%s__pmacs" % (basename)) # undo/redo stack def _stack_trim(self, stack): if self.stack_limit: while len(stack) > self.stack_limit: stack.pop(0) def add_to_stack(self, move, act): if act == ACT_NORM: self.redo_stack = [] self.undo_stack.append(move) self._stack_trim(self.undo_stack) elif act == ACT_UNDO: self.redo_stack.append(move) self._stack_trim(self.redo_stack) elif act == ACT_REDO: self.undo_stack.append(move) self._stack_trim(self.undo_stack) else: raise Exception, "Invalid act: %d" % (act) def undo(self): if len(self.undo_stack): move = self.undo_stack.pop(-1) move.restore(ACT_UNDO) return move.getpos() else: raise Exception, "Nothing to Undo!" def redo(self): if len(self.redo_stack): move = self.redo_stack.pop(-1) move.restore(ACT_REDO) return move.getpos() else: raise Exception, "Nothing to Redo!" # window-buffer communication def add_window(self, w): if w not in self.windows: self.windows.append(w) modename = w.mode.name() if modename not in self.highlights and w.mode.lexer is not None: self.highlights[modename] = highlight.Highlighter(w.mode.lexer) self.highlights[modename].highlight(self.lines) #if modename not in self.tabbing and w.mode.tabber is not None: # self.tabbing[modename] = def remove_window(self, w): if w in self.windows: self.windows.remove(w) modename = w.mode.name() found = False for w2 in self.windows: if w2.mode.name() == modename: found = True break if not found: del self.highlights[modename] def _region_add(self, p1, p2, lines, act): move = DelMove(self, p1, p2) self.add_to_stack(move, act) for w in self.windows: w.region_added(p1, lines) for name in self.highlights: self.highlights[name].relex_add(self.lines, p1.y, p1.x, lines) def _region_del(self, p1, p2, lines, act): move = AddMove(self, p1, lines) self.add_to_stack(move, act) for w in self.windows: w.region_removed(p1, p2) for name in self.highlights: self.highlights[name].relex_del(self.lines, p1.y, p1.x, p2.y, p2.x) # internal validation def _validate_point(self, p): self._validate_xy(p.x, p.y) def _validate_xy(self, x, y): assert y >= 0 and y < len(self.lines), \ "xy1: %d >= 0 and %d < %d" % (y, y, len(self.lines)) assert x >= 0 and x <= len(self.lines[y]), \ "xy2: %d >= 0 and %d <= %d" % (x, x, len(self.lines[y])) def _validate_y(self, y): assert y >= 0 and y < len(self.lines), \ "y: %d >= 0 and %d < %d" % (y, y, len(self.lines)) # deal with the actual logical document string def num_chars(self): n = 0 for line in self.lines[:-1]: n += len(line) + 1 n += len(self.lines[-1]) return n def num_lines(self): return len(self.lines) def make_string(self, nl='\n'): return nl.join(self.lines) # methods to be overridden by subclasses def name(self): return "Generic" def close(self): pass def open(self): pass def changed(self): return self.modified def reload(self): raise Exception, "%s reload: Unimplemented" % (self.name()) def save_as(self, path, force=False): # check to see if the path exists, and if we're prepared to overwrite it # if yes to both, get its mode so we can preserve the path's permissions mode = None if os.path.exists(path): if force: mode = os.stat(self.path)[0] else: raise Exception, "oh no! %r already exists" % path # create the string that we're going to write into the file data = self.write_filter(self.make_string(nl=self.nl)) # create a safe temporary path to write to, and write out data to it temp_path = self._temp_path() f2 = self._open_file_w(temp_path) f2.write(data) f2.close() # move the temporary file to the actual path; maybe change permissions shutil.move(temp_path, path) if mode: os.chmod(path, mode) # the file has not been modified now self.modified = False def readonly(self): return False def read_filter(self, data): return data def write_filter(self, data): return data # point retrieval def get_buffer_start(self): return Point(0, 0) def get_buffer_end(self): return Point(len(self.lines[-1]), len(self.lines) - 1) # data retrieval def get_sublines(self, p1, p2): self._validate_point(p1) self._validate_point(p2) assert p1 <= p2, "p1.x (%d) > p2.x (%d)" % (p1.x, p2.x) lines = [] x = p1.x for i in range(p1.y, p2.y): lines.append(self.lines[i][x:]) x = 0 lines.append(self.lines[p2.y][x:p2.x]) return lines def get_substring(self, p1, p2): lines = self.get_sublines(p1, p2) return '\n'.join(lines) # buffer set def set_lines(self, lines, force=False): if not force and self.readonly(): raise Exception, "set_data: buffer is readonly" start = self.get_buffer_start() end = self.get_buffer_end() self.delete(start, end, force=force) self.insert_lines(start, lines, force=force) self.modified = True def set_data(self, data, force=False): lines = data.split('\n') self.set_lines(lines, force) # insertion into buffer def insert_lines(self, p, lines, act=ACT_NORM, force=False): #if lines == ['(']: # raise Exception, "damn" llen = len(lines) assert llen > 0 if not force: assert not self.readonly(), "insert_string: buffer is read-only" p2 = p.vadd(len(lines[-1]), llen - 1) if llen > 1: self.lines.insert(p.y + 1, []) self.lines[p.y + 1] = lines[-1] + self.lines[p.y][p.x:] self.lines[p.y] = self.lines[p.y][:p.x] + lines[0] for i in range(1, llen - 1): self.lines.insert(p.y + i, lines[i]) else: self.lines[p.y] = self.lines[p.y][:p.x] + lines[-1] + self.lines[p.y][p.x:] self._region_add(p, p2, lines, act) self.modified = True def insert_string(self, p, s, act=ACT_NORM, force=False): lines = s.split("\n") self.insert_lines(p, lines, act, force) # deletion from buffer def delete(self, p1, p2, act=ACT_NORM, force=False): """delete characters from p1 up to p2 from the buffer""" if not force: assert not self.readonly(), "delete_string: buffer is read-only" self._validate_point(p1) self._validate_point(p2) if p1 == p2: return assert p1 < p2, "p1 %r > p2 %r" % (p1, p2) lines = self.get_sublines(p1, p2) line1 = self.lines[p1.y] line2 = self.lines[p2.y] self.lines[p1.y:p2.y+1] = ["%s%s" % (line1[:p1.x], line2[p2.x:])] self._region_del(p1, p2, lines, act) self.modified = True def delete_char(self, p, act=ACT_NORM, force=False): if p.x == len(self.lines[p.y]): p2 = Point(0, p.y + 1) else: p2 = Point(p.x + 1, p.y) self.delete(p, p2, act=act, force=force) def overwrite_char(self, p, c, act=ACT_NORM, force=False): self.delete_char(p, act=act, force=force) self.insert_string(p, c, act=act, force=force) def delete_line(self, y, act=ACT_NORM, force=False): line = self.lines[y] p1 = Point(0, y) if y < len(self.lines) - 1: p2 = Point(0, y + 1) else: p2 = Point(len(self.lines[-1]), y) self.delete(p1, p2, act, force) # random def count_leading_whitespace(self, y): line = self.lines[y] m = regex.leading_whitespace.match(line) if m: return m.end() else: # should not happen raise Exception, "iiiijjjj" # scratch is a singleton scratch = None class ScratchBuffer(Buffer): btype = 'scratch' def __new__(cls, *args, **kwargs): global scratch if scratch is None: scratch = object.__new__(ScratchBuffer, *args, **kwargs) return scratch def name(self): return "*Scratch*" def close(self): global scratch scratch = None class DataBuffer(Buffer): btype = 'data' def __init__(self, name, data, nl='\n'): Buffer.__init__(self, nl) self._name = name self.lines = data.split("\n") def name(self): return self._name def close(self): pass def readonly(self): return True # console is another singleton console = None class ConsoleBuffer(Buffer): btype = 'console' modename = 'console' def __new__(cls, *args, **kwargs): global console if console is None: b = object.__new__(ConsoleBuffer, *args, **kwargs) console = b return console def __init__(self, nl='\n'): Buffer.__init__(self, nl) self.clear() def clear(self): lines = ['Python Console\n', "Evaluate python expressions in the editor's context (self)\n", 'Press Control-] to exit\n', '\n'] console.set_data(''.join(lines), force=True) def name(self): return '*Console*' def changed(self): return False def close(self): global console console = None def readonly(self): return True class BinaryDataException(Exception): pass class FileBuffer(Buffer): btype = 'file' def __init__(self, path, nl='\n', name=None): '''fb = FileBuffer(path)''' Buffer.__init__(self, nl) self.path = os.path.realpath(path) self.checksum = None if name is None: self._name = os.path.basename(self.path) else: self._name = name if os.path.exists(self.path) and not os.access(self.path, os.W_OK): self._readonly = True else: self._readonly = False def readonly(self): return self._readonly def _open_file_r(self, path=None): if path is None: path = self.path path = os.path.realpath(path) self.path = path if not os.path.isfile(path): raise Exception, "Path '%s' does not exist" % (path) if not os.access(path, os.R_OK): raise Exception, "Path '%s' cannot be read" % (path) f = open(path, 'r') return f def _open_file_w(self, path=None): if path is None: path = self.path if os.path.isfile(path): raise Exception, "Path '%s' already exists" % (path) d = os.path.dirname(path) if not os.access(d, os.R_OK): raise Exception, "Dir '%s' cannot be read" % (path) if not os.access(d, os.W_OK): raise Exception, "Dir '%s' cannot be written" % (path) f = open(path, 'w') return f def _temp_path(self, path=None): if path is None: path = self.path (dirname, basename) = os.path.split(path) return os.path.join(dirname, ".__%s__pmacs" % (basename)) # methods for dealing with the underlying resource, etc. def name(self): #return self.path return self._name def path_exists(self): return os.path.exists(self.path) def store_checksum(self, data): self.checksum = md5.new(data) def read(self): if self.path_exists(): f = self._open_file_r() data = f.read() f.close() self.store_checksum(data) else: data = '' data = self.read_filter(data) data = data.replace("\t", " ") for i in range(0, min(len(data), 8)): if data[i] not in string.printable: raise BinaryDataException, "binary files are not supported" #FIXME: this is horrible...but maybe not as horrible as using tabs?? return data def open(self): data = self.read() self.lines = data.split(self.nl) def reload(self): self.open() def changed_on_disk(self): assert self.checksum is not None f = open(self.path) data = f.read() f.close() m = md5.new(data) return self.checksum.digest() != m.digest() def save(self, force=False): if self.readonly(): raise Exception, "can't save a read-only file" if self.checksum is not None and force is False: # the file already existed and we took a checksum so make sure it's # still the same right now if not self.path_exists(): raise Exception, "oh no! %r disappeared!" % self.path if self.changed_on_disk(): raise Exception, "oh no! %r has changed on-disk!" % self.path temp_path = self._temp_path() data = self.make_string(nl=self.nl) if self.windows[0].mode.savetabs: data = data.replace(" ", "\t") data = self.write_filter(data) f2 = self._open_file_w(temp_path) f2.write(data) f2.close() if self.path_exists(): mode = os.stat(self.path)[0] os.chmod(temp_path, mode) shutil.move(temp_path, self.path) self.store_checksum(data) self.modified = False def save_as(self, path): self.path = path self.save() class AesBuffer(FileBuffer): btype = 'aesfile' def __init__(self, path, password, nl='\n', name=None): '''fb = FileBuffer(path)''' FileBuffer.__init__(self, path, nl, name) self.password = password def read_filter(self, data): return aes.decrypt_data(data, self.password) def write_filter(self, data): return aes.encrypt_data(data, self.password) class Binary32Buffer(FileBuffer): btype = 'bin32file' wordsize = 4 numwords = 4 def __init__(self, path, nl='\n', name=None): '''fb = FileBuffer(path)''' FileBuffer.__init__(self, path, nl, name) def read_filter(self, data): lines = [] i = 0 while i < len(data): j = 0 words = [] while j < self.numwords * self.wordsize and i + j < len(data): nibbles = [] for c in data[i + j:i + j + self.wordsize]: nibbles.append(string.hexdigits[ord(c) / 16]) nibbles.append(string.hexdigits[ord(c) % 16]) words.append(''.join(nibbles)) j += self.wordsize lines.append(' '.join(words)) i += self.numwords * self.wordsize return '\n'.join(lines) def write_filter(self, data): bytes = [] lastc = None for c in data: if c not in '0123456789abcdefABCDEF': pass elif lastc is None: lastc = c else: bytes.append(chr(int(lastc + c, 16))) lastc = None if lastc is not None: bytes.append(chr(int(lastc + '0', 16))) return ''.join(bytes) class Binary64Buffer(Binary32Buffer): wordsize = 8 numwords = 2 class DirBuffer(Buffer): btype = 'dir' def __init__(self, path, nl='\n', name=None): Buffer.__init__(self, nl) self.path = os.path.realpath(path) def changed(self): return False def readonly(self): return True def name(self): return self.path def path_exists(self): return os.path.exists(self.path) def _get_names(self): if not self.path_exists(): raise Exception, "directory %r does not exists" % self.path names = os.listdir(self.path) if self.path != '/': names.insert(0, '..') names.insert(0, '.') return names def _make_path(self, name): return os.path.join(self.path, name) def _get_lines(self): names = self._get_names() fieldlines = [] maxlens = [0] * 5 for name in names: path = self._make_path(name) fields = dirutil.path_fields(path, name) for i in range(0, 5): try: maxlens[i] = max(maxlens[i], len(fields[i])) except: raise Exception, '%d %r' % (i, fields[i]) fieldlines.append(fields) fieldlines.sort(cmp=dirutil.path_sort) fmt = '%%%ds %%-%ds %%-%ds %%%ds %%%ds %%s' % tuple(maxlens) lines = [] for fields in fieldlines: s = fmt % fields lines.append(s) return lines def open(self): self.lines = self._get_lines() def reload(self): lines = self._get_lines() self.set_lines(lines, force=True) def save(self, force=False): raise Exception, "can't save a directory buffer" def save_as(self, path): raise Exception, "can't save a directory buffer" class PathListBuffer(DirBuffer): btype = 'pathlist' def __init__(self, name, paths, nl='\n'): Buffer.__init__(self, nl) self.paths = paths self.path = os.getcwd() self._name = name def path_exists(self): raise Exception def _get_names(self): cwd = os.getcwd() return [x.replace(cwd, '.', 1) for x in self.paths] def _make_path(self, name): if name.startswith('.'): return name.replace('.', os.getcwd(), 1) else: return name def name(self): return self._name class AboutBuffer(DataBuffer): btype = 'about' data = ''' ================================================================================ ************ ********** ****** ****** **** ******** ********* ************** ****************** ************* ********** *********** ******* ***** ****** ***** **** **** ****** **** **** ***** **** *** *** *** *** *** **** **** **** ******* *** *** *** *** *** **** **** **** ******* ***** ***** ***** ***** **** **** ****** **** **** **** ***** ************ ***** ***** **** ************* ********** *********** ********** ***** ***** **** ****** **** ******** ********* *** *** pmacs is a python-based text editor by Erik Osheim, (c) 2005-2007 ***** pmacs is available to you under the GNU General Public License v2 ***** ***** to see all available commands use: C-c M-h (show-bindings-buffer) ================================================================================ ''' modename = 'about' def __init__(self): DataBuffer.__init__(self, '*About*', self.data)