Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue/6025 implement discovery handler (2) #6415

Closed
wants to merge 25 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
cb1198e
Small fixes
arnaudsjs Aug 17, 2023
f797916
Add tests
arnaudsjs Aug 17, 2023
a1011e6
Enhance tests
arnaudsjs Aug 18, 2023
c19ae69
Fix tests
arnaudsjs Aug 18, 2023
fe76a87
Factor out common setup, teardown and error handling logic
arnaudsjs Aug 18, 2023
540f6b8
Fix typing
arnaudsjs Aug 18, 2023
c9bbe75
Small enhancements
arnaudsjs Aug 21, 2023
50e1df1
Fix formatting
arnaudsjs Aug 21, 2023
634f304
Merge branch 'master' into issue/6025-implement-discovery-handler-bis
arnaudsjs Aug 21, 2023
1cf9641
Add changelog entry
arnaudsjs Aug 21, 2023
a7282af
Revert "Factor out common setup, teardown and error handling logic"
arnaudsjs Aug 23, 2023
44281e6
Revert "Revert "Factor out common setup, teardown and error handling …
arnaudsjs Aug 23, 2023
be4868c
Update src/inmanta/agent/handler.py
arnaudsjs Aug 23, 2023
caf9ab1
Update src/inmanta/agent/handler.py
arnaudsjs Aug 23, 2023
28ba0a2
Update changelogs/unreleased/6025-implement-discovery-resource-handle…
arnaudsjs Aug 23, 2023
2846025
Implement review comments
arnaudsjs Aug 23, 2023
322efbc
Merge branch 'issue/6025-implement-discovery-handler-bis' of github.c…
arnaudsjs Aug 23, 2023
57feffb
Merge branch 'master' into issue/6025-implement-discovery-handler-bis
arnaudsjs Aug 23, 2023
ab2c3a9
Fix changelog message
arnaudsjs Aug 23, 2023
2266406
Fix typing
arnaudsjs Aug 23, 2023
30e8a3e
Update changelogs/unreleased/6025-implement-discovery-resource-handle…
arnaudsjs Aug 23, 2023
f7ab0a8
Revert move error handling logic
arnaudsjs Aug 25, 2023
8426f89
Merge branch 'master' into issue/6025-implement-discovery-handler-bis
arnaudsjs Aug 25, 2023
1959d44
Implement review comments
arnaudsjs Aug 25, 2023
80065dd
Small fix
arnaudsjs Aug 28, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
description: "Add handler (DiscoveryHandler) to discover unmanaged resources."
issue-nr: 6025
change-type: minor
destination-branches: [master]
sections:
feature: "{{description}}"
2 changes: 1 addition & 1 deletion docs/glossary.rst
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ Glossary
server.

infrastructure
That what Inmanta manages. This could be virtual machines with resources in these virtual
This is what Inmanta manages. This could be virtual machines with resources in these virtual
machines. Physical servers and their os. Containers or resources at a cloud provider without
any servers (e.g. "serverless")

Expand Down
175 changes: 131 additions & 44 deletions src/inmanta/agent/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
"""
import base64
import inspect
import json
import logging
import traceback
import typing
import uuid
from abc import ABC, abstractmethod
from collections import abc, defaultdict
from concurrent.futures import Future
from functools import partial
from typing import Any, Callable, Dict, Generic, List, Optional, Tuple, Type, TypeVar, Union, cast, overload

from tornado import concurrent
Expand All @@ -33,7 +35,7 @@
from inmanta.agent import io
from inmanta.agent.cache import AgentCache
from inmanta.const import ParameterSource, ResourceState
from inmanta.data.model import AttributeStateChange, ResourceIdStr
from inmanta.data.model import AttributeStateChange, DiscoveredResource, ResourceIdStr
from inmanta.protocol import Result, json_encode
from inmanta.stable_api import stable_api
from inmanta.types import SimpleTypes
Expand All @@ -47,6 +49,10 @@
LOGGER = logging.getLogger(__name__)

T = TypeVar("T")
# A resource present in the model that describes the resources that should be discovered
TDiscovery = TypeVar("TDiscovery", bound=resources.DiscoveryResource)
# The type of elements produced by the resource discovery process.
TDiscovered = TypeVar("TDiscovered")
T_FUNC = TypeVar("T_FUNC", bound=Callable[..., Any])
TResource = TypeVar("TResource", bound=resources.Resource)

Expand Down Expand Up @@ -429,12 +435,13 @@ class HandlerAPI(ABC, Generic[TResource]):
The implementation of a handler should use the ``self._io`` instance to execute io operations. This io objects
makes abstraction of local or remote operations. See :class:`~inmanta.agent.io.local.LocalIO` for the available
operations.

:param agent: The agent that is executing this handler.
:param io: The io object to use.
"""

def __init__(self, agent: "inmanta.agent.agent.AgentInstance", io: Optional["IOBase"] = None) -> None:
"""
:param agent: The agent that is executing this handler.
:param io: The io object to use.
"""
self._agent = agent
if io is None:
raise Exception("Unsupported: no resource mgmt in RH")
Expand Down Expand Up @@ -548,30 +555,16 @@ def available(self, resource: TResource) -> bool:
"""
return True

@abstractmethod
def check_facts(self, ctx: HandlerContext, resource: TResource) -> dict[str, object]:
"""
This method is called by the agent to query for facts. It runs :func:`~inmanta.agent.handler.HandlerAPI.pre`
and :func:`~inmanta.agent.handler.HandlerAPI.post`. This method calls
:func:`~inmanta.agent.handler.HandlerAPI.facts` to do the actual querying.
This method is called by the agent to query for facts.

:param ctx: Context object to report changes and logs to the agent and server.
:param resource: The resource to query facts for.
:return: A dict with fact names as keys and facts values.
"""
try:
self.pre(ctx, resource)
facts = self.facts(ctx, resource)
finally:
try:
self.post(ctx, resource)
except Exception as e:
ctx.exception(
"An error occurred after getting facts about %(resource_id)s (exception: %(exception)s",
resource_id=resource.id,
exception=f"{e.__class__.__name__}('{e}')",
)

return facts
raise NotImplementedError()

def set_cache(self, cache: AgentCache) -> None:
"""
Expand Down Expand Up @@ -599,6 +592,7 @@ def do_reload(self, ctx: HandlerContext, resource: TResource) -> None:
:param ctx: Context object to report changes and logs to the agent and server.
:param resource: The resource to reload.
"""
pass

def pre(self, ctx: HandlerContext, resource: TResource) -> None:
"""
Expand All @@ -608,6 +602,7 @@ def pre(self, ctx: HandlerContext, resource: TResource) -> None:
:param ctx: Context object to report changes and logs to the agent and server.
:param resource: The resource being handled.
"""
pass

def post(self, ctx: HandlerContext, resource: TResource) -> None:
"""
Expand All @@ -616,6 +611,7 @@ def post(self, ctx: HandlerContext, resource: TResource) -> None:
:param ctx: Context object to report changes and logs to the agent and server.
:param resource: The resource being handled.
"""
pass

def facts(self, ctx: HandlerContext, resource: TResource) -> dict[str, object]:
"""
Expand Down Expand Up @@ -723,7 +719,7 @@ def call() -> abc.Awaitable[Result]:
@stable_api
class ResourceHandler(HandlerAPI[TResource]):
"""
A baseclass for classes that handle resources.
A class that handles resources.
"""

def _diff(self, current: TResource, desired: TResource) -> dict[str, dict[str, typing.Any]]:
Expand Down Expand Up @@ -783,16 +779,6 @@ def do_changes(self, ctx: HandlerContext, resource: TResource, changes: abc.Mapp
raise NotImplementedError()

def execute(self, ctx: HandlerContext, resource: TResource, dry_run: bool = False) -> None:
"""
Update the given resource. This method is called by the agent. Most handlers will not override this method
and will only override :func:`~inmanta.agent.handler.ResourceHandler.check_resource`, optionally
:func:`~inmanta.agent.handler.ResourceHandler.list_changes` and
:func:`~inmanta.agent.handler.ResourceHandler.do_changes`

:param ctx: Context object to report changes and logs to the agent and server.
:param resource: The resource to check the current state of.
:param dry_run: True will only determine the required changes but will not execute them.
"""
try:
self.pre(ctx, resource)

Expand All @@ -807,11 +793,10 @@ def execute(self, ctx: HandlerContext, resource: TResource, dry_run: bool = Fals
except SkipResource as e:
ctx.set_status(const.ResourceState.skipped)
ctx.warning(msg="Resource %(resource_id)s was skipped: %(reason)s", resource_id=resource.id, reason=e.args)

except Exception as e:
ctx.set_status(const.ResourceState.failed)
ctx.exception(
"An error occurred during deployment of %(resource_id)s (exception: %(exception)s",
"An error occurred during deployment of %(resource_id)s (exception: %(exception)s)",
resource_id=resource.id,
exception=f"{e.__class__.__name__}('{e}')",
)
Expand All @@ -820,11 +805,37 @@ def execute(self, ctx: HandlerContext, resource: TResource, dry_run: bool = Fals
self.post(ctx, resource)
except Exception as e:
ctx.exception(
"An error occurred after deployment of %(resource_id)s (exception: %(exception)s",
"An error occurred after deployment of %(resource_id)s (exception: %(exception)s)",
resource_id=resource.id,
exception=f"{e.__class__.__name__}('{e}')",
)

def check_facts(self, ctx: HandlerContext, resource: TResource) -> dict[str, object]:
"""
This method is called by the agent to query for facts. It runs :func:`~inmanta.agent.handler.HandlerAPI.pre`
and :func:`~inmanta.agent.handler.HandlerAPI.post`. This method calls
:func:`~inmanta.agent.handler.HandlerAPI.facts` to do the actual querying.

:param ctx: Context object to report changes and logs to the agent and server.
:param resource: The resource to query facts for.
:return: A dict with fact names as keys and facts values.
"""
facts = {}
try:
self.pre(ctx, resource)
facts = self.facts(ctx, resource)
finally:
try:
self.post(ctx, resource)
except Exception as e:
ctx.exception(
"An error occurred after getting facts about %(resource_id)s (exception: %(exception)s)",
resource_id=resource.id,
exception=f"{e.__class__.__name__}('{e}')",
)

return facts


TPurgeableResource = TypeVar("TPurgeableResource", bound=resources.PurgeableResource)

Expand Down Expand Up @@ -896,14 +907,7 @@ def calculate_diff(
"""
return self._diff(current, desired)

def execute(self, ctx: HandlerContext, resource: TPurgeableResource, dry_run: Optional[bool] = None) -> None:
"""
Update the given resource. This method is called by the agent. Override the CRUD methods of this class.

:param ctx: Context object to report changes and logs to the agent and server.
:param resource: The resource to check the current state of.
:param dry_run: True will only determine the required changes but will not execute them.
"""
def execute(self, ctx: HandlerContext, resource: TPurgeableResource, dry_run: bool = False) -> None:
try:
self.pre(ctx, resource)

Expand Down Expand Up @@ -958,7 +962,7 @@ def execute(self, ctx: HandlerContext, resource: TPurgeableResource, dry_run: Op
self.post(ctx, resource)
except Exception as e:
ctx.exception(
"An error occurred after deployment of %(resource_id)s (exception: %(exception)s",
"An error occurred after deployment of %(resource_id)s (exception: %(exception)s)",
resource_id=resource.id,
exception=f"{e.__class__.__name__}('{e}')",
)
Expand All @@ -968,6 +972,89 @@ def execute(self, ctx: HandlerContext, resource: TPurgeableResource, dry_run: Op
CRUDHandlerGeneric = CRUDHandler


@stable_api
class DiscoveryHandler(HandlerAPI[TDiscovery], Generic[TDiscovery, TDiscovered]):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wow, I had no clue it was required to list all type vars here. I really can't wait for the 3.12 syntax.

"""
The DiscoveryHandler is generic with regard to two resource types:
- TDiscovery denotes the handler's Discovery Resource type, used to drive resource discovery. This is not a
conventional resource type expected to be deployed on a network, but rather a way to express
the intent to discover resources of the second type TDiscovered already present on the network.
- TDiscovered denotes the handler's Unmanaged Resource type. This is the type of the resources that have been
discovered and reported to the server. Objects of this type must be serializable.
"""

def check_facts(self, ctx: HandlerContext, resource: TDiscovery) -> Dict[str, object]:
return {}

@abstractmethod
def discover_resources(
self, ctx: HandlerContext, discovery_resource: TDiscovery
) -> abc.Mapping[ResourceIdStr, TDiscovered]:
"""
This method implements the resource discovery logic. This method will be called
by the handler during deployment of the corresponding discovery resource.
"""
raise NotImplementedError()

def execute(self, ctx: HandlerContext, resource: TDiscovery, dry_run: bool = False) -> None:
"""
Logic to perform during resource discovery. This method is called when the agent wants
to deploy the corresponding discovery resource. The default behaviour of this method is to call
the `discover_resources` method, serialize the returned values and report them to the server.
"""
if dry_run:
return

try:
self.pre(ctx, resource)

def _call_discovered_resource_create_batch(
discovered_resources: abc.Sequence[DiscoveredResource],
) -> typing.Awaitable[Result]:
return self.get_client().discovered_resource_create_batch(
tid=self._agent.environment, discovered_resources=discovered_resources
)

discovered_resources_raw: abc.Mapping[ResourceIdStr, TDiscovered] = self.discover_resources(ctx, resource)
discovered_resources: abc.Sequence[DiscoveredResource] = [
DiscoveredResource(discovered_resource_id=resource_id, values=json.loads(json_encode(values)))
for resource_id, values in discovered_resources_raw.items()
]
result = self.run_sync(partial(_call_discovered_resource_create_batch, discovered_resources))

if result.code != 200:
assert result.result is not None # Make mypy happy
ctx.set_status(const.ResourceState.failed)
error_msg_from_server = f": {result.result['message']}" if "message" in result.result else ""
ctx.error(
"Failed to report discovered resources to the server (status code: %(code)s)%(error_msg_from_server)s",
code=result.code,
error_msg_from_server=error_msg_from_server,
)
else:
ctx.set_status(const.ResourceState.deployed)
except SkipResource as e:
ctx.set_status(const.ResourceState.skipped)
ctx.warning(msg="Resource %(resource_id)s was skipped: %(reason)s", resource_id=resource.id, reason=e.args)
except Exception as e:
ctx.set_status(const.ResourceState.failed)
ctx.exception(
"An error occurred during deployment of %(resource_id)s (exception: %(exception)s)",
resource_id=resource.id,
exception=f"{e.__class__.__name__}('{e}')",
traceback=traceback.format_exc(),
)
finally:
try:
self.post(ctx, resource)
except Exception as e:
ctx.exception(
"An error occurred after deployment of %(resource_id)s (exception: %(exception)s)",
resource_id=resource.id,
exception=f"{e.__class__.__name__}('{e}')",
)


class Commander(object):
"""
This class handles commands
Expand Down
25 changes: 25 additions & 0 deletions src/inmanta/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,15 @@ class PurgeableResource(Resource):
purge_on_delete: bool


@stable_api
class DiscoveryResource(Resource):
"""
See :inmanta:entity:`std::DiscoveryResource` for more information.
"""

fields = ()


@stable_api
class ManagedResource(Resource):
"""
Expand Down Expand Up @@ -499,6 +508,13 @@ class Id(object):
"""

def __init__(self, entity_type: str, agent_name: str, attribute: str, attribute_value: str, version: int = 0) -> None:
"""
:attr entity_type: The resource type, as defined in the configuration model. For example :inmanta:entity:`std::File`.
:attr agent_name: The agent responsible for this resource.
:attr attribute: The key attribute that uniquely identifies this resource on the agent
:attr attribute_value: The corresponding value for this key attribute.
:attr version: The version number for this resource.
"""
self._entity_type = entity_type
self._agent_name = agent_name
self._attribute = attribute
Expand Down Expand Up @@ -553,6 +569,15 @@ def __eq__(self, other: object) -> bool:
return str(self) == str(other) and type(self) is type(other)

def resource_str(self) -> ResourceIdStr:
"""
String representation for this resource id with the following format:
<type>[<agent>,<attribute>=<value>]
- type: The resource type, as defined in the configuration model. For example :inmanta:entity:`std::File`.
- agent: The agent responsible for this resource.
- attribute: The key attribute that uniquely identifies this resource on the agent
- value: The corresponding value for this key attribute.
:return: Returns a :py:class:`inmanta.data.model.ResourceIdStr`
"""
return cast(
ResourceIdStr,
"%(type)s[%(agent)s,%(attribute)s=%(value)s]"
Expand Down
2 changes: 1 addition & 1 deletion tests/agent_server/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
logger = logging.getLogger("inmanta.test.server_agent")


async def get_agent(server, environment, *endpoints, hostname="nodes1"):
async def get_agent(server, environment, *endpoints, hostname="nodes1") -> Agent:
agentmanager = server.get_slice(SLICE_AGENT_MANAGER)
prelen = len(agentmanager.sessions)
agent = Agent(
Expand Down
Loading