# Tutorial de paginação - parte 3

O caminho EXTRACTING\_DATA é onde os dados são recuperados de sua fonte e enviados em blocos para serem processados.

Nesse caminho, o fluxo envia cada registro para um *pipeline* de processamento de dados, cria um resumo da execução e atualiza os parâmetros da paginação a depender de em qual parte da migração o processo se encontra.

Se você quiser aprender mais sobre *pipelines* de processamento de dados, leia nossa [documentação sobre arquitetura orientada a eventos](/documentation/resources/pt-br/best-practices/event-oriented-architecture.md).

<figure><img src="/files/v9xXXVQeU1cD6T2RPeMN" alt=""><figcaption><p><strong>O caminho EXTRACTING_DATA</strong></p></figcaption></figure>

Para construir o caminho EXTRACTING\_DATA, siga esses passos:

&#x20; 1\. Adicione um componente Log.

&#x20; 2\. Configure a condição Choice desse caminho como `$.control[?(@.step == 'EXTRACTING_DATA')]`.

Aplicando essa condição, o fluxo de integração segue esse caminho se o parâmetro *step* assumir valor `EXTRACTING_DATA`.

&#x20; 3\. Adicione um componente que realiza iterações através da sua fonte de dados, como um componente For Each ou Stream DB V3

No nosso exemplo, utilizamos o Stream DB V3. Configure a conta, URL do banco de dados e nome da coluna de acordo com a fonte de dados e use o seguinte comando SQL:

```sql
select * from enb_person limit {{ message.control.start }}, {{ message.control.limit }}
```

Esse comando recupera um número de registros igual ao parâmetro *limit*, começando pelo registro cujo índice é igual ao parâmetro *start*.

No subpipeline onProcess desse componente, adicione um componente Event Publisher, configure o nome do evento para o evento ao qual o seu *pipeline* de processamento de dados é inscrito e configure a propriedade *Body* para `{{ message.$ }}` para enviar todo o *payload*.

Depois, gere uma mensagem de sucesso usando um componente JSON Generator. Configure o parâmetro JSON desse componente para:

```json
{
    "success": true
}
```

<figure><img src="/files/sw5OYPBlYTfm8tBI8eGh" alt=""><figcaption><p><em>Subpipeline</em> onProcess do Stream DB V3</p></figcaption></figure>

No *subpipeline* onException desse componente, adicione um componente Log seguido de um Event Publisher. Este componente publica eventos de erro aos quais o seu *pipeline* de tratamento de erros está inscrito. Se você quiser aprender mais sobre pipelines de tratamento de erro, leia [nossa documentação sobre arquitetura orientada a eventos](/documentation/resources/pt-br/best-practices/event-oriented-architecture.md).

Finalmente, adicione um componente Throw Error.

<figure><img src="/files/AyvK2KRy4TNXiFPsWhDd" alt=""><figcaption><p><em>Subpipeline</em> onException do Stream DB V3</p></figcaption></figure>

De volta ao caminho EXTRACTING\_DATA,

&#x20; 4\. Crie um resumo da execução usando um JSON Generator.

O componente Stream DB ou For Each usado no passo 3 envia como *output* um resumo da execução. Salve este resumo em uma propriedade JSON chamada *summary* configurando o parâmetro JSON desse componente como:

```json
{
    "summary": {{ message.$ }}
}
```

&#x20; 5\. Salve a propriedade `summary` usando um componente Session Management

&#x20; 6\. Recupere a propriedade `control` usando um componente Session Management

&#x20; 7\. Verifique se o processo de migração acabou usando um componente Choice

Quando o processo de migração acaba, os parâmetros de paginação são atualizados e um *output* informando o fim do processo é enviado no formato JSON. Para construir esse fluxo, siga esses passos:

&#x20; 1\. Adicione um componente Log.

&#x20; 2\. Configure a condição Choice desse caminho como `$.[?(@.summary.total < @.control.limit)]`.

Usando essa condição, o fluxo segue esse caminho se o número de registros recuperados for menor que o parâmetro *limit*. Isso é uma indicação de que não há mais registros a serem recuperados neste momento.

&#x20; 3\. Atualize os parâmetros de paginação utilizando um componente Object Store

Use o mesmo nome e ID dos Object Stores usados anteriormente, ative as opções *Unique Index* e *Upsert* e configure o parâmetro *Document* como:

```json
{
    $set:{
        "start": 0,
        "end": null,
        "step": "FINISHED",
        "nextExecutionTimestamp": {{ FORMATDATE(FORMATDATE(SUMDATE( NOW() , "DAY", 1), "timestamp", "dd/MM/yyyy 01:00:00"), "dd/MM/yyyy HH:mm:ss", "timestamp") }}
    }
}
```

Este código atribui valor `FINISHED` para o parâmetro *step*. Isso significa que na próxima execução desse *pipeline*, o código seguirá o caminho FINISHED.

Note que o parâmetro `start` é configurado para o valor `0` porque essa é uma migração total do banco de dados. Para que a migração continuasse de onde parou no dia anterior, deveríamos alterar o valor de *start* para ser igual ao valor de `end` nessa iteração.

&#x20; 4\. Recupere a propriedade `control` usando um componente Session Management.

&#x20; 5\. Gere um *output* usando um componente JSON Generator.

Configure o parâmetro JSON como:

```json
{
    "message": {{ CONCAT("A migração começará em ",  FORMATDATE( message.control.nextExecutionTimestamp, "timestamp", "dd/MM/yyyy HH:mm:ss")) }}
}
```

Agora, retorne ao componente Choice que verifica se o processo de migração acabou ou não. Se o processo não acabou, o fluxo atualiza os parâmetros de paginação e envia uma mensagem informando que o processo continuará.

Para construir esse fluxo, siga esses passos:

&#x20; 1\. Adicione um componente Log

&#x20; 2\. Configure a condição Choice desse caminho como `otherwise`

&#x20; 3\. Atualize os parâmetros de paginação usando um componente Object Store

Use o mesmo nome e ID dos Object Stores usados anteriormente, ative as opções *Unique Index* e *Upsert* e configure o parâmetro *Document* como:

```json
{
    $set:{
        "start": {{ message.control.end }},
        "end": {{ TOINT(SUM(message.control.end, message.control.limit)) }},
        "step": "EXTRACTING_DATA"
    }
}
```

&#x20; 4\. Recupere os parâmetros de paginação usando um componente Object Store.

Use o mesmo nome e ID dos Object Stores usados anteriormente e defina a operação como *Find by Object Store ID*.

&#x20; 5\. Crie um output usando um componente JSON Generator

Configure o parâmetro JSON desse componente como:

```json
{
    "mensagem": "Os dados estão sendo migrados",
    "next_execution": {{ message.data[0] }}
}
```


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://docs.digibee.com/documentation/resources/pt-br/best-practices/pagination-tutorial/pagination-tutorial-part-3.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
