Skip to content

Commit

Permalink
oie implementation WIP, oauth2 only, authorization code flow.
Browse files Browse the repository at this point in the history
  • Loading branch information
sevignyj committed Sep 10, 2023
1 parent 45703a5 commit 6d888c0
Show file tree
Hide file tree
Showing 6 changed files with 349 additions and 95 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ venv/
ENV/
env.bak/
venv.bak/
.vscode/

# Spyder project settings
.spyderproject
Expand Down
2 changes: 1 addition & 1 deletion tokendito/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def main(args=None): # needed for console script

path = os.path.dirname(os.path.dirname(__file__))
sys.path[0:0] = [path]
from tokendito.tool import cli
from tokendito.user import cli

try:
return cli(args)
Expand Down
286 changes: 268 additions & 18 deletions tokendito/okta.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
import time

import bs4
import base64
import os
import hashlib
from bs4 import BeautifulSoup
import requests
from tokendito import duo
Expand Down Expand Up @@ -73,6 +76,7 @@ def api_error_code_parser(status=None):
param status: Response status
return message: status message
"""
logger.debug(f"api_error_code_parser({status})")
if status and status in _status_dict.keys():
message = f"Okta auth failed: {_status_dict[status]}"
else:
Expand All @@ -82,17 +86,41 @@ def api_error_code_parser(status=None):
return message


def get_auth_pipeline(url=None):
"""Get auth pipeline version."""
logger.debug(f"get_auth_pipeline({url})")
headers = {"accept": "application/json"}
# https://developer.okta.com/docs/api/openapi/okta-management/management/tag/OrgSetting/
url = f"{url}/.well-known/okta-organization"

response = user.request_wrapper("GET", url, headers=headers)
try:
ret = response.json()
except (KeyError, ValueError) as e:
logger.error(f"Failed to parse type in {url}:{str(e)}")
logger.debug(f"Response: {response.text}")
sys.exit(1)
logger.debug(f"we have {ret}")
auth_pipeline = ret.get("pipeline", None)
if auth_pipeline != "idx" and auth_pipeline != "v1":
logger.error(f"unsupported auth pipeline version {auth_pipeline}")
sys.exit(1)
logger.debug(f"Pipeline is of type {auth_pipeline}")
return auth_pipeline


def get_auth_properties(userid=None, url=None):
"""Make a call to the webfinger endpoint.
"""Make a call to the webfinger endpoint to get the auth properties metadata
:param userid: User for which we are requesting an auth endpoint.
:param url: Site where we are looking up the user.
:returns: dictionary with authentication properties.
"""
logger.debug(f"get_auth_properies({userid}, {url})")
payload = {"resource": f"okta:acct:{userid}", "rel": "okta:idp"}
# payload = {"resource": f"okta:acct:{userid}"}
headers = {"accept": "application/jrd+json"}
url = f"{url}/.well-known/webfinger"

logger.debug(f"Looking up auth endpoint for {userid} in {url}")
response = user.request_wrapper("GET", url, headers=headers, params=payload)

Expand Down Expand Up @@ -195,7 +223,6 @@ def get_session_token(config, primary_auth, headers):
:param primary_auth: Primary authentication
:return: Session Token from JSON response
"""
status = None
try:
status = primary_auth.get("status", None)
except AttributeError:
Expand All @@ -207,42 +234,264 @@ def get_session_token(config, primary_auth, headers):
session_token = mfa_challenge(config, headers, primary_auth)
else:
logger.debug(f"Error parsing response: {json.dumps(primary_auth)}")
logger.error("Okta auth failed: unknown status.")
logger.error(f"Okta auth failed: unknown status {status}")
sys.exit(1)

user.add_sensitive_value_to_be_masked(session_token)

return session_token


def authenticate(config):
"""Authenticate user.
def get_oauth_token(token_endpoint_url, authz_code):
"""Get OAuth token from Okta.
:param url: URL of the Okta OAuth token endpoint
todo:, authz_code in payload, put call etc.
:return: OAuth token
"""
# payload = {"code": f"{authz_code}"}
# payload = {"resource": f"okta:acct:{userid}"}
# headers = {"accept": "application/jrd+json"}
# response = user.request_wrapper("GET", token_endpoint_url , headers=headers, params=payload)

headers = {"accept": "text/html,application/xhtml+xml,application/xml"}
payload = {"code": authz_code}
response = user.request_wrapper("POST", token_endpoint_url, headers=headers, data=payload)
return response.json()


def extract_authz_code(response_text):
"""extract authorization code from response text
:param response_text: response text from /authorize call
:return: authorization code
"""
authz_code = re.search(r"(?<=code=)[^&]+", response_text).group(0)
return authz_code


def get_client_id():
"""TODO
https://datatracker.ietf.org/doc/html/rfc6749#section-2.2
https://developer.okta.com/docs/reference/api/oidc/#access-token-scopes-and-claims
https://developer.okta.com/docs/guides/implement-grant-type/authcodepkce/main/#next-steps
""" ""
return "okta.2b1959c8-bcc0-56eb-a589-cfcfb7422f26"


def get_redirect_uri(org_url):
"""TODO"""
# return "http://localhost:8080/authorization-code/callback"
return "set_locally"


def get_response_type():
"""TODO"""
return "code"


def get_authorize_scope():
"""TODO
https://developer.okta.com/docs/reference/api/oidc/#access-token-scopes-and-claims
https://developer.okta.com/docs/guides/implement-grant-type/authcodepkce/main/#next-steps
"""
return "openid profile email okta.users.read.self okta.users.manage.self okta.internal.enduser.read okta.internal.enduser.manage okta.enduser.dashboard.read okta.enduser.dashboard.manage"


def get_oauth_state():
"""generate a random string for state
https://developer.okta.com/docs/guides/implement-grant-type/authcode/main/#flow-specifics
https://developer.okta.com/docs/guides/implement-grant-type/authcodepkce/main/#next-steps
"""
state = hashlib.sha256(os.urandom(1024)).hexdigest()
return state


def get_pkce_code_challenge_method():
"""TODO"""
return "S256"


def get_pkce_code_challenge(code_verifier=None):
"""
get_pkce_code_challenge
Base64-URL-encoded string of the SHA256 hash of the code verifier
https://www.oauth.com/oauth2-servers/pkce/authorization-request/
:param: code_verifier
:return: code_challenge
"""
if code_verifier is None:
code_verifier = get_pkce_code_verifier()
code_challenge = base64.urlsafe_b64encode(
hashlib.sha256(code_verifier.encode("utf-8")).digest()
).decode("utf-8")
return code_challenge


def get_pkce_code_verifier():
"""
to review
"""
code_verifier = base64.urlsafe_b64encode(os.urandom(32)).decode("utf-8")
return code_verifier


def pkce_enabled():
"""TODO"""
return True


def oauth_authz_code_request(authz_server_url, authn_sid):
"""implements authorization code request
calls /authorize endpoint with authenticated sid.
https://developer.okta.com/docs/reference/api/oidc/#_2-okta-as-the-identity-platform-for-your-app-or-api
:param
:return: authorization code, needed for /token call
"""
logger.debug(f"authz_code_reqquest({authz_server_url}, {authn_sid})")
# headers = {"accept": "text/html,application/xhtml+xml,application/xml"}
headers = {"accept": "application/json"}
client_id = get_client_id()
redirect_uri = get_redirect_uri(authz_server_url)
response_type = get_response_type()
scope = get_authorize_scope()
oauth_state = get_oauth_state()

if pkce_enabled():
code_verifier = get_pkce_code_verifier()
code_challenge = get_pkce_code_challenge(code_verifier)
code_challenge_method = get_pkce_code_challenge_method()

payload = {
"client_id": client_id,
"redirect_url": redirect_uri,
"response_type": response_type,
"scope": scope,
"state": oauth_state,
"code_challenge": code_challenge,
"code_challenge_method": code_challenge_method,
}
#
# payload = {"client_id": client_id, "response_type": response_type, "scope": scope, "state": oauth_state, "code_challenge": code_challenge, "code_challenge_method": code_challenge_method}

# todo; authn_sid?

breakpoint()
response = user.request_wrapper("GET", authz_server_url, params=payload)
logger.debug(f"authz_code_reqquest({authz_server_url}, {authn_sid}) response: {response.text}")
authz_code = extract_authz_code(response.text)


def authorization_code_flow(config, authn_sid):
# Authorization Code flow (see
# https://developer.okta.com/docs/guides/implement-grant-type/authcode/main/#about-the-authorization-code-grant
# )
logger.debug(f"authorization_code_worflow({config},{authn_sid})")
authz_endpoint_url, token_endpoint_url = get_authorization_server_endpoints(config.okta["org"])
authz_code = oauth_authz_code_request(authz_endpoint_url, authn_sid)
user.add_sensitive_value_to_be_masked(authz_code)

authz_token = get_oauth_token(token_endpoint_url, authz_code)
user.add_sensitive_value_to_be_masked(authz_token)
return authz_token


def authorization_code_enabled(config):
"""determines if authorization code grant is enabled
todo
"""
return True


def get_authorization_server_endpoints(url=None):
"""Get /authorize and /token endpoints from Okta
:param url: URL of the Okta
:return: tuple of URLs of the /authorize and /token endpoints
"""
url = f"{url}/.well-known/oauth-authorization-server"
headers = {"accept": "application/json"}
response = user.request_wrapper("GET", url, headers=headers)
logger.info(f"Authorization Server info: {response.json()}")
# todo: handle errors.
return response.json()["authorization_endpoint"], response.json()["token_endpoint"]


def oie_authz(config, authn_sid):
logger.debug(f"oie_auth({config}, {authn_sid})")
# get url where to authorize
if authorization_code_enabled(config):
sid = authorization_code_flow(config, authn_sid)
else:
logger.warning("Authorization Code is not enabled, basically skipping oie.")
sid = authn_sid
return sid


def idp_auth(config):
"""authenticate and authorize with the idp. Authorize heppens if oie is enabled.
:param config: Config object
:return: session ID cookie.
"""
breakpoint()

logger.debug(f"idp_auth({config})")
auth_properties = get_auth_properties(userid=config.okta["username"], url=config.okta["org"])

if "type" not in auth_properties:
logger.error("Okta auth failed: unknown type.")
sys.exit(1)
sid = None

if is_local_auth(auth_properties):
session_token = local_auth(config)
sid = user.request_cookies(config.okta["org"], session_token)
session_cookies = None

if is_local_authn(auth_properties):
session_token = local_authn(config)
session_cookies = user.request_cookies(config.okta["org"], session_token)
elif is_saml2_auth(auth_properties):
sid = saml2_auth(config, auth_properties)
session_cookies = saml2_auth(config, auth_properties)
else:
logger.error(f"{auth_properties['type']} login via IdP Discovery is not curretly supported")
sys.exit(1)
return sid

if oie_enabled(config.okta["org"]):
session_cookies = oie_authz(config, session_cookies)

return session_cookies


def oie_enabled(url):
"""
Determines if OIE is enabled.
:pamam url: okta org url
:return: True if OIE is enabled, False otherwise
"""
if get_auth_pipeline(url) == "idx": # oie
return True
else:
return False


def local_authn(config):
"""Authenticate user on local okta instance.
:param config: Config object
:return: auth session ID cookie.
"""

url = config.okta["org"]

authn_sid = idp_authn(config)
user.add_sensitive_value_to_be_masked(authn_sid)
return authn_sid


def is_local_auth(auth_properties):
"""Check whether authentication happens locally.
def is_local_authn(auth_properties):
"""Check whether authentication happens on the current instance.
:param auth_properties: auth_properties dict
:return: True for local auth, False otherwise.
:return: True if this is the place to authenticate, False otherwise.
"""
try:
if auth_properties["type"] == "OKTA":
Expand All @@ -266,12 +515,13 @@ def is_saml2_auth(auth_properties):
return False


def local_auth(config):
"""Authenticate local user with okta credential.
def idp_authn(config):
"""Authenticate with okta.
:param config: Config object
:return: MFA session with options
"""
logger.debug(f"idp_authn({config}")
session_token = None
headers = {"content-type": "application/json", "accept": "application/json"}
payload = {"username": config.okta["username"], "password": config.okta["password"]}
Expand Down Expand Up @@ -303,7 +553,7 @@ def saml2_auth(config, auth_properties):
logger.info(f"Authentication is being redirected to {saml2_config.okta['org']}.")
# Try to authenticate using the new configuration. This could cause
# recursive calls, which allows for IdP chaining.
session_cookies = authenticate(saml2_config)
session_cookies = idp_authn(saml2_config)

# Once we are authenticated, send the SAML request to the IdP.
# This call requires session cookies.
Expand Down
Loading

0 comments on commit 6d888c0

Please sign in to comment.