项目概述
项目简介
这是一个基于Python Flask框架开发的轻量级局域网工具,集成了文件共享和设备扫描两大核心功能。 工具设计简洁易用,无需复杂配置即可实现局域网内的文件传输和设备发现。
适用于家庭、办公室等局域网环境,方便用户快速共享文件或查看网络中的其他设备。
技术栈
- 后端:Python 3.x、Flask框架
- 前端:HTML5、Tailwind CSS、原生JavaScript
- 网络:Socket编程、多线程扫描、ICMP协议(Ping)
- UI组件:Font Awesome图标库
核心功能
文件共享系统
- 支持拖放或点击上传文件,最大支持16GB大文件
- 提供文件下载功能,支持局域网内其他设备访问
- 文件删除功能,带有平滑动画效果
- 路径安全检查,防止目录遍历攻击
- 自动处理文件名重复问题
设备扫描系统
- 自动识别局域网IP范围,多线程并行扫描
- 获取设备IP、MAC地址和主机名信息
- 实时显示扫描进度,支持中途停止
- 自动标记本机设备,方便识别
- 支持直接访问其他设备的共享服务
核心代码
点击代码块右上角按钮可复制代码
初始化与配置
import os
import socket
import random
from flask import Flask, request, send_from_directory, render_template_string, jsonify, abort
import threading
import webbrowser
from datetime import datetime
import json
import ipaddress
import subprocess
from concurrent.futures import ThreadPoolExecutor, as_completed
# 创建Flask应用
app = Flask(__name__)
UPLOAD_FOLDER = 'shared_files'
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 * 1024 # 限制上传大小为16GB
# 确保共享目录存在
if not os.path.exists(UPLOAD_FOLDER):
os.makedirs(UPLOAD_FOLDER)
# 确保pp目录存在(用于存放图片)
if not os.path.exists('pp'):
os.makedirs('pp')
# 存储设备信息和扫描状态
devices = {}
scanning = False
max_threads = 50
total_ips = 0
scanned_ips = 0
获取本地网络信息
def get_local_ip():
"""获取本机在局域网中的IP地址"""
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
ip = s.getsockname()[0]
s.close()
return ip
except Exception:
return "127.0.0.1"
def get_netmask():
"""获取子网掩码"""
try:
# 使用ipconfig命令获取子网掩码
if os.name == 'nt': # Windows
result = subprocess.run(["ipconfig"], capture_output=True, text=True, encoding='gbk')
else: # Linux/Mac
result = subprocess.run(["ifconfig"], capture_output=True, text=True)
lines = result.stdout.split('\n')
local_ip = get_local_ip()
for i, line in enumerate(lines):
if local_ip in line:
# 尝试从附近行找到子网掩码
for j in range(max(0, i-2), min(len(lines), i+2)):
if "子网掩码" in lines[j] or "netmask" in lines[j].lower():
parts = lines[j].split(":")
if len(parts) > 1:
return parts[1].strip().split()[0]
except Exception:
pass
return "255.255.255.0" # 默认子网掩码
def calculate_network_range():
"""计算局域网IP范围"""
global total_ips
local_ip = get_local_ip()
netmask = get_netmask()
try:
network = ipaddress.IPv4Network(f"{local_ip}/{netmask}", strict=False)
hosts = [str(host) for host in network.hosts()]
total_ips = len(hosts)
return hosts
except:
# 如果计算失败,使用常见的局域网IP范围
base_ip = ".".join(local_ip.split(".")[:3])
hosts = [f"{base_ip}.{i}" for i in range(1, 255)]
total_ips = len(hosts)
return hosts
设备扫描核心
def ping_host(ip):
"""ping主机检查是否在线"""
try:
# 使用系统ping命令
if os.name == 'nt': # Windows
command = ['ping', '-n', '1', '-w', '1000', ip]
else: # Linux/Mac
command = ['ping', '-c', '1', '-W', '1', ip]
# 使用subprocess运行ping命令
result = subprocess.run(command, capture_output=True, text=True)
return result.returncode == 0
except Exception:
return False
def scan_network():
"""扫描网络中的设备,使用多线程并行处理"""
global scanning, scanned_ips
scanning = True
scanned_ips = 0
network_range = calculate_network_range()
active_devices = []
print(f"开始扫描网络,共 {total_ips} 个IP地址")
# 使用线程池管理并发扫描
with ThreadPoolExecutor(max_workers=max_threads) as executor:
# 提交所有IP扫描任务
futures = {executor.submit(scan_single_ip, ip): ip for ip in network_range}
# 处理完成的任务
for future in as_completed(futures):
if not scanning: # 如果已停止扫描,取消所有剩余任务
executor.shutdown(wait=False, cancel_futures=True)
break
ip = futures[future]
try:
device = future.result()
if device:
active_devices.append(device)
mac = device["mac"]
if mac in devices:
devices[mac]["last_seen"] = datetime.now().isoformat()
else:
devices[mac] = device
print(f"发现设备: {device['ip']} ({device['hostname']}) - {device['mac']}")
except Exception as e:
print(f"处理 {ip} 结果时出错: {str(e)}")
scanning = False
print(f"扫描完成,发现 {len(active_devices)} 台设备")
return active_devices
文件上传与下载
@app.route('/upload', methods=['POST'])
def upload_file():
"""处理文件上传"""
if 'file' not in request.files:
return jsonify({'error': '没有文件部分'}), 400
file = request.files['file']
if file.filename == '':
return jsonify({'error': '没有选择文件'}), 400
if file:
# 处理文件名重复
filename = file.filename
filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
counter = 1
while os.path.exists(filepath):
name, ext = os.path.splitext(filename)
filename = f"{name}_{counter}{ext}"
filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
counter += 1
file.save(filepath)
return jsonify({'success': True, 'message': '文件上传成功'}), 200
@app.route('/download/')
def download_file(filename):
"""处理文件下载"""
try:
# 安全检查,防止路径遍历
safe_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
if not os.path.exists(safe_path) or not os.path.isfile(safe_path):
return jsonify({'success': False, 'message': '文件不存在'}), 404
# 确保文件在上传目录内
if not os.path.abspath(safe_path).startswith(os.path.abspath(app.config['UPLOAD_FOLDER'])):
return jsonify({'success': False, 'message': '非法访问'}), 403
return send_from_directory(app.config['UPLOAD_FOLDER'], filename, as_attachment=True)
except Exception as e:
return jsonify({'success': False, 'message': str(e)}), 500
启动服务器
def start_server():
"""启动服务器"""
local_ip = get_local_ip()
port = 5000
print("=" * 60)
print("=" * 60)
print(" .-----------------. .----------------. .----------------. .----------------. .----------------. ")
print("| .--------------. || .--------------. || .--------------. || .--------------. || .--------------. |")
print("| | ____ _____ | || | __ | || | ____ ____ | || | _____ | || | ________ | |")
print("| ||_ \|_ _| | || | / \ | || | |_ _||_ _| | || | |_ _| | || | |_ ___ `. | |")
print("| | | \ | | | || | / /\ \ | || | \ \ / / | || | | | | || | | | `. \ | |")
print("| | | |\ \| | | || | / ____ \ | || | > `' < | || | | | | || | | | | | | |")
print("| | _| |_\ |_ | || | _/ / \ \_ | || | _/ /'`\ \_ | || | _| |_ | || | _| |___.' / | |")
print("| ||_____|\____| | || ||____| |____|| || | |____||____| | || | |_____| | || | |________.' | |")
print("| | | || | | || | | || | | || | | |")
print("| '--------------' || '--------------' || '--------------' || '--------------' || '--------------' |")
print(" '----------------' '----------------' '----------------' '----------------' '----------------' ")
print("=" * 60)
print("=" * 60)
print(" 工具已启动,不要瞎几把上传奇怪的东西")
print(f" 本地访问地址: http://localhost:{port}")
print(f" 局域网访问地址: http://{local_ip}:{port}")
print("=" * 60)
print("官网工具记得star哦!github.com 支持一下吧真的没有star")
print("我的邮箱naxid@caoshen.top,欢迎交流")
print("在上学不一定会及时看和回复")
print("技术实现方式https://md.naxid.top/")
# 自动打开浏览器
try:
webbrowser.open(f"http://localhost:{port}")
except:
pass
# 启动Flask应用
app.run(host='0.0.0.0', port=port, debug=False, threaded=True)
if __name__ == '__main__':
start_server()
使用说明
安装步骤
- 确保已安装Python 3.x环境
- 安装依赖:
pip install flask - 创建项目目录,将代码保存为main.py
- 在项目目录下创建pp文件夹,并放入背景图片1.jpg
运行方法
- 打开终端,进入项目目录
- 运行命令:
python main.py - 程序会自动打开浏览器,显示工具界面
- 其他设备通过终端显示的局域网地址访问
使用技巧
- 上传大文件时保持页面不关闭
- 扫描设备前确保网络连接正常
- 防火墙可能会影响设备发现功能
- 共享文件保存在shared_files目录