from copy import deepcopy

import collections
from flask import current_app, request
from werkzeug.datastructures import MultiDict, FileStorage
from werkzeug import exceptions
import flask_restful
import decimal
import six

try:
    # The ABC aliases in ``collections`` were removed in Python 3.10.
    from collections.abc import MutableSequence
except ImportError:  # pragma: no cover - Python 2 fallback
    from collections import MutableSequence


class Namespace(dict):
    """A dict whose items can also be read and written as attributes."""

    def __getattr__(self, name):
        # Only called when normal attribute lookup fails; translate the
        # missing key into the exception attribute access is expected to raise.
        try:
            return self[name]
        except KeyError:
            raise AttributeError(name)

    def __setattr__(self, name, value):
        self[name] = value


# Human-readable descriptions of request locations, used in the
# "Missing required parameter" error message.
_friendly_location = {
    u'json': u'the JSON body',
    u'form': u'the post body',
    u'args': u'the query string',
    u'values': u'the post body or the query string',
    u'headers': u'the HTTP headers',
    u'cookies': u'the request\'s cookies',
    u'files': u'an uploaded file',
}


def text_type(x):
    """Default argument type: convert ``x`` with :func:`six.text_type`."""
    return six.text_type(x)


class Argument(object):

    """
    :param name: Either a name or a list of option strings, e.g. foo or
        -f, --foo.
    :param default: The value produced if the argument is absent from the
        request.
    :param dest: The name of the attribute to be added to the object
        returned by :meth:`~reqparse.RequestParser.parse_args()`.
    :param bool required: Whether or not the argument may be omitted (optionals
        only).
    :param action: The basic type of action to be taken when this argument
        is encountered in the request. Valid options are "store" and "append".
    :param ignore: Whether to ignore cases where the argument fails type
        conversion
    :param type: The type to which the request argument should be
        converted. If a type raises an exception, the message in the
        error will be returned in the response. Defaults to :class:`unicode`
        in python2 and :class:`str` in python3.
    :param location: The attributes of the :class:`flask.Request` object
        to source the arguments from (ex: headers, args, etc.), can be an
        iterator. The last item listed takes precedence in the result set.
    :param choices: A container of the allowable values for the argument.
    :param help: A brief description of the argument, returned in the
        response when the argument is invalid. May optionally contain
        an "{error_msg}" interpolation token, which will be replaced with
        the text of the error raised by the type converter.
    :param operators: Suffixes appended to ``name`` (with the first "="
        removed) to build the request keys that are looked up; the matching
        operator is passed on to the type converter.
    :param bool case_sensitive: Whether argument values in the request are
        case sensitive or not (this will convert all values to lowercase)
    :param bool store_missing: Whether the arguments default value should
        be stored if the argument is missing from the request.
    :param bool trim: If enabled, trims whitespace around the argument.
    :param bool nullable: If enabled, allows null value in argument.
    """

    def __init__(self, name, default=None, dest=None, required=False,
                 ignore=False, type=text_type, location=('json', 'values',),
                 choices=(), action='store', help=None, operators=('=',),
                 case_sensitive=True, store_missing=True, trim=False,
                 nullable=True):
        self.name = name
        self.default = default
        self.dest = dest
        self.required = required
        self.ignore = ignore
        self.location = location
        self.type = type
        self.choices = choices
        self.action = action
        self.help = help
        self.case_sensitive = case_sensitive
        self.operators = operators
        self.store_missing = store_missing
        self.trim = trim
        self.nullable = nullable

    def __str__(self):
        if len(self.choices) > 5:
            # Abbreviate long choice lists.  Copy into a list first: the
            # default ``choices`` is a tuple, which has no ``append``.
            all_choices = list(self.choices)
            choices = all_choices[0:3]
            choices.append('...')
            choices.append(all_choices[-1])
        else:
            choices = self.choices
        return 'Name: {0}, type: {1}, choices: {2}'.format(
            self.name, self.type, choices)

    def __repr__(self):
        return (
            "{0}('{1}', default={2}, dest={3}, required={4}, ignore={5}, location={6}, "
            "type=\"{7}\", choices={8}, action='{9}', help={10}, case_sensitive={11}, "
            "operators={12}, store_missing={13}, trim={14}, nullable={15})"
        ).format(
            self.__class__.__name__, self.name, self.default, self.dest,
            self.required, self.ignore, self.location, self.type,
            self.choices, self.action, self.help, self.case_sensitive,
            self.operators, self.store_missing, self.trim, self.nullable)

    def source(self, request):
        """Pulls values off the request in the provided location

        :param request: The flask request object to parse arguments from
        :return: the value found at a single string location, or a
            :class:`MultiDict` merged from every listed location (later
            locations are applied on top of earlier ones); an empty
            :class:`MultiDict` when a single location yields ``None``.
        """
        if isinstance(self.location, six.string_types):
            value = getattr(request, self.location, MultiDict())
            if callable(value):
                value = value()
            if value is not None:
                return value
        else:
            values = MultiDict()
            for loc in self.location:
                value = getattr(request, loc, None)
                if callable(value):
                    value = value()
                if value is not None:
                    values.update(value)
            return values

        return MultiDict()

    def convert(self, value, op):
        """Convert a raw request value with ``self.type``.

        The converter is tried as ``type(value, name, op)``, then
        ``type(value, name)``, then ``type(value)``, falling through on
        :class:`TypeError` each time.

        :param value: the raw value taken from the request
        :param op: the operator the value was found under
        :raises ValueError: if ``value`` is ``None`` and the argument is
            not nullable
        """
        # Don't cast None
        if value is None:
            if self.nullable:
                return None
            else:
                raise ValueError('Must not be null!')

        # and check if we're expecting a filestorage and haven't overridden `type`
        # (required because the below instantiation isn't valid for FileStorage)
        elif isinstance(value, FileStorage) and self.type == FileStorage:
            return value

        try:
            return self.type(value, self.name, op)
        except TypeError:
            try:
                if self.type is decimal.Decimal:
                    # Decimal gets the string form of the value.
                    return self.type(str(value), self.name)
                else:
                    return self.type(value, self.name)
            except TypeError:
                return self.type(value)

    def handle_validation_error(self, error, bundle_errors):
        """Called when an error is raised while parsing. Aborts the request
        with a 400 status and an error message

        :param error: the error that was raised
        :param bundle_errors: do not abort when first error occurs, return a
            dict with the name of the argument and the error message to be
            bundled
        :return: ``(error, {name: message})`` when errors are bundled (via
            ``bundle_errors`` or the ``BUNDLE_ERRORS`` app config key)
        """
        error_str = six.text_type(error)
        error_msg = self.help.format(error_msg=error_str) if self.help else error_str
        msg = {self.name: error_msg}

        if current_app.config.get("BUNDLE_ERRORS", False) or bundle_errors:
            return error, msg
        flask_restful.abort(400, message=msg)

    def parse(self, request, bundle_errors=False):
        """Parses argument value(s) from the request, converting according to
        the argument's type.

        :param request: The flask request object to parse arguments from
        :param bundle_errors: Do not abort when first error occurs, return a
            dict with the name of the argument and the error message to be
            bundled
        :return: ``(value, found)`` on success, where ``found`` is a bool
            telling whether the argument was present in the request; when
            errors are bundled, the ``(error, message_dict)`` pair produced
            by :meth:`handle_validation_error`.
        """
        source = self.source(request)

        results = []

        # Sentinels
        _not_found = False
        _found = True

        # Work on a local view of the choices so that case-insensitive
        # matching never rewrites ``self.choices`` on this (typically shared)
        # Argument instance.
        choices = self.choices
        choices_lowered = False

        for operator in self.operators:
            name = self.name + operator.replace("=", "", 1)
            if name in source:
                # Account for MultiDict and regular dict
                if hasattr(source, "getlist"):
                    values = source.getlist(name)
                else:
                    values = source.get(name)
                    if not (isinstance(values, MutableSequence) and
                            self.action == 'append'):
                        values = [values]

                for value in values:
                    if hasattr(value, "strip") and self.trim:
                        value = value.strip()
                    if hasattr(value, "lower") and not self.case_sensitive:
                        value = value.lower()

                        if not choices_lowered and hasattr(choices, "__iter__"):
                            # Lower-case only what can be lower-cased, so
                            # non-string choices do not blow up.
                            choices = [
                                choice.lower() if hasattr(choice, "lower") else choice
                                for choice in choices
                            ]
                            choices_lowered = True

                    try:
                        value = self.convert(value, operator)
                    except Exception as error:
                        if self.ignore:
                            continue
                        return self.handle_validation_error(error, bundle_errors)

                    if choices and value not in choices:
                        if current_app.config.get("BUNDLE_ERRORS", False) or bundle_errors:
                            return self.handle_validation_error(
                                ValueError(u"{0} is not a valid choice".format(
                                    value)), bundle_errors)
                        self.handle_validation_error(
                            ValueError(u"{0} is not a valid choice".format(
                                value)), bundle_errors)

                    if name in request.unparsed_arguments:
                        request.unparsed_arguments.pop(name)
                    results.append(value)

        if not results and self.required:
            if isinstance(self.location, six.string_types):
                error_msg = u"Missing required parameter in {0}".format(
                    _friendly_location.get(self.location, self.location)
                )
            else:
                friendly_locations = [_friendly_location.get(loc, loc)
                                      for loc in self.location]
                error_msg = u"Missing required parameter in {0}".format(
                    ' or '.join(friendly_locations)
                )
            if current_app.config.get("BUNDLE_ERRORS", False) or bundle_errors:
                return self.handle_validation_error(ValueError(error_msg), bundle_errors)
            self.handle_validation_error(ValueError(error_msg), bundle_errors)

        if not results:
            if callable(self.default):
                return self.default(), _not_found
            else:
                return self.default, _not_found

        if self.action == 'append':
            return results, _found

        if self.action == 'store' or len(results) == 1:
            return results[0], _found
        return results, _found


class RequestParser(object):
    """Enables adding and parsing of multiple arguments in the context of a
    single request. Ex::

        from flask_restful import reqparse

        parser = reqparse.RequestParser()
        parser.add_argument('foo')
        parser.add_argument('int_bar', type=int)
        args = parser.parse_args()

    :param bool trim: If enabled, trims whitespace on all arguments in this
        parser
    :param bool bundle_errors: If enabled, do not abort when first error occurs,
        return a dict with the name of the argument and the error message to be
        bundled and return all validation errors
    """

    def __init__(self, argument_class=Argument, namespace_class=Namespace,
                 trim=False, bundle_errors=False):
        self.args = []
        self.argument_class = argument_class
        self.namespace_class = namespace_class
        self.trim = trim
        self.bundle_errors = bundle_errors

    def add_argument(self, *args, **kwargs):
        """Adds an argument to be parsed.

        Accepts either a single instance of Argument or arguments to be passed
        into :class:`Argument`'s constructor.

        See :class:`Argument`'s constructor for documentation on the
        available options.

        :return: this parser, so calls can be chained
        """

        if len(args) == 1 and isinstance(args[0], self.argument_class):
            self.args.append(args[0])
        else:
            self.args.append(self.argument_class(*args, **kwargs))

        # Do not know what other argument classes are out there
        if self.trim and self.argument_class is Argument:
            # enable trim for appended element
            self.args[-1].trim = kwargs.get('trim', self.trim)

        return self

    def parse_args(self, req=None, strict=False, http_error_code=400):
        """Parse all arguments from the provided request and return the results
        as a Namespace

        :param req: Can be used to overwrite request from Flask
        :param strict: if req includes args not in parser, throw 400 BadRequest exception
        :param http_error_code: use custom error code for `flask_restful.abort()`
        """
        if req is None:
            req = request

        namespace = self.namespace_class()

        # A record of arguments not yet parsed; as each is found
        # among self.args, it will be popped out
        req.unparsed_arguments = dict(self.argument_class('').source(req)) if strict else {}
        errors = {}
        for arg in self.args:
            value, found = arg.parse(req, self.bundle_errors)
            # A bundled failure comes back as (exception, {name: message}).
            # Type converters may raise any exception, not only ValueError,
            # so recognise the pair by its shape rather than by one class.
            if isinstance(value, Exception) and isinstance(found, dict):
                errors.update(found)
                continue
            if found or arg.store_missing:
                namespace[arg.dest or arg.name] = value
        if errors:
            flask_restful.abort(http_error_code, message=errors)

        if strict and req.unparsed_arguments:
            raise exceptions.BadRequest('Unknown arguments: %s'
                                        % ', '.join(req.unparsed_arguments.keys()))

        return namespace

    def copy(self):
        """ Creates a copy of this RequestParser with the same set of arguments """
        parser_copy = self.__class__(self.argument_class, self.namespace_class)
        # Deep copy so that editing arguments on the copy leaves this
        # parser's arguments untouched.
        parser_copy.args = deepcopy(self.args)
        parser_copy.trim = self.trim
        parser_copy.bundle_errors = self.bundle_errors
        return parser_copy

    def replace_argument(self, name, *args, **kwargs):
        """ Replace the argument matching the given name with a new version.

        The replacement is appended at the end of the argument list; if no
        argument has that name, the parser is left unchanged.
        """
        new_arg = self.argument_class(name, *args, **kwargs)
        # Iterate over a copy because the list is modified inside the loop.
        for index, arg in enumerate(self.args[:]):
            if new_arg.name == arg.name:
                del self.args[index]
                self.args.append(new_arg)
                break
        return self

    def remove_argument(self, name):
        """ Remove the argument matching the given name. """
        # Iterate over a copy because the list is modified inside the loop.
        for index, arg in enumerate(self.args[:]):
            if name == arg.name:
                del self.args[index]
                break
        return self