|
|
from contextlib import suppress
|
|
|
import re
|
|
|
from typing import Iterable, NamedTuple
|
|
|
|
|
|
from .color import Color
|
|
|
from .style import Style
|
|
|
from .text import Text
|
|
|
|
|
|
# Matches the two kinds of escape sequence that are interpreted (rather than
# discarded) when decoding:
#
#   group 1 - the parameter string of an SGR sequence: ESC [ <params> m
#   group 2 - the payload of an OSC sequence: ESC ] <payload> <terminator>
#
# Exactly one group participates in any match; the other is None.
#
# The SGR parameters are restricted to CSI parameter bytes (0x30-0x3F, i.e.
# "0-9:;<=>?"). A lazy "anything up to the next m" would start at *any* CSI
# introducer, so e.g. "ESC[2Khello mom" would be read as an SGR sequence with
# parameters "2Khello " and the text would be lost.
#
# An OSC payload may be terminated either by ST (ESC \) or by BEL (0x07); both
# forms are emitted by real programs.
re_ansi = re.compile(r"(?:\x1b\[([0-?]*)m)|(?:\x1b\](.*?)(?:\x1b\\|\x07))")

# Matches any other escape sequence so that it can be stripped from plain text:
# either ESC followed by a single byte in "@-Z" or "\-_", or a complete CSI
# sequence (ESC [ parameter bytes, intermediate bytes, final byte).
re_csi = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
|
|
|
|
|
|
|
|
|
class _AnsiToken(NamedTuple):
    """One piece of a tokenized ANSI string.

    A token carries either a run of plain text or the contents of a single
    escape sequence. Every field defaults to the empty string, so a token can
    be built from just the field that applies.
    """

    # Text found between escape sequences.
    plain: str = ""
    # Parameter string of an SGR sequence (the part between "ESC[" and "m").
    sgr: str = ""
    # Payload of an OSC sequence (the part after "ESC]").
    osc: str = ""
|
|
|
|
|
|
|
|
|
def _ansi_tokenize(ansi_text: str) -> Iterable[_AnsiToken]:
    """Split a string in to runs of plain text and ANSI escape codes.

    Text between recognised escape sequences is yielded as a plain token after
    any remaining (unknown) CSI sequences have been stripped from it. Each
    recognised sequence is yielded as a token whose ``plain`` field is empty
    and whose ``sgr`` / ``osc`` fields hold the two groups of the match; the
    group that did not take part in the match is ``None``.

    Args:
        ansi_text (str): A string containing ANSI codes.

    Yields:
        AnsiToken: A named tuple of (plain, sgr, osc)
    """
    cursor = 0
    for found in re_ansi.finditer(ansi_text):
        begin, finish = found.span()
        if cursor < begin:
            # Plain text that precedes this escape sequence.
            yield _AnsiToken(re_csi.sub("", ansi_text[cursor:begin]))
        yield _AnsiToken("", found.group(1), found.group(2))
        cursor = finish
    if cursor < len(ansi_text):
        # Whatever follows the last recognised escape sequence.
        yield _AnsiToken(re_csi.sub("", ansi_text[cursor:]))
|
|
|
|
|
|
|
|
|
# Maps a single SGR parameter to the style definition it stands for.
# Extended colors (SGR 38 / 48) take further parameters and are therefore not
# listed here.
SGR_STYLE_MAP = {
    # Attributes switched on.
    1: "bold",
    2: "dim",
    3: "italic",
    4: "underline",
    5: "blink",
    6: "blink2",
    7: "reverse",
    8: "conceal",
    9: "strike",
    # Attributes switched off.
    21: "underline2",
    22: "not dim not bold",
    23: "not italic",
    24: "not underline",
    25: "not blink",
    26: "not blink2",
    27: "not reverse",
    28: "not conceal",
    29: "not strike",
    # Foreground colors 30-37 -> color(0) .. color(7).
    **{30 + offset: f"color({offset})" for offset in range(8)},
    39: "default",
    # Background colors 40-47 -> on color(0) .. on color(7).
    **{40 + offset: f"on color({offset})" for offset in range(8)},
    49: "on default",
    51: "frame",
    52: "encircle",
    53: "overline",
    54: "not frame not encircle",
    55: "not overline",
    # Foreground colors 90-97 -> color(8) .. color(15).
    **{90 + offset: f"color({8 + offset})" for offset in range(8)},
    # Background colors 100-107 -> on color(8) .. on color(15).
    **{100 + offset: f"on color({8 + offset})" for offset in range(8)},
}
|
|
|
|
|
|
|
|
|
class AnsiDecoder:
    """Translate ANSI code in to styled Text.

    The decoder is stateful: the style in effect at the end of one decoded line
    is kept on ``self.style`` and carries over to the next line that is decoded
    with the same instance.
    """

    def __init__(self) -> None:
        self.style = Style.null()

    def decode(self, terminal_text: str) -> Iterable[Text]:
        """Decode ANSI codes in a string of terminal output, line by line.

        Args:
            terminal_text (str): Terminal output; it is split in to lines with
                ``str.splitlines`` and each line is decoded in turn.

        Yields:
            Text: Marked up Text, one instance per line.
        """
        for line in terminal_text.splitlines():
            yield self.decode_line(line)

    def decode_line(self, line: str) -> Text:
        """Decode a line containing ansi codes.

        Plain text is appended with the current style. OSC 8 sequences update
        the link of the current style and SGR sequences update its attributes
        and colors. An empty SGR parameter counts as 0 (reset), so ``ESC[m``
        resets the style just like ``ESC[0m``. Parameters that are not decimal
        numbers are ignored and extended color sequences (38 / 48) that end
        early are dropped.

        Args:
            line (str): A line of terminal output.

        Returns:
            Text: A Text instance marked up according to ansi codes.
        """
        from_ansi = Color.from_ansi
        from_rgb = Color.from_rgb
        _Style = Style
        text = Text()
        append = text.append
        # Only what follows the last carriage return is decoded.
        line = line.rsplit("\r", 1)[-1]
        for plain_text, sgr, osc in _ansi_tokenize(line):
            if plain_text:
                # Hand over None instead of a falsy style.
                append(plain_text, self.style or None)
            elif osc:
                if osc.startswith("8;"):
                    # OSC 8 hyperlink: "8;<params>;<link>"; an empty link
                    # clears the current link.
                    _params, semicolon, link = osc[2:].partition(";")
                    if semicolon:
                        self.style = self.style.update_link(link or None)
            elif sgr is not None and osc is None:
                # Tokens produced from an SGR match carry ``osc=None``; that is
                # what tells an empty SGR sequence (``ESC[m``) apart from a
                # plain token whose text was stripped down to nothing (all
                # fields ""), which must not reset the style.
                #
                # Translate in to semi-colon separated codes.
                # Ignore invalid codes, because we want to be lenient.
                # ``isdecimal`` is used because ``isdigit`` also accepts
                # characters (e.g. superscript digits) that ``int`` rejects.
                # Codes are clamped to 255.
                codes = [
                    min(255, int(_code) if _code else 0)
                    for _code in sgr.split(";")
                    if not _code or _code.isdecimal()
                ]
                iter_codes = iter(codes)
                for code in iter_codes:
                    if code == 0:
                        # reset
                        self.style = _Style.null()
                    elif code in SGR_STYLE_MAP:
                        # styles
                        self.style += _Style.parse(SGR_STYLE_MAP[code])
                    elif code == 38:
                        # Foreground: "38;5;<n>" or "38;2;<r>;<g>;<b>".
                        # Running out of codes simply ends the sequence.
                        with suppress(StopIteration):
                            color_type = next(iter_codes)
                            if color_type == 5:
                                self.style += _Style.from_color(
                                    from_ansi(next(iter_codes))
                                )
                            elif color_type == 2:
                                self.style += _Style.from_color(
                                    from_rgb(
                                        next(iter_codes),
                                        next(iter_codes),
                                        next(iter_codes),
                                    )
                                )
                    elif code == 48:
                        # Background: "48;5;<n>" or "48;2;<r>;<g>;<b>".
                        with suppress(StopIteration):
                            color_type = next(iter_codes)
                            if color_type == 5:
                                self.style += _Style.from_color(
                                    None, from_ansi(next(iter_codes))
                                )
                            elif color_type == 2:
                                self.style += _Style.from_color(
                                    None,
                                    from_rgb(
                                        next(iter_codes),
                                        next(iter_codes),
                                        next(iter_codes),
                                    ),
                                )

        return text
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":  # pragma: no cover
    # Demo: run the command given on the command line inside a pseudo-terminal,
    # echo its raw output, then print the decoded output and export it to
    # "stdout.html".
    import io
    import os
    import pty
    import sys

    decoder = AnsiDecoder()
    captured = io.BytesIO()

    def read(fd):
        """Read a chunk from the pty and keep a copy of it."""
        chunk = os.read(fd, 1024)
        captured.write(chunk)
        return chunk

    pty.spawn(sys.argv[1:], read)

    from .console import Console

    console = Console(record=True)

    stdout_result = captured.getvalue().decode("utf-8")
    print(stdout_result)

    for decoded_line in decoder.decode(stdout_result):
        console.print(decoded_line)

    console.save_html("stdout.html")
|