Delta Standalone
The Delta Standalone library is a single-node Java library that can be used to read from and write to Delta tables. Specifically, this library provides APIs to interact with a table’s metadata in the transaction log, implementing the Delta Transaction Log Protocol to achieve the transactional guarantees of the Delta Lake format. Notably, this project doesn’t depend on Apache Spark and has only a few transitive dependencies. Therefore, it can be used by any processing engine or application to access Delta tables.
- Delta Standalone
- Use cases
- APIs
- API compatibility
- Project setup
- Usage
- 1. SBT configuration
- 2. Mock situation
- 3. Starting a transaction and finding relevant files
- 4. Writing updated Parquet data
- 5. Committing to our Delta table
- 6. Reading from the Delta table
- 6.1. Reading Parquet data (distributed)
- 6.2. Reading Parquet data (single-JVM)
- Reporting issues
- Contributing
- Community
- Local development
Use cases
Delta Standalone is optimized for cases when you want to read and write Delta tables by using a non-Spark engine of your choice. It is a “low-level” library, and we encourage developers to contribute open-source, higher-level connectors for their desired engines that use Delta Standalone for all Delta Lake metadata interaction. You can find a Hive source connector and Flink sink connector in the Delta Lake Connectors repository, and additional connectors are in development.
Caveats
Delta Standalone minimizes memory usage in the JVM by loading the Delta Lake transaction log incrementally, using an iterator. However, Delta Standalone runs in a single JVM, and is limited to the processing and memory capabilities of that JVM. Users must configure the JVM to avoid out of memory (OOM) issues.
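The memory benefit of iterating instead of materializing a full list can be pictured with a small, library-free sketch. It is not Delta Standalone's implementation: the class name LazyLogReader and the helper countMatching are hypothetical, and the log is modeled as plain newline-separated text held in a string.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Hypothetical stand-in: yields log entries one line at a time. */
final class LazyLogReader implements Iterator<String>, AutoCloseable {
    private final BufferedReader reader;
    private String nextLine; // the only entry held in memory at any time

    LazyLogReader(BufferedReader reader) {
        this.reader = reader;
        advance();
    }

    private void advance() {
        try {
            nextLine = reader.readLine();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public boolean hasNext() {
        return nextLine != null;
    }

    @Override
    public String next() {
        if (nextLine == null) {
            throw new NoSuchElementException();
        }
        String current = nextLine;
        advance();
        return current;
    }

    @Override
    public void close() {
        try {
            reader.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Counts entries containing `marker` without collecting them into a list. */
    static long countMatching(String logText, String marker) {
        long count = 0;
        try (LazyLogReader it = new LazyLogReader(new BufferedReader(new StringReader(logText)))) {
            while (it.hasNext()) {
                if (it.next().contains(marker)) {
                    count++;
                }
            }
        }
        return count;
    }
}
```

Only one entry is resident at a time, so the working set stays small even for a long log. The JVM heap still bounds whatever the caller chooses to accumulate.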
Delta Standalone does provide basic APIs for reading Parquet data, but does not include APIs for writing Parquet data. Users must write out new Parquet data files themselves and then use Delta Standalone to commit those changes to the Delta table and make the new data visible to readers.
APIs
Delta Standalone provides classes and entities to read data, query metadata, and commit to the transaction log. A few of them are highlighted here, along with their key interfaces. See the Java API docs for the full set of classes and entities.
DeltaLog
DeltaLog is the main interface for programmatically interacting with the metadata in the transaction log of a Delta table.
- Instantiate a DeltaLog with DeltaLog.forTable(hadoopConf, path) and pass in the path of the root location of the Delta table.
- Access the current snapshot with DeltaLog::snapshot.
- Get the latest snapshot, including any new data files that were added to the log, with DeltaLog::update.
- Get the snapshot at some historical state of the log with DeltaLog::getSnapshotForTimestampAsOf or DeltaLog::getSnapshotForVersionAsOf.
- Start a new transaction to commit to the transaction log by using DeltaLog::startTransaction.
Snapshot
A Snapshot represents the state of the table at a specific version.
- Get a list of the metadata files by using Snapshot::getAllFiles.
- For a memory-optimized iterator over the metadata files, use Snapshot::scan to get a DeltaScan (as described later), optionally by passing in a predicate for partition filtering.
- Read actual data with Snapshot::open, which returns an iterator over the rows of the Delta table.
OptimisticTransaction
The main class for committing a set of updates to the transaction log is OptimisticTransaction. During a transaction, all reads must go through the OptimisticTransaction instance rather than the DeltaLog in order to detect logical conflicts and concurrent updates.
- Read metadata files during a transaction with OptimisticTransaction::markFilesAsRead, which returns a DeltaScan of files that match the readPredicate.
- Commit to the transaction log with OptimisticTransaction::commit.
- Get the latest version committed for a given application ID (for example, for idempotency) with OptimisticTransaction::txnVersion. (Note that this API requires users to commit SetTransaction actions.)
- Update the metadata of the table upon committing with OptimisticTransaction::updateMetadata.
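The idempotency use case mentioned for txnVersion can be sketched without the library. This is an illustration under stated assumptions, not Delta Standalone code: IdempotentWriterSketch and commitBatch are hypothetical names, an in-memory map stands in for the per-application versions recorded in the transaction log, and returning -1 for an unknown application ID is an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in for per-application versions kept in the log. */
final class IdempotentWriterSketch {
    // appId -> last batch version committed on behalf of that application
    private final Map<String, Long> lastCommitted = new HashMap<>();

    /** Mirrors the idea of txnVersion; -1 (an assumption here) means "never committed". */
    long txnVersion(String appId) {
        return lastCommitted.getOrDefault(appId, -1L);
    }

    /** Applies the batch only if it is newer than the last committed one. */
    boolean commitBatch(String appId, long batchVersion) {
        if (batchVersion <= txnVersion(appId)) {
            return false; // replayed or duplicate batch: skip it
        }
        // A real writer would commit its data actions and the
        // (appId, batchVersion) record together in one transaction.
        lastCommitted.put(appId, batchVersion);
        return true;
    }
}
```

A writer that replays batch 5 after a crash sees that version 5 is already recorded and skips the work, so the table is not updated twice.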
DeltaScan
DeltaScan is a wrapper class for the files inside a Snapshot that match a given readPredicate.
- Access the files that match the partition filter portion of the readPredicate with DeltaScan::getFiles. This returns a memory-optimized iterator over the metadata files in the table.
- To further filter the returned files on non-partition columns, get the portion of input predicate not applied with DeltaScan::getResidualPredicate.
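The split between the partition-filter portion of a predicate and its residual can be illustrated with a simplified model. The sketch below is not the library's expression framework: PredicateSplitSketch is a hypothetical class, and the predicate is restricted to a conjunction of equality filters keyed by column name. Partition values are compared as strings, matching how they are carried in a file's partition-value map.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical model: a conjunction of equality filters, keyed by column name. */
final class PredicateSplitSketch {
    final Map<String, Object> partitionFilters = new LinkedHashMap<>();
    final Map<String, Object> residualFilters = new LinkedHashMap<>();

    PredicateSplitSketch(Map<String, Object> conjuncts, Set<String> partitionColumns) {
        for (Map.Entry<String, Object> e : conjuncts.entrySet()) {
            if (partitionColumns.contains(e.getKey())) {
                // Can be decided from file metadata alone.
                partitionFilters.put(e.getKey(), e.getValue());
            } else {
                // Needs the row data, so the engine must apply it later.
                residualFilters.put(e.getKey(), e.getValue());
            }
        }
    }

    /** True if a file's partition values satisfy every partition filter. */
    boolean matchesPartition(Map<String, String> filePartitionValues) {
        for (Map.Entry<String, Object> e : partitionFilters.entrySet()) {
            String actual = filePartitionValues.get(e.getKey());
            if (!String.valueOf(e.getValue()).equals(actual)) {
                return false;
            }
        }
        return true;
    }
}
```

For the filter year = 2021 AND month = 11 AND customer = 'XYZ' on a table partitioned by year, month, and day, the first two conjuncts prune files and the customer conjunct is left over as the residual.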
API compatibility
The only public APIs currently provided by Delta Standalone are in the io.delta.standalone package. Classes and methods in the io.delta.standalone.internal package are considered internal and are subject to change across minor and patch releases.
Project setup
You can add the Delta Standalone library as a dependency by using your preferred build tool. Delta Standalone depends upon the hadoop-client and parquet-hadoop packages. Example build files are listed in the following sections.
Build files
Maven
Replace the versions of hadoop-client and parquet-hadoop with the ones you are using.
Scala 2.12:
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-standalone_2.12</artifactId>
<version>0.4.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
<version>1.10.1</version>
</dependency>
Scala 2.11:
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-standalone_2.11</artifactId>
<version>0.4.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
<version>1.10.1</version>
</dependency>
Storage configuration
Delta Lake ACID guarantees are based on the atomicity and durability guarantees of the storage system. Not all storage systems provide all the necessary guarantees.
Delta Standalone supports concurrent reads on any storage system that provides an implementation of the Hadoop FileSystem API.
For concurrent writes with transactional guarantees, the storage system must provide consistent listing and atomic renames-without-overwrite (that is, rename(..., overwrite=false) either generates the target file atomically or fails with java.nio.file.FileAlreadyExistsException if it already exists). If the FileSystem implementation provides both, the default LogStore implementation allows concurrent writes with guarantees. See Storage Configuration for more details.
For storage systems that do not provide the necessary guarantees, you must use a custom LogStore implementation for concurrent writes with transactional guarantees. Delta Standalone currently provides implementations for Amazon S3, Microsoft Azure Blob Storage, Microsoft Azure Data Lake Storage Gen1, and Microsoft Azure Data Lake Storage Gen2.
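To see why a "create only if absent" primitive is enough for writers to agree on who owns a given version, consider the following local-filesystem analogy. It is a sketch, not a LogStore implementation: PutIfAbsentSketch, commitFileName, and tryCommit are hypothetical names, and java.nio's CREATE_NEW option stands in for the storage system's rename-without-overwrite. It models only the mutual exclusion between writers, not the atomic visibility of the file contents.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Locale;

final class PutIfAbsentSketch {
    /** Version zero-padded to 20 digits, as in 00000000000000001082.json. */
    static String commitFileName(long version) {
        return String.format(Locale.ROOT, "%020d.json", version);
    }

    /**
     * Tries to create the commit file for `version`.
     * Returns false if another writer already created it.
     */
    static boolean tryCommit(Path logDir, long version, String content) {
        Path target = logDir.resolve(commitFileName(version));
        try {
            // CREATE_NEW performs the existence check and the creation as one atomic step.
            Files.write(target, content.getBytes(StandardCharsets.UTF_8),
                    StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
            return true;
        } catch (FileAlreadyExistsException e) {
            return false; // lost the race for this version
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

If two writers both attempt version 1083, exactly one call returns true. The other writer learns that it must re-read the log and try a later version.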
Amazon S3 configuration
Delta Standalone supports concurrent reads from multiple clusters, but concurrent writes to S3 must originate from a single cluster to provide transactional guarantees. To use Delta Standalone with S3, you must meet the following requirements and configure a Hadoop Configuration as specified below when you instantiate a DeltaLog with DeltaLog.forTable(hadoopConf, path).
Requirements
- S3 credentials: IAM roles (recommended) or access keys.
- Hadoop’s AWS connector (hadoop-aws) for the version of Hadoop that Delta Standalone is compiled with.
Configuration
- Include hadoop-aws JAR in the classpath.
- Set up S3 credentials. We recommend that you use IAM roles for authentication and authorization. But if you want to use keys, configure your org.apache.hadoop.conf.Configuration with:
conf.set("fs.s3a.access.key", "<your-s3-access-key>");
conf.set("fs.s3a.secret.key", "<your-s3-secret-key>");
- Configure Delta Standalone to use the correct LogStore implementation with:
conf.set("io.delta.standalone.LOG_STORE_CLASS_KEY", "io.delta.standalone.internal.storage.S3SingleDriverLogStore");
Microsoft Azure configuration
Delta Standalone supports concurrent reads and writes from multiple clusters with full transactional guarantees for various Azure storage systems. To use an Azure storage system, you must satisfy the following requirements and configure a Hadoop Configuration as specified later in this article when you instantiate a DeltaLog with DeltaLog.forTable(hadoopConf, path).
Azure Blob Storage
Requirements
- A shared key or shared access signature (SAS).
- Hadoop’s Azure Blob Storage libraries for a version compatible with the Hadoop version Delta Standalone was compiled with.
- 2.9.1+ for Hadoop 2
- 3.0.1+ for Hadoop 3
Configuration
- Include hadoop-azure JAR in the classpath.
- Set up credentials.
For an SAS token, configure org.apache.hadoop.conf.Configuration:
conf.set(
"fs.azure.sas.<your-container-name>.<your-storage-account-name>.blob.core.windows.net",
"<complete-query-string-of-your-sas-for-the-container>");
To specify an account access key:
conf.set(
"fs.azure.account.key.<your-storage-account-name>.blob.core.windows.net",
"<your-storage-account-access-key>");
- Configure Delta Standalone to use the correct LogStore implementation:
conf.set("io.delta.standalone.LOG_STORE_CLASS_KEY", "io.delta.standalone.internal.storage.AzureLogStore");
Azure Data Lake Storage Gen1
Requirements
- A service principal for OAuth 2.0 access.
- Hadoop’s Azure Data Lake Storage Gen1 libraries for a version that is compatible with the Hadoop version that was used to compile Delta Standalone.
- 2.9.1+ for Hadoop 2
- 3.0.1+ for Hadoop 3
Configuration
- Include hadoop-azure-datalake JAR in the classpath.
- Set up Azure Data Lake Storage Gen1 credentials. Configure org.apache.hadoop.conf.Configuration:
conf.set("dfs.adls.oauth2.access.token.provider.type", "ClientCredential");
conf.set("dfs.adls.oauth2.client.id", "<your-oauth2-client-id>");
conf.set("dfs.adls.oauth2.credential", "<your-oauth2-credential>");
conf.set("dfs.adls.oauth2.refresh.url", "https://login.microsoftonline.com/<your-directory-id>/oauth2/token");
- Configure Delta Standalone to use the correct LogStore implementation:
conf.set("io.delta.standalone.LOG_STORE_CLASS_KEY", "io.delta.standalone.internal.storage.AzureLogStore");
Azure Data Lake Storage Gen2
Requirements
- Account created in Azure Data Lake Storage Gen2.
- Service principal created and assigned the Storage Blob Data Contributor role for the storage account.
- Make a note of the storage-account-name, directory-id (also known as tenant-id), application-id, and password of the principal. These will be used for configuration.
- Hadoop’s Azure Data Lake Storage Gen2 libraries version 3.2+ and Delta Standalone compiled with Hadoop 3.2+.
Configuration
- Include hadoop-azure-datalake JAR in the classpath. In addition, you may also have to include JARs for the Maven artifacts hadoop-azure and wildfly-openssl.
- Set up Azure Data Lake Storage Gen2 credentials. Configure your org.apache.hadoop.conf.Configuration with:
conf.set("fs.azure.account.auth.type.<storage-account-name>.dfs.core.windows.net", "OAuth");
conf.set("fs.azure.account.oauth.provider.type.<storage-account-name>.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider");
conf.set("fs.azure.account.oauth2.client.id.<storage-account-name>.dfs.core.windows.net", "<application-id>");
conf.set("fs.azure.account.oauth2.client.secret.<storage-account-name>.dfs.core.windows.net", "<password>");
conf.set("fs.azure.account.oauth2.client.endpoint.<storage-account-name>.dfs.core.windows.net", "https://login.microsoftonline.com/<directory-id>/oauth2/token");
where <storage-account-name>, <application-id>, <directory-id>, and <password> are the details of the service principal listed in the requirements above.
- Configure Delta Standalone to use the correct LogStore implementation with:
conf.set("io.delta.standalone.LOG_STORE_CLASS_KEY", "io.delta.standalone.internal.storage.AzureLogStore");
Usage
This example shows how to use Delta Standalone along with a fictitious, non-Spark engine Zappy to:
- Find Parquet files.
- Write Parquet data.
- Commit to the transaction log.
- Read from the transaction log.
- Read back the Parquet data.
1. SBT configuration
The following SBT project configuration is used:
// <project-root>/build.sbt
scalaVersion := "2.12.8"
libraryDependencies ++= Seq(
"io.delta" %% "delta-standalone" % "0.4.1",
"org.apache.hadoop" % "hadoop-client" % "3.1.0",
"org.apache.parquet" % "parquet-hadoop" % "1.10.1",
)
2. Mock situation
We have a Delta table Sales storing sales data, but have realized that all the data written in November 2021 for customer XYZ had incorrect total_cost values. Thus, we need to update all those records with the correct values. We will use a fictitious distributed engine Zappy and Delta Standalone to update our Delta table.
The sales table schema is given below.
Sales
|-- year: int // partition column
|-- month: int // partition column
|-- day: int // partition column
|-- customer: string
|-- sale_id: string
|-- total_cost: float
3. Starting a transaction and finding relevant files
Since we must read existing data in order to perform the desired update operation, we must use OptimisticTransaction::markFilesAsRead in order to automatically detect any concurrent modifications made to our read partitions. Since Delta Standalone only supports partition pruning, we must apply the residual predicate to further filter the returned files.
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import io.delta.standalone.DeltaLog;
import io.delta.standalone.DeltaScan;
import io.delta.standalone.OptimisticTransaction;
import io.delta.standalone.actions.AddFile;
import io.delta.standalone.data.CloseableIterator;
import io.delta.standalone.expressions.And;
import io.delta.standalone.expressions.EqualTo;
import io.delta.standalone.expressions.Literal;
import io.delta.standalone.types.StructType;
DeltaLog log = DeltaLog.forTable(new Configuration(), "/data/sales");
OptimisticTransaction txn = log.startTransaction();
// Table schema as seen by this transaction, used to build column references
StructType schema = txn.metadata().getSchema();
DeltaScan scan = txn.markFilesAsRead(
    new And(
        new And(
            new EqualTo(schema.column("year"), Literal.of(2021)),  // partition filter
            new EqualTo(schema.column("month"), Literal.of(11))),  // partition filter
        new EqualTo(schema.column("customer"), Literal.of("XYZ"))  // non-partition filter
    )
);
CloseableIterator<AddFile> iter = scan.getFiles();
Map<String, AddFile> addFileMap = new HashMap<String, AddFile>();  // partition filtered files: year=2021, month=11
try {
    while (iter.hasNext()) {
        AddFile addFile = iter.next();
        addFileMap.put(addFile.getPath(), addFile);
    }
} finally {
    iter.close();  // always release the underlying log reader
}
List<String> filteredFiles = ZappyReader.filterFiles(  // fully filtered files: year=2021, month=11, customer=XYZ
    addFileMap.keySet(),
    toZappyExpression(scan.getResidualPredicate())
);
4. Writing updated Parquet data
Since Delta Standalone does not provide any Parquet data write APIs, we use Zappy
to write the data.
ZappyDataFrame correctedSaleIdToTotalCost = ...;
ZappyDataFrame invalidSales = ZappyReader.readParquet(filteredFiles);
ZappyDataFrame correctedSales = invalidSales.join(correctedSaleIdToTotalCost, "sale_id");
ZappyWriteResult dataWriteResult = ZappyWriter.writeParquet("/data/sales", correctedSales);
The written data files from the preceding code will have a hierarchy similar to the following:
$ tree /data/sales
.
├── _delta_log
│   └── ...
│   └── 00000000000000001082.json
│   └── 00000000000000001083.json
├── year=2019
│   └── month=1
...
├── year=2020
│   └── month=1
│       └── day=1
│           └── part-00000-195768ae-bad8-4c53-b0c2-e900e0f3eaee-c000.snappy.parquet // previous
│           └── part-00001-53c3c553-f74b-4384-b9b5-7aa45bc2291b-c000.snappy.parquet // new
│       ...
│       └── day=2
│           └── part-00000-b9afbcf5-b90d-4f92-97fd-a2522aa2d4f6-c000.snappy.parquet // previous
│           └── part-00001-c0569730-5008-42fa-b6cb-5a152c133fde-c000.snappy.parquet // new
│       ...
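When the engine reports the files it wrote, the partition values for each new file have to be supplied as a map of column name to string value. If the engine only returns paths, one way to recover them from a directory layout like the one above is sketched below. PartitionPathSketch and parsePartitionValues are hypothetical helpers, the sketch assumes plain key=value directory names, and it does not handle escaped characters in partition values.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class PartitionPathSketch {
    /** Extracts key=value directory segments from a path relative to the table root. */
    static Map<String, String> parsePartitionValues(String relativePath) {
        Map<String, String> values = new LinkedHashMap<>();
        String[] segments = relativePath.split("/");
        // The last segment is the data file name, so it is skipped.
        for (int i = 0; i < segments.length - 1; i++) {
            int eq = segments[i].indexOf('=');
            if (eq > 0) {
                values.put(segments[i].substring(0, eq), segments[i].substring(eq + 1));
            }
        }
        return values;
    }

    public static void main(String[] args) {
        String path = "year=2020/month=1/day=1/"
                + "part-00001-53c3c553-f74b-4384-b9b5-7aa45bc2291b-c000.snappy.parquet";
        System.out.println(parsePartitionValues(path)); // prints {year=2020, month=1, day=1}
    }
}
```

Readers take partition values from the transaction log rather than from directory names, so whatever map is committed with each file is what later partition pruning will see.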
5. Committing to our Delta table
Now that we’ve written the correct data, we need to commit to the transaction log to add the new files, and remove the old incorrect files.
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import io.delta.standalone.Operation;
import io.delta.standalone.actions.Action;
import io.delta.standalone.actions.AddFile;
import io.delta.standalone.actions.RemoveFile;
import io.delta.standalone.exceptions.DeltaConcurrentModificationException;
// Mark every file that contained incorrect rows as removed
List<RemoveFile> removeOldFiles = filteredFiles.stream()
    .map(path -> addFileMap.get(path).remove())
    .collect(Collectors.toList());
// Register every file that Zappy wrote in step 4
List<AddFile> addNewFiles = dataWriteResult.getNewFiles()
    .map(file ->
        new AddFile(
            file.getPath(),
            file.getPartitionValues(),
            file.getSize(),
            System.currentTimeMillis(),
            true,  // isDataChange
            null,  // stats
            null   // tags
        )
    ).collect(Collectors.toList());
List<Action> totalCommitFiles = new ArrayList<>();
totalCommitFiles.addAll(removeOldFiles);
totalCommitFiles.addAll(addNewFiles);
try {
    txn.commit(totalCommitFiles, new Operation(Operation.Name.UPDATE), "Zappy/1.0.0");
} catch (DeltaConcurrentModificationException e) {
    // handle exception here
}
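One common way to handle a concurrent-modification failure is to redo the whole read-write-commit cycle a bounded number of times. The helper below is a generic sketch, not part of Delta Standalone: CommitRetrySketch and retryOnConflict are hypothetical names, and java.util.ConcurrentModificationException stands in for the library's exception type so that the example compiles with the JDK alone.

```java
import java.util.ConcurrentModificationException;
import java.util.function.Supplier;

final class CommitRetrySketch {
    /**
     * Runs `attempt` until it succeeds or `maxAttempts` is reached.
     * Each attempt is expected to start a fresh transaction, re-read the
     * relevant files, and try to commit again.
     */
    static <T> T retryOnConflict(int maxAttempts, Supplier<T> attempt) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        ConcurrentModificationException last = null;
        for (int i = 0; i < maxAttempts; i++) {
            try {
                return attempt.get();
            } catch (ConcurrentModificationException e) {
                last = e; // another writer won; loop and try again
            }
        }
        throw last; // every attempt conflicted
    }
}
```

The supplier passed to retryOnConflict would wrap steps 3 through 5. Retrying only the commit call is not enough, because the files read in step 3 may no longer reflect the table after a conflicting write.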
6. Reading from the Delta table
Delta Standalone provides APIs that read both metadata and data, as follows.
6.1. Reading Parquet data (distributed)
For most use cases, and especially when you deal with large volumes of data, we recommend that you use the Delta Standalone library as your metadata-only reader, and then perform the Parquet data reading yourself, most likely in a distributed manner.
Delta Standalone provides two APIs for reading the files in a given table snapshot. Snapshot::getAllFiles returns an in-memory list. As of 0.3.0, we also provide Snapshot::scan(filter)::getFiles, which supports partition pruning and an optimized internal iterator implementation. We will use the latter here.
import org.apache.hadoop.conf.Configuration;
import io.delta.standalone.Snapshot;
import io.delta.standalone.types.StructType;
DeltaLog log = DeltaLog.forTable(new Configuration(), "/data/sales");
Snapshot latestSnapshot = log.update();
StructType schema = latestSnapshot.getMetadata().getSchema();
DeltaScan scan = latestSnapshot.scan(
    new And(
        new And(
            new EqualTo(schema.column("year"), Literal.of(2021)),
            new EqualTo(schema.column("month"), Literal.of(11))),
        new EqualTo(schema.column("customer"), Literal.of("XYZ"))
    )
);
CloseableIterator<AddFile> iter = scan.getFiles();
try {
    while (iter.hasNext()) {
        AddFile addFile = iter.next();
        // Zappy engine to handle reading data in `addFile.getPath()` and apply any `scan.getResidualPredicate()`
    }
} finally {
    iter.close();
}
6.2. Reading Parquet data (single-JVM)
Delta Standalone allows reading the Parquet data directly, using Snapshot::open.
import io.delta.standalone.data.RowRecord;
CloseableIterator<RowRecord> dataIter = log.update().open();
try {
while (dataIter.hasNext()) {
RowRecord row = dataIter.next();
int year = row.getInt("year");
String customer = row.getString("customer");
float totalCost = row.getFloat("total_cost");
}
} finally {
dataIter.close();
}
Reporting issues
We use GitHub Issues to track community-reported issues. You can also contact the community to get answers.
Contributing
We welcome contributions to the Delta Lake Connectors repository. We use GitHub Pull Requests for accepting changes.
Community
There are two ways to communicate with the Delta Lake community.
- Public Slack Channel
- Public mailing list