# Stream DB V3

O **Stream DB V3** permite estabelecer uma conexão com um serviço que suporta o protocolo JDBC (*Java Database Connectivity*) e executar instruções SQL (*Structured Query Language*). Para consultar quais os bancos de dados suportados por esse componente, leia a [documentação Bancos de dados suportados](https://docs.digibee.com/documentation/connectors-and-triggers/pt-br/connectors/structured-data/supported-databases).

Diferentemente do componente [**DB V1**](https://docs.digibee.com/documentation/connectors-and-triggers/pt-br/connectors/structured-data/db-v1), o **Stream DB** foi desenvolvido para realizar execução em lotes, ou seja, cada retorno (linha resultante ou *row*) da instrução SQL executada é tratada individualmente através de um *subpipeline*, podendo conter seu próprio fluxo de processamento. [Leia o artigo Subpipelines para saber mais.](https://app.gitbook.com/s/cO0A6g1dOsu8BiHYqO67/development-cycle/build-overview/pipelines/subpipelines)

## Parâmetros

Dê uma olhada nas opções de configuração do componente. Parâmetros suportados por [expressões *Double Braces*](https://docs.digibee.com/documentation/connectors-and-triggers/pt-br/double-braces/overview) estão marcados com `(DB)`.

### **Aba General**

<table data-full-width="true"><thead><tr><th>Parâmetro</th><th>Descrição</th><th>Valor padrão</th><th>Tipo de dado</th></tr></thead><tbody><tr><td><strong>Account Type</strong></td><td>Define o tipo de Conta a ser utilizada pelo componente. As opções são: <em>Basic</em>, <em>AWS V4</em> e <em>Kerberos.</em></td><td><em>Basic</em></td><td><em>String</em></td></tr><tr><td><strong>Account</strong></td><td>Conta a ser utilizada pelo componente para se conectar. Contas suportadas: <em>Basic</em> e Kerberos.</td><td>N/A</td><td><em>String</em></td></tr><tr><td><strong>Fail On Error</strong></td><td>Se a opção estiver ativada, a execução do pipeline com erro será interrompida. Do contrário, a execução do pipeline continua, mas o resultado irá mostrar um valor falso para a propriedade <code>"success"</code>.</td><td><em>False</em></td><td>Booleano</td></tr></tbody></table>

### **Aba Operation**

<table data-full-width="true"><thead><tr><th>Parâmetro</th><th>Descrição</th><th>Valor padrão</th><th>Tipo de dado</th></tr></thead><tbody><tr><td><strong>Database URL</strong></td><td>URL (<em>Uniform Resource Locator</em>) para estabelecer conexão ao servidor de banco de dados com suporte ao protocolo JDBC. Este parâmetro aceita <em>Double Braces</em>.</td><td>jdbc:mysql://35.223.175.97/db-training</td><td><em>String</em></td></tr><tr><td><strong>SQL Statement</strong> <code>(DB)</code></td><td>Instrução SQL a ser executada.</td><td>select * from clientes LIMIT 2</td><td><em>String</em></td></tr><tr><td><strong>Column Name</strong></td><td>Caso haja um erro no processamento do <em>subpipeline onProcess</em>, o valor associado à coluna definida neste campo será adicionado à mensagem de erro, em um novo campo chamado "<em>processedId</em>" e que poderá ser manipulado pelo <em>subpipeline onException</em>.</td><td>codigo</td><td><em>String</em></td></tr><tr><td><strong>Parallel Execution Of Each Iteration</strong></td><td>Quando ativada, essa opção faz com que cada uma das passagens pelo <em>subpipeline</em> seja feita em paralelo, reduzindo o tempo de execução total. No entanto, não há qualquer garantia de que os itens sejam executados na ordem retornada pelo banco de dados.</td><td><em>False</em></td><td>Booleano</td></tr><tr><td><strong>Blob As File</strong></td><td>Se ativada, a opção faz com que os campos do tipo blob sejam armazenados no contexto do <em>pipeline</em> como arquivo; do contrário, os campos são armazenados como textos normais (<em>strings</em>) e codificados em base64.</td><td><em>False</em></td><td>Booleano</td></tr><tr><td><strong>Clob As File</strong></td><td>A opção faz com que os campos do tipo clob sejam armazenados no contexto do <em>pipeline</em> como arquivo; do contrário, os campos são armazenados como textos normais (<em>strings</em>).</td><td><em>False</em></td><td>Booleano</td></tr><tr><td><strong>Charset</strong></td><td>Essa opção é exibida apenas quando a opção <em><strong>Clob As File</strong></em> for ativada. Esse parâmetro permite configurar o <em>encoding</em> de arquivos clob.</td><td>UTF-8</td><td><em>String</em></td></tr><tr><td><strong>Custom Connection Properties</strong></td><td>Propriedades de conexão específicas definidas pelo usuário.</td><td>N/A</td><td><em>String</em></td></tr></tbody></table>

### **Aba Advanced Settings**

<table data-full-width="true"><thead><tr><th>Parâmetro</th><th>Descrição</th><th>Valor padrão</th><th>Tipo de dado</th></tr></thead><tbody><tr><td><strong>Pool Size By Actual Consumers</strong></td><td>Se a opção estiver habilitada, o número de conexões agrupadas é igual ao número de consumidores configurados na implantação do <em>pipeline</em>. Se a opção estiver desativada, o tamanho do pool será determinado pelo tamanho da implantação do <em>pipeline</em>, independentemente do número de consumidores.</td><td><em>False</em></td><td>Booleano</td></tr><tr><td><strong>Exclusive DB Pool</strong></td><td>Se a opção estiver habilitada, um novo <em>pool</em> não compartilhado sempre é criado para uso exclusivo desse conector. Se a opção estiver desativada, um <em>pool</em> poderá ser compartilhado entre os componentes se a URL, credenciais e propriedades específicas de conexão forem as mesmas. <strong>Importante:</strong> propriedades específicas de conexão precisam ser declaradas na mesma ordem entre outros conectores para que o <em>pool</em> seja compartilhado, caso contrário será considerada como uma configuração diferente e um novo pool será criado.</td><td><em>False</em></td><td>Booleano</td></tr><tr><td><strong>Custom Pool</strong></td><td>Se a opção for habilitada, a configuração base do <em>pool</em> de conexões será feita baseada nos parâmetros <strong>Connection Maximum Lifetime</strong>, <strong>Minimum Idle Connections</strong> e <strong>Idle Connection Timeout</strong>. Se a opção for desabilitada, o <em>pool</em> será configurado com base no parâmetro <strong>Keep Connection</strong>. <strong>Importante</strong>: essa é uma funcionalidade avançada e deve ser usada com cautela. Confira abaixo a seção dedicada para o tópico Pool de Conexões.</td><td><em>False</em></td><td>Booleano</td></tr><tr><td><strong>Connection Maximum Lifetime</strong></td><td>Define o tempo de vida máximo de uma conexão no <em>pool</em> de conexões. Uma conexão em uso nunca será encerrada. Ela será removida apenas quando for fechada. O valor mínimo permitido é 30000 milissegundos (30 segundos). Se um valor menor for informado, será usado o valor padrão de 1800000 milissegundos (30 minutos). Esta opção está disponível apenas se o parâmetro <strong>Custom Pool</strong> estiver habilitado.</td><td>N/A</td><td><em>Integer</em></td></tr><tr><td><strong>Minimum Idle Connections</strong></td><td>Define o número mínimo de conexões <em>idle</em> a serem mantidas no <em>pool</em>. O valor máximo permitido é baseado no tamanho do <em>deployment</em>, ou seja, 10, 20 ou 40. Se um valor maior for informado, o máximo para o tamanho do <em>deployment</em> será usado. Esta opção está disponível apenas se o parâmetro <strong>Custom Pool</strong> estiver habilitado.</td><td>N/A</td><td><em>Integer</em></td></tr><tr><td><strong>Idle Connection Timeout</strong></td><td>Define o máximo de tempo no qual uma conexão pode ser mantida <em>idle</em> no <em>pool</em>. Para essa opção ter efeito: Seu valor deve ser inferior ao definido em <strong>Connection Maximum Lifetime</strong>. O valor configurado em <strong>Minimum Idle Connections</strong> deve ser inferior ao tamanho do <em>pool</em> (definido através de <strong>Pool Size By Actual Consumers</strong>). Esta opção está disponível apenas se o parâmetro <strong>Custom Pool</strong> estiver habilitado.</td><td>N/A</td><td><em>Integer</em></td></tr><tr><td><strong>Keep Connection</strong></td><td>Se a opção for habilitada, o <em>pool</em> de conexões sempre manterá um número mínimo de conexões abertas prontas para uso. Após 30 minutos, essas conexões serão renovadas. O número mínimo de conexões abertas é definido baseado no parâmetro <strong>Pool Size By Actual Consumers</strong>. Se a opção for desabilitada, o <em>pool</em> será criado vazio e conexões serão criadas sob demanda, sendo mantidas no <em>pool</em> por não mais que 5 minutos. Neste caso, as conexões não são renovadas. Esta opção está disponível apenas se o parâmetro <strong>Custom Pool</strong> estiver desabilitado.</td><td><em>True</em></td><td>Booleano</td></tr><tr><td><strong>Output Column From Label</strong></td><td>Para alguns bancos de dados, se seu <em>Select</em> usar um alias, você deve habilitar este sinalizador para que o nome da coluna seja exibido exatamente como o alias.</td><td><em>False</em></td><td>Booleano</td></tr><tr><td><strong>Connection Test Query</strong></td><td><em>SQL statement</em> a ser usado antes de cada conexão ser estabelecida. Este é um parâmetro opcional e deve ser usado com bancos de dados que não fornecem informações confiáveis sobre o status da conexão.</td><td>N/A</td><td><em>String</em></td></tr><tr><td><strong>Raw SQL Statement</strong> <code>(DB)</code></td><td>Se a opção estiver ativada, o parâmetro <strong>SQL Statement</strong> permite o uso de <em>queries</em> dinâmicas através de declarações <em>Double Braces</em>. Ao utilizar essa funcionalidade, você deve garantir que o pipeline possua mecanismos de segurança contra instruções SQL indesejadas (<em>SQL Injection</em>). Veja mais sobre esse parâmetro na <a href="#raw-sql-statement">seção</a> abaixo.</td><td><em>False</em></td><td>Booleano</td></tr></tbody></table>

### **Aba Documentation**

<table data-full-width="true"><thead><tr><th>Parâmetro</th><th>Descrição</th><th>Valor padrão</th><th>Tipo de dado</th></tr></thead><tbody><tr><td><strong>Documentation</strong></td><td>Seção para documentar qualquer informação necessária sobre a configuração do conector e regras de negócio.</td><td>N/A</td><td><em>String</em></td></tr></tbody></table>

{% hint style="info" %}
Em casos onde um banco de dados Apache Hive é usado, os dados de *Updatecount* podem estar indisponíveis devido a uma característica do sistema. Essa informação estará disponível apenas se o controle do *updated row count* estiver habilitado no servidor Apache Hive. [Para mais informações sobre suporte Apache Hive para a Digibee Integration Platform, leia o artigo Banco de dados suportados.](https://docs.digibee.com/documentation/connectors-and-triggers/pt-br/connectors/structured-data/supported-databases)
{% endhint %}

## Informações adicionais sobre parâmetros

### Column Name

Veja o seguinte exemplo sobre a mensagem de erro em **Column Name**:

```
{
  "timestamp": 1600797662733,
  "error": "Error message",
  "code": 500,
  "processedId": "2"
}
```

### Blob As File

Se **Blob As File** estiver ativado, os campos do tipo blob são armazenados como no exemplo a seguir:&#x20;

```
// "Blob As File" true
{
  "id": 12,
  "blob": "d3X8YK.file",
}
```

Do contrário, os campos do tipo blob são armazenados como mostrado abaixo:

{% code overflow="wrap" %}

```
// "Blob As File" false
{
  "id": 12,
  "blob": "iVBORw0KGgoAAAANSUhEUgAAABAAAAAQCAIAAACQkWg2AAAAAXNSR0IArs4c6QAAAARnQU1BAACxjwv8YQUAAAAJcEhZcwAADsMAAA7DAcdvqGQAAAAeSURBVDhPY1Da6EMSYiBJNVDxqAZiQmw0lAZHKAEAaskfEED3lr0AAAAASUVORK5CYII="
}
```

{% endcode %}

### Clob As File

Se **Clob As File** estiver ativado, os campos do tipo clob são armazenados como no exemplo a seguir:&#x20;

```
// "Clob As File" true
{
  "id": 15,
  "clob": "f7X9AS.file",
}
```

Do contrário, os campos do tipo clob são armazenados como mostrado abaixo:

```
// "Clob As File" false
{
  "id": 15,
  "clob": "AAAAABBBBBCCCC”
}
```

## Raw SQL Statement

Para trazer mais flexibilidade ao utilizar o **Stream DB V3**, podemos ativar a opção **Raw SQL Statement**, configurar previamente uma *query* e referenciá-la via *Double Braces* no parâmetro **SQL Statement** da seguinte maneira:

### **Query definida previamente via Template Transformer**

<figure><img src="https://content.gitbook.com/content/SKBJ6ZiEWBU93x170HH4/blobs/kPXN0Fupn43aqZ9ZYy9Z/img-1-template-transformer.png" alt=""><figcaption></figcaption></figure>

### **Ativação do&#x20;*****Raw SQL Statement***

<figure><img src="https://content.gitbook.com/content/SKBJ6ZiEWBU93x170HH4/blobs/mNZm0ZODmj2yHVK9AmT0/img2-raw-sql-statement.png" alt=""><figcaption></figcaption></figure>

### **Query referenciada no parâmetro SQL Statement**

<figure><img src="https://content.gitbook.com/content/SKBJ6ZiEWBU93x170HH4/blobs/QD2GhpZoZI7vqYvIZzMo/img3-sql-statement-parameter.png" alt=""><figcaption></figcaption></figure>

{% hint style="warning" %}
**Importante:** como boa prática, recomendamos fortemente que ao ativar a opção **Raw SQL Statement**, as *queries* sejam definidas previamente através do componente [**Template Transformer**](https://docs.digibee.com/documentation/connectors-and-triggers/pt-br/connectors/tools/template-transformer)*.* O uso do **Template Transformer** permite validar parâmetros através da tecnologia *FreeMarker* e também a declaração de parâmetros via *Double Braces*. Estes parâmetros não são resolvidos pelo **Template Transformer** e sim pelo componente **Stream DB V3**, que por padrão configura e valida os parâmetros da instrução SQL previamente (*PreparedStatement*). Ao aplicar essas medidas de segurança, você diminui os riscos de ataques do tipo *SQL Injection*.
{% endhint %}

Na imagem abaixo, temos à esquerda um exemplo do uso recomendado do componente (com o *Double Braces* na cláusula *WHERE*, no destaque verde); e à direita um exemplo do uso não recomendado (com o FreeMarker na cláusula *WHERE*, no destaque vermelho) que pode trazer riscos à segurança do *pipeline*:

<figure><img src="https://content.gitbook.com/content/SKBJ6ZiEWBU93x170HH4/blobs/LP2oiDa59BDUXSltz6c9/img4-usage-examples.png" alt=""><figcaption></figcaption></figure>

## Tecnologia <a href="#tecnologia" id="tecnologia"></a>

### **Autenticação via Kerberos** <a href="#autenticao-via-kerberos" id="autenticao-via-kerberos"></a>

Para realizar autenticação a um banco de dados via Kerberos é necessário:

* informar uma conta (*account*) do tipo KERBEROS;
* configurar um Kerberos principal;
* configurar uma *keytab* (que deve ser a base64 do próprio arquivo *keytab* gerado).

## Fluxo de Mensagens <a href="#fluxo-de-mensagens" id="fluxo-de-mensagens"></a>

### **Estrutura de mensagem disponível no subpipeline onProcess** <a href="#estrutura-de-mensagem-disponvel-no-subpipeline-onprocess" id="estrutura-de-mensagem-disponvel-no-subpipeline-onprocess"></a>

Uma vez que a instrução SQL é executada, o *subpipeline* será disparado recebendo o resultado da execução por meio de uma mensagem na seguinte estrutura:

```
{  
    "coluna1": "dado1",   
    "coluna2": "dado2",  
    "coluna3": "dado3"
}
```

### Saída com erro <a href="#erro" id="erro"></a>

```
{   
    "code": error_code,   
    "error": mensagem de erro,   
    "processId": the_id_column_value
}
```

### Saída <a href="#sada" id="sada"></a>

Após a execução do componente, é retornada uma mensagem na seguinte estrutura:

```
{   
    "total": 0,   
    "success": 0,   
    "failed": 0
}
```

* **total:** número total de linhas processadas.
* **success:** número total de linhas processadas com sucesso.
* **failed:** número total de linhas cujo processamento falhou.

{% hint style="info" %}
**Importante:** para detectar se uma linha foi processada corretamente, cada *subpipeline* *onProcess* deve responder com { *"success": true* } a cada elemento processado.
{% endhint %}

O **Stream DB V3** realiza processamento em lote, o que significa processar os dados de forma contínua e controlada em lotes menores.

## Pool de conexão <a href="#h_cf06a705b9" id="h_cf06a705b9"></a>

Por padrão, utilizamos um *pool* com tamanho baseado nas configurações do *pipeline* implantado. Caso seja um *pipeline Small*, então o tamanho do *pool* será de 10; para o *Medium*, será de 20; e para o *Large*, de 40.

É possível gerenciar o tamanho do *pool* também na hora da implantação. Para isso, será necessário habilitar o parâmetro **Pool Size By Actual Consumers** no componente. Com isso, será utilizado o que for configurado manualmente na tela de implantação.

No exemplo da figura abaixo, foi configurado um *pipeline* *Small* com 5 *consumers*. Se você quiser que o *pool* dos componentes de banco de dados ([DB V2](https://docs.digibee.com/documentation/connectors-and-triggers/pt-br/connectors/structured-data/db-v2) e [Stream DB V3](https://docs.digibee.com/documentation/connectors-and-triggers/pt-br/connectors/structured-data/stream-db-v3)) utilize esse tamanho, é necessário habilitar o parâmetro **Pool Size By Actual Consumers** em todos os componentes existentes.

![](https://content.gitbook.com/content/SKBJ6ZiEWBU93x170HH4/blobs/8vPyozm0y9cprHmvm7K3/streamdb%20v3.png)

Tenha atenção redobrada ao configurar o tamanho do *pool* manualmente para que não ocorra nenhum *deadlock* em chamadas concorrentes ao mesmo banco.

O nosso *pool* é compartilhado entre os componentes de banco de dados que acessam o mesmo banco de dados dentro do *pipeline*. Caso seja necessário um *pool* exclusivo para um determinado componente, habilite o parâmetro **Exclusive DB Pool**.

### **Customizando o Pool de Conexões**

Por padrão, o conector definirá como as conexões são gerenciadas no *pool* com base no parâmetro **Keep Connection**. Esta configuração visa facilitar a configuração do *pool* de conexões para cenários em que, por exemplo, alta disponibilidade é necessária ou o oposto disso.

Se habilitado, as conexões são mantidas abertas no *pool* o tempo todo e são renovadas após 30 minutos. Isso pode ser usado para cenários de alta disponibilidade reduzindo custo de abrir novas conexões frequentemente.

No entanto, se tivermos o cenário oposto onde as conexões são necessárias em intervalos de tempo mais espaçados, você pode desativar o parâmetro para configurar o *pool* sem conexões abertas previamente e abri-las apenas quando for realmente necessário. Após a conexão ser utilizada, ela será mantida no *pool* por no máximo 5 minutos e não será renovada.

Embora isso signifique que você não precisa se preocupar com a configuração do *pool* de conexões, esta opção padrão pode não ser a melhor para alguns casos.

Para ter mais flexibilidade na configuração do *pool*, você pode ativar a opção **Custom Pool**. Esta opção ignora a configuração padrão e torna possível definir uma configuração customizada.

Nesse caso, precisamos definir os três parâmetros abaixo (que são definidos implicitamente na configuração padrão):

* **Connection Maximum Lifetime**
* **Minimum Idle Connections**
* **Idle Connection Timeout**

{% hint style="warning" %}
Configurar um *pool* de conexões pode ser uma tarefa difícil, pois se espera um conhecimento mais aprofundado sobre o tema. Quando aplicado no contexto da Digibee Integration Platform, você deve considerar as variáveis que podem afetar o desempenho do *pool*.

Coisas como o tamanho do deployment e suas réplicas, acessar os mesmos bancos de dados usando credenciais e propriedades distintas nos conectores de banco de dados dentro dos mesmos e diferentes *pipelines e* a opção exclusiva de *pool* de banco de dados nos conectores de banco de dados impactam diretamente em como o *pool* de conexões deve ser configurado.

Com base em tudo mencionado acima, é altamente recomendável habilitar a opção **Custom Pool** se tiver conhecimento sobre o tema e se for realmente necessário.
{% endhint %}

{% hint style="info" %}
O conector usa o *framework* *HikariCP* para gerenciar os *pools* de conexão. Informações adicionais sobre o tema podem ser encontradas em sua [documentação oficial](https://github.com/brettwooldridge/HikariCP).
{% endhint %}
