# Modelo de mecanismo de reprocessamento centralizado na Digibee

## Visão geral

Este documento apresenta um modelo de reprocessamento centralizado para a Digibee Integration Platform. Este é um modelo de uso comum que pode ser acionado por qualquer pipeline de qualquer fluxo de integração, sem a necessidade de implementações adicionais.&#x20;

## Cenário

Na maioria dos projetos de integração, o reprocessamento automático é necessário, por exemplo, para tentar novamente após uma falha temporária. Tradicionalmente, a lógica de reprocessamento é implementada de forma específica para cada fluxo e se limita a fluxos assíncronos que utilizam um pipeline do tipo evento na segunda etapa. Você pode encontrar um exemplo dessa abordagem no artigo [Estratégia de reprocessamento em integrações orientadas a eventos](https://docs.digibee.com/documentation/resources/pt-br/use-cases/reprocessing-strategy-in-event-driven-integrations).

Agora, imagine ter um único mecanismo de reprocessamento capaz de atender a todos os fluxos da sua empresa, independentemente da estrutura ou do tipo de acionamento, seguindo apenas alguns padrões de parametrização. Esse mecanismo seria altamente escalável e capaz de atender às demandas de reprocessamento de todos os seus fluxos de integração.

## Arquitetura

A implementação segue uma arquitetura assíncrona e orientada a eventos, composta por três etapas:

1. **Pipeline inicial:** Recebe a mensagem a ser reprocessada e a armazena em um repositório temporário ([Object Store](https://app.gitbook.com/s/EKM2LD3uNAckQgy1OUyZ/connectors/structured-data/object-store)).
2. **Segundo pipeline:** Recupera periodicamente os identificadores (IDs) das mensagens e os publica para reprocessamento.
3. **Terceiro pipeline (pipeline de reprocessamento):** Consome o ID publicado, recupera a mensagem do repositório temporário e redireciona a mensagem para o fluxo original.

<figure><img src="https://2538031102-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FXfrDexGOLMin51pAiWkq%2Fuploads%2FqkGcp4nXLdXD2jRrCFMI%2FReprocessing_Model_1.png?alt=media&#x26;token=0362cd29-95f4-48b5-8809-12cd6e1b7cf2" alt=""><figcaption></figcaption></figure>

### Principais componentes

{% stepper %}
{% step %}

#### Solicitação de reprocessamento

Um pipeline do tipo **event** recebe a mensagem e a armazena em um [Object Store](https://app.gitbook.com/s/EKM2LD3uNAckQgy1OUyZ/connectors/structured-data/object-store) temporário chamado `reprocess`. Esse pipeline pode ser escalado tanto **horizontalmente** (aumentando as réplicas) quanto **verticalmente** (aumentando o tamanho) para suportar o volume de requisições e o tamanho das mensagens da sua empresa.

Ele também garante que o payload esteja no formato adequado para reprocessamento e calcula informações importantes, como o timestamp de nova tentativa,com base no campo `intervalRetry`, que representa o intervalo, em minutos, antes da próxima tentativa.

No diagrama abaixo, essa etapa é realizada pelo pipeline `add-reprocess-event`.

<figure><img src="https://2538031102-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FXfrDexGOLMin51pAiWkq%2Fuploads%2Fr2SBgfSQ7FiTZJTZBjrM%2FReprocessing_Model_2.png?alt=media&#x26;token=8153423f-796b-411a-b13a-40f5ff5a4422" alt=""><figcaption></figcaption></figure>

Exemplo de payload enviado ao mecanismo de reprocessamento:

```json
{
   "payload":{ 
      "customerId": 123456,
      "customerName": "Raimundo Nonato",
      "customerCity": "Maranguape",
      "customerState": "CE",
      "customerJob": "Professor",
   },
   "pipeOrEventName": "update-customer-event",
   "pipeType": "event",
   "maxRetry": 3,
   "intervalRetry": 5
}

```

A mensagem deve conter, além do payload:

* O nome do pipeline ou evento original.
* O tipo de acionamento do pipeline original (pipeType).
* O número máximo de tentativas de reprocessamento.
* O intervalo (em minutos) entre cada tentativa.
  {% endstep %}

{% step %}

#### Listagem e publicação de reprocessamentos pendentes

Após as mensagens serem armazenadas corretamente no Object Store `reprocess`, os próximos passos são:

* **Listar mensagens prontas para reprocessamento**

Um pipeline do tipo **scheduler** é responsável por listar as mensagens cujo intervalo de reprocessamento já foi atingido. O agendamento deve ocorrer no menor intervalo necessário entre os cenários de reprocessamento. Por exemplo, se o menor intervalo de reprocessamento é de 5 minutos, o scheduler deve ser executado a cada 5 minutos.&#x20;

* **Publicar cada ID das mensagens para o próximo evento**

Cada ID de mensagem listado é publicado no próximo evento. Como cada mensagem contém um timestamp de nova tentativa, o scheduler filtra e publica apenas aquelas que estão prontas para reprocessar. Esse processo é executado pelo pipeline `reprocess-timer` (como mostrado no diagrama):<br>

<figure><img src="https://2538031102-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FXfrDexGOLMin51pAiWkq%2Fuploads%2FJiEwerpjiIHlqoT9R52L%2FReprocessing_Model_3.png?alt=media&#x26;token=598db494-0d0d-4299-826a-15e7c6b4cf9a" alt=""><figcaption></figcaption></figure>

O pipeline `reprocess-timer` deve listar os IDs pendentes (de forma paginada) e publicá-los individualmente no evento `reprocessing-event`. Como sua função se limita à listagem e publicação, ele pode ser configurado com recursos mínimos.
{% endstep %}

{% step %}

#### Reenvio para novo processamento no pipeline original

O evento `reprocessing-event` é consumido por um pipeline de mesmo nome. Para cada ID publicado, o pipeline:

* Recupera o payload original do Object Store `reprocess`.
* Verifica se o limite de tentativas foi atingido.

Se o **limite não foi atingido**, a mensagem é reenviada para o fluxo original:

* Se o fluxo original for baseado em **evento**, a mensagem é enviada via evento.
* Caso contrário, utiliza-se o conector [Pipeline Executor](https://app.gitbook.com/s/SKBJ6ZiEWBU93x170HH4/connectors/tools/pipeline-executor).

Se o **limite foi atingido**, o pipeline deve publicar uma notificação no mecanismo padrão de alertas da empresa (caso já exista ou possa ser construído).

O fluxo original deve estar preparado para receber o payload do mecanismo de reprocessamento. Por exemplo, se o pipeline original for um scheduler que monta o payload ao longo do fluxo a partir de um servidor de arquivos ou banco de dados, pode ser necessário criar uma rota que receba a entrada diretamente e continue o fluxo com o payload pronto.

O diagrama abaixo representa o pipeline `reprocess-event:`

<figure><img src="https://2538031102-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FXfrDexGOLMin51pAiWkq%2Fuploads%2FNLrukepbPNK0SasKNoTv%2FReprocessing_Model_4.png?alt=media&#x26;token=c4b85f2c-28f0-4215-bc33-15cdc9c1c16b" alt=""><figcaption></figcaption></figure>

Exemplo de payload que o mecanismo retorna para o fluxo original de processamento:

```json
{
 "customerId": 123456,
 "customerName": "Raimundo Nonato",
 "customerCity": "Maranguape",
 "customerState": "CE",
 "customerJob": "Professor",
 "_reprocess": {
   "forwardTo": {
     "name": "update-invoice-event",
     "type": "event"
   },
   "retry": {
     "intervalRetry": 5,
     "max": 2,
     "current": 1,
     "lastRetry": 1687462056888
   }
 },
 "_oId": "b1ba4fdd-4c0c-4991-a153-b715ebfe4c9c"
 }

```

Observe que o payload reenviado ao fluxo original inclui um campo `_reprocess.retry` com informações detalhadas sobre o reprocessamento. O pipeline `reprocess-event` deve incrementar o valor de `current` a cada novo envio. O fluxo original, caso precise devolver a mensagem ao mecanismo de reprocessamento, deve incluir esse valor atualizado na próxima chamada.

Assim como o pipeline de entrada, o pipeline de retorno também pode ser escalado horizontalmente e verticalmente conforme a necessidade de desempenho e volume.
{% endstep %}
{% endstepper %}

## Vantagens

* **Modelo de reprocessamento padronizado** independente da lógica de negócio.
* **Manutenção centralizada** sem necessidade de alterar os fluxos originais.
* **Escalabilidade independente** que não impacta o desempenho dos pipelines originais.
* **Integração facilitada** com mecanismos compartilhados como alertas, sem necessidade de modificar os pipelines existentes.

## Conclusão

Essa arquitetura possibilita um processo de reprocessamento escalável e centralizado para os fluxos de integração da sua empresa, evitando implementações customizadas para cada caso de uso.

Pipelines de evento na entrada e na saída do mecanismo permitem escalabilidade independente, prevenindo erros como `OutOfMemory` ou degradação de performance causados por altos volumes de reprocessamento.

Além disso, essa arquitetura promove o reuso ao atuar como um mecanismo compartilhado, centralizando a manutenção e evolução de seus componentes como, por exemplo, integrações com serviços compartilhados da empresa, como o mecanismo de notificações.
