Make FIM workflow function
jpolchlo committed Mar 22, 2023
1 parent 5b2f1f9 commit a7aae0e
Showing 1 changed file with 57 additions and 63 deletions.
120 changes: 57 additions & 63 deletions workflows/fim/inundate_gms.yaml
@@ -9,15 +9,19 @@ spec:
arguments:
parameters:
- name: hucfile-location
value: s3://noaa-hydro-data/jp-scratch/hucs-test.json
value: s3://noaa-hydro-data/jp-scratch/hucs.json
- name: hand-s3-location
value: s3://noaa-nws-owp-fim/hand_fim/fim_4_0_18_0/
- name: s3-mosaic-uri
value: s3://noaa-hydro-data/jp-scratch/mosaics/
- name: destination-bucket
value: noaa-hydro-data
- name: destination-prefix
value: fim-gms
- name: fim-image
value: 564456820271.dkr.ecr.us-east-1.amazonaws.com/noaa-fim:20230217
value: 564456820271.dkr.ecr.us-east-1.amazonaws.com/noaa-fim:20230317
- name: fim-parallelism
value: 4
volumes:
- name: noaa
persistentVolumeClaim:
@@ -26,27 +30,20 @@ spec:
templates:
- name: fim
steps:
- - name: prepare-environment
- - name: get-hucs
inline:
script:
image: bash:5.2
image: amazon/aws-cli:2.11.0
command: [bash]
source: |
[ ! -d "/shared/fim/hand-cache" ] && mkdir -pv /shared/fim/hand-cache && chown 1000:1000 /shared/fim/hand-cache || echo "hand-cache exists" ;
[ ! -d "/shared/fim/forecasts" ] && mkdir -pv /shared/fim/forecasts && chown 1000:1000 /shared/fim/forecasts || echo "forecasts exists" ;
[ ! -d "/shared/fim/maps" ] && mkdir -pv /shared/fim/maps && chown 1000:1000 /shared/fim/maps || echo "maps exists"
(>&2 [ ! -d "/shared/fim/hand-cache" ] && mkdir -pv /shared/fim/hand-cache && aws s3 cp --request-payer requester {{workflow.parameters.hand-s3-location}}gms_inputs.csv /shared/fim/hand-cache/ && chown 1000:1000 --recursive /shared/fim/hand-cache || >&2 echo "hand-cache exists" ) &&
(>&2 [ ! -d "/shared/fim/forecasts" ] && mkdir -pv /shared/fim/forecasts && chown 1000:1000 /shared/fim/forecasts || >&2 echo "forecasts exists") &&
(>&2 [ ! -d "/shared/fim/maps" ] && mkdir -pv /shared/fim/maps && chown 1000:1000 /shared/fim/maps || >&2 echo "maps exists") &&
aws s3 cp {{workflow.parameters.hucfile-location}} -
volumeMounts:
- name: noaa
mountPath: /shared

- - name: get-hucs
inline:
script:
image: amazon/aws-cli:2.11.0
command: [ bash ]
source: |
aws s3 cp {{workflow.parameters.hucfile-location}} -
- - name: generate-mosaic
template: process-huc
withParam: "{{steps.get-hucs.outputs.result}}"
@@ -70,12 +67,12 @@
mountPath: /shared
source: |
set -x ;
[ ! -d "/shared/fim/hand-cache/{{inputs.parameters.huc}}" -o -z "$(ls -A /shared/fim/hand-cache/{{inputs.parameters.huc}})" ] && aws s3 sync --request-payer requester {{workflow.parameters.hand-s3-location}}{{inputs.parameters.huc}} /shared/fim/hand-cache/{{inputs.parameters.huc}} && chown 1000:1000 --recursive /shared/fim/hand-cache/{{inputs.parameters.huc}} || echo "HAND data for HUC {{inputs.parameters.huc}} already cached" ;
[ ! -d "/shared/fim/forecasts/{{inputs.parameters.huc}}" ] && mkdir -pv /shared/fim/forecasts/{{inputs.parameters.huc}} && chown 1000:1000 /shared/fim/forecasts/{{inputs.parameters.huc}} || echo "Forecast dir for {{inputs.parameters.huc}} already exists" ;
[ ! -d "/shared/fim/maps/{{inputs.parameters.huc}}" ] && mkdir -pv /shared/fim/maps/{{inputs.parameters.huc}} && chown 1000:1000 /shared/fim/maps/{{inputs.parameters.huc}} || echo "Map dir for {{inputs.parameters.huc}} already exists"
([ ! -d "/shared/fim/hand-cache/{{inputs.parameters.huc}}" -o -z "$(ls -A /shared/fim/hand-cache/{{inputs.parameters.huc}})" ] && aws s3 sync --request-payer requester {{workflow.parameters.hand-s3-location}}{{inputs.parameters.huc}} /shared/fim/hand-cache/{{inputs.parameters.huc}} && chown 1000:1000 --recursive /shared/fim/hand-cache/{{inputs.parameters.huc}} || echo "HAND data for HUC {{inputs.parameters.huc}} already cached") &&
([ ! -d "/shared/fim/forecasts/{{inputs.parameters.huc}}" ] && mkdir -pv /shared/fim/forecasts/{{inputs.parameters.huc}} && chown 1000:1000 /shared/fim/forecasts/{{inputs.parameters.huc}} || echo "Forecast dir for {{inputs.parameters.huc}} already exists") &&
([ ! -d "/shared/fim/maps/{{inputs.parameters.huc}}" ] && mkdir -pv /shared/fim/maps/{{inputs.parameters.huc}} && chown 1000:1000 /shared/fim/maps/{{inputs.parameters.huc}} || echo "Map dir for {{inputs.parameters.huc}} already exists")
- - name: collect-forecast
- - name: collect-forecasts
inline:
script:
image: python:3.10
@@ -124,69 +121,66 @@
reaches = set([x[0] for x in cur.fetchall()])
target_time = datetime.now() - timedelta(hours=2)
nwm_uri = target_time.strftime('s3://noaa-nwm-pds/nwm.%Y%m%d/short_range/nwm.t%Hz.short_range.channel_rt.f001.conus.nc')
fs = S3FileSystem(anon=True)
nwm = h5py.File(fs.open(nwm_uri), 'r')
ids = np.array(nwm['feature_id'])
[ixs, feature_ids] = list(zip(*[(i,x) for (i,x) in enumerate(ids) if x in reaches]))
discharge = nwm['streamflow'][list(ixs)]
pd.DataFrame(zip(feature_ids, discharge), columns=['feature_id', 'discharge']).to_csv(f'/shared/fim/forecasts/{huc_string}/forecast_001.csv')
for t in range(1, 19):
nwm_uri = target_time.strftime(f's3://noaa-nwm-pds/nwm.%Y%m%d/short_range/nwm.t%Hz.short_range.channel_rt.f{t:03d}.conus.nc')
with fs.open(nwm_uri) as f:
with h5py.File(f, 'r') as nwm:
ids = np.array(nwm['feature_id'])
[ixs, feature_ids] = list(zip(*[(i,x) for (i,x) in enumerate(ids) if x in reaches]))
discharge = nwm['streamflow'][list(ixs)]
- - name: identify-level-paths
inline:
script:
image: bash:5.2
command: [ bash ]
volumeMounts:
- name: noaa
mountPath: /shared
source: |
apk add -q sed ;
cat /shared/fim/hand-cache/{{inputs.parameters.huc}}/branch_id.lst | sed '/^0$/d;s/^\([0-9]*\)$/"\1"/' | sed -z 's/\n/,/g' | sed 's/^\(.*[^,]\),\?/[\1]/'
pd.DataFrame(zip(feature_ids, discharge), columns=['feature_id', 'discharge']).to_csv(f'/shared/fim/forecasts/{huc_string}/forecast_{t:03d}.csv')
- - name: generate-lp-tiff
withParam: "{{steps.identify-level-paths.outputs.result}}"
inline:
securityContext:
runAsGroup: 1000
runAsUser: 1000
nodeSelector:
node-type: worker
script:
image: '{{workflow.parameters.fim-image}}'
command: [bash]
nodeSelector:
node-type: worker
volumeMounts:
- name: noaa
mountPath: /shared
resources:
requests:
cpu: 1024m
memory: 2Gi
source: |
/foss_fim/tools/inundation.py \
-r /shared/fim/hand-cache/{{inputs.parameters.huc}}/branches/{{item}}/rem_zeroed_masked_{{item}}.tif \
-c /shared/fim/hand-cache/{{inputs.parameters.huc}}/branches/{{item}}/gw_catchments_reaches_filtered_addedAttributes_{{item}}.tif \
-b /shared/fim/hand-cache/{{inputs.parameters.huc}}/branches/{{item}}/gw_catchments_reaches_filtered_addedAttributes_crosswalked_{{item}}.gpkg \
-t /shared/fim/hand-cache/{{inputs.parameters.huc}}/branches/{{item}}/hydroTable_{{item}}.csv \
-f /shared/fim/forecasts/{{inputs.parameters.huc}}/forecast_001.csv \
-i /shared/fim/maps/{{inputs.parameters.huc}}/nwm_short_range_f001_{{inputs.parameters.huc}}_branch_{{item}}.tif
outputs:
parameters:
- name: branch
value: '{{item}}'
for t in $(seq -f "%03g" 1 18)
do
/foss_fim/tools/inundate_gms.py \
-y /shared/fim/hand-cache/ \
-u {{inputs.parameters.huc}} \
-f /shared/fim/forecasts/{{inputs.parameters.huc}}/forecast_${t}.csv \
-i /shared/fim/maps/{{inputs.parameters.huc}}/nwm_short_range_f${t}_{{inputs.parameters.huc}}.tif \
-w {{workflow.parameters.fim-parallelism}} \
-o /tmp/inundation.csv
/foss_fim/tools/mosaic_inundation.py \
-i /tmp/inundation.csv \
-t inundation_rasters \
-w 0 \
-m /shared/fim/maps/{{inputs.parameters.huc}}/nwm_short_range_f${t}_{{inputs.parameters.huc}}.tif
done
- - name: collect-mosaic
inputs:
parameters:
- name: branches
# This is separated from the above because the previous step has trouble
# running aws cli with IRSA-provided credentials when the group and user are
# specified. These are given explicitly so that the /shared volume doesn't
# get polluted with files owned by root
- - name: send-to-s3
inline:
securityContext:
runAsGroup: 1000
runAsUser: 1000
script:
image: '{{workflow.parameters.fim-image}}'
command: [ python ]
nodeSelector:
node-type: worker
image: amazon/aws-cli:2.11.0
command: [bash]
source: |
for t in $(seq -f "%03g" 1 18)
do
aws s3 cp /shared/fim/maps/{{inputs.parameters.huc}}/nwm_short_range_f${t}_{{inputs.parameters.huc}}.tif {{workflow.parameters.s3-mosaic-uri}}{{inputs.parameters.huc}}/nwm_short_range_f${t}_{{inputs.parameters.huc}}.tif
done
volumeMounts:
- name: noaa
mountPath: /shared
source: |

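Note on the collect-forecasts step: its embedded Python script is only partially visible in the hunks above (the query that builds the set of reach IDs is truncated). The following is a minimal standalone sketch of what the visible fragments appear to do, not the commit's actual code; huc_string and reaches are placeholders for values the truncated portion of the script computes.

# Hedged sketch: pull short-range NWM channel-routing files from S3 and write
# per-timestep discharge CSVs for the reaches in one HUC, mirroring the
# collect-forecasts fragments shown in the diff above.
from datetime import datetime, timedelta

import h5py
import numpy as np
import pandas as pd
from s3fs import S3FileSystem

huc_string = "12040101"    # placeholder HUC; the real step receives this as a parameter
reaches = {101, 102, 103}  # placeholder feature IDs; the real step derives these from a database query

# Use the NWM forecast cycle from two hours ago, as the script in the diff does.
target_time = datetime.now() - timedelta(hours=2)
fs = S3FileSystem(anon=True)

for t in range(1, 19):
    nwm_uri = target_time.strftime(
        f's3://noaa-nwm-pds/nwm.%Y%m%d/short_range/nwm.t%Hz.short_range.channel_rt.f{t:03d}.conus.nc'
    )
    with fs.open(nwm_uri) as f, h5py.File(f, 'r') as nwm:
        # Keep only the features that belong to this HUC's reaches.
        ids = np.array(nwm['feature_id'])
        ixs, feature_ids = zip(*[(i, x) for i, x in enumerate(ids) if x in reaches])
        discharge = nwm['streamflow'][list(ixs)]
        pd.DataFrame(zip(feature_ids, discharge),
                     columns=['feature_id', 'discharge']
                     ).to_csv(f'/shared/fim/forecasts/{huc_string}/forecast_{t:03d}.csv')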