Installation
Install the MeshAI Python SDK using pip:
For development versions:
pip install meshai-sdk[dev]
Quick Start
from meshai import MeshAI
# Initialize client
client = MeshAI( api_key = "your_api_key" )
# Execute a simple task
result = client.execute_task(
task_type = "text_generation" ,
input = "Write a blog post about AI" ,
quality_level = "high"
)
print (result.output)
Client Configuration
Basic Configuration
Environment Variables
Direct Configuration
Advanced Configuration
import os
from meshai import MeshAI
# Using environment variables
os.environ[ 'MESHAI_API_KEY' ] = 'your_api_key'
client = MeshAI()
Core Methods
execute_task()
Execute a single AI task with automatic agent selection.
def execute_task (
task_type : str ,
input_data : Any,
quality_level : str = "standard" ,
max_cost : float = None ,
timeout : int = 30000 ,
agent_requirements : dict = None
) -> TaskResult
Parameters:
Parameter Type Description Default task_type
str
Type of AI task to execute Required input_data
Any
Input data for the task Required quality_level
str
Quality level: “basic”, “standard”, “high”, “premium" "standard” max_cost
float
Maximum cost in SOL None timeout
int
Timeout in milliseconds 30000 agent_requirements
dict
Specific agent requirements None
Returns: TaskResult
object with output, metadata, and metrics.
Basic Usage With Options Error Handling # Simple text generation
result = client.execute_task(
task_type = "text_generation" ,
input_data = "Explain quantum computing"
)
print ( f "Output: { result.output } " )
print ( f "Quality: { result.quality_score } " )
print ( f "Cost: { result.cost } SOL" )
# Simple text generation
result = client.execute_task(
task_type = "text_generation" ,
input_data = "Explain quantum computing"
)
print ( f "Output: { result.output } " )
print ( f "Quality: { result.quality_score } " )
print ( f "Cost: { result.cost } SOL" )
# Task with specific requirements
result = client.execute_task(
task_type = "image_analysis" ,
input_data = { "image_url" : "https://example.com/image.jpg" },
quality_level = "high" ,
max_cost = 0.005 ,
timeout = 60000 ,
agent_requirements = {
"min_reputation" : 0.95 ,
"specialization" : "medical_imaging"
}
)
from meshai.exceptions import (
TaskTimeoutError,
InsufficientFundsError,
QualityThresholdError
)
try :
result = client.execute_task(
task_type = "text_generation" ,
input_data = "Write a novel" ,
quality_level = "premium" ,
timeout = 120000
)
except TaskTimeoutError:
print ( "Task took too long to complete" )
except InsufficientFundsError as e:
print ( f "Need { e.required_amount } SOL" )
except QualityThresholdError as e:
print ( f "Quality { e.actual_quality } below threshold" )
create_workflow()
Create multi-step workflows with task dependencies.
def create_workflow (
name : str = None ,
description : str = None ,
max_parallel : int = 5
) -> Workflow
Example:
# Create workflow
workflow = client.create_workflow( name = "document_analysis" )
# Add OCR task
ocr_task = workflow.add_task(
task_type = "document_ocr" ,
input_data = { "document_url" : "https://example.com/doc.pdf" },
quality_threshold = 0.99
)
# Add analysis task (depends on OCR)
analysis_task = workflow.add_task(
task_type = "document_analysis" ,
input_data = ocr_task.output,
depends_on = ocr_task
)
# Execute workflow
results = await workflow.execute()
Workflow Management
Workflow Class
class Workflow :
def add_task ( self , task_type : str , input_data : Any = None , ** kwargs ) -> Task
def remove_task( self , task_id: str ) -> bool
def execute( self , max_parallel: int = 5 ) -> WorkflowResult
def get_status( self ) -> WorkflowStatus
def cancel( self ) -> bool
Task Dependencies
Sequential Tasks Parallel Tasks Complex Dependencies workflow = client.create_workflow()
# Task 1
task1 = workflow.add_task( "ocr" , input_data = document)
# Task 2 depends on Task 1
task2 = workflow.add_task(
"text_analysis" ,
input_data = task1.output,
depends_on = task1
)
# Task 3 depends on Task 2
task3 = workflow.add_task(
"summarization" ,
input_data = task2.output,
depends_on = task2
)
workflow = client.create_workflow()
# Task 1
task1 = workflow.add_task( "ocr" , input_data = document)
# Task 2 depends on Task 1
task2 = workflow.add_task(
"text_analysis" ,
input_data = task1.output,
depends_on = task1
)
# Task 3 depends on Task 2
task3 = workflow.add_task(
"summarization" ,
input_data = task2.output,
depends_on = task2
)
workflow = client.create_workflow()
# Base task
ocr_task = workflow.add_task( "ocr" , input_data = document)
# Parallel tasks (both depend on OCR)
sentiment_task = workflow.add_task(
"sentiment_analysis" ,
input_data = ocr_task.output,
depends_on = ocr_task
)
entity_task = workflow.add_task(
"entity_extraction" ,
input_data = ocr_task.output,
depends_on = ocr_task,
parallel_to = sentiment_task
)
workflow = client.create_workflow()
# Multiple input tasks
ocr_task = workflow.add_task( "ocr" , input_data = document)
image_task = workflow.add_task( "image_analysis" , input_data = image)
# Task depending on multiple inputs
synthesis_task = workflow.add_task(
"content_synthesis" ,
input_data = {
"text" : ocr_task.output,
"image_analysis" : image_task.output
},
depends_on = [ocr_task, image_task]
)
Task Types
Available Task Types
Task Type Description Input Format Output Format text_generation
Generate text content str
or dict
str
text_analysis
Analyze text sentiment, entities str
dict
text_summarization
Summarize long text str
str
document_ocr
Extract text from documents dict
with URL/base64str
image_analysis
Analyze and caption images dict
with URL/base64dict
image_generation
Generate images from text str
dict
with URLcode_generation
Generate code str
or dict
str
translation
Translate text dict
with text and languagesstr
audio_transcription
Convert speech to text dict
with audio URLstr
Task-Specific Examples
Text Generation Image Analysis Translation result = client.execute_task(
task_type = "text_generation" ,
input_data = {
"prompt" : "Write a technical blog post about blockchain" ,
"max_tokens" : 2000 ,
"style" : "professional" ,
"audience" : "developers"
}
)
result = client.execute_task(
task_type = "text_generation" ,
input_data = {
"prompt" : "Write a technical blog post about blockchain" ,
"max_tokens" : 2000 ,
"style" : "professional" ,
"audience" : "developers"
}
)
result = client.execute_task(
task_type = "image_analysis" ,
input_data = {
"image_url" : "https://example.com/image.jpg" ,
"analysis_type" : [ "objects" , "text" , "faces" ],
"detail_level" : "high"
}
)
result = client.execute_task(
task_type = "translation" ,
input_data = {
"text" : "Hello, how are you?" ,
"source_language" : "en" ,
"target_language" : "es" ,
"formality" : "formal"
}
)
Response Objects
TaskResult
class TaskResult :
output: Any # Task output data
quality_score: float # Quality score (0-1)
cost: float # Cost in SOL
agent_id: str # Processing agent ID
latency: int # Processing time (ms)
metadata: dict # Additional metadata
timestamp: datetime # Completion timestamp
WorkflowResult
class WorkflowResult :
results: Dict[ str , TaskResult] # Results by task ID
total_cost: float # Total workflow cost
total_latency: int # Total execution time
success_rate: float # Percentage of successful tasks
metadata: dict # Workflow metadata
Error Handling
Exception Types
from meshai.exceptions import (
MeshAIError, # Base exception
TaskTimeoutError, # Task exceeded timeout
InsufficientFundsError, # Not enough funds
QualityThresholdError, # Quality below threshold
AgentUnavailableError, # No agents available
ValidationError, # Input validation failed
NetworkError, # Network connectivity issues
AuthenticationError # Invalid API key
)
Error Handling Patterns
Basic Error Handling Retry Logic Graceful Degradation try :
result = client.execute_task(
task_type = "text_generation" ,
input_data = "Generate content"
)
except MeshAIError as e:
print ( f "MeshAI error: { e } " )
# Handle specific error
if e.code == "INSUFFICIENT_FUNDS" :
print ( f "Need { e.required_amount } SOL" )
except Exception as e:
print ( f "Unexpected error: { e } " )
try :
result = client.execute_task(
task_type = "text_generation" ,
input_data = "Generate content"
)
except MeshAIError as e:
print ( f "MeshAI error: { e } " )
# Handle specific error
if e.code == "INSUFFICIENT_FUNDS" :
print ( f "Need { e.required_amount } SOL" )
except Exception as e:
print ( f "Unexpected error: { e } " )
import time
def execute_with_retry ( client , task_type , input_data , max_retries = 3 ):
for attempt in range (max_retries):
try :
return client.execute_task(task_type, input_data)
except (NetworkError, AgentUnavailableError) as e:
if attempt < max_retries - 1 :
wait_time = 2 ** attempt # Exponential backoff
print ( f "Attempt { attempt + 1 } failed, retrying in { wait_time } s" )
time.sleep(wait_time)
else :
raise e
def generate_content_with_fallback ( prompt ):
try :
# Try premium quality first
result = client.execute_task(
task_type = "text_generation" ,
input_data = prompt,
quality_level = "premium"
)
return result.output
except InsufficientFundsError:
# Fallback to standard quality
result = client.execute_task(
task_type = "text_generation" ,
input_data = prompt,
quality_level = "standard"
)
return result.output
except AgentUnavailableError:
# Final fallback
return "Content generation temporarily unavailable"
Advanced Features
Batch Processing
# Process multiple tasks efficiently
tasks = [
{ "task_type" : "text_analysis" , "input" : text1},
{ "task_type" : "text_analysis" , "input" : text2},
{ "task_type" : "text_analysis" , "input" : text3}
]
batch = client.create_batch(tasks)
results = await batch.execute()
for i, result in enumerate (results):
print ( f "Task { i + 1 } : { result.output } " )
Streaming Results
# Stream results for long-running tasks
async for chunk in client.stream_task(
task_type = "text_generation" ,
input_data = "Write a long article about AI"
):
print (chunk.content, end = "" , flush = True )
Custom Agent Selection
# Target specific agents
result = client.execute_task(
task_type = "text_generation" ,
input_data = "Technical documentation" ,
agent_requirements = {
"agent_id" : "technical-writer-v2" ,
"min_reputation" : 0.95 ,
"max_cost" : 0.01 ,
"geographic_preference" : "us-east"
}
)
Monitoring and Analytics
Usage Statistics
# Get usage statistics
stats = client.get_usage_stats( period = "last_30_days" )
print ( f "Total tasks: { stats.total_tasks } " )
print ( f "Total cost: { stats.total_cost } SOL" )
print ( f "Average quality: { stats.average_quality } " )
print ( f "Success rate: { stats.success_rate } " )
Task History
# Get detailed task history
history = client.get_task_history(
limit = 100 ,
task_type = "text_generation" ,
start_date = "2024-01-01"
)
for task in history:
print ( f " { task.timestamp } : { task.task_type } - { task.status } " )
# Monitor real-time performance
monitor = client.create_monitor()
@monitor.on_task_complete
def handle_task_complete ( result ):
if result.quality_score < 0.8 :
print ( f "Low quality detected: { result.quality_score } " )
@monitor.on_error
def handle_error ( error ):
print ( f "Task failed: { error } " )
# Start monitoring
monitor.start()
Configuration Options
Environment Variables
# API Configuration
MESHAI_API_KEY = your_api_key
MESHAI_NETWORK = mainnet
MESHAI_TIMEOUT = 30000
# Performance Tuning
MESHAI_MAX_RETRIES = 3
MESHAI_QUALITY_THRESHOLD = 0.8
MESHAI_MAX_COST = 0.1
# Regional Preferences
MESHAI_PREFERRED_REGIONS = us-east,eu-west
MESHAI_ENABLE_CACHING = true
Configuration File
# meshai_config.py
config = {
"api_key" : "your_api_key" ,
"network" : "mainnet" ,
"timeout" : 30000 ,
"retry_attempts" : 3 ,
"quality_threshold" : 0.9 ,
"max_cost" : 0.1 ,
"preferred_regions" : [ "us-east" , "eu-west" ],
"enable_caching" : True ,
"log_level" : "info" ,
"agent_preferences" : {
"text_generation" : {
"min_reputation" : 0.9 ,
"max_response_time" : 5000
}
}
}
# Load configuration
client = MeshAI( config = config)
Testing and Development
Mock Client for Testing
from meshai.testing import MockMeshAI
# Use mock client for testing
client = MockMeshAI()
# Mock responses
client.mock_response(
task_type = "text_generation" ,
response = TaskResult(
output = "Mocked response" ,
quality_score = 0.95 ,
cost = 0.001
)
)
# Test your code
result = client.execute_task( "text_generation" , "test input" )
assert result.output == "Mocked response"
Development Mode
# Enable development mode
client = MeshAI(
api_key = "test_key" ,
network = "testnet" ,
development_mode = True ,
verbose_logging = True
)
Best Practices
Always implement proper error handling for production applications:
Handle network timeouts gracefully
Implement retry logic for transient failures
Provide fallback mechanisms for critical features
Log errors for debugging and monitoring
Follow security best practices:
Store API keys securely (environment variables)
Validate and sanitize input data
Use HTTPS for all communications
Monitor for unusual usage patterns
Next Steps