# Source listing:
#   impuls/lib/python3.11/site-packages/autopage/tests/test_end_to_end.py
#   (313 lines, 11 KiB, Python)

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import itertools
import unittest
import sys
from autopage.tests import isolation
import typing
import autopage
# Number of lines the tests below expect the pager to show per full page.
# NOTE(review): presumably one row of the isolation.LINES-row terminal is
# taken up by the pager's own prompt - confirm against
# autopage.tests.isolation.
MAX_LINES_PER_PAGE = isolation.LINES - 1
class finite:
    """Callable that writes a fixed number of numbered lines to an AutoPager.

    Extra keyword arguments are stored and forwarded unchanged to
    ``autopage.AutoPager`` each time the instance is called.
    """

    def __init__(self, num_lines: int, **kwargs: typing.Any):
        self.kwargs = kwargs
        self.num_lines = num_lines

    def __call__(self) -> int:
        """Print ``0 .. num_lines - 1`` and return ``AutoPager.exit_code()``."""
        less = autopage.command.Less()
        pager = autopage.AutoPager(pager_command=less, **self.kwargs)
        with pager as stream:
            for line_no in range(self.num_lines):
                print(line_no, file=stream)
        return pager.exit_code()
def infinite() -> int:
    """Write an unbounded sequence of integers to an AutoPager.

    A ``KeyboardInterrupt`` escaping the pager context is swallowed so that
    the value of ``AutoPager.exit_code()`` can still be returned.
    """
    pager = autopage.AutoPager(pager_command=autopage.command.Less())
    try:
        with pager as stream:
            for number in itertools.count():
                print(number, file=stream)
    except KeyboardInterrupt:
        # Deliberately ignored: fall through and report the exit code.
        pass
    return pager.exit_code()
def from_stdin() -> int:
    """Copy standard input to an AutoPager one line at a time.

    Trailing whitespace is stripped from each input line before it is
    printed.  A ``KeyboardInterrupt`` raised while reading is swallowed
    inside the pager context, after which ``AutoPager.exit_code()`` is
    returned.
    """
    less = autopage.command.Less()
    buffering = autopage.line_buffer_from_input()
    pager = autopage.AutoPager(pager_command=less, line_buffering=buffering)
    with pager as stream:
        try:
            for raw_line in sys.stdin:
                print(raw_line.rstrip(), file=stream)
        except KeyboardInterrupt:
            # Deliberately ignored: stop reading and leave the pager
            # context normally.
            pass
    return pager.exit_code()
def with_exception() -> int:
    """Write 50 numbered lines to an AutoPager, then raise inside its context.

    The locally defined exception is caught outside the ``with`` block and
    ``AutoPager.exit_code()`` is returned.
    """
    class MyException(Exception):
        pass

    pager = autopage.AutoPager(pager_command=autopage.command.Less())
    try:
        with pager as stream:
            for number in range(50):
                print(number, file=stream)
            # Raised after every line has been written but while the pager
            # context is still open; test_exception expects all 50 lines to
            # be paged and an exit code of 1.
            raise MyException()
    except MyException:
        pass
    return pager.exit_code()
def with_stderr_output() -> int:
    """Write 50 numbered lines to an AutoPager and one line to stderr.

    Returns ``AutoPager.exit_code()``.
    """
    pager = autopage.AutoPager(pager_command=autopage.command.Less())
    with pager as stream:
        for number in range(50):
            print(number, file=stream)
        # NOTE(review): assumed to be written while the pager context is
        # still open (once, after the loop) - confirm the intended nesting.
        print("Hello world", file=sys.stderr)
    return pager.exit_code()
class InvokePagerTest(unittest.TestCase):
    """End-to-end tests that drive the pager via ``isolation.PagerControl``.

    NOTE(review): block nesting was reconstructed from statement order.  The
    ``env.error_output()`` / ``env.exit_code()`` assertions are assumed to
    run after the ``isolation.isolate()`` context has exited - confirm.
    """

    def _assert_pages_to_end(self, pager: typing.Any,
                             num_lines: int) -> None:
        # Advance page by page until num_lines lines have been counted, check
        # that two further advances return 0, quit, and verify the total.
        remaining = num_lines
        while remaining > 0:
            page = min(remaining, MAX_LINES_PER_PAGE)
            self.assertEqual(page, pager.advance())
            remaining -= page
        self.assertEqual(0, pager.advance())
        self.assertEqual(0, pager.advance())
        self.assertEqual(0, pager.quit())
        self.assertEqual(num_lines, pager.total_lines())

    def test_page_to_end(self) -> None:
        # Page through all 100 lines; expect no stderr and exit code 0.
        total = 100
        with isolation.isolate(finite(total)) as env:
            self._assert_pages_to_end(isolation.PagerControl(env), total)
        self.assertFalse(env.error_output())
        self.assertEqual(0, env.exit_code())

    def test_page_to_middle(self) -> None:
        # Quit after two full pages of a finite 100-line output.
        total = 100
        with isolation.isolate(finite(total)) as env:
            control = isolation.PagerControl(env)
            self.assertEqual(MAX_LINES_PER_PAGE, control.advance())
            self.assertEqual(MAX_LINES_PER_PAGE, control.advance())
            self.assertEqual(MAX_LINES_PER_PAGE, control.quit())
        self.assertFalse(env.error_output())
        self.assertEqual(0, env.exit_code())

    def test_exit_pager_early(self) -> None:
        # Quit the pager while the program is still producing output.
        with isolation.isolate(infinite) as env:
            control = isolation.PagerControl(env)
            self.assertEqual(MAX_LINES_PER_PAGE, control.advance())
            self.assertEqual(MAX_LINES_PER_PAGE, control.quit())
        self.assertFalse(env.error_output())
        # 141 is 128 + 13; presumably the SIGPIPE exit status - confirm.
        self.assertEqual(141, env.exit_code())

    def test_interrupt_early(self) -> None:
        # Interrupt the endless producer, then page through what remains.
        with isolation.isolate(infinite) as env:
            control = isolation.PagerControl(env)
            self.assertEqual(MAX_LINES_PER_PAGE, control.advance())
            env.interrupt()
            while control.advance():
                pass
            control.quit()
            self.assertGreater(control.total_lines(), MAX_LINES_PER_PAGE)
        self.assertFalse(env.error_output())
        # 130 is 128 + 2; presumably the SIGINT exit status - confirm.
        self.assertEqual(130, env.exit_code())

    def test_interrupt_early_quit(self) -> None:
        # Interrupt the endless producer and quit the pager straight away.
        with isolation.isolate(infinite) as env:
            control = isolation.PagerControl(env)
            self.assertEqual(MAX_LINES_PER_PAGE, control.advance())
            env.interrupt()
            control.quit()
            self.assertGreater(control.total_lines(), MAX_LINES_PER_PAGE)
        self.assertFalse(env.error_output())
        self.assertEqual(130, env.exit_code())

    def test_interrupt_in_middle_after_complete(self) -> None:
        # Send 100 interrupts while sitting on the first page of a finite
        # output; the exit code is still expected to be 0.
        total = 100
        with isolation.isolate(finite(total)) as env:
            control = isolation.PagerControl(env)
            self.assertEqual(MAX_LINES_PER_PAGE, control.advance())
            for _ in range(100):
                env.interrupt()
            self.assertEqual(MAX_LINES_PER_PAGE, control.quit())
        self.assertFalse(env.error_output())
        self.assertEqual(0, env.exit_code())

    def test_interrupt_at_end_after_complete(self) -> None:
        # Send 100 interrupts after paging to the end of a finite output.
        total = 100
        with isolation.isolate(finite(total)) as env:
            control = isolation.PagerControl(env)
            while control.advance():
                pass
            self.assertEqual(total, control.total_lines())
            for _ in range(100):
                env.interrupt()
            self.assertEqual(0, control.quit())
        self.assertFalse(env.error_output())
        self.assertEqual(0, env.exit_code())

    def test_short_output(self) -> None:
        # Ten lines are read back and compared with their line numbers.
        total = 10
        with isolation.isolate(finite(total)) as env:
            control = isolation.PagerControl(env)
            for index, text in enumerate(control.read_lines(total)):
                self.assertEqual(str(index), text.rstrip())
        self.assertFalse(env.error_output())
        self.assertEqual(0, env.exit_code())

    def test_short_output_reset(self) -> None:
        # Same short output, with reset_on_exit=True passed to AutoPager.
        total = 10
        with isolation.isolate(finite(total, reset_on_exit=True)) as env:
            control = isolation.PagerControl(env)
            self.assertEqual(total, control.quit())
        self.assertFalse(env.error_output())
        self.assertEqual(0, env.exit_code())

    def test_short_streaming_output(self) -> None:
        # Feed ten lines through stdin and read them back from the pager.
        total = 10
        with isolation.isolate(from_stdin, stdin_pipe=True) as env:
            control = isolation.PagerControl(env)
            # NOTE(review): the read-back and the interrupt are assumed to
            # happen while the stdin pipe is still open - confirm nesting.
            with env.stdin_pipe() as in_pipe:
                for number in range(total):
                    print(number, file=in_pipe)
                for index, text in enumerate(control.read_lines(total)):
                    self.assertEqual(index, int(text))
                env.interrupt()
            self.assertEqual(0, control.quit())
        self.assertFalse(env.error_output())
        self.assertEqual(0, env.exit_code())

    def test_exception(self) -> None:
        # All 50 lines are paged even though the program raises; exit code 1.
        total = 50
        with isolation.isolate(with_exception) as env:
            self._assert_pages_to_end(isolation.PagerControl(env), total)
        self.assertFalse(env.error_output())
        self.assertEqual(1, env.exit_code())

    def test_stderr_output(self) -> None:
        # The line written to stderr is captured alongside the paged output.
        total = 50
        with isolation.isolate(with_stderr_output) as env:
            self._assert_pages_to_end(isolation.PagerControl(env), total)
        self.assertEqual('Hello world\n', env.error_output())
        self.assertEqual(0, env.exit_code())
class NoPagerTest(unittest.TestCase):
    """End-to-end tests run with ``stdout_pipe=True`` instead of PagerControl.

    NOTE(review): block nesting was reconstructed from statement order.  The
    ``env.error_output()`` / ``env.exit_code()`` assertions are assumed to
    run after the ``isolation.isolate()`` context has exited - confirm.
    """

    def test_pipe_output_to_end(self) -> None:
        # Every one of the 100 lines arrives on the stdout pipe.
        total = 100
        with isolation.isolate(finite(total), stdout_pipe=True) as env:
            with env.stdout_pipe() as out:
                received = out.readlines()
        self.assertEqual(total, len(received))
        self.assertFalse(env.error_output())
        self.assertEqual(0, env.exit_code())

    def test_exit_early(self) -> None:
        # Read 500 lines from the endless producer, then close the pipe.
        with isolation.isolate(infinite, stdout_pipe=True) as env:
            with env.stdout_pipe() as out:
                for _ in range(500):
                    out.readline()
        self.assertFalse(env.error_output())
        # 141 is 128 + 13; presumably the SIGPIPE exit status - confirm.
        self.assertEqual(141, env.exit_code())

    def test_exit_early_buffered(self) -> None:
        total = 10
        with isolation.isolate(from_stdin,
                               stdin_pipe=True, stdout_pipe=True) as env:
            with env.stdin_pipe() as in_pipe:
                for number in range(total):
                    print(number, file=in_pipe)
                # NOTE(review): the stdout pipe is assumed to be closed
                # while the stdin pipe is still open - confirm nesting.
                with env.stdout_pipe():
                    # Close output without reading contents of buffer
                    pass
        self.assertFalse(env.error_output())
        self.assertEqual(141, env.exit_code())

    def test_interrupt_early(self) -> None:
        # Interrupt the endless producer before reading any of its output.
        with isolation.isolate(infinite, stdout_pipe=True) as env:
            env.interrupt()
            with env.stdout_pipe() as out:
                received = out.readlines()
                self.assertGreater(len(received), 0)
        self.assertFalse(env.error_output())
        # 130 is 128 + 2; presumably the SIGINT exit status - confirm.
        self.assertEqual(130, env.exit_code())

    def test_short_streaming_output(self) -> None:
        # Feed ten lines through stdin and read them back from stdout.
        total = 10
        with isolation.isolate(from_stdin,
                               stdin_pipe=True, stdout_pipe=True) as env:
            # NOTE(review): stdout is assumed to be read while the stdin
            # pipe is still open, so that the interrupt (not EOF) ends the
            # program's read loop - confirm nesting.
            with env.stdin_pipe() as in_pipe:
                for number in range(total):
                    print(number, file=in_pipe)
                with env.stdout_pipe() as out:
                    for expected in range(total):
                        self.assertEqual(expected, int(out.readline()))
                    env.interrupt()
                    self.assertEqual(0, len(out.readlines()))
        self.assertFalse(env.error_output())
        self.assertEqual(0, env.exit_code())