pmacs3/buffer/__init__.py

797 lines
26 KiB
Python

from util import defaultdict
import codecs, datetime, grp, os, pwd, re, shutil, stat, string
import fcntl, select, pty, threading
import aes, dirutil, regex, highlight, lex, term
from point import Point
from subprocess import Popen, PIPE, STDOUT
from keyinput import MAP
# undo/redo stack constants
ACT_NONE = -1
ACT_NORM = 0
ACT_UNDO = 1
ACT_REDO = 2
STACK_LIMIT = 1024
def hasher(data):
try:
import hashlib
m = hashlib.md5(data)
except:
import md5
m = md5.new(data)
return m
class ReadOnlyError(Exception): pass
class FileChangedError(Exception): pass
class FileGoneError(Exception): pass
# used for undo/redo stacks when text will need to be added back
class AddMove(object):
def __init__(self, buffer, p, lines):
self.buffer = buffer
self.p = p
self.lines = lines
self.undo_id = buffer.undo_id
def restore(self, act=ACT_UNDO):
assert act == ACT_UNDO or act == ACT_REDO
self.buffer.insert_lines(self.p, self.lines, act)
def getpos(self):
return self.p
# used for undo/redo stacks when text will need to be removed
class DelMove(object):
def __init__(self, buffer, p1, p2):
self.buffer = buffer
self.p1 = p1
self.p2 = p2
self.undo_id = buffer.undo_id
def restore(self, act):
assert act == ACT_UNDO or act == ACT_REDO
self.buffer.delete(self.p1, self.p2, act)
def getpos(self):
return self.p1
# abstract class
class Buffer(object):
btype = 'generic'
modename = None
mac_re = re.compile('\r(?!\n)')
unix_re = re.compile('(?<!\r)\n')
win_re = re.compile('\r\n')
def __init__(self, stack_limit=STACK_LIMIT):
self.lines = [""]
self.windows = []
self.undo_id = 1
self.undo_stack = []
self.redo_stack = []
self.stack_limit = stack_limit
self.nl = '\n'
self.modified = False
self.highlights = {}
self.settings = {}
self.indentlvl = 4
self.writetabs = False
self.metadata = {}
def _detect_nl_type(self, data):
mac_c = len(self.mac_re.findall(data))
unix_c = len(self.unix_re.findall(data))
win_c = len(self.win_re.findall(data))
if (unix_c and mac_c) or (unix_c and win_c) or (mac_c and win_c):
# warn the user?
pass
if unix_c >= win_c and unix_c >= mac_c:
return '\n'
elif mac_c >= win_c:
return '\r'
else:
return '\r\n'
# basic file operation stuff
def _open_file_r(self, path):
path = os.path.realpath(path)
if not os.path.isfile(path):
raise Exception("Path '%s' does not exist" % (path))
if not os.access(path, os.R_OK):
raise Exception("Path '%s' cannot be read" % (path))
f = open(path, 'r')
return f
def _open_file_w(self, path):
if os.path.isfile(path):
raise Exception("Path '%s' already exists" % (path))
d = os.path.dirname(path)
if not os.access(d, os.R_OK):
raise Exception("Dir '%s' cannot be read" % (path))
if not os.access(d, os.W_OK):
raise Exception("Dir '%s' cannot be written" % (path))
f = open(path, 'w')
return f
def _temp_path(self, path):
(dirname, basename) = os.path.split(path)
return os.path.join(dirname, ".__%s__pmacs" % (basename))
# undo/redo stack
def _stack_trim(self, stack):
if self.stack_limit:
while len(stack) > self.stack_limit:
stack.pop(0)
def add_to_stack(self, move, act):
if act == ACT_NONE:
pass
elif act == ACT_NORM:
self.redo_stack = []
self.undo_stack.append(move)
self._stack_trim(self.undo_stack)
elif act == ACT_UNDO:
self.redo_stack.append(move)
self._stack_trim(self.redo_stack)
elif act == ACT_REDO:
self.undo_stack.append(move)
self._stack_trim(self.undo_stack)
else:
raise Exception("Invalid act: %d" % (act))
def undo(self):
if len(self.undo_stack):
undo_id = self.undo_stack[-1].undo_id
pos = None
while self.undo_stack and self.undo_stack[-1].undo_id == undo_id:
move = self.undo_stack.pop(-1)
move.restore(ACT_UNDO)
pos = move.getpos()
return pos
else:
raise Exception("Nothing to Undo!")
def redo(self):
if len(self.redo_stack):
undo_id = self.redo_stack[-1].undo_id
pos = None
while self.redo_stack and self.redo_stack[-1].undo_id == undo_id:
move = self.redo_stack.pop(-1)
move.restore(ACT_REDO)
pos = move.getpos()
return pos
else:
raise Exception("Nothing to Redo!")
# window-buffer communication
def add_window(self, w):
if w not in self.windows:
self.windows.append(w)
modename = w.mode.name
if modename not in self.highlights and w.mode.lexer is not None:
self.highlights[modename] = highlight.Highlighter(w.mode.lexer)
self.highlights[modename].highlight(self.lines)
if modename not in self.settings:
self.settings[modename] = {}
def remove_window(self, w):
if w in self.windows:
self.windows.remove(w)
modename = w.mode.name
if w.mode.name in self.highlights:
for w2 in self.windows:
if w2.mode.name == w.mode.name:
return
del self.highlights[w.mode.name]
def _region_add(self, p1, p2, lines, act):
move = DelMove(self, p1, p2)
self.add_to_stack(move, act)
for name in self.highlights:
self.highlights[name].relex_add(self.lines, p1.y, p1.x, lines)
for w in self.windows:
w.region_added(p1, lines)
def _region_del(self, p1, p2, lines, act):
move = AddMove(self, p1, lines)
self.add_to_stack(move, act)
for name in self.highlights:
self.highlights[name].relex_del(self.lines, p1.y, p1.x, p2.y, p2.x)
for w in self.windows:
w.region_removed(p1, p2)
# internal validation
def _validate_point(self, p):
self._validate_xy(p.x, p.y)
def _validate_xy(self, x, y):
assert y >= 0 and y < len(self.lines), \
"xy1: %d >= 0 and %d < %d" % (y, y, len(self.lines))
assert x >= 0 and x <= len(self.lines[y]), \
"xy2: %d >= 0 and %d <= %d" % (x, x, len(self.lines[y]))
def _validate_y(self, y):
assert y >= 0 and y < len(self.lines), \
"y: %d >= 0 and %d < %d" % (y, y, len(self.lines))
# deal with the actual logical document string
def num_chars(self):
n = 0
for line in self.lines[:-1]:
n += len(line) + 1
n += len(self.lines[-1])
return n
def num_lines(self):
return len(self.lines)
def make_string(self):
if self.writetabs:
lines = []
for line in self.lines:
i = 0
while i < len(line) and line[i] == ' ':
i += 1
j, k = i // self.indentlvl, i % self.indentlvl
lines.append(('\t' * j) + (' ' * k) + line[i:])
return self.nl.join(lines)
else:
return self.nl.join(self.lines)
# methods to be overridden by subclasses
def name(self):
return "Generic"
def close(self):
pass
def open(self):
pass
def changed(self):
return self.modified
def changed_on_disk(self):
return False
def reload(self):
raise Exception("%s reload: Unimplemented" % (self.name()))
def save_as(self, path, force=False):
# check to see if the path exists, and if we're prepared to overwrite it
# if yes to both, get its mode so we can preserve the path's permissions
mode = None
if os.path.exists(path):
if force:
mode = os.stat(self.path)[0]
else:
# XYZ
raise Exception("oh no! %r already exists" % path)
# create the string that we're going to write into the file
data = self.write_filter(self.make_string())
# create a safe temporary path to write to, and write out data to it
temp_path = self._temp_path()
f2 = self._open_file_w(temp_path)
f2.write(data)
f2.close()
# move the temporary file to the actual path; maybe change permissions
shutil.move(temp_path, path)
if mode:
os.chmod(path, mode)
# the file has not been modified now
self.modified = False
def readonly(self):
return False
def read_filter(self, data):
return data
def write_filter(self, data):
return data
# point retrieval
def get_buffer_start(self):
return Point(0, 0)
def get_buffer_end(self):
return Point(len(self.lines[-1]), len(self.lines) - 1)
# data retrieval
def get_sublines(self, p1, p2):
self._validate_point(p1)
self._validate_point(p2)
assert p1 <= p2, "p1.x (%d) > p2.x (%d)" % (p1.x, p2.x)
lines = []
x = p1.x
for i in range(p1.y, p2.y):
lines.append(self.lines[i][x:])
x = 0
lines.append(self.lines[p2.y][x:p2.x])
return lines
def get_substring(self, p1, p2):
lines = self.get_sublines(p1, p2)
return '\n'.join(lines)
# buffer set
def set_lines(self, lines, force=False):
if not force and self.readonly():
raise Exception("set_data: buffer is readonly")
start = self.get_buffer_start()
end = self.get_buffer_end()
self.delete(start, end, force=force)
self.insert_lines(start, lines, force=force)
self.modified = True
def set_data(self, data, force=False):
lines = data.split('\n')
self.set_lines(lines, force)
# append into buffer
def append_lines(self, lines, act=ACT_NORM, force=False):
p = self.get_buffer_end()
self.insert_lines(p, lines, act, force)
def append_string(self, s, act=ACT_NORM, force=False):
lines = s.split("\n")
self.insert_lines(lines, act, force)
# insertion into buffer
def insert_lines(self, p, lines, act=ACT_NORM, force=False):
llen = len(lines)
assert llen > 0
if not force and self.readonly():
raise ReadOnlyError("buffer is read-only")
p2 = p.vadd(len(lines[-1]), llen - 1)
if llen > 1:
self.lines.insert(p.y + 1, [])
self.lines[p.y + 1] = lines[-1] + self.lines[p.y][p.x:]
self.lines[p.y] = self.lines[p.y][:p.x] + lines[0]
for i in range(1, llen - 1):
self.lines.insert(p.y + i, lines[i])
else:
self.lines[p.y] = self.lines[p.y][:p.x] + lines[-1] + self.lines[p.y][p.x:]
self._region_add(p, p2, lines, act)
self.modified = True
def insert_string(self, p, s, act=ACT_NORM, force=False):
lines = s.split("\n")
self.insert_lines(p, lines, act, force)
# deletion from buffer
def delete(self, p1, p2, act=ACT_NORM, force=False):
"""delete characters from p1 up to p2 from the buffer"""
if not force and self.readonly():
raise ReadOnlyError("buffer is read-only")
self._validate_point(p1)
self._validate_point(p2)
if p1 == p2:
return
assert p1 < p2, "p1 %r > p2 %r" % (p1, p2)
lines = self.get_sublines(p1, p2)
line1 = self.lines[p1.y]
line2 = self.lines[p2.y]
self.lines[p1.y:p2.y+1] = ["%s%s" % (line1[:p1.x], line2[p2.x:])]
self._region_del(p1, p2, lines, act)
self.modified = True
def delete_char(self, p, act=ACT_NORM, force=False):
if p.x == len(self.lines[p.y]):
p2 = Point(0, p.y + 1)
else:
p2 = Point(p.x + 1, p.y)
self.delete(p, p2, act=act, force=force)
def overwrite_char(self, p, c, act=ACT_NORM, force=False):
self.delete_char(p, act=act, force=force)
self.insert_string(p, c, act=act, force=force)
def delete_line(self, y, act=ACT_NORM, force=False):
line = self.lines[y]
p1 = Point(0, y)
if y < len(self.lines) - 1:
p2 = Point(0, y + 1)
else:
p2 = Point(len(self.lines[-1]), y)
self.delete(p1, p2, act, force)
# random
def is_whitespace(self, y):
return regex.whitespace.match(self.lines[y])
def count_leading_whitespace(self, y):
m = regex.leading_whitespace.match(self.lines[y])
return m.end()
def detect_indent_level(self, y1, y2):
x = None
for y in range(y1, y2):
if self.is_whitespace(y):
continue
c = self.count_leading_whitespace(y)
if x is None:
x = c
else:
x = min(x, c)
return x or 0
# generic window functionality
def forward(self, p):
if p.x < len(self.lines[p.y]):
return Point(p.x + 1, p.y)
elif p.y < len(self.lines) - 1:
return Point(0, p.y + 1)
else:
return p
def backward(self, p):
if p.x > 0:
return Point(p.x - 1, p.y)
elif p.y > 0:
x = len(self.lines[p.y - 1])
return Point(x, p.y - 1)
else:
return p
def end_of_line(self, p):
return Point(len(self.lines[p.y]), p.y)
def start_of_line(self, p):
return Point(0, p.y)
def previous_line(self, p):
if p.y > 0:
return Point(p.x, p.y - 1)
else:
return p
def next_line(self, p):
if p.y < len(self.lines) - 1:
return Point(p.x, p.y + 1)
else:
return p
def left_delete(self, p):
(x, y) = p.xy()
if x > 0:
self.delete_char(Point(x - 1, y))
elif y > 0:
x = len(self.lines[y - 1])
self.delete_char(Point(x, y - 1))
def right_delete(self, p):
if (p.y < len(self.lines) - 1 or
p.x < len(self.lines[-1])):
self.delete_char(p)
class InterpreterPipeError(Exception):
pass
class InterpreterBuffer(Buffer):
_basename = 'Interpreter'
def create_name(cls, parent):
if hasattr(parent, 'path'):
return '*%s:%s*' % (cls._basename, parent.name())
else:
return '*%s*' % cls._basename
create_name = classmethod(create_name)
btype = 'interpreter'
readre = re.compile('^([A-Z]+):(.*)\n$')
def __init__(self, parent, app):
self.application = app
if parent and hasattr(parent, 'path'):
self.parent = parent
else:
self.parent = None
Buffer.__init__(self)
cmd = self.get_cmd()
env = dict(os.environ)
env.update(self.get_env())
f = open('/dev/null', 'w')
#f = open('pipe-errors', 'w')
self.pipe = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=f, env=env)
self.prompt = '***'
self.clear()
self.pipe_read()
self._name = self.create_name(parent)
def name(self):
return self._name
def get_env(self):
return {}
def get_cmd(self):
raise Exception('unimplemented')
def pipe_readline(self):
if self.pipe.poll() is not None:
raise InterpreterPipeError('broken pipe')
line = self.pipe.stdout.readline()
m = self.readre.match(line)
if m:
return (m.group(1), m.group(2))
else:
return (None, line.rstrip())
def pipe_read(self):
lines = []
while True:
(type_, value) = self.pipe_readline()
if type_ == 'PROMPT':
self.prompt = value.strip() + ' '
break
value.rstrip()
if value:
lines.append(value)
if lines:
output = '\n'.join(lines) + '\n'
p = self.get_buffer_end()
self.insert_string(p, output, force=True)
def pipe_write(self, s):
self.pipe.stdin.write("%s\n" % s)
self.pipe.stdin.flush()
def completions(self, word):
self.pipe_write("COMPLETE:%s" % word)
candidates = self.pipe_read_completions()
self.pipe_read()
return candidates
def pipe_read_completions(self):
try:
(typ_, value) = self.pipe_readline()
assert typ_ == 'COMPLETIONS', '%r %r' % (typ_, value)
candidates = [x for x in value.split('|') if x]
return candidates
except:
return []
def clear(self):
self.set_data('', force=True)
def changed(self):
return False
def readonly(self):
return True
class IperlBuffer(InterpreterBuffer):
_basename = 'IPerl'
btype = 'iperl'
modename = 'iperl'
def create_name(cls, parent):
if parent and hasattr(parent, 'path'):
if parent.path.endswith('.pm'):
return '*%s:%s*' % (cls._basename, parent.name())
else:
raise Exception("not a perl module")
else:
return '*%s*' % cls._basename
create_name = classmethod(create_name)
def get_cmd(self):
if self.parent:
return ('iperl', '-p', '-r', self.parent.path)
else:
return ('iperl', '-p')
def get_env(self):
lib = ':'.join(self.application.config.get('perl.libs', []))
return {'PERL5LIB': lib}
def readline_completions(self, x1, x2, line):
self.pipe.stdin.write("READLINE:%d:%d:%s\n" % (x1, x2, line))
self.pipe.stdin.flush()
(typ_, value) = self.pipe_readline()
assert typ_ == 'COMPLETIONS', '%r %r' % (typ_, value)
candidates = [x for x in value.split('|') if x]
self.pipe_read()
return candidates
class IpythonBuffer(InterpreterBuffer):
_basename = 'IPython'
btype = 'ipython'
modename = 'ipython'
def get_cmd(self):
if self.parent:
return ('epython', '-p', '-r', self.parent.path)
else:
return ('epython', '-p')
def get_env(self):
return {'PYTHONPATH': self.application.config.get('python.lib', '.')}
class BinaryDataException(Exception):
pass
class FileBuffer(Buffer):
btype = 'file'
def __init__(self, path, name=None):
'''fb = FileBuffer(path)'''
Buffer.__init__(self)
self.path = os.path.realpath(path)
self.checksum = None
self.bytemark = ''
if name is None:
self._name = os.path.basename(self.path)
else:
self._name = name
if os.path.exists(self.path) and not os.access(self.path, os.W_OK):
self._readonly = True
else:
self._readonly = False
def readonly(self):
return self._readonly
def _open_file_r(self, path=None):
if path is None:
path = self.path
path = os.path.realpath(path)
self.path = path
if not os.path.isfile(path):
raise Exception("Path '%s' does not exist" % (path))
if not os.access(path, os.R_OK):
raise Exception("Path '%s' cannot be read" % (path))
f = open(path, 'r')
return f
def _open_file_w(self, path=None, preserve=True):
if path is None:
path = self.path
if preserve and os.path.isfile(path):
raise Exception("Path '%s' already exists" % (path))
d = os.path.dirname(path)
if not os.access(d, os.R_OK):
raise Exception("Dir '%s' cannot be read" % (path))
if not os.access(d, os.W_OK):
raise Exception("Dir '%s' cannot be written" % (path))
f = open(path, 'w')
return f
def _temp_path(self, path=None):
if path is None:
path = self.path
(dirname, basename) = os.path.split(path)
return os.path.join(dirname, ".__%s__pmacs" % (basename))
# methods for dealing with the underlying resource, etc.
def name(self):
return self._name
def path_exists(self):
return os.path.exists(self.path)
def store_checksum(self, data):
self.checksum = hasher(data)
def read(self):
if self.path_exists():
f = self._open_file_r()
data = f.read()
if '\t' in data:
self.writetabs = True
f.close()
self.store_checksum(data)
else:
data = ''
if data.startswith('\xEF\xBB\xBF'):
# utf-8 bytemark
self.bytemark = data[:3]
data = data[3:]
self.nl = self._detect_nl_type(data)
data = self.read_filter(data)
data = data.replace("\t", " ")
for i in range(0, min(len(data), 128)):
if data[i] not in string.printable:
raise BinaryDataException("binary files are not supported")
return data
def open(self):
data = self.read()
self.lines = data.split(self.nl)
def reload(self):
data = self.read()
self.set_data(data)
def changed_on_disk(self):
assert self.checksum is not None
f = open(self.path)
data = f.read()
f.close()
m = hasher(data)
return self.checksum.digest() != m.digest()
def save(self, force=False):
if self.readonly():
raise ReadOnlyError("can't save read-only file")
if self.checksum is not None and force is False:
# the file already existed and we took a checksum so make sure it's
# still the same right now
if not self.path_exists():
raise FileGoneError("oh no! %r disappeared!" % self.path)
if self.changed_on_disk():
raise FileChangedError("oh no! %r has changed on-disk!" % self.path)
exists = os.path.exists(self.path)
if exists:
temp_path = self._temp_path()
shutil.copyfile(self.path, temp_path)
try:
data = self.make_string()
if self.windows[0].mode.savetabs:
data = data.replace(" ", "\t")
data = self.write_filter(data)
f2 = self._open_file_w(self.path, preserve=False)
f2.write(self.bytemark + data)
f2.close()
except:
if exists: shutil.copyfile(temp_path, self.path)
else:
self.store_checksum(data)
self.modified = False
if exists: os.unlink(temp_path)
def save_as(self, path):
self.path = path
self.save()
class AesBuffer(FileBuffer):
btype = 'aesfile'
def __init__(self, path, password, name=None):
'''fb = FileBuffer(path)'''
FileBuffer.__init__(self, path, name)
self.password = password
def read_filter(self, data):
return aes.decrypt_data(data, self.password)
def write_filter(self, data):
return aes.encrypt_data(data, self.password)
class Binary32Buffer(FileBuffer):
btype = 'bin32file'
grouppad = 2
groupsize = 8
numgroups = 2
bytepad = 1
data = None
wordsize = 4
def __init__(self, path, name=None):
'''fb = FileBuffer(path)'''
FileBuffer.__init__(self, path, name)
def _detect_nl_type(self, data):
return '\n'
def cursorx_to_datax(self, cy, cx):
bytespace = 2 + self.bytepad
groupspace = bytespace * self.groupsize - self.bytepad + self.grouppad
groupmod = (cx + self.grouppad) % groupspace
if groupmod < self.grouppad:
return None
groupdiv = (cx + 2) // groupspace
if groupdiv >= self.numgroups:
return None
bytemod = (cx + self.bytepad - groupdiv) % bytespace
if bytemod == 0:
return None
bytediv = ((cx + self.bytepad) % groupspace) // bytespace
ix = self.groupsize * groupdiv + bytediv
if ix < len(self.rawdata[cy]):
return ix
else:
return None
def datax_to_cursorx(self, ix):
groupsize = (((2 + self.bytepad) * self.groupsize) + self.grouppad)
maxsize = groupsize * self.numgroups - self.grouppad
if ix < maxsize:
return (ix // self.groupsize) * (self.grouppad - 1) + ix * (2 + self.bytepad)
else:
return None
def datax_to_cursory(self, ix):
return ix // (self.groupsize * self.numgroups)
def get_address(self, cy, ix):
return (cy * self.numgroups * self.groupsize) + ix
def overwrite_char(self, p, c, act=ACT_NORM, force=False):
ix = self.cursorx_to_datax(p.y, p.x)
if ix is None:
return
Buffer.overwrite_char(self, p, c, act, force)
cx = self.datax_to_cursorx(ix)
c = chr(int(self.lines[p.y][cx:cx + 2], 16))
rawline = self.rawdata[p.y]
self.rawdata[p.y] = rawline[0:ix] + c + rawline[ix + 1:]
def read_filter(self, data):
bytepad = ' ' * self.bytepad
grouppad = ' ' * self.grouppad
self.rawdata = []
lines = []
i = 0
while i < len(data):
self.rawdata.append(data[i:i + self.numgroups * self.groupsize])
j = 0
groups = []
while j < self.numgroups * self.groupsize and i + j < len(data):
bytes = []
for c in data[i + j:i + j + self.groupsize]:
bytes.append(string.hexdigits[ord(c) / 16] + string.hexdigits[ord(c) % 16])
groups.append(bytepad.join(bytes))
j += self.groupsize
lines.append(grouppad.join(groups))
i += self.numgroups * self.groupsize
if not self.rawdata:
self.rawdata = ['']
return '\n'.join(lines)
def write_filter(self, data):
return ''.join(self.rawdata)
# log is another singleton
log = None
class LogBuffer(Buffer):
btype = 'log'
def __new__(cls, *args, **kwargs):
global log
if log is None:
log = object.__new__(LogBuffer, *args, **kwargs)
return log
def __init__(self): Buffer.__init__(self)
def clear(self): log.set_data('', force=True)
def name(self): return '*Log*'
def changed(self): return False
def close(self): global log; log = None
def readonly(self): return True