Files
scandocs/uni/masterarbeit/source/moversight/mob/sync/SynchronizationService.cc
2014-06-30 13:58:10 +02:00

140 lines
3.9 KiB
C++

/*
* File: SynchronizationService.cc
* Author: jgaebler
*
* Created on July 26, 2012, 9:28 AM
*/
#include "SynchronizationService.h"
#include "Dispatcher.h"
#include "mt/msg/MulticastMessage.h"
#include "cloud/CloudStorage.h"
#include "p2p/P2PStorage.h"
namespace ubeeme {
namespace moversight {
#undef DEBUG
#define DEBUG(msg) if (module.isPrintDebugMOB()){ if(dispatcher.getLocalState()== DISJOINED){ MOV_DEBUG <<"SYN@TA_"<<module.getLocalAddress()<<" "<<msg<<endl; } else{ MOV_DEBUG <<"SYN@"<<dispatcher.getLocalID() <<" "<< msg <<endl; } }
/**
* @brief Constructor
* @param dis A reference to the dispatcher instance.
*/
SynchronizationService::SynchronizationService(Dispatcher & dis) : MoversightService(dis, "SynchronizationService"), storage(NULL){
}
/**
* @brief Destructor
*/
SynchronizationService::~SynchronizationService() {
}
/**
* @brief Initialize the service
*/
void
SynchronizationService::initialise() {
if(storage == NULL){
#if CLOUD_SYNC_ENABLED
storage = new CloudStorage(dispatcher);
#else
storage = new P2PStorage(dispatcher);
#endif
}//End if
storage->initialise();
}
/**
* @brief Finalize the service
*/
void
SynchronizationService::finalise() {
dispatcher.unsubscribeAll(this);
if(storage != NULL){
storage->finalise();
delete storage;
storage = NULL;
}//End if
}
/**
* @brief Stores a message in the cloud for synchronization.
* @param m The message to store.
*/
void
SynchronizationService::storeMessage( MulticastMessage & msg) {
//store each message, until we re-synchronize the peer
if (getLocalSubState() != SYNCHRONIZING) {
storage->store(msg);
}//End if
}
/**
* @brief Reads the messages from the storing service and replays the received messages.
* @return True, if the synchronization successfully done, false otherwise.
*/
bool
SynchronizationService::synchronize() {
//save the former subState
SubState formerSubState = getLocalSubState();
//we currently synching, dont start an additional one
if(formerSubState == SYNCHRONIZING) return false;
//we SYNCHRONIZING the peer
setLocalSubState(SYNCHRONIZING);
VirtualLogicalTime lastSeenLt = getLastSeenLogicalTime();
VirtualLogicalTime replayLt;
MulticastMessageQueue retrievedMessages;
retrievedMessages = storage->retrieve(lastSeenLt);
if (retrievedMessages.size() > 0) {
PeerIDList missedPeers;
for (size_t i = 0; i < retrievedMessages.size(); i++) {
MulticastMessage* message = retrievedMessages.get(i);
replayLt = message->getLogicalDeliveryTime();
message->handleDeliver(dispatcher, missedPeers);
//clean up
// @NOTE Error since the message is still in the queue...
delete message;
}//End for
}//End if
//reestablish the former sub state
setLocalSubState(formerSubState);
//return to caller
//we have processed messages and the time has moved forward?
//we are successfully synchronized
if(retrievedMessages.size() > 0 && lastSeenLt <= replayLt){
return true;
}
//something have failed
//we are not full synchronized to the group :(
return false;
}
}
}