/*
 * File:   SynchronizationService.cc
 * Author: jgaebler
 *
 * Created on July 26, 2012, 9:28 AM
 */
#include "SynchronizationService.h"
#include "Dispatcher.h"
#include "mt/msg/MulticastMessage.h"
#include "cloud/CloudStorage.h"
#include "p2p/P2PStorage.h"

namespace ubeeme {
namespace moversight {

#undef DEBUG
// NOTE(review): the following line is truncated in this copy of the file.
// The remainder of the DEBUG macro and everything up to the tail of
// initialise() (which ends in "initialise(); }") is missing and has to be
// restored from version control; it is kept verbatim here.
#define DEBUG(msg) if (module.isPrintDebugMOB()){ if(dispatcher.getLocalState()== DISJOINED){ MOV_DEBUG <<"SYN@TA_"<initialise(); }

/**
 * @brief Finalize the service.
 *
 * Unsubscribes this service from the dispatcher and, if a storage backend
 * is present, finalises it, deletes it and resets the pointer to NULL.
 */
void
SynchronizationService::finalise() {

    dispatcher.unsubscribeAll(this);

    if (storage != NULL) {
        storage->finalise();
        delete storage;
        storage = NULL;
    }//End if
}

/**
 * @brief Stores a message in the storage backend for later synchronization.
 *
 * Nothing is stored while the local sub state is SYNCHRONIZING, or when no
 * storage backend is available (e.g. after finalise()).
 *
 * @param msg The message to store.
 */
void
SynchronizationService::storeMessage(MulticastMessage & msg) {

    //store each message, until we re-synchronize the peer
    if (getLocalSubState() != SYNCHRONIZING) {
        //finalise() resets storage to NULL, so guard the access
        if (storage != NULL) {
            storage->store(msg);
        }//End if
    }//End if
}

/**
 * @brief Reads the messages from the storage backend and replays them.
 *
 * Retrieves the messages relative to the last seen logical time, delivers
 * each of them via handleDeliver() and restores the former sub state
 * afterwards.
 *
 * @return True, if at least one message was replayed and the logical
 * delivery time of the last replayed message is not before the last seen
 * logical time; false otherwise (including when a synchronization is
 * already running or no storage backend is available).
 */
bool
SynchronizationService::synchronize() {

    //save the former subState
    SubState formerSubState = getLocalSubState();

    //we are currently synchronizing, don't start an additional run
    if (formerSubState == SYNCHRONIZING) {
        return false;
    }//End if

    //without a storage backend there is nothing to replay; bail out
    //before touching the sub state
    if (storage == NULL) {
        return false;
    }//End if

    //we are SYNCHRONIZING the peer
    setLocalSubState(SYNCHRONIZING);

    VirtualLogicalTime lastSeenLt = getLastSeenLogicalTime();
    VirtualLogicalTime replayLt;

    MulticastMessageQueue retrievedMessages;
    retrievedMessages = storage->retrieve(lastSeenLt);

    //read the size once, before any element is deleted, so the queue is
    //not consulted again after its elements have been released
    const size_t retrievedCount = retrievedMessages.size();

    if (retrievedCount > 0) {

        PeerIDList missedPeers;

        for (size_t i = 0; i < retrievedCount; i++) {

            MulticastMessage* message = retrievedMessages.get(i);
            replayLt = message->getLogicalDeliveryTime();
            message->handleDeliver(dispatcher, missedPeers);

            //clean up
            // @NOTE Error since the message is still in the queue...
            // NOTE(review): the queue keeps this pointer after the delete.
            // Whether MulticastMessageQueue releases its elements itself
            // cannot be told from here - confirm to rule out a double delete.
            delete message;
        }//End for
    }//End if

    //re-establish the former sub state
    setLocalSubState(formerSubState);

    //return to caller
    //we have processed messages and the time has moved forward?
    //then we are successfully synchronized
    if (retrievedCount > 0 && lastSeenLt <= replayLt) {
        return true;
    }//End if

    //something has failed
    //we are not fully synchronized to the group :(
    return false;
}

}
}