Storage configuration

Delta Lake ACID guarantees are predicated on the atomicity and durability guarantees of the storage system. Specifically, Delta Lake relies on the following when interacting with storage systems:

  • Atomic visibility: There must be a way for a file to be visible in its entirety or not visible at all.
  • Mutual exclusion: Only one writer must be able to create (or rename) a file at the final destination.
  • Consistent listing: Once a file has been written in a directory, all future listings for that directory must return that file.

Because storage systems do not necessarily provide all of these guarantees out-of-the-box, Delta Lake transactional operations typically go through the LogStore API instead of accessing the storage system directly. To provide the ACID guarantees for different storage systems, you may have to use different LogStore implementations. This article covers how to configure Delta Lake for various storage systems. There are two categories of storage systems:

  • Storage systems with built-in support: For some storage systems, you do not need additional configurations. Delta Lake uses the scheme of the path (that is, s3a in s3a://path) to dynamically identify the storage system and use the corresponding LogStore implementation that provides the transactional guarantees. However, for S3, there are additional caveats on concurrent writes. See the section on S3 for details.

  • Other storage systems: The LogStore, similar to Apache Spark, uses the Hadoop FileSystem API to perform reads and writes. So Delta Lake supports concurrent reads on any storage system that provides an implementation of the FileSystem API. For concurrent writes with transactional guarantees, there are two cases based on the guarantees provided by the FileSystem implementation. If the implementation provides consistent listing and atomic renames-without-overwrite (that is, rename(... , overwrite = false) will either generate the target file atomically or fail with java.nio.file.FileAlreadyExistsException if the target already exists), then the default LogStore implementation using renames will allow concurrent writes with guarantees. Otherwise, you must configure a custom implementation of LogStore by setting the following Spark configuration:

    spark.delta.logStore.<scheme>.impl=<full-qualified-class-name>
    

    where <scheme> is the scheme of the paths of your storage system. This configures Delta Lake to dynamically use the given LogStore implementation only for those paths. You can have multiple such configurations for different schemes in your application, thus allowing it to simultaneously read and write from different storage systems. A short example of setting such a configuration appears after the note below.

    Note

    • Delta Lake on local file system may not support concurrent transactional writes. This is because the local file system may or may not provide atomic renames. So you should not use the local file system for testing concurrent writes.
    • Before version 1.0, Delta Lake supported configuring LogStores by setting spark.delta.logStore.class. This approach is now deprecated. Setting this configuration will use the configured LogStore for all paths, thereby disabling the dynamic scheme-based delegation.
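
    For example, here is a minimal Scala sketch of setting such a per-scheme configuration when building the SparkSession. The gs scheme and the io.delta.storage.GCSLogStore class are borrowed from the Google Cloud Storage section below purely as an illustration; substitute the scheme and LogStore class appropriate for your storage system:

    import org.apache.spark.sql.SparkSession

    // Use a custom LogStore only for paths with the gs scheme; paths with other
    // schemes keep the default, dynamically selected LogStore implementation.
    val spark = SparkSession.builder()
      .appName("delta-custom-logstore")
      .config("spark.delta.logStore.gs.impl", "io.delta.storage.GCSLogStore")
      .getOrCreate()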

Amazon S3

Delta Lake has built-in support for S3. Delta Lake supports concurrent reads from multiple clusters, but concurrent writes to S3 must originate from a single Spark driver in order for Delta Lake to provide transactional guarantees. This is because S3 currently does not provide mutual exclusion, that is, there is no way to ensure that only one writer is able to create a file.

Warning

Concurrent writes to the same Delta table from multiple Spark drivers can lead to data loss.

Requirements

  • S3 credentials: IAM roles (recommended) or access keys
  • Apache Spark associated with the corresponding Delta Lake version.
  • Hadoop’s AWS connector (hadoop-aws) for the version of Hadoop that Apache Spark is compiled for.

Quickstart

This section explains how to quickly start reading and writing Delta tables on S3. For a detailed explanation of the configuration, see Configuration.

  1. Use the following command to launch a Spark shell with Delta Lake and S3 support (assuming you use Spark pre-built for Hadoop 3.2):

    bin/spark-shell \
     --packages io.delta:delta-core_2.12:1.1.0,org.apache.hadoop:hadoop-aws:3.2.0 \
     --conf spark.hadoop.fs.s3a.access.key=<your-s3-access-key> \
     --conf spark.hadoop.fs.s3a.secret.key=<your-s3-secret-key>
    
  2. Try out some basic Delta table operations on S3 (in Scala):

    // Create a Delta table on S3:
    spark.range(5).write.format("delta").save("s3a://<your-s3-bucket>/<path-to-delta-table>")
    
    // Read a Delta table on S3:
    spark.read.format("delta").load("s3a://<your-s3-bucket>/<path-to-delta-table>").show()
    

For other languages and more examples of Delta table operations, see Quickstart.

Configuration

Here are the steps to configure Delta Lake for S3.

  1. Include hadoop-aws JAR in the classpath.

    Delta Lake needs the org.apache.hadoop.fs.s3a.S3AFileSystem class from the hadoop-aws package, which implements Hadoop’s FileSystem API for S3. Make sure the version of this package matches the Hadoop version with which Spark was built.

  2. Set up S3 credentials.

    We recommend using IAM roles for authentication and authorization. But if you want to use access keys, here is one way to set up the Hadoop configuration (in Scala):

    sc.hadoopConfiguration.set("fs.s3a.access.key", "<your-s3-access-key>")
    sc.hadoopConfiguration.set("fs.s3a.secret.key", "<your-s3-secret-key>")
    

Microsoft Azure storage

Delta Lake has built-in support for the various Azure storage systems with full transactional guarantees for concurrent reads and writes from multiple clusters.

Delta Lake relies on Hadoop FileSystem APIs to access Azure storage services. Specifically, Delta Lake requires the implementation of FileSystem.rename() to be atomic, which is only supported in newer Hadoop versions (Hadoop-15156 and Hadoop-15086). For this reason, you may need to build Spark with newer Hadoop versions and use them for deploying your application. See Specifying the Hadoop Version and Enabling YARN for building Spark with a specific Hadoop version and Quickstart for setting up Spark with Delta Lake.

Here is a list of requirements specific to each type of Azure storage system:

Azure Blob storage

Requirements

  • A shared key or shared access signature (SAS)
  • Delta Lake 0.2.0 or above
  • Hadoop’s Azure Blob Storage libraries for deployment with the following versions:
    • 2.9.1+ for Hadoop 2
    • 3.0.1+ for Hadoop 3
  • Apache Spark associated with the corresponding Delta Lake version (see the Quick Start page of the relevant Delta version’s documentation) and compiled with a Hadoop version that is compatible with the chosen Hadoop libraries.

For example, a possible combination that will work is Delta 0.7.0 or above, along with Apache Spark 3.0 compiled and deployed with Hadoop 3.2.

Configuration

Here are the steps to configure Delta Lake on Azure Blob storage.

  1. Include hadoop-azure JAR in the classpath. See the requirements above for version details.

  2. Set up credentials.

    You can set up your credentials as Spark configuration properties.

    We recommend that you use a SAS token. In Scala, you can use the following:

    spark.conf.set(
      "fs.azure.sas.<your-container-name>.<your-storage-account-name>.blob.core.windows.net",
       "<complete-query-string-of-your-sas-for-the-container>")
    

    Or you can specify an account access key:

    spark.conf.set(
      "fs.azure.account.key.<your-storage-account-name>.blob.core.windows.net",
       "<your-storage-account-access-key>")
    

Usage

spark.range(5).write.format("delta").save("wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<path-to-delta-table>")
spark.read.format("delta").load("wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<path-to-delta-table>").show()

Azure Data Lake Storage Gen1

Requirements

  • A service principal for OAuth 2.0 access
  • Delta Lake 0.2.0 or above
  • Hadoop’s Azure Data Lake Storage Gen1 libraries for deployment with the following versions:
    • 2.9.1+ for Hadoop 2
    • 3.0.1+ for Hadoop 3
  • Apache Spark associated with the corresponding Delta Lake version (see the Quick Start page of the relevant Delta version’s documentation) and compiled with a Hadoop version that is compatible with the chosen Hadoop libraries.

For example, a possible combination that will work is Delta 0.7.0 or above, along with Apache Spark 3.0 compiled and deployed with Hadoop 3.2.

Configuration

Here are the steps to configure Delta Lake on Azure Data Lake Storage Gen1.

  1. Include hadoop-azure-datalake JAR in the classpath. See the requirements above for version details.

  2. Set up Azure Data Lake Storage Gen1 credentials.

    You can set the following Hadoop configurations with your credentials (in Scala):

    spark.conf.set("dfs.adls.oauth2.access.token.provider.type", "ClientCredential")
    spark.conf.set("dfs.adls.oauth2.client.id", "<your-oauth2-client-id>")
    spark.conf.set("dfs.adls.oauth2.credential", "<your-oauth2-credential>")
    spark.conf.set("dfs.adls.oauth2.refresh.url", "https://login.microsoftonline.com/<your-directory-id>/oauth2/token")
    

Usage

spark.range(5).write.format("delta").save("adl://<your-adls-account>.azuredatalakestore.net/<path-to-delta-table>")

spark.read.format("delta").load("adl://<your-adls-account>.azuredatalakestore.net/<path-to-delta-table>").show()

Azure Data Lake Storage Gen2

Requirements

  • Account created in Azure Data Lake Storage Gen2
  • Service principal created and assigned the Storage Blob Data Contributor role for the storage account.
    • Note the storage-account-name, directory-id (also known as tenant-id), application-id, and password of the principal. These will be used for configuring Spark.
  • Delta Lake 0.7.0 or above
  • Apache Spark 3.0 or above
  • The Apache Spark distribution used must be built with Hadoop 3.2 or above.

For example, a possible combination that will work is Delta 0.7.0 or above, along with Apache Spark 3.0 compiled and deployed with Hadoop 3.2.

Configuration

Here are the steps to configure Delta Lake on Azure Data Lake Storage Gen2.

  1. Include the JAR of the Maven artifact hadoop-azure-datalake in the classpath. See the requirements for version details. In addition, you may also have to include JARs for Maven artifacts hadoop-azure and wildfly-openssl.

  2. Set up Azure Data Lake Storage Gen2 credentials.

     spark.conf.set("fs.azure.account.auth.type.<storage-account-name>.dfs.core.windows.net", "OAuth")
     spark.conf.set("fs.azure.account.oauth.provider.type.<storage-account-name>.dfs.core.windows.net",  "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
     spark.conf.set("fs.azure.account.oauth2.client.id.<storage-account-name>.dfs.core.windows.net", "<application-id>")
     spark.conf.set("fs.azure.account.oauth2.client.secret.<storage-account-name>.dfs.core.windows.net","<password>")
     spark.conf.set("fs.azure.account.oauth2.client.endpoint.<storage-account-name>.dfs.core.windows.net", "https://login.microsoftonline.com/<directory-id>/oauth2/token")
    

    where <storage-account-name>, <application-id>, <directory-id>, and <password> are the details of the service principal created as part of the requirements above.

  3. Initialize the file system if needed.

spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "true")
dbutils.fs.ls("abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/")
spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "false")

Usage

spark.range(5).write.format("delta").save("abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-delta-table>")

spark.read.format("delta").load("abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-delta-table>").show()

where <container-name> is the name of the file system (also called a container) in your storage account.

HDFS

Delta Lake has built-in support for HDFS with full transactional guarantees on concurrent reads and writes from multiple clusters. See Hadoop and Spark documentation for configuring credentials.
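
As with the other storage systems, a Delta table on HDFS is read and written simply by pointing at an HDFS path. A minimal usage sketch in Scala (the hdfs:// URI below is illustrative; a plain path also works when HDFS is the cluster's default file system):

spark.range(5).write.format("delta").save("hdfs://<namenode>:<port>/<path-to-delta-table>")

spark.read.format("delta").load("hdfs://<namenode>:<port>/<path-to-delta-table>").show()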

Google Cloud Storage

Note

This support is new and experimental.

You must configure Delta Lake to use the correct LogStore for concurrently reading and writing from GCS.

Configuration

  1. Configure LogStore implementation for the scheme gs.

    spark.delta.logStore.gs.impl=io.delta.storage.GCSLogStore
    
  2. Include the JARs for delta-contribs and gcs-connector in the classpath. See the documentation for details on how to configure Spark with GCS.

Usage

spark.range(5, 10).write.format("delta").mode("append").save("gs://<bucket-name>/<path-to-delta-table>")

spark.read.format("delta").load("gs://<bucket-name>/<path-to-delta-table>").show()

Oracle Cloud Infrastructure

Note

This support is new and experimental.

You have to configure Delta Lake to use the correct LogStore for concurrently reading and writing.

Configuration

  1. Configure LogStore implementation for the scheme oci.

    spark.delta.logStore.oci.impl=io.delta.storage.OracleCloudLogStore
    
  2. Include the JARs for delta-contribs and hadoop-oci-connector in the classpath. See Using the HDFS Connector with Spark for details on how to configure Spark with OCI.

  3. Set the OCI Object Store credentials as explained in the documentation.
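
    For example, one way to do this is through the Hadoop configuration properties that the OCI HDFS connector uses for API-key authentication (in Scala). The property names below are taken from the connector's configuration and should be verified against the connector documentation for your version:

    sc.hadoopConfiguration.set("fs.oci.client.auth.tenantId", "<ocid-of-your-tenancy>")
    sc.hadoopConfiguration.set("fs.oci.client.auth.userId", "<ocid-of-your-user>")
    sc.hadoopConfiguration.set("fs.oci.client.auth.fingerprint", "<api-key-fingerprint>")
    sc.hadoopConfiguration.set("fs.oci.client.auth.pemfilepath", "<path-to-private-key-file>")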

Usage

spark.range(5).write.format("delta").save("oci://<ociBucket>@<ociNameSpace>/<path-to-delta-table>")

spark.read.format("delta").load("oci://<ociBucket>@<ociNameSpace>/<path-to-delta-table>").show()

IBM Cloud Object Storage

Note

This support is new and experimental.

You have to configure Delta Lake to use the correct LogStore for concurrently reading and writing.

Configuration

  1. Configure LogStore implementation for the scheme cos.

    spark.delta.logStore.cos.impl=io.delta.storage.IBMCOSLogStore
    
  2. Include the JARs for delta-contribs and Stocator in the classpath.

  3. Configure Stocator with atomic write support by setting the following properties in the Hadoop configuration.

    fs.stocator.scheme.list=cos
    fs.cos.impl=com.ibm.stocator.fs.ObjectStoreFileSystem
    fs.stocator.cos.impl=com.ibm.stocator.fs.cos.COSAPIClient
    fs.stocator.cos.scheme=cos
    fs.cos.atomic.write=true
    
  4. Set up IBM COS credentials. The example below uses access keys with a service named service (in Scala):

    sc.hadoopConfiguration.set("fs.cos.service.endpoint", "<your-cos-endpoint>")
    sc.hadoopConfiguration.set("fs.cos.service.access.key", "<your-cos-access-key>")
    sc.hadoopConfiguration.set("fs.cos.service.secret.key", "<your-cos-secret-key>")
    
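The Stocator properties from step 3 can also be set programmatically, in the same way as the credentials in step 4; a short Scala sketch:

    // Equivalent to step 3: route the cos scheme to Stocator and enable its
    // atomic write support, as required above.
    sc.hadoopConfiguration.set("fs.stocator.scheme.list", "cos")
    sc.hadoopConfiguration.set("fs.cos.impl", "com.ibm.stocator.fs.ObjectStoreFileSystem")
    sc.hadoopConfiguration.set("fs.stocator.cos.impl", "com.ibm.stocator.fs.cos.COSAPIClient")
    sc.hadoopConfiguration.set("fs.stocator.cos.scheme", "cos")
    sc.hadoopConfiguration.set("fs.cos.atomic.write", "true")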

Usage

spark.range(5).write.format("delta").save("cos://<your-cos-bucket>.service/<path-to-delta-table>")
spark.read.format("delta").load("cos://<your-cos-bucket>.service/<path-to-delta-table>").show()