-
Notifications
You must be signed in to change notification settings - Fork 28
/
Copy pathcrazyflie.py
80 lines (64 loc) · 3.43 KB
/
crazyflie.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
#!/usr/bin/env python
import rospy
import numpy as np
from crazyflie_driver.srv import *
from crazyflie_driver.msg import TrajectoryPolynomialPiece
def arrayToGeometryPoint(a):
return geometry_msgs.msg.Point(a[0], a[1], a[2])
class Crazyflie:
def __init__(self, prefix, tf):
self.prefix = prefix
self.tf = tf
rospy.wait_for_service(prefix + "/set_group_mask")
self.setGroupMaskService = rospy.ServiceProxy(prefix + "/set_group_mask", SetGroupMask)
rospy.wait_for_service(prefix + "/takeoff")
self.takeoffService = rospy.ServiceProxy(prefix + "/takeoff", Takeoff)
rospy.wait_for_service(prefix + "/land")
self.landService = rospy.ServiceProxy(prefix + "/land", Land)
rospy.wait_for_service(prefix + "/stop")
self.stopService = rospy.ServiceProxy(prefix + "/stop", Stop)
rospy.wait_for_service(prefix + "/go_to")
self.goToService = rospy.ServiceProxy(prefix + "/go_to", GoTo)
rospy.wait_for_service(prefix + "/upload_trajectory")
self.uploadTrajectoryService = rospy.ServiceProxy(prefix + "/upload_trajectory", UploadTrajectory)
rospy.wait_for_service(prefix + "/start_trajectory")
self.startTrajectoryService = rospy.ServiceProxy(prefix + "/start_trajectory", StartTrajectory)
rospy.wait_for_service(prefix + "/update_params")
self.updateParamsService = rospy.ServiceProxy(prefix + "/update_params", UpdateParams)
def setGroup(self, groupMask):
self.setGroupMaskService(groupMask)
def takeoff(self, targetHeight, duration, groupMask = 0):
self.takeoffService(groupMask, targetHeight, rospy.Duration.from_sec(duration))
def land(self, targetHeight, duration, groupMask = 0):
self.landService(groupMask, targetHeight, rospy.Duration.from_sec(duration))
def stop(self, groupMask = 0):
self.stopService(groupMask)
def goTo(self, goal, yaw, duration, relative = False, groupMask = 0):
gp = arrayToGeometryPoint(goal)
self.goToService(groupMask, relative, gp, yaw, rospy.Duration.from_sec(duration))
def uploadTrajectory(self, trajectoryId, pieceOffset, trajectory):
pieces = []
for poly in trajectory.polynomials:
piece = TrajectoryPolynomialPiece()
piece.duration = rospy.Duration.from_sec(poly.duration)
piece.poly_x = poly.px.p
piece.poly_y = poly.py.p
piece.poly_z = poly.pz.p
piece.poly_yaw = poly.pyaw.p
pieces.append(piece)
self.uploadTrajectoryService(trajectoryId, pieceOffset, pieces)
def startTrajectory(self, trajectoryId, timescale = 1.0, reverse = False, relative = True, groupMask = 0):
self.startTrajectoryService(groupMask, trajectoryId, timescale, reverse, relative)
def position(self):
self.tf.waitForTransform("/world", "/cf" + str(self.id), rospy.Time(0), rospy.Duration(10))
position, quaternion = self.tf.lookupTransform("/world", "/cf" + str(self.id), rospy.Time(0))
return np.array(position)
def getParam(self, name):
return rospy.get_param(self.prefix + "/" + name)
def setParam(self, name, value):
rospy.set_param(self.prefix + "/" + name, value)
self.updateParamsService([name])
def setParams(self, params):
for name, value in params.iteritems():
rospy.set_param(self.prefix + "/" + name, value)
self.updateParamsService(params.keys())