Skip to content

Commit

Permalink
Add handler (DiscoveryHandler) to discover unmanaged resources. (Issue
Browse files Browse the repository at this point in the history
…#6025, PR #6415)

# Description

- [x] implement `DiscoveryHandler`
- [x] Verify that the `Id` constructor and `resource_str` method are part of the stable API and documented.

This PR replaces #6264

Part of #6025

# Self Check:

- [x] Attached issue to pull request
- [ ] Changelog entry
- [x] Type annotations are present
- [x] Code is clear and sufficiently documented
- [x] No (preventable) type errors (check using make mypy or make mypy-diff)
- [x] Sufficient test cases (reproduces the bug/tests the requested feature)
- [x] Correct, in line with design
- [x] End user documentation is included or an issue is created for end-user documentation (#6270)
- [ ] ~~If this PR fixes a race condition in the test suite, also push the fix to the relevant stable branche(s) (see [test-fixes](https://internal.inmanta.com/development/core/tasks/build-master.html#test-fixes) for more info)~~
  • Loading branch information
arnaudsjs authored and inmantaci committed Aug 28, 2023
1 parent 943efb5 commit 80cb4b4
Show file tree
Hide file tree
Showing 7 changed files with 441 additions and 47 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
description: "Add handler (DiscoveryHandler) to discover unmanaged resources."
issue-nr: 6025
change-type: minor
destination-branches: [master]
sections:
feature: "{{description}}"
2 changes: 1 addition & 1 deletion docs/glossary.rst
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ Glossary
server.

infrastructure
That what Inmanta manages. This could be virtual machines with resources in these virtual
This is what Inmanta manages. This could be virtual machines with resources in these virtual
machines. Physical servers and their os. Containers or resources at a cloud provider without
any servers (e.g. "serverless")

Expand Down
175 changes: 131 additions & 44 deletions src/inmanta/agent/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
"""
import base64
import inspect
import json
import logging
import traceback
import typing
import uuid
from abc import ABC, abstractmethod
from collections import abc, defaultdict
from concurrent.futures import Future
from functools import partial
from typing import Any, Callable, Dict, Generic, List, Optional, Tuple, Type, TypeVar, Union, cast, overload

from tornado import concurrent
Expand All @@ -33,7 +35,7 @@
from inmanta.agent import io
from inmanta.agent.cache import AgentCache
from inmanta.const import ParameterSource, ResourceState
from inmanta.data.model import AttributeStateChange, ResourceIdStr
from inmanta.data.model import AttributeStateChange, DiscoveredResource, ResourceIdStr
from inmanta.protocol import Result, json_encode
from inmanta.stable_api import stable_api
from inmanta.types import SimpleTypes
Expand All @@ -47,6 +49,10 @@
LOGGER = logging.getLogger(__name__)

T = TypeVar("T")
# A resource present in the model that describes the resources that should be discovered
TDiscovery = TypeVar("TDiscovery", bound=resources.DiscoveryResource)
# The type of elements produced by the resource discovery process.
TDiscovered = TypeVar("TDiscovered")
T_FUNC = TypeVar("T_FUNC", bound=Callable[..., Any])
TResource = TypeVar("TResource", bound=resources.Resource)

Expand Down Expand Up @@ -429,12 +435,13 @@ class HandlerAPI(ABC, Generic[TResource]):
The implementation of a handler should use the ``self._io`` instance to execute io operations. This io objects
makes abstraction of local or remote operations. See :class:`~inmanta.agent.io.local.LocalIO` for the available
operations.
:param agent: The agent that is executing this handler.
:param io: The io object to use.
"""

def __init__(self, agent: "inmanta.agent.agent.AgentInstance", io: Optional["IOBase"] = None) -> None:
"""
:param agent: The agent that is executing this handler.
:param io: The io object to use.
"""
self._agent = agent
if io is None:
raise Exception("Unsupported: no resource mgmt in RH")
Expand Down Expand Up @@ -548,30 +555,16 @@ def available(self, resource: TResource) -> bool:
"""
return True

@abstractmethod
def check_facts(self, ctx: HandlerContext, resource: TResource) -> dict[str, object]:
"""
This method is called by the agent to query for facts. It runs :func:`~inmanta.agent.handler.HandlerAPI.pre`
and :func:`~inmanta.agent.handler.HandlerAPI.post`. This method calls
:func:`~inmanta.agent.handler.HandlerAPI.facts` to do the actual querying.
This method is called by the agent to query for facts.
:param ctx: Context object to report changes and logs to the agent and server.
:param resource: The resource to query facts for.
:return: A dict with fact names as keys and facts values.
"""
try:
self.pre(ctx, resource)
facts = self.facts(ctx, resource)
finally:
try:
self.post(ctx, resource)
except Exception as e:
ctx.exception(
"An error occurred after getting facts about %(resource_id)s (exception: %(exception)s",
resource_id=resource.id,
exception=f"{e.__class__.__name__}('{e}')",
)

return facts
raise NotImplementedError()

def set_cache(self, cache: AgentCache) -> None:
"""
Expand Down Expand Up @@ -599,6 +592,7 @@ def do_reload(self, ctx: HandlerContext, resource: TResource) -> None:
:param ctx: Context object to report changes and logs to the agent and server.
:param resource: The resource to reload.
"""
pass

def pre(self, ctx: HandlerContext, resource: TResource) -> None:
"""
Expand All @@ -608,6 +602,7 @@ def pre(self, ctx: HandlerContext, resource: TResource) -> None:
:param ctx: Context object to report changes and logs to the agent and server.
:param resource: The resource being handled.
"""
pass

def post(self, ctx: HandlerContext, resource: TResource) -> None:
"""
Expand All @@ -616,6 +611,7 @@ def post(self, ctx: HandlerContext, resource: TResource) -> None:
:param ctx: Context object to report changes and logs to the agent and server.
:param resource: The resource being handled.
"""
pass

def facts(self, ctx: HandlerContext, resource: TResource) -> dict[str, object]:
"""
Expand Down Expand Up @@ -723,7 +719,7 @@ def call() -> abc.Awaitable[Result]:
@stable_api
class ResourceHandler(HandlerAPI[TResource]):
"""
A baseclass for classes that handle resources.
A class that handles resources.
"""

def _diff(self, current: TResource, desired: TResource) -> dict[str, dict[str, typing.Any]]:
Expand Down Expand Up @@ -783,16 +779,6 @@ def do_changes(self, ctx: HandlerContext, resource: TResource, changes: abc.Mapp
raise NotImplementedError()

def execute(self, ctx: HandlerContext, resource: TResource, dry_run: bool = False) -> None:
"""
Update the given resource. This method is called by the agent. Most handlers will not override this method
and will only override :func:`~inmanta.agent.handler.ResourceHandler.check_resource`, optionally
:func:`~inmanta.agent.handler.ResourceHandler.list_changes` and
:func:`~inmanta.agent.handler.ResourceHandler.do_changes`
:param ctx: Context object to report changes and logs to the agent and server.
:param resource: The resource to check the current state of.
:param dry_run: True will only determine the required changes but will not execute them.
"""
try:
self.pre(ctx, resource)

Expand All @@ -807,11 +793,10 @@ def execute(self, ctx: HandlerContext, resource: TResource, dry_run: bool = Fals
except SkipResource as e:
ctx.set_status(const.ResourceState.skipped)
ctx.warning(msg="Resource %(resource_id)s was skipped: %(reason)s", resource_id=resource.id, reason=e.args)

except Exception as e:
ctx.set_status(const.ResourceState.failed)
ctx.exception(
"An error occurred during deployment of %(resource_id)s (exception: %(exception)s",
"An error occurred during deployment of %(resource_id)s (exception: %(exception)s)",
resource_id=resource.id,
exception=f"{e.__class__.__name__}('{e}')",
)
Expand All @@ -820,11 +805,37 @@ def execute(self, ctx: HandlerContext, resource: TResource, dry_run: bool = Fals
self.post(ctx, resource)
except Exception as e:
ctx.exception(
"An error occurred after deployment of %(resource_id)s (exception: %(exception)s",
"An error occurred after deployment of %(resource_id)s (exception: %(exception)s)",
resource_id=resource.id,
exception=f"{e.__class__.__name__}('{e}')",
)

def check_facts(self, ctx: HandlerContext, resource: TResource) -> dict[str, object]:
"""
This method is called by the agent to query for facts. It runs :func:`~inmanta.agent.handler.HandlerAPI.pre`
and :func:`~inmanta.agent.handler.HandlerAPI.post`. This method calls
:func:`~inmanta.agent.handler.HandlerAPI.facts` to do the actual querying.
:param ctx: Context object to report changes and logs to the agent and server.
:param resource: The resource to query facts for.
:return: A dict with fact names as keys and facts values.
"""
facts = {}
try:
self.pre(ctx, resource)
facts = self.facts(ctx, resource)
finally:
try:
self.post(ctx, resource)
except Exception as e:
ctx.exception(
"An error occurred after getting facts about %(resource_id)s (exception: %(exception)s)",
resource_id=resource.id,
exception=f"{e.__class__.__name__}('{e}')",
)

return facts


TPurgeableResource = TypeVar("TPurgeableResource", bound=resources.PurgeableResource)

Expand Down Expand Up @@ -896,14 +907,7 @@ def calculate_diff(
"""
return self._diff(current, desired)

def execute(self, ctx: HandlerContext, resource: TPurgeableResource, dry_run: Optional[bool] = None) -> None:
"""
Update the given resource. This method is called by the agent. Override the CRUD methods of this class.
:param ctx: Context object to report changes and logs to the agent and server.
:param resource: The resource to check the current state of.
:param dry_run: True will only determine the required changes but will not execute them.
"""
def execute(self, ctx: HandlerContext, resource: TPurgeableResource, dry_run: bool = False) -> None:
try:
self.pre(ctx, resource)

Expand Down Expand Up @@ -958,7 +962,7 @@ def execute(self, ctx: HandlerContext, resource: TPurgeableResource, dry_run: Op
self.post(ctx, resource)
except Exception as e:
ctx.exception(
"An error occurred after deployment of %(resource_id)s (exception: %(exception)s",
"An error occurred after deployment of %(resource_id)s (exception: %(exception)s)",
resource_id=resource.id,
exception=f"{e.__class__.__name__}('{e}')",
)
Expand All @@ -968,6 +972,89 @@ def execute(self, ctx: HandlerContext, resource: TPurgeableResource, dry_run: Op
CRUDHandlerGeneric = CRUDHandler


@stable_api
class DiscoveryHandler(HandlerAPI[TDiscovery], Generic[TDiscovery, TDiscovered]):
"""
The DiscoveryHandler is generic with regard to two resource types:
- TDiscovery denotes the handler's Discovery Resource type, used to drive resource discovery. This is not a
conventional resource type expected to be deployed on a network, but rather a way to express
the intent to discover resources of the second type TDiscovered already present on the network.
- TDiscovered denotes the handler's Unmanaged Resource type. This is the type of the resources that have been
discovered and reported to the server. Objects of this type must be serializable.
"""

def check_facts(self, ctx: HandlerContext, resource: TDiscovery) -> Dict[str, object]:
return {}

@abstractmethod
def discover_resources(
self, ctx: HandlerContext, discovery_resource: TDiscovery
) -> abc.Mapping[ResourceIdStr, TDiscovered]:
"""
This method implements the resource discovery logic. This method will be called
by the handler during deployment of the corresponding discovery resource.
"""
raise NotImplementedError()

def execute(self, ctx: HandlerContext, resource: TDiscovery, dry_run: bool = False) -> None:
"""
Logic to perform during resource discovery. This method is called when the agent wants
to deploy the corresponding discovery resource. The default behaviour of this method is to call
the `discover_resources` method, serialize the returned values and report them to the server.
"""
if dry_run:
return

try:
self.pre(ctx, resource)

def _call_discovered_resource_create_batch(
discovered_resources: abc.Sequence[DiscoveredResource],
) -> typing.Awaitable[Result]:
return self.get_client().discovered_resource_create_batch(
tid=self._agent.environment, discovered_resources=discovered_resources
)

discovered_resources_raw: abc.Mapping[ResourceIdStr, TDiscovered] = self.discover_resources(ctx, resource)
discovered_resources: abc.Sequence[DiscoveredResource] = [
DiscoveredResource(discovered_resource_id=resource_id, values=json.loads(json_encode(values)))
for resource_id, values in discovered_resources_raw.items()
]
result = self.run_sync(partial(_call_discovered_resource_create_batch, discovered_resources))

if result.code != 200:
assert result.result is not None # Make mypy happy
ctx.set_status(const.ResourceState.failed)
error_msg_from_server = f": {result.result['message']}" if "message" in result.result else ""
ctx.error(
"Failed to report discovered resources to the server (status code: %(code)s)%(error_msg_from_server)s",
code=result.code,
error_msg_from_server=error_msg_from_server,
)
else:
ctx.set_status(const.ResourceState.deployed)
except SkipResource as e:
ctx.set_status(const.ResourceState.skipped)
ctx.warning(msg="Resource %(resource_id)s was skipped: %(reason)s", resource_id=resource.id, reason=e.args)
except Exception as e:
ctx.set_status(const.ResourceState.failed)
ctx.exception(
"An error occurred during deployment of %(resource_id)s (exception: %(exception)s)",
resource_id=resource.id,
exception=f"{e.__class__.__name__}('{e}')",
traceback=traceback.format_exc(),
)
finally:
try:
self.post(ctx, resource)
except Exception as e:
ctx.exception(
"An error occurred after deployment of %(resource_id)s (exception: %(exception)s)",
resource_id=resource.id,
exception=f"{e.__class__.__name__}('{e}')",
)


class Commander(object):
"""
This class handles commands
Expand Down
25 changes: 25 additions & 0 deletions src/inmanta/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,15 @@ class PurgeableResource(Resource):
purge_on_delete: bool


@stable_api
class DiscoveryResource(Resource):
"""
See :inmanta:entity:`std::DiscoveryResource` for more information.
"""

fields = ()


@stable_api
class ManagedResource(Resource):
"""
Expand Down Expand Up @@ -499,6 +508,13 @@ class Id(object):
"""

def __init__(self, entity_type: str, agent_name: str, attribute: str, attribute_value: str, version: int = 0) -> None:
"""
:attr entity_type: The resource type, as defined in the configuration model. For example :inmanta:entity:`std::File`.
:attr agent_name: The agent responsible for this resource.
:attr attribute: The key attribute that uniquely identifies this resource on the agent
:attr attribute_value: The corresponding value for this key attribute.
:attr version: The version number for this resource.
"""
self._entity_type = entity_type
self._agent_name = agent_name
self._attribute = attribute
Expand Down Expand Up @@ -553,6 +569,15 @@ def __eq__(self, other: object) -> bool:
return str(self) == str(other) and type(self) is type(other)

def resource_str(self) -> ResourceIdStr:
"""
String representation for this resource id with the following format:
<type>[<agent>,<attribute>=<value>]
- type: The resource type, as defined in the configuration model. For example :inmanta:entity:`std::File`.
- agent: The agent responsible for this resource.
- attribute: The key attribute that uniquely identifies this resource on the agent
- value: The corresponding value for this key attribute.
:return: Returns a :py:class:`inmanta.data.model.ResourceIdStr`
"""
return cast(
ResourceIdStr,
"%(type)s[%(agent)s,%(attribute)s=%(value)s]"
Expand Down
2 changes: 1 addition & 1 deletion tests/agent_server/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
logger = logging.getLogger("inmanta.test.server_agent")


async def get_agent(server, environment, *endpoints, hostname="nodes1"):
async def get_agent(server, environment, *endpoints, hostname="nodes1") -> Agent:
agentmanager = server.get_slice(SLICE_AGENT_MANAGER)
prelen = len(agentmanager.sessions)
agent = Agent(
Expand Down
Loading

0 comments on commit 80cb4b4

Please sign in to comment.