# Stream CSV Reader

**Stream CSV Reader** reads a local CSV file row by row, converts each row into a JSON structure, and triggers subpipelines to process it. Use this connector when large files need to be processed efficiently and at scale.

## **Parameters**

Configure the connector using the parameters below. Fields that support [Double Braces expressions](/documentation/connectors-and-triggers/double-braces/overview.md) are marked in the Supports DB column.

{% hint style="info" %}
This connector supports the **Alias** parameter, which allows you to save its output and reference it later in the flow using Double Braces expressions. [Learn more](/documentation/connectors-and-triggers/double-braces/how-to-reference-data-using-double-braces/previous-steps-access.md).
{% endhint %}

{% tabs fullWidth="true" %}
{% tab title="General tab" %}

<table data-full-width="true"><thead><tr><th>Parameter</th><th>Description</th><th>Type</th><th>Supports DB</th><th>Default</th></tr></thead><tbody><tr><td><strong>File Name</strong></td><td>Name of the local CSV file to read.</td><td>String</td><td>✅</td><td><code>data.csv</code></td></tr><tr><td><strong>Charset</strong></td><td>Character encoding used to read the file.</td><td>String</td><td>❌</td><td><code>UTF-8</code></td></tr><tr><td><strong>Element Identifier</strong></td><td>If an error occurs, the attributes defined here are sent to the onException subpipeline.</td><td>String</td><td>❌</td><td>data</td></tr><tr><td><strong>Parallel Execution Of Each Iteration</strong></td><td>If enabled, each line is processed in parallel.</td><td>Boolean</td><td>❌</td><td>False</td></tr><tr><td><strong>Ignore Invalid Charset</strong></td><td>If enabled, characters that are invalid in the configured charset are ignored.</td><td>Boolean</td><td>❌</td><td>False</td></tr><tr><td><strong>Fail On Error</strong></td><td>If enabled, interrupts the pipeline execution when an error occurs. If disabled, execution continues, but the <code>"success"</code> property will be set to <code>false</code>.</td><td>Boolean</td><td>❌</td><td>False</td></tr><tr><td><strong>Advanced</strong></td><td>Enables advanced parameters.</td><td>Boolean</td><td>❌</td><td>False</td></tr><tr><td><strong>Delimiter</strong></td><td>Defines which delimiter to use.</td><td>String</td><td>❌</td><td>N/A</td></tr><tr><td><strong>Skip</strong></td><td>The number of lines to skip before starting to read the file.</td><td>Integer</td><td>✅</td><td>N/A</td></tr><tr><td><strong>Limit</strong></td><td>Maximum number of rows to read from the file. A value of 0 means no limit.</td><td>Integer</td><td>✅</td><td><code>0</code></td></tr><tr><td><strong>Chunk Size</strong></td><td>Number of lines to process in each batch.</td><td>Integer</td><td>✅</td><td>1</td></tr><tr><td><strong>Ignore Header</strong></td><td>If enabled, skips the first line (header) of the file.</td><td>Boolean</td><td>❌</td><td>False</td></tr><tr><td><strong>Custom Headers (,)</strong></td><td>Defines a comma-separated list of custom headers to be used instead of the original CSV headers.</td><td>String</td><td>❌</td><td>N/A</td></tr><tr><td><strong>Filter Headers (,)</strong></td><td>Defines which CSV headers should be read, provided as a comma-separated list.</td><td>String</td><td>❌</td><td>N/A</td></tr></tbody></table>

{% endtab %}

{% tab title="Documentation tab" %}

<table data-full-width="true"><thead><tr><th>Parameter</th><th>Description</th><th>Default value</th><th>Data type</th></tr></thead><tbody><tr><td><strong>Documentation</strong></td><td>Optional field to describe the connector configuration and any relevant business rules.</td><td>N/A</td><td>String</td></tr></tbody></table>

{% endtab %}
{% endtabs %}
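
As a rough illustration of how **Skip** and **Limit** could interact, the Python sketch below applies both to an in-memory sequence of rows. The `select_rows` helper is hypothetical and is not part of the connector. It assumes that skipped lines don't count toward the limit and that a limit of `0` means "no limit", as stated in the table. The connector's actual behavior may differ in details such as header handling.

```python
from itertools import islice


def select_rows(rows, skip=0, limit=0):
    """Drop the first `skip` rows, then keep at most `limit` rows (0 = no limit)."""
    # Assumption: skipped rows are not counted toward the limit.
    stop = None if limit == 0 else skip + limit
    return list(islice(rows, skip, stop))


data_rows = ["row-0", "row-1", "row-2", "row-3", "row-4"]

limited = select_rows(data_rows, skip=1, limit=2)
unlimited = select_rows(data_rows, skip=3, limit=0)

print(limited)
print(unlimited)
```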

## **Messages flow**

### **Input**

The connector expects a message in the following format:

```json
{
    "filename": "fileName"
}
```

### **Output**

```json
{
    "total": 0,
    "success": 0,
    "failed": 0
}
```

* `total`: Total number of processed rows.
* `success`: Total number of rows successfully processed.
* `failed`: Total number of rows whose processing failed.

{% hint style="info" %}
To indicate that a line was processed correctly, and have it counted in the `"success"` field of the output, the **onProcess** subpipeline must return `{ "success": true }` at the end of each execution. This is the only way to ensure that the output accurately reflects the processing results.
{% endhint %}
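
The counting rule above can be sketched in Python. This is only an illustration of how the three output fields relate to each other, not the connector's implementation. The `summarize` function and the `handler` callback (standing in for an **onProcess** subpipeline) are hypothetical, and the sketch assumes that any row whose handler raises an error or doesn't return `{ "success": true }` is counted as failed.

```python
def summarize(rows, on_process):
    """Tally total, success, and failed counts for a sequence of rows."""
    total = success = failed = 0
    for row in rows:
        total += 1
        try:
            result = on_process(row)
        except Exception:
            # Assumption: an error raised while processing marks the row as failed.
            result = None
        # A row counts as successful only when the handler returns {"success": True}.
        if isinstance(result, dict) and result.get("success") is True:
            success += 1
        else:
            failed += 1
    return {"total": total, "success": success, "failed": failed}


def handler(row):
    """Hypothetical stand-in for an onProcess subpipeline."""
    if "@" not in row["email"]:
        raise ValueError("invalid email")
    return {"success": True}


rows = [
    {"id": "1", "email": "john@example.com"},
    {"id": "2", "email": "not-an-email"},
]
summary = summarize(rows, handler)
print(summary)
```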

## Defining Subpipelines

To define the subpipelines to be executed in each iteration, click the **onProcess** or **onException** icons on the **Stream CSV Reader** connector. Clicking one of these icons will create the corresponding subpipeline (or display it, if it already exists).

### Accessing data within the onProcess subpipeline

The data available within the **onProcess** subpipeline depends on the **Chunk Size** parameter configuration.

#### Row-by-row mode (Chunk Size = 1)

When the **Chunk Size** is 1 (default), each CSV row is converted into a JSON object and sent individually to the **onProcess** subpipeline. The object keys correspond to the CSV headers (or to **Custom Headers**, if configured).

Given a CSV file with the following content:

```
id,name,email
1,John Smith,john@example.com
2,Jane Smith,jane@example.com
```

Each call to the **onProcess** subpipeline receives a body in the following format:

```json
{
    "id": "1",
    "name": "John Smith",
    "email": "john@example.com"
}
```

You can access any field from the row directly using **Double Braces** expressions. For example, to reference the `name` column:

```
{{$.name}}
```

If the **Element Identifier** parameter is configured, a `processedId` field containing the row index is also included in the message body:

```json
{
    "id": "1",
    "name": "John Smith",
    "email": "john@example.com",
    "processedId": "1"
}
```

#### Batch mode (Chunk Size > 1)

When the **Chunk Size** is greater than `1`, rows are grouped into batches before being sent to the **onProcess** subpipeline. The received message body contains the batch as an array under the `chunk` key:

```json
{
    "chunk": [
        { "id": "1", "name": "John Smith", "email": "john@example.com" },
        { "id": "2", "name": "Jane Smith", "email": "jane@example.com" }
    ],
    "chunkSize": 2,
    "startIndex": 0
}
```

| Field            | Description                                               |
| ---------------- | --------------------------------------------------------- |
| **`chunk`**      | Array of JSON objects, one for each CSV row in the batch. |
| **`chunkSize`**  | Number of rows in the current batch.                      |
| **`startIndex`** | Index of the first row of the batch in the original file. |
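
The batching described above can be sketched in Python as follows. `build_chunks` is a hypothetical helper, not the connector's implementation. It assumes that `startIndex` is zero-based (as in the example message) and that the final batch may hold fewer rows than **Chunk Size**.

```python
def build_chunks(rows, chunk_size):
    """Yield one message per batch, shaped like the batch-mode body."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    for start in range(0, len(rows), chunk_size):
        batch = rows[start:start + chunk_size]
        yield {
            "chunk": batch,
            "chunkSize": len(batch),  # rows actually present in this batch
            "startIndex": start,      # assumed zero-based position of the first row
        }


rows = [{"id": str(i)} for i in range(1, 6)]  # five rows with ids "1".."5"
messages = list(build_chunks(rows, chunk_size=2))

for message in messages:
    print(message["startIndex"], message["chunkSize"])
```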

## Loop Error Handling

When an error occurs during the processing of a row or batch, the connector triggers the **onException** subpipeline, if configured. The message body sent to it depends on the processing mode:

### Error in row-by-row mode

```json
{
    "processedId": "1",
    "error": "error message",
    "exception": "java.io.IOException"
}
```

| Field             | Description                              |
| ----------------- | ---------------------------------------- |
| **`processedId`** | Index of the row that failed.            |
| **`error`**       | Error message returned by the exception. |
| **`exception`**   | Full name of the exception class.        |

### Error in batch mode

```json
{
    "startIndex": 10,
    "chunkSize": 5,
    "error": "error message",
    "exception": "java.lang.Exception"
}
```

| Field            | Description                                      |
| ---------------- | ------------------------------------------------ |
| **`startIndex`** | Index of the first row of the batch that failed. |
| **`chunkSize`**  | Number of rows in the batch that failed.         |
| **`error`**      | Error message returned by the exception.         |
| **`exception`**  | Full name of the exception class.                |
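
The two payload shapes in this section can be compared side by side with a short Python sketch. The helper names are hypothetical, and the `exception` value here is a Python-style qualified class name used as an analogue of the Java class names shown in the examples. The connector itself builds these messages internally.

```python
def qualified_name(exc):
    """Return a module-qualified class name, e.g. 'builtins.ValueError'."""
    cls = type(exc)
    return f"{cls.__module__}.{cls.__qualname__}"


def row_error_payload(processed_id, exc):
    """Shape of the onException body in row-by-row mode."""
    return {
        "processedId": str(processed_id),
        "error": str(exc),
        "exception": qualified_name(exc),
    }


def batch_error_payload(start_index, chunk_size, exc):
    """Shape of the onException body in batch mode."""
    return {
        "startIndex": start_index,
        "chunkSize": chunk_size,
        "error": str(exc),
        "exception": qualified_name(exc),
    }


error = ValueError("bad value")
row_payload = row_error_payload(1, error)
batch_payload = batch_error_payload(10, 5, error)

print(row_payload)
print(batch_payload)
```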

{% hint style="warning" %}
If a severe structural error occurs within the **onException** subpipeline itself, the pipeline execution will be interrupted, and the error will be propagated to the next connector in the main flow.
{% endhint %}

## **Additional information**

* The connector throws an exception if the file specified in **File Name** doesn't exist or can't be read.
* File handling inside a pipeline is protected. Files can be accessed only through a temporary directory, and each pipeline key gives access to its own set of files.
* This connector performs batch processing: it processes the data continuously and in a controlled manner, in smaller batches.


