# equipment.py -- RS-232 (pyserial) and LAN (TCP socket) equipment connection helpers.
import serial
import time
import socket
class EquipmentRS232(serial.Serial):
    """Line-oriented command channel to an instrument over a serial port.

    The actual traffic goes through ``self.ser``, a separate ``serial.Serial``
    handle configured for 8 data bits, no parity and one stop bit. The
    inherited ``serial.Serial`` base object is initialised without a port and
    is only kept so existing ``isinstance`` checks keep working.

    Attributes:
        portNumber: Port identifier handed to pyserial (first positional arg).
        baudRate: Baud rate, coerced to ``int`` (second positional arg).
        timeout: Timeout forwarded to pyserial (third positional arg).
        userInput: Legacy attribute, always initialised to ``''``.
        ser: The ``serial.Serial`` handle used for all reads and writes.
    """

    def __init__(self, *args):
        """Store the connection settings and open the port.

        Args:
            *args: ``(port, baud_rate, timeout)`` in that order.

        Raises:
            IndexError: If fewer than three positional arguments are given.
            serial.SerialException: If the port cannot be opened.
        """
        super().__init__()
        self.portNumber = args[0]
        self.baudRate = int(args[1])
        self.timeout = args[2]
        self.userInput = ''
        self.connect()

    def connect(self):
        """Create ``self.ser`` with the stored settings and open the port.

        Raises:
            serial.SerialException: If the port cannot be opened.
        """
        # Release a handle left over from an earlier connect() so repeated
        # calls do not leak an open port.
        previous = getattr(self, 'ser', None)
        if previous is not None and previous.is_open:
            previous.close()

        # The port is deliberately NOT passed to the constructor: pyserial
        # opens the port immediately when one is given, which would make the
        # explicit open() below fail with "Port is already open".
        self.ser = serial.Serial(
            baudrate=self.baudRate,
            stopbits=serial.STOPBITS_ONE,
            parity=serial.PARITY_NONE,
            bytesize=serial.EIGHTBITS,
            timeout=self.timeout,
        )
        self.ser.port = self.portNumber
        print("Attempting communication with power supply..")
        self._open_port()

    def reconnect(self):
        """Re-apply the stored port name to ``self.ser`` and (re)open it.

        Raises:
            serial.SerialException: If the port cannot be opened.
        """
        self.ser.port = self.portNumber
        self._open_port()

    def _open_port(self):
        """Open ``self.ser``; if it is already open, close and reopen it."""
        # Checking is_open (instead of catching every exception from open())
        # lets genuine failures such as a missing device reach the caller.
        if self.ser.is_open:
            self.ser.close()
            self.ser.open()
            print('port was already open, was closed and opened again')
        else:
            self.ser.open()
            print('port opened successfully')

    def send(self, userInput):
        """Send one command and return the next line read from the port.

        Args:
            userInput: Command text without a terminator; ``"\\r\\n"`` is
                appended before sending.

        Returns:
            The decoded result of ``self.ser.readline()``.
        """
        # Drop stale bytes in both directions so the reply read below
        # belongs to this command.
        self.ser.reset_input_buffer()
        self.ser.reset_output_buffer()
        self.ser.write((userInput + "\r\n").encode())
        return self.ser.readline().decode()

    def send_without_read(self, userInput):
        """Send one command without reading a reply.

        Args:
            userInput: Command text without a terminator; ``"\\r\\n"`` is
                appended before sending.
        """
        self.ser.reset_input_buffer()
        # Write the caller's command, not the always-empty self.userInput.
        self.ser.write((userInput + "\r\n").encode())

    def close(self):
        """Close the underlying serial handle, if one was created."""
        # close() can be reached during object finalization even when
        # connect() failed before assigning self.ser, so tolerate its absence.
        handle = getattr(self, 'ser', None)
        if handle is not None:
            handle.close()
class EquipmentLAN(socket.socket):
    """TCP command channel to an instrument.

    All traffic goes through ``self.device``, a separate IPv4 stream socket.
    ``socket.socket.__init__`` is intentionally not called on the instance
    itself, so no second, unused OS socket is created; the base class is kept
    only so existing ``isinstance`` checks keep working.

    Attributes:
        device: The ``socket.socket`` used for all I/O.
        IP_ADDRESS: Host to connect to (first positional arg).
        PORT_NUMBER: TCP port to connect to (second positional arg).
        DEVICE_TYPE: Label used in printed messages (third positional arg).
        TIME_OUT: Socket timeout in seconds (fourth positional arg); ``None``
            selects the 2 second fallback.
    """

    # Fallback timeout, in seconds, when TIME_OUT is None.
    _DEFAULT_TIMEOUT = 2

    def __init__(self, *args):
        """Create the socket and store the connection settings.

        Args:
            *args: ``(ip_address, port_number, device_type, time_out)``.

        Raises:
            IndexError: If fewer than four positional arguments are given.
        """
        self.device = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.IP_ADDRESS = args[0]
        self.PORT_NUMBER = args[1]
        self.DEVICE_TYPE = args[2]
        self.TIME_OUT = args[3]

    def connect(self, DEVICE_TYPE=None, device=None):
        """Connect ``device`` to the stored ``IP_ADDRESS``/``PORT_NUMBER``.

        Args:
            DEVICE_TYPE: Label for the printed messages; defaults to
                ``self.DEVICE_TYPE``.
            device: Socket to connect; defaults to ``self.device``.

        Raises:
            OSError: If the connection fails or times out.
        """
        if DEVICE_TYPE is None:
            DEVICE_TYPE = self.DEVICE_TYPE
        if device is None:
            device = self.device
        timeout = self._DEFAULT_TIMEOUT if self.TIME_OUT is None else self.TIME_OUT

        print("Establishing connection with {}!".format(DEVICE_TYPE))
        # The timeout is applied before connecting so that an unreachable
        # host produces a timeout error instead of blocking indefinitely.
        device.settimeout(timeout)
        device.connect((self.IP_ADDRESS, int(self.PORT_NUMBER)))
        print('Connected successfully!')

    def send(self, command, device=None, DEVICE_TYPE=None):
        """Send ``command``, then query and print the instrument error status.

        Args:
            command: Command text, sent as-is (no terminator is appended).
            device: Socket to use; defaults to ``self.device``.
            DEVICE_TYPE: Label prefixed to the printed reply; defaults to
                ``self.DEVICE_TYPE``.

        Returns:
            The decoded reply to the ``SYSTem:ERRor?`` query.

        Raises:
            OSError: If sending or receiving fails or times out.
        """
        if device is None:
            device = self.device
        if DEVICE_TYPE is None:
            DEVICE_TYPE = self.DEVICE_TYPE

        device.sendall(command.encode())
        time.sleep(.1)  # short pause before issuing the follow-up query
        device.sendall('SYSTem:ERRor?\r\n'.encode())
        time.sleep(.1)  # short pause before reading the reply
        reply = device.recv(1024).decode()
        print("{}:{}".format(DEVICE_TYPE, reply))
        return reply

    def close(self, device=None):
        """Close ``device`` (``self.device`` by default) and report it."""
        if device is None:
            device = self.device
        device.close()
        print("CONNECTION TERMINATED!")
        time.sleep(.1)
"""
u.ser.write("ADR 6\r\n".encode())
print(u.ser.readline().decode())
u.ser.write("IDN?\r\n".encode())
print(u.ser.readline().decode())
"""