Segmentacja komunikatów

Autor: Marcin Kasiński
21.01.2011 13:24:00 +0200

Przy realizacji komunikacji MQ istnieją limity określające maksymalną wielkość komunikatu, jaki może zostać przesłany przez kanał (dla konfiguracji kanału) lub wielkość komunikatu, jaki może zostać zapisany w kolejce (dla kolejki). Parametr ten w obu przypadkach, czy to kanału, czy kolejki nazywa się MAXMSGL . Czasami się zdarza, że wielkość ta nie jest wystarczająca. W takim przypadku można powiększyć tą wartość, lecz nie zawsze jest to możliwe lub nie jest to zawsze najlepsze rozwiązanie.

Inną możliwością jest zastosowanie tu segmentacji komunikatu. Segmentacja polega na wprowadzeniu pojęcia "komunikatu logicznego". W takiej sytuacji istnieje możliwość grupowania kilku komunikatów fizycznych w jeden komunikat logiczny. Istnieją dwa sposoby realizacji segmentacji MQ. Jeden polega na zleceniu segmentacji menadżerowi. W takiej sytuacji z punktu widzenia aplikacji taki komunikat widziany jest jako jeden tak jak w przypadku standardowej obsługi MQ. Innym sposobem jest realizacja segmentacji przez aplikacji. W tym przypadku przy wysyłaniu, czy odczytywaniu komunikatu każdy fizyczny komunikat jest wysyłany lub odczytywany oddzielnie.

Segmentacja realizowana przez menadżera kolejek

Odczyt

W przypadku odczytu aby poinformować menadżer żeby sam zrealizował segmentacje należy dodać opcję MQC.MQGMO_COMPLETE_MSG do opcji odczytu komunikatu. Oznacza on, że w metodzie get odczytamy jeden komunikat po złożeniu go przez menadżera kolejek. Ważne jest, że ten złożony komunikat pojawi się dla aplikacji dopiero po tym jak wszystkie komunikaty fizyczne będące częścią komunikatu logicznego pojawią się w kolejce.

...


MQGetMessageOptions gmo = new MQGetMessageOptions();


gmo.options = MQC.MQGMO_FAIL_IF_QUIESCING|MQC.MQGMO_COMPLETE_MSG;


queueIN.get(retrievemqmessage, gmo);


...

Zapis

W przypadku zapisu aby poinformować menadżer żeby sam zrealizował segmentacje należy ustawić pole messageFlags wysyłanego komunikatu na MQC.MQMF_SEGMENTATION_ALLOWED . Oznacza ono, że w przypadku, kiedy komunikat przekroczy dozwoloną wielkość menadżer ma sam zrealizować segmentacje.

...


message.messageFlags=MQC.MQMF_SEGMENTATION_ALLOWED;


queue.put(message,pmo);


...

Segmentacja realizawana przez aplikacje

Odczyt

W przypadku odczytu komunikatu przy segmentacji realizowanej przez aplikacje każdy fizyczny komunikat będący częścią komunikatu logicznego jest odczytywany neizależnie i to w samej aplikacji musi zostać "złączony" w logiczną całość. Przykład kodu realizującego taką segmentacje może mieć postać:

...


	MQMessage retrievemqmessage = new MQMessage();

	byte readgroupId[];


	try {


int openOptions =


	MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_FAIL_IF_QUIESCING;

MQGetMessageOptions gmo = new MQGetMessageOptions();


gmo.options =	MQC.MQGMO_FAIL_IF_QUIESCING | 
MQC.MQGMO_LOGICAL_ORDER | MQC.MQPMO_SYNCPOINT;

queueIN.get(retrievemqmessage, gmo);


readgroupId=retrievemqmessage.groupId;


String msg = retrievemqmessage.readStringOfByteLength( 
retrievemqmessage.getDataLength() );


String flag="";

int isSEGMENT=retrievemqmessage.messageFlags & MQC.MQMF_SEGMENT;

int isLAST_SEGMENT=
retrievemqmessage.messageFlags & MQC.MQMF_LAST_SEGMENT;

if (isSEGMENT>0) flag="MQMF_SEGMENT";


if (isLAST_SEGMENT>0) flag=flag+" & MQMF_LAST_SEGMENT";

if (isSEGMENT>0)

{

	while (isLAST_SEGMENT==0)


	{


retrievemqmessage = new MQMessage();


retrievemqmessage.groupId=readgroupId;


queueIN.get(retrievemqmessage, gmo);

msg=msg+retrievemqmessage.readStringOfByteLength(
retrievemqmessage.getDataLength());

isLAST_SEGMENT=
retrievemqmessage.messageFlags & MQC.MQMF_LAST_SEGMENT;

	}//while (isLAST_SEGMENT>0)


}//if (isSEGMENT>0)


System.out.println("["+msg+"] [msgId = " + 
MQSegmentationGET.fromByteToHEX( retrievemqmessage.messageId) +"]");

System.out.println("************************************************");


qMgr.commit();


	} //try


	catch (MQException e) {


if (e.reasonCode == 2033) {


	Thread.sleep(200);


	return null;


} else {


	qMgr.backout();


	


	throw e;


}


	} catch (Exception e) {





throw e;


	}


	


	return retrievemqmessage;


...


Zapis

...


MQPutMessageOptions pmo= new MQPutMessageOptions();


pmo.options=MQC.MQPMO_FAIL_IF_QUIESCING |MQC.MQPMO_SYNCPOINT;



MQMessage mqmessage=new MQMessage();


mqmessage.format=MQC.MQFMT_STRING;


mqmessage.messageType = MQC.MQMT_REQUEST;


mqmessage.writeString(message1);


mqmessage.messageFlags=MQC.MQMF_SEGMENT;





queue.put(mqmessage, pmo);



MQMessage mqmessage2=new MQMessage();


mqmessage2.format=MQC.MQFMT_STRING;


mqmessage2.messageType = MQC.MQMT_REQUEST;


mqmessage2.writeString(message2);


mqmessage2.messageFlags=MQC.MQMF_LAST_SEGMENT;





mqmessage2.groupId=mqmessage.groupId;


mqmessage2.messageSequenceNumber=mqmessage.messageSequenceNumber+1;


mqmessage2.offset=message1.length();





queue.put(mqmessage2, pmo);





qMgr.commit();


...

powrót
Zachęcam do przedstawienia swoich uwag i opinii w polu komentarzy.

Komentarze

Dodaj Komentarz