Agregacja

Autor: Marcin Kasiński
21.01.2011 13:24:00 +0200

Agregacja jest ciekawą technika pozwalającą na wysłaniu z przepływu kilku zapytań, agregacji odpowiedzi, a następnie na podstawie odebranych odpowiedzi przygotowanie zagregowanej odpowiedzi. Do obsługi agregacji wykorzystujemy nody AggregateControl, AgregateRequest oraz AggregateReply.

AggregateControl

Nod AggregateControl służy do zaznaczenia rozpoczęcia procesu agregacji. Najważniejsze parametry noda ta:

  • Aggregate namenazwa agregacji (unikalny identyfikator)
  • Timeoutczas jaki będziemy oczekiwać na odpowiedź

AgregateRequest

Nod AgregateRequest służy do określenia komunikatu MQ, który jest częścią procesu agregacji. Umieszcza się go za node MQOutput aby zapisać informacje o tym na jakie pytania w procesie agregacji oczekujemy odpowiedzi. Najważniejszy parametr noda to:

  • Folder nameFolder, w którym będzie zapisana odpowiedz (unikalny identyfikator zapytania w ramach agregacji)

AggregateReply

Nod AggregateReply służy do magazynowania odpowiedzi w ramach agregacji i w efekcie zwraca drzewo, ze wszystkimi odpowiedziami. Najważniejsze parametry noda to:

  • Aggregate nameNazwa agregacji (unikalny identyfikator) skorelowany z tym samym parametrem noda AggregateControl
  • Unknow message timeoutOkreśla w sekundach czas jaki proces oczekuje w przypadku odczytania komunikatu, które nie może skorelować z żadnym z wysłanych wcześniej zapytań zanim wyśle go do terminala "Unknown"

Najważniejsze terminale noda to:

  • Failurestandardowy terminal do którego przekazywane są komunikaty, które spowodowały bład w procesowaniu
  • Unknowndo tego terminala przekazywane są komunikaty, które nie można skorelować z żadnym wysłanym wcześniej zapytaniem
  • Outdo tego terminala przekazywany jest komunikat grugujące wszystkie zebrane odpowiedzi
  • Timeoutdo tego terminala przekazywany jest komunikat grupujący wszystkie odpowiedzi ( które nadeszły w zakładnym czasie określonym na nodzie AggregateControl)

Scenariusz

Poniżej postaram się przedstawić scenariusz opisujący wykorzystanie agregacji w praktyce. Zaimplementujemy przepływ będący WebServices, którego zadaniem będzie zwrócenie liczby. WebService przygotuje i wyśle dwa zapytania MQ. Na każde zapytanie MQ otrzymamy w odpowiedzi liczbę. Zadaniem WebSevice jest zsumowanie tych liczb i zwrócenie tej sumy jako odpowiedzi WS. Pierwszym elementem przepływu jest nod HTTPInput oczekujący na połączenia WS. Dalej mamy nod AggregateControl, którego ustawiamy parametr Aggregate name na Aggr oraz Timeout na 15 sekund. W ten sposób określiliśmy identyfikator dla agregacji oraz określiliśmy skończony czas jaki oczekujemy na odpowiedź. Dalej mamy nod Compute, którego zadaniem jest przygotowanie komunikatów MQ W pierwszej części w module deklarujemy namespaces używane w WebService

	DECLARE tns NAMESPACE 
	'http://www.myserver.com.pl/WSPilot/SimpleTypes';
	DECLARE soapenv NAMESPACE 
	'http://schemas.xmlsoap.org/soap/envelope/'; 

Następnie deklarujemy funkcje PrepareMQMDRequest, która przygotuje request MQ i ustawia odpowiednie parametry nagłówka MQMD.

	CREATE FUNCTION PrepareMQMDRequest() 

	BEGIN

	SET OutputLocalEnvironment=InputLocalEnvironment;

	SET OutputRoot.Properties.MessageSet = '';

	SET OutputRoot.Properties.MessageType = '';

	SET OutputRoot.Properties.MessageFormat = '';

	
	SET OutputRoot.MQMD.StrucId = MQMD_STRUC_ID; 

	SET OutputRoot.MQMD.Version = MQMD_CURRENT_VERSION; 

	SET OutputRoot.MQMD.Format = MQFMT_STRING;

	SET OutputRoot.MQMD.Expiry= 300; --30 sekund

	SET OutputRoot.MQMD.MsgType = 1;

	SET OutputRoot.MQMD.Priority= 3;

	SET OutputRoot.MQMD.ReplyToQ = 'MQSI.MK_Q.IN';

	END;

Dalej w głónej funkcji Main w pierwszej części tworzymy odpowiednie domeny MQMD oraz XMLNS, usuwamy niepotrzebne elementy HTTPInputHeader oraz MRM, wywołujemy funkcje PrepareMQMDRequest wypełniającą odpowiednie pola MQMD, przygotowujemy dane komunikatu MQ na podstawie danych wejściowych. oraz propagujemy komunikat do terminala out1.

	CREATE NEXTSIBLING OF OutputRoot.Properties DOMAIN 'MQMD';

	CREATE NEXTSIBLING OF OutputRoot.MQMD DOMAIN 'XMLNS';


	SET OutputRoot.HTTPInputHeader=null;

	SET OutputRoot.MRM=null;	


	CALL PrepareMQMDRequest() ;

	SET OutputRoot.XMLNS.DATA.v1=
	InputRoot.MRM.soapenv:Body.tns:addInt.tns:intPair.v1;

	SET OutputRoot.XMLNS.DATA.v2=
	InputRoot.MRM.soapenv:Body.tns:addInt.tns:intPair.v2;

	PROPAGATE TO TERMINAL 'out1';

W ten sposób przygotowaliśmy pierwszy komunikat, który będzie wysłany do pierwszej aplikacji. Dalej przygotowujemy drugi komunikat, wywołujemy funkcje PrepareMQMDRequest wypełniającą odpowiednie pola MQMD, przygotowujemy dane komunikatu MQ na podstawie danych wejściowych. oraz propagujemy komunikat do terminala out2.

	CALL PrepareMQMDRequest() ;


	SET OutputRoot.XMLNS.DATA.MULTI.v1=

	InputRoot.MRM.soapenv:Body.tns:addInt.tns:intPair.v1;


	SET OutputRoot.XMLNS.DATA.MULTI.v2=

	InputRoot.MRM.soapenv:Body.tns:addInt.tns:intPair.v2;

	PROPAGATE TO TERMINAL 'out2';

W ten sposób przygotowaliśmy pierwszy komunikat, który będzie wysłany do drugiej aplikacji. Dalej musimy jeszcze utworzyć komunikat, który będzie zawierał informacje o identyfikatorze zapytania WS. Ten identyfikator będzie nam potrzebny później w celu skorelowania przygotowywanej odpowiedzi z zapytaniem WS. Aby to zrobić wywołujemy funkcje PrepareMQMDRequest wypełniającą odpowiednie pola MQMD, przygotowujemy treść komunikatu uzupełniając go o przeczytany identyfikator zapytania WS. Następnie ustawiamy typ komunikatu na 2 czyli REPLY, propagujemy komunikat do terminala out oraz zwracamy w funkcji wartość FALSE.

	CALL PrepareMQMDRequest();


	SET OutputRoot.XMLNS.DATA.HTTPID=

	CAST 
	(InputLocalEnvironment.Destination.HTTP.RequestIdentifier 
	AS CHARACTER);


	SET OutputRoot.MQMD.MsgType = 2;

	PROPAGATE TO TERMINAL 'out';

	RETURN FALSE;

Dalej z noda Compute wyprowadzone są terminale, out1 do noda MQOutput wskazującego na kolejkę aplikacji 1, out2 do noda MQOutput wskazującego na kolejkę aplikacji 2 oraz out do noda MQOutput wskazującego na kolejkę, w której zapisywane są identyfikatory zapytania HTTP. Dalej za każdym wymienionym wcześniej nodem MQOutput znajduje się nod AggregateRequest. Ich parametr Folder name ustawiony jest odpowiednio REQ1 dla kolejki 1, REQ2 dla kolejki 2 oraz HTTP dla kolejki HTTP. W ten sposób wysłaliśmy odpowiednie komunikaty oraz określiliśmy, że na agregacje składają sie 3 powyższe zapytania MQ. W drugiej części przepływu mamy dwa nody MQInput, jeden oczekujący na kolejce, w której mają pojawić się odpowiedzi od obu aplikacji oraz drugi na kolejce zawierającej identyfikatory zapytań WS. Nod MQInput związany z HTTP połączony jest z nodem Compute. Zadaniem tego noda jest przepisania pola MsgId do CorrelId. Pozwoli to poprawnie zidentyfikować komunikat jako odpowiedz na wysłany wcześniej komunikat.

	SET OutputRoot.MQMD.CorrelId=InputRoot.MQMD.MsgId;

Powyższy nod Compute oraz drugi nod MQInput związany z odpowiedziami z aplikacji połączony jest z nodem AggregateReply. Zadaniem tego noda jest magazynowanie napływających odpowiedzi danej agregacji. Parametr "Aggregate name" tego noda ustawiamy na Aggr, parametr "Unknow message timeout" na 0. Jeśli nod ten w zadanym czasie określonym wcześniej w nodzie AggregateControl zbierze wszystkie odpowiedzi procesowanie przejdzie do terminala "out". Terminal ten połączony jest z nodem Compute. W pierwszej części w module deklarujemy namespaces używane w WebService

	DECLARE tns NAMESPACE 
	'http://www.myserver.com.pl/WSPilot/SimpleTypes';

	DECLARE soapenv NAMESPACE 
	'http://schemas.xmlsoap.org/soap/envelope/'; 

Następnie w głównej metodzie Main deklarujemy dwie zmienne oraz podstawiamy pod nie dwie odpowiedzi od aplikacji. Następnie kopiujemy cały blok Properties oraz ustawiamy odpowiednie pola bloku Properties na wartości związane z MRM WebService.

	DECLARE response1 INTEGER; 

	DECLARE response2 INTEGER; 


	SET response1=
	InputRoot.ComIbmAggregateReplyBody.REQ1.XMLNS.RESPONSE.DATA.V; 

	SET response2=
	InputRoot.ComIbmAggregateReplyBody.REQ2.XMLNS.RESPONSE.DATA.V; 

	SET OutputRoot.Properties = InputRoot.Properties;

	SET OutputRoot.Properties.MessageSet = 'PHTM4NG002001';

	SET OutputRoot.Properties.MessageType = 'Envelope';

	SET OutputRoot.Properties.MessageFormat = 'XML1';

Dalej generujemy odpowiedz WS jako sumę zebranych odpowiedzi oraz ustawiamy identyfikator zapytania na podstawie przeczytanej wartości z wcześniej okreslonego folderu HTTP.

	SET OutputRoot.MRM.soapenv:Body.tns:addIntResponse.
	tns:addIntResult.v=response1+response2;

	SET OutputLocalEnvironment.Destination.HTTP.RequestIdentifier =

	CAST(
	InputRoot.ComIbmAggregateReplyBody.HTTP.XMLNS.DATA.HTTPID 
	AS BLOB); 

Dalej nod Compute podłączony jest do ostatniego noda w przepływie HTTPReply, którego zadaniem jest wysłanie odpowiedzi WS. Jeśli z kolej nod AggregateReply nie zdoła w zadanym czasie określonym wcześniej w nodzie AggregateControl zebrać wszystkich odpowiedzi procesowanie z noda przejdzie do terminala "Timeout". Tutaj mogliśmy oczywiście wysłać odpowiedz WS mówiący o błędzie. W naszym scenariuszu zdecydowałem się wysłać odpowiedź na podstawie tylko zebranych odpowiedzi. W związku z tym terminal "Timeout" noda AggregateReply podłączamy do noda Compute, którego zadaniem jest przygotowanie odpowiedniej odpowiedzi WS. W pierwszej części w module deklarujemy namespaces używane w WebService

	DECLARE tns NAMESPACE 
	'http://www.myserver.com.pl/WSPilot/SimpleTypes';

	DECLARE soapenv NAMESPACE 
	'http://schemas.xmlsoap.org/soap/envelope/'; 

Następnie w głównej metodzie Main deklarujemy zmienne określające odpowiedzi MQ oraz wartość jaką zwrócimy do WS. Podstawiamy pod dwie zmienne odpowiedzi od aplikacji oraz wyliczamy wartość response na podstawie odpowiedzi, które uzyskaliśmy.

	DECLARE response1 INTEGER; 

	DECLARE response2 INTEGER; 

	DECLARE response INTEGER; 

SET response1=
InputRoot.ComIbmAggregateReplyBody.REQ1.XMLNS.RESPONSE.DATA.V; 

SET response2=
InputRoot.ComIbmAggregateReplyBody.REQ2.XMLNS.RESPONSE.DATA.V; 

SET response=0;


IF response1 IS NOT NULL THEN SET response=response+response1;

END IF;

IF response2 IS NOT NULL THEN SET response=response+response2;

END IF;

Następnie kopiujemy cały blok Properties oraz ustawiamy odpowiednie pola bloku Properties na wartości związane z MRM WebService. Dalej generujemy odpowiedz WS jako sumę zebranych odpowiedzi oraz ustawiamy identyfikator zapytania na podstawie przeczytanej wartości z wcześniej określonego folderu HTTP.

	SET OutputRoot.Properties = InputRoot.Properties;

	SET OutputRoot.Properties.MessageSet = 'PHTM4NG002001';

	SET OutputRoot.Properties.MessageType = 'Envelope';

	SET OutputRoot.Properties.MessageFormat = 'XML1';


	SET OutputRoot.MRM.soapenv:Body.tns:addIntResponse.
	tns:addIntResult.v=response;

	SET OutputLocalEnvironment.Destination.HTTP.RequestIdentifier =
	CAST(
	InputRoot.ComIbmAggregateReplyBody.HTTP.XMLNS.DATA.HTTPID 
	AS BLOB); 

	SET Environment.HTTPID=
	InputRoot.ComIbmAggregateReplyBody.HTTP.XMLNS.DATA.HTTPID;

Na zakończenie wyjście out z noda Copute łączymy do omawianego wcześniej noda HTTPReply, którego zadaniem jest wysłanie przygotowanej odpowiedzi WS.


powrót
Zachęcam do przedstawienia swoich uwag i opinii w polu komentarzy.

Komentarze

Dodaj Komentarz