《使用Python3制作TCP端口扫描器的图文代码详解》
一、引言
在网络安全领域,端口扫描是信息收集的重要环节。通过探测目标主机的开放端口,可以分析其运行的服务类型,为后续渗透测试或安全评估提供基础数据。本文将详细介绍如何使用Python3实现一个高效的TCP端口扫描器,涵盖多线程扫描、超时控制、结果可视化等核心功能,并附完整代码与分步解析。
二、TCP端口扫描原理
TCP端口扫描基于三次握手机制:
1. 客户端发送SYN包到目标端口
2. 若端口开放,服务器返回SYN+ACK
3. 客户端发送ACK完成连接(全连接扫描)
或客户端直接发送RST终止连接(半开放扫描)
本文采用半开放扫描(SYN扫描)的简化实现,通过socket库的connect_ex方法检测端口状态。
三、基础扫描器实现
1. 单线程扫描实现
import socket
def single_thread_scan(target, ports):
open_ports = []
for port in ports:
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1)
result = sock.connect_ex((target, port))
if result == 0:
open_ports.append(port)
sock.close()
except Exception as e:
print(f"Error scanning port {port}: {e}")
return open_ports
target = "127.0.0.1"
ports = [21, 22, 80, 443, 3306]
print("Open ports:", single_thread_scan(target, ports))
缺点:串行执行效率低,扫描1000个端口需数分钟
四、多线程优化实现
1. 线程池设计
import socket
import threading
from queue import Queue
class PortScanner:
def __init__(self, target, ports, max_threads=100):
self.target = target
self.ports = ports
self.max_threads = max_threads
self.open_ports = []
self.lock = threading.Lock()
self.queue = Queue()
def worker(self):
while True:
port = self.queue.get()
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(0.5)
if sock.connect_ex((self.target, port)) == 0:
with self.lock:
self.open_ports.append(port)
sock.close()
except Exception as e:
pass
self.queue.task_done()
def start_scan(self):
for _ in range(self.max_threads):
t = threading.Thread(target=self.worker)
t.daemon = True
t.start()
for port in self.ports:
self.queue.put(port)
self.queue.join()
return sorted(self.open_ports)
# 使用示例
scanner = PortScanner("127.0.0.1", range(1, 1025))
print("Open ports:", scanner.start_scan())
优化点:
- 使用Queue实现生产者-消费者模型
- 线程锁保护共享资源
- 可配置线程数量
五、高级功能扩展
1. 扫描进度显示
import time
from tqdm import tqdm
class ProgressScanner(PortScanner):
def start_scan(self):
total_ports = len(self.ports)
with tqdm(total=total_ports, desc="Scanning") as pbar:
for _ in range(self.max_threads):
t = threading.Thread(target=self.worker, args=(pbar,))
t.daemon = True
t.start()
for port in self.ports:
self.queue.put(port)
self.queue.join()
pbar.close()
return sorted(self.open_ports)
def worker(self, pbar):
while True:
port = self.queue.get()
try:
# ...原有扫描逻辑...
pbar.update(1)
except:
pass
self.queue.task_done()
2. 服务识别功能
def get_service_name(port):
services = {
21: "FTP",
22: "SSH",
23: "Telnet",
80: "HTTP",
443: "HTTPS",
3306: "MySQL",
3389: "RDP"
}
return services.get(port, "Unknown")
# 在扫描结果中添加服务名
open_ports_with_service = [(port, get_service_name(port))
for port in scanner.start_scan()]
3. 结果导出功能
import csv
def export_to_csv(results, filename="scan_results.csv"):
with open(filename, 'w', newline='') as f:
writer = csv.writer(f)
writer.writerow(["Port", "Service", "Status"])
for port, service in results:
writer.writerow([port, service, "Open"])
六、完整实现代码
import socket
import threading
from queue import Queue
from tqdm import tqdm
import csv
import argparse
class AdvancedPortScanner:
def __init__(self, target, ports, max_threads=100, timeout=0.5):
self.target = target
self.ports = ports if isinstance(ports, list) else list(range(1, ports+1))
self.max_threads = max_threads
self.timeout = timeout
self.open_ports = []
self.lock = threading.Lock()
self.queue = Queue()
def scan_port(self, port, pbar=None):
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(self.timeout)
if sock.connect_ex((self.target, port)) == 0:
with self.lock:
service = self.get_service_name(port)
self.open_ports.append((port, service))
if pbar:
pbar.update(1)
sock.close()
except:
pass
def get_service_name(self, port):
services = {
21: "FTP", 22: "SSH", 23: "Telnet", 25: "SMTP",
53: "DNS", 80: "HTTP", 110: "POP3", 135: "MSRPC",
139: "NetBIOS", 143: "IMAP", 443: "HTTPS",
445: "SMB", 3306: "MySQL", 3389: "RDP",
8080: "HTTP Alternate"
}
return services.get(port, "Unknown")
def worker(self, pbar=None):
while True:
port = self.queue.get()
self.scan_port(port, pbar)
self.queue.task_done()
def start_scan(self, show_progress=True):
if show_progress:
with tqdm(total=len(self.ports), desc=f"Scanning {self.target}") as pbar:
for _ in range(self.max_threads):
t = threading.Thread(target=self.worker, args=(pbar,))
t.daemon = True
t.start()
for port in self.ports:
self.queue.put(port)
self.queue.join()
else:
for _ in range(self.max_threads):
t = threading.Thread(target=self.worker)
t.daemon = True
t.start()
for port in self.ports:
self.queue.put(port)
self.queue.join()
return sorted(self.open_ports, key=lambda x: x[0])
def export_results(self, results, filename="scan_results.csv"):
with open(filename, 'w', newline='') as f:
writer = csv.writer(f)
writer.writerow(["Port", "Service", "Status"])
for port, service in results:
writer.writerow([port, service, "Open"])
def main():
parser = argparse.ArgumentParser(description="Advanced TCP Port Scanner")
parser.add_argument("target", help="Target IP address or hostname")
parser.add_argument("-p", "--ports", type=str,
help="Port range (e.g., 1-1024) or specific ports (e.g., 22,80,443)")
parser.add_argument("-t", "--threads", type=int, default=100,
help="Maximum number of threads (default: 100)")
parser.add_argument("-o", "--output", help="Output results to CSV file")
args = parser.parse_args()
# 处理端口参数
if args.ports:
if '-' in args.ports:
start, end = map(int, args.ports.split('-'))
ports = list(range(start, end+1))
else:
ports = list(map(int, args.ports.split(',')))
else:
ports = list(range(1, 1025)) # 默认扫描1-1024端口
scanner = AdvancedPortScanner(
target=args.target,
ports=ports,
max_threads=args.threads
)
results = scanner.start_scan()
print("\nScan Results:")
for port, service in results:
print(f"Port {port}: {service}")
if args.output:
scanner.export_results(results, args.output)
print(f"\nResults saved to {args.output}")
if __name__ == "__main__":
main()
七、使用说明
1. 安装依赖
pip install tqdm
2. 基本扫描
python scanner.py 192.168.1.1
3. 指定端口范围
python scanner.py 192.168.1.1 -p 1-100
4. 指定特定端口
python scanner.py 192.168.1.1 -p 22,80,443
5. 导出结果
python scanner.py 192.168.1.1 -o results.csv
八、性能优化技巧
1. 线程数选择:建议设置为端口数量的1/10到1/5
2. 超时设置:0.3-1秒之间平衡速度与准确性
3. 扫描策略:
- 先扫描常见端口(22,80,443等)
- 再扫描完整范围
4. 避免防火墙拦截:
- 使用慢速扫描(间隔0.5-1秒)
- 随机化端口顺序
九、安全与法律注意事项
1. 仅扫描您拥有权限的目标
2. 未经授权扫描可能违反《网络安全法》
3. 扫描频率不宜过高(建议每端口间隔>0.3秒)
4. 避免使用root权限运行扫描器
十、扩展方向
1. 添加UDP端口扫描功能
2. 实现SYN半开放扫描(需raw socket权限)
3. 添加操作系统指纹识别
4. 开发Web界面(使用Flask/Django)
5. 集成到自动化安全测试框架
关键词:Python3、TCP端口扫描、多线程、网络安全、socket编程、tqdm进度条、CSV导出、参数解析
简介:本文详细介绍了使用Python3开发TCP端口扫描器的完整过程,涵盖从基础单线程实现到高级多线程优化的各个阶段,包含服务识别、结果导出、进度显示等实用功能,并提供了完整的可执行代码和详细的使用说明,适合网络安全爱好者和Python开发者学习参考。