Kafka Plugin
Ingest Kafka messages into Phoenix using PhoenixConsumer.
The plugin enables reliable and efficient streaming of large volumes of data and logs into HBase through the Phoenix API.
Apache Kafka™ is a distributed, partitioned, replicated commit log service. It provides the functionality of a messaging system, but with a unique design.
At a high level, producers send messages over the network to the Kafka cluster, which then serves them to consumers.
Phoenix provides PhoenixConsumer, which subscribes to Kafka topics, consumes the messages that producers publish to them, and upserts those messages into a Phoenix table.
Prerequisites
- Phoenix 4.10.0+
- Kafka 0.9.0.0+
Installation and Setup
Use our binary artifacts for Phoenix 4.10.0+ directly, or download and build Phoenix yourself (see the Phoenix build instructions).
PhoenixConsumer with RegexEventSerializer
Create a kafka-consumer-regex.properties file with the following properties:
serializer=regex
serializer.rowkeyType=uuid
serializer.regex=([^\,]*),([^\,]*),([^\,]*)
serializer.columns=c1,c2,c3
jdbcUrl=jdbc:phoenix:localhost
table=SAMPLE1
ddl=CREATE TABLE IF NOT EXISTS SAMPLE1(uid VARCHAR NOT NULL,c1 VARCHAR,c2 VARCHAR,c3 VARCHAR CONSTRAINT pk PRIMARY KEY(uid))
bootstrap.servers=localhost:9092
topics=topic1,topic2
poll.timeout.ms=100
PhoenixConsumer with JsonEventSerializer
Create a kafka-consumer-json.properties file with the following properties:
serializer=json
serializer.rowkeyType=uuid
serializer.columns=c1,c2,c3
jdbcUrl=jdbc:phoenix:localhost
table=SAMPLE2
ddl=CREATE TABLE IF NOT EXISTS SAMPLE2(uid VARCHAR NOT NULL,c1 VARCHAR,c2 VARCHAR,c3 VARCHAR CONSTRAINT pk PRIMARY KEY(uid))
bootstrap.servers=localhost:9092
topics=topic1,topic2
poll.timeout.ms=100
PhoenixConsumer Execution Procedure
Start the Kafka producer, then send some messages:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic1
See the Apache Kafka documentation to learn more about Apache Kafka.
Start PhoenixConsumer using the command below:
HADOOP_CLASSPATH=$(hbase classpath):/path/to/hbase/conf hadoop jar phoenix-kafka-<version>-minimal.jar org.apache.phoenix.kafka.consumer.PhoenixConsumerTool --file /data/kafka-consumer.properties
The properties file passed with --file must be present on HDFS, not on the local filesystem of the machine where the command is run.
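Once started, the consumer's write path is shaped by poll.timeout.ms and batchSize, described under Configuration. As a rough illustration only, the Python sketch below simulates how polled messages could be grouped into transactions of at most batchSize events. It is not the PhoenixConsumerTool source: the list of lists stands in for repeated Kafka poll calls, the hypothetical `commit` callback stands in for running the UPSERTs and committing the Phoenix connection, and flushing at the end of every poll is an assumed design choice.

```python
from typing import Callable, Iterable, List


def consume(polls: Iterable[List[str]], batch_size: int,
            commit: Callable[[List[str]], None]) -> int:
    """Commit polled messages in transactions of at most batch_size events.

    `polls` stands in for repeated Kafka poll calls (each bounded by
    poll.timeout.ms); `commit` stands in for executing the UPSERTs and
    committing the Phoenix connection. Returns the number of commits.
    """
    if batch_size < 1:
        raise ValueError("batchSize must be a positive integer")
    commits = 0
    for records in polls:
        # An empty poll (timeout with no data) produces no transaction.
        for start in range(0, len(records), batch_size):
            commit(records[start:start + batch_size])
            commits += 1
    return commits


# Usage: three polls, batchSize=2 -> transactions ["a", "b"], ["c"], ["d"]
batches: List[List[str]] = []
print(consume([["a", "b", "c"], [], ["d"]], 2, batches.append))  # prints 3
```

Flushing after every poll keeps events from waiting indefinitely when traffic is light, at the cost of smaller transactions; an implementation could instead accumulate events across polls until a batch fills.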
Configuration
| Property Name | Default | Description |
|---|---|---|
| bootstrap.servers | | List of Kafka servers used to bootstrap connections to Kafka. Format: host1:port1,host2:port2,... |
| topics | | List of topics to use as input for this connector. Format: topic1,topic2,... |
| poll.timeout.ms | 100 | Poll timeout in milliseconds. |
| batchSize | 100 | Number of events per transaction. |
| zookeeperQuorum | | ZooKeeper quorum of the HBase cluster. |
| table | | Name of the HBase table to write to. |
| ddl | | The CREATE TABLE statement for the HBase table into which events are upserted. If specified, this statement is executed. Including IF NOT EXISTS in the DDL is recommended. |
| serializer | | Event serializer for processing Kafka messages. This plugin supports Phoenix Flume event serializers (for example, regex, json). |
| serializer.regex | (.*) | Regular expression for parsing the message. |
| serializer.columns | | Columns extracted from the message for insertion into HBase. |
| serializer.headers | | Headers included in the UPSERT query. These columns are of type VARCHAR by default. |
| serializer.rowkeyType | | Custom row key generator: one of timestamp, date, uuid, random, or nanotimestamp. Set this when a row key should be auto-generated for the primary key column. |
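To make the defaults in the table concrete, here is a minimal Python sketch that reads a properties file such as kafka-consumer-regex.properties, fills in the documented defaults for poll.timeout.ms, batchSize, and serializer.regex, and splits the comma-separated list values. The function name load_consumer_config is hypothetical, and the parser is a deliberate simplification: it handles only key=value lines and comments, not Java .properties escapes or line continuations, so a value such as serializer.regex is kept verbatim.

```python
from typing import Dict, Union

# Defaults taken from the configuration table.
DEFAULTS = {"poll.timeout.ms": "100", "batchSize": "100", "serializer.regex": "(.*)"}
# Properties documented (or used in the samples) as comma-separated lists.
LIST_KEYS = ("bootstrap.servers", "topics", "serializer.columns")
INT_KEYS = ("poll.timeout.ms", "batchSize")


def load_consumer_config(text: str) -> Dict[str, Union[str, int, list]]:
    """Parse key=value lines, apply table defaults, and split list values.

    Simplification: no Java .properties escapes, continuations, or ':' separators.
    """
    config: Dict[str, Union[str, int, list]] = dict(DEFAULTS)
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):  # blank or comment line
            continue
        key, sep, value = line.partition("=")  # split on the first '=' only
        if not sep:
            raise ValueError(f"expected key=value, got {raw!r}")
        config[key.strip()] = value.strip()
    for key in LIST_KEYS:
        if key in config:
            config[key] = [v.strip() for v in str(config[key]).split(",") if v.strip()]
    for key in INT_KEYS:
        config[key] = int(config[key])
    return config


sample = """serializer=regex
serializer.columns=c1,c2,c3
bootstrap.servers=localhost:9092
topics=topic1,topic2
"""
cfg = load_consumer_config(sample)
print(cfg["topics"], cfg["batchSize"])  # prints ['topic1', 'topic2'] 100
```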
Note: This plugin supports Phoenix Flume event serializers.
- RegexEventSerializer parses Kafka messages based on the regex specified in the configuration file.
- JsonEventSerializer parses Kafka messages based on the schema specified in the configuration file.
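The following Python sketch illustrates what the two serializers do, under stated assumptions; it is not the Phoenix Flume serializer code, and all function names are hypothetical. For the regex case it maps the capture groups of serializer.regex, in order, onto serializer.columns and treats a non-matching message as skipped (assumed behavior). For the JSON case it assumes each column name is a top-level key in the message, a simplification of schema-based parsing. Row keys are generated as with serializer.rowkeyType=uuid, and the UPSERT targets the uid key column from the sample DDL.

```python
import json
import re
import uuid
from typing import Dict, List, Optional


def parse_regex(message: str, pattern: str, columns: List[str]) -> Optional[Dict[str, str]]:
    """Map the regex capture groups, in order, onto the configured columns.

    Assumption: a message that does not fully match is skipped (None).
    """
    match = re.fullmatch(pattern, message)
    if match is None or len(match.groups()) != len(columns):
        return None
    return dict(zip(columns, match.groups()))


def parse_json(message: str, columns: List[str]) -> Dict[str, object]:
    """Assumption: each column is a top-level JSON key with the same name."""
    document = json.loads(message)
    return {column: document.get(column) for column in columns}


def build_upsert(table: str, key_column: str, columns: List[str]) -> str:
    """Parameterized Phoenix UPSERT with the generated row key first."""
    names = ", ".join([key_column] + columns)
    placeholders = ", ".join(["?"] * (len(columns) + 1))
    return f"UPSERT INTO {table} ({names}) VALUES ({placeholders})"


def to_row(values: Dict[str, object], columns: List[str]) -> List[object]:
    """Prepend a UUID string key, as with serializer.rowkeyType=uuid."""
    return [str(uuid.uuid4())] + [values[column] for column in columns]


columns = ["c1", "c2", "c3"]
print(parse_regex("a,b,c", r"([^\,]*),([^\,]*),([^\,]*)", columns))
# prints {'c1': 'a', 'c2': 'b', 'c3': 'c'}
print(build_upsert("SAMPLE1", "uid", columns))
# prints UPSERT INTO SAMPLE1 (uid, c1, c2, c3) VALUES (?, ?, ?, ?)
```

With the default serializer.regex of (.*), the whole message is captured as a single group, so this sketch would expect exactly one configured column.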