Best practices
This section describes practices to improve query performance in Delta Lake.
Choose the right partition column
You can partition a Delta table by a column. The most commonly used partition column is date
.
Follow these two rules of thumb for deciding on what column to partition by:
- If the cardinality of a column will be very high, do not use that column for partitioning. For example, if you partition by a column
userId
and if there can be 1M distinct user IDs, then that is a bad partitioning strategy. - Amount of data in each partition: You can partition by a column if you expect data in that partition to be at least 1 GB.
Compact files
If you continuously write data to a Delta table, it will over time accumulate a large number of files, especially if you add data in small batches. This can have an adverse effect on the efficiency of table reads, and it can also affect the performance of your file system. Ideally, a large number of small files should be rewritten into a smaller number of larger files on a regular basis. This is known as compaction.
You can compact a table by repartitioning it to smaller number of files. In addition, you can specify the option dataChange
to be false
indicates that the operation does not change the data, only rearranges the data layout. This would ensure that other concurrent operations are minimally affected due to this compaction operation.
For example, you can compact a table into 16 files:
Scala
val path = "..."
val numFiles = 16
spark.read
.format("delta")
.load(path)
.repartition(numFiles)
.write
.option("dataChange", "false")
.format("delta")
.mode("overwrite")
.save(path)
Python
path = "..."
numFiles = 16
(spark.read
.format("delta")
.load(path)
.repartition(numFiles)
.write
.option("dataChange", "false")
.format("delta")
.mode("overwrite")
.save(path))
If your table is partitioned and you want to repartition just one partition based on a predicate, you can read only the partition using where
and write back to that using replaceWhere
:
Scala
val path = "..."
val partition = "year = '2019'"
val numFilesPerPartition = 16
spark.read
.format("delta")
.load(path)
.where(partition)
.repartition(numFilesPerPartition)
.write
.option("dataChange", "false")
.format("delta")
.mode("overwrite")
.option("replaceWhere", partition)
.save(path)
Python
path = "..."
partition = "year = '2019'"
numFilesPerPartition = 16
(spark.read
.format("delta")
.load(path)
.where(partition)
.repartition(numFilesPerPartition)
.write
.option("dataChange", "false")
.format("delta")
.mode("overwrite")
.option("replaceWhere", partition)
.save(path))
Warning
Using dataChange = false
on an operation that changes data can corrupt the data in the table.
Note
This operation does not remove the old files. To remove them, run the VACUUM command.