
IO 多路復用
就是我們說的select,poll,epoll,有些地方也稱這種IO方式為event driven IO。
select/epoll的好處就在于單個process就可以同時處理多個網絡連接的IO。
它的基本原理就是select,poll,epoll這個function會不斷的輪詢所負責的所有socket,當某個socket有數據到達了,就通知用戶進程。
epoll簡單模型
import socket
import select
# 創建套接字
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 設置可以重復使用綁定的信息
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1)
# 綁定本機信息
s.bind(("",7788))
# 變為被動
s.listen(10)
# 創建一個epoll對象
epoll = select.epoll()
# 測試,用來打印套接字對應的文件描述符
# print(s.fileno())
# print(select.EPOLLIN|select.EPOLLET)
# 注冊事件到epoll中
# epoll.register(fd[, eventmask])
# 注意,如果fd已經注冊過,則會發生異常
# 將創建的套接字添加到epoll的事件監聽中
epoll.register(s.fileno(), select.EPOLLIN|select.EPOLLET)
connections = {}
addresses = {}
# 循環等待客戶端的到來或者對方發送數據
while True:
# epoll 進行 fd 掃描的地方 -- 未指定超時時間則為阻塞等待
epoll_list = epoll.poll()
# 對事件進行判斷
for fd, events in epoll_list:
# print fd
# print events
# 如果是socket創建的套接字被激活
if fd == s.fileno():
new_socket, new_addr = s.accept()
print('有新的客戶端到來%s' % str(new_addr))
# 將 conn 和 addr 信息分別保存起來
connections[new_socket.fileno()] = new_socket
addresses[new_socket.fileno()] = new_addr
# 向 epoll 中注冊 新socket 的 可讀 事件
epoll.register(new_socket.fileno(), select.EPOLLIN|select.EPOLLET)
# 如果是客戶端發送數據
elif events == select.EPOLLIN:
# 從激活 fd 上接收
recvData = connections[fd].recv(1024).decode("utf-8")
if recvData:
print('recv:%s' % recvData)
else:
# 從 epoll 中移除該 連接 fd
epoll.unregister(fd)
# server 側主動關閉該 連接 fd
connections[fd].close()
print("%s---offline---" % str(addresses[fd]))
del connections[fd]
del addresses[fd]
說明
EPOLLIN (可讀)
EPOLLOUT (可寫)
EPOLLET (ET模式)
epoll對文件描述符的操作有兩種模式:LT(level trigger)和ET(edge trigger)。LT模式是默認模式,LT模式與ET模式的區別如下:
LT模式:當epoll檢測到描述符事件發生并將此事件通知應用程序,應用程序可以不立即處理該事件。下次調用epoll時,會再次響應應用程序并通知此事件。 ET模式:當epoll檢測到描述符事件發生并將此事件通知應用程序,應用程序必須立即處理該事件。如果不處理,下次調用epoll時,不會再次響應應用程序并通知此事件。
web靜態服務器-epool
以下代碼,支持http的長連接,即使用了Content-Length
import socket
import time
import sys
import re
import select
class WSGIServer(object):
"""定義一個WSGI服務器的類"""
def __init__(self, port, documents_root):
# 1. 創建套接字
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 2. 綁定本地信息
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server_socket.bind(("", port))
# 3. 變為監聽套接字
self.server_socket.listen(128)
self.documents_root = documents_root
# 創建epoll對象
self.epoll = select.epoll()
# 將tcp服務器套接字加入到epoll中進行監聽
self.epoll.register(self.server_socket.fileno(), select.EPOLLIN|select.EPOLLET)
# 創建添加的fd對應的套接字
self.fd_socket = dict()
def run_forever(self):
"""運行服務器"""
# 等待對方鏈接
while True:
# epoll 進行 fd 掃描的地方 -- 未指定超時時間則為阻塞等待
epoll_list = self.epoll.poll()
# 對事件進行判斷
for fd, event in epoll_list:
# 如果是服務器套接字可以收數據,那么意味著可以進行accept
if fd == self.server_socket.fileno():
new_socket, new_addr = self.server_socket.accept()
# 向 epoll 中注冊 連接 socket 的 可讀 事件
self.epoll.register(new_socket.fileno(), select.EPOLLIN | select.EPOLLET)
# 記錄這個信息
self.fd_socket[new_socket.fileno()] = new_socket
# 接收到數據
elif event == select.EPOLLIN:
request = self.fd_socket[fd].recv(1024).decode("utf-8")
if request:
self.deal_with_request(request, self.fd_socket[fd])
else:
# 在epoll中注銷客戶端的信息
self.epoll.unregister(fd)
# 關閉客戶端的文件句柄
self.fd_socket[fd].close()
# 在字典中刪除與已關閉客戶端相關的信息
del self.fd_socket[fd]
def deal_with_request(self, request, client_socket):
"""為這個瀏覽器服務器"""
if not request:
return
request_lines = request.splitlines()
for i, line in enumerate(request_lines):
print(i, line)
# 提取請求的文件(index.html)
# GET /a/b/c/d/e/index.html HTTP/1.1
ret = re.match(r"([^/]*)([^ ]+)", request_lines[0])
if ret:
print("正則提取數據:", ret.group(1))
print("正則提取數據:", ret.group(2))
file_name = ret.group(2)
if file_name == "/":
file_name = "/index.html"
# 讀取文件數據
try:
f = open(self.documents_root+file_name, "rb")
except:
response_body = "file not found, 請輸入正確的url"
response_header = "HTTP/1.1 404 not foundrn"
response_header += "Content-Type: text/html; charset=utf-8rn"
response_header += "Content-Length: %drn" % len(response_body)
response_header += "rn"
# 將header返回給瀏覽器
client_socket.send(response_header.encode('utf-8'))
# 將body返回給瀏覽器
client_socket.send(response_body.encode("utf-8"))
else:
content = f.read()
f.close()
response_body = content
response_header = "HTTP/1.1 200 OKrn"
response_header += "Content-Length: %drn" % len(response_body)
response_header += "rn"
# 將數據返回給瀏覽器
client_socket.send(response_header.encode("utf-8")+response_body)
# 設置服務器服務靜態資源時的路徑
DOCUMENTS_ROOT = "./html"
def main():
"""控制web服務器整體"""
# Python3 xxxx.py 7890
if len(sys.argv) == 2:
port = sys.argv[1]
if port.isdigit():
port = int(port)
else:
print("運行方式如: python3 xxx.py 7890")
return
print("http服務器使用的port:%s" % port)
http_server = WSGIServer(port, DOCUMENTS_ROOT)
http_server.run_forever()
if __name__ == "__main__":
main()
小總結
I/O 多路復用的特點:
通過一種機制使一個進程能同時等待多個文件描述符,而這些文件描述符(套接字描述符)其中的任意一個進入讀就緒狀態,epoll()函數就可以返回。 所以, IO多路復用,本質上不會有并發的功能,因為任何時候還是只有一個進程或線程進行工作,它之所以能提高效率是因為selectepoll 把進來的socket放到他們的 ‘監視’ 列表里面,當任何socket有可讀可寫數據立馬處理,那如果selectepoll 手里同時檢測著很多socket, 一有動靜馬上返回給進程處理,總比一個一個socket過來,阻塞等待,處理高效率。
當然也可以多線程/多進程方式,一個連接過來開一個進程/線程處理,這樣消耗的內存和進程切換頁會耗掉更多的系統資源。 所以我們可以結合IO多路復用和多進程/多線程 來高性能并發,IO復用負責提高接受socket的通知效率,收到請求后,交給進程池/線程池來處理邏輯。






