Highly scalable ETL model for Digibee

Learn how to design a scalable ETL process using event-based pipelines.

Digibee is an iPaaS: a cloud-based platform that connects applications across different systems and organizations in real time. Even so, ETL (Extract, Transform, and Load) processes are often still required, and they can be implemented on the platform with high efficiency.

This document describes an ETL model used for the extraction, transformation (field mapping), and delivery of large volumes of records from a relational database to a target system using the Digibee Integration Platform.

The objective is to extract the data within a predefined time window by scaling events according to demand.

Scenario

Consider a scenario where a high volume of data must be extracted from a relational database containing different tables, each representing a distinct business flow — in other words, several integrations. Altogether, these tables can represent about 20 million records per day.

Each table requires field mapping, and the data must be sent in batches to the target, which can be either an API or a file server.

Also consider that there is no restricted time window for data delivery at the destination. However, since extraction takes place from a critical-use database, it must be performed during off-peak hours (overnight). Therefore, it is important that the architecture is highly scalable and allows for parameterization by flow.

Architecture

The implementation of this model follows an event-based architecture, consisting of:

  • Two initial pipelines for the Extract phase

  • Two pipelines that together handle the Transform and Load phases

The architecture outlined in this article assumes a simple transformation, such as field mapping from source to target with a few basic rules.

However, if the scenario requires complex data transformations that depend on extensive internal processing, it may be necessary to evolve the architecture by introducing a dedicated event pipeline solely for transformation before the data is sent, as shown in the model below:

With the Digibee Integration Platform, evolving this model in a decoupled way is simple, offering great scalability.

Key components

Extract

Pagination pipeline for extraction

A generic Scheduler-type pipeline initiates the data extraction process. This pipeline acts as a trigger for integrations. Its sole responsibility is to generate page metadata for each flow and publish the extraction event.

The page metadata is a JSON structure that specifies:

  • The integration name

  • Page ID

  • Start record

  • End record

Based on a list of configured integrations and their respective parameters, it identifies:

  • Flow name

  • Source table

  • Number of records to extract per page/event

  • Extraction event to be published

  • Start time for extraction

  • List of dependencies (if any) on other integrations
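The page metadata described above can be sketched in Python as follows. This is a minimal illustration, not the platform's implementation: the `PageMetadata` class and the `generate_pages` function are hypothetical names, and the sketch rounds the page count up (ceiling division), whereas the flow spec later in this article adds one page to the integer division and removes any resulting empty page during extraction.

```python
import math
import uuid
from dataclasses import dataclass


@dataclass
class PageMetadata:
    integration: str  # integration (flow) name
    page_id: str      # unique page identifier
    page: int         # 1-based page number
    start: int        # offset of the first record in the page
    end: int          # offset just past the last record in the page


def generate_pages(integration: str, total_records: int, page_size: int) -> list[PageMetadata]:
    """Split total_records into consecutive pages of at most page_size records."""
    if page_size <= 0:
        raise ValueError("page_size must be positive")
    total_pages = math.ceil(total_records / page_size)
    pages = []
    for page in range(1, total_pages + 1):
        start = (page - 1) * page_size
        end = min(start + page_size, total_records)
        pages.append(PageMetadata(integration, str(uuid.uuid4()), page, start, end))
    return pages


if __name__ == "__main__":
    # Hypothetical volume: 4,500 records split into pages of 2,000.
    for p in generate_pages("Customer", total_records=4500, page_size=2000):
        print(p.integration, p.page, p.start, p.end)
```

Each generated item corresponds to one extraction event, so the number of pages is also the number of events published for the flow.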

Page extraction pipeline

A specific pipeline for each integration receives the page metadata when the event is published, performs the extraction, and stores the extracted page in a temporary, flow-specific Object Store.
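As an illustration of how one page can be read from the source, the sketch below builds a paginated query from the page metadata. It assumes a MySQL-style `LIMIT ... OFFSET` syntax and an ordering column named `id`; both are assumptions, and other databases such as Oracle build paginated queries differently. The function name `build_page_query` is hypothetical.

```python
def build_page_query(select: str, page: int, size: int, order_by: str = "id") -> str:
    """Return the SQL for one page, using a 1-based page number."""
    if page < 1 or size < 1:
        raise ValueError("page and size must be positive")
    # Page 1 starts at offset 0, page 2 at offset `size`, and so on.
    offset = (page - 1) * size
    return f"{select} ORDER BY {order_by} LIMIT {size} OFFSET {offset}"


if __name__ == "__main__":
    print(build_page_query("SELECT CUSTOMER_ID, NAME FROM CUSTOMER", page=3, size=2000,
                           order_by="CUSTOMER_ID"))
```

A stable `ORDER BY` matters here: without it, rows may move between pages from one query to the next.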

As an event-driven process, this pipeline can be scaled based on the flow’s needs. Scaling can be in terms of size (Small, Medium, or Large) to support the extracted page, or in terms of replicas to handle multiple pages simultaneously. For instance, increasing replicas may be a good strategy to reduce extraction time.

With the Consumption Based model, horizontal scaling (replicas) can be configured with Autoscaling. You only need to define the minimum and maximum number of replicas, and the Platform will adjust automatically.
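A rough way to reason about the maximum number of replicas is to divide the total extraction work by the available time window. The sketch below is only a back-of-the-envelope estimate: the 5 seconds per page and the 4-hour window are made-up figures, the function name is hypothetical, and it assumes each replica processes one page at a time.

```python
import math


def replicas_needed(total_records: int, page_size: int,
                    seconds_per_page: float, window_seconds: float) -> int:
    """Estimate how many replicas are needed to finish within the window."""
    total_pages = math.ceil(total_records / page_size)
    total_work_seconds = total_pages * seconds_per_page
    return max(1, math.ceil(total_work_seconds / window_seconds))


if __name__ == "__main__":
    # 20 million records per day (from the scenario), pages of 2,000 records,
    # and two assumed figures: 5 s per page and a 4-hour off-peak window.
    print(replicas_needed(20_000_000, 2000, seconds_per_page=5, window_seconds=4 * 3600))
```

With these assumed figures, 10,000 pages take 50,000 seconds of sequential work, so four replicas fit the work into a 14,400-second window. Real values depend on the source database and should be measured.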

Transform and Load

Page dispatcher pipeline

This Scheduler-type pipeline checks whether each flow is ready to be sent and publishes the page IDs of ready flows to the delivery pipeline.

While extractions are ongoing, it monitors the pages already extracted for each integration, checks their dependencies, and publishes each page ID to the final (delivery) pipeline, which collects the data and sends it to the target.
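The dependency check performed by the dispatcher can be sketched as below. The sketch assumes that a flow is ready once every integration in its `dependencyList` is marked as finished; how "finished" is tracked (for example, in an integration control store) is an assumption, and `flows_ready_to_send` is a hypothetical name.

```python
# Sample configuration mirroring the structure of the list of integrations.
INTEGRATIONS = [
    {"nameIntegration": "Customer", "dependencyList": []},
    {"nameIntegration": "Product", "dependencyList": []},
    {"nameIntegration": "Order", "dependencyList": ["Customer", "Product"]},
]


def flows_ready_to_send(integrations: list[dict], finished: set[str]) -> list[str]:
    """Return the unfinished flows whose dependencies have all finished."""
    ready = []
    for item in integrations:
        name = item["nameIntegration"]
        if name in finished:
            continue  # already handled, nothing to dispatch
        if all(dep in finished for dep in item.get("dependencyList", [])):
            ready.append(name)
    return ready


if __name__ == "__main__":
    print(flows_ready_to_send(INTEGRATIONS, finished={"Customer"}))
```

In this sample, Order is only dispatched after both Customer and Product have finished.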

Page delivery pipeline

This Event-type pipeline is responsible for retrieving each page from the temporary Object Store, performing the transformation, and delivering it to the target. Like the extraction pipeline, this delivery pipeline should be specific to each integration.
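A simple transformation of the kind this model assumes, followed by batching for delivery, might look like the sketch below. The target field names in `FIELD_MAP` and the helper names are hypothetical; the actual mapping depends on the target system.

```python
from typing import Any, Iterable, Iterator

# Hypothetical source-to-target field mapping for the Customer flow.
FIELD_MAP = {"CUSTOMER_ID": "customerId", "NAME": "name", "CNPJ": "taxId"}


def map_record(record: dict[str, Any], field_map: dict[str, str]) -> dict[str, Any]:
    """Rename mapped source fields to target fields; unmapped fields are dropped."""
    return {target: record.get(source) for source, target in field_map.items()}


def batches(records: Iterable[Any], size: int) -> Iterator[list[Any]]:
    """Yield consecutive lists of at most `size` records."""
    if size < 1:
        raise ValueError("size must be positive")
    batch: list[Any] = []
    for record in records:
        batch.append(record)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        yield batch  # final, possibly smaller, batch


if __name__ == "__main__":
    page = [{"CUSTOMER_ID": i, "NAME": f"Customer {i}", "CNPJ": "000"} for i in range(5)]
    mapped = [map_record(r, FIELD_MAP) for r in page]
    for batch in batches(mapped, size=2):
        print(len(batch), batch[0]["customerId"])
```

Each batch would then be sent to the target API or written to the file server.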

Other important components

In addition to the pipeline structure mentioned above, the following components are important for controlling and operating the ETL model:

  • List of integrations (Flows): Can be a Global variable configured with a JSON or a table in the client's database. For this article, we used a Global as shown in the example below:

{
    "integrations": [
      {
        "pageSize": "2000",
        "nameIntegration": "Customer",
        "query": "SELECT CUSTOMER_ID, NAME ,CNPJ  FROM CUSTOMER WHERE CNPJ IS NOT NULL",
        "eventExtract": "evt-extract-customer",
        "eventSend": "evt-load-customer",
        "tableExtract": "CUSTOMER",
        "dependencyList": [
          
        ]
      },
      {
        "pageSize": "2000",
        "nameIntegration": "Product",
        "query": "SELECT PRODUCT_ID, NAME, GROUP  FROM PRODUCT WHERE GROUP IN(10,30,40)",
        "eventExtract": "evt-extract-product",
        "eventSend": "evt-load-product",
        "tableExtract": "PRODUCT",
        "dependencyList": [
          
        ]
      },
      {
        "pageSize": "5000",
        "nameIntegration": "Order",
        "query": "SELECT ORDER_ID,  CUSTOMER_ID, ORDER_VALUE, ORDER_DATE FROM ORDER WHERE ORDER_DATE = SYSDATE",
        "eventExtract": "evt-extract-order",
        "eventSend": "evt-load-order",
        "tableExtract": "ORDER",
        "dependencyList": [
          "Customer",
          "Product"
        ]
      }
    ]
  }

To register this JSON as a Global, escape each double quote with a backslash (\), as below:

{\"integrations\":[{\"pageSize\":\"2000\",\"nameIntegration\":\"Customer\",\"query\":\"SELECT CUSTOMER_ID, NAME ,CNPJ  FROM CUSTOMER WHERE CNPJ IS NOT NULL\",\"eventExtract\":\"evt-extract-customer\",\"eventSend\":\"evt-load-customer\",\"tableExtract\":\"CUSTOMER\",\"dependencyList\":[]},{\"pageSize\":\"2000\",\"nameIntegration\":\"Product\",\"query\":\"SELECT PRODUCT_ID, NAME, GROUP  FROM PRODUCT WHERE GROUP IN(10,30,40)\",\"eventExtract\":\"evt-extract-product\",\"eventSend\":\"evt-load-product\",\"tableExtract\":\"PRODUCT\",\"dependencyList\":[]},{\"pageSize\":\"5000\",\"nameIntegration\":\"Order\",\"query\":\"SELECT ORDER_ID,  CUSTOMER_ID, ORDER_VALUE, ORDER_DATE FROM ORDER WHERE ORDER_DATE = SYSDATE\",\"eventExtract\":\"evt-extract-order\",\"eventSend\":\"evt-load-order\",\"tableExtract\":\"ORDER\",\"dependencyList\":[\"Customer\",\"Product\"]}]}

  • Extraction page control: An Object Store is required to store each “page metadata” for extraction.

  • Temporary Object Store: A dedicated Object Store per flow where extracted pages are stored before transformation and delivery.

  • Common mechanisms: Existing reusable mechanisms such as reprocessing or notifications can support the ETL process.

  • Integration control: Although not shown in the diagram, it is highly recommended to maintain an Object Store for integration control to track the daily status of each integration in the List of integrations.
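The escaped string used to register the list of integrations as a Global does not have to be written by hand. The sketch below produces it with Python's standard `json` module; the function name `escape_for_global` is hypothetical, and the sample configuration is shortened for readability.

```python
import json


def escape_for_global(config: dict) -> str:
    """Serialize config as compact JSON and backslash-escape its double quotes."""
    compact = json.dumps(config, separators=(",", ":"))
    # Serializing the JSON text as a JSON string escapes the inner quotes;
    # the slice removes the outer quotes added by that second serialization.
    return json.dumps(compact)[1:-1]


if __name__ == "__main__":
    sample = {
        "integrations": [
            {
                "pageSize": "2000",
                "nameIntegration": "Customer",
                "dependencyList": [],
            }
        ]
    }
    print(escape_for_global(sample))
```

Wrapping the result in double quotes and parsing it with `json.loads` returns the original compact JSON text, which is a quick way to check the escaping.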

Pipeline flow specs

Flow Spec: sch-generate-pages
{
  "meta": {
    "36bcada5-b65e-450f-92b3-b2486d0a1520": {
      "position": {
        "x": 200,
        "y": 0
      }
    },
    "15ae8ee4-49ad-4b46-87f3-946fd38dcf6b": {
      "position": {
        "x": 400,
        "y": 0
      }
    },
    "d4e248bf-8348-449f-9717-cf25c6f773e6": {
      "position": {
        "x": 600,
        "y": 0
      }
    },
    "e146bdb0-0773-49b4-a53a-022c7ce3d5f7": {
      "position": {
        "x": 800,
        "y": 0
      }
    },
    "85d997d9-66e4-48c7-8500-885b0e32b4c7": {
      "position": {
        "x": 0,
        "y": 150
      }
    }
  },
  "flowSpec": {
    "disconnected-root:71c6e6e6-d1b5-4ce4-8eb5-f14158f3ab45": [
      {
        "type": "connector",
        "name": "object-store-connector",
        "accountLabel": "dgb-internal-object-store-account",
        "stepName": "delete pages-for-extraction 3 days +",
        "params": {
          "operation": "FIND_BY_QUERY",
          "objectStore": "pages-for-extraction",
          "query": "{\r\n    \"date\": { $lt: {{ FORMATDATE(SUMDATE(NOW(), \"DAY\",-3),\"timestamp\",\"dd-MM-yyyy\") }}}\r\n}",
          "limit": "0",
          "skip": "0",
          "sort": "",
          "unique": true,
          "isolated": false,
          "failOnError": false
        },
        "id": "36bcada5-b65e-450f-92b3-b2486d0a1520"
      },
      {
        "name": "json-generator-connector",
        "type": "connector",
        "stepName": "get global integration",
        "params": {
          "json": "{\n    \"integrations\": [\n      {\n        \"pageSize\": \"2000\",\n        \"nameIntegration\": \"Customer\",\n        \"query\": \"SELECT id, first_name, last_name, gender, ip_address, email  FROM pagination_course WHERE first_name IS NOT NULL\",\n        \"eventExtract\": \"evt-extract-integration\",\n        \"eventSend\": \"evt-load-integration\",\n        \"tableExtract\": \"pagination_course\",\n        \"databaseExtract\": \"digibee_database\",\n        \"dependencyList\": [\n          \n        ]\n      },\n      {\n        \"pageSize\": \"2000\",\n        \"nameIntegration\": \"Product\",\n        \"query\": \"SELECT PRODUCT_ID, NAME, GROUP  FROM PRODUCT WHERE GROUP IN(10,30,40)\",\n        \"eventExtract\": \"evt-extract-product\",\n        \"eventSend\": \"evt-load-product\",\n        \"tableExtract\": \"PRODUCT\",\n        \"databaseExtract\": \"digibee_database\",\n        \"dependencyList\": [\n          \n        ]\n      },\n      {\n        \"pageSize\": \"5000\",\n        \"nameIntegration\": \"Order\",\n        \"query\": \"SELECT ORDER_ID,  CUSTOMER_ID, ORDER_VALUE, ORDER_DATE FROM ORDER WHERE ORDER_DATE = SYSDATE\",\n        \"eventExtract\": \"evt-extract-order\",\n        \"eventSend\": \"evt-load-order\",\n        \"tableExtract\": \"ORDER\",\n        \"databaseExtract\": \"digibee_database\",\n        \"dependencyList\": [\n          \"Customer\",\n          \"Product\"\n        ]\n      }\n    ]\n  }\n",
          "failOnError": false
        },
        "id": "15ae8ee4-49ad-4b46-87f3-946fd38dcf6b"
      },
      {
        "type": "connector",
        "name": "for-each-connector",
        "stepName": "For Each integration-list original",
        "params": {
          "jsonPath": "$.integrations[0]",
          "itemIdentifier": "",
          "parallel": false,
          "failOnError": false,
          "onProcess": "d4e248bf-8348-449f-9717-cf25c6f773e6-onProcessTrack",
          "onException": "d4e248bf-8348-449f-9717-cf25c6f773e6-onExceptionTrack"
        },
        "id": "d4e248bf-8348-449f-9717-cf25c6f773e6"
      },
      {
        "type": "session-management",
        "stepName": "get  resultList",
        "operation": "GET",
        "sessionType": "LOCAL",
        "scoped": false,
        "fields": [
          "resultList"
        ],
        "id": "e146bdb0-0773-49b4-a53a-022c7ce3d5f7"
      }
    ],
    "d4e248bf-8348-449f-9717-cf25c6f773e6-onProcessTrack": [
      {
        "name": "json-generator-connector",
        "type": "connector",
        "stepName": "generate temp object",
        "params": {
          "json": "{\n    \"temp\": {{ message.$ }}\n}",
          "failOnError": false
        },
        "id": "c9a5fd1c-ae74-46e5-99f9-a5db5a2f5fe9"
      },
      {
        "type": "session-management",
        "stepName": "put temp",
        "operation": "PUT",
        "sessionType": "LOCAL",
        "scoped": true,
        "fields": [
          "temp"
        ],
        "id": "98537954-2363-4631-b6c5-dd4b72cba7d6"
      },
      {
        "type": "connector",
        "name": "db-connector-v2",
        "accountLabel": "mysql-2",
        "stepName": "Count Pagination",
        "params": {
          "url": "jdbc:mysql://35.223.175.97/db-training",
          "sql": "SELECT COUNT(*) AS TOTAL_ROWS  FROM {{ message.temp.tableExtract }}",
          "failOnError": false,
          "keepConnection": false,
          "useDynamicAccount": false,
          "accountType": "basic",
          "operation": "QUERY",
          "batch": false,
          "blobAsFile": false,
          "clobAsFile": false,
          "typeProperties": "[]",
          "connectionProperties": "{\n    \"cancelQueryTimeout\": 5,\n    \"queryTimeout\": 15,\n    \"lockTimeout\": 15000,\n    \"socketTimeout\": 15000\n}",
          "dbPoolByActualConsumers": false,
          "exclusiveDbPool": false,
          "customDbPool": false,
          "columnFromLabel": false,
          "connectionTestQuery": "",
          "rawSql": true,
          "dbPoolMaxLifeTime": "1800000",
          "dbPoolMinimumIdle": "10",
          "dbPoolIdleTimeout": "600000"
        },
        "id": "e8610eab-dcc5-4211-8703-31efe0e8620d",
        "dynamicAccountNames": {},
        "accountLabels": {}
      },
      {
        "type": "connector",
        "name": "log-connector",
        "stepName": "Log - Count pagination",
        "params": {
          "logLevel": "INFO",
          "message": "Count pagination {{ message.$ }}"
        },
        "id": "7a1b45ff-914b-4c71-b506-a47af8302fd0"
      },
      {
        "type": "session-management",
        "stepName": "get temp",
        "operation": "GET",
        "sessionType": "LOCAL",
        "scoped": true,
        "fields": [
          "temp"
        ],
        "id": "3aad7d94-daa2-401a-a7f7-846300fdc55d"
      },
      {
        "name": "json-generator-connector",
        "type": "connector",
        "stepName": "calc pages",
        "params": {
          "json": "{\n    \"totalPages\": {{ TOINT(SUM(TOINT(DIVIDE(TOINT(message.data[0].TOTAL_ROWS),TOINT(message.temp.pageSize))),1)) }},\n    \"totalRecords\": {{ message.data[0].TOTAL_ROWS }}\n}",
          "failOnError": false
        },
        "id": "1ef5cb1c-fc0e-4f15-a6ff-9340dbd2776a"
      },
      {
        "type": "session-management",
        "stepName": "put totalPages/totalRecords",
        "operation": "PUT",
        "sessionType": "LOCAL",
        "scoped": true,
        "fields": [
          "totalPages",
          "totalRecords"
        ],
        "id": "dee7d7c7-e87e-4e10-a4e5-c187772ac320"
      },
      {
        "type": "connector",
        "name": "do-while-connector",
        "stepName": "Send pagination",
        "params": {
          "iteration": 999999,
          "timeout": 660000,
          "showLoopIndex": true,
          "interruptLoopOnError": true,
          "onProcess": "ac3b3612-5c7f-4fde-a20f-6d0845849e69-onProcessTrack",
          "onException": "ac3b3612-5c7f-4fde-a20f-6d0845849e69-onExceptionTrack"
        },
        "id": "ac3b3612-5c7f-4fde-a20f-6d0845849e69"
      },
      {
        "type": "session-management",
        "stepName": "delete All",
        "operation": "DELETE",
        "sessionType": "LOCAL",
        "scoped": true,
        "fields": [
          "totalPages",
          "totalRecords"
        ],
        "id": "014dc2f2-041a-478f-9b65-071efce24ae0"
      },
      {
        "type": "session-management",
        "stepName": "get temp",
        "operation": "GET",
        "sessionType": "LOCAL",
        "scoped": true,
        "fields": [
          "temp"
        ],
        "id": "5e65c5e4-e9f5-47b2-984f-990118878e7f"
      },
      {
        "type": "connector",
        "name": "object-store-connector",
        "accountLabel": "dgb-internal-object-store-account",
        "stepName": "Insert integration-control",
        "params": {
          "operation": "INSERT",
          "objectStore": "integration-control",
          "objectId": "{{ UUID() }}",
          "document": "{\n    \"integrationName\": {{ message.temp.nameIntegration }},\n    \"manualMode\": \"disable\",\n    \"integrationDate\": {{ FORMATDATE(NOW(), \"timestamp\", \"dd/MM/yyyy\") }},\n    \"step\": \"pagination\",\n    \"status\": \"done\"\n}",
          "unique": true,
          "isolated": false,
          "upsert": false,
          "failOnError": false
        },
        "id": "c9798498-349d-41fd-9137-4616c023002e"
      },
      {
        "type": "connector",
        "name": "log-connector",
        "stepName": "Log - Insert integration-control",
        "params": {
          "logLevel": "INFO",
          "message": "Insert integration-control {{ message.$ }}"
        },
        "id": "45651031-a34a-4ce8-9be6-9d7a9c1117b9"
      },
      {
        "type": "session-management",
        "stepName": "delete temp",
        "operation": "DELETE",
        "sessionType": "LOCAL",
        "scoped": true,
        "fields": [
          "temp"
        ],
        "id": "97a6f0fe-0e64-4d2a-b4ac-4159f6687397"
      },
      {
        "name": "json-generator-connector",
        "type": "connector",
        "stepName": "set success",
        "params": {
          "json": "{\n    \"success\": true\n}",
          "failOnError": false
        },
        "id": "68a9c63f-baa9-42c8-a9d4-ee6d07a441eb"
      }
    ],
    "ac3b3612-5c7f-4fde-a20f-6d0845849e69-onProcessTrack": [
      {
        "type": "session-management",
        "stepName": "Get All",
        "operation": "GET",
        "sessionType": "LOCAL",
        "scoped": true,
        "fields": [
          "end",
          "start",
          "totalPages",
          "totalRecords",
          "temp"
        ],
        "id": "9f15ccb7-b9bd-4f76-919b-aa152eb6f718"
      },
      {
        "name": "json-generator-connector",
        "type": "connector",
        "stepName": "generate loopIndex",
        "params": {
          "json": "\n{\n    \"loopIndex\": {{ message.loopIndex }},\n    \"totalPages\": {{ message.totalPages }},\n    \"start\": {{ TOINT(IF(EQUALTO(message.loopIndex,1),0,message.end)) }},\n    \"end\": {{ TOINT(IF(EQUALTO(message.loopIndex,1),TOINT(message.temp.pageSize),SUM(message.end,TOINT(message.temp.pageSize)))) }},\n    \"id\": {{ UUID() }}\n}",
          "failOnError": false
        },
        "id": "79dc1543-94f2-48c9-af31-da5e8afdd6eb"
      },
      {
        "type": "connector",
        "name": "log-connector",
        "stepName": "Log - generate loopIndex",
        "params": {
          "logLevel": "INFO",
          "message": "generate loopIndex {{ message.$ }}"
        },
        "id": "741e5db0-f17a-4520-9bf5-3523e23b1b11"
      },
      {
        "type": "session-management",
        "stepName": "put end/start/loopIndex",
        "operation": "PUT",
        "sessionType": "LOCAL",
        "scoped": true,
        "fields": [
          "end",
          "start",
          "loopIndex",
          "id",
          "totalPages"
        ],
        "id": "21315db7-6125-4eb7-9026-5d8581c84765"
      },
      {
        "type": "session-management",
        "stepName": "get temp",
        "operation": "GET",
        "sessionType": "LOCAL",
        "scoped": true,
        "fields": [
          "temp",
          "loopIndex",
          "end",
          "start",
          "totalPages",
          "id"
        ],
        "id": "6831f1a0-1d46-4a8b-8ec0-d83b5d94a637"
      },
      {
        "type": "connector",
        "name": "object-store-connector",
        "accountLabel": "dgb-internal-object-store-account",
        "stepName": "pages-for-extraction",
        "params": {
          "operation": "INSERT",
          "objectStore": "pages-for-extraction",
          "objectId": "{{ message.id }}",
          "document": "{\n    \"page\": {{ message.loopIndex }},\n    \"size\": {{ message.temp.pageSize }},\n    \"query\": {{ message.temp.query }},\n    \"table\": {{ message.temp.tableExtract }},\n    \"databaseExtract\": {{ message.temp.databaseExtract }},\n    \"publisher\":{{ metadata.pipeline.name }},\n    \"integration\":{{ message.temp.nameIntegration }},\n    \"dependencyList\": {{ message.temp.dependencyList }},\n    \"start\": {{ message.start }},\n    \"end\": {{ message.end }},\n    \"event\": {{ message.temp.eventSend }},\n    \"finished\": {{ IF(EQUALTO(message.loopIndex,message.totalPages),true, false)  }} \n}",
          "unique": true,
          "isolated": false,
          "failOnError": false
        },
        "id": "9675ab2c-f739-4409-be57-9af0ee7b6b35"
      },
      {
        "type": "session-management",
        "stepName": "get temp",
        "operation": "GET",
        "sessionType": "LOCAL",
        "scoped": true,
        "fields": [
          "temp",
          "loopIndex",
          "end",
          "start",
          "totalPages",
          "id"
        ],
        "id": "d2895a35-5bd0-47ca-aaca-0cfe1066e2cf"
      },
      {
        "type": "connector",
        "stepName": "EVT",
        "name": "event-publisher-connector",
        "params": {
          "eventName": "{{ message.temp.eventExtract }}",
          "body": "{\n    \"page\": {{ message.loopIndex }},\n    \"size\": {{ message.temp.pageSize }},\n    \"select\": {{ message.temp.query }},\n    \"table\": {{ message.temp.tableExtract }},\n    \"publisher\":{{ metadata.pipeline.name }},\n    \"integration\":{{ message.temp.nameIntegration }},\n    \"start\": {{ message.start }},\n    \"end\": {{ message.end }},\n    \"_oId\": {{ message.id }},\n    \"finished\": {{ IF(EQUALTO(message.totalPages, message.loopIndex), true, false) }}\n}",
          "showSendEventLog": false,
          "stopOnError": false
        },
        "id": "c269cda9-bd7b-45ec-986d-b236cc7c12cc"
      },
      {
        "name": "json-generator-connector",
        "type": "connector",
        "stepName": "loopBreak",
        "params": {
          "json": "{\n    \"loopBreak\": {{ IF(EQUALTO(message.loopIndex,message.totalPages),true, false)  }} \n}",
          "failOnError": false
        },
        "id": "b81bcce6-d08e-4217-8800-92792b5292eb"
      },
      {
        "type": "session-management",
        "stepName": "delete id",
        "operation": "DELETE",
        "sessionType": "LOCAL",
        "scoped": true,
        "fields": [
          "id"
        ],
        "id": "d90853d7-fae1-4eec-9ff6-aa6ebfeed627"
      }
    ],
    "ac3b3612-5c7f-4fde-a20f-6d0845849e69-onExceptionTrack": [
      {
        "type": "connector",
        "name": "log-connector",
        "stepName": "Log - Send pagination",
        "params": {
          "logLevel": "INFO",
          "message": "Send pagination {{ message.$ }}"
        },
        "id": "8e12529f-63ad-44b2-a4c6-3557236e6a09"
      },
      {
        "type": "connector",
        "name": "throw-error-connector",
        "stepName": "Throw Error",
        "params": {
          "errorMessage": "Error occurred.",
          "errorCode": 500,
          "customErrorEnabled": false
        },
        "id": "8decd80e-a35e-4bd3-b5f1-c550a32737f2"
      }
    ],
    "d4e248bf-8348-449f-9717-cf25c6f773e6-onExceptionTrack": [
      {
        "type": "session-management",
        "stepName": "get temp",
        "operation": "GET",
        "sessionType": "LOCAL",
        "scoped": true,
        "fields": [
          "temp"
        ],
        "id": "34096419-f972-4e0d-910d-9a915cafd2b5"
      },
      {
        "type": "connector",
        "name": "log-connector",
        "stepName": "Log - For Each integration-list (onException)",
        "params": {
          "logLevel": "ERROR",
          "message": "For Each (onException) | Integration: {{ message.temp.nameIntegration }} | Error: {{ message.error }} |"
        },
        "id": "f6123d53-01fc-49ca-a7fb-53d0c8c5a8c6"
      },
      {
        "type": "capsule",
        "name": "capsule-v1-digibee-digibee-tools-add-item-to-session-array-1.0",
        "capsuleCollection": "digibee-tools",
        "capsuleCollectionVersion": 1,
        "capsule": "add-item-to-session-array-1.0",
        "capsuleVersionMajor": 1,
        "capsuleVersionMinor": 0,
        "stepName": "Add item to session (array)",
        "params": {
          "obj": "{{ message.$ }}"
        },
        "id": "82076e9b-9d8f-4aea-b665-27f922e758d5"
      }
    ],
    "disconnected-root:91352591-cf0c-44ba-b7b9-7d2611a41ef1": [
      {
        "name": "json-generator-connector",
        "type": "connector",
        "stepName": "README",
        "params": {
          "json": "The content saved in the Global must be a string containing the escapes:\n\"{\\\"integrations\\\":[{\\\"pageSize\\\":\\\"2000\\\",\\\"nameIntegration\\\":\\\"Customer\\\",\\\"query\\\":\\\"SELECT CUSTOMER_ID, NAME ,CNPJ  FROM CUSTOMER WHERE CNPJ IS NOT NULL\\\",\\\"eventExtract\\\":\\\"evt-extract-customer\\\",\\\"eventSend\\\":\\\"evt-load-customer\\\",\\\"tableExtract\\\":\\\"CUSTOMER\\\",\\\"dependencyList\\\":[]},{\\\"pageSize\\\":\\\"2000\\\",\\\"nameIntegration\\\":\\\"Product\\\",\\\"query\\\":\\\"SELECT PRODUCT_ID, NAME, GROUP  FROM PRODUCT WHERE GROUP IN(10,30,40)\\\",\\\"eventExtract\\\":\\\"evt-extract-product\\\",\\\"eventSend\\\":\\\"evt-load-product\\\",\\\"tableExtract\\\":\\\"PRODUCT\\\",\\\"dependencyList\\\":[]},{\\\"pageSize\\\":\\\"5000\\\",\\\"nameIntegration\\\":\\\"Order\\\",\\\"query\\\":\\\"SELECT ORDER_ID,  CUSTOMER_ID, ORDER_VALUE, ORDER_DATE FROM ORDER WHERE ORDER_DATE = SYSDATE\\\",\\\"eventExtract\\\":\\\"evt-extract-order\\\",\\\"eventSend\\\":\\\"evt-load-order\\\",\\\"tableExtract\\\":\\\"ORDER\\\",\\\"dependencyList\\\":[\\\"Customer\\\",\\\"Product\\\"]}]}\"\n",
          "failOnError": false
        },
        "id": "85d997d9-66e4-48c7-8500-885b0e32b4c7"
      }
    ]
  }
}


Flow Spec: evt-extract-integration
{
  "meta": {
    "9f698689-3164-42c9-b44a-68bbb10eb63e": {
      "position": {
        "x": 200,
        "y": 150
      }
    },
    "c4af166e-523e-46f6-baa7-b4a7712a1b88": {
      "position": {
        "x": 400,
        "y": 150
      }
    },
    "7134a571-6561-49ed-b9bc-88b91d85c984": {
      "position": {
        "x": 600,
        "y": 150
      }
    },
    "44833968-110a-44d6-915c-39ec0a120333": {
      "position": {
        "x": 800,
        "y": 150
      }
    },
    "57a450bb-deb1-4c69-842f-3a29d76e037f": {
      "position": {
        "x": 1250,
        "y": 0
      }
    },
    "72c74518-0672-4da3-9b49-3aee578449cd": {
      "position": {
        "x": 1450,
        "y": 0
      }
    },
    "b760ea98-4c81-4264-8858-b092debd60c8": {
      "position": {
        "x": 1650,
        "y": 0
      }
    },
    "a867448d-4b5b-4a0d-a0bc-1e783811f5be": {
      "position": {
        "x": 1250,
        "y": 150
      }
    },
    "8f34c2c4-4336-45ec-84db-2e20073b8562": {
      "position": {
        "x": 1450,
        "y": 150
      }
    },
    "25a396d5-a98f-4bd2-b0b9-801ce6ded152": {
      "position": {
        "x": 1650,
        "y": 150
      }
    },
    "744e184e-17d0-4592-8e31-219147e18ce2": {
      "position": {
        "x": 1850,
        "y": 150
      }
    },
    "579eddc4-942c-44cb-a22c-3b78f511b5fe": {
      "position": {
        "x": 1250,
        "y": 300
      }
    },
    "5f12d793-2268-44f1-a2a8-b439490975b5": {
      "position": {
        "x": 1450,
        "y": 300
      }
    },
    "bc33a9e7-d959-4e2b-b0d8-48e2c653de17": {
      "position": {
        "x": 1650,
        "y": 300
      }
    },
    "78ab5e22-8c14-458f-b037-db94fe32edf9": {
      "position": {
        "x": 1850,
        "y": 300
      }
    },
    "57268aa7-eb74-44da-bb76-6a37e0ec3252": {
      "position": {
        "x": 2300,
        "y": 225
      }
    },
    "3b7a8dc6-ac00-4119-83c5-00bc277110d3": {
      "position": {
        "x": 2500,
        "y": 225
      }
    },
    "c051bb47-5954-4a83-808f-ee02b11a4248": {
      "position": {
        "x": 2700,
        "y": 225
      }
    },
    "7cbfdc07-ba4e-45a1-bdaa-db2a006611db": {
      "position": {
        "x": 2900,
        "y": 225
      }
    },
    "40f0c2f1-213e-4dee-8a25-fc359f94aee2": {
      "position": {
        "x": 3100,
        "y": 225
      }
    },
    "5ac13705-b797-4289-abb4-39ca26c289e0": {
      "position": {
        "x": 3300,
        "y": 225
      }
    },
    "48e9ab7b-9855-45dd-9a51-26334f8caee7": {
      "position": {
        "x": 2300,
        "y": 375
      }
    },
    "816e7b04-ff1c-4494-8b1b-91a4e387cfd1": {
      "position": {
        "x": 2500,
        "y": 375
      }
    },
    "2831b857-7c4b-45d5-9c6a-95cc91bd59cd": {
      "position": {
        "x": 2700,
        "y": 375
      }
    }
  },
  "flowSpec": {
    "disconnected-root:312fed22-a1c9-4ca6-9203-9a29b766d18e": [
      {
        "name": "json-generator-connector",
        "type": "connector",
        "stepName": "payload",
        "params": {
          "json": "{\n    \"payload\": {{ DEFAULT( message.payload,message.$) }}\n}",
          "failOnError": false
        },
        "id": "9f698689-3164-42c9-b44a-68bbb10eb63e"
      },
      {
        "type": "session-management",
        "stepName": "put payload",
        "operation": "PUT",
        "sessionType": "LOCAL",
        "scoped": false,
        "fields": [
          "payload"
        ],
        "id": "c4af166e-523e-46f6-baa7-b4a7712a1b88"
      },
      {
        "type": "connector",
        "name": "db-connector-v2",
        "accountLabel": "mysql-2",
        "stepName": "Select",
        "params": {
          "url": "jdbc:mysql://35.223.175.97/db-training",
          "sql": "{{ CONCAT(message.payload.select, \" ORDER BY id LIMIT \", message.payload.size, \" OFFSET \", TOINT(MULTIPLY(SUBTRACT(message.payload.page, 1), TOINT(message.payload.size)))) }}",
          "failOnError": false,
          "keepConnection": false,
          "useDynamicAccount": false,
          "accountType": "basic",
          "operation": "QUERY",
          "batch": false,
          "blobAsFile": false,
          "clobAsFile": false,
          "typeProperties": "[]",
          "connectionProperties": "{\n    \"cancelQueryTimeout\": 5,\n    \"queryTimeout\": 15,\n    \"lockTimeout\": 15000,\n    \"socketTimeout\": 15000\n}",
          "dbPoolByActualConsumers": false,
          "exclusiveDbPool": false,
          "customDbPool": false,
          "columnFromLabel": false,
          "connectionTestQuery": "",
          "rawSql": true,
          "rollbackOnError": false,
          "items": "{}",
          "charset": "UTF-8"
        },
        "id": "7134a571-6561-49ed-b9bc-88b91d85c984",
        "dynamicAccountNames": {},
        "accountLabels": {},
        "__documentation__": "The start and end fields are not used here; they are intended for other database types, such as Oracle, where the paginated query is built differently."
      },
      {
        "type": "choice",
        "stepName": "Choice",
        "when": [
          {
            "target": "Fail (2)",
            "jsonPath": "$.[?(@.success == false)]"
          },
          {
            "target": "Registers not Found (2)",
            "jsonPath": "$.[?(@.rowCount == 0)]"
          }
        ],
        "otherwise": "Success (2)",
        "id": "44833968-110a-44d6-915c-39ec0a120333"
      }
    ],
    "Fail (2)": [
      {
        "type": "connector",
        "name": "log-connector",
        "stepName": "DB Error",
        "params": {
          "logLevel": "ERROR",
          "message": "DB Error: {{ message.$ }}"
        },
        "id": "57a450bb-deb1-4c69-842f-3a29d76e037f"
      },
      {
        "type": "connector",
        "name": "block-execution-connector",
        "stepName": "Reprocessing",
        "params": {
          "onProcess": "72c74518-0672-4da3-9b49-3aee578449cd-onProcessTrack",
          "onException": "72c74518-0672-4da3-9b49-3aee578449cd-onExceptionTrack"
        },
        "id": "72c74518-0672-4da3-9b49-3aee578449cd",
        "description": "Publishes a reprocessing event when the database query fails."
      },
      {
        "type": "connector",
        "name": "throw-error-connector",
        "stepName": "Throw Error",
        "params": {
          "customErrorEnabled": true,
          "customError": "{{ message.$ }}"
        },
        "id": "b760ea98-4c81-4264-8858-b092debd60c8"
      }
    ],
    "72c74518-0672-4da3-9b49-3aee578449cd-onProcessTrack": [
      {
        "type": "connector",
        "stepName": "Event Publisher - Reprocessing",
        "name": "event-publisher-connector",
        "params": {
          "eventName": "reprocessamento-exemplo",
          "body": "{}",
          "showSendEventLog": false,
          "stopOnError": false
        },
        "id": "405d60da-15af-4e8c-a75a-343a840f72f8"
      }
    ],
    "Registers not Found (2)": [
      {
        "type": "connector",
        "name": "log-connector",
        "stepName": "Registers Not Found",
        "params": {
          "logLevel": "INFO",
          "message": "Registers Not Found"
        },
        "id": "a867448d-4b5b-4a0d-a0bc-1e783811f5be"
      },
      {
        "type": "session-management",
        "stepName": "get payload",
        "operation": "GET",
        "sessionType": "LOCAL",
        "scoped": false,
        "fields": [
          "payload"
        ],
        "id": "8f34c2c4-4336-45ec-84db-2e20073b8562"
      },
      {
        "type": "connector",
        "name": "object-store-connector",
        "accountLabel": "dgb-internal-object-store-account",
        "stepName": "pages-for-extraction",
        "params": {
          "operation": "DELETE",
          "objectStore": "pages-for-extraction",
          "objectId": "{{ message.payload._oId }}",
          "unique": true,
          "isolated": false,
          "failOnError": false
        },
        "id": "25a396d5-a98f-4bd2-b0b9-801ce6ded152"
      },
      {
        "name": "json-generator-connector",
        "type": "connector",
        "stepName": "Final",
        "params": {
          "json": "{\r\n    \"message\": \"Registers not found\"\r\n}",
          "failOnError": false
        },
        "id": "744e184e-17d0-4592-8e31-219147e18ce2"
      }
    ],
    "Success (2)": [
      {
        "type": "connector",
        "name": "log-connector",
        "stepName": "Success DB",
        "params": {
          "logLevel": "INFO",
          "message": "Success DB"
        },
        "id": "579eddc4-942c-44cb-a22c-3b78f511b5fe"
      },
      {
        "type": "session-management",
        "stepName": "get payload",
        "operation": "GET",
        "sessionType": "LOCAL",
        "scoped": false,
        "fields": [
          "payload"
        ],
        "id": "5f12d793-2268-44f1-a2a8-b439490975b5"
      },
      {
        "type": "connector",
        "name": "object-store-connector",
        "accountLabel": "dgb-internal-object-store-account",
        "stepName": "Insert pagination",
        "params": {
          "operation": "INSERT",
          "objectStore": "temporary-database",
          "objectId": "{{ message.payload._oId }}",
          "document": "{\n    \"pagination\": {{ message.data }}\n}",
          "unique": true,
          "isolated": false,
          "failOnError": true
        },
        "id": "bc33a9e7-d959-4e2b-b0d8-48e2c653de17"
      },
      {
        "type": "choice",
        "stepName": "Choice",
        "when": [
          {
            "target": "Success ObjectStore (2)",
            "jsonPath": "$.[?(@.updateCount > 0)]"
          }
        ],
        "otherwise": "Fail ObjectStore (2)",
        "id": "78ab5e22-8c14-458f-b037-db94fe32edf9"
      }
    ],
    "Success ObjectStore (2)": [
      {
        "type": "connector",
        "name": "log-connector",
        "stepName": "Success Insert Pagination",
        "params": {
          "logLevel": "INFO",
          "message": "Success Insert Pagination"
        },
        "id": "57268aa7-eb74-44da-bb76-6a37e0ec3252"
      },
      {
        "type": "session-management",
        "stepName": "get payload",
        "operation": "GET",
        "sessionType": "LOCAL",
        "scoped": false,
        "fields": [
          "payload"
        ],
        "id": "3b7a8dc6-ac00-4119-83c5-00bc277110d3"
      },
      {
        "type": "connector",
        "name": "object-store-connector",
        "accountLabel": "dgb-internal-object-store-account",
        "stepName": "update integration-control",
        "params": {
          "operation": "UPDATE_BY_QUERY",
          "objectStore": "integration-control",
          "query": "{\n    \"integrationName\": {$eq: {{ message.payload.integration }} },\n    \"integrationDate\": {$eq: {{ FORMATDATE(NOW(), \"timestamp\", \"dd/MM/yyyy\") }}}\n}",
          "document": "{\n   $set: {\n        \"integrationName\": {{ message.payload.integration }},\n        \"integrationDate\":{{ FORMATDATE(NOW(), \"timestamp\", \"dd/MM/yyyy\") }},\n        \"step\":\"extraction\",\n    \t\"status\": {{ IF( message.payload.finished, \"finished\", \"progress\" )  }}\n    }\n}\n",
          "unique": true,
          "isolated": false,
          "upsert": true,
          "failOnError": false
        },
        "id": "c051bb47-5954-4a83-808f-ee02b11a4248"
      },
      {
        "type": "session-management",
        "stepName": "get payload",
        "operation": "GET",
        "sessionType": "LOCAL",
        "scoped": false,
        "fields": [
          "payload"
        ],
        "id": "7cbfdc07-ba4e-45a1-bdaa-db2a006611db"
      },
      {
        "type": "connector",
        "name": "object-store-connector",
        "accountLabel": "dgb-internal-object-store-account",
        "stepName": "pages-for-extraction",
        "params": {
          "operation": "UPDATE",
          "objectStore": "pages-for-extraction",
          "objectId": "{{ message.payload._oId }}",
          "document": "{\n    $set:{\n        \"pageStatus\": \"done\"\n    }\n}",
          "unique": true,
          "isolated": false,
          "upsert": false,
          "failOnError": false
        },
        "id": "40f0c2f1-213e-4dee-8a25-fc359f94aee2"
      },
      {
        "name": "json-generator-connector",
        "type": "connector",
        "stepName": "Final",
        "params": {
          "json": "{\r\n    \"success\": true\r\n}",
          "failOnError": false
        },
        "id": "5ac13705-b797-4289-abb4-39ca26c289e0"
      }
    ],
    "Fail ObjectStore (2)": [
      {
        "type": "connector",
        "name": "log-connector",
        "stepName": "Fail Insert Pagination",
        "params": {
          "logLevel": "ERROR",
          "message": "Fail Insert Pagination {{ message.$ }}"
        },
        "id": "48e9ab7b-9855-45dd-9a51-26334f8caee7"
      },
      {
        "type": "connector",
        "name": "block-execution-connector",
        "stepName": "Reprocessing",
        "params": {
          "onProcess": "816e7b04-ff1c-4494-8b1b-91a4e387cfd1-onProcessTrack",
          "onException": "816e7b04-ff1c-4494-8b1b-91a4e387cfd1-onExceptionTrack"
        },
        "id": "816e7b04-ff1c-4494-8b1b-91a4e387cfd1",
        "description": "Document here what this block is supposed to do."
      },
      {
        "type": "connector",
        "name": "throw-error-connector",
        "stepName": "Throw Error",
        "params": {
          "customErrorEnabled": true,
          "customError": "{\n    \"success\": false\n}"
        },
        "id": "2831b857-7c4b-45d5-9c6a-95cc91bd59cd"
      }
    ],
    "816e7b04-ff1c-4494-8b1b-91a4e387cfd1-onProcessTrack": [
      {
        "type": "connector",
        "stepName": "Event Publisher - Reprocessing",
        "name": "event-publisher-connector",
        "params": {
          "eventName": "reprocessamento-exemplo",
          "body": "{}",
          "showSendEventLog": false,
          "stopOnError": false
        },
        "id": "6adcd3c3-eeae-43f0-822f-023f0dd2b1b4"
      }
    ]
  }
}
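The routing done by the two Choice steps in the flow spec above can be summarized in a few lines. The Python sketch below is only an illustration and is not part of the pipeline: the function names are hypothetical, and it assumes the Choice conditions are evaluated in the order listed, with the first match winning.

```python
def route_db_result(result: dict) -> str:
    """Mirror the Choice step that follows the database query."""
    # $.[?(@.success == false)]
    if result.get("success") is False:
        return "Fail (2)"
    # $.[?(@.rowCount == 0)]
    if result.get("rowCount") == 0:
        return "Registers not Found (2)"
    # "otherwise" branch
    return "Success (2)"


def route_object_store_result(result: dict) -> str:
    """Mirror the Choice step that follows the Object Store insert."""
    # $.[?(@.updateCount > 0)]
    if (result.get("updateCount") or 0) > 0:
        return "Success ObjectStore (2)"
    return "Fail ObjectStore (2)"


def extraction_status(finished: bool) -> str:
    """Mirror IF(message.payload.finished, "finished", "progress")."""
    return "finished" if finished else "progress"
```

Under these assumptions, a failed query is always routed to the error track, even when it also reports zero rows, because the failure condition is checked first.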
Flow Spec: sch-dispatcher-pages
{
  "meta": {
    "e221a685-6195-45fa-a662-e4dd21157943": {
      "position": {
        "x": 200,
        "y": 75
      }
    },
    "ccccc951-5e7e-46fc-be5f-b008d7f7f615": {
      "position": {
        "x": 400,
        "y": 75
      }
    },
    "87b8a706-dfe8-4c09-9ff4-8eeef1210edb": {
      "position": {
        "x": 600,
        "y": 75
      }
    },
    "060480d5-7b25-4642-9d01-0b46a2090e9d": {
      "position": {
        "x": 800,
        "y": 75
      }
    },
    "ee50fdfc-b8bd-417f-ba9f-9171b9bff6f8": {
      "position": {
        "x": 1000,
        "y": 75
      }
    },
    "57b6367e-e45c-4bc6-b5a5-4380d2afefc9": {
      "position": {
        "x": 1200,
        "y": 75
      }
    },
    "bb70603c-ba06-468f-bd25-7aba77e9bfd3": {
      "position": {
        "x": 1650,
        "y": 0
      }
    },
    "4141fc4d-e6bd-4394-ae89-234d5f79fcc1": {
      "position": {
        "x": 1850,
        "y": 0
      }
    },
    "a63c03bc-adf7-4d88-87c6-ffce0bf97590": {
      "position": {
        "x": 2050,
        "y": 0
      }
    },
    "58e7fc63-ca29-49e9-a474-8c0f0692f7fc": {
      "position": {
        "x": 2250,
        "y": 0
      }
    },
    "dc36afa3-ab3d-4ba4-89cb-1c32a0067077": {
      "position": {
        "x": 2450,
        "y": 0
      }
    },
    "5e702895-12e9-4202-bdd2-7318bfd25cab": {
      "position": {
        "x": 1650,
        "y": 150
      }
    }
  },
  "flowSpec": {
    "disconnected-root:6699dfd6-0425-4887-b4be-f633ef7064fd": [
      {
        "name": "json-generator-connector",
        "type": "connector",
        "stepName": "get global integration-list",
        "params": {
          "json": "{\n    \"integrations\": [\n      {\n        \"pageSize\": \"2000\",\n        \"nameIntegration\": \"Customer\",\n        \"query\": \"SELECT id, first_name, last_name, gender, ip_address, email  FROM pagination_course WHERE first_name IS NOT NULL\",\n        \"eventExtract\": \"evt-extract-integration\",\n        \"eventSend\": \"evt-load-integration\",\n        \"tableExtract\": \"pagination_course\",\n        \"databaseExtract\": \"digibee_database\",\n        \"dependencyList\": [\n          \n        ]\n      },\n      {\n        \"pageSize\": \"2000\",\n        \"nameIntegration\": \"Product\",\n        \"query\": \"SELECT PRODUCT_ID, NAME, GROUP  FROM PRODUCT WHERE GROUP IN(10,30,40)\",\n        \"eventExtract\": \"evt-extract-product\",\n        \"eventSend\": \"evt-load-product\",\n        \"tableExtract\": \"PRODUCT\",\n        \"databaseExtract\": \"digibee_database\",\n        \"dependencyList\": [\n          \n        ]\n      },\n      {\n        \"pageSize\": \"5000\",\n        \"nameIntegration\": \"Order\",\n        \"query\": \"SELECT ORDER_ID,  CUSTOMER_ID, ORDER_VALUE, ORDER_DATE FROM ORDER WHERE ORDER_DATE = SYSDATE\",\n        \"eventExtract\": \"evt-extract-order\",\n        \"eventSend\": \"evt-load-order\",\n        \"tableExtract\": \"ORDER\",\n        \"databaseExtract\": \"digibee_database\",\n        \"dependencyList\": [\n          \"Customer\",\n          \"Product\"\n        ]\n      }\n    ]\n  }\n",
          "failOnError": false
        },
        "id": "e221a685-6195-45fa-a662-e4dd21157943"
      },
      {
        "type": "connector",
        "name": "for-each-connector",
        "stepName": "For Each integration-list",
        "params": {
          "jsonPath": "$.integrations",
          "itemIdentifier": "",
          "parallel": true,
          "failOnError": false,
          "onProcess": "ccccc951-5e7e-46fc-be5f-b008d7f7f615-onProcessTrack",
          "onException": "ccccc951-5e7e-46fc-be5f-b008d7f7f615-onExceptionTrack"
        },
        "id": "ccccc951-5e7e-46fc-be5f-b008d7f7f615"
      },
      {
        "type": "session-management",
        "stepName": "get resultList",
        "operation": "GET",
        "sessionType": "LOCAL",
        "scoped": false,
        "fields": [
          "resultList"
        ],
        "id": "87b8a706-dfe8-4c09-9ff4-8eeef1210edb"
      },
      {
        "type": "transformer",
        "stepName": "Transformer (JOLT)",
        "transformSpec": [
          {
            "operation": "shift",
            "spec": {
              "resultList": {
                "*": {
                  "temp": {
                    "nameIntegration": "nameIntegration[]"
                  }
                }
              }
            }
          }
        ],
        "id": "060480d5-7b25-4642-9d01-0b46a2090e9d"
      },
      {
        "type": "connector",
        "name": "object-store-connector",
        "accountLabel": "dgb-internal-object-store-account",
        "stepName": "pages-for-extraction",
        "params": {
          "operation": "FIND_BY_QUERY",
          "objectStore": "pages-for-extraction",
          "query": "{\n    \"integration\": {$in: {{ message.nameIntegration }} },\n    \"pageStatus\": \"done\"\n}",
          "limit": "50",
          "skip": "0",
          "sort": "{\"finished\":1}",
          "unique": true,
          "isolated": false,
          "failOnError": false
        },
        "id": "ee50fdfc-b8bd-417f-ba9f-9171b9bff6f8"
      },
      {
        "type": "choice",
        "stepName": "Choice",
        "when": [
          {
            "target": "Pagination Found (2)",
            "jsonPath": "$.[?(@.rowCount > 0)]"
          }
        ],
        "otherwise": "Pagination Not Found (2)",
        "id": "57b6367e-e45c-4bc6-b5a5-4380d2afefc9"
      }
    ],
    "ccccc951-5e7e-46fc-be5f-b008d7f7f615-onProcessTrack": [
      {
        "name": "json-generator-connector",
        "type": "connector",
        "stepName": "generate temp object",
        "params": {
          "json": "{\n    \"temp\": {{ message.$ }}\n}",
          "failOnError": false
        },
        "id": "ea50152b-ed8a-491d-b047-8bcfaaa3922c"
      },
      {
        "type": "session-management",
        "stepName": "put temp",
        "operation": "PUT",
        "sessionType": "LOCAL",
        "scoped": true,
        "fields": [
          "temp"
        ],
        "id": "d500e729-a698-4bda-9209-c9e8c5654861"
      },
      {
        "type": "connector",
        "name": "object-store-connector",
        "accountLabel": "dgb-internal-object-store-account",
        "stepName": "check whether the integration has already finished",
        "params": {
          "operation": "FIND_BY_QUERY",
          "objectStore": "integration-control",
          "query": "{\n    \"integrationName\": {{ message.temp.nameIntegration }},\n    \"integrationDate\": {{ FORMATDATE(NOW(), \"timestamp\", \"dd/MM/yyyy\", null, \"GMT-3\") }},\n    \"step\": \"integration\",\n    \"status\": \"done\"\n}",
          "limit": "0",
          "skip": "0",
          "sort": "{}",
          "unique": true,
          "isolated": false,
          "failOnError": false
        },
        "id": "71c02160-d99d-49bd-b85f-57589aa641fd"
      },
      {
        "type": "choice",
        "stepName": "Choice",
        "when": [
          {
            "target": "Integração já finalizou (1)",
            "jsonPath": "$.[?(@.rowCount > 0)]",
            "__documentation__": ""
          }
        ],
        "otherwise": "Verifica depêndencia (1)",
        "id": "86197148-fff5-4543-9479-a7c16d4e9cf8"
      }
    ],
    "Integração já finalizou (1)": [
      {
        "type": "connector",
        "name": "log-connector",
        "stepName": "Integration finished",
        "params": {
          "logLevel": "INFO",
          "message": "Filter time {{ FORMATDATE(NOW(), \"timestamp\", \"dd/MM/yyyy HH:mm\", null, \"GMT-3\") }} | Integration {{ message.data[0].integrationName }} has already been processed"
        },
        "id": "f1396b86-5147-402e-aa6b-f5ba6e378404"
      }
    ],
    "Verifica depêndencia (1)": [
      {
        "type": "connector",
        "name": "log-connector",
        "stepName": "Check for dependencies",
        "params": {
          "logLevel": "INFO",
          "message": "Filter time {{ FORMATDATE(NOW(), \"timestamp\", \"dd/MM/yyyy HH:mm\", null, \"GMT-3\") }} | Checking whether integration {{ message.data[0].integrationName }} has dependencies"
        },
        "id": "fc71efd4-d8c9-4fbc-8317-13364597beaf"
      },
      {
        "type": "session-management",
        "stepName": "get temp",
        "operation": "GET",
        "sessionType": "LOCAL",
        "scoped": true,
        "fields": [
          "temp"
        ],
        "id": "407f2de6-d8e8-40c5-b556-e066559f1815"
      },
      {
        "type": "connector",
        "name": "block-execution-connector",
        "stepName": "dependencies",
        "params": {
          "onProcess": "c2ed8b50-51cc-4e54-a2bb-8efb75d88862-onProcessTrack",
          "onException": "c2ed8b50-51cc-4e54-a2bb-8efb75d88862-onExceptionTrack"
        },
        "id": "c2ed8b50-51cc-4e54-a2bb-8efb75d88862",
        "description": "Verify the integration's dependencies: if it has any, validate them; otherwise go to pages extraction"
      },
      {
        "type": "capsule",
        "name": "capsule-v1-digibee-digibee-tools-add-item-to-session-array-1.0",
        "capsuleCollection": "digibee-tools",
        "capsuleCollectionVersion": 1,
        "capsule": "add-item-to-session-array-1.0",
        "capsuleVersionMajor": 1,
        "capsuleVersionMinor": 0,
        "stepName": "Add item to session (array)",
        "params": {
          "obj": "{{ message.$}}"
        },
        "id": "3fc6683c-0bdb-4742-afd1-4222703a1b8a"
      },
      {
        "type": "connector",
        "name": "log-connector",
        "stepName": "Log - Add item to session (array)",
        "params": {
          "logLevel": "WARN",
          "message": "Add item to session (array) {{ message.$ }}"
        },
        "id": "2f9add63-302d-43e1-8bc8-1a4f78181eed"
      },
      {
        "type": "session-management",
        "stepName": "delete temp",
        "operation": "DELETE",
        "sessionType": "LOCAL",
        "scoped": true,
        "fields": [
          "temp"
        ],
        "id": "dc1f5276-497e-4646-b7e5-254fd38aee3a"
      }
    ],
    "c2ed8b50-51cc-4e54-a2bb-8efb75d88862-onProcessTrack": [
      {
        "type": "choice",
        "stepName": "Choice",
        "when": [
          {
            "target": "Contains Dependencies (1)",
            "jsonPath": "$.temp.dependencyList[*]"
          }
        ],
        "otherwise": "Dont contains dependencies (1)",
        "id": "e4201d8f-0f61-4d99-a074-917c0ada3f44"
      }
    ],
    "Contains Dependencies (1)": [
      {
        "type": "connector",
        "name": "log-connector",
        "stepName": "Log - Contains Dependencies",
        "params": {
          "logLevel": "INFO",
          "message": "Contains Dependencies | Integration {{ message.temp.nameIntegration }} | Dependencies: {{ message.temp.dependencyList }}"
        },
        "id": "f15a11a0-c107-423d-8ef4-ef8722d4f6d5"
      },
      {
        "type": "connector",
        "name": "object-store-connector",
        "accountLabel": "dgb-internal-object-store-account",
        "stepName": "find integration-control",
        "params": {
          "operation": "FIND_BY_QUERY",
          "objectStore": "integration-control",
          "query": "{\n    \"integrationDate\": {{ FORMATDATE( NOW(), \"timestamp\", \"dd/MM/yyyy\") }} ,\n    \"integrationName\": { $in: {{ message.temp.dependencyList }} },\n    \"status\": { $ne: \"done\"  }\n}",
          "document": "",
          "limit": "0",
          "skip": "0",
          "sort": "",
          "unique": true,
          "isolated": false,
          "upsert": false,
          "failOnError": false
        },
        "id": "40db5a15-f63f-4023-a534-7832b813975c"
      },
      {
        "type": "connector",
        "name": "assert-connector-v2",
        "stepName": "Assert dependencies",
        "params": {
          "condition": "{{ EQUALTO( message.rowCount,0) }}",
          "errorMessage": "Found dependencies in progress for integration {{ message.$ }}",
          "internalErrorMessage": "Internal Error",
          "errorCode": 500,
          "failOnError": true
        },
        "id": "2ac498d6-e793-43cf-95af-df4df1e3f6a2"
      },
      {
        "type": "session-management",
        "stepName": "get temp",
        "operation": "GET",
        "sessionType": "LOCAL",
        "scoped": true,
        "fields": [
          "temp"
        ],
        "id": "f91437fe-a52f-41a3-a5aa-0b534be77b1b"
      },
      {
        "name": "json-generator-connector",
        "type": "connector",
        "stepName": "only temp",
        "params": {
          "json": "{\n    \"temp\": {{ message.temp }}\n}\n",
          "failOnError": false
        },
        "id": "2fce4888-73da-40c2-b155-a1f5c14a7e8a"
      }
    ],
    "Dont contains dependencies (1)": [
      {
        "type": "connector",
        "name": "log-connector",
        "stepName": "Log - Does not contain dependencies",
        "params": {
          "logLevel": "INFO",
          "message": "Does not contain dependencies | Integration {{ message.temp.nameIntegration }}"
        },
        "id": "bc0c9370-4677-4a32-a502-6c531d56ee81"
      }
    ],
    "ccccc951-5e7e-46fc-be5f-b008d7f7f615-onExceptionTrack": [
      {
        "type": "connector",
        "name": "log-connector",
        "stepName": "Log - For Each integration-list (onException)",
        "params": {
          "logLevel": "ERROR",
          "message": "For Each (onException) integration-list {{ message.$ }}"
        },
        "id": "d4fabc6f-a4ca-45d9-b754-76af2bfb7fbc"
      },
      {
        "type": "capsule",
        "name": "capsule-v1-digibee-digibee-tools-add-item-to-session-array-1.0",
        "capsuleCollection": "digibee-tools",
        "capsuleCollectionVersion": 1,
        "capsule": "add-item-to-session-array-1.0",
        "capsuleVersionMajor": 1,
        "capsuleVersionMinor": 0,
        "stepName": "Add item to session (array)",
        "params": {
          "obj": "{{ message.$ }}"
        },
        "id": "945aa95b-eeaa-4b4a-a632-73b5d2b2483a"
      }
    ],
    "Pagination Found (2)": [
      {
        "type": "connector",
        "name": "log-connector",
        "stepName": "Log - Pagination Found",
        "params": {
          "logLevel": "INFO",
          "message": "Pagination Found"
        },
        "id": "bb70603c-ba06-468f-bd25-7aba77e9bfd3"
      },
      {
        "type": "connector",
        "name": "for-each-connector",
        "stepName": "For Each pages-for-extraction",
        "params": {
          "jsonPath": "$.data",
          "itemIdentifier": "",
          "parallel": false,
          "failOnError": false,
          "onProcess": "4141fc4d-e6bd-4394-ae89-234d5f79fcc1-onProcessTrack",
          "onException": "4141fc4d-e6bd-4394-ae89-234d5f79fcc1-onExceptionTrack"
        },
        "id": "4141fc4d-e6bd-4394-ae89-234d5f79fcc1"
      },
      {
        "type": "session-management",
        "stepName": "get resultList",
        "operation": "GET",
        "sessionType": "LOCAL",
        "scoped": false,
        "fields": [
          "resultList"
        ],
        "id": "a63c03bc-adf7-4d88-87c6-ffce0bf97590"
      },
      {
        "type": "transformer",
        "stepName": "Transformer (JOLT)",
        "transformSpec": [
          {
            "operation": "shift",
            "spec": {
              "resultList": {
                "*": {
                  "temp": {
                    "nameIntegration": "nameIntegration[]"
                  }
                }
              }
            }
          }
        ],
        "id": "58e7fc63-ca29-49e9-a474-8c0f0692f7fc"
      },
      {
        "name": "json-generator-connector",
        "type": "connector",
        "stepName": "resume",
        "params": {
          "json": "{\n    \"integrations\":{{ message.nameIntegration }}\n}",
          "failOnError": false
        },
        "id": "dc36afa3-ab3d-4ba4-89cb-1c32a0067077"
      }
    ],
    "4141fc4d-e6bd-4394-ae89-234d5f79fcc1-onProcessTrack": [
      {
        "type": "connector",
        "stepName": "Event Publisher",
        "name": "event-publisher-connector",
        "params": {
          "eventName": "{{ message.event }}",
          "body": "{\n    \"id\": {{ message._oId }},\n    \"integration\": {{ message.integration }},\n    \"finished\": {{ message.finished }}\n}",
          "showSendEventLog": false,
          "stopOnError": false
        },
        "id": "e3957241-8b81-480e-a325-65dd7e71b397"
      },
      {
        "type": "connector",
        "name": "object-store-connector",
        "accountLabel": "dgb-internal-object-store-account",
        "stepName": "pages-for-extraction",
        "params": {
          "operation": "UPDATE",
          "objectStore": "pages-for-extraction",
          "objectId": "{{ message._oId }}",
          "document": "{\n    $set:{\n        \"pageStatus\": \"Dispached\"\n    }\n}",
          "unique": true,
          "isolated": false,
          "upsert": false,
          "failOnError": false
        },
        "id": "caba6de3-6a18-45c2-addb-012804a99faa"
      },
      {
        "name": "json-generator-connector",
        "type": "connector",
        "stepName": "set success",
        "params": {
          "json": "{\n    \"success\": true\n}",
          "failOnError": false
        },
        "id": "4ca32d91-7116-40c6-8eb7-1ede69763204"
      }
    ],
    "4141fc4d-e6bd-4394-ae89-234d5f79fcc1-onExceptionTrack": [
      {
        "type": "connector",
        "name": "log-connector",
        "stepName": "Log - For Each pages-for-extraction (onException)",
        "params": {
          "logLevel": "ERROR",
          "message": "For Each pages-for-extraction (onException) {{ message.$ }}"
        },
        "id": "26ee529f-9f99-402c-80b4-44f9768d63e7"
      }
    ],
    "Pagination Not Found (2)": [
      {
        "type": "connector",
        "name": "log-connector",
        "stepName": "Log - Pagination Not Found",
        "params": {
          "logLevel": "INFO",
          "message": "Pagination Not Found"
        },
        "id": "5e702895-12e9-4202-bdd2-7318bfd25cab"
      }
    ]
  }
}
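The dependency gate in the flow spec above ("find integration-control" followed by "Assert dependencies") can be read as a simple filter over the control records. The Python sketch below is a minimal illustration of that reading, not the platform's implementation: the function names and the in-memory list of records are assumptions, while the field names (`integrationDate`, `integrationName`, `status`, `dependencyList`) are taken from the spec.

```python
def blocking_dependencies(control_records, dependency_list, today):
    """Return the control records that would make 'Assert dependencies' fail.

    Mirrors the Object Store query: same date, name in the dependency
    list, and a status different from "done".
    """
    return [
        record
        for record in control_records
        if record.get("integrationDate") == today
        and record.get("integrationName") in dependency_list
        and record.get("status") != "done"
    ]


def can_dispatch(integration, control_records, today):
    """True when the integration may proceed to page dispatching."""
    dependencies = integration.get("dependencyList", [])
    if not dependencies:
        # "Dont contains dependencies (1)" track: nothing to validate.
        return True
    # The assert passes only when the query returns zero rows.
    return len(blocking_dependencies(control_records, dependencies, today)) == 0


# Hypothetical control records for one day (dd/MM/yyyy, as in the spec).
records = [
    {"integrationName": "Customer", "integrationDate": "01/02/2025", "status": "done"},
    {"integrationName": "Product", "integrationDate": "01/02/2025", "status": "progress"},
]
order = {"nameIntegration": "Order", "dependencyList": ["Customer", "Product"]}
customer = {"nameIntegration": "Customer", "dependencyList": []}
```

Note that, read this way, the query only blocks on dependencies that already have a control record for the day with a status other than "done". A dependency with no record yet for that date returns no rows and therefore does not block the integration.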
Flow Spec: evt-load-integration
{
  "meta": {
    "6d0dc228-8417-48eb-ab00-476148ed0573": {
      "position": {
        "x": 200,
        "y": 75
      }
    },
    "ffeee3ac-267f-4920-b5a3-ba384a989be6": {
      "position": {
        "x": 400,
        "y": 75
      }
    },
    "7e37c303-918c-4428-91ff-7db73586f239": {
      "position": {
        "x": 600,
        "y": 75
      }
    },
    "6f94dfb9-d216-4f58-a53a-2dbf8425bf0f": {
      "position": {
        "x": 800,
        "y": 75
      }
    },
    "ae802de8-0b7c-4a2d-9570-fcb25aaf8000": {
      "position": {
        "x": 1000,
        "y": 75
      }
    },
    "9dfe9c97-2a86-4b81-b370-58171603857a": {
      "position": {
        "x": 1450,
        "y": 0
      }
    },
    "150716e1-0240-4ec0-b2c8-77152753010d": {
      "position": {
        "x": 1650,
        "y": 0
      }
    },
    "1cc4d3aa-e8e1-4712-9172-d7b6976698dc": {
      "position": {
        "x": 1850,
        "y": 0
      }
    },
    "c98a59dc-5058-4153-a512-165db3a8d449": {
      "position": {
        "x": 2050,
        "y": 0
      }
    },
    "239b532d-2eaf-4040-a1ad-a844489aa0e2": {
      "position": {
        "x": 1450,
        "y": 150
      }
    },
    "9ea39fcf-c314-44c7-a589-c62f5b1ce4c9": {
      "position": {
        "x": 1650,
        "y": 150
      }
    }
  },
  "flowSpec": {
    "disconnected-root:b6359fcd-a77e-4faa-acf7-fcb5626e4aff": [
      {
        "type": "connector",
        "name": "assert-connector-v2",
        "stepName": "Assert id",
        "params": {
          "condition": "{{ NOT(ISNULL( message.id))  }}",
          "errorMessage": "Id not sent to the event",
          "internalErrorMessage": "Internal Error",
          "errorCode": 500,
          "failOnError": true
        },
        "id": "6d0dc228-8417-48eb-ab00-476148ed0573"
      },
      {
        "name": "json-generator-connector",
        "type": "connector",
        "stepName": "JSON Generator",
        "params": {
          "json": "{\n    \"payload\": {{ message.$ }}\n}",
          "failOnError": false
        },
        "id": "ffeee3ac-267f-4920-b5a3-ba384a989be6"
      },
      {
        "type": "session-management",
        "stepName": "put payload",
        "operation": "PUT",
        "sessionType": "LOCAL",
        "scoped": false,
        "fields": [
          "payload"
        ],
        "id": "7e37c303-918c-4428-91ff-7db73586f239"
      },
      {
        "type": "connector",
        "name": "object-store-connector",
        "accountLabel": "dgb-internal-object-store-account",
        "stepName": "Find",
        "params": {
          "operation": "FIND",
          "objectStore": "temporary-database",
          "objectId": "{{ message.payload.id}}",
          "limit": "0",
          "skip": "0",
          "sort": "{}",
          "unique": true,
          "isolated": false,
          "failOnError": false
        },
        "id": "6f94dfb9-d216-4f58-a53a-2dbf8425bf0f"
      },
      {
        "type": "choice",
        "stepName": "Choice",
        "when": [
          {
            "target": "Found Registers ObjStr (2)",
            "jsonPath": "$.[?(@.rowCount > 0)]"
          }
        ],
        "otherwise": "Not Found ObjStr (2)",
        "id": "ae802de8-0b7c-4a2d-9570-fcb25aaf8000"
      }
    ],
    "Found Registers ObjStr (2)": [
      {
        "type": "connector",
        "name": "log-connector",
        "stepName": "Found Registers ObjStr",
        "params": {
          "logLevel": "INFO",
          "message": "Found Registers ObjStr: {{ message.rowCount }}"
        },
        "id": "9dfe9c97-2a86-4b81-b370-58171603857a"
      },
      {
        "type": "connector",
        "name": "block-execution-connector",
        "stepName": "send simulation",
        "params": {
          "onProcess": "150716e1-0240-4ec0-b2c8-77152753010d-onProcessTrack",
          "onException": "150716e1-0240-4ec0-b2c8-77152753010d-onExceptionTrack"
        },
        "id": "150716e1-0240-4ec0-b2c8-77152753010d",
        "description": "Document here what this block is supposed to do."
      },
      {
        "type": "connector",
        "name": "block-execution-connector",
        "stepName": "Last Pagination",
        "params": {
          "onProcess": "1cc4d3aa-e8e1-4712-9172-d7b6976698dc-onProcessTrack",
          "onException": "1cc4d3aa-e8e1-4712-9172-d7b6976698dc-onExceptionTrack"
        },
        "id": "1cc4d3aa-e8e1-4712-9172-d7b6976698dc",
        "description": "Document here what this block is supposed to do."
      },
      {
        "type": "connector",
        "name": "block-execution-connector",
        "stepName": "Delete Page",
        "params": {
          "onProcess": "c98a59dc-5058-4153-a512-165db3a8d449-onProcessTrack",
          "onException": "c98a59dc-5058-4153-a512-165db3a8d449-onExceptionTrack"
        },
        "id": "c98a59dc-5058-4153-a512-165db3a8d449",
        "description": "Document here what this block is supposed to do."
      }
    ],
    "150716e1-0240-4ec0-b2c8-77152753010d-onProcessTrack": [
      {
        "name": "json-generator-connector",
        "type": "connector",
        "stepName": "MOCK",
        "params": {
          "json": "{\n  \"status\": 200\n}",
          "failOnError": false
        },
        "id": "a7abaee8-3fb1-40a1-8da4-f16594528cf2"
      }
    ],
    "150716e1-0240-4ec0-b2c8-77152753010d-onExceptionTrack": [
      {
        "type": "connector",
        "name": "block-execution-connector",
        "stepName": "reprocess or notify",
        "params": {
          "onProcess": "95002c8b-ab12-4d15-b478-a8f45eb6fd19-onProcessTrack",
          "onException": "95002c8b-ab12-4d15-b478-a8f45eb6fd19-onExceptionTrack"
        },
        "id": "95002c8b-ab12-4d15-b478-a8f45eb6fd19",
        "description": "Document here what this block is supposed to do."
      },
      {
        "type": "connector",
        "name": "throw-error-connector",
        "stepName": "Throw Error",
        "params": {
          "errorMessage": "Error occurred.",
          "errorCode": 500,
          "customErrorEnabled": false
        },
        "id": "9b04d630-1c79-49d8-a5c1-f7fac9f83964"
      }
    ],
    "95002c8b-ab12-4d15-b478-a8f45eb6fd19-onProcessTrack": [
      {
        "type": "connector",
        "stepName": "Event Publisher - Reprocessing",
        "name": "event-publisher-connector",
        "params": {
          "eventName": "reprocessamento-exemplo",
          "body": "{}",
          "showSendEventLog": false,
          "stopOnError": false
        },
        "id": "1873ebdc-41c1-490c-a26f-19a5ddbb632a"
      }
    ],
    "1cc4d3aa-e8e1-4712-9172-d7b6976698dc-onProcessTrack": [
      {
        "type": "session-management",
        "stepName": "get payload",
        "operation": "GET",
        "sessionType": "LOCAL",
        "scoped": false,
        "fields": [
          "payload"
        ],
        "id": "f291175f-3295-4943-8067-9fc7c13b9fe9"
      },
      {
        "type": "choice",
        "stepName": "Choice",
        "when": [
          {
            "target": "isLastPage (1)",
            "jsonPath": "$.[?(@.payload.finished == true )]",
            "__documentation__": ""
          }
        ],
        "otherwise": "in Progress (1)",
        "id": "1832f868-334e-409e-8e8c-ea014163fad1"
      }
    ],
    "isLastPage (1)": [
      {
        "type": "connector",
        "name": "log-connector",
        "stepName": "Update integration-control",
        "params": {
          "logLevel": "INFO",
          "message": "Updating integration-control status; finished = {{ message.payload.finished }}"
        },
        "id": "14a90ef8-53ab-4854-a152-d191631ef224"
      },
      {
        "type": "connector",
        "name": "object-store-connector",
        "accountLabel": "dgb-internal-object-store-account",
        "stepName": "find integration-control",
        "params": {
          "operation": "UPDATE_BY_QUERY",
          "objectStore": "integration-control",
          "query": "{\n    \"integrationName\": {$eq: {{ message.payload.integration }}},\n    \"integrationDate\": {$eq: {{ FORMATDATE(NOW(), \"timestamp\", \"dd/MM/yyyy\") }}}\n}",
          "document": "{\n   $set: {\n        \"integrationName\": {{ message.payload.integration }},\n        \"integrationDate\":{{ FORMATDATE(NOW(), \"timestamp\", \"dd/MM/yyyy\") }},\n        \"step\":\"integration\",\n        \"status\": {{ IF( EQUALTO(TOSTRING(message.payload.finished),\"true\"), \"done\", \"progress\" )  }}\n    }\n}",
          "unique": true,
          "isolated": false,
          "upsert": false,
          "failOnError": false
        },
        "id": "b0df069a-efd6-4bd1-8558-a5291ebfb8e5"
      },
      {
        "type": "connector",
        "name": "assert-connector-v2",
        "stepName": "Assert integration-control",
        "params": {
          "condition": "{{ GREATERTHAN( message.updateCount,0 ) }}",
          "errorMessage": "Error updating integration control",
          "internalErrorMessage": "Internal Error",
          "errorCode": 500,
          "failOnError": true
        },
        "id": "693692c9-dfb3-40a5-88e1-9f9aac90cefc"
      },
      {
        "name": "json-generator-connector",
        "type": "connector",
        "stepName": "set success",
        "params": {
          "json": "{\n    \"success\": true\n}",
          "failOnError": false
        },
        "id": "58db5716-8abe-4585-87f5-daf47cb2f20b"
      }
    ],
    "in Progress (1)": [
      {
        "type": "connector",
        "name": "log-connector",
        "stepName": "Nothing to do",
        "params": {
          "logLevel": "INFO",
          "message": "Not the last page; finished = {{ message.payload.finished }}"
        },
        "id": "e1798c51-dcaf-4a0c-9172-8265b61ac81b"
      }
    ],
    "c98a59dc-5058-4153-a512-165db3a8d449-onProcessTrack": [
      {
        "type": "session-management",
        "stepName": "Get payload",
        "operation": "GET",
        "sessionType": "LOCAL",
        "scoped": false,
        "fields": [
          "payload"
        ],
        "id": "60f4542f-2990-4fb6-98dd-1772e3f43456"
      },
      {
        "type": "connector",
        "name": "object-store-connector",
        "accountLabel": "dgb-internal-object-store-account",
        "stepName": "delete",
        "params": {
          "operation": "DELETE",
          "objectStore": "temporary-database",
          "objectId": "{{ message.payload.id }}",
          "unique": true,
          "isolated": false,
          "failOnError": false
        },
        "id": "470318f8-8490-42c9-8a5b-47b8565fc4dd"
      },
      {
        "type": "session-management",
        "stepName": "get payload",
        "operation": "GET",
        "sessionType": "LOCAL",
        "scoped": false,
        "fields": [
          "payload"
        ],
        "id": "80e01121-4d40-4910-97eb-dc553b589f62"
      },
      {
        "type": "connector",
        "name": "object-store-connector",
        "accountLabel": "dgb-internal-object-store-account",
        "stepName": "delete pages-for-extraction",
        "params": {
          "operation": "DELETE",
          "objectStore": "pages-for-extraction",
          "objectId": "{{ message.payload.id }}",
          "unique": true,
          "isolated": false,
          "failOnError": false
        },
        "id": "a5ba58e8-d618-4d29-8051-00a3a975ac80"
      }
    ],
    "Not Found ObjStr (2)": [
      {
        "type": "connector",
        "name": "log-connector",
        "stepName": "Not Found Registers ObjStr",
        "params": {
          "logLevel": "INFO",
          "message": "Object Store temporary-database is empty."
        },
        "id": "239b532d-2eaf-4040-a1ad-a844489aa0e2"
      },
      {
        "name": "json-generator-connector",
        "type": "connector",
        "stepName": "JSON Generator",
        "params": {
          "json": "{\n    \"Message\": \"Object Store temporary-database is empty.\"\n}",
          "failOnError": false
        },
        "id": "9ea39fcf-c314-44c7-a589-c62f5b1ce4c9"
      }
    ]
  }
}

Conclusion

The architecture presented in this article makes the ETL process highly scalable. Because extraction and loading are decoupled, each stage can be scaled and paced independently, according to the demand and constraints of its component. For example, if the extraction window at the source is shorter than the loading window at the target, extraction can be sped up by increasing the number of replicas.

This decoupling of stages also prevents an outage in the target system from blocking extraction: extraction can run to completion, and the extracted data can be delivered once the target is available again.
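
The decoupling described above can be illustrated with a minimal Python sketch. It is not Digibee code and does not use any Digibee API: the in-memory queue merely stands in for the temporary staging store, and the names `extract_pages`, `load_pages`, and `target_available` are hypothetical, chosen only for this illustration.

```python
from collections import deque


def extract_pages(total_records, page_size):
    """Split the source records into pages and stage them in a buffer.

    The deque is a stand-in (assumption) for the temporary staging store.
    """
    staged = deque()
    for start in range(0, total_records, page_size):
        end = min(start + page_size, total_records)
        staged.append({
            "id": len(staged) + 1,
            "records": end - start,
            "finished": end == total_records,  # True only for the last page
        })
    return staged


def load_pages(staged, target_available):
    """Deliver staged pages while the target is up; leave the rest staged."""
    delivered = []
    while staged and target_available():
        # A page leaves the staging buffer only when it is delivered.
        delivered.append(staged.popleft())
    return delivered


# Extraction runs to completion even though the target is down.
staged = extract_pages(total_records=2500, page_size=1000)
first_attempt = load_pages(staged, target_available=lambda: False)

# Later, once the target is back, the staged pages are delivered.
second_attempt = load_pages(staged, target_available=lambda: True)
```

In this sketch the first load attempt delivers nothing and leaves every page staged, while the second attempt drains the buffer. The same separation is what allows each stage to be given its own number of replicas.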

Additionally, the model allows common components, such as the page generation pipelines for extraction and the page dispatching pipeline, to be shared across processes.
