Руководство программиста для Linux

    778b1c86   

Очереди сообщений


Базовые принципы

Очереди сообщений представляют собой связный список в адресном пространстве ядра. Сообщения могут посылаться в очередь по порядку и доставаться из очереди несколькими разными путями. Каждая очередь сообщений однозначно определена идентификатором IPC.

Внутренние и пользовательские структуры данных

Ключом к полному осознанию такой сложной системы, как System V IPC, является более тесное знакомство с различными структурами данных, которые лежат внутри самого ядра. Даже для большинства примитивных операций необходим прямой доступ к некоторым из этих структур, хотя другие используются только на гораздо более низком уровне.

Буфер сообщения

Первой структурой, которую мы рассмотрим, будет msgbuf. Его можно понимать как шаблон для данных сообщения. Поскольку данные в сообщении программист определяет сам, он обязан понимать, что на самом деле они являются структурой msgbuf. Его описание находится в linux/msg.h: /* буфер сообщения для вызовов msgsnd и msgrcv*/ struct msgbuf { long mtype; /* тип сообщения */ char mtext[1]; /* текст сообщения */ }; mtype

Тип сообщения, представленный натуральным числом. Он обязан быть натуральным! mtext

Собственно сообщение.

Возможность приписывать тип конкретному сообщению позволяет держать в одной очереди разнородные сообщения. Это может понадобиться, например, когда сообщения процесса-клиента помечаются одним магическим числом, а сообщения сообщения процесса-сервера - другим; или приложение ставит в очередь сообщения об ошибках с типом 1, сообщения-запросы - с типом 2 и т.д. Ваши возможности просто безграничны.

С другой стороны, старайтесь дать наглядное имя элементу данных сообщения (в примере был mtext). В это поле можно записывать не только массивы литер, но и вообще любые данные в любой форме. Поле действительно полностью произвольно, поэтому вся структура может быть переопределена программистом, например, так: struct my_msgbuf { long mtype; /* тип сообщения */ long request_id; /* идентификатор запроса */ struct client info; /* информация о клиенте */ }




Здесь мы также видим структуру сообщения, но второй элемент заменился на два, причем один из них - другая структура! В этом прелесть очередей сообщений, ядро не разбирает данные, какими бы они ни были.

Существует, однако, ограничение на максимальный размер сообщения.В LINUX-е он определен в linux/msg.h: #define MSGMAX 4056 /* <= 4056 */ /* максимальный размер сообщения, в байтах*/

Сообщения не могут быть больше, чем 4056 байт, сюда входит и элемент mtype, который занимает 4 байта (long).

Структура msg ядра

Ядро хранит сообщение в очереди структуры msg. Она определена в linux/msg.h следующим образом: struct msg { struct msg *msg_next; /* следующее сообщение в очереди */ long msg_type; char *msg_spot; /* адрес текста сообщения */ short msg_ts; /* размер текста */ }; msg_next

Указатель на следующее сообщение в очереди. Сообщения объединены в односвязный список и находятся в адресном пространстве ядра. msg_type Тип сообщения, каким он был объявлен в msgbuf. msg_spot Указатель на начало тела сообщения. msg_ts Длина текста (или тела) сообщения. Структура msqid_ds ядра

Каждый из трех типов IPC-объектов имеет внутреннее представление, которое поддерживается ядром. Для очередей сообщений это структура msqid_ds. Ядро создает, хранит и сопровождает образец такой структуры для каждой очереди сообщений в системе. Она определена в linux/msg.h следующим образом: /* структура msqid для каждой очереди в системе */ struct msqid_ds { struct ipc_perm msg_perm; struct msg *msg_first; /* первое сообщение в очереди */ struct msg *msg_last; /* последнее сообщение в очереди */ time_t msg_stime; /* время последнего вызова msgsnd */ time_t msg_rtime; /* время последнего вызова msgrcv */ time_t msg_ctime; /* время последнего изменения */ struct wait_queue *wwait; struct wait_queue *rwait; ushort msg_cbytes; ushort msg_qnum; ushort msg_qbytes; /* максимальное число байтов на очередь */ ushort msg_lspid; /* pid последнего испустившего msgsnd */ ushort msg_lrpid; /* последний полученный pid */ };





Хотя большинство элементов этой структуры вас будет мало волновать, для какой-то законченности мы вкратце поясним каждый.

msg_perm

Экземпляр структуры ipc_perm, определенной в linux/ipc.h. Она содержит информацию о доступе для очереди сообщений, включая права доступа и информацию о создателе сообщения (uid и т.п.).

msg_first

Ссылка на первое сообщение в очереди (голова списка).

msg_last

Ссылка на последний элемент списка (хвост списка).

msg_stime

Момент времени (time_t) посылки последнего сообщения из очереди.

msg_rtime

Момент времени последнего изъятия элемента из очереди.

msg_ctime

Момент времени последнего изменения, проделанного в очереди (подробнее об этом позже).

wwait и rwait

Указатели в очередь ожидания ядра. Они используются, когда операция над очередью сообщений переводит процесс в состояние спячки (то есть очередь переполнена, и процесс ждет открытия).

msg_cbytes

Число байт, стоящих в очереди (суммарный размер всех сообщений).

msg_qnum

Количество сообщений в очереди на настоящий момент.

msg_qbytes

Максимальный размер очереди.

msg_lspid

PID процесса, пославшего последнее в очереди сообщение.

msg_lrpid

PID последнего процесса, взявшего из очереди сообщение.

Структура ipc_perm ядра

Информацию о доступе к IPC-объектам ядро хранит в структуре ipc_perm. Например, описанная выше структура очереди сообщений содержит одну структуру типа ipc_perm в качестве элемента. Следующее ее определение дано в linux/ipc.h. struct ipc_perm { key_t key; ushort uid; /* euid и egid владельца */ ushort gid; ushort cuid; /* euid и egid создателя */ ushort cgid; ushort mode; /* режим доступа, см. режимные флаги ниже */ ushort seq; /* порядковый номер использования гнезда */ };

Все приведенное выше говорит само за себя. Сохраняемая отдельно вместе с ключом IPC-объекта информация содержит данные о владельце и создателе этого объекта (они могут различаться). Режимы восьмеричного доступа также хранятся здесь, как unsigned short. Наконец, сохраняется порядковый номер использования гнезда. Каждый раз когда IPC объект закрывается через системный вызов (уничтожается), этот номер уменьшается на максимальное число объектов IPC, которые могут находиться в системе. Касается вас это значение? Нет.

Системный вызов msgget() нужен для того, чтобы создать очередь сообщений или подключиться к существующей. SYSTEM CALL: msgget() PROTOTYPE: int msgget( key_t key, int msgflg ); RETURNS: идентификатор очереди сообщений в случае успеха; -1 в случае ошибки. При этом errno = EACCESS (доступ отклонен) EEXIST (такая очередь уже есть, создание невозможно) EIDRM (очередь помечена как удаляемая) ENOENT (очередь не существует) ENOMEM (не хватает памяти для создания новой очереди) ENOSPC (исчерпан лимит на количество очередей)



NOTES:

Первый аргумент msgget() значение ключа ( мы его получаем при помощи ftok()). Этот ключ сравнивается с ключами уже существующих в ядре очередей. При этом операция открытия или доступа к очереди зависит от содержимого аргумента msgflg:

IPC_CREAT

Создает очередь, если она не была создана ранее.

IPC_EXCL

При использовании совместно с IPC_CREAT, приводит к неудаче если очередь уже существует.

Вызов msgget() с IPC_CREAT, но без IPC_EXCL всегда выдает идентификатор (существующей с таким ключом или созданной) очереди. Использование IPC_EXCL вместе с IPC_CREAT либо создает новую очередь, либо, если очередь уже существует, заканчивается неудачей. Самостоятельно IPC_EXCL бесполезен, но вместе c IPC_CREAT он дает гарантию, что ни одна из существующих очередей не открывается для доступа.

Восьмеричный режим может быть OR-нут в маску доступа. Каждый IPC-объект имеет права доступа, аналогичные правам доступа к файлу в файловой системе UNIX-а.

Напишем оберточную функцию для открытия или создания очереди сообщений. int open_queue( key_t keyval ) { int qid; if ((qid = msgget ( keyval, IPC_CREAT | 0660 )) == -1) { return (-1); } return (qid); }

Отметьте использование точного ограничителя доступа 0660. Эта небольшая функция возвращает идентификатор очереди (int) или -1 в случае ошибки. Единственный требуемый аргумент - ключевое значение.

Системный вызов msgsnd()

Получив идентификатор очереди, мы можем выполнять над ней различные действия. Чтобы поставить сообщение в очередь, используйте системный вызов msgsnd(): SYSTEM CALL: msgsnd(); PROTOTYPE: int msgsnd( int msqid, struct msgbuf *msgp, int msgsz, int msgflg ); RETURNS: 0 в случае успеха -1 в случае ошибки: errno = EAGAIN (очередь переполнена, и установлен IPC_NOWAIT) EACCES (доступ отклонен, нет разрешения на запись) EFAULT (адрес msgp недоступен, неверно...) EIDRM (очередь сообщений удалена) EINTR (получен сигнал во время ожидания печати) EINVAL (ошибочный идентификатор очереди сообщений, неположительный тип сообщения или неправильный размер сообщения) ENOMEM (не хватает памяти для копии буфера сообщения) NOTES:



Первый аргумент msgsnd - идентификатор нашей очереди, возвращенный предварительным вызовом msgget. Второй аргумент, msgp - это указатель на редекларированный и загруженный буфер сообщения. Аргумент msgsz содержит длину сообщения в байтах, не учитывая тип сообщения (long 4 байта). Аргумент msgflg может быть нулем или: IPC_NOWAIT

Если очередь переполнена, то сообщение не записывается в очередь, и управление передается вызывающему процессу. Если эта ситуация не обрабатывается вызывающим процессом, то он приостанавливается (блокируется), пока сообщение не будет прочитано.

Напишем незатейливую оберточную функцию для посылки сообщения: int send_message( int qid, struct mymsgbuf *qbuf ) { int result, length; /* Длина есть в точности размер структуры минус sizeof(mtype) */ length = sizeof(struct mymsgbuf) - sizeof(long); if((result = msgsnd( qid, qbuf, length, 0)) == -1) { return(-1); } return(result); }

Эта функция пытается послать сообщение, лежащее по указанному адресу (qbuf), в очередь сообщений, идентифицированную qid. Напишем небольшую утилиту с нашими двумя оберточными функциями: #include #include #include #include main() { int qid; key_t msgkey; struct mymsgbuf { long mtype; /* тип сообщения */ int request; /* рабочий номер запроса */ double salary; /* зарплата */ } msg; /* Генерируем IPC-ключ */ msgkey = ftok(".", 'm'); /* Открываем/создаем очередь */ if (( qid = open_queue( msgkey)) == -1) { perror("open_queue"); exit(1); } /* Заполняем сообщение произвольными тестовыми данными */ msg.type = 1; /* тип сообщения должен быть положительным! */ msg.request = 1; /* элемент данных 1 */ msg.salary = 1000.00; /* элемент данных 2 (моя годовая зарплата! - авт.) */ /* Бомбим! */ if((send_message( qid, &msg )) == -1) { perror("send_message"); exit(1); } }

После создания/открытия нашей очереди принимаемся за загрузку буфера сообщения с тестовыми данными (заметьте отсутствие текстовых данных для иллюстрации нашего положения о посылке двоичной информации). Вызов нашего send_message ловко доставит сообщение в очередь.

Теперь, когда мы имеем сообщение в очереди, попытайтесь при помощи ipcs посмотреть на статус нашей очереди. Обсудим, как забрать из очереди сообщение. Для этого используется системный вызов msgrcv(): SYSTEM CALL: msgrcv(); PROTOTYPE: int msgrcv( int msqid, struct msgbuf *msgp, int msgsz, long mtype, $$) RETURNS: число байт, скопированных в буфер сообщения -1 в случае ошибки: errno = E2BIG (длина сообщения больше, чем msgsz, $$) EACCES (нет права на чтение) EFAULT (адрес, на который указывает msgp, ошибочен) EIDRM (очередь была уничтожена в период изъятия сообщения) EINTR (прервано поступившим сигналом) EINVAL (msgqid ошибочен или msgsz меньше 0) ENOMSG (установлен IPC_NOWAIT, но в очереди нет ни одного сообщения, удовлетворяющего запросу) NOTES:



Конечно, первый аргумент определяет очередь, из которой будет взято сообщение (должен быть возвращен сделанным предварительно вызовом msgget). Второй аргумент (msgp) представляет собой адрес буфера, куда будет положено изъятое сообщение. Третий аргумент, msgsz, ограничивает размер структуры-буфера без учета длины элемента mtype.

Еще раз повторимся, это может быть легко вычислено: msgsz = sizeof(struct mymsgbuf) - sizeof(long);

Четвертый аргумент, mtype - это тип сообщения, изымаемого из очереди. Ядро будет искать в очереди наиболее старое сообщение такого типа и вернет его копию по адресу, указанному аргументом msgp. Существует один особый случай: если mtype = 0, то будет возвращено наиболее старое сообщение, независимо от типа.

Если IPC_NOWAIT был послан флагом, и нет ни одного удовлетворительного сообщения, msgrcv вернет вызывающему процессу ENOMSG. В противном случае вызывающий процесс блокируется, пока в очередь не прибудет сообщение, соответствующее параметрам msgrcv(). Если, пока клиент ждет сообщения, очередь удаляется, то ему возвращается EIDRM. EINTR возвращается, если сигнал поступил, пока процесс находился на промежуточной стадии между ожиданием и блокировкой.

Давайте рассмотрим функцию-переходник для изъятия сообщения из нашей очереди. int read_message( int qid, long type, struct mymsgbuf *qbuf ) { int result, length; /* Длина есть в точности размер структуры минус sizeof(mtype) */ length = sizeof(struct mymsgbuf) - sizeof(long); if((result = msgrcv( qid, qbuf, length, type, 0 )) == -1) { return(-1); } return(result); }

После успешного изъятия сообщения удаляется из очереди и его ярлык.

Бит MSG_NOERROR в msgflg предоставляет некоторые дополнительные возможности. Если физическая длина сообщения больше, чем msgsz, и MSG_NOERROR установлен, то сообщение обрезается и возвращается только msgsz байт. Нормальный же msgrcv() возвращает -1 (E2BIG), и сообщение остается в очереди до последующих запросов. Такое поведение можно использовать для создания другой оберточной функции, которая позволит нам "подглядывать" внутрь очереди, чтобы узнать, пришло ли сообщение, удовлетворяющее нашему запросу. int peek_message( int qid, long type ) { int result, length; if((result = msgrcv( qid, NULL, 0, type, IPC_NOWAIT )) == -1) { if(errno == E2BIG) return(TRUE); } return(FALSE); }



Выше вы заметили отсутствие адреса буфера и длины. В этом конкретном случае мы хотели, чтобы вызов прошел неудачно. Однако мы проверили возвращение E2BIG, которое должно показать, существует ли сообщение затребованного типа. Оберточная функция возвращает TRUE в случае успеха, и FALSE - в противном случае. Отметьте также установленный флаг IPC_NOWAIT, который помешает блокировке, о которой мы говорили раньше.

Системный вызов msgсtl()

Благодаря использованию функций-переходников вы имеете некий элегантный подход к созданию и использованию очередей сообщений в ваших приложениях. Теперь коснемся непосредственно манипулирования внутренними структурами, связанными с данной очередью сообщений.

Для осуществления контроля над очередью предназначен системный вызов msgсtl. SYSTEM CALL: msgctl() PROTOTYPE: int msgctl ( int msgqid, int cmd, struct msqid_ds *buf ); RETURNS: 0 в случае успеха -1 в случае неудачи errno = EACCES (нет прав на чтение и cmd есть IPC_STAT) EFAULT (адрес, на который указывает buf, ошибочен для команд IPC_SET и IPC_STAT) EIDRM (очередь была уничтожена во время запроса) EINVAL (ошибочный msqid или msgsz меньше 0) EPERM (IPC_SET- или IPC_RMID-команда была послана процессом, не имеющим прав на запись (в очередь)) NOTES:

Теперь из общих соображений ясно, что прямые манипуляции с внутреностями ядра могут привести к очень занимательным последствиям. К сожалению, по-настоящему весело будет только тому, кто любит вдребезги и с наслаждением крушить подсистему IPC. Однако при использовании msgctl() с некоторыми командами вероятность огорчительных результатов не очень велика. Вот их и рассмотрим.

IPC_STAT

Сохраняет по адресу buf структуру msqid_ds для очереди сообщений.

IPC_SET

Устанавливает значение элемента ipc_perm структуры msqid. Значения выбирает из буфера.

IPC_RMID

Удаляет очередь из ядра.

Вернемся к нашему разговору о внутреннем представлении очереди сообщений: msqid_ds. Ядро держит экземпляр этой структуры для каждой очереди, существующей в системе. IPC_STAT дает возможность заиметь копию такой структуры для испытаний. Посмотрим на оберточную функцию,которая берет эту структуру и размещает копию по указанному адресу. int get_queue_ds( int qid, struct msgqid_ds *qbuf ) { if( msgctl( qid, IPC_STAT, qbuf) == -1 ) { return(-1); } return(0); }



Если копирование во внутренний буфер невозможно, то вызывающей функции возвращается -1. Если же все прошло нормально, то возвращается 0, и посланный буфер должен содержать копию внутренней структуры данных для очереди с идентификатором qid.

Что же мы можем делать с полученной копией структуры? Единственное, что можно поменять, это элемент ipc_perm. Это права доступа очереди, информация о создателе и владельце очереди. Однако и отсюда менять позволено только mode, uid и gid.

Давайте напишем оберточную функцию, изменяющую режим доступа очереди. Режим должен быть передан как массив литер (например, "660"). int change_queue_mode( int qid, char *mode ) { struct msqid_ds tmpbuf; /* Берем текущую копию внутренней структуры */ get_queue_ds( qid, &tmpbuf ); /* Применяем уже известный прикол для изменения прав доступа */ sscanf(mode, "%ho", &tmpbuf.msg_perm.mode); /* Модернизируем внутреннюю структуру */ if( msgctl( qid, IPC_SET, &tmpbuf ) == -1 ) { return(-1); } return(0); }

Мы взяли текущую копию внутренней структуры данных посредством вызова нашей get_queue_ds; затем sscanf() меняет элемент mode структуры msg_perm. Однако ничего не произойдет, пока msgctl c IPC_SET не обновил внутреннюю версию.

ОСТОРОЖНО! Изменяя права доступа, можно случайно лишить прав себя самого! Помните, что IPC-объекты не исчезают, пока они не уничтожены должным образом или не перезагружена система. Поэтому то, что вы не видите очереди ipcs-ом, не означает, что ее нет на самом деле.

После того, как сообщение взято из очереди, оно удаляется. Однако, как отмечалось ранее, IPC-объекты остаются в системе до персонального удаления или перезапуска всей системы. Поэтому наша очередь сообщений все еще существует в ядре и пригодна к употреблению в любое время, несмотря на то, что последнее его соообщение уже давно на небесах. Чтобы и душа нашей очереди с миром отошла к богам, нужен вызов msgctl(), использующий команду IPC_RMID: int remove_queue( int qid ) { if( msgctl( qid, IPC_RMID, 0) == -1) { return(-1) } return(0); }



Эта функция- переходник возвращает 0, если очередь удалена без инцедентов, в противном случае ввыдается -1. Удаление очереди неделимо и попытка любого обращения к ней будет безуспешной.

msgtool: интерактивный обработчик очередей сообщений

Мало кто станет отрицать непосредственную выгоду от возможности в любой момент получить точную техническую информацию. Подобные материалы представляют собой мощный механизм для обучения и исследования новых областей. Однако, неплохо было бы добавить к технической информации и реальные примеры. Это непременно ускорит и укрепит процесс обучения.

До сей поры все то хорошее, что мы сделали - это оберточные функции для манипуляций с очередями сообщений. Хотя они чрезвычайно полезны, ими неудобно пользоваться для дальнейшего обучения и экспериментов. Существует средство, позволяющее работать с IPC-очередями из командной строки - msgtool(). Хотя msgtool() будет использован в целях обучения, он пригодится и реально при написании скриптов.

Описание

Поведение msgtool()-а зависит от аргументов командной строки, что удобно для вызова из скрипта shell. Позволяет делать все что угодно, от создания, посылки и получения сообщений до редактирования прав доступа и удаления очереди. Изначально данными сообщений могут быть только литерные массивы. Упражнение - измените это так, чтобы можно было посылать и другие данные. Синтаксис командной строки Посылка сообщений msgtool s (type) "text" Изъятие сообщений msgtool к (type) Изменение прав доступа msgtool (mode) Уничтожение очереди msgtool d Примеры msgtool s 1 test msgtool s 5 test msgtool s 1 "This is test" msgtool r 1 msgtool d msgtool m 660

Код

Следующее, что мы рассмотрим, это исходный текст msgtool. Его следует компилировать в версии системы, которая поддерживает System V IPC. Убедитесь в наличии System V IPC в ядре, когда будете пересобирать программу!

На полях отметим, что наша утилита будет всегда создавать очередь, если ее не было.

Замечание. Поскольку msgtool использует ftok() для генерации ключей IPC, вы можете нарваться на конфликты, связанные с директориями. Если вы где-то в скрипте меняете директории, то все это наверняка не сработает. Это обходится путем более явного указания пути в msgtool, вроде "/tmp/msgtool", или даже запроса пути из командной строки вместе с остальными аргументами. /**************************************************************************** Excerpt from "Linux Programmer's Guide - Chapter 6" (C)opyright 1994-1995, Scott Burkett **************************************************************************** MODULE: msgtool.c **************************************************************************** Средство командной строки для работы со очередями сообщений в стиле SysV ****************************************************************************/ #include #include #include #include #include #include #define MAX_SEND_SIZE 80 struct mymsgbuf { long mtype; char mtext[MAX_SEND_SIZE]; }; void send_message(int qid, struct mymsgbuf *qbuf, long type, char *text); void read_message(int qid, struct mymsgbuf *qbuf, long type); void remove_queue(int qid); void change_queue_mode(int qid, char *mode); void usage(void); int main(int argc, char *argv[]) { key_t key; int msgqueue_id; struct mymsgbuf qbuf; if(argc == 1) usage(); /* Создаем уникальный ключ через вызов ftok() */ key = ftok(".",'m'); /* Открываем очередь - при необходимости создаем */ if((msgqueue_id = msgget(key, IPC_CREAT|0660)) == -1) { perror("msgget"); exit(1); } switch(tolower(argv[1][0])) { case 's': send_message(msgqueue_id, (struct mymsg buf *)&qbuf, atol(argv[2]), argv[3]); break; case 'r': read_message(msgqueue_id, &qbuf, atol(argv[2])); break; case 'd': remove_queue(msgqueue_id); break; case 'm': change_queue_mode(msgqueue_id, argv[2]); break; default: usage(); } return(0); } void send_message(int qid, struct mymsgbuf *qbuf, long type, char *text) { /* Посылаем сообщение в очередь */ printf("Sending a message ...\n"); qbuf->mtype = type; strcopy(qbuf->mtext, text); if((msgsnd(qid, (struct msgbuf *)qbuf, strlen(qbuf->mtext)+1, 0)) == -1) { perror("msgsnd"); exit(1); } } void read_message(int qid, struct mymsgbuf *qbuf, long type) { /* Вычитываем сообщение из очереди */ printf("Reading a message ...\"); qbuf->mtype = type; msgrcv(qid, (struct msgbuf *)qbuf, MAX_SEND_SIZE, type, 0); printf("Type: %ld Text: %s\n", qbuf->mtype, qbuf->mtext); } void remove_queue(int qid) { /* Удаляем очередь */ msgctl(qid, IPC_RMID, 0); } void change_queue_mode(int qid, char *mode) { struct msqid_ds myqueue_ds; /* Получаем текущее состояние */ msgctl(qid, IPC_STAT, &myqueue_ds); /* Меняем состояние в копии внутренней структуры данных */ sscanf(mode, "%ho", &myqueue_ds.msg_perm.mode) /* Обновляем состояние в самой внутренней структуре данных */ msgctl(qid, IPC_SET, &myqueue_ds); } void usage(void) { fprintf(stderr, "msgtool - A utility for tinkering with msg queues\n"); fprintf(stderr, "\nUSAGE: msgtool (s)end \n"); fprintf(stderr, " (r)ecv \n"); fprintf(stderr, " (d)elete\n"); fprintf(stderr, " (m)ode \n"); exit(1); }


Содержание раздела