Recording harp's input/output in binary from bonsai #12
Replies: 1 comment 2 replies
-
Hi, Since harp is a binary protocol, we save them using standard flat binary files. However, since each harp message needs to be individually parsed, it would be quite wasteful to save them all to the same binary file. Instead, since each address maintains the format of its message, we split each address' message on its own file (essentially demultiplexing the stream from each device on N different files, one per address and/or message type. Here's an example how to implement this logic: Workflow (Copy+Paste me)<?xml version="1.0" encoding="utf-8"?>
<WorkflowBuilder Version="2.7.1"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:io="clr-namespace:Bonsai.IO;assembly=Bonsai.System"
xmlns:sys="clr-namespace:System;assembly=mscorlib"
xmlns:rx="clr-namespace:Bonsai.Reactive;assembly=Bonsai.Core"
xmlns:harp="clr-namespace:Bonsai.Harp;assembly=Bonsai.Harp"
xmlns:dsp="clr-namespace:Bonsai.Dsp;assembly=Bonsai.Dsp"
xmlns="https://bonsai-rx.org/2018/workflow">
<Workflow>
<Nodes>
<Expression xsi:type="PropertySource" TypeArguments="io:CsvWriter,sys:String">
<MemberName>FileName</MemberName>
</Expression>
<Expression xsi:type="Combinator">
<Combinator xsi:type="rx:Take">
<rx:Count>1</rx:Count>
</Combinator>
</Expression>
<Expression xsi:type="rx:AsyncSubject">
<Name>RootPath</Name>
</Expression>
<Expression xsi:type="rx:BehaviorSubject" TypeArguments="harp:HarpMessage">
<rx:Name>Commands</rx:Name>
</Expression>
<Expression xsi:type="Combinator">
<Combinator xsi:type="harp:Device">
<harp:DeviceState>Active</harp:DeviceState>
<harp:DumpRegisters>true</harp:DumpRegisters>
<harp:LedState>On</harp:LedState>
<harp:VisualIndicators>On</harp:VisualIndicators>
<harp:Heartbeat>Disable</harp:Heartbeat>
<harp:IgnoreErrors>false</harp:IgnoreErrors>
<harp:PortName>COMx</harp:PortName>
</Combinator>
</Expression>
<Expression xsi:type="rx:PublishSubject">
<Name>Events</Name>
</Expression>
<Expression xsi:type="SubscribeSubject">
<Name>Events</Name>
</Expression>
<Expression xsi:type="GroupWorkflow">
<Name>LogHarpDemux</Name>
<Workflow>
<Nodes>
<Expression xsi:type="ExternalizedMapping">
<Property Name="LogName" />
</Expression>
<Expression xsi:type="WorkflowInput">
<Name>Source1</Name>
</Expression>
<Expression xsi:type="rx:GroupBy">
<rx:KeySelector>Address,MessageType</rx:KeySelector>
</Expression>
<Expression xsi:type="rx:SelectMany">
<Name>LogHarp</Name>
<Workflow>
<Nodes>
<Expression xsi:type="WorkflowInput">
<Name>Source1</Name>
</Expression>
<Expression xsi:type="rx:AsyncSubject">
<Name>Data</Name>
</Expression>
<Expression xsi:type="SubscribeSubject">
<Name>Data</Name>
</Expression>
<Expression xsi:type="MemberSelector">
<Selector>Key</Selector>
</Expression>
<Expression xsi:type="ExternalizedMapping">
<Property Name="Value" DisplayName="LogName" />
</Expression>
<Expression xsi:type="Combinator">
<Combinator xsi:type="StringProperty">
<Value>BehaviorEvents</Value>
</Combinator>
</Expression>
<Expression xsi:type="Combinator">
<Combinator xsi:type="rx:Take">
<rx:Count>1</rx:Count>
</Combinator>
</Expression>
<Expression xsi:type="SubscribeSubject">
<Name>RootPath</Name>
</Expression>
<Expression xsi:type="Combinator">
<Combinator xsi:type="rx:Zip" />
</Expression>
<Expression xsi:type="Format">
<Format>{0}/{1}/{2}_{3}.bin</Format>
<Selector>Item1,Item2,Item3.Item2,Item3.Item1</Selector>
</Expression>
<Expression xsi:type="rx:AsyncSubject">
<Name>LogName</Name>
</Expression>
<Expression xsi:type="SubscribeSubject">
<Name>Data</Name>
</Expression>
<Expression xsi:type="Combinator">
<Combinator xsi:type="rx:Merge" />
</Expression>
<Expression xsi:type="MemberSelector">
<Selector>MessageBytes</Selector>
</Expression>
<Expression xsi:type="SubscribeSubject">
<Name>LogName</Name>
</Expression>
<Expression xsi:type="PropertyMapping">
<PropertyMappings>
<Property Name="Path" Selector="it" />
</PropertyMappings>
</Expression>
<Expression xsi:type="Combinator">
<Combinator xsi:type="dsp:MatrixWriter">
<dsp:Suffix>None</dsp:Suffix>
<dsp:Overwrite>false</dsp:Overwrite>
<dsp:Layout>ColumnMajor</dsp:Layout>
</Combinator>
</Expression>
<Expression xsi:type="WorkflowOutput" />
</Nodes>
<Edges>
<Edge From="0" To="1" Label="Source1" />
<Edge From="2" To="3" Label="Source1" />
<Edge From="3" To="8" Label="Source3" />
<Edge From="4" To="5" Label="Source1" />
<Edge From="5" To="6" Label="Source1" />
<Edge From="6" To="8" Label="Source2" />
<Edge From="7" To="8" Label="Source1" />
<Edge From="8" To="9" Label="Source1" />
<Edge From="9" To="10" Label="Source1" />
<Edge From="11" To="12" Label="Source1" />
<Edge From="12" To="13" Label="Source1" />
<Edge From="13" To="16" Label="Source1" />
<Edge From="14" To="15" Label="Source1" />
<Edge From="15" To="16" Label="Source2" />
<Edge From="16" To="17" Label="Source1" />
</Edges>
</Workflow>
</Expression>
<Expression xsi:type="WorkflowOutput" />
</Nodes>
<Edges>
<Edge From="0" To="3" Label="Source2" />
<Edge From="1" To="2" Label="Source1" />
<Edge From="2" To="3" Label="Source1" />
<Edge From="3" To="4" Label="Source1" />
</Edges>
</Workflow>
</Expression>
</Nodes>
<Edges>
<Edge From="0" To="1" Label="Source1" />
<Edge From="1" To="2" Label="Source1" />
<Edge From="3" To="4" Label="Source1" />
<Edge From="4" To="5" Label="Source1" />
<Edge From="6" To="7" Label="Source1" />
</Edges>
</Workflow>
</WorkflowBuilder> Now, when it comes to loading it in python is quite easy. Basically, you just need to parse the first message in the file and use it to know how to parse all the others (since by definition the format of an address' message will be the same for a given device). In other words, you can see the first message in the flat binary as a sort of "header". Here's a possible implementation that we have been using (Thanks @glopesdev :P): import numpy as np
import pandas as pd
_SECONDS_PER_TICK = 32e-6
_payloadtypes = {
1 : np.dtype(np.uint8),
2 : np.dtype(np.uint16),
4 : np.dtype(np.uint32),
8 : np.dtype(np.uint64),
129 : np.dtype(np.int8),
130 : np.dtype(np.int16),
132 : np.dtype(np.int32),
136 : np.dtype(np.int64),
68 : np.dtype(np.float32)
}
def read_harp_bin(file):
data = np.fromfile(file, dtype=np.uint8)
if len(data) == 0:
return None
stride = data[1] + 2
length = len(data) // stride
payloadsize = stride - 12
payloadtype = _payloadtypes[data[4] & ~0x10]
elementsize = payloadtype.itemsize
payloadshape = (length, payloadsize // elementsize)
seconds = np.ndarray(length, dtype=np.uint32, buffer=data, offset=5, strides=stride)
ticks = np.ndarray(length, dtype=np.uint16, buffer=data, offset=9, strides=stride)
seconds = ticks * _SECONDS_PER_TICK + seconds
payload = np.ndarray(
payloadshape,
dtype=payloadtype,
buffer=data, offset=11,
strides=(stride, elementsize))
if payload.shape[1] == 1:
ret_pd = pd.DataFrame(payload, index=seconds, columns= ["Value"])
ret_pd.index.names = ['Seconds']
else:
ret_pd = pd.DataFrame(payload, index=seconds)
ret_pd.index.names = ['Seconds']
return ret_pd In the near future, we will release ways to have this parsing be even more seamless and user-friendly, so stay tuned, there are a lot of new exciting features coming to the harp/bonsai ecosystem! Hope it helps, |
Beta Was this translation helpful? Give feedback.
-
Hello,
We have a bonsai/harp task that have different inputs and outputs (analog rotary encoder, valve, servomotor). We're trying to write the corresponding data (event or value with corresponding timestamps) with as less computational cost as possible, to process them offline on python. Is there a specific way to do so using certain harp nodes ? What's more coherent with bonsai's logic, to record them in one or several specific binary files ? What's the typical pipeline to get and use harp's timestamps ? Is there a built in way to convert timestamp objects in binary ? How can we control sampling rate ? In our case we only need 1 ms.
Thank you very much
Beta Was this translation helpful? Give feedback.
All reactions