You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
214 lines
7.4 KiB
214 lines
7.4 KiB
5 years ago
|
from __future__ import division
|
||
|
from tqdm import tqdm, trange, TMonitor
|
||
|
from tests_tqdm import with_setup, pretest, posttest, SkipTest, \
|
||
|
StringIO, closing
|
||
|
from tests_tqdm import DiscreteTimer, cpu_timify
|
||
|
from tests_perf import retry_on_except
|
||
|
|
||
|
import sys
|
||
|
from time import sleep
|
||
|
from threading import Event
|
||
|
|
||
|
|
||
|
class FakeSleep(object):
|
||
|
"""Wait until the discrete timer reached the required time"""
|
||
|
def __init__(self, dtimer):
|
||
|
self.dtimer = dtimer
|
||
|
|
||
|
def sleep(self, t):
|
||
|
end = t + self.dtimer.t
|
||
|
while self.dtimer.t < end:
|
||
|
sleep(0.0000001) # sleep a bit to interrupt (instead of pass)
|
||
|
|
||
|
|
||
|
class FakeTqdm(object):
|
||
|
_instances = []
|
||
|
|
||
|
|
||
|
def make_create_fake_sleep_event(sleep):
|
||
|
def wait(self, timeout=None):
|
||
|
if timeout is not None:
|
||
|
sleep(timeout)
|
||
|
return self.is_set()
|
||
|
|
||
|
def create_fake_sleep_event():
|
||
|
event = Event()
|
||
|
event.wait = wait
|
||
|
return event
|
||
|
|
||
|
return create_fake_sleep_event
|
||
|
|
||
|
|
||
|
def incr(x):
|
||
|
return x + 1
|
||
|
|
||
|
|
||
|
def incr_bar(x):
|
||
|
with closing(StringIO()) as our_file:
|
||
|
for _ in trange(x, lock_args=(False,), file=our_file):
|
||
|
pass
|
||
|
return incr(x)
|
||
|
|
||
|
|
||
|
@with_setup(pretest, posttest)
|
||
|
def test_monitor_thread():
|
||
|
"""Test dummy monitoring thread"""
|
||
|
maxinterval = 10
|
||
|
|
||
|
# Setup a discrete timer
|
||
|
timer = DiscreteTimer()
|
||
|
TMonitor._time = timer.time
|
||
|
# And a fake sleeper
|
||
|
sleeper = FakeSleep(timer)
|
||
|
TMonitor._event = make_create_fake_sleep_event(sleeper.sleep)
|
||
|
|
||
|
# Instanciate the monitor
|
||
|
monitor = TMonitor(FakeTqdm, maxinterval)
|
||
|
# Test if alive, then killed
|
||
|
assert monitor.report()
|
||
|
monitor.exit()
|
||
|
timer.sleep(maxinterval * 2) # need to go out of the sleep to die
|
||
|
assert not monitor.report()
|
||
|
# assert not monitor.is_alive() # not working dunno why, thread not killed
|
||
|
del monitor
|
||
|
|
||
|
|
||
|
@with_setup(pretest, posttest)
|
||
|
def test_monitoring_and_cleanup():
|
||
|
"""Test for stalled tqdm instance and monitor deletion"""
|
||
|
# Note: should fix miniters for these tests, else with dynamic_miniters
|
||
|
# it's too complicated to handle with monitoring update and maxinterval...
|
||
|
maxinterval = 2
|
||
|
|
||
|
total = 1000
|
||
|
# Setup a discrete timer
|
||
|
timer = DiscreteTimer()
|
||
|
# And a fake sleeper
|
||
|
sleeper = FakeSleep(timer)
|
||
|
# Setup TMonitor to use the timer
|
||
|
TMonitor._time = timer.time
|
||
|
TMonitor._event = make_create_fake_sleep_event(sleeper.sleep)
|
||
|
# Set monitor interval
|
||
|
tqdm.monitor_interval = maxinterval
|
||
|
with closing(StringIO()) as our_file:
|
||
|
with tqdm(total=total, file=our_file, miniters=500, mininterval=0.1,
|
||
|
maxinterval=maxinterval) as t:
|
||
|
cpu_timify(t, timer)
|
||
|
# Do a lot of iterations in a small timeframe
|
||
|
# (smaller than monitor interval)
|
||
|
timer.sleep(maxinterval / 2) # monitor won't wake up
|
||
|
t.update(500)
|
||
|
# check that our fixed miniters is still there
|
||
|
assert t.miniters == 500
|
||
|
# Then do 1 it after monitor interval, so that monitor kicks in
|
||
|
timer.sleep(maxinterval * 2)
|
||
|
t.update(1)
|
||
|
# Wait for the monitor to get out of sleep's loop and update tqdm..
|
||
|
timeend = timer.time()
|
||
|
while not (t.monitor.woken >= timeend and t.miniters == 1):
|
||
|
timer.sleep(1) # Force monitor to wake up if it woken too soon
|
||
|
sleep(0.000001) # sleep to allow interrupt (instead of pass)
|
||
|
assert t.miniters == 1 # check that monitor corrected miniters
|
||
|
# Note: at this point, there may be a race condition: monitor saved
|
||
|
# current woken time but timer.sleep() happen just before monitor
|
||
|
# sleep. To fix that, either sleep here or increase time in a loop
|
||
|
# to ensure that monitor wakes up at some point.
|
||
|
|
||
|
# Try again but already at miniters = 1 so nothing will be done
|
||
|
timer.sleep(maxinterval * 2)
|
||
|
t.update(2)
|
||
|
timeend = timer.time()
|
||
|
while t.monitor.woken < timeend:
|
||
|
timer.sleep(1) # Force monitor to wake up if it woken too soon
|
||
|
sleep(0.000001)
|
||
|
# Wait for the monitor to get out of sleep's loop and update tqdm..
|
||
|
assert t.miniters == 1 # check that monitor corrected miniters
|
||
|
|
||
|
# Check that class var monitor is deleted if no instance left
|
||
|
tqdm.monitor_interval = 10
|
||
|
assert tqdm.monitor is None
|
||
|
|
||
|
|
||
|
@with_setup(pretest, posttest)
|
||
|
def test_monitoring_multi():
|
||
|
"""Test on multiple bars, one not needing miniters adjustment"""
|
||
|
# Note: should fix miniters for these tests, else with dynamic_miniters
|
||
|
# it's too complicated to handle with monitoring update and maxinterval...
|
||
|
maxinterval = 2
|
||
|
|
||
|
total = 1000
|
||
|
# Setup a discrete timer
|
||
|
timer = DiscreteTimer()
|
||
|
# And a fake sleeper
|
||
|
sleeper = FakeSleep(timer)
|
||
|
# Setup TMonitor to use the timer
|
||
|
TMonitor._time = timer.time
|
||
|
TMonitor._event = make_create_fake_sleep_event(sleeper.sleep)
|
||
|
# Set monitor interval
|
||
|
tqdm.monitor_interval = maxinterval
|
||
|
with closing(StringIO()) as our_file:
|
||
|
with tqdm(total=total, file=our_file, miniters=500, mininterval=0.1,
|
||
|
maxinterval=maxinterval) as t1:
|
||
|
# Set high maxinterval for t2 so monitor does not need to adjust it
|
||
|
with tqdm(total=total, file=our_file, miniters=500, mininterval=0.1,
|
||
|
maxinterval=1E5) as t2:
|
||
|
cpu_timify(t1, timer)
|
||
|
cpu_timify(t2, timer)
|
||
|
# Do a lot of iterations in a small timeframe
|
||
|
timer.sleep(maxinterval / 2)
|
||
|
t1.update(500)
|
||
|
t2.update(500)
|
||
|
assert t1.miniters == 500
|
||
|
assert t2.miniters == 500
|
||
|
# Then do 1 it after monitor interval, so that monitor kicks in
|
||
|
timer.sleep(maxinterval * 2)
|
||
|
t1.update(1)
|
||
|
t2.update(1)
|
||
|
# Wait for the monitor to get out of sleep and update tqdm
|
||
|
timeend = timer.time()
|
||
|
while not (t1.monitor.woken >= timeend and t1.miniters == 1):
|
||
|
timer.sleep(1)
|
||
|
sleep(0.000001)
|
||
|
assert t1.miniters == 1 # check that monitor corrected miniters
|
||
|
assert t2.miniters == 500 # check that t2 was not adjusted
|
||
|
|
||
|
# Check that class var monitor is deleted if no instance left
|
||
|
tqdm.monitor_interval = 10
|
||
|
assert tqdm.monitor is None
|
||
|
|
||
|
|
||
|
@with_setup(pretest, posttest)
|
||
|
def test_imap():
|
||
|
"""Test multiprocessing.Pool"""
|
||
|
try:
|
||
|
from multiprocessing import Pool
|
||
|
except ImportError:
|
||
|
raise SkipTest
|
||
|
|
||
|
pool = Pool()
|
||
|
res = list(tqdm(pool.imap(incr, range(100)), disable=True))
|
||
|
assert res[-1] == 100
|
||
|
|
||
|
|
||
|
# py2: locks won't propagate to incr_bar so may cause `AttributeError`
|
||
|
@retry_on_except(n=3 if sys.version_info < (3,) else 1)
|
||
|
@with_setup(pretest, posttest)
|
||
|
def test_threadpool():
|
||
|
"""Test concurrent.futures.ThreadPoolExecutor"""
|
||
|
try:
|
||
|
from concurrent.futures import ThreadPoolExecutor
|
||
|
from threading import RLock
|
||
|
except ImportError:
|
||
|
raise SkipTest
|
||
|
|
||
|
tqdm.set_lock(RLock())
|
||
|
with ThreadPoolExecutor(8) as pool:
|
||
|
try:
|
||
|
res = list(tqdm(pool.map(incr_bar, range(100)), disable=True))
|
||
|
except AttributeError:
|
||
|
if sys.version_info < (3,):
|
||
|
raise SkipTest
|
||
|
else:
|
||
|
raise
|
||
|
assert sum(res) == sum(range(1, 101))
|