Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
T
test_webhook
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
1
Merge Requests
1
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
时海鑫
test_webhook
Commits
6667a6fe
Commit
6667a6fe
authored
Oct 10, 2025
by
时海鑫
Browse files
Options
Browse Files
Download
Plain Diff
Merge branch 'dev' into 'master'
test2 See merge request
!16
parents
14bb618e
5017c2cb
Changes
1
Show whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
486 additions
and
0 deletions
+486
-0
main.py
main.py
+486
-0
No files found.
main.py
0 → 100644
View file @
6667a6fe
import
json
import
os
import
re
import
shutil
import
stat
import
time
from
datetime
import
datetime
from
urllib.parse
import
urlparse
import
requests
from
flask
import
Flask
,
request
,
jsonify
import
subprocess
from
flask
import
Flask
,
render_template
,
request
,
redirect
,
url_for
import
pymysql
from
flask_sqlalchemy
import
SQLAlchemy
import
configparser
import
glob
from
buildTools.DBU
import
DBU
from
buildTools.SendEmailTools
import
sendEmail2
from
buildTools.UploadFile2git
import
creatTagAndRelease
from
buildTools.WxBootTools
import
sendWx
from
buildTools.extensions
import
db
from
model.Record
import
Record
from
model.RecvData
import
*
SECRET_TOKEN
=
"qwer123"
# 建议从环境变量读取
VALID_USERNAME
=
"admin"
VALID_PASSWORD
=
"123456"
app
=
Flask
(
__name__
)
app
.
config
[
'SQLALCHEMY_DATABASE_URI'
]
=
'mysql+pymysql://root:root123@localhost:3306/tyw?charset=utf8mb4'
app
.
config
[
'SQLALCHEMY_TRACK_MODIFICATIONS'
]
=
False
db
.
init_app
(
app
)
dbu
=
DBU
()
# 数据库工具类实例
# 路由:首页
@
app
.
route
(
"/home"
)
def
home
():
return
render_template
(
"home.html"
)
# 默认跳转到登录页
@
app
.
route
(
"/"
)
def
index
():
return
redirect
(
url_for
(
"login"
))
@
app
.
route
(
"/login"
,
methods
=
[
"GET"
,
"POST"
])
def
login
():
error
=
None
login_success
=
False
if
request
.
method
==
"POST"
:
username
=
request
.
form
.
get
(
"username"
)
password
=
request
.
form
.
get
(
"password"
)
if
username
==
VALID_USERNAME
and
password
==
VALID_PASSWORD
:
login_success
=
True
return
render_template
(
"login.html"
,
login_success
=
True
)
else
:
error
=
"用户名或密码错误"
return
render_template
(
"login.html"
,
error
=
error
)
@
app
.
route
(
'/cicd'
,
methods
=
[
'POST'
])
def
handle_cicd
():
baseurl
=
'http://tyw-server.synology.me:12345'
def
remove_readonly
(
func
,
path
,
_
):
os
.
chmod
(
path
,
stat
.
S_IWRITE
)
func
(
path
)
def
update_record
(
data
):
if
'id'
not
in
data
:
return
{
'code'
:
-
1
,
'message'
:
"更新失败"
,
'error_message'
:
"缺少主键ID"
}
dbu
.
openDB
()
if
not
dbu
.
checkDb
():
return
{
'code'
:
-
2
,
'message'
:
"更新失败"
,
'error_message'
:
"数据库连接失败"
}
try
:
update_fields
=
[]
params
=
[]
# 要更新的字段(排除 id)
allowed_fields
=
[
'project_name'
,
'project_group'
,
'builder'
,
'create_time'
,
'build_time'
,
'status'
,
'wechat_notify'
,
'email_notify'
,
'build_type'
]
for
field
in
allowed_fields
:
if
field
in
data
:
update_fields
.
append
(
f
"{field} =
%
s"
)
params
.
append
(
data
[
field
])
if
not
update_fields
:
return
{
'code'
:
-
3
,
'message'
:
"更新失败"
,
'error_message'
:
"没有需要更新的字段"
}
params
.
append
(
data
[
'id'
])
# 添加 id 到参数列表最后
sql
=
f
"""
UPDATE t_record
SET {', '.join(update_fields)}
WHERE id =
%
s
"""
with
dbu
.
connection
.
cursor
()
as
cursor
:
cursor
.
execute
(
sql
,
params
)
dbu
.
connection
.
commit
()
return
{
'code'
:
0
,
'message'
:
"更新成功"
,
'error_message'
:
""
}
except
Exception
as
e
:
return
{
'code'
:
-
100
,
'message'
:
"更新失败"
,
'error_message'
:
str
(
e
)
}
finally
:
dbu
.
closeDB
()
def
get_current_timestamp
():
return
datetime
.
now
()
.
strftime
(
"
%
Y-
%
m-
%
d
%
H:
%
M:
%
S"
)
def
add_record
(
data
):
required_fields
=
[
'project_name'
,
'project_group'
,
'builder'
,
'create_time'
,
'build_time'
,
'status'
,
'wechat_notify'
,
'email_notify'
,
'build_type'
]
# 校验是否有缺字段
for
field
in
required_fields
:
if
field
not
in
data
:
return
{
'code'
:
-
1
,
'message'
:
"插入失败"
,
'error_message'
:
f
"缺少字段: {field}"
}
dbu
.
openDB
()
if
not
dbu
.
checkDb
():
return
{
'code'
:
-
2
,
'message'
:
"插入失败"
,
'error_message'
:
"数据库连接失败"
}
try
:
with
dbu
.
connection
.
cursor
()
as
cursor
:
sql
=
"""
INSERT INTO t_record (
project_name, project_group, builder,
create_time, build_time, status,
wechat_notify, email_notify, build_type
) VALUES (
%
s,
%
s,
%
s,
%
s,
%
s,
%
s,
%
s,
%
s,
%
s)
"""
cursor
.
execute
(
sql
,
(
data
[
'project_name'
],
data
[
'project_group'
],
data
[
'builder'
],
data
[
'create_time'
],
data
[
'build_time'
],
int
(
data
[
'status'
]),
int
(
data
[
'wechat_notify'
]),
int
(
data
[
'email_notify'
]),
int
(
data
[
'build_type'
])
))
insert_id
=
cursor
.
lastrowid
# 获取自增ID
dbu
.
connection
.
commit
()
return
{
'code'
:
0
,
'message'
:
"插入成功"
,
'id'
:
insert_id
,
# 返回主键ID
'error_message'
:
""
}
except
Exception
as
e
:
return
{
'code'
:
-
100
,
'message'
:
"插入失败"
,
'error_message'
:
f
"{e}"
}
finally
:
dbu
.
closeDB
()
raw_data
=
request
.
data
.
decode
(
"utf-8"
)
try
:
# 读取cicd的文件 加载到payload 类里
data
=
json
.
loads
(
raw_data
)
print
(
"Parsed JSON:"
,
data
)
payload
=
GitlabPayload
(
repo_url
=
data
[
"repo_url"
],
metadata
=
Metadata
(
**
data
[
"metadata"
]),
commit
=
Commit
(
**
data
[
"commit"
]),
timestamps
=
Timestamps
(
**
data
[
"timestamps"
]),
urls
=
Urls
(
**
data
[
"urls"
])
)
print
(
"✅ json 类加载成功"
)
# 截取出实际的git地址
repo_url_split
=
payload
.
repo_url
.
split
(
'30000'
)[
-
1
]
project_url
=
baseurl
+
repo_url_split
print
(
project_url
)
print
(
"✅ git地址获取成功"
)
t_project_url
=
project_url
[:
-
4
]
base_dir
=
"project"
parsed_path
=
urlparse
(
t_project_url
)
.
path
.
lstrip
(
"/"
)
# 得到 allwinner/media/cedarc-release/libcedarc
full_path
=
os
.
path
.
join
(
base_dir
,
parsed_path
)
project_root
=
os
.
path
.
dirname
(
full_path
)
last_dir_name
=
os
.
path
.
basename
(
full_path
)
target_dir
=
os
.
path
.
join
(
project_root
,
last_dir_name
)
# 如果存在就删除
if
os
.
path
.
exists
(
target_dir
):
print
(
f
"⚠️ 目录已存在,准备删除:{target_dir}"
)
shutil
.
rmtree
(
target_dir
,
onerror
=
remove_readonly
)
os
.
makedirs
(
full_path
,
exist_ok
=
True
)
print
(
f
"✅ 目录:{full_path}创建成功"
)
subprocess
.
run
([
"git"
,
"clone"
,
project_url
,
full_path
],
check
=
True
)
subprocess
.
run
([
"git"
,
"checkout"
,
payload
.
commit
.
branch
],
cwd
=
full_path
,
check
=
True
)
print
(
f
"仓库已克隆并切换到分支:{payload.commit.branch}"
)
print
(
f
"✅ git文件下载成功,并成功切换到{payload.commit.branch}"
)
cicd_conf_path
=
os
.
path
.
join
(
full_path
,
"cicd.conf"
)
if
os
.
path
.
isfile
(
cicd_conf_path
):
print
(
"✅ cicd.conf 文件存在,开始解析"
)
else
:
print
(
"❌ cicd.conf 文件不存在,请上传"
)
config
=
configparser
.
ConfigParser
()
config
.
read
(
full_path
+
'
\\
cicd.conf'
,
encoding
=
'utf-8'
)
confDict
=
{}
for
section
in
config
.
sections
():
print
(
f
"[{section}]"
)
for
key
,
value
in
config
.
items
(
section
):
confDict
[
key
]
=
value
print
(
confDict
)
print
(
"✅ cicd.conf 解析完成,开始编译"
)
time
.
sleep
(
2
)
print
(
"更新状态"
)
sql_res
=
add_record
({
"project_name"
:
f
"{payload.metadata.project_name}"
,
"project_group"
:
f
"{payload.metadata.project_namespace}"
,
"builder"
:
f
"{payload.metadata.user_login}"
,
"create_time"
:
f
"{payload.timestamps.job_started_at}"
,
"build_time"
:
"2024-09-02 10:10:00"
,
"status"
:
0
,
"wechat_notify"
:
confDict
[
'wechat_notify'
],
"email_notify"
:
confDict
[
'email_notify'
],
"build_type"
:
confDict
[
'build_type'
]
})
time
.
sleep
(
2
)
# 编译代码
# todo 需要等待 新电脑环境
print
(
"✅ 编译完成,开始处理产物"
)
update_record
({
'id'
:
sql_res
[
'id'
],
"build_time"
:
get_current_timestamp
(),
"status"
:
1
})
# 产物处理
# todo 需要封装 胡松代码
time
.
sleep
(
2
)
print
(
"✅ 产物处理完成完成,开始上传"
)
update_record
({
'id'
:
sql_res
[
'id'
],
"build_time"
:
get_current_timestamp
(),
"status"
:
2
})
time
.
sleep
(
2
)
print
(
"✅ 上传完成,准备发送通知"
)
update_record
({
'id'
:
sql_res
[
'id'
],
"build_time"
:
get_current_timestamp
(),
"status"
:
3
})
# gitlab 打tag和 上传release 版本为version_code
if
confDict
.
get
(
'release'
)
==
'1'
:
# creatTagAndRelease(recv_data=payload, confDict=confDict, url=repo_url_split)
print
(
"✅ gitlab release 完成"
)
# 根据通知类型,开始通知
if
confDict
.
get
(
'wechat_notify'
,
'0'
)
==
'0'
:
print
(
"⚠️ 微信通知未开启"
)
else
:
print
(
"✅ 微信通知已开启"
)
# todo wifi 下不可用 网线解除注释 功能已完成
# sendWx(recv_data=payload,confDict=confDict,url=project_url)
update_record
({
'id'
:
sql_res
[
'id'
],
"build_time"
:
get_current_timestamp
(),
"wechat_notify"
:
2
,
"email_notify"
:
confDict
[
'email_notify'
],
"status"
:
3
})
print
(
"✅ 微信通知结束"
)
if
confDict
.
get
(
'email_notify'
,
'0'
)
==
'0'
:
print
(
"⚠️ 邮件通知未开启"
)
else
:
print
(
"✅ 邮件通知已开启"
)
# todo 需要拿到高权限 然后拿到项目下的 邮箱,轮询调用 sendEmail 目前的发送邮箱为 haixin.shi@hljtyw.com 目前功能已经结束
# sendEmail2(text='长安汽车',info="版本号升级为vv1.0.0",receiver_email="842463468@qq.com")
update_record
({
'id'
:
sql_res
[
'id'
],
"build_time"
:
get_current_timestamp
(),
"email_notify"
:
2
,
"status"
:
3
})
print
(
"✅ 邮件通知结束"
)
update_record
({
'id'
:
sql_res
[
'id'
],
"build_time"
:
get_current_timestamp
(),
"status"
:
4
})
return
jsonify
({
"status"
:
"success"
,
"message"
:
"Received params"
})
except
Exception
as
e
:
print
(
"Error parsing JSON:"
,
e
)
return
jsonify
({
"status"
:
"error"
,
"message"
:
"Invalid JSON"
}),
400
@
app
.
route
(
'/add_record'
,
methods
=
[
'POST'
])
def
add_record
():
data
=
request
.
get_json
()
required_fields
=
[
'project_name'
,
'project_group'
,
'builder'
,
'create_time'
,
'build_time'
,
'status'
,
'wechat_notify'
,
'email_notify'
,
'build_type'
]
# 校验是否有缺字段
for
field
in
required_fields
:
if
field
not
in
data
:
return
jsonify
({
"status"
:
"error"
,
"message"
:
f
"Missing field: {field}"
}),
400
dbu
.
openDB
()
if
not
dbu
.
checkDb
():
return
jsonify
({
"status"
:
"error"
,
"message"
:
"db not connected"
}),
500
try
:
with
dbu
.
connection
.
cursor
()
as
cursor
:
sql
=
"""
INSERT INTO t_record (
project_name, project_group, builder,
create_time, build_time, status,
wechat_notify, email_notify, build_type
) VALUES (
%
s,
%
s,
%
s,
%
s,
%
s,
%
s,
%
s,
%
s,
%
s)
"""
cursor
.
execute
(
sql
,
(
data
[
'project_name'
],
data
[
'project_group'
],
data
[
'builder'
],
data
[
'create_time'
],
data
[
'build_time'
],
int
(
data
[
'status'
]),
int
(
data
[
'wechat_notify'
]),
int
(
data
[
'email_notify'
]),
int
(
data
[
'build_type'
])
))
dbu
.
connection
.
commit
()
return
jsonify
({
"status"
:
"success"
,
"message"
:
"record added"
})
except
Exception
as
e
:
return
jsonify
({
"status"
:
"error"
,
"message"
:
str
(
e
)})
finally
:
dbu
.
closeDB
()
@
app
.
route
(
'/get_filtered_records'
,
methods
=
[
'GET'
])
def
get_filtered_records
():
project_name
=
request
.
args
.
get
(
'project_name'
,
''
)
builder
=
request
.
args
.
get
(
'builder'
,
''
)
create_time
=
request
.
args
.
get
(
'create_time'
,
''
)
sql
=
'SELECT * FROM t_record WHERE 1=1'
params
=
[]
if
project_name
:
sql
+=
' AND project_name LIKE
%
s'
params
.
append
(
'
%
'
+
project_name
+
'
%
'
)
if
builder
:
sql
+=
' AND builder LIKE
%
s'
params
.
append
(
'
%
'
+
builder
+
'
%
'
)
if
create_time
:
sql
+=
' AND create_time LIKE
%
s'
params
.
append
(
'
%
'
+
create_time
+
'
%
'
)
result
=
dbu
.
execute
(
sql
,
tuple
(
params
),
fetch
=
True
)
return
jsonify
({
"status"
:
"success"
,
"data"
:
result
})
# 2. 根据项目名查询
@
app
.
route
(
'/get_by_project'
,
methods
=
[
'GET'
])
def
get_by_project
():
name
=
request
.
args
.
get
(
'project_name'
)
# 使用 LIKE 进行模糊查询,% 表示任意字符的匹配
sql
=
'SELECT * FROM t_record WHERE project_name LIKE
%
s'
result
=
dbu
.
execute
(
sql
,
(
'
%
'
+
name
+
'
%
'
,),
fetch
=
True
)
# 在项目名两侧加上%以进行模糊查询
return
jsonify
({
"status"
:
"success"
,
"data"
:
result
})
# 3. 根据操作人(builder)查询
@
app
.
route
(
'/get_by_builder'
,
methods
=
[
'GET'
])
def
get_by_builder
():
builder
=
request
.
args
.
get
(
'builder'
)
sql
=
'SELECT * FROM t_record WHERE builder =
%
s'
result
=
dbu
.
execute
(
sql
,
(
builder
,),
fetch
=
True
)
return
jsonify
(
result
)
# 4. 根据创建时间查询
@
app
.
route
(
'/get_by_create_time'
,
methods
=
[
'GET'
])
def
get_by_create_time
():
create_time
=
request
.
args
.
get
(
'create_time'
)
sql
=
'SELECT * FROM t_record WHERE create_time =
%
s'
result
=
dbu
.
execute
(
sql
,
(
create_time
,),
fetch
=
True
)
return
jsonify
(
result
)
@
app
.
route
(
"/query_all"
)
def
query_all
():
records
=
Record
.
query
.
order_by
(
Record
.
id
.
desc
())
.
all
()
# 根据需要排序
result
=
[]
for
record
in
records
:
result
.
append
({
"id"
:
record
.
id
,
"project_name"
:
record
.
project_name
,
"project_group"
:
record
.
project_group
,
"builder"
:
record
.
builder
,
"create_time"
:
record
.
create_time
or
""
,
"build_time"
:
record
.
build_time
or
""
,
"status"
:
record
.
status
,
"wechat_notify"
:
record
.
wechat_notify
,
"email_notify"
:
record
.
email_notify
,
"build_type"
:
record
.
build_type
,
})
return
jsonify
({
"status"
:
"success"
,
"data"
:
result
})
def
test_query_all
():
url
=
"http://127.0.0.1:5000/query_all"
# 替换为你实际运行的 Flask 地址
try
:
response
=
requests
.
get
(
url
)
if
response
.
status_code
==
200
:
data
=
response
.
json
()
print
(
"接口调用成功!返回数据如下:"
)
for
record
in
data
.
get
(
"data"
,
[]):
print
(
record
)
else
:
print
(
f
"接口请求失败,状态码:{response.status_code}"
)
print
(
"响应内容:"
,
response
.
text
)
except
Exception
as
e
:
print
(
"请求出错:"
,
e
)
if
__name__
==
'__main__'
:
with
app
.
app_context
():
db
.
create_all
()
app
.
run
(
host
=
'0.0.0.0'
,
port
=
5000
)
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment