Upload Custom Models
Import custom models, including from external sources like Hugging Face and OpenAI
The Clarifai Python SDK allows you to upload custom models easily. Whether you're working with a pre-trained model from an external source like Hugging Face or OpenAI, or one you've built from scratch, Clarifai enables seamless integration of your models so you can take advantage of the platform's powerful capabilities.
Once imported to our platform, your model can be utilized alongside Clarifai's vast suite of AI tools. It will be automatically deployed and ready to be evaluated, combined with other models and agent operators in a workflow, or used to serve inference requests as-is.
Let’s demonstrate how you can successfully upload different types of models to the Clarifai platform.
For the Compute Orchestration Public Preview, deployment is only supported for models that users have uploaded to our platform via the Python SDK. We plan to expand this functionality to include out-of-the-box and custom-trained models on our platform in the future.
- This new feature is in Public Preview. If you'd like to test it out and provide feedback, please request access here.
- This new upload experience is compatible with the latest clarifai Python package, starting from version 10.9.2.
- If you prefer the previous upload method, which is supported up to version 10.8.4, you can refer to the documentation here.
You can run the following command to clone the repository containing examples of how to upload various model types and follow along with this documentation:
git clone https://github.com/Clarifai/examples.git
After cloning it, go to the models/model_upload folder.
Prerequisites
Installation
To begin, install the latest version of the clarifai Python package.
pip install --upgrade clarifai
Environment Setup
Before proceeding, ensure that the CLARIFAI_PAT (Personal Access Token) environment variable is set. You can generate the PAT key in your Personal Settings page by navigating to the Security section.
This token is essential for authenticating your connection to the Clarifai platform.
export CLARIFAI_PAT=YOUR_PERSONAL_ACCESS_TOKEN_HERE
Create Project Directory
Create a project directory and organize your files as indicated below to fit the requirements of uploading models to the Clarifai platform.
your_model_directory/
├── 1/
│ └── model.py
├── requirements.txt
└── config.yaml
- your_model_directory/ – The main directory containing your model files.
- 1/ – A subdirectory that holds the model.py file (note that this folder must be named 1).
- model.py – Contains the code that defines your model, including loading the model and running inference.
- requirements.txt – Lists the Python libraries and dependencies required to run your model.
- config.yaml – Contains model metadata and configuration details necessary for building the Docker image, defining compute resources, and uploading the model to Clarifai.
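If you prefer to scaffold this layout from the command line, you could run something like the following (the directory name is a placeholder):
mkdir -p your_model_directory/1
touch your_model_directory/1/model.py your_model_directory/requirements.txt your_model_directory/config.yaml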
How to Upload a Model
Let's talk about the common steps you'd follow to upload any type of model to the Clarifai platform.
Step 1: Define the config.yaml File
The config.yaml file is essential for specifying the model’s metadata, compute resource requirements, and model checkpoints.
Here’s a breakdown of the key sections in the file.
Model Info
This section defines your model ID, Clarifai user ID, and Clarifai app ID, which will determine where the model is uploaded on the Clarifai platform.
model:
id: "model_id"
user_id: "user_id"
app_id: "app_id"
model_type_id: "text-to-text" # Change this based on your model type (e.g., image-classifier, text-to-text)
Compute Resources
Here, you define the minimum compute resources required for running your model, including CPU, memory, and optional GPU specifications.
inference_compute_info:
cpu_limit: "2"
cpu_memory: "13Gi"
num_accelerators: 1
accelerator_type: ["NVIDIA-A10G"] # Specify the GPU type if needed
accelerator_memory: "15Gi"
- cpu_limit – Number of CPUs allocated for the model (follows Kubernetes notation, e.g., "1", "2").
- cpu_memory – Minimum memory required for the CPU (uses Kubernetes notation, e.g., "1Gi", "1500Mi", "3Gi").
- num_accelerators – Number of GPUs or TPUs to use for inference.
- accelerator_type – Specifies the type of accelerators (e.g., GPU or TPU) supported by the model (e.g., "NVIDIA-A10G").
- accelerator_memory – Minimum memory required for the GPU or TPU.
Hugging Face Model Checkpoints
If you're using a model from Hugging Face, you can automatically download its checkpoints by specifying the appropriate configuration in this section. For private or restricted Hugging Face repositories, include an access token.
checkpoints:
type: "huggingface"
repo_id: "meta-llama/Meta-Llama-3-8B-Instruct"
hf_token: "your_hf_token" # Required for private models
Model Concepts or Labels
This section is required if your model outputs concepts or labels and is not being directly loaded from Hugging Face.
For models that output concepts or labels, such as classification or detection models, you must define a concepts section in the config.yaml file:
concepts:
- id: '0'
name: bus
- id: '1'
name: person
- id: '2'
name: bicycle
- id: '3'
name: car
If you're using a model from Hugging Face and the checkpoints section is defined, the Clarifai platform will automatically infer concepts. In this case, you don’t need to manually specify them.
Step 2: Define Dependencies in requirements.txt
The requirements.txt file lists all the Python dependencies your model needs. This ensures that the necessary libraries are installed in the runtime environment.
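For example, a minimal requirements.txt for a Transformers-based model might contain entries like the following; the exact packages and version pins depend on your own model:
torch
transformers
clarifai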
Step 3: Prepare the model.py File
The model.py file contains the logic for your model, including how it loads and handles predictions. This file must implement a class that inherits from ModelClass and defines the following methods, where applicable:
- load_model() – Initializes and loads the model, preparing it for inference.
- predict(input_data) – Handles the core logic for making predictions. It processes the input data and returns the output response.
- generate(input_data) – Provides output in a streaming manner, if applicable to the model's use case.
- stream(input_data) – Manages both streaming input and output, primarily for more advanced use cases where data is processed continuously.
from clarifai.runners.models.model_class import ModelClass
class YourCustomModel(ModelClass):
def load_model(self):
# Initialize and load the model here
pass
def predict(self, request):
# Handle input and return the model's predictions
return output_data
def generate(self, request):
# Handle streaming output (if applicable)
pass
def stream(self, request):
# Handle both streaming input and output
pass
Step 4: Test the Model Locally
Before uploading your model to the Clarifai platform, it's important to test it locally to catch any typos or misconfigurations in the code.
This can prevent upload failures caused by issues in the model.py file or an incorrect model implementation. It also ensures the model runs smoothly and that all dependencies are correctly configured.
You can test the model within a Docker container or a Python virtual environment.
If Docker is installed on your system, it is highly recommended to use it for testing or running the model. Docker provides better isolation and avoids dependency conflicts.
Ensure your local environment has sufficient memory and compute resources to load and run the model for testing.
There are two types of CLI (command line interface) commands you can use to test your models in your local development environment. You can learn more about the Clarifai CLI tool here.
1. Using the test-locally Command
This method allows you to test your model with a single CLI command. It runs the model locally and sends a sample request to verify that the model responds successfully. The results of the request are displayed directly in the console.
Here is how to test a model in a Docker Container:
clarifai model test-locally --model_path {add_model_path_here} --mode container
Here is how to test a model in a virtual environment:
clarifai model test-locally --model_path {add_model_path_here} --mode env
2. Using the run-locally Command
This method starts a local gRPC server at https://localhost:{port}/ for running the model. Once the server is running, you can perform inference on the model via the Clarifai client SDK.
Here is how to test a model in a Docker Container:
clarifai model run-locally --model_path {add_model_path_here} --mode container --port 8000
Here is how to test a model in a virtual environment:
clarifai model run-locally --model_path {add_model_path_here} --mode env --port 8000
Once the model is running locally, you need to configure the CLARIFAI_API_BASE environment variable to point to the localhost and port where the gRPC server is running.
export CLARIFAI_API_BASE="localhost:{port}"
You can then make different types of inference requests using the model — unary-unary, unary-stream, or stream-stream predict calls.
Here is an example of a unary-unary prediction call:
- Python
from clarifai.client.model import Model
model = Model(model_id='model_id', user_id='user_id', app_id='app_id')  # placeholder IDs are fine here; requests are routed to the local gRPC server
image_url = "https://samples.clarifai.com/metro-north.jpg"
# Model Predict
model_prediction = model.predict_by_url(image_url)
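Here is an example of a unary-stream call, which is useful when your model implements the generate() method. This is a minimal sketch, assuming your installed SDK version exposes Model.generate_by_bytes and that the locally running model accepts text input; method names may differ slightly across SDK releases:
- Python
from clarifai.client.model import Model

model = Model(model_id='model_id', user_id='user_id', app_id='app_id')  # placeholder IDs are fine for the local server

# Unary-stream: one request in, a stream of partial responses back
for response in model.generate_by_bytes(b"Write a short poem about the sea", input_type="text"):
    print(response.outputs[0].data.text.raw)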
These are the key CLI flags available for local testing and running your models:
- --model_path — Path to the model directory.
- --mode — Specify how to run the model: env for virtual environment or container for Docker container. Defaults to env.
- -p or --port — The port to host the gRPC server for running the model locally. Defaults to 8000.
- --keep_env — Retain the virtual environment after testing the model locally (applicable for env mode). Defaults to False.
- --keep_image — Retain the Docker image built after testing the model locally (applicable for container mode). Defaults to False.
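For example, to run a model locally in a virtual environment on a custom port and keep the environment around for faster subsequent runs, you could combine the flags like this (the path is a placeholder, and this assumes --keep_env acts as a simple on/off switch):
clarifai model run-locally --model_path ./your_model_directory --mode env -p 8001 --keep_env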
Step 5: Upload the Model to Clarifai
Once your model is ready, upload it to the Clarifai platform by running the following command:
clarifai model upload --model_path {add_model_path_here}
This command builds the model’s Docker image using the defined compute resources and uploads it to Clarifai, where it can be served in production.
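After the upload completes, you can verify the deployment by sending an inference request with the Python SDK. This is a minimal sketch for an image model, assuming the IDs below match the ones in your config.yaml; if you pointed CLARIFAI_API_BASE at a local server earlier, point it back to the default API endpoint first.
- Python
from clarifai.client.model import Model

# Use the same model_id, user_id, and app_id you defined in config.yaml
model = Model(model_id="model_id", user_id="user_id", app_id="app_id")

model_prediction = model.predict_by_url("https://samples.clarifai.com/metro-north.jpg")
print(model_prediction.outputs[0].data)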
Examples
You can find various model upload examples here, which demonstrate different use cases and optimizations.
Image Classifier
model.py
- Python
# Model to be uploaded: https://huggingface.co/Falconsai/nsfw_image_detection
import os
import tempfile
from io import BytesIO
from typing import Iterator
import cv2
import torch
from clarifai.runners.models.model_class import ModelClass
from clarifai.utils.logging import logger
from clarifai_grpc.grpc.api import resources_pb2, service_pb2
from clarifai_grpc.grpc.api.status import status_code_pb2, status_pb2
from PIL import Image
from transformers import AutoModelForImageClassification, ViTImageProcessor
def preprocess_image(image_bytes):
"""Fetch and preprocess image data from bytes"""
return Image.open(BytesIO(image_bytes)).convert("RGB")
def video_to_frames(video_bytes):
"""Convert video bytes to frames"""
# Write video bytes to a temporary file
with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as temp_video_file:
temp_video_file.write(video_bytes)
temp_video_path = temp_video_file.name
logger.info(f"temp_video_path: {temp_video_path}")
video = cv2.VideoCapture(temp_video_path)
print("video opened")
logger.info(f"video opened: {video.isOpened()}")
while video.isOpened():
ret, frame = video.read()
if not ret:
break
# Convert the frame to byte format
frame_bytes = cv2.imencode('.jpg', frame)[1].tobytes()
yield frame_bytes
video.release()
def classify_image(images, model, processor, device):
"""Classify an image using the model and processor."""
inputs = processor(images=images, return_tensors="pt")
inputs = {name: tensor.to(device) for name, tensor in inputs.items()}
logits = model(**inputs).logits
return logits
def process_concepts(logits, images, concept_protos):
"""Process the logits and return the concepts."""
outputs = []
for i, logit in enumerate(logits):
output_concepts = []
probs = torch.softmax(logit, dim=-1)
sorted_indices = torch.argsort(probs, dim=-1, descending=True)
for idx in sorted_indices:
concept_protos[idx.item()].value = probs[idx].item()
output_concepts.append(concept_protos[idx.item()])
output = resources_pb2.Output()
output.data.image.base64 = images[i].tobytes()
output.data.concepts.extend(output_concepts)
output.status.code = status_code_pb2.SUCCESS
outputs.append(output)
return outputs
class MyModel(ModelClass):
"""A custom runner that loads the model and classifies images using it.
"""
def load_model(self):
"""Load the model here."""
self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
logger.info(f"Running on device: {self.device}")
# if checkpoints section is in config.yaml file then checkpoints will be downloaded at this path during model upload time.
checkpoint_path = os.path.join(os.path.dirname(__file__), "checkpoints")
self.model = AutoModelForImageClassification.from_pretrained(checkpoint_path,).to(self.device)
self.processor = ViTImageProcessor.from_pretrained(checkpoint_path)
logger.info("Done loading!")
def predict(self, request: service_pb2.PostModelOutputsRequest) -> service_pb2.MultiOutputResponse:
"""This is the method that will be called when the runner is run. It takes in an input and
returns an output.
"""
outputs = []
images = []
concept_protos = request.model.model_version.output_info.data.concepts
for input in request.inputs:
input_data = input.data
image = preprocess_image(image_bytes=input_data.image.base64)
images.append(image)
with torch.no_grad():
logits = classify_image(images, self.model, self.processor, self.device)
outputs = process_concepts(logits, images, concept_protos)
return service_pb2.MultiOutputResponse(
outputs=outputs, status=status_pb2.Status(code=status_code_pb2.SUCCESS))
def generate(self, request: service_pb2.PostModelOutputsRequest
) -> Iterator[service_pb2.MultiOutputResponse]:
if len(request.inputs) != 1:
raise ValueError("Only one input is allowed for image models for this method.")
concept_protos = request.model.model_version.output_info.data.concepts
for input in request.inputs:
input_data = input.data
video_bytes = None
if input_data.video.base64:
video_bytes = input_data.video.base64
if video_bytes:
frame_generator = video_to_frames(video_bytes)
for frame in frame_generator:
image = preprocess_image(frame)
images = [image]
with torch.no_grad():
logits = classify_image(images, self.model, self.processor, self.device)
outputs = process_concepts(logits, images, concept_protos)
yield service_pb2.MultiOutputResponse(
outputs=outputs, status=status_pb2.Status(code=status_code_pb2.SUCCESS))
else:
raise ValueError("Only video input is allowed for this method.")
def stream(self, request_iterator: Iterator[service_pb2.PostModelOutputsRequest]
) -> Iterator[service_pb2.MultiOutputResponse]:
for request in request_iterator:
if request.inputs[0].data.video.base64:
for output in self.generate(request):
yield output
elif request.inputs[0].data.image.base64:
yield self.predict(request)
requirements.txt
- Text
torch==2.5.1
tokenizers>=0.21.0
transformers>=4.47.0
pillow==10.4.0
requests==2.32.3
timm==1.0.12
opencv-python-headless==4.10.0.84
numpy
aiohttp
clarifai
config.yaml
- YAML
# This is the sample config file for the image-classifier model.
model:
id: "model_id"
user_id: "user_id"
app_id: "app_id"
model_type_id: "visual-classifier"
build_info:
python_version: "3.11"
inference_compute_info:
cpu_limit: "2"
cpu_memory: "2Gi"
num_accelerators: 1
accelerator_type: ["NVIDIA-A10G"]
accelerator_memory: "3Gi"
checkpoints:
type: "huggingface"
repo_id: "Falconsai/nsfw_image_detection"
hf_token: "hf_token"
Image Detector
model.py
- Python
# Model to be uploaded: https://huggingface.co/facebook/detr-resnet-50
import os
import tempfile
from io import BytesIO
from typing import Iterator
import time
import cv2
import torch
from clarifai.runners.models.model_class import ModelClass
from clarifai.utils.logging import logger
from clarifai_grpc.grpc.api import resources_pb2, service_pb2
from clarifai_grpc.grpc.api.status import status_code_pb2, status_pb2
from PIL import Image
from transformers import DetrForObjectDetection, DetrImageProcessor
def preprocess_image(image_bytes):
"""Fetch and preprocess image data from bytes"""
return Image.open(BytesIO(image_bytes)).convert("RGB")
def video_to_frames(video_bytes):
"""Convert video bytes to frames"""
# Write video bytes to a temporary file
with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as temp_video_file:
temp_video_file.write(video_bytes)
temp_video_path = temp_video_file.name
logger.info(f"temp_video_path: {temp_video_path}")
video = cv2.VideoCapture(temp_video_path)
print("video opened")
logger.info(f"video opened: {video.isOpened()}")
while video.isOpened():
ret, frame = video.read()
if not ret:
break
# Convert the frame to byte format
frame_bytes = cv2.imencode('.jpg', frame)[1].tobytes()
yield frame_bytes
video.release()
def detect_objects(images, model, processor, device):
model_inputs = processor(images=images, return_tensors="pt").to(device)
model_inputs = {name: tensor.to(device) for name, tensor in model_inputs.items()}
model_output = model(**model_inputs)
results = processor.post_process_object_detection(model_output)
return results
def process_bounding_boxes(results, images, concept_protos, threshold, model_labels):
outputs = []
for i, result in enumerate(results):
image = images[i]
width, height = image.size
output_regions = []
for score, label_idx, box in zip(result["scores"], result["labels"], result["boxes"]):
if score > threshold:
xmin, ymin, xmax, ymax = box
output_region = resources_pb2.Region(region_info=resources_pb2.RegionInfo(
bounding_box=resources_pb2.BoundingBox(
top_row=ymin,
left_col=xmin,
bottom_row=ymax,
right_col=xmax,
),))
label = model_labels[label_idx.item()]
concept_protos[label_idx.item()].value = score.item()
concept_protos[label_idx.item()].name = label
output_region.data.concepts.append(concept_protos[label_idx.item()])
output_regions.append(output_region)
output = resources_pb2.Output()
output.data.regions.extend(output_regions)
output.status.code = status_code_pb2.SUCCESS
outputs.append(output)
return outputs
class MyModel(ModelClass):
"""A custom runner that adds "Hello World" to the end of the text and replaces the domain of the
image URL as an example.
"""
def load_model(self):
"""Load the model here."""
checkpoint_path = os.path.join(os.path.dirname(__file__), "checkpoints")
self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
logger.info(f"Running on device: {self.device}")
self.model = DetrForObjectDetection.from_pretrained(checkpoint_path,).to(self.device)
self.processor = DetrImageProcessor.from_pretrained(checkpoint_path,)
self.model.eval()
self.threshold = 0.9
self.model_labels = self.model.config.id2label
self.concept_protos = None
logger.info("Done loading!")
def predict(self, request: service_pb2.PostModelOutputsRequest) -> service_pb2.MultiOutputResponse:
"""This is the method that will be called when the runner is run. It takes in an input and
returns an output.
"""
outputs = []
images = []
if not self.concept_protos:
self.concept_protos = request.model.model_version.output_info.data.concepts
for input in request.inputs:
input_data = input.data
image_bytes = input_data.image.base64
image = preprocess_image(image_bytes=image_bytes)
images.append(image)
with torch.no_grad():
results = detect_objects(images, self.model, self.processor, self.device)
# convert outputs (bounding boxes and class logits) to COCO API
# let's only keep detections with a score above self.threshold (0.9 by default; you can set it to any other value)
outputs = process_bounding_boxes(results, images, self.concept_protos, self.threshold,
self.model_labels)
for oi, out in enumerate(outputs):
out.input.id = request.inputs[oi].id
return service_pb2.MultiOutputResponse(
outputs=outputs, status=status_pb2.Status(code=status_code_pb2.SUCCESS))
def generate(self, request: service_pb2.PostModelOutputsRequest
) -> Iterator[service_pb2.MultiOutputResponse]:
if len(request.inputs) != 1:
raise ValueError("Only one input is allowed for image models for this method.")
if not self.concept_protos:
self.concept_protos = request.model.model_version.output_info.data.concepts
for input in request.inputs:
input_data = input.data
video_bytes = None
if input_data.video.base64:
video_bytes = input_data.video.base64
if video_bytes:
frame_generator = video_to_frames(video_bytes)
for frame in frame_generator:
image = preprocess_image(frame)
images = [image]
with torch.no_grad():
results = detect_objects(images, self.model, self.processor, self.device)
outputs = process_bounding_boxes(results, images, self.concept_protos, self.threshold,
self.model_labels)
for out in outputs:
out.input.id = input.id
yield service_pb2.MultiOutputResponse(
outputs=outputs, status=status_pb2.Status(code=status_code_pb2.SUCCESS))
else:
raise ValueError("Only video input is allowed for this method.")
def stream(self, request_iterator: Iterator[service_pb2.PostModelOutputsRequest]
) -> Iterator[service_pb2.MultiOutputResponse]:
last_t = time.time()
for request in request_iterator:
if request.inputs[0].data.video.base64:
for output in self.generate(request):
yield output
elif request.inputs[0].data.image.base64:
yield self.predict(request)
duration = time.time() - last_t
logger.info(f"Time taken for one frame: {duration}")
last_t = time.time()
requirements.txt
- Text
torch==2.5.1
tokenizers>=0.21.0
transformers>=4.47.0
pillow==10.4.0
requests==2.32.3
timm==1.0.12
opencv-python-headless==4.10.0.84
numpy
aiohttp
clarifai
config.yaml
- YAML
# This is the sample config file for the image-detection model.
model:
id: "model_id"
user_id: "user_id"
app_id: "app_id"
model_type_id: "visual-detector"
build_info:
python_version: "3.11"
inference_compute_info:
cpu_limit: "4"
cpu_memory: "2Gi"
num_accelerators: 1
accelerator_type: ["NVIDIA-A10G"]
accelerator_memory: "5Gi"
checkpoints:
type: "huggingface"
repo_id: "facebook/detr-resnet-50"
hf_token: "hf_token"
Large Language Models (LLMs)
model.py
- Python
# Model to be uploaded: https://huggingface.co/casperhansen/llama-3-8b-instruct-awq
import os
from threading import Thread
from typing import Iterator, List, Optional
import torch
from clarifai.runners.models.model_class import ModelClass
from clarifai.utils.logging import logger
from clarifai_grpc.grpc.api import resources_pb2, service_pb2
from clarifai_grpc.grpc.api.status import status_code_pb2, status_pb2
from google.protobuf import json_format
from transformers import (AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer)
# Custom streamer for batched text generation
class BatchTextIteratorStreamer(TextIteratorStreamer):
"""A custom streamer that handles batched text generation."""
def __init__(self,
batch_size: int,
tokenizer: "AutoTokenizer",
skip_prompt: bool = False,
timeout: Optional[float] = None,
**decode_kwargs):
super().__init__(tokenizer, skip_prompt, timeout, **decode_kwargs)
self.batch_size = batch_size
self.token_cache = [[] for _ in range(batch_size)]
self.print_len = [0 for _ in range(batch_size)]
self.generate_exception = None
def put(self, value):
if len(value.shape) != 2:
value = torch.reshape(value, (self.batch_size, value.shape[0] // self.batch_size))
if self.skip_prompt and self.next_tokens_are_prompt:
self.next_tokens_are_prompt = False
return
printable_texts = list()
for idx in range(self.batch_size):
self.token_cache[idx].extend(value[idx].tolist())
text = self.tokenizer.decode(self.token_cache[idx], **self.decode_kwargs)
if text.endswith("\n"):
printable_text = text[self.print_len[idx]:]
self.token_cache[idx] = []
self.print_len[idx] = 0
# If the last token is a CJK character, we print the characters.
elif len(text) > 0 and self._is_chinese_char(ord(text[-1])):
printable_text = text[self.print_len[idx]:]
self.print_len[idx] += len(printable_text)
else:
printable_text = text[self.print_len[idx]:text.rfind(" ") + 1]
self.print_len[idx] += len(printable_text)
printable_texts.append(printable_text)
self.on_finalized_text(printable_texts)
def end(self):
printable_texts = list()
for idx in range(self.batch_size):
if len(self.token_cache[idx]) > 0:
text = self.tokenizer.decode(self.token_cache[idx], **self.decode_kwargs)
printable_text = text[self.print_len[idx]:]
self.token_cache[idx] = []
self.print_len[idx] = 0
else:
printable_text = ""
printable_texts.append(printable_text)
self.next_tokens_are_prompt = True
self.on_finalized_text(printable_texts, stream_end=True)
def on_finalized_text(self, texts: List[str], stream_end: bool = False):
self.text_queue.put(texts, timeout=self.timeout)
if stream_end:
self.text_queue.put(self.stop_signal, timeout=self.timeout)
# Helper function to create an output
def create_output(text="", code=status_code_pb2.SUCCESS):
return resources_pb2.Output(
data=resources_pb2.Data(text=resources_pb2.Text(raw=text)),
status=status_pb2.Status(code=code))
# Helper function to get the inference params
def get_inference_params(request) -> dict:
"""Get the inference params from the request."""
inference_params = {}
if request.model.model_version.id != "":
output_info = request.model.model_version.output_info
output_info = json_format.MessageToDict(output_info, preserving_proto_field_name=True)
if "params" in output_info:
inference_params = output_info["params"]
return inference_params
# Helper function to parse the inference params
def parse_inference_params(request):
default_params = {
"temperature": 0.7,
"max_tokens": 100,
"top_k": 50,
"top_p": 1.0,
"do_sample": True,
}
inference_params = get_inference_params(request)
return {
"temperature": inference_params.get("temperature", default_params["temperature"]),
"max_tokens": int(inference_params.get("max_tokens", default_params["max_tokens"])),
"top_k": int(inference_params.get("top_k", default_params["top_k"])),
"top_p": inference_params.get("top_p", default_params["top_p"]),
"do_sample": inference_params.get("do_sample", default_params["do_sample"]),
}
class MyModel(ModelClass):
"""A custom runner that loads the model and generates text using batched inference."""
def load_model(self):
"""Load the model here."""
self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
logger.info(f"Running on device: {self.device}")
# Load model and tokenizer
# if checkpoints section is in config.yaml file then checkpoints will be downloaded at this path during model upload time.
checkpoints = os.path.join(os.path.dirname(__file__), "checkpoints")
self.tokenizer = AutoTokenizer.from_pretrained(checkpoints,)
self.tokenizer.pad_token = self.tokenizer.eos_token
self.model = AutoModelForCausalLM.from_pretrained(
checkpoints,
low_cpu_mem_usage=True,
device_map=self.device,
torch_dtype=torch.float16,
)
logger.info("Done loading!")
def predict(self,
request: service_pb2.PostModelOutputsRequest) -> service_pb2.MultiOutputResponse:
"""This method generates outputs text for the given inputs using the model."""
inference_params = parse_inference_params(request)
prompts = [inp.data.text.raw for inp in request.inputs]
inputs = self.tokenizer(prompts, return_tensors="pt", padding=True).to(self.device)
output_tokens = self.model.generate(
**inputs,
max_new_tokens=inference_params["max_tokens"],
do_sample=inference_params["do_sample"],
temperature=inference_params["temperature"],
top_k=inference_params["top_k"],
top_p=inference_params["top_p"],
eos_token_id=self.tokenizer.eos_token_id,
)
outputs_text = self.tokenizer.batch_decode(
output_tokens[:, inputs['input_ids'].shape[1]:], skip_special_tokens=True)
outputs = []
for text in outputs_text:
outputs.append(create_output(text=text, code=status_code_pb2.SUCCESS))
return service_pb2.MultiOutputResponse(
outputs=outputs, status=status_pb2.Status(code=status_code_pb2.SUCCESS))
def generate(self, request: service_pb2.PostModelOutputsRequest
) -> Iterator[service_pb2.MultiOutputResponse]:
"""This method generates stream of outputs for the given batch of inputs using the model."""
inference_params = parse_inference_params(request)
prompts = [inp.data.text.raw for inp in request.inputs]
batch_size = len(prompts)
# Initialize the custom streamer
streamer = BatchTextIteratorStreamer(
batch_size=batch_size,
tokenizer=self.tokenizer,
skip_prompt=True,
decode_kwargs={
"skip_special_tokens": True
})
# Tokenize the inputs
inputs = self.tokenizer(prompts, return_tensors="pt", padding=True).to(self.device)
generation_kwargs = {
"input_ids": inputs.input_ids,
"attention_mask": inputs.attention_mask,
"max_new_tokens": inference_params["max_tokens"],
"do_sample": inference_params["do_sample"],
"temperature": inference_params["temperature"],
"top_k": inference_params["top_k"],
"top_p": inference_params["top_p"],
"eos_token_id": self.tokenizer.eos_token_id,
"streamer": streamer,
}
# Start generation in a separate thread
thread = Thread(target=self.model.generate, kwargs=generation_kwargs)
thread.start()
# Initialize outputs
outputs = [create_output() for _ in range(batch_size)]
try:
for streamed_texts in streamer: # Iterate over new texts generated
for idx, text in enumerate(streamed_texts): # Iterate over each batch
outputs[idx].data.text.raw = text # Append new text to each output
outputs[idx].status.code = status_code_pb2.SUCCESS
# Yield the current outputs
yield service_pb2.MultiOutputResponse(
outputs=outputs, status=status_pb2.Status(code=status_code_pb2.SUCCESS))
finally:
thread.join()
def stream(self, request_iterator: Iterator[service_pb2.PostModelOutputsRequest]
) -> Iterator[service_pb2.MultiOutputResponse]:
raise NotImplementedError("Stream method is not implemented for this model.")
requirements.txt
- Text
torch==2.5.1
tokenizers>=0.21.0
transformers>=4.47.0
accelerate>=1.2.0
scipy==1.10.1
optimum>=1.23.3
xformers==0.0.28.post3
protobuf==5.27.3
einops>=0.8.0
requests==2.32.2
autoawq==0.2.7.post3
clarifai
config.yaml
- YAML
# This is the sample config file for the llama model.
model:
id: "llama-3-8b-instruct"
user_id: "user_id"
app_id: "app_id"
model_type_id: "text-to-text"
build_info:
python_version: "3.11"
inference_compute_info:
cpu_limit: "1"
cpu_memory: "8Gi"
num_accelerators: 1
accelerator_type: ["NVIDIA-A10G"]
accelerator_memory: "12Gi"
checkpoints:
type: "huggingface"
repo_id: "casperhansen/llama-3-8b-instruct-awq"
hf_token: "hf_token"
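Once this LLM example is uploaded, you could query it with a text prompt. This is a sketch rather than the definitive client code; it assumes predict_by_bytes accepts input_type and inference_params arguments (the parameter names mirror those handled in parse_inference_params above) and that the IDs match your config.yaml.
- Python
from clarifai.client.model import Model

model = Model(model_id="llama-3-8b-instruct", user_id="user_id", app_id="app_id")

# The keys below correspond to the defaults handled in parse_inference_params()
model_prediction = model.predict_by_bytes(
    b"What is the future of AI?",
    input_type="text",
    inference_params={"temperature": 0.7, "max_tokens": 100},
)
print(model_prediction.outputs[0].data.text.raw)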
You can refer to the examples repository mentioned above for additional examples of uploading other large language models (LLMs).
Speech Recognition Model
model.py
- Python
# Model to be uploaded: https://platform.openai.com/docs/guides/speech-to-text/quickstart
import io
import itertools
import wave
from typing import Iterator
from clarifai.runners.models.model_class import ModelClass
from clarifai_grpc.grpc.api import resources_pb2, service_pb2
from clarifai_grpc.grpc.api.status import status_code_pb2
from google.protobuf import json_format
from openai import OpenAI
OPENAI_API_KEY = "OPENAI_API_KEY"  # Replace this placeholder with your actual OpenAI API key
def bytes_to_audio_file(audio_bytes):
"""Convert bytes data into a file-like object."""
if not audio_bytes:
raise ValueError("Audio bytes cannot be empty.")
audio_file = io.BytesIO(audio_bytes)
audio_file.name = "audio.wav" # This name is used for the API
return audio_file
def preprocess_audio(audio_bytes=None, chunk_duration=1.0, stream=False):
"""
Fetch and preprocess audio data from bytes.
Parameters:
audio_bytes (bytes): Audio data in bytes.
chunk_duration (float): Duration of each audio chunk in seconds.
stream (bool): Whether to stream the audio in chunks.
Returns:
Audio data in bytes or a generator of audio chunks.
"""
if audio_bytes:
if stream:
# Read the original audio bytes
audio_bytes = io.BytesIO(audio_bytes)
with wave.open(audio_bytes, "rb") as wave_file:
params = wave_file.getparams()
sample_rate = params.framerate
channels = params.nchannels
sample_width = params.sampwidth
# Calculate number of frames per chunk
frames_per_chunk = int(sample_rate * chunk_duration)
# Stream the audio in chunks (generator)
def audio_stream_generator():
while True:
frames = wave_file.readframes(frames_per_chunk)
if not frames:
break
chunk_buffer = io.BytesIO()
with wave.open(chunk_buffer, "wb") as chunk_wav:
chunk_wav.setnchannels(channels)
chunk_wav.setsampwidth(sample_width)
chunk_wav.setframerate(sample_rate)
chunk_wav.writeframes(frames)
yield chunk_buffer.getvalue()
return audio_stream_generator()
else:
# Return a single chunk of audio
return audio_bytes
else:
raise ValueError("'audio_bytes' must be provided")
def get_inference_params(request) -> dict:
"""Get the inference params from the request."""
inference_params = {}
if request.model.model_version.id != "":
output_info = request.model.model_version.output_info
output_info = json_format.MessageToDict(output_info, preserving_proto_field_name=True)
if "params" in output_info:
inference_params = output_info["params"]
return inference_params
class MyModel(ModelClass):
"""A custom runner that used for transcribing audio."""
def load_model(self):
"""Load the model here."""
self.client = OpenAI(api_key=OPENAI_API_KEY)
self.model = "whisper-1"
def predict(self,
request: service_pb2.PostModelOutputsRequest) -> service_pb2.MultiOutputResponse:
"""Predict the output for the given audio data."""
inference_params = get_inference_params(request)
language = inference_params.get("language", None)
task = inference_params.get("task", "transcription")
outputs = []
# TODO: parallelize this over inputs in a single request.
for input in request.inputs:
output = resources_pb2.Output()
input_data = input.data
audio_bytes = preprocess_audio(audio_bytes=input_data.audio.base64, stream=False)
if task == "transcription":
# Send audio bytes to Whisper for transcription
whisper_output = self.client.audio.transcriptions.create(
model=self.model, language=language, file=bytes_to_audio_file(audio_bytes))
elif task == "translation":
# Send audio bytes to Whisper for translation
whisper_output = self.client.audio.translations.create(
model=self.model, file=bytes_to_audio_file(audio_bytes))
# Set the output data
output.data.text.raw = whisper_output.text
output.status.code = status_code_pb2.SUCCESS
outputs.append(output)
return service_pb2.MultiOutputResponse(outputs=outputs,)
def generate(self, request: service_pb2.PostModelOutputsRequest
) -> Iterator[service_pb2.MultiOutputResponse]:
"""Generate the output in a streaming fashion for large audio files."""
inference_params = get_inference_params(request)
language = inference_params.get("language", None)
task = inference_params.get("task", "transcription")
batch_audio_streams = []
for input in request.inputs:
output = resources_pb2.Output()
input_data = input.data
audio_bytes = input_data.audio.base64
chunk_duration = 3.0
audio_stream = preprocess_audio(
audio_bytes=audio_bytes, stream=True, chunk_duration=chunk_duration)
batch_audio_streams.append(audio_stream)
for audio_stream in itertools.zip_longest(*batch_audio_streams, fillvalue=None):
resp = service_pb2.MultiOutputResponse()
for audio_bytes in audio_stream:
output = resp.outputs.add()
if task == "transcription":
# Send audio bytes to Whisper for transcription
whisper_output = self.client.audio.transcriptions.create(
model=self.model, language=language, file=bytes_to_audio_file(audio_bytes))
elif task == "translation":
# Send audio bytes to Whisper for translation
whisper_output = self.client.audio.translations.create(
model=self.model, file=bytes_to_audio_file(audio_bytes))
output.data.text.raw = whisper_output.text
output.status.code = status_code_pb2.SUCCESS
yield resp
def stream(self, request_iterator: Iterator[service_pb2.PostModelOutputsRequest]
) -> Iterator[service_pb2.MultiOutputResponse]:
"""Stream the output in a streaming fashion"""
for request in request_iterator:
for response in self.generate(request):
yield response
requirements.txt
- Text
openai==1.55.3
requests
clarifai
config.yaml
- YAML
# This is the sample config file for the OpenAI Whisper model.
model:
id: "model_id"
user_id: "user_id"
app_id: "app_id"
model_type_id: "audio-to-text"
build_info:
python_version: "3.12"
inference_compute_info:
cpu_limit: "1"
cpu_memory: "500m"
num_accelerators: 0
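To try this speech recognition model after uploading it, you could send audio bytes and read back the transcribed text. This is a minimal sketch, assuming predict_by_bytes supports input_type="audio" and inference_params matching the language and task keys read in the model code; the WAV file path is a placeholder.
- Python
from clarifai.client.model import Model

model = Model(model_id="model_id", user_id="user_id", app_id="app_id")

with open("sample.wav", "rb") as f:  # hypothetical local audio file
    audio_bytes = f.read()

model_prediction = model.predict_by_bytes(
    audio_bytes,
    input_type="audio",
    inference_params={"task": "transcription"},
)
print(model_prediction.outputs[0].data.text.raw)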