-
Notifications
You must be signed in to change notification settings - Fork 0
/
git.py
198 lines (165 loc) · 7.04 KB
/
git.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
# Copyright 2024 Google LLC
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#!/usr/bin/env python3
import os
import subprocess
import tempfile
# Selectors naming which side of a merge checkout() and cat_files() act on.
# cat_files() passes these numbers to git as the ':<n>' part of an object name.
VERSION_BASE, VERSION_OURS, VERSION_THEIRS = 1, 2, 3
VERSIONS = (VERSION_BASE, VERSION_OURS, VERSION_THEIRS)

# Per-side file state characters; a full state is two of them (ours, theirs).
STATE_MODIFIED = 'U'
STATE_ADDED = 'A'
STATE_DELETED = 'D'
def get_conflicts():
    """Yields the conflicting files for the current repo.

    This is a generator. Each file is yielded as a tuple: (file, state), with
    state being two characters (ours and theirs) from one of the STATE_
    states. An entry counts as a conflict when its state contains 'U' or is
    'AA' or 'DD'.

    Raises:
        subprocess.CalledProcessError: if `git status` exits non-zero.
    """
    entries = iter(subprocess.check_output(['git', 'status', '-z'],
                                           cwd=repo_path()).split(b'\0'))
    for entry in entries:
        entry = entry.decode()
        state = entry[0:2]
        # Renamed and copied entries are followed by one extra NUL-terminated
        # field holding the original path. That field has no status prefix,
        # so it is consumed here instead of being parsed as its own entry.
        # The default keeps truncated output from raising inside a generator.
        if 'R' in state or 'C' in state:
            next(entries, None)
            continue
        if 'U' in state or state in ('AA', 'DD'):
            # Skip the two state characters and the separator after them.
            yield (entry[3:], state)
def clone_tmp(commit=None):
    """Does a shared clone of the current directory into a temporary directory.

    No checkout is performed unless a commit is specified for checkout.

    Args:
        commit: optional commit-ish to check out inside the clone.

    Returns:
        The temporary directory. You are responsible for deleting it.

    Raises:
        subprocess.CalledProcessError: if `git clone` or `git checkout` fails.
            The temporary directory is removed before the error propagates,
            since the caller never receives its path in that case.
    """
    import shutil  # Local import: only needed on the failure path.

    tempdir = tempfile.mkdtemp()
    try:
        subprocess.check_call(['git', 'clone', '-qns', '.', tempdir])
        if commit:
            subprocess.check_call(['git', 'checkout', '-q', commit],
                                  cwd=tempdir)
    except BaseException:
        # Clean up on any failure (including interrupts), then re-raise.
        shutil.rmtree(tempdir, ignore_errors=True)
        raise
    return tempdir
def checkout(path, version):
    """Checks out one side of a conflicted path with `git checkout`.

    Args:
        path: path handed to git after `--`; git runs from the repo root.
        version: one of the VERSION_ constants. VERSION_OURS selects --ours;
            any other value except VERSION_BASE selects --theirs.

    Returns:
        True if git exited with status 0. Always False for VERSION_BASE, for
        which git is not invoked at all.
    """
    if version == VERSION_BASE:
        return False
    side = '--theirs'
    if version == VERSION_OURS:
        side = '--ours'
    command = ['git', 'checkout', '-q', side, '--', path]
    exit_code = subprocess.call(command, cwd=repo_path())
    return exit_code == 0
def cat(path, ref=':', relative=False):
    """Returns a readable binary pipe with `git show <ref>:<path>` output.

    Args:
        path: the file to show.
        ref: what goes before the ':' separator in the object name.
        relative: when true, git runs in the current directory and a path not
            starting with '/' is prefixed with './'. Otherwise git runs from
            the repo root and the path is used unchanged.

    Returns:
        The stdout pipe of the spawned git process. The process itself is not
        waited on here.
    """
    # NOTE(review): with the default ref ':' the object name handed to git
    # becomes '::<path>' -- confirm that this spelling is intended.
    if relative and not path.startswith('/'):
        target = './%s' % path
    else:
        target = path
    object_name = ref + ':' + target
    workdir = None if relative else repo_path()
    process = subprocess.Popen(['git', 'show', object_name],
                               cwd=workdir,
                               stdout=subprocess.PIPE)
    return process.stdout
def cat_files(path, state=STATE_MODIFIED*2):
    """Returns a (base, ours, theirs) tuple of files for a path.

    A slot is None when `state` says that version does not contain the file:
    base is absent if either side is STATE_ADDED, ours if state[0] is
    STATE_DELETED, and theirs if state[1] is STATE_DELETED. The default state
    treats all three versions as present.
    """
    # Decide up front which versions exist, before spawning any git process.
    present = {
        VERSION_BASE: STATE_ADDED not in state,
        VERSION_OURS: state[0] != STATE_DELETED,
        VERSION_THEIRS: state[1] != STATE_DELETED,
    }
    files = []
    for version in VERSIONS:
        if present[version]:
            files.append(cat(path, ':' + str(version)))
        else:
            files.append(None)
    return tuple(files)
def add(path):
    """Stages a path with `git add`, run from the repo root.

    Returns True if git exited with status 0, False otherwise.
    """
    exit_code = subprocess.call(['git', 'add', '--', path], cwd=repo_path())
    return exit_code == 0
def rev_parse(revs):
    """Expands a revspec into the list of revisions git reports for it.

    Runs `git rev-parse --revs-only` in the current directory and returns one
    entry per output line, with any leading '^' characters removed.

    Raises:
        subprocess.CalledProcessError: if git exits non-zero.
    """
    output = subprocess.check_output(
        ['git', 'rev-parse', '--revs-only', revs],
        universal_newlines=True)
    parsed = []
    for line in output.splitlines():
        parsed.append(line.lstrip('^'))
    return parsed
# Memoized repository top-level directory, filled in by repo_path().
__repopath = None


def repo_path(path=''):
    """Joins `path` onto the repository's top-level directory.

    The top level is obtained from `git rev-parse --show-toplevel` on the
    first call and reused from the module-level cache afterwards.

    Raises:
        subprocess.CalledProcessError: if git exits non-zero on first use.
    """
    global __repopath
    root = __repopath
    if root is None:
        raw = subprocess.check_output(
            ['git', 'rev-parse', '--show-toplevel'],
            universal_newlines=True)
        # Drop the trailing newline git prints after the path.
        root = __repopath = raw.rstrip()
    return os.path.join(os.fsdecode(root), path)
def is_in_repo(path):
    """Returns True if `path` lies inside the current repo, False otherwise."""
    # abspath rather than realpath: symlinks in repos are not supported (they
    # break under Windows). The difference would only matter for a symlink in
    # a project repo that points to a file or directory outside of the repo.
    repo_root = os.path.abspath(repo_path())
    candidate = os.path.abspath(path)
    try:
        shared = os.path.commonpath((repo_root, candidate))
    except ValueError:
        # commonpath could not relate the two paths at all.
        return False
    return shared == repo_root
def listdir(path_or_tuple, githash=None):
    """Lists a directory, from the filesystem or from a git tree.

    `path_or_tuple` is either a path or a (path, githash) tuple; a tuple
    overrides the separate `githash` argument. When a githash is given and
    the path is inside the repo, the entry names come from git for that
    hash. In every other case this is plain os.listdir.
    """
    path = path_or_tuple
    if isinstance(path, tuple):
        path, githash = path
    if githash and is_in_repo(path):
        entries = ls_tree(path + '/.', githash,
                          full_tree=False, recurse=False)
        return [os.path.basename(entry) for entry in entries]
    return os.listdir(path)
def isdir(path_or_tuple, githash=None):
    """Like os.path.isdir, with the same git hash handling as listdir.

    With a githash and a path inside the repo, the path counts as a directory
    when git lists at least one entry under it for that hash.
    """
    path = path_or_tuple
    if isinstance(path, tuple):
        path, githash = path
    if githash and is_in_repo(path):
        return bool(ls_tree(path + '/.', githash,
                            full_tree=False, recurse=False))
    return os.path.isdir(path)
def isfile(path_or_tuple, githash=None):
    """Like os.path.isfile, with the same git hash handling as listdir.

    With a githash and a path inside the repo, the path counts as a file when
    it is not a directory at that hash and git lists an entry for it.
    """
    path = path_or_tuple
    if isinstance(path, tuple):
        path, githash = path
    if not (githash and is_in_repo(path)):
        return os.path.isfile(path)
    if isdir(path, githash):
        return False
    return bool(ls_tree(path, githash, full_tree=False, recurse=False))
def open_rb(path_or_tuple, githash=None):
    """Like open(x, 'rb'), with the same git hash handling as listdir.

    With a githash and a path inside the repo, the returned file is whatever
    cat() gives for that path and hash. Otherwise the file is opened from the
    filesystem.
    """
    path = path_or_tuple
    if isinstance(path, tuple):
        path, githash = path
    if githash and is_in_repo(path):
        return cat(path, githash, relative=True)
    return open(path, 'rb')
# Memoized answer for is_rebase(); None until the first call.
__isrebase = None


def is_rebase():
    """Returns True if the git repo is in the middle of a rebase (vs a merge).

    A rebase is detected by the 'rebase-merge' or 'rebase-apply' git path
    being a directory. The answer is computed once and then cached.
    """
    global __isrebase
    if __isrebase is not None:
        return __isrebase
    __isrebase = False
    for marker in ('rebase-merge', 'rebase-apply'):
        marker_path = subprocess.check_output(
            ['git', 'rev-parse', '--git-path', marker],
            universal_newlines=True).rstrip()
        if os.path.isdir(marker_path):
            __isrebase = True
    return __isrebase
# A cache for ls_tree, indexed by (path, commit, full_tree, recurse)
__lscache = {}


def ls_tree(path, commit='HEAD', full_tree=True, recurse=True):
    """Returns a list of the files under a given path and commit.

    Args:
        path: path handed to `git ls-tree`.
        commit: tree-ish to list; defaults to HEAD.
        full_tree: when true, passes --full-tree and runs git from the repo
            root; otherwise git runs in the current directory.
        recurse: when true, passes -r so subtrees are listed recursively.

    Returns:
        The lines of `git ls-tree --name-only` output. The list is cached and
        the same object is returned on later calls with the same arguments.

    Raises:
        subprocess.CalledProcessError: if git exits non-zero.
    """
    # `recurse` changes the output, so it has to be part of the cache key.
    # Without it a recursive and a non-recursive listing of the same tree
    # would return each other's cached result.
    key = (path, commit, bool(full_tree), bool(recurse))
    if key not in __lscache:
        command = ['git', 'ls-tree', '--name-only']
        if recurse:
            command.append('-r')
        if full_tree:
            command.append('--full-tree')
        command += [commit, path]
        __lscache[key] = subprocess.check_output(
            command,
            cwd=repo_path() if full_tree else None,
            universal_newlines=True).splitlines()
    return __lscache[key]