Kafka Trigger
Descubra mais sobre o Kafka Trigger e saiba como utilizá-lo na Digibee Integration Platform.
Para utilizar este trigger é necessário entrar em contato com nossa Equipe de Suporte para obter a liberação.
O Kafka Trigger é responsável pelo consumo das mensagens de um broker Kafka.
Paramêtros
Dê uma olhada nas opções de configuração do componente. Parâmetros suportados por expressões Double Braces estão marcados com (DB)
.
Parâmetro | Descrição | Valor padrão | Tipo de dado |
---|---|---|---|
Account | Nome da conta que será utilizada. | N/A | String |
Truststore | Caso seja necessário informar uma truststore para realizar o SSL Handshake usando certificados privados, crie uma conta do tipo CERTIFICATE-CHAIN e informe os certificados concatenados. No campo "password" é opcional inserir a senha a ser cadastrada na criação da truststore. | N/A | String |
Keystore | Caso seja necessário informar uma keystore para realizar a autenticação SSL mútua, crie uma conta do tipo CERTIFICATE-CHAIN, informe a cadeia completa com os certificados concatenados e a chave privada a ser utilizada para a autenticação SSL mútua. Caso exista uma chave privada, é necessário informá-la no campo "password". | N/A | String |
Brokers | Brokers do servidor (HOST: PORT) usados para o envio de registros. Para informar múltiplos HOSTS, você pode separá-los por vírgula. Exemplo: HOST1:PORT1,HOST2:PORT2,...,HOSTn:PORTn. | kafka-test.godigibee.io:9092 | String |
Topic | Nome do tópico que recupera os registros. | topic-digibee | String |
Protocol | Protocolo utilizado para se comunicar com os brokers. | SASL SSL | String |
Consumer Group Name | Um string único que identifica o grupo de consumidor ao qual esse consumidor pertence. | digibee-new | String |
Auto Commit | Se "true", a mensagem passará automaticamente por commit assim que for recebida pelo trigger; do contrário, o trigger vai efetuar o commit manualmente após a confirmação de processamento do pipeline. | False | Booleano |
Send Batch | Só pode ser utilizado com autoCommit - se "true", um poll de mais de 1 mensagem será enviado como array; do contrário, será enviada 1 mensagem por vez. | True | Booleano |
Key As Avro | Se "true", as keys dos registros recebidos serão interpretadas no formato Avro, do contrário, serão interpretadas como String. | False | Booleano |
Payload As Avro | Se "true", os payloads (values) dos registros recebidos serão interpretados no formato Avro, do contrário, serão interpretados como String. | False | Booleano |
Schema Registry URL | Caso ao menos uma das opções Key As Avro e Payload As Avro for ativada, o campo será exibido para que seja informada a URL do Schema Registry. | N/A | String |
Schema Registry Account | Account para autenticação com o Schema Registry (account Basic ou Oauth-Bearer). | N/A | String |
Schema Registry Truststore | Caso seja necessário informar uma truststore para realizar o SSL Handshake utilizando certificados privados, deve-se criar uma conta do tipo CERTIFICATE-CHAIN e informar os certificados concatenados. No campo "password" é opcional inserir a senha a ser cadastrada na criação da truststore. | N/A | String |
Schema Registry Keystore | Caso seja necessário informar uma keystore para realizar a autenticação SSL mútua, deve-se criar uma conta do tipo CERTIFICATE-CHAIN, informar a cadeia completa com os certificados concatenados e a chave privada a ser utilizada para a autenticação SSL mútua. Caso exista uma chave privada, é necessário informá-la no campo "password". | N/A | String |
Max Poll Records | Número máximo de registros recuperados por um long poll. | 100 | Inteiro |
Include Headers | Se a opção estiver ativada, os cabeçalhos da mensagem serão incluídos no payload de entrada do pipeline. | False | Booleano |
Binary Headers | Se a opção estiver ativada, os valores dos cabeçalhos de entrada serão considerados como binários e apresentados como uma representação base64. Essa opção será apresentada apenas quando Include Headers estiver ativado também. | False | Booleano |
Headers Charset | Nome do código de caracteres para a codificação dos valores dos cabeçalhos (padrão UTF-8). Essa opção será apresentada apenas quando Include Headers estiver ativada também. | UTF-8 | String |
Maximum Timeout | Por quanto tempo um pipeline pode ser executado (em milissegundos). | 30000 | Inteiro |
Kerberos Service Name | Valor definido na propriedade sasl.kerberos.service.name configurado no lado server do broker Kafka. | False | Booleano |
Partition Numbers | Especifica os números das partições em que o Kafka Trigger consumirá mensagens. Pode-se configurar mais de uma partição e, caso essa propriedade não seja configurada, o Kafka Trigger irá consumir de todas as partições do tópico. | N/A | Inteiro |
Allow Redelivery Of Messages | Se a opção estiver habilitada, permite que a mensagem seja reenviada caso o haja falha no Pipeline Engine. Aprenda mais na documentação sobre Pipeline Engine. | False | Booleano |
O suporte ao formato Avro está atualmente em fase Beta. Como não faz parte do padrão usar o Kafka para transmitir mensagens grandes, não aceitamos mais de 5 MB de envio de mensagens por poll. Recomendamos que você use a propriedade (message.max.bytes) no broker para no máximo 1 MB. A capacidade de tráfego de dados no formato Avro também está incluída nesta limitação de tamanho.
O Kafka Trigger Key e as configurações de carga útil devem corresponder às configurações dos tópicos a serem consumidos pelo Trigger: Se Key As Avro estiver habilitado, todas as chaves dos registros a serem consumidos deverão estar no formato Avro. Se Payload As Avro estiver habilitado, todas as cargas úteis (valores) de registros a serem consumidos deverão estar no formato Avro.
Estratégias de offsets commit
Esse trigger possui 2 estratégias de offsets commit configuráveis:
1. Commit sem garantia de entrega
Todas as mensagens recebidas pelo trigger são enviadas ao pipeline de forma mais rápida, porém sem garantia de entrega (ou seja, não será esperado o retorno do pipeline para que o processamento da mensagem seja confirmado).
Com o auto commit ativado, utilizaremos o commit default implementado pelo Kafka. O envio de mensagem pode ser configurado por:
Envio batch
Todas as mensagens recebidas pelo polling do consumidor serão enviadas juntas em um array. Por exemplo, se durante esse poll forem retornadas 10 mensagens, então o trigger enviará um array com essas 10 mensagens.
Envio de mensagem uma a uma
Será feito um envio ao pipeline ao invés do array total (apenas 1 mensagem por vez). Por exemplo, se durante esse poll forem retornadas 10 mensagens, então o trigger enviará somente 1 mensagem por vez. No total, serão feitos 10 envios de mensagens ao pipeline.
2. Commit com garantia de entrega
O trigger ficará responsável por realizar o offsets commit, que será feito após o recebimento de uma resposta de sucesso do pipeline. Somente o envio batch das mensagens é possível, através do qual todas as mensagens recebidas pelo polling do consumidor serão enviadas juntas em um array.
Por exemplo: se durante esse poll forem retornadas 10 mensagens, então o trigger enviará um array com essas 10 mensagens.
Poderá ocorrer um rebalanceamento dos consumidores e/ou das partições do Kafka. Caso isso ocorra entre o retorno da resposta do pipeline ao trigger, os offsets vão receber o commit. Isso pode acarretar em perdas ou mensagens duplicadas.
Autocommit "false" e Batch Mode "true"
Nessa opção, o poll realizado pode trazer um array de mensagens e o seu tamanho máximo é estipulado pelo Max Poll Records. As mensagens passam por commit somente depois que o pipeline retornar a transação com sucesso. Se ocorrer timeout durante o processamento do pipeline, as mensagens não vão passar por commit.
Autocommit "false" e Batch Mode "false"
Nessa opção, o poll vai enviar apenas 1 mensagem e não um array de mensagens. Assim, o throughput de envio/recebimento de mensagens é diminuído, mas a garantia do processamento bem sucedido é maior - ou seja, não há perda de mensagens.
Se houver rebalanceamento do Tópico no Broker do Kafka durante o processamento das mensagens e os consumidores tiverem que assumir outras partições, as mensagens vão passar por commit caso ocorra erro no término da execução do pipeline. Dessa maneira, as mensagens não vão ser processadas no poll seguinte.
Para contornar esse problema, recorra às configurações Autocommit "false" e Batch Mode "false".
Consumers
A configuração de consumidores impacta diretamente no throughput de recebimento e saída de mensagens quando o Kafka Trigger é ativado. O cenário ideal de utilização é ter a mesma quantidade de consumers configurados e partições em determinado tópico.
Caso haja mais consumers do que partições, os consumers excedentes ficarão idle até ocorra um aumento de partições. E, se esse aumento ocorrer, Kafka vai iniciar o processo de rebalanceamento de consumers.
Consumer Group
É o grupo de consumidores ao qual o seu pipeline vai fazer a subscrição no tópico do Kafka. Um tópico pode ter “n” Consumer Groups e cada um deles vai ter “n” consumidores que consomem os registros do tópico.
Cenário 1
Digamos que exista um tópico chamado kafka-topic, um pipeline que utilize um trigger configurado com o consumer group (groupId) nomeado digibee e um segundo pipeline que utilize um trigger configurado com o mesmo tópico, mas com um consumer group (groupId) nomeado digibee-2. Nesse caso, ambos os pipelines receberão as mesmas mensagens.
Cenário 2
Digamos que exista um tópico chamado kafka-topic, um pipeline que utilize um trigger configurado com o consumer group (groupId) nomeado digibee e um segundo pipeline que utilize um trigger configurado com o mesmo tópico e consumer group (digibee). Ambos os pipelines vão receber as mensagens passadas por esse tópico. No entanto, o Kafka fica encarregado de fazer o balanceamento das partições entre os consumers cadastrados nos dois triggers. Nesse caso, ambos os pipelines vão receber mensagens de forma intercalada, de acordo com a distribuição das partições.
Tecnologia
Autenticação usando Kerberos
Para utilizar a autenticação via Kerberos no Kafka Trigger é necessário ter cadastrado o arquivo de configuração “krb5.conf” no parâmetro de Realm. Caso não tenha feito isso, acione o nosso suporte via chat. Após concluir esse passo, basta configurar corretamente uma conta do tipo Kerberos e utilizá-la no componente.
Formato de mensagem na entrada do pipeline
Pipelines associados ao Kafka Trigger recebem a seguinte mensagem como entrada:
Atualizado