# -*- test-case-name: twisted.web2.test.test_stream -*-

"""
The stream module provides a simple abstraction of streaming
data. While Twisted already has some provisions for handling this in
its Producer/Consumer model, the rather complex interactions between
producer and consumer make it difficult to implement something like
the CompoundStream object. Thus, this API.

The IStream interface is very simple. It consists of two methods:
read, and close. The read method should either return some data, None
if there is no data left to read, or a Deferred. Close frees up any
underlying resources and causes read to return None forevermore.

IByteStream adds a bit more to the API:
1) read is required to return objects conforming to the buffer interface.
2) .length, which may be either an integer number of bytes remaining, or
   None if unknown.
3) .split(position). Split takes a position, and splits the
   stream in two pieces, returning the two new streams. Using the
   original stream after calling split is not allowed.

There are two builtin source stream classes: FileStream and
MemoryStream. The first produces data from a file object, the second
from a buffer in memory. Any number of these can be combined into one
stream with the CompoundStream object. Then, to interface with other
parts of Twisted, there are two transceivers: StreamProducer and
ProducerStream. The first takes a stream and turns it into an
IPushProducer, which will write to a consumer. The second is a
consumer which is a stream, so that other producers can write to it.
"""

from __future__ import generators

import copy, os, types, sys
from zope.interface import Interface, Attribute, implements
from twisted.internet.defer import Deferred
from twisted.internet import interfaces as ti_interfaces, defer, reactor, protocol, error as ti_error
from twisted.python import components, log
from twisted.python.failure import Failure

# Python 2.4.2 (only) has a broken mmap that leaks a fd every time you call it.
if sys.version_info[0:3] != (2,4,2):
    try:
        import mmap
    except ImportError:
        mmap = None
else:
    mmap = None

##############################
####      Interfaces      ####
##############################

class IStream(Interface):
    """A stream of arbitrary data."""

    def read():
        """Read some data.

        Returns some object representing the data.
        If there is no more data available, returns None.
        Can also return a Deferred resulting in one of the above.

        Errors may be indicated by exception or by a Deferred of a Failure.
        """

    def close():
        """Prematurely close. Should also cause further reads to
        return None."""


class IByteStream(IStream):
    """A stream which is of bytes."""

    length = Attribute("""How much data is in this stream. Can be None if unknown.""")

    def read():
        """Read some data.

        Returns an object conforming to the buffer interface, or
        if there is no more data available, returns None.
        Can also return a Deferred resulting in one of the above.

        Errors may be indicated by exception or by a Deferred of a Failure.
        """

    def split(point):
        """Split this stream into two, at byte position 'point'.

        Returns a tuple of (before, after). After calling split, no other
        methods should be called on this stream. Doing so will have undefined
        behavior.

        If you cannot implement split easily, you may implement it as::

            return fallbackSplit(self, point)
        """

    def close():
        """Prematurely close this stream. Should also cause further reads to
        return None. Additionally, .length should be set to 0.
        """


class ISendfileableStream(Interface):
    def read(sendfile=False):
        """
        Read some data.
        If sendfile == False, returns an object conforming to the buffer
        interface, or else a Deferred.

        If sendfile == True, returns either the above, or a SendfileBuffer.
        """


class SimpleStream(object):
    """Superclass of simple streams with a single buffer and a offset and length
    into that buffer."""
    implements(IByteStream)

    length = None
    start = None

    def read(self):
        """Base implementation: there is never any data."""
        return None

    def close(self):
        """Mark the stream as exhausted by zeroing its length."""
        self.length = 0

    def split(self, point):
        """Split at byte offset 'point' by shallow-copying this stream.

        Returns (self, copy): self is truncated to 'point' bytes and the copy
        starts 'point' bytes further into the same underlying buffer.

        Raises ValueError if the length is known and 'point' exceeds it.
        """
        if self.length is not None:
            if point > self.length:
                raise ValueError("split point (%d) > length (%d)" % (point, self.length))
        b = copy.copy(self)
        self.length = point
        if b.length is not None:
            b.length -= point
        b.start += point
        return (self, b)


##############################
####      FileStream      ####
##############################

# maximum mmap size
MMAP_LIMIT = 4*1024*1024
# minimum mmap size
MMAP_THRESHOLD = 8*1024

# maximum sendfile length
SENDFILE_LIMIT = 16777216
# minimum sendfile size
SENDFILE_THRESHOLD = 256

def mmapwrapper(*args, **kwargs):
    """
    Python's mmap call sucks and ommitted the "offset" argument for no
    discernable reason. Replace this with a mmap module that has offset.

    An 'offset' keyword of None or 0 is stripped before delegating to
    mmap.mmap; any other offset raises mmap.error.
    """
    offset = kwargs.get('offset', None)
    if offset in [None, 0]:
        if 'offset' in kwargs:
            del kwargs['offset']
    else:
        raise mmap.error("mmap: Python sucks and does not support offset.")
    return mmap.mmap(*args, **kwargs)


class FileStream(SimpleStream):
    """A stream that reads data from a file. File must be a normal
    file that supports seek, (e.g. not a pipe or device or socket)."""
    # The docstring must be the first statement in the class body, otherwise
    # it is just a discarded string expression.
    implements(ISendfileableStream)

    # 65K, minus some slack (2 ** 16 - 32 bytes)
    CHUNK_SIZE = 2 ** 2 ** 2 ** 2 - 32

    f = None

    def __init__(self, f, start=0, length=None, useMMap=bool(mmap)):
        """
        Create the stream from file f. If you specify start and length,
        use only that portion of the file.

        When length is None the size reported by os.fstat is used.
        """
        self.f = f
        self.start = start
        if length is None:
            self.length = os.fstat(f.fileno()).st_size
        else:
            self.length = length
        self.useMMap = useMMap

    def read(self, sendfile=False):
        """Return the next chunk of the file, or None when exhausted/closed.

        Depending on the remaining length and the flags, the chunk is a
        SendfileBuffer, an mmap object or a plain string read from the file.

        Raises RuntimeError if the file yields no data while bytes are still
        expected.
        """
        if self.f is None:
            return None

        length = self.length
        if length == 0:
            self.f = None
            return None

        if sendfile and length > SENDFILE_THRESHOLD:
            # XXX: Yay using non-existent sendfile support!
            # FIXME: if we return a SendfileBuffer, and then sendfile
            #        fails, then what? Or, what if file is too short?
            # NOTE(review): SendfileBuffer is not defined in the visible
            # source; this branch needs it to be provided elsewhere.
            readSize = min(length, SENDFILE_LIMIT)
            res = SendfileBuffer(self.f, self.start, readSize)
            self.length -= readSize
            self.start += readSize
            return res

        if self.useMMap and length > MMAP_THRESHOLD:
            readSize = min(length, MMAP_LIMIT)
            try:
                res = mmapwrapper(self.f.fileno(), readSize,
                                  access=mmap.ACCESS_READ, offset=self.start)
                #madvise(res, MADV_SEQUENTIAL)
                self.length -= readSize
                self.start += readSize
                return res
            except mmap.error:
                # mmap unavailable for this request (e.g. non-zero offset):
                # deliberately fall through to the plain read below.
                pass

        # Fall back to standard read.
        readSize = min(length, self.CHUNK_SIZE)

        self.f.seek(self.start)
        b = self.f.read(readSize)
        bytesRead = len(b)
        if not bytesRead:
            raise RuntimeError("Ran out of data reading file %r, expected %d more bytes" % (self.f, length))
        else:
            self.length -= bytesRead
            self.start += bytesRead
            return b

    def close(self):
        """Drop the file reference and zero the length."""
        self.f = None
        SimpleStream.close(self)

components.registerAdapter(FileStream, file, IByteStream)

##############################
####     MemoryStream     ####
##############################

class MemoryStream(SimpleStream):
    """A stream that reads data from a buffer object."""

    def __init__(self, mem, start=0, length=None):
        """
        Create the stream from buffer object mem. If you specify start and length,
        use only that portion of the buffer.

        Raises ValueError if start + length reaches past the end of mem.
        """
        self.mem = mem
        self.start = start
        if length is None:
            self.length = len(mem) - start
        else:
            # The requested window must fit in the buffer; checking 'length'
            # alone let a window starting at 'start' run off the end.
            if len(mem) < start + length:
                raise ValueError("len(mem) < start + length")
            self.length = length

    def read(self):
        """Return the whole remaining window as one buffer, then None."""
        if self.mem is None:
            return None
        if self.length == 0:
            result = None
        else:
            result = buffer(self.mem, self.start, self.length)
        self.mem = None
        self.length = 0
        return result

    def close(self):
        """Drop the buffer reference and zero the length."""
        self.mem = None
        SimpleStream.close(self)

components.registerAdapter(MemoryStream, str, IByteStream)
components.registerAdapter(MemoryStream, types.BufferType, IByteStream)

##############################
####    CompoundStream    ####
##############################

class CompoundStream(object):
    """A stream which is composed of many other streams.

    Call addStream to add substreams.
    """

    implements(IByteStream, ISendfileableStream)
    deferred = None
    length = 0

    def __init__(self, buckets=()):
        """Create the stream, optionally seeded with an iterable of streams."""
        self.buckets = []
        # Go through addStream so that .length reflects the initial buckets.
        for s in buckets:
            self.addStream(s)

    def addStream(self, bucket):
        """Add a stream to the output"""
        bucket = IByteStream(bucket)
        self.buckets.append(bucket)
        if self.length is not None:
            if bucket.length is None:
                self.length = None
            else:
                self.length += bucket.length

    def read(self, sendfile=False):
        """Read from the first remaining substream.

        Returns data, None when every substream is exhausted, or the
        substream's Deferred.

        Raises RuntimeError if a previous read's Deferred is still pending.
        """
        if self.deferred is not None:
            raise RuntimeError("Call to read while read is already outstanding")

        if not self.buckets:
            return None

        # NOTE(review): on a synchronous error the Failure object is returned
        # (not raised) by the handlers below.
        if sendfile and ISendfileableStream.providedBy(self.buckets[0]):
            try:
                result = self.buckets[0].read(sendfile)
            except:
                return self._gotFailure(Failure())
        else:
            try:
                result = self.buckets[0].read()
            except:
                return self._gotFailure(Failure())

        if isinstance(result, Deferred):
            self.deferred = result
            result.addCallbacks(self._gotRead, self._gotFailure, (sendfile,))
            return result

        return self._gotRead(result, sendfile)

    def _gotFailure(self, f):
        """Discard the failing substream, close everything, pass f along."""
        self.deferred = None
        del self.buckets[0]
        self.close()
        return f

    def _gotRead(self, result, sendfile):
        """Account for a completed substream read.

        A None result means the first substream is done: drop it and read
        from the next one.
        """
        self.deferred = None
        if result is None:
            del self.buckets[0]
            # Next bucket
            return self.read(sendfile)

        if self.length is not None:
            self.length -= len(result)
        return result

    def _splitOff(self, tailBuckets, point):
        """Build the 'after' half of a split and fix up both lengths."""
        after = CompoundStream()
        after.buckets = tailBuckets
        if self.length is None:
            after.length = None
        else:
            after.length = self.length - point
        # Every bucket kept in self had a known length, summing to 'point'.
        self.length = point
        return self, after

    def split(self, point):
        """Split into (before, after) at byte offset 'point'.

        Falls back to fallbackSplit when a substream of unknown length is
        reached before the split point.

        Raises ValueError if 'point' lies beyond the end of the stream.
        """
        origPoint = point
        for index, bucket in enumerate(self.buckets):
            if point == 0:
                # The split falls exactly on the boundary before this bucket.
                tail = self.buckets[index:]
                del self.buckets[index:]
                return self._splitOff(tail, origPoint)

            if bucket.length is None:
                # Indeterminate length bucket.
                # give up and use fallback splitter.
                return fallbackSplit(self, origPoint)

            if point < bucket.length:
                # The split falls inside this bucket: split it and share
                # the two pieces between the halves.
                before, after = bucket.split(point)
                tail = [after] + self.buckets[index+1:]
                del self.buckets[index+1:]
                self.buckets[index] = before
                return self._splitOff(tail, origPoint)

            point -= bucket.length

        if point == 0:
            # Split exactly at the end: everything stays in 'before'.
            return self._splitOff([], origPoint)
        raise ValueError("split point (%d) > length (%d)" % (origPoint, origPoint - point))

    def close(self):
        """Close every remaining substream and zero the length."""
        for bucket in self.buckets:
            bucket.close()
        self.buckets = []
        self.length = 0


##############################
####      readStream      ####
##############################

class _StreamReader(object):
    """Process a stream's data using callbacks for data and stream finish."""

    def __init__(self, stream, gotDataCallback):
        self.stream = stream
        self.gotDataCallback = gotDataCallback
        self.result = Deferred()

    def run(self):
        """Start reading; returns the Deferred fired on finish or error."""
        # self.result may be del'd in _read()
        result = self.result
        self._read()
        return result

    def _read(self):
        """Issue one read and route its (possibly deferred) result."""
        try:
            result = self.stream.read()
        except:
            self._gotError(Failure())
            return
        if isinstance(result, Deferred):
            result.addCallbacks(self._gotData, self._gotError)
        else:
            self._gotData(result)

    def _gotError(self, failure):
        """Drop all references and errback the result Deferred."""
        result = self.result
        del self.result, self.gotDataCallback, self.stream
        result.errback(failure)

    def _gotData(self, data):
        """Deliver one chunk, or finish when data is None."""
        if data is None:
            result = self.result
            del self.result, self.gotDataCallback, self.stream
            result.callback(None)
            return
        try:
            self.gotDataCallback(data)
        except:
            self._gotError(Failure())
            return
        # Schedule the next read through the reactor rather than recursing.
        reactor.callLater(0, self._read)

def readStream(stream, gotDataCallback):
    """Pass a stream's data to a callback.

    Returns Deferred which will be triggered on finish.  Errors in
    reading the stream or in processing it will be returned via this
    Deferred.
    """
    return _StreamReader(stream, gotDataCallback).run()


def readAndDiscard(stream):
    """Read all the data from the given stream, and throw it out.

    Returns Deferred which will be triggered on finish.
    """
    return readStream(stream, lambda _: None)

def readIntoFile(stream, outFile):
    """Read a stream and write it into a file.

    Returns Deferred which will be triggered on finish.
    """
    def done(_):
        # Close the file on success and on failure, passing the result on.
        outFile.close()
        return _
    return readStream(stream, outFile.write).addBoth(done)

def connectStream(inputStream, factory):
    """Connect a protocol constructed from a factory to stream.

    Returns an output stream from the protocol.

    The protocol's transport will have a finish() method it should
    call when done writing.
    """
    # XXX deal better with addresses
    p = factory.buildProtocol(None)
    out = ProducerStream()
    out.disconnecting = False # XXX for LineReceiver suckage
    p.makeConnection(out)
    readStream(inputStream, lambda _: p.dataReceived(_)).addCallbacks(
        lambda _: p.connectionLost(ti_error.ConnectionDone()), lambda _: p.connectionLost(_))
    return out

##############################
####     fallbackSplit    ####
##############################

def fallbackSplit(stream, point):
    """Generic split: wrap 'stream' in a TruncaterStream/PostTruncaterStream
    pair and return them as (before, after)."""
    after = PostTruncaterStream(stream, point)
    before = TruncaterStream(stream, point, after)
    return (before, after)

class TruncaterStream(object):
    """The 'before' half of a fallback split: yields the first 'point' bytes
    of the wrapped stream, then hands the remainder to its postTruncater."""

    def __init__(self, stream, point, postTruncater):
        self.stream = stream
        self.length = point
        self.postTruncater = postTruncater

    def read(self):
        """Read up to the truncation point; returns None once it is reached."""
        if self.length == 0:
            if self.postTruncater is not None:
                postTruncater = self.postTruncater
                self.postTruncater = None
                postTruncater.sendInitialSegment(self.stream.read())
            self.stream = None
            return None

        result = self.stream.read()
        if isinstance(result, Deferred):
            return result.addCallback(self._gotRead)
        else:
            return self._gotRead(result)

    def _gotRead(self, data):
        """Trim a chunk at the truncation point, forwarding any excess.

        Raises ValueError if the source ends before the split point.
        """
        if data is None:
            raise ValueError("Ran out of data for a split of a indeterminate length source")
        if self.length >= len(data):
            self.length -= len(data)
            return data
        else:
            before = buffer(data, 0, self.length)
            after = buffer(data, self.length)
            self.length = 0
            if self.postTruncater is not None:
                postTruncater = self.postTruncater
                self.postTruncater = None
                postTruncater.sendInitialSegment(after)
                self.stream = None
            return before

    def split(self, point):
        """Split this half again at 'point'; returns (self, new truncater).

        Raises ValueError if 'point' exceeds the remaining length.
        """
        if point > self.length:
            raise ValueError("split point (%d) > length (%d)" % (point, self.length))

        post = PostTruncaterStream(self.stream, point)
        trunc = TruncaterStream(post, self.length - point, self.postTruncater)
        self.length = point
        self.postTruncater = post
        return self, trunc

    def close(self):
        """Close this half, coordinating with the second half if present."""
        if self.postTruncater is not None:
            # The second half decides what happens next; it may still read
            # from this stream (readAndDiscard), so state is left intact.
            self.postTruncater.notifyClosed(self)
        else:
            # Nothing cares about the rest of the stream
            # read() may already have dropped the stream reference.
            if self.stream is not None:
                self.stream.close()
            self.stream = None
            self.length = 0


class PostTruncaterStream(object):
    """The 'after' half of a fallback split: its first read returns a
    Deferred fed by the TruncaterStream, later reads go to the stream."""
    deferred = None
    sentInitialSegment = False
    truncaterClosed = None
    closed = False

    length = None
    def __init__(self, stream, point):
        self.stream = stream
        self.deferred = Deferred()
        if stream.length is not None:
            self.length = stream.length - point

    def read(self):
        """First call returns the initial-segment Deferred; afterwards reads
        are passed straight to the wrapped stream."""
        if not self.sentInitialSegment:
            self.sentInitialSegment = True
            if self.truncaterClosed is not None:
                # The first half was closed early: drain it so the initial
                # segment gets delivered.
                readAndDiscard(self.truncaterClosed)
                self.truncaterClosed = None
            return self.deferred

        return self.stream.read()

    def split(self, point):
        return fallbackSplit(self, point)

    def close(self):
        """Close this half; closes the wrapped stream once the first half no
        longer needs it."""
        self.closed = True
        if self.truncaterClosed is not None:
            # have first half close itself
            self.truncaterClosed.postTruncater = None
            self.truncaterClosed.close()
        elif self.sentInitialSegment:
            # first half already finished up
            self.stream.close()

        self.deferred = None

    # Callbacks from TruncaterStream
    def sendInitialSegment(self, data):
        """Receive the first post-split chunk (or a Deferred of it)."""
        if self.closed:
            # First half finished, we don't want data.
            self.stream.close()
            self.stream = None
        if self.deferred is not None:
            if isinstance(data, Deferred):
                data.chainDeferred(self.deferred)
            else:
                self.deferred.callback(data)

    def notifyClosed(self, truncater):
        """Called by the first half when it is closed before finishing."""
        if self.closed:
            # we are closed, have first half really close
            truncater.postTruncater = None
            truncater.close()
        elif self.sentInitialSegment:
            # We are trying to read, read up first half
            readAndDiscard(truncater)
        else:
            # Idle, store closed info.
            self.truncaterClosed = truncater

########################################
#### ProducerStream/StreamProducer  ####
########################################

class ProducerStream(object):
    """Turns producers into a IByteStream.
    Thus, implements IConsumer and IByteStream."""

    implements(IByteStream, ti_interfaces.IConsumer)
    length = None
    closed = False
    failed = False
    producer = None
    producerPaused = False
    deferred = None

    # Number of buffered chunks above which a streaming producer is paused.
    bufferSize = 5

    def __init__(self, length=None):
        self.buffer = []
        self.length = length

    # IByteStream implementation
    def read(self):
        """Return a buffered chunk, None/a failed Deferred once closed, or a
        Deferred that fires with the next written chunk."""
        if self.buffer:
            return self.buffer.pop(0)
        elif self.closed:
            self.length = 0
            if self.failed:
                f = self.failure
                del self.failure
                return defer.fail(f)
            return None
        else:
            deferred = self.deferred = Deferred()
            if self.producer is not None and (not self.streamingProducer
                                              or self.producerPaused):
                self.producerPaused = False
                self.producer.resumeProducing()

            return deferred

    def split(self, point):
        return fallbackSplit(self, point)

    def close(self):
        """Called by reader of stream when it is done reading."""
        self.buffer=[]
        self.closed = True
        if self.producer is not None:
            self.producer.stopProducing()
            self.producer = None
        self.deferred = None

    # IConsumer implementation
    def write(self, data):
        """Hand data to a waiting reader, or buffer it (pausing a streaming
        producer once the buffer exceeds bufferSize). Ignored when closed."""
        if self.closed:
            return

        if self.deferred:
            deferred = self.deferred
            self.deferred = None
            deferred.callback(data)
        else:
            self.buffer.append(data)
            if(self.producer is not None and self.streamingProducer
               and len(self.buffer) > self.bufferSize):
                self.producer.pauseProducing()
                self.producerPaused = True

    def finish(self, failure=None):
        """Called by producer when it is done.

        If the optional failure argument is passed a Failure instance,
        the stream will return it as errback on next Deferred.
        """
        self.closed = True
        if not self.buffer:
            self.length = 0
        if self.deferred is not None:
            deferred = self.deferred
            self.deferred = None
            if failure is not None:
                self.failed = True
                deferred.errback(failure)
            else:
                deferred.callback(None)
        else:
            if failure is not None:
                # Nobody is waiting: remember the failure for the next read.
                self.failed = True
                self.failure = failure

    def registerProducer(self, producer, streaming):
        """Attach a producer.

        Raises RuntimeError if a producer is already registered.
        """
        if self.producer is not None:
            raise RuntimeError("Cannot register producer %s, because producer %s was never unregistered." % (producer, self.producer))

        if self.closed:
            producer.stopProducing()
        else:
            self.producer = producer
            self.streamingProducer = streaming
            if not streaming:
                producer.resumeProducing()

    def unregisterProducer(self):
        self.producer = None

class StreamProducer(object):
    """A push producer which gets its data by reading a stream."""
    implements(ti_interfaces.IPushProducer)

    deferred = None
    finishedCallback = None
    paused = False
    consumer = None

    def __init__(self, stream, enforceStr=True):
        self.stream = stream
        self.enforceStr = enforceStr

    def beginProducing(self, consumer):
        """Register with consumer and start writing the stream to it.

        Returns a Deferred fired when the stream has been fully written or
        production was stopped.
        """
        if self.stream is None:
            return defer.succeed(None)

        self.consumer = consumer
        finishedCallback = self.finishedCallback = Deferred()
        self.consumer.registerProducer(self, True)
        self.resumeProducing()
        return finishedCallback

    def resumeProducing(self):
        """Read the next chunk unless a read is already outstanding."""
        self.paused = False
        if self.deferred is not None:
            return

        try:
            data = self.stream.read()
        except:
            self.stopProducing(Failure())
            return

        if isinstance(data, Deferred):
            self.deferred = data.addCallbacks(self._doWrite, self.stopProducing)
        else:
            self._doWrite(data)

    def _doWrite(self, data):
        """Write one chunk to the consumer, or finish up when data is None."""
        if self.consumer is None:
            return
        if data is None:
            # The end.
            if self.consumer is not None:
                self.consumer.unregisterProducer()
            if self.finishedCallback is not None:
                self.finishedCallback.callback(None)
            self.finishedCallback = self.deferred = self.consumer = self.stream = None
            return

        self.deferred = None
        if self.enforceStr:
            # XXX: sucks that we have to do this. make transport.write(buffer) work!
            data = str(buffer(data))
        self.consumer.write(data)

        if not self.paused:
            self.resumeProducing()

    def pauseProducing(self):
        self.paused = True

    def stopProducing(self, failure=ti_error.ConnectionLost()):
        """Stop writing: unregister, close the stream and fire the finished
        Deferred (errback with 'failure', or callback if it is None)."""
        if self.consumer is not None:
            self.consumer.unregisterProducer()
        if self.finishedCallback is not None:
            if failure is not None:
                self.finishedCallback.errback(failure)
            else:
                self.finishedCallback.callback(None)
            self.finishedCallback = None
        self.paused = True
        if self.stream is not None:
            self.stream.close()

        self.finishedCallback = self.deferred = self.consumer = self.stream = None

##############################
####    ProcessStreamer   ####
##############################

class _ProcessStreamerProtocol(protocol.ProcessProtocol):
    """Process protocol gluing stdin/stdout/stderr to three streams."""

    def __init__(self, inputStream, outStream, errStream):
        self.inputStream = inputStream
        self.outStream = outStream
        self.errStream = errStream
        self.resultDeferred = defer.Deferred()

    def connectionMade(self):
        p = StreamProducer(self.inputStream)
        # if the process stopped reading from the input stream,
        # this is not an error condition, so it oughtn't result
        # in a ConnectionLost() from the input stream:
        p.stopProducing = lambda err=None: StreamProducer.stopProducing(p, err)

        d = p.beginProducing(self.transport)
        d.addCallbacks(lambda _: self.transport.closeStdin(),
                       self._inputError)

    def _inputError(self, f):
        log.msg("Error in input stream for %r" % self.transport)
        log.err(f)
        self.transport.closeStdin()

    def outReceived(self, data):
        self.outStream.write(data)

    def errReceived(self, data):
        self.errStream.write(data)

    def outConnectionLost(self):
        self.outStream.finish()

    def errConnectionLost(self):
        self.errStream.finish()

    def processEnded(self, reason):
        # Always delivered as an errback; ProcessStreamer.run filters out
        # the clean-exit case.
        self.resultDeferred.errback(reason)
        del self.resultDeferred


class ProcessStreamer(object):
    """Runs a process hooked up to streams.

    Requires an input stream, has attributes 'outStream' and 'errStream'
    for stdout and stderr.

    outStream and errStream are public attributes providing streams
    for stdout and stderr of the process.
    """

    def __init__(self, inputStream, program, args, env={}):
        self.outStream = ProducerStream()
        self.errStream = ProducerStream()
        self._protocol = _ProcessStreamerProtocol(IByteStream(inputStream), self.outStream, self.errStream)
        self._program = program
        self._args = args
        self._env = env

    def run(self):
        """Run the process.

        Returns Deferred which will eventually have errback for non-clean (exit code > 0)
        exit, with ProcessTerminated, or callback with None on exit code 0.
        """
        def _ignoreCleanExit(f):
            # Failure.trap returns the matched exception class; swallow it
            # so the Deferred's result is None as documented.
            f.trap(ti_error.ProcessDone)
            return None

        # XXX what happens if spawn fails?
        reactor.spawnProcess(self._protocol, self._program, self._args, env=self._env)
        del self._env
        return self._protocol.resultDeferred.addErrback(_ignoreCleanExit)

##############################
####   generatorToStream  ####
##############################

class _StreamIterator(object):
    """Iterator handed to the generator; yields whatever value the wrapping
    _IteratorStream last stored on it."""
    done=False

    def __iter__(self):
        return self
    def next(self):
        if self.done:
            raise StopIteration
        return self.value
    # Magic value the generator yields to request more input.
    wait=object()

class _IteratorStream(object):
    """Stream whose reads are the values yielded by a generator function."""
    length = None

    def __init__(self, fun, stream, args, kwargs):
        self._stream=stream
        self._streamIterator = _StreamIterator()
        self._gen = fun(self._streamIterator, *args, **kwargs)

    def read(self):
        """Return the generator's next value, fetching input when it yields
        the 'wait' marker; None when the generator is finished."""
        try:
            val = self._gen.next()
        except StopIteration:
            return None
        else:
            if val is _StreamIterator.wait:
                newdata = self._stream.read()
                if isinstance(newdata, defer.Deferred):
                    return newdata.addCallback(self._gotRead)
                else:
                    return self._gotRead(newdata)
            return val

    def _gotRead(self, data):
        """Store freshly read input on the iterator and resume reading."""
        if data is None:
            self._streamIterator.done=True
        else:
            self._streamIterator.value=data
        return self.read()

    def close(self):
        self._stream.close()
        del self._gen, self._stream, self._streamIterator

    def split(self, point):
        """Split at 'point' using the generic fallback splitter."""
        return fallbackSplit(self, point)

def generatorToStream(fun):
    """Converts a generator function into a stream.

    The function should take an iterator as its first argument,
    which will be converted *from* a stream by this wrapper, and
    yield items which are turned *into* the results from the
    stream's 'read' call.

    One important point: before every call to input.next(), you
    *MUST* do a "yield input.wait" first. Yielding this magic value
    takes care of ensuring that the input is not a deferred before
    you see it.

    >>> from twisted.web2 import stream
    >>> from string import maketrans
    >>> alphabet = 'abcdefghijklmnopqrstuvwxyz'
    >>>
    >>> def encrypt(input, key):
    ...     code = alphabet[key:] + alphabet[:key]
    ...     translator = maketrans(alphabet+alphabet.upper(), code+code.upper())
    ...     yield input.wait
    ...     for s in input:
    ...         yield str(s).translate(translator)
    ...         yield input.wait
    ...
    >>> encrypt = stream.generatorToStream(encrypt)
    >>>
    >>> plaintextStream = stream.MemoryStream('SampleSampleSample')
    >>> encryptedStream = encrypt(plaintextStream, 13)
    >>> encryptedStream.read()
    'FnzcyrFnzcyrFnzcyr'
    >>>
    >>> plaintextStream = stream.MemoryStream('SampleSampleSample')
    >>> encryptedStream = encrypt(plaintextStream, 13)
    >>> evenMoreEncryptedStream = encrypt(encryptedStream, 13)
    >>> evenMoreEncryptedStream.read()
    'SampleSampleSample'

    """
    def generatorToStream_inner(stream, *args, **kwargs):
        return _IteratorStream(fun, stream, args, kwargs)
    return generatorToStream_inner


##############################
####    BufferedStream    ####
##############################

class BufferedStream(object):
    """A stream which buffers its data to provide operations like
    readline and readExactly."""

    data = ""
    def __init__(self, stream):
        self.stream = stream

    def _readUntil(self, f):
        """Internal helper function which repeatedly calls f each time
        after more data has been received, until it returns non-None."""
        while True:
            r = f()
            if r is not None:
                yield r; return

            newdata = self.stream.read()
            if isinstance(newdata, defer.Deferred):
                newdata = defer.waitForDeferred(newdata)
                yield newdata; newdata = newdata.getResult()

            if newdata is None:
                # End Of File
                newdata = self.data
                self.data = ''
                yield newdata; return
            self.data += str(newdata)
    _readUntil = defer.deferredGenerator(_readUntil)

    def readExactly(self, size=None):
        """Read exactly size bytes of data, or, if size is None, read
        the entire stream into a string.

        Raises ValueError if size is negative.
        """
        if size is not None and size < 0:
            raise ValueError("readExactly: size cannot be negative: %s" % (size, ))

        def gotdata():
            data = self.data
            if size is not None and len(data) >= size:
                pre,post = data[:size], data[size:]
                self.data = post
                return pre
        return self._readUntil(gotdata)


    def readline(self, delimiter='\r\n', size=None):
        """
        Read a line of data from the string, bounded by
        delimiter. The delimiter is included in the return value.

        If size is specified, read and return at most that many bytes,
        even if the delimiter has not yet been reached. If the size
        limit falls within a delimiter, the rest of the delimiter, and
        the next line will be returned together.

        Raises ValueError if size is negative.
        """
        if size is not None and size < 0:
            raise ValueError("readline: size cannot be negative: %s" % (size, ))

        def gotdata():
            data = self.data
            if size is not None:
                splitpoint = data.find(delimiter, 0, size)
                if splitpoint == -1:
                    if len(data) >= size:
                        splitpoint = size
                else:
                    splitpoint += len(delimiter)
            else:
                splitpoint = data.find(delimiter)
                if splitpoint != -1:
                    splitpoint += len(delimiter)

            if splitpoint != -1:
                pre = data[:splitpoint]
                self.data = data[splitpoint:]
                return pre
        return self._readUntil(gotdata)

    def pushback(self, pushed):
        """Push data back into the buffer."""

        self.data = pushed + self.data

    def read(self):
        """Return buffered data if any, otherwise read from the stream."""
        data = self.data
        if data:
            self.data = ""
            return data
        return self.stream.read()

    def _len(self):
        l = self.stream.length
        if l is None:
            return None
        return l + len(self.data)

    length = property(_len)

    def split(self, offset):
        """Split at 'offset' (counted from the start of the buffered data);
        returns two new BufferedStreams (pre, post)."""
        off = offset - len(self.data)

        pre, post = self.stream.split(max(0, off))
        pre = BufferedStream(pre)
        post = BufferedStream(post)
        if off < 0:
            # The split point lies inside the buffered data: cut the buffer
            # at 'offset' itself.
            pre.data = self.data[:offset]
            post.data = self.data[offset:]
        else:
            pre.data = self.data

        return pre, post


def substream(stream, start, end):
    """Return the part of 'stream' between byte offsets start and end.

    Raises ValueError if start > end.
    """
    if start > end:
        raise ValueError("start position must be less than end position %r"
                         % ((start, end),))
    stream = stream.split(start)[1]
    return stream.split(end - start)[0]



__all__ = ['IStream', 'IByteStream', 'FileStream', 'MemoryStream', 'CompoundStream',
           'readAndDiscard', 'fallbackSplit', 'ProducerStream', 'StreamProducer',
           'BufferedStream', 'readStream', 'ProcessStreamer', 'readIntoFile',
           'generatorToStream']