-
Notifications
You must be signed in to change notification settings - Fork 33
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #24 from data-govt-nz/fix/csrf_implementation
Replace csrf plugin with Queensland Government solution
- Loading branch information
Showing
4 changed files
with
222 additions
and
52 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,147 @@ | ||
"""Provides a filter to prevent Cross-Site Request Forgery, | ||
based on the Double Submit Cookie pattern, | ||
https://github.com/OWASP/CheatSheetSeries/blob/master/cheatsheets/Cross-Site_Request_Forgery_Prevention_Cheat_Sheet.md#double-submit-cookie | ||
This is integrated in the CSRFMiddleware | ||
""" | ||
import ckan.lib.base as base | ||
import re | ||
from re import IGNORECASE, MULTILINE | ||
from logging import getLogger | ||
from ckan.common import request | ||
|
||
LOG = getLogger(__name__) | ||
|
||
RAW_RENDER = base.render | ||
RAW_RENDER_JINJA = base.render_jinja2 | ||
RAW_BEFORE = base.BaseController.__before__ | ||
|
||
""" Used as the cookie name and input field name. | ||
""" | ||
TOKEN_FIELD_NAME = 'token' | ||
""" Used to rotate the token cookie periodically. | ||
If the freshness cookie doesn't appear, the token cookie is still OK, | ||
but we'll set a new one for next time. | ||
""" | ||
TOKEN_FRESHNESS_COOKIE_NAME = 'token-fresh' | ||
|
||
# We need to edit confirm-action links, which get intercepted by JavaScript, | ||
#regardless of which order their 'data-module' and 'href' attributes appear. | ||
CONFIRM_LINK = re.compile(r'(<a [^>]*data-module=["\']confirm-action["\'][^>]*href=["\']([^"\']+))(["\'])', IGNORECASE | MULTILINE) | ||
CONFIRM_LINK_REVERSED = re.compile(r'(<a [^>]*href=["\']([^"\']+))(["\'][^>]*data-module=["\']confirm-action["\'])', IGNORECASE | MULTILINE) | ||
|
||
""" | ||
This will match a POST form that has whitespace after the opening tag (which all existing forms do). | ||
Once we have injected a token immediately after the opening tag, | ||
it won't match any more, which avoids redundant injection. | ||
""" | ||
POST_FORM = re.compile(r'(<form [^>]*method=["\']post["\'][^>]*>)([^<]*\s<)', IGNORECASE | MULTILINE) | ||
|
||
"""The format of the token HTML field. | ||
""" | ||
HEX_PATTERN=re.compile(r'^[0-9a-z]+$') | ||
TOKEN_PATTERN = r'<input type="hidden" name="' + TOKEN_FIELD_NAME + '" value="{token}"/>' | ||
TOKEN_SEARCH_PATTERN = re.compile(TOKEN_PATTERN.format(token=r'([0-9a-f]+)')) | ||
API_URL = re.compile(r'^/api\b.*') | ||
|
||
|
||
def apply_token(html, token): | ||
""" Rewrite HTML to insert tokens if applicable. | ||
""" | ||
|
||
token_match = TOKEN_SEARCH_PATTERN.search(html) | ||
if token_match: | ||
token = token_match.group(1) | ||
|
||
def insert_form_token(form_match): | ||
return form_match.group(1) + TOKEN_PATTERN.format(token=token) + form_match.group(2) | ||
|
||
def insert_link_token(link_match): | ||
if '?' in link_match.group(2): | ||
separator = '&' | ||
else: | ||
separator = '?' | ||
return link_match.group(1) + separator + TOKEN_FIELD_NAME + '=' + token + link_match.group(3) | ||
|
||
return CONFIRM_LINK_REVERSED.sub(insert_link_token, CONFIRM_LINK.sub(insert_link_token, POST_FORM.sub(insert_form_token, html))) | ||
|
||
|
||
def get_cookie_token(request): | ||
"""Retrieve the token expected by the server. | ||
This will be retrieved from the 'token' cookie, if it exists. | ||
If not, an error will occur. | ||
""" | ||
token = None | ||
if request.cookies.has_key(TOKEN_FIELD_NAME): | ||
LOG.debug("Obtaining token from cookie") | ||
token = request.cookies.get(TOKEN_FIELD_NAME) | ||
if token is None or token.strip() == "": | ||
csrf_fail("CSRF token is blank") | ||
return token | ||
|
||
|
||
def get_response_token(request, response): | ||
"""Retrieve the token to be injected into pages. | ||
This will be retrieved from the 'token' cookie, if it exists and is fresh. | ||
If not, a new token will be generated and a new cookie set. | ||
""" | ||
# ensure that the same token is used when a page is assembled from pieces | ||
if TOKEN_FIELD_NAME in request.cookies and TOKEN_FRESHNESS_COOKIE_NAME in request.cookies: | ||
LOG.debug("Obtaining token from cookie") | ||
token = request.cookies.get(TOKEN_FIELD_NAME) | ||
if not HEX_PATTERN.match(token): | ||
LOG.debug("Invalid cookie token; making new token cookie") | ||
token = create_response_token(response) | ||
else: | ||
LOG.debug("No fresh token found; making new token cookie") | ||
token = create_response_token(response) | ||
return token | ||
|
||
|
||
def create_response_token(response): | ||
import binascii, os | ||
token = binascii.hexlify(os.urandom(32)) | ||
response.set_cookie(TOKEN_FIELD_NAME, token, secure=True, httponly=True) | ||
response.set_cookie(TOKEN_FRESHNESS_COOKIE_NAME, '1', max_age=600, secure=True, httponly=True) | ||
return token | ||
|
||
|
||
def csrf_fail(message): | ||
from flask import abort | ||
LOG.error(message) | ||
abort(403, "Your form submission could not be validated") | ||
|
||
|
||
def get_post_token(request): | ||
"""Retrieve the token provided by the client. | ||
This is normally a single 'token' parameter in the POST body. | ||
However, for compatibility with 'confirm-action' links, | ||
it is also acceptable to provide the token as a query string parameter, | ||
if there is no POST body. | ||
""" | ||
if request.environ['webob.adhoc_attrs'].has_key(TOKEN_FIELD_NAME): | ||
return request.token | ||
|
||
# handle query string token if there are no POST parameters | ||
# this is needed for the 'confirm-action' JavaScript module | ||
if not request.POST and len(request.GET.getall(TOKEN_FIELD_NAME)) == 1: | ||
request.token = request.GET.getone(TOKEN_FIELD_NAME) | ||
del request.GET[TOKEN_FIELD_NAME] | ||
return request.token | ||
|
||
postTokens = request.POST.getall(TOKEN_FIELD_NAME) | ||
if not postTokens: | ||
csrf_fail("Missing CSRF token in form submission") | ||
elif len(postTokens) > 1: | ||
csrf_fail("More than one CSRF token in form submission") | ||
else: | ||
request.token = postTokens[0] | ||
|
||
# drop token from request so it doesn't populate resource extras | ||
del request.POST[TOKEN_FIELD_NAME] | ||
|
||
return request.token | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,95 +1,122 @@ | ||
import os | ||
import codecs | ||
|
||
import anti_csrf | ||
import webob | ||
from webob.exc import HTTPForbidden | ||
|
||
from ckanext.security.cache.clients import CSRFClient | ||
|
||
import logging | ||
log = logging.getLogger(__name__) | ||
|
||
try: | ||
from hmac import compare_digest | ||
except ImportError: | ||
def compare_digest(a, b): | ||
return a == b | ||
log = logging.getLogger(__name__) | ||
|
||
|
||
CSRF_ERR = 'CSRF authentication failed. Token missing or invalid.' | ||
|
||
|
||
class Request(webob.Request): | ||
def __init__(self, environ): | ||
super(Request, self).__init__(environ) | ||
self.token = self._get_post_token() | ||
|
||
def is_secure(self): | ||
# allow requests which have the x-forwarded-proto of https (inserted by nginx) | ||
if self.headers.get('X-Forwarded-Proto') == 'https': | ||
return True | ||
return True | ||
return self.scheme == 'https' | ||
|
||
def is_safe(self): | ||
"Check if the request is 'safe', if the request is safe it will not be checked for csrf" | ||
# api requests are exempt from csrf checks | ||
if self.path.startswith("/api"): | ||
return True | ||
|
||
# get/head/options/trace are exempt from csrf checks | ||
return self.method in ('GET', 'HEAD', 'OPTIONS', 'TRACE') | ||
|
||
def good_referer(self): | ||
def good_referer(self, domain): | ||
"Returns true if the referrer is https and matching the host" | ||
if not self.referer: | ||
return False | ||
return False | ||
else: | ||
match = "https://{}/".format(self.host) | ||
match = "https://{}".format(domain) | ||
return self.referer.startswith(match) | ||
|
||
def good_origin(self, domain): | ||
""" | ||
checks if the origin header is present and matches the header" | ||
:param domain: string: the expected origin domain | ||
:return: boolean: true if the origin header is present and matches the expected domain | ||
""" | ||
origin = self.headers.get('origin', None) | ||
if not origin: | ||
log.warning("Potentially unsafe CSRF request is missing the origin header") | ||
return True | ||
else: | ||
match = "https://{}".format(domain) | ||
return origin.startswith(match) | ||
|
||
def _get_post_token(self): | ||
"""Retrieve the token provided by the client. Or return None if not present | ||
This is normally a single 'token' parameter in the POST body. | ||
However, for compatibility with 'confirm-action' links, | ||
it is also acceptable to provide the token as a query string parameter, | ||
if there is no POST body. | ||
""" | ||
# handle query string token if there are no POST parameters | ||
# this is needed for the 'confirm-action' JavaScript module | ||
if not self.POST and len(self.GET.getall(anti_csrf.TOKEN_FIELD_NAME)) == 1: | ||
token = self.GET.getone(anti_csrf.TOKEN_FIELD_NAME) | ||
del self.GET[anti_csrf.TOKEN_FIELD_NAME] | ||
return token | ||
post_tokens = self.POST.getall(anti_csrf.TOKEN_FIELD_NAME) | ||
if not post_tokens or len(post_tokens) != 1: | ||
return None | ||
token = post_tokens[0] | ||
# drop token from request so it doesn't populate resource extras | ||
del self.POST[anti_csrf.TOKEN_FIELD_NAME] | ||
return token | ||
|
||
def get_cookie_token(self): | ||
"""Retrieve the token expected by the server. | ||
This will be retrieved from the 'token' cookie | ||
""" | ||
if anti_csrf.TOKEN_FIELD_NAME in self.cookies: | ||
log.debug("Obtaining token from cookie") | ||
return self.cookies.get(anti_csrf.TOKEN_FIELD_NAME) | ||
else: | ||
return None | ||
|
||
class CSRFMiddleware(object): | ||
COOKIE_NAME = 'csrftoken' | ||
def check_token(self): | ||
log.debug("Checking token matches Token {}, cookie_token: {}".format(self.token, self.get_cookie_token())) | ||
return self.token is not None and self.token == self.get_cookie_token() | ||
|
||
class CSRFMiddleware(object): | ||
def __init__(self, app, config): | ||
self.app = app | ||
self.cache = CSRFClient() | ||
self.domain = config['ckanext.security.domain'] | ||
|
||
def __call__(self, environ, start_response): | ||
request = Request(environ) | ||
self.session = environ['beaker.session'] | ||
self.session.save() | ||
|
||
if self.is_valid(request): | ||
resp = request.get_response(self.app) | ||
else: | ||
resp = HTTPForbidden(CSRF_ERR) | ||
|
||
return resp(environ, start_response) | ||
if 'text/html' in resp.headers.get('Content-type', ''): | ||
resp = self.add_new_token(resp) | ||
token = anti_csrf.get_response_token(request, resp) | ||
|
||
return resp(environ, start_response) | ||
new_response = anti_csrf.apply_token(resp.unicode_body, token) | ||
resp.unicode_body = new_response | ||
return resp(environ, start_response) | ||
|
||
else: | ||
response_value = resp(environ, start_response) | ||
return response_value | ||
|
||
def is_valid(self, request): | ||
return request.is_safe() or self.unsafe_request_is_valid(request) | ||
|
||
def unsafe_request_is_valid(self, request): | ||
return request.is_secure() and request.good_referer() and self.check_cookie(request) | ||
|
||
def check_cookie(self, request): | ||
token = request.cookies.get(self.COOKIE_NAME, None) | ||
|
||
if token is None: | ||
# Just in case this is set by an AJAX request | ||
token = request.cookies.get('X-CSRFToken', None) | ||
|
||
csrf_token = self.cache.get(self.session.id) | ||
|
||
if csrf_token == None: | ||
log.warning('Could not find a csrf token for session id: {}\n{}'.format(self.session.id, request)) | ||
return False | ||
|
||
return compare_digest(str(token), str(csrf_token)) | ||
|
||
def add_new_token(self, response): | ||
token = codecs.encode(os.urandom(32), 'hex') | ||
self.cache.set(self.session.id, token) | ||
response.set_cookie(self.COOKIE_NAME, token, httponly=True, overwrite=True, secure=True, domain=self.domain) | ||
return response | ||
return request.is_secure() and request.good_referer(self.domain) and \ | ||
request.good_origin(self.domain) and request.check_token() |