Merge API JSON Responses With NiFi And Jolt

by Kenji Nakamura

Hey guys! Ever found yourself in a situation where you need to combine data from two different APIs into a single, unified JSON? It's a common challenge, especially when building data-driven applications. Fortunately, Apache NiFi and its powerful Jolt transformation processor provide an elegant solution. In this article, we'll dive deep into how you can merge two JSON API responses based on a unique identifier using NiFi and Jolt. We'll break down the process step by step, ensuring you understand the concepts and can implement them in your own projects. So, buckle up and let's get started!

Understanding the Challenge

Before we jump into the solution, let's clearly define the problem. Imagine you have two APIs. The first API returns customer information, including their unique ID (Cust_Id), name, and contact details. The second API provides audit logs for each customer, identified by the same Cust_Id, along with details like the timestamp and action performed. Our goal is to merge these two datasets into a single JSON structure where each customer record includes both their basic information and their audit logs. The merged JSON should be structured in such a way that it’s easy to query and analyze. Specifically, we want to focus on scenarios where we use a unique identifier, such as Cust_Id, to link the records from both APIs. If a Cust_Id is present in one API response but not the other, we need to handle it gracefully, ensuring our final output only includes records that have matching IDs in both datasets.
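
To make the goal concrete, here is a minimal Python sketch of the merge we are after. The records and field values are hypothetical samples, not output from any real API:

```python
# Hypothetical sample payloads; real API responses will differ.
customers = [
    {"Cust_Id": "C001", "Name": "Asha", "Contact": "asha@example.com"},
    {"Cust_Id": "C002", "Name": "Ravi", "Contact": "ravi@example.com"},
]
audits = [
    {"Cust_Id": "C001", "Audit_Id": "A1", "Timestamp": "2024-01-01T10:00:00Z", "Action": "LOGIN"},
    {"Cust_Id": "C001", "Audit_Id": "A2", "Timestamp": "2024-01-02T11:30:00Z", "Action": "UPDATE"},
    {"Cust_Id": "C003", "Audit_Id": "A3", "Timestamp": "2024-01-03T09:15:00Z", "Action": "DELETE"},
]

def merge_by_cust_id(customers, audits):
    """Merge the two datasets, keeping only Cust_Ids present in both."""
    logs = {}
    for audit in audits:
        logs.setdefault(audit["Cust_Id"], []).append(audit)
    return {
        c["Cust_Id"]: {"customerInfo": c, "auditLogs": logs[c["Cust_Id"]]}
        for c in customers
        if c["Cust_Id"] in logs  # drop IDs that appear on only one side
    }

merged = merge_by_cust_id(customers, audits)
```

Only C001 survives the merge here, because it is the only Cust_Id present in both datasets.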

Prerequisites

To follow along with this tutorial, you'll need a few things set up:

  • Apache NiFi: Ensure you have NiFi installed and running. You can download it from the official Apache NiFi website (https://nifi.apache.org/).
  • Basic NiFi Knowledge: Familiarity with NiFi concepts like processors, flow files, and connections will be helpful.
  • Jolt Transformation: A basic understanding of Jolt transformation specifications and how to use them in NiFi is necessary. If you're new to Jolt, don't worry; we'll cover the essentials, but having some prior exposure will make things smoother.
  • Sample JSON Data: You should have access to sample JSON responses from your two APIs. These will serve as the input for our NiFi flow. If you don't have real APIs to work with, you can create sample JSON files for testing purposes.

Setting Up the NiFi Flow

Let's create a NiFi flow to merge the JSON responses. Here’s a step-by-step guide:

  1. Create Input Processors: Start by adding two GetFile or InvokeHTTP processors to your NiFi canvas. These processors will fetch the JSON responses from either local files or external APIs.
    • GetFile: If you're using local JSON files, configure GetFile to read from the directory where your files are stored.
    • InvokeHTTP: If you're fetching data from APIs, configure InvokeHTTP with the API endpoints and any necessary authentication details. Ensure the processor is set to handle JSON responses.
  2. Add a MergeContent Processor: Drag a MergeContent processor onto the canvas. JoltTransformJSON operates on one flow file at a time, so the two API responses must first be combined into a single flow file before the transformation can see both datasets.
  3. Configure MergeContent:
    • Connect the success relationships of both GetFile or InvokeHTTP processors to the input of the MergeContent processor.
    • Choose a suitable Merge Strategy. With Bin-Packing Algorithm, set Minimum Number of Entries to 2 so both responses land in the same bin; Defragment also works if you set the fragment attributes on the incoming flow files.
    • Set Header to [, Footer to ], and Demarcator to , so that the merged content forms a single valid JSON array. Depending on the shape of your API responses, you may need an extra transformation to flatten nested arrays.
  4. Add a JoltTransformJSON Processor: Drag a JoltTransformJSON processor onto the canvas and connect the merged relationship of MergeContent to it. This processor performs the transformation logic that merges the JSON data.
  5. Configure JoltTransformJSON:
    • Set the Jolt Transformation DSL property to Chain. This allows us to define a series of transformations.
    • Add the Jolt specification that defines how the JSON responses will be merged. This is the core of the process, and we’ll discuss the specification in detail in the next section.
  6. Add a PutFile Processor: Finally, add a PutFile processor to write the merged JSON data to a file.
  7. Configure PutFile:
    • Connect the success relationship of the JoltTransformJSON processor to the input of the PutFile processor.
    • Specify the directory where you want to save the merged JSON file.
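
MergeContent's Header, Footer, and Demarcator properties effectively perform a string concatenation that wraps the payloads into one JSON array. A small Python sketch of the resulting content, using illustrative payloads:

```python
import json

# Illustrative response bodies; real payloads will differ.
customer_response = '{"Cust_Id": "C001", "Name": "Asha", "Contact": "asha@example.com"}'
audit_response = '{"Cust_Id": "C001", "Audit_Id": "A1", "Action": "LOGIN"}'

# Header "[", Demarcator ",", Footer "]" turn N payloads into one JSON array.
merged_body = "[" + ",".join([customer_response, audit_response]) + "]"
records = json.loads(merged_body)  # a single flat array the Jolt step can process
```

If each API returns a JSON array rather than a single object, the merged body becomes an array of arrays, and an extra flattening transform is needed before the Jolt step.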

Crafting the Jolt Specification

The heart of our solution lies in the Jolt specification. This specification defines how the JSON transformations should occur. We'll use a Chain transformation DSL, which allows us to apply multiple transformations in sequence. Here’s a breakdown of the Jolt specification we’ll use:

Step 1: Grouping by Cust_Id

The first step is to group the records from both APIs by their Cust_Id. This ensures that records with the same ID are processed together. We’ll use the shift operation for this.

 {
    "operation": "shift",
    "spec": {
      "*": "@(0,Cust_Id)[]"
    }
  }

In this specification:

  • The * matches each element of the combined input array, one customer or audit record per element. This assumes the two responses have already been merged into a single flat JSON array.
  • On the right-hand side, @(0,Cust_Id) is a transpose reference: it looks inside the matched record, fetches the value of Cust_Id, and uses it as the output key.
  • The trailing [] appends each whole record to an array under that key, so all records sharing a Cust_Id end up in the same array, with the customer record first as long as the customer response comes first in the input.
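
In plain Python terms, the grouping step behaves roughly like this (records are illustrative; in NiFi the Jolt engine does the actual work):

```python
def group_by_cust_id(records):
    # Mirrors the grouping shift: each whole record is appended to an
    # array keyed by its Cust_Id value.
    grouped = {}
    for record in records:
        grouped.setdefault(record["Cust_Id"], []).append(record)
    return grouped

# Illustrative input: one customer record followed by one audit record.
records = [
    {"Cust_Id": "C001", "Name": "Asha"},
    {"Cust_Id": "C001", "Audit_Id": "A1", "Action": "LOGIN"},
]
grouped = group_by_cust_id(records)
```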

Step 2: Merging the Records

Next, we’ll merge the grouped records. A second shift operation turns each grouped array into the final structure: the first record becomes customerInfo, and every remaining record is appended to auditLogs.

 {
    "operation": "shift",
    "spec": {
      "*": {
        "0": "&1.customerInfo",
        "*": "&1.auditLogs[]"
      }
    }
  }

This step creates a structure with customerInfo and auditLogs fields under each Cust_Id. Here &1 references the grouping key (the Cust_Id value) one level up in the spec tree. Note that both mappings must live in the same shift operation: a shift only keeps the data it matches, so splitting them across two separate operations would discard half of each group.

Step 3: Populating Customer Information

Now, let’s look at the customer half of that shift in more detail. The literal key "0" matches the first element of each grouped array (the customer record), and literal keys take precedence over the * wildcard. If you prefer to map the customer fields explicitly, an equivalent form copies them one by one:

 {
    "operation": "shift",
    "spec": {
      "*": {
        "0": {
          "Cust_Id": "&2.customerInfo.Cust_Id",
          "Name": "&2.customerInfo.Name",
          "Contact": "&2.customerInfo.Contact",
          "*": "&2.customerInfo.&"
        }
      }
    }
  }

In this specification:

  • The "0" matches the first element in the grouped array (assuming customer information comes first).
  • The &2 reference walks two levels up the spec tree to the grouping key, which is the Cust_Id value.
  • The * matches any other key in the customer record, and the trailing & retains the original key name; the three explicit lines above are already covered by the wildcard line and serve only as documentation.

Step 4: Populating Audit Logs

Finally, the audit half of the shift populates the auditLogs field. Because the literal "0" has already claimed the customer record, the * wildcard only matches the remaining elements of each group (the audit records) and appends each one, whole, to the auditLogs array:

        "*": "&1.auditLogs[]"

For a Cust_Id with one customer record and two audit records, the output of the chain looks like this:

 {
   "C001": {
     "customerInfo": { "Cust_Id": "C001", "Name": "...", "Contact": "..." },
     "auditLogs": [
       { "Cust_Id": "C001", "Audit_Id": "...", "Timestamp": "...", "Action": "..." },
       { "Cust_Id": "C001", "Audit_Id": "...", "Timestamp": "...", "Action": "..." }
     ]
   }
 }

In this output:

  • Every element not claimed by the literal "0" is treated as an audit record.
  • The [] after auditLogs appends each record to the auditLogs array, preserving fields such as Audit_Id, Timestamp, and Action.

Complete Jolt Specification

Here’s the complete Jolt specification. The two shift operations do the grouping and the merging; a final modify-default-beta operation (note the -beta suffix in the operation name) guarantees that every entry has an auditLogs array even when no audit records matched:

[
  {
    "operation": "shift",
    "spec": {
      "*": "@(0,Cust_Id)[]"
    }
  },
  {
    "operation": "shift",
    "spec": {
      "*": {
        "0": "&1.customerInfo",
        "*": "&1.auditLogs[]"
      }
    }
  },
  {
    "operation": "modify-default-beta",
    "spec": {
      "*": {
        "auditLogs": []
      }
    }
  }
]

One caveat: Jolt alone does not drop Cust_Ids that appear in only one dataset. A customer with no audits ends up with an empty auditLogs array, and an audit-only Cust_Id will surface its first audit record as customerInfo. If you need the strict "matching IDs only" behavior, route such entries away with a downstream processor such as QueryRecord or RouteOnAttribute.
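
As a sanity check, the intent of the whole chain can be mirrored in a few lines of Python (sample records are hypothetical):

```python
def jolt_chain_equivalent(records):
    # Step 1: group whole records by Cust_Id.
    grouped = {}
    for record in records:
        grouped.setdefault(record["Cust_Id"], []).append(record)
    # Step 2: the first record in each group becomes customerInfo,
    # the rest become auditLogs (an empty list when nothing matched).
    return {
        cust_id: {"customerInfo": recs[0], "auditLogs": recs[1:]}
        for cust_id, recs in grouped.items()
    }

records = [
    {"Cust_Id": "C001", "Name": "Asha", "Contact": "asha@example.com"},
    {"Cust_Id": "C001", "Audit_Id": "A1", "Action": "LOGIN"},
    {"Cust_Id": "C001", "Audit_Id": "A2", "Action": "UPDATE"},
]
result = jolt_chain_equivalent(records)
```

This mirror is handy for debugging: paste your real combined input into records and compare the Python output with what JoltTransformJSON emits.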

Testing the Flow

Now that we have our NiFi flow and Jolt specification, it’s time to test it. Follow these steps:

  1. Start the NiFi Flow: Start the processors in your NiFi flow.
  2. Provide Input Data: Ensure that the GetFile or InvokeHTTP processors are receiving the JSON responses from your APIs or files.
  3. Monitor the Flow: Monitor the flow file progression through the processors. Check for any errors or warnings in the processor logs.
  4. Inspect the Output: Verify that the PutFile processor is writing the merged JSON data to the specified directory.
  5. Validate the Output: Open the merged JSON file and ensure that the data is merged correctly. Check that customer information and audit logs are combined under the correct Cust_Id, and that any records without matching IDs are excluded.
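
Validation can also be automated. The following Python sketch checks a parsed merged document for structurally broken entries; the sample data is illustrative:

```python
def find_invalid_entries(merged):
    """Return Cust_Ids whose merged entries are malformed."""
    bad = []
    for cust_id, entry in merged.items():
        if "customerInfo" not in entry or "auditLogs" not in entry:
            bad.append(cust_id)
        elif entry["customerInfo"].get("Cust_Id") != cust_id:
            bad.append(cust_id)  # outer key disagrees with embedded Cust_Id
    return bad

sample = {
    "C001": {
        "customerInfo": {"Cust_Id": "C001", "Name": "Asha"},
        "auditLogs": [{"Audit_Id": "A1", "Action": "LOGIN"}],
    },
    "C002": {"customerInfo": {"Cust_Id": "C999"}, "auditLogs": []},
}
invalid = find_invalid_entries(sample)
```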

Handling Edge Cases

In real-world scenarios, you might encounter edge cases. Let’s discuss some common ones and how to handle them:

  • Missing Cust_Id: If a record is missing the Cust_Id, you might want to route it to a separate queue for further investigation or error handling. You can use a RouteOnAttribute processor to check for the presence of Cust_Id and route the flow file accordingly.
  • API Errors: If one of the APIs returns an error, you should handle it gracefully. You can use the RetryFlowFile processor to retry the API call, or route the flow file to an error queue for manual intervention.
  • Large Datasets: If you’re dealing with large datasets, consider optimizing your NiFi flow for performance. You can use techniques like compression, data partitioning, and parallel processing to improve throughput.
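
The RouteOnAttribute idea for missing identifiers can be sketched as a simple partition over the records (data is illustrative):

```python
def route_on_cust_id(records):
    # Mirrors RouteOnAttribute: records lacking a Cust_Id are routed
    # to a separate list for investigation instead of being merged.
    matched, unmatched = [], []
    for record in records:
        (matched if record.get("Cust_Id") else unmatched).append(record)
    return matched, unmatched

records = [{"Cust_Id": "C001", "Name": "Asha"}, {"Name": "MissingId"}]
ok, needs_review = route_on_cust_id(records)
```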

Conclusion

Merging JSON API responses based on a unique identifier using Apache NiFi and Jolt is a powerful technique for building integrated data solutions. By following the steps outlined in this article, you can efficiently combine data from multiple sources into a unified format. Remember to handle edge cases and optimize your flow for performance to ensure your data pipeline is robust and scalable. Now you're equipped to tackle those complex data integration challenges like a pro! Keep exploring, keep building, and happy data flowing, guys!