Tutorial de paginação - parte 3

O caminho EXTRACTING_DATA é onde os dados são recuperados de sua fonte e enviados em blocos para serem processados.

Nesse caminho, o fluxo envia cada registro para um pipeline de processamento de dados, cria um resumo da execução e atualiza os parâmetros da paginação a depender de em qual parte da migração o processo se encontra.

Se você quiser aprender mais sobre pipelines de processamento de dados, leia nossa documentação sobre arquitetura orientada a eventos.

Para construir o caminho EXTRACTING_DATA, siga esses passos:

1. Adicione um componente Log.

2. Configure a condição Choice desse caminho como $.control[?(@.step == 'EXTRACTING_DATA')].

Aplicando essa condição, o fluxo de integração segue esse caminho se o parâmetro step assumir valor EXTRACTING_DATA.

3. Adicione um componente que realiza iterações através da sua fonte de dados, como um componente For Each ou Stream DB V3

No nosso exemplo, utilizamos o Stream DB V3. Configure a conta, URL do banco de dados e nome da coluna de acordo com a fonte de dados e use o seguinte comando SQL:

select * from enb_person limit {{ message.control.start }}, {{ message.control.limit }}

Esse comando recupera um número de registros igual ao parâmetro limit, começando pelo registro cujo índice é igual ao parâmetro start.

No subpipeline onProcess desse componente, adicione um componente Event Publisher, configure o nome do evento para o evento ao qual o seu pipeline de processamento de dados é inscrito e configure a propriedade Body para {{ message.$ }} para enviar todo o payload.

Depois, gere uma mensagem de sucesso usando um componente JSON Generator. Configure o parâmetro JSON desse componente para:

{
    "success": true
}

No subpipeline onException desse componente, adicione um componente Log seguido de um Event Publisher. Este componente publica eventos de erro aos quais o seu pipeline de tratamento de erros está inscrito. Se você quiser aprender mais sobre pipelines de tratamento de erro, leia nossa documentação sobre arquitetura orientada a eventos.

Finalmente, adicione um componente Throw Error.

De volta ao caminho EXTRACTING_DATA,

4. Crie um resumo da execução usando um JSON Generator.

O componente Stream DB ou For Each usado no passo 3 envia como output um resumo da execução. Salve este resumo em uma propriedade JSON chamada summary configurando o parâmetro JSON desse componente como:

{
    "summary": {{ message.$ }}
}

5. Salve a propriedade summary usando um componente Session Management

6. Recupere a propriedade control usando um componente Session Management

7. Verifique se o processo de migração acabou usando um componente Choice

Quando o processo de migração acaba, os parâmetros de paginação são atualizados e um output informando o fim do processo é enviado no formato JSON. Para construir esse fluxo, siga esses passos:

1. Adicione um componente Log.

2. Configure a condição Choice desse caminho como $.[?(@.summary.total < @.control.limit)].

Usando essa condição, o fluxo segue esse caminho se o número de registros recuperados for menor que o parâmetro limit. Isso é uma indicação de que não há mais registros a serem recuperados neste momento.

3. Atualize os parâmetros de paginação utilizando um componente Object Store

Use o mesmo nome e ID dos Object Stores usados anteriormente, ative as opções Unique Index e Upsert e configure o parâmetro Document como:

{
    $set:{
        "start": 0,
        "end": null,
        "step": "FINISHED",
        "nextExecutionTimestamp": {{ FORMATDATE(FORMATDATE(SUMDATE( NOW() , "DAY", 1), "timestamp", "dd/MM/yyyy 01:00:00"), "dd/MM/yyyy HH:mm:ss", "timestamp") }}
    }
}

Este código atribui valor FINISHED para o parâmetro step. Isso significa que na próxima execução desse pipeline, o código seguirá o caminho FINISHED.

Note que o parâmetro start é configurado para o valor 0 porque essa é uma migração total do banco de dados. Para que a migração continuasse de onde parou no dia anterior, deveríamos alterar o valor de start para ser igual ao valor de end nessa iteração.

4. Recupere a propriedade control usando um componente Session Management.

5. Gere um output usando um componente JSON Generator.

Configure o parâmetro JSON como:

{
    "message": {{ CONCAT("A migração começará em ",  FORMATDATE( message.control.nextExecutionTimestamp, "timestamp", "dd/MM/yyyy HH:mm:ss")) }}
}

Agora, retorne ao componente Choice que verifica se o processo de migração acabou ou não. Se o processo não acabou, o fluxo atualiza os parâmetros de paginação e envia uma mensagem informando que o processo continuará.

Para construir esse fluxo, siga esses passos:

1. Adicione um componente Log

2. Configure a condição Choice desse caminho como otherwise

3. Atualize os parâmetros de paginação usando um componente Object Store

Use o mesmo nome e ID dos Object Stores usados anteriormente, ative as opções Unique Index e Upsert e configure o parâmetro Document como:

{
    $set:{
        "start": {{ message.control.end }},
        "end": {{ TOINT(SUM(message.control.end, message.control.limit)) }},
        "step": "EXTRACTING_DATA"
    }
}

4. Recupere os parâmetros de paginação usando um componente Object Store.

Use o mesmo nome e ID dos Object Stores usados anteriormente e defina a operação como Find by Object Store ID.

5. Crie um output usando um componente JSON Generator

Configure o parâmetro JSON desse componente como:

{
    "mensagem": "Os dados estão sendo migrados",
    "next_execution": {{ message.data[0] }}
}

Atualizado