use { crate::{ cpu_worker::{ CpuJob, CpuWork, CpuWorker, jobs::{ img_copy::ImgCopyWork, read_write::{ReadWriteJobError, ReadWriteWork}, }, }, gfx_api::{ AsyncShmGfxTextureCallback, FdSync, PendingShmTransfer, ShmMemory, ShmMemoryBacking, }, gfx_apis::vulkan::{ VulkanError, VulkanSync, command::VulkanCommandBuffer, dmabuf_buffer::{TRANSFER_QUEUE_BUFFER_ALIGNMENT, VulkanDmabufBuffer}, image::{QueueFamily, QueueState, QueueTransfer, VulkanImage, VulkanImageMemory}, renderer::image_barrier, shm_image::VulkanShmImage, staging::{VulkanStagingBuffer, VulkanStagingShell}, }, rect::{Rect, Region}, utils::{clonecell::CloneCell, errorfmt::ErrorFmt}, vulkan_core::sync::VulkanDeviceSyncExt, }, arrayvec::ArrayVec, ash::vk::{ AccessFlags2, BufferImageCopy2, CommandBufferBeginInfo, CommandBufferSubmitInfo, CommandBufferUsageFlags, DependencyInfo, Extent3D, ImageAspectFlags, ImageLayout, ImageSubresourceLayers, Offset3D, PipelineStageFlags2, SemaphoreSubmitInfo, SubmitInfo2, }, std::{ cell::{Cell, RefCell, RefMut}, rc::Rc, slice, }, uapi::OwnedFd, }; pub struct VulkanShmImageAsyncData { pub(super) busy: Cell, pub(super) io_job: Cell>>, pub(super) copy_job: Cell>>, pub(super) staging: CloneCell>>, pub(super) buffer: CloneCell>>, pub(super) client_mem: CloneCell>>, pub(super) callback: Cell>>, pub(super) callback_id: Cell, pub(super) regions: RefCell>>, pub(super) cpu: Rc, pub(super) last_gfx_use: Cell>, pub(super) data_copied: Cell, } impl VulkanShmImageAsyncData { fn complete(&self, result: Result<(), VulkanError>) { self.busy.set(false); if let Some(staging) = self.staging.take() { staging.busy.set(false); } self.buffer.take(); self.client_mem.take(); if let Some(cb) = self.callback.take() { cb.completed(result.map_err(|e| e.into())); } } } #[derive(Copy, Clone, Debug, Eq, PartialEq)] pub(super) enum TransferType { Upload, Download, } impl VulkanShmImage { pub fn async_transfer2( &self, img: &Rc, buffer: Rc, damage: Region, callback: Rc, ) -> Result, VulkanError> { self.async_transfer_(img, damage, callback, |data, damage| { self.try_async_transfer2(img, buffer, data, damage) }) } fn try_async_transfer2( &self, img: &Rc, buffer: Rc, data: &VulkanShmImageAsyncData, mut damage: Region, ) -> Result<(), VulkanError> { if data.busy.get() { return Err(VulkanError::AsyncCopyBusy); } if self.size > buffer.size { return Err(VulkanError::InvalidBufferSize); } data.busy.set(true); data.data_copied.set(true); data.buffer.set(Some(buffer.clone())); if img.contents_are_undefined.get() { damage = Region::new(Rect::new_sized_saturating( 0, 0, img.width as _, img.height as _, )); } self.calculate_copies(img, data, damage, buffer.offset); self.async_release_from_gfx_queue(img, data, TransferType::Upload)?; self.async_upload_copy_buffer_to_image(img, data)?; Ok(()) } pub fn async_transfer( &self, img: &Rc, staging: Rc, client_mem: &Rc, damage: Region, callback: Rc, tt: TransferType, ) -> Result, VulkanError> { self.async_transfer_(img, damage, callback, |data, damage| { self.try_async_transfer(img, staging, data, client_mem, damage, tt) }) } fn async_transfer_( &self, img: &Rc, damage: Region, callback: Rc, f: impl FnOnce(&VulkanShmImageAsyncData, Region) -> Result<(), VulkanError>, ) -> Result, VulkanError> { if damage.is_empty() { return Ok(None); } let data = self.async_data.as_ref().unwrap(); let res = f(data, damage); match res { Ok(()) => { let id = img.renderer.allocate_point(); data.callback_id.set(id); data.callback.set(Some(callback)); Ok(Some(PendingShmTransfer::new(img.clone(), id))) } Err(e) => Err(e), } } fn try_async_transfer( &self, img: &Rc, staging: Rc, data: &VulkanShmImageAsyncData, client_mem: &Rc, mut damage: Region, tt: TransferType, ) -> Result<(), VulkanError> { if data.busy.get() { return Err(VulkanError::AsyncCopyBusy); } if staging.busy.get() { return Err(VulkanError::StagingBufferBusy); } match tt { TransferType::Upload => { if !staging.upload { return Err(VulkanError::StagingBufferNoUpload); } } TransferType::Download => { if !staging.download { return Err(VulkanError::StagingBufferNoDownload); } } } if self.size > client_mem.len() as u64 { return Err(VulkanError::InvalidBufferSize); } data.busy.set(true); data.data_copied.set(false); staging.busy.set(true); data.staging.set(Some(staging.clone())); data.client_mem.set(Some(client_mem.clone())); if img.contents_are_undefined.get() { if tt == TransferType::Download { return Err(VulkanError::UndefinedContents); } damage = Region::new(Rect::new_sized_saturating( 0, 0, img.width as _, img.height as _, )); } let copies = &mut *self.calculate_copies(img, data, damage, 0); self.async_release_from_gfx_queue(img, data, tt)?; if let Some(staging) = staging.staging.get() { return match tt { TransferType::Upload => self .async_transfer_initiate_host_copy(img, data, &staging, copies, client_mem, tt), TransferType::Download => { self.async_download_copy_image_to_buffer(img, &staging, copies) } }; } let img2 = img.clone(); let client_mem = client_mem.clone(); img.renderer .device .fill_staging_shell(&img.renderer, &data.cpu, staging, move |res| { let VulkanImageMemory::Internal(shm) = &img2.ty else { unreachable!(); }; if let Err(e) = shm.async_transfer_after_allocation(&img2, &client_mem, res, tt) { shm.async_data.as_ref().unwrap().complete(Err(e)); } }) } fn calculate_copies<'a>( &self, img: &Rc, data: &'a VulkanShmImageAsyncData, damage: Region, extra_offset: u64, ) -> RefMut<'a, Vec>> { let mut copies_ref = data.regions.borrow_mut(); let copies = &mut *copies_ref; copies.clear(); let mut copy = |mut x, mut y, mut width, mut height| { let x_orig = x; let width_orig = width; let mut buffer_offset; loop { buffer_offset = (y as u32 * img.stride + x as u32 * img.format.bpp) as u64; if buffer_offset.is_multiple_of(TRANSFER_QUEUE_BUFFER_ALIGNMENT) { break; } if x > 0 { x -= 1; width += 1; } else { y -= 1; height += 1; x = x_orig; width = width_orig; } } let copy = BufferImageCopy2::default() .buffer_offset(buffer_offset + extra_offset) .image_offset(Offset3D { x, y, z: 0 }) .image_extent(Extent3D { width, height, depth: 1, }) .image_subresource(ImageSubresourceLayers { aspect_mask: ImageAspectFlags::COLOR, mip_level: 0, base_array_layer: 0, layer_count: 1, }) .buffer_image_height(img.height) .buffer_row_length(img.stride / img.format.bpp); copies.push(copy); }; let (width_mask, height_mask) = img.renderer.device.transfer_granularity_mask; let width_mask = width_mask as i32; let height_mask = height_mask as i32; for damage in damage.rects() { if damage.x2() < 0 || damage.y2() < 0 { continue; } let x1 = damage.x1().max(0); let y1 = damage.y1().max(0); let x2 = damage.x2().min(img.width as i32); let y2 = damage.y2().min(img.height as i32); let x1 = x1 & !width_mask; let y1 = y1 & !height_mask; let x2 = ((x2 + width_mask) & !width_mask).min(img.width as i32); let y2 = ((y2 + height_mask) & !height_mask).min(img.height as i32); let Some(damage) = Rect::new(x1, y1, x2, y2) else { continue; }; if damage.is_empty() { continue; } copy( damage.x1(), damage.y1(), damage.width() as u32, damage.height() as u32, ); } copies_ref } fn async_release_from_gfx_queue( &self, img: &Rc, data: &VulkanShmImageAsyncData, tt: TransferType, ) -> Result<(), VulkanError> { img.renderer.check_defunct()?; let Some(transfer_queue_idx) = img.renderer.device.distinct_transfer_queue_family_idx else { let Some(sync) = data.last_gfx_use.take() else { img.queue_state.set(QueueState::Released { to: QueueFamily::Transfer, }); return Ok(()); }; let id = img.renderer.allocate_point(); let pending = img.renderer.eng.spawn( "await_transfer_to_transfer", await_gfx_queue_release(id, img.clone(), None, None, Some(sync), tt), ); img.renderer.pending_submits.set(id, pending); img.queue_state.set(QueueState::Releasing); return Ok(()); }; let (gfx_access_mask, gfx_layout, transfer_layout) = match tt { TransferType::Upload => ( AccessFlags2::SHADER_SAMPLED_READ, ImageLayout::SHADER_READ_ONLY_OPTIMAL, ImageLayout::TRANSFER_DST_OPTIMAL, ), TransferType::Download => ( AccessFlags2::COLOR_ATTACHMENT_WRITE, ImageLayout::COLOR_ATTACHMENT_OPTIMAL, ImageLayout::TRANSFER_SRC_OPTIMAL, ), }; let mut barriers = ArrayVec::<_, 2>::new(); match img.queue_state.get() { QueueState::Acquired { family } => { assert_eq!(family, QueueFamily::Gfx); } QueueState::Releasing => { unreachable!(); } QueueState::Released { to } => { assert_eq!(to, QueueFamily::Gfx); let barrier = image_barrier() .image(img.image) .src_queue_family_index(transfer_queue_idx) .dst_queue_family_index(img.renderer.device.graphics_queue_idx) .dst_stage_mask(PipelineStageFlags2::ALL_COMMANDS) .old_layout(transfer_layout) .new_layout(gfx_layout); barriers.push(barrier); } } let barrier = image_barrier() .image(img.image) .src_queue_family_index(img.renderer.device.graphics_queue_idx) .dst_queue_family_index(transfer_queue_idx) .src_access_mask(gfx_access_mask) .src_stage_mask(PipelineStageFlags2::ALL_COMMANDS) .old_layout(if img.is_undefined.get() { ImageLayout::UNDEFINED } else { gfx_layout }) .new_layout(transfer_layout); barriers.push(barrier); let dep_info = DependencyInfo::default().image_memory_barriers(&barriers); let dev = &img.renderer.device.device; let begin_info = CommandBufferBeginInfo::default().flags(CommandBufferUsageFlags::ONE_TIME_SUBMIT); let cmd = img.renderer.gfx_command_buffers.allocate()?; let command_buffer_info = CommandBufferSubmitInfo::default().command_buffer(cmd.buffer); let mut semaphore_submit_info = SemaphoreSubmitInfo::default(); let mut submit_info = SubmitInfo2::default().command_buffer_infos(slice::from_ref(&command_buffer_info)); let vulkan_sync = img.renderer.device.create_sync( img.renderer.render_tls.as_ref(), &mut semaphore_submit_info, &mut submit_info, )?; unsafe { dev.begin_command_buffer(cmd.buffer, &begin_info) .map_err(VulkanError::BeginCommandBuffer)?; dev.cmd_pipeline_barrier2(cmd.buffer, &dep_info); dev.end_command_buffer(cmd.buffer) .map_err(VulkanError::EndCommandBuffer)?; dev.queue_submit2( img.renderer.device.graphics_queue, slice::from_ref(&submit_info), vulkan_sync.fence(), ) .inspect_err(img.renderer.device.idl()) .map_err(VulkanError::Submit)?; } let sync = vulkan_sync.to_sync(|| img.renderer.block()); let id = img.renderer.allocate_point(); let pending = img.renderer.eng.spawn( "await_transfer_to_transfer", await_gfx_queue_release(id, img.clone(), Some(cmd), Some(vulkan_sync), sync, tt), ); img.renderer.pending_submits.set(id, pending); img.queue_state.set(QueueState::Releasing); Ok(()) } fn async_transfer_after_allocation( &self, img: &Rc, client_mem: &Rc, res: Result, VulkanError>, tt: TransferType, ) -> Result<(), VulkanError> { let staging = res?; let data = self.async_data.as_ref().unwrap(); let copies = &*data.regions.borrow(); match tt { TransferType::Upload => { self.async_transfer_initiate_host_copy(img, data, &staging, copies, client_mem, tt) } TransferType::Download => { self.async_download_copy_image_to_buffer(img, &staging, copies) } } } pub(super) fn async_transfer_initiate_host_copy( &self, img: &Rc, data: &VulkanShmImageAsyncData, staging: &VulkanStagingBuffer, copies: &[BufferImageCopy2], client_mem: &Rc, tt: TransferType, ) -> Result<(), VulkanError> { img.renderer.check_defunct()?; if tt == TransferType::Download { staging.download(|_, _| ())?; } let id = img.renderer.allocate_point(); let pending; match client_mem.safe_access() { ShmMemoryBacking::Ptr(ptr) => { let mut job = data.copy_job.take().unwrap_or_else(|| { Box::new(CopyTransferJob { img: None, id, _mem: None, work: unsafe { ImgCopyWork::new() }, tt, }) }); job.id = id; job.img = Some(img.clone()); job._mem = Some(client_mem.clone()); job.tt = tt; match tt { TransferType::Upload => { job.work.src = ptr as _; job.work.dst = staging.allocation.mem.unwrap(); } TransferType::Download => { job.work.src = staging.allocation.mem.unwrap(); job.work.dst = ptr as _; } } job.work.width = img.width as _; job.work.stride = img.stride as _; job.work.bpp = img.format.bpp as _; job.work.rects.clear(); for copy in copies { job.work.rects.push(Rect::new_sized_saturating( copy.image_offset.x as _, copy.image_offset.y as _, copy.image_extent.width as _, copy.image_extent.height as _, )); } pending = data.cpu.submit(job); } ShmMemoryBacking::Fd(fd, offset) => { let mut min_offset = client_mem.len() as u64; let mut max_offset = 0; for copy in copies { min_offset = min_offset.min(copy.buffer_offset); let len = img.stride * (copy.image_extent.height - 1) + copy.image_extent.width * img.format.bpp; max_offset = max_offset.max(copy.buffer_offset + len as u64); } let mut job = data.io_job.take().unwrap_or_else(|| { Box::new(IoTransferJob { img: None, id, _mem: None, work: unsafe { ReadWriteWork::new() }, fd: None, tt, }) }); job.id = id; job.img = Some(img.clone()); job._mem = Some(client_mem.clone()); job.fd = Some(fd.clone()); job.tt = tt; unsafe { let config = job.work.config(); config.fd = fd.raw(); config.offset = offset + min_offset as usize; config.ptr = staging.allocation.mem.unwrap().add(min_offset as _); config.len = max_offset.saturating_sub(min_offset) as usize; config.write = tt == TransferType::Download; } pending = data.cpu.submit(job); } } img.renderer.pending_cpu_jobs.set(id, pending); Ok(()) } fn async_upload_copy_buffer_to_image( &self, img: &Rc, data: &VulkanShmImageAsyncData, ) -> Result<(), VulkanError> { if !data.data_copied.get() { return Ok(()); } if img.queue_state.get().acquire(QueueFamily::Transfer) == QueueTransfer::Impossible { return Ok(()); } img.renderer.check_defunct()?; let regions = &*data.regions.borrow(); let (buffer, size, foreign_buffer) = match data.staging.get() { Some(s) => { let staging = s.staging.get().unwrap(); staging.upload(|_, _| ())?; (staging.buffer, staging.size, false) } _ => { let host_buffer = data.buffer.get().unwrap(); (host_buffer.buffer, host_buffer.size, true) } }; let (cmd, vulkan_sync, sync, point) = self.submit_buffer_image_copy( img, buffer, size, regions, true, TransferType::Upload, foreign_buffer, )?; img.queue_state.set(QueueState::Releasing); let future = img.renderer.eng.spawn( "await async upload", await_async_transfer_release_to_gfx( point, img.clone(), cmd, vulkan_sync, sync, TransferType::Upload, ), ); img.renderer.pending_submits.set(point, future); Ok(()) } fn async_download_copy_image_to_buffer( &self, img: &Rc, staging: &VulkanStagingBuffer, copies: &[BufferImageCopy2], ) -> Result<(), VulkanError> { if img.queue_state.get().acquire(QueueFamily::Transfer) == QueueTransfer::Impossible { return Ok(()); } img.renderer.check_defunct()?; let (cmd, vulkan_sync, sync, point) = self.submit_buffer_image_copy( img, staging.buffer, staging.size, copies, true, TransferType::Download, false, )?; img.queue_state.set(QueueState::Releasing); let future = img.renderer.eng.spawn( "await async image to buffer copy", await_async_transfer_release_to_gfx( point, img.clone(), cmd, vulkan_sync, sync, TransferType::Download, ), ); img.renderer.pending_submits.set(point, future); Ok(()) } } pub(super) struct IoTransferJob { img: Option>, id: u64, _mem: Option>, fd: Option>, work: ReadWriteWork, tt: TransferType, } pub(super) struct CopyTransferJob { img: Option>, id: u64, _mem: Option>, work: ImgCopyWork, tt: TransferType, } impl CpuJob for IoTransferJob { fn work(&mut self) -> &mut dyn CpuWork { &mut self.work } fn completed(mut self: Box) { self._mem = None; self.fd = None; let img = self.img.take().unwrap(); let res = self.work.config().result.take().unwrap(); complete_async_host_copy(&img, self.id, res, self.tt, |data| { data.io_job.set(Some(self)) }); } } impl CpuJob for CopyTransferJob { fn work(&mut self) -> &mut dyn CpuWork { &mut self.work } fn completed(mut self: Box) { self._mem = None; let img = self.img.take().unwrap(); complete_async_host_copy(&img, self.id, Ok(()), self.tt, |data| { data.copy_job.set(Some(self)) }); } } fn complete_async_host_copy( img: &Rc, id: u64, res: Result<(), ReadWriteJobError>, tt: TransferType, store: impl FnOnce(&VulkanShmImageAsyncData), ) { img.renderer.pending_cpu_jobs.remove(&id); let VulkanImageMemory::Internal(shm) = &img.ty else { unreachable!(); }; let data = shm.async_data.as_ref().unwrap(); store(data); if let Err(e) = res { data.complete(Err(VulkanError::AsyncCopyToStaging(e))); } data.data_copied.set(true); match tt { TransferType::Upload => { let res = shm.async_upload_copy_buffer_to_image(img, data); if let Err(e) = res { data.complete(Err(e)); } } TransferType::Download => data.complete(Ok(())), } } async fn await_gfx_queue_release( id: u64, img: Rc, buf: Option>, vulkan_sync: Option, sync: Option, tt: TransferType, ) { if let Some(sync) = &sync && let Err(e) = sync.try_signaled(&img.renderer.ring).await { log::error!( "Could not wait for sync file to become readable: {}", ErrorFmt(e) ); img.renderer.block(); } if let Some(vs) = vulkan_sync { vs.handle_validation(); } if let Some(buf) = buf { img.renderer.gfx_command_buffers.buffers.push(buf); } img.renderer.pending_submits.remove(&id); img.queue_state.set(QueueState::Released { to: QueueFamily::Transfer, }); let VulkanImageMemory::Internal(shm) = &img.ty else { unreachable!(); }; let data = shm.async_data.as_ref().unwrap(); let res = match tt { TransferType::Upload => shm.async_upload_copy_buffer_to_image(&img, data), TransferType::Download => match data.staging.get().unwrap().staging.get() { Some(staging) => { let copies = &*data.regions.borrow(); shm.async_download_copy_image_to_buffer(&img, &staging, copies) } None => Ok(()), }, }; if let Err(e) = res { data.complete(Err(e)); } } pub async fn await_async_transfer_release_to_gfx( id: u64, img: Rc, buf: Rc, vulkan_sync: VulkanSync, sync: Option, tt: TransferType, ) { if let Some(sync) = &sync && let Err(e) = sync.try_signaled(&img.renderer.ring).await { log::error!( "Could not wait for sync file to become readable: {}", ErrorFmt(e) ); img.renderer.block(); } vulkan_sync.handle_validation(); match &img.renderer.transfer_command_buffers { Some(b) => b.buffers.push(buf), None => img.renderer.gfx_command_buffers.buffers.push(buf), } img.queue_state.set(QueueState::Released { to: QueueFamily::Gfx, }); img.renderer.pending_submits.remove(&id); let VulkanImageMemory::Internal(shm) = &img.ty else { unreachable!(); }; let data = shm.async_data.as_ref().unwrap(); match tt { TransferType::Upload => { data.complete(Ok(())); } TransferType::Download => { let data = shm.async_data.as_ref().unwrap(); let staging = data.staging.get().unwrap().staging.get().unwrap(); let client_mem = data.client_mem.get().unwrap(); let copies = &*data.regions.borrow(); let res = shm.async_transfer_initiate_host_copy( &img, data, &staging, copies, &client_mem, tt, ); if let Err(e) = res { data.complete(Err(e)); } } } }