Pagination tutorial - part 3

Digibee provides a complete tutorial on how to set up pagination on the Digibee iPaaS. This is the third part of the tutorial.
The EXTRACTING_DATA path is where data is retrieved from its source and sent in batches for processing.
In this path, the flow sends each record to a data processing pipeline, creates an execution summary and updates the pagination parameters according to the current stage of the migration process.
If you want to know more about data processing pipelines, read our article on event-driven architecture.
The EXTRACTING_DATA path
1. Add a Choice component
2. Set the Choice condition to $.control[?(@.step == 'EXTRACTING_DATA')]
By applying this condition, the integration flow follows this path if the step parameter is set to the value EXTRACTING_DATA.
3. Add a component that iterates through your data source, such as a For Each or Stream DB V3
In our example, we use a Stream DB V3. Set the account, database URL and column name according to the data source and use the following SQL statement:
select * from enb_person limit {{ message.control.start }}, {{ message.control.limit }}
This statement retrieves at most as many records as the limit parameter specifies, starting at the record whose zero-based index is equal to the start parameter.
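To make the start and limit semantics concrete, here is a minimal Python sketch that runs the same kind of statement against an in-memory SQLite database. It is only an illustration: the tutorial's data source and Stream DB V3 configuration are not reproduced, the helper names fetch_page and make_demo_db are hypothetical, and an ORDER BY clause is added as an assumption so that pages are stable between calls.

```python
import sqlite3


def make_demo_db(n_rows):
    """Create an in-memory table named like the tutorial's, with ids 1..n_rows."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE enb_person (id INTEGER PRIMARY KEY)")
    conn.executemany(
        "INSERT INTO enb_person (id) VALUES (?)",
        [(i,) for i in range(1, n_rows + 1)],
    )
    return conn


def fetch_page(conn, start, limit):
    """Return up to `limit` ids, skipping the first `start` rows."""
    # "LIMIT offset, count" is the MySQL-style form; SQLite accepts it as well.
    rows = conn.execute(
        "SELECT id FROM enb_person ORDER BY id LIMIT ?, ?",
        (start, limit),
    ).fetchall()
    return [row[0] for row in rows]


conn = make_demo_db(7)
first_page = fetch_page(conn, 0, 3)  # a full page
last_page = fetch_page(conn, 6, 3)   # a short page: fewer rows than the limit
```

A short page, one with fewer rows than the limit, is the signal that later steps use to detect that no more records are available.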
In the onProcess subpipeline of this component, add an Event Publisher component, set the event name to the event your data processing pipeline subscribes to and set the Body property to {{ message.$ }} to send the entire payload.
Next, create a success message using a JSON Generator component. Set the JSON parameter of this component to:
{
"success": true
}
Stream DB V3 onProcess subpipeline
In the onException subpipeline of this component, add a Log component followed by an Event Publisher. The Event Publisher publishes the error events that your error treatment pipeline subscribes to. If you want to learn more about error treatment pipelines, read our documentation on event-driven architecture.
Finally, add a Throw Error component.
Stream DB V3 onException subpipeline
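The division of work between the two subpipelines can be sketched in Python as follows. This is not how the platform implements it: process_records, the publish callback, the event names and the shape of the returned summary are all hypothetical stand-ins, and the sketch assumes that re-raising the error stops the iteration, which on the platform may depend on component settings not covered here.

```python
import logging

logger = logging.getLogger("extracting_data")


def process_records(records, publish):
    """Publish each record; on failure, log it, publish an error event and re-raise."""
    total = 0
    for record in records:
        try:
            # onProcess: hand the whole record to the data processing pipeline.
            publish("data-processing", record)  # hypothetical event name
            total += 1
        except Exception as exc:
            # onException: Log, then Event Publisher, then Throw Error.
            logger.error("record failed: %r", record)
            publish("error-treatment", {"record": record, "error": str(exc)})
            raise
    # Assumed summary shape; only a total count is relied on later.
    return {"total": total}
```

Passing publish in as a parameter keeps the sketch independent of any particular messaging mechanism.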
Returning to the EXTRACTING_DATA path:
4. Create an execution summary using a JSON Generator
The Stream DB or For Each component we used in step 3 outputs an execution summary. Save it as a JSON property called summary by setting the JSON parameter to:
{
"summary": {{ message.$ }}
}
5. Save the summary property using a Session Management component
6. Retrieve the control property using a Session Management component
7. Check if the migration process is over using a Choice component
When the process finishes, the flow updates the pagination parameters and returns a JSON output announcing that the migration has ended. To build this flow, follow these steps:
1. Add a Log component
2. Set the Choice condition of this path to $.[?(@.summary.total < @.control.limit)]
By using this condition, the flow follows this path if the number of records retrieved is lower than the limit parameter. This is an indication that there are no more data records to be retrieved at that time.
3. Update pagination parameters using an Object Store component
Use the same object store name and ID as the Object Store components used before, activate the Unique index and Upsert options and set the Document parameter to:
{
$set:{
"start": 0,
"end": null,
"step": "FINISHED",
"nextExecutionTimestamp": {{ FORMATDATE(FORMATDATE(SUMDATE( NOW() , "DAY", 1), "timestamp", "dd/MM/yyyy 01:00:00"), "dd/MM/yyyy HH:mm:ss", "timestamp") }}
}
}
This code assigns the value FINISHED to the step parameter. That means that in the next execution of this pipeline, the flow will follow the FINISHED path.
Note that the start parameter is set to zero because this is a total migration of a database. If we wanted the migration to continue where it stopped the previous day, we would set the start parameter to the end value of this iteration.
4. Retrieve the control property using a Session Management component
5. Create an output using a JSON Generator component
Set the JSON parameter to:
{
"message": {{ CONCAT("Migration will begin on ", FORMATDATE( message.control.nextExecutionTimestamp, "timestamp", "dd/MM/yyyy HH:mm:ss")) }}
}
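The end-of-migration check and the parameter reset in the steps above can be sketched in Python as follows. The function names are hypothetical, and the sketch assumes that timestamps are epoch milliseconds and that times are in UTC, neither of which is stated in this tutorial.

```python
from datetime import datetime, timedelta, timezone


def is_finished(summary, control):
    """Mirror of the Choice condition: a short page means no more records."""
    return summary["total"] < control["limit"]


def next_execution_timestamp(now):
    """Epoch milliseconds for 01:00:00 on the day after `now` (assumed unit)."""
    tomorrow = now + timedelta(days=1)
    run_at = tomorrow.replace(hour=1, minute=0, second=0, microsecond=0)
    return int(run_at.timestamp() * 1000)


def finished_update(now):
    """Values written by the $set document in the finished branch."""
    return {
        "start": 0,
        "end": None,
        "step": "FINISHED",
        "nextExecutionTimestamp": next_execution_timestamp(now),
    }


now = datetime(2024, 1, 31, 15, 30, tzinfo=timezone.utc)  # example instant
update = finished_update(now)
```

One consequence of the condition: if the last page happens to contain exactly limit records, the flow does not finish on that execution. The following execution retrieves zero records, and zero is lower than the limit, so the migration ends then.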
Now, return to the Choice component that checks whether the migration process is over. If the process is not over, the flow updates the pagination parameters and returns a message stating that the migration is still in progress.
To build this flow, follow these steps:
1. Add a Log component
2. Set the Choice condition of this path to otherwise
3. Update pagination parameters using an Object Store component
Use the same object store name and ID as the Object Store components used before, activate the Unique index and Upsert options and set the Document parameter to:
{
$set:{
"start": {{ message.control.end }},
"end": {{ TOINT(SUM(message.control.end, message.control.limit)) }},
"step": "EXTRACTING_DATA"
}
}
4. Get pagination parameters using an Object Store component
Use the same object store name and ID as the OS components used before and set the operation to Find by Object Store ID.
5. Set an output using a JSON Generator component
Set the JSON parameter of this component to:
{
"message": "The data is being migrated",
"next_execution": {{ message.data[0] }}
}
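To see how the two branches interact across executions, the following Python sketch simulates the pagination window over a data source of known size. It is a simplified model, not the platform's behavior: advance and run_migration are hypothetical names, and the initial control values (start at 0, end equal to limit) are an assumption, since they are set in an earlier part of the tutorial.

```python
def advance(control):
    """Slide the window forward by one page, as in the otherwise branch."""
    return {
        **control,
        "start": control["end"],
        "end": control["end"] + control["limit"],
        "step": "EXTRACTING_DATA",
    }


def run_migration(n_records, limit):
    """Return (start, records_read) for each simulated execution."""
    if limit <= 0:
        raise ValueError("limit must be positive, or the window never advances")
    # Assumed initial state of the control document.
    control = {"start": 0, "end": limit, "limit": limit, "step": "EXTRACTING_DATA"}
    pages = []
    while control["step"] == "EXTRACTING_DATA":
        # Number of records a LIMIT start, limit query would return.
        total = max(0, min(limit, n_records - control["start"]))
        pages.append((control["start"], total))
        if total < limit:
            # Finished branch: reset the window and stop.
            control = {**control, "start": 0, "end": None, "step": "FINISHED"}
        else:
            control = advance(control)
    return pages


pages = run_migration(7, 3)
```

With 7 records and a limit of 3, the simulated executions read 3, 3 and 1 records, and the short last page ends the migration.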