
Model Upload Examples

Learn how to upload and customize your own models on the Clarifai platform


This section provides examples that guide you through uploading custom models to Clarifai. You’ll learn how to adapt pre-built examples, configure model settings, and prepare your project for deployment on the platform.

tip

All the examples here are available in the Clarifai Runners Examples GitHub repository. You can use the Clarifai CLI to download a model template. For example:

clarifai model init --github-url https://github.com/Clarifai/runners-examples/tree/main/local-runners/ollama-model-upload

To use these examples, create a project directory and organize your files as shown below. This structure is required for successfully uploading models to the Clarifai platform:

your_model_directory/
├── 1/
│ └── model.py
├── requirements.txt
└── config.yaml
  • your_model_directory/ – Root directory containing all files related to your custom model.
    • 1/ – A subdirectory (named 1) that contains the main model file.
      • model.py – Defines your model logic, including loading the model and handling inference.
    • requirements.txt – Lists all Python dependencies required to run your model.
    • config.yaml – Includes metadata and configuration details for building the model, such as compute resources and environment settings.
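Once these files are in place and the CLI is authenticated (for example, via clarifai login), you can push the model to the platform. A minimal sketch, assuming the CLI accepts the model directory path as an argument (you can also run the command from inside the directory):

clarifai model upload your_model_directory/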

Hello World

1/model.py
from clarifai.runners.models.model_class import ModelClass
from clarifai.runners.utils.data_utils import Param
from typing import Iterator
import random
import string

class MyModel(ModelClass):
    """This is a model that does some string manipulation."""

    def load_model(self):
        """Nothing to load for this model."""

    @ModelClass.method
    def predict(self, prompt: str, number_of_letters: int = Param(default=3, description="number of letters to add")) -> str:
        """Append some random letters to the prompt and return the result."""
        return new_str(prompt, number_of_letters)

    @ModelClass.method
    def generate(self, prompt: str = "", number_of_letters: int = Param(default=3, description="number of letters to add")) -> Iterator[str]:
        """Example that streams a batch of generated strings back."""
        for i in range(10):  # Simulate an iterative process that generates 10 outputs.
            yield new_str(str(i) + "-" + prompt, number_of_letters)

    @ModelClass.method
    def s(self, input_iterator: Iterator[str], number_of_letters: int = Param(default=3, description="number of letters to add")) -> Iterator[str]:
        """Example that takes an iterator of inputs and yields back results."""
        for i, inp in enumerate(input_iterator):
            yield new_str(inp, number_of_letters)


def new_str(input_str: str, number_of_letters: int = 3) -> str:
    """Append a dash and random letters to the input string."""
    random_letters = ''.join(random.choices(string.ascii_letters, k=number_of_letters))
    return f"{input_str}-{random_letters}\n"


def test_predict() -> None:
    """Test the predict method of MyModel by printing its output."""
    model = MyModel()
    model.load_model()
    print("Testing predict method:")
    output = model.predict("TestPredict", number_of_letters=5)
    print(output, end="\n")

def test_generate() -> None:
    """Test the generate method of MyModel by printing its outputs."""
    model = MyModel()
    model.load_model()
    print("Testing generate method:")
    for output in model.generate("Test", number_of_letters=5):
        print(output, end="\n")

if __name__ == "__main__":
    test_predict()
    test_generate()
requirements.txt
clarifai
config.yaml
model:
  id: "my-model-id"
  user_id: "my-user-id"
  app_id: "my-app-id"
  model_type_id: "text-to-text"

build_info:
  python_version: "3.12"

inference_compute_info:
  cpu_limit: 50m
  cpu_memory: 250Mi
  num_accelerators: 0
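You can exercise the built-in test functions locally by running python 1/model.py from the model directory. After the model is uploaded, the sketch below shows one way to call it from the Clarifai Python SDK; it assumes the SDK's pythonic method-calling interface and uses placeholder IDs and a placeholder PAT:

from clarifai.client import Model

# Placeholder URL and PAT -- substitute your own user, app, and model IDs plus your token.
model = Model(
    url="https://clarifai.com/my-user-id/my-app-id/models/my-model-id",
    pat="YOUR_PAT",
)

# Unary call to the predict method defined in 1/model.py.
print(model.predict(prompt="Hello", number_of_letters=5))

# Streaming call to the generate method.
for chunk in model.generate(prompt="Hello", number_of_letters=5):
    print(chunk, end="")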

NSFW Image Classifier

1/model.py
import os
from typing import List, Iterator

# Third-party imports
import cv2
import torch
from PIL import Image as PILImage
from transformers import AutoModelForImageClassification, ViTImageProcessor

# Clarifai imports
from clarifai.runners.models.visual_classifier_class import VisualClassifierClass
from clarifai.runners.utils.data_types import Concept, Image, Video
from clarifai.runners.models.model_builder import ModelBuilder
from clarifai.utils.logging import logger


class ImageClassifierModel(VisualClassifierClass):
"""A custom runner that classifies images and outputs concepts."""

def load_model(self):
"""Load the model and processor."""

model_path = os.path.dirname(os.path.dirname(__file__))
builder = ModelBuilder(model_path, download_validation_only=True)
checkpoints = builder.download_checkpoints(stage="runtime")

self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
logger.info(f"Running on device: {self.device}")

self.model = AutoModelForImageClassification.from_pretrained(checkpoints,).to(self.device)
self.model_labels = self.model.config.id2label
self.processor = ViTImageProcessor.from_pretrained(checkpoints)

logger.info("Done loading!")

@VisualClassifierClass.method
def predict(self, image: Image) -> List[Concept]:
"""Predict concepts for a list of images."""
pil_image = VisualClassifierClass.preprocess_image(image.bytes)
inputs = self.processor(images=pil_image, return_tensors="pt")
inputs = {name: tensor.to(self.device) for name, tensor in inputs.items()}
with torch.no_grad():
logits = self.model(**inputs).logits
outputs = VisualClassifierClass.process_concepts(logits, self.model_labels)
return outputs[0]

@VisualClassifierClass.method
def generate(self, video: Video) -> Iterator[List[Concept]]:
"""Generate concepts for frames extracted from a video."""
video_bytes = video.bytes
frame_generator = VisualClassifierClass.video_to_frames(video_bytes)
for frame in frame_generator:
image = VisualClassifierClass.preprocess_image(frame.image.bytes)
inputs = self.processor(images=image, return_tensors="pt")
inputs = {name: tensor.to(self.device) for name, tensor in inputs.items()}
with torch.no_grad():
logits = self.model(**inputs).logits
outputs = VisualClassifierClass.process_concepts(logits, self.model_labels) # Yield concepts for each frame
yield outputs[0]

@VisualClassifierClass.method
def stream_image(self, image_stream: Iterator[Image]) -> Iterator[List[Concept]]:
"""Stream process image inputs."""
for image in image_stream:
result = self.predict(image)
yield result

@VisualClassifierClass.method
def stream_video(self, video_stream: Iterator[Video]) -> Iterator[List[Concept]]:
"""Stream process video inputs."""
for video in video_stream:
for frame_result in self.generate(video):
yield frame_result

def test(self):
"""Test the model functionality."""
import requests # Import moved here as it's only used for testing

# Test configuration
TEST_URLS = {
"images": [
"https://samples.clarifai.com/metro-north.jpg",
"https://samples.clarifai.com/dog.tiff"
],
"video": "https://samples.clarifai.com/beer.mp4"
}

def get_test_data(url):
return Image(bytes=requests.get(url).content)

def get_test_video():
return Video(bytes=requests.get(TEST_URLS["video"]).content)

def run_test(name, test_fn):
logger.info(f"\nTesting {name}...")
try:
test_fn()
logger.info(f"{name} test completed successfully")
except Exception as e:
logger.error(f"Error in {name} test: {e}")

# Test predict
def test_predict():
result = self.predict(get_test_data(TEST_URLS["images"][0]))
logger.info(f"Predict result: {result}")

# Test generate
def test_generate():
for classifications in self.generate(get_test_video()):
logger.info(f"First frame classifications: {classifications}")
break

# Test stream
def test_stream():
# Split into two separate test functions for clarity
def test_stream_image():
images = [get_test_data(url) for url in TEST_URLS["images"]]
for result in self.stream_image(iter(images)):
logger.info(f"Image stream result: {result}")

def test_stream_video():
for result in self.stream_video(iter([get_test_video()])):
logger.info(f"Video stream result: {result}")
break # Just test first frame

logger.info("\nTesting image streaming...")
test_stream_image()
logger.info("\nTesting video streaming...")
test_stream_video()

# Run all tests
for test_name, test_fn in [
("predict", test_predict),
("generate", test_generate),
("stream", test_stream)
]:
run_test(test_name, test_fn)
requirements.txt
torch==2.6.0
transformers>=4.51.1
pillow==10.4.0
requests==2.32.3
timm==1.0.12
opencv-python-headless==4.10.0.84
clarifai>=11.5.0,<12.0.0
config.yaml
# This is the sample config file for the image-classification model.

model:
  id: "nsfw_image_detection"
  user_id: "user_id"
  app_id: "app_id"
  model_type_id: "visual-classifier"

build_info:
  python_version: "3.11"

inference_compute_info:
  cpu_limit: "2"
  cpu_memory: "2Gi"
  num_accelerators: 1
  accelerator_type: ["NVIDIA-*"]
  accelerator_memory: "3Gi"

checkpoints:
  type: "huggingface"
  repo_id: "Falconsai/nsfw_image_detection"

DETR ResNet Image Detector

1/model.py
# Standard library imports
import os
from typing import List, Dict, Any, Iterator

# Third-party imports
import cv2
import torch
from PIL import Image as PILImage
from transformers import DetrForObjectDetection, DetrImageProcessor

# Clarifai imports
from clarifai.runners.models.model_builder import ModelBuilder
from clarifai.runners.models.visual_detector_class import VisualDetectorClass
from clarifai.runners.utils.data_types import Image, Video, Region, Frame
from clarifai.utils.logging import logger


def detect_objects(
images: List[PILImage],
model: DetrForObjectDetection,
processor: DetrImageProcessor,
device: str
) -> Dict[str, Any]:
"""Process images through the DETR model to detect objects.

Args:
images: List of preprocessed images
model: DETR model instance
processor: Image processor for DETR
device: Computation device (CPU/GPU)

Returns:
Detection results from the model
"""
model_inputs = processor(images=images, return_tensors="pt").to(device)
model_inputs = {name: tensor.to(device) for name, tensor in model_inputs.items()}
model_output = model(**model_inputs)
results = processor.post_process_object_detection(model_output)
return results


class MyRunner(VisualDetectorClass):
"""A custom runner for DETR object detection model that processes images and videos"""

def load_model(self):
"""Load the model here."""
model_path = os.path.dirname(os.path.dirname(__file__))
builder = ModelBuilder(model_path, download_validation_only=True)
checkpoint_path = builder.download_checkpoints(stage="runtime")

self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
logger.info(f"Running on device: {self.device}")

self.model = DetrForObjectDetection.from_pretrained(checkpoint_path).to(self.device)
self.processor = DetrImageProcessor.from_pretrained(checkpoint_path)
self.model.eval()
self.threshold = 0.9
self.model_labels = self.model.config.id2label

logger.info("Done loading!")

@VisualDetectorClass.method
def predict(self, image: Image) -> List[Region]:
"""Process a single image and return detected objects."""
image_bytes = image.bytes
image = VisualDetectorClass.preprocess_image(image_bytes)

with torch.no_grad():
results = detect_objects([image], self.model, self.processor, self.device)
outputs = VisualDetectorClass.process_detections(results, self.threshold, self.model_labels)
return outputs[0] # Return detections for single image

@VisualDetectorClass.method
def generate(self, video: Video) -> Iterator[Frame]:
"""Process video frames and yield detected objects for each frame."""
frame_generator = VisualDetectorClass.video_to_frames(video.bytes)
for frame in frame_generator:
with torch.no_grad():
image = VisualDetectorClass.preprocess_image(frame.image.bytes)
results = detect_objects([image], self.model, self.processor, self.device)
outputs = VisualDetectorClass.process_detections(results, self.threshold, self.model_labels)
frame.regions = outputs[0] # Assign detections to the frame
yield frame # Yield the frame with detections

@VisualDetectorClass.method
def stream_image(self, image_stream: Iterator[Image]) -> Iterator[List[Region]]:
"""Stream process image inputs."""
for image in image_stream:
result = self.predict(image)
yield result

@VisualDetectorClass.method
def stream_video(self, video_stream: Iterator[Video]) -> Iterator[Frame]:
"""Stream process video inputs."""
for video in video_stream:
for frame_result in self.generate(video):
yield frame_result

def test(self):
"""Test the model functionality."""
import requests # Import moved here as it's only used for testing

# Test configuration
TEST_URLS = {
"images": [
"https://samples.clarifai.com/metro-north.jpg",
"https://samples.clarifai.com/dog.tiff"
],
"video": "https://samples.clarifai.com/beer.mp4"
}

def get_test_data(url):
return Image(bytes=requests.get(url).content)

def get_test_video():
return Video(bytes=requests.get(TEST_URLS["video"]).content)

def run_test(name, test_fn):
logger.info(f"\nTesting {name}...")
try:
test_fn()
logger.info(f"{name} test completed successfully")
except Exception as e:
logger.error(f"Error in {name} test: {e}")

# Test predict
def test_predict():
result = self.predict(get_test_data(TEST_URLS["images"][0]))
logger.info(f"Predict result: {result}")

# Test generate
def test_generate():
for detections in self.generate(get_test_video()):
logger.info(f"First frame detections: {detections}")
break

# Test stream
def test_stream():
# Split into two separate test functions for clarity
def test_stream_image():
images = [get_test_data(url) for url in TEST_URLS["images"]]
for result in self.stream_image(iter(images)):
logger.info(f"Image stream result: {result}")

def test_stream_video():
for result in self.stream_video(iter([get_test_video()])):
logger.info(f"Video stream result: {result}")
break # Just test first frame

logger.info("\nTesting image streaming...")
test_stream_image()
logger.info("\nTesting video streaming...")
test_stream_video()

# Run all tests
for test_name, test_fn in [
("predict", test_predict),
("generate", test_generate),
("stream", test_stream)
]:
run_test(test_name, test_fn)
requirements.txt
torch==2.6.0
transformers>=4.51.1
pillow==10.4.0
requests==2.32.3
timm==1.0.12
opencv-python-headless==4.10.0.84
clarifai>=11.4.10,<12.0.0
config.yaml
# This is the sample config file for the image-detection model.

model:
  id: "detr-resnet-50"
  user_id: "user_id"
  app_id: "app_id"
  model_type_id: "visual-detector"

build_info:
  python_version: "3.11"

inference_compute_info:
  cpu_limit: "4"
  cpu_memory: "2Gi"
  num_accelerators: 1
  accelerator_type: ["NVIDIA-*"]
  accelerator_memory: "5Gi"

checkpoints:
  type: "huggingface"
  repo_id: "facebook/detr-resnet-50"
  hf_token: "hf_token"

Image Segmenter

Mask2Former ADE

1/model.py
import os

from typing import Iterator, List, Tuple

from clarifai.runners.models.visual_detector_class import VisualDetectorClass
from clarifai.runners.utils import data_types as dt
from clarifai.utils.logging import logger

import yaml
import torch
from PIL import Image as PILImage
from transformers import AutoImageProcessor, Mask2FormerForUniversalSegmentation


ROOT = os.path.dirname(__file__)


class MyRunner(VisualDetectorClass):

def _load_concepts(self, config_path, name, model_path):

with open(config_path, "r") as f:
data = yaml.safe_load(f)

# Map Clarifai concept name to id and reverse
self.conceptid2name = {each["id"] : each["name"] for each in data.get("concepts", [])}
self.conceptname2id = {each["name"] : each["id"] for each in data.get("concepts", [])}

def load_model(self):
"""Load the model here."""
checkpoint_path = "facebook/mask2former-swin-tiny-ade-semantic"
self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
logger.info(f"Running on device: {self.device}")
self.model = Mask2FormerForUniversalSegmentation.from_pretrained(
checkpoint_path, trust_remote_code=True).to(self.device)
self.processor = AutoImageProcessor.from_pretrained(checkpoint_path, trust_remote_code=True)
self.model.eval()
# Load clarifai concept
config_path = os.path.join(ROOT, "../config.yaml")
self._load_concepts(config_path, "mask2former-ade", checkpoint_path)

logger.info("Done loading!")


@VisualDetectorClass.method
def model_predict(
self,
images: List[PILImage.Image]
) -> List[List[dt.Region]]:

inputs = self.processor(images=images, return_tensors="pt").to(self.device)
with torch.no_grad():
outputs = self.model(**inputs)
target_sizes = [image.size[::-1] for image in images]
results = self.processor.post_process_semantic_segmentation(outputs, target_sizes=target_sizes)
outputs = []
for i, all_masks_tensor in enumerate(results):
masks = []
for clss_id in all_masks_tensor.unique().tolist():
label = self.model.config.id2label[clss_id]
mask = torch.zeros_like(all_masks_tensor)
mask[all_masks_tensor == clss_id] = 255
mask = mask.cpu().numpy() if self.device == "cuda" else mask.numpy()
mask = PILImage.fromarray(mask.astype("uint8"))
region = dt.Region(
mask=mask,
concepts=[dt.Concept(id=self.conceptname2id[label], name=label)]
)
masks.append(region)
outputs.append(masks)

return outputs

@VisualDetectorClass.method
def predict(self, image: dt.Image) -> List[dt.Region]:
return self.model_predict([image.to_pil()])[0]

@VisualDetectorClass.method
def stream(self, images: Iterator[dt.Image]) -> Iterator[List[dt.Region]]:
for each in images:
yield self.predict(image=each)

@VisualDetectorClass.method
def generate(self, video: dt.Video) -> Iterator[List[dt.Region]]:
for frame in self.video_to_frames(video.bytes):
yield self.predict(image=frame.image)

def test(self):
import requests

image = dt.Image(bytes=requests.get("https://samples.clarifai.com/metro-north.jpg").content)
video = dt.Video(bytes=requests.get("https://samples.clarifai.com/beer.mp4").content)

logger.info("# -------- Test predict/detect -------------")
logger.info(f"{self.predict(image=image)}")
logger.info("# -------- Test generate -------------")
n=5
for i, each in enumerate(self.generate(video=video)):
print(each)
if i > n:
break

logger.info("# -------- Test stream -------------")

def iteration():
for each in [image]*10:
yield each

for i, each in enumerate(self.stream(images=iteration())):
print(each)
if i > n:
break
requirements.txt
torch==2.6.0
tokenizers>=0.19.1
transformers>=4.44.2
pillow>=10.4.0
requests>=2.32.3
timm
opencv-python-headless==4.10.0.84
numpy
aiohttp
scipy>=1.10.1
clarifai
config.yaml
model:
  id: facebook_mask2former-swin-tiny-ade-semantic
  user_id: a
  app_id: b
  model_type_id: visual-segmenter
build_info:
  python_version: '3.12'
inference_compute_info:
  cpu_limit: '2'
  cpu_memory: 8Gi
  num_accelerators: 1
  accelerator_type:
    - NVIDIA-*
  accelerator_memory: 21Gi

concepts:
- id: id-mask2former-ade-0
name: wall
- id: id-mask2former-ade-1
name: building
- id: id-mask2former-ade-2
name: sky
- id: id-mask2former-ade-3
name: floor
- id: id-mask2former-ade-4
name: tree
- id: id-mask2former-ade-5
name: ceiling
- id: id-mask2former-ade-6
name: road
- id: id-mask2former-ade-7
name: 'bed '
- id: id-mask2former-ade-8
name: windowpane
- id: id-mask2former-ade-9
name: grass
- id: id-mask2former-ade-10
name: cabinet
- id: id-mask2former-ade-11
name: sidewalk
- id: id-mask2former-ade-12
name: person
- id: id-mask2former-ade-13
name: earth
- id: id-mask2former-ade-14
name: door
- id: id-mask2former-ade-15
name: table
- id: id-mask2former-ade-16
name: mountain
- id: id-mask2former-ade-17
name: plant
- id: id-mask2former-ade-18
name: curtain
- id: id-mask2former-ade-19
name: chair
- id: id-mask2former-ade-20
name: car
- id: id-mask2former-ade-21
name: water
- id: id-mask2former-ade-22
name: painting
- id: id-mask2former-ade-23
name: sofa
- id: id-mask2former-ade-24
name: shelf
- id: id-mask2former-ade-25
name: house
- id: id-mask2former-ade-26
name: sea
- id: id-mask2former-ade-27
name: mirror
- id: id-mask2former-ade-28
name: rug
- id: id-mask2former-ade-29
name: field
- id: id-mask2former-ade-30
name: armchair
- id: id-mask2former-ade-31
name: seat
- id: id-mask2former-ade-32
name: fence
- id: id-mask2former-ade-33
name: desk
- id: id-mask2former-ade-34
name: rock
- id: id-mask2former-ade-35
name: wardrobe
- id: id-mask2former-ade-36
name: lamp
- id: id-mask2former-ade-37
name: bathtub
- id: id-mask2former-ade-38
name: railing
- id: id-mask2former-ade-39
name: cushion
- id: id-mask2former-ade-40
name: base
- id: id-mask2former-ade-41
name: box
- id: id-mask2former-ade-42
name: column
- id: id-mask2former-ade-43
name: signboard
- id: id-mask2former-ade-44
name: chest of drawers
- id: id-mask2former-ade-45
name: counter
- id: id-mask2former-ade-46
name: sand
- id: id-mask2former-ade-47
name: sink
- id: id-mask2former-ade-48
name: skyscraper
- id: id-mask2former-ade-49
name: fireplace
- id: id-mask2former-ade-50
name: refrigerator
- id: id-mask2former-ade-51
name: grandstand
- id: id-mask2former-ade-52
name: path
- id: id-mask2former-ade-53
name: stairs
- id: id-mask2former-ade-54
name: runway
- id: id-mask2former-ade-55
name: case
- id: id-mask2former-ade-56
name: pool table
- id: id-mask2former-ade-57
name: pillow
- id: id-mask2former-ade-58
name: screen door
- id: id-mask2former-ade-59
name: stairway
- id: id-mask2former-ade-60
name: river
- id: id-mask2former-ade-61
name: bridge
- id: id-mask2former-ade-62
name: bookcase
- id: id-mask2former-ade-63
name: blind
- id: id-mask2former-ade-64
name: coffee table
- id: id-mask2former-ade-65
name: toilet
- id: id-mask2former-ade-66
name: flower
- id: id-mask2former-ade-67
name: book
- id: id-mask2former-ade-68
name: hill
- id: id-mask2former-ade-69
name: bench
- id: id-mask2former-ade-70
name: countertop
- id: id-mask2former-ade-71
name: stove
- id: id-mask2former-ade-72
name: palm
- id: id-mask2former-ade-73
name: kitchen island
- id: id-mask2former-ade-74
name: computer
- id: id-mask2former-ade-75
name: swivel chair
- id: id-mask2former-ade-76
name: boat
- id: id-mask2former-ade-77
name: bar
- id: id-mask2former-ade-78
name: arcade machine
- id: id-mask2former-ade-79
name: hovel
- id: id-mask2former-ade-80
name: bus
- id: id-mask2former-ade-81
name: towel
- id: id-mask2former-ade-82
name: light
- id: id-mask2former-ade-83
name: truck
- id: id-mask2former-ade-84
name: tower
- id: id-mask2former-ade-85
name: chandelier
- id: id-mask2former-ade-86
name: awning
- id: id-mask2former-ade-87
name: streetlight
- id: id-mask2former-ade-88
name: booth
- id: id-mask2former-ade-89
name: television receiver
- id: id-mask2former-ade-90
name: airplane
- id: id-mask2former-ade-91
name: dirt track
- id: id-mask2former-ade-92
name: apparel
- id: id-mask2former-ade-93
name: pole
- id: id-mask2former-ade-94
name: land
- id: id-mask2former-ade-95
name: bannister
- id: id-mask2former-ade-96
name: escalator
- id: id-mask2former-ade-97
name: ottoman
- id: id-mask2former-ade-98
name: bottle
- id: id-mask2former-ade-99
name: buffet
- id: id-mask2former-ade-100
name: poster
- id: id-mask2former-ade-101
name: stage
- id: id-mask2former-ade-102
name: van
- id: id-mask2former-ade-103
name: ship
- id: id-mask2former-ade-104
name: fountain
- id: id-mask2former-ade-105
name: conveyer belt
- id: id-mask2former-ade-106
name: canopy
- id: id-mask2former-ade-107
name: washer
- id: id-mask2former-ade-108
name: plaything
- id: id-mask2former-ade-109
name: swimming pool
- id: id-mask2former-ade-110
name: stool
- id: id-mask2former-ade-111
name: barrel
- id: id-mask2former-ade-112
name: basket
- id: id-mask2former-ade-113
name: waterfall
- id: id-mask2former-ade-114
name: tent
- id: id-mask2former-ade-115
name: bag
- id: id-mask2former-ade-116
name: minibike
- id: id-mask2former-ade-117
name: cradle
- id: id-mask2former-ade-118
name: oven
- id: id-mask2former-ade-119
name: ball
- id: id-mask2former-ade-120
name: food
- id: id-mask2former-ade-121
name: step
- id: id-mask2former-ade-122
name: tank
- id: id-mask2former-ade-123
name: trade name
- id: id-mask2former-ade-124
name: microwave
- id: id-mask2former-ade-125
name: pot
- id: id-mask2former-ade-126
name: animal
- id: id-mask2former-ade-127
name: bicycle
- id: id-mask2former-ade-128
name: lake
- id: id-mask2former-ade-129
name: dishwasher
- id: id-mask2former-ade-130
name: screen
- id: id-mask2former-ade-131
name: blanket
- id: id-mask2former-ade-132
name: sculpture
- id: id-mask2former-ade-133
name: hood
- id: id-mask2former-ade-134
name: sconce
- id: id-mask2former-ade-135
name: vase
- id: id-mask2former-ade-136
name: traffic light
- id: id-mask2former-ade-137
name: tray
- id: id-mask2former-ade-138
name: ashcan
- id: id-mask2former-ade-139
name: fan
- id: id-mask2former-ade-140
name: pier
- id: id-mask2former-ade-141
name: crt screen
- id: id-mask2former-ade-142
name: plate
- id: id-mask2former-ade-143
name: monitor
- id: id-mask2former-ade-144
name: bulletin board
- id: id-mask2former-ade-145
name: shower
- id: id-mask2former-ade-146
name: radiator
- id: id-mask2former-ade-147
name: glass
- id: id-mask2former-ade-148
name: clock
- id: id-mask2former-ade-149
name: flag

Image-Text-to-Image

Stable Diffusion 2 Depth

1/model.py
from typing import List
import os

from clarifai.runners.models.model_builder import ModelBuilder
from clarifai.runners.models.openai_class import OpenAIModelClass
from clarifai.runners.utils.data_types import Image

from clarifai.runners.utils.data_utils import Param
from clarifai.utils.logging import logger

from diffusers import StableDiffusionDepth2ImgPipeline
from diffusers.utils import load_image

import torch



class StableDiffusion(OpenAIModelClass):
"""
A Model that integrates with the Clarifai platform and uses the StableDiffusionDepth2ImgPipeline for depth-guided image-to-image generation.
"""

client = True # This will be set in load_model method
model = True # This will be set in load_model method

def load_model(self):
"""Load the model here."""
model_path = os.path.dirname(os.path.dirname(__file__))
builder = ModelBuilder(model_path, download_validation_only=True)
hf_token = builder.config["checkpoints"]["hf_token"]

self.client = StableDiffusionDepth2ImgPipeline.from_pretrained("stabilityai/stable-diffusion-2-depth",
torch_dtype=torch.float16,
use_safetensors=True,
token=hf_token)
self.client.to("cuda" if torch.cuda.is_available() else "cpu")
logger.info("stable-diffusion model loaded successfully.")


@OpenAIModelClass.method
def predict(self,
prompt: str,
image: Image = None,
negative_prompt: str = None,
mask: Image = None,
strength: float = 0.8
) -> Image:
"""
Predict method that uses the FluxFillPipeline to inpaint images based on the provided prompt.

"""
if image:
if image.url:
image = load_image(image.url)
elif image.bytes:
image=image.to_pil()

response = self.client(prompt=prompt,
image=image,
negative_prompt=negative_prompt,
strength=strength).images[0]

return Image.from_pil(pil_image = response)
requirements.txt
tokenizers==0.21.0
transformers>=4.48
diffusers==0.32.2
accelerate==1.2.0
optimum==1.23.3
xformers
einops==0.8.0
requests==2.32.3
numpy>2.0
torch==2.5.1
clarifai
clarifai-protocol
config.yaml
model:
  id: "stable-diffusion-2-depth"
  user_id: "user_id"
  app_id: "app_id"
  model_type_id: "multimodal-to-text"

build_info:
  python_version: '3.11'

inference_compute_info:
  cpu_limit: '3'
  cpu_memory: 15Gi
  num_accelerators: 1
  accelerator_type: ["NVIDIA-*"]
  accelerator_memory: 6Gi

checkpoints:
  type: huggingface
  repo_id: "stabilityai/stable-diffusion-2-depth"
  hf_token: "hf_token"
  when: runtime

LLM

SmolLM2 1.7B Instruct (SGLang)

1/model.py
import os
import sys

sys.path.append(os.path.dirname(__file__))
from typing import Iterator, List

from clarifai.runners.models.model_builder import ModelBuilder
from clarifai.runners.models.openai_class import OpenAIModelClass
from clarifai.runners.utils.data_utils import Param
from clarifai.runners.utils.openai_convertor import build_openai_messages
from clarifai.utils.logging import logger
from openai import OpenAI
from openai_server_starter import OpenAI_APIServer

##################

class SglangModel(OpenAIModelClass):
"""
A custom runner that integrates with the Clarifai platform and serves the model through an SGLang OpenAI-compatible server.
"""

client = True # This will be set in load_model method
model = True # This will be set in load_model method

def load_model(self):
"""Load the model here and start the server."""
os.path.join(os.path.dirname(__file__))
# Use downloaded checkpoints.
# Or if you intend to download checkpoint at runtime, set hf id instead. For example:
# checkpoints = "Qwen/Qwen2-7B-Instruct"

# server args were generated by `upload` module
server_args = {
'dtype': 'auto',
'kv_cache_dtype': 'auto',
'tp_size': 1,
'load_format': 'auto',
'context_length': None,
'device': 'cuda',
'port': 23333,
'host': '0.0.0.0',
'mem_fraction_static': 0.9,
'max_total_tokens': '8192',
'max_prefill_tokens': None,
'schedule_policy': 'fcfs',
'schedule_conservativeness': 1.0,
'checkpoints': 'runtime'}

# if checkpoints == "checkpoints" => assign to checkpoints var aka local checkpoints path
stage = server_args.get("checkpoints")
if stage in ["build", "runtime"]:
#checkpoints = os.path.join(os.path.dirname(__file__), "checkpoints")
config_path = os.path.dirname(os.path.dirname(__file__))
builder = ModelBuilder(config_path, download_validation_only=True)
checkpoints = builder.download_checkpoints(stage=stage)
server_args.update({"checkpoints": checkpoints})

if server_args.get("additional_list_args") == ['']:
server_args.pop("additional_list_args")

# Start server
# This line was generated by the `upload` module
self.server = OpenAI_APIServer.from_sglang_backend(**server_args)

# Create client
self.client = OpenAI(
api_key="notset",
base_url=SglangModel.make_api_url(self.server.host, self.server.port))
self.model = self._get_model()

logger.info(f"OpenAI {self.model} model loaded successfully!")

def _get_model(self):
try:
return self.client.models.list().data[0].id
except Exception as e:
raise ConnectionError("Failed to retrieve model ID from API") from e

@staticmethod
def make_api_url(host: str, port: int, version: str = "v1") -> str:
return f"http://{host}:{port}/{version}"

@OpenAIModelClass.method
def predict(self,
prompt: str,
chat_history: List[dict] = None,
max_tokens: int = Param(default=512, description="The maximum number of tokens to generate. Shorter token lengths will provide faster performance.", ),
temperature: float = Param(default=0.7, description="A decimal number that determines the degree of randomness in the response", ),
top_p: float = Param(default=0.8, description="An alternative to sampling with temperature, where the model considers the results of the tokens with top_p probability mass.", )
) -> str:
"""This is the method that will be called when the runner is run. It takes in an input and
returns an output.
"""
openai_messages = build_openai_messages(prompt=prompt, messages=chat_history)
response = self.client.chat.completions.create(
model=self.model,
messages=openai_messages,
max_completion_tokens=max_tokens,
temperature=temperature,
top_p=top_p)
if response.usage and response.usage.prompt_tokens and response.usage.completion_tokens:
self.set_output_context(prompt_tokens=response.usage.prompt_tokens, completion_tokens=response.usage.completion_tokens)
return response.choices[0].message.content

@OpenAIModelClass.method
def generate(self,
prompt: str,
chat_history: List[dict] = None,
max_tokens: int = Param(default=512, description="The maximum number of tokens to generate. Shorter token lengths will provide faster performance.", ),
temperature: float = Param(default=0.7, description="A decimal number that determines the degree of randomness in the response", ),
top_p: float = Param(default=0.8, description="An alternative to sampling with temperature, where the model considers the results of the tokens with top_p probability mass.", )
) -> Iterator[str]:
"""Example yielding a whole batch of streamed stuff back."""
openai_messages = build_openai_messages(prompt=prompt, messages=chat_history)
for chunk in self.client.chat.completions.create(
model=self.model,
messages=openai_messages,
max_completion_tokens=max_tokens,
temperature=temperature,
top_p=top_p,
stream=True):
if chunk.choices:
text = (chunk.choices[0].delta.content
if (chunk and chunk.choices[0].delta.content) is not None else '')
yield text

# This method is needed to test the model with the test-locally CLI command.
def test(self):
"""Test the model here."""
try:
print("Testing predict...")
# Test predict
print(self.predict(prompt="Hello, how are you?",))
except Exception as e:
print("Error in predict", e)

try:
print("Testing generate...")
# Test generate
for each in self.generate(prompt="Hello, how are you?",):
print(each, end=" ")
except Exception as e:
print("Error in generate", e)
1/openai_server_starter.py
import os
import signal
import subprocess
import sys
import threading
from typing import List

import psutil
from clarifai.utils.logging import logger

PYTHON_EXEC = sys.executable


def kill_process_tree(parent_pid, include_parent: bool = True, skip_pid: int = None):
"""Kill the process and all its child processes."""
if parent_pid is None:
parent_pid = os.getpid()
include_parent = False

try:
itself = psutil.Process(parent_pid)
except psutil.NoSuchProcess:
return

children = itself.children(recursive=True)
for child in children:
if child.pid == skip_pid:
continue
try:
child.kill()
except psutil.NoSuchProcess:
pass

if include_parent:
try:
itself.kill()

# Sometimes processes cannot be killed with SIGKILL (e.g., PID=1 launched by Kubernetes),
# so we send an additional signal to kill them.
itself.send_signal(signal.SIGQUIT)
except psutil.NoSuchProcess:
pass


class OpenAI_APIServer:

def __init__(self, **kwargs):
self.server_started_event = threading.Event()
self.process = None
self.backend = None
self.server_thread = None

def __del__(self, *exc):
# This is important
# close the server when exit the program
self.close()

def close(self):
if self.process:
try:
kill_process_tree(self.process.pid)
except:
self.process.terminate()
if self.server_thread:
self.server_thread.join()

def wait_for_startup(self):
self.server_started_event.wait()

def validate_if_server_start(self, line: str):
line_lower = line.lower()
if self.backend in ["vllm", "sglang", "lmdeploy"]:
if self.backend == "vllm":
return "application startup complete" in line_lower or "vllm api server on" in line_lower
else:
return f" running on http://{self.host}:" in line.strip()
elif self.backend == "llamacpp":
return "waiting for new tasks" in line_lower
elif self.backend == "tgi":
return "Connected" in line.strip()

def _start_server(self, cmds):
try:
env = os.environ.copy()
env["VLLM_USAGE_SOURCE"] = "production-docker-image"
self.process = subprocess.Popen(
cmds,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
)
for line in self.process.stdout:
logger.info("Server Log: " + line.strip())
if self.validate_if_server_start(line):
self.server_started_event.set()
# break
except Exception as e:
if self.process:
self.process.terminate()
raise RuntimeError(f"Failed to start Server server: {e}")

def start_server_thread(self, cmds: str):
try:
# Start the server in a separate thread
self.server_thread = threading.Thread(target=self._start_server, args=(cmds,), daemon=None)
self.server_thread.start()

# Wait for the server to start
self.wait_for_startup()
except Exception as e:
raise Exception(e)

@classmethod
def from_sglang_backend(
cls,
checkpoints,
dtype: str = "auto",
kv_cache_dtype: str = "auto",
tp_size: int = 1,
quantization: str = None,
load_format: str = "auto",
context_length: str = None,
device: str = "cuda",
port=23333,
host="0.0.0.0",
chat_template: str = None,
mem_fraction_static: float = 0.8,
max_running_requests: int = None,
max_total_tokens: int = None,
max_prefill_tokens: int = None,
schedule_policy: str = "fcfs",
schedule_conservativeness: float = 1.0,
cpu_offload_gb: int = 0,
additional_list_args: List[str] = [],
):
"""Start SGlang OpenAI compatible server.

Args:
checkpoints (str): model id or path.
dtype (str, optional): Dtype used for the model {"auto", "half", "float16", "bfloat16", "float", "float32"}. Defaults to "auto".
kv_cache_dtype (str, optional): Dtype of the kv cache, defaults to the dtype. Defaults to "auto".
tp_size (int, optional): The number of GPUs the model weights get sharded over. Mainly for saving memory rather than for high throughput. Defaults to 1.
quantization (str, optional): Quantization format {"awq","fp8","gptq","marlin","gptq_marlin","awq_marlin","bitsandbytes","gguf","modelopt","w8a8_int8"}. Defaults to None.
load_format (str, optional): The format of the model weights to load:\n* `auto`: will try to load the weights in the safetensors format and fall back to the pytorch bin format if safetensors format is not available.\n* `pt`: will load the weights in the pytorch bin format. \n* `safetensors`: will load the weights in the safetensors format. \n* `npcache`: will load the weights in pytorch format and store a numpy cache to speed up the loading. \n* `dummy`: will initialize the weights with random values, which is mainly for profiling.\n* `gguf`: will load the weights in the gguf format. \n* `bitsandbytes`: will load the weights using bitsandbytes quantization."\n* `layered`: loads weights layer by layer so that one can quantize a layer before loading another to make the peak memory envelope smaller.\n. Defaults to "auto".\n
context_length (str, optional): The model's maximum context length. Defaults to None (will use the value from the model's config.json instead). Defaults to None.
device (str, optional): The device type {"cuda", "xpu", "hpu", "cpu"}. Defaults to "cuda".
port (int, optional): Port number. Defaults to 23333.
host (str, optional): Host name. Defaults to "0.0.0.0".
chat_template (str, optional): The built-in chat template name or the path of the chat template file. This is only used for the OpenAI-compatible API server. Defaults to None.
mem_fraction_static (float, optional): The fraction of the memory used for static allocation (model weights and KV cache memory pool). Use a smaller value if you see out-of-memory errors. Defaults to 0.8.
max_running_requests (int, optional): The maximum number of running requests. Defaults to None.
max_total_tokens (int, optional): The maximum number of tokens in the memory pool. If not specified, it will be automatically calculated based on the memory usage fraction. This option is typically used for development and debugging purposes. Defaults to None.
max_prefill_tokens (int, optional): The maximum number of tokens in a prefill batch. The real bound will be the maximum of this value and the model's maximum context length. Defaults to None.
schedule_policy (str, optional): The scheduling policy of the requests {"lpm", "random", "fcfs", "dfs-weight"}. Defaults to "fcfs".
schedule_conservativeness (float, optional): How conservative the schedule policy is. A larger value means more conservative scheduling. Use a larger value if you see requests being retracted frequently. Defaults to 1.0.
cpu_offload_gb (int, optional): How many GBs of RAM to reserve for CPU offloading. Defaults to 0.
additional_list_args (List[str], optional): additional args to run subprocess cmd e.g. ["--arg-name", "arg value"]. See more at [github](https://github.com/sgl-project/sglang/blob/1baa9e6cf90b30aaa7dae51c01baa25229e8f7d5/python/sglang/srt/server_args.py#L298). Defaults to [].

Returns:
OpenAI_APIServer: a running server instance wrapping the launched SGLang process.
"""

from sglang.utils import execute_shell_command, wait_for_server

cmds = [
PYTHON_EXEC, '-m', 'sglang.launch_server', '--model-path', checkpoints, '--dtype',
str(dtype), '--device',
str(device), '--kv-cache-dtype',
str(kv_cache_dtype), '--tp-size',
str(tp_size), '--load-format',
str(load_format), '--mem-fraction-static',
str(mem_fraction_static), '--schedule-policy',
str(schedule_policy), '--schedule-conservativeness',
str(schedule_conservativeness), '--port',
str(port), '--host',
host, "--trust-remote-code"
]
if chat_template:
cmds += ["--chat-template", chat_template]
if quantization:
cmds += [
'--quantization',
quantization,
]
if context_length:
cmds += [
'--context-length',
context_length,
]
if max_running_requests:
cmds += [
'--max-running-requests',
max_running_requests,
]
if max_total_tokens:
cmds += [
'--max-total-tokens',
max_total_tokens,
]
if max_prefill_tokens:
cmds += [
'--max-prefill-tokens',
max_prefill_tokens,
]

if additional_list_args:
cmds += additional_list_args

print("CMDS to run `sglang` server: ", " ".join(cmds), "\n")
_self = cls()

_self.host = host
_self.port = port
_self.backend = "sglang"
# _self.start_server_thread(cmds)
# new_path = os.environ["PATH"] + ":/sbin"
# _self.process = subprocess.Popen(cmds, text=True, stderr=subprocess.STDOUT, env={**os.environ, "PATH": new_path})
_self.process = execute_shell_command(" ".join(cmds))

logger.info("Waiting for " + f"http://{_self.host}:{_self.port}")
wait_for_server(f"http://{_self.host}:{_self.port}")
logger.info("Done")

return _self
requirements.txt
torch==2.6.0
tokenizers==0.21.1
accelerate==1.2.0
optimum==1.23.3
xformers
einops==0.8.0
packaging
ninja

qwen-vl-utils==0.0.8
timm==1.0.12
openai
clarifai>=11.5.0,<12.0.0
psutil
--extra-index-url https://flashinfer.ai/whl/cu124/torch2.4/
flashinfer
sglang[all]==0.4.6
transformers==4.51.1
config.yaml
# Config file for the Sglang runner

model:
  id: "SmolLM2-1_7B-Instruct"
  user_id: "USER_ID"
  app_id: "APP_ID"
  model_type_id: "text-to-text"

build_info:
  python_version: "3.11"

inference_compute_info:
  cpu_limit: "1"
  cpu_memory: "6Gi"
  num_accelerators: 1
  accelerator_type: ["NVIDIA-*"]
  accelerator_memory: "44Gi"

checkpoints:
  type: "huggingface"
  repo_id: "HuggingFaceTB/SmolLM2-1.7B-Instruct"
  hf_token: "hf_token"

LLaMA 3.2 1B Instruct (Hugging Face)

1/model.py
from typing import List, Iterator
from threading import Thread
import os
import torch

from clarifai.runners.models.model_class import ModelClass
from clarifai.utils.logging import logger
from clarifai.runners.models.model_builder import ModelBuilder
from clarifai.runners.utils.openai_convertor import openai_response
from clarifai.runners.utils.data_utils import Param
from transformers import (AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer)


class MyModel(ModelClass):
"""A custom runner for llama-3.2-1b-instruct llm that integrates with the Clarifai platform"""

def load_model(self):
"""Load the model here."""
self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
logger.info(f"Running on device: {self.device}")

# Load checkpoints
model_path = os.path.dirname(os.path.dirname(__file__))
builder = ModelBuilder(model_path, download_validation_only=True)
self.checkpoints = builder.download_checkpoints(stage="runtime")

# Load model and tokenizer
self.tokenizer = AutoTokenizer.from_pretrained(self.checkpoints,)
self.tokenizer.pad_token = self.tokenizer.eos_token # Set pad token to eos token
self.model = AutoModelForCausalLM.from_pretrained(
self.checkpoints,
low_cpu_mem_usage=True,
device_map=self.device,
torch_dtype=torch.bfloat16,
)
self.streamer = TextIteratorStreamer(tokenizer=self.tokenizer, skip_prompt=True, skip_special_tokens=True)
self.chat_template = None
logger.info("Done loading!")

@ModelClass.method
def predict(self,
prompt: str ="",
chat_history: List[dict] = None,
max_tokens: int = Param(default=512, description="The maximum number of tokens to generate. Shorter token lengths will provide faster performance.", ),
temperature: float = Param(default=0.7, description="A decimal number that determines the degree of randomness in the response", ),
top_p: float = Param(default=0.8, description="An alternative to sampling with temperature, where the model considers the results of the tokens with top_p probability mass.", )) -> str:
"""
Predict the response for the given prompt and chat history using the model.
"""
# Construct chat-style messages
messages = chat_history if chat_history else []
if prompt:
messages.append({
"role": "user",
"content": [{"type": "text", "text": prompt}]
})

inputs = self.tokenizer.apply_chat_template(messages, tokenize=True, add_generation_prompt=True, return_tensors="pt").to(self.model.device)

generation_kwargs = {
"input_ids": inputs,
"do_sample": True,
"max_new_tokens": max_tokens,
"temperature": temperature,
"top_p": top_p,
"eos_token_id": self.tokenizer.eos_token_id,
}

output = self.model.generate(**generation_kwargs)
generated_tokens = output[0][inputs.shape[-1]:]
return self.tokenizer.decode(generated_tokens, skip_special_tokens=True)

@ModelClass.method
def generate(self,
prompt: str="",
chat_history: List[dict] = None,
max_tokens: int = Param(default=512, description="The maximum number of tokens to generate. Shorter token lengths will provide faster performance.", ),
temperature: float = Param(default=0.7, description="A decimal number that determines the degree of randomness in the response", ),
top_p: float = Param(default=0.8, description="An alternative to sampling with temperature, where the model considers the results of the tokens with top_p probability mass.", )) -> Iterator[str]:
"""Stream generated text tokens from a prompt + optional chat history."""


# Construct chat-style messages
messages = chat_history if chat_history else []
if prompt:
messages.append({
"role": "user",
"content": [{"type": "text", "text": prompt}]
})

response = self.chat(
messages=messages,
max_tokens=max_tokens,
temperature=temperature,
top_p=top_p
)

for each in response:
if 'choices' in each and 'delta' in each['choices'][0] and 'content' in each['choices'][0]['delta']:
yield each['choices'][0]['delta']['content']


@ModelClass.method
def chat(self,
messages: List[dict],
max_tokens: int = Param(default=512, description="The maximum number of tokens to generate. Shorter token lengths will provide faster performance.", ),
temperature: float = Param(default=0.7, description="A decimal number that determines the degree of randomness in the response", ),
top_p: float = Param(default=0.8, description="An alternative to sampling with temperature, where the model considers the results of the tokens with top_p probability mass.", )
) -> Iterator[dict]:
"""
Stream back JSON dicts for assistant messages.
Example return format:
{"role": "assistant", "content": [{"type": "text", "text": "response here"}]}
"""
# Tokenize using chat template
inputs = self.tokenizer.apply_chat_template(
messages,
tokenize=True,
add_generation_prompt=True,
return_tensors="pt"
).to(self.model.device)

generation_kwargs = {
"input_ids": inputs,
"do_sample": True,
"max_new_tokens": max_tokens,
"temperature": temperature,
"top_p": top_p,
"eos_token_id": self.tokenizer.eos_token_id,
"streamer": self.streamer
}

thread = Thread(target=self.model.generate, kwargs=generation_kwargs)
thread.start()

# Accumulate response text
for chunk in openai_response(self.streamer):
yield chunk

thread.join()


def test(self):
"""Test the model here."""
try:
print("Testing predict...")
# Test predict
print(self.predict(prompt="What is the capital of India?",))
except Exception as e:
print("Error in predict", e)

try:
print("Testing generate...")
# Test generate
for each in self.generate(prompt="What is the capital of India?",):
print(each, end="")
print()
except Exception as e:
print("Error in generate", e)

try:
print("Testing chat...")
messages = [
{"role": "system", "content": "You are an helpful assistant."},
{"role": "user", "content": "What is the capital of India?"},
]
for each in self.chat(messages=messages,):
print(each, end="")
print()
except Exception as e:
print("Error in generate", e)
requirements.txt
torch==2.5.1
tokenizers>=0.21.0
transformers>=4.47.0
accelerate>=1.2.0
scipy==1.10.1
optimum>=1.23.3
protobuf==5.27.3
einops>=0.8.0
requests==2.32.3
clarifai>=11.4.1
config.yaml
model:
  id: "Llama-3_2-1B-Instruct"
  user_id: "user_id"
  app_id: "app_id"
  model_type_id: "text-to-text"

build_info:
  python_version: "3.11"

inference_compute_info:
  cpu_limit: "1"
  cpu_memory: "13Gi"
  num_accelerators: 1
  accelerator_type: ["NVIDIA-*"]
  accelerator_memory: "44Gi"

checkpoints:
  type: "huggingface"
  repo_id: "unsloth/Llama-3.2-1B-Instruct"
  hf_token: "hf_token"
  when: "runtime"

LLaMA 3.2 3B Instruct (LMDeploy)

1/model.py
import os
import sys

from typing import Iterator, List
import json

from clarifai.runners.models.model_builder import ModelBuilder
from clarifai.runners.models.openai_class import OpenAIModelClass
from clarifai.utils.logging import logger
from clarifai.runners.utils.data_utils import Param
from clarifai.runners.utils.openai_convertor import build_openai_messages

from openai import OpenAI


PYTHON_EXEC = sys.executable

def lmdeploy_openai_server(checkpoints, **kwargs):
"""Start lmdeploy OpenAI compatible server."""

from clarifai.runners.utils.model_utils import execute_shell_command, wait_for_server, terminate_process
# Start building the command
cmds = [
PYTHON_EXEC, '-m', 'lmdeploy', 'serve', 'api_server', checkpoints,
]
# Add all parameters from kwargs to the command
for key, value in kwargs.items():
if value is None: # Skip None values
continue
param_name = key.replace('_', '-')
if isinstance(value, bool):
if value: # Only add the flag if True
cmds.append(f'--{param_name}')
else:
cmds.extend([f'--{param_name}', str(value)])
# Create server instance
server = type('Server', (), {
'host': kwargs.get('server_name', '0.0.0.0'),
'port': kwargs.get('server_port', 23333),
'backend': "lmdeploy",
'process': None
})()

try:
server.process = execute_shell_command(" ".join(cmds))
logger.info("Waiting for " + f"http://{server.host}:{server.port}")
wait_for_server(f"http://{server.host}:{server.port}")
logger.info("Server started successfully at " + f"http://{server.host}:{server.port}")
except Exception as e:
logger.error(f"Failed to start lmdeploy server: {str(e)}")
if server.process:
terminate_process(server.process)
raise RuntimeError(f"Failed to start lmdeploy server: {str(e)}")

return server


class LMDeployModel(OpenAIModelClass):
"""
A custom runner that integrates with the Clarifai platform and uses lmdeploy framework for inference
"""
client = True
model = True
def load_model(self):
"""Load the model here and start the server."""

# server args were generated by `upload` module
server_args = {'backend': 'turbomind', 'cache_max_entry_count': 0.95,
'tp': 1, 'max_prefill_token_num': 8192, 'dtype': 'auto',
'model_format': None, 'quant_policy': 0, 'chat_template': 'llama3_2', 'max_batch_size': 16,
'device': 'cuda', 'server_name': '0.0.0.0', 'server_port': 23333,
'tool_call_parser': 'llama3',
}
model_path = os.path.dirname(os.path.dirname(__file__))
builder = ModelBuilder(model_path, download_validation_only=True)
model_config = builder.config

stage = model_config["checkpoints"]['when']
checkpoints = builder.config["checkpoints"]['repo_id']
if stage in ["build", "runtime"]:
checkpoints = builder.download_checkpoints(stage=stage)

# Start server
self.server = lmdeploy_openai_server(checkpoints, **server_args)
# Create client
self.client = OpenAI(
api_key="notset",
base_url= f"http://{self.server.host}:{self.server.port}/v1")
self.model = self.client.models.list().data[0].id


@OpenAIModelClass.method
def predict(self,
prompt: str,
chat_history: List[dict] = None,
tools: List[dict] = None,
tool_choice: str = None,
max_tokens: int = Param(default=512, description="The maximum number of tokens to generate. Shorter token lengths will provide faster performance.", ),
temperature: float = Param(default=0.7, description="A decimal number that determines the degree of randomness in the response", ),
top_p: float = Param(default=0.95, description="An alternative to sampling with temperature, where the model considers the results of the tokens with top_p probability mass."),
) -> str:
"""
This method is used to predict the response for the given prompt and chat history using the model and tools.
"""

if tools is not None and tool_choice is None:
tool_choice = "auto"

messages = build_openai_messages(prompt=prompt, messages=chat_history)
response = self.client.chat.completions.create(
model=self.model,
messages=messages,
tools=tools,
tool_choice=tool_choice,
max_completion_tokens=max_tokens,
temperature=temperature,
top_p=top_p)

if response.choices[0] and response.choices[0].message.tool_calls:
# If the response contains tool calls, return as a string
tool_calls = response.choices[0].message.tool_calls
tool_calls_json = json.dumps([tc.to_dict() for tc in tool_calls], indent=2)
return tool_calls_json
else:
# Otherwise, return the content of the first choice
return response.choices[0].message.content


@OpenAIModelClass.method
def generate(self,
prompt: str,
chat_history: List[dict] = None,
tools: List[dict] = None,
tool_choice: str = None,
max_tokens: int = Param(default=512, description="The maximum number of tokens to generate. Shorter token lengths will provide faster performance.", ),
temperature: float = Param(default=0.7, description="A decimal number that determines the degree of randomness in the response", ),
top_p: float = Param(default=0.95, description="An alternative to sampling with temperature, where the model considers the results of the tokens with top_p probability mass.")) -> Iterator[str]:
"""
This method is used to stream generated text tokens from a prompt + optional chat history and tools.
"""
messages = build_openai_messages(prompt=prompt, messages=chat_history)
response = self.client.chat.completions.create(
model=self.model,
messages=messages,
tools=tools,
tool_choice=tool_choice,
max_completion_tokens=max_tokens,
temperature=temperature,
top_p=top_p,
stream=True)

for chunk in response:
if chunk.choices:
if chunk.choices[0].delta.tool_calls:
# If the response contains tool calls, return the first one as a string
tool_calls = chunk.choices[0].delta.tool_calls
tool_calls_json = [tc.to_dict() for tc in tool_calls]
# Convert to JSON string
json_string = json.dumps(tool_calls_json, indent=2)
# Yield the JSON string
yield json_string
else:
# Otherwise, return the content of the first choice
text = (chunk.choices[0].delta.content
if (chunk and chunk.choices[0].delta.content) is not None else '')
yield text

# This method is needed to test the model with the test-locally CLI command.
def test(self):
"""Test the model here."""
try:
print("Testing predict...")
# Test predict
print(self.predict(prompt="Hello, how are you?",))
except Exception as e:
print("Error in predict", e)

try:
print("Testing generate...")
# Test generate
for each in self.generate(prompt="Hello, how are you?",):
print(each, end=" ")
except Exception as e:
print("Error in generate", e)
requirements.txt
tokenizers>=0.21.0
accelerate>=1.2.0
optimum>=1.23.3
einops>=0.8.0
packaging
ninja

timm
openai
clarifai
clarifai-protocol
psutil

torch==2.6.0
lmdeploy==0.8.0
transformers>=4.51.2
partial-json-parser
config.yaml
model:
  id: Llama-3_2-3B-Instruct
  user_id:
  app_id:
  model_type_id: text-to-text
build_info:
  python_version: '3.11'
inference_compute_info:
  cpu_limit: '1'
  cpu_memory: 6Gi
  num_accelerators: 1
  accelerator_type:
    - NVIDIA-*
  accelerator_memory: 20Gi
  num_threads: 64
checkpoints:
  type: huggingface
  repo_id: meta-llama/Llama-3.2-3B-Instruct
  hf_token:
  when: runtime

Gemma 3 1B Instruct (vLLM)

1/model.py
import os
import sys

from typing import List, Iterator

from clarifai.runners.models.model_builder import ModelBuilder
from clarifai.runners.models.openai_class import OpenAIModelClass
from openai import OpenAI
from clarifai.runners.utils.openai_convertor import build_openai_messages
from clarifai.runners.utils.data_utils import Param
from clarifai.utils.logging import logger

PYTHON_EXEC = sys.executable

def vllm_openai_server(checkpoints, **kwargs):
"""Start vLLM OpenAI compatible server."""

from clarifai.runners.utils.model_utils import execute_shell_command, wait_for_server, terminate_process
# Start building the command
cmds = [
PYTHON_EXEC, '-m', 'vllm.entrypoints.openai.api_server', '--model', checkpoints,
]
# Add all parameters from kwargs to the command
for key, value in kwargs.items():
if value is None: # Skip None values
continue
param_name = key.replace('_', '-')
if isinstance(value, bool):
if value: # Only add the flag if True
cmds.append(f'--{param_name}')
else:
cmds.extend([f'--{param_name}', str(value)])
# Create server instance
server = type('Server', (), {
'host': kwargs.get('host', '0.0.0.0'),
'port': kwargs.get('port', 23333),
'backend': "vllm",
'process': None
})()

try:
server.process = execute_shell_command(" ".join(cmds))
logger.info("Waiting for " + f"http://{server.host}:{server.port}")
wait_for_server(f"http://{server.host}:{server.port}")
logger.info("Server started successfully at " + f"http://{server.host}:{server.port}")
except Exception as e:
logger.error(f"Failed to start vllm server: {str(e)}")
if server.process:
terminate_process(server.process)
raise RuntimeError(f"Failed to start vllm server: {str(e)}")

return server

class VLLMLlamaModel(OpenAIModelClass):
"""
A Model that integrates with the Clarifai platform and uses the vLLM framework to serve the checkpoint configured in config.yaml, with tool calling support.
"""
client = True # This will be set in load_model method
model = True # This will be set in load_model method

def load_model(self):
"""Load the model here and start the server."""

server_args = {
'max_model_len': 2048,
#'gpu_memory_utilization': 0.8,
'dtype': 'auto',
'task': 'auto',
'kv_cache_dtype': 'auto',
'tensor_parallel_size': 1,
'quantization': None,
'cpu_offload_gb': 5.0,
'chat_template': None,
'port': 23333,

'host': 'localhost',
}

model_path = os.path.dirname(os.path.dirname(__file__))
builder = ModelBuilder(model_path, download_validation_only=True)
model_config = builder.config

stage = model_config["checkpoints"]['when']
checkpoints = builder.config["checkpoints"]['repo_id']
if stage in ["build", "runtime"]:
checkpoints = builder.download_checkpoints(stage=stage)

# Start server
self.server = vllm_openai_server(checkpoints, **server_args)
# Client initialization
self.client = OpenAI(
api_key="notset",
base_url=f'http://{self.server.host}:{self.server.port}/v1')
self.model = self.client.models.list().data[0].id

@OpenAIModelClass.method
def predict(self,
prompt: str,
chat_history: List[dict] = None,
tools: List[dict] = None,
tool_choice: str = None,
max_tokens: int = Param(default=512, description="The maximum number of tokens to generate. Shorter token lengths will provide faster performance.", ),
temperature: float = Param(default=0.7, description="A decimal number that determines the degree of randomness in the response", ),
top_p: float = Param(default=0.95, description="An alternative to sampling with temperature, where the model considers the results of the tokens with top_p probability mass."),
) -> str:
"""
This method is used to predict the response for the given prompt and chat history using the model and tools.
"""
if tools is not None and tool_choice is None:
tool_choice = "auto"

messages = build_openai_messages(prompt=prompt, messages=chat_history)
response = self.client.chat.completions.create(
model=self.model,
messages=messages,
tools=tools,
tool_choice=tool_choice,
max_completion_tokens=max_tokens,
temperature=temperature,
top_p=top_p)

if response.choices[0] and response.choices[0].message.tool_calls:
import json
# If the response contains tool calls, return as a string

tool_calls = response.choices[0].message.tool_calls
tool_calls_json = json.dumps([tc.to_dict() for tc in tool_calls], indent=2)
return tool_calls_json
else:
# Otherwise, return the content of the first choice
return response.choices[0].message.content


@OpenAIModelClass.method
def generate(self,
prompt: str,
chat_history: List[dict] = None,
tools: List[dict] = None,
tool_choice: str = None,
max_tokens: int = Param(default=512, description="The maximum number of tokens to generate. Shorter token lengths will provide faster performance.", ),
temperature: float = Param(default=0.7, description="A decimal number that determines the degree of randomness in the response", ),
top_p: float = Param(default=0.95, description="An alternative to sampling with temperature, where the model considers the results of the tokens with top_p probability mass.")) -> Iterator[str]:
"""
This method is used to stream generated text tokens from a prompt + optional chat history and tools.
"""
messages = build_openai_messages(prompt=prompt, messages=chat_history)
response = self.client.chat.completions.create(
model=self.model,
messages=messages,
tools=tools,
tool_choice=tool_choice,
max_completion_tokens=max_tokens,
temperature=temperature,
top_p=top_p,
stream=True)

for chunk in response:
if chunk.choices:
if chunk.choices[0].delta.tool_calls:
# If the chunk contains tool calls, yield them as a JSON string
import json
tool_calls = chunk.choices[0].delta.tool_calls
tool_calls_json = [tc.to_dict() for tc in tool_calls]
# Convert to JSON string
json_string = json.dumps(tool_calls_json, indent=2)
# Yield the JSON string
yield json_string
else:
# Otherwise, yield the text content of the chunk (empty string if none)
text = chunk.choices[0].delta.content or ''
yield text
requirements.txt
tokenizers==0.21.0
accelerate==1.2.0
optimum==1.23.3
# xformers
einops==0.8.0
packaging
ninja

qwen-vl-utils==0.0.8
timm
openai
clarifai
psutil

torch==2.6.0
vllm==0.8.0
transformers==4.50.1
backoff==2.2.1
peft>=0.13.2
soundfile>=0.13.1
scipy==1.15.2
librosa
decord
config.yaml
build_info:
python_version: '3.12'
checkpoints:
hf_token: "<your_hf_token>"
repo_id: google/gemma-3-1b-it
type: huggingface
when: runtime
inference_compute_info:
accelerator_memory: 5Gi
accelerator_type:
- NVIDIA-*
cpu_limit: '1'
cpu_memory: 5Gi
num_accelerators: 1
model:
app_id: APP_ID
id: MODEL_ID
model_type_id: text-to-text
user_id: USER_ID
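
The vllm_openai_server helper in 1/model.py turns each server_args entry into a vLLM CLI flag: None values are dropped, booleans become bare flags, and everything else becomes a --kebab-case flag followed by its value. The snippet below simply re-runs that conversion on a subset of the Gemma defaults to show the resulting command line; it is illustrative only.

server_args = {
    'max_model_len': 2048,
    'dtype': 'auto',
    'tensor_parallel_size': 1,
    'quantization': None,   # skipped: None values are not emitted
    'port': 23333,
    'host': 'localhost',
}

cmds = ['python', '-m', 'vllm.entrypoints.openai.api_server', '--model', 'google/gemma-3-1b-it']
for key, value in server_args.items():
    if value is None:
        continue
    flag = '--' + key.replace('_', '-')
    if isinstance(value, bool):
        if value:
            cmds.append(flag)  # booleans become bare flags
    else:
        cmds.extend([flag, str(value)])

print(' '.join(cmds))
# python -m vllm.entrypoints.openai.api_server --model google/gemma-3-1b-it
#   --max-model-len 2048 --dtype auto --tensor-parallel-size 1 --port 23333 --host localhost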

LLaMA 3.1 8B Tool Calling (vLLM)

1/model.py
import os
import sys

from typing import List, Iterator

from clarifai.runners.models.model_builder import ModelBuilder
from clarifai.runners.models.openai_class import OpenAIModelClass
from openai import OpenAI
from clarifai.runners.utils.openai_convertor import build_openai_messages
from clarifai.runners.utils.data_utils import Param
from clarifai.utils.logging import logger

PYTHON_EXEC = sys.executable

def vllm_openai_server(checkpoints, **kwargs):
"""Start vLLM OpenAI compatible server."""

from clarifai.runners.utils.model_utils import execute_shell_command, wait_for_server, terminate_process
# Start building the command
cmds = [
PYTHON_EXEC, '-m', 'vllm.entrypoints.openai.api_server', '--model', checkpoints,
]
# Add all parameters from kwargs to the command
for key, value in kwargs.items():
if value is None: # Skip None values
continue
param_name = key.replace('_', '-')
if isinstance(value, bool):
if value: # Only add the flag if True
cmds.append(f'--{param_name}')
else:
cmds.extend([f'--{param_name}', str(value)])
# Create server instance
server = type('Server', (), {
'host': kwargs.get('host', '0.0.0.0'),
'port': kwargs.get('port', 23333),
'backend': "vllm",
'process': None
})()

try:
server.process = execute_shell_command(" ".join(cmds))
logger.info("Waiting for " + f"http://{server.host}:{server.port}")
wait_for_server(f"http://{server.host}:{server.port}")
logger.info("Server started successfully at " + f"http://{server.host}:{server.port}")
except Exception as e:
logger.error(f"Failed to start vllm server: {str(e)}")
if server.process:
terminate_process(server.process)
raise RuntimeError(f"Failed to start vllm server: {str(e)}")

return server

class VLLMLlamaModel(OpenAIModelClass):
"""
A Model that integrates with the Clarifai platform and uses vLLM framework for inference to run the Llama 3.1 8B model with tool calling capabilities.
"""
client = True # This will be set in load_model method
model = True # This will be set in load_model method

def load_model(self):
"""Load the model here and start the server."""
# Path to the chat template file; you can get this template from the vLLM repo (https://github.com/vllm-project/vllm/blob/main/examples/tool_chat_template_llama3.1_json.jinja)
chat_template = 'examples/tool_chat_template_llama3.1_json.jinja'

server_args = {
'max_model_len': 2048,
'gpu_memory_utilization': 0.8,
'dtype': 'auto',
'task': 'auto',
'kv_cache_dtype': 'auto',
'tensor_parallel_size': 1,
'quantization': None,
'chat_template': chat_template,
'cpu_offload_gb': 0.0,
'port': 23333,
'host': 'localhost',
"enable_auto_tool_choice": True,
'tool_call_parser': "llama3_json",
}

model_path = os.path.dirname(os.path.dirname(__file__))
builder = ModelBuilder(model_path, download_validation_only=True)
model_config = builder.config

stage = model_config["checkpoints"]['when']
checkpoints = builder.config["checkpoints"]['repo_id']
if stage in ["build", "runtime"]:
checkpoints = builder.download_checkpoints(stage=stage)

# Start server
self.server = vllm_openai_server(checkpoints, **server_args)
# Client initialization
self.client = OpenAI(
api_key="notset",
base_url=f'http://{self.server.host}:{self.server.port}/v1')
self.model = self.client.models.list().data[0].id

@OpenAIModelClass.method
def predict(self,
prompt: str,
chat_history: List[dict] = None,
tools: List[dict] = None,
tool_choice: str = None,
max_tokens: int = Param(default=512, description="The maximum number of tokens to generate. Shorter token lengths will provide faster performance.", ),
temperature: float = Param(default=0.7, description="A decimal number that determines the degree of randomness in the response", ),
top_p: float = Param(default=0.95, description="An alternative to sampling with temperature, where the model considers the results of the tokens with top_p probability mass."),
) -> str:
"""
This method is used to predict the response for the given prompt and chat history using the model and tools.
"""
if tools is not None and tool_choice is None:
tool_choice = "auto"

messages = build_openai_messages(prompt=prompt, messages=chat_history)
response = self.client.chat.completions.create(
model=self.model,
messages=messages,
tools=tools,
tool_choice=tool_choice,
max_completion_tokens=max_tokens,
temperature=temperature,
top_p=top_p)

if response.choices[0] and response.choices[0].message.tool_calls:
import json
# If the response contains tool calls, return as a string

tool_calls = response.choices[0].message.tool_calls
tool_calls_json = json.dumps([tc.to_dict() for tc in tool_calls], indent=2)
return tool_calls_json
else:
# Otherwise, return the content of the first choice
return response.choices[0].message.content


@OpenAIModelClass.method
def generate(self,
prompt: str,
chat_history: List[dict] = None,
tools: List[dict] = None,
tool_choice: str = None,
max_tokens: int = Param(default=512, description="The maximum number of tokens to generate. Shorter token lengths will provide faster performance.", ),
temperature: float = Param(default=0.7, description="A decimal number that determines the degree of randomness in the response", ),
top_p: float = Param(default=0.95, description="An alternative to sampling with temperature, where the model considers the results of the tokens with top_p probability mass.")) -> Iterator[str]:
"""
This method is used to stream generated text tokens from a prompt + optional chat history and tools.
"""
messages = build_openai_messages(prompt=prompt, messages=chat_history)
response = self.client.chat.completions.create(
model=self.model,
messages=messages,
tools=tools,
tool_choice=tool_choice,
max_completion_tokens=max_tokens,
temperature=temperature,
top_p=top_p,
stream=True)

for chunk in response:
if chunk.choices:
if chunk.choices[0].delta.tool_calls:
# If the chunk contains tool calls, yield them as a JSON string
import json
tool_calls = chunk.choices[0].delta.tool_calls
tool_calls_json = [tc.to_dict() for tc in tool_calls]
# Convert to JSON string
json_string = json.dumps(tool_calls_json, indent=2)
# Yield the JSON string
yield json_string
else:
# Otherwise, yield the text content of the chunk (empty string if none)
text = chunk.choices[0].delta.content or ''
yield text
requirements.txt
tokenizers==0.21.0
accelerate==1.2.0
optimum==1.23.3
# xformers
einops==0.8.0
packaging
ninja

qwen-vl-utils==0.0.8
timm
openai
clarifai
psutil

torch==2.6.0
vllm==0.8.0
transformers==4.50.1
backoff==2.2.1
peft>=0.13.2
soundfile>=0.13.1
scipy==1.15.2
librosa
decord
config.yaml
model:
id: "llama-3_1-8B-instruct-tool-calling"
user_id: "user_id"
app_id: "app_id"
model_type_id: "text-to-text"

build_info:
python_version: '3.12'

inference_compute_info:
cpu_limit: '1'
cpu_memory: 12Gi
num_accelerators: 1
accelerator_type: ["NVIDIA-*"]
accelerator_memory: 44Gi

checkpoints:
type: huggingface
repo_id: meta-llama/Llama-3.1-8B-Instruct
hf_token: "hf_token"
when: runtime
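
To exercise the tool-calling path, pass an OpenAI-style function schema through the tools parameter. The sketch below is hypothetical: the model URL and the get_weather tool definition are placeholders, and it assumes the same Model client pattern used for the other text models, with CLARIFAI_PAT set in your environment. As the predict method above shows, the model returns tool calls as a JSON string when it decides to call a tool, and plain text otherwise.

import json

from clarifai.client.model import Model

model = Model(url="https://clarifai.com/user_id/app_id/models/llama-3_1-8B-instruct-tool-calling")

# Placeholder tool definition using the standard OpenAI function-calling schema.
tools = [{
    "type": "function",
    "function": {
        "name": "get_weather",
        "description": "Get the current weather for a city",
        "parameters": {
            "type": "object",
            "properties": {"city": {"type": "string"}},
            "required": ["city"],
        },
    },
}]

response = model.predict(prompt="What is the weather in Paris?", tools=tools)

try:
    # Tool-call responses come back as a JSON list of calls.
    for call in json.loads(response):
        print(call["function"]["name"], call["function"]["arguments"])
except (json.JSONDecodeError, TypeError):
    # Plain-text answer (no tool call was made).
    print(response)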

Local Runners

Ollama

1/model.py
import json
import os
from typing import Iterator, List

from clarifai.runners.models.openai_class import OpenAIModelClass
from clarifai.runners.utils.data_types import Image
from clarifai.runners.utils.data_utils import Param
from clarifai.runners.utils.openai_convertor import build_openai_messages
from clarifai.utils.logging import logger
from openai import OpenAI

# Set default host
if not os.environ.get('OLLAMA_HOST'):
os.environ["OLLAMA_HOST"] = '127.0.0.1:23333'
OLLAMA_HOST = os.environ.get('OLLAMA_HOST')

if not os.environ.get('OLLAMA_CONTEXT_LENGTH'):
# Set default context length if not set
os.environ["OLLAMA_CONTEXT_LENGTH"] = '8192' # Default context length for Llama 3.2
OLLAMA_CONTEXT_LENGTH = os.environ.get('OLLAMA_CONTEXT_LENGTH')


def run_ollama_server(model_name: str = 'llama3.2'):
"""
Start the Ollama server and pull the requested model.
"""
from clarifai.runners.utils.model_utils import execute_shell_command, terminate_process

try:
logger.info(f"Starting Ollama server in the host: {OLLAMA_HOST}")
start_process = execute_shell_command("ollama serve")
if start_process:
pull_model = execute_shell_command(f"ollama pull {model_name}")
logger.info(f"Model {model_name} pulled successfully.")
logger.info(f"Ollama server started successfully on {OLLAMA_HOST}")

except Exception as e:
logger.error(f"Error starting Ollama server: {e}")
if 'start_process' in locals():
terminate_process(start_process)
raise RuntimeError(f"Failed to start Ollama server: {e}")


# Check if Image has content before building messages
def has_image_content(image: Image) -> bool:
"""Check if Image object has either bytes or URL."""
return bool(getattr(image, 'url', None) or getattr(image, 'bytes', None))


class OllamaModelClass(OpenAIModelClass):
client = True
model = True

def load_model(self):
"""
Load the Ollama model.
"""
# set the model name here or via OLLAMA_MODEL_NAME
self.model = os.environ.get("OLLAMA_MODEL_NAME", 'llama3.2')  # e.g. 'devstral:latest'

# start ollama server
run_ollama_server(model_name=self.model)

self.client = OpenAI(api_key="notset", base_url=f"http://{OLLAMA_HOST}/v1")

logger.info(f"Ollama model loaded successfully: {self.model}")

@OpenAIModelClass.method
def predict(
self,
prompt: str,
image: Image = None,
images: List[Image] = None,
chat_history: List[dict] = None,
tools: List[dict] = None,
tool_choice: str = None,
max_tokens: int = Param(
default=2048,
description="The maximum number of tokens to generate. Shorter token lengths will provide faster performance.",
),
temperature: float = Param(
default=0.7,
description="A decimal number that determines the degree of randomness in the response",
),
top_p: float = Param(
default=0.95,
description="An alternative to sampling with temperature, where the model considers the results of the tokens with top_p probability mass.",
),
) -> str:
"""
This method is used to predict the response for the given prompt and chat history using the model and tools.
"""
if tools is not None and tool_choice is None:
tool_choice = "auto"

img_content = image if has_image_content(image) else None

messages = build_openai_messages(
prompt=prompt, image=img_content, images=images, messages=chat_history
)
response = self.client.chat.completions.create(
model=self.model,
messages=messages,
tools=tools,
tool_choice=tool_choice,
max_completion_tokens=max_tokens,
temperature=temperature,
top_p=top_p,
)

if response.usage is not None:
self.set_output_context(
prompt_tokens=response.usage.prompt_tokens,
completion_tokens=response.usage.completion_tokens,
)
if len(response.choices) == 0:
# still need to send the usage back.
return ""

if response.choices[0] and response.choices[0].message.tool_calls:
# If the response contains tool calls, return as a string
tool_calls = response.choices[0].message.tool_calls
tool_calls_json = json.dumps([tc.to_dict() for tc in tool_calls], indent=2)
return tool_calls_json
else:
# Otherwise, return the content of the first choice
return response.choices[0].message.content

@OpenAIModelClass.method
def generate(
self,
prompt: str,
image: Image = None,
images: List[Image] = None,
chat_history: List[dict] = None,
tools: List[dict] = None,
tool_choice: str = None,
max_tokens: int = Param(
default=2048,
description="The maximum number of tokens to generate. Shorter token lengths will provide faster performance.",
),
temperature: float = Param(
default=0.7,
description="A decimal number that determines the degree of randomness in the response",
),
top_p: float = Param(
default=0.95,
description="An alternative to sampling with temperature, where the model considers the results of the tokens with top_p probability mass.",
),
) -> Iterator[str]:
"""
This method is used to stream generated text tokens from a prompt + optional chat history and tools.
"""
if tools is not None and tool_choice is None:
tool_choice = "auto"

img_content = image if has_image_content(image) else None

messages = build_openai_messages(
prompt=prompt, image=img_content, images=images, messages=chat_history
)
for chunk in self.client.chat.completions.create(
model=self.model,
messages=messages,
tools=tools,
tool_choice=tool_choice,
max_completion_tokens=max_tokens,
temperature=temperature,
top_p=top_p,
stream=True,
stream_options={"include_usage": True},
):
if chunk.usage is not None:
if chunk.usage.prompt_tokens or chunk.usage.completion_tokens:
self.set_output_context(
prompt_tokens=chunk.usage.prompt_tokens,
completion_tokens=chunk.usage.completion_tokens,
)
if len(chunk.choices) == 0: # still need to send the usage back.
yield ""
if chunk.choices:
if chunk.choices[0].delta.tool_calls:
# If the chunk contains tool calls, yield them as a JSON string

tool_calls = chunk.choices[0].delta.tool_calls
tool_calls_json = [tc.to_dict() for tc in tool_calls]
# Convert to JSON string
json_string = json.dumps(tool_calls_json, indent=2)
# Yield the JSON string
yield json_string
else:
# Otherwise, yield the text content of the chunk (empty string if none)
text = chunk.choices[0].delta.content or ''
yield text
requirements.txt
ollama
clarifai
openai
config.yaml
model:
app_id: local-dev-runner-app
id: local-dev-model
model_type_id: text-to-text
user_id: clarifai-user-id

build_info:
python_version: '3.12'

inference_compute_info:
cpu_limit: '3'
cpu_memory: 14Gi
num_accelerators: 0
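
The served model defaults to llama3.2 and can be switched by exporting OLLAMA_MODEL_NAME before starting the runner, since load_model reads it from the environment. Once the local runner is up, the model is callable like any other Clarifai model; the snippet below is a hedged client sketch that assumes the placeholder IDs from the config above, a CLARIFAI_PAT in your environment, and the same Model client pattern as the earlier examples.

from clarifai.client.model import Model

# Placeholder URL built from the example config above; replace with your own IDs.
model = Model(url="https://clarifai.com/clarifai-user-id/local-dev-runner-app/models/local-dev-model")

# Stream tokens from the generate method while the local runner is running.
for chunk in model.generate(prompt="Summarize what a local runner does in one sentence."):
    print(chunk, end="")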

MCP

Browser Tools

1/model.py
from typing import Annotated
from urllib.parse import urljoin, urlparse

from fastmcp import FastMCP # use fastmcp v2, not the built-in mcp package
from pydantic import Field

server = FastMCP(
"browser-tools-mcp-server",
instructions="Web browsing, scraping, and search tools for gathering information from the internet",
stateless_http=True,
)


def make_http_request(
url: str, method: str = "GET", headers: dict = None, timeout: int = 30
) -> tuple[bool, dict]:
"""Make an HTTP request with proper error handling."""
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

try:
# Setup session with retries
session = requests.Session()
retry_strategy = Retry(
total=3,
backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504],
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("http://", adapter)
session.mount("https://", adapter)

# Default headers
default_headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
}
if headers:
default_headers.update(headers)

response = session.request(method, url, headers=default_headers, timeout=timeout)

return True, {
"status_code": response.status_code,
"headers": dict(response.headers),
"text": response.text,
"url": response.url,
}
except Exception as e:
return False, {"error": str(e)}


@server.tool("fetch_webpage", description="Fetch and return the content of a webpage")
def fetch_webpage(
url: Annotated[str, Field(description="URL of the webpage to fetch")],
include_headers: Annotated[
bool, Field(description="Include HTTP headers in response")
] = False,
max_length: Annotated[
int, Field(description="Maximum content length to return", ge=100, le=50000)
] = 10000,
) -> str:
"""Fetch the content of a webpage and return the text."""
success, response = make_http_request(url)

if not success:
return f"Error fetching webpage: {response.get('error', 'Unknown error')}"

content = response.get('text', '')

# Truncate if too long
if len(content) > max_length:
content = (
content[:max_length]
+ f"\n\n... (content truncated, showing first {max_length} characters)"
)

result = f"URL: {response.get('url', url)}\n"
result += f"Status Code: {response.get('status_code')}\n\n"

if include_headers:
headers = response.get('headers', {})
result += "Headers:\n"
for key, value in headers.items():
result += f" {key}: {value}\n"
result += "\n"

result += "Content:\n"
result += content

return result


@server.tool("extract_text", description="Extract clean text from HTML content")
def extract_text(
url: Annotated[str, Field(description="URL of the webpage")],
remove_scripts: Annotated[bool, Field(description="Remove script and style tags")] = True,
max_length: Annotated[
int, Field(description="Maximum text length to return", ge=100, le=50000)
] = 5000,
) -> str:
"""Extract clean text from a webpage by removing HTML tags."""
success, response = make_http_request(url)

if not success:
return f"Error fetching webpage: {response.get('error', 'Unknown error')}"

try:
from bs4 import BeautifulSoup

soup = BeautifulSoup(response.get('text', ''), 'html.parser')

# Remove script and style elements if requested
if remove_scripts:
for script in soup(["script", "style"]):
script.decompose()

# Extract text
text = soup.get_text()

# Clean up whitespace
lines = (line.strip() for line in text.splitlines())
chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
text = ' '.join(chunk for chunk in chunks if chunk)

# Truncate if too long
if len(text) > max_length:
text = (
text[:max_length]
+ f"\n\n... (text truncated, showing first {max_length} characters)"
)

return f"URL: {response.get('url', url)}\nExtracted Text:\n\n{text}"

except ImportError:
return "Error: BeautifulSoup4 not available for HTML parsing"
except Exception as e:
return f"Error extracting text: {str(e)}"


@server.tool("search_google", description="Search Google and return results")
def search_google(
query: Annotated[str, Field(description="Search query")],
num_results: Annotated[int, Field(description="Number of results to return", ge=1, le=20)] = 5,
safe_search: Annotated[bool, Field(description="Enable safe search")] = True,
) -> str:
"""Search Google and return search results. Note: This is a mock implementation."""
# This is a simplified mock implementation
# In production, you'd use Google Custom Search API or similar service

# Mock search results based on common queries
mock_results = {
"python": [
{
"title": "Welcome to Python.org",
"url": "https://www.python.org/",
"snippet": "The official home of the Python Programming Language",
},
{
"title": "Python Tutorial",
"url": "https://docs.python.org/3/tutorial/",
"snippet": "Python is an easy to learn, powerful programming language",
},
{
"title": "Learn Python",
"url": "https://www.learnpython.org/",
"snippet": "Learn Python programming with interactive tutorials",
},
],
"machine learning": [
{
"title": "Machine Learning | Coursera",
"url": "https://www.coursera.org/learn/machine-learning",
"snippet": "Learn Machine Learning online with courses from top universities",
},
{
"title": "Machine Learning - Wikipedia",
"url": "https://en.wikipedia.org/wiki/Machine_learning",
"snippet": "Machine learning is a method of data analysis that automates analytical model building",
},
{
"title": "Introduction to Machine Learning",
"url": "https://scikit-learn.org/",
"snippet": "Simple and efficient tools for predictive data analysis",
},
],
}

# Find relevant results
results = []
query_lower = query.lower()

for key, search_results in mock_results.items():
if any(word in query_lower for word in key.split()):
results.extend(search_results)

if not results:
# Generic results for unknown queries
results = [
{
"title": f"Search results for: {query}",
"url": "https://www.google.com/search?q=" + query.replace(" ", "+"),
"snippet": f"Various results related to {query}",
},
{
"title": f"Wikipedia: {query}",
"url": f"https://en.wikipedia.org/wiki/{query.replace(' ', '_')}",
"snippet": f"Wikipedia article about {query}",
},
]

# Limit results
results = results[:num_results]

# Format output
output = f"Google Search Results for '{query}':\n\n"
for i, result in enumerate(results, 1):
output += f"{i}. {result['title']}\n"
output += f" URL: {result['url']}\n"
output += f" {result['snippet']}\n\n"

return output


@server.tool("extract_links", description="Extract all links from a webpage")
def extract_links(
url: Annotated[str, Field(description="URL of the webpage")],
filter_domain: Annotated[
bool, Field(description="Only return links from the same domain")
] = False,
max_links: Annotated[
int, Field(description="Maximum number of links to return", ge=1, le=100)
] = 20,
) -> str:
"""Extract all links from a webpage."""
success, response = make_http_request(url)

if not success:
return f"Error fetching webpage: {response.get('error', 'Unknown error')}"

try:
from bs4 import BeautifulSoup

soup = BeautifulSoup(response.get('text', ''), 'html.parser')

# Find all links
links = []
base_domain = urlparse(url).netloc

for link in soup.find_all('a', href=True):
href = link['href']
text = link.get_text(strip=True)

# Convert relative URLs to absolute
if href.startswith('/'):
href = urljoin(url, href)
elif not href.startswith(('http://', 'https://')):
href = urljoin(url, href)

# Filter by domain if requested
if filter_domain:
link_domain = urlparse(href).netloc
if link_domain != base_domain:
continue

links.append({"url": href, "text": text or "No text"})

# Remove duplicates and limit
unique_links = []
seen_urls = set()
for link in links:
if link["url"] not in seen_urls:
unique_links.append(link)
seen_urls.add(link["url"])
if len(unique_links) >= max_links:
break

# Format output
output = f"Links extracted from {url} ({len(unique_links)} unique links):\n\n"
for i, link in enumerate(unique_links, 1):
output += f"{i}. {link['text']}\n"
output += f" URL: {link['url']}\n\n"

return output

except ImportError:
return "Error: BeautifulSoup4 not available for HTML parsing"
except Exception as e:
return f"Error extracting links: {str(e)}"


@server.tool("take_screenshot", description="Take a screenshot of a webpage")
def take_screenshot(
url: Annotated[str, Field(description="URL of the webpage")],
width: Annotated[int, Field(description="Screenshot width", ge=100, le=3840)] = 1280,
height: Annotated[int, Field(description="Screenshot height", ge=100, le=2160)] = 720,
full_page: Annotated[bool, Field(description="Capture full page")] = False,
) -> str:
"""Take a screenshot of a webpage using headless browser (mock implementation)."""
# This is a mock implementation
# In production, you'd use Selenium, Playwright, or similar

try:
# Mock screenshot functionality
screenshot_path = f"/tmp/screenshot_{url.replace('://', '_').replace('/', '_')}.png"

# Simulate screenshot creation
return (
f"Screenshot would be saved to: {screenshot_path}\n"
f"URL: {url}\n"
f"Dimensions: {width}x{height}\n"
f"Full page: {full_page}\n\n"
f"Note: This is a mock implementation. In production, this would use:\n"
f"- Selenium WebDriver\n"
f"- Playwright\n"
f"- Puppeteer\n"
f"- Or similar headless browser automation tools"
)

except Exception as e:
return f"Error taking screenshot: {str(e)}"


@server.tool(
"check_website_status",
description="Check if a website is accessible and get status information",
)
def check_website_status(
url: Annotated[str, Field(description="URL to check")],
check_ssl: Annotated[bool, Field(description="Check SSL certificate validity")] = True,
) -> str:
"""Check website accessibility and status."""
success, response = make_http_request(url, timeout=10)

if not success:
return f"Website is not accessible: {response.get('error', 'Unknown error')}"

status_code = response.get('status_code')
headers = response.get('headers', {})

result = f"Website Status for {url}:\n\n"
result += f"Status Code: {status_code}\n"
result += f"Status: {'✓ Accessible' if 200 <= status_code < 400 else '✗ Error'}\n"
result += f"Server: {headers.get('server', 'Unknown')}\n"
result += f"Content-Type: {headers.get('content-type', 'Unknown')}\n"
result += f"Content-Length: {headers.get('content-length', 'Unknown')}\n"
result += f"Last-Modified: {headers.get('last-modified', 'Unknown')}\n"

if check_ssl and url.startswith('https://'):
result += "\nSSL Status: ✓ HTTPS enabled"

return result


@server.tool("search_webpage_content", description="Search for specific content within a webpage")
def search_webpage_content(
url: Annotated[str, Field(description="URL of the webpage")],
search_term: Annotated[str, Field(description="Term to search for")],
case_sensitive: Annotated[bool, Field(description="Case sensitive search")] = False,
max_matches: Annotated[
int, Field(description="Maximum number of matches to return", ge=1, le=50)
] = 10,
) -> str:
"""Search for specific content within a webpage."""
success, response = make_http_request(url)

if not success:
return f"Error fetching webpage: {response.get('error', 'Unknown error')}"

try:
from bs4 import BeautifulSoup

soup = BeautifulSoup(response.get('text', ''), 'html.parser')
text = soup.get_text()

# Perform search
if not case_sensitive:
text = text.lower()
search_term = search_term.lower()

matches = []
lines = text.split('\n')

for line_num, line in enumerate(lines, 1):
if search_term in line:
# Get context around the match
context_start = max(0, line.find(search_term) - 50)
context_end = min(len(line), line.find(search_term) + len(search_term) + 50)
context = line[context_start:context_end]

matches.append(
{"line": line_num, "context": context.strip(), "full_line": line.strip()}
)

if len(matches) >= max_matches:
break

if not matches:
return f"No matches found for '{search_term}' in {url}"

result = f"Found {len(matches)} matches for '{search_term}' in {url}:\n\n"
for i, match in enumerate(matches, 1):
result += f"{i}. Line {match['line']}: ...{match['context']}...\n\n"

return result

except ImportError:
return "Error: BeautifulSoup4 not available for HTML parsing"
except Exception as e:
return f"Error searching content: {str(e)}"


# Static resource
@server.resource("config://browser_settings")
def get_browser_settings():
return {
"default_timeout": 30,
"max_content_length": 10000,
"supported_formats": ["html", "text", "json"],
"user_agent": "MCP-Browser-Tools/1.0",
"screenshot_formats": ["png", "jpg"],
}


# Dynamic resource template
@server.resource("site://{domain}/info")
def get_site_info(domain: str):
return {
"domain": domain,
"note": "Use check_website_status tool to get actual site information",
}


@server.prompt()
def web_research_prompt(research_type: str) -> str:
"""Generate prompts for web research tasks."""
prompts = {
"content": "To research web content:\n1. Use search_google to find relevant pages\n2. Use fetch_webpage to get page content\n3. Use extract_text for clean text extraction\n4. Use search_webpage_content to find specific information",
"links": "To analyze website links:\n1. Use extract_links to get all links\n2. Filter by domain if needed\n3. Check link status with check_website_status",
"monitoring": "To monitor websites:\n1. Use check_website_status for availability\n2. Use fetch_webpage to check content changes\n3. Use take_screenshot for visual monitoring",
}

return prompts.get(research_type, f"Web research guidance for: {research_type}")


from clarifai.runners.models.mcp_class import MCPModelClass


class MyModelClass(MCPModelClass):
def get_server(self) -> FastMCP:
return server
client.py
import asyncio
import os

from clarifai.urls.helper import ClarifaiUrlHelper
from fastmcp import Client
from fastmcp.client.transports import StreamableHttpTransport

PAT = os.environ['CLARIFAI_PAT']
url = ClarifaiUrlHelper().mcp_api_url() # get url from the current clarifai config

print(url)

transport = StreamableHttpTransport(url=url, headers={"Authorization": "Bearer " + PAT})


async def main():
print("=== Browser Tools MCP Server Examples ===\n")

async with Client(transport) as client:
# List available tools first
print("Available tools:")
tools = await client.list_tools()
for tool in tools:
print(f"- {tool.name}: {tool.description}")
print("\n" + "=" * 50 + "\n")

# Example 1: Fetch webpage content
print("1. Fetching webpage content:")
try:
result = await client.call_tool(
"fetch_webpage", {"url": "https://httpbin.org/get", "max_length": 2000}
)
print(result[0].text)
except Exception as e:
print(f"Error: {e}")
print("\n" + "=" * 50 + "\n")

# Example 2: Extract clean text from webpage
print("2. Extracting clean text:")
try:
result = await client.call_tool(
"extract_text", {"url": "https://httpbin.org/html", "max_length": 1000}
)
print(result[0].text)
except Exception as e:
print(f"Error: {e}")
print("\n" + "=" * 50 + "\n")

# Example 3: Search Google (mock results)
print("3. Searching Google (mock implementation):")
try:
result = await client.call_tool(
"search_google", {"query": "python programming", "num_results": 3}
)
print(result[0].text)
except Exception as e:
print(f"Error: {e}")
print("\n" + "=" * 50 + "\n")

# Example 4: Check website status
print("4. Checking website status:")
try:
result = await client.call_tool(
"check_website_status", {"url": "https://httpbin.org", "check_ssl": True}
)
print(result[0].text)
except Exception as e:
print(f"Error: {e}")
print("\n" + "=" * 50 + "\n")

# Example 5: Extract links from webpage
print("5. Extracting links from webpage:")
try:
result = await client.call_tool(
"extract_links", {"url": "https://httpbin.org", "max_links": 5}
)
print(result[0].text)
except Exception as e:
print(f"Error: {e}")
print("\n" + "=" * 50 + "\n")


if __name__ == "__main__":
asyncio.run(main())
requirements.txt
clarifai
anyio==4.9.0
mcp==1.9.0
fastmcp==2.3.4
requests>=2.31.0
beautifulsoup4==4.12.2
lxml==4.9.3
config.yaml
build_info:
python_version: '3.11'
inference_compute_info:
cpu_limit: 1000m
cpu_memory: 1Gi
num_accelerators: 0
model:
app_id: mcp-examples-app
id: browser-tools-mcp-server
model_type_id: text-to-text
user_id: mcp-examples-user
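
Adding another tool to this server follows the same pattern as the ones above: inside 1/model.py, where server is already defined, decorate a plain function with @server.tool and annotate its parameters with pydantic Field metadata. The example below is hypothetical (the count_words tool is not part of the repository); it only illustrates the decorator shape.

from typing import Annotated

from pydantic import Field


@server.tool("count_words", description="Count the words in a block of text")
def count_words(
    text: Annotated[str, Field(description="Text to analyze")],
) -> str:
    """Return a simple word count for the supplied text."""
    return f"Word count: {len(text.split())}"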

Code Execution

1/model.py
import io
import tarfile
from typing import Annotated, Any, Dict

import docker
from clarifai.runners.models.mcp_class import MCPModelClass
from fastmcp import FastMCP
from pydantic import Field

server = FastMCP(
"python-execution-server",
instructions="Execute Python code securely in Docker containers",
stateless_http=True,
)

_docker_client = None


def get_docker_client():
"""Get or create Docker client."""
global _docker_client
if _docker_client is None:
try:
_docker_client = docker.from_env()
_docker_client.ping()
except Exception as e:
# Try alternative connection methods for different Docker setups
try:
# TODO: replace base_url with your local machine's docker socket
_docker_client = docker.DockerClient(
base_url='unix:///Users/YOUR_USER_NAME/.rd/docker.sock'
)
_docker_client.ping()
except Exception:
try:
_docker_client = docker.DockerClient(base_url='unix://var/run/docker.sock')
_docker_client.ping()
except Exception:
raise Exception(f"Cannot connect to Docker daemon. Original error: {e}")
return _docker_client


def execute_python_code_fresh_container(code: str) -> Dict[str, Any]:
"""
Execute Python code in a fresh Docker container (OpenAI approach).
Each execution gets a completely clean environment.
"""
try:
client = get_docker_client()

# Pull Python image if not present
try:
client.images.get("python:3.11")
except docker.errors.ImageNotFound:
client.images.pull("python:3.11")

# Create a temporary tar archive containing the script (like OpenAI)
script_name = "script.py"
tarstream = io.BytesIO()
with tarfile.open(fileobj=tarstream, mode="w") as tar:
script_bytes = code.encode("utf-8")
tarinfo = tarfile.TarInfo(name=script_name)
tarinfo.size = len(script_bytes)
tar.addfile(tarinfo, io.BytesIO(script_bytes))
tarstream.seek(0)

# Start fresh container
container = client.containers.create("python:3.11", command="sleep infinity", detach=True)

try:
container.start()
# Put the script into the container
container.put_archive(path="/tmp", data=tarstream.read())
# Execute the script
exec_result = container.exec_run(f"python /tmp/{script_name}")

return {
"stdout": exec_result.output.decode("utf-8", errors='replace'),
"stderr": "",
"status": exec_result.exit_code,
}
finally:
# Always clean up container
container.remove(force=True)

except docker.errors.ContainerError as e:
return {"stdout": "", "stderr": str(e), "status": 1}
except Exception as e:
return {"stdout": "", "stderr": f"Execution error: {str(e)}", "status": 1}


def execute_with_packages(code: str, packages: list = None) -> Dict[str, Any]:
"""
Execute Python code with pre-installed packages.
This is the key enhancement over plain fresh-container execution: it allows package installation.
"""
if packages:
# Prepend package installation to the code
install_code = "\n".join(
[
f"import subprocess; subprocess.run(['pip', 'install', '{pkg}'], check=True)"
for pkg in packages
]
)
full_code = f"{install_code}\n\n{code}"
else:
full_code = code

return execute_python_code_fresh_container(full_code)


@server.tool(
"execute_with_packages", description="Execute Python code with packages pre-installed"
)
def execute_with_packages_tool(
code: Annotated[str, Field(description="Python code to execute")],
packages: Annotated[
list[str], Field(description="List of packages to install before execution")
] = None,
) -> str:
"""
Execute Python code with specified packages installed on top of the base Python image.
This enables users to work with the full Python ecosystem.
Example: execute_with_packages("import requests; print(requests.get('https://httpbin.org/json').json())", ["requests"])
"""
if not code.strip():
return "Error: No code provided"

result = execute_with_packages(code, packages or [])

if result["status"] == 0:
output = "--- Execution Successful ---\n"
if packages:
output += f"Packages installed: {', '.join(packages)}\n\n"
if result["stdout"].strip():
output += result["stdout"]
else:
output += "(No output - use print() to see results)"
else:
output = f"--- Execution Error (status: {result['status']}) ---\n"
if result["stderr"].strip():
output += result["stderr"]
if result["stdout"].strip():
output += "\n--- Output ---\n" + result["stdout"]

return output


class MyModel(MCPModelClass):
def get_server(self) -> FastMCP:
"""Return the FastMCP server instance."""
return server
client.py
import asyncio
import os

from clarifai.urls.helper import ClarifaiUrlHelper
from fastmcp import Client
from fastmcp.client.transports import StreamableHttpTransport

PAT = os.environ['CLARIFAI_PAT']
url = ClarifaiUrlHelper().mcp_api_url()

transport = StreamableHttpTransport(url=url, headers={"Authorization": "Bearer " + PAT})


async def main():
async with Client(transport) as client:
# List available tools
print("=== Available Tools ===")
tools = await client.list_tools()
for tool in tools:
print(f"- {tool.name}: {tool.description}")
print()

# Test 1: execute_with_packages
print("=== Test 1: execute_with_packages ===")
code_with_pkg = 'import requests; print(f"Requests version: {requests.__version__}"); print("Package imported successfully!")'
result = await client.call_tool(
"execute_with_packages", {"code": code_with_pkg, "packages": ["requests"]}
)
print(result.content[0].text)


if __name__ == "__main__":
asyncio.run(main())
requirements.txt
# Clarifai SDK - required
clarifai
fastmcp
pydantic
docker
config.yaml
build_info:
python_version: '3.11'
inference_compute_info:
cpu_limit: 1
cpu_memory: 500Mi
num_accelerators: 0
model:
app_id: code-execution-app
id: code-execution-model
model_type_id: text-to-text
user_id: your_user_id
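
The same call_tool pattern from client.py works for any package available on PyPI, and each request still runs in a fresh container. The snippet below is an illustrative extra test (the numpy code and package choice are arbitrary); it rebuilds the same transport as client.py and calls the execute_with_packages tool.

import asyncio
import os

from clarifai.urls.helper import ClarifaiUrlHelper
from fastmcp import Client
from fastmcp.client.transports import StreamableHttpTransport

PAT = os.environ['CLARIFAI_PAT']
transport = StreamableHttpTransport(
    url=ClarifaiUrlHelper().mcp_api_url(),
    headers={"Authorization": "Bearer " + PAT},
)


async def run_numpy_example():
    async with Client(transport) as client:
        result = await client.call_tool(
            "execute_with_packages",
            {
                "code": "import numpy as np; print(np.arange(5).mean())",
                "packages": ["numpy"],
            },
        )
        print(result.content[0].text)


asyncio.run(run_numpy_example())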

Code Execution (Without Docker)

1/model.py
import subprocess
import tempfile
import os
from typing import Annotated, Any, Dict

from clarifai.runners.models.mcp_class import MCPModelClass
from fastmcp import FastMCP
from pydantic import Field

server = FastMCP(
"python-execution-server",
instructions="Execute Python code using local Python environment",
stateless_http=True,
)


def execute_python_code_local(code: str) -> Dict[str, Any]:
"""
Execute Python code using local Python environment.
Each execution gets a clean temporary file.
"""
try:
# Create a temporary file for the code
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as temp_file:
temp_file.write(code)
temp_path = temp_file.name

try:
# Execute the script using subprocess
result = subprocess.run(
["python", temp_path],
capture_output=True,
text=True,
check=False
)

return {
"stdout": result.stdout,
"stderr": result.stderr,
"status": result.returncode,
}
finally:
# Clean up temp file
try:
os.unlink(temp_path)
except Exception:
pass

except Exception as e:
return {"stdout": "", "stderr": f"Execution error: {str(e)}", "status": 1}


def execute_with_packages(code: str, packages: list = None) -> Dict[str, Any]:
"""
Execute Python code with pre-installed packages.
This is the key enhancement over plain local execution: it allows package installation.
"""
if packages:
# Prepend package installation to the code
install_code = "\n".join(
[
f"import subprocess; subprocess.run(['pip', 'install', '{pkg}'], check=True)"
for pkg in packages
]
)
full_code = f"{install_code}\n\n{code}"
else:
full_code = code

return execute_python_code_local(full_code)


@server.tool(
"execute_with_packages", description="Execute Python code with packages pre-installed"
)
def execute_with_packages_tool(
code: Annotated[str, Field(description="Python code to execute")],
packages: Annotated[
list[str], Field(description="List of packages to install before execution")
] = None,
) -> str:
"""
Execute Python code with the specified packages installed into the local Python environment before the code runs.
This enables users to work with the full Python ecosystem.
Example: execute_with_packages("import requests; print(requests.get('https://httpbin.org/json').json())", ["requests"])
"""
if not code.strip():
return "Error: No code provided"

result = execute_with_packages(code, packages or [])

if result["status"] == 0:
output = "--- Execution Successful ---\n"
if packages:
output += f"Packages installed: {', '.join(packages)}\n\n"
if result["stdout"].strip():
output += result["stdout"]
else:
output += "(No output - use print() to see results)"
else:
output = f"--- Execution Error (status: {result['status']}) ---\n"
if result["stderr"].strip():
output += result["stderr"]
if result["stdout"].strip():
output += "\n--- Output ---\n" + result["stdout"]

return output


class MyModel(MCPModelClass):
def get_server(self) -> FastMCP:
"""Return the FastMCP server instance."""
return server
client.py
import asyncio
import os

from clarifai.urls.helper import ClarifaiUrlHelper
from fastmcp import Client
from fastmcp.client.transports import StreamableHttpTransport

PAT = os.environ['CLARIFAI_PAT']
url = ClarifaiUrlHelper().mcp_api_url()

transport = StreamableHttpTransport(url=url, headers={"Authorization": "Bearer " + PAT})


async def main():
async with Client(transport) as client:
# List available tools
print("=== Available Tools ===")
tools = await client.list_tools()
for tool in tools:
print(f"- {tool.name}: {tool.description}")
print()

# Test 1: execute_with_packages
print("=== Test 1: execute_with_packages ===")
code_with_pkg = 'import requests; print(f"Requests version: {requests.__version__}"); print("Package imported successfully!")'
result = await client.call_tool(
"execute_with_packages", {"code": code_with_pkg, "packages": ["requests"]}
)
print(result.content[0].text)


if __name__ == "__main__":
asyncio.run(main())
requirements.txt
# Clarifai SDK - required
clarifai==11.7.5
fastmcp
pydantic
config.yaml
build_info:
python_version: '3.11'
inference_compute_info:
cpu_limit: 1
cpu_memory: 500Mi
num_accelerators: 0
model:
app_id: code-execution-app-no-docker
id: code-execution-model-no-docker
model_type_id: text-to-text
user_id: your_user_id

Google Drive

1/model.py
import os
from typing import Annotated

from clarifai.utils.logging import logger
from fastmcp import FastMCP # use fastmcp v2, not the built-in mcp package
from pydantic import Field

server = FastMCP(
"google-drive-mcp-server",
instructions="Google Drive operations for file storage, sharing, and collaboration",
stateless_http=True,
)


def get_drive_service():
"""Create and return a Google Drive service object."""
try:
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build

# Define the scope
SCOPES = ['https://www.googleapis.com/auth/drive']

creds = None
# Check for existing token
if os.path.exists('token.json'):
creds = Credentials.from_authorized_user_file('token.json', SCOPES)

# If there are no valid credentials, use mock data
if not creds or not creds.valid:
logger.warning("Google Drive credentials not available, using mock data")
return None

service = build('drive', 'v3', credentials=creds)
return service
except ImportError:
logger.error("Google API client libraries not available")
return None
except Exception as e:
logger.error(f"Failed to create Drive service: {str(e)}")
return None


def handle_drive_operation(operation_func):
"""Decorator to handle Drive operations with proper error handling."""

def wrapper(*args, **kwargs):
try:
service = get_drive_service()
if not service:
# Return mock data if service not available
return operation_func(None, *args, **kwargs)

return operation_func(service, *args, **kwargs)
except Exception as e:
return f"Google Drive operation failed: {str(e)}"

return wrapper


@server.tool("drive_list_files", description="List files in Google Drive")
def drive_list_files(
folder_id: Annotated[str, Field(description="Folder ID to list files from (optional)")] = "",
file_type: Annotated[
str,
Field(
description="Filter by file type (document, spreadsheet, presentation, folder, etc.)"
),
] = "",
max_results: Annotated[
int, Field(description="Maximum number of files to return", ge=1, le=100)
] = 20,
order_by: Annotated[
str, Field(description="Order by: name, modifiedTime, createdTime")
] = "modifiedTime desc",
) -> str:
"""List files in Google Drive."""

@handle_drive_operation
def _list_files(service, folder_id, file_type, max_results, order_by):
if not service:
# Mock response
mock_files = [
{
"id": "1abc123",
"name": "Project Document.docx",
"mimeType": "application/vnd.google-apps.document",
"modifiedTime": "2024-01-15T10:30:00Z",
"size": "2048576",
"owners": [{"displayName": "John Doe"}],
},
{
"id": "2def456",
"name": "Budget Spreadsheet.xlsx",
"mimeType": "application/vnd.google-apps.spreadsheet",
"modifiedTime": "2024-01-14T14:22:00Z",
"size": "1048576",
"owners": [{"displayName": "Jane Smith"}],
},
{
"id": "3ghi789",
"name": "Presentation.pptx",
"mimeType": "application/vnd.google-apps.presentation",
"modifiedTime": "2024-01-13T09:15:00Z",
"size": "5242880",
"owners": [{"displayName": "Bob Johnson"}],
},
]

output = f"Google Drive Files (showing {len(mock_files)} files):\n\n"
for file in mock_files:
file_size_mb = int(file.get('size', 0)) / (1024 * 1024)
output += f"• {file['name']}\n"
output += f" ID: {file['id']}\n"
output += f" Type: {file['mimeType'].split('.')[-1] if '.' in file['mimeType'] else 'Google App'}\n"
output += f" Size: {file_size_mb:.2f} MB\n"
output += f" Modified: {file['modifiedTime']}\n"
output += f" Owner: {file['owners'][0]['displayName']}\n\n"

return output

# Real implementation would use service.files().list()
query_parts = []

if folder_id:
query_parts.append(f"'{folder_id}' in parents")

if file_type:
mime_type_map = {
"document": "application/vnd.google-apps.document",
"spreadsheet": "application/vnd.google-apps.spreadsheet",
"presentation": "application/vnd.google-apps.presentation",
"folder": "application/vnd.google-apps.folder",
"pdf": "application/pdf",
}
if file_type in mime_type_map:
query_parts.append(f"mimeType='{mime_type_map[file_type]}'")

query = " and ".join(query_parts) if query_parts else None

results = (
service.files()
.list(
q=query,
pageSize=max_results,
orderBy=order_by,
fields="files(id,name,mimeType,size,modifiedTime,owners)",
)
.execute()
)

files = results.get('files', [])

if not files:
return "No files found in Google Drive"

output = f"Google Drive Files ({len(files)} files):\n\n"
for file in files:
file_size = int(file.get('size', 0))
file_size_mb = file_size / (1024 * 1024) if file_size > 0 else 0

output += f"• {file['name']}\n"
output += f" ID: {file['id']}\n"
output += f" Type: {file['mimeType'].split('.')[-1] if '.' in file['mimeType'] else 'Google App'}\n"
output += f" Size: {file_size_mb:.2f} MB\n"
output += f" Modified: {file['modifiedTime']}\n"
if file.get('owners'):
output += f" Owner: {file['owners'][0]['displayName']}\n"
output += "\n"

return output

return _list_files(folder_id, file_type, max_results, order_by)


@server.tool("drive_upload_file", description="Upload a file to Google Drive")
def drive_upload_file(
file_path: Annotated[str, Field(description="Local file path to upload")],
file_name: Annotated[str, Field(description="Name for the file in Drive (optional)")] = "",
folder_id: Annotated[str, Field(description="Folder ID to upload to (optional)")] = "",
description: Annotated[str, Field(description="File description")] = "",
) -> str:
"""Upload a file to Google Drive."""

@handle_drive_operation
def _upload_file(service, file_path, file_name, folder_id, description):
if not os.path.exists(file_path):
return f"Error: File '{file_path}' does not exist"

if not service:
# Mock response
file_name = file_name or os.path.basename(file_path)
file_size = os.path.getsize(file_path)
file_size_mb = file_size / (1024 * 1024)

return (
f"Mock upload successful!\n"
f"File: {file_name}\n"
f"Size: {file_size_mb:.2f} MB\n"
f"Drive ID: mock-file-id-{hash(file_path) % 10000}\n"
f"Note: This is a mock response. Install Google API libraries for actual uploads."
)

from googleapiclient.http import MediaFileUpload

file_name = file_name or os.path.basename(file_path)

file_metadata = {'name': file_name, 'description': description}

if folder_id:
file_metadata['parents'] = [folder_id]

media = MediaFileUpload(file_path, resumable=True)

file = (
service.files()
.create(body=file_metadata, media_body=media, fields='id,name,size,webViewLink')
.execute()
)

file_size = int(file.get('size', 0))
file_size_mb = file_size / (1024 * 1024) if file_size > 0 else 0

return (
f"Successfully uploaded '{file_name}' to Google Drive\n"
f"File ID: {file['id']}\n"
f"Size: {file_size_mb:.2f} MB\n"
f"View Link: {file.get('webViewLink', 'N/A')}"
)

return _upload_file(file_path, file_name, folder_id, description)


@server.tool("drive_download_file", description="Download a file from Google Drive")
def drive_download_file(
file_id: Annotated[str, Field(description="Google Drive file ID")],
local_path: Annotated[str, Field(description="Local path to save the file")] = "",
export_format: Annotated[
str, Field(description="Export format for Google Docs (pdf, docx, etc.)")
] = "",
) -> str:
"""Download a file from Google Drive."""

@handle_drive_operation
def _download_file(service, file_id, local_path, export_format):
if not service:
# Mock response
local_path = local_path or f"downloaded_file_{file_id}"
return (
f"Mock download successful!\n"
f"File ID: {file_id}\n"
f"Saved to: {local_path}\n"
f"Note: This is a mock response. Install Google API libraries for actual downloads."
)

# Get file metadata
file_metadata = service.files().get(fileId=file_id).execute()
file_name = file_metadata['name']

if not local_path:
local_path = file_name

# Check if it's a Google Workspace file that needs to be exported
google_mime_types = [
'application/vnd.google-apps.document',
'application/vnd.google-apps.spreadsheet',
'application/vnd.google-apps.presentation',
]

if file_metadata['mimeType'] in google_mime_types:
if not export_format:
export_format = 'pdf' # Default export format

export_mime_types = {
'pdf': 'application/pdf',
'docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
'xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
'pptx': 'application/vnd.openxmlformats-officedocument.presentationml.presentation',
}

request = service.files().export_media(
fileId=file_id,
mimeType=export_mime_types.get(export_format, export_mime_types['pdf']),
)
else:
request = service.files().get_media(fileId=file_id)

# Download the file
import io
from googleapiclient.http import MediaIoBaseDownload

fh = io.BytesIO()
downloader = MediaIoBaseDownload(fh, request)
done = False
while done is False:
status, done = downloader.next_chunk()

# Save to local file
with open(local_path, 'wb') as f:
f.write(fh.getvalue())

file_size = os.path.getsize(local_path)
file_size_mb = file_size / (1024 * 1024)

return (
f"Successfully downloaded '{file_name}' from Google Drive\n"
f"Saved to: {local_path}\n"
f"Size: {file_size_mb:.2f} MB"
)

return _download_file(file_id, local_path, export_format)


@server.tool("drive_share_file", description="Share a file or folder in Google Drive")
def drive_share_file(
file_id: Annotated[str, Field(description="Google Drive file or folder ID")],
email: Annotated[str, Field(description="Email address to share with (optional)")] = "",
role: Annotated[
str, Field(description="Permission role: reader, writer, commenter")
] = "reader",
anyone_can_view: Annotated[
bool, Field(description="Make file viewable by anyone with the link")
] = False,
) -> str:
"""Share a file or folder in Google Drive."""

@handle_drive_operation
def _share_file(service, file_id, email, role, anyone_can_view):
if not service:
# Mock response
share_url = f"https://drive.google.com/file/d/{file_id}/view"
return (
f"Mock sharing successful!\n"
f"File ID: {file_id}\n"
f"Share URL: {share_url}\n"
f"Permissions: {role}\n"
f"Note: This is a mock response. Install Google API libraries for actual sharing."
)

permissions_created = []

# Share with specific email if provided
if email:
permission = {'type': 'user', 'role': role, 'emailAddress': email}

result = (
service.permissions()
.create(fileId=file_id, body=permission, sendNotificationEmail=True)
.execute()
)

permissions_created.append(f"Shared with {email} as {role}")

# Make viewable by anyone with link if requested
if anyone_can_view:
permission = {'type': 'anyone', 'role': 'reader'}

service.permissions().create(fileId=file_id, body=permission).execute()

permissions_created.append("Made viewable by anyone with the link")

# Get the file's web view link
file_metadata = service.files().get(fileId=file_id, fields='name,webViewLink').execute()

result = f"Successfully shared '{file_metadata['name']}'\n"
result += f"File ID: {file_id}\n"
result += f"Share URL: {file_metadata.get('webViewLink', 'N/A')}\n"

if permissions_created:
result += "\nPermissions:\n"
for perm in permissions_created:
result += f" • {perm}\n"

return result

return _share_file(file_id, email, role, anyone_can_view)


@server.tool("drive_create_folder", description="Create a new folder in Google Drive")
def drive_create_folder(
folder_name: Annotated[str, Field(description="Name for the new folder")],
parent_folder_id: Annotated[str, Field(description="Parent folder ID (optional)")] = "",
description: Annotated[str, Field(description="Folder description")] = "",
) -> str:
"""Create a new folder in Google Drive."""

@handle_drive_operation
def _create_folder(service, folder_name, parent_folder_id, description):
if not service:
# Mock response
return (
f"Mock folder creation successful!\n"
f"Folder name: {folder_name}\n"
f"Folder ID: mock-folder-id-{hash(folder_name) % 10000}\n"
f"Note: This is a mock response. Install Google API libraries for actual folder creation."
)

folder_metadata = {
'name': folder_name,
'mimeType': 'application/vnd.google-apps.folder',
'description': description,
}

if parent_folder_id:
folder_metadata['parents'] = [parent_folder_id]

folder = (
service.files().create(body=folder_metadata, fields='id,name,webViewLink').execute()
)

return (
f"Successfully created folder '{folder_name}'\n"
f"Folder ID: {folder['id']}\n"
f"View Link: {folder.get('webViewLink', 'N/A')}"
)

return _create_folder(folder_name, parent_folder_id, description)


@server.tool("drive_delete_file", description="Delete a file or folder from Google Drive")
def drive_delete_file(
file_id: Annotated[str, Field(description="Google Drive file or folder ID to delete")],
permanent: Annotated[bool, Field(description="Permanently delete (bypass trash)")] = False,
) -> str:
"""Delete a file or folder from Google Drive."""

@handle_drive_operation
def _delete_file(service, file_id, permanent):
if not service:
# Mock response
return (
f"Mock deletion successful!\n"
f"File ID: {file_id}\n"
f"Permanent: {permanent}\n"
f"Note: This is a mock response. Install Google API libraries for actual deletion."
)

# Get file metadata before deletion
try:
file_metadata = service.files().get(fileId=file_id, fields='name').execute()
file_name = file_metadata['name']
except Exception:
return f"Error: File with ID '{file_id}' not found"

if permanent:
service.files().delete(fileId=file_id).execute()
return f"Permanently deleted '{file_name}' (ID: {file_id})"
else:
# Move to trash
service.files().update(fileId=file_id, body={'trashed': True}).execute()
return f"Moved '{file_name}' to trash (ID: {file_id})"

return _delete_file(file_id, permanent)


@server.tool("drive_search_files", description="Search for files in Google Drive")
def drive_search_files(
query: Annotated[str, Field(description="Search query")],
file_type: Annotated[str, Field(description="File type filter")] = "",
max_results: Annotated[int, Field(description="Maximum number of results", ge=1, le=100)] = 10,
) -> str:
"""Search for files in Google Drive."""

@handle_drive_operation
def _search_files(service, query, file_type, max_results):
if not service:
# Mock response
mock_results = [
{
"id": "search1",
"name": f"Document about {query}.docx",
"mimeType": "application/vnd.google-apps.document",
},
{
"id": "search2",
"name": f"{query} Spreadsheet.xlsx",
"mimeType": "application/vnd.google-apps.spreadsheet",
},
]

output = f"Search results for '{query}' (mock data):\n\n"
for i, file in enumerate(mock_results, 1):
output += f"{i}. {file['name']}\n"
output += f" ID: {file['id']}\n"
output += f" Type: {file['mimeType'].split('.')[-1]}\n\n"

return output

search_query = f"name contains '{query}'"

if file_type:
mime_type_map = {
"document": "application/vnd.google-apps.document",
"spreadsheet": "application/vnd.google-apps.spreadsheet",
"presentation": "application/vnd.google-apps.presentation",
}
if file_type in mime_type_map:
search_query += f" and mimeType='{mime_type_map[file_type]}'"

results = (
service.files()
.list(
q=search_query, pageSize=max_results, fields="files(id,name,mimeType,modifiedTime)"
)
.execute()
)

files = results.get('files', [])

if not files:
return f"No files found matching '{query}'"

output = f"Search results for '{query}' ({len(files)} files):\n\n"
for i, file in enumerate(files, 1):
output += f"{i}. {file['name']}\n"
output += f" ID: {file['id']}\n"
output += f" Type: {file['mimeType'].split('.')[-1] if '.' in file['mimeType'] else 'Google App'}\n"
output += f" Modified: {file['modifiedTime']}\n\n"

return output

return _search_files(query, file_type, max_results)


# Static resource
@server.resource("config://drive_settings")
def get_drive_settings():
return {
"api_version": "v3",
"scopes": ["https://www.googleapis.com/auth/drive"],
"supported_formats": ["pdf", "docx", "xlsx", "pptx"],
"max_upload_size": "5TB",
"permission_roles": ["reader", "writer", "commenter", "owner"],
}


# Dynamic resource template
@server.resource("drive://file/{file_id}/info")
def get_file_info(file_id: str):
return {
"file_id": file_id,
"note": "Use drive_list_files or other tools to get actual file information",
}


@server.prompt()
def drive_workflow_prompt(workflow_type: str) -> str:
"""Generate prompts for Google Drive workflows."""
prompts = {
"collaboration": "For collaboration workflow:\n1. Use drive_upload_file to share documents\n2. Use drive_share_file to set permissions\n3. Use drive_create_folder for organization\n4. Grant appropriate roles (reader, writer, commenter)",
"backup": "For backup workflow:\n1. Use drive_create_folder for organized storage\n2. Use drive_upload_file to backup important files\n3. Consider using drive_share_file for team access\n4. Regular sync using automated tools",
"document_management": "For document management:\n1. Create folder structure with drive_create_folder\n2. Upload files with drive_upload_file\n3. Use drive_search_files to find documents\n4. Share selectively with drive_share_file",
}

return prompts.get(workflow_type, f"Google Drive workflow guidance for: {workflow_type}")


from clarifai.runners.models.mcp_class import MCPModelClass


class MyModelClass(MCPModelClass):
def get_server(self) -> FastMCP:
return server
client.py
import asyncio
import os
import tempfile

from clarifai.urls.helper import ClarifaiUrlHelper
from fastmcp import Client
from fastmcp.client.transports import StreamableHttpTransport

PAT = os.environ['CLARIFAI_PAT']
url = ClarifaiUrlHelper().mcp_api_url() # get url from the current clarifai config

transport = StreamableHttpTransport(url=url, headers={"Authorization": "Bearer " + PAT})


async def main():
print("=== Google Drive MCP Server Examples ===\n")

async with Client(transport) as client:
# List available tools first
print("Available tools:")
tools = await client.list_tools()
for tool in tools:
print(f"- {tool.name}: {tool.description}")
print("\n" + "=" * 50 + "\n")

# Example 1: List files in Google Drive
print("1. Listing files in Google Drive:")
try:
result = await client.call_tool(
"drive_list_files", {"max_results": 5, "order_by": "modifiedTime desc"}
)
print(result[0].text)
except Exception as e:
print(f"Error: {e}")
print("\n" + "=" * 50 + "\n")

# Example 2: Search for files
print("2. Searching for files:")
try:
result = await client.call_tool(
"drive_search_files", {"query": "project", "max_results": 3}
)
print(result[0].text)
except Exception as e:
print(f"Error: {e}")
print("\n" + "=" * 50 + "\n")

# Example 3: Create a folder
print("3. Creating a folder:")
try:
result = await client.call_tool(
"drive_create_folder",
{"folder_name": "MCP Demo Folder", "description": "Folder created by MCP example"},
)
print(result[0].text)
except Exception as e:
print(f"Error: {e}")
print("\n" + "=" * 50 + "\n")

# Example 4: Upload a file (creates a temporary file for demo)
print("4. Uploading a file:")

# Create a temporary file for demonstration
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as f:
f.write("This is a test file for Google Drive upload demonstration.")
temp_file_path = f.name

try:
result = await client.call_tool(
"drive_upload_file",
{
"file_path": temp_file_path,
"file_name": "MCP Demo File.txt",
"description": "File uploaded by MCP example",
},
)
print(result[0].text)
except Exception as e:
print(f"Error: {e}")
finally:
# Clean up temporary file
os.unlink(temp_file_path)
print("\n" + "=" * 50 + "\n")

# Example 5: Share a file
print("5. Sharing a file:")
try:
result = await client.call_tool(
"drive_share_file", {"file_id": "mock-file-id-123", "anyone_can_view": True}
)
print(result[0].text)
except Exception as e:
print(f"Error: {e}")
print("\n" + "=" * 50 + "\n")

# Example 6: Download a file
print("6. Downloading a file:")
try:
result = await client.call_tool(
"drive_download_file",
{
"file_id": "mock-file-id-123",
"local_path": "downloaded_document.pdf",
"export_format": "pdf",
},
)
print(result[0].text)
except Exception as e:
print(f"Error: {e}")
print("\n" + "=" * 50 + "\n")

print("\n" + "=" * 50)
print("Note: Google Drive authentication required for actual operations:")
print("- Set up OAuth 2.0 credentials in Google Cloud Console")
print("- Download credentials.json file")
print("- Run authentication flow to generate token.json")
print("- This example uses mock data for demonstration")
print("=" * 50)


if __name__ == "__main__":
asyncio.run(main())
requirements.txt
clarifai
anyio==4.9.0
mcp==1.9.0
fastmcp==2.3.4
google-api-python-client==2.111.0
google-auth-httplib2==0.2.0
google-auth-oauthlib==1.1.0
config.yaml
build_info:
python_version: '3.11'
inference_compute_info:
cpu_limit: 1000m
cpu_memory: 1Gi
num_accelerators: 0
model:
app_id: mcp-examples-app
id: google-drive-mcp-server
model_type_id: text-to-text
user_id: mcp-examples-user

Math

1/model.py
from fastmcp import FastMCP
from pydantic import Field
from typing import Annotated

from clarifai.runners.models.mcp_class import MCPModelClass

server = FastMCP("my-mcp-server", instructions="", stateless_http=True)


@server.tool("addition_tool", description="Add two numbers")
def addition_tool(
a: Annotated[float, Field(description="First number")],
b: Annotated[float, Field(description="Second number")],
) -> float:
"""Add two numbers"""
return a + b


@server.tool("subtraction_tool", description="Subtract two numbers")
def subtraction_tool(
a: Annotated[float, Field(description="First number")],
b: Annotated[float, Field(description="Second number")],
) -> float:
"""Subtract two numbers"""
return a - b


@server.tool("multiplication_tool", description="Multiply two numbers")
def multiplication_tool(
a: Annotated[float, Field(description="First number")],
b: Annotated[float, Field(description="Second number")],
) -> float:
"""Multiply two numbers"""
return a * b


@server.tool("division_tool", description="Divide two numbers")
def division_tool(
a: Annotated[float, Field(description="First number")],
b: Annotated[float, Field(description="Second number")],
) -> float:
"""Divide two numbers"""
return a / b


class MyModel(MCPModelClass):
def get_server(self) -> FastMCP:
"""Return the FastMCP server instance."""
return server
client.py
import asyncio
import os

from clarifai.urls.helper import ClarifaiUrlHelper
from fastmcp import Client
from fastmcp.client.transports import StreamableHttpTransport

PAT = os.environ['CLARIFAI_PAT']
url = ClarifaiUrlHelper().mcp_api_url() # get url from the current clarifai config

transport = StreamableHttpTransport(url=url, headers={"Authorization": "Bearer " + PAT})


async def main():
async with Client(transport) as client:
tools = await client.list_tools()
for tool in tools:
print(f"- {tool.name}: {tool.description}")

result = await client.call_tool("addition_tool", {"a": 10.1, "b": 5.2})
print(f"10.1 + 5.2 = {result.content[0].text}")

result = await client.call_tool("subtraction_tool", {"a": 10.1, "b": 3.2})
print(f"10.1 - 3.2 = {result.content[0].text}")

result = await client.call_tool("multiplication_tool", {"a": 4.1, "b": 7.2})
print(f"4.1 * 7.2 = {result.content[0].text}")

result = await client.call_tool("division_tool", {"a": 20.1, "b": 4.2})
print(f"20.1 / 4.2 = {result.content[0].text}")


if __name__ == "__main__":
asyncio.run(main())
requirements.txt
# Clarifai SDK - required
clarifai
fastmcp
pydantic
config.yaml
build_info:
python_version: '3.11'
inference_compute_info:
cpu_limit: 1
cpu_memory: 500Mi
num_accelerators: 0
model:
app_id: mcp-math-app
id: mcp-math-model
model_type_id: text-to-text
user_id: mcp-math-user
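
Before uploading, you can smoke-test the math tools locally without any deployment. The snippet below is a minimal sketch under a few assumptions: recent fastmcp releases let Client wrap a FastMCP instance directly (in-memory transport), and the script is run from inside the 1/ directory so the server object defined in model.py can be imported.

import asyncio

from fastmcp import Client

from model import server  # the FastMCP instance defined in 1/model.py


async def main():
    # Client() accepts a FastMCP instance and talks to it in memory,
    # so no upload or network transport is needed for a quick check.
    async with Client(server) as client:
        result = await client.call_tool("addition_tool", {"a": 2.0, "b": 3.0})
        print(f"2.0 + 3.0 = {result.content[0].text}")


if __name__ == "__main__":
    asyncio.run(main())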

Postgres

1/model.py
import os
from typing import Annotated, Any

from clarifai.utils.logging import logger
from fastmcp import FastMCP # use fastmcp v2 not the built in mcp
from pydantic import Field

server = FastMCP(
"postgres-mcp-server",
instructions="PostgreSQL database operations and management",
stateless_http=True,
)


def get_postgres_connection(host: str, port: int, user: str, password: str, database: str):
"""Create a PostgreSQL connection. Returns connection object or None if failed."""
try:
import psycopg2

connection = psycopg2.connect(
host=host, port=port, user=user, password=password, database=database
)
connection.autocommit = True
return connection
except Exception as e:
logger.error(f"PostgreSQL connection failed: {str(e)}")
return None


def execute_postgres_query(connection, query: str, params: tuple = None) -> tuple[bool, Any]:
"""Execute a PostgreSQL query and return results."""
try:
from psycopg2.extras import RealDictCursor

cursor = connection.cursor(cursor_factory=RealDictCursor)
cursor.execute(query, params or ())

# Handle different query types
if query.strip().upper().startswith(('SELECT', 'WITH', 'SHOW', 'EXPLAIN')):
results = cursor.fetchall()
# Convert to list of dicts for JSON serialization
results = [dict(row) for row in results]
cursor.close()
return True, results
else:
# For INSERT, UPDATE, DELETE, etc.
affected_rows = cursor.rowcount
cursor.close()
return True, {"affected_rows": affected_rows, "message": "Query executed successfully"}

except Exception as e:
return False, {"error": str(e)}


@server.tool("postgres_connect", description="Test PostgreSQL database connection")
def postgres_connect(
host: Annotated[str, Field(description="PostgreSQL host")] = "localhost",
port: Annotated[int, Field(description="PostgreSQL port", ge=1, le=65535)] = 5432,
user: Annotated[str, Field(description="PostgreSQL username")] = "postgres",
password: Annotated[str, Field(description="PostgreSQL password")] = "",
database: Annotated[str, Field(description="Database name")] = "postgres",
) -> str:
"""Test connection to PostgreSQL database."""
connection = get_postgres_connection(host, port, user, password, database)

if connection:
try:
cursor = connection.cursor()
cursor.execute("SELECT version();")
version = cursor.fetchone()[0]
cursor.close()
connection.close()
return f"Successfully connected to PostgreSQL database '{database}' on {host}:{port}\nPostgreSQL Version: {version}"
except Exception as e:
connection.close()
return f"Connected but failed to get version: {str(e)}"
else:
return f"Failed to connect to PostgreSQL database '{database}' on {host}:{port}"


@server.tool("postgres_execute_query", description="Execute a PostgreSQL query")
def postgres_execute_query(
query: Annotated[str, Field(description="SQL query to execute")],
host: Annotated[str, Field(description="PostgreSQL host")] = "localhost",
port: Annotated[int, Field(description="PostgreSQL port", ge=1, le=65535)] = 5432,
user: Annotated[str, Field(description="PostgreSQL username")] = "postgres",
password: Annotated[str, Field(description="PostgreSQL password")] = "",
database: Annotated[str, Field(description="Database name")] = "postgres",
limit: Annotated[
int, Field(description="Limit results for SELECT queries", ge=1, le=1000)
] = 100,
) -> str:
"""Execute a SQL query on PostgreSQL database."""
connection = get_postgres_connection(host, port, user, password, database)

if not connection:
return f"Failed to connect to PostgreSQL database '{database}' on {host}:{port}"

try:
# Add LIMIT to SELECT queries if not already present
if query.strip().upper().startswith('SELECT') and 'LIMIT' not in query.upper():
query = f"{query.strip().rstrip(';')} LIMIT {limit};"

success, result = execute_postgres_query(connection, query)
connection.close()

if not success:
return f"Query failed: {result.get('error', 'Unknown error')}"

if isinstance(result, list):
# Format SELECT results
if not result:
return "Query executed successfully. No rows returned."

# Create a formatted table
output = f"Query results ({len(result)} rows):\n\n"

if result:
# Get column names
columns = list(result[0].keys())

# Calculate column widths
col_widths = {}
for col in columns:
col_widths[col] = max(
len(col), max(len(str(row.get(col, ''))) for row in result)
)

# Create header
header = " | ".join(col.ljust(col_widths[col]) for col in columns)
separator = " | ".join("-" * col_widths[col] for col in columns)

output += header + "\n"
output += separator + "\n"

# Add data rows
for row in result:
row_str = " | ".join(
str(row.get(col, '')).ljust(col_widths[col]) for col in columns
)
output += row_str + "\n"

return output
else:
# Non-SELECT query result
return f"Query executed successfully. {result.get('message', '')} Affected rows: {result.get('affected_rows', 0)}"

except Exception as e:
if connection:
connection.close()
return f"Error executing query: {str(e)}"


@server.tool("postgres_list_tables", description="List all tables in the database")
def postgres_list_tables(
host: Annotated[str, Field(description="PostgreSQL host")] = "localhost",
port: Annotated[int, Field(description="PostgreSQL port", ge=1, le=65535)] = 5432,
user: Annotated[str, Field(description="PostgreSQL username")] = "postgres",
password: Annotated[str, Field(description="PostgreSQL password")] = "",
database: Annotated[str, Field(description="Database name")] = "postgres",
schema: Annotated[str, Field(description="Schema name")] = "public",
) -> str:
"""List all tables in the PostgreSQL database."""
query = f"""
SELECT table_name, table_type
FROM information_schema.tables
WHERE table_schema = '{schema}'
ORDER BY table_name;
"""
return postgres_execute_query(query, host, port, user, password, database)


@server.tool("postgres_describe_table", description="Describe the structure of a table")
def postgres_describe_table(
table_name: Annotated[str, Field(description="Table name to describe")],
host: Annotated[str, Field(description="PostgreSQL host")] = "localhost",
port: Annotated[int, Field(description="PostgreSQL port", ge=1, le=65535)] = 5432,
user: Annotated[str, Field(description="PostgreSQL username")] = "postgres",
password: Annotated[str, Field(description="PostgreSQL password")] = "",
database: Annotated[str, Field(description="Database name")] = "postgres",
schema: Annotated[str, Field(description="Schema name")] = "public",
) -> str:
"""Describe the structure of a PostgreSQL table."""
query = f"""
SELECT
column_name,
data_type,
character_maximum_length,
is_nullable,
column_default
FROM information_schema.columns
WHERE table_schema = '{schema}' AND table_name = '{table_name}'
ORDER BY ordinal_position;
"""
return postgres_execute_query(query, host, port, user, password, database)


@server.tool("postgres_list_databases", description="List all databases")
def postgres_list_databases(
host: Annotated[str, Field(description="PostgreSQL host")] = "localhost",
port: Annotated[int, Field(description="PostgreSQL port", ge=1, le=65535)] = 5432,
user: Annotated[str, Field(description="PostgreSQL username")] = "postgres",
password: Annotated[str, Field(description="PostgreSQL password")] = "",
) -> str:
"""List all databases on the PostgreSQL server."""
# Use postgres database as default for this query
query = "SELECT datname as database_name FROM pg_database WHERE datistemplate = false ORDER BY datname;"
return postgres_execute_query(query, host, port, user, password, "postgres")


@server.tool("postgres_table_stats", description="Get statistics for a table")
def postgres_table_stats(
table_name: Annotated[str, Field(description="Table name")],
host: Annotated[str, Field(description="PostgreSQL host")] = "localhost",
port: Annotated[int, Field(description="PostgreSQL port", ge=1, le=65535)] = 5432,
user: Annotated[str, Field(description="PostgreSQL username")] = "postgres",
password: Annotated[str, Field(description="PostgreSQL password")] = "",
database: Annotated[str, Field(description="Database name")] = "postgres",
schema: Annotated[str, Field(description="Schema name")] = "public",
) -> str:
"""Get statistics for a PostgreSQL table."""
query = f"""
SELECT
schemaname,
tablename,
attname as column_name,
n_distinct,
correlation
FROM pg_stats
WHERE schemaname = '{schema}' AND tablename = '{table_name}'
ORDER BY attname;
"""
return postgres_execute_query(query, host, port, user, password, database)


@server.tool("postgres_table_size", description="Get size information for tables")
def postgres_table_size(
host: Annotated[str, Field(description="PostgreSQL host")] = "localhost",
port: Annotated[int, Field(description="PostgreSQL port", ge=1, le=65535)] = 5432,
user: Annotated[str, Field(description="PostgreSQL username")] = "postgres",
password: Annotated[str, Field(description="PostgreSQL password")] = "",
database: Annotated[str, Field(description="Database name")] = "postgres",
schema: Annotated[str, Field(description="Schema name")] = "public",
) -> str:
"""Get size information for all tables in the schema."""
query = f"""
SELECT
tablename AS table_name,
pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) AS size,
pg_size_pretty(pg_relation_size(schemaname||'.'||tablename)) AS table_size,
pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename) - pg_relation_size(schemaname||'.'||tablename)) AS index_size
FROM pg_tables
WHERE schemaname = '{schema}'
ORDER BY pg_total_relation_size(schemaname||'.'||tablename) DESC;
"""
return postgres_execute_query(query, host, port, user, password, database)


@server.tool("postgres_active_connections", description="Show active database connections")
def postgres_active_connections(
host: Annotated[str, Field(description="PostgreSQL host")] = "localhost",
port: Annotated[int, Field(description="PostgreSQL port", ge=1, le=65535)] = 5432,
user: Annotated[str, Field(description="PostgreSQL username")] = "postgres",
password: Annotated[str, Field(description="PostgreSQL password")] = "",
database: Annotated[str, Field(description="Database name")] = "postgres",
) -> str:
"""Show active connections to the PostgreSQL database."""
query = """
SELECT
pid,
usename,
datname,
client_addr,
application_name,
state,
query_start,
state_change
FROM pg_stat_activity
WHERE state = 'active'
ORDER BY query_start DESC;
"""
return postgres_execute_query(query, host, port, user, password, database)


@server.tool("postgres_create_backup", description="Create a backup using pg_dump")
def postgres_create_backup(
backup_type: Annotated[str, Field(description="Backup type: 'database' or 'table'")],
target_name: Annotated[str, Field(description="Database name or table name")],
host: Annotated[str, Field(description="PostgreSQL host")] = "localhost",
port: Annotated[int, Field(description="PostgreSQL port", ge=1, le=65535)] = 5432,
user: Annotated[str, Field(description="PostgreSQL username")] = "postgres",
password: Annotated[str, Field(description="PostgreSQL password")] = "",
database: Annotated[str, Field(description="Database name (for table backup)")] = "postgres",
) -> str:
"""Create a backup using pg_dump."""
import subprocess

try:
env = os.environ.copy()
env['PGPASSWORD'] = password

if backup_type == "database":
cmd = [
"pg_dump",
f"--host={host}",
f"--port={port}",
f"--username={user}",
"--format=custom",
"--no-password",
target_name,
]
backup_file = f"{target_name}_backup.dump"
elif backup_type == "table":
cmd = [
"pg_dump",
f"--host={host}",
f"--port={port}",
f"--username={user}",
"--format=custom",
"--no-password",
f"--table={target_name}",
database,
]
backup_file = f"{database}_{target_name}_backup.dump"
else:
return "Error: backup_type must be 'database' or 'table'"

# Execute pg_dump
with open(backup_file, 'wb') as f:
result = subprocess.run(
cmd, stdout=f, stderr=subprocess.PIPE, env=env, timeout=300, check=False
)

if result.returncode == 0:
return f"Backup created successfully: {backup_file}"
else:
return f"Backup failed: {result.stderr.decode()}"

except Exception as e:
return f"Backup failed: {str(e)}"


@server.tool("postgres_analyze_table", description="Analyze a table to update statistics")
def postgres_analyze_table(
table_name: Annotated[str, Field(description="Table name to analyze")],
host: Annotated[str, Field(description="PostgreSQL host")] = "localhost",
port: Annotated[int, Field(description="PostgreSQL port", ge=1, le=65535)] = 5432,
user: Annotated[str, Field(description="PostgreSQL username")] = "postgres",
password: Annotated[str, Field(description="PostgreSQL password")] = "",
database: Annotated[str, Field(description="Database name")] = "postgres",
) -> str:
"""Analyze a PostgreSQL table to update statistics."""
return postgres_execute_query(f"ANALYZE {table_name};", host, port, user, password, database)


# Static resource
@server.resource("config://postgres_settings")
def get_postgres_settings():
return {
"default_port": 5432,
"default_host": "localhost",
"default_schema": "public",
"max_query_limit": 1000,
"supported_operations": [
"SELECT",
"INSERT",
"UPDATE",
"DELETE",
"CREATE",
"DROP",
"ALTER",
],
"backup_formats": ["custom", "plain", "tar"],
}


# Dynamic resource template
@server.resource("database://{database_name}/schema_info")
def get_database_schema_info(database_name: str):
return {
"database": database_name,
"note": "Use postgres_list_tables and postgres_describe_table tools to get actual schema information",
}


@server.prompt()
def postgres_query_prompt(query_type: str) -> str:
"""Generate prompts for PostgreSQL query construction."""
prompts = {
"select": "To write a SELECT query:\nSELECT column1, column2 FROM schema.table_name WHERE condition ORDER BY column LIMIT n;",
"insert": "To write an INSERT query:\nINSERT INTO schema.table_name (column1, column2) VALUES (value1, value2);",
"update": "To write an UPDATE query:\nUPDATE schema.table_name SET column1 = value1 WHERE condition;",
"delete": "To write a DELETE query:\nDELETE FROM schema.table_name WHERE condition;",
"create": "To create a table:\nCREATE TABLE schema.table_name (column1 datatype constraints, column2 datatype constraints);",
"index": "To create an index:\nCREATE INDEX index_name ON schema.table_name (column1, column2);",
"jsonb": "For JSONB operations:\nSELECT data->>'key' FROM table WHERE data @> '{\"key\": \"value\"}';",
}

return prompts.get(query_type, f"PostgreSQL query guidance for: {query_type}")


from clarifai.runners.models.mcp_class import MCPModelClass


class MyModelClass(MCPModelClass):
def get_server(self) -> FastMCP:
return server
client.py
import asyncio
import os

from clarifai.urls.helper import ClarifaiUrlHelper
from fastmcp import Client
from fastmcp.client.transports import StreamableHttpTransport

PAT = os.environ['CLARIFAI_PAT']
url = ClarifaiUrlHelper().mcp_api_url() # get url from the current clarifai config

transport = StreamableHttpTransport(url=url, headers={"Authorization": "Bearer " + PAT})


async def main():
print("=== PostgreSQL MCP Server Examples ===\n")

# Note: These examples assume you have PostgreSQL credentials
# Set environment variables: POSTGRES_USER, POSTGRES_PASSWORD, POSTGRES_HOST, POSTGRES_DATABASE

postgres_config = {
"host": os.environ.get("POSTGRES_HOST", "localhost"),
"port": os.environ.get("POSTGRES_PORT", "5432"),
"user": os.environ.get("POSTGRES_USER", "postgres"),
"password": os.environ.get("POSTGRES_PASSWORD", ""),
"database": os.environ.get("POSTGRES_DATABASE", "postgres"),
}

async with Client(transport) as client:
# List available tools first
print("Available tools:")
tools = await client.list_tools()
for tool in tools:
print(f"- {tool.name}: {tool.description}")
print("\n" + "=" * 50 + "\n")

# Example 1: Test PostgreSQL connection
print("1. Testing PostgreSQL connection:")
try:
result = await client.call_tool("postgres_connect", postgres_config)
print(result[0].text)
except Exception as e:
print(f"Error: {e}")
print("\n" + "=" * 50 + "\n")

# Example 2: List databases
print("2. Listing databases:")
try:
result = await client.call_tool(
"postgres_list_databases",
{
"host": postgres_config["host"],
"port": postgres_config["port"],
"user": postgres_config["user"],
"password": postgres_config["password"],
},
)
print(result[0].text)
except Exception as e:
print(f"Error: {e}")
print("\n" + "=" * 50 + "\n")

# Example 3: List tables in database
print("3. Listing tables:")
try:
result = await client.call_tool("postgres_list_tables", postgres_config)
print(result[0].text)
except Exception as e:
print(f"Error: {e}")
print("\n" + "=" * 50 + "\n")

# Example 4: Execute a sample query
print("4. Executing a sample query (SELECT version):")
try:
result = await client.call_tool(
"postgres_execute_query",
{
**postgres_config,
"query": "SELECT version() as postgres_version, current_timestamp as current_time;",
},
)
print(result[0].text)
except Exception as e:
print(f"Error: {e}")
print("\n" + "=" * 50 + "\n")

# Example 5: Get table sizes
print("5. Getting table sizes:")
try:
result = await client.call_tool("postgres_table_size", postgres_config)
print(result[0].text)
except Exception as e:
print(f"Error: {e}")
print("\n" + "=" * 50 + "\n")

print("\n" + "=" * 50)
print("Note: Set these environment variables for actual PostgreSQL connections:")
print("- POSTGRES_HOST (default: localhost)")
print("- POSTGRES_USER (default: postgres)")
print("- POSTGRES_PASSWORD")
print("- POSTGRES_DATABASE (default: postgres)")
print("=" * 50)


if __name__ == "__main__":
asyncio.run(main())
requirements.txt
clarifai
anyio==4.9.0
mcp==1.9.0
fastmcp==2.3.4
psycopg2-binary==2.9.9
config.yaml
build_info:
python_version: '3.11'
inference_compute_info:
cpu_limit: 1000m
cpu_memory: 1Gi
num_accelerators: 0
model:
app_id: mcp-examples-app
id: postgres-mcp-server
model_type_id: text-to-text
user_id: mcp-examples-user

Web Browser Search

1/model.py
from __future__ import annotations
import os, re
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Union
from urllib.parse import urlparse, parse_qs, unquote

import httpx # type: ignore
from bs4 import BeautifulSoup # type: ignore
from mcp.server.fastmcp import FastMCP, Context # type: ignore

USER_AGENT = (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0 Safari/537.36"
)

try:
# Optional SDK for DuckDuckGo search (pip install ddgs)
from ddgs import DDGS # type: ignore
except ImportError:
DDGS = None

# ---------------- Data Models ----------------
@dataclass
class SearchResult:
id: int
title: str
url: str
snippet: str

@dataclass
class PageCache:
url: str
text_lines: List[str]

@dataclass
class SessionState:
last_query: Optional[str] = None
results: List[SearchResult] = field(default_factory=list)
opened_pages: Dict[str, PageCache] = field(default_factory=dict)
last_open_url: Optional[str] = None

async def ddg_sdk_search(query: str, topn: int, region: str, safesearch: str) -> List[SearchResult]:
"""
Use the ddgs DDGS client instead of manual HTML scraping.
Raises RuntimeError if the SDK is not installed.
"""
if DDGS is None:
raise RuntimeError("duckduckgo_search not installed")
# The library expects: region like 'wt-wt', safesearch one of: 'moderate','off','strict'
safe = safesearch.lower()
if safe not in {"moderate", "off", "strict"}:
safe = "moderate"
results: List[SearchResult] = []
try:
results_gen = DDGS().text(query, region=region, safesearch=safe, max_results=topn)
for r in results_gen:
# r keys commonly: title, href, body
title = (r.get("title") or "").strip()
url = (r.get("href") or "").strip()
snippet = (r.get("body") or "").strip()
if title and url.startswith("http"):
results.append(SearchResult(id=len(results) + 1, title=title, url=url, snippet=snippet))
if len(results) >= topn:
break
except Exception as e:
print(f"[ddg_sdk_search] Exception: {e}")
raise
return results

def _clean_duckduckgo_href(raw: str) -> str:
print(f"[_clean_duckduckgo_href] raw={raw}")
if not raw:
return ""
if raw.startswith("//"):
raw = "https:" + raw
p = urlparse(raw)
if p.netloc.endswith("duckduckgo.com"):
if p.path.startswith("/l/"):
qs = parse_qs(p.query)
if qs.get("uddg"):
target = unquote(qs["uddg"][0])
if "duckduckgo.com" in urlparse(target).netloc:
return ""
return target
if p.path.endswith(".js") or p.path.startswith("/y.js"):
return ""
if not p.scheme:
return ""
return raw

def parse_duckduckgo(html: str, topn: int) -> List[SearchResult]:
print(f"[parse_duckduckgo] Parsing HTML, topn={topn}")
soup = BeautifulSoup(html, "html.parser")
results: List[SearchResult] = []
for block in soup.select("div.result"):
a = block.select_one("a.result__a")
if not a:
continue
title = a.get_text(" ", strip=True)
url = _clean_duckduckgo_href(a.get("href") or "")
if not url or "duckduckgo.com" in urlparse(url).netloc:
continue
snip_el = block.select_one(".result__snippet, .snippet")
snippet = snip_el.get_text(" ", strip=True) if snip_el else ""
if not title or not url.startswith("http"):
continue
results.append(SearchResult(id=len(results) + 1, title=title, url=url, snippet=snippet))
if len(results) >= topn:
break
print(f"[parse_duckduckgo] Found {len(results)} results")
return results

def format_search_results(results: List[SearchResult]) -> str:
print(f"[format_search_results] Formatting {len(results)} results")
if not results:
return "No results."
return "\n\n".join(
f"[{r.id}] {r.title}\nURL: {r.url}\nSnippet: {r.snippet or '(no snippet)'}"
for r in results
)

async def fetch_page(url: str, timeout: int = 20) -> PageCache:
print(f"[fetch_page] Fetching URL: {url} with timeout={timeout}")
headers = {"User-Agent": USER_AGENT, "Accept-Language": "en-US,en;q=0.9"}
try:
async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as client:
r = await client.get(url, headers=headers)
print(f"[fetch_page] HTTP status: {r.status_code}")
r.raise_for_status()
html = r.text
except Exception as e:
print(f"[fetch_page] Exception: {e}")
raise
soup = BeautifulSoup(html, "html.parser")
for tag in soup(["script", "style", "noscript"]):
tag.decompose()
text = soup.get_text("\n")
lines = [ln.rstrip() for ln in text.splitlines() if ln.strip()]
print(f"[fetch_page] Extracted {len(lines)} lines")
return PageCache(url=url, text_lines=lines)

# ---------------- FastMCP Server (in-process) ----------------
mcp = FastMCP(
name="duckduckgo-browser",
instructions=r"""
Tool for browsing.
The `cursor` appears in brackets before each browsing display: `[{cursor}]`.
Cite information from the tool using the following format:
`【{cursor}†L{line_start}(-L{line_end})?】`, for example: `【6†L9-L11】` or `【8†L3】`.
Do not quote more than 10 words directly from the tool output.
sources=web
""".strip(),
port=8001,
)

@mcp.tool(
name="search",
title="Search for information",
description=
"Searches for information related to `query` and displays `topn` results.",
)
async def search(query: str, topn: int = 10,
region: str = "wt-wt", safesearch: str = "moderate") -> str:
print(f"[search] Query: {query}, topn={topn}, region={region}, safesearch={safesearch}")

try:
print("[search] Using DuckDuckGo SDK")
results = await ddg_sdk_search(query, topn, region, safesearch)

except Exception as e: # noqa: BLE001
print(f"[search] Exception: {e}")
return f"Search error: {e}"
return format_search_results(results)

@mcp.tool(
name="open",
title="Open a link or page",
description="""
Opens the link `id` from the page indicated by `cursor` starting at line number `loc`, showing `num_lines` lines.
Valid link ids are displayed with the formatting: `【{id}†.*】`.
If `cursor` is not provided, the most recent page is implied.
If `id` is a string, it is treated as a fully qualified URL associated with `source`.
If `loc` is not provided, the viewport will be positioned at the beginning of the document or centered on the most relevant passage, if available.
Use this function without `id` to scroll to a new location of an opened page.
""".strip(),
)
async def open(id: Union[int, str] = -1,
loc: int = 0, num_lines: int = 60) -> str:
print(f"[open] id={id}, loc={loc}, num_lines={num_lines}")
if isinstance(id, int) and id != -1:
print("[open] Invalid id (int but not -1)")
return "Result id not found. Run search."
else:
if not isinstance(id, str) or not id.startswith("http"):
print("[open] Invalid id (not a valid URL)")
return "Provide result id or full http(s) URL."
url = id
try:
page = await fetch_page(url)
except Exception as e: # noqa: BLE001
print(f"[open] Exception: {e}")
return f"Open failed: {e}"
total = len(page.text_lines)
if loc < 0: loc = 0
if num_lines <= 0: num_lines = 60
end = min(total, loc + num_lines)
body = "\n".join(f"L{loc+i+1}: {line}" for i, line in enumerate(page.text_lines[loc:end]))
print(f"[open] Returning lines {loc+1}-{end} of {total}")
return f"URL: {url}\nLines {loc+1}-{end} of {total}\n" + "-"*60 + "\n" + body

@mcp.tool(
name="find",
title="Find pattern in page",
description=
"Finds exact matches of `pattern` in the current page, or the page given by `cursor`.",
)
async def find(pattern: str, url: Optional[str] = None, max_matches: int = 50) -> str:
print(f"[find] pattern={pattern}, url={url}, max_matches={max_matches}")
if not url:
print("[find] No URL provided")
return "No page open."
try:
page = await fetch_page(url)
except Exception as e:
print(f"[find] Exception: {e}")
return f"Fetch failed: {e}"
rx = re.compile(re.escape(pattern), re.IGNORECASE)
hits: List[str] = []
for i, line in enumerate(page.text_lines, start=1):
if rx.search(line):
hits.append(f"L{i}: {rx.sub(lambda m: '**'+m.group(0)+'**', line)}")
if len(hits) >= max_matches:
break
print(f"[find] Found {len(hits)} matches")
return "\n".join(hits) if hits else f"No matches for '{pattern}'."

@mcp.resource("config://browser_search_settings")
def get_browser_search_settings() -> Dict[str, str]:
return {
"available_tools": ["search", "open", "find"],
}

@mcp.prompt()
def web_browsing_prompt(task_type: str) -> str:
"""
Generate a usage prompt for web browsing and extraction tasks, tailored to the available tools in this file.
"""
prompts = {
"content_research": (
"To research web content:\n"
"1. Use the 'search' tool to find relevant pages.\n"
"2. Use 'open' to read the content of a result or URL.\n"
"3. Use 'find' to locate specific information within an opened page."
),
"data_extraction": (
"To extract structured data:\n"
"1. Use 'search' to find pages with the data you need.\n"
"2. Use 'open' to load the page content.\n"
"3. Use 'find' to extract or locate specific patterns or fields."
),
"website_analysis": (
"To analyze a website:\n"
"1. Use 'search' to discover relevant pages.\n"
"2. Use 'open' to inspect the content of those pages.\n"
"3. Use 'find' to search for keywords, features, or patterns."
),
"competitive_research": (
"To research competitors:\n"
"1. Use 'search' to find competitor websites or pages.\n"
"2. Use 'open' to review their content.\n"
"3. Use 'find' to compare features, pricing, or other details."
),
"market_research": (
"To conduct market research:\n"
"1. Use 'search' for industry trends and news.\n"
"2. Use 'open' to read relevant articles or reports.\n"
"3. Use 'find' to extract market insights or statistics."
),
"content_monitoring": (
"To monitor web content:\n"
"1. Use 'search' to discover new or updated content.\n"
"2. Use 'open' to review the latest pages.\n"
"3. Use 'find' to detect changes or specific updates."
),
}
# Default fallback if task_type is not recognized
return prompts.get(
task_type,
(
"Web browsing guidance:\n"
"• Use 'search' to find information.\n"
"• Use 'open' to read a page.\n"
"• Use 'find' to locate details within a page."
),
)


from clarifai.runners.models.mcp_class import MCPModelClass

class MyBrowserSearchToolClass(MCPModelClass):
def get_server(self) -> FastMCP:
return mcp

# Main function to run the MCP server
if __name__ == "__main__":
import asyncio
import sys

# Simple approach - just run the server
try:
asyncio.run(mcp.run())
except KeyboardInterrupt:
print("Server stopped by user", file=sys.stderr)
sys.exit(0)
except Exception as e:
print(f"Server error: {e}", file=sys.stderr)
sys.exit(1)
client.py
import asyncio
import os
import json
from openai import AsyncOpenAI
from fastmcp import Client
from fastmcp.client.transports import StreamableHttpTransport
from clarifai.urls.helper import ClarifaiUrlHelper

API_KEY = os.getenv("CLARIFAI_PAT", "None") # Set env var; avoid hardcoding secrets.

transport = StreamableHttpTransport(
url=ClarifaiUrlHelper().mcp_api_url(),
headers={"Authorization": f"Bearer {API_KEY}"},
)

openai_client = AsyncOpenAI(
api_key=API_KEY,
base_url="https://api.clarifai.com/v2/ext/openai/v1"
)

MODEL_ID = "https://clarifai.com/openai/chat-completion/models/gpt-oss-120b"

def format_tools_to_openai_function(tools):
return [
{
"type": "function",
"function": {
"name": tool.name,
"description": f"[{tool.name}] {tool.description}",
"parameters": tool.inputSchema,
},
}
for tool in tools
]

async def first_model_call(messages, tools):
return await openai_client.chat.completions.create(
model=MODEL_ID,
messages=messages,
temperature=0.4,
tools=tools,
tool_choice="auto",
stream=False,
)

async def final_model_call(messages):
# Force answer without further tool use
return await openai_client.chat.completions.create(
model=MODEL_ID,
messages=messages,
temperature=0.4,
tool_choice="none",
stream=False,
)

async def run_two_step_answer(user_prompt: str):
# 1. Base messages
messages = [
{
"role": "system",
"content": (
"You can call tools only in the FIRST response if needed to gather info. "
"After tool results are provided, you MUST produce the final answer without more tool calls."
),
},
{"role": "user", "content": user_prompt},
]

# Load tool definitions
async with Client(transport) as client:
tools_raw = await client.list_tools()
tools = format_tools_to_openai_function(tools_raw)

# 2. First model pass (may request tool calls)
first_resp = await first_model_call(messages, tools)
first_msg = first_resp.choices[0].message
messages.append({
"role": "assistant",
"content": first_msg.content,
"tool_calls": getattr(first_msg, "tool_calls", None)
})

# If no tool calls, just answer now
if not first_msg.tool_calls:
print("Assistant (no tools needed):", first_msg.content)
return first_msg.content

# 3. Execute ALL requested tool calls once
async with Client(transport) as client:
for tool_call in first_msg.tool_calls:
tool_name = tool_call.function.name
raw_args = tool_call.function.arguments or "{}"
try:
args = json.loads(raw_args)
except json.JSONDecodeError:
args = {}
print(f"\n== Executing tool: {tool_name} | args: {raw_args}")
try:
result = await client.call_tool(tool_name, arguments=args)
# Collect text parts
if hasattr(result, "content"):
parts = [getattr(seg, "text", "") for seg in result.content if getattr(seg, "text", "")]
result_text = "\n".join(parts) if parts else str(result)
else:
result_text = str(result)
except Exception as e:
result_text = f"Tool {tool_name} failed: {e}"
print(result_text)

if len(result_text) > 4000:
result_text = result_text[:3500] + "\n...[truncated]..."
messages.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": result_text
})

# 4. Final model call (NO more tools)
messages.append({
"role": "system",
"content": "Use the tool outputs above to craft the final answer. Do not call tools again."
})
final_resp = await final_model_call(messages)
final_msg = final_resp.choices[0].message.content
print("\nFinal Answer:\n", final_msg)
return final_msg

async def main():
await run_two_step_answer("Who won the 2025 Ballon d'Or?")

if __name__ == "__main__":
asyncio.run(main())
requirements.txt
clarifai==11.7.5
anyio==4.9.0
mcp==1.9.0
fastmcp==2.3.4
requests>=2.31.0
beautifulsoup4==4.12.2
httpx
lxml>=4.9.3
ddgs
config.yaml
build_info:
python_version: '3.11'
inference_compute_info:
cpu_limit: 1000m
cpu_memory: 1Gi
num_accelerators: 0
model:
app_id: test-mcp
id: browser-search-mcp-model
model_type_id: text-to-text
user_id: clarifai-user-id
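
The client above routes tool calls through an LLM. For a quick direct check, you can also call the tools over the same transport, mirroring the other MCP clients in this section. This is a minimal sketch; it assumes the model is already deployed, CLARIFAI_PAT is set, and the query string is just an illustration.

import asyncio
import os

from clarifai.urls.helper import ClarifaiUrlHelper
from fastmcp import Client
from fastmcp.client.transports import StreamableHttpTransport

PAT = os.environ["CLARIFAI_PAT"]
transport = StreamableHttpTransport(
    url=ClarifaiUrlHelper().mcp_api_url(),
    headers={"Authorization": f"Bearer {PAT}"},
)


async def main():
    async with Client(transport) as client:
        # Run a web search, then inspect the returned titles, URLs, and snippets.
        result = await client.call_tool("search", {"query": "model context protocol", "topn": 3})
        print(result[0].text)


if __name__ == "__main__":
    asyncio.run(main())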

Multimodal Models

Qwen2.5 VL 3B Instruct

1/model.py
import os
import sys

sys.path.append(os.path.dirname(__file__))
from typing import Iterator, List

from clarifai.runners.models.model_builder import ModelBuilder
from clarifai.runners.models.openai_class import OpenAIModelClass
from clarifai.runners.utils.data_types import Image
from clarifai.runners.utils.data_utils import Param
from clarifai.runners.utils.openai_convertor import build_openai_messages
from clarifai.utils.logging import logger
from openai import OpenAI
from openai_server_starter import OpenAI_APIServer


class VLLMModel(OpenAIModelClass):
"""
A custom runner that integrates with the Clarifai platform and uses Server inference
to process inputs, including text and images.
"""

client = True # This will be set in load_model method
model = True # This will be set in load_model method

def load_model(self):
"""Load the model here and start the server."""

server_args = {
'max_model_len': '8192',
'gpu_memory_utilization': 0.95,
'dtype': 'auto',
'task': 'auto',
'kv_cache_dtype': 'auto',
'tensor_parallel_size': 1,
'chat_template': None,
'cpu_offload_gb': 0.0,
'quantization': None,
'port': 23333,
'host': 'localhost',
'checkpoints': 'runtime'
}

stage = server_args.get("checkpoints")
if stage in ["build", "runtime"]:
config_path = os.path.dirname(os.path.dirname(__file__))
builder = ModelBuilder(config_path, download_validation_only=True)
checkpoints = builder.download_checkpoints(stage=stage)
server_args.update({"checkpoints": checkpoints})

if server_args.get("additional_list_args") == ['']:
server_args.pop("additional_list_args")

# Start server
# This line was generated by the `upload` module
self.server = OpenAI_APIServer.from_vllm_backend(**server_args)

self.client = OpenAI(
api_key="notset",
base_url=VLLMModel.make_api_url(self.server.host, self.server.port))
self.model = self._get_model()

logger.info(f"OpenAI {self.model} model loaded successfully!")

def _get_model(self):
try:
return self.client.models.list().data[0].id
except Exception as e:
raise ConnectionError("Failed to retrieve model ID from API") from e

@staticmethod
def make_api_url(host: str, port: int, version: str = "v1") -> str:
return f"http://{host}:{port}/{version}"

@OpenAIModelClass.method
def predict(self,
prompt: str,
image: Image = None,
images: List[Image] = None,
chat_history: List[dict] = None,
max_tokens: int = Param(default=512, description="The maximum number of tokens to generate. Shorter token lengths will provide faster performance.", ),
temperature: float = Param(default=0.7, description="A decimal number that determines the degree of randomness in the response", ),
top_p: float = Param(default=0.95, description="An alternative to sampling with temperature, where the model considers the results of the tokens with top_p probability mass.", )
) -> str:
"""This is the method that will be called when the runner is run. It takes in an input and
returns an output.
"""
openai_messages = build_openai_messages(prompt=prompt, image=image, images=images, messages=chat_history)
response = self.client.chat.completions.create(
model=self.model,
messages=openai_messages,
max_completion_tokens=max_tokens,
temperature=temperature,
top_p=top_p)
if response.usage and response.usage.prompt_tokens and response.usage.completion_tokens:
self.set_output_context(prompt_tokens=response.usage.prompt_tokens, completion_tokens=response.usage.completion_tokens)
return response.choices[0].message.content

@OpenAIModelClass.method
def generate(self,
prompt: str,
image: Image = None,
images: List[Image] = None,
chat_history: List[dict] = None,
max_tokens: int = Param(default=512, description="The maximum number of tokens to generate. Shorter token lengths will provide faster performance.", ),
temperature: float = Param(default=0.7, description="A decimal number that determines the degree of randomness in the response", ),
top_p: float = Param(default=0.95, description="An alternative to sampling with temperature, where the model considers the results of the tokens with top_p probability mass.", )
) -> Iterator[str]:
"""Example yielding a whole batch of streamed stuff back."""
openai_messages = build_openai_messages(prompt=prompt, image=image, images=images, messages=chat_history)
for chunk in self.client.chat.completions.create(
model=self.model,
messages=openai_messages,
max_completion_tokens=max_tokens,
temperature=temperature,
top_p=top_p,
stream=True):
if chunk.choices:
text = chunk.choices[0].delta.content or ''
yield text

def test(self):
try:
print("Testing predict...")
print(self.predict(prompt="Explain why cat can't fly"))
print(self.predict(prompt="Describe this image", image=Image.from_url("https://samples.clarifai.com/metro-north.jpg")))
except Exception as e:
print(f"Error in predict {e}")

try:
print("Testing generate...")
for each in self.generate(prompt="Explain why cat can't fly"):
print(each, end=" ")
print()
for each in self.generate(prompt="Describe this image", image=Image.from_url("https://samples.clarifai.com/metro-north.jpg")):
print(each, end=" ")
print()
except Exception as e:
print(f"Error in generate {e}")
1/openai_server_starter.py
import os
import signal
import subprocess
import sys
import threading
from typing import List

import psutil
from clarifai.utils.logging import logger

PYTHON_EXEC = sys.executable


def kill_process_tree(parent_pid, include_parent: bool = True, skip_pid: int = None):
"""Kill the process and all its child processes."""
if parent_pid is None:
parent_pid = os.getpid()
include_parent = False

try:
itself = psutil.Process(parent_pid)
except psutil.NoSuchProcess:
return

children = itself.children(recursive=True)
for child in children:
if child.pid == skip_pid:
continue
try:
child.kill()
except psutil.NoSuchProcess:
pass

if include_parent:
try:
itself.kill()

# Sometimes processes cannot be killed with SIGKILL (e.g., PID 1 launched by Kubernetes),
# so we send an additional signal to kill them.
itself.send_signal(signal.SIGQUIT)
except psutil.NoSuchProcess:
pass


class OpenAI_APIServer:

def __init__(self, **kwargs):
self.server_started_event = threading.Event()
self.process = None
self.backend = None
self.server_thread = None

def __del__(self):
# Important: close the server process when this object is garbage collected.
self.close()

def close(self):
if self.process:
try:
kill_process_tree(self.process.pid)
except Exception:
self.process.terminate()
if self.server_thread:
self.server_thread.join()

def wait_for_startup(self):
self.server_started_event.wait()

def validate_if_server_start(self, line: str):
line_lower = line.lower()
if self.backend in ["vllm", "sglang", "lmdeploy"]:
if self.backend == "vllm":
return "application startup complete" in line_lower or "vllm api server on" in line_lower
else:
return f" running on http://{self.host}:" in line.strip()
elif self.backend == "llamacpp":
return "waiting for new tasks" in line_lower
elif self.backend == "tgi":
return "Connected" in line.strip()

def _start_server(self, cmds):
try:
env = os.environ.copy()
env["VLLM_USAGE_SOURCE"] = "production-docker-image"
self.process = subprocess.Popen(
cmds,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
)
for line in self.process.stdout:
logger.info("Server Log: " + line.strip())
if self.validate_if_server_start(line):
self.server_started_event.set()
# break
except Exception as e:
if self.process:
self.process.terminate()
raise RuntimeError(f"Failed to start Server server: {e}")

def start_server_thread(self, cmds: str):
try:
# Start the server in a separate thread
self.server_thread = threading.Thread(target=self._start_server, args=(cmds,), daemon=None)
self.server_thread.start()

# Wait for the server to start
self.wait_for_startup()
except Exception as e:
raise Exception(e)

@classmethod
def from_vllm_backend(cls,
checkpoints,
limit_mm_per_prompt: str = '',
max_model_len: float = None,
gpu_memory_utilization: float = 0.9,
dtype="auto",
task="auto",
kv_cache_dtype: str = "auto",
tensor_parallel_size=1,
chat_template: str = None,
cpu_offload_gb: float = 0.,
quantization: str = None,
port=23333,
host="localhost",
additional_list_args: List[str] = []):
"""Run VLLM OpenAI compatible server

Args:
checkpoints (str): model id or path
limit_mm_per_prompt (str, optional): For each multimodal plugin, limit how many input instances to allow for each prompt. Expects a comma-separated list of items, e.g.: image=16,video=2 allows a maximum of 16 images and 2 videos per prompt. Defaults to 1 for each modality.
max_model_len (float, optional): Model context length. If unspecified, it will be automatically derived from the model config. Defaults to None.
gpu_memory_utilization (float, optional): The fraction of GPU memory to be used for the model executor, which can range from 0 to 1. For example, a value of 0.5 would imply 50% GPU memory utilization. If unspecified, the default value of 0.9 is used. This is a per-instance limit and only applies to the current vLLM instance; it does not matter if another vLLM instance is running on the same GPU. For example, if two vLLM instances share a GPU, you can set the GPU memory utilization to 0.5 for each. Defaults to 0.9.
dtype (str, optional): Data type for model weights and activations. Defaults to "auto".
task (str, optional): The task to use the model for. Each vLLM instance only supports one task, even if the same model can be used for multiple tasks. When the model only supports one task, "auto" can be used to select it; otherwise, you must specify explicitly which task to use. Choices {auto, generate, embedding, embed, classify, score, reward, transcription}. Defaults to "auto".
kv_cache_dtype (str, optional): Data type for kv cache storage. If “auto”, will use model data type. CUDA 11.8+ supports fp8 (=fp8_e4m3) and fp8_e5m2. ROCm (AMD GPU) supports fp8 (=fp8_e4m3). Defaults to "auto".
tensor_parallel_size (int, optional): n gpus. Defaults to 1.
chat_template (str, optional): The file path to the chat template, or the template in single-line form for the specified model. Defaults to None.
cpu_offload_gb (float, optional): The space in GiB to offload to CPU, per GPU. Default is 0, which means no offloading. Intuitively, this argument can be seen as a virtual way to increase the GPU memory size. For example, if you have one 24 GB GPU and set this to 10, virtually you can think of it as a 34 GB GPU. Then you can load a 13B model with BF16 weight, which requires at least 26GB GPU memory. Note that this requires fast CPU-GPU interconnect, as part of the model is loaded from CPU memory to GPU memory on the fly in each model forward pass. Defaults to 0.
quantization (str, optional): quantization format {aqlm,awq,deepspeedfp,tpu_int8,fp8,fbgemm_fp8,modelopt,marlin,gguf,gptq_marlin_24,gptq_marlin,awq_marlin,gptq,compressed-tensors,bitsandbytes,qqq,hqq,experts_int8,neuron_quant,ipex,quark,moe_wna16,None}. Defaults to None.
port (int, optional): port. Defaults to 23333.
host (str, optional): host name. Defaults to "localhost".
additional_list_args (List[str], optional): additional args to run subprocess cmd e.g. ["--arg-name", "arg value"]. See more at [this document](https://docs.vllm.ai/en/latest/serving/openai_compatible_server.html#vllm-serve). Defaults to [].

"""
cmds = [
PYTHON_EXEC, '-m', 'vllm.entrypoints.openai.api_server', '--model', checkpoints, '--dtype',
str(dtype), '--task',
str(task), '--kv-cache-dtype',
str(kv_cache_dtype), '--tensor-parallel-size',
str(tensor_parallel_size), '--gpu-memory-utilization',
str(gpu_memory_utilization), '--cpu-offload-gb',
str(cpu_offload_gb), '--port',
str(port), '--host',
str(host), "--trust-remote-code"
]

if quantization:
cmds += [
'--quantization',
str(quantization),
]
if chat_template:
cmds += [
'--chat-template',
str(chat_template),
]
if max_model_len:
cmds += [
'--max-model-len',
str(max_model_len),
]
if limit_mm_per_prompt:
cmds += [
'--limit-mm-per-prompt',
str(limit_mm_per_prompt),
]

if additional_list_args != []:
cmds += additional_list_args

print("CMDS to run vllm server: ", cmds)

_self = cls()

_self.host = host
_self.port = port
_self.backend = "vllm"
_self.start_server_thread(cmds)
import time
time.sleep(5)

return _self
requirements.txt
torch==2.6.0
tokenizers==0.21.1
accelerate==1.2.0
optimum==1.23.3
xformers
einops==0.8.0
packaging
ninja

qwen-vl-utils==0.0.8
timm==1.0.12
openai
clarifai>=11.5.0,<12.0.0
psutil

vllm==0.8.5
transformers==4.51.1
config.yaml
# Config file for the vLLM runner

model:
id: "Qwen2_5-VL-3B-Instruct"
user_id: "user_id"
app_id: "app_id"
model_type_id: "multimodal-to-text"

build_info:
python_version: "3.11"

inference_compute_info:
cpu_limit: "3"
cpu_memory: "14Gi"
num_accelerators: 1
accelerator_type: ["NVIDIA-*"]
accelerator_memory: "44Gi"

checkpoints:
type: "huggingface"
repo_id: "Qwen/Qwen2.5-VL-3B-Instruct"
hf_token: "hf_token"
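
Once the model is uploaded (replace the hf_token placeholder with a real Hugging Face token if the checkpoint requires authentication), it can be called from the Clarifai Python SDK. The snippet below is a hedged sketch: it assumes recent clarifai SDK releases expose the model's methods by name through the Model client, and the URL placeholders are hypothetical.

import os

from clarifai.client import Model
from clarifai.runners.utils.data_types import Image

model = Model(
    url="https://clarifai.com/user_id/app_id/models/Qwen2_5-VL-3B-Instruct",  # replace with your model URL
    pat=os.environ["CLARIFAI_PAT"],
)

# Stream tokens from the generate() method defined above, with an image input.
for chunk in model.generate(
    prompt="Describe this image",
    image=Image.from_url("https://samples.clarifai.com/metro-north.jpg"),
    max_tokens=256,
):
    print(chunk, end="")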

OCR

NanoNets OCR Small

1/model.py
import os
from io import BytesIO

# Third-party imports
import torch
from PIL import Image as PILImage
from transformers import AutoTokenizer, AutoProcessor, AutoModelForImageTextToText

# Clarifai imports
from clarifai.runners.models.model_builder import ModelBuilder
from clarifai.runners.models.model_class import ModelClass
from clarifai.runners.utils.data_types import Image
from clarifai.runners.utils.data_utils import Param
from clarifai.utils.logging import logger

DEFAULT_PROMPT = """Extract the text from the above document as if you were reading it naturally. Return the tables in html format. Return the equations in LaTeX representation. If there is an image in the document and image caption is not present, add a small description of the image inside the <img></img> tag; otherwise, add the image caption inside <img></img>. Watermarks should be wrapped in brackets. Ex: <watermark>OFFICIAL COPY</watermark>. Page numbers should be wrapped in brackets. Ex: <page_number>14</page_number> or <page_number>9/22</page_number>. Prefer using ☐ and ☑ for check boxes."""


def preprocess_image(image_bytes: bytes) -> PILImage:
"""Convert image bytes to PIL Image."""
return PILImage.open(BytesIO(image_bytes)).convert("RGB")


class MyRunner(ModelClass):
"""A custom runner that loads the OCR model and runs it on the input image."""

def load_model(self):
"""Load the model here."""

model_path = os.path.dirname(os.path.dirname(__file__))
builder = ModelBuilder(model_path, download_validation_only=True)
checkpoint_path = builder.download_checkpoints(stage="runtime")

self.device = "cuda" if torch.cuda.is_available() else "cpu"
logger.info(f"Running on device: {self.device}")

self.model = AutoModelForImageTextToText.from_pretrained(checkpoint_path).to(
self.device
)
self.model.eval()
self.tokenizer = AutoTokenizer.from_pretrained(checkpoint_path)
self.processor = AutoProcessor.from_pretrained(checkpoint_path)

logger.info("Done loading!")

@ModelClass.method
def predict(
self,
image: Image,
prompt: str = Param(
default=DEFAULT_PROMPT, description="The prompt to use for the OCR model."
),
max_new_tokens: int = Param(
default=512,
description="The maximum number of tokens to generate. Shorter token lengths will provide faster performance.",
),
) -> str:
"""This is the method that will be called when the runner is run. It takes in an input and returns an output."""
messages = [
{"role": "system", "content": "You are a helpful assistant."},
{
"role": "user",
"content": [
{
"type": "image_url",
"image_url": f"data:image/png;base64,{image.bytes}",
},
{"type": "text", "text": prompt},
],
},
]
image = preprocess_image(image.bytes)
text = self.processor.apply_chat_template(
messages, tokenize=False, add_generation_prompt=True
)
inputs = self.processor(
text=[text], images=[image], padding=True, return_tensors="pt"
)
inputs = inputs.to(self.model.device)

output_ids = self.model.generate(
**inputs, max_new_tokens=max_new_tokens, do_sample=False
)
generated_ids = [
output_ids[len(input_ids) :]
for input_ids, output_ids in zip(inputs.input_ids, output_ids)
]

output_text = self.processor.batch_decode(
generated_ids, skip_special_tokens=True, clean_up_tokenization_spaces=True
)
return output_text[0]

def test(self):
"""Test the model with a sample image."""
import requests # Import moved here as it's only used for testing

# Load a sample document image for OCR
url = "https://dl.a9t9.com/ocr/solarcell.jpg"
image = Image(bytes=requests.get(url).content)
# image = Image.from_url(url)
generated_text = self.predict(image)
# Log the detected text
logger.info(f"Detected text:\n{generated_text}")
requirements.txt
clarifai>=11.5.2,<12.0.0
pillow==10.4.0
torch==2.6.0
torchvision==0.21.0
transformers>=4.51.1
config.yaml
model:
id: "nanonets-ocr-s"
user_id: "user_id"
app_id: "app_id"
model_type_id: "multimodal-to-text"

build_info:
python_version: "3.11"

inference_compute_info:
cpu_limit: "2"
cpu_memory: "18Gi"
num_accelerators: 1
accelerator_type: ["NVIDIA-*"]
accelerator_memory: "18Gi"

checkpoints:
type: "huggingface"
repo_id: "nanonets/Nanonets-OCR-s"

Text Embedder

Jina Embeddings v3

1/model.py
from typing import List, Iterator
import os
from clarifai.runners.models.openai_class import OpenAIModelClass
from clarifai.runners.models.model_builder import ModelBuilder

import onnxruntime
import numpy as np
from transformers import AutoTokenizer, PretrainedConfig

from clarifai.utils.logging import logger

# Helper function: mean pooling of token embeddings, weighted by the attention mask
def mean_pooling(model_output: np.ndarray, attention_mask: np.ndarray):
token_embeddings = model_output
input_mask_expanded = np.expand_dims(attention_mask, axis=-1)
input_mask_expanded = np.broadcast_to(input_mask_expanded, token_embeddings.shape)
sum_embeddings = np.sum(token_embeddings * input_mask_expanded, axis=1)
sum_mask = np.clip(np.sum(input_mask_expanded, axis=1), a_min=1e-9, a_max=None)
return sum_embeddings / sum_mask

class JinaaiEmbeddingV3(OpenAIModelClass):
"""
A custom runner that integrates with the Clarifai platform and uses the Jina embeddings v3 model to embed text inputs.
"""
client = True
model = "jinaai-embedding-v3"

def load_model(self):
"""Load the model here and start the server."""

# Load checkpoints
model_path = os.path.dirname(os.path.dirname(__file__))
builder = ModelBuilder(model_path, download_validation_only=True)
logger.info(f"\nDownloading Jinaai {self.model} model checkpoints...\n")
self.checkpoints = builder.download_checkpoints(stage='runtime')
logger.info(f"Checkpoints downloaded to {self.checkpoints}")

# Load the tokenizer, config, and ONNX inference session from the downloaded checkpoints
self.tokenizer = AutoTokenizer.from_pretrained(self.checkpoints)
self.config = PretrainedConfig.from_pretrained(self.checkpoints)
self.session = onnxruntime.InferenceSession(self.checkpoints + '/onnx/model.onnx')

# log that system is ready
logger.info(f"Jinaai {self.model} model loaded successfully!")

def tokenize_and_embed(self, input: str):
"""
Tokenize the input text and return the embedding vector.

Args:
input (str): The input text to be tokenized and embedded.

Returns:
np.ndarray: The embedding vector for the input text.
"""
# Tokenize input
input_text = self.tokenizer(input, return_tensors='np')
# Prepare inputs for ONNX model
task_type = 'text-matching'
task_id = np.array(self.config.lora_adaptations.index(task_type), dtype=np.int64)
inputs = {
'input_ids': input_text['input_ids'],
'attention_mask': input_text['attention_mask'],
'task_id': task_id
}

# Run model
outputs = self.session.run(None, inputs)[0]
return outputs, input_text

@OpenAIModelClass.method
def predict(self, input: str) -> List[float]:
"""
Predict method to process the input text and return the embedding vector.

Args:
input (str): The input text to be processed.

Returns:
List[float]: The embedding vector for the input text.
"""
# Tokenize and embed the input text
outputs, input_text = self.tokenize_and_embed(input)

# Apply mean pooling and normalization to the model outputs
embeddings = mean_pooling(outputs, input_text["attention_mask"])
embeddings = embeddings / np.linalg.norm(embeddings, ord=2, axis=1, keepdims=True)

return embeddings[0].tolist()
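Unlike the other examples, this listing has no `test` method or `__main__` block. A quick local check could look like the following hypothetical sketch (it is not part of the repository example):

if __name__ == "__main__":
    model = JinaaiEmbeddingV3()
    model.load_model()
    vector = model.predict(input="Clarifai makes it easy to run models anywhere.")
    print(f"Embedding length: {len(vector)}")
    print(vector[:5])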
requirements.txt
transformers  # tested with 4.52.4
clarifai  # tested with 11.4.10
onnxruntime
numpy
config.yaml
model:
id: "jinaai-embeddings-v3"
user_id: "user_id"
app_id: "app_id"
model_type_id: "text-embedder"

build_info:
python_version: "3.11"

inference_compute_info:
cpu_limit: "1"
cpu_memory: "5Gi"
num_accelerators: 0

checkpoints:
type: "huggingface"
repo_id: "jinaai/jina-embeddings-v3"

Text-to-Image

FLUX Schnell

1/model.py
import os
from typing import List

from clarifai.runners.models.model_class import ModelClass
from clarifai.runners.utils.data_utils import Param
from clarifai.runners.utils.data_types import Image
from clarifai.runners.models.model_builder import ModelBuilder

from diffusers import FluxPipeline
import torch


class TextToImageModel(ModelClass):
"""
A custom runner for the FLUX model that integrates with the Clarifai platform.
"""

def load_model(self):
"""Load the model here."""
# "black-forest-labs/FLUX.1-schnell"

self.device = "cuda"

model_path = os.path.dirname(os.path.dirname(__file__))
builder = ModelBuilder(model_path, download_validation_only=True)
checkpoints = builder.download_checkpoints(stage="runtime")
# load model and scheduler
self.pipeline = FluxPipeline.from_pretrained(
checkpoints,
torch_dtype=torch.bfloat16
)

self.pipeline = self.pipeline.to(self.device)

@ModelClass.method
def predict(
self,
prompt: str,
num_inference_steps: int = Param(default=28, description="The number of denoising steps. More denoising steps usually lead to a higher quality image at the expense of slower inference."),
guidance_scale: float = Param(default=3.5, description="The `guidance_scale` controls how strongly the model follows the conditioning input during generation."),
negative_prompt: str = Param(default="", description="The prompt to guide what to not include in image generation. Ignored when not using guidance (guidance_scale < 1)"),
true_cfg_scale: float = Param(default=1.0, description="When > 1.0 and a negative_prompt is provided, enables true classifier-free guidance"),
height: int = Param(default=1024, description="The height in pixels of the generated image. This is set to 1024 by default for the best results."),
width: int = Param(default=1024, description="The width in pixels of the generated image. This is set to 1024 by default for the best results."),
max_sequence_length: int = Param(default=256, description="Maximum sequence length to use with the prompt"),
seed: int = Param(default=None, description="Seed value to make generation deterministic."),
# Advanced option passed straight through to the pipeline; not exposed as a Param
sigmas: List[float] = None,
) -> Image:

image = self.pipeline(
prompt=prompt,
negative_prompt=negative_prompt,
guidance_scale=guidance_scale,
num_inference_steps=num_inference_steps,
max_sequence_length=max_sequence_length,
width=width,
height=height,
true_cfg_scale=true_cfg_scale,
generator=torch.Generator("cpu").manual_seed(seed) if seed is not None else None,
sigmas=sigmas,
).images[0]

# Important: free unused cached GPU memory to help avoid OOM on later requests
torch.cuda.empty_cache()

return Image.from_pil(image)


@ModelClass.method
def create(
self,
prompt: List[str],
prompt_2: List[str] = None,
negative_prompt: List[str] = None,
negative_prompt_2: List[str] = None,
true_cfg_scale: float = 1.0,
height: int = None,
width: int = None,
max_sequence_length: int = 256,
num_inference_steps: int = 28,
guidance_scale: float = 3.5,
seed: int = None,
sigmas: List[float] = None,
) -> List[Image]:
"""
Generate one image per prompt using the FLUX model.
Args:
* prompt (`List[str]`): The prompt or prompts to guide the image generation.
* prompt_2 (`List[str]`, *optional*): The prompt to be sent to `tokenizer_2` and `text_encoder_2`. If not defined, `prompt` will be used instead.
* negative_prompt (`List[str]`, *optional*): The prompt to guide what to not include in image generation. Ignored when not using guidance (guidance_scale < 1).
* negative_prompt_2 (`List[str]`, *optional*): The negative_prompt to be sent to `tokenizer_2` and `text_encoder_2`. If not defined, `negative_prompt` will be used instead.
* height (`int`, *optional*, defaults to model.unet.config.sample_size * model.vae_scale_factor): The height in pixels of the generated image. This is set to 1024 by default for the best results.
* width (`int`, *optional*, defaults to model.unet.config.sample_size * model.vae_scale_factor): The width in pixels of the generated image. This is set to 1024 by default for the best results.
* num_inference_steps (`int`, *optional*, defaults to 28): The number of denoising steps. More denoising steps usually lead to a higher quality image at the expense of slower inference.
* guidance_scale (`float`, *optional*, defaults to 3.5):
Guidance scale as defined in [Classifier-Free Diffusion Guidance](https://arxiv.org/abs/2207.12598).
`guidance_scale` is defined as `w` of equation 2. of [Imagen
Paper](https://arxiv.org/pdf/2205.11487.pdf). Guidance scale is enabled by setting `guidance_scale >
1`. A higher guidance scale encourages the model to generate images that are closely linked to the text `prompt`,
usually at the expense of lower image quality.
* seed (`int`, *optional*, defaults to None):
Seed value passed to `torch.Generator("cpu").manual_seed(seed)` (see more [torch generator(s)](https://pytorch.org/docs/stable/generated/torch.Generator.html)) to make generation deterministic.
* sigmas (`List[float]`, *optional*): Custom sigmas to use for the denoising process with schedulers which support a `sigmas` argument in their `set_timesteps` method. If not defined, the default behavior when `num_inference_steps` is passed will be used.
* max_sequence_length (`int` defaults to `256`): Maximum sequence length to use with the prompt.

(see more at this [doc](https://huggingface.co/docs/diffusers/v0.32.2/en/api/pipelines/flux#diffusers.FluxPipeline.__call__))
"""
assert isinstance(prompt, list), "prompt must be a list of strings"
assert len(prompt) <= 4, (
f"The provided prompt length ({len(prompt)}) exceeds the maximum limit (4). Please reduce the number of prompts in `prompt`.")
images = self.pipeline(
prompt=prompt,
prompt_2=prompt_2,
negative_prompt=negative_prompt,
negative_prompt_2=negative_prompt_2,
guidance_scale=guidance_scale,
num_inference_steps=num_inference_steps,
max_sequence_length=max_sequence_length,
width=width,
height=height,
true_cfg_scale=true_cfg_scale,
generator=torch.Generator("cpu").manual_seed(seed) if seed is not None else None,
sigmas=sigmas,
).images

# Important: free unused cached GPU memory to help avoid OOM on later requests
torch.cuda.empty_cache()

return [Image.from_pil(image) for image in images]


def test(self):
"""
Test cases only executed when running `clarifai model test-locally`
"""
image = self.predict(
prompt="A Ghibli animated orange cat, panicked about a deadline, sits in front of a Banana-brand laptop.",
negative_prompt="Ugly, cute", guidance_scale=7)
print(image)

images = self.create(
prompt=["A Ghibli animated orange cat, panicked about a deadline, sits in front of a Banana-brand laptop."]*3,
negative_prompt=["Ugly, cute"]*2, guidance_scale=7)
print(images)
requirements.txt
tokenizers==0.21.0
transformers>=4.48
diffusers==0.32.2
accelerate==1.2.0
optimum==1.23.3
xformers
einops==0.8.0
requests==2.32.3
sentencepiece==0.2.0
numpy>2.0
ninja
aiohttp
packaging
torch==2.5.1
clarifai
clarifai-protocol
config.yaml
model: 
id: "flux_1-schnell"
user_id:
app_id:
model_type_id: "text-to-image"

build_info:
python_version: "3.11"

inference_compute_info:
cpu_limit: "3"
cpu_memory: "18Gi"
num_accelerators: 1
accelerator_type: ["NVIDIA-L40S", "NVIDIA-A100", "NVIDIA-H100"]
accelerator_memory: "44Gi"

checkpoints:
type: huggingface
repo_id: black-forest-labs/FLUX.1-schnell
hf_token:
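As with the other examples, the uploaded model can be called through the Clarifai Python SDK. The sketch below is a minimal example; it assumes the SDK's `Model` client and that the returned `Image` data type exposes its encoded bytes via `.bytes` (the same attribute the OCR example passes when constructing an `Image`). The URL and PAT are placeholders.

import os

from clarifai.client import Model

# Placeholder URL; substitute your own user_id, app_id, and model id.
model = Model(
    url="https://clarifai.com/user_id/app_id/models/flux_1-schnell",
    pat=os.environ["CLARIFAI_PAT"],
)

image = model.predict(
    prompt="A watercolor painting of a lighthouse at sunrise",
    num_inference_steps=4,  # the schnell variant is distilled for very few steps
    seed=42,
)

# Assumption: the returned Image carries the encoded image bytes in `.bytes`.
with open("output.png", "wb") as f:
    f.write(image.bytes)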