Commit 9b22c2b

[DWD Obs] Cleanup parser function

gutzbenj committed Jan 12, 2025
1 parent d192a46 commit 9b22c2b
Showing 2 changed files with 76 additions and 104 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -4,6 +4,7 @@

### Refactor
- [DWD Obs] Make the download function more flexible using threadpool
- [DWD Obs] Cleanup parser function

### Fix
- [DWD Obs] Reduce unnecessary file index calls during retrieval of data for stations with multiple files
179 changes: 75 additions & 104 deletions wetterdienst/provider/dwd/observation/parser.py
@@ -3,31 +3,24 @@
from __future__ import annotations

import logging
from io import BytesIO, StringIO
from typing import TYPE_CHECKING

import polars as pl
from polars import selectors as cs

from wetterdienst.metadata.columns import Columns
from wetterdienst.metadata.period import Period
from wetterdienst.metadata.resolution import Resolution
from wetterdienst.provider.dwd.metadata import DatetimeFormat
from wetterdienst.provider.dwd.observation.metadata import (
DwdObservationMetadata,
)

if TYPE_CHECKING:
from io import BytesIO

from wetterdienst.core.timeseries.metadata import DatasetModel

log = logging.getLogger(__name__)

# Parameter names used to create the full 1 minute precipitation dataset wherever those
# columns are missing (which is the case for non-historical data)
PRECIPITATION_PARAMETERS = (
DwdObservationMetadata.minute_1.precipitation.precipitation_height_droplet.name_original,
DwdObservationMetadata.minute_1.precipitation.precipitation_height_rocker.name_original,
)

PRECIPITATION_MINUTE_1_QUALITY = DwdObservationMetadata.minute_1.precipitation.quality

DROPPABLE_PARAMETERS = {
@@ -53,26 +46,21 @@
"strahlungstemperatur",
}

DATE_FIELDS_REGULAR = {
Columns.DATE.value,
Columns.START_DATE.value,
Columns.END_DATE.value,
}

DWD_TO_ENGLISH_COLUMNS_MAPPING = {
"stations_id": Columns.STATION_ID.value,
"mess_datum": Columns.DATE.value,
"von_datum": Columns.START_DATE.value,
"bis_datum": Columns.END_DATE.value,
"mess_datum_beginn": Columns.START_DATE.value,
"mess_datum_ende": Columns.END_DATE.value,
"stationshoehe": Columns.HEIGHT.value,
"geobreite": Columns.LATITUDE.value,
"geogr.breite": Columns.LATITUDE.value,
"geolaenge": Columns.LONGITUDE.value,
"geogr.laenge": Columns.LONGITUDE.value,
"stationsname": Columns.NAME.value,
"bundesland": Columns.STATE.value,
COLUMNS_MAPPING = {
"stations_id": "station_id",
"mess_datum": "date",
"von_datum": "date",
"bis_datum": "end_date",
"stationshoehe": "height",
"geobreite": "latitude",
"geogr.breite": "latitude",
"geolaenge": "longitude",
"geogr.laenge": "longitude",
# these two are only used in the historical 1 minute precipitation data;
# start_date and end_date are kept because the column is internally named date
# after exploding the date ranges
"mess_datum_beginn": "start_date",
"mess_datum_ende": "end_date",
}
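The flat string mapping replaces the `Columns` enum indirection and is applied later through a lambda-based rename, so unknown columns pass through unchanged. A minimal sketch of that pattern (column names illustrative only):

```python
import polars as pl

COLUMNS_MAPPING = {"stations_id": "station_id", "mess_datum": "date"}

df = pl.DataFrame({"STATIONS_ID ": ["00001"], "MESS_DATUM": ["202501120000"], "RWS_01": ["0.1"]})
# Normalize headers first (strip stray spaces, lowercase), then map known
# German names to English; unknown columns fall through via dict.get default.
df = df.rename(mapping=lambda col: col.strip().lower())
df = df.rename(mapping=lambda col: COLUMNS_MAPPING.get(col, col))
print(df.columns)  # ['station_id', 'date', 'rws_01']
```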


@@ -95,7 +83,7 @@ def parse_climate_observations_data(
polars.LazyFrame with requested data, for different station ids the data is
still put into one DataFrame
"""
if dataset.resolution.value == Resolution.SUBDAILY and dataset == DwdObservationMetadata.subdaily.wind_extreme:
if dataset == DwdObservationMetadata.subdaily.wind_extreme:
data = [
_parse_climate_observations_data(filename_and_file, dataset, period)
for filename_and_file in filenames_and_files
@@ -107,9 +95,10 @@
except ValueError:
return data[0]
else:
if len(filenames_and_files) > 1:
raise ValueError("only one file expected")
return _parse_climate_observations_data(filenames_and_files[0], dataset, period)
data = []
for filename_and_file in filenames_and_files:
data.append(_parse_climate_observations_data(filename_and_file, dataset, period))
return pl.concat(data)
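With this change, `parse_climate_observations_data` accepts multiple files in both branches: wind-extreme frames are concatenated with a fallback to the first frame when schemas do not line up, and the general branch stacks all parsed frames instead of raising. A toy sketch of the stacking step (stand-in frames in place of parsed files):

```python
import polars as pl

# Stand-ins for per-file parse results (the parser returns LazyFrames).
frames = [
    pl.LazyFrame({"station_id": ["00001"], "date": ["202501010000"]}),
    pl.LazyFrame({"station_id": ["00001"], "date": ["202501010010"]}),
]
try:
    combined = pl.concat(frames)  # vertical concat requires matching schemas
except ValueError:
    combined = frames[0]  # mirror the wind-extreme fallback on schema mismatch
print(combined.collect())
```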


def _parse_climate_observations_data(
@@ -130,101 +119,83 @@ def _parse_climate_observations_data(
provided or local file is not found or has no data in it
"""
filename, file = filename_and_file

try:
df = pl.read_csv(
source=StringIO(file.read().decode("latin1").replace(" ", "")),
source=file,
separator=";",
null_values=["-999"],
infer_schema_length=0,
).lazy()
)
df = df.lazy()
except pl.exceptions.SchemaError:
log.warning(f"The file representing {filename} could not be parsed and is skipped.")
return pl.LazyFrame()
except ValueError:
log.warning(f"The file representing {filename} is None and is skipped.")
return pl.LazyFrame()
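The reworked read step hands the raw file object to `read_csv` and pushes sentinel handling into the reader: `-999` becomes null at parse time and `infer_schema_length=0` keeps every column as a string. A self-contained sketch with an invented sample record:

```python
from io import BytesIO

import polars as pl

raw = b"STATIONS_ID;MESS_DATUM;  RWS_01;eor\n1;202501120000;-999;eor\n"
df = pl.read_csv(
    source=BytesIO(raw),
    separator=";",
    null_values=["-999"],    # DWD missing-value sentinel parsed straight to null
    infer_schema_length=0,   # no schema inference: every column stays pl.String
)
print(df)  # "  RWS_01" still carries whitespace until the rename step
```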

df = df.with_columns(cs.string().str.strip_chars())
df = df.with_columns(cs.string().replace("-999", None), cs.numeric().replace(-999, None))
# Column names contain spaces, so strip them away.
df = df.rename(mapping=lambda col: col.strip().lower())

# End of record (EOR) has no value, so drop it right away.
df = df.drop(*DROPPABLE_PARAMETERS, strict=False)

# Assign meaningful column names (baseline).
df = df.rename(mapping=lambda col: COLUMNS_MAPPING.get(col, col))
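Dropping with `strict=False` lets one droppable-parameter set serve all datasets, since columns absent from a given file are simply ignored. A small illustration (the second column name is hypothetical):

```python
import polars as pl

df = pl.DataFrame({"station_id": ["00001"], "eor": ["eor"], "qn": ["1"]})
# strict=False: names that do not exist in the frame ("struktur_var" here)
# are skipped instead of raising ColumnNotFoundError.
df = df.drop("eor", "struktur_var", strict=False)
print(df.columns)  # ['station_id', 'qn']
```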
if dataset == DwdObservationMetadata.minute_1.precipitation:
# Need to unfold historical data, as it is run-length encoded, e.g.
# from time X to time Y precipitation is 0
if period == Period.HISTORICAL:
df = df.with_columns(
pl.col("mess_datum_beginn").cast(str).str.to_datetime(DatetimeFormat.YMDHM.value, time_zone="UTC"),
pl.col("mess_datum_ende").cast(str).str.to_datetime(DatetimeFormat.YMDHM.value, time_zone="UTC"),
)
df = df.with_columns(
pl.datetime_ranges(pl.col("mess_datum_beginn"), pl.col("mess_datum_ende"), interval="1m").alias(
"mess_datum",
),
)
df = df.drop(
"mess_datum_beginn",
"mess_datum_ende",
)
# Expand dataframe over calculated date ranges -> one datetime per row
df = df.explode("mess_datum")
# this is a special case, we return as the dates are already parsed and everything is done
return _transform_minute_1_precipitation_historical(df)
else:
df = df.with_columns(
[pl.all(), *[pl.lit(None, pl.String).alias(par) for par in PRECIPITATION_PARAMETERS]],
)
df = df.with_columns(
pl.col("mess_datum").cast(str).str.to_datetime(DatetimeFormat.YMDHM.value, time_zone="UTC"),
missing_parameters = (
DwdObservationMetadata.minute_1.precipitation.precipitation_height_droplet.name_original,
DwdObservationMetadata.minute_1.precipitation.precipitation_height_rocker.name_original,
)
elif dataset == DwdObservationMetadata.minute_5.precipitation:
# apparently historical datasets differ from recent and now having all columns as described in the
# parameter enumeration when recent and now datasets only have precipitation form and
# precipitation height but not rocker and droplet information
columns = ["stations_id", "mess_datum"]
for parameter in dataset:
columns.append(parameter.name_original)

df = df.select(
pl.lit(None, dtype=pl.String).alias(col) if col not in df.collect_schema().names() else pl.col(col)
for col in columns
)
df = df.with_columns(pl.lit(None, pl.String).alias(parameter) for parameter in missing_parameters)
elif dataset == DwdObservationMetadata.minute_5.precipitation and period != Period.HISTORICAL:
missing_parameters = [
DwdObservationMetadata.minute_5.precipitation.precipitation_rocker.name_original,
DwdObservationMetadata.minute_5.precipitation.precipitation_droplet.name_original,
]
df = df.with_columns(pl.lit(None, dtype=pl.Float64).alias(parameter) for parameter in missing_parameters)
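Both precipitation branches fill absent parameter columns with typed null literals so downstream code sees a uniform schema. A minimal sketch of the pattern (column names hypothetical):

```python
import polars as pl

df = pl.LazyFrame({"station_id": ["00001"], "rs_05": ["0.12"]})
missing_parameters = ["rth_05", "rwh_05"]  # hypothetical missing columns
# Add each missing parameter as a typed all-null column.
df = df.with_columns(pl.lit(None, dtype=pl.Float64).alias(p) for p in missing_parameters)
print(df.collect_schema())  # rth_05 and rwh_05 present as Float64 nulls
```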
# Special handling for hourly solar data, as it has more date columns
elif dataset == DwdObservationMetadata.hourly.solar:
# Fix real date column by cutting of minutes
df = df.with_columns(pl.col("mess_datum").map_elements(lambda date: date[:-3], return_dtype=pl.String))
df = df.with_columns(pl.col("date").str.head(-3))
elif dataset == DwdObservationMetadata.subdaily.wind_extreme:
if "FX3" in filename:
alias = "qn_8_3"
elif "FX6" in filename:
alias = "qn_8_6"
else:
raise ValueError(f"Unknown dataset for wind extremes, expected FX3 or FX6 in filename {filename}")
df = df.select(
pl.all().exclude("qn_8"),
pl.col("qn_8").alias("qn_8_3" if "fx_911_3" in df.columns else "qn_8_6"),
pl.col("qn_8").alias(alias),
)
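The quality column is now disambiguated from the filename rather than by probing the parsed columns; a sketch of the exclude-and-alias select under that convention (the filename is invented for illustration):

```python
import polars as pl

filename = "stundenwerte_FX3_00001_akt.zip"  # hypothetical FX3 file name
if "FX3" in filename:
    alias = "qn_8_3"
elif "FX6" in filename:
    alias = "qn_8_6"
else:
    raise ValueError(f"Unknown dataset for wind extremes, expected FX3 or FX6 in filename {filename}")
df = pl.LazyFrame({"station_id": ["00001"], "qn_8": ["3"], "fx_911_3": ["12.5"]})
# Keep everything except qn_8 and re-attach it under the resolution-specific name.
df = df.select(pl.all().exclude("qn_8"), pl.col("qn_8").alias(alias))
print(df.collect_schema().names())  # ['station_id', 'fx_911_3', 'qn_8_3']
```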

if dataset.resolution.value in (Resolution.MONTHLY, Resolution.ANNUAL):
df = df.drop("bis_datum", "mess_datum_ende", strict=False)
df = df.rename(mapping={"mess_datum_beginn": "mess_datum"})

fmt = None
if dataset.resolution.value == Resolution.MINUTE_5:
fmt = "%Y%m%d%H%M"
elif dataset.resolution.value == Resolution.MINUTE_10:
fmt = "%Y%m%d%H%M"
elif dataset.resolution.value == Resolution.HOURLY:
fmt = "%Y%m%d%H%M"
elif dataset.resolution.value == Resolution.SUBDAILY:
fmt = "%Y%m%d%H%M"
elif dataset.resolution.value == Resolution.DAILY:
fmt = "%Y%m%d"
elif dataset.resolution.value == Resolution.MONTHLY:
fmt = "%Y%m%d"
elif dataset.resolution.value == Resolution.ANNUAL:
fmt = "%Y%m%d"
df = df.drop("end_date")
# prepare date column
df = df.with_columns(pl.col("date").cast(pl.String).str.pad_end(12, "0"))
return df.with_columns(
pl.col("date").str.to_datetime("%Y%m%d%H%M", time_zone="UTC"),
)
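All resolutions now funnel through a single datetime parse: date strings are right-padded with zeros to the 12-character %Y%m%d%H%M form, which absorbs the old per-resolution format table. A sketch of the normalization:

```python
import polars as pl

df = pl.LazyFrame({"date": ["20250112", "202501121230"]})  # daily vs. minute stamps
# Pad shorter stamps (e.g. daily "%Y%m%d") with trailing zeros so one
# "%Y%m%d%H%M" parse covers every resolution.
df = df.with_columns(pl.col("date").str.pad_end(12, "0"))
df = df.with_columns(pl.col("date").str.to_datetime("%Y%m%d%H%M", time_zone="UTC"))
print(df.collect())  # 2025-01-12 00:00 UTC and 2025-01-12 12:30 UTC
```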

if fmt:
if dataset.resolution.value in (Resolution.HOURLY, Resolution.SUBDAILY):
df = df.with_columns(pl.col("mess_datum") + "00")
df = df.with_columns(
pl.col("mess_datum").str.to_datetime(fmt, time_zone="UTC"),
)

# Assign meaningful column names (baseline).
return df.rename(mapping=lambda col: DWD_TO_ENGLISH_COLUMNS_MAPPING.get(col, col))
def _transform_minute_1_precipitation_historical(df: pl.LazyFrame) -> pl.LazyFrame:
"""We need to unfold historical data, as it is encoded in its run length e.g.
from time X to time Y precipitation is 0
"""
df = df.with_columns(
pl.col("start_date").cast(str).str.to_datetime("%Y%m%d%H%M", time_zone="UTC"),
pl.col("end_date").cast(str).str.to_datetime("%Y%m%d%H%M", time_zone="UTC"),
)
df = df.with_columns(
pl.datetime_ranges(pl.col("start_date"), pl.col("end_date"), interval="1m").alias(
"date",
),
)
df = df.drop(
"start_date",
"end_date",
)
# Expand dataframe over calculated date ranges -> one datetime per row
return df.explode("date")
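The extracted helper expands those run-length encoded intervals into one row per minute via `datetime_ranges` plus `explode`; a compact end-to-end sketch (values invented for illustration):

```python
import polars as pl

df = pl.LazyFrame({
    "station_id": ["00001"],
    "start_date": ["202501010000"],
    "end_date": ["202501010003"],
    "rs_01": ["0.0"],
})
df = df.with_columns(
    pl.col("start_date").str.to_datetime("%Y%m%d%H%M", time_zone="UTC"),
    pl.col("end_date").str.to_datetime("%Y%m%d%H%M", time_zone="UTC"),
)
# One list of per-minute datetimes per row, then one row per datetime.
df = df.with_columns(pl.datetime_ranges("start_date", "end_date", interval="1m").alias("date"))
df = df.drop("start_date", "end_date").explode("date")
print(df.collect())  # 4 rows: 00:00, 00:01, 00:02, 00:03
```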
