Module:       streams-internals
Synopsis:     Support for asynchronous writes.
Author:       Seth LaForge
Copyright:    Original Code is Copyright (c) 1995-2004 Functional Objects, Inc.
              All rights reserved.
License:      Functional Objects Library Public License Version 1.0
Dual-license: GNU Lesser General Public License
Warranty:     Distributed WITHOUT WARRANTY OF ANY KIND

// NOTE(review): the angle-bracketed class names in this part of the file were
// missing from the reviewed text and have been restored from how each value
// is used.  <file-stream>, <external-stream-accessor>, <error> and
// make-<power-of-two-buffer> are the least certain - confirm them against the
// rest of the library before merging.

// Number of reserve buffers in the buffer pool.
// Should this perhaps be dynamic?
// Note: I tried creating new buffers as necessary but that slowed
// performance horribly - probably from additional allocation/GC costs.
define constant $buffer-pool-size :: <integer> = 16;


// A pending operation.  Operations which require additional data should be
// subclasses, e.g. <pending-write>.
define class <pending-operation> (<object>)
  // The function to call to perform the operation.  Should accept a
  // <pending-operation> for its first argument, and signal any errors.
  constant slot pending-operation :: <function>,
    init-keyword: operation:;

  // Life-cycle marker: #f until enqueued, then #"in-queue", #"in-progress"
  // while the handler thread runs it, and finally #"complete".
  slot pending-status
    :: one-of(#f, #"in-queue", #"in-progress", #"complete") = #f;

  // The stream on which to perform (mostly for errors):
  constant slot pending-stream :: <file-stream>,
    init-keyword: stream:;

  // The accessor on which to perform:
  constant slot pending-accessor :: <external-stream-accessor>,
    init-keyword: accessor:;
end class <pending-operation>;


// A pending write of part of a buffer to a given file offset.
define class <pending-write> (<pending-operation>)
  // Offset within the file at which to write the data:
  constant slot pending-file-offset :: <integer>,
    init-keyword: file-offset:;

  // Buffer of data to write:
  constant slot pending-buffer :: <buffer>,
    init-keyword: buffer:;

  // When true, the buffer is handed to the buffer pool once the operation
  // has been processed (see async-post-operation).
  slot pending-pool-buffer? :: <boolean> = #f;

  // Number of bytes to write:
  constant slot pending-count :: <integer>,
    init-keyword: count:;

  // Offset within buffer to write:
  constant slot pending-buffer-offset :: <integer>,
    init-keyword: buffer-offset:;
end class <pending-write>;


// A <deque> of <pending-operation>s.  The operation currently being performed
// stays at the front of the queue until it has completed.
define constant *pending-operations* :: <deque> = make(<deque>);
define constant *pending-operations-lock* :: <lock> = make(<lock>);
// Released whenever an operation is added to the queue:
define constant *pending-operations-add-notification* :: <notification>
  = make(<notification>, lock: *pending-operations-lock*);
// Released whenever an operation has completed and left the queue:
define constant *pending-operations-remove-notification* :: <notification>
  = make(<notification>, lock: *pending-operations-lock*);

// Any errors which have occurred, by accessor:
define constant *async-error-table* :: <table> = make(<table>);
define constant *async-error-lock* :: <lock> = make(<lock>);

// The thread which actually performs the writes:
define variable *writer-thread* :: false-or(<thread>) = #f;
define constant *writer-thread-lock* :: <lock> = make(<lock>);

// A pool of buffers for asynchronous operations.  Buffers are lazily created:
// the pool starts out holding $buffer-pool-size placeholders (#f), each of
// which is replaced by a real buffer the first time it is taken.
//
// The pool is read and modified only while holding *pending-operations-lock*
// (see enqueue-write and async-IO-handler).  *buffer-pool-lock* additionally
// serializes the threads that take buffers out of the pool.
define variable *buffer-pool* :: <list>
  = make(<list>, size: $buffer-pool-size);
define constant *buffer-pool-lock* :: <lock> = make(<lock>);


// Enqueue an operation, starting the handler thread on first use.
define function enqueue-operation (op :: <pending-operation>) => ()
  // Check, lock, check again: the common case (thread already running)
  // does not take *writer-thread-lock*.
  if (~ *writer-thread*)
    with-lock (*writer-thread-lock*)
      if (~ *writer-thread*)
        *writer-thread* := make(<thread>,
                                function: async-IO-handler,
                                name: "asynchronous I/O handler");
      end if;
    end with-lock;
  end if;
  with-lock (*pending-operations-lock*)
    push-last(*pending-operations*, op);
    op.pending-status := #"in-queue";
    release-all(*pending-operations-add-notification*);
  end with-lock;
end function enqueue-operation;


// Enqueue a write operation, perhaps returning a fresh buffer from the pool.
//
// If the operation's buffer has the accessor's preferred size the write is
// performed asynchronously.  With return-fresh-buffer? true, the buffer being
// written is donated to the pool on completion and a different buffer is
// returned for the caller to fill; otherwise the operation's own buffer is
// returned while the write may still be in flight.
//
// Buffers of any other size are written synchronously: the call blocks until
// the operation is complete and then returns the operation's own buffer.
define function enqueue-write
    (op :: <pending-write>, return-fresh-buffer? :: <boolean>)
 => (fresh-buffer :: <buffer>)
  // format-out("STREAMS: enqueuing write, buffer-size == %d.\n", op.pending-buffer.size);
  let preferred-buffer-size
    = accessor-preferred-buffer-size(op.pending-stream.accessor);
  if (op.pending-buffer.size == preferred-buffer-size)
    // format-out("STREAMS: async write initiated.\n");
    // Make this buffer be returned to the buffer pool on completion:
    op.pending-pool-buffer? := return-fresh-buffer?;
    // Enqueue:
    enqueue-operation(op);
    if (return-fresh-buffer?)
      // Wait for a free buffer and return it.  *buffer-pool-lock* keeps
      // competing callers in line; the pool itself is examined and popped
      // under *pending-operations-lock*, the lock the handler thread holds
      // when it gives a buffer back.  Reading the head and storing the tail
      // are two separate steps, so without a common lock a concurrent push
      // could slip in between them and the same buffer could end up both
      // handed out and still in the pool.
      with-lock (*buffer-pool-lock*)
        let new-buffer
          = with-lock (*pending-operations-lock*)
              while (empty?(*buffer-pool*))
                wait-for(*pending-operations-remove-notification*);
              end while;
              let pooled = *buffer-pool*.head;
              *buffer-pool* := *buffer-pool*.tail;
              pooled
            end with-lock;
        // If the buffer is #f, we need to create it:
        new-buffer
          | make-<power-of-two-buffer>(known-power-of-two-size?: #t,
                                       size: preferred-buffer-size)
      end with-lock
    else
      op.pending-buffer
    end if
  else
    // Odd-sized buffer: queue it, but wait here until it has been written.
    enqueue-operation(op);
    with-lock (*pending-operations-lock*)
      while (op.pending-status ~= #"complete")
        wait-for(*pending-operations-remove-notification*);
      end while;
    end with-lock;
    op.pending-buffer
  end if
end function enqueue-write;


// Wait for all writes on accessor which overlap the range given to complete.
// The range is the half-open interval [offset, offset + size).
define function async-wait-for-overlapping-write-completion
    (accessor :: <external-stream-accessor>,
     offset :: <integer>, size :: <integer>)
 => ()
  local method overlap? (op :: <pending-operation>) => (r :: <boolean>)
          instance?(op, <pending-write>)
            & op.pending-accessor == accessor
            & begin
                // The intersection of the two ranges is non-empty exactly
                // when its upper bound exceeds its lower bound.
                let low = max(offset, op.pending-file-offset);
                let high = min(offset + size,
                               op.pending-file-offset + op.pending-count);
                high > low
              end
        end method overlap?;
  with-lock (*pending-operations-lock*)
    while (any?(overlap?, *pending-operations*))
      wait-for(*pending-operations-remove-notification*);
    end while;
  end with-lock;
end async-wait-for-overlapping-write-completion;


// Wait for all operations on accessor to complete.
define method async-wait-for-completion
    (accessor :: <external-stream-accessor>) => ()
  local method same? (op :: <pending-operation>) => (r :: <boolean>)
          op.pending-accessor == accessor
        end method same?;
  with-lock (*pending-operations-lock*)
    while (any?(same?, *pending-operations*))
      wait-for(*pending-operations-remove-notification*);
    end while;
  end with-lock;
end async-wait-for-completion;


// Return the error recorded for accessor by the handler thread, or #f.
define function async-get-error
    (accessor :: <external-stream-accessor>)
 => (err :: false-or(<error>))
  with-lock (*async-error-lock*)
    element(*async-error-table*, accessor, default: #f)
  end with-lock
end function async-get-error;


// If there's an error reported for this accessor, throw it.
// The recorded error is not removed from the table by this function.
define function async-check-for-errors
    (accessor :: <external-stream-accessor>) => ()
  let err = async-get-error(accessor);
  if (err)
    error(err);
  end if;
end function async-check-for-errors;


// Block until the queue is non-empty, then mark the operation at the front
// as in progress and return it.  The operation is left in the queue so that
// the wait-for-completion functions still see it while it is running.
define function get-op () => (r :: <pending-operation>)
  with-lock (*pending-operations-lock*)
    while (*pending-operations*.empty?)
      wait-for(*pending-operations-add-notification*);
    end while;
    let op :: <pending-operation> = *pending-operations*.first;
    op.pending-status := #"in-progress";
    op
  end with-lock;
end function get-op;


// The actual async operation handler - runs in its own thread, and performs
// queued operations.
define function async-IO-handler () => ()
  while (#t)
    // Get an operation:
    let operation :: <pending-operation> = get-op();

    // Perform operation.  Only the first error per accessor is kept:
    block ()
      operation.pending-operation(operation);
    exception (e :: <error>)
      with-lock (*async-error-lock*)
        if (~ element(*async-error-table*, operation.pending-accessor,
                      default: #f))
          *async-error-table*[operation.pending-accessor] := e;
        end if;
      end with-lock;
    end block;

    // Finish up and remove operation from queue.  async-post-operation is
    // called while holding *pending-operations-lock* so that any buffer it
    // puts back in the pool cannot interleave with the pop in enqueue-write,
    // and so that the buffer is already there when waiters are woken.
    // Methods on async-post-operation must therefore not try to acquire
    // *pending-operations-lock* themselves.
    with-lock (*pending-operations-lock*)
      async-post-operation(operation);
      operation.pending-status := #"complete";
      pop(*pending-operations*);
      release-all(*pending-operations-remove-notification*);
    end with-lock;
  end while;
end function async-IO-handler;


// async-post-operation gets called after an operation has completed (even if
// there was an error).  One good use is to return buffers to the buffer pool.
// NOTE(review): the specializers below were missing from the reviewed text
// and have been restored from usage (the write method reads
// pending-pool-buffer? and pending-buffer) - confirm the class names.
define generic async-post-operation
    (operation :: <pending-operation>) => ();

// Default method: nothing to clean up.
define method async-post-operation
    (operation :: <pending-operation>) => ()
end method async-post-operation;

// Writes flagged with pending-pool-buffer? donate their buffer to the pool.
define method async-post-operation
    (operation :: <pending-write>) => ()
  if (operation.pending-pool-buffer?)
    // Return the buffer to the pool.  This reads *buffer-pool*, conses onto
    // it and stores the result back, so it is NOT atomic with respect to
    // code that pops buffers from the pool.  No lock is taken here; the
    // caller is responsible for serializing this call against such code.
    *buffer-pool* := pair(operation.pending-buffer, *buffer-pool*);
  end if;
end method async-post-operation;