spark repartition что делает
По данным Learning Spark
Имейте в виду, что перераспределение ваших данных является довольно дорогой операцией. Spark также имеет оптимизированную версию repartition() call, coalesce() которая позволяет избежать перемещения данных, но только если вы уменьшаете количество разделов RDD.
Одно из различий, которое я получаю, заключается в том, что с repartition() количеством разделов можно увеличивать / уменьшать, но с coalesce() количеством разделов можно только уменьшаться.
Если разделы распределены по нескольким машинам и coalesce() запущены, как можно избежать перемещения данных?
Это позволяет избежать полного перемешивания. Если известно, что число уменьшается, то исполнитель может безопасно хранить данные о минимальном количестве разделов, перемещая данные только с дополнительных узлов на те узлы, которые мы сохранили.
Итак, это будет примерно так:
Затем coalesce до 2 разделов:
Обратите внимание, что Узел 1 и Узел 3 не требовали перемещения своих исходных данных.
Ответ Джастина потрясающий, и этот ответ углубляется.
repartition Алгоритм делает полный перетасовать и создает новые разделы с данными, распределенными равномерно. Давайте создадим DataFrame с номерами от 1 до 12.
numbersDf содержит 4 раздела на моей машине.
Вот как данные делятся на разделы:
Давайте сделаем полное перемешивание с repartition методом и получим эти данные на двух узлах.
Вот как numbersDfR данные разделены на моей машине:
repartition Метод делает новые разделы и равномерно распределяет данные в новых разделах (распределение данных больше даже для больших наборов данных).
Разница между coalesce и repartition
coalesce использует существующие разделы, чтобы минимизировать объем перемешиваемых данных. repartition создает новые разделы и делает полное перемешивание. coalesce приводит к разделам с различными объемами данных (иногда разделам, которые имеют очень разные размеры) и repartition приводит к разделам примерно одинакового размера.
Есть coalesce или repartition быстрее?
Прочтите этот пост в блоге, если вы хотите еще больше деталей.
Когда вы будете использовать коалесценцию и передел на практике
All data processed by spark is stored in partitions. Today we discuss what are partitions, how partitioning works in Spark (Pyspark), why it matters and how the user can manually control the partitions using repartition and coalesce for effective distributed computing.
Introduction
Spark is a framework which provides parallel and distributed computing on big data. To perform it’s parallel processing, spark splits the data into smaller chunks(i.e. partitions) and distributes the same to each node in the cluster to provide a parallel execution of the data. This partitioning of data is performed by spark’s internals and the same can also be controlled by the user.
Table of Contents
A Partition in simple terms is a split in the input data, so partitions in spark are basically smaller logical chunks or divisions of the input data. Spark distributes this partitioned data among the different nodes to perform distributed processing on the data.
Let’s dive into a practical example and create a simple RDD using the sc.parallelize method to determine the number of partitions spark creates by default.
As shown above, spark creates 8 partitions by default here. Now, you must be wondering where this default number is come from.
This is how the data is present inside each partition. Spark uses an internal Hash Partitioning Scheme to split the data into these smaller chunks.
We can use the rdd.glom() method to display the partitions in a list.
Let’s visualize the above collect operation in the Spark WebUI(if you are using spark locally, navigate to https://localhost:4040 to see the spark webui or if spark is being accessed via a cluster navigate to your cluster specific localhost webUI)
Note: Spark shell notifies a port number before creating a Sparksession.
Since 8 partitions are present, 8 executors would be launched for this action, as shown below,
The above method used for an RDD can also be applied to a dataframe. Let’s convert the RDD to a Dataframe and apply the same method. We can see that the same number of partitions are present.
How to control the number of partitions when reading / writing files in Spark?
Now, you must’ve had the question in your mind as to how spark partitions the data when reading textfiles. Let’s read a simple textfile and see the number of partitions here.
Let’s read the CSV file which was the input dataset in my first post, [pyspark dataframe basic operations]
We can see that only 1 partition is created here. Alright let’s break this down,
Spark by default creates 1 partition for every 128 MB of the file.
So if you are reading a file of size 1GB, it creates 10 partitions.
The file being read here is of 3.6 MB, hence only 1 partition is created in this case.
The no. of partitions is determined by spark.sql.files.maxPartitionBytes parameter, which is set to 128 MB, by default.
Note: The files being read must be splittable by default for spark to create partitions when reading the file. So, in case of compressed files like snappy, gz or lzo etc, a single partition is created irrespective of the size of the file.
Let’s manually set the spark.sql.files.maxPartitionBytes to 1MB, so that we get 4 partitions upon reading the file.
As discussed, 4 partitions were created.
Caution: The above manual setting of the spark.sql.files.maxPartitionBytes to 1MB was just for the purpose of this demo and is not recommended in practical scenarios. It is recommended to use the default setting or set a value based on your input size and cluster hardware size.
Partitioning is the sole basis by which spark distributes data among different nodes to thereby producing a distributed and parallel execution of the data with reduced latency.
If this concept of partitioning is not present in spark, then there would be no parallel processing existing in spark.
Let’s analyze this with a simple example where we create an RDD with a single partition and perform an action on the same.
When the above action is seen on the Spark WebUI, only a single executor would be issued to process this data.
So, imagine a case where you are processing a huge volume of data without the concept of partitioning, then the entire data would be processed by a single executor taking a lot of time and memory.
3. Repartition
Now what if you wish to control these partitions by yourself. This is where the methods of repartition and coalesce come to effect.
What is Repartition?
Repartition is a method in spark which is used to perform a full shuffle on the data present and creates partitions based on the user’s input.
The resulting data is hash partitioned and the data is equally distributed among the partitions.
Methods to Repartition a Dataframe
There are 2 ways to repartition a dataframe, Specifying a set of columns or column along with the partition size (or) Specifying an int value to create the number of partitions
Method 1 : Repartition using Column Name
Now let’s repartition our dataset using the first method using the column present in the dataframe and check the number of partitions being created after repartition.
Method 2 : Repartition using integer value
Now let’s use method 2 to specify an integer value to create the number of partitions as per our requirement. Let’s repartition the dataset to 3 partitions.
Now, if you save the above dataframe as CSV, 3 files would be created with each one having contents as below,
Partition1 : 5,6,7,9
Partition2 : 1,2,4
Partition3 : 3,8,10
Let’s visualise the same on the Spark WebUI
The above execution when seen on the spark console would show 3 tasks being issued on the collect operation(stage 2).
Caution: Repartition performs a full shuffle on the input dataset, hence it would be a costly operation if done on large datasets.
Note: Use Repartition only when you want to increase the number of partitions (or) if you want to perform a full shuffle on the data.
4. Coalesce
Coalesce is another method to partition the data in a dataframe. This is mainly used to reduce the number of partitions in a dataframe.
Unlike repartition, coalesce doesn’t perform a shuffle to create the partitions.
Suppose there are 100 partitions with 10 records in each partition, and if the partition size is reduced to 50, it would retain 50 partitions and append the other values to these existing partitions thereby having 20 records in each partition.
Let’s analyze this using our dataset, let’s reduce the number of partitions to 2 now,
As shown above, the data from the 3rd partition is removed and appended with the 2nd partition, proving that there is no shuffle process going on here. Hence, this would be much less memory intensive.
Now, if you save the above dataframe as CSV, 3 files would be created with each one having contents as below,
Partition1 : 5,6,7,9
Partition2 : 1,2,4,3,8,10
So, when your motive is to reduce the number of partitions from say 1000 to 500, then coalesce is a better option. But the output data would not be equally partitioned.
The Answer is no. Coalesce has no shuffle taking place and the algorithm is designed to move data from some partitions to existing partitions.
Let’s prove the above point with an example. If we try to increase the coalesce number to 8, we would still receive the existing number of partitions present before the coalesce function, so in our case only 3 partitions would be the output.
Repartition can be used under these scenarios,
Coalesce can be used under the following scenarios,
Caution: Even though coalesce seems to be useful when decreasing partitions, it also reduces the degree of parallelism when you want to partition your data. Example, if you do an extreme coalesce to 1 partition, then all the computation would take place on a single node which is not a good practice. In this case repartition can be used.
For the above coalesce operation, all computation takes place on a single node and not utilizing the other ones. So, in a real world problem with a huge dataset, this would create a huge overhead on a single node thereby slowing down the process and degrading the performance. The snapshot present below clearly depicts a single task being issued for the above operation,
Now let’s perform the same process using repartition.
For the above case, repartition first performs a full shuffle of the data. The data here resides in 8 partitions, so 8 executors are launched to perform the shuffle.
After completion of shuffle, the data is placed into a single node described by the second stage. So, the save operation is distributed among the different executors. Performance wise this might increase the latency a bit depending on the number of partitions created, but the overhead on a single node would be avoided. The snapshot present below indicates the same.
6. Conclusion
By now, you must be clear on the concepts of partitioning in spark and how spark utilizes partitioning to perform parallel distributed processing.
In addition to this, the repartition and coalesce methods should be much clearer and you can use either one depending upon your use case.
Comments and feedback are encouraged on the comment section below. Cheers!
Русские Блоги
Операторы Spark Coalesce и Repartition управляют разделами
Метод повторного разбиения заключается в вызове метода coalesce, и перемешивание истинно
Таким образом, по умолчанию coalesce не генерирует перемешивание.
Введение в раздел
Если в конфигурационном файле spark-default.conf нет конфигурации, значение выбирается в соответствии со следующими правилами:
1. Локальный режим (исполнитель не будет запущен, процесс SparkSubmit будет генерировать указанное количество потоков для одновременной работы):
3. Другие режимы (здесь в основном относится к режиму пряжи, конечно, автономный тоже)
Метод coalesce уменьшает количество разделов в DataFrame. Вот как объединить данные в двух разделах:
Мы можем проверить, создал ли coalesce новый DataFrame только с одним разделом:
Используйте coalesce для увеличения раздела, но это не действует:
Метод repartition можно использовать для увеличения или уменьшения количества разделов в DataFrame.
Увеличить или уменьшить раздел
Метод перераспределения может полностью переупорядочить данные, поэтому количество разделов может быть увеличено или уменьшено.
Разница между слиянием и переделом
При повторном разделении данные полностью переупорядочиваются и создаются разделы данных одинакового размера. coalesce объединяет существующие разделы, чтобы избежать полного перемешивания.
Переразбивка по столбцу
При разбиении по столбцам Spark по умолчанию создает не менее 200 разделов. Просмотрите данные раздела, только два раздела содержат данные, и поле цвета данных в одном разделе согласовано. colorDf содержит разные разделы каждого цвета и оптимизирован для извлечения цветов. Разделение по столбцам аналогично индексированию столбцов в реляционной базе данных.
Рассмотрим раздел
1) N M и N похоже на M (если N равно 1000, а M равно 100), то несколько из N разделов могут быть объединены в новый раздел и, наконец, объединены в разделы M. Это Вы можете установить для shuff значение false, когда shuffl имеет значение false, и если M> N, coalesce недопустимо, процесс перемешивания не выполняется, и существует узкая зависимость между родительским RDD и дочерним RDD.
3) Если N> M и есть большая разница между ними, если в это время для shuffle установлено значение false, RDD родитель-потомок является узкой зависимостью, и они находятся на одной стадии, что может вызвать недостаточный параллелизм программы Spark, тем самым влияя на производительность. Если M равно 1, чтобы предыдущие операции coalesce имели лучший параллелизм, вы можете установить для shuffle значение true.
Вкратце: если shuff имеет значение false, если переданный параметр больше, чем количество существующих разделов, количество разделов RDD остается неизменным, то есть количество разделов RDDde не может быть увеличено без перемешивания.
Согласно Learning Spark
Единственное различие, которое я получаю, заключается в том, что при перераспределении() число разделов может быть увеличено/уменьшено, но с coalesce() количество разделов может быть уменьшено только.
Если разделы распределены между несколькими машинами и запущен процесс coalesce(), как он может избежать перемещения данных?
ОТВЕТЫ
Ответ 1
Это позволяет избежать полной перетасовки. Если известно, что число уменьшается, то исполнитель может безопасно хранить данные на минимальном количестве разделов, только перемещая данные с лишних узлов на узлы, которые мы сохранили.
Итак, это будет выглядеть примерно так:
Затем coalesce до двух разделов:
Обратите внимание, что Node 1 и Node 3 не требовали перемещения исходных данных.
Ответ 2
Ответ Джастина является удивительным, и этот ответ становится более глубоким.
Алгоритм repartition выполняет полную перетасовку и создает новые разделы с данными, которые распределяются равномерно. Позвольте создать DataFrame с номерами от 1 до 12.
numbersDf содержит 4 раздела на моей машине.
Вот как данные разделяются на разделы:
Позвольте выполнить полный переход с помощью метода repartition и получить эти данные на двух узлах.
Вот как данные numbersDfR разбиваются на мою машину:
Метод repartition создает новые разделы и равномерно распределяет данные в новых разделах (распределение данных более равномерно для больших наборов данных).
Разница между coalesce и repartition
coalesce использует существующие разделы, чтобы минимизировать количество перетасованных данных. repartition создает новые разделы и выполняет полную перетасовку. coalesce приводит к разделам с разным объемом данных (иногда разделов с разными размерами) и repartition приводит к примерно равным размерам разделов.
Является ли coalesce или repartition быстрее?
Прочитайте этот пост в блоге, если вы хотите получить более подробную информацию.
Ответ 3
Еще один момент, который следует отметить здесь, состоит в том, что, поскольку основным принципом Spark RDD является неизменяемость. Перераспределение или совместное создание нового RDD. Базовый RDD будет продолжать существовать с его первоначальным количеством разделов. В случае использования прецедента для сохранения RDD в кеше, то это же должно быть сделано для вновь созданного RDD.
Ответ 4
Все ответы добавляют некоторые большие знания в этот очень часто задаваемый вопрос.
Итак, по традиции этого вопроса, вот мои 2 цента.
Я обнаружил, что перераспределение быстрее, чем слияние, в очень конкретном случае.
В моем приложении, когда количество файлов, которые мы оцениваем, ниже определенного порога, перераспределение работает быстрее.
Вот что я имею в виду
В вышеприведенном фрагменте, если мои файлы были меньше 20, coalesce продолжал заканчиваться, а перераспределение было намного быстрее, и поэтому приведенный выше код.
Конечно, это число (20) будет зависеть от количества рабочих и количества данных.
Надеюсь, что это поможет.
Ответ 5
Ответ 6
Оба отлично работают
Но мы идем в общем за эти две вещи, когда нам нужно видеть вывод в одном кластере, идем с этим.
Ответ 7
Ко всем отличным ответам я хотел бы добавить, что повторное разбиение является одним из лучших вариантов использования распараллеливания данных, а объединение дает дешевый вариант сокращения раздела и очень полезно при записи данных в HDFS или какой-либо другой приемник, чтобы воспользоваться преимуществами большой пишет. Я нашел это полезным при записи данных в формате паркета, чтобы получить полное преимущество.
Ответ 8
«coalesce» будет работать с существующими разделами и перетасовывать их подмножество. Он не может исправить перекос данных так же, как «перераспределение». так что даже если это дешевле, это может быть не то, что вам нужно.
Ответ 9
Таким образом, coalesce и repartition могут быть использованы для увеличения числа разделов
С shuffle = true вы можете объединить большее количество разделов. Это полезно, если у вас есть небольшое количество разделов, скажем, 100, потенциально с несколькими разделами, которые являются необычно большими.
Еще одно важное замечание, которое необходимо подчеркнуть, заключается в том, что если вы резко уменьшите количество разделов, вам следует рассмотреть возможность использования смешанной версии coalesce (то же самое, что и repartition в этом случае). Это позволит выполнять ваши вычисления параллельно на родительских разделах (несколько задач).
Однако, если вы делаете резкое объединение, например, с numPartitions = 1, это может привести к тому, что ваши вычисления будут выполняться на меньшем количестве узлов, чем вам нравится (например, один узел в случае numPartitions = 1). Чтобы избежать этого, вы можете передать shuffle = true. Это добавит случайный шаг, но означает, что текущие разделы восходящего потока будут выполняться параллельно (независимо от текущего разделения).
Пожалуйста, также обратитесь к соответствующему ответу здесь
Ответ 10
Но также вы должны убедиться, что данные, которые поступают на объединенные узлы, должны быть хорошо сконфигурированы, если вы имеете дело с огромными данными. Поскольку все данные будут загружены в эти узлы, может возникнуть исключение памяти. Хотя возмещение затратно, я предпочитаю им пользоваться. Так как он перемешивает и распределяет данные одинаково.
Будь мудрым, чтобы выбрать между объединением и переделом.
Ответ 11
Для кого-то, у кого были проблемы с генерацией одного CSV файла из PySpark (AWS EMR) в качестве вывода и сохранением его на s3, помогло перераспределение. Причина в том, что объединение не может сделать полную перестановку, но перераспределение может. По сути, вы можете увеличить или уменьшить количество разделов, используя перераспределение, но можете только уменьшить количество разделов (но не 1), используя объединение. Вот код для тех, кто пытается записать CSV из AWS EMR в s3:
Ответ 12
Например. наш начальный фрейм данных разделен на 200 разделов.
df.repartition(500): данные будут перетасовываться с 200 разделов на новые 500 разделов
Объединить: перемешать данные в число разделов
Apache Spark: оптимизация производительности на реальных примерах
Apache Spark – фреймворк для обработки больших данных, который давно уже стал одним из самых популярных и часто встречаемых во всевозможных проектах, связанных с Big Data. Он удачно сочетает в себе скорость работы и простоту выражения своих мыслей разработчиком.
Разработчик работает с данными на достаточно высоком уровне и, кажется, что нет ничего сложного в том, чтобы, например, соединить два набора данных, написав всего одну строку кода:
Но только задумайтесь: что происходит в кластере при соединении двух наборов данных, которые могут и не находится целиком на каком-либо из узлов кластера? Обычно Spark со всем справляется быстро, но иногда, а особенно, если данных действительно много, необходимо все-таки понимать – что происходит уровнем ниже и использовать это знание, чтобы помочь Spark работать в полную силу.
Сегодня мы поговорим о том, как сделать так, чтобы ваше приложение работало быстро и использовало все ресурсы, которые вы запросили для него. В этой статье рассмотрим в основном модуль Spark SQL, запуск приложения Spark в кластере с Yarn со статическим выделением ресурсов. Но общие идеи можно применять и с другими начальными данными. Мы рассматриваем здесь Spark 2.3/2.4, чтобы лучше понять все нововведения Spark 3.X с его Adaptive Query Execution (AQE) (хотя некоторые функции уже присутствуют и в Spark 2.4) и почему они так важны.
Данные и где они обитают
Начнем с абстракции, которую нам предоставляет Spark для работы с данными – это RDD (Resilient Distributed Dataset). Для цели нашей статьи не важно, что мы работаем с DataFrame или DataSet.
RDD (Resilient Distributed Dataset)
Таким образом, для разработчиков набор данных представлен в виде единого объекта, а обрабатывается он порциями (блоками) отдельно в каком-то потоке на каком-то исполнителе (executor) в кластере. Блок является минимальной единицей обработки, исполнитель получает блок и инструкцию, которая сообщает ему о том, что необходимо сделать с этим блоком данных.
Как работает приложение Spark в кластере
На высоком уровне каждое приложение Spark в момент своей работы состоит из драйвера – программы, которая исполняет функцию main() и исполнителей, которые работают на узлах кластера. Исполнители – универсальные солдаты, они получают порцию данных (блок) и инструкцию, исполняют ее и докладывают драйверу о завершении, чтобы получить следующую инструкцию. В каждом исполнителе может быть запущено более одного потока обработки, и в этом случае каждый поток обрабатывает свой блок данных независимо от других. Таким образом, если мы при запуске нашего приложения заказали у менеджера кластера пять исполнителей по четыре ядра (потока), то в каждый момент времени мы располагаем 5*4 = 20 потоками и в лучшем случае можем обрабатывать 20 блоков данных одновременно.
Итак, каждая задача получает для выполнения:
num_executors – к-во отдельных процессов JVM, в которых будут запущена потоки обработки данных (они могут быть расположены как на одной узле кластера, так и на разных). Процессы будут работать до конца работы приложения;
executor_cores – количество параллельных потоков, выполняемых в каждом executor. Один поток в каждый момент времени обрабатывает один блок данных.
Схема работы приложения Spark
В Spark History (Web сервер для отображения логов выполнения приложений Spark в удобном виде) это выглядит так:
Spark History: Stage
Мы здесь видим два исполнителя, в каждом из которых работает по четыре потока обработки.
Перетасовка (Shuffle)
Итак, мы разобрались в том, что у нас есть N блоков данных и P потоков (работников), которые эти блоки данных могут перерабатывать параллельно.
И все у нас было бы хорошо, если бы эти блоки жили до конца приложения, но почти в любом приложении будет обработка, которая влечет за собой полную перетасовку наших блоков. Это, например, соединение двух таблиц по ключу (JOIN), группировка по ключу (GROUP BY). В этом случае работает всем хорошо известный паттерн MapReduce, при котором происходит перераспределение данных всего набора по ключу на новые блоки данных, так, чтобы строки с одним и тем же ключом находились только в одном блоке. Этот процесс в Spark называется Shuffle. Почему я написал его с большой буквы? Потому что это очень сложный и дорогостоящий процесс, при котором увеличивается потребление памяти на исполнителях, потребление дисковой памяти на узлах кластера и сетевой обмен между узлами кластера. Очень напоминает превращение гусеницы в бабочку – все разваливается на куски, чтобы потом собраться в новом обличии, и так же энергозатратно.
Задание делится на этапы
В Spark обработка блоков от одной перетасовки (Shuffle) до другой называется этапом (Stage). Заметим, что до перетасовки все блоки обрабатываются параллельно, после перетасовки они тоже обрабатываются параллельно, но новый этап не начнется пока этот процесс не пройдут все блоки в конце предыдущего этапа. Таким образом, граница между этапами – это место ожидания при параллельной обработке блоков. Заметим также, что в рамках одного этапа все задачи (task) над одним блоком происходят последовательно в рамках одного потока. То есть блок никуда не передается по сети, но все блоки обрабатываются параллельно. Получается, что количество блоков в границах этапа неизменно.
Задание делится на этапы
Мы пришли к следующей картине: все задания делятся на этапы, а в рамках каждого этапа количество блоков неизменно и равно …. И вот здесь начинается самое интересное. Количество работников у нас известно (P = executors*cores), а вот сколько блоков будет на каждом этапе – это вопрос, от которого напрямую зависит производительность нашего приложения. Ведь если блоков много, а исполнителей мало, то каждый исполнитель будет обрабатывать последовательно несколько блоков и наоборот: если блоков мало, а исполнителей больше, то некоторые исполнители будут простаивать, когда остальные трудятся не покладая рук. Самое интересное здесь – то, что, когда приложение работает медленно, пытаются выдать ему больше исполнителей, но производительность в этом случае не увеличивается.
Мы читаем небольшой файл из HDFS, в котором 150 тысяч записей. Весь файл помещается в один блок HDFS. Таким образом, на первом этапе у нас всего один блок данных, поэтому работать с ним сможет только один исполнитель. Но по логике работы трансформации в каждой строке находится поле duration (количество секунд просмотра), и нам необходимо размножить каждую строку на выходе на столько строк, сколько секунд смотрения у нас в этой строке.
Трансформация на тестовых данных работает не быстро. Глядя в Spark History, видим:
На первом этапе один блок данных
Смотрим Spark History:
Теперь обработка идет параллельно
Приведу пример, когда у нас данных мало, а spark.sql.shuffle.partitions = 200 по умолчанию. Вырожденный случай и не используем broadcast. Глядя на Spark History, видим, что наш набор данных состоит всего из 185 строк, и был поделен при перетасовке на 200 блоков (но тут и на 200 блоков не хватит). Заметим, что реальная полезная работа исполнителя окрашена здесь в зеленый цвет. То есть получается, что из всего времени работы исполнителя на обработку одного блока данных из одной записи полезное время составило
Что же у нас происходит на последнем этапе? Это опять зависит от того, куда мы выводим данные нашей трансформации. Например, мы хотим записать все в директорию в виде parquet файлов. Если мы сделаем это после перетасовки, ничего не предприняв, то обнаружим 200 файлов в этой директории после выполнения нашей программы. Почему? Потому что после перетасовки у нас получилось по умолчанию spark.sql.shuffle.partitions = 200 блоков, а так как один блок обрабатывается одним потоком, то и записывать его он будет сам в отдельный файл.
Кстати, один из интересных моментов применения метода разрыва последнего этапа при сохранении, описанного в предыдущем параграфе:
Если сравнивать показатели перед и после разрыва последнего этапа, то все кажется отлично: время работы трансформации сократилось в несколько раз (так как последние вычисления выполнялись параллельно всеми исполнителями), исчез Shuffle Spill (это когда исполнителю не хватает памяти и он устраивает своеобразный swap с локальным диском. Конечно, в этом случае все данные пришли несколькими большими блоками и исполнители с трудом их переварили).
СТОП! Приглядимся к размеру файлов, полученных при сохранении. Было 5.9 Гб, стало 10.3 Гб, количество записей одно и то же, состав данных тот же. Почему? Вот и ложка дегтя!
Обратите внимание на размер Output
Функция sortWithinPartitions() производит сортировку по полю или нескольким полям внутри каждого блока, т.е. никакой перетасовки не происходит все работает в рамках одного исполнителя в его памяти. После применения этой функции к нашей трансформации для сортировки по нескольким полям, общий размер файлов на выходе стал даже немного меньше, чем изначально. Теперь у нас все работает быстро, размер файлов на выходе нас устраивает. Кроме того, в этом случае мы записали в HDFS файлы примерно одинакового размера (это следствие repartition() ), что может быть удобно для дальнейшей обработки.
О работе оптимизатора
Раз уж мы коснулись файла формата parquet, на нем и посмотрим, как работает оптимизатор Spark на примере такого правила оптимизатора, как predicate pushdown и projection pushdown.
Рассмотрим правило оптимизатора – спуск условия (predicate pushdown). Принцип этой оптимизации достаточно прост: данных у нас много и ни к чему их обрабатывать, если они в конце концов не пригодятся, например должны быть отфильтрованы в конце выполнения нашего дерева запроса. Все условия и фильтры оптимизатор старается по возможности спустить на уровни ниже – ближе к источникам данных, в идеале до непосредственного чтения файла (или, например, запроса к RDBMS).
Вот физический план выполнения запроса, который при этом генерируется:
Обратим внимание на блок непосредственного чтения из файла (FileScan parquet) и блок PushedFilters – это те условия, которые будут накладываться при физическом чтении файла. Видим, что сюда попали три условия:
Файл parquet внутри
Важно понимать, что условие можно спустить на уровни ниже только, если оно заранее известно перед началом выполнения запроса.
Пример из реальной жизни.
Из HDFS читаются все партиции
Странно, но на первом этапе Spark вычитывает все партиции ( PartitionCount =121 ), хотя мы передаем фильтр, который состоит только из одного значения. Это как раз тот случай, когда при построении дерева запроса Spark не знает о фильтре вообще, ведь он скрыт за JOIN.
То есть фильтр у нас теперь представляет простое выражение:
И если посмотреть на план выполнения запроса, так как при его построении оптимизатор уже знает строку фильтра и условие, он спускает это условие на уровень чтения из файла. Кроме того, зная, что это партицирующее поле, применяет правило partition pruning, то есть выкидывает из рассмотрения партиции, не удовлетворяющие фильтру.
Читается из HDFS только одна партиция
Обратите внимание, что наше условие теперь находится в блоке PartitionFilters, так как поле партицирующее, из HDFS вычитывается только нужная нам партиция ( PartitionCount = 1 ).
Поэтому, если у вас есть большая таблица с партицированием и вы отбираете некоторые партиции через JOIN, то возможно будет лучше отдельным действием сформировать список значений из этой таблицы фильтра в виде строки и передать его в виде константы в условие основного запроса.
Когда оптимизатор может навредить
Отличная работа оптимизатора… Но иногда его стремление спустить условие как можно ниже к источнику может навредить. На сцену выходит UDF (user defined function). Функция, определенная пользователем, это черный ящик для оптимизатора Spark.
Рассмотрим следующий пример:
Имеем большой файл с несколькими миллиардами строк. Мы хотим отобрать только уникальные id и применить к ним нашу UDF, далее отобрать только те результаты, которые будут Null. Последовательность запросов:
Уникальных значений id в таблице всего несколько тысяч, а наша UDF работает не быстро – ходит в HBase. То есть мы, построив такое дерево запроса, рассчитываем, что наша UDF будет вызвана несколько тысяч раз. Запускаем запрос и долго-долго ждем.
Смотрим план выполнения запроса:
Условие с UDF спустилось почти на самы нижний уровень
… ой! Оптимизатор постарался на славу: он честно спустил наше условие isNull(UDF(id)) на уровень сразу после непосредственного чтения файла, даже до того момента, когда мы отбираем только уникальные id. А это значит, что наша тяжелая UDF пыталась выполниться миллиарды раз вместо тысяч.
Получили то, что и хотели в начале – UDF вычисляется только для уникальных id:
Заключение
За рамками этой статьи остались вопросы, связанные с оптимизацией JOIN: broadcast, data skew, с плюсами и минусами coalesce и repartition. Некоторые моменты достаточно подробно описаны на Хабр, а некоторые – нет. Возможно, это будет темой следующей статьи. Спасибо за внимание.





