Delta Lake quickstart

This guide helps you quickly explore the main features of Delta Lake. It provides code snippets that show how to read from and write to Delta tables from interactive, batch, and streaming queries.

Set up Apache Spark with Delta Lake

Delta Lake requires Apache Spark version 2.4.2 or above. Follow the instructions below to set up Delta Lake with Spark. You can run the steps in this guide on your local machine in the following two ways:

  1. Run interactively: Start the Spark shell (Scala or Python) with Delta Lake and run the code snippets interactively in the shell.
  2. Run as a project: Set up a Maven or SBT project (Scala or Java) with Delta Lake, copy the code snippets into a source file, and run the project.

Set up interactive shell

To use Delta Lake interactively within the Spark shell you need a local installation of Apache Spark. Depending on whether you want to use Python or Scala, you can set up either PySpark or the Spark shell, respectively.

PySpark

If you need to install or upgrade PySpark, run:

pip install --upgrade pyspark

Run PySpark with the Delta Lake package:

pyspark --packages io.delta:delta-core_2.11:0.6.1

Spark Scala Shell

Download the latest version of Apache Spark (2.4.2 or above) by following instructions from Downloading Spark, either using pip or by downloading and extracting the archive and running spark-shell in the extracted directory.

Run spark-shell with the Delta Lake package:

bin/spark-shell --packages io.delta:delta-core_2.11:0.6.1

Note

If you are seeing the following error, make sure that Apache Spark and delta-core is built for the same Scala version (2.11 or 2.12). The pre-built distributions of Apache Spark 2.4.3 from the download page are built with Scala 2.11:

java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider org.apache.spark.sql.delta.sources.DeltaDataSource could not be instantiated

See this issue for details.

Set up project

If you want to build a project using Delta Lake binaries from Maven Central Repository, you can use the following Maven coordinates.

Maven

You include Delta Lake in your Maven project by adding it as a dependency in your POM file. Delta Lake is cross compiled with Scala versions 2.11 and 2.12; choose the version that matches your project. If you are writing a Java project, you can use either version.

<dependency>
  <groupId>io.delta</groupId>
  <artifactId>delta-core_2.11</artifactId>
  <version>0.6.1</version>
</dependency>

SBT

You include Delta Lake in your SBT project by adding the following line to your build.sbt file:

libraryDependencies += "io.delta" %% "delta-core" % "0.6.1"

Python

For setting up a Python project (e.g., for unit testing), you have to first start the Spark session first with the Delta package and then import the Python APIs.

spark = pyspark.sql.SparkSession.builder.appName("MyApp") \
    .config("spark.jars.packages", "io.delta:delta-core_2.11:0.6.1") \
    .getOrCreate()

from delta.tables import *

Create a table

To create a Delta table, write a DataFrame out in the delta format. You can use existing Spark SQL code and change the format from parquet, csv, json, and so on, to delta.

Python

data = spark.range(0, 5)
data.write.format("delta").save("/tmp/delta-table")

Scala

val data = spark.range(0, 5)
data.write.format("delta").save("/tmp/delta-table")

Java

import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

SparkSession spark = ...   // create SparkSession

Dataset<Row> data = data = spark.range(0, 5);
data.write().format("delta").save("/tmp/delta-table");

These operations create a new Delta table using the schema that was inferred from your DataFrame. For the full set of options available when you create a new Delta table, see Create a table and Write to a table.

Note

This quickstart uses local paths for Delta table locations. For configuring HDFS or cloud storage for Delta tables, see delta-storage.

Read data

You read data in your Delta table by specifying the path to the files: "/tmp/delta-table":

Python

df = spark.read.format("delta").load("/tmp/delta-table")
df.show()

Scala

val df = spark.read.format("delta").load("/tmp/delta-table")
df.show()

Java

Dataset<Row> df = spark.read().format("delta").load("/tmp/delta-table");
df.show();

Update table data

Delta Lake supports several operations to modify tables using standard DataFrame APIs. This example runs a batch job to overwrite the data in the table:

Overwrite

Python

data = spark.range(5, 10)
data.write.format("delta").mode("overwrite").save("/tmp/delta-table")

Scala

val data = spark.range(5, 10)
data.write.format("delta").mode("overwrite").save("/tmp/delta-table")
df.show()

Java

Dataset<Row> data = data = spark.range(5, 10);
data.write().format("delta").mode("overwrite").save("/tmp/delta-table");

If you read this table again, you should see only the values 5-9 you have added because you overwrote the previous data.

Conditional update without overwrite

Delta Lake provides programmatic APIs to conditional update, delete, and merge (upsert) data into tables. Here are a few examples.

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, "/tmp/delta-table")

# Update every even value by adding 100 to it
deltaTable.update(
  condition = expr("id % 2 == 0"),
  set = { "id": expr("id + 100") })

# Delete every even value
deltaTable.delete(condition = expr("id % 2 == 0"))

# Upsert (merge) new data
newData = spark.range(0, 20)

deltaTable.alias("oldData") \
  .merge(
    newData.alias("newData"),
    "oldData.id = newData.id") \
  .whenMatchedUpdate(set = { "id": col("newData.id") }) \
  .whenNotMatchedInsert(values = { "id": col("newData.id") }) \
  .execute()

deltaTable.toDF().show()

Scala

import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTable = DeltaTable.forPath("/tmp/delta-table")

// Update every even value by adding 100 to it
deltaTable.update(
  condition = expr("id % 2 == 0"),
  set = Map("id" -> expr("id + 100")))

// Delete every even value
deltaTable.delete(condition = expr("id % 2 == 0"))

// Upsert (merge) new data
val newData = spark.range(0, 20).toDF

deltaTable.as("oldData")
  .merge(
    newData.as("newData"),
    "oldData.id = newData.id")
  .whenMatched
  .update(Map("id" -> col("newData.id")))
  .whenNotMatched
  .insert(Map("id" -> col("newData.id")))
  .execute()

deltaTable.toDF.show()

Java

import io.delta.tables.*;
import org.apache.spark.sql.functions;
import java.util.HashMap;

DeltaTable deltaTable = DeltaTable.forPath("/tmp/delta-table");

// Update every even value by adding 100 to it
deltaTable.update(
  functions.expr("id % 2 == 0"),
  new HashMap<String, Column>() {{
    put("id", functions.expr("id + 100"));
  }}
);

// Delete every even value
deltaTable.delete(condition = functions.expr("id % 2 == 0"));

// Upsert (merge) new data
Dataset<Row> newData = spark.range(0, 20).toDF();

deltaTable.as("oldData")
  .merge(
    newData.as("newData"),
    "oldData.id = newData.id")
  .whenMatched()
  .update(
    new HashMap<String, Column>() {{
      put("id", functions.col("newData.id"));
    }})
  .whenNotMatched()
  .insertExpr(
    new HashMap<String, Column>() {{
      put("id", functions.col("newData.id"));
    }})
  .execute();

deltaTable.toDF().show();

You should see that some of the existing rows have been updated and new rows have been inserted.

For more information on these operations, see Table deletes, updates, and merges.

Read older versions of data using time travel

You can query previous snapshots of your Delta table by using a feature called time travel. If you want to access the data that you overwrote, you can query a snapshot of the table before you overwrote the first set of data using the versionAsOf option.

Python

df = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta-table")
df.show()

Scala

val df = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta-table")
df.show()

Java

Dataset<Row> df = spark.read().format("delta").option("versionAsOf", 0).load("/tmp/delta-table");
df.show();

You should see the first set of data, from before you overwrote it. Time travel is an extremely powerful feature that takes advantage of the power of the Delta Lake transaction log to access data that is no longer in the table. Removing the version 0 option (or specifying version 1) would let you see the newer data again. For more information, see Query an older snapshot of a table (time travel).

Write a stream of data to a table

You can also write to a Delta table using Structured Streaming. The Delta Lake transaction log guarantees exactly-once processing, even when there are other streams or batch queries running concurrently against the table. By default, streams run in append mode, which adds new records to the table:

Python

streamingDf = spark.readStream.format("rate").load()
stream = streamingDf.selectExpr("value as id").writeStream.format("delta").option("checkpointLocation", "/tmp/checkpoint").start("/tmp/delta-table")

Scala

val streamingDf = spark.readStream.format("rate").load()
val stream = streamingDf.select($"value" as "id").writeStream.format("delta").option("checkpointLocation", "/tmp/checkpoint").start("/tmp/delta-table")

Java

import org.apache.spark.sql.streaming.StreamingQuery;

Dataset<Row> streamingDf = spark.readStream().format("rate").load();
StreamingQuery stream = streamingDf.selectExpr("value as id").writeStream().format("delta").option("checkpointLocation", "/tmp/checkpoint").start("/tmp/delta-table");

While the stream is running, you can read the table using the earlier commands.

Note

If you’re running this in a shell, you may see the streaming task progress, which make it hard to type commands in that shell. It may be useful to start another shell in a new terminal for querying the table.

You can stop the stream by running stream.stop() in the same terminal that started the stream.

For more information about Delta Lake integration with Structured Streaming, see Table streaming reads and writes.

Read a stream of changes from a table

While the stream is writing to the Delta table, you can also read from that table as streaming source. For example, you can start another streaming query that prints all the changes made to the Delta table.

Python

stream2 = spark.readStream.format("delta").load("/tmp/delta-table").writeStream.format("console").start()

Scala

val stream2 = spark.readStream.format("delta").load("/tmp/delta-table").writeStream.format("console").start()

Java

StreamingQuery stream2 = spark.readStream().format("delta").load("/tmp/delta-table").writeStream().format("console").start();