#!/usr/bin/env python3
"""
Module for parsing draft-ietf-grow-yang-bgp-communities style BGP community definitions
Based on code written by Martin Pels
This file is part of the NLNOG Looking Glass code.
Source code: https://github.com/NLNOG/nlnog-lg
Copyright (c) 2022-2024 Stichting NLNOG <[email protected]>
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SH671025ALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
"""
import re
import json

import requests


class BGPCommunityParser:
    """
    An object to keep track of one or more draft-ietf-grow-yang-bgp-communities
    style BGP community definitions and do lookups on them.
    """

    def __init__(self, sources=None):
        self.comm_regular = []
        self.comm_large = []
        self.comm_extended = []
        self.sources = []

        if not sources:
            return
        if not isinstance(sources, list):
            sources = [sources]
        for source in sources:
            self.load_source(source)

    def load_source(self, source: str):
        """
        Load a draft-ietf-grow-yang-bgp-communities style BGP community definition
        from a URL or file.
        """
        jdata = None
        if source.startswith("http://") or source.startswith("https://"):
            jdata = requests.get(source, timeout=5).json()
        else:
            # json.load() needs a file object, not a path string
            with open(source, encoding="utf-8") as jsonfile:
                jdata = json.load(jsonfile)
        self.comm_regular += jdata["draft-ietf-grow-yang-bgp-communities:bgp-communities"]["regular"]
        self.comm_large += jdata["draft-ietf-grow-yang-bgp-communities:bgp-communities"]["large"]
        self.comm_extended += jdata["draft-ietf-grow-yang-bgp-communities:bgp-communities"]["extended"]
        self.sources.append(source)
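
    # Illustrative sketch of the document shape this loader expects, inferred
    # from the keys read above and in the matching helpers below; the actual
    # schema is defined by draft-ietf-grow-yang-bgp-communities and the values
    # here are made up for illustration:
    #
    #   {
    #     "draft-ietf-grow-yang-bgp-communities:bgp-communities": {
    #       "regular": [
    #         {"globaladmin": 64496,
    #          "localadmin": {"fields": [{"name": "action",
    #                                     "pattern": "^666$",
    #                                     "description": "blackhole"}]}}
    #       ],
    #       "large": [],
    #       "extended": []
    #     }
    #   }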

    def __str__(self):
        """
        Simple string representation of the object.
        """
        return f"BGPCommunityParser object with {len(self.sources)} sources, " \
               f"{len(self.comm_regular)} regular, {len(self.comm_large)} large " \
               f"and {len(self.comm_extended)} extended communities"

    def parse_community(self, community: str) -> str:
        """
        Lookup a community string in the loaded community definitions.
        """
        if re.match(r"^\d+:\d+$", community):
            return self.parse_regular_community(community)
        if re.match(r"^\d+:\d+:\d+$", community):
            return self.parse_large_community(community)
        # the type and subtype of an extended community are hexadecimal octets
        if re.match(r"^0x[0-9a-fA-F]{2}:0x[0-9a-fA-F]{2}:\d+:\d+$", community):
            return self.parse_extended_community(community)
        return None
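
    # The three accepted input shapes, with illustrative values taken from the
    # documentation ASN range rather than from any real definition file:
    #   parser.parse_community("64496:666")            # RFC1997 regular community
    #   parser.parse_community("64496:1:20")           # RFC8092 large community
    #   parser.parse_community("0x00:0x02:64496:100")  # RFC4360 extended community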

    def parse_regular_community(self, community: str) -> str:
        """
        Process RFC1997 community
        """
        asn, content = community.split(":", 1)
        found = self._try_candidates_regular(asn, content, self.comm_regular)
        if found:
            fieldvals = self._candidate2fields(content, found["localadmin"])
            return self._print_match(community, found, fieldvals)
        return None

    def parse_large_community(self, community: str) -> str:
        """
        Process RFC8092 community
        """
        asn, content1, content2 = community.split(":", 2)
        found = self._try_candidates_large(asn, content1, content2, self.comm_large)
        if found:
            fieldvals = self._candidate2fields_large(
                content1, content2, found["localdatapart1"], found["localdatapart2"]
            )
            return self._print_match(community, found, fieldvals)
        return None

    def parse_extended_community(self, community: str) -> str:
        """
        Process RFC4360 community
        """
        extype, exsubtype, asn, content = community.split(":", 3)
        found = self._try_candidates_extended(
            extype, exsubtype, asn, content, self.comm_extended
        )
        if found:
            fieldvals = self._candidate2fields(content, found["localadmin"])
            return self._print_match(community, found, fieldvals)
        return None

    def _try_candidates_regular(self, asn: str, content: str, candidates: list):
        """
        Try to find a matching Regular Community amongst candidate JSON definitions
        """
        for candidate in candidates:
            # work on a per-candidate copy so a binary conversion for one
            # candidate does not leak into the comparison for the next one
            contentstring = content
            if asn != str(candidate["globaladmin"]):
                continue
            if "format" in candidate["localadmin"]:
                if candidate["localadmin"]["format"] == "binary":
                    contentstring = self._decimal2bits(content, 16)
            if self._try_candidate_fields(contentstring, candidate["localadmin"]["fields"]):
                return candidate
        return False

    def _try_candidates_large(self, asn, content1, content2, candidates):
        """
        Try to find a matching Large Community amongst candidate JSON definitions
        """
        for candidate in candidates:
            # per-candidate copies, so binary conversions do not leak between candidates
            contentstring1 = content1
            contentstring2 = content2
            if asn != str(candidate["globaladmin"]):
                continue
            if "format" in candidate["localdatapart1"]:
                if candidate["localdatapart1"]["format"] == "binary":
                    contentstring1 = self._decimal2bits(content1, 32)
            if "format" in candidate["localdatapart2"]:
                if candidate["localdatapart2"]["format"] == "binary":
                    contentstring2 = self._decimal2bits(content2, 32)
            if self._try_candidate_fields(
                contentstring1, candidate["localdatapart1"]["fields"]
            ) and self._try_candidate_fields(
                contentstring2, candidate["localdatapart2"]["fields"]
            ):
                return candidate
        return False

    def _try_candidates_extended(self, extype, exsubtype, asn, content, candidates):
        """
        Try to find a matching Extended Community amongst candidate JSON definitions
        """
        for candidate in candidates:
            contentstring = content
            if int(extype, 16) != candidate["type"]:
                continue
            if int(exsubtype, 16) != candidate["subtype"]:
                continue
            if "asn" in candidate:
                if asn != str(candidate["asn"]):
                    continue
            elif "asn4" in candidate:
                if asn != str(candidate["asn4"]):
                    continue
            else:
                continue
            if "format" in candidate["localadmin"]:
                if candidate["localadmin"]["format"] == "binary":
                    if "asn4" in candidate:
                        # a 4-byte ASN leaves 2 bytes of local administrator data
                        contentstring = self._decimal2bits(content, 16)
                    else:
                        contentstring = self._decimal2bits(content, 32)
            if self._try_candidate_fields(
                contentstring, candidate["localadmin"]["fields"]
            ):
                return candidate
        return False

    def _try_candidate_fields(self, content, cfields):
        """
        Try to match fields from a single candidate JSON definition
        """
        pos = 0
        for cfield in cfields:
            if "length" in cfield:
                value = content[pos: pos + cfield["length"]]
            else:
                value = content
            # re-anchor the pattern so it has to match the extracted value exactly
            pattern = cfield["pattern"]
            if pattern.startswith("^"):
                pattern = pattern[1:]
            if pattern.endswith("$"):
                pattern = pattern[:-1]
            if not re.match("^{}$".format(pattern), value):
                # print('{} != {}'.format(pattern, value))
                return False
            if "length" in cfield:
                pos = pos + cfield["length"]
        return True
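
    # Worked example with made-up field names and patterns: given the content
    # string "0000001010011010" (666 as a 16-bit string) and the fields
    #   [{"name": "prefix", "length": 6, "pattern": "000000"},
    #    {"name": "value", "length": 10, "pattern": ".*"}]
    # the first 6 bits "000000" must match "^000000$" and the remaining 10 bits
    # "1010011010" must match "^.*$", so this candidate's fields all match.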

    def _candidate2fields(self, contentbits, clocaladmin):
        """
        Link values from tested community to field names in matched candidate
        """
        fields = {}
        pos = 0
        if "format" in clocaladmin:
            if clocaladmin["format"] == "binary":
                contentbits = self._decimal2bits(contentbits, 16)
        for fid, field in enumerate(clocaladmin["fields"]):
            if "length" in field:
                length = field["length"]
            else:
                length = len(contentbits)
            fields[fid] = contentbits[pos: pos + length]
            pos = pos + length
        return fields

    def _candidate2fields_large(
        self, contentbits1, contentbits2, clocaldatapart1, clocaldatapart2
    ):
        """
        Link values from tested large community to field names in matched candidate
        """
        fields = {}
        if "format" in clocaldatapart1:
            if clocaldatapart1["format"] == "binary":
                contentbits1 = self._decimal2bits(contentbits1, 32)
        if "format" in clocaldatapart2:
            if clocaldatapart2["format"] == "binary":
                contentbits2 = self._decimal2bits(contentbits2, 32)
        pos = 0
        foffset = 0
        for fid, field in enumerate(clocaldatapart1["fields"]):
            if "length" in field:
                length = field["length"]
            else:
                length = len(contentbits1)
            fields[foffset + fid] = contentbits1[pos: pos + length]
            pos = pos + length
        pos = 0
        foffset = len(clocaldatapart1["fields"])
        for fid, field in enumerate(clocaldatapart2["fields"]):
            if "length" in field:
                length = field["length"]
            else:
                length = len(contentbits2)
            fields[foffset + fid] = contentbits2[pos: pos + length]
            pos = pos + length
        return fields

    def _decimal2bits(self, decimal, length):
        """
        Convert decimal value to bit string
        """
        return f"{int(decimal):0{length}b}"

    def _print_match(self, community, candidate, fieldvals):
        """
        Return a matched community description
        """
        output_sections = []
        output_fields = []
        if "localadmin" in candidate:
            for fid, field in enumerate(candidate["localadmin"]["fields"]):
                if "description" in field:
                    output_fields.append(f"{field['name']}={field['description']}")
                else:
                    output_fields.append(f"{field['name']}={fieldvals[fid]}")
            output_sections.append(",".join(output_fields))
        elif "localdatapart1" in candidate:
            offset = 0
            output_fields = []
            for fid, field in enumerate(candidate["localdatapart1"]["fields"]):
                if "description" in field:
                    output_fields.append(f"{field['name']}={field['description']}")
                else:
                    output_fields.append(f"{field['name']}={fieldvals[offset + fid]}")
            output_sections.append(",".join(output_fields))
            offset = len(candidate["localdatapart1"]["fields"])
            output_fields = []
            for fid, field in enumerate(candidate["localdatapart2"]["fields"]):
                if "description" in field:
                    output_fields.append(f"{field['name']}={field['description']}")
                else:
                    output_fields.append(f"{field['name']}={fieldvals[offset + fid]}")
            output_sections.append(",".join(output_fields))
        return f"{':'.join(output_sections)}"