Stream CSV Reader
Discover more about the Stream CSV Reader connector and how to use it on the Digibee Integration Platform.
Stream CSV Reader reads a local CSV file row by row, converts each row into a JSON structure, and triggers subflows to process it. It is recommended for scenarios in which large files need to be processed efficiently and at scale.
Parameters
Configure the connector using the parameters below. Fields that support Double Braces expressions are marked in the Supports DB column.
This connector supports the Alias parameter, which allows you to save its output and reference it later in the flow using Double Braces expressions.
| Parameter | Description | Type | Supports DB | Default |
| --- | --- | --- | --- | --- |
| File Name | Name of the local CSV file to read. | String | ✅ | data.csv |
| Charset | Character encoding used to read the file. | String | ❌ | UTF-8 |
| Element Identifier | If an error occurs, the attributes defined here are sent to the onException subpipeline. | String | ❌ | data |
| Parallel Execution Of Each Iteration | If enabled, each line is processed in parallel. | Boolean | ❌ | False |
| Ignore Invalid Charset | If enabled, invalid charset characters are ignored. | Boolean | ❌ | False |
| Fail On Error | If enabled, interrupts the pipeline execution when an error occurs. If disabled, execution continues, but the "success" property will be set to false. | Boolean | ❌ | False |
| Advanced | Enables advanced parameters. | Boolean | ❌ | False |
| Delimiter | Defines which delimiter to use. | String | ❌ | N/A |
| Skip | The number of lines to skip before starting to read the file. | Integer | ✅ | N/A |
| Limit | Maximum number of rows to read from the file. A value of 0 means no limit. | Integer | ✅ | 0 |
| Chunk Size | Number of lines to process in each batch. | Integer | ✅ | 1 |
| Ignore Header | If enabled, skips the first line (header) of the file. | Boolean | ❌ | False |
| Custom Headers (,) | Defines a comma-separated list of custom headers to be used instead of the original CSV headers. | String | ❌ | N/A |
| Filter Headers (,) | Defines which CSV headers should be read, provided as a comma-separated list. | String | ❌ | N/A |
| Parameter | Description | Default | Type |
| --- | --- | --- | --- |
| Documentation | Optional field to describe the connector configuration and any relevant business rules. | N/A | String |
Messages flow
Input
The connector waits for a message in the following format:
Output
total: Total number of processed rows.
success: Total number of rows successfully processed.
failed: Total number of rows whose processing failed.
To indicate that a line has been processed correctly and to set the "success" field to true, the onProcess subpipeline must return { "success": true } at the end of each execution. This is the only way to ensure that the output accurately reflects the results of the processing.
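The relationship between the onProcess return value and the output counters can be illustrated with a small Python simulation. This is only a sketch of the behavior described above, not the connector's implementation: the names summarize and handler are hypothetical, and it assumes that any row whose subpipeline raises an error or does not return { "success": true } is counted as failed.

```python
from typing import Callable, Iterable


def summarize(rows: Iterable[dict], on_process: Callable[[dict], dict]) -> dict:
    """Tally total/success/failed in the shape of the connector output."""
    total = success = failed = 0
    for row in rows:
        total += 1
        try:
            result = on_process(row)
        except Exception:
            result = None  # an error in the handler counts as a failure
        # Assumption: only an explicit {"success": True} counts as a success.
        if isinstance(result, dict) and result.get("success") is True:
            success += 1
        else:
            failed += 1
    return {"total": total, "success": success, "failed": failed}


def handler(row: dict) -> dict:
    """Hypothetical stand-in for the onProcess subpipeline."""
    if not row.get("name"):
        raise ValueError("missing name")
    return {"success": True}


rows = [{"id": "1", "name": "Ana"}, {"id": "2", "name": ""}, {"id": "3", "name": "Bo"}]
summary = summarize(rows, handler)
print(summary)  # {'total': 3, 'success': 2, 'failed': 1}
```

In this simulation, a handler that finishes without returning { "success": True } would leave its row out of the success count, which is why the final step of the subpipeline matters.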
Defining Subpipelines
To define the subpipelines to be executed in each iteration, click the onProcess or onException icons on the Stream CSV Reader connector. Clicking one of these icons will create the corresponding subpipeline (or display it, if it already exists).
Accessing data within the onProcess subpipeline
The data available within the onProcess subpipeline depends on the Chunk Size parameter configuration.
Row-by-row mode (Chunk Size = 1)
When the Chunk Size is 1 (default), each CSV row is converted into a JSON object and sent individually to the onProcess subpipeline. The object keys correspond to the CSV headers (or to Custom Headers, if configured).
Given a CSV file with the following content:
Each call to the onProcess subpipeline receives a body in the following format:
You can access any field from the row directly using Double Braces expressions. For example, to reference the name column:
If the Element Identifier parameter is configured, a processedId field containing the row index is also included in the message body:
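As an illustration of row-by-row mode, the Python sketch below simulates how each CSV row could become a JSON object keyed by the headers. The sample data (id, name, city), the function name rows_as_messages, and the 0-based processedId are assumptions made for this example; the connector's actual indexing may differ.

```python
import csv
import io
import json

# Hypothetical sample file content, used only for illustration.
SAMPLE_CSV = "id,name,city\n1,Ana,Lisbon\n2,Bruno,Recife\n"


def rows_as_messages(text, element_identifier=None):
    """Yield one message body per CSV row, keyed by the header names."""
    reader = csv.DictReader(io.StringIO(text))
    for index, row in enumerate(reader):
        body = dict(row)
        if element_identifier is not None:
            # Assumption: processedId is a 0-based row index.
            body["processedId"] = index
        yield body


messages = list(rows_as_messages(SAMPLE_CSV))
for body in messages:
    print(json.dumps(body))
```

With a body shaped like this, a Double Braces expression for the name column would presumably take the form {{ message.name }}; check the exact syntax against the Double Braces documentation.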
Batch mode (Chunk Size > 1)
When the Chunk Size is greater than 1, rows are grouped into batches before being sent to the onProcess subpipeline. The received message body contains the batch as an array under the chunk key:
chunk: Array of JSON objects, one for each CSV row in the batch.
chunkSize: Number of rows in the current batch.
startIndex: Index of the first row of the batch in the original file.
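The batch message described above can be approximated with the following Python sketch. It is a simulation under stated assumptions, not the connector's code: the function name chunked_messages is hypothetical, startIndex is assumed to be 0-based, and the last batch is assumed to be smaller when the row count is not a multiple of the Chunk Size.

```python
from itertools import islice


def chunked_messages(rows, chunk_size):
    """Group rows into batch bodies with chunk, chunkSize and startIndex."""
    iterator = iter(rows)
    start = 0  # assumption: indexes are 0-based
    while True:
        batch = list(islice(iterator, chunk_size))
        if not batch:
            break
        yield {"chunk": batch, "chunkSize": len(batch), "startIndex": start}
        start += len(batch)


# Five hypothetical rows grouped with a Chunk Size of 2.
sample_rows = [{"id": str(n)} for n in range(1, 6)]
batches = list(chunked_messages(sample_rows, 2))
for body in batches:
    print(body["startIndex"], body["chunkSize"], [r["id"] for r in body["chunk"]])
```

Inside the onProcess subpipeline, the rows of a batch would then be read by iterating over the chunk array rather than by accessing fields at the top level of the message.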
Loop Error Handling
When an error occurs during the processing of a row or batch, the connector triggers the onException subpipeline, if configured. The message body it receives depends on the processing mode:
Error in row-by-row mode
processedId: Index of the row that failed.
error: Error message returned by the exception.
exception: Full name of the exception class.
Error in batch mode
startIndex: Index of the first row of the batch that failed.
chunkSize: Number of rows in the batch that failed.
error: Error message returned by the exception.
exception: Full name of the exception class.
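To make the two error shapes concrete, the Python sketch below builds bodies with the fields listed above. The helper names are hypothetical, and the exception field here holds a Python class name (for example builtins.ValueError); on the platform it would hold whatever class name the runtime reports.

```python
def full_class_name(exc: Exception) -> str:
    """Return the module-qualified class name of an exception."""
    cls = type(exc)
    return f"{cls.__module__}.{cls.__qualname__}"


def row_error_body(processed_id: int, exc: Exception) -> dict:
    """Body shape for an error in row-by-row mode."""
    return {
        "processedId": processed_id,
        "error": str(exc),
        "exception": full_class_name(exc),
    }


def batch_error_body(start_index: int, chunk_size: int, exc: Exception) -> dict:
    """Body shape for an error in batch mode."""
    return {
        "startIndex": start_index,
        "chunkSize": chunk_size,
        "error": str(exc),
        "exception": full_class_name(exc),
    }


row_body = row_error_body(3, ValueError("invalid date"))
batch_body = batch_error_body(10, 5, KeyError("name"))
print(row_body)
print(batch_body)
```

Note that the batch-mode body identifies the failed batch only by startIndex and chunkSize, so an onException subpipeline that needs the failing row would have to narrow it down itself.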
If a severe structural error occurs within the onException subpipeline itself, the pipeline execution will be interrupted, and the error will be propagated to the next connector in the main flow.
Additional information
The connector throws an exception if the file specified in File Name doesn't exist or can't be read.
File manipulation inside a pipeline is protected: files can only be accessed through a temporary directory, and each pipeline key gives access only to its own set of files.
This connector performs batch processing, which means that the data is processed continuously and in a controlled manner, in small batches.