2016-12-15
Spark Streaming Programming Guide

2016-12-09
SparkDataFrameLikeSql

The idea of the Spark DataFrame may be inspired by the DataFrame in pandas, a Python package for structured data processing. In my opinion, DataFrames may be preferred by people with a BI (business intelligence) background because of the high development efficiency.

A DataFrame in Spark can be registered as something that can be thought of, approximately, as a virtual table, so anyone with SQL experience can explore the data at quite a low cost in time.
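
For context, registering and querying such a virtual table looks roughly like the sketch below (df is an assumed DataFrame, sqlContext a Spark 1.x SQLContext, and the table name people is arbitrary).

// a sketch of the virtual-table route
df.registerTempTable("people")
sqlContext.sql("select name, age from people where age = 23").show()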

This article will focus on DataFrame processing methods that do not rely on registering a virtual table and executing SQL; however, the corresponding SQL operations such as SELECT, WHERE, GROUP BY, MIN, MAX, COUNT, SUM, DISTINCT, ORDER BY, DESC/ASC, JOIN and top N per group will be supplied for a better understanding of the DataFrame in Spark.

prepare test data

First we make a DataFrame object a by reading a JSON file

val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame and to enable the $"col" syntax.
import sqlContext.implicits._
// column functions such as min, max, count, countDistinct, not and row_number used later in this article
import org.apache.spark.sql.functions._
val a = sqlContext.read.json("people.json")

and the content of people.json is as below

{"name":"Michael" , "age":23 ,"depart":"A","salary":3000 }
{"name":"Dan" , "age":23 ,"depart":"A","salary":3500 }
{"name":"Alex" , "age":23 ,"depart":"A","salary":3600 }
{"name":"Ben" , "age":23 ,"depart":"A","salary":3700 }
{"name":"Andy" , "age":30 ,"depart":"B","salary":4000 }
{"name":"Justin" , "age":19 ,"depart":"A","salary":5000 }
{"name":"Jack" , "age":19 ,"depart":"B","salary":2000 }

let us imagine a as a table stored in an RDS database such as MySQL.

desc

desc people;
scala> a.printSchema
root
|-- age: long (nullable = true)
|-- depart: string (nullable = true)
|-- name: string (nullable = true)
|-- salary: long (nullable = true)

SELECT

select name from people;
a.select("name").show
a.select($"name").show
a.select(a("name")).show

the three methods above are equivalent.

WHERE

select name,age from people where age = 23
a.select("name", "age").where($"age"===23).show
a.select("name", "age").filter($"age"===23).show

MIN,MAX,SUM,COUNT

select min(age), max(age), sum(salary), count(age) from people
a.select(min("age"),max("age"),sum("salary"),count("age")).show
a.agg(min("age"),max("age"),sum("salary"),count("age")).show

and the result is

+--------+--------+-----------+----------+
|min(age)|max(age)|sum(salary)|count(age)|
+--------+--------+-----------+----------+
|      19|      30|      24800|         7|
+--------+--------+-----------+----------+

COUNT DISTINCT

select count(distinct age), count(distinct name) from people
a.select(countDistinct("age"), countDistinct("name")).show
a.agg(countDistinct("age"), countDistinct("name")).show

and the result is

+-------------------+--------------------+
|count(DISTINCT age)|count(DISTINCT name)|
+-------------------+--------------------+
|                  3|                   7|
+-------------------+--------------------+

ORDER BY DESC

select * from people order by age desc, name desc
a.sort($"age".desc,$"name".desc).show
+---+------+-------+------+
|age|depart|   name|salary|
+---+------+-------+------+
| 30|     B|   Andy|  4000|
| 23|     A|Michael|  3000|
| 23|     A|    Dan|  3500|
| 23|     A|    Ben|  3700|
| 23|     A|   Alex|  3600|
| 19|     A| Justin|  5000|
| 19|     B|   Jack|  2000|
+---+------+-------+------+

inner join, left outer join and converting null to a default value

first we make another DataFrame c based on a

val c = a.filter(not ($"age"===23))
scala> c.show
+---+------+------+------+
|age|depart|  name|salary|
+---+------+------+------+
| 30|     B|  Andy|  4000|
| 19|     A|Justin|  5000|
| 19|     B|  Jack|  2000|
+---+------+------+------+

now we try to join a and c

select
a.age as a_age,
if(c.age is null, 0, c.age) as c_age,
a.depart as a_depart
from
a
left outer join
c
on
a.age = c.age

the corresponding DataFrame code is

scala> a.join(c,a("age")===c("age"),"left").select(a("age").alias("a_age"),c("age").alias("c_age"),a("depart").alias("a_depart")).na.fill(0,Seq("c_age")).show
+-----+-----+--------+
|a_age|c_age|a_depart|
+-----+-----+--------+
|   23|    0|       A|
|   23|    0|       A|
|   23|    0|       A|
|   23|    0|       A|
|   30|   30|       B|
|   19|   19|       A|
|   19|   19|       A|
|   19|   19|       B|
|   19|   19|       B|
+-----+-----+--------+

what if the records whose c.age is null are excluded?

select
a.age as a_age,
if(c.age is null, 0, c.age) as c_age,
a.depart as a_depart
from
a
left outer join
c
on
a.age = c.age
where
c.age is not null

the na.drop method provides this functionality

scala> a.join(c,a("age")===c("age"),"left").select(a("age").alias("a_age"),c("age").alias("c_age"),a("depart").alias("a_depart")).na.drop.show
+-----+-----+--------+
|a_age|c_age|a_depart|
+-----+-----+--------+
|   30|   30|       B|
|   19|   19|       A|
|   19|   19|       A|
|   19|   19|       B|
|   19|   19|       B|
+-----+-----+--------+

Top N per group

a window operation can help here

import org.apache.spark.sql.expressions.Window
// one window specification per depart group; the ordering is supplied below
val w = Window.partitionBy($"depart")
// row_number() numbers the rows 1, 2, 3 ... within each depart, by salary ascending / descending
val rankAsc = row_number().over(w.orderBy($"salary")).alias("rank_asc")
val rankDesc = row_number().over(w.orderBy($"salary".desc)).alias("rank_desc")

scala> a.select($"*", rankAsc, rankDesc).filter($"rank_asc"<3 || $"rank_desc" >= 2).show
+---+------+-------+------+--------+---------+
|age|depart|   name|salary|rank_asc|rank_desc|
+---+------+-------+------+--------+---------+
| 30|     B|   Andy|  4000|       2|        1|
| 19|     B|   Jack|  2000|       1|        2|
| 23|     A|    Ben|  3700|       4|        2|
| 23|     A|   Alex|  3600|       3|        3|
| 23|     A|    Dan|  3500|       2|        4|
| 23|     A|Michael|  3000|       1|        5|
+---+------+-------+------+--------+---------+
scala> a.select($"*", rankAsc, rankDesc).filter($"rank_asc"<3 && $"rank_desc" >= 2).show
+---+------+-------+------+--------+---------+
|age|depart|   name|salary|rank_asc|rank_desc|
+---+------+-------+------+--------+---------+
| 19|     B|   Jack|  2000|       1|        2|
| 23|     A|    Dan|  3500|       2|        4|
| 23|     A|Michael|  3000|       1|        5|
+---+------+-------+------+--------+---------+

what's more, it is clear that select * in SQL can be implemented by select($"*")

2016-12-05
save a Spark RDD into MySQL

import java.util.Properties
// targetRdd is assumed to be an RDD of case class instances, so toDF() can infer the schema
val target_df = targetRdd.toDF()
val prop = new Properties()
prop.put("user", "username")
prop.put("password", "password")
// append the rows to the MySQL table over JDBC
target_df.write.mode("append").jdbc("jdbc:mysql://host:port/database", "table", prop)
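
For toDF() to work, targetRdd is assumed to be an RDD of case class instances so that the schema can be inferred; a sketch of that assumption, with a made-up Person case class:

// hypothetical case class whose fields match the columns of the MySQL table
case class Person(name: String, age: Long, salary: Long)
// sqlContext.implicits._ must be in scope for toDF() on an RDD of case classes
import sqlContext.implicits._
val targetRdd = sc.parallelize(Seq(Person("Michael", 23, 3000), Person("Dan", 23, 3500)))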

2016-11-28
SparkStreamLearning

input source

kafka
akka

output

redis
kafka
elasticSearch
hive
mySql
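
A minimal sketch of wiring up the Kafka input (assuming the spark-streaming-kafka 0.8 artifact is on the classpath; the broker address and topic name are placeholders), with the per-batch output left as a stub for any of the sinks listed above:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(sc, Seconds(10))
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set("events"))
// every micro-batch arrives as an RDD of (key, value) pairs
stream.foreachRDD { rdd =>
  rdd.map(_._2).take(10).foreach(println) // replace with a write to redis / kafka / elasticsearch / hive / mysql
}
ssc.start()
ssc.awaitTermination()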

2016-08-19
Spark 2.0 Introduction

Spark 2.0 MLlib Introduction

As of Spark 2.0, the RDD-based APIs in the spark.mllib package have entered maintenance mode. The primary Machine Learning API for Spark is now the DataFrame-based API in the spark.ml package.


What are the implications?

MLlib will still support the RDD-based API in spark.mllib with bug fixes.
MLlib will not add new features to the RDD-based API.
In the Spark 2.x releases, MLlib will add features to the DataFrames-based API to reach feature parity with the RDD-based API.
After reaching feature parity (roughly estimated for Spark 2.2), the RDD-based API will be deprecated.
The RDD-based API is expected to be removed in Spark 3.0.

Why is MLlib switching to the DataFrame-based API?

DataFrames provide a more user-friendly API than RDDs. The many benefits of DataFrames include Spark Datasources, SQL/DataFrame queries, Tungsten and Catalyst optimizations, and uniform APIs across languages.
The DataFrame-based API for MLlib provides a uniform API across ML algorithms and across multiple languages.
DataFrames facilitate practical ML Pipelines, particularly feature transformations. See the Pipelines guide for details.
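
A minimal sketch of what the DataFrame-based API looks like in practice (adapted from the standard Pipeline example; the toy data and column names below are made up):

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}

// a tiny labelled text dataset, just to exercise the API
val training = sqlContext.createDataFrame(Seq(
  (0L, "a b c d e spark", 1.0),
  (1L, "b d", 0.0)
)).toDF("id", "text", "label")

// each stage reads and writes DataFrame columns; the Pipeline chains them into one model
val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
val hashingTF = new HashingTF().setInputCol("words").setOutputCol("features")
val lr = new LogisticRegression().setMaxIter(10)
val model = new Pipeline().setStages(Array(tokenizer, hashingTF, lr)).fit(training)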

2016-08-19
Save DataFrame into a partitioned table of HIVE

How to save a Spark DataFrame as a partitioned Hive table

utilise the saveAsTable method

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("Simple Application").setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
// a HiveContext is needed to read from and write into Hive
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
hiveContext.sql("use database")
val cmd =
  """
    |select
    |  col1,
    |  col2
    |from
    |  table
  """.stripMargin
val yourDf = hiveContext.sql(cmd)
yourDf.printSchema()
// col2 becomes the partition column of the new Hive table
yourDf.write.partitionBy("col2").saveAsTable("partitionTableName")
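
To sanity-check the result (a sketch; partitionTableName and the col2 value are placeholders), the new table can be read back through the same HiveContext:

val readBack = hiveContext.table("partitionTableName")
readBack.printSchema()
// reading a single partition only scans the matching directory
readBack.filter($"col2" === "someValue").show()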

2016-08-11
SparkPassFunctions

import org.apache.spark.rdd.RDD

class MyClass {
  val field = "Hello"
  def doStuff(rdd: RDD[String]): RDD[String] = {
    // copy the field into a local variable so that only this String, not the whole
    // MyClass instance, is captured by the closure shipped to the executors
    val field_ = this.field
    rdd.map(x => field_ + x)
  }
}
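
For contrast, a sketch of the version this pattern avoids (MyOtherClass is a made-up name): referencing field directly inside map captures this, so the whole object must be serializable and gets shipped with every task.

import org.apache.spark.rdd.RDD

class MyOtherClass {
  val field = "Hello"
  def doStuff(rdd: RDD[String]): RDD[String] = {
    // equivalent to rdd.map(x => this.field + x): the closure drags the whole instance along
    rdd.map(x => field + x)
  }
}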

2016-04-25
Wide and Narrow Dependencies in Spark

Spark's transformation operations can be divided into 'narrow dependencies' and 'wide dependencies' according to whether a child partition depends on all partitions of the parent RDDs; simply put, transformations that involve a shuffle are wide dependencies, and those without a shuffle are narrow dependencies.
For narrow dependencies, Spark tries to place them in the same stage, while a wide dependency becomes a separate stage.
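
A minimal sketch (the input path is made up): flatMap and map are narrow dependencies, while reduceByKey needs a shuffle and therefore starts a new stage.

val words = sc.textFile("hdfs:///tmp/input.txt") // hypothetical path
  .flatMap(_.split(" ")) // narrow: each output partition depends on a single parent partition
  .map(w => (w, 1)) // narrow
val counts = words.reduceByKey(_ + _) // wide: the parent partitions are shuffled, so a new stage begins here
println(counts.toDebugString) // the printed lineage shows the stage boundary at the shuffle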