This guide covers everything you need to build custom plugins for the Datawizz AI Gateway. Plugins are HTTP endpoints that receive requests from the gateway, process them according to your business logic, and return responses that control the request flow. A plugin is simply an HTTP endpoint that:
  1. Receives a POST request with the current request state
  2. Processes the data according to your custom logic
  3. Returns a JSON response indicating whether to allow, reject, or modify the request
Plugins can be built in any language or framework that can expose an HTTP endpoint, including Node.js (Express, Fastify, Hono), Python (Flask, FastAPI, Django), or any other web framework. You can run them on serverless platforms (AWS Lambda, Cloudflare Workers, Vercel Functions) or on traditional servers.
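At its simplest, a plugin is a single endpoint that accepts the gateway payload and allows every request to continue unchanged. The sketch below uses FastAPI; the /noop path is just an example:
from fastapi import FastAPI

app = FastAPI()

# Minimal pass-through plugin: accepts the gateway payload and lets the
# request continue unchanged.
@app.post("/noop")
async def noop_plugin(payload: dict) -> dict:
    return {
        "reject": False,
        "debug": [f"Saw request {payload.get('requestId', 'unknown')}"]
    }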

Request Schema

When a plugin is invoked, the gateway sends a POST request with the following JSON payload:

Input Structure

messages: Array<{
    role: "user" | "assistant" | "system" | "tool";
    content: string | Array<any>;
    // ... other message fields
}>;
requestBody: {
    model?: string;
    temperature?: number;
    max_tokens?: number;
    stream?: boolean;
    // ... all original request body fields
    response?: any; // Only present in RESPONSE and LOG phases - contains the LLM's response
};
requestHeaders: Record<string, string>;
metadata: Record<string, any>;
configs: any; // Custom configuration you defined for this plugin instance
requestId: string; // Unique ID for this request (UUID)

Field Descriptions

Field          | Type           | Description
messages       | Array<Message> | The messages array being processed. In REQUEST phase, this is what will be sent to the LLM. In RESPONSE phase, this includes the LLM’s response as the last message.
requestBody    | object         | The complete request body from the client, including all LLM parameters (model, temperature, etc.)
requestHeaders | object         | All HTTP headers from the client’s request
metadata       | object         | Custom metadata from the request. Can include prompt info, model provider details, etc.
configs        | any            | The custom configuration object you defined for this plugin instance in the endpoint configuration
requestId      | string         | A unique UUID identifying this specific request, useful for logging and correlation

Phase-Specific Considerations

REQUEST Phase:
  • messages contains the messages that will be sent to the LLM (after prompt template application)
  • requestBody contains the original client request parameters
  • metadata may include prompt information if a prompt template was used
RESPONSE Phase:
  • messages contains all input messages PLUS the LLM’s response message as the last element
  • requestBody.response contains the full LLM response object (see the phase-detection sketch below)
  • metadata includes additional fields like modelProvider and promptInfo
LOG Phase:
  • Same as RESPONSE phase
  • Intended for non-blocking analytics, monitoring, or logging operations
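Because the payload shape is the only signal your endpoint receives, a plugin that serves more than one phase can distinguish a REQUEST invocation from a RESPONSE/LOG invocation by checking for requestBody.response, which is only present in the RESPONSE and LOG phases. A minimal sketch (the helper name is illustrative):
from typing import Any, Dict

def is_response_or_log_phase(request_body: Dict[str, Any]) -> bool:
    # requestBody.response is only populated in the RESPONSE and LOG phases,
    # so its presence separates them from the REQUEST phase.
    return request_body.get("response") is not None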

Response Schema

Your plugin must return a JSON response matching this schema:

Output Structure

{
  reject?: boolean;           // Default: false
  rejectReason?: string;      // Optional error message that will be returned to the client if rejecting
  dontRetry?: boolean;        // Default: false, prevents retries with other LLMs (only in RESPONSE phase)
  messages?: Array<Message>;  // Optional: modified messages array
  debug?: Array<string>;      // Optional: debug messages for logging
}

Field Descriptions

Field        | Type           | Required | Default | Description
reject       | boolean        | No       | false   | Set to true to reject the request/response and stop pipeline execution
rejectReason | string         | No       | -       | Human-readable explanation for why the request was rejected. Returned to the client.
dontRetry    | boolean        | No       | false   | Prevents retries with other LLMs (RESPONSE phase only). If true, the gateway will not attempt to call alternative models if the response is rejected.
messages     | Array<Message> | No       | -       | Modified messages array. If provided, these messages replace the input messages for subsequent plugins.
debug        | Array<string>  | No       | -       | Debug messages that will be included in the gateway logs for troubleshooting
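For reference, a plugin that allows a request unchanged, one that rejects it, and one that modifies the messages would return responses along these lines (values are illustrative):
{ "reject": false }

{
  "reject": true,
  "rejectReason": "Request blocked by content policy",
  "debug": ["matched rule: profanity"]
}

{
  "reject": false,
  "messages": [{ "role": "user", "content": "Hello, my email is [EMAIL_REDACTED]" }],
  "debug": ["Redacted email from user message"]
}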

Response Validation

The gateway validates your plugin’s response against the schema above using Zod. If validation fails:
  • The plugin execution is marked as failed
  • The error is logged with the validation error message
  • The request continues (fail-open behavior) with the original, unmodified messages

Implementation Examples

Example 1: Simple Content Filter (Guardrail)

This plugin rejects requests containing prohibited words.
from fastapi import FastAPI
from pydantic import BaseModel
from typing import List, Dict, Any, Optional

app = FastAPI()

class Message(BaseModel):
    role: str
    content: Any

class PluginRequest(BaseModel):
    messages: List[Message]
    requestBody: Dict[str, Any]
    requestHeaders: Dict[str, str]
    metadata: Dict[str, Any]
    configs: Any
    requestId: str

class PluginResponse(BaseModel):
    reject: bool = False
    rejectReason: Optional[str] = None
    messages: Optional[List[Message]] = None
    debug: Optional[List[str]] = None

PROHIBITED_WORDS = ["badword1", "badword2", "inappropriate"]

@app.post("/filter")
async def content_filter(request: PluginRequest) -> PluginResponse:
    debug_messages = []

    # Check all messages for prohibited content
    for msg in request.messages:
        if isinstance(msg.content, str):
            content_lower = msg.content.lower()
            for word in PROHIBITED_WORDS:
                if word in content_lower:
                    debug_messages.append(f"Found prohibited word: {word}")
                    return PluginResponse(
                        reject=True,
                        rejectReason=f"Content contains prohibited term: {word}",
                        debug=debug_messages
                    )

    debug_messages.append("Content passed filter")
    return PluginResponse(
        reject=False,
        debug=debug_messages
    )

Example 2: PII Redaction (Modification)

This plugin detects and redacts personally identifiable information.
import re
from fastapi import FastAPI
from pydantic import BaseModel
from typing import List, Dict, Any, Optional

app = FastAPI()

class Message(BaseModel):
    role: str
    content: Any

    class Config:
        extra = "allow"  # Allow additional fields

class PluginRequest(BaseModel):
    messages: List[Message]
    requestBody: Dict[str, Any]
    requestHeaders: Dict[str, str]
    metadata: Dict[str, Any]
    configs: Any
    requestId: str

class PluginResponse(BaseModel):
    reject: bool = False
    rejectReason: Optional[str] = None
    messages: Optional[List[Dict[str, Any]]] = None
    debug: Optional[List[str]] = None

# Simple regex patterns (use more sophisticated detection in production)
EMAIL_PATTERN = re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b')
PHONE_PATTERN = re.compile(r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b')
SSN_PATTERN = re.compile(r'\b\d{3}-\d{2}-\d{4}\b')

def redact_pii(text: str) -> tuple[str, list[str]]:
    """Redact PII from text and return redacted text + list of redactions"""
    redactions = []

    if EMAIL_PATTERN.search(text):
        text = EMAIL_PATTERN.sub('[EMAIL_REDACTED]', text)
        redactions.append('email')

    if PHONE_PATTERN.search(text):
        text = PHONE_PATTERN.sub('[PHONE_REDACTED]', text)
        redactions.append('phone')

    if SSN_PATTERN.search(text):
        text = SSN_PATTERN.sub('[SSN_REDACTED]', text)
        redactions.append('ssn')

    return text, redactions

@app.post("/redact-pii")
async def pii_redaction(request: PluginRequest) -> PluginResponse:
    debug_messages = []
    modified_messages = []
    any_redactions = False

    # Process each message
    for msg in request.messages:
        msg_dict = msg.dict()

        if isinstance(msg.content, str):
            redacted_content, redactions = redact_pii(msg.content)

            if redactions:
                any_redactions = True
                msg_dict['content'] = redacted_content
                debug_messages.append(f"Redacted {', '.join(redactions)} from {msg.role} message")

        modified_messages.append(msg_dict)

    if any_redactions:
        debug_messages.append(f"Total messages processed: {len(modified_messages)}")
        return PluginResponse(
            reject=False,
            messages=modified_messages,
            debug=debug_messages
        )
    else:
        debug_messages.append("No PII detected")
        return PluginResponse(
            reject=False,
            debug=debug_messages
        )

Example 3: Response Quality Check (Guardrail)

This plugin validates that LLM responses meet quality standards.
from fastapi import FastAPI
from pydantic import BaseModel
from typing import List, Dict, Any, Optional
import re

app = FastAPI()

class Message(BaseModel):
    role: str
    content: Any

    class Config:
        extra = "allow"

class PluginRequest(BaseModel):
    messages: List[Message]
    requestBody: Dict[str, Any]
    requestHeaders: Dict[str, str]
    metadata: Dict[str, Any]
    configs: Any
    requestId: str

class PluginResponse(BaseModel):
    reject: bool = False
    rejectReason: Optional[str] = None
    messages: Optional[List[Dict[str, Any]]] = None
    debug: Optional[List[str]] = None

@app.post("/quality-check")
async def quality_check(request: PluginRequest) -> PluginResponse:
    debug_messages = []

    # Get the assistant's response (last message in RESPONSE phase)
    if not request.messages or request.messages[-1].role != 'assistant':
        debug_messages.append('No assistant response found')
        return PluginResponse(reject=False, debug=debug_messages)

    last_message = request.messages[-1]
    response_text = (
        last_message.content
        if isinstance(last_message.content, str)
        else str(last_message.content)
    )

    # Quality checks
    min_length = request.configs.get('minLength', 10) if request.configs else 10
    max_length = request.configs.get('maxLength', 10000) if request.configs else 10000
    require_sources = request.configs.get('requireSources', False) if request.configs else False

    # Check minimum length
    if len(response_text) < min_length:
        debug_messages.append(f"Response too short: {len(response_text)} < {min_length}")
        return PluginResponse(
            reject=True,
            rejectReason='Response does not meet minimum length requirements',
            debug=debug_messages
        )

    # Check maximum length
    if len(response_text) > max_length:
        debug_messages.append(f"Response too long: {len(response_text)} > {max_length}")
        return PluginResponse(
            reject=True,
            rejectReason='Response exceeds maximum length',
            debug=debug_messages
        )

    # Check for sources if required
    if require_sources:
        has_sources = bool(re.search(r'\[source\]|\[citation\]|source:|reference:', response_text, re.IGNORECASE))
        if not has_sources:
            debug_messages.append('Response missing required sources')
            return PluginResponse(
                reject=True,
                rejectReason='Response must include source citations',
                debug=debug_messages
            )

    debug_messages.append('Response passed all quality checks')
    return PluginResponse(reject=False, debug=debug_messages)

Example 4: Context Enhancement (Modification)

This plugin adds additional context to user requests.
from fastapi import FastAPI
from pydantic import BaseModel
from typing import List, Dict, Any, Optional
from datetime import datetime

app = FastAPI()

class Message(BaseModel):
    role: str
    content: Any

    class Config:
        extra = "allow"

class PluginRequest(BaseModel):
    messages: List[Message]
    requestBody: Dict[str, Any]
    requestHeaders: Dict[str, str]
    metadata: Dict[str, Any]
    configs: Any
    requestId: str

class PluginResponse(BaseModel):
    reject: bool = False
    rejectReason: Optional[str] = None
    messages: Optional[List[Dict[str, Any]]] = None
    debug: Optional[List[str]] = None

@app.post("/add-context")
async def add_context(request: PluginRequest) -> PluginResponse:
    debug_messages = []
    modified_messages = []
    configs = request.configs or {}  # guard in case no configs were defined for this instance

    # Add system message with context at the beginning
    context_message = {
        "role": "system",
        "content": f"""Current date and time: {datetime.utcnow().isoformat()}Z

Additional context from configs:
- Organization: {configs.get('organization', 'N/A')}
- User tier: {configs.get('userTier', 'standard')}
- Special instructions: {configs.get('instructions', 'None')}

Please use this context when formulating your response."""
    }

    modified_messages.append(context_message)
    debug_messages.append("Added system context message")

    # Add all original messages
    for msg in request.messages:
        modified_messages.append(msg.dict())

    debug_messages.append(f"Total messages: {len(modified_messages)}")

    return PluginResponse(
        reject=False,
        messages=modified_messages,
        debug=debug_messages
    )

Testing Your Plugin

Local Testing

Before deploying your plugin, test it locally using curl or any HTTP client:
curl -X POST http://localhost:3000/your-endpoint \
  -H "Content-Type: application/json" \
  -d '{
    "messages": [
      {
        "role": "user",
        "content": "Test message with badword1"
      }
    ],
    "requestBody": {
      "model": "gpt-4",
      "temperature": 0.7
    },
    "requestHeaders": {
      "content-type": "application/json"
    },
    "metadata": {},
    "configs": {
      "customSetting": "value"
    },
    "requestId": "test-request-123"
  }'
Expected response:
{
  "reject": true,
  "rejectReason": "Content contains prohibited term: badword1",
  "debug": ["Found prohibited word: badword1"]
}
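If your plugin is built with FastAPI, you can also exercise it in-process with FastAPI's TestClient. The sketch below assumes the content filter from Example 1 is saved in a module named main (the module name is an assumption):
from fastapi.testclient import TestClient

from main import app  # the FastAPI app from Example 1

client = TestClient(app)

def test_rejects_prohibited_word():
    payload = {
        "messages": [{"role": "user", "content": "Test message with badword1"}],
        "requestBody": {"model": "gpt-4", "temperature": 0.7},
        "requestHeaders": {"content-type": "application/json"},
        "metadata": {},
        "configs": {},
        "requestId": "test-request-123"
    }
    response = client.post("/filter", json=payload)
    assert response.status_code == 200
    body = response.json()
    assert body["reject"] is True
    assert "prohibited" in body["rejectReason"]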

Integration Testing

Once your plugin is deployed and configured in the gateway:
  1. Monitor the gateway logs for plugin execution messages
  2. Check the debug array in your responses - these will appear in gateway logs
  3. Use the gateway’s inference logs to see plugin execution times and results
  4. Test timeout and retry behavior by simulating slow responses or failures

Best Practices

Performance

Response Time:
  • Aim for plugin response times under 100ms for REQUEST phase
  • RESPONSE phase plugins can be slightly slower (under 500ms)
  • Use LOG phase for any operations that can be async (analytics, slow external APIs)
Optimization Tips:
  • Cache frequently used data (e.g., ML models, lookup tables); a caching sketch follows this list
  • Use connection pooling for database queries
  • Implement circuit breakers for external API calls
  • Consider using async/parallel processing internally
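As a sketch of the caching point above, load expensive resources once and reuse them across requests instead of reloading on every call (the file path below is illustrative):
from functools import lru_cache

@lru_cache(maxsize=1)
def get_blocklist() -> frozenset:
    # Loaded once on first use, then served from the cache for every request.
    # The file path is illustrative.
    with open("blocklist.txt") as f:
        return frozenset(line.strip().lower() for line in f if line.strip())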

Error Handling

Fail Gracefully:
@app.post("/my-plugin")
async def my_plugin(request: PluginRequest) -> PluginResponse:
    try:
        # Your plugin logic
        result = process_messages(request.messages)
        return PluginResponse(
            reject=False,
            messages=result,
            debug=["Processing successful"]
        )
    except Exception as e:
        # Log the error internally
        print(f"Plugin error: {str(e)}")

        # Return a safe response (fail-open)
        # Gateway will continue with original messages
        return PluginResponse(
            reject=False,
            debug=[f"Plugin error (non-critical): {str(e)}"]
        )
Timeout Handling:
  • Set appropriate timeout values in your plugin configuration
  • Ensure your plugin respects the timeout and fails fast
  • Use async operations to avoid blocking (see the sketch after this list)
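One way to fail fast inside the plugin itself is to bound slow work with asyncio.wait_for and fall back to a fail-open response. In the sketch below, call_external_service is a hypothetical slow dependency and the 500ms budget is illustrative:
import asyncio

@app.post("/my-plugin")
async def my_plugin(request: PluginRequest) -> PluginResponse:
    try:
        # Bound the slow part well below the timeout configured for this plugin
        # in the gateway. call_external_service is a hypothetical async helper.
        flagged = await asyncio.wait_for(call_external_service(request.messages), timeout=0.5)
        if flagged:
            return PluginResponse(reject=True, rejectReason="Blocked by external check")
        return PluginResponse(reject=False, debug=["External check passed"])
    except asyncio.TimeoutError:
        # Fail open so the gateway continues with the original messages.
        return PluginResponse(reject=False, debug=["External check timed out, skipping"])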

Security

As your plugins will be exposed over the internet, it’s crucial to implement robust security measures. We recommend checking for a secret header, which you can configure in the Datawizz dashboard when setting up your plugin endpoint.
Authentication:
import os

from fastapi import FastAPI, Header, HTTPException

# Shared secret expected from the gateway (the environment variable name is illustrative)
EXPECTED_TOKEN = os.environ.get("PLUGIN_SHARED_SECRET", "")

@app.post("/secure-plugin")
async def secure_plugin(
    request: PluginRequest,
    authorization: str = Header(None)
):
    # Validate authorization header
    if not authorization or not authorization.startswith("Bearer "):
        raise HTTPException(status_code=401, detail="Unauthorized")

    token = authorization.replace("Bearer ", "")
    if token != EXPECTED_TOKEN:
        raise HTTPException(status_code=403, detail="Forbidden")

    # Process request...
Configure the authorization header in your plugin settings in the gateway dashboard.
Input Validation:
  • Always validate the structure of incoming requests
  • Sanitize any data before using it in queries or external API calls
  • Be cautious with the configs field - validate expected types (see the sketch after this list)
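For the configs field in particular, a small Pydantic model keeps the validation explicit and lets you fail open on bad configuration. The field names in this sketch are illustrative:
from pydantic import BaseModel, ValidationError

class FilterConfigs(BaseModel):
    # Expected shape of this plugin instance's configs; field names are illustrative.
    minLength: int = 10
    maxLength: int = 10000

@app.post("/my-plugin")
async def my_plugin(request: PluginRequest) -> PluginResponse:
    try:
        configs = FilterConfigs.parse_obj(request.configs or {})
    except ValidationError as e:
        # Fail open with a debug note rather than crashing on a bad configuration.
        return PluginResponse(reject=False, debug=[f"Invalid configs: {e}"])

    # Use configs.minLength / configs.maxLength in your checks...
    return PluginResponse(reject=False, debug=["Configs validated"])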
Secrets Management:
  • Never hardcode API keys or secrets in your plugin code
  • Use environment variables or secret management services
  • Rotate credentials regularly

Observability

Logging:
import logging

logger = logging.getLogger(__name__)

@app.post("/my-plugin")
async def my_plugin(request: PluginRequest) -> PluginResponse:
    logger.info(f"Processing request {request.requestId}")

    debug_messages = []

    # Add useful debug info
    debug_messages.append(f"Message count: {len(request.messages)}")
    debug_messages.append(f"Model: {request.requestBody.get('model', 'unknown')}")

    # Your logic here...

    logger.info(f"Completed request {request.requestId}")

    return PluginResponse(
        reject=False,
        debug=debug_messages
    )
Metrics:
  • Track plugin execution time (see the timing sketch after this list)
  • Monitor rejection rates
  • Alert on error rates
  • Track resource usage (CPU, memory, network)
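A lightweight starting point is to time each invocation and surface the result through your logs and the debug array; integrate with your metrics system from there:
import time

@app.post("/my-plugin")
async def my_plugin(request: PluginRequest) -> PluginResponse:
    start = time.perf_counter()

    # Your logic here...

    elapsed_ms = (time.perf_counter() - start) * 1000
    logger.info(f"Completed request {request.requestId} in {elapsed_ms:.1f}ms")
    return PluginResponse(reject=False, debug=[f"Execution time: {elapsed_ms:.1f}ms"])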

Message Handling

Preserve Message Structure:
# Good: Preserve all message fields
msg_dict = msg.dict()  # or msg.model_dump() in Pydantic v2
msg_dict['content'] = modified_content
modified_messages.append(msg_dict)

# Bad: Only copying some fields
modified_messages.append({
    'role': msg.role,
    'content': modified_content
    # Missing other fields!
})
Handle Different Content Types:
def process_content(content: Any) -> Any:
    if isinstance(content, str):
        # Simple string content
        return process_string(content)
    elif isinstance(content, list):
        # Multimodal content (text + images, etc.)
        return process_multimodal(content)
    else:
        # Unknown format, return as-is
        return content

Troubleshooting

Common Issues

“Plugin response validation failed”
  • Check that your response matches the expected schema exactly
  • Ensure reject is a boolean, not a string
  • Ensure debug is an array of strings, not a single string
  • Verify messages is an array if provided
“Plugin timeout after Xms”
  • Your plugin is taking longer than the configured timeout
  • Optimize your plugin’s processing time
  • Increase the timeout value in plugin configuration
  • Move slow operations to LOG phase if possible
“Plugin returned status 500”
  • Your plugin threw an unhandled exception
  • Check your plugin’s logs for error details
  • Implement proper error handling
Messages not being modified
  • Ensure you’re returning a messages array in your response
  • Verify the array contains valid message objects
  • Check that you’re not accidentally returning the original messages reference
Plugin rejections not working
  • Ensure reject: true is present in response
  • Include a rejectReason string
  • Check that the response is valid JSON

Next Steps

Now that you understand how to build plugins:
  1. Implement a simple plugin following one of the examples above
  2. Test it locally with sample requests
  3. Deploy it to your hosting platform
  4. Configure it in the Datawizz dashboard
  5. Monitor its performance and iterate
For more information, see: