diff --git a/CMakeLists.txt b/CMakeLists.txt
index d10890e77dc91e677c601735dd7c1e314e50106d..94444027fd15bd1b5b4f3070dd3cebb9771b80ff 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -138,6 +138,7 @@ include_directories(${PROJECT_SOURCE_DIR})
 include_directories(${PROJECT_SOURCE_DIR}/torch_npu/csrc/aten)
 include_directories(${PROJECT_SOURCE_DIR}/third_party/hccl/inc)
 include_directories(${PROJECT_SOURCE_DIR}/third_party/acl/inc)
+include_directories(${PROJECT_SOURCE_DIR}/patch/include)
 
 # Set installed PyTorch dir
 if(DEFINED PYTORCH_INSTALL_DIR)
diff --git a/patch/include/c10d/comm.hpp b/patch/include/c10d/comm.hpp
new file mode 100644
index 0000000000000000000000000000000000000000..3a39baccc9532569c6833a5fc927a7135453c606
--- /dev/null
+++ b/patch/include/c10d/comm.hpp
@@ -0,0 +1,125 @@
+#pragma once
+
+#include <ATen/ATen.h>
+#include <ATen/core/ivalue.h>
+#include <c10d/ProcessGroup.hpp>
+
+namespace c10d {
+
+// Broadcast many tensors to all processes in the process group.
+void broadcast_coalesced(
+    c10::intrusive_ptr<ProcessGroup> process_group,
+    at::TensorList tensors,
+    size_t buffer_size,
+    int rank = 0);
+
+// This class passes bucket contents tensor (for multiple replicas) to
+// DDP communication hook.
+// Optionally in the future this can be enhanced with parameter to bucket
+// mappings as well.
+class GradBucket {
+ public:
+  explicit GradBucket(
+      size_t index,
+      const std::vector<at::Tensor>& tensors,
+      const std::vector<size_t>& offsets = {},
+      const std::vector<size_t>& lengths = {},
+      const std::vector<c10::IntArrayRef>& sizes_vec = {})
+      : index_(index),
+        tensors_(tensors),
+        offsets_(offsets),
+        lengths_(lengths),
+        sizes_vec_(sizes_vec) {}
+
+  // Returns the index of the bucket, which is unique across all the buckets.
+  size_t getIndex() const {
+    return index_;
+  }
+
+  // Each tensor in the list that getTensors returns refers to the replica on
+  // each device. There will be multiple replicas only in the case of single
+  // process multiple device mode. In the single process single device mode,
+  // this list would consist of only a single tensor.
+  const std::vector<at::Tensor>& getTensors() const {
+    return tensors_;
+  }
+
+  // Returns a mutable tensor vector compared with the above method.
+  std::vector<at::Tensor>& getTensorsRef() {
+    return tensors_;
+  }
+
+  // Returns the start index of each variable in tensors_[0].
+  const std::vector<size_t>& getOffsets() const {
+    return offsets_;
+  }
+
+  // Returns the total (i.e., flattened) length of each variable in
+  // tensors_[0].
+  const std::vector<size_t>& getLengths() const {
+    return lengths_;
+  }
+
+  // Returns the multi-dimensional sizes/shape of each variable in tensors_[0].
+  const std::vector<c10::IntArrayRef>& getSizesVec() const {
+    return sizes_vec_;
+  }
+
+ private:
+  size_t index_;
+  std::vector<at::Tensor> tensors_;
+
+  // Per-variable info in tensors_[0].
+  std::vector<size_t> offsets_;
+  std::vector<size_t> lengths_;
+  std::vector<c10::IntArrayRef> sizes_vec_;
+};
+
+// Base class of both `PythonCommHook` and `CppCommHook`.
+// Requires implementing 1) `runHook` method that communicates gradients
+// asynchronously, and 2) `parseHookResult` method that converts the hook
+// result into a tensor vector.
+class TORCH_PYTHON_API CommHookInterface {
+ public:
+  virtual ~CommHookInterface() {}
+
+  // Passes the input grad bucket to the registered communication hook.
+  // Once the tensors in the bucket are ready, kicks off the hook asynchronously
+  // and returns a future that holds the communication results.
+  virtual c10::intrusive_ptr<c10::ivalue::Future> runHook(
+      GradBucket& bucket) = 0;
+
+  // Returns the resulting tensors once the communication hook result is
+  // ready. The resulting tensors will then be copied to the grads of
+  // individual parameters.
+  virtual std::vector<at::Tensor> parseHookResult(
+      const c10::IValue& result) = 0;
+};
+
+// This CppCommHook interface only requires implementing runHook method that
+// potentially uses a state.
+// Still need TORCH_PYTHON_API instead of TORCH_API to support Windows platform.
+template <typename T>
+class TORCH_PYTHON_API CppCommHookInterface : public CommHookInterface {
+ public:
+  explicit CppCommHookInterface(T& state) : state_(state) {}
+
+  virtual ~CppCommHookInterface() {}
+
+  std::vector<at::Tensor> parseHookResult(const c10::IValue& result) override {
+    TORCH_INTERNAL_ASSERT(
+        result.isTensor() || result.isTensorList(),
+        "expected the hook result is either a Tensor or a TensorList");
+
+    if (result.isTensor()) {
+      return {result.toTensor()};
+    }
+
+    return result.toTensorVector();
+  }
+
+ protected:
+  T state_; // Not owned.
+};
+
+} // namespace c10d
diff --git a/patch/include/c10d/default_comm_hooks.hpp b/patch/include/c10d/default_comm_hooks.hpp
new file mode 100644
index 0000000000000000000000000000000000000000..077d29bd977de9563e6824497bb20fd89374f187
--- /dev/null
+++ b/patch/include/c10d/default_comm_hooks.hpp
@@ -0,0 +1,33 @@
+#pragma once
+
+#include <c10d/ProcessGroup.hpp>
+#include <c10d/comm.hpp>
+
+namespace c10d {
+
+enum class BuiltinCommHookType {
+  ALLREDUCE = 1,
+  FP16_COMPRESS = 2,
+};
+
+class AllReduceCommHook : public CppCommHookInterface<ProcessGroup*> {
+ public:
+  explicit AllReduceCommHook(ProcessGroup* state)
+      : CppCommHookInterface<ProcessGroup*>(state) {}
+
+  ~AllReduceCommHook() override {}
+
+  c10::intrusive_ptr<c10::ivalue::Future> runHook(GradBucket& bucket) override;
+};
+
+class FP16CompressCommHook : public CppCommHookInterface<ProcessGroup*> {
+ public:
+  explicit FP16CompressCommHook(ProcessGroup* state)
+      : CppCommHookInterface<ProcessGroup*>(state) {}
+
+  ~FP16CompressCommHook() override {}
+
+  c10::intrusive_ptr<c10::ivalue::Future> runHook(GradBucket& bucket) override;
+};
+
+} // namespace c10d
diff --git a/patch/include/c10d/frontend.hpp b/patch/include/c10d/frontend.hpp
new file mode 100644
index 0000000000000000000000000000000000000000..d91ac3cb674eaccd448d7ba3f9cac2f0e4d59755
--- /dev/null
+++ b/patch/include/c10d/frontend.hpp
@@ -0,0 +1,261 @@
+#pragma once
+
+#include <c10d/ProcessGroup.hpp>
+#include <c10d/Store.hpp>
+#include <c10d/Types.hpp>
+#include <torch/custom_class.h>
+
+#include <chrono>
+#include <sstream>
+#include <unordered_map>
+#include <unordered_set>
+
+namespace c10d {
+
+#ifdef USE_C10D_GLOO
+constexpr char* GLOO_SOCKET_IFNAME_ENV = "GLOO_SOCKET_IFNAME";
+#endif
+
+inline std::vector<std::string> split(
+    char separator,
+    const std::string& string) {
+  std::vector<std::string> pieces;
+  std::stringstream ss(string);
+  std::string item;
+  while (std::getline(ss, item, separator)) {
+    pieces.push_back(std::move(item));
+  }
+  return pieces;
+}
+
+class Backend {
+ public:
+  // Maps to Backend.__new__ in Python.
+  static std::string get(const std::string&);
+
+  // TODO: How to support registering third_party backend?
+  static void registerBackend();
+
+ private:
+  // TODO: Should this be an enum list instead since this set doesn't
+  // change at all.
+  std::unordered_set<std::string> registered_backends_;
+};
+
+class TORCH_PYTHON_API DistributedC10d : public torch::CustomClassHolder {
+ public:
+  static c10::intrusive_ptr<DistributedC10d> get();
+
+  DistributedC10d() = default;
+
+  void initProcessGroup(
+      const std::string& backend,
+      const std::string& init_method,
+      const std::chrono::milliseconds& timeout,
+      int64_t world_size,
+      int64_t rank,
+      c10::intrusive_ptr<Store> store,
+      const std::string& group_name);
+
+  void destroyProcessGroup(c10::intrusive_ptr<ProcessGroup> group);
+  int64_t getRank(const c10::intrusive_ptr<ProcessGroup>& group) const;
+  int64_t getWorldSize(const c10::intrusive_ptr<ProcessGroup>& group) const;
+
+  c10::intrusive_ptr<ProcessGroup::Work> isend(
+      at::Tensor tensor,
+      int64_t dst,
+      const c10::intrusive_ptr<ProcessGroup>& group,
+      c10::optional<int64_t>& tag);
+
+  c10::intrusive_ptr<ProcessGroup::Work> irecv(
+      at::Tensor tensor,
+      int64_t src,
+      const c10::intrusive_ptr<ProcessGroup>& group,
+      c10::optional<int64_t>& tag);
+
+  void send(
+      at::Tensor tensor,
+      int64_t dst,
+      const c10::intrusive_ptr<ProcessGroup>& group,
+      c10::optional<int64_t>& tag);
+
+  int64_t recv(
+      at::Tensor tensor,
+      const c10::optional<int64_t>& src,
+      const c10::intrusive_ptr<ProcessGroup>& group,
+      c10::optional<int64_t>& tag);
+
+  c10::intrusive_ptr<ProcessGroup::Work> broadcastMultiGPU(
+      std::vector<at::Tensor>& tensor_list,
+      int64_t src,
+      const c10::intrusive_ptr<ProcessGroup>& group,
+      bool async_op = false,
+      int64_t src_tensor = 0);
+
+  c10::intrusive_ptr<ProcessGroup::Work> broadcast(
+      at::Tensor tensor,
+      int64_t src,
+      const c10::intrusive_ptr<ProcessGroup>& group,
+      bool async_op = false);
+
+  c10::intrusive_ptr<ProcessGroup::Work> allReduceMultiGPU(
+      std::vector<at::Tensor>& tensor_list,
+      const c10::intrusive_ptr<ProcessGroup>& group,
+      ReduceOp op = ReduceOp::SUM,
+      bool async_op = false);
+
+  c10::intrusive_ptr<ProcessGroup::Work> allReduce(
+      at::Tensor tensor,
+      const c10::intrusive_ptr<ProcessGroup>& group,
+      ReduceOp op = ReduceOp::SUM,
+      bool async_op = false);
+
+  c10::intrusive_ptr<ProcessGroup::Work> allReduceCoalesced(
+      std::vector<at::Tensor>& tensors,
+      const c10::intrusive_ptr<ProcessGroup>& group,
+      ReduceOp op = ReduceOp::SUM,
+      bool async_op = false);
+
+  c10::intrusive_ptr<ProcessGroup::Work> reduceMultiGPU(
+      std::vector<at::Tensor>& tensor_list,
+      int64_t dst,
+      const c10::intrusive_ptr<ProcessGroup>& group,
+      ReduceOp op = ReduceOp::SUM,
+      bool async_op = false,
+      int64_t dst_tensor = 0);
+
+  c10::intrusive_ptr<ProcessGroup::Work> reduce(
+      at::Tensor tensor,
+      int64_t dst,
+      const c10::intrusive_ptr<ProcessGroup>& group,
+      ReduceOp op = ReduceOp::SUM,
+      bool async_op = false);
+
+  c10::intrusive_ptr<ProcessGroup::Work> allGatherMultiGPU(
+      std::vector<std::vector<at::Tensor>>& output_tensor_lists,
+      std::vector<at::Tensor>& input_tensor_list,
+      const c10::intrusive_ptr<ProcessGroup>& group,
+      bool async_op = false);
+
+  c10::intrusive_ptr<ProcessGroup::Work> allGather(
+      std::vector<at::Tensor>& tensor_list,
+      at::Tensor tensor,
+      const c10::intrusive_ptr<ProcessGroup>& group,
+      bool async_op = false);
+
+  c10::intrusive_ptr<ProcessGroup::Work> allGatherCoalesced(
+      std::vector<std::vector<at::Tensor>>& output_tensor_lists,
+      std::vector<at::Tensor>& input_tensor_list,
+      const c10::intrusive_ptr<ProcessGroup>& group,
+      bool async_op = false);
+
+  c10::intrusive_ptr<ProcessGroup::Work> gather(
+      at::Tensor tensor,
+      const c10::optional<std::vector<at::Tensor>>& gather_list,
+      const c10::intrusive_ptr<ProcessGroup>& group,
+      int64_t dst = 0,
+      bool async_op = false);
+
+  c10::intrusive_ptr<ProcessGroup::Work> scatter(
+      at::Tensor tensor,
+      std::vector<at::Tensor>& scatter_list,
+      const c10::intrusive_ptr<ProcessGroup>& group,
+      int64_t src = 0,
+      bool async_op = false);
+
+  c10::intrusive_ptr<ProcessGroup::Work> reduceScatterMultiGPU(
+      std::vector<at::Tensor>& output_tensor_list,
+      std::vector<std::vector<at::Tensor>>& input_tensor_lists,
+      const c10::intrusive_ptr<ProcessGroup>& group,
+      ReduceOp op = ReduceOp::SUM,
+      bool async_op = false);
+
+  c10::intrusive_ptr<ProcessGroup::Work> reduceScatter(
+      at::Tensor output,
+      std::vector<at::Tensor>& input_tensor_list,
+      const c10::intrusive_ptr<ProcessGroup>& group,
+      ReduceOp op = ReduceOp::SUM,
+      bool async_op = false);
+
+  c10::intrusive_ptr<ProcessGroup::Work> allToAllSingle(
+      at::Tensor output,
+      at::Tensor input,
+      std::vector<int64_t>& output_split_sizes,
+      std::vector<int64_t>& input_split_sizes,
+      const c10::intrusive_ptr<ProcessGroup>& group,
+      bool async_op = false);
+
+  c10::intrusive_ptr<ProcessGroup::Work> allToAll(
+      std::vector<at::Tensor>& output_tensor_list,
+      std::vector<at::Tensor>& input_tensor_list,
+      const c10::intrusive_ptr<ProcessGroup>& group,
+      bool async_op = false);
+
+  c10::intrusive_ptr<ProcessGroup::Work> barrier(
+      const c10::intrusive_ptr<ProcessGroup>& group,
+      bool async_op = false);
+
+  c10::intrusive_ptr<ProcessGroup> newGroup(
+      std::vector<int64_t> ranks,
+      std::chrono::milliseconds timeout,
+      Backend backend);
+
+  c10::intrusive_ptr<ProcessGroup> worldProcessGroup();
+
+  c10::intrusive_ptr<ProcessGroup> newProcessGroupHelper(
+      const int64_t world_size,
+      const int64_t rank,
+      const std::vector<int64_t>& group_ranks,
+      const std::string& backend_str,
+      const c10::intrusive_ptr<Store>& store,
+      c10::optional<std::string> group_name,
+      int64_t timeout_milliseconds);
+
+  c10::intrusive_ptr<ProcessGroup> getProcessGroupByName(
+      const std::string& name) const;
+
+  std::string getNameOfProcessGroup(
+      const c10::intrusive_ptr<ProcessGroup>& pg) const;
+
+  void registerProcessGroupName(const c10::intrusive_ptr<ProcessGroup>& process_group, const std::string& name);
+
+ private:
+
+  bool rankNotInGroup(const c10::intrusive_ptr<ProcessGroup>& group) const;
+  int64_t getGroupRank(
+      const c10::intrusive_ptr<ProcessGroup>& group,
+      const int64_t rank) const;
+  int64_t getGlobalRank(
+      const c10::intrusive_ptr<ProcessGroup>& group,
+      const int64_t group_rank) const;
+  void checkDefaultPg() const;
+  int64_t getGroupSize(const c10::intrusive_ptr<ProcessGroup>& group) const;
+  std::string getBackend(const c10::intrusive_ptr<ProcessGroup>& group);
+
+  std::string backend_;
+  // TODO: Ask Alex what kind of equality we need. It determines whether we
+  // need to use ProcessGroup or ProcessGroup* as key.
+  std::unordered_map<
+      c10::intrusive_ptr<ProcessGroup>,
+      std::pair<std::string, c10::intrusive_ptr<Store>>>
+      pg_map_;
+
+  // Note, this is a different mapping relationship than the original Python
+  // implementation.
+  std::unordered_map<c10::intrusive_ptr<ProcessGroup>, std::string> pg_names_;
+
+  // Process group's global rank to local rank mapping
+  std::unordered_map<
+      c10::intrusive_ptr<ProcessGroup>,
+      std::unordered_map<int64_t, int64_t>>
+      pg_group_ranks_;
+
+  c10::intrusive_ptr<ProcessGroup> default_pg_;
+
+  // Default value should be "env://"
+  std::string default_pg_init_method_;
+
+  int64_t group_count_;
+};
+
+} // namespace c10d