Tutorial de paginação - parte 3

O caminho EXTRACTING_DATA é onde os dados são recuperados de sua fonte e enviados em blocos para serem processados.

Nesse caminho, o fluxo envia cada registro para um pipeline de processamento de dados, cria um resumo da execução e atualiza os parâmetros da paginação a depender de em qual parte da migração o processo se encontra.

Se você quiser aprender mais sobre pipelines de processamento de dados, leia nossa documentação sobre arquitetura orientada a eventos.

O caminho EXTRACTING_DATA

Para construir o caminho EXTRACTING_DATA, siga esses passos:

1. Adicione um componente Log.

2. Configure a condição Choice desse caminho como $.control[?(@.step == 'EXTRACTING_DATA')].

Aplicando essa condição, o fluxo de integração segue esse caminho se o parâmetro step assumir valor EXTRACTING_DATA.

3. Adicione um componente que realiza iterações através da sua fonte de dados, como um componente For Each ou Stream DB V3

No nosso exemplo, utilizamos o Stream DB V3. Configure a conta, URL do banco de dados e nome da coluna de acordo com a fonte de dados e use o seguinte comando SQL:

Esse comando recupera um número de registros igual ao parâmetro limit, começando pelo registro cujo índice é igual ao parâmetro start.

No subpipeline onProcess desse componente, adicione um componente Event Publisher, configure o nome do evento para o evento ao qual o seu pipeline de processamento de dados é inscrito e configure a propriedade Body para {{ message.$ }} para enviar todo o payload.

Depois, gere uma mensagem de sucesso usando um componente JSON Generator. Configure o parâmetro JSON desse componente para:

Subpipeline onProcess do Stream DB V3

No subpipeline onException desse componente, adicione um componente Log seguido de um Event Publisher. Este componente publica eventos de erro aos quais o seu pipeline de tratamento de erros está inscrito. Se você quiser aprender mais sobre pipelines de tratamento de erro, leia nossa documentação sobre arquitetura orientada a eventos.

Finalmente, adicione um componente Throw Error.

Subpipeline onException do Stream DB V3

De volta ao caminho EXTRACTING_DATA,

4. Crie um resumo da execução usando um JSON Generator.

O componente Stream DB ou For Each usado no passo 3 envia como output um resumo da execução. Salve este resumo em uma propriedade JSON chamada summary configurando o parâmetro JSON desse componente como:

5. Salve a propriedade summary usando um componente Session Management

6. Recupere a propriedade control usando um componente Session Management

7. Verifique se o processo de migração acabou usando um componente Choice

Quando o processo de migração acaba, os parâmetros de paginação são atualizados e um output informando o fim do processo é enviado no formato JSON. Para construir esse fluxo, siga esses passos:

1. Adicione um componente Log.

2. Configure a condição Choice desse caminho como $.[?(@.summary.total < @.control.limit)].

Usando essa condição, o fluxo segue esse caminho se o número de registros recuperados for menor que o parâmetro limit. Isso é uma indicação de que não há mais registros a serem recuperados neste momento.

3. Atualize os parâmetros de paginação utilizando um componente Object Store

Use o mesmo nome e ID dos Object Stores usados anteriormente, ative as opções Unique Index e Upsert e configure o parâmetro Document como:

Este código atribui valor FINISHED para o parâmetro step. Isso significa que na próxima execução desse pipeline, o código seguirá o caminho FINISHED.

Note que o parâmetro start é configurado para o valor 0 porque essa é uma migração total do banco de dados. Para que a migração continuasse de onde parou no dia anterior, deveríamos alterar o valor de start para ser igual ao valor de end nessa iteração.

4. Recupere a propriedade control usando um componente Session Management.

5. Gere um output usando um componente JSON Generator.

Configure o parâmetro JSON como:

Agora, retorne ao componente Choice que verifica se o processo de migração acabou ou não. Se o processo não acabou, o fluxo atualiza os parâmetros de paginação e envia uma mensagem informando que o processo continuará.

Para construir esse fluxo, siga esses passos:

1. Adicione um componente Log

2. Configure a condição Choice desse caminho como otherwise

3. Atualize os parâmetros de paginação usando um componente Object Store

Use o mesmo nome e ID dos Object Stores usados anteriormente, ative as opções Unique Index e Upsert e configure o parâmetro Document como:

4. Recupere os parâmetros de paginação usando um componente Object Store.

Use o mesmo nome e ID dos Object Stores usados anteriormente e defina a operação como Find by Object Store ID.

5. Crie um output usando um componente JSON Generator

Configure o parâmetro JSON desse componente como:

Isto foi útil?