Skip to content

Commit

Permalink
fix: Added Airport Fee To Schema Files And Pipeline.Yaml In New York Taxi Trips Dataset (#476)
Browse files

* fix: Added airport fee to schema files and pipeline.yaml.

* fix: Extended node size in order to resolve hanging
  • Loading branch information
nlarge-google authored Sep 16, 2022
1 parent 039ff61 commit d94105a
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@
"description": "",
"mode": "NULLABLE"
},
{
"name": "airport_fee",
"type": "NUMERIC",
"description": "",
"mode": "NULLABLE"
},
{
"name": "total_amount",
"type": "NUMERIC",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@
"description": "$0.30 improvement surcharge assessed on hailed trips at the flag drop. The improvement surcharge began being levied in 2015.",
"mode": "NULLABLE"
},
{
"name": "airport_fee",
"type": "NUMERIC",
"description": "Fee assessed on trips picked up at LaGuardia and John F. Kennedy airports.",
"mode": "NULLABLE"
},
{
"name": "total_amount",
"type": "NUMERIC",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2021 Google LLC
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -37,10 +37,10 @@
location="us-central1-c",
body={
"name": "new-york-taxi-trips",
"initial_node_count": 3,
"initial_node_count": 2,
"network": "{{ var.value.vpc_network }}",
"node_config": {
"machine_type": "e2-standard-4",
"machine_type": "e2-standard-8",
"oauth_scopes": [
"https://www.googleapis.com/auth/devstorage.read_write",
"https://www.googleapis.com/auth/cloud-platform",
Expand Down Expand Up @@ -75,14 +75,14 @@
"TARGET_GCS_PATH": "{{ var.json.new_york_taxi_trips.container_registry.green_trips_target_gcs_path }}",
"PIPELINE_NAME": "tlc_green_trips",
"START_YEAR": "2013",
"INPUT_CSV_HEADERS": '["vendor_id", "pickup_datetime", "dropoff_datetime", "store_and_fwd_flag", "rate_code",\n "pickup_location_id", "dropoff_location_id", "passenger_count", "trip_distance", "fare_amount",\n "extra", "mta_tax", "tip_amount", "tolls_amount", "ehail_fee",\n "imp_surcharge", "total_amount", "payment_type", "trip_type", "congestion_surcharge" ]',
"DATA_DTYPES": '{ "vendor_id": "str",\n "pickup_datetime": "datetime64[ns]",\n "dropoff_datetime": "datetime64[ns]",\n "store_and_fwd_flag": "str",\n "rate_code": "str",\n "pickup_location_id": "str",\n "dropoff_location_id": "str",\n "passenger_count": "str",\n "trip_distance": "float64",\n "fare_amount": "float64",\n "extra": "float64",\n "mta_tax": "float64",\n "tip_amount": "float64",\n "tolls_amount": "float64",\n "ehail_fee": "float64",\n "imp_surcharge": "float64",\n "total_amount": "float64",\n "payment_type": "str",\n "trip_type": "str",\n "congestion_surcharge": "float64" }',
"OUTPUT_CSV_HEADERS": '[ "vendor_id", "pickup_datetime", "dropoff_datetime", "store_and_fwd_flag", "rate_code",\n "passenger_count", "trip_distance", "fare_amount", "extra", "mta_tax",\n "tip_amount", "tolls_amount", "ehail_fee", "total_amount", "payment_type",\n "distance_between_service", "time_between_service", "trip_type", "imp_surcharge", "pickup_location_id",\n "dropoff_location_id", "data_file_year", "data_file_month" ]',
"INPUT_CSV_HEADERS": '["vendor_id", "pickup_datetime", "dropoff_datetime", "store_and_fwd_flag", "rate_code",\n "pickup_location_id", "dropoff_location_id", "passenger_count", "trip_distance", "fare_amount",\n "extra", "mta_tax", "tip_amount", "tolls_amount", "ehail_fee",\n "imp_surcharge", "total_amount", "payment_type", "trip_type", "congestion_surcharge", "airport_fee" ]',
"DATA_DTYPES": '{ "vendor_id": "str",\n "pickup_datetime": "datetime64[ns]",\n "dropoff_datetime": "datetime64[ns]",\n "store_and_fwd_flag": "str",\n "rate_code": "str",\n "pickup_location_id": "str",\n "dropoff_location_id": "str",\n "passenger_count": "str",\n "trip_distance": "float64",\n "fare_amount": "float64",\n "extra": "float64",\n "mta_tax": "float64",\n "tip_amount": "float64",\n "tolls_amount": "float64",\n "ehail_fee": "float64",\n "imp_surcharge": "float64",\n "total_amount": "float64",\n "payment_type": "str",\n "trip_type": "str",\n "congestion_surcharge": "float64",\n "airport_fee": "float64" }',
"OUTPUT_CSV_HEADERS": '[ "vendor_id", "pickup_datetime", "dropoff_datetime", "store_and_fwd_flag", "rate_code",\n "passenger_count", "trip_distance", "fare_amount", "extra", "mta_tax",\n "tip_amount", "tolls_amount", "ehail_fee", "airport_fee", "total_amount", "payment_type",\n "distance_between_service", "time_between_service", "trip_type", "imp_surcharge", "pickup_location_id",\n "dropoff_location_id", "data_file_year", "data_file_month" ]',
},
resources={
"request_memory": "12G",
"request_cpu": "1",
"request_ephemeral_storage": "16G",
"request_memory": "16G",
"request_cpu": "2",
"request_ephemeral_storage": "24G",
},
)

Expand Down Expand Up @@ -112,14 +112,14 @@
"TARGET_GCS_PATH": "{{ var.json.new_york_taxi_trips.container_registry.yellow_trips_target_gcs_path }}",
"PIPELINE_NAME": "tlc_yellow_trips",
"START_YEAR": "2011",
"INPUT_CSV_HEADERS": '[ "vendor_id", "pickup_datetime", "dropoff_datetime", "passenger_count", "trip_distance",\n "rate_code", "store_and_fwd_flag", "pickup_location_id", "dropoff_location_id",\n "payment_type", "fare_amount", "extra", "mta_tax", "tip_amount",\n "tolls_amount", "imp_surcharge", "total_amount", "congestion_surcharge" ]',
"DATA_DTYPES": '{ "vendor_id": "str",\n "pickup_datetime": "datetime64[ns]",\n "dropoff_datetime": "datetime64[ns]",\n "passenger_count": "str",\n "trip_distance": "float64",\n "rate_code": "str",\n "store_and_fwd_flag": "str",\n "pickup_location_id": "str",\n "dropoff_location_id": "str",\n "payment_type": "str",\n "fare_amount": "float64",\n "extra": "float64",\n "mta_tax": "float64",\n "tip_amount": "float64",\n "tolls_amount": "float64",\n "imp_surcharge": "float64",\n "total_amount": "float64",\n "congestion_surcharge": "float64" }',
"OUTPUT_CSV_HEADERS": '[ "vendor_id", "pickup_datetime", "dropoff_datetime", "passenger_count", "trip_distance",\n "rate_code", "store_and_fwd_flag", "payment_type", "fare_amount", "extra",\n "mta_tax", "tip_amount", "tolls_amount", "imp_surcharge", "total_amount",\n "pickup_location_id", "dropoff_location_id", "data_file_year", "data_file_month" ]',
"INPUT_CSV_HEADERS": '[ "vendor_id", "pickup_datetime", "dropoff_datetime", "passenger_count", "trip_distance",\n "rate_code", "store_and_fwd_flag", "pickup_location_id", "dropoff_location_id",\n "payment_type", "fare_amount", "extra", "mta_tax", "tip_amount",\n "tolls_amount", "imp_surcharge", "total_amount", "congestion_surcharge", "airport_fee" ]',
"DATA_DTYPES": '{ "vendor_id": "str",\n "pickup_datetime": "datetime64[ns]",\n "dropoff_datetime": "datetime64[ns]",\n "passenger_count": "str",\n "trip_distance": "float64",\n "rate_code": "str",\n "store_and_fwd_flag": "str",\n "pickup_location_id": "str",\n "dropoff_location_id": "str",\n "payment_type": "str",\n "fare_amount": "float64",\n "extra": "float64",\n "mta_tax": "float64",\n "tip_amount": "float64",\n "tolls_amount": "float64",\n "imp_surcharge": "float64",\n "total_amount": "float64",\n "congestion_surcharge": "float64",\n "airport_fee": "float64" }',
"OUTPUT_CSV_HEADERS": '[ "vendor_id", "pickup_datetime", "dropoff_datetime", "passenger_count", "trip_distance",\n "rate_code", "store_and_fwd_flag", "payment_type", "fare_amount", "extra",\n "mta_tax", "tip_amount", "tolls_amount", "imp_surcharge", "airport_fee",\n "total_amount", "pickup_location_id", "dropoff_location_id", "data_file_year", "data_file_month" ]',
},
resources={
"request_memory": "12G",
"request_cpu": "1",
"request_ephemeral_storage": "16G",
"request_memory": "16G",
"request_cpu": "2",
"request_ephemeral_storage": "24G",
},
)
delete_cluster = kubernetes_engine.GKEDeleteClusterOperator(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ dag:
location: "us-central1-c"
body:
name: new-york-taxi-trips
initial_node_count: 3
initial_node_count: 2
network: "{{ var.value.vpc_network }}"
node_config:
machine_type: e2-standard-4
machine_type: e2-standard-8
oauth_scopes:
- https://www.googleapis.com/auth/devstorage.read_write
- https://www.googleapis.com/auth/cloud-platform
Expand Down Expand Up @@ -79,7 +79,7 @@ dag:
["vendor_id", "pickup_datetime", "dropoff_datetime", "store_and_fwd_flag", "rate_code",
"pickup_location_id", "dropoff_location_id", "passenger_count", "trip_distance", "fare_amount",
"extra", "mta_tax", "tip_amount", "tolls_amount", "ehail_fee",
"imp_surcharge", "total_amount", "payment_type", "trip_type", "congestion_surcharge" ]
"imp_surcharge", "total_amount", "payment_type", "trip_type", "congestion_surcharge", "airport_fee" ]
DATA_DTYPES: >-
{ "vendor_id": "str",
"pickup_datetime": "datetime64[ns]",
Expand All @@ -100,17 +100,18 @@ dag:
"total_amount": "float64",
"payment_type": "str",
"trip_type": "str",
"congestion_surcharge": "float64" }
"congestion_surcharge": "float64",
"airport_fee": "float64" }
OUTPUT_CSV_HEADERS: >-
[ "vendor_id", "pickup_datetime", "dropoff_datetime", "store_and_fwd_flag", "rate_code",
"passenger_count", "trip_distance", "fare_amount", "extra", "mta_tax",
"tip_amount", "tolls_amount", "ehail_fee", "total_amount", "payment_type",
"tip_amount", "tolls_amount", "ehail_fee", "airport_fee", "total_amount", "payment_type",
"distance_between_service", "time_between_service", "trip_type", "imp_surcharge", "pickup_location_id",
"dropoff_location_id", "data_file_year", "data_file_month" ]
resources:
request_memory: "12G"
request_cpu: "1"
request_ephemeral_storage: "16G"
request_memory: "16G"
request_cpu: "2"
request_ephemeral_storage: "24G"
- operator: "GKEStartPodOperator"
description: "Run CSV transform within kubernetes pod"
args:
Expand Down Expand Up @@ -142,7 +143,7 @@ dag:
[ "vendor_id", "pickup_datetime", "dropoff_datetime", "passenger_count", "trip_distance",
"rate_code", "store_and_fwd_flag", "pickup_location_id", "dropoff_location_id",
"payment_type", "fare_amount", "extra", "mta_tax", "tip_amount",
"tolls_amount", "imp_surcharge", "total_amount", "congestion_surcharge" ]
"tolls_amount", "imp_surcharge", "total_amount", "congestion_surcharge", "airport_fee" ]
DATA_DTYPES: >-
{ "vendor_id": "str",
"pickup_datetime": "datetime64[ns]",
Expand All @@ -161,16 +162,17 @@ dag:
"tolls_amount": "float64",
"imp_surcharge": "float64",
"total_amount": "float64",
"congestion_surcharge": "float64" }
"congestion_surcharge": "float64",
"airport_fee": "float64" }
OUTPUT_CSV_HEADERS: >-
[ "vendor_id", "pickup_datetime", "dropoff_datetime", "passenger_count", "trip_distance",
"rate_code", "store_and_fwd_flag", "payment_type", "fare_amount", "extra",
"mta_tax", "tip_amount", "tolls_amount", "imp_surcharge", "total_amount",
"pickup_location_id", "dropoff_location_id", "data_file_year", "data_file_month" ]
"mta_tax", "tip_amount", "tolls_amount", "imp_surcharge", "airport_fee",
"total_amount", "pickup_location_id", "dropoff_location_id", "data_file_year", "data_file_month" ]
resources:
request_memory: "12G"
request_cpu: "1"
request_ephemeral_storage: "16G"
request_memory: "16G"
request_cpu: "2"
request_ephemeral_storage: "24G"
- operator: "GKEDeleteClusterOperator"
args:
task_id: "delete_cluster"
Expand Down

0 comments on commit d94105a

Please sign in to comment.