Kafka Connect Elasticsearch: Consuming and Indexing with Kafka Connect

In the world of DevOps, metric collection, log centralization, and analysis, Apache Kafka is the most commonly used middleware. More specifically, it is used as a fast, persistent queue between data sources, such as log shippers, and the storage that makes the data, such as logs, searchable. In this post we’ll focus on the log shipping use case with Elasticsearch as the log storage, but the same principles apply to other kinds of data. We assume that the data coming into Kafka is already JSON formatted, so keep that in mind. At the very end we also describe how to ship logs to Logsene.



Most systems we see in our logging consulting practice use Kafka to achieve high availability and fault tolerance and to expose incoming data to various consumers. Their ingestion pipelines look a bit like this:

[Figure: Kafka log centralization pipeline]

There are lots of options when it comes to choosing the right log shipper and getting data into Kafka. We can use Logstash or one of several Logstash alternatives, such as rsyslog, Filebeat, Logagent, or anything that suits our needs – the lighter the better.

Once you figure out how to get data into Kafka, the next question is how to get it out of Kafka and into something like Elasticsearch. The answer is often “it depends”. You could implement your own solution on top of the Kafka API – a consumer that does whatever you code it to do. However, that is time consuming, requires at least basic knowledge of Kafka and Elasticsearch, is error prone, and finally forces us to spend time maintaining that code.

Instead, we could use a ready-made solution like Logstash, which is powerful and versatile, but if we do that we still have to take care of fault tolerance and avoid a single point of failure. So, if we are looking for a solution that is less powerful in terms of processing capabilities but comes with out-of-the-box distribution built on an already-present system component, Kafka Connect Elasticsearch (https://github.com/confluentinc/kafka-connect-elasticsearch) may be worth a look.

Introducing Kafka Connect for Elasticsearch

Current Kafka versions ship with Kafka Connect – a connector framework that provides the backbone functionality for connecting Kafka to various external systems and either getting data into Kafka or getting it out. It makes it possible to quickly develop connectors that move data into or out of Kafka and to leverage Kafka’s distributed capabilities, making the data flow fault tolerant and highly available. What’s more, the connector framework provides an easy-to-use REST API for retrieving information about connectors, updating configuration, and managing the connectors themselves. It is also highly configurable, works in both standalone and distributed mode, and, finally, it is easy to use.

One of the available connectors is Kafka Connect Elasticsearch, which allows sending data from Kafka to Elasticsearch. It uses Jest, an HTTP-based Elasticsearch client library, which should avoid incompatibilities between Elasticsearch versions, at least minor ones. In this blog post we will see how to quickly set up this connector to send data from a Kafka topic to Elasticsearch.

Test Setup

Our test setup will be very simple: one Zookeeper instance, one Kafka broker, and one Elasticsearch node, all installed on a single machine and listening on the following ports:

  • Zookeeper – 2181
  • Kafka –  9092
  • Elasticsearch – 9200

[Figure: Sending data from Kafka to Elasticsearch]

We assume that we already have a logs topic created in Kafka and we would like to send data to an index called logs_index in Elasticsearch. To simplify our test we will use Kafka Console Producer to ingest data into Kafka.
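
If the logs topic doesn’t exist yet, it can be created with the kafka-topics.sh script that ships with Kafka; on our single-node test setup a minimal sketch could look like this (partition and replication counts are just an example):

$ bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic logs --partitions 1 --replication-factor 1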

We will use Elasticsearch 2.3.2 because of the compatibility issues described in issue #55, and Kafka 0.10.0 to avoid build issues.

Building Elasticsearch Connector

When using Kafka Connect Elasticsearch, you can download one of the releases or build it yourself.

We will focus on building the package, just to show how easily that can be done and so you can use the newest version of the connector with your Kafka version. You’ll need Maven.

Clone the appropriate branch of the project:

$ git clone -b 0.10.0.0 https://github.com/confluentinc/kafka-connect-elasticsearch.git

Build it:

$ cd kafka-connect-elasticsearch
$ mvn clean package

Once Maven downloads the needed libraries, compiles, and builds the code, you should see a message similar to the following one:

[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 24.825 s
[INFO] Finished at: 2017-02-27T15:03:51+01:00
[INFO] Final Memory: 39M/558M
[INFO] ------------------------------------------------------------------------

That means the connector has been built and can be found in the target/kafka-connect-elasticsearch-3.2.0-SNAPSHOT-package/share/java/kafka-connect-elasticsearch/ directory. The final thing we need to do is copy all the libraries from that directory to the Kafka libs directory. Keep in mind that you have to do this on every server that will run the connector. For example, if you plan on running the connector in distributed mode, it would be good to have the libraries on all your Kafka brokers.
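
For example, assuming Kafka is installed in /opt/kafka (adjust the path to your own layout), the copy step could look like this:

$ cp target/kafka-connect-elasticsearch-3.2.0-SNAPSHOT-package/share/java/kafka-connect-elasticsearch/* /opt/kafka/libs/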

Configuring Elasticsearch Connector

Before running Kafka Connect Elasticsearch we need to configure it. We’ll create elasticsearch-connect.properties with the following content:

name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=logs
topic.index.map=logs:logs_index
connection.url=http://localhost:9200
type.name=log
key.ignore=true
schema.ignore=true

This file will be provided as one of the configuration files and defines the behavior of the connector. We said that we want to use the io.confluent.connect.elasticsearch.ElasticsearchSinkConnector sink, which is responsible for sending data to Elasticsearch, and we set its name to elasticsearch-sink. The name should be unique for a given connector. We also said that we want a single task to be created for this connector (the tasks.max property), but Kafka may create fewer tasks if it can’t achieve the specified level of parallelism. We want to read data from the logs topic (keep in mind that we can specify multiple source topics using the topics property) and that data from the logs topic should be placed in an index called logs_index (using the topic.index.map property). We want to use the local Elasticsearch instance (specified using connection.url) and the data should use the log type (because of the value of the type.name property). Finally, we told Kafka Connect to ignore the key and schema by using the key.ignore=true and schema.ignore=true properties, because we assume that we will use templates in Elasticsearch to control the data structure and analysis, which is a good practice in general.
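
For completeness, here is a minimal sketch of such an index template for Elasticsearch 2.x; the template name and field mappings below are our own example, not something the connector requires:

$ curl -XPUT 'localhost:9200/_template/logs_template' -d '{
 "template" : "logs_index",
 "mappings" : {
  "log" : {
   "properties" : {
    "name" : { "type" : "string" },
    "severity" : { "type" : "string", "index" : "not_analyzed" }
   }
  }
 }
}'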

Running Kafka Connect Elasticsearch in Standalone Mode

To run the connector in standalone mode we will use the connect-standalone.sh script, which is provided with Kafka and can be found in its bin directory. It requires two configuration files: the one we already created and another one, which we will call connect-standalone.properties and which will have the following contents:

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

It defines the list of Kafka brokers, the key and value converters, whether schemas should be used, and so on.

After we are done with the configuration, to run the connector we just execute the following command:

$ bin/connect-standalone.sh config/connect-standalone.properties config/elasticsearch-connect.properties

This will start the connector as a separate JVM process on the same server where Kafka is running, and any data you put into the defined Kafka topic will be sent to Elasticsearch. However, we have a single point of failure here – a single instance of the connector. We can run the Kafka Connect Elasticsearch connector in distributed mode to leverage the distributed nature of Kafka itself, so let’s do that next.

Running Kafka Connect Elasticsearch in Distributed Mode

Running Kafka Connect Elasticsearch in standalone mode is fine, but it lacks the main benefits of using Kafka Connect – leveraging the distributed nature of Kafka, fault tolerance, and high availability. The difference between running the connector in standalone and distributed mode lies in where Kafka Connect stores the configuration, how it assigns the work, and where it stores the offsets and task statuses.

Let’s start with the configuration. We will store it in the connect-distributed.properties file:

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.flush.interval.ms=10000
group.id=connect-cluster
offset.storage.topic=connect-offsets
config.storage.topic=connect-configs
status.storage.topic=connect-status

We already covered some of the properties shown above. The new properties are:

  • group.id – the identifier of the Kafka Connect cluster group. It should be unique and must not interfere with consumers reading data from the given Kafka cluster.
  • offset.storage.topic – the name of the topic Kafka Connect will use to store offsets. This topic should have many partitions, be replicated, and be configured for compaction.
  • config.storage.topic – the name of the topic Kafka Connect will use to store connector configurations. This topic should have a single partition and be highly replicated.
  • status.storage.topic – the name of the topic Kafka Connect will use to store work status. It should have multiple partitions, be replicated, and be compacted (see the topic creation sketch right after this list).
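
These internal topics can be created ahead of time; a rough sketch on our single-broker test setup could look like this (the partition counts here are illustrative, and with a single broker the replication factor cannot exceed 1):

$ bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic connect-offsets --partitions 25 --replication-factor 1 --config cleanup.policy=compact
$ bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic connect-configs --partitions 1 --replication-factor 1 --config cleanup.policy=compact
$ bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic connect-status --partitions 5 --replication-factor 1 --config cleanup.policy=compact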

Once we have such configuration, we can start the connector in distributed mode:

$ bin/connect-distributed.sh config/connect-distributed.properties

You may have noticed one difference compared to running Kafka Connect in standalone mode – we didn’t provide the configuration for the connector itself. This is not a mistake!  When using Kafka Connect in distributed mode we need to create connectors using the REST API. Let’s not focus on this now, though, as we’ll get to that a bit later.

So what is the difference between standalone and distributed Kafka Connect mode? Both end up running in their own JVM process as Kafka Connect clients, and as such they both need access to the Kafka libraries, which is why running them on Kafka brokers makes sense. The major difference is in how the job is executed. In standalone mode the work is performed in a single process, while in distributed mode it is shared by all available Kafka Connect client instances running alongside the Kafka broker instances. Another difference is in where the client stores its configuration: in distributed mode it is stored inside Kafka, in its own topics defined by the configuration (the offset.storage.topic, config.storage.topic and status.storage.topic properties), while in standalone mode offsets are stored in the local file specified by the offset.storage.file.filename property.

So, if you run Kafka Connect Elasticsearch in distributed mode you can leverage multiple instances of it and either create multiple tasks (using the tasks.max property) or rely on the failover that comes for free when you have multiple instances of Kafka Connect Elasticsearch started. For example, if you set tasks.max=1 and have two instances of Kafka Connect started, then when one fails, the other will get the task to execute.
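
If you later need more parallelism, the connector configuration can be updated through the Kafka Connect REST API (described in more detail below); as a rough sketch, bumping tasks.max to 2 for our connector might look like this:

$ curl -XPUT -H 'Content-type:application/json' 'localhost:8083/connectors/elasticsearch-sink/config' -d '{
 "connector.class" : "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
 "tasks.max" : "2",
 "topics" : "logs",
 "topic.index.map" : "logs:logs_index",
 "connection.url" : "http://localhost:9200",
 "type.name" : "log",
 "key.ignore" : "true",
 "schema.ignore" : "true"
}'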

Troubleshooting

There are two places to look when troubleshooting: the Kafka broker logs and the Kafka Connect logs. In the Kafka broker logs you’ll find issues such as classes not being found and other Kafka-related errors. The Kafka Connect logs hold information from the Kafka client that is started when you launch Kafka Connect Elasticsearch. By default both go to standard output, but you can configure that using the corresponding properties files (log4j.properties for Kafka and connect-log4j.properties for Kafka Connect).
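
If you prefer to have the Kafka Connect output in a file instead of on the console, a minimal connect-log4j.properties sketch could look roughly like this (the log file path is just an example):

log4j.rootLogger=INFO, connectFile
log4j.appender.connectFile=org.apache.log4j.DailyRollingFileAppender
log4j.appender.connectFile.File=/var/log/kafka/connect.log
log4j.appender.connectFile.layout=org.apache.log4j.PatternLayout
log4j.appender.connectFile.layout.ConversionPattern=[%d] %p %m (%c:%L)%n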

Sending Example Data

Now we can produce some data. Keep in mind that we assumed that the data stored in Kafka will be in JSON format, so we need to stick to that. Let’s start the simple console producer that comes with Kafka:

$ bin/kafka-console-producer.sh --topic logs --broker-list localhost:9092

And start sending JSON logs, such as these:

{"name":"Test log", "severity": "INFO"}
{"name":"Test log 2", "severity": "WARN"}

To test that everything is working, we just need to run a simple Elasticsearch query:

$ curl -XGET 'localhost:9200/logs_index/_search?pretty'

In return we should get the following response:

{
  "took" : 3,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 3,
    "max_score" : 1.0,
    "hits" : [ {
      "_index" : "logs_index",
      "_type" : "true",
      "_id" : "logs+0+2",
      "_score" : 1.0,
      "_source" : {
        "severity" : "WARN",
        "name" : "Test log 2"
      }
    }, {
      "_index" : "logs_index",
      "_type" : "true",
      "_id" : "logs+0+1",
      "_score" : 1.0,
      "_source" : {
        "severity" : "INFO",
        "name" : "Test log"
      }
    } ]
  }
}

As you can see, everything works as it should. With very little effort we got ourselves a quick and easy way of indexing data from our Kafka cluster into Elasticsearch. What’s more, this is not only simple shipping but truly distributed indexing that handles failover and fault tolerance out of the box, without us needing to do anything apart from the connector configuration.

Kafka Connect REST API

By default the REST API service runs on port 8083. You can check if it is running by executing a simple curl command:

$ curl 'localhost:8083'

In response you will get the version and the commit hash of the build:

{"version":"0.10.2.0","commit":"576d93a8dc0cf421"}

We can retrieve the list of running connectors:

$ curl 'localhost:8083/connectors'

For example, with our Elasticsearch connector running in standalone mode, the response will be as follows:

["elasticsearch-sink"]

To retrieve the list of connector plugins installed in the Kafka cluster:

$ curl 'localhost:8083/connector-plugins'

The response looks as follows:

[{"class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector"},{"class":"org.apache.kafka.connect.file.FileStreamSinkConnector"},{"class":"org.apache.kafka.connect.file.FileStreamSourceConnector"}]

To retrieve the configuration:

$ curl 'localhost:8083/connectors/elasticsearch-sink/config'

Which will result in the following response in our case:

{"connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","type.name":"true","topics":"logs","tasks.max":"1","topic.index.map":"logs:logs_index","name":"elasticsearch-sink","connection.url":"http://localhost:9200","key.ignore":"true","schema.ignore":"true"}

Check the status of the given connector:

$ curl 'localhost:8083/connectors/elasticsearch-sink/status'

To retrieve the tasks run by a given connector:

$ curl 'localhost:8083/connectors/elasticsearch-sink/tasks'

Of course, this is not everything, and we are not limited to the retrieval part of the API. We can add new connectors by sending an HTTP POST request to the /connectors endpoint with the name and configuration parameters in the body (as a JSON object). For example:

$ curl -XPOST -H 'Content-type:application/json' 'localhost:8083/connectors' -d '{
 "name" : "second_es_sink",
 "config" : {
  "connector.class" : "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
  "tasks.max" : "1",
  "topics" : "logs", 
  "topic.index.map" : "logs:logs_index",
  "connection.url" : "http://localhost:9200",
  "type.name" : "true",
  "key.ignore" : "true",
  "schema.ignore" : "true"
 }
}'

We can pause and resume a connector, and restart it if it fails:

$ curl -XPUT 'localhost:8083/connectors/elasticsearch-sink/pause'
$ curl -XPUT 'localhost:8083/connectors/elasticsearch-sink/resume'
$ curl -XPOST 'localhost:8083/connectors/elasticsearch-sink/restart'

Please keep in mind that restart uses the POST HTTP verb, while pause and resume use the PUT HTTP verb.

Finally, we can remove a given connector by using the DELETE HTTP verb and providing its name:

$ curl -XDELETE 'localhost:8083/connectors/second_es_sink'

If you want to read about all the options regarding Kafka Connect REST API, please refer to documentation available at http://kafka.apache.org/documentation.html#connect_rest.

Sending the Data to Logsene

If you are one of our happy Logsene users, you can use Kafka Connect Elasticsearch to ship logs from your Kafka to Logsene. You can use either the standalone or the distributed mode, just as described above. The only things you need to change in your configuration are the topic.index.map and connection.url properties.

The topic.index.map property needs to map the topic name to your Logsene app token:

topic.index.map=logs:your-logsene-app-token-here

The connection.url property needs to be pointed to the Logsene ingestion endpoint:

connection.url=http://logsene-receiver.sematext.com:80

The full standalone configuration would then look as follows:

name=logsene-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=logs
topic.index.map=logs:your-logsene-app-token-here
key.ignore=true
schema.ignore=true
connection.url=http://logsene-receiver.sematext.com:80
type.name=log

Once that is done, you can run the connector in standalone mode. Setting up distributed mode to work with Logsene is also simple; the only thing you need to do is run the following command, which uses the Kafka Connect REST API:

$ curl -XPOST -H 'Content-type:application/json' 'localhost:8083/connectors' -d '{
 "name" : "logsene-sink",
 "config" : {
  "connector.class" : "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
  "tasks.max" : "1",
  "topics" : "logs", 
  "topic.index.map" : "logs:your-logsene-app-token-here",
  "connection.url" : "http://logsene-receiver.sematext.com:80",
  "type.name" : "true",
  "key.ignore" : "true",
  "schema.ignore" : "true"
 }
}'

And that’s it!  You can ship your logs from your Kafka to Logsene without needing to run additional log shippers, only Kafka Connect for Elasticsearch.



Summary

If you are looking for a quick, fault-tolerant, and efficient way of pushing data from your Kafka cluster to Elasticsearch, Logsene, or any of the other supported integrations, Kafka Connect may be a good way to go. Apart from leveraging the distributed nature of Kafka and the API, it also provides simple transformation logic, making it a very versatile tool for data export. Keep an eye out for the next post, which will cover how to use the mentioned transformations and more.

Latest Stories
SYS-CON Events announced today that Loom Systems will exhibit at SYS-CON's 20th International Cloud Expo®, which will take place on June 6-8, 2017, at the Javits Center in New York City, NY. Founded in 2015, Loom Systems delivers an advanced AI solution to predict and prevent problems in the digital business. Loom stands alone in the industry as an AI analysis platform requiring no prior math knowledge from operators, leveraging the existing staff to succeed in the digital era. With offices in S...
SYS-CON Events announced today that Interoute, owner-operator of one of Europe's largest networks and a global cloud services platform, has been named “Bronze Sponsor” of SYS-CON's 20th Cloud Expo, which will take place on June 6-8, 2017 at the Javits Center in New York, New York. Interoute is the owner-operator of one of Europe's largest networks and a global cloud services platform which encompasses 12 data centers, 14 virtual data centers and 31 colocation centers, with connections to 195 add...
SYS-CON Events announced today that HTBase will exhibit at SYS-CON's 20th International Cloud Expo®, which will take place on June 6-8, 2017, at the Javits Center in New York City, NY. HTBase (Gartner 2016 Cool Vendor) delivers a Composable IT infrastructure solution architected for agility and increased efficiency. It turns compute, storage, and fabric into fluid pools of resources that are easily composed and re-composed to meet each application’s needs. With HTBase, companies can quickly prov...
SYS-CON Events announced today that CA Technologies has been named “Platinum Sponsor” of SYS-CON's 20th International Cloud Expo®, which will take place on June 6-8, 2017, at the Javits Center in New York City, NY, and the 21st International Cloud Expo®, which will take place October 31-November 2, 2017, at the Santa Clara Convention Center in Santa Clara, CA. CA Technologies helps customers succeed in a future where every business – from apparel to energy – is being rewritten by software. From ...
What if you could build a web application that could support true web-scale traffic without having to ever provision or manage a single server? Sounds magical, and it is! In his session at 20th Cloud Expo, Chris Munns, Senior Developer Advocate for Serverless Applications at Amazon Web Services, will show how to build a serverless website that scales automatically using services like AWS Lambda, Amazon API Gateway, and Amazon S3. We will review several frameworks that can help you build serverle...
SYS-CON Events announced today that SoftLayer, an IBM Company, has been named “Gold Sponsor” of SYS-CON's 18th Cloud Expo, which will take place on June 7-9, 2016, at the Javits Center in New York, New York. SoftLayer, an IBM Company, provides cloud infrastructure as a service from a growing number of data centers and network points of presence around the world. SoftLayer’s customers range from Web startups to global enterprises.
Culture is the most important ingredient of DevOps. The challenge for most organizations is defining and communicating a vision of beneficial DevOps culture for their organizations, and then facilitating the changes needed to achieve that. Often this comes down to an ability to provide true leadership. As a CIO, are your direct reports IT managers or are they IT leaders? The hard truth is that many IT managers have risen through the ranks based on their technical skills, not their leadership abi...
The essence of cloud computing is that all consumable IT resources are delivered as services. In his session at 15th Cloud Expo, Yung Chou, Technology Evangelist at Microsoft, demonstrated the concepts and implementations of two important cloud computing deliveries: Infrastructure as a Service (IaaS) and Platform as a Service (PaaS). He discussed from business and technical viewpoints what exactly they are, why we care, how they are different and in what ways, and the strategies for IT to transi...
After more than five years of DevOps, definitions are evolving, boundaries are expanding, ‘unicorns’ are no longer rare, enterprises are on board, and pundits are moving on. Can we now look at an evolution of DevOps? Should we? Is the foundation of DevOps ‘done’, or is there still too much left to do? What is mature, and what is still missing? What does the next 5 years of DevOps look like? In this Power Panel at DevOps Summit, moderated by DevOps Summit Conference Chair Andi Mann, panelists l...
Web Real-Time Communication APIs have quickly revolutionized what browsers are capable of. In addition to video and audio streams, we can now bi-directionally send arbitrary data over WebRTC's PeerConnection Data Channels. With the advent of Progressive Web Apps and new hardware APIs such as WebBluetooh and WebUSB, we can finally enable users to stitch together the Internet of Things directly from their browsers while communicating privately and securely in a decentralized way.
All organizations that did not originate this moment have a pre-existing culture as well as legacy technology and processes that can be more or less amenable to DevOps implementation. That organizational culture is influenced by the personalities and management styles of Executive Management, the wider culture in which the organization is situated, and the personalities of key team members at all levels of the organization. This culture and entrenched interests usually throw a wrench in the work...
Keeping pace with advancements in software delivery processes and tooling is taxing even for the most proficient organizations. Point tools, platforms, open source and the increasing adoption of private and public cloud services requires strong engineering rigor - all in the face of developer demands to use the tools of choice. As Agile has settled in as a mainstream practice, now DevOps has emerged as the next wave to improve software delivery speed and output. To make DevOps work, organization...
DevOps is often described as a combination of technology and culture. Without both, DevOps isn't complete. However, applying the culture to outdated technology is a recipe for disaster; as response times grow and connections between teams are delayed by technology, the culture will die. A Nutanix Enterprise Cloud has many benefits that provide the needed base for a true DevOps paradigm.
What sort of WebRTC based applications can we expect to see over the next year and beyond? One way to predict development trends is to see what sorts of applications startups are building. In his session at @ThingsExpo, Arin Sime, founder of WebRTC.ventures, will discuss the current and likely future trends in WebRTC application development based on real requests for custom applications from real customers, as well as other public sources of information,
Historically, some banking activities such as trading have been relying heavily on analytics and cutting edge algorithmic tools. The coming of age of powerful data analytics solutions combined with the development of intelligent algorithms have created new opportunities for financial institutions. In his session at 20th Cloud Expo, Sebastien Meunier, Head of Digital for North America at Chappuis Halder & Co., will discuss how these tools can be leveraged to develop a lasting competitive advanta...