You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
bazarr/libs/requests_toolbelt/multipart/decoder.py

157 lines
4.7 KiB

# -*- coding: utf-8 -*-
"""
requests_toolbelt.multipart.decoder
===================================
This holds all the implementation details of the MultipartDecoder
"""
import sys
import email.parser
from .encoder import encode_with
from requests.structures import CaseInsensitiveDict
def _split_on_find(content, bound):
point = content.find(bound)
return content[:point], content[point + len(bound):]
class ImproperBodyPartContentException(Exception):
    """Raised when the content of a body part is malformed, e.g. it lacks
    the CR-LF-CR-LF sequence that separates headers from the payload."""
class NonMultipartContentTypeException(Exception):
    """Raised when a Content-Type header cannot be used for multipart
    decoding, e.g. its major type is not ``multipart``."""
def _header_parser(string, encoding):
major = sys.version_info[0]
if major == 3:
string = string.decode(encoding)
headers = email.parser.HeaderParser().parsestr(string).items()
return (
(encode_with(k, encoding), encode_with(v, encoding))
for k, v in headers
)
class BodyPart(object):
    """
    A ``Response``-like view of one subpart of a multipart payload.

    Instances are generally expected to be created by ``MultipartDecoder``
    rather than by hand. Mirroring ``Response``, each one exposes:

    - ``headers``: a ``CaseInsensitiveDict`` of the part's headers,
    - ``content``: the part's payload as bytes,
    - ``text``: the payload decoded to unicode,
    - ``encoding``: the name of the codec used for that decoding.
    """

    def __init__(self, content, encoding):
        """Split ``content`` into its header section and its payload.

        :param content: raw bytes of one part; it must contain the blank
            line (CR-LF-CR-LF) that terminates the header section.
        :param encoding: codec name used to parse the headers and, later,
            to decode ``content`` for ``text``.
        :raises ImproperBodyPartContentException: if ``content`` has no
            CR-LF-CR-LF sequence.
        """
        self.encoding = encoding
        separator = b'\r\n\r\n'
        if separator not in content:
            raise ImproperBodyPartContentException(
                'content does not contain CR-LF-CR-LF'
            )
        # Everything before the first blank line is headers, the rest is body.
        raw_headers, self.content = _split_on_find(content, separator)
        parsed = {}
        if raw_headers != b'':
            parsed = _header_parser(raw_headers.lstrip(), encoding)
        self.headers = CaseInsensitiveDict(parsed)

    @property
    def text(self):
        """Content of the ``BodyPart`` in unicode."""
        raw = self.content
        return raw.decode(self.encoding)
class MultipartDecoder(object):
    """
    The ``MultipartDecoder`` object parses the multipart payload of
    a bytestring into a tuple of ``Response``-like ``BodyPart`` objects.

    The basic usage is::

        import requests
        from requests_toolbelt import MultipartDecoder

        response = requests.get(url)
        decoder = MultipartDecoder.from_response(response)
        for part in decoder.parts:
            print(part.headers['content-type'])

    If the multipart content is not from a response, basic usage is::

        from requests_toolbelt import MultipartDecoder

        decoder = MultipartDecoder(content, content_type)
        for part in decoder.parts:
            print(part.headers['content-type'])

    For both these usages, there is an optional ``encoding`` parameter. This is
    a string, which is the name of the unicode codec to use (default is
    ``'utf-8'``).
    """

    def __init__(self, content, content_type, encoding='utf-8'):
        """Decode ``content`` according to ``content_type``.

        :param content: the raw multipart body as bytes.
        :param content_type: the value of the Content-Type header, including
            its ``boundary`` parameter.
        :param encoding: name of the codec used for the boundary, the part
            headers and each part's ``text``.
        :raises NonMultipartContentTypeException: if ``content_type`` is not
            a ``multipart/*`` type or carries no usable boundary.
        """
        #: Original Content-Type header
        self.content_type = content_type
        #: Response body encoding
        self.encoding = encoding
        #: Parsed parts of the multipart response body
        self.parts = tuple()
        self._find_boundary()
        self._parse_body(content)

    def _find_boundary(self):
        """Extract the boundary from ``self.content_type``.

        Sets ``self.boundary`` to the boundary encoded with
        ``self.encoding``. If the parameter appears more than once the last
        occurrence is used.

        :raises NonMultipartContentTypeException: if the major type is not
            ``multipart``, or if no non-empty ``boundary`` parameter is
            present.
        """
        # A missing header (``None``) is handled like an empty one so that it
        # is reported through the exception below rather than as an
        # ``AttributeError`` on ``None.split``.
        raw_content_type = self.content_type or ''
        ct_info = tuple(x.strip() for x in raw_content_type.split(';'))
        mimetype = ct_info[0]
        if mimetype.split('/')[0].lower() != 'multipart':
            raise NonMultipartContentTypeException(
                "Unexpected mimetype in content-type: '{}'".format(mimetype)
            )

        boundary = None
        for item in ct_info[1:]:
            attr, sep, value = item.partition('=')
            # Parameters without an '=' carry no value and cannot be the
            # boundary, so they are skipped.
            if sep and attr.lower() == 'boundary':
                boundary = encode_with(value.strip('"'), self.encoding)

        if not boundary:
            # Without a boundary the body cannot be split into parts; fail
            # here with a clear message instead of later in ``_parse_body``.
            raise NonMultipartContentTypeException(
                "No boundary parameter in content-type: '{}'".format(
                    self.content_type
                )
            )
        self.boundary = boundary

    @staticmethod
    def _fix_first_part(part, boundary_marker):
        """Strip ``boundary_marker`` from the start of ``part`` if present.

        The first chunk of the body starts with the marker itself (it has no
        preceding CR-LF to split on), so it has to be removed by hand.

        :returns: ``part`` without the leading marker, or ``part`` unchanged.
        """
        if part.startswith(boundary_marker):
            return part[len(boundary_marker):]
        return part

    def _parse_body(self, content):
        """Split ``content`` on the boundary and populate ``self.parts``.

        :param content: the raw multipart body as bytes.
        :raises ImproperBodyPartContentException: propagated from
            ``BodyPart`` when a chunk has no CR-LF-CR-LF separator.
        """
        boundary = b''.join((b'--', self.boundary))

        def body_part(part):
            fixed = MultipartDecoder._fix_first_part(part, boundary)
            return BodyPart(fixed, self.encoding)

        def test_part(part):
            # Skip empty chunks and the remains of the closing
            # ``--boundary--`` delimiter.
            return (part not in (b'', b'\r\n', b'--') and
                    part[:4] != b'--\r\n')

        parts = content.split(b''.join((b'\r\n', boundary)))
        self.parts = tuple(body_part(x) for x in parts if test_part(x))

    @classmethod
    def from_response(cls, response, encoding='utf-8'):
        """Build a decoder from a ``Response``-like object.

        :param response: object exposing ``content`` (bytes) and a
            ``headers`` mapping with a ``get`` method.
        :param encoding: see the class documentation.
        :returns: a new instance of ``cls``.
        :raises NonMultipartContentTypeException: if the response has no
            usable multipart ``content-type`` header.
        """
        content = response.content
        content_type = response.headers.get('content-type', None)
        return cls(content, content_type, encoding)