Interface used to write a streaming Dataset to external storage systems (e.g. file systems, key-value stores, etc.). Use Dataset.writeStream to access this.
2.0.0
:: Experimental ::
Wrapper class for interacting with per-group state data in mapGroupsWithState and flatMapGroupsWithState operations on KeyValueGroupedDataset.
Detailed description of the [map/flatMap]GroupsWithState operation
--------------------------------------------------------------
Both mapGroupsWithState and flatMapGroupsWithState in KeyValueGroupedDataset invoke the user-given function on each group (defined by the grouping function in Dataset.groupByKey()) while maintaining user-defined per-group state between invocations.
For a static batch Dataset, the function will be invoked once per group. For a streaming Dataset, the function will be invoked for each group repeatedly in every trigger. That is, in every batch of the StreamingQuery, the function will be invoked once for each group that has data in the trigger. Furthermore, if a timeout is set, the function will also be invoked on timed-out groups (more detail below).
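The function is invoked with the following parameters:
- The key of the group.
- An iterator containing all the values for this group.
- A user-defined state object set by previous invocations of the given function.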
In the case of a batch Dataset, there is only one invocation, and the state object will be empty because there is no prior state. Essentially, for batch Datasets, [map/flatMap]GroupsWithState is equivalent to [map/flatMap]Groups, and any updates to the state and/or timeouts have no effect.
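To make the invocation contract above concrete, here is a minimal in-memory sketch in plain Scala. It does not use Spark: `GroupsWithStateModel`, `SimpleState`, `runTrigger`, and `demo` are hypothetical names, the model ignores timeouts, and it assumes each trigger is a small in-memory batch. Each call to `runTrigger` plays the role of one trigger: it groups the batch by key, invokes the function once per group that has data, and carries the state over to the next call.

```scala
import scala.collection.mutable

// Hypothetical in-memory model of the [map/flatMap]GroupsWithState invocation
// contract; it is NOT Spark's implementation and it ignores timeouts.
object GroupsWithStateModel {

  // Minimal stand-in for GroupState[S]: an optional state value for one group.
  final class SimpleState[S](private var value: Option[S]) {
    def exists: Boolean = value.isDefined
    def get: S = value.getOrElse(throw new NoSuchElementException("no state"))
    def update(newState: S): Unit = value = Some(newState)
    def remove(): Unit = value = None
  }

  // One trigger: group the batch by key, call `func` once per group that has
  // data in this batch, then persist (or drop) the state it left behind.
  def runTrigger[K, V, S, U](
      batch: Seq[V],
      groupKey: V => K,
      store: mutable.Map[K, S],
      func: (K, Iterator[V], SimpleState[S]) => U): Seq[U] =
    batch.groupBy(groupKey).toSeq.map { case (key, values) =>
      val state = new SimpleState[S](store.get(key))
      val out = func(key, values.iterator, state)
      if (state.exists) store.update(key, state.get) else store.remove(key)
      out
    }

  // Running word count over two triggers; the store outlives each trigger.
  def demo(): (Seq[(String, Int)], Seq[(String, Int)], Map[String, Int]) = {
    val store = mutable.Map.empty[String, Int]
    val countWords = (word: String, values: Iterator[String], state: SimpleState[Int]) => {
      val newCount = (if (state.exists) state.get else 0) + values.size
      state.update(newCount)
      (word, newCount)
    }
    val first = runTrigger(Seq("a", "b", "a"), (w: String) => w, store, countWords)
    val second = runTrigger(Seq("a"), (w: String) => w, store, countWords)
    (first, second, store.toMap)
  }
}
```

In `demo()`, the second trigger only contains `a`, so the function is not invoked for `b`, yet the stored state for `b` survives. A batch Dataset corresponds to a single `runTrigger` call against an empty store.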
The major difference between mapGroupsWithState and flatMapGroupsWithState is that the former allows the function to return one and only one record, whereas the latter allows the function to return any number of records (including no records). Furthermore, flatMapGroupsWithState is associated with an operation output mode, which can be either Append or Update. Semantically, this defines whether the output records of one trigger effectively replace the previously output records from previous triggers (Update) or are appended to the list of previously output records (Append). Essentially, this defines how the Result Table (refer to the semantics in the programming guide) is updated, and allows us to reason about the semantics of later operations.
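The following plain-Scala sketch contrasts the two function shapes and the two output modes. It is a conceptual model only, not Spark code: `OutputModeModel`, `mapStyle`, `flatMapStyle`, `applyAppend`, and `applyUpdate` are hypothetical names, and the Update model assumes each output row is identified by its group key.

```scala
// Hypothetical conceptual model; these helpers are NOT Spark APIs.
object OutputModeModel {

  // map-style function: exactly one output record per invoked group.
  def mapStyle(key: String, values: Iterator[Int]): (String, Int) =
    (key, values.sum)

  // flatMap-style function: zero or more records per invoked group
  // (here: one record for every value above 100, possibly none).
  def flatMapStyle(key: String, values: Iterator[Int]): Iterator[(String, Int)] =
    values.filter(_ > 100).map(v => (key, v))

  // Append: records emitted in a trigger are added to the result table,
  // and earlier records are never changed.
  def applyAppend[R](table: Vector[R], emitted: Seq[R]): Vector[R] =
    table ++ emitted

  // Update (assuming rows are identified by group key): a record emitted
  // for a key replaces the previously output record for that key.
  def applyUpdate[K, V](table: Map[K, V], emitted: Seq[(K, V)]): Map[K, V] =
    table ++ emitted
}
```

For example, `applyUpdate(Map("a" -> 1, "b" -> 2), Seq("a" -> 5))` keeps a single row for `a` with the new value, whereas `applyAppend` would keep both the old and the new record.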
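Important points to note about the function (both mapGroupsWithState and flatMapGroupsWithState):
- In a trigger, the function will be called only for the groups present in the batch. So do not assume that the function will be called in every trigger for every group that has state.
- There is no guaranteed ordering of values in the iterator passed to the function, neither with batch nor with streaming Datasets.
- All the data will be shuffled before applying the function.
- If a timeout is set, the function will also be called with no values. See more details on GroupStateTimeout below.
Important points to note about using GroupState: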
- The value of the state cannot be null. So updating the state with null will throw IllegalArgumentException.
- Operations on GroupState are not thread-safe. This is to avoid memory barriers.
- If remove() is called, then exists() will return false, get() will throw NoSuchElementException, and getOption() will return None.
- After that, if update(newState) is called, then exists() will again return true, and get() and getOption() will return the updated value.
Important points to note about using GroupStateTimeout:
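- The timeout type is a global parameter across all the groups (set as the timeout param in [map|flatMap]GroupsWithState), but the exact timeout duration/timestamp is configurable per group by calling setTimeout...() in GroupState.
- Timeouts can be based on either processing time (i.e. GroupStateTimeout.ProcessingTimeTimeout) or event time (i.e. GroupStateTimeout.EventTimeTimeout).
- With ProcessingTimeTimeout, the timeout duration can be set by calling GroupState.setTimeoutDuration. The timeout will occur when the clock has advanced by the set duration. Guarantees provided by this timeout with a duration of D ms are as follows:
  - The timeout will never occur before the clock time has advanced by D ms.
  - The timeout will occur eventually when there is a trigger in the query (i.e. after D ms). So there is no strict upper bound on when the timeout occurs; for example, the trigger interval of the query affects when the timeout actually occurs. If there is no data in the stream (for any group) for a while, there will be no trigger, and the timeout function call will not occur until there is data.
- With EventTimeTimeout, the user also has to specify the event-time watermark in the query using Dataset.withWatermark(). With this setting, data that is older than the watermark is filtered out. The timeout can be set for a group by setting a timeout timestamp using GroupState.setTimeoutTimestamp(), and the timeout will occur when the watermark advances beyond the set timestamp. You can control the timeout delay with two parameters: (i) the watermark delay and (ii) an additional duration beyond the timestamp in the event (which is guaranteed to be newer than the watermark due to the filtering). Guarantees provided by this timeout are as follows:
  - The timeout will never occur before the watermark has exceeded the set timeout timestamp.
  - Similar to processing-time timeouts, there is no strict upper bound on the delay before the timeout actually occurs. The watermark can advance only when there is data in the stream and the event time of the data has actually advanced.
- When the timeout occurs for a group, the function is called for that group with no values, and with GroupState.hasTimedOut() set to true.
Scala example of using GroupState in mapGroupsWithState: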
```scala
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

// A mapping function that maintains an integer state for string keys and returns a string.
// Additionally, it sets a timeout to remove the state if it has not received data for an hour.
def mappingFunction(key: String, value: Iterator[Int], state: GroupState[Int]): String = {

  if (state.hasTimedOut) {                // If called when timing out, remove the state
    state.remove()

  } else if (state.exists) {              // If state exists, use it for processing
    val existingState = state.get         // Get the existing state
    val shouldRemove = ...                // Decide whether to remove the state
    if (shouldRemove) {
      state.remove()                      // Remove the state

    } else {
      val newState = ...
      state.update(newState)              // Set the new state
      state.setTimeoutDuration("1 hour")  // Set the timeout
    }

  } else {
    val initialState = ...
    state.update(initialState)            // Set the initial state
    state.setTimeoutDuration("1 hour")    // Set the timeout
  }
  ...
  // return something
}

dataset
  .groupByKey(...)
  .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout)(mappingFunction)
```
Java example of using GroupState:
```java
import java.util.Iterator;
import org.apache.spark.api.java.function.MapGroupsWithStateFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.streaming.GroupState;
import org.apache.spark.sql.streaming.GroupStateTimeout;

// A mapping function that maintains an integer state for string keys and returns a string.
// Additionally, it sets a timeout to remove the state if it has not received data for an hour.
MapGroupsWithStateFunction<String, Integer, Integer, String> mappingFunction =
    new MapGroupsWithStateFunction<String, Integer, Integer, String>() {

      @Override
      public String call(String key, Iterator<Integer> value, GroupState<Integer> state) {
        if (state.hasTimedOut()) {              // If called when timing out, remove the state
          state.remove();

        } else if (state.exists()) {            // If state exists, use it for processing
          int existingState = state.get();      // Get the existing state
          boolean shouldRemove = ...;           // Decide whether to remove the state
          if (shouldRemove) {
            state.remove();                     // Remove the state

          } else {
            int newState = ...;
            state.update(newState);             // Set the new state
            state.setTimeoutDuration("1 hour"); // Set the timeout
          }

        } else {
          int initialState = ...;               // Set the initial state
          state.update(initialState);
          state.setTimeoutDuration("1 hour");   // Set the timeout
        }
        ...
        // return something
      }
    };

dataset
    .groupByKey(...)
    .mapGroupsWithState(
        mappingFunction, Encoders.INT(), Encoders.STRING(), GroupStateTimeout.ProcessingTimeTimeout());
```
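The timeout guarantees described for GroupStateTimeout can be checked with a small model. In this hypothetical sketch (`TimeoutModel` and its methods are illustrative names, not Spark API), a timeout can only fire when a trigger runs, and the sketch assumes a strict comparison at the boundary: a processing-time timeout fires at the first trigger strictly after the set time plus D, and an event-time timeout fires at the first trigger whose watermark is strictly beyond the timeout timestamp. Watermark values are supplied as input rather than computed.

```scala
// Hypothetical model of the documented timeout guarantees; NOT Spark code.
// A timeout can only fire when a trigger runs; the boundary is treated as strict.
object TimeoutModel {

  // ProcessingTimeTimeout: a timeout of `durationMs` set at `setAtMs` fires at
  // the first trigger whose clock time is strictly past setAtMs + durationMs.
  // It never fires early, but fires late if triggers are sparse.
  def firstProcessingTimeout(setAtMs: Long, durationMs: Long, triggerTimesMs: Seq[Long]): Option[Long] =
    triggerTimesMs.find(_ > setAtMs + durationMs)

  // EventTimeTimeout: fires at the first trigger whose watermark has advanced
  // strictly beyond the timeout timestamp. In a real query the watermark only
  // advances when new data with later event times arrives.
  // Returns the index of that trigger, if any.
  def firstEventTimeTimeout(timeoutTimestampMs: Long, watermarkPerTriggerMs: Seq[Long]): Option[Int] = {
    val i = watermarkPerTriggerMs.indexWhere(_ > timeoutTimestampMs)
    if (i >= 0) Some(i) else None
  }
}
```

With D = 1000 ms set at time 0, a trigger at 500 ms does not fire the timeout; if the next trigger only runs at 5000 ms, that is when it fires. The lower bound holds, but there is no strict upper bound.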
S: User-defined type of the state to be stored for each group. Must be encodable into Spark SQL types (see Encoder for more details).
2.2.0
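The GroupState rules listed above (no null values, and the behavior of exists(), get(), and getOption() around remove() and update()) can be captured in a small stand-in class. This is a hypothetical sketch named `StateModel`, not Spark's `GroupState`; its exception messages are illustrative, and, like GroupState, it does no synchronization.

```scala
// Hypothetical stand-in that mirrors the documented GroupState rules;
// it is NOT org.apache.spark.sql.streaming.GroupState and, like GroupState,
// it is not thread-safe (no synchronization).
final class StateModel[S] {
  private var value: Option[S] = None

  def exists: Boolean = value.isDefined

  // get on a missing or removed state throws NoSuchElementException.
  def get: S = value.getOrElse(throw new NoSuchElementException("state is not set"))

  // getOption returns None instead of throwing.
  def getOption: Option[S] = value

  // The state value cannot be null: updating with null throws IllegalArgumentException
  // and leaves the previous value untouched.
  def update(newState: S): Unit = {
    if (newState == null) throw new IllegalArgumentException("state cannot be null")
    value = Some(newState)
  }

  // After remove(), exists is false until the next update(newState).
  def remove(): Unit = value = None
}
```

Calling `update("x")`, then `remove()`, then `update("y")` walks through the same sequence the rules describe: present, absent (get throws), present again with the new value.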
Information about progress made for a sink in the execution of a StreamingQuery during a trigger. See StreamingQueryProgress for more information.
2.1.0
Information about progress made for a source in the execution of a StreamingQuery during a trigger. See StreamingQueryProgress for more information.
2.1.0
Information about updates made to stateful operators in a StreamingQuery during a trigger.
A handle to a query that is executing continuously in the background as new data arrives. All of its methods are thread-safe.
2.0.0
Exception that stopped a StreamingQuery. Use cause to get the actual exception that caused the failure.
2.0.0
Interface for listening to events related to StreamingQueries.
2.0.0
The methods are not thread-safe as they may be called from different threads.
A class to manage all the StreamingQuery instances active in a SparkSession.
2.0.0
Information about progress made in the execution of a StreamingQuery during a trigger. Each event relates to processing done for a single trigger of the streaming query. Events are emitted even when no new data is available to be processed.
2.1.0
Reports information about the instantaneous status of a streaming query.
2.1.0
A trigger that runs a query periodically based on the processing time. If interval is 0, the query will run as fast as possible.
Scala Example:
```scala
import org.apache.spark.sql.streaming.ProcessingTime
import scala.concurrent.duration._

df.writeStream.trigger(ProcessingTime("10 seconds"))
df.writeStream.trigger(ProcessingTime(10.seconds))
```
Java Example:
```java
import java.util.concurrent.TimeUnit;
import org.apache.spark.sql.streaming.ProcessingTime;

df.writeStream().trigger(ProcessingTime.create("10 seconds"));
df.writeStream().trigger(ProcessingTime.create(10, TimeUnit.SECONDS));
```
Deprecated (since version 2.2.0): use Trigger.ProcessingTime(intervalMs) instead.
2.0.0
Companion object of StreamingQueryListener that defines the listener events.
2.0.0
Used to create ProcessingTime triggers for StreamingQuery instances.
Deprecated (since version 2.2.0): use Trigger.ProcessingTime(intervalMs) instead.
2.0.0
Interface used to load a streaming Dataset from external storage systems (e.g. file systems, key-value stores, etc.). Use SparkSession.readStream to access this.
2.0.0