Update README, improve preprocessing
jsmailes committed Nov 8, 2023
1 parent e464b12 commit 5a6725f
Showing 8 changed files with 202 additions and 92 deletions.
4 changes: 3 additions & 1 deletion Dockerfile
@@ -13,9 +13,11 @@ RUN pip3 install pandas keras h5py zmq
RUN pip3 install tqdm
RUN pip3 install tensorflow-datasets

RUN pip install tensorflow-addons==0.13.0
RUN pip install tensorflow-addons

RUN pip install scipy
RUN pip install scikit-learn
RUN pip install notebook

RUN pip install seaborn

88 changes: 80 additions & 8 deletions README.md
@@ -53,11 +53,44 @@ tensorflow-datasets
tensorflow-addons==0.13.0
scipy
seaborn
scikit-learn
notebook
```

A GPU is recommended (with all necessary drivers installed), and a moderate amount of RAM will be required to run the data preprocessing and model training.


### Downloading Data

The full dataset is stored on Zenodo at the following URL: https://zenodo.org/record/8220494

The files can be downloaded from the site directly, but the following script may be preferable due to their large size:
```bash
#!/bin/bash

for i in $(seq -w 0 5 165); do
printf -v j "%03d" $((${i#0} + 4))
wget https://zenodo.org/records/8220494/files/data_${i}_${j}.tar.gz
done
```
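
If a download is interrupted, the loop can be re-run with wget's `-c` flag, which resumes partially downloaded files rather than starting again:
```bash
#!/bin/bash

for i in $(seq -w 0 5 165); do
    printf -v j "%03d" $((${i#0} + 4))
    wget -c https://zenodo.org/records/8220494/files/data_${i}_${j}.tar.gz
done
```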

> [!WARNING]
> These files are very large (4.0GB each, 135.4GB total).
> Ensure you have enough disk space before downloading.
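
Available space in the download directory can be checked beforehand, for example:
```bash
df -h .
```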

To extract the files:
```bash
#!/bin/bash

for i in $(seq -w 0 5 165); do
printf -v j "%03d" $((${i#0} + 4))
tar xzf data_${i}_${j}.tar.gz
done
```
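
Once extraction has succeeded, the archives themselves are no longer needed and can be removed to reclaim disk space:
```bash
rm data_*.tar.gz
```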

See the instructions below on processing the resulting files for use.


## Usage

### TensorFlow Container
@@ -101,8 +134,14 @@ Change the `docker-compose.yml` to ensure the device is mounted in the container
The scripts in the `preprocessing` directory process the database file(s) into NumPy files, and then TFRecord datasets.
It is recommended to run these scripts from within the TensorFlow container described above.

Please note that these scripts load the full datasets into memory, and will consume large amounts of RAM.
It is recommended that you run them on a machine with at least 128GB of RAM.
> [!NOTE]
> Converting databases to NumPy files and filtering is only necessary if you are doing your own data collection.
> If the provided dataset on Zenodo is used, only the `np-to-tfrecord.py` script is needed.

> [!IMPORTANT]
> Please note that these scripts load the full datasets into memory, and will consume large amounts of RAM.
> It is recommended that you run them on a machine with at least 128GB of RAM.
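
As a minimal sketch (the service name `tensorflow` and the `/data` paths below are placeholders; use the service name and volume mounts defined in your `docker-compose.yml`), a preprocessing script can be run inside the container like so:
```bash
docker compose run --rm tensorflow \
    python3 preprocessing/np-to-tfrecord.py --path-in /data --path-out /data/tfrecord
```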

#### db-to-np-multiple.py

@@ -116,6 +155,7 @@ python3 db-to-np-multiple.py

The resulting files will be placed in `code/processed` (ensure this directory already exists).
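
The `code/filtered` directory used by `np-filter.py` below is expected to exist in the same way, so both output directories can be created up front (a one-liner run from the repository root, assuming the default paths used in this README):
```bash
mkdir -p code/processed code/filtered
```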


#### np-filter.py

This script normalizes the IQ samples, and filters out unusable data.
@@ -128,20 +168,52 @@ python3 np-filter.py

The resulting files will be placed in `code/filtered` (ensure this directory already exists).


#### np-to-tfrecord.py

This script converts NumPy files into the TFRecord format, for use in model training.
To run, `path_base` and `suffixes` are once again set as above.
The `chunk_size`, `shuffle`, `by_id`, and `id_counts` options may also be set to adjust how the dataset is generated -- the default options should be fine, unless alternative datasets (e.g. with transmitters removed) are required.
To run this script, ensure your data has been processed into NumPy files with the following format:
- `samples_<suffix>.npy`
- `ra_sat_<suffix>.npy`
- `ra_cell_<suffix>.npy`

The script runs with no arguments:
> [!NOTE]
> The `db-to-np-multiple.py` script will produce files in this format.
> The dataset available from Zenodo is also in this format.

The script can be used as follows:
```bash
python3 np-filter.py
python3 np-to-tfrecord.py --path-in <INPUT PATH> --path-out <OUTPUT PATH>
```

The resulting files will be placed in `code/tfrecord` (ensure this directory already exists).
There are also the following optional parameters:
- `--chunk-size <CHUNK SIZE>`: number of records in each chunk. The default is 50000; set a smaller value to produce smaller files.
- `-v`, `--verbose`: display progress.
- `--max-files <MAX FILES>`: stop after processing the specified number of input files.
- `--skip-files <SKIP FILES>`: skip a specified number of input files.
- `--no-shuffle`: do not shuffle the data.
- `--by-id`: see below.
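
For example, to generate smaller TFRecord chunks with progress output (the paths here are placeholders):
```bash
python3 np-to-tfrecord.py --path-in /data/filtered --path-out /data/tfrecord \
    --chunk-size 10000 --verbose
```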

The `--by-id` option creates 9 datasets.
The first of these contains only the most common 10% of transmitter IDs.
The second contains 20%, and so on.
Be careful using this option: it creates a much larger number of files and takes significantly longer to run.

> [!WARNING]
> This script in particular will use a large amount of RAM, since it loads the entire dataset into memory at once.
> Processing may be done in batches by using the `--max-files` and `--skip-files` command-line arguments.
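
A sketch of such batch processing (paths and batch size are illustrative; each batch is written to its own output directory here to keep the chunks from different runs separate):
```bash
#!/bin/bash

# Process the input files five at a time; adjust the upper bound of the loop
# to the number of input NumPy file sets present.
for skip in $(seq 0 5 30); do
    python3 np-to-tfrecord.py --path-in /data --path-out /data/tfrecord/batch_${skip} \
        --max-files 5 --skip-files ${skip} --verbose
done
```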

#### sqlite3-compress.py

This script converts database files directly into NumPy arrays, in the same format as the dataset provided on Zenodo.
This includes all columns provided by the data collection pipeline.

The script can be used as follows:
```bash
python3 sqlite3-compress.py <INPUT PATH> <OUTPUT PATH>
```

Please note that this script in particular will use a large amount of RAM.

#### Noise

6 changes: 3 additions & 3 deletions preprocessing/db-to-np-multiple.py
@@ -10,9 +10,9 @@
file_db = f"db-{db_index}.sqlite3"

out_dir = os.path.join(path_base, "processed")
file_samples = f"samples-{db_index}.npy"
file_ids = f"ids-{db_index}.npy"
file_cells = f"cells-{db_index}.npy"
file_samples = f"samples_{db_index}.npy"
file_ids = f"ra_sat_{db_index}.npy"
file_cells = f"ra_cell_{db_index}.npy"

db = Database(os.path.join(path_base, file_db), num_samples)

14 changes: 7 additions & 7 deletions preprocessing/noise/db-to-np-multiple.py
@@ -13,13 +13,13 @@

file_db = f"db-{db_index}.sqlite3"

file_samples = f"samples-{db_index}.npy"
file_ids = f"ids-{db_index}.npy"
file_cells = f"cells-{db_index}.npy"
file_magnitudes = f"magnitudes-{db_index}.npy"
file_noises = f"noises-{db_index}.npy"
file_levels = f"levels-{db_index}.npy"
file_confidences = f"confidences-{db_index}.npy"
file_samples = f"samples_{db_index}.npy"
file_ids = f"ra_sat_{db_index}.npy"
file_cells = f"ra_cell_{db_index}.npy"
file_magnitudes = f"magnitudes_{db_index}.npy"
file_noises = f"noises_{db_index}.npy"
file_levels = f"levels_{db_index}.npy"
file_confidences = f"confidences_{db_index}.npy"

db = Database(os.path.join(path_base, file_db), num_samples)

28 changes: 14 additions & 14 deletions preprocessing/noise/np-filter.py
@@ -11,13 +11,13 @@
suffixes = ["a", "b", "c"]

def save_dataset(path, suffix, samples_array, ids_array, cells_array, magnitudes_array, noises_array, levels_array, confidences_array):
file_samples = os.path.join(path, "samples-{}.npy".format(suffix))
file_ids = os.path.join(path, "ids-{}.npy".format(suffix))
file_cells = os.path.join(path, "cells-{}.npy".format(suffix))
file_magnitudes = os.path.join(path, "magnitudes-{}.npy".format(suffix))
file_noises = os.path.join(path, "noises-{}.npy".format(suffix))
file_levels = os.path.join(path, "levels-{}.npy".format(suffix))
file_confidences = os.path.join(path, "confidences-{}.npy".format(suffix))
file_samples = os.path.join(path, "samples_{}.npy".format(suffix))
file_ids = os.path.join(path, "ra_sat_{}.npy".format(suffix))
file_cells = os.path.join(path, "ra_cell_{}.npy".format(suffix))
file_magnitudes = os.path.join(path, "magnitudes_{}.npy".format(suffix))
file_noises = os.path.join(path, "noises_{}.npy".format(suffix))
file_levels = os.path.join(path, "levels_{}.npy".format(suffix))
file_confidences = os.path.join(path, "confidences_{}.npy".format(suffix))

np.save(file_samples, samples_array)
np.save(file_ids, ids_array)
@@ -31,13 +31,13 @@ def save_dataset(path, suffix, samples_array, ids_array, cells_array, magnitudes
def process(path_in, path_out, suffix):
print("Processing dataset {}".format(suffix))

file_samples = os.path.join(path_in, "samples-{}.npy".format(suffix))
file_ids = os.path.join(path_in, "ids-{}.npy".format(suffix))
file_cells = os.path.join(path_in, "cells-{}.npy".format(suffix))
file_magnitudes = os.path.join(path_in, "magnitudes-{}.npy".format(suffix))
file_noises = os.path.join(path_in, "noises-{}.npy".format(suffix))
file_levels = os.path.join(path_in, "levels-{}.npy".format(suffix))
file_confidences = os.path.join(path_in, "confidences-{}.npy".format(suffix))
file_samples = os.path.join(path_in, "samples_{}.npy".format(suffix))
file_ids = os.path.join(path_in, "ra_sat_{}.npy".format(suffix))
file_cells = os.path.join(path_in, "ra_cell_{}.npy".format(suffix))
file_magnitudes = os.path.join(path_in, "magnitudes_{}.npy".format(suffix))
file_noises = os.path.join(path_in, "noises_{}.npy".format(suffix))
file_levels = os.path.join(path_in, "levels_{}.npy".format(suffix))
file_confidences = os.path.join(path_in, "confidences_{}.npy".format(suffix))

print("Loading ArrayDataset")
ds = NoiseArrayDataset.from_files(
75 changes: 45 additions & 30 deletions preprocessing/noise/np-to-tfrecord.py
@@ -3,37 +3,20 @@
import os
import tensorflow as tf

path_base = "/data"
path_in = os.path.join(path_base, "filtered")
path_out = os.path.join(path_base, "tfrecord-magnitude")

suffixes = ["a", "b", "c"]

chunk_size = 50000
shuffle = True
import argparse

# Percentages to keep
magnitude_percentages = [
10,
20,
30,
40,
50,
60,
70,
80,
90
]
magnitude_percentages = list(range(10, 100, 10))

# Get a unique ID for the given id/cell pair
def get_id_cell(sat_id, sat_cell, num_cells=63):
return (sat_id * num_cells) + sat_cell

def load_dataset(path, suffix):
file_samples = os.path.join(path, "samples-{}.npy".format(suffix))
file_ids = os.path.join(path, "ids-{}.npy".format(suffix))
file_cells = os.path.join(path, "cells-{}.npy".format(suffix))
file_magnitudes = os.path.join(path, "magnitudes-{}.npy".format(suffix))
file_samples = os.path.join(path, "samples_{}.npy".format(suffix))
file_ids = os.path.join(path, "ra_sat_{}.npy".format(suffix))
file_cells = os.path.join(path, "ra_cell_{}.npy".format(suffix))
file_magnitudes = os.path.join(path, "magnitudes_{}.npy".format(suffix))

samples_array = np.load(file_samples)
ids_array = np.load(file_ids)
@@ -64,6 +47,10 @@ def save_dataset(path, suffix, samples_array, ids_array, cells_array):
def save_dataset_batches(path, chunk_size, samples_array, ids_array, cells_array, verbose):
chunk_count = 0

# Create directory if it doesn't exist
if not os.path.exists(path):
os.makedirs(path)

while samples_array.shape[0] >= chunk_size:
if verbose:
print(f"Saving chunk {chunk_count}...")
@@ -81,20 +68,31 @@ def save_dataset_batches(path, chunk_size, samples_array, ids_array, cells_array
save_dataset(path, str(chunk_count), s, i, c)
chunk_count += 1

if verbose:
print(f"Saving chunk {chunk_count}...")
print(f"Samples remaining: {samples_array.shape[0]}")
save_dataset(path, str(chunk_count), samples_array, ids_array, cells_array)
chunk_count += 1
if samples_array.shape[0] > 0:
if verbose:
print(f"Saving chunk {chunk_count}...")
print(f"Samples remaining: {samples_array.shape[0]}")
save_dataset(path, str(chunk_count), samples_array, ids_array, cells_array)
chunk_count += 1

def process_all(chunk_size=50000, verbose=False):
def process_all(chunk_size, path_in, path_out, max_files=None, skip_files=0, verbose=False, shuffle=True):
samples_array = None
ids_array = None
cells_array = None
magnitudes_array = None

message_count = 0

# Check path_in for files of the form samples_{suffix}.npy
suffixes = [ f for f in os.listdir(path_in) if f.startswith("samples_") and f.endswith(".npy") ]
suffixes.sort()
suffixes = [ f[8:-4] for f in suffixes ]
suffixes = suffixes[skip_files:]
if max_files is not None:
suffixes = suffixes[:max_files]

if verbose:
print("Loading data...")
for suffix in tqdm(suffixes, disable=not verbose):
s, i, c, m = load_dataset(path_in, suffix)
message_count += s.shape[0]
@@ -164,4 +162,21 @@ def process_all(chunk_size=50000, verbose=False):
print(f"Done")

if __name__ == "__main__":
process_all(chunk_size=chunk_size, verbose=True)
path_base = "/data"
path_in = path_base
path_out = os.path.join(path_base, "tfrecord")

parser = argparse.ArgumentParser(description="Process NumPy files into TFRecord datasets.")
parser.add_argument("--chunk-size", type=int, default=50000, help="Number of records in each chunk.")
parser.add_argument("--path-in", type=str, default=path_in, help="Input directory.")
parser.add_argument("--path-out", type=str, default=path_out, help="Output directory.")
parser.add_argument("--max-files", type=int, default=None, help="Maximum number of input files to process.")
parser.add_argument("--skip-files", type=int, default=0, help="Number of input files to skip.")
parser.add_argument("--no-shuffle", action='store_true', help="Do not shuffle data.")
parser.add_argument("-v", "--verbose", action='store_true', help="Display progress.")
args = parser.parse_args()

shuffle = not args.no_shuffle

process_all(args.chunk_size, args.path_in, args.path_out, args.max_files, args.skip_files, verbose=args.verbose, shuffle=shuffle)

12 changes: 6 additions & 6 deletions preprocessing/np-filter.py
@@ -11,18 +11,18 @@
suffixes = ["a", "b", "c"]

def save_dataset(path, suffix, samples_array, ids_array, cells_array):
file_samples = os.path.join(path, "samples-{}.npy".format(suffix))
file_ids = os.path.join(path, "ids-{}.npy".format(suffix))
file_cells = os.path.join(path, "cells-{}.npy".format(suffix))
file_samples = os.path.join(path, "samples_{}.npy".format(suffix))
file_ids = os.path.join(path, "ra_sat_{}.npy".format(suffix))
file_cells = os.path.join(path, "ra_cell_{}.npy".format(suffix))

np.save(file_samples, samples_array)
np.save(file_ids, ids_array)
np.save(file_cells, cells_array)

def process(path_in, path_out, suffix):
file_samples = os.path.join(path_in, "samples-{}.npy".format(suffix))
file_ids = os.path.join(path_in, "ids-{}.npy".format(suffix))
file_cells = os.path.join(path_in, "cells-{}.npy".format(suffix))
file_samples = os.path.join(path_in, "samples_{}.npy".format(suffix))
file_ids = os.path.join(path_in, "ra_sat_{}.npy".format(suffix))
file_cells = os.path.join(path_in, "ra_cell_{}.npy".format(suffix))

print("Loading ArrayDataset")
ds = ArrayDataset.from_files(
