Skip to content

Commit

Permalink
WIP tagged union message type API
Browse files Browse the repository at this point in the history
  • Loading branch information
goodboy committed May 15, 2023
1 parent e5ee2e3 commit ec22646
Showing 1 changed file with 72 additions and 1 deletion.
73 changes: 72 additions & 1 deletion tractor/msg.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,19 @@
# - https://jcristharif.com/msgspec/api.html#struct
# - https://jcristharif.com/msgspec/extending.html
# via ``msgpack-python``:
# - https://github.com/msgpack/msgpack-python#packingunpacking-of-custom-data-type
# https://github.com/msgpack/msgpack-python#packingunpacking-of-custom-data-type

from __future__ import annotations
from contextlib import contextmanager as cm
from pkgutil import resolve_name
from typing import Union, Any


from msgspec import Struct
from msgspec.msgpack import (
Encoder,
Decoder,
)


class NamespacePath(str):
Expand Down Expand Up @@ -78,3 +87,65 @@ def from_ref(
(ref.__module__,
getattr(ref, '__name__', ''))
))


# LIFO codec stack that is appended when the user opens the
# ``configure_native_msgs()`` cm below to configure a new codec set
# which will be applied to all new (msgspec relevant) IPC transports
# that are spawned **after** the configure call is made.
_lifo_codecs: list[
tuple[
Encoder,
Decoder,
],
] = [(Encoder(), Decoder())]


def get_msg_codecs() -> tuple[
Encoder,
Decoder,
]:
'''
Return the currently configured ``msgspec`` codec set.
The defaults are defined above.
'''
global _lifo_codecs
return _lifo_codecs[-1]


@cm
def configure_native_msgs(
tagged_structs: list[Struct],
):
'''
Push a codec set that will natively decode
tagged structs provied in ``tagged_structs``
in all IPC transports and pop the codec on exit.
'''
global _lifo_codecs

# See "tagged unions" docs:
# https://jcristharif.com/msgspec/structs.html#tagged-unions

# "The quickest way to enable tagged unions is to set tag=True when
# defining every struct type in the union. In this case tag_field
# defaults to "type", and tag defaults to the struct class name
# (e.g. "Get")."
enc = Encoder()

types_union = Union[tagged_structs[0]] | Any
for struct in tagged_structs[1:]:
types_union |= struct

dec = Decoder(types_union)

_lifo_codecs.append((enc, dec))
try:
print("YOYOYOOYOYOYOY")
yield enc, dec
finally:
print("NONONONONON")
_lifo_codecs.pop()

0 comments on commit ec22646

Please sign in to comment.