Elasticsearch connector failed to connect to Kafka cluster

Keywords: Kafka - Google Cloud Platform - Technical issue - Other
Description:
Hello, I’m using the Elasticsearch sink connector to ingest data into Elasticsearch from a secured Apache Kafka cluster. The Kafka cluster is deployed using the Bitnami image on GCP. With the security configuration below, the KSQL server and the Go SDK (Sarama) have no issue connecting, publishing, and consuming data. With the Elasticsearch sink connector, however, the same security configuration produces the following message indefinitely:

WARN [Consumer clientId=connector-consumer-elasticsearch-sink-0, groupId=connect-elasticsearch-sink] Bootstrap broker kafka-cluster-sk-kafka-1.c.data-analytics-mizal.internal:9092 (id: -2 rack: null) disconnected (org.apache.kafka.clients.NetworkClient:1019)

And the Kafka server log states:

Failed authentication with /10.148.0.2 (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)

Following the Confluent documentation, I also specified the security configuration in the connector configuration properties themselves, prefixed with ‘consumer.’, but it fails the same way.

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="user" \
password="xxxxxxx";
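
For reference, the consumer-prefixed variant would look something like this (the property names are assumed from the Confluent docs; depending on the Kafka version, the prefix goes either in the worker config as `consumer.` or in the connector config as `consumer.override.`; password redacted):

```properties
# connect-distributed.properties - consumer-prefixed SASL settings (assumed names)
consumer.security.protocol=SASL_PLAINTEXT
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="user" \
  password="xxxxxxx";
```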

Does anyone have any idea how to fix or debug this issue?

Hi, I’m suffering from almost the same issue, with a connector to BigQuery.
Any solution, or a new approach?

Hi @hasmizal,
We have a Support Tool that will gather relevant information for us to analyze your configuration and logs. Could you please download and execute it on the machine where the stack is running by following the steps described in the guide below?

How to Run the Bitnami Support Tool

Please note that you need to paste the code ID that is shown at the end.

I think I cannot use the Bitnami support tool because I use the multi-tier deployment.
But anyway.
Two cases:
-Running Connect with the built-in connectors; (2684de18-75cb-4ddd-ffb4-35525056742e)
-Running Connect with the WePay connector. (2f755e8f-2d91-1366-7996-92b672ebcee9)

Neither can run a producer. The WePay connector at least works: tested via the REST API, it shows a Running state, yet the producer does not work.

Hi @joaquimcardoso1995,
We cannot determine the problem from the information provided; could you please share the logs and any other relevant information?
Also, I found a guide to setting up BigQuery on Kafka; can you take a look? Maybe it can help you: https://docs.confluent.io/current/connect/kafka-connect-bigquery/index.html

When I get to the end of my day, I will share the logs.

Do you think that removing all firewall rules and allowing all traffic with one rule could cause this problem?
I only have a rule that allows all traffic to all ports.

I know that some of the properties have no effect.
Changing from localhost to kafka-cluster-3-kafka-0:9092 also doesn’t work.
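
In case it helps, the line I changed is the bootstrap address in the worker properties (the host name is from my own cluster; adjust to yours):

```properties
# connect-distributed.properties - point the worker at a reachable broker
bootstrap.servers=kafka-cluster-3-kafka-0:9092
```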

The exception looks like:
[2019-10-28 23:18:53,337] DEBUG [AdminClient clientId=adminclient-1] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient:882)
[2019-10-28 23:18:53,337] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient:748)
[2019-10-28 23:18:53,840] DEBUG [AdminClient clientId=adminclient-1] Initiating connection to node localhost:9092 (id: -1 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:944)
[2019-10-28 23:18:53,841] DEBUG Set SASL client state to SEND_APIVERSIONS_REQUEST (org.apache.kafka.common.security.authenticator.SaslClientAuthenticator:351)
[2019-10-28 23:18:53,841] DEBUG Creating SaslClient: client=null;service=kafka;serviceHostname=localhost;mechs=[PLAIN] (org.apache.kafka.common.security.authenticator.SaslClientAuthenticator:180)
[2019-10-28 23:18:53,848] DEBUG [AdminClient clientId=adminclient-1] Connection with localhost/127.0.0.1 disconnected (org.apache.kafka.common.network.Selector:607)
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:50)
at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:216)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:531)
at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:539)
at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1152)
at java.lang.Thread.run(Thread.java:748)

Log_configs.zip (66.6 KB)

So, to frame it, step by step:

  • I go to the GCP Bitnami Kafka launcher.
  • Change the Kafka instances to 3, the ZooKeeper instances to 3, and the SSD size to 10 GB.
  • SSH into kafka-X-instance-X via GCE.
  • Run the export of the Java auth config.
  • Go to /opt/bitnami/kafka/bin and run ./connect-distributed.sh ../conf/connect-distributed.properties
  • OR go to /opt/bitnami/kafka/bin and run ./connect-standalone.sh ../conf/connect-standalone.properties ../conf/connect-file-source.properties

And it simply doesn’t work. Is there any step I forgot? Any other config or change that needs to be done?

Hi @joaquimcardoso1995,

After exporting the Java auth with:

export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/bitnami/kafka/conf/kafka_jaas.conf"

The command you provided:

./connect-distributed.sh ../conf/connect-distributed.properties

Fails with:

[2019-10-29 15:14:38,364] ERROR Stopping due to error (org.apache.kafka.connect.cli.ConnectDistributed:84)
org.apache.kafka.connect.errors.ConnectException: Failed to connect to and describe Kafka cluster. Check worker's broker connection and security properties.
        at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:64)
        at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:45)
        at org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:95)
        at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:78)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
        at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
        at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
        at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
        at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
        at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:58)
        ... 3 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.

Is this the error you are getting?

I forgot to tell you to copy the SASL configs from server.properties into connect-distributed.properties.
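
Concretely, that means adding something like the following to connect-distributed.properties: once plain for the worker’s own clients, and again with the consumer./producer. prefixes for the clients the worker creates on behalf of connectors (exact values depend on your broker’s listener setup; password redacted):

```properties
# connect-distributed.properties - SASL settings copied from server.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="user" \
  password="xxxxxxx";

# The same three settings repeated with the consumer. and producer. prefixes
consumer.security.protocol=SASL_PLAINTEXT
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="user" \
  password="xxxxxxx";
producer.security.protocol=SASL_PLAINTEXT
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="user" \
  password="xxxxxxx";
```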

Hi @joaquimcardoso1995,
Are you using a guide or something to do that? How do you generate the SASL config?
I am trying to replicate the issue but cannot find that configuration.

This topic was automatically closed 14 days after the last reply. New replies are no longer allowed.