```java
public class DeltaSource<T>
```

Type Parameters:
T - The type of the events/records produced by this source.
This source supports all (distributed) file systems and object stores that can be accessed via Flink's FileSystem class.
To create a new instance of DeltaSource for a Delta table that will produce RowData records containing all table columns:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
...

// Bounded mode.
DeltaSource<RowData> deltaSource = DeltaSource.forBoundedRowData(
        new Path("s3://some/path"),
        new Configuration()
    )
    .versionAsOf(10)
    .build();

env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");

..........

// Continuous mode.
DeltaSource<RowData> deltaSource = DeltaSource.forContinuousRowData(
        new Path("s3://some/path"),
        new Configuration()
    )
    .updateCheckIntervalMillis(1000)
    .startingVersion(10)
    .build();

env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
```
To create a new instance of DeltaSource for a Delta table that will produce RowData records with user-selected columns:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
...

// Bounded mode.
DeltaSource<RowData> deltaSource = DeltaSource.forBoundedRowData(
        new Path("s3://some/path"),
        new Configuration()
    )
    .columnNames(Arrays.asList("col1", "col2"))
    .versionAsOf(10)
    .build();

env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");

..........

// Continuous mode.
DeltaSource<RowData> deltaSource = DeltaSource.forContinuousRowData(
        new Path("s3://some/path"),
        new Configuration()
    )
    .columnNames(Arrays.asList("col1", "col2"))
    .updateCheckIntervalMillis(1000)
    .startingVersion(10)
    .build();

env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
```
When using the columnNames(...) method, the source will discover the data types for the given columns from the Delta log.

This source supports both bounded/batch and continuous/streaming modes. For the bounded/batch case, the Delta Source processes the full state of the Delta table. In the continuous/streaming case, the default Delta Source will also process the full state of the table, and then begin to periodically check the Delta table for any appending changes and read them. Using either the RowDataContinuousDeltaSourceBuilder.startingVersion(java.lang.String) or the RowDataContinuousDeltaSourceBuilder.startingTimestamp(java.lang.String) API will cause the Delta Source, in continuous mode, to stream only the changes starting from that historical version.
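For example, a continuous source that skips the initial table snapshot and streams only changes recorded at or after a given point in time could be built as in the sketch below, following the same setup as the examples above; the table path and timestamp are placeholder values:

```java
// Continuous mode, changes only: stream changes recorded at or after the
// given timestamp instead of first reading the full table state.
// The path and timestamp (and its format) below are illustrative placeholders.
DeltaSource<RowData> deltaSource = DeltaSource.forContinuousRowData(
        new Path("s3://some/path"),
        new Configuration()
    )
    .startingTimestamp("2022-02-24 04:55:00")
    .build();
```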
The reading of each file happens through file readers defined by the file format; these define the parsing logic for the contents of the underlying Parquet files. A BulkFormat reads batches of records from a file at a time.
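As an illustration of that contract, the sketch below drains a single split using Flink's generic BulkFormat API (org.apache.flink.connector.file.src.reader.BulkFormat). It is a minimal reading loop under that assumed contract, not the connector's actual internal code:

```java
import java.io.IOException;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.connector.file.src.util.RecordAndPosition;
import org.apache.flink.table.data.RowData;

// Drains one file source split through a BulkFormat, batch by batch.
static void drainSplit(BulkFormat<RowData, FileSourceSplit> format,
                       Configuration config,
                       FileSourceSplit split) throws IOException {
    try (BulkFormat.Reader<RowData> reader = format.createReader(config, split)) {
        BulkFormat.RecordIterator<RowData> batch;
        while ((batch = reader.readBatch()) != null) { // null means the split is exhausted
            RecordAndPosition<RowData> record;
            while ((record = batch.next()) != null) {
                System.out.println(record.getRecord()); // consume one RowData record
            }
            batch.releaseBatch(); // hand pooled batch resources back to the reader
        }
    }
}
```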
The way the source lists the files to be processed is defined by the AddFileEnumerator. The AddFileEnumerator is responsible for selecting the relevant AddFile entries and, optionally, for splitting files into multiple regions (file source splits) that can be read in parallel.
| Modifier and Type | Method and Description |
|---|---|
| `static RowDataBoundedDeltaSourceBuilder` | `forBoundedRowData(org.apache.flink.core.fs.Path tablePath, org.apache.hadoop.conf.Configuration hadoopConfiguration)` Creates an instance of Delta source builder for Bounded mode and for RowData elements. |
| `static RowDataContinuousDeltaSourceBuilder` | `forContinuousRowData(org.apache.flink.core.fs.Path tablePath, org.apache.hadoop.conf.Configuration hadoopConfiguration)` Creates an instance of Delta source builder for Continuous mode and for RowData elements. |
```java
public static RowDataBoundedDeltaSourceBuilder forBoundedRowData(
        org.apache.flink.core.fs.Path tablePath,
        org.apache.hadoop.conf.Configuration hadoopConfiguration)
```

Creates an instance of Delta source builder for Bounded mode and for RowData elements.

Parameters:
tablePath - Path to Delta table to read data from.
hadoopConfiguration - Hadoop configuration.

```java
public static RowDataContinuousDeltaSourceBuilder forContinuousRowData(
        org.apache.flink.core.fs.Path tablePath,
        org.apache.hadoop.conf.Configuration hadoopConfiguration)
```

Creates an instance of Delta source builder for Continuous mode and for RowData elements.

Parameters:
tablePath - Path to Delta table to read data from.
hadoopConfiguration - Hadoop configuration.
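For completeness, a minimal bounded usage sketch is shown below, following the same setup as the examples above. It assumes the bounded builder's timestampAsOf(java.lang.String) time-travel option as the timestamp counterpart of versionAsOf; the table path, timestamp, and timestamp format are placeholder values.

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Bounded read of the table state as of a point in time.
// timestampAsOf is assumed here as the time-travel counterpart of versionAsOf;
// the path and timestamp are illustrative placeholders.
DeltaSource<RowData> source = DeltaSource.forBoundedRowData(
        new Path("s3://some/path"),
        new Configuration()
    )
    .timestampAsOf("2022-02-24 04:55:00")
    .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "delta-source");
```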