High-scalability ETL model for Digibee
Learn how to build a scalable ETL process using event-driven pipelines.
Although an iPaaS such as Digibee is cloud software that connects applications across different systems and organizations in real time, the need to implement ETL (Extract, Transform, and Load) processes inevitably arises. Even though an iPaaS has different capabilities from a dedicated ETL tool, it can implement this type of solution very efficiently.
This document describes an ETL model that extracts a large volume of records from a relational database, transforms the data (field mapping), and sends it to a destination system using the Digibee Integration Platform.
The goal is to complete the data extraction within a predetermined time window by scaling events according to demand.
Scenario
Consider a scenario in which a high volume of data must be extracted from a relational database that contains several tables, each holding data for a distinct business flow; in other words, several integrations. Together, these tables may account for about 20 million records per day.
Each table requires a field mapping, and the data must be sent in batches to the destination, which can be an API or a file server.
Also consider that the destination imposes no strict time window for delivering the data, but the extraction runs against a business-critical database and must therefore take place during the early hours of the morning. For this reason, the architecture must be highly scalable and allow parameterization per flow.
Architecture
The implementation of this model follows an event-driven architecture with:
Two initial pipelines for the Extract step
Two pipelines that together cover the Transform and Load steps

The architecture presented in this article assumes a simple transformation, such as mapping fields from the source to the destination while applying a few simple rules.
However, if the scenario requires complex data transformations that depend on heavy internal processing, it may be necessary to evolve the architecture with an event pipeline dedicated to transformation, placed before the data is sent, as shown in the model below:

With the Digibee Integration Platform, it is straightforward to evolve this model in a decoupled way, which greatly improves scalability.
Main components
Extract
Pipeline for paginating records for extraction
A shared (common-use) Scheduler pipeline starts the data extraction process. This pipeline acts as a "lever" for the integrations. Its only responsibility is to generate the page metadata for each flow and publish the extraction event.
The page metadata is a JSON structure that specifies:
Integration name
Page ID
First record
Last record
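The page metadata described above can be sketched in a few lines of Python. This is only an illustration of the idea, not the pipeline itself: the names `PageMetadata` and `build_pages` are hypothetical, and clamping the last page to the total record count is a choice made for this sketch.

```python
from dataclasses import dataclass, asdict
from uuid import uuid4


@dataclass
class PageMetadata:
    integration: str  # name of the integration (flow)
    page_id: str      # unique ID of the page
    start: int        # offset of the first record in the page
    end: int          # offset just past the last record in the page


def build_pages(integration: str, total_records: int, page_size: int) -> list[PageMetadata]:
    """Split total_records into consecutive pages of at most page_size records."""
    if page_size <= 0:
        raise ValueError("page_size must be positive")
    pages = []
    for start in range(0, total_records, page_size):
        # Clamp the last page so it never points past the last record.
        end = min(start + page_size, total_records)
        pages.append(PageMetadata(integration, str(uuid4()), start, end))
    return pages


pages = build_pages("Customer", total_records=4500, page_size=2000)
for page in pages:
    print(asdict(page))
```

Each printed dictionary corresponds to one page that the scheduler would hand to the extraction event.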
From a list of integrations configured with the parameters each one needs, the pipeline identifies, for example:
Flow name
Source table
Number of records to extract per page in each event
Extraction event to publish
Extraction start time
List of dependencies on other integrations (if any)
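As a rough sketch of how the scheduler could use these parameters, the Python below walks the integration list and publishes one extraction event per page. The `count_rows` and `publish` functions are hypothetical stand-ins for the database count query and the platform's event publisher, and the row counts are made up for the example.

```python
import json
from collections import defaultdict

# Hypothetical configuration using the parameter names from this article.
INTEGRATIONS = [
    {"nameIntegration": "Customer", "tableExtract": "CUSTOMER", "pageSize": 2000,
     "eventExtract": "evt-extract-customer", "dependencyList": []},
    {"nameIntegration": "Order", "tableExtract": "ORDER", "pageSize": 5000,
     "eventExtract": "evt-extract-order", "dependencyList": ["Customer"]},
]

published = defaultdict(list)  # stand-in for the event bus


def count_rows(table: str) -> int:
    """Stand-in for SELECT COUNT(*) on the source table (made-up numbers)."""
    return {"CUSTOMER": 4500, "ORDER": 12000}[table]


def publish(event_name: str, body: dict) -> None:
    """Stand-in for publishing an event: just records the serialized body."""
    published[event_name].append(json.dumps(body))


def generate_pages(integrations: list[dict]) -> None:
    for cfg in integrations:
        total = count_rows(cfg["tableExtract"])
        size = cfg["pageSize"]
        total_pages = -(-total // size)  # ceiling division
        for page in range(1, total_pages + 1):
            publish(cfg["eventExtract"], {
                "integration": cfg["nameIntegration"],
                "page": page,
                "size": size,
                "finished": page == total_pages,  # marks the last page of the flow
            })


generate_pages(INTEGRATIONS)
print({name: len(events) for name, events in published.items()})
```

Ceiling division avoids creating an empty trailing page when the record count is an exact multiple of the page size. Note that the "calc pages" step in the sch-generate-pages flow spec truncates the division and adds 1, which does produce such an extra page in that edge case.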
Pipeline for extracting pages
A dedicated pipeline for each integration receives the page metadata in each published event, extracts that page, and stores the extracted data in the temporary Object Store (specific to the flow).
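The following is a minimal sketch of that extraction step, assuming a LIMIT/OFFSET style of paging like the one used by the "Select" step in the evt-extract-integration flow spec. It uses an in-memory SQLite database and a plain dictionary as stand-ins for the source database and the temporary Object Store; the table and its contents are invented for the example.

```python
import sqlite3


def extract_page(conn: sqlite3.Connection, event: dict, object_store: dict) -> int:
    """Run the paged query for one event and store the rows under the page ID."""
    offset = (event["page"] - 1) * event["size"]
    # The base query comes from the event; paging is appended to it.
    sql = f'{event["select"]} ORDER BY id LIMIT ? OFFSET ?'
    rows = conn.execute(sql, (event["size"], offset)).fetchall()
    if rows:
        object_store[event["_oId"]] = {"pagination": rows}
    return len(rows)


# Stand-in source database with five records.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customer (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany(
    "INSERT INTO customer (id, name) VALUES (?, ?)",
    [(i, f"customer-{i}") for i in range(1, 6)],
)

temporary_store = {}  # stand-in for the temporary Object Store
event = {"page": 2, "size": 2, "select": "SELECT id, name FROM customer", "_oId": "page-2"}
extracted = extract_page(conn, event, temporary_store)
print(extracted, temporary_store["page-2"])
```

A stable `ORDER BY` is what makes the pages disjoint: without it, two events could read overlapping rows.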
Because it is triggered by an event, this pipeline can be scaled according to the needs of the flow in question. It can be scaled both in size (Small, Medium, or Large), to handle the extracted page, and in replicas, to handle the number of pages expected to be processed simultaneously. For example, if the extraction time needs to be reduced, increasing the number of replicas can be a good strategy.
With the consumption-based model, horizontal scaling (replicas) can be configured for Autoscaling; you only need to define the minimum and maximum number of replicas, and the Platform adjusts as needed.
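A back-of-the-envelope calculation can help choose the replica range. The sketch below assumes that each replica processes one page at a time and that every page takes roughly the same time; both are simplifications, and the 3-second page time and 4-hour window are invented figures, not measurements.

```python
import math


def replicas_needed(total_records: int, page_size: int,
                    seconds_per_page: float, window_seconds: int) -> tuple[int, int]:
    """Return (number of pages, replicas needed to finish within the window)."""
    pages = math.ceil(total_records / page_size)
    total_work_seconds = pages * seconds_per_page
    return pages, math.ceil(total_work_seconds / window_seconds)


# 20 million records per day, pages of 2,000 records,
# an assumed 3 seconds per page, and an assumed 4-hour overnight window.
pages, replicas = replicas_needed(20_000_000, 2000, 3.0, 4 * 3600)
print(pages, replicas)
```

With these assumed figures, 10,000 pages need about 30,000 seconds of work, so three replicas fit the window while two do not.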
Transform and Load
Page dispatcher pipeline
This shared (common-use) Scheduler pipeline is responsible for checking whether a flow is cleared for sending and for publishing the ID of each page of the flow to the sending pipeline.
While the extractions are running, it checks the pages already extracted for each integration and their dependencies on other integrations, then publishes the page ID to the final (sending) pipeline, which collects the page data and sends it to the destination.
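The dependency check can be sketched as follows. The rule used here, that a flow is released only when every integration in its dependency list is complete, is an assumption made for illustration; what counts as "complete" would depend on how the integration control is maintained. The function name and the sample data are hypothetical.

```python
def releasable_pages(integrations: list[dict], completed: set[str],
                     extracted_pages: dict[str, list[str]]) -> list[tuple[str, str]]:
    """Return (integration, page_id) pairs whose dependencies are all complete."""
    released = []
    for cfg in integrations:
        name = cfg["nameIntegration"]
        if all(dep in completed for dep in cfg["dependencyList"]):
            released.extend((name, page_id) for page_id in extracted_pages.get(name, []))
    return released


integrations = [
    {"nameIntegration": "Customer", "dependencyList": []},
    {"nameIntegration": "Product", "dependencyList": []},
    {"nameIntegration": "Order", "dependencyList": ["Customer", "Product"]},
]
extracted = {"Customer": ["c1", "c2"], "Order": ["o1"]}

# Product is not complete yet, so the Order page is held back.
first = releasable_pages(integrations, {"Customer"}, extracted)
# Once Product completes, the Order page is released as well.
second = releasable_pages(integrations, {"Customer", "Product"}, extracted)
print(first)
print(second)
```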
Pipeline for sending pages
This Event pipeline retrieves each page of the integration from the temporary Object Store, transforms it, and sends it to the destination. Like the page extraction pipeline, it must be specific to each integration.
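A minimal sketch of the transform and load steps for one page might look like this. The field mapping, the batch size, and the `send` callback are all hypothetical; in a real flow, `send` would be the call to the destination API or file server.

```python
from itertools import islice

# Hypothetical source-to-destination field mapping.
FIELD_MAP = {"CUSTOMER_ID": "id", "NAME": "fullName", "CNPJ": "taxId"}


def transform(record: dict) -> dict:
    """Rename source fields to their destination names."""
    return {target: record[source] for source, target in FIELD_MAP.items()}


def batches(items, size: int):
    """Yield consecutive lists of at most `size` items."""
    iterator = iter(items)
    while chunk := list(islice(iterator, size)):
        yield chunk


def send_page(page: list[dict], batch_size: int, send) -> int:
    """Transform every record of a page and deliver it in batches."""
    sent = 0
    for batch in batches([transform(r) for r in page], batch_size):
        send(batch)
        sent += len(batch)
    return sent


page = [{"CUSTOMER_ID": i, "NAME": f"customer-{i}", "CNPJ": f"cnpj-{i}"} for i in range(1, 6)]
delivered = []  # stand-in destination: collects each batch
sent = send_page(page, batch_size=2, send=delivered.append)
print(sent, [len(batch) for batch in delivered])
```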
Other important components
In addition to the pipeline structure described above, some components are important for controlling and operating the ETL model:
List of integrations (flows): This can be a Global configured with a JSON or a table in the customer's database. This article uses a Global, as shown in the example below:
{
  "integrations": [
    {
      "pageSize": "2000",
      "nameIntegration": "Customer",
      "query": "SELECT CUSTOMER_ID, NAME ,CNPJ FROM CUSTOMER WHERE CNPJ IS NOT NULL",
      "eventExtract": "evt-extract-customer",
      "eventSend": "evt-load-customer",
      "tableExtract": "CUSTOMER",
      "dependencyList": []
    },
    {
      "pageSize": "2000",
      "nameIntegration": "Product",
      "query": "SELECT PRODUCT_ID, NAME, GROUP FROM PRODUCT WHERE GROUP IN(10,30,40)",
      "eventExtract": "evt-extract-product",
      "eventSend": "evt-load-product",
      "tableExtract": "PRODUCT",
      "dependencyList": []
    },
    {
      "pageSize": "5000",
      "nameIntegration": "Order",
      "query": "SELECT ORDER_ID, CUSTOMER_ID, ORDER_VALUE, ORDER_DATE FROM ORDER WHERE ORDER_DATE = SYSDATE",
      "eventExtract": "evt-extract-order",
      "eventSend": "evt-load-order",
      "tableExtract": "ORDER",
      "dependencyList": [
        "Customer",
        "Product"
      ]
    }
  ]
}
To register it as a Global, special characters must be escaped, as shown below:
{\"integrations\":[{\"pageSize\":\"2000\",\"nameIntegration\":\"Customer\",\"query\":\"SELECT CUSTOMER_ID, NAME ,CNPJ FROM CUSTOMER WHERE CNPJ IS NOT NULL\",\"eventExtract\":\"evt-extract-customer\",\"eventSend\":\"evt-load-customer\",\"tableExtract\":\"CUSTOMER\",\"dependencyList\":[]},{\"pageSize\":\"2000\",\"nameIntegration\":\"Product\",\"query\":\"SELECT PRODUCT_ID, NAME, GROUP FROM PRODUCT WHERE GROUP IN(10,30,40)\",\"eventExtract\":\"evt-extract-product\",\"eventSend\":\"evt-load-product\",\"tableExtract\":\"PRODUCT\",\"dependencyList\":[]},{\"pageSize\":\"5000\",\"nameIntegration\":\"Order\",\"query\":\"SELECT ORDER_ID, CUSTOMER_ID, ORDER_VALUE, ORDER_DATE FROM ORDER WHERE ORDER_DATE = SYSDATE\",\"eventExtract\":\"evt-extract-order\",\"eventSend\":\"evt-load-order\",\"tableExtract\":\"ORDER\",\"dependencyList\":[\"Customer\",\"Product\"]}]}
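One way to produce the escaped form without editing it by hand is to serialize the JSON twice. The sketch below uses Python's standard `json` module on a shortened, illustrative configuration; the variable names are arbitrary.

```python
import json

# Shortened configuration, for illustration only.
config = {
    "integrations": [
        {"pageSize": "2000", "nameIntegration": "Customer", "dependencyList": []}
    ]
}

# First pass: compact JSON without extra whitespace.
compact = json.dumps(config, separators=(",", ":"))
# Second pass: serialize the JSON text as a JSON string, which escapes the
# inner quotes, then drop the surrounding quotes.
escaped = json.dumps(compact)[1:-1]
print(escaped)

# Round trip: unescape and parse to confirm nothing was lost.
restored = json.loads(json.loads(f'"{escaped}"'))
print(restored == config)
```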
Extraction page control: An Object Store is needed to store each page's metadata for extraction.
Temporary Object Store: An Object Store specific to each flow, where its extracted pages are stored for later transformation and sending.
Common-use mechanisms: Existing shared mechanisms, such as reprocessing or notification, can support the ETL process.
Integration control: Although it is not shown in the diagram above, it is highly recommended to keep an integration-control Object Store that tracks the daily status of each integration configured in the list of integrations.
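The integration control can be pictured as one status document per integration per day. The sketch below keeps those documents in a dictionary keyed by integration name and date; the field names follow the ones used in the flow specs (`integrationName`, `integrationDate`, `step`, `status`), while the `set_status` helper and the in-memory store are hypothetical.

```python
from datetime import date


def set_status(control: dict, integration: str, step: str, status: str, day: date) -> dict:
    """Create or overwrite the daily status document of one integration."""
    formatted_day = day.strftime("%d/%m/%Y")
    document = {
        "integrationName": integration,
        "integrationDate": formatted_day,
        "step": step,
        "status": status,
    }
    control[(integration, formatted_day)] = document  # upsert by (name, date)
    return document


control = {}  # stand-in for the integration-control Object Store
today = date(2024, 1, 15)
set_status(control, "Customer", "pagination", "done", today)
set_status(control, "Customer", "extraction", "progress", today)
set_status(control, "Customer", "extraction", "finished", today)
print(control[("Customer", "15/01/2024")])
```

Keying by name and date keeps a single document per integration per day, so each step overwrites the previous status instead of accumulating entries.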
Pipeline flow specs
Flow Spec: sch-generate-pages
{
"meta": {
"36bcada5-b65e-450f-92b3-b2486d0a1520": {
"position": {
"x": 200,
"y": 0
}
},
"15ae8ee4-49ad-4b46-87f3-946fd38dcf6b": {
"position": {
"x": 400,
"y": 0
}
},
"d4e248bf-8348-449f-9717-cf25c6f773e6": {
"position": {
"x": 600,
"y": 0
}
},
"e146bdb0-0773-49b4-a53a-022c7ce3d5f7": {
"position": {
"x": 800,
"y": 0
}
},
"85d997d9-66e4-48c7-8500-885b0e32b4c7": {
"position": {
"x": 0,
"y": 150
}
}
},
"flowSpec": {
"disconnected-root:71c6e6e6-d1b5-4ce4-8eb5-f14158f3ab45": [
{
"type": "connector",
"name": "object-store-connector",
"accountLabel": "dgb-internal-object-store-account",
"stepName": "delete pages-for-extraction 3 days +",
"params": {
"operation": "DELETE_BY_QUERY",
"objectStore": "pages-for-extraction",
"query": "{\r\n \"date\": { $lt: {{ FORMATDATE(SUMDATE(NOW(), \"DAY\",-3),\"timestamp\",\"dd-MM-yyyy\") }}}\r\n}",
"limit": "0",
"skip": "0",
"sort": "",
"unique": true,
"isolated": false,
"failOnError": false
},
"id": "36bcada5-b65e-450f-92b3-b2486d0a1520"
},
{
"name": "json-generator-connector",
"type": "connector",
"stepName": "get global integration",
"params": {
"json": "{\n \"integrations\": [\n {\n \"pageSize\": \"2000\",\n \"nameIntegration\": \"Customer\",\n \"query\": \"SELECT id, first_name, last_name, gender, ip_address, email FROM pagination_course WHERE first_name IS NOT NULL\",\n \"eventExtract\": \"evt-extract-integration\",\n \"eventSend\": \"evt-load-integration\",\n \"tableExtract\": \"pagination_course\",\n \"databaseExtract\": \"digibee_database\",\n \"dependencyList\": [\n \n ]\n },\n {\n \"pageSize\": \"2000\",\n \"nameIntegration\": \"Product\",\n \"query\": \"SELECT PRODUCT_ID, NAME, GROUP FROM PRODUCT WHERE GROUP IN(10,30,40)\",\n \"eventExtract\": \"evt-extract-product\",\n \"eventSend\": \"evt-load-product\",\n \"tableExtract\": \"PRODUCT\",\n \"databaseExtract\": \"digibee_database\",\n \"dependencyList\": [\n \n ]\n },\n {\n \"pageSize\": \"5000\",\n \"nameIntegration\": \"Order\",\n \"query\": \"SELECT ORDER_ID, CUSTOMER_ID, ORDER_VALUE, ORDER_DATE FROM ORDER WHERE ORDER_DATE = SYSDATE\",\n \"eventExtract\": \"evt-extract-order\",\n \"eventSend\": \"evt-load-order\",\n \"tableExtract\": \"ORDER\",\n \"databaseExtract\": \"digibee_database\",\n \"dependencyList\": [\n \"Customer\",\n \"Product\"\n ]\n }\n ]\n }\n",
"failOnError": false
},
"id": "15ae8ee4-49ad-4b46-87f3-946fd38dcf6b"
},
{
"type": "connector",
"name": "for-each-connector",
"stepName": "For Each integration-list original",
"params": {
"jsonPath": "$.integrations[0]",
"itemIdentifier": "",
"parallel": false,
"failOnError": false,
"onProcess": "d4e248bf-8348-449f-9717-cf25c6f773e6-onProcessTrack",
"onException": "d4e248bf-8348-449f-9717-cf25c6f773e6-onExceptionTrack"
},
"id": "d4e248bf-8348-449f-9717-cf25c6f773e6"
},
{
"type": "session-management",
"stepName": "get resultList",
"operation": "GET",
"sessionType": "LOCAL",
"scoped": false,
"fields": [
"resultList"
],
"id": "e146bdb0-0773-49b4-a53a-022c7ce3d5f7"
}
],
"d4e248bf-8348-449f-9717-cf25c6f773e6-onProcessTrack": [
{
"name": "json-generator-connector",
"type": "connector",
"stepName": "generate temp object",
"params": {
"json": "{\n \"temp\": {{ message.$ }}\n}",
"failOnError": false
},
"id": "c9a5fd1c-ae74-46e5-99f9-a5db5a2f5fe9"
},
{
"type": "session-management",
"stepName": "put temp",
"operation": "PUT",
"sessionType": "LOCAL",
"scoped": true,
"fields": [
"temp"
],
"id": "98537954-2363-4631-b6c5-dd4b72cba7d6"
},
{
"type": "connector",
"name": "db-connector-v2",
"accountLabel": "mysql-2",
"stepName": "Count Pagination",
"params": {
"url": "jdbc:mysql://35.223.175.97/db-training",
"sql": "SELECT COUNT(*) AS TOTAL_ROWS FROM {{ message.temp.tableExtract }}",
"failOnError": false,
"keepConnection": false,
"useDynamicAccount": false,
"accountType": "basic",
"operation": "QUERY",
"batch": false,
"blobAsFile": false,
"clobAsFile": false,
"typeProperties": "[]",
"connectionProperties": "{\n \"cancelQueryTimeout\": 5,\n \"queryTimeout\": 15,\n \"lockTimeout\": 15000,\n \"socketTimeout\": 15000\n}",
"dbPoolByActualConsumers": false,
"exclusiveDbPool": false,
"customDbPool": false,
"columnFromLabel": false,
"connectionTestQuery": "",
"rawSql": true,
"dbPoolMaxLifeTime": "1800000",
"dbPoolMinimumIdle": "10",
"dbPoolIdleTimeout": "600000"
},
"id": "e8610eab-dcc5-4211-8703-31efe0e8620d",
"dynamicAccountNames": {},
"accountLabels": {}
},
{
"type": "connector",
"name": "log-connector",
"stepName": "Log - Count pagination",
"params": {
"logLevel": "INFO",
"message": "Count pagination {{ message.$ }}"
},
"id": "7a1b45ff-914b-4c71-b506-a47af8302fd0"
},
{
"type": "session-management",
"stepName": "get temp",
"operation": "GET",
"sessionType": "LOCAL",
"scoped": true,
"fields": [
"temp"
],
"id": "3aad7d94-daa2-401a-a7f7-846300fdc55d"
},
{
"name": "json-generator-connector",
"type": "connector",
"stepName": "calc pages",
"params": {
"json": "{\n \"totalPages\": {{ TOINT(SUM(TOINT(DIVIDE(TOINT(message.data[0].TOTAL_ROWS),TOINT(message.temp.pageSize))),1)) }},\n \"totalRecords\": {{ message.data[0].TOTAL_ROWS }}\n}",
"failOnError": false
},
"id": "1ef5cb1c-fc0e-4f15-a6ff-9340dbd2776a"
},
{
"type": "session-management",
"stepName": "put totalPages/totalRecords",
"operation": "PUT",
"sessionType": "LOCAL",
"scoped": true,
"fields": [
"totalPages",
"totalRecords"
],
"id": "dee7d7c7-e87e-4e10-a4e5-c187772ac320"
},
{
"type": "connector",
"name": "do-while-connector",
"stepName": "Send pagination",
"params": {
"iteration": 999999,
"timeout": 660000,
"showLoopIndex": true,
"interruptLoopOnError": true,
"onProcess": "ac3b3612-5c7f-4fde-a20f-6d0845849e69-onProcessTrack",
"onException": "ac3b3612-5c7f-4fde-a20f-6d0845849e69-onExceptionTrack"
},
"id": "ac3b3612-5c7f-4fde-a20f-6d0845849e69"
},
{
"type": "session-management",
"stepName": "delete All",
"operation": "DELETE",
"sessionType": "LOCAL",
"scoped": true,
"fields": [
"totalPages",
"totalRecords"
],
"id": "014dc2f2-041a-478f-9b65-071efce24ae0"
},
{
"type": "session-management",
"stepName": "get temp",
"operation": "GET",
"sessionType": "LOCAL",
"scoped": true,
"fields": [
"temp"
],
"id": "5e65c5e4-e9f5-47b2-984f-990118878e7f"
},
{
"type": "connector",
"name": "object-store-connector",
"accountLabel": "dgb-internal-object-store-account",
"stepName": "Insert integration-control",
"params": {
"operation": "INSERT",
"objectStore": "integration-control",
"objectId": "{{ UUID() }}",
"document": "{\n \"integrationName\": {{ message.temp.nameIntegration }},\n \"manualMode\": \"disable\",\n \"integrationDate\": {{ FORMATDATE(NOW(), \"timestamp\", \"dd/MM/yyyy\") }},\n \"step\": \"pagination\",\n \"status\": \"done\"\n}",
"unique": true,
"isolated": false,
"upsert": false,
"failOnError": false
},
"id": "c9798498-349d-41fd-9137-4616c023002e"
},
{
"type": "connector",
"name": "log-connector",
"stepName": "Log - Insert integration-control",
"params": {
"logLevel": "INFO",
"message": "Insert integration-control {{ message.$ }}"
},
"id": "45651031-a34a-4ce8-9be6-9d7a9c1117b9"
},
{
"type": "session-management",
"stepName": "delete temp",
"operation": "DELETE",
"sessionType": "LOCAL",
"scoped": true,
"fields": [
"temp"
],
"id": "97a6f0fe-0e64-4d2a-b4ac-4159f6687397"
},
{
"name": "json-generator-connector",
"type": "connector",
"stepName": "set success",
"params": {
"json": "{\n \"success\": true\n}",
"failOnError": false
},
"id": "68a9c63f-baa9-42c8-a9d4-ee6d07a441eb"
}
],
"ac3b3612-5c7f-4fde-a20f-6d0845849e69-onProcessTrack": [
{
"type": "session-management",
"stepName": "Get All",
"operation": "GET",
"sessionType": "LOCAL",
"scoped": true,
"fields": [
"end",
"start",
"totalPages",
"totalRecords",
"temp"
],
"id": "9f15ccb7-b9bd-4f76-919b-aa152eb6f718"
},
{
"name": "json-generator-connector",
"type": "connector",
"stepName": "generate loopIndex",
"params": {
"json": "\n{\n \"loopIndex\": {{ message.loopIndex }},\n \"totalPages\": {{ message.totalPages }},\n \"start\": {{ TOINT(IF(EQUALTO(message.loopIndex,1),0,message.end)) }},\n \"end\": {{ TOINT(IF(EQUALTO(message.loopIndex,1),TOINT(message.temp.pageSize),SUM(message.end,TOINT(message.temp.pageSize)))) }},\n \"id\": {{ UUID() }}\n}",
"failOnError": false
},
"id": "79dc1543-94f2-48c9-af31-da5e8afdd6eb"
},
{
"type": "connector",
"name": "log-connector",
"stepName": "Log - generate loopIndex",
"params": {
"logLevel": "INFO",
"message": "generate loopIndex {{ message.$ }}"
},
"id": "741e5db0-f17a-4520-9bf5-3523e23b1b11"
},
{
"type": "session-management",
"stepName": "put end/start/loopIndex",
"operation": "PUT",
"sessionType": "LOCAL",
"scoped": true,
"fields": [
"end",
"start",
"loopIndex",
"id",
"totalPages"
],
"id": "21315db7-6125-4eb7-9026-5d8581c84765"
},
{
"type": "session-management",
"stepName": "get temp",
"operation": "GET",
"sessionType": "LOCAL",
"scoped": true,
"fields": [
"temp",
"loopIndex",
"end",
"start",
"totalPages",
"id"
],
"id": "6831f1a0-1d46-4a8b-8ec0-d83b5d94a637"
},
{
"type": "connector",
"name": "object-store-connector",
"accountLabel": "dgb-internal-object-store-account",
"stepName": "pages-for-extraction",
"params": {
"operation": "INSERT",
"objectStore": "pages-for-extraction",
"objectId": "{{ message.id }}",
"document": "{\n \"page\": {{ message.loopIndex }},\n \"size\": {{ message.temp.pageSize }},\n \"query\": {{ message.temp.query }},\n \"table\": {{ message.temp.tableExtract }},\n \"databaseExtract\": {{ message.temp.databaseExtract }},\n \"publisher\":{{ metadata.pipeline.name }},\n \"integration\":{{ message.temp.nameIntegration }},\n \"dependencyList\": {{ message.temp.dependencyList }},\n \"start\": {{ message.start }},\n \"end\": {{ message.end }},\n \"event\": {{ message.temp.eventSend }},\n \"finished\": {{ IF(EQUALTO(message.loopIndex,message.totalPages),true, false) }} \n}",
"unique": true,
"isolated": false,
"failOnError": false
},
"id": "9675ab2c-f739-4409-be57-9af0ee7b6b35"
},
{
"type": "session-management",
"stepName": "get temp",
"operation": "GET",
"sessionType": "LOCAL",
"scoped": true,
"fields": [
"temp",
"loopIndex",
"end",
"start",
"totalPages",
"id"
],
"id": "d2895a35-5bd0-47ca-aaca-0cfe1066e2cf"
},
{
"type": "connector",
"stepName": "EVT",
"name": "event-publisher-connector",
"params": {
"eventName": "{{ message.temp.eventExtract }}",
"body": "{\n \"page\": {{ message.loopIndex }},\n \"size\": {{ message.temp.pageSize }},\n \"select\": {{ message.temp.query }},\n \"table\": {{ message.temp.tableExtract }},\n \"publisher\":{{ metadata.pipeline.name }},\n \"integration\":{{ message.temp.nameIntegration }},\n \"start\": {{ message.start }},\n \"end\": {{ message.end }},\n \"_oId\": {{ message.id }},\n \"finished\": {{ IF(EQUALTO(message.totalPages, message.loopIndex), true, false) }}\n}",
"showSendEventLog": false,
"stopOnError": false
},
"id": "c269cda9-bd7b-45ec-986d-b236cc7c12cc"
},
{
"name": "json-generator-connector",
"type": "connector",
"stepName": "loopBreak",
"params": {
"json": "{\n \"loopBreak\": {{ IF(EQUALTO(message.loopIndex,message.totalPages),true, false) }} \n}",
"failOnError": false
},
"id": "b81bcce6-d08e-4217-8800-92792b5292eb"
},
{
"type": "session-management",
"stepName": "delete id",
"operation": "DELETE",
"sessionType": "LOCAL",
"scoped": true,
"fields": [
"id"
],
"id": "d90853d7-fae1-4eec-9ff6-aa6ebfeed627"
}
],
"ac3b3612-5c7f-4fde-a20f-6d0845849e69-onExceptionTrack": [
{
"type": "connector",
"name": "log-connector",
"stepName": "Log - Send pagination",
"params": {
"logLevel": "INFO",
"message": "Send pagination {{ message.$ }}"
},
"id": "8e12529f-63ad-44b2-a4c6-3557236e6a09"
},
{
"type": "connector",
"name": "throw-error-connector",
"stepName": "Throw Error",
"params": {
"errorMessage": "Error occurred.",
"errorCode": 500,
"customErrorEnabled": false
},
"id": "8decd80e-a35e-4bd3-b5f1-c550a32737f2"
}
],
"d4e248bf-8348-449f-9717-cf25c6f773e6-onExceptionTrack": [
{
"type": "session-management",
"stepName": "get temp",
"operation": "GET",
"sessionType": "LOCAL",
"scoped": true,
"fields": [
"temp"
],
"id": "34096419-f972-4e0d-910d-9a915cafd2b5"
},
{
"type": "connector",
"name": "log-connector",
"stepName": "Log - For Each integration-list (onException)",
"params": {
"logLevel": "ERROR",
"message": "For Each (onException) | Integration: {{ message.temp.nameIntegration }} | Error: {{ message.error }} |"
},
"id": "f6123d53-01fc-49ca-a7fb-53d0c8c5a8c6"
},
{
"type": "capsule",
"name": "capsule-v1-digibee-digibee-tools-add-item-to-session-array-1.0",
"capsuleCollection": "digibee-tools",
"capsuleCollectionVersion": 1,
"capsule": "add-item-to-session-array-1.0",
"capsuleVersionMajor": 1,
"capsuleVersionMinor": 0,
"stepName": "Add item to session (array)",
"params": {
"obj": "{{ message.$ }}"
},
"id": "82076e9b-9d8f-4aea-b665-27f922e758d5"
}
],
"disconnected-root:91352591-cf0c-44ba-b7b9-7d2611a41ef1": [
{
"name": "json-generator-connector",
"type": "connector",
"stepName": "LEIA-ME",
"params": {
"json": "The content to be saved in the Global must be a string containing the escapes:\n\"{\\\"integrations\\\":[{\\\"pageSize\\\":\\\"2000\\\",\\\"nameIntegration\\\":\\\"Customer\\\",\\\"query\\\":\\\"SELECT CUSTOMER_ID, NAME ,CNPJ FROM CUSTOMER WHERE CNPJ IS NOT NULL\\\",\\\"eventExtract\\\":\\\"evt-extract-customer\\\",\\\"eventSend\\\":\\\"evt-load-customer\\\",\\\"tableExtract\\\":\\\"CUSTOMER\\\",\\\"dependencyList\\\":[]},{\\\"pageSize\\\":\\\"2000\\\",\\\"nameIntegration\\\":\\\"Product\\\",\\\"query\\\":\\\"SELECT PRODUCT_ID, NAME, GROUP FROM PRODUCT WHERE GROUP IN(10,30,40)\\\",\\\"eventExtract\\\":\\\"evt-extract-product\\\",\\\"eventSend\\\":\\\"evt-load-product\\\",\\\"tableExtract\\\":\\\"PRODUCT\\\",\\\"dependencyList\\\":[]},{\\\"pageSize\\\":\\\"5000\\\",\\\"nameIntegration\\\":\\\"Order\\\",\\\"query\\\":\\\"SELECT ORDER_ID, CUSTOMER_ID, ORDER_VALUE, ORDER_DATE FROM ORDER WHERE ORDER_DATE = SYSDATE\\\",\\\"eventExtract\\\":\\\"evt-extract-order\\\",\\\"eventSend\\\":\\\"evt-load-order\\\",\\\"tableExtract\\\":\\\"ORDER\\\",\\\"dependencyList\\\":[\\\"Customer\\\",\\\"Product\\\"]}]}\"\n",
"failOnError": false
},
"id": "85d997d9-66e4-48c7-8500-885b0e32b4c7"
}
]
}
}
Flow Spec: evt-extract-integration
{
"meta": {
"9f698689-3164-42c9-b44a-68bbb10eb63e": {
"position": {
"x": 200,
"y": 150
}
},
"c4af166e-523e-46f6-baa7-b4a7712a1b88": {
"position": {
"x": 400,
"y": 150
}
},
"7134a571-6561-49ed-b9bc-88b91d85c984": {
"position": {
"x": 600,
"y": 150
}
},
"44833968-110a-44d6-915c-39ec0a120333": {
"position": {
"x": 800,
"y": 150
}
},
"57a450bb-deb1-4c69-842f-3a29d76e037f": {
"position": {
"x": 1250,
"y": 0
}
},
"72c74518-0672-4da3-9b49-3aee578449cd": {
"position": {
"x": 1450,
"y": 0
}
},
"b760ea98-4c81-4264-8858-b092debd60c8": {
"position": {
"x": 1650,
"y": 0
}
},
"a867448d-4b5b-4a0d-a0bc-1e783811f5be": {
"position": {
"x": 1250,
"y": 150
}
},
"8f34c2c4-4336-45ec-84db-2e20073b8562": {
"position": {
"x": 1450,
"y": 150
}
},
"25a396d5-a98f-4bd2-b0b9-801ce6ded152": {
"position": {
"x": 1650,
"y": 150
}
},
"744e184e-17d0-4592-8e31-219147e18ce2": {
"position": {
"x": 1850,
"y": 150
}
},
"579eddc4-942c-44cb-a22c-3b78f511b5fe": {
"position": {
"x": 1250,
"y": 300
}
},
"5f12d793-2268-44f1-a2a8-b439490975b5": {
"position": {
"x": 1450,
"y": 300
}
},
"bc33a9e7-d959-4e2b-b0d8-48e2c653de17": {
"position": {
"x": 1650,
"y": 300
}
},
"78ab5e22-8c14-458f-b037-db94fe32edf9": {
"position": {
"x": 1850,
"y": 300
}
},
"57268aa7-eb74-44da-bb76-6a37e0ec3252": {
"position": {
"x": 2300,
"y": 225
}
},
"3b7a8dc6-ac00-4119-83c5-00bc277110d3": {
"position": {
"x": 2500,
"y": 225
}
},
"c051bb47-5954-4a83-808f-ee02b11a4248": {
"position": {
"x": 2700,
"y": 225
}
},
"7cbfdc07-ba4e-45a1-bdaa-db2a006611db": {
"position": {
"x": 2900,
"y": 225
}
},
"40f0c2f1-213e-4dee-8a25-fc359f94aee2": {
"position": {
"x": 3100,
"y": 225
}
},
"5ac13705-b797-4289-abb4-39ca26c289e0": {
"position": {
"x": 3300,
"y": 225
}
},
"48e9ab7b-9855-45dd-9a51-26334f8caee7": {
"position": {
"x": 2300,
"y": 375
}
},
"816e7b04-ff1c-4494-8b1b-91a4e387cfd1": {
"position": {
"x": 2500,
"y": 375
}
},
"2831b857-7c4b-45d5-9c6a-95cc91bd59cd": {
"position": {
"x": 2700,
"y": 375
}
}
},
"flowSpec": {
"disconnected-root:312fed22-a1c9-4ca6-9203-9a29b766d18e": [
{
"name": "json-generator-connector",
"type": "connector",
"stepName": "payload",
"params": {
"json": "{\n \"payload\": {{ DEFAULT( message.payload,message.$) }}\n}",
"failOnError": false
},
"id": "9f698689-3164-42c9-b44a-68bbb10eb63e"
},
{
"type": "session-management",
"stepName": "put payload",
"operation": "PUT",
"sessionType": "LOCAL",
"scoped": false,
"fields": [
"payload"
],
"id": "c4af166e-523e-46f6-baa7-b4a7712a1b88"
},
{
"type": "connector",
"name": "db-connector-v2",
"accountLabel": "mysql-2",
"stepName": "Select",
"params": {
"url": "jdbc:mysql://35.223.175.97/db-training",
"sql": "{{ CONCAT(message.payload.select, \" ORDER BY id LIMIT \", message.payload.size, \" OFFSET \", TOINT(MULTIPLY(SUBTRACT(message.payload.page, 1), TOINT(message.payload.size)))) }}",
"failOnError": false,
"keepConnection": false,
"useDynamicAccount": false,
"accountType": "basic",
"operation": "QUERY",
"batch": false,
"blobAsFile": false,
"clobAsFile": false,
"typeProperties": "[]",
"connectionProperties": "{\n \"cancelQueryTimeout\": 5,\n \"queryTimeout\": 15,\n \"lockTimeout\": 15000,\n \"socketTimeout\": 15000\n}",
"dbPoolByActualConsumers": false,
"exclusiveDbPool": false,
"customDbPool": false,
"columnFromLabel": false,
"connectionTestQuery": "",
"rawSql": true,
"rollbackOnError": false,
"items": "{}",
"charset": "UTF-8"
},
"id": "7134a571-6561-49ed-b9bc-88b91d85c984",
"dynamicAccountNames": {},
"accountLabels": {},
"__documentation__": "The start and end fields are not used here because they are intended for other database types, such as Oracle, where the query is built differently"
},
{
"type": "choice",
"stepName": "Choice",
"when": [
{
"target": "Fail (2)",
"jsonPath": "$.[?(@.success == false)]"
},
{
"target": "Registers not Found (2)",
"jsonPath": "$.[?(@.rowCount == 0)]"
}
],
"otherwise": "Success (2)",
"id": "44833968-110a-44d6-915c-39ec0a120333"
}
],
"Fail (2)": [
{
"type": "connector",
"name": "log-connector",
"stepName": "DB Error",
"params": {
"logLevel": "ERROR",
"message": "DB Error: {{ message.$ }}"
},
"id": "57a450bb-deb1-4c69-842f-3a29d76e037f"
},
{
"type": "connector",
"name": "block-execution-connector",
"stepName": "Reprocessamento",
"params": {
"onProcess": "72c74518-0672-4da3-9b49-3aee578449cd-onProcessTrack",
"onException": "72c74518-0672-4da3-9b49-3aee578449cd-onExceptionTrack"
},
"id": "72c74518-0672-4da3-9b49-3aee578449cd",
"description": "Document here what this block is supposed to do."
},
{
"type": "connector",
"name": "throw-error-connector",
"stepName": "Throw Error",
"params": {
"customErrorEnabled": true,
"customError": "{{ message.$ }}"
},
"id": "b760ea98-4c81-4264-8858-b092debd60c8"
}
],
"72c74518-0672-4da3-9b49-3aee578449cd-onProcessTrack": [
{
"type": "connector",
"stepName": "Event Publisher - Reprocessamento",
"name": "event-publisher-connector",
"params": {
"eventName": "reprocessamento-exemplo",
"body": "{}",
"showSendEventLog": false,
"stopOnError": false
},
"id": "405d60da-15af-4e8c-a75a-343a840f72f8"
}
],
"Registers not Found (2)": [
{
"type": "connector",
"name": "log-connector",
"stepName": "Registers Not Found",
"params": {
"logLevel": "INFO",
"message": "Registers Not Found"
},
"id": "a867448d-4b5b-4a0d-a0bc-1e783811f5be"
},
{
"type": "session-management",
"stepName": "get payload",
"operation": "GET",
"sessionType": "LOCAL",
"scoped": false,
"fields": [
"payload"
],
"id": "8f34c2c4-4336-45ec-84db-2e20073b8562"
},
{
"type": "connector",
"name": "object-store-connector",
"accountLabel": "dgb-internal-object-store-account",
"stepName": "pages-for-extraction",
"params": {
"operation": "DELETE",
"objectStore": "pages-for-extraction",
"objectId": "{{ message.payload._oId }}",
"unique": true,
"isolated": false,
"failOnError": false
},
"id": "25a396d5-a98f-4bd2-b0b9-801ce6ded152"
},
{
"name": "json-generator-connector",
"type": "connector",
"stepName": "Final",
"params": {
"json": "{\r\n \"message\": \"Registers not found\"\r\n}",
"failOnError": false
},
"id": "744e184e-17d0-4592-8e31-219147e18ce2"
}
],
"Success (2)": [
{
"type": "connector",
"name": "log-connector",
"stepName": "Success DB",
"params": {
"logLevel": "INFO",
"message": "Success DB"
},
"id": "579eddc4-942c-44cb-a22c-3b78f511b5fe"
},
{
"type": "session-management",
"stepName": "get payload",
"operation": "GET",
"sessionType": "LOCAL",
"scoped": false,
"fields": [
"payload"
],
"id": "5f12d793-2268-44f1-a2a8-b439490975b5"
},
{
"type": "connector",
"name": "object-store-connector",
"accountLabel": "dgb-internal-object-store-account",
"stepName": "Insert pagination",
"params": {
"operation": "INSERT",
"objectStore": "temporary-database",
"objectId": "{{ message.payload._oId }}",
"document": "{\n \"pagination\": {{ message.data }}\n}",
"unique": true,
"isolated": false,
"failOnError": true
},
"id": "bc33a9e7-d959-4e2b-b0d8-48e2c653de17"
},
{
"type": "choice",
"stepName": "Choice",
"when": [
{
"target": "Success ObjectStore (2)",
"jsonPath": "$.[?(@.updateCount > 0)]"
}
],
"otherwise": "Fail ObjectStore (2)",
"id": "78ab5e22-8c14-458f-b037-db94fe32edf9"
}
],
"Success ObjectStore (2)": [
{
"type": "connector",
"name": "log-connector",
"stepName": "Success Insert Pagination",
"params": {
"logLevel": "INFO",
"message": "Success Insert Pagination"
},
"id": "57268aa7-eb74-44da-bb76-6a37e0ec3252"
},
{
"type": "session-management",
"stepName": "get payload",
"operation": "GET",
"sessionType": "LOCAL",
"scoped": false,
"fields": [
"payload"
],
"id": "3b7a8dc6-ac00-4119-83c5-00bc277110d3"
},
{
"type": "connector",
"name": "object-store-connector",
"accountLabel": "dgb-internal-object-store-account",
"stepName": "update integration-control",
"params": {
"operation": "UPDATE_BY_QUERY",
"objectStore": "integration-control",
"query": "{\n \"integrationName\": {$eq: {{ message.payload.integration }} },\n \"integrationDate\": {$eq: {{ FORMATDATE(NOW(), \"timestamp\", \"dd/MM/yyyy\") }}}\n}",
"document": "{\n $set: {\n \"integrationName\": {{ message.payload.integration }},\n \"integrationDate\":{{ FORMATDATE(NOW(), \"timestamp\", \"dd/MM/yyyy\") }},\n \"step\":\"extraction\",\n \t\"status\": {{ IF( message.payload.finished, \"finished\", \"progress\" ) }}\n }\n}\n",
"unique": true,
"isolated": false,
"upsert": true,
"failOnError": false
},
"id": "c051bb47-5954-4a83-808f-ee02b11a4248"
},
{
"type": "session-management",
"stepName": "get payload",
"operation": "GET",
"sessionType": "LOCAL",
"scoped": false,
"fields": [
"payload"
],
"id": "7cbfdc07-ba4e-45a1-bdaa-db2a006611db"
},
{
"type": "connector",
"name": "object-store-connector",
"accountLabel": "dgb-internal-object-store-account",
"stepName": "pages-for-extraction",
"params": {
"operation": "UPDATE",
"objectStore": "pages-for-extraction",
"objectId": "{{ message.payload._oId }}",
"document": "{\n $set:{\n \"pageStatus\": \"done\"\n }\n}",
"unique": true,
"isolated": false,
"upsert": false,
"failOnError": false
},
"id": "40f0c2f1-213e-4dee-8a25-fc359f94aee2"
},
{
"name": "json-generator-connector",
"type": "connector",
"stepName": "Final",
"params": {
"json": "{\r\n \"success\": true\r\n}",
"failOnError": false
},
"id": "5ac13705-b797-4289-abb4-39ca26c289e0"
}
],
"Fail ObjectStore (2)": [
{
"type": "connector",
"name": "log-connector",
"stepName": "Fail Insert Pagination",
"params": {
"logLevel": "ERROR",
"message": "Fail Insert Pagination {{ message.$ }}"
},
"id": "48e9ab7b-9855-45dd-9a51-26334f8caee7"
},
{
"type": "connector",
"name": "block-execution-connector",
"stepName": "Reprocessamento",
"params": {
"onProcess": "816e7b04-ff1c-4494-8b1b-91a4e387cfd1-onProcessTrack",
"onException": "816e7b04-ff1c-4494-8b1b-91a4e387cfd1-onExceptionTrack"
},
"id": "816e7b04-ff1c-4494-8b1b-91a4e387cfd1",
"description": "Document here what this block is supposed to do."
},
{
"type": "connector",
"name": "throw-error-connector",
"stepName": "Throw Error",
"params": {
"customErrorEnabled": true,
"customError": "{\n \"success\": false\n}"
},
"id": "2831b857-7c4b-45d5-9c6a-95cc91bd59cd"
}
],
"816e7b04-ff1c-4494-8b1b-91a4e387cfd1-onProcessTrack": [
{
"type": "connector",
"stepName": "Event Publisher - Reprocessamento",
"name": "event-publisher-connector",
"params": {
"eventName": "reprocessamento-exemplo",
"body": "{}",
"showSendEventLog": false,
"stopOnError": false
},
"id": "6adcd3c3-eeae-43f0-822f-023f0dd2b1b4"
}
]
}
}
Flow Spec: sch-dispatcher-pages
{
"meta": {
"e221a685-6195-45fa-a662-e4dd21157943": {
"position": {
"x": 200,
"y": 75
}
},
"ccccc951-5e7e-46fc-be5f-b008d7f7f615": {
"position": {
"x": 400,
"y": 75
}
},
"87b8a706-dfe8-4c09-9ff4-8eeef1210edb": {
"position": {
"x": 600,
"y": 75
}
},
"060480d5-7b25-4642-9d01-0b46a2090e9d": {
"position": {
"x": 800,
"y": 75
}
},
"ee50fdfc-b8bd-417f-ba9f-9171b9bff6f8": {
"position": {
"x": 1000,
"y": 75
}
},
"57b6367e-e45c-4bc6-b5a5-4380d2afefc9": {
"position": {
"x": 1200,
"y": 75
}
},
"bb70603c-ba06-468f-bd25-7aba77e9bfd3": {
"position": {
"x": 1650,
"y": 0
}
},
"4141fc4d-e6bd-4394-ae89-234d5f79fcc1": {
"position": {
"x": 1850,
"y": 0
}
},
"a63c03bc-adf7-4d88-87c6-ffce0bf97590": {
"position": {
"x": 2050,
"y": 0
}
},
"58e7fc63-ca29-49e9-a474-8c0f0692f7fc": {
"position": {
"x": 2250,
"y": 0
}
},
"dc36afa3-ab3d-4ba4-89cb-1c32a0067077": {
"position": {
"x": 2450,
"y": 0
}
},
"5e702895-12e9-4202-bdd2-7318bfd25cab": {
"position": {
"x": 1650,
"y": 150
}
}
},
"flowSpec": {
"disconnected-root:6699dfd6-0425-4887-b4be-f633ef7064fd": [
{
"name": "json-generator-connector",
"type": "connector",
"stepName": "get global integration-list",
"params": {
"json": "{\n \"integrations\": [\n {\n \"pageSize\": \"2000\",\n \"nameIntegration\": \"Customer\",\n \"query\": \"SELECT id, first_name, last_name, gender, ip_address, email FROM pagination_course WHERE first_name IS NOT NULL\",\n \"eventExtract\": \"evt-extract-integration\",\n \"eventSend\": \"evt-load-integration\",\n \"tableExtract\": \"pagination_course\",\n \"databaseExtract\": \"digibee_database\",\n \"dependencyList\": [\n \n ]\n },\n {\n \"pageSize\": \"2000\",\n \"nameIntegration\": \"Product\",\n \"query\": \"SELECT PRODUCT_ID, NAME, GROUP FROM PRODUCT WHERE GROUP IN(10,30,40)\",\n \"eventExtract\": \"evt-extract-product\",\n \"eventSend\": \"evt-load-product\",\n \"tableExtract\": \"PRODUCT\",\n \"databaseExtract\": \"digibee_database\",\n \"dependencyList\": [\n \n ]\n },\n {\n \"pageSize\": \"5000\",\n \"nameIntegration\": \"Order\",\n \"query\": \"SELECT ORDER_ID, CUSTOMER_ID, ORDER_VALUE, ORDER_DATE FROM ORDER WHERE ORDER_DATE = SYSDATE\",\n \"eventExtract\": \"evt-extract-order\",\n \"eventSend\": \"evt-load-order\",\n \"tableExtract\": \"ORDER\",\n \"databaseExtract\": \"digibee_database\",\n \"dependencyList\": [\n \"Customer\",\n \"Product\"\n ]\n }\n ]\n }\n",
"failOnError": false
},
"id": "e221a685-6195-45fa-a662-e4dd21157943"
},
{
"type": "connector",
"name": "for-each-connector",
"stepName": "For Each integration-list",
"params": {
"jsonPath": "$.integrations[0]",
"itemIdentifier": "",
"parallel": true,
"failOnError": false,
"onProcess": "ccccc951-5e7e-46fc-be5f-b008d7f7f615-onProcessTrack",
"onException": "ccccc951-5e7e-46fc-be5f-b008d7f7f615-onExceptionTrack"
},
"id": "ccccc951-5e7e-46fc-be5f-b008d7f7f615"
},
{
"type": "session-management",
"stepName": "get resultList",
"operation": "GET",
"sessionType": "LOCAL",
"scoped": false,
"fields": [
"resultList"
],
"id": "87b8a706-dfe8-4c09-9ff4-8eeef1210edb"
},
{
"type": "transformer",
"stepName": "Transformer (JOLT)",
"transformSpec": [
{
"operation": "shift",
"spec": {
"resultList": {
"*": {
"temp": {
"nameIntegration": "nameIntegration[]"
}
}
}
}
}
],
"id": "060480d5-7b25-4642-9d01-0b46a2090e9d"
},
{
"type": "connector",
"name": "object-store-connector",
"accountLabel": "dgb-internal-object-store-account",
"stepName": "pages-for-extraction",
"params": {
"operation": "FIND_BY_QUERY",
"objectStore": "pages-for-extraction",
"query": "{\n \"integration\": {$in: {{ message.nameIntegration }} },\n \"pageStatus\": \"done\"\n}",
"limit": "50",
"skip": "0",
"sort": "{\"finished\":1}",
"unique": true,
"isolated": false,
"failOnError": false
},
"id": "ee50fdfc-b8bd-417f-ba9f-9171b9bff6f8"
},
{
"type": "choice",
"stepName": "Choice",
"when": [
{
"target": "Pagination Found (2)",
"jsonPath": "$.[?(@.rowCount > 0)]"
}
],
"otherwise": "Pagination Not Found (2)",
"id": "57b6367e-e45c-4bc6-b5a5-4380d2afefc9"
}
],
"ccccc951-5e7e-46fc-be5f-b008d7f7f615-onProcessTrack": [
{
"name": "json-generator-connector",
"type": "connector",
"stepName": "generate temp object",
"params": {
"json": "{\n \"temp\": {{ message.$ }}\n}",
"failOnError": false
},
"id": "ea50152b-ed8a-491d-b047-8bcfaaa3922c"
},
{
"type": "session-management",
"stepName": "put temp",
"operation": "PUT",
"sessionType": "LOCAL",
"scoped": true,
"fields": [
"temp"
],
"id": "d500e729-a698-4bda-9209-c9e8c5654861"
},
{
"type": "connector",
"name": "object-store-connector",
"accountLabel": "dgb-internal-object-store-account",
"stepName": "verifica se a integração já terminou",
"params": {
"operation": "FIND_BY_QUERY",
"objectStore": "integration-control",
"query": "{\n \"integrationName\": {{ message.temp.nameIntegration }},\n \"integrationDate\": {{ FORMATDATE(NOW(), \"timestamp\", \"dd/MM/yyyy\", null, \"GMT-3\") }},\n \"step\": \"integration\",\n \"status\": \"done\"\n}",
"limit": "0",
"skip": "0",
"sort": "{}",
"unique": true,
"isolated": false,
"failOnError": false
},
"id": "71c02160-d99d-49bd-b85f-57589aa641fd"
},
{
"type": "choice",
"stepName": "Choice",
"when": [
{
"target": "Integração já finalizou (1)",
"jsonPath": "$.[?(@.rowCount > 0)]",
"__documentation__": ""
}
],
"otherwise": "Verifica depêndencia (1)",
"id": "86197148-fff5-4543-9479-a7c16d4e9cf8"
}
],
"Integração já finalizou (1)": [
{
"type": "connector",
"name": "log-connector",
"stepName": "Integração finalizada",
"params": {
"logLevel": "INFO",
"message": "Hora do filtro {{ FORMATDATE(NOW(), \"timestamp\", \"dd/MM/yyyy HH:mm\", null, \"GMT-3\") }} | Integração {{ message.data[0].integrationName }} já processou"
},
"id": "f1396b86-5147-402e-aa6b-f5ba6e378404"
}
],
"Verifica depêndencia (1)": [
{
"type": "connector",
"name": "log-connector",
"stepName": "Verifica se há dependência",
"params": {
"logLevel": "INFO",
"message": "Hora do filtro {{ FORMATDATE(NOW(), \"timestamp\", \"dd/MM/yyyy HH:mm\", null, \"GMT-3\") }} | Verifica se a Integração {{ message.data[0].integrationName }} tem dependência"
},
"id": "fc71efd4-d8c9-4fbc-8317-13364597beaf"
},
{
"type": "session-management",
"stepName": "put temp",
"operation": "GET",
"sessionType": "LOCAL",
"scoped": true,
"fields": [
"temp"
],
"id": "407f2de6-d8e8-40c5-b556-e066559f1815"
},
{
"type": "connector",
"name": "block-execution-connector",
"stepName": "dependencies",
"params": {
"onProcess": "c2ed8b50-51cc-4e54-a2bb-8efb75d88862-onProcessTrack",
"onException": "c2ed8b50-51cc-4e54-a2bb-8efb75d88862-onExceptionTrack"
},
"id": "c2ed8b50-51cc-4e54-a2bb-8efb75d88862",
"description": "Verify dependencies for integration, if heave do validation, else go to pages extraction"
},
{
"type": "capsule",
"name": "capsule-v1-digibee-digibee-tools-add-item-to-session-array-1.0",
"capsuleCollection": "digibee-tools",
"capsuleCollectionVersion": 1,
"capsule": "add-item-to-session-array-1.0",
"capsuleVersionMajor": 1,
"capsuleVersionMinor": 0,
"stepName": "Add item to session (array)",
"params": {
"obj": "{{ message.$}}"
},
"id": "3fc6683c-0bdb-4742-afd1-4222703a1b8a"
},
{
"type": "connector",
"name": "log-connector",
"stepName": "Log - Add item to session (array)",
"params": {
"logLevel": "WARN",
"message": "Add item to session (array) {{ message.$ }}"
},
"id": "2f9add63-302d-43e1-8bc8-1a4f78181eed"
},
{
"type": "session-management",
"stepName": "delete temp",
"operation": "DELETE",
"sessionType": "LOCAL",
"scoped": true,
"fields": [
"temp"
],
"id": "dc1f5276-497e-4646-b7e5-254fd38aee3a"
}
],
"c2ed8b50-51cc-4e54-a2bb-8efb75d88862-onProcessTrack": [
{
"type": "choice",
"stepName": "Choice",
"when": [
{
"target": "Contains Dependencies (1)",
"jsonPath": "$.temp.dependencyList[*]"
}
],
"otherwise": "Dont contains dependencies (1)",
"id": "e4201d8f-0f61-4d99-a074-917c0ada3f44"
}
],
"Contains Dependencies (1)": [
{
"type": "connector",
"name": "log-connector",
"stepName": "Log - Contains Dependencies",
"params": {
"logLevel": "INFO",
"message": "Contains Dependencies | Integracao {{ message.temp.nameIntegration }} | Dependencias: {{ message.temp.dependencyList }}"
},
"id": "f15a11a0-c107-423d-8ef4-ef8722d4f6d5"
},
{
"type": "connector",
"name": "object-store-connector",
"accountLabel": "dgb-internal-object-store-account",
"stepName": "find integration-control",
"params": {
"operation": "FIND_BY_QUERY",
"objectStore": "integration-control",
"query": "{\n \"integrationDate\": {{ FORMATDATE( NOW(), \"timestamp\", \"dd/MM/yyyy\") }} ,\n \"integrationName\": { $in: {{ message.temp.dependencyList }} },\n \"status\": { $ne: \"done\" }\n}",
"document": "",
"limit": "0",
"skip": "0",
"sort": "",
"unique": true,
"isolated": false,
"upsert": false,
"failOnError": false
},
"id": "40db5a15-f63f-4023-a534-7832b813975c"
},
{
"type": "connector",
"name": "assert-connector-v2",
"stepName": "Assert dependencies",
"params": {
"condition": "{{ EQUALTO( message.rowCount,0) }}",
"errorMessage": "Found dependencies in progress for integration {{ message.$ }}",
"internalErrorMessage": "Internal Error",
"errorCode": 500,
"failOnError": true
},
"id": "2ac498d6-e793-43cf-95af-df4df1e3f6a2"
},
{
"type": "session-management",
"stepName": "get temp",
"operation": "GET",
"sessionType": "LOCAL",
"scoped": true,
"fields": [
"temp"
],
"id": "f91437fe-a52f-41a3-a5aa-0b534be77b1b"
},
{
"name": "json-generator-connector",
"type": "connector",
"stepName": "only temp",
"params": {
"json": "{\n \"temp\": {{ message.temp }}\n}\n",
"failOnError": false
},
"id": "2fce4888-73da-40c2-b155-a1f5c14a7e8a"
}
],
"Dont contains dependencies (1)": [
{
"type": "connector",
"name": "log-connector",
"stepName": "Log - Dont contains dependencies",
"params": {
"logLevel": "INFO",
"message": "Dont contains dependencies | Integracao {{ message.temp.nameIntegration }}"
},
"id": "bc0c9370-4677-4a32-a502-6c531d56ee81"
}
],
"ccccc951-5e7e-46fc-be5f-b008d7f7f615-onExceptionTrack": [
{
"type": "connector",
"name": "log-connector",
"stepName": "Log - For Each integration-list (onException)",
"params": {
"logLevel": "ERROR",
"message": "For Each (onException) integration-list {{ message.$ }}"
},
"id": "d4fabc6f-a4ca-45d9-b754-76af2bfb7fbc"
},
{
"type": "capsule",
"name": "capsule-v1-digibee-digibee-tools-add-item-to-session-array-1.0",
"capsuleCollection": "digibee-tools",
"capsuleCollectionVersion": 1,
"capsule": "add-item-to-session-array-1.0",
"capsuleVersionMajor": 1,
"capsuleVersionMinor": 0,
"stepName": "Add item to session (array)",
"params": {
"obj": "{{ message.$ }}"
},
"id": "945aa95b-eeaa-4b4a-a632-73b5d2b2483a"
}
],
"Pagination Found (2)": [
{
"type": "connector",
"name": "log-connector",
"stepName": "Log - Pagination Found",
"params": {
"logLevel": "INFO",
"message": "Pagination Found"
},
"id": "bb70603c-ba06-468f-bd25-7aba77e9bfd3"
},
{
"type": "connector",
"name": "for-each-connector",
"stepName": "For Each pages-for-extraction",
"params": {
"jsonPath": "$.data",
"itemIdentifier": "",
"parallel": false,
"failOnError": false,
"onProcess": "4141fc4d-e6bd-4394-ae89-234d5f79fcc1-onProcessTrack",
"onException": "4141fc4d-e6bd-4394-ae89-234d5f79fcc1-onExceptionTrack"
},
"id": "4141fc4d-e6bd-4394-ae89-234d5f79fcc1"
},
{
"type": "session-management",
"stepName": "get resultList",
"operation": "GET",
"sessionType": "LOCAL",
"scoped": false,
"fields": [
"resultList"
],
"id": "a63c03bc-adf7-4d88-87c6-ffce0bf97590"
},
{
"type": "transformer",
"stepName": "Transformer (JOLT)",
"transformSpec": [
{
"operation": "shift",
"spec": {
"resultList": {
"*": {
"temp": {
"nameIntegration": "nameIntegration[]"
}
}
}
}
}
],
"id": "58e7fc63-ca29-49e9-a474-8c0f0692f7fc"
},
{
"name": "json-generator-connector",
"type": "connector",
"stepName": "resume",
"params": {
"json": "{\n \"integrations\":{{ message.nameIntegration }}\n}",
"failOnError": false
},
"id": "dc36afa3-ab3d-4ba4-89cb-1c32a0067077"
}
],
"4141fc4d-e6bd-4394-ae89-234d5f79fcc1-onProcessTrack": [
{
"type": "connector",
"stepName": "Event Publisher",
"name": "event-publisher-connector",
"params": {
"eventName": "{{ message.event }}",
"body": "{\n \"id\": {{ message._oId }},\n \"integration\": {{ message.integration }},\n \"finished\": {{ message.finished }}\n}",
"showSendEventLog": false,
"stopOnError": false
},
"id": "e3957241-8b81-480e-a325-65dd7e71b397"
},
{
"type": "connector",
"name": "object-store-connector",
"accountLabel": "dgb-internal-object-store-account",
"stepName": "pages-for-extraction",
"params": {
"operation": "UPDATE",
"objectStore": "pages-for-extraction",
"objectId": "{{ message._oId }}",
"document": "{\n $set:{\n \"pageStatus\": \"Dispached\"\n }\n}",
"unique": true,
"isolated": false,
"upsert": false,
"failOnError": false
},
"id": "caba6de3-6a18-45c2-addb-012804a99faa"
},
{
"name": "json-generator-connector",
"type": "connector",
"stepName": "set success",
"params": {
"json": "{\n \"success\": true\n}",
"failOnError": false
},
"id": "4ca32d91-7116-40c6-8eb7-1ede69763204"
}
],
"4141fc4d-e6bd-4394-ae89-234d5f79fcc1-onExceptionTrack": [
{
"type": "connector",
"name": "log-connector",
"stepName": "Log - For Each pages-for-extraction (onException)",
"params": {
"logLevel": "ERROR",
"message": "For Each pages-for-extraction (onException) {{ message.$ }}"
},
"id": "26ee529f-9f99-402c-80b4-44f9768d63e7"
}
],
"Pagination Not Found (2)": [
{
"type": "connector",
"name": "log-connector",
"stepName": "Log - Pagination Not Found",
"params": {
"logLevel": "INFO",
"message": "Pagination Not Found"
},
"id": "5e702895-12e9-4202-bdd2-7318bfd25cab"
}
]
}
}
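The control flow of the sch-dispatcher-pages spec above can be hard to follow in its exported JSON form. The sketch below restates it in plain Python under stated assumptions: the Object Stores `integration-control` and `pages-for-extraction` are modeled as in-memory lists of dictionaries, and `ready_integrations`, `dispatch_pages` and the `publish` callback are hypothetical names used only for illustration, not part of the Digibee platform. It assumes the For Each step visits every entry of the integration list.

```python
def ready_integrations(integrations, control, today):
    """Return the names of integrations that may have pages dispatched today."""
    rows_today = [r for r in control if r["integrationDate"] == today]
    ready = []
    for item in integrations:
        name = item["nameIntegration"]
        # Skip integrations already marked as done for today.
        already_done = any(
            r["integrationName"] == name
            and r.get("step") == "integration"
            and r["status"] == "done"
            for r in rows_today
        )
        if already_done:
            continue
        # Mirrors the dependency query as written in the flow: only dependencies
        # that have a control record for today with a status other than "done"
        # block the integration.
        blocked = any(
            r["integrationName"] in item["dependencyList"] and r["status"] != "done"
            for r in rows_today
        )
        if not blocked:
            ready.append(name)
    return ready


def dispatch_pages(pages, ready, publish, limit=50):
    """Publish one event per extracted page and mark the page as dispatched."""
    candidates = [
        p for p in pages
        if p["integration"] in ready and p["pageStatus"] == "done"
    ]
    # Ascending sort on "finished": the page flagged as last goes at the end.
    candidates.sort(key=lambda p: p["finished"])
    batch = candidates[:limit]
    for page in batch:
        publish(page["event"], {
            "id": page["_oId"],
            "integration": page["integration"],
            "finished": page["finished"],
        })
        # Status value copied verbatim from the flow spec.
        page["pageStatus"] = "Dispached"
    return len(batch)
```

Because each scheduler run dispatches at most `limit` pages, the scheduler interval and this limit together set the pace at which pages are released to the load stage.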
Flow Spec: evt-load-integration
{
"meta": {
"6d0dc228-8417-48eb-ab00-476148ed0573": {
"position": {
"x": 200,
"y": 75
}
},
"ffeee3ac-267f-4920-b5a3-ba384a989be6": {
"position": {
"x": 400,
"y": 75
}
},
"7e37c303-918c-4428-91ff-7db73586f239": {
"position": {
"x": 600,
"y": 75
}
},
"6f94dfb9-d216-4f58-a53a-2dbf8425bf0f": {
"position": {
"x": 800,
"y": 75
}
},
"ae802de8-0b7c-4a2d-9570-fcb25aaf8000": {
"position": {
"x": 1000,
"y": 75
}
},
"9dfe9c97-2a86-4b81-b370-58171603857a": {
"position": {
"x": 1450,
"y": 0
}
},
"150716e1-0240-4ec0-b2c8-77152753010d": {
"position": {
"x": 1650,
"y": 0
}
},
"1cc4d3aa-e8e1-4712-9172-d7b6976698dc": {
"position": {
"x": 1850,
"y": 0
}
},
"c98a59dc-5058-4153-a512-165db3a8d449": {
"position": {
"x": 2050,
"y": 0
}
},
"239b532d-2eaf-4040-a1ad-a844489aa0e2": {
"position": {
"x": 1450,
"y": 150
}
},
"9ea39fcf-c314-44c7-a589-c62f5b1ce4c9": {
"position": {
"x": 1650,
"y": 150
}
}
},
"flowSpec": {
"disconnected-root:b6359fcd-a77e-4faa-acf7-fcb5626e4aff": [
{
"type": "connector",
"name": "assert-connector-v2",
"stepName": "Assert id",
"params": {
"condition": "{{ NOT(ISNULL( message.id)) }}",
"errorMessage": "Id não enviado ao evento",
"internalErrorMessage": "Internal Error",
"errorCode": 500,
"failOnError": true
},
"id": "6d0dc228-8417-48eb-ab00-476148ed0573"
},
{
"name": "json-generator-connector",
"type": "connector",
"stepName": "JSON Generator",
"params": {
"json": "{\n \"payload\": {{ message.$ }}\n}",
"failOnError": false
},
"id": "ffeee3ac-267f-4920-b5a3-ba384a989be6"
},
{
"type": "session-management",
"stepName": "put payload",
"operation": "PUT",
"sessionType": "LOCAL",
"scoped": false,
"fields": [
"payload"
],
"id": "7e37c303-918c-4428-91ff-7db73586f239"
},
{
"type": "connector",
"name": "object-store-connector",
"accountLabel": "dgb-internal-object-store-account",
"stepName": "Find",
"params": {
"operation": "FIND",
"objectStore": "temporary-database",
"objectId": "{{ message.payload.id}}",
"limit": "0",
"skip": "0",
"sort": "{}",
"unique": true,
"isolated": false,
"failOnError": false
},
"id": "6f94dfb9-d216-4f58-a53a-2dbf8425bf0f"
},
{
"type": "choice",
"stepName": "Choice",
"when": [
{
"target": "Found Registers ObjStr (2)",
"jsonPath": "$.[?(@.rowCount > 0)]"
}
],
"otherwise": "Not Found ObjStr (2)",
"id": "ae802de8-0b7c-4a2d-9570-fcb25aaf8000"
}
],
"Found Registers ObjStr (2)": [
{
"type": "connector",
"name": "log-connector",
"stepName": "Found Registers ObjStr",
"params": {
"logLevel": "INFO",
"message": "Found Registers ObjStr: {{ message.rowCount }}"
},
"id": "9dfe9c97-2a86-4b81-b370-58171603857a"
},
{
"type": "connector",
"name": "block-execution-connector",
"stepName": "simulação de envio",
"params": {
"onProcess": "150716e1-0240-4ec0-b2c8-77152753010d-onProcessTrack",
"onException": "150716e1-0240-4ec0-b2c8-77152753010d-onExceptionTrack"
},
"id": "150716e1-0240-4ec0-b2c8-77152753010d",
"description": "Document here what this block is supposed to do."
},
{
"type": "connector",
"name": "block-execution-connector",
"stepName": "Last Pagination",
"params": {
"onProcess": "1cc4d3aa-e8e1-4712-9172-d7b6976698dc-onProcessTrack",
"onException": "1cc4d3aa-e8e1-4712-9172-d7b6976698dc-onExceptionTrack"
},
"id": "1cc4d3aa-e8e1-4712-9172-d7b6976698dc",
"description": "Document here what this block is supposed to do."
},
{
"type": "connector",
"name": "block-execution-connector",
"stepName": "Delete Page",
"params": {
"onProcess": "c98a59dc-5058-4153-a512-165db3a8d449-onProcessTrack",
"onException": "c98a59dc-5058-4153-a512-165db3a8d449-onExceptionTrack"
},
"id": "c98a59dc-5058-4153-a512-165db3a8d449",
"description": "Document here what this block is supposed to do."
}
],
"150716e1-0240-4ec0-b2c8-77152753010d-onProcessTrack": [
{
"name": "json-generator-connector",
"type": "connector",
"stepName": "MOCK",
"params": {
"json": "{\n \"status\": 200\n}",
"failOnError": false
},
"id": "a7abaee8-3fb1-40a1-8da4-f16594528cf2"
}
],
"150716e1-0240-4ec0-b2c8-77152753010d-onExceptionTrack": [
{
"type": "connector",
"name": "block-execution-connector",
"stepName": "reprocessa ou notifica",
"params": {
"onProcess": "95002c8b-ab12-4d15-b478-a8f45eb6fd19-onProcessTrack",
"onException": "95002c8b-ab12-4d15-b478-a8f45eb6fd19-onExceptionTrack"
},
"id": "95002c8b-ab12-4d15-b478-a8f45eb6fd19",
"description": "Document here what this block is supposed to do."
},
{
"type": "connector",
"name": "throw-error-connector",
"stepName": "Throw Error",
"params": {
"errorMessage": "Error occurred.",
"errorCode": 500,
"customErrorEnabled": false
},
"id": "9b04d630-1c79-49d8-a5c1-f7fac9f83964"
}
],
"95002c8b-ab12-4d15-b478-a8f45eb6fd19-onProcessTrack": [
{
"type": "connector",
"stepName": "Event Publisher - Reprocessamento",
"name": "event-publisher-connector",
"params": {
"eventName": "reprocessamento-exemplo",
"body": "{}",
"showSendEventLog": false,
"stopOnError": false
},
"id": "1873ebdc-41c1-490c-a26f-19a5ddbb632a"
}
],
"1cc4d3aa-e8e1-4712-9172-d7b6976698dc-onProcessTrack": [
{
"type": "session-management",
"stepName": "get payload",
"operation": "GET",
"sessionType": "LOCAL",
"scoped": false,
"fields": [
"payload"
],
"id": "f291175f-3295-4943-8067-9fc7c13b9fe9"
},
{
"type": "choice",
"stepName": "Choice",
"when": [
{
"target": "isLastPage (1)",
"jsonPath": "$.[?(@.payload.finished == true )]",
"__documentation__": ""
}
],
"otherwise": "in Progress (1)",
"id": "1832f868-334e-409e-8e8c-ea014163fad1"
}
],
"isLastPage (1)": [
{
"type": "connector",
"name": "log-connector",
"stepName": "Update integration-control",
"params": {
"logLevel": "INFO",
"message": "Updating integration-control status to {{ message.finished }}"
},
"id": "14a90ef8-53ab-4854-a152-d191631ef224"
},
{
"type": "connector",
"name": "object-store-connector",
"accountLabel": "dgb-internal-object-store-account",
"stepName": "find integration-control",
"params": {
"operation": "UPDATE_BY_QUERY",
"objectStore": "integration-control",
"query": "{\n \"integrationName\": {$eq: {{ message.payload.integration }}},\n \"integrationDate\": {$eq: {{ FORMATDATE(NOW(), \"timestamp\", \"dd/MM/yyyy\") }}}\n}",
"document": "{\n $set: {\n \"integrationName\": {{ message.payload.integration }},\n \"integrationDate\":{{ FORMATDATE(NOW(), \"timestamp\", \"dd/MM/yyyy\") }},\n \"step\":\"integration\",\n \"status\": {{ IF( EQUALTO(TOSTRING(message.payload.finished),\"true\"), \"done\", \"progress\" ) }}\n }\n}",
"unique": true,
"isolated": false,
"upsert": false,
"failOnError": false
},
"id": "b0df069a-efd6-4bd1-8558-a5291ebfb8e5"
},
{
"type": "connector",
"name": "assert-connector-v2",
"stepName": "Assert integration-control",
"params": {
"condition": "{{ GREATERTHAN( message.updateCount,0 ) }}",
"errorMessage": "Erro to insert integration control",
"internalErrorMessage": "Internal Error",
"errorCode": 500,
"failOnError": true
},
"id": "693692c9-dfb3-40a5-88e1-9f9aac90cefc"
},
{
"name": "json-generator-connector",
"type": "connector",
"stepName": "set success",
"params": {
"json": "{\n \"success\": true\n}",
"failOnError": false
},
"id": "58db5716-8abe-4585-87f5-daf47cb2f20b"
}
],
"in Progress (1)": [
{
"type": "connector",
"name": "log-connector",
"stepName": "Nothing to do",
"params": {
"logLevel": "INFO",
"message": "Last page is {{ message.finished }}"
},
"id": "e1798c51-dcaf-4a0c-9172-8265b61ac81b"
}
],
"c98a59dc-5058-4153-a512-165db3a8d449-onProcessTrack": [
{
"type": "session-management",
"stepName": "Get payload",
"operation": "GET",
"sessionType": "LOCAL",
"scoped": false,
"fields": [
"payload"
],
"id": "60f4542f-2990-4fb6-98dd-1772e3f43456"
},
{
"type": "connector",
"name": "object-store-connector",
"accountLabel": "dgb-internal-object-store-account",
"stepName": "delete",
"params": {
"operation": "DELETE",
"objectStore": "temporary-database",
"objectId": "{{ message.payload.id }}",
"unique": true,
"isolated": false,
"failOnError": false
},
"id": "470318f8-8490-42c9-8a5b-47b8565fc4dd"
},
{
"type": "session-management",
"stepName": "get payload",
"operation": "GET",
"sessionType": "LOCAL",
"scoped": false,
"fields": [
"payload"
],
"id": "80e01121-4d40-4910-97eb-dc553b589f62"
},
{
"type": "connector",
"name": "object-store-connector",
"accountLabel": "dgb-internal-object-store-account",
"stepName": "delete pages-for-extraction",
"params": {
"operation": "DELETE",
"objectStore": "pages-for-extraction",
"objectId": "{{ message.payload.id }}",
"unique": true,
"isolated": false,
"failOnError": false
},
"id": "a5ba58e8-d618-4d29-8051-00a3a975ac80"
}
],
"Not Found ObjStr (2)": [
{
"type": "connector",
"name": "log-connector",
"stepName": "Not Found Registers ObjStr",
"params": {
"logLevel": "INFO",
"message": "Object Store temporary-database is empty."
},
"id": "239b532d-2eaf-4040-a1ad-a844489aa0e2"
},
{
"name": "json-generator-connector",
"type": "connector",
"stepName": "JSON Generator",
"params": {
"json": "{\n \"Message\": \"Object Store temporary-database is empty.\"\n}",
"failOnError": false
},
"id": "9ea39fcf-c314-44c7-a589-c62f5b1ce4c9"
}
]
}
}
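As a reading aid for the evt-load-integration spec above, the following sketch walks through the same steps in Python. It is a simplified model, not the platform's implementation: the Object Stores are plain dictionaries and lists, `load_page`, `send` and `publish` are hypothetical names, and the final return value is an assumption because the flow spec does not define an explicit output for the success path.

```python
def load_page(event, temporary_db, pages, control, send, publish, today):
    """Send one extracted page to the destination and clean up afterwards."""
    # "Assert id": the event must carry the page identifier.
    if event.get("id") is None:
        raise ValueError("Id not sent to the event")
    page_id = event["id"]

    # "Find": look up the extracted records stored for this page.
    records = temporary_db.get(page_id)
    if not records:
        return {"Message": "Object Store temporary-database is empty."}

    # Sending block: the original flow mocks this step with a fixed status 200.
    try:
        send(records)
    except Exception:
        # On failure, request reprocessing and propagate the error.
        publish("reprocessamento-exemplo", {})
        raise

    # "Last Pagination": only the page flagged as finished closes the integration.
    if event.get("finished") is True:
        updated = 0
        for row in control:
            if (row["integrationName"] == event["integration"]
                    and row["integrationDate"] == today):
                row.update(step="integration", status="done")
                updated += 1
        if updated == 0:
            raise RuntimeError("Error updating integration-control")

    # "Delete Page": remove the page from both stores once it has been sent.
    temporary_db.pop(page_id, None)
    pages.pop(page_id, None)
    return {"success": True}  # assumed output, see the note above
```

Deleting the page only after a successful send means a failed delivery leaves the extracted data in place, so the page can be sent again without going back to the source database.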
Conclusion
The architecture presented in this article provides high scalability for the ETL process. Depending on the demand and the constraints of each component, extraction and load can be scaled differently, pacing each stage as needed. For example, if the extraction window at the source is shorter than the load window at the destination, extraction can be sped up with more replicas.
This decoupling of the stages also prevents an outage in the destination system from blocking the extraction stage, which can run to completion so that the data is sent later.
In addition, the model allows common components to be shared across several processes, such as the pipelines that generate pages for extraction and the pipeline that releases pages for sending.
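To make the point about scaling each stage independently more concrete, here is a rough sizing sketch. All inputs are assumptions chosen for illustration (time per page, window length, and one page processed at a time per replica); `replicas_needed` is a hypothetical helper, not a platform feature.

```python
import math


def replicas_needed(total_records, page_size, seconds_per_page, window_seconds):
    """Estimate how many replicas are needed to process all pages in the window.

    Assumes every replica handles one page at a time, back to back.
    """
    pages = math.ceil(total_records / page_size)
    pages_per_replica = window_seconds // seconds_per_page
    if pages_per_replica < 1:
        raise ValueError("the window is shorter than the time needed for one page")
    return math.ceil(pages / pages_per_replica)


# 20 million records in pages of 2,000, an assumed 3 seconds per page,
# and an assumed 4-hour extraction window.
print(replicas_needed(20_000_000, 2000, 3, 4 * 3600))  # prints 3
```

The same calculation can be repeated with the load stage's own window and per-page time, which is how the two stages end up with different replica counts.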