forked from 0x9900/AutoFT8
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsqstatus.py
157 lines (129 loc) · 3.9 KB
/
sqstatus.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
#
# BSD 3-Clause License
#
# Copyright (c) 2021, Fred W6BSD
# All rights reserved.
#
#
import ctypes
import struct
SQ_MAGIC = 0xBADDECAF
SQ_VERSION = 1
SQ_STRUCT = struct.Struct('!IHHHH??10s')
SQ_HEARTBEAT = 0x01
SQ_PAUSE = 0x02
SQ_DATA = 0x08
XMIT_MAXRETRY = 5
class SQStatus:
__xmit__ = 0
"""
SQ Status heart beat (type = 0x0001 heartbeat)
1 magic number uint (4)
2 version ushort (2)
3 packet type ushort (2)
SQ Status data format (type = 0x0002 data)
1 magic number uint (4)
2 version ushort (2)
3 packet type ushort (2)
4 max retry ushort (2)
5 xmit ushort (2)
6 pause bool (1)
7 shutdown bool (1)
8 call utf-8 (10)
"""
def __init__(self):
self._max_tries = XMIT_MAXRETRY
# Private variables
self._call = b''
self._pause = False
self._shutdown = False
# Local variables
self._ip_wsjt = None
self._ip_monit = None
def __repr__(self):
msg = ("{0.__class__} Xmit:{0.xmit} Max_Tries: {0._max_tries} "
"Call: {0.call} Pause: {0._pause}")
return msg.format(self)
def heartbeat(self):
sq_struct = struct.Struct('!IHH')
packet = ctypes.create_string_buffer(sq_struct.size)
sq_struct.pack_into(packet, 0, SQ_MAGIC, SQ_VERSION, SQ_HEARTBEAT)
return packet.raw
def pause(self, flag=True):
self.__xmit__ = 0
self._call = b''
self._pause = flag
sq_struct = struct.Struct('!IHH?')
packet = ctypes.create_string_buffer(sq_struct.size)
sq_struct.pack_into(packet, 0, SQ_MAGIC, SQ_VERSION, SQ_PAUSE, flag)
return packet.raw
def encode(self):
packet = ctypes.create_string_buffer(SQ_STRUCT.size)
SQ_STRUCT.pack_into(packet, 0, SQ_MAGIC, SQ_VERSION, SQ_DATA, self._max_tries,
self.xmit, self._pause, self._shutdown, self._call)
return packet.raw
def decode(self, packet):
sq_header = struct.Struct('!IHH')
buffer = ctypes.create_string_buffer(packet[:sq_header.size], sq_header.size)
data = sq_header.unpack_from(buffer)
magic, version, pkt_type = data[:3]
if magic != SQ_MAGIC or version < SQ_VERSION:
raise IOError("SQS packet error")
if pkt_type == SQ_HEARTBEAT:
return
elif pkt_type == SQ_PAUSE:
sq_struct = struct.Struct('!IHH?')
buffer = ctypes.create_string_buffer(packet, sq_struct.size)
data = sq_struct.unpack_from(buffer)
magic, version, pkt_type, pause = data
self._pause = pause
elif pkt_type == SQ_DATA:
buffer = ctypes.create_string_buffer(packet, SQ_STRUCT.size)
data = SQ_STRUCT.unpack_from(buffer)
magic, version, pkt_type, max_tries, xmit, pause, shutdown, call = data
self._max_tries = max_tries
self.xmit = xmit
self._call = call.strip(b'\0x00')
self._pause = pause
self._shutdown = shutdown
@property
def max_tries(self):
return self._max_tries
@max_tries.setter
def max_tries(self, val):
assert isinstance(val, int)
if not 1 < val < 15:
raise ValueError('The max_tries value must be between 1 and 15')
self._max_tries = val
@property
def xmit(self):
return self.__xmit__
@xmit.setter
def xmit(self, val):
assert isinstance(val, int)
self.__xmit__ = 0 if val < 0 else val
if self.__xmit__ == 0:
self._call = b''
@property
def call(self):
return self._call.decode('utf-8')
@call.setter
def call(self, val):
assert isinstance(val, str)
self._call = val[:10].upper().encode('utf-8')
@property
def ip_wsjt(self):
return self._ip_wsjt
@ip_wsjt.setter
def ip_wsjt(self, val):
assert isinstance(val, tuple), "socket tuple expected"
self._ip_wsjt = val
@property
def ip_monit(self):
return self._ip_monit
@ip_monit.setter
def ip_monit(self, val):
assert isinstance(val, tuple), "socket tuple expected"
self._ip_monit = val
def is_pause(self):
return self._pause