
Kafka Plugin

Ingest Kafka messages into Phoenix using PhoenixConsumer.

The plugin enables reliable and efficient streaming of large amounts of data/logs into HBase using the Phoenix API.

Apache Kafka™ is a distributed, partitioned, replicated commit log service. It provides the functionality of a messaging system, but with a unique design.

At a high level, producers send messages over the network to the Kafka cluster, which then serves them to consumers:

[Figure: Kafka producers and consumers]

Phoenix provides PhoenixConsumer to receive messages from Kafka producers.

Prerequisites

  • Phoenix 4.10.0+
  • Kafka 0.9.0.0+

Installation and Setup

Use the binary artifacts for Phoenix 4.10.0+ directly, or download and build Phoenix yourself (see the build instructions).

PhoenixConsumer with RegexEventSerializer

Create a kafka-consumer-regex.properties file with the following properties:

serializer=regex
serializer.rowkeyType=uuid
serializer.regex=([^\,]*),([^\,]*),([^\,]*)
serializer.columns=c1,c2,c3

jdbcUrl=jdbc:phoenix:localhost
table=SAMPLE1
ddl=CREATE TABLE IF NOT EXISTS SAMPLE1(uid VARCHAR NOT NULL,c1 VARCHAR,c2 VARCHAR,c3 VARCHAR CONSTRAINT pk PRIMARY KEY(uid))

bootstrap.servers=localhost:9092
topics=topic1,topic2
poll.timeout.ms=100
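To illustrate how this configuration works: the regex defines three comma-free capture groups, which map in order to serializer.columns (c1, c2, c3). The sketch below mimics that mapping with sed for a sample message; the actual RegexEventSerializer implementation may differ in detail.

```shell
# Sketch: how the configured regex splits a sample message into columns c1,c2,c3.
# sed -E uses the same three comma-free capture groups as serializer.regex.
echo "val1,val2,val3" | sed -E 's/([^,]*),([^,]*),([^,]*)/c1=\1 c2=\2 c3=\3/'
```

Each captured group is upserted into the corresponding column of SAMPLE1, while the uid primary key is auto-generated per serializer.rowkeyType=uuid.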

PhoenixConsumer with JsonEventSerializer

Create a kafka-consumer-json.properties file with the following properties:

serializer=json
serializer.rowkeyType=uuid
serializer.columns=c1,c2,c3

jdbcUrl=jdbc:phoenix:localhost
table=SAMPLE2
ddl=CREATE TABLE IF NOT EXISTS SAMPLE2(uid VARCHAR NOT NULL,c1 VARCHAR,c2 VARCHAR,c3 VARCHAR CONSTRAINT pk PRIMARY KEY(uid))

bootstrap.servers=localhost:9092
topics=topic1,topic2
poll.timeout.ms=100
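With the JSON serializer, each message is expected to be a JSON object whose keys match serializer.columns; each key's value is upserted into the corresponding column. The sketch below demonstrates that key-to-column extraction on a sample message (the values are illustrative, and the serializer's internals may differ):

```shell
# Sketch: a sample JSON message whose keys match serializer.columns (c1,c2,c3).
# Each key's value maps to the corresponding column of SAMPLE2.
MSG='{"c1":"v1","c2":"v2","c3":"v3"}'
echo "$MSG" | python3 -c 'import json,sys; d=json.load(sys.stdin); print(d["c1"], d["c2"], d["c3"])'
```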

PhoenixConsumer Execution Procedure

Start the Kafka producer, then send some messages:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic1
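Each line typed into the console producer becomes one Kafka message. With the regex configuration above, a message should contain three comma-separated values; for example, lines like these could be typed into (or piped to) the producer:

```shell
# Sample messages in the format expected by the regex serializer:
# three comma-separated fields per line.
printf '%s\n' "val1,val2,val3" "val4,val5,val6"
```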

Learn more in the Apache Kafka documentation.

Start PhoenixConsumer using the command below:

HADOOP_CLASSPATH=$(hbase classpath):/path/to/hbase/conf hadoop jar phoenix-kafka-<version>-minimal.jar org.apache.phoenix.kafka.consumer.PhoenixConsumerTool --file /data/kafka-consumer.properties

The input file must be present on HDFS (not the local filesystem where the command is being run).

Configuration

  • bootstrap.servers: List of Kafka servers used to bootstrap connections to Kafka. Format: host1:port1,host2:port2,...
  • topics: List of topics to use as input for this connector. Format: topic1,topic2,...
  • poll.timeout.ms (default: 100): Poll timeout in milliseconds.
  • batchSize (default: 100): Number of events per transaction.
  • zookeeperQuorum: ZooKeeper quorum of the HBase cluster.
  • table: Name of the table in HBase to write to.
  • ddl: The CREATE TABLE query for the HBase table where events will be upserted. If specified, this query is executed. Including IF NOT EXISTS in the DDL is recommended.
  • serializer: Event serializer for processing Kafka messages. This plugin supports the Phoenix Flume event serializers (for example, regex, json).
  • serializer.regex (default: (.*)): Regular expression for parsing the message.
  • serializer.columns: Columns that will be extracted from the message for inserting into HBase.
  • serializer.headers: Headers that go as part of the UPSERT query. Their data type is VARCHAR by default.
  • serializer.rowkeyType: Custom row key generator. Can be one of timestamp, date, uuid, random, or nanotimestamp. Configure this when a custom row key should be auto-generated for the primary key column.

Note: This plugin supports Phoenix Flume event serializers.

  • RegexEventSerializer parses Kafka messages based on the regex specified in the configuration file.
  • JsonEventSerializer parses Kafka messages based on the schema specified in the configuration file.