Delta Standalone

The Delta Standalone library is a single-node Java library that can be used to read from and write to Delta tables. Specifically, this library provides APIs to interact with a table’s metadata in the transaction log, implementing the Delta Transaction Log Protocol to achieve the transactional guarantees of the Delta Lake format. Notably, this project doesn’t depend on Apache Spark and has only a few transitive dependencies. Therefore, it can be used by any processing engine or application to access Delta tables.

Use cases

Delta Standalone is optimized for cases when you want to read and write Delta tables by using a non-Spark engine of your choice. It is a “low-level” library, and we encourage developers to contribute open-source, higher-level connectors for their desired engines that use Delta Standalone for all Delta Lake metadata interaction. You can find a Hive source connector and Flink sink connector in the Delta Lake Connectors repository, and additional connectors are in development.

Caveats

Delta Standalone minimizes memory usage in the JVM by loading the Delta Lake transaction log incrementally, using an iterator. However, Delta Standalone runs in a single JVM, and is limited to the processing and memory capabilities of that JVM. Users must configure the JVM to avoid out-of-memory (OOM) errors.

Delta Standalone does provide basic APIs for reading Parquet data, but does not include APIs for writing Parquet data. Users must write out new Parquet data files themselves and then use Delta Standalone to commit those changes to the Delta table and make the new data visible to readers.

APIs

Delta Standalone provides classes and entities to read data, query metadata, and commit to the transaction log. A few of them are highlighted here, along with their key interfaces. See the Java API docs for the full set of classes and entities.

DeltaLog

DeltaLog is the main interface for programmatically interacting with the metadata in the transaction log of a Delta table.

  • Instantiate a DeltaLog with DeltaLog.forTable(hadoopConf, path) and pass in the path of the root location of the Delta table.
  • Access the current snapshot with DeltaLog::snapshot.
  • Get the latest snapshot, including any new data files that were added to the log, with DeltaLog::update.
  • Get the snapshot at some historical state of the log with DeltaLog::getSnapshotForTimestampAsOf or DeltaLog::getSnapshotForVersionAsOf.
  • Start a new transaction to commit to the transaction log by using DeltaLog::startTransaction.
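
To make the time-travel lookup more concrete, here is a minimal, library-free sketch of resolving a timestamp to a table version. It assumes "latest commit at or before the timestamp" semantics. The class CommitTimeline and its methods are hypothetical illustrations and are not part of Delta Standalone; in the library itself this lookup is done by DeltaLog::getSnapshotForTimestampAsOf.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical stand-in for a table's commit history; not a Delta Standalone class. */
final class CommitTimeline {
    // commit timestamp (epoch millis) -> table version
    private final NavigableMap<Long, Long> versionByTimestamp = new TreeMap<>();

    void recordCommit(long version, long timestampMillis) {
        versionByTimestamp.put(timestampMillis, version);
    }

    /** Returns the latest version committed at or before the given timestamp. */
    long versionAsOf(long timestampMillis) {
        Map.Entry<Long, Long> entry = versionByTimestamp.floorEntry(timestampMillis);
        if (entry == null) {
            throw new IllegalArgumentException("No commit at or before " + timestampMillis);
        }
        return entry.getValue();
    }
}
```

A sorted map keeps the lookup logarithmic in the number of commits; a timestamp earlier than the first commit is rejected rather than silently mapped to version 0.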

Snapshot

A Snapshot represents the state of the table at a specific version.

  • Get a list of the metadata files by using Snapshot::getAllFiles.
  • For a memory-optimized iterator over the metadata files, use Snapshot::scan to get a DeltaScan (as described later), optionally by passing in a predicate for partition filtering.
  • Read actual data with Snapshot::open, which returns an iterator over the rows of the Delta table.
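
As a rough mental model of what "the state of the table at a specific version" means, the following library-free sketch replays add and remove actions commit by commit. FileAction and LogReplay are hypothetical, heavily simplified types invented for this illustration; the library's own actions are AddFile and RemoveFile, and its replay logic is more involved.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical, simplified file action; not the library's AddFile or RemoveFile. */
final class FileAction {
    final boolean isAdd;
    final String path;

    FileAction(boolean isAdd, String path) {
        this.isAdd = isAdd;
        this.path = path;
    }
}

final class LogReplay {
    /** Replays commits 0..version (inclusive) and returns the file paths still active. */
    static Set<String> activeFiles(List<List<FileAction>> commits, int version) {
        Set<String> active = new LinkedHashSet<>();
        for (int v = 0; v <= version && v < commits.size(); v++) {
            for (FileAction action : commits.get(v)) {
                if (action.isAdd) {
                    active.add(action.path);
                } else {
                    active.remove(action.path);
                }
            }
        }
        return active;
    }
}
```

Replaying up to an earlier version yields the older state of the table, which is the idea behind reading a historical snapshot.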

OptimisticTransaction

The main class for committing a set of updates to the transaction log is OptimisticTransaction. During a transaction, all reads must go through the OptimisticTransaction instance rather than the DeltaLog in order to detect logical conflicts and concurrent updates.

  • Read metadata files during a transaction with OptimisticTransaction::markFilesAsRead, which returns a DeltaScan of files that match the readPredicate.
  • Commit to the transaction log with OptimisticTransaction::commit.
  • Get the latest version committed for a given application ID (for example, for idempotency) with OptimisticTransaction::txnVersion. (Note that this API requires users to commit SetTransaction actions.)
  • Update the metadata of the table upon committing with OptimisticTransaction::updateMetadata.
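
The idempotency use case mentioned above can be sketched without the library as follows. IdempotentWriter is a hypothetical in-memory stand-in for the per-application versions that SetTransaction actions record in the log, and the -1 sentinel for "never committed" is an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in for versions recorded by SetTransaction actions. */
final class IdempotentWriter {
    private final Map<String, Long> lastVersionByAppId = new HashMap<>();

    /** Mirrors the idea of txnVersion: -1 when the application has never committed. */
    long txnVersion(String appId) {
        return lastVersionByAppId.getOrDefault(appId, -1L);
    }

    /** Commits the batch only if it is newer than the last one recorded for appId. */
    boolean commitIfNew(String appId, long batchVersion) {
        if (batchVersion <= txnVersion(appId)) {
            return false; // already committed; a retry must not write it twice
        }
        // A real writer would commit its data files together with a SetTransaction action here.
        lastVersionByAppId.put(appId, batchVersion);
        return true;
    }
}
```

Because the version check and the data commit belong to the same transaction in a real writer, a batch that is retried after a crash is either skipped or written exactly once.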

DeltaScan

DeltaScan is a wrapper class for the files inside a Snapshot that match a given readPredicate.

  • Access the files that match the partition filter portion of the readPredicate with DeltaScan::getFiles. This returns a memory-optimized iterator over the metadata files in the table.
  • To further filter the returned files on non-partition columns, get the portion of input predicate not applied with DeltaScan::getResidualPredicate.
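
The split between the partition filter and the residual predicate can be illustrated with a small, library-free sketch. Conjunct and PredicateSplit are hypothetical types that model a predicate as a flat AND of column = value terms; the library works on its own Expression tree instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Hypothetical equality term such as year = 2021; not the library's Expression type. */
final class Conjunct {
    final String column;
    final Object value;

    Conjunct(String column, Object value) {
        this.column = column;
        this.value = value;
    }
}

final class PredicateSplit {
    final List<Conjunct> pushed = new ArrayList<>();   // usable for pruning files by partition values
    final List<Conjunct> residual = new ArrayList<>(); // must still be applied to the rows themselves

    /** Splits an AND of terms by whether each term references a partition column. */
    static PredicateSplit of(List<Conjunct> conjuncts, Set<String> partitionColumns) {
        PredicateSplit split = new PredicateSplit();
        for (Conjunct c : conjuncts) {
            if (partitionColumns.contains(c.column)) {
                split.pushed.add(c);
            } else {
                split.residual.add(c);
            }
        }
        return split;
    }
}
```

For a table partitioned by year, month, and day, a filter on year, month, and customer would leave only the customer term in the residual, which the engine has to evaluate while reading the data.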

API compatibility

The only public APIs currently provided by Delta Standalone are in the io.delta.standalone package. Classes and methods in the io.delta.standalone.internal package are considered internal and are subject to change across minor and patch releases.

Project setup

You can add the Delta Standalone library as a dependency by using your preferred build tool. Delta Standalone depends upon the hadoop-client and parquet-hadoop packages. Example build files are listed in the following sections.

Environment requirements

  • JDK 8 or above.
  • Scala 2.11 or 2.12.

Build files

Maven

Replace the versions of hadoop-client and parquet-hadoop with the ones you are using.

Scala 2.12:

<dependency>
  <groupId>io.delta</groupId>
  <artifactId>delta-standalone_2.12</artifactId>
  <version>0.4.0</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>3.1.0</version>
</dependency>
<dependency>
  <groupId>org.apache.parquet</groupId>
  <artifactId>parquet-hadoop</artifactId>
  <version>1.10.1</version>
</dependency>

Scala 2.11:

<dependency>
  <groupId>io.delta</groupId>
  <artifactId>delta-standalone_2.11</artifactId>
  <version>0.4.0</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>3.1.0</version>
</dependency>
<dependency>
  <groupId>org.apache.parquet</groupId>
  <artifactId>parquet-hadoop</artifactId>
  <version>1.10.1</version>
</dependency>

SBT

Replace the versions of hadoop-client and parquet-hadoop with the ones you are using.

libraryDependencies ++= Seq(
  "io.delta" %% "delta-standalone" % "0.4.0",
  "org.apache.hadoop" % "hadoop-client" % "3.1.0",
  "org.apache.parquet" % "parquet-hadoop" % "1.10.1")

Storage configuration

Delta Lake ACID guarantees are based on the atomicity and durability guarantees of the storage system. Not all storage systems provide all the necessary guarantees.

Delta Standalone supports concurrent reads on any storage system that provides an implementation of the Hadoop FileSystem API.

For concurrent writes with transactional guarantees, the default LogStore implementation is sufficient if the FileSystem implementation provides consistent listing and atomic renames-without-overwrite (that is, rename(..., overwrite=false) either generates the target file atomically or fails with java.nio.file.FileAlreadyExistsException if it already exists). See Storage Configuration for more details.
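
The "create atomically or fail if it exists" requirement can be illustrated on a local filesystem with plain java.nio. This is only a sketch of the mutual-exclusion idea: PutIfAbsent is a hypothetical helper, not a LogStore implementation, and unlike a real LogStore it does not guarantee that readers never observe a partially written file.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical local-filesystem illustration; not a LogStore implementation. */
final class PutIfAbsent {
    /** Tries to create the commit file for a version; returns false if it already exists. */
    static boolean tryCommit(Path logDir, long version, String json) {
        Path target = logDir.resolve(String.format("%020d.json", version));
        try {
            // CREATE_NEW fails if the file exists; the existence check and creation are one atomic step.
            Files.write(target, json.getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
            return true;
        } catch (FileAlreadyExistsException e) {
            return false; // another writer claimed this version first
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

When two writers race for the same version, exactly one call returns true; the other writer has to re-read the log and try again at a later version.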

For storage systems that do not provide the necessary guarantees, you must use a custom LogStore implementation for concurrent writes with transactional guarantees. Delta Standalone currently provides implementations for Amazon S3, Microsoft Azure Blob Storage, Microsoft Azure Data Lake Storage Gen1, and Microsoft Azure Data Lake Storage Gen2.

Amazon S3 configuration

Delta Standalone supports concurrent reads from multiple clusters, but concurrent writes to S3 must originate from a single cluster to provide transactional guarantees. To use Delta Standalone with S3, you must meet the following requirements and configure a Hadoop Configuration as specified below when you instantiate a DeltaLog with DeltaLog.forTable(hadoopConf, path).

Requirements
Configuration
  1. Include hadoop-aws JAR in the classpath.

  2. Set up S3 credentials. We recommend that you use IAM roles for authentication and authorization. If you want to use keys instead, configure your org.apache.hadoop.conf.Configuration with:

    conf.set("fs.s3a.access.key", "<your-s3-access-key>");
    conf.set("fs.s3a.secret.key", "<your-s3-secret-key>");
    
  3. Configure Delta Standalone to use the correct LogStore implementation with:

    conf.set("io.delta.standalone.LOG_STORE_CLASS_KEY",  "io.delta.standalone.internal.storage.S3SingleDriverLogStore");
    

Microsoft Azure configuration

Delta Standalone supports concurrent reads and writes from multiple clusters with full transactional guarantees for various Azure storage systems. To use an Azure storage system, you must satisfy the following requirements and configure a Hadoop Configuration as specified later in this article when you instantiate a DeltaLog with DeltaLog.forTable(hadoopConf, path).

Azure Blob Storage
Requirements
Configuration
  1. Include hadoop-azure JAR in the classpath.

  2. Set up credentials.

    • For an SAS token, configure org.apache.hadoop.conf.Configuration:

      conf.set(
        "fs.azure.sas.<your-container-name>.<your-storage-account-name>.blob.core.windows.net",
        "<complete-query-string-of-your-sas-for-the-container>");
      
    • To specify an account access key:

      conf.set(
        "fs.azure.account.key.<your-storage-account-name>.blob.core.windows.net",
        "<your-storage-account-access-key>");
      
  3. Configure Delta Standalone to use the correct LogStore implementation:

    conf.set("io.delta.standalone.LOG_STORE_CLASS_KEY", "io.delta.standalone.internal.storage.AzureLogStore");
    
Azure Data Lake Storage Gen1
Requirements
Configuration
  1. Include hadoop-azure-datalake JAR in the classpath.

  2. Set up Azure Data Lake Storage Gen1 credentials. Configure org.apache.hadoop.conf.Configuration:

    conf.set("dfs.adls.oauth2.access.token.provider.type", "ClientCredential");
    conf.set("dfs.adls.oauth2.client.id", "<your-oauth2-client-id>");
    conf.set("dfs.adls.oauth2.credential", "<your-oauth2-credential>");
    conf.set("dfs.adls.oauth2.refresh.url", "https://login.microsoftonline.com/<your-directory-id>/oauth2/token");
    
  3. Configure Delta Standalone to use the correct LogStore implementation:

    conf.set("io.delta.standalone.LOG_STORE_CLASS_KEY", "io.delta.standalone.internal.storage.AzureLogStore");
    
Azure Data Lake Storage Gen2
Requirements
Configuration
  1. Include hadoop-azure-datalake JAR in the classpath. In addition, you may also have to include JARs for Maven artifacts hadoop-azure and wildfly-openssl.

  2. Set up Azure Data Lake Storage Gen2 credentials. Configure your org.apache.hadoop.conf.Configuration with:

    conf.set("fs.azure.account.auth.type.<storage-account-name>.dfs.core.windows.net", "OAuth");
    conf.set("fs.azure.account.oauth.provider.type.<storage-account-name>.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider");
    conf.set("fs.azure.account.oauth2.client.id.<storage-account-name>.dfs.core.windows.net", "<application-id>");
    conf.set("fs.azure.account.oauth2.client.secret.<storage-account-name>.dfs.core.windows.net","<password>");
    conf.set("fs.azure.account.oauth2.client.endpoint.<storage-account-name>.dfs.core.windows.net", "https://login.microsoftonline.com/<directory-id>/oauth2/token");
    

    where <storage-account-name>, <application-id>, <directory-id> and <password> are details of the service principal we set as requirements earlier.

  3. Configure Delta Standalone to use the correct LogStore implementation with:

    conf.set("io.delta.standalone.LOG_STORE_CLASS_KEY", "io.delta.standalone.internal.storage.AzureLogStore");
    

Usage

This example shows how to use Delta Standalone along with a fictitious, non-Spark engine Zappy to:

  • Find Parquet files.
  • Write Parquet data.
  • Commit to the transaction log.
  • Read from the transaction log.
  • Read back the Parquet data.

1. SBT configuration

The following SBT project configuration is used:

// <project-root>/build.sbt

scalaVersion := "2.12.8"

libraryDependencies ++= Seq(
  "io.delta" %% "delta-standalone" % "0.3.0",
  "org.apache.hadoop" % "hadoop-client" % "3.1.0",
  "org.apache.parquet" % "parquet-hadoop" % "1.10.1",
)

2. Mock situation

We have a Delta table Sales storing sales data, but have realized that all the data written in November 2021 for customer XYZ had incorrect total_cost values. Thus, we need to update all those records with the correct values. We will use a fictitious distributed engine Zappy and Delta Standalone to update our Delta table.

The sales table schema is given below.

Sales
 |-- year: int          // partition column
 |-- month: int         // partition column
 |-- day: int           // partition column
 |-- customer: string
 |-- sale_id: string
 |-- total_cost: float
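
When an engine later registers new data files, it needs the partition values of each file as a map of strings. The sketch below derives that map from a file path, assuming Hive-style column=value directories like those in the directory listing shown in step 4. PartitionPaths is a hypothetical helper, not part of Delta Standalone, and it does not handle escaped characters in partition values.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper; assumes Hive-style partition directories of the form column=value. */
final class PartitionPaths {
    /** Extracts partition values from a data file path relative to the table root. */
    static Map<String, String> partitionValues(String relativePath) {
        Map<String, String> values = new LinkedHashMap<>();
        String[] segments = relativePath.split("/");
        // The last segment is the data file name, so it is skipped.
        for (int i = 0; i < segments.length - 1; i++) {
            int eq = segments[i].indexOf('=');
            if (eq > 0) {
                values.put(segments[i].substring(0, eq), segments[i].substring(eq + 1));
            }
        }
        return values;
    }
}
```

For the Sales table, a path such as year=2021/month=11/day=3/part-00000.snappy.parquet would yield the three partition columns year, month, and day with their values as strings.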

3. Starting a transaction and finding relevant files

Since we must read existing data in order to perform the desired update operation, we must use OptimisticTransaction::markFilesAsRead in order to automatically detect any concurrent modifications made to our read partitions. Since Delta Standalone only supports partition pruning, we must apply the residual predicate to further filter the returned files.

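import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;

import io.delta.standalone.DeltaLog;
import io.delta.standalone.DeltaScan;
import io.delta.standalone.OptimisticTransaction;
import io.delta.standalone.actions.AddFile;
import io.delta.standalone.data.CloseableIterator;
import io.delta.standalone.expressions.And;
import io.delta.standalone.expressions.EqualTo;
import io.delta.standalone.expressions.Literal;
import io.delta.standalone.types.StructType;

DeltaLog log = DeltaLog.forTable(new Configuration(), "/data/sales");
OptimisticTransaction txn = log.startTransaction();

// During a transaction, read the table schema through the transaction itself.
StructType schema = txn.metadata().getSchema();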

DeltaScan scan = txn.markFilesAsRead(
    new And(
        new And(
            new EqualTo(schema.column("year"), Literal.of(2021)),  // partition filter
            new EqualTo(schema.column("month"), Literal.of(11))),  // partition filter
        new EqualTo(schema.column("customer"), Literal.of("XYZ"))  // non-partition filter
    )
);

CloseableIterator<AddFile> iter = scan.getFiles();
Map<String, AddFile> addFileMap = new HashMap<String, AddFile>();  // partition filtered files: year=2021, month=11
while (iter.hasNext()) {
    AddFile addFile = iter.next();
    addFileMap.put(addFile.getPath(), addFile);
}
iter.close();

List<String> filteredFiles = ZappyReader.filterFiles( // fully filtered files: year=2021, month=11, customer=XYZ
    addFileMap.keySet(),
    toZappyExpression(scan.getResidualPredicate())
);

4. Writing updated Parquet data

Since Delta Standalone does not provide any Parquet data write APIs, we use Zappy to write the data.

ZappyDataFrame correctedSaleIdToTotalCost = ...;

ZappyDataFrame invalidSales = ZappyReader.readParquet(filteredFiles);
ZappyDataFrame correctedSales = invalidSales.join(correctedSaleIdToTotalCost, "sale_id");

ZappyWriteResult dataWriteResult = ZappyWriter.writeParquet("/data/sales", correctedSales);

The written data files from the preceding code will have a hierarchy similar to the following:

$ tree /data/sales
.
├── _delta_log
│   └── ...
│   └── 00000000000000001082.json
│   └── 00000000000000001083.json
├── year=2019
│   └── month=1
...
├── year=2020
│   └── month=1
│       └── day=1
│           └── part-00000-195768ae-bad8-4c53-b0c2-e900e0f3eaee-c000.snappy.parquet // previous
│           └── part-00001-53c3c553-f74b-4384-b9b5-7aa45bc2291b-c000.snappy.parquet // new
│           ...
│       └── day=2
│           └── part-00000-b9afbcf5-b90d-4f92-97fd-a2522aa2d4f6-c000.snappy.parquet // previous
│           └── part-00001-c0569730-5008-42fa-b6cb-5a152c133fde-c000.snappy.parquet // new
│           ...
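
The commit files under _delta_log in the listing above are named after the table version, zero-padded to 20 digits. A small, library-free sketch of that naming convention follows; DeltaLogNames is a hypothetical helper for illustration and is not part of Delta Standalone.

```java
/** Hypothetical helper illustrating the zero-padded commit file names under _delta_log. */
final class DeltaLogNames {
    /** Formats a version as a 20-digit, zero-padded JSON commit file name. */
    static String commitFileName(long version) {
        return String.format("%020d.json", version);
    }

    /** Parses the version back out of a commit file name. */
    static long versionOf(String fileName) {
        if (!fileName.matches("\\d{20}\\.json")) {
            throw new IllegalArgumentException("Not a commit file: " + fileName);
        }
        return Long.parseLong(fileName.substring(0, 20));
    }
}
```

Fixed-width names mean that a plain lexicographic directory listing returns commits in version order.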

5. Committing to our Delta table

Now that we’ve written the correct data, we need to commit to the transaction log to add the new files, and remove the old incorrect files.

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

import io.delta.standalone.Operation;
import io.delta.standalone.actions.Action;
import io.delta.standalone.actions.RemoveFile;
import io.delta.standalone.exceptions.DeltaConcurrentModificationException;
import io.delta.standalone.types.StructType;

// Mark every file that held incorrect rows as removed.
List<RemoveFile> removeOldFiles = filteredFiles.stream()
    .map(path -> addFileMap.get(path).remove())
    .collect(Collectors.toList());

// Register the files Zappy wrote; getNewFiles() is assumed to return a Stream.
List<AddFile> addNewFiles = dataWriteResult.getNewFiles()
    .map(file ->
        new AddFile(
            file.getPath(),
            file.getPartitionValues(),
            file.getSize(),
            System.currentTimeMillis(),
            true, // isDataChange
            null, // stats
            null  // tags
        )
    ).collect(Collectors.toList());

// A single commit carries both the removals and the additions.
List<Action> totalCommitFiles = new ArrayList<>();
totalCommitFiles.addAll(removeOldFiles);
totalCommitFiles.addAll(addNewFiles);

try {
    txn.commit(totalCommitFiles, new Operation(Operation.Name.UPDATE), "Zappy/1.0.0");
} catch (DeltaConcurrentModificationException e) {
    // handle exception here
}
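
One common way to handle a concurrent-modification failure is to redo the whole attempt: start a new transaction, re-read the affected files, rewrite the data, and commit again. The library-free sketch below shows such a bounded retry loop. ConflictException is a hypothetical stand-in for DeltaConcurrentModificationException, and CommitRetry is an illustrative helper, not a Delta Standalone API.

```java
import java.util.function.Supplier;

/** Hypothetical stand-in for DeltaConcurrentModificationException. */
final class ConflictException extends RuntimeException {
    ConflictException(String message) {
        super(message);
    }
}

final class CommitRetry {
    /**
     * Calls attempt.get() until it succeeds or maxAttempts is reached.
     * Each call is expected to run one full attempt: start transaction, read, write, commit.
     */
    static <T> T withRetries(int maxAttempts, Supplier<T> attempt) {
        ConflictException last = null;
        for (int i = 0; i < maxAttempts; i++) {
            try {
                return attempt.get();
            } catch (ConflictException e) {
                last = e; // another writer won; the next attempt must re-read the table
            }
        }
        throw new IllegalStateException("Gave up after " + maxAttempts + " attempts", last);
    }
}
```

Retrying only the commit call is not enough in general, because the files read at the start of the transaction may no longer be current after a conflict.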

6. Reading from the Delta table

Delta Standalone provides APIs that read both metadata and data, as follows.

6.1. Reading Parquet data (distributed)

For most use cases, and especially when you deal with large volumes of data, we recommend that you use the Delta Standalone library as your metadata-only reader, and then perform the Parquet data reading yourself, most likely in a distributed manner.

Delta Standalone provides two APIs for reading the files in a given table snapshot. Snapshot::getAllFiles returns an in-memory list. As of 0.3.0, we also provide Snapshot::scan(filter)::getFiles, which supports partition pruning and an optimized internal iterator implementation. We will use the latter here.

import io.delta.standalone.Snapshot;

DeltaLog log = DeltaLog.forTable(new Configuration(), "/data/sales");
Snapshot latestSnapshot = log.update();
StructType schema = latestSnapshot.getMetadata().getSchema();
DeltaScan scan = latestSnapshot.scan(
    new And(
        new And(
            new EqualTo(schema.column("year"), Literal.of(2021)),
            new EqualTo(schema.column("month"), Literal.of(11))),
        new EqualTo(schema.column("customer"), Literal.of("XYZ"))
    )
);

CloseableIterator<AddFile> iter = scan.getFiles();

try {
    while (iter.hasNext()) {
        AddFile addFile = iter.next();

        // Zappy engine to handle reading data in `addFile.getPath()` and apply any `scan.getResidualPredicate()`
    }
} finally {
    iter.close();
}
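
If the iterator type implements java.io.Closeable, the try/finally pattern above can also be written with try-with-resources. The following self-contained sketch uses a local stand-in interface, ClosingIterator, which is assumed to mirror the shape of the library's CloseableIterator; FileListing and its file names are hypothetical.

```java
import java.io.Closeable;
import java.util.Arrays;
import java.util.Iterator;

/** Local stand-in for an iterator that holds resources until it is closed. */
interface ClosingIterator<T> extends Iterator<T>, Closeable {
    @Override
    void close(); // narrowed so that callers in this sketch need not handle IOException
}

final class FileListing {
    boolean closed = false;

    ClosingIterator<String> files() {
        final Iterator<String> paths = Arrays.asList("a.parquet", "b.parquet").iterator();
        return new ClosingIterator<String>() {
            @Override public boolean hasNext() { return paths.hasNext(); }
            @Override public String next() { return paths.next(); }
            @Override public void close() { closed = true; }
        };
    }

    /** Counts files; try-with-resources closes the iterator even if the loop throws. */
    int countFiles() {
        int count = 0;
        try (ClosingIterator<String> iter = files()) {
            while (iter.hasNext()) {
                iter.next();
                count++;
            }
        }
        return count;
    }
}
```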

6.2. Reading Parquet data (single-JVM)

Delta Standalone allows reading the Parquet data directly, using Snapshot::open.

import io.delta.standalone.data.RowRecord;

CloseableIterator<RowRecord> dataIter = log.update().open();

try {
    while (dataIter.hasNext()) {
        RowRecord row = dataIter.next();
        int year = row.getInt("year");
        String customer = row.getString("customer");
        float totalCost = row.getFloat("total_cost");
    }
} finally {
    dataIter.close();
}

Reporting issues

We use GitHub Issues to track community-reported issues. You can also contact the community to get answers.

Contributing

We welcome contributions to the Delta Lake Connectors repository. We use GitHub Pull Requests for accepting changes.

Community

There are two ways to communicate with the Delta Lake community.

Local development

  • Before local debugging of standalone tests in IntelliJ, run all tests with build/sbt standalone/test. This helps IntelliJ recognize the golden tables as class resources.