mirror of
https://github.com/mfontanini/libtins
synced 2026-01-29 13:04:28 +01:00
Refactor TCP stream code and add http_dump example
This commit is contained in:
262
src/tcp_ip.cpp
262
src/tcp_ip.cpp
@@ -68,33 +68,33 @@ int seq_compare(uint32_t seq1, uint32_t seq2) {
|
||||
}
|
||||
}
|
||||
|
||||
// TCPFlow
|
||||
// Flow
|
||||
|
||||
TCPFlow::TCPFlow(const IPv4Address& dest_address, uint16_t dest_port,
|
||||
uint32_t sequence_number)
|
||||
Flow::Flow(const IPv4Address& dest_address, uint16_t dest_port,
|
||||
uint32_t sequence_number)
|
||||
: seq_number_(sequence_number), dest_port_(dest_port), is_v6_(false),
|
||||
state_(UNKNOWN) {
|
||||
OutputMemoryStream output(dest_address_.data(), dest_address_.size());
|
||||
output.write(dest_address);
|
||||
}
|
||||
|
||||
TCPFlow::TCPFlow(const IPv6Address& dest_address, uint16_t dest_port,
|
||||
uint32_t sequence_number)
|
||||
Flow::Flow(const IPv6Address& dest_address, uint16_t dest_port,
|
||||
uint32_t sequence_number)
|
||||
: seq_number_(sequence_number), dest_port_(dest_port), is_v6_(true),
|
||||
state_(UNKNOWN) {
|
||||
OutputMemoryStream output(dest_address_.data(), dest_address_.size());
|
||||
output.write(dest_address);
|
||||
}
|
||||
|
||||
void TCPFlow::data_callback(const event_callback& callback) {
|
||||
void Flow::data_callback(const event_callback& callback) {
|
||||
on_data_callback_ = callback;
|
||||
}
|
||||
|
||||
void TCPFlow::buffering_callback(const event_callback& callback) {
|
||||
void Flow::buffering_callback(const event_callback& callback) {
|
||||
on_buffering_callback_= callback;
|
||||
}
|
||||
|
||||
void TCPFlow::process_packet(PDU& pdu) {
|
||||
void Flow::process_packet(PDU& pdu) {
|
||||
TCP* tcp = pdu.find_pdu<TCP>();
|
||||
RawPDU* raw = pdu.find_pdu<RawPDU>();
|
||||
// If we sent a packet with RST or FIN on, this flow is done
|
||||
@@ -172,7 +172,7 @@ void TCPFlow::process_packet(PDU& pdu) {
|
||||
}
|
||||
}
|
||||
|
||||
void TCPFlow::store_payload(uint32_t seq, const payload_type& payload) {
|
||||
void Flow::store_payload(uint32_t seq, const payload_type& payload) {
|
||||
buffered_payload_type::iterator iter = buffered_payload_.find(seq);
|
||||
// New segment, store it
|
||||
if (iter == buffered_payload_.end()) {
|
||||
@@ -185,7 +185,7 @@ void TCPFlow::store_payload(uint32_t seq, const payload_type& payload) {
|
||||
}
|
||||
}
|
||||
|
||||
TCPFlow::buffered_payload_type::iterator TCPFlow::erase_iterator(buffered_payload_type::iterator iter) {
|
||||
Flow::buffered_payload_type::iterator Flow::erase_iterator(buffered_payload_type::iterator iter) {
|
||||
buffered_payload_type::iterator output = iter;
|
||||
++output;
|
||||
buffered_payload_.erase(iter);
|
||||
@@ -195,7 +195,7 @@ TCPFlow::buffered_payload_type::iterator TCPFlow::erase_iterator(buffered_payloa
|
||||
return output;
|
||||
}
|
||||
|
||||
void TCPFlow::update_state(const TCP& tcp) {
|
||||
void Flow::update_state(const TCP& tcp) {
|
||||
if ((tcp.flags() & TCP::FIN) != 0) {
|
||||
state_ = FIN_SENT;
|
||||
}
|
||||
@@ -212,15 +212,15 @@ void TCPFlow::update_state(const TCP& tcp) {
|
||||
}
|
||||
}
|
||||
|
||||
bool TCPFlow::is_v6() const {
|
||||
bool Flow::is_v6() const {
|
||||
return is_v6_;
|
||||
}
|
||||
|
||||
bool TCPFlow::is_finished() const {
|
||||
bool Flow::is_finished() const {
|
||||
return state_ == FIN_SENT || state_ == RST_SENT;
|
||||
}
|
||||
|
||||
bool TCPFlow::packet_belongs(const PDU& packet) const {
|
||||
bool Flow::packet_belongs(const PDU& packet) const {
|
||||
if (is_v6()) {
|
||||
const IPv6* ip = packet.find_pdu<IPv6>();
|
||||
if (!ip || ip->dst_addr() != dst_addr_v6()) {
|
||||
@@ -237,105 +237,177 @@ bool TCPFlow::packet_belongs(const PDU& packet) const {
|
||||
return tcp && tcp->dport() == dport();
|
||||
}
|
||||
|
||||
IPv4Address TCPFlow::dst_addr_v4() const {
|
||||
IPv4Address Flow::dst_addr_v4() const {
|
||||
InputMemoryStream stream(dest_address_.data(), dest_address_.size());
|
||||
return stream.read<IPv4Address>();
|
||||
}
|
||||
|
||||
IPv6Address TCPFlow::dst_addr_v6() const {
|
||||
IPv6Address Flow::dst_addr_v6() const {
|
||||
InputMemoryStream stream(dest_address_.data(), dest_address_.size());
|
||||
return stream.read<IPv6Address>();
|
||||
}
|
||||
|
||||
uint16_t TCPFlow::dport() const {
|
||||
uint16_t Flow::dport() const {
|
||||
return dest_port_;
|
||||
}
|
||||
|
||||
const TCPFlow::payload_type& TCPFlow::payload() const {
|
||||
const Flow::payload_type& Flow::payload() const {
|
||||
return payload_;
|
||||
}
|
||||
|
||||
TCPFlow::payload_type& TCPFlow::payload() {
|
||||
return payload_;
|
||||
}
|
||||
|
||||
void TCPFlow::state(State new_state) {
|
||||
state_ = new_state;
|
||||
}
|
||||
|
||||
TCPFlow::State TCPFlow::state() const {
|
||||
Flow::State Flow::state() const {
|
||||
return state_;
|
||||
}
|
||||
|
||||
uint32_t TCPFlow::sequence_number() const {
|
||||
uint32_t Flow::sequence_number() const {
|
||||
return seq_number_;
|
||||
}
|
||||
|
||||
// TCPStream
|
||||
const Flow::buffered_payload_type& Flow::buffered_payload() const {
|
||||
return buffered_payload_;
|
||||
}
|
||||
|
||||
TCPStream::TCPStream(const PDU& packet)
|
||||
Flow::buffered_payload_type& Flow::buffered_payload() {
|
||||
return buffered_payload_;
|
||||
}
|
||||
|
||||
Flow::payload_type& Flow::payload() {
|
||||
return payload_;
|
||||
}
|
||||
|
||||
void Flow::state(State new_state) {
|
||||
state_ = new_state;
|
||||
}
|
||||
|
||||
// Stream
|
||||
|
||||
Stream::Stream(const PDU& packet)
|
||||
: client_flow_(extract_client_flow(packet)),
|
||||
server_flow_(extract_server_flow(packet)) {
|
||||
|
||||
}
|
||||
|
||||
TCPStream::TCPStream(const TCPFlow& client_flow, const TCPFlow& server_flow)
|
||||
Stream::Stream(const Flow& client_flow, const Flow& server_flow)
|
||||
: client_flow_(client_flow), server_flow_(server_flow) {
|
||||
|
||||
}
|
||||
|
||||
void TCPStream::process_packet(PDU& packet) {
|
||||
void Stream::process_packet(PDU& packet) {
|
||||
if (client_flow_.packet_belongs(packet)) {
|
||||
client_flow_.process_packet(packet);
|
||||
}
|
||||
else if (server_flow_.packet_belongs(packet)) {
|
||||
server_flow_.process_packet(packet);
|
||||
}
|
||||
if (is_finished() && on_stream_closed_) {
|
||||
on_stream_closed_(*this);
|
||||
}
|
||||
}
|
||||
|
||||
TCPFlow& TCPStream::client_flow() {
|
||||
Flow& Stream::client_flow() {
|
||||
return client_flow_;
|
||||
}
|
||||
|
||||
const TCPFlow& TCPStream::client_flow() const {
|
||||
const Flow& Stream::client_flow() const {
|
||||
return client_flow_;
|
||||
}
|
||||
|
||||
TCPFlow& TCPStream::server_flow() {
|
||||
Flow& Stream::server_flow() {
|
||||
return server_flow_;
|
||||
}
|
||||
|
||||
const TCPFlow& TCPStream::server_flow() const {
|
||||
const Flow& Stream::server_flow() const {
|
||||
return server_flow_;
|
||||
}
|
||||
|
||||
void TCPStream::client_data_callback(const stream_callback& callback) {
|
||||
void Stream::stream_closed_callback(const stream_callback& callback) {
|
||||
on_stream_closed_ = callback;
|
||||
}
|
||||
|
||||
void Stream::client_data_callback(const stream_callback& callback) {
|
||||
on_client_data_callback_ = callback;
|
||||
}
|
||||
|
||||
void TCPStream::server_data_callback(const stream_callback& callback) {
|
||||
void Stream::server_data_callback(const stream_callback& callback) {
|
||||
on_server_data_callback_ = callback;
|
||||
}
|
||||
|
||||
void TCPStream::client_buffering_callback(const stream_callback& callback) {
|
||||
void Stream::client_buffering_callback(const stream_callback& callback) {
|
||||
on_client_buffering_callback_ = callback;
|
||||
}
|
||||
|
||||
void TCPStream::server_buffering_callback(const stream_callback& callback) {
|
||||
void Stream::server_buffering_callback(const stream_callback& callback) {
|
||||
on_server_buffering_callback_ = callback;
|
||||
}
|
||||
|
||||
TCPFlow TCPStream::extract_client_flow(const PDU& packet) {
|
||||
bool Stream::is_finished() const {
|
||||
const Flow::State client_state = client_flow_.state();
|
||||
const Flow::State server_state = server_flow_.state();
|
||||
// If either peer sent a RST then the stream is done
|
||||
if (client_state == Flow::RST_SENT || server_state == Flow::RST_SENT) {
|
||||
return true;
|
||||
}
|
||||
else {
|
||||
// Otherwise, only finish if both sent a FIN
|
||||
return client_state == Flow::FIN_SENT && server_state == Flow::FIN_SENT;
|
||||
}
|
||||
}
|
||||
|
||||
bool Stream::is_v6() const {
|
||||
return server_flow().is_v6();
|
||||
}
|
||||
|
||||
IPv4Address Stream::client_addr_v4() const {
|
||||
return server_flow().dst_addr_v4();
|
||||
}
|
||||
|
||||
IPv6Address Stream::client_addr_v6() const {
|
||||
return server_flow().dst_addr_v6();
|
||||
}
|
||||
|
||||
IPv4Address Stream::server_addr_v4() const {
|
||||
return client_flow().dst_addr_v4();
|
||||
}
|
||||
|
||||
IPv6Address Stream::server_addr_v6() const {
|
||||
return client_flow().dst_addr_v6();
|
||||
}
|
||||
|
||||
uint16_t Stream::client_port() const {
|
||||
return server_flow().dport();
|
||||
}
|
||||
|
||||
uint16_t Stream::server_port() const {
|
||||
return client_flow().dport();
|
||||
}
|
||||
|
||||
const Stream::payload_type& Stream::client_payload() const {
|
||||
return client_flow().payload();
|
||||
}
|
||||
|
||||
Stream::payload_type& Stream::client_payload() {
|
||||
return client_flow().payload();
|
||||
}
|
||||
|
||||
const Stream::payload_type& Stream::server_payload() const {
|
||||
return server_flow().payload();
|
||||
}
|
||||
|
||||
Stream::payload_type& Stream::server_payload() {
|
||||
return server_flow().payload();
|
||||
}
|
||||
|
||||
Flow Stream::extract_client_flow(const PDU& packet) {
|
||||
const TCP* tcp = packet.find_pdu<TCP>();
|
||||
if (!tcp) {
|
||||
// TODO: define proper exception
|
||||
throw runtime_error("No TCP");
|
||||
}
|
||||
if (const IP* ip = packet.find_pdu<IP>()) {
|
||||
return TCPFlow(ip->dst_addr(), tcp->dport(), tcp->seq());
|
||||
return Flow(ip->dst_addr(), tcp->dport(), tcp->seq());
|
||||
}
|
||||
else if (const IPv6* ip = packet.find_pdu<IPv6>()) {
|
||||
return TCPFlow(ip->dst_addr(), tcp->dport(), tcp->seq());
|
||||
return Flow(ip->dst_addr(), tcp->dport(), tcp->seq());
|
||||
}
|
||||
else {
|
||||
// TODO: define proper exception
|
||||
@@ -343,17 +415,17 @@ TCPFlow TCPStream::extract_client_flow(const PDU& packet) {
|
||||
}
|
||||
}
|
||||
|
||||
TCPFlow TCPStream::extract_server_flow(const PDU& packet) {
|
||||
Flow Stream::extract_server_flow(const PDU& packet) {
|
||||
const TCP* tcp = packet.find_pdu<TCP>();
|
||||
if (!tcp) {
|
||||
// TODO: define proper exception
|
||||
throw runtime_error("No TCP");
|
||||
}
|
||||
if (const IP* ip = packet.find_pdu<IP>()) {
|
||||
return TCPFlow(ip->src_addr(), tcp->sport(), tcp->ack_seq());
|
||||
return Flow(ip->src_addr(), tcp->sport(), tcp->ack_seq());
|
||||
}
|
||||
else if (const IPv6* ip = packet.find_pdu<IPv6>()) {
|
||||
return TCPFlow(ip->src_addr(), tcp->sport(), tcp->ack_seq());
|
||||
return Flow(ip->src_addr(), tcp->sport(), tcp->ack_seq());
|
||||
}
|
||||
else {
|
||||
// TODO: define proper exception
|
||||
@@ -361,46 +433,49 @@ TCPFlow TCPStream::extract_server_flow(const PDU& packet) {
|
||||
}
|
||||
}
|
||||
|
||||
void TCPStream::setup_flows_callbacks() {
|
||||
void Stream::setup_flows_callbacks() {
|
||||
using std::placeholders::_1;
|
||||
client_flow_.data_callback(bind(&TCPStream::on_client_flow_data, this, _1));
|
||||
server_flow_.data_callback(bind(&TCPStream::on_server_flow_data, this, _1));
|
||||
client_flow_.buffering_callback(bind(&TCPStream::on_client_buffering, this, _1));
|
||||
server_flow_.buffering_callback(bind(&TCPStream::on_server_buffering, this, _1));
|
||||
client_flow_.data_callback(bind(&Stream::on_client_flow_data, this, _1));
|
||||
server_flow_.data_callback(bind(&Stream::on_server_flow_data, this, _1));
|
||||
client_flow_.buffering_callback(bind(&Stream::on_client_buffering, this, _1));
|
||||
server_flow_.buffering_callback(bind(&Stream::on_server_buffering, this, _1));
|
||||
}
|
||||
|
||||
void TCPStream::on_client_flow_data(const TCPFlow& flow) {
|
||||
|
||||
void Stream::on_client_flow_data(const Flow& flow) {
|
||||
if (on_client_data_callback_) {
|
||||
on_client_data_callback_(*this);
|
||||
}
|
||||
}
|
||||
|
||||
void TCPStream::on_server_flow_data(const TCPFlow& flow) {
|
||||
void Stream::on_server_flow_data(const Flow& flow) {
|
||||
if (on_server_data_callback_) {
|
||||
on_server_data_callback_(*this);
|
||||
}
|
||||
}
|
||||
|
||||
void TCPStream::on_client_buffering(const TCPFlow& flow) {
|
||||
void Stream::on_client_buffering(const Flow& flow) {
|
||||
if (on_client_buffering_callback_) {
|
||||
on_client_buffering_callback_(*this);
|
||||
}
|
||||
}
|
||||
|
||||
void TCPStream::on_server_buffering(const TCPFlow& flow) {
|
||||
void Stream::on_server_buffering(const Flow& flow) {
|
||||
if (on_server_buffering_callback_) {
|
||||
on_server_buffering_callback_(*this);
|
||||
}
|
||||
}
|
||||
|
||||
// TCPStreamFollower
|
||||
// StreamFollower
|
||||
|
||||
TCPStreamFollower::TCPStreamFollower()
|
||||
: attach_to_flows_(false) {
|
||||
const size_t StreamFollower::DEFAULT_MAX_BUFFERED_CHUNKS = 512;
|
||||
|
||||
StreamFollower::StreamFollower()
|
||||
: max_buffered_chunks_(DEFAULT_MAX_BUFFERED_CHUNKS), attach_to_flows_(false) {
|
||||
|
||||
}
|
||||
|
||||
void TCPStreamFollower::process_packet(PDU& packet) {
|
||||
bool StreamFollower::process_packet(PDU& packet) {
|
||||
stream_id identifier = make_stream_id(packet);
|
||||
streams_type::iterator iter = streams_.find(identifier);
|
||||
bool process = true;
|
||||
@@ -409,17 +484,24 @@ void TCPStreamFollower::process_packet(PDU& packet) {
|
||||
// Start tracking if they're either SYNs or they contain data (attach
|
||||
// to an already running flow).
|
||||
if (tcp.flags() == TCP::SYN || (attach_to_flows_ && tcp.find_pdu<RawPDU>() != 0)) {
|
||||
iter = streams_.insert(make_pair(identifier, make_stream(packet))).first;
|
||||
iter = streams_.insert(make_pair(identifier, Stream(packet))).first;
|
||||
iter->second.setup_flows_callbacks();
|
||||
if (on_new_connection_) {
|
||||
on_new_connection_(iter->second);
|
||||
}
|
||||
else {
|
||||
// TODO: use proper exception
|
||||
throw runtime_error("No new connection callback set");
|
||||
}
|
||||
if (tcp.flags() == TCP::SYN) {
|
||||
// If it's a SYN, set the proper state
|
||||
iter->second.client_flow().state(TCPFlow::SYN_SENT);
|
||||
iter->second.client_flow().state(Flow::SYN_SENT);
|
||||
process = false;
|
||||
}
|
||||
else {
|
||||
// Otherwise, assume the connection is established
|
||||
iter->second.client_flow().state(TCPFlow::ESTABLISHED);
|
||||
iter->second.server_flow().state(TCPFlow::ESTABLISHED);
|
||||
iter->second.client_flow().state(Flow::ESTABLISHED);
|
||||
iter->second.server_flow().state(Flow::ESTABLISHED);
|
||||
}
|
||||
}
|
||||
else {
|
||||
@@ -429,28 +511,23 @@ void TCPStreamFollower::process_packet(PDU& packet) {
|
||||
// We'll process it if we had already seen this stream or if we just attached to
|
||||
// it and it contains payload
|
||||
if (process) {
|
||||
iter->second.process_packet(packet);
|
||||
Stream& stream = iter->second;
|
||||
stream.process_packet(packet);
|
||||
size_t total_chunks = stream.client_flow().buffered_payload().size() +
|
||||
stream.server_flow().buffered_payload().size();
|
||||
if (stream.is_finished() || total_chunks > max_buffered_chunks_) {
|
||||
streams_.erase(iter);
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
void TCPStreamFollower::client_data_callback(const stream_callback& callback) {
|
||||
on_client_data_callback_ = callback;
|
||||
void StreamFollower::new_stream_callback(const stream_callback& callback) {
|
||||
on_new_connection_ = callback;
|
||||
}
|
||||
|
||||
void TCPStreamFollower::server_data_callback(const stream_callback& callback) {
|
||||
on_server_data_callback_ = callback;
|
||||
}
|
||||
|
||||
void TCPStreamFollower::client_buffering_callback(const stream_callback& callback) {
|
||||
on_client_buffering_callback_ = callback;
|
||||
}
|
||||
|
||||
void TCPStreamFollower::server_buffering_callback(const stream_callback& callback) {
|
||||
on_server_buffering_callback_ = callback;
|
||||
}
|
||||
|
||||
TCPStream& TCPStreamFollower::find_stream(IPv4Address client_addr, uint16_t client_port,
|
||||
IPv4Address server_addr, uint16_t server_port) {
|
||||
Stream& StreamFollower::find_stream(IPv4Address client_addr, uint16_t client_port,
|
||||
IPv4Address server_addr, uint16_t server_port) {
|
||||
stream_id identifier(serialize(client_addr), client_port,
|
||||
serialize(server_addr), server_port);
|
||||
streams_type::iterator iter = streams_.find(identifier);
|
||||
@@ -463,7 +540,7 @@ TCPStream& TCPStreamFollower::find_stream(IPv4Address client_addr, uint16_t clie
|
||||
}
|
||||
}
|
||||
|
||||
TCPStreamFollower::stream_id TCPStreamFollower::make_stream_id(const PDU& packet) {
|
||||
StreamFollower::stream_id StreamFollower::make_stream_id(const PDU& packet) {
|
||||
const TCP* tcp = packet.find_pdu<TCP>();
|
||||
if (!tcp) {
|
||||
// TODO: define proper exception
|
||||
@@ -483,23 +560,14 @@ TCPStreamFollower::stream_id TCPStreamFollower::make_stream_id(const PDU& packet
|
||||
}
|
||||
}
|
||||
|
||||
TCPStream TCPStreamFollower::make_stream(const PDU& packet) {
|
||||
TCPStream stream(packet);
|
||||
stream.client_data_callback(on_client_data_callback_);
|
||||
stream.server_data_callback(on_server_data_callback_);
|
||||
stream.client_buffering_callback(on_client_buffering_callback_);
|
||||
stream.server_buffering_callback(on_server_buffering_callback_);
|
||||
return stream;
|
||||
}
|
||||
|
||||
TCPStreamFollower::address_type TCPStreamFollower::serialize(IPv4Address address) {
|
||||
StreamFollower::address_type StreamFollower::serialize(IPv4Address address) {
|
||||
address_type addr;
|
||||
OutputMemoryStream output(addr.data(), addr.size());
|
||||
output.write(address);
|
||||
return addr;
|
||||
}
|
||||
|
||||
TCPStreamFollower::address_type TCPStreamFollower::serialize(const IPv6Address& address) {
|
||||
StreamFollower::address_type StreamFollower::serialize(const IPv6Address& address) {
|
||||
address_type addr;
|
||||
OutputMemoryStream output(addr.data(), addr.size());
|
||||
output.write(address);
|
||||
@@ -508,10 +576,10 @@ TCPStreamFollower::address_type TCPStreamFollower::serialize(const IPv6Address&
|
||||
|
||||
// stream_id
|
||||
|
||||
TCPStreamFollower::stream_id::stream_id(const address_type& client_addr,
|
||||
uint16_t client_port,
|
||||
const address_type& server_addr,
|
||||
uint16_t server_port)
|
||||
StreamFollower::stream_id::stream_id(const address_type& client_addr,
|
||||
uint16_t client_port,
|
||||
const address_type& server_addr,
|
||||
uint16_t server_port)
|
||||
: min_address(client_addr), max_address(server_addr), min_address_port(client_port),
|
||||
max_address_port(server_port) {
|
||||
if (min_address > max_address) {
|
||||
@@ -520,7 +588,7 @@ max_address_port(server_port) {
|
||||
}
|
||||
}
|
||||
|
||||
bool TCPStreamFollower::stream_id::operator<(const stream_id& rhs) const {
|
||||
bool StreamFollower::stream_id::operator<(const stream_id& rhs) const {
|
||||
return tie(min_address, min_address_port, max_address, max_address_port) <
|
||||
tie(rhs.min_address, rhs.min_address_port, rhs.max_address, rhs.max_address_port);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user