Hands-On Tutorial
Your First Secure MCP Server
Build a working MCP server in Python with security baked in from line 1. Not bolted on after. Every step includes the secure version and the insecure version you should avoid.
Prerequisites
- Python 3.11+
- Basic familiarity with Python functions and decorators
- A terminal and a text editor
- 10 minutes
Project Setup
mkdir my-mcp-server && cd my-mcp-server
python -m venv .venv
source .venv/bin/activate
pip install fastmcp pydanticFastMCP is the most popular Python framework for building MCP servers. Pydantic handles input validation.
from fastmcp import FastMCP
mcp = FastMCP("my-secure-server")
if __name__ == "__main__":
mcp.run()That is a complete MCP server. It does nothing yet, but it boots, speaks the protocol, and is ready for tools.
Your First Tool (With Validation)
Let us build a tool that reads a file. The insecure version is terrifyingly common. The secure version is barely more code.
DO NOT DO THIS
@mcp.tool()
def read_file(path: str) -> str:
with open(path) as f:
return f.read()This reads ANY file on the system. An agent can request /etc/passwd, ~/.ssh/id_rsa, or your .env file. No validation, no limits, no logging.
import os
from pathlib import Path
from pydantic import BaseModel, field_validator
ALLOWED_DIR = Path(os.environ.get("MCP_WORKSPACE", "/tmp/workspace")).resolve()
MAX_FILE_SIZE = 1_000_000 # 1MB
class ReadFileInput(BaseModel):
path: str
@field_validator("path")
@classmethod
def validate_path(cls, v: str) -> str:
resolved = (ALLOWED_DIR / v).resolve()
if not str(resolved).startswith(str(ALLOWED_DIR)):
raise ValueError("Path traversal detected")
if not resolved.exists():
raise ValueError("File not found")
if resolved.stat().st_size > MAX_FILE_SIZE:
raise ValueError("File too large")
return str(resolved)
@mcp.tool()
def read_file(path: str) -> str:
"""Read a file from the workspace directory."""
validated = ReadFileInput(path=path)
with open(validated.path) as f:
return f.read()The path is resolved, checked against an allowlist directory, size-limited, and traversal-safe. All from a Pydantic validator.
Add Authentication
DO NOT DO THIS
# "I will add auth later"
# -- every developer who shipped an open server to prodThere is no later. If the server starts without auth, it is one misconfigured network rule away from being public.
import os
import hmac
API_KEY = os.environ["MCP_API_KEY"] # Crash on boot if missing
def verify_token(token: str) -> bool:
"""Constant-time comparison to prevent timing attacks."""
return hmac.compare_digest(token, API_KEY)
# In your transport/middleware layer:
# if not verify_token(request.headers["Authorization"]):
# raise PermissionError("Invalid API key")The API key comes from an environment variable, not source code. The server refuses to start if it is missing. Comparison is constant-time.
Structured Logging
DO NOT DO THIS
print(f"Called read_file with {path}")
# or worse: no logging at allprint() is not structured, not searchable, and often includes secrets. No logging means you cannot investigate incidents.
import logging
import json
from datetime import datetime, timezone
logger = logging.getLogger("mcp.audit")
logger.setLevel(logging.INFO)
def audit_log(tool_name: str, params: dict, caller: str,
success: bool, error: str | None = None):
entry = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"tool": tool_name,
"caller": caller,
"params": {
k: '***'
if 'secret' in k.lower() or 'password' in k.lower()
else v
for k, v in params.items()
},
"success": success,
"error": error,
}
logger.info(json.dumps(entry))Every call is logged with structured JSON. Sensitive parameter names are auto-redacted. Both success and failure paths should call this.
Safe Error Handling
DO NOT DO THIS
@mcp.tool()
def query_db(sql: str) -> str:
try:
return db.execute(sql)
except Exception as e:
return str(e) # Leaks table names, DB versionReturning raw exceptions to the client leaks internal details. Attackers use this to map your database schema.
class ToolError(Exception):
"""User-facing error with no internal details."""
pass
ALLOWED_QUERIES = {
"user_count": "SELECT COUNT(*) FROM users",
"recent_errors": "SELECT message FROM errors LIMIT 10",
}
@mcp.tool()
def query_db(query_name: str) -> str:
"""Run a pre-defined query by name."""
if query_name not in ALLOWED_QUERIES:
raise ToolError(
f"Unknown query: {query_name}. "
f"Available: {list(ALLOWED_QUERIES.keys())}"
)
try:
result = db.execute(ALLOWED_QUERIES[query_name])
return json.dumps(result)
except Exception as e:
logger.error(f"DB error in {query_name}: {e}")
raise ToolError("Query failed. Check server logs.")The client sees a generic error. The server logs the real cause. No raw SQL from the client -- only pre-defined query names.
The Complete Server
import os, hmac, json, logging
from pathlib import Path
from datetime import datetime, timezone
from pydantic import BaseModel, field_validator
from fastmcp import FastMCP
# --- Config (all from environment) ---
API_KEY = os.environ["MCP_API_KEY"]
ALLOWED_DIR = Path(
os.environ.get("MCP_WORKSPACE", "/tmp/workspace")
).resolve()
MAX_FILE_SIZE = 1_000_000
# --- Logging ---
logger = logging.getLogger("mcp.audit")
logging.basicConfig(level=logging.INFO, format="%(message)s")
def audit_log(tool: str, params: dict, success: bool,
error: str | None = None):
safe = {
k: '***' if any(
s in k.lower()
for s in ["secret", "password", "key", "token"]
) else v
for k, v in params.items()
}
logger.info(json.dumps({
"ts": datetime.now(timezone.utc).isoformat(),
"tool": tool, "params": safe,
"ok": success, "error": error
}))
# --- Validation ---
class ReadFileInput(BaseModel):
path: str
@field_validator("path")
@classmethod
def validate_path(cls, v: str) -> str:
resolved = (ALLOWED_DIR / v).resolve()
if not str(resolved).startswith(str(ALLOWED_DIR)):
raise ValueError("Path traversal detected")
if not resolved.exists():
raise ValueError("File not found")
if resolved.stat().st_size > MAX_FILE_SIZE:
raise ValueError("File exceeds size limit")
return str(resolved)
# --- Server ---
mcp = FastMCP("my-secure-server")
@mcp.tool()
def read_file(path: str) -> str:
"""Read a file from the workspace directory."""
try:
validated = ReadFileInput(path=path)
content = open(validated.path).read()
audit_log("read_file", {"path": path}, True)
return content
except ValueError as e:
audit_log("read_file", {"path": path}, False, str(e))
raise
except Exception as e:
logger.error(f"Unexpected error in read_file: {e}")
audit_log("read_file", {"path": path}, False, "internal")
raise ValueError("Failed to read file.")
if __name__ == "__main__":
mcp.run()Keep Going
You have a working, secure MCP server. Here is what to learn next: