Skip to content
This repository has been archived by the owner on Sep 7, 2023. It is now read-only.

Commit

Permalink
Merge pull request #61 from nautobot/pk-e2e-png
Browse files Browse the repository at this point in the history
Add PNG path functionality for IP Fabric v4
  • Loading branch information
pke11y authored Jan 18, 2022
2 parents 78ba695 + 03ee356 commit 52612d5
Show file tree
Hide file tree
Showing 3 changed files with 173 additions and 3 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,16 @@

## v1.1.0 - 2022-01-17

### Deprecated

- The `end-to-end-path` command is being deprecated and will be available for users of IP Fabric v3.8. Future path simulation capability will be developed in the `pathlookup` command for IP Fabric v4.

### Added

- #60 - Added `find-host` command.
- #61 - Added `pathlookup` command to get PNG for path lookups. Supported in IP Fabric v4.


## v1.0.0 - 2021-12-06

Initial release
59 changes: 59 additions & 0 deletions nautobot_chatops_ipfabric/ipfabric.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ def get_response_json(self, method, url, payload, params=None):
response = requests.request(method, self.host_url + url, json=payload, params=params, headers=self.headers)
return response.json()

def get_response_raw(self, method, url, payload, params=None):
"""Get request and return response dict."""
headers = {**self.headers}
headers["Accept"] = "*/*"
return requests.request(method, self.host_url + url, json=payload, params=params, headers=headers)

def get_devices_info(self, snapshot_id="$last", limit=DEFAULT_PAGE_LIMIT):
"""Return Device info."""
logger.debug("Received device list request")
Expand All @@ -41,6 +47,16 @@ def get_devices_info(self, snapshot_id="$last", limit=DEFAULT_PAGE_LIMIT):
}
return self.get_response("/api/v1/tables/inventory/devices", payload)

def get_os_version(self):
"""Return IP Fabric OS version info."""
logger.debug("Received OS version request")

payload = {}
response = self.get_response_json("GET", "/api/v1/os/version", payload)
os_version = float(response.get("version", "0.0").rpartition(".")[0])
logger.debug("Your IP Fabric OS version is: %s", os_version)
return os_version

def get_device_inventory(self, search_key, search_value, snapshot_id="$last", limit=DEFAULT_PAGE_LIMIT):
"""Return Device info."""
logger.debug("Received device inventory request")
Expand Down Expand Up @@ -116,6 +132,42 @@ def get_path_simulation(
payload = {}
return self.get_response_json("GET", "/api/v1/graph/end-to-end-path", payload, params)

def get_pathlookup(
self, src_ip, dst_ip, src_port, dst_port, protocol, snapshot_id
): # pylint: disable=too-many-arguments
"""Return pathlookup simulation as PNG output. Requires v4 IP Fabric server."""
no_png_flags = ["no-dgw", "no-receiver", "no-source"] # a path with these flags results in any empty PNG
payload = {
"snapshot": snapshot_id,
"parameters": {
"type": "pathLookup",
"pathLookupType": "unicast",
"protocol": protocol,
"startingPoint": src_ip,
"startingPort": src_port,
"destinationPoint": dst_ip,
"destinationPort": dst_port,
"groupBy": "siteName",
"networkMode": "true",
"securedPath": "false",
},
}
logger.debug( # pylint: disable=logging-too-many-args
"Received end-to-end PNG path simulation request: ", payload
)

# no params required
params = {}

json_response = self.get_response_json("POST", "/api/v1/graphs", payload, params=params)
pathlookup = json_response.get("pathlookup", {})
png_response = self.get_response_raw("POST", "/api/v1/graphs/png", payload, params=params)

for flag in pathlookup.get("eventsSummary", {}).get("flags"):
if flag in no_png_flags:
return None
return png_response.content

def get_interfaces_errors_info(self, device, snapshot_id="$last", limit=DEFAULT_PAGE_LIMIT):
"""Return bi-directional interface errors info."""
logger.debug("Received interface error counters request")
Expand Down Expand Up @@ -325,6 +377,13 @@ def get_wireless_ssids(self, snapshot_id="$last", limit=DEFAULT_PAGE_LIMIT):

return self.get_response("/api/v1/tables/wireless/radio", payload)

def validate_version(self, operator_func, version):
"""Validate the IP Fabric OS version."""
logger.debug("Validate IP Fabric OS version is %s %s", operator_func, version)

ipfabric_version = self.get_os_version()
return operator_func(ipfabric_version, version)

def get_host(self, search_key, search_value, snapshot_id="$last", limit=DEFAULT_PAGE_LIMIT):
"""Return inventory host information."""
logger.debug("Received host inventory request - %s %s", search_key, search_value)
Expand Down
105 changes: 102 additions & 3 deletions nautobot_chatops_ipfabric/worker.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
"""Worker functions implementing Nautobot "ipfabric" command and subcommands."""
import logging
import tempfile
import os
from datetime import datetime
from operator import ge

from django.conf import settings
from django_rq import job
Expand Down Expand Up @@ -361,6 +365,7 @@ def end_to_end_path(
): # pylint: disable=too-many-arguments, too-many-locals
"""Execute end-to-end path simulation between source and target IP address."""
snapshot_id = get_user_snapshot(dispatcher)
sub_cmd = "end-to-end-path"

dialog_list = [
{
Expand Down Expand Up @@ -390,14 +395,14 @@ def end_to_end_path(
]

if not all([src_ip, dst_ip, src_port, dst_port, protocol]):
dispatcher.multi_input_dialog("ipfabric", "end-to-end-path", "Path Simulation", dialog_list)
dispatcher.multi_input_dialog(f"{BASE_CMD}", f"{sub_cmd}", "Path Simulation", dialog_list)
return CommandStatusChoices.STATUS_SUCCEEDED

dispatcher.send_blocks(
[
*dispatcher.command_response_header(
"ipfabric",
"end-to-end-path",
f"{BASE_CMD}",
f"{sub_cmd}",
[
("src_ip", src_ip),
("dst_ip", dst_ip),
Expand Down Expand Up @@ -439,6 +444,100 @@ def end_to_end_path(
return True


@subcommand_of("ipfabric")
def pathlookup(
dispatcher, src_ip, dst_ip, src_port, dst_port, protocol
): # pylint: disable=too-many-arguments, too-many-locals
"""Path simulation diagram lookup between source and target IP address."""
snapshot_id = get_user_snapshot(dispatcher)
sub_cmd = "pathlookup"
supported_protocols = ["tcp", "udp", "icmp"]
protocols = [(protocol.upper(), protocol) for protocol in supported_protocols]

# identical to dialog_list in end-to-end-path; consolidate dialog_list if maintaining both cmds
dialog_list = [
{
"type": "text",
"label": "Source IP",
},
{
"type": "text",
"label": "Destination IP",
},
{
"type": "text",
"label": "Source Port",
"default": "1000",
},
{
"type": "text",
"label": "Destination Port",
"default": "22",
},
{
"type": "select",
"label": "Protocol",
"choices": protocols,
"default": protocols[0],
},
]

if not all([src_ip, dst_ip, src_port, dst_port, protocol]):
dispatcher.multi_input_dialog(f"{BASE_CMD}", f"{sub_cmd}", "Path Lookup", dialog_list)
return CommandStatusChoices.STATUS_SUCCEEDED

# verify IP address and protocol is valid
if not is_ip(src_ip) or not is_ip(dst_ip):
dispatcher.send_error("You've entered an invalid IP address")
return CommandStatusChoices.STATUS_FAILED
if protocol not in supported_protocols:
dispatcher.send_error(f"You've entered an unsupported protocol: {protocol}")
return CommandStatusChoices.STATUS_FAILED

dispatcher.send_blocks(
[
*dispatcher.command_response_header(
f"{BASE_CMD}",
f"{sub_cmd}",
[
("src_ip", src_ip),
("dst_ip", dst_ip),
("src_port", src_port),
("dst_port", dst_port),
("protocol", protocol),
],
"Path Lookup",
ipfabric_logo(dispatcher),
),
dispatcher.markdown_block(f"{ipfabric_api.host_url}/diagrams/pathlookup"),
]
)

# only supported in IP Fabric OS version 4.0+
try:
if ipfabric_api.validate_version(ge, 4.0):
raw_png = ipfabric_api.get_pathlookup(src_ip, dst_ip, src_port, dst_port, protocol, snapshot_id)
if not raw_png:
raise RuntimeError(
"An error occurred while retrieving the path lookup. Please verify the path using the link above."
)
with tempfile.TemporaryDirectory() as tempdir:
# Note: Microsoft Teams will silently fail if we have ":" in our filename, so the timestamp has to skip them.
time_str = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
img_path = os.path.join(tempdir, f"{sub_cmd}_{time_str}.png")
with open(img_path, "wb") as img_file:
img_file.write(raw_png)
dispatcher.send_image(img_path)
else:
raise RuntimeError(
"Your IP Fabric OS version does not support PNG output. Please try the end-to-end-path command."
)
except (RuntimeError, OSError) as error:
dispatcher.send_error(error)
return CommandStatusChoices.STATUS_FAILED
return True


# ROUTING COMMAND


Expand Down

0 comments on commit 52612d5

Please sign in to comment.