Module: streams-internals Synopsis: Implementation of streams for streaming over sequences Author: Scott McKay, Marc Ferguson, Eliot Miranda Copyright: Original Code is Copyright (c) 1995-2004 Functional Objects, Inc. All rights reserved. License: Functional Objects Library Public License Version 1.0 Dual-license: GNU Lesser General Public License Warranty: Distributed WITHOUT WARRANTY OF ANY KIND /// Sequence stream classes define open primary class () slot stream-sequence :: = make(), init-keyword: contents:; slot stream-limit :: false-or() = #f; // slot initial-position :: = 0; // inherited from // slot current-position :: = 0; // inherited from // slot final-position :: = 0; // inherited from end class ; define open primary class () inherited slot stream-sequence = make(); end class ; define sealed class () inherited slot stream-sequence = make(); end class ; define sealed class () inherited slot stream-sequence = make(); end class ; /// Macros define macro with-output-to-string { with-output-to-string (?stream:name, #rest ?options:*) ?body:body end } => { let ?stream :: = make(, direction: #"output", ?options); ?body; stream-contents(?stream, clear-contents?: #f) } { with-output-to-string (?stream:name :: ?class:name, #rest ?options:*) ?body:body end } => { let ?stream :: ?class = make(?class, direction: #"output", ?options); ?body; stream-contents(?stream, clear-contents?: #f) } end macro with-output-to-string; /// Creating sequence streams define method make (class == , #rest initargs, #key contents, element-type) => (stream /* ---*** :: subclass() */) let type = type-for-sequence-stream(contents); if (type == class) next-method() else apply(make, type, initargs) end end method make; define method initialize (stream :: , #key direction, start: _start = 0, end: _end) => () next-method(); stream.initial-position := _start; stream.current-position := _start; stream.stream-limit := _end; if (direction == #"output") stream.final-position := _start else stream.final-position := size(stream-sequence(stream)) end end method initialize; define open generic type-for-sequence-stream (contents :: false-or()) => (sequence-stream-type :: ); define method type-for-sequence-stream (contents == #f) => (type :: singleton()) end method type-for-sequence-stream; define method type-for-sequence-stream (contents :: ) => (type :: singleton()) end method type-for-sequence-stream; define method type-for-sequence-stream (contents :: ) => (type :: singleton()) end method type-for-sequence-stream; define method type-for-sequence-stream (contents :: ) => (type :: singleton()) end method type-for-sequence-stream; define method type-for-sequence-stream (contents :: ) => (type :: singleton()) end method type-for-sequence-stream; /// Readable stream protocol define method read-element (stream :: , #key on-end-of-stream = unsupplied()) => (element :: ) ensure-readable(stream); let seq :: = stream-sequence(stream); let pos :: = stream.current-position; let limit :: = stream-limit(stream) | stream.final-position; if (pos < limit) let elt = seq[pos]; stream.current-position := pos + 1; elt else end-of-stream-value(stream, on-end-of-stream) end end method read-element; define method unread-element (stream :: , elt :: ) => (element :: ) ensure-readable(stream); let pos :: = stream.current-position; if (pos > 0) stream.current-position := pos - 1; let sequence = stream-sequence(stream); // Try not to upset immutable sequences if (sequence[pos - 1] ~== elt) sequence[pos - 1] := elt end end; elt end method unread-element; define method peek (stream :: , #key on-end-of-stream = unsupplied()) => (element :: ) ensure-readable(stream); let seq :: = stream-sequence(stream); let pos :: = stream.current-position; let limit :: = stream-limit(stream) | stream.final-position; if (pos < limit) seq[pos] else end-of-stream-value(stream, on-end-of-stream) end end method peek; define method read (stream :: , n :: , #key on-end-of-stream = unsupplied()) => (elements) ensure-readable(stream); let seq :: = stream-sequence(stream); let pos :: = stream.current-position; let src-n :: = (stream-limit(stream) | stream.final-position) - pos; if (n > src-n) if (unsupplied?(on-end-of-stream)) signal(make(, stream: stream, count: src-n, sequence: copy-sequence(seq, start: pos, end: pos + src-n))); end; n := src-n end; let elements = copy-sequence(seq, start: pos, end: pos + n); stream.current-position := pos + n; elements end method read; define method read-into! (stream :: , n :: , dst :: , #key start = 0, on-end-of-stream = unsupplied()) => (n-read) ensure-readable(stream); let seq :: = stream-sequence(stream); let pos :: = stream.current-position; let src-n :: = (stream-limit(stream) | stream.final-position) - pos; let dst-n :: = dst.size - start; let n-read :: = min(n, src-n, dst-n); copy-bytes(seq, pos, dst, start, n-read); stream.current-position := pos + n-read; if (n > src-n & dst-n > src-n & unsupplied?(on-end-of-stream)) signal(make(, stream: stream, count: n-read, sequence: copy-sequence(dst, start: start, end: start + n-read))) end; n-read end method read-into!; define method stream-input-available? (stream :: ) => (available? :: ) readable?(stream) & ~stream-at-end?(stream) end method stream-input-available?; /// Writable stream protocol define method write-element (stream :: , elt :: ) => () ensure-writable(stream); let seq :: = stream-sequence(stream); let pos :: = stream.current-position; // Grow the sequence if necessary if (pos >= seq.size) while (pos >= seq.size) seq := grow-for-stream(seq, pos + 1); stream-sequence(stream) := seq end end; // Insert the new element seq[pos] := elt; stream.current-position := pos + 1; if (pos >= stream.final-position) stream.final-position := pos + 1 end end method write-element; define method write (stream :: , src :: , #key start: start-index = 0, end: _end = #f) => () ensure-writable(stream); let dst :: = stream-sequence(stream); let pos :: = stream.current-position; let count :: = (_end | src.size) - start-index; let required-space = pos + count; while (dst.size < required-space) dst := grow-for-stream(dst, required-space); stream-sequence(stream) := dst end; copy-bytes(src, start-index, dst, pos, count); let new-pos = pos + count; stream.current-position := new-pos; if (new-pos > stream.final-position) stream.final-position := new-pos end end method write; define method grow-for-stream (seq :: , min-size :: ) => (new-seq :: ) let n :: = seq.size; let new-seq = make(object-class(seq), size: max(min-size, 2 * n)); copy-bytes(seq, 0, new-seq, 0, n); new-seq end method grow-for-stream; define method grow-for-stream (vec :: , min-size :: ) => (vec :: ) vec.size := min-size; vec end method grow-for-stream; /// Stream query and contents accessing define method stream-at-end? (stream :: ) => (at-end? :: ) if (write-only?(stream)) #f else let limit = (read-only?(stream) & stream-limit(stream)); stream.current-position >= (limit | stream.final-position) end end method stream-at-end?; define method stream-size (stream :: ) => (size :: ) (stream-limit(stream) | stream.final-position) - stream.initial-position end method stream-size; define method clear-contents (stream :: ) => () stream.current-position := 0; stream.final-position := stream-limit(stream) | 0 end method clear-contents; define method stream-contents (stream :: , #key clear-contents? = #t) => (contents :: ) let type = type-for-copy(stream-sequence(stream)); stream-contents-as(type, stream, clear-contents?: clear-contents?) end method stream-contents; define method stream-contents-as (type :: , stream :: , #key clear-contents? = #t) => (contents :: ) let _start = stream.initial-position; let _end = stream-limit(stream) | stream.final-position; let n = _end - _start; let result = make(type, size: n); copy-bytes(stream-sequence(stream), _start, result, 0, n); if (clear-contents?) clear-contents(stream) end; result end method stream-contents-as; define method stream-sequence-class (stream :: ) => (class /* ---*** subclass() */) type-for-copy(stream-sequence(stream)) end method stream-sequence-class; /// Seal some domains define sealed domain make (singleton()); define sealed domain initialize (); define sealed domain read-element (); define sealed domain unread-element (, ); define sealed domain peek (); define sealed domain read (, ); define sealed domain read-into! (, , ); define sealed domain stream-input-available? (); define sealed domain write-element (, ); define sealed domain write (, ); define sealed domain stream-at-end? (); define sealed domain stream-size (); define sealed domain clear-contents (); define sealed domain stream-contents (); define sealed domain stream-contents-as (, ); define sealed domain make (singleton()); define sealed domain initialize (); define sealed domain read-element (); define sealed domain unread-element (, ); define sealed domain peek (); define sealed domain read (, ); define sealed domain read-into! (, , ); define sealed domain stream-input-available? (); define sealed domain write-element (, ); define sealed domain write (, ); define sealed domain stream-at-end? (); define sealed domain stream-size (); define sealed domain clear-contents (); define sealed domain stream-contents (); define sealed domain stream-contents-as (, );