Subsections


5 Implementacja

Rozdział 5 zawiera opis procesu implementacji systemu. Moim celem nie jest jednak dokładne przedstawienie całego kodu źródłowego. Chcę raczej opisać najciekawsze lub najtrudniejsze problemy, na jakie się natknąłem oraz możliwe sposoby ich rozwiązania i dokonane przeze mnie wybory.

1 Uniezależnienie biblioteki od typu zadania

Jednym z podstawowych i oczywistych wymagań stawianych przed biblioteką do rozdzielania i rozproszonego przetwarzania zadań było uniezależnienie jej od typu wykonywanego zadania. W oparciu o projekt opisany w rozdziale 4 stworzyłem bibliotekę, która wymaga od korzystającego z niej programisty jedynie: Prosty przykład wykorzystania biblioteki znajduje się w dodatku A. Dodatkowo programista może też, choć nie musi, podsunąć bibliotece klasę z własnym algorytmem przydzielania zadań do węzłów (patrz punkt 4.4.2).

Wydruk 5.1: Interfejs Task
package jodl.task;

import java.io.Serializable;
import java.sql.Connection;

public interface Task extends Serializable {
  boolean initTask(String taskId, Connection dbConnection);
  boolean toBeDone();
  boolean doIt();
  void close();
  int size();
  String getId();
  Task newInstance();
}

Klasa realizująca zadanie musi implementować interfejs Task (wydruk 5.1). Najważniejsza z metod zadeklarowanych w tym interfejsie to initTask() oraz doIt(). Wykonanie pierwszej z nich powinno zainicjować klasę wykonującą zadanie. Połączenie do bazy danych (poprzez JDBC - Java Database Connectivity) jest przekazywane przez bibliotekę do metody w drugim parametrze. Identyfikator taskId jest kluczem głównym w tabeli Job; w większości zastosowań biblioteki będzie też kluczem obcym w tabeli opisującym szczegółowe zadanie (w przypadku tworzonego przeze mnie systemu jest to tabela Strona). Natomiast metoda doIt() powinna wczytać zadanie z bazy danych, wykonać je i, jeśli trzeba, zapisać wyniki z powrotem do bazy danych. Obie metody przekazują wartość logiczną true w przypadku pomyślnego zakończenia metody (odpowiednio zainicjowania zadania lub jego wykonania) lub false w przeciwnym przypadku.

Metoda toBeDone() informuje, czy zadanie powinno być wykonane w aktualnym cyklu. Metoda size() przekazuje wielkość zadania w postaci wartości liczbowej. Jednostka z punktu widzenia biblioteki jest nieistotna, jeśli jest ona wspólna dla wszystkich zadań z jednej grupy. Wielkość zadania jest wykorzystywana przez bibliotekę do porównywania szybkości realizacji zadania w zależności od jego wielkości. Dlatego też wartość ta powinna pozostawać w liniowej zależności z przewidywanym czasem wykonania danego zadania. Innymi słowy, jeśli wiadomo, że kod wykonujący zadanie ma złożoność kwadratową w stosunku do jakiejś wartości n zależnej od zadania, to funkcja powinna przekazać wartość n2. Natomiast metoda close() powinna uwalniać zasoby zajęte podczas ładowania i wykonywania zadania, zamykać otwarte pliki, połączenia sieciowe itp.

Programistę implementującego klasę wykonującą zadanie nie interesuje, na którym węźle zadanie będzie wykonywane. Nie musi się on też martwić stworzeniem osobnego wątku realizującego zadanie czy o nawiązanie połączenia z bazą danych. Jego obowiązkiem jest tylko zaimplementować metodę doIt() i inne metody zadeklarowane w interfejsie Task. Dostępna jest także klasa pomocnicza TaskAdapter, która implementuje ten interfejs. Klasa TaskAdapter posiada puste implementacje wszystkich zadeklarowanych w interfejsie metod. Dzięki temu klasy użytkowników dziedziczące po niej mogą implementować tylko metodę doIt().

Zadania są pogrupowane w tzw. rodziny (ang. family), zdefiniowane w bazie danych. Każda rodzina posiada między innymi informację o nazwie klasy realizującej dany typ zadania. Klasa Server pobiera tę nazwę z bazy danych, tworzy obiekt tej klasy i wysyła do każdego agenta utworzony obiekt jako parametr funkcji setTaskClass(Task taskClass) zadeklarowanej w interfejsie AgentInterface. W sytuacji, gdy kodu tej klasy nie ma jeszcze w maszynie wirtualnej Javy na danym węźle, mechanizm RMI ściąga z serwera potrzebny kod poprzez protokół HTTP. Dzięki takiej konstrukcji biblioteki oraz automatycznemu ściąganiu potrzebnych klas, węzły nie muszą być w żaden sposób przygotowane do obsłużenia nowego typu zadania.


2 Efektywne sprawdzanie stron WWW

Głównym zadaniem realizowanym przez system jest sprawdzanie, czy strony WWW śledzone przez użytkowników uległy zmianie od ostatniego sprawdzenia. Takie sprawdzenie może zostać wykonane na kilka różnych sposobów. Najprostszą i co ważniejsze najszybszą metodą jest porównanie daty ostatniej modyfikacji oraz wielkości strony z informacjami zachowanymi w bazie danych. Informacje te przesyłane są w nagłówku odpowiedzi HTTP, więc można o nie zapytać używając polecenia HEAD zamiast GET czy POST (por. [Fie99]). Jeśli wartości te się nie zmieniły, można założyć, że strona także nie została zmieniona. Ta metoda jest jednak ściśle zależna od typu strony i ustawień serwera HTTP. Na przykład przy sprawdzaniu stron generowanych przez skrypty PHP, Perl czy ASP serwer przekazuje czas wygenerowania strony w polu daty ostatniej modyfikacji. W przypadku dwóch pierwszych języków, serwer nie wysyła żadnej informacji o wielkości wygenerowanej odpowiedzi (brak pola Content-length). W przypadku, gdy z informacji odebranych w nagłówku odpowiedzi HTTP nie da się z całą pewnością wnioskować, że strona się nie zmieniła, pozostaje sprawdzenie jej zawartości. Wiążą się jednak z tym dwa problemy. Pierwszy to taki, że gdy śledzona strona (lub plik) jest za duży, może to wyraźnie przedłużyć proces jej sprawdzania. Należy pamiętać, że ściąganie kilkumegabajtowego pliku przez wolne połączenie lub z wolnego serwera może trwać wiele minut. Dopuszczając pewien margines błędu można jednak założyć, że jeśli w przypadku danej strony nie zmieniła się wielkość oraz zawartość pierwszych kilkudziesięciu lub kilkuset kilobajtów, to i cała strona nie uległa zmianie, nawet przy zmienionej dacie ostatniej modyfikacji. Wydaje się, że rozsądną wielkością sprawdzanej części stron jest pierwsze 100 KB -- taką wartość przyjąłem w tworzonym systemie. Oczywiście wartość ta może zostać w przyszłości w prosty sposób zmodyfikowana. Ponieważ jednak sprawdzanie tylko części strony nie pozwala z całą pewnością stwierdzić, czy zmiana nastąpiła, użytkownicy serwisu będą informowani o ograniczeniu na wielkość porównywanej zawartości stron.

Drugim z problemów jest ilość miejsca na dysku, które będzie zajmować ostatnia zawartość śledzonych stron. Kilka milionów śledzonych stron może oznaczać kilkaset gigabajtów potrzebnej przestrzeni dyskowej. Ponieważ system sprawdzania stron jest rozproszony, dane te muszą być dostępne tam, gdzie faktycznie następuje sprawdzanie -- czyli w węzłach. Rozwiązaniem byłoby przesyłanie zawartości strony z jednego centralnego serwera do węzła, który ją sprawdza, lub trzymanie zduplikowanych danych na każdym węźle. W drugim przypadku zmiana przydziału zadania sprawdzania danej strony oznaczałaby także konieczność przeniesienia przechowywanej zawartości tej strony. Oba te pomysły mają poważne wady (gorsza wydajność, konieczność zapewniania spójności danych, problemy z zapewnieniem odporności na błędy) i wyraźnie komplikują system.

Aby uniknąć konieczności przechowywania całej zawartości stron, przy sprawdzaniu stron WWW korzystam z funkcji MD5. Jest to funkcja haszująca (funkcja skrótu), która dla ciągu bajtów dowolnej długości przekazuje 128 bitów (16 bajtów). Zamiast porównywać aktualną i poprzednią zawartość strony, porównuję tylko wyniki funkcji MD5 na tych zawartościach. Oznacza to konieczność przechowywania jedynie wyniku tej funkcji na ostatniej zawartości każdej strony, a nie całej zawartości. Dobra funkcja haszująca H(M) przekazująca wartość h (h = H(M)), ma następujące cechy (przytaczam za [Sch96]):

Przy wykorzystaniu funkcji haszującej do rozróżniania stron WWW istotna staje się jeszcze jedna cecha: Funkcja MD5, stworzona przez Rona Rivesta, uznanego specjalistę w dziedzinie kryptografii[*], spełnia te warunki. Ponieważ wynik funkcji ma 128 bitów, prawdopodobieństwo, że funkcja MD5 na zawartości dwóch różnych stron przekaże ten sam wynik wynosi 2-128. Prawdopodobieństwo to jest na tyle małe, iż na potrzeby tworzonego przeze mnie systemu można przyjąć, że dwie różne strony dają dwa różne wyniki tej funkcji.


3 Wysyłanie powiadomień jako drugi typ zadania

Po każdym sprawdzeniu śledzonych stron WWW, system musi wysłać do użytkowników listy z powiadomieniem o zmianach. Wysyłanie powiadomień jest wykonywane w każdym cyklu (jeśli któraś ze śledzonych stron się zmieniła), więc zostało w systemie potraktowane jako nowy typ zadania. Dzięki temu powiadomienia są wysyłane z dostępnych węzłów, a nie tylko z serwera głównego. Wysyłanie powiadomień przy użyciu powstałej biblioteki jest przy okazji dobrym sposobem przetestowania jej funkcjonalności. Mogłem w ten sposób sprawdzić, czy -- zgodnie z projektem -- biblioteka umożliwia współistnienie dwóch odrębnych typów zadań.

4 Komunikacja przy użyciu RMI

Technologia zdalnego wołania metod RMI, zapewniająca komunikację między serwerem a węzłami, wymaga zdefiniowania interfejsu określającego jakie metody można wywoływać na udostępnianym zdalnie obiekcie. W przypadku tworzonej przeze mnie biblioteki będzie to interfejs AgentInterface (wydruk 5.2). Zgodnie z wymaganiami technologii RMI interfejs zdalnego obiektu musi dziedziczyć po java.rmi.Remote, a każda metoda powinna móc zgłaszać wyjątek java.rmi.RemoteException.

Kolejne metody zadeklarowane w interfejsie AgentInterface odpowiadają komunikatom przesyłanym z serwera do węzłów (punkt 4.4.3). Bezparametrowa, nie przekazująca żadnej wartości metoda test będzie służyć jedynie sprawdzaniu, czy połączenie z danym agentem jest wciąż aktywne.

Wydruk 5.2: Interfejs AgentInterface
package jodl.agent;

import jodl.task.*;

import java.rmi.Remote; import java.rmi.RemoteException;

public interface AgentInterface extends Remote {

  void initDatabase(String server, int port,
    String name, String user, String password)
    throws RemoteException;

  void setCycleNumber(int cycle) throws RemoteException;

  void setTaskClass(Task taskClass) throws RemoteException;

  void doTasks(String[] tasks) throws RemoteException;

  int howManyLeft() throws RemoteException;

  String[] getSomeUndone(int howMany, int minLeft)
    throws RemoteException;

  String[] getDone() throws RemoteException;

  String[] getFailed() throws RemoteException;

  void test() throws RemoteException;
}

5 Podział na pakiety

Klasy składające się na bibliotekę JODL oraz na korzystający z niej system do sprawdzania stron WWW zostały podzielone na pakiety. Podział ten odzwierciedla funkcje realizowane przez klasy, a nazwy pakietów określają ich zawartość. Wyraźny podział klas składających się na system ułatwił rozdzielenie biblioteki JODL od implementacji serwisu CoNowego.pl, a wewnątrz biblioteki rozróżnienie klas działających na serwerze, na węzłach oraz klas wykorzystywanych w obu tych miejscach. Zawartość poszczególnych pakietów jest następująca:


6 Biblioteka JODL: wydajność a łatwość korzystania

Sercem tworzonego przeze mnie systemu jest biblioteka o nazwie JODL, służąca do rozdzielania i rozproszonego przetwarzania zadań. Ma ona stanowić oddzielny, gotowy do użycia produkt. Dlatego też jednym z celów, jakie przed sobą stawiałem, było takie zaprojektowanie biblioteki, aby korzystanie z niej było proste. Jednak na etapie projektowania, a później implementacji systemu okazało się, że jednoczesne zapewnienie wydajności oraz łatwości obsługi jest niemożliwe -- wymagania te stawały często w sprzeczności. W takich sytuacjach wybierałem wydajność, gdyż jest to główne wymaganie stawiane przez biblioteką i całym systemem. W efekcie udało mi się spełnić wymagania wydajnościowe, jednak odbiło się to na prostocie biblioteki.

Konieczność dokonania wyboru pomiędzy wydajnością a prostotą jest najbardziej widoczna w sposobie przesyłania zadań do węzłów. W powstałej implementacji serwer główny najpierw wysyła do agentów obiekt klasy realizującej zadanie, a następnie listę identyfikatorów zadań do wykonania. Kolejne obiekty dla każdego zadania są tworzone i inicjowane po stronie agentów. Inicjowanie polega zwykle na połączeniu się z bazą danych znajdującą się na serwerze. Prostszym pomysłem mogłoby się wydawać tworzenie i inicjowanie obiektów na serwerze, po czym przesyłanie do węzłów gotowych obiektów zamiast samych identyfikatorów. Jednak takie rozwiązanie szybko okazałoby się wąskim gardłem systemu. Przesyłanie w argumentach zdalnie wołanej funkcji tablicy serializowanych[*] obiektów zamiast tablicy identyfikatorów tekstowych oznacza spowolnienie komunikacji. Z tych powodów zdecydowałem się na ostatecznie zaimplementowane rozwiązanie.

Projektując i implementując system starałem się jednak w miarę możliwości upraszczać korzystanie z biblioteki. Przykładem na to może być metoda initTask zadeklarowana w interfejsie Task. Metoda ta inicjuje zadanie o podanym identyfikatorze. Jednym z jej argumentów jest obiekt klasy java.sql.Connection będący otwartym połączeniem z bazą danych. Dzięki temu programista implementujący interfejs Task nie musi zajmować się nawiązywaniem połączenia z bazą danych, obsługą potencjalnie powstałych przy okazji błędów itp.

7 Dostęp do bazy danych

Aktualnie system korzysta z bazy danych Postgres. Implementując oprogramowanie serwera głównego i węzłów chciałem jednak umożliwić w przyszłości łatwą migrację na inny system bazodanowy. Aby zapewnić taką możliwość, wszystkie metody dostępu do bazy danych zostały zadeklarowane w abstrakcyjnej klasie Database. Klasa Postgres, dziedzicząca po Database, zawiera implementacje tych metod przy użyciu poleceń języka SQL działających w bazie danych Postgres. Aktualnie klasy Server oraz Agent korzystają z obiektu klasy Postgres. Sposób jej instancjonowania pokazano na wydruku 5.3. Zmiana używanej w systemie bazy danych pociąga za sobą konieczność stworzenia odpowiedniej podklasy klasy Database (np. klasy MySQL) oraz zmienienia tego jednego wiersza kodu. Oczywiście jest możliwe dalsze uproszczenie tej podmiany. Nazwa klasy implementującej połączenie z bazą danych mogłaby być trzymana w pliku konfiguracyjnym i używana do jej instancjonowania przy pomocy metody Class.forName(...).newInstance(). Wydaje się jednak, że nie ma takiej potrzeby, gdyż zmiana bazy danych, na której opiera się system, będzie dokonywana najwyżej sporadycznie.

Wydruk 5.3: Instancjonowanie klasy Postgres
...
Database db = new Postgres();
...

Metody zdefiniowane w klasie Postgres korzystają z klasy pomocniczej SQL. Udostępnia ona metody ułatwiające wysyłanie zapytań w języku SQL do serwera bazy danych oraz interpretowanie wyników tych zapytań. Przykładowo, metoda getArray() przedstawiona na wydruku 5.4 przekazuje tablicę obiektów klasy String z zawartością pierwszej kolumny wyników przekazanego zapytania.

Wydruk 5.4: Fragment klasy SQL
...
public static String[] getArray(Connection connection, String query)
  throws SQLException
{
  ResultSet rs = getResults(connection, query);
  String[] result = new String[rs.getFetchSize()];

  int i = 0;
  while (rs.next()) result[i++] = rs.getString(1);

  return result;
}
...

8 Synchronizacja dostępu do tablic

Pisząc program wielowątkowy, programista często staje przed problemem synchronizacji wątków przy dostępie do struktur danych. Klasycznym przykładem jest korzystanie z jednej listy przez kilka wątków, z których część wstawia elementy do tej listy, a część je pobiera. W literaturze (np. [Ben96]) taka sytuacja jest często omawiana na przykładzie tzw. problemu producenta-konsumenta. Jego rozwiązanie wymaga zwykle użycia mechanizmu semaforów lub monitorów.

W przypadku oprogramowania węzłów problem ten pojawia się przy odwołaniach do tablic z nieprzetworzonymi jeszcze zadaniami. Klasa Agent wstawia do tablicy zadania przydzielone przez serwer główny, a wątki realizujące zadania pobierają kolejne zadania z tej tablicy. Tablica, z której korzystam, jest obiektem standardowo dostępnej klasy java.util.Vector. Obiekty tej klasy są monitorami[*], co oznacza, że dwa wątki nie mogą być jednocześnie w trakcie wykonywania którejkolwiek metody tego samego obiektu tej klasy. Dzięki temu współbieżne operacje na obiektach klasy Vector nie generują błędów. Funkcjonalność podstawowych metod tej klasy jest jednak niewystarczająca. Potrzebna jest atomowa, blokująca metoda pobierania elementu z listy, która:

Aby udostępnić metodę oferującą taką funkcjonalność, musiałem napisać klasę SyncVector. Treści najważniejszych jej metod (putObject oraz getObject) są przedstawione na wydruku 5.5. Klasa ta jest monitorem (jej metody mają modyfikator synchronized), a jednocześnie korzysta z obiektu klasy Semaphore. Wartość trzymana w semaforze odpowiada różnicy liczby wywołań metody V() i P(), czyli jest równa liczbie elementów znajdujących się na liście. Dzięki temu wywołanie metody getObject(), gdy lista jest pusta, oznacza zawieszenie na semaforze wątku wołającego tę metodę.

Wydruk 5.5: Klasa SyncVector
package jodl.util;

import java.util.*;

public class SyncVector {

  private Vector vector;

  private Semaphore sem;

  public SyncVector()
  {
    vector = new Vector();
    sem = new Semaphore(this);
  }

  public synchronized void putObject(Object object)
  {
    vector.addElement(object);
    sem.V();
  }

  public synchronized Object getObject()
  {
    sem.P();
    Object o = vector.lastElement();
    vector.removeElementAt(vector.size() - 1);
    return o;
  }

// ...

}

Klasa Semaphore, przedstawiona na wydruku 5.6, jest także monitorem, ponieważ treści jej metod są sekcjami krytycznymi oznaczonymi słowem kluczowym synchronized. Jest ona rozszerzeniem klasy CountingSemaphore zaproponowanej w [Lea96]. Przy standardowym zainicjowaniu przez konstruktor bezparametrowy lub z parametrem int obiekt tej klasy może być wołany z kodu niesynchronizowanego. Wywołania metod V() i P() są wtedy synchronizowane na poziomie dostępu do obiektu semafora (identyfikator this). Natomiast gdy metody V() i P() są wołane z innego monitora (np. z obiektu klasy SyncVector), konieczne jest wskazanie obiektu wołającego jako obiektu synchronizującego. Do ustawienia tego obiektu służą dwa ostatnie konstruktory klasy Semaphore. Na obiekcie synchronizującym (monitorze) wołane są także metody wait() oraz notify().

Wydruk 5.6: Klasa Semaphore
package jodl.util;

public class Semaphore {

  private int value = 0;

  private Object object;

  public Semaphore() { object = this; }

  public Semaphore(int v) { object = this; value = v; }

  public Semaphore(Object syncObject) { object = syncObject; }

  public Semaphore(int v, Object syncObject)
    { object = syncObject; value = v; }

  public void P()
  {
    synchronized (object) {
      while (value <= 0) try { object.wait(); } catch (Exception e) { }
      value--;
    }
  }

  public void V()
  {
    synchronized (object) {
      value++;
      if (value > 0) object.notify();
    }
  }
  
}


Sebastian Łopieński