From 5725565f2401316339f46a3bb81a3b2bd6ad5528 Mon Sep 17 00:00:00 2001 From: Felipe Oliveira Carvalho Date: Fri, 6 Sep 2024 17:58:59 -0300 Subject: [PATCH 01/35] http: Compressed response example in Python --- http/get_compressed/README.md | 67 +++ .../python/client/make_requests.sh | 30 ++ http/get_compressed/python/server/server.py | 437 ++++++++++++++++++ 3 files changed, 534 insertions(+) create mode 100755 http/get_compressed/python/client/make_requests.sh create mode 100644 http/get_compressed/python/server/server.py diff --git a/http/get_compressed/README.md b/http/get_compressed/README.md index dde6e20..99259cc 100644 --- a/http/get_compressed/README.md +++ b/http/get_compressed/README.md @@ -20,3 +20,70 @@ # HTTP GET Arrow Data: Compression Examples This directory contains examples of HTTP servers/clients that transmit/receive data in the Arrow IPC streaming format and use compression (in various ways) to reduce the size of the transmitted data. + +## HTTP/1.1 Response Compression + +HTTP/1.1 offers an elaborate way for clients to specify their preferred +content encoding (read compression algorithm) using the `Accept-Encoding` +header.[^1] + +At least the Python server (in `python/`) implements a fully compliant +parser for the `Accept-Encoding` header. Application servers may choose +to implement a simpler check of the `Accept-Encoding` header or assume +that the client accepts the chosen compression scheme when talking +to that server. + +Here is an example of a header that a client may send and what it means: + + Accept-Encoding: zstd;q=1.0, gzip;q=0.5, br;q=0.8, identity;q=0 + +This header says that the client prefers that the server compress the +response with `zstd`, but if that is not possible, then `brotli` and `gzip` +are acceptable (in that order because 0.8 is greater than 0.5). The client +does not want the response to be uncompressed. This is communicated by +`"identity"` being listed with `q=0`. + +To tell the server the client only accepts `zstd` responses and nothing +else, not even uncompressed responses, the client would send: + + Accept-Encoding: zstd, *;q=0 + +RFC 2616[^1] specifies the rules for how a server should interpret the +`Accept-Encoding` header: + + A server tests whether a content-coding is acceptable, according to + an Accept-Encoding field, using these rules: + + 1. If the content-coding is one of the content-codings listed in + the Accept-Encoding field, then it is acceptable, unless it is + accompanied by a qvalue of 0. (As defined in section 3.9, a + qvalue of 0 means "not acceptable.") + + 2. The special "*" symbol in an Accept-Encoding field matches any + available content-coding not explicitly listed in the header + field. + + 3. If multiple content-codings are acceptable, then the acceptable + content-coding with the highest non-zero qvalue is preferred. + + 4. The "identity" content-coding is always acceptable, unless + specifically refused because the Accept-Encoding field includes + "identity;q=0", or because the field includes "*;q=0" and does + not explicitly include the "identity" content-coding. If the + Accept-Encoding field-value is empty, then only the "identity" + encoding is acceptable. + +If you're targeting web browsers, check the compatibility table of [compression +algorithms on MDN Web Docs][^2]. + +Another important rule is that if the server compresses the response, it +must include a `Content-Encoding` header in the response. 
+ + If the content-coding of an entity is not "identity", then the + response MUST include a Content-Encoding entity-header (section + 14.11) that lists the non-identity content-coding(s) used. + + +[^1]: [Fielding, R. et al. (1999). HTTP/1.1. RFC 2616, Section 14.3 Accept-Encoding.](https://www.rfc-editor.org/rfc/rfc2616#section-14.3) +[^2]: [MDN Web Docs: Accept-Encoding](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Encoding#browser_compatibility) + diff --git a/http/get_compressed/python/client/make_requests.sh b/http/get_compressed/python/client/make_requests.sh new file mode 100755 index 0000000..0679a18 --- /dev/null +++ b/http/get_compressed/python/client/make_requests.sh @@ -0,0 +1,30 @@ +#!/bin/sh + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +OUT=output.arrows +URI="http://localhost:8008" +CURL="curl --verbose" + +# HTTP/1.0 means that response is not chunked and not compressed... +$CURL --http1.0 -o $OUT $URI +# ...but it may be compressed with an explicitly set Accept-Encoding +# header +# $CURL --http1.0 -H "Accept-Encoding: gzip" -o $OUT $URI +$CURL --http1.0 -H "Accept-Encoding: zstd" -o $OUT.zstd $URI +$CURL --http1.0 -H "Accept-Encoding: br" -o $OUT.brotli $URI diff --git a/http/get_compressed/python/server/server.py b/http/get_compressed/python/server/server.py new file mode 100644 index 0000000..f487442 --- /dev/null +++ b/http/get_compressed/python/server/server.py @@ -0,0 +1,437 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from random import choice, randint +from http.server import BaseHTTPRequestHandler, HTTPServer +import io +import pyarrow as pa +import re +import socketserver +import string + +# HTTP/1.1: Use chunked responses for HTTP/1.1 requests. +USE_CHUNKED_HTTP1_1_ENCODING = True +# HTTP/1.0: put entire response in a buffer, set the Content-Length header, +# and send it all at once. If False, stream the response into the socket. 
+BUFFER_HTTP1_0_CONTENT = False + + +def random_string(alphabet, length): + return "".join(choice(alphabet) for _ in range(length)) + + +def random_name(initial): + length = randint(3, 7) + return initial + random_string(string.ascii_lowercase, length) + + +def example_tickers(num_tickers): + tickers = [] + while len(tickers) < num_tickers: + length = randint(3, 4) + random_ticker = random_string(string.ascii_uppercase, length) + if random_ticker not in tickers: + tickers.append(random_ticker) + return tickers + + +the_schema = pa.schema( + [ + ("ticker", pa.utf8()), + ("price", pa.int64()), + ("volume", pa.int64()), + ] +) + + +def example_batch(tickers, length): + data = {"ticker": [], "price": [], "volume": []} + for _ in range(length): + data["ticker"].append(choice(tickers)) + data["price"].append(randint(1, 1000) * 100) + data["volume"].append(randint(1, 10000)) + + return pa.RecordBatch.from_pydict(data, the_schema) + + +def example_batches(tickers): + # these parameters are chosen to generate a response + # of ~1 GB and chunks of ~140 KB (uncompressed) + total_records = 42_000_000 + batch_len = 6 * 1024 + # all the batches sent are random slices of the larger base batch + base_batch = example_batch(tickers, length=8 * batch_len) + batches = [] + records = 0 + while records < total_records: + length = min(batch_len, total_records - records) + offset = randint(0, base_batch.num_rows - length - 1) + batch = base_batch.slice(offset, length) + batches.append(batch) + records += length + return batches + + +# end of example data generation + +# [L]inear [W]hite [S]pace pattern (HTTP/1.1 - RFC 2616) +LWS_RE = "(?:[ \\t]|\\r\\n[ \\t]+)*" +CONTENT_CODING_PATTERN = re.compile( + r"%s(?:([A-Za-z]+|\*)%s(?:;%sq%s=(%s[01]+(?:\.\d{1,3})?))?%s)?" + % (LWS_RE, LWS_RE, LWS_RE, LWS_RE, LWS_RE, LWS_RE) +) + + +def parse_accept_encoding(s): + """ + Parse the Accept-Encoding request header value. + + Returns + ------- + list of (str, float|None) + The list of lowercase codings (or "*") and their qvalues in the order + they appear in the header. The qvalue is None if not specified. + """ + pieces = s.split(",") + accepted = [] + for piece in pieces: + m = re.fullmatch(CONTENT_CODING_PATTERN, piece) + if m is None: + raise ValueError(f"Malformed Accept-Encoding header: {s!r}") + if m.group(1) is None: # whitespace is skipped + continue + coding = m.group(1).lower() + qvalue = m.group(2) + pair = coding, float(qvalue) if qvalue is not None else None + accepted.append(pair) + return accepted + + +def check_parser(s, expected): + try: + parsed = parse_accept_encoding(s) + # print("parsed:", parsed, "\nexpected:", expected) + assert parsed == expected + except ValueError as e: + print(e) + + +check_parser("", []) +expected = [("gzip", None), ("zstd", 1.0), ("*", None)] +check_parser("gzip, zstd;q=1.0, *", expected) +check_parser("gzip , zstd; q= 1.0 , *", expected) +expected = [("gzip", None), ("zstd", 1.0), ("*", 0.0)] +check_parser("gzip , zstd; q= 1.0 \t \r\n ,*;q =0", expected) +expected = [("zstd", 1.0), ("gzip", 0.5), ("br", 0.8), ("identity", 0.0)] +check_parser("zstd;q=1.0, gzip;q=0.5, br;q=0.8, identity;q=0", expected) + + +def pick_coding(accept_encoding_header, available): + """ + Pick the content-coding that the server should use to compress the response. + + Parameters + ---------- + accept_encoding_header : str + The value of the Accept-Encoding header from an HTTP request. + available : list of str + The content-codings that the server can provide in the order preferred + by the server. 
+ + Returns + ------- + str + The content-coding that the server should use to compress the response. + "identity" is returned if no acceptable content-coding is found in the + list of available codings. + + None if the client does not accept any of the available content-codings + and doesn't accept "identity" (uncompressed) either. In this case, + a "406 Not Acceptable" response should be sent. + """ + accepted = parse_accept_encoding(accept_encoding_header) + + def value_or(value, default): + return default if value is None else value + + if "identity" not in available: + available = available + ["identity"] + state = {} + for coding, qvalue in accepted: + qvalue = value_or(qvalue, 0.0001 if coding == "identity" else 1.0) + if coding == "*": + for coding in available: + if coding not in state: + state[coding] = qvalue + elif coding in available: + state[coding] = qvalue + # "identity" is always acceptable unless explicitly refused (;q=0) + if "identity" not in state: + state["identity"] = 0.0001 + # all the candidate codings are now in the state dictionary and we + # have to consider only the ones that have the maximum qvalue + max_qvalue = max(state.values()) + if max_qvalue == 0.0: + return None + for coding in available: + if coding in state and state[coding] == max_qvalue: + return coding + return None + + +def check_picker(header, expected): + available = ["zstd", "gzip"] # no "br" an no "deflate" + chosen = pick_coding(header, available) + # print("Accept-Encoding:", header, "\nexpected:", expected, "\t\tchosen:", chosen) + assert chosen == expected + + +check_picker("gzip, zstd;q=1.0, *", "zstd") +check_picker("gzip , zstd; q= 1.0 , *", "zstd") +check_picker("gzip , zstd; q= 1.0 \t \r\n ,*;q =0", "zstd") +check_picker("zstd;q=1.0, gzip;q=0.5, br;q=0.8, identity;q=0", "zstd") + +check_picker("compress, gzip", "gzip") +check_picker("", "identity") +check_picker("*", "zstd") +check_picker("compress;q=0.5, gzip;q=1.0", "gzip") +check_picker("gzip;q=1.0, identity; q=0.5, *;q=0", "gzip") +check_picker("br, *;q=0", None) +check_picker("br", "identity") + + +class LateClosingBytesIO(io.BytesIO): + """ + BytesIO that does not close on close(). + + When a stream wrapping a a file-like object is closed, the underlying + file-like object is also closed. This subclass prevents that from + happening by overriding the close method. + + If we close a RecordBatchStreamWriter wrapping a BytesIO object, we want + to be able to create a memory view of the buffer. But that is only possible + if the BytesIO object is not closed yet. 
+ """ + def close(self): + pass + + def close_now(self): + super().close() + + +class SocketWriterSink(socketserver._SocketWriter): + """Wrapper to make wfile usable as a sink for Arrow stream writing.""" + def __init__(self, wfile): + self.wfile = wfile + + def writable(self): + return True + + def write(self, b): + self.wfile.write(b) + + def fileno(self): + return self._sock.fileno() + + def close(self): + """Do nothing so Arrow stream wrappers don't close the socket.""" + pass + + +def stream_all(schema, source, coding, sink): + if coding == "identity": + # source: RecordBatchReader + # |> writer: RecordBatchStreamWriter + # |> sink: file-like + with pa.ipc.new_stream(sink, schema) as writer: + for batch in source: + writer.write_batch(batch) + writer.close() # write EOS marker and flush + else: + # IANA nomenclature for Brotli is "br" and not "brotli" + compression = "brotli" if coding == "br" else coding + with pa.CompressedOutputStream(sink, compression) as compressor: + # source: RecordBatchReader + # |> writer: RecordBatchStreamWriter + # |> compressor: CompressedOutputStream + # |> sink: file-like + with pa.ipc.new_stream(compressor, schema) as writer: + for batch in source: + writer.write_batch(batch) + writer.close() # write EOS marker and flush + # ensure buffered data is compressed and written to the sink + compressor.close() + + +def generate_single_buffer(schema, source, coding): + """ + Put all the record batches from the source into a single buffer. + + If `coding` is "identity", the source is written to the buffer as is. + Otherwise, the source is compressed using the given coding. + """ + # the sink holds the buffer and we give a view of it to the caller + with LateClosingBytesIO() as sink: + stream_all(schema, source, coding, sink) + # zero-copy buffer access using getbuffer() keeping the sink alive + # after the yield statement until this function is done executing + with sink.getbuffer() as buffer: + yield buffer + sink.close_now() + + +def generate_buffers(coding, schema, source): + compression = "brotli" if coding == "br" else coding + with LateClosingBytesIO() as sink, pa.CompressedOutputStream( + sink, compression + ) as compressor: + # pyarrow.ipc.RecordBatchStream writer that writes into the compressor + # which compresses data on the fly. Compressed data is written to the + # io.BytesIO sink which holds the buffer we give to the caller. + writer = pa.ipc.new_stream(compressor, schema) + print(type(writer)) + try: + while True: + sink.seek(0) + writer.write_batch(source.read_next_batch()) + compressor.flush() + sink.truncate() + with sink.getbuffer() as buffer: + print(len(buffer)) + yield buffer + except StopIteration: + print("StopIteration") + pass + + print("At the end") + sink.seek(0) + writer.close() + compressor.close() + sink.truncate() + with sink.getbuffer() as buffer: + yield buffer + sink.close_now() + + +AVAILABLE_ENCODINGS = ["zstd", "br", "gzip", "deflate"] +""" +List of available content-codings as used in HTTP. + +Note that Arrow stream classes refer to Brotli as "brotli" and not "br". +""" + + +class MyRequestHandler(BaseHTTPRequestHandler): + """ + Response handler for a simple HTTP server. + + This HTTP request handler serves a compressed HTTP response with an Arrow + stream in it or a (TODO) compressed Arrow stream in a uncompressed HTTP + response. + + The Arrow data is randomly generated "trading data" with a schema consisting + of a ticker, price (in cents), and volume. 
+ """ + + def _resolve_batches(self): + return pa.RecordBatchReader.from_batches(the_schema, all_batches) + + def _send_not_acceptable(self, accept_encoding, parsing_error=None): + self.send_response(406, "Not Acceptable") + self.send_header("Content-Type", "text/plain") + self.end_headers() + if parsing_error: + message = f"Error parsing `Accept-Encoding` header: {parsing_error}\n" + else: + message = "None of the available codings are accepted by this client.\n" + message += f"`Accept-Encoding` header was {accept_encoding!r}.\n" + self.wfile.write(bytes(message, "utf-8")) + + def do_GET(self): + # HTTP/1.0 requests don't get chunked responses + if self.request_version == "HTTP/1.0": + self.protocol_version = "HTTP/1.0" + chunked = False + else: + self.protocol_version = "HTTP/1.1" + chunked = USE_CHUNKED_HTTP1_1_ENCODING + + coding = None + parsing_error = None + accept_encoding = self.headers.get("Accept-Encoding") + if accept_encoding is None: + # if the Accept-Encoding header is not explicitly set, return the + # uncompressed data for HTTP/1.0 requests and compressed data for + # HTTP/1.1 requests with the safest compression format choice: "gzip". + coding = "identity" if self.request_version == "HTTP/1.0" else "gzip" + else: + try: + coding = pick_coding(accept_encoding, AVAILABLE_ENCODINGS) + except ValueError as e: + parsing_error = e + + if coding is None: + self._send_not_acceptable(accept_encoding, parsing_error) + return + + ### in a real application the data would be resolved from a database or + ### another source like a file and error handling would be done here + ### before the 200 OK response starts being sent to the client. + source = self._resolve_batches() + + self.send_response(200) + self.send_header("Content-Type", "application/vnd.apache.arrow.stream") + if coding != "identity": + self.send_header("Content-Encoding", coding) + ### set these headers if testing with a local browser-based client: + # self.send_header('Access-Control-Allow-Origin', 'http://localhost:8008') + # self.send_header('Access-Control-Allow-Methods', 'GET') + # self.send_header('Access-Control-Allow-Headers', 'Content-Type') + if chunked: + self.send_header("Transfer-Encoding", "chunked") + self.end_headers() + for buffer in generate_buffers(the_schema, source, coding): + self.wfile.write(f"{len(buffer):X}\r\n".encode("utf-8")) + self.wfile.write(buffer) + self.wfile.write("\r\n".encode("utf-8")) + self.wfile.write("0\r\n\r\n".encode("utf-8")) + else: + if BUFFER_HTTP1_0_CONTENT: + for buffer in generate_single_buffer(the_schema, source, coding): + self.send_header("Content-Length", str(len(buffer))) + self.end_headers() + self.wfile.write(buffer) + break + else: + self.end_headers() + sink = SocketWriterSink(self.wfile) + stream_all(the_schema, source, coding, sink) + + +print("Generating example data...") + +all_tickers = example_tickers(60) +all_batches = example_batches(all_tickers) + +server_address = ("localhost", 8008) +try: + httpd = HTTPServer(server_address, MyRequestHandler) + print(f"Serving on {server_address[0]}:{server_address[1]}...") + httpd.serve_forever() +except KeyboardInterrupt: + print("Shutting down server") + httpd.socket.close() From 24973e35e0829a19c3068dd7ed0c0a26dc28aa08 Mon Sep 17 00:00:00 2001 From: Felipe Oliveira Carvalho Date: Fri, 6 Sep 2024 19:27:19 -0300 Subject: [PATCH 02/35] complete the chunked response loop --- .../python/client/make_requests.sh | 16 +++- http/get_compressed/python/server/server.py | 80 ++++++++++++------- 2 files changed, 62 
insertions(+), 34 deletions(-) diff --git a/http/get_compressed/python/client/make_requests.sh b/http/get_compressed/python/client/make_requests.sh index 0679a18..1bad0c1 100755 --- a/http/get_compressed/python/client/make_requests.sh +++ b/http/get_compressed/python/client/make_requests.sh @@ -17,14 +17,24 @@ # specific language governing permissions and limitations # under the License. -OUT=output.arrows -URI="http://localhost:8008" CURL="curl --verbose" +URI="http://localhost:8008" +OUT=output.arrows +OUT2=output2.arrows # HTTP/1.0 means that response is not chunked and not compressed... $CURL --http1.0 -o $OUT $URI # ...but it may be compressed with an explicitly set Accept-Encoding # header -# $CURL --http1.0 -H "Accept-Encoding: gzip" -o $OUT $URI +$CURL --http1.0 -H "Accept-Encoding: gzip" -o $OUT.gz $URI $CURL --http1.0 -H "Accept-Encoding: zstd" -o $OUT.zstd $URI $CURL --http1.0 -H "Accept-Encoding: br" -o $OUT.brotli $URI + +# HTTP/1.1 means compression is on by default... +# ...but it can be refused with the Accept-Encoding: identity header. +$CURL -H "Accept-Encoding: identity" -o $OUT2 $URI +# ...with gzip if no Accept-Encoding header is set. +$CURL -o $OUT2.gz $URI +# ...or with the compression algorithm specified in the Accept-Encoding. +$CURL -H "Accept-Encoding: zstd" -o $OUT2.zstd $URI +$CURL -H "Accept-Encoding: br" -o $OUT2.brotli $URI diff --git a/http/get_compressed/python/server/server.py b/http/get_compressed/python/server/server.py index f487442..7714061 100644 --- a/http/get_compressed/python/server/server.py +++ b/http/get_compressed/python/server/server.py @@ -294,37 +294,55 @@ def generate_single_buffer(schema, source, coding): sink.close_now() -def generate_buffers(coding, schema, source): - compression = "brotli" if coding == "br" else coding - with LateClosingBytesIO() as sink, pa.CompressedOutputStream( - sink, compression - ) as compressor: - # pyarrow.ipc.RecordBatchStream writer that writes into the compressor - # which compresses data on the fly. Compressed data is written to the - # io.BytesIO sink which holds the buffer we give to the caller. 
- writer = pa.ipc.new_stream(compressor, schema) - print(type(writer)) - try: - while True: - sink.seek(0) - writer.write_batch(source.read_next_batch()) - compressor.flush() - sink.truncate() - with sink.getbuffer() as buffer: - print(len(buffer)) - yield buffer - except StopIteration: - print("StopIteration") - pass - - print("At the end") - sink.seek(0) - writer.close() - compressor.close() - sink.truncate() - with sink.getbuffer() as buffer: - yield buffer - sink.close_now() +def generate_buffers(schema, source, coding): + # the sink holds the buffer and we give a view of it to the caller + with LateClosingBytesIO() as sink: + # keep buffering until we have at least MIN_BUFFER_SIZE bytes + # in the buffer before yielding it to the caller + MIN_BUFFER_SIZE = 256 * 1024 + if coding == "identity": + # source: RecordBatchReader + # |> writer: RecordBatchStreamWriter + # |> sink: LateClosingBytesIO + writer = pa.ipc.new_stream(sink, schema) + try: + while True: + writer.write_batch(source.read_next_batch()) + if sink.tell() >= MIN_BUFFER_SIZE: + sink.truncate() + with sink.getbuffer() as buffer: + yield buffer + sink.seek(0) + except StopIteration: + pass + + writer.close() # write EOS marker and flush + else: + compression = "brotli" if coding == "br" else coding + with pa.CompressedOutputStream(sink, compression) as compressor: + # source: RecordBatchReader + # |> writer: RecordBatchStreamWriter + # |> compressor: CompressedOutputStream + # |> sink: LateClosingBytesIO + writer = pa.ipc.new_stream(compressor, schema) + try: + while True: + writer.write_batch(source.read_next_batch()) + if sink.tell() >= MIN_BUFFER_SIZE: + sink.truncate() + with sink.getbuffer() as buffer: + yield buffer + sink.seek(0) + except StopIteration: + pass + + writer.close() # write EOS marker and flush + compressor.close() + + sink.truncate() + with sink.getbuffer() as buffer: + yield buffer + sink.close_now() AVAILABLE_ENCODINGS = ["zstd", "br", "gzip", "deflate"] From 62277d76e2ac90b3ac2ae230d260bccb543af21a Mon Sep 17 00:00:00 2001 From: Felipe Oliveira Carvalho Date: Fri, 6 Sep 2024 20:32:19 -0300 Subject: [PATCH 03/35] more strict list of available compressors --- http/get_compressed/README.md | 1 - http/get_compressed/python/client/make_requests.sh | 10 +++++----- http/get_compressed/python/server/server.py | 2 +- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/http/get_compressed/README.md b/http/get_compressed/README.md index 99259cc..4b02995 100644 --- a/http/get_compressed/README.md +++ b/http/get_compressed/README.md @@ -86,4 +86,3 @@ must include a `Content-Encoding` header in the response. [^1]: [Fielding, R. et al. (1999). HTTP/1.1. 
RFC 2616, Section 14.3 Accept-Encoding.](https://www.rfc-editor.org/rfc/rfc2616#section-14.3) [^2]: [MDN Web Docs: Accept-Encoding](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Encoding#browser_compatibility) - diff --git a/http/get_compressed/python/client/make_requests.sh b/http/get_compressed/python/client/make_requests.sh index 1bad0c1..9c0c673 100755 --- a/http/get_compressed/python/client/make_requests.sh +++ b/http/get_compressed/python/client/make_requests.sh @@ -26,9 +26,9 @@ OUT2=output2.arrows $CURL --http1.0 -o $OUT $URI # ...but it may be compressed with an explicitly set Accept-Encoding # header -$CURL --http1.0 -H "Accept-Encoding: gzip" -o $OUT.gz $URI -$CURL --http1.0 -H "Accept-Encoding: zstd" -o $OUT.zstd $URI -$CURL --http1.0 -H "Accept-Encoding: br" -o $OUT.brotli $URI +$CURL --http1.0 -H "Accept-Encoding: gzip, *;q=0" -o $OUT.gz $URI +$CURL --http1.0 -H "Accept-Encoding: zstd, *;q=0" -o $OUT.zstd $URI +$CURL --http1.0 -H "Accept-Encoding: br, *;q=0" -o $OUT.brotli $URI # HTTP/1.1 means compression is on by default... # ...but it can be refused with the Accept-Encoding: identity header. @@ -36,5 +36,5 @@ $CURL -H "Accept-Encoding: identity" -o $OUT2 $URI # ...with gzip if no Accept-Encoding header is set. $CURL -o $OUT2.gz $URI # ...or with the compression algorithm specified in the Accept-Encoding. -$CURL -H "Accept-Encoding: zstd" -o $OUT2.zstd $URI -$CURL -H "Accept-Encoding: br" -o $OUT2.brotli $URI +$CURL -H "Accept-Encoding: zstd, *;q=0" -o $OUT2.zstd $URI +$CURL -H "Accept-Encoding: br, *;q=0" -o $OUT2.brotli $URI diff --git a/http/get_compressed/python/server/server.py b/http/get_compressed/python/server/server.py index 7714061..63fbb9a 100644 --- a/http/get_compressed/python/server/server.py +++ b/http/get_compressed/python/server/server.py @@ -345,7 +345,7 @@ def generate_buffers(schema, source, coding): sink.close_now() -AVAILABLE_ENCODINGS = ["zstd", "br", "gzip", "deflate"] +AVAILABLE_ENCODINGS = ["zstd", "br", "gzip"] """ List of available content-codings as used in HTTP. From 3409b1f6bdeed2c33ef5c7a9c70ebe01121e0ac5 Mon Sep 17 00:00:00 2001 From: Felipe Oliveira Carvalho Date: Fri, 6 Sep 2024 21:11:11 -0300 Subject: [PATCH 04/35] simplify config --- http/get_compressed/python/server/server.py | 35 ++++++++++----------- 1 file changed, 16 insertions(+), 19 deletions(-) diff --git a/http/get_compressed/python/server/server.py b/http/get_compressed/python/server/server.py index 63fbb9a..434215b 100644 --- a/http/get_compressed/python/server/server.py +++ b/http/get_compressed/python/server/server.py @@ -23,11 +23,9 @@ import socketserver import string -# HTTP/1.1: Use chunked responses for HTTP/1.1 requests. -USE_CHUNKED_HTTP1_1_ENCODING = True -# HTTP/1.0: put entire response in a buffer, set the Content-Length header, -# and send it all at once. If False, stream the response into the socket. -BUFFER_HTTP1_0_CONTENT = False +# put entire response in a buffer, set the Content-Length header, and +# send it all at once. If False, stream the response into the socket. 
+BUFFER_ENTIRE_RESPONSE = False def random_string(alphabet, length): @@ -386,7 +384,7 @@ def do_GET(self): chunked = False else: self.protocol_version = "HTTP/1.1" - chunked = USE_CHUNKED_HTTP1_1_ENCODING + chunked = not BUFFER_ENTIRE_RESPONSE coding = None parsing_error = None @@ -412,13 +410,13 @@ def do_GET(self): source = self._resolve_batches() self.send_response(200) - self.send_header("Content-Type", "application/vnd.apache.arrow.stream") - if coding != "identity": - self.send_header("Content-Encoding", coding) ### set these headers if testing with a local browser-based client: # self.send_header('Access-Control-Allow-Origin', 'http://localhost:8008') # self.send_header('Access-Control-Allow-Methods', 'GET') # self.send_header('Access-Control-Allow-Headers', 'Content-Type') + self.send_header("Content-Type", "application/vnd.apache.arrow.stream") + if coding != "identity": + self.send_header("Content-Encoding", coding) if chunked: self.send_header("Transfer-Encoding", "chunked") self.end_headers() @@ -427,17 +425,16 @@ def do_GET(self): self.wfile.write(buffer) self.wfile.write("\r\n".encode("utf-8")) self.wfile.write("0\r\n\r\n".encode("utf-8")) - else: - if BUFFER_HTTP1_0_CONTENT: - for buffer in generate_single_buffer(the_schema, source, coding): - self.send_header("Content-Length", str(len(buffer))) - self.end_headers() - self.wfile.write(buffer) - break - else: + elif BUFFER_ENTIRE_RESPONSE: + for buffer in generate_single_buffer(the_schema, source, coding): + self.send_header("Content-Length", str(len(buffer))) self.end_headers() - sink = SocketWriterSink(self.wfile) - stream_all(the_schema, source, coding, sink) + self.wfile.write(buffer) + break + else: + self.end_headers() + sink = SocketWriterSink(self.wfile) + stream_all(the_schema, source, coding, sink) print("Generating example data...") From cc7bdae8bef7ed731d7b2f36c0b46eb308bd553c Mon Sep 17 00:00:00 2001 From: Felipe Oliveira Carvalho Date: Fri, 6 Sep 2024 21:18:56 -0300 Subject: [PATCH 05/35] better names --- http/get_compressed/python/server/server.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/http/get_compressed/python/server/server.py b/http/get_compressed/python/server/server.py index 434215b..5dcbcc4 100644 --- a/http/get_compressed/python/server/server.py +++ b/http/get_compressed/python/server/server.py @@ -275,7 +275,7 @@ def stream_all(schema, source, coding, sink): compressor.close() -def generate_single_buffer(schema, source, coding): +def generate_response_buffer(schema, source, coding): """ Put all the record batches from the source into a single buffer. 
@@ -292,7 +292,7 @@ def generate_single_buffer(schema, source, coding): sink.close_now() -def generate_buffers(schema, source, coding): +def generate_chunk_buffers(schema, source, coding): # the sink holds the buffer and we give a view of it to the caller with LateClosingBytesIO() as sink: # keep buffering until we have at least MIN_BUFFER_SIZE bytes @@ -420,13 +420,13 @@ def do_GET(self): if chunked: self.send_header("Transfer-Encoding", "chunked") self.end_headers() - for buffer in generate_buffers(the_schema, source, coding): + for buffer in generate_chunk_buffers(the_schema, source, coding): self.wfile.write(f"{len(buffer):X}\r\n".encode("utf-8")) self.wfile.write(buffer) self.wfile.write("\r\n".encode("utf-8")) self.wfile.write("0\r\n\r\n".encode("utf-8")) elif BUFFER_ENTIRE_RESPONSE: - for buffer in generate_single_buffer(the_schema, source, coding): + for buffer in generate_response_buffer(the_schema, source, coding): self.send_header("Content-Length", str(len(buffer))) self.end_headers() self.wfile.write(buffer) From 3131eed2ea0dd3d1bad1e94e9a954ad5c75e726d Mon Sep 17 00:00:00 2001 From: Felipe Oliveira Carvalho Date: Fri, 6 Sep 2024 21:23:36 -0300 Subject: [PATCH 06/35] turns out I can use for..in in this loop as well --- http/get_compressed/python/server/server.py | 34 +++++++++------------ 1 file changed, 14 insertions(+), 20 deletions(-) diff --git a/http/get_compressed/python/server/server.py b/http/get_compressed/python/server/server.py index 5dcbcc4..3bfb875 100644 --- a/http/get_compressed/python/server/server.py +++ b/http/get_compressed/python/server/server.py @@ -303,16 +303,13 @@ def generate_chunk_buffers(schema, source, coding): # |> writer: RecordBatchStreamWriter # |> sink: LateClosingBytesIO writer = pa.ipc.new_stream(sink, schema) - try: - while True: - writer.write_batch(source.read_next_batch()) - if sink.tell() >= MIN_BUFFER_SIZE: - sink.truncate() - with sink.getbuffer() as buffer: - yield buffer - sink.seek(0) - except StopIteration: - pass + for batch in source: + writer.write_batch(batch) + if sink.tell() >= MIN_BUFFER_SIZE: + sink.truncate() + with sink.getbuffer() as buffer: + yield buffer + sink.seek(0) writer.close() # write EOS marker and flush else: @@ -323,16 +320,13 @@ def generate_chunk_buffers(schema, source, coding): # |> compressor: CompressedOutputStream # |> sink: LateClosingBytesIO writer = pa.ipc.new_stream(compressor, schema) - try: - while True: - writer.write_batch(source.read_next_batch()) - if sink.tell() >= MIN_BUFFER_SIZE: - sink.truncate() - with sink.getbuffer() as buffer: - yield buffer - sink.seek(0) - except StopIteration: - pass + for batch in source: + writer.write_batch(batch) + if sink.tell() >= MIN_BUFFER_SIZE: + sink.truncate() + with sink.getbuffer() as buffer: + yield buffer + sink.seek(0) writer.close() # write EOS marker and flush compressor.close() From ef90f4906c14fbb917062ddea1278bd1a0284ce9 Mon Sep 17 00:00:00 2001 From: Felipe Oliveira Carvalho Date: Fri, 6 Sep 2024 21:34:54 -0300 Subject: [PATCH 07/35] fix indent --- http/get_compressed/python/server/server.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/http/get_compressed/python/server/server.py b/http/get_compressed/python/server/server.py index 3bfb875..98b4910 100644 --- a/http/get_compressed/python/server/server.py +++ b/http/get_compressed/python/server/server.py @@ -331,10 +331,10 @@ def generate_chunk_buffers(schema, source, coding): writer.close() # write EOS marker and flush compressor.close() - sink.truncate() - with 
sink.getbuffer() as buffer: - yield buffer - sink.close_now() + sink.truncate() + with sink.getbuffer() as buffer: + yield buffer + sink.close_now() AVAILABLE_ENCODINGS = ["zstd", "br", "gzip"] From 4cb867bc7fc4340cf6b7b193b481f6c18eaff2c0 Mon Sep 17 00:00:00 2001 From: Felipe Oliveira Carvalho Date: Fri, 6 Sep 2024 21:42:34 -0300 Subject: [PATCH 08/35] don't pick gzip as default when it's not in AVAILABLE_CODINGS --- http/get_compressed/python/server/server.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/http/get_compressed/python/server/server.py b/http/get_compressed/python/server/server.py index 98b4910..2cc8af2 100644 --- a/http/get_compressed/python/server/server.py +++ b/http/get_compressed/python/server/server.py @@ -224,6 +224,7 @@ class LateClosingBytesIO(io.BytesIO): to be able to create a memory view of the buffer. But that is only possible if the BytesIO object is not closed yet. """ + def close(self): pass @@ -233,6 +234,7 @@ def close_now(self): class SocketWriterSink(socketserver._SocketWriter): """Wrapper to make wfile usable as a sink for Arrow stream writing.""" + def __init__(self, wfile): self.wfile = wfile @@ -387,7 +389,12 @@ def do_GET(self): # if the Accept-Encoding header is not explicitly set, return the # uncompressed data for HTTP/1.0 requests and compressed data for # HTTP/1.1 requests with the safest compression format choice: "gzip". - coding = "identity" if self.request_version == "HTTP/1.0" else "gzip" + coding = ( + "identity" + if self.request_version == "HTTP/1.0" + or ("gzip" not in AVAILABLE_ENCODINGS) + else "gzip" + ) else: try: coding = pick_coding(accept_encoding, AVAILABLE_ENCODINGS) From b2c6c88ef662cf795fec7c535e2f0b53c15846ef Mon Sep 17 00:00:00 2001 From: Felipe Oliveira Carvalho Date: Fri, 6 Sep 2024 22:00:43 -0300 Subject: [PATCH 09/35] suggest default filename --- http/get_compressed/python/server/server.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/http/get_compressed/python/server/server.py b/http/get_compressed/python/server/server.py index 2cc8af2..1cf55a5 100644 --- a/http/get_compressed/python/server/server.py +++ b/http/get_compressed/python/server/server.py @@ -416,6 +416,9 @@ def do_GET(self): # self.send_header('Access-Control-Allow-Methods', 'GET') # self.send_header('Access-Control-Allow-Headers', 'Content-Type') self.send_header("Content-Type", "application/vnd.apache.arrow.stream") + # suggest a default filename in case this response is saved by the user + self.send_header("Content-Disposition", r'attachment; filename="output.arrows"') + if coding != "identity": self.send_header("Content-Encoding", coding) if chunked: From c53805cdaf6e2c1409d0a0f8135af823885dc1c2 Mon Sep 17 00:00:00 2001 From: Felipe Oliveira Carvalho Date: Mon, 9 Sep 2024 22:04:28 -0300 Subject: [PATCH 10/35] fix brotli file extension --- http/get_compressed/python/client/make_requests.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/http/get_compressed/python/client/make_requests.sh b/http/get_compressed/python/client/make_requests.sh index 9c0c673..c8a44d0 100755 --- a/http/get_compressed/python/client/make_requests.sh +++ b/http/get_compressed/python/client/make_requests.sh @@ -37,4 +37,4 @@ $CURL -H "Accept-Encoding: identity" -o $OUT2 $URI $CURL -o $OUT2.gz $URI # ...or with the compression algorithm specified in the Accept-Encoding. 
$CURL -H "Accept-Encoding: zstd, *;q=0" -o $OUT2.zstd $URI -$CURL -H "Accept-Encoding: br, *;q=0" -o $OUT2.brotli $URI +$CURL -H "Accept-Encoding: br, *;q=0" -o $OUT2.br $URI From d98c7f76671e0267031015497a8cc158e3d721f1 Mon Sep 17 00:00:00 2001 From: Felipe Oliveira Carvalho Date: Mon, 9 Sep 2024 22:04:53 -0300 Subject: [PATCH 11/35] expand README with note about simpler Accept-Encoding headers --- http/get_compressed/README.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/http/get_compressed/README.md b/http/get_compressed/README.md index 4b02995..fc77451 100644 --- a/http/get_compressed/README.md +++ b/http/get_compressed/README.md @@ -83,6 +83,16 @@ must include a `Content-Encoding` header in the response. response MUST include a Content-Encoding entity-header (section 14.11) that lists the non-identity content-coding(s) used. +Since not all servers implement the full `Accept-Encoding` header parsing +logic, clients tend to stick to simple header values like +`Accept-Encoding: identity` when no compression is desired, and +`Accept-Encoding: gzip, deflate, zstd, br` when the client supports different +compression formats and is indifferent to which one the server chooses. Clients +should expect uncompressed responses as well in theses cases. The only way to +force a "406 Not Acceptable" response when no compression is available is to +send `identity;q=0` or `*;q=0` somewhere in the end of the `Accept-Encoding` +header. But that relies on the server implementing the full `Accept-Encoding` +handling logic. [^1]: [Fielding, R. et al. (1999). HTTP/1.1. RFC 2616, Section 14.3 Accept-Encoding.](https://www.rfc-editor.org/rfc/rfc2616#section-14.3) [^2]: [MDN Web Docs: Accept-Encoding](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Encoding#browser_compatibility) From 31aecefa070e971cd2eab2ae796413bf989adb83 Mon Sep 17 00:00:00 2001 From: Felipe Oliveira Carvalho Date: Mon, 9 Sep 2024 22:09:20 -0300 Subject: [PATCH 12/35] Add client.py --- http/get_compressed/python/client/client.py | 84 +++++++++++++++++++++ 1 file changed, 84 insertions(+) create mode 100644 http/get_compressed/python/client/client.py diff --git a/http/get_compressed/python/client/client.py b/http/get_compressed/python/client/client.py new file mode 100644 index 0000000..1f1ceeb --- /dev/null +++ b/http/get_compressed/python/client/client.py @@ -0,0 +1,84 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +import urllib.request +import pyarrow as pa +import time + +URI = "http://localhost:8008" +ARROW_STREAM_FORMAT = "application/vnd.apache.arrow.stream" + + +def make_request(uri, coding): + # urllib.request.urlopen() always sends an HTTP/1.1 request + # with Accept-Encoding: identity, so we need to setup a request + # object to customize the request headers + request = urllib.request.Request( + uri, + headers={ + "Accept-Encoding": f"{coding}, *;q=0", + }, + ) + response = urllib.request.urlopen(request) + content_type = response.headers["Content-Type"] + if content_type != ARROW_STREAM_FORMAT: + raise ValueError(f"Expected {ARROW_STREAM_FORMAT}, got {content_type}") + if coding == "identity": + return response + # IANA nomenclature for Brotli is "br" and not "brotli" + compression = "brotli" if coding == "br" else coding + return pa.CompressedInputStream(response, compression) + + +def request_and_process(uri, coding): + batches = [] + log_prefix = f"{'[' + coding + ']':>10}:" + print(f"{log_prefix} Requesting data from {uri} with `{coding}` encoding.") + start_time = time.time() + response = make_request(uri, coding) + with pa.ipc.open_stream(response) as reader: + schema = reader.schema + time_to_schema = time.time() - start_time + try: + batch = reader.read_next_batch() + time_to_first_batch = time.time() - start_time + batches.append(batch) + while True: + batch = reader.read_next_batch() + batches.append(batch) + except StopIteration: + pass + processing_time = time.time() - start_time + print( + f"{log_prefix} Schema received in {time_to_schema:.3f} seconds." + f" schema=({', '.join(schema.names)})." + ) + print( + f"{log_prefix} First batch of {len(batches)} received and processed in" + f" {time_to_first_batch:.3f} seconds" + ) + print( + f"{log_prefix} Processing of all batches completed in" + f" {processing_time:.3f} seconds." + ) + return batches + + +request_and_process(URI, "identity") +request_and_process(URI, "zstd") +request_and_process(URI, "br") +request_and_process(URI, "gzip") From 07c4dd51ce2c9e06e2e52ba07d911d4cfb16f637 Mon Sep 17 00:00:00 2001 From: Felipe Oliveira Carvalho Date: Mon, 9 Sep 2024 23:35:47 -0300 Subject: [PATCH 13/35] reduce buffering and reduce latency --- http/get_compressed/python/server/server.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/http/get_compressed/python/server/server.py b/http/get_compressed/python/server/server.py index 1cf55a5..3c93b66 100644 --- a/http/get_compressed/python/server/server.py +++ b/http/get_compressed/python/server/server.py @@ -298,8 +298,10 @@ def generate_chunk_buffers(schema, source, coding): # the sink holds the buffer and we give a view of it to the caller with LateClosingBytesIO() as sink: # keep buffering until we have at least MIN_BUFFER_SIZE bytes - # in the buffer before yielding it to the caller - MIN_BUFFER_SIZE = 256 * 1024 + # in the buffer before yielding it to the caller. Setting it + # to 1 means we yield as soon as the compression frames are + # formed and reach the sink buffer. + MIN_BUFFER_SIZE = 1 if coding == "identity": # source: RecordBatchReader # |> writer: RecordBatchStreamWriter @@ -324,6 +326,10 @@ def generate_chunk_buffers(schema, source, coding): writer = pa.ipc.new_stream(compressor, schema) for batch in source: writer.write_batch(batch) + # A record batch might be buffered in the + # CompressorOutputStream before a full compression frame is + # written to the sink, so we must check the sink size since + # the last time we yielded a memory view. 
if sink.tell() >= MIN_BUFFER_SIZE: sink.truncate() with sink.getbuffer() as buffer: From 727d3e5572411b873e93277e602dbe85fe9a78ce Mon Sep 17 00:00:00 2001 From: Felipe Oliveira Carvalho Date: Tue, 10 Sep 2024 17:57:52 -0300 Subject: [PATCH 14/35] expedite the yielding of the first buffer --- http/get_compressed/python/server/server.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/http/get_compressed/python/server/server.py b/http/get_compressed/python/server/server.py index 3c93b66..4a88c7d 100644 --- a/http/get_compressed/python/server/server.py +++ b/http/get_compressed/python/server/server.py @@ -299,9 +299,9 @@ def generate_chunk_buffers(schema, source, coding): with LateClosingBytesIO() as sink: # keep buffering until we have at least MIN_BUFFER_SIZE bytes # in the buffer before yielding it to the caller. Setting it - # to 1 means we yield as soon as the compression frames are + # to 1 means we yield as soon as the compression blocks are # formed and reach the sink buffer. - MIN_BUFFER_SIZE = 1 + MIN_BUFFER_SIZE = 64 * 1024 if coding == "identity": # source: RecordBatchReader # |> writer: RecordBatchStreamWriter @@ -319,6 +319,8 @@ def generate_chunk_buffers(schema, source, coding): else: compression = "brotli" if coding == "br" else coding with pa.CompressedOutputStream(sink, compression) as compressor: + # has the first buffer been yielded already? + sent_first = False # source: RecordBatchReader # |> writer: RecordBatchStreamWriter # |> compressor: CompressedOutputStream @@ -326,15 +328,16 @@ def generate_chunk_buffers(schema, source, coding): writer = pa.ipc.new_stream(compressor, schema) for batch in source: writer.write_batch(batch) - # A record batch might be buffered in the - # CompressorOutputStream before a full compression frame is - # written to the sink, so we must check the sink size since - # the last time we yielded a memory view. - if sink.tell() >= MIN_BUFFER_SIZE: + # we try to yield a buffer ASAP no matter how small + if not sent_first and sink.tell() == 0: + compressor.flush() + pos = sink.tell() + if pos >= MIN_BUFFER_SIZE or (not sent_first and pos >= 1): sink.truncate() with sink.getbuffer() as buffer: yield buffer sink.seek(0) + sent_first = True writer.close() # write EOS marker and flush compressor.close() From 83d241dcda52d22b4b1ca2a85f68abf37d341f23 Mon Sep 17 00:00:00 2001 From: Felipe Oliveira Carvalho Date: Wed, 11 Sep 2024 15:49:01 -0300 Subject: [PATCH 15/35] expand README --- http/get_compressed/README.md | 42 +++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/http/get_compressed/README.md b/http/get_compressed/README.md index fc77451..c1f6417 100644 --- a/http/get_compressed/README.md +++ b/http/get_compressed/README.md @@ -21,6 +21,42 @@ This directory contains examples of HTTP servers/clients that transmit/receive data in the Arrow IPC streaming format and use compression (in various ways) to reduce the size of the transmitted data. +Since we re-use the [Arrow IPC format][ipc] for transferring Arrow data over +HTTP and both Arrow IPC and HTTP standards support compression on their own, +there are at least two approaches to this problem: + +1. Compressed HTTP responses carrying Arrow IPC streams with uncompressed + array buffers. +2. Uncompressed HTTP responses carrying Arrow IPC streams with compressed + array buffers. + +Applying IPC buffer and HTTP compression at the same is not recommended. 
The +extra CPU overhead of decompressing the data twice is not worth any possible +gains that double compression might bring. If compression ratios are +unambiguously more important than reducing CPU overhead, then a different +compression algorithm that optimizes for that can be chosen. + +This table shows the support for different compression algorithms in HTTP and +Arrow IPC: + +| Format | HTTP Support | IPC Support | +| ------------------ | --------------- | --------------- | +| gzip (GZip) | X | | +| deflate (DEFLATE) | X | | +| br (Brotli) | X[^2] | | +| zstd (Zstandard) | X[^2] | X | +| lz4 (LZ4) | | X | + +Since not all Arrow IPC implementations support compression, HTTP compression +based on accepted formats negotiated with the client is a great way to increase +the chances of efficient data transfer. + +Servers may check the `Accept-Encoding` header of the client and choose the +compression format in this order of preference: `zstd`, `br`, `gzip`, +`identity` (no compression). If the client does not specify a preference, the +only constraint on the server is the availability of the compression algorithm +in the server environment. + ## HTTP/1.1 Response Compression HTTP/1.1 offers an elaborate way for clients to specify their preferred @@ -94,5 +130,11 @@ send `identity;q=0` or `*;q=0` somewhere in the end of the `Accept-Encoding` header. But that relies on the server implementing the full `Accept-Encoding` handling logic. +## Arrow IPC Compression + +TODO: this section will be added once examples are expanded to include Arrow IPC compression. + [^1]: [Fielding, R. et al. (1999). HTTP/1.1. RFC 2616, Section 14.3 Accept-Encoding.](https://www.rfc-editor.org/rfc/rfc2616#section-14.3) [^2]: [MDN Web Docs: Accept-Encoding](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Encoding#browser_compatibility) + +[ipc]: https://arrow.apache.org/docs/format/Columnar.html#serialization-and-interprocess-communication-ipc From 36924f7c37d08817bcf0c2fcede7ec4120241dd0 Mon Sep 17 00:00:00 2001 From: Felipe Oliveira Carvalho Date: Wed, 11 Sep 2024 15:52:23 -0300 Subject: [PATCH 16/35] remove test code --- http/get_compressed/python/server/server.py | 40 --------------------- 1 file changed, 40 deletions(-) diff --git a/http/get_compressed/python/server/server.py b/http/get_compressed/python/server/server.py index 4a88c7d..05133e5 100644 --- a/http/get_compressed/python/server/server.py +++ b/http/get_compressed/python/server/server.py @@ -119,25 +119,6 @@ def parse_accept_encoding(s): return accepted -def check_parser(s, expected): - try: - parsed = parse_accept_encoding(s) - # print("parsed:", parsed, "\nexpected:", expected) - assert parsed == expected - except ValueError as e: - print(e) - - -check_parser("", []) -expected = [("gzip", None), ("zstd", 1.0), ("*", None)] -check_parser("gzip, zstd;q=1.0, *", expected) -check_parser("gzip , zstd; q= 1.0 , *", expected) -expected = [("gzip", None), ("zstd", 1.0), ("*", 0.0)] -check_parser("gzip , zstd; q= 1.0 \t \r\n ,*;q =0", expected) -expected = [("zstd", 1.0), ("gzip", 0.5), ("br", 0.8), ("identity", 0.0)] -check_parser("zstd;q=1.0, gzip;q=0.5, br;q=0.8, identity;q=0", expected) - - def pick_coding(accept_encoding_header, available): """ Pick the content-coding that the server should use to compress the response. 
@@ -191,27 +172,6 @@ def value_or(value, default): return None -def check_picker(header, expected): - available = ["zstd", "gzip"] # no "br" an no "deflate" - chosen = pick_coding(header, available) - # print("Accept-Encoding:", header, "\nexpected:", expected, "\t\tchosen:", chosen) - assert chosen == expected - - -check_picker("gzip, zstd;q=1.0, *", "zstd") -check_picker("gzip , zstd; q= 1.0 , *", "zstd") -check_picker("gzip , zstd; q= 1.0 \t \r\n ,*;q =0", "zstd") -check_picker("zstd;q=1.0, gzip;q=0.5, br;q=0.8, identity;q=0", "zstd") - -check_picker("compress, gzip", "gzip") -check_picker("", "identity") -check_picker("*", "zstd") -check_picker("compress;q=0.5, gzip;q=1.0", "gzip") -check_picker("gzip;q=1.0, identity; q=0.5, *;q=0", "gzip") -check_picker("br, *;q=0", None) -check_picker("br", "identity") - - class LateClosingBytesIO(io.BytesIO): """ BytesIO that does not close on close(). From 4319edeb76fa61357ffb65a30b790b5ba8964381 Mon Sep 17 00:00:00 2001 From: Felipe Oliveira Carvalho Date: Wed, 11 Sep 2024 21:15:13 -0300 Subject: [PATCH 17/35] add an option to use dictionary-encoded string column --- http/get_compressed/python/client/client.py | 4 ++- http/get_compressed/python/server/server.py | 27 +++++++++++++++------ 2 files changed, 23 insertions(+), 8 deletions(-) diff --git a/http/get_compressed/python/client/client.py b/http/get_compressed/python/client/client.py index 1f1ceeb..e7581ff 100644 --- a/http/get_compressed/python/client/client.py +++ b/http/get_compressed/python/client/client.py @@ -63,18 +63,20 @@ def request_and_process(uri, coding): except StopIteration: pass processing_time = time.time() - start_time + reader_stats = reader.stats print( f"{log_prefix} Schema received in {time_to_schema:.3f} seconds." f" schema=({', '.join(schema.names)})." ) print( - f"{log_prefix} First batch of {len(batches)} received and processed in" + f"{log_prefix} First batch received and processed in" f" {time_to_first_batch:.3f} seconds" ) print( f"{log_prefix} Processing of all batches completed in" f" {processing_time:.3f} seconds." ) + print(f"{log_prefix}", reader_stats) return batches diff --git a/http/get_compressed/python/server/server.py b/http/get_compressed/python/server/server.py index 05133e5..be8d288 100644 --- a/http/get_compressed/python/server/server.py +++ b/http/get_compressed/python/server/server.py @@ -19,6 +19,7 @@ from http.server import BaseHTTPRequestHandler, HTTPServer import io import pyarrow as pa +import pyarrow.compute as pc import re import socketserver import string @@ -27,6 +28,9 @@ # send it all at once. If False, stream the response into the socket. 
BUFFER_ENTIRE_RESPONSE = False +# use dictionary encoding for the ticker column +USE_DICTIONARY_ENCODING = True + def random_string(alphabet, length): return "".join(choice(alphabet) for _ in range(length)) @@ -47,9 +51,12 @@ def example_tickers(num_tickers): return tickers +the_ticker_type = ( + pa.dictionary(pa.int32(), pa.utf8()) if USE_DICTIONARY_ENCODING else pa.utf8() +) the_schema = pa.schema( [ - ("ticker", pa.utf8()), + ("ticker", the_ticker_type), ("price", pa.int64()), ("volume", pa.int64()), ] @@ -57,13 +64,19 @@ def example_tickers(num_tickers): def example_batch(tickers, length): - data = {"ticker": [], "price": [], "volume": []} + ticker_indices = [] + price = [] + volume = [] for _ in range(length): - data["ticker"].append(choice(tickers)) - data["price"].append(randint(1, 1000) * 100) - data["volume"].append(randint(1, 10000)) - - return pa.RecordBatch.from_pydict(data, the_schema) + ticker_indices.append(randint(0, len(tickers) - 1)) + price.append(randint(1, 1000) * 100) + volume.append(randint(1, 10000)) + ticker = ( + pa.DictionaryArray.from_arrays(ticker_indices, tickers) + if USE_DICTIONARY_ENCODING + else pc.take(tickers, ticker_indices, boundscheck=False) + ) + return pa.RecordBatch.from_arrays([ticker, price, volume], schema=the_schema) def example_batches(tickers): From 2d992ada63fba7709f9b4ecec21496e4d101d0b7 Mon Sep 17 00:00:00 2001 From: Felipe Oliveira Carvalho Date: Wed, 11 Sep 2024 22:41:58 -0300 Subject: [PATCH 18/35] readme: add note about IPC compression codec negotiation --- http/get_compressed/README.md | 29 ++++++++++++++++++++++++++--- 1 file changed, 26 insertions(+), 3 deletions(-) diff --git a/http/get_compressed/README.md b/http/get_compressed/README.md index c1f6417..5b5aa90 100644 --- a/http/get_compressed/README.md +++ b/http/get_compressed/README.md @@ -57,6 +57,30 @@ compression format in this order of preference: `zstd`, `br`, `gzip`, only constraint on the server is the availability of the compression algorithm in the server environment. +## Arrow IPC Compression + +When IPC buffer compression is preferred and servers can't assume all clients +support it[^3], clients may be asked to explicitly list the supported compression +algorithms in the request headers. The `Accept` header can be used for this +since `Accept-Encoding` (and `Content-Encoding`) is used to control compression +of the entire HTTP response stream and instruct HTTP clients (like browsers) to +decompress the response before giving data to the application or saving the +data. + + Accept: application/vnd.apache.arrow.ipc; codecs="zstd, lz4" + +There is similar to clients requesting video streams by specifying the +container format and the codecs they support +(e.g. `Accept: video/webm; codecs="vp8, vorbis"`). + +The server is allowed to choose any of the listed codecs, or not compress the +IPC buffers at all. Uncompressed IPC buffers should always be acceptable by +clients. + +If a server adopts this approach and a client does not specify any codecs in +the `Accept` header, the server can fall back to checking `Accept-Encoding` +header to pick a compression algorithm for the entire HTTP response stream. + ## HTTP/1.1 Response Compression HTTP/1.1 offers an elaborate way for clients to specify their preferred @@ -130,11 +154,10 @@ send `identity;q=0` or `*;q=0` somewhere in the end of the `Accept-Encoding` header. But that relies on the server implementing the full `Accept-Encoding` handling logic. 
-## Arrow IPC Compression - -TODO: this section will be added once examples are expanded to include Arrow IPC compression. [^1]: [Fielding, R. et al. (1999). HTTP/1.1. RFC 2616, Section 14.3 Accept-Encoding.](https://www.rfc-editor.org/rfc/rfc2616#section-14.3) [^2]: [MDN Web Docs: Accept-Encoding](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Encoding#browser_compatibility) +[^3]: Web applications using the JavaScript Arrow implementation don't have + access to the compression APIs to decompress `zstd` and `lz4` IPC buffers. [ipc]: https://arrow.apache.org/docs/format/Columnar.html#serialization-and-interprocess-communication-ipc From 6725886b422ec00a67bb3a5d26efa8f8bedb0e6a Mon Sep 17 00:00:00 2001 From: Felipe Oliveira Carvalho Date: Wed, 11 Sep 2024 22:47:29 -0300 Subject: [PATCH 19/35] remove BUFFER_ENTIRE_RESPONSE option --- http/get_compressed/python/server/server.py | 57 ++------------------- 1 file changed, 3 insertions(+), 54 deletions(-) diff --git a/http/get_compressed/python/server/server.py b/http/get_compressed/python/server/server.py index be8d288..99850a9 100644 --- a/http/get_compressed/python/server/server.py +++ b/http/get_compressed/python/server/server.py @@ -24,10 +24,6 @@ import socketserver import string -# put entire response in a buffer, set the Content-Length header, and -# send it all at once. If False, stream the response into the socket. -BUFFER_ENTIRE_RESPONSE = False - # use dictionary encoding for the ticker column USE_DICTIONARY_ENCODING = True @@ -225,48 +221,6 @@ def close(self): pass -def stream_all(schema, source, coding, sink): - if coding == "identity": - # source: RecordBatchReader - # |> writer: RecordBatchStreamWriter - # |> sink: file-like - with pa.ipc.new_stream(sink, schema) as writer: - for batch in source: - writer.write_batch(batch) - writer.close() # write EOS marker and flush - else: - # IANA nomenclature for Brotli is "br" and not "brotli" - compression = "brotli" if coding == "br" else coding - with pa.CompressedOutputStream(sink, compression) as compressor: - # source: RecordBatchReader - # |> writer: RecordBatchStreamWriter - # |> compressor: CompressedOutputStream - # |> sink: file-like - with pa.ipc.new_stream(compressor, schema) as writer: - for batch in source: - writer.write_batch(batch) - writer.close() # write EOS marker and flush - # ensure buffered data is compressed and written to the sink - compressor.close() - - -def generate_response_buffer(schema, source, coding): - """ - Put all the record batches from the source into a single buffer. - - If `coding` is "identity", the source is written to the buffer as is. - Otherwise, the source is compressed using the given coding. 
- """ - # the sink holds the buffer and we give a view of it to the caller - with LateClosingBytesIO() as sink: - stream_all(schema, source, coding, sink) - # zero-copy buffer access using getbuffer() keeping the sink alive - # after the yield statement until this function is done executing - with sink.getbuffer() as buffer: - yield buffer - sink.close_now() - - def generate_chunk_buffers(schema, source, coding): # the sink holds the buffer and we give a view of it to the caller with LateClosingBytesIO() as sink: @@ -362,7 +316,7 @@ def do_GET(self): chunked = False else: self.protocol_version = "HTTP/1.1" - chunked = not BUFFER_ENTIRE_RESPONSE + chunked = True coding = None parsing_error = None @@ -411,16 +365,11 @@ def do_GET(self): self.wfile.write(buffer) self.wfile.write("\r\n".encode("utf-8")) self.wfile.write("0\r\n\r\n".encode("utf-8")) - elif BUFFER_ENTIRE_RESPONSE: - for buffer in generate_response_buffer(the_schema, source, coding): - self.send_header("Content-Length", str(len(buffer))) - self.end_headers() - self.wfile.write(buffer) - break else: self.end_headers() sink = SocketWriterSink(self.wfile) - stream_all(the_schema, source, coding, sink) + for buffer in generate_chunk_buffers(the_schema, source, coding): + sink.write(buffer) print("Generating example data...") From 152157b03fb3c5475bad45bc2e3fd1cc78d8b748 Mon Sep 17 00:00:00 2001 From: Felipe Oliveira Carvalho Date: Thu, 12 Sep 2024 00:02:55 -0300 Subject: [PATCH 20/35] write a parser based on a tokenizer --- http/get_compressed/python/server/server.py | 71 +++++++++++++++++---- 1 file changed, 57 insertions(+), 14 deletions(-) diff --git a/http/get_compressed/python/server/server.py b/http/get_compressed/python/server/server.py index 99850a9..e8354e2 100644 --- a/http/get_compressed/python/server/server.py +++ b/http/get_compressed/python/server/server.py @@ -97,12 +97,33 @@ def example_batches(tickers): # [L]inear [W]hite [S]pace pattern (HTTP/1.1 - RFC 2616) LWS_RE = "(?:[ \\t]|\\r\\n[ \\t]+)*" -CONTENT_CODING_PATTERN = re.compile( - r"%s(?:([A-Za-z]+|\*)%s(?:;%sq%s=(%s[01]+(?:\.\d{1,3})?))?%s)?" - % (LWS_RE, LWS_RE, LWS_RE, LWS_RE, LWS_RE, LWS_RE) +# tokenizer pattern to support the Accept-Encoding header parser +TOKENS_PATTERN = re.compile( + r"(?P[A-Za-z][A-Za-z0-9]*|\*)" # a name or a wildcard token + r"|(?P,)" + r"|(?P;)" + r"|(?P=)" + r"|(?P\d+(\.\d{1,3})?)" + f"|(?P{LWS_RE})" + r"|(?P.+)" ) +def unexpected(header_name, label, value): + msg = f"Malformed {header_name} header: unexpected {label} at {value!r}" + return ValueError(msg) + + +def tokenize(header_name, s): + for mo in re.finditer(TOKENS_PATTERN, s): + kind = mo.lastgroup + if kind == "SKIP": + continue + elif kind == "MISMATCH": + raise unexpected(header_name, "character", mo.group()) + yield [kind, mo.group()] + + def parse_accept_encoding(s): """ Parse the Accept-Encoding request header value. @@ -113,18 +134,40 @@ def parse_accept_encoding(s): The list of lowercase codings (or "*") and their qvalues in the order they appear in the header. The qvalue is None if not specified. 
""" - pieces = s.split(",") + AE = "Accept-Encoding" + tokens = tokenize(AE, s) + + def expect(expected_kind): + kind, value = next(tokens) + if kind != expected_kind: + raise unexpected(AE, "token", value) + return value + accepted = [] - for piece in pieces: - m = re.fullmatch(CONTENT_CODING_PATTERN, piece) - if m is None: - raise ValueError(f"Malformed Accept-Encoding header: {s!r}") - if m.group(1) is None: # whitespace is skipped - continue - coding = m.group(1).lower() - qvalue = m.group(2) - pair = coding, float(qvalue) if qvalue is not None else None - accepted.append(pair) + while True: + try: + coding = None + qvalue = None + coding = expect("ID").lower() + kind, value = next(tokens) + if kind == "COMMA": + accepted.append((coding, qvalue)) + continue + if kind == "SEMI": + value = expect("ID") + if value != "q": + raise unexpected(AE, "token", value) + expect("EQ") + qvalue = float(expect("NUM")) + expect("COMMA") + accepted.append((coding, qvalue)) + continue + raise unexpected(AE, "token", value) + except StopIteration: + break + # this parser ignores any unfinished ;q=NUM sequence or trailing commas + if coding is not None: + accepted.append((coding, qvalue)) return accepted From 427a8b744efee816663da84bfdffe0e6c061a183 Mon Sep 17 00:00:00 2001 From: Felipe Oliveira Carvalho Date: Thu, 12 Sep 2024 17:54:35 -0300 Subject: [PATCH 21/35] make parser generic to Accept and Accept-Encoding --- http/get_compressed/python/server/server.py | 127 +++++++++++--------- 1 file changed, 70 insertions(+), 57 deletions(-) diff --git a/http/get_compressed/python/server/server.py b/http/get_compressed/python/server/server.py index e8354e2..19b5061 100644 --- a/http/get_compressed/python/server/server.py +++ b/http/get_compressed/python/server/server.py @@ -95,79 +95,86 @@ def example_batches(tickers): # end of example data generation +# what the HTTP spec calls a token (any character except CTLs or separators) +TOKEN_RE = r"(?:[A-Za-z0-9!#$%&'*+./^_`|~-]+)" # [L]inear [W]hite [S]pace pattern (HTTP/1.1 - RFC 2616) -LWS_RE = "(?:[ \\t]|\\r\\n[ \\t]+)*" -# tokenizer pattern to support the Accept-Encoding header parser -TOKENS_PATTERN = re.compile( - r"(?P[A-Za-z][A-Za-z0-9]*|\*)" # a name or a wildcard token +LWS_RE = r"(?:[ \t]|\r\n[ \t]+)*" +TOKENIZER_PAT = re.compile( + f"(?P{TOKEN_RE})" + r'|(?P"([^"\\]|\\.)*")' # a quoted string (escaped pairs allowed) r"|(?P,)" r"|(?P;)" r"|(?P=)" - r"|(?P\d+(\.\d{1,3})?)" - f"|(?P{LWS_RE})" - r"|(?P.+)" + f"|(?P{LWS_RE})" # LWS is skipped + r"|(?P.+)", + flags=re.ASCII, # HTTP headers are encoded in ASCII ) -def unexpected(header_name, label, value): - msg = f"Malformed {header_name} header: unexpected {label} at {value!r}" - return ValueError(msg) - - -def tokenize(header_name, s): - for mo in re.finditer(TOKENS_PATTERN, s): - kind = mo.lastgroup - if kind == "SKIP": - continue - elif kind == "MISMATCH": - raise unexpected(header_name, "character", mo.group()) - yield [kind, mo.group()] - - -def parse_accept_encoding(s): +def parse_header_value(header_name, header_value): """ - Parse the Accept-Encoding request header value. + Parse the Accept or Accept-Encoding request header values. Returns ------- - list of (str, float|None) - The list of lowercase codings (or "*") and their qvalues in the order - they appear in the header. The qvalue is None if not specified. + list of (str, dict) + The list of lowercase tokens and their parameters in the order they + appear in the header. 
The parameters are stored in a dictionary where + the keys are the parameter names and the values are the parameter + values. If a parameter is not followed by an equal sign and a value, + the value is None. """ - AE = "Accept-Encoding" - tokens = tokenize(AE, s) + + def unexpected(label, value): + msg = f"Malformed {header_name} header: unexpected {label} at {value!r}" + return ValueError(msg) + + def tokenize(): + for mo in re.finditer(TOKENIZER_PAT, header_value): + kind = mo.lastgroup + if kind == "SKIP": + continue + elif kind == "MISMATCH": + raise unexpected("character", mo.group()) + yield (kind, mo.group()) + + tokens = tokenize() def expect(expected_kind): - kind, value = next(tokens) + kind, text = next(tokens) if kind != expected_kind: - raise unexpected(AE, "token", value) - return value + raise unexpected("token", text) + return text accepted = [] while True: try: - coding = None - qvalue = None - coding = expect("ID").lower() - kind, value = next(tokens) - if kind == "COMMA": - accepted.append((coding, qvalue)) - continue - if kind == "SEMI": - value = expect("ID") - if value != "q": - raise unexpected(AE, "token", value) - expect("EQ") - qvalue = float(expect("NUM")) - expect("COMMA") - accepted.append((coding, qvalue)) - continue - raise unexpected(AE, "token", value) + name, params = None, {} + name = expect("TOK").lower() + kind, text = next(tokens) + while True: + if kind == "COMMA": + accepted.append((name, params)) + break + if kind == "SEMI": + ident = expect("TOK") + params[ident] = None # init param to None + kind, text = next(tokens) + if kind != "EQ": + continue + kind, text = next(tokens) + if kind in ["TOK", "QUOTED"]: + if kind == "QUOTED": + text = text[1:-1] # remove the quotes + params[ident] = text # set param to value + kind, text = next(tokens) + continue + raise unexpected("token", text) except StopIteration: break - # this parser ignores any unfinished ;q=NUM sequence or trailing commas - if coding is not None: - accepted.append((coding, qvalue)) + if name is not None: + # any unfinished ;param=value sequence or trailing separators are ignored + accepted.append((name, params)) return accepted @@ -194,16 +201,22 @@ def pick_coding(accept_encoding_header, available): and doesn't accept "identity" (uncompressed) either. In this case, a "406 Not Acceptable" response should be sent. 
""" - accepted = parse_accept_encoding(accept_encoding_header) + accepted = parse_header_value("Accept-Encoding", accept_encoding_header) - def value_or(value, default): - return default if value is None else value + def qvalue_or(params, default): + qvalue = params.get("q") + if qvalue is not None: + try: + return float(qvalue) + except ValueError: + raise ValueError(f"Invalid qvalue in Accept-Encoding header: {qvalue}") + return default if "identity" not in available: available = available + ["identity"] state = {} - for coding, qvalue in accepted: - qvalue = value_or(qvalue, 0.0001 if coding == "identity" else 1.0) + for coding, params in accepted: + qvalue = qvalue_or(params, 0.0001 if coding == "identity" else 1.0) if coding == "*": for coding in available: if coding not in state: From 0df44adccecfbccc1370fdece95e932cd62b49a9 Mon Sep 17 00:00:00 2001 From: Felipe Oliveira Carvalho Date: Thu, 12 Sep 2024 19:18:16 -0300 Subject: [PATCH 22/35] support IPC buffer compression based on Accept header --- .../python/client/make_requests.sh | 26 ++- http/get_compressed/python/server/server.py | 182 ++++++++++++++---- 2 files changed, 164 insertions(+), 44 deletions(-) diff --git a/http/get_compressed/python/client/make_requests.sh b/http/get_compressed/python/client/make_requests.sh index c8a44d0..1706953 100755 --- a/http/get_compressed/python/client/make_requests.sh +++ b/http/get_compressed/python/client/make_requests.sh @@ -19,22 +19,28 @@ CURL="curl --verbose" URI="http://localhost:8008" -OUT=output.arrows -OUT2=output2.arrows +OUT_HTTP1=out.arrows +OUT_CHUNKED=out_from_chunked.arrows # HTTP/1.0 means that response is not chunked and not compressed... -$CURL --http1.0 -o $OUT $URI +$CURL --http1.0 -o $OUT_HTTP1 $URI # ...but it may be compressed with an explicitly set Accept-Encoding # header -$CURL --http1.0 -H "Accept-Encoding: gzip, *;q=0" -o $OUT.gz $URI -$CURL --http1.0 -H "Accept-Encoding: zstd, *;q=0" -o $OUT.zstd $URI -$CURL --http1.0 -H "Accept-Encoding: br, *;q=0" -o $OUT.brotli $URI +$CURL --http1.0 -H "Accept-Encoding: gzip, *;q=0" -o $OUT_HTTP1.gz $URI +$CURL --http1.0 -H "Accept-Encoding: zstd, *;q=0" -o $OUT_HTTP1.zstd $URI +$CURL --http1.0 -H "Accept-Encoding: br, *;q=0" -o $OUT_HTTP1.br $URI +# ...or with IPC buffer compression if the Accept header specifies codecs. +$CURL --http1.0 -H "Accept: application/vnd.apache.arrow.stream; codecs=\"zstd, lz4\"" -o $OUT_HTTP1+zstd $URI +$CURL --http1.0 -H "Accept: application/vnd.apache.arrow.stream; codecs=lz4" -o $OUT_HTTP1+lz4 $URI # HTTP/1.1 means compression is on by default... # ...but it can be refused with the Accept-Encoding: identity header. -$CURL -H "Accept-Encoding: identity" -o $OUT2 $URI +$CURL -H "Accept-Encoding: identity" -o $OUT_CHUNKED $URI # ...with gzip if no Accept-Encoding header is set. -$CURL -o $OUT2.gz $URI +$CURL -o $OUT_CHUNKED.gz $URI # ...or with the compression algorithm specified in the Accept-Encoding. -$CURL -H "Accept-Encoding: zstd, *;q=0" -o $OUT2.zstd $URI -$CURL -H "Accept-Encoding: br, *;q=0" -o $OUT2.br $URI +$CURL -H "Accept-Encoding: zstd, *;q=0" -o $OUT_CHUNKED.zstd $URI +$CURL -H "Accept-Encoding: br, *;q=0" -o $OUT_CHUNKED.br $URI +# ...or with IPC buffer compression if the Accept header specifies codecs. 
+$CURL -H "Accept: application/vnd.apache.arrow.stream; codecs=\"zstd, lz4\"" -o $OUT_CHUNKED+zstd $URI +$CURL -H "Accept: application/vnd.apache.arrow.stream; codecs=lz4" -o $OUT_CHUNKED+lz4 $URI diff --git a/http/get_compressed/python/server/server.py b/http/get_compressed/python/server/server.py index 19b5061..57a62f1 100644 --- a/http/get_compressed/python/server/server.py +++ b/http/get_compressed/python/server/server.py @@ -178,9 +178,71 @@ def expect(expected_kind): return accepted +ARROW_STREAM_FORMAT = "application/vnd.apache.arrow.stream" + + +def pick_ipc_codec(accept_header, available, default): + """ + Pick the IPC stream codec according to the Accept header. + + This is used when deciding which codec to use for compression of IPC buffer + streams. This is a feature of the Arrow IPC stream format and is different + from the HTTP content-coding used to compress the entire HTTP response. + + This is how a client may specify the IPC buffer compression codecs it + accepts: + + Accept: application/vnd.apache.arrow.ipc; codecs="zstd, lz4" + + Parameters + ---------- + accept_header : str|None + The value of the Accept header from an HTTP request. + available : list of str + The codecs that the server can provide in the order preferred by the + server. Example: ["zstd", "lz4"]. + default : str|None + The codec to use if the client does not specify the ";codecs" parameter + in the Accept header. + + Returns + ------- + str|None + The codec that the server should use to compress the IPC buffer stream. + None if the client does not accept any of the available codecs + explicitly listed. ;codecs="" means no codecs are accepted. + If the client does not specify the codecs parameter, the default codec + is returned. + """ + did_specify_codecs = False + accepted_codecs = [] + if accept_header is not None: + accepted = parse_header_value("Accept", accept_header) + for media_range, params in accepted: + if ( + media_range == "*/*" + or media_range == "application/*" + or media_range == ARROW_STREAM_FORMAT + ): + did_specify_codecs = "codecs" in params + codecs_str = params.get("codecs") + if codecs_str is None: + continue + for codec in codecs_str.split(","): + accepted_codecs.append(codec.strip()) + + for codec in available: + if codec in accepted_codecs: + return codec + return None if did_specify_codecs else default + + def pick_coding(accept_encoding_header, available): """ - Pick the content-coding that the server should use to compress the response. + Pick the content-coding according to the Accept-Encoding header. + + This is used when using HTTP response compression instead of IPC buffer + compression. Parameters ---------- @@ -188,7 +250,7 @@ def pick_coding(accept_encoding_header, available): The value of the Accept-Encoding header from an HTTP request. available : list of str The content-codings that the server can provide in the order preferred - by the server. + by the server. Example: ["zstd", "br", "gzip"]. Returns ------- @@ -237,6 +299,45 @@ def qvalue_or(params, default): return None +def pick_compression(headers, available_ipc_codecs, available_codings, default): + """ + Pick the compression strategy based on the Accept and Accept-Encoding headers. + + Parameters + ---------- + headers : dict + The HTTP request headers. + available_ipc_codecs : list of str + The codecs that the server can provide for IPC buffer compression. + available_codings : list of str + The content-codings that the server can provide for HTTP response + compression. 
+    default : str
+        The default compression strategy to use if the client does not
+        explicitly choose.
+
+    Returns
+    -------
+    str|None
+        The compression strategy to use. It can be one of the following:
+        "identity": no compression at all.
+        "identity+zstd": No HTTP compression + IPC buffer compression with Zstd.
+        "identity+lz4": No HTTP compression + IPC buffer compression with LZ4.
+        "zstd", "br", "gzip", ...: HTTP compression without IPC buffer compression.
+        None means a "406 Not Acceptable" response should be sent.
+    """
+    accept = headers.get("Accept")
+    ipc_codec = pick_ipc_codec(accept, available_ipc_codecs, default=None)
+    if ipc_codec is None:
+        accept_encoding = headers.get("Accept-Encoding")
+        return (
+            default
+            if accept_encoding is None
+            else pick_coding(accept_encoding, available_codings)
+        )
+    return "identity+" + ipc_codec
+
+
 class LateClosingBytesIO(io.BytesIO):
     """
     BytesIO that does not close on close().
@@ -277,7 +378,7 @@ def close(self):
         pass
 
 
-def generate_chunk_buffers(schema, source, coding):
+def generate_chunk_buffers(schema, source, compression):
     # the sink holds the buffer and we give a view of it to the caller
     with LateClosingBytesIO() as sink:
         # keep buffering until we have at least MIN_BUFFER_SIZE bytes
@@ -285,11 +386,17 @@
         # to 1 means we yield as soon as the compression blocks are
         # formed and reach the sink buffer.
         MIN_BUFFER_SIZE = 64 * 1024
-        if coding == "identity":
+        if compression.startswith("identity"):
+            if compression == "identity+zstd":
+                options = pa.ipc.IpcWriteOptions(compression="zstd")
+            elif compression == "identity+lz4":
+                options = pa.ipc.IpcWriteOptions(compression="lz4")
+            else:
+                options = None
             # source: RecordBatchReader
             #   |> writer: RecordBatchStreamWriter
             #   |> sink: LateClosingBytesIO
-            writer = pa.ipc.new_stream(sink, schema)
+            writer = pa.ipc.new_stream(sink, schema, options=options)
             for batch in source:
                 writer.write_batch(batch)
                 if sink.tell() >= MIN_BUFFER_SIZE:
@@ -300,7 +407,7 @@
             writer.close()  # write EOS marker and flush
 
         else:
-            compression = "brotli" if coding == "br" else coding
+            compression = "brotli" if compression == "br" else compression
             with pa.CompressedOutputStream(sink, compression) as compressor:
                 # has the first buffer been yielded already?
                 sent_first = False
@@ -331,7 +438,10 @@
         sink.close_now()
 
 
-AVAILABLE_ENCODINGS = ["zstd", "br", "gzip"]
+AVAILABLE_IPC_CODECS = ["zstd", "lz4"]
+"""List of available codecs for Arrow IPC buffer compression."""
+
+AVAILABLE_CODINGS = ["zstd", "br", "gzip"]
 """
 List of available content-codings as used in HTTP.
@@ -354,15 +464,20 @@ class MyRequestHandler(BaseHTTPRequestHandler): def _resolve_batches(self): return pa.RecordBatchReader.from_batches(the_schema, all_batches) - def _send_not_acceptable(self, accept_encoding, parsing_error=None): + def _send_not_acceptable(self, parsing_error=None): self.send_response(406, "Not Acceptable") self.send_header("Content-Type", "text/plain") self.end_headers() if parsing_error: - message = f"Error parsing `Accept-Encoding` header: {parsing_error}\n" + message = f"Error parsing header: {parsing_error}\n" else: message = "None of the available codings are accepted by this client.\n" - message += f"`Accept-Encoding` header was {accept_encoding!r}.\n" + accept = self.headers.get("Accept") + if accept is not None: + message += f"`Accept` header was {accept!r}.\n" + accept_encoding = self.headers.get("Accept-Encoding") + if accept_encoding is not None: + message += f"`Accept-Encoding` header was {accept_encoding!r}.\n" self.wfile.write(bytes(message, "utf-8")) def do_GET(self): @@ -374,27 +489,26 @@ def do_GET(self): self.protocol_version = "HTTP/1.1" chunked = True - coding = None - parsing_error = None - accept_encoding = self.headers.get("Accept-Encoding") - if accept_encoding is None: - # if the Accept-Encoding header is not explicitly set, return the - # uncompressed data for HTTP/1.0 requests and compressed data for - # HTTP/1.1 requests with the safest compression format choice: "gzip". - coding = ( - "identity" - if self.request_version == "HTTP/1.0" - or ("gzip" not in AVAILABLE_ENCODINGS) - else "gzip" + # if client's intent cannot be derived from the headers, return + # uncompressed data for HTTP/1.0 requests and compressed data for + # HTTP/1.1 requests with the safest compression format choice: "gzip". + default_compression = ( + "identity" + if self.request_version == "HTTP/1.0" or ("gzip" not in AVAILABLE_CODINGS) + else "gzip" + ) + try: + compression = pick_compression( + self.headers, + AVAILABLE_IPC_CODECS, + AVAILABLE_CODINGS, + default_compression, ) - else: - try: - coding = pick_coding(accept_encoding, AVAILABLE_ENCODINGS) - except ValueError as e: - parsing_error = e - - if coding is None: - self._send_not_acceptable(accept_encoding, parsing_error) + if compression is None: + self._send_not_acceptable() + return + except ValueError as e: + self._send_not_acceptable(str(e)) return ### in a real application the data would be resolved from a database or @@ -411,12 +525,12 @@ def do_GET(self): # suggest a default filename in case this response is saved by the user self.send_header("Content-Disposition", r'attachment; filename="output.arrows"') - if coding != "identity": - self.send_header("Content-Encoding", coding) + if not compression.startswith("identity"): + self.send_header("Content-Encoding", compression) if chunked: self.send_header("Transfer-Encoding", "chunked") self.end_headers() - for buffer in generate_chunk_buffers(the_schema, source, coding): + for buffer in generate_chunk_buffers(the_schema, source, compression): self.wfile.write(f"{len(buffer):X}\r\n".encode("utf-8")) self.wfile.write(buffer) self.wfile.write("\r\n".encode("utf-8")) @@ -424,7 +538,7 @@ def do_GET(self): else: self.end_headers() sink = SocketWriterSink(self.wfile) - for buffer in generate_chunk_buffers(the_schema, source, coding): + for buffer in generate_chunk_buffers(the_schema, source, compression): sink.write(buffer) From 42195c00cba32efca4581bbc9d55e998e55e4005 Mon Sep 17 00:00:00 2001 From: Felipe Oliveira Carvalho Date: Thu, 12 Sep 2024 19:43:04 -0300 
Subject: [PATCH 23/35] return codec in header

---
 http/get_compressed/python/client/client.py | 2 +-
 http/get_compressed/python/server/server.py | 9 ++++++++-
 2 files changed, 9 insertions(+), 2 deletions(-)

diff --git a/http/get_compressed/python/client/client.py b/http/get_compressed/python/client/client.py
index e7581ff..fd213b5 100644
--- a/http/get_compressed/python/client/client.py
+++ b/http/get_compressed/python/client/client.py
@@ -35,7 +35,7 @@ def make_request(uri, coding):
     )
     response = urllib.request.urlopen(request)
     content_type = response.headers["Content-Type"]
-    if content_type != ARROW_STREAM_FORMAT:
+    if not content_type.startswith(ARROW_STREAM_FORMAT):
         raise ValueError(f"Expected {ARROW_STREAM_FORMAT}, got {content_type}")
     if coding == "identity":
         return response
diff --git a/http/get_compressed/python/server/server.py b/http/get_compressed/python/server/server.py
index 57a62f1..ddbb894 100644
--- a/http/get_compressed/python/server/server.py
+++ b/http/get_compressed/python/server/server.py
@@ -521,7 +521,14 @@ def do_GET(self):
         #     self.send_header('Access-Control-Allow-Origin', 'http://localhost:8008')
         #     self.send_header('Access-Control-Allow-Methods', 'GET')
         #     self.send_header('Access-Control-Allow-Headers', 'Content-Type')
-        self.send_header("Content-Type", "application/vnd.apache.arrow.stream")
+        self.send_header(
+            "Content-Type",
+            (
+                f"{ARROW_STREAM_FORMAT}; codec={compression[9:]}"
+                if compression.startswith("identity+")
+                else ARROW_STREAM_FORMAT
+            ),
+        )
 
         # suggest a default filename in case this response is saved by the user
         self.send_header("Content-Disposition", r'attachment; filename="output.arrows"')

From ad2d3f2470ae9fad87177d93a51de87328e2fb59 Mon Sep 17 00:00:00 2001
From: Felipe Oliveira Carvalho
Date: Thu, 12 Sep 2024 20:30:57 -0300
Subject: [PATCH 24/35] extend client.py cases

---
 http/get_compressed/python/client/client.py | 38 +++++++++++++--------
 1 file changed, 24 insertions(+), 14 deletions(-)

diff --git a/http/get_compressed/python/client/client.py b/http/get_compressed/python/client/client.py
index fd213b5..427d0f4 100644
--- a/http/get_compressed/python/client/client.py
+++ b/http/get_compressed/python/client/client.py
@@ -23,33 +23,39 @@
 ARROW_STREAM_FORMAT = "application/vnd.apache.arrow.stream"
 
 
-def make_request(uri, coding):
+def make_request(uri, compression):
+    coding = "identity" if compression.startswith("identity") else compression
     # urllib.request.urlopen() always sends an HTTP/1.1 request
     # with Accept-Encoding: identity, so we need to setup a request
-    # object to customize the request headers
-    request = urllib.request.Request(
-        uri,
-        headers={
-            "Accept-Encoding": f"{coding}, *;q=0",
-        },
-    )
+    # object with custom headers to request a specific compression
+    headers = {
+        "Accept-Encoding": f"{coding}, *;q=0",
+    }
+    if compression.startswith("identity+"):
+        # request IPC buffer compression instead of HTTP compression
+        ipc_codec = compression.split("+")[1]
+        headers["Accept"] = f'{ARROW_STREAM_FORMAT};codec="{ipc_codec}"'
+    request = urllib.request.Request(uri, headers=headers)
+
     response = urllib.request.urlopen(request)
     content_type = response.headers["Content-Type"]
     if not content_type.startswith(ARROW_STREAM_FORMAT):
         raise ValueError(f"Expected {ARROW_STREAM_FORMAT}, got {content_type}")
-    if coding == "identity":
+    if compression.startswith("identity"):
         return response
     # IANA nomenclature for Brotli is "br" and not "brotli"
-    compression = "brotli" if coding == "br" else coding
+    compression = "brotli" if compression == "br" else compression
     return pa.CompressedInputStream(response, compression)
 
 
-def request_and_process(uri, coding):
+def request_and_process(uri, compression):
     batches = []
-    log_prefix = f"{'[' + coding + ']':>10}:"
-    print(f"{log_prefix} Requesting data from {uri} with `{coding}` encoding.")
+    log_prefix = f"{'[' + compression + ']':>10}:"
+    print(
+        f"{log_prefix} Requesting data from {uri} with `{compression}` compression strategy."
+    )
     start_time = time.time()
-    response = make_request(uri, coding)
+    response = make_request(uri, compression)
     with pa.ipc.open_stream(response) as reader:
         schema = reader.schema
         time_to_schema = time.time() - start_time
@@ -80,7 +86,11 @@ def request_and_process(uri, coding):
     return batches
 
 
+# HTTP compression
 request_and_process(URI, "identity")
 request_and_process(URI, "zstd")
 request_and_process(URI, "br")
 request_and_process(URI, "gzip")
+# using IPC buffer compression instead of HTTP compression
+request_and_process(URI, "identity+zstd")
+request_and_process(URI, "identity+lz4")

From 73897d454623a209f32f8d2bdcbf34a669c020af Mon Sep 17 00:00:00 2001
From: Felipe Oliveira Carvalho
Date: Thu, 19 Sep 2024 21:01:55 -0300
Subject: [PATCH 25/35] Update paragraph about double-compression

Co-authored-by: Ian Cook
---
 http/get_compressed/README.md | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/http/get_compressed/README.md b/http/get_compressed/README.md
index 5b5aa90..5aaaac8 100644
--- a/http/get_compressed/README.md
+++ b/http/get_compressed/README.md
@@ -30,11 +30,12 @@ there are at least two approaches to this problem:
 2. Uncompressed HTTP responses carrying Arrow IPC streams with compressed
    array buffers.
 
-Applying IPC buffer and HTTP compression at the same is not recommended. The
-extra CPU overhead of decompressing the data twice is not worth any possible
-gains that double compression might bring. If compression ratios are
-unambiguously more important than reducing CPU overhead, then a different
-compression algorithm that optimizes for that can be chosen.
+Applying both IPC buffer and HTTP compression to the same data is not
+recommended. The extra CPU overhead of decompressing the data twice is
+not worth any possible gains that double compression might bring. If
+compression ratios are unambiguously more important than reducing CPU
+overhead, then a different compression algorithm that optimizes for that can
+be chosen.
 
 This table shows the support for different compression algorithms in HTTP and
 Arrow IPC:

From bff94ae703f6d5de6676a7da03f711af6108390e Mon Sep 17 00:00:00 2001
From: Felipe Oliveira Carvalho
Date: Thu, 19 Sep 2024 21:02:19 -0300
Subject: [PATCH 26/35] Fix typo in README

Co-authored-by: Ian Cook
---
 http/get_compressed/README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/http/get_compressed/README.md b/http/get_compressed/README.md
index 5aaaac8..2b79b1f 100644
--- a/http/get_compressed/README.md
+++ b/http/get_compressed/README.md
@@ -70,7 +70,7 @@ data.
 
     Accept: application/vnd.apache.arrow.ipc; codecs="zstd, lz4"
 
-There is similar to clients requesting video streams by specifying the
+This is similar to clients requesting video streams by specifying the
 container format and the codecs they support
 (e.g. `Accept: video/webm; codecs="vp8, vorbis"`).
From 3d60a54721bc1dc581ba6d5acc58d0d3bb926e8d Mon Sep 17 00:00:00 2001
From: Felipe Oliveira Carvalho
Date: Thu, 19 Sep 2024 21:42:35 -0300
Subject: [PATCH 27/35] Add note about meaning and interpretation of Content-Type

---
 http/get_compressed/README.md                | 15 +++++++++++++++
 http/get_compressed/python/client/client.py  |  2 +-
 http/get_compressed/python/server/server.py  |  2 +-
 .../python/client/urllib.request/client.py   |  2 +-
 4 files changed, 18 insertions(+), 3 deletions(-)

diff --git a/http/get_compressed/README.md b/http/get_compressed/README.md
index 2b79b1f..c2619e6 100644
--- a/http/get_compressed/README.md
+++ b/http/get_compressed/README.md
@@ -82,6 +82,21 @@ If a server adopts this approach and a client does not specify any codecs in
 the `Accept` header, the server can fall back to checking `Accept-Encoding`
 header to pick a compression algorithm for the entire HTTP response stream.
 
+To make debugging easier, servers may include the chosen compression codec(s)
+in the `Content-Type` header of the response (quotes are optional):
+
+    Content-Type: application/vnd.apache.arrow.ipc; codecs=zstd
+
+This is not necessary for correct decompression because the payload already
+contains information that tells the IPC reader how to decompress the buffers,
+but it can help developers understand what is going on.
+
+When programmatically checking if the `Content-Type` header contains a specific
+format, it is important to use a parser that can handle parameters or look
+only at the media type part of the header. This is not an exclusivity of the
+Arrow IPC format, but a general rule for all media types. For example,
+`application/json; charset=utf-8` should be match `application/json`.
+
 ## HTTP/1.1 Response Compression
 
 HTTP/1.1 offers an elaborate way for clients to specify their preferred
diff --git a/http/get_compressed/python/client/client.py b/http/get_compressed/python/client/client.py
index 427d0f4..d09aeb0 100644
--- a/http/get_compressed/python/client/client.py
+++ b/http/get_compressed/python/client/client.py
@@ -34,7 +34,7 @@ def make_request(uri, compression):
     if compression.startswith("identity+"):
         # request IPC buffer compression instead of HTTP compression
         ipc_codec = compression.split("+")[1]
-        headers["Accept"] = f'{ARROW_STREAM_FORMAT};codec="{ipc_codec}"'
+        headers["Accept"] = f'{ARROW_STREAM_FORMAT};codecs="{ipc_codec}"'
     request = urllib.request.Request(uri, headers=headers)
 
     response = urllib.request.urlopen(request)
diff --git a/http/get_compressed/python/server/server.py b/http/get_compressed/python/server/server.py
index ddbb894..f52a586 100644
--- a/http/get_compressed/python/server/server.py
+++ b/http/get_compressed/python/server/server.py
@@ -524,7 +524,14 @@ def do_GET(self):
         self.send_header(
             "Content-Type",
             (
-                f"{ARROW_STREAM_FORMAT}; codec={compression[9:]}"
+                f"{ARROW_STREAM_FORMAT}; codecs={compression[9:]}"
                 if compression.startswith("identity+")
                 else ARROW_STREAM_FORMAT
             ),
         )
diff --git a/http/get_simple/python/client/urllib.request/client.py b/http/get_simple/python/client/urllib.request/client.py
index 1d5d198..a2f24de 100644
--- a/http/get_simple/python/client/urllib.request/client.py
+++ b/http/get_simple/python/client/urllib.request/client.py
@@ -25,7 +25,7 @@
 response = urllib.request.urlopen('http://localhost:8008')
 
 content_type = response.headers['Content-Type']
-if content_type != ARROW_STREAM_FORMAT:
+if not content_type.startswith(ARROW_STREAM_FORMAT):
     raise ValueError(f"Expected {ARROW_STREAM_FORMAT}, got {content_type}")
 
 batches = []

From ffd07e1aa3f5ec161f0c2084f9914f6c17269ca9 Mon Sep 17 00:00:00 2001
From: Felipe Oliveira Carvalho
Date: Thu, 19 Sep 2024 22:16:47 -0300
Subject: [PATCH 28/35] fix typo

---
 http/get_compressed/README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/http/get_compressed/README.md b/http/get_compressed/README.md
index c2619e6..f44aede 100644
--- a/http/get_compressed/README.md
+++ b/http/get_compressed/README.md
@@ -95,7 +95,7 @@ When programmatically checking if the `Content-Type` header contains a specific
 format, it is important to use a parser that can handle parameters or look
 only at the media type part of the header. This is not an exclusivity of the
 Arrow IPC format, but a general rule for all media types. For example,
-`application/json; charset=utf-8` should be match `application/json`.
+`application/json; charset=utf-8` should match `application/json`.
 
 ## HTTP/1.1 Response Compression
 

From 4f4977610de55fd08a34f96f589d26c78e21415f Mon Sep 17 00:00:00 2001
From: Felipe Oliveira Carvalho
Date: Fri, 22 Nov 2024 18:03:44 -0300
Subject: [PATCH 29/35] Apply suggestions from code review

Co-authored-by: Ian Cook
Co-authored-by: Bryce Mecum
---
 http/get_compressed/README.md               | 32 ++++++++++++---------
 http/get_compressed/python/server/server.py |  2 +-
 2 files changed, 19 insertions(+), 15 deletions(-)

diff --git a/http/get_compressed/README.md b/http/get_compressed/README.md
index f44aede..a3fce47 100644
--- a/http/get_compressed/README.md
+++ b/http/get_compressed/README.md
@@ -40,13 +40,13 @@ be chosen.
 This table shows the support for different compression algorithms in HTTP and
 Arrow IPC:
 
-| Format             | HTTP Support    | IPC Support     |
-| ------------------ | --------------- | --------------- |
-| gzip (GZip)        | X               |                 |
-| deflate (DEFLATE)  | X               |                 |
-| br (Brotli)        | X[^2]           |                 |
-| zstd (Zstandard)   | X[^2]           | X               |
-| lz4 (LZ4)          |                 | X               |
+| Codec      | Identifier  | HTTP Support  | IPC Support  |
+|----------- | ----------- | ------------- | ------------ |
+| GZip       | `gzip`      | X             |              |
+| DEFLATE    | `deflate`   | X             |              |
+| Brotli     | `br`        | X[^2]         |              |
+| Zstandard  | `zstd`      | X[^2]         | X[^3]        |
+| LZ4        | `lz4`       |               | X[^3]        |
 
 Since not all Arrow IPC implementations support compression, HTTP compression
 based on accepted formats negotiated with the client is a great way to increase
@@ -61,14 +61,14 @@ in the server environment.
 ## Arrow IPC Compression
 
 When IPC buffer compression is preferred and servers can't assume all clients
-support it[^3], clients may be asked to explicitly list the supported compression
+support it[^4], clients may be asked to explicitly list the supported compression
 algorithms in the request headers. The `Accept` header can be used for this
 since `Accept-Encoding` (and `Content-Encoding`) is used to control compression
 of the entire HTTP response stream and instruct HTTP clients (like browsers) to
 decompress the response before giving data to the application or saving the
 data.
 
-    Accept: application/vnd.apache.arrow.ipc; codecs="zstd, lz4"
+    Accept: application/vnd.apache.arrow.stream; codecs="zstd, lz4"
 
 This is similar to clients requesting video streams by specifying the
 container format and the codecs they support
@@ -85,7 +85,7 @@ header to pick a compression algorithm for the entire HTTP response stream.
 To make debugging easier, servers may include the chosen compression codec(s)
 in the `Content-Type` header of the response (quotes are optional):
 
-    Content-Type: application/vnd.apache.arrow.ipc; codecs=zstd
+    Content-Type: application/vnd.apache.arrow.stream; codecs=zstd
 
 This is not necessary for correct decompression because the payload already
 contains information that tells the IPC reader how to decompress the buffers,
@@ -97,13 +97,15 @@ only at the media type part of the header. This is not an exclusivity of the
 Arrow IPC format, but a general rule for all media types. For example,
 `application/json; charset=utf-8` should match `application/json`.
 
+When considering use of IPC buffer compression, check the [IPC format section of the Arrow Implementation Status page][^5] to see whether the Arrow implementations you are targeting support it.
+
 ## HTTP/1.1 Response Compression
 
 HTTP/1.1 offers an elaborate way for clients to specify their preferred
 content encoding (read compression algorithm) using the `Accept-Encoding`
 header.[^1]
 
-At least the Python server (in `python/`) implements a fully compliant
+At least the Python server (in [`python/`](./python)) implements a fully compliant
 parser for the `Accept-Encoding` header. Application servers may choose
 to implement a simpler check of the `Accept-Encoding` header or assume
 that the client accepts the chosen compression scheme when talking
@@ -111,7 +113,7 @@ to that server.
 
 Here is an example of a header that a client may send and what it means:
 
-    Accept-Encoding: zstd;q=1.0, gzip;q=0.5, br;q=0.8, identity;q=0
+    Accept-Encoding: zstd;q=1.0, gzip;q=0.5, br;q=0.8, identity;q=0
 
 This header says that the client prefers that the server compress the
 response with `zstd`, but if that is not possible, then `brotli` and `gzip`
@@ -122,7 +124,7 @@ does not want the response to be uncompressed. This is communicated by
 To tell the server the client only accepts `zstd` responses and nothing
 else, not even uncompressed responses, the client would send:
 
-    Accept-Encoding: zstd, *;q=0
+    Accept-Encoding: zstd, *;q=0
 
 RFC 2616[^1] specifies the rules for how a server should interpret the
 `Accept-Encoding` header:
@@ -173,7 +175,9 @@ handling logic.
 
 [^1]: [Fielding, R. et al. (1999). HTTP/1.1. RFC 2616, Section 14.3 Accept-Encoding.](https://www.rfc-editor.org/rfc/rfc2616#section-14.3)
 [^2]: [MDN Web Docs: Accept-Encoding](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Encoding#browser_compatibility)
-[^3]: Web applications using the JavaScript Arrow implementation don't have
+[^3]: [Arrow Columnar Format: Compression](https://arrow.apache.org/docs/format/Columnar.html#compression)
+[^4]: Web applications using the JavaScript Arrow implementation don't have
     access to the compression APIs to decompress `zstd` and `lz4` IPC buffers.
+[^5]: [Arrow Implementation Status: IPC Format](https://arrow.apache.org/docs/status.html#ipc-format)
 
 [ipc]: https://arrow.apache.org/docs/format/Columnar.html#serialization-and-interprocess-communication-ipc
diff --git a/http/get_compressed/python/server/server.py b/http/get_compressed/python/server/server.py
index f52a586..c82c551 100644
--- a/http/get_compressed/python/server/server.py
+++ b/http/get_compressed/python/server/server.py
@@ -192,7 +192,7 @@ def pick_ipc_codec(accept_header, available, default):
     This is how a client may specify the IPC buffer compression codecs it
     accepts:
 
-        Accept: application/vnd.apache.arrow.ipc; codecs="zstd, lz4"
+        Accept: application/vnd.apache.arrow.stream; codecs="zstd, lz4"
 
     Parameters
     ----------

From 4be06cd4738219eeb5666859fe60bf6df6021c5b Mon Sep 17 00:00:00 2001
From: Felipe Oliveira Carvalho
Date: Fri, 22 Nov 2024 18:10:04 -0300
Subject: [PATCH 30/35] README.md: Break long lines

---
 http/get_compressed/README.md | 37 +++++++++++++++++++----------------
 1 file changed, 20 insertions(+), 17 deletions(-)

diff --git a/http/get_compressed/README.md b/http/get_compressed/README.md
index a3fce47..c8ea47d 100644
--- a/http/get_compressed/README.md
+++ b/http/get_compressed/README.md
@@ -19,7 +19,9 @@
 
 # HTTP GET Arrow Data: Compression Examples
 
-This directory contains examples of HTTP servers/clients that transmit/receive data in the Arrow IPC streaming format and use compression (in various ways) to reduce the size of the transmitted data.
+This directory contains examples of HTTP servers/clients that transmit/receive
+data in the Arrow IPC streaming format and use compression (in various ways) to
+reduce the size of the transmitted data.
 
 Since we re-use the [Arrow IPC format][ipc] for transferring Arrow data over
 HTTP and both Arrow IPC and HTTP standards support compression on their own,
@@ -97,7 +99,9 @@ only at the media type part of the header. This is not an exclusivity of the
 Arrow IPC format, but a general rule for all media types. For example,
 `application/json; charset=utf-8` should match `application/json`.
 
-When considering use of IPC buffer compression, check the [IPC format section of the Arrow Implementation Status page][^5] to see whether the Arrow implementations you are targeting support it.
+When considering use of IPC buffer compression, check the [IPC format section of
+the Arrow Implementation Status page][^5] to see whether the Arrow
+implementations you are targeting support it.
 
 ## HTTP/1.1 Response Compression
 
@@ -105,11 +109,11 @@ HTTP/1.1 offers an elaborate way for clients to specify their preferred
 content encoding (read compression algorithm) using the `Accept-Encoding`
 header.[^1]
 
-At least the Python server (in [`python/`](./python)) implements a fully compliant
-parser for the `Accept-Encoding` header. Application servers may choose
-to implement a simpler check of the `Accept-Encoding` header or assume
-that the client accepts the chosen compression scheme when talking
-to that server.
+At least the Python server (in [`python/`](./python)) implements a fully
+compliant parser for the `Accept-Encoding` header. Application servers may
+choose to implement a simpler check of the `Accept-Encoding` header or assume
+that the client accepts the chosen compression scheme when talking to that
+server.
 
 Here is an example of a header that a client may send and what it means:
 
@@ -161,16 +165,15 @@ must include a `Content-Encoding` header in the response.
     response MUST include a Content-Encoding entity-header (section
     14.11) that lists the non-identity content-coding(s) used.
 
-Since not all servers implement the full `Accept-Encoding` header parsing
-logic, clients tend to stick to simple header values like
-`Accept-Encoding: identity` when no compression is desired, and
-`Accept-Encoding: gzip, deflate, zstd, br` when the client supports different
-compression formats and is indifferent to which one the server chooses. Clients
-should expect uncompressed responses as well in theses cases. The only way to
-force a "406 Not Acceptable" response when no compression is available is to
-send `identity;q=0` or `*;q=0` somewhere in the end of the `Accept-Encoding`
-header. But that relies on the server implementing the full `Accept-Encoding`
-handling logic.
+Since not all servers implement the full `Accept-Encoding` header parsing logic,
+clients tend to stick to simple header values like `Accept-Encoding: identity`
+when no compression is desired, and `Accept-Encoding: gzip, deflate, zstd, br`
+when the client supports different compression formats and is indifferent to
+which one the server chooses. Clients should expect uncompressed responses as
+well in these cases. The only way to force a "406 Not Acceptable" response when
+no compression is available is to send `identity;q=0` or `*;q=0` somewhere near
+the end of the `Accept-Encoding` header. But that relies on the server
+implementing the full `Accept-Encoding` handling logic.
 
 
 [^1]: [Fielding, R. et al. (1999). HTTP/1.1. RFC 2616, Section 14.3 Accept-Encoding.](https://www.rfc-editor.org/rfc/rfc2616#section-14.3)

From ac6b45e5dce0cb61b77d7bbc0ef86411f1c20423 Mon Sep 17 00:00:00 2001
From: Felipe Oliveira Carvalho
Date: Fri, 22 Nov 2024 19:09:32 -0300
Subject: [PATCH 31/35] Move make_requests.sh to curl/client.sh

---
 .../{python/client/make_requests.sh => curl/client/client.sh} | 0
 1 file changed, 0 insertions(+), 0 deletions(-)
 rename http/get_compressed/{python/client/make_requests.sh => curl/client/client.sh} (100%)

diff --git a/http/get_compressed/python/client/make_requests.sh b/http/get_compressed/curl/client/client.sh
similarity index 100%
rename from http/get_compressed/python/client/make_requests.sh
rename to http/get_compressed/curl/client/client.sh

From c44f49e78ed4194685f720a601be1db371369356 Mon Sep 17 00:00:00 2001
From: Felipe Oliveira Carvalho
Date: Fri, 22 Nov 2024 19:10:10 -0300
Subject: [PATCH 32/35] Add README files to sub directories

---
 http/get_compressed/curl/client/README.md   | 80 +++++++++++++++++++++
 http/get_compressed/python/client/README.md | 27 +++++++
 http/get_compressed/python/server/README.md | 27 +++++++
 3 files changed, 134 insertions(+)
 create mode 100644 http/get_compressed/curl/client/README.md
 create mode 100644 http/get_compressed/python/client/README.md
 create mode 100644 http/get_compressed/python/server/README.md

diff --git a/http/get_compressed/curl/client/README.md b/http/get_compressed/curl/client/README.md
new file mode 100644
index 0000000..6694ce0
--- /dev/null
+++ b/http/get_compressed/curl/client/README.md
@@ -0,0 +1,80 @@
+
+
+# HTTP GET Arrow Data: Compressed Arrow Data Examples
+
+This directory contains a simple `curl` script that issues multiple HTTP GET
+requests to the server implemented in the parent directory, negotiating
+different compression algorithms for the Arrow IPC stream data, piping the output
+to different files with extensions that indicate the compression algorithm used.
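+
+One of these requests, expanded from the variables in `client.sh` (see the
+script for the full set), asks for Arrow IPC buffer compression through the
+`Accept` header and saves the stream to `out_from_chunked.arrows+zstd`:
+
+```bash
+curl --verbose \
+  -H "Accept: application/vnd.apache.arrow.stream; codecs=\"zstd, lz4\"" \
+  -o out_from_chunked.arrows+zstd "http://localhost:8008"
+```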
+ +To run this example, first start one of the server examples in the parent +directory, then run the `client.sh` script. + +You can check all the sizes with a simple command: + +```bash +$ du -sh out* | sort -gr +816M out.arrows +804M out_from_chunked.arrows +418M out_from_chunked.arrows+lz4 +405M out.arrows+lz4 +257M out.arrows.gz +256M out_from_chunked.arrows.gz +229M out_from_chunked.arrows+zstd +229M out.arrows+zstd +220M out.arrows.zstd +219M out_from_chunked.arrows.zstd + 39M out_from_chunked.arrows.br + 38M out.arrows.br +``` + +> [!WARNING] +> Better compression is not the only relevant metric as it might come with a +> trade-off in terms of CPU usage. The best compression algorithm for your use +> case will depend on your specific requirements. + +## Meaning of the file extensions + +Files produced by HTTP/1.0 requests are not chunked, they get buffered in memory +at the server before being sent to the client. If compressed, they end up +slightly smaller than the results of chunked responses, but the extra delay for +first byte is not worth it in most cases. + + - `out.arrows` (Uncompressed) + - `out.arrows.gz` (Gzip HTTP compression) + - `out.arrows.zstd` (Zstandard HTTP compression) + - `out.arrows.br` (Brotli HTTP compression) + + - `out.arrows+zstd` (Zstandard IPC compression) + - `out.arrows+lz4` (LZ4 IPC compression) + +HTTP/1.1 requests are returned by the server with `Transfer-Encoding: chunked` +to send the data in smaller chunks that are sent to the socket as soon as they +are ready. This is useful for large responses that take a long time to generate +at the cost of a small overhead caused by the independent compression of each +chunk. + + - `out_from_chunked.arrows` (Uncompressed) + - `out_from_chunked.arrows.gz` (Gzip HTTP compression) + - `out_from_chunked.arrows.zstd` (Zstandard HTTP compression) + - `out_from_chunked.arrows.br` (Brotli HTTP compression) + + - `out_from_chunked.arrows+lz4` (LZ4 IPC compression) + - `out_from_chunked.arrows+zstd` (Zstandard IPC compression) diff --git a/http/get_compressed/python/client/README.md b/http/get_compressed/python/client/README.md new file mode 100644 index 0000000..c37629a --- /dev/null +++ b/http/get_compressed/python/client/README.md @@ -0,0 +1,27 @@ + + +# HTTP GET Arrow Data: Compressed Arrow Data Examples + +This directory contains an example of an HTTP client implemented in Python using +the Compressed Arrow HTTP Server example from the parent directory. The client +makes multiple requests with different headers to negotiate the compression +algorithm used to transfer Arrow IPC streams. + + python client.py diff --git a/http/get_compressed/python/server/README.md b/http/get_compressed/python/server/README.md new file mode 100644 index 0000000..d896927 --- /dev/null +++ b/http/get_compressed/python/server/README.md @@ -0,0 +1,27 @@ + + + +# HTTP GET Arrow Data: Compressed Arrow Data Examples + +This directory contains an example of an HTTP server implemented in Python +able to serve Arrow IPC streams compressed with different algorithms negotiated +with the client via different standard HTTP headers. 
+ + python server.py From fb4d5dd84307357b20596df32da40499133b7578 Mon Sep 17 00:00:00 2001 From: Ian Cook Date: Wed, 27 Nov 2024 10:26:38 -0500 Subject: [PATCH 33/35] Improve python/server/README.md --- http/get_compressed/python/server/README.md | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/http/get_compressed/python/server/README.md b/http/get_compressed/python/server/README.md index d896927..cf4bed7 100644 --- a/http/get_compressed/python/server/README.md +++ b/http/get_compressed/python/server/README.md @@ -24,4 +24,9 @@ This directory contains an example of an HTTP server implemented in Python able to serve Arrow IPC streams compressed with different algorithms negotiated with the client via different standard HTTP headers. - python server.py +To run this example: + +```sh +pip install pyarrow +python server.py +``` From 382984b394e787ca994f40a4782506d441a38ae9 Mon Sep 17 00:00:00 2001 From: Ian Cook Date: Wed, 27 Nov 2024 10:26:53 -0500 Subject: [PATCH 34/35] Improve python/client/README.md --- http/get_compressed/python/client/README.md | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/http/get_compressed/python/client/README.md b/http/get_compressed/python/client/README.md index c37629a..903d5a3 100644 --- a/http/get_compressed/python/client/README.md +++ b/http/get_compressed/python/client/README.md @@ -19,9 +19,8 @@ # HTTP GET Arrow Data: Compressed Arrow Data Examples -This directory contains an example of an HTTP client implemented in Python using -the Compressed Arrow HTTP Server example from the parent directory. The client -makes multiple requests with different headers to negotiate the compression -algorithm used to transfer Arrow IPC streams. +This directory contains an HTTP client implemented in Python that issues multiple +requests to one of the server examples implemented in the parent directory, +negotiating different compression algorithms for the Arrow IPC stream data. python client.py From 0f20539fe00ae1e6f8e3848ab612773bcbcb2a05 Mon Sep 17 00:00:00 2001 From: Ian Cook Date: Wed, 27 Nov 2024 10:27:16 -0500 Subject: [PATCH 35/35] Improve python/client/README.md --- http/get_compressed/python/client/README.md | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/http/get_compressed/python/client/README.md b/http/get_compressed/python/client/README.md index 903d5a3..1285a74 100644 --- a/http/get_compressed/python/client/README.md +++ b/http/get_compressed/python/client/README.md @@ -23,4 +23,10 @@ This directory contains an HTTP client implemented in Python that issues multipl requests to one of the server examples implemented in the parent directory, negotiating different compression algorithms for the Arrow IPC stream data. - python client.py +To run this example, first start one of the compressed server examples in the +parent directory, then: + +```sh +pip install pyarrow +python client.py +```
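
A closing note on the `Content-Type` advice from patch 27: clients should
match only the media-type part of the header and tolerate parameters. A
minimal sketch of such a check using only the Python standard library (the
header value below is an illustrative example):

```python
from email.message import Message

# email.message.Message can parse a MIME-style header value:
# get_content_type() returns just the lowercased media type, and
# get_param() returns a single parameter with surrounding quotes removed.
msg = Message()
msg["Content-Type"] = 'application/vnd.apache.arrow.stream; codecs="zstd"'

assert msg.get_content_type() == "application/vnd.apache.arrow.stream"
assert msg.get_param("codecs") == "zstd"
```

This is stricter than the `startswith()` check used by the example clients,
which would also match a media type that merely begins with the Arrow stream
format string.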