Spark joinWith


The art of joining in Spark

First of all, let’s see what happens if we decide to broadcast a table during a join. Note that Spark may also plan a broadcast join automatically (without us forcing it), although this can vary depending on the Spark version and on how it is configured.

We will be joining two tables: fact_table and dimension_table. First of all, let’s see how big they are:

fact_table.count // #rows 3,301,889,672
dimension_table.count // #rows 3,922,556

In this case, the data is not skewed and the partitioning is fine — you’ll have to take my word for it. Note that the dimension_table is not exactly “small” (size is not something we can infer from the row count alone; it is better to look at the file size on HDFS).

Anyway, let’s try to join the tables without broadcasting to see how long it takes:
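As a rough sketch of such a plain join (the join key column names used here, dimension_key on the fact table and id on the dimension, are assumptions for illustration, not taken from the original snippet):

// Plain shuffle join: no broadcast hint, so Spark shuffles both sides.
val joined = fact_table.join(
  dimension_table,
  fact_table("dimension_key") === dimension_table("id")
)
joined.count()  // force execution so the elapsed time can be measured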

Output: Elapsed time: 215.115751969s

Now, what happens if we broadcast the dimension table? With a simple addition to the join operation, i.e. replacing the variable dimension_table with broadcast(dimension_table), we can force Spark to handle our tables using a broadcast:
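A sketch of the broadcast variant, with the same assumed column names as above:

import org.apache.spark.sql.functions.broadcast

// Wrapping the dimension in broadcast() tells Spark to ship a full copy of it to
// every executor and perform a broadcast hash join instead of shuffling both sides.
val joinedBroadcast = fact_table.join(
  broadcast(dimension_table),
  fact_table("dimension_key") === dimension_table("id")
)
joinedBroadcast.count()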

Output: Elapsed time: 61.135962017s

The broadcast made the code run 71% faster! Again, read this outcome keeping in mind what I wrote earlier about absolute execution time.

Is broadcasting always good for performance? Not at all! If you try to execute the snippets above while giving more resources to the cluster (in particular more executors), the non-broadcast version can run faster than the broadcast one! One reason is that the broadcasting operation is itself quite expensive (every node needs to receive a copy of the table), so it’s not surprising that if we increase the number of executors that need to receive the table, we increase the broadcasting cost, which may suddenly become higher than the cost of the join itself.

It’s important to remember that when we broadcast, we are hitting the memory available on each executor node (here’s a brief article about Spark memory). This can easily lead to Out Of Memory exceptions or make your code unstable: imagine broadcasting a medium-sized table. You run the code, everything is fine and super fast. A couple of months later you suddenly find out that your code breaks with an OOM. After some hours of debugging, you may discover that the medium-sized table you broadcast to make your code fast is not that “medium” anymore. Takeaway: if you broadcast a medium-sized table, you need to be sure it will remain medium-sized in the future!

Skewness is a common issue when you want to join two tables. We say a join is skewed when the join key is not uniformly distributed in the dataset. During a skewed join, Spark cannot perform operations in parallel, since the join’s load will be distributed unevenly across the Executors.

Let’s take our old fact_table and a new dimension:

fact_table.count // #rows 3,301,889,672
dimension_table2.count // #rows 52

Great, our dimension_table2 is very small and we can decide to broadcast it straight away! Let’s join and see what happens:

Output: Elapsed time: 329.991336182s

Now, observe on the SparkUI what happened to the tasks during the execution:

As you can see in the image above, one of the tasks took much longer to complete than the others. This is clearly an indication of skewness in the data — and this conjecture is easily verifiable by looking at the distribution of the join key in the fact_table.

To make things work, we need to find a way to redistribute the workload to improve our join’s performance. I want to propose two ideas:

  • Option 1: we can try to repartition our fact table, in order to distribute the effort across the nodes
  • Option 2: we can artificially create a repartitioning key (key salting)

Option 1: Repartition the table

We can select a column that is uniformly distributed and repartition our table accordingly; if we combine this with broadcasting, we should have achieved the goal of redistributing the workload:
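A sketch of the idea (the repartitioning column name, the number of partitions, and the dimension key id are assumptions for illustration):

import org.apache.spark.sql.functions.{broadcast, col}

// Repartition the fact table on an evenly distributed column first,
// then broadcast the small dimension so the join itself needs no shuffle.
val repartitionedFact = fact_table.repartition(400, col("uniformly_distributed_column"))

val joined = repartitionedFact.join(
  broadcast(dimension_table2),
  repartitionedFact("dimension_2_key") === dimension_table2("id")
)
joined.count()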

Output: Elapsed time: 106.708180448s

Note that when choosing the column we should also look at its cardinality (e.g. I wouldn’t choose a key with “too high” or “too low” cardinality; I’ll let you quantify those terms).

Important note: if you cannot broadcast the dimension table and you still want to use this strategy, the left side and the right side of the join need to be repartitioned using the same partitioner! Let’s see what happens if we don’t.

Consider the following snippet and let’s look at the DAG on the Spark UI:
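A sketch of the “wrong” version, where only one side is repartitioned and nothing is broadcast (the partition count and the dimension key id are assumptions):

import org.apache.spark.sql.functions.col

// Only the fact table is repartitioned; the dimension is left untouched and not broadcast.
val factByKey = fact_table.repartition(400, col("dimension_2_key"))

val joined = factByKey.join(
  dimension_table2,
  factByKey("dimension_2_key") === dimension_table2("id")
)
joined.explain()  // the plan still shows Spark adding its own Exchange steps for the join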

As you can see, in this case my repartitioning is basically ignored: after it is performed, Spark still decides to re-exchange the data using the default configuration. Let’s look at how the DAG changes if we use the same partitioner on both sides:

Option 2: Key salting

Another strategy is to forge a new join key!

We still want to force Spark to do a uniform repartitioning of the big table; in this case, we can also combine key salting with broadcasting, since the dimension table is very small.

The join key of the left table is stored in the field dimension_2_key, which is not evenly distributed. The first step is to make this field more “uniform”. An easy way to do that is to randomly append a number between 0 and N to the join key, e.g.:

Trick to craft a uniformly distributed repartition key

As you can see, we have modified dimension_2_key, which is now “uniformly” distributed: we are on the right path to a better workload on the cluster. Since we have modified the join key, we need to do the same operation on the dimension table. To do so, we create, for each “new” key value in the fact table, a corresponding value in the dimension: for each value of the id in the dimension table we generate N values, appending the numbers in the [0, N] interval to the old ids. Let’s make this clearer with the following image:

Exploding dimension table for Apache Spark repartitioning

At this point, we can join the two datasets using the “new” salted key.

This simple trick will improve the degree of parallelism of the DAG execution. Of course, we have increased the number of rows of the dimension table (in the example N=4). A higher N (e.g. 100 or 1000) will result in a more uniform distribution of the key in the fact table, but also in a higher number of rows for the dimension table!

Let’s code this idea.

First, we need to append the salt to the keys in the fact table. This is a surprisingly challenging task, or, better, it’s a decision point:

  • We can use a UDF: easy, but can be slow because Catalyst is not very happy with UDFs!
  • We can use the “rand” SQL operator
  • We can use the monotonically_increasing_id function

Just for fun, let’s go with the third option (it also appears to be a bit faster):
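A minimal sketch of the salting step on the fact table (N and the output column name salted_key are illustrative choices, not from the original snippet):

import org.apache.spark.sql.functions.{col, concat, lit, monotonically_increasing_id}

val N = 1000

// monotonically_increasing_id() % N spreads consecutive rows over N buckets,
// turning the single skewed key into N artificial sub-keys.
val saltedFact = fact_table.withColumn(
  "salted_key",
  concat(
    col("dimension_2_key").cast("string"),
    lit("_"),
    (monotonically_increasing_id() % N).cast("string")
  )
)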

Now we need to “explode” the dimension table with the new key. The fastest way that I have found to do so is to create a dummy dataset containing the numbers between 0 and N (in the example between 0 and 1000) and cross-join the dimension table with this “dummy” dataset:
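A sketch of the dimension “explosion”, assuming the dimension key column is called id:

// Dummy dataset with the numbers 0 until N, cross-joined with the small dimension:
// every dimension row is duplicated N times, once per possible salt value.
val salts = spark.range(0, N).toDF("salt")

val saltedDim = dimension_table2
  .crossJoin(salts)
  .withColumn(
    "salted_key",
    concat(col("id").cast("string"), lit("_"), col("salt").cast("string"))
  )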

Finally, we can join the tables using the salted key and see what happens!
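And a sketch of the final join, reusing saltedFact and saltedDim from the previous sketches:

import org.apache.spark.sql.functions.broadcast

// The exploded dimension is still tiny (52 * N rows), so broadcasting remains cheap.
val joined = saltedFact.join(
  broadcast(saltedDim),
  saltedFact("salted_key") === saltedDim("salted_key")
)
joined.count()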

Output: Elapsed time: 182.160146932s

Again, execution time is not really a good indicator to understand our improvement, so let’s look at the event timeline:

As you can see we greatly increased the parallelism.

In this case, a simple repartitioning plus broadcast worked better than crafting a new key. Note that this difference is not due to the join itself, but to the random number generation needed to salt the fact table.

  • Joins can be difficult to tune since performance is bound to both the code and the Spark configuration (number of executors, memory, etc.)
  • Some of the most common issues with joins are all-to-all communication between the nodes and data skewness
  • We can avoid all-to-all communication using broadcasting of small tables or of medium-sized tables if we have enough memory in the cluster
  • Broadcasting is not always beneficial to performance: we need to have an eye for the Spark config
  • Broadcasting can make the code unstable if broadcast tables grow through time
  • Skewness leads to an uneven workload on the cluster, resulting in a very small subset of tasks taking much longer than the average
  • There are multiple ways to fight skewness, one is repartitioning.
  • We can create our own repartitioning key, e.g. using the key salting technique
Source: https://towardsdatascience.com/the-art-of-joining-in-spark-dcbd33d693c

class Dataset[T] extends Serializable

Instance Constructors

  1. new Dataset(sqlContext: SQLContext, logicalPlan: LogicalPlan, encoder: Encoder[T])
  2. new Dataset(sparkSession: SparkSession, logicalPlan: LogicalPlan, encoder: Encoder[T])

Value Members

  1. final def !=(arg0: Any): Boolean
  2. final def ##(): Int
  3. final def ==(arg0: Any): Boolean
  4. def agg(expr: Column, exprs: Column*): DataFrame
  5. def agg(exprs: Map[String, String]): DataFrame
  6. def agg(exprs: Map[String, String]): DataFrame
  7. def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame
  8. def alias(alias: Symbol): Dataset[T]
  9. def alias(alias: String): Dataset[T]
  10. def apply(colName: String): Column
  11. def as(alias: Symbol): Dataset[T]
  12. def as(alias: String): Dataset[T]
  13. def as[U](implicit arg0: Encoder[U]): Dataset[U]
  14. final def asInstanceOf[T0]: T0
  15. def cache(): Dataset.this.type
  16. def checkpoint(eager: Boolean): Dataset[T]
  17. def checkpoint(): Dataset[T]
  18. def clone(): AnyRef
  19. def coalesce(numPartitions: Int): Dataset[T]
  20. def col(colName: String): Column
  21. def colRegex(colName: String): Column
  22. def collect(): Array[T]
  23. def collectAsList(): List[T]
  24. def columns: Array[String]
  25. def count(): Long
  26. def createGlobalTempView(viewName: String): Unit
  27. def createOrReplaceGlobalTempView(viewName: String): Unit
  28. def createOrReplaceTempView(viewName: String): Unit
  29. def createTempView(viewName: String): Unit
  30. def crossJoin(right: Dataset[_]): DataFrame
  31. def cube(col1: String, cols: String*): RelationalGroupedDataset
  32. def cube(cols: Column*): RelationalGroupedDataset
  33. def describe(cols: String*): DataFrame
  34. def distinct(): Dataset[T]
  35. def drop(col: Column): DataFrame
  36. def drop(colNames: String*): DataFrame
  37. def drop(colName: String): DataFrame
  38. def dropDuplicates(col1: String, cols: String*): Dataset[T]
  39. def dropDuplicates(colNames: Array[String]): Dataset[T]
  40. def dropDuplicates(colNames: Seq[String]): Dataset[T]
  41. def dropDuplicates(): Dataset[T]
  42. def dtypes: Array[(String, String)]
  43. val encoder: Encoder[T]
  44. final def eq(arg0: AnyRef): Boolean
  45. def equals(arg0: Any): Boolean
  46. def except(other: Dataset[T]): Dataset[T]
  47. def exceptAll(other: Dataset[T]): Dataset[T]
  48. def explain(): Unit
  49. def explain(extended: Boolean): Unit
  50. def explain(mode: String): Unit
  51. def filter(func: FilterFunction[T]): Dataset[T]
  52. def filter(func: (T) ⇒ Boolean): Dataset[T]
  53. def filter(conditionExpr: String): Dataset[T]
  54. def filter(condition: Column): Dataset[T]
  55. def finalize(): Unit
  56. def first(): T
  57. def flatMap[U](f: FlatMapFunction[T, U], encoder: Encoder[U]): Dataset[U]
  58. def flatMap[U](func: (T) ⇒ TraversableOnce[U])(implicit arg0: Encoder[U]): Dataset[U]
  59. def foreach(func: ForeachFunction[T]): Unit
  60. def foreach(f: (T) ⇒ Unit): Unit
  61. def foreachPartition(func: ForeachPartitionFunction[T]): Unit
  62. def foreachPartition(f: (Iterator[T]) ⇒ Unit): Unit
  63. final def getClass(): Class[_]
  64. def groupBy(col1: String, cols: String*): RelationalGroupedDataset
  65. def groupBy(cols: Column*): RelationalGroupedDataset
  66. def groupByKey[K](func: MapFunction[T, K], encoder: Encoder[K]): KeyValueGroupedDataset[K, T]
  67. def groupByKey[K](func: (T) ⇒ K)(implicit arg0: Encoder[K]): KeyValueGroupedDataset[K, T]
  68. def hashCode(): Int
  69. def head(): T
  70. def head(n: Int): Array[T]
  71. def hint(name: String, parameters: Any*): Dataset[T]
  72. def inputFiles: Array[String]
  73. def intersect(other: Dataset[T]): Dataset[T]
  74. def intersectAll(other: Dataset[T]): Dataset[T]
  75. def isEmpty: Boolean
  76. final def isInstanceOf[T0]: Boolean
  77. def isLocal: Boolean
  78. def isStreaming: Boolean
  79. def javaRDD: JavaRDD[T]
  80. def join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame
  81. def join(right: Dataset[_], joinExprs: Column): DataFrame
  82. def join(right: Dataset[_], usingColumns: Seq[String], joinType: String): DataFrame
  83. def join(right: Dataset[_], usingColumns: Seq[String]): DataFrame
  84. def join(right: Dataset[_], usingColumn: String): DataFrame
  85. def join(right: Dataset[_]): DataFrame
  86. def joinWith[U](other: Dataset[U], condition: Column): Dataset[(T, U)]
  87. def joinWith[U](other: Dataset[U], condition: Column, joinType: String): Dataset[(T, U)]
  88. def limit(n: Int): Dataset[T]
  89. def localCheckpoint(eager: Boolean): Dataset[T]
  90. def localCheckpoint(): Dataset[T]
  91. def map[U](func: MapFunction[T, U], encoder: Encoder[U]): Dataset[U]
  92. def map[U](func: (T) ⇒ U)(implicit arg0: Encoder[U]): Dataset[U]
  93. def mapPartitions[U](f: MapPartitionsFunction[T, U], encoder: Encoder[U]): Dataset[U]
  94. def mapPartitions[U](func: (Iterator[T]) ⇒ Iterator[U])(implicit arg0: Encoder[U]): Dataset[U]
  95. def na: DataFrameNaFunctions
  96. final def ne(arg0: AnyRef): Boolean
  97. final def notify(): Unit
  98. final def notifyAll(): Unit
  99. def observe(name: String, expr: Column, exprs: Column*): Dataset[T]
  100. def orderBy(sortExprs: Column*): Dataset[T]
  101. def orderBy(sortCol: String, sortCols: String*): Dataset[T]
  102. def persist(newLevel: StorageLevel): Dataset.this.type
  103. def persist(): Dataset.this.type
  104. def printSchema(level: Int): Unit
  105. def printSchema(): Unit
  106. val queryExecution: QueryExecution
  107. def randomSplit(weights: Array[Double]): Array[Dataset[T]]
  108. def randomSplit(weights: Array[Double], seed: Long): Array[Dataset[T]]
  109. def randomSplitAsList(weights: Array[Double], seed: Long): List[Dataset[T]]
  110. lazy val rdd: RDD[T]
  111. def reduce(func: ReduceFunction[T]): T
  112. def reduce(func: (T, T) ⇒ T): T
  113. def repartition(partitionExprs: Column*): Dataset[T]
  114. def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T]
  115. def repartition(numPartitions: Int): Dataset[T]
  116. def repartitionByRange(partitionExprs: Column*): Dataset[T]
  117. def repartitionByRange(numPartitions: Int, partitionExprs: Column*)
Source: https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/Dataset.html

Chapter 4. Joins (SQL and Core)

Joining data is an important part of many of our pipelines, and both Spark Core and SQL support the same fundamental types of joins. While joins are very common and powerful, they warrant special performance consideration as they may require large network transfers or even create datasets beyond our capability to handle.1 In core Spark it can be more important to think about the ordering of operations, since the DAG optimizer, unlike the SQL optimizer, isn’t able to re-order or push down filters.

In this section we will go over the RDD type joins. Joins in general are expensive since they require that corresponding keys from each RDD are located at the same partition so that they can be combined locally. If the RDDs do not have known partitioners, they will need to be shuffled so that both RDDs share a partitioner, and data with the same keys lives in the same partitions, as shown in Figure 4-1. If they have the same partitioner, the data may be colocated, as in Figure 4-3, so as to avoid network transfer. Regardless of whether the partitioners are the same, if one (or both) of the RDDs have a known partitioner only a narrow dependency is created, as in Figure 4-2. As with most key/value operations, the cost of the join increases with the number of keys and the distance the records have to travel in order to get to their correct partition.

Join, full shuffle
Figure 4-1. Shuffle join
Join one partitioner known
Figure 4-2. Both known partitioner join
Colocated join
Figure 4-3. Colocated join
Tip

Two RDDs will be colocated if they have the same partitioner and were shuffled as part of the same action.

Tip

Core Spark joins are implemented using the cogroup function. We discuss cogroup in “Co-Grouping”.

Choosing a Join Type

The default join operation in Spark includes only values for keys present in both RDDs, and in the case of multiple values per key, provides all permutations of the key/value pair. The best scenario for a standard join is when both RDDs contain the same set of distinct keys. With duplicate keys, the size of the data may expand dramatically causing performance issues, and if one key is not present in both RDDs you will lose that row of data. Here are a few guidelines:

  • When both RDDs have duplicate keys, the join can cause the size of the data to expand dramatically. It may be better to perform a or operation to reduce the key space or to use to handle duplicate keys instead of producing the full cross product. By using smart partitioning during the combine step, it is possible to prevent a second shuffle in the join (we will discuss this in detail later).

  • If keys are not present in both RDDs you risk losing your data unexpectedly. It can be safer to use an outer join, so that you are guaranteed to keep all the data in either the left or the right RDD, then filter the data after the join.

  • If one RDD has some easy-to-define subset of the keys that is not present in the other, you may be better off filtering or reducing before the join to avoid a big shuffle of data that you will ultimately throw away anyway.

Tip

Join is one of the most expensive operations you will commonly use in Spark, so it is worth doing what you can to shrink your data before performing a join.

For example, suppose you have one RDD with some data in the form (panda id, score) and another RDD with (panda id, address), and you want to send each panda some mail with her best score. You could join the RDDs on panda id and then compute the best score for each panda, as shown in Example 4-1.

Example 4-1. Basic RDD join
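The book's original listing is not reproduced here; a minimal sketch of the same idea, assuming both RDDs are keyed by a numeric panda id:

import org.apache.spark.rdd.RDD

// Join first, then keep only the best score (and its address) per panda.
def bestScoreWithAddress(scores: RDD[(Long, Double)],
                         addresses: RDD[(Long, String)]): RDD[(Long, (Double, String))] = {
  scores.join(addresses)                                    // (id, (score, address))
        .reduceByKey((a, b) => if (a._1 >= b._1) a else b)  // keep the max score per id
}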

However, this is probably not as fast as first reducing the score data, so that the first dataset contains only one row for each panda with her best score, and then joining that data with the address data (as shown in Example 4-2).

Example 4-2. Pre-filter before join
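A sketch of the faster variant, with the same assumed types as above:

import org.apache.spark.rdd.RDD

// Reduce the scores to a single best score per panda *before* the join,
// so the shuffle moves at most one score per key.
def bestScoreWithAddressFast(scores: RDD[(Long, Double)],
                             addresses: RDD[(Long, String)]): RDD[(Long, (Double, String))] = {
  val bestScores = scores.reduceByKey((a, b) => math.max(a, b))
  bestScores.join(addresses)
}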

If each Panda had 1,000 different scores then the size of the shuffle we did in the first approach was 1,000 times the size of the shuffle we did with this approach!

If we wanted to, we could also perform a left outer join to keep all keys for processing, even those missing in the right RDD, by using leftOuterJoin in place of join, as in Example 4-3. Spark also has rightOuterJoin and fullOuterJoin, depending on which records we wish to keep. Any missing values are None and present values are wrapped in Some.

Example 4-3. Basic RDD left outer join
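A sketch of the left outer variant, with the same assumed types:

import org.apache.spark.rdd.RDD

// Keep every score even when no address exists; missing addresses become None.
def scoresWithOptionalAddress(scores: RDD[(Long, Double)],
                              addresses: RDD[(Long, String)]): RDD[(Long, (Double, Option[String]))] = {
  scores.leftOuterJoin(addresses)
}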

Choosing an Execution Plan

In order to join data, Spark needs the data that is to be joined (i.e., the data based on each key) to live on the same partition. The default implementation of a join in Spark is a shuffled hash join. The shuffled hash join ensures that data on each partition will contain the same keys by partitioning the second dataset with the same default partitioner as the first, so that the keys with the same hash value from both datasets are in the same partition. While this approach always works, it can be more expensive than necessary because it requires a shuffle. The shuffle can be avoided if:

  1. Both RDDs have a known partitioner.

  2. One of the datasets is small enough to fit in memory, in which case we can do a broadcast hash join (we will explain what this is later).

Note that if the RDDs are colocated the network transfer can be avoided, along with the shuffle.

Speeding up joins by assigning a known partitioner

If you have to do an operation before the join that requires a shuffle, such as aggregateByKey or reduceByKey, you can prevent the shuffle by adding a hash partitioner with the same number of partitions as an explicit argument to the first operation before the join. You could make the example in the previous section even faster by using the partitioner for the address data as an argument for the reduceByKey step, as in Example 4-4 and Figure 4-4.

Example 4-4. Known partitioner join
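A sketch of the idea (the choice of partitioner is an assumption; the point is that the reduceByKey and the join share it):

import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD

// Pass an explicit partitioner to reduceByKey so the aggregated scores are
// already partitioned the way the subsequent join needs them.
def bestScoreWithAddressKnownPartitioner(scores: RDD[(Long, Double)],
                                         addresses: RDD[(Long, String)]): RDD[(Long, (Double, String))] = {
  val partitioner = addresses.partitioner
    .getOrElse(new HashPartitioner(addresses.partitions.length))
  val bestScores = scores.reduceByKey(partitioner, (a, b) => math.max(a, b))
  bestScores.join(addresses, partitioner)
}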
Tip

If the RDDs sharing the same partitioner are materialized by the same action, they will end up being co-located (which can even reduce network traffic).

Tip

(Almost) always persist after repartitioning.

Join both partitioners known
Figure 4-4. Both known partitioner join

Speeding up joins using a broadcast hash join

A broadcast hash join pushes one of the RDDs (the smaller one) to each of the worker nodes. Then it does a map-side combine with each partition of the larger RDD. If one of your RDDs can fit in memory or can be made to fit in memory it is always beneficial to do a broadcast hash join, since it doesn’t require a shuffle. Sometimes (but not always) Spark SQL will be smart enough to configure the broadcast join itself; in Spark SQL this is controlled with settings such as spark.sql.autoBroadcastJoinThreshold. This is illustrated in Figure 4-5.

Broadcast Hash Join
Figure 4-5. Broadcast hash join

Spark Core does not have an implementation of the broadcast hash join. Instead, we can manually implement a version of the broadcast hash join by collecting the smaller RDD to the driver as a map, then broadcasting the result, and using mapPartitions to combine the elements.

Example 4-5 is a general function that could be used to join a larger and smaller RDD. Its behavior mirrors the default “join” operation in Spark. We exclude elements whose keys do not appear in both RDDs.

Example 4-5. Manual broadcast hash join
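A sketch of a manual broadcast hash join (not the book's exact listing); like the default join, it drops keys that are missing from the smaller RDD:

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

def manualBroadcastHashJoin[K: ClassTag, V1: ClassTag, V2: ClassTag](
    bigRDD: RDD[(K, V1)],
    smallRDD: RDD[(K, V2)]): RDD[(K, (V1, V2))] = {
  // Collect the small side as a map on the driver and broadcast it once per executor.
  val smallMap = bigRDD.sparkContext.broadcast(smallRDD.collectAsMap())
  // Map-side "join": look every key of the big RDD up in the broadcast map.
  bigRDD.mapPartitions { iter =>
    iter.flatMap { case (k, v1) =>
      smallMap.value.get(k).map(v2 => (k, (v1, v2)))
    }
  }
}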

Partial manual broadcast hash join

Sometimes not all of our smaller RDD will fit into memory, but some keys are so overrepresented in the large dataset that you want to broadcast just the most common keys. This is especially useful if one key is so large that it can’t fit on a single partition. In this case you can use 2 on the large RDD to get an approximate idea of which keys would most benefit from a broadcast. You then filter the smaller RDD for only these keys, collecting the result locally in a HashMap. Using you can broadcast the HashMap so that each worker only has one copy and manually perform the join against the HashMap. Using the same HashMap you can then filter your large RDD down to not include the large number of duplicate keys and perform your standard join, unioning it with the result of your manual join. This approach is quite convoluted but may allow you to handle highly skewed data you couldn’t otherwise process.

Spark SQL supports the same basic join types as core Spark, but the optimizer is able to do more of the heavy lifting for you—although you also give up some of your control. For example, Spark SQL can sometimes push down or reorder operations to make your joins more efficient. On the other hand, you don’t control the partitioner for DataFrames or Datasets, so you can’t manually avoid shuffles as you did with core Spark joins.

DataFrame Joins

Joining data between DataFrames is one of the most common multi-DataFrame transformations. The standard SQL join types are all supported and can be specified as the joinType argument when performing a join. As with joins between RDDs, joining with nonunique keys will result in the cross product (so if the left table has R1 and R2 with key1 and the right table has R3 and R5 with key1 you will get (R1, R3), (R1, R5), (R2, R3), (R2, R5)) in the output. While we explore Spark SQL joins we will use two example tables of pandas, Tables 4-1 and 4-2.

Warning

While self joins are supported, you must alias the fields you are interested in to different names beforehand, so they can be accessed.

Table 4-1. Pandas and their sizes (left table)

Name     Size
Happy    1.0
Sad      0.9
Happy    1.5
Coffee   3.0

Table 4-2. Pandas and their zip codes (right table)

Name     Zip
Happy    94110
Happy    94103
Coffee   10504
Tea      07012

Spark’s supported join types are “inner,” “left_outer” (aliased as “outer”), “left_anti,” “right_outer,” “full_outer,” and “left_semi.”3 With the exception of “left_semi” these join types all join the two tables, but they behave differently when handling rows that do not have keys in both tables.

The “inner” join is both the default and likely what you think of when you think of joining tables. It requires that the key be present in both tables, or the result is dropped as shown in Example 4-6 and Table 4-3.

Example 4-6. Simple inner join
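A sketch of what such an inner join looks like in the DataFrame API, using toy data matching Tables 4-1 and 4-2 (assuming a SparkSession named spark):

import spark.implicits._

val pandaInfo = Seq(("Happy", 1.0), ("Sad", 0.9), ("Happy", 1.5), ("Coffee", 3.0)).toDF("name", "size")
val pandaZip  = Seq(("Happy", "94110"), ("Happy", "94103"), ("Coffee", "10504"), ("Tea", "07012")).toDF("name", "zip")

// Inner join on the name column; rows without a match in both tables are dropped.
pandaInfo.join(pandaZip, pandaInfo("name") === pandaZip("name"), "inner").show()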
Table 4-3. Inner join result

Name     Size    Name     Zip
Coffee   3.0     Coffee   10504
Happy    1.5     Happy    94110
Happy    1.5     Happy    94103
Happy    1.0     Happy    94110
Happy    1.0     Happy    94103

Left outer joins will produce a table with all of the keys from the left table, and any rows without matching keys in the right table will have null values in the fields that would be populated by the right table. Right outer joins are the same, but with the requirements reversed. A sample left outer join is in Example 4-7, and the result is shown in Table 4-4.

Example 4-7. Left outer join
Table 4-4. Left outer join result

Name     Size    Name     Zip
Sad      0.9     null     null
Coffee   3.0     Coffee   10504
Happy    1.0     Happy    94110
Happy    1.0     Happy    94103
Happy    1.5     Happy    94110
Happy    1.5     Happy    94103

A sample right outer join is in Example 4-8, and the result is shown in Table 4-5.

Example 4-8. Right outer join
Table 4-5. Right outer join result

Name     Size    Name     Zip
Coffee   3.0     Coffee   10504
Happy    1.0     Happy    94110
Happy    1.0     Happy    94103
Happy    1.5     Happy    94110
Happy    1.5     Happy    94103
null     null    Tea      07012

To keep all records from both tables you can use the full outer join, which results in Table 4-6.

Table 4-6. Full outer join result

Name     Size    Name     Zip
Sad      0.9     null     null
Coffee   3.0     Coffee   10504
Happy    1.0     Happy    94110
Happy    1.0     Happy    94103
Happy    1.5     Happy    94110
Happy    1.5     Happy    94103
null     null    Tea      07012

Left semi joins (as in Example 4-9 and Table 4-7) and left anti joins (as in Table 4-8) are the only kinds of joins that only have values from the left table. A left semi join is the same as filtering the left table for only rows with keys present in the right table. The left anti join also only returns data from the left table, but instead only returns records that are not present in the right table.

Example 4-9. Left semi join
Table 4-7. Left semi join result

Name     Size
Coffee   3.0
Happy    1.0
Happy    1.5

Table 4-8. Left anti join result

Name     Size
Sad      0.9

Self joins

Self joins are supported on DataFrames, but we end up with duplicated column names. So that you can access the results, you need to alias the DataFrames to different names—otherwise you will be unable to select the columns due to name collision (see Example 4-10). Once you’ve aliased each DataFrame, in the result you can access the individual columns for each DataFrame through its alias.

Example 4-10. Self join
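A sketch of a DataFrame self join with aliasing, reusing the pandaInfo DataFrame sketched earlier:

import org.apache.spark.sql.functions.col

// Alias both sides so the duplicated columns can be told apart and selected.
val left  = pandaInfo.as("a")
val right = pandaInfo.as("b")

left.join(right, col("a.name") === col("b.name"))
    .select(col("a.name"), col("a.size").as("size_a"), col("b.size").as("size_b"))
    .show()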

Broadcast hash joins

In Spark SQL you can see the type of join being performed by calling explain. As with core Spark, if one of the tables is much smaller than the other you may want a broadcast hash join. You can hint to Spark SQL that a given DataFrame should be broadcast for the join by calling broadcast on the DataFrame before joining it (e.g., df1.join(broadcast(df2), ...)). Spark also automatically uses the spark.sql.autoBroadcastJoinThreshold setting to determine if a table should be broadcast.

Dataset Joins

Joining Datasets is done with joinWith, and this behaves similarly to a regular relational join, except the result is a tuple of the different record types as shown in Example 4-11. This is somewhat more awkward to work with after the join, but it also makes self joins, as shown in Example 4-12, much easier, as you don’t need to alias the columns first.

Example 4-11. Joining two Datasets
Example 4-12. Self join a Dataset
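A sketch of joinWith on typed Datasets covering both examples (the case classes and the SparkSession setup here are illustrative, not the book's):

import org.apache.spark.sql.{Dataset, SparkSession}

case class Panda(name: String, size: Double)
case class PandaPlace(name: String, zip: String)

val spark = SparkSession.builder().appName("joinWith-sketch").master("local[*]").getOrCreate()
import spark.implicits._

val pandas: Dataset[Panda] =
  Seq(Panda("Happy", 1.0), Panda("Sad", 0.9), Panda("Coffee", 3.0)).toDS()
val places: Dataset[PandaPlace] =
  Seq(PandaPlace("Happy", "94110"), PandaPlace("Coffee", "10504")).toDS()

// joinWith keeps each side as a whole typed record: the result is a Dataset of tuples.
val joined: Dataset[(Panda, PandaPlace)] =
  pandas.joinWith(places, pandas("name") === places("name"), "inner")
joined.show(false)

// A self join works the same way and needs no aliasing:
// pandas.joinWith(pandas, <condition>) yields a Dataset[(Panda, Panda)].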
Note

Using a self join and a condition that is always true (e.g., lit(true)), you can produce the cartesian product of your Dataset, which can be useful but also illustrates how joins (especially self joins) can easily result in unworkable data sizes.

As with DataFrames, you can specify the type of join desired (e.g., inner, left_outer, right_outer, left_semi), changing how records present only in one Dataset are handled. Missing records are represented by null values, so be careful.

Now that you have explored joins, it’s time to focus on transformations and the performance considerations associated with them.

1 As the saying goes, the cross product of big data and big data is an out-of-memory exception.

2 If the number of distinct keys is too high, you can also use , sort on the value, and take the top k.

3 The quotes are optional and can be left out. We use them in our examples because we think it is easier to read with the quotes present.

Source: https://www.oreilly.com/library/view/high-performance-spark/9781491943199/ch04.html
Modifier and Type / Method and Description

Aggregates on the entire Dataset without groups.

Aggregates on the entire Dataset without groups.

(Scala-specific) Aggregates on the entire Dataset without groups.

(Java-specific) Aggregates on the entire Dataset without groups.

(Scala-specific) Aggregates on the entire Dataset without groups.

Returns a new Dataset with an alias set.

(Scala-specific) Returns a new Dataset with an alias set.

Selects column based on the column name and return it as a .

:: Experimental :: Returns a new Dataset where each record has been mapped on to the specified type.

Returns a new Dataset with an alias set.

(Scala-specific) Returns a new Dataset with an alias set.

Persist this Dataset with the default storage level ().

Eagerly checkpoint a Dataset and return the new Dataset.

Returns a checkpointed version of this Dataset.

 

Returns a new Dataset that has exactly partitions.

Selects column based on the column name and return it as a .

Returns an array that contains all of s in this Dataset.

Returns a Java list that contains all of s in this Dataset.

Returns all column names as an array.

Returns the number of rows in the Dataset.

Creates a global temporary view using the given name.

Creates a local temporary view using the given name.

Creates a local temporary view using the given name.

Explicit cartesian join with another .

Create a multi-dimensional cube for the current Dataset using the specified columns, so we can run aggregation on them.

Create a multi-dimensional cube for the current Dataset using the specified columns, so we can run aggregation on them.

Create a multi-dimensional cube for the current Dataset using the specified columns, so we can run aggregation on them.

Create a multi-dimensional cube for the current Dataset using the specified columns, so we can run aggregation on them.

Computes statistics for numeric and string columns, including count, mean, stddev, min, and max.

Computes statistics for numeric and string columns, including count, mean, stddev, min, and max.

Returns a new Dataset that contains only the unique rows from this Dataset.

Returns a new Dataset with a column dropped.

Returns a new Dataset with columns dropped.

Returns a new Dataset with columns dropped.

Returns a new Dataset with a column dropped.

Returns a new Dataset that contains only the unique rows from this Dataset.

(Scala-specific) Returns a new Dataset with duplicate rows removed, considering only the subset of columns.

Returns a new Dataset with duplicate rows removed, considering only the subset of columns.

Returns a new with duplicate rows removed, considering only the subset of columns.

Returns a new with duplicate rows removed, considering only the subset of columns.

Returns all column names and their data types as an array.

Returns a new Dataset containing rows in this Dataset but not in another Dataset.

Prints the physical plan to the console for debugging purposes.

Prints the plans (logical and physical) to the console for debugging purposes.

Deprecated. 

use flatMap() or select() with functions.explode() instead. Since 2.0.0.

Deprecated. 

use flatMap() or select() with functions.explode() instead. Since 2.0.0.

Filters rows using the given condition.

:: Experimental :: (Java-specific) Returns a new Dataset that only contains elements where returns .

:: Experimental :: (Scala-specific) Returns a new Dataset that only contains elements where returns .

Filters rows using the given SQL expression.

Returns the first row.

:: Experimental :: (Java-specific) Returns a new Dataset by first applying a function to all elements of this Dataset, and then flattening the results.

:: Experimental :: (Scala-specific) Returns a new Dataset by first applying a function to all elements of this Dataset, and then flattening the results.

(Java-specific) Runs on each element of this Dataset.

Applies a function to all rows.

(Java-specific) Runs on each partition of this Dataset.

Applies a function to each partition of this Dataset.

Groups the Dataset using the specified columns, so we can run aggregation on them.

Groups the Dataset using the specified columns, so we can run aggregation on them.

Groups the Dataset using the specified columns, so that we can run aggregation on them.

Groups the Dataset using the specified columns, so that we can run aggregation on them.

:: Experimental :: (Scala-specific) Returns a where the data is grouped by the given key .

:: Experimental :: (Java-specific) Returns a where the data is grouped by the given key .

Returns the first row.

Returns the first rows.

Returns a best-effort snapshot of the files that compose this Dataset.

Returns a new Dataset containing rows only in both this Dataset and another Dataset.

Returns true if the and methods can be run locally (without any Spark executors).

Returns true if this Dataset contains one or more sources that continuously return data as it arrives.

Returns the content of the Dataset as a of s.

Join with another .

Inner join with another , using the given join expression.

Join with another , using the given join expression.

Inner equi-join with another using the given columns.

Equi-join with another using the given columns.

Inner equi-join with another using the given column.

:: Experimental :: Using inner equi-join to join this Dataset returning a for each pair where evaluates to true.

:: Experimental :: Joins this Dataset returning a for each pair where evaluates to true.

Returns a new Dataset by taking the first rows.

:: Experimental :: (Scala-specific) Returns a new Dataset that contains the result of applying to each element.

:: Experimental :: (Java-specific) Returns a new Dataset that contains the result of applying to each element.

:: Experimental :: (Scala-specific) Returns a new Dataset that contains the result of applying to each partition.

:: Experimental :: (Java-specific) Returns a new Dataset that contains the result of applying to each partition.

Returns a for working with missing data.

 

Returns a new Dataset sorted by the given expressions.

Returns a new Dataset sorted by the given expressions.

Returns a new Dataset sorted by the given expressions.

Returns a new Dataset sorted by the given expressions.

Persist this Dataset with the default storage level ().

Persist this Dataset with the given storage level.

Prints the schema to the console in a nice tree format.

 

Randomly splits this Dataset with the provided weights.

Randomly splits this Dataset with the provided weights.

Returns a Java list that contains randomly split Dataset with the provided weights.

Represents the content of the Dataset as an of .

:: Experimental :: (Scala-specific) Reduces the elements of this Dataset using the specified binary function.

:: Experimental :: (Java-specific) Reduces the elements of this Dataset using the specified binary function.

Deprecated. 

Use createOrReplaceTempView(viewName) instead. Since 2.0.0.

Returns a new Dataset partitioned by the given partitioning expressions, using as number of partitions.

Returns a new Dataset that has exactly partitions.

Returns a new Dataset partitioned by the given partitioning expressions into .

Returns a new Dataset partitioned by the given partitioning expressions into .

Returns a new Dataset partitioned by the given partitioning expressions, using as number of partitions.

Create a multi-dimensional rollup for the current Dataset using the specified columns, so we can run aggregation on them.

Create a multi-dimensional rollup for the current Dataset using the specified columns, so we can run aggregation on them.

Create a multi-dimensional rollup for the current Dataset using the specified columns, so we can run aggregation on them.

Create a multi-dimensional rollup for the current Dataset using the specified columns, so we can run aggregation on them.

Returns a new by sampling a fraction of rows, using a random seed.

Returns a new by sampling a fraction of rows, using a user-supplied seed.

Returns the schema of this Dataset.

Selects a set of column based expressions.

Selects a set of column based expressions.

Selects a set of columns.

Selects a set of columns.

:: Experimental :: Returns a new Dataset by computing the given expression for each element.

:: Experimental :: Returns a new Dataset by computing the given expressions for each element.

:: Experimental :: Returns a new Dataset by computing the given expressions for each element.

:: Experimental :: Returns a new Dataset by computing the given expressions for each element.

:: Experimental :: Returns a new Dataset by computing the given expressions for each element.

Selects a set of SQL expressions.

Selects a set of SQL expressions.

Displays the top 20 rows of Dataset in a tabular form.

Displays the top 20 rows of Dataset in a tabular form.

Displays the Dataset in a tabular form.

Displays the Dataset in a tabular form.

Displays the Dataset in a tabular form.

Returns a new Dataset sorted by the given expressions.

Returns a new Dataset sorted by the given expressions.

Returns a new Dataset sorted by the specified column, all in ascending order.

Returns a new Dataset sorted by the specified column, all in ascending order.

Returns a new Dataset with each partition sorted by the given expressions.

Returns a new Dataset with each partition sorted by the given expressions.

Returns a new Dataset with each partition sorted by the given expressions.

Returns a new Dataset with each partition sorted by the given expressions.

  

Returns a for working statistic functions support.

Get the Dataset's current storage level, or StorageLevel.NONE if not persisted.

Returns the first rows in the Dataset.

Returns the first rows in the Dataset as a list.

Converts this strongly typed collection of data to generic Dataframe.

Converts this strongly typed collection of data to generic with columns renamed.

Converts this strongly typed collection of data to generic with columns renamed.

Returns the content of the Dataset as a of s.

Returns the content of the Dataset as a Dataset of JSON strings.

Return an iterator that contains all of s in this Dataset.

 

Concise syntax for chaining custom transformations.

Returns a new Dataset containing union of rows in this Dataset and another Dataset.

Deprecated. 

use union(). Since 2.0.0.

Mark the Dataset as non-persistent, and remove all blocks for it from memory and disk.

Mark the Dataset as non-persistent, and remove all blocks for it from memory and disk.

Filters rows using the given condition.

Filters rows using the given SQL expression.

Returns a new Dataset by adding a column or replacing the existing column that has the same name.

Returns a new Dataset with a column renamed.

:: Experimental :: Defines an event time watermark for this .

Interface for saving the content of the non-streaming Dataset out into external storage.

:: Experimental :: Interface for saving the content of the streaming Dataset out into external storage.

Source: https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/sql/Dataset.html

Spark SQL Join Types with examples

Spark DataFrame supports all basic SQL join types like INNER, LEFT OUTER, RIGHT OUTER, FULL OUTER, CROSS, LEFT SEMI, and LEFT ANTI JOIN. Spark SQL joins are wide transformations that result in data shuffling over the network, hence they can have huge performance issues when not designed with care.

On the other hand, Spark SQL joins come with more optimization by default (thanks to DataFrames & Datasets); however, there are still some performance issues to consider while using them.

In this tutorial, you will learn the different join syntaxes and how to use different join types on two DataFrames and Datasets, using Scala examples. Please see Join on Multiple DataFrames in case you want to join more than two DataFrames.

1. SQL Join Types & Syntax

Below is the list of all Spark SQL join types and syntaxes.

The rest of the tutorial explains join types using syntax 6, which takes as arguments the right join DataFrame, the join expression, and the type of join as a String.

For syntaxes 4 & 5 you can use either the “JoinType” or the “Join String” defined in the table below for the “joinType” string argument. When you use “JoinType”, you should import the package that defines the JoinType objects.

JoinType         Join String                           Equivalent SQL Join
Inner.sql        inner                                 INNER JOIN
FullOuter.sql    outer, full, fullouter, full_outer    FULL OUTER JOIN
LeftOuter.sql    left, leftouter, left_outer           LEFT JOIN
RightOuter.sql   right, rightouter, right_outer        RIGHT JOIN
Cross.sql        cross
LeftAnti.sql     anti, leftanti, left_anti
LeftSemi.sql     semi, leftsemi, left_semi

All join objects are defined in the joinTypes class; in order to use these you need to import the corresponding package.

Before we jump into the Spark SQL join examples, first let’s create “emp” and “dept” DataFrames. Here, the emp_id column is unique on the emp dataset, dept_id is unique on the dept dataset, and emp_dept_id from emp has a reference to dept_id on the dept dataset.
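A minimal sketch of such DataFrames (the site's full example data is richer; these columns are just enough for the joins below, assuming a SparkSession named spark):

import spark.implicits._

val emp = Seq(
  (1, "Smith", -1, 10),   // emp_id, name, superior_emp_id, emp_dept_id
  (2, "Rose",   1, 20),
  (3, "Jones",  2, 10),
  (4, "Brown",  2, 50)    // emp_dept_id 50 has no matching dept_id
).toDF("emp_id", "name", "superior_emp_id", "emp_dept_id")

val dept = Seq(
  ("Finance",   10),
  ("Marketing", 20),
  ("IT",        30)       // dept_id 30 has no matching emp_dept_id
).toDF("dept_name", "dept_id")

emp.show(false)
dept.show(false)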

This prints the “emp” and “dept” DataFrames to the console.

2. Inner Join

The Spark inner join is the default join and the one most used. It is used to join two DataFrames/Datasets on key columns; rows whose keys don’t match are dropped from both datasets (emp & dept).
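A sketch of the inner join on the DataFrames sketched above:

// Inner join on the emp_dept_id -> dept_id relationship.
emp.join(dept, emp("emp_dept_id") === dept("dept_id"), "inner").show(false)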

When we apply an inner join on our datasets, it drops emp_dept_id 50 from the emp dataset and dept_id 30 from the dept dataset. Below is the result of the above join expression.

3. Full Outer Join

Also known as an outer or full join, the full outer join returns all rows from both Spark DataFrames/Datasets; where the join expression doesn’t match, it returns null on the respective record’s columns.

From our datasets, emp_dept_id with value 50 on emp doesn’t have a record on dept, hence the dept columns are null, and dept_id 30 doesn’t have a record in emp, hence you see nulls on the emp columns. Below is the result of the above join expression.

4. Left Outer Join

The Spark left outer join (a.k.a. left join) returns all rows from the left DataFrame/Dataset regardless of whether a match is found on the right dataset; when the join expression doesn’t match, it assigns null for that record and drops records from the right where no match is found.

From our dataset, emp_dept_id 50 doesn’t have a record on the dept dataset, hence this record contains null on the dept columns (dept_name & dept_id), and dept_id 30 from the dept dataset is dropped from the results. Below is the result of the above join expression.

5. Right Outer Join

The Spark right outer join (a.k.a. right join) is the opposite of the left join: it returns all rows from the right DataFrame/Dataset regardless of whether a match is found on the left dataset; when the join expression doesn’t match, it assigns null for that record and drops records from the left where no match is found.

From our example, dept_id 30 on the right dataset doesn’t have a match on the left dataset emp, hence this record contains null on the emp columns, and emp_dept_id 50 is dropped as no match is found on the left. Below is the result of the above join expression.

6. Left Semi Join

The Spark left semi join is similar to an inner join, the difference being that a left semi join returns all columns from the left DataFrame/Dataset and ignores all columns from the right dataset. In other words, this join returns columns only from the left dataset for the records that match in the right dataset on the join expression; records not matched on the join expression are ignored from both left and right datasets.

The same result can be achieved by using a select on the result of the inner join; however, using this join is more efficient.

Below is the result of the above join expression.

7. Left Anti Join

The left anti join does the exact opposite of the Spark left semi join: it returns only columns from the left DataFrame/Dataset for non-matched records.

Yields below output

8. Self Join

Spark joins are not complete without a self join. Though there is no self-join type available, we can use any of the above-explained join types to join a DataFrame to itself. The example below uses a self join.

Here, we are joining the emp dataset with itself to find out the superior emp_id and name for all employees.
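A sketch of such a self join (column names follow the emp DataFrame sketched above):

import org.apache.spark.sql.functions.col

// Alias the two sides, then match each employee with their superior.
emp.as("e")
   .join(emp.as("s"), col("e.superior_emp_id") === col("s.emp_id"), "inner")
   .select(col("e.emp_id"), col("e.name"), col("s.name").as("superior_name"))
   .show(false)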

9. Using SQL Expression

Since Spark SQL supports native SQL syntax, we can also write join operations after creating temporary views on the DataFrames and using spark.sql().
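A sketch of the SQL-expression approach (the view names are illustrative):

// Register temporary views, then express the same inner join in SQL.
emp.createOrReplaceTempView("EMP")
dept.createOrReplaceTempView("DEPT")

spark.sql(
  "SELECT e.*, d.dept_name FROM EMP e INNER JOIN DEPT d ON e.emp_dept_id = d.dept_id"
).show(false)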

10. Source Code | Scala Example

Examples explained here are available at the GitHub project for reference.

Conclusion

In this tutorial, you have learned the Spark SQL join types INNER, LEFT OUTER, RIGHT OUTER, FULL OUTER, LEFT SEMI, LEFT ANTI, and CROSS, their usage, and examples with Scala.

References:

Happy Learning !!

Source: https://sparkbyexamples.com/spark/spark-sql-dataframe-join/

JOIN

Description

A SQL join is used to combine rows from two relations based on join criteria. The following section describes the overall join syntax and the sub-sections cover different types of joins along with examples.

Syntax

Parameters

  • relation

    Specifies the relation to be joined.

  • join_type

    Specifies the join type.

    Syntax:

  • join_criteria

    Specifies how the rows from one relation will be combined with the rows of another relation.

    Syntax:

    Specifies an expression with a return type of boolean.

Join Types

Inner Join

The inner join is the default join in Spark SQL. It selects rows that have matching values in both relations.

Syntax:

Left Join

A left join returns all values from the left relation and the matched values from the right relation, or appends NULL if there is no match. It is also referred to as a left outer join.

Syntax:

Right Join

A right join returns all values from the right relation and the matched values from the left relation, or appends NULL if there is no match. It is also referred to as a right outer join.

Syntax:

Full Join

A full join returns all values from both relations, appending NULL values on the side that does not have a match. It is also referred to as a full outer join.

Syntax:

Cross Join

A cross join returns the Cartesian product of two relations.

Syntax:

Semi Join

A semi join returns values from the left side of the relation that has a match with the right. It is also referred to as a left semi join.

Syntax:

Anti Join

An anti join returns values from the left relation that has no match with the right. It is also referred to as a left anti join.

Syntax:

Examples
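A few hedged illustrations of the syntax described above, assuming hypothetical employee and department tables related by dept_id = id, and a SparkSession named spark:

// Hypothetical tables; only the join syntax matters here.
spark.sql("SELECT * FROM employee e INNER JOIN department d ON e.dept_id = d.id").show()
spark.sql("SELECT * FROM employee e LEFT OUTER JOIN department d ON e.dept_id = d.id").show()
spark.sql("SELECT * FROM employee e FULL OUTER JOIN department d ON e.dept_id = d.id").show()
spark.sql("SELECT * FROM employee e LEFT SEMI JOIN department d ON e.dept_id = d.id").show()
spark.sql("SELECT * FROM employee e LEFT ANTI JOIN department d ON e.dept_id = d.id").show()
spark.sql("SELECT * FROM employee e CROSS JOIN department d").show()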

Related Statements

Source: https://spark.apache.org/docs/latest/sql-ref-syntax-qry-select-join.html


Best Java code snippets using org.apache.spark.sql.Dataset.joinWith (showing top 3 results out of 315)


@Test
public void testJoin() {
  List<Integer> data = Arrays.asList(1, 2, 3);
  Dataset<Integer> ds = spark.createDataset(data, Encoders.INT()).as("a");
  List<Integer> data2 = Arrays.asList(2, 3, 4);
  Dataset<Integer> ds2 = spark.createDataset(data2, Encoders.INT()).as("b");
  Dataset<Tuple2<Integer, Integer>> joined =
      ds.joinWith(ds2, col("a.value").equalTo(col("b.value")));
  Assert.assertEquals(
      Arrays.asList(tuple2(2, 2), tuple2(3, 3)),
      joined.collectAsList());
}
Source: https://www.tabnine.com/code/java/methods/org.apache.spark.sql.Dataset/joinWith

