Stream CSV Reader

Discover more about the Stream CSV Reader connector and how to use it on the Digibee Integration Platform.

Stream CSV Reader reads a local CSV file row by row, converts each row into a JSON structure, and triggers subpipelines to process it. This connector is recommended for scenarios in which large files need to be processed efficiently and at scale.

Parameters

Configure the connector using the parameters below. Fields that support Double Braces expressions are marked in the Supports DB column.

This connector supports the Alias parameter, which allows you to save its output and reference it later in the flow using Double Braces expressions.

| Parameter | Description | Type | Supports DB | Default |
| --- | --- | --- | --- | --- |
| File Name | Name of the local CSV file to read. | String | | data.csv |
| Charset | Character encoding used to read the file. | String | | UTF-8 |
| Element Identifier | If an error occurs, the attributes defined here are sent to the onException subpipeline. | String | | data |
| Parallel Execution Of Each Iteration | If enabled, each line is processed in parallel. | Boolean | | False |
| Ignore Invalid Charset | If enabled, invalid charset characters are ignored. | Boolean | | False |
| Fail On Error | If enabled, interrupts the pipeline execution when an error occurs. If disabled, execution continues, but the "success" property is set to false. | Boolean | | False |
| Advanced | Enables advanced parameters. | Boolean | | False |
| Delimiter | Defines which delimiter to use. | String | | N/A |
| Skip | Number of lines to skip before starting to read the file. | Integer | | N/A |
| Limit | Maximum number of rows to read from the file. A value of 0 means no limit. | Integer | | 0 |
| Chunk Size | Number of lines to process in each batch. | Integer | | 1 |
| Ignore Header | If enabled, skips the first line (header) of the file. | Boolean | | False |
| Custom Headers (,) | Defines a comma-separated list of custom headers to be used instead of the original CSV headers. | String | | N/A |
| Filter Headers (,) | Defines which CSV headers should be read, provided as a comma-separated list. | String | | N/A |
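
To make the advanced parameters more concrete, the Python sketch below mimics how Delimiter, Skip, Limit, Custom Headers, and Filter Headers might shape the rows that are read. It is not the connector's implementation: the function `read_rows`, the sample data, and the order in which the options are applied are all assumptions made for illustration.

```python
import csv
import io
from itertools import islice


def read_rows(text, delimiter=",", skip=0, limit=0,
              custom_headers=None, filter_headers=None):
    """Yield one dict per CSV row, loosely mimicking the advanced parameters."""
    reader = csv.reader(io.StringIO(text), delimiter=delimiter)
    # Assumption: Skip drops raw lines before the header line is read.
    rows = islice(reader, skip, None)
    file_headers = next(rows, None)
    if file_headers is None:
        return  # Empty input: nothing to yield.
    # Assumption: Custom Headers replace the names found in the file.
    headers = custom_headers or file_headers
    # Limit = 0 means "no limit", as stated in the parameter table.
    if limit > 0:
        rows = islice(rows, limit)
    for values in rows:
        row = dict(zip(headers, values))
        if filter_headers:
            # Keep only the requested columns.
            row = {key: row[key] for key in filter_headers if key in row}
        yield row


# Hypothetical sample file using ";" as the delimiter.
SAMPLE = "id;name;city\n1;Ana;Recife\n2;Bruno;Curitiba\n3;Carla;Manaus\n"

limited = list(read_rows(SAMPLE, delimiter=";", limit=2,
                         filter_headers=["id", "name"]))
```

Here `limited` holds the first two rows with only the `id` and `name` columns. All values stay as strings because Python's `csv` module does not infer types; the connector may behave differently.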

Messages flow

Input

The connector waits for a message in the following format:

Output

  • total: Total number of processed rows.

  • success: Total number of rows successfully processed.

  • failed: Total number of rows whose processing failed.

To indicate that a line has been processed correctly and to set the "success" field to true, the onProcess subpipeline must return { "success": true } at the end of each execution. This is the only way to ensure that the output accurately reflects the results of the processing.
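
As a rough illustration of how the three output counters relate to the values returned by onProcess, the Python sketch below tallies a list of hypothetical return values. The function `summarize` and the rule that any return value other than `{ "success": true }` counts as failed are assumptions, not documented connector behavior.

```python
def summarize(results):
    """Build a {total, success, failed} summary from onProcess return values."""
    total = len(results)
    # Assumption: a row counts as successful only when the subpipeline
    # returned an object whose "success" field is exactly true.
    success = sum(
        1 for result in results
        if isinstance(result, dict) and result.get("success") is True
    )
    return {"total": total, "success": success, "failed": total - success}


# Hypothetical return values of three onProcess executions.
results = [{"success": True}, {"success": True}, {"status": "done"}]
summary = summarize(results)
```

In this sketch the third execution does not return the expected object, so `summary` reports two successful rows and one failed row.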

Defining Subpipelines

To define the subpipelines to be executed in each iteration, click the onProcess or onException icons on the Stream CSV Reader connector. Clicking one of these icons will create the corresponding subpipeline (or display it, if it already exists).

Accessing data within the onProcess subpipeline

The data available within the onProcess subpipeline depends on the Chunk Size parameter configuration.

Row-by-row mode (Chunk Size = 1)

When the Chunk Size is 1 (default), each CSV row is converted into a JSON object and sent individually to the onProcess subpipeline. The object keys correspond to the CSV headers (or to Custom Headers, if configured).

Given a CSV file with the following content:

Each call to the onProcess subpipeline receives a body in the following format:

You can access any field from the row directly using Double Braces expressions. For example, to reference the name column:

If the Element Identifier parameter is configured, a processedId field containing the row index is also included in the message body:
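
The row-by-row behavior described above can be approximated in Python as follows. This is only a sketch under stated assumptions: the sample CSV content, the function `row_messages`, and the zero-based row index in `processedId` are hypothetical, and the connector may type values differently from Python's `csv` module, which keeps everything as strings.

```python
import csv
import io
import json

# Hypothetical CSV content used only for this illustration.
CSV_TEXT = "id,name,email\n1,Ana,ana@example.com\n2,Bruno,bruno@example.com\n"


def row_messages(text, element_identifier=None):
    """Yield the body each onProcess call would receive when Chunk Size is 1."""
    reader = csv.DictReader(io.StringIO(text))
    for index, row in enumerate(reader):
        body = dict(row)  # Keys come from the CSV header line.
        if element_identifier:
            # Assumption: processedId carries a zero-based row index.
            body["processedId"] = index
        yield body


messages = list(row_messages(CSV_TEXT))
with_id = list(row_messages(CSV_TEXT, element_identifier="data"))

# Show the first message body as JSON.
print(json.dumps(messages[0], indent=2))
```

Each element of `messages` stands for one onProcess call. The entries in `with_id` carry the additional `processedId` field because an Element Identifier was supplied.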

Batch mode (Chunk Size > 1)

When the Chunk Size is greater than 1, rows are grouped into batches before being sent to the onProcess subpipeline. The received message body contains the batch as an array under the chunk key:

| Field | Description |
| --- | --- |
| chunk | Array of JSON objects, one for each CSV row in the batch. |
| chunkSize | Number of rows in the current batch. |
| startIndex | Index of the first row of the batch in the original file. |
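
A minimal Python sketch of the batching described above is shown below. The function `batches` and the sample rows are hypothetical, and whether `startIndex` is zero-based in the connector is an assumption made here.

```python
def batches(rows, chunk_size):
    """Group rows into message bodies with chunk, chunkSize, and startIndex."""
    for start in range(0, len(rows), chunk_size):
        chunk = rows[start:start + chunk_size]
        yield {
            "chunk": chunk,
            # The last batch may hold fewer rows than the configured Chunk Size.
            "chunkSize": len(chunk),
            # Assumption: startIndex is zero-based.
            "startIndex": start,
        }


# Five hypothetical rows, already converted to JSON-like objects.
rows = [{"id": str(n)} for n in range(1, 6)]
bodies = list(batches(rows, chunk_size=2))
```

With five rows and a Chunk Size of 2, the sketch produces three batches, and the final one contains a single row.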

Loop Error Handling

When an error occurs during the processing of a row or batch, the connector triggers the onException subpipeline, if configured. The message body sent to that subpipeline depends on the processing mode:

Error in row-by-row mode

| Field | Description |
| --- | --- |
| processedId | Index of the row that failed. |
| error | Error message returned by the exception. |
| exception | Full name of the exception class. |

Error in batch mode

| Field | Description |
| --- | --- |
| startIndex | Index of the first row of the batch that failed. |
| chunkSize | Number of rows in the batch that failed. |
| error | Error message returned by the exception. |
| exception | Full name of the exception class. |
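
The two error payloads can be sketched in Python as shown below. The helper names (`row_error`, `batch_error`, `process_rows`) are hypothetical, and the exception names follow Python conventions here, so they will differ from the class names the connector actually reports.

```python
def exception_name(exc):
    """Return the fully qualified class name of an exception."""
    cls = type(exc)
    return f"{cls.__module__}.{cls.__qualname__}"


def row_error(processed_id, exc):
    """Body sent to onException in row-by-row mode."""
    return {"processedId": processed_id, "error": str(exc),
            "exception": exception_name(exc)}


def batch_error(start_index, chunk_size, exc):
    """Body sent to onException in batch mode."""
    return {"startIndex": start_index, "chunkSize": chunk_size,
            "error": str(exc), "exception": exception_name(exc)}


def process_rows(rows, on_process, on_exception):
    """Run on_process per row and route failures to on_exception."""
    for index, row in enumerate(rows):
        try:
            on_process(row)
        except Exception as exc:
            # Assumption: processing continues with the next row after a failure.
            on_exception(row_error(index, exc))


def validate(row):
    # Hypothetical onProcess logic that rejects rows without an email.
    if not row.get("email"):
        raise ValueError("missing email")


captured = []
process_rows([{"email": "ana@example.com"}, {"email": ""}], validate, captured.append)
```

After the run, `captured` holds one row-mode payload for the second row, which is the only one that failed validation.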

Additional information

  • The connector throws an exception if the file specified in File Name doesn't exist or can't be read.

  • File manipulation inside a pipeline occurs in a protected way. Files can be accessed only through a temporary directory, and each pipeline key gives access only to its own set of files.

  • This connector performs batch processing, which means it processes the data continuously and in a controlled manner, in smaller batches.
