288 lines
10 KiB
288 lines
10 KiB
6 years ago
|
# -*- coding: utf-8 -*-
|
||
|
# test_git.py
|
||
|
# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors
|
||
|
#
|
||
|
# This module is part of GitPython and is released under
|
||
|
# the BSD License: http://www.opensource.org/licenses/bsd-license.php
|
||
|
import os
|
||
|
import subprocess
|
||
|
import sys
|
||
|
|
||
|
from git import (
|
||
|
Git,
|
||
|
refresh,
|
||
|
GitCommandError,
|
||
|
GitCommandNotFound,
|
||
|
Repo,
|
||
|
cmd
|
||
|
)
|
||
|
from git.compat import PY3, is_darwin
|
||
|
from git.test.lib import (
|
||
|
TestBase,
|
||
|
patch,
|
||
|
raises,
|
||
|
assert_equal,
|
||
|
assert_true,
|
||
|
assert_match,
|
||
|
fixture_path
|
||
|
)
|
||
|
from git.test.lib import with_rw_directory
|
||
|
from git.util import finalize_process
|
||
|
|
||
|
import os.path as osp
|
||
|
|
||
|
|
||
|
try:
|
||
|
from unittest import mock
|
||
|
except ImportError:
|
||
|
import mock
|
||
|
|
||
|
from git.compat import is_win
|
||
|
|
||
|
|
||
|
class TestGit(TestBase):
|
||
|
|
||
|
@classmethod
|
||
|
def setUpClass(cls):
|
||
|
super(TestGit, cls).setUpClass()
|
||
|
cls.git = Git(cls.rorepo.working_dir)
|
||
|
|
||
|
def tearDown(self):
|
||
|
import gc
|
||
|
gc.collect()
|
||
|
|
||
|
@patch.object(Git, 'execute')
|
||
|
def test_call_process_calls_execute(self, git):
|
||
|
git.return_value = ''
|
||
|
self.git.version()
|
||
|
assert_true(git.called)
|
||
|
assert_equal(git.call_args, ((['git', 'version'],), {}))
|
||
|
|
||
|
def test_call_unpack_args_unicode(self):
|
||
|
args = Git._Git__unpack_args(u'Unicode€™')
|
||
|
if PY3:
|
||
|
mangled_value = 'Unicode\u20ac\u2122'
|
||
|
else:
|
||
|
mangled_value = 'Unicode\xe2\x82\xac\xe2\x84\xa2'
|
||
|
assert_equal(args, [mangled_value])
|
||
|
|
||
|
def test_call_unpack_args(self):
|
||
|
args = Git._Git__unpack_args(['git', 'log', '--', u'Unicode€™'])
|
||
|
if PY3:
|
||
|
mangled_value = 'Unicode\u20ac\u2122'
|
||
|
else:
|
||
|
mangled_value = 'Unicode\xe2\x82\xac\xe2\x84\xa2'
|
||
|
assert_equal(args, ['git', 'log', '--', mangled_value])
|
||
|
|
||
|
@raises(GitCommandError)
|
||
|
def test_it_raises_errors(self):
|
||
|
self.git.this_does_not_exist()
|
||
|
|
||
|
def test_it_transforms_kwargs_into_git_command_arguments(self):
|
||
|
assert_equal(["-s"], self.git.transform_kwargs(**{'s': True}))
|
||
|
assert_equal(["-s", "5"], self.git.transform_kwargs(**{'s': 5}))
|
||
|
|
||
|
assert_equal(["--max-count"], self.git.transform_kwargs(**{'max_count': True}))
|
||
|
assert_equal(["--max-count=5"], self.git.transform_kwargs(**{'max_count': 5}))
|
||
|
|
||
|
# Multiple args are supported by using lists/tuples
|
||
|
assert_equal(["-L", "1-3", "-L", "12-18"], self.git.transform_kwargs(**{'L': ('1-3', '12-18')}))
|
||
|
assert_equal(["-C", "-C"], self.git.transform_kwargs(**{'C': [True, True]}))
|
||
|
|
||
|
# order is undefined
|
||
|
res = self.git.transform_kwargs(**{'s': True, 't': True})
|
||
|
self.assertEqual({'-s', '-t'}, set(res))
|
||
|
|
||
|
def test_it_executes_git_to_shell_and_returns_result(self):
|
||
|
assert_match(r'^git version [\d\.]{2}.*$', self.git.execute(["git", "version"]))
|
||
|
|
||
|
def test_it_accepts_stdin(self):
|
||
|
filename = fixture_path("cat_file_blob")
|
||
|
with open(filename, 'r') as fh:
|
||
|
assert_equal("70c379b63ffa0795fdbfbc128e5a2818397b7ef8",
|
||
|
self.git.hash_object(istream=fh, stdin=True))
|
||
|
|
||
|
@patch.object(Git, 'execute')
|
||
|
def test_it_ignores_false_kwargs(self, git):
|
||
|
# this_should_not_be_ignored=False implies it *should* be ignored
|
||
|
self.git.version(pass_this_kwarg=False)
|
||
|
assert_true("pass_this_kwarg" not in git.call_args[1])
|
||
|
|
||
|
def test_it_accepts_environment_variables(self):
|
||
|
filename = fixture_path("ls_tree_empty")
|
||
|
with open(filename, 'r') as fh:
|
||
|
tree = self.git.mktree(istream=fh)
|
||
|
env = {
|
||
|
'GIT_AUTHOR_NAME': 'Author Name',
|
||
|
'GIT_AUTHOR_EMAIL': 'author@example.com',
|
||
|
'GIT_AUTHOR_DATE': '1400000000+0000',
|
||
|
'GIT_COMMITTER_NAME': 'Committer Name',
|
||
|
'GIT_COMMITTER_EMAIL': 'committer@example.com',
|
||
|
'GIT_COMMITTER_DATE': '1500000000+0000',
|
||
|
}
|
||
|
commit = self.git.commit_tree(tree, m='message', env=env)
|
||
|
assert_equal(commit, '4cfd6b0314682d5a58f80be39850bad1640e9241')
|
||
|
|
||
|
def test_persistent_cat_file_command(self):
|
||
|
# read header only
|
||
|
import subprocess as sp
|
||
|
hexsha = "b2339455342180c7cc1e9bba3e9f181f7baa5167"
|
||
|
g = self.git.cat_file(batch_check=True, istream=sp.PIPE, as_process=True)
|
||
|
g.stdin.write(b"b2339455342180c7cc1e9bba3e9f181f7baa5167\n")
|
||
|
g.stdin.flush()
|
||
|
obj_info = g.stdout.readline()
|
||
|
|
||
|
# read header + data
|
||
|
g = self.git.cat_file(batch=True, istream=sp.PIPE, as_process=True)
|
||
|
g.stdin.write(b"b2339455342180c7cc1e9bba3e9f181f7baa5167\n")
|
||
|
g.stdin.flush()
|
||
|
obj_info_two = g.stdout.readline()
|
||
|
self.assertEqual(obj_info, obj_info_two)
|
||
|
|
||
|
# read data - have to read it in one large chunk
|
||
|
size = int(obj_info.split()[2])
|
||
|
g.stdout.read(size)
|
||
|
g.stdout.read(1)
|
||
|
|
||
|
# now we should be able to read a new object
|
||
|
g.stdin.write(b"b2339455342180c7cc1e9bba3e9f181f7baa5167\n")
|
||
|
g.stdin.flush()
|
||
|
self.assertEqual(g.stdout.readline(), obj_info)
|
||
|
|
||
|
# same can be achieved using the respective command functions
|
||
|
hexsha, typename, size = self.git.get_object_header(hexsha)
|
||
|
hexsha, typename_two, size_two, data = self.git.get_object_data(hexsha) # @UnusedVariable
|
||
|
self.assertEqual(typename, typename_two)
|
||
|
self.assertEqual(size, size_two)
|
||
|
|
||
|
def test_version(self):
|
||
|
v = self.git.version_info
|
||
|
self.assertIsInstance(v, tuple)
|
||
|
for n in v:
|
||
|
self.assertIsInstance(n, int)
|
||
|
# END verify number types
|
||
|
|
||
|
def test_cmd_override(self):
|
||
|
prev_cmd = self.git.GIT_PYTHON_GIT_EXECUTABLE
|
||
|
exc = GitCommandNotFound
|
||
|
try:
|
||
|
# set it to something that doens't exist, assure it raises
|
||
|
type(self.git).GIT_PYTHON_GIT_EXECUTABLE = osp.join(
|
||
|
"some", "path", "which", "doesn't", "exist", "gitbinary")
|
||
|
self.failUnlessRaises(exc, self.git.version)
|
||
|
finally:
|
||
|
type(self.git).GIT_PYTHON_GIT_EXECUTABLE = prev_cmd
|
||
|
# END undo adjustment
|
||
|
|
||
|
def test_refresh(self):
|
||
|
# test a bad git path refresh
|
||
|
self.assertRaises(GitCommandNotFound, refresh, "yada")
|
||
|
|
||
|
# test a good path refresh
|
||
|
which_cmd = "where" if is_win else "which"
|
||
|
path = os.popen("{0} git".format(which_cmd)).read().strip().split('\n')[0]
|
||
|
refresh(path)
|
||
|
|
||
|
def test_options_are_passed_to_git(self):
|
||
|
# This work because any command after git --version is ignored
|
||
|
git_version = self.git(version=True).NoOp()
|
||
|
git_command_version = self.git.version()
|
||
|
self.assertEquals(git_version, git_command_version)
|
||
|
|
||
|
def test_persistent_options(self):
|
||
|
git_command_version = self.git.version()
|
||
|
# analog to test_options_are_passed_to_git
|
||
|
self.git.set_persistent_git_options(version=True)
|
||
|
git_version = self.git.NoOp()
|
||
|
self.assertEquals(git_version, git_command_version)
|
||
|
# subsequent calls keep this option:
|
||
|
git_version_2 = self.git.NoOp()
|
||
|
self.assertEquals(git_version_2, git_command_version)
|
||
|
|
||
|
# reset to empty:
|
||
|
self.git.set_persistent_git_options()
|
||
|
self.assertRaises(GitCommandError, self.git.NoOp)
|
||
|
|
||
|
def test_single_char_git_options_are_passed_to_git(self):
|
||
|
input_value = 'TestValue'
|
||
|
output_value = self.git(c='user.name=%s' % input_value).config('--get', 'user.name')
|
||
|
self.assertEquals(input_value, output_value)
|
||
|
|
||
|
def test_change_to_transform_kwargs_does_not_break_command_options(self):
|
||
|
self.git.log(n=1)
|
||
|
|
||
|
def test_insert_after_kwarg_raises(self):
|
||
|
# This isn't a complete add command, which doesn't matter here
|
||
|
self.failUnlessRaises(ValueError, self.git.remote, 'add', insert_kwargs_after='foo')
|
||
|
|
||
|
def test_env_vars_passed_to_git(self):
|
||
|
editor = 'non_existent_editor'
|
||
|
with mock.patch.dict('os.environ', {'GIT_EDITOR': editor}): # @UndefinedVariable
|
||
|
self.assertEqual(self.git.var("GIT_EDITOR"), editor)
|
||
|
|
||
|
@with_rw_directory
|
||
|
def test_environment(self, rw_dir):
|
||
|
# sanity check
|
||
|
self.assertEqual(self.git.environment(), {})
|
||
|
|
||
|
# make sure the context manager works and cleans up after itself
|
||
|
with self.git.custom_environment(PWD='/tmp'):
|
||
|
self.assertEqual(self.git.environment(), {'PWD': '/tmp'})
|
||
|
|
||
|
self.assertEqual(self.git.environment(), {})
|
||
|
|
||
|
old_env = self.git.update_environment(VARKEY='VARVALUE')
|
||
|
# The returned dict can be used to revert the change, hence why it has
|
||
|
# an entry with value 'None'.
|
||
|
self.assertEqual(old_env, {'VARKEY': None})
|
||
|
self.assertEqual(self.git.environment(), {'VARKEY': 'VARVALUE'})
|
||
|
|
||
|
new_env = self.git.update_environment(**old_env)
|
||
|
self.assertEqual(new_env, {'VARKEY': 'VARVALUE'})
|
||
|
self.assertEqual(self.git.environment(), {})
|
||
|
|
||
|
path = osp.join(rw_dir, 'failing-script.sh')
|
||
|
with open(path, 'wt') as stream:
|
||
|
stream.write("#!/usr/bin/env sh\n"
|
||
|
"echo FOO\n")
|
||
|
os.chmod(path, 0o777)
|
||
|
|
||
|
rw_repo = Repo.init(osp.join(rw_dir, 'repo'))
|
||
|
remote = rw_repo.create_remote('ssh-origin', "ssh://git@server/foo")
|
||
|
|
||
|
with rw_repo.git.custom_environment(GIT_SSH=path):
|
||
|
try:
|
||
|
remote.fetch()
|
||
|
except GitCommandError as err:
|
||
|
if sys.version_info[0] < 3 and is_darwin:
|
||
|
self.assertIn('ssh-orig, ' in str(err))
|
||
|
self.assertEqual(err.status, 128)
|
||
|
else:
|
||
|
self.assertIn('FOO', str(err))
|
||
|
|
||
|
def test_handle_process_output(self):
|
||
|
from git.cmd import handle_process_output
|
||
|
|
||
|
line_count = 5002
|
||
|
count = [None, 0, 0]
|
||
|
|
||
|
def counter_stdout(line):
|
||
|
count[1] += 1
|
||
|
|
||
|
def counter_stderr(line):
|
||
|
count[2] += 1
|
||
|
|
||
|
cmdline = [sys.executable, fixture_path('cat_file.py'), str(fixture_path('issue-301_stderr'))]
|
||
|
proc = subprocess.Popen(cmdline,
|
||
|
stdin=None,
|
||
|
stdout=subprocess.PIPE,
|
||
|
stderr=subprocess.PIPE,
|
||
|
shell=False,
|
||
|
creationflags=cmd.PROC_CREATIONFLAGS,
|
||
|
)
|
||
|
|
||
|
handle_process_output(proc, counter_stdout, counter_stderr, finalize_process)
|
||
|
|
||
|
self.assertEqual(count[1], line_count)
|
||
|
self.assertEqual(count[2], line_count)
|