Modelo de mecanismo de reprocessamento centralizado na Digibee

Visão geral

Este documento apresenta um modelo de reprocessamento centralizado para a Digibee Integration Platform. Este é um modelo de uso comum que pode ser acionado por qualquer pipeline de qualquer fluxo de integração, sem a necessidade de implementações adicionais.

Cenário

Na maioria dos projetos de integração, o reprocessamento automático é necessário, por exemplo, para tentar novamente após uma falha temporária. Tradicionalmente, a lógica de reprocessamento é implementada de forma específica para cada fluxo e se limita a fluxos assíncronos que utilizam um pipeline do tipo evento na segunda etapa. Você pode encontrar um exemplo dessa abordagem no artigo Estratégia de reprocessamento em integrações orientadas a eventos.

Agora, imagine ter um único mecanismo de reprocessamento capaz de atender a todos os fluxos da sua empresa, independentemente da estrutura ou do tipo de acionamento, seguindo apenas alguns padrões de parametrização. Esse mecanismo seria altamente escalável e capaz de atender às demandas de reprocessamento de todos os seus fluxos de integração.

Arquitetura

A implementação segue uma arquitetura assíncrona e orientada a eventos, composta por três etapas:

  1. Pipeline inicial: Recebe a mensagem a ser reprocessada e a armazena em um repositório temporário (Object Store).

  2. Segundo pipeline: Recupera periodicamente os identificadores (IDs) das mensagens e os publica para reprocessamento.

  3. Terceiro pipeline (pipeline de reprocessamento): Consome o ID publicado, recupera a mensagem do repositório temporário e redireciona a mensagem para o fluxo original.

Principais componentes

1

Solicitação de reprocessamento

Um pipeline do tipo event recebe a mensagem e a armazena em um Object Store temporário chamado reprocess. Esse pipeline pode ser escalado tanto horizontalmente (aumentando as réplicas) quanto verticalmente (aumentando o tamanho) para suportar o volume de requisições e o tamanho das mensagens da sua empresa.

Ele também garante que o payload esteja no formato adequado para reprocessamento e calcula informações importantes, como o timestamp de nova tentativa,com base no campo intervalRetry, que representa o intervalo, em minutos, antes da próxima tentativa.

No diagrama abaixo, essa etapa é realizada pelo pipeline add-reprocess-event.

Exemplo de payload enviado ao mecanismo de reprocessamento:

{
   "payload":{ 
      "customerId": 123456,
      "customerName": "Raimundo Nonato",
      "customerCity": "Maranguape",
      "customerState": "CE",
      "customerJob": "Professor",
   },
   "pipeOrEventName": "update-customer-event",
   "pipeType": "event",
   "maxRetry": 3,
   "intervalRetry": 5
}

A mensagem deve conter, além do payload:

  • O nome do pipeline ou evento original.

  • O tipo de acionamento do pipeline original (pipeType).

  • O número máximo de tentativas de reprocessamento.

  • O intervalo (em minutos) entre cada tentativa.

2

Listagem e publicação de reprocessamentos pendentes

Após as mensagens serem armazenadas corretamente no Object Store reprocess, os próximos passos são:

  • Listar mensagens prontas para reprocessamento

Um pipeline do tipo scheduler é responsável por listar as mensagens cujo intervalo de reprocessamento já foi atingido. O agendamento deve ocorrer no menor intervalo necessário entre os cenários de reprocessamento. Por exemplo, se o menor intervalo de reprocessamento é de 5 minutos, o scheduler deve ser executado a cada 5 minutos.

  • Publicar cada ID das mensagens para o próximo evento

Cada ID de mensagem listado é publicado no próximo evento. Como cada mensagem contém um timestamp de nova tentativa, o scheduler filtra e publica apenas aquelas que estão prontas para reprocessar. Esse processo é executado pelo pipeline reprocess-timer (como mostrado no diagrama):

O pipeline reprocess-timer deve listar os IDs pendentes (de forma paginada) e publicá-los individualmente no evento reprocessing-event. Como sua função se limita à listagem e publicação, ele pode ser configurado com recursos mínimos.

3

Reenvio para novo processamento no pipeline original

O evento reprocessing-event é consumido por um pipeline de mesmo nome. Para cada ID publicado, o pipeline:

  • Recupera o payload original do Object Store reprocess.

  • Verifica se o limite de tentativas foi atingido.

Se o limite não foi atingido, a mensagem é reenviada para o fluxo original:

  • Se o fluxo original for baseado em evento, a mensagem é enviada via evento.

  • Caso contrário, utiliza-se o conector Pipeline Executor.

Se o limite foi atingido, o pipeline deve publicar uma notificação no mecanismo padrão de alertas da empresa (caso já exista ou possa ser construído).

O fluxo original deve estar preparado para receber o payload do mecanismo de reprocessamento. Por exemplo, se o pipeline original for um scheduler que monta o payload ao longo do fluxo a partir de um servidor de arquivos ou banco de dados, pode ser necessário criar uma rota que receba a entrada diretamente e continue o fluxo com o payload pronto.

O diagrama abaixo representa o pipeline reprocess-event:

Exemplo de payload que o mecanismo retorna para o fluxo original de processamento:

{
 "customerId": 123456,
 "customerName": "Raimundo Nonato",
 "customerCity": "Maranguape",
 "customerState": "CE",
 "customerJob": "Professor",
 "_reprocess": {
   "forwardTo": {
     "name": "update-invoice-event",
     "type": "event"
   },
   "retry": {
     "intervalRetry": 5,
     "max": 2,
     "current": 1,
     "lastRetry": 1687462056888
   }
 },
 "_oId": "b1ba4fdd-4c0c-4991-a153-b715ebfe4c9c"
 }

Observe que o payload reenviado ao fluxo original inclui um campo _reprocess.retry com informações detalhadas sobre o reprocessamento. O pipeline reprocess-event deve incrementar o valor de current a cada novo envio. O fluxo original, caso precise devolver a mensagem ao mecanismo de reprocessamento, deve incluir esse valor atualizado na próxima chamada.

Assim como o pipeline de entrada, o pipeline de retorno também pode ser escalado horizontalmente e verticalmente conforme a necessidade de desempenho e volume.

Vantagens

  • Modelo de reprocessamento padronizado independente da lógica de negócio.

  • Manutenção centralizada sem necessidade de alterar os fluxos originais.

  • Escalabilidade independente que não impacta o desempenho dos pipelines originais.

  • Integração facilitada com mecanismos compartilhados como alertas, sem necessidade de modificar os pipelines existentes.

Conclusão

Essa arquitetura possibilita um processo de reprocessamento escalável e centralizado para os fluxos de integração da sua empresa, evitando implementações customizadas para cada caso de uso.

Pipelines de evento na entrada e na saída do mecanismo permitem escalabilidade independente, prevenindo erros como OutOfMemory ou degradação de performance causados por altos volumes de reprocessamento.

Além disso, essa arquitetura promove o reuso ao atuar como um mecanismo compartilhado, centralizando a manutenção e evolução de seus componentes como, por exemplo, integrações com serviços compartilhados da empresa, como o mecanismo de notificações.

Atualizado

Isto foi útil?