diff --git a/flower/app.py b/flower/app.py index 3427e098..569e1c78 100644 --- a/flower/app.py +++ b/flower/app.py @@ -62,7 +62,9 @@ def __init__(self, options=None, capp=None, events=None, enable_events=self.options.enable_events, io_loop=self.io_loop, max_workers_in_memory=self.options.max_workers, - max_tasks_in_memory=self.options.max_tasks) + max_tasks_in_memory=self.options.max_tasks, + limit_tasks_by_type=self.options.limit_tasks_by_type + ) self.started = False def start(self): diff --git a/flower/events.py b/flower/events.py index cd15d7a2..d133747a 100644 --- a/flower/events.py +++ b/flower/events.py @@ -1,4 +1,5 @@ import collections +import datetime import logging import shelve import threading @@ -115,7 +116,7 @@ class Events(threading.Thread): # pylint: disable=too-many-arguments def __init__(self, capp, io_loop, db=None, persistent=False, - enable_events=True, state_save_interval=0, + enable_events=True, state_save_interval=0, limit_tasks_by_type=None, **kwargs): threading.Thread.__init__(self) self.daemon = True @@ -128,6 +129,7 @@ def __init__(self, capp, io_loop, db=None, persistent=False, self.enable_events = enable_events self.state = None self.state_save_timer = None + self.limit_tasks_by_type = limit_tasks_by_type if self.persistent: logger.debug("Loading state from '%s'...", self.db) @@ -140,6 +142,10 @@ def __init__(self, capp, io_loop, db=None, persistent=False, self.state_save_timer = PeriodicCallback(self.save_state, state_save_interval) + if self.limit_tasks_by_type: + self.clear_tasks_by_type_timer = PeriodicCallback(self.clear_tasks_by_type, + 1000 * 60) + if not self.state: self.state = EventsState(**kwargs) @@ -156,6 +162,10 @@ def start(self): logger.debug("Starting state save timer...") self.state_save_timer.start() + if self.clear_tasks_by_type: + logger.debug("Starting clear tasks by type timer...") + self.clear_tasks_by_type_timer.start() + def stop(self): if self.enable_events: logger.debug("Stopping enable events timer...") @@ -208,3 +218,18 @@ def on_enable_events(self): def on_event(self, event): # Call EventsState.event in ioloop thread to avoid synchronization self.io_loop.add_callback(partial(self.state.event, event)) + + def clear_tasks_by_type(self): + now = datetime.datetime.now() + for obj in self.limit_tasks_by_type: + timedelta, max_count = obj.get('timedelta'), obj.get('max_count') + + # self.state.tasks_by_type are weakSet, so we could get task after deletion. + for count, (uuid, task) in enumerate(self.state._tasks_by_type(obj.get('type')), start=1): + if task.state != 'SUCCESS' or task.state == 'FAILURE' and obj.get('clear_failed'): + continue + + if timedelta and task.timestamp <= (now - timedelta).timestamp() or max_count and max_count < count: + del self.state.tasks[task.uuid] + self.state.rebuild_taskheap() + diff --git a/flower/options.py b/flower/options.py index 083d4b5b..521f55aa 100644 --- a/flower/options.py +++ b/flower/options.py @@ -31,6 +31,8 @@ help="maximum number of workers to keep in memory") define("max_tasks", type=int, default=100000, help="maximum number of tasks to keep in memory") +define("limit_tasks_by_type", type=list, default=[], + help="limits number of tasks to keep in memory by type (by max number or timedelta)") define("db", type=str, default='flower', help="flower database file") define("persistent", type=bool, default=False,