fix xpu gather for unified ckpt #8710

Merged
merged 1 commit into from Jul 5, 2024
13 changes: 10 additions & 3 deletions paddlenlp/trainer/plugins/unified_checkpoint.py
@@ -41,7 +41,7 @@
     get_checkpoint_shard_files,
     is_safetensors_available,
 )
-from paddlenlp.utils.distributed import distributed_gather
+from paddlenlp.utils.distributed import distributed_allgather, distributed_gather
 from paddlenlp.utils.env import (
     LORA_WEIGHTS_NAME,
     PADDLE_MASTER_WEIGHTS_INDEX_NAME,
@@ -64,6 +64,7 @@
 )
 from paddlenlp.utils.log import logger
 from paddlenlp.utils.nested import nested_copy, nested_copy_place
+from paddlenlp.utils.tools import get_env_device

 if is_safetensors_available():
     # from safetensors import safe_open
@@ -1753,7 +1754,10 @@
             key = filter_keys[i]
             tensor = state_dict[key]
             if key in tp_actions:
-                ret = distributed_gather(tensor, dst=j, group=tp_group, offload=False)
+                if get_env_device() == "xpu":
+                    ret = distributed_allgather(tensor, group=tp_group, offload=False)
+                else:
+                    ret = distributed_gather(tensor, dst=j, group=tp_group, offload=False)
                 action = tp_actions.pop(key)
                 tensor = action(ret) if is_dst else None
             else:
Expand Down Expand Up @@ -1790,7 +1794,10 @@
if tensor.numel().item() == 1:
tensor = tensor._copy_to(DEST_PLACE, False) if is_dst else None # Need broadcast when loaded
else:
ret = distributed_gather(tensor, dst=j, group=tp_group, offload=False)
if get_env_device() == "xpu":
ret = distributed_allgather(tensor, group=tp_group, offload=False)

+                    else:
+                        ret = distributed_gather(tensor, dst=j, group=tp_group, offload=False)
                     action = tp_actions[model_key]
                     tensor = action(ret) if is_dst else None
             else:
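Taken together, the change keeps the existing distributed_gather call on GPU/CPU and switches to distributed_allgather only when get_env_device() reports "xpu". Below is a minimal sketch of that dispatch, assuming the PaddleNLP helpers imported in the hunks above; the wrapper name gather_tensor_shards is hypothetical, since the PR inlines the device check at each call site rather than adding a helper.

from paddlenlp.utils.distributed import distributed_allgather, distributed_gather
from paddlenlp.utils.tools import get_env_device


def gather_tensor_shards(tensor, dst, tp_group):
    # Hypothetical wrapper mirroring the inlined check this PR adds.
    if get_env_device() == "xpu":
        # XPU path: every rank in the tensor-parallel group runs an
        # all-gather and receives the full set of shards (no dst rank).
        return distributed_allgather(tensor, group=tp_group, offload=False)
    # Default path: shards are gathered onto rank `dst` only; the result is
    # consumed only where `is_dst` is true in the surrounding code.
    return distributed_gather(tensor, dst=dst, group=tp_group, offload=False)

The all-gather variant stays compatible with the surrounding logic: each call site still assigns tensor = action(ret) if is_dst else None, so the extra shard copies that non-destination ranks receive on XPU are simply discarded after the merge.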