零碎知识

这里是 Python 的一些零碎知识点。

多值参数:

1
2
3
4
5
6
7
8
9
10
def demo(num, *args, **kwargs):
print(num)
print(args)
print(kwargs)

demo(1, 2, 3, 4, 5, name="小明", age = 18, gender = True)

# 1
# (2, 3, 4, 5)
# {'name': '小明', 'age': 18, 'gender': True}

另外,了解元组和字典的拆包(自行查找资料)。

if __name__ == "__main__" 是什么?

该语句下面缩进的内容在python xxx.py时会执行,而在import xxx时不会执行。

类属性,类似于 C++ 中的类的静态成员

类方法

1
2
3
@classmethod   # 装饰器
def show_tool_cnt(cls):
print("工具对象总数: %d" % cls.count)

单例模式

实现起来比 C++ 简单。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class MusicPlayer(object):
instance = None # 类属性

@classmethod
def __new__(cls, *args, **kwargs):
if cls.instance is None:
print("创建对象,分配空间")
cls.instance = super().__new__(cls)

return cls.instance

def __init__(self, music_name):
print("播放器初始化")
self.music_name = music_name


player1 = MusicPlayer('弯弯的月亮')
print(player1.music_name)

player2 = MusicPlayer('竹楼情歌')
print(player2.music_name)

print(player1)
print(player2)

'''
创建对象,分配空间
播放器初始化
弯弯的月亮
播放器初始化
竹楼情歌
<__main__.MusicPlayer object at 0x7f4c18f16790>
<__main__.MusicPlayer object at 0x7f4c18f16790>
'''

异常

观其大略

在实际开发中,为了能够处理复杂的异常情况,完整的异常语法如下(伪代码):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
try:
# 尝试执行的代码
pass
except 错误类型 1:
# 针对错误类型 1,对应的代码处理
pass
except 错误类型 2:
# 针对错误类型 2,对应的代码处理
pass
except (错误类型 3, 错误类型 4):
# 针对错误类型 3 和 4,对应的代码处理
pass
except Exception as result:
# 打印错误信息
print(result)
else:
# 没有异常才会执行的代码
pass
finally:
# 无论是否有异常,都会执行的代码
print("无论是否有异常,都会执行的代码")

异常的传递

当函数执行出现异常,会将异常传递给函数的调用一方。如果传递到主程序,仍然没有异常处理,程序才会被终止。

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def demo1():
return int(input("输入整数:"))

def demo2():
return demo1() # 利用异常的传递性,在主程序捕获异常

try:
print(demo2())
except Exception as result:
print("未知错误 %s" % result)

'''
输入整数:abc
未知错误 invalid literal for int() with base 10: 'abc'
'''

抛出 raise 异常

应用场景:
在开发中,除了代码执行出错 Python 解释器会抛出异常之外,还可以根据应用程序特有的业务需求主动抛出异常。

示例:
提示用户输入密码,如果长度少于 8,抛出异常。当前函数只负责提示用户输入密码,如果密码长度不正确,需要其他的函数进行额外处理,因此可以抛出异常,由其他需要处理的函数捕获异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def input_password():

pwd = input("请输入密码:")
if len(pwd) >= 8:
return pwd

print("主动抛出异常")
ex = Exception("密码长度不够")
raise ex

try:
print(input_password())
except Exception as result:
print(result)

'''
请输入密码:123
主动抛出异常
密码长度不够
'''

'''
请输入密码:1234234545
1234234545
'''

让自定义异常像通用异常一样使用(若链接内容不可访问,可去 archive.ph 查找):
https://python3-cookbook.readthedocs.io/zh-cn/latest/c14/p08_creating_custom_exceptions.html

模块和包

原则:每一个文件都可以被导入。

一个独立的 Python 文件就是一个模块。在导入文件时,文件中所有没有任何缩进的代码,都会被执行一遍。因此需要配合 __name__ 属性使用。

包是一个包含多个模块的特殊目录。 目录下有一个特殊的文件 __init__.py

在外界使用包中的模块:

86-1.png

如果希望自己开发的模块,分享给其他人,可以按照以下步骤操作:

  1. 制作发布压缩包
    1. 创建 setup.py
    2. 构建模块
    3. 生成发布压缩包
  2. 安装模块

创建 setup.py

86-2.png

构建模块:

1
python3 setup.py build

生成发布压缩包(注意:要制作哪个版本的模块,就使用哪个版本的解释器执行):

1
python3 setup.py sdist

安装模块(示例):

1
2
3
tar xf wd_message-1.0.tar.gz
cd wd_message-1.0
sudo python3 setup.py install

卸载模块(直接从安装目录下,把安装模块的目录删除就可以):

1
2
cd /usr/local/lib/python3.6/dist-packages/
sudo rm -r wd_message*

文件操作

没什么好记录的,大致和 Cpp 差不多。

展示目录:

1
2
3
4
5
6
7
8
9
10
11
import os

def dir_dfs(path, width):
file_list = os.listdir(path)
for filename in file_list:
print(' '*width + filename)
if os.path.isdir(path+'/'+filename):
dir_dfs(path+'/'+filename, width+4)

if __name__ == '__main__':
dir_dfs('.', 0)

手撕红黑树

之前文章里写过平衡二叉树(虽然没写完),一些旋转操作的思路是通的。

pygame

游戏素材在本小节末尾放出。

验证是否安装成功:

1
python -m pygame.examples.aliens

游戏中的坐标系:

86-3.png

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import pygame

pygame.init() # 使用其他模块之前,必须先 init

hero_rect = pygame.Rect(100, 500, 120, 125) # 描述矩形区域
print("英雄的原点 %d %d" % (hero_rect.x, hero_rect.y))
print("英雄的尺寸 %d %d" % (hero_rect.width, hero_rect.height))
# size 属性会返回矩形区域的 (宽, 高) 元组
print("%d %d" % hero_rect.size)

pygame.quit() # 卸载所有 pygame 模块

'''
pygame 2.6.0 (SDL 2.28.4, Python 3.12.1)
Hello from the pygame community. https://www.pygame.org/contribute.html
英雄的原点 100 500
英雄的尺寸 120 125
120 125
'''

初始化游戏显示窗口:

1
pygame.display.set_mode()

刷新屏幕内容显示:

1
pygame.display.update()

set_mode 方法:

  • 作用:创建游戏显示窗口
  • resolution 指定屏幕的 宽 和 高,默认创建的窗口大小和屏幕大小一致
  • flags 参数指定屏幕的附加选项,例如是否全屏等等,默认不需要传递
  • depth 参数表示颜色的位数,默认自动匹配
  • 返回值:暂时可以理解为游戏的屏幕,游戏的元素 都需要被绘制到游戏的屏幕上
  • 注意:必须使用变量记录 set_mode 方法的返回结果!因为:后续所有的图像绘制都基于这个返回结果
1
set_mode(resolution=(0,0), flags=0, depth=0) -> Surface

图像、游戏循环、游戏时钟、监听事件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import pygame
pygame.init()

# 创建游戏的窗口 480 * 700
screen = pygame.display.set_mode((480, 700))

# 加载图像数据
bg = pygame.image.load("./images/background.png")
# blit 绘制图像
screen.blit(bg, (0,0))

# 主角小飞机
hero = pygame.image.load("./images/me1.png")
screen.blit(hero, (150,300))

# 更新屏幕
pygame.display.update()

# 创建时钟对象
clock = pygame.time.Clock()

# 记录小飞机的初始位置
hero_rect = pygame.Rect(150, 300, 102, 126)

while True:
# 指定代码执行的频率
clock.tick(60)

# 捕获事件
event_list = pygame.event.get()
if len(event_list) > 0:
print(event_list)

for event in event_list:
if event.type == pygame.QUIT:
print("游戏退出")
pygame.quit()
exit() # 直接终止当前正在执行的程序

hero_rect.y -= 1
if(hero_rect.bottom < 0):
hero_rect.y = 700

screen.blit(bg, (0,0))
screen.blit(hero, hero_rect)

pygame.display.update()

在刚刚完成的案例中,图像加载、位置变化、绘制图像都需要程序员编写代码分别处理,为了简化开发步骤,pygame 提供了两个类:

  • pygame.sprite.Sprite 存储图像数据 image 和 位置 rect 的对象
  • pygame.sprite.Group

精灵

  1. 在游戏开发中,通常把显示图像的对象叫做精灵 Sprite
  2. 精灵有两个重要的属性
    1. image 要显示的图像
    2. rect 图像要显示在屏幕的位置
  3. 默认的 update() 方法什么也没做。子类可以重写此方法,在每次刷新屏幕时,更新精灵位置
  4. 注意pygame.sprite.Sprite 并没有提供 image 和 rect 两个属性。需要程序员从 pygame.sprite.Sprite 派生子类,并在子类的初始化方法中,设置 image 和 rect 属性。

精灵组

  1. 一个精灵组可以包含多个精灵对象
  2. 调用精灵组对象的 update() 方法可以自动调用组内每一个精灵update() 方法
  3. 调用精灵组对象的 draw 方法可以将组内每一个精灵的 image 绘制在 rect 位置
  4. 注意:仍然需要调用 pygame.display.update() 才能在屏幕看到最终结果

背景交替滚动的实现思路:

86-4.png

pygame 提供了两个非常方便的方法实现碰撞检测:

  • pygame.sprite.groupcollide() 两个精灵组中所有精灵的碰撞检测
  • pygame.sprite.spritecollide() 判断某个精灵和指定精灵组中的精灵的碰撞
1
2
3
groupcollide(group1, group2, dokill1, dokill2, collided = None) -> Sprite_dict
# 如果将 dokill 设置为 True,则发生碰撞的精灵将被自动移除
# collided 参数是用于计算碰撞的回调函数,如果没有指定,则每个精灵必须有一个 rect 属性
1
2
3
4
spritecollide(sprite, group, dokill, collided = None) -> Sprite_list
# 如果将 dokill 设置为 True,则 指定精灵组 中 发生碰撞的精灵将被自动移除
# collided 参数是用于计算碰撞的回调函数,如果没有指定,则每个精灵必须有一个 rect 属性
# 返回 精灵组 中跟 精灵 发生碰撞的 精灵列表

整体的代码相当简洁,共两个代码文件和若干素材文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# plane_sprites.py
import random
import pygame

# 屏幕大小的常量对象
SCREEN_RECT = pygame.Rect(0, 0, 480, 700)
# 刷新的帧率
FRAME_PER_SEC = 15
# 创建敌机的定时器常量,为事件定义不同名字的常量,从而能够区分,从24算起
CREATE_ENEMY_EVENT = pygame.USEREVENT
# 英雄发射子弹事件,为事件定义不同名字的常量,从而能够区分
HERO_FIRE_EVENT = pygame.USEREVENT + 1


class GameSprite(pygame.sprite.Sprite):
"""飞机大战游戏精灵"""

def __init__(self, image_name, speed=1):
# 调用父类的初始化方法
super().__init__()

# 定义对象的属性
self.image = pygame.image.load(image_name)
self.rect = self.image.get_rect() # 自动获取图像的尺寸
self.speed = speed

def update(self):
# 在屏幕的垂直方向上移动
self.rect.y += self.speed


class Background(GameSprite):
"""游戏背景精灵"""

def __init__(self, is_alt=False):

# 1. 调用父类方法实现精灵的创建(image/rect/speed)
super().__init__("./images/background.png")

# 2. 判断是否是交替图像,如果是,需要设置初始位置
if is_alt:
self.rect.y = -self.rect.height

def update(self):

# 1. 调用父类的方法实现
super().update()

# 2. 判断是否移出屏幕,如果移出屏幕,将图像设置到屏幕的上方
if self.rect.y >= SCREEN_RECT.height:
self.rect.y = -self.rect.height


class Enemy(GameSprite):
"""敌机精灵"""

def __init__(self):
# 1. 调用父类方法,创建敌机精灵,同时指定敌机图片
super().__init__("./images/enemy1.png")

# 2. 指定敌机的初始随机速度 1 ~ 3
self.speed = random.randint(1, 3)

# 3. 指定敌机的初始随机位置
self.rect.bottom = 0

max_x = SCREEN_RECT.width - self.rect.width #减去自身宽度
self.rect.x = random.randint(0, max_x)

def update(self):
# 1. 调用父类方法,保持垂直方向的飞行
super().update()

# 2. 判断是否飞出屏幕,如果是,需要从精灵组删除敌机
if self.rect.y >= SCREEN_RECT.height:
# print("飞出屏幕,需要从精灵组删除...")
# kill方法可以将精灵从所有精灵组中移出,精灵就会被自动销毁
self.kill()

def __del__(self):
# print("敌机挂了 %s" % self.rect)
pass


class Hero(GameSprite):
"""英雄精灵"""

def __init__(self):

# 1. 调用父类方法,设置image&speed
super().__init__("./images/me1.png", 0)

# 2. 设置英雄的初始位置
self.rect.centerx = SCREEN_RECT.centerx
self.rect.bottom = SCREEN_RECT.bottom - 120

# 3. 创建子弹的精灵组
self.bullets = pygame.sprite.Group()

def update(self):

# 英雄在水平方向移动
self.rect.x += self.speed

# 控制英雄不能离开屏幕
if self.rect.x < 0:
self.rect.x = 0
elif self.rect.right > SCREEN_RECT.right:
self.rect.right = SCREEN_RECT.right

def fire(self):
print("发射子弹...")

for i in (0, 1, 2):
# 1. 创建子弹精灵
bullet = Bullet()

# 2. 设置精灵的位置
bullet.rect.bottom = self.rect.y - i * 20
bullet.rect.centerx = self.rect.centerx

# 3. 将精灵添加到精灵组
self.bullets.add(bullet)


class Bullet(GameSprite):
"""子弹精灵"""

def __init__(self):
# 调用父类方法,设置子弹图片,设置初始速度
super().__init__("./images/bullet1.png", -5)

def update(self):
# 调用父类方法,让子弹沿垂直方向飞行
super().update()

# 判断子弹是否飞出屏幕
if self.rect.bottom < 0:
self.kill()

def __del__(self):
print("子弹被销毁...")
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# plane_main.py
import pygame
from plane_sprites import *
import time

class PlaneGame(object):
"""飞机大战主游戏"""

def __init__(self):
print("游戏初始化")

# 1. 创建游戏的窗口
self.screen = pygame.display.set_mode(SCREEN_RECT.size)
# 2. 创建游戏的时钟
self.clock = pygame.time.Clock()
# 3. 调用私有方法,精灵和精灵组的创建,也是初始化
self.__create_sprites()

# 4. 设置定时器事件 - 创建敌机 设定敌机的刷新时间为1s,
# 英雄子弹事件的刷新频率为0.5秒
pygame.time.set_timer(CREATE_ENEMY_EVENT, 1000)
pygame.time.set_timer(HERO_FIRE_EVENT, 300)

def __create_sprites(self):

# 创建背景精灵和精灵组
bg1 = Background()
bg2 = Background(True)

self.back_group = pygame.sprite.Group(bg1, bg2)

# 创建敌机的精灵组
self.enemy_group = pygame.sprite.Group()

# 创建英雄的精灵和精灵组
self.hero = Hero()
self.hero_group = pygame.sprite.Group(self.hero)

def start_game(self):
print("游戏开始...")

while True:
# 1. 设置刷新帧率
self.clock.tick(FRAME_PER_SEC)
# 2. 事件监听
self.__event_handler()
# 3. 碰撞检测
self.__check_collide()
# 4. 更新/绘制精灵组
self.__update_sprites()
# 5. 更新显示
pygame.display.update()

def __event_handler(self):

for event in pygame.event.get():

# 判断是否退出游戏
if event.type == pygame.QUIT:
PlaneGame.__game_over()

elif event.type == CREATE_ENEMY_EVENT:
# print("敌机出场...")
# 创建敌机精灵
enemy = Enemy()

# 将敌机精灵添加到敌机精灵组
self.enemy_group.add(enemy)
elif event.type == HERO_FIRE_EVENT:
self.hero.fire()
# elif event.type == pygame.KEYDOWN and event.key == pygame.K_RIGHT:
# print("向右移动...")

# 使用键盘提供的方法获取键盘按键 - 按键元组
keys_pressed = pygame.key.get_pressed()
# 判断元组中对应的按键索引值 1
if keys_pressed[pygame.K_RIGHT]:
self.hero.speed = 8
elif keys_pressed[pygame.K_LEFT]:
self.hero.speed = -8
else:
self.hero.speed = 0

def __check_collide(self):

# 1. 子弹摧毁敌机
pygame.sprite.groupcollide(self.hero.bullets, self.enemy_group, True, True)

# 2. 敌机撞毁英雄
enemies = pygame.sprite.spritecollide(self.hero, self.enemy_group, True)

# 判断列表时候有内容
if len(enemies) > 0:
# 让英雄牺牲
self.hero.kill()
m = "./sound/use_bomb.wav"
pygame.mixer.music.load(m)
pygame.mixer.music.play()
time.sleep(2)
# 结束游戏
PlaneGame.__game_over()

def __update_sprites(self):

self.back_group.update()
self.back_group.draw(self.screen)

self.enemy_group.update()
self.enemy_group.draw(self.screen)

self.hero_group.update()
self.hero_group.draw(self.screen)

self.hero.bullets.update()
self.hero.bullets.draw(self.screen)

@staticmethod
def __game_over():
print("游戏结束")

pygame.quit()
exit() #进程结束


if __name__ == '__main__':
# 创建游戏对象
pygame.init()
game = PlaneGame()

# 启动游戏
game.start_game()

其中,图片素材、音频素材被放在 images 和 sound 文件夹下,被组织在上面两份代码的同级目录下。

网络编程

Linux 命令

查看或配置网卡信息(ifconfig):

86-5.png

路由查看:route 可以查看路由。route -n

1
2
3
4
5
内核 IP 路由表
目标 网关 子网掩码
0.0.0.0 192.168.19.2 0.0.0.0
169.254.0.0 0.0.0.0 255.255.0.0
192.168.19.0 0.0.0.0 255.255.255.0

0.0.0.0 代表任意目的地,网关就是转发数据的设备。

怎样查看端口及谁使用了端口?

  • netstat -an 查看端口状态
  • sudo lsof -i [tcp/udp]:2425 必须是 root 才能查看
  • sudo lsof -i tcp:22 查看哪一个进程用了这个端口
  • ps -elf |grep udp_server 查看某个进程是否还在

UDP

UDP 通信流程:

86-6.png

port 的形象理解:

86-7.png

看一段代码:

1
2
3
4
5
6
7
8
9
10
# server
import socket

s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
addr = ('192.168.31.106', 7656)
s.bind(addr) # 失败直接抛出异常
temp = s.recvfrom(100) # 100 代表接的长度
print(temp[0])
print(temp[1])
s.close()

运行这段代码,然后在终端中:

1
2
~$ netstat -an|grep 7656
udp 0 0 192.168.31.106:7656 0.0.0.0:*

终止运行(则查找不到了):

1
~$ netstat -an|grep 7656

客户端:

1
2
3
4
5
6
7
# client
import socket

c = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
dest_addr = ('192.168.31.106', 7656)
c.sendto(b'hello', dest_addr) # 必须发送字节流
c.close()

在两个终端中操作:

1
2
3
4
python3 server.py
python3 client.py
b'hello'
('192.168.31.106', 33817)

发送中文、全双工:

1
2
3
4
5
6
7
8
9
10
import socket

s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
addr = ('192.168.31.106', 7656)
s.bind(addr) # 失败直接抛出异常
data, c_addr = s.recvfrom(100) # 100 代表接的长度
print(data.decode('utf8'))
print(c_addr)
s.sendto('??????'.encode('utf8'), c_addr)
s.close()
1
2
3
4
5
6
7
8
9
import socket

c = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
dest_addr = ('192.168.31.106', 7656)
# c.sendto(b'hello', dest_addr) # 必须发送字节流
c.sendto('zui & 罚'.encode('utf8'), dest_addr) # 必须发送字节流
data , _ = c.recvfrom(100) # _ 表示我们不想要
print(data.decode('utf8'))
c.close() # 关闭时端口会释放

终端:

1
2
3
$ python3 server.py 
zui & 罚
('192.168.31.106', 58158)
1
2
$ python3 client.py 
??????

当 UDP recvfrom 函数内填的大小,小于 client 发来的数据的大小时,Windows 会报错,Linux 会截断数据。

sendtorecvfrom 次数对等。

相关的命令总结:

  • 查看 ip ifconfig
  • 查看路由 route -n
  • 端口状态
    • netstat -an|grep 端口
    • 端口正在被哪个进程使用 sudo lsof -i udp:2000
  • 进程查看 ps -elf|grep 进程名字

TCP

可以查看之前的笔记。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# server
import socket

def tcp_server():
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
addr = ('192.168.31.106', 2000)
s.bind(addr)
s.listen(128)
handle_c_fd, c_addr = s.accept()
print(c_addr)

# 接下来就可以进行 send, recv 操作
handle_c_fd.send('我是服务器'.encode('utf8'))
data = handle_c_fd.recv(100)
print(data.decode('utf8'))

handle_c_fd.close()
s.close()

if __name__ == "__main__":
tcp_server()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# client
import socket

def tcp_client():
c = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
dest_addr = ('192.168.31.106', 2000)
c.connect(dest_addr)

data = c.recv(100)
print(data.decode('utf8'))
c.send('我是客户端'.encode('utf8'))

c.close()

if __name__ == "__main__":
tcp_client()

其实之前的笔记中,代码的变量命名容易造成误解,这里的更好。

演示 TCP 特性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# server
import socket

def tcp_server():
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
addr = ('192.168.31.106', 2000)
s.bind(addr)
s.listen(128)
handle_c_fd, c_addr = s.accept()
print(c_addr)

# 接下来就可以进行 send, recv 操作
handle_c_fd.send('abcdefghij'.encode('utf8'))
data = handle_c_fd.recv(100)
print(data.decode('utf8'))

handle_c_fd.close()
s.close()

if __name__ == "__main__":
tcp_server()

'''
('192.168.31.106', 57226)
客户端消息
'''
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# client
import socket

def tcp_client():
c = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
dest_addr = ('192.168.31.106', 2000)
c.connect(dest_addr)

data = c.recv(5)
print(data.decode('utf8'))
data = c.recv(5) # TCP 可以,而 UDP 不行的操作
print(data.decode('utf8'))


c.send('客户端消息'.encode('utf8'))

c.close()

if __name__ == "__main__":
tcp_client()

'''
abcde
fghij
'''

下面我们实现一个简易的文件下载器(实际上它可以发送 txt, jpg 等格式的文件):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# server
from socket import *
import sys

def get_file_content(file_name):
"""获取文件的内容"""
try:
with open(file_name, "rb") as f:
content = f.read()
return content
except:
print("没有下载的文件:%s" % file_name)

def main():
if len(sys.argv) != 2:
print("请按照如下方式运行:python3 xxx.py 7890")
return
else:
# 运行方式为 python3 xxx.py 7890
port = int(sys.argv[1])

# 创建 socket
tcp_server_socket = socket(AF_INET, SOCK_STREAM)
# 本地信息
address = ('192.168.31.106', port)
# 绑定本地信息
tcp_server_socket.bind(address)
# 将主动套接字变为被动套接字
tcp_server_socket.listen(128)

while True:
# 等待客户端的链接,即为这个客户端发送文件
client_socket, clientAddr = tcp_server_socket.accept()
# 接收对方发送过来的数据
recv_data = client_socket.recv(1024) # 接收 1024 个字节
file_name = recv_data.decode("utf-8")
print("对方请求下载的文件名为:%s" % file_name)
file_content = get_file_content(file_name)
# 发送文件的数据给客户端
# 因为获取打开文件时是以 rb 方式打开,所以 file_content 中的数据已经是二进制的格式,因此不需要 encode 编码
if file_content:
client_socket.send(file_content)
# 关闭这个套接字
client_socket.close()

# 关闭监听套接字
tcp_server_socket.close()

if __name__ == "__main__":
main()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# client
from socket import *

def main():
# 创建 socket
tcp_client_socket = socket(AF_INET, SOCK_STREAM)
# 目的信息
server_ip = '192.168.31.106'
server_port = int(input("请输入服务器 port:"))

# 链接服务器
tcp_client_socket.connect((server_ip, server_port))
# 输入需要下载的文件名
file_name = input("请输入要下载的文件名:")

# 发送文件下载请求
tcp_client_socket.send(file_name.encode("utf-8"))

# 接收对方发送过来的数据
recv_data = b"" # 用于存储接收到的所有数据
while True:
chunk = tcp_client_socket.recv(1024) # 每次接收 1024 字节
if not chunk:
break # 如果没有数据,跳出循环
recv_data += chunk

if recv_data:
with open("[接收]"+file_name, "wb") as f:
f.write(recv_data)

# 关闭套接字
tcp_client_socket.close()

if __name__ == "__main__":
main()

注意:
若一次性接收数据,网络传输的数据量不一定能一次性到达。 即使将接收缓冲区的大小设置为 102440000,仍可能出现问题。这是因为 TCP 数据传输的机制不是一次性发送所有数据(尤其在网络延迟或带宽有限的情况下),实际上,数据是以小块的形式分批到达的。*因此需要循环接收数据,直到完整地接收到文件的全部内容。

提问:TCP 为什么要三次握手?
省流:两次握手不足够,会造成 server 死锁。(可以分两种情况解释)
详细
假设我们设计为两次握手。
(一)
A(client) 机要连到 B(server) 机,但连接信息在网络中延误了。
于是A 机重新发送,这次 B 收到了,于是 B 发信息回 A,两机建立连接。传输完毕后,断开连接。
但此时,一开始的连接信息到达 B 机,于是 B 机发信息给 A,此时 B 机就认为已和 A 建立连接,B 机就等待 A 传数据过来,永远地等待在 recv 接口上。发生死锁。
(二)
C 给 S 发送一个连接请求分组,S 收到此分组,并发送确认应答分组。按照两次握手的协定,S 认为连接已经建立,可以开始发送数据分组。
但是,S 的应答分组在传输中丢失。
C 不知道 S 是否已准备好,不知道 S 建议什么样的序列号,不知道 S 是否收到自己的连接请求分组。在这种情况下,C 认为连接还未建立成功,将忽略 S 发来的任何数据分组,只等待连接确认应答分组。而 S 在发出的数据分组超时后,重复发送同样的分组。这样就形成了死锁。

抓包

安装 wireshark :

1
apt install wireshark

windows 和 macos 安装比较容易,linux 安装参考:
https://www.wireshark.org/docs/wsug_html_chunked/ChapterBuildInstall.html

在 Debian 系统下运行 Wireshark 时遇到 “permission denied” 问题,通常是由于普通用户没有足够的权限捕获网络接口上的数据包。Wireshark 需要更高的权限来访问网络设备。

在安装过程中,系统会询问是否允许非 root 用户捕获数据包。选择“是”。

将当前用户添加到 wireshark 组

Wireshark 使用 wireshark 组来管理捕获数据包的权限。将当前用户添加到 wireshark 组中:

1
sudo usermod -aG wireshark $USER

然后,重新登录以使组更改生效,或运行以下命令刷新当前会话的用户组信息:

1
newgrp wireshark

重新配置 dumpcap 权限

Wireshark 使用一个名为 dumpcap 的工具来捕获数据包。需要确保它拥有合适的权限,以便普通用户可以使用它:

1
2
sudo chmod +x /usr/bin/dumpcap
sudo setcap cap_net_raw,cap_net_admin=eip /usr/bin/dumpcap

运行以下命令来确认 dumpcap 具有正确的权限:

1
getcap /usr/bin/dumpcap

输出应该类似于:

1
/usr/bin/dumpcap = cap_net_admin,cap_net_raw+eip

现在应该可以在普通用户模式下正常运行 Wireshark 而不会遇到“permission denied”错误。直接运行:

1
wireshark

TCP 数据报头

四次挥手(这个图只是为了展示 ACK 在哪):

86-8.png

ACK 位置 1 表明确认号是合法的。如果 ACK 为 0,那么数据报不包含确认信息,确认字段被省略。

86-9.png

SYN:用于建立连接。当 SYN=1 时,表示发起一个连接请求。

FIN:用于释放连接。当 FIN=1 时,表明此报文段的发送端的数据已发送完成,并要求释放连接。

UDP 代替 TCP

相比而言,UDP 能传输更多的数据(一种典型情况:1472 > 1460),这在经济上的好处是显而易见的(带宽很贵);另外,UDP 在无线环境下表现比 TCP 更好。因此有不少用 UDP 代替 TCP 的研究。

可以使用 UDP 模仿 TCP ,但是需要程序员自己设计应用层协议,实现相关功能,缓存、加序列号、重传等。

CS、BS 模式

CS 模式: client/server 模式。

  • 客户端位于目标主机上可以保证性能,将数据缓存至客户端本地,从而提高数据传输效率
  • 一般来说客户端和服务器程序由一个开发团队创作,所以他们之间所采用的协议相对灵活
  • 客户端和服务器都需要有一个开发团队来完
    成开发。工作量将成倍提升,开发周期较长
  • 从用户角度出发,需要将客户端安插至用户主机上,对用户主机的安全性构成威胁

BS 模式: browser/server 模式。

  • 没有独立的客户端,使用标准浏览器作为客户端,其工作开发量较小
  • 移植性非常好,不受平台限制
  • 由于使用第三方浏览器,因此网络应用支持受限
  • 没有客户端放到对方主机上,缓存数据不尽如人意,从而传输数据量受到限制
  • 采用标准 http 协议进行通信,协议选择不灵活

tcp 长连接和短连接

TCP 短连接:

  1. client 向 server 发起连接请求
  2. server 接到请求,双方建立连接
  3. client 向 server 发送消息
  4. server 回应 client
  5. 一次读写完成,此时双方任何一个都可以发起 close 操作

TCP 长连接:

  1. client 向 server 发起连接
  2. server 接到请求,双方建立连接
  3. client 向 server 发送消息
  4. server 回应 client
  5. 一次读写完成,连接不关闭
  6. 后续读写操作…
  7. 长时间操作之后 client 发起关闭请求

epoll

使用 epoll 实现的小型对话程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# server
import socket
import select
import sys

def tcp_server():
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
addr = ('192.168.31.106', 5379)
s.bind(addr)
s.listen(128)
handle_c_fd, c_addr = s.accept()
print(c_addr)

epolL = select.epoll()
# 让 epoll 监听 handle_c_fd, sys.stdin
epolL.register(handle_c_fd.fileno(), select.EPOLLIN)
epolL.register(sys.stdin.fileno(), select.EPOLLIN)

while True:
# 谁的缓冲区有数据,就填写到 events
events = epolL.poll(-1)
for i, _ in events:
if i==handle_c_fd.fileno():
data = handle_c_fd.recv(100)
if data:
print(data.decode('utf8'))
else:
print('对方断开')
return

elif i==sys.stdin.fileno():
data = input() # server 说话,发给对方
handle_c_fd.send(data.encode('utf8'))

handle_c_fd.close()
s.close()

if __name__ == "__main__":
tcp_server()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# client
import socket
import select
import sys

def tcp_client():

c = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
dest_addr = ('192.168.31.106', 5379)
c.connect(dest_addr)

epolL = select.epoll()
# 让 epoll 监听 c, sys.stdin
epolL.register(c.fileno(), select.EPOLLIN)
epolL.register(sys.stdin.fileno(), select.EPOLLIN)

while True:
# 谁的缓冲区有数据,就填写到 events
events = epolL.poll(-1)
for i, _ in events:
if i==c.fileno():
data = c.recv(100)
if data:
print(data.decode('utf8'))
else:
print('对方断开')
return

elif i==sys.stdin.fileno():
data = input()
c.send(data.encode('utf8'))

c.close()

if __name__ == "__main__":
tcp_client()

改进,client 断开还可以再次连接:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# server
import socket
import select
import sys

def tcp_server():
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
addr = ('192.168.31.106', 5379)
s.bind(addr)
s.listen(128)

epolL = select.epoll()
epolL.register(s.fileno(), select.EPOLLIN)
epolL.register(sys.stdin.fileno(), select.EPOLLIN)

while True:
# 谁的缓冲区有数据,就填写到 events
events = epolL.poll(-1)
for i, event in events:
if i==s.fileno():
# 有客户端连接,就连上、注册
handle_c_fd, c_addr = s.accept()
print(c_addr)
epolL.register(handle_c_fd.fileno(), select.EPOLLIN)

if i==handle_c_fd.fileno():
data = handle_c_fd.recv(100)
if data:
print(data.decode('utf8'))
else:
print('对方断开')
epolL.unregister(handle_c_fd.fileno())
handle_c_fd.close()
break

elif i==sys.stdin.fileno():
try:
data = input() # server 说话,发给对方
except EOFError: # 按 ctrl+d 让 server 断开
print('I want to go.')
return
handle_c_fd.send(data.encode('utf8'))

s.close()

if __name__ == "__main__":
tcp_server()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# client
import socket
import select
import sys

def tcp_client():

c = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
dest_addr = ('192.168.31.106', 5379)
c.connect(dest_addr)

epolL = select.epoll()
# 让 epoll 监听 c, sys.stdin
epolL.register(c.fileno(), select.EPOLLIN)
epolL.register(sys.stdin.fileno(), select.EPOLLIN)

while True:
# 谁的缓冲区有数据,就填写到 events
events = epolL.poll(-1)
for i, _ in events:
if i==c.fileno():
data = c.recv(100)
if data:
print(data.decode('utf8'))
else:
print('对方断开')
return

elif i==sys.stdin.fileno():
data = input()
c.send(data.encode('utf8'))

c.close()

if __name__ == "__main__":
tcp_client()

现在我们实现一个聊天室,client 程序保持不变,server 端的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# server
import socket
import select
import sys

def tcp_server():
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
addr = ('192.168.31.106', 5379)
s.bind(addr)
s.listen(128)

epolL = select.epoll()
epolL.register(s.fileno(), select.EPOLLIN)
epolL.register(sys.stdin.fileno(), select.EPOLLIN)
client_list = [] # 糟糕的 ds 选择

while True:
# 谁的缓冲区有数据,就填写到 events
events = epolL.poll(-1)
for i, event in events:
if i==s.fileno():
# 有客户端连接,就连上、注册
handle_c_fd, c_addr = s.accept()
print(c_addr)
client_list.append(handle_c_fd)
epolL.register(handle_c_fd.fileno(), select.EPOLLIN)

else:
remove_client = None
for client in client_list:
if client.fileno()==i:
data = client.recv(100)
if data:
for others in client_list:
if others is client:
pass
else:
others.send(data)
else: # 断开了就记录一下 || 糟糕的 ds 选择
remove_client = client
if remove_client:
client_list.remove(remove_client)
epolL.unregister(remove_client.fileno())
remove_client.close()

if __name__ == "__main__":
tcp_server()

端口复用

1
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

btw, send() 函数可选 MSG_DONTWAIT 参数,效果是:即使对方没法接受这次要发送的全部信息(缓冲区不够用了),也强行发送,多余部分直接截断。

协议设计

考虑这样的场景:我们需要持续发送多个文件。

这会带来一个问题,这些多个文件在传输过程中,会以字节流的形式变成:

文件名1 + 文件1内容 + 文件名2 + 文件2内容 + …

这样我们就没办法区分出各个内容了,这种现象叫做沾包

为此,我们需要设计协议。一个自然的想法是:
(长度)&(内容)+(长度)&(内容)+ …

会用到这张表:

86-10.png

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import struct
import os
import time

train_content = '爱上地方传奇'.encode('utf8')
train_head = len(train_content)
print(train_head)
print(type(train_head))

print('-'*50)

train_head_bytes = struct.pack('I', train_head) # I 表示整型数
print((train_head_bytes))

b = struct.unpack('I', train_head_bytes)
print(b[0])

'''
18
<class 'int'>
--------------------------------------------------
b'\x12\x00\x00\x00'
18
'''

一个简单的小例子,注意这个例子里很多问题没有处理,比如并没有循环接数据(这样就没法接受大一点的文件),再比如没有考虑代码复用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# server 进行文件发送
from socket import *
import struct

def tcp_init():
s = socket(AF_INET, SOCK_STREAM)
addr = ('192.168.31.106', 2000)
s.bind(addr)
s.listen(128)
return s

def send_file():
file_name = 'adf.txt'
s = tcp_init()
handle_c_fd, client_addr = s.accept()

# 先发火车头(比喻)
file_name_bytes = file_name.encode('utf8')
train_head_bytes = struct.pack('I', len(file_name_bytes))
handle_c_fd.send(train_head_bytes + file_name_bytes)

# 再发文件内容
f = open(file_name, 'rb')
file_content = f.read()
train_head_bytes = struct.pack('I', len(file_content))
handle_c_fd.send(train_head_bytes + file_content)
f.close()
handle_c_fd.close()

s.close()

if __name__ == '__main__':
send_file()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# client
from socket import *
import struct

tcp_client_socket = socket(AF_INET, SOCK_STREAM)

addr = ('192.168.31.106', 2000)

tcp_client_socket.connect(addr)

# 每次先读 4 个字节的火车头
train_head_bytes = tcp_client_socket.recv(4)
train_content_len = struct.unpack('I', train_head_bytes)
file_name = tcp_client_socket.recv(train_content_len[0])
f = open('aaa'+file_name.decode('utf8'), 'wb')

# 接文件的长度及内容
train_head_bytes = tcp_client_socket.recv(4)
train_content_len = struct.unpack('I', train_head_bytes)
file_content = tcp_client_socket.recv(train_content_len[0])
f.write(file_content)
f.close()

tcp_client_socket.close()

网盘设计(一)

这是最初的一版,也是最简单的一版:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# server
from socket import *
import struct
import os

class Server:
def __init__(self, ip, port) -> None:
self.s_listen = None # 用来 listen 的 socket 对象
self.ip = ip
self.port = port

def tcp_init(self):
self.s_listen = socket(AF_INET, SOCK_STREAM)
self.s_listen.bind((self.ip, self.port))
self.s_listen.listen(128)

def task(self):
c_fd, c_addr = self.s_listen.accept()
user = User(c_fd)
user.deal_command()


class User:
'''
每个 User 对象对应一个客户端
'''
def __init__(self, handle_c_fd) -> None:
self.user_name = None
self.handle_c_fd = handle_c_fd
self.path = os.getcwd() # 存储连上的用户的路径

def deal_command(self):
while True:
command = self.recv_train().decode('utf8')
if command[:2] == 'ls':
self.do_ls()
elif command[:2] == 'cd':
self.do_cd(command)
elif command[:3] == 'pwd':
self.do_pwd()
elif command[:2] == 'rm':
self.do_rm(command)
elif command[:4] == 'gets':
self.do_gets()
elif command[:4] == 'puts':
self.do_puts()
else:
print('command wrong')

def send_train(self, send_bytes):
'''
send 火车,就是把某个字节流内容以火车形式发过去
'''
train_head_bytes = struct.pack('I', len(send_bytes))
self.handle_c_fd.send(train_head_bytes + send_bytes)

def recv_train(self):
'''
recv 火车,就是把火车 recv 的内容返回出去
'''
train_head_bytes = self.handle_c_fd.recv(4)
train_head = struct.unpack('I', train_head_bytes)
return self.handle_c_fd.recv(train_head[0])

def do_ls(self):
'''
当前路径下的信息传输给客户端
'''
data = ''
for file in os.listdir(self.path):
data += file + ' '*5 + str(os.stat(file).st_size) + '\n'
self.send_train(data.encode('utf8'))

def do_cd(self, command):
path = command.split()[1]
os.chdir(path)
self.path = os.getcwd()
self.send_train(self.path.encode('utf8'))

def do_pwd(self):
self.send_train(self.path.encode('utf8'))

def do_rm(self, command):
pass

def do_gets(self, command):
pass

def do_puts(self, command):
pass

if __name__ == '__main__':
server = Server('192.168.31.106', 2000)
server.tcp_init()
server.task()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# client
from socket import *
import struct

class Client:
def __init__(self, ip, port) -> None:
self.client:socket = None
self.ip = ip
self.port = port

def tcp_connect(self):
self.client = socket(AF_INET, SOCK_STREAM)
self.client.connect((self.ip, self.port))

def send_train(self, send_bytes):
'''
send 火车,就是把某个字节流内容以火车形式发过去
'''
train_head_bytes = struct.pack('I', len(send_bytes))
self.client.send(train_head_bytes + send_bytes)

def recv_train(self):
'''
recv 火车,就是把火车 recv 的内容返回出去
'''
train_head_bytes = self.client.recv(4)
train_head = struct.unpack('I', train_head_bytes)
return self.client.recv(train_head[0])

def send_command(self):
'''
发送各种命令给服务器
'''
while True:
command = input()
self.send_train(command.encode('utf8'))
if command[:2] == 'ls':
self.do_ls()
elif command[:2] == 'cd':
self.do_cd()
elif command[:3] == 'pwd':
self.do_pwd()
elif command[:2] == 'rm':
self.do_rm(command)
elif command[:4] == 'gets':
self.do_gets()
elif command[:4] == 'puts':
self.do_puts()
else:
print('command wrong')

def do_ls(self):
data = self.recv_train().decode('utf8')
print(data)

def do_cd(self):
print(self.recv_train().decode('utf8'))

def do_pwd(self):
print(self.recv_train().decode('utf8'))

def do_rm(self, command):
pass

def do_gets(self, command):
pass

def do_puts(self, command):
pass


if __name__ == '__main__':
client = Client('192.168.31.106', 2000)
client.tcp_connect()
client.send_command()

上面代码尚有功能未实现(其实就是核心的上传下载功能,不过这个在之前的代码里面有,可以直接拿过来),仅作示例。

多进程编程

前置知识

top 命令的前两行输出:

1
2
top - 16:46:57 up  1:58,  1 user,  load average: 1.51, 0.91, 0.91
Tasks: 300 total, 1 running, 299 sleeping, 0 stopped, 0 zombie

平均负载(load average),一般对于单个 cpu 来说,负载在 0~1.00 之间是正常的,超过 1.00 须引起注意。在多核 cpu 中,系统平均负载不应该高于 cpu 核心的总数。

buffers 与 cached 区别:buffers 指的是块设备的读写缓冲区,cached 指的是页面缓存

查看当前窗口启动的任务情况

  • python 1.while 死循环.py & 让进程后台运行
  • bg 让暂停的进程在后台运行
  • fg 拉到前台
  • jobs 看后台任务

设置定时任务

  • crontab –e 设置当前用户定时任务
  • vim /etc/crontab 设置定时任务
  • crontab -l 查看当前自己设置的定时任务

多进程的简单演示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from multiprocessing import Process
import time

# 子进程的代码
def run_proc():
while True:
print('----2----')
time.sleep(1)

if __name__ == '__main__':
p = Process(target = run_proc)
p.start()
while True:
print('----1----')
time.sleep(1)

Process 语法结构 Process(group , target , name , args , kwargs)

  • target 如果传递了函数的引用,可以让这个子进程就执行这里的代码
  • args 给 target 指定的函数传递的参数,以元组的方式传递
  • kwargs 给 target 指定的函数传递命名参数,keyword 参数

Process 创建的实例对象的常用方法:

  • start() 启动子进程实例(创建子进程)
  • is_alive() 判断进程子进程是否还在活着
  • join([timeout]) 是否等待子进程执行结束,或等待多少秒 — 回收子进程尸体
  • terminate() 不管任务是否完成,立即终止子进程

获取 pid :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from multiprocessing import Process
import os

# 子进程的代码
def run_proc():
print('我是子进程 pid = {}'.format(os.getpid()))
print('子进程结束')

if __name__ == '__main__':
p = Process(target = run_proc)
p.start()
print('我是父进程 pid = {}'.format(os.getpid()))

'''
我是父进程 pid = 6079
我是子进程 pid = 6080
子进程结束
'''

给子进程指定的函数传递参数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from multiprocessing import Process

# 子进程的代码
def run_proc(name, age, **kwargs):
print('子进程 {} {} {}'.format(name, age, kwargs))

if __name__ == '__main__':
p = Process(target = run_proc, args=('xiongda', 5), kwargs={'408':120})
p.start()
p.join()
print('我是父进程')

'''
子进程 xiongda 5 {'408': 120}
我是父进程
'''

孤儿进程

孤儿进程:父进程退出(kill 杀死父进程),子进程变为孤儿。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from multiprocessing import Process
import os
import time

# 子进程的代码
def run_proc():
print('我是子进程 pid = {}'.format(os.getpid()))
while True:
time.sleep(1)

if __name__ == '__main__':
p = Process(target = run_proc)
p.start()
print('我是父进程 pid = {}'.format(os.getpid()))

terminal 1:

1
2
我是父进程 pid = 6586
我是子进程 pid = 6587

terminal 2:

1
2
3
4
5
zhiyue@168:~$ ps -elf|grep test
0 S zhiyue 6586 6574 0 80 0 - 5369 do_wai 10:35 pts/2 00:00:00 python3 -u /home/zhiyue/Documents/0928work/test.py
1 S zhiyue 6587 6586 0 80 0 - 5369 hrtime 10:35 pts/2 00:00:00 python3 -u /home/zhiyue/Documents/0928work/test.py
0 S zhiyue 6671 6646 0 80 0 - 2356 pipe_r 10:35 pts/3 00:00:00 grep test
zhiyue@168:~$ kill -9 6586

terminal 1:

1
2
3
我是父进程 pid = 6586
我是子进程 pid = 6587
Killed

terminal 2:

1
2
3
zhiyue@168:~$ ps -elf|grep test
1 S zhiyue 6587 1068 0 80 0 - 5369 hrtime 10:35 pts/2 00:00:00 python3 -u /home/zhiyue/Documents/0928work/test.py
0 S zhiyue 6733 6646 0 80 0 - 2356 pipe_r 10:36 pts/3 00:00:00 grep test

僵尸进程

僵尸进程:子进程退出,父进程在忙碌,没有回收它,要避免僵尸。

Python 进程变为僵尸进程后,名字会改变。可以使用 top 查看。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from multiprocessing import Process
import os
import time

# 子进程的代码
def run_proc():
print('我是子进程 pid = {}'.format(os.getpid()))
print('子进程结束')

if __name__ == '__main__':
p = Process(target = run_proc)
p.start()
print('我是父进程 pid = {}'.format(os.getpid()))
while True:
time.sleep(1)

terminal:

1
2
3
4
5
我是父进程 pid = 7283
我是子进程 pid = 7284
子进程结束


是否共享全局变量

子进程创建是父进程的复制品,资源是独立使用的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from multiprocessing import Process
import os

nums = [11, 22]

def work1():
print('work1 {}'.format(os.getpid()))
nums.append(33)
print('work1 {}'.format(nums))

def work2():
print('work2 {}'.format(os.getpid()))
print(nums)

if __name__ == '__main__':
p = Process(target=work1)
p.start()
p.join()

print('parent {}'.format(nums))

p = Process(target=work2)
p.start()
p.join()

'''
work1 11308
work1 [11, 22, 33]
parent [11, 22]
work2 11309
[11, 22]
'''

进程间通信

Process 之间有时需要通信,操作系统提供了很多机制来实现进程间的通信(例如管道,共享内存)。

1
2
3
4
5
6
7
8
9
10
11
12
from multiprocessing import Queue
q = Queue(3) # 初始化一个 Queue 对象,最多接受三条 put 消息
q.put(1)
q.put(2)
print(q.full()) # False
q.put(3)
print(q.full()) # True
# q.put(4) 队列满了后,再放会阻塞
print(q.get())
print(q.get())
print(q.get())
# print(q.get()) 队列空了后,再取会阻塞

put() 可以带参数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from multiprocessing import Queue
q = Queue(3) # 初始化一个 Queue 对象,最多接受三条 put 消息
q.put(1)
q.put(2)
print(q.full()) # False
q.put(3)
print(q.full()) # True

try:
q.put('消息4', True, 2)
except:
print('消息队列已满,现有消息数量:%s'%q.qsize())


'''
False
True
消息队列已满,现有消息数量:3
'''

'''
最后一行会等待两秒才打印
'''
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from multiprocessing import Queue
q = Queue(3) # 初始化一个 Queue 对象,最多接受三条 put 消息
q.put(1)
q.put(2)
print(q.full()) # False
q.put(3)
print(q.full()) # True

try:
q.put('消息4', False)
except:
print('消息队列已满,现有消息数量:%s'%q.qsize())

'''
False
True
消息队列已满,现有消息数量:3
'''

'''
最后一行会立即输出
'''

推荐的方式:先判断消息列队是否已满,再写入;读取消息时,先判断消息列队是否为空,再读取。

1
2
3
4
5
6
7
if not q.full():
q.put_nowait("消息 4")


if not q.empty():
for i in range(q.qsize()):
print(q.get_nowait())

下面实际演示两个进程间的通信:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
from multiprocessing import Queue
from multiprocessing import Process
import time

def writer(p):
for value in ['A', 'B', 'C']:
print('Put %s to queue...' % value)
q.put(value)
time.sleep(1)

def reader(p):
while True:
if not q.empty():
value = q.get(True)
print('Get %s from queue.' % value)
time.sleep(2)
else:
break

if __name__ == '__main__':
q = Queue(10)
pw = Process(target=writer, args=(q,))
pr = Process(target=reader, args=(q,))

pw.start()
time.sleep(1)
pr.start()
pw.join()
pr.join()

'''
Put A to queue...
Put B to queue...
Get A from queue.
Put C to queue...
Get B from queue.
Get C from queue.
'''

进程池

Python 已经帮我们做好了 dirty work, 建议爽用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
from multiprocessing.pool import Pool
import os, time, random

def worker(msg):
t_start = time.time()
print("%s 开始执行,进程号为%d" % (msg,os.getpid()))
# random.random()随机生成 0~1 之间的浮点数
time.sleep(random.random()*2)
t_stop = time.time()
print(msg,"执行完毕,耗时%0.2f" % (t_stop-t_start))

if __name__ == '__main__':
po = Pool(3)
for i in range(10):
# Pool().apply_async(要调用的目标,(传递给目标的参数元祖,))
# 每次循环将会用空闲出来的子进程去调用目标
po.apply_async(worker, (i,))

print("----start----")
po.close() # 关闭进程池,关闭后 po 不再接收新的请求
po.join() # 等待 po 中所有子进程执行完成,必须放在 close 语句之后
print("-----end-----")

'''
----start----
0 开始执行,进程号为7836
1 开始执行,进程号为7837
2 开始执行,进程号为7838
0 执行完毕,耗时0.24
3 开始执行,进程号为7836
3 执行完毕,耗时0.02
4 开始执行,进程号为7836
2 执行完毕,耗时0.61
5 开始执行,进程号为7838
1 执行完毕,耗时0.70
6 开始执行,进程号为7837
6 执行完毕,耗时0.63
7 开始执行,进程号为7837
5 执行完毕,耗时0.89
8 开始执行,进程号为7838
7 执行完毕,耗时0.29
9 开始执行,进程号为7837
4 执行完毕,耗时1.55
9 执行完毕,耗时0.74
8 执行完毕,耗时1.43
-----end-----
'''

进程池中的 Queue
如果要使用 Pool 创建进程,就需要使用 multiprocessing.Manager() 中的 Queue() ,而不是 multiprocessing.Queue()

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# 修改 import 中的 Queue 为 Manager
from multiprocessing import Manager,Pool
import os,time,random

def reader(q):
print("reader 启动(%s),父进程为(%s)" % (os.getpid(), os.getppid()))
for i in range(q.qsize()):
print("reader 从 Queue 获取到消息:%s" % q.get(True))

def writer(q):
print("writer 启动(%s),父进程为(%s)" % (os.getpid(), os.getppid()))
for i in "wangdao":
q.put(i)

if __name__=="__main__":
print("(%s) start" % os.getpid())
q = Manager().Queue() # 使用 Manager 中的 Queue
po = Pool()
po.apply_async(writer, (q,))

time.sleep(1) # 先让上面的任务向 Queue 存入数据,然后再让下面的任务开始从中取数据

po.apply_async(reader, (q,))

po.close()
po.join()
print("(%s) End" % os.getpid())

'''
(9168) start
writer 启动(9174),父进程为(9168)
reader 启动(9175),父进程为(9168)
reader 从 Queue 获取到消息:w
reader 从 Queue 获取到消息:a
reader 从 Queue 获取到消息:n
reader 从 Queue 获取到消息:g
reader 从 Queue 获取到消息:d
reader 从 Queue 获取到消息:a
reader 从 Queue 获取到消息:o
(9168) End
'''

多线程

一种很类似于考研应试的加锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import threading

g_num = 0

def work1(num):
global g_num
for i in range(num):
mutex.acquire()
g_num += 1
mutex.release()
print('----- in work1, g_num is %d -----'%g_num)

def work2(num):
global g_num
for i in range(num):
mutex.acquire()
g_num += 1
mutex.release()
print('----- in work2, g_num is %d -----'%g_num)

if __name__ == '__main__':
mutex = threading.Lock()
t1 = threading.Thread(target=work1, args=(10000000,))
t2 = threading.Thread(target=work2, args=(10000000,))

t1.start()
t2.start()

t1.join()
t2.join()

print("2 个线程对同一个全局变量操作之后的最终结果是:%s" % g_num)


'''
----- in work1, g_num is 19992828 ---------- in work2, g_num is 20000000 -----

2 个线程对同一个全局变量操作之后的最终结果是:20000000
'''

可迭代、迭代器、生成器

可迭代的:类中只重写了 __iter__ 内置方法,就是可迭代的,这个方法要求返回一个迭代器。

迭代器:类中重写了 __iter__ ,还需要重写 __next__

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
from collections.abc import Iterable

class MyList:
def __init__(self) -> None:
self.container = []

def add(self, item):
self.container.append(item)

# 只要重写了 __iter__ 方法,就会变成可迭代的
def __iter__(self):
myiterator = MyIter(self)
return myiterator

class MyIter:
def __init__(self, mylist):
self.mylist:MyList = mylist
# current 记录当前访问到的位置
self.current = 0

def __next__(self):
current = self.current
self.current += 1
if current < len(self.mylist.container):
return self.mylist.container[current]
else:
raise StopIteration

def __iter__(self):
return self

if __name__ == '__main__':
mylist = MyList()
mylist.add(666)
mylist.add(667)
mylist.add(668)
print(isinstance(mylist, Iterable))

# myiter = iter(mylist)
# print(next(myiter))
# print(next(myiter))
# print(next(myiter))
# print(next(myiter))

for i in mylist:
print(i)

'''
True
666
667
668
'''

我们可以采用更简便的语法,即生成器(generator)。生成器是一类特殊的迭代器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
>>> G = ( x*2 for x in range(5))
>>> G
<generator object <genexpr> at 0x7fe8669f9630>
>>> next(G)
0
>>> next(G)
2
>>> next(G)
4
>>> next(G)
6
>>> next(G)
8
>>> next(G)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
StopIteration

yield 的作用:把当前上下文(寄存器的状态)保存起来,返回一个值——它后面跟的那个值。下次执行 next 操作,会从上次 yield 保存的现场位置继续执行。

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def fi(n):
current = 0
num1, num2 = 0, 1
while current < n:
num = num1
# 这个语法太自由了,给我楞了一下
num1, num2 = num2, num1 + num2
current += 1
yield num
return 'done'

F = fi(5)

print(next(F))
print(next(F))
print(next(F))
print(next(F))
print(next(F))
print(next(F))

'''
0
1
1
2
3
Traceback (most recent call last):
File "/home/zhiyue/Documents/0929work/test.py", line 19, in <module>
print(next(F))
^^^^^^^
StopIteration: done
'''

又或者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def fi(n):
current = 0
num1, num2 = 0, 1
while current < n:
num = num1
# 这个语法太自由了,给我楞了一下
num1, num2 = num2, num1 + num2
current += 1
yield num
return 'done'

F = fi(10)

for i in F:
print(i, end = ' ')

'''
0 1 1 2 3 5 8 13 21 34
'''

含有 yield 的函数称为生成器。

我们除了可以使用 next() 函数来唤醒生成器继续执行外,还可以使用 send() 函数来唤醒执行。使用 send() 函数的一个好处是可以在唤醒的同时向断点处传入一个附加数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def gen():
i = 0
while i<5:
temp = yield i
print(temp)
i+=1

f = gen()
print(next(f))
f.send('haha')
print(next(f))
f.send('haha')

'''
0
haha
None
2
haha
'''

协程

引入

协程(Coroutine),又称微线程,纤程。

协程是 Python 中另一种实现多任务的方式,只不过比线程需要的资源更少。通过保存、恢复 CPU 上下文的机制,可以在合适的时机把一个协程切换到另一个协程。

86-11.png

协程通过 yield 在用户态保存上下文,操作系统不感知。

实现多任务时,线程切换从系统层面远不止保存和恢复 CPU 上下文这么简单。操作系统为了程序运行的高效性每个线程都有自己的缓存 Cache 等数据,操作系统还会支持这些数据的恢复操作。所以线程的切换非常耗性能(相对协程来说)。但是协程的切换只是单纯地操作 CPU 的上下文,所以一秒钟切换个上百万次系统都抗的住

模拟协程的机制:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import time
def work1():
while True:
print("----work1---")
yield
time.sleep(0.5)

def work2():
while True:
print("----work2---")
yield
time.sleep(0.5)

def main():
w1 = work1()
w2 = work2()
while True:
next(w1)
next(w2)

if __name__ == "__main__":
main()

'''
----work1---
----work2---
----work1---
----work2---
----work1---
----work2---
...(省略)...
'''

greenlet

为了更好使用协程来完成多任务,python 中的 greenlet 模块对其封装,从而使得切换任务变的更加简单。

Debian 环境:

1
sudo apt install python3-greenlet

这只是一个功能非常初级的包,仅做演示目的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from greenlet import greenlet
import time

def test1():
while True:
print("---A--")
gr2.switch()
time.sleep(0.5)

def test2():
while True:
print("---B--")
gr1.switch()
time.sleep(0.5)

gr1 = greenlet(test1)
gr2 = greenlet(test2)

#切换到 gr1 中运行
gr1.switch()

'''
---A--
---B--
---A--
---B--
---A--
---B--
(后略)
'''

gevent

演示

greenlet 虽然实现了协程,但还需要人工切换,太麻烦。gevent 是一个比 greenlet 更强大的并且能够自动切换任务的模块。

其原理是当一个 greenlet 遇到 IO(访问网络、文件操作等)操作时,就自动切换到其他的 greenlet。等到 IO 操作完成,再适时切换回来继续执行。

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import gevent

def f(n):
for i in range(n):
print(gevent.getcurrent(), i)

g1 = gevent.spawn(f, 5)
g2 = gevent.spawn(f, 5)
g3 = gevent.spawn(f, 5)
g1.join()
g2.join()
g3.join()

'''
<Greenlet at 0x7f2ddd145ee0: f(5)> 0
<Greenlet at 0x7f2ddd145ee0: f(5)> 1
<Greenlet at 0x7f2ddd145ee0: f(5)> 2
<Greenlet at 0x7f2ddd145ee0: f(5)> 3
<Greenlet at 0x7f2ddd145ee0: f(5)> 4
<Greenlet at 0x7f2ddd0cd260: f(5)> 0
<Greenlet at 0x7f2ddd0cd260: f(5)> 1
<Greenlet at 0x7f2ddd0cd260: f(5)> 2
<Greenlet at 0x7f2ddd0cd260: f(5)> 3
<Greenlet at 0x7f2ddd0cd260: f(5)> 4
<Greenlet at 0x7f2ddd0a8860: f(5)> 0
<Greenlet at 0x7f2ddd0a8860: f(5)> 1
<Greenlet at 0x7f2ddd0a8860: f(5)> 2
<Greenlet at 0x7f2ddd0a8860: f(5)> 3
<Greenlet at 0x7f2ddd0a8860: f(5)> 4
'''

可以看到,3 个 greenlet 是依次运行而不是交替运行。

gevent 切换执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import gevent

def f(n):
for i in range(n):
print(gevent.getcurrent(), i)
# 用来模拟一个耗时操作,注意不是 time 模块中的 sleep
gevent.sleep(1)

g1 = gevent.spawn(f, 5)
g2 = gevent.spawn(f, 5)
g3 = gevent.spawn(f, 5)
g1.join()
g2.join()
g3.join()

'''
<Greenlet at 0x7f892bb41ee0: f(5)> 0
<Greenlet at 0x7f892ba85260: f(5)> 0
<Greenlet at 0x7f892ba60860: f(5)> 0
<Greenlet at 0x7f892bb41ee0: f(5)> 1
<Greenlet at 0x7f892ba85260: f(5)> 1
<Greenlet at 0x7f892ba60860: f(5)> 1
<Greenlet at 0x7f892bb41ee0: f(5)> 2
<Greenlet at 0x7f892ba85260: f(5)> 2
<Greenlet at 0x7f892ba60860: f(5)> 2
<Greenlet at 0x7f892bb41ee0: f(5)> 3
<Greenlet at 0x7f892ba85260: f(5)> 3
<Greenlet at 0x7f892ba60860: f(5)> 3
<Greenlet at 0x7f892bb41ee0: f(5)> 4
<Greenlet at 0x7f892ba85260: f(5)> 4
<Greenlet at 0x7f892ba60860: f(5)> 4
'''

monkey 补丁

先看一下没有补丁的情况(使用了time.sleep()):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
from gevent import monkey
import gevent
import random
import time

def coroutine_work(coroutine_name):
for i in range(10):
print(coroutine_name, i)
time.sleep(random.random())

gevent.joinall([
gevent.spawn(coroutine_work, "work1"),
gevent.spawn(coroutine_work, "work2")
])

'''
work1 0
work1 1
work1 2
work1 3
work1 4
work1 5
work1 6
work1 7
work1 8
work1 9
work2 0
work2 1
work2 2
work2 3
work2 4
work2 5
work2 6
work2 7
work2 8
work2 9
'''

打上补丁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
from gevent import monkey
import gevent
import random
import time

# 有耗时操作时需要
monkey.patch_all() # 将程序中用到的耗时操作的代码,换为 gevent 中自己实现的模块

def coroutine_work(coroutine_name):
for i in range(10):
print(coroutine_name, i)
time.sleep(random.random())

gevent.joinall([
gevent.spawn(coroutine_work, "work1"),
gevent.spawn(coroutine_work, "work2")
])

'''
work1 0
work2 0
work1 1
work1 2
work2 1
work2 2
work2 3
work1 3
work1 4
work1 5
work1 6
work2 4
work1 7
work1 8
work1 9
work2 5
work2 6
work2 7
work2 8
work2 9
'''

'''
运行结果不唯一
'''

猴子补丁作用:
monkey patch 是在执行时动态替换,通常是在 startup 的时候。用过 gevent 就会知道,会在最开头的地方 gevent.monkey.patch_all(),把标准库中的 threadsocket 等给替换掉。这样我们在后面使用 socket 的时候能够跟寻常一样使用,无需改动代码,就可以将它变成非堵塞的

官方的 asyncio

文档: https://docs.python.org/zh-cn/3.12/library/asyncio-task.html

简单示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import asyncio

async def main():
print('hello')
await asyncio.sleep(1)
print('world')

asyncio.run(main())

'''
hello
world
'''

'''
第一行之后等待了一会才打印第二行
'''

对比下面两份代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import asyncio
import time

async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)

async def main():
print(f"started at {time.strftime('%X')}")

await say_after(1, 'hello')
await say_after(2, 'world')

print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

'''
started at 15:43:57
hello
world
finished at 15:44:00
'''
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import asyncio
import time

async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)

async def main():
task1 = asyncio.create_task(
say_after(1, 'hello'))

task2 = asyncio.create_task(
say_after(2, 'world'))

print(f"started at {time.strftime('%X')}")

# 等待直到两个任务都完成
# (会花费约 2 秒钟。)
await task1
await task2

print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

'''
started at 15:49:46
hello
world
finished at 15:49:48
'''

注意,预期的输出显示代码段的运行时间比之前快了 1 秒。

并发下载器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from gevent import monkey
import gevent
import urllib.request

# 有耗时操作时需要
monkey.patch_all()

def my_downLoad(url):
print('GET: %s' % url)
resp = urllib.request.urlopen(url)
data = resp.read()
print('%d bytes received from %s.' % (len(data), url))

gevent.joinall([
gevent.spawn(my_downLoad, 'http://www.baidu.com/'),
gevent.spawn(my_downLoad, 'http://www.cskaoyan.com/'),
gevent.spawn(my_downLoad, 'http://www.qq.com/'),
])

'''
GET: http://www.baidu.com/
GET: http://www.cskaoyan.com/
GET: http://www.qq.com/
122235 bytes received from http://www.qq.com/.
412916 bytes received from http://www.baidu.com/.
46781 bytes received from http://www.cskaoyan.com/.
'''

从上能够看到,收到数据的先后顺序不一定与发送顺序相同,这体现了异步,即不确定什么时候会收到数据,顺序不一定。

网盘设计(二)

增加了下载功能,改为循环收发的设计。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# server
from socket import *
import struct
import os

class Server:
def __init__(self, ip, port) -> None:
self.s_listen = None # 用来 listen 的 socket 对象
self.ip = ip
self.port = port

def tcp_init(self):
self.s_listen = socket(AF_INET, SOCK_STREAM)
self.s_listen.bind((self.ip, self.port))
self.s_listen.listen(128)

def task(self):
c_fd, c_addr = self.s_listen.accept()
user = User(c_fd)
user.deal_command()


class User:
'''
每个 User 对象对应一个客户端
'''
def __init__(self, handle_c_fd) -> None:
self.user_name = None
self.handle_c_fd = handle_c_fd
self.path = os.getcwd() # 存储连上的用户的路径

def deal_command(self):
while True:
command = self.recv_train().decode('utf8')
if command[:2] == 'ls':
self.do_ls()
elif command[:2] == 'cd':
self.do_cd(command)
elif command[:3] == 'pwd':
self.do_pwd()
elif command[:2] == 'rm':
self.do_rm(command)
elif command[:4] == 'gets':
self.do_gets(command)
elif command[:4] == 'puts':
self.do_puts(command)
else:
print('command wrong')

def send_train(self, send_bytes):
'''
send 火车,就是把某个字节流内容以火车形式发过去
'''
train_head_bytes = struct.pack('I', len(send_bytes))
self.handle_c_fd.send(train_head_bytes + send_bytes)

def recv_train(self):
'''
recv 火车,就是把火车 recv 的内容返回出去
'''
train_head_bytes = self.handle_c_fd.recv(4)
train_head = struct.unpack('I', train_head_bytes)
return self.handle_c_fd.recv(train_head[0])

def do_ls(self):
'''
当前路径下的信息传输给客户端
'''
data = ''
for file in os.listdir(self.path):
data += file + ' '*5 + str(os.stat(file).st_size) + '\n'
self.send_train(data.encode('utf8'))

def do_cd(self, command):
path = command.split()[1]
os.chdir(path)
self.path = os.getcwd()
self.send_train(self.path.encode('utf8'))

def do_pwd(self):
self.send_train(self.path.encode('utf8'))

def do_rm(self, command):
pass

def do_gets(self, command):
# 向客户端发送文件
fileName = command.split()[1]
file_size = os.stat(fileName).st_size # 发送文件大小
# print('ohla')
# print(file_size)
self.send_train(struct.pack('I', file_size))
# print('server here')

f = open(fileName, 'rb')
while True:
file_content = f.read(10000)
if file_content:
self.handle_c_fd.send(file_content)
else:
break
f.close()

def do_puts(self, command):
fileName = command.split()[1]
# 将 client 发来的文件保存在 server 上
pass

if __name__ == '__main__':
server = Server('192.168.31.106', 2000)
server.tcp_init()
server.task()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# client
from socket import *
import struct

def cycle_recv(somefd, file, fileSize):
total = 0
while total < fileSize:
data = somefd.recv(10000)
file.write(data)
total += len(data)

class Client:
def __init__(self, ip, port) -> None:
self.client:socket = None
self.ip = ip
self.port = port

def tcp_connect(self):
self.client = socket(AF_INET, SOCK_STREAM)
self.client.connect((self.ip, self.port))

def send_train(self, send_bytes):
'''
send 火车,就是把某个字节流内容以火车形式发过去
'''
train_head_bytes = struct.pack('I', len(send_bytes))
self.client.send(train_head_bytes + send_bytes)

def recv_train(self):
'''
recv 火车,就是把火车 recv 的内容返回出去
'''
train_head_bytes = self.client.recv(4)
train_head = struct.unpack('I', train_head_bytes)
return self.client.recv(train_head[0])

def send_command(self):
'''
发送各种命令给服务器
'''
while True:
command = input()
self.send_train(command.encode('utf8'))
if command[:2] == 'ls':
self.do_ls()
elif command[:2] == 'cd':
self.do_cd()
elif command[:3] == 'pwd':
self.do_pwd()
elif command[:2] == 'rm':
self.do_rm(command)
elif command[:4] == 'gets':
self.do_gets(command)
elif command[:4] == 'puts':
self.do_puts(command)
else:
print('command wrong')

def do_ls(self):
data = self.recv_train().decode('utf8')
print(data)

def do_cd(self):
print(self.recv_train().decode('utf8'))

def do_pwd(self):
print(self.recv_train().decode('utf8'))

def do_rm(self, command):
pass

def do_gets(self, command): # 从 server 那里拿文件
# 先接文件的大小
file_size = struct.unpack('I', self.recv_train())[0]
# print('client recved size:')
# print(file_size)
fileName = command.split()[1]

# print('client here')
f = open('[接受]' + fileName, 'wb')
cycle_recv(self.client, f, file_size)
f.close()

def do_puts(self, command):
fileName = command.split()[1]
pass

if __name__ == '__main__':
client = Client('192.168.31.106', 2000)
client.tcp_connect()
client.send_command()

进一步改进的思路:

86-12.png

86-13.png

还可以考虑增加功能:

  • 用户名、密码存储在数据库中
  • 秒传(某用户上传其他用户已经上传过的内容不会重复存一份)

阻塞模式、非阻塞模式

本小节内容由 AIGC 改编。

阻塞模式(Blocking Mode)
这是套接字的默认行为。在阻塞模式下,诸如 recv()accept() 等操作会一直等待,直到有数据到达或操作完成为止。比如,服务器等待客户端的连接请求时,accept() 会阻塞,直到有客户端连接上。

非阻塞模式(Non-Blocking Mode)

  • 在非阻塞模式下,诸如 recv()send()accept() 等操作如果不能立即完成,就会抛出一个 BlockingIOError 异常。程序可以继续做其他事情,而不是在等待某个操作完成。
  • 非阻塞模式通常用于需要处理多个连接的服务器,如那些使用 select()poll() 的服务器架构。非阻塞模式的使用场景:高并发服务器、异步 I/O 等。

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import socket

# 创建一个 TCP 套接字
tcp_server_tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# 绑定地址和端口
tcp_server_tcp.bind(('127.0.0.1', 8080))

# 监听连接
tcp_server_tcp.listen(5)

# 设置为非阻塞模式
tcp_server_tcp.setblocking(False)

# 主循环,处理多个客户端连接
while True:
try:
client_socket, addr = tcp_server_tcp.accept()
print(f"Accepted connection from {addr}")
except BlockingIOError:
# 如果没有客户端连接,继续执行其他任务
print("No incoming connections, doing other work...")
# 可以在这里做其他的事情

我们在前面已经看到,有些代码可以在没有显式设置非阻塞的情况下使用 epoll 进行事件驱动的处理,但让我们仔细探讨一下为什么在某些情况下使用非阻塞模式仍然有其必要性,以及在某些代码中为什么不需要。

select.epoll() 是 Linux 下的一种高级 I/O 多路复用机制,适用于高并发场景。epoll 本质上是事件驱动的,它能够监视多个文件描述符(如套接字、标准输入等),并在这些描述符有事件发生时通知程序,因此并不需要手动设置套接字为非阻塞。

  • epoll.poll(-1) 会阻塞并等待,直到有事件(如客户端发来的数据或服务器端的输入)发生。
  • epoll 监控到文件描述符上有可用的事件时,才会返回并让你处理数据。这种机制下,套接字的操作在正常情况下不会被阻塞,因为事件驱动会确保只有在数据准备好时才进行 I/O 操作。

为什么在这种情况下不需要非阻塞模式:

epoll 自己会处理套接字的事件,因此不需要套接字是非阻塞的。程序会在 epoll.poll() 返回时知道哪个文件描述符有事件发生,并且这些事件都准备好了进行处理(如 recv()send()),所以它本质上已经是一种避免阻塞的机制。

非阻塞模式的使用场景:

虽然这里不需要手动设置非阻塞模式,但在一些不使用 epollselect 的代码中,设置非阻塞的好处包括:

  • 单独处理 I/O:如果不使用 epollselect,并且需要继续处理其他任务时,可以通过将套接字设置为非阻塞避免等待 I/O。

  • 配合异步框架:在使用异步编程时(如 asyncio),套接字通常会被设置为非阻塞模式,允许事件循环继续处理其他事件而不是被某个套接字阻塞住。

  • 处理多个客户端:在不使用 selectepoll 时,服务器如果采用阻塞模式处理 I/O,就只能一次处理一个客户端。如果设置为非阻塞模式,则可以轮询处理多个客户端的请求。

为什么在 epoll 中不需要显式设置非阻塞:

使用 epoll 监听文件描述符时,epoll 是事件驱动的,调用 recv()send() 之类的操作时,事件已经准备好,因此不需要再通过设置套接字为非阻塞来避免阻塞操作。

如果要结合非阻塞和 epoll

在某些情况下(如需要非阻塞的连接处理时),仍然可以结合两者使用。例如,如果想在其他地方做一些操作而不想等某个文件描述符就绪,就可以设置非阻塞。

HTTP 协议头部解析

一些基本知识可以在网上找找,F12 打开看看就行。

当请求头没有 content-length 时,怎么知道请求体结束了?

http 的 header 和 body 之间是空行分割的,又因为每个头部项是以 \r\n 作为结束符,所以,数据流中是以 \r\n\r\n 来分割解析请求头(响应头)与请求体(响应体)的。如下图所示:

86-14.png

怎么知道(请求体)响应体结束了呢? http 协议规定,响应头的字段 content-length 用来表示响应体长度大小,但是,有可能发送请求头时,并不能知道完整的响应体长度(比如当响应数据太大,服务端流式处理的情况),这时需要设置请求头 Transfer-Encoding: chunked,使用数据块的方式传输,数据块格式如下图所示:

86-15.png

HTTP 请求的类型(挑了几个重要的):

  • GET 查询
  • POST 新增
  • PUT 修改
  • DELETE 删除

下面制作一个最简单的 web server:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import socket
tcp_server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 为了确保端口复用
tcp_server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

# 2. 绑定
tcp_server_socket.bind(("192.168.31.106", 7890))

# 3. 变为监听套接字
tcp_server_socket.listen(128)


new_socket, client_addr = tcp_server_socket.accept()
http_head=new_socket.recv(10000)
print(http_head.decode('utf8'))
response = "HTTP/1.1 200 OK\r\n"
response += "\r\n"
response +='<html><h1>hello world</h1></html>'
new_socket.send(response.encode('utf8'))

'''
GET / HTTP/1.1
Host: 192.168.31.106:7890
Connection: keep-alive
Cache-Control: max-age=0
DNT: 1
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7
Accept-Encoding: gzip, deflate
Accept-Language: zh-CN,zh;q=0.9,en;q=0.8


浏览器的显示:一级标题 hello world
'''

注意需要在浏览器打开 192.168.31.106:7890(本机 IP 和对应的端口号)。

HTTP响应码总结(如果挂了可去 archive.org 查找):
https://zhuanlan.zhihu.com/p/66062179

概览:

86-16.png

改进这个 web server :
https://github.com/dropsong/py_webServer

Python 连接 MySQL

环境问题

debian12 中已经安装了 MariaDB,它会与 MySQL 发生冲突。

使用以下命令卸载 MariaDB:

1
2
3
sudo apt remove --purge mariadb-server mariadb-client mariadb-common mariadb-server-core mariadb-client-core
sudo apt autoremove
sudo apt autoclean

安装 MySQL :

1
2
sudo apt update
sudo apt install mysql-server mysql-client

安装 pymysql:

1
sudo apt install python3-pymysql

大功告成!

实战

一个简单有效的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from pymysql import *

def main():
# 创建 Connection 连接
conn = connect(host='localhost',port=3306,database='akashi',user ='root',password='你知道的太多了',charset='utf8')

# 获得 Cursor 对象
cs1 = conn.cursor() # 操作对象

# 执行 insert 语句,并返回受影响的行数:添加一条数据
# 增加
count = cs1.execute('insert into goods(name, cate_name, brand_name, price, is_show, is_saleoff) values("redmibook", "笔记本", "xiaomi", 112, true, true)')
#打印受影响的行数
print(count)

count = cs1.execute('insert into goods(name, cate_name, brand_name, price, is_show, is_saleoff) values("honor_book", "笔记本", "huawei", 111, true, true)')
print(count)

# # 更新
# count = cs1.execute('update goods_cates set name="机械硬盘" where name="硬盘"')
# # 删除
# count = cs1.execute('delete from goods_cates where id=6')

# 提交之前的操作,如果之前已经之执行过多次的 execute,那么就都进行提交
conn.commit()

# 关闭 Cursor 对象
cs1.close()
# 关闭 Connection 对象
conn.close()

if __name__ == '__main__':
main()

以上代码经过验证。

查询一行数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from pymysql import *

def main():
# 创建 Connection 连接
conn = connect(host='localhost',port=3306,user='root',password='你知道的太多了',database='akashi',charset='utf8')

# 获得 Cursor 对象
cs1 = conn.cursor()

# 执行 select 语句,并返回受影响的行数:查询一条数据
count = cs1.execute('select id,name from goods where id>=4')
# 打印受影响的行数
print("查询到%d 条数据:" % count)

for i in range(count):
# 获取查询的结果
result = cs1.fetchone()
# 打印查询的结果
print(result)

# 获取查询的结果
# 关闭 Cursor 对象
cs1.close()
conn.close()

if __name__ == '__main__':
main()

以上代码经过验证。

查询多行数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from pymysql import *

def main():
# 创建 Connection 连接
conn = connect(host='localhost',port=3306,user='root',password='你知道的太多了',database='akashi',charset='utf8')
# 获得 Cursor 对象
cs1 = conn.cursor()

# 执行 select 语句,并返回受影响的行数:查询一条数据
count = cs1.execute('select id,name from goods where id>=4')
# 打印受影响的行数
print("查询到%d 条数据:" % count)

result = cs1.fetchall()
print(result)
# 关闭 Cursor 对象
cs1.close()
conn.close()

if __name__ == '__main__':
main()

以上代码经过验证。