Задание №4
Задание
Реализовать двухпользовательский или многопользовательский чат. Реализация многопользовательского чата позволяет получить максимальное количество баллов.
Реализовать с помощью протокола TCP – 100% баллов, с помощью UDP – 80%.
Обязательно использовать библиотеку threading
.
Для реализации с помощью UDP, threading
использовать для получения
сообщений у клиента.
Для применения с TCP необходимо запускать клиентские подключения И прием
и отправку сообщений всем юзерам на сервере в потоках.
Не забудьте сохранять юзеров, чтобы потом отправлять им сообщения.
import socket
import threading
class Client:
def __init__(self, client_sock: socket.socket, address: tuple[str, int]):
self.socket = client_sock
self.address = address
self.socket.settimeout(0.5)
self.is_closed = False
class ClientThread(threading.Thread):
def __init__(self, client: Client, client_lock: threading.Lock):
super().__init__()
self.client = client
self.should_stop = threading.Event()
self.client_lock = client_lock
def run(self) -> None:
self.broadcast("Присоединился к чату")
while not self.should_stop.is_set():
if self.client.is_closed:
break
try:
message = self.client.socket.recv(1024).decode()
if not message:
self.remove_client()
break
else:
self.broadcast(message)
except socket.timeout:
pass
except OSError:
break
except ConnectionResetError:
self.remove_client()
break
def remove_client(self):
self.client.is_closed = True
self.broadcast("Покинул чат")
self.client_lock.acquire()
try:
clients.remove(self.client)
finally:
self.client_lock.release()
self.client.socket.close()
self.should_stop.set()
def broadcast(self, message: str) -> None:
self.client_lock.acquire()
try:
other_clients = clients[:]
finally:
self.client_lock.release()
for other_client in other_clients:
if other_client != self.client:
try:
other_client.socket.send(
f"{self.client.address[0]}:{self.client.address[1]} : "
f"{message}".encode()
)
except socket.timeout:
pass
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind(("127.0.0.1", 8080))
server_socket.listen(10)
server_socket.settimeout(0.5)
clients = []
client_lock = threading.Lock()
try:
while True:
try:
client_sock, address = server_socket.accept()
client = Client(client_sock, address)
client_lock.acquire()
try:
clients.append(client)
finally:
client_lock.release()
new_thread = ClientThread(client, client_lock)
new_thread.start()
except socket.timeout:
pass
except KeyboardInterrupt:
for client in clients:
client.is_closed = True
client.socket.close()
server_socket.close()
Создадим class Client
, в атрибутах которого будут храниться клиентский сокет, клиентский адрес,
булевое значение is_closed
(True
, если сокет закрыт. Необходимо для понимания других потоков,
что с этим сокетом больше работать не нужно). Также поставим таймаут для сокета, чтобы была возможность выйти
с помощью Ctrl+C.
Создадим class ClientThread
, наследуемый от threading.Thread
, к его атрибутам добавим объект
класса Client
, с которым этот поток работает, событие threading.Event
- should_stop
, которое будем проверять
для завершения потока, и threading.Lock
для того, чтобы в нужный момент блокировать поток при доступе к общим изменяемым данным.
В этом классе переопределим метод def run
, который запускается при использовании метода .start
у потока.
Первым делом этот метод запускает метод broadcast
, отвечающий за отправку сообщений всем пользователям,
о его работе поговорим чуть позже.
Далее метод запускает цикл while
, который работает до тех пор, пока Event
не скажет о том, что потоку следует
остановиться.
Дополнительно проверяем статус клиента is_closed
, на случай, если сокет клиента уже закрылся другим потоком.
Получаем сообщение клиента. Если оно пустое, значит, клиент отключился, и мы должны удалить его, вызвав метод remove_client
,
о нём тоже позже.
Если сообщение есть - вызываем метод broadcast
с текстом сообщения.
Дополнительно проверяем ошибку OSError
на случай, если поток всё-таки решится обратиться к уже закрытому сокету,
и ошибку ConnectionResetError
на случай, если мы будет ожидать сообщение от клиента в момент, когда он отключается.
Теперь рассмотрим метод remove_client
. В этом методе меняем атрибут is_closed
на True,
чтобы другой поток больше не использовал этот сокет. Также отправляем сообщение всем пользователям о том, что этот клиент
покинул чат.
С помощью Lock
удаляем клиента из списка клиентов. Закрываем сокет и включаем событие should_stop
для текущего потока.
В методе broadcast
с помощью Lock
делаем копию списка клиентов. Пробегаемся в цикле for
по всем клиентам
и отправляем всем клиентам, не являющимся тем клиентом, вызвал этот метод, сообщение с необходимым текстом.
В основной части программы создаём сокет сервера, так же выставляем для него таймаут. Создаём пустой список клиентов
и создаём объект Lock
.
В цикле while
принимаем соединение клиента, на основе полученных данных создаём объекта Client
.
С помощью Lock
добавляем этого клиента в список.
На основе объектов Client
и Lock
создаём наш кастомный поток и запускаем его.
Если программа завершается нажатием на Ctrl+C, закрываем все клиентские сокеты, также изменяя их атрибут is_closed
,
и затем закрываем сокет сервера.
import socket
import threading
class ReceiveThread(threading.Thread):
def __init__(self, client_sock: socket.socket):
super().__init__()
self.client_sock = client_sock
self.server_closed = False
def run(self) -> None:
while not stop_threads and not self.server_closed:
try:
message = self.client_sock.recv(1024)
if not message:
print(
"Сервер закрыл соединение! "
"Напишите любое сообщение или нажмите на Ctrl+C."
)
self.server_closed = True
break
print(message.decode())
except socket.timeout:
continue
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client_socket.connect(("127.0.0.1", 8080))
client_socket.settimeout(0.5)
stop_threads = False
receiver = ReceiveThread(client_socket)
receiver.start()
try:
while True:
if receiver.server_closed:
break
try:
message = input()
client_socket.sendall(message.encode())
except KeyboardInterrupt:
print("Вы покинули чат!")
break
except KeyboardInterrupt:
print("Вы покинули чат!")
finally:
stop_threads = True
receiver.join()
client_socket.close()
Создадим class ReceiveThread
, наследуемый от threading.Thread
, к его атрибутам добавим объект
сокета и булевое значение server_closed
, которое становится True, если сервер закрывает соединение.
В этом классе переопределим метод def run
, который запускается при использовании метода .start
у потока.
В этом методе запускаем цикл while
, пока не поступило команды завершить потоки (глобальное булевое stop_threads
,
становится True, когда клиент завершает работу нажатием Ctrl+C) и пока ещё не известно, что сервер отключился.
В цикле получаем сообщение от сервера. Если оно пустое, значит сервер закрыл соеднение. Выводим об этом сообщение клиенту
и просим завершить сессию. Также меняем значение атрибута server_closed
на True.
В основной части программы инициализируем сокет, подключаем к сокету сервера, ставим ему таймаут.
С помощью сокета создаём объект нашего кастомного потока и запускаем его.
Запускаем цикл while
, работающий до тех пор, пока сервер не закроет соединение или клиент на завершит программу
нажатием на Ctrl+C. В самом цикле получаем ввод пользователя через input()
и отправляем полученное сообщение на сервер.
При завершении программы меняем булевое stop_threads
на True и дожидаемся завершения потока "приёмника" с помощью join.
Затем закрываем сокет.