Skip to main content

Add Inputs From Cloud Storage

Learn how to extract inputs from cloud platforms to Clarifai


You can add inputs from various cloud storage platforms, such as S3 (Amazon Simple Storage Service) and GCP (Google Cloud Platform), by simply providing their corresponding URLs. In cases where access credentials are necessary, you can include them as part of the request.

This simplifies the process of adding inputs to our platform, offering a more efficient alternative to the conventional method of using the PostInputs endpoint for users who already have data stored in the cloud platforms.

note

This functionality was introduced in the 10.1 release.

info
  • Image files stored in the cloud platforms will be treated as image inputs, video files as video inputs, etc. Archives will be extracted, and their contents will also be processed like this.

  • We do not support extraction of archives located inside other archives.

  • The cloud URL will serve as a filter prefix. For instance, given an S3 URL like s3://bucket/images_folder/abc, the files that will be processed are those within images_folder whose names start with abc, and those within a subfolder whose name begins with abc. For example, files such as bucket/images_folder/abcImage.png or bucket/images_folder/abc-1/Data.zip will be processed accordingly.

Add Inputs via Cloud Storage URLs

Below is an example of pulling inputs from a subfolder of an S3 bucket.

######################################################################################################
# In this section, we set the user authentication, app ID, the ID of the inputs job to be created
# (used to collect statistics about that job), and the cloud storage URL.
# Change these strings to run your own example.
######################################################################################################

USER_ID = "YOUR_USER_ID_HERE"
# Your PAT (Personal Access Token) can be found in the Portal under Account > Security
PAT = "YOUR_PAT_HERE"
APP_ID = "YOUR_APP_ID_HERE"
# Change these to create your own extraction job
INPUTS_JOB_ID = ""  # If empty, ID will be autogenerated; if non-empty, the given ID will be used
CLOUD_STORAGE_URL = "s3://samples.clarifai.com/storage/"

##########################################################################
# YOU DO NOT NEED TO CHANGE ANYTHING BELOW THIS LINE TO RUN THIS EXAMPLE
##########################################################################

from clarifai_grpc.channel.clarifai_channel import ClarifaiChannel
from clarifai_grpc.grpc.api import resources_pb2, service_pb2, service_pb2_grpc
from clarifai_grpc.grpc.api.status import status_code_pb2

channel = ClarifaiChannel.get_grpc_channel()
stub = service_pb2_grpc.V2Stub(channel)

# The PAT is passed as call metadata on every request made through the stub.
metadata = (("authorization", "Key " + PAT),)

userDataObject = resources_pb2.UserAppIDSet(user_id=USER_ID, app_id=APP_ID)

# Ask the platform to pull inputs from the cloud storage URL.
post_inputs_response = stub.PostInputsDataSources(
    service_pb2.PostInputsDataSourcesRequest(
        user_app_id=userDataObject,
        app_pat=PAT,
        data_sources=[
            resources_pb2.InputsDataSource(
                inputs_add_job_id=INPUTS_JOB_ID,
                url=resources_pb2.DataSourceURL(
                    url=CLOUD_STORAGE_URL,
                    # Uncomment to add credentials (provide only ONE kind of credentials)
                    # credentials=resources_pb2.DataSourceCredentials(
                    #     # If using S3
                    #     s3_creds=resources_pb2.AWSCreds(
                    #         id="ADD_ACCESS_ID_HERE",
                    #         secret="ADD_SECRET_HERE",
                    #         region="ADD_AWS_REGION_HERE",
                    #     ),
                    #     # If using GCP
                    #     # GCP uses service account key data (creds.json) as Byte array for authentication
                    #     # gcp_creds=b"",
                    # ),
                ),
            )
        ],
    ),
    metadata=metadata,
)

# Anything other than SUCCESS is treated as a failure: show the status, then abort.
if post_inputs_response.status.code != status_code_pb2.SUCCESS:
    print(post_inputs_response.status)
    raise Exception(
        "Post inputs failed, status: " + post_inputs_response.status.description
    )

print(post_inputs_response)
Output Example
status {
code: SUCCESS
description: "Ok"
req_id: "8759d87e31403bbd838794fe6016f36d"
}
inputs_add_jobs {
id: "2581ebd8d7cd42e7ac0da2bec14d5426"
progress {
}
created_at {
seconds: 1708361354
nanos: 820114719
}
modified_at {
seconds: 1708361354
nanos: 847655746
}
extraction_jobs {
status {
code: JOB_QUEUED
description: "Job is queued to be ran."
}
id: "2a6f1f69cced42029986a72009e7d4da"
url: "s3://samples.clarifai.com/storage/"
progress {
}
created_at {
seconds: 1708361354
nanos: 835105396
}
modified_at {
seconds: 1708361354
nanos: 835105396
}
}
status {
code: JOB_COMPLETED
description: "Job successfully ran."
}
}

Track Upload Process

After starting to pull the inputs from a cloud storage service, you can track the progress of the extraction. Note that we’ll use the inputs_extraction_job_id returned in the response when the extraction job was created.

###################################################################################################
# This section holds the user authentication, the app ID, and the ID of the inputs extraction
# job to look up. Replace these strings to run your own example.
###################################################################################################

USER_ID = "YOUR_USER_ID_HERE"
# Your PAT (Personal Access Token) can be found in the Portal under Account > Security
PAT = "YOUR_PAT_HERE"
APP_ID = "YOUR_APP_ID_HERE"
# ID of the inputs extraction job whose progress you want to track
INPUTS_EXTRACTION_JOB_ID = "2a6f1f69cced42029986a72009e7d4da"

##########################################################################
# YOU DO NOT NEED TO CHANGE ANYTHING BELOW THIS LINE TO RUN THIS EXAMPLE
##########################################################################

from clarifai_grpc.channel.clarifai_channel import ClarifaiChannel
from clarifai_grpc.grpc.api import resources_pb2, service_pb2, service_pb2_grpc
from clarifai_grpc.grpc.api.status import status_code_pb2

channel = ClarifaiChannel.get_grpc_channel()
stub = service_pb2_grpc.V2Stub(channel)

# Authorization header sent along with the call.
metadata = (("authorization", "Key " + PAT),)

userDataObject = resources_pb2.UserAppIDSet(user_id=USER_ID, app_id=APP_ID)

# Build the request first, then send it, so each step is easy to inspect.
request = service_pb2.GetInputsExtractionJobRequest(
    user_app_id=userDataObject,
    inputs_extraction_job_id=INPUTS_EXTRACTION_JOB_ID,
)
get_inputs_extraction_response = stub.GetInputsExtractionJob(
    request,
    metadata=metadata,
)

status = get_inputs_extraction_response.status
if status.code == status_code_pb2.SUCCESS:
    print(get_inputs_extraction_response)
else:
    # Show the full status before aborting with its description.
    print(status)
    raise Exception("Get input failed, status: " + status.description)
Output Example
status {
code: SUCCESS
description: "Ok"
req_id: "bae1f832c8931d47388f875653e7035d"
}
inputs_extraction_job {
status {
code: JOB_COMPLETED
description: "Job successfully ran."
}
id: "2a6f1f69cced42029986a72009e7d4da"
url: "s3://samples.clarifai.com/storage/"
progress {
image_inputs_count: 3
video_inputs_count: 1
}
created_at {
seconds: 1708361354
nanos: 835105000
}
modified_at {
seconds: 1708361355
nanos: 386004000
}
}

List Inputs Extraction Jobs

You can list all your inputs extraction jobs and get their details.

##################################################################
# This section holds the user authentication and the app ID.
# Replace these strings to run your own example.
###################################################################

USER_ID = "YOUR_USER_ID_HERE"
# Your PAT (Personal Access Token) can be found in the Portal under Account > Security
PAT = "YOUR_PAT_HERE"
APP_ID = "YOUR_APP_ID_HERE"

##########################################################################
# YOU DO NOT NEED TO CHANGE ANYTHING BELOW THIS LINE TO RUN THIS EXAMPLE
##########################################################################

from clarifai_grpc.channel.clarifai_channel import ClarifaiChannel
from clarifai_grpc.grpc.api import resources_pb2, service_pb2, service_pb2_grpc
from clarifai_grpc.grpc.api.status import status_code_pb2

channel = ClarifaiChannel.get_grpc_channel()
stub = service_pb2_grpc.V2Stub(channel)

# Authorization header sent along with the call.
metadata = (("authorization", "Key " + PAT),)

userDataObject = resources_pb2.UserAppIDSet(user_id=USER_ID, app_id=APP_ID)

# Request the first page, with up to 1000 extraction jobs per page.
request = service_pb2.ListInputsExtractionJobsRequest(
    user_app_id=userDataObject,
    per_page=1000,
    page=1,
)
list_inputs_extraction_jobs = stub.ListInputsExtractionJobs(
    request,
    metadata=metadata,
)

status = list_inputs_extraction_jobs.status
if status.code == status_code_pb2.SUCCESS:
    print(list_inputs_extraction_jobs)
else:
    # Show the full status before aborting with its description.
    print(status)
    raise Exception("List input failed, status: " + status.description)
Output Example
----
inputs_extraction_jobs {
status {
code: JOB_COMPLETED
description: "Job successfully ran."
}
id: "487d863784804390a92e1108ee1ae1fb"
url: "s3://samples.clarifai.com/storage/"
progress {
image_inputs_count: 3
video_inputs_count: 1
}
created_at {
seconds: 1708406450
nanos: 685101000
}
modified_at {
seconds: 1708406451
nanos: 191007000
}
}
inputs_extraction_jobs {
status {
code: JOB_COMPLETED
description: "Job successfully ran."
}
id: "16d65cdff5d64ae8ba94ae59f5d7f43c"
url: "s3://samples.clarifai.com/storage/"
progress {
image_inputs_count: 3
video_inputs_count: 1
}
created_at {
seconds: 1708406156
nanos: 2926000
}
modified_at {
seconds: 1708406156
nanos: 560108000
}
}
inputs_extraction_jobs {
status {
code: JOB_COMPLETED
description: "Job successfully ran."
}
id: "423b4dfa36f64fffbe79cf845918d4c0"
url: "s3://samples.clarifai.com/storage/"
progress {
image_inputs_count: 3
video_inputs_count: 1
}
created_at {
seconds: 1708405684
nanos: 297689000
}
modified_at {
seconds: 1708405684
nanos: 778885000
}
}
inputs_extraction_jobs {
status {
code: JOB_COMPLETED
description: "Job successfully ran."
}
id: "a5af6a185ab148d4b7eb02e713d3340d"
url: "s3://samples.clarifai.com/storage/"
progress {
image_inputs_count: 3
video_inputs_count: 1
}
created_at {
seconds: 1708405639
nanos: 186106000
}
modified_at {
seconds: 1708405639
nanos: 696943000
}
}
inputs_extraction_jobs {
status {
code: JOB_COMPLETED
description: "Job successfully ran."
}
id: "1c10da09706d40448bf11fc5aaa8664b"
url: "s3://samples.clarifai.com/storage/"
progress {
image_inputs_count: 3
video_inputs_count: 1
}
created_at {
seconds: 1708405297
nanos: 953730000
}
modified_at {
seconds: 1708405298
nanos: 506209000
}
}
inputs_extraction_jobs {
status {
code: JOB_COMPLETED
description: "Job successfully ran."
}
id: "10ad7ba72e5e49899a042637178c9452"
url: "s3://samples.clarifai.com/storage/"
progress {
image_inputs_count: 3
video_inputs_count: 1
}
created_at {
seconds: 1708404787
nanos: 575667000
}
modified_at {
seconds: 1708404788
nanos: 141744000
}
}
inputs_extraction_jobs {
status {
code: JOB_COMPLETED
description: "Job successfully ran."
}
id: "8d7a240f39494ce18c3a5f4aeea687c1"
url: "s3://samples.clarifai.com/storage/"
progress {
image_inputs_count: 3
video_inputs_count: 1
}
created_at {
seconds: 1708403207
nanos: 89134000
}
modified_at {
seconds: 1708403207
nanos: 729276000
}
}
inputs_extraction_jobs {
status {
code: JOB_COMPLETED
description: "Job successfully ran."
}
id: "2a6f1f69cced42029986a72009e7d4da"
url: "s3://samples.clarifai.com/storage/"
progress {
image_inputs_count: 3
video_inputs_count: 1
}
created_at {
seconds: 1708361354
nanos: 835105000
}
modified_at {
seconds: 1708361355
nanos: 386004000
}
}
inputs_extraction_jobs {
status {
code: JOB_COMPLETED
description: "Job successfully ran."
}
id: "6db64516daf04abd97852407f9076e42"
url: "s3://samples.clarifai.com/storage/"
progress {
image_inputs_count: 3
video_inputs_count: 1
}
created_at {
seconds: 1708361312
nanos: 309789000
}
modified_at {
seconds: 1708361313
nanos: 435552000
}
}
inputs_extraction_jobs {
status {
code: JOB_COMPLETED
description: "Job successfully ran."
}
id: "7e4bd42e84294e8f9423e0a01783e3b1"
url: "s3://samples.clarifai.com/storage/"
progress {
image_inputs_count: 3
video_inputs_count: 1
}
created_at {
seconds: 1708354769
nanos: 17131000
}
modified_at {
seconds: 1708354769
nanos: 473323000
}
input_template {
data {
concepts {
id: "lamborghini23_A"
value: 1
}
concepts {
id: "spiderman_a"
value: 1
}
metadata {
fields {
key: "id"
value {
string_value: "id001"
}
}
}
}
dataset_ids: "dataset-1"
}
}
-----

Cancel Extraction Jobs

You can cancel the extraction of inputs from a cloud storage service. Note that we’ll use the inputs_extraction_job_id returned after starting the extraction process.

#####################################################################################################
# This section holds the user authentication, the app ID, and the ID of the inputs extraction
# job to cancel. Replace these strings to run your own example.
#####################################################################################################

USER_ID = "YOUR_USER_ID_HERE"
# Your PAT (Personal Access Token) can be found in the Portal under Account > Security
PAT = "YOUR_PAT_HERE"
APP_ID = "YOUR_APP_ID_HERE"
# ID of the inputs extraction job you want to cancel
INPUTS_EXTRACTION_JOB_ID = "2a6f1f69cced42029986a72009e7d4da"

##########################################################################
# YOU DO NOT NEED TO CHANGE ANYTHING BELOW THIS LINE TO RUN THIS EXAMPLE
##########################################################################

from clarifai_grpc.channel.clarifai_channel import ClarifaiChannel
from clarifai_grpc.grpc.api import resources_pb2, service_pb2, service_pb2_grpc
from clarifai_grpc.grpc.api.status import status_code_pb2

channel = ClarifaiChannel.get_grpc_channel()
stub = service_pb2_grpc.V2Stub(channel)

# Authorization header sent along with the call.
metadata = (("authorization", "Key " + PAT),)

userDataObject = resources_pb2.UserAppIDSet(user_id=USER_ID, app_id=APP_ID)

# The request takes a list of job IDs; this example passes a single one.
request = service_pb2.CancelInputsExtractionJobsRequest(
    user_app_id=userDataObject,
    ids=[INPUTS_EXTRACTION_JOB_ID],
)
cancel_inputs_extraction_response = stub.CancelInputsExtractionJobs(
    request,
    metadata=metadata,
)

status = cancel_inputs_extraction_response.status
if status.code == status_code_pb2.SUCCESS:
    print(cancel_inputs_extraction_response)
else:
    # Show the full status before aborting with its description.
    print(status)
    raise Exception("Cancel input failed, status: " + status.description)

Add Inputs With Concepts and Datasets

You can also add inputs from cloud storage platforms while attaching relevant concepts, assigning them to an already existing dataset, or adding other metadata information to them.

The input_template parameter allows you to do that.

#####################################################################################################
# This section holds the user authentication, the app ID, and the details of the extraction job:
# the cloud storage URL plus the metadata, dataset, and concepts to attach to the inputs.
# Replace these strings to run your own example.
####################################################################################################

USER_ID = "YOUR_USER_ID_HERE"
# Your PAT (Personal Access Token) can be found in the Portal under Account > Security
PAT = "YOUR_PAT_HERE"
APP_ID = "YOUR_APP_ID_HERE"
# Change these to make your own extraction
INPUTS_JOB_ID = ""
CLOUD_STORAGE_URL = "s3://samples.clarifai.com/storage/"
CUSTOM_METADATA = {"id": "id001"}
DATASET_ID_1 = "dataset-1"
CONCEPT_ID_1 = "lamborghini23_A"
CONCEPT_ID_2 = "spiderman_a"

##############################################################################
# YOU DO NOT NEED TO CHANGE ANYTHING BELOW THIS LINE TO RUN THIS EXAMPLE
##############################################################################

from clarifai_grpc.channel.clarifai_channel import ClarifaiChannel
from clarifai_grpc.grpc.api import resources_pb2, service_pb2, service_pb2_grpc
from clarifai_grpc.grpc.api.status import status_code_pb2
from google.protobuf.struct_pb2 import Struct

channel = ClarifaiChannel.get_grpc_channel()
stub = service_pb2_grpc.V2Stub(channel)

# Authorization header sent along with the call.
metadata = (("authorization", "Key " + PAT),)

userDataObject = resources_pb2.UserAppIDSet(user_id=USER_ID, app_id=APP_ID)

# Copy the plain dict of custom metadata into a protobuf Struct.
input_metadata = Struct()
input_metadata.update(CUSTOM_METADATA)

# Both concepts are attached with value=1.
concepts = [
    resources_pb2.Concept(id=concept_id, value=1)
    for concept_id in (CONCEPT_ID_1, CONCEPT_ID_2)
]

# Template carrying the dataset IDs, metadata, and concepts for this data source.
input_template = resources_pb2.Input(
    dataset_ids=[DATASET_ID_1],  # List of dataset IDs that this input is part of
    data=resources_pb2.Data(metadata=input_metadata, concepts=concepts),
)

data_source = resources_pb2.InputsDataSource(
    inputs_add_job_id=INPUTS_JOB_ID,
    url=resources_pb2.DataSourceURL(url=CLOUD_STORAGE_URL),
    input_template=input_template,
)

request = service_pb2.PostInputsDataSourcesRequest(
    user_app_id=userDataObject,
    app_pat=PAT,
    data_sources=[data_source],
)
post_inputs_response = stub.PostInputsDataSources(request, metadata=metadata)

status = post_inputs_response.status
if status.code == status_code_pb2.SUCCESS:
    print(post_inputs_response)
else:
    # Show the full status before aborting with its description.
    print(status)
    raise Exception("Post inputs failed, status: " + status.description)
Output Example
status {
code: SUCCESS
description: "Ok"
req_id: "32694c6a3ef8fe3f6704502c0b053734"
}
inputs_add_jobs {
id: "66b5ca001e754111a81c4839cdabed10"
progress {
}
created_at {
seconds: 1708500170
nanos: 508992497
}
modified_at {
seconds: 1708500170
nanos: 582792601
}
extraction_jobs {
status {
code: JOB_QUEUED
description: "Job is queued to be ran."
}
id: "7e9b139f65fb4426a3d273d609758d34"
url: "s3://samples.clarifai.com/storage/"
progress {
}
created_at {
seconds: 1708500170
nanos: 550291872
}
modified_at {
seconds: 1708500170
nanos: 550291872
}
input_template {
data {
concepts {
id: "lamborghini23_A"
value: 1
}
concepts {
id: "spiderman_a"
value: 1
}
metadata {
fields {
key: "id"
value {
string_value: "id001"
}
}
}
}
dataset_ids: "dataset-1"
}
}
status {
code: JOB_COMPLETED
description: "Job successfully ran."
}
}