The Retire-Cluster Task Execution Framework provides a distributed task execution system that intelligently spreads computational work across your cluster of devices.
The framework consists of several key components: a `Task` model with requirements and priorities, a scheduler and task queue on the main node, and task executors on worker nodes.
```python
from retire_cluster.tasks import Task, TaskRequirements, TaskPriority

# Create a simple task
task = Task(
    task_type="echo",
    payload={"message": "Hello World!"},
    priority=TaskPriority.NORMAL
)

# Submit to cluster
task_id = scheduler.submit_task(task)
```
```python
# Task that requires specific device capabilities
compute_task = Task(
    task_type="python_eval",
    payload={"expression": "sum(range(1000000))"},
    requirements=TaskRequirements(
        min_cpu_cores=4,
        min_memory_gb=8,
        required_platform="linux",
        timeout_seconds=60
    )
)
```
```python
# Task targeted at mobile devices
mobile_task = Task(
    task_type="system_info",
    payload={},
    requirements=TaskRequirements(
        required_role="mobile",
        required_platform="android"
    )
)
```
The framework includes several built-in task types:
```python
Task(
    task_type="echo",
    payload={"message": "Your message here"}
)
```
Returns the payload as-is. Useful for testing connectivity.
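A quick round trip makes a handy connectivity check. This is a minimal sketch, assuming a connected `scheduler` as in the earlier examples and that `TaskStatus` is importable alongside `Task`:

```python
from retire_cluster.tasks import Task, TaskStatus  # TaskStatus location assumed

# Ping the cluster with an echo task and inspect the round trip
ping_id = scheduler.submit_task(Task(task_type="echo", payload={"message": "ping"}))
result = scheduler.get_task_result(ping_id)  # may be None until a worker finishes
if result and result.status == TaskStatus.SUCCESS:
    print("Cluster reachable:", result.result_data)
```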
```python
Task(
    task_type="sleep",
    payload={"duration": 5.0}
)
```
Sleeps for the specified duration in seconds.
```python
Task(task_type="system_info", payload={})
```
Returns device capabilities and system information.
```python
Task(
    task_type="python_eval",
    payload={"expression": "2 + 2"}
)
```
Evaluates a Python expression in a restricted environment; the set of supported expressions is limited for security.
```python
Task(
    task_type="http_request",
    payload={
        "url": "https://api.example.com/data",
        "method": "GET",
        "headers": {"Authorization": "Bearer token"},
        "timeout": 30
    }
)
```
Makes HTTP requests. Useful for web scraping or API calls.
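Because `http_request` tasks need network access, they pair naturally with the `internet_required` flag described under task requirements below. A minimal sketch:

```python
# Route the request to a device that actually has internet access
fetch = Task(
    task_type="http_request",
    payload={"url": "https://api.example.com/data", "method": "GET", "timeout": 30},
    requirements=TaskRequirements(internet_required=True),
)
fetch_id = scheduler.submit_task(fetch)
```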
```python
Task(
    task_type="command",
    payload={
        "command": "ls -la",
        "timeout": 10
    }
)
```
Executes shell commands. Use with caution for security.
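One way to contain the risk is to pin commands to a known platform so the shell syntax is predictable. A sketch using the `required_platform` field documented below:

```python
# Restrict the shell command to Linux workers, where `ls -la` is valid
list_task = Task(
    task_type="command",
    payload={"command": "ls -la", "timeout": 10},
    requirements=TaskRequirements(required_platform="linux"),
)
list_id = scheduler.submit_task(list_task)
```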
You can register custom task handlers on worker nodes:
```python
def my_custom_handler(payload):
    # Your custom logic here
    input_data = payload.get("input")
    result = process_data(input_data)
    return {"output": result}

# Register on worker
executor.register_handler("my_custom_task", my_custom_handler)
```
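Once the handler is registered on a worker, the new task type is submitted like any built-in one. A sketch, assuming the payload shape the handler above expects:

```python
# Submit work for the custom handler registered on the worker
custom = Task(
    task_type="my_custom_task",
    payload={"input": [1, 2, 3]},
)
custom_id = scheduler.submit_task(custom)
```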
Resource requirements:

```python
TaskRequirements(
    min_cpu_cores=4,        # Minimum CPU cores
    min_memory_gb=8,        # Minimum RAM in GB
    min_storage_gb=100,     # Minimum storage in GB
    gpu_required=True,      # Requires GPU
    internet_required=True  # Requires internet access
)
```
Platform and role targeting:

```python
TaskRequirements(
    required_platform="linux",    # "windows", "linux", "android", "darwin"
    required_role="compute",      # "worker", "compute", "storage", "mobile"
    required_tags=["gpu-capable"] # Custom device tags
)
```
Execution constraints:

```python
TaskRequirements(
    timeout_seconds=300,  # Maximum execution time
    max_retries=3         # Maximum retry attempts
)
```
Tasks can be assigned different priority levels:
```python
TaskPriority.URGENT  # Highest priority
TaskPriority.HIGH    # High priority
TaskPriority.NORMAL  # Normal priority (default)
TaskPriority.LOW     # Lowest priority
```
Higher priority tasks are scheduled first.
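For example, an urgent task submitted after a backlog of `NORMAL` tasks should be dequeued ahead of them. A minimal sketch:

```python
# Jump the queue: URGENT tasks are scheduled before NORMAL and LOW ones
urgent = Task(
    task_type="system_info",
    payload={},
    priority=TaskPriority.URGENT,
)
scheduler.submit_task(urgent)
```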
Tasks go through several status states:
- `PENDING`: Task created but not yet queued
- `QUEUED`: Task waiting for an available device
- `ASSIGNED`: Task assigned to a specific device
- `RUNNING`: Task currently executing
- `SUCCESS`: Task completed successfully
- `FAILED`: Task failed with an error
- `CANCELLED`: Task was cancelled
- `TIMEOUT`: Task exceeded its timeout limit

Check a task's current status:

```python
status = scheduler.get_task_status(task_id)
print(f"Task status: {status.value}")
```
Retrieve the result once the task completes:

```python
result = scheduler.get_task_result(task_id)
if result:
    if result.status == TaskStatus.SUCCESS:
        print(f"Result: {result.result_data}")
    else:
        print(f"Error: {result.error_message}")
```
Inspect cluster-wide statistics:

```python
stats = scheduler.get_cluster_statistics()
print(f"Online devices: {stats['cluster_stats']['online_devices']}")
print(f"Queue size: {stats['queue_stats']['total_tasks']}")
```
The scheduler automatically balances tasks across available devices based on factors such as each task's requirements, its priority, and its retry limit (`max_retries`). Tasks of the same type are preferentially assigned to the same device to improve performance, for example by reusing warmed-up handlers and cached data; the sketch below illustrates the kind of capability matching involved.
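As an illustration only (the real matching logic lives inside the scheduler), capability matching can be pictured as a predicate over the fields `TaskRequirements` exposes. The capability keys used here are assumed names, not a documented schema:

```python
# Illustrative sketch, not the framework's actual scheduler code.
# Capability keys ("cpu_cores", "memory_gb", "platform") are assumptions.
def device_matches(requirements, capabilities):
    if requirements.min_cpu_cores and capabilities["cpu_cores"] < requirements.min_cpu_cores:
        return False
    if requirements.min_memory_gb and capabilities["memory_gb"] < requirements.min_memory_gb:
        return False
    if requirements.required_platform and capabilities["platform"] != requirements.required_platform:
        return False
    return True
```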
Use Temporal for complex workflow orchestration:
```python
from retire_cluster.tasks.integrations import TemporalIntegration

temporal = TemporalIntegration(cluster_client)
await temporal.initialize("localhost:7233")

# Submit task via Temporal workflow
workflow_id = temporal.submit_task("data_processing", {"input": "data"})
```
Use Celery as a task broker:
```python
from retire_cluster.tasks.integrations import CeleryIntegration

celery = CeleryIntegration(cluster_client)
celery.initialize("redis://localhost:6379")

# Submit via Celery
task_id = celery.submit_task("analysis", {"dataset": "data.csv"})
```
Expose task submission via HTTP:
```python
from retire_cluster.tasks.integrations import SimpleTaskBridge

bridge = SimpleTaskBridge(cluster_client)
bridge.start_http_bridge()  # Starts HTTP server on port 8081
```

Submit via HTTP POST:

```bash
curl -X POST http://localhost:8081/tasks \
  -H "Content-Type: application/json" \
  -d '{"task_type": "echo", "payload": {"message": "Hello"}}'
```
The task framework integrates with the Retire-Cluster CLI:
```bash
# Start main node with task scheduling
retire-cluster-main --port 8080

# Start worker that can execute tasks
retire-cluster-worker --join 192.168.1.100:8080 --role compute

# Monitor cluster and tasks
retire-cluster-status 192.168.1.100:8080
```
Security considerations for the built-in task types:

- `echo`: Always safe
- `sleep`: Safe
- `system_info`: Safe (read-only)
- `http_request`: Generally safe, but review URLs
- `python_eval`: Limited expression evaluation, but still potentially risky
- `command`: Can execute arbitrary commands; use with extreme caution

Common issues to troubleshoot:

- Tasks stuck in `QUEUED` status
- Tasks failing immediately
- Poor performance
Enable debug logging:
```python
logger = get_logger("scheduler", level="DEBUG")
```
Check queue statistics:
```python
stats = task_queue.get_queue_statistics()
print(stats)
```
Monitor device status:
```python
online_devices = scheduler.get_online_devices()
for device_id in online_devices:
    capabilities = scheduler.get_device_capabilities(device_id)
    print(f"{device_id}: {capabilities}")
```
See `examples/task_execution_example.py` for a comprehensive demonstration of the task execution framework in action, covering the pieces described above: task creation, requirements, submission, status tracking, and result handling.