Перейти к содержанию
  • Лента
  • Категории
  • Последние
  • Метки
  • Популярные
  • Пользователи
  • Группы
Свернуть
exlends
Категории
  1. Главная
  2. Категории
  3. Programming languages
  4. Python
  5. Пишем свой API сервер на Python без фреймворков и библиотек

Пишем свой API сервер на Python без фреймворков и библиотек

Запланировано Прикреплена Закрыта Перенесена Python
1 Сообщения 1 Постеры 62 Просмотры
  • Сначала старые
  • Сначала новые
  • По количеству голосов
Ответить
  • Ответить, создав новую тему
Авторизуйтесь, чтобы ответить
Эта тема была удалена. Только пользователи с правом управления темами могут её видеть.
  • AladdinA Не в сети
    AladdinA Не в сети
    Aladdin
    js
    написал в отредактировано Aladdin
    #1

    Введение

    В этом гайде вы изучите принципы работы веб-серверов и научитесь создавать собственный HTTP API сервер на Python, используя только стандартные библиотеки языка. Материал предназначен для программистов-школьников и студентов, желающих понять, как работают веб-серверы на низком уровне.

    Что такое HTTP сервер

    HTTP сервер — это программа, которая принимает запросы от клиентов через протокол HTTP и отправляет им ответы. Современный веб основан на текстовом обмене данными между клиентами и серверами.

    Основные компоненты HTTP сервера:

    • Сетевой слой - принимает TCP соединения
    • Парсер запросов - разбирает HTTP запросы
    • Маршрутизация - определяет обработчик для запроса
    • Генератор ответов - формирует HTTP ответы
    • Менеджер соединений - управляет клиентскими подключениями

    HTTP протокол: основы

    HTTP (HyperText Transfer Protocol) - это протокол прикладного уровня для передачи данных между веб-сервером и клиентом. Протокол работает по принципу запрос-ответ.

    Структура HTTP запроса:

    GET /api/users HTTP/1.1
    Host: localhost:8080
    User-Agent: Mozilla/5.0
    Accept: application/json
    Content-Type: application/json
    Content-Length: 25
    
    {"name": "John Doe"}
    

    HTTP методы

    Метод Описание Безопасный Идемпотентный
    GET Получение данных Да Да
    POST Создание/отправка данных Нет Нет
    PUT Обновление/создание Нет Да
    DELETE Удаление Нет Да
    HEAD Получение только заголовков Да Да
    OPTIONS Получение разрешенных методов Да Да

    HTTP статус коды

    • 2xx - Успешные операции
      • 200 OK - Запрос успешно обработан
      • 201 Created - Ресурс успешно создан
      • 204 No Content - Успех без содержимого
    • 4xx - Ошибки клиента
      • 400 Bad Request - Неверный формат запроса
      • 401 Unauthorized - Требуется аутентификация
      • 404 Not Found - Ресурс не найден
    • 5xx - Ошибки сервера
      • 500 Internal Server Error - Внутренняя ошибка сервера
      • 502 Bad Gateway - Ошибка шлюза
      • 503 Service Unavailable - Сервис недоступен

    Socket Programming в Python

    Socket (сокет) — это конечная точка двустороннего канала связи между процессами в сети. Python предоставляет мощный модуль socket для работы с сетевыми соединениями.

    Основные методы работы с сокетами:

    import socket
    
    # Создание TCP сокета
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    
    # Настройка переиспользования адреса
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    
    # Привязка к адресу и порту
    server_socket.bind(('127.0.0.1', 8080))
    
    # Начало прослушивания (до 5 соединений в очереди)
    server_socket.listen(5)
    
    # Принятие клиентского соединения
    client_socket, address = server_socket.accept()
    
    # Получение данных
    data = client_socket.recv(4096)
    
    # Отправка данных
    client_socket.send(response.encode('utf-8'))
    
    # Закрытие соединения
    client_socket.close()
    

    Базовый HTTP сервер

    Вот полный код простого HTTP сервера, работающего без внешних зависимостей:

    import socket
    import threading
    from datetime import datetime
    
    class SimpleHTTPServer:
        def __init__(self, host='127.0.0.1', port=8080):
            self.host = host
            self.port = port
            self.socket = None
            self.running = False
        
        def start(self):
            """Запуск HTTP сервера"""
            self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            
            try:
                self.socket.bind((self.host, self.port))
                self.socket.listen(5)
                self.running = True
                
                print(f"HTTP сервер запущен на http://{self.host}:{self.port}")
                
                while self.running:
                    try:
                        client_socket, address = self.socket.accept()
                        # Создаем отдельный поток для каждого клиента
                        client_thread = threading.Thread(
                            target=self.handle_client, 
                            args=(client_socket, address)
                        )
                        client_thread.daemon = True
                        client_thread.start()
                        
                    except socket.error:
                        break
                        
            except Exception as e:
                print(f"Ошибка сервера: {e}")
            finally:
                self.stop()
        
        def handle_client(self, client_socket, address):
            """Обработка запроса клиента"""
            try:
                request_data = client_socket.recv(4096).decode('utf-8')
                
                if request_data:
                    http_request = self.parse_request(request_data)
                    response = self.generate_response(http_request)
                    client_socket.send(response.encode('utf-8'))
                    
            except Exception as e:
                print(f"Ошибка обработки клиента {address}: {e}")
            finally:
                client_socket.close()
        
        def parse_request(self, request_data):
            """Парсинг HTTP запроса"""
            lines = request_data.split('\r\n')
            
            # Парсим стартовую строку: "GET /path HTTP/1.1"
            request_line = lines[^0].split()
            method = request_line[^0] if len(request_line) > 0 else 'GET'
            path = request_line[^1] if len(request_line) > 1 else '/'
            version = request_line[^2] if len(request_line) > 2 else 'HTTP/1.1'
            
            # Парсим заголовки
            headers = {}
            for line in lines[1:]:
                if line == '':  # Пустая строка означает конец заголовков
                    break
                if ':' in line:
                    key, value = line.split(':', 1)
                    headers[key.strip()] = value.strip()
            
            return {
                'method': method,
                'path': path,
                'version': version,
                'headers': headers
            }
        
        def generate_response(self, request):
            """Генерация HTTP ответа"""
            method = request['method']
            path = request['path']
            
            # Простая маршрутизация
            if path == '/':
                status_code = 200
                body = self.get_home_page()
                content_type = 'text/html'
            elif path == '/api/time':
                status_code = 200
                body = f'{{"current_time": "{datetime.now().isoformat()}"}}'
                content_type = 'application/json'
            else:
                status_code = 404
                body = '<h1>404 - Страница не найдена</h1>'
                content_type = 'text/html'
            
            # Формируем HTTP ответ
            response_headers = [
                f'HTTP/1.1 {status_code} {"OK" if status_code == 200 else "Not Found"}',
                f'Content-Type: {content_type}; charset=utf-8',
                f'Content-Length: {len(body.encode("utf-8"))}',
                'Connection: close',
                f'Date: {datetime.utcnow().strftime("%a, %d %b %Y %H:%M:%S GMT")}',
                '',  # Пустая строка разделяет заголовки и тело
                body
            ]
            
            return '\r\n'.join(response_headers)
        
        def get_home_page(self):
            """Генерация главной страницы"""
            return '''<!DOCTYPE html>
    <html>
    <head>
        <title>Simple HTTP Server</title>
        <meta charset="utf-8">
    </head>
    <body>
        <h1>Простой HTTP сервер на Python</h1>
        <p>Сервер работает без фреймворков!</p>
        <ul>
            <li><a href="/">/</a> - Главная страница</li>
            <li><a href="/api/time">/api/time</a> - Текущее время</li>
        </ul>
    </body>
    </html>'''
        
        def stop(self):
            """Остановка сервера"""
            self.running = False
            if self.socket:
                self.socket.close()
    
    # Запуск сервера
    if __name__ == '__main__':
        server = SimpleHTTPServer('127.0.0.1', 8080)
        try:
            server.start()
        except KeyboardInterrupt:
            print("\nСервер остановлен")
            server.stop()
    

    Многопоточность и производительность

    Без многопоточности сервер может обрабатывать только один запрос за раз, что создает блокировки. Python предоставляет несколько способов обработки множественных соединений:

    Threading vs Multiprocessing

    Подход Преимущества Недостатки
    Threading Общая память, быстрое переключение GIL ограничивает параллелизм
    Multiprocessing Истинный параллелизм Больше памяти, сложнее обмен данными
    AsyncIO Эффективность I/O операций Сложность программирования

    Пул потоков для оптимизации:

    from concurrent.futures import ThreadPoolExecutor
    
    class ThreadedServer(SimpleHTTPServer):
        def __init__(self, host='127.0.0.1', port=8080, max_workers=50):
            super().__init__(host, port)
            self.executor = ThreadPoolExecutor(max_workers=max_workers)
        
        def start(self):
            """Запуск с пулом потоков"""
            self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            
            try:
                self.socket.bind((self.host, self.port))
                self.socket.listen(5)
                self.running = True
                
                while self.running:
                    try:
                        client_socket, address = self.socket.accept()
                        # Используем пул потоков
                        self.executor.submit(
                            self.handle_client, 
                            client_socket, 
                            address
                        )
                    except socket.error:
                        break
            finally:
                self.executor.shutdown()
    

    Управление памятью в Python

    Python автоматически управляет памятью через reference counting и garbage collection. Однако в веб-серверах важно оптимизировать использование памяти.

    Python Memory Management in Web Servers

    Основные техники оптимизации памяти:

    1. Используйте генераторы вместо списков для больших данных
    2. Ограничивайте размеры буферов и кэшей
    3. Применяйте weak references для автоматической очистки
    4. Мониторьте память и принудительно вызывайте сборщик мусора
    import gc
    import weakref
    from collections import deque
    
    class MemoryOptimizedServer:
        def __init__(self):
            # Ограничиваем историю запросов
            self.request_history = deque(maxlen=1000)
            
            # Weak references для соединений
            self.active_connections = weakref.WeakSet()
            
            # Настройка сборщика мусора
            gc.set_threshold(700, 10, 10)
        
        def process_large_data(self, data):
            """Используем генераторы для экономии памяти"""
            for chunk in self.data_chunks(data, 1024):
                yield self.process_chunk(chunk)
        
        def data_chunks(self, data, chunk_size):
            """Генератор для обработки данных по частям"""
            for i in range(0, len(data), chunk_size):
                yield data[i:i + chunk_size]
        
        def monitor_memory(self):
            """Мониторинг использования памяти"""
            import psutil
            process = psutil.Process()
            memory_percent = process.memory_percent()
            
            # Принудительная очистка при превышении лимита
            if memory_percent > 80:
                gc.collect()
            
            return memory_percent
    

    Продвинутый парсинг HTTP запросов

    Для надежной работы сервера необходим качественный парсинг HTTP запросов:

    import re
    import json
    from urllib.parse import parse_qs, urlparse
    
    class HTTPRequestParser:
        def __init__(self):
            self.request_line_pattern = re.compile(
                r'^([A-Z]+)\s+([^\s]+)\s+HTTP/([0-9]\.[0-9])$'
            )
        
        def parse(self, request_data):
            """Полный парсинг HTTP запроса"""
            if isinstance(request_data, bytes):
                request_data = request_data.decode('utf-8')
            
            # Разделяем заголовки и тело
            if '\r\n\r\n' in request_data:
                headers_part, body = request_data.split('\r\n\r\n', 1)
            else:
                headers_part = request_data
                body = ''
            
            lines = headers_part.split('\r\n')
            
            # Парсим компоненты
            method, path, version = self.parse_request_line(lines[^0])
            url_parts = self.parse_url(path)
            headers = self.parse_headers(lines[1:])
            parsed_body = self.parse_body(body, headers)
            
            return {
                'method': method,
                'path': url_parts['path'],
                'query_params': url_parts['query'],
                'version': version,
                'headers': headers,
                'body': parsed_body
            }
        
        def parse_body(self, body, headers):
            """Парсинг тела запроса по Content-Type"""
            if not body:
                return None
            
            content_type = headers.get('content-type', '').lower()
            
            if 'application/json' in content_type:
                try:
                    return json.loads(body)
                except json.JSONDecodeError:
                    return body
            elif 'application/x-www-form-urlencoded' in content_type:
                return parse_qs(body)
            else:
                return body
    

    Маршрутизация запросов

    Система маршрутизации определяет, какой код должен обработать конкретный запрос:

    import re
    from functools import wraps
    
    class Router:
        def __init__(self):
            self.routes = []
        
        def add_route(self, method, pattern, handler):
            """Добавление маршрута"""
            compiled_pattern = re.compile(pattern)
            self.routes.append({
                'method': method.upper(),
                'pattern': compiled_pattern,
                'handler': handler
            })
        
        def get(self, pattern):
            """Декоратор для GET запросов"""
            def decorator(func):
                self.add_route('GET', pattern, func)
                return func
            return decorator
        
        def post(self, pattern):
            """Декоратор для POST запросов"""
            def decorator(func):
                self.add_route('POST', pattern, func)
                return func
            return decorator
        
        def route(self, request):
            """Поиск подходящего маршрута"""
            method = request['method']
            path = request['path']
            
            for route in self.routes:
                if route['method'] == method:
                    match = route['pattern'].match(path)
                    if match:
                        request['params'] = match.groups()
                        return route['handler'](request)
            
            return self.not_found_response()
    
    # Использование роутера
    router = Router()
    
    @router.get(r'^/$')
    def home_page(request):
        return {
            'status': 200,
            'headers': {'Content-Type': 'text/html'},
            'body': '<h1>Главная страница</h1>'
        }
    
    @router.get(r'^/api/users/(\d+)$')
    def get_user(request):
        user_id = request['params'][^0]
        return {
            'status': 200,
            'headers': {'Content-Type': 'application/json'},
            'body': f'{{"user_id": {user_id}}}'
        }
    

    Обработка ошибок и отладка

    Качественная обработка ошибок критична для стабильности сервера:

    import logging
    from datetime import datetime
    
    # Настройка логирования
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)
    
    class LoggingServer(SimpleHTTPServer):
        def handle_client(self, client_socket, address):
            """Обработка с логированием"""
            start_time = datetime.now()
            
            try:
                request_data = client_socket.recv(4096).decode('utf-8')
                
                if request_data:
                    request_line = request_data.split('\r\n')[^0]
                    logger.info(f"Запрос от {address}: {request_line}")
                    
                    http_request = self.parse_request(request_data)
                    response = self.generate_response(http_request)
                    
                    client_socket.send(response.encode('utf-8'))
                    
                    duration = datetime.now() - start_time
                    logger.info(f"Обработано за {duration.total_seconds():.3f}с")
                    
            except Exception as e:
                logger.error(f"Ошибка обработки {address}: {e}")
                error_response = self.generate_error_response(500, str(e))
                try:
                    client_socket.send(error_response.encode('utf-8'))
                except:
                    pass
            finally:
                client_socket.close()
    

    Тестирование сервера

    Для проверки работоспособности сервера создадим простой тестовый клиент:

    import socket
    import time
    
    def test_server(host='127.0.0.1', port=8080):
        """Тестирование HTTP сервера"""
        
        test_requests = [
            "GET / HTTP/1.1\r\nHost: localhost\r\n\r\n",
            "GET /api/time HTTP/1.1\r\nHost: localhost\r\n\r\n",
            "GET /nonexistent HTTP/1.1\r\nHost: localhost\r\n\r\n"
        ]
        
        for i, request in enumerate(test_requests):
            print(f"\n--- Тест {i+1} ---")
            
            try:
                client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                client_socket.connect((host, port))
                
                client_socket.send(request.encode('utf-8'))
                response = client_socket.recv(4096).decode('utf-8')
                
                status_line = response.split('\r\n')[^0]
                print(f"Статус: {status_line}")
                
                client_socket.close()
                
            except Exception as e:
                print(f"Ошибка теста: {e}")
    
    if __name__ == '__main__':
        test_server()
    

    Оптимизация производительности

    Кэширование ответов

    import time
    from functools import lru_cache
    
    class CachingServer:
        def __init__(self):
            self.response_cache = {}
            self.cache_ttl = 60  # секунд
        
        def get_cached_response(self, cache_key):
            """Получение ответа из кэша"""
            if cache_key in self.response_cache:
                response, timestamp = self.response_cache[cache_key]
                if time.time() - timestamp < self.cache_ttl:
                    return response
            return None
        
        @lru_cache(maxsize=128)
        def generate_static_response(self, path):
            """Кэшированная генерация статических ответов"""
            if path == '/':
                return self.get_home_page()
            return None
    

    Сжатие ответов

    import gzip
    
    def compress_response(body, encoding='gzip'):
        """Сжатие тела ответа"""
        if isinstance(body, str):
            body = body.encode('utf-8')
        
        if encoding == 'gzip' and len(body) > 1024:
            return gzip.compress(body)
        return body
    

    Безопасность

    Базовые принципы безопасности для веб-серверов:

    import time
    import hashlib
    
    class SecureServer:
        def __init__(self):
            self.rate_limits = {}  # IP -> timestamps
            self.blocked_ips = set()
        
        def check_rate_limit(self, client_ip, max_requests=100, window=3600):
            """Проверка ограничений скорости запросов"""
            now = time.time()
            
            if client_ip in self.rate_limits:
                self.rate_limits[client_ip] = [
                    ts for ts in self.rate_limits[client_ip]
                    if now - ts < window
                ]
            else:
                self.rate_limits[client_ip] = []
            
            if len(self.rate_limits[client_ip]) >= max_requests:
                self.blocked_ips.add(client_ip)
                return False
            
            self.rate_limits[client_ip].append(now)
            return True
        
        def validate_request(self, request):
            """Валидация запроса"""
            path = request['path']
            
            # Проверка на Path Traversal
            if '..' in path or '~' in path:
                return False, "Path traversal detected"
            
            # Проверка размера
            content_length = int(request['headers'].get('content-length', 0))
            if content_length > 10 * 1024 * 1024:  # 10MB
                return False, "Request too large"
            
            return True, "OK"
    

    Заключение

    Ограничения самодельных серверов:

    • Производительность уступает оптимизированным фреймворкам
    • Сложность обеспечения полной совместимости с HTTP стандартами
    • Требуется больше времени на разработку и отладку

    Для продакшена рекомендуется использовать:

    • FastAPI - современный, быстрый фреймворк
    • Flask - простой и гибкий
    • aiohttp - асинхронный веб-сервер

    Помните: понимание низкоуровневых принципов работы веб-серверов поможет вам стать лучшим разработчиком и эффективнее использовать готовые фреймворки!

    1 ответ Последний ответ
    1

    Категории

    • Главная
    • Новости
    • Фронтенд
    • Бекенд
    • Языки программирования

    Контакты

    • Сотрудничество
    • info@exlends.com
    • Наш чат
    • Наш ТГ канал

    © 2024 - 2025 ExLends, Inc. Все права защищены.

    Политика конфиденциальности
    • Войти

    • Нет учётной записи? Зарегистрироваться

    • Войдите или зарегистрируйтесь для поиска.
    • Первое сообщение
      Последнее сообщение
    0
    • Лента
    • Категории
    • Последние
    • Метки
    • Популярные
    • Пользователи
    • Группы