上传文件至 src/dispatch

This commit is contained in:
TY
2026-01-09 15:00:39 +08:00
parent 87fb647c36
commit abdb7048f6
5 changed files with 1156 additions and 253 deletions

View File

@@ -1,254 +1,254 @@
#include "advanced_dispatcher.h"
#include <iostream>
#include <algorithm>
AdvancedDispatcher::AdvancedDispatcher(GraphMap* map, ResourceManager* resource_manager)
: graph_map_(map)
, resource_manager_(resource_manager)
, agv_manager_(nullptr)
, state_(DispatcherState::STOPPED)
, should_stop_(false)
, dispatch_interval_ms_(1000) {
}
AdvancedDispatcher::AdvancedDispatcher(GraphMap* map, ResourceManager* resource_manager, AGVManagerBase* agv_manager)
: graph_map_(map)
, resource_manager_(resource_manager)
, agv_manager_(agv_manager)
, state_(DispatcherState::STOPPED)
, should_stop_(false)
, dispatch_interval_ms_(1000) {
}
AdvancedDispatcher::~AdvancedDispatcher() {
stop();
}
bool AdvancedDispatcher::start() {
if (state_.load() == DispatcherState::RUNNING) {
return true;
}
should_stop_.store(false);
state_.store(DispatcherState::RUNNING);
// 启动调度线程
dispatch_thread_ = std::thread(&AdvancedDispatcher::dispatchLoop, this);
std::cout << "[调度器] 高级调度器已启动" << std::endl;
return true;
}
void AdvancedDispatcher::stop() {
if (state_.load() == DispatcherState::STOPPED) {
return;
}
should_stop_.store(true);
state_.store(DispatcherState::STOPPED);
// 等待线程结束
if (dispatch_thread_.joinable()) {
dispatch_thread_.join();
}
std::cout << "[调度器] 高级调度器已停止" << std::endl;
}
void AdvancedDispatcher::pause() {
if (state_.load() == DispatcherState::RUNNING) {
state_.store(DispatcherState::PAUSED);
std::cout << "[调度器] 调度器已暂停" << std::endl;
}
}
void AdvancedDispatcher::resume() {
if (state_.load() == DispatcherState::PAUSED) {
state_.store(DispatcherState::RUNNING);
std::cout << "[调度器] 调度器已恢复" << std::endl;
}
}
bool AdvancedDispatcher::addTask(int task_id, int start_point_id, int end_point_id,
const std::string& description, int priority) {
std::lock_guard<std::mutex> lock(task_mutex_);
// 检查任务是否已存在
for (const auto& task : task_queue_) {
if (task->id == task_id) {
std::cout << "[调度器] 任务 " << task_id << " 已存在" << std::endl;
return false;
}
}
// 创建新任务
Point* start_point = graph_map_->getPointById(start_point_id);
Point* end_point = graph_map_->getPointById(end_point_id);
if (!start_point || !end_point) {
std::cout << "[调度器] 错误: 任务 " << task_id << " 的节点不存在" << std::endl;
return false;
}
auto task = std::make_unique<Task>(task_id, priority, start_point, end_point, description);
task_queue_.push_back(std::move(task));
statistics_.total_tasks++;
std::cout << "[调度器] 添加任务 " << task_id << ": " << description << std::endl;
return true;
}
bool AdvancedDispatcher::cancelTask(int task_id) {
std::lock_guard<std::mutex> lock(task_mutex_);
for (auto it = task_queue_.begin(); it != task_queue_.end(); ++it) {
if ((*it)->id == task_id) {
(*it)->status = TaskStatus::FAILED;
std::cout << "[调度器] 取消任务 " << task_id << std::endl;
return true;
}
}
return false;
}
void AdvancedDispatcher::onTaskStarted(int task_id, int agv_id) {
std::lock_guard<std::mutex> lock(task_mutex_);
for (const auto& task : task_queue_) {
if (task->id == task_id) {
task->status = TaskStatus::IN_PROGRESS;
std::cout << "[调度器] 任务 " << task_id << " 开始执行 (AGV-" << agv_id << ")" << std::endl;
break;
}
}
}
void AdvancedDispatcher::onTaskCompleted(int task_id, int agv_id, bool success) {
std::lock_guard<std::mutex> lock(task_mutex_);
for (const auto& task : task_queue_) {
if (task->id == task_id) {
task->status = success ? TaskStatus::COMPLETED : TaskStatus::FAILED;
if (success) {
statistics_.completed_tasks++;
std::cout << "[调度器] 任务 " << task_id << " 完成成功 (AGV-" << agv_id << ")" << std::endl;
} else {
statistics_.failed_tasks++;
std::cout << "[调度器] 任务 " << task_id << " 执行失败 (AGV-" << agv_id << ")" << std::endl;
}
break;
}
}
}
DispatcherStatistics AdvancedDispatcher::getStatistics() {
updateStatistics();
return statistics_;
}
void AdvancedDispatcher::printSystemStatus() const {
std::lock_guard<std::mutex> lock(dispatcher_mutex_);
std::cout << "\n===== 调度器状态 =====" << std::endl;
// 调度器状态
std::string state_str;
switch (state_.load()) {
case DispatcherState::STOPPED: state_str = "已停止"; break;
case DispatcherState::RUNNING: state_str = "运行中"; break;
case DispatcherState::PAUSED: state_str = "已暂停"; break;
}
std::cout << "调度器状态: " << state_str << std::endl;
// 任务统计
std::cout << "任务统计: 总数 " << statistics_.total_tasks
<< ", 完成 " << statistics_.completed_tasks
<< ", 失败 " << statistics_.failed_tasks << std::endl;
// 活跃AGV数量
std::cout << "活跃AGV数量: " << statistics_.active_agvs << std::endl;
// 任务队列状态
{
std::lock_guard<std::mutex> task_lock(task_mutex_);
size_t pending_count = 0;
size_t executing_count = 0;
size_t completed_count = 0;
for (const auto& task : task_queue_) {
switch (task->status) {
case TaskStatus::PENDING: pending_count++; break;
case TaskStatus::IN_PROGRESS: executing_count++; break;
case TaskStatus::COMPLETED: completed_count++; break;
default: break;
}
}
std::cout << "任务队列: 待处理 " << pending_count
<< ", 执行中 " << executing_count
<< ", 已完成 " << completed_count << std::endl;
}
std::cout << "====================" << std::endl;
}
void AdvancedDispatcher::dispatchLoop() {
while (!should_stop_.load()) {
if (state_.load() == DispatcherState::RUNNING) {
performDispatch();
}
std::this_thread::sleep_for(std::chrono::milliseconds(dispatch_interval_ms_));
}
}
void AdvancedDispatcher::performDispatch() {
// 如果有AGV管理器使用其分配功能
if (agv_manager_) {
agv_manager_->assignTasks();
} else {
// 简化的调度逻辑
std::lock_guard<std::mutex> task_lock(task_mutex_);
for (const auto& task : task_queue_) {
if (task->status == TaskStatus::PENDING) {
// 简单地将任务状态设为进行中实际实现中需要分配给AGV
task->status = TaskStatus::IN_PROGRESS;
std::cout << "[调度器] 分配任务 " << task->id << " (简化调度)" << std::endl;
break;
}
}
}
}
bool AdvancedDispatcher::assignTaskToAGV(Task* task, AGV* agv) {
if (!task || !agv || agv->state != AGVState::IDLE) {
return false;
}
// 设置任务分配
agv->current_task = task;
agv->state = AGVState::ASSIGNED;
task->status = TaskStatus::ASSIGNED;
task->assigned_agv = agv;
return true;
}
void AdvancedDispatcher::updateStatistics() {
std::lock_guard<std::mutex> lock(dispatcher_mutex_);
// 更新活跃AGV数量
if (agv_manager_) {
auto all_agvs = agv_manager_->getAllAGVs();
statistics_.active_agvs = 0;
for (AGV* agv : all_agvs) {
if (agv->state != AGVState::IDLE) {
statistics_.active_agvs++;
}
}
}
#include "advanced_dispatcher.h"
#include <iostream>
#include <algorithm>
AdvancedDispatcher::AdvancedDispatcher(GraphMap* map, ResourceManager* resource_manager)
: graph_map_(map)
, resource_manager_(resource_manager)
, agv_manager_(nullptr)
, state_(DispatcherState::STOPPED)
, should_stop_(false)
, dispatch_interval_ms_(1000) {
}
AdvancedDispatcher::AdvancedDispatcher(GraphMap* map, ResourceManager* resource_manager, AGVManagerBase* agv_manager)
: graph_map_(map)
, resource_manager_(resource_manager)
, agv_manager_(agv_manager)
, state_(DispatcherState::STOPPED)
, should_stop_(false)
, dispatch_interval_ms_(1000) {
}
AdvancedDispatcher::~AdvancedDispatcher() {
stop();
}
bool AdvancedDispatcher::start() {
if (state_.load() == DispatcherState::RUNNING) {
return true;
}
should_stop_.store(false);
state_.store(DispatcherState::RUNNING);
// 启动调度线程
dispatch_thread_ = std::thread(&AdvancedDispatcher::dispatchLoop, this);
std::cout << "[调度器] 高级调度器已启动" << std::endl;
return true;
}
void AdvancedDispatcher::stop() {
if (state_.load() == DispatcherState::STOPPED) {
return;
}
should_stop_.store(true);
state_.store(DispatcherState::STOPPED);
// 等待线程结束
if (dispatch_thread_.joinable()) {
dispatch_thread_.join();
}
std::cout << "[调度器] 高级调度器已停止" << std::endl;
}
void AdvancedDispatcher::pause() {
if (state_.load() == DispatcherState::RUNNING) {
state_.store(DispatcherState::PAUSED);
std::cout << "[调度器] 调度器已暂停" << std::endl;
}
}
void AdvancedDispatcher::resume() {
if (state_.load() == DispatcherState::PAUSED) {
state_.store(DispatcherState::RUNNING);
std::cout << "[调度器] 调度器已恢复" << std::endl;
}
}
bool AdvancedDispatcher::addTask(int task_id, int start_point_id, int end_point_id,
const std::string& description, int priority) {
std::lock_guard<std::mutex> lock(task_mutex_);
// 检查任务是否已存在
for (const auto& task : task_queue_) {
if (task->id == task_id) {
std::cout << "[调度器] 任务 " << task_id << " 已存在" << std::endl;
return false;
}
}
// 创建新任务
Point* start_point = graph_map_->getPointById(start_point_id);
Point* end_point = graph_map_->getPointById(end_point_id);
if (!start_point || !end_point) {
std::cout << "[调度器] 错误: 任务 " << task_id << " 的节点不存在" << std::endl;
return false;
}
auto task = std::make_unique<Task>(task_id, priority, start_point, end_point, description);
task_queue_.push_back(std::move(task));
statistics_.total_tasks++;
std::cout << "[调度器] 添加任务 " << task_id << ": " << description << std::endl;
return true;
}
bool AdvancedDispatcher::cancelTask(int task_id) {
std::lock_guard<std::mutex> lock(task_mutex_);
for (auto it = task_queue_.begin(); it != task_queue_.end(); ++it) {
if ((*it)->id == task_id) {
(*it)->status = TaskStatus::FAILED;
std::cout << "[调度器] 取消任务 " << task_id << std::endl;
return true;
}
}
return false;
}
void AdvancedDispatcher::onTaskStarted(int task_id, int agv_id) {
std::lock_guard<std::mutex> lock(task_mutex_);
for (const auto& task : task_queue_) {
if (task->id == task_id) {
task->status = TaskStatus::IN_PROGRESS;
std::cout << "[调度器] 任务 " << task_id << " 开始执行 (AGV-" << agv_id << ")" << std::endl;
break;
}
}
}
void AdvancedDispatcher::onTaskCompleted(int task_id, int agv_id, bool success) {
std::lock_guard<std::mutex> lock(task_mutex_);
for (const auto& task : task_queue_) {
if (task->id == task_id) {
task->status = success ? TaskStatus::COMPLETED : TaskStatus::FAILED;
if (success) {
statistics_.completed_tasks++;
std::cout << "[调度器] 任务 " << task_id << " 完成成功 (AGV-" << agv_id << ")" << std::endl;
} else {
statistics_.failed_tasks++;
std::cout << "[调度器] 任务 " << task_id << " 执行失败 (AGV-" << agv_id << ")" << std::endl;
}
break;
}
}
}
DispatcherStatistics AdvancedDispatcher::getStatistics() {
updateStatistics();
return statistics_;
}
void AdvancedDispatcher::printSystemStatus() const {
std::lock_guard<std::mutex> lock(dispatcher_mutex_);
std::cout << "\n===== 调度器状态 =====" << std::endl;
// 调度器状态
std::string state_str;
switch (state_.load()) {
case DispatcherState::STOPPED: state_str = "已停止"; break;
case DispatcherState::RUNNING: state_str = "运行中"; break;
case DispatcherState::PAUSED: state_str = "已暂停"; break;
}
std::cout << "调度器状态: " << state_str << std::endl;
// 任务统计
std::cout << "任务统计: 总数 " << statistics_.total_tasks
<< ", 完成 " << statistics_.completed_tasks
<< ", 失败 " << statistics_.failed_tasks << std::endl;
// 活跃AGV数量
std::cout << "活跃AGV数量: " << statistics_.active_agvs << std::endl;
// 任务队列状态
{
std::lock_guard<std::mutex> task_lock(task_mutex_);
size_t pending_count = 0;
size_t executing_count = 0;
size_t completed_count = 0;
for (const auto& task : task_queue_) {
switch (task->status) {
case TaskStatus::PENDING: pending_count++; break;
case TaskStatus::IN_PROGRESS: executing_count++; break;
case TaskStatus::COMPLETED: completed_count++; break;
default: break;
}
}
std::cout << "任务队列: 待处理 " << pending_count
<< ", 执行中 " << executing_count
<< ", 已完成 " << completed_count << std::endl;
}
std::cout << "====================" << std::endl;
}
void AdvancedDispatcher::dispatchLoop() {
while (!should_stop_.load()) {
if (state_.load() == DispatcherState::RUNNING) {
performDispatch();
}
std::this_thread::sleep_for(std::chrono::milliseconds(dispatch_interval_ms_));
}
}
void AdvancedDispatcher::performDispatch() {
// 如果有AGV管理器使用其分配功能
if (agv_manager_) {
agv_manager_->assignTasks();
} else {
// 简化的调度逻辑
std::lock_guard<std::mutex> task_lock(task_mutex_);
for (const auto& task : task_queue_) {
if (task->status == TaskStatus::PENDING) {
// 简单地将任务状态设为进行中实际实现中需要分配给AGV
task->status = TaskStatus::IN_PROGRESS;
std::cout << "[调度器] 分配任务 " << task->id << " (简化调度)" << std::endl;
break;
}
}
}
}
bool AdvancedDispatcher::assignTaskToAGV(Task* task, AGV* agv) {
if (!task || !agv || agv->state != AGVState::IDLE) {
return false;
}
// 设置任务分配
agv->current_task = task;
agv->state = AGVState::ASSIGNED;
task->status = TaskStatus::ASSIGNED;
task->assigned_agv = agv;
return true;
}
void AdvancedDispatcher::updateStatistics() {
std::lock_guard<std::mutex> lock(dispatcher_mutex_);
// 更新活跃AGV数量
if (agv_manager_) {
auto all_agvs = agv_manager_->getAllAGVs();
statistics_.active_agvs = 0;
for (AGV* agv : all_agvs) {
if (agv->state != AGVState::IDLE) {
statistics_.active_agvs++;
}
}
}
}

View File

@@ -0,0 +1,97 @@
#ifndef ADVANCED_DISPATCHER_H
#define ADVANCED_DISPATCHER_H
#include "graph_map.h"
#include "agv_manager_base.h"
#include "resource_manager.h"
#include <thread>
#include <atomic>
#include <chrono>
#include <functional>
/**
* @brief 高级调度器统计信息
*/
struct DispatcherStatistics {
uint64_t total_tasks;
uint64_t completed_tasks;
uint64_t failed_tasks;
uint64_t active_agvs;
DispatcherStatistics()
: total_tasks(0), completed_tasks(0), failed_tasks(0), active_agvs(0) {}
};
/**
* @brief 高级调度器 - 基于图的多AGV任务调度系统
*/
class AdvancedDispatcher {
public:
enum class DispatcherState {
STOPPED, // 停止
RUNNING, // 运行中
PAUSED // 暂停
};
private:
// 核心组件
GraphMap* graph_map_;
ResourceManager* resource_manager_;
AGVManagerBase* agv_manager_;
// 调度器状态
std::atomic<DispatcherState> state_;
std::atomic<bool> should_stop_;
// 调度线程
std::thread dispatch_thread_;
// 调度参数
int dispatch_interval_ms_; // 调度间隔(毫秒)
// 统计信息
DispatcherStatistics statistics_;
// 任务队列(简化实现)
std::vector<std::unique_ptr<Task>> task_queue_;
mutable std::mutex task_mutex_;
// 互斥锁
mutable std::mutex dispatcher_mutex_;
public:
AdvancedDispatcher(GraphMap* map, ResourceManager* resource_manager);
AdvancedDispatcher(GraphMap* map, ResourceManager* resource_manager, AGVManagerBase* agv_manager);
~AdvancedDispatcher();
// ==================== 系统控制 ====================
bool start();
void stop();
void pause();
void resume();
DispatcherState getState() const { return state_.load(); }
// ==================== 任务管理 ====================
bool addTask(int task_id, int start_point_id, int end_point_id,
const std::string& description, int priority = 1);
bool cancelTask(int task_id);
// ==================== 任务执行反馈 ====================
void onTaskStarted(int task_id, int agv_id);
void onTaskCompleted(int task_id, int agv_id, bool success);
// ==================== 配置和监控 ====================
void setDispatchInterval(int interval_ms) { dispatch_interval_ms_ = interval_ms; }
DispatcherStatistics getStatistics();
void printSystemStatus() const;
private:
// ==================== 内部方法 ====================
void dispatchLoop();
void performDispatch();
bool assignTaskToAGV(Task* task, AGV* agv);
void updateStatistics();
};
#endif // ADVANCED_DISPATCHER_H

View File

@@ -0,0 +1,308 @@
#include "advanced_dispatcher.h"
#include <iostream>
#include <sstream>
#include <thread>
AdvancedDispatcher::AdvancedDispatcher()
: state_(DispatcherState::STOPPED)
, should_stop_(false)
, dispatch_interval_ms_(1000)
, feedback_check_interval_ms_(100)
, total_tasks_assigned_(0)
, total_tasks_completed_(0)
, total_paths_executed_(0) {
}
AdvancedDispatcher::~AdvancedDispatcher() {
stop();
}
bool AdvancedDispatcher::initialize() {
// 创建地图和AGV管理器
graph_map_ = std::make_unique<GraphMap>();
agv_manager_ = std::make_unique<AGVManager>(graph_map_.get());
// 创建示例地图
createSampleMap();
// 创建示例AGV
createSampleAGVs();
// 设置任务完成回调
agv_manager_->setTaskCompleteCallback([this](const Task& task, int agv_id) {
total_tasks_completed_++;
if (task_completion_callback_) {
task_completion_callback_(task, agv_id);
}
});
std::cout << "[系统] 调度器初始化完成" << std::endl;
return true;
}
void AdvancedDispatcher::createSampleMap() {
// 创建示例节点(仓库布局)
std::vector<std::tuple<int, double, double, std::string>> points = {
{1, 0, 0, "入口"},
{2, 5, 0, "存储区A"},
{3, 10, 0, "存储区B"},
{4, 10, 5, "存储区C"},
{5, 10, 10, "存储区D"},
{6, 5, 10, "存储区E"},
{7, 0, 10, "存储区F"},
{8, 0, 5, "充电站"},
{9, 5, 5, "中心枢纽"},
};
for (auto [id, x, y, name] : points) {
graph_map_->addPoint(id, x, y, name);
}
// 创建路径(双向)
std::vector<std::tuple<int, int, int>> paths = {
{101, 1, 2}, {102, 2, 3}, {103, 3, 4}, {104, 4, 5},
{105, 5, 6}, {106, 6, 7}, {107, 7, 1}, {108, 1, 8},
{109, 2, 9}, {110, 9, 3}, {111, 9, 6}, {112, 9, 8},
{113, 7, 8}
};
for (auto [id, start, end] : paths) {
graph_map_->addPath(id, start, end, true);
}
std::cout << "[地图] 创建了 " << points.size() << " 个节点和 "
<< paths.size() << " 条路径" << std::endl;
}
void AdvancedDispatcher::createSampleAGVs() {
// 创建示例AGV
agv_manager_->addAGV(101, "AGV-01", 1); // 入口
agv_manager_->addAGV(102, "AGV-02", 2); // 存储区A
agv_manager_->addAGV(103, "AGV-03", 8); // 充电站
std::cout << "[系统] 创建了 3 个AGV" << std::endl;
}
bool AdvancedDispatcher::start() {
if (state_ == DispatcherState::RUNNING) {
std::cout << "[警告] 调度器已在运行" << std::endl;
return false;
}
should_stop_ = false;
state_ = DispatcherState::RUNNING;
// 启动调度线程
dispatch_thread_ = std::thread(&AdvancedDispatcher::dispatchLoop, this);
feedback_thread_ = std::thread(&AdvancedDispatcher::feedbackLoop, this);
std::cout << "[系统] 调度器已启动" << std::endl;
return true;
}
void AdvancedDispatcher::stop() {
if (state_ == DispatcherState::STOPPED) {
return;
}
should_stop_ = true;
state_ = DispatcherState::STOPPED;
if (dispatch_thread_.joinable()) {
dispatch_thread_.join();
}
if (feedback_thread_.joinable()) {
feedback_thread_.join();
}
std::cout << "[系统] 调度器已停止" << std::endl;
}
void AdvancedDispatcher::pause() {
if (state_ == DispatcherState::RUNNING) {
state_ = DispatcherState::PAUSED;
std::cout << "[系统] 调度器已暂停" << std::endl;
}
}
void AdvancedDispatcher::resume() {
if (state_ == DispatcherState::PAUSED) {
state_ = DispatcherState::RUNNING;
std::cout << "[系统] 调度器已恢复" << std::endl;
}
}
Point* AdvancedDispatcher::addMapPoint(int id, double x, double y, const std::string& name) {
return graph_map_->addPoint(id, x, y, name);
}
Path* AdvancedDispatcher::addMapPath(int id, int start_point_id, int end_point_id, bool bidirectional) {
return graph_map_->addPath(id, start_point_id, end_point_id, bidirectional);
}
AGV* AdvancedDispatcher::addAGV(int id, const std::string& name, int start_point_id) {
return agv_manager_->addAGV(id, name, start_point_id);
}
AGV* AdvancedDispatcher::getAGV(int id) {
return agv_manager_->getAGV(id);
}
int AdvancedDispatcher::addTransportTask(int priority, int start_point_id, int end_point_id,
const std::string& description) {
int task_id = agv_manager_->addTask(priority, start_point_id, end_point_id, description);
total_tasks_assigned_++;
std::cout << "[任务] 添加运输任务 " << task_id << ": " << description << std::endl;
return task_id;
}
bool AdvancedDispatcher::cancelTask(int task_id) {
// 实现任务取消逻辑
std::cout << "[系统] 取消任务 " << task_id << std::endl;
return true;
}
void AdvancedDispatcher::getTaskQueueStatus(size_t& pending_count, size_t& executing_count) const {
agv_manager_->getTaskQueueStatus(pending_count, executing_count);
}
void AdvancedDispatcher::onPathExecutionFeedback(int agv_id, int path_id, bool success, int error_code) {
agv_manager_->onPathCompleted(agv_id, path_id, success);
total_paths_executed_++;
if (path_execution_callback_) {
std::string status = success ? "成功" : "失败";
path_execution_callback_(agv_id, status + " (路径" + std::to_string(path_id) + ")");
}
}
Path* AdvancedDispatcher::getNextPathForAGV(int agv_id) {
AGV* agv = agv_manager_->getAGV(agv_id);
if (!agv) {
return nullptr;
}
return agv->getNextPath();
}
void AdvancedDispatcher::setPathExecutionCallback(std::function<void(int, const std::string&)> callback) {
path_execution_callback_ = callback;
}
void AdvancedDispatcher::setTaskCompletionCallback(std::function<void(const Task&, int)> callback) {
task_completion_callback_ = callback;
}
void AdvancedDispatcher::getStatistics(uint64_t& total_assigned, uint64_t& total_completed,
uint64_t& total_executed) const {
total_assigned = total_tasks_assigned_;
total_completed = total_tasks_completed_;
total_executed = total_paths_executed_;
}
void AdvancedDispatcher::printSystemStatus() const {
std::cout << "\n===== 系统状态 =====" << std::endl;
std::string state_str;
switch (state_) {
case DispatcherState::STOPPED: state_str = "停止"; break;
case DispatcherState::RUNNING: state_str = "运行中"; break;
case DispatcherState::PAUSED: state_str = "暂停"; break;
}
std::cout << "调度器状态: " << state_str << std::endl;
size_t pending, executing;
getTaskQueueStatus(pending, executing);
std::cout << "待处理任务: " << pending << ", 执行中任务: " << executing << std::endl;
uint64_t assigned, completed, executed;
getStatistics(assigned, completed, executed);
std::cout << "已分配任务: " << assigned << ", 已完成任务: " << completed << std::endl;
std::cout << "已执行路径: " << executed << std::endl;
// 打印AGV状态
agv_manager_->printAGVStatus();
}
void AdvancedDispatcher::printMapInfo() const {
size_t num_points, num_paths;
graph_map_->getStatistics(num_points, num_paths);
std::cout << "\n===== 地图信息 =====" << std::endl;
std::cout << "节点数: " << num_points << std::endl;
std::cout << "路径数: " << num_paths << std::endl;
std::cout << "===================" << std::endl;
}
void AdvancedDispatcher::printTaskAssignment() const {
size_t pending, executing;
getTaskQueueStatus(pending, executing);
std::cout << "\n===== 任务分配情况 =====" << std::endl;
std::cout << "待分配任务数: " << pending << std::endl;
std::cout << "执行中任务数: " << executing << std::endl;
// 可以添加更详细的任务分配信息
std::cout << "========================" << std::endl;
}
void AdvancedDispatcher::dispatchLoop() {
while (!should_stop_) {
if (state_ == DispatcherState::RUNNING) {
performDispatch();
}
std::this_thread::sleep_for(std::chrono::milliseconds(dispatch_interval_ms_));
}
}
void AdvancedDispatcher::feedbackLoop() {
while (!should_stop_) {
if (state_ == DispatcherState::RUNNING) {
// 处理反馈和等待的AGV
handleWaitingAGVs();
// 定期检测死锁
static int counter = 0;
if (++counter >= 10) { // 每10秒检测一次
detectAndResolveDeadlocks();
counter = 0;
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(feedback_check_interval_ms_));
}
}
void AdvancedDispatcher::performDispatch() {
// 执行任务分配
int assigned = agv_manager_->assignTasks();
if (assigned > 0) {
std::cout << "[调度器] 成功分配了 " << assigned << " 个任务" << std::endl;
}
}
void AdvancedDispatcher::handleWaitingAGVs() {
// 检查并处理等待中的AGV
// 实现等待AGV的重新调度逻辑
}
void AdvancedDispatcher::detectAndResolveDeadlocks() {
// 检测并解决死锁
// 实现死锁检测和解决算法
}
std::string AdvancedDispatcher::agvStateToString(AGVState state) {
switch (state) {
case AGVState::IDLE: return "空闲";
case AGVState::ASSIGNED: return "已分配";
case AGVState::MOVING: return "移动中";
case AGVState::WAITING: return "等待中";
case AGVState::CHARGING: return "充电中";
case AGVState::ERROR: return "错误";
case AGVState::MAINTENANCE: return "维护中";
default: return "未知";
}
}

View File

@@ -0,0 +1,109 @@
#ifndef ADVANCED_DISPATCHER_H
#define ADVANCED_DISPATCHER_H
#include "graph_map.h"
#include "agv_manager.h"
#include <thread>
#include <atomic>
#include <chrono>
#include <functional>
/**
* @brief 新版调度器 - 基于图的多AGV任务调度系统
*/
class AdvancedDispatcher {
public:
enum class DispatcherState {
STOPPED, // 停止
RUNNING, // 运行中
PAUSED // 暂停
};
private:
// 核心组件
std::unique_ptr<GraphMap> graph_map_;
std::unique_ptr<AGVManager> agv_manager_;
// 调度器状态
std::atomic<DispatcherState> state_;
std::atomic<bool> should_stop_;
// 调度线程
std::thread dispatch_thread_;
std::thread feedback_thread_;
// 调度参数
int dispatch_interval_ms_; // 调度间隔(毫秒)
int feedback_check_interval_ms_; // 反馈检查间隔(毫秒)
// 统计信息
std::atomic<uint64_t> total_tasks_assigned_;
std::atomic<uint64_t> total_tasks_completed_;
std::atomic<uint64_t> total_paths_executed_;
// 回调函数
std::function<void(int, const std::string&)> path_execution_callback_;
std::function<void(const Task&, int)> task_completion_callback_;
// 互斥锁
mutable std::mutex dispatcher_mutex_;
public:
AdvancedDispatcher();
~AdvancedDispatcher();
// ==================== 系统控制 ====================
bool initialize();
bool start();
void stop();
void pause();
void resume();
DispatcherState getState() const { return state_.load(); }
// ==================== 地图管理 ====================
Point* addMapPoint(int id, double x, double y, const std::string& name = "");
Path* addMapPath(int id, int start_point_id, int end_point_id, bool bidirectional = true);
// ==================== AGV管理 ====================
AGV* addAGV(int id, const std::string& name, int start_point_id);
AGV* getAGV(int id);
// ==================== 任务管理 ====================
int addTransportTask(int priority, int start_point_id, int end_point_id,
const std::string& description = "");
bool cancelTask(int task_id);
void getTaskQueueStatus(size_t& pending_count, size_t& executing_count) const;
// ==================== 路径执行反馈 ====================
void onPathExecutionFeedback(int agv_id, int path_id, bool success, int error_code = 0);
Path* getNextPathForAGV(int agv_id);
// ==================== 配置和监控 ====================
void setDispatchInterval(int interval_ms) { dispatch_interval_ms_ = interval_ms; }
void setPathExecutionCallback(std::function<void(int, const std::string&)> callback);
void setTaskCompletionCallback(std::function<void(const Task&, int)> callback);
void getStatistics(uint64_t& total_assigned, uint64_t& total_completed,
uint64_t& total_executed) const;
void printSystemStatus() const;
void printMapInfo() const;
void printTaskAssignment() const;
private:
// ==================== 内部方法 ====================
void dispatchLoop();
void feedbackLoop();
void performDispatch();
bool assignTaskToAGV(AGV* agv, Task& task);
bool isPathAvailable(Path* path, const AGV* agv);
bool reservePathResources(AGV* agv, const std::vector<Path*>& path);
void releasePathResources(AGV* agv);
void handleWaitingAGVs();
void detectAndResolveDeadlocks();
std::string agvStateToString(AGVState state);
// 初始化辅助方法
void createSampleMap();
void createSampleAGVs();
};
#endif // ADVANCED_DISPATCHER_H

View File

@@ -0,0 +1,389 @@
#include "agv_executor.h"
#include <iostream>
#include <algorithm>
AGVExecutor::AGVExecutor(AGV* agv, ResourceManager* resource_manager, GraphMap* graph_map)
: agv_(agv)
, resource_manager_(resource_manager)
, graph_map_(graph_map)
, state_(ExecutionState::IDLE)
, should_stop_(false)
, is_paused_(false)
, current_path_index_(0)
, current_executing_path_(nullptr)
, request_timeout_ms_(5000)
, execution_interval_ms_(100)
, speed_factor_(1.0)
, total_paths_executed_(0)
, total_wait_time_(0)
, total_execution_time_(0) {
}
AGVExecutor::~AGVExecutor() {
stopExecution();
if (execution_thread_.joinable()) {
execution_thread_.join();
}
}
void AGVExecutor::setParameters(int request_timeout_ms, int execution_interval_ms, double speed_factor) {
request_timeout_ms_ = request_timeout_ms;
execution_interval_ms_ = execution_interval_ms;
speed_factor_ = speed_factor;
}
bool AGVExecutor::startExecution(const std::vector<Path*>& path) {
std::lock_guard<std::mutex> lock(execution_mutex_);
if (state_.load() != ExecutionState::IDLE) {
return false;
}
if (path.empty()) {
handleError("路径为空");
return false;
}
planned_path_ = path;
current_path_index_ = 0;
current_executing_path_ = nullptr;
should_stop_ = false;
is_paused_ = false;
state_.store(ExecutionState::PLANNING);
// 启动执行线程
execution_thread_ = std::thread(&AGVExecutor::executionLoop, this);
return true;
}
void AGVExecutor::pauseExecution() {
std::unique_lock<std::mutex> lock(execution_mutex_);
is_paused_ = true;
execution_cv_.notify_all();
}
void AGVExecutor::resumeExecution() {
std::unique_lock<std::mutex> lock(execution_mutex_);
is_paused_ = false;
execution_cv_.notify_all();
}
void AGVExecutor::stopExecution() {
{
std::unique_lock<std::mutex> lock(execution_mutex_);
should_stop_ = true;
execution_cv_.notify_all();
}
if (execution_thread_.joinable()) {
execution_thread_.join();
}
state_.store(ExecutionState::IDLE);
}
void AGVExecutor::forceStop() {
std::lock_guard<std::mutex> lock(execution_mutex_);
// 立即释放所有资源
if (current_executing_path_) {
resource_manager_->releasePath(agv_, current_executing_path_);
current_executing_path_ = nullptr;
}
should_stop_ = true;
execution_cv_.notify_all();
state_.store(ExecutionState::ERROR);
}
Path* AGVExecutor::getCurrentPath() const {
std::lock_guard<std::mutex> lock(execution_mutex_);
return current_executing_path_;
}
std::pair<size_t, size_t> AGVExecutor::getProgress() const {
std::lock_guard<std::mutex> lock(execution_mutex_);
return std::make_pair(current_path_index_, planned_path_.size());
}
void AGVExecutor::getStatistics(uint64_t& total_paths, uint64_t& wait_time,
uint64_t& execution_time) const {
total_paths = total_paths_executed_;
wait_time = total_wait_time_;
execution_time = total_execution_time_;
}
void AGVExecutor::executionLoop() {
while (!should_stop_) {
// 检查暂停状态
{
std::unique_lock<std::mutex> lock(execution_mutex_);
if (is_paused_) {
execution_cv_.wait(lock, [this] { return !is_paused_ || should_stop_; });
continue;
}
}
state_.store(ExecutionState::EXECUTING);
// 检查是否还有路径要执行
{
std::lock_guard<std::mutex> lock(execution_mutex_);
if (current_path_index_ >= planned_path_.size()) {
state_.store(ExecutionState::COMPLETED);
break;
}
current_executing_path_ = planned_path_[current_path_index_];
}
// 执行当前路径段
if (!executePathSegment(current_executing_path_)) {
handleError("路径执行失败");
break;
}
// 移动到下一个路径
if (!moveToNextPath()) {
state_.store(ExecutionState::COMPLETED);
break;
}
// 短暂休息
std::this_thread::sleep_for(std::chrono::milliseconds(execution_interval_ms_));
}
if (state_.load() == ExecutionState::COMPLETED) {
std::cout << "[AGV-" << agv_->name << "] 路径执行完成" << std::endl;
}
}
bool AGVExecutor::executePathSegment(Path* path) {
auto start_time = std::chrono::steady_clock::now();
// 1. 请求路径资源
state_.store(ExecutionState::REQUESTING);
auto wait_start = std::chrono::steady_clock::now();
if (!requestPathResource(path)) {
auto wait_end = std::chrono::steady_clock::now();
auto wait_duration = std::chrono::duration_cast<std::chrono::milliseconds>(wait_end - wait_start);
total_wait_time_ += wait_duration.count();
return false;
}
auto wait_end = std::chrono::steady_clock::now();
auto wait_duration = std::chrono::duration_cast<std::chrono::milliseconds>(wait_end - wait_start);
total_wait_time_ += wait_duration.count();
// 2. 执行路径
state_.store(ExecutionState::EXECUTING);
if (path_started_callback_) {
path_started_callback_(agv_->id, path);
}
auto exec_start = std::chrono::steady_clock::now();
int exec_time = simulatePathExecution(path);
// 模拟执行时间
std::this_thread::sleep_for(std::chrono::milliseconds(exec_time));
auto exec_end = std::chrono::steady_clock::now();
auto exec_duration = std::chrono::duration_cast<std::chrono::milliseconds>(exec_end - exec_start);
total_execution_time_ += exec_duration.count();
if (path_completed_callback_) {
path_completed_callback_(agv_->id, path);
}
// 3. 更新AGV位置
updateAGVPosition(path);
// 4. 释放路径资源
releasePathResource(path);
total_paths_executed_++;
return true;
}
bool AGVExecutor::requestPathResource(Path* path) {
// 使用资源管理器请求路径
auto status = resource_manager_->requestPath(agv_, path, request_timeout_ms_);
switch (status) {
case ResourceRequestStatus::GRANTED:
return true;
case ResourceRequestStatus::TIMEOUT:
handleError("资源请求超时");
return false;
case ResourceRequestStatus::CANCELLED:
handleError("资源请求被取消");
return false;
default:
return false;
}
}
void AGVExecutor::releasePathResource(Path* path) {
resource_manager_->releasePath(agv_, path);
}
bool AGVExecutor::moveToNextPath() {
std::lock_guard<std::mutex> lock(execution_mutex_);
if (current_path_index_ < planned_path_.size()) {
current_path_index_++;
return true;
}
return false;
}
int AGVExecutor::simulatePathExecution(Path* path) {
// 计算执行时间基于路径长度和AGV速度
if (!path || agv_ == nullptr) {
return 1000; // 默认1秒
}
// 实际速度 = 最大速度 × 速度因子
double actual_speed = agv_->max_speed * speed_factor_;
// 执行时间 = 路径长度 / 实际速度(秒)
double execution_time_seconds = path->length / actual_speed;
// 转换为毫秒
return static_cast<int>(execution_time_seconds * 1000);
}
void AGVExecutor::handleError(const std::string& error_message) {
state_.store(ExecutionState::ERROR);
std::cout << "[AGV-" << agv_->name << "] 错误: " << error_message << std::endl;
if (error_callback_) {
error_callback_(agv_->id, error_message);
}
}
void AGVExecutor::updateAGVPosition(Path* path) {
if (!path || !agv_) {
return;
}
// 更新AGV位置到路径的终点
if (agv_->current_position == path->start) {
agv_->current_position = path->end;
} else if (agv_->current_position == path->end) {
agv_->current_position = path->start;
}
// 更新节点占用
if (agv_->current_position->occupied_by == agv_) {
// 已经占用,不需要更新
} else {
// 清除旧占用
auto all_paths = graph_map_->getAllPaths();
for (auto path_ptr : all_paths) {
if (path_ptr->occupied_by == agv_) {
path_ptr->occupied_by = nullptr;
}
}
// 设置新占用
agv_->current_position->occupied_by = agv_;
}
}
std::string AGVExecutor::stateToString(ExecutionState state) {
switch (state) {
case ExecutionState::IDLE: return "空闲";
case ExecutionState::PLANNING: return "规划中";
case ExecutionState::REQUESTING: return "请求资源";
case ExecutionState::EXECUTING: return "执行中";
case ExecutionState::WAITING: return "等待中";
case ExecutionState::COMPLETED: return "已完成";
case ExecutionState::ERROR: return "错误";
case ExecutionState::PAUSED: return "暂停";
default: return "未知";
}
}
// AGVExecutorManager实现
AGVExecutorManager::AGVExecutorManager(ResourceManager* resource_manager, GraphMap* graph_map)
: resource_manager_(resource_manager)
, graph_map_(graph_map) {
}
void AGVExecutorManager::addAGVExecutor(AGV* agv) {
if (agv) {
auto executor = std::make_unique<AGVExecutor>(agv, resource_manager_, graph_map_);
executors_[agv->id] = std::move(executor);
}
}
void AGVExecutorManager::removeAGVExecutor(int agv_id) {
auto it = executors_.find(agv_id);
if (it != executors_.end()) {
it->second->stopExecution();
executors_.erase(it);
}
}
AGVExecutor* AGVExecutorManager::getExecutor(int agv_id) {
auto it = executors_.find(agv_id);
return it != executors_.end() ? it->second.get() : nullptr;
}
void AGVExecutorManager::stopAll() {
for (auto& pair : executors_) {
pair.second->stopExecution();
}
}
void AGVExecutorManager::pauseAll() {
for (auto& pair : executors_) {
pair.second->pauseExecution();
}
}
void AGVExecutorManager::resumeAll() {
for (auto& pair : executors_) {
pair.second->resumeExecution();
}
}
void AGVExecutorManager::getStatistics(std::map<int, std::pair<size_t, size_t>>& progress) const {
progress.clear();
for (const auto& pair : executors_) {
int agv_id = pair.first;
auto& executor = pair.second;
progress[agv_id] = executor->getProgress();
}
}
void AGVExecutorManager::printStatus() const {
std::cout << "\n===== AGV执行器状态 =====" << std::endl;
for (const auto& pair : executors_) {
int agv_id = pair.first;
auto& executor = pair.second;
std::cout << "AGV-" << agv_id << ": " << executor->stateToString(executor->getState());
auto progress = executor->getProgress();
std::cout << " (进度: " << progress.first << "/" << progress.second << ")";
if (executor->getCurrentPath()) {
std::cout << " - 正在执行路径" << executor->getCurrentPath()->id;
}
std::cout << std::endl;
}
std::cout << "========================" << std::endl;
}