Files
scandocs/uni/masterarbeit/source/moversight/merge/MergeService.cc
2014-06-30 13:58:10 +02:00

1029 lines
36 KiB
C++

/*
* File: MergeService.cc
* Author: jgaebler
*
* Created on November 16, 2011, 3:50 PM
*/
#include "MergeService.h"
#include "merge/events/MergeRequestEvent.h"
#include "merge/events/MergeConfirmEvent.h"
#include "merge/events/MergeRejectedEvent.h"
#include "merge/events/MergeAbortEvent.h"
#include "merge/events/MergeDoneEvent.h"
#include "merge/msg/MergeAnnounce.h"
#include "merge/msg/MergeFlush.h"
#include "merge/msg/MergeRoster.h"
#include "merge/msg/MergeFinish.h"
#include "merge/msg/MergeRequest.h"
#include "merge/msg/MergeConfirm.h"
#include "merge/msg/MergeAbort.h"
#include "merge/msg/MergeReject.h"
#include "merge/msg/MergeMessageFactory.h"
#include "merge/timer/MergeRejectToInviteeTimer.h"
#include "merge/timer/MergeAbortToInviteeGroupTimer.h"
#include "merge/timer/MergeAbortToInviterGroupTimer.h"
#include "merge/timer/MergeAbortTimer.h"
#include "merge/timer/MergeRollbackTimer.h"
#include "Dispatcher.h"
#include "Moversight.h"
#include "common/container/PeerIDList.h"
#include "event/events/FlushStartEvent.h"
#include "event/events/FlushDoneEvent.h"
#include "common/transport/TransportAddress.h"
#include "ms/events/LocalPeerUpdatedEvent.h"
#include "mt/msg/MulticastMessage.h"
// Service-local debug macro. Output is produced only when
// module.isPrintDebugTC() is true. While the local state is DISJOINED the
// line is prefixed with "MRG@TA_<local address>", otherwise with
// "MRG@<local peer id>".
// NOTE: the macro expands to a bare `if (...) { ... }` statement (no
// do/while wrapper), so do not use it as the unbraced body of an if/else.
#undef DEBUG
#define DEBUG(msg) if (module.isPrintDebugTC()){ if(getLocalState()== DISJOINED){ MOV_DEBUG <<"MRG@TA_"<<getLocalAddress()<<" "<<msg<<endl; } else{ MOV_DEBUG <<"MRG@"<<getLocalID() <<" "<< msg <<endl; } }
namespace ubeeme {
namespace moversight {
/**
 * @brief Constructor.
 *
 * Besides forwarding to the MoversightService base, the timer handles, the
 * current merge operation and the director flags are put into a defined
 * "idle" state. Without this they stay indeterminate until initialise() is
 * called, and a finalise() issued before initialise() would stop and delete
 * garbage pointers. initialise() assigns the same values again later on.
 *
 * @param d A reference to the moversight dispatcher
 */
MergeService::MergeService(Dispatcher& d) : MoversightService(d, "MergeService") {
    // Assigned in the body (not the initialiser list) because the member
    // declaration order lives in the header and is not visible here.
    currentMerge = NULL;
    mergeRejectToInviteeTimer = NULL;
    mergeAbortToInviteeGroupTimer = NULL;
    mergeAbortToInviterGroupTimer = NULL;
    mergeAbortTimer = NULL;
    mergeRollbackTimer = NULL;
    mergeDirector = false;
    inviterMergeDirector = false;
}
/**
 * @brief Destructor.
 *
 * Releases nothing itself; timers and the current merge operation are
 * stopped and freed by finalise().
 */
MergeService::~MergeService() {}
/**
 * @brief Initialises the merge sub service.
 *
 * Subscribes to FlushDoneEvent and resets the service to its idle state:
 * no merge operation, no timers, no director role. The current local peer
 * ID is remembered in oldPeerID so a later rollback can restore it.
 *
 * @note The pointers are only overwritten, not deleted; call finalise()
 *       first when re-initialising a service that was already in use.
 */
void
MergeService::initialise() {
    dispatcher.subscribe<FlushDoneEvent>(this);

    // no merge in progress
    currentMerge = NULL;

    // no timer is running
    mergeRejectToInviteeTimer = NULL;
    mergeAbortToInviteeGroupTimer = NULL;
    mergeAbortToInviterGroupTimer = NULL;
    mergeAbortTimer = NULL;
    mergeRollbackTimer = NULL;

    // peer ID to fall back to when a merge has to be rolled back
    oldPeerID = getLocalID();

    // this peer leads no merge
    mergeDirector = false;
    inviterMergeDirector = false;
}
/**
 * @brief Finalises the service.
 *
 * Drops all event subscriptions, stops and deletes every timer, frees the
 * current merge operation (if any) and resets the local sub state to
 * NO_SUB_STATE.
 */
void
MergeService::finalise() {
    dispatcher.unsubscribeAll(this);
    stopAndDeleteAllTimers();

    // deleting a null pointer is a no-op, so no explicit guard is needed
    delete currentMerge;
    currentMerge = NULL;

    setLocalSubState(NO_SUB_STATE);
}
/**
 * @brief Handles a merge request (MR) sent by the director of another group.
 *
 * - Idle (NO_SUB_STATE): the local peer becomes merge director, remembers
 *   the sender as the opposite director and forwards the request to the
 *   application via a MergeRequestEvent.
 * - FLUSHING / WAITING_FOR_FLUSH / WAITING_FOR_MERGE: a director re-confirms
 *   a repeated request of its current partner and rejects requests from
 *   anybody else; a non-director ignores the message.
 * - Any other sub state: the request is rejected.
 *
 * @param mr The merge request message to handle.
 */
void
MergeService::handleMergeRequest(MergeRequest * mr) {
    DEBUG("handleMergeRequest - merge received from " << mr->getSourceTA());

    // idle: accept the request and hand it to the application
    if (getLocalSubState() == NO_SUB_STATE) {
        currentMerge = new MergeOperation();
        setLocalSubState(WAITING_FOR_FLUSH);
        mergeDirector = true;
        otherMergeDirectorTA = TransportAddress(mr->getSourceTA());
        dispatcher.signal(new MergeRequestEvent());
        return;
    }

    const bool earlyMergePhase = getLocalSubState() == FLUSHING
            || getLocalSubState() == WAITING_FOR_FLUSH
            || getLocalSubState() == WAITING_FOR_MERGE;

    if (earlyMergePhase) {
        // only the merge director answers while a merge is being set up
        if (!mergeDirector) {
            return;
        }
        // our partner repeated its MR: simply confirm again
        if (mr->getSourceTA() == otherMergeDirectorTA) {
            sendMergeConfirm(otherMergeDirectorTA);
            return;
        }
    }

    // a different merge is already running: turn the sender down
    std::string reason("merge reject - another merge in progress");
    sendMergeReject(mr->getSourceTA(), reason);
}
/**
 * @brief Handles a merge confirm (MC) message.
 *
 * Only acted upon while WAITING_FOR_FLUSH, by the merge director, and only
 * if the confirm comes from the known opposite director. In that case the
 * request-retry timer is cancelled, a MergeConfirmEvent is signalled and a
 * MergeFlush is sent to the own group, guarded by the MergeAbortTimer.
 *
 * @param mc The merge confirm message to handle.
 */
void
MergeService::handleMergeConfirm(MergeConfirm * mc) {
    DEBUG("handleMergeConfirm - merge confirm by " << mc->getSourceTA());

    if (getLocalSubState() != WAITING_FOR_FLUSH) {
        return;
    }
    if (!(mergeDirector && mc->getSourceTA() == otherMergeDirectorTA)) {
        return;
    }

    // the invitee answered, no further request retries needed
    stopAndDeleteMergeRejectToInviteeTimer();

    // tell the application
    dispatcher.signal(new MergeConfirmEvent());

    // start the flush within the own group and supervise its delivery
    MergeFlush flushMsg = MergeMessageFactory::createMergeFlushMessage();
    dispatcher.sendMessage(flushMsg);
    createAndStartMergeAbortTimer(flushMsg);
}
/**
 * @brief Handles a merge reject (MRJ) message.
 *
 * The message is ignored unless a merge is in progress, the local peer is
 * merge director and the sender is the known opposite director.
 * - Inviter side: the invitee declined; a MergeRejectedEvent is signalled
 *   and the service is reset.
 * - Invitee side: the inviter withdrew its request; a MergeAbort carrying
 *   the reject reason is sent to the own group, guarded by the
 *   MergeAbortTimer.
 *
 * @param mrj The merge reject message to handle.
 */
void
MergeService::handleMergeReject(MergeReject * mrj) {
    DEBUG("handleMergeReject - merge rejected by " << mrj->getSourceTA());

    if (getLocalSubState() == NO_SUB_STATE || !mergeDirector) {
        return;
    }
    // only accept a reject from the peer we are merging with
    if (!(mrj->getSourceTA() == otherMergeDirectorTA)) {
        return;
    }

    std::string reason(mrj->getMrjMessage());

    if (inviterMergeDirector) {
        // the invitee rejected our request
        dispatcher.signal(new MergeRejectedEvent(reason));
        finalise();
        initialise();
        return;
    }

    // the inviter cancelled its request: abort the merge in our group
    MergeAbort abortMsg = MergeMessageFactory::createMergeAbortMessage(reason);
    dispatcher.sendMessage(abortMsg);
    createAndStartMergeAbortTimer(abortMsg);
}
/**
 * @brief Handles a merge roster (MRO) message carrying the roster of the opposite group.
 *
 * Only the merge director reacts:
 * - WAITING_FOR_FLUSH: the roster is stored in the current merge and
 *   processed once the flush is done.
 * - WAITING_FOR_MERGE: if the roster comes from the known opposite director,
 *   the matching roster-wait timer is cancelled (the invitee additionally
 *   answers with its own roster and adopts the inviter's peer placing
 *   strategy) and a MergeAnnounce is sent to the own group, guarded by the
 *   MergeAbortTimer.
 * - MERGING: a repeated roster from the inviter is answered by the invitee
 *   with its roster again; no new announce is sent.
 *
 * @param mro The merge roster message to handle.
 */
void
MergeService::handleMergeRoster(MergeRoster * mro) {
    DEBUG("handleMergeRoster - merge roster from " << mro->getSourceTA());
    PeerPlacingStrategyType peerPlacing = dispatcher.getPeerPlacingStrategyType();

    if (!mergeDirector) {
        return;
    }

    if (getLocalSubState() == WAITING_FOR_FLUSH) {
        DEBUG("handleMergeRoster - save merge roster message and wait for flush");
        // too early: keep the roster until the flush has finished
        currentMerge->setMergeRoster(*mro);
        return;
    }

    if (getLocalSubState() == WAITING_FOR_MERGE) {
        // accept the roster only from our merge partner
        if (!(otherMergeDirectorTA == mro->getSourceTA())) {
            return;
        }

        if (inviterMergeDirector) {
            // the invitee answered: stop re-sending our roster
            stopAndDeleteMergeAbortToInviterGroupTimer();
        }
        else {
            // the inviter's roster arrived: stop waiting and answer with ours
            stopAndDeleteMergeAbortToInviteeGroupTimer();
            DEBUG("--------------")
            sendMergeRoster(otherMergeDirectorTA);
            // tie-break: the invitee group adopts the inviter's strategy
            if (peerPlacing != mro->getStrategy()) {
                peerPlacing = mro->getStrategy();
            }
        }

        // rebuild the opposite group's roster from the message
        Roster opponentRoster;
        opponentRoster.setViewID(mro->getNextViewID());
        opponentRoster.setNextPeerID(mro->getNextPeerID());
        opponentRoster.setMemberDescriptionList(mro->getMemberDescriptionList());

        // remembered in the current merge for timer repeats
        currentMerge->setInviterGroup(inviterMergeDirector);

        // announce the merge within the own group and supervise delivery
        MergeAnnounce announceMsg = MergeMessageFactory::createMergeAnnounceMessage(opponentRoster, inviterMergeDirector, peerPlacing);
        dispatcher.sendMessage(announceMsg);
        createAndStartMergeAbortTimer(announceMsg);
        return;
    }

    if (getLocalSubState() == MERGING) {
        // the inviter repeated its MRO: just send ours back, no new MA
        if (otherMergeDirectorTA == mro->getSourceTA() && !inviterMergeDirector) {
            sendMergeRoster(otherMergeDirectorTA);
        }
    }
}
/**
 * @brief Handles a merge flush (MFL) message delivered within the own group.
 *
 * A merge director first cancels its MergeAbortTimer. If some peers missed
 * the message, the director signals a MergeAbortEvent and resets the
 * service; other peers simply ignore the message. Otherwise the flush is
 * started: a peer that is not the sender of the MFL gives up any director
 * role and merge state of its own, every non-director creates a fresh merge
 * operation, the sub state becomes FLUSHING, a FlushStartEvent is signalled
 * and the rollback timer (waiting for MA) is started.
 *
 * @param mfl The merge flush message to handle.
 * @param missedPeers The peers who didn't get the message.
 */
void
MergeService::handleMergeFlushMessage(MergeFlush & mfl, const PeerIDList & missedPeers) {
    DEBUG("handleMergeFlush - got merge flush message");

    if (mergeDirector) {
        // the MFL was delivered, stop re-sending it
        stopAndDeleteMergeAbortTimer();
    }

    if (missedPeers.size() > 0) {
        if (!mergeDirector) {
            return;
        }
        std::string reason("merge flush failed. not all peers got the message.");
        dispatcher.signal(new MergeAbortEvent(reason));
        finalise();
        initialise();
        return;
    }

    // tie-break of concurrent merges: the sender of the first MFL wins,
    // everybody else drops its own merge role and state
    const bool sentByOtherPeer = getLocalID() != mfl.getSourceID();
    if (sentByOtherPeer) {
        delete currentMerge;
        currentMerge = NULL;
        mergeDirector = false;
        inviterMergeDirector = false;
    }

    // the director already owns a merge operation, all others create one
    if (!mergeDirector) {
        currentMerge = new MergeOperation();
    }

    setLocalSubState(FLUSHING);
    dispatcher.signal(new FlushStartEvent());

    // roll back if the merge announce never arrives
    createAndStartMergeRollbackTimer(MA);
}
/**
 * @brief Handles a merge announce (MA) message by merging the announced peers into the own group.
 *
 * A merge director first cancels its MergeAbortTimer. If some peers missed
 * the announce, a MergeAbortEvent is signalled and the service is reset.
 * While WAITING_FOR_MERGE the current member register is saved, a merged
 * register is built and installed, the sub state becomes MERGING, the
 * inviter director sends a MergeFinish to the new group and the rollback
 * timer is restarted (now waiting for MF).
 *
 * If the local peer cannot be found in the merged register the merge is
 * aborted and the handler returns immediately; it must not continue into
 * the MERGING state after the service has been reset.
 *
 * @param ma The merge announce message to handle.
 * @param missedPeers Provides the IDs of the peers, which have missed the merge announce.
 */
void
MergeService::handleMergeAnnounceMessage(MergeAnnounce & ma, const PeerIDList & missedPeers) {
    DEBUG("handleMergeAnnounce - got merge announce message");
    if (mergeDirector) {
        //cancel MergeAbortTimer
        stopAndDeleteMergeAbortTimer();
    }//End if
    if (missedPeers.size() > 0) {
        //signal merge abort
        std::string reason("merge failed. not all peers got the announce message.");
        dispatcher.signal(new MergeAbortEvent(reason));
        //clean up
        finalise();
        initialise();
        return;
    }//End if
    if (getLocalSubState() == WAITING_FOR_MERGE) {
        //create roster from MA message
        Roster ro;
        ro.setNextPeerID(ma.getNextPeerID());
        ro.setViewID(ma.getNextViewID());
        ro.setMemberDescriptionList(ma.getMemberDescriptionList());
        //save old MemberRegister so that a rollback can restore it
        dispatcher.getMembershipService().saveLastMemberRegister();
        //create new MemberRegister
        PeerPlacingStrategyType placingStrategy = ma.getStrategy();
        MemberRegister newMR = createNewMemberRegisterAfterMerge(ro, ma.getInviterGroup(), placingStrategy);
        TransportAddress myTa = getLocalAddress();
        try {
            //find local PeerID in new MemberRegister
            PeerID localPeerID = newMR.getPeerIDbyTransportAddress(myTa);
            //set new MemberRegister and new local peerID
            dispatcher.getMembershipService().setupGroupFromMemberRegister(newMR, localPeerID);
        }
        catch (PeerNotFoundException & pnfe) {
            //caught by reference to avoid copying/slicing the exception
            //signal merge abort
            std::string reason(pnfe.what());
            dispatcher.signal(new MergeAbortEvent(reason));
            //clean up
            finalise();
            initialise();
            //the merge is aborted: do not fall through into MERGING,
            //send MF or arm a rollback timer for a merge that no longer exists
            return;
        }
        if (module.isPrintDebugTC()) {
            newMR.printMemberRegister();
        }
        //change sub-state
        setLocalSubState(MERGING);
        if (inviterMergeDirector) {
            //send MF to all peers in new group
            DEBUG("handleMergeAnnounce - send MF to the new group");
            MergeFinish mf = MergeMessageFactory::createMergeFinishMessage();
            dispatcher.sendMessage(mf);
            //start MergeAbortTimer
            createAndStartMergeAbortTimer(mf);
        }//End if
        //(re)start MergeRollbackTimer, now waiting for the merge finish
        stopAndDeleteMergeRollbackTimer();
        createAndStartMergeRollbackTimer(MF);
    }//End if
}
/**
 * @brief Handles a merge abort (MAB) message delivered within the own group.
 *
 * If some peers missed the message, or if a merge is in progress locally,
 * a MergeAbortEvent is signalled and the service is reset. With complete
 * delivery and no merge in progress the message is ignored.
 *
 * @param mab The merge abort message to handle.
 * @param missedPeers The peers who didn't get the message.
 */
void
MergeService::handleMergeAbortMessage(MergeAbort & mab, const PeerIDList & missedPeers) {
    DEBUG("handleMergeAbort - got merge abort message to cancel merge");

    std::string reason;
    if (missedPeers.size() > 0) {
        // incomplete delivery always aborts, whatever the local state is
        reason = "merge failed. not all peers got the abort message.";
    }
    else if (getLocalSubState() != NO_SUB_STATE) {
        // regular abort: pass on the reason carried by the message
        reason = mab.getMabMessage();
    }
    else {
        // nothing to abort
        return;
    }

    dispatcher.signal(new MergeAbortEvent(reason));
    finalise();
    initialise();
}
/**
 * @brief Handles a merge finish (MF) message that signals the end of a merge.
 *
 * The inviter director cancels its MergeAbortTimer and every peer cancels
 * the rollback timer. If some peers missed the message, a MergeAbortEvent is
 * signalled, the previously saved member register and peer ID are restored
 * and the service is reset. Otherwise, while MERGING, the service is reset
 * and MergeDoneEvent plus LocalPeerUpdatedEvent are signalled.
 *
 * @param mef The merge finish message to handle (unused).
 * @param missedPeers The peers who didn't get the message.
 */
void
MergeService::handleMergeFinishMessage(MergeFinish & /* mef */, const PeerIDList & missedPeers) {
    DEBUG("handleMergeFinish - got merge finish message");

    if (inviterMergeDirector) {
        // the MF was delivered, stop re-sending it
        stopAndDeleteMergeAbortTimer();
    }
    stopAndDeleteMergeRollbackTimer();

    const bool deliveryIncomplete = missedPeers.size() > 0;
    if (deliveryIncomplete) {
        std::string reason("merge failed. not all peers got the finish message.");
        dispatcher.signal(new MergeAbortEvent(reason));

        // fall back to the group as it was before the merge
        MemberRegister previousRegister(dispatcher.getMembershipService().getLastMemberRegister());
        dispatcher.getMembershipService().setupGroupFromMemberRegister(previousRegister, oldPeerID);

        finalise();
        initialise();
        return;
    }

    if (getLocalSubState() != MERGING) {
        return;
    }

    // merge completed: reset the service, then notify the application
    finalise();
    initialise();
    dispatcher.signal(new MergeDoneEvent());
    dispatcher.signal(new LocalPeerUpdatedEvent(dispatcher.getLocalPeer()));
}
/**
 * @brief Handles an expired MergeRejectToInviteeTimer.
 *
 * While retries are left the merge request is sent again and the timer is
 * restarted. Otherwise a MergeAbortEvent is signalled, a merge reject is
 * sent to the invitee and the service is reset.
 *
 * @param timer The timer to handle.
 */
void
MergeService::handleMergeRejectToInviteeTimer(MergeRejectToInviteeTimer * timer) {
    if (timer->getNumberOfRetries() > 0) {
        // try once more to reach the invitee
        DEBUG("handleMergeRejectToInviteeTimer - send merge invitation message to opponent again");
        sendMergeRequest(timer->getOtherDirectorAddress());
        timer->restart();
        return;
    }

    // retries exhausted: give up and inform application and invitee
    std::string reason("merge abort - invitee did not answer");
    dispatcher.signal(new MergeAbortEvent(reason));
    // the timer is still alive here; finalise() below deletes it
    sendMergeReject(timer->getOtherDirectorAddress(), reason);
    finalise();
    initialise();
}
/**
 * @brief Handles an expired MergeAbortToInviteeGroupTimer.
 *
 * The inviter's roster did not arrive in time: the timer is removed and a
 * MergeAbort is sent to the own group, guarded by the MergeAbortTimer.
 *
 * @param timer The timer to handle (unused).
 */
void
MergeService::handleMergeAbortToInviteeGroupTimer(MergeAbortToInviteeGroupTimer * /* timer */) {
    DEBUG("handleMergeAbortToInviteeGroupTimer - abort merge");

    // this timer is one-shot, dispose of it
    stopAndDeleteMergeAbortToInviteeGroupTimer();

    // abort the merge within the own group and supervise the delivery
    std::string reason("merge abort - inviter group did not send the roster");
    MergeAbort abortMsg = MergeMessageFactory::createMergeAbortMessage(reason);
    dispatcher.sendMessage(abortMsg);
    createAndStartMergeAbortTimer(abortMsg);
}
/**
 * @brief Handles an expired MergeAbortToInviterGroupTimer.
 *
 * While retries are left the own roster is sent to the opposite director
 * again and the timer is restarted. Otherwise the timer is removed and a
 * MergeAbort is sent to the own group, guarded by the MergeAbortTimer.
 *
 * @param timer The timer to handle.
 */
void
MergeService::handleMergeAbortToInviterGroupTimer(MergeAbortToInviterGroupTimer * timer) {
    if (timer->getNumberOfRetries() > 0) {
        DEBUG("handleMergeAbortToInviterGroupTimer - send merge roster to opponent again");
        // repeat the MRO towards the invitee group
        sendMergeRoster(otherMergeDirectorTA);
        timer->restart();
        return;
    }

    DEBUG("handleMergeAbortToInviterGroupTimer - abort merge");

    // retries exhausted, dispose of the timer
    stopAndDeleteMergeAbortToInviterGroupTimer();

    // abort the merge within the own group and supervise the delivery
    std::string reason("merge abort - invitee group did not send the roster");
    MergeAbort abortMsg = MergeMessageFactory::createMergeAbortMessage(reason);
    dispatcher.sendMessage(abortMsg);
    createAndStartMergeAbortTimer(abortMsg);
}
/**
 * @brief Handles an expired MergeRollbackTimer.
 *
 * Signals a MergeAbortEvent. If the timer was waiting for the merge finish
 * (message type MF), the previously saved member register and peer ID are
 * restored. Finally the service is reset.
 *
 * @param timer The timer to handle.
 */
void
MergeService::handleMergeRollbackTimer(MergeRollbackTimer * timer) {
    DEBUG("handleMergeRollbackTimer - abort merge");

    std::string reason("merge abort - merge could not be completed");
    dispatcher.signal(new MergeAbortEvent(reason));

    // when waiting for MF the merged register was already installed: undo it
    const bool waitedForFinish = timer->getMessageType() == MF;
    if (waitedForFinish) {
        MemberRegister previousRegister(dispatcher.getMembershipService().getLastMemberRegister());
        dispatcher.getMembershipService().setupGroupFromMemberRegister(previousRegister, oldPeerID);
    }

    finalise();
    initialise();
}
/**
 * @brief Handles an expired MergeAbortTimer.
 *
 * While retries are left, the multicast message stored in the timer is sent
 * to the group again and the timer is restarted. Otherwise a
 * MergeAbortEvent is signalled and the service is reset.
 *
 * @note NOTE(review): the timer is armed for flush, announce, abort and
 *       finish messages alike, yet the abort texts below mention only
 *       MAB / merge flush - confirm whether the wording should be generic.
 *
 * @param timer The timer to handle.
 */
void
MergeService::handleMergeAbortTimer(MergeAbortTimer * timer) {
    MulticastMessage & pending = timer->getMessage();

    if (timer->getNumberOfRetries() > 0) {
        DEBUG("handleMergeAbortTimer - send " << pending.getType() << " to group again");
        dispatcher.sendMessage(pending);
        timer->restart();
        return;
    }

    DEBUG("handleMergeAbortTimer - unable to emit MAB message");

    // retries exhausted: give up on the merge
    std::string reason("merge abort - unable to publish merge flush within the group");
    dispatcher.signal(new MergeAbortEvent(reason));
    finalise();
    initialise();
    //TODO re-synchronisation
}
/**
 * @brief Creates and starts the MergeRejectToInviteeTimer.
 *
 * Does nothing (besides a debug message) if the timer already exists.
 *
 * @param ta The transport address of the opposite merge director, stored in the timer.
 * @param merge The current merge, stored in the timer.
 */
void
MergeService::createAndStartMergeRejectToInviteeTimer(const TransportAddress & ta, const MergeOperation & merge) {
    if (mergeRejectToInviteeTimer != NULL) {
        DEBUG("createAndStartMergeRejectToInviteeTimer - timer created already");
        return;
    }

    mergeRejectToInviteeTimer = new MergeRejectToInviteeTimer(*this);
    mergeRejectToInviteeTimer->setOtherDirectorAddress(ta);
    mergeRejectToInviteeTimer->setMerge(merge);
    mergeRejectToInviteeTimer->start();
}
/**
 * @brief Creates and starts the MergeAbortToInviteeGroupTimer.
 *
 * Does nothing (besides a debug message) if the timer already exists.
 */
void
MergeService::createAndStartMergeAbortToInviteeGroupTimer() {
    if (mergeAbortToInviteeGroupTimer != NULL) {
        DEBUG("createAndStartMergeAbortToInviteeGroupTimer - timer create already");
        return;
    }

    mergeAbortToInviteeGroupTimer = new MergeAbortToInviteeGroupTimer(*this);
    mergeAbortToInviteeGroupTimer->start();
}
/**
 * @brief Creates and starts the MergeAbortToInviterGroupTimer.
 *
 * Does nothing (besides a debug message) if the timer already exists.
 */
void
MergeService::createAndStartMergeAbortToInviterGroupTimer() {
    if (mergeAbortToInviterGroupTimer != NULL) {
        DEBUG("createAndStartMergeAbortToInviterGroupTimer - timer create already");
        return;
    }

    mergeAbortToInviterGroupTimer = new MergeAbortToInviterGroupTimer(*this);
    mergeAbortToInviterGroupTimer->start();
}
/**
 * @brief Creates and starts the MergeRollbackTimer.
 *
 * Does nothing (besides a debug message) if the timer already exists.
 *
 * @param messageType The message type handed to the timer; the timeout
 *                    handler reads it back to decide whether the member
 *                    register has to be restored.
 */
void
MergeService::createAndStartMergeRollbackTimer(MoversightMessageType messageType) {
    if (mergeRollbackTimer != NULL) {
        DEBUG("createAndStartMergeRollbackTimer - timer create already");
        return;
    }

    mergeRollbackTimer = new MergeRollbackTimer(*this, messageType);
    mergeRollbackTimer->start();
}
/**
 * @brief Creates and starts the MergeAbortTimer.
 *
 * Does nothing (besides a debug message) if the timer already exists.
 *
 * @param mm The multicast message handed to the timer so it can be re-sent
 *           on expiry. NOTE(review): callers pass stack-local messages;
 *           presumably the timer keeps its own copy - confirm in
 *           MergeAbortTimer.
 */
void
MergeService::createAndStartMergeAbortTimer(MulticastMessage & mm) {
    if (mergeAbortTimer != NULL) {
        DEBUG("createAndStartMergeAbortTimer - timer already created");
        return;
    }

    mergeAbortTimer = new MergeAbortTimer(*this, mm);
    mergeAbortTimer->start();
}
/**
 * @brief Stops and deletes the MergeRejectToInviteeTimer, if present.
 */
void
MergeService::stopAndDeleteMergeRejectToInviteeTimer() {
    if (mergeRejectToInviteeTimer == NULL) {
        DEBUG("stopAndDeleteMergeRejectToInviteeTimer - timer already NULL");
        return;
    }

    mergeRejectToInviteeTimer->stop();
    delete mergeRejectToInviteeTimer;
    mergeRejectToInviteeTimer = NULL;
}
/**
 * @brief Stops and deletes the MergeAbortToInviteeGroupTimer, if present.
 */
void
MergeService::stopAndDeleteMergeAbortToInviteeGroupTimer() {
    if (mergeAbortToInviteeGroupTimer == NULL) {
        DEBUG("stopAndDeleteMergeAbortToInviteeGroupTimer - timer already NULL");
        return;
    }

    mergeAbortToInviteeGroupTimer->stop();
    delete mergeAbortToInviteeGroupTimer;
    mergeAbortToInviteeGroupTimer = NULL;
}
/**
 * @brief Stops and deletes the MergeAbortToInviterGroupTimer, if present.
 */
void
MergeService::stopAndDeleteMergeAbortToInviterGroupTimer() {
    if (mergeAbortToInviterGroupTimer == NULL) {
        DEBUG("stopAndDeleteMergeAbortToInviterGroupTimer - timer already NULL");
        return;
    }

    mergeAbortToInviterGroupTimer->stop();
    delete mergeAbortToInviterGroupTimer;
    mergeAbortToInviterGroupTimer = NULL;
}
/**
 * @brief Stops and deletes the MergeRollbackTimer, if present.
 */
void
MergeService::stopAndDeleteMergeRollbackTimer() {
    if (mergeRollbackTimer == NULL) {
        DEBUG("stopAndDeleteMergeRollbackTimer - timer already NULL");
        return;
    }

    mergeRollbackTimer->stop();
    delete mergeRollbackTimer;
    mergeRollbackTimer = NULL;
}
/**
 * @brief Stops and deletes the MergeAbortTimer, if present.
 */
void
MergeService::stopAndDeleteMergeAbortTimer() {
    if (mergeAbortTimer == NULL) {
        DEBUG("stopAndDeleteMergeAbortTimer - timer already NULL");
        return;
    }

    mergeAbortTimer->stop();
    delete mergeAbortTimer;
    mergeAbortTimer = NULL;
}
/**
 * @brief Stops and deletes every timer owned by the merge service.
 *
 * Each helper tolerates a timer that does not exist, so this can be called
 * in any state.
 */
void
MergeService::stopAndDeleteAllTimers() {
    // timers supervising the exchange with the opposite director
    stopAndDeleteMergeRejectToInviteeTimer();
    stopAndDeleteMergeAbortToInviteeGroupTimer();
    stopAndDeleteMergeAbortToInviterGroupTimer();

    // timers supervising the merge within the own group
    stopAndDeleteMergeRollbackTimer();
    stopAndDeleteMergeAbortTimer();
}
/**
 * @brief Creates the member register of the merged group.
 *
 * The inviter group keeps its own roster as the base and appends the peers
 * of the opposite group; the invitee group takes the opposite (inviter)
 * roster as the base and appends its own peers. This way both sides build
 * the register in the same order: inviter members first, invitee members
 * after them.
 *
 * @param ro The roster of the opponent group to merge into the current group.
 * @param inviterGroup True if the local peer belongs to the inviter group
 *                     (own roster is the base), false for the invitee group
 *                     (opponent roster is the base).
 * @param peerPlacing The new PeerPlacingStrategy for the invitee group (currently unused).
 * @return The newly created member register.
 */
MemberRegister
MergeService::createNewMemberRegisterAfterMerge(const Roster & ro, bool inviterGroup, const PeerPlacingStrategyType & /* peerPlacing */) {
    DEBUG("createNewMemberRegisterAfterMerge - create a new MemberRegister");
    Roster currentRoster = dispatcher.getMembershipService().getRosterFromGroup();
    MemberRegister newMemberRegister(dispatcher);
    //@TODO set PeerPlacingStrategy from inviter Group given by peerPlacing parameter

    // base: the inviter's roster; appended: the invitee's roster
    const Roster & baseRoster = inviterGroup ? currentRoster : ro;
    const Roster & appendedRoster = inviterGroup ? ro : currentRoster;

    newMemberRegister.initMemberRegisterFromRoster(baseRoster);

    // The loop bound and the roster being read must be the same roster.
    // The former invitee branch iterated over the own roster's size while
    // reading from the opponent roster, which duplicated the inviter peers,
    // dropped the local ones and could read past the end of the roster.
    for (size_t i = 0; i < appendedRoster.size(); i++) {
        MemberDescription memDesc = appendedRoster.getMemberDescription(i);
        newMemberRegister.createPeer(memDesc);
    }
    return newMemberRegister;
}
/**
 * @brief Starts a merge with a second group.
 *
 * The call is refused with a MergeAbortEvent if a flush is in progress, if
 * the given address is the local one, or if any merge is already running.
 * Otherwise the local peer becomes inviter merge director, sends a merge
 * request to the given peer and starts the timer supervising the answer.
 *
 * @param ta The transport address of the peer to lead the merge in second group.
 * @note API method.
 */
void
MergeService::mergeGroup(const TransportAddress & ta) {
    if (getLocalSubState() == FLUSHING) {
        std::string reason("flush in progress - please try later");
        dispatcher.signal(new MergeAbortEvent(reason));
        return;
    }

    // merging with oneself makes no sense
    if (getLocalPeer().getLocalAddress() == ta) {
        std::string reason("merge with my own");
        dispatcher.signal(new MergeAbortEvent(reason));
        return;
    }

    DEBUG("mergeGroup - starts a merge");

    // some merge is already in progress on the local peer
    if (getLocalSubState() != NO_SUB_STATE) {
        std::string reason("try later - merge in progress ");
        dispatcher.signal(new MergeAbortEvent(reason));
        return;
    }

    // take the inviter director role
    currentMerge = new MergeOperation();
    setLocalSubState(WAITING_FOR_FLUSH);
    mergeDirector = true;
    inviterMergeDirector = true;
    otherMergeDirectorTA = ta;

    // invite the other group and supervise its answer
    sendMergeRequest(ta);
    createAndStartMergeRejectToInviteeTimer(ta, *currentMerge);
}
/**
* @brief The App response the requesting merge at inviter peer.
*/
void
MergeService::responseMerge() {
DEBUG("responseMerge - send MergeConfirm to other Group and start flushing");
if (mergeDirector && !inviterMergeDirector) {
//confirm merge request from inviter group
sendMergeConfirm(otherMergeDirectorTA);
//send MFL to group
MergeFlush mfl = MergeMessageFactory::createMergeFlushMessage();
dispatcher.sendMessage(mfl);
//start MergeAbortTimer
createAndStartMergeAbortTimer(mfl);
}//End if
}
/**
 * @brief The application rejects the pending merge.
 *
 * Sends a merge reject with the given reason to the opposite merge
 * director.
 *
 * @note NOTE(review): the local merge state (sub state, director flags,
 *       current merge, timers) is not reset here - confirm whether the
 *       caller is expected to clean up.
 *
 * @param reason The reason of the reject.
 */
void
MergeService::cancelMerge(const std::string & reason) {
    DEBUG("cancleMerge - send MergeAbort");
    sendMergeReject(otherMergeDirectorTA, reason);
}
/**
* @brief Creates and sends a merge request to peer, identified by the given invitation.
* @param inv The invitation for the desired peer.
*/
void
MergeService::sendMergeRequest(const TransportAddress & ta) {
TransportAddress inviterTA = getLocalAddress();
MergeRequest mr = MergeMessageFactory::createMergeRequestMessage(inviterTA);
DEBUG("sendMergeRequestToPeer - send merge to peer at address " << ta);
sendTo(mr, ta);
}
/**
* @brief Creates and sends a merge confirm to the desired peer in other group.
* @param ta The transport address of the desired peer.
*/
void
MergeService::sendMergeConfirm(const TransportAddress & ta) {
TransportAddress my = getLocalAddress();
MergeConfirm mc = MergeMessageFactory::createMergeConfirmMessage(my);
DEBUG("sendMergeConfirm - send merge confirm to peer at address " << ta);
sendTo( mc, ta);
}
/**
* @brief Creates and sends a merge reject to the desired peer in other group.
* @param ta The transport address of the desired peer.
* @param message The reason of rejection.
*/
void
MergeService::sendMergeReject(const TransportAddress & ta, const std::string & message) {
MergeReject mrj = MergeMessageFactory::createMergeRejectMessage(ta, message);
DEBUG("sendMergeReject - send merge reject to peer at address " << ta);
sendTo( mrj, ta);
}
/**
* @brief Creates and sends the current roster to peer, identified by the given invitation.
* @param inv The invitation for the desired peer.
*/
void
MergeService::sendMergeRoster(const TransportAddress & ta) {
Roster ro = dispatcher.getMembershipService().getRosterFromGroup();
TransportAddress myTA = getLocalAddress();
PeerPlacingStrategyType pp = dispatcher.getPeerPlacingStrategyType();
MergeRoster mro = MergeMessageFactory::createMergeRosterMessage(myTA, ro, pp);
DEBUG("sendMergeRoster - send roster to peer at address " << ta);
sendTo( mro, ta);
}
/**
 * @brief Reacts to the end of a flush operation during a merge.
 *
 * Ignored when no merge is in progress. The inviter director sends its
 * roster to the opposite director and starts the timer waiting for the
 * answer; the invitee director starts the timer waiting for the inviter's
 * roster. Every peer then switches to WAITING_FOR_MERGE and the roster
 * message stored in the current merge is processed.
 *
 * @param e The flush done event (unused).
 */
void
MergeService::handleEvent(const FlushDoneEvent & e) {
    if (currentMerge == NULL) {
        return;
    }

    DEBUG("mergeFlushDone - inviter merge director send merge roster to invitee merge director");

    if (mergeDirector && inviterMergeDirector) {
        // inviter: publish our roster and wait for the invitee's MRO
        sendMergeRoster(otherMergeDirectorTA);
        createAndStartMergeAbortToInviterGroupTimer();
    }
    else if (mergeDirector) {
        // invitee: wait for the inviter's MRO
        createAndStartMergeAbortToInviteeGroupTimer();
    }

    setLocalSubState(WAITING_FOR_MERGE);

    // process a roster that may have arrived while we were still flushing
    handleMergeRoster(&currentMerge->getMergeRoster());
}
}
}