Table Streaming Reads and Writes
Delta Lake is deeply integrated with Spark Structured Streaming through readStream and writeStream. Delta Lake overcomes many of the limitations typically associated with streaming systems and files, including:
- Maintaining “exactly-once” processing with more than one stream (or concurrent batch jobs)
- Efficiently discovering which files are new when using files as the source for a stream
Delta table as a stream source
When you load a Delta table as a stream source and use it in a streaming query, the query processes all of the data present in the table as well as any new data that arrives after the stream is started.
spark.readStream.format("delta").load("/delta/events")
You can also control the maximum size of any micro-batch that Delta Lake gives to streaming by setting the maxFilesPerTrigger option. This specifies the maximum number of new files to be considered in every trigger. The default is 1000.
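As a minimal sketch, the option can be set on the reader like any other source option. Here spark is assumed to be an existing SparkSession with Delta Lake configured; the helper name read_events_limited and the limit of 100 are illustrative choices, not part of Delta Lake.

```python
def read_events_limited(spark, path="/delta/events", max_files=100):
    """Return a streaming DataFrame that considers at most `max_files` new files per trigger.

    `spark` is assumed to be an existing SparkSession with Delta Lake available.
    The function name and the default of 100 are illustrative only.
    """
    return (
        spark.readStream
        .format("delta")
        .option("maxFilesPerTrigger", max_files)  # cap on new files per micro-batch
        .load(path)
    )
```

A smaller value keeps each micro-batch small at the cost of more triggers to work through a backlog.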
Ignoring updates and deletes
Structured Streaming does not handle input that is not an append and throws an exception if any modifications occur on the table being used as a source. There are two main strategies for dealing with changes that cannot be propagated downstream automatically:
- Since Delta tables retain all history by default, in many cases you can delete the output and checkpoint and restart the stream from the beginning.
- You can set either of these two options:
  - ignoreDeletes: ignore transactions that delete data at partition boundaries. For example, if your source table is partitioned by date, and you delete data older than 30 days, the deletion will not be propagated downstream, but the stream can continue to operate.
  - ignoreChanges: re-process updates if files had to be rewritten in the source table due to a data changing operation such as UPDATE, MERGE INTO, DELETE (within partitions), or OVERWRITE. Unchanged rows may still be emitted, therefore your downstream consumers should be able to handle duplicates. Deletes are not propagated downstream. ignoreChanges subsumes ignoreDeletes. Therefore if you use ignoreChanges, your stream will not be disrupted by either deletions or updates to the source table.
Example
For example, suppose you have a table user_events with the columns date, user_email, and action that is partitioned by date. You stream out of the user_events table, but you need to delete data from it due to GDPR.
If you’re deleting data older than 30 days, you can use:
spark.readStream
  .format("delta")
  .option("ignoreDeletes", "true")
  .load("/delta/user_events")
However, if you have to delete data based on user_email, then you will need to use:
spark.readStream
  .format("delta")
  .option("ignoreChanges", "true")
  .load("/delta/user_events")
If you update a user_email with the UPDATE statement, the file containing the user_email in question will be rewritten. When you use ignoreChanges, the new record is propagated downstream together with all other unchanged records that were in the same file. Your logic should be able to handle these incoming duplicate records.
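One way to make downstream logic tolerant of these re-emitted rows is to apply each record as an upsert keyed on a unique identifier, so that replaying an unchanged row has no effect. The sketch below illustrates the idea in plain Python, independent of Spark. The event_id field and the apply_batch helper are hypothetical; the user_events table described above does not define such a column.

```python
def apply_batch(state, batch):
    """Upsert each record into `state`, keyed by a hypothetical unique `event_id`.

    A re-emitted, unchanged row overwrites itself, so duplicates have no effect;
    an updated row replaces its earlier version.
    """
    for row in batch:
        state[row["event_id"]] = row
    return state


state = {}

# Initial load: two records that live in the same source file.
apply_batch(state, [
    {"event_id": 1, "date": "2020-01-01", "user_email": "a@example.com", "action": "click"},
    {"event_id": 2, "date": "2020-01-01", "user_email": "b@example.com", "action": "view"},
])

# After an UPDATE rewrites that file, ignoreChanges re-emits both rows:
# the changed one (event_id 1) and the unchanged one (event_id 2).
apply_batch(state, [
    {"event_id": 1, "date": "2020-01-01", "user_email": "a.new@example.com", "action": "click"},
    {"event_id": 2, "date": "2020-01-01", "user_email": "b@example.com", "action": "view"},
])
```

In a Spark job, the same keyed-upsert idea could be expressed as a MERGE into the target table for each micro-batch, provided the data carries a suitable key.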
Delta table as a sink
You can also write data into a Delta table using Structured Streaming. The transaction log enables Delta Lake to guarantee exactly-once processing, even when there are other streams or batch queries running concurrently against the table.
Append mode
By default, streams run in append mode, which adds new records to the table.
Scala:
events.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/delta/events/_checkpoints/etl-from-json")
  .start("/delta/events") // as a path

Python:
(events.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/delta/events/_checkpoints/etl-from-json")
  .start("/delta/events"))
Complete mode
You can also use Structured Streaming to replace the entire table with every batch. One example use case is to compute a summary using aggregation:
spark.readStream
  .format("delta")
  .load("/delta/events")
  .groupBy("customerId")
  .count()
  .writeStream
  .format("delta")
  .outputMode("complete")
  .option("checkpointLocation", "/delta/eventsByCustomer/_checkpoints/streaming-agg")
  .start("/delta/eventsByCustomer")
The preceding example continuously updates a table that contains the aggregate number of events by customer.
For applications with more lenient latency requirements, you can save computing resources with one-time triggers. Use these to update summary aggregation tables on a given schedule, processing only new data that has arrived since the last update.
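A minimal PySpark sketch of such a scheduled update follows, assuming spark is an existing SparkSession with Delta Lake configured. It reuses the paths from the complete-mode example; the function name update_events_by_customer is illustrative. With trigger(once=True) the query processes the data available when it starts and then stops, so an external scheduler can invoke it periodically.

```python
def update_events_by_customer(spark):
    """Refresh the per-customer event counts once, then stop.

    `spark` is assumed to be an existing SparkSession with Delta Lake available.
    The checkpoint lets each run pick up where the previous one left off.
    """
    query = (
        spark.readStream
        .format("delta")
        .load("/delta/events")
        .groupBy("customerId")
        .count()
        .writeStream
        .format("delta")
        .outputMode("complete")
        .option("checkpointLocation", "/delta/eventsByCustomer/_checkpoints/streaming-agg")
        .trigger(once=True)  # process what is available, then stop
        .start("/delta/eventsByCustomer")
    )
    query.awaitTermination()  # block until the single run has finished
    return query
```

Because the query terminates after one run, the cluster only needs to be up while the update executes.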