Kafka

Discover more about the Kafka component and how to use it on the Digibee Integration Platform.

The Kafka component produces records to the Kafka brokers configured in it.

Parameters

Take a look at the configuration parameters of the component. Parameters that support Double Braces expressions are marked with (DB).

Parameter | Description | Default value | Data type

Kafka Authentication Account

If the Kafka server requires authentication, create a Basic-type account for this component. Authentication via Kerberos is also supported.

N/A

String

Truststore

If a truststore is needed to perform the SSL handshake with private certificates, create a Certificate Chain-type account and provide the concatenated certificates. Optionally, use the password field to set the password that is registered when the truststore is created.

N/A

String

Keystore

If a keystore is needed for mutual SSL authentication, create a Certificate Chain-type account and provide the complete chain with the concatenated certificates and the private key to be used for the mutual SSL authentication. If there is a private key, it must be provided in the password field.

N/A

String

Brokers

Brokers of the server (HOST:PORT) to which records are sent. To specify multiple hosts, separate them with commas. Example: HOST1:PORT1,HOST2:PORT2,...,HOSTn:PORTn

N/A

String

Security Protocol

How the connection is established. You can use a secure channel (SSL), an authentication channel (SASL), or both (SASL_SSL).

SSL

String

Topic Name

Name of the Kafka topic.

{{ DEFAULT(message.topic, "new-topic") }}

String

Schema Registry URL

Shown when at least one of the options Headers By Avro Schema, Payload As Avro, or Partition Key As Avro is enabled. Sets the Schema Registry URL.

N/A

String

Schema Registry Account

Account to authenticate with Schema Registry.

N/A

N/A

Schema Registry Truststore

If a truststore is needed to perform the SSL handshake with private certificates, create a Certificate Chain-type account and provide the concatenated certificates. Optionally, use the password field to set the password that is registered when the truststore is created.

N/A

N/A

Schema Registry Keystore

If a keystore is needed for mutual SSL authentication, create a Certificate Chain-type account and provide the complete chain with the concatenated certificates and the private key to be used for the mutual SSL authentication. If there is a private key, it must be provided in the password field.

N/A

N/A

Headers

Set of key-value pairs with the headers to be sent in the message (optional).

N/A

Key-value

Binary Headers

If the option is active, the header values are considered binary and are interpreted as a string with the base64 representation; otherwise, the header values are interpreted as text.

False

Boolean

Headers By Avro Schema

If the option is active, the component will validate the Headers based on an Avro Schema before sending the Headers.

False

Boolean

Headers Schema

If the option Headers By Avro Schema is active, the field will be shown to set the Headers Schemas to be validated.

N/A

N/A

Headers Charset

Name of the character encoding used for the header values (default: UTF-8).

UTF-8

String

Payload

Payload to be dispatched.

{{ message.$ }}

String

Payload As Avro

If the option is active, the component will send the payload in Avro format.

False

Boolean

Payload Schema

Available only if the Payload As Avro option is active. Specifies the Payload Schema to be validated.

N/A

N/A

Request Timeout

Maximum time (in milliseconds) that the client waits for the response to a request. If the response isn't received before the timeout elapses, the request is automatically resent, or fails with an error once the retries are exhausted.

60000

Integer

Retries

If a value other than 0 (zero) is set, any record whose send fails with a potentially transient error is resent.

N/A

Integer

Metadata Timeout

Maximum time allowed for sending the record to Kafka.

5000

Integer

Key Strategy

If the option Partition Key As Avro is active, the field is shown to specify the strategy used to construct the subject name for message keys.

N/A

String

Value Strategy

If the option Payload As Avro is enabled, the field is shown to specify the strategy used to construct the subject name for message values.

N/A

String

Fail On Error

If the option is active, the pipeline execution is interrupted when an error occurs; otherwise, the pipeline execution proceeds, but the result shows a false value for the “success” property.

False

Boolean

Advanced Settings

If the option is active, you can access the following configurations:

False

Boolean

Kerberos Service Name

Value defined in the sasl.kerberos.service.name property configured on the Kafka broker server side.

N/A

String

Partition Number

Specifies the number of the partition to which the Kafka component sends the messages. If the property isn’t configured, the Kafka server decides which topic partition the message is sent to.

N/A

Integer

Partition Key

Partition key that indicates the partition to which the message is sent. If the field is left empty, a hashing-based partitioner determines the partition ID for each key.

N/A

String

Partition Key As Avro

If the option is enabled, the component will send the partition key in Avro format.

N/A

Boolean

Partition Key Schema

If the option Partition Key As Avro is active, the field is shown to specify the Partition Key Schema to be validated.

N/A

String

Producer Client Name

Origin identifier of the requests (optional).

N/A

String

ACKS

Configuration for acknowledging the message receipt by the Kafka broker (values: 0, 1, or ALL).

1

Integer

Use Dynamic Account

When the option is active, the account will be used dynamically. Set the account name to allow for flexible usage. When deactivated, the account name will be static.

False

Boolean

Account Name

The name of the Basic account that is generated dynamically via the Store Account component.

N/A

String

Scoped

When the option is active, the stored account is isolated from other subprocesses. In that case, each subprocess sees its own version of the stored account data.

N/A

Boolean

Truststore Account Name

The name of the truststore account that is generated dynamically via the Store Account component. Supported accounts: Certificate Chain.

N/A

String

Keystore Account Name

The name of the keystore account that is generated dynamically via the Store Account component. Supported accounts: Certificate Chain.

N/A

String

Account Name for Registry

The name of the account for the registry that is generated dynamically via the Store Account component. Supported accounts: Basic or Oauth Bearer Token.

N/A

String

Truststore Account Name for Registry

The name of the truststore account for the registry that is generated dynamically via the Store Account component. Supported accounts: Certificate Chain.

N/A

String

Keystore Account Name for Registry

The name of the keystore account for the registry that is generated dynamically via the Store Account component. Supported accounts: Certificate Chain.

N/A

String
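As an illustration of two value formats described in the table, the sketch below checks a Brokers string in the HOST1:PORT1,HOST2:PORT2 form and produces the base64 text expected for a header value when Binary Headers is active. The function names parse_brokers and encode_binary_header are hypothetical helpers written for this example; they are not part of the component.

```python
import base64


def parse_brokers(brokers: str) -> list[tuple[str, int]]:
    """Split a 'HOST1:PORT1,HOST2:PORT2' string into (host, port) pairs."""
    pairs = []
    for entry in brokers.split(","):
        entry = entry.strip()
        # rpartition splits on the last colon, so the port is always the tail.
        host, sep, port = entry.rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected HOST:PORT, got {entry!r}")
        pairs.append((host, int(port)))
    return pairs


def encode_binary_header(raw: bytes) -> str:
    """Return the base64 text representation of a binary header value."""
    return base64.b64encode(raw).decode("ascii")


print(parse_brokers("kafka-1:9092,kafka-2:9093"))
print(encode_binary_header(b"\x00\x01"))
```

Validating the broker list before configuring the component can surface a malformed entry (for example, a missing port) earlier than a failed connection would.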

Important information:

  • Because they require a large memory allocation, the PLAINTEXT and SASL_PLAINTEXT types of Security Protocol are not supported. For more information, visit the external Apache Kafka documentation.

  • Messages sent in Avro format must not exceed the maximum size supported by SMALL, MEDIUM, and LARGE pipelines. The component does not support extreme reading scenarios of mega/giga/tera/peta bytes. The Avro format support is currently in Beta phase.

  • Currently, the Use Dynamic Account, Account Name, Scoped, Truststore Account Name, Keystore Account Name, Account Name for Registry, Truststore Account Name for Registry, and Keystore Account Name for Registry parameters can only be used in Pipeline Engine v2 and are only available in the Restricted Beta phase. To learn more about it, read the article Beta program.
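The Security Protocol, ACKS, Retries, and Request Timeout parameters resemble the standard Apache Kafka producer properties security.protocol, acks, retries, and request.timeout.ms. The sketch below validates such values and collects them under those property names. That the component forwards its parameters to exactly these properties is an assumption, and build_producer_settings is a hypothetical helper; the default of 0 for retries is also chosen only for this example.

```python
# Protocols the component does not support, per the note above.
UNSUPPORTED_PROTOCOLS = {"PLAINTEXT", "SASL_PLAINTEXT"}
# Values accepted by the ACKS parameter.
VALID_ACKS = {"0", "1", "ALL"}


def build_producer_settings(security_protocol="SSL", acks="1",
                            retries=0, request_timeout_ms=60000):
    """Validate the values and return them keyed by Kafka property name."""
    protocol = security_protocol.upper()
    if protocol in UNSUPPORTED_PROTOCOLS:
        raise ValueError(f"{protocol} is not supported by the component")
    acks_value = str(acks).upper()
    if acks_value not in VALID_ACKS:
        raise ValueError(f"acks must be one of 0, 1, or ALL, got {acks!r}")
    if retries < 0 or request_timeout_ms <= 0:
        raise ValueError("retries must be >= 0 and the timeout must be > 0")
    return {
        "security.protocol": protocol,
        "acks": acks_value.lower(),  # Kafka spells this value "all"
        "retries": retries,
        "request.timeout.ms": request_timeout_ms,
    }


print(build_producer_settings(security_protocol="SASL_SSL", acks="ALL", retries=3))
```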

Example of a response to a Kafka request

{
  "message": "{}",
  "offset": 201,
  "timestamp": 1585168528773,
  "serializedKeySize": -1,
  "serializedValueSize": 2,
  "topic": "Welcome-Kafka",
  "partition": 1,
  "success": true
}
  • message: message sent.

  • offset: offset of the record in the topic/partition.

  • timestamp: timestamp of the record in the topic/partition.

  • serializedKeySize: size of the uncompressed serialized key, in bytes. If the key is null, the returned size is -1.

  • serializedValueSize: size of the uncompressed serialized value, in bytes. If the value is null, the returned size is -1.

  • topic: name of the topic.

  • partition: partition the record was sent to.

  • success: if true, the record was sent successfully.
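When Fail On Error is inactive, a failed send is reported only through the success property, so whatever consumes the response has to check it. The sketch below parses the example response shown above and raises an error when success is not true. The helper name check_send_result is hypothetical and exists only for this illustration.

```python
import json

# Response copied from the example above.
EXAMPLE_RESPONSE = """
{
  "message": "{}",
  "offset": 201,
  "timestamp": 1585168528773,
  "serializedKeySize": -1,
  "serializedValueSize": 2,
  "topic": "Welcome-Kafka",
  "partition": 1,
  "success": true
}
"""


def check_send_result(raw: str) -> dict:
    """Parse the component response and raise if the send did not succeed."""
    result = json.loads(raw)
    if result.get("success") is not True:
        raise RuntimeError(f"Kafka send failed: {result}")
    return result


result = check_send_result(EXAMPLE_RESPONSE)
print(f"Sent to {result['topic']}, partition {result['partition']}, "
      f"offset {result['offset']}")
```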

Messages flow

Input

The component accepts any input message and can use it through Double Braces.

Output

The component doesn't change any information in the input message. The message is therefore passed on to the next component, or used as the final response if this component is the last step of the pipeline.

Kafka in Action

Authentication using SSL or SASL

This allows your producers and clients to authenticate to the Kafka cluster (identity verification). It is also a secure way for your clients to confirm their identity.

Authentication using Kerberos

To use Kerberos authentication in Kafka, the “krb5.conf” configuration file must be registered in the Realm parameter. If you haven't done this yet, contact us through the chat service. After that, just configure a Kerberos-type account correctly and use it in the component.
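For orientation, a plain Apache Kafka client that authenticates with Kerberos typically uses the properties sketched below: SASL over SSL, the GSSAPI mechanism, and the service name that matches the broker's sasl.kerberos.service.name. These are standard Kafka client property names, but that the component sets them in this exact way is an assumption; kerberos_client_properties is a hypothetical helper, and "kafka" is only an example service name.

```python
def kerberos_client_properties(service_name: str) -> dict:
    """Return standard Kafka client properties for Kerberos over SSL."""
    if not service_name:
        raise ValueError("the Kerberos service name must not be empty")
    return {
        # SASL_PLAINTEXT is not supported by the component, so SASL runs over SSL.
        "security.protocol": "SASL_SSL",
        # GSSAPI is the SASL mechanism Kafka uses for Kerberos.
        "sasl.mechanism": "GSSAPI",
        # Must match the value configured on the broker side.
        "sasl.kerberos.service.name": service_name,
    }


for name, value in kerberos_client_properties("kafka").items():
    print(f"{name}={value}")
```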
