-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathspark_hw.txt
73 lines (61 loc) · 2 KB
/
spark_hw.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
"""
HW2 SPARK
Course: BDT
Author: Martin Rypar
RESULTS:
1) 0.013549499110061822 vs. 0.013132119386216022
2) 53
3) month: count
1: 3,
2: 5,
3: 0,
4: 15,
5: 0,
6: 0,
7: 38,
8: 10,
9: 2,
10: 3,
11: 3,
12: 1
"""
from PIL import Image
from PIL.ExifTags import TAGS
import StringIO
from pyspark.sql.types import *
from pyspark.sql.functions import hour, month, year, unix_timestamp, from_unixtime, avg, col, round, format_number
from datetime import datetime
def get_exif(str):
    """Extract a few EXIF fields from the raw bytes of one image file.

    Args:
        str: Raw content of an image file, as found in the last element of a
            ``sc.binaryFiles`` record. The name shadows the builtin; it is
            kept unchanged so existing callers are unaffected.

    Returns:
        dict: Maps the decoded tag names "DateTime", "ExposureTime",
        "ExifImageWidth" and "ExifImageHeight" to their values, for whichever
        of those tags are present. Empty when the image carries no EXIF data
        or its image class offers no EXIF reader.

    Raises:
        Whatever ``Image.open`` raises when the bytes are not a readable
        image; that error is deliberately not swallowed.
    """
    wanted = ("DateTime", "ExposureTime", "ExifImageWidth", "ExifImageHeight")
    ret = {}
    image = Image.open(StringIO.StringIO(str))

    # _getexif is a private PIL method that not every image class defines;
    # a missing method is treated the same as missing EXIF data instead of
    # failing with AttributeError.
    read_exif = getattr(image, "_getexif", None)
    info = read_exif() if read_exif is not None else None
    if not info:
        return ret

    for tag, value in info.items():
        # Fall back to the numeric tag id when PIL has no name for it.
        decoded = TAGS.get(tag, tag)
        if decoded not in wanted:
            continue
        if decoded == "ExposureTime":
            # Only the last element of the value is stored.
            # NOTE(review): presumably the value is a (numerator, denominator)
            # pair, so this keeps the denominator and drops the numerator --
            # confirm that is acceptable for exposures whose numerator is not 1.
            ret[decoded] = value[-1]
        else:
            ret[decoded] = value
    return ret
# Hours of the day (09:00-15:59) that form the first group in question 1.
DAY_HOURS = [9, 10, 11, 12, 13, 14, 15]

# Read every file in the image directory, keep only the last element of each
# record (the file content) and turn it into a dict of EXIF fields.
imgs = sc.binaryFiles("/user/pascepet/data/images")
imgs2 = imgs.map(lambda detuple: detuple[-1])
imgs3 = imgs2.map(get_exif)

# All fields are nullable because get_exif only returns the tags it finds.
dfSchema = StructType([
    StructField("DateTime", StringType(), True),
    StructField("ExifImageHeight", LongType(), True),
    StructField("ExifImageWidth", LongType(), True),
    StructField("ExposureTime", LongType(), True),
])
df = sqlContext.createDataFrame(imgs3, dfSchema)

# Replace the stored integer by its reciprocal.
df = df.withColumn("ExposureTime", 1 / col("ExposureTime"))

# Parse the EXIF "yyyy:MM:dd HH:mm:ss" string into a timestamp. The cast is
# applied before the alias so the column name does not depend on Spark
# carrying the alias through the cast.
df2 = df.select(
    from_unixtime(unix_timestamp("DateTime", "yyyy:MM:dd HH:mm:ss"))
    .cast(TimestampType())
    .alias("DateTime"),
    df.ExposureTime,
    df.ExifImageWidth,
    df.ExifImageHeight,
)

# Question 1: average ExposureTime for rows taken during DAY_HOURS versus all
# other hours. Rows with a null DateTime fall into neither group.
is_day_hour = hour(col("DateTime")).isin(DAY_HOURS)
tmp11 = df2.where(is_day_hour)
q11 = tmp11.agg(avg(col("ExposureTime")))
q11.show()
tmp12 = df2.where(~is_day_hour)
q12 = tmp12.agg(avg(col("ExposureTime")))
q12.show()

# Question 2: number of rows whose height * width exceeds 4,000,000. Rows with
# a null dimension never satisfy the comparison, so no extra null handling is
# needed before counting.
q2 = df2.filter(df2.ExifImageHeight * df2.ExifImageWidth > 4000000).count()
print(q2)

# Question 3: number of rows per month, ignoring rows without a DateTime.
q3 = df2.where(col("DateTime").isNotNull()).groupBy(month(df2.DateTime)).count()
q3.show()