foreach ($teacherMemberIdsArray as $memberId) {
// 发送消息队列进行生成excel
$jobData = [
'shop_id' => $shopId,
'teacher_member_id' => $memberId,
'job_unique' => $jobUnique,
'salary_job_id' => $salaryJob['id'],
'teacher_name' => $teacherNames[$memberId] ?? '',
'month_time' => $params['month_time'],
'op_member_id' => jwt('member_id'),
];
$delay = 0;
$queue = 'get_single_salary_excel';
queue(GetSingleSalaryExcel::class, $jobData, $delay, $queue);
}
public function fire(Job $job, $data)
{
if ($job->attempts() > 3) {
// todo 获取不能运转的正确原因--写日志或者其他方式
echo "任务尝试次数过多,删除任务\n";
$job->delete(); // 强制删除,避免无限循环
return;
}
$salaryJob = SalaryJob::lock(true)->find($data['salary_job_id']);
if (!$salaryJob || $salaryJob['status'] != 1) {
echo "任务不存在或状态异常,删除任务\n";
$job->delete();
return;
}
Db::startTrans();
try {
// 生成excel
$filePath = $this->generateExcel($data, $salaryJob);
echo "获取文件成功\n";
// 行锁只能保证多个请求不会同时修改同一条数据,并不能保证每次都能读取到最新的值
// 原值+1的写法有可能读不到最新的值,因为别的进程可能已经在修改了
// $job->delete(); // 删除任务之后只是删除了数据库的任务,之后的代码会继续执行
// 更新任务表数据
$this->updateSalaryJob($data, $salaryJob, $filePath);
echo "更新任务表数据\n";
// 推送wss数据
$this->sendWssMessage($data, $salaryJob);
echo "推送wss数据\n";
// 是否开启打包任务
$this->openPackageJob($data);
echo "是否开启打包任务判断\n";
$job->delete();
Db::commit();
} catch (\Throwable $e) {
echo $e->getMessage() . "\n";
$job->delete();
Db::rollback();
}
}
protected function openPackageJob($data)
{
$job = SalaryJob::field(['id', 'finished_file_num', 'total_file_num'])->where('id', $data['salary_job_id'])->find();
echo "开始打包任务检查\n";
echo "已完成文件数:" . $job['finished_file_num'] . "\n";
echo "总文件数:" . $job['total_file_num'] . "\n";
if ($job['finished_file_num'] >= $job['total_file_num']) {
SalaryJob::update([
'status' => 2
], ['id' => $data['salary_job_id']
]);
// 派发打包任务
$jobData = [
'salary_job_id' => $data['salary_job_id'],
'shop_id' => $data['shop_id'],
'op_member_id' => $data['op_member_id']
];
$delay = 0;
$queue = 'package_zip';
queue(PackageZip::class, $jobData, $delay, $queue);
}
}
public function fire(Job $job, $data)
{
try {
$salaryJob = SalaryJob::where([
'id' => $data['salary_job_id'],
])->find();
echo "打包已开始\n";
$zipPath = $salaryJob['zip_path'];
$dir = dirname($zipPath);
if (!is_dir($dir) && !mkdir($dir, 0777, true) && !is_dir($dir)) {
throw new \RuntimeException('无法创建目录: ' . $dir);
}
$zip = new \ZipArchive();
$zip->open($zipPath, \ZipArchive::CREATE | \ZipArchive::OVERWRITE);
// 获取子文件路径的数组
$extendArray = json_decode($salaryJob['extend_json'], true);
$filePaths = array_column($extendArray, 'file_path');
echo "开始添加文件到zip\n";
foreach ($filePaths as $f) {
$f = public_path() . $f;
$zip->addFile($f, basename($f));
}
$zip->close();
SalaryJob::update([
'expire_time' => time() + 86400, // 24小时后过期
'status' => 3, // 已打包
], ['id' => $data['salary_job_id']]);
$this->sendWssMessage($data['op_member_id'],$data['salary_job_id']);
$job->delete();
} catch (\Throwable $exception) {
echo $exception->getMessage() . "\n";
$job->delete();
}
}
1. 生成教师薪资excel--php think queue:listen --queue=get_single_salary_excel
2. 薪资打包成excel--php think queue:listen --queue=package_zip
3. 生成的文件X小时之后删除-- php think queue:listen --queue=delete_file(没有这个队列)(使用流式传输可以在下载完成之后直接删除文件,但是要把接口的请求时间改长一点)