forked from lvgl/lv_binding_micropython
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathlv_utils.py
196 lines (166 loc) · 5.6 KB
/
lv_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
##############################################################################
# Event Loop module: advancing tick count and scheduling lvgl task handler.
# Import after lvgl module.
# This should be imported and used by display driver.
# Display driver should first check if an event loop is already running.
#
# Usage example with SDL:
#
# SDL.init(auto_refresh=False)
# # Register SDL display driver.
# # Register SDL mouse driver
# event_loop = lv_utils.event_loop()
#
#
# asyncio example with SDL:
#
# SDL.init(auto_refresh=False)
# # Register SDL display driver.
# # Register SDL mouse driver
# event_loop = lv_utils.event_loop(asynchronous=True)
# asyncio.Loop.run_forever()
#
# asyncio example with ili9341:
#
# event_loop = lv_utils.event_loop(asynchronous=True) # Optional!
# self.disp = ili9341(asynchronous=True)
# asyncio.Loop.run_forever()
#
# MIT license; Copyright (c) 2021 Amir Gonnen
#
##############################################################################
import lvgl as lv
import micropython
import sys
# Try standard machine.Timer, or custom timer from lv_timer, if available
try:
from machine import Timer
except:
try:
from lv_timer import Timer
except:
if sys.platform != "darwin":
raise RuntimeError("Missing machine.Timer implementation!")
Timer = False
# Try to determine default timer id
default_timer_id = 0
if sys.platform == "pyboard":
# stm32 only supports SW timer -1
default_timer_id = -1
if sys.platform == "rp2":
# rp2 only supports SW timer -1
default_timer_id = -1
# Try importing asyncio, if available
try:
import asyncio
asyncio_available = True
except:
asyncio_available = False
##############################################################################
class event_loop:
_current_instance = None
def __init__(
self,
freq=25,
timer_id=default_timer_id,
max_scheduled=2,
refresh_cb=None,
asynchronous=False,
exception_sink=None,
):
if self.is_running():
raise RuntimeError("Event loop is already running!")
if not lv.is_initialized():
lv.init()
event_loop._current_instance = self
self.delay = 1000 // freq
self.refresh_cb = refresh_cb
self.exception_sink = (
exception_sink if exception_sink else self.default_exception_sink
)
self.asynchronous = asynchronous
if self.asynchronous:
if not asyncio_available:
raise RuntimeError(
"Cannot run asynchronous event loop. asyncio is not available!"
)
self.refresh_event = asyncio.Event()
self.refresh_task = asyncio.create_task(self.async_refresh())
self.timer_task = asyncio.create_task(self.async_timer())
else:
if Timer:
self.timer = Timer(timer_id)
self.timer.init(
mode=Timer.PERIODIC, period=self.delay, callback=self.timer_cb
)
self.task_handler_ref = self.task_handler # Allocation occurs here
self.max_scheduled = max_scheduled
self.scheduled = 0
def init_async(self):
self.refresh_event = asyncio.Event()
self.refresh_task = asyncio.create_task(self.async_refresh())
self.timer_task = asyncio.create_task(self.async_timer())
def deinit(self):
if self.asynchronous:
self.refresh_task.cancel()
self.timer_task.cancel()
else:
if Timer:
self.timer.deinit()
event_loop._current_instance = None
def disable(self):
self.scheduled += self.max_scheduled
def enable(self):
self.scheduled -= self.max_scheduled
@staticmethod
def is_running():
return event_loop._current_instance is not None
@staticmethod
def current_instance():
return event_loop._current_instance
def task_handler(self, _):
try:
if lv._nesting.value == 0:
lv.task_handler()
if self.refresh_cb:
self.refresh_cb()
self.scheduled -= 1
except Exception as e:
if self.exception_sink:
self.exception_sink(e)
def tick(self):
self.timer_cb(None)
def run(self):
if sys.platform == "darwin":
while True:
self.tick()
def timer_cb(self, t):
# Can be called in Interrupt context
# Use task_handler_ref since passing self.task_handler would cause allocation.
lv.tick_inc(self.delay)
if self.scheduled < self.max_scheduled:
try:
micropython.schedule(self.task_handler_ref, 0)
self.scheduled += 1
except:
pass
async def async_refresh(self):
while True:
await self.refresh_event.wait()
if lv._nesting.value == 0:
self.refresh_event.clear()
try:
lv.task_handler()
except Exception as e:
if self.exception_sink:
self.exception_sink(e)
if self.refresh_cb:
self.refresh_cb()
async def async_timer(self):
while True:
await asyncio.sleep_ms(self.delay)
lv.tick_inc(self.delay)
self.refresh_event.set()
def default_exception_sink(self, e):
sys.print_exception(e)
# event_loop.current_instance().deinit()