@ -27,31 +27,20 @@ Copyright (C) 2010 Hiroki Ohtani(liris)
import sys
sys . path [ 0 : 0 ] = [ " " ]
import os
import os . path
import socket
import six
# websocket-client
import websocket as ws
from websocket . _handshake import _create_sec_websocket_key , \
_validate as _validate_header
from websocket . _http import read_headers
from websocket . _utils import validate_utf8
from base64 import decodebytes as base64decode
if six . PY3 :
from base64 import decodebytes as base64decode
else :
from base64 import decodestring as base64decode
if sys . version_info [ 0 ] == 2 and sys . version_info [ 1 ] < 7 :
import unittest2 as unittest
else :
import unittest
import unittest
try :
import ssl
from ssl import SSLError
except ImportError :
# dummy class of SSLError for ssl none-support environment.
@ -120,7 +109,7 @@ class WebSocketTest(unittest.TestCase):
def testWSKey ( self ) :
key = _create_sec_websocket_key ( )
self . assertTrue ( key != 24 )
self . assertTrue ( six . u ( " ¥n " ) not in key )
self . assertTrue ( str ( " ¥n " ) not in key )
def testNonce ( self ) :
""" WebSocket key should be a random 16-byte nonce.
@ -158,6 +147,7 @@ class WebSocketTest(unittest.TestCase):
header = required_header . copy ( )
header [ " sec-websocket-protocol " ] = " sub1 "
self . assertEqual ( _validate_header ( header , key , [ " sub1 " , " sub2 " ] ) , ( True , " sub1 " ) )
# This case will print out a logging error using the error() function, but that is expected
self . assertEqual ( _validate_header ( header , key , [ " sub2 " , " sub3 " ] ) , ( False , None ) )
header = required_header . copy ( )
@ -165,6 +155,7 @@ class WebSocketTest(unittest.TestCase):
self . assertEqual ( _validate_header ( header , key , [ " Sub1 " , " suB2 " ] ) , ( True , " sub1 " ) )
header = required_header . copy ( )
# This case will print out a logging error using the error() function, but that is expected
self . assertEqual ( _validate_header ( header , key , [ " Sub1 " , " suB2 " ] ) , ( False , None ) )
def testReadHeader ( self ) :
@ -185,16 +176,13 @@ class WebSocketTest(unittest.TestCase):
sock . set_mask_key ( create_mask_key )
s = sock . sock = HeaderSockMock ( " data/header01.txt " )
sock . send ( " Hello " )
self . assertEqual ( s . sent [ 0 ] , six . b ( " \x81 \x85 abcd) \x07 \x0f \x08 \x0e " ) )
self . assertEqual ( s . sent [ 0 ] , b ' \x81 \x85 abcd) \x07 \x0f \x08 \x0e ' )
sock . send ( " こんにちは " )
self . assertEqual ( s . sent [ 1 ] , six . b ( " \x81 \x8f abcd \x82 \xe3 \xf0 \x87 \xe3 \xf1 \x80 \xe5 \xca \x81 \xe2 \xc5 \x82 \xe3 \xcc " ) )
sock . send ( u " こんにちは " )
self . assertEqual ( s . sent [ 1 ] , six . b ( " \x81 \x8f abcd \x82 \xe3 \xf0 \x87 \xe3 \xf1 \x80 \xe5 \xca \x81 \xe2 \xc5 \x82 \xe3 \xcc " ) )
self . assertEqual ( s . sent [ 1 ] , b ' \x81 \x8f abcd \x82 \xe3 \xf0 \x87 \xe3 \xf1 \x80 \xe5 \xca \x81 \xe2 \xc5 \x82 \xe3 \xcc ' )
# sock.send("x" * 5000)
# self.assertEqual(s.sent[1], six.b(" \x81\x8fabcd\x82\xe3\xf0\x87\xe3\xf1\x80\xe5\xca\x81\xe2\xc5\x82\xe3\xcc") )
# self.assertEqual(s.sent[1], b'\x81\x8fabcd\x82\xe3\xf0\x87\xe3\xf1\x80\xe5\xca\x81\xe2\xc5\x82\xe3\xcc")
self . assertEqual ( sock . send_binary ( b ' 1111111111101 ' ) , 19 )
@ -202,12 +190,12 @@ class WebSocketTest(unittest.TestCase):
# TODO: add longer frame data
sock = ws . WebSocket ( )
s = sock . sock = SockMock ( )
something = six . b ( " \x81 \x8f abcd\x82 \xe3 \xf0 \x87 \xe3 \xf1 \x80 \xe5 \xca \x81 \xe2 \xc5 \x82 \xe3 \xcc " )
something = b ' \x81 \x8f abcd\x82 \xe3 \xf0 \x87 \xe3 \xf1 \x80 \xe5 \xca \x81 \xe2 \xc5 \x82 \xe3 \xcc '
s . add_packet ( something )
data = sock . recv ( )
self . assertEqual ( data , " こんにちは " )
s . add_packet ( six . b ( " \x81 \x85 abcd) \x07 \x0f \x08 \x0e " ) )
s . add_packet ( b ' \x81 \x85 abcd) \x07 \x0f \x08 \x0e ' )
data = sock . recv ( )
self . assertEqual ( data , " Hello " )
@ -227,32 +215,28 @@ class WebSocketTest(unittest.TestCase):
def testInternalRecvStrict ( self ) :
sock = ws . WebSocket ( )
s = sock . sock = SockMock ( )
s . add_packet ( six . b ( " foo " ) )
s . add_packet ( b ' foo ' )
s . add_packet ( socket . timeout ( ) )
s . add_packet ( six . b ( " bar " ) )
s . add_packet ( b ' bar ' )
# s.add_packet(SSLError("The read operation timed out"))
s . add_packet ( six . b ( " baz " ) )
s . add_packet ( b ' baz ' )
with self . assertRaises ( ws . WebSocketTimeoutException ) :
sock . frame_buffer . recv_strict ( 9 )
# if six.PY2:
# with self.assertRaises(ws.WebSocketTimeoutException):
# data = sock._recv_strict(9)
# else:
# with self.assertRaises(SSLError):
# data = sock._recv_strict(9)
data = sock . frame_buffer . recv_strict ( 9 )
self . assertEqual ( data , six . b ( " foobarbaz " ) )
self . assertEqual ( data , b ' foobarbaz ' )
with self . assertRaises ( ws . WebSocketConnectionClosedException ) :
sock . frame_buffer . recv_strict ( 1 )
def testRecvTimeout ( self ) :
sock = ws . WebSocket ( )
s = sock . sock = SockMock ( )
s . add_packet ( six . b ( " \x81 " ) )
s . add_packet ( b ' \x81 ' )
s . add_packet ( socket . timeout ( ) )
s . add_packet ( six . b ( " \x8d abcd \x29 \x07 \x0f \x08 \x0e " ) )
s . add_packet ( b ' \x8d abcd \x29 \x07 \x0f \x08 \x0e ' )
s . add_packet ( socket . timeout ( ) )
s . add_packet ( six . b ( " \x4e \x43 \x33 \x0e \x10 \x0f \x00 \x40 " ) )
s . add_packet ( b ' \x4e \x43 \x33 \x0e \x10 \x0f \x00 \x40 ' )
with self . assertRaises ( ws . WebSocketTimeoutException ) :
sock . recv ( )
with self . assertRaises ( ws . WebSocketTimeoutException ) :
@ -266,9 +250,9 @@ class WebSocketTest(unittest.TestCase):
sock = ws . WebSocket ( )
s = sock . sock = SockMock ( )
# OPCODE=TEXT, FIN=0, MSG="Brevity is "
s . add_packet ( six . b ( " \x01 \x8b abcd# \x10 \x06 \x12 \x08 \x16 \x1a D \x08 \x11 C " ) )
s . add_packet ( b ' \x01 \x8b abcd# \x10 \x06 \x12 \x08 \x16 \x1a D \x08 \x11 C ' )
# OPCODE=CONT, FIN=1, MSG="the soul of wit"
s . add_packet ( six . b ( " \x80 \x8f abcd \x15 \n \x06 D \x12 \r \x16 \x08 A \r \x05 D \x16 \x0b \x17 " ) )
s . add_packet ( b ' \x80 \x8f abcd \x15 \n \x06 D \x12 \r \x16 \x08 A \r \x05 D \x16 \x0b \x17 ' )
data = sock . recv ( )
self . assertEqual ( data , " Brevity is the soul of wit " )
with self . assertRaises ( ws . WebSocketConnectionClosedException ) :
@ -278,21 +262,21 @@ class WebSocketTest(unittest.TestCase):
sock = ws . WebSocket ( fire_cont_frame = True )
s = sock . sock = SockMock ( )
# OPCODE=TEXT, FIN=0, MSG="Brevity is "
s . add_packet ( six . b ( " \x01 \x8b abcd# \x10 \x06 \x12 \x08 \x16 \x1a D \x08 \x11 C " ) )
s . add_packet ( b ' \x01 \x8b abcd# \x10 \x06 \x12 \x08 \x16 \x1a D \x08 \x11 C ' )
# OPCODE=CONT, FIN=0, MSG="Brevity is "
s . add_packet ( six . b ( " \x00 \x8b abcd# \x10 \x06 \x12 \x08 \x16 \x1a D \x08 \x11 C " ) )
s . add_packet ( b ' \x00 \x8b abcd# \x10 \x06 \x12 \x08 \x16 \x1a D \x08 \x11 C ' )
# OPCODE=CONT, FIN=1, MSG="the soul of wit"
s . add_packet ( six . b ( " \x80 \x8f abcd \x15 \n \x06 D \x12 \r \x16 \x08 A \r \x05 D \x16 \x0b \x17 " ) )
s . add_packet ( b ' \x80 \x8f abcd \x15 \n \x06 D \x12 \r \x16 \x08 A \r \x05 D \x16 \x0b \x17 ' )
_ , data = sock . recv_data ( )
self . assertEqual ( data , six . b ( " Brevity is " ) )
self . assertEqual ( data , b ' Brevity is ' )
_ , data = sock . recv_data ( )
self . assertEqual ( data , six . b ( " Brevity is " ) )
self . assertEqual ( data , b ' Brevity is ' )
_ , data = sock . recv_data ( )
self . assertEqual ( data , six . b ( " the soul of wit " ) )
self . assertEqual ( data , b ' the soul of wit ' )
# OPCODE=CONT, FIN=0, MSG="Brevity is "
s . add_packet ( six . b ( " \x80 \x8b abcd# \x10 \x06 \x12 \x08 \x16 \x1a D \x08 \x11 C " ) )
s . add_packet ( b ' \x80 \x8b abcd# \x10 \x06 \x12 \x08 \x16 \x1a D \x08 \x11 C ' )
with self . assertRaises ( ws . WebSocketException ) :
sock . recv_data ( )
@ -302,15 +286,13 @@ class WebSocketTest(unittest.TestCase):
def testClose ( self ) :
sock = ws . WebSocket ( )
sock . sock = SockMock ( )
sock . connected = True
sock . close ( )
self . assertEqual ( sock . connected , False )
self . assertRaises ( ws . _exceptions . WebSocketConnectionClosedException , sock . close )
sock = ws . WebSocket ( )
s = sock . sock = SockMock ( )
sock . connected = True
s . add_packet ( six . b ( ' \x88 \x80 \x17 \x98 p \x84 ' ) )
s . add_packet ( b ' \x88 \x80 \x17 \x98 p \x84 ' )
sock . recv ( )
self . assertEqual ( sock . connected , False )
@ -318,20 +300,18 @@ class WebSocketTest(unittest.TestCase):
sock = ws . WebSocket ( )
s = sock . sock = SockMock ( )
# OPCODE=CONT, FIN=1, MSG="the soul of wit"
s . add_packet ( six . b ( " \x80 \x8f abcd \x15 \n \x06 D \x12 \r \x16 \x08 A \r \x05 D \x16 \x0b \x17 " ) )
s . add_packet ( b ' \x80 \x8f abcd \x15 \n \x06 D \x12 \r \x16 \x08 A \r \x05 D \x16 \x0b \x17 ' )
self . assertRaises ( ws . WebSocketException , sock . recv )
def testRecvWithProlongedFragmentation ( self ) :
sock = ws . WebSocket ( )
s = sock . sock = SockMock ( )
# OPCODE=TEXT, FIN=0, MSG="Once more unto the breach, "
s . add_packet ( six . b ( " \x01 \x9b abcd. \x0c \x00 \x01 A \x0f \x0c \x16 \x04 B \x16 \n \x15 "
" \r C \x10 \t \x07 C \x06 \x13 \x07 \x02 \x07 \t NC " ) )
s . add_packet ( b ' \x01 \x9b abcd. \x0c \x00 \x01 A \x0f \x0c \x16 \x04 B \x16 \n \x15 \r C \x10 \t \x07 C \x06 \x13 \x07 \x02 \x07 \t NC ' )
# OPCODE=CONT, FIN=0, MSG="dear friends, "
s . add_packet ( six . b ( " \x00 \x8e abcd \x05 \x07 \x02 \x16 A \x04 \x11 \r \x04 \x0c \x07 "
" \x17 MB " ) )
s . add_packet ( b ' \x00 \x8e abcd \x05 \x07 \x02 \x16 A \x04 \x11 \r \x04 \x0c \x07 \x17 MB ' )
# OPCODE=CONT, FIN=1, MSG="once more"
s . add_packet ( six . b ( " \x80 \x89 abcd \x0e \x0c \x00 \x01 A \x0f \x0c \x16 \x04 " ) )
s . add_packet ( b ' \x80 \x89 abcd \x0e \x0c \x00 \x01 A \x0f \x0c \x16 \x04 ' )
data = sock . recv ( )
self . assertEqual (
data ,
@ -344,19 +324,18 @@ class WebSocketTest(unittest.TestCase):
sock . set_mask_key ( create_mask_key )
s = sock . sock = SockMock ( )
# OPCODE=TEXT, FIN=0, MSG="Too much "
s . add_packet ( six . b ( " \x01 \x89 abcd5 \r \x0c D \x0c \x17 \x00 \x0c A " ) )
s . add_packet ( b ' \x01 \x89 abcd5 \r \x0c D \x0c \x17 \x00 \x0c A ' )
# OPCODE=PING, FIN=1, MSG="Please PONG this"
s . add_packet ( six . b ( " \x89 \x90 abcd1 \x0e \x06 \x05 \x12 \x07 C4.,$D \x15 \n \n \x17 " ) )
s . add_packet ( b ' \x89 \x90 abcd1 \x0e \x06 \x05 \x12 \x07 C4.,$D \x15 \n \n \x17 ' )
# OPCODE=CONT, FIN=1, MSG="of a good thing"
s . add_packet ( six . b ( " \x80 \x8f abcd \x0e \x04 C \x05 A \x05 \x0c \x0b \x05 B \x17 \x0c "
" \x08 \x0c \x04 " ) )
s . add_packet ( b ' \x80 \x8f abcd \x0e \x04 C \x05 A \x05 \x0c \x0b \x05 B \x17 \x0c \x08 \x0c \x04 ' )
data = sock . recv ( )
self . assertEqual ( data , " Too much of a good thing " )
with self . assertRaises ( ws . WebSocketConnectionClosedException ) :
sock . recv ( )
self . assertEqual (
s . sent [ 0 ] ,
six . b ( " \x8a \x90 abcd1 \x0e \x06 \x05 \x12 \x07 C4.,$D \x15 \n \n \x17 " ) )
b ' \x8a \x90 abcd1 \x0e \x06 \x05 \x12 \x07 C4.,$D \x15 \n \n \x17 ' )
@unittest.skipUnless ( TEST_WITH_INTERNET , " Internet-requiring tests are disabled " )
def testWebSocket ( self ) :
@ -366,7 +345,7 @@ class WebSocketTest(unittest.TestCase):
result = s . recv ( )
self . assertEqual ( result , " Hello, World " )
s . send ( u " こにゃにゃちは、世界 " )
s . send ( " こにゃにゃちは、世界 " )
result = s . recv ( )
self . assertEqual ( result , " こにゃにゃちは、世界 " )
self . assertRaises ( ValueError , s . send_close , - 1 , " " )
@ -388,7 +367,10 @@ class WebSocketTest(unittest.TestCase):
self . assertTrue ( isinstance ( s . sock , ssl . SSLSocket ) )
self . assertEqual ( s . getstatus ( ) , 101 )
self . assertNotEqual ( s . getheaders ( ) , None )
s . close ( )
s . settimeout ( 10 )
self . assertEqual ( s . gettimeout ( ) , 10 )
self . assertEqual ( s . getsubprotocol ( ) , None )
s . abort ( )
@unittest.skipUnless ( TEST_WITH_INTERNET , " Internet-requiring tests are disabled " )
def testWebSocketWithCustomHeader ( self ) :
@ -421,13 +403,50 @@ class SockOptTest(unittest.TestCase):
class UtilsTest ( unittest . TestCase ) :
def testUtf8Validator ( self ) :
state = validate_utf8 ( six . b ( ' \xf0 \x90 \x80 \x80 ' ) )
state = validate_utf8 ( b ' \xf0 \x90 \x80 \x80 ' )
self . assertEqual ( state , True )
state = validate_utf8 ( six . b ( ' \xce \xba \xe1 \xbd \xb9 \xcf \x83 \xce \xbc \xce \xb5 \xed \xa0 \x80 edited ' ) )
state = validate_utf8 ( b ' \xce \xba \xe1 \xbd \xb9 \xcf \x83 \xce \xbc \xce \xb5 \xed \xa0 \x80 edited ' )
self . assertEqual ( state , False )
state = validate_utf8 ( six . b ( ' ' ) )
state = validate_utf8 ( b ' ' )
self . assertEqual ( state , True )
class HandshakeTest ( unittest . TestCase ) :
@unittest.skipUnless ( TEST_WITH_INTERNET , " Internet-requiring tests are disabled " )
def test_http_SSL ( self ) :
websock1 = ws . WebSocket ( sslopt = { " cert_chain " : ssl . get_default_verify_paths ( ) . capath } )
self . assertRaises ( ValueError ,
websock1 . connect , " wss://api.bitfinex.com/ws/2 " )
websock2 = ws . WebSocket ( sslopt = { " certfile " : " myNonexistentCertFile " } )
self . assertRaises ( FileNotFoundError ,
websock2 . connect , " wss://api.bitfinex.com/ws/2 " )
@unittest.skipUnless ( TEST_WITH_INTERNET , " Internet-requiring tests are disabled " )
def testManualHeaders ( self ) :
websock3 = ws . WebSocket ( sslopt = { " cert_reqs " : ssl . CERT_NONE ,
" ca_certs " : ssl . get_default_verify_paths ( ) . capath ,
" ca_cert_path " : ssl . get_default_verify_paths ( ) . openssl_cafile } )
self . assertRaises ( ws . _exceptions . WebSocketBadStatusException ,
websock3 . connect , " wss://api.bitfinex.com/ws/2 " , cookie = " chocolate " ,
origin = " testing_websockets.com " ,
host = " echo.websocket.org/websocket-client-test " ,
subprotocols = [ " testproto " ] ,
connection = " Upgrade " ,
header = { " CustomHeader1 " : " 123 " ,
" Cookie " : " TestValue " ,
" Sec-WebSocket-Key " : " k9kFAUWNAMmf5OEMfTlOEA== " ,
" Sec-WebSocket-Protocol " : " newprotocol " } )
def testIPv6 ( self ) :
websock2 = ws . WebSocket ( )
self . assertRaises ( ValueError , websock2 . connect , " 2001:4860:4860::8888 " )
def testBadURLs ( self ) :
websock3 = ws . WebSocket ( )
self . assertRaises ( ValueError , websock3 . connect , " ws//example.com " )
self . assertRaises ( ws . WebSocketAddressException , websock3 . connect , " ws://example " )
self . assertRaises ( ValueError , websock3 . connect , " example.com " )
if __name__ == " __main__ " :
unittest . main ( )