-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Added ability to merge .ini and .cfg files
- Loading branch information
Showing
5 changed files
with
242 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
"""Merge two .ini files into one.""" | ||
import configparser | ||
from collections import defaultdict | ||
from pathlib import Path | ||
|
||
from cookie_composer import data_merge | ||
from cookie_composer.composition import ( | ||
COMPREHENSIVE, | ||
DO_NOT_MERGE, | ||
NESTED_OVERWRITE, | ||
OVERWRITE, | ||
) | ||
from cookie_composer.exceptions import MergeError | ||
|
||
|
||
def merge_ini_files(new_file: Path, existing_file: Path, merge_strategy: str): | ||
""" | ||
Merge two INI files into one. | ||
Raises: | ||
MergeError: If something goes wrong | ||
Args: | ||
new_file: The path to the data file to merge | ||
existing_file: The path to the data file to merge into and write out. | ||
merge_strategy: How to do the merge | ||
""" | ||
if merge_strategy == DO_NOT_MERGE: | ||
raise MergeError( | ||
str(new_file), | ||
str(existing_file), | ||
merge_strategy, | ||
"Can not merge with do-not-merge strategy.", | ||
) | ||
try: | ||
existing_config = configparser.ConfigParser() | ||
existing_config.read_file(existing_file.open()) | ||
|
||
if merge_strategy == OVERWRITE: | ||
new_config = configparser.ConfigParser() | ||
new_config.read_file(new_file.open()) | ||
existing_config.update(new_config) | ||
elif merge_strategy == NESTED_OVERWRITE: | ||
existing_config.read(new_file) | ||
elif merge_strategy == COMPREHENSIVE: | ||
new_config = configparser.ConfigParser() | ||
new_config.read_file(new_file.open()) | ||
new_config_dict = config_to_dict(new_config) | ||
existing_config_dict = config_to_dict(existing_config) | ||
existing_config_dict = data_merge.comprehensive_merge(existing_config_dict, new_config_dict) | ||
existing_config = dict_to_config(existing_config_dict) | ||
else: | ||
raise MergeError(error_message=f"Unrecognized merge strategy {merge_strategy}") | ||
except (configparser.Error, FileNotFoundError) as e: | ||
raise MergeError(str(new_file), str(existing_file), merge_strategy, str(e)) from e | ||
|
||
existing_config.write(existing_file.open("w")) | ||
|
||
|
||
def config_to_dict(config: configparser.ConfigParser) -> dict: | ||
"""Convert a configparser object to a dictionary.""" | ||
result = defaultdict(dict) | ||
|
||
for section in config.sections(): | ||
for k, v in config.items(section): | ||
if "\n" in v: | ||
v = v.strip().split("\n") | ||
result[section][k] = v | ||
|
||
return result | ||
|
||
|
||
def dict_to_config(dictionary: dict) -> configparser.ConfigParser: | ||
"""Convert a dict to a configparser object.""" | ||
result = configparser.ConfigParser() | ||
|
||
for section, items in dictionary.items(): | ||
result.add_section(section) | ||
for k, v in items.items(): | ||
if isinstance(v, list): | ||
v = "\n" + "\n".join(v) | ||
result.set(section, k, v) | ||
|
||
return result |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
[Section1] | ||
number=1 | ||
string=abc | ||
list= | ||
a | ||
1 | ||
c | ||
[Section2.dictionary] | ||
a=1 | ||
b= | ||
1 | ||
2 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
[Section1] | ||
number=2 | ||
string=def | ||
list= | ||
a | ||
2 | ||
[Section2.dictionary] | ||
b= | ||
3 | ||
2 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,132 @@ | ||
"""Test merging INI files.""" | ||
import configparser | ||
import shutil | ||
from io import StringIO | ||
|
||
import pytest | ||
|
||
from cookie_composer.composition import ( | ||
COMPREHENSIVE, | ||
DO_NOT_MERGE, | ||
NESTED_OVERWRITE, | ||
OVERWRITE, | ||
) | ||
from cookie_composer.exceptions import MergeError | ||
from cookie_composer.merge_files import ini_file | ||
|
||
|
||
def test_do_not_merge(fixtures_path): | ||
"""This should raise an exception.""" | ||
with pytest.raises(MergeError): | ||
existing_file = fixtures_path / "existing.ini" | ||
new_file = fixtures_path / "new.ini" | ||
ini_file.merge_ini_files(new_file, existing_file, DO_NOT_MERGE) | ||
|
||
|
||
def test_overwrite_merge(tmp_path, fixtures_path): | ||
"""the new overwrites the old.""" | ||
initial_file = fixtures_path / "existing.ini" | ||
existing_file = tmp_path / "existing.ini" | ||
shutil.copy(initial_file, existing_file) | ||
|
||
new_file = fixtures_path / "new.ini" | ||
|
||
ini_file.merge_ini_files(new_file, existing_file, OVERWRITE) | ||
|
||
rendered = existing_file.read_text() | ||
expected_config = configparser.ConfigParser() | ||
expected_config.read_dict( | ||
{ | ||
"Section1": { | ||
"number": 2, | ||
"string": "def", | ||
"list": "\n".join(["", "a", "2"]), | ||
}, | ||
"Section2.dictionary": {"b": "\n".join(["", "3", "2"])}, | ||
} | ||
) | ||
expected = StringIO() | ||
expected_config.write(expected) | ||
assert rendered == expected.getvalue() | ||
|
||
|
||
def test_overwrite_nested_merge(tmp_path, fixtures_path): | ||
"""Test using the nested overwrite merge strategy.""" | ||
initial_file = fixtures_path / "existing.ini" | ||
existing_file = tmp_path / "existing.ini" | ||
shutil.copy(initial_file, existing_file) | ||
|
||
new_file = fixtures_path / "new.ini" | ||
ini_file.merge_ini_files(new_file, existing_file, NESTED_OVERWRITE) | ||
|
||
rendered = existing_file.read_text() | ||
expected_config = configparser.ConfigParser() | ||
expected_config.read_dict( | ||
{ | ||
"Section1": { | ||
"number": 2, | ||
"string": "def", | ||
"list": "\n".join(["", "a", "2"]), | ||
}, | ||
"Section2.dictionary": {"a": 1, "b": "\n".join(["", "3", "2"])}, | ||
} | ||
) | ||
expected = StringIO() | ||
expected_config.write(expected) | ||
assert rendered == expected.getvalue() | ||
|
||
|
||
def test_comprehensive_merge(tmp_path, fixtures_path): | ||
"""Merge using the comprehensive_merge strategy.""" | ||
initial_file = fixtures_path / "existing.ini" | ||
existing_file = tmp_path / "existing.ini" | ||
shutil.copy(initial_file, existing_file) | ||
|
||
new_file = fixtures_path / "new.ini" | ||
ini_file.merge_ini_files(new_file, existing_file, COMPREHENSIVE) | ||
|
||
rendered = configparser.ConfigParser() | ||
rendered.read_file(existing_file.open()) | ||
rendered_data = ini_file.config_to_dict(rendered) | ||
|
||
assert rendered_data["Section1"]["number"] == "2" | ||
assert rendered_data["Section1"]["string"] == "def" | ||
assert set(rendered_data["Section1"]["list"]) == {"a", "1", "2", "c"} | ||
assert rendered_data["Section2.dictionary"]["a"] == "1" | ||
assert set(rendered_data["Section2.dictionary"]["b"]) == {"1", "2", "3"} | ||
|
||
|
||
def test_bad_files(tmp_path, fixtures_path): | ||
"""Missing files should raise an error.""" | ||
with pytest.raises(MergeError): | ||
ini_file.merge_ini_files( | ||
fixtures_path / "missing.ini", | ||
fixtures_path / "new.ini", | ||
OVERWRITE, | ||
) | ||
|
||
with pytest.raises(MergeError): | ||
ini_file.merge_ini_files( | ||
fixtures_path / "existing.ini", | ||
fixtures_path / "missing.ini", | ||
OVERWRITE, | ||
) | ||
|
||
with pytest.raises(MergeError): | ||
ini_file.merge_ini_files( | ||
fixtures_path / "gibberish.txt", | ||
fixtures_path / "new.json", | ||
OVERWRITE, | ||
) | ||
|
||
|
||
def test_bad_strategy(tmp_path, fixtures_path): | ||
"""A bad strategy should raise an error.""" | ||
initial_file = fixtures_path / "existing.ini" | ||
existing_file = tmp_path / "existing.ini" | ||
shutil.copy(initial_file, existing_file) | ||
|
||
new_file = fixtures_path / "new.ini" | ||
|
||
with pytest.raises(MergeError): | ||
ini_file.merge_ini_files(new_file, existing_file, "not-a-stragegy") |