Migration guide

Migrate workloads to Delta Lake

When you migrate workloads to Delta Lake, you should be aware of the following simplifications and differences compared with the data sources provided by Apache Spark and Apache Hive.

Delta Lake handles the following operations automatically, which you should never perform manually:

  • Add and remove partitions: Delta Lake automatically tracks the set of partitions present in a table and updates the list as data is added or removed. As a result, there is no need to run ALTER TABLE [ADD|DROP] PARTITION or MSCK.
  • Load a single partition: With Parquet tables, you may sometimes load only the partition you are interested in as an optimization, for example spark.read.parquet("/data/date=2017-01-01"). This is unnecessary with Delta Lake, since it can quickly read the list of files from the transaction log to find the relevant ones. If you are interested in a single partition, specify it using a WHERE clause, for example spark.read.format("delta").load("/data").where("date = '2017-01-01'"). For large tables with many files in the partition, this can be much faster than loading a single partition (by direct partition path or with WHERE) from a Parquet table, because listing the files in a directory is often slower than reading the list of files from the transaction log.
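
As a small illustration of the second point, the helper below reads one partition by filtering the whole table rather than by pointing at a partition subdirectory. It is a sketch under assumptions: read_partition is a hypothetical name, spark is an existing SparkSession with Delta Lake configured, and the table is partitioned by a column named date as in the example above.

```python
def read_partition(spark, table_path, date):
    """Read one partition of a Delta table by filtering the table.

    Assumes the table is partitioned by a column named `date`. The filter
    lets Delta Lake pick the matching files from the transaction log
    instead of listing a partition directory.
    """
    return (
        spark.read.format("delta")
        .load(table_path)                # load the table root, not a partition subdirectory
        .where(f"date = '{date}'")       # partition predicate, as in the example above
    )
```

You would call it as read_partition(spark, "/data", "2017-01-01"). The date value is interpolated into the filter string, so pass only trusted values.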

When you port an existing application to Delta Lake, you should avoid the following operations, which bypass the transaction log:

  • Manually modify data: Delta Lake uses the transaction log to atomically commit changes to the table. Because the log is the source of truth, files that are written out but not added to the transaction log are not read by Spark. Similarly, even if you manually delete a file, a pointer to the file is still present in the transaction log. Instead of manually modifying files stored in a Delta table, always use the commands that are described in this guide.
  • External readers: Do not read the data files stored in a Delta table directly with external readers, because they do not consult the transaction log. For information on how to read Delta tables, see Integrations.
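
To make the role of the log concrete, here is a minimal sketch, using only the Python standard library, of how a reader could derive the set of live data files from the JSON commits in _delta_log. It is a simplification and not Delta Lake's implementation: it ignores checkpoint files and every action other than add and remove, and the file names used in demo() are made up.

```python
import json
import tempfile
from pathlib import Path


def live_files(table_path):
    """Replay the JSON commits in _delta_log and return the live data file paths.

    Simplified: ignores checkpoints and all actions except add and remove.
    """
    log_dir = Path(table_path) / "_delta_log"
    live = set()
    # Commit files are named by zero-padded version, so name order is commit order.
    for commit in sorted(log_dir.glob("*.json")):
        for line in commit.read_text().splitlines():
            if not line.strip():
                continue
            action = json.loads(line)  # one action per line
            if "add" in action:
                live.add(action["add"]["path"])
            elif "remove" in action:
                live.discard(action["remove"]["path"])
    return live


def demo():
    """Build a toy table directory and return what the log says is live."""
    with tempfile.TemporaryDirectory() as tmp:
        table = Path(tmp)
        log_dir = table / "_delta_log"
        log_dir.mkdir()
        # Commit 0 adds two files; commit 1 removes one of them.
        (log_dir / f"{0:020d}.json").write_text(
            json.dumps({"add": {"path": "part-0.parquet"}}) + "\n"
            + json.dumps({"add": {"path": "part-1.parquet"}}) + "\n"
        )
        (log_dir / f"{1:020d}.json").write_text(
            json.dumps({"remove": {"path": "part-1.parquet"}}) + "\n"
        )
        # A file copied in by hand never appears in the log, so it is ignored.
        (table / "orphan.parquet").write_bytes(b"")
        # part-0.parquet was never written to disk here, which mirrors a manual
        # delete: the log still points at it.
        return live_files(table)
```

In this toy table, orphan.parquet exists on disk but is not part of the table, while part-0.parquet is part of the table even though it is missing on disk. Both situations are what manual file changes produce.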

Example

Suppose you have Parquet data stored in the directory /data-pipeline and want to create a table named events. You can always read the data into a DataFrame and save it as a Delta table. This approach copies the data and lets Spark manage the table. Alternatively, you can convert the data to Delta Lake in place, which is faster but results in an unmanaged table.

Save as Delta table

  1. Read the data into a DataFrame and save it to a new directory in delta format:

    data = spark.read.parquet("/data-pipeline")
    data.write.format("delta").save("/delta/data-pipeline/")
    
  2. Create a Delta table events that refers to the files in the Delta Lake directory:

    spark.sql("CREATE TABLE events USING DELTA LOCATION '/delta/data-pipeline/'")
    

Convert to Delta table

You have two options for converting a Parquet table to a Delta table:

  • Convert files to Delta Lake format and create Delta table:

    CONVERT TO DELTA parquet.`/data-pipeline/`
    CREATE TABLE events USING DELTA LOCATION '/data-pipeline/'
    
  • Create Parquet table and convert to Delta table:

    CREATE TABLE events USING PARQUET OPTIONS (path '/data-pipeline/')
    CONVERT TO DELTA events
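
If you drive the conversion from Python, you can build the same statements as strings and pass them to spark.sql. The sketch below is illustrative only: both function names are hypothetical, and the optional PARTITIONED BY clause reflects the assumption that your Parquet directory is partitioned, in which case CONVERT TO DELTA needs the partition schema spelled out.

```python
def convert_to_delta_sql(path, partition_schema=None):
    """Build a CONVERT TO DELTA statement for a path-based Parquet table.

    partition_schema is a column list such as "date DATE"; leave it as
    None for an unpartitioned directory.
    """
    stmt = f"CONVERT TO DELTA parquet.`{path}`"
    if partition_schema:
        stmt += f" PARTITIONED BY ({partition_schema})"
    return stmt


def create_table_sql(name, path):
    """Build the CREATE TABLE statement that registers the converted directory."""
    return f"CREATE TABLE {name} USING DELTA LOCATION '{path}'"
```

For the example above you would run spark.sql(convert_to_delta_sql("/data-pipeline/")) followed by spark.sql(create_table_sql("events", "/data-pipeline/")), where spark is a session with Delta Lake configured.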
    


Migrate Delta Lake workloads to newer versions

This section discusses any changes that may be required in the user code when migrating from older to newer versions of Delta Lake.

Delta Lake 1.0 and below to 1.1 and above

If the name of a partition column in a Delta table contains invalid characters ( ,;{}()\n\t=), you cannot read the table in Delta Lake 1.1 and above, due to SPARK-36271. This should be rare, because you cannot create such tables with Delta Lake 0.6 and above. If you still have such legacy tables, overwrite them with valid column names by using Delta Lake 1.0 or below before you upgrade to Delta Lake 1.1 or above, for example:

Python:

spark.read \
  .format("delta") \
  .load("/the/delta/table/path") \
  .withColumnRenamed("column name", "column-name") \
  .write \
  .format("delta") \
  .mode("overwrite") \
  .option("overwriteSchema", "true") \
  .save("/the/delta/table/path")

Scala:

spark.read
  .format("delta")
  .load("/the/delta/table/path")
  .withColumnRenamed("column name", "column-name")
  .write
  .format("delta")
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .save("/the/delta/table/path")
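
Before you upgrade, it can help to check which column names are affected. The following standard-library sketch flags names containing any of the characters listed above and proposes a replacement. The helper names and the default replacement character are assumptions made for illustration, not part of Delta Lake.

```python
# Characters listed above as invalid in partition column names.
INVALID_CHARS = " ,;{}()\n\t="


def invalid_chars_in(name):
    """Return the sorted list of invalid characters that occur in a column name."""
    return sorted({ch for ch in name if ch in INVALID_CHARS})


def safe_name(name, replacement="_"):
    """Replace every invalid character in a column name with `replacement`."""
    return "".join(replacement if ch in INVALID_CHARS else ch for ch in name)


def renames_needed(column_names, replacement="_"):
    """Map each offending column name to a proposed valid name."""
    return {
        name: safe_name(name, replacement)
        for name in column_names
        if invalid_chars_in(name)
    }
```

For instance, renames_needed(["column name", "event_date"], "-") proposes renaming only "column name", to "column-name", which matches the rename in the example above. You can then pass each pair to withColumnRenamed.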

Delta Lake 0.6 and below to 0.7 and above

If you use the DeltaTable APIs in Scala, Java, or Python to update Delta tables or run utility operations on them, you may have to add the following configurations when you create the SparkSession used to perform those operations.

Python:

from pyspark.sql import SparkSession

spark = SparkSession \
  .builder \
  .appName("...") \
  .master("...") \
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
  .getOrCreate()

Scala:

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("...")
  .master("...")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()

Java:

import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession
  .builder()
  .appName("...")
  .master("...")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate();

Alternatively, you can add these configurations as command-line parameters when you submit your Spark application with spark-submit or when you start spark-shell or pyspark.

spark-submit --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"  ...
pyspark --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"  ...
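
If you set these options in more than one place, one way to keep them consistent is to hold the two settings in a single mapping and derive both forms from it. This is only a sketch: DELTA_CONF, configure, and cli_args are hypothetical names, and configure expects any builder object with a config(key, value) method, such as SparkSession.builder.

```python
# The two settings shown above, kept in one place.
DELTA_CONF = {
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
}


def configure(builder):
    """Apply the Delta Lake settings to a SparkSession builder and return it."""
    for key, value in DELTA_CONF.items():
        builder = builder.config(key, value)
    return builder


def cli_args():
    """Return the same settings as --conf arguments for spark-submit or pyspark."""
    args = []
    for key, value in DELTA_CONF.items():
        args += ["--conf", f"{key}={value}"]
    return args
```

In an application you would write configure(SparkSession.builder.appName("...")).getOrCreate(). In a launcher script you would append cli_args() to the spark-submit command line.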