零碎知识 这里是 Python 的一些零碎知识点。
多值参数:
1 2 3 4 5 6 7 8 9 10 def demo (num, *args, **kwargs ): print (num) print (args) print (kwargs) demo(1 , 2 , 3 , 4 , 5 , name="小明" , age = 18 , gender = True )
另外,了解元组和字典的拆包(自行查找资料)。
if __name__ == "__main__"
是什么?
该语句下面缩进的内容在python xxx.py
时会执行,而在import xxx
时不会执行。
类属性 ,类似于 C++ 中的类的静态成员 。
类方法 :
1 2 3 @classmethod def show_tool_cnt (cls ): print ("工具对象总数: %d" % cls.count)
单例模式 实现起来比 C++ 简单。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 class MusicPlayer (object ): instance = None @classmethod def __new__ (cls, *args, **kwargs ): if cls.instance is None : print ("创建对象,分配空间" ) cls.instance = super ().__new__(cls) return cls.instance def __init__ (self, music_name ): print ("播放器初始化" ) self.music_name = music_name player1 = MusicPlayer('弯弯的月亮' ) print (player1.music_name)player2 = MusicPlayer('竹楼情歌' ) print (player2.music_name)print (player1)print (player2)''' 创建对象,分配空间 播放器初始化 弯弯的月亮 播放器初始化 竹楼情歌 <__main__.MusicPlayer object at 0x7f4c18f16790> <__main__.MusicPlayer object at 0x7f4c18f16790> '''
异常 观其大略 在实际开发中,为了能够处理复杂的异常情况,完整的异常语法如下(伪代码):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 try : pass except 错误类型 1 : pass except 错误类型 2 : pass except (错误类型 3 , 错误类型 4 ): pass except Exception as result: print (result) else : pass finally : print ("无论是否有异常,都会执行的代码" )
异常的传递 当函数执行出现异常,会将异常传递给函数的调用一方。如果传递到主程序,仍然没有异常处理,程序才会被终止。
示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 def demo1 (): return int (input ("输入整数:" )) def demo2 (): return demo1() try : print (demo2()) except Exception as result: print ("未知错误 %s" % result) ''' 输入整数:abc 未知错误 invalid literal for int() with base 10: 'abc' '''
抛出 raise 异常 应用场景: 在开发中,除了代码执行出错 Python 解释器会抛出异常之外,还可以根据应用程序特有的业务需求主动抛出异常。
示例: 提示用户输入密码,如果长度少于 8,抛出异常。当前函数只负责提示用户输入密码,如果密码长度不正确,需要其他的函数进行额外处理,因此可以抛出异常,由其他需要处理的函数捕获异常。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 def input_password (): pwd = input ("请输入密码:" ) if len (pwd) >= 8 : return pwd print ("主动抛出异常" ) ex = Exception("密码长度不够" ) raise ex try : print (input_password()) except Exception as result: print (result) ''' 请输入密码:123 主动抛出异常 密码长度不够 ''' ''' 请输入密码:1234234545 1234234545 '''
让自定义异常像通用异常一样使用(若链接内容不可访问,可去 archive.ph 查找):https://python3-cookbook.readthedocs.io/zh-cn/latest/c14/p08_creating_custom_exceptions.html
模块和包 原则:每一个文件都可以被导入。
一个独立的 Python 文件就是一个模块。在导入文件时,文件中所有没有任何缩进的代码,都会被执行一遍。因此需要配合 __name__
属性使用。
包是一个包含多个模块的特殊目录。 目录下有一个特殊的文件 __init__.py
在外界使用包中的模块:
如果希望自己开发的模块,分享给其他人,可以按照以下步骤操作:
制作发布压缩包
创建 setup.py
构建模块
生成发布压缩包
安装模块
创建 setup.py
:
构建模块:
生成发布压缩包(注意:要制作哪个版本的模块,就使用哪个版本的解释器执行):
安装模块(示例):
1 2 3 tar xf wd_message-1.0.tar.gz cd wd_message-1.0 sudo python3 setup.py install
卸载模块(直接从安装目录下,把安装模块的目录删除就可以):
1 2 cd /usr/local/lib/python3.6/dist-packages/ sudo rm -r wd_message*
文件操作 没什么好记录的,大致和 Cpp 差不多。
展示目录:
1 2 3 4 5 6 7 8 9 10 11 import osdef dir_dfs (path, width ): file_list = os.listdir(path) for filename in file_list: print (' ' *width + filename) if os.path.isdir(path+'/' +filename): dir_dfs(path+'/' +filename, width+4 ) if __name__ == '__main__' : dir_dfs('.' , 0 )
手撕红黑树 之前文章里写过平衡二叉树(虽然没写完),一些旋转操作的思路是通的。
pygame 游戏素材在本小节末尾放出。
验证是否安装成功:
1 python -m pygame.examples.aliens
游戏中的坐标系:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import pygamepygame.init() hero_rect = pygame.Rect(100 , 500 , 120 , 125 ) print ("英雄的原点 %d %d" % (hero_rect.x, hero_rect.y))print ("英雄的尺寸 %d %d" % (hero_rect.width, hero_rect.height))print ("%d %d" % hero_rect.size)pygame.quit() ''' pygame 2.6.0 (SDL 2.28.4, Python 3.12.1) Hello from the pygame community. https://www.pygame.org/contribute.html 英雄的原点 100 500 英雄的尺寸 120 125 120 125 '''
初始化游戏显示窗口:
1 pygame.display.set_mode()
刷新屏幕内容显示:
set_mode 方法:
作用:创建游戏显示窗口
resolution 指定屏幕的 宽 和 高,默认创建的窗口大小和屏幕大小一致
flags 参数指定屏幕的附加选项,例如是否全屏等等,默认不需要传递
depth 参数表示颜色的位数,默认自动匹配
返回值 :暂时可以理解为游戏的屏幕,游戏的元素 都需要被绘制到游戏的屏幕上
注意 :必须使用变量记录 set_mode 方法的返回结果!因为:后续所有的图像绘制都基于这个返回结果
1 set_mode(resolution=(0 ,0 ), flags=0 , depth=0 ) -> Surface
图像、游戏循环、游戏时钟、监听事件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 import pygamepygame.init() screen = pygame.display.set_mode((480 , 700 )) bg = pygame.image.load("./images/background.png" ) screen.blit(bg, (0 ,0 )) hero = pygame.image.load("./images/me1.png" ) screen.blit(hero, (150 ,300 )) pygame.display.update() clock = pygame.time.Clock() hero_rect = pygame.Rect(150 , 300 , 102 , 126 ) while True : clock.tick(60 ) event_list = pygame.event.get() if len (event_list) > 0 : print (event_list) for event in event_list: if event.type == pygame.QUIT: print ("游戏退出" ) pygame.quit() exit() hero_rect.y -= 1 if (hero_rect.bottom < 0 ): hero_rect.y = 700 screen.blit(bg, (0 ,0 )) screen.blit(hero, hero_rect) pygame.display.update()
在刚刚完成的案例中,图像加载、位置变化、绘制图像都需要程序员编写代码分别处理,为了简化开发步骤,pygame 提供了两个类:
pygame.sprite.Sprite
存储图像数据 image 和 位置 rect 的对象
pygame.sprite.Group
精灵 :
在游戏开发中,通常把显示图像的对象叫做精灵 Sprite
精灵有两个重要的属性
image
要显示的图像
rect
图像要显示在屏幕的位置
默认的 update()
方法什么也没做。子类可以重写此方法 ,在每次刷新屏幕时,更新精灵位置
注意 :pygame.sprite.Sprite
并没有提供 image 和 rect 两个属性。需要程序员从 pygame.sprite.Sprite 派生子类,并在子类的初始化方法中,设置 image 和 rect 属性。
精灵组 :
一个精灵组 可以包含多个精灵 对象
调用精灵组 对象的 update()
方法可以自动调用组内每一个精灵 的 update()
方法
调用精灵组对象的 draw
方法可以将组内每一个精灵的 image 绘制在 rect 位置
注意 :仍然需要调用 pygame.display.update()
才能在屏幕看到最终结果
背景交替滚动的实现思路:
pygame 提供了两个非常方便的方法实现碰撞检测:
pygame.sprite.groupcollide()
两个精灵组 中所有精灵的碰撞检测
pygame.sprite.spritecollide()
判断某个精灵 和指定精灵组中的精灵的碰撞
1 2 3 groupcollide(group1, group2, dokill1, dokill2, collided = None ) -> Sprite_dict
1 2 3 4 spritecollide(sprite, group, dokill, collided = None ) -> Sprite_list
整体的代码相当简洁,共两个代码文件和若干素材文件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 import randomimport pygameSCREEN_RECT = pygame.Rect(0 , 0 , 480 , 700 ) FRAME_PER_SEC = 15 CREATE_ENEMY_EVENT = pygame.USEREVENT HERO_FIRE_EVENT = pygame.USEREVENT + 1 class GameSprite (pygame.sprite.Sprite): """飞机大战游戏精灵""" def __init__ (self, image_name, speed=1 ): super ().__init__() self.image = pygame.image.load(image_name) self.rect = self.image.get_rect() self.speed = speed def update (self ): self.rect.y += self.speed class Background (GameSprite ): """游戏背景精灵""" def __init__ (self, is_alt=False ): super ().__init__("./images/background.png" ) if is_alt: self.rect.y = -self.rect.height def update (self ): super ().update() if self.rect.y >= SCREEN_RECT.height: self.rect.y = -self.rect.height class Enemy (GameSprite ): """敌机精灵""" def __init__ (self ): super ().__init__("./images/enemy1.png" ) self.speed = random.randint(1 , 3 ) self.rect.bottom = 0 max_x = SCREEN_RECT.width - self.rect.width self.rect.x = random.randint(0 , max_x) def update (self ): super ().update() if self.rect.y >= SCREEN_RECT.height: self.kill() def __del__ (self ): pass class Hero (GameSprite ): """英雄精灵""" def __init__ (self ): super ().__init__("./images/me1.png" , 0 ) self.rect.centerx = SCREEN_RECT.centerx self.rect.bottom = SCREEN_RECT.bottom - 120 self.bullets = pygame.sprite.Group() def update (self ): self.rect.x += self.speed if self.rect.x < 0 : self.rect.x = 0 elif self.rect.right > SCREEN_RECT.right: self.rect.right = SCREEN_RECT.right def fire (self ): print ("发射子弹..." ) for i in (0 , 1 , 2 ): bullet = Bullet() bullet.rect.bottom = self.rect.y - i * 20 bullet.rect.centerx = self.rect.centerx self.bullets.add(bullet) class Bullet (GameSprite ): """子弹精灵""" def __init__ (self ): super ().__init__("./images/bullet1.png" , -5 ) def update (self ): super ().update() if self.rect.bottom < 0 : self.kill() def __del__ (self ): print ("子弹被销毁..." )
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 import pygamefrom plane_sprites import *import timeclass PlaneGame (object ): """飞机大战主游戏""" def __init__ (self ): print ("游戏初始化" ) self.screen = pygame.display.set_mode(SCREEN_RECT.size) self.clock = pygame.time.Clock() self.__create_sprites() pygame.time.set_timer(CREATE_ENEMY_EVENT, 1000 ) pygame.time.set_timer(HERO_FIRE_EVENT, 300 ) def __create_sprites (self ): bg1 = Background() bg2 = Background(True ) self.back_group = pygame.sprite.Group(bg1, bg2) self.enemy_group = pygame.sprite.Group() self.hero = Hero() self.hero_group = pygame.sprite.Group(self.hero) def start_game (self ): print ("游戏开始..." ) while True : self.clock.tick(FRAME_PER_SEC) self.__event_handler() self.__check_collide() self.__update_sprites() pygame.display.update() def __event_handler (self ): for event in pygame.event.get(): if event.type == pygame.QUIT: PlaneGame.__game_over() elif event.type == CREATE_ENEMY_EVENT: enemy = Enemy() self.enemy_group.add(enemy) elif event.type == HERO_FIRE_EVENT: self.hero.fire() keys_pressed = pygame.key.get_pressed() if keys_pressed[pygame.K_RIGHT]: self.hero.speed = 8 elif keys_pressed[pygame.K_LEFT]: self.hero.speed = -8 else : self.hero.speed = 0 def __check_collide (self ): pygame.sprite.groupcollide(self.hero.bullets, self.enemy_group, True , True ) enemies = pygame.sprite.spritecollide(self.hero, self.enemy_group, True ) if len (enemies) > 0 : self.hero.kill() m = "./sound/use_bomb.wav" pygame.mixer.music.load(m) pygame.mixer.music.play() time.sleep(2 ) PlaneGame.__game_over() def __update_sprites (self ): self.back_group.update() self.back_group.draw(self.screen) self.enemy_group.update() self.enemy_group.draw(self.screen) self.hero_group.update() self.hero_group.draw(self.screen) self.hero.bullets.update() self.hero.bullets.draw(self.screen) @staticmethod def __game_over (): print ("游戏结束" ) pygame.quit() exit() if __name__ == '__main__' : pygame.init() game = PlaneGame() game.start_game()
其中,图片素材、音频素材 被放在 images 和 sound 文件夹下,被组织在上面两份代码的同级目录下。
网络编程 Linux 命令 查看或配置网卡信息(ifconfig):
路由查看:route 可以查看路由。route -n
1 2 3 4 5 内核 IP 路由表 目标 网关 子网掩码 0.0.0.0 192.168.19.2 0.0.0.0 169.254.0.0 0.0.0.0 255.255.0.0 192.168.19.0 0.0.0.0 255.255.255.0
0.0.0.0
代表任意目的地,网关就是转发数据的设备。
怎样查看端口及谁使用了端口?
用 netstat -an
查看端口状态
sudo lsof -i [tcp/udp]:2425
必须是 root 才能查看
sudo lsof -i tcp:22
查看哪一个进程用了这个端口
ps -elf |grep udp_server
查看某个进程是否还在
UDP UDP 通信流程:
port 的形象理解:
看一段代码:
1 2 3 4 5 6 7 8 9 10 import sockets = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) addr = ('192.168.31.106' , 7656 ) s.bind(addr) temp = s.recvfrom(100 ) print (temp[0 ])print (temp[1 ])s.close()
运行这段代码,然后在终端中:
1 2 ~$ netstat -an|grep 7656 udp 0 0 192.168.31.106:7656 0.0.0.0:*
终止运行(则查找不到了):
1 ~$ netstat -an|grep 7656
客户端:
1 2 3 4 5 6 7 import socketc = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) dest_addr = ('192.168.31.106' , 7656 ) c.sendto(b'hello' , dest_addr) c.close()
在两个终端中操作:
1 2 3 4 python3 server.py python3 client.py b'hello' ('192.168.31.106', 33817)
发送中文、全双工:
1 2 3 4 5 6 7 8 9 10 import sockets = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) addr = ('192.168.31.106' , 7656 ) s.bind(addr) data, c_addr = s.recvfrom(100 ) print (data.decode('utf8' ))print (c_addr)s.sendto('??????' .encode('utf8' ), c_addr) s.close()
1 2 3 4 5 6 7 8 9 import socketc = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) dest_addr = ('192.168.31.106' , 7656 ) c.sendto('zui & 罚' .encode('utf8' ), dest_addr) data , _ = c.recvfrom(100 ) print (data.decode('utf8' ))c.close()
终端:
1 2 3 $ python3 server.py zui & 罚 ('192.168.31.106', 58158)
1 2 $ python3 client.py ??????
当 UDP recvfrom
函数内填的大小,小于 client 发来的数据的大小时,Windows 会报错,Linux 会截断数据。
sendto
和 recvfrom
次数对等。
相关的命令总结:
查看 ip ifconfig
查看路由 route -n
端口状态
netstat -an|grep 端口
端口正在被哪个进程使用 sudo lsof -i udp:2000
进程查看 ps -elf|grep 进程名字
TCP 可以查看之前的笔记。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 import socketdef tcp_server (): s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) addr = ('192.168.31.106' , 2000 ) s.bind(addr) s.listen(128 ) handle_c_fd, c_addr = s.accept() print (c_addr) handle_c_fd.send('我是服务器' .encode('utf8' )) data = handle_c_fd.recv(100 ) print (data.decode('utf8' )) handle_c_fd.close() s.close() if __name__ == "__main__" : tcp_server()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 import socketdef tcp_client (): c = socket.socket(socket.AF_INET, socket.SOCK_STREAM) dest_addr = ('192.168.31.106' , 2000 ) c.connect(dest_addr) data = c.recv(100 ) print (data.decode('utf8' )) c.send('我是客户端' .encode('utf8' )) c.close() if __name__ == "__main__" : tcp_client()
其实之前的笔记中,代码的变量命名容易造成误解,这里的更好。
演示 TCP 特性:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 import socketdef tcp_server (): s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) addr = ('192.168.31.106' , 2000 ) s.bind(addr) s.listen(128 ) handle_c_fd, c_addr = s.accept() print (c_addr) handle_c_fd.send('abcdefghij' .encode('utf8' )) data = handle_c_fd.recv(100 ) print (data.decode('utf8' )) handle_c_fd.close() s.close() if __name__ == "__main__" : tcp_server() ''' ('192.168.31.106', 57226) 客户端消息 '''
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 import socketdef tcp_client (): c = socket.socket(socket.AF_INET, socket.SOCK_STREAM) dest_addr = ('192.168.31.106' , 2000 ) c.connect(dest_addr) data = c.recv(5 ) print (data.decode('utf8' )) data = c.recv(5 ) print (data.decode('utf8' )) c.send('客户端消息' .encode('utf8' )) c.close() if __name__ == "__main__" : tcp_client() ''' abcde fghij '''
下面我们实现一个简易的文件下载器(实际上它可以发送 txt, jpg 等格式的文件):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 from socket import *import sysdef get_file_content (file_name ): """获取文件的内容""" try : with open (file_name, "rb" ) as f: content = f.read() return content except : print ("没有下载的文件:%s" % file_name) def main (): if len (sys.argv) != 2 : print ("请按照如下方式运行:python3 xxx.py 7890" ) return else : port = int (sys.argv[1 ]) tcp_server_socket = socket(AF_INET, SOCK_STREAM) address = ('192.168.31.106' , port) tcp_server_socket.bind(address) tcp_server_socket.listen(128 ) while True : client_socket, clientAddr = tcp_server_socket.accept() recv_data = client_socket.recv(1024 ) file_name = recv_data.decode("utf-8" ) print ("对方请求下载的文件名为:%s" % file_name) file_content = get_file_content(file_name) if file_content: client_socket.send(file_content) client_socket.close() tcp_server_socket.close() if __name__ == "__main__" : main()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 from socket import *def main (): tcp_client_socket = socket(AF_INET, SOCK_STREAM) server_ip = '192.168.31.106' server_port = int (input ("请输入服务器 port:" )) tcp_client_socket.connect((server_ip, server_port)) file_name = input ("请输入要下载的文件名:" ) tcp_client_socket.send(file_name.encode("utf-8" )) recv_data = b"" while True : chunk = tcp_client_socket.recv(1024 ) if not chunk: break recv_data += chunk if recv_data: with open ("[接收]" +file_name, "wb" ) as f: f.write(recv_data) tcp_client_socket.close() if __name__ == "__main__" : main()
注意:若一次性接收数据,网络传输的数据量不一定能一次性到达。 即使将接收缓冲区的大小设置为 102440000,仍可能出现问题。这是因为 TCP 数据传输的机制不是一次性发送所有数据(尤其在网络延迟或带宽有限的情况下),实际上,数据是以小块的形式分批到达的。*因此需要循环接收数据,直到完整地接收到文件的全部内容。
提问:TCP 为什么要三次握手? 省流 :两次握手不足够,会造成 server 死锁。(可以分两种情况解释)详细 : 假设我们设计为两次握手。 (一) A(client) 机要连到 B(server) 机,但连接信息在网络中延误了。 于是A 机重新发送,这次 B 收到了,于是 B 发信息回 A,两机建立连接。传输完毕后,断开连接。 但此时,一开始的连接信息到达 B 机,于是 B 机发信息给 A,此时 B 机就认为已和 A 建立连接,B 机就等待 A 传数据过来,永远地等待在 recv 接口上。发生死锁。 (二) C 给 S 发送一个连接请求分组,S 收到此分组,并发送确认应答分组。按照两次握手的协定,S 认为连接已经建立,可以开始发送数据分组。 但是,S 的应答分组在传输中丢失。 C 不知道 S 是否已准备好,不知道 S 建议什么样的序列号,不知道 S 是否收到自己的连接请求分组。在这种情况下,C 认为连接还未建立成功,将忽略 S 发来的任何数据分组,只等待连接确认应答分组。而 S 在发出的数据分组超时后,重复发送同样的分组。这样就形成了死锁。
抓包 安装 wireshark :
windows 和 macos 安装比较容易,linux 安装参考:https://www.wireshark.org/docs/wsug_html_chunked/ChapterBuildInstall.html
在 Debian 系统下运行 Wireshark 时遇到 “permission denied” 问题,通常是由于普通用户没有足够的权限捕获网络接口上的数据包。Wireshark 需要更高的权限来访问网络设备。
在安装过程中,系统会询问是否允许非 root 用户捕获数据包。选择“是”。
将当前用户添加到 wireshark 组
Wireshark 使用 wireshark 组来管理捕获数据包的权限。将当前用户添加到 wireshark 组中:
1 sudo usermod -aG wireshark $USER
然后,重新登录以使组更改生效,或运行以下命令刷新当前会话的用户组信息:
重新配置 dumpcap 权限
Wireshark 使用一个名为 dumpcap 的工具来捕获数据包。需要确保它拥有合适的权限,以便普通用户可以使用它:
1 2 sudo chmod +x /usr/bin/dumpcap sudo setcap cap_net_raw,cap_net_admin=eip /usr/bin/dumpcap
运行以下命令来确认 dumpcap 具有正确的权限:
输出应该类似于:
1 /usr/bin/dumpcap = cap_net_admin,cap_net_raw+eip
现在应该可以在普通用户模式下正常运行 Wireshark 而不会遇到“permission denied”错误。直接运行:
TCP 数据报头 四次挥手(这个图只是为了展示 ACK 在哪):
ACK 位置 1 表明确认号是合法的。如果 ACK 为 0,那么数据报不包含确认信息,确认字段被省略。
SYN:用于建立连接。当 SYN=1 时,表示发起一个连接请求。
FIN:用于释放连接。当 FIN=1 时,表明此报文段的发送端的数据已发送完成,并要求释放连接。
UDP 代替 TCP 相比而言,UDP 能传输更多的数据(一种典型情况:1472 > 1460),这在经济上的好处是显而易见的(带宽很贵);另外,UDP 在无线环境下表现比 TCP 更好。因此有不少用 UDP 代替 TCP 的研究。
可以使用 UDP 模仿 TCP ,但是需要程序员自己设计应用层协议,实现相关功能,缓存、加序列号、重传等。
CS、BS 模式 CS 模式: client/server 模式。
客户端位于目标主机上可以保证性能,将数据缓存至客户端本地,从而提高数据传输效率 。
一般来说客户端和服务器程序由一个开发团队创作,所以他们之间所采用的协议相对灵活 。
客户端和服务器都需要有一个开发团队来完 成开发。工作量将成倍提升,开发周期较长 。
从用户角度出发,需要将客户端安插至用户主机上,对用户主机的安全性构成威胁 。
BS 模式: browser/server 模式。
没有独立的客户端,使用标准浏览器作为客户端,其工作开发量较小 。
移植性非常好,不受平台限制 。
由于使用第三方浏览器,因此网络应用支持受限 。
没有客户端放到对方主机上,缓存数据不尽如人意,从而传输数据量受到限制 。
采用标准 http 协议进行通信,协议选择不灵活 。
tcp 长连接和短连接 TCP 短连接:
client 向 server 发起连接请求
server 接到请求,双方建立连接
client 向 server 发送消息
server 回应 client
一次读写完成,此时双方任何一个都可以发起 close 操作
TCP 长连接:
client 向 server 发起连接
server 接到请求,双方建立连接
client 向 server 发送消息
server 回应 client
一次读写完成,连接不关闭
后续读写操作…
长时间操作之后 client 发起关闭请求
epoll 使用 epoll 实现的小型对话程序:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 import socketimport selectimport sysdef tcp_server (): s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) addr = ('192.168.31.106' , 5379 ) s.bind(addr) s.listen(128 ) handle_c_fd, c_addr = s.accept() print (c_addr) epolL = select.epoll() epolL.register(handle_c_fd.fileno(), select.EPOLLIN) epolL.register(sys.stdin.fileno(), select.EPOLLIN) while True : events = epolL.poll(-1 ) for i, _ in events: if i==handle_c_fd.fileno(): data = handle_c_fd.recv(100 ) if data: print (data.decode('utf8' )) else : print ('对方断开' ) return elif i==sys.stdin.fileno(): data = input () handle_c_fd.send(data.encode('utf8' )) handle_c_fd.close() s.close() if __name__ == "__main__" : tcp_server()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 import socketimport selectimport sysdef tcp_client (): c = socket.socket(socket.AF_INET, socket.SOCK_STREAM) dest_addr = ('192.168.31.106' , 5379 ) c.connect(dest_addr) epolL = select.epoll() epolL.register(c.fileno(), select.EPOLLIN) epolL.register(sys.stdin.fileno(), select.EPOLLIN) while True : events = epolL.poll(-1 ) for i, _ in events: if i==c.fileno(): data = c.recv(100 ) if data: print (data.decode('utf8' )) else : print ('对方断开' ) return elif i==sys.stdin.fileno(): data = input () c.send(data.encode('utf8' )) c.close() if __name__ == "__main__" : tcp_client()
改进,client 断开还可以再次连接:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 import socketimport selectimport sysdef tcp_server (): s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) addr = ('192.168.31.106' , 5379 ) s.bind(addr) s.listen(128 ) epolL = select.epoll() epolL.register(s.fileno(), select.EPOLLIN) epolL.register(sys.stdin.fileno(), select.EPOLLIN) while True : events = epolL.poll(-1 ) for i, event in events: if i==s.fileno(): handle_c_fd, c_addr = s.accept() print (c_addr) epolL.register(handle_c_fd.fileno(), select.EPOLLIN) if i==handle_c_fd.fileno(): data = handle_c_fd.recv(100 ) if data: print (data.decode('utf8' )) else : print ('对方断开' ) epolL.unregister(handle_c_fd.fileno()) handle_c_fd.close() break elif i==sys.stdin.fileno(): try : data = input () except EOFError: print ('I want to go.' ) return handle_c_fd.send(data.encode('utf8' )) s.close() if __name__ == "__main__" : tcp_server()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 import socketimport selectimport sysdef tcp_client (): c = socket.socket(socket.AF_INET, socket.SOCK_STREAM) dest_addr = ('192.168.31.106' , 5379 ) c.connect(dest_addr) epolL = select.epoll() epolL.register(c.fileno(), select.EPOLLIN) epolL.register(sys.stdin.fileno(), select.EPOLLIN) while True : events = epolL.poll(-1 ) for i, _ in events: if i==c.fileno(): data = c.recv(100 ) if data: print (data.decode('utf8' )) else : print ('对方断开' ) return elif i==sys.stdin.fileno(): data = input () c.send(data.encode('utf8' )) c.close() if __name__ == "__main__" : tcp_client()
现在我们实现一个聊天室,client 程序保持不变,server 端的代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 import socketimport selectimport sysdef tcp_server (): s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) addr = ('192.168.31.106' , 5379 ) s.bind(addr) s.listen(128 ) epolL = select.epoll() epolL.register(s.fileno(), select.EPOLLIN) epolL.register(sys.stdin.fileno(), select.EPOLLIN) client_list = [] while True : events = epolL.poll(-1 ) for i, event in events: if i==s.fileno(): handle_c_fd, c_addr = s.accept() print (c_addr) client_list.append(handle_c_fd) epolL.register(handle_c_fd.fileno(), select.EPOLLIN) else : remove_client = None for client in client_list: if client.fileno()==i: data = client.recv(100 ) if data: for others in client_list: if others is client: pass else : others.send(data) else : remove_client = client if remove_client: client_list.remove(remove_client) epolL.unregister(remove_client.fileno()) remove_client.close() if __name__ == "__main__" : tcp_server()
端口复用 :
1 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1 )
btw, send()
函数可选 MSG_DONTWAIT
参数,效果是:即使对方没法接受这次要发送的全部信息(缓冲区不够用了),也强行发送,多余部分直接截断。
协议设计 考虑这样的场景:我们需要持续发送多个文件。
这会带来一个问题,这些多个文件在传输过程中,会以字节流的形式变成:
文件名1 + 文件1内容 + 文件名2 + 文件2内容 + …
这样我们就没办法区分出各个内容了,这种现象叫做沾包 。
为此,我们需要设计协议。一个自然的想法是: (长度)&(内容)+(长度)&(内容)+ …
会用到这张表:
例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 import structimport osimport timetrain_content = '爱上地方传奇' .encode('utf8' ) train_head = len (train_content) print (train_head)print (type (train_head))print ('-' *50 )train_head_bytes = struct.pack('I' , train_head) print ((train_head_bytes))b = struct.unpack('I' , train_head_bytes) print (b[0 ])''' 18 <class 'int'> -------------------------------------------------- b'\x12\x00\x00\x00' 18 '''
一个简单的小例子,注意这个例子里很多问题没有处理,比如并没有循环接数据(这样就没法接受大一点的文件),再比如没有考虑代码复用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 from socket import *import structdef tcp_init (): s = socket(AF_INET, SOCK_STREAM) addr = ('192.168.31.106' , 2000 ) s.bind(addr) s.listen(128 ) return s def send_file (): file_name = 'adf.txt' s = tcp_init() handle_c_fd, client_addr = s.accept() file_name_bytes = file_name.encode('utf8' ) train_head_bytes = struct.pack('I' , len (file_name_bytes)) handle_c_fd.send(train_head_bytes + file_name_bytes) f = open (file_name, 'rb' ) file_content = f.read() train_head_bytes = struct.pack('I' , len (file_content)) handle_c_fd.send(train_head_bytes + file_content) f.close() handle_c_fd.close() s.close() if __name__ == '__main__' : send_file()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 from socket import *import structtcp_client_socket = socket(AF_INET, SOCK_STREAM) addr = ('192.168.31.106' , 2000 ) tcp_client_socket.connect(addr) train_head_bytes = tcp_client_socket.recv(4 ) train_content_len = struct.unpack('I' , train_head_bytes) file_name = tcp_client_socket.recv(train_content_len[0 ]) f = open ('aaa' +file_name.decode('utf8' ), 'wb' ) train_head_bytes = tcp_client_socket.recv(4 ) train_content_len = struct.unpack('I' , train_head_bytes) file_content = tcp_client_socket.recv(train_content_len[0 ]) f.write(file_content) f.close() tcp_client_socket.close()
网盘设计(一) 这是最初的一版,也是最简单的一版:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 from socket import *import structimport osclass Server : def __init__ (self, ip, port ) -> None : self.s_listen = None self.ip = ip self.port = port def tcp_init (self ): self.s_listen = socket(AF_INET, SOCK_STREAM) self.s_listen.bind((self.ip, self.port)) self.s_listen.listen(128 ) def task (self ): c_fd, c_addr = self.s_listen.accept() user = User(c_fd) user.deal_command() class User : ''' 每个 User 对象对应一个客户端 ''' def __init__ (self, handle_c_fd ) -> None : self.user_name = None self.handle_c_fd = handle_c_fd self.path = os.getcwd() def deal_command (self ): while True : command = self.recv_train().decode('utf8' ) if command[:2 ] == 'ls' : self.do_ls() elif command[:2 ] == 'cd' : self.do_cd(command) elif command[:3 ] == 'pwd' : self.do_pwd() elif command[:2 ] == 'rm' : self.do_rm(command) elif command[:4 ] == 'gets' : self.do_gets() elif command[:4 ] == 'puts' : self.do_puts() else : print ('command wrong' ) def send_train (self, send_bytes ): ''' send 火车,就是把某个字节流内容以火车形式发过去 ''' train_head_bytes = struct.pack('I' , len (send_bytes)) self.handle_c_fd.send(train_head_bytes + send_bytes) def recv_train (self ): ''' recv 火车,就是把火车 recv 的内容返回出去 ''' train_head_bytes = self.handle_c_fd.recv(4 ) train_head = struct.unpack('I' , train_head_bytes) return self.handle_c_fd.recv(train_head[0 ]) def do_ls (self ): ''' 当前路径下的信息传输给客户端 ''' data = '' for file in os.listdir(self.path): data += file + ' ' *5 + str (os.stat(file).st_size) + '\n' self.send_train(data.encode('utf8' )) def do_cd (self, command ): path = command.split()[1 ] os.chdir(path) self.path = os.getcwd() self.send_train(self.path.encode('utf8' )) def do_pwd (self ): self.send_train(self.path.encode('utf8' )) def do_rm (self, command ): pass def do_gets (self, command ): pass def do_puts (self, command ): pass if __name__ == '__main__' : server = Server('192.168.31.106' , 2000 ) server.tcp_init() server.task()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 from socket import *import structclass Client : def __init__ (self, ip, port ) -> None : self.client:socket = None self.ip = ip self.port = port def tcp_connect (self ): self.client = socket(AF_INET, SOCK_STREAM) self.client.connect((self.ip, self.port)) def send_train (self, send_bytes ): ''' send 火车,就是把某个字节流内容以火车形式发过去 ''' train_head_bytes = struct.pack('I' , len (send_bytes)) self.client.send(train_head_bytes + send_bytes) def recv_train (self ): ''' recv 火车,就是把火车 recv 的内容返回出去 ''' train_head_bytes = self.client.recv(4 ) train_head = struct.unpack('I' , train_head_bytes) return self.client.recv(train_head[0 ]) def send_command (self ): ''' 发送各种命令给服务器 ''' while True : command = input () self.send_train(command.encode('utf8' )) if command[:2 ] == 'ls' : self.do_ls() elif command[:2 ] == 'cd' : self.do_cd() elif command[:3 ] == 'pwd' : self.do_pwd() elif command[:2 ] == 'rm' : self.do_rm(command) elif command[:4 ] == 'gets' : self.do_gets() elif command[:4 ] == 'puts' : self.do_puts() else : print ('command wrong' ) def do_ls (self ): data = self.recv_train().decode('utf8' ) print (data) def do_cd (self ): print (self.recv_train().decode('utf8' )) def do_pwd (self ): print (self.recv_train().decode('utf8' )) def do_rm (self, command ): pass def do_gets (self, command ): pass def do_puts (self, command ): pass if __name__ == '__main__' : client = Client('192.168.31.106' , 2000 ) client.tcp_connect() client.send_command()
上面代码尚有功能未实现(其实就是核心的上传下载功能,不过这个在之前的代码里面有,可以直接拿过来),仅作示例。
多进程编程 前置知识 top
命令的前两行输出:
1 2 top - 16:46:57 up 1:58, 1 user, load average: 1.51, 0.91, 0.91 Tasks: 300 total, 1 running, 299 sleeping, 0 stopped, 0 zombie
平均负载(load average),一般对于单个 cpu 来说,负载在 0~1.00 之间是正常的,超过 1.00 须引起注意。在多核 cpu 中,系统平均负载不应该高于 cpu 核心的总数。
buffers 与 cached 区别:buffers 指的是块设备的读写缓冲区 ,cached 指的是页面缓存 。
查看当前窗口启动的任务情况 :
python 1.while 死循环.py &
让进程后台运行
bg
让暂停的进程在后台运行
fg
拉到前台
jobs
看后台任务
设置定时任务 :
crontab –e
设置当前用户定时任务
vim /etc/crontab
设置定时任务
crontab -l
查看当前自己设置的定时任务
多进程的简单演示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 from multiprocessing import Processimport timedef run_proc (): while True : print ('----2----' ) time.sleep(1 ) if __name__ == '__main__' : p = Process(target = run_proc) p.start() while True : print ('----1----' ) time.sleep(1 )
Process 语法结构 Process(group , target , name , args , kwargs)
target
如果传递了函数的引用,可以让这个子进程就执行这里的代码
args
给 target 指定的函数传递的参数,以元组的方式传递
kwargs
给 target 指定的函数传递命名参数,keyword 参数
Process 创建的实例对象的常用方法:
start()
启动子进程实例(创建子进程)
is_alive()
判断进程子进程是否还在活着
join([timeout])
是否等待子进程执行结束,或等待多少秒 — 回收子进程尸体
terminate()
不管任务是否完成,立即终止子进程
获取 pid :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 from multiprocessing import Processimport osdef run_proc (): print ('我是子进程 pid = {}' .format (os.getpid())) print ('子进程结束' ) if __name__ == '__main__' : p = Process(target = run_proc) p.start() print ('我是父进程 pid = {}' .format (os.getpid())) ''' 我是父进程 pid = 6079 我是子进程 pid = 6080 子进程结束 '''
给子进程指定的函数传递参数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 from multiprocessing import Processdef run_proc (name, age, **kwargs ): print ('子进程 {} {} {}' .format (name, age, kwargs)) if __name__ == '__main__' : p = Process(target = run_proc, args=('xiongda' , 5 ), kwargs={'408' :120 }) p.start() p.join() print ('我是父进程' ) ''' 子进程 xiongda 5 {'408': 120} 我是父进程 '''
孤儿进程 孤儿进程:父进程退出(kill 杀死父进程),子进程变为孤儿。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 from multiprocessing import Processimport osimport timedef run_proc (): print ('我是子进程 pid = {}' .format (os.getpid())) while True : time.sleep(1 ) if __name__ == '__main__' : p = Process(target = run_proc) p.start() print ('我是父进程 pid = {}' .format (os.getpid()))
terminal 1:
1 2 我是父进程 pid = 6586 我是子进程 pid = 6587
terminal 2:
1 2 3 4 5 zhiyue@168:~$ ps -elf|grep test 0 S zhiyue 6586 6574 0 80 0 - 5369 do_wai 10:35 pts/2 00:00:00 python3 -u /home/zhiyue/Documents/0928work/test.py 1 S zhiyue 6587 6586 0 80 0 - 5369 hrtime 10:35 pts/2 00:00:00 python3 -u /home/zhiyue/Documents/0928work/test.py 0 S zhiyue 6671 6646 0 80 0 - 2356 pipe_r 10:35 pts/3 00:00:00 grep test zhiyue@168:~$ kill -9 6586
terminal 1:
1 2 3 我是父进程 pid = 6586 我是子进程 pid = 6587 Killed
terminal 2:
1 2 3 zhiyue@168:~$ ps -elf|grep test 1 S zhiyue 6587 1068 0 80 0 - 5369 hrtime 10:35 pts/2 00:00:00 python3 -u /home/zhiyue/Documents/0928work/test.py 0 S zhiyue 6733 6646 0 80 0 - 2356 pipe_r 10:36 pts/3 00:00:00 grep test
僵尸进程 僵尸进程:子进程退出,父进程在忙碌,没有回收它,要避免僵尸。
Python 进程变为僵尸进程后,名字会改变。可以使用 top
查看。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 from multiprocessing import Processimport osimport timedef run_proc (): print ('我是子进程 pid = {}' .format (os.getpid())) print ('子进程结束' ) if __name__ == '__main__' : p = Process(target = run_proc) p.start() print ('我是父进程 pid = {}' .format (os.getpid())) while True : time.sleep(1 )
terminal:
1 2 3 4 5 我是父进程 pid = 7283 我是子进程 pid = 7284 子进程结束
是否共享全局变量 子进程创建是父进程的复制品,资源是独立使用的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 from multiprocessing import Processimport osnums = [11 , 22 ] def work1 (): print ('work1 {}' .format (os.getpid())) nums.append(33 ) print ('work1 {}' .format (nums)) def work2 (): print ('work2 {}' .format (os.getpid())) print (nums) if __name__ == '__main__' : p = Process(target=work1) p.start() p.join() print ('parent {}' .format (nums)) p = Process(target=work2) p.start() p.join() ''' work1 11308 work1 [11, 22, 33] parent [11, 22] work2 11309 [11, 22] '''
进程间通信 Process 之间有时需要通信,操作系统提供了很多机制来实现进程间的通信(例如管道,共享内存)。
1 2 3 4 5 6 7 8 9 10 11 12 from multiprocessing import Queueq = Queue(3 ) q.put(1 ) q.put(2 ) print (q.full()) q.put(3 ) print (q.full()) print (q.get())print (q.get())print (q.get())
put()
可以带参数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 from multiprocessing import Queueq = Queue(3 ) q.put(1 ) q.put(2 ) print (q.full()) q.put(3 ) print (q.full()) try : q.put('消息4' , True , 2 ) except : print ('消息队列已满,现有消息数量:%s' %q.qsize()) ''' False True 消息队列已满,现有消息数量:3 ''' ''' 最后一行会等待两秒才打印 '''
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 from multiprocessing import Queueq = Queue(3 ) q.put(1 ) q.put(2 ) print (q.full()) q.put(3 ) print (q.full()) try : q.put('消息4' , False ) except : print ('消息队列已满,现有消息数量:%s' %q.qsize()) ''' False True 消息队列已满,现有消息数量:3 ''' ''' 最后一行会立即输出 '''
推荐的方式:先判断消息列队是否已满,再写入;读取消息时,先判断消息列队是否为空,再读取。
1 2 3 4 5 6 7 if not q.full(): q.put_nowait("消息 4" ) if not q.empty(): for i in range (q.qsize()): print (q.get_nowait())
下面实际演示两个进程间的通信:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 from multiprocessing import Queuefrom multiprocessing import Processimport timedef writer (p ): for value in ['A' , 'B' , 'C' ]: print ('Put %s to queue...' % value) q.put(value) time.sleep(1 ) def reader (p ): while True : if not q.empty(): value = q.get(True ) print ('Get %s from queue.' % value) time.sleep(2 ) else : break if __name__ == '__main__' : q = Queue(10 ) pw = Process(target=writer, args=(q,)) pr = Process(target=reader, args=(q,)) pw.start() time.sleep(1 ) pr.start() pw.join() pr.join() ''' Put A to queue... Put B to queue... Get A from queue. Put C to queue... Get B from queue. Get C from queue. '''
进程池 Python 已经帮我们做好了 dirty work, 建议爽用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 from multiprocessing.pool import Poolimport os, time, randomdef worker (msg ): t_start = time.time() print ("%s 开始执行,进程号为%d" % (msg,os.getpid())) time.sleep(random.random()*2 ) t_stop = time.time() print (msg,"执行完毕,耗时%0.2f" % (t_stop-t_start)) if __name__ == '__main__' : po = Pool(3 ) for i in range (10 ): po.apply_async(worker, (i,)) print ("----start----" ) po.close() po.join() print ("-----end-----" ) ''' ----start---- 0 开始执行,进程号为7836 1 开始执行,进程号为7837 2 开始执行,进程号为7838 0 执行完毕,耗时0.24 3 开始执行,进程号为7836 3 执行完毕,耗时0.02 4 开始执行,进程号为7836 2 执行完毕,耗时0.61 5 开始执行,进程号为7838 1 执行完毕,耗时0.70 6 开始执行,进程号为7837 6 执行完毕,耗时0.63 7 开始执行,进程号为7837 5 执行完毕,耗时0.89 8 开始执行,进程号为7838 7 执行完毕,耗时0.29 9 开始执行,进程号为7837 4 执行完毕,耗时1.55 9 执行完毕,耗时0.74 8 执行完毕,耗时1.43 -----end----- '''
进程池中的 Queue : 如果要使用 Pool 创建进程,就需要使用 multiprocessing.Manager()
中的 Queue()
,而不是 multiprocessing.Queue()
。
例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 from multiprocessing import Manager,Poolimport os,time,randomdef reader (q ): print ("reader 启动(%s),父进程为(%s)" % (os.getpid(), os.getppid())) for i in range (q.qsize()): print ("reader 从 Queue 获取到消息:%s" % q.get(True )) def writer (q ): print ("writer 启动(%s),父进程为(%s)" % (os.getpid(), os.getppid())) for i in "wangdao" : q.put(i) if __name__=="__main__" : print ("(%s) start" % os.getpid()) q = Manager().Queue() po = Pool() po.apply_async(writer, (q,)) time.sleep(1 ) po.apply_async(reader, (q,)) po.close() po.join() print ("(%s) End" % os.getpid()) ''' (9168) start writer 启动(9174),父进程为(9168) reader 启动(9175),父进程为(9168) reader 从 Queue 获取到消息:w reader 从 Queue 获取到消息:a reader 从 Queue 获取到消息:n reader 从 Queue 获取到消息:g reader 从 Queue 获取到消息:d reader 从 Queue 获取到消息:a reader 从 Queue 获取到消息:o (9168) End '''
多线程 一种很类似于考研应试的加锁:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 import threadingg_num = 0 def work1 (num ): global g_num for i in range (num): mutex.acquire() g_num += 1 mutex.release() print ('----- in work1, g_num is %d -----' %g_num) def work2 (num ): global g_num for i in range (num): mutex.acquire() g_num += 1 mutex.release() print ('----- in work2, g_num is %d -----' %g_num) if __name__ == '__main__' : mutex = threading.Lock() t1 = threading.Thread(target=work1, args=(10000000 ,)) t2 = threading.Thread(target=work2, args=(10000000 ,)) t1.start() t2.start() t1.join() t2.join() print ("2 个线程对同一个全局变量操作之后的最终结果是:%s" % g_num) ''' ----- in work1, g_num is 19992828 ---------- in work2, g_num is 20000000 ----- 2 个线程对同一个全局变量操作之后的最终结果是:20000000 '''
可迭代、迭代器、生成器 可迭代的:类中只重写了 __iter__
内置方法,就是可迭代的,这个方法要求返回一个迭代器。
迭代器:类中重写了 __iter__
,还需要重写 __next__
。
例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 from collections.abc import Iterableclass MyList : def __init__ (self ) -> None : self.container = [] def add (self, item ): self.container.append(item) def __iter__ (self ): myiterator = MyIter(self) return myiterator class MyIter : def __init__ (self, mylist ): self.mylist:MyList = mylist self.current = 0 def __next__ (self ): current = self.current self.current += 1 if current < len (self.mylist.container): return self.mylist.container[current] else : raise StopIteration def __iter__ (self ): return self if __name__ == '__main__' : mylist = MyList() mylist.add(666 ) mylist.add(667 ) mylist.add(668 ) print (isinstance (mylist, Iterable)) for i in mylist: print (i) ''' True 666 667 668 '''
我们可以采用更简便的语法,即生成器(generator) 。生成器是一类特殊的迭代器。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 >>> G = ( x*2 for x in range (5 ))>>> G<generator object <genexpr> at 0x7fe8669f9630 > >>> next (G)0 >>> next (G)2 >>> next (G)4 >>> next (G)6 >>> next (G)8 >>> next (G)Traceback (most recent call last): File "<stdin>" , line 1 , in <module> StopIteration
yield
的作用:把当前上下文(寄存器的状态)保存起来,返回一个值——它后面跟的那个值。下次执行 next 操作,会从上次 yield 保存的现场位置继续执行。
例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 def fi (n ): current = 0 num1, num2 = 0 , 1 while current < n: num = num1 num1, num2 = num2, num1 + num2 current += 1 yield num return 'done' F = fi(5 ) print (next (F))print (next (F))print (next (F))print (next (F))print (next (F))print (next (F))''' 0 1 1 2 3 Traceback (most recent call last): File "/home/zhiyue/Documents/0929work/test.py", line 19, in <module> print(next(F)) ^^^^^^^ StopIteration: done '''
又或者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 def fi (n ): current = 0 num1, num2 = 0 , 1 while current < n: num = num1 num1, num2 = num2, num1 + num2 current += 1 yield num return 'done' F = fi(10 ) for i in F: print (i, end = ' ' ) ''' 0 1 1 2 3 5 8 13 21 34 '''
含有 yield
的函数称为生成器。
我们除了可以使用 next()
函数来唤醒生成器继续执行外,还可以使用 send()
函数来唤醒执行。使用 send()
函数的一个好处是可以在唤醒的同时向断点处传入一个附加数据:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 def gen (): i = 0 while i<5 : temp = yield i print (temp) i+=1 f = gen() print (next (f))f.send('haha' ) print (next (f))f.send('haha' ) ''' 0 haha None 2 haha '''
协程 引入 协程(Coroutine),又称微线程,纤程。
协程是 Python 中另一种实现多任务的方式,只不过比线程需要的资源更少。通过保存、恢复 CPU 上下文 的机制,可以在合适的时机把一个协程切换到另一个协程。
协程通过 yield
在用户态保存上下文,操作系统不感知。
实现多任务时,线程切换从系统层面远不止保存和恢复 CPU 上下文这么简单 。操作系统为了程序运行的高效性每个线程都有自己的缓存 Cache 等数据,操作系统还会支持这些数据的恢复操作。所以线程的切换非常耗性能(相对协程来说)。但是协程的切换只是单纯地操作 CPU 的上下文,所以一秒钟切换个上百万次系统都抗的住 。
模拟协程的机制:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 import timedef work1 (): while True : print ("----work1---" ) yield time.sleep(0.5 ) def work2 (): while True : print ("----work2---" ) yield time.sleep(0.5 ) def main (): w1 = work1() w2 = work2() while True : next (w1) next (w2) if __name__ == "__main__" : main() ''' ----work1--- ----work2--- ----work1--- ----work2--- ----work1--- ----work2--- ...(省略)... '''
greenlet 为了更好使用协程来完成多任务,python 中的 greenlet 模块对其封装,从而使得切换任务变的更加简单。
Debian 环境:
1 sudo apt install python3-greenlet
这只是一个功能非常初级的包,仅做演示目的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 from greenlet import greenletimport timedef test1 (): while True : print ("---A--" ) gr2.switch() time.sleep(0.5 ) def test2 (): while True : print ("---B--" ) gr1.switch() time.sleep(0.5 ) gr1 = greenlet(test1) gr2 = greenlet(test2) gr1.switch() ''' ---A-- ---B-- ---A-- ---B-- ---A-- ---B-- (后略) '''
gevent 演示 greenlet 虽然实现了协程,但还需要人工切换,太麻烦。gevent 是一个比 greenlet 更强大的并且能够自动切换任务的模块。
其原理是当一个 greenlet 遇到 IO(访问网络、文件操作等)操作时,就自动切换到其他的 greenlet。等到 IO 操作完成,再适时切换回来继续执行。
例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 import geventdef f (n ): for i in range (n): print (gevent.getcurrent(), i) g1 = gevent.spawn(f, 5 ) g2 = gevent.spawn(f, 5 ) g3 = gevent.spawn(f, 5 ) g1.join() g2.join() g3.join() ''' <Greenlet at 0x7f2ddd145ee0: f(5)> 0 <Greenlet at 0x7f2ddd145ee0: f(5)> 1 <Greenlet at 0x7f2ddd145ee0: f(5)> 2 <Greenlet at 0x7f2ddd145ee0: f(5)> 3 <Greenlet at 0x7f2ddd145ee0: f(5)> 4 <Greenlet at 0x7f2ddd0cd260: f(5)> 0 <Greenlet at 0x7f2ddd0cd260: f(5)> 1 <Greenlet at 0x7f2ddd0cd260: f(5)> 2 <Greenlet at 0x7f2ddd0cd260: f(5)> 3 <Greenlet at 0x7f2ddd0cd260: f(5)> 4 <Greenlet at 0x7f2ddd0a8860: f(5)> 0 <Greenlet at 0x7f2ddd0a8860: f(5)> 1 <Greenlet at 0x7f2ddd0a8860: f(5)> 2 <Greenlet at 0x7f2ddd0a8860: f(5)> 3 <Greenlet at 0x7f2ddd0a8860: f(5)> 4 '''
可以看到,3 个 greenlet 是依次运行而不是交替运行。
gevent 切换执行:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 import geventdef f (n ): for i in range (n): print (gevent.getcurrent(), i) gevent.sleep(1 ) g1 = gevent.spawn(f, 5 ) g2 = gevent.spawn(f, 5 ) g3 = gevent.spawn(f, 5 ) g1.join() g2.join() g3.join() ''' <Greenlet at 0x7f892bb41ee0: f(5)> 0 <Greenlet at 0x7f892ba85260: f(5)> 0 <Greenlet at 0x7f892ba60860: f(5)> 0 <Greenlet at 0x7f892bb41ee0: f(5)> 1 <Greenlet at 0x7f892ba85260: f(5)> 1 <Greenlet at 0x7f892ba60860: f(5)> 1 <Greenlet at 0x7f892bb41ee0: f(5)> 2 <Greenlet at 0x7f892ba85260: f(5)> 2 <Greenlet at 0x7f892ba60860: f(5)> 2 <Greenlet at 0x7f892bb41ee0: f(5)> 3 <Greenlet at 0x7f892ba85260: f(5)> 3 <Greenlet at 0x7f892ba60860: f(5)> 3 <Greenlet at 0x7f892bb41ee0: f(5)> 4 <Greenlet at 0x7f892ba85260: f(5)> 4 <Greenlet at 0x7f892ba60860: f(5)> 4 '''
monkey 补丁 先看一下没有补丁的情况(使用了time.sleep()
):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 from gevent import monkeyimport geventimport randomimport timedef coroutine_work (coroutine_name ): for i in range (10 ): print (coroutine_name, i) time.sleep(random.random()) gevent.joinall([ gevent.spawn(coroutine_work, "work1" ), gevent.spawn(coroutine_work, "work2" ) ]) ''' work1 0 work1 1 work1 2 work1 3 work1 4 work1 5 work1 6 work1 7 work1 8 work1 9 work2 0 work2 1 work2 2 work2 3 work2 4 work2 5 work2 6 work2 7 work2 8 work2 9 '''
打上补丁:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 from gevent import monkeyimport geventimport randomimport timemonkey.patch_all() def coroutine_work (coroutine_name ): for i in range (10 ): print (coroutine_name, i) time.sleep(random.random()) gevent.joinall([ gevent.spawn(coroutine_work, "work1" ), gevent.spawn(coroutine_work, "work2" ) ]) ''' work1 0 work2 0 work1 1 work1 2 work2 1 work2 2 work2 3 work1 3 work1 4 work1 5 work1 6 work2 4 work1 7 work1 8 work1 9 work2 5 work2 6 work2 7 work2 8 work2 9 ''' ''' 运行结果不唯一 '''
猴子补丁作用: monkey patch 是在执行时动态替换,通常是在 startup 的时候。用过 gevent 就会知道,会在最开头的地方 gevent.monkey.patch_all()
,把标准库中的 thread
、socket
等给替换掉。这样我们在后面使用 socket 的时候能够跟寻常一样使用,无需改动代码,就可以将它变成非堵塞的 。
官方的 asyncio 文档: https://docs.python.org/zh-cn/3.12/library/asyncio-task.html
简单示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import asyncioasync def main (): print ('hello' ) await asyncio.sleep(1 ) print ('world' ) asyncio.run(main()) ''' hello world ''' ''' 第一行之后等待了一会才打印第二行 '''
对比下面两份代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 import asyncioimport timeasync def say_after (delay, what ): await asyncio.sleep(delay) print (what) async def main (): print (f"started at {time.strftime('%X' )} " ) await say_after(1 , 'hello' ) await say_after(2 , 'world' ) print (f"finished at {time.strftime('%X' )} " ) asyncio.run(main()) ''' started at 15:43:57 hello world finished at 15:44:00 '''
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 import asyncioimport timeasync def say_after (delay, what ): await asyncio.sleep(delay) print (what) async def main (): task1 = asyncio.create_task( say_after(1 , 'hello' )) task2 = asyncio.create_task( say_after(2 , 'world' )) print (f"started at {time.strftime('%X' )} " ) await task1 await task2 print (f"finished at {time.strftime('%X' )} " ) asyncio.run(main()) ''' started at 15:49:46 hello world finished at 15:49:48 '''
注意,预期的输出显示代码段的运行时间比之前快了 1 秒。
并发下载器 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 from gevent import monkeyimport geventimport urllib.requestmonkey.patch_all() def my_downLoad (url ): print ('GET: %s' % url) resp = urllib.request.urlopen(url) data = resp.read() print ('%d bytes received from %s.' % (len (data), url)) gevent.joinall([ gevent.spawn(my_downLoad, 'http://www.baidu.com/' ), gevent.spawn(my_downLoad, 'http://www.cskaoyan.com/' ), gevent.spawn(my_downLoad, 'http://www.qq.com/' ), ]) ''' GET: http://www.baidu.com/ GET: http://www.cskaoyan.com/ GET: http://www.qq.com/ 122235 bytes received from http://www.qq.com/. 412916 bytes received from http://www.baidu.com/. 46781 bytes received from http://www.cskaoyan.com/. '''
从上能够看到,收到数据的先后顺序不一定与发送顺序相同,这体现了异步,即不确定什么时候会收到数据,顺序不一定。
网盘设计(二) 增加了下载功能,改为循环收发的设计。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 from socket import *import structimport osclass Server : def __init__ (self, ip, port ) -> None : self.s_listen = None self.ip = ip self.port = port def tcp_init (self ): self.s_listen = socket(AF_INET, SOCK_STREAM) self.s_listen.bind((self.ip, self.port)) self.s_listen.listen(128 ) def task (self ): c_fd, c_addr = self.s_listen.accept() user = User(c_fd) user.deal_command() class User : ''' 每个 User 对象对应一个客户端 ''' def __init__ (self, handle_c_fd ) -> None : self.user_name = None self.handle_c_fd = handle_c_fd self.path = os.getcwd() def deal_command (self ): while True : command = self.recv_train().decode('utf8' ) if command[:2 ] == 'ls' : self.do_ls() elif command[:2 ] == 'cd' : self.do_cd(command) elif command[:3 ] == 'pwd' : self.do_pwd() elif command[:2 ] == 'rm' : self.do_rm(command) elif command[:4 ] == 'gets' : self.do_gets(command) elif command[:4 ] == 'puts' : self.do_puts(command) else : print ('command wrong' ) def send_train (self, send_bytes ): ''' send 火车,就是把某个字节流内容以火车形式发过去 ''' train_head_bytes = struct.pack('I' , len (send_bytes)) self.handle_c_fd.send(train_head_bytes + send_bytes) def recv_train (self ): ''' recv 火车,就是把火车 recv 的内容返回出去 ''' train_head_bytes = self.handle_c_fd.recv(4 ) train_head = struct.unpack('I' , train_head_bytes) return self.handle_c_fd.recv(train_head[0 ]) def do_ls (self ): ''' 当前路径下的信息传输给客户端 ''' data = '' for file in os.listdir(self.path): data += file + ' ' *5 + str (os.stat(file).st_size) + '\n' self.send_train(data.encode('utf8' )) def do_cd (self, command ): path = command.split()[1 ] os.chdir(path) self.path = os.getcwd() self.send_train(self.path.encode('utf8' )) def do_pwd (self ): self.send_train(self.path.encode('utf8' )) def do_rm (self, command ): pass def do_gets (self, command ): fileName = command.split()[1 ] file_size = os.stat(fileName).st_size self.send_train(struct.pack('I' , file_size)) f = open (fileName, 'rb' ) while True : file_content = f.read(10000 ) if file_content: self.handle_c_fd.send(file_content) else : break f.close() def do_puts (self, command ): fileName = command.split()[1 ] pass if __name__ == '__main__' : server = Server('192.168.31.106' , 2000 ) server.tcp_init() server.task()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 from socket import *import structdef cycle_recv (somefd, file, fileSize ): total = 0 while total < fileSize: data = somefd.recv(10000 ) file.write(data) total += len (data) class Client : def __init__ (self, ip, port ) -> None : self.client:socket = None self.ip = ip self.port = port def tcp_connect (self ): self.client = socket(AF_INET, SOCK_STREAM) self.client.connect((self.ip, self.port)) def send_train (self, send_bytes ): ''' send 火车,就是把某个字节流内容以火车形式发过去 ''' train_head_bytes = struct.pack('I' , len (send_bytes)) self.client.send(train_head_bytes + send_bytes) def recv_train (self ): ''' recv 火车,就是把火车 recv 的内容返回出去 ''' train_head_bytes = self.client.recv(4 ) train_head = struct.unpack('I' , train_head_bytes) return self.client.recv(train_head[0 ]) def send_command (self ): ''' 发送各种命令给服务器 ''' while True : command = input () self.send_train(command.encode('utf8' )) if command[:2 ] == 'ls' : self.do_ls() elif command[:2 ] == 'cd' : self.do_cd() elif command[:3 ] == 'pwd' : self.do_pwd() elif command[:2 ] == 'rm' : self.do_rm(command) elif command[:4 ] == 'gets' : self.do_gets(command) elif command[:4 ] == 'puts' : self.do_puts(command) else : print ('command wrong' ) def do_ls (self ): data = self.recv_train().decode('utf8' ) print (data) def do_cd (self ): print (self.recv_train().decode('utf8' )) def do_pwd (self ): print (self.recv_train().decode('utf8' )) def do_rm (self, command ): pass def do_gets (self, command ): file_size = struct.unpack('I' , self.recv_train())[0 ] fileName = command.split()[1 ] f = open ('[接受]' + fileName, 'wb' ) cycle_recv(self.client, f, file_size) f.close() def do_puts (self, command ): fileName = command.split()[1 ] pass if __name__ == '__main__' : client = Client('192.168.31.106' , 2000 ) client.tcp_connect() client.send_command()
进一步改进的思路:
还可以考虑增加功能:
用户名、密码存储在数据库中
秒传(某用户上传其他用户已经上传过的内容不会重复存一份)
阻塞模式、非阻塞模式 本小节内容由 AIGC 改编。
阻塞模式(Blocking Mode) : 这是套接字的默认行为。在阻塞模式下,诸如 recv()
或 accept()
等操作会一直等待,直到有数据到达或操作完成为止。比如,服务器等待客户端的连接请求时,accept()
会阻塞,直到有客户端连接上。
非阻塞模式(Non-Blocking Mode) :
在非阻塞模式下,诸如 recv()
、send()
、accept()
等操作如果不能立即完成,就会抛出一个 BlockingIOError
异常。程序可以继续做其他事情,而不是在等待某个操作完成。
非阻塞模式通常用于需要处理多个连接的服务器,如那些使用 select()
或 poll()
的服务器架构。非阻塞模式的使用场景:高并发服务器、异步 I/O 等。
示例代码 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 import sockettcp_server_tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM) tcp_server_tcp.bind(('127.0.0.1' , 8080 )) tcp_server_tcp.listen(5 ) tcp_server_tcp.setblocking(False ) while True : try : client_socket, addr = tcp_server_tcp.accept() print (f"Accepted connection from {addr} " ) except BlockingIOError: print ("No incoming connections, doing other work..." )
我们在前面已经看到,有些代码可以在没有显式设置非阻塞的情况下使用 epoll
进行事件驱动的处理,但让我们仔细探讨一下为什么在某些情况下使用非阻塞模式仍然有其必要性 ,以及在某些代码中为什么不需要。
select.epoll()
是 Linux 下的一种高级 I/O 多路复用机制,适用于高并发场景。epoll
本质上是事件驱动的,它能够监视多个文件描述符(如套接字、标准输入等),并在这些描述符有事件发生时通知程序,因此并不需要手动设置套接字为非阻塞。
epoll.poll(-1)
会阻塞并等待,直到有事件(如客户端发来的数据或服务器端的输入)发生。
当 epoll
监控到文件描述符上有可用的事件时,才会返回并让你处理数据。这种机制下,套接字的操作在正常情况下不会被阻塞,因为事件驱动会确保只有在数据准备好时才进行 I/O 操作。
为什么在这种情况下不需要非阻塞模式:
epoll
自己会处理套接字的事件,因此不需要套接字是非阻塞的。程序会在 epoll.poll()
返回时知道哪个文件描述符有事件发生,并且这些事件都准备好了进行处理(如 recv()
或 send()
),所以它本质上已经是一种避免阻塞的机制。
非阻塞模式的使用场景:
虽然这里不需要手动设置非阻塞模式,但在一些不使用 epoll
或 select
的代码中,设置非阻塞的好处包括:
单独处理 I/O :如果不使用 epoll
或 select
,并且需要继续处理其他任务时,可以通过将套接字设置为非阻塞避免等待 I/O。
配合异步框架 :在使用异步编程时(如 asyncio
),套接字通常会被设置为非阻塞模式,允许事件循环继续处理其他事件而不是被某个套接字阻塞住。
处理多个客户端 :在不使用 select
或 epoll
时,服务器如果采用阻塞模式处理 I/O,就只能一次处理一个客户端。如果设置为非阻塞模式,则可以轮询处理多个客户端的请求。
为什么在 epoll
中不需要显式设置非阻塞:
使用 epoll
监听文件描述符时,epoll
是事件驱动的,调用 recv()
或 send()
之类的操作时,事件已经准备好,因此不需要再通过设置套接字为非阻塞来避免阻塞操作。
如果要结合非阻塞和 epoll
:
在某些情况下(如需要非阻塞的连接处理时),仍然可以结合两者使用。例如,如果想在其他地方做一些操作而不想等某个文件描述符就绪,就可以设置非阻塞。
HTTP 协议头部解析 一些基本知识可以在网上找找,F12 打开看看就行。
当请求头没有 content-length 时,怎么知道请求体结束了?
http 的 header 和 body 之间是空行分割的,又因为每个头部项是以 \r\n
作为结束符,所以,数据流中是以 \r\n\r\n
来分割解析请求头(响应头)与请求体(响应体)的。如下图所示:
怎么知道(请求体)响应体结束了呢? http 协议规定,响应头的字段 content-length 用来表示响应体长度大小,但是,有可能发送请求头时,并不能知道完整的响应体长度(比如当响应数据太大,服务端流式处理的情况 ),这时需要设置请求头 Transfer-Encoding: chunked,使用数据块的方式传输,数据块格式如下图所示:
HTTP 请求的类型(挑了几个重要的):
GET
查询
POST
新增
PUT
修改
DELETE
删除
下面制作一个最简单的 web server:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 import sockettcp_server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) tcp_server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1 ) tcp_server_socket.bind(("192.168.31.106" , 7890 )) tcp_server_socket.listen(128 ) new_socket, client_addr = tcp_server_socket.accept() http_head=new_socket.recv(10000 ) print (http_head.decode('utf8' ))response = "HTTP/1.1 200 OK\r\n" response += "\r\n" response +='<html><h1>hello world</h1></html>' new_socket.send(response.encode('utf8' )) ''' GET / HTTP/1.1 Host: 192.168.31.106:7890 Connection: keep-alive Cache-Control: max-age=0 DNT: 1 Upgrade-Insecure-Requests: 1 User-Agent: Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36 Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7 Accept-Encoding: gzip, deflate Accept-Language: zh-CN,zh;q=0.9,en;q=0.8 浏览器的显示:一级标题 hello world '''
注意需要在浏览器打开 192.168.31.106:7890
(本机 IP 和对应的端口号)。
HTTP响应码总结(如果挂了可去 archive.org 查找):https://zhuanlan.zhihu.com/p/66062179
概览:
改进这个 web server :https://github.com/dropsong/py_webServer
Python 连接 MySQL 环境问题 debian12 中已经安装了 MariaDB,它会与 MySQL 发生冲突。
使用以下命令卸载 MariaDB:
1 2 3 sudo apt remove --purge mariadb-server mariadb-client mariadb-common mariadb-server-core mariadb-client-core sudo apt autoremove sudo apt autoclean
安装 MySQL :
1 2 sudo apt update sudo apt install mysql-server mysql-client
安装 pymysql:
1 sudo apt install python3-pymysql
大功告成!
实战 一个简单有效的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 from pymysql import *def main (): conn = connect(host='localhost' ,port=3306 ,database='akashi' ,user ='root' ,password='你知道的太多了' ,charset='utf8' ) cs1 = conn.cursor() count = cs1.execute('insert into goods(name, cate_name, brand_name, price, is_show, is_saleoff) values("redmibook", "笔记本", "xiaomi", 112, true, true)' ) print (count) count = cs1.execute('insert into goods(name, cate_name, brand_name, price, is_show, is_saleoff) values("honor_book", "笔记本", "huawei", 111, true, true)' ) print (count) conn.commit() cs1.close() conn.close() if __name__ == '__main__' : main()
以上代码经过验证。
查询一行数据:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 from pymysql import *def main (): conn = connect(host='localhost' ,port=3306 ,user='root' ,password='你知道的太多了' ,database='akashi' ,charset='utf8' ) cs1 = conn.cursor() count = cs1.execute('select id,name from goods where id>=4' ) print ("查询到%d 条数据:" % count) for i in range (count): result = cs1.fetchone() print (result) cs1.close() conn.close() if __name__ == '__main__' : main()
以上代码经过验证。
查询多行数据:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 from pymysql import *def main (): conn = connect(host='localhost' ,port=3306 ,user='root' ,password='你知道的太多了' ,database='akashi' ,charset='utf8' ) cs1 = conn.cursor() count = cs1.execute('select id,name from goods where id>=4' ) print ("查询到%d 条数据:" % count) result = cs1.fetchall() print (result) cs1.close() conn.close() if __name__ == '__main__' : main()
以上代码经过验证。