Разработчики описывают порт завершения как средство, повышающее
производительность приложений, часто использующих операции ввода/вывода.
Действительно, IOCP часто используют при написании масштабируемых серверных
приложений.
Теория вопроса мультиплексирования ввода-вывода
Объект <порт>, по сути, представляет собой очередь событий ядра, из которой
извлекаются и в которую добавляются сообщения об операциях ввода/вывода.
Естественно туда добавляются не все текущие операции, а только те, которые мы
указали порту. Делается это путем связывания дескриптора (хендла) файла (не
обязательно именно файла на диске, это может быть сокет, пайп, мэилслот и т.д.)
с дескриптором порта. Когда над файлом инициируется асинхронная операция
ввода/вывода, то после ее завершения соответствующая запись добавляется в порт.
Для обработки результатов используется пул потоков, количество которых
выбирается пользователем. Когда поток присоединяют к пулу, он извлекает из
очереди один результат операции и обрабатывает его. Если на момент
присоединения очередь пуста, то поток засыпает до тех пор, пока не появится
сообщение для обработки. К интересной особенности порта завершения можно
отнести то, что в очередь можно <руками> положить какое-то сообщение, чтобы
потом его извлечь.
Как сказано в книге Роберта Лава
(2-е издание, стр. 82), мультиплексированный ввод-вывод позволяет приложениям
параллельно блокировать несколько файловых дескрипторов и получать уведомления,
как только любой из них будет готов к чтению или записи без блокировки. По
умолчанию файловые дескприторы (например, сокеты, каналы) блокируют выполнение
процесса. Это означает, что когда мы вызываем на файловом дескрипторе функцию,
которая не может выполниться немедленно, наш процесс переходит в "спящее"
состояние и ждет, когда будет выполнено определенное условие. Для того, чтобы
использовать неблокируемый ввод-вывод, необходимо использовать флаг O_NONBLOCK
для функций open или fcntl. Примеры неблокирующего ввода-вывода рассмотриваются
в лекциях по клиент-серверным приложениям.
Реализация мультиплексирования ввода-вывода использованием IOCP
После описания схемы работы перейдем к более конкретным вещам. А именно
реализации приложения, использующего IOCP. В качестве примера я буду
использовать сервер, который просто принимает входящие подключения и пакеты от
клиентов. Используемый язык - С.
Итак, для начала было бы неплохо этот самый порт создать. Делается это
APIшкой
Что примечательно, этот же вызов используется для связывания хендла файла с уже существующим портом. Зачем так было делать - неизвестно.
Строчка HANDLE iocp=CreateIoCompletionPort(INVALID_HANDLE_VALUE,0,0,0);
создаст новый объект порта завершения и вернет нам его хендл. Здесь в качестве первого аргумента передается значение INVALID_HANDLE_VALUE, которое и означает то, что нам нужен новый порт. Следующие два аргумента должны иметь значение 0. При создании можно указать сколько потоков одновременно смогут работать для этого порта с помощью последнего аргумента. Если указать 0, то будет использовано значение, равное количеству процессоров в системе.
Следующим шагом является создание потоков, которые будут задействованы в пуле. Тут нельзя дать универсального совета. Некоторые говорят, что потоков должно быть в два раза больше процессоров в системе, другие, что их количество должно быть равно, третьи динамически изменяют размеры пула. Как тут поступить зависит от приложения и конфигурации компьютера. У меня старенький пенек с HyperThreading, поэтому система видит мой процессор как два. По этой причине в моем примере будет 2 рабочих потока в пуле.
Обращаю ваше внимание: мы передаем рабочим потокам хендл порта завершения в качестве параметра. Он понадобится им, когда потоки будут заявлять о своей готовности поработать. Сама функция WorkingThread() будет приведена ниже.
Теперь, когда потоки созданы, можно начинать принимать клиенты и их сообщения. Код инициализации Winsock я здесь приводить не буду (но он есть в тексте исходников этой статьи), поэтому просто напишу:
Вызов accept возвращает нам сокет очередного клиента, в который можно писать и из которого можно читать как из обычного файла. В вашем случае тут может быть файл на диске или любой другой IO-объект.
Теперь мы должны оповестить порт завершения о том, что хотим, чтобы он наблюдал за этим сокетом. Для этого связываем дескрипторы сокета и порта:
Последний аргумент в данном случае игнорируется, поскольку порт уже создан. А вот предпоследний требует особого разбирательства. В прототипе он значится как CompletionKey (<ключ завершения>). Фактически ключ является указателем на любые данные, т.е. на структуру или экземпляр класса, определенные вами. Он используется для того, чтобы внутри потока вы могли отличить одну операцию от другой или же для хранения состояния того или иного клиента. Как минимум вам придется хранить там байт, показывающий какая операция завершилась - отправки или приема (чтения или записи).
После связывания дескрипторов можно инициировать асинхронный ввод/вывод. Для сокетов используются функции Winsock2 - WSASend() и WSARecv() с переданным туда указателем на структуру OVERLAPPED, которая собственно и знаменует собой асинхронную операцию. Для файлов можно использовать функции WriteFile() и ReadFile() соответственно.
Помимо структуры OVERLAPPED вам потребуется передавать в поток еще кое-какую IO-информацию - например, адрес буфера, его длину или даже сам буфер. Это можно делать либо через ключ завершения, либо создать структуру, содержащую OVERLAPPED первым полем и передавать указатель на нее в WSASend()/WSARecv().
Теперь рассмотрим функцию API, которая присоединяет вызывающий ее поток к пулу:
Здесь CompletionPort - хендл порта, к пулу которого следует подключиться; lpNumberOfBytes - указатель на переменнную, в которую запишется количество переданных байт в результате завершения операции (фактически это возвращаемое значение recv() и send() в синхронном режиме); lpCompletionKey - указатель на переменную, в которую запишется указатель на ключ завершения; lpOverlapped - указатель на OVERLAPPED, ассоциированную с этой IO-транзакцией; наконец, dwMilliseconds - время, на которое поток может уснуть в ожидании завершения какого-либо запроса. Если указать INFINITE, то будет ждать вечно.
Теперь, когда мы познакомились с функцией извлечения из очереди можно взглянуть на функцию, с которой начинают выполнение рабочие потоки.
void WorkingThread(HANDLE iocp)
{
while(1)
{
if(!GetQueuedCompletionStatus(iocp,&bytes,&key,&overlapped,INFINITE))
//ошибка порта
break;
if(!bytes)
//0 означает что дескриптор файла закрыт, т.е. клиент отсоединился
switch(key->OpType)
{
...
}
}
}
Внутри switch'а вызываются новые асинхронные операции, которые будут обработаны при следующем прохождении цикла. Если мы не хотим, чтобы определенная операция по завершению не была передана в порт (например, когда нам не важен результат), можно использовать следующий трюк - установить первый бит поля OVERLAPPED.hEvent равным 1. Стоит отметить что, с точки зрения производительности, помещать обработку пришедшей информации в этот же цикл не самое разумное решение, т.к. это замедлит реакцию сервера на входящие пакеты. Чтобы решить проблему можно вынести разбор прочитанной информации в еще один отдельный поток, и тут нам пригодится третья API-функция:
Суть ее понятна из названия - она помещает в очередь порта сообщение. Собственно все асинхронные функции по завершении операции незаметно ее и вызывают. Все аргументы перечисленные здесь сразу же передаются одному из потоков. Благодаря PostQueuedCompletionStatus порт завершения можно использовать не только для обработки IO-операций, но и просто для эффективной организации очереди с пулом потоков.
В нашем примере имеет смысл создать еще один порт и после завершения какой-нибудь операции вызывать PostQueuedCompletionStatus(), передавая в ключе принятый пакет на обработку в другом потоке.
Внутреннее устройство IOCP
Порт завершения представляет собой следующую структуру:
При создании порта функцией CreateIoCompletionPort вызывается внутренний сервис NtCreateIoCompletion. Затем происходит его инициализация с помощью функции KeInitializeQueue. Когда происходит связывание порта с объектом <файл>, Win32-функция CreateIoCompletionPort вызывает NtSetInformationFile.
Для этой функции FILE_INFORMATION_CLASS устанавливается как FileCompletionInformation, а в качестве параметра FileInformation передается указатель на структуру IO_COMPLETION_CONTEXT или FILE_COMPLETION_INFORMATION.
После завершения асинхронной операции ввода/вывода для ассоциированного файла диспетчер ввода/вывода создает пакет запроса из структуры OVERLAPPED и ключа завершения и помещает его в очередь с помощью вызова KeInsertQueue. Когда поток вызывает функцию GetQueuedCompletionStatus, на самом деле вызывается функция NtRemoveIoCompletion. NtRemoveIoCompletion проверяет параметры и вызывает функцию KeRemoveQueue, которая блокирует поток, если в очереди отсутствуют запросы, или поле CurrentCount структуры KQUEUE больше или равно MaximumCount. Если запросы есть, и число активных потоков меньше максимального, KeRemoveQueue удаляет вызвавший ее поток из очереди ожидающих потоков и увеличивает число активных потоков на 1. При занесении потока в очередь ожидающих потоков поле Queue структуры KTHREAD устанавливается равным адресу порта завершения. Когда запрос помещается в порт завершения функцией PostQueuedCompletionStatus, на самом деле вызывается функция NtSetIoCompletion, которая после проверки параметров и преобразования хендла порта в указатель, вызывает KeInsertQueue.
Знаете ли Вы, что в 1974 - 1980 годах профессор Стефан Маринов из г. Грац, Австрия, проделал серию экспериментов, в которых показал, что Земля движется по отношению к некоторой космической системе отсчета со скоростью 360±30 км/с, которая явно имеет какой-то абсолютный статус. Естественно, ему не давали нигде выступать и он вынужден был начать выпуск своего научного журнала "Deutsche Physik", где объяснял открытое им явление. Подробнее читайте в FAQ по эфирной физике.