Setup Tracksight for reading from local log files (#1292)
### Changelist 

Added the infrastructure so we can read logged data in the form of CSV
files using Tracksight.

### Testing Done

Works on my laptop :)
gtaharaedmonds authored Jun 3, 2024
1 parent 1dce97a commit aedcaf1
Showing 26 changed files with 478 additions and 574 deletions.
39 changes: 36 additions & 3 deletions software/tracksight/README.md
@@ -1,7 +1,13 @@
# Tracksight

## Backend
The backend is brought up by running the pipenv shell after it is installed. Then enter /software/tracksight/backend/app and run python/telemetry.py
<!--
This is outdated, we've moved to Docker now. -Gus
The backend is brought up by running the pipenv shell after it is installed. Then enter /software/tracksight/backend/app and run python/telemetry.py
-->

Tracksight can be run in one of two ways: either it pulls data from a local CSV data file, or it receives data wirelessly from the car in real time. In either case, the backend is a Flask app that pipes data to the frontend.

## Frontend

@@ -28,10 +34,37 @@ To learn more about Next.js, take a look at the following resources:

You can check out [the Next.js GitHub repository](https://github.com/vercel/next.js/) - your feedback and contributions are welcome!

<!--
I think this is outdated but I didn't want to delete. -Gus
### Deploy on Vercel
The easiest way to deploy your Next.js app is to use the [Vercel Platform](https://vercel.com/new?utm_medium=default-template&filter=next.js&utm_source=create-next-app&utm_campaign=create-next-app-readme) from the creators of Next.js.
Check out our [Next.js deployment documentation](https://nextjs.org/docs/deployment) for more details.
Check out our [Next.js deployment documentation](https://nextjs.org/docs/deployment) for more details. -->

## Running with Docker

### Running Locally

To start the local stack, navigate to `software/tracksight` and run `./run_local.sh`. This is used for viewing log files, such as those from the onboard SD card.

This starts the frontend, backend, and InfluxDB database. These are all available at [http://localhost:3000](http://localhost:3000), [http://localhost:5000](http://localhost:5000), and [http://localhost:8086](http://localhost:8086), respectively.

When running locally, the data source is CSV log files: essentially a time-series list of the signals sent on the CAN bus, with their timestamps, names, values, and units. The required columns in these CSV files are "time" (the timestamp at which the signal was sent), "signal" (the signal name), "value", and "unit" (the physical unit of the value; leave blank if not applicable).
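
For illustration, here are a few hypothetical rows in this format (the signal names and values below are made up):

```csv
time,signal,value,unit
1717400000.0,VC_BatteryVoltage,398.2,V
1717400000.1,VC_BatteryVoltage,398.1,V
1717400000.1,VC_Velocity,23.4,m/s
1717400000.2,FSM_AppsPosition,12.5,%
```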

To specify a data file to read, pass it as the first positional argument to `run_local.sh`. You can upload multiple files by providing a comma-separated list instead. This uploads all of the data in the provided files to the local InfluxDB database. IMPORTANT: Your log file must be in the `software/tracksight/backend/data` directory; if it is not, it will not be uploaded. Provide the path relative to this folder. (Yes, I know this is pretty silly, but comp is in 7 days.)

Note that if you stop the Compose stack and restart it, your data will remain, since it is stored on a Docker volume that isn't wiped when the stack is brought down. This is useful if you want to upload more data files while keeping your old ones. To clear the local database, pass the `-c` or `--clean` flag to `run_local.sh`.

Example:

```bash
# sine_wave_data.csv is located in software/tracksight/backend/data
./run_local.sh sine_wave_data.csv
```
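
If you don't have a log handy, here is a minimal sketch that generates a compatible file (assuming pandas and numpy are installed; the raw-seconds "time" format and output path are assumptions for illustration):

```python
# Sketch: generate a sine-wave log in the expected CSV format.
import numpy as np
import pandas as pd

t = np.arange(0, 60, 0.1)  # 60 seconds of samples at 100 ms spacing
df = pd.DataFrame(
    {
        "time": t,             # timestamp the signal was sent
        "signal": "SineWave",  # signal name (broadcast to every row)
        "value": np.sin(t),
        "unit": "V",
    }
)
# Run from software/tracksight so the file lands in backend/data.
df.to_csv("backend/data/sine_wave_data.csv", index=False)
```

To start over with an empty database before uploading, use the `--clean` flag described above.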

### Running Wireless Telemetry

## Docker Invocation
TODO
47 changes: 0 additions & 47 deletions software/tracksight/SocketTest.py

This file was deleted.

5 changes: 2 additions & 3 deletions software/tracksight/backend/Dockerfile
@@ -6,7 +6,6 @@ WORKDIR /backend
COPY environment/requirements.txt .
RUN pip3 install -r requirements.txt

COPY software/tracksight/backend/ .

EXPOSE 5000
CMD ["python", "app/telemetry.py"]

COPY software/tracksight/backend/app ./app
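# Note: the COPY paths above imply the build context is the repository root,
# e.g. a hypothetical invocation: docker build -f software/tracksight/backend/Dockerfile .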
@@ -3,74 +3,80 @@
"""

from flask import Blueprint, request

from .. import influx_handler as influx
import influx_handler
from typing import Tuple, Dict, List

# HTTP processes for data that is not live
app = Blueprint("http_app", __name__)


@app.route("/")
def hello_world():
def hello_world() -> str:
"""
:returns Hello world page for backend.
"""
return "Telemetry Backend is running!"


@app.route("/health")
def health():
def health() -> Tuple[Dict, int]:
"""
:returns Health check page for backend.
"""
return {"status": "healthy"}, 200


@app.route("/signal/measurements", methods=["GET"])
def return_all_measurements():
@app.route("/data/measurements", methods=["GET"])
def return_all_measurements() -> Tuple[List[str], int]:
"""
:returns Page displaying all measurements in the database.
"""
return influx.get_measurements(), 200
return influx_handler.get_measurements(), 200


@app.route("/signal/measurement/<string:measurement>/fields", methods=["GET"])
def return_all_fields_for_measurement(measurement: str):
@app.route("/data/measurement/<string:measurement>/signals", methods=["GET"])
def return_signals_for_measurement(measurement: str) -> Tuple[List[str], int]:
"""
:param measurement: Measurement to fetch signals for.
:returns Page displaying all signals for a specific measurement.
"""
return influx.get_fields(measurement), 200
return influx_handler.get_signals(measurement=measurement), 200


@app.route("/signal/query", methods=["GET"])
def return_query():
@app.route("/data/query", methods=["GET"])
def return_query() -> Dict[str, Dict]:
"""
:returns Page displaying the result of a single query.
"""
params = request.args
measurement = params.get("measurement")
fields: list[str] | None = params.get("fields").split(",")
signals: list[str] | None = params.get("signals")
start_epoch = params.get("start_epoch")
end_epoch = params.get("end_epoch")

if (
measurement is None
or fields is None
or signals is None
or start_epoch is None
or end_epoch is None
):
missing_keys = [
k
for k, v in [
("measurement", measurement),
("fields", fields),
("signals", signals),
("start_epoch", start_epoch),
("end_epoch", end_epoch),
]
if v is None
]
return {"error": f"Missing parameters: {missing_keys}"}, 400
try:
return influx.query(measurement, fields, (start_epoch, end_epoch))
signals = signals.split(",")
return influx_handler.query(
measurement=measurement,
signals=signals,
time_range=(start_epoch, end_epoch),
)
except Exception as e:
return {"error": str(e)}, 500
@@ -3,14 +3,12 @@
"""

import logging

import flask_socketio
from flask import request

logger = logging.getLogger("telemetry_logger")

from .. import signal_util as signal_util
import signal_util

logger = logging.getLogger("telemetry_logger")

# SocketIO processes for live data
socketio = flask_socketio.SocketIO(cors_allowed_origins="*")
147 changes: 147 additions & 0 deletions software/tracksight/backend/app/influx_handler.py
@@ -0,0 +1,147 @@
"""
InfluxDB handler: helpers for querying and writing to the Influx database.
This requires the influx dbrc mapping to have db name == bucket name
TODO: Implement proper error handling for things like no data available.
"""

import os
import pandas as pd
from typing import List, Tuple
import influxdb_client
import logging


logger = logging.getLogger("telemetry_logger")


REQUIRED_ENV_VARS = {
"org": "DOCKER_INFLUXDB_INIT_ORG",
"bucket": "DOCKER_INFLUXDB_INIT_BUCKET",
"token": "DOCKER_INFLUXDB_INIT_ADMIN_TOKEN",
}
for env_var in REQUIRED_ENV_VARS.values():
if os.environ.get(env_var) is None:
raise RuntimeError(f"Required environment variable not set: {env_var}")

# Configs for Influx DB instance.
INFLUX_DB_URL = "http://influx:8086"
INFLUX_DB_ORG = os.environ.get(REQUIRED_ENV_VARS["org"])
INFLUX_DB_BUCKET = os.environ.get(REQUIRED_ENV_VARS["bucket"])
INFLUX_DB_TOKEN = os.environ.get(REQUIRED_ENV_VARS["token"])

print(f"Using URL {INFLUX_DB_URL} with token {INFLUX_DB_TOKEN}.")

# Checks if the vehicle bucket exists, and if not, creates it
with influxdb_client.InfluxDBClient(
url=INFLUX_DB_URL, token=INFLUX_DB_TOKEN, org=INFLUX_DB_ORG
) as client:
if client.buckets_api().find_bucket_by_name(INFLUX_DB_BUCKET) is None:
client.buckets_api().create_bucket(bucket_name=INFLUX_DB_BUCKET)


def get_measurements(bucket=INFLUX_DB_BUCKET) -> list[str]:
"""
Get all measurements from the database.
:param bucket: Name of bucket to fetch data from.
:returns List of all measurements.
"""
query = f"""
import "influxdata/influxdb/schema"
schema.measurements(bucket: \"{bucket}\")"""
with influxdb_client.InfluxDBClient(
url=INFLUX_DB_URL, token=INFLUX_DB_TOKEN, org=INFLUX_DB_ORG
) as client:
return [
str(i[0])
for i in client.query_api().query(query).to_values(columns=["_value"])
]


def get_signals(measurement: str, bucket: str = INFLUX_DB_BUCKET) -> list[str]:
"""
Get all signals under a measurement from the database.
:param measurement: Measurement to fetch signals for.
:param bucket: Name of bucket to fetch data from.
:returns List of all signals.
"""
query = f"""
import "influxdata/influxdb/schema"
schema.tagValues(
bucket: "{bucket}",
predicate: (r) => r._measurement == "{measurement}",
tag: "signal"
)"""

with influxdb_client.InfluxDBClient(
url=INFLUX_DB_URL, token=INFLUX_DB_TOKEN, org=INFLUX_DB_ORG
) as client:
return [
str(i[0])
for i in client.query_api().query(query=query).to_values(columns=["_value"])
]


def query(
measurement: str,
signals: List[str],
time_range: Tuple[str, str],
bucket: str = INFLUX_DB_BUCKET,
max_points: int = 8000, # TODO implement
ms_resolution: int = 100, # TODO implement
) -> dict[str, dict]:
"""
Make a general query to the database.
:param measurement: Measurement to pull data from.
:param signals: Signals to fetch.
:param time_range: Tuple like (time start, time end) to specify the time interval.
:param bucket: Name of bucket to fetch data from.
:param max_points: Maximum number of datapoints to fetch.
:param ms_resolution: Minimum time delta required before grabbing a new datapoint.
:return: A dictionary mapping each signal to a dict with "times" and "values" lists.
"""

query = f"""
from(bucket:"{bucket}")
|> range(start: {time_range[0]}, stop: {time_range[1]})
|> filter(fn: (r) =>
r._measurement == "{measurement}" and
r._field == "value" and
contains(value: r.signal, set: {str(signals).replace("'", '"')}))
"""

query_result = {signal: {"times": [], "values": []} for signal in signals}
with influxdb_client.InfluxDBClient(
url=INFLUX_DB_URL, token=INFLUX_DB_TOKEN, org=INFLUX_DB_ORG
) as client:
for signal, value, time in (
client.query_api()
.query(query=query)
.to_values(columns=["signal", "_value", "_time"])
):
query_result[signal]["times"].append(time)
query_result[signal]["values"].append(value)

return query_result
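

# Example (hypothetical measurement and signal names):
#   query(measurement="can", signals=["VC_Velocity"], time_range=("0", "now()"))
#   -> {"VC_Velocity": {"times": [<datetime>, ...], "values": [23.4, ...]}}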


def write(df: pd.DataFrame, measurement: str) -> None:
"""
Write a pandas dataframe to the Influx database. The dataframe should have the columns
time, value, unit, and signal.
:param df: Dataframe to upload.
:param measurement: Name of the measurement to write the data under.
"""
with influxdb_client.InfluxDBClient(
url=INFLUX_DB_URL, token=INFLUX_DB_TOKEN, org=INFLUX_DB_ORG
) as client:
# Index is used as source for time.
df.set_index("time", inplace=True)

write_api = client.write_api()
write_api.write(
bucket=INFLUX_DB_BUCKET,
org=INFLUX_DB_ORG,
record=df,
data_frame_measurement_name=measurement,
data_frame_tag_columns=["signal"],
)
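

# Example (hypothetical file and measurement names):
#   df = pd.read_csv("data/sine_wave_data.csv")
#   write(df, measurement="log")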
5 changes: 0 additions & 5 deletions software/tracksight/backend/app/process/__init__.py

This file was deleted.

3 changes: 0 additions & 3 deletions software/tracksight/backend/app/process/definitions.py

This file was deleted.
