/*
 * File:   MessageTransfer.cc
 * Author: jgaebler
 *
 * Created on May 4, 2010, 5:18 PM
 */
|
|
|
|
#include "MessageTransfer.h"
|
|
|
|
|
|
#include "Dispatcher.h"
|
|
#include "Moversight.h"
|
|
#include "common/transport/MessageReference.h"
|
|
#include "event/events/FlushDoneEvent.h"
|
|
#include "mrs/MaintanceRoleService.h"
|
|
#include "mrs/sync/NextViewBuffer.h"
|
|
#include "ms/Peer.h"
|
|
#include "ms/events/PeerLeftEvent.h"
|
|
#include "mt/events/PendingPeersEvent.h"
|
|
#include "mt/events/MulticastMessageDeliveredEvent.h"
|
|
#include "mt/events/MulticastMessageEnqueuedEvent.h"
|
|
#include "mt/events/MulticastMessageDroppedEvent.h"
|
|
#include "mt/msg/MulticastMessage.h"
|
|
#include "mt/msg/GTMessage.h"
|
|
#include "mt/msg/LTMessage.h"
|
|
#include "mt/msg/MulticastMessage.h"
|
|
#include "mt/timer/MTTimer.h"
|
|
#include "mt/timer/LTTimer.h"
|
|
#include "mt/timer/GarbageTimer.h"
|
|
#include "mt/timer/GTTimer.h"
|
|
#include "mt/timer/GTAwaitTimer.h"
|
|
#include "ts/TimeService.h"
|
|
|
|
|
|
|
|
namespace ubeeme {
|
|
namespace moversight {
|
|
|
|
#undef DEBUG
// Emits one debug line prefixed with "MT@<local peer id>" when the module has
// message-transfer debugging enabled. The body is wrapped in do { } while (0)
// so the macro expands to exactly one statement: it is safe inside an unbraced
// if/else and the caller supplies the terminating ';'.
#define DEBUG(msg) \
    do { \
        if (module.isPrintDebugMT()) { \
            MOV_DEBUG << "MT@" << getLocalID() << " " << msg << std::endl; \
        } \
    } while (0)
|
|
|
|
/**
|
|
* @brief Constructor
|
|
* @param d A reference to the moversight dispatcher.
|
|
*/
|
|
MessageTransfer::MessageTransfer(Dispatcher & d) : MoversightService(d, "MessageTransfer") {
|
|
}
|
|
|
|
/**
|
|
* @brief Initialize the message transfer service.
|
|
*/
|
|
void
|
|
MessageTransfer::initialise() {
|
|
dispatcher.subscribe<PeerLeftEvent>(this);
|
|
sequ = 0;
|
|
lastSeenLt = 0;
|
|
}
|
|
|
|
/**
|
|
* @brief Finish the operation of the message transfer service.
|
|
*/
|
|
void
|
|
MessageTransfer::finalise() {
|
|
dispatcher.unsubscribeAll(this);
|
|
std::stringstream buf;
|
|
buf << "finalise - timerQueue.size(): " << timerQueue.size()
|
|
<< " gtAwaitTimerQueue.size(): " << gtAwaitTimerQueue.size()
|
|
<< " queue.size(): " << queue.size()
|
|
<< " lastMessageQueue.size(): " << lastMessageQueue.size();
|
|
DEBUG(buf.str().c_str());
|
|
DEBUG("clean up queues and timers");
|
|
stopAllTimers();
|
|
cleanMessageQueues();
|
|
}
|
|
|
|
/**
|
|
* @brief Destructor
|
|
*/
|
|
MessageTransfer::~MessageTransfer() {
|
|
}
|
|
|
|
/**
|
|
* @brief The common user interface to send a message to a closed group of peers using the message transfer,
|
|
* which provides virtual synchrony within the receiving peers.
|
|
* @param msg the message to send
|
|
*/
|
|
void
|
|
MessageTransfer::send(MulticastMessage& msg) {
|
|
// prepare the message
|
|
MessageReference mRef(getLocalID(), sequ);
|
|
msg.setMessageReference(mRef);
|
|
msg.setSourceID(getLocalID());
|
|
msg.setLastHop(getLocalID());
|
|
msg.setSequ(sequ);
|
|
msg.setLT(getCurrentLogicalTime());
|
|
// ensure that the message contains at least the current viewID
|
|
msg.setViewID(getViewID());
|
|
#if OMNETPP
|
|
msg.setByteLength(sizeof msg);
|
|
#endif
|
|
// enqueue the message for local delivery
|
|
enqueue(msg);
|
|
|
|
// only one member within the group?
|
|
if (ms().getNumberOfPeers() == 1) {
|
|
//deliver the message instantly
|
|
updateQueue(msg);
|
|
DEBUG("send - send DT message successfully");
|
|
sortAndDeliver();
|
|
} else {
|
|
//we have to send the message to the group
|
|
|
|
// are we a master?
|
|
if (isLocalPeerMaster()) {
|
|
DEBUG("send - send DT message to other masters");
|
|
sendToAllMasters(msg);
|
|
DEBUG("send - send DT message to slaves (mRef = " << mRef << ", lt = " << getCurrentLogicalTime() << ")");
|
|
sendToCluster(msg);
|
|
createAndStartGTTimer(mRef, getCurrentLogicalTime());
|
|
}
|
|
// we are a slave
|
|
else {
|
|
std::stringstream buf;
|
|
buf << "send - send DT message to master (mRef = " << mRef << ", lt = " << getCurrentLogicalTime() << ")";
|
|
DEBUG(buf.str().c_str());
|
|
sendToMaster(msg);
|
|
createAndStartGTAwaitTimer(mRef, getCurrentLogicalTime());
|
|
}
|
|
|
|
//store the message
|
|
lastMessageQueue.add(&msg);
|
|
}
|
|
|
|
//generate the next sequence number
|
|
sequ++;
|
|
}
|
|
|
|
/**
|
|
* @brief This method retransmits the message, referenced by the given timer
|
|
* @param timer The expired timer.
|
|
* @return True, have it retransmit the message, false otherwise.
|
|
*/
|
|
bool
|
|
MessageTransfer::resendMessageToPeers(MTTimer * const timer) {
|
|
bool retVal = false;
|
|
MessageReference mref = timer->getReference();
|
|
|
|
if (timer->getNumberOfMissingPeers() > 0) {
|
|
MulticastMessage* msg = lastMessageQueue.find(mref)->dup();
|
|
|
|
if (msg != NULL) {
|
|
//no
|
|
if (timer->getNumberOfRetries() > 0) {
|
|
DEBUG("resendMessageToPeers - resend msg " << msg->getMessageReference() << " to peers");
|
|
msg->setLastHop(getLocalID());
|
|
sendTo(*msg, timer->getMissingPeers());
|
|
timer->restart();
|
|
retVal = true;
|
|
}//End if
|
|
|
|
delete msg;
|
|
}//end if
|
|
}//End if
|
|
|
|
return retVal;
|
|
}
|
|
|
|
/**
|
|
* @brief This method handles an expired GarbageTimer.
|
|
* The method removes the GT message from the lastMessageQueue, associated with this timer.
|
|
* @param timer the expired timer
|
|
*/
|
|
void
|
|
MessageTransfer::handleGarbageTimer(MTTimer * timer) {
|
|
if (timer != NULL) {
|
|
MessageReference mRef(timer->getReference());
|
|
std::stringstream buf;
|
|
buf << "handleGarbageTimer - run garbage routine ( mRef: " << mRef << ")";
|
|
DEBUG(buf.str().c_str());
|
|
lastMessageQueue.remove(mRef);
|
|
stopAndDeleteTimer(mRef);
|
|
}//End if
|
|
}
|
|
|
|
/**
|
|
* @brief This method handles an expired GTAwait timer.
|
|
*
|
|
* This method handles an expired GTAwait timer. It is scheduled after receiving a DT message and resends the lt message for a dt to the master,
|
|
* if possible, otherwise it removes the received DT message from the queue and gives up.
|
|
* @param timer the expired timer
|
|
*/
|
|
void
|
|
MessageTransfer::handleGTAwaitTimer(MTTimer * timer) {
|
|
if (timer == NULL) return;
|
|
|
|
int retries = timer->getNumberOfRetries();
|
|
|
|
if (retries > 0) {
|
|
//resend the last message again
|
|
MessageReference mRef = timer->getReference();
|
|
MulticastMessage* pdu = lastMessageQueue.find(mRef);
|
|
|
|
//we are the sender of the message ?
|
|
if (pdu != NULL && pdu->getSourceID() == getLocalID()) {
|
|
sendToMaster(*pdu);
|
|
}//end if
|
|
else {
|
|
resendLTMessageToPeer(mRef);
|
|
}//end else
|
|
|
|
timer->restart();
|
|
}//End if retries > 0
|
|
else {
|
|
//max retries reached, give up
|
|
DEBUG("handleGTAwaitTimer - maximum number of retries for requesting GT message from master reached - give up (mRef: " << timer->getReference() << ")");
|
|
DEBUG("handleGTAwaitTimer - remove timer and message from queues");
|
|
MessageReference mRef(timer->getReference());
|
|
queue.remove(mRef);
|
|
stopAndDeleteTimer(mRef);
|
|
//if this peer the sender of the message, we have to remove it
|
|
lastMessageQueue.remove(mRef);
|
|
}//End else
|
|
}//End handleGTAwaitTimer
|
|
|
|
/**
|
|
* @brief This method handles an expired GT timer.
|
|
* @param timer The expired timer.
|
|
*/
|
|
void
|
|
MessageTransfer::handleGTTimer(MTTimer* timer) {
|
|
if (timer == NULL) return;
|
|
|
|
// master has not collected all desired lt messages
|
|
// and can retransmit the message??
|
|
if (resendMessageToPeers(timer)) {
|
|
// yes
|
|
return;
|
|
}//End if
|
|
|
|
// update local clock
|
|
MessageReference mref = timer->getReference();
|
|
MulticastMessage* msg = lastMessageQueue.find(mref);
|
|
|
|
//i know the message
|
|
if (msg != NULL) {
|
|
VirtualLogicalTime lt(getCurrentLogicalTime());
|
|
lt.update(VirtualLogicalTime(std::max(lt, timer->getLt())));
|
|
msg->setLT(lt);
|
|
GTMessage pdu(*msg, getLocalID());
|
|
// copy missing peers
|
|
copyMissingPeersAndForeignMissingPeersToMessage(&pdu, timer);
|
|
{
|
|
std::stringstream buf;
|
|
buf << "handleGTTimer - number of missing peers in group: " << pdu.getMissingPeerSize();
|
|
DEBUG(buf.str().c_str());
|
|
}
|
|
// update the message stored in the last message queue,
|
|
// needed for error handling (if a slave misses an lt/gt message)
|
|
updateMessage(msg, lt, timer);
|
|
{
|
|
std::stringstream buf;
|
|
buf << "handleGTTimer - send GT message to peers (mRef = " << mref << ", gt = " << lt << ")";
|
|
DEBUG(buf.str().c_str());
|
|
}
|
|
// update the local time
|
|
ts().update(lt);
|
|
sendToAllMasters(pdu);
|
|
sendToCluster(pdu);
|
|
DEBUG("handleGTTimer - send DT message successfully (mRef = " << mref << ")");
|
|
updateQueue(*msg);
|
|
sortAndDeliver();
|
|
// delete GT timer
|
|
stopAndDeleteTimer(mref);
|
|
//create and start GarbageTimer
|
|
createAndStartGarbageTimer(mref);
|
|
}//End if
|
|
}
|
|
|
|
/**
|
|
* @brief This method handles an expired LT timer.
|
|
* @param timer The expired timer.
|
|
*/
|
|
void
|
|
MessageTransfer::handleLTTimer(MTTimer * timer) {
|
|
std::stringstream buf;
|
|
buf << "handleLTTimer - handle LT timer (mRef: " << timer->getReference() << ")";
|
|
DEBUG(buf.str().c_str());
|
|
|
|
if (timer == NULL) return;
|
|
|
|
if (resendMessageToPeers(timer)) {
|
|
return;
|
|
}//End if
|
|
|
|
MessageReference mRef = timer->getReference();
|
|
MulticastMessage* msg = lastMessageQueue.find(mRef);
|
|
VirtualLogicalTime timerLt = timer->getLt();
|
|
|
|
//send lt message to sender
|
|
if (msg != NULL) {
|
|
VirtualLogicalTime aLt = std::max(getCurrentLogicalTime().getValue(),
|
|
timerLt.getValue());
|
|
LTMessage pdu(mRef, getLocalID(), msg->getSequ(), aLt);
|
|
DEBUG("handleLTTimer - number of missing peers: " << timer->getNumberOfMissingPeers());
|
|
pdu.setMissingPeers(timer->getMissingPeers());
|
|
std::stringstream buf2;
|
|
buf2 << "handleLTTimer - send LT message to sender (mRef = " << mRef << ", lt = " << aLt << ", destination = " << msg->getLastHop() << ")";
|
|
DEBUG(buf2.str().c_str());
|
|
PeerID lastHopID(msg->getLastHop());
|
|
sendTo(pdu, lastHopID);
|
|
//delete the lt timer
|
|
stopAndDeleteTimer(mRef);
|
|
createAndStartGarbageTimer(mRef);
|
|
}//End if
|
|
}
|
|
|
|
/**
|
|
* @brief This method handles a incoming DT peer message.
|
|
* @param pdu The received peer message.
|
|
*/
|
|
void
|
|
MessageTransfer::handleMessage(MulticastMessage* pdu) {
|
|
ts().incCurrentLogicalTime();
|
|
MessageReference mRef = pdu->getMessageReference();
|
|
|
|
//message already seen?
|
|
if (queue.contains(mRef)) {
|
|
DEBUG("handleDTMessage - duplicated DT message received (mRef: " << mRef << ")");
|
|
resendLTMessageToPeer(mRef);
|
|
return;
|
|
}//End if
|
|
|
|
//master
|
|
if (isLocalPeerMaster()) {
|
|
DEBUG("handleDTMessage - send DT message to slaves (mRef: " << mRef << ")");
|
|
PeerID lastHop(pdu->getLastHop());
|
|
PeerIDList recList = createReceiverList(lastHop);
|
|
MulticastMessage * pack = pdu->dup();
|
|
pack->setLastHop(getLocalID());
|
|
#if OMNETPP
|
|
pack->setByteLength(sizeof * pack);
|
|
#endif
|
|
sendTo(*pack, recList);
|
|
delete pack;
|
|
//we are the master of the sending node?
|
|
MTTimer* timer;
|
|
bool primaryMaster = isPrimaryMaster(lastHop);
|
|
|
|
//yes
|
|
if (primaryMaster) {
|
|
//this is the primary master
|
|
timer = new GTTimer(*this);
|
|
}//End if
|
|
//no
|
|
else {
|
|
timer = new LTTimer(*this);
|
|
//this is a secondary master, we have to ensure the receiving of the GT message
|
|
createAndStartGTAwaitTimer(mRef, getCurrentLogicalTime());
|
|
}//end else
|
|
|
|
setupTimer(timer, mRef, recList, getCurrentLogicalTime());
|
|
timer->start();
|
|
//add the message
|
|
lastMessageQueue.add(pdu);
|
|
|
|
//shortcut for a group with one (inviting another one)
|
|
//this shortens the data delivery latency
|
|
//not necessary, but nice
|
|
if (recList.size() == 0) {
|
|
enqueue(*pdu);
|
|
|
|
if (primaryMaster) {
|
|
handleGTTimer(timer);
|
|
} else {
|
|
handleLTTimer(timer);
|
|
}
|
|
|
|
return;
|
|
}
|
|
}//end if
|
|
//slave && not sender
|
|
else {
|
|
DEBUG("handleDTMessage - DT message received (mRef: " << mRef << ")");
|
|
LTMessage pack(mRef, getLocalID(), pdu->getSequ(), getCurrentLogicalTime());
|
|
std::stringstream buf;
|
|
buf << "handleDTMessage - send LT message to master " << getLocalMasterID() << " (mRef = " << mRef << ", lt = " << getCurrentLogicalTime() << ")";
|
|
DEBUG(buf.str().c_str());
|
|
sendTo(pack, getLocalMasterID());
|
|
//start gt await timer for ensure gt receiving as slave
|
|
createAndStartGTAwaitTimer(mRef, getCurrentLogicalTime());
|
|
}//End else
|
|
|
|
enqueue(*pdu);
|
|
}
|
|
|
|
/**
|
|
* @brief This method handles a incoming LT peer message.
|
|
* @param pdu The received peer message.
|
|
*/
|
|
void
|
|
MessageTransfer::handleMessage(LTMessage* pdu) {
|
|
MessageReference mRef = pdu->getMessageReference();
|
|
std::stringstream buf;
|
|
buf << "handleLTMessage - LT message received from " << pdu->getLastHop() << " (mRef: " << mRef << ", lt: " << pdu->getLT() << ")";
|
|
DEBUG(buf.str().c_str());
|
|
|
|
//for this message runs no GT/LT or gtAwait timer --> message handling has already been finished
|
|
if (!timerQueue.find(mRef) && !gtAwaitTimerQueue.find(mRef)) {
|
|
resendGTMessageToPeer(mRef, pdu);
|
|
}//End if
|
|
else {
|
|
updateMissingPeersInLastMessageQueue(pdu);
|
|
//update Timer, if present
|
|
updateTimer(pdu);
|
|
bool retVal = false;
|
|
retVal = isLTCollectingCompleted(pdu->getMessageReference());
|
|
|
|
if (retVal == true) {
|
|
DEBUG("handleLTMessage - stop running timers and finish message processing (mRef: " << mRef << ")");
|
|
MTTimer * timer = stopLTGTOrGarbageTimer(pdu->getMessageReference());
|
|
|
|
//check for LTTimer or GTTimer, no GarbageTimer
|
|
if (dynamic_cast<GarbageTimer*>(timer) == NULL) {
|
|
//let the timer run out next to now
|
|
timer->stop();
|
|
timer->setTimeout(0.1);
|
|
timer->start();
|
|
}//End if
|
|
}//if
|
|
}//End else
|
|
}
|
|
|
|
/**
|
|
* @brief This method handles a incoming GT peer message
|
|
* @param pdu The received peer message.
|
|
*/
|
|
void
|
|
MessageTransfer::handleMessage(GTMessage * pdu) {
|
|
ts().incCurrentLogicalTime();
|
|
MessageReference mRef(pdu->getMessageReference());
|
|
//the peer have to stop the GtAwaitTimer
|
|
stopAndDeleteTimer(mRef);
|
|
|
|
if (isLocalPeerMaster()) {
|
|
std::stringstream buf;
|
|
buf << "handleGTMessage - send GT message to slaves (mRef = " << mRef << ")";
|
|
DEBUG(buf.str().c_str());
|
|
PeerID lastHopID(pdu->getLastHop());
|
|
PeerIDList recList = createReceiverList(lastHopID);
|
|
GTMessage pack(*pdu);
|
|
pack.setSourceID(getLocalID());
|
|
sendTo(pack, recList);
|
|
}//End if
|
|
|
|
//master and slave
|
|
updateQueue(*pdu);
|
|
sortAndDeliver();
|
|
|
|
//sender
|
|
if (mRef.getPeerID() == getLocalID()) {
|
|
//remove the message
|
|
lastMessageQueue.remove(mRef);
|
|
}//End if
|
|
|
|
ts().update(VirtualLogicalTime(std::max(getCurrentLogicalTime(),
|
|
VirtualLogicalTime(pdu->getLT() + 1))
|
|
));
|
|
std::stringstream buf;
|
|
buf << "handleGTMessage - update lt to " << getCurrentLogicalTime();
|
|
DEBUG(buf.str().c_str());
|
|
}
|
|
|
|
/**
|
|
* @brief This method enqueues a received DT peer message into the delivery queue.
|
|
* The stored message is marked as not deliverable.
|
|
* @param pdu the received peer message
|
|
*/
|
|
void
|
|
MessageTransfer::enqueue(const MulticastMessage& pdu) {
|
|
MessageReference mRef = pdu.getMessageReference();
|
|
|
|
if (queue.contains(mRef)) {
|
|
DEBUG("enqueue - PDU already enqueued");
|
|
} else {
|
|
LTNode node;
|
|
node.setMessageReference(mRef);
|
|
node.setPeerID(mRef.getPeerID());
|
|
node.setSequ(mRef.getSequenceNumber());
|
|
node.setLT(getCurrentLogicalTime());
|
|
node.setMessage(pdu.dup());
|
|
std::stringstream buf;
|
|
buf << "enqueue - enqueue PDU (mRef = " << node.getMessageReference() << ", peerID: "
|
|
<< node.getPeerID() << ", sequ: " << node.getSequ() << ")";
|
|
DEBUG(buf.str().c_str());
|
|
queue.add(node);
|
|
dispatcher.signal(new MulticastMessageEnqueuedEvent(pdu.dup()));
|
|
}//End else
|
|
}
|
|
|
|
/**
|
|
* @brief This method updates a stored peer message within the delivery queue from the given peer message and marks this message as deliverable.
|
|
* @param pdu The peer message corresponding to the stored message, used for the update
|
|
*/
|
|
void
|
|
MessageTransfer::updateQueue(const MoversightMessage& pdu) {
|
|
MessageReference mref(pdu.getMessageReference());
|
|
LTNode* node = queue.find(mref);
|
|
|
|
if (node != NULL) {
|
|
std::stringstream buf;
|
|
buf << "updateQueue - update PDU (mRef = " << node->getMessageReference()
|
|
<< ", peerID: " << node->getPeerID() << ", sequ: " << node->getSequ() << ")";
|
|
DEBUG(buf.str().c_str());
|
|
VirtualLogicalTime lt(pdu.getLT());
|
|
node->setLT(lt);
|
|
node->setDeliverable(true);
|
|
node->getMessage()->setViewID(pdu.getViewID());
|
|
// update missing peers
|
|
node->getMessage()->setMissingPeers(pdu.getMissingPeers());
|
|
} else {
|
|
DEBUG("updateQueue - PDU not enqueued ");
|
|
}
|
|
}
|
|
|
|
/**
|
|
* @brief This method sorts the delivery queue according the receiving time of each message.
|
|
* Is the first message within the queue marked as deliverable, this message is delivered to the moversight layer.
|
|
*/
|
|
void
|
|
MessageTransfer::sortAndDeliver() {
|
|
queue.sort();
|
|
|
|
while (queue.size() > 0 && queue.front().isDeliverable()) {
|
|
MulticastMessage * pdu = queue.front().getMessage();
|
|
size_t missingPeersCount = pdu->getMissingPeerSize();
|
|
std::stringstream buf;
|
|
buf << "sortAndDeliver - deliver DT ("
|
|
<< "mRef = " << queue.front().getMessageReference()
|
|
<< ", peerID: " << queue.front().getPeerID()
|
|
<< ", sequ: " << queue.front().getSequ()
|
|
<< ", lt: " << queue.front().getLT()
|
|
<< ", missing peers: " << missingPeersCount << ")";
|
|
DEBUG(buf.str().c_str());
|
|
//deliver message to moversight
|
|
//remember the time of the last seen message
|
|
lastSeenLt = pdu->getLogicalDeliveryTime();
|
|
|
|
//check sending view property again
|
|
if (getViewID() == pdu->getViewID()) {
|
|
|
|
// avoid losing gt from later sorted msg
|
|
if (getLocalSubState() == FLUSHING &&
|
|
dispatcher.getNextViewBuffer().isViewChangeAbleMessageType(pdu->getType())) {
|
|
DEBUG("READY to Flush?");
|
|
|
|
for (size_t i = 0; i < queue.size(); i++) {
|
|
if (!queue.get(i).isDeliverable()) {
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
|
|
//deliver the message
|
|
pdu->handleDeliver(dispatcher, pdu->getMissingPeers());
|
|
dispatcher.getResourceValueDistributor().estimateAllPeerResources(pdu);
|
|
dispatcher.signal(new MulticastMessageDeliveredEvent(pdu->dup()));
|
|
|
|
if (missingPeersCount > 0) {
|
|
dispatcher.signal(new PendingPeersEvent(pdu->getMissingPeers()));
|
|
}//End if
|
|
}//End if
|
|
else {
|
|
DEBUG("sortAndDeliver - drop message, because sending view property does not match (mRef: " << pdu->getMessageReference() << ")");
|
|
dispatcher.signal(new MulticastMessageDroppedEvent(pdu->dup()));
|
|
|
|
}//end else
|
|
|
|
//clean up
|
|
queue.dequeue();
|
|
|
|
if (getLocalSubState() == FLUSHING) {
|
|
if (queue.isEmpty()) {
|
|
dispatcher.signal(new FlushDoneEvent());
|
|
}//End if
|
|
}//End if
|
|
}//End while
|
|
}
|
|
|
|
/**
|
|
* @brief This method updates the missing peers in the last message queue from the given peer message.
|
|
* @param pdu the message used to update the missing peers set of the message, stored in the last message queue.
|
|
*/
|
|
void
|
|
MessageTransfer::updateMissingPeersInLastMessageQueue(const MoversightMessage * const pdu) {
|
|
if (pdu->getMissingPeerSize() > 0) {
|
|
MoversightMessage* msg = lastMessageQueue.find(pdu->getMessageReference());
|
|
|
|
if (msg != NULL) {
|
|
PeerIDList missingPeers = msg->getMissingPeers();
|
|
missingPeers.add(pdu->getMissingPeers());
|
|
msg->setMissingPeers(missingPeers);
|
|
}//End if
|
|
}//End if
|
|
}
|
|
|
|
/**
|
|
* @brief Update the timer corresponding to the given message (via the message reference field).
|
|
* @param pdu The received message used to update the lt field and the set of missing peers of the timer.
|
|
*/
|
|
void
|
|
MessageTransfer::updateTimer(const MoversightMessage * const pdu) {
|
|
//search the timer and run the update
|
|
MessageReference mref(pdu->getMessageReference());
|
|
MTTimer* timer = timerQueue.find(mref);
|
|
|
|
if (timer != NULL) {
|
|
updateTimer(timer, pdu);
|
|
}//End if
|
|
}
|
|
|
|
/**
|
|
* @brief Updates a given timer from a peer message. The method updates the lt field and the set of missing peers.
|
|
* @param timer The timer to update.
|
|
* @param pdu The received peer message used to update the timer.
|
|
*/
|
|
void
|
|
MessageTransfer::updateTimer(MTTimer * const timer, const MoversightMessage * const pdu) {
|
|
// error case, do nothing
|
|
if (timer == NULL || pdu == NULL) {
|
|
return;
|
|
}//End if
|
|
|
|
timer->removeMissingPeer(pdu->getLastHop());
|
|
VirtualLogicalTime lt = timer->getLt() + 1;
|
|
lt = std::max(lt, pdu->getLT());
|
|
timer->setLt(lt);
|
|
// copy missing peers
|
|
timer->setForeignMissingPeers(pdu->getMissingPeers());
|
|
}
|
|
|
|
/**
|
|
* @brief This method updates a message. For the msg, the logical time and the missing peers (from the timer instance)
|
|
* are updated. The message is usually stored within the last message queue and the updated values are needed to handle repeated LT/GT request be the slaves.
|
|
* @param msg the peer message to update
|
|
* @param aLt the logical time to set
|
|
* @param timer the timer knows the missing peers
|
|
*/
|
|
void
|
|
MessageTransfer::updateMessage(MulticastMessage* msg, VirtualLogicalTime & aLt, MTTimer* timer) {
|
|
if (msg != NULL && timer != NULL) {
|
|
msg->setLT(aLt);
|
|
copyMissingPeersAndForeignMissingPeersToMessage(msg, timer);
|
|
}//End if
|
|
}//End updateMessage
|
|
|
|
/**
|
|
* @brief Setups the timer from the given values. Does nothing, is the timer NULL.
|
|
* @param timer the timer to setup
|
|
* @param mRef the message reference of the timer
|
|
* @param missedPeer the peer for which the timer is waiting
|
|
* @param aLt the start time of the timer
|
|
* @return the timer
|
|
*/
|
|
MTTimer*
|
|
MessageTransfer::setupTimer(MTTimer * timer, MessageReference & mRef, PeerID missedPeer, const VirtualLogicalTime & aLt) {
|
|
PeerIDList list;
|
|
list.add(missedPeer);
|
|
return setupTimer(timer, mRef, list, aLt);
|
|
}//End setupTimer
|
|
|
|
/**
|
|
* @brief Setups the timer from the given values. Does nothing, is the timer NULL.
|
|
* @param timer the timer to setup
|
|
* @param mRef the message reference of the timer
|
|
* @param missingPeers the list of peers for which the timer is waiting
|
|
* @param aLt the start time of the timer
|
|
* @return the timer
|
|
*/
|
|
MTTimer*
|
|
MessageTransfer::setupTimer(MTTimer* timer, MessageReference & mRef, PeerIDList & missingPeers, const VirtualLogicalTime & aLt) {
|
|
if (timer != NULL) {
|
|
timer->setLt(aLt.getValue());
|
|
timer->setReference(mRef);
|
|
timer->setMissingPeers(missingPeers);
|
|
}//End if
|
|
|
|
return timer;
|
|
}//End setupTimer
|
|
|
|
/**
|
|
* @brief Resends a LT message to the master, based on the previously received DT message, identified by the given message reference.
|
|
* @param mRef the message reference, which identifies the previously received DT message
|
|
*/
|
|
void
|
|
MessageTransfer::resendLTMessageToPeer(MessageReference & mRef) {
|
|
LTNode* node = queue.find(mRef);
|
|
|
|
if (node != NULL) {
|
|
//recreate LT message and send it back to sender
|
|
VirtualLogicalTime aLt(node->getLT());
|
|
LTMessage pack(mRef, getLocalID(), node->getMessage()->getSequ(), aLt);
|
|
std::stringstream buf;
|
|
buf << "resendLTMessageToPeer - resend LT message to master (mRef = " << mRef << ",lt = " << node->getLT() << ")";
|
|
DEBUG(buf.str().c_str());
|
|
PeerID pId(node->getMessage()->getLastHop());
|
|
|
|
if (dispatcher.getMembershipService().getCurrentMemberRegister().contains(pId)) {
|
|
sendTo(pack, pId);
|
|
}
|
|
}//End if
|
|
else {
|
|
std::stringstream buf;
|
|
buf << "resendLTMessageToPeer - resend LT message failed, because message not present in queue (mRef: " << mRef << ")";
|
|
DEBUG(buf.str().c_str());
|
|
}//End else
|
|
}
|
|
|
|
/**
|
|
* @brief Resends a already send GT message to a single requesting peer, in case, the garbage timer is still running, otherwise, it does nothing.
|
|
* @param mref the message reference of the associated DT message
|
|
* @param pdu the repeated LT message of the slave
|
|
*/
|
|
void
|
|
MessageTransfer::resendGTMessageToPeer(const MessageReference& mref, const LTMessage * const pdu) {
|
|
MulticastMessage* msg = lastMessageQueue.find(mref);
|
|
|
|
//we still have a copy of the message? (garbage timer is still running)
|
|
if (msg != NULL && pdu != NULL) {
|
|
std::stringstream buf;
|
|
buf << "resendGTMessageToPeer - delayed LT message received for message " << pdu->getMessageReference();
|
|
DEBUG(buf.str().c_str());
|
|
buf.clear();
|
|
buf << "resendGTMessageToPeer - send GT message again to peer " << pdu->getLastHop();
|
|
DEBUG(buf.str().c_str());
|
|
GTMessage pack(*msg, getLocalID());
|
|
pack.setMissingPeers(msg->getMissingPeers());
|
|
sendTo(pack, pdu->getLastHop());
|
|
}//End if msg != NULL
|
|
}
|
|
|
|
/**
|
|
* @brief Copy the missing peers and foreign missing peers from the timer into the peer message.
|
|
* @param timer the source for the copy operation
|
|
* @param pdu the destination of the copy operation
|
|
*/
|
|
void
|
|
MessageTransfer::copyMissingPeersAndForeignMissingPeersToMessage(MoversightMessage * pdu, MTTimer * timer) {
|
|
if (timer == NULL || pdu == NULL) return;
|
|
|
|
if (timer->getMissingPeers().size() == 0 &&
|
|
timer->getForeignMissingPeers().size() == 0) return;
|
|
|
|
PeerIDList missingPeers;
|
|
missingPeers.add(timer->getMissingPeers());
|
|
missingPeers.add(timer->getForeignMissingPeers());
|
|
pdu->setMissingPeers(missingPeers);
|
|
}
|
|
|
|
/**
|
|
* @brief Creates and starts a GT Await Timer, waiting of a gt from the master peer.
|
|
* @param messageRef the message reference of the corresponding message
|
|
* @param alt the virtual logical start time of the timer
|
|
*/
|
|
void
|
|
MessageTransfer::createAndStartGTAwaitTimer(MessageReference & messageRef, const VirtualLogicalTime & alt) {
|
|
MTTimer* timer = new GTAwaitTimer(*this);
|
|
PeerID pId = getLocalMasterID();
|
|
setupTimer(timer, messageRef, pId, alt);
|
|
timer->start();
|
|
}
|
|
|
|
/**
|
|
* @brief Creates and starts a GT Timer, waiting a period of time for incoming lt messages of the slaves, before determine and send the global gt.
|
|
* @param messageRef the message reference of the corresponding message
|
|
* @param alt the virtual logical start time of the timer
|
|
*/
|
|
void
|
|
MessageTransfer::createAndStartGTTimer(MessageReference & messageRef, const VirtualLogicalTime & alt) {
|
|
PeerIDList list = ms().getClusterPeerIDListSlavesOnly(getLocalPeer().getClusterID());
|
|
PeerIDList maList = ms().getMasterPeerIDList();
|
|
PeerID myID = getLocalID();
|
|
|
|
for (size_t i = 0; i < maList.size(); i++) {
|
|
PeerID id = maList.get(i);
|
|
|
|
if (!(id == myID)) {
|
|
list.add(id);
|
|
}//End if
|
|
}
|
|
|
|
MTTimer* timer = new GTTimer(*this);
|
|
setupTimer(timer, messageRef, list, alt);
|
|
timer->start();
|
|
}
|
|
|
|
/**
|
|
* @brief Stops all running GT, LT or Garbage timer, associated with the given message reference.
|
|
* @param mRef the message reference of the message associated with the timer to stop
|
|
* @returns the stopped timer
|
|
*/
|
|
MTTimer *
|
|
MessageTransfer::stopLTGTOrGarbageTimer(const MessageReference & mRef) {
|
|
MTTimer* timer = timerQueue.find(mRef);
|
|
|
|
if (timer != NULL) {
|
|
timer->stop();
|
|
return timer;
|
|
}//end if
|
|
|
|
throw NullPointerException("timer not found");
|
|
}
|
|
|
|
/**
|
|
* @brief Creates a GarbageTimer instance and starts this instance, expect the peer is DISJOINED.
|
|
* @param mRef the message reference, associated with this timer
|
|
*/
|
|
void
|
|
MessageTransfer::createAndStartGarbageTimer(MessageReference & mRef) {
|
|
if (getLocalPeer().getPeerState() != DISJOINED) {
|
|
MTTimer* timer = new GarbageTimer(*this);
|
|
timer->setReference(mRef);
|
|
timer->start();
|
|
}//End if
|
|
else {
|
|
//we have finalized our service, do nothing
|
|
}
|
|
}
|
|
|
|
/**
|
|
* @brief Checks, if the a timer scheduled for the given message reference. In that case, the timer is stopped and deleted from the corresponding timer queue.
|
|
* @param mRef the message reference of the timer
|
|
*/
|
|
void
|
|
MessageTransfer::stopAndDeleteTimer(MessageReference & mRef) {
|
|
//check for lt and gt timer or garbage timer
|
|
MTTimer* ltGtGarbageTimer = timerQueue.find(mRef);
|
|
|
|
if (ltGtGarbageTimer != NULL) {
|
|
ltGtGarbageTimer->stop();
|
|
delete ltGtGarbageTimer;
|
|
}//End if
|
|
|
|
//check for gtAwait timer
|
|
MTTimer* gtAwaitTimer = gtAwaitTimerQueue.find(mRef);
|
|
|
|
if (gtAwaitTimer != NULL) {
|
|
gtAwaitTimer->stop();
|
|
delete gtAwaitTimer;
|
|
}//End if
|
|
}
|
|
|
|
/**
|
|
* @brief Permits access to the moversight membership service.
|
|
* @return The moversight membership service.
|
|
*/
|
|
MembershipService &
|
|
MessageTransfer::ms() {
|
|
return dispatcher.getMembershipService();
|
|
}
|
|
|
|
/**
|
|
* @brief Permits access to the moversight time service.
|
|
* @return The moversight time service.
|
|
*/
|
|
TimeService &
|
|
MessageTransfer::ts() {
|
|
return dispatcher.getTimeService();
|
|
}
|
|
|
|
/**
|
|
* @brief Checks, if the LT collecting completed for the given timer.
|
|
* The result is derived from the missingPeers and foreignMissingPeers lists of the timer.
|
|
*
|
|
* @param mRef the message reference of the corresponding message, identifying the timer.
|
|
*/
|
|
bool
|
|
MessageTransfer::isLTCollectingCompleted(const MessageReference & mRef) {
|
|
MTTimer* timer = timerQueue.find(mRef);
|
|
|
|
if (timer != NULL) {
|
|
if (!(timer->getNumberOfMissingPeers() > 0) &&
|
|
!(timer->getNumberOfForeignMissingPeers() > 0)) {
|
|
return true;
|
|
}
|
|
}
|
|
|
|
return false;
|
|
}
|
|
|
|
/**
|
|
* @brief This method cleans all timer queues by stopping and deleting
|
|
* all running timers (LT-, GT- and GTAwaitTimers).
|
|
*/
|
|
void
|
|
MessageTransfer::stopAllTimers() {
|
|
while (timerQueue.size() > 0) {
|
|
MTTimer * timer = timerQueue.front();
|
|
|
|
if (timer != NULL) {
|
|
timer->stop();
|
|
delete timer;
|
|
}//End if
|
|
}//End while
|
|
|
|
while (gtAwaitTimerQueue.size() > 0) {
|
|
MTTimer * timer = gtAwaitTimerQueue.front();
|
|
|
|
if (timer != NULL) {
|
|
timer->stop();
|
|
delete timer;
|
|
}//End if
|
|
}//End while
|
|
}
|
|
|
|
/**
|
|
* @brief The method cleans the message queue by deleting all stored
|
|
* peer messages. The lastMessageQueue, which stores all send messages,
|
|
* is cleaned to.
|
|
*/
|
|
void
|
|
MessageTransfer::cleanMessageQueues() {
|
|
while (!queue.isEmpty()) {
|
|
LTNode node = queue.front();
|
|
|
|
if (node.getMessage() != NULL) {
|
|
delete node.getMessage();
|
|
}
|
|
|
|
MessageReference mRef = node.getMessageReference();
|
|
queue.remove(mRef);
|
|
}//End while
|
|
|
|
lastMessageQueue.clear();
|
|
}
|
|
|
|
/**
|
|
* @brief Handles a peer left event. The method removes references to the removed peer
|
|
* from the queues used within the message transfer module.
|
|
* @param leftPeer The left peer.
|
|
*/
|
|
void
|
|
MessageTransfer::handleEvent(const PeerLeftEvent & e) {
|
|
//clean up all the left peer from all queues
|
|
for (size_t i = 0; i < queue.size(); i++) {
|
|
queue.get(i).getMessage()->removeMissingPeer(e.getPeer().getPeerID());
|
|
}
|
|
|
|
for (size_t i = 0; i < timerQueue.size(); i++) {
|
|
timerQueue.get(i)->removeMissingPeer(e.getPeer().getPeerID());
|
|
timerQueue.get(i)->removeForeignMissingPeer(e.getPeer().getPeerID());
|
|
}
|
|
}
|
|
|
|
/**
|
|
* @brief Assignment operator.
|
|
* @param other The instance to assign.
|
|
* @return A reference of the current object.
|
|
*/
|
|
MessageTransfer & MessageTransfer::operator=(const MessageTransfer & other) {
|
|
if (this == &other) {
|
|
return *this;
|
|
}
|
|
|
|
MoversightService::operator=(other);
|
|
queue = other.queue;
|
|
lastMessageQueue = other.lastMessageQueue;
|
|
timerQueue = other.timerQueue;
|
|
gtAwaitTimerQueue = other.gtAwaitTimerQueue;
|
|
lastSeenLt = other.lastSeenLt;
|
|
sequ = other.sequ;
|
|
return *this;
|
|
}
|
|
|
|
/**
|
|
* @brief Determines, if the current peer master of the sending one.
|
|
* @param lastHop The ID of the sending peer.
|
|
*/
|
|
bool
|
|
MessageTransfer::isPrimaryMaster(PeerID lastHop) {
|
|
if (isLocalPeerMaster()) {
|
|
return !(getMasterIDList().contains(lastHop));
|
|
}
|
|
|
|
return false;
|
|
}
|
|
|
|
/**
|
|
* @brief Returns the last seen logical time for a delivered message.
|
|
* @return The last seen logical time of a delivered message
|
|
*/
|
|
const VirtualLogicalTime &
|
|
MessageTransfer::getLastSeenLogicalTime() const {
|
|
return lastSeenLt;
|
|
}
|
|
}
|
|
}
|