Subsections
5 Implementacja
Rozdział 5 zawiera opis procesu implementacji systemu.
Moim celem nie jest jednak dokładne przedstawienie całego kodu
źródłowego. Chcę raczej opisać najciekawsze lub najtrudniejsze
problemy, na jakie się natknąłem oraz możliwe sposoby ich
rozwiązania i dokonane przeze mnie wybory.
Jednym z podstawowych i oczywistych wymagań stawianych przed
biblioteką do rozdzielania i rozproszonego przetwarzania zadań
było uniezależnienie jej od typu wykonywanego zadania. W oparciu o
projekt opisany w rozdziale 4 stworzyłem bibliotekę,
która wymaga od korzystającego z niej programisty jedynie:
- napisania klasy realizującej zadanie i umieszczenia jej na serwerze głównym,
- napisania klasy tworzącej obiekt klasy Server (opisanej w punkcie 4.4.4) i wywołującej jego metodę run.
Prosty przykład wykorzystania biblioteki znajduje się w dodatku
A. Dodatkowo programista może też, choć nie musi, podsunąć
bibliotece klasę z własnym algorytmem przydzielania zadań do
węzłów (patrz punkt 4.4.2).
Wydruk 5.1:
Interfejs Task
package jodl.task;
import java.io.Serializable;
import java.sql.Connection;
public interface Task extends Serializable {
boolean initTask(String taskId, Connection dbConnection);
boolean toBeDone();
boolean doIt();
void close();
int size();
String getId();
Task newInstance();
}
|
Klasa realizująca zadanie musi implementować interfejs
Task (wydruk 5.1). Najważniejsza z metod zadeklarowanych w tym interfejsie to initTask() oraz
doIt(). Wykonanie pierwszej z nich powinno zainicjować
klasę wykonującą zadanie. Połączenie do bazy danych (poprzez JDBC
- Java Database Connectivity) jest przekazywane przez
bibliotekę do metody w drugim parametrze. Identyfikator
taskId jest kluczem głównym w tabeli Job; w
większości zastosowań biblioteki będzie też kluczem obcym w tabeli
opisującym szczegółowe zadanie (w przypadku tworzonego przeze mnie
systemu jest to tabela Strona). Natomiast metoda
doIt() powinna wczytać zadanie z bazy danych, wykonać je
i, jeśli trzeba, zapisać wyniki z powrotem do bazy danych. Obie
metody przekazują wartość logiczną true w przypadku
pomyślnego zakończenia metody (odpowiednio zainicjowania zadania
lub jego wykonania) lub false w przeciwnym przypadku.
Metoda toBeDone() informuje, czy zadanie powinno być
wykonane w aktualnym cyklu. Metoda size() przekazuje
wielkość zadania w postaci wartości liczbowej. Jednostka z punktu
widzenia biblioteki jest nieistotna, jeśli jest ona wspólna dla
wszystkich zadań z jednej grupy. Wielkość zadania jest
wykorzystywana przez bibliotekę do porównywania szybkości
realizacji zadania w zależności od jego wielkości. Dlatego też
wartość ta powinna pozostawać w liniowej zależności z
przewidywanym czasem wykonania danego zadania. Innymi słowy, jeśli
wiadomo, że kod wykonujący zadanie ma złożoność kwadratową w
stosunku do jakiejś wartości n zależnej od zadania, to
funkcja powinna przekazać wartość n2. Natomiast metoda
close() powinna uwalniać zasoby zajęte podczas ładowania
i wykonywania zadania, zamykać otwarte pliki, połączenia sieciowe
itp.
Programistę implementującego klasę wykonującą zadanie nie
interesuje, na którym węźle zadanie będzie wykonywane. Nie
musi się on też martwić stworzeniem osobnego wątku realizującego
zadanie czy o nawiązanie połączenia z bazą danych. Jego obowiązkiem
jest tylko zaimplementować metodę doIt() i inne metody
zadeklarowane w interfejsie Task. Dostępna jest także
klasa pomocnicza TaskAdapter, która implementuje ten
interfejs. Klasa TaskAdapter posiada puste implementacje
wszystkich zadeklarowanych w interfejsie metod. Dzięki temu klasy
użytkowników dziedziczące po niej mogą implementować tylko metodę
doIt().
Zadania są pogrupowane w tzw. rodziny (ang. family),
zdefiniowane w bazie danych. Każda rodzina posiada między innymi
informację o nazwie klasy realizującej dany typ zadania. Klasa
Server pobiera tę nazwę z bazy danych, tworzy obiekt tej klasy i
wysyła do każdego agenta utworzony obiekt jako parametr funkcji
setTaskClass(Task taskClass) zadeklarowanej w interfejsie
AgentInterface. W sytuacji, gdy kodu tej klasy nie ma jeszcze w
maszynie wirtualnej Javy na danym węźle, mechanizm RMI ściąga z
serwera potrzebny kod poprzez protokół HTTP. Dzięki takiej
konstrukcji biblioteki oraz automatycznemu ściąganiu potrzebnych
klas, węzły nie muszą być w żaden sposób przygotowane do
obsłużenia nowego typu zadania.
2 Efektywne sprawdzanie stron WWW
Głównym zadaniem realizowanym przez system jest sprawdzanie, czy
strony WWW śledzone przez użytkowników uległy zmianie od
ostatniego sprawdzenia. Takie sprawdzenie może zostać wykonane na
kilka różnych sposobów. Najprostszą i co ważniejsze najszybszą
metodą jest porównanie daty ostatniej modyfikacji oraz wielkości
strony z informacjami zachowanymi w bazie danych. Informacje te
przesyłane są w nagłówku odpowiedzi HTTP, więc można o nie zapytać
używając polecenia HEAD zamiast GET czy POST (por.
[Fie99]). Jeśli wartości te się nie zmieniły, można
założyć, że strona także nie została zmieniona. Ta metoda jest
jednak ściśle zależna od typu strony i ustawień serwera HTTP. Na
przykład przy sprawdzaniu stron generowanych przez skrypty PHP,
Perl czy ASP serwer przekazuje czas wygenerowania strony w polu daty
ostatniej modyfikacji. W przypadku dwóch pierwszych języków,
serwer nie wysyła żadnej informacji o wielkości wygenerowanej
odpowiedzi (brak pola Content-length).
W przypadku, gdy z informacji odebranych w nagłówku odpowiedzi
HTTP nie da się z całą pewnością wnioskować, że strona się nie
zmieniła, pozostaje sprawdzenie jej zawartości. Wiążą się jednak z
tym dwa problemy. Pierwszy to taki, że gdy śledzona strona (lub
plik) jest za duży, może to wyraźnie przedłużyć proces jej
sprawdzania. Należy pamiętać, że ściąganie kilkumegabajtowego
pliku przez wolne połączenie lub z wolnego serwera może trwać
wiele minut. Dopuszczając pewien margines błędu można jednak
założyć, że jeśli w przypadku danej strony nie zmieniła się
wielkość oraz zawartość pierwszych kilkudziesięciu lub kilkuset
kilobajtów, to i cała strona nie uległa zmianie, nawet przy
zmienionej dacie ostatniej modyfikacji. Wydaje się, że rozsądną
wielkością sprawdzanej części stron jest pierwsze 100 KB -- taką
wartość przyjąłem w tworzonym systemie. Oczywiście wartość ta może
zostać w przyszłości w prosty sposób zmodyfikowana. Ponieważ
jednak sprawdzanie tylko części strony nie pozwala z całą
pewnością stwierdzić, czy zmiana nastąpiła, użytkownicy serwisu
będą informowani o ograniczeniu na wielkość porównywanej
zawartości stron.
Drugim z problemów jest ilość miejsca na dysku, które będzie
zajmować ostatnia zawartość śledzonych stron. Kilka milionów
śledzonych stron może oznaczać kilkaset gigabajtów potrzebnej
przestrzeni dyskowej. Ponieważ system sprawdzania stron jest
rozproszony, dane te muszą być dostępne tam, gdzie faktycznie
następuje sprawdzanie -- czyli w węzłach. Rozwiązaniem byłoby
przesyłanie zawartości strony z jednego centralnego serwera do
węzła, który ją sprawdza, lub trzymanie zduplikowanych danych na
każdym węźle. W drugim przypadku zmiana przydziału zadania
sprawdzania danej strony oznaczałaby także konieczność
przeniesienia przechowywanej zawartości tej strony. Oba te pomysły
mają poważne wady (gorsza wydajność, konieczność zapewniania
spójności danych, problemy z zapewnieniem odporności na błędy) i
wyraźnie komplikują system.
Aby uniknąć konieczności przechowywania całej zawartości stron,
przy sprawdzaniu stron WWW korzystam z funkcji MD5. Jest to
funkcja haszująca (funkcja skrótu), która dla ciągu bajtów dowolnej
długości przekazuje 128 bitów (16 bajtów). Zamiast porównywać aktualną
i poprzednią zawartość strony, porównuję tylko wyniki funkcji MD5
na tych zawartościach. Oznacza to konieczność przechowywania
jedynie wyniku tej funkcji na ostatniej zawartości każdej strony,
a nie całej zawartości.
Dobra funkcja haszująca H(M) przekazująca wartość h (h = H(M)),
ma następujące cechy (przytaczam za [Sch96]):
- znając M, można łatwo (szybko) wyliczyć h = H(M),
- znając h, trudno wyliczyć M takie że H(M) = h,
- znając M, trudno wyliczyć M' <> M takie że H(M) = H(M').
Przy wykorzystaniu funkcji haszującej do rozróżniania stron WWW
istotna staje się jeszcze jedna cecha:
- trudno znaleźć dwa różne M i M' takie że H(M) = H(M') (tzw. odporność na kolizje).
Funkcja MD5, stworzona przez Rona Rivesta, uznanego specjalistę w
dziedzinie kryptografii, spełnia te warunki. Ponieważ
wynik funkcji ma 128 bitów, prawdopodobieństwo, że funkcja MD5 na
zawartości dwóch różnych stron przekaże ten sam wynik wynosi 2-128. Prawdopodobieństwo to jest na tyle małe, iż na
potrzeby tworzonego przeze mnie systemu można przyjąć, że dwie
różne strony dają dwa różne wyniki tej funkcji.
3 Wysyłanie powiadomień jako drugi typ zadania
Po każdym sprawdzeniu śledzonych stron WWW, system musi wysłać do
użytkowników listy z powiadomieniem o zmianach. Wysyłanie
powiadomień jest wykonywane w każdym cyklu (jeśli któraś ze
śledzonych stron się zmieniła), więc zostało w systemie
potraktowane jako nowy typ zadania. Dzięki temu powiadomienia są
wysyłane z dostępnych węzłów, a nie tylko z serwera głównego.
Wysyłanie powiadomień przy użyciu powstałej biblioteki jest przy
okazji dobrym sposobem przetestowania jej funkcjonalności. Mogłem
w ten sposób sprawdzić, czy -- zgodnie z projektem -- biblioteka
umożliwia współistnienie dwóch odrębnych typów zadań.
Technologia zdalnego wołania metod RMI, zapewniająca komunikację
między serwerem a węzłami, wymaga zdefiniowania interfejsu
określającego jakie metody można wywoływać na udostępnianym
zdalnie obiekcie. W przypadku tworzonej przeze mnie biblioteki
będzie to interfejs AgentInterface (wydruk
5.2). Zgodnie z wymaganiami technologii RMI
interfejs zdalnego obiektu musi dziedziczyć po
java.rmi.Remote, a każda metoda powinna móc zgłaszać
wyjątek java.rmi.RemoteException.
Kolejne metody zadeklarowane w interfejsie AgentInterface
odpowiadają komunikatom przesyłanym z serwera do węzłów (punkt
4.4.3). Bezparametrowa, nie
przekazująca żadnej wartości metoda test będzie służyć
jedynie sprawdzaniu, czy połączenie z danym agentem jest wciąż
aktywne.
Wydruk 5.2:
Interfejs AgentInterface
package jodl.agent;
import jodl.task.*;
import java.rmi.Remote; import java.rmi.RemoteException;
public interface AgentInterface extends Remote {
void initDatabase(String server, int port,
String name, String user, String password)
throws RemoteException;
void setCycleNumber(int cycle) throws RemoteException;
void setTaskClass(Task taskClass) throws RemoteException;
void doTasks(String[] tasks) throws RemoteException;
int howManyLeft() throws RemoteException;
String[] getSomeUndone(int howMany, int minLeft)
throws RemoteException;
String[] getDone() throws RemoteException;
String[] getFailed() throws RemoteException;
void test() throws RemoteException;
}
|
Klasy składające się na bibliotekę JODL oraz na
korzystający z niej system do sprawdzania stron WWW zostały
podzielone na pakiety. Podział ten odzwierciedla funkcje
realizowane przez klasy, a nazwy pakietów określają ich zawartość.
Wyraźny podział klas składających się na system ułatwił
rozdzielenie biblioteki JODL od implementacji serwisu
CoNowego.pl, a wewnątrz biblioteki rozróżnienie klas działających
na serwerze, na węzłach oraz klas wykorzystywanych w obu tych
miejscach. Zawartość poszczególnych pakietów jest następująca:
- jodl.admin - klasy zapewniające możliwość administracji systemem,
- jodl.agent - oprogramowanie agentów (programów działających na węzłach),
- jodl.db - klasy umożliwiające i ułatwiające dostęp do bazy danych,
- jodl.example.prime - prosty przykład wykorzystania biblioteki zaprezentowany w dodatku A,
- jodl.policy - interfejs oraz podstawowa implementacja klasy realizującej przydzielanie zadania do nowego węzła,
- jodl.server - oprogramowanie serwera głównego,
- jodl.task - interfejs klasy realizującej zadanie oraz klasy z nim powiązane,
- jodl.util - klasy pomocnicze,
- conowego.sprawdzanie - klasy realizujące sprawdzanie stron WWW,
- conowego.powiadomienia - klasy przygotowujące i wysyłające powiadomienia do użytkowników serwisu,
- conowego.app - kod główny aplikacji do sprawdzania stron WWW i wysyłania powiadomień.
6 Biblioteka JODL: wydajność a łatwość korzystania
Sercem tworzonego przeze mnie systemu jest biblioteka o nazwie
JODL, służąca do rozdzielania i rozproszonego
przetwarzania zadań. Ma ona stanowić oddzielny, gotowy do użycia
produkt. Dlatego też jednym z celów, jakie przed sobą stawiałem,
było takie zaprojektowanie biblioteki, aby korzystanie z niej było
proste. Jednak na etapie projektowania, a później implementacji
systemu okazało się, że jednoczesne zapewnienie wydajności oraz
łatwości obsługi jest niemożliwe -- wymagania te stawały często w
sprzeczności. W takich sytuacjach wybierałem wydajność, gdyż jest
to główne wymaganie stawiane przez biblioteką i całym systemem. W
efekcie udało mi się spełnić wymagania wydajnościowe, jednak
odbiło się to na prostocie biblioteki.
Konieczność dokonania wyboru pomiędzy wydajnością a prostotą jest
najbardziej widoczna w sposobie przesyłania zadań do węzłów. W
powstałej implementacji serwer główny najpierw wysyła do agentów
obiekt klasy realizującej zadanie, a następnie listę
identyfikatorów zadań do wykonania. Kolejne obiekty dla każdego
zadania są tworzone i inicjowane po stronie agentów. Inicjowanie
polega zwykle na połączeniu się z bazą danych znajdującą się na
serwerze. Prostszym pomysłem mogłoby się wydawać tworzenie i
inicjowanie obiektów na serwerze, po czym przesyłanie do węzłów
gotowych obiektów zamiast samych identyfikatorów. Jednak takie
rozwiązanie szybko okazałoby się wąskim gardłem systemu.
Przesyłanie w argumentach zdalnie wołanej funkcji tablicy
serializowanych obiektów zamiast tablicy identyfikatorów
tekstowych oznacza spowolnienie komunikacji. Z tych powodów
zdecydowałem się na ostatecznie zaimplementowane rozwiązanie.
Projektując i implementując system starałem się jednak w miarę
możliwości upraszczać korzystanie z biblioteki. Przykładem na to
może być metoda initTask zadeklarowana w interfejsie
Task. Metoda ta inicjuje zadanie o podanym
identyfikatorze. Jednym z jej argumentów jest obiekt klasy
java.sql.Connection będący otwartym połączeniem z bazą
danych. Dzięki temu programista implementujący interfejs
Task nie musi zajmować się nawiązywaniem połączenia z
bazą danych, obsługą potencjalnie powstałych przy okazji błędów
itp.
Aktualnie system korzysta z bazy danych Postgres. Implementując
oprogramowanie serwera głównego i węzłów chciałem jednak umożliwić
w przyszłości łatwą migrację na inny system bazodanowy. Aby
zapewnić taką możliwość, wszystkie metody dostępu do bazy danych
zostały zadeklarowane w abstrakcyjnej klasie Database.
Klasa Postgres, dziedzicząca po Database,
zawiera implementacje tych metod przy użyciu poleceń języka SQL
działających w bazie danych Postgres. Aktualnie klasy
Server oraz Agent korzystają z obiektu klasy
Postgres. Sposób jej instancjonowania pokazano na wydruku
5.3. Zmiana używanej w systemie bazy danych pociąga za
sobą konieczność stworzenia odpowiedniej podklasy klasy
Database (np. klasy MySQL) oraz zmienienia tego
jednego wiersza kodu. Oczywiście jest możliwe dalsze uproszczenie
tej podmiany. Nazwa klasy implementującej połączenie z bazą danych
mogłaby być trzymana w pliku konfiguracyjnym i używana do jej
instancjonowania przy pomocy metody
Class.forName(...).newInstance(). Wydaje się jednak, że
nie ma takiej potrzeby, gdyż zmiana bazy danych, na której opiera
się system, będzie dokonywana najwyżej sporadycznie.
Wydruk 5.3:
Instancjonowanie klasy Postgres
...
Database db = new Postgres();
...
|
Metody zdefiniowane w klasie Postgres korzystają z klasy
pomocniczej SQL. Udostępnia ona metody ułatwiające wysyłanie
zapytań w języku SQL do serwera bazy danych oraz interpretowanie
wyników tych zapytań. Przykładowo, metoda getArray() przedstawiona
na wydruku 5.4 przekazuje tablicę obiektów klasy String z zawartością
pierwszej kolumny wyników przekazanego zapytania.
Wydruk 5.4:
Fragment klasy SQL
...
public static String[] getArray(Connection connection, String query)
throws SQLException
{
ResultSet rs = getResults(connection, query);
String[] result = new String[rs.getFetchSize()];
int i = 0;
while (rs.next()) result[i++] = rs.getString(1);
return result;
}
...
|
Pisząc program wielowątkowy, programista często staje przed
problemem synchronizacji wątków przy dostępie do struktur danych.
Klasycznym przykładem jest korzystanie z jednej listy przez kilka
wątków, z których część wstawia elementy do tej listy, a część je
pobiera. W literaturze (np. [Ben96]) taka sytuacja jest często
omawiana na przykładzie tzw. problemu producenta-konsumenta. Jego
rozwiązanie wymaga zwykle użycia mechanizmu semaforów lub
monitorów.
W przypadku oprogramowania węzłów problem ten pojawia się przy
odwołaniach do tablic z nieprzetworzonymi jeszcze zadaniami. Klasa
Agent wstawia do tablicy zadania przydzielone przez
serwer główny, a wątki realizujące zadania pobierają kolejne
zadania z tej tablicy. Tablica, z której korzystam, jest obiektem
standardowo dostępnej klasy java.util.Vector. Obiekty tej
klasy są monitorami, co
oznacza, że dwa wątki nie mogą być jednocześnie w trakcie
wykonywania którejkolwiek metody tego samego obiektu tej klasy.
Dzięki temu współbieżne operacje na obiektach klasy
Vector nie generują błędów. Funkcjonalność podstawowych
metod tej klasy jest jednak niewystarczająca. Potrzebna jest
atomowa, blokująca metoda pobierania elementu z listy, która:
- czeka, aż lista nie będzie pusta,
- pobiera z listy jeden (np. ostatni, ze względów wydajnościowych) element,
- usuwa ten element z listy,
- przekazuje pobrany element.
Aby udostępnić metodę oferującą taką funkcjonalność, musiałem
napisać klasę SyncVector. Treści najważniejszych
jej metod (putObject oraz getObject) są
przedstawione na wydruku 5.5. Klasa ta jest
monitorem (jej metody mają modyfikator synchronized), a
jednocześnie korzysta z obiektu klasy Semaphore. Wartość
trzymana w semaforze odpowiada różnicy liczby wywołań metody
V() i P(), czyli jest równa liczbie elementów
znajdujących się na liście. Dzięki temu wywołanie metody
getObject(), gdy lista jest pusta, oznacza zawieszenie na
semaforze wątku wołającego tę metodę.
Wydruk 5.5:
Klasa SyncVector
package jodl.util;
import java.util.*;
public class SyncVector {
private Vector vector;
private Semaphore sem;
public SyncVector()
{
vector = new Vector();
sem = new Semaphore(this);
}
public synchronized void putObject(Object object)
{
vector.addElement(object);
sem.V();
}
public synchronized Object getObject()
{
sem.P();
Object o = vector.lastElement();
vector.removeElementAt(vector.size() - 1);
return o;
}
// ...
}
|
Klasa Semaphore, przedstawiona na wydruku
5.6, jest także monitorem, ponieważ treści jej
metod są sekcjami krytycznymi oznaczonymi słowem kluczowym
synchronized. Jest ona rozszerzeniem klasy
CountingSemaphore zaproponowanej w [Lea96]. Przy
standardowym zainicjowaniu przez konstruktor bezparametrowy lub z
parametrem int obiekt tej klasy może być wołany z kodu
niesynchronizowanego. Wywołania metod V() i P()
są wtedy synchronizowane na poziomie dostępu do obiektu semafora
(identyfikator this). Natomiast gdy metody V() i
P() są wołane z innego monitora (np. z obiektu klasy
SyncVector), konieczne jest wskazanie obiektu wołającego
jako obiektu synchronizującego. Do ustawienia tego obiektu służą
dwa ostatnie konstruktory klasy Semaphore. Na obiekcie
synchronizującym (monitorze) wołane są także metody
wait() oraz notify().
Wydruk 5.6:
Klasa Semaphore
package jodl.util;
public class Semaphore {
private int value = 0;
private Object object;
public Semaphore() { object = this; }
public Semaphore(int v) { object = this; value = v; }
public Semaphore(Object syncObject) { object = syncObject; }
public Semaphore(int v, Object syncObject)
{ object = syncObject; value = v; }
public void P()
{
synchronized (object) {
while (value <= 0) try { object.wait(); } catch (Exception e) { }
value--;
}
}
public void V()
{
synchronized (object) {
value++;
if (value > 0) object.notify();
}
}
}
|
Sebastian Łopieński