添加 src/dispatch/advanced_dispatcher.cpp

This commit is contained in:
TY
2026-01-09 12:12:23 +08:00
parent 7b6c956b6a
commit 87fb647c36

View File

@@ -0,0 +1,254 @@
#include "advanced_dispatcher.h"
#include <iostream>
#include <algorithm>
AdvancedDispatcher::AdvancedDispatcher(GraphMap* map, ResourceManager* resource_manager)
: graph_map_(map)
, resource_manager_(resource_manager)
, agv_manager_(nullptr)
, state_(DispatcherState::STOPPED)
, should_stop_(false)
, dispatch_interval_ms_(1000) {
}
AdvancedDispatcher::AdvancedDispatcher(GraphMap* map, ResourceManager* resource_manager, AGVManagerBase* agv_manager)
: graph_map_(map)
, resource_manager_(resource_manager)
, agv_manager_(agv_manager)
, state_(DispatcherState::STOPPED)
, should_stop_(false)
, dispatch_interval_ms_(1000) {
}
AdvancedDispatcher::~AdvancedDispatcher() {
stop();
}
bool AdvancedDispatcher::start() {
if (state_.load() == DispatcherState::RUNNING) {
return true;
}
should_stop_.store(false);
state_.store(DispatcherState::RUNNING);
// 启动调度线程
dispatch_thread_ = std::thread(&AdvancedDispatcher::dispatchLoop, this);
std::cout << "[调度器] 高级调度器已启动" << std::endl;
return true;
}
void AdvancedDispatcher::stop() {
if (state_.load() == DispatcherState::STOPPED) {
return;
}
should_stop_.store(true);
state_.store(DispatcherState::STOPPED);
// 等待线程结束
if (dispatch_thread_.joinable()) {
dispatch_thread_.join();
}
std::cout << "[调度器] 高级调度器已停止" << std::endl;
}
void AdvancedDispatcher::pause() {
if (state_.load() == DispatcherState::RUNNING) {
state_.store(DispatcherState::PAUSED);
std::cout << "[调度器] 调度器已暂停" << std::endl;
}
}
void AdvancedDispatcher::resume() {
if (state_.load() == DispatcherState::PAUSED) {
state_.store(DispatcherState::RUNNING);
std::cout << "[调度器] 调度器已恢复" << std::endl;
}
}
bool AdvancedDispatcher::addTask(int task_id, int start_point_id, int end_point_id,
const std::string& description, int priority) {
std::lock_guard<std::mutex> lock(task_mutex_);
// 检查任务是否已存在
for (const auto& task : task_queue_) {
if (task->id == task_id) {
std::cout << "[调度器] 任务 " << task_id << " 已存在" << std::endl;
return false;
}
}
// 创建新任务
Point* start_point = graph_map_->getPointById(start_point_id);
Point* end_point = graph_map_->getPointById(end_point_id);
if (!start_point || !end_point) {
std::cout << "[调度器] 错误: 任务 " << task_id << " 的节点不存在" << std::endl;
return false;
}
auto task = std::make_unique<Task>(task_id, priority, start_point, end_point, description);
task_queue_.push_back(std::move(task));
statistics_.total_tasks++;
std::cout << "[调度器] 添加任务 " << task_id << ": " << description << std::endl;
return true;
}
bool AdvancedDispatcher::cancelTask(int task_id) {
std::lock_guard<std::mutex> lock(task_mutex_);
for (auto it = task_queue_.begin(); it != task_queue_.end(); ++it) {
if ((*it)->id == task_id) {
(*it)->status = TaskStatus::FAILED;
std::cout << "[调度器] 取消任务 " << task_id << std::endl;
return true;
}
}
return false;
}
void AdvancedDispatcher::onTaskStarted(int task_id, int agv_id) {
std::lock_guard<std::mutex> lock(task_mutex_);
for (const auto& task : task_queue_) {
if (task->id == task_id) {
task->status = TaskStatus::IN_PROGRESS;
std::cout << "[调度器] 任务 " << task_id << " 开始执行 (AGV-" << agv_id << ")" << std::endl;
break;
}
}
}
void AdvancedDispatcher::onTaskCompleted(int task_id, int agv_id, bool success) {
std::lock_guard<std::mutex> lock(task_mutex_);
for (const auto& task : task_queue_) {
if (task->id == task_id) {
task->status = success ? TaskStatus::COMPLETED : TaskStatus::FAILED;
if (success) {
statistics_.completed_tasks++;
std::cout << "[调度器] 任务 " << task_id << " 完成成功 (AGV-" << agv_id << ")" << std::endl;
} else {
statistics_.failed_tasks++;
std::cout << "[调度器] 任务 " << task_id << " 执行失败 (AGV-" << agv_id << ")" << std::endl;
}
break;
}
}
}
DispatcherStatistics AdvancedDispatcher::getStatistics() {
updateStatistics();
return statistics_;
}
void AdvancedDispatcher::printSystemStatus() const {
std::lock_guard<std::mutex> lock(dispatcher_mutex_);
std::cout << "\n===== 调度器状态 =====" << std::endl;
// 调度器状态
std::string state_str;
switch (state_.load()) {
case DispatcherState::STOPPED: state_str = "已停止"; break;
case DispatcherState::RUNNING: state_str = "运行中"; break;
case DispatcherState::PAUSED: state_str = "已暂停"; break;
}
std::cout << "调度器状态: " << state_str << std::endl;
// 任务统计
std::cout << "任务统计: 总数 " << statistics_.total_tasks
<< ", 完成 " << statistics_.completed_tasks
<< ", 失败 " << statistics_.failed_tasks << std::endl;
// 活跃AGV数量
std::cout << "活跃AGV数量: " << statistics_.active_agvs << std::endl;
// 任务队列状态
{
std::lock_guard<std::mutex> task_lock(task_mutex_);
size_t pending_count = 0;
size_t executing_count = 0;
size_t completed_count = 0;
for (const auto& task : task_queue_) {
switch (task->status) {
case TaskStatus::PENDING: pending_count++; break;
case TaskStatus::IN_PROGRESS: executing_count++; break;
case TaskStatus::COMPLETED: completed_count++; break;
default: break;
}
}
std::cout << "任务队列: 待处理 " << pending_count
<< ", 执行中 " << executing_count
<< ", 已完成 " << completed_count << std::endl;
}
std::cout << "====================" << std::endl;
}
void AdvancedDispatcher::dispatchLoop() {
while (!should_stop_.load()) {
if (state_.load() == DispatcherState::RUNNING) {
performDispatch();
}
std::this_thread::sleep_for(std::chrono::milliseconds(dispatch_interval_ms_));
}
}
void AdvancedDispatcher::performDispatch() {
// 如果有AGV管理器使用其分配功能
if (agv_manager_) {
agv_manager_->assignTasks();
} else {
// 简化的调度逻辑
std::lock_guard<std::mutex> task_lock(task_mutex_);
for (const auto& task : task_queue_) {
if (task->status == TaskStatus::PENDING) {
// 简单地将任务状态设为进行中实际实现中需要分配给AGV
task->status = TaskStatus::IN_PROGRESS;
std::cout << "[调度器] 分配任务 " << task->id << " (简化调度)" << std::endl;
break;
}
}
}
}
bool AdvancedDispatcher::assignTaskToAGV(Task* task, AGV* agv) {
if (!task || !agv || agv->state != AGVState::IDLE) {
return false;
}
// 设置任务分配
agv->current_task = task;
agv->state = AGVState::ASSIGNED;
task->status = TaskStatus::ASSIGNED;
task->assigned_agv = agv;
return true;
}
void AdvancedDispatcher::updateStatistics() {
std::lock_guard<std::mutex> lock(dispatcher_mutex_);
// 更新活跃AGV数量
if (agv_manager_) {
auto all_agvs = agv_manager_->getAllAGVs();
statistics_.active_agvs = 0;
for (AGV* agv : all_agvs) {
if (agv->state != AGVState::IDLE) {
statistics_.active_agvs++;
}
}
}
}