hadoop + hive sql + spark sql computations
pyspark --master yarn --num-executors 2 --executor-memory 4G --packages com.databricks:spark-csv_2.10:1.5.0 --conf spark.ui.port=11808
https://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html
from pyspark.sql.types import *
from pyspark.sql import functions as F
# 1. PREPARATION
schema = StructType([
    StructField("id", LongType()),
    StructField("album", StringType()),
    StructField("rok", ShortType()),
    StructField("interpret", StringType()),
    StructField("zanr", StringType()),
    StructField("text", StringType())
])
df = sqlContext.read.format("com.databricks.spark.csv") \
    .option("header", "false") \
    .option("delimiter", ",") \
    .schema(schema) \
    .load("/user/pascepet/data/lyrics.txt")
df.count()
df.filter(df.interpret == "bob-dylan").count()
df.agg({"rok" : "max"}).show()
df.agg({"rok" : "min"}).show()
# .show() only prints; .collect()[0] returns the first Row to the driver
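# a sketch of reading the aggregates back as plain Python values; the [0][0]
# indexing assumes a single-row, single-column result:
rok_max = df.agg({"rok": "max"}).collect()[0][0]
rok_min = df.agg({"rok": "min"}).collect()[0][0]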
from pyspark.sql.functions import desc
df.groupby('zanr').count().sort(desc('count')).show(1)   # most frequent genre
# 2. CLEANING
# how many rows survive the year filter (complement of the out-of-range count)
(df.count() - df.filter((df.rok > 2018) | (df.rok < 1950)).count())
df = df.filter((df.rok <= 2018) & (df.rok >= 1950))
df = df.withColumn('text', F.regexp_replace(df.text, r'[,.;:?!()\[\]]', ''))   # strip punctuation
df = df.withColumn('text', F.lower(df.text))                                   # lowercase
df = df.withColumn('slova_poc', F.size(F.split(df.text, ' ')))                 # words per song
df = df.withColumn('text_upr', F.split(df.text, ' '))                          # tokenized text
# UDFs default to StringType; declare IntegerType so the averages below run on numbers
countUnikat = F.udf(lambda a: len(set(a)), IntegerType())
df = df.withColumn('slova_poc_unik', countUnikat(df.text_upr))
stopw = sc.textFile("/user/pascepet/data/stopwords.txt").collect()
stopw = set(stopw)
countUniqNotStop = F.udf(lambda a: len(set(a).difference(stopw)), IntegerType())
df = df.withColumn('slova_poc_unik2', countUniqNotStop(df.text_upr))
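# for larger stop-word lists it is cheaper to broadcast the set once than to
# capture it in the UDF closure (an equivalent sketch of the UDF above):
stopw_bc = sc.broadcast(stopw)
countUniqNotStop = F.udf(lambda a: len(set(a).difference(stopw_bc.value)), IntegerType())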
df = df.drop('text_upr')
df.cache()
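# quick sanity check of the cleaned frame (a sketch):
df.printSchema()
df.select('interpret', 'slova_poc', 'slova_poc_unik', 'slova_poc_unik2').show(5)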
# 3. QUERIES
df.groupBy(df.rok).count().show()
df.groupBy(df.interpret).count().toDF('interpret', 'pocet').filter("pocet >= 500").orderBy('pocet', ascending=False).show()
df.groupBy(df.interpret).agg({'slova_poc_unik': "avg"}).toDF('interpret', 'avg_unik_slov').orderBy('avg_unik_slov', ascending=False).show()
df.groupBy(df.interpret).agg({'slova_poc_unik2': "avg"}).toDF('interpret', 'avg_unik_slov').orderBy('avg_unik_slov', ascending=False).show()
# note: with dict-style agg the order of the aggregate columns is not guaranteed,
# so these positional renames are fragile; see the alias-based sketch below
df.groupBy(df.interpret).agg({'slova_poc_unik': 'avg', 'interpret': 'count'}).toDF('interpret', 'avg_unik_slov', 'pocet_pisni').filter("pocet_pisni >= 50").orderBy('avg_unik_slov', ascending=False).show()
df.groupBy(df.interpret).agg({'slova_poc_unik2': 'avg', 'interpret': 'count'}).toDF('interpret', 'avg_unik', 'pocet_pisni').filter("pocet_pisni >= 50").orderBy('avg_unik', ascending=False).show()
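# a variant with explicit aggregate aliases that does not lean on column order
# (a sketch, same result as the two lines above):
df.groupBy('interpret') \
    .agg(F.avg('slova_poc_unik').alias('avg_unik_slov'), F.count('interpret').alias('pocet_pisni')) \
    .filter('pocet_pisni >= 50') \
    .orderBy(F.desc('avg_unik_slov')) \
    .show()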
df.registerTempTable("songs")
slova_prum = sqlContext.sql("""select interpret, avg(slova_poc_unik) as slova_unik_prum, avg(slova_poc_unik2) as slova_unik2_prum, count(*) as pisne_pocet from songs
group by interpret""")
slova_prum.select('interpret', 'slova_unik_prum').orderBy(F.desc('slova_unik_prum')).show()
slova_prum.select('interpret', 'slova_unik2_prum').orderBy(F.desc('slova_unik2_prum')).show()
slova_prum.filter('pisne_pocet>=50').orderBy(F.desc('slova_unik_prum')).show()
slova_prum.filter('pisne_pocet>=50').orderBy(F.desc('slova_unik2_prum')).show()
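# the filtered ranking can also be a single SQL statement with HAVING
# (a sketch against the same "songs" temp table):
sqlContext.sql("""select interpret, avg(slova_poc_unik) as slova_unik_prum, count(*) as pisne_pocet
from songs group by interpret having count(*) >= 50
order by slova_unik_prum desc""").show()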
# 4. MINING
# word counts over all lyrics, stop words removed
# (DataFrames have no flatMap, so go through .rdd)
words_top = df.rdd.flatMap(lambda r: r['text'].split(" ")) \
    .filter(lambda r: r not in stopw) \
    .map(lambda r: (r, 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .sortBy(lambda r: r[1], False)
words_top.take(20)
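# a DataFrame-only equivalent of the RDD word count above (a sketch; explodes
# the already-lowercased text and drops stop words):
words_top_df = df.select(F.explode(F.split(df.text, ' ')).alias('slovo')) \
    .filter(~F.col('slovo').isin(list(stopw))) \
    .groupBy('slovo').count() \
    .orderBy(F.desc('count'))
words_top_df.show(20)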
df = df.withColumn('is_love', F.when(F.regexp_extract(df.text, r'\b(love)\b', 1) == 'love', 1).otherwise(0))
df = df.withColumn('is_like', F.when(F.regexp_extract(df.text, r'\b(like)\b', 1) == 'like', 1).otherwise(0))
df = df.withColumn('is_know', F.when(F.regexp_extract(df.text, r'\b(know)\b', 1) == 'know', 1).otherwise(0))
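# rlike gives the same 0/1 flag a bit more directly; an equivalent sketch for
# one of the columns above:
df = df.withColumn('is_love', F.when(df.text.rlike(r'\blove\b'), 1).otherwise(0))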
interprets = df.groupBy('interpret').count() \
.toDF('interpret', 'pocet').filter("pocet >= 500")
interprets.show()
interprets_words = interprets.join(df, interprets.interpret==df.interpret) \
.drop(df.interpret) \
.select('interpret', 'is_love', 'is_like', 'is_know') \
.groupBy('interpret') \
.agg({'is_love':'avg', 'is_like':'avg', 'is_know':'avg'})
interprets_words.show()
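# dict-style agg names its columns like 'avg(is_love)'; a sketch of ranking the
# interprets by how often they mention love (column name assumed from that scheme):
interprets_words.orderBy(F.desc('avg(is_love)')).show()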