# Modelo de mecanismo de reprocessamento centralizado na Digibee

## Visão geral

Este documento apresenta um modelo de reprocessamento centralizado para a Digibee Integration Platform. Este é um modelo de uso comum que pode ser acionado por qualquer pipeline de qualquer fluxo de integração, sem a necessidade de implementações adicionais.&#x20;

## Cenário

Na maioria dos projetos de integração, o reprocessamento automático é necessário, por exemplo, para tentar novamente após uma falha temporária. Tradicionalmente, a lógica de reprocessamento é implementada de forma específica para cada fluxo e se limita a fluxos assíncronos que utilizam um pipeline do tipo evento na segunda etapa. Você pode encontrar um exemplo dessa abordagem no artigo [Estratégia de reprocessamento em integrações orientadas a eventos](/documentation/resources/pt-br/use-cases/reprocessing-strategy-in-event-driven-integrations.md).

Agora, imagine ter um único mecanismo de reprocessamento capaz de atender a todos os fluxos da sua empresa, independentemente da estrutura ou do tipo de acionamento, seguindo apenas alguns padrões de parametrização. Esse mecanismo seria altamente escalável e capaz de atender às demandas de reprocessamento de todos os seus fluxos de integração.

## Arquitetura

A implementação segue uma arquitetura assíncrona e orientada a eventos, composta por três etapas:

1. **Pipeline inicial:** Recebe a mensagem a ser reprocessada e a armazena em um repositório temporário ([Object Store](/documentation/connectors-and-triggers/connectors/structured-data/object-store.md)).
2. **Segundo pipeline:** Recupera periodicamente os identificadores (IDs) das mensagens e os publica para reprocessamento.
3. **Terceiro pipeline (pipeline de reprocessamento):** Consome o ID publicado, recupera a mensagem do repositório temporário e redireciona a mensagem para o fluxo original.

<figure><img src="/files/5UKa2udYdXjXwzKnPYSn" alt=""><figcaption></figcaption></figure>

### Principais componentes

{% stepper %}
{% step %}

#### Solicitação de reprocessamento

Um pipeline do tipo **event** recebe a mensagem e a armazena em um [Object Store](/documentation/connectors-and-triggers/connectors/structured-data/object-store.md) temporário chamado `reprocess`. Esse pipeline pode ser escalado tanto **horizontalmente** (aumentando as réplicas) quanto **verticalmente** (aumentando o tamanho) para suportar o volume de requisições e o tamanho das mensagens da sua empresa.

Ele também garante que o payload esteja no formato adequado para reprocessamento e calcula informações importantes, como o timestamp de nova tentativa,com base no campo `intervalRetry`, que representa o intervalo, em minutos, antes da próxima tentativa.

No diagrama abaixo, essa etapa é realizada pelo pipeline `add-reprocess-event`.

<figure><img src="/files/TeRFQxQzUNy6oU7wEAhx" alt=""><figcaption></figcaption></figure>

Exemplo de payload enviado ao mecanismo de reprocessamento:

```json
{
   "payload":{ 
      "customerId": 123456,
      "customerName": "Raimundo Nonato",
      "customerCity": "Maranguape",
      "customerState": "CE",
      "customerJob": "Professor",
   },
   "pipeOrEventName": "update-customer-event",
   "pipeType": "event",
   "maxRetry": 3,
   "intervalRetry": 5
}

```

A mensagem deve conter, além do payload:

* O nome do pipeline ou evento original.
* O tipo de acionamento do pipeline original (pipeType).
* O número máximo de tentativas de reprocessamento.
* O intervalo (em minutos) entre cada tentativa.
  {% endstep %}

{% step %}

#### Listagem e publicação de reprocessamentos pendentes

Após as mensagens serem armazenadas corretamente no Object Store `reprocess`, os próximos passos são:

* **Listar mensagens prontas para reprocessamento**

Um pipeline do tipo **scheduler** é responsável por listar as mensagens cujo intervalo de reprocessamento já foi atingido. O agendamento deve ocorrer no menor intervalo necessário entre os cenários de reprocessamento. Por exemplo, se o menor intervalo de reprocessamento é de 5 minutos, o scheduler deve ser executado a cada 5 minutos.&#x20;

* **Publicar cada ID das mensagens para o próximo evento**

Cada ID de mensagem listado é publicado no próximo evento. Como cada mensagem contém um timestamp de nova tentativa, o scheduler filtra e publica apenas aquelas que estão prontas para reprocessar. Esse processo é executado pelo pipeline `reprocess-timer` (como mostrado no diagrama):<br>

<figure><img src="/files/EL72ZN04YdwqC0LKh2Yu" alt=""><figcaption></figcaption></figure>

O pipeline `reprocess-timer` deve listar os IDs pendentes (de forma paginada) e publicá-los individualmente no evento `reprocessing-event`. Como sua função se limita à listagem e publicação, ele pode ser configurado com recursos mínimos.
{% endstep %}

{% step %}

#### Reenvio para novo processamento no pipeline original

O evento `reprocessing-event` é consumido por um pipeline de mesmo nome. Para cada ID publicado, o pipeline:

* Recupera o payload original do Object Store `reprocess`.
* Verifica se o limite de tentativas foi atingido.

Se o **limite não foi atingido**, a mensagem é reenviada para o fluxo original:

* Se o fluxo original for baseado em **evento**, a mensagem é enviada via evento.
* Caso contrário, utiliza-se o conector [Pipeline Executor](/documentation/connectors-and-triggers/pt-br/connectors/tools/pipeline-executor.md).

Se o **limite foi atingido**, o pipeline deve publicar uma notificação no mecanismo padrão de alertas da empresa (caso já exista ou possa ser construído).

O fluxo original deve estar preparado para receber o payload do mecanismo de reprocessamento. Por exemplo, se o pipeline original for um scheduler que monta o payload ao longo do fluxo a partir de um servidor de arquivos ou banco de dados, pode ser necessário criar uma rota que receba a entrada diretamente e continue o fluxo com o payload pronto.

O diagrama abaixo representa o pipeline `reprocess-event:`

<figure><img src="/files/TJObuNMydWZvAmq9pmgf" alt=""><figcaption></figcaption></figure>

Exemplo de payload que o mecanismo retorna para o fluxo original de processamento:

```json
{
 "customerId": 123456,
 "customerName": "Raimundo Nonato",
 "customerCity": "Maranguape",
 "customerState": "CE",
 "customerJob": "Professor",
 "_reprocess": {
   "forwardTo": {
     "name": "update-invoice-event",
     "type": "event"
   },
   "retry": {
     "intervalRetry": 5,
     "max": 2,
     "current": 1,
     "lastRetry": 1687462056888
   }
 },
 "_oId": "b1ba4fdd-4c0c-4991-a153-b715ebfe4c9c"
 }

```

Observe que o payload reenviado ao fluxo original inclui um campo `_reprocess.retry` com informações detalhadas sobre o reprocessamento. O pipeline `reprocess-event` deve incrementar o valor de `current` a cada novo envio. O fluxo original, caso precise devolver a mensagem ao mecanismo de reprocessamento, deve incluir esse valor atualizado na próxima chamada.

Assim como o pipeline de entrada, o pipeline de retorno também pode ser escalado horizontalmente e verticalmente conforme a necessidade de desempenho e volume.
{% endstep %}
{% endstepper %}

## Vantagens

* **Modelo de reprocessamento padronizado** independente da lógica de negócio.
* **Manutenção centralizada** sem necessidade de alterar os fluxos originais.
* **Escalabilidade independente** que não impacta o desempenho dos pipelines originais.
* **Integração facilitada** com mecanismos compartilhados como alertas, sem necessidade de modificar os pipelines existentes.

## Conclusão

Essa arquitetura possibilita um processo de reprocessamento escalável e centralizado para os fluxos de integração da sua empresa, evitando implementações customizadas para cada caso de uso.

Pipelines de evento na entrada e na saída do mecanismo permitem escalabilidade independente, prevenindo erros como `OutOfMemory` ou degradação de performance causados por altos volumes de reprocessamento.

Além disso, essa arquitetura promove o reuso ao atuar como um mecanismo compartilhado, centralizando a manutenção e evolução de seus componentes como, por exemplo, integrações com serviços compartilhados da empresa, como o mecanismo de notificações.


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://docs.digibee.com/documentation/resources/pt-br/best-practices/centralized-reprocessing-engine-model-on-digibee.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
