Overview

When you start a Live Monitor job, Trio sends webhook notifications to your server when conditions are met. This guide explains webhook payloads and best practices for handling them.

Webhook Events

Your webhook URL receives POST requests with different event types depending on what happened:

Condition Triggered

Sent when your condition is detected as true:
{
  "type": "live_monitor_triggered",
  "timestamp": "2024-01-26T10:35:00Z",
  "source_url": "https://youtube.com/watch?v=abc123",
  "data": {
    "condition": "Is there a traffic accident?",
    "triggered": true,
    "explanation": "A multi-car collision is visible on the highway. Emergency vehicles are responding.",
    "frame_b64": "base64-encoded-jpeg..."
  }
}
Fields:
  • timestamp: When the condition was detected
  • source_url: The stream URL being monitored
  • triggered: Always true for this event
  • explanation: VLM’s detailed observation of what was detected
  • frame_b64: The actual frame that triggered the condition (useful for verification)
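
To inspect the triggering frame, the frame_b64 field can be decoded and written to disk. The sketch below is a minimal illustration, assuming the field holds plain standard base64 with no data-URI prefix. The helper name save_trigger_frame and the frames output directory are hypothetical, not part of Trio's API.

```python
import base64
from pathlib import Path


def save_trigger_frame(payload: dict, out_dir: str = "frames") -> Path:
    """Decode frame_b64 from a live_monitor_triggered payload and save it as a JPEG.

    Assumes frame_b64 is plain base64 (no "data:image/jpeg;base64," prefix).
    """
    jpeg_bytes = base64.b64decode(payload["data"]["frame_b64"])

    # Timestamps contain ':' characters, which some filesystems reject in filenames.
    safe_name = payload["timestamp"].replace(":", "-")

    out_path = Path(out_dir) / f"{safe_name}.jpg"
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_bytes(jpeg_bytes)
    return out_path
```

Because decoding and writing a frame can be slow for large images, a call like this fits better in a background task than in the request handler itself.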

Job Status Change

Sent when the job stops, either because you stopped it manually or because it reached the 10-minute auto-stop limit:
{
  "type": "job_status",
  "job_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "status": "stopped",
  "details": {
    "checks_performed": 45,
    "triggers_fired": 3,
    "frames_skipped": 112,
    "auto_stopped": true,
    "reason": "max_duration_reached",
    "elapsed_seconds": 600
  }
}
Fields:
  • status: Current job status (pending, running, stopped, completed, failed)
  • auto_stopped: Whether the job stopped due to the 10-minute limit
  • reason: Why the job stopped (max_duration_reached, condition_triggered, etc.)
  • elapsed_seconds: Total runtime if auto-stopped
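
As a rough illustration of reading these fields, the sketch below turns a job_status payload into a one-line summary suitable for an alert or log entry. The function name summarize_job_status is hypothetical. The meaning of checks_performed and triggers_fired is inferred from their names in the sample payload, since they are not described in the field list above.

```python
def summarize_job_status(payload: dict) -> str:
    """Build a one-line summary from a job_status webhook payload."""
    details = payload.get("details", {})
    checks = details.get("checks_performed", 0)
    triggers = details.get("triggers_fired", 0)

    summary = (
        f"Job {payload['job_id']} is {payload['status']}: "
        f"{triggers} triggers in {checks} checks"
    )

    # elapsed_seconds and reason are most useful when the job hit the time limit.
    if details.get("auto_stopped"):
        summary += (
            f" (auto-stopped after {details.get('elapsed_seconds')}s, "
            f"reason: {details.get('reason')})"
        )
    return summary
```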

Handling Webhooks

Basic Setup

from fastapi import FastAPI, Request

app = FastAPI()

@app.post("/webhook")
async def handle_webhook(request: Request):
    payload = await request.json()

    # Handle condition trigger
    if payload.get("type") == "live_monitor_triggered":
        data = payload["data"]
        print(f"Condition matched: {data['explanation']}")
        # Send alert, save to database, etc.

    # Handle job status changes
    elif payload.get("type") == "job_status":
        status = payload["status"]
        if status == "stopped":
            print(f"Job stopped: {payload['details']['reason']}")

    return {"status": "ok"}

Continuous Monitoring with Auto-Restart

Since jobs auto-stop after 10 minutes, you’ll want to restart them:
import httpx
from fastapi import BackgroundTasks

TRIO_API = "https://trio.machinefi.com"

@app.post("/webhook")
async def handle_webhook(request: Request, background_tasks: BackgroundTasks):
    payload = await request.json()

    # Restart job if it auto-stopped
    if payload.get("type") == "job_status":
        details = payload.get("details", {})
        if details.get("auto_stopped"):
            job_id = payload["job_id"]
            print(f"Job {job_id} auto-stopped, restarting...")
            background_tasks.add_task(restart_job, job_id)

    return {"status": "ok"}

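async def restart_job(job_id: str):
    # Get job details to know what to restart.
    # "async with" closes the client's connections when the block exits.
    async with httpx.AsyncClient() as client:
        response = await client.get(f"{TRIO_API}/jobs/{job_id}")
        response.raise_for_status()
        job = response.json()
    # Then restart the job with the same configuration (taken from `job`)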

Best Practices

1. Validate Webhook Authenticity

In production, validate that webhooks are actually from Trio:
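import hmac
import hashlib

from fastapi import HTTPException

WEBHOOK_SECRET = "your-secret-key"

@app.post("/webhook")
async def handle_webhook(request: Request):
    # Default to an empty string so a missing header fails verification cleanly
    signature = request.headers.get("X-Trio-Signature", "")
    body = await request.body()

    # Compute the expected signature over the raw request body
    expected = hmac.new(
        WEBHOOK_SECRET.encode(),
        body,
        hashlib.sha256
    ).hexdigest()

    # Constant-time comparison avoids leaking information through timing
    if not hmac.compare_digest(signature.encode(), expected.encode()):
        # Raise instead of returning a tuple: FastAPI would serialize a
        # (body, 401) tuple as JSON and still respond with status 200
        raise HTTPException(status_code=401, detail="Invalid signature")

    payload = await request.json()
    # Process webhook...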
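
To exercise the signature check locally, you can sign a sample body yourself and confirm your verification logic accepts it. The sketch below assumes the scheme used in the handler above: a hex-encoded HMAC-SHA256 of the raw request body, sent in the X-Trio-Signature header. The helper names sign_body and is_valid_signature are hypothetical.

```python
import hashlib
import hmac
import json


def sign_body(secret: str, body: bytes) -> str:
    """Return the hex HMAC-SHA256 of the raw body (assumed signing scheme)."""
    return hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()


def is_valid_signature(secret: str, body: bytes, signature) -> bool:
    """Check a received signature in constant time; a missing header is invalid."""
    if not signature:
        return False
    expected = sign_body(secret, body)
    return hmac.compare_digest(expected.encode(), signature.encode())


# Build a signed sample request for a local test of your endpoint.
sample_body = json.dumps({"type": "job_status", "status": "stopped"}).encode()
sample_headers = {"X-Trio-Signature": sign_body("your-secret-key", sample_body)}
```

Sign and verify the exact bytes that go over the wire. Re-serializing the parsed JSON can change whitespace or key order and produce a different digest.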

2. Handle Timeouts

Your webhook endpoint should respond quickly (< 5 seconds). Do heavy work asynchronously:
from fastapi import BackgroundTasks

@app.post("/webhook")
async def handle_webhook(request: Request, background_tasks: BackgroundTasks):
    payload = await request.json()

    # Return immediately
    background_tasks.add_task(process_trigger, payload)
    return {"status": "received"}

async def process_trigger(payload):
    # Heavy work: send SMS, database writes, etc.
    # This runs in background
    pass

3. Idempotent Processing

Webhooks may be delivered multiple times. Make your handlers idempotent:
@app.post("/webhook")
async def handle_webhook(request: Request):
    payload = await request.json()
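    # The sample payloads have no dedicated event ID, and only trigger events
    # carry "timestamp", so build a key from whichever fields are present
    event_id = (
        payload.get("type"),
        payload.get("timestamp") or payload.get("job_id"),
        payload.get("status"),
    )

    # Check if we've already processed this
    # (already_processed / mark_as_processed are placeholders for your storage)
    if already_processed(event_id):
        return {"status": "already_processed"}

    # Process and mark as done
    mark_as_processed(event_id)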
    return {"status": "ok"}
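
The already_processed and mark_as_processed helpers above are left undefined. One possible backing store is sketched below: a thread-safe, bounded in-memory set that merges "check" and "mark" into a single step, so two concurrent deliveries of the same event cannot both pass the check. The class name ProcessedEvents and the max_size default are hypothetical. An in-memory store is lost on restart and not shared between worker processes, so a production setup would more likely use a database or cache.

```python
import threading
from collections import OrderedDict
from typing import Hashable


class ProcessedEvents:
    """Remember recently seen event keys, evicting the oldest past max_size."""

    def __init__(self, max_size: int = 10_000):
        self._seen: "OrderedDict[Hashable, None]" = OrderedDict()
        self._lock = threading.Lock()
        self._max_size = max_size

    def first_time(self, key: Hashable) -> bool:
        """Return True and record the key if unseen; return False for a duplicate."""
        with self._lock:
            if key in self._seen:
                return False
            self._seen[key] = None
            if len(self._seen) > self._max_size:
                self._seen.popitem(last=False)  # drop the oldest key
            return True
```

In a handler, this would be used as `if not store.first_time(event_id): return {"status": "already_processed"}`, where store is a module-level ProcessedEvents instance.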

4. Log Everything

Keep detailed logs for debugging:
import logging

logger = logging.getLogger(__name__)

@app.post("/webhook")
async def handle_webhook(request: Request):
    payload = await request.json()

    logger.info(f"Webhook received: {payload.get('type')}")

    try:
        # Process webhook
        pass
    except Exception as e:
        logger.error(f"Webhook processing failed: {e}", exc_info=True)
        raise
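
If you log whole payloads, the frame_b64 field can swamp your logs, since it holds an entire base64-encoded JPEG. A minimal sketch of stripping it before logging follows. The helper name redact_for_logging is hypothetical.

```python
import copy


def redact_for_logging(payload: dict) -> dict:
    """Return a copy of the payload with frame_b64 replaced by a short placeholder."""
    redacted = copy.deepcopy(payload)  # leave the caller's payload untouched
    data = redacted.get("data")
    if isinstance(data, dict) and "frame_b64" in data:
        data["frame_b64"] = f"<{len(data['frame_b64'])} base64 chars omitted>"
    return redacted
```

A handler could then call `logger.info("Webhook payload: %s", redact_for_logging(payload))` and still pass the original payload, frame included, to its processing code.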

Troubleshooting

Webhooks not arriving?

  1. Check that your webhook URL is publicly accessible
  2. Verify your firewall isn’t blocking Trio’s requests
  3. Check Job Status to see if the job is still running
  4. Review Logs for “webhooks_failed” entries

Webhook payload missing frame?

The frame is always included in live_monitor_triggered payloads, so no extra parameter is needed when creating the job (include_frame is not a live-monitor parameter):
curl -X POST https://trio.machinefi.com/live-monitor \
  -H "Content-Type: application/json" \
  -d '{
    "stream_url": "https://youtube.com/watch?v=abc123",
    "condition": "Is there an accident?",
    "webhook_url": "https://your-server.com/webhook"
  }'
If frame_b64 still appears to be missing, check that your webhook receiver is actually receiving the field. Also confirm you are looking at a live_monitor_triggered event: the job_status payload shown above carries no frame.