
Use RAG With Unstructured.io

Learn how to use RAG with Unstructured.io


RAG systems combine two techniques: information retrieval and text generation. When you ask a question, the system first searches for related details (context) and then uses that context to generate a response. Unstructured.io transforms raw data into a format suitable for RAG, and the Clarifai platform provides a range of LLMs that can handle the text-generation step. By integrating Clarifai with Unstructured.io, you can build RAG applications with ease.
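
Before diving into the integration, it helps to see that loop in miniature. The sketch below is purely illustrative: retrieve and generate are hypothetical stand-ins for what, in this guide, the Clarifai workflow's search and LLM call will do.

# Illustrative sketch of the RAG loop -- retrieve() and generate() are
# hypothetical stubs, not Clarifai or Unstructured APIs.
def retrieve(question: str) -> str:
    """Stand-in for a search over the ingested documents."""
    return "...passages related to the question..."

def generate(prompt: str) -> str:
    """Stand-in for an LLM call on the Clarifai platform."""
    return "...answer grounded in the supplied context..."

def rag_answer(question: str) -> str:
    context = retrieve(question)  # Step 1: retrieve related context.
    prompt = f"Context:\n{context}\n\nQuestion: {question}"
    return generate(prompt)       # Step 2: generate an answer using that context.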

Prerequisites

  • Set up the Clarifai Python SDK along with your PAT. Refer to the installation and configuration instructions with the PAT token here.
note

Guide to get your PAT

import os
os.environ['CLARIFAI_PAT'] = "YOUR_PAT"
  • Install the required packages.
! pip install "unstructured[clarifai]" #make sure the unstructured version is 0.13 or above
! pip install "unstructured[s3]" #since our source is S3
! pip install httpx

Initialization

The first step in building an app with Unstructured.io is to set up the data we are going to ingest. In this example, the data is stored in an S3 bucket, so to let Unstructured.io access it we have to provide AWS access keys.

info

Click here to learn how to get your S3 access keys.

access_key = 'YOUR_S3_ACCESS_KEYS'
secret_access = 'YOUR_S3_SECRET_ACCESS_KEYS'
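
Hardcoding credentials is fine for a quick experiment, but for shared notebooks it is safer to read them from the environment. A minimal sketch, assuming you export the standard AWS variable names beforehand:

import os

# Read S3 credentials from the conventional AWS environment variables
# instead of embedding them in the code.
access_key = os.environ["AWS_ACCESS_KEY_ID"]
secret_access = os.environ["AWS_SECRET_ACCESS_KEY"]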

After setting up the access keys for the S3 bucket, let's import the necessary libraries.

import os  # Used for environment variables and other OS-level utilities.

# S3 source connector configuration classes.
from unstructured.ingest.connector.fsspec.s3 import S3AccessConfig, SimpleS3Config

# Shared ingest configuration classes.
from unstructured.ingest.interfaces import (
    PartitionConfig,
    ProcessorConfig,
    ChunkingConfig,
    ReadConfig,
)

# Runner that drives ingestion from S3.
from unstructured.ingest.runner import S3Runner

# Clarifai destination connector configuration classes.
from unstructured.ingest.connector.clarifai import (
    ClarifaiAccessConfig,
    ClarifaiWriteConfig,
    SimpleClarifaiConfig,
)

# Base Writer class and the Clarifai writer implementation.
from unstructured.ingest.runner.writers.base_writer import Writer
from unstructured.ingest.runner.writers.clarifai import ClarifaiWriter

Next, we write a function that configures the target Clarifai app where the ingested documents will be loaded:

def clarifai_writer() -> Writer:
    # Returns a ClarifaiWriter instance configured for the destination app.
    return ClarifaiWriter(
        connector_config=SimpleClarifaiConfig(
            # Credentials for the Clarifai API.
            access_config=ClarifaiAccessConfig(
                api_key="PAT"  # Your Clarifai PAT.
            ),
            # The target Clarifai application.
            app_id="app_id",    # The ID of the Clarifai application.
            user_id="user_id",  # The ID of the Clarifai user.
        ),
        write_config=ClarifaiWriteConfig(),  # Write settings, e.g., batch size.
    )
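
The run log further below shows a dataset_id field on the destination connector config, so if you want the uploaded elements grouped into a specific dataset inside the app, it appears you can pass it alongside app_id and user_id. A hedged variant of the config above:

# Variant of the connector config targeting a specific dataset.
# dataset_id is optional; when omitted it defaults to None, as in the run log.
connector_config = SimpleClarifaiConfig(
    access_config=ClarifaiAccessConfig(api_key="PAT"),
    app_id="app_id",
    user_id="user_id",
    dataset_id="dataset_id",
)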

Data Ingestion

Data ingestion involves two important concepts: the Source Connector and the Destination Connector. In our case, the Source Connector fetches the data from the S3 bucket and the Destination Connector sends the transformed data to the Clarifai app.

Click here to learn more about Ingestion.

if __name__ == "__main__":
    writer = clarifai_writer()
    # Instantiate an S3Runner with the source, processing, and writer configs.
    runner = S3Runner(
        processor_config=ProcessorConfig(
            verbose=True,                  # Verbose logging for detailed output.
            output_dir="s3-output-local",  # Output directory for processed data.
            num_processes=2,               # Number of worker processes.
        ),
        read_config=ReadConfig(),            # Default read configuration.
        partition_config=PartitionConfig(),  # Default partition configuration.
        connector_config=SimpleS3Config(     # S3 access configuration and remote URL.
            access_config=S3AccessConfig(
                key=access_key,        # S3 access key.
                secret=secret_access,  # S3 secret access key.
            ),
            remote_url="your_s3_data_uri",  # Remote URL of the S3 data.
        ),
        writer=writer,     # The ClarifaiWriter defined above.
        writer_kwargs={},  # No extra keyword arguments for the writer.
    )

    # Run the ingestion pipeline.
    runner.run()
Output
2024-04-15 13:21:07,085 MainProcess DEBUG    updating download directory to: /root/.cache/unstructured/ingest/s3/5b2778ce2a
DEBUG:unstructured.ingest:updating download directory to: /root/.cache/unstructured/ingest/s3/5b2778ce2a
2024-04-15 13:21:07,091 MainProcess INFO running pipeline: DocFactory -> Reader -> Partitioner -> Writer -> Copier with config: {"reprocess": false, "verbose": true, "work_dir": "/root/.cache/unstructured/ingest/pipeline", "output_dir": "s3-output-local", "num_processes": 2, "raise_on_error": false}
INFO:unstructured.ingest:running pipeline: DocFactory -> Reader -> Partitioner -> Writer -> Copier with config: {"reprocess": false, "verbose": true, "work_dir": "/root/.cache/unstructured/ingest/pipeline", "output_dir": "s3-output-local", "num_processes": 2, "raise_on_error": false}
2024-04-15 13:21:07,210 MainProcess INFO Running doc factory to generate ingest docs. Source connector: {"processor_config": {"reprocess": false, "verbose": true, "work_dir": "/root/.cache/unstructured/ingest/pipeline", "output_dir": "s3-output-local", "num_processes": 2, "raise_on_error": false}, "read_config": {"download_dir": "/root/.cache/unstructured/ingest/s3/5b2778ce2a", "re_download": false, "preserve_downloads": false, "download_only": false, "max_docs": null}, "connector_config": {"remote_url": "s3://new-bucket-for-databricks-integration-23102023/procurement.txt", "uncompress": false, "recursive": false, "file_glob": null, "access_config": {"anonymous": false, "endpoint_url": null, "token": null}, "protocol": "s3", "path_without_protocol": "new-bucket-for-databricks-integration-23102023/procurement.txt", "dir_path": "new-bucket-for-databricks-integration-23102023", "file_path": "procurement.txt"}}
2024-04-15 13:21:08,156 MainProcess INFO processing 1 docs via 2 processes
INFO:unstructured.ingest:processing 1 docs via 2 processes
2024-04-15 13:21:08,161 MainProcess INFO Calling Reader with 1 docs
INFO:unstructured.ingest:Calling Reader with 1 docs
2024-04-15 13:21:08,164 MainProcess INFO Running source node to download data associated with ingest docs
INFO:unstructured.ingest:Running source node to download data associated with ingest docs
2024-04-15 13:21:11,016 MainProcess INFO Calling Partitioner with 1 docs
INFO:unstructured.ingest:Calling Partitioner with 1 docs
2024-04-15 13:21:11,026 MainProcess INFO Running partition node to extract content from json files. Config: {"pdf_infer_table_structure": true, "strategy": "auto", "ocr_languages": null, "encoding": null, "additional_partition_args": {}, "skip_infer_table_types": null, "fields_include": ["element_id", "text", "type", "metadata", "embeddings"], "flatten_metadata": false, "metadata_exclude": [], "metadata_include": [], "partition_endpoint": "https://api.unstructured.io/general/v0/general", "partition_by_api": false, "api_key": null, "hi_res_model_name": null}, partition kwargs: {}]
INFO:unstructured.ingest:Running partition node to extract content from json files. Config: {"pdf_infer_table_structure": true, "strategy": "auto", "ocr_languages": null, "encoding": null, "additional_partition_args": {}, "skip_infer_table_types": null, "fields_include": ["element_id", "text", "type", "metadata", "embeddings"], "flatten_metadata": false, "metadata_exclude": [], "metadata_include": [], "partition_endpoint": "https://api.unstructured.io/general/v0/general", "partition_by_api": false, "api_key": null, "hi_res_model_name": null}, partition kwargs: {}]
2024-04-15 13:21:11,033 MainProcess INFO Creating /root/.cache/unstructured/ingest/pipeline/partitioned
INFO:unstructured.ingest:Creating /root/.cache/unstructured/ingest/pipeline/partitioned
2024-04-15 13:21:18,576 MainProcess INFO Calling Copier with 1 docs
INFO:unstructured.ingest:Calling Copier with 1 docs
2024-04-15 13:21:18,581 MainProcess INFO Running copy node to move content to desired output location
INFO:unstructured.ingest:Running copy node to move content to desired output location
2024-04-15 13:21:20,011 MainProcess INFO uploading elements from 1 document(s) to the destination
INFO:unstructured.ingest:uploading elements from 1 document(s) to the destination
2024-04-15 13:21:20,015 MainProcess INFO Calling Writer with 1 docs
INFO:unstructured.ingest:Calling Writer with 1 docs
2024-04-15 13:21:20,018 MainProcess INFO Running write node to upload content. Destination connector: {"write_config": {"batch_size": 50}, "connector_config": {"access_config": {"api_key": "*******"}, "app_id": "unst-clf", "user_id": "8tzpjy1a841y", "dataset_id": null}, "_client": null}]
INFO:unstructured.ingest:Running write node to upload content. Destination connector: {"write_config": {"batch_size": 50}, "connector_config": {"access_config": {"api_key": "***REDACTED***"}, "app_id": "unst-clf", "user_id": "8tzpjy1a841y", "dataset_id": null}, "_client": null}]
2024-04-15 13:21:20,517 MainProcess INFO Extending 204 json elements from content in s3-output-local/procurement.txt.json
INFO:unstructured.ingest:Extending 204 json elements from content in s3-output-local/procurement.txt.json
2024-04-15 13:21:20,532 MainProcess INFO writing 204 objects to destination app unst-clf
INFO:unstructured.ingest:writing 204 objects to destination app unst-clf
2024-04-15 13:21:23 INFO clarifai.client.input: input.py:687
Inputs Uploaded
code: SUCCESS
description: "Ok"
details: "All inputs successfully added"
req_id: "5c9d83ec06888714335749a3aa572c0b"
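
Note that ChunkingConfig was imported but not used above, so the partitioned elements are uploaded as-is. If you would rather merge elements into larger, retrieval-friendlier chunks before writing, the ingest runner also accepts a chunking config. A hedged sketch, assuming your unstructured version (0.13+) exposes chunking_config on the runner; max_characters here is an illustrative value to tune for your data:

# Same runner as above, with chunking enabled before the write step.
runner = S3Runner(
    processor_config=ProcessorConfig(
        verbose=True,
        output_dir="s3-output-local",
        num_processes=2,
    ),
    read_config=ReadConfig(),
    partition_config=PartitionConfig(),
    # chunk_elements=True turns on chunking; max_characters caps chunk size.
    chunking_config=ChunkingConfig(chunk_elements=True, max_characters=500),
    connector_config=SimpleS3Config(
        access_config=S3AccessConfig(key=access_key, secret=secret_access),
        remote_url="your_s3_data_uri",
    ),
    writer=writer,
    writer_kwargs={},
)
runner.run()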

Chat

In the final step, we perform retrieval over the data we ingested from S3 into the Clarifai app using Unstructured.io. You can initialize RAG with a workflow that contains a RAG prompter; after creating the workflow, copy its URL from the Clarifai portal. Once the RAG object is created from that workflow URL, you can start querying the ingested data.

from clarifai.rag import RAG

WORKFLOW_URL = 'rag_workflow_url'
# Create the RAG object from the prebuilt workflow.
rag_object_from_url = RAG(workflow_url=WORKFLOW_URL)

result = rag_object_from_url.chat(messages=[{"role": "human", "content": "What is Central Public Procurement Portal"}])

# Extract the content of the response and split it on newlines into the list 'answer'.
answer = result[0]["content"].split('\n')
print(answer)
Output
"The Central Public Procurement Portal (CPPP) is a platform designed, developed, and hosted by the National Informatics Centre in association with the Department of Expenditure. Its main goal is to ensure transparency in public procurement processes. The portal provides a single access point to information on procurements made across various Ministries and Departments. It contains features like e-publishing and e-procurement modules and it's mandatory for all Ministries/Departments of the Central Government and other entities to publish their tender enquiries and information about the resulting contracts on the CPPP. The portal provides access to documents related to pre-qualification, Bidders’ enlistment, Bidding documents, and other procurement-related information. It also facilitates e-procurement for Ministries/Departments that do not have large procurement volumes or only require procurement for day-to-day operations."