212 lines
7.3 KiB
C++
212 lines
7.3 KiB
C++
/*
|
|
* File: CloudStorage.cc
|
|
* Author: jgaebler
|
|
*
|
|
* Created on April 26, 2012, 3:05 PM
|
|
*/
|
|
|
|
#include "CloudStorage.h"
|
|
|
|
#include "common/time/VirtualLogicalTime.h"
|
|
#include "mt/msg/MulticastMessage.h"
|
|
#include "Dispatcher.h"
|
|
#include "CloudProvider.h"
|
|
#include "mob/sync/cloud/provider/DropBoxProvider.h"
|
|
#include "mob/sync/cloud/transfer/TransferSerializer.h"
|
|
#include "common/util/StringUtil.h"
|
|
|
|
|
|
#undef DEBUG
/*
 * Debug output helper of the cloud storage service.
 *
 * Prints msg via MOV_DEBUG when module.isPrintDebugMOB() is set. The line is
 * prefixed with "CS@TA_<local address>" while the dispatcher's local state is
 * DISJOINED, and with "CS@<local ID>" (taken from the membership service)
 * otherwise.
 *
 * msg is intentionally not parenthesised: it is spliced into the stream
 * expression, so a call may pass a chain such as DEBUG("x: " << x).
 *
 * The macro expands to a complete if/else chain, so an `else` written after
 * an invocation can never attach to the macro's own `if`. Call sites with and
 * without a trailing semicolon both keep working.
 */
#define DEBUG(msg) \
    if (!module.isPrintDebugMOB()) { \
    } \
    else if (dispatcher.getLocalState() == DISJOINED) { \
        MOV_DEBUG << "CS@TA_" << module.getLocalAddress() << " " << msg << endl; \
    } \
    else { \
        MOV_DEBUG << "CS@" << dispatcher.getMembershipService().getLocalID() << " " << msg << endl; \
    }
|
|
|
|
namespace ubeeme {
|
|
namespace moversight {
|
|
|
|
/**
|
|
* @brief constructor
|
|
* @param d The dispatcher module.
|
|
*/
|
|
CloudStorage::CloudStorage(Dispatcher & dis) : StorageService(dis), provider(NULL), numberOfMessagesPerUpload(DEFAULT_NUMBER_OF_MESSAGES_PER_UPLOAD) {
|
|
}
|
|
|
|
/**
|
|
* @brief Default destructor
|
|
*
|
|
*/
|
|
CloudStorage::~CloudStorage() {
|
|
}
|
|
|
|
/**
|
|
* @brief Initialize the cloud service
|
|
* This method initialize the cloud module, by setting up all needed values.
|
|
* The method is called by the dispatcher. After initialize, the cloud service is ready
|
|
* to offer its services.
|
|
*/
|
|
void
|
|
CloudStorage::initialise() {
|
|
|
|
//TODO add switch for additional cloud provider
|
|
provider = new DropBoxProvider(dispatcher, module);
|
|
|
|
}
|
|
|
|
/**
|
|
* @brief This method finalizes the cloud service.
|
|
*/
|
|
void
|
|
CloudStorage::finalise() {
|
|
|
|
if (provider != NULL) {
|
|
delete provider;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* @brief Stores a message in the cloud for synchronization.
|
|
* @param m The message to store.
|
|
*/
|
|
void
|
|
CloudStorage::store(MulticastMessage & message) {
|
|
|
|
DEBUG("store - store message in cloud");
|
|
|
|
GenericTime now = GenericTime::currentTime();
|
|
|
|
//create an upload message
|
|
MulticastMessage * copM = message.dup();
|
|
TransferMessage msg(copM, message.getLt());
|
|
|
|
list.add(msg);
|
|
std::cerr<<"CloudStorage - fix upload criteria"<<endl;
|
|
//check, if the upload criteria matched
|
|
if (true) {//list.size() >= getNumberOfMessagesPerUpload() || (now - lastUpload) > 5) {
|
|
|
|
DEBUG("store - start message upload to cloud")
|
|
|
|
//get the needed stuff from the upload container
|
|
PeerID uPID = getLocalID();
|
|
|
|
TransferContainer uc(uPID, list);
|
|
|
|
//use the virtual time of the first message as synchronization point
|
|
VirtualLogicalTime syncTime = list.get(0).getTime();
|
|
|
|
//the synchronization point defines a set stored message
|
|
//exchanged within the group in a certain period
|
|
std::string fileName = StringUtil::int2String(syncTime.toIntValue());
|
|
ByteArray uploadDataArray;
|
|
|
|
if (TransferSerializer::serialize(uc, uploadDataArray)) {
|
|
|
|
DEBUG("store - upload transfer container to cloud");
|
|
provider->upload(uploadDataArray, uPID, fileName);
|
|
|
|
}//End if
|
|
else {
|
|
DEBUG("store - serialization failed, skip upload");
|
|
}//End else
|
|
|
|
lastUpload = now;
|
|
list.cleanAndDelete();
|
|
|
|
}//End if
|
|
}
|
|
|
|
/**
|
|
* @brief Retrieves messages from the cloud to resync the peer. The
|
|
* lastSeenLt determines the virtual logical time of the last successfully
|
|
* seen message locally. Thus, the messages retrieved from the cloud
|
|
* are up from the lastSeenLt logical time.
|
|
* @param lastSeenLt The virtual time of the last seen message.
|
|
* @return The list of missed messages, found in the cloud, starting from lastSeenLt
|
|
*/
|
|
MulticastMessageQueue
|
|
CloudStorage::retrieve(VirtualLogicalTime const & lastSeenLt) {
|
|
|
|
DEBUG("retrieve - download messages from cloud")
|
|
|
|
MulticastMessageQueue resultQueue;
|
|
|
|
//determine the possible rendezvous points
|
|
PeerIDList possibleRendezvousPoints = provider->determineStoringPeers();
|
|
|
|
//determine rendezvous point
|
|
//here, we will start by local master
|
|
//and afterwards lockup for other peers, if needed
|
|
PeerID rp = getLocalMasterID();
|
|
|
|
if (possibleRendezvousPoints.contains(rp)) {
|
|
|
|
DEBUG("retrieve - try downloading messages from the cloud");
|
|
|
|
//the list contains a number of containers, each container stores
|
|
//messages exchanged in the group, until the peer has lost its
|
|
//connection
|
|
//we have to examine this messages
|
|
TransferContainerList tcl = provider->downloadMessagesFromPeer(rp);
|
|
|
|
DEBUG("retrieve - process retrieved messages");
|
|
|
|
for (size_t i = 0; i < tcl.size(); i++) {
|
|
|
|
//get the set of messages
|
|
TransferList tl = tcl.get(i).getTransferList();
|
|
|
|
//copy all unseen messages
|
|
for (size_t j = 0; j < tl.size(); j++) {
|
|
|
|
MulticastMessage * message = tl.get(j).getMessage();
|
|
if (message != NULL) {
|
|
|
|
std::stringstream buf;
|
|
buf << "retrieve - message type: " << message->getType() << " message ref: " << message->getMessageReference() << " viewID:" << message->getViewID();
|
|
DEBUG(buf.str().c_str());
|
|
std::cerr << getLocalID() << buf.str().c_str() << "\n";
|
|
//unseen?
|
|
if (message->getLogicalDeliveryTime() >= lastSeenLt) {
|
|
//yes
|
|
resultQueue.add(message);
|
|
}//End if
|
|
}//End if
|
|
else {
|
|
//message NULL --> de-serialization error?
|
|
}//End else
|
|
}//End for
|
|
}//End for
|
|
|
|
//all unseen messages are copied to the resultQueue
|
|
//the transfer container is automatically deleted
|
|
|
|
}//End if
|
|
//error case - no rendezvous point found
|
|
else {
|
|
std::stringstream buf;
|
|
buf << "error - no rendezvous point found, stop downloading messages";
|
|
DEBUG(buf.str().c_str());
|
|
}//End else
|
|
|
|
return resultQueue;
|
|
|
|
}
|
|
|
|
/**
|
|
* @brief Sets the number of messages per upload
|
|
* @param mpu The number of messages per upload
|
|
*/
|
|
void
|
|
CloudStorage::setNumberOfMessagesPerUpload(size_t mpu) {
|
|
numberOfMessagesPerUpload = mpu;
|
|
}
|
|
|
|
/**
|
|
* @brief Returns the number of messages per upload
|
|
* @return The number of message per upload
|
|
*/
|
|
size_t
|
|
CloudStorage::getNumberOfMessagesPerUpload() {
|
|
return numberOfMessagesPerUpload;
|
|
}
|
|
|
|
}
|
|
}
|