Skip to content

Commit

Permalink
Merge pull request #5 from dowjones/feature/doc_exceeded_check
Browse files Browse the repository at this point in the history
unify functionality for setting creds for both auth flows
  • Loading branch information
dj-dna-andersend authored Oct 23, 2018
2 parents 3c31658 + a0cdab5 commit 13aeeff
Show file tree
Hide file tree
Showing 18 changed files with 541 additions and 403 deletions.
53 changes: 37 additions & 16 deletions Listener.js
Original file line number Diff line number Diff line change
@@ -1,25 +1,28 @@
const PubSub = require('@google-cloud/pubsub');
const ConfigUtil = require('./config/ConfigUtil');
const fetchCredentials = require('./services/fetchCredentials');
const path = require('path');
const os = require('os');
const Config = require('./config/Config');
const ExtractionApiService = require('./services/ExtractionApiService');

/** Class that allows you to listen to a number of Dow Jones PubSub subscriptions. This is a singleton. */
class Listener {

constructor(accountCredentials, pubsubClient) {
this.configUtil = new ConfigUtil(accountCredentials);
this.config = new Config(accountCredentials);
this.extractionApiService = new ExtractionApiService(
this.config.getExtractionApiHost(),
this.config.getAccountCredentials(),
this.config.getOauthUrl()
);
this.pubsubClient = pubsubClient;
}

initialize(credentials, pubSub) {
initialize(credentials) {
this.projectId = credentials.project_id;
this.pubsubClient = this.pubsubClient || new PubSub({
projectId: this.projectId,
credentials
});

this.defaultSubscriptionId = this.configUtil.getSubscriptionId();
this.defaultSubscriptionId = this.config.getSubscriptionId();
}

/**
Expand All @@ -39,24 +42,20 @@ class Listener {
* want to use the default.
*/
listen(onMessageCallback, subscription) {
return this.getCredentials().then((credentials) => {
return this.extractionApiService.getStreamingCredentials().then((credentials) => {
this.initialize(credentials);
this.readyListener(onMessageCallback, subscription);
return true;
}).catch((err) => {
if (err.message) {
console.log(err.message);
console.error(err.message);
} else {
console.log(JSON.stringify(err));
console.error(JSON.stringify(err));
}
return false;
});
}

getCredentials() {
return fetchCredentials(this.configUtil);
}

readyListener(onMessageCallback, subscriptionId) {
const sub = subscriptionId || this.defaultSubscriptionId;

Expand All @@ -75,20 +74,42 @@ class Listener {

const pubsubSubscription = this.pubsubClient.subscription(subscriptionFullName);

this.extractionApiService.getAccountInfo().then(accountInfo =>
this.checkDocCountExceeded(sub, accountInfo.max_allowed_document_extracts));

pubsubSubscription.get().then((data) => {
const pubsubSub = data[0];
pubsubSub.on('message', onMessage);
pubsubSub.on('error', (subErr) => {
console.log(`On Subscription Error: ${subErr}`);
console.error(`On Subscription Error: ${subErr}`);
pubsubSub.removeListener('message', onMessage);
pubsubSub.on('message', onMessage);
});
}).catch((err) => {
console.log(`Error retrieving subscription from Google PubSub: ${err}`);
console.error(`Error retrieving subscription from Google PubSub: ${err}`);
});

console.log('Listeners for subscriptions have been configured, set and await message arrival.');
}

checkDocCountExceeded(subscriptionId, maxDocumentsReceived) {
const streamDisabledMsg =
`\nOOPS! Looks like you've exceeded the maximum number of documents received for your account (${maxDocumentsReceived}).\n` +
'As such, no new documents will be added to your stream\'s queue.\n' +
'However, you won\'t lose access to any documents that have already been added to the queue.\n' +
'These will continue to be streamed to you.\n' +
'Contact your account administrator with any questions or to upgrade your account limits.';
const interval = 300000;
this.extractionApiService.isStreamDisabled(subscriptionId).then((isDisabled) => {
if (isDisabled) {
console.error(streamDisabledMsg);
}
setTimeout(this.checkDocCountExceeded.bind(this), interval, subscriptionId, maxDocumentsReceived);
}).catch((err) => {
console.error(err);
setTimeout(this.checkDocCountExceeded.bind(this), interval, subscriptionId, maxDocumentsReceived);
});
}
}

module.exports = Listener;
22 changes: 11 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ Alternatively you can simply check out this project from Git.
#### Authentication Options
There are two credential types that can be used.

Option 1. Service Account Id (service_account_id)
Option 1. User Key

Option 2. Client Credentials (user_id, client_id, password)

Expand All @@ -27,10 +27,10 @@ They will not override values passed directly to the `Listener` constructor (Opt

Option 1. Set environment variables.

###### Service Account ID
###### User Key

**SERVICE_ACCOUNT_ID**
Dow Jones provided Service Account ID.
**USER_KEY**
Dow Jones provided user key.

**SUBSCRIPTION_ID**
This environment variable holds the subscription ID.
Expand All @@ -53,11 +53,11 @@ Option 1. Set environment variables.

Option 2. Modify the 'customerConfig.json' file. In this project's root you will find the 'customerConfig.json' file. Add your credentials and subscription ID. Ensure your additions follow the JSON data format conventions.

###### Service Account Id
###### User Key

```
{
"service_account_id": "<Dow Jones provided Service Account Id>",
"user_key": "<Dow Jones provided user key>",
"subscription_id": "<Subscription ID returned upon stream creation>"
}
```
Expand Down Expand Up @@ -86,15 +86,15 @@ Option 3: Passing values as function arguments. Specifically you can pass either
const listener = new Listener({
/**
Service Account ID
User Key
*/
service_account_id: "<YOUR SERVICE ACCOUNT ID HERE>",
user_key: "<YOUR USER KEY HERE>",
/**
Client Credentials
*/
user_id: "<YOUR USER ID HERE>",
client_id: "<YOUR CLIENT ID HERE>",
password: "<YOUR PASSWORD HERE>"
password: "<YOUR PASSWORD HERE>",
});
listener.listen(onMessageCallback);
~~~~
Expand Down Expand Up @@ -137,11 +137,11 @@ Step 1: Build the docker image. Execute the following command line:

Step 2: Run the docker image

###### Service Account ID
###### User Key

~~~
docker run -it \
-e SERVICE_ACCOUNT_ID="<your service account ID"> \
-e USER_KEY="<your user key"> \
-e SUBSCRIPTION_ID="<your subscription ID>" \
dj-dna-streaming-javascript
~~~
Expand Down
80 changes: 80 additions & 0 deletions config/Config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
const ConfigFileUtil = require('./ConfigFileUtil');
const path = require('path');

class Config {

constructor(credentials) {
this.credentials = credentials;

this.Constants = {
OAUTH_URL_ENV: 'OAUTH_URL',
OAUTH_URL_DEFAULT: 'https://accounts.dowjones.com/oauth2/v1/token',
USER_ID_ENV: 'USER_ID',
CLIENT_ID_ENV: 'CLIENT_ID',
PASSWORD_ENV: 'PASSWORD',
USER_KEY_ENV: 'USER_KEY',
SUBSCRIPTION_ID_ENV: 'SUBSCRIPTION_ID',
EXTRACTION_API_HOST_ENV: 'EXTRACTION_API_HOST',
EXTRACTION_API_HOST_DEFAULT: 'https://api.dowjones.com',
CONFIG_FILE_PATH_DEFAULT: path.join(__dirname, '../customerConfig.json')
};

this._configFileUtil = new ConfigFileUtil(this.Constants.CONFIG_FILE_PATH_DEFAULT);
}

// needed to use a separate config file for testing
setConfigFilePath(configFilePath) {
this._configFileUtil = new ConfigFileUtil(configFilePath);
}

getExtractionApiHost() {
const extractionApiHost = process.env[this.Constants.EXTRACTION_API_HOST_ENV];
return extractionApiHost || this.Constants.EXTRACTION_API_HOST_DEFAULT;
}

getOauthUrl() {
const oauthUrl = process.env[this.Constants.OAUTH_URL_ENV];
return oauthUrl || this.Constants.OAUTH_URL_DEFAULT;
}

getSubscriptionId() {
const subscriptionId = process.env[this.Constants.SUBSCRIPTION_ID_ENV];
return subscriptionId || this._configFileUtil.getSubscriptionId();
}

getAccountCredentials() {
// first get credentials from parameters if they're defined there
let accountCreds = this.credentials;

// if creds not passed in as params check env vars for credentials
if (!this._areCredsSet(accountCreds)) {
accountCreds = this._initCredsFromEnv();

// finally check config file for credentials
if (!this._areCredsSet(accountCreds)) {
accountCreds = this._configFileUtil.getAccountCredentials();
}
}

return this._areCredsSet(accountCreds) ? accountCreds : new Error(
'Error: No account credentials specified\n' +
'Must specify user_id, client_id, and password as args to Listener constructor, env vars, or via customerConfig.json file\n' +
'See dj-dna-streaming-javascript README.md'
);
}

_initCredsFromEnv() {
return {
user_id: process.env[this.Constants.USER_ID_ENV],
client_id: process.env[this.Constants.CLIENT_ID_ENV],
password: process.env[this.Constants.PASSWORD_ENV],
user_key: process.env[this.Constants.USER_KEY_ENV]
};
}

_areCredsSet(accountCreds) {
return accountCreds && (accountCreds.user_key || (accountCreds.user_id && accountCreds.client_id && accountCreds.password));
}
}

module.exports = Config;
30 changes: 8 additions & 22 deletions config/ConfigFileUtil.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
const path = require('path');
const fs = require('fs');
const _ = require('lodash');

class ConfigFileUtil {

constructor() {
this.configFilePath = path.join(__dirname, '../customerConfig.json');
constructor(configFilePath) {
this.configFilePath = configFilePath;
this.initialized = false;
}

Expand Down Expand Up @@ -41,30 +40,17 @@ class ConfigFileUtil {
return this.config.subscription_id;
}

getServiceAccountId() {
if (!this.initialized) {
this.initialize();
}

return _.trim(this.config.service_account_id);
}

getAccountCredentials() {
if (!this.initialized) {
this.initialize();
}

let accountCreds;

if (this.config.user_id && this.config.client_id && this.config.password) {
accountCreds = {
userId: _.trim(this.config.user_id),
clientId: _.trim(this.config.client_id),
password: _.trim(this.config.password)
}
}

return accountCreds;
return {
user_key: _.trim(this.config.user_key),
user_id: _.trim(this.config.user_id),
client_id: _.trim(this.config.client_id),
password: _.trim(this.config.password)
};
}
}

Expand Down
Loading

0 comments on commit 13aeeff

Please sign in to comment.