pmacs3/method/hg.py

108 lines
3.9 KiB
Python

import os
import re
from subprocess import Popen, PIPE, STDOUT
from method import Method, Argument
from method.vc import VcBlame
try:
from mercurial import hg, ui, node
from mercurial import commands as hgc
has_hg = True
except ImportError:
has_hg = False
class HgBase(object):
    """Mixin holding the Mercurial plumbing shared by the hg methods."""

    def _hg_check(self, w):
        """Return True when an hg command can be run for window *w*.

        When a precondition is not met, a message is set on the window
        through ``w.set_error`` and False is returned.
        """
        # Guard clauses: report the first unmet precondition and stop.
        if not has_hg:
            w.set_error("Mercurial is not installed")
            return False
        if not hasattr(w.buffer, 'path'):
            w.set_error("Buffer has no corresponding file")
            return False
        return True

    def _hg_init(self, **kwargs):
        """Build a ui object and open the repository located at '.'.

        Keyword arguments are forwarded unchanged to ``ui.ui``.
        Returns a ``(ui, repo)`` tuple.
        """
        hg_ui = ui.ui(**kwargs)
        # Push an output buffer; callers read it back with popbuffer().
        hg_ui.pushbuffer()
        repo_path = '.'.encode('UTF-8')
        return hg_ui, hg.repository(ui=hg_ui, path=repo_path)
class HgBlame(VcBlame, HgBase):
    """Show buffer annotated with hg metadata"""
    _is_method = True

    # Presumably the number of metadata columns (user, rev, date) that
    # precede the content -- confirm against VcBlame.
    num_fields = 3

    # Annotated line layout: user, rev, [changeset], date, content.
    # The changeset column is matched but not captured.
    line_re = re.compile(
        r'^ *([^ ]+) +(\d+) +[^ ]+ +(\d{4}-\d{2}-\d{2}): (.*)\n$'
    )
    prefix_fmt = '[g:d:*]%*s [c:d:*]%*s [b:d:*]%*s[d:d:*]'

    def _open_pipe(self, w, **vargs):
        """Start ``hg blame`` on the buffer's file and return the Popen."""
        argv = ["hg", 'blame', '-cnudq', w.buffer.path]
        return Popen(argv, stdin=PIPE, stdout=PIPE, stderr=PIPE)
class HgLog(Method, HgBase):
    """Show hg log for this buffer"""

    def _execute(self, w, **vargs):
        """Run the hg log command and show its output in ``*Log*``."""
        if self._hg_check(w):
            hg_ui, repo = self._hg_init()
            # NOTE(review): no file pattern is passed here, so the log
            # covers the whole repository rather than only this buffer.
            hgc.log(hg_ui, repo)
            # The buffered output is decoded as UTF-8 before display.
            captured = hg_ui.popbuffer()
            text = captured.decode('UTF-8')
            w.application.data_buffer('*Log*', text, switch_to=True)
class HgInfo(Method, HgBase):
    """Get some basic info from Mercurial about the current buffer"""

    def _execute(self, w, **vargs):
        """Show ``<file> <status> <rev>:<changeid> [<user>]`` for the buffer.

        The summary is reported through ``w.set_error``; nothing is
        returned.  If the preconditions in ``_hg_check`` are not met the
        method returns early.
        """
        if not self._hg_check(w):
            return
        base = os.path.basename(w.buffer.path)
        ui_imp, repo = self._hg_init()

        # The repository is opened at '.', so the path is made relative to
        # the current directory.  Only a leading prefix is stripped:
        # str.replace() would also delete later occurrences of the same
        # text inside the path and corrupt it.
        path = w.buffer.path
        prefix = os.getcwd() + '/'
        if path.startswith(prefix):
            path = path[len(prefix):]

        # No status lookup is performed; a placeholder is reported.
        status = 'Unknown'

        fc = repo.filectx(path.encode('UTF-8'), b'tip')
        user = ui_imp.shortuser(fc.user()).decode('UTF-8')
        # node.hex() yields bytes; decode it (as is done for changeid) so
        # the message does not contain a b'...' literal.
        rev = node.hex(fc.node()).decode('UTF-8')
        ctx = repo[fc.node()]
        # Short id: hex of the first 6 bytes of the changeset node.
        changeid = node.hex(ctx.node()[:6]).decode('UTF-8')

        msg = '%s %s %s:%s [%s]' % (base, status, rev, changeid, user)
        w.set_error(msg)
class HgDiff(Method, HgBase):
    """Diff the current file with the version in Mercurial"""

    def _get_revs(self, w, **vargs):
        """Return the list of revisions to diff; empty by default.

        Subclasses override this to supply revisions from their arguments.
        """
        return []

    def _execute(self, w, **vargs):
        """Run the hg diff command on the buffer's file and show ``*Diff*``.

        Diff options are read from the application config.  Any exception
        raised while diffing is reported through ``w.set_error`` and the
        method returns without opening a buffer.
        """
        if not self._hg_check(w):
            return
        ui_imp, repo = self._hg_init()

        # Mercurial's Python 3 API works on bytes, so encode revisions that
        # arrive as text (e.g. from prompted arguments).
        revs = [
            r.encode('UTF-8') if isinstance(r, str) else r
            for r in self._get_revs(w, **vargs)
        ]
        try:
            c = w.application.config
            # Prefer the correctly spelled key; fall back to the historical
            # misspelling so existing configurations keep working.
            ignore_all_space = c.get('hg.diff.ignore_all_space')
            if ignore_all_space is None:
                ignore_all_space = c.get('hg.diff.ignore_app_space')
            opts = {
                'ignore_all_space': ignore_all_space,
                'ignore_space_change': c.get('hg.diff.ignore_space_change'),
                # Option names use underscores; a hyphenated key is never
                # looked up by Mercurial and would be silently ignored.
                'ignore_blank_lines': c.get('hg.diff.ignore_blank_lines'),
                'unified': c.get('hg.diff.unified'),
            }
            # The file pattern is passed as bytes for the same reason as
            # the revisions above.
            target = w.buffer.path.encode('UTF-8')
            hgc.diff(ui_imp, repo, target, rev=revs, **opts)
        except Exception as e:
            w.set_error(str(e))
            return

        # popbuffer() returns bytes; ''.join() over bytes raises TypeError.
        # Undecodable bytes from file contents are replaced, not fatal.
        s = ui_imp.popbuffer().decode('UTF-8', 'replace')
        w.application.data_buffer("*Diff*", s, switch_to=True, modename='diff')
class HgDiff2(HgDiff):
    """Diff the current file with a specific revision in Mercurial"""
    # A single prompted string argument names the revision.
    args = [Argument("revision", type=str, prompt="Revision: ")]

    def _get_revs(self, w, **vargs):
        """Return a one-element list holding the prompted revision."""
        revision = vargs['revision']
        return [revision]
class HgDiff3(HgDiff):
    """Diff the current file between two specific revisions in Mercurial"""
    # Two prompted string arguments name the revisions to compare.
    args = [
        Argument("revision1", type=type(""), prompt="First Revision: "),
        Argument("revision2", type=type(""), prompt="Second Revision: "),
    ]

    def _get_revs(self, w, **vargs):
        """Return ``[revision1, revision2]`` taken from the arguments."""
        return [vargs['revision1'], vargs['revision2']]