Collectors

Capture inputs used for making predictions in your app


A collector captures the inputs used for making predictions. Creating a collector, which includes specifying the caller user ID and the source model, establishes a triggering process.

This process ensures that whenever the specified user makes a prediction with the specified model, the inputs used to generate that prediction are automatically ingested and stored in your app.
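
As a rough illustration of the trigger described above, here is a minimal sketch in plain Python. It does not call the platform; `CollectorRule`, `App`, and `on_prediction` are hypothetical names invented for this example, and the real matching happens server-side.

```python
from dataclasses import dataclass, field


@dataclass
class CollectorRule:
    """Hypothetical stand-in for a collector's trigger settings."""
    caller_user_id: str
    model_id: str


@dataclass
class App:
    """Hypothetical stand-in for an app that stores ingested inputs."""
    inputs: list = field(default_factory=list)


def on_prediction(rule, app, caller_user_id, model_id, input_url):
    """Ingest the input only when both the caller and the model match the rule."""
    if caller_user_id == rule.caller_user_id and model_id == rule.model_id:
        app.inputs.append(input_url)
        return True
    return False


rule = CollectorRule(caller_user_id="alice", model_id="my-source-model")
app = App()

# Only the first call matches both the caller and the model.
on_prediction(rule, app, "alice", "my-source-model", "https://example.com/a.jpg")
on_prediction(rule, app, "bob", "my-source-model", "https://example.com/b.jpg")
on_prediction(rule, app, "alice", "other-model", "https://example.com/c.jpg")

print(app.inputs)  # ['https://example.com/a.jpg']
```

Predictions made by other users, or against other models, leave the app untouched.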

With collectors, you can automatically pipe in data from production models, gather inputs to feed your models with real-world training data, and unlock many platform training capabilities.

tip

You can learn how to create collectors via the UI here.

feature availability

The Collector feature is currently available only to Professional and Enterprise users. Learn more here.

Collector Parameters

Let's talk about the parameters required to create a collector via the API.

Collector ID

Give your collector a useful and descriptive name.

Description

Provide additional details about your collector.

Pre-queue Workflow

In many scenarios, you will want to ingest only a sample or subset of a given data source into your app. Pre-queue workflows let you pre-process your inputs, applying sampling and filtering rules to new data before it is ever added to your app.

Common pre-queue workflows are designed to:

  • Randomly sample inputs
  • Filter inputs by metadata
  • Filter inputs with a maximum probability below a given threshold
  • Filter inputs with a minimum probability above a given threshold
  • Filter specific concept probabilities above a given threshold
  • Perform knowledge graph mapping from public General model concepts to a custom model

note

At least one (pre-queue or post-queue) workflow ID is required.
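
To make two of the sampling rules listed above concrete (random sampling, and keeping inputs whose maximum probability falls below a threshold), here is a small sketch in plain Python. It only illustrates the decision logic; real pre-queue workflows are built from platform models, and `should_queue` and its parameters are hypothetical names used for this example.

```python
import random


def should_queue(concept_probs, sample_rate=1.0, max_prob_below=None, rng=random):
    """Return True if an input should be queued for ingestion.

    concept_probs: mapping of concept name -> predicted probability.
    sample_rate: fraction of inputs to keep at random (1.0 keeps all).
    max_prob_below: if set, keep only inputs whose highest concept
        probability is below this threshold (uncertain predictions).
    """
    if max_prob_below is not None:
        top = max(concept_probs.values(), default=0.0)
        if top >= max_prob_below:
            return False
    # Random sampling: keep roughly `sample_rate` of the remaining inputs.
    return rng.random() < sample_rate


predictions = [
    {"id": "a", "probs": {"cat": 0.97, "dog": 0.02}},
    {"id": "b", "probs": {"cat": 0.55, "dog": 0.41}},
    {"id": "c", "probs": {"cat": 0.30, "dog": 0.28}},
]

# Keep only inputs the model was unsure about (top probability under 0.6).
kept = [p["id"] for p in predictions if should_queue(p["probs"], max_prob_below=0.6)]
print(kept)  # ['b', 'c']
```

Filtering on low maximum probability is a common way to collect the examples a model is least sure about, which tend to be the most useful ones to label.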

Post-queue Workflow

This is the workflow to run after the collector has processed the queued input. It receives the original model input, so you can run additional models on that input to decide whether or not to queue it.

Source

These are the details of the source model from which you want to collect data. The collector automatically posts to your app the inputs that the caller used when making predictions with the specified model.

Post Inputs Key

This is the PAT or the API key to use to enable inputs to be posted to your app. This key must have the PostInputs scope, since it grants the collector the authority to POST inputs to your app.

It should also have the permissions to access the source model used for making the predictions.

Caller User ID

This is the ID of the caller who will be making the prediction requests. You can even provide your own user ID.
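
Before sending a request, it can help to gather these parameters in one place and check them locally. The sketch below is a plain-Python helper, not part of the client library: `CollectorConfig` and `validate` are hypothetical names, and which fields are treated as mandatory is an assumption, except for the rule from the note above that at least one workflow ID is required.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class CollectorConfig:
    """Hypothetical container for the collector parameters described above."""
    collector_id: str
    model_id: str
    post_inputs_key_id: str
    caller_user_id: str
    description: str = ""
    pre_queue_workflow_id: Optional[str] = None
    post_queue_workflow_id: Optional[str] = None

    def validate(self) -> List[str]:
        """Return a list of problems; an empty list means the config looks usable."""
        problems = []
        # Assumption: these four IDs must be non-empty.
        for name in ("collector_id", "model_id", "post_inputs_key_id", "caller_user_id"):
            if not getattr(self, name):
                problems.append(f"{name} is required")
        # From the note above: at least one workflow ID is required.
        if not (self.pre_queue_workflow_id or self.post_queue_workflow_id):
            problems.append("a pre-queue or post-queue workflow ID is required")
        return problems


cfg = CollectorConfig(
    collector_id="my-collector",
    model_id="my-model",
    post_inputs_key_id="my-key-id",
    caller_user_id="my-user-id",
)
print(cfg.validate())  # ['a pre-queue or post-queue workflow ID is required']
```

Catching a missing workflow ID locally avoids a round trip that would most likely be rejected by the API.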

info

The initialization code used in the following examples is outlined in detail on the client installation page.

Add Collector

Here is how to add a new collector to your application.

###############################################################################
# In this section, we set the user authentication, app ID, and details of the
# collector we want to add. Change these strings to run your own example.
###############################################################################

USER_ID = 'YOUR_USER_ID_HERE'
# Your PAT (Personal Access Token) can be found in the Account's Security section
PAT = 'YOUR_PAT_HERE'
APP_ID = 'YOUR_APP_ID_HERE'
# Change these to add your own collector
COLLECTOR_ID = 'YOUR_COLLECTOR_ID_HERE'
COLLECTOR_DESCRIPTION = 'YOUR_COLLECTOR_DESCRIPTION_HERE'
PRE_QUEUE_WORKFLOW_ID = 'YOUR_PRE_WORKFLOW_ID_HERE'
POST_QUEUE_WORKFLOW_ID = 'YOUR_POST_WORKFLOW_ID_HERE'
MODEL_ID = 'YOUR_MODEL_ID_HERE'
MODEL_VERSION_ID = 'YOUR_MODEL_VERSION_ID_HERE'
POST_INPUTS_KEY_ID = 'YOUR_API_KEY_HERE'
CALLER_USER_ID = 'YOUR_CALLER_USER_ID_HERE'

##########################################################################
# YOU DO NOT NEED TO CHANGE ANYTHING BELOW THIS LINE TO RUN THIS EXAMPLE
##########################################################################

from clarifai_grpc.channel.clarifai_channel import ClarifaiChannel
from clarifai_grpc.grpc.api import resources_pb2, service_pb2, service_pb2_grpc
from clarifai_grpc.grpc.api.status import status_code_pb2

channel = ClarifaiChannel.get_grpc_channel()
stub = service_pb2_grpc.V2Stub(channel)

metadata = (('authorization', 'Key ' + PAT),)

userDataObject = resources_pb2.UserAppIDSet(user_id=USER_ID, app_id=APP_ID) # The userDataObject is required when using a PAT

post_collectors_response = stub.PostCollectors(
    service_pb2.PostCollectorsRequest(
        user_app_id=userDataObject,
        collectors=[
            resources_pb2.Collector(
                id=COLLECTOR_ID,
                description=COLLECTOR_DESCRIPTION,
                pre_queue_workflow_id=PRE_QUEUE_WORKFLOW_ID,
                post_queue_workflow_id=POST_QUEUE_WORKFLOW_ID,
                collector_source=resources_pb2.CollectorSource(
                    api_post_model_outputs_collector_source=resources_pb2.APIPostModelOutputsCollectorSource(
                        model_user_id=USER_ID,
                        model_app_id=APP_ID,
                        model_id=MODEL_ID,
                        model_version_id=MODEL_VERSION_ID,
                        post_inputs_key_id=POST_INPUTS_KEY_ID,
                        caller_user_id=CALLER_USER_ID
                    )
                )
            )
        ]
    ),
    metadata=metadata
)

if post_collectors_response.status.code != status_code_pb2.SUCCESS:
    print(post_collectors_response.status)
    raise Exception("Post collectors failed, status: " + post_collectors_response.status.description)

Update Collector

You can update an existing collector.

###############################################################################
# In this section, we set the user authentication, app ID, and details of the
# collector we want to update. Change these strings to run your own example.
###############################################################################

USER_ID = 'YOUR_USER_ID_HERE'
# Your PAT (Personal Access Token) can be found in the Account's Security section
PAT = 'YOUR_PAT_HERE'
APP_ID = 'YOUR_APP_ID_HERE'
# Change these to update your own collector
COLLECTOR_ID = 'YOUR_COLLECTOR_ID_HERE'
COLLECTOR_DESCRIPTION = 'YOUR_COLLECTOR_DESCRIPTION_HERE'
PRE_QUEUE_WORKFLOW_ID = 'YOUR_PRE_WORKFLOW_ID_HERE'
POST_QUEUE_WORKFLOW_ID = 'YOUR_POST_WORKFLOW_ID_HERE'
MODEL_ID = 'YOUR_MODEL_ID_HERE'
MODEL_VERSION_ID = 'YOUR_MODEL_VERSION_ID_HERE'
POST_INPUTS_KEY_ID = 'YOUR_KEY_ID_HERE'
CALLER_USER_ID = 'YOUR_CALLER_USER_ID_HERE'

##########################################################################
# YOU DO NOT NEED TO CHANGE ANYTHING BELOW THIS LINE TO RUN THIS EXAMPLE
##########################################################################

from clarifai_grpc.channel.clarifai_channel import ClarifaiChannel
from clarifai_grpc.grpc.api import resources_pb2, service_pb2, service_pb2_grpc
from clarifai_grpc.grpc.api.status import status_code_pb2

channel = ClarifaiChannel.get_grpc_channel()
stub = service_pb2_grpc.V2Stub(channel)

metadata = (('authorization', 'Key ' + PAT),)

userDataObject = resources_pb2.UserAppIDSet(user_id=USER_ID, app_id=APP_ID) # The userDataObject is required when using a PAT

patch_collectors_response = stub.PatchCollectors(
    service_pb2.PatchCollectorsRequest(
        user_app_id=userDataObject,
        action='overwrite',
        collectors=[
            resources_pb2.Collector(
                id=COLLECTOR_ID,
                description=COLLECTOR_DESCRIPTION,
                pre_queue_workflow_id=PRE_QUEUE_WORKFLOW_ID,
                post_queue_workflow_id=POST_QUEUE_WORKFLOW_ID,
                collector_source=resources_pb2.CollectorSource(
                    api_post_model_outputs_collector_source=resources_pb2.APIPostModelOutputsCollectorSource(
                        model_user_id=USER_ID,
                        model_app_id=APP_ID,
                        model_id=MODEL_ID,
                        model_version_id=MODEL_VERSION_ID,
                        post_inputs_key_id=POST_INPUTS_KEY_ID,
                        caller_user_id=CALLER_USER_ID
                    )
                )
            )
        ]
    ),
    metadata=metadata
)

if patch_collectors_response.status.code != status_code_pb2.SUCCESS:
    print(patch_collectors_response.status)
    raise Exception("Patch collectors failed, status: " + patch_collectors_response.status.description)

List Collectors

You can list all the collectors in your app. Click here to learn how to control the page that gets displayed.

##################################################################
# In this section, we set the user authentication and app ID.
# Change these strings to run your own example.
##################################################################

USER_ID = 'YOUR_USER_ID_HERE'
# Your PAT (Personal Access Token) can be found in the Account's Security section
PAT = 'YOUR_PAT_HERE'
APP_ID = 'YOUR_APP_ID_HERE'

##########################################################################
# YOU DO NOT NEED TO CHANGE ANYTHING BELOW THIS LINE TO RUN THIS EXAMPLE
##########################################################################

from clarifai_grpc.channel.clarifai_channel import ClarifaiChannel
from clarifai_grpc.grpc.api import resources_pb2, service_pb2, service_pb2_grpc
from clarifai_grpc.grpc.api.status import status_code_pb2

channel = ClarifaiChannel.get_grpc_channel()
stub = service_pb2_grpc.V2Stub(channel)

metadata = (('authorization', 'Key ' + PAT),)

userDataObject = resources_pb2.UserAppIDSet(user_id=USER_ID, app_id=APP_ID) # The userDataObject is required when using a PAT

list_collectors_response = stub.ListCollectors(
    service_pb2.ListCollectorsRequest(user_app_id=userDataObject),
    metadata=metadata
)

if list_collectors_response.status.code != status_code_pb2.SUCCESS:
    print(list_collectors_response.status)
    raise Exception("List collectors failed, status: " + list_collectors_response.status.description)

for collector in list_collectors_response.collectors:
    print(collector)

# print(list_collectors_response.collectors)
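
If an app has more collectors than fit on one page, the list has to be walked page by page. The helper below is a generic sketch of that loop in plain Python. `iter_all` and `fetch_page` are hypothetical names, and the 1-based page number and page size arguments are assumptions; the pagination page linked above is the authority on how the API actually exposes them.

```python
from typing import Callable, Iterator, List


def iter_all(fetch_page: Callable[[int, int], List], per_page: int = 20) -> Iterator:
    """Yield every item by requesting pages 1, 2, 3, ... until one comes back short."""
    page = 1
    while True:
        items = fetch_page(page, per_page)
        yield from items
        # A page with fewer items than requested is the last one.
        if len(items) < per_page:
            break
        page += 1


# Stand-in for a real list call so the sketch runs without network access.
fake_collectors = [f"collector-{i}" for i in range(5)]


def fake_fetch(page, per_page):
    start = (page - 1) * per_page
    return fake_collectors[start:start + per_page]


print(list(iter_all(fake_fetch, per_page=2)))
# ['collector-0', 'collector-1', 'collector-2', 'collector-3', 'collector-4']
```

In real use, `fetch_page` would wrap the list call shown above, pass the page values on the request (assuming the request message accepts them), and return the collectors from the response.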

Get Collector

You can retrieve the details of a specific collector.

#####################################################################################
# In this section, we set the user authentication, app ID, and ID of the collector
# whose details we want. Change these strings to run your own example.
#####################################################################################

USER_ID = 'YOUR_USER_ID_HERE'
# Your PAT (Personal Access Token) can be found in the Account's Security section
PAT = 'YOUR_PAT_HERE'
APP_ID = 'YOUR_APP_ID_HERE'
# Change this to get your own collector
COLLECTOR_ID = 'YOUR_COLLECTOR_ID_HERE'

##########################################################################
# YOU DO NOT NEED TO CHANGE ANYTHING BELOW THIS LINE TO RUN THIS EXAMPLE
##########################################################################

from clarifai_grpc.channel.clarifai_channel import ClarifaiChannel
from clarifai_grpc.grpc.api import resources_pb2, service_pb2, service_pb2_grpc
from clarifai_grpc.grpc.api.status import status_code_pb2

channel = ClarifaiChannel.get_grpc_channel()
stub = service_pb2_grpc.V2Stub(channel)

metadata = (('authorization', 'Key ' + PAT),)

userDataObject = resources_pb2.UserAppIDSet(user_id=USER_ID, app_id=APP_ID) # The userDataObject is required when using a PAT

get_collector_response = stub.GetCollector(
    service_pb2.GetCollectorRequest(
        user_app_id=userDataObject,
        collector_id=COLLECTOR_ID
    ),
    metadata=metadata
)

if get_collector_response.status.code != status_code_pb2.SUCCESS:
    print(get_collector_response.status)
    raise Exception("Get collector failed, status: " + get_collector_response.status.description)

print(get_collector_response.collector)

Delete Collector

You can delete one or more collectors by ID.

######################################################################################
# In this section, we set the user authentication, app ID, and IDs of the collectors
# we want to delete. Change these strings to run your own example.
######################################################################################

USER_ID = 'YOUR_USER_ID_HERE'
# Your PAT (Personal Access Token) can be found in the Account's Security section
PAT = 'YOUR_PAT_HERE'
APP_ID = 'YOUR_APP_ID_HERE'
# Change these to delete your own collectors
COLLECTOR_ID_1 = 'YOUR_COLLECTOR_ID_HERE'
COLLECTOR_ID_2 = 'YOUR_COLLECTOR_ID_HERE'

##########################################################################
# YOU DO NOT NEED TO CHANGE ANYTHING BELOW THIS LINE TO RUN THIS EXAMPLE
##########################################################################

from clarifai_grpc.channel.clarifai_channel import ClarifaiChannel
from clarifai_grpc.grpc.api import resources_pb2, service_pb2, service_pb2_grpc
from clarifai_grpc.grpc.api.status import status_code_pb2

channel = ClarifaiChannel.get_grpc_channel()
stub = service_pb2_grpc.V2Stub(channel)

metadata = (('authorization', 'Key ' + PAT),)

userDataObject = resources_pb2.UserAppIDSet(user_id=USER_ID, app_id=APP_ID)

delete_collectors_response = stub.DeleteCollectors(
    service_pb2.DeleteCollectorsRequest(
        user_app_id=userDataObject,
        ids=[COLLECTOR_ID_1, COLLECTOR_ID_2],
        # delete_all=True  # Uncomment to delete all your collectors
    ),
    metadata=metadata
)

if delete_collectors_response.status.code != status_code_pb2.SUCCESS:
    print(delete_collectors_response.status)
    raise Exception("Delete collectors failed, status: " + delete_collectors_response.status.description)