Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add support for optional profile expiration #153

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ usage: tokendito [-h] [--version] [--configure] [--username OKTA_USERNAME] [--pa
[--loglevel {DEBUG,INFO,WARN,ERROR}] [--log-output-file USER_LOG_OUTPUT_FILE] [--aws-config-file AWS_CONFIG_FILE] [--aws-output AWS_OUTPUT]
[--aws-profile AWS_PROFILE] [--aws-region AWS_REGION] [--aws-role-arn AWS_ROLE_ARN] [--aws-shared-credentials-file AWS_SHARED_CREDENTIALS_FILE]
[--okta-org OKTA_ORG | --okta-tile OKTA_TILE] [--okta-client-id OKTA_CLIENT_ID] [--okta-mfa OKTA_MFA] [--okta-mfa-response OKTA_MFA_RESPONSE]
[--use-device-token] [--quiet]
[--use-device-token] [--use-profile-expiration] [--quiet]

Gets an STS token to use with the AWS CLI and SDK.

Expand Down Expand Up @@ -116,6 +116,8 @@ options:
--okta-mfa-response OKTA_MFA_RESPONSE
Sets the MFA response to a challenge. You can also use the TOKENDITO_OKTA_MFA_RESPONSE environment variable.
--use-device-token Use device token across sessions
--use-profile-expiration
Use profile expiration to bypass re-authenticating
--quiet Suppress output
```

Expand Down Expand Up @@ -158,6 +160,7 @@ The following table lists the environment variable and user configuration entry
| `--okta-mfa` | `TOKENDITO_OKTA_MFA` | `okta_mfa` |
| `--okta-mfa-response` | `TOKENDITO_OKTA_MFA_RESPONSE` | `okta_mfa_response` |
| `--use-device-token` | `TOKENDITO_USER_USE_DEVICE_TOKEN` | `user_use_device_token` |
| `--use-profile-expiration` | `TOKENDITO_USER_USE_PROFILE_EXPIRATION` | `user_use_profile_expiration` |
| `--quiet` | `TOKENDITO_USER_QUIET` | `quiet` |

# Configuration file location
Expand Down
53 changes: 50 additions & 3 deletions tests/unit/test_user.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# vim: set filetype=python ts=4 sw=4
# -*- coding: utf-8 -*-
"""Unit tests, and local fixtures for the user module."""
from datetime import datetime
from datetime import datetime, timedelta, timezone
import os
import sys

Expand Down Expand Up @@ -466,7 +466,7 @@ def test_update_configuration(tmpdir):
assert ret.okta["mfa"] == "pytest"


def test_update_device_token(tmpdir):
def test_update_profile_device_token(tmpdir):
"""Test writing and reading device token to a configuration file."""
from tokendito import user
from tokendito.config import Config
Expand All @@ -481,11 +481,58 @@ def test_update_device_token(tmpdir):
)

# Write out a config file via configure() and ensure it's functional
user.update_device_token(pytest_config)
user.update_profile_device_token(pytest_config)
ret = user.process_ini_file(path, "pytest")
assert ret.okta["device_token"] == device_token


def test_check_profile_expiration():
"""Test checking profile expiration."""
from tokendito import user
from tokendito.config import Config

now = datetime.now(timezone.utc)
future = now + timedelta(days=1)
past = now + timedelta(days=-1)

pytest_config = Config(
aws={"profile": "test-profile"},
okta={"profile_expiration": str(future)},
user={"use_profile_expiration": True},
)

# Expiration in the future should exit
with pytest.raises(SystemExit):
user.check_profile_expiration(pytest_config)

# Expiration in the past should not exit
pytest_config.okta["profile_expiration"] = str(past)
try:
user.check_profile_expiration(pytest_config)
except SystemExit:
pytest.fail("Profile expiration was invalid and should not have exited")


def test_update_profile_expiration(tmpdir):
"""Test writing and reading profile expiration to a configuration file."""
from tokendito import user
from tokendito.config import Config

path = tmpdir.mkdir("pytest").join("pytest_tokendito.ini")

expiration = datetime.now(timezone.utc)

pytest_config = Config(
okta={"profile_expiration": expiration},
user={"config_file": path, "config_profile": "pytest"},
)

# Write out a config file via configure() and ensure it's functional
user.update_profile_expiration(pytest_config)
ret = user.process_ini_file(path, "pytest")
assert datetime.fromisoformat(ret.okta["profile_expiration"]) == expiration


def test_process_ini_file(tmpdir):
"""Test whether ini config elements are set correctly.

Expand Down
2 changes: 2 additions & 0 deletions tokendito/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class Config(object):
loglevel="INFO",
log_output_file="",
use_device_token=False,
use_profile_expiration=False,
mask_items=[],
quiet=False,
),
Expand All @@ -50,6 +51,7 @@ class Config(object):
tile=None,
org=None,
device_token=None,
profile_expiration=None,
),
)

Expand Down
112 changes: 96 additions & 16 deletions tokendito/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import builtins
import codecs
import configparser
from datetime import timezone
from datetime import datetime, timezone
from getpass import getpass
import json
import logging
Expand Down Expand Up @@ -67,6 +67,8 @@ def cmd_interface(args):
# rm trailing / if provided as such so the urls with this as base dont have //
config.okta["org"] = config.okta["org"].strip("/")

check_profile_expiration(config)

if config.user["use_device_token"]:
device_token = config.okta["device_token"]
if device_token:
Expand Down Expand Up @@ -107,11 +109,7 @@ def cmd_interface(args):
output=config.aws["output"],
)

device_token = HTTP_client.get_device_token()
if config.user["use_device_token"] and device_token:
logger.info(f"Saving device token to config profile {args.user_config_profile}")
config.okta["device_token"] = device_token
update_device_token(config)
update_profile(config, role_response)

display_selected_role(profile_name=config.aws["profile"], role_response=role_response)

Expand Down Expand Up @@ -233,6 +231,13 @@ def parse_cli_args(args):
default=False,
help="Use device token across sessions",
)
parser.add_argument(
"--use-profile-expiration",
dest="user_use_profile_expiration",
action="store_true",
default=False,
help="Use profile expiration to bypass re-authenticating",
)
parser.add_argument(
"--quiet",
dest="user_quiet",
Expand Down Expand Up @@ -515,6 +520,65 @@ def prompt_role_choices(aut_tiles):
return selected_role


def get_role_expiration(role_response={}):
"""Get the expiration from the role response.

:param role_response: Assume Role response dict
:return expiration
"""
try:
return role_response["Credentials"]["Expiration"]

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assuming here that aws returns the expiration time in utc?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, the value is returned in UTC.

There is existing code that converts this to local when displaying the role to the user.

Copy link

@JSpenced JSpenced Dec 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup yup! I just knew there was a aws config var for setting timestamps. I have had a problem switching from aws cli version 1 to 2 before. A different issue than this, but just wanted to check as those returned different formats. Think we are good then 👍 !

except (KeyError, TypeError) as err:
logger.error(f"Could not retrieve expiration: {err}")
sys.exit(1)


def check_profile_expiration(config):
"""Check profile expiration and exit if still valid.

:param config: Config object
"""
if not config.user["use_profile_expiration"]:
return

profile = config.aws["profile"]

profile_expiration_str = config.okta["profile_expiration"]
if not profile_expiration_str:
logger.warning(f"Expiration unavailable for config profile {profile}. ")
return

profile_expiration = datetime.fromisoformat(profile_expiration_str)
now = datetime.now(timezone.utc)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably not necessary but if it isn't guaranteed to return utc.

Suggested change
now = datetime.now(timezone.utc)
now = datetime.now(profile_expiration.tzinfo if profile_espiration.tzinfo else timezone.utc)


if now < profile_expiration:
logger.info(f"Expiration for config profile {profile} is still valid: {profile_expiration}")
sys.exit(0)
else:
logger.warning(
f"Expiration for config profile {profile} is no longer valid: {profile_expiration}"
)


def update_profile(config, role_response={}):
"""Update profile with device token and expiration if needed.

:param config: Config object
:param role_response: Assume Role response dict
"""
device_token = HTTP_client.get_device_token()
if config.user["use_device_token"] and device_token:
logger.info(f"Saving device token to config profile {config.aws['profile']}")
config.okta["device_token"] = device_token
update_profile_device_token(config)

role_expiration = get_role_expiration(role_response)
if config.user["use_profile_expiration"] and role_expiration:
logger.info(f"Saving expiration to config profile {config.aws['profile']}")
config.okta["profile_expiration"] = role_expiration
update_profile_expiration(config)


def display_selected_role(profile_name="", role_response={}):
"""Print details about how to assume role.

Expand All @@ -523,21 +587,16 @@ def display_selected_role(profile_name="", role_response={}):
:return: message displayed.

"""
try:
expiration_time = role_response["Credentials"]["Expiration"]
except (KeyError, TypeError) as err:
logger.error(f"Could not retrieve expiration time: {err}")
sys.exit(1)

expiration_time_local = utc_to_local(expiration_time)
role_expiration = get_role_expiration(role_response)
role_expiration_local = utc_to_local(role_expiration)
msg = (
f"\nGenerated profile '{profile_name}' in "
f"{config.aws['shared_credentials_file']}.\n"
"\nUse profile to authenticate to AWS:\n\t"
f"aws --profile '{profile_name}' sts get-caller-identity"
"\nOR\n\t"
f"export AWS_PROFILE='{profile_name}'\n\n"
f"Credentials are valid until {expiration_time} ({expiration_time_local})."
f"Credentials are valid until {role_expiration} ({role_expiration_local})."
)

print(msg)
Expand Down Expand Up @@ -773,7 +832,7 @@ def process_environment(prefix="tokendito"):

def process_interactive_input(config, skip_password=False):
"""
Request input interactively interactively for elements that are not proesent.
Request input interactively interactively for elements that are not present.

:param config: Config object with some values set.
:param skip_password: Whether or not ask the user for a password.
Expand Down Expand Up @@ -1002,7 +1061,7 @@ def update_configuration(config):
logger.info(f"Updated {ini_file} with profile {profile}")


def update_device_token(config):
def update_profile_device_token(config):
"""Update configuration file on local system with device token.

:param config: the current configuration
Expand All @@ -1023,6 +1082,27 @@ def update_device_token(config):
logger.info(f"Updated {ini_file} with profile {profile}")


def update_profile_expiration(config):
"""Update configuration file on local system with profile expiration.

:param config: the current configuration
:return: None
"""
logger.debug("Update configuration file on local system with profile expiration.")
ini_file = config.user["config_file"]
profile = config.user["config_profile"]

contents = {}
# Copy relevant parts of the configuration into an dictionary that
# will be written out to disk
if "profile_expiration" in config.okta and config.okta["profile_expiration"] is not None:
contents["okta_profile_expiration"] = config.okta["profile_expiration"]

logger.debug(f"Adding {contents} to config file.")
update_ini(profile=profile, ini_file=ini_file, **contents)
logger.info(f"Updated {ini_file} with profile {profile}")


def set_local_credentials(response={}, role="default", region="us-east-1", output="json"):
"""Write to local files to insert credentials.

Expand Down