577 lines
18 KiB
C++
577 lines
18 KiB
C++
/*
|
|
* File: StaticIntervalDetector.cc
|
|
* Author: jgaebler
|
|
*
|
|
* Created on February 24, 2014, 2:28 PM
|
|
*/
|
|
|
|
#include "StaticIntervalDetector.h"
|
|
|
|
#include "fd/nfd/msg/ControlMessage.h"
|
|
#include "fd/nfd/msg/ControlMessageConfirm.h"
|
|
|
|
#include "Moversight.h"
|
|
#include "Dispatcher.h"
|
|
#include "ms/events/PeerJoinedEvent.h"
|
|
#include "ms/events/PeerLeftEvent.h"
|
|
#include "ms/events/LocalPeerUpdatedEvent.h"
|
|
#include "mt/events/PendingPeersEvent.h"
|
|
#include "fd/events/PeerReconnectedEvent.h"
|
|
#include "fd/nfd/timer/DetectionTimer.h"
|
|
#include "fd/events/PeerFailedEvent.h"
|
|
|
|
|
|
namespace ubeeme {
|
|
namespace moversight {
|
|
|
|
#undef DEBUG

// Debug output of the network failure detector. It is printed only when NFD
// debugging is enabled for the module, and prefixed with "NFD@<local id>".
// The body is wrapped in do { } while (0) so that `DEBUG(x);` is a single
// statement. The previous `if (...) stmt;` form broke unbraced
// `if (c) DEBUG(x); else ...` constructs (dangling else / stray `;`).
#define DEBUG(msg) do { if (module.isPrintDebugNFD()) { MOV_DEBUG << "NFD@" << getLocalID() << " " << msg << endl; } } while (0)
|
|
|
|
/**
|
|
* @brief Constructor
|
|
* @param d A reference to the local dispatcher
|
|
*/
|
|
StaticIntervalDetector::StaticIntervalDetector(Dispatcher & d) : NetworkFailureDetector(d) {
|
|
}
|
|
|
|
/**
|
|
* @brief Copy constructor
|
|
* @param orig The instance to copy
|
|
*/
|
|
StaticIntervalDetector::StaticIntervalDetector(const StaticIntervalDetector& orig) : NetworkFailureDetector(orig) {
|
|
operator=(orig);
|
|
}
|
|
|
|
/**
|
|
* @brief Destructor
|
|
*/
|
|
StaticIntervalDetector::~StaticIntervalDetector() {
|
|
}
|
|
|
|
/**
|
|
* @brief Assignment operator
|
|
* @param other The instance to assign
|
|
* @return A reference to the local instance
|
|
*/
|
|
StaticIntervalDetector&
|
|
StaticIntervalDetector::operator =(const StaticIntervalDetector & other) {
|
|
|
|
if (this != &other) {
|
|
|
|
NetworkFailureDetector::operator =(other);
|
|
|
|
}//End if
|
|
|
|
return *this;
|
|
}
|
|
|
|
/**
|
|
* @brief Initialize the failure detector service
|
|
*/
|
|
void
|
|
StaticIntervalDetector::initialise() {
|
|
|
|
dispatcher.subscribe<PeerJoinedEvent>(this);
|
|
dispatcher.subscribe<PeerLeftEvent>(this);
|
|
dispatcher.subscribe<PendingPeersEvent>(this);
|
|
dispatcher.subscribe<PeerReconnectedEvent>(this);
|
|
dispatcher.subscribe<LocalPeerUpdatedEvent>(this);
|
|
dispatcher.subscribe<JoinGroupDoneEvent>(this);
|
|
|
|
setRunning(false);
|
|
}
|
|
|
|
/**
|
|
* @brief Runs operations to finalize the NFD service
|
|
*/
|
|
void
|
|
StaticIntervalDetector::finalise() {
|
|
|
|
dispatcher.unsubscribeAll(this);
|
|
|
|
if (isRunning()) {
|
|
stop();
|
|
}//End if
|
|
}
|
|
|
|
/**
|
|
* @brief Handles a detection timer timeout.
|
|
* @param timer The timer to handle.
|
|
*/
|
|
void
|
|
StaticIntervalDetector::handleDetectionTimer(DetectionTimer * timer) {
|
|
|
|
if (!isRunning()) return;
|
|
|
|
PeerID monitoredPeerId = timer->getReference();
|
|
PeerSuspicionState peerState = nfdStateTable.getEntry(monitoredPeerId);
|
|
|
|
switch (peerState) {
|
|
case NO_SUSPECT:
|
|
{
|
|
|
|
DEBUG("handleDetectionTimer - HALF_SUSPECT peer " << monitoredPeerId);
|
|
sendControlMessage(monitoredPeerId);
|
|
|
|
resetDetectionTimer(monitoredPeerId);
|
|
nfdStateTable.updateEntry(monitoredPeerId, HALF_SUSPECT);
|
|
|
|
break;
|
|
|
|
}//End case
|
|
case HALF_SUSPECT:
|
|
{
|
|
|
|
if (timer->getNumberOfRetries() > 0) {
|
|
|
|
DEBUG("handleDetectionTimer - try to contact peer again");
|
|
|
|
sendControlMessage(monitoredPeerId);
|
|
startDetectionTimer(monitoredPeerId);
|
|
|
|
}//End if
|
|
else {
|
|
|
|
DEBUG("handleDetectionTimer - peer does not response ... give up");
|
|
DEBUG("handleDetectionTimer - FULL_SUSPECT peer " << monitoredPeerId);
|
|
DEBUG("handleDetectionTimer - signal peer failed");
|
|
|
|
nfdStateTable.updateEntry(monitoredPeerId, FULL_SUSPECT);
|
|
dispatcher.signal(new PeerFailedEvent(monitoredPeerId));
|
|
|
|
|
|
}//End else
|
|
break;
|
|
}//End case
|
|
default:
|
|
{
|
|
DEBUG("handleDetectionTimer - unknown peer state");
|
|
break;
|
|
}//end default case
|
|
}//End switch
|
|
}
|
|
|
|
/**
|
|
* @brief Handles a received moversight message to analyze remote peer status
|
|
* @param pdu The message to analyze
|
|
*/
|
|
void
|
|
StaticIntervalDetector::handleMessage(const MoversightMessage * pdu) {
|
|
|
|
if (!isRunning()) return;
|
|
|
|
if (pdu != NULL) {
|
|
|
|
PeerID senderPeerId = pdu->getSourceID();
|
|
restartMonitorPeer(senderPeerId);
|
|
|
|
//to a ControlMessage we have to response
|
|
if (pdu->getType() == CM) {
|
|
sendControlMessageConfirm(senderPeerId);
|
|
}//End if
|
|
}//End if
|
|
}
|
|
|
|
/**
|
|
* @brief A slot for receiving the peer joined event.
|
|
* @param e The event to handle.
|
|
*/
|
|
void
|
|
StaticIntervalDetector::handleEvent(const PeerJoinedEvent & e) {
|
|
|
|
if (isTargetPeer(e.getPeerID())) {
|
|
|
|
if (!isLocalPeerMaster()) {
|
|
stopMonitorAllPeers();
|
|
}//End if
|
|
|
|
DEBUG("peerJoined - monitor peer ID " << e.getPeerID());
|
|
startMonitorPeer(e.getPeerID());
|
|
|
|
if (module.isPrintDebugNFD()) {
|
|
DEBUG("peerJoined - resulting suspicion table");
|
|
nfdStateTable.printTable();
|
|
}//End if
|
|
|
|
}//End if
|
|
}
|
|
|
|
/**
|
|
* @brief A slot for receiving the peer left event.
|
|
* @param e The event to handle.
|
|
*/
|
|
void
|
|
StaticIntervalDetector::handleEvent(const PeerLeftEvent & e) {
|
|
stopMonitorPeer(e.getPeer().getPeerID());
|
|
}
|
|
|
|
/**
|
|
* @brief A slot for receiving the peer pending event.
|
|
* @param e The event to handle.
|
|
*/
|
|
void
|
|
StaticIntervalDetector::handleEvent(const PendingPeersEvent & e) {
|
|
PeerIDList peers = e.getPeerIDList();
|
|
for (size_t i = 0; i < peers.size(); i++) {
|
|
stopMonitorPeer(peers.get(i));
|
|
}
|
|
}
|
|
|
|
/**
|
|
* @brief A slot for receiving the peer reconnected event.
|
|
* @param e The event to handle.
|
|
*/
|
|
void
|
|
StaticIntervalDetector::handleEvent(const PeerReconnectedEvent & e) {
|
|
|
|
if (isTargetPeer(e.getPeerID())) {
|
|
|
|
DEBUG("peerReconnected - monitor peer ID " << e.getPeerID());
|
|
startMonitorPeer(e.getPeerID());
|
|
|
|
}//End if
|
|
}
|
|
|
|
/**
|
|
* @brief A slot for receiving the local peer updated event.
|
|
* @param e The event to handle
|
|
*/
|
|
void
|
|
StaticIntervalDetector::handleEvent(const LocalPeerUpdatedEvent & /* e */) {
|
|
|
|
//we are running ??
|
|
if (isRunning()) {
|
|
|
|
if (getLocalState() == JOINED) {
|
|
|
|
if (isLocalPeerMaster()) {
|
|
|
|
PeerIDList list1 = nfdStateTable.getStoredPeerIDs();
|
|
PeerIDList list2 = dispatcher.getMembershipService().getClusterAndMasterPeerIDList(getLocalPeer());
|
|
|
|
list1.sort();
|
|
list2.sort();
|
|
|
|
//nothing changed?
|
|
if (list1 == list2) {
|
|
return;
|
|
}//End if
|
|
|
|
}//End if
|
|
//slave
|
|
else {
|
|
if (nfdStateTable.size() == 1) {
|
|
return;
|
|
}//End if
|
|
}//End else
|
|
|
|
stop();
|
|
start();
|
|
}
|
|
else if (getLocalState() == LEAVING) {
|
|
stop();
|
|
}//end
|
|
}
|
|
//not running
|
|
else {
|
|
|
|
//we have joined the group, so we can start the detection of the
|
|
//foreign peers
|
|
if (getLocalState() == JOINED) {
|
|
start();
|
|
}//End if Joined
|
|
|
|
return;
|
|
|
|
}//End else - not running
|
|
}
|
|
|
|
|
|
/**
|
|
* @brief Handles the join group done event by starting the detector.
|
|
* @param e The event to handle.
|
|
*/
|
|
void
|
|
StaticIntervalDetector::handleEvent(const JoinGroupDoneEvent & e){
|
|
if(!isRunning()){
|
|
start();
|
|
}//End if
|
|
}
|
|
|
|
/**
|
|
* @brief Starts for a given peer ID a stored detection timer. If no timer stored, an exception is thrown.
|
|
* @param pId The peer ID of peer corresponding to the timer to start.
|
|
* @throws TimerNotFoundException If no detection timer stored within the detection timer queue for the given peer ID.
|
|
*/
|
|
void
|
|
StaticIntervalDetector::startDetectionTimer(const PeerID & pId) {
|
|
|
|
DetectionTimer * timer = findDetectionTimer(pId);
|
|
|
|
if (timer != NULL) {
|
|
|
|
{
|
|
GenericTime time = GenericTime::currentTime() + DETECTION_TIMEOUT;
|
|
DEBUG("startDetectionTimer - timeout for peer " << pId << ": " << time.toString());
|
|
}
|
|
|
|
timer->setTimeout(DETECTION_TIMEOUT);
|
|
timer->start();
|
|
|
|
}//end if
|
|
}
|
|
|
|
/**
|
|
* @brief Resets for a given peer ID a stored detection timer. If no timer
|
|
* stored, an exception is thrown. The number of retires per timer will
|
|
* also be reset to the start value.
|
|
* @param pId The peer ID of peer corresponding to the timer to reset (stop + start).
|
|
* @throws TimerNotFoundException If no detection timer stored within the detection timer queue for the given peer ID.
|
|
*/
|
|
void
|
|
StaticIntervalDetector::resetDetectionTimer(const PeerID & pId) {
|
|
|
|
DEBUG("reset detection timer for peer " << pId);
|
|
stopDetectionTimer(pId);
|
|
|
|
//reset timer retries
|
|
findDetectionTimer(pId)->resetNumberOfRetries();
|
|
startDetectionTimer(pId);
|
|
|
|
}
|
|
|
|
/**
|
|
* @brief Stops for a given peer ID a stored detection timer.
|
|
* @param pId The peer ID of peer corresponding to the timer to stop.
|
|
*/
|
|
void
|
|
StaticIntervalDetector::stopDetectionTimer(const PeerID & pId) {
|
|
|
|
try {
|
|
|
|
DetectionTimer * timer = findDetectionTimer(pId);
|
|
timer->stop();
|
|
|
|
}//End try
|
|
catch (TimerNotFoundException & /*e*/) {
|
|
}
|
|
}
|
|
|
|
/**
|
|
* @brief Stop and deletes the detection timer for the given peer from
|
|
* the detection timer queue.
|
|
* @param pId The peer ID of the peer corresponding to the timer
|
|
*/
|
|
void
|
|
StaticIntervalDetector::stopAndDeleteDetectionTimer(const PeerID & pId) {
|
|
|
|
DEBUG("stopAndDeleteDetectionTimer - remove detection timer for peer " << pId);
|
|
|
|
try {
|
|
|
|
DetectionTimer * timer = findDetectionTimer(pId);
|
|
timer->stop();
|
|
detectionTimerQueue.remove(pId);
|
|
delete timer;
|
|
|
|
}//End try
|
|
catch (TimerNotFoundException & /*e*/) {
|
|
}
|
|
}
|
|
|
|
/**
|
|
* @brief Finds the corresponding detection timer for the given peer ID.
|
|
* @param pId The peer ID of the peer to find the corresponding
|
|
* detection timer.
|
|
* @throws TimerNotFoundException If no detection timer stored within
|
|
* the detection timer queue for the given peer ID.
|
|
* @return The desired timer.
|
|
*/
|
|
DetectionTimer *
|
|
StaticIntervalDetector::findDetectionTimer(const PeerID & pId) {
|
|
|
|
DetectionTimer * timer = detectionTimerQueue.find(pId);
|
|
|
|
if (timer != NULL) {
|
|
return timer;
|
|
}//End if
|
|
|
|
throw TimerNotFoundException("findDetectionTimer - no detection timer stored within the detection timer queue for the given peer ID");
|
|
|
|
}
|
|
|
|
/**
|
|
* @brief Creates and enqueues a detection timer for a given peer.
|
|
* A new timer is created and stored within the detection timer queue,
|
|
* if no timer for the given peer ID already stored within the queue.
|
|
* If a timer already stored for the given peer ID, an additional
|
|
* timer is not created and the method returns without do anything.
|
|
* @param pId The peer ID of the peer corresponding to the created
|
|
* and stored timer.
|
|
*/
|
|
void
|
|
StaticIntervalDetector::createAndEnqueueDetectionTimer(const PeerID & pId) {
|
|
|
|
try {
|
|
|
|
DetectionTimer * timer = findDetectionTimer(pId);
|
|
|
|
//if already a timer stored for this peer???
|
|
if (timer != NULL) {
|
|
return; //do nothing
|
|
}//End if
|
|
}
|
|
catch (TimerNotFoundException & /*e*/) {
|
|
}
|
|
|
|
DetectionTimer * timer = new DetectionTimer(*this);
|
|
timer->setReference(pId);
|
|
detectionTimerQueue.add(timer);
|
|
|
|
}
|
|
|
|
/**
|
|
* @brief This stops the monitoring off all peers, register within
|
|
* the nfd.
|
|
*
|
|
* This method stops and removes all detection timer.
|
|
*/
|
|
void
|
|
StaticIntervalDetector::stopMonitorAllPeers() {
|
|
|
|
PeerIDList monitoredPeerIDList = nfdStateTable.getStoredPeerIDs();
|
|
|
|
for (size_t i = 0; i < monitoredPeerIDList.size(); i++) {
|
|
stopMonitorPeer(monitoredPeerIDList.get(i));
|
|
}//End for
|
|
}
|
|
|
|
/**
|
|
* @brief Stops to monitor a peer.
|
|
*
|
|
* The method ends to monitor the peer, identified by the given peer ID,
|
|
* by:
|
|
* <ul>
|
|
* <li>remove the peer id form the nfd state table</li>
|
|
* <li>stop and remove the detection timer.</li>
|
|
* </ul>
|
|
* @param pId The id of the peer that monitoring is stopped.
|
|
*/
|
|
void
|
|
StaticIntervalDetector::stopMonitorPeer(const PeerID & pId) {
|
|
|
|
if (nfdStateTable.contains(pId)) {
|
|
|
|
nfdStateTable.removeEntry(pId);
|
|
stopAndDeleteDetectionTimer(pId);
|
|
|
|
}
|
|
}
|
|
|
|
/**
|
|
* @brief Starts monitoring a peer.
|
|
*
|
|
* This method starts to monitor the peer, identified by the given
|
|
* peer ID, by:
|
|
* <ul>
|
|
* <li> create an nfd state table entry for the peer</li>
|
|
* <li> create, enqueue and start a detection timer for this peer.</li>
|
|
* </ul>
|
|
* @param pId The id of the peer to monitor.
|
|
*/
|
|
void
|
|
StaticIntervalDetector::startMonitorPeer(const PeerID & pId) {
|
|
|
|
//already monitoring that peer? return.
|
|
if (nfdStateTable.contains(pId)) return;
|
|
|
|
//add an entry to the monitor table
|
|
nfdStateTable.addEntry(pId, NO_SUSPECT);
|
|
///create and start the timer
|
|
createAndEnqueueDetectionTimer(pId);
|
|
startDetectionTimer(pId);
|
|
|
|
}
|
|
|
|
/**
|
|
* @brief Starts to monitor peers as a master peer by monitor the local
|
|
* cluster and the remaining master peers.
|
|
*/
|
|
void
|
|
StaticIntervalDetector::startMonitorPeersAsMaster() {
|
|
|
|
PeerIDList cmpList = dispatcher.getMembershipService().getClusterAndMasterPeerIDList(getLocalPeer());
|
|
|
|
//add peers
|
|
for (size_t i = 0; i < cmpList.size(); i++) {
|
|
|
|
PeerID pId = cmpList.get(i);
|
|
DEBUG("startMonitorPeersAsMaster - monitor peer ID " << pId);
|
|
startMonitorPeer(pId);
|
|
|
|
}//End for
|
|
}
|
|
|
|
/**
|
|
* @brief Starts the monitoring as a slave by monitor the master peer.
|
|
*/
|
|
void
|
|
StaticIntervalDetector::startMonitorPeersAsSlave() {
|
|
|
|
//begin monitor new master
|
|
DEBUG("startMonitorPeersAsSlave - monitor master " << getLocalPeer().getMasterID());
|
|
startMonitorPeer(getLocalPeer().getMasterID());
|
|
}
|
|
|
|
/**
|
|
* @brief Unsuspects the given peer.
|
|
*
|
|
* This method sets the suspicion state peer of the given peer to NO_SUSPECT, if the peer was former half or full suspected.
|
|
* @param peerId The id of the peer to unsuspect
|
|
* @param suspectionState The former suspection state.
|
|
*/
|
|
void
|
|
StaticIntervalDetector::unsuspectPeer(const PeerID & peerId, const PeerSuspicionState suspectionState) {
|
|
|
|
if (suspectionState == HALF_SUSPECT ||
|
|
suspectionState == FULL_SUSPECT) {
|
|
|
|
nfdStateTable.updateEntry(peerId, NO_SUSPECT);
|
|
DEBUG("unsuspectPeer - UNSUSPECT peer " << peerId);
|
|
|
|
//the master suspects this peer?
|
|
if (getLocalPeer().isMaster() &&
|
|
suspectionState == FULL_SUSPECT) {
|
|
|
|
//welcome back
|
|
DEBUG("unsuspectPeer - announce peer reconnect");
|
|
//dispatcher.signal( new PeerReconnectedEvent( peerId, dispatcher.getLocalAddressFromPeer( peerId)));
|
|
// sendPeerReconnectAnnounceMessage(peerId);
|
|
std::cerr << getLocalID()<<": unsuspectPeer - announce peer reconnect for peer " << peerId << " not finish implemented yet. \n";
|
|
}//End if MASTER && peerState == FULL_SUSPECT
|
|
}//End if FULL_SUSPECT || HALF_SUSPECT
|
|
}
|
|
|
|
/**
|
|
* @brief Restarts to monitor the peer, identified by the given id.
|
|
*
|
|
* The method restarts to monitor the peer with the given id by
|
|
* restarting the detection timer. The peer state is set to NO_SUSPECT.
|
|
* @param pId The id of the peer to monitor.
|
|
*/
|
|
void
|
|
StaticIntervalDetector::restartMonitorPeer(const PeerID & pId) {
|
|
|
|
//monitored peer?
|
|
if (nfdStateTable.contains(pId)) {
|
|
|
|
PeerSuspicionState peerState = nfdStateTable.getEntry(pId);
|
|
resetDetectionTimer(pId);
|
|
unsuspectPeer(pId, peerState);
|
|
|
|
}//End if
|
|
}
|
|
|
|
|
|
}
|
|
} |