Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue/6025 implement discovery handler #6264

Closed
wants to merge 68 commits into from
Closed
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
68 commits
Select commit Hold shift + click to select a range
b55d152
[WIP]
Hugo-Inmanta Jul 7, 2023
cad6ced
[FIX] fix broken anchor in link
Hugo-Inmanta Jul 7, 2023
31dec93
[WIP]
Hugo-Inmanta Jul 7, 2023
8dba6f1
[WIP] Implement top-level handler ABC
Hugo-Inmanta Jul 10, 2023
769ab33
[FIX] remove unrelated change
Hugo-Inmanta Jul 10, 2023
5a6919f
[WIP]
Hugo-Inmanta Jul 10, 2023
15856ba
[WIP]
Hugo-Inmanta Jul 11, 2023
61a75cb
[FIX]
Hugo-Inmanta Jul 12, 2023
1b0b7c9
Merge branch 'issue/6025-implement-top-level-handler-ABC' into issue/…
Hugo-Inmanta Jul 12, 2023
cfc0ce3
[DOC] add documentation for Id and resource_str
Hugo-Inmanta Jul 12, 2023
9e010ca
[WIP]
Hugo-Inmanta Jul 12, 2023
247b5e9
[WIP]
Hugo-Inmanta Jul 12, 2023
7f02a3c
[WIP]
Hugo-Inmanta Jul 13, 2023
1357fef
[WIP]
Hugo-Inmanta Jul 13, 2023
69f5439
Merge branch 'master' into issue/6025-implement-discovery-handler
Hugo-Inmanta Jul 13, 2023
6996342
[WIP]
Hugo-Inmanta Jul 13, 2023
9b75118
[WIP]
Hugo-Inmanta Jul 13, 2023
8f0092d
[WIP]
Hugo-Inmanta Jul 13, 2023
1f010bb
[GIT] move documentation to a separate PR
Hugo-Inmanta Jul 13, 2023
af7fbdd
[FORMAT]
Hugo-Inmanta Jul 17, 2023
4549578
[FIX] PEP8
Hugo-Inmanta Jul 17, 2023
5bcc5e4
Update src/inmanta/agent/handler.py
Hugo-Inmanta Jul 19, 2023
9819c83
[FIX] fix typo
Hugo-Inmanta Jul 17, 2023
466dfda
[WIP]
Hugo-Inmanta Jul 19, 2023
e6c65cd
[WIP]
Hugo-Inmanta Jul 19, 2023
982db23
Merge branch 'master' into issue/6025-implement-top-level-handler-ABC
Hugo-Inmanta Jul 19, 2023
d1b54b0
[FORMAT]
Hugo-Inmanta Jul 19, 2023
c6d5a69
[DOC] add changelog
Hugo-Inmanta Jul 19, 2023
172fc07
Merge branch 'issue/6025-implement-top-level-handler-ABC' into issue/…
Hugo-Inmanta Jul 19, 2023
be3ea06
[FORMAT] comply to black/pep8
Hugo-Inmanta Jul 19, 2023
1a70961
[WIP]
Hugo-Inmanta Jul 19, 2023
c4274b8
[WIP]
Hugo-Inmanta Jul 19, 2023
62f9d3c
[FORMAT]
Hugo-Inmanta Jul 19, 2023
d7c7996
[WIP]
Hugo-Inmanta Jul 19, 2023
b1dcdf9
[WIP]
Hugo-Inmanta Jul 19, 2023
3a22bc6
[WIP]
Hugo-Inmanta Jul 19, 2023
2b67264
[WIP]
Hugo-Inmanta Jul 21, 2023
b80dafa
[WIP]
Hugo-Inmanta Jul 21, 2023
3fd115d
Merge branch 'master' into issue/6025-implement-discovery-handler
Hugo-Inmanta Jul 21, 2023
66d5008
Update src/inmanta/agent/handler.py
Hugo-Inmanta Jul 21, 2023
ce2330a
[WIP]
Hugo-Inmanta Jul 21, 2023
5312f01
Merge remote-tracking branch 'refs/remotes/origin/issue/6025-implemen…
Hugo-Inmanta Jul 21, 2023
a14cdfd
[WIP]
Hugo-Inmanta Jul 21, 2023
cd84b14
[WIP]
Hugo-Inmanta Jul 21, 2023
4db9db7
[WIP]
Hugo-Inmanta Jul 21, 2023
705024c
[FORMAT] comply to black/pep8
Hugo-Inmanta Jul 24, 2023
806acd1
[DOC]
Hugo-Inmanta Jul 24, 2023
01023e6
[WIP]
Hugo-Inmanta Jul 24, 2023
5c62a8a
Merge branch 'issue/6025-implement-top-level-handler-ABC' into issue/…
Hugo-Inmanta Jul 24, 2023
e863677
Merge branch 'master' into issue/6025-implement-discovery-handler
Hugo-Inmanta Jul 25, 2023
cf516ae
[WIP]
Hugo-Inmanta Jul 25, 2023
ed99bd0
[WIP]
Hugo-Inmanta Jul 25, 2023
3357e12
[WIP]
Hugo-Inmanta Jul 25, 2023
8e894c2
[WIP]
Hugo-Inmanta Jul 25, 2023
8127220
[WIP]
Hugo-Inmanta Jul 25, 2023
806e730
proposal for handler api class
wouterdb Jul 31, 2023
2f3555b
Merge branch 'issue/handler_abc_wdb' into issue/6025-implement-top-le…
Hugo-Inmanta Aug 1, 2023
f3473eb
[WIP]
Hugo-Inmanta Aug 1, 2023
d99ceee
Merge branch 'issue/6025-implement-top-level-handler-ABC' into issue/…
Hugo-Inmanta Aug 1, 2023
72a61bb
[WIP]
Hugo-Inmanta Aug 1, 2023
4c97da5
Update src/inmanta/agent/handler.py
Hugo-Inmanta Aug 2, 2023
90f7509
Merge branch 'issue/6025-implement-top-level-handler-ABC' into issue/…
Hugo-Inmanta Aug 2, 2023
221a0f0
[WIP]
Hugo-Inmanta Aug 2, 2023
f1f37ed
[WIP]
Hugo-Inmanta Aug 2, 2023
1fa818d
Merge branch 'master' into issue/6025-implement-discovery-handler
Hugo-Inmanta Aug 2, 2023
be04e9a
[WIP]
Hugo-Inmanta Aug 2, 2023
334fb7f
Merge branch 'master' into issue/6025-implement-discovery-handler
Hugo-Inmanta Aug 2, 2023
ebb3580
[FIX] fix bound type
Hugo-Inmanta Aug 2, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 169 additions & 50 deletions src/inmanta/agent/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,19 @@
import typing
import uuid
from abc import ABC, abstractmethod
from collections import defaultdict
from collections import abc, defaultdict
from concurrent.futures import Future
from typing import Any, Callable, Dict, Generic, List, Optional, Tuple, Type, TypeVar, Union, cast, overload

import pydantic
from tornado import concurrent

import inmanta
from inmanta import const, data, protocol, resources
from inmanta.agent import io
from inmanta.agent.cache import AgentCache
from inmanta.const import ParameterSource, ResourceState
from inmanta.data.model import AttributeStateChange, ResourceIdStr
from inmanta.data.model import AttributeStateChange, DiscoveredResource, ResourceIdStr
from inmanta.protocol import Result, json_encode
from inmanta.stable_api import stable_api
from inmanta.types import SimpleTypes
Expand All @@ -47,6 +48,10 @@
LOGGER = logging.getLogger(__name__)

T = TypeVar("T")
# Discovery Resource Type
DRT = TypeVar("DRT", bound=resources.Resource)
# Unmanaged Resource Type
URT = TypeVar("URT", bound=pydantic.BaseModel) # bound=... enough to enforce the fact it needs to be serializable ?
sanderr marked this conversation as resolved.
Show resolved Hide resolved
T_FUNC = TypeVar("T_FUNC", bound=Callable[..., Any])


Expand All @@ -55,7 +60,7 @@ class provider(object): # noqa: N801
"""
A decorator that registers a new handler.

:param resource_type: The type of the resource this handler provides an implementation for.
:param resource_type: The type of the resource this handler is responsible for.
For example, :inmanta:entity:`std::File`
:param name: A name to reference this provider.
"""
Expand Down Expand Up @@ -419,34 +424,45 @@ def _log_msg(self, level: int, msg: str, *args: object, exc_info: bool = False,


@stable_api
class ResourceHandler(object):
class HandlerABC(ABC):
sanderr marked this conversation as resolved.
Show resolved Hide resolved
"""
Top-level abstract base class all handlers should inherit from.
"""
A baseclass for classes that handle resources. New handler are registered with the
:func:`~inmanta.agent.handler.provider` decorator.

The implementation of a handler should use the ``self._io`` instance to execute io operations. This io objects
makes abstraction of local or remote operations. See :class:`~inmanta.agent.io.local.LocalIO` for the available
operations.
# _client: Optional[protocol.SessionClient] = None
# _agent: Optional[inmanta.agent.agent.AgentInstance] = None
# _io_loop = None
sanderr marked this conversation as resolved.
Show resolved Hide resolved

:param agent: The agent that is executing this handler.
:param io: The io object to use.
"""
def pre(self, ctx: HandlerContext, resource: resources.Resource) -> None:
"""
Method executed before a handler operation (Facts, dryrun, real deployment, ...) is executed. Override this method
to run before an operation.

def __init__(self, agent: "inmanta.agent.agent.AgentInstance", io: Optional["IOBase"] = None) -> None:
self._agent = agent
:param ctx: Context object to report changes and logs to the agent and server.
:param resource: The resource to query facts for.
"""

if io is None:
raise Exception("Unsupported: no resource mgmt in RH")
else:
self._io = io
def post(self, ctx: HandlerContext, resource: resources.Resource) -> None:
"""
Method executed after an operation. Override this method to run after an operation.

self._client: Optional[protocol.SessionClient] = None
# explicit ioloop reference, as we don't want the ioloop for the current thread, but the one for the agent
self._ioloop = agent.process._io_loop
:param ctx: Context object to report changes and logs to the agent and server.
:param resource: The resource to query facts for.
"""

def get_client(self) -> protocol.SessionClient:
"""
Get the client instance that identifies itself with the agent session.

:return: A client that is associated with the session of the agent that executes this handler.
"""
if self._client is None:
self._client = protocol.SessionClient("agent", self._agent.sessionid)
return self._client

def run_sync(self, func: typing.Callable[[], typing.Awaitable[T]]) -> T:
"""
Run a the given async function on the ioloop of the agent. It will block the current thread until the future
Run the given async function on the ioloop of the agent. It will block the current thread until the future
resolves.

:param func: A function that returns a yieldable future.
Expand All @@ -470,18 +486,35 @@ def run() -> None:

return f.result()

def set_cache(self, cache: AgentCache) -> None:
self.cache = cache

def get_client(self) -> protocol.SessionClient:
"""
Get the client instance that identifies itself with the agent session.
@stable_api
class ResourceHandler(HandlerABC):
"""
A baseclass for classes that handle resources. New handler are registered with the
sanderr marked this conversation as resolved.
Show resolved Hide resolved
:func:`~inmanta.agent.handler.provider` decorator.

:return: A client that is associated with the session of the agent that executes this handler.
"""
if self._client is None:
self._client = protocol.SessionClient("agent", self._agent.sessionid)
return self._client
The implementation of a handler should use the ``self._io`` instance to execute io operations. This io objects
makes abstraction of local or remote operations. See :class:`~inmanta.agent.io.local.LocalIO` for the available
operations.

:param agent: The agent that is executing this handler.
:param io: The io object to use.
"""

def __init__(self, agent: "inmanta.agent.agent.AgentInstance", io: Optional["IOBase"] = None) -> None:
self._agent = agent

if io is None:
raise Exception("Unsupported: no resource mgmt in RH")
else:
self._io = io

self._client: Optional[protocol.SessionClient] = None
# explicit ioloop reference, as we don't want the ioloop for the current thread, but the one for the agent
self._ioloop = agent.process._io_loop

def set_cache(self, cache: AgentCache) -> None:
self.cache = cache

def can_reload(self) -> bool:
"""
Expand All @@ -499,23 +532,6 @@ def do_reload(self, ctx: HandlerContext, resource: resources.Resource) -> None:
:param resource: The resource to reload.
"""

def pre(self, ctx: HandlerContext, resource: resources.Resource) -> None:
"""
Method executed before a handler operation (Facts, dryrun, real deployment, ...) is executed. Override this method
to run before an operation.

:param ctx: Context object to report changes and logs to the agent and server.
:param resource: The resource to query facts for.
"""

def post(self, ctx: HandlerContext, resource: resources.Resource) -> None:
"""
Method executed after an operation. Override this method to run after an operation.

:param ctx: Context object to report changes and logs to the agent and server.
:param resource: The resource to query facts for.
"""

def close(self) -> None:
pass

Expand Down Expand Up @@ -582,7 +598,7 @@ def deploy(
requires: Dict[ResourceIdStr, ResourceState],
) -> None:
"""
This method is always be called by the agent, even when one of the requires of the given resource
This method is always called by the agent, even when one of the requires of the given resource
failed to deploy. The default implementation of this method will deploy the given resource when all its
requires were deployed successfully. Override this method if a different condition determines whether the
resource should deploy.
Expand Down Expand Up @@ -961,6 +977,109 @@ def execute(self, ctx: HandlerContext, resource: TPurgeableResource, dry_run: bo
super().execute(ctx, resource, dry_run)


@stable_api
class DiscoveryHandler(HandlerABC, Generic[DRT, URT]):
sanderr marked this conversation as resolved.
Show resolved Hide resolved
# The DiscoveryHandler is generic in both the handler's Discovery Resource type (DRT)
# and the Unmanaged Resource type (URT) it reports to the server. The second has to be serializable.

# This class deploys instances of DRT and reports URT to the server.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

deploys instances of DRT
I'm not sure this is effectively the case with my implementation

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, this is still missing. The main resource entry-point (at the moment, depends very slightly on the final outcome of the other PR) is deploy so that method should be implemented in this class to call discover_resources and report_discovered_resources.

sanderr marked this conversation as resolved.
Show resolved Hide resolved

def __init__(self, agent: "inmanta.agent.agent.AgentInstance") -> None:
self._agent = agent
self._ioloop = agent._io_loop
sanderr marked this conversation as resolved.
Show resolved Hide resolved

@abstractmethod
def discover_resources(self, ctx: HandlerContext, discovery_resource: DRT) -> abc.Mapping[ResourceIdStr, URT]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch that this should be a ResourceIdStr, looks like that was missing from the design.

raise NotImplementedError

def report_discovered_resources(self, ctx: HandlerContext, resource: DRT) -> None:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm unsure of whether this logic should happen in a synchronous or asynchronous context.
The testcase currently uses the async version of this (async_report_discovered_resources) and passes.
I was having second thoughts about it and tried the synchronous version (report_discovered_resources) and it looks like it is hanging somewhere.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All agent ABC methods are synchronous so this should be synchronous as well. I would even go as far as to drop the async method. For testing purposes, using the sync client is more complicated because if both server and client run on the same thread, a synchronous call would hang the server so it can never respond. I'm pretty sure we have some fixtures to work with that, I'll add a suggestion to your test cases.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be deploy. Could optionally be split into discover -> serialize -> report, so deploy just has to chain them together. No strong preference on the last part, unless the serialization turns out to be very verbose.

""" """

def _call_discovered_resource_create_batch() -> typing.Awaitable[Result]:
return self.get_client().discovered_resource_create_batch(
tid=self._agent.environment, discovered_resources=discovered_resources
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a style preference but I would always pass discovered_resources as a parameter rather than counting on an implicit closure of something that's defined below it.

)

try:
self.pre(ctx, resource)
# report to the server
discovered_resources: List[DiscoveredResource] = [
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be typed as lowercase list or abc.Sequence. You're not modifying it so abc.Sequence presents the clearer picture.

DiscoveredResource(discovered_resource_id=resource_id, values=values)
for resource_id, values in self.discover_resources(ctx, resource).items()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

purely a matter of taste, but I would put the self.discover_resources(ctx, resource) on a line of its own, as it is the main thing here. It is kind of hidden now.

]
result = self.run_sync(_call_discovered_resource_create_batch)

if result.code != 200:
error_msg_from_server = f": {result.result['message']}" if "message" in result.result else ""
raise Exception(f"Failed to report discovered resources to the server{error_msg_from_server}")

except Exception as e:
ctx.set_status(const.ResourceState.failed)
ctx.exception(
(
"An error occurred during resource discovery of type %(urt)s, "
"triggered by %(resource_id)s (exception: %(exception)s"
),
urt=str(URT),
resource_id=resource.id,
exception=f"{e.__class__.__name__}('{e}')",
)
finally:
try:
self.post(ctx, resource)
except Exception as e:
ctx.exception(
(
"An error occurred after resource discovery of type %(urt)s, "
Hugo-Inmanta marked this conversation as resolved.
Show resolved Hide resolved
"triggered by %(resource_id)s (exception: %(exception)s"
),
urt=str(URT),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

URT is just a TypeVar, not the bound type. It is also just an intermediate representation type so I'm not sure if it is even relevant to the end user. Same for the other message

resource_id=resource.id,
exception=f"{e.__class__.__name__}('{e}')",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think repr() does pretty much this:

>>> e = Exception("hellow orld")
>>> str(e)
'hellow orld'
>>> repr(e)
"Exception('hellow orld')"

Same for other message

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah turns out it is not the same regarding formatting:

e = Exception("An\nError\tMessage")

print(str(e))
print(repr(e))
print(f"{e.__class__.__name__}('{e}')")
An
Error	Message
Exception('An\nError\tMessage')
Exception('An
Error	Message')

And this test fails if we change it to repr

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting. To be honest I personally prefer the repr output but if we were already using the other approach (I hadn't realized when I commented) I guess we should stick with that.

)

async def async_report_discovered_resources(self, ctx: HandlerContext, resource: DRT) -> None:
try:
self.pre(ctx, resource)
# report to the server
discovered_resources: List[DiscoveredResource] = [
DiscoveredResource(discovered_resource_id=resource_id, values=values)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These types do not match, we should serialize here.

for resource_id, values in self.discover_resources(ctx, resource).items()
]
result = await self.get_client().discovered_resource_create_batch(
tid=self._agent.environment, discovered_resources=discovered_resources
)

if result.code != 200:
error_msg_from_server = f": {result.result['message']}" if "message" in result.result else ""
raise Exception(f"Failed to report discovered resources to the server{error_msg_from_server}")

except Exception as e:
ctx.set_status(const.ResourceState.failed)
ctx.exception(
(
"An error occurred during resource discovery of type %(urt)s, "
"triggered by %(resource_id)s (exception: %(exception)s"
),
urt=str(URT),
resource_id=resource.id,
exception=f"{e.__class__.__name__}('{e}')",
)
finally:
try:
self.post(ctx, resource)
except Exception as e:
ctx.exception(
(
"An error occurred after resource discovery of type %(urt)s, "
"triggered by %(resource_id)s (exception: %(exception)s"
),
urt=str(URT),
resource_id=resource.id,
exception=f"{e.__class__.__name__}('{e}')",
)


class Commander(object):
"""
This class handles commands
Expand Down
28 changes: 28 additions & 0 deletions src/inmanta/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,15 @@ class PurgeableResource(Resource):
purge_on_delete: bool


@stable_api
class DiscoveryResource(Resource):
"""
See :inmanta:entity:`std::DiscoveryResource` for more information.
"""

fields = ()


@stable_api
class ManagedResource(Resource):
"""
Expand Down Expand Up @@ -499,6 +508,13 @@ class Id(object):
"""

def __init__(self, entity_type: str, agent_name: str, attribute: str, attribute_value: str, version: int = 0) -> None:
"""
:attr entity_type: The resource type, as defined in the configuration model. For example :inmanta:entity:`std::File`.
:attr agent_name: The agent responsible for this resource.
:attr attribute: The key attribute that uniquely identifies this resource on the agent
:attr attribute_value: The corresponding value for this key attribute.
:attr version: The version number for this resource.
"""
self._entity_type = entity_type
self._agent_name = agent_name
self._attribute = attribute
Expand Down Expand Up @@ -553,6 +569,18 @@ def __eq__(self, other: object) -> bool:
return str(self) == str(other) and type(self) == type(other)

def resource_str(self) -> ResourceIdStr:
"""
String representation for this resource with the following format:

<type>[<agent>,<attribute>=<value>]

- type: The resource type, as defined in the configuration model. For example :inmanta:entity:`std::File`.
- agent: The agent responsible for this resource.
- attribute: The key attribute that uniquely identifies this resource on the agent
- value: The corresponding value for this key attribute.

:return: Returns a :py:class:`inmanta.data.model.ResourceIdStr`
"""
return cast(
ResourceIdStr,
"%(type)s[%(agent)s,%(attribute)s=%(value)s]"
Expand Down
Loading