:github_url: https://github.com/pytorch/pytorch

.. _program_listing_file_c10_xpu_XPUStream.h:

Program Listing for File XPUStream.h
====================================

|exhale_lsh| :ref:`Return to documentation for file <file_c10_xpu_XPUStream.h>` (``c10/xpu/XPUStream.h``)

.. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS

.. code-block:: cpp

   #pragma once

   #include <c10/core/Stream.h>
   #include <c10/core/impl/GPUTrace.h>
   #include <c10/xpu/XPUFunctions.h>

   namespace c10::xpu {

   /*
    * Note [Stream Management]
    *
    * An XPUStream is an abstraction of an actual SYCL queue in which SYCL
    * kernels can execute. Currently, there are several pools per device to
    * manage SYCL queues, and a device's pools are lazily created.
    *
    * There are two pools per device. The first pool contains "normal priority"
    * queues. The second pool is the "high priority" queues. There are 32 queues
    * per pool per device, and when a queue is requested one of these queues is
    * returned round-robin. That is, the first queue requested is at index 0, the
    * second at index 1... to index 31, then index 0 again.
    *
    * This means that if 33 queues are requested, the first and last queues
    * requested are actually the same queue (under the covers) and kernels
    * enqueued on them cannot run concurrently.
    *
    * It is safe to enqueue a kernel on the same queue from two different
    * threads, as described by the SYCL specification.
    */
   static constexpr int max_compile_time_stream_priorities = 3;

   /*
    * This serves as a wrapper around c10::Stream and acts as a representation
    * for a SYCL queue, which allows asynchronous execution of XPU tasks.
    */
   class C10_XPU_API XPUStream {
    public:
     enum Unchecked { UNCHECKED };

     explicit XPUStream(Stream stream) : stream_(stream) {
       TORCH_CHECK(stream_.device_type() == DeviceType::XPU);
     }

     explicit XPUStream(Unchecked, Stream stream) : stream_(stream) {}

     bool operator==(const XPUStream& other) const noexcept {
       return unwrap() == other.unwrap();
     }

     bool operator!=(const XPUStream& other) const noexcept {
       return unwrap() != other.unwrap();
     }

     operator sycl::queue&() const {
       return queue();
     }

     operator sycl::queue*() const {
       return &queue();
     }

     operator Stream() const {
       return unwrap();
     }

     DeviceType device_type() const {
       return DeviceType::XPU;
     }

     DeviceIndex device_index() const {
       return stream_.device_index();
     }

     Device device() const {
       return Device(DeviceType::XPU, device_index());
     }

     StreamId id() const {
       return stream_.id();
     }

     bool query() const {
       return queue().ext_oneapi_empty();
     }

     void synchronize() const {
       queue().wait_and_throw();
       const c10::impl::PyInterpreter* interp = c10::impl::GPUTrace::get_trace();
       if (C10_UNLIKELY(interp)) {
         (*interp)->trace_gpu_stream_synchronization(
             c10::kXPU, reinterpret_cast<uintptr_t>(&queue()));
       }
     }

     int priority() const;

     sycl::queue& queue() const;

     Stream unwrap() const {
       return stream_;
     }

     struct c10::StreamData3 pack3() const {
       return stream_.pack3();
     }

     static XPUStream unpack3(
         StreamId stream_id,
         DeviceIndex device_index,
         DeviceType device_type) {
       return XPUStream(Stream::unpack3(stream_id, device_index, device_type));
     }

     static std::tuple<int, int> priority_range() {
       // See Note [XPU Stream priorities]
       return std::make_tuple(1, -max_compile_time_stream_priorities + 2);
     }

    private:
     Stream stream_;
   };

   C10_XPU_API XPUStream
   getStreamFromPool(const bool isHighPriority = false, DeviceIndex device = -1);

   C10_XPU_API XPUStream
   getStreamFromPool(const int priority, DeviceIndex device = -1);

   C10_XPU_API XPUStream getStreamFromExternal(
       sycl::queue* ext_queue,
       DeviceIndex device_index);

   C10_XPU_API XPUStream getCurrentXPUStream(DeviceIndex device = -1);

   C10_XPU_API void setCurrentXPUStream(XPUStream stream);

   C10_XPU_API std::ostream& operator<<(std::ostream& stream, const XPUStream& s);

   C10_XPU_API void syncStreamsOnDevice(DeviceIndex device = -1);

   } // namespace c10::xpu

   namespace std {
   template <>
   struct hash<c10::xpu::XPUStream> {
     size_t operator()(c10::xpu::XPUStream s) const noexcept {
       return std::hash<c10::Stream>{}(s.unwrap());
     }
   };
   } // namespace std
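
The listing above declares the stream-pool API (``getStreamFromPool``, ``setCurrentXPUStream``, ``getCurrentXPUStream``) and the per-stream ``query``/``synchronize`` methods. The sketch below is a minimal, illustrative use of that API, assuming a PyTorch build with XPU/SYCL support and an available device 0; the trivial kernel and the chosen device index are placeholders, not part of the header.

.. code-block:: cpp

   #include <c10/xpu/XPUStream.h>
   #include <iostream>

   int main() {
     // Request a queue from the "normal priority" pool of device 0 and make it
     // the current stream for that device (round-robin over the 32-queue pool).
     c10::xpu::XPUStream stream =
         c10::xpu::getStreamFromPool(/*isHighPriority=*/false, /*device=*/0);
     c10::xpu::setCurrentXPUStream(stream);

     // XPUStream wraps a sycl::queue, so SYCL work can be submitted directly.
     sycl::queue& q = stream.queue();
     q.submit([](sycl::handler& cgh) {
       cgh.single_task([] { /* trivial placeholder kernel */ });
     });

     // query() reports whether all submitted work has completed;
     // synchronize() blocks (and rethrows asynchronous errors) until it has.
     if (!stream.query()) {
       stream.synchronize();
     }

     // Streams are printable via the declared operator<<.
     std::cout << c10::xpu::getCurrentXPUStream(0) << '\n';
     return 0;
   }

Because ``getStreamFromPool`` hands out queues round-robin, code that needs truly independent streams should hold on to the ``XPUStream`` objects it receives rather than repeatedly requesting new ones.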