Do spisu tresci tematu 3

3.2.2 Kolejki komunikatow




Spis tresci


Wprowadzenie

Kolejki komunikatow to specjalne listy (kolejki) w jadrze, zawierajace odpowiednio sformatowane dane i umozliwiajace ich wymiane poprzez dowolne procesy w systemie. Istnieje mozliwosc umieszczania komunikatow w okreslonych kolejkach (z zachowaniem kolejnosci ich wysylania przez procesy) oraz odbierania komunikatu na pare roznych sposobow (zaleznie od typu, czasu przybycia itp.).

Struktury danych


Wprowadzenie

Za kazda kolejke komunikatow odpowiada jedna struktura typu msqid_ds. Komunikaty danej kolejki przechowywane sa na liscie, ktorej elementami sa struktury typu msg - kazda z nich posiada informacje o typie komunikatu, wskaznik do nastepnej struktury msg oraz wskaznik do miejsca w pamieci, gdzie przechowywana jest wlasciwa tresc komunikatu. Dodatkowo, kazdej kolejce komunikatow przydziela sie dwie kolejki typu wait_queue, na ktorych spia procesy zawieszone podczas wykonywania operacji czytania badz pisania do danej kolejki. Ponizszy rysunek przedstawia wyzej omowione zaleznosci:

(rysunek)

W pliku include/linux/msg.h zdefiniowane sa ograniczenia na liczbe i wielkosc kolejek oraz komunikatow w nich umieszczanych:

#define MSGMNI 128    /* <= 1K    max # kolejek komunikatow          */
#define MSGMAX 4056   /* <= 4056  max rozmiar komunikatu (w bajtach) */
#define MSGMNB 16384  /* ?        max wielkosc kolejki (w bajtach)   */

Struktura msqid_ds

Oto dokladna definicja struktury msqid_ds z pliku include/linux/msg.h:
/* jedna struktura msg dla kazdej kolejki w systemie */
struct msqid_ds {
  struct ipc_perm     msg_perm;
  struct msg         *msg_first;    /* pierwszy komunikat w kolejce */
  struct msg         *msg_last;     /* ostatni komunikat w kolejce */
  __kernel_time_t     msg_stime;    /* czas ostatniego msgsnd */
  __kernel_time_t     msg_rtime;    /* czas ostatniego msgrcv */
  __kernel_time_t     msg_ctime;    /* czas ostatniej zmiany */
  struct wait_queue  *wwait;
  struct wait_queue  *rwait;
  unsigned short      msg_cbytes;   /* liczba bajtow w kolejce */
  unsigned short      msg_qnum;     /* liczba komunikatow w kolejce */
  unsigned short      msg_qbytes;   /* maksymalna liczba bajtow w kolejce */
  __kernel_ipc_pid_t  msg_lspid;    /* pid ostatniego msgsnd */
  __kernel_ipc_pid_t  msg_lrpid;    /* pid ostatniego receive*/
};
Dodatkowe wyjasnienia:
msg_perm
Jest to instancja struktury ipc_perm, zdefiniowanej w pliku linux/ipc.h. Zawiera informacje o prawach dostepu do danej kolejki oraz o jej zalozycielu.

wwait, rwait
Przydzielone danej kolejce komunikatow dwie kolejki typu wait_queue, na ktorych spia procesy zawieszone podczas wykonywania operacji odpowiednio czytania oraz pisania w danej kolejce komunikatow.

Struktura msg

Oto dokladna definicja struktury msg z pliku include/linux/msg.h:
/* jedna struktura msg dla kazdego komunikatu */
struct msg {
  struct msg *msg_next;   /* nastepny komunikat w kolejce */
  long        msg_type;          
  char       *msg_spot;
  time_t      msg_stime;  /* czas wyslania tego komunikatu */
  short       msg_ts;     /* dlugosc wlasciwej tresci komunikatu */
};
Dodatkowe wyjasnienia:
msg_type
Typ przechowywanego komunikatu. Wysylanemu do kolejki komunikatowi nadawca przypisuje dodatnia liczbe naturalna, stajaca sie jego typem. Przy odbiorze komunikatu mozna zazadac komunikatow okreslonego typu (patrz opis funkcji msgrcv()).

msg_spot
Wskaznik do miejsca w pamieci, gdzie przechowywana jest wlasciwa tresc komunikatu. Na kazdy komunikat przydzielane jest oddzielne miejsce w pamieci.

Funkcje i ich implementacja


Wprowadzenie

Istnieja cztery funkcje systemowe do obslugi komunikatow: msgget() sluzy do uzyskania identyfikatora kolejki komunikatow uzywanego przez pozostale funkcje, msgctl() umozliwia ustawianie i pobieranie wartosci parametrow zwiazanych z kolejkami komunikatow oraz usuwanie kolejek, msgsnd() wysyla komunikat, a msgrcv() komunikat odbiera.

Funkcja msgget()

Funkcja ta sluzy do utworzenia nowej kolejki komunikatow, lub uzyskania dostepu do kolejki istniejacej.
DEFINICJA: int msgget(key_t key, int msgflg)
    WYNIK: identyfikator kolejki w przypadku sukcesu
           -1, gdy blad: errno = EACCESS (brak praw)
                                 EEXIST  (kolejka o podanym kluczu istnieje,
                                          wiec niemozliwe jest jej utworzenie)
                                 EIDRM   (kolejka zostala w miedzyczasie skasowana)
                                 ENOENT  (kolejka nie istnieje),
                                 EIDRM  (kolejka zostala w miedzyczasie skasowana)
                                 ENOMEM  (brak pamieci na kolejke)
                                 ENOSPC  (liczba kolejek w systemie jest rowna
                                          maksymalnej)
Pierwszym argumentem funkcji jest wartosc klucza, porownywana z istniejacymi wartosciami kluczy. Zwracana jest kolejka o podanym kluczu, przy czym flaga IPC_CREAT powoduje utworzenie kolejki w przypadku braku kolejki o podanym kluczu, zas flaga IPC_EXCL uzyta z IPC_CREAT powoduje blad EEXIST, jesli kolejka o podanym kluczu juz istnieje. Wartosc klucza rowna IPC_PRIVATE zawsze powoduje utworzenie nowej kolejki.

Dzialanie funkcji jest analogiczne do odpowiednich funkcji na semaforach oraz segmentach pamieci dzielonej (patrz opis algorytmu *get w rozdziale ,,Cechy wspolne IPC''). W przypadku koniecznosci utworzenia nowej kolejki alokowana jest nowa struktura typu msqid_ds.

Funkcja msgsnd()

- sluzy do wyslania komunikatu do kolejki.
DEFINICJA: int msgsnd(int msqid, struct msgbuf *msgp, int msgsz,
                      int msgflg)
    WYNIK: 0 w przypadku sukcesu
           -1, gdy blad: errno = EAGAIN (pelna kolejka (IPC_NOWAIT))
                                 EACCES (brak praw zapisu)
                                 EFAULT (zly adres msgp)
                                 EIDRM  (kolejka zostala w miedzyczasie skasowana)
                                 EINTR  (otrzymano sygnal podczas czekania)
                                 EINVAL (zly identyfikator kolejki, typ lub rozmiar
                                         komunikatu)
                                 ENOMEM (brak pamieci na komunikat)
Pierwszym argumentem funkcji jest identyfikator kolejki. msgp jest wskaznikiem do struktury typu msgbuf, zawierajacej wysylany komunikat. Struktura ta jest zdefiniowana w pliku linux/msg.h nastepujaco:
/* message buffer for msgsnd and msgrcv calls */
struct msgbuf {
  long mtype;         /* typ komunikatu   */
  char mtext[1];      /* tresc komunikatu */
};
Jest to jedynie przykladowa postac tej struktury; programista moze zdefiniowac sobie a nastepnie wysylac dowolna inna strukture, pod warunkiem, ze jej pierwszym polem bedzie wartosc typu long, zas rozmiar nie bedzie przekraczac wartosci MSGMAX (=4096). Wartosc msgsz w wywolaniu funkcji msgsnd jest rowna rozmiarowi komunikatu (w bajtach), nie liczac typu komunikatu (sizeof(long)). Flaga IPC_NOWAIT zapewnia, ze w przypadku braku miejsca w kolejce funkcja natychmiast zwroci blad EAGAIN.

Implementacja funkcji:

{
   sprawdzenie poprawnosci parametrow msqid, msgp i msgsz;
   if ( msgque[msqid] == IPC_UNUSED || msgque[msqid] == IPC_NOID )
      return( EIDRM );
   if ( ipc_perm.seq != msqid/MSGMNI )
      return( EINVAL );  /* patrz rozdzial o cechach wspolnych IPC */
   while (nie ma wolnego miejsca w kolejce) do
   {
      if ( flaga IPC_NOWAIT )
         return( EAGAIN );
      interruptible_sleep_on( msqid_ds.wwait );  /* spimy w kolejce */
      if ( przyczyna przebudzenia bylo przerwanie )
         return( EINTR );
   }
   if ( msgque[msqid] == IPC_UNUSED || msgque[msqid] == IPC_NOID )
      return( EIDRM );
   if ( ipc_perm.seq != msqid/MSGMNI )
      return( EINVAL );  /* patrz rozdzial o cechach wspolnych IPC */
   przydzielenie nowej struktury msg oraz bufora na wlasciwy komunikat;
   przepisanie tekstu od uzytkownika oraz nadanie odpowiednich wartosci
      pozostalym polom struktury msg;
   zamaskowanie przerwan na czas dolaczenia struktury msg na koniec
      kolejki komunikatow;
   aktualizacja statystyk kolejki;
   wake_up( msqid_ds.rwait ); /*obudzenie kolejki czekajacych na czytanie */
}

Funkcja msgrcv()

- sluzy do odebrania komunikatu z kolejki.
DEFINICJA: int msgrcv(int msgqid, struct msgbuf *msgp, int msgsz,
                      long type, int msgflg)
    WYNIK: liczba bajtow skopiowanych do bufora w przypadku sukcesu
           -1, gdy blad: errno = E2BIG  (dlugosc komunikatu wieksza od msgsz)
                                 EACCES (brak praw odczytu)
                                 EFAULT (zly adres msgp)
                                 EIDRM  (kolejka zostala w miedzyczasie skasowana)
                                 EINTR  (otrzymano sygnal podczas czekania)
                                 EINVAL (zly identyfikator kolejki lub msgsz < 0)
                                 ENOMSG (brak komunikatu (IPC_NOWAIT))
Pierwszym argumentem funkcji jest identyfikator kolejki. msgp wskazuje na adres bufora, do ktorego ma byc przekopiowany odbierany komunikat. msgsz to rozmiar owego bufora, z wylaczeniem pola mtype (sizeof(long)). mtype wskazuje na rodzaj komunikatu, ktory chcemy odebrac. Jadro przydzieli nam najstarszy komunikat zadanego typu, przy czym: Ponadto, flaga IPC_NOWAIT w przypadku braku odpowiedniego komunikatu powoduje natychmiastowe wyjscie z bledem, zas MSG_NOERROR powoduje brak bledu w przypadku, gdy komunikat nie miesci sie w buforze (zostaje przekopiowane tyle, ile sie miesci).

Implementacja funkcji:

{
   sprawdzenie poprawnosci parametrow msqid, msgp i msgsz;
   if ( msgque[msqid] == IPC_UNUSED || msgque[msqid] == IPC_NOID )
      return( EIDRM );
   if ( ipc_perm.seq != msqid/MSGMNI )
      return( EINVAL );  /* patrz rozdzial o cechach wspolnych IPC */
   while (nie ma w kolejce interesujacego nas komunikatu) do
   {
      if ( flaga IPC_NOWAIT )
         return( EAGAIN );
      interruptible_sleep_on( msqid_ds.rwait );   /* spimy w kolejce */
      if ( przyczyna przebudzenia bylo przerwanie )
         return( EINTR );
   }
   if ( msgque[msqid] == IPC_UNUSED || msgque[msqid] == IPC_NOID )
      return( EIDRM );
   if ( ipc_perm.seq != msqid/MSGMNI )
      return( EINVAL );  /* patrz rozdzial o cechach wspolnych IPC */
   if ( ( za duzy komunikat ) && ( ! flaga MSG_NOERROR ) )
      return( E2BIG );
   zamaskowanie przerwan na czas usuwania struktury msg z kolejki komunikatow;
   aktualizacja statystyk kolejki;
   wake_up( msqid_ds.wwait );   /* obudzenie kolejki czekajacych na pisanie */
   przepisanie komunikatu do uzytkownika oraz zwolnienie struktury msg;
}

Funkcja msgctl()

- sluzy do modyfikowania oraz odczytu rozmaitych wlasciwosci kolejki.
DEFINICJA: int msgctl(int msgqid, int cmd, struct msqid_ds *buf)
    WYNIK: 0 w przypadku sukcesu
           -1, gdy blad: errno = EACCES (brak praw czytania (IPC_STAT))
                                 EFAULT (zly adres buf)
                                 EIDRM  (kolejka zostala w miedzyczasie skasowana)
                                 EINVAL (zly identyfikator kolejki lub msgsz < 0)
                                 EPERM  (brak praw zapisu (IPC_SET lub IPC_RMID))
Dopuszczalne komendy to:

Dzialanie funkcji sprowadza sie do przekopiowania odpowiednich wartosci od lub do uzytkownika, lub skasowania kolejki. Usuniecie kolejki wyglada nastepujaco:

{
   msqid_ds.ipc_perm.seq+=1;     /* patrz opis struktury ipc_perm w rozdziale
                                    o cechach wspolnych mechanizmow IPC */
   msg_seq+=1;        /* zwiekszenie wartosci globalnej zmiennej zwiazanej z
                         ipc_perm.seq - patrz tenze rozdzial */
   uaktualnienie statystyk;
   msgque[msqid]=IPC_UNUSED;
   obudzenie czekajacych na pisanie lub czytanie do/z usuwanej kolejki;
   zwolnienie struktur przydzielonych kolejce;
}

Bibliografia

  1. Pliki zrodlowe Linuxa:
  2. Sven Goldt, Sven van der Meer, Scott Burkett, Matt Welsh: The Linux Programmer's Guide - rozdzial o kolejkach komunikatow

Pytania i odpowiedzi

1. Po umieszczeniu nowego komunikatu proces, ktory to uczynil, budzi wszystkich czekajacych w kolejce msqid_ds.rwait wywolujac funkcje wake_up(). Czy zapewnia to brak zaglodzenia, tj. procesy, ktore czekaja najdluzej, pierwsze zostana obudzone, czy tez kolejnosc budzenia ich bedzie przypadkowa?

Teoretycznie moze dojsc do zaglodzenia. Wywolanie funkcji wake_up() spowoduje obudzenie wszystkich spiacych procesow. Nie mozna przewidziec, ktory z nich odbierze nowy komunikat.


Autor: Tomasz Lukaszewicz