Table utility commands

Delta tables support a number of utility commands.

Vacuum

You can remove files that are no longer referenced by a Delta table and are older than the retention threshold by running the vacuum command on the table. vacuum is not triggered automatically. The default retention threshold for the files is 7 days.

Important

The ability to time travel back to a version older than the retention period is lost after running vacuum.

SQL

VACUUM '/data/events' -- vacuum files not required by versions older than the default retention period

VACUUM delta.`/data/events/`

VACUUM delta.`/data/events/` RETAIN 100 HOURS  -- vacuum files not required by versions more than 100 hours old

See Enable SQL commands within Apache Spark for the steps to enable support for SQL commands in Apache Spark.

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, pathToTable)

deltaTable.vacuum()        // vacuum files not required by versions older than the default retention period

deltaTable.vacuum(100)     // vacuum files not required by versions more than 100 hours old

Java

import io.delta.tables.*;

DeltaTable deltaTable = DeltaTable.forPath(spark, pathToTable);

deltaTable.vacuum();        // vacuum files not required by versions older than the default retention period

deltaTable.vacuum(100);     // vacuum files not required by versions more than 100 hours old

Python

from delta.tables import *

deltaTable = DeltaTable.forPath(spark, pathToTable)

deltaTable.vacuum()        # vacuum files not required by versions older than the default retention period

deltaTable.vacuum(100)     # vacuum files not required by versions more than 100 hours old

See the Delta Lake API reference for details.

Warning

We do not recommend that you set a retention interval shorter than 7 days, because old snapshots and uncommitted files can still be in use by concurrent readers or writers to the table. If VACUUM cleans up active files, concurrent readers can fail or, worse, tables can be corrupted when VACUUM deletes files that have not yet been committed.

Delta Lake has a safety check to prevent you from running a dangerous VACUUM command. If you are certain that there are no operations being performed on this table that take longer than the retention interval you plan to specify, you can turn off this safety check by setting the Apache Spark configuration property spark.databricks.delta.retentionDurationCheck.enabled to false. You must choose an interval that is longer than the longest running concurrent transaction and the longest period that any stream can lag behind the most recent update to the table.
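
The rule for choosing an interval can be written as a small check. The following is a minimal sketch, not part of the Delta Lake API: the helper names and the idea of passing in measured durations are assumptions made for illustration. Only the 7-day default and the configuration property name come from the text above.

```python
DEFAULT_RETENTION_HOURS = 7 * 24  # default retention threshold of 7 days

# Property named in the text above; set it to "false" to turn off the safety check.
RETENTION_CHECK_CONF = "spark.databricks.delta.retentionDurationCheck.enabled"


def is_safe_retention(retention_hours, longest_txn_hours, max_stream_lag_hours):
    """Return True if the interval is longer than both the longest running
    concurrent transaction and the longest lag of any stream.
    Hypothetical helper; the caller has to measure both durations."""
    return retention_hours > max(longest_txn_hours, max_stream_lag_hours)


def requires_disabling_check(retention_hours):
    """Return True if the interval is shorter than the 7-day default.
    Assumption: the safety check rejects intervals below the default."""
    return retention_hours < DEFAULT_RETENTION_HOURS


# Possible usage with an existing SparkSession `spark` and DeltaTable `deltaTable`:
# if is_safe_retention(100, longest_txn_hours=2, max_stream_lag_hours=24):
#     if requires_disabling_check(100):
#         spark.conf.set(RETENTION_CHECK_CONF, "false")
#     deltaTable.vacuum(100)
```

Keeping the two questions separate (is the interval safe, and does it need the check disabled) makes it harder to turn off the check without first confirming the interval.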

History

You can retrieve information on the operations, user, timestamp, and so on for each write to a Delta table by running the history command. The operations are returned in reverse chronological order. By default, table history is retained for 30 days.

SQL

DESCRIBE HISTORY '/data/events/'          -- get the full history of the table

DESCRIBE HISTORY delta.`/data/events/`

DESCRIBE HISTORY '/data/events/' LIMIT 1  -- get the last operation only

See Enable SQL commands within Apache Spark for the steps to enable support for SQL commands in Apache Spark.

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, pathToTable)

val fullHistoryDF = deltaTable.history()    // get the full history of the table

val lastOperationDF = deltaTable.history(1) // get the last operation

Java

import io.delta.tables.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

DeltaTable deltaTable = DeltaTable.forPath(spark, pathToTable);

Dataset<Row> fullHistoryDF = deltaTable.history();       // get the full history of the table

Dataset<Row> lastOperationDF = deltaTable.history(1);    // get the last operation

Python

from delta.tables import *

deltaTable = DeltaTable.forPath(spark, pathToTable)

fullHistoryDF = deltaTable.history()    # get the full history of the table

lastOperationDF = deltaTable.history(1) # get the last operation

See the Delta Lake API reference for details.

The returned DataFrame has the following columns.

Column               Type       Description
version              long       Table version generated by the operation.
timestamp            timestamp  When this version was committed.
userId               string     ID of the user that ran the operation.
userName             string     Name of the user that ran the operation.
operation            string     Name of the operation.
operationParameters  map        Parameters of the operation (for example, predicates).
job                  struct     Details of the job that ran the operation.
notebook             struct     Details of the notebook from which the operation was run.
clusterId            string     ID of the cluster on which the operation ran.
readVersion          long       Version of the table that was read to perform the write operation.
isolationLevel       string     Isolation level used for this operation.
isBlindAppend        boolean    Whether this operation appended data.
operationMetrics     map        Metrics of the operation (for example, number of rows and files modified).

The following is an example of the output:

+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+
|      5|2019-07-29 14:07:47|  null|    null|   DELETE|[predicate -> ["(...|null|    null|     null|          4|  Serializable|        false|[numTotalRows -> ...|
|      4|2019-07-29 14:07:41|  null|    null|   UPDATE|[predicate -> (id...|null|    null|     null|          3|  Serializable|        false|[numTotalRows -> ...|
|      3|2019-07-29 14:07:29|  null|    null|   DELETE|[predicate -> ["(...|null|    null|     null|          2|  Serializable|        false|[numTotalRows -> ...|
|      2|2019-07-29 14:06:56|  null|    null|   UPDATE|[predicate -> (id...|null|    null|     null|          1|  Serializable|        false|[numTotalRows -> ...|
|      1|2019-07-29 14:04:31|  null|    null|   DELETE|[predicate -> ["(...|null|    null|     null|          0|  Serializable|        false|[numTotalRows -> ...|
|      0|2019-07-29 14:01:40|  null|    null|    WRITE|[mode -> ErrorIfE...|null|    null|     null|       null|  Serializable|         true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+

Note

  • Some of the columns may be null because the corresponding information may not be available in your environment.
  • Columns added in the future will always be added after the last column.

Operation metrics keys

The operationMetrics column is a map. The following table lists the key definitions by operation:

Operation Metric Description
WRITE, CREATE TABLE AS SELECT, REPLACE TABLE AS SELECT, COPY INTO    
  numFiles Number of files written.
  numOutputBytes Size in bytes of the written contents.
  numOutputRows Number of rows written.
STREAMING UPDATE    
  numAddedFiles Number of files added.
  numRemovedFiles Number of files removed.
  numOutputRows Number of rows written.
  numOutputBytes Size of write in bytes.
DELETE    
  numAddedFiles Number of files added. Not provided when partitions of the table are deleted.
  numRemovedFiles Number of files removed.
  numDeletedRows Number of rows removed. Not provided when partitions of the table are deleted.
  numCopiedRows Number of rows copied in the process of deleting files.
TRUNCATE
  numRemovedFiles Number of files removed.
MERGE    
  numSourceRows Number of rows in the source DataFrame.
  numTargetRowsInserted Number of rows inserted into the target table.
  numTargetRowsUpdated Number of rows updated in the target table.
  numTargetRowsDeleted Number of rows deleted in the target table.
  numTargetRowsCopied Number of target rows copied.
  numOutputRows Total number of rows written out.
  numTargetFilesAdded Number of files added to the sink (target).
  numTargetFilesRemoved Number of files removed from the sink (target).
UPDATE    
  numAddedFiles Number of files added.
  numRemovedFiles Number of files removed.
  numUpdatedRows Number of rows updated.
  numCopiedRows Number of rows just copied over in the process of updating files.
FSCK
  numRemovedFiles Number of files removed.
CONVERT
  numConvertedFiles Number of Parquet files that have been converted.
OPTIMIZE    
  numAddedFiles Number of files added.
  numRemovedFiles Number of files optimized.
  numAddedBytes Number of bytes added after the table was optimized.
  numRemovedBytes Number of bytes removed.
  minFileSize Size of the smallest file after the table was optimized.
  p25FileSize Size of the 25th percentile file after the table was optimized.
  p50FileSize Median file size after the table was optimized.
  p75FileSize Size of the 75th percentile file after the table was optimized.
  maxFileSize Size of the largest file after the table was optimized.
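
Because operationMetrics is a map, a metric is looked up by key, and a key is present only for the operations listed above. The following is a minimal sketch of summing one metric across history entries. It works on plain dictionaries such as those produced by calling asDict() on collected history rows. The helper name total_metric is hypothetical, and the assumption that metric values may arrive as strings should be verified in your environment.

```python
def total_metric(history_rows, metric, operation=None):
    """Sum one operationMetrics key over history entries (hypothetical helper).

    history_rows: iterable of dicts with 'operation' and 'operationMetrics' keys.
    operation: if given, only entries with this operation name are counted.
    """
    total = 0
    for row in history_rows:
        if operation is not None and row.get("operation") != operation:
            continue
        metrics = row.get("operationMetrics") or {}  # the column may be null
        if metric in metrics:
            # Assumption: the value is an integer, possibly encoded as a string.
            total += int(metrics[metric])
    return total


# Illustrative data only, shaped like collected history rows.
sample_rows = [
    {"operation": "DELETE", "operationMetrics": {"numRemovedFiles": "1", "numDeletedRows": "3"}},
    {"operation": "UPDATE", "operationMetrics": {"numRemovedFiles": "2", "numUpdatedRows": "5"}},
    {"operation": "WRITE", "operationMetrics": None},
]

print(total_metric(sample_rows, "numRemovedFiles"))
print(total_metric(sample_rows, "numRemovedFiles", operation="DELETE"))
```

With a real table, the input could be built with `[row.asDict() for row in deltaTable.history().collect()]`.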

Generate

You can generate manifest files for Delta tables that can be used by other processing engines (that is, other than Apache Spark) to read the Delta tables. For example, to generate manifest files that can be used by Presto to read Delta tables, you can run the following:

SQL

GENERATE symlink_format_manifest FOR TABLE delta.`pathToDeltaTable`

See Enable SQL commands within Apache Spark for the steps to enable support for SQL commands in Apache Spark.

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forPath(pathToDeltaTable)
deltaTable.generate("symlink_format_manifest")

Java

import io.delta.tables.*;

DeltaTable deltaTable = DeltaTable.forPath(pathToDeltaTable);
deltaTable.generate("symlink_format_manifest");

Python

from delta.tables import *

deltaTable = DeltaTable.forPath(spark, pathToDeltaTable)
deltaTable.generate("symlink_format_manifest")

See the Delta Lake API reference for details. In addition, see Presto and Athena to Delta Lake integration for more information on how to configure Presto to read Delta tables.

Convert to Delta

Convert an existing Parquet table to a Delta table in-place. This command lists all the files in the directory, creates a Delta Lake transaction log that tracks these files, and automatically infers the data schema by reading the footers of all Parquet files. If your data is partitioned, you must specify the schema of the partition columns as a DDL-formatted string (that is, <column-name1> <type>, <column-name2> <type>, ...).

SQL

-- Convert unpartitioned parquet table at path 'path/to/table'
CONVERT TO DELTA parquet.`path/to/table`

-- Convert partitioned parquet table at path 'path/to/table' and partitioned by integer columns named 'part' and 'part2'
CONVERT TO DELTA parquet.`path/to/table` PARTITIONED BY (part int, part2 int)

See Enable SQL commands within Apache Spark for the steps to enable support for SQL commands in Apache Spark.

Scala

import io.delta.tables._

// Convert unpartitioned parquet table at path '/path/to/table'
val deltaTable = DeltaTable.convertToDelta(spark, "parquet.`/path/to/table`")

// Convert partitioned parquet table at path '/path/to/table' and partitioned by integer columns named 'part' and 'part2'
val partitionedDeltaTable = DeltaTable.convertToDelta(spark, "parquet.`/path/to/table`", "part int, part2 int")

Java

import io.delta.tables.*;

// Convert unpartitioned parquet table at path '/path/to/table'
DeltaTable deltaTable = DeltaTable.convertToDelta(spark, "parquet.`/path/to/table`");

// Convert partitioned parquet table at path '/path/to/table' and partitioned by integer columns named 'part' and 'part2'
DeltaTable partitionedDeltaTable = DeltaTable.convertToDelta(spark, "parquet.`/path/to/table`", "part int, part2 int");

Python

from delta.tables import *

# Convert unpartitioned parquet table at path '/path/to/table'
deltaTable = DeltaTable.convertToDelta(spark, "parquet.`/path/to/table`")

# Convert partitioned parquet table at path '/path/to/table' and partitioned by integer column named 'part'
partitionedDeltaTable = DeltaTable.convertToDelta(spark, "parquet.`/path/to/table`", "part int")

See the Delta Lake API reference for more details.
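
When the partition columns are known only at run time, the two string arguments used in the examples above can be assembled programmatically. The following is a minimal sketch; both helper names are hypothetical and are not part of the Delta Lake API.

```python
def partition_schema_ddl(columns):
    """Build a DDL-formatted partition schema string from (name, type) pairs."""
    return ", ".join(f"{name} {dtype}" for name, dtype in columns)


def parquet_identifier(path):
    """Build the parquet.`<path>` identifier used by the convert examples."""
    return f"parquet.`{path}`"


partition_columns = [("part", "int"), ("part2", "int")]
print(parquet_identifier("/path/to/table"))
print(partition_schema_ddl(partition_columns))

# Possible usage with an existing SparkSession `spark`:
# DeltaTable.convertToDelta(
#     spark,
#     parquet_identifier("/path/to/table"),
#     partition_schema_ddl(partition_columns),
# )
```

The sketch does no quoting or validation, so column names and paths that contain backticks or spaces would need extra handling.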

Note

Any file not tracked by Delta Lake is invisible and can be deleted when you run vacuum. You should avoid updating or appending data files during the conversion process. After the table is converted, make sure all writes go through Delta Lake.

Convert Delta table to a Parquet table

You can convert a Delta table back to a Parquet table using the following steps:

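  1. If you have performed Delta Lake operations that can change the data files (for example, delete or merge), first run vacuum with a retention of 0 hours to delete all data files that do not belong to the latest version of the table. Because 0 hours is shorter than the default retention threshold, you must turn off the safety check described in the Vacuum section before running this command.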
  2. Delete the _delta_log directory in the table directory.
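
The two steps above can be sketched in Python as follows. This is an illustration under several assumptions, not a definitive procedure: the function name and its parameters are hypothetical, the table is assumed to live on a local file system (object stores need their own delete call), the safety check is assumed to have been enabled beforehand, and no other readers or writers are assumed to be using the table.

```python
import os
import shutil

# Property named in the Vacuum section; "false" turns off the safety check.
RETENTION_CHECK_CONF = "spark.databricks.delta.retentionDurationCheck.enabled"


def convert_delta_to_parquet(spark, delta_table, table_dir, files_changed=True):
    """Turn a Delta table directory back into a plain Parquet directory.

    spark: a SparkSession; delta_table: a DeltaTable for table_dir.
    files_changed: whether operations such as delete or merge were performed.
    """
    if files_changed:
        # Step 1: a 0-hour retention is below the default threshold, so the
        # safety check is turned off for the duration of the vacuum call.
        spark.conf.set(RETENTION_CHECK_CONF, "false")
        try:
            delta_table.vacuum(0)
        finally:
            # Assumption: the check was enabled before this function ran.
            spark.conf.set(RETENTION_CHECK_CONF, "true")
    # Step 2: delete the transaction log directory (local file system only).
    shutil.rmtree(os.path.join(table_dir, "_delta_log"))
```

Both steps are destructive: after they run, time travel and the table history are no longer available for this directory.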

Enable SQL commands within Apache Spark

Apache Spark does not natively support SQL commands that are specific to Delta Lake (for example, VACUUM and DESCRIBE HISTORY). To enable such commands to be parsed, you must configure the SparkSession to use an extension SQL parser that parses only Delta Lake SQL commands and falls back to Spark's default parser for all other SQL commands. Use the following code snippets to enable this parser.

Scala

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("...")
  .master("...")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .getOrCreate()

Java

import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession
  .builder()
  .appName("...")
  .master("...")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .getOrCreate();

Python

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("...") \
    .master("...") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .getOrCreate()

# Apache Spark 2.4.x has a known issue (SPARK-25003) that requires explicit activation
# of the extension and cloning of the session. This will be unnecessary in Apache Spark 3.x.
# Note: sparkContext is a property in PySpark, so it is accessed without parentheses.
if spark.sparkContext.version < "3.":
    spark.sparkContext._jvm.io.delta.sql.DeltaSparkSessionExtension() \
        .apply(spark._jsparkSession.extensions())
    spark = SparkSession(spark.sparkContext, spark._jsparkSession.cloneSession())