diff --git a/.env.example b/.env.example
index 4331b68..b3deee0 100644
--- a/.env.example
+++ b/.env.example
@@ -33,3 +33,7 @@ OPENAI_API_KEY=""
 
 # Synthesia API Key
 SYNTHESIA_API_KEY=""
+
+
+# Exa.AI API Key
+EXA_API_KEY=""
\ No newline at end of file
diff --git a/check_solana_address.py b/check_solana_address.py
new file mode 100644
index 0000000..419a1d3
--- /dev/null
+++ b/check_solana_address.py
@@ -0,0 +1,18 @@
+from swarms_tools.finance.check_solana_address import (
+    check_solana_balance,
+    check_multiple_wallets,
+)
+
+print(
+    check_solana_balance(
+        "7MaX4muAn8ZQREJxnupm8sgokwFHujgrGfH9Qn81BuEV"
+    )
+)
+print(
+    check_multiple_wallets(
+        [
+            "7MaX4muAn8ZQREJxnupm8sgokwFHujgrGfH9Qn81BuEV",
+            "7MaX4muAn8ZQREJxnupm8sgokwFHujgrGfH9Qn81BuEV",
+        ]
+    )
+)
diff --git a/mcs_agent.py b/examples/mcs_agent.py
similarity index 100%
rename from mcs_agent.py
rename to examples/mcs_agent.py
diff --git a/examples/mcs_auto_reply.py b/examples/mcs_auto_reply.py
new file mode 100644
index 0000000..a4e7935
--- /dev/null
+++ b/examples/mcs_auto_reply.py
@@ -0,0 +1,74 @@
+import os
+
+from swarm_models import OpenAIChat
+from swarms import Agent
+from dotenv import load_dotenv
+from swarms_tools.social_media.twitter_tool import (
+    reply_to_mentions_with_agent,
+)
+
+load_dotenv()
+
+model_name = "gpt-4o"
+
+model = OpenAIChat(
+    model_name=model_name,
+    max_tokens=3000,
+    openai_api_key=os.getenv("OPENAI_API_KEY"),
+)
+
+
+medical_coder = Agent(
+    agent_name="Medical Coder",
+    system_prompt="""
+    You are a highly experienced and certified medical coder with extensive knowledge of ICD-10 coding guidelines, clinical documentation standards, and compliance regulations. Your responsibility is to ensure precise, compliant, and well-documented coding for all clinical cases.
+
+    ### Primary Responsibilities:
+    1. **Review Clinical Documentation**: Analyze all available clinical records, including specialist inputs, physician notes, lab results, imaging reports, and discharge summaries.
+    2. **Assign Accurate ICD-10 Codes**: Identify and assign appropriate codes for primary diagnoses, secondary conditions, symptoms, and complications.
+    3. **Ensure Coding Compliance**: Follow the latest ICD-10-CM/PCS coding guidelines, payer-specific requirements, and organizational policies.
+    4. **Document Code Justification**: Provide clear, evidence-based rationale for each assigned code.
+
+    ### Detailed Coding Process:
+    - **Review Specialist Inputs**: Examine all relevant documentation to capture the full scope of the patient's condition and care provided.
+    - **Identify Diagnoses**: Determine the primary and secondary diagnoses, as well as any symptoms or complications, based on the documentation.
+    - **Assign ICD-10 Codes**: Select the most accurate and specific ICD-10 codes for each identified diagnosis or condition.
+    - **Document Supporting Evidence**: Record the documentation source (e.g., lab report, imaging, or physician note) for each code to justify its assignment.
+    - **Address Queries**: Note and flag any inconsistencies, missing information, or areas requiring clarification from providers.
+
+    ### Output Requirements:
+    Your response must be clear, structured, and compliant with professional standards. Use the following format:
+
+    1. **Primary Diagnosis Codes**:
+        - **ICD-10 Code**: [e.g., E11.9]
+        - **Description**: [e.g., Type 2 diabetes mellitus without complications]
+        - **Supporting Documentation**: [e.g., Physician's note dated MM/DD/YYYY]
+
+    2. **Secondary Diagnosis Codes**:
+        - **ICD-10 Code**: [Code]
+        - **Description**: [Description]
+        - **Order of Clinical Significance**: [Rank or priority]
+
+    3. **Symptom Codes**:
+        - **ICD-10 Code**: [Code]
+        - **Description**: [Description]
+
+    4. **Complication Codes**:
+        - **ICD-10 Code**: [Code]
+        - **Description**: [Description]
+        - **Relevant Documentation**: [Source of information]
+
+    5. **Coding Notes**:
+        - Observations, clarifications, or any potential issues requiring provider input.
+
+    ### Additional Guidelines:
+    - Always prioritize specificity and compliance when assigning codes.
+    - For ambiguous cases, provide a brief note with reasoning and flag for clarification.
+    - Ensure the output format is clean, consistent, and ready for professional use.
+    """,
+    llm=model,
+    max_loops=1,
+    dynamic_temperature_enabled=True,
+)
+
+print(reply_to_mentions_with_agent(medical_coder))
diff --git a/pyproject.toml b/pyproject.toml
index 9ab9773..bf98ddb 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"
 
 [tool.poetry]
 name = "swarms-tools"
-version = "0.1.3"
+version = "0.1.5"
 description = "Paper - Pytorch"
 license = "MIT"
 authors = ["Kye Gomez "]
diff --git a/swarms_tools/finance/__init__.py b/swarms_tools/finance/__init__.py
index fd81488..3518dd9 100644
--- a/swarms_tools/finance/__init__.py
+++ b/swarms_tools/finance/__init__.py
@@ -21,6 +21,10 @@
     fetch_solana_token_pairs,
 )
 from swarms_tools.finance.macro_tool import fetch_macro_financial_data
+from swarms_tools.finance.check_solana_address import (
+    check_solana_balance,
+    check_multiple_wallets,
+)
 
 __all__ = [
     "fetch_stock_news",
@@ -38,4 +42,6 @@
     "fetch_latest_token_boosts",
     "fetch_solana_token_pairs",
     "fetch_macro_financial_data",
+    "check_solana_balance",
+    "check_multiple_wallets",
 ]
diff --git a/swarms_tools/finance/check_solana_address.py b/swarms_tools/finance/check_solana_address.py
new file mode 100644
index 0000000..0eb0d36
--- /dev/null
+++ b/swarms_tools/finance/check_solana_address.py
@@ -0,0 +1,142 @@
+import os
+from typing import Optional
+
+import requests
+
+
+class SolanaWalletBalanceChecker:
+    def __init__(
+        self,
+        api_key: Optional[str] = None,
+        base_url: str = "https://api.helius.xyz/v0/addresses/",
+    ):
+        """
+        Initializes the Solana wallet balance checker using the Helius API.
+
+        Args:
+            api_key (Optional[str]): Your Helius API key. Falls back to the
+                HELIUS_API_KEY environment variable when omitted.
+            base_url (str): The base URL for the Helius API.
+        """
+        # Resolve the key at call time rather than at import time
+        self.api_key = api_key or os.getenv("HELIUS_API_KEY")
+        self.base_url = base_url
+        self.token_mapping = self.load_token_list()
+
+    def load_token_list(self) -> dict:
+        """
+        Loads the Solana token list to map mint addresses to token names.
+
+        Returns:
+            dict: A dictionary mapping mint addresses to token names.
+        """
+        url = "https://raw.githubusercontent.com/solana-labs/token-list/main/src/tokens/solana.tokenlist.json"
+        try:
+            response = requests.get(url)
+            response.raise_for_status()
+            token_list = response.json()["tokens"]
+            return {
+                token["address"]: token["symbol"]
+                for token in token_list
+            }
+        except requests.exceptions.RequestException as e:
+            print(f"Error fetching token list: {e}")
+            return {}
+
+    def get_wallet_balances(self, wallet_address: str) -> Optional[dict]:
+        """
+        Fetches the SOL and SPL token balances for the given wallet address.
+
+        Args:
+            wallet_address (str): The public key of the wallet.
+
+        Returns:
+            Optional[dict]: A dictionary containing SOL and SPL token
+                balances, or None if the request fails.
+        """
+        url = f"{self.base_url}{wallet_address}/balances?api-key={self.api_key}"
+        try:
+            response = requests.get(url)
+            response.raise_for_status()  # Ensure the request was successful
+            return response.json()
+        except requests.exceptions.RequestException as e:
+            print(f"Error fetching wallet balances: {e}")
+            return None
+
+    def display_balances(self, wallet_address: str) -> Optional[dict]:
+        """
+        Fetches and displays the SOL and SPL token balances with token names.
+
+        Args:
+            wallet_address (str): The public key of the wallet.
+
+        Returns:
+            Optional[dict]: The raw balance data, or None if the request failed.
+        """
+        print(f"Fetching balances for wallet: {wallet_address}")
+        balances_data = self.get_wallet_balances(wallet_address)
+
+        if not balances_data:
+            print("No balance data found or API request failed.")
+            return None
+
+        # Display SOL balance
+        sol_balance = (
+            balances_data.get("nativeBalance", 0) / 1e9
+        )  # Convert lamports to SOL
+        print(f"SOL: {sol_balance}")
+
+        # Display SPL token balances
+        tokens = balances_data.get("tokens", [])
+        if not tokens:
+            print("No SPL tokens found.")
+        else:
+            print("SPL Tokens:")
+            for token in tokens:
+                mint = token.get("mint")
+                amount = token.get("amount", 0)
+                decimals = token.get("decimals", 0)
+                balance = amount / (10**decimals)
+                token_name = self.token_mapping.get(
+                    mint, "Unknown Token"
+                )
+                print(f"  {token_name} ({mint}): {balance}")
+
+        # Return the raw data so callers can build a string representation
+        return balances_data
+
+
+def check_solana_balance(wallet_address: str) -> str:
+    """
+    Checks and returns the SOL and SPL token balances for a given Solana wallet address.
+
+    Args:
+        wallet_address (str): The public key of the Solana wallet.
+
+    Returns:
+        str: A string representation of the SOL and SPL token balances.
+
+    Raises:
+        TypeError: If the balances cannot be fetched for the given wallet_address.
+    """
+    try:
+        checker = SolanaWalletBalanceChecker(
+            api_key=os.getenv("HELIUS_API_KEY")
+        )
+        balance_info = checker.display_balances(wallet_address)
+        return str(balance_info)
+    except Exception as e:
+        raise TypeError(
+            f"Invalid wallet_address: {wallet_address}. Error: {e}"
+        )
+
+
+def check_multiple_wallets(wallet_addresses: list[str]) -> str:
+    """
+    Checks and returns the SOL and SPL token balances for multiple Solana wallet addresses.
+
+    Args:
+        wallet_addresses (list[str]): A list of public keys of the Solana wallets.
+
+    Returns:
+        str: A string representation of a list of balance summaries, one per wallet address.
+
+    Raises:
+        TypeError: If balances cannot be fetched for any wallet_address in the list.
+    """
+    out = [
+        check_solana_balance(wallet_address)
+        for wallet_address in wallet_addresses
+    ]
+    return str(out)
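+
+
+# # Example usage (a sketch; assumes HELIUS_API_KEY is set in the environment
+# # and reuses the demo wallet address from check_solana_address.py at the
+# # repository root):
+# checker = SolanaWalletBalanceChecker()
+# # Pretty-prints SOL and SPL balances and returns the raw payload
+# data = checker.display_balances(
+#     "7MaX4muAn8ZQREJxnupm8sgokwFHujgrGfH9Qn81BuEV"
+# )
+# print(data)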
diff --git a/swarms_tools/finance/coingecko_tool.py b/swarms_tools/finance/coingecko_tool.py
index 726dcc7..edabe5d 100644
--- a/swarms_tools/finance/coingecko_tool.py
+++ b/swarms_tools/finance/coingecko_tool.py
@@ -1,6 +1,8 @@
-from typing import Dict, Any
+from typing import Any, Dict
+
 import requests
 from loguru import logger
+
 from swarms_tools.utils.formatted_string import (
     format_object_to_string,
 )
diff --git a/swarms_tools/finance/uniswap_tool.py b/swarms_tools/finance/uniswap_tool.py
new file mode 100644
index 0000000..23309b7
--- /dev/null
+++ b/swarms_tools/finance/uniswap_tool.py
@@ -0,0 +1,346 @@
+import subprocess
+from typing import Any, Dict, List, Optional
+
+try:
+    from web3 import Web3
+except ImportError:
+    # check_call raises if the install fails instead of continuing silently
+    subprocess.check_call(["pip", "install", "web3"])
+    from web3 import Web3
+
+from loguru import logger
+import requests
+
+RPC_URL = "https://mainnet.infura.io/v3/YOUR_INFURA_PROJECT_ID"
+UNISWAP_SUBGRAPH_URL = (
+    "https://api.thegraph.com/subgraphs/name/uniswap/uniswap-v2"
+)
+
+
+class UniswapDataFetcher:
+    def __init__(self, rpc_url: str, uniswap_subgraph_url: str):
+        """
+        Initialize the UniswapDataFetcher.
+
+        Args:
+            rpc_url (str): The RPC URL of the Ethereum node.
+            uniswap_subgraph_url (str): The URL of the Uniswap subgraph API.
+        """
+        self.web3 = Web3(Web3.HTTPProvider(rpc_url))
+        self.subgraph_url = uniswap_subgraph_url
+
+        # web3.py v6 renamed isConnected() to is_connected()
+        if not self.web3.is_connected():
+            logger.error("Failed to connect to Ethereum node.")
+            raise ConnectionError(
+                "Unable to connect to Ethereum node."
+            )
+
+        logger.info("Connected to Ethereum node.")
+
+    def fetch_pair_data(
+        self, token0: str, token1: str
+    ) -> Optional[Dict[str, Any]]:
+        """
+        Fetch pair data from the Uniswap subgraph.
+
+        Args:
+            token0 (str): The address of the first token in the pair.
+            token1 (str): The address of the second token in the pair.
+
+        Returns:
+            Optional[Dict[str, Any]]: Pair data if found, otherwise None.
+        """
+        query = {
+            "query": """
+            query ($token0: String!, $token1: String!) {
+                pairs(where: {token0: $token0, token1: $token1}) {
+                    id
+                    reserve0
+                    reserve1
+                    totalSupply
+                    volumeToken0
+                    volumeToken1
+                }
+            }
+            """,
+            "variables": {
+                "token0": token0.lower(),
+                "token1": token1.lower(),
+            },
+        }
+
+        logger.info(
+            "Fetching pair data for token pair: {} - {}",
+            token0,
+            token1,
+        )
+        response = requests.post(self.subgraph_url, json=query)
+
+        if response.status_code != 200:
+            logger.error(
+                "Failed to fetch pair data: {}", response.text
+            )
+            return None
+
+        data = response.json()
+        # An empty result list would make [0] raise IndexError
+        pairs = data.get("data", {}).get("pairs") or [None]
+        return pairs[0]
+
+    def fetch_token_data(
+        self, token_address: str
+    ) -> Optional[Dict[str, Any]]:
+        """
+        Fetch token data from the Uniswap subgraph.
+
+        Args:
+            token_address (str): The address of the token.
+
+        Returns:
+            Optional[Dict[str, Any]]: Token data if found, otherwise None.
+        """
+        query = {
+            "query": """
+            query ($address: String!) {
+                token(id: $address) {
+                    id
+                    symbol
+                    name
+                    decimals
+                    totalSupply
+                }
+            }
+            """,
+            "variables": {"address": token_address.lower()},
+        }
+
+        logger.info(
+            "Fetching token data for address: {}", token_address
+        )
+        response = requests.post(self.subgraph_url, json=query)
+
+        if response.status_code != 200:
+            logger.error(
+                "Failed to fetch token data: {}", response.text
+            )
+            return None
+
+        data = response.json()
+        return data.get("data", {}).get("token")
+
+    def fetch_pool_volume(self, pool_address: str) -> Optional[float]:
+        """
+        Fetch the cumulative USD volume of a specific pool.
+
+        Args:
+            pool_address (str): The address of the pool.
+
+        Returns:
+            Optional[float]: The all-time USD volume of the pool if available, otherwise None.
+        """
+        query = {
+            "query": """
+            query ($address: String!) {
+                pair(id: $address) {
+                    volumeUSD
+                }
+            }
+            """,
+            "variables": {"address": pool_address.lower()},
+        }
+
+        logger.info(
+            "Fetching pool volume for address: {}", pool_address
+        )
+        response = requests.post(self.subgraph_url, json=query)
+
+        if response.status_code != 200:
+            logger.error(
+                "Failed to fetch pool volume: {}", response.text
+            )
+            return None
+
+        data = response.json()
+        # The subgraph returns "pair": null for unknown addresses
+        pair_data = data.get("data", {}).get("pair") or {}
+        volume = pair_data.get("volumeUSD")
+        return float(volume) if volume else None
+
+    def fetch_liquidity_positions(
+        self, user_address: str
+    ) -> Optional[List[Dict[str, Any]]]:
+        """
+        Fetch liquidity positions of a user.
+
+        Args:
+            user_address (str): The address of the user.
+
+        Returns:
+            Optional[List[Dict[str, Any]]]: A list of liquidity positions if available, otherwise None.
+        """
+        query = {
+            "query": """
+            query ($address: String!) {
+                user(id: $address) {
+                    liquidityPositions {
+                        id
+                        pair {
+                            id
+                            token0 {
+                                symbol
+                            }
+                            token1 {
+                                symbol
+                            }
+                        }
+                        liquidityTokenBalance
+                    }
+                }
+            }
+            """,
+            "variables": {"address": user_address.lower()},
+        }
+
+        logger.info(
+            "Fetching liquidity positions for user: {}", user_address
+        )
+        response = requests.post(self.subgraph_url, json=query)
+
+        if response.status_code != 200:
+            logger.error(
+                "Failed to fetch liquidity positions: {}",
+                response.text,
+            )
+            return None
+
+        data = response.json()
+        user_data = data.get("data", {}).get("user")
+        return (
+            user_data.get("liquidityPositions") if user_data else None
+        )
+
+
+def fetch_token_data(token: str) -> Optional[Dict[str, Any]]:
+    """
+    Fetches token data from the Uniswap subgraph.
+
+    Args:
+        token (str): The address of the token.
+
+    Returns:
+        Optional[Dict[str, Any]]: Token data if found, otherwise None.
+    """
+    try:
+        fetcher = UniswapDataFetcher(RPC_URL, UNISWAP_SUBGRAPH_URL)
+        token_data = fetcher.fetch_token_data(token)
+        return token_data
+    except Exception as e:
+        logger.error("Failed to fetch token data: {}", e)
+        return None
+
+
+def fetch_pair_data(
+    token0: str, token1: str
+) -> Optional[Dict[str, Any]]:
+    """
+    Fetches pair data from the Uniswap subgraph.
+
+    Args:
+        token0 (str): The address of the first token in the pair.
+        token1 (str): The address of the second token in the pair.
+
+    Returns:
+        Optional[Dict[str, Any]]: Pair data if found, otherwise None.
+    """
+    try:
+        fetcher = UniswapDataFetcher(RPC_URL, UNISWAP_SUBGRAPH_URL)
+        pair_data = fetcher.fetch_pair_data(token0, token1)
+        return pair_data
+    except Exception as e:
+        logger.error("Failed to fetch pair data: {}", e)
+        return None
+
+
+def fetch_pool_volume(pool_address: str) -> Optional[float]:
+    """
+    Fetches the cumulative volume of a pool from the Uniswap subgraph.
+
+    Args:
+        pool_address (str): The address of the pool.
+
+    Returns:
+        Optional[float]: The volume of the pool if found, otherwise None.
+    """
+    try:
+        fetcher = UniswapDataFetcher(RPC_URL, UNISWAP_SUBGRAPH_URL)
+        pool_volume = fetcher.fetch_pool_volume(pool_address)
+        return pool_volume
+    except Exception as e:
+        logger.error("Failed to fetch pool volume: {}", e)
+        return None
+
+
+def fetch_liquidity_positions(
+    user_address: str,
+) -> Optional[List[Dict[str, Any]]]:
+    """
+    Fetches the liquidity positions of a user from the Uniswap subgraph.
+
+    Args:
+        user_address (str): The address of the user.
+
+    Returns:
+        Optional[List[Dict[str, Any]]]: A list of liquidity positions if found, otherwise None.
+    """
+    try:
+        fetcher = UniswapDataFetcher(RPC_URL, UNISWAP_SUBGRAPH_URL)
+        liquidity_positions = fetcher.fetch_liquidity_positions(
+            user_address
+        )
+        return liquidity_positions
+    except Exception as e:
+        logger.error("Failed to fetch liquidity positions: {}", e)
+        return None
+
+
+def fetch_all_uniswap_data(
+    token: str, pair: List[str], pool_address: str
+) -> str:
+    """
+    Fetches all data for a given token, pair, and pool address.
+
+    Args:
+        token (str): The address of the token.
+        pair (List[str]): The addresses of the two tokens in the pair.
+        pool_address (str): The address of the pool.
+
+    Returns:
+        str: A formatted string containing the token data, pair data, and pool volume.
+    """
+    try:
+        token_data = fetch_token_data(token)
+        pair_data = fetch_pair_data(pair[0], pair[1])
+        pool_volume = fetch_pool_volume(pool_address)
+        formatted_data = f"Token Data: {token_data}\nPair Data: {pair_data}\nPool Volume: {pool_volume}"
+        return formatted_data
+    except Exception as e:
+        logger.error("Failed to fetch all data: {}", e)
+        return "Failed to fetch all data"
+
+
+# # Example usage
+# if __name__ == "__main__":
+#     RPC_URL = "https://mainnet.infura.io/v3/YOUR_INFURA_PROJECT_ID"
+#     UNISWAP_SUBGRAPH_URL = "https://api.thegraph.com/subgraphs/name/uniswap/uniswap-v2"

#     fetcher = UniswapDataFetcher(RPC_URL, UNISWAP_SUBGRAPH_URL)

#     token_data = fetcher.fetch_token_data("0xdAC17F958D2ee523a2206206994597C13D831ec7")  # Example for USDT
#     logger.info("Token Data: {}", token_data)

#     pair_data = fetcher.fetch_pair_data(
#         "0xdAC17F958D2ee523a2206206994597C13D831ec7",
#         "0xC02aaA39b223FE8D0A0E5C4F27eAD9083C756Cc2"
#     )  # Example for USDT-WETH pair
#     logger.info("Pair Data: {}", pair_data)

#     pool_volume = fetcher.fetch_pool_volume("0xB4e16d0168e52d35CaCD2c6185b44281Ec28C9Dc")  # Example for the USDC-WETH pool
#     logger.info("Pool Volume: {}", pool_volume)

#     liquidity_positions = fetcher.fetch_liquidity_positions("0xYourWalletAddress")
#     logger.info("Liquidity Positions: {}", liquidity_positions)
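
# # Example for the aggregate helper (a sketch; assumes a real Infura project
# # ID has been substituted into RPC_URL and reuses the token, pair, and pool
# # addresses from the example above):
# print(
#     fetch_all_uniswap_data(
#         token="0xdAC17F958D2ee523a2206206994597C13D831ec7",
#         pair=[
#             "0xdAC17F958D2ee523a2206206994597C13D831ec7",
#             "0xC02aaA39b223FE8D0A0E5C4F27eAD9083C756Cc2",
#         ],
#         pool_address="0xB4e16d0168e52d35CaCD2c6185b44281Ec28C9Dc",
#     )
# )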
diff --git a/swarms_tools/search/__init__.py b/swarms_tools/search/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/swarms_tools/search/exa_tool.py b/swarms_tools/search/exa_tool.py
new file mode 100644
index 0000000..72bdf86
--- /dev/null
+++ b/swarms_tools/search/exa_tool.py
@@ -0,0 +1,45 @@
+import os
+from typing import Any, Dict, List, Optional, Union
+
+import requests
+
+
+def search_exa_ai(
+    query: str = "Latest developments in LLM capabilities",
+    num_results: int = 10,
+    auto_prompt: bool = True,
+    include_domains: Optional[List[str]] = None,
+    exclude_domains: Optional[List[str]] = None,
+    category: str = "research paper",
+) -> Union[Dict[str, Any], str]:
+    # Avoid mutable default arguments
+    if include_domains is None:
+        include_domains = ["arxiv.org", "paperswithcode.com"]
+    if exclude_domains is None:
+        exclude_domains = []
+
+    url = "https://api.exa.ai/search"
+    payload = {
+        "query": query,
+        "useAutoprompt": auto_prompt,
+        "type": "auto",
+        "category": category,
+        "numResults": num_results,
+        "includeDomains": include_domains,
+        "excludeDomains": exclude_domains,
+    }
+    headers = {
+        "x-api-key": os.getenv("EXA_API_KEY"),
+        "Content-Type": "application/json",
+    }
+
+    try:
+        response = requests.post(url, json=payload, headers=headers)
+        response.raise_for_status()  # Raises an HTTPError if the response status code is 4XX/5XX
+        return response.json()
+    except requests.exceptions.HTTPError as http_err:
+        print(f"HTTP error occurred: {http_err}")
+        return "Failed to retrieve results due to an HTTP error."
+    except Exception as err:
+        print(f"Other error occurred: {err}")
+        return "Failed to retrieve results due to an error."
+
+
+# # Example usage
+# query = "Latest developments in LLM capabilities"
+# print(search_exa_ai(query))
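
# # Example: unpacking the response (a sketch; assumes the Exa /search endpoint
# # returns a JSON object with a "results" list whose entries carry "title" and
# # "url" fields, per Exa's API documentation):
# results = search_exa_ai("Latest developments in LLM capabilities")
# if isinstance(results, dict):
#     for item in results.get("results", []):
#         print(item.get("title"), "-", item.get("url"))
# else:
#     print(results)  # an error message string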
"includeDomains": include_domains, + "excludeDomains": exclude_domains, + } + headers = { + "x-api-key": os.getenv("EXA_API_KEY"), + "Content-Type": "application/json", + } + + try: + response = requests.request( + "POST", url, json=payload, headers=headers + ) + response.raise_for_status() # Raises an HTTPError if the response status code is 4XX/5XX + return response.json() + except requests.exceptions.HTTPError as http_err: + print(f'HTTP error occurred: {http_err}') + return "Failed to retrieve results due to an HTTP error." + except Exception as err: + print(f'Other error occurred: {err}') + return "Failed to retrieve results due to an error." + + +# # Example usage +# query = "Latest developments in LLM capabilities" +# print(search_exa_ai(query)) diff --git a/swarms_tools/search/google_api.py b/swarms_tools/search/google_api.py new file mode 100644 index 0000000..8aac30d --- /dev/null +++ b/swarms_tools/search/google_api.py @@ -0,0 +1,426 @@ +from concurrent.futures import ThreadPoolExecutor, as_completed +import os +import time +from typing import Dict, List, Optional + + +try: + import asyncio +except ImportError: + print("asyncio not found. Installing...") + import subprocess + + subprocess.check_call(["pip", "install", "asyncio"]) + import asyncio + +try: + import aiohttp +except ImportError: + print("aiohttp not found. Installing...") + import subprocess + + subprocess.check_call(["pip", "install", "aiohttp"]) + import aiohttp + +try: + from bs4 import BeautifulSoup +except ImportError: + print("beautifulsoup4 not found. Installing...") + import subprocess + + subprocess.check_call(["pip", "install", "beautifulsoup4"]) + from bs4 import BeautifulSoup + +try: + import json +except ImportError: + print("json not found. Installing...") + import subprocess + + subprocess.check_call(["pip", "install", "json"]) + import json + + +try: + from dotenv import load_dotenv +except ImportError: + print("python-dotenv not found. Installing...") + import subprocess + + subprocess.check_call(["pip", "install", "python-dotenv"]) + from dotenv import load_dotenv + +try: + import google.generativeai as genai +except ImportError: + print("google-generativeai not found. Installing...") + import subprocess + + subprocess.check_call(["pip", "install", "google-generativeai"]) + import google.generativeai as genai + +try: + from rich.console import Console +except ImportError: + print("rich not found. Installing...") + import subprocess + + subprocess.check_call(["pip", "install", "rich"]) + from rich.console import Console + +try: + from rich.progress import ( + Progress, + SpinnerColumn, + TextColumn, + BarColumn, + TimeRemainingColumn, + ) +except ImportError: + print("rich not found. Installing...") + import subprocess + + subprocess.check_call(["pip", "install", "rich"]) + from rich.progress import ( + Progress, + SpinnerColumn, + TextColumn, + BarColumn, + TimeRemainingColumn, + ) + +try: + import html2text +except ImportError: + print("html2text not found. Installing...") + import subprocess + + subprocess.check_call(["pip", "install", "html2text"]) + import html2text + +try: + from playwright.async_api import async_api as sync_playwright +except ImportError: + print("playwright not found. Installing...") + import subprocess + + subprocess.check_call(["pip", "install", "playwright"]) + from playwright.async_api import async_api as sync_playwright + + +try: + from tenacity import retry, stop_after_attempt, wait_exponential +except ImportError: + print("tenacity not found. 
+    import subprocess
+
+    subprocess.check_call(["pip", "install", "tenacity"])
+    from tenacity import retry, stop_after_attempt, wait_exponential
+
+console = Console()
+load_dotenv()
+
+
+class WebsiteChecker:
+    def __init__(self, agent: callable):
+        self.google_api_key = os.getenv("GOOGLE_API_KEY")
+        self.google_cx = os.getenv("GOOGLE_CX")
+        self.gemini_api_key = os.getenv("GEMINI_API_KEY")
+        self.outputs_dir = "outputs"
+        self.agent = agent
+        os.makedirs(self.outputs_dir, exist_ok=True)
+
+        # Initialize html2text
+        self.html_converter = html2text.HTML2Text()
+        self.html_converter.ignore_links = True
+        self.html_converter.ignore_images = True
+        self.html_converter.ignore_emphasis = True
+
+        # Configure retry settings
+        self.max_retries = 3
+        self.max_threads = 10  # Concurrent threads
+        self.timeout = 15  # Seconds
+
+    async def fetch_search_results(self, query: str) -> List[Dict]:
+        """Fetch top 10 search results using Google Custom Search API"""
+        async with aiohttp.ClientSession() as session:
+            url = "https://www.googleapis.com/customsearch/v1"
+            params = {
+                "key": self.google_api_key,
+                "cx": self.google_cx,
+                "q": query,
+                "num": 10,  # Fetch top 10 results
+            }
+
+            try:
+                async with session.get(
+                    url, params=params
+                ) as response:
+                    if response.status == 200:
+                        data = await response.json()
+                        results = []
+                        for item in data.get("items", []):
+                            if "link" in item and not any(
+                                x in item["link"].lower()
+                                for x in [".pdf", ".doc", ".docx"]
+                            ):
+                                results.append(
+                                    {
+                                        "title": item.get(
+                                            "title", ""
+                                        ),
+                                        "link": item["link"],
+                                        "snippet": item.get(
+                                            "snippet", ""
+                                        ),
+                                    }
+                                )
+                        return results[
+                            :10
+                        ]  # Ensure we only take top 10
+                    else:
+                        console.print(
+                            f"[red]Error: {response.status} - {await response.text()}[/red]"
+                        )
+                        return []
+            except Exception as e:
+                console.print(
+                    f"[red]Error fetching search results: {str(e)}[/red]"
+                )
+                return []
+
+    @retry(
+        stop=stop_after_attempt(3),
+        wait=wait_exponential(multiplier=1, min=4, max=10),
+    )
+    def extract_content_with_retry(self, url: str) -> Optional[Dict]:
+        """Extract content from a URL with retry mechanism"""
+        try:
+            with sync_playwright() as p:
+                browser = p.chromium.launch(headless=True)
+                context = browser.new_context(
+                    viewport={"width": 1920, "height": 1080},
+                    user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
+                )
+
+                page = context.new_page()
+                page.set_default_timeout(25000)  # 25 second timeout
+
+                page.goto(url)
+                page.wait_for_load_state("networkidle", timeout=20000)
+
+                # Extract content
+                content = page.content()
+                # html.parser avoids an undeclared dependency on lxml
+                soup = BeautifulSoup(content, "html.parser")
+
+                # Clean up content
+                for element in soup.find_all(
+                    [
+                        "script",
+                        "style",
+                        "nav",
+                        "footer",
+                        "header",
+                        "aside",
+                    ]
+                ):
+                    element.decompose()
+
+                # Get main content
+                main_content = (
+                    soup.find("main")
+                    or soup.find("article")
+                    or soup.find(
+                        "div", {"class": ["content", "main"]}
+                    )
+                )
+                if not main_content:
+                    main_content = soup.find("body")
+
+                # Convert to markdown-like text
+                clean_text = self.html_converter.handle(
+                    str(main_content)
+                )
+
+                browser.close()
+
+                return {
+                    "url": url,
+                    "title": (
+                        soup.title.string
+                        if soup.title
+                        else "No title"
+                    ),
+                    "content": clean_text.strip(),
+                }
+
+        except Exception as e:
+            console.print(
+                f"[yellow]Warning: Failed to extract from {url}: {str(e)}[/yellow]"
+            )
+            return None
+
+    def process_url(self, url: str) -> Optional[Dict]:
+        """Process a single URL with progress tracking"""
+        try:
+            return self.extract_content_with_retry(url)
+        except Exception as e:
+            console.print(
+                f"[red]Failed to process {url}: {str(e)}[/red]"
+            )
+            return None
+
+    async def process_urls_concurrent(
+        self, urls: List[str]
+    ) -> List[Dict]:
+        """Process multiple URLs concurrently using ThreadPoolExecutor"""
+        successful_results = []
+
+        with Progress(
+            SpinnerColumn(),
+            TextColumn("[progress.description]{task.description}"),
+            BarColumn(),
+            TimeRemainingColumn(),
+        ) as progress:
+            task = progress.add_task(
+                "Processing websites...", total=len(urls)
+            )
+
+            with ThreadPoolExecutor(
+                max_workers=self.max_threads
+            ) as executor:
+                future_to_url = {
+                    executor.submit(self.process_url, url): url
+                    for url in urls
+                }
+
+                for future in as_completed(future_to_url):
+                    url = future_to_url[future]
+                    try:
+                        result = future.result()
+                        if result:
+                            successful_results.append(result)
+                    except Exception as e:
+                        console.print(
+                            f"[red]Error processing {url}: {str(e)}[/red]"
+                        )
+                    finally:
+                        progress.advance(task)
+
+        return successful_results
+
+    async def summarize_with_gemini(
+        self, agent: callable, extracted_data: List[Dict], query: str
+    ) -> str:
+        """Generate a summary by delegating to the provided agent
+        (Gemini is configured here in case the agent is Gemini-backed)"""
+        genai.configure(api_key=self.gemini_api_key)
+
+        # Format content for summarization, limiting length per source
+        formatted_content = "# Source Materials:\n\n"
+        for i, item in enumerate(extracted_data, 1):
+            formatted_content += f"""
+            ### Source {i}: {item['title']}
+            URL: {item['url']}
+            {item['content'][:2000]}
+            ---
+            """
+
+        prompt = f"""
+        Analyze and summarize the following content about: "{query}"
+        Create a detailed summary with these sections:
+        1. Key Findings (2-3 paragraphs)
+        2. Important Details (bullet points)
+        3. Sources (numbered list)
+        Focus on accuracy, clarity, and completeness.
+        Present conflicting information if found.
+        Use proper markdown formatting.
+        Content to analyze:
+        {formatted_content}
+        """
+        response = await asyncio.to_thread(lambda: agent.run(prompt))
+
+        return response
+
+    async def search(self, query: str) -> str:
+        """Main search function with timing"""
+        start_time = time.time()
+
+        console.print(
+            f"\n[bold cyan]Searching for: {query}[/bold cyan]\n"
+        )
+
+        # Fetch search results
+        search_results = await self.fetch_search_results(query)
+        if not search_results:
+            return "No search results found."
+
+        # Extract URLs
+        urls = [result["link"] for result in search_results]
+
+        # Process URLs concurrently
+        extracted_data = await self.process_urls_concurrent(urls)
+
+        # Generate summary
+        with Progress(
+            SpinnerColumn(),
+            TextColumn("[progress.description]{task.description}"),
+        ) as progress:
+            task = progress.add_task(
+                "[cyan]Generating summary...", total=None
+            )
+            summary = await self.summarize_with_gemini(
+                agent=self.agent,
+                extracted_data=extracted_data,
+                query=query,
+            )
+            progress.update(task, completed=True)
+
+        # Save results
+        results = {
+            "query": query,
+            "search_results": search_results,
+            "extracted_data": extracted_data,
+            "summary": summary,
+        }
+
+        with open(
+            os.path.join(self.outputs_dir, "search_results.json"),
+            "w",
+            encoding="utf-8",
+        ) as f:
+            json.dump(results, f, indent=2, ensure_ascii=False)
+
+        end_time = time.time()
+        execution_time = end_time - start_time
+
+        # Print results
+        console.print(
+            "\n[bold green]====== Search Summary ======[/bold green]\n"
+        )
+        console.print(summary)
+        console.print(
+            "\n[bold green]========================[/bold green]"
+        )
+        console.print(
+            f"\n[bold cyan]Execution time: {execution_time:.2f} seconds[/bold cyan]\n"
+        )
+
+        return summary
+
+
+def search(query: str, agent: callable) -> str:
+    """Synchronous wrapper for the async search function"""
+    checker = WebsiteChecker(agent=agent)
+    return asyncio.run(checker.search(query))
+
+
+# search_tool_schema = functions_to_openai_tools([search])
+# # tools = functions_to_openai_tools([search, get_weather])

# # Print the generated schemas
# print(json.dumps(tools, indent=2))
# if __name__ == "__main__":
#     query = input("Enter your search query: ")
#     result = search(query)

# search("who won elections 2024 us")
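
# # Example with a stub agent (a sketch; assumes GOOGLE_API_KEY, GOOGLE_CX,
# # and GEMINI_API_KEY are set in the environment; any object exposing a
# # .run(prompt) method will work as the agent):
# class EchoAgent:
#     def run(self, prompt: str) -> str:
#         return prompt[:500]
#
# print(search("who won elections 2024 us", agent=EchoAgent()))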
diff --git a/swarms_tools/social_media/twitter_tool.py b/swarms_tools/social_media/twitter_tool.py
index 9f1c472..68fdffa 100644
--- a/swarms_tools/social_media/twitter_tool.py
+++ b/swarms_tools/social_media/twitter_tool.py
@@ -1,5 +1,6 @@
 import os
 import subprocess
+import time
 from typing import Any, Callable, Dict, List, Optional
 
 try:
@@ -166,13 +167,107 @@
         except tweepy.TweepyException as e:
             print(f"Failed to quote tweet {tweet_id}: {e}")
 
+    # Add to the TwitterPlugin class
+
+    def _fetch_mentions(self) -> List[Dict[str, Any]]:
+        try:
+            # Fetch the authenticated user's ID
+            user = self.twitter_client.get_me()
+            user_id = user.data.id
+
+            # Fetch mentions
+            response = self.twitter_client.get_users_mentions(
+                id=user_id,
+                tweet_fields=[
+                    "id",
+                    "text",
+                    "author_id",
+                    "created_at",
+                ],
+            )
+
+            # Log rate limit information; tweepy's Response namedtuple does
+            # not expose HTTP headers, so guard the lookup
+            headers = getattr(response, "headers", {})
+            print(f"Rate Limit: {headers.get('x-rate-limit-limit')}")
+            print(
+                f"Remaining: {headers.get('x-rate-limit-remaining')}"
+            )
+            print(f"Reset Time: {headers.get('x-rate-limit-reset')}")
+
+            if response.data:
+                print(f"Fetched {len(response.data)} mentions.")
+                return response.data
+            else:
+                print("No new mentions found.")
+                return []
+        except tweepy.TooManyRequests as e:
+            # x-rate-limit-reset is an epoch timestamp, not a duration
+            reset_at = int(
+                e.response.headers.get("x-rate-limit-reset", 0)
+            )
+            print(
+                f"Rate limit exceeded: reset at epoch {reset_at}."
+ ) + time.sleep( + int(e.response.headers.get("x-rate-limit-reset", 60)) + ) + return [] + except tweepy.TweepyException as e: + print(f"Error fetching mentions: {e}") + return [] + + def reply_to_mentions_with_agent(self, agent: Any) -> None: + print("Starting real-time mention monitoring...") + replied_tweets = set() + + while True: + try: + mentions = self._fetch_mentions() + for mention in mentions: + tweet_id = mention["id"] + tweet_text = mention["text"] + + if tweet_id not in replied_tweets: + try: + response = agent(tweet_text) + self._reply_tweet(tweet_id, response) + print( + f"Replied to tweet {tweet_id}: {response}" + ) + replied_tweets.add(tweet_id) + except tweepy.TweepyException as e: + print( + f"Error replying to tweet {tweet_id}: {e}" + ) + + # Dynamic backoff based on remaining requests + headers = self.twitter_client.last_response.headers + remaining = int( + headers.get("x-rate-limit-remaining", 0) + ) + reset_time = int( + headers.get("x-rate-limit-reset", 60) + ) + + if remaining <= 5: # If close to the limit + sleep_time = reset_time - int(time.time()) + 1 + print( + f"Approaching rate limit. Sleeping for {sleep_time} seconds." + ) + time.sleep(max(sleep_time, 1)) + else: + time.sleep(60) # Default sleep time + + except tweepy.TooManyRequests: + print("Rate limit exceeded. Sleeping...") + time.sleep(300) # Fallback sleep for 5 minutes + def initialize_twitter_tool() -> TwitterTool: # Define your options with the necessary credentials - id = os.getenv("TWITTER_ID") - name = os.getenv("TWITTER_NAME") - description = os.getenv("TWITTER_DESCRIPTION") - + id = os.getenv("TWITTER_ID") or "twitter_plugin" + name = os.getenv("TWITTER_NAME") or "Twitter Plugin" + description = ( + os.getenv("TWITTER_DESCRIPTION") + or "A plugin that executes tasks within Twitter, capable of posting, replying, quoting, and liking tweets, and getting metrics." + ) + options = { "id": id, "name": name, @@ -283,3 +378,23 @@ def get_metrics() -> Dict[str, int]: except tweepy.TweepyException as e: print(f"Failed to fetch metrics: {e}") return {} + + +def reply_to_mentions_with_agent( + agent: Any, interval: int = 10 +) -> None: + """ + Replies to mentions on Twitter using the provided agent and interval. + + Args: + agent (Any): The agent to use for replying to mentions. + interval (int, optional): The time interval in seconds between replies. Defaults to 10. + + Raises: + tweepy.TweepyException: If there's an error replying to mentions. + """ + try: + twitter_plugin = initialize_twitter_tool() + twitter_plugin.reply_to_mentions_with_agent(agent) + except tweepy.TweepyException as e: + print(f"Failed to reply to mentions: {e}")