Kafka Trigger

Descubra mais sobre o Kafka Trigger e saiba como utilizá-lo na Digibee Integration Platform.

Para utilizar este trigger é necessário entrar em contato com nossa Equipe de Suporte para obter a liberação.

O Kafka Trigger é responsável pelo consumo das mensagens de um broker Kafka.

Paramêtros

Dê uma olhada nas opções de configuração do componente. Parâmetros suportados por expressões Double Braces estão marcados com (DB).

Parâmetro
Descrição
Valor padrão
Tipo de dado

Account

Nome da conta que será utilizada.

N/A

String

Truststore

Caso seja necessário informar uma truststore para realizar o SSL Handshake usando certificados privados, crie uma conta do tipo CERTIFICATE-CHAIN e informe os certificados concatenados. No campo "password" é opcional inserir a senha a ser cadastrada na criação da truststore.

N/A

String

Keystore

Caso seja necessário informar uma keystore para realizar a autenticação SSL mútua, crie uma conta do tipo CERTIFICATE-CHAIN, informe a cadeia completa com os certificados concatenados e a chave privada a ser utilizada para a autenticação SSL mútua. Caso exista uma chave privada, é necessário informá-la no campo "password".

N/A

String

Brokers

Brokers do servidor (HOST: PORT) usados para o envio de registros. Para informar múltiplos HOSTS, você pode separá-los por vírgula. Exemplo: HOST1:PORT1,HOST2:PORT2,...,HOSTn:PORTn.

kafka-test.godigibee.io:9092

String

Topic

Nome do tópico que recupera os registros.

topic-digibee

String

Protocol

Protocolo utilizado para se comunicar com os brokers.

SASL SSL

String

Consumer Group Name

Um string único que identifica o grupo de consumidor ao qual esse consumidor pertence.

digibee-new

String

Auto Commit

Se "true", a mensagem passará automaticamente por commit assim que for recebida pelo trigger; do contrário, o trigger vai efetuar o commit manualmente após a confirmação de processamento do pipeline.

False

Booleano

Send Batch

Só pode ser utilizado com autoCommit - se "true", um poll de mais de 1 mensagem será enviado como array; do contrário, será enviada 1 mensagem por vez.

True

Booleano

Key As Avro

Se "true", as keys dos registros recebidos serão interpretadas no formato Avro, do contrário, serão interpretadas como String.

False

Booleano

Payload As Avro

Se "true", os payloads (values) dos registros recebidos serão interpretados no formato Avro, do contrário, serão interpretados como String.

False

Booleano

Schema Registry URL

Caso ao menos uma das opções Key As Avro e Payload As Avro for ativada, o campo será exibido para que seja informada a URL do Schema Registry.

N/A

String

Schema Registry Account

Account para autenticação com o Schema Registry (account Basic ou Oauth-Bearer).

N/A

String

Schema Registry Truststore

Caso seja necessário informar uma truststore para realizar o SSL Handshake utilizando certificados privados, deve-se criar uma conta do tipo CERTIFICATE-CHAIN e informar os certificados concatenados. No campo "password" é opcional inserir a senha a ser cadastrada na criação da truststore.

N/A

String

Schema Registry Keystore

Caso seja necessário informar uma keystore para realizar a autenticação SSL mútua, deve-se criar uma conta do tipo CERTIFICATE-CHAIN, informar a cadeia completa com os certificados concatenados e a chave privada a ser utilizada para a autenticação SSL mútua. Caso exista uma chave privada, é necessário informá-la no campo "password".

N/A

String

Max Poll Records

Número máximo de registros recuperados por um long poll.

100

Inteiro

Include Headers

Se a opção estiver ativada, os cabeçalhos da mensagem serão incluídos no payload de entrada do pipeline.

False

Booleano

Binary Headers

Se a opção estiver ativada, os valores dos cabeçalhos de entrada serão considerados como binários e apresentados como uma representação base64. Essa opção será apresentada apenas quando Include Headers estiver ativado também.

False

Booleano

Headers Charset

Nome do código de caracteres para a codificação dos valores dos cabeçalhos (padrão UTF-8). Essa opção será apresentada apenas quando Include Headers estiver ativada também.

UTF-8

String

Maximum Timeout

Por quanto tempo um pipeline pode ser executado (em milissegundos).

30000

Inteiro

Kerberos Service Name

Valor definido na propriedade sasl.kerberos.service.name configurado no lado server do broker Kafka.

False

Booleano

Partition Numbers

Especifica os números das partições em que o Kafka Trigger consumirá mensagens. Pode-se configurar mais de uma partição e, caso essa propriedade não seja configurada, o Kafka Trigger irá consumir de todas as partições do tópico.

N/A

Inteiro

Allow Redelivery Of Messages

False

Booleano

  • O suporte ao formato Avro está atualmente em fase Beta. Como não faz parte do padrão usar o Kafka para transmitir mensagens grandes, não aceitamos mais de 5 MB de envio de mensagens por poll. Recomendamos que você use a propriedade (message.max.bytes) no broker para no máximo 1 MB. A capacidade de tráfego de dados no formato Avro também está incluída nesta limitação de tamanho.

  • O Kafka Trigger Key e as configurações de carga útil devem corresponder às configurações dos tópicos a serem consumidos pelo Trigger: Se Key As Avro estiver habilitado, todas as chaves dos registros a serem consumidos deverão estar no formato Avro. Se Payload As Avro estiver habilitado, todas as cargas úteis (valores) de registros a serem consumidos deverão estar no formato Avro.

Estratégias de offsets commit

Esse trigger possui 2 estratégias de offsets commit configuráveis:

1. Commit sem garantia de entrega

Todas as mensagens recebidas pelo trigger são enviadas ao pipeline de forma mais rápida, porém sem garantia de entrega (ou seja, não será esperado o retorno do pipeline para que o processamento da mensagem seja confirmado).

Com o auto commit ativado, utilizaremos o commit default implementado pelo Kafka. O envio de mensagem pode ser configurado por:

Envio batch

Todas as mensagens recebidas pelo polling do consumidor serão enviadas juntas em um array. Por exemplo, se durante esse poll forem retornadas 10 mensagens, então o trigger enviará um array com essas 10 mensagens.

Envio de mensagem uma a uma

Será feito um envio ao pipeline ao invés do array total (apenas 1 mensagem por vez). Por exemplo, se durante esse poll forem retornadas 10 mensagens, então o trigger enviará somente 1 mensagem por vez. No total, serão feitos 10 envios de mensagens ao pipeline.

2. Commit com garantia de entrega

O trigger ficará responsável por realizar o offsets commit, que será feito após o recebimento de uma resposta de sucesso do pipeline. Somente o envio batch das mensagens é possível, através do qual todas as mensagens recebidas pelo polling do consumidor serão enviadas juntas em um array.

Por exemplo: se durante esse poll forem retornadas 10 mensagens, então o trigger enviará um array com essas 10 mensagens.

Poderá ocorrer um rebalanceamento dos consumidores e/ou das partições do Kafka. Caso isso ocorra entre o retorno da resposta do pipeline ao trigger, os offsets vão receber o commit. Isso pode acarretar em perdas ou mensagens duplicadas.

Autocommit "false" e Batch Mode "true"

Nessa opção, o poll realizado pode trazer um array de mensagens e o seu tamanho máximo é estipulado pelo Max Poll Records. As mensagens passam por commit somente depois que o pipeline retornar a transação com sucesso. Se ocorrer timeout durante o processamento do pipeline, as mensagens não vão passar por commit.

Autocommit "false" e Batch Mode "false"

Nessa opção, o poll vai enviar apenas 1 mensagem e não um array de mensagens. Assim, o throughput de envio/recebimento de mensagens é diminuído, mas a garantia do processamento bem sucedido é maior - ou seja, não há perda de mensagens.

Se houver rebalanceamento do Tópico no Broker do Kafka durante o processamento das mensagens e os consumidores tiverem que assumir outras partições, as mensagens vão passar por commit caso ocorra erro no término da execução do pipeline. Dessa maneira, as mensagens não vão ser processadas no poll seguinte.

Para contornar esse problema, recorra às configurações Autocommit "false" e Batch Mode "false".

Consumers

A configuração de consumidores impacta diretamente no throughput de recebimento e saída de mensagens quando o Kafka Trigger é ativado. O cenário ideal de utilização é ter a mesma quantidade de consumers configurados e partições em determinado tópico.

Caso haja mais consumers do que partições, os consumers excedentes ficarão idle até ocorra um aumento de partições. E, se esse aumento ocorrer, Kafka vai iniciar o processo de rebalanceamento de consumers.

Consumer Group

É o grupo de consumidores ao qual o seu pipeline vai fazer a subscrição no tópico do Kafka. Um tópico pode ter “n” Consumer Groups e cada um deles vai ter “n” consumidores que consomem os registros do tópico.

  • Cenário 1

Digamos que exista um tópico chamado kafka-topic, um pipeline que utilize um trigger configurado com o consumer group (groupId) nomeado digibee e um segundo pipeline que utilize um trigger configurado com o mesmo tópico, mas com um consumer group (groupId) nomeado digibee-2. Nesse caso, ambos os pipelines receberão as mesmas mensagens.

  • Cenário 2

Digamos que exista um tópico chamado kafka-topic, um pipeline que utilize um trigger configurado com o consumer group (groupId) nomeado digibee e um segundo pipeline que utilize um trigger configurado com o mesmo tópico e consumer group (digibee). Ambos os pipelines vão receber as mensagens passadas por esse tópico. No entanto, o Kafka fica encarregado de fazer o balanceamento das partições entre os consumers cadastrados nos dois triggers. Nesse caso, ambos os pipelines vão receber mensagens de forma intercalada, de acordo com a distribuição das partições.

Tecnologia

Autenticação usando Kerberos

Para utilizar a autenticação via Kerberos no Kafka Trigger é necessário ter cadastrado o arquivo de configuração “krb5.conf” no parâmetro de Realm. Caso não tenha feito isso, acione o nosso suporte via chat. Após concluir esse passo, basta configurar corretamente uma conta do tipo Kerberos e utilizá-la no componente.

Formato de mensagem na entrada do pipeline

Pipelines associados ao Kafka Trigger recebem a seguinte mensagem como entrada:

{
  "data": [
    {
      "data": <STRING conteúdo da mensagem>,
      "topic": <STRING O tópico do qual o registro é recebido>,
      "offset": <LONG A posição do registro na partição Kafka correspondente>,
      "partition": <INT A partição da qual o registro é recebido>,
      "success": <BOOLEAN Indica se a mensagem individual foi consumida com sucesso ou não>,
      "headers": {
          "header1": "value1", … (quando incluídos)
      }
    }
  ],
  "success": <BOOLEAN Indica se todas as mensagens foram consumidas com sucesso ou não>
}

Atualizado