----------------------------------------------------------------------------- -- -- Onions Network Streams Library -- -- O N I O N S . I N S T R E A M S -- -- B o d y -- -- Copyright (C) 1997-1998 Regents of the University of California -- -- Onions is free software; you can redistribute it and/or modify it under -- the terms of the GNU General Public License as published by the Free -- Software Foundation, with or without the single exception listed below; -- either version 2, or (at your option) any later version. Onions is -- distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; -- without even the implied warranty of MERCHANTABILITY or FITNESS FOR A -- PARTICULAR PURPOSE. See the GNU General Public License for more details. -- You should have received a copy of the GNU General Public License -- distributed with Onions; see the file COPYING. If not, write to the -- Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA -- 02111-1307, USA. -- -- As a special exception, if other files instantiate generics from this -- library, or you link this library with other files to produce an -- executable, this library does not by itself cause the resulting -- executable to be covered by the GNU General Public License. This -- exception does not however invalidate any other reasons why the -- executable file might be covered by the GNU General Public License. -- -- Created in 1997 by Roy T. Fielding ----------------------------------------------------------------------------- -- -- The Onions Input Streams class is described in onions-instreams.ads. -- -- This package defines an Input Stream with a null filter. -- The child packages File, Dir, and Channel implement streams for reading -- from files, directories, and TCP/IP sockets, respectively. -- Additional child packages can implement a filter simply by -- overriding the Process procedure for the derived stream object -- and duplicating the dispatched routines that call Process. 
--
-- Each stream object supports dual interfaces: the Ada.Streams interface
-- that uses a caller-provided Stream_Element_Array to pass data, and our
-- own Bucket interface that passes the data as a list of dynamically
-- allocated Iovec structures.  The Ada.Streams interface requires that
-- the data be copied every time it is filtered or placed in a buffer.
-- The Bucket interface minimizes copying and corresponds nicely to the
-- more efficient C_readv system call.
--
with Ada.Streams;              use Ada.Streams;
with Interfaces.C;
with System.Storage_Elements;
with Onions.Buckets;           use Onions.Buckets;
with Onions.Constants;         use Onions.Constants;
with Onions.OS;
with Onions.Thin;
with Unchecked_Conversion;
with Unchecked_Deallocation;

package body Onions.Instreams is

   use Onions.Buckets.Bucket_Queues;

   -- Reinterprets a classwide pipe pointer as a pointer to the root
   -- stream type so that it can be handed to Free.
   function Pipe_To_Stream_Ptr is
      new Unchecked_Conversion (Input_Pipe, Input_Stream_Ptr);

   ----------------------------------------------------
   -- Input Pipe Classwide Manipulation Operations  --
   ----------------------------------------------------

   -- Close and free a stream pipe.  A null Pipe is left untouched;
   -- otherwise Pipe is null on return.
   --
   procedure Close (Pipe : in out Input_Pipe) is
      Doomed : Input_Stream_Ptr := Pipe_To_Stream_Ptr (Pipe);
   begin
      if Pipe = null then
         return;
      end if;

      Close (Pipe.all);   -- dispatch to do object-specific close
      Free (Doomed);
      Pipe := null;
   end Close;

   -- Abort_Stream should only be used if a stream is interrupted
   -- by the user, or an error occurs that makes the whole stream bad.
   -- It forces the stream closed and discards any buffered data.
   -- A null Pipe is left untouched; otherwise Pipe is null on return.
   --
   procedure Abort_Stream (Pipe : in out Input_Pipe) is
      Doomed : Input_Stream_Ptr := Pipe_To_Stream_Ptr (Pipe);
   begin
      if Pipe = null then
         return;
      end if;

      Abort_Stream (Pipe.all);   -- dispatch to do object-specific abort
      Free (Doomed);
      Pipe := null;
   end Abort_Stream;

   -- Reset is like Close, but resets the stream to the
   -- state it would be in if it was just created.
--
   procedure Reset (Pipe : in Input_Pipe) is
   begin
      -- Dispatch so the head object performs its own reset.
      Reset (Pipe.all);
   end Reset;

   -- Drain a stream pipe's read buffers.
   --
   procedure Drain (Pipe : in Input_Pipe; ItemList : out Bucket_List) is
   begin
      -- Dispatch so the head object performs its own drain.
      Drain (Pipe.all, ItemList);
   end Drain;

   -- Push places the old Pipe outbound of Head and sets Pipe := Head.
   -- When there is an old Pipe, it is attached after the last object
   -- already chained outbound of Head and is given Head's timeout.
   --
   procedure Push (Pipe : in out Input_Pipe; Head : in Input_Pipe) is
      Chain_End : Input_Pipe := Head;
   begin
      if Pipe /= null then
         -- Walk to the most outbound object already linked to Head.
         loop
            exit when Chain_End.Outbound = null;
            Chain_End := Chain_End.Outbound;
         end loop;

         Chain_End.Outbound := Pipe;
         Set_Timeout (Pipe, Head.Timeout);
      end if;

      Pipe := Head;
   end Push;

   -- Pop unreads anything in the current Pipe head's buffers,
   -- moves that object to Head, sets Pipe to whatever is outbound of Head,
   -- and then disconnects Head from that outbound stream.
   --
   procedure Pop (Pipe : in out Input_Pipe; Head : out Input_Pipe) is
      Popped : constant Input_Pipe := Pipe;
   begin
      Unprocess (Popped.all);
      Head            := Popped;
      Pipe            := Popped.Outbound;
      Popped.Outbound := null;
   end Pop;

   --------------------------------------------
   -- Input Pipe Classwide Input Operations  --
   --------------------------------------------

   -- Read obtains a list of buckets from a Stream Pipe,
   -- usually as many as can be provided without blocking.
   -- Since the interface does not require data copying, it is fast.
   --
   procedure Read (Pipe : in Input_Pipe; ItemList : out Bucket_List) is
   begin
      -- Keep processing until the head has something to hand over.
      loop
         exit when Length (Pipe.Processed) /= 0;
         Process (Pipe.all, False);
      end loop;

      Dequeue (Pipe.Processed, ItemList);
   end Read;

   -- Read_Line blocks until the stream head can return a complete line
   -- of input.  A line is defined as the zero or more characters before
   -- the next End_of_Line (a CRLF sequence, bare LF, or bare CR) or
   -- End_of_File.  The End_of_Line character(s) are discarded.
--
   procedure Read_Line (Pipe : in Input_Pipe; ItemList : out Bucket_List) is
      Pending   : Bucket_List;
      Found_EOL : Boolean := False;
   begin
      ItemList := null;

      loop
         -- Make sure the head has processed data to look at.
         loop
            exit when Length (Pipe.Processed) /= 0;
            Process (Pipe.all, False);
         end loop;

         Dequeue (Pipe.Processed, Pending);
         Dump_Line (Pending, ItemList, Found_EOL);

         -- Whatever Dump_Line left in Pending goes back on the queue.
         if not IsEmpty (Pending) then
            Undequeue (Pipe.Processed, Pending);
         end if;

         exit when Found_EOL;
         Process (Pipe.all, False);
      end loop;

      Chop (ItemList);

   exception
      when End_Error =>
         -- End of stream with a partial line collected: return it.
         -- With nothing collected, the end of stream is reported.
         if IsEmpty (ItemList) then
            raise End_Error;
         end if;
   end Read_Line;

   -- Unread allows the caller to return unprocessed buckets to this
   -- object's processed queue buffer, usually so that the caller
   -- can be replaced by some other processing routine that will do
   -- its own read for that data.
   --
   procedure UnRead (Pipe : in Input_Pipe; ItemList : in Bucket_List) is
   begin
      Undequeue (Pipe.Processed, ItemList);
   end UnRead;

   -- Same as above, for data held in a Stream_Element_Array: the data
   -- is wrapped by New_Bucket (Item, Last) before being put back.
   --
   procedure UnRead (Pipe : in Input_Pipe;
                     Item : in Stream_Element_Array;
                     Last : in Stream_Element_Offset) is
   begin
      Undequeue (Pipe.Processed, New_Bucket (Item, Last));
   end UnRead;

   ---------------------------------------------
   -- Input Pipe Classwide Status Operations  --
   ---------------------------------------------

   -- Set_Timeout places a limit on the amount of time in milliseconds
   -- any atomic stream operation is allowed to block.  This limit applies
   -- to the entire stream pipe, but is only likely to be used by the most
   -- outbound stream object.  A value of 0 means never timeout.  The call
   -- is propagated to the Outbound stream object.  Get_Timeout retrieves
   -- the current stream timeout value in milliseconds.
--
   procedure Set_Timeout (Pipe : in Input_Pipe; Millisec : in Natural) is
      Node : Input_Pipe := Pipe;
   begin
      -- Stamp the head and then every object outbound of it.
      loop
         Node.Timeout := Millisec;
         Node := Node.Outbound;
         exit when Node = null;
      end loop;
   end Set_Timeout;

   function Get_Timeout (Pipe : in Input_Pipe) return Natural is
   begin
      return Pipe.Timeout;
   end Get_Timeout;

   -- Get_Error can be called after an error exception has been raised
   -- to get the C errno or error string associated with the original error.
   -- The value is taken from the most outbound stream object.
   --
   function Get_Error (Pipe : in Input_Pipe) return C.int is
      Node : Input_Pipe := Pipe;
   begin
      while Node.Outbound /= null loop
         Node := Node.Outbound;
      end loop;

      return Node.System_Error;
   end Get_Error;

   function Get_Error (Pipe : in Input_Pipe) return String is
      Errno : constant C.int := Get_Error (Pipe);
   begin
      return C.Strings.Value (Onions.Thin.C_strerror (Errno));
   end Get_Error;

   -- Bytes returns a count of the stream pipe's outbound interface,
   -- usually for diagnostic purposes.  The count is taken from the
   -- most outbound stream object.
   --
   function Bytes (Pipe : in Input_Pipe) return Natural is
      Node : Input_Pipe := Pipe;
   begin
      while Node.Outbound /= null loop
         Node := Node.Outbound;
      end loop;

      return Natural (Node.Byte_Count);
   end Bytes;

   -- Name returns a string containing a meaningful name for this pipe,
   -- usually obtained from the most outbound stream object (because that's
   -- where things like file, directory, or host names are stored).
   --
   function Name (Pipe : in Input_Pipe) return String is
   begin
      -- Dispatch so the head object decides how to name itself.
      return Name (Pipe.all);
   end Name;

   --------------------------------------------
   -- Dispatching Stream Control Operations  --
   --------------------------------------------

   procedure Dispose is
      new Unchecked_Deallocation (Input_Stream, Input_Stream_Ptr);

   -- Free the storage associated with a stream object.
--
   procedure Free (SP : in out Input_Stream_Ptr) is
      Leftover : Bucket_List;
   begin
      if SP = null then
         return;
      end if;

      -- Release anything still sitting in either queue first.
      if Length (SP.Unprocessed) > 0 then
         Dequeue (SP.Unprocessed, Leftover);
         Free (Leftover);
      end if;

      if Length (SP.Processed) > 0 then
         Dequeue (SP.Processed, Leftover);
         Free (Leftover);
      end if;

      Dispose (SP);
   end Free;

   -- Close a stream object and propagate the close upstream.
   --
   procedure Close (Stream : in out Input_Stream) is
   begin
      -- Classwide Close on the outbound pipe.
      Close (Stream.Outbound);
   end Close;

   -- Abort_Stream should only be used if a stream is interrupted
   -- by the user, or an error occurs that makes the whole stream bad.
   -- It forces the stream closed without a flush.
   --
   procedure Abort_Stream (Stream : in out Input_Stream) is
      Leftover : Bucket_List;
   begin
      if Length (Stream.Unprocessed) > 0 then
         Dequeue (Stream.Unprocessed, Leftover);
         Free (Leftover);
      end if;

      -- Classwide Abort_Stream on the outbound pipe.
      Abort_Stream (Stream.Outbound);
   end Abort_Stream;

   -- Reset is like Close, but resets the stream to the state
   -- it would be in if it was just created.  It discards
   -- anything in its own buffers.
   --
   procedure Reset (Stream : in out Input_Stream) is
      Leftover : Bucket_List;
   begin
      if Length (Stream.Unprocessed) > 0 then
         Dequeue (Stream.Unprocessed, Leftover);
         Free (Leftover);
      end if;

      if Length (Stream.Processed) > 0 then
         Dequeue (Stream.Processed, Leftover);
         Free (Leftover);
      end if;

      Stream.System_Error := 0;
      Stream.Timeout      := 0;
      Stream.Byte_Count   := 0;

      -- Pass the reset on to whatever is outbound of this object.
      if Stream.Outbound /= null then
         Reset (Stream.Outbound);
      end if;
   end Reset;

   -- Drain the stream by reading once from outbound and processing any
   -- unprocessed data as if it were the end-of-stream.
--
   procedure Drain (Stream   : in out Input_Stream;
                    ItemList : out Bucket_List) is
   begin
      if Stream.Outbound /= null then
         Drain (Stream.Outbound, ItemList);
         Enqueue (Stream.Unprocessed, ItemList);
      end if;

      Process (Stream, True);

      -- Every other Dequeue in this package is guarded by a Length test;
      -- do the same here and hand back an empty list when the drain
      -- produced nothing.
      if Length (Stream.Processed) > 0 then
         Dequeue (Stream.Processed, ItemList);
      else
         ItemList := null;
      end if;
   end Drain;

   -- Name returns a string containing a meaningful name for this stream,
   -- usually obtained from the most outbound stream object (because that's
   -- where things like file, directory, or host names are stored).
   -- An object with nothing outbound of it reports the empty string.
   --
   function Name (Stream : in Input_Stream) return String is
   begin
      if Stream.Outbound = null then
         return "";   -- The parent type has no name
      end if;

      return Name (Stream.Outbound);
   end Name;

   -----------------------------------------
   -- Ada.Streams Dispatching Operations  --
   -----------------------------------------

   -- Read obtains stream elements from Stream and places them into
   -- the components of Item until either each component is filled or
   -- no elements remain on the stream.  Last is set to the index of
   -- the last component of Item that was filled.  This interface is
   -- defined by Ada.Streams for abstract stream operations.  We won't
   -- use it much because it forces a full data copy when filtering.
   --
   -- A zero-length Item returns at once with Last = Item'First - 1.
   -- Raises End_Error if the end of the stream is reached before any
   -- component of a non-empty Item has been filled.
   --
   procedure Read (Stream : in out Input_Stream;
                   Item   : out Stream_Element_Array;
                   Last   : out Stream_Element_Offset) is
      ReadList : Bucket_List;
   begin
      Last := Item'First - 1;

      -- Nothing was asked for, so do not wait for data and do not report
      -- end-of-stream.  Without this test a null range whose 'Last is
      -- not 'First - 1 could never satisfy the exit condition below.
      if Item'Length = 0 then
         return;
      end if;

      loop
         while Length (Stream.Processed) = 0 loop
            Process (Stream, False);
         end loop;

         Dequeue (Stream.Processed, ReadList);
         Dump_Into (ReadList, Item, Last);

         -- Whatever did not go into Item is put back on the queue.
         if not IsEmpty (ReadList) then
            Undequeue (Stream.Processed, ReadList);
         end if;

         exit when Last = Item'Last;
      end loop;

   exception
      when End_Error =>
         -- A partial fill is returned normally; only a completely
         -- empty result is reported as end-of-stream.
         if Last < Item'First then
            raise End_Error;
         end if;
   end Read;

   -- Write is defined by Ada.Streams for abstract stream operations.
   -- Raises Mode_Error for an Input Stream.
--
   procedure Write (Stream : in out Input_Stream;
                    Item   : in Stream_Element_Array) is
   begin
      -- An input stream cannot be written to.
      raise Mode_Error;
   end Write;

   ---------------------------------
   -- Data Processing Operations  --
   ---------------------------------

   -- Process does the magic necessary to read from the upstream object
   -- and move the data from the Unprocessed read queue to the Processed
   -- read queue.  In this case, there is no magic.  If Everything, then
   -- process the entire Unprocessed buffer as if it were the end-of-stream.
   -- Raises End_Error if end-of-stream is encountered and nothing has been
   -- processed, or Status_Error if the outbound stream is not connected.
   --
   procedure Process (Stream : in out Input_Stream; Everything : Boolean) is
      EOS      : Boolean := Everything;
      ItemList : Bucket_List;
   begin
      if not EOS then
         if Stream.Outbound = null then
            raise Status_Error;
         end if;

         begin
            Read (Stream.Outbound, ItemList);
            -- Only the outermost object should update Byte_Count here.
            Enqueue (Stream.Unprocessed, ItemList);
         exception
            when End_Error =>
               -- End of stream is only an error when there is nothing
               -- left over to process; otherwise finish what we have.
               if Length (Stream.Unprocessed) = 0 then
                  raise End_Error;
               end if;
               EOS := True;
         end;
      end if;

      if Length (Stream.Unprocessed) > 0 then
         Dequeue (Stream.Unprocessed, ItemList);
         -- This is where ItemList would be filtered (entirely if EOS)
         Enqueue (Stream.Processed, ItemList);
      end if;
   end Process;

   -- Unprocess undoes the magic of Process and UnReads the data to
   -- the upstream object.
--
   procedure Unprocess (Stream : in out Input_Stream) is
      ItemList : Bucket_List;
   begin
      -- First move processed data back in front of the unprocessed data.
      if Length (Stream.Processed) > 0 then
         Dequeue (Stream.Processed, ItemList);
         -- A filter would try to unfilter ItemList here (if possible)
         Undequeue (Stream.Unprocessed, ItemList);
      end if;

      -- Then hand everything back to the outbound object, or discard it
      -- when there is no outbound object to take it.
      if Length (Stream.Unprocessed) > 0 then
         Dequeue (Stream.Unprocessed, ItemList);
         if Stream.Outbound /= null then
            UnRead (Stream.Outbound, ItemList);
         else
            Free (ItemList);
         end if;
      end if;
   end Unprocess;

   -----------------------
   -- Utility Routines  --
   -----------------------

   use Onions.Thin;
   use System.Storage_Elements;
   use type C.int;

   -- Read_Vector will perform a system readv on an open descriptor,
   -- using the available read buffer list of length Buffer_Num,
   -- put the filled buckets into Filled_List, and tell us what
   -- Filled_Amt was read.  If the descriptor is not ready for reading,
   -- it will wait up to Timeout milliseconds for data to arrive,
   -- or forever if Timeout = 0.  If Filled_Amt = 0, then End-of-File
   -- has been reached; Filled_List is then null and Buffer_List is
   -- left as it was.
   --
   -- At most IOV_MAX buffers are offered to a single readv call.
   --
   -- Raises Timeout_Exceeded if we have to wait longer than Timeout;
   --        Device_Error if anything else goes fatally wrong;
   --        Constraint_Error if there is no buffer to read into
   --        (Buffer_Num = 0 or Buffer_List = null).
   --
   procedure Read_Vector
     (Filedes     : in     Descriptor;
      Timeout     : in     Natural;
      Buffer_List : in out Bucket_List;
      Buffer_Num  : in     Natural;
      Filled_List :    out Bucket_List;
      Filled_Amt  :    out System.Storage_Elements.Storage_Offset)
   is
      Io_Last   : constant C.int :=
                    C.int'Min (IOV_MAX, C.int (Buffer_Num)) - 1;
      Io_Vector : Iov_Array (0 .. Io_Last);
      Iov       : Iovec_Access;
      Iovcnt    : C.int := 0;
      Node      : Bucket_List := Buffer_List;
      Status    : C.int;
   begin
      -- Give the out parameter a defined value on every normal return,
      -- including the End-of-File return below.
      Filled_List := null;

      -- Set up the I/O vector to reference our list of buckets
      --
      while Node /= null and then Iovcnt <= Io_Last loop
         Iov := Io_Vector (Iovcnt)'Unchecked_Access;
         To_Iovec (Node.Data, Iov);
         Iovcnt := Iovcnt + 1;
         Node   := Node.Next;
      end loop;

      -- With no buffers there is nothing to read into.  An empty vector
      -- already failed with Constraint_Error when its first element was
      -- referenced; raise it explicitly so that a null Buffer_List is
      -- rejected the same way instead of reaching readv with a count of
      -- zero, whose result could be mistaken for End-of-File.
      if Iovcnt = 0 then
         raise Constraint_Error;
      end if;

      Iov := Io_Vector (Io_Vector'First)'Unchecked_Access;

      -- Loop until we get a successful read, eof, or fatal error
      --
      loop
         Filled_Amt := C_readv (Filedes, Iov, Iovcnt);

         if Filled_Amt > 0 then        -- Success
            exit;

         elsif Filled_Amt = 0 then     -- End-of-File
            return;

         else                          -- Filled_Amt < 0: consult errno
            case OS.C_errno is
               when EINTR =>
                  null;                -- interrupted: just try again

               when EAGAIN =>
                  Status := OS.Wait_For_Fd (OS.Fd_Readable, Filedes, Timeout);
                  if Status = -1 then
                     raise Device_Error;
                  elsif Status = 0 then
                     raise Timeout_Exceeded;
                  end if;

               when others =>
                  raise Device_Error;
            end case;
         end if;
      end loop;

      -- Test for partial read and partition into Filled_List
      --
      Filled_List := Buffer_List;
      Trim (Filled_Amt, Filled_List, Buffer_List);
   end Read_Vector;

end Onions.Instreams;