// Source: scandocs/uni/masterarbeit/source/moversight/mob/MobilitySupport.cc
// Snapshot: 2014-06-30 13:58:10 +02:00 (1266 lines, 44 KiB, C++)

/*
* File: MobilitySupport.cc
* Author: jgaebler
*
* Created on January 18, 2012, 6:25 PM
*/
#include "MobilitySupport.h"
#include "Dispatcher.h"
#include "Moversight.h"
#include "common/MoversightService.h"
#include "common/container/PeerIDList.h"
#include "common/container/PeerIDToTaMap.h"
#include "common/Exception.h"
#include "event/events/ConnectionLostEvent.h"
#include "event/events/ConnectionReEstablishedEvent.h"
#include "ms/MembershipService.h"
#include "ms/Peer.h"
#include "ms/PeerState.h"
#include "ms/events/PeerLeftEvent.h"
#include "ms/events/RejoinDoneEvent.h"
#include "ms/events/RejoinFailedEvent.h"
#include "ms/msg/MSMessageFactory.h"
#include "mt/msg/MulticastMessage.h"
#include "mob/events/UnableToReconnectToGroupEvent.h"
#include "mob/timer/GroupClosedCausedByConnectionLostTimer.h"
#include "mob/timer/RejoinTimer.h"
#include "mob/timer/ReJoinRosterRequestTimer.h"
#include "mob/timer/AwaitRejoinTimer.h"
#include "mob/timer/RejoinTimer.h"
#include "mob/msg/TempGroupAnnounce.h"
#include "mob/msg/ReducedRoster.h"
#include "mob/msg/RejoinRoster.h"
#include "mob/msg/RejoinRosterAnnounce.h"
#include "mob/msg/RejoinRosterConfirm.h"
#include "mob/msg/RejoinAnnounce.h"
#include "mob/msg/RejoinDone.h"
#include "mob/msg/ReconnectAnnounce.h"
#include "mob/msg/RejoinFailed.h"
#include "fd/FailureDetector.h"
#include "fd/partition/events/PartitionDetectedEvent.h"
#include "fd/partition/events/NeighborReachableAgainEvent.h"
#include "simutils/events/StartMeasuringEvent.h"
#include "simutils/events/StopMeasuringEvent.h"
namespace ubeeme {
namespace moversight {
// Redefines DEBUG for this file. When module.isPrintDebugMOB() is true the message is
// written to MOV_DEBUG, prefixed with "MOB@TA_<local address>" while the local state is
// DISJOINED and with "MOB@<local peer id>" otherwise.
// NOTE: the macro expands to a bare `if` statement and some call sites in this file omit
// the trailing semicolon; keep uses inside braces when they directly precede an `else`.
#undef DEBUG
#define DEBUG(msg) if (module.isPrintDebugMOB()){ if(dispatcher.getLocalState()== DISJOINED){ MOV_DEBUG <<"MOB@TA_"<<getLocalAddress()<<" "<<msg<<endl; } else{ MOV_DEBUG <<"MOB@"<<getLocalID() <<" "<< msg <<endl; } }
/**
 * @brief Constructor.
 *
 * Puts all timer pointers and the TGA counter into a defined state, so that
 * finalise() or a handler that runs before initialise() never reads an
 * indeterminate pointer. initialise() assigns the same values again.
 *
 * @param d The dispatcher module.
 */
MobilitySupport::MobilitySupport(Dispatcher & d) : MoversightService(d, "MobilitySupport"), sync(d) {
    // assigned in the body (not the initialiser list) because the declaration
    // order of these members is not visible from this file
    leaveTimer = NULL;
    reJoinRosterRequestTimer = NULL;
    awaitRejoinTimer = NULL;
    rejoinTimer = NULL;
    numberOfTga = 0;
}
/**
 * @brief Destructor.
 *
 * The body is empty: the timers of this service are stopped and deleted in
 * finalise(), not here.
 */
MobilitySupport::~MobilitySupport() {
}
/**
 * @brief Initialises the mobility support.
 *
 * Subscribes this service to the connection, peer-left and partition events of the
 * dispatcher, resets all timer pointers and the TGA counter and initialises the
 * synchronization service.
 *
 * NOTE(review): a handler for NeighborReachableAgainEvent exists in this file but the
 * event is not subscribed here - confirm it is registered elsewhere.
 */
void
MobilitySupport::initialise() {
    // events this service reacts to
    dispatcher.subscribe<ConnectionLostEvent>(this);
    dispatcher.subscribe<ConnectionReEstablishedEvent>(this);
    dispatcher.subscribe<PeerLeftEvent>(this);
    dispatcher.subscribe<PartitionDetectedEvent>(this);

    // no TGA counted yet
    numberOfTga = 0;

    // timers used for the rejoin after a detected partition
    rejoinTimer = NULL;
    awaitRejoinTimer = NULL;

    // timers used after a lost connection
    reJoinRosterRequestTimer = NULL;
    leaveTimer = NULL;

    sync.initialise();
}
/**
 * @brief Finalises the mobility support.
 *
 * Removes all event subscriptions of this service, stops and deletes the four
 * timers it may hold and finalises the synchronization service.
 */
void
MobilitySupport::finalise() {
dispatcher.unsubscribeAll(this);
// each helper is a no-op when the corresponding timer pointer is NULL
stopAndDeleteGroupClosedCausedByConnectionLostTimer();
stopAndDeleteReJoinRosterRequestTimer();
stopAndDeleteAwaitRejoinTimer();
stopAndDeleteRejoinTimer();
sync.finalise();
}
/**
 * @brief Handles the connection lost event, emitted by the dispatcher.
 *
 * Starts the leave timer (GroupClosedCausedByConnectionLostTimer) unless one is
 * already set.
 *
 * @param e The received event; its content is not evaluated.
 */
void
MobilitySupport::handleEvent(const ConnectionLostEvent & e) {
//start the GroupClosedCausedByConnectionLostTimer to ensure
//that we leave the group if we don't get a new connection
//in time
DEBUG("connectionLost - start leave timer");
createAndStartGroupClosedCausedByConnectionLostTimer();
// getFD().connectionLost();
}
/**
* @brief Handles the connection re-established event, emitted by the dispatcher.
*
* UNUSED
*/
void
MobilitySupport::handleEvent(const ConnectionReEstablishedEvent& e) {
//stop the timer and try to rejoin the group
stopAndDeleteGroupClosedCausedByConnectionLostTimer();
rejoinTimer->start();
PeerIDList pIDL;
pIDL.add(getLocalID());
PeerIDList disjPIDL;
//RA sinnvoll? nicht vllt direkt redRoster - da ja eigentlich nur einer verb wieder hat?!
sendRejoinAnnounce(getLocalID(), disjPIDL);
}
/**
 * @brief Creates and starts a GroupClosedCausedByConnectionLostTimer if no such
 * timer is currently set.
 */
void
MobilitySupport::createAndStartGroupClosedCausedByConnectionLostTimer() {
    // a leave timer already exists - keep it untouched
    if (leaveTimer != NULL) {
        return;
    }
    leaveTimer = new GroupClosedCausedByConnectionLostTimer(*this);
    leaveTimer->start();
}
/**
 * @brief Stops and deletes the current GroupClosedCausedByConnectionLostTimer.
 * Does nothing apart from the debug output if no timer is set.
 */
void
MobilitySupport::stopAndDeleteGroupClosedCausedByConnectionLostTimer() {
    DEBUG("stopTimer - try to stop the timer GroupClosedCausedByConnectionLostTimer");
    if (leaveTimer == NULL) {
        return;
    }
    leaveTimer->stop();
    delete leaveTimer;
    leaveTimer = NULL;
}
/**
 * @brief Handles the timeout of a GroupClosedCausedByConnectionLostTimer.
 *
 * If the local peer is still DISCONNECTED, an UnableToReconnectToGroupEvent is
 * signalled and the group is closed; otherwise nothing happens.
 *
 * @param timer The expired timer (not evaluated).
 */
void
MobilitySupport::handleGroupClosedCausedByConnectionLost(GroupClosedCausedByConnectionLostTimer* timer) {
    // connection came back in the meantime - nothing to do
    if (!(getLocalState() == DISCONNECTED)) {
        return;
    }
    dispatcher.signal(new UnableToReconnectToGroupEvent());
    closeGroup();
}
/**
 * @brief Creates and starts a ReJoinRosterRequestTimer if none is set.
 * @param reconnectList The list of peers to request for the roster; it is handed
 * to the new timer as its request list.
 */
void
MobilitySupport::createAndStartReJoinRosterRequestTimer(PeerIDList & reconnectList) {
    // an existing timer keeps its own request list
    if (reJoinRosterRequestTimer != NULL) {
        return;
    }
    reJoinRosterRequestTimer = new ReJoinRosterRequestTimer(*this);
    reJoinRosterRequestTimer->setRequestList(reconnectList);
    reJoinRosterRequestTimer->start();
}
/**
 * @brief Stops and deletes the current ReJoinRosterRequestTimer.
 * Does nothing apart from the debug output if no timer is set.
 */
void
MobilitySupport::stopAndDeleteReJoinRosterRequestTimer() {
    DEBUG("stopTimer - try to stop the timer ReJoinRosterRequestTimer");
    if (reJoinRosterRequestTimer == NULL) {
        return;
    }
    reJoinRosterRequestTimer->stop();
    delete reJoinRosterRequestTimer;
    reJoinRosterRequestTimer = NULL;
}
/**
 * @brief Handles the timeout of a ReJoinRosterRequestTimer.
 *
 * If the request list of the timer still contains peers, the next peer is asked for
 * the roster and the shortened list is stored in the timer again. If the list is
 * empty, rejoining has failed: an UnableToReconnectToGroupEvent is signalled and
 * the group is closed.
 *
 * @param timer The timer to handle.
 */
void
MobilitySupport::handleReJoinRosterRequestTimer(ReJoinRosterRequestTimer * timer) {
    DEBUG("handleReJoinRosterRequestTimer - handle ReJoinRosterRequest timer");
    PeerIDList requestList = timer->getRequestList();
    if (requestList.size() > 0) {
        // reconnectToGroup() removes the peer it has contacted from requestList
        reconnectToGroup(requestList);
        // requestList is a local copy: write it back, otherwise every timeout would
        // contact the same first peer again and the list would never run empty
        timer->setRequestList(requestList);
    }//End if
    else {
        dispatcher.signal(new UnableToReconnectToGroupEvent());
        closeGroup();
        //FROM HERE ON WE HAVE QUIT THE GROUP !!!!
    }//End else
}
/**
 * @brief Instructs the membership service to close the group.
 */
void
MobilitySupport::closeGroup() {
DEBUG("closeGroup - close group");
//we couldn't re-establish a connection - we give up
getMS().closeGroup();
}
/**
 * @brief Assignment operator.
 *
 * Copies the dispatcher, the module and the leave timer pointer of the other
 * instance. The remaining timers, the TGA counter and the synchronization service
 * are not assigned.
 *
 * NOTE(review): leaveTimer is copied as a raw pointer, so afterwards both instances
 * refer to the same timer object, and stopAndDeleteGroupClosedCausedByConnectionLostTimer()
 * deletes through that pointer - confirm whether this operator is used at all and
 * whether the timer should be shared.
 *
 * @param other The instance to assign.
 * @return A reference to the current object.
 */
MobilitySupport &
MobilitySupport::operator=(const MobilitySupport& other) {
// self assignment: nothing to copy
if (this == &other) {
return *this;
}
this->dispatcher = other.dispatcher;
this->module = other.module;
// shallow copy of the timer pointer (see the review note above)
this->leaveTimer = other.leaveTimer;
return *this;
}
/**
 * @brief Starts the rejoin of the local peer to the group.
 *
 * In its current form the method only triggers synchroniseWithGroup(); the
 * reconnect part (building a roster request list and calling reconnectToGroup())
 * is commented out.
 *
 * Marked as UNUSED by the original author.
 */
void
MobilitySupport::rejoinToGroup() {
DEBUG("rejoinToGroup - start rejoin to group");
// disabled reconnect step:
//PeerIDList rosterReqList = createRosterRequestList();
//reconnectToGroup(rosterReqList);
synchroniseWithGroup();
}
/**
 * @brief Handles the signal from the dispatcher that a partition was detected.
 *
 * Creates the temporary group without the peers listed in the event.
 *
 * @param e The event; its peer ID list is passed to createTempGroup() as the list of
 * unreachable peers.
 */
void
MobilitySupport::handleEvent(const PartitionDetectedEvent& e) {
createTempGroup(e.getPeerIDList());
}
/**
 * @brief Handles the signal that a peer outside of our part of the group is
 * reachable again.
 *
 * Sends a RejoinAnnounce built from the receiver ID and the peer ID list of the event.
 *
 * NOTE(review): this event is not subscribed in initialise() of this file - confirm
 * where the handler gets registered.
 *
 * @param e The event; carries the ID of the peer that is reachable again and the
 * list of peers passed on as disjoined peers.
 */
void
MobilitySupport::handleEvent(const NeighborReachableAgainEvent& e) {
sendRejoinAnnounce(e.getReceiverID(), e.getPeerIDList());
}
/**
* @brief Method to create the temporary groups, which are needed because we cannot
* reach some of our group members.
*/
void
MobilitySupport::createTempGroup(const PeerIDList & unreachable) {
DEBUG("createTempGroup");
PeerIDList unreachableList = unreachable;
getMS().saveLastMemberRegister();
createAndStartRejoinTimer();
if (rejoinTimer->getDisconnectedPeers().size() != 0) {
rejoinTimer->saveCurrentMRToOriginalMR();
}
else {
//PeerIDList unreachableList = rejoinTimer->getDisconnectedPeers();
//unreachableList.add(unreachable); //orginal, wohl bug ??? TODO
unreachableList.add(rejoinTimer->getDisconnectedPeers());
}//end else
rejoinTimer->setDisconnectedPeers(unreachableList);
removePendingPeers(unreachableList);
dispatcher.signal(new StartMeasuringEvent("TG"));
if (isLocalPeerMaster()) {
sendTempGroupAnnounce();
}
}
/**
* @brief Checking whether the local group is primary or not.
* @return true if primary, false else
*/
bool
MobilitySupport::isGroupPrimary() {
size_t oldSize = rejoinTimer->getOriginalMR().getNumberOfPeers();
size_t newSize = getMS().getNumberOfPeers();
PeerID minPeerID = getMS().getCurrentMemberRegister().getPeerIDList().getMinElem();
PeerID minUnreachableID = rejoinTimer->getDisconnectedPeers().getMinElem();
// the reduced peerIdList of all reachable peers
// needed to make sure that there are not 2 partitions thinking they're primary
// in case of an odd number of peers in the old MR
bool even = false;
if (oldSize % 2 == 0) {
even = true;
}
// default: not in primary group
bool primary = false;
// if we're in the bigger part or if the groups are equally sized but we
// have the smallest PeerID - set primary true
if ((oldSize / 2 < newSize) ||
((oldSize / 2 == newSize) &&
(minPeerID < minUnreachableID) && even)) {
primary = true;
}
return primary;
}
/**
 * @brief Removes all non-reachable peers from the current member register.
 * @param unreachable The peers to remove.
 */
void
MobilitySupport::removePendingPeers(PeerIDList & unreachable) {
    getMS().getCurrentMemberRegister().removePeers(unreachable);
}
/**
* @brief Send a TGA to all masters so that those can check if the tempGroup is correct
*/
void
MobilitySupport::sendTempGroupAnnounce() {
TempGroupAnnounce tga;
tga.setPeersInCluster(getMS().getCurrentMemberRegister().getClusterPeerIDList(getMS().getLocalClusterID()));
PeerIDList master = getMS().getMasterPeerIDList();
sendTo(tga, master);
}
/**
 * @brief Handles a received TempGroupAnnounce (TGA).
 *
 * Every call increments numberOfTga. A master that receives the TGA of another peer
 * removes the peers missing in the announce from its member register (see
 * updateMRAfterTGA()). Once a master has counted as many TGAs as there are clusters,
 * it sends the complete current peer list to its own cluster. Non-masters apply the
 * TGA via handleTGAAsSlave(). Finally, the peers of a non-primary group are set to
 * WAITING_FOR_REJOIN and the lowest master starts the ND service for the
 * disconnected peers.
 *
 * NOTE(review): within this file numberOfTga is only reset in initialise() - confirm
 * that the counter behaves as intended when a further partition occurs.
 *
 * @param tga The TempGroupAnnounce; a master overwrites its peer list and source ID
 * before forwarding it to the cluster.
 */
void
MobilitySupport::handleTempGroupAnnounce(TempGroupAnnounce & tga) {
numberOfTga++;
// master handling the announce of another peer
if (isLocalPeerMaster() && getLocalID() != tga.getSourceID()) {
// no last member register stored yet: save it and start the "TG" measurement
if (getMS().getLastMemberRegister().getNumberOfPeers() == 0) {
getMS().saveLastMemberRegister();
dispatcher.signal(new StartMeasuringEvent("TG"));
}
createAndStartRejoinTimer();
updateMRAfterTGA(getMS().getClusterPeerIDList(tga.getSourceID()), tga.getPeersInCluster());
// own cluster still at maximum size and this is the first TGA seen: announce our cluster as well
if (getMS().getClusterPeerIDList(getLocalID()).size() == module.getMaxPeerCount() && numberOfTga == 1) {
sendTempGroupAnnounce();
}
}
// master that has counted one TGA per cluster: distribute the resulting peer list to the own cluster
if (isLocalPeerMaster() && numberOfTga == getMS().getNumberOfClusters()) {
tga.setPeersInCluster(getMS().getCurrentMemberRegister().getPeerIDList());
tga.setSourceID(getLocalID());
sendToCluster(tga);
}
if (!isLocalPeerMaster()) {
handleTGAAsSlave(tga);
}
if (!isLocalPeerMaster() || (isLocalPeerMaster() && numberOfTga == getMS().getNumberOfClusters())) {
// only if I'm in the secondary group: set state to "WAITING_FOR_REJOIN" and send NDM to non-reachable peers
if (!isGroupPrimary()) {
SubState s = WAITING_FOR_REJOIN;
getMS().getCurrentMemberRegister().setSubStateOfAllPeers(s);
if (isLowestMasterID()) {
getFD().startND(rejoinTimer->getDisconnectedPeers());
}
}
// the list was needed for isGroupPrimary() and startND() above, so it is cleared only now
rejoinTimer->getDisconnectedPeers().clear();
DEBUG("handleTempGroupAnnounce");
getMS().getCurrentMemberRegister().printMemberRegister();
dispatcher.signal(new StopMeasuringEvent("TG"));
}
}
/**
 * @brief Handles a TGA as a slave.
 *
 * On the first TGA (no rejoin timer yet) the timer is created, the current member
 * register is stored both as original MR in the timer and as last member register,
 * and the "TG" measurement is started. Afterwards every peer that is not listed in
 * the TGA is removed from the member register.
 *
 * @param tga The received TempGroupAnnounce.
 */
void
MobilitySupport::handleTGAAsSlave(TempGroupAnnounce & tga) {
    const bool firstAnnounce = (rejoinTimer == NULL);
    if (firstAnnounce) {
        createAndStartRejoinTimer();
        rejoinTimer->saveCurrentMRToOriginalMR();
        getMS().saveLastMemberRegister();
        dispatcher.signal(new StartMeasuringEvent("TG"));
    }

    updateMRAfterTGA(getMS().getPeerIDList(), tga.getPeersInCluster());
}
void
MobilitySupport::updateMRAfterTGA(const PeerIDList & peersInMR, const PeerIDList & peersInTGA) {
for (size_t i = 0; i < peersInMR.size(); i++) {
PeerID peer = peersInMR.get(i);
if (!peersInTGA.contains(peer)) {
getMS().removePeer(peer);
rejoinTimer->addDisconnectedPeer(peer);
}
}
}
/**
 * @brief Creates the list of peer IDs used to try to connect to the group.
 *
 * The candidates are the list returned by getClusterAndMasterPeerIDList() for the
 * local peer, unified with the peer ID list of the whole group. Only candidates
 * whose peer state is JOINED are put into the result; the candidate order is kept.
 *
 * The previous implementation iterated with "i = size - 1; i > 0; i++", which ran
 * past the end of the list, underflowed for an empty list and never examined index 0.
 * The result is now built by a forward filter, which also avoids removing elements
 * from the list that is being iterated.
 *
 * Marked as UNUSED by the original author.
 *
 * @return The IDs of all JOINED candidates.
 */
PeerIDList
MobilitySupport::createRosterRequestList() {
    DEBUG("createRosterRequestList");
    MembershipService & ms = getMS();

    //local master, local slaves and other masters ...
    PeerIDList candidates = ms.getClusterAndMasterPeerIDList(getLocalPeer());
    //... plus the other slaves, by unifying with the group peer id list
    PeerIDList memberList = ms.getPeerIDList();
    candidates.unify(memberList);

    //keep only the peers that are marked as JOINED
    PeerIDList rosterRequestList;
    for (size_t i = 0; i < candidates.size(); i++) {
        PeerID candidate = candidates.get(i);
        if (ms.getPeer(candidate).getPeerState() == JOINED) {
            rosterRequestList.add(candidate);
        }//End if
    }//End for
    return rosterRequestList;
}
/**
 * @brief Tries to reconnect to the group.
 *
 * Requests the roster from the first peer of the list, removes that peer from the
 * list and (re)starts the ReJoinRosterRequestTimer. With an empty list the reconnect
 * has failed: an UnableToReconnectToGroupEvent is signalled and the group is closed.
 *
 * @param rosterReqList The peers that can still be asked; the contacted peer is
 * removed from it.
 */
void
MobilitySupport::reconnectToGroup(PeerIDList & rosterReqList) {
    //error case: nobody left to ask
    if (!(rosterReqList.size() > 0)) {
        DEBUG("reconnectToGroup - closing group, because we are failed to reconnect to group");
        dispatcher.signal(new UnableToReconnectToGroupEvent());
        closeGroup();
        //FROM HERE ON WE HAVE QUIT THE GROUP !!!!
        return;
    }

    DEBUG("reconnectToGroup - request roster from peer " << rosterReqList.first());

    //ask the next peer for the roster and drop it from the list
    PeerID nextPeer = rosterReqList.first();
    sendReJoinRosterRequest(nextPeer);
    rosterReqList.pop();

    //monitor the request period: restart a running timer, create one on the first try
    if (reJoinRosterRequestTimer != NULL) {
        reJoinRosterRequestTimer->restart();
    }
    else {
        createAndStartReJoinRosterRequestTimer(rosterReqList);
    }
}
/**
 * @brief Intended to send a ReJoinRosterRequest to the given peer.
 *
 * The implementation is currently disabled: the body only contains commented-out
 * code, so calling this method has no effect.
 *
 * @param requestPeerID The id of the peer to request (unnamed, as it is unused).
 */
void
MobilitySupport::sendReJoinRosterRequest(PeerID /* requestPeerID */) {
/*
ReJoinRosterRequest msg;
msg.setRequestPeerID(requestPeerID);
DEBUG("sendReJoinRosterRequest - send ReJoinRosterRequest to peer " << requestPeerID);
sendToPeer(msg, requestPeerID);
*/
}
/**
* @brief Starts the synchronize process. The method tries to reSynchronize the
* local peer with the group to re-establish the virtual synchrony after a
* peer have disconnected from the group.
*/
void
MobilitySupport::synchroniseWithGroup() {
DEBUG("synchroniseWithGroup - start synchronization service")
if (sync.synchronize()) {
DEBUG("synchroniseWithGroup - successfully done");
}//End if
else {
DEBUG("synchroniseWithGroup - failed");
}//End else
}
/**
 * @brief Stores a message for synchronization by forwarding it to the
 * synchronization service. The messages are meant to be used afterwards for
 * re-establishing the virtual synchrony of a failed peer.
 * @param m The message to store.
 */
void
MobilitySupport::storeMessage(MulticastMessage& m) {
sync.storeMessage(m);
}
/**
 * @brief Sends the reduced roster to the other group to tell who is still active and
 * who is not.
 *
 * The roster carries the local transport address, the <peer ID, transport address>
 * pairs of all reachable peers in state JOINED and the list of disjoined peers.
 * Before sending, the await-rejoin timer is (created and) started as "started
 * rejoin", and the arguments are stored in the rejoin timer.
 *
 * @param receiverID The peer in the other group that we could reach.
 * @param reachablePeers The peers that can be reached; sorted locally.
 * @param disjoinedPeers All the peers who left during the partition.
 */
void
MobilitySupport::sendReducedRoster(PeerID receiverID, PeerIDList reachablePeers, PeerIDList disjoinedPeers) {
    ReducedRoster rRoster;
    rRoster.setSourceTA(getLocalAddress());

    // collect the transport addresses of the reachable peers that are JOINED
    PeerIDToTaMap joinedPeers;
    reachablePeers.sort();
    for (size_t idx = 0; idx < reachablePeers.size(); idx++) {
        PeerID candidate = reachablePeers.get(idx);
        if (getMS().getPeer(candidate).getState() != JOINED) {
            continue;
        }
        joinedPeers.add(candidate, getMS().getCurrentMemberRegister().getTransportAddressByPeerID(candidate));
    }

    rRoster.setDisjoinedPeerIdList(disjoinedPeers);
    rRoster.setReachablePeersQueue(joinedPeers);

    std::stringstream logLine;
    logLine << "sendReducedRoster - send ReducedRoster message to peer " << receiverID;
    DEBUG(logLine.str().c_str());

    // this side started the rejoin: remember that for the timeout handling
    if (awaitRejoinTimer == NULL) {
        awaitRejoinTimer = new AwaitRejoinTimer(*this);
    }
    awaitRejoinTimer->setStartedRejoin(true);
    awaitRejoinTimer->start();

    // keep the arguments, they are read again when the await timer expires
    rejoinTimer->setReachablePeerID(receiverID);
    rejoinTimer->setSecondaryPeerIDList(reachablePeers);
    rejoinTimer->setDisconnectedPeers(disjoinedPeers);

    sendTo(rRoster, receiverID);
}
/**
* @brief Sending the RejoinAnnounce to the rest of the group to communicate
* that a other group could be reached again. Needed to make sure that only one
* rejoin can happen at a time. Principle: first one wins.
* @param receiverID - the peer we reached in the other group
* @param disjoinedPeers - all the peers who disjoined during the partition
*/
void
MobilitySupport::sendRejoinAnnounce(const PeerID& receiverID, const PeerIDList & disjoinedPeers) {
RejoinAnnounce ra;
ra.setReReachablePeer(receiverID);
ra.setReachablePeers(getMS().getPeerIDList());
ra.setDisjoinedPeers(disjoinedPeers);
ra.setViewID(getViewID());
dispatcher.signal(new StartMeasuringEvent("REJOIN"));
dispatcher.sendMessage(ra);
}
/**
 * @brief Handles a received RejoinAnnounce.
 *
 * Stops the ND service, sets the local sub state to REJOIN_IN_PROGRESS and the state
 * operation to LOCKED. The sender of the announce then sends the reduced roster to
 * the re-reachable peer of the other group, treating the peers that missed the
 * announce as disjoined. Every other peer only starts the "REJOIN" measurement.
 *
 * @param ra The rejoin announce.
 * @param missedPeers Peers who did not get the message.
 */
void
MobilitySupport::handleRejoinAnnounce(RejoinAnnounce & ra, const PeerIDList & missedPeers) {
    DEBUG("handleRejoinAnnounce");
    getFD().stopND();
    setLocalSubState(REJOIN_IN_PROGRESS);
    setLocalStateOperation(LOCKED);

    // not the sender: only the measurement has to be started
    if (ra.getSourceID() != getLocalID()) {
        dispatcher.signal(new StartMeasuringEvent("REJOIN"));
        return;
    }

    // the announce was delivered - time to send the reduced roster to the other group
    PeerIDList disjoinedPeers = ra.getDisjoinedPeers();
    PeerIDList reachablePeers = ra.getReachablePeers();
    for (size_t idx = 0; idx < missedPeers.size(); idx++) {
        // a peer that missed the announce counts as disjoined, not as reachable
        disjoinedPeers.add(missedPeers.get(idx));
        reachablePeers.remove(missedPeers.get(idx));
    }
    sendReducedRoster(ra.getReReachablePeerID(), reachablePeers, disjoinedPeers);
}
/**
* @brief Handling the ReducedRoster by sending out the ReconnectAnnounce.
* @param rro - the received reduced roster.
*/
void
MobilitySupport::handleReducedRoster(const ReducedRoster * rro) {
DEBUG("handleReducedRoster");
// jetzt erstmal memberregister mit neuen Infos aus SG updaten
if (getLocalSubState() != REJOIN_IN_PROGRESS) {
PeerIDList disjoinedPeerIDs = rro->getDisjoinedPeerIdList();
PeerIDToTaMap rroQueue = rro-> getReachablePeersQueue();
TransportAddress source = rro->getSourceTA();
PeerID reachableSecondary = getMS().getLastMemberRegister().getPeerIDbyTransportAddress(source);
ReconnectAnnounce rca;
rca.setReconnectedPeers(rroQueue);
rca.setDisconnectedPeers(disjoinedPeerIDs);
rca.setViewID(getViewID());
rca.setReachablePeerID(reachableSecondary);
dispatcher.sendMessage(rca);
}
}
/**
 * @brief Handles a received ReconnectAnnounce (RCA).
 *
 * Stops the ND service, locks the local peer (LOCKED / REJOIN_IN_PROGRESS), saves the
 * last member register and updates the member register: the reconnected peers are
 * added, the disconnected peers and the peers that missed the message are removed.
 * The sender of the RCA additionally sends the rejoin roster to the secondary group.
 *
 * @param rca The reconnect announce message to handle.
 * @param missedPeers Peers who did not get the message.
 */
void
MobilitySupport::handleReconnectAnnounce(ReconnectAnnounce & rca, const PeerIDList & missedPeers) {
    DEBUG("handleReconnectAnnounce");
    getFD().stopND();

    // Lock first! No other operation is allowed to take place during a rejoin
    setLocalStateOperation(LOCKED);
    setLocalSubState(REJOIN_IN_PROGRESS);

    MembershipService & membership = getMS();
    membership.saveLastMemberRegister();

    // every peer has to update its member register, even the sender of the announce
    PeerIDToTaMap reconnectedPeers = rca.getReconnectedPeers();
    PeerIDList reconnectedPeerIDList = updateMRDueToReconnectedPeers(reconnectedPeers);

    // remove the peers which became disconnected during the partition
    PeerIDList disconnected = rca.getDisconnectedPeers();
    disconnected.add(missedPeers);
    updateMRDueToDisconnectedPeers(disconnected);

    rejoinTimer->setResetMR(true);

    SubState rejoining = REJOIN_IN_PROGRESS;
    membership.getCurrentMemberRegister().setSubStateOfAllPeers(rejoining);

    // only the sender of the announce answers the secondary group
    if (!(rca.getSourceID() == getLocalID())) {
        return;
    }
    PeerID reachableSecondary = rca.getReachablePeerID();
    disconnected.add(rejoinTimer->getDisconnectedPeers());
    sendRejoinRoster(reachableSecondary, reconnectedPeerIDList, disconnected);
}
/**
 * @brief Updates the member register due to peers that reconnected during a rejoin.
 *
 * For every entry of the map a peer in state JOINED is placed via the membership
 * service. Peers known to the original MR of the rejoin timer are taken from there
 * (with an updated transport address if it changed); unknown peers are created from
 * ID and address. The working copy of the original MR is written back to the timer.
 *
 * @param reconnectedPeers The <id, transport address> map of the peers reachable again.
 * @return The peer IDs of the reconnected peers.
 */
PeerIDList
MobilitySupport::updateMRDueToReconnectedPeers(PeerIDToTaMap & reconnectedPeers) {
    MembershipService & membership = getMS();
    MemberRegister originalMR = rejoinTimer->getOriginalMR();
    PeerIDList reconnectedPeerIDList;

    for (size_t idx = 0; idx < reconnectedPeers.size(); idx++) {
        PeerID pID = reconnectedPeers.getKey(idx);
        TransportAddress address = reconnectedPeers.get(idx);
        reconnectedPeerIDList.add(pID);

        Peer peer;
        if (!originalMR.contains(pID)) {
            // unknown peer: build it from scratch
            peer.setPeerID(pID);
            peer.setLocalAddress(address);
        }
        else {
            peer = originalMR.getPeer(pID);
            // take over the transport address if it has changed
            if (peer.getLocalAddress() != address) {
                peer.setLocalAddress(address);
            }
        }
        peer.setState(JOINED); // TODO(review): does the failure detector have to be notified?

        // tries to add the peer at its old position
        membership.placeRejoiningPeer(peer, originalMR);
    }

    rejoinTimer->setOriginalMR(originalMR);
    return reconnectedPeerIDList;
}
/**
 * @brief Updates the member register due to peers that left the group during the
 * partition time.
 *
 * Every listed peer is removed from the membership service and from the original MR
 * held by the rejoin timer.
 *
 * The membership service is accessed by reference. The previous by-value copy
 * ("MembershipService ms = getMS();") made removePeer() operate on a temporary copy
 * of the service instead of the one owned by the dispatcher.
 *
 * @param disconnected The list of disconnected peers.
 */
void
MobilitySupport::updateMRDueToDisconnectedPeers(PeerIDList & disconnected) {
    MembershipService & ms = getMS();
    for (size_t i = 0; i < disconnected.size(); i++) {
        PeerID pID = disconnected.get(i);
        ms.removePeer(pID);
        rejoinTimer->getOriginalMR().removePeer(pID);
    }
}
/**
 * @brief Sends a RejoinRoster to the reachable peer of the secondary group.
 *
 * The roster is created from the member register of the membership service and
 * extended by the list of disconnected peers. After sending, the arguments are
 * stored in the rejoin timer and the await-rejoin timer is (created and) started as
 * "did not start the rejoin".
 *
 * @param reachableSecondary The peer who sent the reduced roster before.
 * @param reconnectedPeerIDList The peers rejoining the group.
 * @param disconnected The peers who left the secondary group during the partition.
 */
void
MobilitySupport::sendRejoinRoster(PeerID reachableSecondary, PeerIDList reconnectedPeerIDList, PeerIDList disconnected) {
    DEBUG("sendRejoinRoster - send rejoinRoster to secondary group");

    //TODO fix ugly copy stuff
    Roster roster(getMS().mr.createRoster());

    RejoinRoster rejoinRo;
    rejoinRo.setSourceID(getLocalID());
    rejoinRo.setViewID(roster.getViewID());
    rejoinRo.setNextPeerID(roster.getNextPeerID());
    rejoinRo.setMemberDescriptionList(roster.getMemberDescriptionList());
    rejoinRo.setDisconnectedPeers(disconnected);
    sendTo(rejoinRo, reachableSecondary);

    // this side answers a rejoin that was started by the other group
    if (awaitRejoinTimer == NULL) {
        awaitRejoinTimer = new AwaitRejoinTimer(*this);
    }
    awaitRejoinTimer->setStartedRejoin(false);

    // keep the arguments, they are read again when the await timer expires
    rejoinTimer->setReachablePeerID(reachableSecondary);
    rejoinTimer->setSecondaryPeerIDList(reconnectedPeerIDList);
    rejoinTimer->setDisconnectedPeers(disconnected);

    awaitRejoinTimer->start();
}
/**
 * @brief Handles a received rejoin roster.
 *
 * Stops and deletes the await-rejoin timer and distributes the content of the
 * roster to the own group as RejoinRosterAnnounce. The member register itself is
 * not updated here but in handleRejoinRosterAnnounce().
 *
 * @param ro The received rejoinRoster.
 */
void
MobilitySupport::handleRejoinRoster(RejoinRoster & ro) {
DEBUG("handleRejoinRoster");
stopAndDeleteAwaitRejoinTimer();
sendRejoinRosterAnnounce(ro);
}
/**
 * @brief Sends a RejoinRosterAnnounce to the own group via the dispatcher.
 *
 * The announce is sent with the local peer as source and takes over the next peer
 * ID, the view ID (as new view ID), the member description list and the
 * disconnected peers of the given roster.
 *
 * @param ro The rejoinRoster containing the needed information.
 */
void
MobilitySupport::sendRejoinRosterAnnounce(RejoinRoster& ro) {
DEBUG("sendRejoinRosterAnnounce");
RejoinRosterAnnounce roa;
roa.setSourceID(getLocalID());
roa.setNextPeerID(ro.getNextPeerID());
roa.setNewViewID(ro.getViewID());
roa.setMemberDescriptionList(ro.getMemberDescriptionList());
roa.setDisconnectedPeers(ro.getDisconnectedPeers());
dispatcher.sendMessage(roa);
}
/**
 * @brief Handles an incoming RejoinRosterAnnounce (ROA) by updating the local MR.
 *
 * If any peer missed the announce, the sender reports the rejoin as failed and
 * nothing else happens. Otherwise a roster is rebuilt from the announce, the group
 * is set up from it, the disconnected peers are removed from the original MR of the
 * rejoin timer and the sender confirms the roster to the reachable peer.
 *
 * @param roa The RejoinRosterAnnounce containing the information to update the MR.
 * @param missedPeers Peers who did not get the message.
 */
void
MobilitySupport::handleRejoinRosterAnnounce(RejoinRosterAnnounce& roa, const PeerIDList& missedPeers) {
    DEBUG("handleRejoinRosterAnnounces");

    // not everybody received the announce: the rejoin cannot be completed
    if (missedPeers.size() != 0) {
        if (roa.getSourceID() == getLocalID()) {
            sendRejoinFailedMessage();
        }
        return;
    }

    // rebuild the roster transported by the announce
    Roster roster;
    roster.setNextPeerID(roa.getNextPeerID());
    roster.setViewID(roa.getNewViewID());
    MemberDescriptionList descriptions = roa.getMemberDescriptionList();
    for (size_t idx = 0; idx < descriptions.size(); idx++) {
        MemberDescription description = descriptions.get(idx);
        roster.addMemberDescription(description);
    }

    getMS().saveLastMemberRegister();
    rejoinTimer->setResetMR(true);
    getMS().setupGroupFromRoster(roster, getLocalID());

    // remove the disconnected peers,
    // so that afterwards no one thinks they are still missing
    rejoinTimer->getOriginalMR().removePeers(roa.getDisconnectedPeers());

    if (roa.getSourceID() == getLocalID()) {
        sendRejoinRosterConfirm(rejoinTimer->getReachablePeerID());
    }
}
/**
 * @brief Sends a RejoinRosterConfirm to acknowledge the reception of the rejoinRoster.
 * The local peer is set as source of the message.
 * @param destination The peer who sent the rejoinRoster.
 */
void
MobilitySupport::sendRejoinRosterConfirm(PeerID destination) {
RejoinRosterConfirm rrc;
rrc.setSourceID(getLocalID());
sendTo(rrc, destination);
}
/**
 * @brief Handles a received RejoinRosterConfirm.
 *
 * Sends the RejoinDone message and stops and deletes the await-rejoin timer. The
 * content of the confirm is not evaluated.
 *
 * @param rrc The received RejoinRosterConfirm.
 */
void
MobilitySupport::handleRejoinRosterConfirm(RejoinRosterConfirm * rrc) {
sendRejoinDone();
stopAndDeleteAwaitRejoinTimer();
}
/**
 * @brief Handles the timeout of the AwaitRejoinTimer.
 *
 * While retries are left, the message this side was waiting on is sent again: the
 * reduced roster if this side started the rejoin, the rejoin roster otherwise. The
 * arguments are taken from the rejoin timer. Without retries left, the timer is
 * deleted, the member register is reset to the last one if required and the rejoin
 * is reported as failed.
 *
 * @param timer The expired timer; the member awaitRejoinTimer is used instead.
 */
void
MobilitySupport::handleAwaitRejoinTimer(AwaitRejoinTimer * timer) {
    DEBUG("handleAwaitRejoinTimer");

    // out of retries: give up
    if (!(awaitRejoinTimer->getNumberOfRetries() > 0)) {
        stopAndDeleteAwaitRejoinTimer();
        if (rejoinTimer->needToResetMR()) {
            getMS().getCurrentMemberRegister() = getMS().getLastMemberRegister();
        }
        sendRejoinFailedMessage();
        return;
    }

    // retry with the data remembered in the rejoin timer
    if (awaitRejoinTimer->startedRejoin()) {
        sendReducedRoster(rejoinTimer->getReachablePeerID(), rejoinTimer->getSecondaryPeerIDList(), rejoinTimer->getDisconnectedPeers());
    }
    else {
        sendRejoinRoster(rejoinTimer->getReachablePeerID(), rejoinTimer->getSecondaryPeerIDList(), rejoinTimer->getDisconnectedPeers());
    }
}
/**
 * @brief Stops (if running) and deletes the current AwaitRejoinTimer.
 * Does nothing apart from the debug output if no timer is set.
 */
void
MobilitySupport::stopAndDeleteAwaitRejoinTimer() {
    DEBUG("stopTimer - try to stop the timer AwaitRejoinConfirmTimer");
    if (awaitRejoinTimer == NULL) {
        return;
    }
    // only a running timer is stopped
    if (awaitRejoinTimer->isRunning()) {
        awaitRejoinTimer->stop();
    }
    delete awaitRejoinTimer;
    awaitRejoinTimer = NULL;
}
/**
 * @brief Handles the timeout of the rejoin timer.
 *
 * Stops the timer and the ND service, resets the current member register to the
 * last one if the rejoin timer requires it and lets the master with the lowest ID
 * report the rejoin as failed.
 *
 * @param timer The expired timer.
 */
void
MobilitySupport::handleRejoinTimer(RejoinTimer * timer) {
    timer->stop();
    getFD().stopND();

    if (rejoinTimer->needToResetMR()) {
        getMS().getCurrentMemberRegister() = getMS().getLastMemberRegister();
    }

    // exactly one peer announces the failure: the master with the lowest ID
    if (isLocalPeerMaster()) {
        if (isLowestMasterID()) {
            sendRejoinFailedMessage();
        }
    }
}
/**
* @brief Handles a peerLeft event during partition
*
* Updates the list of connected and disconnected secondary peers needed
* to be correct for rejoining the primary group.
*
* @param peer The left peer.
*/
void
MobilitySupport::handleEvent(const PeerLeftEvent& e) {
if (rejoinTimer != NULL && getLocalSubState() == REJOIN_IN_PROGRESS) {
if (getLocalSubState() == WAITING_FOR_REJOIN) {
rejoinTimer->removeSecondaryPeer(e.getPeer().getPeerID());
}
rejoinTimer->addDisconnectedPeer(e.getPeer().getPeerID());
getMS().getLastMemberRegister().removePeer(e.getPeer().getPeerID());
}
}
/**
 * @brief Sends a RejoinDone message via the dispatcher.
 * The message is sent without any fields being set here.
 */
void
MobilitySupport::sendRejoinDone() {
DEBUG("sendRejoinDone");
RejoinDone rd;
dispatcher.sendMessage(rd);
}
/**
 * @brief Positive result: the rejoin was successful - handles the RejoinDone message.
 *
 * Determines the peers of the original MR that are still missing (plus the peers
 * that missed this message, which are also removed from the current member
 * register), stores them as disconnected peers in the rejoin timer, sets the sub
 * state of all peers depending on whether the group is primary, clears the state
 * operation and saves the original MR as last member register. A group that is
 * still waiting for a rejoin restarts the ND service on the lowest master. The
 * rejoin timer is stopped once nobody is missing; a RejoinDoneEvent is signalled in
 * any case.
 *
 * @param msg The received rejoinDone (content not evaluated).
 * @param missedPeers Peers who did not get the message and therefore are still pending.
 */
void
MobilitySupport::handleRejoinDone(RejoinDone & msg, const PeerIDList & missedPeers) {
DEBUG("handleRejoinDone");
// the rejoin succeeded, the member register must not be rolled back anymore
rejoinTimer->setResetMR(false);
PeerIDList missingPeerIDList = determineStillMissingPeers();
if (missedPeers.size() != 0) {
missingPeerIDList.add(missedPeers);
getMS().getCurrentMemberRegister().removePeers(missedPeers);
}
// isGroupPrimary() below reads this list from the rejoin timer
rejoinTimer->setDisconnectedPeers(missingPeerIDList);
SubState subState;
if (isGroupPrimary()) {
subState = NO_SUB_STATE;
}
else {
subState = WAITING_FOR_REJOIN;
}
getMS().getCurrentMemberRegister().setSubStateOfAllPeers(subState);
setLocalStateOperation(NO_STATE_OPERATION);
getMS().saveLastMemberRegister(rejoinTimer->getOriginalMR());
getMS().getCurrentMemberRegister().printMemberRegister();
// still a secondary group: keep looking for the missing peers
// NOTE(review): the sub state set here was presumably already applied above - confirm
if (getLocalSubState() == WAITING_FOR_REJOIN) {
SubState s = WAITING_FOR_REJOIN;
getMS().getCurrentMemberRegister().setSubStateOfAllPeers(s);
if (isLowestMasterID()) {
getFD().startND(missingPeerIDList);
}
}
// nobody is missing anymore, the rejoin process is complete
if (missingPeerIDList.size() == 0) {
rejoinTimer->stop();
}
dispatcher.signal(new RejoinDoneEvent());
}
/**
 * @brief Sends a RejoinFailed message via the dispatcher because the reconnect was
 * not possible.
 *
 * Within this file it is sent after a timeout of the rejoin timer, after the
 * await-rejoin timer has run out of retries and when peers missed the
 * RejoinRosterAnnounce.
 */
void
MobilitySupport::sendRejoinFailedMessage() {
DEBUG("sendRejoinFailedMessage");
RejoinFailed rf;
dispatcher.sendMessage(rf);
}
/**
 * @brief Handles a received RejoinFailed message.
 *
 * A message whose view ID differs from the local one is only logged as already
 * handled. Otherwise the ND service is finalised, the current member register is
 * reset to the last one if the rejoin timer requires it, the peers that missed the
 * message are removed, the rejoin related sub state and state operation of all
 * peers are cleared, the rejoin timer is deleted and a RejoinFailedEvent is signalled.
 *
 * The membership service is accessed by reference. The previous by-value copy
 * ("MembershipService ms = getMS();") applied the register reset and the state
 * changes to a temporary copy of the service. The rejoin timer is checked for NULL,
 * because it is deleted by this handler and may not exist for a further message.
 *
 * @param rf The message to handle.
 * @param missedPeers Peers who did not get the message.
 */
void
MobilitySupport::handleRejoinFailedMessage(RejoinFailed & rf, const PeerIDList & missedPeers) {
    DEBUG("handleRejoinFailedMessage");
    if (getViewID() == rf.getViewID()) {
        getFD().finaliseND();
        MembershipService & ms = getMS();

        // roll back to the member register saved before the rejoin attempt
        if (rejoinTimer != NULL && rejoinTimer->needToResetMR()) {
            ms.getCurrentMemberRegister() = ms.getLastMemberRegister();
        }
        if (missedPeers.size() != 0) {
            ms.getCurrentMemberRegister().removePeers(missedPeers);
        }

        // leave the rejoin related states
        if (getLocalSubState() == WAITING_FOR_REJOIN || getLocalSubState() == REJOIN_IN_PROGRESS) {
            SubState s = NO_SUB_STATE;
            StateOperation so = NO_STATE_OPERATION;
            ms.getCurrentMemberRegister().setSubStateOfAllPeers(s);
            ms.getCurrentMemberRegister().setStateOperationOfAllPeers(so);
        }
        stopAndDeleteRejoinTimer();
        // getFD().localPeerUpdated();
        dispatcher.signal(new RejoinFailedEvent());
        ms.getCurrentMemberRegister().printMemberRegister();
    }
    else {
        DEBUG("handleRejoinFailedMessage - RejoinFailed already handled!");
    }
}
/**
 * @brief Creates and starts the timer that monitors the whole rejoining process,
 * unless such a timer already exists.
 */
void
MobilitySupport::createAndStartRejoinTimer() {
    // an existing timer keeps running untouched
    if (rejoinTimer != NULL) {
        return;
    }
    rejoinTimer = new RejoinTimer(*this);
    rejoinTimer->start();
}
/**
 * @brief Stops and deletes the current RejoinTimer.
 * Does nothing apart from the debug output if no timer is set.
 */
void
MobilitySupport::stopAndDeleteRejoinTimer() {
    DEBUG("stopTimer - try to stop the timer RejoinTimer");
    if (rejoinTimer == NULL) {
        return;
    }
    rejoinTimer->stop();
    delete rejoinTimer;
    rejoinTimer = NULL;
}
/**
 * @brief Getter for the failure detector.
 * @return Reference to the failure detector provided by the dispatcher.
 */
FailureDetector &
MobilitySupport::getFD() {
return dispatcher.getFailureDetector();
}
/**
 * @brief Getter for the membership service.
 * @return Reference to the membership service provided by the dispatcher. Bind the
 * result to a reference; storing it by value creates a copy of the service.
 */
MembershipService &
MobilitySupport::getMS() {
return dispatcher.getMembershipService();
}
/**
* @brief Determine the lowest master id.
* @return the PeerID of a master
*/
bool
MobilitySupport::isLowestMasterID() {
PeerID localID = getLocalID();
PeerIDList masters = getMS().getMasterPeerIDList();
for (size_t i = 0; i < masters.size(); i++) {
if (masters.get(i) < localID) {
return false;
}
}
return true;
}
/**
 * @brief Determines the peers that did not rejoin yet.
 * @return The IDs of all peers of the rejoin timer's original MR that are not
 * contained in the current member register.
 */
PeerIDList
MobilitySupport::determineStillMissingPeers() {
    PeerIDList formerMembers = rejoinTimer->getOriginalMR().getPeerIDList();
    MemberRegister currentMR = getMS().getCurrentMemberRegister();

    PeerIDList missing;
    for (size_t idx = 0; idx < formerMembers.size(); idx++) {
        PeerID formerMember = formerMembers.get(idx);
        // the peer is (again) part of the group
        if (currentMR.contains(formerMember)) {
            continue;
        }
        missing.add(formerMembers.get(idx));
    }
    return missing;
}
/**
 * @brief Permits access to the synchronization service.
 * @return Reference to the synchronization service owned by this object.
 */
SynchronizationService &
MobilitySupport::getSynchronizationService() {
return sync;
}
}
}