const std = @import("std");

const Log2Int = std.math.Log2Int;
const Allocator = std.mem.Allocator;

// TODO: Rewrite
// pub const TwoWayChannel = struct {
//     const Self = @This();
//
//     emu: Channel(EmuMessage),
//     gui: Channel(GuiMessage),
//
//     pub fn init(items: []u8) Self {
//         comptime std.debug.assert(@sizeOf(EmuMessage) == @sizeOf(GuiMessage));
//         comptime std.debug.assert(@sizeOf(@typeInfo([]u8).Pointer.child) == @sizeOf(EmuMessage));
//
//         std.debug.assert(items.len % 2 == 0);
//
//         const left = @ptrCast([*]EmuMessage, items)[0 .. items.len / 2];
//         const right = @ptrCast([*]GuiMessage, items)[items.len / 2 .. items.len];
//
//         return .{ .emu = Channel(EmuMessage).init(left), .gui = Channel(GuiMessage).init(right) };
//     }
// };

/// Fixed-capacity FIFO queue holding up to `N` values of type `T`.
///
/// The queue is exposed as two handles, `tx` (a `Sender`) and `rx` (a
/// `Receiver`), which point at the same heap-allocated element buffer and the
/// same pair of indices. The indices are only touched through atomic
/// loads/stores; `Sender` is the only half that stores to `write` and
/// `Receiver` is the only half that stores to `read`.
///
/// `N` must be a power of two and at most half the range of the index type;
/// both are checked at compile time.
///
/// NOTE(review): the layout suggests one producer thread and one consumer
/// thread; nothing here enforces that, so confirm against the callers.
pub fn Channel(comptime T: type, comptime N: usize) type {
    return struct {
        const Index = usize;
        // half the range of index type
        const capacity_limit = (@as(Index, 1) << (@typeInfo(Index).Int.bits - 1)) - 1;

        tx: Sender,
        rx: Receiver,

        /// Producing half of the channel.
        pub const Sender = struct {
            const Self = @This();

            read: *Index,
            write: *Index,
            ptr: *[N]T,

            // NOTE(review): not returned by anything in this file; `send`
            // panics on a full buffer instead.
            const Error = error{buffer_full};

            /// Appends `value` to the queue.
            /// Panics if the queue already holds `N` values.
            pub fn send(self: Self, value: T) void {
                const idx_r = @atomicLoad(Index, self.read, .Acquire);
                const idx_w = @atomicLoad(Index, self.write, .Acquire);

                // Check to see if Queue is full.
                // The indices are free-running and only masked on access, so
                // their distance is computed with wrapping subtraction.
                if (idx_w -% idx_r == N) @panic("Channel: Buffer is full");

                self.ptr[mask(idx_w)] = value;

                // Publish the element before publishing the new write index.
                std.atomic.fence(.Release);
                @atomicStore(Index, self.write, idx_w +% 1, .Release);
            }

            /// Returns the number of values currently queued.
            pub fn len(self: Self) Index {
                const idx_r = @atomicLoad(Index, self.read, .Acquire);
                const idx_w = @atomicLoad(Index, self.write, .Acquire);

                return idx_w -% idx_r;
            }
        };

        /// Consuming half of the channel.
        pub const Receiver = struct {
            const Self = @This();

            read: *Index,
            write: *Index,
            ptr: *[N]T,

            /// Removes and returns the oldest queued value, or `null` when
            /// the queue is empty.
            pub fn recv(self: Self) ?T {
                const idx_r = @atomicLoad(Index, self.read, .Acquire);
                const idx_w = @atomicLoad(Index, self.write, .Acquire);

                if (idx_r == idx_w) return null;

                std.atomic.fence(.Acquire);
                const value = self.ptr[mask(idx_r)];

                // Finish reading the slot before handing it back to the sender.
                std.atomic.fence(.Release);
                @atomicStore(Index, self.read, idx_r +% 1, .Release);

                return value;
            }

            /// Returns the oldest queued value without removing it, or `null`
            /// when the queue is empty.
            pub fn peek(self: Self) ?T {
                const idx_r = @atomicLoad(Index, self.read, .Acquire);
                const idx_w = @atomicLoad(Index, self.write, .Acquire);

                if (idx_r == idx_w) return null;

                std.atomic.fence(.Acquire);
                return self.ptr[mask(idx_r)];
            }

            /// Returns the number of values currently queued.
            pub fn len(self: Self) Index {
                const idx_r = @atomicLoad(Index, self.read, .Acquire);
                const idx_w = @atomicLoad(Index, self.write, .Acquire);

                return idx_w -% idx_r;
            }
        };

        /// Maps a free-running index onto a buffer slot. Relies on `N` being
        /// a power of two.
        fn mask(idx: Index) Index {
            return idx & (@as(Index, N) - 1);
        }

        /// Allocates the element buffer and the two shared indices.
        /// The caller must release them with `deinit`, passing the same
        /// allocator. Returns any error reported by the allocator.
        pub fn init(allocator: Allocator) !Channel(T, N) {
            const buf = try allocator.alloc(T, N);
            // Do not leak the element buffer if the second allocation fails.
            errdefer allocator.free(buf);

            const indices = try allocator.alloc(Index, 2);
            // Freshly allocated memory is undefined; start out empty.
            @memset(indices, 0);

            return .{
                .tx = Sender{
                    .ptr = buf[0..N],
                    .read = &indices[0],
                    .write = &indices[1],
                },
                .rx = Receiver{
                    .ptr = buf[0..N],
                    .read = &indices[0],
                    .write = &indices[1],
                },
            };
        }

        /// Frees the memory acquired by `init` and invalidates `self`.
        /// `allocator` must be the allocator that was passed to `init`.
        pub fn deinit(self: *Channel(T, N), allocator: Allocator) void {
            // `read` points at the first of the two indices allocated together
            // in `init`, so the original allocation can be rebuilt from it.
            const indices: []Index = @ptrCast([*]Index, self.tx.read)[0..2];
            const buf: []T = self.tx.ptr;

            allocator.free(indices);
            allocator.free(buf);

            self.* = undefined;
        }

        comptime {
            std.debug.assert(std.math.isPowerOfTwo(N));
            std.debug.assert(N <= capacity_limit);
        }
    };
}

test "Channel init + deinit" {
    var ch = try Channel(u8, 64).init(std.testing.allocator);
    defer ch.deinit(std.testing.allocator);
}

test "Channel basic queue" {
    var ch = try Channel(u8, 64).init(std.testing.allocator);
    defer ch.deinit(std.testing.allocator);

    ch.tx.send(128);

    try std.testing.expectEqual(@as(?u8, 128), ch.rx.recv());
}

test "Channel basic multithreaded" {
    const builtin = @import("builtin");

    if (builtin.single_threaded) return error.SkipZigTest;

    const run_tx = struct {
        fn run(tx: anytype) void {
            tx.send(128);
        }
    }.run;

    const run_rx = struct {
        fn run(rx: anytype) !void {
            while (rx.recv()) |value| {
                try std.testing.expectEqual(@as(?u8, 128), value);
            }
        }
    }.run;

    var ch = try Channel(u8, 64).init(std.testing.allocator);
    defer ch.deinit(std.testing.allocator);

    const tx_handle = try std.Thread.spawn(.{}, run_tx, .{&ch.tx});
    defer tx_handle.join();

    const rx_handle = try std.Thread.spawn(.{}, run_rx, .{&ch.rx});
    defer rx_handle.join();
}

/// FIFO ring buffer over caller-provided storage.
///
/// The buffer does not own `buf`; the caller keeps it alive for as long as
/// the ring buffer is used. `read` and `write` are free-running indices that
/// are masked down to a slot only when the storage is accessed.
pub fn RingBuffer(comptime T: type) type {
    return struct {
        const Self = @This();
        const Index = usize;
        // half the range of index type
        const max_capacity = (@as(Index, 1) << (@typeInfo(Index).Int.bits - 1)) - 1;

        const log = std.log.scoped(.RingBuffer);

        read: Index,
        write: Index,
        buf: []T,

        const Error = error{buffer_full};

        /// Wraps `buf` as an empty ring buffer and fills it with zeroes.
        /// Asserts that `buf.len` is a power of two no larger than
        /// `max_capacity`. Because of the zero fill, `0` must coerce to `T`.
        pub fn init(buf: []T) Self {
            std.debug.assert(std.math.isPowerOfTwo(buf.len)); // capacity must be a power of two
            std.debug.assert(buf.len <= max_capacity);

            @memset(buf, 0);

            return .{ .read = 0, .write = 0, .buf = buf };
        }

        /// Appends `value`, or returns `error.buffer_full` when every slot is
        /// in use.
        pub fn push(self: *Self, value: T) Error!void {
            if (self.isFull()) return error.buffer_full;

            self.buf[self.mask(self.write)] = value;
            // Free-running index: wrap instead of trapping on overflow.
            self.write +%= 1;
        }

        /// Removes and returns the oldest entry, or `null` when empty.
        pub fn pop(self: *Self) ?T {
            if (self.isEmpty()) return null;

            const value = self.buf[self.mask(self.read)];
            self.read +%= 1;

            return value;
        }

        /// Copies the oldest entries into `cpy`, oldest first, without
        /// removing them from the buffer.
        ///
        /// Returns the number of entries read
        pub fn copy(self: *const Self, cpy: []T) Index {
            const count = @min(self.len(), cpy.len);
            var start: Index = self.read;

            for (cpy[0..count]) |*v| {
                v.* = self.buf[self.mask(start)];
                start +%= 1;
            }

            return count;
        }

        /// Number of entries currently stored.
        fn len(self: *const Self) Index {
            return self.write -% self.read;
        }

        fn isFull(self: *const Self) bool {
            return self.len() == self.buf.len;
        }

        fn isEmpty(self: *const Self) bool {
            return self.read == self.write;
        }

        /// Maps a free-running index onto a slot of `buf`. Relies on
        /// `buf.len` being a power of two.
        fn mask(self: *const Self, idx: Index) Index {
            return idx & (self.buf.len - 1);
        }
    };
}

/// Truncates `value` to the bit width of `U`, then sign-extends the result
/// back to the bit width of `T`. The extension replicates the top bit of the
/// truncated value regardless of the signedness of `U`.
///
/// `T` and `U` must be integer types, and `U` must not be wider than `T`.
pub fn sext(comptime T: type, comptime U: type, value: T) T {
    // U must not have more bits than T
    comptime std.debug.assert(@typeInfo(U).Int.bits <= @typeInfo(T).Int.bits);

    const iT = std.meta.Int(.signed, @typeInfo(T).Int.bits);
    const ExtU = if (@typeInfo(U).Int.signedness == .unsigned) T else iT;
    const shift_amt = @intCast(Log2Int(T), @typeInfo(T).Int.bits - @typeInfo(U).Int.bits);

    // Move the top bit of the truncated value into the sign position of `iT`,
    // then let the arithmetic right shift replicate it.
    return @bitCast(T, @bitCast(iT, @as(ExtU, @truncate(U, value)) << shift_amt) >> shift_amt);
}

/// Rotates `x` right by `r` bits; `r` is reduced modulo the bit width of `T`.
/// `T` must be an unsigned integer type; signed types are a compile error.
// NOTE(review): the original doc comment was a bare "See" whose target is
// missing from this file; restore the reference if it is known.
pub inline fn rotr(comptime T: type, x: T, r: anytype) T {
    if (@typeInfo(T).Int.signedness == .signed)
        @compileError("cannot rotate signed integer");

    const ar = @intCast(Log2Int(T), @mod(r, @typeInfo(T).Int.bits));
    // `1 +% ~ar` is `bits - ar` modulo the bit width, so a rotation by zero
    // shifts left by zero rather than by the full width.
    return x >> ar | x << (1 +% ~ar);
}