Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Repo filtering #48

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions xgitguard/common/github_calls.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def __init__(self, base_url, token_env, commits_api_url, throttle_time=2):
self._commits_api_url = commits_api_url
self._throttle_time = throttle_time

def run_github_search(self, search_query, extension, org=[], repo=[]):
def run_github_search(self, search_query, extension, org=[], repo=[], search_archived = True, search_forked = True):
"""
Run the GitHub API search with given search query
Get the items from the response content and Return
Expand Down Expand Up @@ -73,20 +73,24 @@ def run_github_search(self, search_query, extension, org=[], repo=[]):

if not extension or extension == "others" or len(extension) == 0:
response = self.__github_api_get_params(
search_query, org_qualifiers, repo_qualifiers
search_query, org_qualifiers, repo_qualifiers, search_archived, search_forked
)
elif self._token_env == "public":

response = self.__github_api_get_params(
(search_query + " extension:" + extension),
org_qualifiers,
repo_qualifiers,
search_archived,
search_forked
)
else:
response = self.__github_api_get_params(
(search_query + " extension:" + extension),
org_qualifiers,
repo_qualifiers,
search_archived,
search_forked
)

if response:
Expand All @@ -95,7 +99,7 @@ def run_github_search(self, search_query, extension, org=[], repo=[]):
return []

def __github_api_get_params(
self, search_query, org_qualifiers="", repo_qualifiers=""
self, search_query, org_qualifiers="", repo_qualifiers="", search_archived = True, search_forked = True
):
"""
For the given GITHUB API url and search query, call the api
Expand Down Expand Up @@ -132,13 +136,17 @@ def __github_api_get_params(
elif len(repo_qualifiers) > 0:
additional_qualifiers = repo_qualifiers

archive_filter = "" if search_archived else "NOT is:archived"
forked_filter = "" if search_forked else "NOT is:fork"

search_response = []
if additional_qualifiers:
try:
q_string = f"{search_query} {additional_qualifiers} {archive_filter} {forked_filter}"
response = requests.get(
self._base_url,
params={
"q": f"{search_query} {additional_qualifiers}",
"q": q_string,
"order": "desc",
"sort": "indexed",
"per_page": 100,
Expand All @@ -149,10 +157,11 @@ def __github_api_get_params(
logger.error(f"Github API call Error: {e}")
else:
try:
q_string = f"{search_query} {archive_filter} {forked_filter}"
response = requests.get(
self._base_url,
params={
"q": f"{search_query}",
"q": q_string,
"order": "desc",
"sort": "indexed",
"per_page": 100,
Expand Down
42 changes: 40 additions & 2 deletions xgitguard/custom keyword search/enterprise_keyword_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ def format_search_query_list(secondary_keywords):
return search_query_list


def run_detection(enterprise_keywords=[], org=[], repo=[]):
def run_detection(enterprise_keywords=[], org=[], repo=[], search_archived = True, search_forked = True):
"""
Run GitHub search
If a Enterprise keyword is provided, perform the search using the Enterprise keyword.
Expand Down Expand Up @@ -286,6 +286,8 @@ def run_detection(enterprise_keywords=[], org=[], repo=[]):
"",
org,
repo,
search_archived,
search_forked
)
# If search has detections, process the result urls else continue next search
if search_response_lines:
Expand Down Expand Up @@ -379,6 +381,28 @@ def arg_parser():
help="Pass the repo name list as comma separated string",
)

argparser.add_argument(
"-a",
"--archived",
metavar="Archived",
action="store",
type=str,
default="Yes",
choices=flag_choices,
help="Pass Yes or No to search for Archived repos. Default is Yes",
)

argparser.add_argument(
"-f",
"--forked",
metavar="Forked",
action="store",
type=str,
default="Yes",
choices=flag_choices,
help="Pass Yes or No to search for Forked repos. Default is Yes",
)

argparser.add_argument(
"-l",
"--log_level",
Expand Down Expand Up @@ -421,6 +445,16 @@ def arg_parser():
else:
repo = []

if args.archived.lower() in flag_choices[:5]:
search_archived = True
else:
search_archived = False

if args.forked.lower() in flag_choices[:5]:
search_forked = True
else:
search_forked = False

if args.log_level in log_level_choices:
log_level = args.log_level
else:
Expand All @@ -434,6 +468,8 @@ def arg_parser():
enterprise_keywords,
org,
repo,
search_archived,
search_forked,
log_level,
console_logging,
)
Expand All @@ -445,6 +481,8 @@ def arg_parser():
enterprise_keywords,
org,
repo,
search_archived,
search_forked,
log_level,
console_logging,
) = arg_parser()
Expand All @@ -470,5 +508,5 @@ def arg_parser():
)
sys.exit(1)

run_detection(enterprise_keywords, org, repo)
run_detection(enterprise_keywords, org, repo, search_archived, search_forked)
logger.info("xGitGuard Custom keyword search Process Completed")
38 changes: 36 additions & 2 deletions xgitguard/custom keyword search/public_keyword_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ def process_search_results(search_response_lines, search_query):
return detection_writes_per_query, new_results_per_query, detections_per_query


def run_detection(public_keywords=[], org=[], repo=[]):
def run_detection(public_keywords=[], org=[], repo=[], search_archived = True, search_forked = True):
"""
Run GitHub search
If a primary keyword is provided, perform the search using the primary keyword.
Expand Down Expand Up @@ -264,6 +264,8 @@ def run_detection(public_keywords=[], org=[], repo=[]):
"",
org,
repo,
search_archived,
search_forked
)
# If search has detections, process the result urls else continue next search
if search_response_lines:
Expand Down Expand Up @@ -353,6 +355,26 @@ def arg_parser():
default="",
help="Pass the repo name list as comma separated string",
)
argparser.add_argument(
"-a",
"--archived",
metavar="Archived",
action="store",
type=str,
default="Yes",
choices=flag_choices,
help="Pass Yes or No to search for Archived repos. Default is Yes",
)
argparser.add_argument(
"-f",
"--forked",
metavar="Forked",
action="store",
type=str,
default="Yes",
choices=flag_choices,
help="Pass Yes or No to search for Forked repos. Default is Yes",
)
argparser.add_argument(
"-l",
"--log_level",
Expand Down Expand Up @@ -389,6 +411,14 @@ def arg_parser():
repo = []
else:
repo = []
if args.archived.lower() in flag_choices[:5]:
search_archived = True
else:
search_archived = False
if args.forked.lower() in flag_choices[:5]:
search_forked = True
else:
search_forked = False
if args.log_level in log_level_choices:
log_level = args.log_level
else:
Expand All @@ -401,6 +431,8 @@ def arg_parser():
public_keywords,
org,
repo,
search_archived,
search_forked,
log_level,
console_logging,
)
Expand All @@ -412,6 +444,8 @@ def arg_parser():
public_keywords,
org,
repo,
search_archived,
search_forked,
log_level,
console_logging,
) = arg_parser()
Expand All @@ -436,5 +470,5 @@ def arg_parser():
f"GitHub API Token Environment variable '{token_var}' not set. API Search will fail/return no results. Please Setup and retry"
)
sys.exit(1)
run_detection(public_keywords, org, repo)
run_detection(public_keywords, org, repo, search_archived, search_forked)
logger.info("xGitGuard custom keyword search Process Completed")
42 changes: 39 additions & 3 deletions xgitguard/github-enterprise/enterprise_cred_detections.py
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,7 @@ def format_search_query_list(secondary_keywords):


def run_detection(
secondary_keywords=[], extensions=[], ml_prediction=False, org=[], repo=[]
secondary_keywords=[], extensions=[], ml_prediction=False, org=[], repo=[], search_archived = True, search_forked = True
):
"""
Run GitHub detections
Expand Down Expand Up @@ -646,7 +646,7 @@ def run_detection(
# Search GitHub and return search response confidence_score
total_processed_search += 1
search_response_lines = githubCalls.run_github_search(
search_query, extension, org, repo
search_query, extension, org, repo, search_archived, search_forked
)
# If search has detections, process the result urls else continue next search
if search_response_lines:
Expand Down Expand Up @@ -782,6 +782,28 @@ def arg_parser():
help="Pass the repo name list as comma separated string",
)

argparser.add_argument(
"-a",
"--archived",
metavar="Archived",
action="store",
type=str,
default="Yes",
choices=flag_choices,
help="Pass Yes or No to search for Archived repos. Default is Yes",
)

argparser.add_argument(
"-f",
"--forked",
metavar="Forked",
action="store",
type=str,
default="Yes",
choices=flag_choices,
help="Pass Yes or No to search for Forked repos. Default is Yes",
)

argparser.add_argument(
"-l",
"--log_level",
Expand Down Expand Up @@ -839,6 +861,16 @@ def arg_parser():
else:
repo = []

if args.archived.lower() in flag_choices[:5]:
search_archived = True
else:
search_archived = False

if args.forked.lower() in flag_choices[:5]:
search_forked = True
else:
search_forked = False

if args.log_level in log_level_choices:
log_level = args.log_level
else:
Expand All @@ -855,6 +887,8 @@ def arg_parser():
unmask_secret,
org,
repo,
search_archived,
search_forked,
log_level,
console_logging,
)
Expand All @@ -869,6 +903,8 @@ def arg_parser():
unmask_secret,
org,
repo,
search_archived,
search_forked,
log_level,
console_logging,
) = arg_parser()
Expand Down Expand Up @@ -896,6 +932,6 @@ def arg_parser():
)
sys.exit(1)

run_detection(secondary_keywords, extensions, ml_prediction, org, repo)
run_detection(secondary_keywords, extensions, ml_prediction, org, repo, search_archived, search_forked)

logger.info("xGitGuard Credentials Detection Process Completed")
Loading