Files
2014-06-30 13:58:10 +02:00

212 lines
7.3 KiB
C++

/*
* File: CloudStorage.cc
* Author: jgaebler
*
* Created on April 26, 2012, 3:05 PM
*/
#include "CloudStorage.h"
#include "common/time/VirtualLogicalTime.h"
#include "mt/msg/MulticastMessage.h"
#include "Dispatcher.h"
#include "CloudProvider.h"
#include "mob/sync/cloud/provider/DropBoxProvider.h"
#include "mob/sync/cloud/transfer/TransferSerializer.h"
#include "common/util/StringUtil.h"
#undef DEBUG
#define DEBUG(msg) if (module.isPrintDebugMOB()){ if(dispatcher.getLocalState()== DISJOINED){ MOV_DEBUG <<"CS@TA_"<<module.getLocalAddress()<<" "<<msg<<endl; } else{ MOV_DEBUG <<"CS@"<<dispatcher.getMembershipService().getLocalID() <<" "<< msg <<endl; } }
namespace ubeeme {
namespace moversight {
/**
* @brief constructor
* @param d The dispatcher module.
*/
CloudStorage::CloudStorage(Dispatcher & dis) : StorageService(dis), provider(NULL), numberOfMessagesPerUpload(DEFAULT_NUMBER_OF_MESSAGES_PER_UPLOAD) {
}
/**
* @brief Default destructor
*
*/
CloudStorage::~CloudStorage() {
}
/**
* @brief Initialize the cloud service
* This method initialize the cloud module, by setting up all needed values.
* The method is called by the dispatcher. After initialize, the cloud service is ready
* to offer its services.
*/
void
CloudStorage::initialise() {
//TODO add switch for additional cloud provider
provider = new DropBoxProvider(dispatcher, module);
}
/**
* @brief This method finalizes the cloud service.
*/
void
CloudStorage::finalise() {
if (provider != NULL) {
delete provider;
}
}
/**
* @brief Stores a message in the cloud for synchronization.
* @param m The message to store.
*/
void
CloudStorage::store(MulticastMessage & message) {
DEBUG("store - store message in cloud");
GenericTime now = GenericTime::currentTime();
//create an upload message
MulticastMessage * copM = message.dup();
TransferMessage msg(copM, message.getLt());
list.add(msg);
std::cerr<<"CloudStorage - fix upload criteria"<<endl;
//check, if the upload criteria matched
if (true) {//list.size() >= getNumberOfMessagesPerUpload() || (now - lastUpload) > 5) {
DEBUG("store - start message upload to cloud")
//get the needed stuff from the upload container
PeerID uPID = getLocalID();
TransferContainer uc(uPID, list);
//use the virtual time of the first message as synchronization point
VirtualLogicalTime syncTime = list.get(0).getTime();
//the synchronization point defines a set stored message
//exchanged within the group in a certain period
std::string fileName = StringUtil::int2String(syncTime.toIntValue());
ByteArray uploadDataArray;
if (TransferSerializer::serialize(uc, uploadDataArray)) {
DEBUG("store - upload transfer container to cloud");
provider->upload(uploadDataArray, uPID, fileName);
}//End if
else {
DEBUG("store - serialization failed, skip upload");
}//End else
lastUpload = now;
list.cleanAndDelete();
}//End if
}
/**
* @brief Retrieves messages from the cloud to resync the peer. The
* lastSeenLt determines the virtual logical time of the last successfully
* seen message locally. Thus, the messages retrieved from the cloud
* are up from the lastSeenLt logical time.
* @param lastSeenLt The virtual time of the last seen message.
* @return The list of missed messages, found in the cloud, starting from lastSeenLt
*/
MulticastMessageQueue
CloudStorage::retrieve(VirtualLogicalTime const & lastSeenLt) {
DEBUG("retrieve - download messages from cloud")
MulticastMessageQueue resultQueue;
//determine the possible rendezvous points
PeerIDList possibleRendezvousPoints = provider->determineStoringPeers();
//determine rendezvous point
//here, we will start by local master
//and afterwards lockup for other peers, if needed
PeerID rp = getLocalMasterID();
if (possibleRendezvousPoints.contains(rp)) {
DEBUG("retrieve - try downloading messages from the cloud");
//the list contains a number of containers, each container stores
//messages exchanged in the group, until the peer has lost its
//connection
//we have to examine this messages
TransferContainerList tcl = provider->downloadMessagesFromPeer(rp);
DEBUG("retrieve - process retrieved messages");
for (size_t i = 0; i < tcl.size(); i++) {
//get the set of messages
TransferList tl = tcl.get(i).getTransferList();
//copy all unseen messages
for (size_t j = 0; j < tl.size(); j++) {
MulticastMessage * message = tl.get(j).getMessage();
if (message != NULL) {
std::stringstream buf;
buf << "retrieve - message type: " << message->getType() << " message ref: " << message->getMessageReference() << " viewID:" << message->getViewID();
DEBUG(buf.str().c_str());
std::cerr << getLocalID() << buf.str().c_str() << "\n";
//unseen?
if (message->getLogicalDeliveryTime() >= lastSeenLt) {
//yes
resultQueue.add(message);
}//End if
}//End if
else {
//message NULL --> de-serialization error?
}//End else
}//End for
}//End for
//all unseen messages are copied to the resultQueue
//the transfer container is automatically deleted
}//End if
//error case - no rendezvous point found
else {
std::stringstream buf;
buf << "error - no rendezvous point found, stop downloading messages";
DEBUG(buf.str().c_str());
}//End else
return resultQueue;
}
/**
* @brief Sets the number of messages per upload
* @param mpu The number of messages per upload
*/
void
CloudStorage::setNumberOfMessagesPerUpload(size_t mpu) {
numberOfMessagesPerUpload = mpu;
}
/**
* @brief Returns the number of messages per upload
* @return The number of message per upload
*/
size_t
CloudStorage::getNumberOfMessagesPerUpload() {
return numberOfMessagesPerUpload;
}
}
}