Skip to main content

Collectors

Capture data for your application


Collectors capture input data for your app. They enable you to pipe in data from production models automatically, and are the key to unlocking many platform training capabilities like active learning.

Collectors are available with Essential and Enterprise plans to help you manage data ingestion at scale.

You can create app-level collectors to monitor specific models and specify sampling rules for triggering data ingestion. Collectors can only collect data from apps where you are the app owner.

info

The Collector feature is currently exclusively available to our Professional and Enterprise users. Learn more here.

Collector Parameters

Collector ID

Give your collector a useful and descriptive name.

Description

Provide additional details about your collector.

Pre-queue Workflow

In many scenarios, you will only want to ingest a sample, or subset of a given data source into your app. Pre-queue workflows allow you to pre-process your inputs so that you can sample and filter your new data before it is ever added to your app. Pre-queue workflows allow you to specify sampling rules for triggering data ingestion.

Common pre-queue workflows are designed to:

  • Randomly sample inputs
  • Filter inputs by metadata
  • Filter inputs with a maximum probability below a given threshold
  • Filter inputs with a minimum probability above a given threshold
  • Filter specific concept probabilities above a given threshold
  • Undertake knowledge graph mapping from public General model concepts to a custom model

At least one (pre-queue or post-queue) workflow ID is required. The input to this workflow is going to be the OUTPUT of the model. We recommend that you use fast and light-weight models in it as it will affect the speed of the predictions being made.

Post Inputs Key

Select the API key that you would like to use to allow new inputs to be posted to your app. This is the post-queue workflow ID of the workflow to run to after the collector has processed the queued input. This API key must have the PostInputs scope, since it grants the collector the authority to POST inputs to your app.

This workflow uses the original input to the model as input to the workflow so that you can run additional models as well on that input to decide whether to queue the model or not. If the workflow output has any field that is non-empty, then it will be passed on to POST /inputs to the destination app.

At least one (pre-queue or post-queue) workflow ID is required.

Source

Select the model that you would like to collect data from, and the collector will automatically post the new inputs to your app. Simply enter your model name, or model ID number. When the user predicts an input against this model, the input is going to be collected.

You need to specify the app ID and user ID where the model is located. If using a publicly available model, the model's user and app ID should be clarifai and main, respectively. Otherwise, the IDs should belong to the user who created the model. You also need to specify an API key ID where the inputs are going to be added.

See also Auto Annotation walkthrough.

info

The initialization code used in the following examples is outlined in detail on the client installation page.

Add Collector

Add a new collector to your application.

###############################################################################
# In this section, we set the user authentication, app ID, and details of the
# collector we want to add. Change these strings to run your own example.
###############################################################################

USER_ID = 'YOUR_USER_ID_HERE'
# Your PAT (Personal Access Token) can be found in the portal under Authentification
PAT = 'YOUR_PAT_HERE'
APP_ID = 'YOUR_APP_ID_HERE'
# Change these to add your own collector
COLLECTOR_ID = 'YOUR_COLLECTOR_ID_HERE'
COLLECTOR_DESCRIPTION = 'YOUR_COLLECTOR_DESCRIPTION_HERE'
PRE_QUEUE_WORKFLOW_ID = 'YOUR_PRE_WORKFLOW_ID_HERE'
POST_QUEUE_WORKFLOW_ID = 'YOUR_POST_WORKFLOW_ID_HERE'
MODEL_ID = 'YOUR_MODEL_ID_HERE'
MODEL_VERSION_ID = 'YOUR_MODEL_VERSION_ID_HERE'
POST_INPUTS_KEY_ID = 'YOUR_API_KEY_HERE'

##########################################################################
# YOU DO NOT NEED TO CHANGE ANYTHING BELOW THIS LINE TO RUN THIS EXAMPLE
##########################################################################

from clarifai_grpc.channel.clarifai_channel import ClarifaiChannel
from clarifai_grpc.grpc.api import resources_pb2, service_pb2, service_pb2_grpc
from clarifai_grpc.grpc.api.status import status_code_pb2

channel = ClarifaiChannel.get_grpc_channel()
stub = service_pb2_grpc.V2Stub(channel)

metadata = (('authorization', 'Key ' + PAT),)

userDataObject = resources_pb2.UserAppIDSet(user_id=USER_ID, app_id=APP_ID) # The userDataObject is required when using a PAT

post_collectors_response = stub.PostCollectors(
service_pb2.PostCollectorsRequest(
user_app_id=userDataObject,
collectors=[
resources_pb2.Collector(
id=COLLECTOR_ID,
description=COLLECTOR_DESCRIPTION,
pre_queue_workflow_id=PRE_QUEUE_WORKFLOW_ID,
post_queue_workflow_id=POST_QUEUE_WORKFLOW_ID,
collector_source=resources_pb2.CollectorSource(
api_post_model_outputs_collector_source=resources_pb2.APIPostModelOutputsCollectorSource(
model_user_id=USER_ID,
model_app_id=APP_ID,
model_id=MODEL_ID,
model_version_id=MODEL_VERSION_ID,
post_inputs_key_id=POST_INPUTS_KEY_ID
)
)
)
]
),
metadata=metadata
)

if post_collectors_response.status.code != status_code_pb2.SUCCESS:
print(post_collectors_response.status)
raise Exception("Post collectors failed, status: " + post_collectors_response.status.description)

Update Collector

Update an existing collector.

###############################################################################
# In this section, we set the user authentication, app ID, and details of the
# collector we want to update. Change these strings to run your own example.
###############################################################################

USER_ID = 'YOUR_USER_ID_HERE'
# Your PAT (Personal Access Token) can be found in the portal under Authentification
PAT = 'YOUR_PAT_HERE'
APP_ID = 'YOUR_APP_ID_HERE'
# Change these to update your own collector
COLLECTOR_ID = 'YOUR_COLLECTOR_ID_HERE'
COLLECTOR_DESCRIPTION = 'YOUR_COLLECTOR_DESCRIPTION_HERE'
PRE_QUEUE_WORKFLOW_ID = 'YOUR_PRE_WORKFLOW_ID_HERE'
POST_QUEUE_WORKFLOW_ID = 'YOUR_POST_WORKFLOW_ID_HERE'
MODEL_ID = 'YOUR_MODEL_ID_HERE'
MODEL_VERSION_ID = 'YOUR_MODEL_VERSION_ID_HERE'
POST_INPUTS_KEY_ID = 'YOUR_KEY_ID_HERE'

##########################################################################
# YOU DO NOT NEED TO CHANGE ANYTHING BELOW THIS LINE TO RUN THIS EXAMPLE
##########################################################################

from clarifai_grpc.channel.clarifai_channel import ClarifaiChannel
from clarifai_grpc.grpc.api import resources_pb2, service_pb2, service_pb2_grpc
from clarifai_grpc.grpc.api.status import status_code_pb2

channel = ClarifaiChannel.get_grpc_channel()
stub = service_pb2_grpc.V2Stub(channel)

metadata = (('authorization', 'Key ' + PAT),)

userDataObject = resources_pb2.UserAppIDSet(user_id=USER_ID, app_id=APP_ID) # The userDataObject is required when using a PAT

patch_collectors_response = stub.PatchCollectors(
service_pb2.PatchCollectorsRequest(
user_app_id=userDataObject,
action = 'overwrite',
collectors=[
resources_pb2.Collector(
id=COLLECTOR_ID,
description=COLLECTOR_DESCRIPTION,
pre_queue_workflow_id=PRE_QUEUE_WORKFLOW_ID,
post_queue_workflow_id=POST_QUEUE_WORKFLOW_ID,
collector_source=resources_pb2.CollectorSource(
api_post_model_outputs_collector_source=resources_pb2.APIPostModelOutputsCollectorSource(
model_user_id=USER_ID,
model_app_id=APP_ID,
model_id=MODEL_ID,
model_version_id=MODEL_VERSION_ID,
post_inputs_key_id=POST_INPUTS_KEY_ID
)
)
)
]
),
metadata=metadata
)

if patch_collectors_response.status.code != status_code_pb2.SUCCESS:
print(patch_collectors_response.status)
raise Exception("Patch collectors failed, status: " + patch_collectors_response.status.description)

List Collectors

List all the collectors. See Pagination on how to control which page gets displayed.

##################################################################
# In this section, we set the user authentication and app ID.
# Change these strings to run your own example.
##################################################################

USER_ID = 'YOUR_USER_ID_HERE'
# Your PAT (Personal Access Token) can be found in the portal under Authentification
PAT = 'YOUR_PAT_HERE'
APP_ID = 'YOUR_APP_ID_HERE'

##########################################################################
# YOU DO NOT NEED TO CHANGE ANYTHING BELOW THIS LINE TO RUN THIS EXAMPLE
##########################################################################

from clarifai_grpc.channel.clarifai_channel import ClarifaiChannel
from clarifai_grpc.grpc.api import resources_pb2, service_pb2, service_pb2_grpc
from clarifai_grpc.grpc.api.status import status_code_pb2

channel = ClarifaiChannel.get_grpc_channel()
stub = service_pb2_grpc.V2Stub(channel)

metadata = (('authorization', 'Key ' + PAT),)

userDataObject = resources_pb2.UserAppIDSet(user_id=USER_ID, app_id=APP_ID) # The userDataObject is required when using a PAT

list_collectors_response = stub.ListCollectors(
service_pb2.ListCollectorsRequest(user_app_id=userDataObject),
metadata=metadata
)

if list_collectors_response.status.code != status_code_pb2.SUCCESS:
print(list_collectors_response.status)
raise Exception("List collectors failed, status: " + list_collectors_response.status.description)

for collector in list_collectors_response.collectors:
print(collector)

#print(list_collectors_response.collectors)

Get Collector

Return details of a certain collector.

#####################################################################################
# In this section, we set the user authentication, app ID, and ID of the collector
# we want its details. Change these strings to run your own example.
#####################################################################################

USER_ID = 'YOUR_USER_ID_HERE'
# Your PAT (Personal Access Token) can be found in the portal under Authentification
PAT = 'YOUR_PAT_HERE'
APP_ID = 'YOUR_APP_ID_HERE'
# Change this to get your own collector
COLLECTOR_ID = 'YOUR_COLLECTOR_ID_HERE'

##########################################################################
# YOU DO NOT NEED TO CHANGE ANYTHING BELOW THIS LINE TO RUN THIS EXAMPLE
##########################################################################

from clarifai_grpc.channel.clarifai_channel import ClarifaiChannel
from clarifai_grpc.grpc.api import resources_pb2, service_pb2, service_pb2_grpc
from clarifai_grpc.grpc.api.status import status_code_pb2

channel = ClarifaiChannel.get_grpc_channel()
stub = service_pb2_grpc.V2Stub(channel)

metadata = (('authorization', 'Key ' + PAT),)

userDataObject = resources_pb2.UserAppIDSet(user_id=USER_ID, app_id=APP_ID) # The userDataObject is required when using a PAT

get_collector_response = stub.GetCollector(
service_pb2.GetCollectorRequest(
user_app_id=userDataObject,
collector_id=COLLECTOR_ID
),
metadata=metadata
)

if get_collector_response.status.code != status_code_pb2.SUCCESS:
print(get_collector_response.status)
raise Exception("Get collector failed, status: " + get_collector_response.status.description)

print(get_collector_response.collector)

Delete Collector

Delete a collector.

######################################################################################
# In this section, we set the user authentication, app ID, and IDs of the collectors
# we want to delete. Change these strings to run your own example.
######################################################################################

USER_ID = 'YOUR_USER_ID_HERE'
# Your PAT (Personal Access Token) can be found in the portal under Authentification
PAT = 'YOUR_PAT_HERE'
APP_ID = 'YOUR_APP_ID_HERE'
# Change these to delete your own collectors
COLLECTOR_ID_1 = 'YOUR_COLLECTOR_ID_HERE'
COLLECTOR_ID_2 = 'YOUR_COLLECTOR_ID_HERE'

##########################################################################
# YOU DO NOT NEED TO CHANGE ANYTHING BELOW THIS LINE TO RUN THIS EXAMPLE
##########################################################################

from clarifai_grpc.channel.clarifai_channel import ClarifaiChannel
from clarifai_grpc.grpc.api import resources_pb2, service_pb2, service_pb2_grpc
from clarifai_grpc.grpc.api.status import status_code_pb2

channel = ClarifaiChannel.get_grpc_channel()
stub = service_pb2_grpc.V2Stub(channel)

metadata = (('authorization', 'Key ' + PAT),)

userDataObject = resources_pb2.UserAppIDSet(user_id=USER_ID, app_id=APP_ID)

delete_collectors_response = stub.DeleteCollectors(
service_pb2.DeleteCollectorsRequest(
user_app_id=userDataObject,
ids=[COLLECTOR_ID_1, COLLECTOR_ID_2],
#delete_all=True #Uncomment to delete all your collectors
),
metadata=metadata
)

if delete_collectors_response.status.code != status_code_pb2.SUCCESS:
print(delete_collectors_response.status)
raise Exception("Delete collectors failed, status: " + delete_collectors_response.status.description)