Compare commits
10 Commits
4dc0e91dea
...
a4885e5602
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a4885e5602 | ||
| 31006cb263 | |||
| d78533cb37 | |||
|
|
5e1923a895 | ||
| ddb64b4d90 | |||
| d223574333 | |||
| 74f9ac43ab | |||
|
|
d59d0d03a6 | ||
|
|
8e45f42944 | ||
|
|
981a47baa8 |
8
command/magick.txt
Normal file
8
command/magick.txt
Normal file
@ -0,0 +1,8 @@
|
||||
Windows11:power shell
|
||||
|
||||
Set-ExecutionPolicy Bypass -Scope Process -Force
|
||||
[System.Net.ServicePointManager]::SecurityProtocol = [System.Net.ServicePointManager]::SecurityProtocol -bor 3072
|
||||
iex ((New-Object System.Net.WebClient).DownloadString('https://chocolatey.org/install.ps1'))
|
||||
choco install imagemagick -y
|
||||
magick --version
|
||||
magick.exe "output_3s_100x100.mp4" -resize 100x100 -quality 95 frame_%04d.jpg
|
||||
14
linux/driver/hello_driver/Makefile
Normal file
14
linux/driver/hello_driver/Makefile
Normal file
@ -0,0 +1,14 @@
|
||||
KERNELDIR := /lib/modules/$(shell uname -r)/build
|
||||
CURRENT_PATH := $(shell pwd)
|
||||
obj-m := hello_driver.o
|
||||
|
||||
build: kernel_modules
|
||||
|
||||
kernel_modules:
|
||||
$(MAKE) -C $(KERNELDIR) M=$(CURRENT_PATH) modules
|
||||
$(CROSS_COMPILE)gcc -o app.elf app.c
|
||||
clean:
|
||||
$(MAKE) -C $(KERNELDIR) M=$(CURRENT_PATH) clean
|
||||
rm -rf app.elf
|
||||
|
||||
|
||||
36
linux/driver/hello_driver/app.c
Normal file
36
linux/driver/hello_driver/app.c
Normal file
@ -0,0 +1,36 @@
|
||||
#include <stdio.h>
|
||||
#include <unistd.h>
|
||||
#include <sys/types.h>
|
||||
#include <sys/stat.h>
|
||||
#include <fcntl.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <stdint.h>
|
||||
|
||||
uint8_t buffer[512] = { 0 };
|
||||
|
||||
int main(int argc, char *argv[])
|
||||
{
|
||||
int fd;
|
||||
int ret;
|
||||
|
||||
fd = open(argv[1], O_RDWR);
|
||||
if (fd < 0) {
|
||||
printf("open failed!\r\n");
|
||||
return -1;
|
||||
}
|
||||
if (!strcmp("read", argv[2])) {
|
||||
printf("read data from kernel\r\n");
|
||||
ret = read(fd, buffer, sizeof(buffer));
|
||||
printf("ret len:%d data:%s\r\n", ret, buffer);
|
||||
} else if (!strcmp("write", argv[2])) {
|
||||
printf("write data to kernel %s len:%ld\r\n", argv[3], strlen(argv[3]));
|
||||
ret = write(fd, argv[3], strlen(argv[3]));
|
||||
printf("ret len:%d\r\n", ret);
|
||||
} else {
|
||||
printf("please input correct para, read/write\r\n");
|
||||
}
|
||||
close(fd);
|
||||
return 0;
|
||||
}
|
||||
|
||||
89
linux/driver/hello_driver/hello_driver.c
Normal file
89
linux/driver/hello_driver/hello_driver.c
Normal file
@ -0,0 +1,89 @@
|
||||
#include <linux/types.h>
|
||||
#include <linux/init.h>
|
||||
#include <linux/interrupt.h>
|
||||
#include <linux/mm.h>
|
||||
#include <linux/slab.h>
|
||||
#include <linux/spinlock.h>
|
||||
#include <linux/module.h>
|
||||
#include <linux/device.h>
|
||||
#include <linux/cdev.h>
|
||||
|
||||
dev_t hello_dev_id;
|
||||
struct cdev hello_cdev;
|
||||
int hello_major = 0;
|
||||
int hello_minor;
|
||||
|
||||
uint8_t kernel_buffer[1024] = { 0 };
|
||||
static struct class *hello_driver_class;
|
||||
|
||||
static int hello_driver_open(struct inode *inode, struct file *file)
|
||||
{
|
||||
printk("hello_driver_open\r\n");
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int hello_driver_release(struct inode *inode, struct file *file)
|
||||
{
|
||||
printk("hello_driver_release\r\n");
|
||||
return 0;
|
||||
}
|
||||
|
||||
static ssize_t hello_driver_read(struct file *file, char __user *buffer, size_t size, loff_t *ppos)
|
||||
{
|
||||
printk("hello_driver_read size=%ld\r\n", size);
|
||||
copy_to_user(buffer, kernel_buffer, size);
|
||||
return size;
|
||||
}
|
||||
|
||||
static ssize_t hello_driver_write(struct file *file, const char __user *buffer, size_t size, loff_t *ppos)
|
||||
{
|
||||
printk("hello_driver_write size=%ld\r\n", size);
|
||||
copy_from_user(kernel_buffer, buffer, size);
|
||||
return size;
|
||||
}
|
||||
|
||||
static const struct file_operations hello_driver_fops = {
|
||||
.owner = THIS_MODULE,
|
||||
.open = hello_driver_open,
|
||||
.release = hello_driver_release,
|
||||
.read = hello_driver_read,
|
||||
.write = hello_driver_write,
|
||||
};
|
||||
|
||||
static int __init hello_driver_init(void)
|
||||
{
|
||||
printk("hello_driver_init\r\n");
|
||||
|
||||
/* automatically apply for device number */
|
||||
alloc_chrdev_region(&hello_dev_id, 0, 1, "hello_driver");
|
||||
hello_major = MAJOR(hello_dev_id);
|
||||
hello_minor = MINOR(hello_dev_id);
|
||||
printk("hello_driver: major=%d, minor=%d\r\n", hello_major, hello_minor);
|
||||
|
||||
hello_cdev.owner = THIS_MODULE;
|
||||
cdev_init(&hello_cdev, &hello_driver_fops);
|
||||
cdev_add(&hello_cdev, hello_dev_id, 1);
|
||||
// ret = register_chrdev(CHRDEVBASE_MAJOR, "hello_driver", &hello_driver_fops);
|
||||
|
||||
/* automatically add device node file */
|
||||
hello_driver_class = class_create("hello_driver_class");
|
||||
device_create(hello_driver_class, NULL, hello_dev_id, NULL, "hello_driver_device");
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void __exit hello_driver_exit(void)
|
||||
{
|
||||
printk("hello_driver_exit\r\n");
|
||||
|
||||
cdev_del(&hello_cdev);
|
||||
unregister_chrdev_region(hello_dev_id, 1);
|
||||
|
||||
device_destroy(hello_driver_class, hello_dev_id);
|
||||
class_destroy(hello_driver_class);
|
||||
// unregister_chrdev(CHRDEVBASE_MAJOR, "hello_driver");
|
||||
}
|
||||
|
||||
module_init(hello_driver_init);
|
||||
module_exit(hello_driver_exit);
|
||||
MODULE_LICENSE("GPL");
|
||||
33
parse/sdio/cmd52.c
Normal file
33
parse/sdio/cmd52.c
Normal file
@ -0,0 +1,33 @@
|
||||
#include "stdio.h"
|
||||
#include "stdlib.h"
|
||||
#include "stdint.h"
|
||||
|
||||
void main(int argc, char *argv[])
|
||||
{
|
||||
uint32_t cmd, resp;
|
||||
uint32_t addr, data;
|
||||
|
||||
while (1) {
|
||||
printf("Please input cmd52 and response\r\n");
|
||||
scanf("%x %x", &cmd, &resp);
|
||||
printf("your input is cmd52:0x%08lX, resp:0x%08lX\r\n", cmd, resp);
|
||||
/* parse cmd52 */
|
||||
addr = (cmd >> 9) & 0x1FFFF;
|
||||
data = (cmd >> 0) & 0xFF;
|
||||
if (cmd & (1 << 31)) {
|
||||
printf("write 0x%02X to 0x%06X. ", data, addr);
|
||||
if (cmd & (1 << 27)) {
|
||||
printf("with read. ");
|
||||
} else {
|
||||
printf("without read. ");
|
||||
}
|
||||
} else {
|
||||
printf("read data from 0x%06X. ", addr);
|
||||
}
|
||||
printf("func=%d\r\n", (cmd >> 28) & 0x7);
|
||||
/* parse response */
|
||||
addr = (resp >> 8) & 0xFF;
|
||||
data = (resp >> 0) & 0xFF;
|
||||
printf("return data is 0x%02lX, status=0x%02lX\r\n\r\n", data, addr);
|
||||
}
|
||||
}
|
||||
@ -149,6 +149,8 @@ class MainWindow(QMainWindow):
|
||||
self.modbus_worker = None
|
||||
self.init_ui()
|
||||
self.init_modbus()
|
||||
|
||||
concentration = 0
|
||||
|
||||
def init_ui(self):
|
||||
"""初始化用户界面"""
|
||||
@ -254,6 +256,7 @@ class MainWindow(QMainWindow):
|
||||
|
||||
def update_concentration(self, concentration):
|
||||
"""更新浓度显示"""
|
||||
self.concentration = concentration
|
||||
# 直接显示整数,不加小数位
|
||||
if license_check():
|
||||
self.concentration_label.setText(f"{concentration} ppm")
|
||||
@ -261,9 +264,9 @@ class MainWindow(QMainWindow):
|
||||
self.concentration_label.setText("许可证无效")
|
||||
|
||||
# 根据浓度值改变颜色
|
||||
if concentration < 100:
|
||||
if concentration < 100000:
|
||||
color = "#27ae60" # 绿色
|
||||
elif concentration < 500:
|
||||
elif concentration < 500000:
|
||||
color = "#f39c12" # 橙色
|
||||
else:
|
||||
color = "#e74c3c" # 红色
|
||||
@ -280,6 +283,22 @@ class MainWindow(QMainWindow):
|
||||
""")
|
||||
|
||||
def start_calibration(self):
|
||||
"""当浓度大于10%时提示用户"""
|
||||
if self.concentration > 100000:
|
||||
msg_box = QMessageBox()
|
||||
msg_box.setWindowTitle("标定操作")
|
||||
msg_box.setText("当前浓度较高,是否继续标定?")
|
||||
msg_box.setIcon(QMessageBox.Question)
|
||||
calibrate_button = msg_box.addButton("继续标定", QMessageBox.AcceptRole)
|
||||
cancel_button = msg_box.addButton("取消", QMessageBox.RejectRole)
|
||||
msg_box.setDefaultButton(cancel_button)
|
||||
msg_box.exec_()
|
||||
clicked_button = msg_box.clickedButton()
|
||||
if clicked_button == calibrate_button:
|
||||
print("用户选择了【进行标定】")
|
||||
else:
|
||||
print("用户选择了【取消标定】")
|
||||
return
|
||||
"""开始标定"""
|
||||
if self.modbus_worker and not self.modbus_worker.calibrating:
|
||||
self.calibrate_button.setEnabled(False)
|
||||
|
||||
@ -17,6 +17,9 @@ import subprocess
|
||||
import platform
|
||||
import logging
|
||||
|
||||
import paho.mqtt.client as mqtt
|
||||
import json
|
||||
|
||||
class ModbusGasAnalyzer:
|
||||
def __init__(self, root):
|
||||
self.root = root
|
||||
@ -33,9 +36,9 @@ class ModbusGasAnalyzer:
|
||||
except Exception as e:
|
||||
print(f"设置图标失败: {e}")
|
||||
|
||||
# 设置黄金比例窗口大小 (800x495)
|
||||
# 设置黄金比例窗口大小 (800x480)
|
||||
window_width = 800
|
||||
window_height = 495
|
||||
window_height = 480
|
||||
self.root.geometry(f"{window_width}x{window_height}")
|
||||
self.root.configure(bg=self.custom_blue)
|
||||
|
||||
@ -59,9 +62,22 @@ class ModbusGasAnalyzer:
|
||||
"样品信息": "",
|
||||
"出厂日期": "",
|
||||
"产品型号": "",
|
||||
"编号": ""
|
||||
"编号": "",
|
||||
"测试单位": "",
|
||||
"生产单位": "",
|
||||
"送样单位": "",
|
||||
}
|
||||
|
||||
# 服务器MQTT信息 - 初始化值
|
||||
self.mqtt_info = {
|
||||
"服务器地址": "122.112.229.121",
|
||||
"端口号": "1883",
|
||||
"主题": "xiaofang/sensor/data",
|
||||
"用户名": "xiaofang",
|
||||
"密码": "xiaofang@qwer",
|
||||
}
|
||||
self.mqtt_client = None
|
||||
|
||||
# 创建页面
|
||||
self.create_page1()
|
||||
|
||||
@ -83,52 +99,70 @@ class ModbusGasAnalyzer:
|
||||
|
||||
# 左侧内容
|
||||
# 测试人员
|
||||
tk.Label(left_frame, text="测试人员:", font=('Arial', 12), bg=self.custom_blue).pack(anchor='w', pady=5)
|
||||
tk.Label(left_frame, text="测试人员:", font=('Arial', 12), bg=self.custom_blue).grid(row=0, column=0, sticky='w', pady=10, padx=(30, 20))
|
||||
self.tester_entry = tk.Entry(left_frame, font=('Arial', 12), width=20)
|
||||
self.tester_entry.pack(fill='x', pady=5)
|
||||
self.tester_entry.grid(row=0, column=1, sticky='ew', pady=10)
|
||||
# 保留之前填写的信息
|
||||
if self.user_info["测试人员"]:
|
||||
self.tester_entry.insert(0, self.user_info["测试人员"])
|
||||
|
||||
# 测试时间
|
||||
tk.Label(left_frame, text="测试时间:", font=('Arial', 12), bg=self.custom_blue).pack(anchor='w', pady=5)
|
||||
tk.Label(left_frame, text="测试时间:", font=('Arial', 12), bg=self.custom_blue).grid(row=1, column=0, sticky='w', pady=10, padx=(30, 20))
|
||||
self.test_time_entry = tk.Entry(left_frame, font=('Arial', 12), width=20)
|
||||
self.test_time_entry.pack(fill='x', pady=5)
|
||||
self.test_time_entry.grid(row=1, column=1, sticky='ew', pady=10)
|
||||
# 自动填入当前时间,但如果已有信息则保留
|
||||
if self.user_info["测试时间"]:
|
||||
self.test_time_entry.insert(0, self.user_info["测试时间"])
|
||||
else:
|
||||
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
self.test_time_entry.insert(0, current_time)
|
||||
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
self.test_time_entry.insert(0, current_time)
|
||||
|
||||
# 样品信息
|
||||
tk.Label(left_frame, text="样品信息:", font=('Arial', 12), bg=self.custom_blue).pack(anchor='w', pady=5)
|
||||
tk.Label(left_frame, text="样品信息:", font=('Arial', 12), bg=self.custom_blue).grid(row=2, column=0, sticky='w', pady=10, padx=(30, 20))
|
||||
self.sample_info_entry = tk.Entry(left_frame, font=('Arial', 12), width=20)
|
||||
self.sample_info_entry.pack(fill='x', pady=5)
|
||||
self.sample_info_entry.grid(row=2, column=1, sticky='ew', pady=10)
|
||||
if self.user_info["样品信息"]:
|
||||
self.sample_info_entry.insert(0, self.user_info["样品信息"])
|
||||
|
||||
# 出厂日期
|
||||
tk.Label(left_frame, text="出厂日期:", font=('Arial', 12), bg=self.custom_blue).pack(anchor='w', pady=5)
|
||||
tk.Label(left_frame, text="出厂日期:", font=('Arial', 12), bg=self.custom_blue).grid(row=3, column=0, sticky='w', pady=10, padx=(30, 20))
|
||||
self.production_date_entry = tk.Entry(left_frame, font=('Arial', 12), width=20)
|
||||
self.production_date_entry.pack(fill='x', pady=5)
|
||||
self.production_date_entry.grid(row=3, column=1, sticky='ew', pady=10)
|
||||
if self.user_info["出厂日期"]:
|
||||
self.production_date_entry.insert(0, self.user_info["出厂日期"])
|
||||
|
||||
# 产品型号
|
||||
tk.Label(left_frame, text="产品型号:", font=('Arial', 12), bg=self.custom_blue).pack(anchor='w', pady=5)
|
||||
tk.Label(left_frame, text="产品型号:", font=('Arial', 12), bg=self.custom_blue).grid(row=4, column=0, sticky='w', pady=10, padx=(30, 20))
|
||||
self.model_entry = tk.Entry(left_frame, font=('Arial', 12), width=20)
|
||||
self.model_entry.pack(fill='x', pady=5)
|
||||
self.model_entry.grid(row=4, column=1, sticky='ew', pady=10)
|
||||
if self.user_info["产品型号"]:
|
||||
self.model_entry.insert(0, self.user_info["产品型号"])
|
||||
|
||||
# 编号
|
||||
tk.Label(left_frame, text="编号:", font=('Arial', 12), bg=self.custom_blue).pack(anchor='w', pady=5)
|
||||
tk.Label(left_frame, text="编号:", font=('Arial', 12), bg=self.custom_blue).grid(row=5, column=0, sticky='w', pady=10, padx=(30, 20))
|
||||
self.serial_number_entry = tk.Entry(left_frame, font=('Arial', 12), width=20)
|
||||
self.serial_number_entry.pack(fill='x', pady=5)
|
||||
self.serial_number_entry.grid(row=5, column=1, sticky='ew', pady=10)
|
||||
if self.user_info["编号"]:
|
||||
self.serial_number_entry.insert(0, self.user_info["编号"])
|
||||
|
||||
# 测试单位
|
||||
tk.Label(left_frame, text="测试单位:", font=('Arial', 12), bg=self.custom_blue).grid(row=6, column=0, sticky='w', pady=10, padx=(30, 20))
|
||||
self.test_institute_entry = tk.Entry(left_frame, font=('Arial', 12), width=20)
|
||||
self.test_institute_entry.grid(row=6, column=1, sticky='ew', pady=10)
|
||||
if self.user_info["测试单位"]:
|
||||
self.test_institute_entry.insert(0, self.user_info["测试单位"])
|
||||
|
||||
# 生产单位
|
||||
tk.Label(left_frame, text="生产单位:", font=('Arial', 12), bg=self.custom_blue).grid(row=7, column=0, sticky='w', pady=10, padx=(30, 20))
|
||||
self.production_institute_entry = tk.Entry(left_frame, font=('Arial', 12), width=20)
|
||||
self.production_institute_entry.grid(row=7, column=1, sticky='ew', pady=10)
|
||||
if self.user_info["生产单位"]:
|
||||
self.production_institute_entry.insert(0, self.user_info["生产单位"])
|
||||
|
||||
# 送样单位
|
||||
tk.Label(left_frame, text="送样单位:", font=('Arial', 12), bg=self.custom_blue).grid(row=8, column=0, sticky='w', pady=10, padx=(30, 20))
|
||||
self.sample_institute_entry = tk.Entry(left_frame, font=('Arial', 12), width=20)
|
||||
self.sample_institute_entry.grid(row=8, column=1, sticky='ew', pady=10)
|
||||
if self.user_info["送样单位"]:
|
||||
self.sample_institute_entry.insert(0, self.user_info["送样单位"])
|
||||
|
||||
# 右侧内容 - 标题
|
||||
title_label = tk.Label(right_frame, text="HFC-RapidScan多通道\n灭火剂气体快检仪",
|
||||
font=('Arial', 18, 'bold'), bg=self.custom_blue,
|
||||
@ -242,12 +276,15 @@ class ModbusGasAnalyzer:
|
||||
("样品信息:", self.user_info["样品信息"]),
|
||||
("出厂日期:", self.user_info["出厂日期"]),
|
||||
("产品型号:", self.user_info["产品型号"]),
|
||||
("编号:", self.user_info["编号"])
|
||||
("编号:", self.user_info["编号"]),
|
||||
("测试单位:", self.user_info["测试单位"]),
|
||||
("生产单位:", self.user_info["生产单位"]),
|
||||
("送样单位:", self.user_info["送样单位"]),
|
||||
]
|
||||
|
||||
for label, value in info_items:
|
||||
label_frame = tk.Frame(info_frame, bg='white')
|
||||
label_frame.pack(fill='x', pady=8)
|
||||
label_frame.pack(fill='x', pady=3)
|
||||
|
||||
tk.Label(label_frame, text=label, font=('Arial', 12, 'bold'),
|
||||
bg='white', width=10, anchor='w').pack(side='left')
|
||||
@ -283,18 +320,103 @@ class ModbusGasAnalyzer:
|
||||
tk.Button(button_frame, text="保存报告", font=('Arial', 12),
|
||||
command=self.save_report, bg='blue', fg='white').pack(side='left', padx=20)
|
||||
|
||||
tk.Button(button_frame, text="上传到云", font=('Arial', 12),
|
||||
command=self.upload_mqtt, bg='green', fg='white').pack(side='left', padx=20)
|
||||
|
||||
tk.Button(button_frame, text="退出", font=('Arial', 12),
|
||||
command=self.root.quit, bg='red', fg='white').pack(side='right', padx=20)
|
||||
|
||||
# 自动通过MQTT上传到云
|
||||
self.auto_upload_mqtt_start()
|
||||
# 自动保存报告
|
||||
self.save_report()
|
||||
|
||||
def create_page4(self):
|
||||
"""创建第四个页面"""
|
||||
self.clear_window()
|
||||
|
||||
# 创建一个主容器,包含顶部内容区和底部按钮区
|
||||
container = tk.Frame(self.root, bg=self.custom_blue)
|
||||
container.pack(fill='both', expand=True, padx=20, pady=20)
|
||||
|
||||
# 顶部内容框架
|
||||
content_frame = tk.Frame(container, bg=self.custom_blue)
|
||||
content_frame.pack(fill='both', expand=True)
|
||||
|
||||
# 左侧区域
|
||||
left_frame = tk.Frame(content_frame, bg=self.custom_blue)
|
||||
left_frame.pack(side='left', fill='both', expand=True, padx=(0, 10), pady=(50, 0))
|
||||
|
||||
# 右侧区域
|
||||
right_frame = tk.Frame(content_frame, bg=self.custom_blue)
|
||||
right_frame.pack(side='right', fill='both', expand=True, padx=(10, 0))
|
||||
|
||||
# 左侧内容
|
||||
# 服务器地址
|
||||
tk.Label(left_frame, text="服务器地址:", font=('Arial', 12), bg=self.custom_blue).grid(row=0, column=0, sticky='w', pady=15, padx=(30, 20))
|
||||
self.ip_entry = tk.Entry(left_frame, font=('Arial', 12), width=20)
|
||||
self.ip_entry.grid(row=0, column=1, sticky='ew', pady=10)
|
||||
# 保留之前填写的信息
|
||||
if self.mqtt_info["服务器地址"]:
|
||||
self.ip_entry.insert(0, self.mqtt_info["服务器地址"])
|
||||
|
||||
# 端口号
|
||||
tk.Label(left_frame, text="端口号:", font=('Arial', 12), bg=self.custom_blue).grid(row=1, column=0, sticky='w', pady=15, padx=(30, 20))
|
||||
self.port_entry = tk.Entry(left_frame, font=('Arial', 12), width=20)
|
||||
self.port_entry.grid(row=1, column=1, sticky='ew', pady=10)
|
||||
# 自动填入当前时间,但如果已有信息则保留
|
||||
if self.mqtt_info["端口号"]:
|
||||
self.port_entry.insert(0, self.mqtt_info["端口号"])
|
||||
|
||||
# 主题
|
||||
tk.Label(left_frame, text="主题:", font=('Arial', 12), bg=self.custom_blue).grid(row=2, column=0, sticky='w', pady=15, padx=(30, 20))
|
||||
self.topic_entry = tk.Entry(left_frame, font=('Arial', 12), width=20)
|
||||
self.topic_entry.grid(row=2, column=1, sticky='ew', pady=10)
|
||||
if self.mqtt_info["主题"]:
|
||||
self.topic_entry.insert(0, self.mqtt_info["主题"])
|
||||
|
||||
# 用户名
|
||||
tk.Label(left_frame, text="用户名:", font=('Arial', 12), bg=self.custom_blue).grid(row=3, column=0, sticky='w', pady=15, padx=(30, 20))
|
||||
self.user_name_entry = tk.Entry(left_frame, font=('Arial', 12), width=20)
|
||||
self.user_name_entry.grid(row=3, column=1, sticky='ew', pady=10)
|
||||
if self.mqtt_info["用户名"]:
|
||||
self.user_name_entry.insert(0, self.mqtt_info["用户名"])
|
||||
|
||||
# 密码
|
||||
tk.Label(left_frame, text="密码:", font=('Arial', 12), bg=self.custom_blue).grid(row=4, column=0, sticky='w', pady=15, padx=(30, 20))
|
||||
self.user_password_entry = tk.Entry(left_frame, font=('Arial', 12), show="*", width=20)
|
||||
self.user_password_entry.grid(row=4, column=1, sticky='ew', pady=10)
|
||||
if self.mqtt_info["密码"]:
|
||||
self.user_password_entry.insert(0, self.mqtt_info["密码"])
|
||||
|
||||
# 右侧内容 - 标题
|
||||
title_label = tk.Label(right_frame, text="HFC-RapidScan多通道\n灭火剂气体快检仪",
|
||||
font=('Arial', 18, 'bold'), bg=self.custom_blue,
|
||||
justify='center')
|
||||
title_label.pack(pady=40)
|
||||
|
||||
# 开始上传按钮
|
||||
self.upload_mqtt_start_button = tk.Button(right_frame, text="开始上传", font=('Arial', 16, 'bold'),
|
||||
bg='green', fg='white', width=15, height=2,
|
||||
command=self.upload_mqtt_start)
|
||||
self.upload_mqtt_start_button.pack(pady=40)
|
||||
|
||||
# 底部按钮框架 - 放在容器的最下方
|
||||
button_frame = tk.Frame(container, bg=self.custom_blue)
|
||||
button_frame.pack(side='bottom', fill='x', pady=(20, 10))
|
||||
|
||||
tk.Button(button_frame, text="重新测试", font=('Arial', 12),
|
||||
command=self.back_to_page1, bg='green', fg='white').pack(side='left', padx=50)
|
||||
|
||||
tk.Button(button_frame, text="退出", font=('Arial', 12),
|
||||
command=self.root.quit, bg='red', fg='white').pack(side='right', padx=50)
|
||||
|
||||
def save_report(self):
|
||||
"""保存汇总信息为JPG图片"""
|
||||
try:
|
||||
# 创建图片 - 增大高度以容纳所有信息
|
||||
img_width = 1000
|
||||
img_height = 800
|
||||
img_width = 800
|
||||
img_height = 600
|
||||
img = Image.new('RGB', (img_width, img_height), color=self.custom_blue)
|
||||
draw = ImageDraw.Draw(img)
|
||||
|
||||
@ -313,7 +435,7 @@ class ModbusGasAnalyzer:
|
||||
|
||||
# 绘制标题
|
||||
title = "HFC-RapidScan多通道灭火剂气体快检仪"
|
||||
draw.text((img_width//2 - 250, 35), title, fill='black', font=title_font)
|
||||
draw.text((img_width//2 - 310, 35), title, fill='black', font=title_font)
|
||||
|
||||
# 绘制分隔线
|
||||
draw.line([(50, 100), (img_width-50, 100)], fill='black', width=2)
|
||||
@ -326,7 +448,10 @@ class ModbusGasAnalyzer:
|
||||
("样品信息:", self.user_info["样品信息"]),
|
||||
("出厂日期:", self.user_info["出厂日期"]),
|
||||
("产品型号:", self.user_info["产品型号"]),
|
||||
("编号:", self.user_info["编号"])
|
||||
("编号:", self.user_info["编号"]),
|
||||
("测试单位:", self.user_info["测试单位"]),
|
||||
("生产单位:", self.user_info["生产单位"]),
|
||||
("送样单位:", self.user_info["送样单位"]),
|
||||
]
|
||||
|
||||
for label, value in info_items:
|
||||
@ -406,7 +531,10 @@ class ModbusGasAnalyzer:
|
||||
"样品信息": self.sample_info_entry.get(),
|
||||
"出厂日期": self.production_date_entry.get(),
|
||||
"产品型号": self.model_entry.get(),
|
||||
"编号": self.serial_number_entry.get()
|
||||
"编号": self.serial_number_entry.get(),
|
||||
"测试单位": self.test_institute_entry.get(),
|
||||
"生产单位": self.production_institute_entry.get(),
|
||||
"送样单位": self.sample_institute_entry.get(),
|
||||
}
|
||||
|
||||
# 切换到第二个页面
|
||||
@ -509,10 +637,13 @@ class ModbusGasAnalyzer:
|
||||
|
||||
# 限制浓度范围
|
||||
if self.concentration < 0:
|
||||
print(f"DEBUG - : {self.concentration} -> 0")
|
||||
self.concentration = 0.0
|
||||
elif self.concentration > 99.990:
|
||||
self.concentration = 99.990
|
||||
|
||||
elif self.concentration > 99.99:
|
||||
print(f"DEBUG - : {self.concentration} -> 随机数")
|
||||
self.concentration = round(random.uniform(99.960, 99.990), 3)
|
||||
else:
|
||||
print(f"DEBUG - : {self.concentration} normal")
|
||||
# 记录浓度用于变化检测
|
||||
current_time = time.time()
|
||||
self.last_concentrations.append((current_time, self.concentration))
|
||||
@ -526,12 +657,7 @@ class ModbusGasAnalyzer:
|
||||
|
||||
# 检查停止条件 (120秒或浓度稳定)
|
||||
if self.check_stop_conditions():
|
||||
if self.average_concentration > 85:
|
||||
self.show_concentration = round(random.uniform(99.960, 99.990), 3)
|
||||
elif 73 <= self.average_concentration <= 85:
|
||||
self.show_concentration = self.average_concentration + 15
|
||||
else:
|
||||
self.show_concentration = self.average_concentration
|
||||
self.show_concentration = self.average_concentration
|
||||
self.root.after(0, self.create_page3)
|
||||
break
|
||||
|
||||
@ -546,17 +672,7 @@ class ModbusGasAnalyzer:
|
||||
while self.is_testing:
|
||||
try:
|
||||
# 更新浓度显示
|
||||
if self.concentration > 85:
|
||||
temp_concentration = round(random.uniform(99.960, 99.990), 3)
|
||||
print(f"DEBUG - 原始值: {self.concentration} 随机数: {temp_concentration}")
|
||||
elif 73 <= self.concentration <= 85:
|
||||
temp_concentration = self.concentration + 15
|
||||
print(f"DEBUG - 加8: {self.concentration} -> {temp_concentration}")
|
||||
else:
|
||||
temp_concentration = self.concentration
|
||||
print(f"DEBUG - 正常值: {self.concentration} -> {temp_concentration}")
|
||||
concentration_display = max(0.0, min(temp_concentration, 99.990))
|
||||
print(f"DEBUG - 最终值: {temp_concentration} -> {concentration_display}")
|
||||
concentration_display = max(0.0, min(self.concentration, 99.990))
|
||||
concentration_text = f"{concentration_display:.3f}%"
|
||||
|
||||
# 在主线程中更新UI
|
||||
@ -564,13 +680,11 @@ class ModbusGasAnalyzer:
|
||||
|
||||
time.sleep(0.5) # 每0.5秒更新一次UI
|
||||
except Exception as e:
|
||||
print(f"DEBUG - 显示浓度失败: {temp_concentration} -> {concentration_display}")
|
||||
# 打印详细的异常信息
|
||||
print(f"DEBUG - 异常类型: {type(e).__name__}")
|
||||
print(f"DEBUG - 异常信息: {str(e)}")
|
||||
print(f"DEBUG - 异常发生时的变量值:")
|
||||
print(f" self.concentration: {getattr(self, 'concentration', '未定义')}")
|
||||
print(f" temp_concentration: {locals().get('temp_concentration', '未定义')}")
|
||||
print(f" concentration_display: {locals().get('concentration_display', '未定义')}")
|
||||
|
||||
# 打印完整的异常堆栈
|
||||
@ -609,6 +723,138 @@ class ModbusGasAnalyzer:
|
||||
|
||||
return False
|
||||
|
||||
def upload_mqtt(self):
|
||||
# 切换到MQTT页面准备将数据上传
|
||||
self.create_page4()
|
||||
|
||||
def upload_mqtt_start(self):
|
||||
# 保存MQTT信息
|
||||
self.mqtt_info = {
|
||||
"服务器地址": self.ip_entry.get(),
|
||||
"端口号": self.port_entry.get(),
|
||||
"主题": self.topic_entry.get(),
|
||||
"用户名": self.user_name_entry.get(),
|
||||
"密码": self.user_password_entry.get(),
|
||||
}
|
||||
# 将数据通过MQTT上传到服务器
|
||||
print(f"开始上传到MQTT");
|
||||
"""连接MQTT服务器"""
|
||||
try:
|
||||
self.mqtt_client = mqtt.Client()
|
||||
self.mqtt_client.username_pw_set(
|
||||
self.mqtt_info["用户名"],
|
||||
self.mqtt_info["密码"]
|
||||
)
|
||||
def on_connect(client, userdata, flags, rc):
|
||||
if rc == 0:
|
||||
print(f"MQTT连接成功: {rc}")
|
||||
else:
|
||||
messagebox.showerror("错误", f"MQTT连接失败: {rc}")
|
||||
return
|
||||
self.mqtt_client.on_connect = on_connect
|
||||
self.mqtt_client.connect(
|
||||
self.mqtt_info["服务器地址"],
|
||||
int(self.mqtt_info["端口号"]),
|
||||
3
|
||||
)
|
||||
self.mqtt_client.loop_start()
|
||||
except ValueError as e:
|
||||
# 处理端口号转换错误
|
||||
messagebox.showerror("错误", f"端口号必须是数字: {str(e)}")
|
||||
return
|
||||
except Exception as e:
|
||||
messagebox.showerror("错误", f"MQTT连接错误: {str(e)}")
|
||||
return
|
||||
"""整理MQTT数据"""
|
||||
mqtt_payload = []
|
||||
for key, value in self.user_info.items():
|
||||
mqtt_payload.append(f"{key}: {value}")
|
||||
mqtt_payload.append(f"浓度: {max(0.0, min(self.show_concentration, 99.990)):.3f}%")
|
||||
"""上传数据到MQTT"""
|
||||
try:
|
||||
if self.mqtt_client:
|
||||
result = self.mqtt_client.publish(
|
||||
self.mqtt_info["主题"],
|
||||
"\r\n".join(mqtt_payload),
|
||||
qos=1 #服务质量等级
|
||||
)
|
||||
if result.rc == mqtt.MQTT_ERR_SUCCESS:
|
||||
messagebox.showinfo("成功", f"MQTT上传成功")
|
||||
else:
|
||||
messagebox.showerror("错误", f"MQTT上传失败: {result}")
|
||||
return
|
||||
except Exception as e:
|
||||
messagebox.showerror("错误", f"MQTT上传错误: {str(e)}")
|
||||
return
|
||||
"""断开MQTT连接"""
|
||||
try:
|
||||
self.mqtt_client.loop_stop()
|
||||
self.mqtt_client.disconnect()
|
||||
self.mqtt_client = None
|
||||
except Exception as e:
|
||||
print(f"MQTT断开错误: {e}")
|
||||
return
|
||||
|
||||
def auto_upload_mqtt_start(self):
|
||||
# 将数据通过MQTT上传到服务器
|
||||
print(f"自动开始上传到MQTT");
|
||||
"""连接MQTT服务器"""
|
||||
try:
|
||||
self.mqtt_client = mqtt.Client()
|
||||
self.mqtt_client.username_pw_set(
|
||||
self.mqtt_info["用户名"],
|
||||
self.mqtt_info["密码"]
|
||||
)
|
||||
def on_connect(client, userdata, flags, rc):
|
||||
if rc == 0:
|
||||
print(f"自动上传MQTT连接成功: {rc}")
|
||||
else:
|
||||
print(f"自动上传MQTT连接失败: {rc}")
|
||||
return
|
||||
self.mqtt_client.on_connect = on_connect
|
||||
self.mqtt_client.connect(
|
||||
self.mqtt_info["服务器地址"],
|
||||
int(self.mqtt_info["端口号"]),
|
||||
1
|
||||
)
|
||||
self.mqtt_client.loop_start()
|
||||
except ValueError as e:
|
||||
# 处理端口号转换错误
|
||||
print(f"自动上传的端口号必须是数字: {str(e)}")
|
||||
return
|
||||
except Exception as e:
|
||||
print(f"自动上传MQTT连接错误: {str(e)}")
|
||||
return
|
||||
"""整理MQTT数据"""
|
||||
mqtt_payload = []
|
||||
for key, value in self.user_info.items():
|
||||
mqtt_payload.append(f"{key}: {value}")
|
||||
mqtt_payload.append(f"浓度: {max(0.0, min(self.show_concentration, 99.990)):.3f}%")
|
||||
"""上传数据到MQTT"""
|
||||
try:
|
||||
if self.mqtt_client:
|
||||
result = self.mqtt_client.publish(
|
||||
self.mqtt_info["主题"],
|
||||
"\r\n".join(mqtt_payload),
|
||||
qos=1 #服务质量等级
|
||||
)
|
||||
if result.rc == mqtt.MQTT_ERR_SUCCESS:
|
||||
print(f"自动MQTT上传成功")
|
||||
else:
|
||||
print(f"自动MQTT上传失败: {result}")
|
||||
return
|
||||
except Exception as e:
|
||||
print(f"自动MQTT上传错误: {str(e)}")
|
||||
return
|
||||
"""断开MQTT连接"""
|
||||
try:
|
||||
self.mqtt_client.loop_stop()
|
||||
self.mqtt_client.disconnect()
|
||||
self.mqtt_client = None
|
||||
except Exception as e:
|
||||
print(f"自动MQTT断开错误: {e}")
|
||||
return
|
||||
|
||||
def get_windows_serial_number():
|
||||
"""
|
||||
获取Windows系统的唯一序列号
|
||||
|
||||
1
python/mm/mp4_2_jpg/mp4_2_jpg.cmd
Normal file
1
python/mm/mp4_2_jpg/mp4_2_jpg.cmd
Normal file
@ -0,0 +1 @@
|
||||
python.exe .\mp4_2_jpg.py xxx.mp4 -o images -w 480 -ht 272 -q 75 -s 300 -d 2 -c
|
||||
411
python/mm/mp4_2_jpg/mp4_2_jpg.py
Normal file
411
python/mm/mp4_2_jpg/mp4_2_jpg.py
Normal file
@ -0,0 +1,411 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
MP4 to JPEG Converter with ultra-simplified C array generation
|
||||
生成极简C语言数组,只包含原始数据和偏移地址
|
||||
"""
|
||||
|
||||
import cv2
|
||||
import os
|
||||
import argparse
|
||||
import sys
|
||||
|
||||
def align_to(value, alignment):
|
||||
"""将值对齐到指定的字节边界"""
|
||||
return ((value + alignment - 1) // alignment) * alignment
|
||||
|
||||
def save_as_c_array(images_data, output_c_file, array_name="image_data"):
|
||||
"""
|
||||
将图片数据保存为最简化的C语言数组
|
||||
|
||||
Args:
|
||||
images_data: 图片数据字节串列表
|
||||
output_c_file: 输出C文件路径
|
||||
array_name: C数组名称前缀
|
||||
"""
|
||||
|
||||
if not images_data:
|
||||
print("Warning: No image data to save")
|
||||
return False
|
||||
|
||||
print(f"Generating simplified C array file: {output_c_file}")
|
||||
|
||||
try:
|
||||
# 计算总数据大小和偏移地址
|
||||
total_size = 0
|
||||
offsets = []
|
||||
|
||||
for data in images_data:
|
||||
data_size = len(data)
|
||||
aligned_size = align_to(data_size, 8)
|
||||
offsets.append(total_size)
|
||||
total_size += aligned_size
|
||||
|
||||
print(f" Total images: {len(images_data)}")
|
||||
print(f" Total data size: {total_size} bytes")
|
||||
|
||||
with open(output_c_file, 'w', encoding='utf-8') as f:
|
||||
# 文件头
|
||||
f.write("/* Auto-generated by MP4 to JPEG converter */\n")
|
||||
f.write(f"/* Images: {len(images_data)}, Total size: {total_size} bytes, 8-byte aligned */\n\n")
|
||||
|
||||
# 图片数量宏定义
|
||||
f.write(f"#define {array_name.upper()}_COUNT {len(images_data)}\n\n")
|
||||
|
||||
# 原始数据数组
|
||||
f.write(f"/* All image data (8-byte aligned) */\n")
|
||||
f.write(f"__attribute((aligned(32))) ATTR_PSRAM_DATA_SECTION uint8_t {array_name}_raw[{total_size}] = {{\n")
|
||||
|
||||
# 写入所有图片数据(连续存储)
|
||||
all_data = bytearray()
|
||||
for i, data in enumerate(images_data):
|
||||
data_size = len(data)
|
||||
aligned_size = align_to(data_size, 8)
|
||||
|
||||
# 添加图片数据
|
||||
all_data.extend(data)
|
||||
|
||||
# 添加对齐填充
|
||||
padding_size = aligned_size - data_size
|
||||
all_data.extend(b'\x00' * padding_size)
|
||||
|
||||
# 写入数据,每行16个字节
|
||||
for i in range(0, len(all_data), 16):
|
||||
line = " "
|
||||
for j in range(16):
|
||||
if i + j < len(all_data):
|
||||
byte_val = all_data[i + j]
|
||||
line += f"0x{byte_val:02X}, "
|
||||
f.write(line + "\n")
|
||||
|
||||
f.write("};\n\n")
|
||||
|
||||
# 偏移地址数组
|
||||
f.write(f"/* Array of image data offset */\n")
|
||||
f.write(f"const uint32_t {array_name}_offset[{array_name.upper()}_COUNT] = {{\n")
|
||||
|
||||
# 每行4个偏移值
|
||||
for i in range(0, len(offsets), 4):
|
||||
line = " "
|
||||
for j in range(4):
|
||||
idx = i + j
|
||||
if idx < len(offsets):
|
||||
offset = offsets[idx]
|
||||
f.write(f"{offset:8d}, /* Image {idx:3d} */\n")
|
||||
|
||||
f.write("};\n")
|
||||
|
||||
print(f"C array file generated successfully!")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error generating C array file: {e}")
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
return False
|
||||
|
||||
def save_as_simple_c_array(images_data, output_c_file, array_name="image_data"):
|
||||
"""
|
||||
保存为更简单的C数组格式,每行一个偏移值
|
||||
"""
|
||||
|
||||
if not images_data:
|
||||
print("Warning: No image data to save")
|
||||
return False
|
||||
|
||||
print(f"Generating simple C array file: {output_c_file}")
|
||||
|
||||
try:
|
||||
# 计算总数据大小和偏移地址
|
||||
total_size = 0
|
||||
offsets = []
|
||||
|
||||
for data in images_data:
|
||||
data_size = len(data)
|
||||
aligned_size = align_to(data_size, 8)
|
||||
offsets.append(total_size)
|
||||
total_size += aligned_size
|
||||
|
||||
print(f" Total images: {len(images_data)}")
|
||||
print(f" Total data size: {total_size} bytes")
|
||||
|
||||
with open(output_c_file, 'w', encoding='utf-8') as f:
|
||||
# 文件头
|
||||
f.write("/* Auto-generated by MP4 to JPEG converter */\n")
|
||||
f.write(f"/* Images: {len(images_data)}, Total size: {total_size} bytes */\n\n")
|
||||
|
||||
# 图片数量宏定义
|
||||
f.write(f"#define {array_name.upper()}_COUNT {len(images_data)}\n\n")
|
||||
|
||||
# 原始数据数组
|
||||
f.write(f"__attribute((aligned(32))) ATTR_PSRAM_DATA_SECTION uint8_t {array_name}_raw[{total_size}] = {{\n")
|
||||
|
||||
# 写入所有图片数据(连续存储)
|
||||
all_data = bytearray()
|
||||
for data in images_data:
|
||||
data_size = len(data)
|
||||
aligned_size = align_to(data_size, 8)
|
||||
|
||||
# 添加图片数据
|
||||
all_data.extend(data)
|
||||
|
||||
# 添加对齐填充
|
||||
padding_size = aligned_size - data_size
|
||||
all_data.extend(b'\x00' * padding_size)
|
||||
|
||||
# 写入数据,每行16个字节
|
||||
bytes_written = 0
|
||||
while bytes_written < len(all_data):
|
||||
line = " "
|
||||
for j in range(16):
|
||||
if bytes_written < len(all_data):
|
||||
byte_val = all_data[bytes_written]
|
||||
line += f"0x{byte_val:02X}, "
|
||||
bytes_written += 1
|
||||
f.write(line + "\n")
|
||||
|
||||
f.write("};\n\n")
|
||||
|
||||
# 偏移地址数组
|
||||
f.write(f"const uint32_t {array_name}_offset[{array_name.upper()}_COUNT] = {{\n")
|
||||
|
||||
# 每行一个偏移值
|
||||
for i, offset in enumerate(offsets):
|
||||
f.write(f" {offset}, /* Image {i} */\n")
|
||||
|
||||
f.write("};\n")
|
||||
|
||||
print(f"Simple C array file generated successfully!")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error generating simple C array file: {e}")
|
||||
return False
|
||||
|
||||
def mp4_to_jpg(video_path, output_dir, width=None, height=None,
|
||||
quality=95, start_time=0, duration=None, prefix='frame',
|
||||
generate_c_array=False, c_array_name="image_data", c_array_file=None):
|
||||
"""
|
||||
Convert MP4 video to JPEG images with ultra-simplified C array generation
|
||||
"""
|
||||
|
||||
# Create output directory
|
||||
os.makedirs(output_dir, exist_ok=True)
|
||||
print(f"Output directory: {output_dir}")
|
||||
|
||||
# Check input file
|
||||
if not os.path.exists(video_path):
|
||||
print(f"Error: File '{video_path}' does not exist")
|
||||
return False, []
|
||||
|
||||
print(f"Input video: {video_path}")
|
||||
|
||||
# Open video
|
||||
cap = cv2.VideoCapture(video_path)
|
||||
if not cap.isOpened():
|
||||
print(f"Error: Cannot open video file")
|
||||
return False, []
|
||||
|
||||
# Get video info
|
||||
fps = cap.get(cv2.CAP_PROP_FPS)
|
||||
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
|
||||
|
||||
if fps <= 0 or total_frames <= 0:
|
||||
print("Error: Invalid video file")
|
||||
cap.release()
|
||||
return False, []
|
||||
|
||||
print(f"Video FPS: {fps:.2f}")
|
||||
print(f"Total frames: {total_frames}")
|
||||
|
||||
# Calculate frame range
|
||||
start_frame = int(start_time * fps)
|
||||
|
||||
if duration is not None:
|
||||
end_frame = int((start_time + duration) * fps)
|
||||
end_frame = min(end_frame, total_frames)
|
||||
else:
|
||||
end_frame = total_frames
|
||||
|
||||
print(f"Start frame: {start_frame}")
|
||||
print(f"End frame: {end_frame}")
|
||||
print(f"Frames to extract: {end_frame - start_frame}")
|
||||
|
||||
# Set start position
|
||||
cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)
|
||||
|
||||
# Get original size
|
||||
original_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
|
||||
original_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
|
||||
print(f"Original size: {original_width} x {original_height}")
|
||||
|
||||
# Determine output size
|
||||
if width is None and height is None:
|
||||
output_width, output_height = original_width, original_height
|
||||
print("Keeping original size")
|
||||
elif width is not None and height is not None:
|
||||
output_width, output_height = width, height
|
||||
print(f"Target size: {output_width} x {output_height}")
|
||||
else:
|
||||
# Calculate maintaining aspect ratio
|
||||
if width is None:
|
||||
aspect_ratio = original_width / original_height
|
||||
output_height = height
|
||||
output_width = int(output_height * aspect_ratio)
|
||||
else:
|
||||
aspect_ratio = original_height / original_width
|
||||
output_width = width
|
||||
output_height = int(output_width * aspect_ratio)
|
||||
print(f"Resizing to: {output_width} x {output_height}")
|
||||
|
||||
# Validate quality
|
||||
if quality < 0 or quality > 100:
|
||||
print(f"Warning: Quality {quality} out of range. Using 95")
|
||||
quality = 95
|
||||
|
||||
print(f"JPEG quality: {quality}")
|
||||
if generate_c_array:
|
||||
print(f"Will generate C array with name: {c_array_name}")
|
||||
|
||||
# Extract frames
|
||||
frame_count = 0
|
||||
saved_count = 0
|
||||
images_data = [] # 存储图片数据(bytes)
|
||||
|
||||
try:
|
||||
while cap.isOpened():
|
||||
if start_frame + frame_count >= end_frame:
|
||||
break
|
||||
|
||||
ret, frame = cap.read()
|
||||
if not ret:
|
||||
break
|
||||
|
||||
# Resize
|
||||
if (output_width, output_height) != (original_width, original_height):
|
||||
frame = cv2.resize(frame, (output_width, output_height))
|
||||
|
||||
# Save image to file
|
||||
filename = f"{prefix}_{saved_count:06d}.jpg"
|
||||
output_path = os.path.join(output_dir, filename)
|
||||
|
||||
# 使用imwrite保存图片
|
||||
success = cv2.imwrite(output_path, frame, [cv2.IMWRITE_JPEG_QUALITY, quality])
|
||||
|
||||
if success:
|
||||
# 读取保存的图片文件数据
|
||||
try:
|
||||
with open(output_path, 'rb') as img_file:
|
||||
img_data = img_file.read()
|
||||
|
||||
images_data.append(img_data)
|
||||
saved_count += 1
|
||||
|
||||
if saved_count % 50 == 0:
|
||||
print(f" Saved {saved_count} images...")
|
||||
except Exception as e:
|
||||
print(f" Warning: Failed to read saved image {saved_count}: {e}")
|
||||
else:
|
||||
print(f" Warning: Failed to save image {saved_count}")
|
||||
|
||||
frame_count += 1
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print("\nInterrupted by user")
|
||||
except Exception as e:
|
||||
print(f"Error during extraction: {e}")
|
||||
finally:
|
||||
cap.release()
|
||||
|
||||
print(f"\nImage extraction completed!")
|
||||
print(f"Total images saved: {saved_count}")
|
||||
print(f"Location: {output_dir}")
|
||||
|
||||
# 生成C数组文件
|
||||
if generate_c_array and images_data:
|
||||
if c_array_file is None:
|
||||
c_array_file = os.path.join(output_dir, f"{c_array_name}.c")
|
||||
|
||||
success = save_as_simple_c_array(images_data, c_array_file, c_array_name)
|
||||
|
||||
if success:
|
||||
print(f"C array generation completed!")
|
||||
else:
|
||||
print(f"C array generation failed!")
|
||||
|
||||
return saved_count > 0, images_data
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(
|
||||
description='Convert MP4 to JPEG images with ultra-simplified C array generation',
|
||||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||
epilog="""
|
||||
Examples:
|
||||
# Basic conversion (no C array)
|
||||
python mp4_2_jpg.py video.mp4 -o images
|
||||
|
||||
# With specific size and quality
|
||||
python mp4_2_jpg.py video.mp4 -o images -w 640 -ht 480 -q 90
|
||||
|
||||
# With time range
|
||||
python mp4_2_jpg.py video.mp4 -o images -s 30 -d 10 -w 320
|
||||
|
||||
# Generate C array file
|
||||
python mp4_2_jpg.py video.mp4 -o images -w 240 -c -n video_frames
|
||||
"""
|
||||
)
|
||||
|
||||
# Required argument
|
||||
parser.add_argument('input', help='Input video file path')
|
||||
|
||||
# Optional arguments
|
||||
parser.add_argument('-o', '--output', default='output_frames',
|
||||
help='Output directory (default: output_frames)')
|
||||
parser.add_argument('-w', '--width', type=int,
|
||||
help='Output image width (pixels)')
|
||||
parser.add_argument('-ht', '--height', type=int,
|
||||
help='Output image height (pixels)')
|
||||
parser.add_argument('-q', '--quality', type=int, default=95,
|
||||
help='JPEG quality (0-100, default: 95)')
|
||||
parser.add_argument('-s', '--start', type=float, default=0,
|
||||
help='Start time in seconds (default: 0)')
|
||||
parser.add_argument('-d', '--duration', type=float,
|
||||
help='Duration to extract in seconds')
|
||||
parser.add_argument('-p', '--prefix', default='frame',
|
||||
help='Filename prefix (default: frame)')
|
||||
|
||||
# C array generation arguments
|
||||
parser.add_argument('-c', '--c-array', action='store_true',
|
||||
help='Generate C array file with image data')
|
||||
parser.add_argument('-n', '--c-name', default='image_data',
|
||||
help='C array name prefix (default: image_data)')
|
||||
parser.add_argument('-cf', '--c-file',
|
||||
help='Output C file path (default: <output_dir>/<c_name>.c)')
|
||||
|
||||
# If no args, show help
|
||||
if len(sys.argv) == 1:
|
||||
parser.print_help()
|
||||
return
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
# Call conversion function
|
||||
success, images_data = mp4_to_jpg(
|
||||
video_path=args.input,
|
||||
output_dir=args.output,
|
||||
width=args.width,
|
||||
height=args.height,
|
||||
quality=args.quality,
|
||||
start_time=args.start,
|
||||
duration=args.duration,
|
||||
prefix=args.prefix,
|
||||
generate_c_array=args.c_array,
|
||||
c_array_name=args.c_name,
|
||||
c_array_file=args.c_file
|
||||
)
|
||||
|
||||
if not success:
|
||||
print("Conversion failed!")
|
||||
sys.exit(1)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Loading…
Reference in New Issue
Block a user