
Ingest Email Messages From Salesforce Using Unstructured.io

Learn how to ingest email messages from Salesforce


Salesforce is a cloud-based customer relationship management (CRM) platform that helps businesses manage their relationships and interactions with customers and prospects. It offers a comprehensive suite of tools, including Sales Cloud, Service Cloud, Marketing Cloud, Commerce Cloud, and Analytics Cloud, to streamline various business processes. In Salesforce, email messages play a crucial role in customer communication and interaction tracking. The platform integrates seamlessly with popular email services like Outlook and Gmail through Salesforce Inbox, allowing users to log emails directly into Salesforce. Additionally, Salesforce supports email templates, enabling consistent, personalized communication using merge fields from Salesforce records. In this tutorial, we will ingest email messages from Salesforce into a Clarifai app and then use an LLM to classify and summarize them.

Prerequisites

  • Set up the Clarifai Python SDK along with your PAT. Refer to the installation and configuration instructions with the PAT token here.
    note

    Guide to get your PAT

import os
os.environ['CLARIFAI_PAT'] = "YOUR_PAT"
  • Install the required packages.
! pip install "unstructured[clarifai]"
! pip install "unstructured[salesforce]"
  • Set up JWT authorization in Salesforce. Refer to this page for instructions.
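JWT authorization requires a private key and a certificate that you upload to a Salesforce connected app. A minimal sketch of generating both with openssl is shown below; the filenames `server.key` and `server.crt` and the `-subj` value are placeholders, and the connected-app setup itself should follow the instructions linked above.

```shell
# Generate a 2048-bit private key and a self-signed certificate valid for one year.
# server.key is the file later passed as private_key in SalesforceAccessConfig;
# server.crt is uploaded to the Salesforce connected app.
openssl req -x509 -sha256 -nodes -days 365 -newkey rsa:2048 \
  -keyout server.key -out server.crt \
  -subj "/CN=unstructured-ingest"
```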

Initialization

The data we are going to ingest into our app is the body of the email messages in Salesforce. Since we have already set up JWT authorization for Salesforce, let's import the required libraries.

import os  # Importing the os module for environment variable access

# Importing necessary configurations and classes from unstructured.ingest.connector.salesforce
from unstructured.ingest.connector.salesforce import SalesforceAccessConfig, SimpleSalesforceConfig

# Importing configuration classes from unstructured.ingest.interfaces
from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig

# Importing the SalesforceRunner class from unstructured.ingest.runner
from unstructured.ingest.runner import SalesforceRunner

# Importing necessary configurations and classes from unstructured.ingest.connector.clarifai
from unstructured.ingest.connector.clarifai import (
    ClarifaiAccessConfig,
    ClarifaiWriteConfig,
    SimpleClarifaiConfig,
)

# Importing base writer and ClarifaiWriter from unstructured.ingest.runner.writers.clarifai
from unstructured.ingest.runner.writers.base_writer import Writer
from unstructured.ingest.runner.writers.clarifai import ClarifaiWriter

Next, we will write a function that sets up the ingestion configuration required to upload the data into our app on the Clarifai platform.

def clarifai_writer() -> Writer:
    # This function defines a writer for the Clarifai destination.
    # It returns an instance of the ClarifaiWriter class.
    return ClarifaiWriter(
        connector_config=SimpleClarifaiConfig(
            # Configuration for accessing the Clarifai API.
            access_config=ClarifaiAccessConfig(
                api_key="PAT"  # Your Clarifai PAT.
            ),
            # Configuration specific to the Clarifai application.
            app_id="app_id",  # The ID of the Clarifai application.
            user_id="user_id",  # The ID of the Clarifai user.
        ),
        write_config=ClarifaiWriteConfig(),  # Configuration for writing data to Clarifai.
    )

Data Ingestion

In data ingestion, there are two important concepts: the Source Connector and the Destination Connector. For our use case, the Source Connector fetches the data from Salesforce, and the Destination Connector sends the transformed data to the Clarifai app.
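Conceptually, the runner wires these two connectors into a small pipeline: the source connector yields raw documents, a partitioner turns each document into text elements, and the destination connector writes those elements out. The toy sketch below illustrates that flow; the functions and data are our own stand-ins, not the Unstructured.io API.

```python
# Illustrative stand-ins for the real connectors; not the Unstructured.io API.
def source_connector():
    # Source connector: fetch raw documents (here, fake email bodies).
    return ["Hello, please send the catalog.", "Order #42 has shipped."]

def partition(doc):
    # Partitioner: split a document into text elements.
    return [part.strip() for part in doc.split(".") if part.strip()]

def destination_connector(elements, destination):
    # Destination connector: upload elements (here, append to a list).
    destination.extend(elements)

app = []  # Stand-in for the Clarifai app.
for doc in source_connector():
    destination_connector(partition(doc), app)

print(len(app))  # Number of elements written to the destination.
```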

Click here to learn more about Ingestion.

info

In SimpleSalesforceConfig, the categories field is set to EmailMessage for this task. Unstructured.io also supports other categories such as Account, Case, Campaign, and Lead.

if __name__ == "__main__":
    # Creating a writer instance using the clarifai_writer function
    writer = clarifai_writer()

    # Creating an instance of SalesforceRunner with various configurations
    runner = SalesforceRunner(
        processor_config=ProcessorConfig(
            verbose=True,  # Enable verbose output
            output_dir="salesforce-output",  # Directory to store output locally
            num_processes=2,  # Number of processes to use
        ),
        read_config=ReadConfig(),  # Configuration for reading data
        partition_config=PartitionConfig(),  # Configuration for partitioning data
        connector_config=SimpleSalesforceConfig(
            access_config=SalesforceAccessConfig(
                consumer_key="YOUR_SALESFORCE_CONSUMER_KEY",  # Salesforce connected app consumer key
                private_key="PATH_TO_server.key_FILE",  # Path to the private key file
            ),
            username="SALESFORCE_USERNAME",  # Salesforce username
            categories=["EmailMessage"],  # Salesforce categories to read
            recursive=True,  # Whether to recursively read data
        ),
        writer=writer,  # Writer to use for output
        writer_kwargs={},  # Additional arguments for the writer
    )

    # Running the SalesforceRunner
    runner.run()
Output
2024-06-12 09:15:19,685 MainProcess DEBUG    updating download directory to: /root/.cache/unstructured/ingest/salesforce/54298b8afd
2024-06-12 09:15:19,690 MainProcess INFO running pipeline: DocFactory -> Reader -> Partitioner -> Writer -> Copier with config: {"reprocess": false, "verbose": true, "work_dir": "/root/.cache/unstructured/ingest/pipeline", "output_dir": "salesforce-output", "num_processes": 2, "raise_on_error": false}
2024-06-12 09:15:19,785 MainProcess INFO Running doc factory to generate ingest docs. Source connector: {"processor_config": {"reprocess": false, "verbose": true, "work_dir": "/root/.cache/unstructured/ingest/pipeline", "output_dir": "salesforce-output", "num_processes": 2, "raise_on_error": false}, "read_config": {"download_dir": "/root/.cache/unstructured/ingest/salesforce/54298b8afd", "re_download": false, "preserve_downloads": false, "download_only": false, "max_docs": null}, "connector_config": {"access_config": {"consumer_key": "*******", "private_key": "*******"}, "categories": ["EmailMessage"], "username": "adithyansukumar@testing.com", "recursive": true}}
2024-06-12 09:15:21,944 MainProcess INFO processing 14 docs via 2 processes
2024-06-12 09:15:21,951 MainProcess INFO Calling Reader with 14 docs
2024-06-12 09:15:21,953 MainProcess INFO Running source node to download data associated with ingest docs
2024-06-12 09:15:40,018 MainProcess INFO Calling Partitioner with 14 docs
2024-06-12 09:15:40,021 MainProcess INFO Running partition node to extract content from json files. Config: {"pdf_infer_table_structure": false, "strategy": "auto", "ocr_languages": null, "encoding": null, "additional_partition_args": {}, "skip_infer_table_types": null, "fields_include": ["element_id", "text", "type", "metadata", "embeddings"], "flatten_metadata": false, "metadata_exclude": [], "metadata_include": [], "partition_endpoint": "https://api.unstructured.io/general/v0/general", "partition_by_api": false, "api_key": "*******", "hi_res_model_name": null}, partition kwargs: {}]
2024-06-12 09:15:40,024 MainProcess INFO Creating /root/.cache/unstructured/ingest/pipeline/partitioned
2024-06-12 09:15:48,787 MainProcess INFO Calling Copier with 14 docs
2024-06-12 09:15:48,790 MainProcess INFO Running copy node to move content to desired output location
2024-06-12 09:15:52,096 MainProcess INFO uploading elements from 14 document(s) to the destination
2024-06-12 09:15:52,098 MainProcess INFO Calling Writer with 14 docs
2024-06-12 09:15:52,102 MainProcess INFO Running write node to upload content. Destination connector: {"write_config": {"batch_size": 50}, "connector_config": {"access_config": {"api_key": "*******"}, "app_id": "salesforce_app_3", "user_id": "8tzpjy1a841y", "dataset_id": null}, "_client": null}]
2024-06-12 09:15:52,442 MainProcess INFO Extending 3 json elements from content in salesforce-output/EmailMessage/02sdL000000BwlRQAS.eml.json
2024-06-12 09:15:52,443 MainProcess INFO Extending 2 json elements from content in salesforce-output/EmailMessage/02sdL000000By7JQAS.eml.json
2024-06-12 09:15:52,447 MainProcess INFO Extending 22 json elements from content in salesforce-output/EmailMessage/02sdL000000ByYjQAK.eml.json
2024-06-12 09:15:52,450 MainProcess INFO Extending 1 json elements from content in salesforce-output/EmailMessage/02sdL000000CJIDQA4.eml.json
2024-06-12 09:15:52,453 MainProcess INFO Extending 10 json elements from content in salesforce-output/EmailMessage/02sdL000000DGQMQA4.eml.json
2024-06-12 09:15:52,455 MainProcess INFO Extending 10 json elements from content in salesforce-output/EmailMessage/02sdL000000DGetQAG.eml.json
2024-06-12 09:15:52,456 MainProcess INFO Extending 10 json elements from content in salesforce-output/EmailMessage/02sdL000000DGwbQAG.eml.json
2024-06-12 09:15:52,458 MainProcess INFO Extending 9 json elements from content in salesforce-output/EmailMessage/02sdL000000DGyDQAW.eml.json
2024-06-12 09:15:52,460 MainProcess INFO Extending 11 json elements from content in salesforce-output/EmailMessage/02sdL000000DGzpQAG.eml.json
2024-06-12 09:15:52,462 MainProcess INFO Extending 11 json elements from content in salesforce-output/EmailMessage/02sdL000000DH1RQAW.eml.json
2024-06-12 09:15:52,463 MainProcess INFO Extending 12 json elements from content in salesforce-output/EmailMessage/02sdL000000DH33QAG.eml.json
2024-06-12 09:15:52,465 MainProcess INFO Extending 9 json elements from content in salesforce-output/EmailMessage/02sdL000000DH9VQAW.eml.json
2024-06-12 09:15:52,466 MainProcess INFO Extending 11 json elements from content in salesforce-output/EmailMessage/02sdL000000DHB7QAO.eml.json
2024-06-12 09:15:52,468 MainProcess INFO Extending 11 json elements from content in salesforce-output/EmailMessage/02sdL000000DHHZQA4.eml.json
2024-06-12 09:15:52,475 MainProcess INFO writing 132 objects to destination app salesforce_app_3
2024-06-12 09:15:55 INFO clarifai.client.input: input.py:706
Inputs Uploaded
code: SUCCESS
description: "Ok"
details: "All inputs successfully added"
req_id: "af1aca7b33bf498e898c859c47259ef4"
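As a sanity check, the per-file element counts reported in the log above should sum to the 132 objects written to the destination app:

```python
# Element counts reported per .eml.json file in the log above.
counts = [3, 2, 22, 1, 10, 10, 10, 9, 11, 11, 12, 9, 11, 11]
total = sum(counts)
print(total)  # 132 objects written to the destination app
```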

Chat

In the final step, we will chat with the ingested data using RAG. You can use a workflow with a RAG prompter to initialize RAG. After creating the workflow, copy its URL from the Clarifai portal. Once the RAG object is created from the workflow URL, you can start retrieving answers from the data we ingested using Unstructured.io.

from clarifai.rag import RAG

WORKFLOW_URL = "rag_workflow_url"
# Creating a RAG object with the prebuilt workflow
rag_object_from_url = RAG(workflow_url=WORKFLOW_URL)

result = rag_object_from_url.chat(messages=[{"role": "human", "content": "Summarize emails send to Wayne_llc "}])

answer = result[0]["content"].split('\n')
print(answer)
Output
'The email sent to Wayne LLC was from jane.doe@xyzhospitality.com. In her email, she mentioned that they have reviewed their inventory and are interested in placing a new order. She requested for an updated product catalog and availability list. Additionally, she also asked if there are any new products or special offers that they should be aware of.'
result = rag_object_from_url.chat(messages=[{"role": "human", "content": "What kind of product does Stark llc sell "}])

answer = result[0]["content"].split('\n')
print(answer)
Output
'Stark LLC sells office furniture, including Modular Office Furniture Sets and Office Chairs.'
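The introduction also mentioned classifying emails, which the same chat API can do simply by changing the prompt. Below is a small helper for building a classification message; the helper function and the label set are our own illustrations, not part of the Clarifai SDK.

```python
def build_classification_message(sender, labels):
    # Build a chat message asking the LLM to classify emails from a sender.
    # The label set is illustrative; choose labels that fit your data.
    prompt = (
        f"Classify the emails sent by {sender} into one of these categories: "
        + ", ".join(labels)
        + ". Answer with the category name only."
    )
    return [{"role": "human", "content": prompt}]

messages = build_classification_message(
    "jane.doe@xyzhospitality.com",
    ["sales inquiry", "support request", "billing"],
)
print(messages[0]["content"])
```

The resulting list can be passed to `rag_object_from_url.chat(messages=messages)` exactly as in the summarization examples above.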