Files
scandocs/uni/masterarbeit/source/moversight/ms/MembershipService.cc
2014-06-30 13:58:10 +02:00

1659 lines
60 KiB
C++

/*
* File: MembershipService.cc
* Author: jgaebler
*
* Created on July 23, 2010, 2:11 PM
*/
#include "MembershipService.h"
#include "Dispatcher.h"
#include "Moversight.h"
#include "app/PeerDescription.h"
#include "common/transport/msg/ExteriorMessage.h"
#include "event/events/ConnectionLostEvent.h"
#include "ms/Peer.h"
#include "ms/timer/JoinAbortToInviterTimer.h"
#include "ms/timer/JoinAbortToInviteeTimer.h"
#include "ms/timer/JoinAnnounceTimer.h"
#include "ms/timer/LeaveAnnounceTimer.h"
#include "ms/msg/MSMessageFactory.h"
#include "ms/msg/PeerReconnectAnnounce.h"
#include "ms/events/GroupCreatedEvent.h"
#include "ms/events/GroupClosedEvent.h"
#include "ms/events/LocalPeerUpdatedEvent.h"
#include "ms/events/PeerJoinedEvent.h"
#include "ms/events/JoinRequestEvent.h"
#include "ms/events/JoinConfirmEvent.h"
#include "ms/events/JoinAbortedEvent.h"
#include "ms/events/JoinRejectedEvent.h"
#include "ms/events/JoinGroupDoneEvent.h"
#include "ms/events/PeerLeftEvent.h"
#include "mt/MessageTransfer.h"
#include "mt/events/PendingPeersEvent.h"
#include <sstream>
#if OMNETPP
#include "UDPControlInfo.h"
#endif
namespace ubeeme {
namespace moversight {
// Membership-service specific debug macro.
// Output is emitted through MOV_DEBUG only if module.isPrintDebugMS() is true.
// While the local peer is DISJOINED the line is prefixed with "MS@TA_" and the
// local transport address, otherwise with "MS@" and the local peer ID.
// NOTE(review): the macro expands to a bare `if` statement (no do/while(0)
// wrapper), so it must not be used as the unbraced body of an if/else.
#undef DEBUG
#define DEBUG(msg) if (module.isPrintDebugMS()){ if(getLocalState()== DISJOINED){ MOV_DEBUG <<"MS@TA_"<<getLocalAddress()<<" "<<msg<<endl; } else{ MOV_DEBUG <<"MS@"<<getLocalID() <<" "<< msg <<endl; } }
/**
 * @brief Constructs a membership service bound to the given dispatcher.
 *
 * Both member registers are created for the dispatcher, and the local peer and
 * cluster IDs start out undefined, so getLocalPeer() answers with the dummy
 * peer until setLocalPeer() has been called.
 * @param d A reference to the moversight dispatcher.
 */
MembershipService::MembershipService(Dispatcher & d)
    : MoversightService(d, "MembershipService"),
      mr(d),
      lastmr(d),
      myPeerID(UNDEFINED_PEER_ID),
      myClusterID(UNDEFINED_CLUSTER_ID) {
    // event subscriptions and the invitation ID are set up in initialise()
}
/**
 * @brief Destroys the membership service.
 *
 * The destructor itself releases nothing; timers, invitations and the member
 * register are cleaned up by finalise().
 */
MembershipService::~MembershipService() {
    // intentionally empty
}
/**
 * @brief Prepares the membership service for operation.
 *
 * Subscribes the service to ConnectionLostEvent and PendingPeersEvent at the
 * dispatcher and sets the starting value of the invitation ID counter.
 * The method is called by the dispatcher; afterwards the service is ready
 * to offer its services.
 */
void
MembershipService::initialise() {

    // events the membership service reacts on
    dispatcher.subscribe<ConnectionLostEvent>(this);
    dispatcher.subscribe<PendingPeersEvent>(this);

    // starting point for the IDs of locally created invitations
    currentInvitationID = initCurrentInvitationID();
}
/**
 * @brief Shuts the membership service down.
 *
 * Drops all event subscriptions, stops and deletes every running timer,
 * forgets all invitations, empties the member register and resets the local
 * peer to the disjoined state without a sub state.
 */
void
MembershipService::finalise() {

    // no more events and no more timers
    dispatcher.unsubscribeAll(this);
    stopAndDeleteAllTimers();

    // forget everything we know about invitations and the group
    invitationList.clear();
    mr.clear();
    resetLocalPeer();

    // the (dummy) local peer ends up disjoined
    setLocalPeerState(DISJOINED);
    setLocalSubState(NO_SUB_STATE);
}
/**
* @brief Invites a peer.
*
* The method invites a peer into the group. If the invitation successfully,
* the event signalJoinConfirm is created, otherwise the signalJoinAbort event.
* @param ta The transport address of the peer to invite.
* @param pDesc The peer description of the inviting peer.
*/
void
MembershipService::invitePeer(const TransportAddress & ta, const PeerDescription & pDesc) {
//check if peer not inviting already a member
if (getLocalState() != WAITING_FOR_ROSTER && isMember(ta)) {
std::string reason("the given transport address is already associated with a group member");
DEBUG("invitePeer - " << reason.c_str());
dispatcher.signal(new JoinAbortedEvent(ta, reason));
return;
}//End if
switch (getLocalState()) {
case WAITING_FOR_ROSTER:
{
std::string reason("currently joining a group, please try again later");
DEBUG("invitePeer - " << reason.c_str());
dispatcher.signal(new JoinAbortedEvent(ta, reason));
break;
}//End case
case DISJOINED:
{
//add the local peer as group member
//note: we continue with state INVITING !!!!
createLocalPeer(pDesc);
}//end case
case INVITING:
default:
{
//invite the peer
//create an invitation and remember that
Invitation inv(getNextInvitationID(), getLocalID(), getLocalAddress(), pDesc, ta);
invitationList.add(inv);
//create and send a join request to the peer to invite
sendJoinRequest(inv);
//start a timer, which monitors the response of the invitee
createAndStartJoinAbortToInviteeTimer(inv);
break;
}//end case
}//End switch
}//End
/**
 * @brief Cancels a running invitation.
 *
 * Calling this method performs a JoinAbort operation from the user to
 * moversight: the invitation issued for the given address is looked up, the
 * invitee is informed with a join abort message, and the invitation together
 * with its timer is removed. If no invitation exists for the address, only a
 * debug message is written.
 * @param ta The transport address of the invited peer.
 * @param reason The cancel reason, forwarded to the invitee.
 */
void
MembershipService::cancelInvitation(const TransportAddress & ta, const std::string & reason) {

    try {
        Invitation inv = invitationList.findByInviteeAddress(ta);

        // The abort has to reach the invited peer. The inviter address of a
        // locally issued invitation is the local peer itself, so sending the
        // abort there would never notify the invitee.
        sendJoinAbort(inv, inv.getInviteeAddress(), reason);

        //cleanup
        stopAndDeleteInvitationTimer(inv);
        invitationList.remove(inv);
    }//End try
    catch (InvitationNotFoundException & e) {
        std::string debugMsg;
        debugMsg.append("cancelInvitation - ").append(e.toString());
        DEBUG(debugMsg.c_str());
    }//End catch
}//End
/**
* @brief Creates the local peer.
*
* This method is adds a peer to the group, which represents the local peer. It differs from the other peers, as the local peer is used
* by the different moversight modules to provide its service. Is the local peer not defined probably, the behavior of the moversight protocol is undefined.
* @note The method is called during the inviting process.
* @param pD the peer description of the local peer
*/
void
MembershipService::createLocalPeer(const PeerDescription & pD) {
PeerDescription dummyPeerDescription;
PeerResources dummyPeerResources = module.getPeerResources();
TransportAddress & localAddress = module.getLocalAddress();
PeerID pId = mr.createPeer(localAddress,
dummyPeerDescription,
dummyPeerResources);
setLocalPeer(pId);
setLocalPeerDescription(pD);
setLocalState(INVITING);
dispatcher.signal(new LocalPeerUpdatedEvent(getLocalPeer()));
}
/**
 * @brief Answers a previously received invitation.
 *
 * For an unknown invitation a JoinRejectedEvent is signalled and nothing is
 * sent. Otherwise the invitation timer is stopped and, depending on @p ack,
 * either a join confirm is sent (local state becomes WAITING_FOR_ROSTER and a
 * new timer guards the arrival of the roster) or a join abort is sent to the
 * inviter and the invitation is dropped.
 * @param inv The received invitation, which should be answered.
 * @param ack True for accept, false otherwise.
 * @param message The message for the inviting peer.
 * @param pDesc The peer description of the local peer.
 * @param resources The generic representation of the available peer resources.
 */
void
MembershipService::responseInvitation(const Invitation & inv,
                                      const bool & ack,
                                      const std::string & message,
                                      const PeerDescription & pDesc,
                                      const PeerResources & resources) {

    // nothing to answer if we do not know the invitation (anymore)
    if (!invitationList.contains(inv)) {
        DEBUG("responseInvitation - invitation unknown");
        std::string reason("invitation unknown - cancel joining (the invitation is probably outdated)");
        dispatcher.signal(new JoinRejectedEvent(inv, reason));
        return;
    }

    // the user has decided, the pending timer is obsolete
    stopAndDeleteInvitationTimer(inv);

    if (!ack) {
        // decline: tell the inviter and clean up
        sendJoinAbort(inv, inv.getInviterAddress(), message);
        DEBUG("responseInvitation - send JoinAbort");
        invitationList.remove(inv);
        return;
    }

    // accept: from now on we wait for the roster of the group
    setLocalState(WAITING_FOR_ROSTER);
    DEBUG("responseInvitation - send JoinConfirm");
    sendJoinConfirm(inv, message, pDesc, resources);
    createAndStartJoinAbortToInviterTimer(inv);
}
/**
 * @brief Lets the local peer leave the group.
 *
 * A disjoined peer has nothing to leave, so the call is ignored. Otherwise a
 * leave announce for the local peer is sent to the group, the local state is
 * set to LEAVING and a LocalPeerUpdatedEvent is signalled.
 */
void
MembershipService::leaveGroup() {

    const bool withinGroup = (getLocalState() != DISJOINED);

    if (withinGroup) {
        sendLeaveAnnounceMessage(getLocalID());
        setLocalState(LEAVING);
        dispatcher.signal(new LocalPeerUpdatedEvent(dispatcher.getLocalPeer()));
    }
    else {
        DEBUG("leaveGroup - peer not within a group");
    }
}
/**
 * @brief Notification that a peer has failed.
 *
 * Currently a stub without any effect; the original note says that a
 * PendingAnnounce should be sent to the group here.
 * @param pId The ID of the failed peer (unused so far).
 */
void
MembershipService::peerFailed(PeerID pId) {
    // TODO: send PendingAnnounce to the group
    (void) pId;
}
/**
 * @brief Handles a received join request message.
 *
 * An invitation object is rebuilt from the message. Only a disjoined peer
 * accepts it: the invitation is stored, a timer is started and the request is
 * forwarded to the application as JoinRequestEvent. In every other local state
 * the request is answered with a join abort carrying a state specific reason.
 * @param msg The join request to handle.
 */
void
MembershipService::handleJoinRequest(JoinRequest* msg) {

#if OMNETPP
    // within the simulation the addresses are taken from the UDP control info
    UDPDataIndication * udpInfo = check_and_cast<UDPDataIndication *>(msg->getControlInfo());

    TransportAddress inviterTa = udpInfo->getSrcAddr();
    TransportAddress inviteeTa = udpInfo->getDestAddr();
    inviterTa.setPort(udpInfo->getSrcPort());
    inviteeTa.setPort(udpInfo->getDestPort());

    Invitation inv(msg->getInvitationID(),
                   msg->getInviterID(),
                   inviterTa,
                   msg->getPeerDescription(),
                   inviteeTa);
#else
    Invitation inv(msg->getInvitationID(),
                   msg->getInviterID(),
                   msg->getSource(),
                   msg->getPeerDescription(),
                   getLocalAddress());
#endif

    DEBUG("handleJoinRequest - invitation received from " << inv.getInviterAddress());

    std::string rejectReason;

    switch (getLocalState()) {

        case DISJOINED:
            //we are not within a group:
            //remember the request and forward it to the application
            DEBUG("handleJoinRequest - forward invitation to application");
            invitationList.add(inv);
            createAndStartJoinAbortToInviterTimer(inv);
            dispatcher.signal(new JoinRequestEvent(inv));
            return;

        case INVITING:
            //we are currently creating a group
            rejectReason = "peer currently creating a group - please try again later";
            break;

        case WAITING_FOR_ROSTER:
            //we are currently joining a group
            rejectReason = "peer currently joining a group - reject invitation";
            break;

        case JOINED:
        default:
            //the inviting peer is in conference, merge???
            rejectReason = "group merge not implemented yet";
            break;
    }//End switch

    //every remaining state rejects the request
    sendJoinAbort(inv, msg->getSourceTA(), rejectReason);
}//End
/**
 * @brief Handles a received join confirm message.
 *
 * The invitation referenced by the message is looked up, its timer is stopped
 * and the confirmation is reported to the application (JoinConfirmEvent).
 * If the local peer is still in state INVITING, the group is established right
 * here: the local peer becomes JOINED, the invitee is added to the member
 * register, receives the roster, and the invitation is removed.
 * Otherwise the join is announced to the group and a join announce timer is
 * started; the roster is sent later (see handleJoinAnnounce).
 * A confirm without a matching invitation is dropped.
 * @param msg The join confirm message to handle.
 */
void
MembershipService::handleJoinConfirm(JoinConfirm * msg) {
try {
// throws InvitationNotFoundException if the invitation is unknown
const Invitation & inv = invitationList.find(msg->getInvitationID());
//we have found the corresponding invitation, thus stop the timer
stopAndDeleteInvitationTimer(inv);
#if UBEEME
// update my ta with the address seen by the invitee
Peer & peer = getLocalPeer();
if (peer.getLocalAddress() != msg->getInviterTa()) {
peer.setLocalAddress(msg->getInviterTa());
}
#endif
//signal accepted join to the user (JOINconfirm)
std::string confMsg = msg->getJcMessage();
dispatcher.signal(new JoinConfirmEvent(inv, msg->getPeerDescription(), confMsg));
//update peer state
if (getLocalState() == INVITING) {
// first confirmed invitation: the group comes into existence now
dispatcher.signal(new GroupCreatedEvent());
setLocalState(JOINED);
//add the new peer to the group
TransportAddress inviteeAddress = inv.getInviteeAddress();
PeerID pId = mr.createPeer(inviteeAddress,
msg->getPeerDescription(),
msg->getPeerResources());
//send roster message to the joined peer
RosterMessage roster = MSMessageFactory::createRosterMessage(*this, pId);
sendTo(roster, inviteeAddress);
DEBUG("handleJoinConfirm - peer " << pId << " joined successfully to group");
dispatcher.signal(new PeerJoinedEvent(pId, inv.getInviteeAddress(), msg->getPeerDescription()));
DEBUG("handleJoinConfirm - local peer joined successfully to the group");
dispatcher.signal(new JoinGroupDoneEvent(dispatcher.getLocalPeer()));
DEBUG("handleJoinConfirm - member register after peer joined");
mr.printMemberRegister();
//remove the invitation (inv is a reference into the list, so this comes last)
invitationList.remove(inv);
}//End if
else {
//signal JA to group, the roster message is sent later (see handleJoinAnnounce)
sendJoinAnnounce(inv, msg->getPeerDescription(), msg->getPeerResources());
//start a join announce timer to ensure the proper completion
//of the join process
// NOTE(review): handleJoinAnnounceTimer mentions that sending the JA may be
// delivered directly (nested call); confirm inv is still valid at this point.
createAndStartJoinAnnounceTimer(inv, msg->getPeerDescription(), msg->getPeerResources());
}//End else
}//End try
catch (InvitationNotFoundException & /* e */) {
DEBUG("handleJoinConfirm - no invitation for the given JC message present - drop message");
}//End catch
}
/**
 * @brief Handles a received roster message.
 *
 * A roster is only accepted while the local peer is waiting for one;
 * otherwise the message is dropped. From an accepted roster the group is
 * initialised: a GroupCreatedEvent is signalled, every member other than the
 * joined peer is announced with a PeerJoinedEvent, the member register is
 * filled, the joined peer is selected as local peer and a JoinGroupDoneEvent
 * is signalled. Finally the invitation for the local address is cleaned up.
 * @param msg The roster message to handle.
 */
void
MembershipService::handleRosterMessage(RosterMessage* msg) {

    if (getLocalState() != WAITING_FOR_ROSTER) {
        DEBUG("handleRosterMessage - receive unexpected roster, drop message");
        return;
    }

    Roster receivedRoster;
    receivedRoster.setNextPeerID(msg->getNextPeerID());
    receivedRoster.setViewID(msg->getViewID());

    dispatcher.signal(new GroupCreatedEvent());

    MemberDescriptionList members = msg->getMemberDescriptionList();
    for (size_t idx = 0; idx < members.size(); idx++) {

        MemberDescription member = members.get(idx);
        receivedRoster.addMemberDescription(member);

        // the joined peer is the local peer, it is not announced to the application
        if (member.getPeerID() == msg->getJoinedPeerID()) {
            continue;
        }

        PeerID memberId = member.getPeerID();
        TransportAddress memberAddress = member.getTransportAddress();
        PeerDescription memberDescription = member.getPeerDescription();
        dispatcher.signal(new PeerJoinedEvent(memberId, memberAddress, memberDescription));
    }//End for

    DEBUG("handleRosterMessage - init group from roster");
    mr.initMemberRegisterFromRoster(receivedRoster);

    //finish group join
    setLocalPeer(msg->getJoinedPeerID());

    DEBUG("handleRosterMessage - join group successfully done");
    dispatcher.signal(new JoinGroupDoneEvent(dispatcher.getLocalPeer()));

    //clean up the invitation that brought us here, if it still exists
    try {
        const Invitation & pending = invitationList.findByInviteeAddress(getLocalAddress());
        stopAndDeleteInvitationTimer(pending);
        invitationList.remove(pending);
    }//End try
    catch (InvitationNotFoundException & /* e */) {
        // no invitation left, nothing to clean up
    }
}//End
/**
 * @brief Handles an incoming join abort (JAB) message.
 *
 * The invitation referenced by the message is looked up first. Only if it is
 * known does the abort take effect: the local peer falls back to DISJOINED
 * (if isSetDisjoinedAllowed() permits), the rejection is reported to the
 * application as JoinRejectedEvent, and the invitation and its timer are
 * removed. A JAB for an unknown invitation is ignored and leaves the local
 * state untouched.
 * @param msg The JAB message to handle.
 */
void
MembershipService::handleJoinAbort(JoinAbort* msg) {

    try {
        // Resolve the invitation before touching any state: a stale or
        // unknown JAB must not reset a peer that is in the middle of another
        // join. find() throws if the invitation is not present.
        const Invitation & inv = invitationList.find(msg->getInvitationID());

        if (isSetDisjoinedAllowed()) {
            setLocalState(DISJOINED);
        }

        //signal rejected join to user
        std::string rejectMsg = msg->getJabMessage();
        dispatcher.signal(new JoinRejectedEvent(inv, rejectMsg));

        //cleanup
        stopAndDeleteInvitationTimer(inv);
        invitationList.remove(inv);
    }//end try
    catch (InvitationNotFoundException & /* e */) {
        DEBUG("handleJoinAbort - no invitation for the given JAB message present");
    }//End catch
}//End
/**
 * @brief Handles a received join announce message.
 *
 * The announced peer is added to the member register. If the local peer is the
 * sender of the announce (i.e. the inviter), it additionally sends the roster
 * to the new peer and cleans up the corresponding invitation and timer.
 * In any case a PeerJoinedEvent is signalled.
 * @param msg The join announce message to handle.
 */
void
MembershipService::handleJoinAnnounce(JoinAnnounce & msg) {

    TransportAddress newcomerAddress = msg.getJoinedPeerTA();

    DEBUG("handleJoinAnnounceMessage - member register before peer joined");
    mr.printMemberRegister();

    //add the peer to the group
    PeerID newcomerId = mr.createPeer(newcomerAddress, msg.getPeerDescription(), msg.getPeerResources());

    //only the announcing peer (the inviter) delivers the roster
    const bool announcedByLocalPeer = (msg.getSourceID() == getLocalID());
    if (announcedByLocalPeer) {

        RosterMessage rosterMsg = MSMessageFactory::createRosterMessage(*this, newcomerId);
        sendTo(rosterMsg, newcomerAddress);

        //the invitation is completed now
        try {
            Invitation done = invitationList.findByInviteeAddress(newcomerAddress);
            stopAndDeleteInvitationTimer(done);
            invitationList.remove(done);
        }//End try
        catch (InvitationNotFoundException & e) {
            std::stringstream out;
            out << "handleJoinAnnounceMessage - " << e.what();
            DEBUG(out.str().c_str());
        }//End catch
    }//End if

    DEBUG("handleJoinAnnounceMessage - peer " << newcomerId << " joined successfully to group (view: " << msg.getViewID() << ")");
    dispatcher.signal(new PeerJoinedEvent(newcomerId, newcomerAddress, msg.getPeerDescription()));

    DEBUG("handleJoinAnnounceMessage - member register after peer joined");
    mr.printMemberRegister();
}
/**
 * @brief Handles a leave announce message.
 *
 * A leave announce timer running for the message's reference time is stopped.
 * If the local peer is among the leaving peers, the group is closed locally.
 * Otherwise the leaving peers are removed from the timer queues and from the
 * group, one after another.
 * @param msg The received leave message.
 */
void
MembershipService::handleLeaveAnnounce(LeaveAnnounce & msg) {

    DEBUG("handleLeaveAnnounce");

    PeerIDList leavingPeers = msg.getLeaveList();

    //the announce arrived, a running timer is no longer needed
    stopAndDeleteLeaveAnnounceTimer(msg.getReferenceTime());

    //we are leaving the group ourselves
    if (leavingPeers.contains(getLocalID())) {
        closeGroup();
        return;
    }

    //others are leaving
    removeLeftPeersFromTimerQueues(leavingPeers);
    for (size_t idx = 0; idx < leavingPeers.size(); idx++) {
        removePeer(leavingPeers.get(idx));
    }//End for
}
/**
 * @brief Handles a peer reconnect announce (PRA) message received by the host.
 *
 * The state of the reconnected peer is set back to JOINED and a note is
 * written to standard error.
 * @param pra The peer reconnect announce message to handle.
 */
void
MembershipService::handlePeerReconnectAnnounce(PeerReconnectAnnounce & pra) {

    const PeerID returnedPeerId = pra.getReconnectedPeerID();

    Peer & returnedPeer = getPeer(returnedPeerId);
    returnedPeer.setState(JOINED);

    // dispatcher.getNetworkFailureDetector().peerReconnected(reconnectedPeerID);
    std::cerr << "handle peer reconnected announce\n";
}
/**
* @brief This method handles a list of peers suspected as pending by
* the message transfer.
*
* The message handles the given peers by setting its state to pending.
* If the given peer not a member of the group, the peer is removed from
* the list. A timer is started to monitor the rejoining of the pending
* peers. If the timer expires, the peers will be excluded.
* @param pList The list of the peers, suspected as pending.
*/
void
MembershipService::setPeersPending(const PeerIDList & pList) {
PeerIDList tempList;
//mark peers as pending
for (size_t i = 0; i < pList.size(); i++) {
if (contains(pList.get(i)) &&
getPeer(pList.get(i)).getState() != PENDING) {
getPeer(pList.get(i)).setState(PENDING);
tempList.add(pList.get(i));
}
}
if (tempList.size() > 0) {
//start the pending timer
createAndStartLeaveAnnounceTimer(tempList);
}//End if
}
/**
 * @brief Handles a fired joinAbortToInviterTimer.
 *
 * The invitation monitored by the timer is outdated on the side of the
 * invitee: a join abort is sent to the inviter and the application is
 * informed with a JoinRejectedEvent. Afterwards the local peer either falls
 * back to DISJOINED or, if that is not allowed but closing the group is
 * possible, the group is closed. Finally timer and invitation are removed.
 * @param timer The timer to handle.
 */
void
MembershipService::handleJoinAbortToInviterTimer(JoinAbortToInviterTimer * timer) {

    //work on a copy, the timer is deleted at the end of this method
    Invitation expired = timer->getReference();

    std::string reason("invitation outdated (on the side of the invitee)");
    sendJoinAbort(expired, expired.getInviterAddress(), reason);
    dispatcher.signal(new JoinRejectedEvent(expired, reason));

    if (isSetDisjoinedAllowed()) {
        setLocalState(DISJOINED);
    }
    else {
        if (isGroupClosePossible()) {
            closeGroup();
        }
    }//End else

    stopAndDeleteInvitationTimer(expired);
    invitationList.remove(expired);
}
/**
 * @brief Handles an expired joinAbortToInviteeTimer.
 *
 * The invitee did not answer in time: a join abort is sent to the invitee and
 * the application is informed with a JoinRejectedEvent. If
 * isSetDisjoinedAllowed() holds, the local peer is reset. Finally timer and
 * invitation are removed.
 * @param timer The timer to handle.
 */
void
MembershipService::handleJoinAbortToInviteeTimer(JoinAbortToInviteeTimer * timer) {

    //work on a copy, the timer is deleted at the end of this method
    Invitation expired = timer->getReference();

    std::string reason("invitation outdated");
    sendJoinAbort(expired, expired.getInviteeAddress(), reason);
    dispatcher.signal(new JoinRejectedEvent(expired, reason));

    const bool fallBackToDisjoined = isSetDisjoinedAllowed();
    if (fallBackToDisjoined) {
        resetLocalPeer();
    }

    stopAndDeleteInvitationTimer(expired);
    invitationList.remove(expired);
}
/**
 * @brief Handles an expired join announce timer.
 *
 * While the timer reports retries left, the join announce for the timer's
 * invitation is sent to the group again and the timer is restarted, provided
 * it is still registered in the invitation timer queue. Once no retries are
 * left, the invitation is cancelled: a join abort is sent to the invitee, a
 * JoinRejectedEvent is signalled and timer and invitation are removed.
 * @param timer The timer to handle.
 */
void
MembershipService::handleJoinAnnounceTimer(JoinAnnounceTimer * timer) {
// copy of the invitation, so it stays usable if the timer gets deleted
Invitation inv = timer->getReference();
if (timer->getNumberOfRetries() > 0) {
//signal JA again to the group
DEBUG("handleJoinAnnounceTimer - send join announce message to group again");
sendJoinAnnounce(inv, timer->getInviteePeerDescription(), timer->getPeerResources());
//test if the timer is still valid
//in case of a single member group, the timer becomes invalid
//during the sendJoinAnnounce method (in particular: the mt.send
//delivers the JA directly, which results in a nested call of the
//membership service handle method)
InvitationTimer * t = invitationTimerQueue.find(inv);
if (t != NULL) {
// only restart a timer that is still registered for the invitation
timer->restart();
}
}
else {
//cancel invitation, because we are not able to emit the JA
std::string reason("unable to publish invitation within the group, give up");
sendJoinAbort(inv, inv.getInviteeAddress(), reason);
dispatcher.signal(new JoinRejectedEvent(inv, reason));
//cleanup timer
stopAndDeleteInvitationTimer(inv);
invitationList.remove(inv);
}
}
/**
* @brief Creates and sends a join request to peer, identified by the given invitation.
* @param inv The invitation for the desired peer.
*/
void
MembershipService::sendJoinRequest(const Invitation & inv) {
JoinRequest joinReq = MSMessageFactory::createJoinRequestMessage(*this, inv.getInvitationID(), inv.getInviterPeerDescription());
DEBUG("sendJoinRequestToPeer - send invitation to peer at address " << inv.getInviteeAddress());
sendTo( joinReq, inv.getInviteeAddress());
}
/**
* @brief Sends a join confirm message to a dedicated peer.
* @param inv The corresponding invitation, used to create the join confirm.
* @param message The confirm message from the user / application.
* @param pDesc The peer description of the joined peer.
* @param resources The generic representation of the available peer resources.
*/
void
MembershipService::sendJoinConfirm(const Invitation & inv, const std::string & message, const PeerDescription & pDesc, const PeerResources & resources) {
JoinConfirm msg = MSMessageFactory::createJoinConfirmMessage(*this, inv, message, pDesc, resources);
DEBUG("sendJoinConfirm - " << message);
sendTo( msg, inv.getInviterAddress());
}
/**
* @brief Sends a join abort message to the dedicated peer.
* @param inv The corresponding invitation.
* @param destinationTa The transport address of the dedicated peer.
* @param jaMessage The reason for the join abort.
*/
void
MembershipService::sendJoinAbort(const Invitation & inv, const TransportAddress & destinationTa, const std::string & jaMessage) {
JoinAbort msg = MSMessageFactory::createJoinAbortMessage(*this, inv, jaMessage);
DEBUG("sendJoinAbortToPeer - " << jaMessage);
sendTo( msg, destinationTa);
}
/**
* @brief This method sends a join announce message to the peer, identified by the given invitation.
* @param inv The invitation corresponding to the joining peer.
* @param pDesc The peer description of the joining peer.
* @param resources The resource value of the new peer
*/
void
MembershipService::sendJoinAnnounce(const Invitation & inv, const PeerDescription & pDesc, const PeerResources & resources) {
DEBUG("sendJoinAnnounce - send JoinAnnounce to group");
JoinAnnounce joinAnnounce = MSMessageFactory::createJoinAnnounceMessage(inv.getInviteeAddress(), pDesc, resources);
dispatcher.sendMessage(joinAnnounce);
}
/**
* @brief The method sends a leave announce message to the group. This method
* is used in cases, where a peer leaves the group by itself.
* @param leavingPeer The peers that leaf the group.
*/
void
MembershipService::sendLeaveAnnounceMessage(const PeerID & leavingPeer) {
PeerIDList leavingPeers;
leavingPeers.add(leavingPeer);
sendLeaveAnnounceMessage(leavingPeers, getCurrentLogicalTime());
}
/**
* @brief The method sends a leave announce message to the group.
* @param leavingPeers The list of peers that leaf the group.
* @param referenceTime The time of the leave event.
*/
void
MembershipService::sendLeaveAnnounceMessage(const PeerIDList & leavingPeers, const VirtualLogicalTime & referenceTime) {
LeaveAnnounce leaveAnnounce = MSMessageFactory::createLeaveAnnounceMessage(leavingPeers, referenceTime);
dispatcher.sendMessage(leaveAnnounce);
}
/**
 * @brief Handles an expired leave announce timer.
 *
 * On a master peer the leave announce for the peers stored in the timer is
 * sent to the group again and the timer is restarted, as long as retries are
 * left. Once the retries are used up (or on a non-master peer) the timer is
 * stopped and deleted and its peers are removed from the timer queues.
 * @param timer The timer to handle.
 */
void
MembershipService::handleLeaveAnnounceTimer(LeaveAnnounceTimer * timer) {

    DEBUG("handleLeaveAnnounceTimer");

    //only master peers have to manage the leave operation
    if (isLocalPeerMaster()) {

        //the timer is expired, i.e. the leave announce message is not yet
        //sent or not successfully sent to the group - retry if possible
        const bool retryPossible = (timer->getNumberOfRetries() > 0);
        if (retryPossible) {
            sendLeaveAnnounceMessage(timer->getPeerIDList(), timer->getReference());
            timer->restart();
            return;
        }//End if

        std::stringstream out;
        out << "handleLeaveAnnounceTimer - send leave announce " << timer->getReference() << " failed ";
        DEBUG(out.str());
    }//end if

    //clean up; the peer list is copied first, because the timer is deleted below
    PeerIDList affectedPeers = timer->getPeerIDList();
    stopAndDeleteLeaveAnnounceTimer(timer->getReference());
    removeLeftPeersFromTimerQueues(affectedPeers);
}
/**
 * @brief Returns the peer ID of the local peer.
 * @return The ID reported by the peer that getLocalPeer() returns.
 */
PeerID
MembershipService::getLocalID() const {
    const Peer & self = getLocalPeer();
    return self.getPeerID();
}
/**
 * @brief Returns the cluster ID of the local peer.
 * @return The cluster ID reported by the peer that getLocalPeer() returns.
 */
ClusterID
MembershipService::getLocalClusterID() const {
    const Peer & self = getLocalPeer();
    return self.getClusterID();
}//End
/**
 * @brief Defines which peer within the MemberRegister is the local peer.
 *
 * The peer's ID and cluster ID are remembered, the peer is put into state
 * JOINED and the dummy peer is reset to DISJOINED.
 * @param pId The peer to select as local peer.
 * @throw PeerNotFoundException If the given peer is not a member of the current MemberRegister.
 */
void
MembershipService::setLocalPeer(PeerID pId) {

    if (!mr.contains(pId)) {
        throw PeerNotFoundException("selected peer is not member of the current view");
    }

    Peer & selected = getPeer(pId);

    myPeerID = pId;
    myClusterID = selected.getClusterID();
    selected.setState(JOINED);

    //restore dummy peer state
    dummyPeer.setState(DISJOINED);
}
/**
 * @brief Returns the local peer from the MemberRegister.
 *
 * As long as no local peer is selected (the local peer ID is undefined), a
 * dummy peer is returned instead.
 * @note The local peer has to be selected by the setLocalPeer method first.
 * @return The local peer, or the dummy peer.
 */
Peer &
MembershipService::getLocalPeer() {

    if (myPeerID != UNDEFINED_PEER_ID) {
        Cluster & homeCluster = mr.getCluster(myClusterID);
        return homeCluster.getPeer(myPeerID);
    }

    return dummyPeer;
}//End
/**
 * @brief Returns the local peer from the MemberRegister (read-only variant).
 *
 * As long as no local peer is selected (the local peer ID is undefined), a
 * dummy peer is returned instead.
 * @note The local peer has to be selected by the setLocalPeer method first.
 * @return The local peer, or the dummy peer.
 */
const Peer &
MembershipService::getLocalPeer() const {

    if (myPeerID != UNDEFINED_PEER_ID) {
        const Cluster & homeCluster = mr.getCluster(myClusterID);
        return homeCluster.getPeer(myPeerID);
    }

    return dummyPeer;
}//End
/**
 * @brief Resets the local peer settings.
 *
 * The local peer and cluster IDs become undefined, the dummy peer is set to
 * DISJOINED and the member register is cleared. Afterwards calls to
 * getLocalPeer() or similar methods are answered with the dummy peer.
 */
void
MembershipService::resetLocalPeer() {

    //the local peer is no longer member of the MembershipService
    myPeerID = UNDEFINED_PEER_ID;
    myClusterID = UNDEFINED_CLUSTER_ID;

    //from now on the dummy peer stands in for the local peer
    dummyPeer.setState(DISJOINED);

    //without a local peer there is no group either
    mr.clear();
}
/**
 * @brief Returns the local transport address.
 * @return The local address as provided by the module.
 */
const TransportAddress &
MembershipService::getLocalAddress() const {
    const TransportAddress & ownAddress = module.getLocalAddress();
    return ownAddress;
}
/**
 * @brief Returns the peer state object of the local peer.
 * @return The peer state of the peer that getLocalPeer() returns.
 */
PeerState &
MembershipService::getLocalPeerState() {
    Peer & self = getLocalPeer();
    return self.getPeerState();
}
/**
* @brief Sets the peer state of the local peer.
* @param ps The peer state to set.
*/
void
MembershipService::setLocalPeerState(const PeerState & ps) {
getLocalPeer().setPeerState(ps);
}
/**
* @brief Sets the state part of the peer state object of the local peer.
* @param s The state to set.
*/
void
MembershipService::setLocalState(const State & s) {
getLocalPeer().setState(s);
}
/**
 * @brief Returns a peer of the current group.
 *
 * The lookup is delegated to the member register.
 * @throw PeerNotFoundException If the desired peer is not a member of the group.
 * @param pId The peer ID of the desired peer.
 * @return The desired peer.
 */
Peer &
MembershipService::getPeer(const PeerID pId) {
    Peer & member = mr.getPeer(pId);
    return member;
}
/**
 * @brief Returns a peer of the current group (read-only variant).
 *
 * The lookup is delegated to the member register.
 * @throw PeerNotFoundException If the desired peer is not a member of the group.
 * @param pId The peer ID of the desired peer.
 * @return The desired peer.
 */
const Peer &
MembershipService::getPeer(const PeerID pId) const {
    const Peer & member = mr.getPeer(pId);
    return member;
}
/**
 * @brief Returns the number of peers within the group.
 * @return The number of peers held by the member register.
 */
size_t
MembershipService::getNumberOfPeers() const {
    const size_t peerCount = mr.getNumberOfPeers();
    return peerCount;
}
/**
 * @brief Returns the number of clusters within the group.
 * @return The number of clusters held by the member register.
 */
size_t
MembershipService::getNumberOfClusters() const {
    const size_t clusterCount = mr.getNumberOfCluster();
    return clusterCount;
}
/**
 * @brief Returns the IDs of all peers within the cluster of the given peer.
 * @param pId The ID of a peer within the cluster of interest.
 * @return The desired peer ID list.
 * @throw PeerNotFoundException If the given peer ID is not found within the group.
 */
PeerIDList
MembershipService::getClusterPeerIDList(const PeerID & pId) const {
    const Peer & member = getPeer(pId);
    return mr.getClusterPeerIDList(member.getClusterID());
}
/**
 * @brief Returns the IDs of all slave peers of a cluster (the master is not included).
 * @param cID The source cluster ID.
 * @return The peer ID list of the cluster without the ID of its master.
 * @throw ClusterNotFoundException If the source cluster is not found within the group.
 */
PeerIDList
MembershipService::getClusterPeerIDListSlavesOnly(const ClusterID cID) const {

    //start with everybody in the cluster ...
    PeerIDList slaves = mr.getClusterPeerIDList(cID);

    //... and take the master out
    slaves.remove(mr.getCluster(cID).getMasterID());

    return slaves;
}
/**
 * @brief Returns a list of all peers within the group.
 * @note The list is meant for reading only. To edit peers, use the appropriate
 *       MembershipService functions directly.
 * @return The list of peers held by the member register.
 */
const PeerList
MembershipService::getPeerList() const {
    PeerList allPeers = mr.getPeerList();
    return allPeers;
}
/**
 * @brief Returns a list of the IDs of all peers within the group.
 * @note The list is meant for reading only. To edit peers, use the appropriate
 *       MembershipService functions directly.
 * @return The list of peer IDs held by the member register.
 */
const PeerIDList
MembershipService::getPeerIDList() {
    PeerIDList allIds = mr.getPeerIDList();
    return allIds;
}
/**
 * @brief Returns a list of the peer IDs of all masters within the group.
 * @return The master peer ID list provided by the member register.
 */
const PeerIDList
MembershipService::getMasterPeerIDList() {
    PeerIDList masterIds = mr.getMasterPeerIDList();
    return masterIds;
}
/**
 * @brief Returns the IDs of the given peer's cluster mates plus all masters.
 *
 * The list is built from the peer IDs of the given peer's cluster, unified
 * with the IDs of all master peers of the group.
 * @note The peer itself is never a member of the created list.
 * @param self The peer to determine the desired cluster.
 * @return The desired list.
 */
PeerIDList
MembershipService::getClusterAndMasterPeerIDList(const Peer & self) {

    //my cluster without me
    PeerIDList result = mr.getClusterPeerIDList(self.getClusterID());
    result.remove(self.getPeerID());

    //add all master IDs
    PeerIDList allMasters = getMasterPeerIDList();
    result.unify(allMasters);

    //remove me again: if I am a master myself, the master list re-added my ID
    result.remove(self.getPeerID());

    return result;
}
/**
 * @brief Returns the ID of the cluster in which the given peer is located.
 *
 * The lookup is delegated to the member register.
 * @param pID The ID of the peer to find.
 * @return The cluster ID of the peer's cluster.
 */
ClusterID
MembershipService::findPeer(const PeerID & pID) {
    const ClusterID hostingCluster = mr.findPeer(pID);
    return hostingCluster;
}
/**
 * @brief Determines whether a peer with the given address is a member of the group.
 * @param ta The transport address of the desired peer.
 * @return True, if the member register contains the address, otherwise false.
 */
bool
MembershipService::contains(const TransportAddress & ta) {
    const bool isKnown = mr.contains(ta);
    return isKnown;
}
/**
 * @brief Determines whether a peer with the given ID is a member of the group.
 * @param pID The peer ID of the desired peer.
 * @return True, if the member register contains the peer ID, otherwise false.
 */
bool
MembershipService::contains(PeerID pID) {
    const bool isKnown = mr.contains(pID);
    return isKnown;
}
/**
 * @brief Determines whether the given peer is a member of the group.
 *
 * The check is done by the peer's ID.
 * @param peer The desired peer.
 * @return True, if the peer's ID is found within the group, otherwise false.
 */
bool
MembershipService::contains(Peer & peer) {
    const bool isKnown = contains(peer.getPeerID());
    return isKnown;
}
/**
* @brief Returns, if the local peer a master peer (acts as master within the group).
* @return True, if the local peer a master, false otherwise.
*/
bool
MembershipService::isLocalPeerMaster() const {
return getLocalPeer().isMaster();
}
/**
* @brief Checks, if the current peer master of the given peer.
* @param pId The id of the peer to check.
* @return True, if the current peer master of the given peer, false otherwise.
*/
bool
MembershipService::isMasterOf(const PeerID pId) const {
if (isLocalPeerMaster()) {
PeerIDList list = getClusterPeerIDList(getLocalID());
return list.contains(pId);
}
return false;
}
/**
* @brief Sets the peer description of the local peer.
* @param pDesc The peer description to set.
*/
void
MembershipService::setLocalPeerDescription(const PeerDescription & pDesc) {
getLocalPeer().setPeerDescription(pDesc);
}
/**
* @brief Removes a peer from the current view.
*
* This method removes the peer, identified by the given id, from the current view.
* If the given peer id not a member of group, the group keeps unchanged.
* @note Leads the removing of a peer to a situation whereas the call of the method
* isGroupClosePossible is evaluated to true, the group will be closed immediately. In this
* case, an event is emitted.
* @note The local peer might be the new master of a cluster, was the removed peer the master
* of its cluster and the peer placing strategy determines the local peer as new master.
* In this case, an event is emitted.
* @param pId The id of the peer to remove.
*/
void
MembershipService::removePeer(const PeerID & pId) {
if (contains(pId)) {
DEBUG("removePeer - remove peer " << pId << " from group");
PeerID oldMasterID = getLocalMasterID();
Peer peerToRemove = mr.getPeer(pId);
//remove the peer from the group
mr.removePeer(pId);
if (module.isPrintDebugMS()) {
mr.printMemberRegister();
}//End if
dispatcher.signal(new PeerLeftEvent(peerToRemove));
//we are the new master?
if (oldMasterID != getLocalMasterID()) {
if(isLocalPeerMaster()){
dispatcher.signal(new LocalPeerUpdatedEvent(dispatcher.getLocalPeer()));
}//End if
}//End if
//should the group be closed?
if (isGroupClosePossible()) {
closeGroup();
}//End if
}//End if
}
/**
* @brief Creates and starts a joinAbortToInvitee timer.
* @param invitation The invitation, associated with the timer to start.
*/
void
MembershipService::createAndStartJoinAbortToInviteeTimer(const Invitation & invitation) {
JoinAbortToInviteeTimer * timer = new JoinAbortToInviteeTimer(*this);
timer->setReference(invitation);
timer->start();
invitationTimerQueue.add(timer);
}
/**
* @brief Creates and starts a joinAbortToInviter timer.
* @param invitation The invitation, associated with the timer to start.
*/
void
MembershipService::createAndStartJoinAbortToInviterTimer(const Invitation & invitation) {
JoinAbortToInviterTimer * timer = new JoinAbortToInviterTimer(*this);
timer->setReference(invitation);
if (getLocalState() == DISJOINED) {
GenericTime t(JOIN_ABORT_TO_INVITER_TIMEOUT);
timer->setTimeout(t);
}//End if
else {
GenericTime t(JOIN_ABORT_TO_INVITER_TIMEOUT_MISSING_ROSTER);
timer->setTimeout(t);
}//End else
timer->start();
invitationTimerQueue.add(timer);
}
/**
* @brief Creates and starts a join announce timer.
* @param invitation The invitation, associated with the timer to start.
* @param pDesc The peer description of the peer to announce.
* @param resources The resources of the peer to announce.
*/
void
MembershipService::createAndStartJoinAnnounceTimer(const Invitation & invitation, const PeerDescription & pDesc, const PeerResources & resources) {
JoinAnnounceTimer * timer = new JoinAnnounceTimer(*this);
timer->setReference(invitation);
timer->setInviteePeerDescription(pDesc);
timer->setPeerResources(resources);
timer->start();
invitationTimerQueue.add(timer);
}
void
MembershipService::createAndStartLeaveAnnounceTimer(const PeerIDList & leavingPeerIdList) {
DEBUG("createAndStartLeaveAnnounceTimer - start leave timer for " << leavingPeerIdList.size() << " peer(s)")
LeaveAnnounceTimer * timer = new LeaveAnnounceTimer(*this);
timer->setPeerIDList(leavingPeerIdList);
timer->setReference(getCurrentLogicalTime());
timer->start();
timerQueue.add(timer);
}
/**
* @brief Stop running invitation timer (to invitee or inviter)
* @param inv The corresponding invitation, identifying the timer.
*/
void
MembershipService::stopAndDeleteInvitationTimer(const Invitation & inv) {
InvitationTimer * timer = invitationTimerQueue.find(inv);
if (timer != NULL) {
timer->stop();
delete timer;
invitationTimerQueue.remove(inv);
}//End if
else {
DEBUG("stopAndDeleteInvitationTimer - timer not found");
}//End else
}
/**
* @brief Stops and deletes a leave announce timer, identified by a reference.
* @param reference The reference to identify the timer.
*/
void
MembershipService::stopAndDeleteLeaveAnnounceTimer(const VirtualLogicalTime & reference) {
LeaveAnnounceTimer * timer = timerQueue.find(reference);
if (timer != NULL) {
timer->stop();
delete timer;
timerQueue.remove(reference);
}//End if
}
/**
* @brief This method stops and delete all running timers within the module.
*/
void
MembershipService::stopAndDeleteAllTimers() {
{
//invitation timer
InvitationList keys = invitationTimerQueue.getKeyList();
for (size_t i = 0; i < keys.size(); i++) {
stopAndDeleteInvitationTimer(keys.get(i));
}//end for
invitationTimerQueue.clear();
}
{
//leave timer
List<VirtualLogicalTime> keys;
keys = timerQueue.getKeyList();
for (size_t i = 0; i < keys.size(); i++) {
stopAndDeleteLeaveAnnounceTimer(keys.get(i));
}//end for
timerQueue.clear();
}
}
/**
* @brief Removes the given peers from all timers that monitor peers.
* @param leftPeers The peers to remove from the timers.
*/
void
MembershipService::removeLeftPeersFromTimerQueues(const PeerIDList & leftPeers) {
std::vector<VirtualLogicalTime> deleteCandidates;
for (size_t i = 0; i < timerQueue.size(); i++) {
LeaveAnnounceTimer * timer = timerQueue.get(i);
if (timer != NULL) {
PeerIDList peerList = timer->getPeerIDList();
peerList.remove(leftPeers);
if (peerList.size() <= 0) {
deleteCandidates.push_back(timer->getReference());
}//End if
}//End if
}//End for
//delete the time, which does not have any further monitored peers
std::vector<VirtualLogicalTime>::iterator it = deleteCandidates.begin();
while (it != deleteCandidates.end()) {
stopAndDeleteLeaveAnnounceTimer(*it);
it++;
}//End while
}
/**
 * @brief Hands out a new invitation ID.
 *
 * The current value of the invitation ID counter is returned and the counter
 * is advanced by one for the next call.
 * @return The invitation ID to use.
 */
InvitationID
MembershipService::getNextInvitationID() {
    const InvitationID issuedID = currentInvitationID;
    ++currentInvitationID;
    return issuedID;
}
/**
 * @brief Determines the initial invitation ID.
 *
 * The initial invitation ID is the integer value of the local address plus a
 * pseudo random offset obtained from rand().
 * @note The offset is limited by the value range of rand() (RAND_MAX).
 * @return The initial invitation ID.
 */
InvitationID
MembershipService::initCurrentInvitationID() {
    //the modulo with the unsigned literal keeps the offset (and thereby the
    //addition below) in unsigned arithmetic
    const unsigned int randomOffset = rand() % 0xFFFFFFFF;
#if OMNETPP
    return (getLocalAddress().toIntValue() + randomOffset);
#else
    TransportAddress & localTA = getLocalAddress();
    return (localTA.address.toIPv4Address() + randomOffset);
#endif
}
/**
 * @brief Checks whether the group can be closed.
 *
 * Closing is possible as soon as at most one peer is left within the group
 * and no invitation is pending.
 * @return True, if the group can be closed, false otherwise.
 */
bool
MembershipService::isGroupClosePossible() const {
    return (getNumberOfPeers() <= 1 && invitationList.size() <= 0);
}
/**
 * @brief Checks whether the local peer state may be set to DISJOINED.
 * @return True, if the local state is WAITING_FOR_ROSTER, or if it is INVITING
 * while at most one invitation is listed; false otherwise.
 */
bool
MembershipService::isSetDisjoinedAllowed() const {
    return (getLocalState() == WAITING_FOR_ROSTER
            || (getLocalState() == INVITING && invitationList.size() <= 1));
}
/**
 * @brief Sets up the group from a given roster. Intended as debug method only.
 *
 * A GroupCreatedEvent is signalled first. The member register is then initialised
 * from the roster (with the rejoin variant, if the local sub state is
 * REJOIN_IN_PROGRESS). Finally the local peer is set and a LocalPeerUpdatedEvent
 * is signalled.
 * @note The GroupCreatedEvent has already been signalled when the exception is thrown.
 * @param roster The roster to set up the group from.
 * @param localPeerID The peer ID of the local peer.
 * @throws LogicException Thrown, if the member register does not contain the
 * given local peer ID after the initialisation.
 */
void
MembershipService::setupGroupFromRoster(Roster & roster, PeerID localPeerID) {
    dispatcher.signal(new GroupCreatedEvent());
    DEBUG("setupGroupFromRoster - init group from roster");

    const bool rejoining = (getLocalSubState() == REJOIN_IN_PROGRESS);
    if (rejoining) {
        mr.initMemberRegisterAfterRejoin(roster);
    }
    else {
        mr.initMemberRegisterFromRoster(roster);
    }

    //the local peer has to be part of the freshly initialised register
    if (!mr.contains(localPeerID)) {
        throw LogicException("setupGroupFromRoster - local peer to setup not found within the group");
    }

    setLocalPeer(localPeerID);
    dispatcher.signal(new LocalPeerUpdatedEvent(dispatcher.getLocalPeer()));
}
/**
 * @brief Creates a roster from the current member register.
 * @return The roster created by the current member register.
 */
Roster
MembershipService::getRosterFromGroup() {
    Roster currentRoster = mr.createRoster();
    return currentRoster;
}
/**
 * @brief Permits modifiable access to the current member register.
 * @return A non-const reference to the current member register.
 */
MemberRegister &
MembershipService::getCurrentMemberRegister() {
    return this->mr;
}
/**
 * @brief Permits read-only access to the current member register.
 * @return A const reference to the current member register.
 */
MemberRegister const &
MembershipService::getCurrentMemberRegister() const {
    return this->mr;
}
/**
 * @brief Permits read-only access to the last saved member register (the old view).
 * @return A const reference to the last member register.
 */
MemberRegister const &
MembershipService::getLastMemberRegister() const {
    return this->lastmr;
}
/**
 * @brief Permits modifiable access to the last saved member register (the old view).
 * @return A non-const reference to the last member register.
 */
MemberRegister &
MembershipService::getLastMemberRegister() {
    return this->lastmr;
}
/**
* @brief Save the current MemberRegister
*/
void
MembershipService::saveLastMemberRegister() {
lastmr = mr;
}
/**
 * @brief Saves the given member register as the last member register.
 * @param oldMR The member register to store; it is passed by value and copied
 * into the last member register.
 */
void
MembershipService::saveLastMemberRegister(MemberRegister oldMR) {
    this->lastmr = oldMR;
}
/**
 * @brief Replaces the current member register and updates the local peer.
 *
 * @note This method does not save the previous member register itself; a caller
 * which needs the old view has to call saveLastMemberRegister() beforehand.
 * @param memReg The member register which overrides the current one.
 * @param newPeerID The peer ID to set as local peer afterwards.
 */
void
MembershipService::setupGroupFromMemberRegister(MemberRegister & memReg, PeerID newPeerID) {
    //take over the given register ...
    this->mr = memReg;
    //... and select the local peer within it
    setLocalPeer(newPeerID);
}
/**
 * @brief Assignment operator.
 *
 * Copies the members of the other instance one by one; self-assignment is a no-op.
 * @note NOTE(review): the timer queues hold timers which are deleted by pointer
 * elsewhere in this class; copying the queues presumably duplicates these
 * pointers - confirm the ownership before relying on this operator.
 * @param other The instance to assign.
 * @return A reference to the current object.
 */
MembershipService &
MembershipService::operator =(const MembershipService & other) {
    if (this != &other) {
        //service infrastructure
        dispatcher = other.dispatcher;
        module = other.module;

        //invitation handling and group state
        currentInvitationID = other.currentInvitationID;
        mr = other.mr;
        lastmr = other.lastmr;
        invitationList = other.invitationList;
        invitationTimerQueue = other.invitationTimerQueue;
        timerQueue = other.timerQueue;

        //identity of the local peer
        myPeerID = other.myPeerID;
        myClusterID = other.myClusterID;
        dummyPeer = other.dummyPeer;
    }

    return *this;
}
/**
* @brief Checks, if the given transport address associated with a member within the group.
* @param ta The transport address to check.
* @return True, if the given transport address associated with a member of the group, false otherwise.
*/
bool
MembershipService::isMember(const TransportAddress & ta) const {
PeerList peerList = getPeerList();
for (size_t i = 0; i < peerList.size(); i++) {
if (peerList.get(i).getLocalAddress() == ta) {
return true;
}//End if
}//End for
return false;
}
/**
 * @brief Moves a peer, given by its ID, to a destination cluster.
 *
 * The move itself is delegated to the member register. If the moved peer is
 * the local peer, the locally stored cluster ID is updated too.
 * @param pId The peer ID of the peer to move.
 * @param destClusterId The cluster ID of the destination cluster.
 * @throw PeerPlaceException If the move operation is not possible.
 */
void
MembershipService::movePeer(PeerID pId, ClusterID destClusterId) {
    mr.movePeer(pId, destClusterId);

    //keep track of the new location of the local peer
    const bool localPeerMoved = (pId == myPeerID);
    if (localPeerMoved) {
        myClusterID = destClusterId;
    }
}
/**
 * @brief Moves a peer to a destination cluster.
 *
 * Convenience overload which forwards the ID of the peer to the ID based movePeer().
 * @param p The peer to move.
 * @param destClusterId The cluster ID of the destination cluster.
 * @throw PeerPlaceException If the move operation is not possible.
 */
void
MembershipService::movePeer(Peer & p, ClusterID destClusterId) {
    const PeerID movedID = p.getPeerID();
    movePeer(movedID, destClusterId);
}
/**
 * @brief Returns the view ID of the current member register.
 * @return The current view ID.
 */
ViewID
MembershipService::getViewID() const {
    const ViewID currentView = mr.getViewID();
    return currentView;
}
/**
* @brief Closes the local group.
*
* This method closes the local group by reseting the local peer and signaling
* the group close to the dispatcher.
*/
void
MembershipService::closeGroup() {
resetLocalPeer();
dispatcher.signal(new GroupClosedEvent());
}
/**
 * @brief Returns the view ID of the last saved member register.
 * @return The view ID of the last member register.
 */
ViewID
MembershipService::getLastViewID() const {
    const ViewID previousView = lastmr.getViewID();
    return previousView;
}
/**
 * @brief Places a rejoining peer in the current member register, preferring its old cluster.
 *
 * - If the original register is empty, the peer is placed via placePeer().
 * - If the old cluster is not part of the current register, or it still has
 *   fewer peers than getMaxPeerCount(), the peer is added under its old cluster ID.
 * - If the old cluster exists but is full, the peer is placed via placePeer().
 * @param peer The rejoining peer to place.
 * @param originalMR The saved original group.
 */
void
MembershipService::placeRejoiningPeer(Peer & peer, MemberRegister & originalMR) {
    DEBUG("placeRejoiningPeer -place rejoining peer in cluster");

    MemberRegister & curMR = getCurrentMemberRegister();

    //without an original group there is no old position to restore
    if (originalMR.getNumberOfPeers() == 0) {
        curMR.placePeer(peer);
        return;
    }

    ClusterID oldCID = originalMR.getPeer(peer.getPeerID()).getClusterID();
    ClusterList currentClusters = curMR.getClusters();

    //the cluster size is only queried, if the old cluster still exists
    if (!currentClusters.contains(oldCID)
            || curMR.getCluster(oldCID).size() < curMR.getMaxPeerCount()) {
        curMR.addPeer(peer, oldCID);
    }
    else {
        //old cluster exists, but is full
        curMR.placePeer(peer);
    }
}
/**
* @brief Handle ConnectionLostEvent.
* @param e The event.
*/
void
MembershipService::handleEvent(const ConnectionLostEvent & e) {
setLocalState(DISCONNECTED);
dispatcher.signal(new LocalPeerUpdatedEvent(dispatcher.getLocalPeer()));
}
/**
* @brief Handle PendingPeersEvent.
* @param e The event.
*/
void
MembershipService::handleEvent( const PendingPeersEvent & e) {
setPeersPending(e.getPeerIDList());
}
/**
* @brief Sets the local peer resources object.
* @param pres The peer resources object to set.
*/
void
MembershipService::setLocalPeerResources(const PeerResources & pres) {
getLocalPeer().setPeerResources(pres);
}
/**
 * @brief Permits read-only access to the resources of the local peer.
 * @return A const reference to the peer resources object of the local peer.
 */
const PeerResources &
MembershipService::getLocalPeerResources() const {
    const PeerResources & localResources = getLocalPeer().getPeerResources();
    return localResources;
}
}//End namespace moversight
}//End namespace ubeeme