Создаём сокет-сервер на Python

6 min read

Часто возникает задача разработки системы, включающей в себя несколько независимых модулей. Эти модули могут быть расположены на разных компьютерах и выполнять трудоёмкие задачи. Для реализации взаимодействия между ними используются брокеры сообщений (например, rabbitmq). В этой статье мы напишем простейшего самодельного брокера на основе обычных сокетов.

Создадим приложение состоящее из двух пользовательских скриптов, осуществляющих коммуникацию друг с другом через сокеты.

Первым делом следует прочесть официальную документацию Python по созданию Socket сервера. Наиболее интересен раздел 21.21.4.3. Asynchronous Mixins. Обратите внимание, что используется 3-я версия языка Python.

Создайте файл codex_queue.py и заполните его заготовкой из статьи.

# codex_queue.py import threading import socketserver class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): pass class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler): def handle(self): data = self.request.recv(1024) print(data)

Каждое приложение будет иметь собственную очередь сообщений, в которую ей можно будет отправить данные. По мере необходимости приложение берёт сообщения из этой очереди и обрабатывает их.

Очереди сообщений
# продолжение файла codex_queue.py class Queue: def __init__(self, ip, port): self.server = ThreadedTCPServer((ip, port), ThreadedTCPRequestHandler) self.server.queue = self self.server_thread = threading.Thread(target=self.server.serve_forever) self.server_thread.daemon = True self.messages = []

В конструкторе __init__ происходит инициализация сокет-сервера для работы в отдельном потоке.

self.messages – массив сообщений. Та самая очередь.

self.server.queue = self

Данный код записывает указатель на объект очереди в сокет-сервер, чтобы к нему можно было обращаться из другого потока.

Допишем в класс вспомогательные функции для запуска и остановки сервера, а также для работы с очередью.

# продолжение файла codex_queue.py def start_server(self): self.server_thread.start() print("Server loop running in thread:", self.server_thread.name) def stop_server(self): self.server.shutdown() self.server.server_close() def add(self, message): self.messages.append(message) def view(self): return self.messages def get(self): return self.messages.pop() def exists(self): return len(self.messages)

Класс очереди готов. Скачать его полную версию можно по ссылке.

Теперь приступим к написанию сервера, который будет работать с этой очередью. Создадим файл server.py.

# файл server.py import socket import time from codex_queue import Queue class Server: def __init__(self, ip, port): self.queue = Queue(ip, port) def start_server(self): self.queue.start_server() def stop_server(self): self.queue.stop_server()

Дополнительно понадобится функция loop, которая будет в цикле проверять есть ли данные в очереди, забирать оттуда один элемент и передавать на обработку в метод handle.

def loop(self): while True: time.sleep(1) while self.queue.exists(): self.handle(self.queue.get()) def handle(self, message): """ Prototype """ pass

Готовый файл вы можете скачать по ссылке.

Теперь можно создать произвольное количество сервисов, наследуя их от класса Server и переопределяя метод handle. Мы остановимся для примера на двух скриптах – Alice и Bob. Боб будет извлекать URL сайтов из сообщения в JSON формате, делать HTTP запрос по полученному адресу и отправлять Алисе коды HTTP ответов. Алиса же просто будет выводить их на экран.

Создадим файл alice.py.

from server import Server class Alice(Server): def handle(self, message): try: print("Got: {}".format(message)) except Exception as e: print("Error: {}".format(e)) if __name__ == "__main__": print("Alice started.") app = Alice("localhost", 8889) app.start_server() app.loop() app.stop_server()

Запустите этот скрипт командой python alice.py. Откройте отдельную консоль и наберите там команду nc localhost 8889, введите сообщение и нажмите enter. Класс Alice поймает сообщение и выведет его на экран. Есть вы пользователь windows – установите программу putty.

Перейдем ко второму классу. Создайте файл bob.py.

import json import requests from server import Server class Bob(Server): def handle(self, message): try: print("Got: {}".format(message)) url = json.loads(str(message, 'ascii'))["url"] response = requests.get(url) except Exception as e: print("Error: {}".format(e)) else: result = {} result['status_code'] = response.status_code self.send("localhost", 8889, json.dumps(result)) if __name__ == "__main__": print("Bob started.") getter = Bob("localhost", 8887) getter.start_server() getter.loop() getter.stop_server()

Можно видеть, что данный класс почти не отличается от предыдущего за исключением порта 8887 и функции обработки.

Осталось только написать функцию отправки сообщения на сокет. Допишем её в класс Server.

def send(self, ip, port, message): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.connect((ip, port)) try: sock.sendall(bytes(message, 'ascii')) finally: sock.close()

Все файлы вы можете скачать по ссылке.

Запустите оба скрипта командами python bob.py и python alice.py. Затем в отдельной консоли введите команду nc localhost 8887 и отправьте сообщение.

{"url": "http://ifmo.su"}
Демонстрация работы системы

На основе этого кода можно создавать произвольное количество программ, расположенных на разных машинах и общающихся друг с другом посредством очередей сообщений.