Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Callback plugins #175

Merged
merged 8 commits into from
Feb 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/pylint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install pylint
pip install pylint ansible
pip install .
- name: Analysing the code with pylint
run: |
Expand Down
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ packages =
ansible_deployer.modules.runners
ansible_deployer.modules.validators
ansible_deployer.modules.database
ansible_deployer.plugins
install_requires =
pyyaml>=5.3.1
cerberus>=1.3.1
Expand Down
10 changes: 5 additions & 5 deletions source/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def main():

if options["subcommand"] in ("run", "verify"):
workdir = misc.create_workdir(start_ts, configuration.conf, logger.logger)
Loggers.set_logging_to_file(logger, workdir, start_ts, configuration.conf)
log_path = Loggers.set_logging_to_file(logger, workdir, start_ts, configuration.conf)
logger.flush_memory_handler(True, options["syslog"])
else:
logger.flush_memory_handler(False, options["syslog"])
Expand Down Expand Up @@ -154,14 +154,14 @@ def main():
if not validators.verify_task_permissions(selected_items, user_groups, config):
logger.logger.critical("Task forbidden")
sys.exit(errno.EPERM)
db_connector, db_path = DbSetup.connect_to_db(DbSetup(logger.logger, start_ts,
configuration.conf, options))
db_writer = DbWriter(logger.logger, db_connector, db_path)
runner = Runners(logger.logger, lock, workdir, start_ts_raw,
config["tasks"]["setup_hooks"])
config["tasks"]["setup_hooks"], log_path, db_path)
if not options["self_setup"]:
runner.setup_ansible(selected_items["commit"], configuration.conf_dir)
lock.lock_inventory(lockpath)
db_connector, db_path = DbSetup.connect_to_db(DbSetup(logger.logger, start_ts,
configuration.conf, options))
db_writer = DbWriter(logger.logger, db_connector, db_path)
sequence_record_dict = runner.run_playitem(config, options, inv_file, lockpath,
db_writer)
lock.unlock_inventory(lockpath)
Expand Down
3 changes: 3 additions & 0 deletions source/modules/database/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@
"conf_dir_on": "text"
},
"play_item_tasks": {
"task_id": "integer primary key autoincrement",
"sequence_id": "text",
"task_name": "text",
"result": "text",
"hostname": "text",
"timestamp": "text",
"task_details": "text"
}
}
25 changes: 7 additions & 18 deletions source/modules/database/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import os
import datetime
import sqlite3
import yaml
from ansible_deployer.modules.database.schema import SCHEMAS


Expand Down Expand Up @@ -38,25 +37,15 @@ def create_table(self, table_name: str, columns: dict):
table_name, self.db_path, exc)
sys.exit(104)

def parse_yaml_output(self, stream: list, sequence_id: str):
"""Parse ansible output in yaml format"""
record_dict = {}
def parse_yaml_output_for_hosts(self, stream: list, sequence_id: str):
"""Get all hosts used in runner from runner output"""
# TODO should be taken from better source
record_hosts = []
try:
for no, line in enumerate(stream):
if "TASK" in line or "RUNNING HANDLER" in line:
task_name = line.split("]")[-2].split("[")[-1]
record_dict[task_name] = {}
for line in stream:
if "changed=true" in line or "changed=false" in line:
host_name = line.split("[")[1].split("]")[0]
yaml_string = "\n".join(stream[no+1:self.find_end_of_task(
stream[no+1:], no + 1)])
record_dict[task_name][host_name] = dict(SCHEMAS["play_item_tasks"])
record_dict[task_name][host_name]["task_name"] = task_name
record_dict[task_name][host_name]["sequence_id"] = sequence_id
record_dict[task_name][host_name]["hostname"] = host_name
record_dict[task_name][host_name]["task_details"] = \
yaml.safe_load(yaml_string)
return record_dict
record_hosts.append(line.split("[")[1].split("]")[0])
LegenJCdary marked this conversation as resolved.
Show resolved Hide resolved
return list(set(record_hosts))
except Exception as exc:
self.logger.critical("Failed parsing output stream to yaml, error was %s", exc)
sys.exit(102)
Expand Down
2 changes: 2 additions & 0 deletions source/modules/globalvars.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,5 @@
"raw_output": "raw_runner_output",
"conf_val": "conf_validation"
}
# Ansible's default callback-plugin search path (user dir first, then system dir),
# colon-separated as expected by ANSIBLE_CALLBACK_PLUGINS; the deployer appends
# its own plugins directory to this path at runtime.
ANSIBLE_DEFAULT_CALLBACK_PLUGIN_PATH = '~/.ansible/plugins/callback:/usr/share/ansible/plugins/' \
'callback'
2 changes: 2 additions & 0 deletions source/modules/outputs/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ def set_logging_to_file(self, log_dir: str, timestamp: str, conf: dict):
file_handler.setLevel(logging.DEBUG)
self.logger.addHandler(file_handler)

return log_path

def flush_memory_handler(self, subcommand_flag: bool, syslog: bool):
"""Flush initial log messages from memory handler to logfile"""
if syslog:
Expand Down
25 changes: 17 additions & 8 deletions source/modules/runners/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,21 @@
import os
import sys
import subprocess
from ansible_deployer.modules.globalvars import ANSIBLE_DEFAULT_CALLBACK_PLUGIN_PATH
from ansible_deployer.modules.outputs.formatting import Formatters

class Runners:
"""Class handling ansible hooks and ansible plays execution"""

def __init__(self, logger, lock_obj, workdir, start_ts_raw, setup_hooks):
def __init__(self, logger, lock_obj, workdir, start_ts_raw, setup_hooks, log_path, db_path):
    """
    Store run context used by ansible hook and play execution.

    :param logger: logger object used for all messages
    :param lock_obj: inventory lock handler
    :param workdir: working directory of this run; its basename is used as the sequence id
    :param start_ts_raw: raw start timestamp of the run
    :param setup_hooks: setup hook definitions (presumably from config["tasks"]["setup_hooks"])
    :param log_path: path of the run logfile, exported to the log_plays_adjusted callback
    :param db_path: path of the sqlite database, exported to the sqlite_deployer callback
    """
    self.logger = logger
    self.lock_obj = lock_obj
    self.workdir = workdir
    self.start_ts_raw = start_ts_raw
    self.setup_hooks = setup_hooks
    # Sequence id identifies this run in the database; derived from the workdir name.
    self.sequence_id = os.path.basename(self.workdir)
    self.log_path = log_path
    self.db_path = db_path

@staticmethod
def reassign_commit_and_workdir(commit: str, workdir: str):
Expand Down Expand Up @@ -141,10 +144,8 @@ def run_playitem(self, config: dict, options: dict, inventory: str, lockpath: st
proc.communicate()
format_obj = Formatters(self.logger)
parsed_std = format_obj.format_ansible_output(returned)
play_host_list = db_writer.write_records(db_writer.parse_yaml_output(
parsed_std["complete"], self.sequence_id))
for host in play_host_list:
host_list.append(host)
host_list = db_writer.parse_yaml_output_for_hosts(parsed_std["complete"],
self.sequence_id)
sequence_records = db_writer.start_sequence_dict(host_list,
self.setup_hooks, options,
self.start_ts_raw,
Expand Down Expand Up @@ -188,8 +189,7 @@ def get_tags_for_task(config: dict, options: dict):

return tags, skip_tags

@staticmethod
def construct_command(playitem: str, inventory: str, config: dict, options: dict):
def construct_command(self, playitem: str, inventory: str, config: dict, options: dict):
"""Create final ansible command from available variables"""
tags, skip_tags = Runners.get_tags_for_task(config, options)

Expand All @@ -216,6 +216,15 @@ def construct_command(playitem: str, inventory: str, config: dict, options: dict
if options["check_mode"]:
command.append("-C")
command_env=dict(os.environ, ANSIBLE_STDOUT_CALLBACK="yaml", ANSIBLE_NOCOWS="1",
ANSIBLE_LOAD_CALLBACK_PLUGINS="1")
ANSIBLE_LOAD_CALLBACK_PLUGINS="1", LOG_PLAYS_PATH=self.log_path,
ANSIBLE_CALLBACKS_ENABLED="log_plays_adjusted,sqlite_deployer",
ANSIBLE_CALLBACK_PLUGINS=self.append_to_ansible_callbacks_path(),
SQLITE_PATH=self.db_path, SEQUENCE_ID=self.sequence_id)

return command, command_env

@staticmethod
def append_to_ansible_callbacks_path():
    """Build the full callback-plugin search path: ansible's defaults followed by
    the deployer's own bundled plugins directory."""
    # The plugins directory lives three path components above this module file.
    source_root = os.path.realpath(__file__).rsplit(os.sep, 3)[0]
    deployer_plugins = os.path.join(source_root, "plugins")
    return f'{ANSIBLE_DEFAULT_CALLBACK_PLUGIN_PATH}:{deployer_plugins}'
105 changes: 105 additions & 0 deletions source/plugins/log_plays_adjusted.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
"""Ansible callback plugin that logs to a file, based on community.general.log_plays 2.0"""
# -*- coding: utf-8 -*-

# pylint: disable=duplicate-code

from __future__ import (absolute_import, division, print_function)
__metaclass__ = type

import os
import time
import json

from ansible.utils.path import makedirs_safe
from ansible.module_utils.common.text.converters import to_bytes
from ansible.module_utils.common._collections_compat import MutableMapping
from ansible.parsing.ajson import AnsibleJSONEncoder
from ansible.plugins.callback import CallbackBase


class CallbackModule(CallbackBase):
    """
    Ansible callback plugin that appends per-task results to a logfile.

    Based on community.general.log_plays 2.0; the target file is taken from the
    LOG_PLAYS_PATH environment variable, which the deployer exports before
    starting the ansible run.
    """
    CALLBACK_VERSION = 2.0
    CALLBACK_TYPE = 'notification'
    CALLBACK_NAME = 'log_plays_adjusted'
    CALLBACK_NEEDS_WHITELIST = True

    TIME_FORMAT = "%b %d %Y %H:%M:%S"
    MSG_FORMAT = "%(now)s - %(playbook)s - %(task_name)s - %(category)s\n%(data)s\n\n"

    def __init__(self):
        super().__init__()
        # A missing LOG_PLAYS_PATH is a deployer configuration error; let the
        # KeyError surface instead of silently logging nowhere.
        self.log_path = os.environ["LOG_PLAYS_PATH"]
        # Set properly in v2_playbook_on_start; initialised here so log() cannot
        # raise AttributeError if a runner event ever arrives before that hook.
        self.playbook = None

    def set_options(self, task_keys=None, var_options=None, direct=None):
        """Set ansible options and ensure the log directory exists"""
        super().set_options(task_keys=task_keys, var_options=var_options, direct=direct)

        log_dir = os.path.dirname(self.log_path)
        if not os.path.exists(log_dir):
            makedirs_safe(log_dir)

    def log(self, result, category):
        """Append one record for *result* under *category* to the logfile"""
        data = result._result
        if isinstance(data, MutableMapping):
            if '_ansible_verbose_override' in data:
                # avoid logging extraneous data
                data = 'omitted'
            else:
                data = data.copy()
                invocation = data.pop('invocation', None)
                data = json.dumps(data, cls=AnsibleJSONEncoder)
                if invocation is not None:
                    data = f"{json.dumps(invocation)} => {data} "

        now = time.strftime(self.TIME_FORMAT, time.localtime())

        msg = to_bytes(
            self.MSG_FORMAT
            % {
                "now": now,
                "playbook": self.playbook,
                "task_name": result._task.name,
                "category": category,
                "data": data
            }
        )
        # Append in binary mode: encoding was handled once by to_bytes() above.
        with open(self.log_path, "ab") as fd:
            fd.write(msg)

    def v2_runner_on_failed(self, result, ignore_errors=False):
        """Log result when runner failed"""
        self.log(result, 'FAILED')

    def v2_runner_on_ok(self, result):
        """Log result when runner passed"""
        self.log(result, 'OK')

    def v2_runner_on_skipped(self, result):
        """Log result when runner skipped host"""
        self.log(result, 'SKIPPED')

    def v2_runner_on_unreachable(self, result):
        """Log result when host was unreachable"""
        self.log(result, 'UNREACHABLE')

    def v2_runner_on_async_failed(self, result):
        """Log result when async failed"""
        self.log(result, 'ASYNC_FAILED')

    def v2_playbook_on_start(self, playbook):
        """Record the playbook file name used in every log line"""
        self.playbook = playbook._file_name

    def v2_playbook_on_import_for_host(self, result, imported_file):
        """Log runner import"""
        self.log(result, 'IMPORTED')

    def v2_playbook_on_not_import_for_host(self, result, missing_file):
        """Log runner not import"""
        self.log(result, 'NOTIMPORTED')
Loading