foreach ($teacherMemberIdsArray as $memberId) {
    // 发送消息队列进行生成excel
    $jobData = [
        'shop_id' => $shopId,
        'teacher_member_id' => $memberId,
        'job_unique' => $jobUnique,
        'salary_job_id' => $salaryJob['id'],
        'teacher_name' => $teacherNames[$memberId] ?? '',
        'month_time' => $params['month_time'],
        'op_member_id' => jwt('member_id'),
    ];

    $delay = 0;
    $queue = 'get_single_salary_excel';
    queue(GetSingleSalaryExcel::class, $jobData, $delay, $queue);

}

public function fire(Job $job, $data)
{
    if ($job->attempts() > 3) {
        // todo 获取不能运转的正确原因--写日志或者其他方式
        echo "任务尝试次数过多,删除任务\n";
        $job->delete();  // 强制删除,避免无限循环
        return;
    }

    $salaryJob = SalaryJob::lock(true)->find($data['salary_job_id']);
    if (!$salaryJob || $salaryJob['status'] != 1) {
        echo "任务不存在或状态异常,删除任务\n";
        $job->delete();
        return;
    }


    Db::startTrans();
    try {
        // 生成excel
        $filePath = $this->generateExcel($data, $salaryJob);
        echo "获取文件成功\n";

        // 行锁只能保证多个请求不会同时修改同一条数据,并不能保证每次都能读取到最新的值
        // 原值+1的写法有可能读不到最新的值,因为别的进程可能已经在修改了

        // $job->delete(); // 删除任务之后只是删除了数据库的任务,之后的代码会继续执行

        // 更新任务表数据
        $this->updateSalaryJob($data, $salaryJob, $filePath);
        echo "更新任务表数据\n";

        // 推送wss数据
        $this->sendWssMessage($data, $salaryJob);
        echo "推送wss数据\n";

        // 是否开启打包任务
        $this->openPackageJob($data);
        echo "是否开启打包任务判断\n";
        $job->delete();
        Db::commit();
    } catch (\Throwable $e) {
        echo $e->getMessage() . "\n";
        $job->delete();
        Db::rollback();
    }
}

protected function openPackageJob($data)
{

    $job = SalaryJob::field(['id', 'finished_file_num', 'total_file_num'])->where('id', $data['salary_job_id'])->find();
    echo "开始打包任务检查\n";
    echo "已完成文件数:" . $job['finished_file_num'] . "\n";
    echo "总文件数:" . $job['total_file_num'] . "\n";

    if ($job['finished_file_num'] >= $job['total_file_num']) {

        SalaryJob::update([
            'status' => 2
        ], ['id' => $data['salary_job_id']
        ]);
        // 派发打包任务
        $jobData = [
            'salary_job_id' => $data['salary_job_id'],
            'shop_id' => $data['shop_id'],
            'op_member_id' => $data['op_member_id']
        ];
        $delay = 0;
        $queue = 'package_zip';
        queue(PackageZip::class, $jobData, $delay, $queue);
    }
}

public function fire(Job $job, $data)
{
    try {
        $salaryJob = SalaryJob::where([
            'id' => $data['salary_job_id'],
        ])->find();
        echo "打包已开始\n";
        $zipPath = $salaryJob['zip_path'];
        $dir = dirname($zipPath);
        if (!is_dir($dir) && !mkdir($dir, 0777, true) && !is_dir($dir)) {
            throw new \RuntimeException('无法创建目录: ' . $dir);
        }
        $zip = new \ZipArchive();
        $zip->open($zipPath, \ZipArchive::CREATE | \ZipArchive::OVERWRITE);

        // 获取子文件路径的数组
        $extendArray = json_decode($salaryJob['extend_json'], true);
        $filePaths = array_column($extendArray, 'file_path');

        echo "开始添加文件到zip\n";
        foreach ($filePaths as $f) {
            $f = public_path() . $f;
            $zip->addFile($f, basename($f));

        }
        $zip->close();

        SalaryJob::update([
            'expire_time' => time() + 86400, // 24小时后过期
            'status' => 3, // 已打包
        ], ['id' => $data['salary_job_id']]);


        $this->sendWssMessage($data['op_member_id'],$data['salary_job_id']);

        $job->delete();
    } catch (\Throwable $exception) {
        echo $exception->getMessage() . "\n";
        $job->delete();
    }
}

1. 生成教师薪资excel--php think queue:listen --queue=get_single_salary_excel
2. 薪资打包成excel--php think queue:listen --queue=package_zip
3. 生成的文件X小时之后删除-- php think queue:listen --queue=delete_file(没有这个队列)(使用流式传输可以在下载完成之后直接删除文件,但是要把接口的请求时间改长一点)