Modelo de mecanismo de reprocessamento centralizado na Digibee
Visão geral
Este documento apresenta um modelo de reprocessamento centralizado para a Digibee Integration Platform. Este é um modelo de uso comum que pode ser acionado por qualquer pipeline de qualquer fluxo de integração, sem a necessidade de implementações adicionais.
Cenário
Na maioria dos projetos de integração, o reprocessamento automático é necessário, por exemplo, para tentar novamente após uma falha temporária. Tradicionalmente, a lógica de reprocessamento é implementada de forma específica para cada fluxo e se limita a fluxos assíncronos que utilizam um pipeline do tipo evento na segunda etapa. Você pode encontrar um exemplo dessa abordagem no artigo Estratégia de reprocessamento em integrações orientadas a eventos.
Agora, imagine ter um único mecanismo de reprocessamento capaz de atender a todos os fluxos da sua empresa, independentemente da estrutura ou do tipo de acionamento, seguindo apenas alguns padrões de parametrização. Esse mecanismo seria altamente escalável e capaz de atender às demandas de reprocessamento de todos os seus fluxos de integração.
Arquitetura
A implementação segue uma arquitetura assíncrona e orientada a eventos, composta por três etapas:
Pipeline inicial: Recebe a mensagem a ser reprocessada e a armazena em um repositório temporário (Object Store).
Segundo pipeline: Recupera periodicamente os identificadores (IDs) das mensagens e os publica para reprocessamento.
Terceiro pipeline (pipeline de reprocessamento): Consome o ID publicado, recupera a mensagem do repositório temporário e redireciona a mensagem para o fluxo original.

Principais componentes
Solicitação de reprocessamento
Um pipeline do tipo event recebe a mensagem e a armazena em um Object Store temporário chamado reprocess
. Esse pipeline pode ser escalado tanto horizontalmente (aumentando as réplicas) quanto verticalmente (aumentando o tamanho) para suportar o volume de requisições e o tamanho das mensagens da sua empresa.
Ele também garante que o payload esteja no formato adequado para reprocessamento e calcula informações importantes, como o timestamp de nova tentativa,com base no campo intervalRetry
, que representa o intervalo, em minutos, antes da próxima tentativa.
No diagrama abaixo, essa etapa é realizada pelo pipeline add-reprocess-event
.

Exemplo de payload enviado ao mecanismo de reprocessamento:
{
"payload":{
"customerId": 123456,
"customerName": "Raimundo Nonato",
"customerCity": "Maranguape",
"customerState": "CE",
"customerJob": "Professor",
},
"pipeOrEventName": "update-customer-event",
"pipeType": "event",
"maxRetry": 3,
"intervalRetry": 5
}
A mensagem deve conter, além do payload:
O nome do pipeline ou evento original.
O tipo de acionamento do pipeline original (pipeType).
O número máximo de tentativas de reprocessamento.
O intervalo (em minutos) entre cada tentativa.
Listagem e publicação de reprocessamentos pendentes
Após as mensagens serem armazenadas corretamente no Object Store reprocess
, os próximos passos são:
Listar mensagens prontas para reprocessamento
Um pipeline do tipo scheduler é responsável por listar as mensagens cujo intervalo de reprocessamento já foi atingido. O agendamento deve ocorrer no menor intervalo necessário entre os cenários de reprocessamento. Por exemplo, se o menor intervalo de reprocessamento é de 5 minutos, o scheduler deve ser executado a cada 5 minutos.
Publicar cada ID das mensagens para o próximo evento
Cada ID de mensagem listado é publicado no próximo evento. Como cada mensagem contém um timestamp de nova tentativa, o scheduler filtra e publica apenas aquelas que estão prontas para reprocessar. Esse processo é executado pelo pipeline reprocess-timer
(como mostrado no diagrama):

O pipeline reprocess-timer
deve listar os IDs pendentes (de forma paginada) e publicá-los individualmente no evento reprocessing-event
. Como sua função se limita à listagem e publicação, ele pode ser configurado com recursos mínimos.
Reenvio para novo processamento no pipeline original
O evento reprocessing-event
é consumido por um pipeline de mesmo nome. Para cada ID publicado, o pipeline:
Recupera o payload original do Object Store
reprocess
.Verifica se o limite de tentativas foi atingido.
Se o limite não foi atingido, a mensagem é reenviada para o fluxo original:
Se o fluxo original for baseado em evento, a mensagem é enviada via evento.
Caso contrário, utiliza-se o conector Pipeline Executor.
Se o limite foi atingido, o pipeline deve publicar uma notificação no mecanismo padrão de alertas da empresa (caso já exista ou possa ser construído).
O fluxo original deve estar preparado para receber o payload do mecanismo de reprocessamento. Por exemplo, se o pipeline original for um scheduler que monta o payload ao longo do fluxo a partir de um servidor de arquivos ou banco de dados, pode ser necessário criar uma rota que receba a entrada diretamente e continue o fluxo com o payload pronto.
O diagrama abaixo representa o pipeline reprocess-event:

Exemplo de payload que o mecanismo retorna para o fluxo original de processamento:
{
"customerId": 123456,
"customerName": "Raimundo Nonato",
"customerCity": "Maranguape",
"customerState": "CE",
"customerJob": "Professor",
"_reprocess": {
"forwardTo": {
"name": "update-invoice-event",
"type": "event"
},
"retry": {
"intervalRetry": 5,
"max": 2,
"current": 1,
"lastRetry": 1687462056888
}
},
"_oId": "b1ba4fdd-4c0c-4991-a153-b715ebfe4c9c"
}
Observe que o payload reenviado ao fluxo original inclui um campo _reprocess.retry
com informações detalhadas sobre o reprocessamento. O pipeline reprocess-event
deve incrementar o valor de current
a cada novo envio. O fluxo original, caso precise devolver a mensagem ao mecanismo de reprocessamento, deve incluir esse valor atualizado na próxima chamada.
Assim como o pipeline de entrada, o pipeline de retorno também pode ser escalado horizontalmente e verticalmente conforme a necessidade de desempenho e volume.
Vantagens
Modelo de reprocessamento padronizado independente da lógica de negócio.
Manutenção centralizada sem necessidade de alterar os fluxos originais.
Escalabilidade independente que não impacta o desempenho dos pipelines originais.
Integração facilitada com mecanismos compartilhados como alertas, sem necessidade de modificar os pipelines existentes.
Conclusão
Essa arquitetura possibilita um processo de reprocessamento escalável e centralizado para os fluxos de integração da sua empresa, evitando implementações customizadas para cada caso de uso.
Pipelines de evento na entrada e na saída do mecanismo permitem escalabilidade independente, prevenindo erros como OutOfMemory
ou degradação de performance causados por altos volumes de reprocessamento.
Além disso, essa arquitetura promove o reuso ao atuar como um mecanismo compartilhado, centralizando a manutenção e evolução de seus componentes como, por exemplo, integrações com serviços compartilhados da empresa, como o mecanismo de notificações.
Atualizado
Isto foi útil?