Skip to content

Commit

Permalink
Merge pull request #19 from antoinebou12/test-api
Browse files Browse the repository at this point in the history
add api_key login to the vercel service
  • Loading branch information
antoinebou12 authored Apr 19, 2024
2 parents 82e8cb2 + a0c272b commit 5f7841d
Showing 1 changed file with 41 additions and 34 deletions.
75 changes: 41 additions & 34 deletions api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@


from dataclasses import dataclass
from typing import List, Optional

from pydantic import BaseModel

class DeviceBind(BaseModel):
Expand Down Expand Up @@ -1012,12 +1010,12 @@ class ClientSSLError(Exception):
from starlette.status import HTTP_401_UNAUTHORIZED, HTTP_404_NOT_FOUND
from datetime import datetime
import hashlib
import os
from Crypto.PublicKey import RSA
from Crypto.Signature import pkcs1_15
from Crypto.Hash import SHA256
from pydantic import BaseModel
from typing import Optional
from Crypto.Cipher import PKCS1_OAEP
import binascii
from base64 import b64encode, b64decode

security_basic = HTTPBasic()
API_KEY_NAME = "access_token"
Expand Down Expand Up @@ -1056,41 +1054,50 @@ def load_rsa_keys():


def generate_api_key(email: str, password: str) -> str:
"""Generate a signed API key."""
timestamp = datetime.now().strftime("%Y%m%d%H%M%S%f")
salt = os.urandom(16).hex()
payload = f"{email}:{password}:{timestamp}:{salt}"
api_key = hashlib.sha256(payload.encode()).hexdigest()
"""Generate a signed and encrypted API key."""
timestamp = datetime.utcnow().strftime("%Y%m%d%H%M%S%f")
payload = f"{email}:{password}:{timestamp}"
# Encrypt the payload
cipher = PKCS1_OAEP.new(PUBLIC_KEY)
encrypted_payload = cipher.encrypt(payload.encode())
encrypted_hex = binascii.hexlify(encrypted_payload).decode()

# Sign the encrypted payload
hash_obj = SHA256.new(encrypted_payload)
signature = pkcs1_15.new(PRIVATE_KEY).sign(hash_obj)
signature_hex = binascii.hexlify(signature).decode()

return f"{encrypted_hex}:{signature_hex}"

def decrypt_api_key(api_key: str) -> dict:
"""Verify and decrypt an API key."""
try:
encrypted_payload_hex, signature_hex = api_key.split(':')
encrypted_payload = binascii.unhexlify(encrypted_payload_hex)
signature = binascii.unhexlify(signature_hex)

# Sign the API key
key_hash = SHA256.new(api_key.encode())
signature = pkcs1_15.new(PRIVATE_KEY).sign(key_hash)
signed_api_key = api_key + ":" + signature.hex()
# Verify the signature
hash_obj = SHA256.new(encrypted_payload)
pkcs1_15.new(PUBLIC_KEY).verify(hash_obj, signature)

return signed_api_key
# Decrypt the payload
cipher = PKCS1_OAEP.new(PRIVATE_KEY)
payload = cipher.decrypt(encrypted_payload)
email, password, timestamp = payload.decode().split(':')

def decrypt_api_key(api_key: str):
"""Decrypt API key to extract the email and password."""
try:
api_key_part, signature = api_key.rsplit(':', 1)
key_hash = SHA256.new(api_key_part.encode())
# Verify the signature
pkcs1_15.new(PUBLIC_KEY).verify(key_hash, bytes.fromhex(signature))

# Decode the base string
decoded_bytes = bytes.fromhex(api_key_part)
decoded_string = decoded_bytes.decode('utf-8') # Assuming the input was UTF-8-encoded
email, password, timestamp, salt = decoded_string.split(':')

return {"email": email, "password": password}
except ValueError: # Catches all errors related to cryptographic operations
return {"email": email, "password": password, "timestamp": timestamp}
except (ValueError, IndexError, TypeError, binascii.Error) as e:
raise HTTPException(status_code=403, detail="Invalid API key")

def verify_api_key(api_key: str) -> bool:
try:
api_key, signature = api_key.rsplit(':', 1)
key_hash = SHA256.new(api_key.encode())
pkcs1_15.new(PUBLIC_KEY).verify(key_hash, bytes.fromhex(signature))
encrypted_payload_hex, signature_hex = api_key.split(':')
encrypted_payload = binascii.unhexlify(encrypted_payload_hex)
signature = binascii.unhexlify(signature_hex)

# Verify the signature
hash_obj = SHA256.new(encrypted_payload)
pkcs1_15.new(PUBLIC_KEY).verify(hash_obj, signature)
return True
except Exception:
return False
Expand Down Expand Up @@ -1133,7 +1140,7 @@ async def auth(renpho: RenphoWeight = Depends(get_current_user)):
return APIResponse(status="success", message="Authentication successful.")

@app.get("/generate_api_key", response_model=APIResponse)
def generate_key(request: Request, email: str, password: str):
def generate_key(request: Request, email: str, password: str, renpho: RenphoWeight = Depends(get_current_user)):
"""Generate API key for a user."""
try:
api_key = generate_api_key(email, password)
Expand Down

0 comments on commit 5f7841d

Please sign in to comment.