""" A WSGI and HTTP server for use **during development only**. This
server is convenient to use , but is not designed to be particularly
stable , secure , or efficient . Use a dedicate WSGI server and HTTP
server when deploying to production .
It provides features like interactive debugging and code reloading . Use
` ` run_simple ` ` to start the server . Put this in a ` ` run . py ` ` script :
. . code - block : : python
from myapp import create_app
from werkzeug import run_simple
"""
from __future__ import annotations
import errno
import io
import os
import selectors
import socket
import socketserver
import sys
import typing as t
from datetime import datetime as dt
from datetime import timedelta
from datetime import timezone
from http . server import BaseHTTPRequestHandler
from http . server import HTTPServer
from urllib . parse import unquote
from urllib . parse import urlsplit
from . _internal import _log
from . _internal import _wsgi_encoding_dance
from . exceptions import InternalServerError
from . urls import uri_to_iri
try :
import ssl
except ImportError :
class _SslDummy :
def __getattr__ ( self , name : str ) - > t . Any :
raise RuntimeError ( # noqa: B904
" SSL is unavailable because this Python runtime was not "
" compiled with SSL/TLS support. "
)
ssl = _SslDummy ( ) # type: ignore
_log_add_style = True
if os . name == " nt " :
try :
__import__ ( " colorama " )
except ImportError :
_log_add_style = False
can_fork = hasattr ( os , " fork " )
if can_fork :
ForkingMixIn = socketserver . ForkingMixIn
else :
class ForkingMixIn : # type: ignore
pass
try :
af_unix = socket . AF_UNIX
except AttributeError :
af_unix = None # type: ignore
LISTEN_QUEUE = 128
_TSSLContextArg = t . Optional [
t . Union [ " ssl.SSLContext " , t . Tuple [ str , t . Optional [ str ] ] , t . Literal [ " adhoc " ] ]
]
if t . TYPE_CHECKING :
from _typeshed . wsgi import WSGIApplication
from _typeshed . wsgi import WSGIEnvironment
from cryptography . hazmat . primitives . asymmetric . rsa import (
RSAPrivateKeyWithSerialization ,
)
from cryptography . x509 import Certificate
class DechunkedInput ( io . RawIOBase ) :
""" An input stream that handles Transfer-Encoding ' chunked ' """
def __init__ ( self , rfile : t . IO [ bytes ] ) - > None :
self . _rfile = rfile
self . _done = False
self . _len = 0
def readable ( self ) - > bool :
return True
def read_chunk_len ( self ) - > int :
try :
line = self . _rfile . readline ( ) . decode ( " latin1 " )
_len = int ( line . strip ( ) , 16 )
except ValueError as e :
raise OSError ( " Invalid chunk header " ) from e
if _len < 0 :
raise OSError ( " Negative chunk length not allowed " )
return _len
def readinto ( self , buf : bytearray ) - > int : # type: ignore
read = 0
while not self . _done and read < len ( buf ) :
if self . _len == 0 :
# This is the first chunk or we fully consumed the previous
# one. Read the next length of the next chunk
self . _len = self . read_chunk_len ( )
if self . _len == 0 :
# Found the final chunk of size 0. The stream is now exhausted,
# but there is still a final newline that should be consumed
self . _done = True
if self . _len > 0 :
# There is data (left) in this chunk, so append it to the
# buffer. If this operation fully consumes the chunk, this will
# reset self._len to 0.
n = min ( len ( buf ) , self . _len )
# If (read + chunk size) becomes more than len(buf), buf will
# grow beyond the original size and read more data than
# required. So only read as much data as can fit in buf.
if read + n > len ( buf ) :
buf [ read : ] = self . _rfile . read ( len ( buf ) - read )
self . _len - = len ( buf ) - read
read = len ( buf )
else :
buf [ read : read + n ] = self . _rfile . read ( n )
self . _len - = n
read + = n
if self . _len == 0 :
# Skip the terminating newline of a chunk that has been fully
# consumed. This also applies to the 0-sized final chunk
terminator = self . _rfile . readline ( )
if terminator not in ( b " \n " , b " \r \n " , b " \r " ) :
raise OSError ( " Missing chunk terminating newline " )
return read
class WSGIRequestHandler ( BaseHTTPRequestHandler ) :
""" A request handler that implements WSGI dispatching. """
server : BaseWSGIServer
@property
def server_version ( self ) - > str : # type: ignore
return self . server . _server_version
def make_environ ( self ) - > WSGIEnvironment :
request_url = urlsplit ( self . path )
url_scheme = " http " if self . server . ssl_context is None else " https "
if not self . client_address :
self . client_address = ( " <local> " , 0 )
elif isinstance ( self . client_address , str ) :
self . client_address = ( self . client_address , 0 )
# If there was no scheme but the path started with two slashes,
# the first segment may have been incorrectly parsed as the
# netloc, prepend it to the path again.
if not request_url . scheme and request_url . netloc :
path_info = f " / { request_url . netloc } { request_url . path } "
else :
path_info = request_url . path
path_info = unquote ( path_info )
environ : WSGIEnvironment = {
" wsgi.version " : ( 1 , 0 ) ,
" wsgi.url_scheme " : url_scheme ,
" wsgi.input " : self . rfile ,
" wsgi.errors " : sys . stderr ,
" wsgi.multithread " : self . server . multithread ,
" wsgi.multiprocess " : self . server . multiprocess ,
" wsgi.run_once " : False ,
" werkzeug.socket " : self . connection ,
" SERVER_SOFTWARE " : self . server_version ,
" REQUEST_METHOD " : self . command ,
" SCRIPT_NAME " : " " ,
" PATH_INFO " : _wsgi_encoding_dance ( path_info ) ,
" QUERY_STRING " : _wsgi_encoding_dance ( request_url . query ) ,
# Non-standard, added by mod_wsgi, uWSGI
" REQUEST_URI " : _wsgi_encoding_dance ( self . path ) ,
# Non-standard, added by gunicorn
" RAW_URI " : _wsgi_encoding_dance ( self . path ) ,
" REMOTE_ADDR " : self . address_string ( ) ,
" REMOTE_PORT " : self . port_integer ( ) ,
" SERVER_NAME " : self . server . server_address [ 0 ] ,
" SERVER_PORT " : str ( self . server . server_address [ 1 ] ) ,
" SERVER_PROTOCOL " : self . request_version ,
}
for key , value in self . headers . items ( ) :
if " _ " in key :
continue
key = key . upper ( ) . replace ( " - " , " _ " )
value = value . replace ( " \r \n " , " " )
if key not in ( " CONTENT_TYPE " , " CONTENT_LENGTH " ) :
key = f " HTTP_ { key } "
if key in environ :
value = f " { environ [ key ] } , { value } "
environ [ key ] = value
if environ . get ( " HTTP_TRANSFER_ENCODING " , " " ) . strip ( ) . lower ( ) == " chunked " :
environ [ " wsgi.input_terminated " ] = True
environ [ " wsgi.input " ] = DechunkedInput ( environ [ " wsgi.input " ] )
# Per RFC 2616, if the URL is absolute, use that as the host.
# We're using "has a scheme" to indicate an absolute URL.
if request_url . scheme and request_url . netloc :
environ [ " HTTP_HOST " ] = request_url . netloc
try :
# binary_form=False gives nicer information, but wouldn't be compatible with
# what Nginx or Apache could return.
peer_cert = self . connection . getpeercert ( binary_form = True )
if peer_cert is not None :
# Nginx and Apache use PEM format.
environ [ " SSL_CLIENT_CERT " ] = ssl . DER_cert_to_PEM_cert ( peer_cert )
except ValueError :
# SSL handshake hasn't finished.
self . server . log ( " error " , " Cannot fetch SSL peer certificate info " )
except AttributeError :
# Not using TLS, the socket will not have getpeercert().
pass
return environ
def run_wsgi ( self ) - > None :
if self . headers . get ( " Expect " , " " ) . lower ( ) . strip ( ) == " 100-continue " :
self . wfile . write ( b " HTTP/1.1 100 Continue \r \n \r \n " )
self . environ = environ = self . make_environ ( )
status_set : str | None = None
headers_set : list [ tuple [ str , str ] ] | None = None
status_sent : str | None = None
headers_sent : list [ tuple [ str , str ] ] | None = None
chunk_response : bool = False
def write ( data : bytes ) - > None :
nonlocal status_sent , headers_sent , chunk_response
assert status_set is not None , " write() before start_response "
assert headers_set is not None , " write() before start_response "
if status_sent is None :
status_sent = status_set
headers_sent = headers_set
try :
code_str , msg = status_sent . split ( None , 1 )
except ValueError :
code_str , msg = status_sent , " "
code = int ( code_str )
self . send_response ( code , msg )
header_keys = set ( )
for key , value in headers_sent :
self . send_header ( key , value )
header_keys . add ( key . lower ( ) )
# Use chunked transfer encoding if there is no content
# length. Do not use for 1xx and 204 responses. 304
# responses and HEAD requests are also excluded, which
# is the more conservative behavior and matches other
# parts of the code.
# https://httpwg.org/specs/rfc7230.html#rfc.section.3.3.1
if (
not (
" content-length " in header_keys
or environ [ " REQUEST_METHOD " ] == " HEAD "
or ( 100 < = code < 200 )
or code in { 204 , 304 }
)
and self . protocol_version > = " HTTP/1.1 "
) :
chunk_response = True
self . send_header ( " Transfer-Encoding " , " chunked " )
# Always close the connection. This disables HTTP/1.1
# keep-alive connections. They aren't handled well by
# Python's http.server because it doesn't know how to
# drain the stream before the next request line.
self . send_header ( " Connection " , " close " )
self . end_headers ( )
assert isinstance ( data , bytes ) , " applications must write bytes "
if data :
if chunk_response :
self . wfile . write ( hex ( len ( data ) ) [ 2 : ] . encode ( ) )
self . wfile . write ( b " \r \n " )
self . wfile . write ( data )
if chunk_response :
self . wfile . write ( b " \r \n " )
self . wfile . flush ( )
def start_response ( status , headers , exc_info = None ) : # type: ignore
nonlocal status_set , headers_set
if exc_info :
try :
if headers_sent :
raise exc_info [ 1 ] . with_traceback ( exc_info [ 2 ] )
finally :
exc_info = None
elif headers_set :
raise AssertionError ( " Headers already set " )
status_set = status
headers_set = headers
return write
def execute ( app : WSGIApplication ) - > None :
application_iter = app ( environ , start_response )
try :
for data in application_iter :
write ( data )
if not headers_sent :
write ( b " " )
if chunk_response :
self . wfile . write ( b " 0 \r \n \r \n " )
finally :
# Check for any remaining data in the read socket, and discard it. This
# will read past request.max_content_length, but lets the client see a
# 413 response instead of a connection reset failure. If we supported
# keep-alive connections, this naive approach would break by reading the
# next request line. Since we know that write (above) closes every
# connection we can read everything.
selector = selectors . DefaultSelector ( )
selector . register ( self . connection , selectors . EVENT_READ )
total_size = 0
total_reads = 0
# A timeout of 0 tends to fail because a client needs a small amount of
# time to continue sending its data.
while selector . select ( timeout = 0.01 ) :
# Only read 10MB into memory at a time.
data = self . rfile . read ( 10_000_000 )
total_size + = len ( data )
total_reads + = 1
# Stop reading on no data, >=10GB, or 1000 reads. If a client sends
# more than that, they'll get a connection reset failure.
if not data or total_size > = 10_000_000_000 or total_reads > 1000 :
break
selector . close ( )
if hasattr ( application_iter , " close " ) :
application_iter . close ( )
try :
execute ( self . server . app )
except ( ConnectionError , socket . timeout ) as e :
self . connection_dropped ( e , environ )
except Exception as e :
if self . server . passthrough_errors :
raise
if status_sent is not None and chunk_response :
self . close_connection = True
try :
# if we haven't yet sent the headers but they are set
# we roll back to be able to set them again.
if status_sent is None :
status_set = None
headers_set = None
execute ( InternalServerError ( ) )
except Exception :
pass
from . debug . tbtools import DebugTraceback
msg = DebugTraceback ( e ) . render_traceback_text ( )
self . server . log ( " error " , f " Error on request: \n { msg } " )
def handle ( self ) - > None :
""" Handles a request ignoring dropped connections. """
try :
super ( ) . handle ( )
except ( ConnectionError , socket . timeout ) as e :
self . connection_dropped ( e )
except Exception as e :
if self . server . ssl_context is not None and is_ssl_error ( e ) :
self . log_error ( " SSL error occurred: %s " , e )
else :
raise
def connection_dropped (
self , error : BaseException , environ : WSGIEnvironment | None = None
) - > None :
""" Called if the connection was closed by the client. By default
nothing happens .
"""
def __getattr__ ( self , name : str ) - > t . Any :
# All HTTP methods are handled by run_wsgi.
if name . startswith ( " do_ " ) :
return self . run_wsgi
# All other attributes are forwarded to the base class.
return getattr ( super ( ) , name )
def address_string ( self ) - > str :
if getattr ( self , " environ " , None ) :
return self . environ [ " REMOTE_ADDR " ] # type: ignore
if not self . client_address :
return " <local> "
return self . client_address [ 0 ]
def port_integer ( self ) - > int :
return self . client_address [ 1 ]
# Escape control characters. This is defined (but private) in Python 3.12.
_control_char_table = str . maketrans (
{ c : rf " \ x { c : 02x } " for c in [ * range ( 0x20 ) , * range ( 0x7F , 0xA0 ) ] }
)
_control_char_table [ ord ( " \\ " ) ] = r " \\ "
def log_request ( self , code : int | str = " - " , size : int | str = " - " ) - > None :
try :
path = uri_to_iri ( self . path )
msg = f " { self . command } { path } { self . request_version } "
except AttributeError :
# path isn't set if the requestline was bad
msg = self . requestline
# Escape control characters that may be in the decoded path.
msg = msg . translate ( self . _control_char_table )
code = str ( code )
if code [ 0 ] == " 1 " : # 1xx - Informational
msg = _ansi_style ( msg , " bold " )
elif code == " 200 " : # 2xx - Success
pass
elif code == " 304 " : # 304 - Resource Not Modified
msg = _ansi_style ( msg , " cyan " )
elif code [ 0 ] == " 3 " : # 3xx - Redirection
msg = _ansi_style ( msg , " green " )
elif code == " 404 " : # 404 - Resource Not Found
msg = _ansi_style ( msg , " yellow " )
elif code [ 0 ] == " 4 " : # 4xx - Client Error
msg = _ansi_style ( msg , " bold " , " red " )
else : # 5xx, or any other response
msg = _ansi_style ( msg , " bold " , " magenta " )
self . log ( " info " , ' " %s " %s %s ' , msg , code , size )
def log_error ( self , format : str , * args : t . Any ) - > None :
self . log ( " error " , format , * args )
def log_message ( self , format : str , * args : t . Any ) - > None :
self . log ( " info " , format , * args )
def log ( self , type : str , message : str , * args : t . Any ) - > None :
_log (
type ,
f " { self . address_string ( ) } - - [ { self . log_date_time_string ( ) } ] { message } \n " ,
* args ,
)
def _ansi_style ( value : str , * styles : str ) - > str :
if not _log_add_style :
return value
codes = {
" bold " : 1 ,
" red " : 31 ,
" green " : 32 ,
" yellow " : 33 ,
" magenta " : 35 ,
" cyan " : 36 ,
}
for style in styles :
value = f " \x1b [ { codes [ style ] } m { value } "
return f " { value } \x1b [0m "
def generate_adhoc_ssl_pair (
cn : str | None = None ,
) - > tuple [ Certificate , RSAPrivateKeyWithSerialization ] :
try :
from cryptography import x509
from cryptography . x509 . oid import NameOID
from cryptography . hazmat . backends import default_backend
from cryptography . hazmat . primitives import hashes
from cryptography . hazmat . primitives . asymmetric import rsa
except ImportError :
raise TypeError (
" Using ad-hoc certificates requires the cryptography library. "
) from None
backend = default_backend ( )
pkey = rsa . generate_private_key (
public_exponent = 65537 , key_size = 2048 , backend = backend
)
# pretty damn sure that this is not actually accepted by anyone
if cn is None :
cn = " * "
subject = x509 . Name (
[
x509 . NameAttribute ( NameOID . ORGANIZATION_NAME , " Dummy Certificate " ) ,
x509 . NameAttribute ( NameOID . COMMON_NAME , cn ) ,
]
)
backend = default_backend ( )
cert = (
x509 . CertificateBuilder ( )
. subject_name ( subject )
. issuer_name ( subject )
. public_key ( pkey . public_key ( ) )
. serial_number ( x509 . random_serial_number ( ) )
. not_valid_before ( dt . now ( timezone . utc ) )
. not_valid_after ( dt . now ( timezone . utc ) + timedelta ( days = 365 ) )
. add_extension ( x509 . ExtendedKeyUsage ( [ x509 . OID_SERVER_AUTH ] ) , critical = False )
. add_extension ( x509 . SubjectAlternativeName ( [ x509 . DNSName ( cn ) ] ) , critical = False )
. sign ( pkey , hashes . SHA256 ( ) , backend )
)
return cert , pkey
def make_ssl_devcert (
base_path : str , host : str | None = None , cn : str | None = None
) - > tuple [ str , str ] :
""" Creates an SSL key for development. This should be used instead of
the ` ` ' adhoc ' ` ` key which generates a new cert on each server start .
It accepts a path for where it should store the key and cert and
either a host or CN . If a host is given it will use the CN
` ` * . host / CN = host ` ` .
For more information see : func : ` run_simple ` .
. . versionadded : : 0.9
: param base_path : the path to the certificate and key . The extension
` ` . crt ` ` is added for the certificate , ` ` . key ` ` is
added for the key .
: param host : the name of the host . This can be used as an alternative
for the ` cn ` .
: param cn : the ` CN ` to use .
"""
if host is not None :
cn = f " *. { host } /CN= { host } "
cert , pkey = generate_adhoc_ssl_pair ( cn = cn )
from cryptography . hazmat . primitives import serialization
cert_file = f " { base_path } .crt "
pkey_file = f " { base_path } .key "
with open ( cert_file , " wb " ) as f :
f . write ( cert . public_bytes ( serialization . Encoding . PEM ) )
with open ( pkey_file , " wb " ) as f :
f . write (
pkey . private_bytes (
encoding = serialization . Encoding . PEM ,
format = serialization . PrivateFormat . TraditionalOpenSSL ,
encryption_algorithm = serialization . NoEncryption ( ) ,
)
)
return cert_file , pkey_file
def generate_adhoc_ssl_context ( ) - > ssl . SSLContext :
""" Generates an adhoc SSL context for the development server. """
import tempfile
import atexit
cert , pkey = generate_adhoc_ssl_pair ( )
from cryptography . hazmat . primitives import serialization
cert_handle , cert_file = tempfile . mkstemp ( )
pkey_handle , pkey_file = tempfile . mkstemp ( )
atexit . register ( os . remove , pkey_file )
atexit . register ( os . remove , cert_file )
os . write ( cert_handle , cert . public_bytes ( serialization . Encoding . PEM ) )
os . write (
pkey_handle ,
pkey . private_bytes (
encoding = serialization . Encoding . PEM ,
format = serialization . PrivateFormat . TraditionalOpenSSL ,
encryption_algorithm = serialization . NoEncryption ( ) ,
) ,
)
os . close ( cert_handle )
os . close ( pkey_handle )
ctx = load_ssl_context ( cert_file , pkey_file )
return ctx
def load_ssl_context (
cert_file : str , pkey_file : str | None = None , protocol : int | None = None
) - > ssl . SSLContext :
""" Loads SSL context from cert/private key files and optional protocol.
Many parameters are directly taken from the API of
: py : class : ` ssl . SSLContext ` .
: param cert_file : Path of the certificate to use .
: param pkey_file : Path of the private key to use . If not given , the key
will be obtained from the certificate file .
: param protocol : A ` ` PROTOCOL ` ` constant from the : mod : ` ssl ` module .
Defaults to : data : ` ssl . PROTOCOL_TLS_SERVER ` .
"""
if protocol is None :
protocol = ssl . PROTOCOL_TLS_SERVER
ctx = ssl . SSLContext ( protocol )
ctx . load_cert_chain ( cert_file , pkey_file )
return ctx
def is_ssl_error ( error : Exception | None = None ) - > bool :
""" Checks if the given error (or the current one) is an SSL error. """
if error is None :
error = t . cast ( Exception , sys . exc_info ( ) [ 1 ] )
return isinstance ( error , ssl . SSLError )
def select_address_family ( host : str , port : int ) - > socket . AddressFamily :
""" Return ``AF_INET4``, ``AF_INET6``, or ``AF_UNIX`` depending on
the host and port . """
if host . startswith ( " unix:// " ) :
return socket . AF_UNIX
elif " : " in host and hasattr ( socket , " AF_INET6 " ) :
return socket . AF_INET6
return socket . AF_INET
def get_sockaddr (
host : str , port : int , family : socket . AddressFamily
) - > tuple [ str , int ] | str :
""" Return a fully qualified socket address that can be passed to
: func : ` socket . bind ` . """
if family == af_unix :
# Absolute path avoids IDNA encoding error when path starts with dot.
return os . path . abspath ( host . partition ( " :// " ) [ 2 ] )
try :
res = socket . getaddrinfo (
host , port , family , socket . SOCK_STREAM , socket . IPPROTO_TCP
)
except socket . gaierror :
return host , port
return res [ 0 ] [ 4 ] # type: ignore
def get_interface_ip ( family : socket . AddressFamily ) - > str :
""" Get the IP address of an external interface. Used when binding to
0.0 .0 .0 or : : 1 to show a more useful URL .
: meta private :
"""
# arbitrary private address
host = " fd31:f903:5ab5:1::1 " if family == socket . AF_INET6 else " 10.253.155.219 "
with socket . socket ( family , socket . SOCK_DGRAM ) as s :
try :
s . connect ( ( host , 58162 ) )
except OSError :
return " ::1 " if family == socket . AF_INET6 else " 127.0.0.1 "
return s . getsockname ( ) [ 0 ] # type: ignore
class BaseWSGIServer ( HTTPServer ) :
""" A WSGI server that that handles one request at a time.
Use : func : ` make_server ` to create a server instance .
"""
multithread = False
multiprocess = False
request_queue_size = LISTEN_QUEUE
allow_reuse_address = True
def __init__ (
self ,
host : str ,
port : int ,
app : WSGIApplication ,
handler : type [ WSGIRequestHandler ] | None = None ,
passthrough_errors : bool = False ,
ssl_context : _TSSLContextArg | None = None ,
fd : int | None = None ,
) - > None :
if handler is None :
handler = WSGIRequestHandler
# If the handler doesn't directly set a protocol version and
# thread or process workers are used, then allow chunked
# responses and keep-alive connections by enabling HTTP/1.1.
if " protocol_version " not in vars ( handler ) and (
self . multithread or self . multiprocess
) :
handler . protocol_version = " HTTP/1.1 "
self . host = host
self . port = port
self . app = app
self . passthrough_errors = passthrough_errors
self . address_family = address_family = select_address_family ( host , port )
server_address = get_sockaddr ( host , int ( port ) , address_family )
# Remove a leftover Unix socket file from a previous run. Don't
# remove a file that was set up by run_simple.
if address_family == af_unix and fd is None :
server_address = t . cast ( str , server_address )
if os . path . exists ( server_address ) :
os . unlink ( server_address )
# Bind and activate will be handled manually, it should only
# happen if we're not using a socket that was already set up.
super ( ) . __init__ (
server_address , # type: ignore[arg-type]
handler ,
bind_and_activate = False ,
)
if fd is None :
# No existing socket descriptor, do bind_and_activate=True.
try :
self . server_bind ( )
self . server_activate ( )
except OSError as e :
# Catch connection issues and show them without the traceback. Show
# extra instructions for address not found, and for macOS.
self . server_close ( )
print ( e . strerror , file = sys . stderr )
if e . errno == errno . EADDRINUSE :
print (
f " Port { port } is in use by another program. Either identify and "
" stop that program, or start the server with a different "
" port. " ,
file = sys . stderr ,
)
if sys . platform == " darwin " and port == 5000 :
print (
" On macOS, try disabling the ' AirPlay Receiver ' service "
" from System Preferences -> General -> AirDrop & Handoff. " ,
file = sys . stderr ,
)
sys . exit ( 1 )
except BaseException :
self . server_close ( )
raise
else :
# TCPServer automatically opens a socket even if bind_and_activate is False.
# Close it to silence a ResourceWarning.
self . server_close ( )
# Use the passed in socket directly.
self . socket = socket . fromfd ( fd , address_family , socket . SOCK_STREAM )
self . server_address = self . socket . getsockname ( )
if address_family != af_unix :
# If port was 0, this will record the bound port.
self . port = self . server_address [ 1 ]
if ssl_context is not None :
if isinstance ( ssl_context , tuple ) :
ssl_context = load_ssl_context ( * ssl_context )
elif ssl_context == " adhoc " :
ssl_context = generate_adhoc_ssl_context ( )
self . socket = ssl_context . wrap_socket ( self . socket , server_side = True )
self . ssl_context : ssl . SSLContext | None = ssl_context
else :
self . ssl_context = None
import importlib . metadata
self . _server_version = f " Werkzeug/ { importlib . metadata . version ( ' werkzeug ' ) } "
def log ( self , type : str , message : str , * args : t . Any ) - > None :
_log ( type , message , * args )
def serve_forever ( self , poll_interval : float = 0.5 ) - > None :
try :
super ( ) . serve_forever ( poll_interval = poll_interval )
except KeyboardInterrupt :
pass
finally :
self . server_close ( )
def handle_error (
self , request : t . Any , client_address : tuple [ str , int ] | str
) - > None :
if self . passthrough_errors :
raise
return super ( ) . handle_error ( request , client_address )
def log_startup ( self ) - > None :
""" Show information about the address when starting the server. """
dev_warning = (
" WARNING: This is a development server. Do not use it in a production "
" deployment. Use a production WSGI server instead. "
)
dev_warning = _ansi_style ( dev_warning , " bold " , " red " )
messages = [ dev_warning ]
if self . address_family == af_unix :
messages . append ( f " * Running on { self . host } " )
else :
scheme = " http " if self . ssl_context is None else " https "
display_hostname = self . host
if self . host in { " 0.0.0.0 " , " :: " } :
messages . append ( f " * Running on all addresses ( { self . host } ) " )
if self . host == " 0.0.0.0 " :
localhost = " 127.0.0.1 "
display_hostname = get_interface_ip ( socket . AF_INET )
else :
localhost = " [::1] "
display_hostname = get_interface_ip ( socket . AF_INET6 )
messages . append ( f " * Running on { scheme } :// { localhost } : { self . port } " )
if " : " in display_hostname :
display_hostname = f " [ { display_hostname } ] "
messages . append ( f " * Running on { scheme } :// { display_hostname } : { self . port } " )
_log ( " info " , " \n " . join ( messages ) )
class ThreadedWSGIServer ( socketserver . ThreadingMixIn , BaseWSGIServer ) :
""" A WSGI server that handles concurrent requests in separate
threads .
Use : func : ` make_server ` to create a server instance .
"""
multithread = True
daemon_threads = True
class ForkingWSGIServer ( ForkingMixIn , BaseWSGIServer ) :
""" A WSGI server that handles concurrent requests in separate forked
processes .
Use : func : ` make_server ` to create a server instance .
"""
multiprocess = True
def __init__ (
self ,
host : str ,
port : int ,
app : WSGIApplication ,
processes : int = 40 ,
handler : type [ WSGIRequestHandler ] | None = None ,
passthrough_errors : bool = False ,
ssl_context : _TSSLContextArg | None = None ,
fd : int | None = None ,
) - > None :
if not can_fork :
raise ValueError ( " Your platform does not support forking. " )
super ( ) . __init__ ( host , port , app , handler , passthrough_errors , ssl_context , fd )
self . max_children = processes
def make_server (
host : str ,
port : int ,
app : WSGIApplication ,
threaded : bool = False ,
processes : int = 1 ,
request_handler : type [ WSGIRequestHandler ] | None = None ,
passthrough_errors : bool = False ,
ssl_context : _TSSLContextArg | None = None ,
fd : int | None = None ,
) - > BaseWSGIServer :
""" Create an appropriate WSGI server instance based on the value of
` ` threaded ` ` and ` ` processes ` ` .
This is called from : func : ` run_simple ` , but can be used separately
to have access to the server object , such as to run it in a separate
thread .
See : func : ` run_simple ` for parameter docs .
"""
if threaded and processes > 1 :
raise ValueError ( " Cannot have a multi-thread and multi-process server. " )
if threaded :
return ThreadedWSGIServer (
host , port , app , request_handler , passthrough_errors , ssl_context , fd = fd
)
if processes > 1 :
return ForkingWSGIServer (
host ,
port ,
app ,
processes ,
request_handler ,
passthrough_errors ,
ssl_context ,
fd = fd ,
)
return BaseWSGIServer (
host , port , app , request_handler , passthrough_errors , ssl_context , fd = fd
)
def is_running_from_reloader ( ) - > bool :
""" Check if the server is running as a subprocess within the
Werkzeug reloader .
. . versionadded : : 0.10
"""
return os . environ . get ( " WERKZEUG_RUN_MAIN " ) == " true "
def run_simple (
hostname : str ,
port : int ,
application : WSGIApplication ,
use_reloader : bool = False ,
use_debugger : bool = False ,
use_evalex : bool = True ,
extra_files : t . Iterable [ str ] | None = None ,
exclude_patterns : t . Iterable [ str ] | None = None ,
reloader_interval : int = 1 ,
reloader_type : str = " auto " ,
threaded : bool = False ,
processes : int = 1 ,
request_handler : type [ WSGIRequestHandler ] | None = None ,
static_files : dict [ str , str | tuple [ str , str ] ] | None = None ,
passthrough_errors : bool = False ,
ssl_context : _TSSLContextArg | None = None ,
) - > None :
""" Start a development server for a WSGI application. Various
optional features can be enabled .
. . warning : :
Do not use the development server when deploying to production .
It is intended for use only during local development . It is not
designed to be particularly efficient , stable , or secure .
: param hostname : The host to bind to , for example ` ` ' localhost ' ` ` .
Can be a domain , IPv4 or IPv6 address , or file path starting
with ` ` unix : / / ` ` for a Unix socket .
: param port : The port to bind to , for example ` ` 8080 ` ` . Using ` ` 0 ` `
tells the OS to pick a random free port .
: param application : The WSGI application to run .
: param use_reloader : Use a reloader process to restart the server
process when files are changed .
: param use_debugger : Use Werkzeug ' s debugger, which will show
formatted tracebacks on unhandled exceptions .
: param use_evalex : Make the debugger interactive . A Python terminal
can be opened for any frame in the traceback . Some protection is
provided by requiring a PIN , but this should never be enabled
on a publicly visible server .
: param extra_files : The reloader will watch these files for changes
in addition to Python modules . For example , watch a
configuration file .
: param exclude_patterns : The reloader will ignore changes to any
files matching these : mod : ` fnmatch ` patterns . For example ,
ignore cache files .
: param reloader_interval : How often the reloader tries to check for
changes .
: param reloader_type : The reloader to use . The ` ` ' stat ' ` ` reloader
is built in , but may require significant CPU to watch files . The
` ` ' watchdog ' ` ` reloader is much more efficient but requires
installing the ` ` watchdog ` ` package first .
: param threaded : Handle concurrent requests using threads . Cannot be
used with ` ` processes ` ` .
: param processes : Handle concurrent requests using up to this number
of processes . Cannot be used with ` ` threaded ` ` .
: param request_handler : Use a different
: class : ` ~ BaseHTTPServer . BaseHTTPRequestHandler ` subclass to
handle requests .
: param static_files : A dict mapping URL prefixes to directories to
serve static files from using
: class : ` ~ werkzeug . middleware . SharedDataMiddleware ` .
: param passthrough_errors : Don ' t catch unhandled exceptions at the
server level , let the server crash instead . If ` ` use_debugger ` `
is enabled , the debugger will still catch such errors .
: param ssl_context : Configure TLS to serve over HTTPS . Can be an
: class : ` ssl . SSLContext ` object , a ` ` ( cert_file , key_file ) ` `
tuple to create a typical context , or the string ` ` ' adhoc ' ` ` to
generate a temporary self - signed certificate .
. . versionchanged : : 2.1
Instructions are shown for dealing with an " address already in
use " error.
. . versionchanged : : 2.1
Running on ` ` 0.0 .0 .0 ` ` or ` ` : : ` ` shows the loopback IP in
addition to a real IP .
. . versionchanged : : 2.1
The command - line interface was removed .
. . versionchanged : : 2.0
Running on ` ` 0.0 .0 .0 ` ` or ` ` : : ` ` shows a real IP address that
was bound as well as a warning not to run the development server
in production .
. . versionchanged : : 2.0
The ` ` exclude_patterns ` ` parameter was added .
. . versionchanged : : 0.15
Bind to a Unix socket by passing a ` ` hostname ` ` that starts with
` ` unix : / / ` ` .
. . versionchanged : : 0.10
Improved the reloader and added support for changing the backend
through the ` ` reloader_type ` ` parameter .
. . versionchanged : : 0.9
A command - line interface was added .
. . versionchanged : : 0.8
` ` ssl_context ` ` can be a tuple of paths to the certificate and
private key files .
. . versionchanged : : 0.6
The ` ` ssl_context ` ` parameter was added .
. . versionchanged : : 0.5
The ` ` static_files ` ` and ` ` passthrough_errors ` ` parameters were
added .
"""
if not isinstance ( port , int ) :
raise TypeError ( " port must be an integer " )
if static_files :
from . middleware . shared_data import SharedDataMiddleware
application = SharedDataMiddleware ( application , static_files )
if use_debugger :
from . debug import DebuggedApplication
application = DebuggedApplication ( application , evalex = use_evalex )
if not is_running_from_reloader ( ) :
fd = None
else :
fd = int ( os . environ [ " WERKZEUG_SERVER_FD " ] )
srv = make_server (
hostname ,
port ,
application ,
threaded ,
processes ,
request_handler ,
passthrough_errors ,
ssl_context ,
fd = fd ,
)
srv . socket . set_inheritable ( True )
os . environ [ " WERKZEUG_SERVER_FD " ] = str ( srv . fileno ( ) )
if not is_running_from_reloader ( ) :
srv . log_startup ( )
_log ( " info " , _ansi_style ( " Press CTRL+C to quit " , " yellow " ) )
if use_reloader :
from . _reloader import run_with_reloader
try :
run_with_reloader (
srv . serve_forever ,
extra_files = extra_files ,
exclude_patterns = exclude_patterns ,
interval = reloader_interval ,
reloader_type = reloader_type ,
)
finally :
srv . server_close ( )
else :
srv . serve_forever ( )