# High Performance Spark

## Metadata
- Author: [[Holden Karau, Rachel Warren]]
- Full Title: High Performance Spark
- Category: #apache-spark #python #scala
## Highlights
- Apache Spark is a high-performance, general-purpose distributed computing system that has become the most active Apache open source project, with more than 1,000 active contributors.1 Spark enables us to process large quantities of data, beyond what can fit on a single machine, with a high-level, relatively easy-to-use API. Spark’s design and interface are unique, and it is one of the fastest systems of its kind. Uniquely, Spark allows us to write the logic of data transformations and machine learning algorithms in a way that is parallelizable, but relatively system agnostic. So it is often possible to write computations that are fast for distributed storage systems of varying kind and size. ([Location 124](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=124))
- Note: A Quick Overview of Spark
- Because Spark is written in Scala, it will be difficult to interact with the Spark source code without the ability, at least, to read Scala code. Furthermore, the methods in the Resilient Distributed Datasets (RDD) class closely mimic those in the Scala collections API. ([Location 176](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=176))
- Fundamentally Spark is a functional framework, relying heavily on concepts like immutability and lambda definition, so using the Spark API may be more intuitive with some knowledge of functional programming. ([Location 183](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=183))
- It can be attractive to write Spark in Python, since it is easy to learn, quick to write, interpreted, and includes a very rich set of data science toolkits. However, Spark code written in Python is often slower than equivalent code written in the JVM, since Scala is statically typed, and the cost of JVM communication (from Python to Scala) can be very high. ([Location 191](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=191))
- However, Spark’s internals, especially how it handles failures, differ from many traditional systems. Spark’s ability to leverage lazy evaluation within memory computations makes it particularly unique. Spark’s creators believe it to be the first high-level programming language for fast, distributed data processing. ([Location 245](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=245))
- Apache Spark is an open source framework that provides methods to process data in parallel that are generalizable; the same high-level Spark functions can be used to perform disparate data processing tasks on data of different sizes and structures. ([Location 253](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=253))
- Spark is built around a data abstraction called Resilient Distributed Datasets (RDDs). RDDs are a representation of lazily evaluated, statically typed, distributed collections. ([Location 269](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=269))
- Spark SQL is a very important component for Spark performance, and much of what can be accomplished with Spark Core can be done by leveraging Spark SQL. ([Location 283](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=283))
- Spark MLlib is primarily built on top of RDDs and uses functions from Spark Core, while ML is built on top of Spark SQL DataFrames.6 Eventually the Spark community plans to move over to ML and deprecate MLlib. ([Location 290](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=290))
- Spark Streaming uses the scheduling of the Spark Core for streaming analytics on minibatches of data. ([Location 293](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=293))
- GraphX is a graph processing framework built on top of Spark with an API for graph computations. ([Location 296](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=296))
- Spark allows users to write a program for the driver (or master node) on a cluster computing system that can perform operations on data in parallel. ([Location 309](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=309))
- Rather than evaluating each transformation as soon as specified by the driver program, Spark evaluates RDDs lazily, computing RDD transformations only when the final RDD data needs to be computed (often by writing out to storage or collecting an aggregate to the driver). ([Location 316](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=316))
- As they are implemented in Spark, RDDs are immutable, so transforming an RDD returns a new RDD rather than the existing one. As we will explore in this chapter, this paradigm of lazy evaluation, in-memory storage, and immutability allows Spark to be easy-to-use, fault-tolerant, scalable, and efficient. ([Location 319](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=319))
- Spark does not begin computing the partitions until an action is called. ([Location 325](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=325))
- Lazy evaluation allows Spark to combine operations that don’t require communication with the driver (called transformations with one-to-one dependencies) to avoid doing multiple passes through the data. ([Location 336](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=336))
- Spark is fault-tolerant, meaning Spark will not fail, lose data, or return inaccurate results in the event of a host machine or network failure. Spark’s unique method of fault tolerance is achieved because each partition of the data contains the dependency information needed to recalculate the partition. ([Location 386](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=386))
- Because of lazy evaluation, stack traces from failed Spark jobs (especially when embedded in larger systems) will often appear to fail consistently at the point of the action, even if the problem in the logic occurs in a transformation much earlier in the program. ([Location 402](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=402))
- Spark offers two ways of allocating resources across applications: static allocation and dynamic allocation. With static allocation, each application is allotted a finite maximum of resources on the cluster and reserves them for the duration of the application ([Location 576](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=576))
- In dynamic allocation, executors are added and removed from a Spark application as needed, based on a set of heuristics for estimated resource requirement. ([Location 583](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=583))
- A Spark application corresponds to a set of Spark jobs defined by one SparkContext in the driver program. A Spark application begins when a SparkContext is started. ([Location 586](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=586))
- The SparkContext determines how many resources are allotted to each executor. ([Location 592](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=592))
- With each action, the Spark scheduler builds an execution graph and launches a Spark job. Each job consists of stages, which are steps in the transformation of the data needed to materialize the final RDD. Each stage consists of a collection of tasks that represent each parallel computation and are performed on the executors. ([Location 618](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=618))
- Each application may contain many jobs that correspond to one RDD action. Each job may contain several stages that correspond to each wide transformation. Each stage is composed of one or many tasks that correspond to a parallelizable unit of computation done in each stage. There is one task for each partition in the resulting RDD of that stage. ([Location 624](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=624))
- Tags: [[orange]]
- Spark’s high-level scheduling layer uses RDD dependencies to build a Directed Acyclic Graph (a DAG) of stages for each Spark job. ([Location 629](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=629))
- A Spark job is the highest element of Spark’s execution hierarchy. Each Spark job corresponds to one action, and each action is called by the driver program of a Spark application. ([Location 639](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=639))
- Tags: [[orange]]
- Each stage corresponds to a shuffle dependency created by a wide transformation in the Spark program. At a high level, one stage can be thought of as the set of computations (tasks) that can each be computed on one executor without communication with other executors or with the driver. In other words, a new stage begins whenever network communication between workers is required; for instance, in a shuffle. ([Location 653](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=653))
- Tags: [[orange]]
- A stage consists of tasks. The task is the smallest unit in the execution hierarchy, and each can represent one local computation. ([Location 673](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=673))
- Tags: [[orange]]
- The number of tasks per stage corresponds to the number of partitions in the output RDD of that stage. ([Location 677](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=677))
- Tags: [[orange]]
- In some ways, the simplest way to think of the Spark execution model is that a Spark job is the set of RDD transformations needed to compute one final result. Each stage corresponds to a segment of work, which can be accomplished without involving the driver. In other words, one stage can be computed without moving data across the partitions. Within one stage, the tasks are the units of work done for each partition of the data. ([Location 704](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=704))
- Tags: [[orange]]
- Spark SQL and its DataFrames and Datasets interfaces are the future of Spark performance, with more efficient storage options, advanced optimizer, and direct operations on serialized data. These components are super important for getting the best of Spark performance ([Location 738](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=738))
- Spark’s DataFrames have very different functionality compared to traditional DataFrames like Panda’s and R’s. While these all deal with structured data, it is important not to depend on your existing intuition surrounding DataFrames. ([Location 748](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=748))
- Tags: [[orange]]
- Like RDDs, DataFrames and Datasets represent distributed collections, with additional schema information not found in RDDs. ([Location 750](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=750))
- DataFrames are Datasets of a special Row object, which doesn’t provide any compile-time type checking. ([Location 755](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=755))
- Tags: [[orange]]
- The schema information, and the optimizations it enables, is one of the core differences between Spark SQL and core Spark. ([Location 894](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=894))
- DataFrames expose the schema in both human-readable or programmatic formats. printSchema() will show us the schema of a DataFrame and is most commonly used when working in the shell to figure out what you are working with. ([Location 899](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=899))
- Tags: [[orange]]
- The first part is a StructType, which contains a list of fields. It’s important to note you can nest StructTypes, like how a case class can contain additional case classes. ([Location 952](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=952))
- Tags: [[blue]]
- Spark SQL’s DataFrame API allows us to work with DataFrames without having to register temporary tables or generate SQL expressions. The DataFrame API has both transformations and actions. The transformations on DataFrames are more relational in nature, with the Dataset API (covered next) offering a more functional-style API. ([Location 983](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=983))
- Transformations on DataFrames are similar in concept to RDD transformations, but with a more relational flavor. Instead of specifying arbitrary functions, which the optimizer is unable to introspect, you use a restricted expression syntax so the optimizer can have more information. ([Location 989](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=989))
- Spark SQL transformations are only partially lazy; the schema is eagerly evaluated. ([Location 994](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=994))
- DataFrame functions, like filter, accept Spark SQL expressions instead of lambdas. These expressions allow the optimizer to understand what the condition represents, and with filter, it can often be used to skip reading unnecessary records. ([Location 1001](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=1001))
- Spark SQL’s column operators are defined on the column class, so a filter containing the expression 0 >= df.col("friends") will not compile since Scala will use the >= defined on 0. Instead you would write df.col("friend") <= 0 or convert 0 to a column literal with lit. ([Location 1029](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=1029))
- Tags: [[blue]]
- Not all Spark SQL expressions can be used in every API call. For example, Spark SQL joins do not support complex operations, and filter requires that the expression result in a boolean, and similar. ([Location 1078](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=1078))
- Tags: [[blue]]
- When you construct a sequence of operations, the generated column names can quickly become unwieldy, so the as or alias operators are useful to specify the resulting column name. ([Location 1142](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=1142))
- Tags: [[blue]]
- if/else semantics. Example 3-21 is a simple example of this, and it encodes the different types of panda as a numeric value.4 The when and otherwise functions can be chained together to create the same effect. ([Location 1145](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=1145))
- Tags: [[orange]]
- Spark SQL also provides special tools for handling missing, null, and invalid data. By using isNan or isNull along with filters, you can create conditions for the rows you want to keep. ([Location 1162](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=1162))
- Tags: [[orange]]
- Spark SQL also allows us to select the unique rows by calling dropDuplicates, but as with the similar operation on RDDs (distinct), this can require a shuffle, so is often much slower than filter. Unlike with RDDs, dropDuplicates can optionally drop rows based on only a subset of the columns, such as an ID field, ([Location 1173](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=1173))
- Tags: [[orange]]
- min, max, avg, and sum are all implemented as convenience functions directly on GroupedData, and more can be specified by providing the expressions to agg. ([Location 1191](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=1191))
- Tags: [[orange]]
- While Example 3-23 computes the max on a per-key basis, these aggregates can also be applied over the entire DataFrame or all numeric columns in a DataFrame. This is often useful when trying to collect some summary statistics for the data with which you are working. In fact, there is a built-in describe transformation which does just that, although it can also be limited to certain columns, ([Location 1201](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=1201))
- Tags: [[orange]]
- For computing multiple different aggregations, or more complex aggregations, you should use the agg API on the GroupedData instead of directly calling count, mean, or similar convenience functions. ([Location 1217](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=1217))
- Tags: [[orange]]
- In addition to using aggregates on groupBy, you can run the same aggregations on multidimensional cubes with cube and rollups with rollup. ([Location 1262](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=1262))
- Tags: [[orange]]
- Using this specification each input row is related to some set of rows, called a frame, that is used to compute the resulting aggregate. Window functions can be very useful for things like computing average speed with noisy data, relative sales, and more. ([Location 1271](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=1271))
- Tags: [[blue]]
- When limiting results, sorting is often used to only bring back the top or bottom K results. When limiting you specify the number of rows with limit(numRows) to restrict the number of rows in the DataFrame. ([Location 1302](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=1302))
- Tags: [[orange]]
- The DataFrame set-like operations allow us to perform many operations that are most commonly thought of as set operations. These operations behave a bit differently than traditional set operations since we don’t have the restriction of unique elements. ([Location 1315](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=1315))
- DataFrames and Datasets have a specialized representation and columnar cache format. The specialized representation is not only more space efficient, but also can be much faster to encode than even Kryo serialization. ([Location 1349](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=1349))
- Tungsten’s representation is substantially smaller than objects serialized using Java or even Kryo serializers. ([Location 1364](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=1364))
- Spark SQL has a different way of loading and saving data than core Spark. To be able to push down certaintypes of operations to the storage layer, Spark SQL has its own Data Source API. Data sources are able to specify and control which type of operations should be pushed down to the data source. As developers, you don’t need to worry too much about the internal activity going on here, unless the data sources you are looking for are not supported. ([Location 1382](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=1382))
- Loading and writing JSON is supported directly in Spark SQL, and despite the lack of schema information in JSON, Spark SQL is able to infer a schema for us by sampling the records. ([Location 1407](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=1407))
- The JDBC data source represents a natural Spark SQL data source, one that supports many of the same operations. ([Location 1429](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=1429))
- While Spark supports many different JDBC sources, it does not ship with the JARs required to talk to all of these databases. If you are submitting your Spark job with spark-submit you can download the required JARs to the host you are launching and include them by specifying --jars or supply the Maven coordinates to --packages. ([Location 1434](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=1434))
- Apache Parquet files are a common format directly supported in Spark SQL, and they are incredibly space-efficient and popular. Apache Parquet’s popularity comes from a number of features, including the ability to easily split across multiple files, compression, nested types, and many others ([Location 1473](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=1473))
- Converting a DataFrame to an RDD is a transformation (not an action); however, converting an RDD to a DataFrame or Dataset may involve computing (or sampling some of) the input RDD. ([Location 1542](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=1542))
- When you create a DataFrame from an RDD, Spark SQL needs to add schema information. If you are creating the DataFrame from an RDD of case classes or plain old Java objects (POJOs), Spark SQL is able to use reflection to automatically determine the schema, ([Location 1546](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=1546))
- Since a row can contain anything, you need to specify the type (or cast the result) as you fetch the values for each column in the row. ([Location 1582](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=1582))
- Collecting data back as a local collection is more common and often done post aggregations or filtering on the data. For example, with ML pipelines collecting the coefficents or (as discussed in our Goldilocks example in Chapter 6) collecting the quantiles to the driver. ([Location 1614](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=1614))
- Filter push-down can make a huge difference when working with large datasets by allowing Spark to only access the subset of data required for your computation instead of doing effectively a full table scan. ([Location 1686](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=1686))
- When reading partitioned data, you point Spark to the root path of your data, and it will automatically discover the different partitions. Not all data types can be used as partition keys; currently only strings and numeric data are the supported types. ([Location 1688](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=1688))
- Datasets are an exciting extension of Spark SQL that provide additional compile-time type checking. Starting in Spark 2.0, DataFrames are now a specialized version of Datasets that operate on generic Row objects and therefore lack the normal compile-time type checking of Datasets. Datasets can be used when your data can be encoded for Spark SQL and you know the type information at compile time. ([Location 1702](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=1702))
- For loading data into a Dataset, unless a special API is provided by your data source, you can first load your data into a DataFrame and then convert it to a Dataset. Since the conversion to the Dataset simply adds information, you do not have the problem of eagerly evaluating, and future filters and similar operations can still be pushed down to the data store. ([Location 1731](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=1731))
- One of the reasons to use Datasets over traditional DataFrames is their compile-time strong typing. DataFrames have runtime schema information but lack compile-time information about the schema. This strong typing is especially useful when making libraries, because you can more clearly specify the requirements of your inputs and your return types. ([Location 1752](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=1752))
- In addition to single Dataset transformations, there are also transformations for working with multiple Datasets. The standard set operations, namely intersect, union, and subtract, ([Location 1788](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=1788))
- Catalyst is the Spark SQL query optimizer, which is used to take the query plan and transform it into an execution plan that Spark can run. Much as our transformations on RDDs build up a DAG, as we apply relational and functional transformations on DataFrames/Datasets, Spark SQL builds up a tree representing our query plan, called a logical plan. Spark is able to apply a number of optimizations on the logical plan and can also choose between multiple physical plans for the same logical plan using a cost-based model. ([Location 1915](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=1915))
- These simplifications can be written using pattern matching on the tree, such as the rule for simplifying additions between two literals. The optimizer is not limited to pattern matching, and rules can also include arbitrary Scala code. ([Location 1926](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=1926))
- Once the logical plan has been optimized, Spark will produce a physical plan. The physical plan stage has both rule-based and cost-based optimizations to produce the optimal physical plan. One of the most important optimizations at this stage is predicate pushdown to the data source level. ([Location 1928](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=1928))
- While the Catalyst optimizer is quite powerful, one of the cases where it currently runs into challenges is with very large query plans. These query plans tend to be the result of iterative algorithms, like graph algorithms or machine learning algorithms. One simple workaround for this is converting the data to an RDD and back to DataFrame/Dataset at the end of each iteration, ([Location 1941](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=1941))
- While RDDs lack the Catalyst optimizer and relational style queries, they are able to work with a wider variety of data types and provide more direct control over certain types of operations. ([Location 1991](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=1991))
- DataFrames and Datasets also only work with a restricted subset of data types — but when your data is in one of these supported classes the performance improvements of using the Catalyst optimizer provide a compelling case for accepting those restrictions. ([Location 1993](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=1993))
- DataFrames can be used when you have primarily relational transformations, which can be extended with UDFs when necessary. Compared to RDDs, DataFrames benefit from the efficient storage format of Spark SQL, the Catalyst optimizer, and the ability to perform certain operations directly on the serialized data. One drawback to working with DataFrames is that they are not strongly typed at compile time, which can lead to errors with incorrect column access and other simple mistakes. ([Location 1995](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=1995))
- Datasets can be used when you want a mix of functional and relational transformations while benefiting from the optimizations for DataFrames and are, therefore, a great alternative to RDDs in many cases. ([Location 1998](https://readwise.io/to_kindle?action=open&asin=B0725YT69J&location=1998))