Validate the Access token with FastAPI
Last reviewed: over 1 year ago
This tutorial covers how to validate that the Access JWT is on requests made to FastAPI apps.
Time to complete: 15 minutes
- A self-hosted Access application for your FastAPI app
- The AUD tag for your Access application
- In your FastAPI project, create a new file called
cloudflare.py
that contains the following code:
from fastapi import Request, HTTPException
# The Application Audience (AUD) tag for your applicationPOLICY_AUD = "XXXXX"
# Your CF Access team domainTEAM_DOMAIN = "https://<your-team-name>.cloudflareaccess.com"CERTS_URL = "{}/cdn-cgi/access/certs".format(TEAM_DOMAIN)
async def validate_cloudflare(request: Request): """ Validate that the request is authenticated by Cloudflare Access. """ if verify_token(request) != True: raise HTTPException(status_code=400, detail="Not authenticated properly!")
def _get_public_keys(): """ Returns: List of RSA public keys usable by PyJWT. """ r = requests.get(CERTS_URL) public_keys = [] jwk_set = r.json() for key_dict in jwk_set["keys"]: public_key = jwt.algorithms.RSAAlgorithm.from_jwk(json.dumps(key_dict)) public_keys.append(public_key) return public_keys
def verify_token(request): """ Verify the token in the request. """ token = ""
if "CF_Authorization" in request.cookies: token = request.cookies["CF_Authorization"] else: raise HTTPException(status_code=400, detail="missing required cf authorization token")
keys = _get_public_keys()
# Loop through the keys since we can't pass the key set to the decoder valid_token = False for key in keys: try: # decode returns the claims that has the email when needed jwt.decode(token, key=key, audience=POLICY_AUD, algorithms=["RS256"]) valid_token = True break except: raise HTTPException(status_code=400, detail="Error decoding token") if not valid_token: raise HTTPException(status_code=400, detail="Invalid token")
return True
You can now add the validation function as a dependency in your FastAPI app. One way to do this is by creating an APIRouter
instance ↗. The following example executes the validation function on each request made to paths that start with /admin
:
from fastapi import APIRouter, Depends, HTTPExceptionfrom cloudflare import validate_cloudflare
router = APIRouter( prefix="/admin", tags=["admin"], dependencies=[Depends(validate_cloudflare)] responses={404: {"description": "Not found"}},)
@router.get("/")async def root(): return {"message": "Hello World"}