Increasing parallelism
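① Set the following properties in spark-defaults.conf under $SPARK_HOME/conf (spark-env.sh holds environment variables, not Spark properties):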
spark.sql.shuffle.partitions
spark.default.parallelism
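Spark properties such as these two are written in spark-defaults.conf as one "key value" pair per line, separated by whitespace. A minimal sketch of that layout follows; render_spark_defaults is a hypothetical helper, and the value 20 is only an illustration, not a recommendation.

```python
# Hypothetical helper: format Spark properties as spark-defaults.conf lines.
def render_spark_defaults(props):
    """Return the text of a spark-defaults.conf fragment for the given properties."""
    width = max(len(key) for key in props)
    # Each line is "<key><whitespace><value>".
    return "\n".join(f"{key.ljust(width)}  {value}" for key, value in props.items())


fragment = render_spark_defaults({
    "spark.sql.shuffle.partitions": 20,  # illustrative value (assumption)
    "spark.default.parallelism": 20,     # illustrative value (assumption)
})
print(fragment)
```

spark.sql.shuffle.partitions applies to DataFrame/SQL shuffles, while spark.default.parallelism applies to RDD operations, so the two are usually set together.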
② Set it in code when building the SparkSession:
from pyspark.sql import SparkSession

# Default partition count for RDD shuffles (reduceByKey, join, ...) and parallelize
defPar = 10
spark = (SparkSession.builder.appName("spark_skew_test").master("local[2]")
         .config("spark.default.parallelism", defPar).getOrCreate())
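One way to choose a value such as defPar: the Spark tuning guide suggests roughly 2 to 3 tasks per CPU core. The sketch below turns that rule of thumb into a number; suggest_parallelism and its parameters are hypothetical names, and the core count is assumed to match local[2].

```python
def suggest_parallelism(total_cores, tasks_per_core=2):
    """Rule-of-thumb partition count: a few tasks per available CPU core."""
    if total_cores < 1 or tasks_per_core < 1:
        raise ValueError("total_cores and tasks_per_core must be positive")
    return total_cores * tasks_per_core


# local[2] runs with 2 cores; 3 tasks per core gives 6 partitions.
defPar = suggest_parallelism(total_cores=2, tasks_per_core=3)
print(defPar)
```

This is only a starting point; the right value also depends on data size and skew.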
③ Specify numPartitions in the xxxByKey operations (for example reduceByKey):
import os
from pyspark import SparkConf, SparkContext

# Python interpreter used by the PySpark workers (machine-specific path)
os.environ['PYSPARK_PYTHON'] = '/home/appleyuchi/anaconda3/bin/python'

conf = SparkConf().setAppName("wordcount").setMaster("local[2]")
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")

inputdata = sc.textFile("hdfs://Desktop:9000/rdd1.csv")
# numPartitions=5 sets the number of partitions produced by this shuffle
output = (inputdata.flatMap(lambda x: x.split(" "))
          .map(lambda x: (x, 1))
          .reduceByKey(lambda a, b: a + b, numPartitions=5))
result = output.collect()
for i in result:
    print(i)
sc.stop()
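To see what numPartitions changes, here is a pure-Python model of hash partitioning: each key goes to partition hash(key) % numPartitions. This is a sketch, not Spark's implementation; zlib.crc32 is a stand-in for Spark's own hash function, and partition_of and partition_sizes are hypothetical names.

```python
import zlib
from collections import Counter


def partition_of(key, num_partitions):
    # Stand-in for Spark's partitioner: deterministic hash modulo partition count.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def partition_sizes(keys, num_partitions):
    """Count how many records land in each partition."""
    counts = Counter(partition_of(k, num_partitions) for k in keys)
    return [counts.get(p, 0) for p in range(num_partitions)]


words = "a b a c b a d e f a".split()
sizes = partition_sizes(words, 5)
print(sizes)
```

Every record with the same key lands in the same partition, so a larger numPartitions spreads distinct keys more thinly but cannot split a single hot key.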