-
Notifications
You must be signed in to change notification settings - Fork 17
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Ems to bidir streaming #190
Conversation
This avoids somewhat convoluted "hackery" making 2 one-way streams between the order client and the EMS and instead uses the new bi-directional streaming and context API from `tractor`. Add a router type to the EMS that gets setup by the initial service tree and which we'll eventually use to work toward multi-provider executions and order-trigger monitoring. Move to py3.9 style where possible throughout.
e4f25f1
to
f4c9e20
Compare
There rebased onto master so we can do the status bar stuff independently. |
This makes the paper engine look IPC-wise exactly like any broker-provider backend module and uses the new ``trades_dialogue()`` 2-way streaming endpoint for commanding order requests. This serves as a first step toward truly distributed forward testing since the paper engine can now be run out-of tree from `pikerd` if needed thus demonstrating how real-time clearing signals can be shared between fully distinct services.
This moves the entire clearing system to use typed messages using `pydantic.BaseModel` such that the streamed request-response order submission protocols can be explicitly viewed in terms of message schema, flow, and sequencing. Using the explicit message formats we can now dig into simplifying and normalizing across broker provider apis to get the best uniformity and simplicity. The order submission sequence is now fully async: an order request is expected to be explicitly acked with a new message and if cancellation is requested by the client before the ack arrives, the cancel message is stashed and then later sent immediately on receipt of the order submission's ack from the backend broker. Backend brokers are now controlled using a 2-way request-response streaming dialogue which is fully api agnostic of the clearing system's core processing; This leverages the new bi-directional streaming apis from `tractor`. The clearing core (emsd) was also simplified by moving the paper engine to it's own sub-actor and making it api-symmetric with expected `brokerd` endpoints. A couple of the ems status messages were changed/added: 'dark_executed' -> 'dark_triggered' added 'alert_triggered' More cleaning of old code to come!
its context. | ||
|
||
''' | ||
service_name = f'paperboi.{broker}' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Its all about that paper $$ boy!
send_on_connect={'local_trades': 'start'} | ||
) | ||
async def stream_trades( | ||
async def handle_order_requests( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So basically broker integration to the paper engine is setting up two stream consumers, one for paper orders placed on piker
's UI, and another one for the broker order flow.
Really good!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not the paper engine per say, but the clearing engine (aka emsd
service), yes.
There's a single endpoint trades_dialogue()
which is passed a tractor.Context
, this endpoint then opens a bidir tractor
stream and starts handling inbound order requests, which this handle_order_requests()
task uses to make calls to place orders on the backend. The main entrypoint, trades_dialogue()
(or any other task you'd like) must ship back trade status messages based on updates from the target broker.
# broker specific request id | ||
reqid=reqid, | ||
time_ns=time.time_ns(), | ||
).dict() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Once we get some better integration for msgpack-python
and pydantic.BaseModel
(and/or msgspec.Struct
) with tractor
, we can just send the objects directly instead of coercing to dicts.
piker/brokers/ib.py
Outdated
|
||
# don't forward, it's pointless.. | ||
continue | ||
details['broker_time'] = execu.time |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
probably do do these in the static declaration above, and maybe further do it all in one line below in the message creation line?
if err['reqid'] == -1: | ||
log.error(f'TWS external order error:\n{pformat(err)}') | ||
|
||
# don't forward for now, it's unecessary.. but if we wanted to, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we do need to relay this in some cases where ib isn't sending a "cancelled" status when there's an order submission failure... it's like weird cases though that i haven't yet categorized.
piker/clearing/_ems.py
Outdated
ctx: tractor.Context, | ||
client_actor_name: str, | ||
# client_actor_name: str, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
drop old commented arg.
broker: str, | ||
symbol: str, | ||
_mode: str = 'dark', # ('paper', 'dark', 'live') | ||
_exec_mode: str = 'dark', # ('paper', 'dark', 'live') | ||
loglevel: str = 'info', |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe flip this to the newer runtime
?
|
||
""" | ||
from ._client import send_order_cmds | ||
ems_ctx = ctx |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would be nice if in tractor
we can require either a ctx
arg, or a named arg with ctx
in it and a type annotation of tractor.Context
instead of strictly requiring a ctx
arg.
book = _router.get_dark_book(broker) | ||
book.lasts[(broker, symbol)] = first_quote[symbol]['last'] | ||
|
||
trades_endpoint = getattr(feed.mod, 'trades_dialogue', None) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
eventually we'll want to support independent feeds from data vs. broker providers.
this will probably be something to dig into as we get aggregate feeds going 😎
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
dang, quite the load of codez to review 😂
Ports clearing systems to goodboy/tractor#209 apis.
tractor.Context.send_yield()
calls as well as look at whether we can do bidir streaming with backends for the execution/fills event processing since it might turn out cleaner.brokerd
order dialoguespaperboi