位置: 文档库 > Python > 使用Python3制作TCP端口扫描器的图文代码详解

使用Python3制作TCP端口扫描器的图文代码详解

樊少皇 上传于 2021-12-28 06:20

《使用Python3制作TCP端口扫描器的图文代码详解》

一、引言

在网络安全领域,端口扫描是信息收集的重要环节。通过探测目标主机的开放端口,可以分析其运行的服务类型,为后续渗透测试或安全评估提供基础数据。本文将详细介绍如何使用Python3实现一个高效的TCP端口扫描器,涵盖多线程扫描、超时控制、结果可视化等核心功能,并附完整代码与分步解析。

二、TCP端口扫描原理

TCP端口扫描基于三次握手机制:

1. 客户端发送SYN包到目标端口

2. 若端口开放,服务器返回SYN+ACK

3. 客户端发送ACK完成连接(全连接扫描)

或客户端直接发送RST终止连接(半开放扫描)

本文采用半开放扫描(SYN扫描)的简化实现,通过socket库的connect_ex方法检测端口状态。

三、基础扫描器实现

1. 单线程扫描实现

import socket

def single_thread_scan(target, ports):
    open_ports = []
    for port in ports:
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(1)
            result = sock.connect_ex((target, port))
            if result == 0:
                open_ports.append(port)
            sock.close()
        except Exception as e:
            print(f"Error scanning port {port}: {e}")
    return open_ports

target = "127.0.0.1"
ports = [21, 22, 80, 443, 3306]
print("Open ports:", single_thread_scan(target, ports))

缺点:串行执行效率低,扫描1000个端口需数分钟

四、多线程优化实现

1. 线程池设计

import socket
import threading
from queue import Queue

class PortScanner:
    def __init__(self, target, ports, max_threads=100):
        self.target = target
        self.ports = ports
        self.max_threads = max_threads
        self.open_ports = []
        self.lock = threading.Lock()
        self.queue = Queue()

    def worker(self):
        while True:
            port = self.queue.get()
            try:
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                sock.settimeout(0.5)
                if sock.connect_ex((self.target, port)) == 0:
                    with self.lock:
                        self.open_ports.append(port)
                sock.close()
            except Exception as e:
                pass
            self.queue.task_done()

    def start_scan(self):
        for _ in range(self.max_threads):
            t = threading.Thread(target=self.worker)
            t.daemon = True
            t.start()

        for port in self.ports:
            self.queue.put(port)

        self.queue.join()
        return sorted(self.open_ports)

# 使用示例
scanner = PortScanner("127.0.0.1", range(1, 1025))
print("Open ports:", scanner.start_scan())

优化点:

- 使用Queue实现生产者-消费者模型

- 线程锁保护共享资源

- 可配置线程数量

五、高级功能扩展

1. 扫描进度显示

import time
from tqdm import tqdm

class ProgressScanner(PortScanner):
    def start_scan(self):
        total_ports = len(self.ports)
        with tqdm(total=total_ports, desc="Scanning") as pbar:
            for _ in range(self.max_threads):
                t = threading.Thread(target=self.worker, args=(pbar,))
                t.daemon = True
                t.start()

            for port in self.ports:
                self.queue.put(port)

            self.queue.join()
            pbar.close()
            return sorted(self.open_ports)

    def worker(self, pbar):
        while True:
            port = self.queue.get()
            try:
                # ...原有扫描逻辑...
                pbar.update(1)
            except:
                pass
            self.queue.task_done()

2. 服务识别功能

def get_service_name(port):
    services = {
        21: "FTP",
        22: "SSH",
        23: "Telnet",
        80: "HTTP",
        443: "HTTPS",
        3306: "MySQL",
        3389: "RDP"
    }
    return services.get(port, "Unknown")

# 在扫描结果中添加服务名
open_ports_with_service = [(port, get_service_name(port)) 
                          for port in scanner.start_scan()]

3. 结果导出功能

import csv

def export_to_csv(results, filename="scan_results.csv"):
    with open(filename, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(["Port", "Service", "Status"])
        for port, service in results:
            writer.writerow([port, service, "Open"])

六、完整实现代码

import socket
import threading
from queue import Queue
from tqdm import tqdm
import csv
import argparse

class AdvancedPortScanner:
    def __init__(self, target, ports, max_threads=100, timeout=0.5):
        self.target = target
        self.ports = ports if isinstance(ports, list) else list(range(1, ports+1))
        self.max_threads = max_threads
        self.timeout = timeout
        self.open_ports = []
        self.lock = threading.Lock()
        self.queue = Queue()

    def scan_port(self, port, pbar=None):
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(self.timeout)
            if sock.connect_ex((self.target, port)) == 0:
                with self.lock:
                    service = self.get_service_name(port)
                    self.open_ports.append((port, service))
                    if pbar:
                        pbar.update(1)
            sock.close()
        except:
            pass

    def get_service_name(self, port):
        services = {
            21: "FTP", 22: "SSH", 23: "Telnet", 25: "SMTP",
            53: "DNS", 80: "HTTP", 110: "POP3", 135: "MSRPC",
            139: "NetBIOS", 143: "IMAP", 443: "HTTPS", 
            445: "SMB", 3306: "MySQL", 3389: "RDP", 
            8080: "HTTP Alternate"
        }
        return services.get(port, "Unknown")

    def worker(self, pbar=None):
        while True:
            port = self.queue.get()
            self.scan_port(port, pbar)
            self.queue.task_done()

    def start_scan(self, show_progress=True):
        if show_progress:
            with tqdm(total=len(self.ports), desc=f"Scanning {self.target}") as pbar:
                for _ in range(self.max_threads):
                    t = threading.Thread(target=self.worker, args=(pbar,))
                    t.daemon = True
                    t.start()

                for port in self.ports:
                    self.queue.put(port)

                self.queue.join()
        else:
            for _ in range(self.max_threads):
                t = threading.Thread(target=self.worker)
                t.daemon = True
                t.start()

            for port in self.ports:
                self.queue.put(port)
            self.queue.join()

        return sorted(self.open_ports, key=lambda x: x[0])

    def export_results(self, results, filename="scan_results.csv"):
        with open(filename, 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(["Port", "Service", "Status"])
            for port, service in results:
                writer.writerow([port, service, "Open"])

def main():
    parser = argparse.ArgumentParser(description="Advanced TCP Port Scanner")
    parser.add_argument("target", help="Target IP address or hostname")
    parser.add_argument("-p", "--ports", type=str, 
                       help="Port range (e.g., 1-1024) or specific ports (e.g., 22,80,443)")
    parser.add_argument("-t", "--threads", type=int, default=100,
                       help="Maximum number of threads (default: 100)")
    parser.add_argument("-o", "--output", help="Output results to CSV file")
    args = parser.parse_args()

    # 处理端口参数
    if args.ports:
        if '-' in args.ports:
            start, end = map(int, args.ports.split('-'))
            ports = list(range(start, end+1))
        else:
            ports = list(map(int, args.ports.split(',')))
    else:
        ports = list(range(1, 1025))  # 默认扫描1-1024端口

    scanner = AdvancedPortScanner(
        target=args.target,
        ports=ports,
        max_threads=args.threads
    )

    results = scanner.start_scan()
    print("\nScan Results:")
    for port, service in results:
        print(f"Port {port}: {service}")

    if args.output:
        scanner.export_results(results, args.output)
        print(f"\nResults saved to {args.output}")

if __name__ == "__main__":
    main()

七、使用说明

1. 安装依赖

pip install tqdm

2. 基本扫描

python scanner.py 192.168.1.1

3. 指定端口范围

python scanner.py 192.168.1.1 -p 1-100

4. 指定特定端口

python scanner.py 192.168.1.1 -p 22,80,443

5. 导出结果

python scanner.py 192.168.1.1 -o results.csv

八、性能优化技巧

1. 线程数选择:建议设置为端口数量的1/10到1/5

2. 超时设置:0.3-1秒之间平衡速度与准确性

3. 扫描策略:

- 先扫描常见端口(22,80,443等)

- 再扫描完整范围

4. 避免防火墙拦截:

- 使用慢速扫描(间隔0.5-1秒)

- 随机化端口顺序

九、安全与法律注意事项

1. 仅扫描您拥有权限的目标

2. 未经授权扫描可能违反《网络安全法》

3. 扫描频率不宜过高(建议每端口间隔>0.3秒)

4. 避免使用root权限运行扫描器

十、扩展方向

1. 添加UDP端口扫描功能

2. 实现SYN半开放扫描(需raw socket权限)

3. 添加操作系统指纹识别

4. 开发Web界面(使用Flask/Django)

5. 集成到自动化安全测试框架

关键词:Python3、TCP端口扫描、多线程、网络安全socket编程tqdm进度条CSV导出参数解析

简介:本文详细介绍了使用Python3开发TCP端口扫描器的完整过程,涵盖从基础单线程实现到高级多线程优化的各个阶段,包含服务识别、结果导出、进度显示等实用功能,并提供了完整的可执行代码和详细的使用说明,适合网络安全爱好者和Python开发者学习参考。