----------------------------------------------------------------------------- -- -- Onions Network Streams Library -- -- O N I O N S . O U T S T R E A M S -- -- B o d y -- -- Copyright (C) 1997-1998 Regents of the University of California -- -- Onions is free software; you can redistribute it and/or modify it under -- the terms of the GNU General Public License as published by the Free -- Software Foundation, with or without the single exception listed below; -- either version 2, or (at your option) any later version. Onions is -- distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; -- without even the implied warranty of MERCHANTABILITY or FITNESS FOR A -- PARTICULAR PURPOSE. See the GNU General Public License for more details. -- You should have received a copy of the GNU General Public License -- distributed with Onions; see the file COPYING. If not, write to the -- Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA -- 02111-1307, USA. -- -- As a special exception, if other files instantiate generics from this -- library, or you link this library with other files to produce an -- executable, this library does not by itself cause the resulting -- executable to be covered by the GNU General Public License. This -- exception does not however invalidate any other reasons why the -- executable file might be covered by the GNU General Public License. -- -- Created in 1997 by Roy T. Fielding ----------------------------------------------------------------------------- -- -- The Onions Output Streams class is described in onions-outstreams.ads. -- -- This package defines an Output Stream with a null filter. -- The child packages File and Channel implement streams for writing to -- files and TCP/IP sockets, respectively. -- Additional child packages can implement a filter simply by -- overriding the Process procedure for the derived stream object -- and duplicating the dispatched routines that call Process. -- -- Each stream object supports dual interfaces: the Ada.Streams interface -- that uses a caller-provided Stream_Element_Array to pass data, and our -- own Bucket interface that passes the data as a list of dynamically -- allocated Iovec structures. The Ada.Streams interface requires that -- the data be copied every time it is filtered or placed in a buffer. -- The Bucket interface minimizes copying and corresponds nicely to the -- more efficient C_writev system call. -- with Ada.Streams; with Interfaces.C; with System.Storage_Elements; with Onions.Buckets; use Onions.Buckets; with Onions.Constants; use Onions.Constants; with Onions.OS; with Onions.Thin; with Unchecked_Conversion; with Unchecked_Deallocation; package body Onions.Outstreams is use Onions.Buckets.Bucket_Queues; function Pipe_To_Stream_Ptr is new Unchecked_Conversion (Output_Pipe, Output_Stream_Ptr); ----------------------------------------------------- -- Output Pipe Classwide Manipulation Operations -- ----------------------------------------------------- -- Close and free a stream pipe after flushing its write buffers. -- procedure Close (Pipe : in out Output_Pipe) is Head : Output_Stream_Ptr := Pipe_To_Stream_Ptr (Pipe); begin if Pipe /= null then Close (Pipe.all); -- dispatch to do object-specific close Pipe := null; Free (Head); end if; end Close; -- Abort_Stream should only be used if a stream is interrupted -- by the user, or an error occurs that makes the whole stream bad. -- It forces the stream closed without a flush. -- procedure Abort_Stream (Pipe : in out Output_Pipe) is Head : Output_Stream_Ptr := Pipe_To_Stream_Ptr (Pipe); begin if Pipe /= null then Abort_Stream (Pipe.all); -- dispatch to do object-specific abort Pipe := null; Free (Head); end if; end Abort_Stream; -- Reset is like Close, but resets the stream to the -- state it would be in if it was just created. -- procedure Reset (Pipe : in Output_Pipe) is begin Reset (Pipe.all); -- dispatch to do object-specific reset end Reset; -- Flush a stream pipe's write buffers. -- procedure Flush (Pipe : in Output_Pipe) is begin Flush (Pipe.all); -- dispatch to do object-specific flush end Flush; -- Push places the old Pipe outbound of Head and sets Pipe := Head -- procedure Push (Pipe : in out Output_Pipe; Head : in Output_Pipe) is Tail : Output_Pipe := Head; begin if Pipe /= null then while Tail.Outbound /= null loop Tail := Tail.Outbound; end loop; Tail.Outbound := Pipe; Set_Timeout (Pipe, Head.Timeout); end if; Pipe := Head; end Push; -- Pop flushes anything in the current Pipe head's buffers, -- moves that object to Head, sets Pipe to whatever is outbound of Head, -- and then disconnects Head from that outbound stream. -- procedure Pop (Pipe : in out Output_Pipe; Head : out Output_Pipe) is begin Flush (Pipe.all); Head := Pipe; Pipe := Pipe.Outbound; Head.Outbound := null; end Pop; ----------------------------------------------- -- Output Pipe Classwide Output Operations -- ----------------------------------------------- -- Write places a list of buckets onto Stream in order. -- This corresponds nicely with C_writev. -- procedure Write (Pipe : in Output_Pipe; ItemList : in Bucket_List) is begin Enqueue (Pipe.Unprocessed, ItemList); Process (Pipe.all, False); end Write; procedure Write (Pipe : in Output_Pipe; Item : in Bucket) is begin Enqueue (Pipe.Unprocessed, Item); Process (Pipe.all, False); end Write; procedure Write (Pipe : in Output_Pipe; Item : in String) is begin Write (Pipe, New_Bucket (Item)); end Write; ----------------------------------------------- -- Output Pipe Classwide Status Operations -- ----------------------------------------------- -- Set_Timeout places a limit on the amount of time in milliseconds -- any atomic stream operation is allowed to block. This limit applies -- to the entire stream pipe, but is only likely to be used by the most -- outbound stream object. A value of 0 means never timeout. The call -- is propagated to the Outbound stream object. Get_Timeout retrieves -- the current stream timeout value in milliseconds. -- procedure Set_Timeout (Pipe : in Output_Pipe; Millisec : in Natural) is begin Pipe.Timeout := Millisec; if Pipe.Outbound /= null then Set_Timeout (Pipe.Outbound, Millisec); end if; end Set_Timeout; function Get_Timeout (Pipe : in Output_Pipe) return Natural is begin return Pipe.Timeout; end Get_Timeout; -- Get_Error can be called after an error exception has been raised -- to get the C errno or error string associated with the original error. -- The call is propagated to the Outbound stream object. -- function Get_Error (Pipe : in Output_Pipe) return C.int is begin if Pipe.Outbound = null then return Pipe.System_Error; else return Get_Error (Pipe.Outbound); end if; end Get_Error; function Get_Error (Pipe : in Output_Pipe) return String is begin return C.Strings.Value (Onions.Thin.C_strerror (Get_Error (Pipe))); end Get_Error; -- Bytes returns a count of the stream pipe's outbound interface, -- usually for diagnostic purposes. -- function Bytes (Pipe : in Output_Pipe) return Natural is begin if Pipe.Outbound = null then return Natural (Pipe.Byte_Count); else return Bytes (Pipe.Outbound); end if; end Bytes; -- Get a stream pipe's write buffer size for max blocks. -- function Get_Max_Buffer_Blocksize (Pipe : in Output_Pipe) return Natural is begin return Get_Max_Buffer_Blocksize (Pipe.all); end Get_Max_Buffer_Blocksize; -- Set a stream pipe's write buffer size for max blocks. -- A Num_Blocks of 0 will set it to unbuffered. -- procedure Set_Max_Buffer_Blocksize (Pipe : in Output_Pipe; Num_Blocks : in Natural) is begin Set_Max_Buffer_Blocksize (Pipe.all, Num_Blocks); end Set_Max_Buffer_Blocksize; -- Name returns a string containing a meaningful name for this pipe, -- usually obtained from the most outbound stream object (because that's -- where things like file, directory, or host names are stored). -- function Name (Pipe : in Output_Pipe) return String is begin return Name (Pipe.all); end Name; --------------------------------------------- -- Dispatching Stream Control Operations -- --------------------------------------------- procedure Dispose is new Unchecked_Deallocation (Output_Stream, Output_Stream_Ptr); -- Free the storage associated with a stream object. -- procedure Free (SP : in out Output_Stream_Ptr) is ItemList : Bucket_List; begin if SP /= null then if Length (SP.Unprocessed) > 0 then Dequeue (SP.Unprocessed, ItemList); Free (ItemList); end if; Dispose (SP); end if; end Free; -- Close a stream object (flushes the buffers) and -- propagate the close downstream. -- procedure Close (Stream : in out Output_Stream) is begin Flush (Stream); Close (Stream.Outbound); end Close; -- Abort_Stream should only be used if a stream is interrupted -- by the user, or an error occurs that makes the whole stream bad. -- It forces the stream closed without a flush. -- procedure Abort_Stream (Stream : in out Output_Stream) is ItemList : Bucket_List; begin if Length (Stream.Unprocessed) > 0 then Dequeue (Stream.Unprocessed, ItemList); Free (ItemList); end if; Abort_Stream (Stream.Outbound); end Abort_Stream; -- Reset is like Close, but resets the stream to the state -- it would be in if it was just created. It flushes -- anything in its own buffers. -- procedure Reset (Stream : in out Output_Stream) is begin Flush (Stream); Stream.Byte_Count := 0; Stream.Timeout := 0; Stream.System_Error := 0; if Stream.Outbound /= null then Reset (Stream.Outbound); end if; end Reset; -- The Flush method tells the stream to send any outbound buffered data -- downstream, but without taking down the stream. The stream decides -- whether or not it has buffered data to write. -- procedure Flush (Stream : in out Output_Stream) is begin Process (Stream, True); if Stream.Outbound /= null then Flush (Stream.Outbound); end if; end Flush; -- Get a stream's write buffer size for max blocks. -- function Get_Max_Buffer_Blocksize (Stream : in Output_Stream) return Natural is begin if Stream.Outbound = null then return 0; else return Get_Max_Buffer_Blocksize (Stream.Outbound); end if; end Get_Max_Buffer_Blocksize; -- Set a stream's write buffer size for max blocks. -- A Num_Blocks of 0 will set it to unbuffered. -- procedure Set_Max_Buffer_Blocksize (Stream : in out Output_Stream; Num_Blocks : in Natural) is begin if Stream.Outbound /= null then Set_Max_Buffer_Blocksize (Stream.Outbound, Num_Blocks); end if; end Set_Max_Buffer_Blocksize; -- Name returns a string containing a meaningful name for this stream, -- usually obtained from the most outbound stream object (because that's -- where things like file, directory, or host names are stored). -- function Name (Stream : in Output_Stream) return String is begin if Stream.Outbound /= null then return Name (Stream.Outbound); else return ""; -- The parent type has no name end if; end Name; ------------------------------------------ -- Ada.Streams Dispatching Operations -- ------------------------------------------ -- Read is defined by Ada.Streams for abstract stream operations. -- Raises Mode_Error for an Output Stream. -- procedure Read (Stream : in out Output_Stream; Item : out Ada.Streams.Stream_Element_Array; Last : out Ada.Streams.Stream_Element_Offset) is begin raise Mode_Error; end Read; -- Write places each of the elements of Item into Stream in order. This -- interface is defined by Ada.Streams for abstract stream operations. -- We won't use it much because it forces a full data copy when filtering. -- procedure Write (Stream : in out Output_Stream; Item : in Ada.Streams.Stream_Element_Array) is begin Enqueue (Stream.Unprocessed, New_Bucket (Item)); Process (Stream, False); end Write; ---------------------------------- -- Data Processing Operations -- ---------------------------------- -- Process does the magic necessary to move the input data from -- the Unprocessed write queue to the downstream object. -- In this case, there is no magic. If Everything, then process -- the entire Unprocessed buffer as if it were the end-of-stream. -- procedure Process (Stream : in out Output_Stream; Everything : Boolean) is ItemList : Bucket_List; begin if Length (Stream.Unprocessed) > 0 then Dequeue (Stream.Unprocessed, ItemList); if Stream.Outbound /= null then -- Only the outermost object should update Byte_Count here. Write (Stream.Outbound, ItemList); else Free (ItemList); raise Status_Error; end if; end if; end Process; ------------------------ -- Utility Routines -- ------------------------ use Onions.Thin; use System.Storage_Elements; use type C.int; -- Write_Vector will perform a system writev on an open descriptor, -- from the write Buffer_List of length Buffer_Num, containing a total -- Buffer_Bytes. If the descriptor is not ready for writing, -- it will wait up to Timeout milliseconds for it to become ready, -- or forever if Timeout = 0. -- -- Raises Timeout_Exceeded if we have to wait longer than Timeout; -- Device_Error if anything else goes fatally wrong. -- procedure Write_Vector (Filedes : in Descriptor; Timeout : in Natural; Buffer_List : in Bucket_List; Buffer_Num : in Natural; Buffer_Bytes : in Storage_Count) is Io_Vector : Iov_Array (0 .. C.int (Buffer_Num - 1)); Iov : Iovec_Access; Iovcnt : C.int := 0; Node : Bucket_List := Buffer_List; Remaining : Storage_Count := Buffer_Bytes; Status : Storage_Offset; Pos : C.int; Fds : C.int; begin -- Set up the I/O vector to reference our list of buckets -- while Node /= null and Iovcnt < C.int (Buffer_Num) loop Iov := Io_Vector (Iovcnt)'Unchecked_Access; To_Iovec (Node.Data, Iov); Iovcnt := Iovcnt + 1; Node := Node.Next; end loop; Pos := Io_Vector'First; Iov := Io_Vector (Pos)'Unchecked_Access; -- Loop while there is still data to write and no fatal errors -- loop if Iovcnt > IOV_MAX then Iovcnt := IOV_MAX; end if; Status := C_writev (Filedes, Iov, Iovcnt); if Status > 0 then -- Success if Status = Remaining then return; end if; Remaining := Remaining - Status; while Status > 0 loop if Status >= Iov.Iov_Len then Status := Status - Iov.Iov_Len; Pos := Pos + 1; if Pos > Io_Vector'Last then -- should never happen return; end if; Iov := Io_Vector (Pos)'Unchecked_Access; else Iov.Iov_Base := Iov.Iov_Base + Status; Iov.Iov_Len := Iov.Iov_Len - Status; end if; end loop; Iovcnt := Io_Vector'Last - Pos + 1; elsif Status = 0 then Fds := OS.Wait_For_Fd (OS.Fd_Writable, Filedes, Timeout); if Fds = -1 then raise Device_Error; elsif Fds = 0 then raise Timeout_Exceeded; end if; elsif Status < 0 then case OS.C_errno is when EINTR => null; when EAGAIN => Fds := OS.Wait_For_Fd (OS.Fd_Writable, Filedes, Timeout); if Fds = -1 then raise Device_Error; elsif Fds = 0 then raise Timeout_Exceeded; end if; when others => raise Device_Error; end case; end if; end loop; end Write_Vector; end Onions.Outstreams;