Table Utility Commands
Delta tables support a number of utility commands.
Vacuum
You can remove files that are no longer referenced by a Delta table and are older than the retention threshold by running the vacuum command on the table. vacuum is not triggered automatically. The default retention threshold for the files is 7 days.
Important
The ability to time travel back to a version older than the retention period is lost after running vacuum.
SQL
VACUUM '/data/events' -- vacuum files not required by versions older than the default retention period
VACUUM delta.`/data/events/`
VACUUM delta.`/data/events/` RETAIN 100 HOURS -- vacuum files not required by versions more than 100 hours old
See Enable SQL commands within Apache Spark for the steps to enable support for SQL commands in Apache Spark.
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, pathToTable)
deltaTable.vacuum() // vacuum files not required by versions older than the default retention period
deltaTable.vacuum(100) // vacuum files not required by versions more than 100 hours old
Java
import io.delta.tables.*;
import org.apache.spark.sql.functions;
DeltaTable deltaTable = DeltaTable.forPath(spark, pathToTable);
deltaTable.vacuum(); // vacuum files not required by versions older than the default retention period
deltaTable.vacuum(100); // vacuum files not required by versions more than 100 hours old
Python
from delta.tables import *
deltaTable = DeltaTable.forPath(spark, pathToTable)
deltaTable.vacuum() # vacuum files not required by versions older than the default retention period
deltaTable.vacuum(100) # vacuum files not required by versions more than 100 hours old
See the Delta Lake API Reference for details.
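The selection rule described at the start of this section (a file is removed only if it is both unreferenced and older than the retention threshold) can be illustrated with a small, Spark-free sketch. This is a simplified model for intuition, not Delta Lake's implementation; the function name `vacuum_candidates`, the file names, and the idea of passing modification times in a dict are all assumptions made for the example.

```python
from datetime import datetime, timedelta

DEFAULT_RETENTION_HOURS = 7 * 24  # the 7-day default mentioned above


def vacuum_candidates(files, referenced, now, retention_hours=DEFAULT_RETENTION_HOURS):
    """Return the paths this simplified rule would remove.

    files: dict mapping file path -> last-modified datetime (hypothetical input)
    referenced: set of paths still referenced by the table
    """
    cutoff = now - timedelta(hours=retention_hours)
    return sorted(
        path
        for path, modified in files.items()
        if path not in referenced and modified < cutoff
    )


# Hypothetical file listing, for illustration only.
now = datetime(2019, 8, 10, 12, 0, 0)
files = {
    "part-0000.parquet": datetime(2019, 7, 29, 14, 1, 40),  # old and unreferenced
    "part-0001.parquet": datetime(2019, 7, 29, 14, 7, 47),  # old but still referenced
    "part-0002.parquet": datetime(2019, 8, 9, 9, 0, 0),     # unreferenced but recent
}
referenced = {"part-0001.parquet"}

print(vacuum_candidates(files, referenced, now))      # ['part-0000.parquet']
print(vacuum_candidates(files, referenced, now, 24))  # ['part-0000.parquet', 'part-0002.parquet']
```

Shortening the threshold from the default to 24 hours makes the recently written, unreferenced file eligible as well, which is the situation the warning in this section is about.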
Warning
We do not recommend that you set a retention interval shorter than 7 days, because old snapshots and uncommitted files can still be in use by concurrent readers or writers to the table. If VACUUM cleans up active files, concurrent readers can fail or, worse, tables can be corrupted when VACUUM deletes files that have not yet been committed.
Delta Lake has a safety check to prevent you from running a dangerous VACUUM command. If you are certain that there are no operations being performed on this table that take longer than the retention interval you plan to specify, you can turn off this safety check by setting the Apache Spark configuration property spark.databricks.delta.retentionDurationCheck.enabled to false. You must choose an interval that is longer than the longest running concurrent transaction and the longest period that any stream can lag behind the most recent update to the table.
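As a rough sketch of the rule above, the helper below picks the smallest whole number of hours that is strictly longer than both the longest transaction and the largest stream lag. The function names and the input figures are hypothetical, and the second helper assumes that the safety check rejects intervals below the default 7 days.

```python
import math

DEFAULT_RETENTION_HOURS = 7 * 24  # default threshold of 7 days


def min_safe_retention_hours(longest_transaction_hours, max_stream_lag_hours):
    """Smallest whole number of hours strictly longer than both durations."""
    longest = max(longest_transaction_hours, max_stream_lag_hours)
    return math.floor(longest) + 1


def needs_safety_check_disabled(retention_hours):
    """True when the interval is shorter than the default (assumed check rule)."""
    return retention_hours < DEFAULT_RETENTION_HOURS


# Hypothetical workload figures, for illustration only.
hours = min_safe_retention_hours(longest_transaction_hours=36, max_stream_lag_hours=48.5)
print(hours)                               # 49
print(needs_safety_check_disabled(hours))  # True

# With a live SparkSession `spark` and a DeltaTable `deltaTable`, as in the examples above:
# if needs_safety_check_disabled(hours):
#     spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
# deltaTable.vacuum(hours)
```

The durations themselves have to come from your own monitoring; the sketch only encodes the comparison.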
History
You can retrieve information on the operations, user, timestamp, and so on for each write to a Delta table by running the history command. The operations are returned in reverse chronological order. By default, table history is retained for 30 days.
SQL
DESCRIBE HISTORY '/data/events/' -- get the full history of the table
DESCRIBE HISTORY delta.`/data/events/`
DESCRIBE HISTORY '/data/events/' LIMIT 1 -- get the last operation only
See Enable SQL commands within Apache Spark for the steps to enable support for SQL commands in Apache Spark.
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, pathToTable)
val fullHistoryDF = deltaTable.history() // get the full history of the table
val lastOperationDF = deltaTable.history(1) // get the last operation
Java
import io.delta.tables.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
DeltaTable deltaTable = DeltaTable.forPath(spark, pathToTable);
Dataset<Row> fullHistoryDF = deltaTable.history(); // get the full history of the table
Dataset<Row> lastOperationDF = deltaTable.history(1); // get the last operation
Python
from delta.tables import *
deltaTable = DeltaTable.forPath(spark, pathToTable)
fullHistoryDF = deltaTable.history() # get the full history of the table
lastOperationDF = deltaTable.history(1) # get the last operation
See the Delta Lake API Reference for details.
The returned data has the following structure.
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+
|      5|2019-07-29 14:07:47|  null|    null|   DELETE|[predicate -> ["(...|null|    null|     null|          4|          null|        false|
|      4|2019-07-29 14:07:41|  null|    null|   UPDATE|[predicate -> (id...|null|    null|     null|          3|          null|        false|
|      3|2019-07-29 14:07:29|  null|    null|   DELETE|[predicate -> ["(...|null|    null|     null|          2|          null|        false|
|      2|2019-07-29 14:06:56|  null|    null|   UPDATE|[predicate -> (id...|null|    null|     null|          1|          null|        false|
|      1|2019-07-29 14:04:31|  null|    null|   DELETE|[predicate -> ["(...|null|    null|     null|          0|          null|        false|
|      0|2019-07-29 14:01:40|  null|    null|    WRITE|[mode -> ErrorIfE...|null|    null|     null|       null|          null|         true|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+
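Once the history rows are collected to the driver, they can be inspected with ordinary Python. The sketch below finds the versions produced by a given operation; the helper name `versions_with_operation` is hypothetical, and the sample rows are the version and operation columns copied from the output above. In a live session, the rows could be obtained with something like `[row.asDict() for row in deltaTable.history().collect()]`.

```python
def versions_with_operation(history_rows, operation):
    """Return the versions whose 'operation' field matches, newest first."""
    matching = [row["version"] for row in history_rows if row["operation"] == operation]
    return sorted(matching, reverse=True)


# Two fields per row, copied from the sample history output above.
sample_rows = [
    {"version": 5, "operation": "DELETE"},
    {"version": 4, "operation": "UPDATE"},
    {"version": 3, "operation": "DELETE"},
    {"version": 2, "operation": "UPDATE"},
    {"version": 1, "operation": "DELETE"},
    {"version": 0, "operation": "WRITE"},
]

print(versions_with_operation(sample_rows, "DELETE"))  # [5, 3, 1]
print(versions_with_operation(sample_rows, "WRITE"))   # [0]
```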
Generate
You can generate manifest files for Delta tables that can be used by other processing engines (that is, other than Apache Spark) to read the Delta tables. For example, to generate manifest files that Presto can use to read Delta tables, run the following:
SQL
GENERATE symlink_format_manifest FOR TABLE delta.`pathToDeltaTable`
See Enable SQL commands within Apache Spark for the steps to enable support for SQL commands in Apache Spark.
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forPath(pathToDeltaTable)
deltaTable.generate("symlink_format_manifest")
Java
import io.delta.tables.*;
DeltaTable deltaTable = DeltaTable.forPath(pathToDeltaTable);
deltaTable.generate("symlink_format_manifest");
Python
from delta.tables import *
deltaTable = DeltaTable.forPath(spark, pathToDeltaTable)
deltaTable.generate("symlink_format_manifest")
See the Delta Lake API Reference for details. In addition, see Presto and Athena to Delta Lake Integration for more information on how to configure Presto to read Delta tables.
Convert to Delta
Convert an existing Parquet table to a Delta table in-place. This command lists all the files in the directory, creates a Delta Lake transaction log that tracks these files, and automatically infers the data schema by reading the footers of all Parquet files. If your data is partitioned, you must specify the schema of the partition columns.
SQL
-- Convert unpartitioned parquet table at path 'path/to/table'
CONVERT TO DELTA parquet.`path/to/table`
-- Convert partitioned parquet table at path 'path/to/table' and partitioned by integer column named 'part'
CONVERT TO DELTA parquet.`path/to/table` PARTITIONED BY (part int)
See Enable SQL commands within Apache Spark for the steps to enable support for SQL commands in Apache Spark.
Scala
import io.delta.tables._
// Convert unpartitioned parquet table at path '/path/to/table'
val deltaTable = DeltaTable.convertToDelta(spark, "parquet.`/path/to/table`")
// Convert partitioned parquet table at path '/path/to/table' and partitioned by integer column named 'part'
val partitionedDeltaTable = DeltaTable.convertToDelta(spark, "parquet.`/path/to/table`", "part int")
Java
import io.delta.tables.*;
// Convert unpartitioned parquet table at path '/path/to/table'
DeltaTable deltaTable = DeltaTable.convertToDelta(spark, "parquet.`/path/to/table`");
// Convert partitioned parquet table at path '/path/to/table' and partitioned by integer column named 'part'
DeltaTable partitionedDeltaTable = DeltaTable.convertToDelta(spark, "parquet.`/path/to/table`", "part int");
Python
from delta.tables import *
# Convert unpartitioned parquet table at path '/path/to/table'
deltaTable = DeltaTable.convertToDelta(spark, "parquet.`/path/to/table`")
# Convert partitioned parquet table at path '/path/to/table' and partitioned by integer column named 'part'
partitionedDeltaTable = DeltaTable.convertToDelta(spark, "parquet.`/path/to/table`", "part int")
See the Delta Lake API Reference for more details.
Note
Any file not tracked by Delta Lake is invisible and can be deleted when you run vacuum. You should avoid updating or appending data files during the conversion process. After the table is converted, make sure all writes go through Delta Lake.
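When a table has several partition columns, the two string arguments passed to convertToDelta can be assembled programmatically. The helper below is a hypothetical convenience, not part of the Delta Lake API; it assumes the partition schema is written as a comma-separated list of "name type" pairs in partition order, and the column names in the usage lines are made up.

```python
def convert_to_delta_args(table_path, partition_columns=None):
    """Build the identifier and optional partition schema strings.

    partition_columns: dict of column name -> SQL type, in partition order
    (dicts preserve insertion order in Python 3.7+).
    """
    identifier = "parquet.`{}`".format(table_path)
    if not partition_columns:
        return (identifier,)
    partition_schema = ", ".join(
        "{} {}".format(name, sql_type) for name, sql_type in partition_columns.items()
    )
    return (identifier, partition_schema)


print(convert_to_delta_args("/path/to/table"))
# ('parquet.`/path/to/table`',)
print(convert_to_delta_args("/path/to/table", {"part": "int"}))
# ('parquet.`/path/to/table`', 'part int')

# With a live SparkSession `spark`:
# from delta.tables import DeltaTable
# deltaTable = DeltaTable.convertToDelta(spark, *convert_to_delta_args("/path/to/table", {"part": "int"}))
```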
Convert Delta table to a Parquet table
You can convert a Delta table back to a Parquet table with the following steps:
- If you have performed Delta Lake operations that can change the data files (for example, Delete or Merge), first run vacuum with a retention of 0 hours to delete all data files that do not belong to the latest version of the table.
- Delete the _delta_log directory in the table directory.
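The two steps above might be scripted roughly as follows. This is a minimal sketch that assumes the table lives on a local filesystem path, so that shutil can delete the log directory; tables on object stores or HDFS would need the corresponding filesystem client. The function names are hypothetical. The sketch always runs the vacuum step, and because a 0-hour retention is far below the default, it turns off the retention safety check described in the Vacuum section first.

```python
import os
import shutil


def remove_delta_log(table_dir):
    """Delete the _delta_log directory under table_dir; return True if it existed."""
    log_dir = os.path.join(table_dir, "_delta_log")
    if not os.path.isdir(log_dir):
        return False
    shutil.rmtree(log_dir)
    return True


def convert_delta_to_parquet(spark, table_dir):
    """Step 1: vacuum with 0 hours retention. Step 2: remove the transaction log."""
    from delta.tables import DeltaTable  # imported lazily; requires Delta Lake

    # Retention of 0 hours is only accepted once the safety check is disabled.
    spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
    DeltaTable.forPath(spark, table_dir).vacuum(0)
    return remove_delta_log(table_dir)
```

Both steps are destructive: older versions of the table are gone after the vacuum, and the directory stops being a Delta table once the log is removed, so make sure no readers or writers are active before running anything like this.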
Enable SQL commands within Apache Spark
Apache Spark does not natively support SQL commands that are specific to Delta Lake (for example, VACUUM and DESCRIBE HISTORY). To enable such commands to be parsed, you must configure SparkSession to use a SQL parser extension that parses only Delta Lake SQL commands and falls back to Spark’s default parser for all other SQL commands. Use the following code snippets to enable the Delta Lake parser.
Scala
import org.apache.spark.sql.SparkSession
val spark = SparkSession
  .builder()
  .appName("...")
  .master("...")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .getOrCreate()
Java
import org.apache.spark.sql.SparkSession;
SparkSession spark = SparkSession
  .builder()
  .appName("...")
  .master("...")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .getOrCreate();
Python
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("...") \
    .master("...") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .getOrCreate()
# Apache Spark 2.4.x has a known issue (SPARK-25003) that requires explicit activation
# of the extension and cloning of the session. This will be unnecessary in Apache Spark 3.x.
if spark.sparkContext.version < "3.":
    spark.sparkContext._jvm.io.delta.sql.DeltaSparkSessionExtension() \
        .apply(spark._jsparkSession.extensions())
    spark = SparkSession(spark.sparkContext, spark._jsparkSession.cloneSession())
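With the extension enabled, the Delta Lake statements shown in the Vacuum and History sections can be passed to spark.sql as plain strings. The helpers below are a hypothetical convenience for building those strings in the delta.`path` form used earlier; they are not part of any Delta Lake API and do no escaping of the path.

```python
def vacuum_sql(table_path, retain_hours=None):
    """Build a VACUUM statement, optionally with a RETAIN ... HOURS clause."""
    statement = "VACUUM delta.`{}`".format(table_path)
    if retain_hours is not None:
        statement += " RETAIN {} HOURS".format(int(retain_hours))
    return statement


def describe_history_sql(table_path, limit=None):
    """Build a DESCRIBE HISTORY statement, optionally with a LIMIT clause."""
    statement = "DESCRIBE HISTORY delta.`{}`".format(table_path)
    if limit is not None:
        statement += " LIMIT {}".format(int(limit))
    return statement


print(vacuum_sql("/data/events/", 100))
# VACUUM delta.`/data/events/` RETAIN 100 HOURS
print(describe_history_sql("/data/events/", 1))
# DESCRIBE HISTORY delta.`/data/events/` LIMIT 1

# With the SparkSession configured as above:
# spark.sql(describe_history_sql("/data/events/", 1)).show()
```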