-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathslider.py
executable file
·134 lines (99 loc) · 4.22 KB
/
slider.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import threading, logging, time
logger = logging.getLogger(__name__)
DOWN = 0
UP = 1
class SliderThread(threading.Thread):
def __init__(self, resolution = 0.0001, full_slide_time = 10):
threading.Thread.__init__(self)
self._shutdown = False
self._lock = threading.Lock()
self._cond = threading.Condition(self._lock)
self.resolution = resolution
self._wait = float(full_slide_time) * float(resolution)
self._slider_movements = dict()
self._active_movements = set()
def registerSlider(self, slider_id, up_function, down_function):
self._slider_movements[(slider_id, UP)] = up_function
self._slider_movements[(slider_id, DOWN)] = down_function
def start_movement(self, sliderId, direction):
try:
self._lock.acquire()
self._active_movements.add((sliderId, direction))
self._cond.notify()
finally:
self._lock.release()
def stop_movement(self, slider_id, direction):
try:
self._lock.acquire()
if (slider_id, direction) in self._active_movements:
self._active_movements.remove((slider_id, direction))
finally:
self._lock.release()
def shutdown(self):
try:
self._lock.acquire()
self._shutdown = True
self._cond.notify()
finally:
self._lock.release()
def is_running(self):
return not self._shutdown
def run(self):
logger.info("Starting slider thread ...")
while not self._shutdown:
# sleep until we have work to do
try:
self._cond.acquire()
if not self._active_movements and not self._shutdown:
self._cond.wait()
finally:
self._cond.release()
# work until nothing is do do and sleep again
while self._active_movements and not self._shutdown:
try:
self._cond.acquire()
for movement in self._active_movements:
self._slider_movements.get(movement)(self.resolution)
finally:
self._cond.release()
time.sleep(self._wait)
logger.info("Shutdown slider thread...")
if __name__ == '__main__':
# just for testing the sliderthread ...
class Mock:
def __init__(self):
self.light = 0.5
self.color = 0.3333
def light_up(self, step=0.01):
self.light = min(1.0, self.light + step)
print 'light + ' + str(step) + " = " + str(self.light)
def light_down(self, step=0.01):
self.light = max(0.02, self.light - step)
print 'light + ' + str(step) + " = " + str(self.light)
def color_up(self, step=0.01):
self.color = (self.color + step) % 1
print 'color + ' + str(step) + " = " + str(self.color)
def color_down(self, step=0.01):
self.color = (self.color - step) % 1
print 'color - ' + str(step) + " = " + str(self.color)
mock = Mock()
slider_buttons = { 'light_down' : ('light', DOWN),
'light_up' : ('light', UP),
'colorDown' : ('color', DOWN),
'colorUp' : ('color', UP) } ;
slider_thread = SliderThread()
slider_thread.registerSlider('light', mock.light_up, mock.light_down)
slider_thread.registerSlider('color', mock.color_up, mock.color_down)
slider_thread.start()
slider_thread.start_movement(*slider_buttons['light_up'])
time.sleep(2)
slider_thread.start_movement(*slider_buttons['colorDown'])
time.sleep(3)
slider_thread.stop_movement(*slider_buttons['colorDown'])
slider_thread.stop_movement(*slider_buttons['light_down']) # no active movement
time.sleep(2)
slider_thread.stop_movement(*slider_buttons['light_up'])
time.sleep(2)
slider_thread.start_movement(*slider_buttons['light_down'])
time.sleep(2)
slider_thread.shutdown()