Skip to main content

Hands-On Tutorial

Your First Secure MCP Server

Build a working MCP server in Python with security baked in from line 1. Not bolted on after. Every step includes the secure version and the insecure version you should avoid.

Code editor showing a secure MCP server with step-by-step indicators

Prerequisites

  • Python 3.11+
  • Basic familiarity with Python functions and decorators
  • A terminal and a text editor
  • 10 minutes
1

Project Setup

terminal
mkdir my-mcp-server && cd my-mcp-server
python -m venv .venv
source .venv/bin/activate
pip install fastmcp pydantic

FastMCP is the most popular Python framework for building MCP servers. Pydantic handles input validation.

server.py
from fastmcp import FastMCP

mcp = FastMCP("my-secure-server")

if __name__ == "__main__":
    mcp.run()

That is a complete MCP server. It does nothing yet, but it boots, speaks the protocol, and is ready for tools.

2

Your First Tool (With Validation)

Let us build a tool that reads a file. The insecure version is terrifyingly common. The secure version is barely more code.

DO NOT DO THIS

@mcp.tool()
def read_file(path: str) -> str:
    with open(path) as f:
        return f.read()

This reads ANY file on the system. An agent can request /etc/passwd, ~/.ssh/id_rsa, or your .env file. No validation, no limits, no logging.

server.py (secure version)
import os
from pathlib import Path
from pydantic import BaseModel, field_validator

ALLOWED_DIR = Path(os.environ.get("MCP_WORKSPACE", "/tmp/workspace")).resolve()
MAX_FILE_SIZE = 1_000_000  # 1MB

class ReadFileInput(BaseModel):
    path: str

    @field_validator("path")
    @classmethod
    def validate_path(cls, v: str) -> str:
        resolved = (ALLOWED_DIR / v).resolve()
        if not str(resolved).startswith(str(ALLOWED_DIR)):
            raise ValueError("Path traversal detected")
        if not resolved.exists():
            raise ValueError("File not found")
        if resolved.stat().st_size > MAX_FILE_SIZE:
            raise ValueError("File too large")
        return str(resolved)

@mcp.tool()
def read_file(path: str) -> str:
    """Read a file from the workspace directory."""
    validated = ReadFileInput(path=path)
    with open(validated.path) as f:
        return f.read()

The path is resolved, checked against an allowlist directory, size-limited, and traversal-safe. All from a Pydantic validator.

3

Add Authentication

DO NOT DO THIS

# "I will add auth later"
# -- every developer who shipped an open server to prod

There is no later. If the server starts without auth, it is one misconfigured network rule away from being public.

server.py (auth via environment variable)
import os
import hmac

API_KEY = os.environ["MCP_API_KEY"]  # Crash on boot if missing

def verify_token(token: str) -> bool:
    """Constant-time comparison to prevent timing attacks."""
    return hmac.compare_digest(token, API_KEY)

# In your transport/middleware layer:
# if not verify_token(request.headers["Authorization"]):
#     raise PermissionError("Invalid API key")

The API key comes from an environment variable, not source code. The server refuses to start if it is missing. Comparison is constant-time.

4

Structured Logging

DO NOT DO THIS

print(f"Called read_file with {path}")
# or worse: no logging at all

print() is not structured, not searchable, and often includes secrets. No logging means you cannot investigate incidents.

server.py (structured audit logging)
import logging
import json
from datetime import datetime, timezone

logger = logging.getLogger("mcp.audit")
logger.setLevel(logging.INFO)

def audit_log(tool_name: str, params: dict, caller: str,
              success: bool, error: str | None = None):
    entry = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "tool": tool_name,
        "caller": caller,
        "params": {
            k: '***'
            if 'secret' in k.lower() or 'password' in k.lower()
            else v
            for k, v in params.items()
        },
        "success": success,
        "error": error,
    }
    logger.info(json.dumps(entry))

Every call is logged with structured JSON. Sensitive parameter names are auto-redacted. Both success and failure paths should call this.

5

Safe Error Handling

DO NOT DO THIS

@mcp.tool()
def query_db(sql: str) -> str:
    try:
        return db.execute(sql)
    except Exception as e:
        return str(e)  # Leaks table names, DB version

Returning raw exceptions to the client leaks internal details. Attackers use this to map your database schema.

server.py (safe error responses)
class ToolError(Exception):
    """User-facing error with no internal details."""
    pass

ALLOWED_QUERIES = {
    "user_count": "SELECT COUNT(*) FROM users",
    "recent_errors": "SELECT message FROM errors LIMIT 10",
}

@mcp.tool()
def query_db(query_name: str) -> str:
    """Run a pre-defined query by name."""
    if query_name not in ALLOWED_QUERIES:
        raise ToolError(
            f"Unknown query: {query_name}. "
            f"Available: {list(ALLOWED_QUERIES.keys())}"
        )
    try:
        result = db.execute(ALLOWED_QUERIES[query_name])
        return json.dumps(result)
    except Exception as e:
        logger.error(f"DB error in {query_name}: {e}")
        raise ToolError("Query failed. Check server logs.")

The client sees a generic error. The server logs the real cause. No raw SQL from the client -- only pre-defined query names.

The Complete Server

server.py (complete)
import os, hmac, json, logging
from pathlib import Path
from datetime import datetime, timezone
from pydantic import BaseModel, field_validator
from fastmcp import FastMCP

# --- Config (all from environment) ---
API_KEY = os.environ["MCP_API_KEY"]
ALLOWED_DIR = Path(
    os.environ.get("MCP_WORKSPACE", "/tmp/workspace")
).resolve()
MAX_FILE_SIZE = 1_000_000

# --- Logging ---
logger = logging.getLogger("mcp.audit")
logging.basicConfig(level=logging.INFO, format="%(message)s")

def audit_log(tool: str, params: dict, success: bool,
              error: str | None = None):
    safe = {
        k: '***' if any(
            s in k.lower()
            for s in ["secret", "password", "key", "token"]
        ) else v
        for k, v in params.items()
    }
    logger.info(json.dumps({
        "ts": datetime.now(timezone.utc).isoformat(),
        "tool": tool, "params": safe,
        "ok": success, "error": error
    }))

# --- Validation ---
class ReadFileInput(BaseModel):
    path: str

    @field_validator("path")
    @classmethod
    def validate_path(cls, v: str) -> str:
        resolved = (ALLOWED_DIR / v).resolve()
        if not str(resolved).startswith(str(ALLOWED_DIR)):
            raise ValueError("Path traversal detected")
        if not resolved.exists():
            raise ValueError("File not found")
        if resolved.stat().st_size > MAX_FILE_SIZE:
            raise ValueError("File exceeds size limit")
        return str(resolved)

# --- Server ---
mcp = FastMCP("my-secure-server")

@mcp.tool()
def read_file(path: str) -> str:
    """Read a file from the workspace directory."""
    try:
        validated = ReadFileInput(path=path)
        content = open(validated.path).read()
        audit_log("read_file", {"path": path}, True)
        return content
    except ValueError as e:
        audit_log("read_file", {"path": path}, False, str(e))
        raise
    except Exception as e:
        logger.error(f"Unexpected error in read_file: {e}")
        audit_log("read_file", {"path": path}, False, "internal")
        raise ValueError("Failed to read file.")

if __name__ == "__main__":
    mcp.run()

Keep Going

You have a working, secure MCP server. Here is what to learn next: