Centralized reprocessing engine model on Digibee

Scalable and simplified message reprocessing model on Digibee Integration Platform.

Overview

This document presents a centralized reprocessing model for the Digibee Integration Platform. This common-use model can be triggered by any pipeline from any integration flow without the need for additional implementations.

Scenario

In most integration projects, automatic reprocessing is required, for example, to retry processing after a temporary failure. Traditionally, reprocessing logic is implemented specifically for each flow, and it’s often limited to asynchronous flows that include an event pipeline in the second stage. You can find an example of this approach in the Reprocessing strategy in event-driven integrations article.

Now, imagine having a single reprocessing engine that can serve all flows in your company, regardless of their structure or trigger, by following a few parameterization standards. This engine would be highly scalable and capable of meeting the reprocessing demands of all your integration flows.

Architecture

The implementation follows an event-driven, asynchronous architecture composed of three stages:

  1. Initial pipeline: Receives the message to be reprocessed and stores it in a temporary repository (Object Store).

  2. Second pipeline: Periodically retrieves the message identifiers (IDs) and publishes them for reprocessing.

  3. Third pipeline (Reprocessing pipeline): Consumes the published ID, retrieves the message from the temporary repository, and routes the message back to the original flow.

Key components

1. Reprocessing request

An event-type pipeline receives the message and stores it in a temporary Object Store named reprocess. This pipeline can be scaled both horizontally (by increasing replicas) and vertically (by increasing size) to support your company’s request volume and message sizes.

It ensures the payload is correctly formatted for reprocessing and determines key information, such as the retry timestamp, based on the intervalRetry field, which represents the number of minutes to wait before the next attempt.
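As a minimal sketch of that calculation, assuming epoch-millisecond timestamps like those shown in the example payloads in this article (the function name is illustrative, not a Platform API):

```python
import time

def next_retry_timestamp(interval_retry_minutes, now_ms=None):
    """Compute the epoch-millisecond timestamp of the next retry attempt.

    interval_retry_minutes mirrors the intervalRetry field of the request.
    """
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    return now_ms + interval_retry_minutes * 60 * 1000

# A message stored at this instant with intervalRetry = 5 becomes due 5 minutes later.
print(next_retry_timestamp(5, now_ms=1687462056888))  # 1687462356888
```

The scheduler in the next stage can then compare this stored timestamp against the current time to decide which messages are due.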

In the diagram below, this step is handled by the add-reprocess-event pipeline:

Example payload sent to the reprocessing engine:

{
  "payload": {
    "customerId": 123456,
    "customerName": "Raimundo Nonato",
    "customerCity": "Maranguape",
    "customerState": "CE",
    "customerJob": "Professor"
  },
  "pipeOrEventName": "update-customer-event",
  "pipeType": "event",
  "maxRetry": 3,
  "intervalRetry": 5
}

The message must include, in addition to the payload itself:

  • The name of the original event or pipeline (pipeOrEventName).

  • The trigger type of the source pipeline (pipeType).

  • The maximum number of reprocessing attempts (maxRetry).

  • The interval, in minutes, between attempts (intervalRetry).
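A minimal validation sketch for an incoming request might look like the following (the specific checks are assumptions based on the example payload above; the Platform does not prescribe them):

```python
REQUIRED_FIELDS = {"payload", "pipeOrEventName", "pipeType", "maxRetry", "intervalRetry"}

def validate_reprocess_request(message):
    """Return a list of validation errors; an empty list means the request is well-formed."""
    errors = [f"missing field: {name}"
              for name in sorted(REQUIRED_FIELDS - message.keys())]
    if not errors:
        if not isinstance(message["maxRetry"], int) or message["maxRetry"] < 1:
            errors.append("maxRetry must be a positive integer")
        if not isinstance(message["intervalRetry"], int) or message["intervalRetry"] < 1:
            errors.append("intervalRetry must be a positive number of minutes")
    return errors

request = {
    "payload": {"customerId": 123456},
    "pipeOrEventName": "update-customer-event",
    "pipeType": "event",
    "maxRetry": 3,
    "intervalRetry": 5,
}
print(validate_reprocess_request(request))  # []
```

Rejecting malformed requests at this first stage keeps bad records out of the reprocess Object Store, where they would otherwise fail silently on every scheduler cycle.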

2. Listing and publishing pending reprocesses

Once the messages are properly stored in the reprocess Object Store, the next steps are:

  1. List messages ready for retry: A scheduler-type pipeline lists the messages whose retry interval has been reached. The scheduler must run at the smallest interval required by any reprocessing scenario. For example, if the shortest retry interval is 5 minutes, the scheduler should execute every 5 minutes.

  2. Publish message ID: Each retrieved message ID is then published to the next event. Because every message includes a retry timestamp, the scheduler filters and publishes only those that are due. This process is handled by the reprocess-timer pipeline (as shown in the diagram):

The reprocess-timer pipeline should list pending reprocessing IDs (in a paginated manner) and publish them individually to the reprocessing-event. It can be configured with minimal resources, as its function is limited to listing and publishing.
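A rough sketch of that paginated listing logic, using an in-memory stand-in for the Object Store (FakeStore, its query method, the retryAt field, and PAGE_SIZE are all illustrative names, not Platform APIs):

```python
PAGE_SIZE = 2  # assumption: small page size, just for the demo

class FakeStore:
    """In-memory stand-in for the reprocess Object Store."""
    def __init__(self, records):
        self.records = records

    def query(self, skip, limit):
        # Emulates a paginated Object Store lookup.
        return self.records[skip:skip + limit]

def list_due_ids(store, now_ms):
    """Yield the _oId of every stored message whose retry time has passed,
    walking the store one page at a time."""
    skip = 0
    while True:
        page = store.query(skip=skip, limit=PAGE_SIZE)
        if not page:
            return
        for record in page:
            if record["retryAt"] <= now_ms:
                yield record["_oId"]
        skip += PAGE_SIZE

store = FakeStore([
    {"_oId": "a", "retryAt": 1_000},
    {"_oId": "b", "retryAt": 9_000},
    {"_oId": "c", "retryAt": 2_000},
])
print(list(list_due_ids(store, now_ms=5_000)))  # ['a', 'c']
```

In the real pipeline, each yielded ID would be published individually to the reprocessing-event rather than collected into a list, which is what keeps the timer's memory footprint small.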

3. Reprocessing and resubmitting to the original pipeline

The reprocessing-event is consumed by a pipeline of the same name. For each published ID in the event, the pipeline:

  • Retrieves the original payload from the reprocess Object Store.

  • Checks whether the retry limit has been reached.

If the retry limit has not been reached, the message is resent to the original flow:

  • If the original flow is event-based, it's sent via an event.

  • Otherwise, the Pipeline Executor connector is used.

If the retry limit has been reached, the pipeline should publish a message to the company’s standard notification mechanism (or to one built for this purpose).
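That routing decision can be sketched as follows (handle_reprocess and its callback parameters are hypothetical stand-ins for the event-publishing, Pipeline Executor, and notification steps; the exact limit comparison is an assumption):

```python
def handle_reprocess(record, send_event, run_pipeline, notify):
    """Route one message pulled from the reprocess Object Store.

    send_event / run_pipeline / notify are hypothetical callbacks standing in
    for the event-publishing, Pipeline Executor, and notification steps.
    """
    retry = record["_reprocess"]["retry"]
    # Assumption: the retry budget is exhausted once current reaches max.
    if retry["current"] >= retry["max"]:
        notify(record)
        return "notified"
    target = record["_reprocess"]["forwardTo"]
    if target["type"] == "event":
        send_event(target["name"], record)    # event-based original flow
    else:
        run_pipeline(target["name"], record)  # any other trigger type
    return "forwarded"

sent = []
record = {
    "_reprocess": {
        "forwardTo": {"name": "update-customer-event", "type": "event"},
        "retry": {"max": 3, "current": 1},
    },
}
result = handle_reprocess(
    record,
    send_event=lambda name, msg: sent.append(name),
    run_pipeline=lambda name, msg: None,
    notify=lambda msg: None,
)
print(result, sent)  # forwarded ['update-customer-event']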

The original flow must be prepared to receive the payload from the reprocessing engine. For example, if the original pipeline is a scheduler that assembles the payload throughout the flow from a file server or database, a new route may be needed to receive the input and continue the flow once the payload is ready.

The diagram below represents the reprocessing-event pipeline:

Example payload sent back to the original flow:

{
  "customerId": 123456,
  "customerName": "Raimundo Nonato",
  "customerCity": "Maranguape",
  "customerState": "CE",
  "customerJob": "Professor",
  "_reprocess": {
    "forwardTo": {
      "name": "update-invoice-event",
      "type": "event"
    },
    "retry": {
      "intervalRetry": 5,
      "max": 2,
      "current": 1,
      "lastRetry": 1687462056888
    }
  },
  "_oId": "b1ba4fdd-4c0c-4991-a153-b715ebfe4c9c"
}

Note that the payload sent back to the original processing flow includes a _reprocess block with detailed retry information. The reprocessing-event pipeline should increment the current retry value each time the message is resent. If the original flow needs to return the message to the reprocessing engine, it should include this value in the next call.
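One way the original flow could echo that retry information when it re-requests reprocessing is sketched below (build_next_request and the currentRetry field are illustrative assumptions, not part of the Platform; the other field names follow the example payloads in this article):

```python
def build_next_request(payload_with_meta):
    """Rebuild a reprocessing request from a payload the engine handed back.

    The _reprocess block is echoed so the engine can keep counting attempts.
    """
    meta = payload_with_meta["_reprocess"]
    # Strip the engine's bookkeeping fields to recover the business payload.
    body = {k: v for k, v in payload_with_meta.items()
            if k not in ("_reprocess", "_oId")}
    return {
        "payload": body,
        "pipeOrEventName": meta["forwardTo"]["name"],
        "pipeType": meta["forwardTo"]["type"],
        "maxRetry": meta["retry"]["max"],
        "intervalRetry": meta["retry"]["intervalRetry"],
        # Assumption: a field carrying the running attempt count back to the engine.
        "currentRetry": meta["retry"]["current"],
    }

handed_back = {
    "customerId": 123456,
    "_reprocess": {
        "forwardTo": {"name": "update-customer-event", "type": "event"},
        "retry": {"intervalRetry": 5, "max": 3, "current": 1,
                  "lastRetry": 1687462056888},
    },
    "_oId": "b1ba4fdd-4c0c-4991-a153-b715ebfe4c9c",
}
print(build_next_request(handed_back)["currentRetry"])  # 1
```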

Like the input pipeline, this return pipeline can be scaled horizontally and vertically to meet performance and volume demands.

Benefits

  • A standardized reprocessing model that is independent of business-specific logic.

  • Centralized maintenance without the need to modify original processing flows.

  • Independent scalability that doesn’t impact the performance of the original pipelines.

  • Easy integration with shared mechanisms such as alerting systems, without modifying the original pipelines.

Conclusion

This architecture enables a scalable and centralized reprocessing process for your company’s integration flows, avoiding custom implementations for each use case.

Event pipelines at the entry and exit points of the mechanism allow for independent scalability, preventing errors like OutOfMemory or performance degradation caused by unexpected high reprocessing volumes.

Moreover, it promotes reuse by acting as a shared engine, centralizing the maintenance and evolution of its components, for example, integrations with other shared services such as the company’s notification mechanism.
