/* * File: SplitService.cc * Author: jgaebler * Author: sgaebler * * Created on November 11, 2011, 3:10 PM */ #include "SplitService.h" #include "Moversight.h" #include "Dispatcher.h" #include "event/events/FlushStartEvent.h" #include "event/events/FlushDoneEvent.h" #include "ms/events/LocalPeerUpdatedEvent.h" #include "ms/events/PeerLeftEvent.h" #include "split/SplitOperation.h" #include "split/timer/SplitAbortTimer.h" #include "split/msg/SplitMessageFactory.h" #include "split/msg/SplitAnnounce.h" #include "split/events/SplitDoneEvent.h" #include "split/events/SplitAbortEvent.h" namespace ubeeme { namespace moversight { #undef DEBUG #define DEBUG(msg) if (module.isPrintDebugTC()){ if(getLocalState()== DISJOINED){ MOV_DEBUG <<"SPLITS@TA_"<(this); dispatcher.subscribe(this); currentSplit = NULL; splitAbortTimer = NULL; splitDirector = false; } /** * @brief Finalise the split service. */ void SplitService::finalise() { dispatcher.unsubscribeAll(this); stopAndDeleteSplitAbortTimer(); if (currentSplit != NULL) { delete currentSplit; } setLocalSubState(NO_SUB_STATE); } /** * @brief This method handles a split announce message by split a set of given peers of the group to a new group * @param spa The split announce message to handle. * @param missedPeers Provides the IDs of the peers, which have missed the split announce. */ void SplitService::handleSplitAnnounceMessage(SplitAnnounce & spa, const PeerIDList & missedPeers) { //cancel SplitAnnounceTimer if (splitDirector) { stopAndDeleteSplitAbortTimer(); }//End if SplitOptions options(spa.getOptions()); //check reply flag with missedPeeers if (!doSplit(options, spa.getSplitPeers(), missedPeers)) { DEBUG("handleSplitAnnounceMessage - signal split abort because some members don't get the SPA message as expected"); //cancel split, because missedPeers not confirm to reply-flag if (splitDirector) { std::string reason("some peers did not answered as expected"); dispatcher.signal(new SplitAbortEvent(reason)); }//End if return; }//End if if (getLocalSubState() == NO_SUB_STATE) { //localPeerID and senderID of SPA is tie break of concurrent splits //first SPA is winner if (getLocalID() != spa.getSourceID()) { splitDirector = false; }//End if DEBUG("handleSPlitAnnounceMessage - create split object"); currentSplit = new SplitOperation(options, spa.getSplitPeers()); } else if (getLocalSubState() == FLUSHING) { DEBUG("handleSPlitAnnounceMessage - signal split abort because a split is in progress"); //cancel split, because a split is already in progress. no cascading splits allowed std::string reason("try later - split in progress "); dispatcher.signal(new SplitAbortEvent(reason)); return; } else { DEBUG("handleSPlitAnnounceMessage - signal split abort because a merge is in progress"); //cancel split, because a merge is in progress. std::string reason("try later - merge in progress "); dispatcher.signal(new SplitAbortEvent(reason)); return; }//End else //first flushing before split? if (options.isFlushSynchronisation()) { DEBUG("handleSPlitAnnounceMessage - do split but first flushing"); setLocalSubState(FLUSHING); //notify the upper tier that split flush starts progress dispatcher.signal(new FlushStartEvent()); return; }//End if if (currentSplit != NULL && currentSplit->getOptions().isSemanticValidation()) { DEBUG("handleSPlitAnnounceMessage - the semantic flag is set"); //set split validator dispatcher.setMessageValidator(SEMANTIC_SPLIT_VALIDATOR); }//End if PeerIDList splitPeers = currentSplit->getSplitPeers(); //check if peerIDs in splitPeers exist if (!splitPeersExist(splitPeers)) { DEBUG("handleSPlitAnnounceMessage - PeerID in split peers not a group member"); if (splitDirector) { //signal split abort std::string reason = "could not split - peerID not exist"; dispatcher.signal(new SplitAbortEvent(reason)); }//End if //clean up finalise(); initialise(); return; }//End if //check if set of splitPeers is identical to all Peers if (splitPeersEqualGroup(splitPeers)) { DEBUG("handleSPlitAnnounceMessage - set of split peers equal to whole group set"); //clean up finalise(); initialise(); //signal split done dispatcher.signal(new SplitDoneEvent()); return; }//End if DEBUG("handleSPlitAnnounceMessage - create new MemberRegister and finish split"); //safe old MemberRegister dispatcher.getMembershipService().saveLastMemberRegister(); //set new member register bool noError = true; try { MemberRegister newMR(createNewMemberRegisterAfterSplit(splitPeers)); //find local PeerID in new MemberRegister TransportAddress myTa = getLocalAddress(); PeerID localPeerID = newMR.getPeerIDbyTransportAddress(myTa); //switch old to new MemberRegister and set new local peerID dispatcher.getMembershipService().setupGroupFromMemberRegister(newMR, localPeerID); } catch (PeerNotFoundException pnfe) { noError = false; std::string reason(pnfe.what()); if (splitDirector) { //signal split abort dispatcher.signal(new SplitAbortEvent(reason)); }//End if } //clean up finalise(); initialise(); if (noError) { //signal split done dispatcher.signal(new SplitDoneEvent()); //signal peer update dispatcher.signal(new LocalPeerUpdatedEvent(dispatcher.getLocalPeer())); } } /** * @brief Handles an outdating SplitAbortTimer. * @param timer The timer to handle. */ void SplitService::handleSplitAbortTimer(SplitAbortTimer * timer) { if (timer->getNumberOfRetries() > 0) { DEBUG("handleSplitAbortTimer - send split announce message to group again"); //signal SPA again to the group SplitAnnounce spa = SplitMessageFactory::createSplitAnnounceMessage(timer->getSplit().getOptions(), timer->getSplit().getSplitPeers()); dispatcher.sendMessage(spa); timer->restart(); }//End if else { DEBUG("handleSplitAbortTimer - unable to emit SPA message"); //cancel split, because we are not able to emit the SPA std::string reason("split abort - unable to publish split announce within the group"); dispatcher.signal(new SplitAbortEvent(reason)); //clean up finalise(); initialise(); // @TODO re-synchronisation }//End else } /** * @brief Creates and starts a SplitAbortTimer. * @param spo The current split to save the parameters. */ void SplitService::createAndStartSplitAbortTimer(SplitOperation & spo) { if (splitAbortTimer == NULL) { splitAbortTimer = new SplitAbortTimer(*this); splitAbortTimer->setSplit(spo); splitAbortTimer->start(); }//End if else { DEBUG("createAndSplitAbortTimer - timer already created"); }//End else } /** * @brief Stop running SplitAbortTimer. */ void SplitService::stopAndDeleteSplitAbortTimer() { if (splitAbortTimer != NULL) { splitAbortTimer->stop(); //delete the timer delete splitAbortTimer; splitAbortTimer = NULL; }//End if else { DEBUG("stopAndDeleteSplitAbortTimer - timer already NULL"); }//End else } /** * @brief Starts the split. * @param options Contents the Flags for synchronising the message queues and SPLIT behavior * @param splitPeers Contents the PeerIDs of the splitting peers */ void SplitService::splitGroup(SplitOptions options, PeerIDList & splitPeers) { if (getLocalSubState() == FLUSHING) { std::string reason("flush in progress - please try later"); dispatcher.signal(new SplitAbortEvent(reason)); }//End if DEBUG("splitGroup - starts a split"); //check if this peer is already a split director if (splitDirector) { //cancel split, because a split is already in progress std::string reason("try later - split in progress "); dispatcher.signal(new SplitAbortEvent(reason)); return; }//End if //this peer is the current split director splitDirector = true; //create a SPA peer message SplitAnnounce spa = SplitMessageFactory::createSplitAnnounceMessage(options, splitPeers); // debugging - to see the member register before the split MemberRegister mr = dispatcher.getMembershipService().getCurrentMemberRegister(); mr.printMemberRegister(); dispatcher.sendMessage(spa); //start a split announce timer to ensure the proper completion //of the split process SplitOperation sp(options, splitPeers); createAndStartSplitAbortTimer(sp); }//End /** * @brief This method checks if a incoming split announce should be handled. If not then set error case. * @param options Contents the compare options how peers have to answer. * @param splitPeers Contents the PeerIDs of the splitting peers. * @param missedPeers Provides the IDs of the peers, which have missed the split announce. * @return True if comparing options with missedPeers is identically, false otherwise. */ bool SplitService::doSplit(SplitOptions options, PeerIDList splitPeers, const PeerIDList & missedPeers) { //check if all peers get the SPA message if (missedPeers.size() == 0) { return true; }//End if //check if all peers have to get the SPA message if (options.isReplyAll()) { return false; }//End if //just the staying peers have to get the SPA message PeerIDList allPeers = dispatcher.getMembershipService().getPeerIDList(); PeerIDList stayPeers(allPeers); for (size_t i = 0; i < allPeers.size(); i++) { if (splitPeers.contains(allPeers.get(i))) { stayPeers.remove(allPeers.get(i)); }//End if }//End for for (size_t i = 0; i < missedPeers.size(); i++) { if (stayPeers.contains(missedPeers.get(i))) { return false; }//End if }//End for return true; } /** * @brief Creates and set the new member register. * @param splitPeers Contents the PeerIDs of the splitting peers. * @return The new created member register. */ MemberRegister SplitService::createNewMemberRegisterAfterSplit(PeerIDList splitPeers) { DEBUG("createNewMemberRegisterAfterSplit - create a new MemberRegister"); size_t howManyPeersSplit = currentSplit->getSplitPeers().size(); PeerList allPeers = dispatcher.getMembershipService().getPeerList(); Roster currentRoster = dispatcher.getMembershipService().getRosterFromGroup(); MemberRegister newMemberRegister(dispatcher); newMemberRegister.initMemberRegisterFromRoster(currentRoster); //check if local peer is in set if splitPeers if (isLocalPeerInSplitPeers(splitPeers)) { bool remove = false; for (size_t i = 0; i < allPeers.size(); i++) { remove = true; for (size_t j = 0; j < howManyPeersSplit; j++) { if (allPeers.get(i).getPeerID() == splitPeers.get(j)) { remove = false; break; }//End if }//End for if (remove == true) { newMemberRegister.removePeer(allPeers.get(i).getPeerID()); }//End if }//End for } else { for (size_t i = 0; i < howManyPeersSplit; i++) { if (newMemberRegister.contains(splitPeers.get(i))) { newMemberRegister.removePeer(splitPeers.get(i)); }//End if }//End for }//End else if (module.isPrintDebugTC()) { newMemberRegister.printMemberRegister(); } return newMemberRegister; }//End /** * @brief Handels the peer left event by removing a peer within the set * of splitting peers. * @param e The peer left event. */ void SplitService::handleEvent(const PeerLeftEvent & e) { PeerID peerID = e.getPeer().getPeerID(); if (currentSplit != NULL) { if (currentSplit->getSplitPeers().contains(peerID)) { currentSplit->getSplitPeers().remove(peerID); }//End if }//End if } /** * @brief Signals that a flush operation during split has finished. */ void SplitService::handleEvent(const FlushDoneEvent & e) { if (currentSplit == NULL) return; DEBUG("handle FlushDoneEvent - now create new member register and split group"); PeerIDList splitPeers = currentSplit->getSplitPeers(); //check if peerIDs in splitPeers exist if (!splitPeersExist(splitPeers)) { //signal split abort std::string reason = "could not split - peerID not exist"; dispatcher.signal(new SplitAbortEvent(reason)); //clean up finalise(); initialise(); return; }//End if //check if set of splitPeers is identical to all Peers if (splitPeersEqualGroup(splitPeers)) { //signal split done dispatcher.signal(new SplitDoneEvent()); //clean up finalise(); initialise(); return; }//End if //safe old MemberRegister dispatcher.getMembershipService().saveLastMemberRegister(); //set new member register MemberRegister newMR(createNewMemberRegisterAfterSplit(splitPeers)); TransportAddress myTa = getLocalAddress(); try { //find local PeerID in new MemberRegister PeerID localPeerID = newMR.getPeerIDbyTransportAddress(myTa); //switch old to new MemberRegister and set new local peerID dispatcher.getMembershipService().setupGroupFromMemberRegister(newMR, localPeerID); } catch (PeerNotFoundException pnfe) { //signal split abort std::string reason(pnfe.what()); dispatcher.signal(new SplitAbortEvent(reason)); } //clean up finalise(); initialise(); //signal split done dispatcher.signal(new SplitDoneEvent()); dispatcher.signal(new LocalPeerUpdatedEvent(dispatcher.getLocalPeer())); } /** * @brief Check if the splitting peers equal to the current member register. * @param splitPeers The PeerIDs of the splitting peers. * @return True the set of split peers and member register is identical otherwise false. */ bool SplitService::splitPeersEqualGroup(PeerIDList splitPeers) { if (splitPeers.size() != dispatcher.getMembershipService().getPeerList().size()) { return false; }//End if for (size_t i = 0; i < splitPeers.size(); i++) { if (!dispatcher.getMembershipService().getPeerList().contains(splitPeers.get(i))) { return false; }//End if }//End for return true; } /** * @brief Check if the splitting peers in the current member register exist. * @param splitPeers The PeerIDs of the splitting peers. * @return True if all splitPeers exist otherwise false. */ bool SplitService::splitPeersExist(PeerIDList splitPeers) { for (size_t i = 0; i < splitPeers.size(); i++) { if (!dispatcher.getMembershipService().getPeerList().contains(splitPeers.get(i))) { return false; }//End if }//End for return true; } /** * @brief Check if the peer is within the set of splitting peers. * @param splitPeers The PeerIDs of the splitting peers. * @return True if peer is within splitPeers otherwise false. */ bool SplitService::isLocalPeerInSplitPeers(PeerIDList splitPeers) { return splitPeers.contains(getLocalID()); } /** * @brief Assignment operator. * @param other The instance to assign. * @return A reference of the current object. */ SplitService & SplitService::operator=(const SplitService & other) { if (this == &other) { return *this; }//End if splitAbortTimer = other.splitAbortTimer; currentSplit = other.currentSplit; splitDirector = other.splitDirector; return *this; } } }