mirror of
https://github.com/mfontanini/libtins
synced 2026-01-23 02:35:57 +01:00
Add support for out of order data packet detection
This commit is contained in:
@@ -94,9 +94,19 @@ public:
|
||||
typedef std::map<uint32_t, payload_type> buffered_payload_type;
|
||||
|
||||
/**
|
||||
* The type used to store the callbacks that this class triggers
|
||||
* The type used to store the callback called when new data is available
|
||||
*/
|
||||
typedef std::function<void(Flow&)> event_callback;
|
||||
typedef std::function<void(Flow&)> data_available_callback_type;
|
||||
|
||||
/**
|
||||
* \brief The type used to store the callback called when data is buffered
|
||||
*
|
||||
* The arguments are the flow, the sequence number and payload that will
|
||||
* be buffered.
|
||||
*/
|
||||
typedef std::function<void(Flow&,
|
||||
uint32_t,
|
||||
const payload_type&)> out_of_order_callback_type;
|
||||
|
||||
/**
|
||||
* Construct a Flow from an IPv4 address
|
||||
@@ -126,17 +136,17 @@ public:
|
||||
*
|
||||
* \param callback The callback to be executed
|
||||
*/
|
||||
void data_callback(const event_callback& callback);
|
||||
void data_callback(const data_available_callback_type& callback);
|
||||
|
||||
/**
|
||||
* \brief Sets the callback that will be executed when data is buffered.
|
||||
* \brief Sets the callback that will be executed when out of order data arrives
|
||||
*
|
||||
* Whenever this flow receives out-of-order data, this callback will be
|
||||
* executed.
|
||||
*
|
||||
* \param callback The callback to be executed
|
||||
*/
|
||||
void buffering_callback(const event_callback& callback);
|
||||
void out_of_order_callback(const out_of_order_callback_type& callback);
|
||||
|
||||
/**
|
||||
* \brief Processes a packet.
|
||||
@@ -250,8 +260,8 @@ private:
|
||||
uint32_t seq_number_;
|
||||
std::array<uint8_t, 16> dest_address_;
|
||||
uint16_t dest_port_;
|
||||
event_callback on_data_callback_;
|
||||
event_callback on_buffering_callback_;
|
||||
data_available_callback_type on_data_callback_;
|
||||
out_of_order_callback_type on_out_of_order_callback_;
|
||||
State state_;
|
||||
bool is_v6_;
|
||||
bool ignore_data_packets_;
|
||||
@@ -275,17 +285,27 @@ public:
|
||||
/**
|
||||
* The type used for callbacks
|
||||
*/
|
||||
typedef std::function<void(Stream&)> stream_callback;
|
||||
typedef std::function<void(Stream&)> stream_callback_type;
|
||||
|
||||
/**
|
||||
* The type used to store payloads
|
||||
*/
|
||||
typedef Flow::payload_type payload_type;
|
||||
|
||||
/**
|
||||
* The type used for callbacks
|
||||
*
|
||||
* /sa Flow::buffering_callback
|
||||
*/
|
||||
typedef std::function<void(Stream&,
|
||||
uint32_t,
|
||||
const payload_type&)> out_of_order_callback_type;
|
||||
|
||||
/**
|
||||
* The type used to store hardware addresses
|
||||
*/
|
||||
typedef HWAddress<6> hwaddress_type;
|
||||
|
||||
/**
|
||||
* The type used to store payloads
|
||||
*/
|
||||
typedef Flow::payload_type payload_type;
|
||||
|
||||
/**
|
||||
* \brief Constructs a TCP stream using the provided packet.
|
||||
@@ -416,7 +436,7 @@ public:
|
||||
*
|
||||
* \param callback The callback to be set
|
||||
*/
|
||||
void stream_closed_callback(const stream_callback& callback);
|
||||
void stream_closed_callback(const stream_callback_type& callback);
|
||||
|
||||
/**
|
||||
* \brief Sets the callback to be executed when there's client data
|
||||
@@ -424,7 +444,7 @@ public:
|
||||
* \sa Flow::data_callback
|
||||
* \param callback The callback to be set
|
||||
*/
|
||||
void client_data_callback(const stream_callback& callback);
|
||||
void client_data_callback(const stream_callback_type& callback);
|
||||
|
||||
/**
|
||||
* \brief Sets the callback to be executed when there's server data
|
||||
@@ -432,7 +452,7 @@ public:
|
||||
* \sa Flow::data_callback
|
||||
* \param callback The callback to be set
|
||||
*/
|
||||
void server_data_callback(const stream_callback& callback);
|
||||
void server_data_callback(const stream_callback_type& callback);
|
||||
|
||||
/**
|
||||
* \brief Sets the callback to be executed when there's new buffered
|
||||
@@ -441,7 +461,7 @@ public:
|
||||
* \sa Flow::buffering_callback
|
||||
* \param callback The callback to be set
|
||||
*/
|
||||
void client_buffering_callback(const stream_callback& callback);
|
||||
void client_out_of_order_callback(const out_of_order_callback_type& callback);
|
||||
|
||||
/**
|
||||
* \brief Sets the callback to be executed when there's new buffered
|
||||
@@ -450,7 +470,7 @@ public:
|
||||
* \sa Flow::buffering_callback
|
||||
* \param callback The callback to be set
|
||||
*/
|
||||
void server_buffering_callback(const stream_callback& callback);
|
||||
void server_out_of_order_callback(const out_of_order_callback_type& callback);
|
||||
|
||||
/**
|
||||
* \brief Indicates that the data packets sent by the client should be
|
||||
@@ -502,16 +522,20 @@ private:
|
||||
|
||||
void on_client_flow_data(const Flow& flow);
|
||||
void on_server_flow_data(const Flow& flow);
|
||||
void on_client_buffering(const Flow& flow);
|
||||
void on_server_buffering(const Flow& flow);
|
||||
void on_client_out_of_order(const Flow& flow,
|
||||
uint32_t seq,
|
||||
const payload_type& payload);
|
||||
void on_server_out_of_order(const Flow& flow,
|
||||
uint32_t seq,
|
||||
const payload_type& payload);
|
||||
|
||||
Flow client_flow_;
|
||||
Flow server_flow_;
|
||||
stream_callback on_stream_closed_;
|
||||
stream_callback on_client_data_callback_;
|
||||
stream_callback on_server_data_callback_;
|
||||
stream_callback on_client_buffering_callback_;
|
||||
stream_callback on_server_buffering_callback_;
|
||||
stream_callback_type on_stream_closed_;
|
||||
stream_callback_type on_client_data_callback_;
|
||||
stream_callback_type on_server_data_callback_;
|
||||
out_of_order_callback_type on_client_out_of_order_callback_;
|
||||
out_of_order_callback_type on_server_out_of_order_callback_;
|
||||
hwaddress_type client_hw_addr_;
|
||||
hwaddress_type server_hw_addr_;
|
||||
bool auto_cleanup_;
|
||||
@@ -546,7 +570,7 @@ public:
|
||||
/**
|
||||
* \brief The type used for callbacks
|
||||
*/
|
||||
typedef Stream::stream_callback stream_callback;
|
||||
typedef Stream::stream_callback_type stream_callback_type;
|
||||
|
||||
/**
|
||||
* Default constructor
|
||||
@@ -576,7 +600,7 @@ public:
|
||||
*
|
||||
* \param callback The callback to be set
|
||||
*/
|
||||
void new_stream_callback(const stream_callback& callback);
|
||||
void new_stream_callback(const stream_callback_type& callback);
|
||||
|
||||
/**
|
||||
* Finds the stream identified by the provided arguments.
|
||||
@@ -625,7 +649,7 @@ private:
|
||||
static address_type serialize(const IPv6Address& address);
|
||||
|
||||
streams_type streams_;
|
||||
stream_callback on_new_connection_;
|
||||
stream_callback_type on_new_connection_;
|
||||
size_t max_buffered_chunks_;
|
||||
bool attach_to_flows_;
|
||||
};
|
||||
|
||||
@@ -92,12 +92,12 @@ Flow::Flow(const IPv6Address& dest_address, uint16_t dest_port,
|
||||
output.write(dest_address);
|
||||
}
|
||||
|
||||
void Flow::data_callback(const event_callback& callback) {
|
||||
void Flow::data_callback(const data_available_callback_type& callback) {
|
||||
on_data_callback_ = callback;
|
||||
}
|
||||
|
||||
void Flow::buffering_callback(const event_callback& callback) {
|
||||
on_buffering_callback_= callback;
|
||||
void Flow::out_of_order_callback(const out_of_order_callback_type& callback) {
|
||||
on_out_of_order_callback_ = callback;
|
||||
}
|
||||
|
||||
void Flow::process_packet(PDU& pdu) {
|
||||
@@ -118,6 +118,11 @@ void Flow::process_packet(PDU& pdu) {
|
||||
if (seq_compare(chunk_end, seq_number_) >= 0) {
|
||||
bool added_some = false;
|
||||
uint32_t seq = tcp->seq();
|
||||
// If we're going to buffer this and we have a buffering callback, execute it
|
||||
if (seq > seq_number_ && on_out_of_order_callback_) {
|
||||
on_out_of_order_callback_(*this, seq, raw->payload());
|
||||
}
|
||||
|
||||
// If it starts before our sequence number, slice it
|
||||
if (seq_compare(seq, seq_number_) < 0) {
|
||||
const uint32_t diff = seq_number_ - seq;
|
||||
@@ -173,11 +178,6 @@ void Flow::process_packet(PDU& pdu) {
|
||||
on_data_callback_(*this);
|
||||
}
|
||||
}
|
||||
else {
|
||||
if (on_buffering_callback_) {
|
||||
on_buffering_callback_(*this);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -337,24 +337,24 @@ const Flow& Stream::server_flow() const {
|
||||
return server_flow_;
|
||||
}
|
||||
|
||||
void Stream::stream_closed_callback(const stream_callback& callback) {
|
||||
void Stream::stream_closed_callback(const stream_callback_type& callback) {
|
||||
on_stream_closed_ = callback;
|
||||
}
|
||||
|
||||
void Stream::client_data_callback(const stream_callback& callback) {
|
||||
void Stream::client_data_callback(const stream_callback_type& callback) {
|
||||
on_client_data_callback_ = callback;
|
||||
}
|
||||
|
||||
void Stream::server_data_callback(const stream_callback& callback) {
|
||||
void Stream::server_data_callback(const stream_callback_type& callback) {
|
||||
on_server_data_callback_ = callback;
|
||||
}
|
||||
|
||||
void Stream::client_buffering_callback(const stream_callback& callback) {
|
||||
on_client_buffering_callback_ = callback;
|
||||
void Stream::client_out_of_order_callback(const out_of_order_callback_type& callback) {
|
||||
on_client_out_of_order_callback_ = callback;
|
||||
}
|
||||
|
||||
void Stream::server_buffering_callback(const stream_callback& callback) {
|
||||
on_server_buffering_callback_ = callback;
|
||||
void Stream::server_out_of_order_callback(const out_of_order_callback_type& callback) {
|
||||
on_server_out_of_order_callback_ = callback;
|
||||
}
|
||||
|
||||
void Stream::ignore_client_data() {
|
||||
@@ -467,11 +467,14 @@ Flow Stream::extract_server_flow(const PDU& packet) {
|
||||
}
|
||||
|
||||
void Stream::setup_flows_callbacks() {
|
||||
using std::placeholders::_1;
|
||||
using namespace std::placeholders;
|
||||
|
||||
client_flow_.data_callback(bind(&Stream::on_client_flow_data, this, _1));
|
||||
server_flow_.data_callback(bind(&Stream::on_server_flow_data, this, _1));
|
||||
client_flow_.buffering_callback(bind(&Stream::on_client_buffering, this, _1));
|
||||
server_flow_.buffering_callback(bind(&Stream::on_server_buffering, this, _1));
|
||||
client_flow_.out_of_order_callback(bind(&Stream::on_client_out_of_order,
|
||||
this, _1, _2, _3));
|
||||
server_flow_.out_of_order_callback(bind(&Stream::on_server_out_of_order,
|
||||
this, _1, _2, _3));
|
||||
}
|
||||
|
||||
void Stream::auto_cleanup_payloads(bool value) {
|
||||
@@ -496,15 +499,19 @@ void Stream::on_server_flow_data(const Flow& /*flow*/) {
|
||||
}
|
||||
}
|
||||
|
||||
void Stream::on_client_buffering(const Flow& flow) {
|
||||
if (on_client_buffering_callback_) {
|
||||
on_client_buffering_callback_(*this);
|
||||
void Stream::on_client_out_of_order(const Flow& flow,
|
||||
uint32_t seq,
|
||||
const payload_type& payload) {
|
||||
if (on_client_out_of_order_callback_) {
|
||||
on_client_out_of_order_callback_(*this, seq, payload);
|
||||
}
|
||||
}
|
||||
|
||||
void Stream::on_server_buffering(const Flow& flow) {
|
||||
if (on_server_buffering_callback_) {
|
||||
on_server_buffering_callback_(*this);
|
||||
void Stream::on_server_out_of_order(const Flow& flow,
|
||||
uint32_t seq,
|
||||
const payload_type& payload) {
|
||||
if (on_server_out_of_order_callback_) {
|
||||
on_server_out_of_order_callback_(*this, seq, payload);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -562,7 +569,7 @@ bool StreamFollower::process_packet(PDU& packet) {
|
||||
return true;
|
||||
}
|
||||
|
||||
void StreamFollower::new_stream_callback(const stream_callback& callback) {
|
||||
void StreamFollower::new_stream_callback(const stream_callback_type& callback) {
|
||||
on_new_connection_ = callback;
|
||||
}
|
||||
|
||||
@@ -633,7 +640,7 @@ StreamFollower::stream_id::stream_id(const address_type& client_addr,
|
||||
const address_type& server_addr,
|
||||
uint16_t server_port)
|
||||
: min_address(client_addr), max_address(server_addr), min_address_port(client_port),
|
||||
max_address_port(server_port) {
|
||||
max_address_port(server_port) {
|
||||
if (min_address > max_address) {
|
||||
swap(min_address, max_address);
|
||||
swap(min_address_port, max_address_port);
|
||||
|
||||
@@ -45,7 +45,7 @@ public:
|
||||
void on_new_stream(Stream& stream);
|
||||
void cumulative_stream_client_data_handler(Stream& stream);
|
||||
void cumulative_stream_server_data_handler(Stream& stream);
|
||||
void buffered_payload_handle(Flow& session);
|
||||
void out_of_order_handler(Flow& session, uint32_t seq, Flow::payload_type payload);
|
||||
void run_test(uint32_t initial_seq, const ordering_info_type& chunks,
|
||||
const string& payload);
|
||||
void run_test(uint32_t initial_seq, const ordering_info_type& chunks);
|
||||
@@ -64,6 +64,7 @@ public:
|
||||
uint16_t dst_port);
|
||||
|
||||
vector<Flow::payload_type> flow_payload_chunks;
|
||||
vector<pair<uint32_t, Flow::payload_type> > flow_out_of_order_chunks;
|
||||
vector<Flow::payload_type> stream_client_payload_chunks;
|
||||
vector<Flow::payload_type> stream_server_payload_chunks;
|
||||
};
|
||||
@@ -127,8 +128,8 @@ void FlowTest::cumulative_stream_server_data_handler(Stream& stream) {
|
||||
stream_server_payload_chunks.push_back(stream.server_flow().payload());
|
||||
}
|
||||
|
||||
void FlowTest::buffered_payload_handle(Flow& session) {
|
||||
|
||||
void FlowTest::out_of_order_handler(Flow& session, uint32_t seq, Flow::payload_type payload) {
|
||||
flow_out_of_order_chunks.push_back(make_pair(seq, move(payload)));
|
||||
}
|
||||
|
||||
void FlowTest::run_test(uint32_t initial_seq, const ordering_info_type& chunks,
|
||||
@@ -290,6 +291,36 @@ TEST_F(FlowTest, IgnoreDataPackets) {
|
||||
EXPECT_TRUE(flow_payload_chunks.empty());
|
||||
}
|
||||
|
||||
TEST_F(FlowTest, OutOfOrderCallback) {
|
||||
using namespace std::placeholders;
|
||||
|
||||
ordering_info_type chunks = split_payload(payload, 5);
|
||||
Flow flow(IPv4Address("1.2.3.4"), 22, 0);
|
||||
flow.out_of_order_callback(bind(&FlowTest::out_of_order_handler, this, _1, _2, _3));
|
||||
vector<EthernetII> packets = chunks_to_packets(0, chunks, payload);
|
||||
reverse(packets.begin(), packets.end());
|
||||
// Copy, as Flow::process_packet takes ownership of the payload
|
||||
vector<EthernetII> original_packets = packets;
|
||||
for (size_t i = 0; i < packets.size(); ++i) {
|
||||
flow.process_packet(packets[i]);
|
||||
}
|
||||
// All elements should be out of order except the first
|
||||
// one (last one in reverse order)
|
||||
ASSERT_EQ(original_packets.size() - 1, flow_out_of_order_chunks.size());
|
||||
for (size_t i = 0; i < original_packets.size() - 1; ++i) {
|
||||
// Compare sequence number
|
||||
EXPECT_EQ(
|
||||
original_packets[i].rfind_pdu<TCP>().seq(),
|
||||
flow_out_of_order_chunks[i].first
|
||||
);
|
||||
// Compare payload
|
||||
EXPECT_EQ(
|
||||
original_packets[i].rfind_pdu<RawPDU>().payload(),
|
||||
flow_out_of_order_chunks[i].second
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Stream follower tests
|
||||
|
||||
TEST_F(FlowTest, StreamFollower_ThreeWayHandshake) {
|
||||
|
||||
Reference in New Issue
Block a user