1659 lines
60 KiB
C++
1659 lines
60 KiB
C++
/*
|
|
* File: MembershipService.cc
|
|
* Author: jgaebler
|
|
*
|
|
* Created on July 23, 2010, 2:11 PM
|
|
*/
|
|
|
|
#include "MembershipService.h"
|
|
|
|
#include "Dispatcher.h"
|
|
#include "Moversight.h"
|
|
#include "app/PeerDescription.h"
|
|
#include "common/transport/msg/ExteriorMessage.h"
|
|
#include "event/events/ConnectionLostEvent.h"
|
|
#include "ms/Peer.h"
|
|
#include "ms/timer/JoinAbortToInviterTimer.h"
|
|
#include "ms/timer/JoinAbortToInviteeTimer.h"
|
|
#include "ms/timer/JoinAnnounceTimer.h"
|
|
#include "ms/timer/LeaveAnnounceTimer.h"
|
|
#include "ms/msg/MSMessageFactory.h"
|
|
#include "ms/msg/PeerReconnectAnnounce.h"
|
|
#include "ms/events/GroupCreatedEvent.h"
|
|
#include "ms/events/GroupClosedEvent.h"
|
|
#include "ms/events/LocalPeerUpdatedEvent.h"
|
|
#include "ms/events/PeerJoinedEvent.h"
|
|
#include "ms/events/JoinRequestEvent.h"
|
|
#include "ms/events/JoinConfirmEvent.h"
|
|
#include "ms/events/JoinAbortedEvent.h"
|
|
#include "ms/events/JoinRejectedEvent.h"
|
|
#include "ms/events/JoinGroupDoneEvent.h"
|
|
#include "ms/events/PeerLeftEvent.h"
|
|
#include "mt/MessageTransfer.h"
|
|
#include "mt/events/PendingPeersEvent.h"
|
|
|
|
#include <sstream>
|
|
|
|
#if OMNETPP
|
|
#include "UDPControlInfo.h"
|
|
#endif
|
|
|
|
|
|
namespace ubeeme {
|
|
namespace moversight {
|
|
|
|
#undef DEBUG
|
|
// Service-local debug output. Nothing is printed unless module.isPrintDebugMS() is true.
// While the local state is DISJOINED the line is tagged with the local transport address
// ("MS@TA_<address>"); in every other state it is tagged with the local peer ID ("MS@<id>").
// NOTE(review): the macro expands to a bare `if` statement (no do { } while (0) wrapper), so
// using it as the unbraced body of an if/else would mis-bind the else - confirm all usages are braced.
#define DEBUG(msg) if (module.isPrintDebugMS()){ if(getLocalState()== DISJOINED){ MOV_DEBUG <<"MS@TA_"<<getLocalAddress()<<" "<<msg<<endl; } else{ MOV_DEBUG <<"MS@"<<getLocalID() <<" "<< msg <<endl; } }
|
|
|
|
/**
|
|
* @brief Creates a new MembershipService instance
|
|
* @param d A reference to the moversight dispatcher.
|
|
*/
|
|
MembershipService::MembershipService(Dispatcher & d) :
|
|
MoversightService(d, "MembershipService"),
|
|
mr(d),
|
|
lastmr(d),
|
|
myPeerID(UNDEFINED_PEER_ID),
|
|
myClusterID(UNDEFINED_CLUSTER_ID) {
|
|
}
|
|
|
|
/**
|
|
* @brief Destructor
|
|
*/
|
|
MembershipService::~MembershipService() {
|
|
}
|
|
|
|
/**
|
|
* @brief Initialise the membership service.
|
|
* This method initialise the membership service module, by setting up all needed values.
|
|
* The method is called by the dispatcher. After initialise, the membership service is ready
|
|
* to offer its services.
|
|
*/
|
|
void
|
|
MembershipService::initialise() {
|
|
|
|
dispatcher.subscribe<ConnectionLostEvent>(this);
|
|
dispatcher.subscribe<PendingPeersEvent>(this);
|
|
|
|
currentInvitationID = initCurrentInvitationID();
|
|
|
|
}
|
|
|
|
/**
|
|
* @brief This method finalise the membership service.
|
|
*/
|
|
void
|
|
MembershipService::finalise() {
|
|
|
|
dispatcher.unsubscribeAll(this);
|
|
|
|
stopAndDeleteAllTimers();
|
|
invitationList.clear();
|
|
mr.clear();
|
|
|
|
resetLocalPeer();
|
|
|
|
setLocalPeerState(DISJOINED);
|
|
setLocalSubState(NO_SUB_STATE);
|
|
|
|
}
|
|
|
|
/**
|
|
* @brief Invites a peer.
|
|
*
|
|
* The method invites a peer into the group. If the invitation successfully,
|
|
* the event signalJoinConfirm is created, otherwise the signalJoinAbort event.
|
|
* @param ta The transport address of the peer to invite.
|
|
* @param pDesc The peer description of the inviting peer.
|
|
*/
|
|
void
|
|
MembershipService::invitePeer(const TransportAddress & ta, const PeerDescription & pDesc) {
|
|
|
|
//check if peer not inviting already a member
|
|
if (getLocalState() != WAITING_FOR_ROSTER && isMember(ta)) {
|
|
std::string reason("the given transport address is already associated with a group member");
|
|
DEBUG("invitePeer - " << reason.c_str());
|
|
dispatcher.signal(new JoinAbortedEvent(ta, reason));
|
|
return;
|
|
}//End if
|
|
|
|
switch (getLocalState()) {
|
|
case WAITING_FOR_ROSTER:
|
|
{
|
|
std::string reason("currently joining a group, please try again later");
|
|
DEBUG("invitePeer - " << reason.c_str());
|
|
dispatcher.signal(new JoinAbortedEvent(ta, reason));
|
|
break;
|
|
}//End case
|
|
|
|
case DISJOINED:
|
|
{
|
|
//add the local peer as group member
|
|
//note: we continue with state INVITING !!!!
|
|
createLocalPeer(pDesc);
|
|
|
|
}//end case
|
|
|
|
case INVITING:
|
|
default:
|
|
{
|
|
//invite the peer
|
|
//create an invitation and remember that
|
|
Invitation inv(getNextInvitationID(), getLocalID(), getLocalAddress(), pDesc, ta);
|
|
invitationList.add(inv);
|
|
//create and send a join request to the peer to invite
|
|
sendJoinRequest(inv);
|
|
//start a timer, which monitors the response of the invitee
|
|
createAndStartJoinAbortToInviteeTimer(inv);
|
|
break;
|
|
}//end case
|
|
}//End switch
|
|
}//End
|
|
|
|
/**
|
|
* @brief Cancel the current inviting process.
|
|
* Calling this method performs a JoinAbort operation from the user to moversight.
|
|
* @param ta The transport address of the peer to invite.
|
|
* @param reason The cancel reason
|
|
*/
|
|
void
|
|
MembershipService::cancelInvitation(const TransportAddress & ta, const std::string & reason) {
|
|
try {
|
|
|
|
Invitation inv = invitationList.findByInviteeAddress(ta);
|
|
sendJoinAbort(inv, inv.getInviterAddress(), reason);
|
|
//cleanup
|
|
stopAndDeleteInvitationTimer(inv);
|
|
invitationList.remove(inv);
|
|
|
|
}//End try
|
|
catch (InvitationNotFoundException & e) {
|
|
|
|
std::string debugMsg;
|
|
debugMsg.append("cancelInvitation - ").append(e.toString());
|
|
DEBUG(debugMsg.c_str());
|
|
|
|
}//End catch
|
|
}//End
|
|
|
|
/**
|
|
* @brief Creates the local peer.
|
|
*
|
|
* This method is adds a peer to the group, which represents the local peer. It differs from the other peers, as the local peer is used
|
|
* by the different moversight modules to provide its service. Is the local peer not defined probably, the behavior of the moversight protocol is undefined.
|
|
* @note The method is called during the inviting process.
|
|
* @param pD the peer description of the local peer
|
|
*/
|
|
void
|
|
MembershipService::createLocalPeer(const PeerDescription & pD) {
|
|
|
|
PeerDescription dummyPeerDescription;
|
|
PeerResources dummyPeerResources = module.getPeerResources();
|
|
TransportAddress & localAddress = module.getLocalAddress();
|
|
|
|
PeerID pId = mr.createPeer(localAddress,
|
|
dummyPeerDescription,
|
|
dummyPeerResources);
|
|
setLocalPeer(pId);
|
|
setLocalPeerDescription(pD);
|
|
setLocalState(INVITING);
|
|
|
|
dispatcher.signal(new LocalPeerUpdatedEvent(getLocalPeer()));
|
|
|
|
}
|
|
|
|
/**
|
|
* @brief The method responses to a before received invitation.
|
|
* @param inv The received Invitation, which should be answered.
|
|
* @param ack True for accept, false otherwise.
|
|
* @param message The message for the inviting peer.
|
|
* @param pDesc The peer description of the local peer.
|
|
* @param resources The generic representation of the available peer resources.
|
|
*/
|
|
void
|
|
MembershipService::responseInvitation(const Invitation & inv,
|
|
const bool & ack,
|
|
const std::string & message,
|
|
const PeerDescription & pDesc,
|
|
const PeerResources & resources) {
|
|
|
|
if (invitationList.contains(inv)) {
|
|
//stop the timer and create the response
|
|
stopAndDeleteInvitationTimer(inv);
|
|
|
|
if (ack) {
|
|
//we accept the invitation, set new state
|
|
setLocalState(WAITING_FOR_ROSTER);
|
|
DEBUG("responseInvitation - send JoinConfirm");
|
|
sendJoinConfirm(inv, message, pDesc, resources);
|
|
createAndStartJoinAbortToInviterTimer(inv);
|
|
}//End if
|
|
else {
|
|
//send a JAB to the inviter and clean up
|
|
sendJoinAbort(inv, inv.getInviterAddress(), message);
|
|
DEBUG("responseInvitation - send JoinAbort");
|
|
invitationList.remove(inv);
|
|
}//End else
|
|
}//End if
|
|
else {
|
|
DEBUG("responseInvitation - invitation unknown");
|
|
std::string reason("invitation unknown - cancel joining (the invitation is probably outdated)");
|
|
dispatcher.signal(new JoinRejectedEvent(inv, reason));
|
|
}//End else
|
|
}
|
|
|
|
/**
|
|
* @brief Let the local peer leave the group.
|
|
*/
|
|
void
|
|
MembershipService::leaveGroup() {
|
|
if (getLocalState() == DISJOINED) {
|
|
DEBUG("leaveGroup - peer not within a group");
|
|
return;
|
|
}//End if
|
|
|
|
sendLeaveAnnounceMessage(getLocalID());
|
|
setLocalState(LEAVING);
|
|
|
|
dispatcher.signal(new LocalPeerUpdatedEvent(dispatcher.getLocalPeer()));
|
|
|
|
}
|
|
|
|
void
|
|
MembershipService::peerFailed(PeerID pId) {
|
|
//send PendingAnnounce to the group
|
|
}
|
|
|
|
/**
|
|
* @brief Handles a received join request message.
|
|
* @param msg The join request to handle.
|
|
*/
|
|
void
|
|
MembershipService::handleJoinRequest(JoinRequest* msg) {
|
|
#if OMNETPP
|
|
UDPDataIndication *ctrl = check_and_cast<UDPDataIndication *>(msg->getControlInfo());
|
|
TransportAddress srcAddr = ctrl->getSrcAddr();
|
|
TransportAddress destAddr = ctrl->getDestAddr();
|
|
srcAddr.setPort(ctrl->getSrcPort());
|
|
destAddr.setPort(ctrl->getDestPort());
|
|
Invitation inv(msg->getInvitationID(),
|
|
msg->getInviterID(),
|
|
srcAddr,
|
|
msg->getPeerDescription(),
|
|
destAddr);
|
|
#else
|
|
Invitation inv(msg->getInvitationID(),
|
|
msg->getInviterID(),
|
|
msg->getSource(),
|
|
msg->getPeerDescription(),
|
|
getLocalAddress());
|
|
#endif
|
|
DEBUG("handleJoinRequest - invitation received from " << inv.getInviterAddress());
|
|
|
|
switch (getLocalState()) {
|
|
case INVITING:
|
|
{
|
|
//we are currently creating a group
|
|
std::string reason("peer currently creating a group - please try again later");
|
|
sendJoinAbort(inv, msg->getSourceTA(), reason);
|
|
|
|
break;
|
|
}//end case
|
|
|
|
case WAITING_FOR_ROSTER:
|
|
{
|
|
//we are currently joining a group
|
|
//reject the join request
|
|
std::string reason("peer currently joining a group - reject invitation");
|
|
sendJoinAbort(inv, msg->getSourceTA(), reason);
|
|
|
|
break;
|
|
}//End case
|
|
|
|
case DISJOINED:
|
|
{
|
|
//we are not within a group
|
|
//remember the request and forward it to the application
|
|
DEBUG("handleJoinRequest - forward invitation to application");
|
|
invitationList.add(inv);
|
|
createAndStartJoinAbortToInviterTimer(inv);
|
|
|
|
dispatcher.signal(new JoinRequestEvent(inv));
|
|
|
|
break;
|
|
}//End case
|
|
|
|
case JOINED:
|
|
default:
|
|
{
|
|
//the inviting peer is in conference, merge???
|
|
std::string reason("group merge not implemented yet");
|
|
sendJoinAbort(inv, msg->getSourceTA(), reason);
|
|
|
|
break;
|
|
}//End case
|
|
}//End switch
|
|
}//End
|
|
|
|
/**
|
|
* @brief Handles a received join confirm message.
|
|
* @param msg The join confirm message to handle.
|
|
*/
|
|
void
|
|
MembershipService::handleJoinConfirm(JoinConfirm * msg) {
|
|
|
|
try {
|
|
const Invitation & inv = invitationList.find(msg->getInvitationID());
|
|
//we have find the corresponding invitation, thus stop the timer
|
|
stopAndDeleteInvitationTimer(inv);
|
|
#if UBEEME
|
|
// updated my ta with the address seen by the invitee
|
|
Peer & peer = getLocalPeer();
|
|
|
|
if (peer.getLocalAddress() != msg->getInviterTa()) {
|
|
peer.setLocalAddress(msg->getInviterTa());
|
|
}
|
|
|
|
#endif
|
|
//signal accepted join to the user (JOINconfirm)
|
|
std::string confMsg = msg->getJcMessage();
|
|
dispatcher.signal(new JoinConfirmEvent(inv, msg->getPeerDescription(), confMsg));
|
|
|
|
//update peer state
|
|
if (getLocalState() == INVITING) {
|
|
|
|
dispatcher.signal(new GroupCreatedEvent());
|
|
setLocalState(JOINED);
|
|
|
|
//add the new peer to the group
|
|
TransportAddress inviteeAddress = inv.getInviteeAddress();
|
|
PeerID pId = mr.createPeer(inviteeAddress,
|
|
msg->getPeerDescription(),
|
|
msg->getPeerResources());
|
|
|
|
//send roster message to the joined peer
|
|
RosterMessage roster = MSMessageFactory::createRosterMessage(*this, pId);
|
|
sendTo(roster, inviteeAddress);
|
|
|
|
DEBUG("handleJoinConfirm - peer " << pId << " joined successfully to group");
|
|
|
|
dispatcher.signal(new PeerJoinedEvent(pId, inv.getInviteeAddress(), msg->getPeerDescription()));
|
|
|
|
DEBUG("handleJoinConfirm - local peer joined successfully to the group");
|
|
dispatcher.signal(new JoinGroupDoneEvent(dispatcher.getLocalPeer()));
|
|
|
|
DEBUG("handleJoinConfirm - member register after peer joined");
|
|
mr.printMemberRegister();
|
|
//remove the invitation
|
|
invitationList.remove(inv);
|
|
}//End if
|
|
else {
|
|
|
|
//signal JA to group, the roster message is send later (see handleJoinAnnounce)
|
|
sendJoinAnnounce(inv, msg->getPeerDescription(), msg->getPeerResources());
|
|
//start a join announce timer to ensure the proper completion
|
|
//of the join process
|
|
createAndStartJoinAnnounceTimer(inv, msg->getPeerDescription(), msg->getPeerResources());
|
|
|
|
}//End else
|
|
}//End try
|
|
catch (InvitationNotFoundException & /* e */) {
|
|
DEBUG("handleJoinConfirm - no invitation for the given JC message present - drop message");
|
|
}//End catch
|
|
}
|
|
|
|
/**
|
|
* @brief Handles a received roster message.
|
|
*
|
|
* If the local peer is waiting for a roster, the method will init the group from this message, determine which of the peers is the local peer
|
|
* and will finish announcing an signalGroupCreated event. Otherwise, the message is dropped.
|
|
* @param msg The roster message to handle.
|
|
*/
|
|
void
|
|
MembershipService::handleRosterMessage(RosterMessage* msg) {
|
|
|
|
if (getLocalState() == WAITING_FOR_ROSTER) {
|
|
|
|
Roster roster;
|
|
roster.setNextPeerID(msg->getNextPeerID());
|
|
roster.setViewID(msg->getViewID());
|
|
|
|
dispatcher.signal(new GroupCreatedEvent());
|
|
|
|
MemberDescriptionList mDescList = msg->getMemberDescriptionList();
|
|
|
|
for (size_t i = 0; i < mDescList.size(); i++) {
|
|
MemberDescription mDesc = mDescList.get(i);
|
|
roster.addMemberDescription(mDesc);
|
|
|
|
if (mDesc.getPeerID() != msg->getJoinedPeerID()) {
|
|
PeerID pId = mDesc.getPeerID();
|
|
TransportAddress ta = mDesc.getTransportAddress();
|
|
PeerDescription pDesc = mDesc.getPeerDescription();
|
|
|
|
dispatcher.signal(new PeerJoinedEvent(pId, ta, pDesc));
|
|
|
|
}//End if
|
|
}//End for
|
|
|
|
DEBUG("handleRosterMessage - init group from roster");
|
|
mr.initMemberRegisterFromRoster(roster);
|
|
|
|
//finish group join
|
|
setLocalPeer(msg->getJoinedPeerID());
|
|
DEBUG("handleRosterMessage - join group successfully done");
|
|
dispatcher.signal(new JoinGroupDoneEvent(dispatcher.getLocalPeer()));
|
|
|
|
//clean up
|
|
try {
|
|
const Invitation & inv = invitationList.findByInviteeAddress(getLocalAddress());
|
|
stopAndDeleteInvitationTimer(inv);
|
|
invitationList.remove(inv);
|
|
}//End try
|
|
catch (InvitationNotFoundException & /* e */) {
|
|
}
|
|
|
|
}//End if
|
|
else {
|
|
DEBUG("handleRosterMessage - receive unexpected roster, drop message");
|
|
}
|
|
}//End
|
|
|
|
/**
|
|
* @brief Handles an incoming JAB messages
|
|
* @param msg The JAB message to handle.
|
|
*/
|
|
void
|
|
MembershipService::handleJoinAbort(JoinAbort* msg) {
|
|
try {
|
|
if (isSetDisjoinedAllowed()) {
|
|
setLocalState(DISJOINED);
|
|
}
|
|
|
|
const Invitation & inv = invitationList.find(msg->getInvitationID());
|
|
|
|
//signal rejected join to user
|
|
std::string rejectMsg = msg->getJabMessage();
|
|
dispatcher.signal(new JoinRejectedEvent(inv, rejectMsg));
|
|
|
|
//cleanup
|
|
stopAndDeleteInvitationTimer(inv);
|
|
invitationList.remove(inv);
|
|
}//end try
|
|
catch (InvitationNotFoundException & /* e */) {
|
|
DEBUG("handleJoinAbort - no invitation for the given JAB message present");
|
|
}//End catch
|
|
}//End
|
|
|
|
/**
|
|
* @brief This method handles a received join announce message by adding the joined peer to the local peer list.
|
|
* @param msg The join announce message to handle.
|
|
*/
|
|
void
|
|
MembershipService::handleJoinAnnounce(JoinAnnounce & msg) {
|
|
|
|
TransportAddress inviteeAddress = msg.getJoinedPeerTA();
|
|
DEBUG("handleJoinAnnounceMessage - member register before peer joined");
|
|
mr.printMemberRegister();
|
|
|
|
//add the peer to the group
|
|
PeerID pId = mr.createPeer(inviteeAddress, msg.getPeerDescription(), msg.getPeerResources());
|
|
|
|
//I'm the sender of the message?
|
|
if (msg.getSourceID() == getLocalID()) {
|
|
|
|
//send roster message to the joined peer
|
|
RosterMessage roster = MSMessageFactory::createRosterMessage(*this, pId);
|
|
sendTo( roster, inviteeAddress);
|
|
|
|
try {
|
|
|
|
Invitation inv = invitationList.findByInviteeAddress(inviteeAddress);
|
|
stopAndDeleteInvitationTimer(inv);
|
|
invitationList.remove(inv);
|
|
|
|
}//End try
|
|
catch (InvitationNotFoundException & e) {
|
|
|
|
std::stringstream buf;
|
|
buf << "handleJoinAnnounceMessage - " << e.what();
|
|
DEBUG(buf.str().c_str());
|
|
|
|
}//End catch
|
|
}//End if
|
|
|
|
DEBUG("handleJoinAnnounceMessage - peer " << pId << " joined successfully to group (view: " << msg.getViewID() << ")");
|
|
dispatcher.signal(new PeerJoinedEvent(pId, inviteeAddress, msg.getPeerDescription()));
|
|
|
|
DEBUG("handleJoinAnnounceMessage - member register after peer joined");
|
|
mr.printMemberRegister();
|
|
|
|
}
|
|
|
|
/**
|
|
* @brief Handles a leave announce message.
|
|
* @param msg The received leave message.
|
|
*/
|
|
void
|
|
MembershipService::handleLeaveAnnounce(LeaveAnnounce & msg) {
|
|
DEBUG("handleLeaveAnnounce");
|
|
PeerIDList leaveList = msg.getLeaveList();
|
|
|
|
//running timer?
|
|
stopAndDeleteLeaveAnnounceTimer(msg.getReferenceTime());
|
|
|
|
//we are leaving the group??
|
|
if (leaveList.contains(getLocalID())) {
|
|
closeGroup();
|
|
}//End if
|
|
else {
|
|
removeLeftPeersFromTimerQueues(leaveList);
|
|
|
|
for (size_t i = 0; i < leaveList.size(); i++) {
|
|
removePeer(leaveList.get(i));
|
|
}//End for
|
|
}//End else
|
|
}
|
|
|
|
/**
|
|
* @brief Handles a PRA message, received by the host.
|
|
* @param pra The peer reconnect announce message to handle.
|
|
*/
|
|
void
|
|
MembershipService::handlePeerReconnectAnnounce(PeerReconnectAnnounce & pra) {
|
|
PeerID reconnectedPeerID = pra.getReconnectedPeerID();
|
|
getPeer(reconnectedPeerID).setState(JOINED);
|
|
// dispatcher.getNetworkFailureDetector().peerReconnected(reconnectedPeerID);
|
|
std::cerr<<"handle peer reconnected announce\n";
|
|
}
|
|
|
|
/**
|
|
* @brief This method handles a list of peers suspected as pending by
|
|
* the message transfer.
|
|
*
|
|
* The message handles the given peers by setting its state to pending.
|
|
* If the given peer not a member of the group, the peer is removed from
|
|
* the list. A timer is started to monitor the rejoining of the pending
|
|
* peers. If the timer expires, the peers will be excluded.
|
|
* @param pList The list of the peers, suspected as pending.
|
|
*/
|
|
void
|
|
MembershipService::setPeersPending(const PeerIDList & pList) {
|
|
|
|
PeerIDList tempList;
|
|
|
|
//mark peers as pending
|
|
for (size_t i = 0; i < pList.size(); i++) {
|
|
if (contains(pList.get(i)) &&
|
|
getPeer(pList.get(i)).getState() != PENDING) {
|
|
|
|
getPeer(pList.get(i)).setState(PENDING);
|
|
tempList.add(pList.get(i));
|
|
}
|
|
}
|
|
|
|
if (tempList.size() > 0) {
|
|
//start the pending timer
|
|
createAndStartLeaveAnnounceTimer(tempList);
|
|
}//End if
|
|
}
|
|
|
|
/**
|
|
* @brief Handles a fired joinAbortToInviterTimer.
|
|
* @param timer The timer to handle.
|
|
*/
|
|
void
|
|
MembershipService::handleJoinAbortToInviterTimer(JoinAbortToInviterTimer * timer) {
|
|
|
|
Invitation inv = timer->getReference();
|
|
std::string reason("invitation outdated (on the side of the invitee)");
|
|
sendJoinAbort(inv, inv.getInviterAddress(), reason);
|
|
|
|
dispatcher.signal(new JoinRejectedEvent(inv, reason));
|
|
|
|
if (isSetDisjoinedAllowed()) {
|
|
setLocalState(DISJOINED);
|
|
}
|
|
else if (isGroupClosePossible()) {
|
|
closeGroup();
|
|
}//End else
|
|
|
|
stopAndDeleteInvitationTimer(inv);
|
|
invitationList.remove(inv);
|
|
|
|
}
|
|
|
|
/**
|
|
* @brief Handles an outdating joinAbortToInviteeTimer.
|
|
* @param timer The timer to handle.
|
|
*/
|
|
void
|
|
MembershipService::handleJoinAbortToInviteeTimer(JoinAbortToInviteeTimer * timer) {
|
|
|
|
Invitation inv = timer->getReference();
|
|
std::string reason("invitation outdated");
|
|
sendJoinAbort(inv, inv.getInviteeAddress(), reason);
|
|
dispatcher.signal(new JoinRejectedEvent(inv, reason));
|
|
|
|
if (isSetDisjoinedAllowed()) {
|
|
resetLocalPeer();
|
|
}
|
|
|
|
stopAndDeleteInvitationTimer(inv);
|
|
invitationList.remove(inv);
|
|
}
|
|
|
|
void
|
|
MembershipService::handleJoinAnnounceTimer(JoinAnnounceTimer * timer) {
|
|
|
|
Invitation inv = timer->getReference();
|
|
|
|
if (timer->getNumberOfRetries() > 0) {
|
|
//signal JA again to the group
|
|
DEBUG("handleJoinAnnounceTimer - send join announce message to group again");
|
|
sendJoinAnnounce(inv, timer->getInviteePeerDescription(), timer->getPeerResources());
|
|
//test is the timer still valid
|
|
//in case of a single member group, the timer become invalid
|
|
//during the sendJoinAnnounce method (in particular: the mt.send
|
|
//deliver the JA directly, which results in a nested call of the
|
|
//membership service handle method)
|
|
InvitationTimer * t = invitationTimerQueue.find(inv);
|
|
|
|
if (t != NULL) {
|
|
timer->restart();
|
|
}
|
|
}
|
|
else {
|
|
//cancel invitation, because we are not able to emit the JA
|
|
std::string reason("unable to publish invitation within the group, give up");
|
|
sendJoinAbort(inv, inv.getInviteeAddress(), reason);
|
|
|
|
dispatcher.signal(new JoinRejectedEvent(inv, reason));
|
|
|
|
//cleanup timer
|
|
stopAndDeleteInvitationTimer(inv);
|
|
invitationList.remove(inv);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* @brief Creates and sends a join request to peer, identified by the given invitation.
|
|
* @param inv The invitation for the desired peer.
|
|
*/
|
|
void
|
|
MembershipService::sendJoinRequest(const Invitation & inv) {
|
|
JoinRequest joinReq = MSMessageFactory::createJoinRequestMessage(*this, inv.getInvitationID(), inv.getInviterPeerDescription());
|
|
DEBUG("sendJoinRequestToPeer - send invitation to peer at address " << inv.getInviteeAddress());
|
|
sendTo( joinReq, inv.getInviteeAddress());
|
|
}
|
|
|
|
/**
|
|
* @brief Sends a join confirm message to a dedicated peer.
|
|
* @param inv The corresponding invitation, used to create the join confirm.
|
|
* @param message The confirm message from the user / application.
|
|
* @param pDesc The peer description of the joined peer.
|
|
* @param resources The generic representation of the available peer resources.
|
|
*/
|
|
void
|
|
MembershipService::sendJoinConfirm(const Invitation & inv, const std::string & message, const PeerDescription & pDesc, const PeerResources & resources) {
|
|
JoinConfirm msg = MSMessageFactory::createJoinConfirmMessage(*this, inv, message, pDesc, resources);
|
|
DEBUG("sendJoinConfirm - " << message);
|
|
sendTo( msg, inv.getInviterAddress());
|
|
}
|
|
|
|
/**
|
|
* @brief Sends a join abort message to the dedicated peer.
|
|
* @param inv The corresponding invitation.
|
|
* @param destinationTa The transport address of the dedicated peer.
|
|
* @param jaMessage The reason for the join abort.
|
|
*/
|
|
void
|
|
MembershipService::sendJoinAbort(const Invitation & inv, const TransportAddress & destinationTa, const std::string & jaMessage) {
|
|
JoinAbort msg = MSMessageFactory::createJoinAbortMessage(*this, inv, jaMessage);
|
|
DEBUG("sendJoinAbortToPeer - " << jaMessage);
|
|
sendTo( msg, destinationTa);
|
|
}
|
|
|
|
/**
|
|
* @brief This method sends a join announce message to the peer, identified by the given invitation.
|
|
* @param inv The invitation corresponding to the joining peer.
|
|
* @param pDesc The peer description of the joining peer.
|
|
* @param resources The resource value of the new peer
|
|
*/
|
|
void
|
|
MembershipService::sendJoinAnnounce(const Invitation & inv, const PeerDescription & pDesc, const PeerResources & resources) {
|
|
DEBUG("sendJoinAnnounce - send JoinAnnounce to group");
|
|
JoinAnnounce joinAnnounce = MSMessageFactory::createJoinAnnounceMessage(inv.getInviteeAddress(), pDesc, resources);
|
|
dispatcher.sendMessage(joinAnnounce);
|
|
}
|
|
|
|
/**
|
|
* @brief The method sends a leave announce message to the group. This method
|
|
* is used in cases, where a peer leaves the group by itself.
|
|
* @param leavingPeer The peers that leaf the group.
|
|
*/
|
|
void
|
|
MembershipService::sendLeaveAnnounceMessage(const PeerID & leavingPeer) {
|
|
PeerIDList leavingPeers;
|
|
leavingPeers.add(leavingPeer);
|
|
sendLeaveAnnounceMessage(leavingPeers, getCurrentLogicalTime());
|
|
}
|
|
|
|
/**
|
|
* @brief The method sends a leave announce message to the group.
|
|
* @param leavingPeers The list of peers that leaf the group.
|
|
* @param referenceTime The time of the leave event.
|
|
*/
|
|
void
|
|
MembershipService::sendLeaveAnnounceMessage(const PeerIDList & leavingPeers, const VirtualLogicalTime & referenceTime) {
|
|
LeaveAnnounce leaveAnnounce = MSMessageFactory::createLeaveAnnounceMessage(leavingPeers, referenceTime);
|
|
dispatcher.sendMessage(leaveAnnounce);
|
|
}
|
|
|
|
/**
|
|
* @brief This method handles an expired leave announce timer. On expiring
|
|
* it creates a leave announce message for the peers given with the timer
|
|
* and disseminates these to the group. If the number of max retries
|
|
* reached the timer is deleted, without any further operations.
|
|
* @param timer The timer to handle.
|
|
*/
|
|
void
|
|
MembershipService::handleLeaveAnnounceTimer(LeaveAnnounceTimer * timer) {
|
|
DEBUG("handleLeaveAnnounceTimer");
|
|
|
|
//only master peers have to manage the leave operation
|
|
if (isLocalPeerMaster()) {
|
|
//the timer is expired
|
|
//the leave announce message is not yet send or not successfully
|
|
//sent to the group
|
|
|
|
//check if retry possible
|
|
if (timer->getNumberOfRetries() > 0) {
|
|
sendLeaveAnnounceMessage(timer->getPeerIDList(), timer->getReference());
|
|
timer->restart();
|
|
return;
|
|
}//End if
|
|
else {
|
|
std::stringstream buf;
|
|
buf << "handleLeaveAnnounceTimer - send leave announce " << timer->getReference() << " failed ";
|
|
DEBUG(buf.str());
|
|
}//End else
|
|
}//end if
|
|
|
|
//clean up
|
|
PeerIDList peersToRemove = timer->getPeerIDList();
|
|
stopAndDeleteLeaveAnnounceTimer(timer->getReference());
|
|
removeLeftPeersFromTimerQueues(peersToRemove);
|
|
}
|
|
|
|
/**
|
|
* @brief Returns the local peer ID.
|
|
* @return The local peer ID.
|
|
*/
|
|
PeerID
|
|
MembershipService::getLocalID() const {
|
|
return getLocalPeer().getPeerID();
|
|
}
|
|
|
|
/**
|
|
* @brief Returns the local cluster ID.
|
|
* @return The local cluster ID.
|
|
*/
|
|
ClusterID
|
|
MembershipService::getLocalClusterID() const {
|
|
return getLocalPeer().getClusterID();
|
|
}//End
|
|
|
|
/**
|
|
* @brief Defines, which peer within the MemberRegister is the local peer.
|
|
* @param pId The peer to select as local peer.
|
|
* @throw PeerNotFoundException If the given peer not member of the current MemberRegister.
|
|
*/
|
|
void
|
|
MembershipService::setLocalPeer(PeerID pId) {
|
|
if (mr.contains(pId)) {
|
|
Peer & p = getPeer(pId);
|
|
myPeerID = pId;
|
|
myClusterID = p.getClusterID();
|
|
p.setState(JOINED);
|
|
//restore dummy peer state
|
|
dummyPeer.setState(DISJOINED);
|
|
}//End if
|
|
else {
|
|
throw PeerNotFoundException("selected peer is not member of the current view");
|
|
}//End else
|
|
}
|
|
|
|
/**
|
|
* @brief Returns the local peer from the MemberRegister. If the peer not member of a MemberRegister, the method will return a dummy peer, which is disconnected and disjoined.
|
|
*
|
|
* @note The local peer have to be selected by the setLocalPeer method first.
|
|
* @return The local peer.
|
|
*/
|
|
Peer &
|
|
MembershipService::getLocalPeer() {
|
|
if (myPeerID == UNDEFINED_PEER_ID) {
|
|
return dummyPeer;
|
|
}//End if
|
|
|
|
Cluster & cluster = mr.getCluster(myClusterID);
|
|
return cluster.getPeer(myPeerID);
|
|
}//End
|
|
|
|
/**
|
|
* @brief Returns the local peer from the MemberRegister. If the peer not member of a MemberRegister, the method will return a dummy peer, which is disconnected and disjoined.
|
|
*
|
|
* @note The local peer have to be selected by the setLocalPeer method first.
|
|
* @return The local peer.
|
|
*/
|
|
const Peer &
|
|
MembershipService::getLocalPeer() const {
|
|
if (myPeerID == UNDEFINED_PEER_ID) {
|
|
return dummyPeer;
|
|
}//End if
|
|
|
|
const Cluster & cluster = mr.getCluster(myClusterID);
|
|
return cluster.getPeer(myPeerID);
|
|
}//End
|
|
|
|
/**
|
|
* @brief Resets the local peer settings.
|
|
*
|
|
* This method resets the local peer settings. Afterwards calls to getLocalPeer()
|
|
* or similar methods will be answered with a dummy peer.
|
|
*/
|
|
void
|
|
MembershipService::resetLocalPeer() {
|
|
|
|
//the local peer is no longer member of the MembershipService
|
|
myPeerID = UNDEFINED_PEER_ID;
|
|
myClusterID = UNDEFINED_CLUSTER_ID;
|
|
dummyPeer.setState(DISJOINED);
|
|
|
|
mr.clear();
|
|
|
|
}
|
|
|
|
/**
|
|
* @brief Returns the local address.
|
|
* @return The local address.
|
|
*/
|
|
const TransportAddress &
|
|
MembershipService::getLocalAddress() const {
|
|
return module.getLocalAddress();
|
|
}
|
|
|
|
/**
|
|
* @brief Returns the state of the local peer.
|
|
* @return The state of the local peer.
|
|
*/
|
|
PeerState &
|
|
MembershipService::getLocalPeerState() {
|
|
|
|
return getLocalPeer().getPeerState();
|
|
}
|
|
|
|
/**
|
|
* @brief Sets the peer state of the local peer.
|
|
* @param ps The peer state to set.
|
|
*/
|
|
void
|
|
MembershipService::setLocalPeerState(const PeerState & ps) {
|
|
getLocalPeer().setPeerState(ps);
|
|
}
|
|
|
|
/**
|
|
* @brief Sets the state part of the peer state object of the local peer.
|
|
* @param s The state to set.
|
|
*/
|
|
void
|
|
MembershipService::setLocalState(const State & s) {
|
|
getLocalPeer().setState(s);
|
|
}
|
|
|
|
/**
|
|
* @brief Returns a peer from the current group.
|
|
*
|
|
* The method searches within group for the given peer and returns them. Is the peer not found, a PeerNotFoundException is thrown.
|
|
* @throw PeerNotFoundException Is the desired peer not member of the group.
|
|
* @param pId The peer ID of the desired peer.
|
|
* @return The desired peer.
|
|
*/
|
|
Peer &
|
|
MembershipService::getPeer(const PeerID pId) {
|
|
return mr.getPeer(pId);
|
|
}
|
|
|
|
/**
|
|
* @brief Returns a peer from the current group.
|
|
*
|
|
* The method searches within group for the given peer and returns them. Is the peer not found, a PeerNotFoundException is thrown.
|
|
* @throw PeerNotFoundException Is the desired peer not member of the group.
|
|
* @param pId The peer ID of the desired peer.
|
|
* @return The desired peer.
|
|
*/
|
|
const Peer &
|
|
MembershipService::getPeer(const PeerID pId) const {
|
|
return mr.getPeer(pId);
|
|
}
|
|
|
|
/**
|
|
* @brief Returns the number of peers, associated with this group.
|
|
* @return The number of peers associated with the current group.
|
|
*/
|
|
size_t
|
|
MembershipService::getNumberOfPeers() const {
|
|
return mr.getNumberOfPeers();
|
|
}
|
|
|
|
/**
|
|
* @brief Returns the number of clusters, associated with this group.
|
|
* @return The number of clusters associated with the current group.
|
|
*/
|
|
size_t
|
|
MembershipService::getNumberOfClusters() const {
|
|
return mr.getNumberOfCluster();
|
|
}
|
|
|
|
/**
|
|
* @brief Returns a list of peer IDs of all peers within the cluster of the given peer.
|
|
* @param pId The peer id of a peer within the cluster to retrieve all members.
|
|
* @return The desired peer ID list.
|
|
* @throw PeerNotFoundException If the given peer ID not found within the group.
|
|
*/
|
|
PeerIDList
|
|
MembershipService::getClusterPeerIDList(const PeerID & pId) const {
|
|
return mr.getClusterPeerIDList(getPeer(pId).getClusterID());
|
|
}
|
|
|
|
/**
|
|
* @brief Returns a list of peer IDs of all slave-peers within a given clusters (the master address is not included!).
|
|
* @param cID The source cluster id.
|
|
* @return The desired peer ID list.
|
|
* @throw ClusterNotFoundException If the source cluster not found within the group.
|
|
*/
|
|
PeerIDList
|
|
MembershipService::getClusterPeerIDListSlavesOnly(const ClusterID cID) const {
|
|
|
|
PeerIDList list = mr.getClusterPeerIDList(cID);
|
|
|
|
list.remove(mr.getCluster(cID).getMasterID());
|
|
|
|
return list;
|
|
}
|
|
|
|
/**
|
|
* @brief Returns a list of all peers, within the group.
|
|
* @note This list is only for reading purpose. To edit the peers, access the appropriated MembershipService functions directly.
|
|
* @return The list of peers, associated with the current group.
|
|
*/
|
|
const PeerList
|
|
MembershipService::getPeerList() const {
|
|
return mr.getPeerList();
|
|
}
|
|
|
|
/**
|
|
* @brief Returns a list of all peer ID's, within the group.
|
|
* @note This list is only for reading purpose. To edit the peers, access the appropriated MembershipService functions directly.
|
|
* @return The list of peer ID's, associated with the current group.
|
|
*/
|
|
const PeerIDList
|
|
MembershipService::getPeerIDList() {
|
|
return mr.getPeerIDList();
|
|
}
|
|
|
|
/**
|
|
* @brief Returns a list of peer IDs of all masters within the group.
|
|
* @return The desired peer ID list.
|
|
*/
|
|
const PeerIDList
|
|
MembershipService::getMasterPeerIDList() {
|
|
return mr.getMasterPeerIDList();
|
|
}
|
|
|
|
/**
|
|
* @brief Returns a list of slave and master peer IDs.
|
|
*
|
|
* Returns a list with IDs of peers which are members of the same cluster as the given peer or from foreign clusters, which act as masters.
|
|
* The resulting list does not contain the peer itself.
|
|
* @note The peer itself is not member of the created list.
|
|
* @param self The peer to determine the desired cluster.
|
|
* @return The desired list
|
|
*/
|
|
PeerIDList
|
|
MembershipService::getClusterAndMasterPeerIDList(const Peer & self) {
|
|
|
|
//my cluster without me
|
|
PeerIDList list = mr.getClusterPeerIDList(self.getClusterID());
|
|
list.remove(self.getPeerID());
|
|
|
|
//all master IDs
|
|
PeerIDList masterList = getMasterPeerIDList();
|
|
|
|
//merge lists
|
|
list.unify(masterList);
|
|
|
|
//remove me
|
|
list.remove(self.getPeerID());
|
|
|
|
return list;
|
|
}
|
|
|
|
/**
|
|
* Returns the ClusterID in which the peer is located.
|
|
* @param pID The peer desired to find.
|
|
* @return The ClusterID of the peers cluster.
|
|
*/
|
|
ClusterID
|
|
MembershipService::findPeer( const PeerID & pID) {
|
|
return mr.findPeer(pID);
|
|
}
|
|
|
|
/**
|
|
* @brief Determines, if a peer is member of the group or not.
|
|
* @param ta The transport address of the desired peer.
|
|
* @return True, if the desired peer is a member of the group, otherwise false.
|
|
*/
|
|
bool
|
|
MembershipService::contains( const TransportAddress& ta) {
|
|
return mr.contains(ta);
|
|
}
|
|
|
|
/**
|
|
* @brief Determines, if a peer member of the group or not.
|
|
* @param pID The peer ID of the desired peer.
|
|
* @return True, if the desired peer element of the group, otherwise false.
|
|
*/
|
|
bool
|
|
MembershipService::contains(PeerID pID) {
|
|
return mr.contains(pID);
|
|
}
|
|
|
|
/**
|
|
* @brief Determines, if a peer member of the group or not.
|
|
* @param peer The desired peer.
|
|
* @return True, if the desired peer element of the group, otherwise false.
|
|
*/
|
|
bool
|
|
MembershipService::contains(Peer & peer) {
|
|
return contains(peer.getPeerID());
|
|
}
|
|
|
|
/**
|
|
* @brief Returns, if the local peer a master peer (acts as master within the group).
|
|
* @return True, if the local peer a master, false otherwise.
|
|
*/
|
|
bool
|
|
MembershipService::isLocalPeerMaster() const {
|
|
return getLocalPeer().isMaster();
|
|
}
|
|
|
|
/**
|
|
* @brief Checks, if the current peer master of the given peer.
|
|
* @param pId The id of the peer to check.
|
|
* @return True, if the current peer master of the given peer, false otherwise.
|
|
*/
|
|
bool
|
|
MembershipService::isMasterOf(const PeerID pId) const {
|
|
if (isLocalPeerMaster()) {
|
|
PeerIDList list = getClusterPeerIDList(getLocalID());
|
|
return list.contains(pId);
|
|
}
|
|
|
|
return false;
|
|
}
|
|
|
|
/**
|
|
* @brief Sets the peer description of the local peer.
|
|
* @param pDesc The peer description to set.
|
|
*/
|
|
void
|
|
MembershipService::setLocalPeerDescription(const PeerDescription & pDesc) {
|
|
|
|
getLocalPeer().setPeerDescription(pDesc);
|
|
|
|
}
|
|
|
|
/**
|
|
* @brief Removes a peer from the current view.
|
|
*
|
|
* This method removes the peer, identified by the given id, from the current view.
|
|
* If the given peer id not a member of group, the group keeps unchanged.
|
|
* @note Leads the removing of a peer to a situation whereas the call of the method
|
|
* isGroupClosePossible is evaluated to true, the group will be closed immediately. In this
|
|
* case, an event is emitted.
|
|
* @note The local peer might be the new master of a cluster, was the removed peer the master
|
|
* of its cluster and the peer placing strategy determines the local peer as new master.
|
|
* In this case, an event is emitted.
|
|
* @param pId The id of the peer to remove.
|
|
*/
|
|
void
|
|
MembershipService::removePeer(const PeerID & pId) {
|
|
|
|
if (contains(pId)) {
|
|
DEBUG("removePeer - remove peer " << pId << " from group");
|
|
PeerID oldMasterID = getLocalMasterID();
|
|
Peer peerToRemove = mr.getPeer(pId);
|
|
|
|
//remove the peer from the group
|
|
mr.removePeer(pId);
|
|
|
|
if (module.isPrintDebugMS()) {
|
|
mr.printMemberRegister();
|
|
}//End if
|
|
|
|
dispatcher.signal(new PeerLeftEvent(peerToRemove));
|
|
|
|
//we are the new master?
|
|
if (oldMasterID != getLocalMasterID()) {
|
|
if(isLocalPeerMaster()){
|
|
dispatcher.signal(new LocalPeerUpdatedEvent(dispatcher.getLocalPeer()));
|
|
}//End if
|
|
}//End if
|
|
|
|
//should the group be closed?
|
|
if (isGroupClosePossible()) {
|
|
closeGroup();
|
|
}//End if
|
|
}//End if
|
|
}
|
|
|
|
/**
|
|
* @brief Creates and starts a joinAbortToInvitee timer.
|
|
* @param invitation The invitation, associated with the timer to start.
|
|
*/
|
|
void
|
|
MembershipService::createAndStartJoinAbortToInviteeTimer(const Invitation & invitation) {
|
|
JoinAbortToInviteeTimer * timer = new JoinAbortToInviteeTimer(*this);
|
|
timer->setReference(invitation);
|
|
timer->start();
|
|
invitationTimerQueue.add(timer);
|
|
}
|
|
|
|
/**
|
|
* @brief Creates and starts a joinAbortToInviter timer.
|
|
* @param invitation The invitation, associated with the timer to start.
|
|
*/
|
|
void
|
|
MembershipService::createAndStartJoinAbortToInviterTimer(const Invitation & invitation) {
|
|
|
|
JoinAbortToInviterTimer * timer = new JoinAbortToInviterTimer(*this);
|
|
timer->setReference(invitation);
|
|
|
|
if (getLocalState() == DISJOINED) {
|
|
GenericTime t(JOIN_ABORT_TO_INVITER_TIMEOUT);
|
|
timer->setTimeout(t);
|
|
}//End if
|
|
else {
|
|
GenericTime t(JOIN_ABORT_TO_INVITER_TIMEOUT_MISSING_ROSTER);
|
|
timer->setTimeout(t);
|
|
}//End else
|
|
|
|
timer->start();
|
|
invitationTimerQueue.add(timer);
|
|
}
|
|
|
|
/**
|
|
* @brief Creates and starts a join announce timer.
|
|
* @param invitation The invitation, associated with the timer to start.
|
|
* @param pDesc The peer description of the peer to announce.
|
|
* @param resources The resources of the peer to announce.
|
|
*/
|
|
void
|
|
MembershipService::createAndStartJoinAnnounceTimer(const Invitation & invitation, const PeerDescription & pDesc, const PeerResources & resources) {
|
|
JoinAnnounceTimer * timer = new JoinAnnounceTimer(*this);
|
|
timer->setReference(invitation);
|
|
timer->setInviteePeerDescription(pDesc);
|
|
timer->setPeerResources(resources);
|
|
timer->start();
|
|
invitationTimerQueue.add(timer);
|
|
}
|
|
|
|
void
|
|
MembershipService::createAndStartLeaveAnnounceTimer(const PeerIDList & leavingPeerIdList) {
|
|
DEBUG("createAndStartLeaveAnnounceTimer - start leave timer for " << leavingPeerIdList.size() << " peer(s)")
|
|
LeaveAnnounceTimer * timer = new LeaveAnnounceTimer(*this);
|
|
timer->setPeerIDList(leavingPeerIdList);
|
|
timer->setReference(getCurrentLogicalTime());
|
|
timer->start();
|
|
timerQueue.add(timer);
|
|
}
|
|
|
|
/**
|
|
* @brief Stop running invitation timer (to invitee or inviter)
|
|
* @param inv The corresponding invitation, identifying the timer.
|
|
*/
|
|
void
|
|
MembershipService::stopAndDeleteInvitationTimer(const Invitation & inv) {
|
|
|
|
InvitationTimer * timer = invitationTimerQueue.find(inv);
|
|
|
|
if (timer != NULL) {
|
|
timer->stop();
|
|
delete timer;
|
|
invitationTimerQueue.remove(inv);
|
|
}//End if
|
|
else {
|
|
DEBUG("stopAndDeleteInvitationTimer - timer not found");
|
|
}//End else
|
|
}
|
|
|
|
/**
|
|
* @brief Stops and deletes a leave announce timer, identified by a reference.
|
|
* @param reference The reference to identify the timer.
|
|
*/
|
|
void
|
|
MembershipService::stopAndDeleteLeaveAnnounceTimer(const VirtualLogicalTime & reference) {
|
|
LeaveAnnounceTimer * timer = timerQueue.find(reference);
|
|
|
|
if (timer != NULL) {
|
|
timer->stop();
|
|
delete timer;
|
|
timerQueue.remove(reference);
|
|
}//End if
|
|
}
|
|
|
|
/**
|
|
* @brief This method stops and delete all running timers within the module.
|
|
*/
|
|
void
|
|
MembershipService::stopAndDeleteAllTimers() {
|
|
{
|
|
//invitation timer
|
|
InvitationList keys = invitationTimerQueue.getKeyList();
|
|
|
|
for (size_t i = 0; i < keys.size(); i++) {
|
|
stopAndDeleteInvitationTimer(keys.get(i));
|
|
}//end for
|
|
|
|
invitationTimerQueue.clear();
|
|
}
|
|
{
|
|
//leave timer
|
|
List<VirtualLogicalTime> keys;
|
|
keys = timerQueue.getKeyList();
|
|
|
|
for (size_t i = 0; i < keys.size(); i++) {
|
|
stopAndDeleteLeaveAnnounceTimer(keys.get(i));
|
|
}//end for
|
|
|
|
timerQueue.clear();
|
|
}
|
|
}
|
|
|
|
/**
|
|
* @brief Removes the given peers from all timers that monitor peers.
|
|
* @param leftPeers The peers to remove from the timers.
|
|
*/
|
|
void
|
|
MembershipService::removeLeftPeersFromTimerQueues(const PeerIDList & leftPeers) {
|
|
std::vector<VirtualLogicalTime> deleteCandidates;
|
|
|
|
for (size_t i = 0; i < timerQueue.size(); i++) {
|
|
LeaveAnnounceTimer * timer = timerQueue.get(i);
|
|
|
|
if (timer != NULL) {
|
|
PeerIDList peerList = timer->getPeerIDList();
|
|
peerList.remove(leftPeers);
|
|
|
|
if (peerList.size() <= 0) {
|
|
deleteCandidates.push_back(timer->getReference());
|
|
}//End if
|
|
}//End if
|
|
}//End for
|
|
|
|
//delete the time, which does not have any further monitored peers
|
|
std::vector<VirtualLogicalTime>::iterator it = deleteCandidates.begin();
|
|
|
|
while (it != deleteCandidates.end()) {
|
|
stopAndDeleteLeaveAnnounceTimer(*it);
|
|
it++;
|
|
}//End while
|
|
}
|
|
|
|
/**
|
|
* @brief Creates a new invitation ID.
|
|
* @return The created invitation ID.
|
|
*/
|
|
InvitationID
|
|
MembershipService::getNextInvitationID() {
|
|
return currentInvitationID++;
|
|
}//End
|
|
|
|
/**
|
|
* @brief Determines the initial invitation ID.
|
|
*
|
|
* The initial invitation ID is determined by the integer value of the local IP seeded with a 32bit random value
|
|
* @return The initial invitation ID.
|
|
*/
|
|
InvitationID
|
|
MembershipService::initCurrentInvitationID() {
|
|
#if OMNETPP
|
|
return (getLocalAddress().toIntValue() + (rand() % 0xFFFFFFFF));
|
|
#else
|
|
TransportAddress &addr = getLocalAddress();
|
|
return (addr.address.toIPv4Address() + (rand() % 0xFFFFFFFF));
|
|
#endif
|
|
}
|
|
|
|
/**
|
|
* @brief Checks, is it possible to close the group.
|
|
* We can close the group, if the number of peers within the group <= 1 and no invitation pending.
|
|
* @return True, if we can close the group, false otherwise.
|
|
*/
|
|
bool
|
|
MembershipService::isGroupClosePossible() const {
|
|
if (getNumberOfPeers() <= 1 &&
|
|
invitationList.size() <= 0) {
|
|
return true;
|
|
}//End if
|
|
|
|
return false;
|
|
}
|
|
|
|
/**
|
|
* @brief Checks, if it allowed to set the peer state to DISJOINED.
|
|
* @return True, if the current peer state WAITING_FOR_ROSTER or (INVITING and invitationList.size() <= 1), false otherwise.
|
|
*/
|
|
bool
|
|
MembershipService::isSetDisjoinedAllowed() const {
|
|
if (getLocalState() == WAITING_FOR_ROSTER) {
|
|
return true;
|
|
}
|
|
|
|
if (getLocalState() == INVITING &&
|
|
invitationList.size() <= 1) {
|
|
return true;
|
|
}
|
|
|
|
return false;
|
|
}
|
|
|
|
/**
|
|
* @brief This method setup a group from a given roster. This method is
|
|
* only assumed as debug method.
|
|
* @param roster The roster to setup the group.
|
|
* @param localPeerID The peer ID of the local peer to setup.
|
|
* @throws LogicException Thrown, if the given parameter not setup probably.
|
|
*/
|
|
void
|
|
MembershipService::setupGroupFromRoster(Roster & roster, PeerID localPeerID) {
|
|
|
|
dispatcher.signal(new GroupCreatedEvent());
|
|
|
|
DEBUG("setupGroupFromRoster - init group from roster");
|
|
|
|
if (getLocalSubState() == REJOIN_IN_PROGRESS) {
|
|
mr.initMemberRegisterAfterRejoin(roster);
|
|
}
|
|
else {
|
|
mr.initMemberRegisterFromRoster(roster);
|
|
}
|
|
|
|
if (mr.contains(localPeerID)) {
|
|
setLocalPeer(localPeerID);
|
|
|
|
dispatcher.signal(new LocalPeerUpdatedEvent(dispatcher.getLocalPeer()));
|
|
}
|
|
else {
|
|
throw LogicException("setupGroupFromRoster - local peer to setup not found within the group");
|
|
}
|
|
}
|
|
|
|
/**
|
|
* @brief Create a Roster of the current MemberRegister
|
|
* @return ro The group created of the current MemberRegister
|
|
*/
|
|
Roster
|
|
MembershipService::getRosterFromGroup() {
|
|
return mr.createRoster();
|
|
}
|
|
|
|
/**
|
|
* @brief Return the current MemberRegister as const.
|
|
* @return The current MemberRegister.
|
|
*/
|
|
MemberRegister &
|
|
MembershipService::getCurrentMemberRegister() {
|
|
return mr;
|
|
}
|
|
|
|
/**
|
|
* @brief Return the current MemberRegister as const.
|
|
* @return The current MemberRegister.
|
|
*/
|
|
MemberRegister
|
|
const &
|
|
MembershipService::getCurrentMemberRegister() const {
|
|
return mr;
|
|
}
|
|
|
|
/**
|
|
* @brief Returns the last MemberRegister with the old View.
|
|
* @return The last MemberRegister.
|
|
*/
|
|
MemberRegister
|
|
const &
|
|
MembershipService::getLastMemberRegister() const {
|
|
return lastmr;
|
|
}
|
|
|
|
/**
|
|
* @brief Returns the last memberRegister with the old view.
|
|
* @return the last MemberRegister.
|
|
*/
|
|
MemberRegister &
|
|
MembershipService::getLastMemberRegister() {
|
|
return lastmr;
|
|
}
|
|
|
|
/**
|
|
* @brief Save the current MemberRegister
|
|
*/
|
|
void
|
|
MembershipService::saveLastMemberRegister() {
|
|
lastmr = mr;
|
|
}
|
|
|
|
/**
|
|
* @brief Saves a given MemberRegister.
|
|
* @param oldMR the MR to save
|
|
*/
|
|
void
|
|
MembershipService::saveLastMemberRegister(MemberRegister oldMR) {
|
|
lastmr = oldMR;
|
|
}
|
|
|
|
/**
|
|
* @brief Creates a new MemberRegister (MR) by given MemberRegister and updates local PeerID.
|
|
* @param memReg The new MR which overrides the old MR. The old MR will be saved before.
|
|
* @param newPeerID The new PeerID which updates the local PeerID
|
|
*/
|
|
void
|
|
MembershipService::setupGroupFromMemberRegister(MemberRegister & memReg, PeerID newPeerID) {
|
|
|
|
//set the member register
|
|
mr = memReg;
|
|
//set new peerID
|
|
setLocalPeer(newPeerID);
|
|
}
|
|
|
|
/**
|
|
* @brief Assignment operator.
|
|
* @param other The instance to assign.
|
|
* @return A reference of the current object.
|
|
*/
|
|
MembershipService &
|
|
|
|
MembershipService::operator =(const MembershipService & other) {
|
|
|
|
if (this == &other) {
|
|
return *this;
|
|
}
|
|
|
|
dispatcher = other.dispatcher;
|
|
module = other.module;
|
|
|
|
currentInvitationID = other.currentInvitationID;
|
|
mr = other.mr;
|
|
lastmr = other.lastmr;
|
|
invitationList = other.invitationList;
|
|
|
|
invitationTimerQueue = other.invitationTimerQueue;
|
|
timerQueue = other.timerQueue;
|
|
|
|
myPeerID = other.myPeerID;
|
|
myClusterID = other.myClusterID;
|
|
dummyPeer = other.dummyPeer;
|
|
|
|
return *this;
|
|
}
|
|
|
|
/**
|
|
* @brief Checks, if the given transport address associated with a member within the group.
|
|
* @param ta The transport address to check.
|
|
* @return True, if the given transport address associated with a member of the group, false otherwise.
|
|
*/
|
|
bool
|
|
MembershipService::isMember(const TransportAddress & ta) const {
|
|
|
|
PeerList peerList = getPeerList();
|
|
|
|
for (size_t i = 0; i < peerList.size(); i++) {
|
|
if (peerList.get(i).getLocalAddress() == ta) {
|
|
return true;
|
|
}//End if
|
|
}//End for
|
|
|
|
return false;
|
|
}
|
|
|
|
/**
|
|
* @brief Moves a peer from a source cluster to a destination cluster.
|
|
* @param pId The peer ID of the peer to move.
|
|
* @param destClusterId The cluster ID of the destination cluster.
|
|
* @throw throw PeerPlaceException If the move operation not possible.
|
|
*/
|
|
void
|
|
MembershipService::movePeer(PeerID pId, ClusterID destClusterId) {
|
|
mr.movePeer(pId, destClusterId);
|
|
|
|
//remember the new location of the local peer
|
|
if (pId == myPeerID) {
|
|
myClusterID = destClusterId;
|
|
}//End if
|
|
}
|
|
|
|
/**
|
|
* @brief Moves a peer from a source cluster to a destination cluster.
|
|
* @param p The peer to move.
|
|
* @param destClusterId The cluster ID of the destination cluster.
|
|
* @throw throw PeerPlaceException If the move operation not possible.
|
|
*/
|
|
void
|
|
MembershipService::movePeer(Peer & p, ClusterID destClusterId) {
|
|
movePeer(p.getPeerID(), destClusterId);
|
|
}
|
|
|
|
/**
|
|
* @brief Returns the current view ID.
|
|
* @return The desired view ID.
|
|
*/
|
|
ViewID
|
|
MembershipService::getViewID() const {
|
|
return mr.getViewID();
|
|
}
|
|
|
|
/**
|
|
* @brief Closes the local group.
|
|
*
|
|
* This method closes the local group by reseting the local peer and signaling
|
|
* the group close to the dispatcher.
|
|
*/
|
|
void
|
|
MembershipService::closeGroup() {
|
|
resetLocalPeer();
|
|
dispatcher.signal(new GroupClosedEvent());
|
|
}
|
|
|
|
/**
|
|
* @brief Returns the view ID of the last operation.
|
|
* @return The desired view ID.
|
|
*/
|
|
ViewID
|
|
MembershipService::getLastViewID() const {
|
|
return lastmr.getViewID();
|
|
}
|
|
|
|
/**
|
|
* @brief Placing the rejoining peer back in the current MR. Trying to add it to its old position.
|
|
* @param peer The rejoining peer to place.
|
|
* @param originalMR The saved original group.
|
|
*/
|
|
void
|
|
MembershipService::placeRejoiningPeer(Peer & peer, MemberRegister & originalMR) {
|
|
DEBUG("placeRejoiningPeer -place rejoining peer in cluster");
|
|
MemberRegister & curMR = getCurrentMemberRegister();
|
|
|
|
if (originalMR.getNumberOfPeers() != 0) {
|
|
ClusterID oldCID = originalMR.getPeer(peer.getPeerID()).getClusterID();
|
|
ClusterList cList = getCurrentMemberRegister().getClusters();
|
|
|
|
if (cList.contains(oldCID)) {
|
|
if (curMR.getCluster(oldCID).size() < curMR.getMaxPeerCount()) {
|
|
curMR.addPeer(peer, oldCID);
|
|
}
|
|
else {
|
|
curMR.placePeer(peer);
|
|
}
|
|
}
|
|
else {
|
|
curMR.addPeer(peer, oldCID);
|
|
}
|
|
}
|
|
else {
|
|
curMR.placePeer(peer);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* @brief Handle ConnectionLostEvent.
|
|
* @param e The event.
|
|
*/
|
|
void
|
|
MembershipService::handleEvent(const ConnectionLostEvent & e) {
|
|
setLocalState(DISCONNECTED);
|
|
dispatcher.signal(new LocalPeerUpdatedEvent(dispatcher.getLocalPeer()));
|
|
}
|
|
|
|
/**
|
|
* @brief Handle PendingPeersEvent.
|
|
* @param e The event.
|
|
*/
|
|
void
|
|
MembershipService::handleEvent( const PendingPeersEvent & e) {
|
|
setPeersPending(e.getPeerIDList());
|
|
}
|
|
|
|
/**
|
|
* @brief Sets the local peer resources object.
|
|
* @param pres The peer resources object to set.
|
|
*/
|
|
void
|
|
MembershipService::setLocalPeerResources(const PeerResources & pres) {
|
|
getLocalPeer().setPeerResources(pres);
|
|
}
|
|
|
|
/**
|
|
* @brief Permits access to the local peer resources object.
|
|
* @return The local peer resources object.
|
|
*/
|
|
const PeerResources &
|
|
MembershipService::getLocalPeerResources() const {
|
|
return getLocalPeer().getPeerResources();
|
|
}
|
|
|
|
|
|
}//End namespace moversight
|
|
}//End namespace ubeeme
|