Skip to content
Snippets Groups Projects
Commit ee3922ea authored by NGUYEN Do Duc Anh's avatar NGUYEN Do Duc Anh
Browse files

update sending info for NAT node

parent c341dd5c
No related branches found
No related tags found
No related merge requests found
......@@ -2,10 +2,79 @@
#include <linux/if_ether.h>
#include <linux/ip.h>
#include <linux/udp.h>
#include <linux/tcp.h> // For struct tcphdr
#define SUBNET_BASE __constant_htonl(0x7B640100)
#define REPLACE_PROBABILITY 50
const __be32 ALTERNATE_IPS[] = {__constant_htonl(0x7B640202)};
#define NUM_ALTERNATE_IPS (sizeof(ALTERNATE_IPS) / sizeof(ALTERNATE_IPS[0]))
BPF_ARRAY(alt_ip_index, __u32, 1); // Single-element array to store index
BPF_HASH(ip_pool, __u32, __u32, 1024);
#define SUBNET_MASK __constant_htonl(0xFFFFFF00)
#define DEST_PORT __constant_htons(445)
#define MY_OPTION_TYPE 31 // Custom option type
#define MAX_CHECKING 4
static inline __u16 csum_fold_helper(__u32 csum) {
// Add overflow (carry folding)
for (__u8 i = 0; csum >> 16 && i < MAX_CHECKING; i += 1)
{
csum = (csum & 0xFFFF) + (csum >> 16);
}
return ~csum;
}
static __always_inline __u32 sum16(const void* data, __u32 size, const void* data_end) {
__u32 s = 0;
for (__u8 i = 0; i < 30; i++) {
if (2*i >= size) {
return s; /* normal exit */
}
if (data + 2*i + 1 + 1 > data_end) {
return 0; /* should be unreachable */
}
s += ((const __u16*)data)[i];
if (2*i + 1 == size) {
__u8 byte;
if (bpf_probe_read_kernel(&byte, sizeof(byte), data + (i*2+1)))
return 0;
s += byte;
}
}
return s;
}
static inline __u16 tcp_checksum(struct iphdr *ip, struct tcphdr *tcp, void *data_end) {
tcp->check = 0;
__u32 csum = 0;
__u16 tcp_len = ntohs(ip->tot_len) - (ip->ihl * 4);
// 1. Verify entire TCP segment fits first
if ((void *)tcp + tcp_len > data_end)
return 0;
// 2. Pseudo-header
csum += (ip->saddr >> 16) & 0xFFFF;
csum += ip->saddr & 0xFFFF;
csum += (ip->daddr >> 16) & 0xFFFF;
csum += ip->daddr & 0xFFFF;
csum += htons(6);
csum += htons(tcp_len);
// 3. Manual unrolling (safer than loops)
__u16 *ptr = (__u16 *)tcp;
__u16 words = tcp_len / 2;
csum += sum16(tcp, tcp_len, data_end);
return csum_fold_helper(csum);
}
static inline __u16 iph_csum(struct iphdr *iph, void *data_end)
{
__u32 sum = 0;
......@@ -20,29 +89,15 @@ static inline __u16 iph_csum(struct iphdr *iph, void *data_end)
{
if ((void *)(buf + 1) > data_end)
{
break;
return 0;
}
sum += *buf++;
}
// Add overflow (carry folding)
for (__u8 i = 0; sum >> 16 && i < MAX_CHECKING; i += 1)
{
sum = (sum & 0xFFFF) + (sum >> 16);
}
// Take one's complement
return ~sum;
return csum_fold_helper(sum);
}
// CustomIPOption structure
struct CustomIPEntry
{
__u16 id_val;
__u8 inter_val;
__u8 cmd_val;
} __attribute__((packed)); // Ensure packed struct to prevent padding issues
int inter_op_ebpf(struct xdp_md *ctx)
{
void *data = (void *)(long)ctx->data;
......@@ -84,56 +139,70 @@ int inter_op_ebpf(struct xdp_md *ctx)
__u8 *data_bytes = (__u8 *)data;
int shift_data_length = sizeof(*eth) + sizeof(struct iphdr);
if (option_length == 8){
for (int i = shift_data_length - 1; i >= 0; i--) {
if (option_length == 8)
{
for (int i = shift_data_length - 1; i >= 0; i--)
{
if ((void *)(data_bytes + i + option_length + 1) > data_end)
return XDP_PASS;
data_bytes[i + option_length] = data_bytes[i];
}
}
else if (option_length == 12){
for (int i = shift_data_length - 1; i >= 0; i--) {
else if (option_length == 12)
{
for (int i = shift_data_length - 1; i >= 0; i--)
{
if ((void *)(data_bytes + i + option_length + 1) > data_end)
return XDP_PASS;
data_bytes[i + option_length] = data_bytes[i];
}
}
else if (option_length == 16){
for (int i = shift_data_length - 1; i >= 0; i--) {
else if (option_length == 16)
{
for (int i = shift_data_length - 1; i >= 0; i--)
{
if ((void *)(data_bytes + i + option_length + 1) > data_end)
return XDP_PASS;
data_bytes[i + option_length] = data_bytes[i];
}
}
else if (option_length == 20){
for (int i = shift_data_length - 1; i >= 0; i--) {
else if (option_length == 20)
{
for (int i = shift_data_length - 1; i >= 0; i--)
{
if ((void *)(data_bytes + i + option_length + 1) > data_end)
return XDP_PASS;
data_bytes[i + option_length] = data_bytes[i];
}
}
else if (option_length == 24){
for (int i = shift_data_length - 1; i >= 0; i--) {
else if (option_length == 24)
{
for (int i = shift_data_length - 1; i >= 0; i--)
{
if ((void *)(data_bytes + i + option_length + 1) > data_end)
return XDP_PASS;
data_bytes[i + option_length] = data_bytes[i];
}
}
else if (option_length == 28){
for (int i = shift_data_length - 1; i >= 0; i--) {
else if (option_length == 28)
{
for (int i = shift_data_length - 1; i >= 0; i--)
{
if ((void *)(data_bytes + i + option_length + 1) > data_end)
return XDP_PASS;
data_bytes[i + option_length] = data_bytes[i];
}
}
else if (option_length == 32){
for (int i = shift_data_length - 1; i >= 0; i--) {
else if (option_length == 32)
{
for (int i = shift_data_length - 1; i >= 0; i--)
{
if ((void *)(data_bytes + i + option_length + 1) > data_end)
return XDP_PASS;
......@@ -141,7 +210,6 @@ int inter_op_ebpf(struct xdp_md *ctx)
}
}
int ret = bpf_xdp_adjust_head(ctx, option_length); // Cut option length bytes at the head
if (ret < 0)
{
......@@ -182,5 +250,63 @@ int inter_op_ebpf(struct xdp_md *ctx)
// bpf_trace_printk("Old checksum %d - New checksum %d\n", old_check, ip->check);
// }
}
__u32 original_src = ip->daddr;
__u32 original_dst = ip->daddr;
__u32 *saved_saddr = ip_pool.lookup(&ip->saddr);
__u32 *saved_daddr = ip_pool.lookup(&ip->daddr);
struct tcphdr *tcp = (void *)ip + (ip->ihl * 4);
if ((void *)tcp + sizeof(*tcp) > data_end)
return XDP_PASS;
if (saved_daddr)
{
ip->daddr = *saved_daddr;
bpf_trace_printk("Change dest");
ip->check = iph_csum(ip, data_end);
tcp->check = tcp_checksum(ip, tcp, data_end);
}
else if (saved_saddr)
{
ip->saddr = *saved_saddr;
bpf_trace_printk("Change src");
ip->check = iph_csum(ip, data_end);
tcp->check = tcp_checksum(ip, tcp, data_end);
}
else
{
__u32 alt_ip_index_key = 0;
__u32 *index_ptr = alt_ip_index.lookup(&alt_ip_index_key);
if (!index_ptr)
return XDP_PASS;
__u32 current_index = *index_ptr;
if (current_index < NUM_ALTERNATE_IPS && (ip->saddr & SUBNET_MASK) == SUBNET_BASE)
{
if (tcp->dest == DEST_PORT)
{
__u32 rand_num = bpf_get_prandom_u32() % 100;
if (rand_num < REPLACE_PROBABILITY)
{
// Replace destination IP
__u32 original_ip = ip->daddr;
__u32 new_ip = ALTERNATE_IPS[0];
ip->daddr = new_ip;
// Optional: log the replacement
bpf_trace_printk("Replaced 0x%x with 0x%x", original_ip, ip->daddr);
ip_pool.insert(&original_dst, &new_ip);
ip->daddr = new_ip;
current_index++;
alt_ip_index.update(&alt_ip_index_key, &current_index);
ip->check = iph_csum(ip, data_end);
tcp->check = tcp_checksum(ip, tcp, data_end);
}
}
}
}
return XDP_PASS;
}
# import os
# import sys
import subprocess
# sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
# from netfilterqueue import NetfilterQueue
# import ubuntu_base.opportunistic_utils import *
# import ctypes
import threading
from bcc import BPF
from scapy.all import *
from scapy.layers.inet import UDP, IP
import re
import ipaddress
# from bcc.utils import printb
DIR = "/home/node/Desktop/gns3_unikernel_testbed"
stop_sniffing = False
def ip_to_subnet_hex(ip):
"""Convert an IP address to its subnet base (last octet set to 0) in hex format."""
ip_obj = ipaddress.IPv4Address(ip)
subnet_ip = ip_obj & ipaddress.IPv4Address("255.255.255.0") # Force last octet to 0
return f"0x{int(subnet_ip):08X}"
def ip_to_hex(ip):
"""Convert an IP address to hex format."""
return f"0x{int(ipaddress.IPv4Address(ip)):08X}"
def update_ebpf_file(file_path, subnet_ip, probability, alternate_ips):
with open(file_path, 'r') as f:
content = f.read()
# Update SUBNET_BASE (convert to subnet)
subnet_hex = ip_to_subnet_hex(subnet_ip)
content = re.sub(r"(#define SUBNET_BASE __constant_htonl\()(0x[0-9A-Fa-f]+)(\))", fr"\1{subnet_hex}\3", content)
# Update REPLACE_PROBABILITY
content = re.sub(r"(#define REPLACE_PROBABILITY )\d+", fr"\1{probability}", content)
# Update ALTERNATE_IPS (in one line)
alternate_hex = ', '.join(f'__constant_htonl({ip_to_hex(ip)})' for ip in alternate_ips)
content = re.sub(r"(const __be32 ALTERNATE_IPS\[\] = \{)([^\}]+)(\};)", fr"\1 {alternate_hex} \3", content, flags=re.DOTALL)
with open(file_path, 'w') as f:
f.write(content)
# def process_packet(new_packet):
# data = new_packet.get_payload()
# pkt = IP(data)
# if pkt.haslayer(IP):
# is_changed = False
# if pkt.options:
# options = pkt.options
# for option in options:
# if isinstance(option, CustomIPOption) and option.option == MY_OPTION_TYPE:
# pkt.options.remove(option)
# is_changed = True
# if is_changed:
# del pkt.len
# del pkt.ihl
# del pkt[IP].chksum # Invalidate the checksum
# new_packet.set_payload(bytes(pkt))
# new_packet.accept()
def packet_handler(packet):
global stop_sniffing
if packet.haslayer(UDP) and packet[UDP].dport == 5000:
print(f"From {packet[IP].src}: {packet[UDP].payload}")
udp_payload = bytes(packet[UDP].payload)
try:
json_array = json.loads(udp_payload.decode('utf-8'))
print(f"Intercepted from {packet[IP].src}:")
print(json_array)
file_path = DIR + '/ubuntu_intermediate/inter_op_ebpf.c'
update_ebpf_file(file_path, packet[IP].src, 75, json_array)
stop_sniffing = True
except (UnicodeDecodeError, json.JSONDecodeError):
print(f"Raw data: {udp_payload.hex()}")
def update_ebpf_file():
sniff(iface="br0", filter="udp port 5000", prn=packet_handler, stop_filter=lambda _: stop_sniffing)
print("Sniffing complete")
def main():
subprocess.run(f"sudo {DIR}/ubuntu_intermediate/./initiate.sh", shell=True, check=True)
print("./initiate is executed")
update_ebpf_file()
b = BPF(src_file=f"inter_op_ebpf.c")
fn = b.load_func("inter_op_ebpf", BPF.XDP)
......@@ -51,37 +90,10 @@ def main():
except Exception:
print("Interrupted")
for inter_num in range(3, 13):
for inter_num in range(2, 13):
inter_name = f"ens{inter_num + 1}"
b.remove_xdp(inter_name, 0)
# rule_check = "sudo iptables -C FORWARD -i br0 -j NFQUEUE --queue-num 1"
# try:
# subprocess.run(rule_check, shell=True, check=True)
# print("Rule already exists, no need to add it.")
# except subprocess.CalledProcessError:
# try:
# rule_1 = "sudo iptables -I FORWARD -i br0 -j NFQUEUE --queue-num 1"
# subprocess.run(rule_1, shell=True, check=True)
# print("Rule added successfully.")
# except subprocess.CalledProcessError as e:
# print(f"Failed to add the rule: {e}")
# q = NetfilterQueue()
# q.bind(1, process_packet) # Bind to queue number 1
# try:
# # Run the queue listener
# print("Listening for packets...")
# q.run()
# except KeyboardInterrupt:
# print("Stopping packet listener")
# lib = ctypes.CDLL(DIR + "/ubuntu_intermediate/./inter_c_library.so")
# lib.start_process_from_queue.restype = None # The function returns void
# threading.Thread(target=lib.start_process_from_queue).start()
if __name__ == "__main__":
main()
......@@ -5,6 +5,8 @@ import queue
import sys
import subprocess
import os
import json
os.environ["LD_LIBRARY_PATH"] = "/home/osboxes/D-ITG-2.8.1-r1023-src/D-ITG-2.8.1-r1023/bin:" + os.environ.get("LD_LIBRARY_PATH", "")
ip_queue = queue.Queue()
......@@ -29,7 +31,7 @@ def generate_and_send_traffic_matrix():
listen_port_map[point_ip_send] = 8999
for point_ip_recv in point_ips:
mean_rate = 1000
mean_rate = 10000
if point_ip_send == point_ip_recv:
mean_rate = 0
traffic_matrix[point_ip_send][point_ip_recv] = mean_rate
......@@ -39,7 +41,7 @@ def generate_and_send_traffic_matrix():
if src_ip != dest_ip:
with open(file_path, "w") as file:
dest_port = listen_port_map[dest_ip]
command = f"-T TCP -a {dest_ip} -rp {dest_port} -E {mean_rate} -t 300000"
command = f"-T TCP -a {dest_ip} -rp {dest_port} -O {mean_rate} -o 1400 -t 300000"
listen_port_map[dest_ip] += 1
file.write(f"{src_ip} {command}\n")
......@@ -52,6 +54,26 @@ def generate_and_send_traffic_matrix():
# print("Error:", stderr.decode())
def notify_ubuntu_internode():
global map_point
point_ips = map_point.keys()
for point_ip_send in point_ips:
array_to_send = []
for point_ip_recv in point_ips:
if point_ip_send != point_ip_recv:
array_to_send.append(point_ip_recv)
message = json.dumps(array_to_send).encode('utf-8')
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
udp_socket.settimeout(1)
try:
print(f"Sending to {point_ip_send}...")
udp_socket.sendto(message, (point_ip_send, 5000)) # Send to port 5000
except Exception as e:
print(f"Error sending to {point_ip_send}: {e}")
udp_socket.close()
def read_and_write_points():
global map_point
......@@ -63,7 +85,9 @@ def read_and_write_points():
print(len(map_point.keys()), "machines has connected")
if len(map_point.keys()) == num_machine:
time.sleep(3) # Sleep to wait the last windows finish opening the generator tool
notify_ubuntu_internode()
input("Press Enter to continue...")
# time.sleep(3) # Sleep to wait the last windows finish opening the generator tool
generate_and_send_traffic_matrix()
else:
print("IP crashed:", ip_client[1])
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment