Commit

Spark integrated
Lakpa Sherpa authored and Lakpa Sherpa committed May 29, 2023
1 parent 5319652 commit b9794da
Showing 100 changed files with 237,880 additions and 110,720 deletions.
15 changes: 14 additions & 1 deletion Dockerfile
@@ -1,3 +1,16 @@
FROM apache/airflow:2.3.0

USER root
RUN apt-get update && \
apt-get install -y --no-install-recommends openjdk-11-jre-headless && \
apt-get autoremove -yqq --purge && \
apt-get clean;


USER airflow
ENV JAVA_HOME=/usr/lib/jvm/java-11-openjdk-arm64

COPY requirements.txt .
RUN pip install -r requirements.txt

COPY --chown=airflow:root ./dags /opt/airflow/dags
17 changes: 17 additions & 0 deletions Dockerfile.Flask
@@ -0,0 +1,17 @@
FROM python:3.8-alpine

WORKDIR /app

# Install app dependencies
RUN apk add bash openjdk11 --repository=http://dl-cdn.alpinelinux.org/alpine/edge/community

ENV JAVA_HOME=/usr/lib/jvm/java-11-openjdk

RUN apk --no-cache add musl-dev linux-headers g++

COPY api /app
RUN pip install --upgrade pip
RUN pip install -r /app/requirements.txt


CMD ["python","/app/app.py"]
8 changes: 8 additions & 0 deletions Dockerfile.Spark
@@ -0,0 +1,8 @@
FROM bitnami/spark:latest

USER root
# RUN curl https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.1/hadoop-aws-3.3.1.jar --output /opt/bitnami/spark/jars/hadoop-aws-3.3.1.jar

# COPY spark/app /usr/local/spark/app
COPY requirements_spark.txt .
RUN pip install -r requirements_spark.txt
51 changes: 46 additions & 5 deletions README.md
@@ -1,19 +1,22 @@
# Stock Data Processing
This project creates a data pipeline for stock market data. The pipeline has the following stages:
1. Verify whether the raw data processing paths exist, and create them if necessary.
2. Read the CSV files into Pandas dataframes, merge the data with a metadata file, and write the resulting dataset into a structured format such as Parquet.
2. Read the CSV files into dataframes, merge the data with a metadata file, and write the resulting dataset into a structured format such as Parquet.
3. Verify whether the feature engineering paths exist, and create them if necessary.
4. Calculate the rolling average of the trading volume and the rolling median of the Adjusted Close, and write the resulting dataset into a structured format such as Parquet.
4. Calculate the rolling average of the trading volume and the rolling median of the Adjusted Close, and write the resulting dataset into a staging Parquet file.
5. Train a RandomForestRegressor model on the feature-engineered data, and calculate the model's performance metrics.

The DAG consists of five tasks:
## DAG components
![DAG](/docs/dagv0_1.png)
- verify_raw_data_path_task: Verify whether the raw data processing paths exist, and create them if necessary.
- raw_data_processing_task_group: Read the CSV files into Pandas dataframes, merge the data with a metadata file, and write the resulting dataset into a structured format such as Parquet.
- verify_feature_data_path_task: Verify whether the feature engineering paths exist, and create them if necessary.
- feature_engineering_task_group: Calculate the rolling average of the trading volume and the rolling median of the Adjusted Close, and write the resulting dataset into a structured format such as Parquet (see the sketch below).
- train_model_task: Train a RandomForestRegressor model on the feature-engineered data, and calculate the model's performance metrics.
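
The rolling features described above map naturally onto Spark window functions. A minimal, hypothetical sketch (column names follow the Kaggle dataset and `api/app.py`; the window length and staging paths are assumptions, not taken from this commit):

```python
# Hypothetical sketch of the rolling-feature computation with Spark window functions.
# The 30-row window and the staging paths are assumptions for illustration only.
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("feature_engineering_sketch").getOrCreate()

# Output of the raw_data_processing stage (path layout assumed from stock_spark_ml.py).
df = spark.read.parquet("/usr/local/spark/staging/20230529/raw_data_processing")

# Trailing 30-row window per symbol, ordered by trading date.
w = Window.partitionBy("Symbol").orderBy("Date").rowsBetween(-29, 0)

features = (
    df.withColumn("vol_moving_avg", F.avg("Volume").over(w))
      # percentile_approx(..., 0.5) gives an approximate rolling median of Adj Close.
      .withColumn("adj_close_rolling_med",
                  F.percentile_approx(F.col("Adj Close"), 0.5).over(w))
)

features.write.mode("overwrite").parquet(
    "/usr/local/spark/staging/20230529/feature_engineering"
)
```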

## Architecture components
![Architecture](/docs/riskthinking.drawio.png)

## How to use
Before running this program, follow the steps below:
### Step 1: Clone repo
@@ -25,7 +28,15 @@ You can follow this [link](https://docs.docker.com/desktop/).
mkdir ./data ./staging
```
### Step 4: Download the data from Kaggle in ./data directory
Download the ETF and stock datasets from the primary dataset available at [Kaggle](https://www.kaggle.com/datasets/jacksoncrow/stock-market-dataset)
Download the ETF and stock datasets from the primary dataset available at [Kaggle](https://www.kaggle.com/datasets/jacksoncrow/stock-market-dataset) and move them to the `data` directory.
### Step 5: Build the Images
```sh
docker build -f Dockerfile.Spark . -t airflow-spark
```

```sh
docker build -f Dockerfile.Flask . -t flask-app
```
### Step 6: Initialise the Airflow Database
```sh
docker-compose up airflow-init
@@ -34,6 +45,36 @@ docker-compose up airflow-init
```sh
docker-compose up
```
## Access the necessary links
### Airflow:

[http://localhost:8080](http://localhost:8080)

By default, both the username and password are <strong>airflow</strong>; enter them and hit ‘Sign in’.

Create a new Spark connection with the details shown in the image.
![Connection](/docs/connection.png)
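
The connection can also be scripted instead of created in the UI. A hypothetical snippet, run inside the Airflow container: the conn_id and master URL follow the DAG files (`spark_default` in stock_ml_train.py, `spark_local` in stock_spark_ml.py, master `spark://spark:7077`), the rest is an assumption.

```python
# Hypothetical alternative to creating the Spark connection via the Airflow UI.
from airflow import settings
from airflow.models import Connection

conn = Connection(
    conn_id="spark_default",  # stock_spark_ml.py expects 'spark_local'; create both if needed
    conn_type="spark",
    host="spark://spark",
    port=7077,
)
session = settings.Session()
session.add(conn)
session.commit()
```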

### Spark Master:
[http://localhost:8090/](http://localhost:8090/)
![Spark UI](/docs/sparkui.png)

### Jupyter Notebook:
[http://127.0.0.1:8888](http://127.0.0.1:8888)

For the Jupyter notebook, copy the URL with the token that is generated when the container starts and paste it into your browser. The URL with the token can be taken from the container logs with:

```sh
docker logs -f de-rt-jupyter-spark-1
```

### Model Serving API:
[http://127.0.0.1:8008/](http://127.0.0.1:8008/)

The root URL returns a 'Welcome!' response. The `/predict` endpoint takes two query parameters, `vol_moving_avg` and `adj_close_rolling_med`, and returns the predicted trading volume:
```
http://127.0.0.1:8008/predict?vol_moving_avg=12345&adj_close_rolling_med=55
```
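
For a quick programmatic check, the same call can be made from Python (a hypothetical client snippet; the parameter values are only examples):

```python
# Hypothetical client call to the model-serving API.
import requests

resp = requests.get(
    "http://127.0.0.1:8008/predict",
    params={"vol_moving_avg": 12345, "adj_close_rolling_med": 55},
)
print(resp.text)  # the predicted trading volume, returned as a string
```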


## Author
This program was created by [Lakpa Sherpa](https://slakpa.com.np).
This data pipeline was created by [Lakpa Sherpa](https://slakpa.com.np).
33 changes: 30 additions & 3 deletions api/app.py
@@ -1,10 +1,19 @@
from flask import Flask, request
import pickle
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressionModel
from pyspark.ml import PipelineModel

app = Flask(__name__)
spark = SparkSession.builder.appName("random_reg").config("spark.sql.parquet.compression.codec", "lz4").getOrCreate()

@app.route('/predict', methods=['GET'])
def predict():
@app.route('/')
def home():
return "Welcome!"

@app.route('/predict_old', methods=['GET'])
def predict_old():
# load the trained predictive model
with open('./staging/predictive_model.pickle', 'rb') as f:
model = pickle.load(f)
@@ -15,6 +24,24 @@ def predict():
prediction = model.predict([[vol_moving_avg, adj_close_rolling_med]])
return str(int(prediction))

@app.route('/predict', methods=['GET'])
def predict():
# load the trained predictive model
model = RandomForestRegressionModel.load("/usr/local/spark/staging/regressor.model")

vol_moving_avg = float(request.args.get('vol_moving_avg'))
adj_close_rolling_med = float(request.args.get('adj_close_rolling_med'))

new_feature = spark.createDataFrame([(vol_moving_avg, adj_close_rolling_med, 0.0)],["vol_moving_avg", "adj_close_rolling_med", "Volume"])
vectorAssembler = VectorAssembler(inputCols = ['vol_moving_avg', 'adj_close_rolling_med'], outputCol = 'features')
vstock_df = vectorAssembler.transform(new_feature)
vstock_df = vstock_df.select('features', 'Volume')

predict = model.transform(vstock_df)
prediction = predict.select('prediction').collect()[0]['prediction']
# prediction = 123123
return str(prediction)

if __name__ == '__main__':
# run the Flask app on port 5000
app.run(debug=True, port=5000)
app.run(debug=True, host='0.0.0.0', port=5000)
2 changes: 0 additions & 2 deletions api/requirements.py

This file was deleted.

4 changes: 4 additions & 0 deletions api/requirements.txt
@@ -0,0 +1,4 @@
flask
numpy
# scikit-learn
pyspark
Binary file added dags/__pycache__/spark_test.cpython-37.pyc
Binary file not shown.
Binary file modified dags/__pycache__/stock_ml_train.cpython-37.pyc
Binary file not shown.
Binary file added dags/__pycache__/stock_spark_ml.cpython-37.pyc
Binary file not shown.
17 changes: 16 additions & 1 deletion dags/stock_ml_train.py
@@ -2,6 +2,7 @@
from airflow.utils.task_group import TaskGroup
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

import os
import pickle
@@ -19,6 +20,8 @@
'start_date': days_ago(1)
}

spark_master = "spark://spark:7077"

dag = DAG(dag_id = 'stock_ml_train', default_args=args, schedule_interval=None)

today = datetime.today().strftime('%Y%m%d')
@@ -211,4 +214,16 @@ def train_model_func():
python_callable = train_model_func
)

task_group_raw_data_processing >> task_group_feature_engineering >> train_model
spark_job = SparkSubmitOperator(
task_id="spark_job",
application="/usr/local/spark/app/hello-world.py", # Spark application path created in airflow and spark cluster
name="Spark Hello World",
conn_id="spark_default",
verbose=1,
conf={"spark.master": spark_master},
application_args=["/usr/local/spark/app/hello-world.py"],
dag=dag)


# task_group_raw_data_processing >> task_group_feature_engineering >> train_model
spark_job
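
The `hello-world.py` application submitted by `spark_job` is not included in this diff; a minimal smoke-test job along the following lines would be enough to exercise the cluster (contents assumed, not taken from the commit):

```python
# Hypothetical contents of /usr/local/spark/app/hello-world.py (not part of this diff).
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Spark Hello World").getOrCreate()

# A trivial action to confirm the cluster actually executes work.
spark.range(10).show()

spark.stop()
```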
104 changes: 104 additions & 0 deletions dags/stock_spark_ml.py
@@ -0,0 +1,104 @@
from datetime import timedelta, datetime
import os
import pickle
import random
import pandas as pd
from pyspark.sql.functions import lit
from pyspark.sql.functions import input_file_name
from pyspark.sql.functions import element_at, split, col


from airflow import DAG
from airflow.utils.task_group import TaskGroup
from airflow.operators.python_operator import PythonOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.utils.dates import days_ago

default_args = {
'owner': 'airflow',
'start_date': days_ago(1)
}

today = datetime.today().strftime('%Y%m%d')
stage_path = "/usr/local/spark/staging/"

spark_dag = DAG(
dag_id = "stock_spark_airflow",
default_args=default_args,
schedule_interval=None,
dagrun_timeout=timedelta(minutes=60*24),
description='use case of sparkoperator in airflow',
)


def verify_raw_data_path_func():
output_path = stage_path + today + "/raw_data_processing"

# Check whether the specified path exists or not
if not os.path.exists(output_path):
# Create a new directory because it does not exist
os.makedirs(output_path)
print("The new directory is created: ", output_path)

def verify_feature_data_path_func():
output_path = stage_path + today + "/feature_engineering"

# Check whether the specified path exists or not
if not os.path.exists(output_path):
# Create a new directory because it does not exist
os.makedirs(output_path)
print("The new directory is created!")

with spark_dag:
with TaskGroup("raw_data_processing", tooltip="Tasks for raw_data_processing") as task_group_raw_data_processing:
verify_raw_data_path = PythonOperator(
task_id="verify_raw_data_path",
python_callable = verify_raw_data_path_func)

stock_data_processing = SparkSubmitOperator(
application = "/usr/local/spark/app/raw_data_processing.py",
conn_id= 'spark_local',
task_id='stock_data_processing',
application_args=['stocks'],
)

verify_raw_data_path >> stock_data_processing

etf_data_processing = SparkSubmitOperator(
application = "/usr/local/spark/app/raw_data_processing.py",
conn_id= 'spark_local',
task_id='etf_data_processing',
application_args=['etfs'],
)

verify_raw_data_path >> etf_data_processing

with TaskGroup("feature_engineering", tooltip="Tasks for feature_engineering") as task_group_feature_engineering:
task_verify_feature_data_path = PythonOperator(
task_id='verify_feature_data_path',
python_callable = verify_feature_data_path_func
)

stock_feature_engineering = SparkSubmitOperator(
application = "/usr/local/spark/app/feature_engineering_processing.py",
conn_id= 'spark_local',
task_id='stock_feature_processing',
application_args=['stocks'],
)

etf_feature_engineering = SparkSubmitOperator(
application = "/usr/local/spark/app/feature_engineering_processing.py",
conn_id= 'spark_local',
task_id='etf_feature_processing',
application_args=['etfs'],
)

task_verify_feature_data_path >> [stock_feature_engineering, etf_feature_engineering]

train_model = SparkSubmitOperator(
application = "/usr/local/spark/app/train_model.py",
conn_id= 'spark_local',
task_id='train_model',
)

task_group_raw_data_processing >> task_group_feature_engineering >> train_model
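
The `train_model.py` application referenced by the `train_model` task is not shown in this hunk either. A minimal sketch consistent with `api/app.py`, which loads a `RandomForestRegressionModel` from `/usr/local/spark/staging/regressor.model` and assembles `vol_moving_avg` and `adj_close_rolling_med` into a `features` column, might look like this; the input path, split, and metric are assumptions:

```python
# Hypothetical sketch of /usr/local/spark/app/train_model.py (not shown in this commit).
# Feature/label columns and the saved model path mirror api/app.py; the rest is assumed.
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

spark = SparkSession.builder.appName("train_model_sketch").getOrCreate()

df = spark.read.parquet("/usr/local/spark/staging/20230529/feature_engineering")
df = df.dropna(subset=["vol_moving_avg", "adj_close_rolling_med", "Volume"])

# Assemble the two engineered features into the 'features' vector expected by the API.
assembler = VectorAssembler(
    inputCols=["vol_moving_avg", "adj_close_rolling_med"], outputCol="features"
)
data = assembler.transform(df).select("features", "Volume")

train, test = data.randomSplit([0.8, 0.2], seed=42)

model = RandomForestRegressor(featuresCol="features", labelCol="Volume").fit(train)

# Report a simple performance metric on the held-out split.
mae = RegressionEvaluator(
    labelCol="Volume", predictionCol="prediction", metricName="mae"
).evaluate(model.transform(test))
print(f"Test MAE: {mae}")

# Save where api/app.py loads the model from.
model.write().overwrite().save("/usr/local/spark/staging/regressor.model")
```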