Files
scandocs/uni/masterarbeit/source/moversight/mt/MessageTransfer.cc
2014-06-30 13:58:10 +02:00

997 lines
31 KiB
C++

/*
* File: MessageTransfer.cc
* Author: jgaebler
*
* Created on May 4, 2010, 5:18 PM
*/
#include "MessageTransfer.h"
#include "Dispatcher.h"
#include "Moversight.h"
#include "common/transport/MessageReference.h"
#include "event/events/FlushDoneEvent.h"
#include "mrs/MaintanceRoleService.h"
#include "mrs/sync/NextViewBuffer.h"
#include "ms/Peer.h"
#include "ms/events/PeerLeftEvent.h"
#include "mt/events/PendingPeersEvent.h"
#include "mt/events/MulticastMessageDeliveredEvent.h"
#include "mt/events/MulticastMessageEnqueuedEvent.h"
#include "mt/events/MulticastMessageDroppedEvent.h"
#include "mt/msg/MulticastMessage.h"
#include "mt/msg/GTMessage.h"
#include "mt/msg/LTMessage.h"
#include "mt/msg/MulticastMessage.h"
#include "mt/timer/MTTimer.h"
#include "mt/timer/LTTimer.h"
#include "mt/timer/GarbageTimer.h"
#include "mt/timer/GTTimer.h"
#include "mt/timer/GTAwaitTimer.h"
#include "ts/TimeService.h"
namespace ubeeme {
namespace moversight {
#undef DEBUG
// Module-local debug trace: prints "MT@<local peer id> <msg>" to MOV_DEBUG when
// module.isPrintDebugMT() is set. `msg` may be a chain of `<<` operands.
// The body is wrapped in do { } while (0) so that `DEBUG(...);` is exactly one
// statement and cannot capture a following `else` of an enclosing `if`.
#define DEBUG(msg) \
    do { \
        if (module.isPrintDebugMT()) { \
            MOV_DEBUG << "MT@" << getLocalID() << " " << msg << std::endl; \
        } \
    } while (0)
/**
 * @brief Creates the message transfer service.
 *
 * Forwards the dispatcher and the service name "MessageTransfer" to the
 * MoversightService base class; no further state is set up here
 * (see initialise()).
 * @param d A reference to the moversight dispatcher.
 */
MessageTransfer::MessageTransfer(Dispatcher & d)
    : MoversightService(d, "MessageTransfer") {
    // nothing else to construct
}
/**
 * @brief Initialises the message transfer service.
 *
 * Subscribes the service to PeerLeftEvent notifications and resets the
 * sequence counter and the last seen logical time to zero.
 */
void MessageTransfer::initialise() {
    // get informed about leaving peers (see handleEvent)
    dispatcher.subscribe<PeerLeftEvent>(this);

    // counters start from scratch
    sequ = 0;
    lastSeenLt = 0;
}
/**
 * @brief Finishes the operation of the message transfer service.
 *
 * Cancels all event subscriptions, logs the fill level of every queue and
 * then stops all timers and empties the message queues.
 */
void MessageTransfer::finalise() {
    dispatcher.unsubscribeAll(this);

    // report what is still pending before it gets thrown away
    DEBUG("finalise - timerQueue.size(): " << timerQueue.size()
            << " gtAwaitTimerQueue.size(): " << gtAwaitTimerQueue.size()
            << " queue.size(): " << queue.size()
            << " lastMessageQueue.size(): " << lastMessageQueue.size());

    DEBUG("clean up queues and timers");
    stopAllTimers();
    cleanMessageQueues();
}
/**
 * @brief Destructor.
 *
 * Intentionally empty; timers and queues are released in finalise().
 */
MessageTransfer::~MessageTransfer() {
    // no own clean up
}
/**
 * @brief The common user interface to send a message to a closed group of peers
 * using the message transfer, which provides virtual synchrony within the
 * receiving peers.
 *
 * The message is stamped with a fresh message reference (local ID + current
 * sequence number), the local ID as source and last hop, the current logical
 * time and the current view ID, and is enqueued for local delivery.
 * Afterwards one of three paths is taken:
 *  - the group has exactly one peer: the message is marked deliverable and
 *    delivered immediately;
 *  - the local peer is a master: the message is sent to all masters and to the
 *    own cluster and a GT timer is started;
 *  - otherwise (slave): the message is sent to the master and a GTAwait timer
 *    is started.
 * In the latter two cases a pointer to @p msg is stored in the last message
 * queue. Finally the sequence number is incremented.
 * @param msg the message to send
 */
void
MessageTransfer::send(MulticastMessage& msg) {
    // stamp the message with identity, ordering information and the sending view
    MessageReference mRef(getLocalID(), sequ);
    msg.setMessageReference(mRef);
    msg.setSourceID(getLocalID());
    msg.setLastHop(getLocalID());
    msg.setSequ(sequ);
    msg.setLT(getCurrentLogicalTime());
    msg.setViewID(getViewID());
#if OMNETPP
    msg.setByteLength(sizeof msg);
#endif

    // the sender delivers its own message through the regular queue as well
    enqueue(msg);

    const bool aloneInGroup = (ms().getNumberOfPeers() == 1);
    if (aloneInGroup) {
        // nobody to agree with: deliver instantly
        updateQueue(msg);
        DEBUG("send - send DT message successfully");
        sortAndDeliver();
    } else if (isLocalPeerMaster()) {
        // master: distribute to the other masters and to the own cluster
        DEBUG("send - send DT message to other masters");
        sendToAllMasters(msg);
        DEBUG("send - send DT message to slaves (mRef = " << mRef << ", lt = " << getCurrentLogicalTime() << ")");
        sendToCluster(msg);
        createAndStartGTTimer(mRef, getCurrentLogicalTime());
        // keep the message for retransmissions
        lastMessageQueue.add(&msg);
    } else {
        // slave: hand the message to the master and wait for the GT
        DEBUG("send - send DT message to master (mRef = " << mRef << ", lt = " << getCurrentLogicalTime() << ")");
        sendToMaster(msg);
        createAndStartGTAwaitTimer(mRef, getCurrentLogicalTime());
        // keep the message for retransmissions
        lastMessageQueue.add(&msg);
    }

    // the next message gets the next sequence number
    sequ++;
}
/**
 * @brief Retransmits the message referenced by the given timer to the peers
 * the timer is still waiting for.
 *
 * A retransmission only takes place if the timer exists, still has missing
 * peers and retries left, and the message is still stored in the last message
 * queue. In that case a copy of the stored message (with the local peer as
 * last hop) is sent to the missing peers and the timer is restarted.
 * @param timer The expired timer.
 * @return True if the message was retransmitted, false otherwise.
 */
bool
MessageTransfer::resendMessageToPeers(MTTimer * const timer) {
    if (timer == NULL) {
        return false;
    }

    // nothing to do if nobody is missing or no retry is left
    if (!(timer->getNumberOfMissingPeers() > 0) || !(timer->getNumberOfRetries() > 0)) {
        return false;
    }

    MessageReference mref = timer->getReference();

    // the message may already have been removed from the last message queue;
    // check the lookup result before duplicating it
    MulticastMessage* stored = lastMessageQueue.find(mref);
    if (stored == NULL) {
        return false;
    }

    MulticastMessage* msg = stored->dup();
    if (msg == NULL) {
        return false;
    }

    DEBUG("resendMessageToPeers - resend msg " << msg->getMessageReference() << " to peers");
    msg->setLastHop(getLocalID());
    sendTo(*msg, timer->getMissingPeers());
    timer->restart();

    // the copy was only needed for sending
    delete msg;
    return true;
}
/**
 * @brief Handles an expired GarbageTimer.
 *
 * Removes the message associated with the timer from the lastMessageQueue and
 * stops and deletes the timer(s) registered under the same message reference.
 * A NULL timer is ignored.
 * @param timer the expired timer
 */
void
MessageTransfer::handleGarbageTimer(MTTimer * timer) {
    if (timer == NULL) {
        return;
    }

    // copy the reference first, the timer is deleted below
    MessageReference mRef(timer->getReference());
    DEBUG("handleGarbageTimer - run garbage routine ( mRef: " << mRef << ")");

    lastMessageQueue.remove(mRef);
    stopAndDeleteTimer(mRef);
}
/**
 * @brief Handles an expired GTAwait timer.
 *
 * While the timer has retries left, the request is repeated: if the local peer
 * is the source of the message stored in the last message queue, the message
 * itself is sent to the master again, otherwise the LT message is resent via
 * resendLTMessageToPeer(). The timer is restarted afterwards.
 * When no retry is left, the message is removed from the delivery queue and
 * from the last message queue and the timers for it are stopped and deleted.
 * @param timer the expired timer; NULL is ignored
 */
void
MessageTransfer::handleGTAwaitTimer(MTTimer * timer) {
    if (timer == NULL) {
        return;
    }

    const int retriesLeft = timer->getNumberOfRetries();
    if (retriesLeft <= 0) {
        // max retries reached, give up
        DEBUG("handleGTAwaitTimer - maximum number of retries for requesting GT message from master reached - give up (mRef: " << timer->getReference() << ")");
        DEBUG("handleGTAwaitTimer - remove timer and message from queues");

        MessageReference givenUp(timer->getReference());
        queue.remove(givenUp);
        stopAndDeleteTimer(givenUp);
        // if this peer is the sender of the message, its stored copy has to go as well
        lastMessageQueue.remove(givenUp);
        return;
    }

    // try again
    MessageReference mRef = timer->getReference();
    MulticastMessage* stored = lastMessageQueue.find(mRef);
    const bool localPeerIsSender = (stored != NULL) && (stored->getSourceID() == getLocalID());

    if (localPeerIsSender) {
        sendToMaster(*stored);
    } else {
        resendLTMessageToPeer(mRef);
    }

    timer->restart();
}//End handleGTAwaitTimer
/**
* @brief This method handles an expired GT timer.
*
* If the message can still be retransmitted to missing peers (see
* resendMessageToPeers()), only the retransmission takes place. Otherwise the
* collecting phase is closed: the final logical time is derived from the
* current logical time and the time stored in the timer, a GT message carrying
* this time and the missing peers is sent to all masters and to the own
* cluster, the locally queued message is marked deliverable and delivered, the
* GT timer is replaced by a garbage timer.
* Nothing happens if the timer is NULL or if the message is no longer stored
* in the last message queue.
* @param timer The expired timer.
*/
void
MessageTransfer::handleGTTimer(MTTimer* timer) {
if (timer == NULL) return;
// master has not collected all desired lt messages
// and can retransmit the message??
if (resendMessageToPeers(timer)) {
// yes - the timer has been restarted by resendMessageToPeers, wait again
return;
}//End if
// no retransmission (possible): finish the collecting phase
MessageReference mref = timer->getReference();
MulticastMessage* msg = lastMessageQueue.find(mref);
//i know the message
if (msg != NULL) {
// determine the final time from the local clock and the time collected in the timer
VirtualLogicalTime lt(getCurrentLogicalTime());
lt.update(VirtualLogicalTime(std::max(lt, timer->getLt())));
msg->setLT(lt);
// build the GT message from the stored message, the local peer is the sender
GTMessage pdu(*msg, getLocalID());
// copy missing peers
copyMissingPeersAndForeignMissingPeersToMessage(&pdu, timer);
{
std::stringstream buf;
buf << "handleGTTimer - number of missing peers in group: " << pdu.getMissingPeerSize();
DEBUG(buf.str().c_str());
}
// update the message stored in the last message queue,
// needed for error handling (if a slave misses an lt/gt message)
updateMessage(msg, lt, timer);
{
std::stringstream buf;
buf << "handleGTTimer - send GT message to peers (mRef = " << mref << ", gt = " << lt << ")";
DEBUG(buf.str().c_str());
}
// update the local time
ts().update(lt);
// distribute the GT message
sendToAllMasters(pdu);
sendToCluster(pdu);
DEBUG("handleGTTimer - send DT message successfully (mRef = " << mref << ")");
// mark the locally queued message as deliverable and deliver what is ready
updateQueue(*msg);
sortAndDeliver();
// delete GT timer (this also deletes `timer`, which must not be used afterwards)
stopAndDeleteTimer(mref);
//create and start GarbageTimer
createAndStartGarbageTimer(mref);
}//End if
}
/**
 * @brief Handles an expired LT timer.
 *
 * If the message can still be retransmitted to missing peers (see
 * resendMessageToPeers()), only the retransmission takes place. Otherwise an
 * LT message carrying the maximum of the current logical time and the time
 * stored in the timer, together with the timer's missing peers, is sent to the
 * last hop of the stored message. The LT timer is then replaced by a garbage
 * timer. Nothing happens if the timer is NULL or if the message is no longer
 * stored in the last message queue.
 * @param timer The expired timer.
 */
void
MessageTransfer::handleLTTimer(MTTimer * timer) {
    // check the pointer before it is used for the debug output
    if (timer == NULL) {
        return;
    }

    DEBUG("handleLTTimer - handle LT timer (mRef: " << timer->getReference() << ")");

    // peers still missing and retries left: wait for them once more
    if (resendMessageToPeers(timer)) {
        return;
    }

    MessageReference mRef = timer->getReference();
    MulticastMessage* msg = lastMessageQueue.find(mRef);
    if (msg == NULL) {
        return;
    }

    // send lt message to sender
    VirtualLogicalTime timerLt = timer->getLt();
    VirtualLogicalTime aLt = std::max(getCurrentLogicalTime().getValue(),
            timerLt.getValue());
    LTMessage pdu(mRef, getLocalID(), msg->getSequ(), aLt);

    DEBUG("handleLTTimer - number of missing peers: " << timer->getNumberOfMissingPeers());
    pdu.setMissingPeers(timer->getMissingPeers());

    DEBUG("handleLTTimer - send LT message to sender (mRef = " << mRef << ", lt = " << aLt << ", destination = " << msg->getLastHop() << ")");
    PeerID lastHopID(msg->getLastHop());
    sendTo(pdu, lastHopID);

    // delete the lt timer (this also deletes `timer`) and schedule the clean up
    stopAndDeleteTimer(mRef);
    createAndStartGarbageTimer(mRef);
}
/**
* @brief This method handles an incoming DT peer message.
*
* The logical time is incremented first. A message that is already in the
* delivery queue is treated as a duplicate and answered by resending the LT
* message. Otherwise:
*  - a master forwards a copy of the message (with itself as last hop) to the
*    receivers returned by createReceiverList(), starts a GT timer if
*    isPrimaryMaster() holds for the last hop, or an LT timer plus a GTAwait
*    timer if not, and stores the message in the last message queue;
*  - a slave answers with an LT message to its master and starts a GTAwait
*    timer.
* In both cases the message is enqueued for delivery.
* @param pdu The received peer message.
*/
void
MessageTransfer::handleMessage(MulticastMessage* pdu) {
ts().incCurrentLogicalTime();
MessageReference mRef = pdu->getMessageReference();
//message already seen?
if (queue.contains(mRef)) {
DEBUG("handleDTMessage - duplicated DT message received (mRef: " << mRef << ")");
// the sender apparently missed our answer, repeat it
resendLTMessageToPeer(mRef);
return;
}//End if
//master
if (isLocalPeerMaster()) {
DEBUG("handleDTMessage - send DT message to slaves (mRef: " << mRef << ")");
PeerID lastHop(pdu->getLastHop());
PeerIDList recList = createReceiverList(lastHop);
// forward a copy, marked with the local peer as last hop
MulticastMessage * pack = pdu->dup();
pack->setLastHop(getLocalID());
#if OMNETPP
pack->setByteLength(sizeof * pack);
#endif
sendTo(*pack, recList);
delete pack;
//we are the master of the sending node?
MTTimer* timer;
bool primaryMaster = isPrimaryMaster(lastHop);
//yes
if (primaryMaster) {
//this is the primary master
timer = new GTTimer(*this);
}//End if
//no
else {
timer = new LTTimer(*this);
//this is a secondary master, we have to ensure the receiving of the GT message
createAndStartGTAwaitTimer(mRef, getCurrentLogicalTime());
}//end else
// the timer waits for the answers of all receivers of the forwarded message
// NOTE(review): the timer is not deleted in this method; presumably it registers
// itself in timerQueue and is released via stopAndDeleteTimer() - confirm
setupTimer(timer, mRef, recList, getCurrentLogicalTime());
timer->start();
//add the message
lastMessageQueue.add(pdu);
//shortcut for a group with one (inviting another one)
//this shortens the data delivery latency
//not necessary, but nice
if (recList.size() == 0) {
enqueue(*pdu);
// nobody to wait for: run the timer handler right away
if (primaryMaster) {
handleGTTimer(timer);
} else {
handleLTTimer(timer);
}
return;
}
}//end if
//slave && not sender
else {
DEBUG("handleDTMessage - DT message received (mRef: " << mRef << ")");
// answer with the local logical time
LTMessage pack(mRef, getLocalID(), pdu->getSequ(), getCurrentLogicalTime());
std::stringstream buf;
buf << "handleDTMessage - send LT message to master " << getLocalMasterID() << " (mRef = " << mRef << ", lt = " << getCurrentLogicalTime() << ")";
DEBUG(buf.str().c_str());
sendTo(pack, getLocalMasterID());
//start gt await timer for ensure gt receiving as slave
createAndStartGTAwaitTimer(mRef, getCurrentLogicalTime());
}//End else
// store the message for delivery (marked deliverable later by updateQueue)
enqueue(*pdu);
}
/**
 * @brief This method handles an incoming LT peer message.
 *
 * If neither the timer queue nor the GTAwait timer queue holds a timer for the
 * message, the handling of the message has already been finished and the GT
 * message is sent again to the requesting peer. Otherwise the missing peers of
 * the stored message and the timer are updated from the LT message. Once the
 * timer is not waiting for any peer any more, it is stopped and - unless it is
 * a GarbageTimer - rescheduled with a timeout of 0.1 so that it expires
 * almost immediately.
 * @param pdu The received peer message.
 */
void
MessageTransfer::handleMessage(LTMessage* pdu) {
    MessageReference mRef = pdu->getMessageReference();

    std::stringstream trace;
    trace << "handleLTMessage - LT message received from " << pdu->getLastHop() << " (mRef: " << mRef << ", lt: " << pdu->getLT() << ")";
    DEBUG(trace.str().c_str());

    // is any GT/LT or gtAwait timer running for this message?
    const bool timerRunning = timerQueue.find(mRef) || gtAwaitTimerQueue.find(mRef);
    if (!timerRunning) {
        // no --> message handling has already been finished, repeat the GT
        resendGTMessageToPeer(mRef, pdu);
        return;
    }

    updateMissingPeersInLastMessageQueue(pdu);
    // update Timer, if present
    updateTimer(pdu);

    const bool collected = isLTCollectingCompleted(pdu->getMessageReference());
    if (!collected) {
        return;
    }

    DEBUG("handleLTMessage - stop running timers and finish message processing (mRef: " << mRef << ")");
    MTTimer * stopped = stopLTGTOrGarbageTimer(pdu->getMessageReference());

    // only LT and GT timers are rescheduled, a GarbageTimer stays stopped
    if (dynamic_cast<GarbageTimer*>(stopped) == NULL) {
        // let the timer run out next to now
        stopped->stop();
        stopped->setTimeout(0.1);
        stopped->start();
    }
}
/**
 * @brief This method handles an incoming GT peer message.
 *
 * Increments the logical time, stops and deletes the timers registered for
 * the message, and - on a master - forwards a copy of the GT message (with the
 * local peer as source) to the receivers returned by createReceiverList().
 * The queued message is then marked deliverable and the delivery queue is
 * processed. If the local peer created the message, its copy is removed from
 * the last message queue. Finally the local time is raised to at least the
 * GT of the message plus one.
 * @param pdu The received peer message.
 */
void
MessageTransfer::handleMessage(GTMessage * pdu) {
    ts().incCurrentLogicalTime();

    MessageReference mRef(pdu->getMessageReference());

    // the GT arrived, so the peer has to stop the GtAwaitTimer
    stopAndDeleteTimer(mRef);

    if (isLocalPeerMaster()) {
        DEBUG("handleGTMessage - send GT message to slaves (mRef = " << mRef << ")");

        PeerID lastHopID(pdu->getLastHop());
        PeerIDList recList = createReceiverList(lastHopID);

        GTMessage forwarded(*pdu);
        forwarded.setSourceID(getLocalID());
        sendTo(forwarded, recList);
    }

    // master and slave: the message is ready for delivery now
    updateQueue(*pdu);
    sortAndDeliver();

    // sender: the stored copy is not needed any longer
    const bool localPeerIsSender = (mRef.getPeerID() == getLocalID());
    if (localPeerIsSender) {
        lastMessageQueue.remove(mRef);
    }

    // the local clock must not stay behind the global time of the message
    const VirtualLogicalTime afterGt = VirtualLogicalTime(pdu->getLT() + 1);
    ts().update(VirtualLogicalTime(std::max(getCurrentLogicalTime(), afterGt)));

    DEBUG("handleGTMessage - update lt to " << getCurrentLogicalTime());
}
/**
* @brief This method enqueues a received DT peer message into the delivery queue.
* The stored message is marked as not deliverable.
* @param pdu the received peer message
*/
void
MessageTransfer::enqueue(const MulticastMessage& pdu) {
MessageReference mRef = pdu.getMessageReference();
if (queue.contains(mRef)) {
DEBUG("enqueue - PDU already enqueued");
} else {
LTNode node;
node.setMessageReference(mRef);
node.setPeerID(mRef.getPeerID());
node.setSequ(mRef.getSequenceNumber());
node.setLT(getCurrentLogicalTime());
node.setMessage(pdu.dup());
std::stringstream buf;
buf << "enqueue - enqueue PDU (mRef = " << node.getMessageReference() << ", peerID: "
<< node.getPeerID() << ", sequ: " << node.getSequ() << ")";
DEBUG(buf.str().c_str());
queue.add(node);
dispatcher.signal(new MulticastMessageEnqueuedEvent(pdu.dup()));
}//End else
}
/**
* @brief This method updates a stored peer message within the delivery queue from the given peer message and marks this message as deliverable.
* @param pdu The peer message corresponding to the stored message, used for the update
*/
void
MessageTransfer::updateQueue(const MoversightMessage& pdu) {
MessageReference mref(pdu.getMessageReference());
LTNode* node = queue.find(mref);
if (node != NULL) {
std::stringstream buf;
buf << "updateQueue - update PDU (mRef = " << node->getMessageReference()
<< ", peerID: " << node->getPeerID() << ", sequ: " << node->getSequ() << ")";
DEBUG(buf.str().c_str());
VirtualLogicalTime lt(pdu.getLT());
node->setLT(lt);
node->setDeliverable(true);
node->getMessage()->setViewID(pdu.getViewID());
// update missing peers
node->getMessage()->setMissingPeers(pdu.getMissingPeers());
} else {
DEBUG("updateQueue - PDU not enqueued ");
}
}
/**
* @brief This method sorts the delivery queue and delivers all messages from
* the front of the queue that are marked as deliverable.
*
* For each deliverable front message the logical delivery time is remembered
* in lastSeenLt. A message whose view ID equals the current view ID is
* delivered (handleDeliver, resource estimation, MulticastMessageDeliveredEvent
* and, if peers are missing, PendingPeersEvent); a message from another view
* is dropped with a MulticastMessageDroppedEvent. The message is dequeued in
* both cases. While the local sub state is FLUSHING, a FlushDoneEvent is
* signalled as soon as the queue runs empty.
*
* Special case while FLUSHING: if the front message is of a view change able
* type and any message in the queue is not yet deliverable, the method returns
* without delivering or dequeuing the front message.
*/
void
MessageTransfer::sortAndDeliver() {
queue.sort();
// process the queue front as long as it is deliverable
while (queue.size() > 0 && queue.front().isDeliverable()) {
MulticastMessage * pdu = queue.front().getMessage();
size_t missingPeersCount = pdu->getMissingPeerSize();
std::stringstream buf;
buf << "sortAndDeliver - deliver DT ("
<< "mRef = " << queue.front().getMessageReference()
<< ", peerID: " << queue.front().getPeerID()
<< ", sequ: " << queue.front().getSequ()
<< ", lt: " << queue.front().getLT()
<< ", missing peers: " << missingPeersCount << ")";
DEBUG(buf.str().c_str());
//deliver message to moversight
//remember the time of the last seen message
lastSeenLt = pdu->getLogicalDeliveryTime();
//check sending view property again
if (getViewID() == pdu->getViewID()) {
// avoid losing gt from later sorted msg
if (getLocalSubState() == FLUSHING &&
dispatcher.getNextViewBuffer().isViewChangeAbleMessageType(pdu->getType())) {
DEBUG("READY to Flush?");
// hold the view change able message back until every queued message is deliverable
for (size_t i = 0; i < queue.size(); i++) {
if (!queue.get(i).isDeliverable()) {
// not yet - the front message stays in the queue
return;
}
}
}
//deliver the message
pdu->handleDeliver(dispatcher, pdu->getMissingPeers());
dispatcher.getResourceValueDistributor().estimateAllPeerResources(pdu);
// the event gets its own copy of the message
dispatcher.signal(new MulticastMessageDeliveredEvent(pdu->dup()));
if (missingPeersCount > 0) {
dispatcher.signal(new PendingPeersEvent(pdu->getMissingPeers()));
}//End if
}//End if
else {
DEBUG("sortAndDeliver - drop message, because sending view property does not match (mRef: " << pdu->getMessageReference() << ")");
dispatcher.signal(new MulticastMessageDroppedEvent(pdu->dup()));
}//end else
//clean up
queue.dequeue();
// during a flush, report when the last queued message has been processed
if (getLocalSubState() == FLUSHING) {
if (queue.isEmpty()) {
dispatcher.signal(new FlushDoneEvent());
}//End if
}//End if
}//End while
}
/**
* @brief This method updates the missing peers in the last message queue from the given peer message.
* @param pdu the message used to update the missing peers set of the message, stored in the last message queue.
*/
void
MessageTransfer::updateMissingPeersInLastMessageQueue(const MoversightMessage * const pdu) {
if (pdu->getMissingPeerSize() > 0) {
MoversightMessage* msg = lastMessageQueue.find(pdu->getMessageReference());
if (msg != NULL) {
PeerIDList missingPeers = msg->getMissingPeers();
missingPeers.add(pdu->getMissingPeers());
msg->setMissingPeers(missingPeers);
}//End if
}//End if
}
/**
* @brief Update the timer corresponding to the given message (via the message reference field).
* @param pdu The received message used to update the lt field and the set of missing peers of the timer.
*/
void
MessageTransfer::updateTimer(const MoversightMessage * const pdu) {
//search the timer and run the update
MessageReference mref(pdu->getMessageReference());
MTTimer* timer = timerQueue.find(mref);
if (timer != NULL) {
updateTimer(timer, pdu);
}//End if
}
/**
 * @brief Updates a given timer from a peer message.
 *
 * The last hop of the message is removed from the timer's missing peers, the
 * timer's logical time becomes the maximum of (timer time + 1) and the time of
 * the message, and the timer's foreign missing peers are replaced by the
 * missing peers of the message. A NULL timer or message is ignored.
 * @param timer The timer to update.
 * @param pdu The received peer message used to update the timer.
 */
void
MessageTransfer::updateTimer(MTTimer * const timer, const MoversightMessage * const pdu) {
    const bool usable = (timer != NULL) && (pdu != NULL);
    if (!usable) {
        // error case, do nothing
        return;
    }

    // the sender of the message has answered
    timer->removeMissingPeer(pdu->getLastHop());

    // advance the timer's time, but never fall behind the message
    VirtualLogicalTime advanced = timer->getLt() + 1;
    timer->setLt(std::max(advanced, pdu->getLT()));

    // copy missing peers
    timer->setForeignMissingPeers(pdu->getMissingPeers());
}
/**
 * @brief Updates the logical time and the missing peers of a message.
 *
 * The missing peers are taken from the timer (missing and foreign missing
 * peers). The message is usually stored within the last message queue and the
 * updated values are needed to handle repeated LT/GT requests by the slaves.
 * Nothing happens if the message or the timer is NULL.
 * @param msg the peer message to update
 * @param aLt the logical time to set
 * @param timer the timer knows the missing peers
 */
void
MessageTransfer::updateMessage(MulticastMessage* msg, VirtualLogicalTime & aLt, MTTimer* timer) {
    if (msg == NULL || timer == NULL) {
        return;
    }
    msg->setLT(aLt);
    copyMissingPeersAndForeignMissingPeersToMessage(msg, timer);
}//End updateMessage
/**
 * @brief Sets up the timer for a single awaited peer.
 *
 * Wraps the peer into a list and delegates to the list based overload, which
 * does nothing if the timer is NULL.
 * @param timer the timer to setup
 * @param mRef the message reference of the timer
 * @param missedPeer the peer for which the timer is waiting
 * @param aLt the start time of the timer
 * @return the timer
 */
MTTimer*
MessageTransfer::setupTimer(MTTimer * timer, MessageReference & mRef, PeerID missedPeer, const VirtualLogicalTime & aLt) {
    PeerIDList awaited;
    awaited.add(missedPeer);

    return setupTimer(timer, mRef, awaited, aLt);
}//End setupTimer
/**
 * @brief Sets up the timer from the given values: logical time, message
 * reference and the peers the timer is waiting for.
 *
 * Does nothing if the timer is NULL.
 * @param timer the timer to setup
 * @param mRef the message reference of the timer
 * @param missingPeers the list of peers for which the timer is waiting
 * @param aLt the start time of the timer
 * @return the timer (unchanged pointer, may be NULL)
 */
MTTimer*
MessageTransfer::setupTimer(MTTimer* timer, MessageReference & mRef, PeerIDList & missingPeers, const VirtualLogicalTime & aLt) {
    if (timer == NULL) {
        return timer;
    }

    timer->setLt(aLt.getValue());
    timer->setReference(mRef);
    timer->setMissingPeers(missingPeers);

    return timer;
}//End setupTimer
/**
 * @brief Resends an LT message for a previously received DT message.
 *
 * The LT message is rebuilt from the delivery queue node found for the given
 * message reference (its logical time and the sequence number of its message)
 * and sent to the last hop of the queued message, provided that peer is part
 * of the current member register. If the message is not queued, only a debug
 * message is written.
 * @param mRef the message reference, which identifies the previously received DT message
 */
void
MessageTransfer::resendLTMessageToPeer(MessageReference & mRef) {
    LTNode* entry = queue.find(mRef);

    if (entry == NULL) {
        DEBUG("resendLTMessageToPeer - resend LT message failed, because message not present in queue (mRef: " << mRef << ")");
        return;
    }

    // recreate LT message and send it back to sender
    VirtualLogicalTime aLt(entry->getLT());
    LTMessage answer(mRef, getLocalID(), entry->getMessage()->getSequ(), aLt);

    DEBUG("resendLTMessageToPeer - resend LT message to master (mRef = " << mRef << ",lt = " << entry->getLT() << ")");

    // only answer peers that are still members of the group
    PeerID receiver(entry->getMessage()->getLastHop());
    const bool stillMember = dispatcher.getMembershipService().getCurrentMemberRegister().contains(receiver);
    if (stillMember) {
        sendTo(answer, receiver);
    }
}
/**
 * @brief Resends an already sent GT message to a single requesting peer.
 *
 * The GT message is rebuilt from the message stored in the last message queue
 * (including its missing peers) and sent to the last hop of the given LT
 * message. If the message is no longer stored (it is removed when the garbage
 * timer expires) or @p pdu is NULL, nothing happens.
 * @param mref the message reference of the associated DT message
 * @param pdu the repeated LT message of the slave
 */
void
MessageTransfer::resendGTMessageToPeer(const MessageReference& mref, const LTMessage * const pdu) {
    MulticastMessage* msg = lastMessageQueue.find(mref);

    // we still have a copy of the message? (garbage timer is still running)
    if (msg == NULL || pdu == NULL) {
        return;
    }

    // Two separate debug statements: reusing one stringstream with clear()
    // only resets its error flags and would repeat the first text in the
    // second line.
    DEBUG("resendGTMessageToPeer - delayed LT message received for message " << pdu->getMessageReference());
    DEBUG("resendGTMessageToPeer - send GT message again to peer " << pdu->getLastHop());

    GTMessage pack(*msg, getLocalID());
    pack.setMissingPeers(msg->getMissingPeers());
    sendTo(pack, pdu->getLastHop());
}
/**
 * @brief Copies the missing peers and foreign missing peers from the timer
 * into the peer message.
 *
 * The missing peers of the message are only overwritten if the timer holds at
 * least one missing or foreign missing peer; otherwise the message is left
 * untouched. NULL arguments are ignored.
 * @param pdu the destination of the copy operation
 * @param timer the source for the copy operation
 */
void
MessageTransfer::copyMissingPeersAndForeignMissingPeersToMessage(MoversightMessage * pdu, MTTimer * timer) {
    if (timer == NULL || pdu == NULL) {
        return;
    }

    const bool nothingMissing = timer->getMissingPeers().size() == 0 &&
            timer->getForeignMissingPeers().size() == 0;
    if (nothingMissing) {
        return;
    }

    // own missing peers first, then those reported by others
    PeerIDList combined;
    combined.add(timer->getMissingPeers());
    combined.add(timer->getForeignMissingPeers());
    pdu->setMissingPeers(combined);
}
/**
 * @brief Creates and starts a GTAwait timer, waiting for a GT from the master peer.
 *
 * The timer is set up with the local master as the only awaited peer.
 * @param messageRef the message reference of the corresponding message
 * @param alt the virtual logical start time of the timer
 */
void
MessageTransfer::createAndStartGTAwaitTimer(MessageReference & messageRef, const VirtualLogicalTime & alt) {
    MTTimer* awaitTimer = new GTAwaitTimer(*this);

    PeerID masterId = getLocalMasterID();
    setupTimer(awaitTimer, messageRef, masterId, alt);

    awaitTimer->start();
}
/**
 * @brief Creates and starts a GT timer, waiting a period of time for incoming
 * LT messages before the global GT is determined and sent.
 *
 * The timer waits for the slaves of the local cluster and for all masters
 * except the local peer.
 * @param messageRef the message reference of the corresponding message
 * @param alt the virtual logical start time of the timer
 */
void
MessageTransfer::createAndStartGTTimer(MessageReference & messageRef, const VirtualLogicalTime & alt) {
    // start with the slaves of the own cluster ...
    PeerIDList awaited = ms().getClusterPeerIDListSlavesOnly(getLocalPeer().getClusterID());

    // ... and add every master but ourselves
    PeerIDList masters = ms().getMasterPeerIDList();
    PeerID localId = getLocalID();
    for (size_t idx = 0; idx < masters.size(); idx++) {
        PeerID candidate = masters.get(idx);
        if (candidate == localId) {
            continue;
        }
        awaited.add(candidate);
    }

    MTTimer* gtTimer = new GTTimer(*this);
    setupTimer(gtTimer, messageRef, awaited, alt);
    gtTimer->start();
}
/**
 * @brief Stops the timer (LT, GT or Garbage) queued in the timer queue for the
 * given message reference.
 * @param mRef the message reference of the message associated with the timer to stop
 * @returns the stopped timer
 * @throws NullPointerException if no timer is queued for the message reference
 */
MTTimer *
MessageTransfer::stopLTGTOrGarbageTimer(const MessageReference & mRef) {
    MTTimer* found = timerQueue.find(mRef);

    if (found == NULL) {
        throw NullPointerException("timer not found");
    }

    found->stop();
    return found;
}
/**
 * @brief Creates a GarbageTimer instance for the given message reference and
 * starts it, unless the local peer is in state DISJOINED.
 * @param mRef the message reference, associated with this timer
 */
void
MessageTransfer::createAndStartGarbageTimer(MessageReference & mRef) {
    const bool serviceActive = (getLocalPeer().getPeerState() != DISJOINED);
    if (!serviceActive) {
        // we have finalized our service, do nothing
        return;
    }

    MTTimer* garbage = new GarbageTimer(*this);
    garbage->setReference(mRef);
    garbage->start();
}
/**
 * @brief Stops and deletes the timers scheduled for the given message reference.
 *
 * Both the timer queue (LT, GT and garbage timers) and the GTAwait timer queue
 * are searched; a timer found in either of them is stopped and deleted.
 * @param mRef the message reference of the timer
 */
void
MessageTransfer::stopAndDeleteTimer(MessageReference & mRef) {
    // lt, gt or garbage timer
    if (MTTimer* running = timerQueue.find(mRef)) {
        running->stop();
        delete running;
    }

    // gtAwait timer
    if (MTTimer* awaiting = gtAwaitTimerQueue.find(mRef)) {
        awaiting->stop();
        delete awaiting;
    }
}
/**
 * @brief Shortcut to the membership service of the dispatcher.
 * @return The moversight membership service.
 */
MembershipService & MessageTransfer::ms() {
    return dispatcher.getMembershipService();
}
/**
 * @brief Shortcut to the time service of the dispatcher.
 * @return The moversight time service.
 */
TimeService & MessageTransfer::ts() {
    return dispatcher.getTimeService();
}
/**
* @brief Checks, if the LT collecting completed for the given timer.
* The result is derived from the missingPeers and foreignMissingPeers lists of the timer.
*
* @param mRef the message reference of the corresponding message, identifying the timer.
*/
bool
MessageTransfer::isLTCollectingCompleted(const MessageReference & mRef) {
MTTimer* timer = timerQueue.find(mRef);
if (timer != NULL) {
if (!(timer->getNumberOfMissingPeers() > 0) &&
!(timer->getNumberOfForeignMissingPeers() > 0)) {
return true;
}
}
return false;
}
/**
* @brief This method cleans all timer queues by stopping and deleting
* all running timers (LT-, GT- and GTAwaitTimers).
*/
void
MessageTransfer::stopAllTimers() {
while (timerQueue.size() > 0) {
MTTimer * timer = timerQueue.front();
if (timer != NULL) {
timer->stop();
delete timer;
}//End if
}//End while
while (gtAwaitTimerQueue.size() > 0) {
MTTimer * timer = gtAwaitTimerQueue.front();
if (timer != NULL) {
timer->stop();
delete timer;
}//End if
}//End while
}
/**
* @brief The method cleans the message queue by deleting all stored
* peer messages. The lastMessageQueue, which stores all send messages,
* is cleaned to.
*/
void
MessageTransfer::cleanMessageQueues() {
while (!queue.isEmpty()) {
LTNode node = queue.front();
if (node.getMessage() != NULL) {
delete node.getMessage();
}
MessageReference mRef = node.getMessageReference();
queue.remove(mRef);
}//End while
lastMessageQueue.clear();
}
/**
* @brief Handles a peer left event. The method removes references to the removed peer
* from the queues used within the message transfer module.
* @param leftPeer The left peer.
*/
void
MessageTransfer::handleEvent(const PeerLeftEvent & e) {
//clean up all the left peer from all queues
for (size_t i = 0; i < queue.size(); i++) {
queue.get(i).getMessage()->removeMissingPeer(e.getPeer().getPeerID());
}
for (size_t i = 0; i < timerQueue.size(); i++) {
timerQueue.get(i)->removeMissingPeer(e.getPeer().getPeerID());
timerQueue.get(i)->removeForeignMissingPeer(e.getPeer().getPeerID());
}
}
/**
 * @brief Assignment operator.
 *
 * Assigns the base class part, all four queues, the last seen logical time
 * and the sequence counter. Self assignment is a no-op.
 * @param other The instance to assign.
 * @return A reference of the current object.
 */
MessageTransfer & MessageTransfer::operator=(const MessageTransfer & other) {
    if (this != &other) {
        MoversightService::operator=(other);

        // queues
        queue = other.queue;
        lastMessageQueue = other.lastMessageQueue;
        timerQueue = other.timerQueue;
        gtAwaitTimerQueue = other.gtAwaitTimerQueue;

        // counters
        lastSeenLt = other.lastSeenLt;
        sequ = other.sequ;
    }
    return *this;
}
/**
 * @brief Determines whether the local peer is the primary master for the
 * sending peer.
 *
 * This is the case if the local peer is a master and the sending peer is not
 * contained in the master ID list.
 * @param lastHop The ID of the sending peer.
 * @return True if the local peer is master and @p lastHop is no master, false otherwise.
 */
bool
MessageTransfer::isPrimaryMaster(PeerID lastHop) {
    if (!isLocalPeerMaster()) {
        return false;
    }

    const bool senderIsMaster = getMasterIDList().contains(lastHop);
    return !senderIsMaster;
}
/**
 * @brief Returns the logical time remembered for the last message processed
 * by sortAndDeliver().
 * @return The last seen logical time of a delivered message
 */
const VirtualLogicalTime & MessageTransfer::getLastSeenLogicalTime() const {
    return lastSeenLt;
}
}
}