Ранее я представил пару небольших постов о потенциальной роли Spring Boot 2 в реактивном программировании. После этого я получил ряд вопросов о том, как работают асинхронные операции в программировании в целом. Сегодня я хочу разобрать, что такое Non-blocking I/O и как применить это знание для создания небольшого tcp–сервера на python, который сможет обрабатывать множество открытых и тяжелых (долгих) соединений в один поток. Знание python не требуется: все будет предельно просто со множеством комментариев. Приглашаю всех желающих!
Мне, как и многим другим разработчикам, очень нравятся эксперименты, поэтому вся последующая статья будет состоять как раз из серии экспериментов и выводов, которые они несут. Предполагается, что вы недостаточно хорошо знакомы с тематикой, и будете охотно экспериментировать со мной. Исходники примеров можно найти на github.
Начнем с написания очень простого tcp–сервера. Задача сервера будет заключаться в получении и печати данных из сокета и возвращения строки Hello from server!. Примерно так это выглядит:
import socket
# Задаем адрес сервера
SERVER_ADDRESS = ('localhost', 8686)
# Настраиваем сокет
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind(SERVER_ADDRESS)
server_socket.listen(10)
print('server is running, please, press ctrl+c to stop')
# Слушаем запросы
while True:
connection, address = server_socket.accept()
print("new connection from {address}".format(address=address))
data = connection.recv(1024)
print(str(data))
connection.send(bytes('Hello from server!', encoding='UTF-8'))
connection.close()
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import socket
MAX_CONNECTIONS = 20
address_to_server = ('localhost', 8686)
clients = [socket.socket(socket.AF_INET, socket.SOCK_STREAM) for i in range(MAX_CONNECTIONS)]
for client in clients:
client.connect(address_to_server)
for i in range(MAX_CONNECTIONS):
clients[i].send(bytes("hello from client number " + str(i), encoding='UTF-8'))
for client in clients:
data = client.recv(1024)
print(str(data))
server is running, please, press ctrl+c to stop
server is running, please, press ctrl+c to stop
new connection from ('127.0.0.1', 39196)
b'hello from client number 0'
new connection from ('127.0.0.1', 39198)
b'hello from client number 1'
...
import select
import socket
SERVER_ADDRESS = ('localhost', 8686)
# Говорит о том, сколько дескрипторов единовременно могут быть открыты
MAX_CONNECTIONS = 10
# Откуда и куда записывать информацию
INPUTS = list()
OUTPUTS = list()
def get_non_blocking_server_socket():
# Создаем сокет, который работает без блокирования основного потока
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(0)
# Биндим сервер на нужный адрес и порт
server.bind(SERVER_ADDRESS)
# Установка максимального количество подключений
server.listen(MAX_CONNECTIONS)
return server
def handle_readables(readables, server):
"""
Обработка появления событий на входах
"""
for resource in readables:
# Если событие исходит от серверного сокета, то мы получаем новое подключение
if resource is server:
connection, client_address = resource.accept()
connection.setblocking(0)
INPUTS.append(connection)
print("new connection from {address}".format(address=client_address))
# Если событие исходит не от серверного сокета, но сработало прерывание на наполнение входного буффера
else:
data = ""
try:
data = resource.recv(1024)
# Если сокет был закрыт на другой стороне
except ConnectionResetError:
pass
if data:
# Вывод полученных данных на консоль
print("getting data: {data}".format(data=str(data)))
# Говорим о том, что мы будем еще и писать в данный сокет
if resource not in OUTPUTS:
OUTPUTS.append(resource)
# Если данных нет, но событие сработало, то ОС нам отправляет флаг о полном прочтении ресурса и его закрытии
else:
# Очищаем данные о ресурсе и закрываем дескриптор
clear_resource(resource)
def clear_resource(resource):
"""
Метод очистки ресурсов использования сокета
"""
if resource in OUTPUTS:
OUTPUTS.remove(resource)
if resource in INPUTS:
INPUTS.remove(resource)
resource.close()
print('closing connection ' + str(resource))
def handle_writables(writables):
# Данное событие возникает когда в буффере на запись освобождается место
for resource in writables:
try:
resource.send(bytes('Hello from server!', encoding='UTF-8'))
except OSError:
clear_resource(resource)
if __name__ == '__main__':
# Создаем серверный сокет без блокирования основного потока в ожидании подключения
server_socket = get_non_blocking_server_socket()
INPUTS.append(server_socket)
print("server is running, please, press ctrl+c to stop")
try:
while INPUTS:
readables, writables, exceptional = select.select(INPUTS, OUTPUTS, INPUTS)
handle_readables(readables, server_socket)
handle_writables(writables)
except KeyboardInterrupt:
clear_resource(server_socket)
print("Server stopped! Thank you for using!")
server is running, please, press ctrl+c to stop
new connection from ('127.0.0.1', 56608)
new connection from ('127.0.0.1', 56610)
new connection from ('127.0.0.1', 56612)
new connection from ('127.0.0.1', 56614)
new connection from ('127.0.0.1', 56616)
new connection from ('127.0.0.1', 56618)
new connection from ('127.0.0.1', 56620)
new connection from ('127.0.0.1', 56622)
new connection from ('127.0.0.1', 56624)
getting data: b'hello from client number 0'
new connection from ('127.0.0.1', 56626)
getting data: b'hello from client number 1'
getting data: b'hello from client number 2'
К сожалению, не доступен сервер mySQL