You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
165 lines
5.6 KiB
165 lines
5.6 KiB
6 years ago
|
# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo@gmail.com) and contributors
|
||
|
#
|
||
|
# This module is part of GitDB and is released under
|
||
|
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
|
||
|
"""Test for object db"""
|
||
|
|
||
|
from gitdb.test.lib import (
|
||
|
TestBase,
|
||
|
DummyStream,
|
||
|
make_bytes,
|
||
|
make_object,
|
||
|
fixture_path
|
||
|
)
|
||
|
|
||
|
from gitdb import (
|
||
|
DecompressMemMapReader,
|
||
|
FDCompressedSha1Writer,
|
||
|
LooseObjectDB,
|
||
|
Sha1Writer,
|
||
|
MemoryDB,
|
||
|
IStream,
|
||
|
)
|
||
|
from gitdb.util import hex_to_bin
|
||
|
|
||
|
import zlib
|
||
|
from gitdb.typ import (
|
||
|
str_blob_type
|
||
|
)
|
||
|
|
||
|
import tempfile
|
||
|
import os
|
||
|
from io import BytesIO
|
||
|
|
||
|
|
||
|
class TestStream(TestBase):
|
||
|
|
||
|
"""Test stream classes"""
|
||
|
|
||
|
data_sizes = (15, 10000, 1000 * 1024 + 512)
|
||
|
|
||
|
def _assert_stream_reader(self, stream, cdata, rewind_stream=lambda s: None):
|
||
|
"""Make stream tests - the orig_stream is seekable, allowing it to be
|
||
|
rewound and reused
|
||
|
:param cdata: the data we expect to read from stream, the contents
|
||
|
:param rewind_stream: function called to rewind the stream to make it ready
|
||
|
for reuse"""
|
||
|
ns = 10
|
||
|
assert len(cdata) > ns - 1, "Data must be larger than %i, was %i" % (ns, len(cdata))
|
||
|
|
||
|
# read in small steps
|
||
|
ss = len(cdata) // ns
|
||
|
for i in range(ns):
|
||
|
data = stream.read(ss)
|
||
|
chunk = cdata[i * ss:(i + 1) * ss]
|
||
|
assert data == chunk
|
||
|
# END for each step
|
||
|
rest = stream.read()
|
||
|
if rest:
|
||
|
assert rest == cdata[-len(rest):]
|
||
|
# END handle rest
|
||
|
|
||
|
if isinstance(stream, DecompressMemMapReader):
|
||
|
assert len(stream.data()) == stream.compressed_bytes_read()
|
||
|
# END handle special type
|
||
|
|
||
|
rewind_stream(stream)
|
||
|
|
||
|
# read everything
|
||
|
rdata = stream.read()
|
||
|
assert rdata == cdata
|
||
|
|
||
|
if isinstance(stream, DecompressMemMapReader):
|
||
|
assert len(stream.data()) == stream.compressed_bytes_read()
|
||
|
# END handle special type
|
||
|
|
||
|
def test_decompress_reader(self):
|
||
|
for close_on_deletion in range(2):
|
||
|
for with_size in range(2):
|
||
|
for ds in self.data_sizes:
|
||
|
cdata = make_bytes(ds, randomize=False)
|
||
|
|
||
|
# zdata = zipped actual data
|
||
|
# cdata = original content data
|
||
|
|
||
|
# create reader
|
||
|
if with_size:
|
||
|
# need object data
|
||
|
zdata = zlib.compress(make_object(str_blob_type, cdata))
|
||
|
typ, size, reader = DecompressMemMapReader.new(zdata, close_on_deletion)
|
||
|
assert size == len(cdata)
|
||
|
assert typ == str_blob_type
|
||
|
|
||
|
# even if we don't set the size, it will be set automatically on first read
|
||
|
test_reader = DecompressMemMapReader(zdata, close_on_deletion=False)
|
||
|
assert test_reader._s == len(cdata)
|
||
|
else:
|
||
|
# here we need content data
|
||
|
zdata = zlib.compress(cdata)
|
||
|
reader = DecompressMemMapReader(zdata, close_on_deletion, len(cdata))
|
||
|
assert reader._s == len(cdata)
|
||
|
# END get reader
|
||
|
|
||
|
self._assert_stream_reader(reader, cdata, lambda r: r.seek(0))
|
||
|
|
||
|
# put in a dummy stream for closing
|
||
|
dummy = DummyStream()
|
||
|
reader._m = dummy
|
||
|
|
||
|
assert not dummy.closed
|
||
|
del(reader)
|
||
|
assert dummy.closed == close_on_deletion
|
||
|
# END for each datasize
|
||
|
# END whether size should be used
|
||
|
# END whether stream should be closed when deleted
|
||
|
|
||
|
def test_sha_writer(self):
|
||
|
writer = Sha1Writer()
|
||
|
assert 2 == writer.write("hi".encode("ascii"))
|
||
|
assert len(writer.sha(as_hex=1)) == 40
|
||
|
assert len(writer.sha(as_hex=0)) == 20
|
||
|
|
||
|
# make sure it does something ;)
|
||
|
prev_sha = writer.sha()
|
||
|
writer.write("hi again".encode("ascii"))
|
||
|
assert writer.sha() != prev_sha
|
||
|
|
||
|
def test_compressed_writer(self):
|
||
|
for ds in self.data_sizes:
|
||
|
fd, path = tempfile.mkstemp()
|
||
|
ostream = FDCompressedSha1Writer(fd)
|
||
|
data = make_bytes(ds, randomize=False)
|
||
|
|
||
|
# for now, just a single write, code doesn't care about chunking
|
||
|
assert len(data) == ostream.write(data)
|
||
|
ostream.close()
|
||
|
|
||
|
# its closed already
|
||
|
self.failUnlessRaises(OSError, os.close, fd)
|
||
|
|
||
|
# read everything back, compare to data we zip
|
||
|
fd = os.open(path, os.O_RDONLY | getattr(os, 'O_BINARY', 0))
|
||
|
written_data = os.read(fd, os.path.getsize(path))
|
||
|
assert len(written_data) == os.path.getsize(path)
|
||
|
os.close(fd)
|
||
|
assert written_data == zlib.compress(data, 1) # best speed
|
||
|
|
||
|
os.remove(path)
|
||
|
# END for each os
|
||
|
|
||
|
def test_decompress_reader_special_case(self):
|
||
|
odb = LooseObjectDB(fixture_path('objects'))
|
||
|
mdb = MemoryDB()
|
||
|
for sha in (b'888401851f15db0eed60eb1bc29dec5ddcace911',
|
||
|
b'7bb839852ed5e3a069966281bb08d50012fb309b',):
|
||
|
ostream = odb.stream(hex_to_bin(sha))
|
||
|
|
||
|
# if there is a bug, we will be missing one byte exactly !
|
||
|
data = ostream.read()
|
||
|
assert len(data) == ostream.size
|
||
|
|
||
|
# Putting it back in should yield nothing new - after all, we have
|
||
|
dump = mdb.store(IStream(ostream.type, ostream.size, BytesIO(data)))
|
||
|
assert dump.hexsha == sha
|
||
|
# end for each loose object sha to test
|