Table utility commands
Delta tables support a number of utility commands.
For many Delta Lake operations, you enable integration with Apache Spark DataSourceV2 and Catalog APIs (since 3.0) by setting configurations when you create a new SparkSession
. See Configure SparkSession.
In this article:
Remove files no longer referenced by a Delta table
You can remove files no longer referenced by a Delta table and are older than the retention
threshold by running the vacuum
command on the table. vacuum
is not triggered automatically. The
default retention threshold for the files is 7 days.
Important
vacuum
deletes only data files, not log files. Log files are deleted automatically and asynchronously after checkpoint operations. The default retention period of log files is 30 days, configurable through thedelta.logRetentionDuration
property which you set with theALTER TABLE SET TBLPROPERTIES
SQL method. See Table properties.- The ability to time travel back to a version older than the retention period is lost after running
vacuum
.
VACUUM eventsTable -- vacuum files not required by versions older than the default retention period
VACUUM '/data/events' -- vacuum files in path-based table
VACUUM delta.`/data/events/`
VACUUM delta.`/data/events/` RETAIN 100 HOURS -- vacuum files not required by versions more than 100 hours old
VACUUM eventsTable DRY RUN -- do dry run to get the list of files to be deleted
See Configure SparkSession for the steps to enable support for SQL commands.
from delta.tables import *
deltaTable = DeltaTable.forPath(spark, pathToTable) # path-based tables, or
deltaTable = DeltaTable.forName(spark, tableName) # Hive metastore-based tables
deltaTable.vacuum() # vacuum files not required by versions older than the default retention period
deltaTable.vacuum(100) # vacuum files not required by versions more than 100 hours old
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, pathToTable)
deltaTable.vacuum() // vacuum files not required by versions older than the default retention period
deltaTable.vacuum(100) // vacuum files not required by versions more than 100 hours old
import io.delta.tables.*;
import org.apache.spark.sql.functions;
DeltaTable deltaTable = DeltaTable.forPath(spark, pathToTable);
deltaTable.vacuum(); // vacuum files not required by versions older than the default retention period
deltaTable.vacuum(100); // vacuum files not required by versions more than 100 hours old
Note
When using VACUUM
, to configure Spark to delete files in parallel (based on the number of shuffle partitions) set the session configuration "spark.databricks.delta.vacuum.parallelDelete.enabled"
to "true"
.
See the Delta Lake API reference for Scala, Java, and Python syntax details.
Warning
We do not recommend that you set a retention interval shorter than 7 days, because old snapshots
and uncommitted files can still be in use by concurrent readers or writers to the table. If
vacuum
cleans up active files, concurrent readers can fail or, worse, tables can be corrupted
when vacuum
deletes files that have not yet been committed.
Delta Lake has a safety check to prevent you from running a dangerous vacuum
command. If you are
certain that there are no operations being performed on this table that take longer than the
retention interval you plan to specify, you can turn off this safety check by setting the Apache Spark
configuration property spark.databricks.delta.retentionDurationCheck.enabled
to false
. You
must choose an interval that is longer than the longest running concurrent transaction and the
longest period that any stream can lag behind the most recent update to the table.
Retrieve Delta table history
You can retrieve information on the operations, user, timestamp, and so on for each write to a Delta table
by running the history
command. The operations are returned in reverse chronological order. By default table history is retained for 30 days.
DESCRIBE HISTORY '/data/events/' -- get the full history of the table
DESCRIBE HISTORY delta.`/data/events/`
DESCRIBE HISTORY '/data/events/' LIMIT 1 -- get the last operation only
DESCRIBE HISTORY eventsTable
See Configure SparkSession for the steps to enable support for SQL commands in Apache Spark.
from delta.tables import *
deltaTable = DeltaTable.forPath(spark, pathToTable)
fullHistoryDF = deltaTable.history() # get the full history of the table
lastOperationDF = deltaTable.history(1) # get the last operation
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, pathToTable)
val fullHistoryDF = deltaTable.history() // get the full history of the table
val lastOperationDF = deltaTable.history(1) // get the last operation
import io.delta.tables.*;
DeltaTable deltaTable = DeltaTable.forPath(spark, pathToTable);
DataFrame fullHistoryDF = deltaTable.history(); // get the full history of the table
DataFrame lastOperationDF = deltaTable.history(1); // fetch the last operation on the DeltaTable
See the Delta Lake API reference for Scala/Java/Python syntax details.
History schema
The output of the history
operation has the following columns.
Column | Type | Description |
---|---|---|
version | long | Table version generated by the operation. |
timestamp | timestamp | When this version was committed. |
userId | string | ID of the user that ran the operation. |
userName | string | Name of the user that ran the operation. |
operation | string | Name of the operation. |
operationParameters | map | Parameters of the operation (for example, predicates.) |
job | struct | Details of the job that ran the operation. |
notebook | struct | Details of notebook from which the operation was run. |
clusterId | string | ID of the cluster on which the operation ran. |
readVersion | long | Version of the table that was read to perform the write operation. |
isolationLevel | string | Isolation level used for this operation. |
isBlindAppend | boolean | Whether this operation appended data. |
operationMetrics | map | Metrics of the operation (for example, number of rows and files modified.) |
userMetadata | string | User-defined commit metadata if it was specified |
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+
|version| timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend| operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+
| 5|2019-07-29 14:07:47| null| null| DELETE|[predicate -> ["(...|null| null| null| 4| Serializable| false|[numTotalRows -> ...|
| 4|2019-07-29 14:07:41| null| null| UPDATE|[predicate -> (id...|null| null| null| 3| Serializable| false|[numTotalRows -> ...|
| 3|2019-07-29 14:07:29| null| null| DELETE|[predicate -> ["(...|null| null| null| 2| Serializable| false|[numTotalRows -> ...|
| 2|2019-07-29 14:06:56| null| null| UPDATE|[predicate -> (id...|null| null| null| 1| Serializable| false|[numTotalRows -> ...|
| 1|2019-07-29 14:04:31| null| null| DELETE|[predicate -> ["(...|null| null| null| 0| Serializable| false|[numTotalRows -> ...|
| 0|2019-07-29 14:01:40| null| null| WRITE|[mode -> ErrorIfE...|null| null| null| null| Serializable| true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+
Note
- Some of the columns may be nulls because the corresponding information may not be available in your environment.
- Columns added in the future will always be added after the last column.
Operation metrics keys
The history
operation returns a collection of operations metrics in the operationMetrics
column map.
The following table lists the map key definitions by operation.
Operation | Metric name | Description |
---|---|---|
WRITE, CREATE TABLE AS SELECT, REPLACE TABLE AS SELECT, COPY INTO | ||
numFiles | Number of files written. | |
numOutputBytes | Size in bytes of the written contents. | |
numOutputRows | Number of rows written. | |
STREAMING UPDATE | ||
numAddedFiles | Number of files added. | |
numRemovedFiles | Number of files removed. | |
numOutputRows | Number of rows written. | |
numOutputBytes | Size of write in bytes. | |
DELETE | ||
numAddedFiles | Number of files added. Not provided when partitions of the table are deleted. | |
numRemovedFiles | Number of files removed. | |
numDeletedRows | Number of rows removed. Not provided when partitions of the table are deleted. | |
numCopiedRows | Number of rows copied in the process of deleting files. | |
executionTimeMs | Time taken to execute the entire operation. | |
scanTimeMs | Time taken to scan the files for matches. | |
rewriteTimeMs | Time taken to rewrite the matched files. | |
TRUNCATE | ||
numRemovedFiles | Number of files removed. | |
executionTimeMs | Time taken to execute the entire operation. | |
MERGE | ||
numSourceRows | Number of rows in the source DataFrame. | |
numTargetRowsInserted | Number of rows inserted into the target table. | |
numTargetRowsUpdated | Number of rows updated in the target table. | |
numTargetRowsDeleted | Number of rows deleted in the target table. | |
numTargetRowsCopied | Number of target rows copied. | |
numOutputRows | Total number of rows written out. | |
numTargetFilesAdded | Number of files added to the sink(target). | |
numTargetFilesRemoved | Number of files removed from the sink(target). | |
executionTimeMs | Time taken to execute the entire operation. | |
scanTimeMs | Time taken to scan the files for matches. | |
rewriteTimeMs | Time taken to rewrite the matched files. | |
UPDATE | ||
numAddedFiles | Number of files added. | |
numRemovedFiles | Number of files removed. | |
numUpdatedRows | Number of rows updated. | |
numCopiedRows | Number of rows just copied over in the process of updating files. | |
executionTimeMs | Time taken to execute the entire operation. | |
scanTimeMs | Time taken to scan the files for matches. | |
rewriteTimeMs | Time taken to rewrite the matched files. | |
FSCK | numRemovedFiles | Number of files removed. |
CONVERT | numConvertedFiles | Number of Parquet files that have been converted. |
Retrieve Delta table details
You can retrieve detailed information about a Delta table (for example, number of files, data size) using DESCRIBE DETAIL
.
DESCRIBE DETAIL '/data/events/'
DESCRIBE DETAIL eventsTable
See Configure SparkSession for the steps to enable support for SQL commands in Apache Spark.
Detail schema
The output of this operation has only one row with the following schema.
Column | Type | Description |
---|---|---|
format | string | Format of the table, that is, “delta”. |
id | string | Unique ID of the table. |
name | string | Name of the table as defined in the metastore. |
description | string | Description of the table. |
location | string | Location of the table. |
createdAt | timestamp | When the table was created. |
lastModified | timestamp | When the table was last modified. |
partitionColumns | array of strings | Names of the partition columns if the table is partitioned. |
numFiles | long | Number of the files in the latest version of the table. |
properties | string-string map | All the properties set for this table. |
minReaderVersion | int | Minimum version of readers (according to the log protocol) that can read the table. |
minWriterVersion | int | Minimum version of writers (according to the log protocol) that can write to the table. |
+------+--------------------+------------------+-----------+--------------------+--------------------+-------------------+----------------+--------+-----------+----------+----------------+----------------+
|format| id| name|description| location| createdAt| lastModified|partitionColumns|numFiles|sizeInBytes|properties|minReaderVersion|minWriterVersion|
+------+--------------------+------------------+-----------+--------------------+--------------------+-------------------+----------------+--------+-----------+----------+----------------+----------------+
| delta|d31f82d2-a69f-42e...|default.deltatable| null|file:/Users/tdas/...|2020-06-05 12:20:...|2020-06-05 12:20:20| []| 10| 12345| []| 1| 2|
+------+--------------------+------------------+-----------+--------------------+--------------------+-------------------+----------------+--------+-----------+----------+----------------+----------------+
Generate a manifest file
You can a generate manifest file for a Delta table that can be used by other processing engines (that is, other than Apache Spark) to read the Delta table. For example, to generate a manifest file that can be used by Presto and Athena to read a Delta table, you run the following:
GENERATE symlink_format_manifest FOR TABLE delta.`<path-to-delta-table>`
See Configure SparkSession for the steps to enable support for SQL commands in Apache Spark.
deltaTable = DeltaTable.forPath(<path-to-delta-table>)
deltaTable.generate("symlink_format_manifest")
val deltaTable = DeltaTable.forPath(<path-to-delta-table>)
deltaTable.generate("symlink_format_manifest")
DeltaTable deltaTable = DeltaTable.forPath(<path-to-delta-table>);
deltaTable.generate("symlink_format_manifest");
Convert a Parquet table to a Delta table
Convert a Parquet table to a Delta table in-place. This command lists all the files in the directory, creates a Delta Lake transaction log that tracks these files, and automatically infers the data schema by reading the footers of all Parquet files. If your data is partitioned, you must specify the schema of the partition columns as a DDL-formatted string (that is, <column-name1> <type>, <column-name2> <type>, ...
).
Note
If a Parquet table was created by Structured Streaming, the listing of files can be avoided by using the _spark_metadata
sub-directory as the source of truth for files contained in the table setting the SQL configuration spark.databricks.delta.convert.useMetadataLog
to true
.
-- Convert unpartitioned Parquet table at path '<path-to-table>'
CONVERT TO DELTA parquet.`<path-to-table>`
-- Convert partitioned Parquet table at path '<path-to-table>' and partitioned by integer columns named 'part' and 'part2'
CONVERT TO DELTA parquet.`<path-to-table>` PARTITIONED BY (part int, part2 int)
See Configure SparkSession for the steps to enable support for SQL commands in Apache Spark.
from delta.tables import *
# Convert unpartitioned Parquet table at path '<path-to-table>'
deltaTable = DeltaTable.convertToDelta(spark, "parquet.`<path-to-table>`")
# Convert partitioned parquet table at path '<path-to-table>' and partitioned by integer column named 'part'
partitionedDeltaTable = DeltaTable.convertToDelta(spark, "parquet.`<path-to-table>`", "part int")
import io.delta.tables._
// Convert unpartitioned Parquet table at path '<path-to-table>'
val deltaTable = DeltaTable.convertToDelta(spark, "parquet.`<path-to-table>`")
// Convert partitioned Parquet table at path '<path-to-table>' and partitioned by integer columns named 'part' and 'part2'
val partitionedDeltaTable = DeltaTable.convertToDelta(spark, "parquet.`<path-to-table>`", "part int, part2 int")
import io.delta.tables.*;
// Convert unpartitioned Parquet table at path '<path-to-table>'
DeltaTable deltaTable = DeltaTable.convertToDelta(spark, "parquet.`<path-to-table>`");
// Convert partitioned Parquet table at path '<path-to-table>' and partitioned by integer columns named 'part' and 'part2'
DeltaTable deltaTable = DeltaTable.convertToDelta(spark, "parquet.`<path-to-table>`", "part int, part2 int");
Note
Any file not tracked by Delta Lake is invisible and can be deleted when you run vacuum
. You should avoid updating or appending data files during the conversion process. After the table is converted, make sure all writes go through Delta Lake.
Convert a Delta table to a Parquet table
You can easily convert a Delta table back to a Parquet table using the following steps:
- If you have performed Delta Lake operations that can change the data files (for example,
delete
ormerge
), run vacuum with retention of 0 hours to delete all data files that do not belong to the latest version of the table. - Delete the
_delta_log
directory in the table directory.