Files
scandocs/uni/masterarbeit/source/moversight/rvd/ResourceValueDistributor.cc
2014-06-30 13:58:10 +02:00

208 lines
6.9 KiB
C++

#include "ResourceValueDistributor.h"
#include "rvd/profil/TimerResourceValueProfile.h"
#include "rvd/metric/ResourceValueMetric.h"
#include "rvd/metric/DeltaResourceValueMetric.h"
#include "rvd/msg/ResourceValueUpdate.h"
#include "rvd/profil/MessageResourceValueProfile.h"
#include "rvd/profil/ResourceValueProfile.h"
#include "Moversight.h"
#include "Dispatcher.h"
// Debug output helper for this service: streams msg to MOV_DEBUG behind a prefix.
// While the local state is DISJOINED the prefix is "RVD@TA_" followed by
// getLocalAddress(); in every other state it is "RVD@" followed by getLocalID().
// The macro expands to a complete if/else statement (not a single expression), so
// a trailing semicolon is optional, and it should only be used inside a braced
// block to keep the else from pairing with an enclosing unbraced if.
#define DEBUG(msg) if(getLocalState()== DISJOINED){ MOV_DEBUG <<"RVD@TA_"<<getLocalAddress()<<" "<<msg<<endl; } else{ MOV_DEBUG <<"RVD@"<<getLocalID() <<" "<<msg<<endl;}
namespace ubeeme {
namespace moversight {
/**
 * @brief Constructor
 *
 * The distributor starts without a profile and without a metric; both are
 * created by initialise().
 *
 * @param d reference of the dispatcher, handed on to MoversightService
 */
ResourceValueDistributor::ResourceValueDistributor(Dispatcher & d) : MoversightService(d, "ResourceValueDistributor"), rvp(NULL) {
    // finalise(), initialise() and operator= all test or delete metric, so it
    // must never hold an indeterminate value.
    metric = NULL;
}

/**
 * @brief Destructor
 *
 * Releases nothing itself; rvp and metric are destroyed by finalise().
 */
ResourceValueDistributor::~ResourceValueDistributor() {
}

/**
 * @brief Copy-Constructor
 * @param other source object
 */
ResourceValueDistributor::ResourceValueDistributor(const ResourceValueDistributor& other) : MoversightService(other), rvp(NULL) {
    // operator= deletes the objects currently held, therefore both pointers
    // have to be well defined before delegating to it.
    metric = NULL;
    operator=(other);
}

/**
 * @brief Assignment operator
 *
 * Replaces the own metric and profile by clones of the ones held by rhs and
 * copies the resource value. A NULL metric or profile in rhs (distributor not
 * initialised or already finalised) is copied as NULL. The MoversightService
 * base part is not assigned here.
 *
 * @param rhs Object to assign from
 * @return A reference to this
 */
ResourceValueDistributor & ResourceValueDistributor::operator=(const ResourceValueDistributor& rhs) {
    if (this == &rhs) {
        return *this;
    }

    // Release the metric owned so far before taking over a clone of rhs' metric.
    delete metric;
    metric = NULL;
    if (rhs.metric != NULL) {
        metric = rhs.metric->clone();
    }

    // Same for the profile.
    // NOTE(review): initialise() constructs the profile with a reference to its
    // distributor - confirm that clone() yields a profile usable by this instance.
    delete rvp;
    rvp = NULL;
    if (rhs.rvp != NULL) {
        rvp = rhs.rvp->clone();
    }

    resources = rhs.resources;
    return *this;
}

/**
 * @brief initialize the distributor
 *
 * Reads the peer resources from the module and (re-)creates the resource value
 * profile and the metric. Objects left over from a previous call are destroyed
 * first, so calling initialise() repeatedly does not leak.
 */
void
ResourceValueDistributor::initialise() {
    resources = module.getPeerResources();

    // Deleting a NULL pointer is a no-op, so no explicit check is required.
    delete rvp;
    rvp = NULL;
    //rvp = new MessageResourceValueProfile(*this, dispatcher);
    rvp = new TimerResourceValueProfile(*this);

    // The previous metric is released only now, so it stays valid while the
    // new profile is being constructed.
    delete metric;
    metric = NULL;
    metric = new DeltaResourceValueMetric(resources);
}
/**
 * @brief Shut the distributor down
 *
 * Calls dispatcher.unsubscribeAll() for this service, then destroys the
 * profile and the metric and resets both pointers to NULL.
 */
void ResourceValueDistributor::finalise() {
    dispatcher.unsubscribeAll(this);

    // delete on a NULL pointer does nothing, hence no guards around the deletes
    delete rvp;
    rvp = NULL;

    delete metric;
    metric = NULL;
}
/**
* @brief method send a resource value update messga to the group
* @param the resource value which the peer wanna tell to the group
*/
void
ResourceValueDistributor::sendResourceValueUpdate(const PeerResources & peerResources) {
PeerID localID = getLocalID();
if (dispatcher.getMembershipService().getPeerIDList().contains(localID) && getState(localID) == JOINED) {
DEBUG("it is necessary to send a resource value update to the group");
ResourceValueUpdate rvu(peerResources, getLocalID());
dispatcher.sendMessage(rvu);
}
}
/**
 * @brief Store a new resource value and announce it to the group if the metric asks for it
 *
 * The value is always stored. The update message is sent, and the metric's
 * reference value is moved to the new value, only when
 * metric->shouldSendResourceValueMessage() returns true.
 *
 * @param peerResources carries the new resource value
 */
void ResourceValueDistributor::setAndSendResourceValue(const PeerResources & peerResources) {
    resources.setResourceValue(peerResources.getResourceValue());
    DEBUG("current ressource value of is" << resources);

    if (!metric->shouldSendResourceValueMessage(resources)) {
        return;
    }

    sendResourceValueUpdate(resources);
    metric->setReferenceResourceValue(resources);
}
/**
 * @brief Accessor for the resource value currently held by this distributor (local peer)
 * @return read-only reference to the stored PeerResources
 */
const PeerResources & ResourceValueDistributor::getCurrentResourceValue() {
    return resources;
}
/**
 * @brief Estimate and set the resource value of all peers based on a multicast message
 *
 * Walks over every cluster of the current member register and delegates to
 * estimateClusterPeerResources().
 *
 * @param msg the message the estimation is based on
 */
void ResourceValueDistributor::estimateAllPeerResources(MulticastMessage * msg) {
    ClusterList & clusters = dispatcher.getMembershipService().getCurrentMemberRegister().getClusters();

    size_t idx = 0;
    while (idx < clusters.size()) {
        estimateClusterPeerResources(clusters.get(idx), msg);
        ++idx;
    }
}
/**
 * @brief Estimate and set the resource value of the peers of one cluster based on a multicast message
 * @param cluster the cluster whose peer list is processed
 * @param msg the message the estimation is based on
 */
void ResourceValueDistributor::estimateClusterPeerResources(Cluster& cluster, MulticastMessage * msg)
{
    PeerList & members = cluster.getPeerList();
    estimatePeerListPeerResources(members, msg);
}
/**
 * @brief Estimate and set the resource value of every peer of a peer list based on a multicast message
 * @param peerList the peers to process
 * @param msg the message the estimation is based on
 */
void ResourceValueDistributor::estimatePeerListPeerResources(PeerList& peerList, MulticastMessage * msg)
{
    size_t idx = 0;
    while (idx < peerList.size()) {
        estimatePeerPeerResources(peerList.get(idx), msg);
        ++idx;
    }
}
/**
 * @brief Estimate and set the resource value of a single peer based on a multicast message
 *
 * The estimate is stored in the peer. If the peer is the local peer, the
 * estimate also becomes the new reference value of the metric.
 *
 * @param peer the peer whose resource value is estimated and updated
 * @param msg the message the estimation is based on
 */
void ResourceValueDistributor::estimatePeerPeerResources(Peer& peer, MulticastMessage * msg)
{
    MemberRegister & memberReg = dispatcher.getMembershipService().getCurrentMemberRegister();

    // NOTE(review): the local `resources` value serves as baseline for every
    // peer, not only for the local one - confirm this is intended.
    PeerResources estimate = estimatePeerResourcesByMessagesStatic(peer, msg, resources, memberReg);
    peer.setPeerResources(estimate);

    if (peer.getPeerID() == getLocalID()) {
        metric->setReferenceResourceValue(estimate);
    }

    DEBUG("estimated resource vaule from " << peer.getPeerID() << " is " << estimate);
}
/**
 * @brief Estimate the resource value of a peer after a multicast message
 *
 * A cost is derived from the role of the peer and subtracted from the given
 * current value:
 *  - master of a cluster known to the register:
 *    2 * (cluster size + number of clusters - 2)
 *  - plus 1 if the peer is not a master, or if its cluster is known and does
 *    not contain the source of the message
 * The result never drops below 0.
 *
 * @param peer the peer the estimate is computed for
 * @param msg the message whose source ID is evaluated
 * @param currentResources the value the cost is subtracted from
 * @param memberReg the member register providing cluster information
 * @return the estimated resources
 */
PeerResources ResourceValueDistributor::estimatePeerResourcesByMessagesStatic(const Peer & peer, const MulticastMessage * msg, const PeerResources & currentResources, MemberRegister & memberReg)
{
    const bool master = peer.isMaster();
    PeerResources::ResourceValue cost = 0;
    PeerResources::ResourceValue remaining = currentResources.getResourceValue();
    ClusterID ownCluster = peer.getClusterID();
    const bool clusterKnown = memberReg.getClusters().contains(ownCluster);

    if (master && clusterKnown) {
        // load of a primary master
        const size_t members = memberReg.getCluster(ownCluster).size();
        const size_t clusters = memberReg.getNumberOfCluster();
        cost = 2 * (members + clusters - 2);
    }

    // one more unit for a non master or a 2nd master
    bool extraUnit = !master;
    if (!extraUnit && clusterKnown) {
        extraUnit = !memberReg.getCluster(ownCluster).contains(msg->getSourceID());
    }
    if (extraUnit) {
        ++cost;
    }

    // subtract the cost, clamping the result at 0
    if (!(cost > remaining) && !(remaining < 1)) {
        remaining -= cost;
    } else {
        remaining = 0;
    }

    return PeerResources(remaining);
}
}
}