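"""by lyuwenyu
RT-DETR Transformer decoder implementation.

Core functionality: implements the object-detection Transformer decoder of
RT-DETR, consisting of an Encoder part and a Decoder part.

  * Encoder: encodes the backbone features
  * Decoder: performs object detection with deformable attention
"""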

# Import Python standard-library modules

import math # 数学运算库,提供数学常量和函数(如pi)
import copy # 深拷贝工具,用于复制对象
from collections import OrderedDict # 有序字典,保持字典的插入顺序

# Import PyTorch modules

import torch # PyTorch主库,提供张量运算和神经网络功能
import torch.nn as nn # 神经网络模块,提供各种层结构
import torch.nn.functional as F # 函数式接口,提供各种操作函数
import torch.nn.init as init # 参数初始化工具

# Import project-internal modules

from .denoising import get_contrastive_denoising_training_group # 对比去噪训练组生成函数
from .utils import deformable_attention_core_func, get_activation, inverse_sigmoid # 工具函数
from .utils import bias_init_with_prob # 偏置初始化函数

# Import the registry decorator used to register the model

from src.core import register

# Define the module's export list

# Public API of this module: only the RTDETRTransformer class is exported.
__all__ = ['RTDETRTransformer']

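# =========================================================================
# Class: MLP (multi-layer perceptron)
# Type: nn.Module subclass
# Role in the pipeline: basic feed-forward building block. In this file it is
# instantiated for the bounding-box prediction heads and for the query
# position-embedding head (the class-score heads are plain nn.Linear layers).
# =========================================================================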

class MLP(nn.Module):
    """Stack of ``nn.Linear`` layers with an activation between them.

    The activation is applied after every layer except the last one, so the
    final layer produces raw (un-activated) outputs.
    """

    def __init__(self, input_dim, hidden_dim, output_dim, num_layers, act='relu'):
        """Build the layer stack.

        Args:
            input_dim: Size of the last dimension of the input.
            hidden_dim: Width of every intermediate layer.
            output_dim: Size of the last dimension of the output.
            num_layers: Total number of linear layers (including the output layer).
            act: Activation spec forwarded to ``get_activation``; ``None``
                selects ``nn.Identity``. The set of accepted names is defined
                by ``get_activation`` (project helper, not visible here).
        """
        super().__init__()
        self.num_layers = num_layers
        hidden_dims = [hidden_dim] * (num_layers - 1)
        # Pair consecutive widths: input -> hidden ... hidden -> output.
        in_dims = [input_dim] + hidden_dims
        out_dims = hidden_dims + [output_dim]
        self.layers = nn.ModuleList(
            nn.Linear(n_in, n_out) for n_in, n_out in zip(in_dims, out_dims)
        )
        self.act = nn.Identity() if act is None else get_activation(act)

    def forward(self, x):
        """Apply the layers to ``x`` (linear acts on the last dimension)."""
        last = self.num_layers - 1
        for i, layer in enumerate(self.layers):
            x = layer(x)
            if i < last:
                # Hidden layers are followed by the activation; the output layer is not.
                x = self.act(x)
        return x

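# =========================================================================
# Class: MSDeformableAttention (multi-scale deformable attention)
# Type: nn.Module subclass
# Role in the pipeline: implements the multi-scale deformable attention
# mechanism, a core component of RT-DETR. The module samples and aggregates
# features around reference points on several feature scales, which greatly
# reduces computation while keeping the ability to model objects of
# arbitrary shape.
# =========================================================================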

class MSDeformableAttention(nn.Module):
    """Multi-scale deformable attention.

    For every query the module predicts, per head / level / point, a 2-D
    sampling offset and an attention weight. Sampling and weighted aggregation
    are delegated to ``deformable_attention_core_func`` (project helper).
    """

    def __init__(self, embed_dim=256, num_heads=8, num_levels=4, num_points=4):
        """Create the projection layers.

        Args:
            embed_dim: Feature size of queries and values; must be divisible
                by ``num_heads``.
            num_heads: Number of attention heads.
            num_levels: Number of feature levels sampled from.
            num_points: Number of sampling points per head and level.
        """
        super(MSDeformableAttention, self).__init__()
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.num_levels = num_levels
        self.num_points = num_points
        self.total_points = num_heads * num_levels * num_points

        self.head_dim = embed_dim // num_heads
        assert self.head_dim * num_heads == self.embed_dim, "embed_dim must be divisible by num_heads"

        # Two values (x and y offset) per sampling point.
        self.sampling_offsets = nn.Linear(embed_dim, self.total_points * 2)
        self.attention_weights = nn.Linear(embed_dim, self.total_points)
        self.value_proj = nn.Linear(embed_dim, embed_dim)
        self.output_proj = nn.Linear(embed_dim, embed_dim)

        self.ms_deformable_attn_core = deformable_attention_core_func

        self._reset_parameters()

    def _reset_parameters(self):
        """Initialise offsets on a per-head direction pattern and zero the attention logits."""
        # Zero weight: initial offsets come from the bias only, independent of the query.
        init.constant_(self.sampling_offsets.weight, 0)
        # One direction per head, evenly spaced on the circle.
        thetas = torch.arange(self.num_heads, dtype=torch.float32) * (2.0 * math.pi / self.num_heads)
        grid_init = torch.stack([thetas.cos(), thetas.sin()], -1)
        # Rescale each direction so its largest absolute component is 1.
        grid_init = grid_init / grid_init.abs().max(-1, keepdim=True).values
        grid_init = grid_init.reshape(self.num_heads, 1, 1, 2).tile(
            [1, self.num_levels, self.num_points, 1])
        # The k-th point (1-based) starts k steps away along the head's direction.
        scaling = torch.arange(1, self.num_points + 1, dtype=torch.float32).reshape(1, 1, -1, 1)
        grid_init *= scaling
        self.sampling_offsets.bias.data[...] = grid_init.flatten()

        # Zero logits -> uniform attention after softmax.
        init.constant_(self.attention_weights.weight, 0)
        init.constant_(self.attention_weights.bias, 0)

        init.xavier_uniform_(self.value_proj.weight)
        init.constant_(self.value_proj.bias, 0)
        init.xavier_uniform_(self.output_proj.weight)
        init.constant_(self.output_proj.bias, 0)

    def forward(self,
                query,
                reference_points,
                value,
                value_spatial_shapes,
                value_mask=None):
        """Run deformable attention.

        Args:
            query: ``[bs, query_length, embed_dim]``.
            reference_points: Last dim 2 -> reshaped to
                ``[bs, query_length, 1, num_levels, 1, 2]`` (point form);
                last dim 4 -> indexed as ``[bs, query_length, L, 4]`` where the
                first two values are the reference position and the last two
                scale the offsets (box form; ``L`` must broadcast against
                ``num_levels``).
            value: ``[bs, value_length, embed_dim]``.
            value_spatial_shapes: Per-level ``(H, W)`` pairs.
            value_mask: Optional ``[bs, value_length]`` mask; it is cast to the
                value dtype and multiplied in, so zero/False entries zero out
                the corresponding values.

        Returns:
            The result of the core function passed through ``output_proj``
            (expected ``[bs, query_length, embed_dim]``).

        Raises:
            ValueError: If the last dim of ``reference_points`` is neither 2 nor 4.
        """
        bs, len_q = query.shape[:2]
        len_v = value.shape[1]

        value = self.value_proj(value)
        if value_mask is not None:
            # torch tensors have no ``astype``; ``to`` performs the dtype cast.
            value_mask = value_mask.to(value.dtype).unsqueeze(-1)
            value = value * value_mask
        value = value.reshape(bs, len_v, self.num_heads, self.head_dim)

        sampling_offsets = self.sampling_offsets(query).reshape(
            bs, len_q, self.num_heads, self.num_levels, self.num_points, 2)

        # Softmax is taken jointly over all levels and points of a head.
        attention_weights = self.attention_weights(query).reshape(
            bs, len_q, self.num_heads, self.num_levels * self.num_points)
        attention_weights = F.softmax(attention_weights, dim=-1).reshape(
            bs, len_q, self.num_heads, self.num_levels, self.num_points)

        ref_dim = reference_points.shape[-1]
        if ref_dim == 2:
            # Build the normalizer on the same device as the offsets to avoid
            # a CPU/GPU mismatch; flip (H, W) -> (W, H) to match (x, y) offsets.
            offset_normalizer = torch.as_tensor(
                value_spatial_shapes, device=sampling_offsets.device)
            offset_normalizer = offset_normalizer.flip([1]).reshape(
                1, 1, 1, self.num_levels, 1, 2)
            sampling_locations = reference_points.reshape(
                bs, len_q, 1, self.num_levels, 1, 2
            ) + sampling_offsets / offset_normalizer
        elif ref_dim == 4:
            # Offsets are scaled by half of the last two box values and by 1/num_points.
            sampling_locations = (
                reference_points[:, :, None, :, None, :2] + sampling_offsets /
                self.num_points * reference_points[:, :, None, :, None, 2:] * 0.5)
        else:
            raise ValueError(
                "Last dim of reference_points must be 2 or 4, but get {} instead.".
                format(ref_dim))
        # sampling_locations: [bs, len_q, num_heads, num_levels, num_points, 2]

        output = self.ms_deformable_attn_core(
            value, value_spatial_shapes, sampling_locations, attention_weights)

        return self.output_proj(output)

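# =========================================================================
# Class: TransformerDecoderLayer (Transformer decoder layer)
# Type: nn.Module subclass
# Role in the pipeline: implements the basic structure of one Transformer
# decoder layer, made of three sub-layers:
#   1. Self-attention: the queries attend to each other
#   2. Cross-attention: the queries gather object features from the encoder memory
#   3. Feed-forward network (FFN): further feature transformation
# Every sub-layer is followed by a residual connection and layer normalization.
# =========================================================================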

class TransformerDecoderLayer(nn.Module):
    """One decoder layer: self-attention, deformable cross-attention, FFN.

    Each sub-layer is applied post-norm: ``x = norm(x + dropout(sublayer(x)))``.
    """

    def __init__(self,
                 d_model=256,
                 n_head=8,
                 dim_feedforward=1024,
                 dropout=0.,
                 activation="relu",
                 n_levels=4,
                 n_points=4):
        """Create the sub-layers.

        Args:
            d_model: Feature dimension of the queries and the memory.
            n_head: Number of heads for both attention modules.
            dim_feedforward: Hidden width of the FFN.
            dropout: Dropout probability used in every sub-layer.
            activation: Name of a function in ``torch.nn.functional``
                (looked up with ``getattr``).
            n_levels: Number of feature levels for the cross-attention.
            n_points: Sampling points per level for the cross-attention.
        """
        super(TransformerDecoderLayer, self).__init__()

        # Self-attention over the queries; batch_first -> inputs are [B, seq, dim].
        self.self_attn = nn.MultiheadAttention(d_model, n_head, dropout=dropout, batch_first=True)
        self.dropout1 = nn.Dropout(dropout)
        self.norm1 = nn.LayerNorm(d_model)

        # Cross-attention from the queries into the encoder memory.
        self.cross_attn = MSDeformableAttention(d_model, n_head, n_levels, n_points)
        self.dropout2 = nn.Dropout(dropout)
        self.norm2 = nn.LayerNorm(d_model)

        # Feed-forward network.
        self.linear1 = nn.Linear(d_model, dim_feedforward)
        self.activation = getattr(F, activation)
        self.dropout3 = nn.Dropout(dropout)
        self.linear2 = nn.Linear(dim_feedforward, d_model)
        self.dropout4 = nn.Dropout(dropout)
        self.norm3 = nn.LayerNorm(d_model)

    def with_pos_embed(self, tensor, pos):
        """Return ``tensor + pos``, or ``tensor`` unchanged when ``pos`` is None."""
        if pos is None:
            return tensor
        return tensor + pos

    def forward_ffn(self, tgt):
        """Apply linear1 -> activation -> dropout3 -> linear2."""
        hidden = self.activation(self.linear1(tgt))
        return self.linear2(self.dropout3(hidden))

    def forward(self,
                tgt,
                reference_points,
                memory,
                memory_spatial_shapes,
                memory_level_start_index,
                attn_mask=None,
                memory_mask=None,
                query_pos_embed=None):
        """Update the queries with one decoder layer.

        Args:
            tgt: Query features ``[B, tgt_len, d_model]``.
            reference_points: Passed unchanged to the cross-attention.
            memory: Encoder features used as cross-attention values.
            memory_spatial_shapes: Per-level spatial shapes of ``memory``.
            memory_level_start_index: Accepted for interface compatibility;
                not used by this layer.
            attn_mask: Optional mask for the self-attention.
            memory_mask: Optional value mask for the cross-attention.
            query_pos_embed: Optional positional embedding added to the
                queries (and self-attention keys), but not to the values.

        Returns:
            Updated query features with the same shape as ``tgt``.
        """
        # ---- self-attention ----
        q = k = self.with_pos_embed(tgt, query_pos_embed)
        attn_out, _ = self.self_attn(q, k, value=tgt, attn_mask=attn_mask)
        tgt = self.norm1(tgt + self.dropout1(attn_out))

        # ---- cross-attention ----
        cross_out = self.cross_attn(
            self.with_pos_embed(tgt, query_pos_embed),
            reference_points,
            memory,
            memory_spatial_shapes,
            memory_mask)
        tgt = self.norm2(tgt + self.dropout2(cross_out))

        # ---- feed-forward ----
        ffn_out = self.forward_ffn(tgt)
        tgt = tgt + self.dropout4(ffn_out)
        # Clamp to the finite float16 range before normalising so overflowed
        # activations do not poison the LayerNorm statistics.
        tgt = self.norm3(tgt.clamp(min=-65504, max=65504))

        return tgt

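# =========================================================================
# Class: TransformerDecoder (Transformer decoder)
# Type: nn.Module subclass
# Role in the pipeline: stacks several TransformerDecoderLayer modules into
# the complete decoder. It runs the layers in order and collects the output
# of each layer for the auxiliary losses. In RT-DETR the decoder iteratively
# refines the bounding-box and class predictions.
# =========================================================================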

class TransformerDecoder(nn.Module):
    """Stack of decoder layers with iterative box refinement."""

    def __init__(self, hidden_dim, decoder_layer, num_layers, eval_idx=-1):
        """Clone ``decoder_layer`` ``num_layers`` times.

        Args:
            hidden_dim: Feature dimension (stored only).
            decoder_layer: Prototype layer; each stacked layer is a deep copy,
                so parameters are not shared.
            num_layers: Number of stacked layers.
            eval_idx: Index of the layer whose output is returned in eval
                mode; negative values count from the end.
        """
        super(TransformerDecoder, self).__init__()
        self.layers = nn.ModuleList([copy.deepcopy(decoder_layer) for _ in range(num_layers)])
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        self.eval_idx = eval_idx if eval_idx >= 0 else num_layers + eval_idx

    def forward(self,
                tgt,
                ref_points_unact,
                memory,
                memory_spatial_shapes,
                memory_level_start_index,
                bbox_head,
                score_head,
                query_pos_head,
                attn_mask=None,
                memory_mask=None):
        """Run the layers and collect per-layer predictions.

        Args:
            tgt: Initial query features.
            ref_points_unact: Initial reference boxes before sigmoid. This
                method does not detach them; detach beforehand if gradients
                must not flow into them.
            memory: Encoder features.
            memory_spatial_shapes: Per-level spatial shapes of ``memory``.
            memory_level_start_index: Forwarded to each layer.
            bbox_head: Indexable collection with one box head per layer.
            score_head: Indexable collection with one score head per layer.
            query_pos_head: Maps reference boxes to query positional embeddings.
            attn_mask: Optional self-attention mask.
            memory_mask: Optional cross-attention value mask.

        Returns:
            ``(boxes, logits)`` stacked along a new leading dimension: one
            entry per layer in training mode, a single entry (layer
            ``eval_idx``) in eval mode.
        """
        output = tgt
        dec_out_bboxes = []
        dec_out_logits = []

        ref_points_detach = F.sigmoid(ref_points_unact)
        # Previous layer's refined boxes *with* gradient; set at the end of each iteration.
        ref_points = None

        for i, layer in enumerate(self.layers):
            # Add a singleton level axis for the cross-attention.
            ref_points_input = ref_points_detach.unsqueeze(2)
            query_pos_embed = query_pos_head(ref_points_detach)

            output = layer(output, ref_points_input, memory,
                           memory_spatial_shapes, memory_level_start_index,
                           attn_mask, memory_mask, query_pos_embed)

            # The head predicts a delta in logit space; evaluate it once and reuse it.
            bbox_delta = bbox_head[i](output)
            inter_ref_bbox = F.sigmoid(bbox_delta + inverse_sigmoid(ref_points_detach))

            if self.training:
                dec_out_logits.append(score_head[i](output))
                if i == 0:
                    dec_out_bboxes.append(inter_ref_bbox)
                else:
                    # Same delta, but relative to the non-detached boxes of the
                    # previous layer so the loss also reaches that layer.
                    dec_out_bboxes.append(F.sigmoid(bbox_delta + inverse_sigmoid(ref_points)))
            elif i == self.eval_idx:
                dec_out_logits.append(score_head[i](output))
                dec_out_bboxes.append(inter_ref_bbox)
                break

            ref_points = inter_ref_bbox
            # In training the next layer's reference is cut off from the graph.
            ref_points_detach = inter_ref_bbox.detach() if self.training else inter_ref_bbox

        return torch.stack(dec_out_bboxes), torch.stack(dec_out_logits)

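# =========================================================================
# Class: RTDETRTransformer (RT-DETR Transformer)
# Type: nn.Module subclass (registered in the model registry via @register)
# Role in the pipeline: the complete RT-DETR Transformer implementation,
# combining Encoder, Decoder, feature projection, positional encoding,
# prediction heads and all other components. It is the core module of the
# RT-DETR detector and produces the final detection results from the
# backbone features.
# =========================================================================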

@register
class RTDETRTransformer(nn.Module):
    """RT-DETR transformer head.

    Projects and flattens multi-level feature maps, selects the top-scoring
    positions as initial queries, refines them with ``TransformerDecoder``
    and returns class logits and boxes (plus auxiliary outputs in training).
    """

    # Presumably tells the registry behind ``register`` which constructor
    # arguments are shared across modules -- TODO confirm against src.core.
    __share__ = ['num_classes']

    def __init__(self,
                 num_classes=80,
                 hidden_dim=256,
                 num_queries=300,
                 position_embed_type='sine',
                 feat_channels=[512, 1024, 2048],
                 feat_strides=[8, 16, 32],
                 num_levels=3,
                 num_decoder_points=4,
                 nhead=8,
                 num_decoder_layers=6,
                 dim_feedforward=1024,
                 dropout=0.,
                 activation="relu",
                 num_denoising=100,
                 label_noise_ratio=0.5,
                 box_noise_scale=1.0,
                 learnt_init_query=False,
                 eval_spatial_size=None,
                 eval_idx=-1,
                 eps=1e-2,
                 aux_loss=True):
        """Build projections, decoder and prediction heads.

        Args:
            num_classes: Number of object classes.
            hidden_dim: Feature dimension used throughout the module.
            num_queries: Number of queries selected from the encoder output.
            position_embed_type: Must be ``'sine'`` or ``'learned'``; only
                validated here, not otherwise used in this class.
            feat_channels: Channel count of each input feature map.
            feat_strides: Stride of each input feature map; extended by
                doubling when ``num_levels`` exceeds its length.
            num_levels: Number of feature levels fed to the decoder.
            num_decoder_points: Sampling points per level in cross-attention.
            nhead: Number of attention heads.
            num_decoder_layers: Number of decoder layers.
            dim_feedforward: FFN hidden width in each decoder layer.
            dropout: Dropout probability in the decoder layers.
            activation: Activation name for the decoder-layer FFN.
            num_denoising: Forwarded to the denoising-group generator; 0
                disables denoising.
            label_noise_ratio: Forwarded to the denoising-group generator.
            box_noise_scale: Forwarded to the denoising-group generator.
            learnt_init_query: Use a learned embedding as initial query
                content instead of the selected encoder features.
            eval_spatial_size: Optional ``(H, W)`` input size; when set,
                anchors are precomputed once and reused in eval mode.
            eval_idx: Decoder layer whose output is used in eval mode.
            eps: Anchors with any coordinate outside ``(eps, 1 - eps)`` are
                marked invalid.
            aux_loss: Whether to return auxiliary outputs in training.
        """
        super(RTDETRTransformer, self).__init__()

        assert position_embed_type in ['sine', 'learned'], \
            f'ValueError: position_embed_type not supported {position_embed_type}!'
        assert len(feat_channels) <= num_levels
        assert len(feat_strides) == len(feat_channels)
        # Work on a copy: extending the argument in place would mutate the
        # shared default list (or the caller's list) across instantiations.
        feat_strides = list(feat_strides)
        for _ in range(num_levels - len(feat_strides)):
            feat_strides.append(feat_strides[-1] * 2)

        self.hidden_dim = hidden_dim
        self.nhead = nhead
        self.feat_strides = feat_strides
        self.num_levels = num_levels
        self.num_classes = num_classes
        self.num_queries = num_queries
        self.eps = eps
        self.num_decoder_layers = num_decoder_layers
        self.eval_spatial_size = eval_spatial_size
        self.aux_loss = aux_loss

        # Per-level projection of the input feature maps to hidden_dim.
        self._build_input_proj_layer(feat_channels)

        # Decoder: one prototype layer, deep-copied inside TransformerDecoder.
        decoder_layer = TransformerDecoderLayer(
            hidden_dim, nhead, dim_feedforward, dropout, activation, num_levels, num_decoder_points)
        self.decoder = TransformerDecoder(hidden_dim, decoder_layer, num_decoder_layers, eval_idx)

        # Denoising configuration.
        self.num_denoising = num_denoising
        self.label_noise_ratio = label_noise_ratio
        self.box_noise_scale = box_noise_scale
        if num_denoising > 0:
            # One extra embedding row (index num_classes) serves as padding.
            self.denoising_class_embed = nn.Embedding(num_classes+1, hidden_dim, padding_idx=num_classes)

        # Query content / position embeddings.
        self.learnt_init_query = learnt_init_query
        if learnt_init_query:
            self.tgt_embed = nn.Embedding(num_queries, hidden_dim)
        # Maps a 4-d reference box to a positional embedding.
        self.query_pos_head = MLP(4, 2 * hidden_dim, hidden_dim, num_layers=2)

        # Heads applied to the encoder memory for query selection.
        self.enc_output = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim),
            nn.LayerNorm(hidden_dim,)
        )
        self.enc_score_head = nn.Linear(hidden_dim, num_classes)
        self.enc_bbox_head = MLP(hidden_dim, hidden_dim, 4, num_layers=3)

        # One score head and one box head per decoder layer.
        self.dec_score_head = nn.ModuleList([
            nn.Linear(hidden_dim, num_classes)
            for _ in range(num_decoder_layers)
        ])
        self.dec_bbox_head = nn.ModuleList([
            MLP(hidden_dim, hidden_dim, 4, num_layers=3)
            for _ in range(num_decoder_layers)
        ])

        # Precompute anchors for a fixed evaluation size.
        if self.eval_spatial_size:
            self.anchors, self.valid_mask = self._generate_anchors()

        self._reset_parameters()

    def _reset_parameters(self):
        """Initialise head biases/weights and Xavier-initialise selected layers."""
        bias = bias_init_with_prob(0.01)

        init.constant_(self.enc_score_head.bias, bias)
        # Zero final box layer: initial box prediction equals the anchor/reference.
        init.constant_(self.enc_bbox_head.layers[-1].weight, 0)
        init.constant_(self.enc_bbox_head.layers[-1].bias, 0)

        for cls_, reg_ in zip(self.dec_score_head, self.dec_bbox_head):
            init.constant_(cls_.bias, bias)
            init.constant_(reg_.layers[-1].weight, 0)
            init.constant_(reg_.layers[-1].bias, 0)

        init.xavier_uniform_(self.enc_output[0].weight)
        if self.learnt_init_query:
            init.xavier_uniform_(self.tgt_embed.weight)
        init.xavier_uniform_(self.query_pos_head.layers[0].weight)
        init.xavier_uniform_(self.query_pos_head.layers[1].weight)

    def _build_input_proj_layer(self, feat_channels):
        """Create ``self.input_proj``: one conv+BN projection per feature level."""
        self.input_proj = nn.ModuleList()
        # 1x1 conv + BN for every provided feature map.
        for in_channels in feat_channels:
            self.input_proj.append(
                nn.Sequential(OrderedDict([
                    ('conv', nn.Conv2d(in_channels, self.hidden_dim, 1, bias=False)),
                    ('norm', nn.BatchNorm2d(self.hidden_dim,))])
                )
            )

        # Extra levels: stride-2 3x3 conv + BN. The first extra level reads the
        # last input feature map, later ones read the previous extra level.
        in_channels = feat_channels[-1]
        for _ in range(self.num_levels - len(feat_channels)):
            self.input_proj.append(
                nn.Sequential(OrderedDict([
                    ('conv', nn.Conv2d(in_channels, self.hidden_dim, 3, 2, padding=1, bias=False)),
                    ('norm', nn.BatchNorm2d(self.hidden_dim))])
                )
            )
            in_channels = self.hidden_dim

    def _get_encoder_input(self, feats):
        """Project, flatten and concatenate the feature maps.

        Args:
            feats: List of 4-D feature maps ``[B, C_i, H_i, W_i]``.

        Returns:
            ``(feat_flatten, spatial_shapes, level_start_index)`` where
            ``feat_flatten`` is ``[B, sum(H_i*W_i), hidden_dim]``,
            ``spatial_shapes`` is a list of ``[H_i, W_i]`` and
            ``level_start_index`` holds the offset of each level in the
            flattened sequence.
        """
        proj_feats = [self.input_proj[i](feat) for i, feat in enumerate(feats)]
        if self.num_levels > len(proj_feats):
            len_srcs = len(proj_feats)
            for i in range(len_srcs, self.num_levels):
                # First extra level is derived from the last raw feature map,
                # subsequent ones from the previously generated level.
                src = feats[-1] if i == len_srcs else proj_feats[-1]
                proj_feats.append(self.input_proj[i](src))

        feat_flatten = []
        spatial_shapes = []
        level_start_index = [0, ]
        for feat in proj_feats:
            _, _, h, w = feat.shape
            # [B, C, h, w] -> [B, h*w, C]
            feat_flatten.append(feat.flatten(2).permute(0, 2, 1))
            spatial_shapes.append([h, w])
            level_start_index.append(h * w + level_start_index[-1])

        feat_flatten = torch.concat(feat_flatten, 1)
        # Drop the trailing total length; only per-level start offsets remain.
        level_start_index.pop()

        return (feat_flatten, spatial_shapes, level_start_index)

    def _generate_anchors(self,
                          spatial_shapes=None,
                          grid_size=0.05,
                          dtype=torch.float32,
                          device='cpu'):
        """Generate one anchor box per feature-map cell, in logit space.

        Args:
            spatial_shapes: Per-level ``[H, W]``; derived from
                ``eval_spatial_size`` and ``feat_strides`` when None.
            grid_size: Base anchor width/height (normalised), doubled per level.
            dtype: Dtype of the generated grid.
            device: Device the result is moved to.

        Returns:
            ``(anchors, valid_mask)``: ``anchors`` is ``[1, N, 4]`` holding
            ``log(a / (1 - a))`` of the normalised (cx, cy, w, h), with invalid
            rows set to ``inf``; ``valid_mask`` is ``[1, N, 1]`` and is True
            where all four values lie strictly inside ``(eps, 1 - eps)``.
        """
        if spatial_shapes is None:
            spatial_shapes = [[int(self.eval_spatial_size[0] / s), int(self.eval_spatial_size[1] / s)]
                for s in self.feat_strides
            ]

        anchors = []
        for lvl, (h, w) in enumerate(spatial_shapes):
            grid_y, grid_x = torch.meshgrid(
                torch.arange(end=h, dtype=dtype),
                torch.arange(end=w, dtype=dtype), indexing='ij')
            grid_xy = torch.stack([grid_x, grid_y], -1)
            valid_WH = torch.tensor([w, h]).to(dtype)
            # Cell centres normalised to (0, 1).
            grid_xy = (grid_xy.unsqueeze(0) + 0.5) / valid_WH
            wh = torch.ones_like(grid_xy) * grid_size * (2.0 ** lvl)
            anchors.append(torch.concat([grid_xy, wh], -1).reshape(-1, h * w, 4))

        anchors = torch.concat(anchors, 1).to(device)
        valid_mask = ((anchors > self.eps) * (anchors < 1 - self.eps)).all(-1, keepdim=True)
        # Inverse sigmoid, so the box head can add its prediction in logit space.
        anchors = torch.log(anchors / (1 - anchors))
        anchors = torch.where(valid_mask, anchors, torch.inf)

        return anchors, valid_mask

    def _get_decoder_input(self,
                           memory,
                           spatial_shapes,
                           denoising_class=None,
                           denoising_bbox_unact=None):
        """Select the initial queries and reference boxes for the decoder.

        Args:
            memory: Flattened features ``[B, N, hidden_dim]``.
            spatial_shapes: Per-level ``[H, W]`` used to generate anchors.
            denoising_class: Optional denoising query content, prepended to
                the selected queries.
            denoising_bbox_unact: Optional denoising boxes (pre-sigmoid),
                prepended to the selected reference boxes.

        Returns:
            ``(target, reference_points_unact, enc_topk_bboxes, enc_topk_logits)``;
            the reference points are returned detached.
        """
        bs, _, _ = memory.shape

        # Anchors: generated on the fly in training or when no fixed eval
        # size is configured, otherwise the precomputed ones are reused.
        if self.training or self.eval_spatial_size is None:
            anchors, valid_mask = self._generate_anchors(spatial_shapes, device=memory.device)
        else:
            anchors, valid_mask = self.anchors.to(memory.device), self.valid_mask.to(memory.device)

        # Zero the features at invalid anchor positions.
        memory = valid_mask.to(memory.dtype) * memory

        output_memory = self.enc_output(memory)

        enc_outputs_class = self.enc_score_head(output_memory)
        enc_outputs_coord_unact = self.enc_bbox_head(output_memory) + anchors

        # Rank positions by their best class logit and keep the top num_queries.
        _, topk_ind = torch.topk(enc_outputs_class.max(-1).values, self.num_queries, dim=1)

        reference_points_unact = enc_outputs_coord_unact.gather(
            dim=1, index=topk_ind.unsqueeze(-1).repeat(1, 1, enc_outputs_coord_unact.shape[-1]))

        # Taken before the denoising boxes are prepended: selected queries only.
        enc_topk_bboxes = F.sigmoid(reference_points_unact)
        if denoising_bbox_unact is not None:
            reference_points_unact = torch.concat(
                [denoising_bbox_unact, reference_points_unact], 1)

        enc_topk_logits = enc_outputs_class.gather(
            dim=1, index=topk_ind.unsqueeze(-1).repeat(1, 1, enc_outputs_class.shape[-1]))

        if self.learnt_init_query:
            target = self.tgt_embed.weight.unsqueeze(0).tile([bs, 1, 1])
        else:
            target = output_memory.gather(
                dim=1, index=topk_ind.unsqueeze(-1).repeat(1, 1, output_memory.shape[-1]))
            # Query content taken from the memory is cut off from the graph.
            target = target.detach()

        if denoising_class is not None:
            target = torch.concat([denoising_class, target], 1)

        return target, reference_points_unact.detach(), enc_topk_bboxes, enc_topk_logits

    def forward(self, feats, targets=None):
        """Predict class logits and boxes from multi-level features.

        Args:
            feats: List of 4-D feature maps.
            targets: Ground-truth annotations, only forwarded to the
                denoising-group generator in training (format defined by that
                helper, not visible here).

        Returns:
            Dict with ``'pred_logits'`` and ``'pred_boxes'`` (last decoder
            output). In training with ``aux_loss`` it also contains
            ``'aux_outputs'`` and, when denoising is active,
            ``'dn_aux_outputs'`` and ``'dn_meta'``.
        """
        (memory, spatial_shapes, level_start_index) = self._get_encoder_input(feats)

        # Denoising queries are only generated in training.
        if self.training and self.num_denoising > 0:
            denoising_class, denoising_bbox_unact, attn_mask, dn_meta = \
                get_contrastive_denoising_training_group(
                    targets,
                    self.num_classes,
                    self.num_queries,
                    self.denoising_class_embed,
                    num_denoising=self.num_denoising,
                    label_noise_ratio=self.label_noise_ratio,
                    box_noise_scale=self.box_noise_scale,
                )
        else:
            denoising_class, denoising_bbox_unact, attn_mask, dn_meta = None, None, None, None

        target, init_ref_points_unact, enc_topk_bboxes, enc_topk_logits = \
            self._get_decoder_input(memory, spatial_shapes, denoising_class, denoising_bbox_unact)

        out_bboxes, out_logits = self.decoder(
            target,
            init_ref_points_unact,
            memory,
            spatial_shapes,
            level_start_index,
            self.dec_bbox_head,
            self.dec_score_head,
            self.query_pos_head,
            attn_mask=attn_mask)

        has_dn = self.training and dn_meta is not None
        if has_dn:
            # Split along the query axis into denoising part and regular part.
            dn_out_bboxes, out_bboxes = torch.split(
                out_bboxes, dn_meta['dn_num_split'], dim=2)
            dn_out_logits, out_logits = torch.split(
                out_logits, dn_meta['dn_num_split'], dim=2)

        out = {'pred_logits': out_logits[-1], 'pred_boxes': out_bboxes[-1]}

        if self.training and self.aux_loss:
            # All but the last decoder output, followed by the encoder top-k prediction.
            out['aux_outputs'] = self._set_aux_loss(out_logits[:-1], out_bboxes[:-1])
            out['aux_outputs'].extend(self._set_aux_loss([enc_topk_logits], [enc_topk_bboxes]))

            if has_dn:
                out['dn_aux_outputs'] = self._set_aux_loss(dn_out_logits, dn_out_bboxes)
                out['dn_meta'] = dn_meta

        return out

    @torch.jit.unused
    def _set_aux_loss(self, outputs_class, outputs_coord):
        """Pair up per-layer logits and boxes as a list of dicts."""
        # this is a workaround to make torchscript happy, as torchscript
        # doesn't support dictionary with non-homogeneous values, such
        # as a dict having both a Tensor and a list.
        return [{'pred_logits': a, 'pred_boxes': b}
                for a, b in zip(outputs_class, outputs_coord)]