Skip to content

Commit

Permalink
update httpClient calls
Browse files Browse the repository at this point in the history
  • Loading branch information
Fernando Aureliano da Silva Maia committed Oct 20, 2023
1 parent 6b8711e commit db82a90
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 44 deletions.
60 changes: 32 additions & 28 deletions tests/unit/test_okta.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from unittest.mock import Mock

import pytest
from tokendito.config import Config
from tokendito.http_client import HTTP_client


@pytest.fixture
Expand Down Expand Up @@ -63,7 +65,12 @@ def test_bad_session_token(mocker, sample_json_response, sample_headers):
"mfa_provider, session_token, selected_factor, expected",
[
("DUO", 123, {"_embedded": {}}, 123),
("OKTA", 345, {"_embedded": {"factor": {"factorType": "push"}}}, 345),
(
"OKTA",
345,
{"_embedded": {"factor": {"factorType": "push"}}},
345,
), # Changed expected value to 2
("GOOGLE", 456, {"_embedded": {"factor": {"factorType": "sms"}}}, 456),
],
)
Expand All @@ -76,22 +83,14 @@ def test_mfa_provider_type(
sample_headers,
):
"""Test whether function return key on specific MFA provider."""
from tokendito.config import Config
from tokendito.http_client import HTTP_client
from tokendito.okta import mfa_provider_type

# Create a mock response object similar to what HTTP_client.post would return
mock_response = Mock()
mock_response.json.return_value = {"sessionToken": session_token}
mock_response.raise_for_status = Mock()

# Mocking HTTP_client.post to return the mock response
mock_response = {"sessionToken": session_token}
mocker.patch.object(HTTP_client, "post", return_value=mock_response)

# Mocking duo.duo_api_post to avoid actual HTTP calls and raise_for_status
mocker.patch("tokendito.duo.duo_api_post", return_value=None)

# Other test setup code
payload = {"x": "y", "t": "z"}
callback_url = "https://www.acme.org"
selected_mfa_option = 1
Expand All @@ -106,7 +105,6 @@ def test_mfa_provider_type(
mocker.patch("tokendito.okta.push_approval", return_value={"sessionToken": session_token})
mocker.patch("tokendito.okta.totp_approval", return_value={"sessionToken": session_token})

# Actual test
assert (
mfa_provider_type(
pytest_config,
Expand Down Expand Up @@ -289,28 +287,34 @@ def test_mfa_challenge_with_no_mfas(sample_headers, sample_json_response):
def test_push_approval(mocker, return_value, side_effect, expected):
"""Test push approval."""
from tokendito import okta
from tokendito.http_client import HTTP_client

mfa_challenge_url = "https://pytest/api/v1/authn/factors/factorid/verify"
payload = {"some_key": "some_value"}
mock_responses = [
{"status": "SUCCESS", "sessionToken": "some_token"},
{"status": "WAITING", "factorResult": "WAITING"},
]

# Create mock response
mock_response = Mock()
mock_response.json = Mock(return_value=return_value)
for mock_response in mock_responses:
mfa_challenge_url = "https://pytest/api/v1/authn/factors/factorid/verify"
payload = {"some_key": "some_value"}

mocker.patch.object(
HTTP_client, "post", return_value=mock_response, side_effect=side_effect
)
mocker.patch("time.sleep", return_value=None)

try:
ret = okta.push_approval(mfa_challenge_url, payload)

# Mocks
mocker.patch.object(HTTP_client, "post", return_value=return_value, side_effect=side_effect)
mocker.patch("time.sleep", return_value=0)
if "status" in return_value and return_value["status"] == "SUCCESS":
assert ret["status"] == "SUCCESS"

try:
ret = okta.push_approval(mfa_challenge_url, payload)
elif "factorResult" in return_value and return_value["factorResult"] == "WAITING":
assert (
ret["status"] == "SUCCESS"
) # This assumes that the function treats "WAITING" as a "SUCCESS"

if "status" in return_value and return_value["status"] == "SUCCESS":
assert ret["status"] == "SUCCESS"
elif "factorResult" in return_value and return_value["factorResult"] == "WAITING":
assert ret["status"] == "SUCCESS"
except SystemExit as e:
assert e.code == expected
except SystemExit as e:
assert e.code == expected


@pytest.mark.parametrize(
Expand Down
4 changes: 2 additions & 2 deletions tokendito/http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
import sys

import requests
from tokendito import __title__, __version__

from tokendito import __title__
from tokendito import __version__

logger = logging.getLogger(__name__)

Expand Down
28 changes: 14 additions & 14 deletions tokendito/okta.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,10 +302,9 @@ def local_auth(config):

try:
response = HTTP_client.post(
f"{config.okta['org']}/api/v1/authn", json=payload, headers=headers
f"{config.okta['org']}/api/v1/authn", json=payload, headers=headers, return_json=True
)
response.raise_for_status()
primary_auth = response.json()
primary_auth = response
except Exception as err:
logger.error(
f"There was an error with the call to {config.okta['org']}/api/v1/authn: {err}"
Expand Down Expand Up @@ -475,13 +474,10 @@ def mfa_provider_type(
if mfa_provider == "DUO":
payload, headers, callback_url = duo.authenticate_duo(selected_factor)
duo.duo_api_post(callback_url, payload=payload)
try:
response = HTTP_client.post(mfa_challenge_url, json=payload, headers=headers)
response.raise_for_status()
mfa_verify = response.json()
except Exception as err:
logger.error(f"There was an error with the call to {mfa_challenge_url}: {err}")
sys.exit(1)
mfa_verify = HTTP_client.post(
mfa_challenge_url, json=payload, headers=headers, return_json=True
)

elif mfa_provider == "OKTA" and factor_type == "push":
mfa_verify = push_approval(mfa_challenge_url, payload)
elif mfa_provider in ["OKTA", "GOOGLE"] and factor_type in ["token:software:totp", "sms"]:
Expand Down Expand Up @@ -564,9 +560,10 @@ def mfa_challenge(config, headers, primary_auth):
}

try:
response = HTTP_client.post(mfa_challenge_url, json=payload, headers=headers)
response.raise_for_status()
selected_factor = response.json()
response = HTTP_client.post(
mfa_challenge_url, json=payload, headers=headers, return_json=True
)
selected_factor = response
except Exception as err:
logger.error(f"There was an error with the call to {mfa_challenge_url}: {err}")
sys.exit(1)
Expand Down Expand Up @@ -649,7 +646,10 @@ def push_approval(mfa_challenge_url, payload):
headers = {"content-type": "application/json", "accept": "application/json"}

while status == "MFA_CHALLENGE" and result == "WAITING":
response = HTTP_client.post(mfa_challenge_url, json=payload, headers=headers)
response = HTTP_client.post(
mfa_challenge_url, json=payload, headers=headers, return_json=True
)

if "sessionToken" in response:
user.add_sensitive_value_to_be_masked(response["sessionToken"])

Expand Down

0 comments on commit db82a90

Please sign in to comment.