-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsearch.py
163 lines (144 loc) · 6.81 KB
/
search.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
from github import Github
import time
import datetime
import re
class Search:
    """Search GitHub for repositories and then for code inside them.

    The workflow has two stages that communicate through files named in the
    configuration:

    1. ``find_repos`` runs a repository search, keeps the repositories that
       have at least one closed pull request and writes their full names to
       ``REPO_NAMES_FILE``.
    2. ``find_code_in_repo`` reads ``REPO_NAMES_FILE``, searches those
       repositories for ``search`` and writes the matching file URLs to
       ``FILE_NAMES_FILE``.

    Configuration keys read by this class: ``TOKEN``, ``TIME_SPAN``,
    ``MAX_RESULTS``, ``QUICK_SLEEP``, ``ERROR_SLEEP``, ``REPO_NAMES_FILE``
    and ``FILE_NAMES_FILE``.
    """

    # Longest code-search query string this class will build.
    _MAX_QUERY_LENGTH = 256

    def __init__(self, config_file, query, search):
        """Load the configuration and remember the two search strings.

        Args:
            config_file: Path to a ``KEY=VALUE`` configuration file.
            query: Repository search query used by ``find_repos``.
            search: Whitespace-separated words used by ``find_code_in_repo``.
        """
        self.config = self.read_config(config_file)
        self.query = query
        self.search = search
        self.repo_names = []
        self.file_names = []

    @staticmethod
    def read_config(config_file):
        """Parse a ``KEY=VALUE`` file into a dictionary.

        Blank lines and lines starting with ``#`` are ignored. Only the first
        ``=`` separates key from value, so values may themselves contain
        ``=``. When a key appears more than once the first occurrence wins.
        Values that parse as integers are stored as ``int``; everything else
        is stored as a stripped string.

        Args:
            config_file: Path of the file to read.

        Returns:
            dict mapping each key to an ``int`` or ``str`` value.

        Raises:
            ValueError: If a non-blank, non-comment line contains no ``=``.
        """
        config = {}
        with open(config_file, 'r') as file:
            for line_number, line in enumerate(file, start=1):
                stripped = line.strip()
                if not stripped or stripped.startswith("#"):
                    continue
                key, separator, value = stripped.partition("=")
                if not separator:
                    raise ValueError("%s:%i: expected KEY=VALUE, got %r"
                                     % (config_file, line_number, stripped))
                key = key.strip()
                if key in config:
                    continue
                try:
                    config[key] = int(value)
                except ValueError:
                    config[key] = value.strip()
        return config

    @staticmethod
    def go_to_sleep(message, sleep_duration):
        """Print ``message`` with timestamps and block for ``sleep_duration`` seconds."""
        start_time = datetime.datetime.now()
        print(message,
              "\nSleeping at:", start_time,
              "for: %i seconds" % sleep_duration)
        time.sleep(sleep_duration)
        end_time = datetime.datetime.now()
        print("Woke at: ", end_time)

    def read_repo_names(self):
        """Load ``self.repo_names`` from the file named by ``REPO_NAMES_FILE``.

        The list is replaced rather than extended, so calling this after
        ``find_repos`` (which already filled ``self.repo_names``) does not
        duplicate every repository. Blank lines are skipped.
        """
        with open(self.config["REPO_NAMES_FILE"]) as repo_file:
            self.repo_names = [line.strip() for line in repo_file if line.strip()]

    @staticmethod
    def write_list_to_file(filename, lst):
        """Write each string in ``lst`` to ``filename``, one per line (overwrites)."""
        with open(filename, "w") as out_file:
            out_file.writelines(item + "\n" for item in lst)

    def check_search_rate(self):
        """Print the remaining search-API quota for the configured token.

        Raises:
            RuntimeError: If no search calls remain.
        """
        search_rate = Github(self.config["TOKEN"]).get_rate_limit().search
        if search_rate.remaining == 0:
            # The reset time is labelled as UTC and then shown in local time.
            reset_local = search_rate.reset.replace(tzinfo=datetime.timezone.utc).astimezone()
            print("You have 0/%i API calls remaining. Reset time: %s"
                  % (search_rate.limit, reset_local))
            raise RuntimeError("GitHub search rate limit exhausted")
        print("You have %i/%i API calls remaining"
              % (search_rate.remaining, search_rate.limit))

    def _collect_repo_names(self, result):
        """Return the full names found by paging through ``result``.

        Stops after ``MAX_RESULTS`` names, on the first empty page, or when
        the API answers with status 422.
        """
        names = set()
        repo_count = 0
        # NOTE(review): PyGithub's PaginatedList.get_page() is zero-based
        # (page 0 is the first page); starting at 1 skipped the first page
        # of results. Confirm against the installed PyGithub version.
        page = 0
        while repo_count < self.config["MAX_RESULTS"]:
            try:
                page_names = [repo.full_name for repo in result.get_page(page)]
            except Exception as e:
                print(e)
                # The exception text is expected to start with the HTTP
                # status; it may not, so the match has to be checked.
                match = re.match(r"[0-9]+", str(e))
                if match and match.group() == '422':
                    break
                self.go_to_sleep("Error when retrieving repos", self.config["QUICK_SLEEP"])
                continue
            if not page_names:
                # Results exhausted: further pages would only burn API calls.
                break
            names.update(page_names)
            repo_count += len(page_names)
            page += 1
        return names

    def _repos_with_closed_prs(self, github, repos):
        """Return the subset of ``repos`` that has at least one closed pull request.

        A failed lookup is retried after ``QUICK_SLEEP`` seconds.
        """
        candidates = sorted(repos)
        matched = set()
        i = 0
        while i < len(candidates):
            try:
                result = github.search_issues("type:pr state:closed" + " repo:" + candidates[i])
                if result.totalCount > 0:
                    matched.add(candidates[i])
                i += 1
            except Exception as e:
                print(e)
                print("%i/%i repos checked" % (i, len(candidates)))
                self.go_to_sleep("Error when checking PRs", self.config["QUICK_SLEEP"])
        return matched

    def find_repos(self):
        """Find recently pushed repositories that have closed pull requests.

        Searches for ``self.query`` restricted to repositories pushed within
        the last ``TIME_SPAN`` days and with at least 5 stars, keeps those
        with a closed pull request, stores the sorted names in
        ``self.repo_names`` and writes them to ``REPO_NAMES_FILE``. When the
        search returns nothing, an empty list is stored and written.

        A ``RuntimeError`` (raised by ``check_search_rate`` when the quota is
        exhausted) is not propagated; the method sleeps ``ERROR_SLEEP``
        seconds and returns.
        """
        try:
            self.check_search_rate()
            github = Github(self.config["TOKEN"])
            pushed_since = datetime.datetime.now() - datetime.timedelta(days=self.config["TIME_SPAN"])
            query = self.query + " pushed:>=" + pushed_since.strftime("%Y-%m-%d") + " stars:>=5"
            result = github.search_repositories(query)

            potential_repos = set()
            if result.totalCount > 0:
                potential_repos = self._collect_repo_names(result)
            self.go_to_sleep("Quick nap after getting repos, before checking the PRs", self.config["QUICK_SLEEP"])

            issue_repos = self._repos_with_closed_prs(github, potential_repos)
            self.go_to_sleep("Quick nap after checking the PRs", self.config["QUICK_SLEEP"])

            # Sorted so the output file is stable between runs.
            self.repo_names = sorted(issue_repos)
            self.write_list_to_file(self.config["REPO_NAMES_FILE"], self.repo_names)
        except RuntimeError:
            self.go_to_sleep("Error: abuse detection mechanism detected.", self.config["ERROR_SLEEP"])

    def _build_code_query(self, start):
        """Build one code-search query covering repositories from index ``start``.

        Repositories are appended as ``repo:`` qualifiers while the query
        stays within ``_MAX_QUERY_LENGTH`` characters.

        Args:
            start: Index into ``self.repo_names``; must be a valid index.

        Returns:
            ``(query, next_index)`` where ``next_index`` is the index of the
            first repository NOT included in ``query``.
        """
        query = self.search + " in:file"
        i = start
        while i < len(self.repo_names):
            candidate = query + " repo:" + self.repo_names[i]
            if len(candidate) > self._MAX_QUERY_LENGTH:
                break
            query = candidate
            i += 1
        if i == start:
            # Not even one repository fits; send it alone anyway so the
            # caller always makes progress instead of looping forever.
            query += " repo:" + self.repo_names[i]
            i += 1
        return query, i

    def find_code_in_repo(self):
        """Search the stored repositories for files containing every search word.

        Reads the repository list from ``REPO_NAMES_FILE``, queries the code
        search in batches, keeps files whose decoded content contains every
        whitespace-separated word of ``self.search``, stores the sorted URLs
        in ``self.file_names`` and writes them to ``FILE_NAMES_FILE``.

        A failed batch is retried from its first repository after
        ``QUICK_SLEEP`` seconds. A ``RuntimeError`` (raised by
        ``check_search_rate``) is not propagated; the method sleeps
        ``ERROR_SLEEP`` seconds and returns.
        """
        try:
            self.check_search_rate()
            self.read_repo_names()
            github = Github(self.config["TOKEN"])
            words = self.search.split()
            files = set()
            i = 0
            while i < len(self.repo_names):
                # Remember where this batch began so an error can rewind to it.
                starting_i = i
                # `i` now points at the first repository not yet queried; it
                # must not be advanced again or that repository is skipped.
                query, i = self._build_code_query(i)
                try:
                    for content_file in github.search_code(query):
                        try:
                            # Decode once per file rather than once per word.
                            text = content_file.decoded_content.decode()
                            if all(word in text for word in words):
                                files.add(content_file.html_url)
                        except Exception as e:
                            print(e)
                            self.go_to_sleep("Error retrieving htmlUrl from contentFile", self.config["QUICK_SLEEP"])
                except Exception as e:
                    print(e)
                    # If we get an error then reset the index and retry the batch.
                    i = starting_i
                    print("%i/%i repos analyzed" % (i, len(self.repo_names)))
                    self.go_to_sleep("Error when searching for code", self.config["QUICK_SLEEP"])
            self.file_names = sorted(files)
            self.write_list_to_file(self.config["FILE_NAMES_FILE"], self.file_names)
        except RuntimeError:
            self.go_to_sleep("Error: abuse detection mechanism detected.", self.config["ERROR_SLEEP"])