| from importlib import import_module |
| from typing import Tuple |
|
|
| import torch |
| import transformers |
| from torch import nn |
| from torch.nn import functional as F |
|
|
| __all__ = ["patch"] |
|
|
|
|
def _get_unpad_data(attention_mask: torch.Tensor, *args, **kwargs) -> Tuple[torch.Tensor, torch.Tensor, int]:
    """Derive unpadding metadata from an attention mask.

    The per-sequence lengths come from the ``seqlens_in_batch`` attribute
    stored on this function object when that attribute exists; otherwise
    they are the sums of ``attention_mask`` along dimension 1.

    Args:
        attention_mask: Mask whose non-zero entries are kept. It is summed
            along dim 1, so it needs at least two dimensions
            (presumably ``(batch, seq_len)`` -- confirm against callers).
        *args: Accepted and ignored.
        **kwargs: Accepted and ignored.

    Returns:
        A tuple ``(indices, cu_seqlens, max_seqlen_in_batch)``:
        the flat positions of the non-zero entries of the flattened mask,
        the int32 cumulative sum of the lengths with a single leading zero,
        and the largest length as a Python number.
    """
    # Positions are always taken from the mask itself, even when the
    # lengths are supplied through the function attribute.
    flat_mask = attention_mask.flatten()
    token_positions = torch.nonzero(flat_mask, as_tuple=False).flatten()

    try:
        lengths = _get_unpad_data.seqlens_in_batch
    except AttributeError:
        # No override stored: fall back to counting mask entries per row.
        lengths = torch.sum(attention_mask, dim=1)

    running_total = torch.cumsum(lengths, dim=0, dtype=torch.int32)
    # Prepend one zero so entry i is the start offset of sequence i.
    boundaries = F.pad(running_total, (1, 0))
    longest = lengths.max().item()
    return token_positions, boundaries, longest
|
|
|
|
def set_seqlens_in_batch(seqlens_in_batch: torch.Tensor) -> None:
    """Register sequence lengths for ``_get_unpad_data`` to use.

    The value is stored as the ``seqlens_in_batch`` attribute on the
    ``_get_unpad_data`` function object, where that function looks for it
    before falling back to summing the attention mask. Nothing in this
    file removes the attribute, so it stays in effect until this function
    is called again with a new value.

    Args:
        seqlens_in_batch: Lengths to store; kept by reference, not copied.
    """
    setattr(_get_unpad_data, "seqlens_in_batch", seqlens_in_batch)
|
|
|
|
def _major_minor(version: str) -> Tuple[int, int]:
    """Return the leading ``(major, minor)`` integers of a dotted version string."""
    numbers = []
    for field in version.split(".")[:2]:
        # Keep only the leading digits so suffixes such as "43rc1" still parse.
        digits = ""
        for char in field:
            if not char.isdigit():
                break
            digits += char
        numbers.append(int(digits) if digits else 0)
    # Pad missing components (e.g. a bare "4") with zeros.
    numbers += [0] * (2 - len(numbers))
    return numbers[0], numbers[1]


def patch(model: nn.Module) -> None:
    """Install this file's ``_get_unpad_data`` in place of the library's.

    For ``transformers`` releases older than 4.43 the function is replaced
    on the module that defines ``model``'s class; for 4.43 and newer it is
    replaced on ``transformers.modeling_flash_attention_utils``.

    Args:
        model: Model whose defining module is patched on older releases.
            It is not used on 4.43 and newer.

    Raises:
        ValueError: On releases older than 4.43, if the module defining
            ``model`` has no ``_get_unpad_data`` attribute to replace.
    """
    # Compare numerically: as plain strings "4.100.0" sorts before "4.43.0"
    # and "4.9.0" sorts after it, which would select the wrong branch.
    if _major_minor(transformers.__version__) < (4, 43):
        m = import_module(model.__module__)
        if not hasattr(m, "_get_unpad_data"):
            raise ValueError(f"Module {m} does not have function '_get_unpad_data' for packing")
        m._get_unpad_data = _get_unpad_data
    else:
        transformers.modeling_flash_attention_utils._get_unpad_data = _get_unpad_data
|
|