Spark小知识点

前言

工作中碰到的一些 Spark 的问题

spark load impala

采用 jdbc format,不过需要注意,ImpalaJDBC41.jar 的版本。在 Maven 中央仓库中的 ImpalaJDBC41 的依赖都是不管用的。因此需要自己去下载并放入 libs 目录 下载地址。如果采用 sbt 打包管理的话,在 build.sbt 中加入以下内容用于标识本地依赖 jar 的路径。

1
2
unmanagedBase := baseDirectory.value / "libs"

连接 impala 代码示例,这里加了一些参数,numPartitions,partitionColumn,lowerBound,upperBound 用于调节 load 数据时的并发度。不加这些参数的时候。默认只有一个 task 在加载数据,大数据量的情况下相对较慢。建议 jdbc format 的数据源都调节一下这些参数,以达到一个理想的并发度。

1
2
3
4
5
6
7
8
9
10
11
spark.read
.format("jdbc")
.option("url", s"$impala_url")
.option("driver", "com.cloudera.impala.jdbc41.Driver")
.option("dbtable", "users")
.option("numPartitions", 32)
.option("partitionColumn", "id")
.option("lowerBound", "1")
.option("upperBound", "100000")
.load()

spark 非等值 join 的执行计划

默认使用 BNLJ,及 BrocastNestLoopJoin,执行效率会非常低。如果查询条件比较少,此时可以想办法转换为等值连接来优化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
scala> array.join(b, array_contains($"id_array", $"id")).explain(true)
== Parsed Logical Plan ==
Join Inner, array_contains(id_array#7, id#19)
:- Project [id_string#5, split(id_string#5, ) AS id_array#7]
: +- Project [value#3 AS id_string#5]
: +- LocalRelation [value#3]
+- Project [cast(id#0L as string) AS id#19]
+- Range (0, 5, step=1, splits=Some(8))

== Analyzed Logical Plan ==
id_string: string, id_array: array<string>, id: string
Join Inner, array_contains(id_array#7, id#19)
:- Project [id_string#5, split(id_string#5, ) AS id_array#7]
: +- Project [value#3 AS id_string#5]
: +- LocalRelation [value#3]
+- Project [cast(id#0L as string) AS id#19]
+- Range (0, 5, step=1, splits=Some(8))

== Optimized Logical Plan ==
Join Inner, array_contains(id_array#7, id#19)
:- LocalRelation [id_string#5, id_array#7]
+- Project [cast(id#0L as string) AS id#19]
+- Range (0, 5, step=1, splits=Some(8))

== Physical Plan ==
BroadcastNestedLoopJoin BuildLeft, Inner, array_contains(id_array#7, id#19)
:- BroadcastExchange IdentityBroadcastMode
: +- LocalTableScan [id_string#5, id_array#7]
+- *(1) Project [cast(id#0L as string) AS id#19]
+- *(1) Range (0, 5, step=1, splits=8)

Spark SQL 处理多层嵌套数据

写代码要以尽量简洁的形式表达出想要的意思,避免代码的冗余。如果要取多层嵌套数据中的某些字段做处理,比如像下面的的 schema,是大量的 getFiled 还是采用其他的办法呢。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
root
|-- ak: string (nullable = true)
|-- pl: string (nullable = true)
|-- usr: struct (nullable = true)
| |-- did: string (nullable = true)
|-- ut: string (nullable = true)
|-- ip: string (nullable = true)
|-- st: long (nullable = true)
|-- ua: string (nullable = true)
|-- data: struct (nullable = true)
| |-- dt: string (nullable = true)
| |-- pr: struct (nullable = true)
| | |-- $an: string (nullable = true)
| | |-- $br: string (nullable = true)
| | |-- $cn: string (nullable = true)
| | |-- $cr: string (nullable = true)
| | |-- $ct: long (nullable = true)
| | |-- $cuid: string (nullable = true)
| | |-- $dru: long (nullable = true)
| | |-- $dv: string (nullable = true)
| | |-- $eid: string (nullable = true)
| | |-- $imei: string (nullable = true)
| | |-- $jail: long (nullable = true)
| | |-- $lang: string (nullable = true)
| | |-- $mkr: string (nullable = true)
| | |-- $mnet: string (nullable = true)
| | |-- $net: string (nullable = true)
| | |-- $os: string (nullable = true)
| | |-- $ov: string (nullable = true)
| | |-- $private: long (nullable = true)
| | |-- $ps: string (nullable = true)
| | |-- $rs: string (nullable = true)
| | |-- $sc: long (nullable = true)
| | |-- $sid: long (nullable = true)
| | |-- $ss_name: string (nullable = true)
| | |-- $tz: long (nullable = true)
| | |-- $vn: string (nullable = true)

在字段具有相似特征的情况下可以用以下方法处理,得到所有列的集合在做后续处理。

1
2
3
4
5
6
import org.apache.spark.sql.functions._

val pr = $"data".getFiled("pr")
val prDF = testDataDF.select("data.pr.*")
val prColumns = prDF.columns.filter(_.startsWith("$"))
val custom_columns = prColumns.map(x => pr.getField(x).alias(x)).toSeq

Spark UDF WrappedArray

UDF 在接收 Array 作为参数的时候,类型其实是 WrappedArray。

Spark UDF 接受常量作为参数

使用 lit 或者 typedlit 包装一下,成为一个常量 Column 在处理。

1
2
3
4
5
6
7
8
9
val getScreen = udf((t: String, index: Int) =>
t match {
case t if !t.contains("*") => ""
case t if t.split("\\*").length.equals(2) => t.split("\\*")(index)
case _ => ""
}

)
getScreen($"Screen", lit(0))

Spark SQL lit 与 typedLit

typedLit 用于包装 scala 的数据结构类型,比如 Seq,Map 等。
lit 用于包装简单数据类型,比如 int,string 等。

cast 强转类型的坑

针对数据质量较差的数据做类型转换时,建议使用 UDF 。因为 cast 转换失败就为 null,容易丢数据。可以查看 org.apache.spark.sql.catalyst.expressions.Cast.scala

spark Ambiguous reference to fields StructField

spark 默认大小写不敏感,这就导致如果你的数据中存在 name 和 Name,就会认为是相同的列,从而抛出此错误,此时可以开启大小写敏感

1
2
3
4
5
val spark = SparkSession.builder()
.master("local[*]")
.appName("zhugeProcessor")
.config("spark.sql.caseSensitive", true)
.getOrCreate()
使用搜索:谷歌必应百度