Use pybricksdev programmatically #64
FYI, the best place to ask questions about anything Pybricks is https://github.com/orgs/pybricks/discussions

Here is a basic script to get you started:

#!/usr/bin/env python3
# run this script on your computer
import asyncio
import contextlib

from pybricksdev.ble import find_device
from pybricksdev.connections.pybricks import PybricksHub

# this script must be in the current working directory and will be sent to the hub
MY_PROGRAM = "hub_program.py"


async def main():
    async with contextlib.AsyncExitStack() as stack:
        dev = await find_device()
        hub = PybricksHub()
        await hub.connect(dev)
        stack.push_async_callback(hub.disconnect)
        await hub.run(MY_PROGRAM, print_output=True, wait=True)


asyncio.run(main())
Oh thanks, I didn't know that. And with the script I get an error:
This means |
It looks like this is because bluetooth was turned off. How can I turn it on automatically? |
And I still have one question, is it also possible to send individual commands instead of entire files? |
It is, but the only available commands are to download a file or run a file that has already been downloaded to the hub or stop a running program. But you can write a program that runs on the hub that receives commands from the program running on your computer. |
Do you have an example of this? And on the code.pybricks.com website you can also execute individual commands, how does it work there? |
There are some examples at https://pybricks.com/projects/tutorials/wireless/hub-to-device/ |
Thank you, but can I somehow write the code shown in the resource you gave me into one single file? So first I would need to connect with the hub using this code, right?

device = await find_device()
hub = PybricksHub()
await hub.connect(device)

Then somehow upload the hub program (of course, the command doesn't work as shown):

hub.upload_program('hub_program.py')

Then start it:

hub.run_program()

And afterward, I would need to send the commands. Can't I do this somehow using:

hub.write(b"fwd")

instead of using BleakClient?
Yes, you can do that using |
Thanks, but how can I upload the program to the hub, because hub.upload_program('hub_program.py') of course doesn't work? |
I think this is what you are looking for: pybricksdev/pybricksdev/connections/pybricks.py, lines 431 to 447 in 11667cb
You will need to compile the program first. You can have a look at the source of |
Thank you so much for your help! While reading through the source code, I noticed that you can pass wait=False as a parameter to hub.run(). That was exactly what I was looking for, but I seem to have overlooked it.
But I still have a few questions:
|
You can set the environment variable
Yes, this is the simplest way.
There is also a |
What am I doing wrong?

main.py

import asyncio
import os
from pybricksdev.ble import find_device
from pybricksdev.connections.pybricks import PybricksHub

os.environ["TQDM_DISABLE"] = "1"


async def connect_to_hub():
    try:
        device = await find_device()
        hub = PybricksHub()
        await hub.connect(device)
        return hub
    except asyncio.TimeoutError:
        raise Exception("No device found")
    except OSError:
        raise Exception("Please turn bluetooth on")


async def main():
    hub = await connect_to_hub()
    await hub.run("hub.py", wait=False)
    await hub.write_line("rev")
    await asyncio.sleep(3)
    await hub.write_line("fwd")
    await asyncio.sleep(3)
    await hub.write_line("bye")
    print(await hub.read_line())


asyncio.run(main())

hub.py

from pybricks.pupdevices import Motor
from pybricks.parameters import Port

# Standard MicroPython modules
from usys import stdin, stdout

motor = Motor(Port.A)

while True:
    # Read three bytes.
    cmd = stdin.buffer.read()

    # Decide what to do based on the command.
    if cmd == b"fwd":
        motor.dc(50)
    elif cmd == b"rev":
        motor.dc(-50)
    elif cmd == b"bye":
        break

    # Send a response.
    stdout.buffer.write(b"OK")

My hub doesn't do anything anymore, I don't get an OK back, and the TQDM progress bar is still displayed.
The hub program doesn't take into account newlines. Also, if everything is text-based, we should be able to use Also, the program on the computer should wait for something like an "OK" from the hub first before sending commands so that it knows that the hub program is loaded and running. |
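The ready handshake described here can be tried out without any hardware. The following is only a sketch: `FakeHub` and `send_when_ready` are hypothetical names made up for this illustration and are not part of pybricksdev. The fake hub prints "OK" whenever it is ready for a command, and the PC side waits for that line before it writes anything.

```python
import asyncio


class FakeHub:
    """Hypothetical stand-in for a connected hub; not part of pybricksdev."""

    def __init__(self):
        self._to_pc = asyncio.Queue()
        self._to_hub = asyncio.Queue()
        self.log = []

    async def read_line(self):
        return await self._to_pc.get()

    async def write_line(self, text):
        await self._to_hub.put(text)

    async def program(self):
        # Mimics the hub side: announce readiness, then read one command.
        while True:
            await self._to_pc.put("OK")
            cmd = (await self._to_hub.get()).strip()
            self.log.append(cmd)
            if cmd == "bye":
                break


async def send_when_ready(hub, cmd):
    # Wait for the ready marker before sending anything.
    line = await asyncio.wait_for(hub.read_line(), timeout=5)
    if line != "OK":
        raise RuntimeError(f"Unexpected response: {line!r}")
    await hub.write_line(cmd)


async def demo():
    hub = FakeHub()
    task = asyncio.create_task(hub.program())
    for cmd in ("rev", "fwd", "bye"):
        await send_when_ready(hub, cmd)
    await task
    return hub.log
```

Running `asyncio.run(demo())` returns the commands in the order the fake hub received them. With a real hub the same pattern would presumably apply, with the "OK" line sent at the top of the hub program's loop.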
Sadly it still does not work

hub.py

from pybricks.pupdevices import Motor
from pybricks.parameters import Port

# Standard MicroPython modules
from usys import stdin, stdout

motor = Motor(Port.A)

while True:
    # Read three bytes.
    cmd = stdin.read()

    # Decide what to do based on the command.
    if cmd == b"fwd\n":
        motor.dc(50)
    elif cmd == b"rev\n":
        motor.dc(-50)
    elif cmd == b"bye\n":
        break

    # Send a response.
    stdout.buffer.write(b"OK\n")
It looks like this program still sends the OK at the end instead of at the beginning. Also, it looks like it is doing a |
Unfortunately it still doesn't work:

main.py

import asyncio
import os
from pybricksdev.ble import find_device
from pybricksdev.connections.pybricks import PybricksHub

os.environ["TQDM_DISABLE"] = "1"


async def connect_to_hub():
    try:
        device = await find_device()
        hub = PybricksHub()
        await hub.connect(device)
        return hub
    except asyncio.TimeoutError:
        raise Exception("No device found")
    except OSError:
        raise Exception("Please turn bluetooth on")


async def main():
    hub = await connect_to_hub()
    await hub.run("hub.py", wait=False)
    if await hub.read_line():
        await hub.write_line("rev")
        await asyncio.sleep(3)
        await hub.write_line("fwd")
        await asyncio.sleep(3)
        await hub.write_line("bye")
    print(await hub.read_line())


asyncio.run(main())

hub.py

from pybricks.pupdevices import Motor
from pybricks.parameters import Port

# Standard MicroPython modules
from usys import stdin, stdout

motor = Motor(Port.A)

# Program is ready
stdout.buffer.write(b"OK\n")

while True:
    # Read three bytes.
    cmd = stdin.readline()

    # Decide what to do based on the command.
    if cmd == b"fwd\n":
        motor.dc(50)
    elif cmd == b"rev\n":
        motor.dc(-50)
    elif cmd == b"bye\n":
        break

    # Send a response.
    stdout.buffer.write(b"OK\n")
The OK is still at the end instead of the beginning on the hub program and the PC program does not wait for OK before sending each command. |
Like this?

main.py

import asyncio
import os
from pybricksdev.ble import find_device
from pybricksdev.connections.pybricks import PybricksHub

os.environ["TQDM_DISABLE"] = "1"


async def connect_to_hub():
    try:
        device = await find_device()
        hub = PybricksHub()
        await hub.connect(device)
        return hub
    except asyncio.TimeoutError:
        raise Exception("No device found")
    except OSError:
        raise Exception("Please turn bluetooth on")


async def main():
    hub = await connect_to_hub()
    await hub.run("hub.py", wait=False)
    if await hub.read_line():
        await hub.write_line("rev")
    if await hub.read_line():
        await asyncio.sleep(3)
        await hub.write_line("fwd")
    if await hub.read_line():
        await asyncio.sleep(3)
        await hub.write_line("bye")
    print(await hub.read_line())


asyncio.run(main())

hub.py

from pybricks.pupdevices import Motor
from pybricks.parameters import Port

# Standard MicroPython modules
from usys import stdin, stdout

motor = Motor(Port.A)

# Program is ready
stdout.buffer.write(b"OK\n")

while True:
    # Read three bytes.
    cmd = stdin.readline()

    # Decide what to do based on the command.
    if cmd == b"fwd\n":
        motor.dc(50)
    elif cmd == b"rev\n":
        motor.dc(-50)
    elif cmd == b"bye\n":
        break

    # Send a response.
    stdout.buffer.write(b"OK\n")
This is how I would do it:

main.py

import asyncio
import contextlib
import os

# must be set before tqdm is imported!
os.environ["TQDM_DISABLE"] = "1"

from pybricksdev.ble import find_device
from pybricksdev.connections.pybricks import PybricksHub


async def connect_to_hub():
    try:
        device = await find_device()
        hub = PybricksHub()
        await hub.connect(device)
        return hub
    except asyncio.TimeoutError:
        raise RuntimeError("No device found")
    except OSError:
        raise RuntimeError("Please turn bluetooth on")


async def send_command(hub: PybricksHub, cmd: str):
    # wait until the hub reports that it is ready for the next command
    line = await asyncio.wait_for(hub.read_line(), timeout=5)

    if line != "OK":
        raise RuntimeError(f"Unexpected response: '{line}'")

    await hub.write_line(cmd)


async def stop_if_running(hub: PybricksHub):
    try:
        await hub.stop_user_program()
    except Exception:
        # ignore error, e.g. if hub is already disconnected
        pass


async def main():
    async with contextlib.AsyncExitStack() as stack:
        hub = await connect_to_hub()
        stack.push_async_callback(hub.disconnect)

        await hub.run("hub.py", print_output=False, wait=False)
        stack.push_async_callback(stop_if_running, hub)

        await send_command(hub, "rev")
        await asyncio.sleep(3)
        await send_command(hub, "fwd")
        await asyncio.sleep(3)
        await send_command(hub, "bye")


asyncio.run(main())

hub.py

from pybricks.pupdevices import Motor
from pybricks.parameters import Port

# Standard MicroPython modules
from usys import stdin

motor = Motor(Port.A)

while True:
    # let PC know we are ready for a command
    print("OK")

    # wait for command from PC
    cmd = stdin.readline().strip()

    # Decide what to do based on the command.
    if cmd == "fwd":
        motor.dc(50)
    elif cmd == "rev":
        motor.dc(-50)
    elif cmd == "bye":
        break
Hi, unfortunately I still have a problem with my code. When I run this, an error occurs in the console.

main.py

import asyncio
import os

os.environ["TQDM_DISABLE"] = "1"

from pybricksdev.ble import find_device
from pybricksdev.connections.pybricks import PybricksHub


class MyHub:
    def __init__(self):
        print("MyHub class initialized")

    # Get response from hub
    async def read_line(self):
        return await self.hub.read_line()

    # Send message to hub
    async def write_line(self, value: str):
        await self.hub.write_line(value)

    # Connect with hub
    async def connect(self):
        try:
            device = await find_device()
            self.hub = PybricksHub()
            await self.hub.connect(device)
            print("Connected to hub successfully")
            await self.hub.run("hub.py", print_output=False, wait=False)

            # Wait for hub loading the program
            response = await self.read_line()
            if response != "OK":
                print(f"Unexpected hub response: {response}")
                await self.disconnect()
                exit()
            else:
                print("Running hub script...")
        except asyncio.TimeoutError:
            print("Hub not found")
            exit()
        except OSError:
            print("Bluetooth is turned off")
            exit()

    # Disconnect from hub
    async def disconnect(self):
        print("Disconnecting from hub...")
        await self.hub.stop_user_program()
        await self.hub.disconnect()


myhub = MyHub()
asyncio.run(myhub.connect())
asyncio.run(myhub.write_line("rev"))
asyncio.run(myhub.read_line())
asyncio.run(myhub.disconnect())

Error
|
You can only use one |
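Each `asyncio.run()` call creates a fresh event loop and closes it when the call returns, so an object opened in one call generally cannot be used in the next. The sketch below assumes that this is the cause of the error with the four separate `asyncio.run()` calls. `FakeConnection` is a hypothetical stand-in that simply remembers which loop it was opened on; it is not the pybricksdev API.

```python
import asyncio


class FakeConnection:
    """Hypothetical stand-in for a hub connection bound to one event loop."""

    def __init__(self):
        self.loop = None

    async def connect(self):
        # Remember which event loop the connection was opened on.
        self.loop = asyncio.get_running_loop()

    async def write_line(self, text):
        if asyncio.get_running_loop() is not self.loop:
            raise RuntimeError("connection belongs to a different event loop")
        return f"sent {text}"


async def main():
    # Everything happens inside one coroutine, hence one event loop.
    conn = FakeConnection()
    await conn.connect()
    return await conn.write_line("rev")


def broken():
    # Two asyncio.run() calls mean two different event loops.
    conn = FakeConnection()
    asyncio.run(conn.connect())
    try:
        asyncio.run(conn.write_line("rev"))
    except RuntimeError as exc:
        return str(exc)
    return "no error"
```

The practical consequence is to put connect, write, read and disconnect into a single `async def main()` and call `asyncio.run(main())` once at the bottom of the script.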
Unfortunately I get an error from the hub

hub.py

from pybricks.pupdevices import Motor
from pybricks.parameters import Port, Color
from pybricks.hubs import InventorHub
from pybricks.tools import wait
from usys import stdin
import threading
import random

# [...]

# Setup hub
hub = InventorHub()
hub.light.off()

# Face animation
animatedFaceFrames = [
    ((100, 100, 0, 100, 100), (100, 100, 0, 100, 100), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0)),
    ((0, 0, 0, 0, 0), (70, 70, 0, 70, 70), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0)),
    ((0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0)),
    ((0, 0, 0, 0, 0), (80, 80, 0, 80, 80), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0)),
    ((100, 100, 0, 100, 100), (100, 100, 0, 100, 100), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0)),
]


def animatedFaceController():
    while True:
        for frame in animatedFaceFrames:
            hub.display.icon(frame)
            wait(50)
        wait(random.uniform(1, 5))


animatedFaceThread = threading.Thread(target=animatedFaceController)
animatedFaceThread.start()

# [...]

Error
What's wrong with multithreading? |
Pybricks does not support threading. The current beta version supports cooperative multitasking with coroutines (async/await). I don't think we have much documentation on it yet though. |
Can you show me the code with this cooperative multitasking? |
These are independent applications. One does not replace the other.
from pybricks.pupdevices import Motor
from pybricks.parameters import Port, Color
from pybricks.hubs import InventorHub
from pybricks.tools import wait, multitask, run_task
from usys import stdin
import random

# [...]

# Setup hub
hub = InventorHub()
hub.light.off()

# Face animation
animatedFaceFrames = [
    ((100, 100, 0, 100, 100), (100, 100, 0, 100, 100), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0)),
    ((0, 0, 0, 0, 0), (70, 70, 0, 70, 70), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0)),
    ((0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0)),
    ((0, 0, 0, 0, 0), (80, 80, 0, 80, 80), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0)),
    ((100, 100, 0, 100, 100), (100, 100, 0, 100, 100), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0)),
]


async def animatedFaceController():
    while True:
        for frame in animatedFaceFrames:
            hub.display.icon(frame)
            await wait(50)
        await wait(random.uniform(1, 5))


async def other_task():
    ...


async def main():
    await multitask(
        animatedFaceController(),
        other_task(),
    )


run_task(main())
So can I just install the beta firmware while the normal firmware is still installed? And how can I then tell the hub to use the beta firmware? Or have I misunderstood something? |
You have to flash the firmware on the hub to change it. You can't have two firmwares installed at the same time. You can read the current version of the firmware on the hub in the status bar of the Pybricks Code/Pybricks Beta web apps, or you can write a program:

from pybricks import version

print(version)
Thanks, but now I get the following error:
How can I generate a random number without the random module? |
Change |
I still get an error:

Error

hub.py

from pybricks.pupdevices import Motor
from pybricks.parameters import Port, Color
from pybricks.hubs import InventorHub
from pybricks.tools import wait, multitask, run_task
from usys import stdin
import urandom

# Motor for moving left and right
motor_a = Motor(Port.A)
# Motor for the pen
motor_b = Motor(Port.B)
# Motor for moving up and down
motor_c = Motor(Port.C)

# Setup hub
hub = InventorHub()
hub.light.off()

# Robi face animation
animatedFaceFrames = [
    ((100, 100, 0, 100, 100), (100, 100, 0, 100, 100), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0)),
    ((0, 0, 0, 0, 0), (70, 70, 0, 70, 70), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0)),
    ((0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0)),
    ((0, 0, 0, 0, 0), (80, 80, 0, 80, 80), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0)),
    ((100, 100, 0, 100, 100), (100, 100, 0, 100, 100), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0)),
]


# Thread that does the face animation
async def animatedFaceController():
    while True:
        for frame in animatedFaceFrames:
            hub.display.icon(frame)
            wait(50)
        wait(urandom.uniform(1, 5))


# Thread that does the main stuff, like writing
async def main():
    while True:
        # Let PC know we are ready for a command
        print("OK")

        # Read all bytes
        command: str = stdin.readline().strip()

        # [...]


# Controller for starting the threads
async def threadController():
    await multitask(animatedFaceController(), main())


# Running the controller
run_task(threadController())
Blocking functions like I'm not really sure why you would be getting a memory error other than from memory fragmentation. Adding the required awaits should help with this since the main |
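Why a missing `await` matters can be seen with standard `asyncio` on a PC. This is only an analogy, under the assumption that `await wait(...)` from `pybricks.tools` plays the same role on the hub that `await asyncio.sleep(...)` plays here; the function names are made up for the illustration. A background task records a tick every 10 ms, and we count how many ticks it manages while another step either blocks or waits cooperatively.

```python
import asyncio
import time


async def ticker(ticks):
    # Background task: records a tick every 10 ms, as long as it gets to run.
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def run_with(step):
    ticks = []
    task = asyncio.create_task(ticker(ticks))
    await asyncio.sleep(0)  # let the ticker record its first tick
    await step()
    task.cancel()
    return len(ticks)


async def blocking_step():
    time.sleep(0.1)  # blocks the whole event loop, no other task can run


async def cooperative_step():
    await asyncio.sleep(0.1)  # yields to other tasks while waiting
```

With `blocking_step` the ticker only gets its very first tick in, because nothing else can run during the blocking call. With `cooperative_step` it keeps ticking during the wait. In the hub program, the animation task and the command task would presumably interact in the same way.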
hub.py

from pybricks.pupdevices import Motor
from pybricks.parameters import Port, Color
from pybricks.hubs import InventorHub
from pybricks.tools import wait, multitask, run_task, read_input_byte
import urandom

# Motor for moving left and right
motor_a = Motor(Port.A)
# Motor for the pen
motor_b = Motor(Port.B)
# Motor for moving up and down
motor_c = Motor(Port.C)

# Setup hub
hub = InventorHub()
hub.light.off()

# Robi face animation
animatedFaceFrames = [
    ((100, 100, 0, 100, 100), (100, 100, 0, 100, 100), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0)),
    ((0, 0, 0, 0, 0), (70, 70, 0, 70, 70), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0)),
    ((0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0)),
    ((0, 0, 0, 0, 0), (80, 80, 0, 80, 80), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0)),
    ((100, 100, 0, 100, 100), (100, 100, 0, 100, 100), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0)),
]


# Thread that does the face animation
async def animatedFaceController():
    while True:
        for frame in animatedFaceFrames:
            hub.display.icon(frame)
            await wait(50)
        await wait(urandom.uniform(1, 5))


# Thread that does the main stuff, like writing
async def main():
    while True:
        # Let PC know we are ready for a command
        print("OK")

        # Read all bytes
        command: str = read_input_byte()

        class Interpreter:
            def parseCommand(command: str):
                split = command.split("(")
                cmd = split[0]
                args = [item.strip() for item in split[1].rstrip(")").split(",")]

                if cmd != "penUp" and cmd != "penDown" and cmd != "led_on" and cmd != "led_off":
                    # Not enough arguments are given
                    if len(args) < 2:
                        print(f"Two arguments are required in the function {cmd}")
                        return

                    # Arguments aren't a number
                    try:
                        args = [int(item) for item in args]
                    except ValueError:
                        print(f"Arguments of the function {cmd} must be a number")
                        return

                if cmd == "penUp":
                    motor_b.run_target(target_angle=30, speed=100)
                elif cmd == "penDown":
                    motor_b.run_target(target_angle=0, speed=100)
                elif cmd == "moveUp":
                    motor_c.run_angle(rotation_angle=args[0], speed=args[1])
                elif cmd == "moveDown":
                    motor_c.run_angle(rotation_angle=-args[0], speed=args[1])
                elif cmd == "moveLeft":
                    motor_a.run_angle(rotation_angle=args[0], speed=args[1])
                elif cmd == "moveRight":
                    motor_a.run_angle(rotation_angle=-args[0], speed=args[1])
                elif cmd == "led_on":
                    hub.light.on(Color.CYAN)
                elif cmd == "led_off":
                    hub.light.off()
                else:
                    print(f"Unknown function in config: {cmd}")

        if "|" in command:
            print("Threading is not supported yet. Please remove the commands with the | symbol")
            # split = command.split("|")
            # for command in split:
            #     _thread.start_new_thread(Interpreter.parseCommand(command))
        else:
            Interpreter.parseCommand(command)

        await wait(0)


# Controller for starting the threads
async def threadController():
    await multitask(animatedFaceController(), main())


# Running the controller
run_task(threadController())

Error
|
Defining a class inside of a while loop doesn't seem like a good idea since it would redefine the class on each loop. Since you never create an instance of the class, it seems like it would be better to just define the
So to replace ...
CR = ord('\r')
LF = ord('\n')
...
async def read_line():
    line = bytearray()

    while True:
        b = read_input_byte()

        # no input was available yet
        if b is None:
            # give other tasks a chance to run
            await wait(0)
            # then check for another byte
            continue

        # ignore carriage return
        if b == CR:
            continue

        # line feed indicates end of line
        if b == LF:
            return str(line, 'utf8')

        line.append(b)

Then use it like this:

async def main():
    while True:
        # Let PC know we are ready for a command
        print("OK")

        # wait for a command from the PC
        command = await read_line()
        ...
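The line-assembly logic can be checked on a PC before it goes onto the hub. In the sketch below, `make_fake_reader` is a hypothetical helper that imitates `read_input_byte()` by returning `None` on every other call, as if the next byte had not arrived yet, and `asyncio.sleep(0)` stands in for `await wait(0)`. The reader is passed in as a parameter only so that the sketch can run without Pybricks.

```python
import asyncio

CR = ord("\r")
LF = ord("\n")


def make_fake_reader(data):
    # Hypothetical stand-in for read_input_byte(): returns None on every
    # other call to imitate input that has not arrived yet.
    pending = list(data)
    state = {"skip": True}

    def read_input_byte():
        state["skip"] = not state["skip"]
        if state["skip"] or not pending:
            return None
        return pending.pop(0)

    return read_input_byte


async def read_line(read_input_byte):
    line = bytearray()
    while True:
        b = read_input_byte()
        if b is None:
            # Nothing available yet: yield so other tasks can run.
            await asyncio.sleep(0)
            continue
        if b == CR:
            continue  # ignore carriage return
        if b == LF:
            return str(line, "utf8")  # line feed ends the line
        line.append(b)


async def demo():
    reader = make_fake_reader(b"fwd\r\nbye\n")
    return [await read_line(reader), await read_line(reader)]
```

`asyncio.run(demo())` yields the two commands with the line endings removed, whether the sender used `\r\n` or a bare `\n`.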
Problem

At least there are no more errors now, but the code still doesn't work as it should. The code no longer waits for the motor actions in await motor_b.run_target(target_angle=30, speed=100)

hub.py

from pybricks.pupdevices import Motor
from pybricks.parameters import Port, Color
from pybricks.hubs import InventorHub
from pybricks.tools import wait, multitask, run_task, read_input_byte
import urandom

# Motor for moving left and right
motor_a = Motor(Port.A)
# Motor for the pen
motor_b = Motor(Port.B)
# Motor for moving up and down
motor_c = Motor(Port.C)

# Setup hub
hub = InventorHub()
hub.light.off()

# Robi face animation
animatedFaceFrames = [
    ((100, 100, 0, 100, 100), (100, 100, 0, 100, 100), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0)),
    ((0, 0, 0, 0, 0), (70, 70, 0, 70, 70), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0)),
    ((0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0)),
    ((0, 0, 0, 0, 0), (80, 80, 0, 80, 80), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0)),
    ((100, 100, 0, 100, 100), (100, 100, 0, 100, 100), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0)),
]


# Interprets the commands
class Interpreter:
    def parseCommand(command: str):
        split = command.split("(")
        cmd = split[0]
        args = [item.strip() for item in split[1].rstrip(")").split(",")]

        if cmd != "penUp" and cmd != "penDown" and cmd != "led_on" and cmd != "led_off":
            # Not enough arguments are given
            if len(args) < 2:
                print(f"Two arguments are required in the function {cmd}")
                return

            # Arguments aren't a number
            try:
                args = [int(item) for item in args]
            except ValueError:
                print(f"Arguments of the function {cmd} must be a number")
                return

        # Actions to be executed based on the incoming command
        if cmd == "penUp":
            motor_b.run_target(target_angle=30, speed=100)
        elif cmd == "penDown":
            motor_b.run_target(target_angle=0, speed=100)
        elif cmd == "moveUp":
            motor_c.run_angle(rotation_angle=args[0], speed=args[1])
        elif cmd == "moveDown":
            motor_c.run_angle(rotation_angle=-args[0], speed=args[1])
        elif cmd == "moveLeft":
            motor_a.run_angle(rotation_angle=args[0], speed=args[1])
        elif cmd == "moveRight":
            motor_a.run_angle(rotation_angle=-args[0], speed=args[1])
        elif cmd == "led_on":
            hub.light.on(Color.CYAN)
        elif cmd == "led_off":
            hub.light.off()
        else:
            print(f"Unknown function in config: {cmd}")


# Reads the message from the pc
async def read_line():
    line = bytearray()
    CR = ord("\r")
    LF = ord("\n")

    while True:
        b = read_input_byte()

        # no input was available yet
        if b is None:
            # give other tasks a chance to run
            await wait(0)
            # then check for another byte
            continue

        # ignore carriage return
        if b == CR:
            continue

        # line feed indicates end of line
        if b == LF:
            return str(line, "utf8")

        line.append(b)


# Thread that does the face animation
async def animatedFaceController():
    while True:
        for frame in animatedFaceFrames:
            hub.display.icon(frame)
            await wait(50)
        await wait(urandom.uniform(1, 5))


# Thread that does the main stuff, like writing
async def main():
    while True:
        # Let PC know we are ready for a command
        print("OK")

        # Read the message from the pc
        command: str = await read_line()

        # Parse the message from the pc
        if "|" in command:
            print("Threading is not supported yet. Please remove the commands with the | symbol")
        else:
            Interpreter.parseCommand(command)


# Controller for starting the threads
async def threadController():
    await multitask(animatedFaceController(), main())


# Running the controller
run_task(threadController())
Yes, any motor method that starts with |
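Combining this with the earlier suggestion to use a plain function instead of a class, the interpreter could be restructured as a module-level coroutine that parses the command and awaits the motor call before returning. The following is a minimal sketch that runs on a PC: `FakeMotor`, `parse_command` and `handle_command` are hypothetical names, and it is an assumption that the real `run_angle` call on the hub would be awaited in the same way inside an async program.

```python
import asyncio


class FakeMotor:
    """Hypothetical stand-in for a Pybricks Motor; it only records calls."""

    def __init__(self):
        self.calls = []

    async def run_angle(self, speed, rotation_angle):
        await asyncio.sleep(0)  # pretend the move takes some time
        self.calls.append(("run_angle", speed, rotation_angle))


def parse_command(text):
    # "moveUp(90, 200)" -> ("moveUp", [90, 200]); "penUp()" -> ("penUp", [])
    name, _, rest = text.partition("(")
    raw = [item.strip() for item in rest.rstrip(")").split(",")]
    return name.strip(), [int(item) for item in raw if item]


async def handle_command(text, motor):
    name, args = parse_command(text)
    if name == "moveUp":
        # Awaiting means the next command is not read until the move is done.
        await motor.run_angle(rotation_angle=args[0], speed=args[1])
    elif name == "moveDown":
        await motor.run_angle(rotation_angle=-args[0], speed=args[1])
    else:
        raise ValueError(f"Unknown command: {name}")


async def demo():
    motor = FakeMotor()
    await handle_command("moveUp(90, 200)", motor)
    await handle_command("moveDown(45, 100)", motor)
    return motor.calls
```

In the hub program's `main()` loop this would be called as `await handle_command(command, motor_c)`, so the "OK" for the next command is only printed once the movement has finished.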
Thanks! The LED on my hub is blinking orange, what does that mean? |
I want to run code on my hub using a Python script instead of the command. How is that possible?