diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index e8fc0f0..f656994 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -22,7 +22,7 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - pip install setuptools wheel + pip install setuptools wheel build - name: Build package run: | python -m build diff --git a/src/qspylib/_version.py b/src/qspylib/_version.py index 4a5d096..bf89ba7 100644 --- a/src/qspylib/_version.py +++ b/src/qspylib/_version.py @@ -1,2 +1,3 @@ """unified version string""" -__version__ = '1.0.0a2' + +__version__ = "1.0.0a3" diff --git a/src/qspylib/clublog.py b/src/qspylib/clublog.py index 4524e51..c33a180 100644 --- a/src/qspylib/clublog.py +++ b/src/qspylib/clublog.py @@ -6,25 +6,29 @@ import requests from .logbook import Logbook + class ClubLogError(Exception): """An error raised when an issue occurs with the ClubLog API.""" - def __init__(self, message="An error occurred while interfacing with the ClubLog API"): + + def __init__( + self, message="An error occurred while interfacing with the ClubLog API" + ): super().__init__(message) + class ClubLogClient: """This is a wrapper for the ClubLog API, holding a user's authentication\ to perform actions on their behalf. """ - def __init__(self, email: str, callsign: str, password: str, - timeout: int = 15): + def __init__(self, email: str, callsign: str, password: str, timeout: int = 15): """Initializes a ClubLogClient object. - Args: - email (str): Email address for the ClubLog account - callsign (str): Callsign for the ClubLog account - password (str): Password for the ClubLog account - timeout (int, optional): Timeout for requests. Defaults to 15. + Args: + email (str): Email address for the ClubLog account + callsign (str): Callsign for the ClubLog account + password (str): Password for the ClubLog account + timeout (int, optional): Timeout for requests. Defaults to 15. """ self.email = email self.callsign = callsign @@ -32,7 +36,6 @@ def __init__(self, email: str, callsign: str, password: str, self.timeout = timeout self.base_url = "https://clublog.org/getadif.php" - def fetch_logbook(self) -> Logbook: """Fetch the user's ClubLog logbook. Raises: @@ -40,11 +43,7 @@ def fetch_logbook(self) -> Logbook: Returns: qspylib.logbook.Logbook: A logbook containing the user's QSOs. """ - data = { - 'email': self.email, - 'password': self.password, - 'call': self.callsign - } + data = {"email": self.email, "password": self.password, "call": self.callsign} # filter down to only used params data = {k: v for k, v in data.items() if v is not None} diff --git a/src/qspylib/eqsl.py b/src/qspylib/eqsl.py index a860e73..c6c38bb 100644 --- a/src/qspylib/eqsl.py +++ b/src/qspylib/eqsl.py @@ -7,199 +7,27 @@ from .logbook import Logbook from ._version import __version__ + # region Exceptions -class eQSLError(Exception): #pylint: disable=invalid-name +class eQSLError(Exception): # pylint: disable=invalid-name """An error occurred interfacing with eQSL.""" + def __init__(self, message="An error occurred interfacing with eQSL"): super().__init__(message) -# endregion - -# region Module Functions -def verify_eqsl(callsign_from: str, callsign_to: str, qso_band: str, #pylint: disable=R0913 - qso_mode: str = None, qso_date: str = None, timeout: int = 15): - """Verify a QSL with eQSL. - - Args: - callsign_from (str): Callsign originating QSO (i.e. N5UP) - callsign_to (str): Callsign receiving QSO (i.e. TE5T) - qso_band (str): Band QSO took place on (i.e. 160m) - qso_mode (str, optional): Mode QSO took place with (i.e. SSB).\ - Defaults to None. - qso_date (str, optional): Date QSO took place (i.e. 01/31/2000).\ - Defaults to None. - timeout (int, optional): Seconds before connection times out.\ - Defaults to 15. - - Raises: - eQSLError: An error occurred interfacing with eQSL. - HTTPError: An error occurred while trying to make a connection. - - Returns: - bool, str: bool of whether the QSO was verified and a str of extra\ - information eQSL reports, such as Authenticity Guaranteed status. - """ - - url = "https://www.eqsl.cc/qslcard/VerifyQSO.cfm" - params = { - 'CallsignFrom': callsign_from, - 'CallsignTo': callsign_to, - 'QSOBand': qso_band, - 'QSOMode': qso_mode, - 'QSODate': qso_date, - } - - with requests.Session() as s: - response = s.get(url, params=params, headers={'user-agent': 'pyQSP/' - + __version__}, timeout=timeout) - if response.status_code == requests.codes.ok: - raw_result = response.text - # TO-DO: make this a case statement - if 'Result - QSO on file' in raw_result: - return True, raw_result - if 'Parameter missing' not in raw_result: - return False, raw_result - raise eQSLError(raw_result) - raise response.raise_for_status() - -def retrieve_graphic(username: str, password: str, callsign_from: str, - qso_year: str, qso_month: str, qso_day: str, - qso_hour: str, qso_minute: str, qso_band: str, - qso_mode: str, timeout: int = 15): - """Retrieve the graphic image for a QSO from eQSL. - - Note: - Not yet implemented. - - Args: - username (str): The callsign of the recipient of the eQSL - password (str): The password of the user's account - callsign_from (str): The callsign of the sender of the eQSL - qso_year (str): YYYY OR YY format date of the QSO - qso_month (str): MM format - qso_day (str): DD format - qso_hour (str): HH format (24-hour time) - qso_minute (str): MM format - qso_band (str): 20m, 80M, 70cm, etc. (case insensitive) - qso_mode (str): Must match exactly and should be an ADIF-compatible mode - timeout (int, optional): time to connection timeout. Defaults to 15. - - Todo: - Implement this function. - - Raises: - NotImplementedError: Not yet implemented. - - """ - raise NotImplementedError - -def get_ag_list(timeout: int = 15): - """Get a list of Authenticity Guaranteed members. - - Args: - timeout (int, optional): Seconds before connection times out. Defaults to 15. - - Raises: - HTTPError: An error occurred while trying to make a connection. - - Returns: - tuple, str: tuple contains a list of string callsigns, and a str header\ - with the date the list was generated - """ - - url = "https://www.eqsl.cc/qslcard/DownloadedFiles/AGMemberList.txt" - - with requests.Session() as s: - response = s.get(url, headers={'user-agent': 'pyQSP/' + __version__}, - timeout=timeout) - if response.status_code == requests.codes.ok: - result_list = [] - result_list += response.text.split('\r\n') - return set(result_list[1:-1]), str(result_list[0]) - raise response.raise_for_status() - -def get_ag_list_dated(timeout: int = 15): - """Get a list of Authenticity Guaranteed eQSL members with the date of\ - their last upload to eQSL. - - Args: - timeout (int, optional): Seconds before connection times out.\ - Defaults to 15. - - Raises: - HTTPError: An error occurred while trying to make a connection. - - Returns: - tuple: First element is a dict with key: callsign and value: date, and\ - second is a header of when this list was generated. - """ - url = "https://www.eqsl.cc/qslcard/DownloadedFiles/AGMemberListDated.txt" - - with requests.Session() as s: - response = s.get(url, headers={'user-agent': 'pyQSP/' + __version__},\ - timeout=timeout) - if response.status_code == requests.codes.ok: - result_list = response.text.split('\r\n') - loc, header = result_list[1:-1], str(result_list[0]) - dict_calls = {} - for pair in loc: - call, date = pair.split(', ') - dict_calls[call] = date - return dict_calls, header - raise response.raise_for_status() - -def get_full_member_list(timeout: int = 15): - """Get a list of all members of QRZ. - - Args: - timeout (int, optional): Seconds before connection times out.\ - Defaults to 15. - - Raises: - HTTPError: An error occurred while trying to make a connection. - - Returns: - dict: key is the callsign and the value is a tuple of: GridSquare, AG,\ - Last Upload - """ - - url = "https://www.eqsl.cc/DownloadedFiles/eQSLMemberList.csv" - - with requests.Session() as s: - response = s.get(url, timeout=timeout) - if response.status_code == requests.codes.ok: - result_list = response.text.split('\r\n')[1:-1] - dict_calls = {} - for row in result_list: - data = row.split(',') - dict_calls[data[0]] = data[1:] - return dict_calls - raise response.raise_for_status() - -def get_users_data(callsign: str): - """Get a specific user's data from the full member list. - Note: - This is incredibly slow. A better method probably involves doing some\ - vectorization, but that would require adding a dependency. - Args: - callsign (str): callsign to get data about - - Returns: - tuple: contains: GridSquare, AG, Last Upload - """ - dict_users: dict = get_full_member_list() - return dict_users.get(callsign) # endregion + # region eQSL API Wrapper -class eQSLClient: #pylint: disable=invalid-name +class eQSLClient: # pylint: disable=invalid-name """API wrapper for eQSL.cc. This class holds a user's authentication to\ perform actions on their behalf. """ - def __init__(self, username: str, password: str, qth_nickname: str = None, - timeout: int = 15): + def __init__( + self, username: str, password: str, qth_nickname: str = None, timeout: int = 15 + ): """Create an eQSLClient object. Args: @@ -215,12 +43,17 @@ def __init__(self, username: str, password: str, qth_nickname: str = None, session = requests.Session() - session.params = {k: v for k, v in { - 'username': username, - 'password': password, - 'QTHNickname': qth_nickname }.items() if v is not None} + session.params = { + k: v + for k, v in { + "username": username, + "password": password, + "QTHNickname": qth_nickname, + }.items() + if v is not None + } - session.headers = {'User-Agent': 'pyQSP/' + __version__} + session.headers = {"User-Agent": "pyQSP/" + __version__} self.session = session def set_timeout(self, timeout: int): @@ -245,19 +78,24 @@ def get_last_upload_date(self): DD-MMM-YYYY at HH:mm UTC """ with self.session as s: - r = s.get(self.base_url + 'DisplayLastUploadDate.cfm', - timeout=self.timeout) + r = s.get(self.base_url + "DisplayLastUploadDate.cfm", timeout=self.timeout) if r.status_code == requests.codes.ok: - success_txt = 'Your last ADIF upload was' + success_txt = "Your last ADIF upload was" if success_txt in r.text: - return r.text[r.text.index('(')+1:r.text.index(')')] + return r.text[r.text.index("(") + 1 : r.text.index(")")] raise eQSLError(r.text) raise r.raise_for_status() - def fetch_inbox(self, limit_date_lo:str=None, limit_date_hi:str=None, #pylint: disable=R0914,R0913 - rcvd_since:str=None, confirmed_only:str=None, - unconfirmed_only:str=None, archive:str=None, - ham_only:str=None) -> Logbook: + def fetch_inbox( + self, + limit_date_lo: str = None, + limit_date_hi: str = None, # pylint: disable=R0914,R0913 + rcvd_since: str = None, + confirmed_only: str = None, + unconfirmed_only: str = None, + archive: str = None, + ham_only: str = None, + ) -> Logbook: """Fetches INCOMING QSOs, from the user's eQSL Inbox. Args: @@ -295,37 +133,47 @@ def fetch_inbox(self, limit_date_lo:str=None, limit_date_hi:str=None, #pylint: d qspylib.logbook.Logbook: A logbook containing the user's QSOs. """ params = { - 'LimitDateLo': limit_date_lo, - 'LimitDateHi': limit_date_hi, - 'RcvdSince': rcvd_since, - 'ConfirmedOnly': confirmed_only, - 'UnconfirmedOnly': unconfirmed_only, - 'Archive': archive, - 'HamOnly': ham_only + "LimitDateLo": limit_date_lo, + "LimitDateHi": limit_date_hi, + "RcvdSince": rcvd_since, + "ConfirmedOnly": confirmed_only, + "UnconfirmedOnly": unconfirmed_only, + "Archive": archive, + "HamOnly": ham_only, } # filter down to only used params params = {k: v for k, v in params.items() if v is not None} with self.session as s: - r = s.get(self.base_url + "DownloadInBox.cfm", params=params, - timeout=self.timeout) + r = s.get( + self.base_url + "DownloadInBox.cfm", params=params, timeout=self.timeout + ) if r.status_code == requests.codes.ok: - adif_found_txt = 'Your ADIF log file has been built' - adif_status = r.text.index(adif_found_txt) if adif_found_txt in r.text else -1 + adif_found_txt = "Your ADIF log file has been built" + adif_status = ( + r.text.index(adif_found_txt) if adif_found_txt in r.text else -1 + ) if adif_status < 0: - raise eQSLError('Failed to generate ADIF.') + raise eQSLError("Failed to generate ADIF.") adif_link_start_idx = r.text.index('
  • .ADI file') - adif_link = self.base_url + r.text[adif_link_start_idx:adif_link_end_idx] + adif_link = ( + self.base_url + r.text[adif_link_start_idx:adif_link_end_idx] + ) adif_response = requests.get(adif_link, timeout=self.timeout) if adif_response.status_code == requests.codes.ok: return Logbook(self.callsign, adif_response.text) raise r.raise_for_status() raise r.raise_for_status() - def fetch_inbox_qsls(self, limit_date_lo:str=None, limit_date_hi:str=None, #pylint: disable = R0913 - rcvd_since:str=None, archive:str=None, - ham_only:str=None) -> Logbook: + def fetch_inbox_qsls( + self, + limit_date_lo: str = None, + limit_date_hi: str = None, # pylint: disable = R0913 + rcvd_since: str = None, + archive: str = None, + ham_only: str = None, + ) -> Logbook: """Fetches INCOMING QSLs, from the user's eQSL Inbox. limit_date_lo (str, optional): Earliest QSO date to download\ @@ -355,8 +203,9 @@ def fetch_inbox_qsls(self, limit_date_lo:str=None, limit_date_hi:str=None, #pyli Returns: qspylib.logbook.Logbook: A logbook containing the user's QSOs. """ - return self.fetch_inbox(limit_date_lo, limit_date_hi, rcvd_since, 'Y', - None, archive, ham_only) + return self.fetch_inbox( + limit_date_lo, limit_date_hi, rcvd_since, "Y", None, archive, ham_only + ) def fetch_outbox(self): """Fetches OUTGOING QSOs, from the user's eQSL Outbox. @@ -369,19 +218,230 @@ def fetch_outbox(self): qspylib.logbook.Logbook: A logbook containing the user's QSOs. """ with self.session as s: - r = s.get(self.base_url + "DownloadADIF.cfm", - timeout=self.timeout) + r = s.get(self.base_url + "DownloadADIF.cfm", timeout=self.timeout) if r.status_code == requests.codes.ok: - adif_found_txt = 'Your ADIF log file has been built' - adif_status = r.text.index(adif_found_txt) if adif_found_txt in r.text else -1 + adif_found_txt = "Your ADIF log file has been built" + adif_status = ( + r.text.index(adif_found_txt) if adif_found_txt in r.text else -1 + ) if adif_status < 0: - raise eQSLError('Failed to generate ADIF.') + raise eQSLError("Failed to generate ADIF.") adif_link_start_idx = r.text.index('
  • .ADI file') - adif_link = self.base_url + r.text[adif_link_start_idx:adif_link_end_idx] + adif_link = ( + self.base_url + r.text[adif_link_start_idx:adif_link_end_idx] + ) adif_response = requests.get(adif_link, timeout=self.timeout) - if adif_response.status_code == requests.codes.ok : + if adif_response.status_code == requests.codes.ok: return Logbook(self.callsign, adif_response.text) raise r.raise_for_status() raise r.raise_for_status() + + # region Static Methods + @staticmethod + def verify_eqsl( + callsign_from: str, + callsign_to: str, + qso_band: str, # pylint: disable=R0913 + qso_mode: str = None, + qso_date: str = None, + timeout: int = 15, + ): + """Verify a QSL with eQSL. + + Args: + callsign_from (str): Callsign originating QSO (i.e. N5UP) + callsign_to (str): Callsign receiving QSO (i.e. TE5T) + qso_band (str): Band QSO took place on (i.e. 160m) + qso_mode (str, optional): Mode QSO took place with (i.e. SSB).\ + Defaults to None. + qso_date (str, optional): Date QSO took place (i.e. 01/31/2000).\ + Defaults to None. + timeout (int, optional): Seconds before connection times out.\ + Defaults to 15. + + Raises: + eQSLError: An error occurred interfacing with eQSL. + HTTPError: An error occurred while trying to make a connection. + + Returns: + bool, str: bool of whether the QSO was verified and a str of extra\ + information eQSL reports, such as Authenticity Guaranteed status. + """ + + url = "https://www.eqsl.cc/qslcard/VerifyQSO.cfm" + params = { + "CallsignFrom": callsign_from, + "CallsignTo": callsign_to, + "QSOBand": qso_band, + "QSOMode": qso_mode, + "QSODate": qso_date, + } + + with requests.Session() as s: + response = s.get( + url, + params=params, + headers={"user-agent": "pyQSP/" + __version__}, + timeout=timeout, + ) + if response.status_code == requests.codes.ok: + raw_result = response.text + # TO-DO: make this a case statement + if "Result - QSO on file" in raw_result: + return True, raw_result + if "Parameter missing" not in raw_result: + return False, raw_result + raise eQSLError(raw_result) + raise response.raise_for_status() + + @staticmethod + def retrieve_graphic( + username: str, + password: str, + callsign_from: str, + qso_year: str, + qso_month: str, + qso_day: str, + qso_hour: str, + qso_minute: str, + qso_band: str, + qso_mode: str, + timeout: int = 15, + ): + """Retrieve the graphic image for a QSO from eQSL. + + Note: + Not yet implemented. + + Args: + username (str): The callsign of the recipient of the eQSL + password (str): The password of the user's account + callsign_from (str): The callsign of the sender of the eQSL + qso_year (str): YYYY OR YY format date of the QSO + qso_month (str): MM format + qso_day (str): DD format + qso_hour (str): HH format (24-hour time) + qso_minute (str): MM format + qso_band (str): 20m, 80M, 70cm, etc. (case insensitive) + qso_mode (str): Must match exactly and should be an ADIF-compatible mode + timeout (int, optional): time to connection timeout. Defaults to 15. + + Todo: + Implement this function. + + Raises: + NotImplementedError: Not yet implemented. + + """ + raise NotImplementedError + + @staticmethod + def get_ag_list(timeout: int = 15): + """Get a list of Authenticity Guaranteed members. + + Args: + timeout (int, optional): Seconds before connection times out. Defaults to 15. + + Raises: + HTTPError: An error occurred while trying to make a connection. + + Returns: + tuple, str: tuple contains a list of string callsigns, and a str header\ + with the date the list was generated + """ + + url = "https://www.eqsl.cc/qslcard/DownloadedFiles/AGMemberList.txt" + + with requests.Session() as s: + response = s.get( + url, headers={"user-agent": "pyQSP/" + __version__}, timeout=timeout + ) + if response.status_code == requests.codes.ok: + result_list = [] + result_list += response.text.split("\r\n") + return set(result_list[1:-1]), str(result_list[0]) + raise response.raise_for_status() + + @staticmethod + def get_ag_list_dated(timeout: int = 15): + """Get a list of Authenticity Guaranteed eQSL members with the date of\ + their last upload to eQSL. + + Args: + timeout (int, optional): Seconds before connection times out.\ + Defaults to 15. + + Raises: + HTTPError: An error occurred while trying to make a connection. + + Returns: + tuple: First element is a dict with key: callsign and value: date, and\ + second is a header of when this list was generated. + """ + url = "https://www.eqsl.cc/qslcard/DownloadedFiles/AGMemberListDated.txt" + + with requests.Session() as s: + response = s.get( + url, headers={"user-agent": "pyQSP/" + __version__}, timeout=timeout + ) + if response.status_code == requests.codes.ok: + result_list = response.text.split("\r\n") + loc, header = result_list[1:-1], str(result_list[0]) + dict_calls = {} + for pair in loc: + call, date = pair.split(", ") + dict_calls[call] = date + return dict_calls, header + raise response.raise_for_status() + + @staticmethod + def get_full_member_list(timeout: int = 15): + """Get a list of all members of QRZ. + + Args: + timeout (int, optional): Seconds before connection times out.\ + Defaults to 15. + + Raises: + HTTPError: An error occurred while trying to make a connection. + + Returns: + dict: key is the callsign and the value is a tuple of: GridSquare, AG,\ + Last Upload + """ + + url = "https://www.eqsl.cc/DownloadedFiles/eQSLMemberList.csv" + + with requests.Session() as s: + response = s.get(url, timeout=timeout) + if response.status_code == requests.codes.ok: + result_list = response.text.split("\r\n")[1:-1] + dict_calls = {} + for row in result_list: + data = row.split(",") + dict_calls[data[0]] = data[1:] + return dict_calls + raise response.raise_for_status() + + @staticmethod + def get_users_data(callsign: str): + """Get a specific user's data from the full member list. + + Note: + This is incredibly slow. A better method probably involves doing some\ + vectorization, but that would require adding a dependency. + + Args: + callsign (str): callsign to get data about + + Returns: + tuple: contains: GridSquare, AG, Last Upload + """ + dict_users: dict = eQSLClient.get_full_member_list() + return dict_users.get(callsign) + + # endregion + + # endregion diff --git a/src/qspylib/logbook.py b/src/qspylib/logbook.py index d01dc40..2b97390 100644 --- a/src/qspylib/logbook.py +++ b/src/qspylib/logbook.py @@ -5,7 +5,6 @@ """ import adif_io -# classes class QSO: """A hambaseio QSO obj. Contains simple info on a QSO. @@ -18,8 +17,16 @@ class QSO: time_on (str): time start of QSO qsl_rcvd (str): if QSO has been confirmed """ - def __init__(self, their_call:str, band:str, mode:str, qso_date:str, - time_on:str, qsl_rcvd:str='N'): + + def __init__( + self, + their_call: str, + band: str, + mode: str, + qso_date: str, + time_on: str, + qsl_rcvd: str = "N", + ): """Initializes a QSO object. Args: @@ -44,12 +51,17 @@ def __str__(self): def __eq__(self, other): if isinstance(other, QSO): - if self.their_call == other.their_call and self.band == other.band\ - and self.mode == other.mode and self.qso_date\ - == other.qso_date and self.time_on == other.time_on: + if ( + self.their_call == other.their_call + and self.band == other.band + and self.mode == other.mode + and self.qso_date == other.qso_date + and self.time_on == other.time_on + ): return True return False + class Logbook: """A Logbook has both an adi field, holding all fields parsed from an .adi\ log per QSO, and a simplified log field, holding a simplified set of\ @@ -91,8 +103,12 @@ def __str__(self): def __eq__(self, other): if isinstance(other, Logbook): - if self.callsign == other.callsign and self.adi == other.adi and\ - self.header == other.header and self.log == other.log: + if ( + self.callsign == other.callsign + and self.adi == other.adi + and self.header == other.header + and self.log == other.log + ): return True return False @@ -121,8 +137,8 @@ def discard_qso(self, contact: adif_io.QSO): self.log.remove(logified_qso) self.adi.remove(contact) -# functions of the module +# region Module Functions def qso_from_adi(contact: adif_io.QSO): """Transforms an adif_io.QSO object into a qspylib.logbook.QSO object. @@ -132,9 +148,20 @@ def qso_from_adi(contact: adif_io.QSO): Returns: qspylib.logbook.QSO: a qspylib QSO object """ - qsl_rcvd = contact.get('QSL_RCVD') - qrz_qsl_dte = contact.get('app_qrzlog_qsldate') - eqsl_qsl_rcvd = contact.get('eqsl_qsl_rcvd') - qso_confirmed = 'Y' if qsl_rcvd == 'Y' or qrz_qsl_dte or eqsl_qsl_rcvd == 'Y' else 'N' - return QSO(contact['CALL'], contact['BAND'], contact['MODE'], - contact['QSO_DATE'], contact['TIME_ON'], qso_confirmed) + qsl_rcvd = contact.get("QSL_RCVD") + qrz_qsl_dte = contact.get("app_qrzlog_qsldate") + eqsl_qsl_rcvd = contact.get("eqsl_qsl_rcvd") + qso_confirmed = ( + "Y" if qsl_rcvd == "Y" or qrz_qsl_dte or eqsl_qsl_rcvd == "Y" else "N" + ) + return QSO( + contact["CALL"], + contact["BAND"], + contact["MODE"], + contact["QSO_DATE"], + contact["TIME_ON"], + qso_confirmed, + ) + + +# endregion diff --git a/src/qspylib/lotw.py b/src/qspylib/lotw.py index f50f174..67ab074 100644 --- a/src/qspylib/lotw.py +++ b/src/qspylib/lotw.py @@ -3,90 +3,42 @@ # file, You can obtain one at https://mozilla.org/MPL/2.0/. """Functions and classes related to querying the LotW API. """ +from datetime import datetime import requests from .logbook import Logbook from ._version import __version__ -# exceptions +# region Exceptions + class RetrievalFailure(Exception): """A failure to retrieve information from LOTW. This can be due to a\ connection error, or a bad response from the server. """ - def __init__(self, message="Failed to retrieve information. Confirm log-in \ - credentials are correct."): - self.message=message + + def __init__( + self, + message="Failed to retrieve information. Confirm log-in \ + credentials are correct.", + ): + self.message = message super().__init__(self, message) + class UploadError(Exception): """A failure to upload a file to LOTW. This is due to a file being\ rejected by LOTW. The error message from LOTW is provided in the exception. """ + def __init__(self, message="Failed to upload file."): - self.message=message + self.message = message super().__init__(self, message) -# functions - -def get_last_upload(timeout: int = 15): - """Queries LOTW for a list of callsigns and date they last uploaded. - - Args: - timeout (int, optional): time in seconds to connection timeout.\ - Defaults to 15. - - Raises: - HTTPError: An error occurred while trying to make a connection. - - Returns: - csv: a csv of callsigns and last upload date - """ - - url = 'https://lotw.arrl.org/lotw-user-activity.csv' - - with requests.Session() as s: - response = s.get(url, timeout=timeout) - if response.status_code == requests.codes.ok: - return response.text - raise response.raise_for_status() - -def upload_logbook(file, timeout:int=120): - """Given a .tq5 or .tq8, uploads it to LOTW. - - Note: - The "handing this a file" part of this needs to be implemented. - Args: - file (_type_): file to be uploaded - timeout (int, optional): time in seconds to connection timeout.\ - Defaults to 120. +# endregion - Raises: - UploadFailure: The upload was rejected by LotW. - HTTPError: An error occurred while trying to make a connection. - - Returns: - str: Return message from LOTW on file upload. - """ - - upload_url = "https://lotw.arrl.org/lotw/upload" - - data = {'upfile': file} - - with requests.Session() as s: - response = s.post(upload_url, data, timeout=timeout) - if response.status_code == requests.codes.ok: - result = response.text - result_start_idx = result.index('') - upl_result = result[result_start_idx:result_end_idx] - upl_message = str(result[result.index('')]) - if 'rejected' in upl_result: - raise UploadError(upl_message) - return upl_message - raise response.raise_for_status() +# region LotW API class LOTWClient: """Wrapper for LOTW API functionality that requires a logged-in session. Fetching returns a Logbook object that must be assigned to something._ @@ -104,60 +56,78 @@ def __init__(self, username: str, password: str): self.base_url = "https://lotw.arrl.org/lotwuser/" session = requests.Session() - session.params = {'login': username, - 'password': password } - session.headers = {'User-Agent': 'pyQSP/' + __version__} + session.params = {"login": username, "password": password} + session.headers = {"User-Agent": "pyQSP/" + __version__} self.session = session - def fetch_logbook(self, qso_query=1, qso_qsl='yes', qso_qslsince=None, - qso_qsorxsince=None, qso_owncall=None, qso_callsign=None, - qso_mode=None, qso_band=None,qso_dxcc=None, - qso_startdate=None, qso_starttime=None, qso_enddate=None, - qso_endtime=None, qso_mydetail=None,qso_qsldetail=None, - qsl_withown=None): - """_summary_ + def fetch_logbook( + self, + qso_query: int = 1, + qso_qsl: str = "yes", + qso_qslsince: str = None, + qso_qsorxsince: str = None, + qso_owncall: str = None, + qso_callsign: str = None, + qso_mode: str = None, + qso_band: str = None, + qso_dxcc: str = None, + qso_startdate: str = None, + qso_starttime: str = None, + qso_enddate: str = None, + qso_endtime: str = None, + qso_mydetail: str = None, + qso_qsldetail: str = None, + qsl_withown: str = None, + ) -> Logbook: + """Fetches the user's logbook from LOTW. This function exposes *all*\ + of the parameters that can be passed to the LOTW API, including\ + ones that may be "contradictory" if used together. + + Note: + A provided helper that uses this function may be easier to use in\ + most cases. Args: qso_query (int, optional): If absent, ADIF file will contain no\ QSO records. Defaults to 1. - qso_qsl (str, optional): If "yes", only QSL records are returned \ + qso_qsl (str, optional): If "yes", only QSL records are returned\ (can be 'yes' or 'no'). Defaults to 'yes'. - qso_qslsince (_type_, optional): QSLs since specified datetime \ - (YYYY-MM-DD HH:MM:SS). Ignored unless qso_qsl="yes". \ - Defaults to None. - qso_qsorxsince (_type_, optional): QSOs received since specified \ + qso_qslsince (str, optional): QSLs since specified datetime\ + (YYYY-MM-DD HH:MM:SS). Ignored unless qso_qsl="yes".\ + Defaults to None. + qso_qsorxsince (str, optional): QSOs received since specified\ datetime. Ignored unless qso_qsl="no". Defaults to None. - qso_owncall (_type_, optional): Returns records where "own" call \ + qso_owncall (str, optional): Returns records where "own" call\ sign matches. Defaults to None. - qso_callsign (_type_, optional): Returns records where "worked" \ + qso_callsign (str, optional): Returns records where "worked"\ call sign matches. Defaults to None. - qso_mode (_type_, optional): Returns records where mode matches. \ + qso_mode (str, optional): Returns records where mode matches.\ Defaults to None. - qso_band (_type_, optional): Returns records where band matches. \ + qso_band (str, optional): Returns records where band matches.\ Defaults to None. - qso_dxcc (_type_, optional): Returns matching DXCC entities, \ + qso_dxcc (str, optional): Returns matching DXCC entities,\ implies qso_qsl='yes'. Defaults to None. - qso_startdate (_type_, optional): Returns only records with a QSO \ + qso_startdate (str, optional): Returns only records with a QSO\ date on or after the specified value. Defaults to None. - qso_starttime (_type_, optional): Returns only records with a QSO \ - time at or after the specified value on the starting date. \ - This value is ignored if qso_startdate is not provided. \ + qso_starttime (str, optional): Returns only records with a QSO\ + time at or after the specified value on the starting date.\ + This value is ignored if qso_startdate is not provided.\ Defaults to None. - qso_enddate (_type_, optional): Returns only records with a QSO \ + qso_enddate (str, optional): Returns only records with a QSO\ date on or before the specified value. Defaults to None. - qso_endtime (_type_, optional): Returns only records with a QSO \ - time at or before the specified value on the ending date. \ - This value is ignored if qso_enddate is not provided. \ + qso_endtime (str, optional): Returns only records with a QSO\ + time at or before the specified value on the ending date.\ + This value is ignored if qso_enddate is not provided.\ Defaults to None. - qso_mydetail (_type_, optional): If "yes", returns fields that \ - contain the Logging station's location data, if any. \ + qso_mydetail (str, optional): If "yes", returns fields that\ + contain the Logging station's location data, if any.\ Defaults to None. - qso_qsldetail (_type_, optional): If "yes", returns fields that \ - contain the QSLing station's location data, if any. \ + qso_qsldetail (str, optional): If "yes", returns fields that\ + contain the QSLing station's location data, if any.\ Defaults to None. - qsl_withown (_type_, optional): If "yes", each record contains the \ - STATION_CALLSIGN and APP_LoTW_OWNCALL fields to identify the \ + qsl_withown (str, optional): If "yes", each record contains the\ + STATION_CALLSIGN and APP_LoTW_OWNCALL fields to identify the\ "own" call sign used for the QSO. Defaults to None. Raises: @@ -171,35 +141,171 @@ def fetch_logbook(self, qso_query=1, qso_qsl='yes', qso_qslsince=None, log_url = "lotwreport.adi" params = { - 'qso_query': qso_query, - 'qso_qsl' : qso_qsl, - 'qso_qslsince': qso_qslsince, - 'qso_qsorxsince': qso_qsorxsince, - 'qso_owncall': qso_owncall, - 'qso_callsign': qso_callsign, - 'qso_mode': qso_mode, - 'qso_band': qso_band, - 'qso_dxcc': qso_dxcc, - 'qso_startdate': qso_startdate, - 'qso_starttime': qso_starttime, - 'qso_enddate': qso_enddate, - 'qso_endtime': qso_endtime, - 'qso_mydetail': qso_mydetail, - 'qso_qsldetail': qso_qsldetail, - 'qsl_withown': qsl_withown + "qso_query": qso_query, + "qso_qsl": qso_qsl, + "qso_qslsince": qso_qslsince, + "qso_qsorxsince": qso_qsorxsince, + "qso_owncall": qso_owncall, + "qso_callsign": qso_callsign, + "qso_mode": qso_mode, + "qso_band": qso_band, + "qso_dxcc": qso_dxcc, + "qso_startdate": qso_startdate, + "qso_starttime": qso_starttime, + "qso_enddate": qso_enddate, + "qso_endtime": qso_endtime, + "qso_mydetail": qso_mydetail, + "qso_qsldetail": qso_qsldetail, + "qsl_withown": qsl_withown, } # filter down to only used params params = {k: v for k, v in params.items() if v is not None} with self.session as s: response = s.get(self.base_url + log_url, params=params) - if '' not in response.text: + if "" not in response.text: raise RetrievalFailure if response.status_code == requests.codes.ok: return Logbook(self.username, response.text) raise response.raise_for_status() - def get_dxcc_credit(self, entity:str=None, ac_acct:str=None): + def fetch_qsls( + self, + qslsince: datetime = None, + owncall: str = None, + callsign: str = None, + mode: str = None, + band: str = None, + dxcc: str = None, + start_datetime: datetime = None, + end_datetime: datetime = None, + ) -> Logbook: + """Fetches matching QSLs (confirmed QSOs) from LOTW. + + Args: + qslsince (datetime, optional): QSLs since specified datetime\ + (YYYY-MM-DD HH:MM:SS). Defaults to None. + owncall (str, optional): Returns records where "own" call\ + sign matches. Defaults to None. + callsign (str, optional): Returns records where "worked"\ + call sign matches. Defaults to None. + mode (str, optional): Returns records where mode matches.\ + Defaults to None. + band (str, optional): Returns records where band matches.\ + Defaults to None. + dxcc (str, optional): Returns matching DXCC entities.\ + Defaults to None. + start_datetime (datetime, optional): Returns only records with a QSO\ + date on or after the specified value. Optionally, includes HH:MM:SS.\ + Defaults to None. + end_datetime (datetime, optional): Returns only records with a QSO\ + time at or before the specified value. Optionally, includes HH:MM:SS.\ + Defaults to None. + + Raises: + RetrievalFailure: A failure to retrieve information from LOTW.\ + Contains the error received from LOTW. + HTTPError: An error occurred while trying to make a connection. + + Returns: + qspylib.logbook.Logbook: A logbook containing the user's QSOs. + """ + # split datetime into date and time + startdate, starttime = LOTWClient.__split_datetime(start_datetime) + enddate, endtime = LOTWClient.__split_datetime(end_datetime) + if qslsince is not None: + if ":" in qslsince: + qslsince = qslsince.strftime("%Y-%m-%d %H:%M:%S") + else: + qslsince = qslsince.strftime("%Y-%m-%d") + + return self.fetch_logbook( + 1, + "yes", + qso_qslsince=qslsince, + qso_owncall=owncall, + qso_callsign=callsign, + qso_mode=mode, + qso_band=band, + qso_dxcc=dxcc, + qso_startdate=startdate, + qso_starttime=starttime, + qso_enddate=enddate, + qso_endtime=endtime, + qso_mydetail="yes", + qso_qsldetail="yes", + qsl_withown="yes", + ) + + def fetch_qsos( + self, + qsorxsince: datetime = None, + owncall: str = None, + callsign: str = None, + mode: str = None, + band: str = None, + dxcc: str = None, + start_datetime: datetime = None, + end_datetime: datetime = None, + ) -> Logbook: + """Fetches matching QSOs (confirmed & unconfirmed) from LOTW. + + Args: + qsorxsince (datetime, optional): QSOs since specified datetime\ + (YYYY-MM-DD HH:MM:SS). Defaults to None. + owncall (str, optional): Returns records where "own" call\ + sign matches. Defaults to None. + callsign (str, optional): Returns records where "worked"\ + call sign matches. Defaults to None. + mode (str, optional): Returns records where mode matches.\ + Defaults to None. + band (str, optional): Returns records where band matches.\ + Defaults to None. + dxcc (str, optional): Returns matching DXCC entities.\ + Defaults to None. + start_datetime (datetime, optional): Returns only records with a QSO\ + date on or after the specified value. Optionally, includes HH:MM:SS.\ + Defaults to None. + end_datetime (datetime, optional): Returns only records with a QSO\ + time at or before the specified value. Optionally, includes HH:MM:SS.\ + Defaults to None. + + Raises: + RetrievalFailure: A failure to retrieve information from LOTW.\ + Contains the error received from LOTW. + HTTPError: An error occurred while trying to make a connection. + + Returns: + qspylib.logbook.Logbook: A logbook containing the user's QSOs. + """ + startdate, starttime = LOTWClient.__split_datetime(start_datetime) + enddate, endtime = LOTWClient.__split_datetime(end_datetime) + if qsorxsince is not None: + if ":" in qsorxsince: + qsorxsince = qsorxsince.strftime("%Y-%m-%d %H:%M:%S") + else: + qsorxsince = qsorxsince.strftime("%Y-%m-%d") + + return self.fetch_logbook( + self, + 1, + "no", + qso_qsorxsince=qsorxsince, + qso_owncall=owncall, + qso_callsign=callsign, + qso_mode=mode, + qso_band=band, + qso_dxcc=dxcc, + qso_startdate=startdate, + qso_starttime=starttime, + qso_enddate=enddate, + qso_endtime=endtime, + qso_mydetail="yes", + qso_qsldetail="yes", + qsl_withown="yes", + ) + + def get_dxcc_credit(self, entity: str = None, ac_acct: str = None) -> Logbook: """Gets DXCC award account credit, optionally for a specific DXCC \ Entity Code specified via entity. @@ -222,10 +328,7 @@ def get_dxcc_credit(self, entity:str=None, ac_acct:str=None): qspylib.logbook.Logbook: A logbook containing the user's QSOs. """ dxcc_url = "logbook/qslcards.php" - params = { - 'entity': entity, - 'ac_acct': ac_acct - } + params = {"entity": entity, "ac_acct": ac_acct} # filter down to only used params params = {k: v for k, v in params.items() if v is not None} @@ -234,7 +337,98 @@ def get_dxcc_credit(self, entity:str=None, ac_acct:str=None): if response.status_code == requests.codes.ok: # lotw lies, and claims an will be absent from bad # outputs, but it's there, so we'll do something else. - if 'ARRL Logbook of the World DXCC QSL Card Report' not in response.text[:46]: + if ( + "ARRL Logbook of the World DXCC QSL Card Report" + not in response.text[:46] + ): raise RetrievalFailure(response.text) return Logbook(self.username, response.text) raise response.raise_for_status() + + # region Static Functions + @staticmethod + def __split_datetime(dt: datetime): + """Splits a datetime into a date and time, if a time is present. + + Args: + dt (datetime): Datetime containing YYYY-MM-DD, and optionally, HH:MM:SS. + + Returns: + tuple[str, str]: Tuple containing the date and time, respectively. + """ + date, time = None, None + date = dt.strftime("%Y-%m-%d") + if ":" in dt: + time = dt.strftime("%H:%M:%S") + + return date, time + + @staticmethod + def get_last_upload(timeout: int = 15): + """Queries LOTW for a list of callsigns and date they last uploaded. + + Args: + timeout (int, optional): time in seconds to connection timeout.\ + Defaults to 15. + + Raises: + HTTPError: An error occurred while trying to make a connection. + + Returns: + csv: a csv of callsigns and last upload date + """ + + url = "https://lotw.arrl.org/lotw-user-activity.csv" + + with requests.Session() as s: + response = s.get(url, timeout=timeout) + if response.status_code == requests.codes.ok: + return response.text + raise response.raise_for_status() + + @staticmethod + def upload_logbook(file, timeout: int = 120): + """Given a .tq5 or .tq8, uploads it to LOTW. + + Note: + The "handing this a file" part of this needs to be implemented. + + Args: + file (file): file to be uploaded + timeout (int, optional): time in seconds to connection timeout.\ + Defaults to 120. + + Raises: + UploadFailure: The upload was rejected by LotW. + HTTPError: An error occurred while trying to make a connection. + + Returns: + str: Return message from LOTW on file upload. + """ + + upload_url = "https://lotw.arrl.org/lotw/upload" + + data = {"upfile": file} + + with requests.Session() as s: + response = s.post(upload_url, data, timeout=timeout) + if response.status_code == requests.codes.ok: + result = response.text + result_start_idx = result.index("") + upl_result = result[result_start_idx:result_end_idx] + upl_message = str( + result[ + result.index("") + ] + ) + if "rejected" in upl_result: + raise UploadError(upl_message) + return upl_message + raise response.raise_for_status() + + # endregion + + +# endregion diff --git a/src/qspylib/qrz.py b/src/qspylib/qrz.py index e7dd98a..b700092 100644 --- a/src/qspylib/qrz.py +++ b/src/qspylib/qrz.py @@ -3,37 +3,41 @@ # file, You can obtain one at https://mozilla.org/MPL/2.0/. """Functions and classes related to querying the QRZ APIs. """ -#region Imports +# region Imports import html from collections import OrderedDict from typing import Any from urllib.parse import urlparse, parse_qs import requests import xmltodict -#import adif_io from .logbook import Logbook from ._version import __version__ # constants MAX_NUM_RETRIES = 1 +# endregion -#endregion -#region Exceptions +# region Exceptions class QRZInvalidSession(Exception): - """Error for when session is invalid. - """ - def __init__(self, message="Got no session key back. This session is \ - invalid."): - self.message=message + """Error for when session is invalid.""" + + def __init__( + self, + message="Got no session key back. This session is \ + invalid.", + ): + self.message = message super().__init__(self, message) -#endregion -#region Client Classes + +# endregion + + +# region Client Classes class QRZLogbookClient: - """API wrapper for accessing QRZ Logbook data. - """ + """API wrapper for accessing QRZ Logbook data.""" def __init__(self, key: str, timeout: int = 15): """Initializes a QRZLogbookClient object. @@ -47,13 +51,13 @@ def __init__(self, key: str, timeout: int = 15): self.base_url = "https://logbook.qrz.com/api" self.timeout = timeout self.headers = { - 'User-Agent': 'pyQSP/' + __version__, - 'Accept-Encoding': 'gzip, deflate', - 'Accept': '*/*', - 'Connection': 'keep-alive' + "User-Agent": "pyQSP/" + __version__, + "Accept-Encoding": "gzip, deflate", + "Accept": "*/*", + "Connection": "keep-alive", } - def fetch_logbook(self, option:str=None) -> Logbook: + def fetch_logbook(self, option: str = None) -> Logbook: """Fetches the logbook corresponding to the Client's API Key. Note: @@ -74,26 +78,24 @@ def fetch_logbook(self, option:str=None) -> Logbook: Returns: qspylib.logbook.Logbook: A logbook containing the user’s QSOs. """ - data = { - 'KEY': self.key, - 'ACTION': 'FETCH', - 'OPTION': option - } + data = {"KEY": self.key, "ACTION": "FETCH", "OPTION": option} # filter down to only used params data = {k: v for k, v in data.items() if v is not None} - response = requests.post(self.base_url, data=data, - headers=self.headers, timeout=self.timeout) + response = requests.post( + self.base_url, data=data, headers=self.headers, timeout=self.timeout + ) if response.status_code == requests.codes.ok: - response_dict = parse_qs(urlparse("ws://a.a/?" - + html.unescape(response.text))[4], - strict_parsing=True) + response_dict = parse_qs( + urlparse("ws://a.a/?" + html.unescape(response.text))[4], + strict_parsing=True, + ) return QRZLogbookClient.__stringify(self, response_dict["ADIF"]) - #iff we didn't manage to return from a logged in session, raise an error + # iff we didn't manage to return from a logged in session, raise an error raise response.raise_for_status() - #def fetch_logbook_paged(self, per_page:int=50, option:str=None): + # def fetch_logbook_paged(self, per_page:int=50, option:str=None): # # data = { # 'KEY': self.key, @@ -123,7 +125,7 @@ def fetch_logbook(self, option:str=None) -> Logbook: # } # raise NotImplementedError - def delete_record(self, list_logids:list) -> dict[str, list[str]]: + def delete_record(self, list_logids: list) -> dict[str, list[str]]: """Deletes log records from the logbook corresponding to the\ Client's API Key. @@ -142,23 +144,21 @@ def delete_record(self, list_logids:list) -> dict[str, list[str]]: from QRZ. This should include the RESULT, COUNT of records\ deleted, and LOGIDs not found, if any. """ - data = { - 'KEY': self.key, - 'ACTION': 'DELETE', - 'LOGIDS': ','.join(list_logids) - } - response = requests.post(self.base_url, data=data, - headers=self.headers, timeout=self.timeout) + data = {"KEY": self.key, "ACTION": "DELETE", "LOGIDS": ",".join(list_logids)} + response = requests.post( + self.base_url, data=data, headers=self.headers, timeout=self.timeout + ) if response.status_code == requests.codes.ok: - response_dict = parse_qs(urlparse("ws://a.a/?" - + html.unescape(response.text))[4], - strict_parsing=True) + response_dict = parse_qs( + urlparse("ws://a.a/?" + html.unescape(response.text))[4], + strict_parsing=True, + ) return response_dict - #iff we didn't manage to return from a logged in session, raise an error + # iff we didn't manage to return from a logged in session, raise an error raise response.raise_for_status() - def check_status(self, list_logids:list=None) -> dict[str, list[str]]: + def check_status(self, list_logids: list = None) -> dict[str, list[str]]: """Gets the status of a logbook based on the API Key supplied\ to the Client. This status can include information about the logbook\ like the owner, logbook name, DXCC count, confirmed QSOs, start and\ @@ -176,39 +176,45 @@ def check_status(self, list_logids:list=None) -> dict[str, list[str]]: field by QRZ's API, e.g. DXCC count is 'DXCC_COUNT', confirmed\ is 'CONFIRMED', etc. """ - data = { - 'KEY': self.key, - 'ACTION': 'STATUS', - 'LOGIDS': ','.join(list_logids) - } + data = {"KEY": self.key, "ACTION": "STATUS", "LOGIDS": ",".join(list_logids)} - response = requests.post(self.base_url, data=data, - headers=self.headers, timeout=self.timeout) + response = requests.post( + self.base_url, data=data, headers=self.headers, timeout=self.timeout + ) if response.status_code == requests.codes.ok: - response_dict = parse_qs(urlparse("ws://a.a/?" - + html.unescape(response.text))[4], - strict_parsing=True) + response_dict = parse_qs( + urlparse("ws://a.a/?" + html.unescape(response.text))[4], + strict_parsing=True, + ) return response_dict - #iff we didn't manage to return from a logged in session, raise an error + # iff we didn't manage to return from a logged in session, raise an error raise response.raise_for_status() ### Helpers def __stringify(self, adi_log) -> Logbook: - #qrz_output = html.unescape(adi_log) - #start_of_log, end_of_log = qrz_output.index('ADIF=') + 5, + # qrz_output = html.unescape(adi_log) + # start_of_log, end_of_log = qrz_output.index('ADIF=') + 5, # qrz_output.rindex('\n\n') + 4 - log_adi = "" + adi_log #adif_io expects a header, so we're giving it an end of header + log_adi = ( + "" + adi_log + ) # adif_io expects a header, so we're giving it an end of header return Logbook(self.key, log_adi) + class QRZXMLClient: """A wrapper for the QRZ XML interface. This functionality requires being logged in and maintaining a session. """ - def __init__(self, username:str=None, password:str=None, agent:str=None, - timeout:int=15): + def __init__( + self, + username: str = None, + password: str = None, + agent: str = None, + timeout: int = 15, + ): """Creates a QRZXMLClient object. Todo: Change this to use a session key instead of username/password. @@ -227,15 +233,15 @@ def __init__(self, username:str=None, password:str=None, agent:str=None, """ self.username = username self.password = password - self.agent = agent if agent is not None else 'pyQSP/' + __version__ + self.agent = agent if agent is not None else "pyQSP/" + __version__ self.session_key = None self.timeout = timeout self.base_url = "https://xmldata.qrz.com/xml/1.34/" self.headers = { - 'User-Agent': self.agent, - 'Accept-Encoding': 'gzip, deflate', - 'Accept': '*/*', - 'Connection': 'keep-alive' + "User-Agent": self.agent, + "Accept-Encoding": "gzip, deflate", + "Accept": "*/*", + "Connection": "keep-alive", } self._initiate_session() @@ -243,12 +249,15 @@ def __init__(self, username:str=None, password:str=None, agent:str=None, def _initiate_session(self): """Helper -- Grab us a session key so we're not throwing around\ passwords""" - params = {'username': self.username, - 'password': self.password, - 'agent': self.agent} + params = { + "username": self.username, + "password": self.password, + "agent": self.agent, + } - response = requests.get(self.base_url, params=params, - headers=self.headers, timeout=self.timeout) + response = requests.get( + self.base_url, params=params, headers=self.headers, timeout=self.timeout + ) xml_dict = xmltodict.parse(response.text) key = xml_dict["QRZDatabase"]["Session"].get("Key") if not key: @@ -257,17 +266,16 @@ def _initiate_session(self): self.session_key = key def _verify_session(self): - """ Helper -- Verify our session key is still valid.""" - params = {'agent': self.agent, - 's': self.session_key} + """Helper -- Verify our session key is still valid.""" + params = {"agent": self.agent, "s": self.session_key} - response = requests.get(self.base_url, params=params, - headers=self.headers, timeout=self.timeout) + response = requests.get( + self.base_url, params=params, headers=self.headers, timeout=self.timeout + ) if not xmltodict.parse(response.text)["QRZDatabase"]["Session"].get("Key"): raise QRZInvalidSession() - - def lookup_callsign(self, callsign:str) -> OrderedDict[str, Any]: + def lookup_callsign(self, callsign: str) -> OrderedDict[str, Any]: """Looks up a callsign in the QRZ database. Args: @@ -281,15 +289,12 @@ def lookup_callsign(self, callsign:str) -> OrderedDict[str, Any]: OrderedDict[str, Any]: Data on the callsign looked up, organized as a dict with each returned field as a key. """ - params = { - 'agent': self.agent, - 's': self.session_key, - 'callsign': callsign - } + params = {"agent": self.agent, "s": self.session_key, "callsign": callsign} num_retries = 0 while num_retries < MAX_NUM_RETRIES: - response = requests.get(self.base_url, params=params, - headers=self.headers, timeout=self.timeout) + response = requests.get( + self.base_url, params=params, headers=self.headers, timeout=self.timeout + ) if response.status_code == requests.codes.ok: parsed_response = xmltodict.parse(response.text) if not parsed_response.get("Key"): @@ -299,11 +304,14 @@ def lookup_callsign(self, callsign:str) -> OrderedDict[str, Any]: return parsed_response else: raise response.raise_for_status() - #if we didn't manage to return from a logged in session, raise an error - raise QRZInvalidSession(**{'message':parsed_response['ERROR']} \ - if parsed_response.get('ERROR') else {}) - - def lookup_dxcc(self, dxcc:str) -> OrderedDict[str, Any]: + # if we didn't manage to return from a logged in session, raise an error + raise QRZInvalidSession( + **{"message": parsed_response["ERROR"]} + if parsed_response.get("ERROR") + else {} + ) + + def lookup_dxcc(self, dxcc: str) -> OrderedDict[str, Any]: """Looks up a DXCC by prefix or DXCC number. Args: @@ -319,16 +327,13 @@ def lookup_dxcc(self, dxcc:str) -> OrderedDict[str, Any]: DXCC, CC, name, continent, ituzone, cqzone, timezone, lat,\ lon, & notes """ - #return self.__lookup_dxcc(dxcc, 0) - params = { - 'agent': self.agent, - 's': self.session_key, - 'dxcc': dxcc - } + # return self.__lookup_dxcc(dxcc, 0) + params = {"agent": self.agent, "s": self.session_key, "dxcc": dxcc} num_retries = 0 while num_retries < MAX_NUM_RETRIES: - response = requests.get(self.base_url, params=params, - headers=self.headers, timeout=self.timeout) + response = requests.get( + self.base_url, params=params, headers=self.headers, timeout=self.timeout + ) if response.status_code == requests.codes.ok: parsed_response = xmltodict.parse(response.text) if not parsed_response.get("Key"): @@ -338,8 +343,12 @@ def lookup_dxcc(self, dxcc:str) -> OrderedDict[str, Any]: return parsed_response else: raise response.raise_for_status() - #if we didn't manage to return from a logged in session, raise an error - raise QRZInvalidSession(**{'message':parsed_response['ERROR']} \ - if parsed_response.get('ERROR') else {}) + # if we didn't manage to return from a logged in session, raise an error + raise QRZInvalidSession( + **{"message": parsed_response["ERROR"]} + if parsed_response.get("ERROR") + else {} + ) + -#endregion +# endregion diff --git a/src/qspylib/test_pytest.py b/src/qspylib/test_pytest.py index 5ba4ad5..1cdc721 100644 --- a/src/qspylib/test_pytest.py +++ b/src/qspylib/test_pytest.py @@ -10,29 +10,46 @@ import qspylib.logbook from qspylib import eqsl from qspylib import lotw -#from qspylib import qrz +# from qspylib import qrz + ################# # logbook tests # ################# def test_equality_of_qso(): """Tests equality of qspylib.logbook.QSOs functions as expected""" - adif_qso = adif_io.QSO({'CALL': 'W1AW', 'BAND': '20m', 'MODE': 'SSB', - 'QSO_DATE': '20220101', 'TIME_ON': '0000', - 'QSL_RCVD': 'N'}) - qso1 = qspylib.logbook.QSO('W1AW', '20m', 'SSB', '20220101', '0000', 'N') + adif_qso = adif_io.QSO( + { + "CALL": "W1AW", + "BAND": "20m", + "MODE": "SSB", + "QSO_DATE": "20220101", + "TIME_ON": "0000", + "QSL_RCVD": "N", + } + ) + qso1 = qspylib.logbook.QSO("W1AW", "20m", "SSB", "20220101", "0000", "N") qso2 = qspylib.logbook.qso_from_adi(adif_qso) assert qso1 == qso2 + def test_inequality_of_qso(): """Tests inequality of qspylib.logbook.QSOs functions as expected""" - adif_qso = adif_io.QSO({'CALL': 'W1AW/4', 'BAND': '20m', 'MODE': 'SSB', - 'QSO_DATE': '20220101', 'TIME_ON': '0000', - 'QSL_RCVD': 'N'}) - qso1 = qspylib.logbook.QSO('W1AW', '20m', 'SSB', '20220101', '0000', 'N') + adif_qso = adif_io.QSO( + { + "CALL": "W1AW/4", + "BAND": "20m", + "MODE": "SSB", + "QSO_DATE": "20220101", + "TIME_ON": "0000", + "QSL_RCVD": "N", + } + ) + qso1 = qspylib.logbook.QSO("W1AW", "20m", "SSB", "20220101", "0000", "N") qso2 = qspylib.logbook.qso_from_adi(adif_qso) assert qso1 != qso2 + def test_generating_a_logbook(): """Test generating a qspylib.logbook.Logbook""" adif_string = "a header\ @@ -50,6 +67,7 @@ def test_generating_a_logbook(): log = qspylib.logbook.Logbook("TE5T", adif_string) assert isinstance(log, qspylib.logbook.Logbook) + def test_logbook_attributes_match(): """Confirm that the Logbook's stored QSO matches what we read from text""" adif_string = "a header\ @@ -67,6 +85,7 @@ def test_logbook_attributes_match(): log = qspylib.logbook.Logbook("TE5T", adif_string) assert log.log[0] == qspylib.logbook.qso_from_adi(log.adi[0]) + def test_adding_and_removing(): """Test adding and removing a QSO""" adif_string = "a header\ @@ -82,85 +101,112 @@ def test_adding_and_removing(): 20240102\ " log = qspylib.logbook.Logbook("TE5T", adif_string) - new_adif_qso = adif_io.QSO({'CALL': 'W1AW/5', 'BAND': '20m', 'MODE': 'SSB', - 'QSO_DATE': '20220101', 'TIME_ON': '0000', - 'QSL_RCVD': 'N'}) + new_adif_qso = adif_io.QSO( + { + "CALL": "W1AW/5", + "BAND": "20m", + "MODE": "SSB", + "QSO_DATE": "20220101", + "TIME_ON": "0000", + "QSL_RCVD": "N", + } + ) log.write_qso(new_adif_qso) log.discard_qso(log.adi[0]) - assert len(log.log) == 1 and len(log.adi) == 1 and \ - log.adi[0]['CALL'] == 'W1AW/5' and log.log[0].their_call == 'W1AW/5' + assert ( + len(log.log) == 1 + and len(log.adi) == 1 + and log.adi[0]["CALL"] == "W1AW/5" + and log.log[0].their_call == "W1AW/5" + ) + ############## # lotw tests # ############## + def test_pull_a_call_from_last_upload(): """Test pulling a known call from the last upload""" - last_uploads = lotw.get_last_upload() - assert 'W1AW' in last_uploads + last_uploads = lotw.LOTWClient.get_last_upload() + assert "W1AW" in last_uploads + def test_bad_login_fetch(): """Test fetching a logbook with a bad login""" with pytest.raises(lotw.RetrievalFailure): - lotw_obj = lotw.LOTWClient('**notavalidcall**', '**notarealpassword**') + lotw_obj = lotw.LOTWClient("**notavalidcall**", "**notarealpassword**") lotw_obj.fetch_logbook() + def test_bad_login_dxcc(): """Test fetching DXCC credits with a bad login""" with pytest.raises(lotw.RetrievalFailure): - lotw_obj = lotw.LOTWClient('**notavalidcall**', '**notarealpassword**') + lotw_obj = lotw.LOTWClient("**notavalidcall**", "**notarealpassword**") lotw_obj.get_dxcc_credit() + ############### # eqsl tests # ############### + def test_verify_a_bad_eqsl(): """Test verifying a known bad eqsl""" - is_qsl_real, result = eqsl.verify_eqsl('N5UP', 'TEST', '160m', 'SSB', \ - '01/01/2000') - assert 'Error - Result: QSO not on file' in result and is_qsl_real is False + is_qsl_real, result = eqsl.eQSLClient.verify_eqsl( + "N5UP", "TEST", "160m", "SSB", "01/01/2000" + ) + assert "Error - Result: QSO not on file" in result and is_qsl_real is False + def test_verify_a_good_eqsl(): """Test verifying a known good eqsl""" - is_qsl_real, result = eqsl.verify_eqsl('ai5zk', 'w1tjl', '10m', 'SSB', \ - '01/20/2024') - assert 'Result - QSO on file' in result and is_qsl_real is True + is_qsl_real, result = eqsl.eQSLClient.verify_eqsl( + "ai5zk", "w1tjl", "10m", "SSB", "01/20/2024" + ) + assert "Result - QSO on file" in result and is_qsl_real is True + def test_pull_a_known_ag_call(): """Test grabbing a call we know should be in the AG list""" - callsigns = eqsl.get_ag_list() - assert 'W1AW' in callsigns[0] + callsigns = eqsl.eQSLClient.get_ag_list() + assert "W1AW" in callsigns[0] + def test_pull_a_known_nonag_call(): """Test grabbing a call we know shouldn't be in the AG list""" - callsigns = eqsl.get_ag_list() - assert 'WE3BS' not in callsigns[0] + callsigns = eqsl.eQSLClient.get_ag_list() + assert "WE3BS" not in callsigns[0] + def test_pull_a_call_from_ag_dated(): """Test grabbing a call from the AG dated list""" - callsigns = eqsl.get_ag_list_dated() - assert callsigns[0].get('W1AW') >= '0000-00-00' + callsigns = eqsl.eQSLClient.get_ag_list_dated() + assert callsigns[0].get("W1AW") >= "0000-00-00" + def test_pull_a_known_call_from_total_members(): """Test grabbing a call that should be in the list of total members""" - all_users = eqsl.get_full_member_list() - assert all_users.get('W1AW') + all_users = eqsl.eQSLClient.get_full_member_list() + assert all_users.get("W1AW") + def test_pull_a_missing_call_from_total_members(): """Test grabbing a call that should be missing from the list of total members""" - all_users = eqsl.get_full_member_list() - assert not all_users.get('WE3BS') + all_users = eqsl.eQSLClient.get_full_member_list() + assert not all_users.get("WE3BS") + def test_get_user_data(): """Test getting the data of a user, and verify it's what we expect""" - user = eqsl.get_users_data('W1AW') - assert user[0] == 'FN31pr' and user[1] == 'Y' and not user[2] + user = eqsl.eQSLClient.get_users_data("W1AW") + assert user[0] == "FN31pr" and user[1] == "Y" and not user[2] + ############# # qrz tests # ############# -#def test_qrz_xml_with_invalid_key(): +# def test_qrz_xml_with_invalid_key(): # log_obj = qrz.QRZLogbookAPI('aaaaaaaaaaaaa') # log = log_obj.fetch_logbook()