glue_job_1.py
import sys

from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.types import IntegerType, DecimalType
from pyspark.sql.functions import to_date, coalesce
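
# S3 locations: raw CSV input and processed Parquet output.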
RAW_PATH = 's3://data-rawzone/data/2m_Sales_Records.csv'
PROCESSING_PATH = 's3://data-processingzone/data/'


def convert_to_parquet(path_source, path_target):
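    """Read the CSV at path_source, normalise its schema, and write it to
    path_target as a single Parquet file. Relies on the module-level
    `spark` session created in the main block."""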
    df = (
        spark.read.format("csv")
        .option("sep", ",")
        .option("header", True)
        .load(path_source)
    )
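    # Normalise column names: replace spaces with underscores and upper-case.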
    for name in df.schema.names:
        df = df.withColumnRenamed(name, name.replace(' ', '_').upper())
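    # Cast each column to its proper type; date columns arrive as either
    # M/d/yyyy or MM/dd/yyyy, so try both formats and keep the first match.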
    for column in df.columns:
        if column in ('ORDER_ID', 'UNITS_SOLD'):
            df = df.withColumn(column, df[column].cast(IntegerType()))
        elif column in ('UNIT_PRICE', 'UNIT_COST', 'TOTAL_REVENUE', 'TOTAL_COST', 'TOTAL_PROFIT'):
            df = df.withColumn(column, df[column].cast(DecimalType(10, 2)))
        elif column in ('ORDER_DATE', 'SHIP_DATE'):
            df = df.withColumn(column, coalesce(to_date(column, 'M/d/yyyy'), to_date(column, 'MM/dd/yyyy')))
    # cache() only pays off if the DataFrame is reused; the single write
    # below is the only action here, so this is optional.
    df.cache()
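    # coalesce(1) collapses the frame to one partition so a single
    # Parquet file is written instead of one file per partition.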
    (
        df.coalesce(1)
        .write
        .format("parquet")
        .mode("overwrite")
        .save(path_target)
    )


if __name__ == "__main__":
    # JOB_NAME is passed by Glue when the job runs; init/commit complete
    # the Job lifecycle so bookmarks and run state are recorded.
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
    sc = SparkContext.getOrCreate()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    convert_to_parquet(RAW_PATH, PROCESSING_PATH)
    job.commit()