Table batch reads and writes
Delta Lake supports most of the options provided by Apache Spark DataFrame read and write APIs for performing batch reads and writes on tables.
For many Delta Lake operations on tables, you enable integration with Apache Spark DataSourceV2 and Catalog APIs (available since Spark 3.0) by setting configurations when you create a new SparkSession. See Configure SparkSession.
Create a table
Delta Lake supports creating two types of tables: tables defined in the metastore and tables defined by path.
To work with metastore-defined tables, you must enable integration with Apache Spark DataSourceV2 and Catalog APIs by setting configurations when you create a new SparkSession. See Configure SparkSession.
You can create tables in the following ways.
SQL DDL commands: You can use standard SQL DDL commands supported in Apache Spark (for example, CREATE TABLE and REPLACE TABLE) to create Delta tables.
CREATE TABLE IF NOT EXISTS events (
date DATE,
eventId STRING,
eventType STRING,
data STRING)
USING DELTA
CREATE OR REPLACE TABLE events (
date DATE,
eventId STRING,
eventType STRING,
data STRING)
USING DELTA
SQL also supports creating a table at a path, without creating an entry in the Hive metastore.
-- Create or replace table with path
CREATE OR REPLACE TABLE delta.`/delta/events` (
date DATE,
eventId STRING,
eventType STRING,
data STRING)
USING DELTA
DataFrameWriter API: If you want to simultaneously create a table and insert data into it from Spark DataFrames or Datasets, you can use the Spark DataFrameWriter (Scala or Java and Python).
# Create table in the metastore using DataFrame's schema and write data to it
df.write.format("delta").saveAsTable("events")
# Create or replace table with path using DataFrame's schema and write/overwrite data to it
df.write.format("delta").mode("overwrite").save("/delta/events")
// Create table in the metastore using DataFrame's schema and write data to it
df.write.format("delta").saveAsTable("events")
// Create or replace table with path using DataFrame's schema and write/overwrite data to it
df.write.format("delta").mode("overwrite").save("/delta/events")
You can also create Delta tables using the Spark DataFrameWriterV2 API.
DeltaTableBuilder API: You can also use the DeltaTableBuilder API in Delta Lake to create tables. Compared to the DataFrameWriter APIs, this API makes it easier to specify additional information like column comments, table properties, and generated columns.
Note
This feature is new and is in Preview.
from delta.tables import DeltaTable
from pyspark.sql.types import DateType, StringType
# Create table in the metastore
DeltaTable.createIfNotExists(spark) \
  .tableName("event") \
  .addColumn("date", DateType()) \
  .addColumn("eventId", "STRING") \
  .addColumn("eventType", StringType()) \
  .addColumn("data", "STRING", comment="event data") \
  .execute()
# Create or replace table with path and add properties
DeltaTable.createOrReplace(spark) \
  .addColumn("date", DateType()) \
  .addColumn("eventId", "STRING") \
  .addColumn("eventType", StringType()) \
  .addColumn("data", "STRING", comment="event data") \
  .property("description", "table with event data") \
  .location("/delta/events") \
  .execute()
import io.delta.tables.DeltaTable
import org.apache.spark.sql.types.{DateType, StringType}
// Create table in the metastore
DeltaTable.createIfNotExists(spark)
  .tableName("event")
  .addColumn("date", DateType)
  .addColumn("eventId", "STRING")
  .addColumn("eventType", StringType)
  .addColumn(
    DeltaTable.columnBuilder("data")
      .dataType("STRING")
      .comment("event data")
      .build())
  .execute()
// Create or replace table with path and add properties
DeltaTable.createOrReplace(spark)
  .addColumn("date", DateType)
  .addColumn("eventId", "STRING")
  .addColumn("eventType", StringType)
  .addColumn(
    DeltaTable.columnBuilder("data")
      .dataType("STRING")
      .comment("event data")
      .build())
  .location("/delta/events")
  .property("description", "table with event data")
  .execute()
Refer to the API documentation for details.
Partition data
You can partition data to speed up queries or DML that have predicates involving the partition columns. To partition data when you create a Delta table, specify the columns to partition by. A common pattern is to partition by date, for example:
-- Create table in the metastore
CREATE TABLE events (
date DATE,
eventId STRING,
eventType STRING,
data STRING)
USING DELTA
PARTITIONED BY (date)
df.write.format("delta").partitionBy("date").saveAsTable("events")
DeltaTable.create(spark) \
.tableName("event") \
.addColumn("date", DateType()) \
.addColumn("eventId", "STRING") \
.addColumn("eventType", StringType()) \
.addColumn("data", "STRING") \
.partitionedBy("date") \
.execute()
df.write.format("delta").partitionBy("date").saveAsTable("events")
DeltaTable.createOrReplace(spark)
.tableName("event")
.addColumn("date", DateType)
.addColumn("eventId", "STRING")
.addColumn("eventType", StringType)
.addColumn("data", "STRING")
.partitionedBy("date")
.execute()
Control data location
For tables defined in the metastore, you can optionally specify the LOCATION as a path. Tables created with a specified LOCATION are considered unmanaged by the metastore. Unlike a managed table, where no path is specified, an unmanaged table’s files are not deleted when you DROP the table.
When you run CREATE TABLE with a LOCATION that already contains data stored using Delta Lake, Delta Lake does the following:
If you specify only the table name and location, for example:
CREATE TABLE events USING DELTA LOCATION '/delta/events'
the table in the metastore automatically inherits the schema, partitioning, and table properties of the existing data. This functionality can be used to “import” data into the metastore.
If you specify any configuration (schema, partitioning, or table properties), Delta Lake verifies that the specification exactly matches the configuration of the existing data.
Important
If the specified configuration does not exactly match the configuration of the data, Delta Lake throws an exception that describes the discrepancy.
Note
The metastore is not the source of truth about the latest information of a Delta table. In fact, the table definition in the metastore may not contain all the metadata like schema and properties. It contains the location of the table, and the table’s transaction log at the location is the source of truth. If you query the metastore from a system that is not aware of this Delta-specific customization, you may see incomplete or stale table information.
Use generated columns
Note
This feature is new and is in Preview.
Delta Lake supports generated columns, a special type of column whose values are automatically generated based on a user-specified function over other columns in the Delta table. When you write to a table with generated columns and you do not explicitly provide values for them, Delta Lake automatically computes the values. For example, you can automatically generate a date column (for partitioning the table by date) from the timestamp column; any writes into the table need only specify the data for the timestamp column. However, if you explicitly provide values for generated columns, the values must satisfy the constraint (<value> <=> <generation expression>) IS TRUE, or the write fails with an error.
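To make the write rule concrete, here is a minimal pure-Python model of how the value of a generated column is resolved. It is not Delta Lake's implementation: `null_safe_equal`, `event_date`, and `resolve_generated_value` are hypothetical names, `null_safe_equal` mimics SQL's `<=>` operator, and Python `None` stands in for SQL NULL.

```python
from datetime import date, datetime
from typing import Any, Callable, Optional


def null_safe_equal(a: Any, b: Any) -> bool:
    """Mimic SQL's <=> operator: NULL <=> NULL is true, NULL <=> x is false."""
    if a is None or b is None:
        return a is None and b is None
    return a == b


def event_date(row: dict) -> Optional[date]:
    """Python stand-in for the generation expression CAST(eventTime AS DATE)."""
    ts = row["eventTime"]
    return ts.date() if ts is not None else None


def resolve_generated_value(
    row: dict, column: str, expression: Callable[[dict], Any]
) -> Any:
    """Return the value written to a generated column (model only).

    If the writer omits the column, the value is computed from the expression.
    If the writer supplies a value, (value <=> expression) IS TRUE must hold.
    """
    expected = expression(row)
    if column not in row:
        return expected
    if not null_safe_equal(row[column], expected):
        raise ValueError(
            f"value {row[column]!r} for generated column {column!r} "
            f"does not match its generation expression ({expected!r})"
        )
    return row[column]


# Usage: the date column is derived from eventTime when the writer omits it.
print(resolve_generated_value({"eventTime": datetime(2021, 3, 1, 22, 15)}, "date", event_date))
# prints 2021-03-01
```

Supplying `"date": date(2021, 3, 2)` for the same row raises an error in this model, just as a mismatching explicit value fails the write.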
The following example shows how to create a table with generated columns:
from delta.tables import DeltaTable
from pyspark.sql.types import DateType, LongType, TimestampType
DeltaTable.create(spark) \
  .tableName("event") \
  .addColumn("eventId", LongType()) \
  .addColumn("eventTime", TimestampType()) \
  .addColumn("date", DateType(), generatedAlwaysAs="CAST(eventTime AS DATE)") \
  .partitionedBy("date") \
  .execute()
DeltaTable.create(spark)
.tableName("events")
.addColumn("eventId", LongType)
.addColumn("eventTime", TimestampType)
.addColumn(
DeltaTable.columnBuilder("date")
.dataType(DateType)
.generatedAlwaysAs("CAST(eventTime AS DATE)")
.build()
)
.partitionedBy("date")
.execute()
Generated columns are stored as if they were normal columns. That is, they occupy storage.
The following restrictions apply to generated columns:
- A generation expression can use any SQL functions in Spark that always return the same result when given the same argument values, except the following types of functions:
  - User-defined functions.
  - Aggregate functions.
  - Window functions.
  - Functions returning multiple rows.
- MERGE operations do not support generated columns.
Read a table
You can load a Delta table as a DataFrame by specifying a table name or a path:
SELECT * FROM events -- query table in the metastore
SELECT * FROM delta.`/delta/events` -- query table by path
spark.table("events") # query table in the metastore
spark.read.format("delta").load("/delta/events") # query table by path
spark.table("events") // query table in the metastore
spark.read.format("delta").load("/delta/events") // query table by path
import io.delta.implicits._
spark.read.delta("/delta/events")
The DataFrame returned automatically reads the most recent snapshot of the table for any query; you never need to run REFRESH TABLE. Delta Lake automatically uses partitioning and statistics to read the minimum amount of data when there are applicable predicates in the query.
Query an older snapshot of a table (time travel)
Delta Lake time travel allows you to query an older snapshot of a Delta table. Time travel has many use cases, including:
- Re-creating analyses, reports, or outputs (for example, the output of a machine learning model). This could be useful for debugging or auditing, especially in regulated industries.
- Writing complex temporal queries.
- Fixing mistakes in your data.
- Providing snapshot isolation for a set of queries for fast changing tables.
This section describes the supported methods for querying older versions of tables, explains data retention concerns, and provides examples.
Syntax
This section shows how to query an older version of a Delta table.
SELECT * FROM events TIMESTAMP AS OF '2018-10-18T22:15:12.013Z'
SELECT * FROM delta.`/delta/events` VERSION AS OF 123
DataFrameReader options
DataFrameReader options allow you to create a DataFrame from a Delta table that is fixed to a specific version of the table.
df1 = spark.read.format("delta").option("timestampAsOf", timestamp_string).load("/delta/events")
df2 = spark.read.format("delta").option("versionAsOf", version).load("/delta/events")
For timestamp_string, only date or timestamp strings are accepted, for example, "2019-01-01" and "2019-01-01T00:00:00.000Z".
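If the timestamp comes from Python code, a small helper can produce a string in the second format shown above. `to_timestamp_as_of` is a hypothetical name; the sketch assumes you want the instant expressed in UTC with millisecond precision, and it rejects naive datetimes so that the time zone is never guessed.

```python
from datetime import datetime, timedelta, timezone


def to_timestamp_as_of(moment: datetime) -> str:
    """Format an aware datetime like "2019-01-01T00:00:00.000Z" (UTC, milliseconds)."""
    if moment.tzinfo is None or moment.utcoffset() is None:
        raise ValueError("pass a timezone-aware datetime so the instant is unambiguous")
    utc = moment.astimezone(timezone.utc)
    millis = utc.microsecond // 1000  # truncate to milliseconds
    return utc.strftime("%Y-%m-%dT%H:%M:%S") + f".{millis:03d}Z"


# Usage: 01:00:00.5 at UTC+1 is half a second past midnight UTC.
plus_one = timezone(timedelta(hours=1))
print(to_timestamp_as_of(datetime(2019, 1, 1, 1, 0, 0, 500000, tzinfo=plus_one)))
# prints 2019-01-01T00:00:00.500Z
```

The result can then be passed as the value of the timestampAsOf option shown above.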
A common pattern is to use the latest state of the Delta table throughout the execution of a job to update downstream applications.
Because Delta tables update automatically, a DataFrame loaded from a Delta table may return different results across invocations if the underlying data is updated. By using time travel, you can fix the data returned by the DataFrame across invocations:
latest_version = spark.sql("SELECT max(version) FROM (DESCRIBE HISTORY delta.`/delta/events`)").collect()
df = spark.read.format("delta").option("versionAsOf", latest_version[0][0]).load("/delta/events")
Data retention
By default, Delta tables retain the commit history for 30 days. This means that you can specify a version from up to 30 days ago. However, this holds only if you have not run VACUUM on your Delta table. If you run VACUUM, you lose the ability to go back to a version older than the default 7-day data retention period.
You can configure retention periods using the following table properties:
- delta.logRetentionDuration = "interval <interval>": controls how long the history for a table is kept. Each time a checkpoint is written, Delta Lake automatically cleans up log entries older than the retention interval. If you set this config to a large enough value, many log entries are retained. This should not impact performance, because operations against the log are constant time. Operations on history are parallel but become more expensive as the log size increases. The default is interval 30 days.
- delta.deletedFileRetentionDuration = "interval <interval>": controls how long ago a file must have been deleted before it becomes a candidate for VACUUM. The default is interval 7 days. For access to 30 days of historical data, set delta.deletedFileRetentionDuration = "interval 30 days". This setting may increase your storage costs.
Note
VACUUM doesn’t clean up log files; log files are automatically cleaned up after checkpoints are written.
To time travel to a previous version, you must retain both the log and the data files for that version.
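The interplay between the two properties can be sketched as a rough, conservative check. This pure-Python model is not how Delta Lake decides anything internally: the names are hypothetical, `parse_interval` handles only a subset of interval strings (`interval <n> days|hours|weeks`), and the age of a version's commit stands in for when its log entries and data files become eligible for cleanup. A file referenced by a version can only have been removed at or after that version's commit, so `True` means both the log and the data files are still guaranteed to exist; `False` means they may already be gone.

```python
import re
from datetime import datetime, timedelta

_INTERVAL = re.compile(r"^interval\s+(\d+)\s+(days?|hours?|weeks?)$")


def parse_interval(value: str) -> timedelta:
    """Parse a subset of interval strings, for example "interval 30 days"."""
    match = _INTERVAL.match(value.strip().lower())
    if match is None:
        raise ValueError(f"unsupported interval string: {value!r}")
    amount, unit = int(match.group(1)), match.group(2).rstrip("s")
    return {
        "day": timedelta(days=amount),
        "hour": timedelta(hours=amount),
        "week": timedelta(weeks=amount),
    }[unit]


def time_travel_guaranteed(
    commit_time: datetime,
    now: datetime,
    vacuumed: bool,
    log_retention: str = "interval 30 days",
    deleted_file_retention: str = "interval 7 days",
) -> bool:
    """Rough, conservative check that a version's log and data files both remain."""
    age = now - commit_time
    if age > parse_interval(log_retention):
        return False  # log entries may have been cleaned up at a checkpoint
    if vacuumed and age > parse_interval(deleted_file_retention):
        return False  # VACUUM may have deleted data files this version needs
    return True
```

With both properties set to "interval 30 days", the two limits coincide, which matches the advice above for keeping 30 days of history.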
Examples
Fix accidental deletes to a table for the user 111:
INSERT INTO my_table
SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1)
WHERE userId = 111
Fix accidental incorrect updates to a table:
MERGE INTO my_table target USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source ON source.userId = target.userId WHEN MATCHED THEN UPDATE SET *
Query the number of new customers added over the last week:
SELECT (SELECT count(DISTINCT userId) FROM my_table) - (
SELECT count(DISTINCT userId)
FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
Write to a table
Append
Using append mode, you can atomically add new data to an existing Delta table:
INSERT INTO events SELECT * FROM newEvents
df.write.format("delta").mode("append").save("/delta/events")
df.write.format("delta").mode("append").saveAsTable("events")
import io.delta.implicits._
df.write.mode("append").delta("/delta/events")
Overwrite
To atomically replace all of the data in a table, you can use overwrite mode:
INSERT OVERWRITE TABLE events SELECT * FROM newEvents
df.write.format("delta").mode("overwrite").save("/delta/events")
df.write.format("delta").mode("overwrite").saveAsTable("events")
import io.delta.implicits._
df.write.mode("overwrite").delta("/delta/events")
Using DataFrames, you can also selectively overwrite only the data that matches predicates over partition columns. The following command atomically replaces the data for January 2017 with the data in df:
df.write \
.format("delta") \
.mode("overwrite") \
.option("replaceWhere", "date >= '2017-01-01' AND date <= '2017-01-31'") \
.save("/delta/events")
df.write
.format("delta")
.mode("overwrite")
.option("replaceWhere", "date >= '2017-01-01' AND date <= '2017-01-31'")
.save("/delta/events")
This sample code writes out the data in df, validates that it all falls within the specified partitions, and performs an atomic replacement.
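The validate-then-replace behavior can be modeled on in-memory rows. This is a minimal sketch, not Delta Lake's implementation: `replace_where` and `january_2017` are hypothetical helpers, a Python callable stands in for the SQL predicate, and returning a new list instead of mutating the input plays the role of the single atomic commit.

```python
from typing import Callable, Dict, List

Row = Dict[str, object]


def replace_where(
    table: List[Row], new_rows: List[Row], predicate: Callable[[Row], bool]
) -> List[Row]:
    """Return the table contents after a replaceWhere overwrite (model only)."""
    # 1. Validate: if any new row falls outside the predicate, the whole write fails.
    outside = [row for row in new_rows if not predicate(row)]
    if outside:
        raise ValueError(f"{len(outside)} row(s) do not match the replaceWhere predicate")
    # 2. Replace: keep existing rows outside the predicate, then add the new rows.
    kept = [row for row in table if not predicate(row)]
    return kept + list(new_rows)


def january_2017(row: Row) -> bool:
    # Same condition as "date >= '2017-01-01' AND date <= '2017-01-31'" on ISO date strings.
    return "2017-01-01" <= str(row["date"]) <= "2017-01-31"


table = [{"date": "2016-12-31"}, {"date": "2017-01-05"}, {"date": "2017-02-01"}]
print(replace_where(table, [{"date": "2017-01-10"}], january_2017))
```

Rows dated outside January 2017 survive untouched, and a batch containing, say, a February row is rejected as a whole rather than partially applied.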
Note
Unlike the file APIs in Apache Spark, Delta Lake remembers and enforces the schema of a table. This means that by default overwrites do not replace the schema of an existing table.
For Delta Lake support for updating tables, see Table deletes, updates, and merges.
Set user-defined commit metadata
You can specify user-defined strings as metadata in commits made by these operations, either using the DataFrameWriter option userMetadata or the SparkSession configuration spark.databricks.delta.commitInfo.userMetadata. If both are specified, the option takes precedence. This user-defined metadata is readable in the output of the history operation (DESCRIBE HISTORY).
SET spark.databricks.delta.commitInfo.userMetadata=overwritten-for-fixing-incorrect-data
INSERT OVERWRITE events SELECT * FROM newEvents
df.write.format("delta") \
.mode("overwrite") \
.option("userMetadata", "overwritten-for-fixing-incorrect-data") \
.save("/delta/events")
df.write.format("delta")
.mode("overwrite")
.option("userMetadata", "overwritten-for-fixing-incorrect-data")
.save("/delta/events")
Schema validation
Delta Lake automatically validates that the schema of the DataFrame being written is compatible with the schema of the table. Delta Lake uses the following rules to determine whether a write from a DataFrame to a table is compatible:
- All DataFrame columns must exist in the target table. If there are columns in the DataFrame not present in the table, an exception is raised. Columns present in the table but not in the DataFrame are set to null.
- DataFrame column data types must match the column data types in the target table. If they don’t match, an exception is raised.
- DataFrame column names cannot differ only by case. This means that you cannot have columns such as “Foo” and “foo” defined in the same table. While you can use Spark in case sensitive or insensitive (default) mode, Parquet is case sensitive when storing and returning column information. Delta Lake is case-preserving but insensitive when storing the schema and has this restriction to avoid potential mistakes, data corruption, or loss issues.
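The three rules can be checked with a small pure-Python model over flat schemas. This is a hypothetical sketch, not Delta Lake's validator: `validate_write_schema` is an illustrative name, names are resolved case-insensitively (assuming Spark's default mode), type names are compared as plain strings, and nested fields are ignored.

```python
from typing import Dict, List, Tuple

Schema = List[Tuple[str, str]]  # (column name, type name), e.g. ("eventId", "string")


def validate_write_schema(table: Schema, dataframe: Schema) -> List[str]:
    """Check the three rules above; return table columns that will be written as null."""
    # Rule 3: DataFrame column names must not differ only by case.
    lowered = [name.lower() for name, _ in dataframe]
    present = set(lowered)
    if len(present) != len(lowered):
        raise ValueError("DataFrame has column names that differ only by case")

    table_types: Dict[str, str] = {name.lower(): dtype for name, dtype in table}
    for name, dtype in dataframe:
        # Rule 1: every DataFrame column must exist in the table (case-insensitive match).
        if name.lower() not in table_types:
            raise ValueError(f"column {name!r} does not exist in the target table")
        # Rule 2: the data types must match.
        expected = table_types[name.lower()]
        if expected != dtype:
            raise ValueError(f"column {name!r} has type {dtype}, table expects {expected}")

    # Table columns missing from the DataFrame are filled with null.
    return [name for name, _ in table if name.lower() not in present]


table_schema = [("date", "date"), ("eventId", "string"), ("data", "string")]
print(validate_write_schema(table_schema, [("EventId", "string"), ("date", "date")]))
```

Here the write is accepted and the data column is written as null; adding an unknown column, changing a type, or passing both data and DATA raises an error.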
Delta Lake supports DDL to add new columns explicitly and the ability to update the schema automatically.
If you specify other options, such as partitionBy, in combination with append mode, Delta Lake validates that they match and throws an error for any mismatch. When partitionBy is not present, appends automatically follow the partitioning of the existing data.
Update table schema
Delta Lake lets you update the schema of a table. The following types of changes are supported:
- Adding new columns (at arbitrary positions)
- Reordering existing columns
You can make these changes explicitly using DDL or implicitly using DML.
Important
When you update a Delta table schema, streams that read from that table terminate. If you want the stream to continue, you must restart it.
Explicitly update schema
You can use the following DDL to explicitly change the schema of a table.
Add columns
ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
By default, nullability is true.
To add a column to a nested field, use:
ALTER TABLE table_name ADD COLUMNS (col_name.nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
Example
If the schema before running ALTER TABLE boxes ADD COLUMNS (colB.nested STRING AFTER field1) is:
- root
| - colA
| - colB
| +-field1
| +-field2
the schema after is:
- root
| - colA
| - colB
| +-field1
| +-nested
| +-field2
Note
Adding nested columns is supported only for structs. Arrays and maps are not supported.
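The colB.nested example above can be reproduced with a small pure-Python model of the schema tree. `add_column` is a hypothetical helper, not part of any Delta Lake API: a schema is modeled as a list of `(name, children)` pairs where `children` is `None` for a leaf and a list for a struct, names are matched case-insensitively, and, like the DDL, it only descends into structs.

```python
import copy
from typing import List, Optional, Tuple

Field = Tuple[str, Optional[list]]  # (name, nested fields); nested is None for a leaf


def _position(fields: List[Field], name: str) -> int:
    for index, (field_name, _) in enumerate(fields):
        if field_name.lower() == name.lower():
            return index
    raise ValueError(f"no field named {name!r}")


def add_column(schema: List[Field], path: str, after: Optional[str] = None,
               first: bool = False) -> List[Field]:
    """Model of ADD COLUMNS (path ... [FIRST|AFTER after]); returns a new schema."""
    *parents, new_name = path.split(".")
    result = copy.deepcopy(schema)  # leave the input schema untouched
    fields = result
    for parent in parents:
        children = fields[_position(fields, parent)][1]
        if children is None:
            raise ValueError(f"{parent!r} is not a struct")
        fields = children
    if any(name.lower() == new_name.lower() for name, _ in fields):
        raise ValueError(f"{new_name!r} already exists")
    if first:
        fields.insert(0, (new_name, None))
    elif after is not None:
        fields.insert(_position(fields, after) + 1, (new_name, None))
    else:
        fields.append((new_name, None))  # default: add at the end of the struct
    return result


before = [("colA", None), ("colB", [("field1", None), ("field2", None)])]
print(add_column(before, "colB.nested", after="field1"))
```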
Change column comment or ordering
ALTER TABLE table_name CHANGE [COLUMN] col_name col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name]
To change a column in a nested field, use:
ALTER TABLE table_name CHANGE [COLUMN] col_name.nested_col_name nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name]
Replace columns
ALTER TABLE table_name REPLACE COLUMNS (col_name1 col_type1 [COMMENT col_comment1], ...)
Example
When running the following DDL:
ALTER TABLE boxes REPLACE COLUMNS (colC STRING, colB STRUCT<field2:STRING, nested:STRING, field1:STRING>, colA STRING)
if the schema before is:
- root
| - colA
| - colB
| +-field1
| +-field2
the schema after is:
- root
| - colC
| - colB
| +-field2
| +-nested
| +-field1
| - colA
Change column type or name
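Changing a column’s type or name or dropping a column requires rewriting the table. To do this, read the table into a DataFrame, apply the change, and overwrite the table with the overwriteSchema option set to true, as described in Replace table schema.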
Automatic schema update
Delta Lake can automatically update the schema of a table as part of a DML transaction (either appending or overwriting), and make the schema compatible with the data being written.
Add columns
Columns that are present in the DataFrame but missing from the table are automatically added as part of a write transaction when:
- write or writeStream have .option("mergeSchema", "true")
- spark.databricks.delta.schema.autoMerge.enabled is true
When both options are specified, the option from the DataFrameWriter takes precedence. The added columns are appended to the end of the struct they are present in. Case is preserved when appending a new column.
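The precedence rule and the append-at-the-end behavior can be sketched as follows. This is a pure-Python model with hypothetical names (`merge_schema_enabled`, `merged_columns`), not Delta Lake code; it works on top-level column names only and treats option values as the strings "true" or "false".

```python
from typing import Dict, List

AUTO_MERGE_CONF = "spark.databricks.delta.schema.autoMerge.enabled"


def merge_schema_enabled(writer_options: Dict[str, str], session_conf: Dict[str, str]) -> bool:
    """The DataFrameWriter option wins; otherwise fall back to the session setting."""
    if "mergeSchema" in writer_options:
        return writer_options["mergeSchema"].lower() == "true"
    return session_conf.get(AUTO_MERGE_CONF, "false").lower() == "true"


def merged_columns(table_columns: List[str], df_columns: List[str], enabled: bool) -> List[str]:
    """Append DataFrame-only columns to the end, preserving their case (top level only)."""
    known = {name.lower() for name in table_columns}
    extra = [name for name in df_columns if name.lower() not in known]
    if extra and not enabled:
        raise ValueError(f"columns {extra} are not in the table and schema merging is off")
    return table_columns + extra


enabled = merge_schema_enabled({}, {AUTO_MERGE_CONF: "true"})
print(merged_columns(["date", "eventId"], ["eventId", "Country", "date"], enabled))
```

The new Country column keeps its case and lands after the existing columns; an explicit `mergeSchema` of "false" on the writer would override the session setting and make the same write fail.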
NullType columns
Because Parquet doesn’t support NullType, NullType columns are dropped from the DataFrame when writing into Delta tables, but are still stored in the schema. When a different data type is received for that column, Delta Lake merges the schema to the new data type. If Delta Lake receives a NullType for an existing column, the old schema is retained and the new column is dropped during the write.
NullType in streaming is not supported. Since you must set schemas when using streaming, this should be very rare. NullType is also not accepted for complex types such as ArrayType and MapType.
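The two NullType rules for a single column can be modeled as below. This is a hypothetical sketch, not Delta Lake's code: types are plain strings, `"null"` stands in for `NullType`, and any other type change is simply treated as an error, which is a simplification.

```python
NULL_TYPE = "null"  # stand-in for Spark's NullType in this model


def merge_column_type(existing: str, incoming: str) -> str:
    """Resulting type of a column when a write brings `incoming` for it (model only)."""
    if incoming == NULL_TYPE:
        return existing  # NullType received: keep the old type, drop the incoming column
    if existing == NULL_TYPE:
        return incoming  # column was NullType: merge to the new data type
    if existing != incoming:
        raise ValueError(f"cannot merge {existing} with {incoming}")
    return existing


print(merge_column_type(NULL_TYPE, "string"), merge_column_type("string", NULL_TYPE))
```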
Replace table schema
By default, overwriting the data in a table does not overwrite the schema. When overwriting a table using mode("overwrite") without replaceWhere, you may still want to overwrite the schema of the data being written. You replace the schema and partitioning of the table by setting the overwriteSchema option to true:
df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("/delta/events")
Views on tables
Delta Lake supports the creation of views on top of Delta tables just like you might with a data source table.
The core challenge when you operate with views is resolving the schemas. If you alter a Delta table schema, you must recreate derivative views to account for any additions to the schema. For instance, if you add a new column to a Delta table, you must make sure that this column is available in the appropriate views built on top of that base table.
Table properties
You can store your own metadata as a table property using TBLPROPERTIES in CREATE and ALTER.
TBLPROPERTIES are stored as part of Delta table metadata. You cannot define new TBLPROPERTIES in a CREATE statement if a Delta table already exists in a given location. See Control data location for more details.
In addition, to tailor behavior and performance, Delta Lake supports certain Delta table properties:
- Block deletes and updates in a Delta table: delta.appendOnly=true.
- Configure the time travel retention properties: delta.logRetentionDuration=<interval-string> and delta.deletedFileRetentionDuration=<interval-string>. For details, see Data retention.
Note
- Modifying a Delta table property is a write operation that will conflict with other concurrent write operations, causing them to fail. We recommend that you modify a table property only when there are no concurrent write operations on the table.
You can also set delta.-prefixed properties during the first commit to a Delta table using Spark configurations. For example, to initialize a Delta table with the property delta.appendOnly=true, set the Spark configuration spark.databricks.delta.properties.defaults.appendOnly to true:
spark.sql("SET spark.databricks.delta.properties.defaults.appendOnly = true")
spark.conf.set("spark.databricks.delta.properties.defaults.appendOnly", "true")
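The naming pattern in this example can be captured in a small helper. `default_conf_key` is hypothetical, and the sketch assumes that the same prefix mapping used for delta.appendOnly applies to other delta.-prefixed properties; check the documentation for your Delta Lake version before relying on that.

```python
DEFAULTS_PREFIX = "spark.databricks.delta.properties.defaults."


def default_conf_key(table_property: str) -> str:
    """Map a delta.-prefixed table property to its session-level default key."""
    if not table_property.startswith("delta."):
        raise ValueError(f"expected a delta.-prefixed property, got {table_property!r}")
    return DEFAULTS_PREFIX + table_property[len("delta."):]


print(default_conf_key("delta.appendOnly"))
# prints spark.databricks.delta.properties.defaults.appendOnly
```

With a live session, the result would be used as `spark.conf.set(default_conf_key("delta.appendOnly"), "true")`.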
Table metadata
Delta Lake has rich features for exploring table metadata.
It supports DESCRIBE TABLE, SHOW COLUMNS, SHOW PARTITIONS, and so on.
It also provides the following unique commands:
DESCRIBE DETAIL
Provides information about schema, partitioning, table size, and so on. For details, see Retrieve Delta table details.
DESCRIBE HISTORY
Provides provenance information, including the operation, user, and so on, and operation metrics for each write to a table. Table history is retained for 30 days. For details, see Retrieve Delta table history.
Configure SparkSession
For many Delta Lake operations, you enable integration with Apache Spark DataSourceV2 and Catalog APIs (available since Spark 3.0) by setting the following configurations when you create a new SparkSession.
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("...") \
.master("...") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder()
.appName("...")
.master("...")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.getOrCreate()
import org.apache.spark.sql.SparkSession;
SparkSession spark = SparkSession
.builder()
.appName("...")
.master("...")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.getOrCreate();
Alternatively, you can add configurations when submitting your Spark application using spark-submit or when starting spark-shell or pyspark by specifying them as command-line parameters.
spark-submit --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" ...
pyspark --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
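When launching from a Python script (for example with `subprocess`), the same two settings can be kept in one mapping and expanded into repeated `--conf` arguments. `conf_args` and `app.py` are hypothetical; the mapping holds exactly the two configurations shown above.

```python
import shlex
from typing import Dict, List

DELTA_CONFS: Dict[str, str] = {
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
}


def conf_args(confs: Dict[str, str]) -> List[str]:
    """Turn a config mapping into repeated --conf key=value arguments."""
    args: List[str] = []
    for key, value in confs.items():
        args += ["--conf", f"{key}={value}"]
    return args


command = ["spark-submit", *conf_args(DELTA_CONFS), "app.py"]
print(shlex.join(command))  # shell-quoted form, for logging or copy-paste
```

Passing `command` as a list (rather than a single string) to `subprocess.run` avoids any shell-quoting issues.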