@@ -38,27 +38,27 @@ def aws_access(*argv):
     return aws_access_key_id, aws_secret_access_key


-def s3_to_pyspark(config, aws_access_key_id, aws_secret_access_key):
+def s3_to_pyspark(aws_access_key_id, aws_secret_access_key):
     """
     Set up spark context and s3 bucket and folder config
     """
     conf = SparkConf()
-    conf.setMaster(config["publicDNS"])
+    conf.setMaster(os.getenv("publicDNS"))
     conf.setAppName("topicMakr")
     sc = SparkContext(conf=conf)
     sqlContext = SQLContext(sc)
     # Connect to bucket with boto3
     s3 = boto3.resource('s3')
-    bucket = s3.Bucket(config["bucket"])
+    bucket = s3.Bucket(os.getenv("bucket"))
     # Loop through all files and create a file list
     filelist = []
-    for obj in bucket.objects.filter(Prefix=config["bucketfolder"]):
+    for obj in bucket.objects.filter(Prefix=os.getenv("bucketfolder")):
         if obj.size:
             filelist.append("s3n://" + bucket.name + "/" + obj.key)

     # Filter list to just books (named with numbers as per project gutenberg)
     filelist = fnmatch.filter(filelist, "s3n://" + bucket.name + "/" +
-                              config["bucketfolder"] + "[0-9]*.txt")
+                              os.getenv("bucketfolder") + "[0-9]*.txt")

 def preproc(iterator):
     """
@@ -117,7 +117,7 @@ def preproc(iterator):
     return sqlContext, tokens, titles


-def books_to_lda(ldaparam, sqlContext, tokens, titles):
+def books_to_lda(sqlContext, tokens, titles):
     """
     Convert tokens to TF-IDF and run LDA model
     """
@@ -144,13 +144,12 @@ def books_to_lda(ldaparam, sqlContext, tokens, titles):
     result_tfidf = idfModel.transform(result_cv)

     # Run LDA model
-    lda = LDA(k=ldaparam["k"], maxIter=ldaparam["maxIter"])
+    lda = LDA(k=os.getenv("k"), maxIter=os.getenv("maxIter"))
     model = lda.fit(result_tfidf)
     return vocab, result_tfidf, model


-def postgres_tables(SQLconf, ldaparam, vocab,
-                    result_tfidf, model, sqlContext, titles):
+def postgres_tables(vocab, result_tfidf, model, sqlContext, titles):
     """
     Set up tables and write to postgres
     """
@@ -177,7 +176,7 @@ def postgres_tables(SQLconf, ldaparam, vocab,
                          .cast(StringType()))

     # Get top 7 words per topic
-    topics = model.describeTopics(maxTermsPerTopic=SQLconf["topwords"])
+    topics = model.describeTopics(maxTermsPerTopic=os.getenv("topwords"))
     #
     # Add vocab to topics dataframe
     topics_rdd = topics.rdd
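
The same casting applies here: describeTopics() expects an integer maxTermsPerTopic, so wrapping the value as int(os.getenv("topwords", "7")) (7 being the value from the removed SQLconf dict) would be safer than passing the raw string.
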
@@ -195,46 +194,25 @@ def postgres_tables(SQLconf, ldaparam, vocab,

     # Save dataframes to postgreSQL database on postgres_DB ec2 instance
     topics.write.format('jdbc') \
-        .options(url=SQLconf["postgresURL"],
+        .options(url=os.getenv("postgresURL"),
                  driver='org.postgresql.Driver', dbtable='topics') \
         .mode('overwrite').save()

     top_doc_table.write.format('jdbc') \
-        .options(url=SQLconf["postgresURL"],
+        .options(url=os.getenv("postgresURL"),
                  driver='org.postgresql.Driver', dbtable='documents') \
         .mode('overwrite').save()


-# Set configurations
-config = {
-    "publicDNS": "spark://ec2-54-227-182-209.compute-1.amazonaws.com:7077",
-    "bucket": "maxcantor-insight-deny2019a-bookbucket",
-    "bucketfolder": "gutenberg_data/unzipped_data/"
-}
-
-ldaparam = {
-    "k": 20,
-    "maxIter": 100
-}
-
-SQLconf = {
-    "topwords": 7,
-    "postgresURL": "jdbc:postgresql://" +
-    "ec2-54-205-173-0.compute-1.amazonaws.com/lda_booktopics"
-}
-
 if __name__ == '__main__':
     """
     Run pipeline functions
     """
     [aws_access_key_id, aws_secret_access_key] = aws_access(*sys.argv)
-    [sqlContext, tokens, titles] = s3_to_pyspark(
-        config,
-        aws_access_key_id,
-        aws_secret_access_key)
-    [vocab, result_tfidf, model] = books_to_lda(
-        ldaparam, sqlContext,
-        tokens, titles)
-    postgres_tables(
-        SQLconf, ldaparam, vocab, result_tfidf,
-        model, sqlContext, titles)
+
+    [sqlContext, tokens, titles] = s3_to_pyspark(aws_access_key_id,
+                                                 aws_secret_access_key)
+
+    [vocab, result_tfidf, model] = books_to_lda(sqlContext, tokens, titles)
+
+    postgres_tables(vocab, result_tfidf, model, sqlContext, titles)
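
With the in-script config, ldaparam, and SQLconf dicts removed, those values now have to be present in the environment before the pipeline runs. A minimal sketch of one way to supply them from Python, reusing the values from the removed dicts (an equivalent set of shell exports would do the same):

    import os

    # Values mirror the removed config/ldaparam/SQLconf dicts; illustrative only.
    os.environ.setdefault(
        "publicDNS", "spark://ec2-54-227-182-209.compute-1.amazonaws.com:7077")
    os.environ.setdefault("bucket", "maxcantor-insight-deny2019a-bookbucket")
    os.environ.setdefault("bucketfolder", "gutenberg_data/unzipped_data/")
    os.environ.setdefault("k", "20")
    os.environ.setdefault("maxIter", "100")
    os.environ.setdefault("topwords", "7")
    os.environ.setdefault(
        "postgresURL",
        "jdbc:postgresql://ec2-54-205-173-0.compute-1.amazonaws.com"
        "/lda_booktopics")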