# Kafka Trigger

{% hint style="warning" %}
Para utilizar este *trigger* é necessário entrar em contato com nossa Equipe de Suporte para obter a liberação.
{% endhint %}

O **Kafka Trigger** é responsável pelo consumo das mensagens de um *broker* Kafka.

## Paramêtros

Dê uma olhada nas opções de configuração do componente. Parâmetros suportados por [expressões *Double Braces*](/documentation/connectors-and-triggers/pt-br/double-braces/overview.md) estão marcados com `(DB)`.

<table data-full-width="true"><thead><tr><th>Parâmetro</th><th width="258">Descrição</th><th>Valor padrão</th><th>Tipo de dado</th></tr></thead><tbody><tr><td><strong>Account</strong></td><td>Nome da conta que será utilizada.</td><td>N/A</td><td><em>String</em></td></tr><tr><td><strong>Truststore</strong></td><td>Caso seja necessário informar uma <em>truststore</em> para realizar o SSL Handshake usando certificados privados, crie uma conta do tipo <em>CERTIFICATE-CHAIN</em> e informe os certificados concatenados. No campo "password" é opcional inserir a senha a ser cadastrada na criação da <em>truststore</em>.</td><td>N/A</td><td><em>String</em></td></tr><tr><td><strong>Keystore</strong></td><td>Caso seja necessário informar uma <em>keystore</em> para realizar a autenticação SSL mútua, crie uma conta do tipo <em>CERTIFICATE-CHAIN</em>, informe a cadeia completa com os certificados concatenados e a chave privada a ser utilizada para a autenticação SSL mútua. Caso exista uma chave privada, é necessário informá-la no campo "password".</td><td>N/A</td><td><em>String</em></td></tr><tr><td><strong>Brokers</strong></td><td><em>Brokers</em> do servidor (HOST: PORT) usados para o envio de registros. Para informar múltiplos HOSTS, você pode separá-los por vírgula. Exemplo: HOST1:PORT1,HOST2:PORT2,...,HOSTn:PORTn.</td><td>kafka-test.godigibee.io:9092</td><td><em>String</em></td></tr><tr><td><strong>Topic</strong></td><td>Nome do tópico que recupera os registros.</td><td>topic-digibee</td><td><em>String</em></td></tr><tr><td><strong>Protocol</strong></td><td>Protocolo utilizado para se comunicar com os <em>brokers</em>.</td><td>SASL SSL</td><td><em>String</em></td></tr><tr><td><strong>Consumer Group Name</strong></td><td>Um <em>string</em> único que identifica o grupo de consumidor ao qual esse consumidor pertence.</td><td>digibee-new</td><td><em>String</em></td></tr><tr><td><strong>Auto Commit</strong></td><td>Se "true", a mensagem passará automaticamente por <em>commit</em> assim que for recebida pelo <em>trigger</em>; do contrário, o <em>trigger</em> vai efetuar o <em>commit</em> manualmente após a confirmação de processamento do <em>pipeline</em>.</td><td><em>False</em></td><td>Booleano</td></tr><tr><td><strong>Send Batch</strong></td><td>Só pode ser utilizado com <em>autoCommit</em> - se "true", um <em>poll</em> de mais de 1 mensagem será enviado como <em>array</em>; do contrário, será enviada 1 mensagem por vez.</td><td><em>True</em></td><td>Booleano</td></tr><tr><td><strong>Key As Avro</strong></td><td>Se "true", as keys dos registros recebidos serão interpretadas no formato Avro, do contrário, serão interpretadas como String.</td><td><em>False</em></td><td>Booleano</td></tr><tr><td><strong>Payload As Avro</strong></td><td>Se "true", os payloads (values) dos registros recebidos serão interpretados no formato Avro, do contrário, serão interpretados como String.</td><td><em>False</em></td><td>Booleano</td></tr><tr><td><strong>Schema Registry URL</strong></td><td>Caso ao menos uma das opções Key As Avro e Payload As Avro for ativada, o campo será exibido para que seja informada a URL do Schema Registry.</td><td>N/A</td><td><em>String</em></td></tr><tr><td><strong>Schema Registry Account</strong></td><td>Account para autenticação com o Schema Registry (account Basic ou Oauth-Bearer).</td><td>N/A</td><td><em>String</em></td></tr><tr><td><strong>Schema Registry Truststore</strong></td><td>Caso seja necessário informar uma truststore para realizar o SSL Handshake utilizando certificados privados, deve-se criar uma conta do tipo CERTIFICATE-CHAIN e informar os certificados concatenados. No campo "password" é opcional inserir a senha a ser cadastrada na criação da truststore.</td><td>N/A</td><td><em>String</em></td></tr><tr><td><strong>Schema Registry Keystore</strong></td><td>Caso seja necessário informar uma keystore para realizar a autenticação SSL mútua, deve-se criar uma conta do tipo CERTIFICATE-CHAIN, informar a cadeia completa com os certificados concatenados e a chave privada a ser utilizada para a autenticação SSL mútua. Caso exista uma chave privada, é necessário informá-la no campo "password".</td><td>N/A</td><td><em>String</em></td></tr><tr><td><strong>Max Poll Records</strong></td><td>Número máximo de registros recuperados por um <em>long poll</em>.</td><td>100</td><td>Inteiro</td></tr><tr><td><strong>Include Headers</strong></td><td>Se a opção estiver ativada, os cabeçalhos da mensagem serão incluídos no <em>payload</em> de entrada do <em>pipeline</em>.</td><td><em>False</em></td><td>Booleano</td></tr><tr><td><strong>Binary Headers</strong></td><td>Se a opção estiver ativada, os valores dos cabeçalhos de entrada serão considerados como binários e apresentados como uma representação base64. Essa opção será apresentada apenas quando <em>Include Headers</em> estiver ativado também.</td><td><em>False</em></td><td>Booleano</td></tr><tr><td><strong>Headers Charset</strong></td><td>Nome do código de caracteres para a codificação dos valores dos cabeçalhos (padrão UTF-8). Essa opção será apresentada apenas quando <em>Include Headers</em> estiver ativada também.</td><td>UTF-8</td><td><em>String</em></td></tr><tr><td><strong>Maximum Timeout</strong></td><td>Por quanto tempo um <em>pipeline</em> pode ser executado (em milissegundos).</td><td>30000</td><td>Inteiro</td></tr><tr><td><strong>Kerberos Service Name</strong></td><td>Valor definido na propriedade <em>sasl.kerberos.service.name</em> configurado no lado <em>server</em> do <em>broker</em> Kafka.</td><td><em>False</em></td><td>Booleano</td></tr><tr><td><strong>Partition Numbers</strong></td><td>Especifica os números das partições em que o <strong>Kafka Trigger</strong> consumirá mensagens. Pode-se configurar mais de uma partição e, caso essa propriedade não seja configurada, o <strong>Kafka Trigger</strong> irá consumir de todas as partições do tópico.</td><td>N/A</td><td>Inteiro</td></tr><tr><td><strong>Allow Redelivery Of Messages</strong></td><td>Se a opção estiver habilitada, permite que a mensagem seja reenviada caso o haja falha no <em>Pipeline Engine</em>. Aprenda mais na <a href="/spaces/cO0A6g1dOsu8BiHYqO67/pages/gVjlcNsebELvvnaD62MX">documentação sobre <em>Pipeline Engine</em>.</a></td><td><em>False</em></td><td>Booleano</td></tr></tbody></table>

{% hint style="warning" %}

* O suporte ao formato Avro está atualmente em [fase Beta](/documentation/developer-guide/pt-br/help-and-faq/beta-program.md). Como não faz parte do padrão usar o Kafka para transmitir mensagens grandes, não aceitamos mais de 5 MB de envio de mensagens por *poll.* Recomendamos que você use a propriedade (message.max.bytes) no *broker* para no máximo 1 MB. A capacidade de tráfego de dados no formato Avro também está incluída nesta limitação de tamanho.
* O Kafka Trigger Key e as configurações de carga útil devem corresponder às configurações dos tópicos a serem consumidos pelo Trigger: Se **Key As Avro** estiver habilitado, todas as chaves dos registros a serem consumidos deverão estar no formato Avro. Se **Payload As Avro** estiver habilitado, todas as cargas úteis (valores) de registros a serem consumidos deverão estar no formato Avro.
  {% endhint %}

## Estratégias de *offsets commit*&#x20;

Esse *trigger* possui 2 estratégias de *offsets commit* configuráveis:

### **1.** **Commit sem garantia de entrega**

Todas as mensagens recebidas pelo *trigger* são enviadas ao *pipeline* de forma mais rápida, porém sem garantia de entrega (ou seja, não será esperado o retorno do *pipeline* para que o processamento da mensagem seja confirmado).&#x20;

Com o *auto commit* ativado, utilizaremos o *commit default* implementado pelo Kafka. O envio de mensagem pode ser configurado por:

#### **Envio batch**

Todas as mensagens recebidas pelo *polling* do consumidor serão enviadas juntas em um *array*. Por exemplo, se durante esse *poll* forem retornadas 10 mensagens, então o *trigger* enviará um *array* com essas 10 mensagens.

#### **Envio de mensagem uma a uma**

Será feito um envio ao *pipeline* ao invés do *array* total (apenas 1 mensagem por vez). Por exemplo, se durante esse *poll* forem retornadas 10 mensagens, então o *trigger* enviará somente 1 mensagem por vez. No total, serão feitos 10 envios de mensagens ao *pipeline*.

### **2. Commit com garantia de entrega**

O *trigger* ficará responsável por realizar o *offsets commit*, que será feito após o recebimento de uma resposta de sucesso do *pipeline*. Somente o envio *batch* das mensagens é possível, através do qual todas as mensagens recebidas pelo polling do consumidor serão enviadas juntas em um *array*.

Por exemplo: se durante esse *poll* forem retornadas 10 mensagens, então o *trigger* enviará um *array* com essas 10 mensagens.

{% hint style="info" %}
Poderá ocorrer um rebalanceamento dos consumidores e/ou das partições do Kafka. Caso isso ocorra entre o retorno da resposta do *pipeline* ao *trigger*, os *offsets* vão receber o *commit*. Isso pode acarretar em perdas ou mensagens duplicadas.
{% endhint %}

#### **Autocommit "false" e Batch Mode "true"**

Nessa opção, o *poll* realizado pode trazer um *array* de mensagens e o seu tamanho máximo é estipulado pelo *Max Poll Records*. As mensagens passam por *commit* somente depois que o *pipeline* retornar a transação com sucesso. Se ocorrer *timeout* durante o processamento do *pipeline*, as mensagens não vão passar por *commit*.

#### **Autocommit "false" e Batch Mode "false"**

Nessa opção, o *poll* vai enviar apenas 1 mensagem e não um *array* de mensagens. Assim, o *throughput* de envio/recebimento de mensagens é diminuído, mas a garantia do processamento bem sucedido é maior - ou seja, não há perda de mensagens.

{% hint style="info" %}
Se houver rebalanceamento do Tópico no Broker do Kafka durante o processamento das mensagens e os consumidores tiverem que assumir outras partições, as mensagens vão passar por *commit* caso ocorra erro no término da execução do *pipeline*. Dessa maneira, as mensagens não vão ser processadas no *poll* seguinte.&#x20;

Para contornar esse problema, recorra às configurações ***Autocommit "false"*** e ***Batch Mode "false***".
{% endhint %}

## *Consumers*

A configuração de consumidores impacta diretamente no *throughput* de recebimento e saída de mensagens quando o Kafka Trigger é ativado. O cenário ideal de utilização é ter a mesma quantidade de *consumers* configurados e partições em determinado tópico.

Caso haja mais *consumers* do que partições, os *consumers* excedentes ficarão *idle* até ocorra um aumento de partições. E, se esse aumento ocorrer, Kafka vai iniciar o processo de rebalanceamento de *consumers*.

### Consumer Group <a href="#consumer-group" id="consumer-group"></a>

É o grupo de consumidores ao qual o seu *pipeline* vai fazer a subscrição no tópico do Kafka. Um tópico pode ter “n” *Consumer Groups* e cada um deles vai ter “n” consumidores que consomem os registros do tópico.

* **Cenário 1**

Digamos que exista um tópico chamado *kafka-topic*, um *pipeline* que utilize um *trigger* configurado com o *consumer group* (groupId) nomeado *digibee* e um segundo *pipeline* que utilize um *trigger* configurado com o mesmo tópico, mas com um *consumer group* (groupId) nomeado digibee-2. Nesse caso, ambos os *pipelines* receberão as mesmas mensagens.<br>

* **Cenário 2**

Digamos que exista um tópico chamado *kafka-topic*, um *pipeline* que utilize um *trigger* configurado com o *consumer group* (groupId) nomeado *digibee* e um segundo *pipeline* que utilize um *trigger* configurado com o mesmo tópico e *consumer group* (digibee). Ambos os *pipelines* vão receber as mensagens passadas por esse tópico. No entanto, o Kafka fica encarregado de fazer o balanceamento das partições entre os *consumers* cadastrados nos dois *triggers*. Nesse caso, ambos os *pipelines* vão receber mensagens de forma intercalada, de acordo com a distribuição das partições.

## Tecnologia <a href="#tecnologia" id="tecnologia"></a>

### **Autenticação usando Kerberos** <a href="#autenticao-usando-kerberos" id="autenticao-usando-kerberos"></a>

Para utilizar a autenticação via Kerberos no ***Kafka Trigger*** é necessário ter cadastrado o arquivo de configuração “krb5.conf” no parâmetro de *Realm*. Caso não tenha feito isso, acione o nosso suporte via *chat*. Após concluir esse passo, basta configurar corretamente uma conta do tipo Kerberos e utilizá-la no componente.

### Formato de mensagem na entrada do pipeline <a href="#h_7510e36b4b" id="h_7510e36b4b"></a>

*Pipelines* associados ao Kafka Trigger recebem a seguinte mensagem como entrada:

```
{
  "data": [
    {
      "data": <STRING conteúdo da mensagem>,
      "topic": <STRING O tópico do qual o registro é recebido>,
      "offset": <LONG A posição do registro na partição Kafka correspondente>,
      "partition": <INT A partição da qual o registro é recebido>,
      "success": <BOOLEAN Indica se a mensagem individual foi consumida com sucesso ou não>,
      "headers": {
          "header1": "value1", … (quando incluídos)
      }
    }
  ],
  "success": <BOOLEAN Indica se todas as mensagens foram consumidas com sucesso ou não>
}
```


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://docs.digibee.com/documentation/connectors-and-triggers/pt-br/triggers/messaging-and-events/kafka.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
