From 69fc5ff54bf67fd1953848cb03fdf5097bcb75c6 Mon Sep 17 00:00:00 2001 From: Matias Fontanini Date: Tue, 9 Feb 2016 19:26:51 -0800 Subject: [PATCH] Add support for out of order data packet detection --- include/tins/tcp_ip.h | 78 ++++++++++++++++++++++++++++--------------- src/tcp_ip.cpp | 61 ++++++++++++++++++--------------- tests/src/tcp_ip.cpp | 37 ++++++++++++++++++-- 3 files changed, 119 insertions(+), 57 deletions(-) diff --git a/include/tins/tcp_ip.h b/include/tins/tcp_ip.h index c3d2869..d72b2ba 100644 --- a/include/tins/tcp_ip.h +++ b/include/tins/tcp_ip.h @@ -94,9 +94,19 @@ public: typedef std::map buffered_payload_type; /** - * The type used to store the callbacks that this class triggers + * The type used to store the callback called when new data is available */ - typedef std::function event_callback; + typedef std::function data_available_callback_type; + + /** + * \brief The type used to store the callback called when data is buffered + * + * The arguments are the flow, the sequence number and payload that will + * be buffered. + */ + typedef std::function out_of_order_callback_type; /** * Construct a Flow from an IPv4 address @@ -126,17 +136,17 @@ public: * * \param callback The callback to be executed */ - void data_callback(const event_callback& callback); + void data_callback(const data_available_callback_type& callback); /** - * \brief Sets the callback that will be executed when data is buffered. + * \brief Sets the callback that will be executed when out of order data arrives * * Whenever this flow receives out-of-order data, this callback will be * executed. * * \param callback The callback to be executed */ - void buffering_callback(const event_callback& callback); + void out_of_order_callback(const out_of_order_callback_type& callback); /** * \brief Processes a packet. @@ -250,8 +260,8 @@ private: uint32_t seq_number_; std::array dest_address_; uint16_t dest_port_; - event_callback on_data_callback_; - event_callback on_buffering_callback_; + data_available_callback_type on_data_callback_; + out_of_order_callback_type on_out_of_order_callback_; State state_; bool is_v6_; bool ignore_data_packets_; @@ -275,17 +285,27 @@ public: /** * The type used for callbacks */ - typedef std::function stream_callback; + typedef std::function stream_callback_type; + + /** + * The type used to store payloads + */ + typedef Flow::payload_type payload_type; + + /** + * The type used for callbacks + * + * /sa Flow::buffering_callback + */ + typedef std::function out_of_order_callback_type; /** * The type used to store hardware addresses */ typedef HWAddress<6> hwaddress_type; - /** - * The type used to store payloads - */ - typedef Flow::payload_type payload_type; /** * \brief Constructs a TCP stream using the provided packet. @@ -416,7 +436,7 @@ public: * * \param callback The callback to be set */ - void stream_closed_callback(const stream_callback& callback); + void stream_closed_callback(const stream_callback_type& callback); /** * \brief Sets the callback to be executed when there's client data @@ -424,7 +444,7 @@ public: * \sa Flow::data_callback * \param callback The callback to be set */ - void client_data_callback(const stream_callback& callback); + void client_data_callback(const stream_callback_type& callback); /** * \brief Sets the callback to be executed when there's server data @@ -432,7 +452,7 @@ public: * \sa Flow::data_callback * \param callback The callback to be set */ - void server_data_callback(const stream_callback& callback); + void server_data_callback(const stream_callback_type& callback); /** * \brief Sets the callback to be executed when there's new buffered @@ -441,7 +461,7 @@ public: * \sa Flow::buffering_callback * \param callback The callback to be set */ - void client_buffering_callback(const stream_callback& callback); + void client_out_of_order_callback(const out_of_order_callback_type& callback); /** * \brief Sets the callback to be executed when there's new buffered @@ -450,7 +470,7 @@ public: * \sa Flow::buffering_callback * \param callback The callback to be set */ - void server_buffering_callback(const stream_callback& callback); + void server_out_of_order_callback(const out_of_order_callback_type& callback); /** * \brief Indicates that the data packets sent by the client should be @@ -502,16 +522,20 @@ private: void on_client_flow_data(const Flow& flow); void on_server_flow_data(const Flow& flow); - void on_client_buffering(const Flow& flow); - void on_server_buffering(const Flow& flow); + void on_client_out_of_order(const Flow& flow, + uint32_t seq, + const payload_type& payload); + void on_server_out_of_order(const Flow& flow, + uint32_t seq, + const payload_type& payload); Flow client_flow_; Flow server_flow_; - stream_callback on_stream_closed_; - stream_callback on_client_data_callback_; - stream_callback on_server_data_callback_; - stream_callback on_client_buffering_callback_; - stream_callback on_server_buffering_callback_; + stream_callback_type on_stream_closed_; + stream_callback_type on_client_data_callback_; + stream_callback_type on_server_data_callback_; + out_of_order_callback_type on_client_out_of_order_callback_; + out_of_order_callback_type on_server_out_of_order_callback_; hwaddress_type client_hw_addr_; hwaddress_type server_hw_addr_; bool auto_cleanup_; @@ -546,7 +570,7 @@ public: /** * \brief The type used for callbacks */ - typedef Stream::stream_callback stream_callback; + typedef Stream::stream_callback_type stream_callback_type; /** * Default constructor @@ -576,7 +600,7 @@ public: * * \param callback The callback to be set */ - void new_stream_callback(const stream_callback& callback); + void new_stream_callback(const stream_callback_type& callback); /** * Finds the stream identified by the provided arguments. @@ -625,7 +649,7 @@ private: static address_type serialize(const IPv6Address& address); streams_type streams_; - stream_callback on_new_connection_; + stream_callback_type on_new_connection_; size_t max_buffered_chunks_; bool attach_to_flows_; }; diff --git a/src/tcp_ip.cpp b/src/tcp_ip.cpp index 0059f74..7b93db1 100644 --- a/src/tcp_ip.cpp +++ b/src/tcp_ip.cpp @@ -92,12 +92,12 @@ Flow::Flow(const IPv6Address& dest_address, uint16_t dest_port, output.write(dest_address); } -void Flow::data_callback(const event_callback& callback) { +void Flow::data_callback(const data_available_callback_type& callback) { on_data_callback_ = callback; } -void Flow::buffering_callback(const event_callback& callback) { - on_buffering_callback_= callback; +void Flow::out_of_order_callback(const out_of_order_callback_type& callback) { + on_out_of_order_callback_ = callback; } void Flow::process_packet(PDU& pdu) { @@ -118,6 +118,11 @@ void Flow::process_packet(PDU& pdu) { if (seq_compare(chunk_end, seq_number_) >= 0) { bool added_some = false; uint32_t seq = tcp->seq(); + // If we're going to buffer this and we have a buffering callback, execute it + if (seq > seq_number_ && on_out_of_order_callback_) { + on_out_of_order_callback_(*this, seq, raw->payload()); + } + // If it starts before our sequence number, slice it if (seq_compare(seq, seq_number_) < 0) { const uint32_t diff = seq_number_ - seq; @@ -173,11 +178,6 @@ void Flow::process_packet(PDU& pdu) { on_data_callback_(*this); } } - else { - if (on_buffering_callback_) { - on_buffering_callback_(*this); - } - } } } @@ -337,24 +337,24 @@ const Flow& Stream::server_flow() const { return server_flow_; } -void Stream::stream_closed_callback(const stream_callback& callback) { +void Stream::stream_closed_callback(const stream_callback_type& callback) { on_stream_closed_ = callback; } -void Stream::client_data_callback(const stream_callback& callback) { +void Stream::client_data_callback(const stream_callback_type& callback) { on_client_data_callback_ = callback; } -void Stream::server_data_callback(const stream_callback& callback) { +void Stream::server_data_callback(const stream_callback_type& callback) { on_server_data_callback_ = callback; } -void Stream::client_buffering_callback(const stream_callback& callback) { - on_client_buffering_callback_ = callback; +void Stream::client_out_of_order_callback(const out_of_order_callback_type& callback) { + on_client_out_of_order_callback_ = callback; } -void Stream::server_buffering_callback(const stream_callback& callback) { - on_server_buffering_callback_ = callback; +void Stream::server_out_of_order_callback(const out_of_order_callback_type& callback) { + on_server_out_of_order_callback_ = callback; } void Stream::ignore_client_data() { @@ -467,11 +467,14 @@ Flow Stream::extract_server_flow(const PDU& packet) { } void Stream::setup_flows_callbacks() { - using std::placeholders::_1; + using namespace std::placeholders; + client_flow_.data_callback(bind(&Stream::on_client_flow_data, this, _1)); server_flow_.data_callback(bind(&Stream::on_server_flow_data, this, _1)); - client_flow_.buffering_callback(bind(&Stream::on_client_buffering, this, _1)); - server_flow_.buffering_callback(bind(&Stream::on_server_buffering, this, _1)); + client_flow_.out_of_order_callback(bind(&Stream::on_client_out_of_order, + this, _1, _2, _3)); + server_flow_.out_of_order_callback(bind(&Stream::on_server_out_of_order, + this, _1, _2, _3)); } void Stream::auto_cleanup_payloads(bool value) { @@ -496,15 +499,19 @@ void Stream::on_server_flow_data(const Flow& /*flow*/) { } } -void Stream::on_client_buffering(const Flow& flow) { - if (on_client_buffering_callback_) { - on_client_buffering_callback_(*this); +void Stream::on_client_out_of_order(const Flow& flow, + uint32_t seq, + const payload_type& payload) { + if (on_client_out_of_order_callback_) { + on_client_out_of_order_callback_(*this, seq, payload); } } -void Stream::on_server_buffering(const Flow& flow) { - if (on_server_buffering_callback_) { - on_server_buffering_callback_(*this); +void Stream::on_server_out_of_order(const Flow& flow, + uint32_t seq, + const payload_type& payload) { + if (on_server_out_of_order_callback_) { + on_server_out_of_order_callback_(*this, seq, payload); } } @@ -562,7 +569,7 @@ bool StreamFollower::process_packet(PDU& packet) { return true; } -void StreamFollower::new_stream_callback(const stream_callback& callback) { +void StreamFollower::new_stream_callback(const stream_callback_type& callback) { on_new_connection_ = callback; } @@ -633,7 +640,7 @@ StreamFollower::stream_id::stream_id(const address_type& client_addr, const address_type& server_addr, uint16_t server_port) : min_address(client_addr), max_address(server_addr), min_address_port(client_port), -max_address_port(server_port) { + max_address_port(server_port) { if (min_address > max_address) { swap(min_address, max_address); swap(min_address_port, max_address_port); @@ -648,4 +655,4 @@ bool StreamFollower::stream_id::operator<(const stream_id& rhs) const { } // TCPIP } // Tins -#endif // TINS_IS_CXX11 \ No newline at end of file +#endif // TINS_IS_CXX11 diff --git a/tests/src/tcp_ip.cpp b/tests/src/tcp_ip.cpp index 5e4ffac..d31246a 100644 --- a/tests/src/tcp_ip.cpp +++ b/tests/src/tcp_ip.cpp @@ -45,7 +45,7 @@ public: void on_new_stream(Stream& stream); void cumulative_stream_client_data_handler(Stream& stream); void cumulative_stream_server_data_handler(Stream& stream); - void buffered_payload_handle(Flow& session); + void out_of_order_handler(Flow& session, uint32_t seq, Flow::payload_type payload); void run_test(uint32_t initial_seq, const ordering_info_type& chunks, const string& payload); void run_test(uint32_t initial_seq, const ordering_info_type& chunks); @@ -64,6 +64,7 @@ public: uint16_t dst_port); vector flow_payload_chunks; + vector > flow_out_of_order_chunks; vector stream_client_payload_chunks; vector stream_server_payload_chunks; }; @@ -127,8 +128,8 @@ void FlowTest::cumulative_stream_server_data_handler(Stream& stream) { stream_server_payload_chunks.push_back(stream.server_flow().payload()); } -void FlowTest::buffered_payload_handle(Flow& session) { - +void FlowTest::out_of_order_handler(Flow& session, uint32_t seq, Flow::payload_type payload) { + flow_out_of_order_chunks.push_back(make_pair(seq, move(payload))); } void FlowTest::run_test(uint32_t initial_seq, const ordering_info_type& chunks, @@ -290,6 +291,36 @@ TEST_F(FlowTest, IgnoreDataPackets) { EXPECT_TRUE(flow_payload_chunks.empty()); } +TEST_F(FlowTest, OutOfOrderCallback) { + using namespace std::placeholders; + + ordering_info_type chunks = split_payload(payload, 5); + Flow flow(IPv4Address("1.2.3.4"), 22, 0); + flow.out_of_order_callback(bind(&FlowTest::out_of_order_handler, this, _1, _2, _3)); + vector packets = chunks_to_packets(0, chunks, payload); + reverse(packets.begin(), packets.end()); + // Copy, as Flow::process_packet takes ownership of the payload + vector original_packets = packets; + for (size_t i = 0; i < packets.size(); ++i) { + flow.process_packet(packets[i]); + } + // All elements should be out of order except the first + // one (last one in reverse order) + ASSERT_EQ(original_packets.size() - 1, flow_out_of_order_chunks.size()); + for (size_t i = 0; i < original_packets.size() - 1; ++i) { + // Compare sequence number + EXPECT_EQ( + original_packets[i].rfind_pdu().seq(), + flow_out_of_order_chunks[i].first + ); + // Compare payload + EXPECT_EQ( + original_packets[i].rfind_pdu().payload(), + flow_out_of_order_chunks[i].second + ); + } +} + // Stream follower tests TEST_F(FlowTest, StreamFollower_ThreeWayHandshake) {