基于spark的实时农产品价格分析可视化_分支1

5/19/2025 pythonscalakafkaflumeflask

可视化效果视频 (opens new window)

# 项目概况

data1 (opens new window)

# 数据类型

新发地农产品价格数据

# 开发环境

centos7

# 软件版本

python3.8.18、hadoop3.2.0、spark3.1.2、mysql5.7.38、scala2.12.18、jdk8、flume1.6.0、kafka2.8.2

# 开发语言

python、Scala

# 开发流程

数据清洗(python)->数据上传(hdfs)->数据分发(flume)->数据实时化(kafka)->数据分析(spark)->数据存储(mysql)->后端(flask)->前端(html+js+css)

# 可视化图表

2025-05-19_221941

# 操作步骤

# python安装包


pip3 install pandas==2.0.3 -i https://mirrors.aliyun.com/pypi/simple/
pip3 install flask==3.0.0 -i https://mirrors.aliyun.com/pypi/simple/
pip3 install flask-cors==4.0.1 -i https://mirrors.aliyun.com/pypi/simple/
pip3 install pymysql==1.1.0 -i https://mirrors.aliyun.com/pypi/simple/

1
2
3
4
5
6

# 启动MySQL


# 查看mysql是否启动 启动命令: systemctl start mysqld.service
systemctl status mysqld.service
# 进入mysql终端
# MySQL的用户名:root 密码:123456
# MySQL的用户名:root 密码:123456
# MySQL的用户名:root 密码:123456
mysql -uroot -p123456

1
2
3
4
5
6
7
8
9

# 创建MySQL数据库


CREATE DATABASE IF NOT EXISTS echarts CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
USE echarts;
DROP TABLE IF EXISTS category_analysis_table;
CREATE TABLE category_analysis_table (
  pct VARCHAR(255) PRIMARY KEY,
  avg_price DOUBLE,
  update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
1
2
3
4
5
6
7
8
9

# 启动Hadoop


# 离开安全模式: hdfs dfsadmin -safemode leave
# 启动hadoop
bash /export/software/hadoop-3.2.0/sbin/start-hadoop.sh

1
2
3
4
5

hadoop

# 启动kafka


# 启动zookeeper
sh /export/software/kafka_2.12-2.8.2/bin/zookeeper-server-start.sh -daemon /export/software/kafka_2.12-2.8.2/config/zookeeper.properties
# 启动kafka
sh /export/software/kafka_2.12-2.8.2/bin/kafka-server-start.sh -daemon /export/software/kafka_2.12-2.8.2/config/server.properties
# 创建topic
/export/software/kafka_2.12-2.8.2/bin/kafka-topics.sh --create --topic mytest --replication-factor 1 --partitions 1 --zookeeper master:2181

1
2
3
4
5
6
7
8

# 准备目录


mkdir -p /data/jobs/project/flume/
rm -rf /data/jobs/project/flume/*
touch /data/jobs/project/flume/flume_data.txt
cd /data/jobs/project/

# 解压 "data" 目录下的 "data.7z" 到当前目录下
# 上传 "data" 目录下的 "data.csv" 文件

1
2
3
4
5
6
7
8
9

# 上传文件到hdfs


cd /data/jobs/project/

hdfs dfs -mkdir -p /data/input/
hdfs dfs -rm -r /data/input/*
hdfs dfs -put -f data.csv /data/input/
hdfs dfs -ls /data/input/

1
2
3
4
5
6
7
8

# 启动flume


# 上传flume监控配置文件到/export/software/apache-flume-1.6.0-bin/conf/目录

# 启动flume监控
cd /export/software/apache-flume-1.6.0-bin/
bin/flume-ng agent -n a2 -c conf -f conf/flume_source_file_sink_kafka.conf -Dflume.root.logger=INFO,console

1
2
3
4
5
6
7

# spark数据清洗


cd /data/jobs/project/

# mvn clean package -Dmaven.test.skip=true
# 上传 jar

spark-submit \
--master local[*] \
--class com.exam.SparkClean \
/data/jobs/project/spark-demo-project.jar /data/input/

1
2
3
4
5
6
7
8
9
10
11

# spark数据分析


cd /data/jobs/project/

spark-submit \
--master local[*] \
--class com.exam.SparkApp \
/data/jobs/project/spark-demo-project.jar /data/output/

1
2
3
4
5
6
7
8

# spark实时数据分析


cd /data/jobs/project/

spark-submit \
--master local[*] \
--class com.exam.Main \
/data/jobs/project/spark-demo-project.jar

1
2
3
4
5
6
7
8

# 数据发送到kafka


cd /data/jobs/project/
# 上传 "数据分发" 目录下的 "data_sync.py" 文件
python3 data_sync.py

1
2
3
4
5

# 启动可视化


mkdir -p /data/jobs/project/myapp/
cd /data/jobs/project/myapp/

# 上传 "可视化" 目录下的 "所有" 文件

# windows本地运行: python app.py
python3 app.py pro

1
2
3
4
5
6
7
8
9
Last Updated: 7/4/2025, 1:59:06 PM