public class KafkaUtils
extends Object
Constructor Summary

KafkaUtils()

Method Summary
static <K,V,KD extends kafka.serializer.Decoder<K>,VD extends kafka.serializer.Decoder<V>,R> JavaInputDStream<R>
createDirectStream(JavaStreamingContext jssc,
Class<K> keyClass,
Class<V> valueClass,
Class<KD> keyDecoderClass,
Class<VD> valueDecoderClass,
Class<R> recordClass,
java.util.Map<String,String> kafkaParams,
java.util.Map<kafka.common.TopicAndPartition,Long> fromOffsets,
Function<kafka.message.MessageAndMetadata<K,V>,R> messageHandler)
Create an input stream that directly pulls messages from Kafka Brokers
without using any receiver.

static <K,V,KD extends kafka.serializer.Decoder<K>,VD extends kafka.serializer.Decoder<V>> JavaPairInputDStream<K,V>
createDirectStream(JavaStreamingContext jssc,
Class<K> keyClass,
Class<V> valueClass,
Class<KD> keyDecoderClass,
Class<VD> valueDecoderClass,
java.util.Map<String,String> kafkaParams,
java.util.Set<String> topics)
Create an input stream that directly pulls messages from Kafka Brokers
without using any receiver.

static <K,V,KD extends kafka.serializer.Decoder<K>,VD extends kafka.serializer.Decoder<V>,R> InputDStream<R>
createDirectStream(StreamingContext ssc,
scala.collection.immutable.Map<String,String> kafkaParams,
scala.collection.immutable.Map<kafka.common.TopicAndPartition,Object> fromOffsets,
scala.Function1<kafka.message.MessageAndMetadata<K,V>,R> messageHandler,
scala.reflect.ClassTag<K> evidence$14,
scala.reflect.ClassTag<V> evidence$15,
scala.reflect.ClassTag<KD> evidence$16,
scala.reflect.ClassTag<VD> evidence$17,
scala.reflect.ClassTag<R> evidence$18)
Create an input stream that directly pulls messages from Kafka Brokers
without using any receiver.

static <K,V,KD extends kafka.serializer.Decoder<K>,VD extends kafka.serializer.Decoder<V>> InputDStream<scala.Tuple2<K,V>>
createDirectStream(StreamingContext ssc,
scala.collection.immutable.Map<String,String> kafkaParams,
scala.collection.immutable.Set<String> topics,
scala.reflect.ClassTag<K> evidence$19,
scala.reflect.ClassTag<V> evidence$20,
scala.reflect.ClassTag<KD> evidence$21,
scala.reflect.ClassTag<VD> evidence$22)
Create an input stream that directly pulls messages from Kafka Brokers
without using any receiver.

static <K,V,KD extends kafka.serializer.Decoder<K>,VD extends kafka.serializer.Decoder<V>,R> JavaRDD<R>
createRDD(JavaSparkContext jsc,
Class<K> keyClass,
Class<V> valueClass,
Class<KD> keyDecoderClass,
Class<VD> valueDecoderClass,
Class<R> recordClass,
java.util.Map<String,String> kafkaParams,
OffsetRange[] offsetRanges,
java.util.Map<kafka.common.TopicAndPartition,Broker> leaders,
Function<kafka.message.MessageAndMetadata<K,V>,R> messageHandler)
Create an RDD from Kafka using offset ranges for each topic and partition.

static <K,V,KD extends kafka.serializer.Decoder<K>,VD extends kafka.serializer.Decoder<V>> JavaPairRDD<K,V>
createRDD(JavaSparkContext jsc,
Class<K> keyClass,
Class<V> valueClass,
Class<KD> keyDecoderClass,
Class<VD> valueDecoderClass,
java.util.Map<String,String> kafkaParams,
OffsetRange[] offsetRanges)
Create an RDD from Kafka using offset ranges for each topic and partition.

static <K,V,KD extends kafka.serializer.Decoder<K>,VD extends kafka.serializer.Decoder<V>> RDD<scala.Tuple2<K,V>>
createRDD(SparkContext sc,
scala.collection.immutable.Map<String,String> kafkaParams,
OffsetRange[] offsetRanges,
scala.reflect.ClassTag<K> evidence$5,
scala.reflect.ClassTag<V> evidence$6,
scala.reflect.ClassTag<KD> evidence$7,
scala.reflect.ClassTag<VD> evidence$8)
Create an RDD from Kafka using offset ranges for each topic and partition.

static <K,V,KD extends kafka.serializer.Decoder<K>,VD extends kafka.serializer.Decoder<V>,R> RDD<R>
createRDD(SparkContext sc,
scala.collection.immutable.Map<String,String> kafkaParams,
OffsetRange[] offsetRanges,
scala.collection.immutable.Map<kafka.common.TopicAndPartition,Broker> leaders,
scala.Function1<kafka.message.MessageAndMetadata<K,V>,R> messageHandler,
scala.reflect.ClassTag<K> evidence$9,
scala.reflect.ClassTag<V> evidence$10,
scala.reflect.ClassTag<KD> evidence$11,
scala.reflect.ClassTag<VD> evidence$12,
scala.reflect.ClassTag<R> evidence$13)
Create an RDD from Kafka using offset ranges for each topic and partition.

static <K,V,U extends kafka.serializer.Decoder<?>,T extends kafka.serializer.Decoder<?>> JavaPairReceiverInputDStream<K,V>
createStream(JavaStreamingContext jssc,
Class<K> keyTypeClass,
Class<V> valueTypeClass,
Class<U> keyDecoderClass,
Class<T> valueDecoderClass,
java.util.Map<String,String> kafkaParams,
java.util.Map<String,Integer> topics,
StorageLevel storageLevel)
Create an input stream that pulls messages from Kafka Brokers.

static JavaPairReceiverInputDStream<String,String>
createStream(JavaStreamingContext jssc,
String zkQuorum,
String groupId,
java.util.Map<String,Integer> topics)
Create an input stream that pulls messages from Kafka Brokers.

static JavaPairReceiverInputDStream<String,String>
createStream(JavaStreamingContext jssc,
String zkQuorum,
String groupId,
java.util.Map<String,Integer> topics,
StorageLevel storageLevel)
Create an input stream that pulls messages from Kafka Brokers.

static <K,V,U extends kafka.serializer.Decoder<?>,T extends kafka.serializer.Decoder<?>> ReceiverInputDStream<scala.Tuple2<K,V>>
createStream(StreamingContext ssc,
scala.collection.immutable.Map<String,String> kafkaParams,
scala.collection.immutable.Map<String,Object> topics,
StorageLevel storageLevel,
scala.reflect.ClassTag<K> evidence$1,
scala.reflect.ClassTag<V> evidence$2,
scala.reflect.ClassTag<U> evidence$3,
scala.reflect.ClassTag<T> evidence$4)
Create an input stream that pulls messages from Kafka Brokers.

static ReceiverInputDStream<scala.Tuple2<String,String>>
createStream(StreamingContext ssc,
String zkQuorum,
String groupId,
scala.collection.immutable.Map<String,Object> topics,
StorageLevel storageLevel)
Create an input stream that pulls messages from Kafka Brokers.

Method Detail
public static ReceiverInputDStream<scala.Tuple2<String,String>> createStream(StreamingContext ssc, String zkQuorum, String groupId, scala.collection.immutable.Map<String,Object> topics, StorageLevel storageLevel)

Parameters:
ssc - StreamingContext object
zkQuorum - Zookeeper quorum (hostname:port,hostname:port,..)
groupId - The group id for this consumer
topics - Map of (topic_name to numPartitions) to consume. Each partition is consumed in its own thread
storageLevel - Storage level to use for storing the received objects (default: StorageLevel.MEMORY_AND_DISK_SER_2)
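
For reference, a minimal Scala sketch of calling this receiver-based overload; the ZooKeeper quorum, group id, and topic name below are placeholders, not values from this page:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(new SparkConf().setAppName("KafkaReceiverSketch"), Seconds(10))

// topics: Map of (topic_name -> numPartitions); each partition is consumed in its own thread.
val lines = KafkaUtils.createStream(
  ssc,
  "zk1:2181,zk2:2181",       // zkQuorum (placeholder)
  "example-consumer-group",  // groupId (placeholder)
  Map("exampleTopic" -> 2))

lines.map(_._2).print()      // stream elements are (key, message) pairs
ssc.start()
ssc.awaitTermination()
```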

public static <K,V,U extends kafka.serializer.Decoder<?>,T extends kafka.serializer.Decoder<?>> ReceiverInputDStream<scala.Tuple2<K,V>> createStream(StreamingContext ssc, scala.collection.immutable.Map<String,String> kafkaParams, scala.collection.immutable.Map<String,Object> topics, StorageLevel storageLevel, scala.reflect.ClassTag<K> evidence$1, scala.reflect.ClassTag<V> evidence$2, scala.reflect.ClassTag<U> evidence$3, scala.reflect.ClassTag<T> evidence$4)

Parameters:
ssc - StreamingContext object
kafkaParams - Map of kafka configuration parameters, see http://kafka.apache.org/08/configuration.html
topics - Map of (topic_name to numPartitions) to consume. Each partition is consumed in its own thread.
storageLevel - Storage level to use for storing the received objects
evidence$1 - (undocumented)
evidence$2 - (undocumented)
evidence$3 - (undocumented)
evidence$4 - (undocumented)
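
A hedged sketch of this generic overload, with the decoders supplied as type parameters. The kafkaParams keys shown ("zookeeper.connect", "group.id") are the usual ones from the Kafka 0.8 consumer configuration linked above, and all host and topic names are placeholders:

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(new SparkConf().setAppName("KafkaReceiverParamsSketch"), Seconds(10))

val kafkaParams = Map(
  "zookeeper.connect" -> "zk1:2181,zk2:2181",  // placeholder quorum
  "group.id" -> "example-consumer-group")

// Key/value types and their decoders are passed explicitly as type parameters.
val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Map("exampleTopic" -> 1), StorageLevel.MEMORY_AND_DISK_SER_2)

stream.map(_._2).print()
```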

public static JavaPairReceiverInputDStream<String,String> createStream(JavaStreamingContext jssc, String zkQuorum, String groupId, java.util.Map<String,Integer> topics)

Parameters:
jssc - JavaStreamingContext object
zkQuorum - Zookeeper quorum (hostname:port,hostname:port,..)
groupId - The group id for this consumer
topics - Map of (topic_name to numPartitions) to consume. Each partition is consumed in its own thread

public static JavaPairReceiverInputDStream<String,String> createStream(JavaStreamingContext jssc, String zkQuorum, String groupId, java.util.Map<String,Integer> topics, StorageLevel storageLevel)

Parameters:
jssc - JavaStreamingContext object
zkQuorum - Zookeeper quorum (hostname:port,hostname:port,..).
groupId - The group id for this consumer.
topics - Map of (topic_name to numPartitions) to consume. Each partition is consumed in its own thread.
storageLevel - RDD storage level.

public static <K,V,U extends kafka.serializer.Decoder<?>,T extends kafka.serializer.Decoder<?>> JavaPairReceiverInputDStream<K,V> createStream(JavaStreamingContext jssc, Class<K> keyTypeClass, Class<V> valueTypeClass, Class<U> keyDecoderClass, Class<T> valueDecoderClass, java.util.Map<String,String> kafkaParams, java.util.Map<String,Integer> topics, StorageLevel storageLevel)

Parameters:
jssc - JavaStreamingContext object
keyTypeClass - Key type of DStream
valueTypeClass - Value type of DStream
keyDecoderClass - Type of kafka key decoder
valueDecoderClass - Type of kafka value decoder
kafkaParams - Map of kafka configuration parameters, see http://kafka.apache.org/08/configuration.html
topics - Map of (topic_name to numPartitions) to consume. Each partition is consumed in its own thread
storageLevel - RDD storage level.

public static <K,V,KD extends kafka.serializer.Decoder<K>,VD extends kafka.serializer.Decoder<V>> RDD<scala.Tuple2<K,V>> createRDD(SparkContext sc, scala.collection.immutable.Map<String,String> kafkaParams, OffsetRange[] offsetRanges, scala.reflect.ClassTag<K> evidence$5, scala.reflect.ClassTag<V> evidence$6, scala.reflect.ClassTag<KD> evidence$7, scala.reflect.ClassTag<VD> evidence$8)

Parameters:
sc - SparkContext object
kafkaParams - Kafka configuration parameters. Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s) (NOT zookeeper servers) specified in host1:port1,host2:port2 form.
offsetRanges - Each OffsetRange in the batch corresponds to a range of offsets for a given Kafka topic/partition
evidence$5 - (undocumented)
evidence$6 - (undocumented)
evidence$7 - (undocumented)
evidence$8 - (undocumented)
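
A minimal sketch of building a batch RDD from explicit offset ranges; broker addresses, topic name, and offsets are illustrative placeholders:

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

val sc = new SparkContext(new SparkConf().setAppName("KafkaRDDSketch"))

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")  // placeholder brokers

// One range per topic/partition: fromOffset is inclusive, untilOffset is exclusive.
val offsetRanges = Array(
  OffsetRange("exampleTopic", 0, 0L, 100L),
  OffsetRange("exampleTopic", 1, 0L, 100L))

val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
  sc, kafkaParams, offsetRanges)

rdd.map(_._2).take(5).foreach(println)  // elements are (key, message) pairs
```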

public static <K,V,KD extends kafka.serializer.Decoder<K>,VD extends kafka.serializer.Decoder<V>,R> RDD<R> createRDD(SparkContext sc, scala.collection.immutable.Map<String,String> kafkaParams, OffsetRange[] offsetRanges, scala.collection.immutable.Map<kafka.common.TopicAndPartition,Broker> leaders, scala.Function1<kafka.message.MessageAndMetadata<K,V>,R> messageHandler, scala.reflect.ClassTag<K> evidence$9, scala.reflect.ClassTag<V> evidence$10, scala.reflect.ClassTag<KD> evidence$11, scala.reflect.ClassTag<VD> evidence$12, scala.reflect.ClassTag<R> evidence$13)

Parameters:
sc - SparkContext object
kafkaParams - Kafka configuration parameters. Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s) (NOT zookeeper servers) specified in host1:port1,host2:port2 form.
offsetRanges - Each OffsetRange in the batch corresponds to a range of offsets for a given Kafka topic/partition
leaders - Kafka brokers for each TopicAndPartition in offsetRanges. May be an empty map, in which case leaders will be looked up on the driver.
messageHandler - Function for translating each message and metadata into the desired type
evidence$9 - (undocumented)
evidence$10 - (undocumented)
evidence$11 - (undocumented)
evidence$12 - (undocumented)
evidence$13 - (undocumented)
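
A sketch of this overload in which the messageHandler keeps the partition and offset alongside each message; leaders is left empty so they are looked up on the driver, and brokers and offsets are placeholders:

```scala
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

val sc = new SparkContext(new SparkConf().setAppName("KafkaRDDHandlerSketch"))

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")  // placeholder broker
val offsetRanges = Array(OffsetRange("exampleTopic", 0, 0L, 100L))

// Translate each message plus its metadata into the record type R = (Int, Long, String).
val messageHandler =
  (mmd: MessageAndMetadata[String, String]) => (mmd.partition, mmd.offset, mmd.message())

val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder, (Int, Long, String)](
  sc, kafkaParams, offsetRanges, leaders = Map.empty, messageHandler)
```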

public static <K,V,KD extends kafka.serializer.Decoder<K>,VD extends kafka.serializer.Decoder<V>> JavaPairRDD<K,V> createRDD(JavaSparkContext jsc, Class<K> keyClass, Class<V> valueClass, Class<KD> keyDecoderClass, Class<VD> valueDecoderClass, java.util.Map<String,String> kafkaParams, OffsetRange[] offsetRanges)

Parameters:
jsc - JavaSparkContext object
kafkaParams - Kafka configuration parameters. Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s) (NOT zookeeper servers) specified in host1:port1,host2:port2 form.
offsetRanges - Each OffsetRange in the batch corresponds to a range of offsets for a given Kafka topic/partition
keyClass - type of Kafka message key
valueClass - type of Kafka message value
keyDecoderClass - type of Kafka message key decoder
valueDecoderClass - type of Kafka message value decoder

public static <K,V,KD extends kafka.serializer.Decoder<K>,VD extends kafka.serializer.Decoder<V>,R> JavaRDD<R> createRDD(JavaSparkContext jsc, Class<K> keyClass, Class<V> valueClass, Class<KD> keyDecoderClass, Class<VD> valueDecoderClass, Class<R> recordClass, java.util.Map<String,String> kafkaParams, OffsetRange[] offsetRanges, java.util.Map<kafka.common.TopicAndPartition,Broker> leaders, Function<kafka.message.MessageAndMetadata<K,V>,R> messageHandler)

Parameters:
jsc - JavaSparkContext object
kafkaParams - Kafka configuration parameters. Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s) (NOT zookeeper servers) specified in host1:port1,host2:port2 form.
offsetRanges - Each OffsetRange in the batch corresponds to a range of offsets for a given Kafka topic/partition
leaders - Kafka brokers for each TopicAndPartition in offsetRanges. May be an empty map, in which case leaders will be looked up on the driver.
messageHandler - Function for translating each message and metadata into the desired type
keyClass - (undocumented)
valueClass - (undocumented)
keyDecoderClass - (undocumented)
valueDecoderClass - (undocumented)
recordClass - (undocumented)

public static <K,V,KD extends kafka.serializer.Decoder<K>,VD extends kafka.serializer.Decoder<V>,R> InputDStream<R> createDirectStream(StreamingContext ssc, scala.collection.immutable.Map<String,String> kafkaParams, scala.collection.immutable.Map<kafka.common.TopicAndPartition,Object> fromOffsets, scala.Function1<kafka.message.MessageAndMetadata<K,V>,R> messageHandler, scala.reflect.ClassTag<K> evidence$14, scala.reflect.ClassTag<V> evidence$15, scala.reflect.ClassTag<KD> evidence$16, scala.reflect.ClassTag<VD> evidence$17, scala.reflect.ClassTag<R> evidence$18)

Create an input stream that directly pulls messages from Kafka Brokers without using any receiver.

Points to note:
- No receivers: This stream does not use any receiver. It directly queries Kafka.
- Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked by the stream itself. For interoperability with Kafka monitoring tools that depend on Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application. You can access the offsets used in each batch from the generated RDDs (see HasOffsetRanges).
- Failure Recovery: To recover from driver failures, you have to enable checkpointing in the StreamingContext. The information on consumed offsets can be recovered from the checkpoint. See the programming guide for details (constraints, etc.).
- End-to-end semantics: This stream ensures that every record is effectively received and transformed exactly once, but gives no guarantees on whether the transformed data are output exactly once. For end-to-end exactly-once semantics, you have to either ensure that the output operation is idempotent, or use transactions to output records atomically. See the programming guide for more details.

Parameters:
ssc - StreamingContext object
kafkaParams - Kafka configuration parameters. Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s) (NOT zookeeper servers) specified in host1:port1,host2:port2 form.
fromOffsets - Per-topic/partition Kafka offsets defining the (inclusive) starting point of the stream
messageHandler - Function for translating each message and metadata into the desired type
evidence$14 - (undocumented)
evidence$15 - (undocumented)
evidence$16 - (undocumented)
evidence$17 - (undocumented)
evidence$18 - (undocumented)
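
A sketch of resuming the direct stream from offsets you have stored yourself, which is the typical use of this overload; broker addresses, topic, and starting offsets are placeholders:

```scala
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(new SparkConf().setAppName("DirectFromOffsetsSketch"), Seconds(10))

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")  // placeholder brokers

// Inclusive starting offset for each topic/partition, e.g. recovered from your own offset store.
val fromOffsets = Map(
  TopicAndPartition("exampleTopic", 0) -> 1000L,
  TopicAndPartition("exampleTopic", 1) -> 1000L)

val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaParams, fromOffsets, messageHandler)
```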

public static <K,V,KD extends kafka.serializer.Decoder<K>,VD extends kafka.serializer.Decoder<V>> InputDStream<scala.Tuple2<K,V>> createDirectStream(StreamingContext ssc, scala.collection.immutable.Map<String,String> kafkaParams, scala.collection.immutable.Set<String> topics, scala.reflect.ClassTag<K> evidence$19, scala.reflect.ClassTag<V> evidence$20, scala.reflect.ClassTag<KD> evidence$21, scala.reflect.ClassTag<VD> evidence$22)

Create an input stream that directly pulls messages from Kafka Brokers without using any receiver.

Points to note:
- No receivers: This stream does not use any receiver. It directly queries Kafka.
- Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked by the stream itself. For interoperability with Kafka monitoring tools that depend on Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application. You can access the offsets used in each batch from the generated RDDs (see HasOffsetRanges and the sketch after the parameter list below).
- Failure Recovery: To recover from driver failures, you have to enable checkpointing in the StreamingContext. The information on consumed offsets can be recovered from the checkpoint. See the programming guide for details (constraints, etc.).
- End-to-end semantics: This stream ensures that every record is effectively received and transformed exactly once, but gives no guarantees on whether the transformed data are output exactly once. For end-to-end exactly-once semantics, you have to either ensure that the output operation is idempotent, or use transactions to output records atomically. See the programming guide for more details.

Parameters:
ssc - StreamingContext object
kafkaParams - Kafka configuration parameters. Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s) (NOT zookeeper servers), specified in host1:port1,host2:port2 form. If not starting from a checkpoint, "auto.offset.reset" may be set to "largest" or "smallest" to determine where the stream starts (defaults to "largest")
topics - Names of the topics to consume
evidence$19 - (undocumented)
evidence$20 - (undocumented)
evidence$21 - (undocumented)
evidence$22 - (undocumented)
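
A sketch of the subscribe-by-topic form, including reading back the per-batch offset ranges as described in the notes above; broker addresses and topic name are placeholders:

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

val ssc = new StreamingContext(new SparkConf().setAppName("DirectByTopicSketch"), Seconds(10))

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")  // placeholder brokers
val topics = Set("exampleTopic")

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

// The offsets consumed in each batch are available on the generated RDDs (see HasOffsetRanges),
// e.g. to persist them to an external store or to update Kafka/ZooKeeper for monitoring tools.
stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach(o => println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}"))
}
```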

public static <K,V,KD extends kafka.serializer.Decoder<K>,VD extends kafka.serializer.Decoder<V>,R> JavaInputDStream<R> createDirectStream(JavaStreamingContext jssc, Class<K> keyClass, Class<V> valueClass, Class<KD> keyDecoderClass, Class<VD> valueDecoderClass, Class<R> recordClass, java.util.Map<String,String> kafkaParams, java.util.Map<kafka.common.TopicAndPartition,Long> fromOffsets, Function<kafka.message.MessageAndMetadata<K,V>,R> messageHandler)

Create an input stream that directly pulls messages from Kafka Brokers without using any receiver.

Points to note:
- No receivers: This stream does not use any receiver. It directly queries Kafka.
- Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked by the stream itself. For interoperability with Kafka monitoring tools that depend on Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application. You can access the offsets used in each batch from the generated RDDs (see HasOffsetRanges).
- Failure Recovery: To recover from driver failures, you have to enable checkpointing in the StreamingContext. The information on consumed offsets can be recovered from the checkpoint. See the programming guide for details (constraints, etc.).
- End-to-end semantics: This stream ensures that every record is effectively received and transformed exactly once, but gives no guarantees on whether the transformed data are output exactly once. For end-to-end exactly-once semantics, you have to either ensure that the output operation is idempotent, or use transactions to output records atomically. See the programming guide for more details.

Parameters:
jssc - JavaStreamingContext object
keyClass - Class of the keys in the Kafka records
valueClass - Class of the values in the Kafka records
keyDecoderClass - Class of the key decoder
valueDecoderClass - Class of the value decoder
recordClass - Class of the records in DStream
kafkaParams - Kafka configuration parameters. Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s) (NOT zookeeper servers), specified in host1:port1,host2:port2 form.
fromOffsets - Per-topic/partition Kafka offsets defining the (inclusive) starting point of the stream
messageHandler - Function for translating each message and metadata into the desired type

public static <K,V,KD extends kafka.serializer.Decoder<K>,VD extends kafka.serializer.Decoder<V>> JavaPairInputDStream<K,V> createDirectStream(JavaStreamingContext jssc, Class<K> keyClass, Class<V> valueClass, Class<KD> keyDecoderClass, Class<VD> valueDecoderClass, java.util.Map<String,String> kafkaParams, java.util.Set<String> topics)

Create an input stream that directly pulls messages from Kafka Brokers without using any receiver.

Points to note:
- No receivers: This stream does not use any receiver. It directly queries Kafka.
- Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked by the stream itself. For interoperability with Kafka monitoring tools that depend on Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application. You can access the offsets used in each batch from the generated RDDs (see HasOffsetRanges).
- Failure Recovery: To recover from driver failures, you have to enable checkpointing in the StreamingContext. The information on consumed offsets can be recovered from the checkpoint. See the programming guide for details (constraints, etc.).
- End-to-end semantics: This stream ensures that every record is effectively received and transformed exactly once, but gives no guarantees on whether the transformed data are output exactly once. For end-to-end exactly-once semantics, you have to either ensure that the output operation is idempotent, or use transactions to output records atomically. See the programming guide for more details.

Parameters:
jssc - JavaStreamingContext object
keyClass - Class of the keys in the Kafka records
valueClass - Class of the values in the Kafka records
keyDecoderClass - Class of the key decoder
valueDecoderClass - Class of the value decoder
kafkaParams - Kafka configuration parameters. Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s) (NOT zookeeper servers), specified in host1:port1,host2:port2 form. If not starting from a checkpoint, "auto.offset.reset" may be set to "largest" or "smallest" to determine where the stream starts (defaults to "largest")
topics - Names of the topics to consume