You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
407 lines
15 KiB
407 lines
15 KiB
"""
|
|
|
|
"""
|
|
|
|
"""
|
|
websocket - WebSocket client library for Python
|
|
|
|
Copyright (C) 2010 Hiroki Ohtani(liris)
|
|
|
|
This library is free software; you can redistribute it and/or
|
|
modify it under the terms of the GNU Lesser General Public
|
|
License as published by the Free Software Foundation; either
|
|
version 2.1 of the License, or (at your option) any later version.
|
|
|
|
This library is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
Lesser General Public License for more details.
|
|
|
|
You should have received a copy of the GNU Lesser General Public
|
|
License along with this library; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
|
|
|
|
"""
|
|
import selectors
|
|
import sys
|
|
import threading
|
|
import time
|
|
import traceback
|
|
from ._abnf import ABNF
|
|
from ._core import WebSocket, getdefaulttimeout
|
|
from ._exceptions import *
|
|
from . import _logging
|
|
|
|
|
|
__all__ = ["WebSocketApp"]
|
|
|
|
|
|
class Dispatcher:
    """
    Dispatcher

    Drives the read loop of an application: waits until the application's
    socket is readable (or ``ping_timeout`` seconds elapse) and then invokes
    the supplied callbacks, for as long as ``app.keep_running`` is true.
    """

    def __init__(self, app, ping_timeout):
        """
        Parameters
        ----------
        app:
            object exposing ``keep_running`` and ``sock.sock`` (the socket
            that is polled for readability).
        ping_timeout: int or float
            maximum time, in seconds, a single poll may block.
        """
        self.app = app
        self.ping_timeout = ping_timeout

    def read(self, sock, read_callback, check_callback):
        """
        Poll the socket and dispatch callbacks until told to stop.

        Parameters
        ----------
        sock:
            accepted for interface compatibility; the socket that is actually
            polled is ``self.app.sock.sock``, looked up on every iteration.
        read_callback: callable
            called without arguments when the socket is readable. A falsy
            return value ends the loop immediately (``check_callback`` is not
            called for that iteration).
        check_callback: callable
            called without arguments at the end of every iteration that did
            not end the loop, whether or not data was readable.
        """
        while self.app.keep_running:
            # The selector only lives for one poll. Using it as a context
            # manager guarantees it is closed even when we `break` below or
            # when select()/a callback raises (e.g. KeyboardInterrupt).
            with selectors.DefaultSelector() as sel:
                sel.register(self.app.sock.sock, selectors.EVENT_READ)
                r = sel.select(self.ping_timeout)

            if r:
                if not read_callback():
                    break
            check_callback()
|
|
|
|
|
|
class SSLDispatcher:
    """
    SSLDispatcher

    Same read loop as a plain dispatcher, but before polling the socket it
    first asks the socket whether it already has pending data (``pending()``).
    """

    def __init__(self, app, ping_timeout):
        """
        Parameters
        ----------
        app:
            object exposing ``keep_running`` and ``sock.sock``; the latter
            must provide ``pending()`` (presumably an ``ssl.SSLSocket`` -
            confirm against the caller).
        ping_timeout: int or float
            maximum time, in seconds, a single poll may block.
        """
        self.app = app
        self.ping_timeout = ping_timeout

    def read(self, sock, read_callback, check_callback):
        """
        Poll the socket and dispatch callbacks until told to stop.

        Parameters
        ----------
        sock:
            accepted for interface compatibility; ``select()`` reads the
            socket from ``self.app.sock.sock`` instead.
        read_callback: callable
            called without arguments when data is available. A falsy return
            value ends the loop immediately.
        check_callback: callable
            called without arguments at the end of every iteration that did
            not end the loop.
        """
        while self.app.keep_running:
            r = self.select()
            if r:
                if not read_callback():
                    break
            check_callback()

    def select(self):
        """
        Wait until the application's socket has data to read.

        Returns
        -------
        ``[sock]`` when ``sock.pending()`` is truthy, the first selector key
        when the socket became readable within ``ping_timeout`` seconds, or
        None on timeout. Callers only rely on the truthiness of the result.
        """
        sock = self.app.sock.sock
        # Data reported by pending() would not necessarily wake the selector,
        # so report readiness right away instead of polling.
        if sock.pending():
            return [sock,]

        # Context manager closes the selector even if register()/select()
        # raises (e.g. KeyboardInterrupt while waiting).
        with selectors.DefaultSelector() as sel:
            sel.register(sock, selectors.EVENT_READ)
            r = sel.select(self.ping_timeout)

        if r:
            return r[0][0]
        return None
|
|
|
|
|
|
class WebSocketApp:
    """
    Higher level of APIs are provided. The interface is like JavaScript WebSocket object.
    """

    def __init__(self, url, header=None,
                 on_open=None, on_message=None, on_error=None,
                 on_close=None, on_ping=None, on_pong=None,
                 on_cont_message=None,
                 keep_running=True, get_mask_key=None, cookie=None,
                 subprotocols=None,
                 on_data=None):
        """
        WebSocketApp initialization

        Every callback receives this object as its 1st argument; the
        arguments listed below follow it.

        Parameters
        ----------
        url: str
            websocket url.
        header: list or dict
            custom header for websocket handshake.
        on_open: callable
            callable object which is called at opening websocket.
            this function has one argument. The argument is this class object.
        on_message: callable
            callable object which is called when received data.
            on_message has 2 arguments.
            The 1st argument is this class object.
            The 2nd argument is the data we get from the server; text frames
            are decoded from utf-8, other frames are passed as received.
        on_error: callable
            callable object which is called when we get error.
            on_error has 2 arguments.
            The 1st argument is this class object.
            The 2nd argument is exception object.
        on_close: callable
            callable object which is called when closed the connection.
            on_close has 3 arguments.
            The 1st argument is this class object.
            The 2nd argument is the close status code, or None.
            The 3rd argument is the close reason, or None.
        on_ping: callable
            callable object which is called when a ping frame is received.
            on_ping has 2 arguments.
            The 1st argument is this class object.
            The 2nd argument is the data of the ping frame.
        on_pong: callable
            callable object which is called when a pong frame is received.
            on_pong has 2 arguments.
            The 1st argument is this class object.
            The 2nd argument is the data of the pong frame.
        on_cont_message: callable
            callback object which is called when receive continued
            frame data.
            on_cont_message has 3 arguments.
            The 1st argument is this class object.
            The 2nd argument is utf-8 string which we get from the server.
            The 3rd argument is continue flag. if 0, the data continue
            to next frame data
        on_data: callable
            callback object which is called when a message received.
            This is called before on_message or on_cont_message,
            and then on_message or on_cont_message is called.
            on_data has 4 argument.
            The 1st argument is this class object.
            The 2nd argument is utf-8 string which we get from the server.
            The 3rd argument is data type. ABNF.OPCODE_TEXT or ABNF.OPCODE_BINARY will be came.
            The 4th argument is continue flag. if 0, the data continue
            to next frame data
        keep_running: bool
            this parameter is obsolete and ignored.
        get_mask_key: func
            a callable to produce new mask keys,
            see the WebSocket.set_mask_key's docstring for more information
        cookie: str
            cookie value.
        subprotocols: list
            array of available sub protocols. default is None.
        """
        self.url = url
        self.header = header if header is not None else []
        self.cookie = cookie

        self.on_open = on_open
        self.on_message = on_message
        self.on_data = on_data
        self.on_error = on_error
        self.on_close = on_close
        self.on_ping = on_ping
        self.on_pong = on_pong
        self.on_cont_message = on_cont_message
        # The constructor argument is ignored; run_forever() sets this flag.
        self.keep_running = False
        self.get_mask_key = get_mask_key
        self.sock = None
        self.last_ping_tm = 0
        self.last_pong_tm = 0
        self.subprotocols = subprotocols

    def send(self, data, opcode=ABNF.OPCODE_TEXT):
        """
        send message

        Parameters
        ----------
        data: str or bytes
            Message to send. If you set opcode to OPCODE_TEXT,
            data must be utf-8 string or unicode.
        opcode: int
            Operation code of data. default is OPCODE_TEXT.

        Raises
        ------
        WebSocketConnectionClosedException
            if there is no open socket or the underlying send returns 0.
        """

        if not self.sock or self.sock.send(data, opcode) == 0:
            raise WebSocketConnectionClosedException(
                "Connection is already closed.")

    def close(self, **kwargs):
        """
        Close websocket connection.

        Keyword arguments are forwarded to the underlying socket's close().
        """
        self.keep_running = False
        if self.sock:
            self.sock.close(**kwargs)
            self.sock = None

    def _send_ping(self, interval, event, payload):
        """
        Body of the ping thread: send a ping every `interval` seconds until
        `event` is set or sending a ping fails.
        """
        while not event.wait(interval):
            # Recorded on every tick, before the ping is attempted.
            self.last_ping_tm = time.time()
            if self.sock:
                try:
                    self.sock.ping(payload)
                except Exception as ex:
                    _logging.warning("send_ping routine terminated: {}".format(ex))
                    break

    def run_forever(self, sockopt=None, sslopt=None,
                    ping_interval=0, ping_timeout=None,
                    ping_payload="",
                    http_proxy_host=None, http_proxy_port=None,
                    http_no_proxy=None, http_proxy_auth=None,
                    skip_utf8_validation=False,
                    host=None, origin=None, dispatcher=None,
                    suppress_origin=False, proxy_type=None):
        """
        Run event loop for WebSocket framework.

        This loop is an infinite loop and is alive while websocket is available.

        Parameters
        ----------
        sockopt: tuple
            values for socket.setsockopt.
            sockopt must be tuple
            and each element is argument of sock.setsockopt.
        sslopt: dict
            optional dict object for ssl socket option.
        ping_interval: int or float
            automatically send "ping" command
            every specified period (in seconds)
            if set to 0, not send automatically.
        ping_timeout: int or float
            timeout (in seconds) if the pong message is not received.
        ping_payload: str
            payload message to send with each ping.
        http_proxy_host: str
            http proxy host name.
        http_proxy_port: int
            http proxy port. If not set, set to 80.
        http_no_proxy: list
            host names, which doesn't use proxy.
        http_proxy_auth:
            passed through unchanged to WebSocket.connect().
        skip_utf8_validation: bool
            skip utf8 validation.
        host: str
            update host header.
        origin: str
            update origin header.
        dispatcher: object
            customize reading data from socket. Must provide
            read(sock, read_callback, check_callback).
        suppress_origin: bool
            suppress outputting origin header.
        proxy_type:
            passed through unchanged to WebSocket.connect().

        Returns
        -------
        teardown: bool
            False if the dispatcher returned without an exception or if
            KeyboardInterrupt was caught, True if other exception was raised
            during a loop

        Raises
        ------
        WebSocketException
            if the ping arguments are inconsistent or a socket is already open.
        SystemExit
            re-raised after on_error has been notified.
        """

        if ping_timeout is not None and ping_timeout <= 0:
            raise WebSocketException("Ensure ping_timeout > 0")
        if ping_interval is not None and ping_interval < 0:
            raise WebSocketException("Ensure ping_interval >= 0")
        if ping_timeout and ping_interval and ping_interval <= ping_timeout:
            raise WebSocketException("Ensure ping_interval > ping_timeout")
        if not sockopt:
            sockopt = []
        if not sslopt:
            sslopt = {}
        if self.sock:
            raise WebSocketException("socket is already opened")
        thread = None
        self.keep_running = True
        self.last_ping_tm = 0
        self.last_pong_tm = 0

        def teardown(close_frame=None):
            """
            Tears down the connection.

            If close_frame is set, we will invoke the on_close handler with the
            statusCode and reason from there.
            """

            # `event` only exists when the ping thread was started, which is
            # exactly when `thread` is set.
            if thread and thread.is_alive():
                event.set()
                thread.join()
            self.keep_running = False
            if self.sock:
                self.sock.close()
            close_status_code, close_reason = self._get_close_args(close_frame)
            self._callback(self.on_close, close_status_code, close_reason)
            self.sock = None

        try:
            self.sock = WebSocket(
                self.get_mask_key, sockopt=sockopt, sslopt=sslopt,
                fire_cont_frame=self.on_cont_message is not None,
                skip_utf8_validation=skip_utf8_validation,
                enable_multithread=bool(ping_interval))
            self.sock.settimeout(getdefaulttimeout())
            self.sock.connect(
                self.url, header=self.header, cookie=self.cookie,
                http_proxy_host=http_proxy_host,
                http_proxy_port=http_proxy_port, http_no_proxy=http_no_proxy,
                http_proxy_auth=http_proxy_auth, subprotocols=self.subprotocols,
                host=host, origin=origin, suppress_origin=suppress_origin,
                proxy_type=proxy_type)
            if not dispatcher:
                dispatcher = self.create_dispatcher(ping_timeout)

            self._callback(self.on_open)

            if ping_interval:
                event = threading.Event()
                thread = threading.Thread(
                    target=self._send_ping, args=(ping_interval, event, ping_payload))
                thread.daemon = True
                thread.start()

            def read():
                """Handle one incoming frame; a falsy result stops the dispatcher."""
                if not self.keep_running:
                    return teardown()

                op_code, frame = self.sock.recv_data_frame(True)
                if op_code == ABNF.OPCODE_CLOSE:
                    return teardown(frame)
                elif op_code == ABNF.OPCODE_PING:
                    self._callback(self.on_ping, frame.data)
                elif op_code == ABNF.OPCODE_PONG:
                    self.last_pong_tm = time.time()
                    self._callback(self.on_pong, frame.data)
                elif op_code == ABNF.OPCODE_CONT and self.on_cont_message:
                    self._callback(self.on_data, frame.data,
                                   frame.opcode, frame.fin)
                    self._callback(self.on_cont_message,
                                   frame.data, frame.fin)
                else:
                    data = frame.data
                    if op_code == ABNF.OPCODE_TEXT:
                        data = data.decode("utf-8")
                    self._callback(self.on_data, data, frame.opcode, True)
                    self._callback(self.on_message, data)

                return True

            def check():
                """Raise WebSocketTimeoutException when the last ping went unanswered."""
                if ping_timeout:
                    has_timeout_expired = time.time() - self.last_ping_tm > ping_timeout
                    has_pong_not_arrived_after_last_ping = self.last_pong_tm - self.last_ping_tm < 0
                    has_pong_arrived_too_late = self.last_pong_tm - self.last_ping_tm > ping_timeout

                    # last_ping_tm == 0 means no ping has been sent yet.
                    if (self.last_ping_tm and
                            has_timeout_expired and
                            (has_pong_not_arrived_after_last_ping or has_pong_arrived_too_late)):
                        raise WebSocketTimeoutException("ping/pong timed out")
                return True

            dispatcher.read(self.sock.sock, read, check)
            # Documented return type is bool; the dispatcher returning without
            # an exception is reported as False. No teardown here: a
            # caller-supplied dispatcher may return before the connection ends.
            return False
        except (Exception, KeyboardInterrupt, SystemExit) as e:
            self._callback(self.on_error, e)
            if isinstance(e, SystemExit):
                # propagate SystemExit further
                raise
            teardown()
            return not isinstance(e, KeyboardInterrupt)

    def create_dispatcher(self, ping_timeout):
        """
        Build the default dispatcher for the current socket, polling with
        `ping_timeout` seconds (10 when no ping timeout is configured).
        """
        timeout = ping_timeout or 10
        if self.sock.is_ssl():
            return SSLDispatcher(self, timeout)

        return Dispatcher(self, timeout)

    def _get_close_args(self, close_frame):
        """
        _get_close_args extracts the close code and reason from the close body
        if it exists (RFC6455 says WebSocket Connection Close Code is optional)

        Returns
        -------
        list
            [close_status_code, reason], or [None, None] when there is no
            on_close handler, no close frame, or the frame carries fewer than
            two bytes of data.
        """
        # Need to catch the case where close_frame is None
        # Otherwise the following if statement causes an error
        if not self.on_close or not close_frame:
            return [None, None]

        # Extract close frame status code
        if close_frame.data and len(close_frame.data) >= 2:
            # First two bytes: status code, most significant byte first.
            close_status_code = 256 * close_frame.data[0] + close_frame.data[1]
            reason = close_frame.data[2:].decode('utf-8')
            return [close_status_code, reason]
        else:
            # Most likely reached this because len(close_frame_data.data) < 2
            return [None, None]

    def _callback(self, callback, *args):
        """
        Invoke `callback(self, *args)` if a callback is set.

        Exceptions raised by the callback are logged and not propagated.
        """
        if callback:
            try:
                callback(self, *args)

            except Exception as e:
                _logging.error("error from callback {}: {}".format(callback, e))
                if _logging.isEnabledForDebug():
                    _, _, tb = sys.exc_info()
                    traceback.print_tb(tb)
|