from __future__ import print_function, absolute_import, division, unicode_literals
_b = '\u2028\u2029'
_a = '\r\n\x85'
_Z = 'while scanning a quoted scalar'
_Y = '0123456789ABCDEFabcdef'
_X = ' \r\n\x85\u2028\u2029'
_W = 'expected a comment or a line break, but found %r'
_V = 'directive'
_U = '\ufeff'
_T = "could not find expected ':'"
_S = 'while scanning a simple key'
_R = "expected ' ', but found %r"
_Q = 'while scanning a %s'
_P = '\r\n\x85\u2028\u2029'
_O = 'while scanning a block scalar'
_N = 'expected alphabetic or numeric character, but found %r'
_M = '...'
_L = '---'
_K = ' \t'
_J = '\x00 \r\n\x85\u2028\u2029'
_I = '\x00'
_H = '!'
_G = 'while scanning a directive'
_F = '#'
_E = '\n'
_D = ' '
_C = None
_B = False
_A = True
from .error import MarkedYAMLError
from .tokens import *
from .compat import utf8, unichr, PY3, check_anchorname_char, nprint
if _B: from typing import Any, Dict, Optional, List, Union, Text; from .compat import VersionType
__all__ = ['Scanner', 'RoundTripScanner', 'ScannerError']
_THE_END = '\n\x00\r\x85\u2028\u2029'
_THE_END_SPACE_TAB = ' \n\x00\t\r\x85\u2028\u2029'
_SPACE_TAB = _K
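# The single-letter names above are shared constants used throughout the
# scanner: _A/_B/_C are True/False/None; the rest are indicator characters,
# character classes for line breaks and end of input, and error-message
# templates.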
class ScannerError(MarkedYAMLError): pass
class SimpleKey:
    def __init__(self, token_number, required, index, line, column, mark): self.token_number = token_number; self.required = required; self.index = index; self.line = line; self.column = column; self.mark = mark
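# Scanner produces the token stream consumed by the parser.  It tracks the
# current block-indentation stack, the flow-collection nesting (flow_context),
# and the candidate "simple keys" so that KEY tokens can be inserted
# retroactively once a ':' is seen.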
class Scanner:
    def __init__(self, loader=_C):
        self.loader = loader
        if self.loader is not _C and getattr(self.loader, '_scanner', _C) is _C: self.loader._scanner = self
        self.reset_scanner(); self.first_time = _B; self.yaml_version = _C
    @property
    def flow_level(self): return len(self.flow_context)
    def reset_scanner(self): self.done = _B; self.flow_context = []; self.tokens = []; self.fetch_stream_start(); self.tokens_taken = 0; self.indent = -1; self.indents = []; self.allow_simple_key = _A; self.possible_simple_keys = {}
    @property
    def reader(self):
        try: return self._scanner_reader
        except AttributeError:
            if hasattr(self.loader, 'typ'): self._scanner_reader = self.loader.reader
            else: self._scanner_reader = self.loader._reader
            return self._scanner_reader
    @property
    def scanner_processing_version(self):
        if hasattr(self.loader, 'typ'): return self.loader.resolver.processing_version
        return self.loader.processing_version
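    # Public token API used by the parser: check_token()/peek_token()/get_token()
    # scan only as far ahead as needed, pulling in more tokens on demand via
    # fetch_more_tokens().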
    def check_token(self, *choices):
        while self.need_more_tokens(): self.fetch_more_tokens()
        if bool(self.tokens):
            if not choices: return _A
            for choice in choices:
                if isinstance(self.tokens[0], choice): return _A
        return _B
    def peek_token(self):
        while self.need_more_tokens(): self.fetch_more_tokens()
        if bool(self.tokens): return self.tokens[0]
    def get_token(self):
        while self.need_more_tokens(): self.fetch_more_tokens()
        if bool(self.tokens): self.tokens_taken += 1; return self.tokens.pop(0)
    def need_more_tokens(self):
        if self.done: return _B
        if not self.tokens: return _A
        self.stale_possible_simple_keys()
        if self.next_possible_simple_key() == self.tokens_taken: return _A
        return _B
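    # Fetchers: each fetch_* method recognizes one construct at the current
    # reader position and appends the corresponding token(s) to self.tokens.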
    def fetch_comment(self, comment): raise NotImplementedError
    def fetch_more_tokens(self):
        comment = self.scan_to_next_token()
        if comment is not _C: return self.fetch_comment(comment)
        self.stale_possible_simple_keys(); self.unwind_indent(self.reader.column); ch = self.reader.peek()
        if ch == _I: return self.fetch_stream_end()
        if ch == '%' and self.check_directive(): return self.fetch_directive()
        if ch == '-' and self.check_document_start(): return self.fetch_document_start()
        if ch == '.' and self.check_document_end(): return self.fetch_document_end()
        if ch == '[': return self.fetch_flow_sequence_start()
        if ch == '{': return self.fetch_flow_mapping_start()
        if ch == ']': return self.fetch_flow_sequence_end()
        if ch == '}': return self.fetch_flow_mapping_end()
        if ch == ',': return self.fetch_flow_entry()
        if ch == '-' and self.check_block_entry(): return self.fetch_block_entry()
        if ch == '?' and self.check_key(): return self.fetch_key()
        if ch == ':' and self.check_value(): return self.fetch_value()
        if ch == '*': return self.fetch_alias()
        if ch == '&': return self.fetch_anchor()
        if ch == _H: return self.fetch_tag()
        if ch == '|' and not self.flow_level: return self.fetch_literal()
        if ch == '>' and not self.flow_level: return self.fetch_folded()
        if ch == "'": return self.fetch_single()
        if ch == '"': return self.fetch_double()
        if self.check_plain(): return self.fetch_plain()
        raise ScannerError('while scanning for the next token', _C, 'found character %r that cannot start any token' % utf8(ch), self.reader.get_mark())
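    # Simple keys: a key not introduced by '?'.  A candidate is tracked per
    # flow level and must stay on the same line and within 1024 characters of
    # the ':' that follows it; stale candidates are discarded.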
    def next_possible_simple_key(self):
        min_token_number = _C
        for level in self.possible_simple_keys:
            key = self.possible_simple_keys[level]
            if min_token_number is _C or key.token_number < min_token_number: min_token_number = key.token_number
        return min_token_number
    def stale_possible_simple_keys(self):
        for level in list(self.possible_simple_keys):
            key = self.possible_simple_keys[level]
            if key.line != self.reader.line or self.reader.index - key.index > 1024:
                if key.required: raise ScannerError(_S, key.mark, _T, self.reader.get_mark())
                del self.possible_simple_keys[level]
    def save_possible_simple_key(self):
        required = not self.flow_level and self.indent == self.reader.column
        if self.allow_simple_key: self.remove_possible_simple_key(); token_number = self.tokens_taken + len(self.tokens); key = SimpleKey(token_number, required, self.reader.index, self.reader.line, self.reader.column, self.reader.get_mark()); self.possible_simple_keys[self.flow_level] = key
    def remove_possible_simple_key(self):
        if self.flow_level in self.possible_simple_keys:
            key = self.possible_simple_keys[self.flow_level]
            if key.required: raise ScannerError(_S, key.mark, _T, self.reader.get_mark())
            del self.possible_simple_keys[self.flow_level]
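    # Block indentation handling: unwind_indent() emits BLOCK-END tokens while
    # the column decreases; add_indent() pushes a new indentation level.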
    def unwind_indent(self, column):
        if bool(self.flow_level): return
        while self.indent > column: mark = self.reader.get_mark(); self.indent = self.indents.pop(); self.tokens.append(BlockEndToken(mark, mark))
    def add_indent(self, column):
        if self.indent < column: self.indents.append(self.indent); self.indent = column; return _A
        return _B
    def fetch_stream_start(self): mark = self.reader.get_mark(); self.tokens.append(StreamStartToken(mark, mark, encoding=self.reader.encoding))
    def fetch_stream_end(self): self.unwind_indent(-1); self.remove_possible_simple_key(); self.allow_simple_key = _B; self.possible_simple_keys = {}; mark = self.reader.get_mark(); self.tokens.append(StreamEndToken(mark, mark)); self.done = _A
    def fetch_directive(self): self.unwind_indent(-1); self.remove_possible_simple_key(); self.allow_simple_key = _B; self.tokens.append(self.scan_directive())
    def fetch_document_start(self): self.fetch_document_indicator(DocumentStartToken)
    def fetch_document_end(self): self.fetch_document_indicator(DocumentEndToken)
    def fetch_document_indicator(self, TokenClass): self.unwind_indent(-1); self.remove_possible_simple_key(); self.allow_simple_key = _B; start_mark = self.reader.get_mark(); self.reader.forward(3); end_mark = self.reader.get_mark(); self.tokens.append(TokenClass(start_mark, end_mark))
    def fetch_flow_sequence_start(self): self.fetch_flow_collection_start(FlowSequenceStartToken, to_push='[')
    def fetch_flow_mapping_start(self): self.fetch_flow_collection_start(FlowMappingStartToken, to_push='{')
    def fetch_flow_collection_start(self, TokenClass, to_push): self.save_possible_simple_key(); self.flow_context.append(to_push); self.allow_simple_key = _A; start_mark = self.reader.get_mark(); self.reader.forward(); end_mark = self.reader.get_mark(); self.tokens.append(TokenClass(start_mark, end_mark))
    def fetch_flow_sequence_end(self): self.fetch_flow_collection_end(FlowSequenceEndToken)
    def fetch_flow_mapping_end(self): self.fetch_flow_collection_end(FlowMappingEndToken)
    def fetch_flow_collection_end(self, TokenClass):
        self.remove_possible_simple_key()
        try: popped = self.flow_context.pop()
        except IndexError: pass
        self.allow_simple_key = _B; start_mark = self.reader.get_mark(); self.reader.forward(); end_mark = self.reader.get_mark(); self.tokens.append(TokenClass(start_mark, end_mark))
    def fetch_flow_entry(self): self.allow_simple_key = _A; self.remove_possible_simple_key(); start_mark = self.reader.get_mark(); self.reader.forward(); end_mark = self.reader.get_mark(); self.tokens.append(FlowEntryToken(start_mark, end_mark))
    def fetch_block_entry(self):
        if not self.flow_level:
            if not self.allow_simple_key: raise ScannerError(_C, _C, 'sequence entries are not allowed here', self.reader.get_mark())
            if self.add_indent(self.reader.column): mark = self.reader.get_mark(); self.tokens.append(BlockSequenceStartToken(mark, mark))
        else: pass
        self.allow_simple_key = _A; self.remove_possible_simple_key(); start_mark = self.reader.get_mark(); self.reader.forward(); end_mark = self.reader.get_mark(); self.tokens.append(BlockEntryToken(start_mark, end_mark))
    def fetch_key(self):
        if not self.flow_level:
            if not self.allow_simple_key: raise ScannerError(_C, _C, 'mapping keys are not allowed here', self.reader.get_mark())
            if self.add_indent(self.reader.column): mark = self.reader.get_mark(); self.tokens.append(BlockMappingStartToken(mark, mark))
        self.allow_simple_key = not self.flow_level; self.remove_possible_simple_key(); start_mark = self.reader.get_mark(); self.reader.forward(); end_mark = self.reader.get_mark(); self.tokens.append(KeyToken(start_mark, end_mark))
    def fetch_value(self):
        if self.flow_level in self.possible_simple_keys:
            key = self.possible_simple_keys[self.flow_level]; del self.possible_simple_keys[self.flow_level]; self.tokens.insert(key.token_number - self.tokens_taken, KeyToken(key.mark, key.mark))
            if not self.flow_level:
                if self.add_indent(key.column): self.tokens.insert(key.token_number - self.tokens_taken, BlockMappingStartToken(key.mark, key.mark))
            self.allow_simple_key = _B
        else:
            if not self.flow_level:
                if not self.allow_simple_key: raise ScannerError(_C, _C, 'mapping values are not allowed here', self.reader.get_mark())
            if not self.flow_level:
                if self.add_indent(self.reader.column): mark = self.reader.get_mark(); self.tokens.append(BlockMappingStartToken(mark, mark))
            self.allow_simple_key = not self.flow_level; self.remove_possible_simple_key()
        start_mark = self.reader.get_mark(); self.reader.forward(); end_mark = self.reader.get_mark(); self.tokens.append(ValueToken(start_mark, end_mark))
    def fetch_alias(self): self.save_possible_simple_key(); self.allow_simple_key = _B; self.tokens.append(self.scan_anchor(AliasToken))
    def fetch_anchor(self): self.save_possible_simple_key(); self.allow_simple_key = _B; self.tokens.append(self.scan_anchor(AnchorToken))
    def fetch_tag(self): self.save_possible_simple_key(); self.allow_simple_key = _B; self.tokens.append(self.scan_tag())
    def fetch_literal(self): self.fetch_block_scalar(style='|')
    def fetch_folded(self): self.fetch_block_scalar(style='>')
    def fetch_block_scalar(self, style): self.allow_simple_key = _A; self.remove_possible_simple_key(); self.tokens.append(self.scan_block_scalar(style))
    def fetch_single(self): self.fetch_flow_scalar(style="'")
    def fetch_double(self): self.fetch_flow_scalar(style='"')
    def fetch_flow_scalar(self, style): self.save_possible_simple_key(); self.allow_simple_key = _B; self.tokens.append(self.scan_flow_scalar(style))
    def fetch_plain(self): self.save_possible_simple_key(); self.allow_simple_key = _B; self.tokens.append(self.scan_plain())
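    # Checkers: cheap lookahead predicates that decide whether the indicator
    # character at the current position really starts the given construct.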
    def check_directive(self):
        if self.reader.column == 0: return _A
    def check_document_start(self):
        if self.reader.column == 0:
            if self.reader.prefix(3) == _L and self.reader.peek(3) in _THE_END_SPACE_TAB: return _A
    def check_document_end(self):
        if self.reader.column == 0:
            if self.reader.prefix(3) == _M and self.reader.peek(3) in _THE_END_SPACE_TAB: return _A
    def check_block_entry(self): return self.reader.peek(1) in _THE_END_SPACE_TAB
    def check_key(self):
        if bool(self.flow_level): return _A
        return self.reader.peek(1) in _THE_END_SPACE_TAB
    def check_value(self):
        if self.scanner_processing_version == (1, 1):
            if bool(self.flow_level): return _A
        elif bool(self.flow_level):
            if self.flow_context[-1] == '[':
                if self.reader.peek(1) not in _THE_END_SPACE_TAB: return _B
            elif self.tokens and isinstance(self.tokens[-1], ValueToken):
                if self.reader.peek(1) not in _THE_END_SPACE_TAB: return _B
            return _A
        return self.reader.peek(1) in _THE_END_SPACE_TAB
    def check_plain(self):
        A = '\x00 \t\r\n\x85\u2028\u2029-?:,[]{}#&*!|>\'"%@`'; srp = self.reader.peek; ch = srp()
        if self.scanner_processing_version == (1, 1): return ch not in A or srp(1) not in _THE_END_SPACE_TAB and (ch == '-' or not self.flow_level and ch in '?:')
        if ch not in A: return _A
        ch1 = srp(1)
        if ch == '-' and ch1 not in _THE_END_SPACE_TAB: return _A
        if ch == ':' and bool(self.flow_level) and ch1 not in _SPACE_TAB: return _A
        return srp(1) not in _THE_END_SPACE_TAB and (ch == '-' or not self.flow_level and ch in '?:')
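    # Scanners: scan_* methods consume a complete construct (directive, anchor,
    # tag, block/flow/plain scalar) and return the finished token.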
    def scan_to_next_token(self):
        srp = self.reader.peek; srf = self.reader.forward
        if self.reader.index == 0 and srp() == _U: srf()
        found = _B; _the_end = _THE_END
        while not found:
            while srp() == _D: srf()
            if srp() == _F:
                while srp() not in _the_end: srf()
            if self.scan_line_break():
                if not self.flow_level: self.allow_simple_key = _A
            else: found = _A
    def scan_directive(self):
        srp = self.reader.peek; srf = self.reader.forward; start_mark = self.reader.get_mark(); srf(); name = self.scan_directive_name(start_mark); value = _C
        if name == 'YAML': value = self.scan_yaml_directive_value(start_mark); end_mark = self.reader.get_mark()
        elif name == 'TAG': value = self.scan_tag_directive_value(start_mark); end_mark = self.reader.get_mark()
        else:
            end_mark = self.reader.get_mark()
            while srp() not in _THE_END: srf()
        self.scan_directive_ignored_line(start_mark); return DirectiveToken(name, value, start_mark, end_mark)
    def scan_directive_name(self, start_mark):
        length = 0; srp = self.reader.peek; ch = srp(length)
        while '0' <= ch <= '9' or 'A' <= ch <= 'Z' or 'a' <= ch <= 'z' or ch in '-_:.': length += 1; ch = srp(length)
        if not length: raise ScannerError(_G, start_mark, _N % utf8(ch), self.reader.get_mark())
        value = self.reader.prefix(length); self.reader.forward(length); ch = srp()
        if ch not in _J: raise ScannerError(_G, start_mark, _N % utf8(ch), self.reader.get_mark())
        return value
    def scan_yaml_directive_value(self, start_mark):
        srp = self.reader.peek; srf = self.reader.forward
        while srp() == _D: srf()
        major = self.scan_yaml_directive_number(start_mark)
        if srp() != '.': raise ScannerError(_G, start_mark, "expected a digit or '.', but found %r" % utf8(srp()), self.reader.get_mark())
        srf(); minor = self.scan_yaml_directive_number(start_mark)
        if srp() not in _J: raise ScannerError(_G, start_mark, "expected a digit or ' ', but found %r" % utf8(srp()), self.reader.get_mark())
        self.yaml_version = major, minor; return self.yaml_version
    def scan_yaml_directive_number(self, start_mark):
        srp = self.reader.peek; srf = self.reader.forward; ch = srp()
        if not '0' <= ch <= '9': raise ScannerError(_G, start_mark, 'expected a digit, but found %r' % utf8(ch), self.reader.get_mark())
        length = 0
        while '0' <= srp(length) <= '9': length += 1
        value = int(self.reader.prefix(length)); srf(length); return value
    def scan_tag_directive_value(self, start_mark):
        srp = self.reader.peek; srf = self.reader.forward
        while srp() == _D: srf()
        handle = self.scan_tag_directive_handle(start_mark)
        while srp() == _D: srf()
        prefix = self.scan_tag_directive_prefix(start_mark); return handle, prefix
    def scan_tag_directive_handle(self, start_mark):
        value = self.scan_tag_handle(_V, start_mark); ch = self.reader.peek()
        if ch != _D: raise ScannerError(_G, start_mark, _R % utf8(ch), self.reader.get_mark())
        return value
    def scan_tag_directive_prefix(self, start_mark):
        value = self.scan_tag_uri(_V, start_mark); ch = self.reader.peek()
        if ch not in _J: raise ScannerError(_G, start_mark, _R % utf8(ch), self.reader.get_mark())
        return value
    def scan_directive_ignored_line(self, start_mark):
        srp = self.reader.peek; srf = self.reader.forward
        while srp() == _D: srf()
        if srp() == _F:
            while srp() not in _THE_END: srf()
        ch = srp()
        if ch not in _THE_END: raise ScannerError(_G, start_mark, _W % utf8(ch), self.reader.get_mark())
        self.scan_line_break()
    def scan_anchor(self, TokenClass):
        A = 'while scanning an %s'; srp = self.reader.peek; start_mark = self.reader.get_mark(); indicator = srp()
        if indicator == '*': name = 'alias'
        else: name = 'anchor'
        self.reader.forward(); length = 0; ch = srp(length)
        while check_anchorname_char(ch): length += 1; ch = srp(length)
        if not length: raise ScannerError(A % (name,), start_mark, _N % utf8(ch), self.reader.get_mark())
        value = self.reader.prefix(length); self.reader.forward(length)
        if ch not in '\x00 \t\r\n\x85\u2028\u2029?:,[]{}%@`': raise ScannerError(A % (name,), start_mark, _N % utf8(ch), self.reader.get_mark())
        end_mark = self.reader.get_mark(); return TokenClass(value, start_mark, end_mark)
    def scan_tag(self):
        A = 'tag'; srp = self.reader.peek; start_mark = self.reader.get_mark(); ch = srp(1)
        if ch == '<':
            handle = _C; self.reader.forward(2); suffix = self.scan_tag_uri(A, start_mark)
            if srp() != '>': raise ScannerError('while parsing a tag', start_mark, "expected '>', but found %r" % utf8(srp()), self.reader.get_mark())
            self.reader.forward()
        elif ch in _THE_END_SPACE_TAB: handle = _C; suffix = _H; self.reader.forward()
        else:
            length = 1; use_handle = _B
            while ch not in _J:
                if ch == _H: use_handle = _A; break
                length += 1; ch = srp(length)
            handle = _H
            if use_handle: handle = self.scan_tag_handle(A, start_mark)
            else: handle = _H; self.reader.forward()
            suffix = self.scan_tag_uri(A, start_mark)
        ch = srp()
        if ch not in _J: raise ScannerError('while scanning a tag', start_mark, _R % utf8(ch), self.reader.get_mark())
        value = handle, suffix; end_mark = self.reader.get_mark(); return TagToken(value, start_mark, end_mark)
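    # Block scalars: the header after '|' or '>' may carry a chomping indicator
    # ('+' keep trailing breaks, '-' strip them, none means clip to one) and an
    # explicit indentation indicator in the range 1-9.  With rt=True, folded
    # scalars get '\x07' markers inserted, apparently so later round-trip
    # stages can tell where the original folds were.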
    def scan_block_scalar(self, style, rt=_B):
        srp = self.reader.peek
        if style == '>': folded = _A
        else: folded = _B
        chunks = []; start_mark = self.reader.get_mark(); self.reader.forward(); chomping, increment = self.scan_block_scalar_indicators(start_mark); block_scalar_comment = self.scan_block_scalar_ignored_line(start_mark); min_indent = self.indent + 1
        if increment is _C:
            if min_indent < 1 and (style not in '|>' or self.scanner_processing_version == (1, 1) and getattr(self.loader, 'top_level_block_style_scalar_no_indent_error_1_1', _B)): min_indent = 1
            breaks, max_indent, end_mark = self.scan_block_scalar_indentation(); indent = max(min_indent, max_indent)
        else:
            if min_indent < 1: min_indent = 1
            indent = min_indent + increment - 1; breaks, end_mark = self.scan_block_scalar_breaks(indent)
        line_break = ''
        while self.reader.column == indent and srp() != _I:
            chunks.extend(breaks); leading_non_space = srp() not in _K; length = 0
            while srp(length) not in _THE_END: length += 1
            chunks.append(self.reader.prefix(length)); self.reader.forward(length); line_break = self.scan_line_break(); breaks, end_mark = self.scan_block_scalar_breaks(indent)
            if style in '|>' and min_indent == 0:
                if self.check_document_start() or self.check_document_end(): break
            if self.reader.column == indent and srp() != _I:
                if rt and folded and line_break == _E: chunks.append('\x07')
                if folded and line_break == _E and leading_non_space and srp() not in _K:
                    if not breaks: chunks.append(_D)
                else: chunks.append(line_break)
            else: break
        trailing = []
        if chomping in [_C, _A]: chunks.append(line_break)
        if chomping is _A: chunks.extend(breaks)
        elif chomping in [_C, _B]: trailing.extend(breaks)
        token = ScalarToken(''.join(chunks), _B, start_mark, end_mark, style)
        if block_scalar_comment is not _C: token.add_pre_comments([block_scalar_comment])
        if len(trailing) > 0:
            comment = self.scan_to_next_token()
            while comment: trailing.append(_D * comment[1].column + comment[0]); comment = self.scan_to_next_token()
            comment_end_mark = self.reader.get_mark(); comment = CommentToken(''.join(trailing), end_mark, comment_end_mark); token.add_post_comment(comment)
        return token
    def scan_block_scalar_indicators(self, start_mark):
        B = 'expected indentation indicator in the range 1-9, but found 0'; A = '0123456789'; srp = self.reader.peek; chomping = _C; increment = _C; ch = srp()
        if ch in '+-':
            if ch == '+': chomping = _A
            else: chomping = _B
            self.reader.forward(); ch = srp()
            if ch in A:
                increment = int(ch)
                if increment == 0: raise ScannerError(_O, start_mark, B, self.reader.get_mark())
                self.reader.forward()
        elif ch in A:
            increment = int(ch)
            if increment == 0: raise ScannerError(_O, start_mark, B, self.reader.get_mark())
            self.reader.forward(); ch = srp()
            if ch in '+-':
                if ch == '+': chomping = _A
                else: chomping = _B
                self.reader.forward()
        ch = srp()
        if ch not in _J: raise ScannerError(_O, start_mark, 'expected chomping or indentation indicators, but found %r' % utf8(ch), self.reader.get_mark())
        return chomping, increment
    def scan_block_scalar_ignored_line(self, start_mark):
        srp = self.reader.peek; srf = self.reader.forward; prefix = ''; comment = _C
        while srp() == _D: prefix += srp(); srf()
        if srp() == _F:
            comment = prefix
            while srp() not in _THE_END: comment += srp(); srf()
        ch = srp()
        if ch not in _THE_END: raise ScannerError(_O, start_mark, _W % utf8(ch), self.reader.get_mark())
        self.scan_line_break(); return comment
    def scan_block_scalar_indentation(self):
        srp = self.reader.peek; srf = self.reader.forward; chunks = []; max_indent = 0; end_mark = self.reader.get_mark()
        while srp() in _X:
            if srp() != _D: chunks.append(self.scan_line_break()); end_mark = self.reader.get_mark()
            else:
                srf()
                if self.reader.column > max_indent: max_indent = self.reader.column
        return chunks, max_indent, end_mark
    def scan_block_scalar_breaks(self, indent):
        chunks = []; srp = self.reader.peek; srf = self.reader.forward; end_mark = self.reader.get_mark()
        while self.reader.column < indent and srp() == _D: srf()
        while srp() in _P:
            chunks.append(self.scan_line_break()); end_mark = self.reader.get_mark()
            while self.reader.column < indent and srp() == _D: srf()
        return chunks, end_mark
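    # Flow (quoted) scalars: single-quoted scalars only support the '' escape,
    # while double-quoted scalars use ESCAPE_REPLACEMENTS and the
    # \xXX / \uXXXX / \UXXXXXXXX escape codes.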
    def scan_flow_scalar(self, style):
        if style == '"': double = _A
        else: double = _B
        srp = self.reader.peek; chunks = []; start_mark = self.reader.get_mark(); quote = srp(); self.reader.forward(); chunks.extend(self.scan_flow_scalar_non_spaces(double, start_mark))
        while srp() != quote: chunks.extend(self.scan_flow_scalar_spaces(double, start_mark)); chunks.extend(self.scan_flow_scalar_non_spaces(double, start_mark))
        self.reader.forward(); end_mark = self.reader.get_mark(); return ScalarToken(''.join(chunks), _B, start_mark, end_mark, style)
    ESCAPE_REPLACEMENTS = {'0': _I, 'a': '\x07', 'b': '\x08', 't': '\t', '\t': '\t', 'n': _E, 'v': '\x0b', 'f': '\x0c', 'r': '\r', 'e': '\x1b', _D: _D, '"': '"', '/': '/', '\\': '\\', 'N': '\x85', '_': '\xa0', 'L': '\u2028', 'P': '\u2029'}; ESCAPE_CODES = {'x': 2, 'u': 4, 'U': 8}
    def scan_flow_scalar_non_spaces(self, double, start_mark):
        A = 'while scanning a double-quoted scalar'; chunks = []; srp = self.reader.peek; srf = self.reader.forward
        while _A:
            length = 0
            while srp(length) not in ' \n\'"\\\x00\t\r\x85\u2028\u2029': length += 1
            if length != 0: chunks.append(self.reader.prefix(length)); srf(length)
            ch = srp()
            if not double and ch == "'" and srp(1) == "'": chunks.append("'"); srf(2)
            elif double and ch == "'" or not double and ch in '"\\': chunks.append(ch); srf()
            elif double and ch == '\\':
                srf(); ch = srp()
                if ch in self.ESCAPE_REPLACEMENTS: chunks.append(self.ESCAPE_REPLACEMENTS[ch]); srf()
                elif ch in self.ESCAPE_CODES:
                    length = self.ESCAPE_CODES[ch]; srf()
                    for k in range(length):
                        if srp(k) not in _Y: raise ScannerError(A, start_mark, 'expected escape sequence of %d hexdecimal numbers, but found %r' % (length, utf8(srp(k))), self.reader.get_mark())
                    code = int(self.reader.prefix(length), 16); chunks.append(unichr(code)); srf(length)
                elif ch in '\n\r\x85\u2028\u2029': self.scan_line_break(); chunks.extend(self.scan_flow_scalar_breaks(double, start_mark))
                else: raise ScannerError(A, start_mark, 'found unknown escape character %r' % utf8(ch), self.reader.get_mark())
            else: return chunks
    def scan_flow_scalar_spaces(self, double, start_mark):
        srp = self.reader.peek; chunks = []; length = 0
        while srp(length) in _K: length += 1
        whitespaces = self.reader.prefix(length); self.reader.forward(length); ch = srp()
        if ch == _I: raise ScannerError(_Z, start_mark, 'found unexpected end of stream', self.reader.get_mark())
        elif ch in _P:
            line_break = self.scan_line_break(); breaks = self.scan_flow_scalar_breaks(double, start_mark)
            if line_break != _E: chunks.append(line_break)
            elif not breaks: chunks.append(_D)
            chunks.extend(breaks)
        else: chunks.append(whitespaces)
        return chunks
    def scan_flow_scalar_breaks(self, double, start_mark):
        chunks = []; srp = self.reader.peek; srf = self.reader.forward
        while _A:
            prefix = self.reader.prefix(3)
            if (prefix == _L or prefix == _M) and srp(3) in _THE_END_SPACE_TAB: raise ScannerError(_Z, start_mark, 'found unexpected document separator', self.reader.get_mark())
            while srp() in _K: srf()
            if srp() in _P: chunks.append(self.scan_line_break())
            else: return chunks
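    # Plain (unquoted) scalars: scanning stops at a '#' that starts a comment,
    # at ':' followed by whitespace in block context, or at flow indicators
    # inside a flow collection; trailing blank lines are kept as a post
    # comment so they can be round-tripped.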
    def scan_plain(self):
        srp = self.reader.peek; srf = self.reader.forward; chunks = []; start_mark = self.reader.get_mark(); end_mark = start_mark; indent = self.indent + 1; spaces = []
        while _A:
            length = 0
            if srp() == _F: break
            while _A:
                ch = srp(length)
                if ch == ':' and srp(length + 1) not in _THE_END_SPACE_TAB: pass
                elif ch == '?' and self.scanner_processing_version != (1, 1): pass
                elif ch in _THE_END_SPACE_TAB or not self.flow_level and ch == ':' and srp(length + 1) in _THE_END_SPACE_TAB or self.flow_level and ch in ',:?[]{}': break
                length += 1
            if self.flow_level and ch == ':' and srp(length + 1) not in '\x00 \t\r\n\x85\u2028\u2029,[]{}': srf(length); raise ScannerError('while scanning a plain scalar', start_mark, "found unexpected ':'", self.reader.get_mark(), 'Please check http://pyyaml.org/wiki/YAMLColonInFlowContext for details.')
            if length == 0: break
            self.allow_simple_key = _B; chunks.extend(spaces); chunks.append(self.reader.prefix(length)); srf(length); end_mark = self.reader.get_mark(); spaces = self.scan_plain_spaces(indent, start_mark)
            if not spaces or srp() == _F or not self.flow_level and self.reader.column < indent: break
        token = ScalarToken(''.join(chunks), _A, start_mark, end_mark)
        if spaces and spaces[0] == _E: comment = CommentToken(''.join(spaces) + _E, start_mark, end_mark); token.add_post_comment(comment)
        return token
    def scan_plain_spaces(self, indent, start_mark):
        srp = self.reader.peek; srf = self.reader.forward; chunks = []; length = 0
        while srp(length) in _D: length += 1
        whitespaces = self.reader.prefix(length); self.reader.forward(length); ch = srp()
        if ch in _P:
            line_break = self.scan_line_break(); self.allow_simple_key = _A; prefix = self.reader.prefix(3)
            if (prefix == _L or prefix == _M) and srp(3) in _THE_END_SPACE_TAB: return
            breaks = []
            while srp() in _X:
                if srp() == _D: srf()
                else:
                    breaks.append(self.scan_line_break()); prefix = self.reader.prefix(3)
                    if (prefix == _L or prefix == _M) and srp(3) in _THE_END_SPACE_TAB: return
            if line_break != _E: chunks.append(line_break)
            elif not breaks: chunks.append(_D)
            chunks.extend(breaks)
        elif whitespaces: chunks.append(whitespaces)
        return chunks
    def scan_tag_handle(self, name, start_mark):
        A = "expected '!', but found %r"; srp = self.reader.peek; ch = srp()
        if ch != _H: raise ScannerError(_Q % (name,), start_mark, A % utf8(ch), self.reader.get_mark())
        length = 1; ch = srp(length)
        if ch != _D:
            while '0' <= ch <= '9' or 'A' <= ch <= 'Z' or 'a' <= ch <= 'z' or ch in '-_': length += 1; ch = srp(length)
            if ch != _H: self.reader.forward(length); raise ScannerError(_Q % (name,), start_mark, A % utf8(ch), self.reader.get_mark())
            length += 1
        value = self.reader.prefix(length); self.reader.forward(length); return value
    def scan_tag_uri(self, name, start_mark):
        srp = self.reader.peek; chunks = []; length = 0; ch = srp(length)
        while '0' <= ch <= '9' or 'A' <= ch <= 'Z' or 'a' <= ch <= 'z' or ch in "-;/?:@&=+$,_.!~*'()[]%" or self.scanner_processing_version > (1, 1) and ch == _F:
            if ch == '%': chunks.append(self.reader.prefix(length)); self.reader.forward(length); length = 0; chunks.append(self.scan_uri_escapes(name, start_mark))
            else: length += 1
            ch = srp(length)
        if length != 0: chunks.append(self.reader.prefix(length)); self.reader.forward(length); length = 0
        if not chunks: raise ScannerError('while parsing a %s' % (name,), start_mark, 'expected URI, but found %r' % utf8(ch), self.reader.get_mark())
        return ''.join(chunks)
    def scan_uri_escapes(self, name, start_mark):
        A = 'utf-8'; srp = self.reader.peek; srf = self.reader.forward; code_bytes = []; mark = self.reader.get_mark()
        while srp() == '%':
            srf()
            for k in range(2):
                if srp(k) not in _Y: raise ScannerError(_Q % (name,), start_mark, 'expected URI escape sequence of 2 hexdecimal numbers, but found %r' % utf8(srp(k)), self.reader.get_mark())
            if PY3: code_bytes.append(int(self.reader.prefix(2), 16))
            else: code_bytes.append(chr(int(self.reader.prefix(2), 16)))
            srf(2)
        try:
            if PY3: value = bytes(code_bytes).decode(A)
            else: value = unicode(b''.join(code_bytes), A)
        except UnicodeDecodeError as exc: raise ScannerError(_Q % (name,), start_mark, str(exc), mark)
        return value
    def scan_line_break(self):
        ch = self.reader.peek()
        if ch in _a:
            if self.reader.prefix(2) == '\r\n': self.reader.forward(2)
            else: self.reader.forward()
            return _E
        elif ch in _b: self.reader.forward(); return ch
        return ''
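# RoundTripScanner extends Scanner to preserve comments and blank lines:
# instead of being discarded, they become CommentToken objects that are
# attached to neighbouring tokens as pre/post comments.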
class RoundTripScanner(Scanner):
    def check_token(self, *choices):
        while self.need_more_tokens(): self.fetch_more_tokens()
        self._gather_comments()
        if bool(self.tokens):
            if not choices: return _A
            for choice in choices:
                if isinstance(self.tokens[0], choice): return _A
        return _B
    def peek_token(self):
        while self.need_more_tokens(): self.fetch_more_tokens()
        self._gather_comments()
        if bool(self.tokens): return self.tokens[0]
    def _gather_comments(self):
        comments = []
        if not self.tokens: return comments
        if isinstance(self.tokens[0], CommentToken): comment = self.tokens.pop(0); self.tokens_taken += 1; comments.append(comment)
        while self.need_more_tokens():
            self.fetch_more_tokens()
            if not self.tokens: return comments
            if isinstance(self.tokens[0], CommentToken): self.tokens_taken += 1; comment = self.tokens.pop(0); comments.append(comment)
        if len(comments) >= 1: self.tokens[0].add_pre_comments(comments)
        if not self.done and len(self.tokens) < 2: self.fetch_more_tokens()
    def get_token(self):
        while self.need_more_tokens(): self.fetch_more_tokens()
        self._gather_comments()
        if bool(self.tokens):
            if len(self.tokens) > 1 and isinstance(self.tokens[0], (ScalarToken, ValueToken, FlowSequenceEndToken, FlowMappingEndToken)) and isinstance(self.tokens[1], CommentToken) and self.tokens[0].end_mark.line == self.tokens[1].start_mark.line:
                self.tokens_taken += 1; c = self.tokens.pop(1); self.fetch_more_tokens()
                while len(self.tokens) > 1 and isinstance(self.tokens[1], CommentToken): self.tokens_taken += 1; c1 = self.tokens.pop(1); c.value = c.value + _D * c1.start_mark.column + c1.value; self.fetch_more_tokens()
                self.tokens[0].add_post_comment(c)
            elif len(self.tokens) > 1 and isinstance(self.tokens[0], ScalarToken) and isinstance(self.tokens[1], CommentToken) and self.tokens[0].end_mark.line != self.tokens[1].start_mark.line:
                self.tokens_taken += 1; c = self.tokens.pop(1); c.value = _E * (c.start_mark.line - self.tokens[0].end_mark.line) + _D * c.start_mark.column + c.value; self.tokens[0].add_post_comment(c); self.fetch_more_tokens()
                while len(self.tokens) > 1 and isinstance(self.tokens[1], CommentToken): self.tokens_taken += 1; c1 = self.tokens.pop(1); c.value = c.value + _D * c1.start_mark.column + c1.value; self.fetch_more_tokens()
            self.tokens_taken += 1; return self.tokens.pop(0)
    def fetch_comment(self, comment):
        value, start_mark, end_mark = comment
        while value and value[-1] == _D: value = value[:-1]
        self.tokens.append(CommentToken(value, start_mark, end_mark))
    def scan_to_next_token(self):
        srp = self.reader.peek; srf = self.reader.forward
        if self.reader.index == 0 and srp() == _U: srf()
        found = _B
        while not found:
            while srp() == _D: srf()
            ch = srp()
            if ch == _F:
                start_mark = self.reader.get_mark(); comment = ch; srf()
                while ch not in _THE_END:
                    ch = srp()
                    if ch == _I: comment += _E; break
                    comment += ch; srf()
                ch = self.scan_line_break()
                while len(ch) > 0: comment += ch; ch = self.scan_line_break()
                end_mark = self.reader.get_mark()
                if not self.flow_level: self.allow_simple_key = _A
                return comment, start_mark, end_mark
            if bool(self.scan_line_break()):
                start_mark = self.reader.get_mark()
                if not self.flow_level: self.allow_simple_key = _A
                ch = srp()
                if ch == _E:
                    start_mark = self.reader.get_mark(); comment = ''
                    while ch: ch = self.scan_line_break(empty_line=_A); comment += ch
                    if srp() == _F: comment = comment.rsplit(_E, 1)[0] + _E
                    end_mark = self.reader.get_mark(); return comment, start_mark, end_mark
            else: found = _A
    def scan_line_break(self, empty_line=_B):
        ch = self.reader.peek()
        if ch in _a:
            if self.reader.prefix(2) == '\r\n': self.reader.forward(2)
            else: self.reader.forward()
            return _E
        elif ch in _b: self.reader.forward(); return ch
        elif empty_line and ch in '\t': self.reader.forward(); return ch
        return ''
    def scan_block_scalar(self, style, rt=_A): return Scanner.scan_block_scalar(self, style, rt=rt)