-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapp.py
39 lines (32 loc) · 1.35 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
from flask import Flask, render_template
from process_data import SourceData
from pyspark.sql import SparkSession
from config import S3_CONFIG
# Flask application object; the route handlers in this module register on it.
app = Flask(import_name=__name__)
def create_spark_session(bucket_arn, app_name="S3TablesExample",
                         catalog_name="s3tablesbucket"):
    """Build (or reuse) a SparkSession wired to an Amazon S3 Tables Iceberg catalog.

    Registers an Iceberg ``SparkCatalog`` named *catalog_name* whose
    implementation is ``S3TablesCatalog`` and whose warehouse is *bucket_arn*,
    pulls the Iceberg runtime and S3 Tables catalog jars through
    ``spark.jars.packages``, and enables the Iceberg SQL session extensions.

    Args:
        bucket_arn: ARN of the S3 table bucket used as the catalog warehouse.
        app_name: Spark application name. Defaults to ``"S3TablesExample"``.
        catalog_name: Name the catalog is registered under in
            ``spark.sql.catalog.<name>``. Defaults to ``"s3tablesbucket"``.

    Returns:
        The session returned by ``SparkSession.builder.getOrCreate()``.

    Raises:
        ValueError: If *bucket_arn* is empty or ``None``.
    """
    if not bucket_arn:
        # Fail fast here instead of letting a missing ARN be written into the
        # warehouse setting and surface later from inside Spark/Iceberg.
        raise ValueError("bucket_arn must be a non-empty S3 table bucket ARN")

    # Comma-separated Maven coordinates, as spark.jars.packages expects.
    packages = ",".join((
        "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1",
        "software.amazon.s3tables:s3-tables-catalog-for-iceberg-runtime:0.1.4",
        "software.amazon.s3tables:s3-tables-catalog-for-iceberg:0.1.4",
    ))
    catalog_key = f"spark.sql.catalog.{catalog_name}"

    spark = (
        SparkSession.builder
        .appName(app_name)
        .config("spark.jars.packages", packages)
        .config(catalog_key, "org.apache.iceberg.spark.SparkCatalog")
        .config(f"{catalog_key}.catalog-impl",
                "software.amazon.s3tables.iceberg.S3TablesCatalog")
        .config(f"{catalog_key}.warehouse", bucket_arn)
        .config("spark.sql.extensions",
                "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        # NOTE: getOrCreate() hands back an already-running session if one
        # exists; static settings such as spark.jars.packages cannot be applied
        # to it, so this should run before any other session is created.
        .getOrCreate()
    )
    return spark
# ARN of the S3 table bucket, read from the project configuration.
BUCKET_ARN = S3_CONFIG["bucket_arn"]

# Create the Spark session once, at import time; the request handlers below
# reuse the module-level objects built here.
spark = create_spark_session(bucket_arn=BUCKET_ARN)

# Create the SourceData instance, handing it the Spark session.
data = SourceData(spark)

# Debug output of the loaded SourceData object at import time.
print(data)
@app.route('/')
def index():
    """Render ``index.html`` from the module-level ``data`` object.

    ``data`` is passed to the template as ``form``, and its ``title``
    attribute is passed as ``title``.
    """
    context = {"form": data, "title": data.title}
    return render_template('index.html', **context)
if __name__ == "__main__":
    # Only when executed directly (not imported): start Flask's built-in
    # development server. Host 0.0.0.0 listens on every network interface.
    server_options = {"host": "0.0.0.0", "port": 5000}
    app.run(**server_options)