For the complete documentation index, see llms.txt. This page is also available as Markdown.

Modelo de mecanismo de reprocessamento centralizado na Digibee

Visão geral

Este documento apresenta um modelo de reprocessamento centralizado para a Digibee Integration Platform. Este é um modelo de uso comum que pode ser acionado por qualquer pipeline de qualquer fluxo de integração, sem a necessidade de implementações adicionais.

Cenário

Na maioria dos projetos de integração, o reprocessamento automático é necessário, por exemplo, para tentar novamente após uma falha temporária. Tradicionalmente, a lógica de reprocessamento é implementada de forma específica para cada fluxo e se limita a fluxos assíncronos que utilizam um pipeline do tipo evento na segunda etapa. Você pode encontrar um exemplo dessa abordagem no artigo Estratégia de reprocessamento em integrações orientadas a eventos.

Agora, imagine ter um único mecanismo de reprocessamento capaz de atender a todos os fluxos da sua empresa, independentemente da estrutura ou do tipo de acionamento, seguindo apenas alguns padrões de parametrização. Esse mecanismo seria altamente escalável e capaz de atender às demandas de reprocessamento de todos os seus fluxos de integração.

Arquitetura

A implementação segue uma arquitetura assíncrona e orientada a eventos, composta por três etapas:

  1. Pipeline inicial: Recebe a mensagem a ser reprocessada e a armazena em um repositório temporário (Object Store).

  2. Segundo pipeline: Recupera periodicamente os identificadores (IDs) das mensagens e os publica para reprocessamento.

  3. Terceiro pipeline (pipeline de reprocessamento): Consome o ID publicado, recupera a mensagem do repositório temporário e redireciona a mensagem para o fluxo original.

Principais componentes

1

Solicitação de reprocessamento

Um pipeline do tipo event recebe a mensagem e a armazena em um Object Store temporário chamado reprocess. Esse pipeline pode ser escalado tanto horizontalmente (aumentando as réplicas) quanto verticalmente (aumentando o tamanho) para suportar o volume de requisições e o tamanho das mensagens da sua empresa.

Ele também garante que o payload esteja no formato adequado para reprocessamento e calcula informações importantes, como o timestamp de nova tentativa,com base no campo intervalRetry, que representa o intervalo, em minutos, antes da próxima tentativa.

No diagrama abaixo, essa etapa é realizada pelo pipeline add-reprocess-event.

Exemplo de payload enviado ao mecanismo de reprocessamento:

A mensagem deve conter, além do payload:

  • O nome do pipeline ou evento original.

  • O tipo de acionamento do pipeline original (pipeType).

  • O número máximo de tentativas de reprocessamento.

  • O intervalo (em minutos) entre cada tentativa.

2

Listagem e publicação de reprocessamentos pendentes

Após as mensagens serem armazenadas corretamente no Object Store reprocess, os próximos passos são:

  • Listar mensagens prontas para reprocessamento

Um pipeline do tipo scheduler é responsável por listar as mensagens cujo intervalo de reprocessamento já foi atingido. O agendamento deve ocorrer no menor intervalo necessário entre os cenários de reprocessamento. Por exemplo, se o menor intervalo de reprocessamento é de 5 minutos, o scheduler deve ser executado a cada 5 minutos.

  • Publicar cada ID das mensagens para o próximo evento

Cada ID de mensagem listado é publicado no próximo evento. Como cada mensagem contém um timestamp de nova tentativa, o scheduler filtra e publica apenas aquelas que estão prontas para reprocessar. Esse processo é executado pelo pipeline reprocess-timer (como mostrado no diagrama):

O pipeline reprocess-timer deve listar os IDs pendentes (de forma paginada) e publicá-los individualmente no evento reprocessing-event. Como sua função se limita à listagem e publicação, ele pode ser configurado com recursos mínimos.

3

Reenvio para novo processamento no pipeline original

O evento reprocessing-event é consumido por um pipeline de mesmo nome. Para cada ID publicado, o pipeline:

  • Recupera o payload original do Object Store reprocess.

  • Verifica se o limite de tentativas foi atingido.

Se o limite não foi atingido, a mensagem é reenviada para o fluxo original:

  • Se o fluxo original for baseado em evento, a mensagem é enviada via evento.

  • Caso contrário, utiliza-se o conector Pipeline Executor.

Se o limite foi atingido, o pipeline deve publicar uma notificação no mecanismo padrão de alertas da empresa (caso já exista ou possa ser construído).

O fluxo original deve estar preparado para receber o payload do mecanismo de reprocessamento. Por exemplo, se o pipeline original for um scheduler que monta o payload ao longo do fluxo a partir de um servidor de arquivos ou banco de dados, pode ser necessário criar uma rota que receba a entrada diretamente e continue o fluxo com o payload pronto.

O diagrama abaixo representa o pipeline reprocess-event:

Exemplo de payload que o mecanismo retorna para o fluxo original de processamento:

Observe que o payload reenviado ao fluxo original inclui um campo _reprocess.retry com informações detalhadas sobre o reprocessamento. O pipeline reprocess-event deve incrementar o valor de current a cada novo envio. O fluxo original, caso precise devolver a mensagem ao mecanismo de reprocessamento, deve incluir esse valor atualizado na próxima chamada.

Assim como o pipeline de entrada, o pipeline de retorno também pode ser escalado horizontalmente e verticalmente conforme a necessidade de desempenho e volume.

Vantagens

  • Modelo de reprocessamento padronizado independente da lógica de negócio.

  • Manutenção centralizada sem necessidade de alterar os fluxos originais.

  • Escalabilidade independente que não impacta o desempenho dos pipelines originais.

  • Integração facilitada com mecanismos compartilhados como alertas, sem necessidade de modificar os pipelines existentes.

Conclusão

Essa arquitetura possibilita um processo de reprocessamento escalável e centralizado para os fluxos de integração da sua empresa, evitando implementações customizadas para cada caso de uso.

Pipelines de evento na entrada e na saída do mecanismo permitem escalabilidade independente, prevenindo erros como OutOfMemory ou degradação de performance causados por altos volumes de reprocessamento.

Além disso, essa arquitetura promove o reuso ao atuar como um mecanismo compartilhado, centralizando a manutenção e evolução de seus componentes como, por exemplo, integrações com serviços compartilhados da empresa, como o mecanismo de notificações.

Atualizado

Isto foi útil?