terminfo junk

--HG--
branch : pmacs2
This commit is contained in:
moculus 2008-11-03 14:30:06 +00:00
parent e9f00278cc
commit 7576ee83e3
7 changed files with 292 additions and 112 deletions

View File

@ -290,6 +290,14 @@ class Buffer(object):
lines = data.split('\n') lines = data.split('\n')
self.set_lines(lines, force) self.set_lines(lines, force)
# append into buffer
def append_lines(self, lines, act=ACT_NORM, force=False):
p = self.get_buffer_end()
self.insert_lines(p, lines, act, force)
def append_string(self, s, act=ACT_NORM, force=False):
lines = s.split("\n")
self.insert_lines(lines, act, force)
# insertion into buffer # insertion into buffer
def insert_lines(self, p, lines, act=ACT_NORM, force=False): def insert_lines(self, p, lines, act=ACT_NORM, force=False):
llen = len(lines) llen = len(lines)
@ -355,6 +363,48 @@ class Buffer(object):
# should not happen # should not happen
raise Exception, "iiiijjjj" raise Exception, "iiiijjjj"
# generic window functionality
def forward(self, p):
if p.x < len(self.lines[p.y]):
return Point(p.x + 1, p.y)
elif p.y < len(self.lines) - 1:
return Point(0, p.y + 1)
else:
return p
def backward(self, p):
if p.x > 0:
return Point(p.x - 1, p.y)
elif p.y > 0:
x = len(self.lines[p.y - 1])
return Point(x, p.y - 1)
else:
return p
def end_of_line(self, p):
return Point(len(self.lines[p.y]), p.y)
def start_of_line(self, p):
return Point(0, p.y)
def previous_line(self, p):
if p.y > 0:
return Point(p.x, p.y - 1)
else:
return p
def next_line(self, p):
if p.y < len(self.lines) - 1:
return Point(p.x, p.y + 1)
else:
return p
def left_delete(self, p):
(x, y) = p.xy()
if x > 0:
self.delete_char(Point(x - 1, y))
elif y > 0:
x = len(self.lines[y - 1])
self.delete_char(Point(x, y - 1))
def right_delete(self, p):
if (p.y < len(self.lines) - 1 or
p.x < len(self.lines[-1])):
self.delete_char(p)
class InterpreterPipeError(Exception): class InterpreterPipeError(Exception):
pass pass

124
buffer/emul.py Normal file
View File

@ -0,0 +1,124 @@
import fcntl, os, select, pty, threading
from buffer import Buffer, ACT_NORM, ACT_NONE
from term import XTerm
from point import Point
# evil evil evil evil evil
class XTermBuffer(Buffer, XTerm):
btype = 'term'
modename = 'pipe'
def __init__(self, cmd, args, name=None):
XTerm.__init__(self)
Buffer.__init__(self)
self._name = name or '*XTerm*'
self._pid, self._pty = pty.fork()
if self._pid == 0:
# child process
os.execve(cmd, [cmd] + args, {'TERM': 'xterm'})
self._lock = threading.Lock()
self._towrite = ''
self._done = False
self._set_nonblock(self._pty)
self._thread = threading.Thread(target=self.pipe_read)
self._thread.setDaemon(True)
self._thread.start()
def _w(self):
return self.windows[0]
def _get_height(self):
return self._w().height
def _get_width(self):
return self._w().width
# TERM STUFF
def _term_insert(self, s):
w = self._w()
p = w.logical_cursor()
if p.x == len(self.lines[p.y]):
w.buffer.insert_string(p, s, act=ACT_NONE, force=True)
else:
w.buffer.overwrite_char(p, s, act=ACT_NONE, force=True)
def term_do_clear(self):
self.set_lines([''], force=True)
self._meta = []
def term_do_backspace(self):
self._w().backward()
def term_do_tab(self):
self._term_insert(' ')
def term_do_newline(self):
w = self._w()
p = w.logical_cursor()
if p.y < len(self.lines) - 1:
w.start_of_line()
w.next_line()
else:
w.end_of_line()
w.buffer.insert_string(w.logical_cursor(), "\n", act=ACT_NONE, force=True)
def term_do_creturn(self):
self._w().start_of_line()
def term_do_delete(self):
self._w().delete_right()
def term_handle_print(self, c):
#self._term_insert('%d ' % ord(c))
self._term_insert(c)
def term_handle_ctl(self, c):
n = ord(c)
if n == 8:
self.term_do_backspace()
elif n == 9:
self.term_do_tab()
elif n == 10:
self.term_do_newline()
elif n == 13:
self.term_do_creturn()
elif n == 27:
self.term_do_esc(c)
elif n == 127:
self.term_do_delete()
else:
self._term_insert('%x' % n)
def term_receive(self, s):
for c in s:
self.term_handle(c)
# BUFFER STUFF
def _set_nonblock(self, fd):
flags = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NDELAY)
def pipe_read(self):
fd = self._pty
try:
while not self._done:
if self._towrite:
ifd, ofd, efd = select.select([fd], [fd], [fd], 0.1)
else:
ifd, ofd, efd = select.select([fd], [], [fd], 0.1)
if ifd:
data = os.read(ifd[0], 1024)
self.term_receive(data)
if ofd:
self._lock.acquire()
n = os.write(ofd[0], self._towrite)
self._towrite = self._towrite[n:]
self._lock.release()
if efd:
raise Exception, "exception is ready: %s" % repr(efd)
except (OSError, TypeError):
pass
os.close(fd)
def pipe_write(self, s):
self._lock.acquire()
self._towrite += s
self._lock.release()
def name(self): return self._name
def changed(self): return False
def readonly(self): return True
def undo(self, move, act): raise Exception, "invalid"
def redo(self, move, act): raise Exception, "invalid"
def reload(self): raise Exception, "invalid"

View File

@ -75,7 +75,7 @@ class PipeBuffer(Buffer):
if ifd: if ifd:
data = os.read(ifd[0], 1024) data = os.read(ifd[0], 1024)
end = self.get_buffer_end() end = self.get_buffer_end()
data = self.term.filter(data) data = self.term.term_filter(data)
self.insert_string(end, data, force=True, act=ACT_NONE) self.insert_string(end, data, force=True, act=ACT_NONE)
if ofd: if ofd:
self._lock.acquire() self._lock.acquire()

View File

@ -78,7 +78,7 @@ class Man(Exec):
errmsg = "man: ok" errmsg = "man: ok"
if output: if output:
xterm = term.XTerm() xterm = term.XTerm()
output = xterm.filter(output) output = xterm.term_filter(output)
switch_to = err or self.show_success switch_to = err or self.show_success
w.application.data_buffer('*Manpage*', output, switch_to=switch_to) w.application.data_buffer('*Manpage*', output, switch_to=switch_to)
w.set_error(errmsg) w.set_error(errmsg)

View File

@ -1,5 +1,5 @@
import code, os, re, string, StringIO, sys, traceback import code, os, re, string, StringIO, sys, traceback
import buffer, buffer.pipe import buffer, buffer.pipe, buffer.emul
import color, completer, lex, method, mode, window import color, completer, lex, method, mode, window
from lex import Grammar, PatternRule, RegionRule from lex import Grammar, PatternRule, RegionRule
from point import Point from point import Point
@ -122,7 +122,8 @@ class OpenShell(Method):
def execute(self, w, **vargs): def execute(self, w, **vargs):
a = w.application a = w.application
if not a.has_buffer_name('*Shell*'): if not a.has_buffer_name('*Shell*'):
b = buffer.pipe.PipeBuffer('/bin/bash', [], name="*Shell*", term='xterm') #b = buffer.pipe.PipeBuffer('/bin/bash', [], name="*Shell*", term='xterm')
b = buffer.emul.XTermBuffer('/bin/bash', [], name="*Shell*")
a.add_buffer(b) a.add_buffer(b)
window.Window(b, a) window.Window(b, a)
b = a.bufferlist.get_buffer_by_name('*Shell*') b = a.bufferlist.get_buffer_by_name('*Shell*')

181
term.py
View File

@ -1,6 +1,15 @@
import os, re, string
from point import Point
def show(c):
if c in string.printable:
return c
else:
return '\\%03o' % ord(c)
class Dumb: class Dumb:
name = 'dumb' name = 'dumb'
def insert(self, s): def _term_insert(self, s):
assert self.i <= len(self.outc) assert self.i <= len(self.outc)
if self.i == len(self.outc): if self.i == len(self.outc):
self.outc.append(s) self.outc.append(s)
@ -8,116 +17,134 @@ class Dumb:
self.outc[self.i] = s self.outc[self.i] = s
self.i += 1 self.i += 1
def do_backspace(self): def term_do_clear(self):
self.outs = ''
self.i = 0
self.outc = []
def term_do_backspace(self):
self.i = max(0, self.i - 1) self.i = max(0, self.i - 1)
def do_tab(self): def term_do_tab(self):
self.insert(' ') self._term_insert(' ')
def do_newline(self): def term_do_newline(self):
self.outs += ''.join(self.outc) + '\n' self.outs += ''.join(self.outc) + '\n'
self.i = 0 self.i = 0
self.outc = [] self.outc = []
def do_careturn(self): def term_do_creturn(self):
self.i = 0 self.i = 0
def do_esc(self, c): def term_do_esc(self, c):
pass pass
def do_delete(self): def term_do_delete(self):
if self.i < len(self.outc): if self.i < len(self.outc):
del self.outc[self.i] del self.outc[self.i]
def handle_ctl(self, c): def term_handle_ctl(self, c):
n = ord(c) n = ord(c)
if n == 8: if n == 8:
self.do_backspace() self.term_do_backspace()
elif n == 9: elif n == 9:
self.do_tab() self.term_do_tab()
elif n == 10: elif n == 10:
self.do_newline() self.term_do_newline()
elif n == 12:
self.term_do_clear()
elif n == 13: elif n == 13:
self.do_careturn() self.term_do_creturn()
elif n == 27: elif n == 27:
self.do_esc(c) self.term_do_esc(c)
elif n == 127: elif n == 127:
self.do_delete() self.term_do_delete()
def handle_print(self, c): def term_handle_print(self, c):
self.insert(c) self._term_insert(c)
def handle_8bit(self, c): def term_handle_8bit(self, c):
pass pass
def handle(self, c): def term_handle(self, c):
n = ord(c) n = ord(c)
assert n >= 0 and n < 256 assert n >= 0 and n < 256
if n <= 27 or n == 127: if n <= 27 or n == 127:
self.handle_ctl(c) self.term_handle_ctl(c)
elif n < 127: elif n < 127:
self.handle_print(c) self.term_handle_print(c)
else: else:
self.handle_8bit(c) self.term_handle_8bit(c)
def term_receive(self, s):
for c in s:
self.term_handle(c)
def filter(self, s): def term_filter(self, s):
self.i = 0 self.i = 0
self.outc = [] self.outc = []
self.outs = "" self.outs = ""
for c in s: self.term_receive(s)
self.handle(c)
return self.outs + ''.join(self.outc) return self.outs + ''.join(self.outc)
class XTerm(Dumb): class XTerm(Dumb):
name = 'xterm' name = 'xterm'
ansi_colors = {
'\033[30m': '[B:d]', comment_re = re.compile('^ *#')
'\033[30;0m': '[B:d]', header_re = re.compile('^[a-zA-Z0-9]+\|')
'\033[30;1m': '[B:d:*]',
'\033[31m': '[r:d]', bool_re = re.compile('^([a-zA-Z0-9]+)$')
'\033[31;0m': '[r:d]', num_re = re.compile('^([a-zA-Z0-9]+)#(.+)$')
'\033[31;1m': '[r:d:*]', str_re = re.compile('^([a-zA-Z0-9]+)=(.+)$')
'\033[32m': '[g:d]',
'\033[32;0m': '[g:d]', style_re = re.compile('^\033[[0-9;]+m')
'\033[32;1m': '[g:d:*]',
'\033[33m': '[y:d]', callbacks = {
'\033[33;0m': '[y:d]', 'clear': 'term_do_clear',
'\033[33;1m': '[y:d:*]', 'home': 'term_do_creturn',
'\033[34m': '[b:d]',
'\033[34;0m': '[b:d]',
'\033[34;1m': '[b:d:*]',
'\033[35m': '[m:d]',
'\033[35;0m': '[m:d]',
'\033[35;1m': '[m:d:*]',
'\033[36m': '[c:d]',
'\033[36;0m': '[c:d]',
'\033[36;1m': '[c:d:*]',
'\033[37m': '[w:d]',
'\033[37;0m': '[w:d]',
'\033[37;1m': '[w:d:*]',
'\033[39m': '[d:d]',
'\033[39;0m': '[d:d]',
'\033[39;1m': '[d:d:*]',
} }
def filter(self, s):
self.meta = []
return Dumb.filter(self, s)
def do_esc(self, c):
self.meta.append(c)
def handle(self, c):
if self.meta:
self.meta.append(c)
if c == 'm':
s = ''.join(self.meta)
if s in self.ansi_colors:
#self.insert(self.ansi_colors[s])
pass
self.meta = []
else:
Dumb.handle(self, c)
class Auto(Dumb):
name = 'auto'
def __init__(self): def __init__(self):
try: self._meta = []
curses.setupterm() f = os.popen('infocmp xterm', 'r')
except: #f = open('xterm.terminfo')
self.sequences = {}
for line in f:
if self.comment_re.match(line) or self.header_re.match(line):
continue
for field in [x.strip() for x in line.split(',')]:
if not field:
continue
elif self.bool_re.match(field) or self.num_re.match(field):
continue
m = self.str_re.match(field)
assert m, "huh?? %r" % field
name, val = m.groups()
if val.startswith('\\E'):
self.sequences[val.replace('\\E', '\033')] = name
f.close()
def term_filter(self, s):
self._meta = []
return Dumb.term_filter(self, s)
def term_do_esc(self, c):
self._meta.append(c)
def term_handle(self, c):
if self._meta:
self._meta.append(c)
s = ''.join(self._meta)
if s in self.sequences:
name = self.sequences[s]
if name in self.callbacks:
f = getattr(self, self.callbacks[name])
f()
else:
self._term_insert('<%s>' % name)
# certain sequences shouldn't get removed immediately
if name in ('home',):
pass pass
else:
self._meta = []
elif self.style_re.match(s):
self._meta = []
elif len(s) > 20:
self._term_insert(''.join([show(c) for c in self._meta]))
self._meta = []
else:
Dumb.term_handle(self, c)
# terminfo junk # terminfo junk
boolean_settings = [ boolean_settings = [

View File

@ -278,35 +278,22 @@ class Window(object):
# moving in buffer # moving in buffer
def forward(self): def forward(self):
cursor = self.logical_cursor() self.cursor = self.buffer.forward(self.logical_cursor())
if cursor.x < len(self.buffer.lines[cursor.y]):
self.cursor = Point(cursor.x + 1, cursor.y)
elif cursor.y < len(self.buffer.lines) -1:
self.cursor = Point(0, cursor.y + 1)
self.assure_visible_cursor() self.assure_visible_cursor()
def backward(self): def backward(self):
cursor = self.logical_cursor() self.cursor = self.buffer.backward(self.logical_cursor())
if cursor.x > 0:
self.cursor = Point(cursor.x - 1, cursor.y)
elif cursor.y > 0:
x = len(self.buffer.lines[cursor.y - 1])
self.cursor = Point(x, cursor.y - 1)
self.assure_visible_cursor() self.assure_visible_cursor()
def end_of_line(self): def end_of_line(self):
cursor = self.logical_cursor() self.cursor = self.buffer.end_of_line(self.logical_cursor())
self.cursor = Point(len(self.buffer.lines[cursor.y]), cursor.y)
self.assure_visible_cursor() self.assure_visible_cursor()
def start_of_line(self): def start_of_line(self):
cursor = self.logical_cursor() self.cursor = self.buffer.start_of_line(self.logical_cursor())
self.cursor = Point(0, cursor.y)
self.assure_visible_cursor() self.assure_visible_cursor()
def previous_line(self): def previous_line(self):
if self.cursor.y > 0: self.cursor = self.buffer.previous_line(self.cursor)
self.cursor = Point(self.cursor.x, self.cursor.y - 1)
self.assure_visible_cursor() self.assure_visible_cursor()
def next_line(self): def next_line(self):
if self.cursor.y < len(self.buffer.lines) - 1: self.cursor = self.buffer.next_line(self.cursor)
self.cursor = Point(self.cursor.x, self.cursor.y + 1)
self.assure_visible_cursor() self.assure_visible_cursor()
# word handling # word handling
@ -511,18 +498,9 @@ class Window(object):
# deletion # deletion
def left_delete(self): def left_delete(self):
(x, y) = self.logical_cursor().xy() self.buffer.left_delete(self.logical_cursor())
if x > 0:
self.buffer.delete_char(Point(x - 1, y))
elif y > 0:
x = len(self.buffer.lines[y - 1])
self.buffer.delete_char(Point(x, y - 1))
def right_delete(self): def right_delete(self):
cursor = self.logical_cursor() self.buffer.right_delete(self.logical_cursor())
if cursor < self.last:
self.buffer.delete_char(cursor)
else:
pass
def delete(self, p1, p2): def delete(self, p1, p2):
self.buffer.delete(p1, p2) self.buffer.delete(p1, p2)