Build In Functions

前言

需要处理的数据结构往往是复杂的,在 Spark 中该如何操作 Map,Array,struct 这些结构呢?Spark 已经为我们提供了很多内置函数来处理这一切。这些函数大多定义在 org.apache.saprk.sql.functions。

日期处理

获取当前 date,timestamp

current_date,current_timestamp,包括下面说的处理函数可以直接在 sql 语句中使用,spark.sql(“Your  SQL”)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
scala> spark.emptyDataFrame.withColumn("current_date", current_date).schema
res3: org.apache.spark.sql.types.StructType = StructType(StructField(current_date,DateType,false))

scala> spark.range(1).select(current_date).show()
+--------------+
|current_date()|
+--------------+
| 2020-03-17|
+--------------+


scala> spark.range(1).select(current_timestamp).show()
+--------------------+
| current_timestamp()|
+--------------------+
|2020-03-17 10:37:...|
+--------------------+


scala> spark.range(1).select(current_timestamp).show(false)
+-----------------------+
|current_timestamp() |
+-----------------------+
|2020-03-17 10:39:53.896|
+-----------------------+

column 转换到 date,timestamp,时间戳

to_date,to_timestamp 接受 date,timestamp ,string 类型的参数。如果参数为 string,指定可选的 format,转换失败不会报错,会返回 null。默认的 format 是 yyyy-MM-dd HH:mm:ss。

unix_timestamp 返回当前时间戳或者根据指定字段生成时间戳,这个经常会用到。

from_unixtime 转换时间戳到 timestamp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
scala> spark.range(1).select(current_date).withColumn("to_timestamp", to_timestamp($"current_date()", "yyyy-MM-dd HH:mm:ss")).show()
+--------------+-------------------+
|current_date()| to_timestamp|
+--------------+-------------------+
| 2020-03-17|2020-03-17 00:00:00|
+--------------+-------------------+

scala> spark.range(1).select(current_timestamp).withColumn("to_date", to_date($"current_timestamp()")).show()
+--------------------+----------+
| current_timestamp()| to_date|
+--------------------+----------+
|2020-03-17 10:43:...|2020-03-17|
+--------------------+----------+

scala> Seq("2020-03-17 00:00:00").toDF("time").withColumn("unix_timestamp", unix_timestamp($"time")).show()
+-------------------+--------------+
| time|unix_timestamp|
+-------------------+--------------+
|2020-03-17 00:00:00| 1584374400|
+-------------------+--------------+


scala> spark.range(1).select(unix_timestamp).show()
+--------------------------------------------------------+
|unix_timestamp(current_timestamp(), yyyy-MM-dd HH:mm:ss)|
+--------------------------------------------------------+
| 1584413996|
+--------------------------------------------------------+

scala> spark.sql("select current_date as current_date").show()
+------------+
|current_date|
+------------+
| 2020-03-17|
+------------+

scala> Seq(1582720143).toDF("time").withColumn("from_unixtime", from_unixtime($"time")).show()
+----------+-------------------+
| time| from_unixtime|
+----------+-------------------+
|1582720143|2020-02-26 20:29:03|
+----------+-------------------+

格式化日期

date_format 用来格式化日期

1
2
3
4
5
6
scala> Seq("2020-03-17 00:00:00").toDF("time").withColumn("date_foramt", date_format($"time", "yyyy/MM/dd hh:mm:ss")).show()
+-------------------+-------------------+
| time| date_foramt|
+-------------------+-------------------+
|2020-03-17 00:00:00|2020/03/17 12:00:00|
+-------------------+-------------------+

复杂数据结构处理

就跟我们平常使用这些数据结构一样,取元素,追加元素,求长度,根据里面的数据做处理,修改数组,记录状态等。 org.apache.spark.sql.functions 中 @group 为 collection_funcs 的函数就是用来实现这些功能的。下面只介绍下经常遇到和使用的函数,更多的可以去看源码。

处理 Array

Method Description
array_contains 某个值是否在 array 中。
array_join 类似于 Python 中的 “,”.join。指定分隔符和可选的空值替换符连接 array 中的值。
element_at 按索引取值,索引从 1 开始。
explode 平铺数组中内容到多行
reverse 反转数组
array_zip 合并数组到结构体,array1 array2 →  array

这里 explode 比较重要,像一个用户的多个事件被存在了数组里面,就可以使用 explode 提取出来,变成一行一个事件处理。explode 变多行,那么我想把数组里面的值依次取出来变成多列呢?

这是一个问题,你可以去了解下 transform,想想如何实现。

再有一些比较复杂的操作,比如我们想通过数组前面的均值来计算填充 null 值,这时候你就需要用 UDF 自己写处理逻辑来实现了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
scala> val array = Seq("1 2 3 4 5").toDF("id_string").withColumn("id_array", split($"id_string", " "))
array: org.apache.spark.sql.DataFrame = [id_string: string, id_array: array<string>]

scala> array.show()
+---------+---------------+
|id_string| id_array|
+---------+---------------+
|1 2 3 4 5|[1, 2, 3, 4, 5]|
+---------+---------------+

scala> array.withColumn("head", element_at($"id_array", 1)).show()
+---------+---------------+----+
|id_string| id_array|head|
+---------+---------------+----+
|1 2 3 4 5|[1, 2, 3, 4, 5]| 1|
+---------+---------------+----+

scala> array.withColumn("array_contains", array_contains($"id_array", "1")).show()
+---------+---------------+--------------+
|id_string| id_array|array_contains|
+---------+---------------+--------------+
|1 2 3 4 5|[1, 2, 3, 4, 5]| true|
+---------+---------------+--------------+

scala> array.withColumn("array_join", array_join($"id_array", ",")).show()
+---------+---------------+----------+
|id_string| id_array|array_join|
+---------+---------------+----------+
|1 2 3 4 5|[1, 2, 3, 4, 5]| 1,2,3,4,5|
+---------+---------------+----------+

scala> array.select(explode($"id_array")).show()
+---+
|col|
+---+
| 1|
| 2|
| 3|
| 4|
| 5|
+---+
scala> val df = spark.sql("select arrays_zip(array(1,2,3),array('4','5')) as array_zip")
df: org.apache.spark.sql.DataFrame = [array_zip: array<struct<0:int,1:string>>]

scala> df.show(false)
+----------------------+
|array_zip |
+----------------------+
|[[1, 4], [2, 5], [3,]]|
+----------------------+

处理 Map

Method Description
map_keys 返回 map 的 key
map_values 返回 map 的 value
map_from_arrays 接收两个数组,第一个数组为 key 数组,第二个数组为 value 数组,key 数组不允许存在空值。
getField / getItem 根据 key 取值

不同于结构体,map 里面的数据类型要求是一致的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
scala> val map = Seq(Map(1 -> "1"), Map(2 -> "2")).toDF("map")
map: org.apache.spark.sql.DataFrame = [map: map<int,string>]

scala> map.show()
+--------+
| map|
+--------+
|[1 -> 1]|
|[2 -> 2]|
+--------+
scala> map.withColumn("getField", $"map".getItem(1)).show()
+--------+--------+
| map|getField|
+--------+--------+
|[1 -> 1]| 1|
|[2 -> 2]| null|
+--------+--------+

处理 struct

类似于 C++ 中的结构体。在 Scala 中可以用 case class  实现。取出数据可以用 getField。如果你想构造自己的结构体,可以直接使用 struct 函数,接收多列组合成一列,组合列的类型是结构体。如果你想展开结构体,可以直接在 select 中使用 * 通配符。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
scala> val simple_struct = df.select(explode($"array_zip"))
simple_struct: org.apache.spark.sql.DataFrame = [col: struct<0: int, 1: string>]

scala> simple_struct.select("col.*").show()
+---+----+
| 0| 1|
+---+----+
| 1| 4|
| 2| 5|
| 3|null|
+---+----+
scala> spark.sql("select struct(1, 2, 3, 4, 5)").show()
+---------------------------------------------------------+
|named_struct(col1, 1, col2, 2, col3, 3, col4, 4, col5, 5)|
+---------------------------------------------------------+
| [1, 2, 3, 4, 5]|
+---------------------------------------------------------+

处理 Json

Spark 在读取 json 数据格式的时候会自动解析出各列。有时候你可能需要在数据中生成 json 字符串。这里会用到 to_json。 

聚合计算

聚合计算主要是 sum,min,max,avg,countDistinct 等等,常和 groupBy,agg 等结合使用。在后面的 Aggregate 中详细讲

其他

这里的其他函数指的是  org.apache.spark.sql.functions 中 @group 为 normal_funcs 的函数。介绍一下常用的。

Method Description
lit 生成一个 Column,该 Column 的值是不变的
typedLit 生成一个 Column,该 Column 的值是不变的,和上面的区别是支持 Scala 中的 Seq,Map,List 类型
struct 生成结构体
when 类似于 sql 中的 when,和 otherwise 结合使用

tip:对于 lit,我们在指定 track 的类型。对于 typedLit,我们可以用于指定生成一个空的 properties。对与 struct,我们用来选取指定字段构造 properties。这里直接拿一段 Spark SQL 的代码举例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
pvInfoDF
.select(cols.map(col(_)): _*)
.filter(adjust_valid)
.filter(delimiter_valid)
.filter(!col("$ip").contains(","))
.filter($"channel_id" isin (channel: _*))
.na.replace(cols, Map("-" -> ""))
.withColumn("original_id", UDF.getUUID($"original_id"))
.withColumn("distinct_id", when($"distinct_id".notEqual(""), $"distinct_id").otherwise($"original_id"))
.withColumn("hour", UDF.intUDF($"hour"))
.withColumn("type", lit("track"))
.withColumn("time_free", lit(true))
.withColumn("project", lit(s"$project"))
.withColumn("event", lit("$pageview"))
.withColumn("$screen_width", UDF.getScreen($"screen", lit(0)))
.withColumn("$screen_height", UDF.getScreen($"screen", lit(1)))
.withColumn("time", unix_timestamp($"time", "yyyy-MM-dd HH:mm:ss"))
.transform(UDF.transformInt(int_cols))
使用搜索:谷歌必应百度