Featured image of post Python笔记

Python笔记

一、参考博客

5个有趣的Python脚本

Python 进阶教程笔记

Python常用模块大全(总结)

二、Python2 与Python3共存环境下的依赖库管理工具pip的安装

1、 弄清楚默认python环境版本号

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
# python 版本及默认环境
[linux@ ~]# python
python     python2    python2.7  python3    python3.8  python_bk  pythontex  
[linux@ ~]# python --version
Python 3.8.10
[linux@ ~]# python3 --version
Python 3.8.10
[linux@ ~]# python3.8 --version
Python 3.8.10
[linux@ ~]# ll /usr/bin/python*
lrwxrwxrwx 1 root root      18 Jul 23  2020 /usr/bin/python -> /usr/bin/python3.8*
lrwxrwxrwx 1 root root       9 Apr 16  2018 /usr/bin/python2 -> python2.7*
-rwxr-xr-x 1 root root 3674216 Mar  8 21:02 /usr/bin/python2.7*
lrwxrwxrwx 1 root root       9 Mar 13  2020 /usr/bin/python3 -> python3.8*
-rwxr-xr-x 1 root root 5490352 Jun  2 18:49 /usr/bin/python3.8*
lrwxrwxrwx 1 root root       9 Apr 16  2018 /usr/bin/python_bk -> python2.7*
lrwxrwxrwx 1 root root      58 Feb 18  2020 /usr/bin/pythontex -> ../share/texlive/texmf-dist/scripts/pythontex/pythontex.py*
[linux@ ~]# 

注: 此OS下的默认Python 为 Python3

2、pip包管理工具安装

1
2
3
4
5
6
7
# 1. pip2 (For python2 env) 安装
[linux@ ~]# wget https://bootstrap.pypa.io/pip/2.7/get-pip.py
[linux@ ~]# sudo python2 get-pip.py 

# 2. pip3 (For python3 env)安装
[linux@ ~]# wget https://bootstrap.pypa.io/get-pip.py
[linux@ ~]# sudo python3 get-pip.py 

3、 查看pip

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# pip 工具版本及默认环境
[linux@ ~]# pip
pip     pip2    pip2.7  pip3    
[linux@ ~]# which pip	# 默认的pip环境为 pip3 (For python3 env)
/usr/local/bin/pip
[linux@ ~]# pip --version
pip 20.0.2 from /usr/lib/python3/dist-packages/pip (python 3.8)
[linux@ ~]# which pip2    
/usr/local/bin/pip2
[linux@ ~]# pip2 --version
pip 20.3.4 from /usr/local/lib/python2.7/dist-packages/pip (python 2.7)
[linux@ ~]# which pip2.7  
/usr/local/bin/pip2.7
[linux@ ~]# pip2.7 --version
pip 20.3.4 from /usr/local/lib/python2.7/dist-packages/pip (python 2.7)
[linux@ ~]# which pip3      
/usr/bin/pip3
[linux@ ~]# pip3 --version
pip 20.0.2 from /usr/lib/python3/dist-packages/pip (python 3.8)
[linux@ ~]# ll /usr/local/bin/pip*
-rwxr-xr-x 1 root root 221 Jul 21 09:56 /usr/local/bin/pip*
-rwxr-xr-x 1 root root 221 Jul 21 09:56 /usr/local/bin/pip2*
-rwxr-xr-x 1 root root 221 Jul 21 09:56 /usr/local/bin/pip2.7*
[linux@ ~]# 

4、pip 安装依赖包

  • 安装Python2环境的依赖包
1
pip2 install requests
  • 安装Python3环境的依赖包
1
2
pip install  xxx
pip3 install xxx

三、创建/使用独立的Python虚拟开发环境

参考博客:

Python3 新创建/使用独立的Python虚拟开发环境

  1. 创建独立的虚拟开发环境
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 在选定目录下打开命令行shell, 执行如下命令创建项目开发的独立的python虚拟环境:
PS D:\pythonworkspace>python -m venv venv

# 新创建的python虚拟环境目录结构如下
mubei@MUBEI:/mnt/d/pythonworkspace$ tree venv -L 2
venv
├── Include
├── Lib
   └── site-packages
├── Scripts
   ├── Activate.ps1
   ├── activate
   ├── activate.bat
   ├── deactivate.bat
   ├── pip.exe
   ├── pip3.10.exe
   ├── pip3.exe
   ├── python.exe
   └── pythonw.exe
└── pyvenv.cfg

4 directories, 10 files
  1. 使用新创建的python虚拟环境
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
# Windows/Linux命令行执行activate 进入刚才新创建的python虚拟环境, 
PS D:\pythonworkspace> .\venv\Scripts\activate
# 如下, 命令行以 (venv) 开头即表示当前使用的是python虚拟环境
(venv) PS D:\pythonworkspace>
# Mac 命令行执行activate 进入刚才新创建的python虚拟环境, 
192:MubeiSpaces mac$ source venv/bin/activate
# 如下, 命令行以 (venv) 开头即表示当前使用的是python虚拟环境
(venv) 192:MubeiSpaces mac$ 

# 第一次进入新创建的python虚拟环境时,通常需要更新一下pip到最新版本
(venv) PS D:\pythonworkspace> python -m pip install --upgrade pip
# 使用 deactivate  命令退出下的python虚拟环境
(venv) 192:MubeiSpaces mac$ deactivate 
192:MubeiSpaces mac$ 

四、Python 依赖包管理

参考博客:

使用方法:

  1. 使用pip freeze方法导出当前Python环境下全部的第三方模块
1
2
# 1. 使用pip的freeze功能,可以一次性快捷的将整个Python环境下的第三方模块全部记录下来: 
pip freeze > requirements.txt
  1. 使用pipreqs第三方库导出当前Python项目(工程)所依赖的第三方模块(推荐)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
# 使用 pipreqs 可以自动检索到当前项目下的所有组件及其版本,并生成 requirements.txt 文件,
# 极大方便了项目迁移和部署的包管理。相比直接用pip freeze 命令,能直接隔离其它项目所依赖的第三方模块。

# 1、先安装pipreqs库
pip install pipreqs

# 2、在当前目录使用生成
pipreqs ./ --encoding=utf8 --force 
# --encoding=utf8 :为使用utf8编码
# --force :强制执行,当 生成目录下的requirements.txt存在时覆盖 
# . /: 在哪个文件生成requirements.txt 文件  
  1. requirements.txt依赖包下载/安装
1
2
3
4
5
# 1. 将requirements.txt文件里列出的第三方库安装到当前Python环境中:
pip install -r requirements.txt

# 2. 将requirements.txt文件里列出的第三方库下载到当前目录中, 命令格式:pip3 download -d [保存目录] -r [导出结果文件]
pip download -d ./ -r requirements.txt

五、Python3 + Django 开发环境搭建

  1. 安装指定版本Django 开发环境
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 安装django开发框架
(venv) PS D:\pythonworkspace> pip install django==3.2.10
Collecting django==3.2.10
  Downloading Django-3.2.10-py3-none-any.whl (7.9 MB)
     |████████████████████████████████| 7.9 MB 26 kB/s
Collecting sqlparse>=0.2.2
  Using cached sqlparse-0.4.2-py3-none-any.whl (42 kB)
Collecting pytz
  Using cached pytz-2021.3-py2.py3-none-any.whl (503 kB)
Collecting asgiref<4,>=3.3.2
  Using cached asgiref-3.4.1-py3-none-any.whl (25 kB)
Installing collected packages: sqlparse, pytz, asgiref, django
Successfully installed asgiref-3.4.1 django-3.2.10 pytz-2021.3 sqlparse-0.4.2

# 安装完成后在python 命令行工具查看django版本号以检验是否安装重构
# 如下可以发现,在虚拟环境中已经成功安装好了Django 
(venv) PS D:\pythonworkspace> python.exe
Python 3.10.0 (tags/v3.10.0:b494f59, Oct  4 2021, 19:00:18) [MSC v.1929 64 bit (AMD64)] on win32
Type "help", "copyright", "credits" or "license" for more information.
>>> import django
>>> print(django.get_version())
3.2.10
>>> quit()
(venv) PS D:\pythonworkspace> django-admin.exe --version
3.2.10
(venv) PS D:\pythonworkspace>
  1. 创建django项目
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
# 创建django框架的项目 site_cas
(venv) PS D:\pythonworkspace> django-admin.exe startproject site_cas
# 新创建的site_cas项目的目录结构
mubei@MUBEI:/mnt/d/pythonworkspace$ tree site_cas/
site_cas/
├── manage.py
└── site_cas
    ├── __init__.py
    ├── asgi.py
    ├── settings.py
    ├── urls.py
    └── wsgi.py

1 directory, 6 files
  1. 执行同步数据库文件
  • Django默认数据库为db.sqlite3,执行同步命令如下
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 
(venv) PS D:\pythonworkspace> cd .\site_cas\
(venv) PS D:\pythonworkspace\site_cas> python manage.py migrate
Operations to perform:
  Apply all migrations: admin, auth, contenttypes, sessions
Running migrations:
  Applying contenttypes.0001_initial... OK
  Applying auth.0001_initial... OK
  Applying admin.0001_initial... OK
  Applying admin.0002_logentry_remove_auto_add... OK
  Applying admin.0003_logentry_add_action_flag_choices... OK
  Applying contenttypes.0002_remove_content_type_name... OK
  Applying auth.0002_alter_permission_name_max_length... OK
  Applying auth.0003_alter_user_email_max_length... OK
  Applying auth.0004_alter_user_username_opts... OK
  Applying auth.0005_alter_user_last_login_null... OK
  Applying auth.0006_require_contenttypes_0002... OK
  Applying auth.0007_alter_validators_add_error_messages... OK
  Applying auth.0008_alter_user_username_max_length... OK
  Applying auth.0009_alter_user_last_name_max_length... OK
  Applying auth.0010_alter_group_name_max_length... OK
  Applying auth.0011_update_proxy_permissions... OK
  Applying auth.0012_alter_user_first_name_max_length... OK
  Applying sessions.0001_initial... OK
(venv) PS D:\pythonworkspace\site_cas>

此时 运行Django Server,如果能正常访问,过程没有报错,说明数据库同步已经成功了

  • 将Django默认数据库(db.sqlite3)更换为Mysql
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# step 1: 在MySQL中创建Django使用的数据库,数据库取名为site_cas_db,并设置字符集为utf-8
mysql> CREATE DATABASE site_cas_db CHARACTER SET utf8;
Query OK, 1 row affected (0.01 sec)

# step 2:为项目操作MySQL DB所依赖安装myslqclient库
(venv) PS D:\pythonworkspace\site_cas> pip install mysqlclient
Collecting mysqlclient
  Using cached mysqlclient-2.1.0-cp310-cp310-win_amd64.whl (180 kB)
Installing collected packages: mysqlclient
Successfully installed mysqlclient-2.1.0
(venv) PS D:\pythonworkspace\site_cas>

# step3: site_cas项目下的settings.py文件配置如下MySQL数据库引擎。
DATABASES = {
    'default': {
        # db.sqlite3 数据库引擎配置
        # 'ENGINE': 'django.db.backends.sqlite3',
        # 'NAME': BASE_DIR / 'db.sqlite3',

        # MySQL 数据库引擎配置
        'ENGINE': 'django.db.backends.mysql',
        'NAME': 'site_cas_db',
        'USER': 'root',
        'PASSWORD': '33557799,
        'HOST': '127.0.0.1',
    }
}

#  step4: 从新执行同步操作,将db.sqlite3数据迁移到MySQL
(venv) PS D:\pythonworkspace\site_cas> python manage.py migrate
Operations to perform:
  Apply all migrations: admin, auth, contenttypes, sessions
Running migrations:
  Applying contenttypes.0001_initial... OK
  Applying auth.0001_initial... OK
  Applying admin.0001_initial... OK
  Applying admin.0002_logentry_remove_auto_add... OK
  Applying admin.0003_logentry_add_action_flag_choices... OK
  Applying contenttypes.0002_remove_content_type_name... OK
  Applying auth.0002_alter_permission_name_max_length... OK
  Applying auth.0003_alter_user_email_max_length... OK
  Applying auth.0004_alter_user_username_opts... OK
  Applying auth.0005_alter_user_last_login_null... OK
  Applying auth.0006_require_contenttypes_0002... OK
  Applying auth.0007_alter_validators_add_error_messages... OK
  Applying auth.0008_alter_user_username_max_length... OK
  Applying auth.0009_alter_user_last_name_max_length... OK
  Applying auth.0010_alter_group_name_max_length... OK
  Applying auth.0011_update_proxy_permissions... OK
  Applying auth.0012_alter_user_first_name_max_length... OK
  Applying sessions.0001_initial... OK
(venv) PS D:\pythonworkspace\site_cas>

# 执行同步后,MySQL DB中查询会创建如下表
mysql> use site_cas_db;
Database changed
mysql> show tables;
+----------------------------+
| Tables_in_site_cas_db      |
+----------------------------+
| auth_group                 |
| auth_group_permissions     |
| auth_permission            |
| auth_user                  |
| auth_user_groups           |
| auth_user_user_permissions |
| django_admin_log           |
| django_content_type        |
| django_migrations          |
| django_session             |
+----------------------------+
10 rows in set (0.03 sec)

mysql> 

重新运行Django Server,如果能正常访问,过程没有报错,说明切换数据库已经成功了

  1. 启动Django Server
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
# 启动Django Server
(venv) PS D:\pythonworkspace\site_cas> python manage.py runserver 0.0.0.0:8000
Watching for file changes with StatReloader
Performing system checks...

System check identified no issues (0 silenced).
January 04, 2022 - 16:08:07
Django version 3.2.10, using settings 'site_cas.settings'
Starting development server at http://0.0.0.0:8000/
Quit the server with CTRL-BREAK.

打开浏览器访问http://localhost:8000/

  1. 安装djangorestframework相关依赖库(后称 DRF)
  • 前后端分离中涉及到一个重要的概念:序列化。Django 有一个非常优秀的库 djangorestframework(后称 DRF)。它可以帮我们封装好序列化的底层实现,让开发者专注于业务本身。
  • 安装 DRF 及其他依赖库:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# 安装djangorestframework
(venv) PS D:\pythonworkspace> pip install djangorestframework
Collecting djangorestframework
  Using cached djangorestframework-3.13.1-py3-none-any.whl (958 kB)
Requirement already satisfied: django>=2.2 in d:\pythonworkspace\venv\lib\site-packages (from djangorestframework) (3.2.10)
Requirement already satisfied: pytz in d:\pythonworkspace\venv\lib\site-packages (from djangorestframework) (2021.3)
Requirement already satisfied: asgiref<4,>=3.3.2 in d:\pythonworkspace\venv\lib\site-packages (from django>=2.2->djangorestframework) (3.4.1)
Requirement already satisfied: sqlparse>=0.2.2 in d:\pythonworkspace\venv\lib\site-packages (from django>=2.2->djangorestframework) (0.4.2)
Installing collected packages: djangorestframework
Successfully installed djangorestframework-3.13.1

# 安装markdown
(venv) PS D:\pythonworkspace> pip install markdown
Collecting markdown
  Downloading Markdown-3.3.6-py3-none-any.whl (97 kB)
     |████████████████████████████████| 97 kB 348 kB/s
Installing collected packages: markdown
Successfully installed markdown-3.3.6

# 安装django-filter
(venv) PS D:\pythonworkspace> pip install django-filter
Collecting django-filter
  Downloading django_filter-21.1-py3-none-any.whl (81 kB)
     |████████████████████████████████| 81 kB 209 kB/s
Requirement already satisfied: Django>=2.2 in d:\pythonworkspace\venv\lib\site-packages (from django-filter) (3.2.10)
Requirement already satisfied: asgiref<4,>=3.3.2 in d:\pythonworkspace\venv\lib\site-packages (from Django>=2.2->django-filter) (3.4.1)
Requirement already satisfied: sqlparse>=0.2.2 in d:\pythonworkspace\venv\lib\site-packages (from Django>=2.2->django-filter) (0.4.2)
Requirement already satisfied: pytz in d:\pythonworkspace\venv\lib\site-packages (from Django>=2.2->django-filter) (2021.3)
Installing collected packages: django-filter
Successfully installed django-filter-21.1
(venv) PS D:\pythonworkspace>
  1. 后端Django项目实现登录API
  • 建一个新的app
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    
    # authentication authorization区别
    (venv) PS D:\pythonworkspace\site_cas> django-admin.exe startapp authentication
    mubei@MUBEI:/mnt/d/pythonworkspace/site_cas$ tree authentication/
    authentication/
    ├── __init__.py
    ├── admin.py
    ├── apps.py
    ├── migrations
       └── __init__.py
    ├── models.py
    ├── tests.py
    └── views.py
    
    1 directory, 7 files
    

六、pymysql 库安装

python 环境:

1
2
echo "pymysql" > requestments.txt
pip install pymysql==0.10.1

七、python之json格式

八、Python 模块 signal

九、Python matplotlib画图库

  • 包安装

pip install -I pyparsing pip install matplotlib

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
# step 1
import matplotlib 
print(matplotlib.matplotlib_fname())  # 结果 找到 matplotlibrc 文件

# step 2 
# 打开 matplotlibrc 配置文件,
# 找到以 #font.sans-serif 开头的那行,去掉#,并添加 SimHei 在该参数中字体也可以改成别的
# 找到以  #font.family 开头的那行,去掉#
# 保存并退出

# step 3
# 载需要添加的字体的 ttf 文件,SimHei 的下载地址:https://www.fontpalace.com/font-details/SimHei/ 
# 或者 使用 notes/other/SimHei.ttf
# 将下载的ttf字体文件放置在 matplotlib 包文件夹下的 site-packages/matplotlib/mpl-data/fonts/ttf 中(也就是前面matplotlibrc 配置文件所在的文件夹的 /fonts/ttf 中)。

# step 4
# 改好上述配置之后如果还不生效,需要重新加载字体,在Python中运行如下代码即可:
from matplotlib.font_manager import _rebuild
_rebuild()
  • matplotlib 作图示例
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# _*_coding:utf-8_*_

import sys
reload(sys)
sys.setdefaultencoding('utf8')

import matplotlib
matplotlib.use('agg')
#在服务器端等没有图形界面的时候需要加上这句
import matplotlib.pyplot as plt
plt.switch_backend('agg')

# from matplotlib.font_manager import _rebuild
# _rebuild() 

# 这两行代码解决 plt 中文显示的问题
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False

# 输入纵坐标轴数据与横坐标轴数据
gdp_rate = [9.4, 10.6, 9.6, 7.9, 7.8, 7.3, 6.9, 6.7, 6.8, 6.6]
first_industry_rate = [4.0, 4.3, 4.2, 4.50, 3.8, 4.1, 3.9, 3.3, 4.0, 3.5]
second_industry_rate = [10.3, 12.7, 10.7, 8.4, 8.0, 7.4, 6.2, 6.3, 5.9, 5.8]
third_industry_rate = [9.6, 9.7, 9.5, 8.0, 8.3, 7.8, 8.2, 7.7, 7.9, 7.6]
years = [2009, 2010, 2011, 2012, 2013, 2014, 2015, 2016, 2017, 2018]

# 4 个 plot 函数画出 4 条线,线形为折线,每条线对应各自的标签 label
# plt.plot(years, gdp_rate, '.-', label='GDP增长率'.encode(encoding='UTF-8'))
plt.plot(years, gdp_rate, '.-', label='GDP INC')
plt.plot(years, first_industry_rate, '.-', label='第一产业增长率')
plt.plot(years, second_industry_rate, '.-', label='第二产业增长率')
plt.plot(years, third_industry_rate, '.-', label='第三产业增长率')

plt.xticks(years)  # 设置横坐标刻度为给定的年份
plt.xlabel('年份') # 设置横坐标轴标题
plt.legend() # 显示图例,即每条线对应 label 中的内容
plt.savefig("image_1.png")
plt.show() # 显示图形

十、Python 使用socket实现http服务器

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
# -*- coding: utf-8 -*-
# webhook.py       
import json
import datetime

import urllib
import requests
# import urlparse

import socket
from multiprocessing import Process

import signal

# json 格式化
# json.dumps(json_str,sort_keys=True,indent=4)

PromQL = '((sum by(path)(increase(bedrock_requests_total{hasError="true",job="bedrock-auth",namespace="default"}[5m])))/(sum by(path)(increase(bedrock_requests_total{job="bedrock-auth",namespace="default"}[5m])))) or ((sum by(path)(increase(bedrock_requests_total{job="bedrock-auth",namespace="default",statusCode!~"200|0"}[5m])))/(sum by(path)(increase(bedrock_requests_total{job="bedrock-auth",namespace="default"}[5m]))))>0.2'

alarm_name = "BDC-TKE-TPS-Bedrock请求错误率过高"
alarm_docs = "bedrock下\"/sceniclive/update\"接口的错误率持续5分钟高于20%"
alarm_time = "2021-06-22 09:08:23"

class AlarmRequest:
    def __init__(self, r):
        self.content = r
        self.method = self.content.split()[0]
        self.url = self.content.split()[1]
        self.path = self.url
        self.query = {}
        self.headers = self.parse_headers()

        self.parse_path()
 
    def parse_path(self):
        index = self.url.find('?')
        if index != -1:
            self.path, query_string = self.path.split('?', 1)
            self.query = self._parse_parameter(query_string)

    def parse_headers(self):
        header_content = self.content.split('\r\n\r\n', 1)[0].split('\r\n')[1:]
        result = {}
        for line in header_content:
            k, v = line.split(': ')
            print ("Header: %s -> %s") % (k, v)
            result[urllib.quote(k)] = urllib.quote(v)
        return result

    def get_headers(self):
        return self.headers
    
    def get_path(self):
        return self.path
    
    def get_query(self):
        return self.query

    def get_body(self):
        body = self.content.split('\r\n\r\n', 1)[1]
        req_data = {}
        if self.headers["Content-Type"].lower() == "application/json":
            req_data = json.loads(body.strip())

        return req_data
 
    def dumps_body(self):
        return json.dumps(self.parse_body(), sort_keys=True, indent=4)

    @staticmethod
    def _parse_parameter(parameters):
        args = parameters.split('&')
        query = {}
        for arg in args:
            k, v = arg.split('=')
            query[k] = unquote(v)
        return query
 

class Alarm:
    def __init__(self):
        self.weixin_token = "71962233-c5c5-45a4-8629-56a6d2463773"
        self.grafana_url = "http://172.18.100.19/explore"
        self.content_tmp = '''您好,您TKE集群下的告警策略(策略名:%s)触发告警!
告警描述: %s
触发时间: 最近于%s发现异常
可以登录VPN后在:【[大数据监控服务(Grafana)](%s)】系统上查看详情.'''
    

    # 构造 Grafana 页面查询地址, 包括查询语句与相关参数
    def create_full_addr(self, url, promql):
        req_tmp = {"exemplar":False,"expr":str(promql),"interval":"5","instant":False,"range":True}
        left_argv = '["now-6h","now","Prometheus",%s]' % json.dumps(req_tmp, separators=(',', ':'))
    
        return ("%s%s%s") % (url, "?left=", urllib.quote(left_argv))
       
    
    def work_wechat_send_message(self, message, token):
        work_weixin_url = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send"
        headers = {
            'Content-Type': "application/json",
            'cache-control': "no-cache",
            'Postman-Token': token
        }
        querystring = {"key": token}
    
        markdown_msg = {
            "msgtype": "markdown",
            "markdown": {
                "content": message
            }
        }
    
        print json.dumps(markdown_msg)
        print requests.request("POST", work_weixin_url, data=json.dumps(markdown_msg), headers=headers, params=querystring)
    
    
    def process_alarm(self, http_data):
        # msg = ""
        # while True:
        #     try:
        #           recv_str = conn.recv(1024)
        #         if len(recv_str)>0:
        #             msg = msg + recv_str
        #     except ConnectionResetError as e:
        #         print(e)
        #         print "conn.close() running..." 
        #         conn.close()
        #         break
    
        print "recv msg:" + http_data
    
        req_ctx = AlarmRequest(http_data)
        req_data = req_ctx.get_body()
     
        grafana_addr = self.create_full_addr(self.grafana_url, PromQL)
        send_msg = self.content_tmp % (alarm_name, alarm_docs, alarm_time, grafana_addr)
        self.work_wechat_send_message(send_msg, self.weixin_token)
        print "process_alarm(conn) run over"

def process_handler(conn):
    req_msg = conn.recv(20480)
    conn.close()
    # print "recv msg:" + req_msg

    Alarm().process_alarm(req_msg)

if __name__ == '__main__':
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # 设置关闭socket端口立即释放
    server_socket.bind(("", 1280))
    server_socket.listen(64)

    while True:
        print("New connect comming...")
        conn, addr = server_socket.accept()
        # print "req addr: " + addr
        Process(target=process_handler, args=(conn, )).start()

    print "conn.close() running over..." 
    conn.close()
    server_socket.close()

十一、Python 发送电子邮件

  1. 发送普通heml纯文本邮件
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
# -*- coding: utf-8 -*-
import smtplib, time, os
from email.mime.text import MIMEText
from email.header import Header

# 发送html内容的邮件
mail_body = '''<!DOCTYPE html>
<html>

<head>
<style>
body {
    margin:0;
    }
#header {
    background-color:black;
    color:white;
    text-align:center;
    padding:1px;
}
#nav {
    line-height:30px;
    background-color:#eeeeee;
    height:100px;
    width:100px;
    float:left;
    padding:5px;          
}
#section {
    width:350px;
    float:left;
    padding:10px;          
}
#footer {
    background-color:black;
    color:white;
    clear:both;
    text-align:center;
   padding:5px;          
}
</style>
</head>

<body>

<div id="header">
<h4>中国</h4>
</div>

<div id="nav">
北京<br>
上海<br>
深圳<br>
</div>

<div id="section">
<h2>上海</h2>
<p>
上海市总面积6340平方公里,辖16个市辖区,属亚热带湿润季风气候,四季分明,日照充分,雨量充沛。
</p>
<p>
上海,简称“沪”或“申”,是gcd的诞生地,中华人民共和国直辖市,国家中心城市,超大城市,国际经济、金融、贸易、航运、科技创新中心[1]  ,首批沿海开放城市。上海地处长江入海口,是长江经济带的龙头城市,隔东中国海与日本九州岛相望,南濒杭州湾,北、西与江苏、浙江两省相接。

</p>
</div>

<div id="footer">
www.example.com
</div>

</body>
</html>
'''


class Mail(object):
    def __init__(self, mail_config):
        self.mail_conf = mail_config
        # 发送邮箱服务器
        self.smtp_server = self.mail_conf.get('smtp_server', 'smtp.mail.com')
        # 发送邮箱
        self.sender = self.mail_conf.get('sender', 'sender@mail.com')
        # 接收邮箱
        self.receiver = self.mail_conf.get('receiver', 'receiver@mail.com')
        # 发送邮件主题
        self.subject = self.mail_conf.get('title', '自动化测试结果')
        # 发送邮箱用户/密码
        self.user_name = self.mail_conf.get('user_name', 'sender@mail.com')
        self.password = self.mail_conf.get('password', '123456')
    
    # 发送html内容邮件
    def send_mail_html(self, html_mail_body):
        # 组装邮件内容和标题,中文需参数‘utf-8’,单字节字符不需要
        msg = MIMEText(html_mail_body, _subtype='html', _charset='utf-8')
        msg['Subject'] = Header(self.subject, 'utf-8')
        msg['From'] = self.sender
        msg['To'] = self.receiver
        # 登录并发送邮件
        try:
            smtp = smtplib.SMTP()
            smtp.connect(self.smtp_server)
            smtp.login(self.user_name, self.password)
            smtp.sendmail(self.sender, self.receiver, msg.as_string())
        except:
            print("邮件发送失败!")
        else:
            print("邮件发送成功!")
        finally:
            smtp.quit()
    
if __name__ == '__main__':
    Mail({}).send_mail_html(mail_body)  # 发送html内容邮件
  1. 发送带图片类的Html 邮件

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    
    # -*- coding: utf-8 -*-
    
    import smtplib
    from email.header import Header
    from email.mime.multipart import MIMEMultipart
    from email.mime.text import MIMEText
    from email.MIMEImage import MIMEImage
    # import pandas as pd
    # import numpy as np
    # import datetime
    
    def SendMail():
        smtpserver = 'smtp.exmail.qq.com'
        sender = 'testmail@testmail.com'
        #receiver可设置多个,使用“,”分隔
        receiver = 'testmail@testmail.com'
        #抄送
        ccs = ''
        username = 'testmail@testmail.com'
        password = '1234567'
    
        msg = MIMEMultipart()
        #html格式内容
        body_prefix = """
        #标题 粗体 大写
        <h3>Hi,all</h3>
        <p>****</p>
        <p>****</p>
        """
    
        body_end = """
        <p>
        #图片2:<br>
        <img src="cid:1"><br>
    
        #图片2:<br>
        <img src="cid:2"></br> 
        </p>
        """
    
        #组装邮件内容的html
        body = body_prefix + body_end
        mail_body = MIMEText(body, _subtype='html', _charset='utf-8')
    
        #邮件名
        msg['Subject'] = Header("****", 'utf-8')
        msg['From'] = sender
        toclause = receiver.split(',')
        cc = ccs.split(',')
        msg['To'] = ",".join(toclause)
        msg['cc'] = ','.join(cc)
        msg.attach(mail_body)
    
        fp = open("./image_1.png", 'rb')
        images = MIMEImage(fp.read())
        fp.close()
        images.add_header('Content-ID', '<1>')
        msg.attach(images)
    
        fp = open("./image_1.png", 'rb')
        images = MIMEImage(fp.read())
        fp.close()
        images.add_header('Content-ID', '<2>')
        msg.attach(images)
    
        # 登陆并发送邮件
        try:
            smtp = smtplib.SMTP()
            #打开调试模式
            #smtp.set_debuglevel(1)
            smtp.connect(smtpserver)
            smtp.login(username, password)
            smtp.sendmail(sender, toclause, msg.as_string())
        except:
            print("邮件发送失败!!")
            return 1
        else:
            print("邮件发送成功")
            return 0
        finally:
            smtp.quit()
            return 0
    
    
    if __name__ == '__main__':
        SendMail()
    

十二、Python实现带签名的http请求

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# -*- coding:utf-8 -*-

import requests
import json
import time
import hmac
import hashlib
import base64

# 以下为POST请求

# 请求的body(json格式)字符串数据
data_json = {
    "opt_test_flag ": 1,
    "req_hotel_name": "如家酒店"
}
data = json.dumps(data_json)

# 请求地址及path
path = '/passenger/order/report'

url = "http://127.0.0.1" + path + "?"

# 计算签名的秘钥key
app_secret = "TFGfk583D6Z5XrmlKrzU5va9ltnop4GK"

# 设置请求头,标明body数据为json格式
headers = {'content-type': 'application/json'}

# url里的请求参数
pragrams = {"app_key": "HO98Kd7Bhraoxz6G", "time": str(int(time.time()))}

# 构造计算签名的URL(不包括http 及 host信息),只包含path开始的除去sign参数的的信息(注意对参数排序)
content = path + "?"

# 对URL里的参数按key进行升序排序够着为/req_path?key1=value1&key2=value2...的格式
items = pragrams.items()
items.sort()
for key, value in items:
    content += key + "=" + value + "&"  # 用于计算签名, 不包含http 和 host 信息
    url += key + "=" + value + "&"      # 用于发起请求的url, 包含http 和 host 等信息

# 示例1: 使用原始字符串body发起请求
content1 = content.strip("&") + data  # 在url 的尾部追加上body字符串数据

# 使用hmac.sha256 算法对 content1 计算计算签名
signature = hmac.new(app_secret, content1, digestmod=hashlib.sha256).digest()

# 把计算出来的签名按base64编码后作为sign参数追加到url里
url1 = url + "sign=" + base64.urlsafe_b64encode(signature)
print url1
# 发起http post请求,注意此处传入的body data 数据(字符串)需要与计算签名时的body 数据(字符串)完全一致,否则计算签名将不一致
response1 = requests.post(url1, headers = headers, data = data)

print response1.text

# 示例2: 请求body为json格式数据
data1 = json.loads(data)  # 先把请求的body字符串转换为json格式数据

json_data = json.dumps(data1)  # 把json格式数据data1 转为字符串格式的数据

content2 = content.strip("&") + json_data  # 在url 的尾部追加上body字符串数据

# 使用hmac.sha256 算法对 content1 计算计算签名
signature2 = hmac.new(app_secret, content2, digestmod=hashlib.sha256).digest()

# 把计算出来的签名按base64 URL安全编码后作为sign参数追加到url里
url2 = url + "sign=" + base64.urlsafe_b64encode(signature2)
print url2
# 发起http post请求,注意此处传入的body json_data 数据(字符串)需要与计算签名时的body 数据(字符串)完全一致,否则计算签名将不一致
response2 = requests.post(url2, headers = headers, data = json_data)

print response2.text

十三、Python URL处理

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# -*- coding: utf-8 -*-
import datetime
import json

import requests
import urlparse
import urllib
import json

token_str="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
query = '((sum by(path)(increase(bedrock_requests_total{hasError="true",job="bedrock-auth",namespace="default"}[5m])))/(sum by(path)(increase(bedrock_requests_total{job="bedrock-auth",namespace="default"}[5m]))))\r\nor\r\n((sum by(path)(increase(bedrock_requests_total{job="bedrock-auth",namespace="default",statusCode!~"200|0"}[5m])))/(sum by(path)(increase(bedrock_requests_total{job="bedrock-auth",namespace="default"}[5m]))))>0.2'

dit_em = {"exemplar":False,"expr": query,"interval":"5","instant":False,"range":True}

left_argv = '["now-6h","now","Prometheus", %s]' % json.dumps(dit_em)



content_tmp='''【<font color=\"warning\"> 这是一条验证消息!</font>】
您好,您TKE集群下的告警策略(策略名:%s)触发告警!
告警描述:%s
触发时间:最近于%s发现异常
可以登录VPN后在【[大数据监控服务(Grafana)](%s)】系统上查看详情.

'''

def work_wechat_send_message():
    url = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send"
    headers = {
        'Content-Type': "application/json",
        'cache-control': "no-cache",
        'Postman-Token': token_str
    }
    querystring = {"key": token_str}


    a = {
        "msgtype": "markdown",
        "markdown": {
            "content": "【<font color=\"warning\"> 这是一条验证消息  </font>】\n您好,您TKE集群下的告警策略(策略名:BDC-TKE-TPS-Bedrock请求错误率过高)触发告警!\n告警描述:bedrock下\"/sceniclive/update\"接口的错误率持续5分钟高于20%\n触发时间:最近于2021-06-18 17:08:23发现异常\n您可以登录VPN后在【<font color=\"warning\">[大数据监控服务(Grafana)](http://172.18.100.19/explore?left=%5B%22now-6h%22,%22now%22,%22Prometheus%22,%7B%22exemplar%22:false,%22expr%22:%22%28%28sum%20by%20%28path%29%20%28increase%28bedrock_requests_total%7BhasError%3D%5C%22true%5C%22,job%3D%5C%22bedrock-auth%5C%22,namespace%3D%5C%22default%5C%22%7D%5B5m%5D%29%29%2Fsum%20by%20%28path%29%20%28increase%28bedrock_requests_total%7Bjob%3D%5C%22bedrock-auth%5C%22,namespace%3D%5C%22default%5C%22%7D%5B5m%5D%29%29%29%5Cr%5Cn%5Cr%5Cnor%5Cr%5Cn%5Cr%5Cn%28sum%20by%20%28path%29%20%28increase%28bedrock_requests_total%7Bjob%3D%5C%22bedrock-auth%5C%22,namespace%3D%5C%22default%5C%22,statusCode!~%5C%22200%7C0%5C%22%7D%5B5m%5D%29%29%2Fsum%20by%20%28path%29%20%28%20increase%28bedrock_requests_total%7Bjob%3D%5C%22bedrock-auth%5C%22,namespace%3D%5C%22default%5C%22%7D%5B5m%5D%29%29%29%29*100%22,%22instant%22:false,%22range%22:true,%22interval%22:%225%22%7D%5D) </font>】系统查看详情"
            }
    }

    r1 = json.dumps(a)
    requests.request("POST", url, data=r1, headers=headers, params=querystring)


if __name__ == '__main__':
    #  work_wechat_send_message()
    print (left_argv)

    print "----------------------------------------"
    que_str = r'left=' + left_argv
    url = urlparse.urlunparse(('http','172.18.100.19','/explore','', que_str, ''))
    print url

    print "----------------------------------------"

    s0 = "http://172.18.100.19/explore?left=%5B%22now-6h%22,%22now%22,%22Prometheus%22,%7B%22exemplar%22:false,%22expr%22:%22%28%28sum%20by%28path%29%28increase%28bedrock_requests_total%7BhasError%3D%5C%22true%5C%22,job%3D%5C%22bedrock-auth%5C%22,namespace%3D%5C%22default%5C%22%7D%5B5m%5D%29%29%29%2F%28sum%20by%28path%29%28increase%28bedrock_requests_total%7Bjob%3D%5C%22bedrock-auth%5C%22,namespace%3D%5C%22default%5C%22%7D%5B5m%5D%29%29%29%29%5Cnor%5Cn%28%28sum%20by%28path%29%28increase%28bedrock_requests_total%7Bjob%3D%5C%22bedrock-auth%5C%22,namespace%3D%5C%22default%5C%22,statusCode!~%5C%22200%7C0%5C%22%7D%5B5m%5D%29%29%29%2F%28sum%20by%28path%29%28increase%28bedrock_requests_total%7Bjob%3D%5C%22bedrock-auth%5C%22,namespace%3D%5C%22default%5C%22%7D%5B5m%5D%29%29%29%29%3E0.2%22,%22interval%22:%225%22,%22instant%22:false,%22range%22:true%7D%5D"
    s1=urllib.unquote(s0)
    print s1

十四、Python 查看帮助手册

14.1 命令生成临时网址(较推荐)

命令行执行 python -m pydoc -p 0 命令,控制台,输入选择“b”,即可自动打开浏览器页面。

1
2
3
4
mac$ python3 -m pydoc -p 0
Server ready at http://localhost:50519/
Server commands: [b]rowser, [q]uit
server>

14.2 使用内置函数help()、dir()进行查询

首先需要进入Python解释器,即在*“»>”*下执行

1
2
3
4
5
mac$ python3 
Python 3.9.6 (default, Feb  3 2024, 15:58:28) 
[Clang 15.0.0 (clang-1500.3.9.4)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> help("open")

或者 使用内置函数help()、dir()进行查询

1
2
3
4
5
6
7
8
import Xmodule
# 查看[支持的]属性和方法
print(dir(Xmodule))

# 查看模块/包的帮助文档,或者模块对应方法的用法。
help("Xmodule")  # 模块/包名称,字符类型 - 需加引号。

help("Xmodule.repalce")  # 模块内方法

十五、Python 自带的HTTP 下载服务器

Python3 提供了简易的 HTTP 下载服务器,可以直接运行起来。在终端,输入下面这条命令,就可以启动一个 HTTP 当前目录及子目录的下载服务器:

1
2
python -m http.server [80]           # pyhton3中启动方式,开启的端口为80
python -m SimpleHTTPServer [8080]    # python2启动方式

端口号为可选参数。

启动成功后,会输出以下信息:

1
Serving HTTP on :: port 8000 (http://[::]:8000/) ...

提示 HTTP 服务器在本机8000端口运行,接着就可以在浏览器输入 http://127.0.0.1:8000 看到由这个服务器提供的网页。

这个 HTTP 服务器会把运行目录的所有文件列出来,并提供下载功能。

Licensed under CC BY-NC-SA 4.0