Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(lib): mitre d3fend #1394

Merged
merged 7 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,224 changes: 2,224 additions & 0 deletions backend/library/libraries/d3fend.yaml

Large diffs are not rendered by default.

25 changes: 16 additions & 9 deletions tools/convert_library.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
The first tab shall be named "library_content" and contain the description of the library in the other tabs
library_urn | <urn>
library_version | <version>
library_publication_date | <date>
library_publication_date | <date> (default value: now)
library_locale | <en/fr/...>
library_ref_id | <ref_id>
library_name | <name>
Expand Down Expand Up @@ -112,7 +112,7 @@
import yaml
from pprint import pprint
from collections import defaultdict

import datetime

LIBRARY_VARS = (
"library_urn",
Expand Down Expand Up @@ -160,7 +160,7 @@
description="convert an Excel file in a library for CISO Assistant",
)
parser.add_argument("input_file_name")
parser.add_argument("--compat", action='store_true')
parser.add_argument("--compat", action="store_true")
args = parser.parse_args()

ref_name = re.sub(r"\.\w+$", "", args.input_file_name).lower()
Expand Down Expand Up @@ -477,21 +477,21 @@ def build_ids_set(tab_name):
)
if skip_count:
counter_fix += 1
ref_id_urn = f"node{counter-counter_fix}-{counter_fix}"
ref_id_urn = f"node{counter - counter_fix}-{counter_fix}"
else:
ref_id_urn = (
ref_id.lower().replace(" ", "-")
if ref_id
else f"node{counter-counter_fix}"
else f"node{counter - counter_fix}"
)
urn = f"{root_nodes_urn}:{ref_id_urn}"
else:
if ref_id:
urn = f"{root_nodes_urn}:{ref_id.lower().replace(' ', '-')}"
urn = f"{root_nodes_urn}:{ref_id.lower().replace(' ', '-')}"
else:
p = parent_for_depth[depth]
c = count_for_depth[depth]
urn =f"{p}:{c}"
urn = f"{p}:{c}"
count_for_depth[depth] += 1
if urn in urn_unicity_checker:
print("URN duplicate:", urn)
Expand Down Expand Up @@ -679,7 +679,11 @@ def build_ids_set(tab_name):
score = row[header["score"]].value
name = row[header["name"]].value
description = row[header["description"]].value
description_doc = row[header["description_doc"]].value if "description_doc" in header else None
description_doc = (
row[header["description_doc"]].value
if "description_doc" in header
else None
)
translations = get_translations(header, row)
current_score = {
"score": score,
Expand Down Expand Up @@ -837,6 +841,9 @@ def build_ids_set(tab_name):
library_vars_dict["tab"][x] for x in library_vars_dict["tab"]
]

lib_date = library_vars.get("library_publication_date", datetime.datetime.now())
if type(lib_date) == datetime.datetime:
lib_date = lib_date.date()
Comment on lines +844 to +846
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Use isinstance() for type checking.

Replace type() comparison with isinstance() for better Python practices.

lib_date = library_vars.get("library_publication_date", datetime.datetime.now())
-if type(lib_date) == datetime.datetime:
+if isinstance(lib_date, datetime.datetime):
    lib_date = lib_date.date()
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
lib_date = library_vars.get("library_publication_date", datetime.datetime.now())
if type(lib_date) == datetime.datetime:
lib_date = lib_date.date()
lib_date = library_vars.get("library_publication_date", datetime.datetime.now())
if isinstance(lib_date, datetime.datetime):
lib_date = lib_date.date()
🧰 Tools
🪛 Ruff (0.8.2)

845-845: Use is and is not for type comparisons, or isinstance() for isinstance checks

(E721)

library = {
"urn": library_vars["library_urn"],
"locale": library_vars["library_locale"],
Expand All @@ -845,7 +852,7 @@ def build_ids_set(tab_name):
"description": library_vars["library_description"],
"copyright": library_vars["library_copyright"],
"version": library_vars["library_version"],
"publication_date": library_vars["library_publication_date"].date(),
"publication_date": lib_date,
"provider": library_vars["library_provider"],
"packager": library_vars["library_packager"],
}
Expand Down
219 changes: 219 additions & 0 deletions tools/mitre/d3fend.csv

Large diffs are not rendered by default.

84 changes: 84 additions & 0 deletions tools/mitre/d3fend.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
# d3fend library generator for CISO Assistant

import csv
from openpyxl import Workbook

tactic_to_csf_funtion = {
"Model": "identify",
"Harden": "protect",
"Detect": "detect",
"Isolate": "protect",
"Deceive": "protect",
"Evict": "respond",
}

output_file_name = "d3fend.xlsx"

library_description = """A cybersecurity ontology designed to standardize vocabulary for employing techniques to counter malicious cyber threats.
Version - 1.0.0 - 2024-12-20
https://d3fend.mitre.org/resources/"""

library_copyright = """Terms of Use
LICENSE
The MITRE Corporation (MITRE) hereby grants you a non-exclusive, royalty-free license to use D3FEND for research, development, and commercial purposes. Any copy you make for such purposes is authorized provided that you reproduce MITRE’s copyright designation and this license in any such copy.
DISCLAIMERS
ALL DOCUMENTS AND THE INFORMATION CONTAINED THEREIN ARE PROVIDED ON AN "AS IS" BASIS AND THE CONTRIBUTOR, THE ORGANIZATION HE/SHE REPRESENTS OR IS SPONSORED BY (IF ANY), THE MITRE CORPORATION, ITS BOARD OF TRUSTEES, OFFICERS, AGENTS, AND EMPLOYEES, DISCLAIM ALL WARRANTIES, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO ANY WARRANTY THAT THE USE OF THE INFORMATION THEREIN WILL NOT INFRINGE ANY RIGHTS OR ANY IMPLIED WARRANTIES OF MERCHANTABILITY OR FITNESS FOR A PARTICULAR PURPOSE.
"""

packager = "intuitem"


with open("d3fend.csv", newline="") as csvfile:
reader = csv.reader(csvfile, delimiter=",")

Comment on lines +31 to +33
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add error handling for CSV file operations.

The script may fail silently if the CSV file is missing or inaccessible. Consider adding proper error handling.

-with open("d3fend.csv", newline="") as csvfile:
+try:
+    with open("d3fend.csv", newline="") as csvfile:
+        reader = csv.reader(csvfile, delimiter=",")
+except FileNotFoundError:
+    print("Error: d3fend.csv file not found")
+    exit(1)
+except PermissionError:
+    print("Error: Permission denied accessing d3fend.csv")
+    exit(1)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
with open("d3fend.csv", newline="") as csvfile:
reader = csv.reader(csvfile, delimiter=",")
try:
with open("d3fend.csv", newline="") as csvfile:
reader = csv.reader(csvfile, delimiter=",")
except FileNotFoundError:
print("Error: d3fend.csv file not found")
exit(1)
except PermissionError:
print("Error: Permission denied accessing d3fend.csv")
exit(1)

n = 0
current_technique = ""
current_technique_l1 = ""

wb_output = Workbook()
ws = wb_output.active
print("generating", output_file_name)
ws.title = "library_content"
ws.append(["library_urn", f"urn:{packager.lower()}:risk:library:mitre-d3fend"])
ws.append(["library_version", 1])
ws.append(["library_locale", "en"])
ws.append(["library_publication_date", "2025-01-22"])
ws.append(["library_ref_id", "d3fend"])
ws.append(["library_name", "Mitre D3FEND"])
ws.append(["library_description", library_description])
ws.append(["library_copyright", library_copyright])
ws.append(["library_provider", "Mitre D3FEND"])
ws.append(["library_packager", packager])
ws.append(["tab", "controls", "reference_controls"])
ws.append(
[
"reference_control_base_urn",
"urn:intuitem:risk:reference-controls:mitre-d3fend",
]
)
ws1 = wb_output.create_sheet("controls")
ws1.append(("ref_id", "name", "description", "category", "csf_function"))

for row in reader:
n += 1
if n == 1:
header = row
else:
id, tactic, technique, technique_l1, technique_l2, definition = row
if technique:
current_technique = technique
continue
if technique_l1:
current_technique_l1 = technique_l1
ref_id = id
name = current_technique_l1
description = f"tactic: {tactic}\ntechnique level 1: {current_technique_l1}\ndefinition: {definition}"
if technique_l2:
ref_id = id
name = technique_l2
description = f"tactic: {tactic}\ntechnique level 1: {current_technique_l1}\ntechnique level 2: {technique_l2}\ndefinition: {definition}"
ws1.append(
(ref_id, name, description, "technical", tactic_to_csf_funtion[tactic])
)

wb_output.save(output_file_name)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add error handling for workbook saving.

The workbook save operation could fail due to permission issues or disk space. Add proper error handling.

-    wb_output.save(output_file_name)
+    try:
+        wb_output.save(output_file_name)
+        print(f"Successfully generated {output_file_name}")
+    except PermissionError:
+        print(f"Error: Permission denied saving {output_file_name}")
+        exit(1)
+    except Exception as e:
+        print(f"Error saving workbook: {str(e)}")
+        exit(1)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
wb_output.save(output_file_name)
try:
wb_output.save(output_file_name)
print(f"Successfully generated {output_file_name}")
except PermissionError:
print(f"Error: Permission denied saving {output_file_name}")
exit(1)
except Exception as e:
print(f"Error saving workbook: {str(e)}")
exit(1)

Binary file added tools/mitre/d3fend.xlsx
Binary file not shown.
30 changes: 20 additions & 10 deletions tools/mitre/mitre.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import json
from openpyxl import Workbook

MITRE_COPYRIGHT="""
MITRE_COPYRIGHT = """
Terms of Use
LICENSE
The MITRE Corporation (MITRE) hereby grants you a non-exclusive, royalty-free license to use ATT&CK® for research, development, and commercial purposes. Any copy you make for such purposes is authorized provided that you reproduce MITRE's copyright designation and this license in any such copy.
Expand All @@ -12,6 +12,7 @@
ALL DOCUMENTS AND THE INFORMATION CONTAINED THEREIN ARE PROVIDED ON AN "AS IS" BASIS AND THE CONTRIBUTOR, THE ORGANIZATION HE/SHE REPRESENTS OR IS SPONSORED BY (IF ANY), THE MITRE CORPORATION, ITS BOARD OF TRUSTEES, OFFICERS, AGENTS, AND EMPLOYEES, DISCLAIM ALL WARRANTIES, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO ANY WARRANTY THAT THE USE OF THE INFORMATION THEREIN WILL NOT INFRINGE ANY RIGHTS OR ANY IMPLIED WARRANTIES OF MERCHANTABILITY OR FITNESS FOR A PARTICULAR PURPOSE.
"""


def main():
mitre_attack_data = MitreAttackData("enterprise-attack.json")

Expand All @@ -21,11 +22,15 @@ def main():
ws = wb.active
ws.append(["ref_id", "name", "category", "description"])
for m in mitigations:
ws.append([m.external_references[0].external_id,
m.name,
"technical",
m.description.strip(" \n") + "\n" + m.external_references[0].url])
wb.save('measures.xlsx')
ws.append(
[
m.external_references[0].external_id,
m.name,
"technical",
m.description.strip(" \n") + "\n" + m.external_references[0].url,
]
)
wb.save("measures.xlsx")

techniques = mitre_attack_data.get_techniques(remove_revoked_deprecated=True)
print(f"Retrieved {len(techniques)} ATT&CK techniques.")
Expand All @@ -34,10 +39,15 @@ def main():
ws = wb.active
ws.append(["ref_id", "name", "description"])
for t in techniques:
ws.append([t.external_references[0].external_id,
t.name,
t.description.strip(" \n") + "\n" + t.external_references[0].url])
wb.save('techniques.xlsx')
ws.append(
[
t.external_references[0].external_id,
t.name,
t.description.strip(" \n") + "\n" + t.external_references[0].url,
]
)
wb.save("techniques.xlsx")


if __name__ == "__main__":
main()
19 changes: 13 additions & 6 deletions tools/nzism/prep_nzism.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def process_paragraph(level, paragraph):
implementation_group = ",".join(groups)

if paragraph.has_attr("cid"):
paragraph_cid = f"CID: {paragraph["cid"]}"
paragraph_cid = f"CID: {paragraph['cid']}"
paragraph_name = f"Control - {paragraph_classification} - {paragraph_compliance} [{paragraph_cid}]"

output_table.append(
Expand Down Expand Up @@ -78,8 +78,8 @@ def process_paragraph(level, paragraph):
input_file_name = f"{folder_path}/{args.filename}"

if "." in args.version:
major_version = args.version.split('.')[0]
minor_version = args.version.split('.')[1]
major_version = args.version.split(".")[0]
minor_version = args.version.split(".")[1]
else:
major_version = args.version

Expand Down Expand Up @@ -125,7 +125,9 @@ def process_paragraph(level, paragraph):
wb_output = openpyxl.Workbook()
ws = wb_output.active
ws.title = "library_content"
ws.append(["library_urn", f"urn:{packager.lower()}:risk:library:nzism-v{major_version}"])
ws.append(
["library_urn", f"urn:{packager.lower()}:risk:library:nzism-v{major_version}"]
)
ws.append(["library_version", 1])
ws.append(["library_locale", "en"])
ws.append(["library_ref_id", f"NSIZM-v{major_version}"])
Expand All @@ -149,11 +151,16 @@ def process_paragraph(level, paragraph):
)
ws.append(["library_provider", "New Zealand Government Communications Security Bureau"])
ws.append(["library_packager", packager])
ws.append(["framework_urn", f"urn:{packager.lower()}:risk:framework:nzism-v{major_version}"])
ws.append(
["framework_urn", f"urn:{packager.lower()}:risk:framework:nzism-v{major_version}"]
)
ws.append(["framework_ref_id", f"NSIZM-v{major_version}"])
ws.append(["framework_name", f"NZISM v{major_version}"])
ws.append(
["framework_description", f"New Zealand Information Security Manual v{major_version}"]
[
"framework_description",
f"New Zealand Information Security Manual v{major_version}",
]
)
ws.append(["tab", "requirements", "requirements"])
ws.append(["tab", "scores", "scores"])
Expand Down
82 changes: 59 additions & 23 deletions tools/prepare_mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,22 @@
packager = "intuitem"

print("parsing", args.source_yaml, args.target_yaml)
with open(args.source_yaml, 'r') as file:
with open(args.source_yaml, "r") as file:
source = yaml.safe_load(file)
with open(args.target_yaml, 'r') as file:
with open(args.target_yaml, "r") as file:
target = yaml.safe_load(file)

source_library_urn = source['urn']
target_library_urn = target['urn']
source_framework_urn = source['objects']['framework']['urn']
target_framework_urn = target['objects']['framework']['urn']
source_framework_ref_id = source['objects']['framework']['ref_id']
target_framework_ref_id = target['objects']['framework']['ref_id']
source_framework_name = source['objects']['framework']['name']
target_framework_name = target['objects']['framework']['name']
source_library_urn = source["urn"]
target_library_urn = target["urn"]
source_framework_urn = source["objects"]["framework"]["urn"]
target_framework_urn = target["objects"]["framework"]["urn"]
source_framework_ref_id = source["objects"]["framework"]["ref_id"]
target_framework_ref_id = target["objects"]["framework"]["ref_id"]
source_framework_name = source["objects"]["framework"]["name"]
target_framework_name = target["objects"]["framework"]["name"]

source_ref_id=source_framework_urn.split(':')[-1]
target_ref_id=target_framework_urn.split(':')[-1]
source_ref_id = source_framework_urn.split(":")[-1]
target_ref_id = target_framework_urn.split(":")[-1]
ref_id = "mapping-" + source_ref_id + "-to-" + target_ref_id
name = f"{source_framework_ref_id} -> {target_framework_ref_id}"
description = f"Mapping from {source_framework_name} to {target_framework_name}"
Expand All @@ -50,21 +50,39 @@
ws.append(["library_provider", packager])
ws.append(["library_packager", packager])
ws.append(["library_dependencies", f"{source_library_urn}, {target_library_urn}"])
ws.append(["mapping_urn", f"urn:{packager.lower()}:risk:req_mapping_set:{source_ref_id}"])
ws.append(
["mapping_urn", f"urn:{packager.lower()}:risk:req_mapping_set:{source_ref_id}"]
)
ws.append(["mapping_ref_id", ref_id])
ws.append(["mapping_name", name])
ws.append(["mapping_description",description])
ws.append(["mapping_description", description])
ws.append(["mapping_source_framework_urn", source_framework_urn])
ws.append(["mapping_target_framework_urn", target_framework_urn])
ws.append(["mapping_source_node_base_urn", f"urn:{packager.lower()}:risk:req_node:{source_ref_id}"])
ws.append(["mapping_target_node_base_urn", f"urn:{packager.lower()}:risk:req_node:{target_ref_id}"])
ws.append(
[
"mapping_source_node_base_urn",
f"urn:{packager.lower()}:risk:req_node:{source_ref_id}",
]
)
ws.append(
[
"mapping_target_node_base_urn",
f"urn:{packager.lower()}:risk:req_node:{target_ref_id}",
]
)
ws.append(["tab", "mappings", "mappings"])

ws1 = wb_output.create_sheet("mappings")
ws1.append(
["source_node_id", "target_node_id", "relationship", "rationale", "strength_of_relationship"]
[
"source_node_id",
"target_node_id",
"relationship",
"rationale",
"strength_of_relationship",
]
)
for node in source['objects']['framework']['requirement_nodes']:
for node in source["objects"]["framework"]["requirement_nodes"]:
if node["assessable"]:
node_id = node["urn"].split(":")[-1]
ws1.append([node_id])
Expand All @@ -86,15 +104,33 @@

ws3 = wb_output.create_sheet("source")
ws3.append(["node_id", "assessable", "urn", "ref_id", "name", "description"])
for node in source['objects']['framework']['requirement_nodes']:
for node in source["objects"]["framework"]["requirement_nodes"]:
node_id = node["urn"].split(":")[-1] if node["assessable"] else ""
ws3.append([node_id, node["assessable"], node["urn"], node.get("ref_id"), node.get("name"), node.get("description")])
ws3.append(
[
node_id,
node["assessable"],
node["urn"],
node.get("ref_id"),
node.get("name"),
node.get("description"),
]
)

ws4 = wb_output.create_sheet("target")
ws4.append(["node_id", "assessable", "urn", "ref_id", "name", "description"])
for node in target['objects']['framework']['requirement_nodes']:
for node in target["objects"]["framework"]["requirement_nodes"]:
node_id = node["urn"].split(":")[-1] if node["assessable"] else ""
ws4.append([node_id, node["assessable"], node["urn"], node.get("ref_id"), node.get("name"), node.get("description")])
ws4.append(
[
node_id,
node["assessable"],
node["urn"],
node.get("ref_id"),
node.get("name"),
node.get("description"),
]
)

print("generate ", output_file_name)
wb_output.save(output_file_name)
wb_output.save(output_file_name)
Loading
Loading