Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions cuda_core/cuda/core/_cpp/resource_handles.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -353,8 +353,8 @@ StreamHandle get_per_thread_stream() {
namespace {
struct EventBox {
CUevent resource;
bool timing_disabled;
bool busy_waited;
bool timing_enabled;
bool is_blocking_sync;
bool ipc_enabled;
int device_id;
ContextHandle h_context;
Expand All @@ -368,12 +368,12 @@ static const EventBox* get_box(const EventHandle& h) {
);
}

bool get_event_timing_disabled(const EventHandle& h) noexcept {
return h ? get_box(h)->timing_disabled : true;
bool get_event_timing_enabled(const EventHandle& h) noexcept {
return h ? get_box(h)->timing_enabled : false;
}

bool get_event_busy_waited(const EventHandle& h) noexcept {
return h ? get_box(h)->busy_waited : false;
bool get_event_is_blocking_sync(const EventHandle& h) noexcept {
return h ? get_box(h)->is_blocking_sync : false;
}

bool get_event_ipc_enabled(const EventHandle& h) noexcept {
Expand All @@ -392,7 +392,7 @@ ContextHandle get_event_context(const EventHandle& h) noexcept {
static HandleRegistry<CUevent, EventHandle> event_registry;

EventHandle create_event_handle(const ContextHandle& h_ctx, unsigned int flags,
bool timing_disabled, bool busy_waited,
bool timing_enabled, bool is_blocking_sync,
bool ipc_enabled, int device_id) {
GILReleaseGuard gil;
CUevent event;
Expand All @@ -401,7 +401,7 @@ EventHandle create_event_handle(const ContextHandle& h_ctx, unsigned int flags,
}

auto box = std::shared_ptr<const EventBox>(
new EventBox{event, timing_disabled, busy_waited, ipc_enabled, device_id, h_ctx},
new EventBox{event, timing_enabled, is_blocking_sync, ipc_enabled, device_id, h_ctx},
[h_ctx](const EventBox* b) {
event_registry.unregister_handle(b->resource);
GILReleaseGuard gil;
Expand All @@ -415,27 +415,27 @@ EventHandle create_event_handle(const ContextHandle& h_ctx, unsigned int flags,
}

EventHandle create_event_handle_noctx(unsigned int flags) {
return create_event_handle(ContextHandle{}, flags, true, false, false, -1);
return create_event_handle(ContextHandle{}, flags, false, false, false, -1);
}

EventHandle create_event_handle_ref(CUevent event) {
if (auto h = event_registry.lookup(event)) {
return h;
}
auto box = std::make_shared<const EventBox>(EventBox{event, true, false, false, -1, {}});
auto box = std::make_shared<const EventBox>(EventBox{event, false, false, false, -1, {}});
return EventHandle(box, &box->resource);
}

EventHandle create_event_handle_ipc(const CUipcEventHandle& ipc_handle,
bool busy_waited) {
bool is_blocking_sync) {
GILReleaseGuard gil;
CUevent event;
if (CUDA_SUCCESS != (err = p_cuIpcOpenEventHandle(&event, ipc_handle))) {
return {};
}

auto box = std::shared_ptr<const EventBox>(
new EventBox{event, true, busy_waited, true, -1, {}},
new EventBox{event, false, is_blocking_sync, true, -1, {}},
[](const EventBox* b) {
event_registry.unregister_handle(b->resource);
GILReleaseGuard gil;
Expand Down
10 changes: 5 additions & 5 deletions cuda_core/cuda/core/_cpp/resource_handles.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ StreamHandle get_per_thread_stream();
// When the last reference is released, cuEventDestroy is called automatically.
// Returns empty handle on error (caller must check).
EventHandle create_event_handle(const ContextHandle& h_ctx, unsigned int flags,
bool timing_disabled, bool busy_waited,
bool timing_enabled, bool is_blocking_sync,
bool ipc_enabled, int device_id);

// Create an owning event handle without context dependency.
Expand All @@ -225,17 +225,17 @@ EventHandle create_event_handle_noctx(unsigned int flags);
// When the last reference is released, cuEventDestroy is called automatically.
// Returns empty handle on error (caller must check).
EventHandle create_event_handle_ipc(const CUipcEventHandle& ipc_handle,
bool busy_waited);
bool is_blocking_sync);

// Create a non-owning event handle (references existing event).
// Use for events that are managed by the CUDA graph or another owner.
// The event will NOT be destroyed when the handle is released.
// Metadata defaults to unknown (timing_disabled=true, device_id=-1).
// Metadata defaults to unknown (timing_enabled=false, device_id=-1).
EventHandle create_event_handle_ref(CUevent event);

// Event metadata accessors (read from EventBox via pointer arithmetic)
bool get_event_timing_disabled(const EventHandle& h) noexcept;
bool get_event_busy_waited(const EventHandle& h) noexcept;
bool get_event_timing_enabled(const EventHandle& h) noexcept;
bool get_event_is_blocking_sync(const EventHandle& h) noexcept;
bool get_event_ipc_enabled(const EventHandle& h) noexcept;
int get_event_device_id(const EventHandle& h) noexcept;
ContextHandle get_event_context(const EventHandle& h) noexcept;
Expand Down
99 changes: 51 additions & 48 deletions cuda_core/cuda/core/_event.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ from cuda.core._resource_handles cimport (
EventHandle,
create_event_handle,
create_event_handle_ipc,
get_event_timing_disabled,
get_event_busy_waited,
get_event_timing_enabled,
get_event_is_blocking_sync,
get_event_ipc_enabled,
get_event_device_id,
get_event_context,
Expand Down Expand Up @@ -44,22 +44,22 @@ cdef class EventOptions:

Attributes
----------
enable_timing : bool, optional
timing_enabled : bool, optional
Event will record timing data. (Default to False)
busy_waited_sync : bool, optional
If True, event will use blocking synchronization. When a CPU
thread calls synchronize, the call will block until the event
has actually been completed.
Otherwise, the CPU thread will busy-wait until the event has
been completed. (Default to False)
blocking_sync : bool, optional
If True, the event uses blocking synchronization: a CPU
thread that calls :meth:`Event.sync` blocks (yields) until
the event has completed. Otherwise (the default), the CPU
thread busy-waits until the event has completed.
(Default to False)
ipc_enabled : bool, optional
Event will be suitable for interprocess use.
Note that enable_timing must be False. (Default to False)
Note that timing_enabled must be False. (Default to False)

"""

enable_timing: bool | None = False
busy_waited_sync: bool | None = False
timing_enabled: bool | None = False
blocking_sync: bool | None = False
ipc_enabled: bool | None = False


Expand All @@ -79,8 +79,8 @@ cdef class Event:

# To create events and record the timing:
s = Device().create_stream()
e1 = Device().create_event({"enable_timing": True})
e2 = Device().create_event({"enable_timing": True})
e1 = Device().create_event({"timing_enabled": True})
e2 = Device().create_event({"timing_enabled": True})
s.record(e1)
# ... run some GPU work ...
s.record(e2)
Expand All @@ -100,40 +100,41 @@ cdef class Event:
cdef Event self = cls.__new__(cls)
cdef EventOptions opts = check_or_create_options(EventOptions, options, "Event options")
cdef unsigned int flags = 0x0
cdef bint timing_disabled = False
cdef bint busy_waited = False
cdef bint timing_enabled = True
cdef bint is_blocking_sync = False
cdef bint ipc_enabled = False
self._ipc_descriptor = None
if not opts.enable_timing:
if not opts.timing_enabled:
flags |= cydriver.CUevent_flags.CU_EVENT_DISABLE_TIMING
timing_disabled = True
if opts.busy_waited_sync:
timing_enabled = False
if opts.blocking_sync:
flags |= cydriver.CUevent_flags.CU_EVENT_BLOCKING_SYNC
busy_waited = True
is_blocking_sync = True
if opts.ipc_enabled:
if is_free:
raise TypeError(
"IPC-enabled events must be bound; use Stream.record for creation."
)
flags |= cydriver.CUevent_flags.CU_EVENT_INTERPROCESS
ipc_enabled = True
if not timing_disabled:
if timing_enabled:
raise TypeError("IPC-enabled events cannot use timing.")
cdef EventHandle h_event = create_event_handle(
h_context, flags, timing_disabled, busy_waited, ipc_enabled, device_id)
h_context, flags, timing_enabled, is_blocking_sync, ipc_enabled, device_id)
if not h_event:
raise RuntimeError("Failed to create CUDA event")
self._h_event = h_event
if ipc_enabled:
self.get_ipc_descriptor()
_ = self.ipc_descriptor # eagerly populate the descriptor cache
return self

@staticmethod
cdef Event _from_handle(EventHandle h_event):
"""Create an Event wrapping an existing EventHandle.

Metadata (timing, busy_waited, ipc, device_id) is read from the
EventBox via pointer arithmetic — no fields are cached on Event.
Metadata (timing, blocking_sync, ipc, device_id) is read from
the EventBox via pointer arithmetic — no fields are cached on
Event.
"""
cdef Event self = Event.__new__(Event)
self._h_event = h_event
Expand Down Expand Up @@ -163,10 +164,10 @@ cdef class Event:
return timing
else:
if err == cydriver.CUresult.CUDA_ERROR_INVALID_HANDLE:
if self.is_timing_disabled or other.is_timing_disabled:
if not self.is_timing_enabled or not other.is_timing_enabled:
explanation = (
"Both Events must be created with timing enabled in order to subtract them; "
"use EventOptions(enable_timing=True) when creating both events."
"use EventOptions(timing_enabled=True) when creating both events."
)
else:
explanation = (
Expand Down Expand Up @@ -196,8 +197,9 @@ cdef class Event:
def __repr__(self) -> str:
return f"<Event handle={as_intptr(self._h_event):#x}>"

def get_ipc_descriptor(self) -> IPCEventDescriptor:
"""Export an event allocated for sharing between processes."""
@property
def ipc_descriptor(self) -> IPCEventDescriptor:
"""Descriptor for sharing this event with other processes."""
if self._ipc_descriptor is not None:
return self._ipc_descriptor
if not self.is_ipc_enabled:
Expand All @@ -206,7 +208,7 @@ cdef class Event:
with nogil:
HANDLE_RETURN(cydriver.cuIpcGetEventHandle(&data, as_cu(self._h_event)))
cdef bytes data_b = cpython.PyBytes_FromStringAndSize(<char*>(data.reserved), sizeof(data.reserved))
self._ipc_descriptor = IPCEventDescriptor._init(data_b, get_event_busy_waited(self._h_event))
self._ipc_descriptor = IPCEventDescriptor._init(data_b, get_event_is_blocking_sync(self._h_event))
return self._ipc_descriptor

@classmethod
Expand All @@ -215,7 +217,7 @@ cdef class Event:
cdef cydriver.CUipcEventHandle data
memcpy(data.reserved, <const void*><const char*>(ipc_descriptor._reserved), sizeof(data.reserved))
cdef Event self = Event.__new__(cls)
cdef EventHandle h_event = create_event_handle_ipc(data, ipc_descriptor._busy_waited)
cdef EventHandle h_event = create_event_handle_ipc(data, ipc_descriptor._is_blocking_sync)
if not h_event:
raise RuntimeError("Failed to open IPC event handle")
self._h_event = h_event
Expand All @@ -228,23 +230,24 @@ cdef class Event:
return get_event_ipc_enabled(self._h_event)

@property
def is_timing_disabled(self) -> bool:
"""Return True if the event does not record timing data, otherwise False."""
return get_event_timing_disabled(self._h_event)
def is_timing_enabled(self) -> bool:
"""Return True if the event records timing data, otherwise False."""
return get_event_timing_enabled(self._h_event)

@property
def is_sync_busy_waited(self) -> bool:
"""Return True if the event synchronization would keep the CPU busy-waiting, otherwise False."""
return get_event_busy_waited(self._h_event)
def is_blocking_sync(self) -> bool:
"""Return True if the event uses blocking synchronization (the CPU
thread blocks on :meth:`sync` instead of busy-waiting), otherwise False.
"""
return get_event_is_blocking_sync(self._h_event)

def sync(self):
"""Synchronize until the event completes.

If the event was created with busy_waited_sync, then the
calling CPU thread will block until the event has been
completed by the device.
Otherwise the CPU thread will busy-wait until the event
has been completed.
If the event was created with ``blocking_sync=True``, the
calling CPU thread blocks (yields) until the event has been
completed by the device. Otherwise (the default) the CPU
thread busy-waits until the event has completed.

"""
with nogil:
Expand Down Expand Up @@ -302,28 +305,28 @@ cdef class IPCEventDescriptor:

cdef:
bytes _reserved
bint _busy_waited
bint _is_blocking_sync

def __init__(self, *arg, **kwargs):
raise RuntimeError("IPCEventDescriptor objects cannot be instantiated directly. Please use Event APIs.")

@staticmethod
def _init(reserved: bytes, busy_waited: cython.bint):
def _init(reserved: bytes, is_blocking_sync: cython.bint):
cdef IPCEventDescriptor self = IPCEventDescriptor.__new__(IPCEventDescriptor)
self._reserved = reserved
self._busy_waited = busy_waited
self._is_blocking_sync = is_blocking_sync
return self

def __eq__(self, IPCEventDescriptor rhs):
# No need to check self._busy_waited.
# No need to check self._is_blocking_sync.
return self._reserved == rhs._reserved

def __reduce__(self):
return IPCEventDescriptor._init, (self._reserved, self._busy_waited)
return IPCEventDescriptor._init, (self._reserved, self._is_blocking_sync)


def _reduce_event(event):
check_multiprocessing_start_method()
return event.from_ipc_descriptor, (event.get_ipc_descriptor(),)
return event.from_ipc_descriptor, (event.ipc_descriptor,)

multiprocessing.reduction.register(Event, _reduce_event)
11 changes: 11 additions & 0 deletions cuda_core/cuda/core/_kernel_arg_handler.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import numpy
from cuda.core._memory import Buffer
from cuda.core._tensor_map import TensorMapDescriptor as _TensorMapDescriptor_py
from cuda.core._tensor_map cimport TensorMapDescriptor
from cuda.core.graph._graph_definition cimport GraphCondition
from cuda.core._utils.cuda_utils import driver
from cuda.bindings cimport cydriver

Expand Down Expand Up @@ -318,6 +319,11 @@ cdef class ParamHolder:
if arg_type is driver.CUgraphConditionalHandle:
prepare_arg[cydriver.CUgraphConditionalHandle](self.data, self.data_addresses, <intptr_t>int(arg), i)
continue
elif arg_type is GraphCondition:
prepare_arg[cydriver.CUgraphConditionalHandle](
self.data, self.data_addresses,
<intptr_t><unsigned long long>(<GraphCondition>arg)._c_handle, i)
continue
# If no exact types are found, fallback to slower `isinstance` check
elif isinstance(arg, Buffer):
if isinstance(arg.handle, int):
Expand All @@ -341,6 +347,11 @@ cdef class ParamHolder:
elif isinstance(arg, driver.CUgraphConditionalHandle):
prepare_arg[cydriver.CUgraphConditionalHandle](self.data, self.data_addresses, arg, i)
continue
elif isinstance(arg, GraphCondition):
prepare_arg[cydriver.CUgraphConditionalHandle](
self.data, self.data_addresses,
<intptr_t><unsigned long long>(<GraphCondition>arg)._c_handle, i)
continue
# TODO: support ctypes/numpy struct
raise TypeError("the argument is of unsupported type: " + str(type(arg)))

Expand Down
2 changes: 1 addition & 1 deletion cuda_core/cuda/core/_launch_config.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ cdef class LaunchConfig:
public tuple cluster
public tuple block
public int shmem_size
public bint cooperative_launch
public bint is_cooperative

vector[cydriver.CUlaunchAttribute] _attrs
object __weakref__
Expand Down
Loading
Loading