From 932bdde4bc83339a68d403ca46b862da891de5a4 Mon Sep 17 00:00:00 2001 From: Alex Hasha Date: Thu, 19 Nov 2015 17:59:39 -0500 Subject: [PATCH 01/29] Preliminary commit for Avro backend. --- odo/backends/avro.py | 90 ++++++++++++++++++++++++++++++ odo/backends/tests/test_avro.py | 69 +++++++++++++++++++++++ odo/backends/tests/test_file.avro | Bin 0 -> 1216 bytes 3 files changed, 159 insertions(+) create mode 100644 odo/backends/avro.py create mode 100644 odo/backends/tests/test_avro.py create mode 100755 odo/backends/tests/test_file.avro diff --git a/odo/backends/avro.py b/odo/backends/avro.py new file mode 100644 index 000000000..87367bdd3 --- /dev/null +++ b/odo/backends/avro.py @@ -0,0 +1,90 @@ +from __future__ import absolute_import, division, print_function + +from avro import schema, datafile, io +import pandas as pd +from datashape import discover, dshape, var, Record, Map, date_, datetime_, \ + Option, null, string, int32, int64, float64, float32, boolean +from collections import Iterator +from ..append import append +from ..convert import convert +from ..resource import resource + +AVRO_TYPE_MAP = { + 'string': string, + 'int': int32, + 'long': int64, + 'null': null, + 'double': float64, + 'float': float32, + 'bool': boolean, +} + +TD_AVRO_TYPES = { + "BIGINT": "long", + "BYTEINT": "int", + "DECIMAL_SHORT": "int", + "DECIMAL_LONG": "long", + "DECIMAL": "double", + "FLOAT": "double", + "INT": "int", + "INTEGER": "int", + "SMALLINT": "int", + "CHAR": "string", + "DATE": "string", + "TIME": "string", + "TIMESTAMP": "string" +} + + +@resource.register('.+\.(avro)') +def resource_avro(uri, **kwargs): + + rec_reader = io.DatumReader() + df_reader = datafile.DataFileReader( + open(uri, 'r'), + rec_reader + ) + return df_reader + +def _get_schema(f): + return schema.parse(f.meta['avro.schema']) + +def discover_schema(sch): + + if isinstance(sch, schema.RecordSchema): + return var * Record([(f.name, discover_schema(f.type)) for f in sch.fields]) + elif isinstance(sch, schema.UnionSchema): + try: + types = [s.type for s in sch.schemas] + assert "null" in types + types.remove("null") + assert len(types) == 1 + return Option(AVRO_TYPE_MAP[types[0]]) + except AssertionError: + import pdb; pdb.set_trace() + return null + elif isinstance(sch, schema.PrimitiveSchema): + return AVRO_TYPE_MAP[sch.type] + elif isinstance(sch, schema.MapSchema): + return Map(string, discover_schema(sch.values)) + elif isinstance(sch, schema.ArraySchema): + raise Exception("ArraySchema TODO") + else: + raise Exception(str(type(sch))) + +@discover.register(datafile.DataFileReader) +def discover_avro(f, **kwargs): + return discover_schema(_get_schema(f)) + +@convert.register(pd.DataFrame, datafile.DataFileReader, cost=4.0) +def avro_to_DataFrame(avro, dshape=None, **kwargs): + df = pd.DataFrame([r for r in avro]) + names = [f.name.decode('utf-8') for f in _get_schema(avro).fields] + df = df[names] #Reorder names to match avro schema + #names = [col.decode('utf-8') for col in avro + #df = df[names] # Reorder names to match sasfile + return df + +@convert.register(Iterator, datafile.DataFileReader, cost=1.0) +def avro_to_iterator(s, **kwargs): + return s \ No newline at end of file diff --git a/odo/backends/tests/test_avro.py b/odo/backends/tests/test_avro.py new file mode 100644 index 000000000..1d6bbc159 --- /dev/null +++ b/odo/backends/tests/test_avro.py @@ -0,0 +1,69 @@ +from __future__ import absolute_import, division, print_function + +from avro import datafile, io, schema +import pandas as pd +from 
pandas.util.testing import assert_frame_equal +from odo.backends.avro import discover, avro_to_DataFrame, avro_to_iterator, resource + +import unittest +from odo.utils import tmpfile, into_path +from odo import append, convert, resource, dshape + +test_schema_str = """ +{ + "type" : "record", + "namespace" : "dataset", + "name" : "test_dataset", + "fields": [ + {"type": "int" , "name": "field_1"}, + {"type": "string", "name": "field_2"}, + {'default': None, 'name': u'field_3', 'type': [u'null', u'long']}, + { "name": "features", "type": { "type": "map", "values": "double"}} + ] +} +""" + +test_data = [ + {"field_1":1512357953,"field_2":"dgqbrudjnvvhdsa","field_3":-2529599232589628512,"features":{"glbumsyvgbmv":0.5262249719797424,"ivlmyhtbhpr":0.34864726886026076}}, + {"field_1":-422002496,"field_2":"ixjcpxaoqklbyp","field_3":2549258632438482521,"features":{"iqbxiagrparn":0.7367035457367471,"afjwfns":0.025058777723185655,"ndlrehie":0.4967780945917538,"di":0.810456042568542,"jgxyg":0.9625263303627106}}, + {"field_1":-811876665,"field_2":"efixmnaidxlva","field_3":None,"features":{"hdrvmxafp":0.872563073642209,"gair":0.518073477541009,"humgtonyoii":0.8214816239303898}}, + {"field_1":-761746797,"field_2":"toomqrltrbhechq","field_3":-3875987146454676496,"features":{"ksgwqlybs":0.3694600154809954,"ubtxnuhuqcqkvirkmvnbjsggvmbasnghdiwvfpwsihcfmgdcefadrqerqjhudteucjvjwhekgpjsytfrfjubqulsxmj":0.13827426007790045,"pxrjc":0.20131148613588346,"bmkalqykovaaqbc":0.8034756735507932,"xkkiigjtfgbnj":0.6750268550341367}}, + {"field_1":955048956,"field_2":"stogtdy","field_3":-3437921209509339504,"features":{"lndm":0.25025327732757696,"qeecpdxq":0.44506724761212746,"csosogkyuckanv":0.8739675025249061,"oyvnsshedr":0.8225465933655276}}, + {"field_1":-161240515,"field_2":"qcisooo","field_3":1311605767031982299,"features":{"gxias":0.11162484249218518,"pgdyatbp":0.5487679009288856,"itvxelmfnbrq":0.906496888778101,"kkififqgvs":0.863635294016924}}, + {"field_1":1956293589,"field_2":"rsidajtxnum","field_3":3246732014409650694,"features":{"":0.14456022508672162,"fknbdjfcnag":0.2742703631839102,"rtttccsdf":0.8117164405618132}}, + {"field_1":1554383058,"field_2":"xyahxnxc","field_3":None,"features":{"lvvw":0.8727205291174954,"pytquvfii":0.43930224145682706}}, + {"field_1":1955780624,"field_2":"wvbte","field_3":7012092663456604164,"features":{"uow":0.9091043815461999,"vbrqiknjrfli":0.43792647841064225,"djf":0.22671511625162166}}, + {"field_1":-1215933043,"field_2":"nsibpyqsuf","field_3":None,"features":{"":0.1543825213081953,"mndjtjl":0.5742890151730541,"gbhiam":0.3137956141490078,"cojkna":0.9846856885267534}}, +] + +ds = dshape("""var * { + field_1: int32, + field_2: string, + field_3: ?int64, + features: map[string, float64] + }""") + +test_path = into_path('backends', 'tests', 'test_file.avro') + +class TestAvro(unittest.TestCase): + + def setUp(self): + self.avrofile = resource(test_path) + + def test_resource_datafile(self): + self.assertIsInstance(resource(test_path), datafile.DataFileReader) + + def test_discover(self): + self.assertEquals(discover(self.avrofile), ds) + + def test_convert_avro_to_dataframe(self): + df = convert(pd.DataFrame, self.avrofile) + self.assertIsInstance(df, pd.DataFrame) + + names = ["field_1", "field_2", "field_3", "features"] + expected_output = pd.DataFrame(test_data, columns=names) + assert_frame_equal(df, expected_output) + + +if __name__=="__main__": + unittest.main() \ No newline at end of file diff --git a/odo/backends/tests/test_file.avro b/odo/backends/tests/test_file.avro new 
file mode 100755 index 0000000000000000000000000000000000000000..8e967b9a7e3c33b83e15aa995fd34f180b74915f GIT binary patch literal 1216 zcmeZI%3@>@ODrqO*DFrWNX<=bV5wFrsVqoUvQjEaP0lY$QPNS$OUwoFOHzwV;!_e! z5{pwyprXYEiOC=-glJl3YEDYAl2vpyOdD7v-Vmq*YHns;iBhc&n!FK0zPO|)GcO%e zi7{MBv{GJaP7aWplMhrFtD}^XnwD6aQ=()A5=S;WHL;|$D76@_6Yja(!~&pTSz=Bp zP!{Nc{L-YHRHfS5*jf>AXe8&Sq$cwL&CN+HNo8Q3|LvTk+mko1WcKPFd?0c}+Q2=EmL59+`nk=s)e18=3Zi1ylts&=MfFo#TVFecw@R1Td##Ng?mNCb9i0(Z2!aN z{_?$VD>g2RJZzV;FfB-tr&s&a_xJHH?SDRgwefDJTh8GN4>umTAXu}aykeILzf{MD zxi>f#JluGD>xvz#x3Vm$zrNqH$X~eA=W4ge_3b$l)t?;Be^|Nhr1a>XmlqNchE^^%#uq5T$g&74(td$uFtM`2jlU%aFX+}&{ zs#fuqM`lj!?0UViPI@k_D{D>*O-NhT?sfHS<7+Fs&CG1Pv8FXUzd!o%@uKz7&tX>C zg>yc?mH4!~+k5_-H@kV*Ry#?!n3y=lf7NxI@sxwhw{K&?foVTw{3a4dv5i&utl=FYW6PIdc3-2_d0F16G z%w1`+uI9|*_`KYtpZSh++qXt=F8F>!vuxMe-%=SWjU6So9Yu`(zZA2H($#PNb&ADh zLO*ML+3{2V?sumu)D|XH*<3zgaZ%*+|0{ie&fS>#l|S!K)sLl1LinxU|9$*`Bf~+g zbyjD{st3x8S3duBZ!gOx9S)|@r&719Y)js~xTQ3~uKw|ppR;ypg!Oc|t(xL-)9J~# zidP$JPp3}!`se^ZtI`v%9f{S~r$s8%TK#`mp*C08;KEaLelfdOGQVFxpSUW;sHFIR z$&JVznUW@5UGpQZq?}MoE>5qD@;}#<()#GD-kV9Q)Ei1R*DuOWwtdP{weh^O%F-p* zR+}Dtazf*TMTjx;<#%&$elPmB_4NWV=Vpy_jo0NXzI6Ux?y&aNEV0l@*{$!k-wtp1 IB9ERC0YeN$*8l(j literal 0 HcmV?d00001 From b2091f463e1d61bb799ed70f3d4a7758c625c866 Mon Sep 17 00:00:00 2001 From: Alex Hasha Date: Thu, 19 Nov 2015 18:06:47 -0500 Subject: [PATCH 02/29] Adding test for avro to iterator --- odo/backends/tests/test_avro.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/odo/backends/tests/test_avro.py b/odo/backends/tests/test_avro.py index 1d6bbc159..abeb90677 100644 --- a/odo/backends/tests/test_avro.py +++ b/odo/backends/tests/test_avro.py @@ -1,6 +1,7 @@ from __future__ import absolute_import, division, print_function from avro import datafile, io, schema +from collections import Iterator import pandas as pd from pandas.util.testing import assert_frame_equal from odo.backends.avro import discover, avro_to_DataFrame, avro_to_iterator, resource @@ -64,6 +65,11 @@ def test_convert_avro_to_dataframe(self): expected_output = pd.DataFrame(test_data, columns=names) assert_frame_equal(df, expected_output) + def test_convert_avro_to_iterator(self): + itr = convert(Iterator, self.avrofile) + self.assertIsInstance(itr, Iterator) + self.assertEqual(list(itr), test_data) + if __name__=="__main__": unittest.main() \ No newline at end of file From 4cbe99f8bc4aa5476830affebfeb4e884c4fed84 Mon Sep 17 00:00:00 2001 From: Alex Hasha Date: Wed, 25 Nov 2015 08:57:00 -0500 Subject: [PATCH 03/29] Enabling read and write, adding append methods. --- odo/backends/avro.py | 214 ++++++++++++++++++++++++++++---- odo/backends/tests/test_avro.py | 29 ++++- 2 files changed, 218 insertions(+), 25 deletions(-) diff --git a/odo/backends/avro.py b/odo/backends/avro.py index 87367bdd3..7c7474bf6 100644 --- a/odo/backends/avro.py +++ b/odo/backends/avro.py @@ -1,6 +1,10 @@ from __future__ import absolute_import, division, print_function +import errno +import os + from avro import schema, datafile, io +from avro.schema import AvroException import pandas as pd from datashape import discover, dshape, var, Record, Map, date_, datetime_, \ Option, null, string, int32, int64, float64, float32, boolean @@ -35,22 +39,173 @@ "TIMESTAMP": "string" } +class AVRO(object): + """Wrapper object for reading and writing an Avro container file -@resource.register('.+\.(avro)') -def resource_avro(uri, **kwargs): + Parameters + ---------- + + uri : str + uri of avro data + + schema : avro.schema.Schema + User specified Avro schema object. 
Used to decode file or serialize new records to file. + schema is required to create a new Avro file. + If reading or appending to an existing Avro file, the writers_schema embedded in that file + will be used. + + codec : str + compression codec. Valid values: 'null', 'deflate', 'snappy' + + """ + def __init__(self, uri, schema=None, codec='null', **kwargs): + self._uri = uri + self._schema = schema + self._codec = codec + self._kwargs = kwargs #CURRENTLY UNUSED + + if not schema: + sch = self._get_writers_schema() + if sch is None: + raise AvroException("Couldn't extract writers schema from '{0}'. User must provide a valid schema".format(uri)) + self._schema = sch + + def __iter__(self): + return self.reader + + def next(self): + return self.reader.next() + + def __enter__(self): + return self + + def __exit__(self, type, value, traceback): + # Perform a close if there's no exception + if type is None: + self.reader.close() + self.writer.close() + + def _get_writers_schema(self): + """ + Extract writers schema embedded in an existing Avro file. + """ + reader = self.reader + return schema.parse(self.reader.meta['avro.schema']) if reader else None + + uri = property(lambda self: self._uri) + codec = property(lambda self: self._codec) + schema = property(lambda self: self._schema) + + @property + def reader(self): + if hasattr(self, '_reader'): + if hasattr(self, '_writer'): + self.flush() + return self._reader + else: + try: + rec_reader = io.DatumReader(readers_schema=self.schema) + + df_reader = datafile.DataFileReader( + open(self.uri, 'rb'), + rec_reader + ) + + return df_reader + except IOError as exc: + #If file doesn't exist, don't set _reader now. + #Allow for reevaluation later after file has been created. + #Otherwise, reraise exception + if exc.errno != errno.ENOENT: + raise exc + return None + + @staticmethod + def _get_append_writer(uri, writers_schema=None): + """ + Returns an isntance of avro.datafile.DataFileWriter for appending + to an existing avro file at `uri`. Does not take a writers schema, + because avro library requires that writers_schema embedded in existing + file be used for appending. + + Parameters + ---------- - rec_reader = io.DatumReader() - df_reader = datafile.DataFileReader( - open(uri, 'r'), - rec_reader - ) - return df_reader + uri : str + uri of avro existing, non-empty avro file -def _get_schema(f): - return schema.parse(f.meta['avro.schema']) + writers_schema : avro.schema.Schema object + If not None, checks that writers_schema in existing file is the same as supplied schema. + Avro does not allow writing records to a container file with multiple writers_schema. + + Returns + ------- + avro.datafile.DataFileWriter + """ + rec_writer = io.DatumWriter() + df_writer = datafile.DataFileWriter( + open(uri, 'ab+'), + rec_writer + ) + #Check for embedded schema to ensure existing file is an avro file. + embedded_schema = schema.parse(df_writer.get_meta('avro.schema')) + + #If writers_schema supplied, check for equality with embedded schema. + if writers_schema: + assert embedded_schema == writers_schema, \ + "writers_schema embedded in {uri} differs from user supplied schema for appending." + + return df_writer + + @staticmethod + def _get_new_writer(uri, sch): + """ + Returns an isntance of avro.datafile.DataFileWriter for writing + to a new avro file at `uri`. 
+ + Parameters + ---------- + + uri : str + uri of avro existing, non-empty avro file + + sch : avro.schema.Schema object + + Returns + ------- + avro.datafile.DataFileWriter + """ + rec_writer = io.DatumWriter() + df_writer = datafile.DataFileWriter( + open(uri, 'wb'), + rec_writer, + writers_schema = sch + ) + return df_writer + + @property + def writer(self): + if hasattr(self, '_writer'): + return self._writer + else: + if os.path.exists(self.uri) and os.path.getsize(self.uri) > 0: + df_writer = self._get_append_writer(self.uri, self.schema) + else: + df_writer = self._get_new_writer(self.uri, self.schema) + self._writer = df_writer + return df_writer + + def flush(self): + if hasattr(self, '_writer'): + self._writer.close() + del(self._writer) + + +@resource.register('.+\.(avro)') +def resource_avro(uri, schema=None, **kwargs): + return AVRO(uri, schema=schema, **kwargs) def discover_schema(sch): - if isinstance(sch, schema.RecordSchema): return var * Record([(f.name, discover_schema(f.type)) for f in sch.fields]) elif isinstance(sch, schema.UnionSchema): @@ -61,8 +216,8 @@ def discover_schema(sch): assert len(types) == 1 return Option(AVRO_TYPE_MAP[types[0]]) except AssertionError: - import pdb; pdb.set_trace() - return null + raise TypeError("odo supports avro UnionSchema only for nullabel fields. " + "Received {0}".format(str([s.type for s in sch.schemas]))) elif isinstance(sch, schema.PrimitiveSchema): return AVRO_TYPE_MAP[sch.type] elif isinstance(sch, schema.MapSchema): @@ -72,19 +227,34 @@ def discover_schema(sch): else: raise Exception(str(type(sch))) -@discover.register(datafile.DataFileReader) +@discover.register(AVRO) def discover_avro(f, **kwargs): - return discover_schema(_get_schema(f)) + return discover_schema(f.schema) -@convert.register(pd.DataFrame, datafile.DataFileReader, cost=4.0) +@convert.register(pd.DataFrame, AVRO, cost=4.0) def avro_to_DataFrame(avro, dshape=None, **kwargs): + #XXX:AEH:todo - correct for pandas automated type conversions. e.g. strings containing numbers get cast to numeric. + #XXX:AEH:todo - column with nulls just becomes an "object" column. 
df = pd.DataFrame([r for r in avro]) - names = [f.name.decode('utf-8') for f in _get_schema(avro).fields] - df = df[names] #Reorder names to match avro schema - #names = [col.decode('utf-8') for col in avro - #df = df[names] # Reorder names to match sasfile + names = [f.name.decode('utf-8') for f in avro.schema.fields] + df = df[names] return df -@convert.register(Iterator, datafile.DataFileReader, cost=1.0) +@convert.register(Iterator, AVRO, cost=1.0) def avro_to_iterator(s, **kwargs): - return s \ No newline at end of file + return s + +@append.register(AVRO, Iterator) +def append_iterator_to_avro(tgt_avro, src_itr, **kwargs): + for datum in src_itr: + try: + tgt_avro.writer.append(datum) + except: + import pdb; pdb.set_trace() + _ = 1 + tgt_avro.flush() + +@append.register(AVRO, object) # anything else +def append_anything_to_list(tgt, src, **kwargs): + source_as_iter = convert(Iterator, src, **kwargs) + return append(tgt, source_as_iter, **kwargs) \ No newline at end of file diff --git a/odo/backends/tests/test_avro.py b/odo/backends/tests/test_avro.py index abeb90677..a0ddc1c79 100644 --- a/odo/backends/tests/test_avro.py +++ b/odo/backends/tests/test_avro.py @@ -4,9 +4,11 @@ from collections import Iterator import pandas as pd from pandas.util.testing import assert_frame_equal -from odo.backends.avro import discover, avro_to_DataFrame, avro_to_iterator, resource +from odo.backends.avro import discover, avro_to_DataFrame, avro_to_iterator, resource, AVRO import unittest +import tempfile + from odo.utils import tmpfile, into_path from odo import append, convert, resource, dshape @@ -18,7 +20,7 @@ "fields": [ {"type": "int" , "name": "field_1"}, {"type": "string", "name": "field_2"}, - {'default': None, 'name': u'field_3', 'type': [u'null', u'long']}, + {"default": null, "name": "field_3", "type": ["null", "long"]}, { "name": "features", "type": { "type": "map", "values": "double"}} ] } @@ -50,9 +52,13 @@ class TestAvro(unittest.TestCase): def setUp(self): self.avrofile = resource(test_path) + self.temp_output = tempfile.NamedTemporaryFile(delete=False, suffix=".avro") + + def tearDown(self): + self.temp_output.unlink(self.temp_output.name) def test_resource_datafile(self): - self.assertIsInstance(resource(test_path), datafile.DataFileReader) + self.assertIsInstance(resource(test_path), AVRO) def test_discover(self): self.assertEquals(discover(self.avrofile), ds) @@ -65,11 +71,28 @@ def test_convert_avro_to_dataframe(self): expected_output = pd.DataFrame(test_data, columns=names) assert_frame_equal(df, expected_output) + # def test_convert_avro_to_dataframe_roundtrip(self): + # df = convert(pd.DataFrame, self.avrofile) + # self.assertIsInstance(df, pd.DataFrame) + # + # avro2 = append(AVRO, pd.DataFrame) + # + # #assert_frame_equal(df, expected_output) + def test_convert_avro_to_iterator(self): itr = convert(Iterator, self.avrofile) self.assertIsInstance(itr, Iterator) self.assertEqual(list(itr), test_data) + def test_require_schema_for_new_file(self): + self.assertRaises(schema.AvroException, AVRO, "doesntexist.avro") + + def test_append_and_convert_round_trip(self): + x = AVRO(self.temp_output.name, schema=schema.parse(test_schema_str)) + append(x, test_data) + append(x, test_data) + assert convert(list, x) == test_data * 2 + if __name__=="__main__": unittest.main() \ No newline at end of file From e07cfa70e479875deda03749b61ae0d36bd26ad5 Mon Sep 17 00:00:00 2001 From: Alex Hasha Date: Mon, 30 Nov 2015 10:35:00 -0500 Subject: [PATCH 04/29] Adding dependencies to build files --- 
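Note: avro is added here only to the test and recommended requirement lists,
not to the hard requirements in requirements.txt, so it remains an optional
dependency. Patch 06 below registers the backend through odo's usual import
guard:

    with ignoring(ImportError):
        from .backends.avro import AVRO

Environments without the avro package therefore skip the backend instead of
failing at import time.
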
conda.recipe/meta.yaml | 1 + recommended-requirements.txt | 1 + 2 files changed, 2 insertions(+) diff --git a/conda.recipe/meta.yaml b/conda.recipe/meta.yaml index 644f06fec..7f6c5f7c0 100644 --- a/conda.recipe/meta.yaml +++ b/conda.recipe/meta.yaml @@ -31,6 +31,7 @@ requirements: test: requires: + - avro - pytest - h5py - pytables >=3.0.0 diff --git a/recommended-requirements.txt b/recommended-requirements.txt index 3fa45acac..29b0e5926 100644 --- a/recommended-requirements.txt +++ b/recommended-requirements.txt @@ -12,3 +12,4 @@ sas7bdat paramiko pywebhdfs boto +avro From 6cba4753af06a8cdcfea87e8df7e32b76482b266 Mon Sep 17 00:00:00 2001 From: Alex Hasha Date: Thu, 3 Dec 2015 10:12:42 -0500 Subject: [PATCH 05/29] Adding a convert edge from iterator back to avro --- odo/backends/avro.py | 19 +++++++++++++------ odo/backends/tests/test_avro.py | 8 -------- 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/odo/backends/avro.py b/odo/backends/avro.py index 7c7474bf6..c42cfc229 100644 --- a/odo/backends/avro.py +++ b/odo/backends/avro.py @@ -2,6 +2,7 @@ import errno import os +import uuid from avro import schema, datafile, io from avro.schema import AvroException @@ -12,6 +13,7 @@ from ..append import append from ..convert import convert from ..resource import resource +from ..temp import Temp AVRO_TYPE_MAP = { 'string': string, @@ -227,6 +229,8 @@ def discover_schema(sch): else: raise Exception(str(type(sch))) +@discover.register(schema.RecordSchema) + @discover.register(AVRO) def discover_avro(f, **kwargs): return discover_schema(f.schema) @@ -240,6 +244,13 @@ def avro_to_DataFrame(avro, dshape=None, **kwargs): df = df[names] return df +@convert.register(Temp(AVRO), Iterator, cost=1.0) +def convert_iterator_to_temporary_avro(data, schema=None, **kwargs): + fn = '.%s.avro' % uuid.uuid1() + avro = Temp(AVRO)(fn, schema, **kwargs) + return append(avro, data, **kwargs) + + @convert.register(Iterator, AVRO, cost=1.0) def avro_to_iterator(s, **kwargs): return s @@ -247,14 +258,10 @@ def avro_to_iterator(s, **kwargs): @append.register(AVRO, Iterator) def append_iterator_to_avro(tgt_avro, src_itr, **kwargs): for datum in src_itr: - try: - tgt_avro.writer.append(datum) - except: - import pdb; pdb.set_trace() - _ = 1 + tgt_avro.writer.append(datum) tgt_avro.flush() @append.register(AVRO, object) # anything else -def append_anything_to_list(tgt, src, **kwargs): +def append_anything_to_iterator(tgt, src, **kwargs): source_as_iter = convert(Iterator, src, **kwargs) return append(tgt, source_as_iter, **kwargs) \ No newline at end of file diff --git a/odo/backends/tests/test_avro.py b/odo/backends/tests/test_avro.py index a0ddc1c79..070655e93 100644 --- a/odo/backends/tests/test_avro.py +++ b/odo/backends/tests/test_avro.py @@ -71,14 +71,6 @@ def test_convert_avro_to_dataframe(self): expected_output = pd.DataFrame(test_data, columns=names) assert_frame_equal(df, expected_output) - # def test_convert_avro_to_dataframe_roundtrip(self): - # df = convert(pd.DataFrame, self.avrofile) - # self.assertIsInstance(df, pd.DataFrame) - # - # avro2 = append(AVRO, pd.DataFrame) - # - # #assert_frame_equal(df, expected_output) - def test_convert_avro_to_iterator(self): itr = convert(Iterator, self.avrofile) self.assertIsInstance(itr, Iterator) From 9ded2e1f358dab83d39d7f290b6278e7bbfb33cb Mon Sep 17 00:00:00 2001 From: Alex Hasha Date: Thu, 3 Dec 2015 10:13:06 -0500 Subject: [PATCH 06/29] Ensuring avro backend is registered on import --- odo/__init__.py | 2 ++ 1 file changed, 2 insertions(+) diff --git 
a/odo/__init__.py b/odo/__init__.py index 5eb2442f4..249966f1f 100644 --- a/odo/__init__.py +++ b/odo/__init__.py @@ -63,6 +63,8 @@ from .backends.sparksql import SparkDataFrame with ignoring(ImportError): from .backends.url import URL +with ignoring(ImportError): + from .backends.avro import AVRO restart_ordering() # Restart multipledispatch ordering and do ordering From d7496bad6fb519c5cfa425c1e6858e48c02f1df6 Mon Sep 17 00:00:00 2001 From: Alex Hasha Date: Thu, 3 Dec 2015 10:13:32 -0500 Subject: [PATCH 07/29] Fixing version problem with toolz --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index b617482e0..112840fa9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,6 @@ datashape >= 0.4.6 numpy >= 1.7 pandas >= 0.15.0 -toolz >= 0.7.2 +toolz == 0.7.4 multipledispatch >= 0.4.7 networkx From 2adb2dfc7b5034f001bcfd2f9f232a81b8db6207 Mon Sep 17 00:00:00 2001 From: Alex Hasha Date: Fri, 18 Dec 2015 09:43:31 -0500 Subject: [PATCH 08/29] Adding support for array types --- odo/backends/avro.py | 25 +++++-------------------- odo/backends/tests/test_avro.py | 28 +++++++++++++++------------- odo/backends/tests/test_file.avro | Bin 1216 -> 1419 bytes 3 files changed, 20 insertions(+), 33 deletions(-) diff --git a/odo/backends/avro.py b/odo/backends/avro.py index c42cfc229..8d309a833 100644 --- a/odo/backends/avro.py +++ b/odo/backends/avro.py @@ -7,7 +7,7 @@ from avro import schema, datafile, io from avro.schema import AvroException import pandas as pd -from datashape import discover, dshape, var, Record, Map, date_, datetime_, \ +from datashape import discover, var, Record, Map, Var, \ Option, null, string, int32, int64, float64, float32, boolean from collections import Iterator from ..append import append @@ -23,22 +23,9 @@ 'double': float64, 'float': float32, 'bool': boolean, -} - -TD_AVRO_TYPES = { - "BIGINT": "long", - "BYTEINT": "int", - "DECIMAL_SHORT": "int", - "DECIMAL_LONG": "long", - "DECIMAL": "double", - "FLOAT": "double", - "INT": "int", - "INTEGER": "int", - "SMALLINT": "int", - "CHAR": "string", - "DATE": "string", - "TIME": "string", - "TIMESTAMP": "string" + 'map': Map, + 'record': Record, + 'array': Var, } class AVRO(object): @@ -225,12 +212,10 @@ def discover_schema(sch): elif isinstance(sch, schema.MapSchema): return Map(string, discover_schema(sch.values)) elif isinstance(sch, schema.ArraySchema): - raise Exception("ArraySchema TODO") + return var * discover_schema(sch.items) else: raise Exception(str(type(sch))) -@discover.register(schema.RecordSchema) - @discover.register(AVRO) def discover_avro(f, **kwargs): return discover_schema(f.schema) diff --git a/odo/backends/tests/test_avro.py b/odo/backends/tests/test_avro.py index 070655e93..0c444f295 100644 --- a/odo/backends/tests/test_avro.py +++ b/odo/backends/tests/test_avro.py @@ -21,29 +21,31 @@ {"type": "int" , "name": "field_1"}, {"type": "string", "name": "field_2"}, {"default": null, "name": "field_3", "type": ["null", "long"]}, - { "name": "features", "type": { "type": "map", "values": "double"}} + { "name": "features", "type": { "type": "map", "values": "double"}}, + { "name": "words", "type": {"type": "array", "items": "string"}} ] } """ test_data = [ - {"field_1":1512357953,"field_2":"dgqbrudjnvvhdsa","field_3":-2529599232589628512,"features":{"glbumsyvgbmv":0.5262249719797424,"ivlmyhtbhpr":0.34864726886026076}}, - 
{"field_1":-422002496,"field_2":"ixjcpxaoqklbyp","field_3":2549258632438482521,"features":{"iqbxiagrparn":0.7367035457367471,"afjwfns":0.025058777723185655,"ndlrehie":0.4967780945917538,"di":0.810456042568542,"jgxyg":0.9625263303627106}}, - {"field_1":-811876665,"field_2":"efixmnaidxlva","field_3":None,"features":{"hdrvmxafp":0.872563073642209,"gair":0.518073477541009,"humgtonyoii":0.8214816239303898}}, - {"field_1":-761746797,"field_2":"toomqrltrbhechq","field_3":-3875987146454676496,"features":{"ksgwqlybs":0.3694600154809954,"ubtxnuhuqcqkvirkmvnbjsggvmbasnghdiwvfpwsihcfmgdcefadrqerqjhudteucjvjwhekgpjsytfrfjubqulsxmj":0.13827426007790045,"pxrjc":0.20131148613588346,"bmkalqykovaaqbc":0.8034756735507932,"xkkiigjtfgbnj":0.6750268550341367}}, - {"field_1":955048956,"field_2":"stogtdy","field_3":-3437921209509339504,"features":{"lndm":0.25025327732757696,"qeecpdxq":0.44506724761212746,"csosogkyuckanv":0.8739675025249061,"oyvnsshedr":0.8225465933655276}}, - {"field_1":-161240515,"field_2":"qcisooo","field_3":1311605767031982299,"features":{"gxias":0.11162484249218518,"pgdyatbp":0.5487679009288856,"itvxelmfnbrq":0.906496888778101,"kkififqgvs":0.863635294016924}}, - {"field_1":1956293589,"field_2":"rsidajtxnum","field_3":3246732014409650694,"features":{"":0.14456022508672162,"fknbdjfcnag":0.2742703631839102,"rtttccsdf":0.8117164405618132}}, - {"field_1":1554383058,"field_2":"xyahxnxc","field_3":None,"features":{"lvvw":0.8727205291174954,"pytquvfii":0.43930224145682706}}, - {"field_1":1955780624,"field_2":"wvbte","field_3":7012092663456604164,"features":{"uow":0.9091043815461999,"vbrqiknjrfli":0.43792647841064225,"djf":0.22671511625162166}}, - {"field_1":-1215933043,"field_2":"nsibpyqsuf","field_3":None,"features":{"":0.1543825213081953,"mndjtjl":0.5742890151730541,"gbhiam":0.3137956141490078,"cojkna":0.9846856885267534}}, + {"field_1":2072373602,"field_2":"mxllbfxk","field_3":-3887990995227229804,"features":{"bhettcdl":0.8581552641969377,"vdqvnqgqbrjtkug":0.4938648291874551,"sgmlbagyfb":0.5796466618955293,"ka":0.9873135485253831},"words":["ciplc","htvixoujptehr","rbeiimkevsn"]}, + {"field_1":517434305,"field_2":"frgcnqrocddimu","field_3":None,"features":{"atqqsuttysdrursxlynwcrmfrwcrdxaegfnidvwjxamoj":0.2697279678696263,"kjb":0.8279248178446112,"wqlecjb":0.8241169129373344,"inihhrtnawyopu":0.08511455977126114,"dpjw":0.760489536392584},"words":["ignsrafxpgu","ckg"]}, + {"field_1":1925434607,"field_2":"aurlydvgfygmu","field_3":None,"features":{"crslipya":0.1596449079423896,"":0.4304848508533662,"imbfgwnaphh":0.19323554138270294},"words":["rqdpanbbcemg","auurshsxxkp","rdngxdthekt"]}, + {"field_1":636669589,"field_2":"","field_3":-1858103537322807465,"features":{"dv":0.9635053430456509,"lhljgywersxjp":0.5289026834129389,"nmtns":0.7645922724023969},"words":["vviuffehxh","jpquemsx","xnoj",""]}, + {"field_1":-1311284713,"field_2":"infejerere","field_3":5673921375069484569,"features":{"iaen":0.7412670573684966,"ekqfnn":0.6685382939302145,"innfcqqbdrpcdn":0.39528359165136695,"fd":0.8572519278668735,"fbryid":0.7244784428105817},"words":["ciqu","emfruneloqh"]}, + {"field_1":1716247766,"field_2":"gmmfghijngo","field_3":None,"features":{"ourul":0.1849234265503661,"vhvwhech":0.41140968300430625,"m":0.9576395352199625,"fgh":0.9547116485401502,"gqpdtvncno":0.027038814818686197},"words":["ugwcfecipffmkwi","kttgclwjlk","siejdtrpjkqennx","ixwrpmywtbgiygaoxpwnvuckdygttsssqfrplbyyv","mfsrhne"]}, + 
{"field_1":101453273,"field_2":"frjaqnrbfspsuw","field_3":None,"features":{"ffps":0.02989888991738765,"fxkhyomw":0.2963204572188527},"words":["jwi","rfxlxngyethg"]}, + {"field_1":-1792425886,"field_2":"pqkawoyw","field_3":None,"features":{"vsovnbsdhbkydf":0.09777409545072746,"eovoiix":0.10890846076556715},"words":["xntmmvpbrq","uof"]}, + {"field_1":-1828393530,"field_2":"nkflrmkxiry","field_3":None,"features":{"qewmpdviapfyjma":0.8727493942139006},"words":["lgtrtjhpf"]}, + {"field_1":1048099453,"field_2":"jsle","field_3":None,"features":{"qbndce":0.5459572647413652},"words":["d"]}, ] ds = dshape("""var * { field_1: int32, field_2: string, field_3: ?int64, - features: map[string, float64] + features: map[string, float64], + words: var * string }""") test_path = into_path('backends', 'tests', 'test_file.avro') @@ -67,7 +69,7 @@ def test_convert_avro_to_dataframe(self): df = convert(pd.DataFrame, self.avrofile) self.assertIsInstance(df, pd.DataFrame) - names = ["field_1", "field_2", "field_3", "features"] + names = ["field_1", "field_2", "field_3", "features", "words"] expected_output = pd.DataFrame(test_data, columns=names) assert_frame_equal(df, expected_output) diff --git a/odo/backends/tests/test_file.avro b/odo/backends/tests/test_file.avro index 8e967b9a7e3c33b83e15aa995fd34f180b74915f..b3ea5d127fbafd70aaf8442b2a970875a1f64633 100755 GIT binary patch literal 1419 zcmZuxdx%v<7(d=_*XwOvZ*sZFA{*Q&r535Qhf8?K{s_V{)c(D27Hw4^tEivoQ0C>9!c*gEZ)&Bx!=KQtYbjil}8-)>-jI#=yY*zVG+({eJV! zOt;)*(FhSW4^1=CHXG3QKHQE~z*@1+Owgw0O&{-p;J0Q?NA$=890@y>L8z`oT zLDguV(|L_{1?hM&XI!mIxo&i|{U`p=jkwa1a&l|R#Oo<@8a!ewKe8ZCoZHrb2!n{} zM*9$cb8Q54B$2w#ye`fMA_rr@qCA=l2qKH2M%PW;Bp3fbL?9)|aw^rxA7WnXwrg%z z?TU|f0FhlDYQ)&L;9w7W^wQZUUY@>j1H1fe?XBwW4>xQdRrOQGES!3UAOHA!!580L zIPaCKmV>mmA^YpInZpb5X|pwkVa!7kTGG{CM0P$izrAz$F|($K&n!SPY;B1OtVjSJFa$R!^h$PEkOmIf>?n+f#Dn>;H(z8!5nG)v?ad6OVVGeUimZ01USG@MEF=NlKy=H~`nyZDo-hJfNB=5dl zi(?vLjGWY|x+0A7nMlhi4_w38|NHI}BWjdmU{S@5Ubv3l*L)q`d!=NvvE!+#bJnl%y%sAF#P;LEMqJ=KlNO{3Bp zk;KL*pEHboFQGM0Yhts+WnQbI$c0)8*9#Hn=?t1CQutY-EkZK_f>b0tj%*KRL~Er~ zh^1gwmc`9JR?^|f*u1Ux#e6zl5OQf@B~+B;3NaRHRnx*n->+C{R`ZwRWPzXb4x3y% z_1l5mfs&ihF%qXN%^woVh+& zR(tI3*9T6UMqxA~T58u3*!c70p1$E_S21K1nnH_18;$GwU|wf>JkKk@TRw+>0j*0o AzW@LL delta 977 zcmV;?11|iF3&05yPhx5T1Qua-a&InkV`ybKwf-!Qd3c6zmNZ(eF zG8#>lm?or6Ow>f(+1;6){V}^cvpct&nx-HHr`X1YQ212TE>@e6Yx zp^=%d&bB|bvIx>9j$+*R{A40AaT`c?I*Ogi%{n5HLlvS$wos&G*3g)6A_aM9V5`3I zRvDzQLYQ27Wh!&D;x4kY=WbO4ZGQ>pDKO0|pIUHGH}E;`s5= zJAcLVRyu2Oob&A%^1F*mpX}_r3^DRa2NbL$|PJ@;E>YnIZH;n@5h>^jq-W>cBaW_9AAgUlt47 z@;{7&X6RUo?Z=iY6vc;Xb*k`jc5o4Fj(;qR5NrkrLux*xwv;dR+yv5U_rl=~86LPe z&|-P+(24%Gv9+_WrcyT5oM*lc_ij&I1a-cytDYA5rMpkQ@b~^%(4=ygyV#0D)l#VF z=ze(Mtv}14-i-z2+|V_)wB@^20Yru77;Y zqhD@qV)<`Dorkqp3844bDR#Q8`x9o=@vX%EiZ%m$^=A^j|G2r$IAZS zi)A2vabkKr)4({<6!ZVoj{MHpne`8D{CfDMq*M<(@BQQ82xveSg_?;}s^BA|sptCM zy#%r>2m)2*8rnKNbnfWdB9K0RCs%+4pk( zwdc8SAg^%1E2QJK}O{4>Kwf-!Qd3cQh4}# From 491c4d19fcc4f107cbf87d72730b81fc4d3cd35c Mon Sep 17 00:00:00 2001 From: Alex Hasha Date: Fri, 18 Dec 2015 11:18:22 -0500 Subject: [PATCH 09/29] Attempting to fix travis build --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index b1df14b0a..3da61f5fe 100644 --- a/.travis.yml +++ b/.travis.yml @@ -59,7 +59,7 @@ install: # Install various deps - conda uninstall toolz - - pip install -U toolz sas7bdat psycopg2 dill 'pymongo<3' + - pip install -U toolz sas7bdat psycopg2 dill 'pymongo<3' avro - pip install --upgrade git+git://github.com/blaze/dask.git#egg=dask-dev[complete] - if [ -n 
"$PANDAS_VERSION" ]; then pip install $PANDAS_VERSION; fi From 2eeba0a7a8832f72da234096ccd320295ec3c398 Mon Sep 17 00:00:00 2001 From: Alex Hasha Date: Fri, 18 Dec 2015 12:11:48 -0500 Subject: [PATCH 10/29] Reformatting test data --- odo/backends/tests/test_avro.py | 86 ++++++++++++++++++++++++++++----- 1 file changed, 74 insertions(+), 12 deletions(-) diff --git a/odo/backends/tests/test_avro.py b/odo/backends/tests/test_avro.py index 0c444f295..caaa3e1eb 100644 --- a/odo/backends/tests/test_avro.py +++ b/odo/backends/tests/test_avro.py @@ -27,18 +27,80 @@ } """ -test_data = [ - {"field_1":2072373602,"field_2":"mxllbfxk","field_3":-3887990995227229804,"features":{"bhettcdl":0.8581552641969377,"vdqvnqgqbrjtkug":0.4938648291874551,"sgmlbagyfb":0.5796466618955293,"ka":0.9873135485253831},"words":["ciplc","htvixoujptehr","rbeiimkevsn"]}, - {"field_1":517434305,"field_2":"frgcnqrocddimu","field_3":None,"features":{"atqqsuttysdrursxlynwcrmfrwcrdxaegfnidvwjxamoj":0.2697279678696263,"kjb":0.8279248178446112,"wqlecjb":0.8241169129373344,"inihhrtnawyopu":0.08511455977126114,"dpjw":0.760489536392584},"words":["ignsrafxpgu","ckg"]}, - {"field_1":1925434607,"field_2":"aurlydvgfygmu","field_3":None,"features":{"crslipya":0.1596449079423896,"":0.4304848508533662,"imbfgwnaphh":0.19323554138270294},"words":["rqdpanbbcemg","auurshsxxkp","rdngxdthekt"]}, - {"field_1":636669589,"field_2":"","field_3":-1858103537322807465,"features":{"dv":0.9635053430456509,"lhljgywersxjp":0.5289026834129389,"nmtns":0.7645922724023969},"words":["vviuffehxh","jpquemsx","xnoj",""]}, - {"field_1":-1311284713,"field_2":"infejerere","field_3":5673921375069484569,"features":{"iaen":0.7412670573684966,"ekqfnn":0.6685382939302145,"innfcqqbdrpcdn":0.39528359165136695,"fd":0.8572519278668735,"fbryid":0.7244784428105817},"words":["ciqu","emfruneloqh"]}, - {"field_1":1716247766,"field_2":"gmmfghijngo","field_3":None,"features":{"ourul":0.1849234265503661,"vhvwhech":0.41140968300430625,"m":0.9576395352199625,"fgh":0.9547116485401502,"gqpdtvncno":0.027038814818686197},"words":["ugwcfecipffmkwi","kttgclwjlk","siejdtrpjkqennx","ixwrpmywtbgiygaoxpwnvuckdygttsssqfrplbyyv","mfsrhne"]}, - {"field_1":101453273,"field_2":"frjaqnrbfspsuw","field_3":None,"features":{"ffps":0.02989888991738765,"fxkhyomw":0.2963204572188527},"words":["jwi","rfxlxngyethg"]}, - {"field_1":-1792425886,"field_2":"pqkawoyw","field_3":None,"features":{"vsovnbsdhbkydf":0.09777409545072746,"eovoiix":0.10890846076556715},"words":["xntmmvpbrq","uof"]}, - {"field_1":-1828393530,"field_2":"nkflrmkxiry","field_3":None,"features":{"qewmpdviapfyjma":0.8727493942139006},"words":["lgtrtjhpf"]}, - {"field_1":1048099453,"field_2":"jsle","field_3":None,"features":{"qbndce":0.5459572647413652},"words":["d"]}, -] +test_data = [{'features': {'bhettcdl': 0.8581552641969377, + 'ka': 0.9873135485253831, + 'sgmlbagyfb': 0.5796466618955293, + 'vdqvnqgqbrjtkug': 0.4938648291874551}, + 'field_1': 2072373602, + 'field_2': 'mxllbfxk', + 'field_3': -3887990995227229804, + 'words': ['ciplc', 'htvixoujptehr', 'rbeiimkevsn']}, + {'features': {'atqqsuttysdrursxlynwcrmfrwcrdxaegfnidvwjxamoj': 0.2697279678696263, + 'dpjw': 0.760489536392584, + 'inihhrtnawyopu': 0.08511455977126114, + 'kjb': 0.8279248178446112, + 'wqlecjb': 0.8241169129373344}, + 'field_1': 517434305, + 'field_2': 'frgcnqrocddimu', + 'field_3': None, + 'words': ['ignsrafxpgu', 'ckg']}, + {'features': {'': 0.4304848508533662, + 'crslipya': 0.1596449079423896, + 'imbfgwnaphh': 0.19323554138270294}, + 'field_1': 1925434607, + 'field_2': 
'aurlydvgfygmu', + 'field_3': None, + 'words': ['rqdpanbbcemg', 'auurshsxxkp', 'rdngxdthekt']}, + {'features': {'dv': 0.9635053430456509, + 'lhljgywersxjp': 0.5289026834129389, + 'nmtns': 0.7645922724023969}, + 'field_1': 636669589, + 'field_2': '', + 'field_3': -1858103537322807465, + 'words': ['vviuffehxh', 'jpquemsx', 'xnoj', '']}, + {'features': {'ekqfnn': 0.6685382939302145, + 'fbryid': 0.7244784428105817, + 'fd': 0.8572519278668735, + 'iaen': 0.7412670573684966, + 'innfcqqbdrpcdn': 0.39528359165136695}, + 'field_1': -1311284713, + 'field_2': 'infejerere', + 'field_3': 5673921375069484569, + 'words': ['ciqu', 'emfruneloqh']}, + {'features': {'fgh': 0.9547116485401502, + 'gqpdtvncno': 0.027038814818686197, + 'm': 0.9576395352199625, + 'ourul': 0.1849234265503661, + 'vhvwhech': 0.41140968300430625}, + 'field_1': 1716247766, + 'field_2': 'gmmfghijngo', + 'field_3': None, + 'words': ['ugwcfecipffmkwi', + 'kttgclwjlk', + 'siejdtrpjkqennx', + 'ixwrpmywtbgiygaoxpwnvuckdygttsssqfrplbyyv', + 'mfsrhne']}, + {'features': {'ffps': 0.02989888991738765, 'fxkhyomw': 0.2963204572188527}, + 'field_1': 101453273, + 'field_2': 'frjaqnrbfspsuw', + 'field_3': None, + 'words': ['jwi', 'rfxlxngyethg']}, + {'features': {'eovoiix': 0.10890846076556715, + 'vsovnbsdhbkydf': 0.09777409545072746}, + 'field_1': -1792425886, + 'field_2': 'pqkawoyw', + 'field_3': None, + 'words': ['xntmmvpbrq', 'uof']}, + {'features': {'qewmpdviapfyjma': 0.8727493942139006}, + 'field_1': -1828393530, + 'field_2': 'nkflrmkxiry', + 'field_3': None, + 'words': ['lgtrtjhpf']}, + {'features': {'qbndce': 0.5459572647413652}, + 'field_1': 1048099453, + 'field_2': 'jsle', + 'field_3': None, + 'words': ['d']}] ds = dshape("""var * { field_1: int32, From 471beccf4af37c6f11094d8d4e3030db3aae0ba2 Mon Sep 17 00:00:00 2001 From: Alex Hasha Date: Fri, 18 Dec 2015 12:13:29 -0500 Subject: [PATCH 11/29] Adding conda selector for avro, since it is not Python 3 compatible --- conda.recipe/meta.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/conda.recipe/meta.yaml b/conda.recipe/meta.yaml index 7a420465a..351f9cc2c 100644 --- a/conda.recipe/meta.yaml +++ b/conda.recipe/meta.yaml @@ -31,7 +31,7 @@ requirements: test: requires: - - avro + - avro # [py27] - pytest - h5py - pytables >=3.0.0 From e05d1e13827f6327e13a1877b63e80d89c60a6ba Mon Sep 17 00:00:00 2001 From: Alex Hasha Date: Sat, 19 Dec 2015 15:51:48 -0500 Subject: [PATCH 12/29] Small fixes from code review. --- odo/backends/avro.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/odo/backends/avro.py b/odo/backends/avro.py index 8d309a833..5802e226d 100644 --- a/odo/backends/avro.py +++ b/odo/backends/avro.py @@ -133,7 +133,7 @@ def _get_append_writer(uri, writers_schema=None): """ rec_writer = io.DatumWriter() df_writer = datafile.DataFileWriter( - open(uri, 'ab+'), + open(uri, 'a+b'), rec_writer ) #Check for embedded schema to ensure existing file is an avro file. @@ -141,8 +141,10 @@ def _get_append_writer(uri, writers_schema=None): #If writers_schema supplied, check for equality with embedded schema. if writers_schema: - assert embedded_schema == writers_schema, \ - "writers_schema embedded in {uri} differs from user supplied schema for appending." 
+ try: + assert embedded_schema == writers_schema + except AssertionError: + raise ValueError("writers_schema embedded in {uri} differs from user supplied schema for appending.") return df_writer @@ -210,11 +212,12 @@ def discover_schema(sch): elif isinstance(sch, schema.PrimitiveSchema): return AVRO_TYPE_MAP[sch.type] elif isinstance(sch, schema.MapSchema): + # Avro map types always have string keys, see https://avro.apache.org/docs/1.7.7/spec.html#Maps return Map(string, discover_schema(sch.values)) elif isinstance(sch, schema.ArraySchema): return var * discover_schema(sch.items) else: - raise Exception(str(type(sch))) + raise TypeError('Unable to discover avro type %r' % type(sch).__name__) @discover.register(AVRO) def discover_avro(f, **kwargs): @@ -225,7 +228,7 @@ def avro_to_DataFrame(avro, dshape=None, **kwargs): #XXX:AEH:todo - correct for pandas automated type conversions. e.g. strings containing numbers get cast to numeric. #XXX:AEH:todo - column with nulls just becomes an "object" column. df = pd.DataFrame([r for r in avro]) - names = [f.name.decode('utf-8') for f in avro.schema.fields] + names = [f.name for f in avro.schema.fields] df = df[names] return df @@ -238,7 +241,7 @@ def convert_iterator_to_temporary_avro(data, schema=None, **kwargs): @convert.register(Iterator, AVRO, cost=1.0) def avro_to_iterator(s, **kwargs): - return s + return iter(s) @append.register(AVRO, Iterator) def append_iterator_to_avro(tgt_avro, src_itr, **kwargs): From 31821d2711f38a409908f10e64222a7728a3ea79 Mon Sep 17 00:00:00 2001 From: Alex Hasha Date: Sat, 19 Dec 2015 18:09:25 -0500 Subject: [PATCH 13/29] Converting tests to pytest idiom. --- odo/backends/tests/test_avro.py | 93 +++++++++++++++++---------------- 1 file changed, 48 insertions(+), 45 deletions(-) diff --git a/odo/backends/tests/test_avro.py b/odo/backends/tests/test_avro.py index caaa3e1eb..23c5fd522 100644 --- a/odo/backends/tests/test_avro.py +++ b/odo/backends/tests/test_avro.py @@ -1,17 +1,17 @@ from __future__ import absolute_import, division, print_function -from avro import datafile, io, schema +#from avro import schema from collections import Iterator import pandas as pd from pandas.util.testing import assert_frame_equal -from odo.backends.avro import discover, avro_to_DataFrame, avro_to_iterator, resource, AVRO - -import unittest -import tempfile +from odo.backends.avro import discover, AVRO from odo.utils import tmpfile, into_path from odo import append, convert, resource, dshape +import pytest +schema = pytest.importorskip('avro.schema') + test_schema_str = """ { "type" : "record", @@ -112,43 +112,46 @@ test_path = into_path('backends', 'tests', 'test_file.avro') -class TestAvro(unittest.TestCase): - - def setUp(self): - self.avrofile = resource(test_path) - self.temp_output = tempfile.NamedTemporaryFile(delete=False, suffix=".avro") - - def tearDown(self): - self.temp_output.unlink(self.temp_output.name) - - def test_resource_datafile(self): - self.assertIsInstance(resource(test_path), AVRO) - - def test_discover(self): - self.assertEquals(discover(self.avrofile), ds) - - def test_convert_avro_to_dataframe(self): - df = convert(pd.DataFrame, self.avrofile) - self.assertIsInstance(df, pd.DataFrame) - - names = ["field_1", "field_2", "field_3", "features", "words"] - expected_output = pd.DataFrame(test_data, columns=names) - assert_frame_equal(df, expected_output) - - def test_convert_avro_to_iterator(self): - itr = convert(Iterator, self.avrofile) - self.assertIsInstance(itr, Iterator) - self.assertEqual(list(itr), 
test_data) - - def test_require_schema_for_new_file(self): - self.assertRaises(schema.AvroException, AVRO, "doesntexist.avro") - - def test_append_and_convert_round_trip(self): - x = AVRO(self.temp_output.name, schema=schema.parse(test_schema_str)) - append(x, test_data) - append(x, test_data) - assert convert(list, x) == test_data * 2 - - -if __name__=="__main__": - unittest.main() \ No newline at end of file +@pytest.fixture +def avrofile(): + return resource(test_path) + +@pytest.yield_fixture +def temp_output_path(): + with tmpfile('.avro') as fn: + yield fn + +def test_discover(avrofile): + assert discover(avrofile) == ds + +def test_resource_datafile(): + assert isinstance(resource(test_path), AVRO) + +def test_convert_avro_to_dataframe(avrofile): + df = convert(pd.DataFrame, avrofile) + + assert isinstance(df, pd.DataFrame) + + names = ["field_1", "field_2", "field_3", "features", "words"] + expected_output = pd.DataFrame(test_data, columns=names) + assert_frame_equal(df, expected_output) + +def test_convert_avro_to_iterator(avrofile): + itr = convert(Iterator, avrofile) + assert isinstance(itr, Iterator) + assert list(itr) == test_data + +def test_require_schema_for_new_file(): + try: + x = AVRO("doesntexist.avro") + assert False, "Previous line should throw an schema.AvroException" + except schema.AvroException: + assert True + except Exception: + assert False + +def test_append_and_convert_round_trip(temp_output_path): + x = AVRO(temp_output_path, schema=schema.parse(test_schema_str)) + append(x, test_data) + append(x, test_data) + assert convert(list, x) == test_data * 2 From 3773348f9f7d693a40352d6d65b3b31d8a7e707a Mon Sep 17 00:00:00 2001 From: Alex Hasha Date: Sat, 19 Dec 2015 18:11:05 -0500 Subject: [PATCH 14/29] Changing import order to ensure test skip if avro not installed --- odo/backends/tests/test_avro.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/odo/backends/tests/test_avro.py b/odo/backends/tests/test_avro.py index 23c5fd522..c2a8c17ab 100644 --- a/odo/backends/tests/test_avro.py +++ b/odo/backends/tests/test_avro.py @@ -1,6 +1,8 @@ from __future__ import absolute_import, division, print_function -#from avro import schema +import pytest +schema = pytest.importorskip('avro.schema') + from collections import Iterator import pandas as pd from pandas.util.testing import assert_frame_equal @@ -9,9 +11,6 @@ from odo.utils import tmpfile, into_path from odo import append, convert, resource, dshape -import pytest -schema = pytest.importorskip('avro.schema') - test_schema_str = """ { "type" : "record", From 2c0246ac47d96123814882b8fdb9f00365dbf856 Mon Sep 17 00:00:00 2001 From: Alex Hasha Date: Sat, 19 Dec 2015 18:14:59 -0500 Subject: [PATCH 15/29] Final small changes requested from code review --- odo/backends/avro.py | 1 - requirements.txt | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/odo/backends/avro.py b/odo/backends/avro.py index 5802e226d..9768aff9a 100644 --- a/odo/backends/avro.py +++ b/odo/backends/avro.py @@ -51,7 +51,6 @@ def __init__(self, uri, schema=None, codec='null', **kwargs): self._uri = uri self._schema = schema self._codec = codec - self._kwargs = kwargs #CURRENTLY UNUSED if not schema: sch = self._get_writers_schema() diff --git a/requirements.txt b/requirements.txt index 112840fa9..16078a1b1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,6 @@ datashape >= 0.4.6 numpy >= 1.7 pandas >= 0.15.0 -toolz == 0.7.4 +toolz >= 0.7.4 multipledispatch >= 0.4.7 networkx From 
01b67a4fd3c2a3f44803fd9704c192b9a6e15f4f Mon Sep 17 00:00:00 2001 From: Alex Hasha Date: Mon, 21 Dec 2015 19:29:39 -0500 Subject: [PATCH 16/29] Use multipledispatch for schema discovery --- odo/backends/avro.py | 53 ++++++++++++++++++++++++++------------------ 1 file changed, 32 insertions(+), 21 deletions(-) diff --git a/odo/backends/avro.py b/odo/backends/avro.py index 9768aff9a..2143710ed 100644 --- a/odo/backends/avro.py +++ b/odo/backends/avro.py @@ -6,6 +6,7 @@ from avro import schema, datafile, io from avro.schema import AvroException +from multipledispatch import dispatch import pandas as pd from datashape import discover, var, Record, Map, Var, \ Option, null, string, int32, int64, float64, float32, boolean @@ -195,28 +196,38 @@ def flush(self): def resource_avro(uri, schema=None, **kwargs): return AVRO(uri, schema=schema, **kwargs) +@dispatch(schema.RecordSchema) def discover_schema(sch): - if isinstance(sch, schema.RecordSchema): - return var * Record([(f.name, discover_schema(f.type)) for f in sch.fields]) - elif isinstance(sch, schema.UnionSchema): - try: - types = [s.type for s in sch.schemas] - assert "null" in types - types.remove("null") - assert len(types) == 1 - return Option(AVRO_TYPE_MAP[types[0]]) - except AssertionError: - raise TypeError("odo supports avro UnionSchema only for nullabel fields. " - "Received {0}".format(str([s.type for s in sch.schemas]))) - elif isinstance(sch, schema.PrimitiveSchema): - return AVRO_TYPE_MAP[sch.type] - elif isinstance(sch, schema.MapSchema): - # Avro map types always have string keys, see https://avro.apache.org/docs/1.7.7/spec.html#Maps - return Map(string, discover_schema(sch.values)) - elif isinstance(sch, schema.ArraySchema): - return var * discover_schema(sch.items) - else: - raise TypeError('Unable to discover avro type %r' % type(sch).__name__) + return var * Record([(f.name, discover_schema(f.type)) for f in sch.fields]) + +@dispatch(schema.UnionSchema) +def discover_schema(sch): + try: + types = [s.type for s in sch.schemas] + assert "null" in types + types.remove("null") + assert len(types) == 1 + return Option(AVRO_TYPE_MAP[types[0]]) + except AssertionError: + raise TypeError("odo supports avro UnionSchema only for nullable fields." 
+ \ + "Received {0}".format(str([s.type for s in sch.schemas]))) + +@dispatch(schema.PrimitiveSchema) +def discover_schema(sch): + return AVRO_TYPE_MAP[sch.type] + +@dispatch(schema.MapSchema) +def discover_schema(sch): + # Avro map types always have string keys, see https://avro.apache.org/docs/1.7.7/spec.html#Maps + return Map(string, discover_schema(sch.values)) + +@dispatch(schema.ArraySchema) +def discover_schema(sch): + return var * discover_schema(sch.items) + +@dispatch(object) +def discover_schema(sch): + raise TypeError('Unable to discover avro type %r' % type(sch).__name__) @discover.register(AVRO) def discover_avro(f, **kwargs): From a583e5514babaa636defb83d72670abb66479811 Mon Sep 17 00:00:00 2001 From: Alex Hasha Date: Mon, 21 Dec 2015 22:11:42 -0500 Subject: [PATCH 17/29] Bug fix for boolean types --- odo/backends/avro.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/odo/backends/avro.py b/odo/backends/avro.py index 2143710ed..bd0a4d5c9 100644 --- a/odo/backends/avro.py +++ b/odo/backends/avro.py @@ -23,7 +23,7 @@ 'null': null, 'double': float64, 'float': float32, - 'bool': boolean, + 'boolean': boolean, 'map': Map, 'record': Record, 'array': Var, From eb004c76bf635c9e371ab1105510c67fada07688 Mon Sep 17 00:00:00 2001 From: Alex Hasha Date: Tue, 29 Dec 2015 18:13:52 -0500 Subject: [PATCH 18/29] Cleanup --- odo/backends/avro.py | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/odo/backends/avro.py b/odo/backends/avro.py index bd0a4d5c9..5ddc4af24 100644 --- a/odo/backends/avro.py +++ b/odo/backends/avro.py @@ -9,14 +9,14 @@ from multipledispatch import dispatch import pandas as pd from datashape import discover, var, Record, Map, Var, \ - Option, null, string, int32, int64, float64, float32, boolean + Option, null, string, int8, int32, int64, float64, float32, boolean, bytes_ from collections import Iterator from ..append import append from ..convert import convert from ..resource import resource from ..temp import Temp -AVRO_TYPE_MAP = { +PRIMITIVE_TYPES_MAP = { 'string': string, 'int': int32, 'long': int64, @@ -24,11 +24,28 @@ 'double': float64, 'float': float32, 'boolean': boolean, - 'map': Map, 'record': Record, +} + +NAMED_TYPES_MAP = { + 'fixed': bytes_, #TODO + 'enum': int8, #TODO + 'record': Record, + 'error': Record, #TODO +} + +COMPOUND_TYPES_MAP = { 'array': Var, + 'map': Map, } + +AVRO_TYPE_MAP = {} +AVRO_TYPE_MAP.update(PRIMITIVE_TYPES_MAP) +AVRO_TYPE_MAP.update(NAMED_TYPES_MAP) +AVRO_TYPE_MAP.update(COMPOUND_TYPES_MAP) + + class AVRO(object): """Wrapper object for reading and writing an Avro container file @@ -248,7 +265,6 @@ def convert_iterator_to_temporary_avro(data, schema=None, **kwargs): avro = Temp(AVRO)(fn, schema, **kwargs) return append(avro, data, **kwargs) - @convert.register(Iterator, AVRO, cost=1.0) def avro_to_iterator(s, **kwargs): return iter(s) From 58d4e6e7bbdb9e82e092b3e28cd29c130c7e2efe Mon Sep 17 00:00:00 2001 From: Alex Hasha Date: Wed, 30 Dec 2015 09:57:51 -0500 Subject: [PATCH 19/29] Adding dshape to avro schema generation, with doctest --- odo/backends/avro.py | 109 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 108 insertions(+), 1 deletion(-) diff --git a/odo/backends/avro.py b/odo/backends/avro.py index 5ddc4af24..e1c59bff2 100644 --- a/odo/backends/avro.py +++ b/odo/backends/avro.py @@ -8,8 +8,9 @@ from avro.schema import AvroException from multipledispatch import dispatch import pandas as pd -from datashape import discover, var, Record, Map, Var, \ +from 
datashape import dshape, discover, var, Record, Map, Var, \ Option, null, string, int8, int32, int64, float64, float32, boolean, bytes_ +import datashape.coretypes as ct from collections import Iterator from ..append import append from ..convert import convert @@ -45,6 +46,19 @@ AVRO_TYPE_MAP.update(NAMED_TYPES_MAP) AVRO_TYPE_MAP.update(COMPOUND_TYPES_MAP) +dshape_to_avro_primitive_types = { + ct.int8: 'bytes', + ct.int16: 'int', + ct.int32: 'int', + ct.int64: 'long', + ct.float32: 'float', + ct.float64: 'double', + ct.date_: 'long', + ct.datetime_: 'long', + ct.string: 'string', + ct.bool_: 'boolean' +} + class AVRO(object): """Wrapper object for reading and writing an Avro container file @@ -250,6 +264,99 @@ def discover_schema(sch): def discover_avro(f, **kwargs): return discover_schema(f.schema) +def make_avsc_object(ds, name="name", namespace="default", depth=0): + """ + Build Avro Schema from datashape definition + + Parameters + ---------- + ds : str, unicode, DataShape, or datashapes.coretypes.* + : string -- applied to named schema elements (i.e. record, error, fixed, enum) + : string -- applied to named schema elements + depth=0: Tracking parameter for recursion depth. Should not be set by user. + + Examples + -------- + >>> from pprint import pprint + >>> pprint( + ... make_avsc_object( + ... "var * {letter: string, value: ?int32}", name="my_record", namespace="com.blaze.odo" + ... ) + ... ) + {'fields': [{'name': 'letter', 'type': 'string'}, + {'name': 'value', 'type': ['null', 'int']}], + 'name': 'my_record', + 'namespace': 'com.blaze.odo', + 'type': 'record'} + >>> test_dshape = ''' + ... var * { + ... field_1: int32, + ... field_2: string, + ... field_3: ?int64, + ... features: map[string, float64], + ... words: var * string, + ... nested_record: var * {field_1: int64, field_2: float32} + ... } + ... ''' + >>> pprint(make_avsc_object(test_dshape, name="my_record", namespace="com.blaze.odo")) + {'fields': [{'name': 'field_1', 'type': 'int'}, + {'name': 'field_2', 'type': 'string'}, + {'name': 'field_3', 'type': ['null', 'long']}, + {'name': 'features', 'type': {'type': 'map', 'values': 'double'}}, + {'name': 'words', 'type': {'items': 'string', 'type': 'array'}}, + {'name': 'nested_record', + 'type': {'items': {'fields': [{'name': 'field_1', + 'type': 'long'}, + {'name': 'field_2', + 'type': 'float'}], + 'name': 'my_recordd0d1', + 'namespace': 'com.blaze.odo', + 'type': 'record'}, + 'type': 'array'}}], + 'name': 'my_record', + 'namespace': 'com.blaze.odo', + 'type': 'record'} + """ + + try: + assert depth >= 0 + except AssertionError: + raise ValueError("depth argument must be >= 0") + + #parse string to datashape object if necessary + if isinstance(ds, (str, unicode)): + ds = dshape(ds) + if isinstance(ds, ct.DataShape): + if depth>0: + assert isinstance(ds.parameters[0], ct.Var), "Cannot support fixed length substructure in Avro schemas" + return {"type": "array", "items": make_avsc_object(ds.measure, name=name+"d%d" % depth, namespace=namespace, depth=depth+1)} + elif depth==0: + ds = ds.measure + + if isinstance(ds, ct.Record): + return { + "type": "record", + "namespace": namespace, + "name": name, + "fields": [{"type": make_avsc_object(typ, name=name+"d%d" % depth, namespace=namespace, depth=depth+1), + "name": n} for (typ, n) in zip(ds.measure.types, ds.measure.names)] + + } + if isinstance(ds, ct.Map): + assert ds.key == ct.string, "Avro map types only support string keys. 
Cannot form map with key type %s" % ds.key + return { + "type": "map", + "values": make_avsc_object(ds.value, name=name+"d%d" % depth, namespace=namespace, depth=depth+1) + } + + if isinstance(ds, ct.Option): + return ["null", make_avsc_object(ds.ty, name=name+"d%d" % depth, namespace=namespace, depth=depth+1)] + if ds in dshape_to_avro_primitive_types: + return dshape_to_avro_primitive_types[ds] + + raise NotImplementedError("No avro type known for %s" % ds) + + @convert.register(pd.DataFrame, AVRO, cost=4.0) def avro_to_DataFrame(avro, dshape=None, **kwargs): #XXX:AEH:todo - correct for pandas automated type conversions. e.g. strings containing numbers get cast to numeric. From 64753e2dfdd95d1241ca7f4bfed54fa918d8f654 Mon Sep 17 00:00:00 2001 From: Alex Hasha Date: Mon, 4 Jan 2016 14:56:09 -0500 Subject: [PATCH 20/29] Bumping datashape version to fix build --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 16078a1b1..71f55a737 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -datashape >= 0.4.6 +datashape >= 0.5.0 numpy >= 1.7 pandas >= 0.15.0 toolz >= 0.7.4 From 5c2d082d0f2afc2da025899ade0aad74fe5ec52c Mon Sep 17 00:00:00 2001 From: Alex Hasha Date: Thu, 28 Jan 2016 22:09:50 -0500 Subject: [PATCH 21/29] Attempt to fix build for python3 --- .travis.yml | 4 +++- conda.recipe/meta.yaml | 1 + 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 3da61f5fe..be7ab7e64 100644 --- a/.travis.yml +++ b/.travis.yml @@ -59,9 +59,11 @@ install: # Install various deps - conda uninstall toolz - - pip install -U toolz sas7bdat psycopg2 dill 'pymongo<3' avro + - pip install -U toolz sas7bdat psycopg2 dill 'pymongo<3' - pip install --upgrade git+git://github.com/blaze/dask.git#egg=dask-dev[complete] - if [ -n "$PANDAS_VERSION" ]; then pip install $PANDAS_VERSION; fi + - if [[ $TRAVIS_PYTHON_VERSION == '3.4' || $TRAVIS_PYTHON_VERSION == '3.5' ]]; then pip install -U avro-python3; fi + - if [[ $TRAVIS_PYTHON_VERSION == '2.7']]; then pip install -U avro; fi # install pyspark - if [[ $TRAVIS_PYTHON_VERSION == '2.7' || $TRAVIS_PYTHON_VERSION == '3.4' ]]; then conda install spark=$SPARK_VERSION -c blaze -c https://conda.binstar.org/blaze/channel/dev -c anaconda-cluster; fi diff --git a/conda.recipe/meta.yaml b/conda.recipe/meta.yaml index 351f9cc2c..22d84117f 100644 --- a/conda.recipe/meta.yaml +++ b/conda.recipe/meta.yaml @@ -32,6 +32,7 @@ requirements: test: requires: - avro # [py27] + - avro-python3 # [not py27] - pytest - h5py - pytables >=3.0.0 From 78a591ffb4604c51f21e96fe994683ff75eec1d4 Mon Sep 17 00:00:00 2001 From: Alex Hasha Date: Thu, 28 Jan 2016 22:19:32 -0500 Subject: [PATCH 22/29] Whitespace error. 
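bash requires ']]' to be a separate token, so the conditional introduced in
the previous commit, [[ $TRAVIS_PYTHON_VERSION == '2.7']], fails to parse;
adding the space before the closing brackets fixes the build script.
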
--- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index be7ab7e64..4d7233d0a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -63,7 +63,7 @@ install: - pip install --upgrade git+git://github.com/blaze/dask.git#egg=dask-dev[complete] - if [ -n "$PANDAS_VERSION" ]; then pip install $PANDAS_VERSION; fi - if [[ $TRAVIS_PYTHON_VERSION == '3.4' || $TRAVIS_PYTHON_VERSION == '3.5' ]]; then pip install -U avro-python3; fi - - if [[ $TRAVIS_PYTHON_VERSION == '2.7']]; then pip install -U avro; fi + - if [[ $TRAVIS_PYTHON_VERSION == '2.7' ]]; then pip install -U avro; fi # install pyspark - if [[ $TRAVIS_PYTHON_VERSION == '2.7' || $TRAVIS_PYTHON_VERSION == '3.4' ]]; then conda install spark=$SPARK_VERSION -c blaze -c https://conda.binstar.org/blaze/channel/dev -c anaconda-cluster; fi From ee5e6f384fb809decbcf709aaf74df072a99d067 Mon Sep 17 00:00:00 2001 From: Alex Hasha Date: Fri, 29 Jan 2016 00:39:17 -0500 Subject: [PATCH 23/29] Python 3 support * Switching to fastavro for reads, because it's faster and Python 2/3 compatible. * Introducing try blocks around API calls that differ between the avro and avro-python3 libraries. * Stuck with the avro-python3 library for writing because fastavro does not support appending yet. --- .travis.yml | 2 +- odo/backends/avro.py | 44 +++++++++++++++++++++++---------- odo/backends/tests/test_avro.py | 9 +++++-- recommended-requirements.txt | 1 + 4 files changed, 40 insertions(+), 16 deletions(-) diff --git a/.travis.yml b/.travis.yml index 4d7233d0a..88d80e98e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -59,7 +59,7 @@ install: # Install various deps - conda uninstall toolz - - pip install -U toolz sas7bdat psycopg2 dill 'pymongo<3' + - pip install -U toolz sas7bdat psycopg2 dill 'pymongo<3' fastavro - pip install --upgrade git+git://github.com/blaze/dask.git#egg=dask-dev[complete] - if [ -n "$PANDAS_VERSION" ]; then pip install $PANDAS_VERSION; fi - if [[ $TRAVIS_PYTHON_VERSION == '3.4' || $TRAVIS_PYTHON_VERSION == '3.5' ]]; then pip install -U avro-python3; fi diff --git a/odo/backends/avro.py b/odo/backends/avro.py index e1c59bff2..3b0956dec 100644 --- a/odo/backends/avro.py +++ b/odo/backends/avro.py @@ -3,7 +3,8 @@ import errno import os import uuid - +import json +import fastavro from avro import schema, datafile, io from avro.schema import AvroException from multipledispatch import dispatch @@ -17,6 +18,11 @@ from ..resource import resource from ..temp import Temp +try: + from avro.schema import make_avsc_object as schema_from_dict #Python 2.x +except ImportError: + from avro.schema import SchemaFromJSONData as schema_from_dict #Python 3.x + PRIMITIVE_TYPES_MAP = { 'string': string, 'int': int32, @@ -91,7 +97,7 @@ def __init__(self, uri, schema=None, codec='null', **kwargs): self._schema = sch def __iter__(self): - return self.reader + return self.reader.__iter__() def next(self): return self.reader.next() @@ -110,7 +116,7 @@ def _get_writers_schema(self): Extract writers schema embedded in an existing Avro file.
""" reader = self.reader - return schema.parse(self.reader.meta['avro.schema']) if reader else None + return schema_from_dict(self.reader.schema) if reader else None uri = property(lambda self: self._uri) codec = property(lambda self: self._codec) @@ -124,11 +130,10 @@ def reader(self): return self._reader else: try: - rec_reader = io.DatumReader(readers_schema=self.schema) - - df_reader = datafile.DataFileReader( + reader_schema = self.schema.to_json() if self.schema else None + df_reader = fastavro.reader( open(self.uri, 'rb'), - rec_reader + reader_schema=reader_schema ) return df_reader @@ -168,7 +173,12 @@ def _get_append_writer(uri, writers_schema=None): rec_writer ) #Check for embedded schema to ensure existing file is an avro file. - embedded_schema = schema.parse(df_writer.get_meta('avro.schema')) + try: #Python 2.x API + schema_str = df_writer.get_meta('avro.schema') + except AttributeError: #Python 3.x API + schema_str = df_writer.GetMeta('avro.schema').decode("utf-8") + + embedded_schema = schema_from_dict(json.loads(schema_str)) #If writers_schema supplied, check for equality with embedded schema. if writers_schema: @@ -198,11 +208,19 @@ def _get_new_writer(uri, sch): avro.datafile.DataFileWriter """ rec_writer = io.DatumWriter() - df_writer = datafile.DataFileWriter( - open(uri, 'wb'), - rec_writer, - writers_schema = sch - ) + try: #Python 2.x API + df_writer = datafile.DataFileWriter( + open(uri, 'wb'), + rec_writer, + writers_schema = sch + ) + except TypeError: #Python 3.x API + df_writer = datafile.DataFileWriter( + open(uri, 'wb'), + rec_writer, + writer_schema = sch + ) + return df_writer @property diff --git a/odo/backends/tests/test_avro.py b/odo/backends/tests/test_avro.py index c2a8c17ab..f18ac9072 100644 --- a/odo/backends/tests/test_avro.py +++ b/odo/backends/tests/test_avro.py @@ -1,7 +1,12 @@ from __future__ import absolute_import, division, print_function import pytest -schema = pytest.importorskip('avro.schema') + +import avro.schema as schema +try: + from avro.schema import parse #Python 2.x +except ImportError: + from avro.schema import Parse as parse #Python 3.x from collections import Iterator import pandas as pd @@ -150,7 +155,7 @@ def test_require_schema_for_new_file(): assert False def test_append_and_convert_round_trip(temp_output_path): - x = AVRO(temp_output_path, schema=schema.parse(test_schema_str)) + x = AVRO(temp_output_path, schema=parse(test_schema_str)) append(x, test_data) append(x, test_data) assert convert(list, x) == test_data * 2 diff --git a/recommended-requirements.txt b/recommended-requirements.txt index 29b0e5926..cca435404 100644 --- a/recommended-requirements.txt +++ b/recommended-requirements.txt @@ -13,3 +13,4 @@ paramiko pywebhdfs boto avro +fastavro From ff32e9ec84b89cb1c281ebc014758f10b332a9ef Mon Sep 17 00:00:00 2001 From: Alex Hasha Date: Fri, 29 Jan 2016 00:56:07 -0500 Subject: [PATCH 24/29] One more python3 bug I missed. 
--- odo/backends/avro.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/odo/backends/avro.py b/odo/backends/avro.py index 3b0956dec..29fde63cd 100644 --- a/odo/backends/avro.py +++ b/odo/backends/avro.py @@ -5,6 +5,7 @@ import uuid import json import fastavro +import six from avro import schema, datafile, io from avro.schema import AvroException from multipledispatch import dispatch @@ -342,7 +343,7 @@ def make_avsc_object(ds, name="name", namespace="default", depth=0): raise ValueError("depth argument must be >= 0") #parse string to datashape object if necessary - if isinstance(ds, (str, unicode)): + if isinstance(ds, six.string_types) or isinstance(ds, six.text_type): ds = dshape(ds) if isinstance(ds, ct.DataShape): if depth>0: From cbec204b1430b6100c7b44a29155f8683a394bf3 Mon Sep 17 00:00:00 2001 From: Alex Hasha Date: Fri, 29 Jan 2016 09:35:23 -0500 Subject: [PATCH 25/29] Changing doctest not to be format sensitive. --- odo/backends/avro.py | 56 +++++++++++++++++++++----------------------- 1 file changed, 27 insertions(+), 29 deletions(-) diff --git a/odo/backends/avro.py b/odo/backends/avro.py index 29fde63cd..60e77b544 100644 --- a/odo/backends/avro.py +++ b/odo/backends/avro.py @@ -296,17 +296,14 @@ def make_avsc_object(ds, name="name", namespace="default", depth=0): Examples -------- - >>> from pprint import pprint - >>> pprint( - ... make_avsc_object( - ... "var * {letter: string, value: ?int32}", name="my_record", namespace="com.blaze.odo" - ... ) - ... ) - {'fields': [{'name': 'letter', 'type': 'string'}, - {'name': 'value', 'type': ['null', 'int']}], - 'name': 'my_record', - 'namespace': 'com.blaze.odo', - 'type': 'record'} + >>> test_dshape = "var * {letter: string, value: ?int32}" + >>> x = make_avsc_object(test_dshape, name="my_record", namespace="com.blaze.odo") + >>> x == {'fields': [{'name': 'letter', 'type': 'string'}, + ... {'name': 'value', 'type': ['null', 'int']}], + ... 'name': 'my_record', + ... 'namespace': 'com.blaze.odo', + ... 'type': 'record'} + True >>> test_dshape = ''' ... var * { ... field_1: int32, @@ -317,24 +314,25 @@ def make_avsc_object(ds, name="name", namespace="default", depth=0): ... nested_record: var * {field_1: int64, field_2: float32} ... } ... ''' - >>> pprint(make_avsc_object(test_dshape, name="my_record", namespace="com.blaze.odo")) - {'fields': [{'name': 'field_1', 'type': 'int'}, - {'name': 'field_2', 'type': 'string'}, - {'name': 'field_3', 'type': ['null', 'long']}, - {'name': 'features', 'type': {'type': 'map', 'values': 'double'}}, - {'name': 'words', 'type': {'items': 'string', 'type': 'array'}}, - {'name': 'nested_record', - 'type': {'items': {'fields': [{'name': 'field_1', - 'type': 'long'}, - {'name': 'field_2', - 'type': 'float'}], - 'name': 'my_recordd0d1', - 'namespace': 'com.blaze.odo', - 'type': 'record'}, - 'type': 'array'}}], - 'name': 'my_record', - 'namespace': 'com.blaze.odo', - 'type': 'record'} + >>> x = make_avsc_object(test_dshape, name="my_record", namespace="com.blaze.odo") + >>> x == {'fields': [{'name': 'field_1', 'type': 'int'}, + ... {'name': 'field_2', 'type': 'string'}, + ... {'name': 'field_3', 'type': ['null', 'long']}, + ... {'name': 'features', 'type': {'type': 'map', 'values': 'double'}}, + ... {'name': 'words', 'type': {'items': 'string', 'type': 'array'}}, + ... {'name': 'nested_record', + ... 'type': {'items': {'fields': [{'name': 'field_1', + ... 'type': 'long'}, + ... {'name': 'field_2', + ... 'type': 'float'}], + ... 'name': 'my_recordd0d1', + ... 
'namespace': 'com.blaze.odo', + ... 'type': 'record'}, + ... 'type': 'array'}}], + ... 'name': 'my_record', + ... 'namespace': 'com.blaze.odo', + ... 'type': 'record'} + True """ From 060c2b4c89a97661604c9bb1362d3a97a349caa9 Mon Sep 17 00:00:00 2001 From: Alex Hasha Date: Tue, 2 Feb 2016 18:48:48 -0500 Subject: [PATCH 26/29] Updating avro type mappings --- odo/backends/avro.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/odo/backends/avro.py b/odo/backends/avro.py index 60e77b544..e3222433d 100644 --- a/odo/backends/avro.py +++ b/odo/backends/avro.py @@ -25,14 +25,14 @@ from avro.schema import SchemaFromJSONData as schema_from_dict #Python 3.x PRIMITIVE_TYPES_MAP = { + 'null': null, + 'boolean': boolean, 'string': string, + 'bytes': int8, 'int': int32, 'long': int64, - 'null': null, 'double': float64, 'float': float32, - 'boolean': boolean, - 'record': Record, } NAMED_TYPES_MAP = { @@ -45,6 +45,9 @@ COMPOUND_TYPES_MAP = { 'array': Var, 'map': Map, + 'union': null, #TODO + 'request': null, #TODO + 'error_union': null, #TODO } From 32900b9bf55b885e6074b1ff400edbb7becf7f15 Mon Sep 17 00:00:00 2001 From: Alex Hasha Date: Tue, 2 Feb 2016 18:51:18 -0500 Subject: [PATCH 27/29] Changing AVRO.uri attribute to AVRO.path, and allowing codec keyword argument to be passed to resource. --- odo/backends/avro.py | 32 +++++++++++++++++--------------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/odo/backends/avro.py b/odo/backends/avro.py index e3222433d..c6835e5f5 100644 --- a/odo/backends/avro.py +++ b/odo/backends/avro.py @@ -89,8 +89,8 @@ class AVRO(object): compression codec. Valid values: 'null', 'deflate', 'snappy' """ - def __init__(self, uri, schema=None, codec='null', **kwargs): - self._uri = uri + def __init__(self, path, schema=None, codec='null', **kwargs): + self._path = path self._schema = schema self._codec = codec @@ -122,7 +122,7 @@ def _get_writers_schema(self): reader = self.reader return schema_from_dict(self.reader.schema) if reader else None - uri = property(lambda self: self._uri) + path = property(lambda self: self._path) codec = property(lambda self: self._codec) schema = property(lambda self: self._schema) @@ -136,7 +136,7 @@ def reader(self): return self._reader else: try: reader_schema = self.schema.to_json() if self.schema else None df_reader = fastavro.reader( - open(self.uri, 'rb'), + open(self.path, 'rb'), reader_schema=reader_schema ) @@ -150,7 +150,7 @@ def reader(self): return None @staticmethod - def _get_append_writer(uri, writers_schema=None): + def _get_append_writer(uri, writers_schema=None, codec='null'): """ Returns an instance of avro.datafile.DataFileWriter for appending to an existing avro file at `uri`. Does not take a writers schema, rec_writer = io.DatumWriter() df_writer = datafile.DataFileWriter( open(uri, 'a+b'), - rec_writer + rec_writer, + codec=codec ) #Check for embedded schema to ensure existing file is an avro file. try: #Python 2.x API @@ -195,7 +197,7 @@ def _get_append_writer(uri, writers_schema=None): return df_writer @staticmethod - def _get_new_writer(uri, sch): + def _get_new_writer(uri, sch, codec='null'): """ Returns an instance of avro.datafile.DataFileWriter for writing to a new avro file at `uri`.
@@ -216,13 +217,15 @@ def _get_new_writer(uri, sch): df_writer = datafile.DataFileWriter( open(uri, 'wb'), rec_writer, - writers_schema = sch + writers_schema = sch, + codec=codec ) except TypeError: #Python 3.x API df_writer = datafile.DataFileWriter( open(uri, 'wb'), rec_writer, - writer_schema = sch + writer_schema = sch, + codec=codec ) return df_writer @@ -232,10 +235,10 @@ def writer(self): if hasattr(self, '_writer'): return self._writer else: - if os.path.exists(self.uri) and os.path.getsize(self.uri) > 0: - df_writer = self._get_append_writer(self.uri, self.schema) + if os.path.exists(self.path) and os.path.getsize(self.path) > 0: + df_writer = self._get_append_writer(self.path, self.schema, codec=self.codec) else: - df_writer = self._get_new_writer(self.uri, self.schema) + df_writer = self._get_new_writer(self.path, self.schema, codec=self.codec) self._writer = df_writer return df_writer @@ -244,10 +247,9 @@ def flush(self): self._writer.close() del(self._writer) - @resource.register('.+\.(avro)') -def resource_avro(uri, schema=None, **kwargs): - return AVRO(uri, schema=schema, **kwargs) +def resource_avro(uri, schema=None, codec='null', **kwargs): + return AVRO(uri, schema=schema, codec=codec, **kwargs) @dispatch(schema.RecordSchema) def discover_schema(sch): From 95724cf07ffbf2e45c3eea586029549602948afc Mon Sep 17 00:00:00 2001 From: Alex Hasha Date: Tue, 2 Feb 2016 18:53:54 -0500 Subject: [PATCH 28/29] Dropping requirement for schema to be defined for new AVRO resource on initialization --- odo/backends/avro.py | 18 ++++++++++-------- odo/backends/tests/test_avro.py | 5 +++-- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/odo/backends/avro.py b/odo/backends/avro.py index c6835e5f5..7897458f5 100644 --- a/odo/backends/avro.py +++ b/odo/backends/avro.py @@ -94,12 +94,6 @@ def __init__(self, path, schema=None, codec='null', **kwargs): self._schema = schema self._codec = codec - if not schema: - sch = self._get_writers_schema() - if sch is None: - raise AvroException("Couldn't extract writers schema from '{0}'. User must provide a valid schema".format(uri)) - self._schema = sch - def __iter__(self): return self.reader.__iter__() @@ -124,7 +118,15 @@ def _get_writers_schema(self): path = property(lambda self: self._path) codec = property(lambda self: self._codec) - schema = property(lambda self: self._schema) + + @property + def schema(self): + if not self._schema: + sch = self._get_writers_schema() + if sch is None: + raise AvroException("Couldn't extract writers schema from '{0}'. 
User must provide a valid schema".format(self.path)) + self._schema = sch + return self._schema @property def reader(self): @@ -134,7 +136,7 @@ def reader(self): return self._reader else: try: - reader_schema = self.schema.to_json() if self.schema else None + reader_schema = self._schema.to_json() if self._schema else None df_reader = fastavro.reader( open(self.path, 'rb'), reader_schema=reader_schema diff --git a/odo/backends/tests/test_avro.py b/odo/backends/tests/test_avro.py index f18ac9072..7b59f72f5 100644 --- a/odo/backends/tests/test_avro.py +++ b/odo/backends/tests/test_avro.py @@ -148,11 +148,12 @@ def test_convert_avro_to_iterator(avrofile): def test_require_schema_for_new_file(): try: x = AVRO("doesntexist.avro") + sch = x.schema assert False, "Previous line should throw a schema.AvroException" except schema.AvroException: assert True - except Exception: - assert False + except Exception as e: + assert False, "Expected AvroException, got exception of type %s" % str(type(e)) def test_append_and_convert_round_trip(temp_output_path): x = AVRO(temp_output_path, schema=parse(test_schema_str)) From 264fca2ed40f743ecf8aee7622c01951890c2d85 Mon Sep 17 00:00:00 2001 From: Alex Hasha Date: Mon, 27 Jun 2016 09:25:01 -0400 Subject: [PATCH 29/29] Adding fastavro to meta.yaml --- conda.recipe/meta.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/conda.recipe/meta.yaml b/conda.recipe/meta.yaml index 7abe675e6..75dbb8b34 100644 --- a/conda.recipe/meta.yaml +++ b/conda.recipe/meta.yaml @@ -33,6 +33,7 @@ test: requires: - avro # [py27] - avro-python3 # [not py27] + - fastavro - pytest - h5py - pytables >=3.0.0
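A minimal end-to-end sketch of what this series enables, useful as a smoke test (assumes the series is applied and fastavro is installed; mydata.avro is a hypothetical path, not a file shipped with odo):

    import pandas as pd
    from odo import odo

    # resource() matches the .avro extension and wraps the file in the AVRO
    # class; convert() then uses the registered AVRO -> DataFrame path.
    df = odo('mydata.avro', pd.DataFrame)

    # A brand-new file needs a writers schema; keyword arguments to odo are
    # forwarded to resource(), e.g.:
    # odo(records, 'new.avro', schema=parse(schema_json))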