329 lines
9.3 KiB
C++
329 lines
9.3 KiB
C++
#include "MaintanceRoleService.h"
|
|
#include "Dispatcher.h"
|
|
#include "metric/MessageCountMomentMetric.h"
|
|
#include "metric/MessageSeqCountMomentMetric.h"
|
|
#include "metric/MessageClusterCountMomentMetric.h"
|
|
#include "metric/LogicTimeMomentMetric.h"
|
|
#include "metric/ResourceValueMomentMetric.h"
|
|
#include "metric/ResourceValueMomentSecondChanceMetric.h"
|
|
#include "metric/NoMomentMetric.h"
|
|
#include "msg/RoleSwitchAnounce.h"
|
|
#include "common/Defines.h"
|
|
#include "common/container/PeerList.h"
|
|
#include "ms/MemberRegister.h"
|
|
#include "ms/events/ViewChangeEvent.h"
|
|
#include "ms/register/ClusterList.h"
|
|
#include "mt/events/MulticastMessageEnqueuedEvent.h"
|
|
#include "mt/events/MulticastMessageDeliveredEvent.h"
|
|
|
|
#include <fstream>
|
|
|
|
#define DEBUG(msg) if(getLocalState()== DISJOINED){ MOV_DEBUG <<"MRS@TA_"<<getLocalAddress()<<" "<<msg<<endl; } else{ MOV_DEBUG <<"MRS@"<<getLocalID() <<" "<<msg<<endl;}
|
|
|
|
namespace ubeeme {
|
|
namespace moversight {
|
|
|
|
/**
|
|
* @brief Constructor
|
|
* @param Dispatcher : d - Dispatcherreference
|
|
*/
|
|
MaintenanceRoleService::MaintenanceRoleService(Dispatcher & d) :
|
|
MoversightService(d, "MaintanceRoleService"),
|
|
mmMetric(NULL) {
|
|
}
|
|
|
|
/**
|
|
* @brief Copy-Constructor
|
|
* @param orig
|
|
*/
|
|
|
|
MaintenanceRoleService::MaintenanceRoleService(const MaintenanceRoleService& orig) : MoversightService(orig) {
|
|
operator=(orig);
|
|
}
|
|
|
|
/**
|
|
* @brief De-Constructor
|
|
*/
|
|
MaintenanceRoleService::~MaintenanceRoleService() {
|
|
}
|
|
|
|
/**
|
|
* @brief initialise the service
|
|
*/
|
|
void
|
|
MaintenanceRoleService::initialise() {
|
|
dispatcher.subscribe<JoinGroupDoneEvent>(this);
|
|
dispatcher.subscribe<PeerJoinedEvent>(this);
|
|
dispatcher.subscribe<PeerLeftEvent>(this);
|
|
dispatcher.subscribe<FlushDoneEvent>(this);
|
|
dispatcher.subscribe<MulticastMessageEnqueuedEvent>(this);
|
|
dispatcher.subscribe<MulticastMessageDeliveredEvent>(this);
|
|
setMaintanceMomentMetric(dispatcher.getMaintanceMomentMetricType());
|
|
const PeerIDList & masterList = dispatcher.getMembershipService().getMasterPeerIDList();
|
|
mmMetric->initialise(masterList);
|
|
}
|
|
|
|
/**
|
|
* @brief finalise the service
|
|
*/
|
|
void
|
|
MaintenanceRoleService::finalise() {
|
|
dispatcher.unsubscribeAll(this);
|
|
|
|
if (mmMetric != NULL) {
|
|
mmMetric->finalise();
|
|
delete mmMetric;
|
|
mmMetric = NULL;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* @brief Method set the SendMessageSema from the nextview buffer and change the
|
|
* substate to MAINTANCE_FLUSHING
|
|
*/
|
|
void
|
|
MaintenanceRoleService::activatedSendMessageSema() {
|
|
dispatcher.getNextViewBuffer().setLockedSendMsgSemaphoreAndStartFlush();
|
|
}
|
|
|
|
/**
|
|
* @brief Method unset the SendMessageSema from the nextview buffer and change the
|
|
* substate to previously state
|
|
*/
|
|
void
|
|
MaintenanceRoleService::deactivatedSendMessageSema() {
|
|
dispatcher.getNextViewBuffer().unsetLockedSendMsgSemaphoreAndStopFlush();
|
|
}
|
|
|
|
/**
|
|
* @brief method execute a roleswitch
|
|
*/
|
|
void
|
|
MaintenanceRoleService::runRoleSwitch() {
|
|
if (mmMetric->isRoleSwitchNecessary()) {
|
|
// get all master, which need a roleswitch
|
|
const PeerIDList & switchMasterIDList = mmMetric->GetMasterIdListForsRoleSwitch();
|
|
size_t switchMasterIDListSize = switchMasterIDList.size();
|
|
|
|
if (switchMasterIDListSize > 0) {
|
|
MemberRegister & memberReg = dispatcher.getMembershipService().getCurrentMemberRegister();
|
|
|
|
for (size_t i = 0; i < switchMasterIDListSize; i++) {
|
|
PeerID currentMasterID = switchMasterIDList.get(i);
|
|
ClusterID clusterID = memberReg.findPeer(currentMasterID);
|
|
// determine new master
|
|
PeerID newMasterID = memberReg.determinePossibleNewMaster(clusterID);
|
|
|
|
if (newMasterID != currentMasterID) {
|
|
memberReg.selectMaster(clusterID, newMasterID);
|
|
DEBUG("excute RoleSwitch - new master of cluster " << clusterID << " is peer " << newMasterID << "(prev. master was peer" << currentMasterID << ")");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
refreshMasterIdTreshholdMap();
|
|
deactivatedSendMessageSema();
|
|
}
|
|
|
|
/**
|
|
* @brief method update the momentmetric
|
|
* @param MulticastMessage : pdu
|
|
*/
|
|
void
|
|
MaintenanceRoleService::updateMaintanceMomentMetric(const MulticastMessage & pdu, bool globalState) {
|
|
|
|
if (getLocalSubState() != FLUSHING && useMomentMetricGlobalsStates() == globalState) {
|
|
DEBUG("update MaintanceMomentMetric");
|
|
mmMetric->incMetricTresholdAndFire(pdu);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* @brief method refresh the masterlist of the momentmetric
|
|
*/
|
|
void
|
|
MaintenanceRoleService::refreshMasterIdTreshholdMap() {
|
|
const PeerIDList & masterIDList = dispatcher.getMembershipService().getMasterPeerIDList();
|
|
mmMetric->finalise();
|
|
mmMetric->initialise(masterIDList);
|
|
}
|
|
|
|
/**
|
|
* @brief method force an roleswitch for an certain master
|
|
* @param PeerID : masterId
|
|
*/
|
|
void
|
|
MaintenanceRoleService::forceRoleSwitch(PeerID masterId) {
|
|
const PeerIDList & masterIDList = dispatcher.getMembershipService().getMasterPeerIDList();
|
|
|
|
if (!masterIDList.contains(masterId)) {
|
|
return;
|
|
}
|
|
|
|
// determine new master
|
|
MemberRegister & memberReg = dispatcher.getMembershipService().getCurrentMemberRegister();
|
|
ClusterID clusterID = memberReg.findPeer(masterId);
|
|
PeerID newMasterID = memberReg.determinePossibleNewMaster(clusterID);
|
|
|
|
// select new master
|
|
if (newMasterID != masterId) {
|
|
memberReg.selectMaster(clusterID, newMasterID);
|
|
memberReg.printMemberRegister();
|
|
refreshMasterIdTreshholdMap();
|
|
}
|
|
}
|
|
|
|
/**
|
|
* @brief method send a RoleSwitchAnounce message
|
|
* @param PeerID : masterId - Id of the master which need a roleswitch
|
|
*/
|
|
void
|
|
MaintenanceRoleService::sendRSAMessage(PeerID masterId) {
|
|
RoleSwitchAnounce rsa(masterId);
|
|
dispatcher.sendMessage(rsa);
|
|
}
|
|
|
|
/**
|
|
* @brief Assignment Operator
|
|
*/
|
|
MaintenanceRoleService &
|
|
MaintenanceRoleService::operator=(const MaintenanceRoleService & rhs) {
|
|
if (this == &rhs) {
|
|
// handle self assignment
|
|
return *this;
|
|
}//end if
|
|
|
|
mmMetric = rhs.mmMetric->clone();
|
|
//assignment operator
|
|
return *this;
|
|
}
|
|
|
|
/**
|
|
* @brief method ini a certain MaintanceMomentMetric
|
|
* @param MaintanceMomentMetricType : type
|
|
*/
|
|
void
|
|
MaintenanceRoleService::setMaintanceMomentMetric(MaintanceMomentMetricType type) {
|
|
if (mmMetric != NULL) {
|
|
delete mmMetric;
|
|
}//End if
|
|
|
|
switch (type) {
|
|
case NOMOMENTMETRIC:
|
|
mmMetric = new NoMomentMetric(*this);
|
|
break;
|
|
|
|
case MESSAGECOUNTMOMENTMETRIC:
|
|
mmMetric = new MessageCountMomentMetric(*this);
|
|
break;
|
|
|
|
case MESSAGESEQCOUNTMOMENTMETRIC:
|
|
mmMetric = new MessageSeqCountMomentMetric(*this);
|
|
break;
|
|
|
|
case LOGICTIMEMOMENTMETRIC:
|
|
mmMetric = new LogicTimeMomentMetric(*this);
|
|
break;
|
|
|
|
case RESOURCEVALUEMOMENTMETRIC2CHANCE:
|
|
mmMetric = new ResourceValueMomentSecondChanceMetric(*this);
|
|
break;
|
|
|
|
case RESOURCEVALUEMOMENTMETRIC:
|
|
mmMetric = new ResourceValueMomentMetric(*this);
|
|
break;
|
|
|
|
case MESSAGECLUSTERCOUNTMOMENTMETRIC:
|
|
mmMetric = new MessageClusterCountMomentMetric(*this);
|
|
break;
|
|
|
|
default:
|
|
mmMetric = new NoMomentMetric(*this);
|
|
break;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* @brief method indicates that the metric use globalstates like resource value or logic time
|
|
* @return true, if metric use global states, otherwise false
|
|
*/
|
|
bool
|
|
MaintenanceRoleService::useMomentMetricGlobalsStates() {
|
|
return mmMetric->useGlobalStatesVariable();
|
|
}
|
|
|
|
/**
|
|
* @brief Handles the PeerJoined event by updating the master ID threshold map.
|
|
* @param The event to handle.
|
|
*/
|
|
void
|
|
MaintenanceRoleService::handleEvent(const PeerJoinedEvent & /* e */) {
|
|
refreshMasterIdTreshholdMap();
|
|
}
|
|
|
|
/**
|
|
* @brief Handles the PeerLeft event by updating the master ID threshold map.
|
|
* @param The event to handle.
|
|
*/
|
|
void
|
|
MaintenanceRoleService::handleEvent(const PeerLeftEvent & /* e */) {
|
|
refreshMasterIdTreshholdMap();
|
|
}
|
|
|
|
/**
|
|
* @brief Handles the JoinGroupDone event by updating the master ID
|
|
* threshold map and pop the first received message of the next view buffer.
|
|
* @param The event to handle.
|
|
*/
|
|
void
|
|
MaintenanceRoleService::handleEvent(const JoinGroupDoneEvent & /* e */) {
|
|
refreshMasterIdTreshholdMap();
|
|
}
|
|
|
|
/**
|
|
* @brief Handles the FlushDone event by performing the roles switch
|
|
* and cleaning the next view buffer.
|
|
* @param The event to handle.
|
|
*/
|
|
void
|
|
MaintenanceRoleService::handleEvent(const FlushDoneEvent & /* e */) {
|
|
if (getLocalSubState() == FLUSHING) {
|
|
runRoleSwitch();
|
|
}//End if
|
|
}
|
|
|
|
/**
|
|
* @brief Handles the MulticastMessageEnqueued event by updating the next
|
|
* view buffer and the maintenance moment metric.
|
|
* @param e The event to handle
|
|
*/
|
|
void
|
|
MaintenanceRoleService::handleEvent(const MulticastMessageEnqueuedEvent & e) {
|
|
ViewID currentViewID = getViewID();
|
|
ViewID msgViewID = e.getMessage().getViewID();
|
|
|
|
if (currentViewID == msgViewID) {
|
|
if (dispatcher.getNextViewBuffer().isViewChangeAbleMessageType(e.getMessage().getType())) {
|
|
activatedSendMessageSema();
|
|
}
|
|
|
|
updateMaintanceMomentMetric(e.getMessage(), false);
|
|
} else {
|
|
DEBUG("skip Event: " << e)
|
|
}
|
|
}
|
|
|
|
/**
|
|
* @brief Handles the MulticastMessageDelivered event by updating the
|
|
* maintenance moment metric.
|
|
* @param e The event to handle
|
|
*/
|
|
void
|
|
MaintenanceRoleService::handleEvent(const MulticastMessageDeliveredEvent & e) {
|
|
updateMaintanceMomentMetric(e.getMessage(), true);
|
|
}
|
|
|
|
}
|
|
}
|