Tutorial de paginação - parte 3
O caminho EXTRACTING_DATA é onde os dados são recuperados de sua fonte e enviados em blocos para serem processados.
Nesse caminho, o fluxo envia cada registro para um pipeline de processamento de dados, cria um resumo da execução e atualiza os parâmetros da paginação a depender de em qual parte da migração o processo se encontra.
Se você quiser aprender mais sobre pipelines de processamento de dados, leia nossa documentação sobre arquitetura orientada a eventos.
Para construir o caminho EXTRACTING_DATA, siga esses passos:
1. Adicione um componente Log.
2. Configure a condição Choice desse caminho como $.control[?(@.step == 'EXTRACTING_DATA')]
.
Aplicando essa condição, o fluxo de integração segue esse caminho se o parâmetro step assumir valor EXTRACTING_DATA
.
3. Adicione um componente que realiza iterações através da sua fonte de dados, como um componente For Each ou Stream DB V3
No nosso exemplo, utilizamos o Stream DB V3. Configure a conta, URL do banco de dados e nome da coluna de acordo com a fonte de dados e use o seguinte comando SQL:
Esse comando recupera um número de registros igual ao parâmetro limit, começando pelo registro cujo índice é igual ao parâmetro start.
No subpipeline onProcess desse componente, adicione um componente Event Publisher, configure o nome do evento para o evento ao qual o seu pipeline de processamento de dados é inscrito e configure a propriedade Body para {{ message.$ }}
para enviar todo o payload.
Depois, gere uma mensagem de sucesso usando um componente JSON Generator. Configure o parâmetro JSON desse componente para:
No subpipeline onException desse componente, adicione um componente Log seguido de um Event Publisher. Este componente publica eventos de erro aos quais o seu pipeline de tratamento de erros está inscrito. Se você quiser aprender mais sobre pipelines de tratamento de erro, leia nossa documentação sobre arquitetura orientada a eventos.
Finalmente, adicione um componente Throw Error.
De volta ao caminho EXTRACTING_DATA,
4. Crie um resumo da execução usando um JSON Generator.
O componente Stream DB ou For Each usado no passo 3 envia como output um resumo da execução. Salve este resumo em uma propriedade JSON chamada summary configurando o parâmetro JSON desse componente como:
5. Salve a propriedade summary
usando um componente Session Management
6. Recupere a propriedade control
usando um componente Session Management
7. Verifique se o processo de migração acabou usando um componente Choice
Quando o processo de migração acaba, os parâmetros de paginação são atualizados e um output informando o fim do processo é enviado no formato JSON. Para construir esse fluxo, siga esses passos:
1. Adicione um componente Log.
2. Configure a condição Choice desse caminho como $.[?(@.summary.total < @.control.limit)]
.
Usando essa condição, o fluxo segue esse caminho se o número de registros recuperados for menor que o parâmetro limit. Isso é uma indicação de que não há mais registros a serem recuperados neste momento.
3. Atualize os parâmetros de paginação utilizando um componente Object Store
Use o mesmo nome e ID dos Object Stores usados anteriormente, ative as opções Unique Index e Upsert e configure o parâmetro Document como:
Este código atribui valor FINISHED
para o parâmetro step. Isso significa que na próxima execução desse pipeline, o código seguirá o caminho FINISHED.
Note que o parâmetro start
é configurado para o valor 0
porque essa é uma migração total do banco de dados. Para que a migração continuasse de onde parou no dia anterior, deveríamos alterar o valor de start para ser igual ao valor de end
nessa iteração.
4. Recupere a propriedade control
usando um componente Session Management.
5. Gere um output usando um componente JSON Generator.
Configure o parâmetro JSON como:
Agora, retorne ao componente Choice que verifica se o processo de migração acabou ou não. Se o processo não acabou, o fluxo atualiza os parâmetros de paginação e envia uma mensagem informando que o processo continuará.
Para construir esse fluxo, siga esses passos:
1. Adicione um componente Log
2. Configure a condição Choice desse caminho como otherwise
3. Atualize os parâmetros de paginação usando um componente Object Store
Use o mesmo nome e ID dos Object Stores usados anteriormente, ative as opções Unique Index e Upsert e configure o parâmetro Document como:
4. Recupere os parâmetros de paginação usando um componente Object Store.
Use o mesmo nome e ID dos Object Stores usados anteriormente e defina a operação como Find by Object Store ID.
5. Crie um output usando um componente JSON Generator
Configure o parâmetro JSON desse componente como:
Atualizado