diff --git a/CHANGES.rst b/CHANGES.rst index ffd70ad..ef94be0 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -2,6 +2,38 @@ Changelog --------- +2.5.x (unreleased) +~~~~~~~~~~~~~~~~~~ + +* Simplify signature of :meth:`Model.search`: arguments ``order``, + ``limit`` and ``offset`` become keyword-only, and undocumented + argument ``reverse`` is abandoned. + +* New: :meth:`Model.search` returns a :class:`RecordList` which is + lazily evaluated. API method is called only when needed: if attributes + are read or methods are called. It will use `search_read` API method + when it's adequate. + +* New: extracting a part of a lazy :class:`RecordList` will not call + API method, for simple use cases like + ``env['account.move'].search([])[10:90]``. It will set + ``offset`` and ``limit`` on the prepared search instead. + +* Remove undocumented :meth:`Env._web`. + +* Refactor code for ``read`` field formatter. + +* Refactor :class:`Record` and :class:`RecordList` constructors: + get rid of ``__new__`` and simplify. + +* Add private method :meth:`Record._invalidate_cache` to implement + :meth:`Record.refresh`. + +* Property :attr:`Client.verbose` to enable/disable logging. + +* Support ``odooly https://demo.odoo.com`` for simplicity. + + 2.4.7 (2025-11-07) ~~~~~~~~~~~~~~~~~~ diff --git a/README.rst b/README.rst index 43a4f12..a2d86f4 100644 --- a/README.rst +++ b/README.rst @@ -117,7 +117,7 @@ This is a sample session:: It's also possible to set verbosity from the interactive prompt:: - >>> client._printer.cols = 180 + >>> client.verbose = 180 # Width in columns .. note:: diff --git a/odooly.py b/odooly.py index 1b05019..bfee9fa 100644 --- a/odooly.py +++ b/odooly.py @@ -218,14 +218,6 @@ def literal_eval(expression, _octal_digits=frozenset('01234567')): return value -def is_list_of_dict(iterator): - """Return True if the first non-false item is a dict.""" - for item in iterator: - if item: - return isinstance(item, dict) - return False - - def format_params(params, hide=('passw', 'pwd')): secret = {key: ... for key in params if any(sub in key for sub in hide)} return [f'{key}={v!r}' if v != ... else f'{key}=*' @@ -398,21 +390,6 @@ def searchargs(params, kwargs=None): return params -def readfmt(arg): - if '}' in arg: - fields = [re.match(r'\w+', tup[1]).group(0) - for tup in Formatter().parse(arg) if tup[1]] - formatter = arg.format_map - elif '%(' in arg: - fields = re.findall(r'(? ("zip", "city") - fields = arg.split() - formatter = (lambda d: d[fields[0]]) if len(fields) == 1 else None - return fields, formatter - - def parse_http_response(method, result, regex): if method == 'HEAD': return result.url @@ -437,15 +414,13 @@ class ServerError(Exception): class Printer: - def __init__(self, cols): - self.cols = MAXCOL[min(3, cols) - 1] if (cols or 9) < 9 else cols or None + cols = None def _print_(self, message, _prefix): - cols = max(36, self.cols) xch = str(message) - if len(xch) > cols: + if len(xch) > self.cols: suffix = f"... L={len(xch)}" - xch = xch[:cols - len(suffix)] + suffix + xch = xch[:self.cols - len(suffix)] + suffix print(f"{_prefix} {xch}") print_sent = functools.partialmethod(_print_, _prefix='-->') @@ -660,7 +635,6 @@ def __new__(cls, client, db_name=()): if db_name: env._model_names = env._cache_get('model_names', dict) env._models = {} - env._web = client.web return env def __contains__(self, name): @@ -768,7 +742,7 @@ def _configure(self, uid, user, password, api_key, context, session): if isinstance(user, Record): user = user.login env.uid = uid - env.user = env._get('res.users', False).browse(uid) + env.user = Record(env._get('res.users', False), uid) env.context = dict(context) env.session_info = session if user: @@ -814,7 +788,7 @@ def __call__(self, user=None, password=None, api_key=None, context=None): if user is not None: (uid, password, session) = self._check_user_password(user, password, api_key) if context is None: - context = session['user_context'] + context = session.get('user_context') or {} elif context is not None: (uid, user, session) = (self.uid, self.user, self.session_info) else: @@ -843,7 +817,7 @@ def ref(self, xml_id): [('module', '=', module), ('name', '=', name)], 'model res_id') if data: assert len(data) == 1 - return self[data[0]['model']].browse(data[0]['res_id']) + return Record(self[data[0]['model']], data[0]['res_id']) @property def lang(self): @@ -998,11 +972,7 @@ def models(self, name='', transient=False): fld_transient = 'transient' if 'transient' in ir_model._keys else 'osv_memory' domain = [('abstract', '=', False)] if 'abstract' in ir_model._keys else [] # Odoo 19 try: - if self.client.version_info < 8.0: - recs = ir_model.search(domain) - models = ir_model.read(recs.ids, ('model', fld_transient)) - else: - models = ir_model.search_read(domain, ('model', fld_transient)) + models = ir_model.search_read(domain, ('model', fld_transient)) except ServerError: # Only Odoo 15 prevents non-admin user to retrieve models models = ir_model.get_available_models() if self.client.version_info >= 16.0 else {} @@ -1178,7 +1148,7 @@ def generate_api_key(self): key_vals = {'name': f'Created by Odooly {__version__}'} wiz = self["res.users.apikeys.description"].create(key_vals) res = wiz.make_key() - self.user.refresh() + self.user._invalidate_cache() assert res['res_model'] == "res.users.apikeys.show" return self.set_api_key(res['context']['default_key']) @@ -1202,19 +1172,29 @@ class Client: def __init__(self, server, db=None, user=None, password=None, api_key=None, transport=None, verbose=False): self._http = HTTPSession() + self._printer = Printer() self._session_uid = None - self._set_services(server, db, transport, verbose) + self.verbose = verbose + self._set_services(server, db, transport) self.env = Env(self) if user: # Try to login self.login(user, password=password, api_key=api_key, database=db) - def _set_services(self, server, db, transport, cols): + @property + def verbose(self): + return self._printer.cols + + @verbose.setter + def verbose(self, cols): + cols = MAXCOL[min(3, cols) - 1] if (cols or 9) < 9 else cols + self._printer.cols = cols and max(36, cols) or None + + def _set_services(self, server, db, transport): if isinstance(server, list): appname = Path(__file__).name.rstrip('co') server = start_odoo_services(server, appname=appname) elif isinstance(server, str) and server[-1:] == '/': server = server.rstrip('/') - self._printer = Printer(cols) self._server = server self._connections = [] @@ -1357,7 +1337,7 @@ def from_config(cls, environment, user=None, verbose=False): password = None try: client = Env._cache[Env, db, server].client - client._printer.__init__(verbose) + client.verbose = verbose client.login(user or conf_user, password=password, api_key=api_key) except KeyError: client = cls(server, db, user or conf_user, password=password, api_key=api_key, verbose=verbose) @@ -1424,7 +1404,7 @@ def _authenticate_web(self, **kw): # 5. Submit TOTP params = {'csrf_token': csrf, 'totp_token': token, 'remember': 1} rv, session_info = self._request_parse('/web/login/totp', data=params) - return session_info + return session_info if session_info.get('username') == kw['login'] else {'uid': None} def _authenticate_system(self): __, session_info = self._request_parse('/web/become') @@ -1486,22 +1466,22 @@ def login(self, user, password=None, database=None, api_key=None): if not self._is_interactive(): return self._login(user, password=password, database=database, api_key=api_key) try: - self._login(user, password=password, database=database, api_key=api_key) + register = self._login(user, password=password, database=database, api_key=api_key) except Error as exc: print(exc) - else: - # Register the new globals() - self.connect() + register = 'client' not in self._globals + # Register the globals() + register and self.connect() def connect(self, env_name=None, *, server=None, user=None): """Connect to another environment and replace the globals().""" assert self._is_interactive(), 'Not available' if env_name: - self.from_config(env_name, user=user, verbose=self._printer.cols) + self.from_config(env_name, user=user, verbose=self.verbose) elif server: if not user and self.env.uid: user = self.env.user.login - self.__class__(server, user=user, verbose=self._printer.cols) + self.__class__(server, user=user, verbose=self.verbose) else: assert not user, "Use client.login(...) instead" self._globals['client'] = self.env.client @@ -1602,8 +1582,6 @@ def drop_database(self, passwd, database): class BaseModel: - ids = () - def sudo(self, user=None): """Attach to the provided user, or Superuser.""" return self.with_env(self.env.sudo(user=user)) @@ -1693,18 +1671,28 @@ def browse(self, ids=()): If it is a single integer, the return value is a :class:`Record`. Otherwise, the return value is a :class:`RecordList`. """ - return BaseRecord(self, ids) + if isinstance(ids, int) or (len(ids) == 2 and isinstance(ids[1], str)): + return Record(self, ids) + return RecordList(self, ids) - def search(self, domain, *params, **kwargs): + def search(self, domain, **kwargs): """Search for records in the `domain`.""" - reverse = kwargs.pop('reverse', False) - ids = self._execute('search', domain, *params, **kwargs) - return RecordList(self, ids[::-1] if reverse else ids) + return RecordList._prepared(self, domain, kwargs) def search_count(self, domain=None): """Count the records in the `domain`.""" return self._execute('search_count', domain or []) + def search_read(self, domain=None, fields=None, **kwargs): + """Combine search and read.""" + fields, fmt = self._parse_format(fields, browse=False) + if self.env.client.version_info < 8.0: + ids = self._execute('search', domain or [], **kwargs) + res = self._execute('read', ids, fields) + else: + res = self._execute('search_read', domain or [], fields, **kwargs) + return fmt(res) + def get(self, domain, *args, **kwargs): """Return a single :class:`Record`. @@ -1747,7 +1735,7 @@ def create(self, values): else: # Odoo >= 12 values = [self._unbrowse_values(vals) for vals in values] new_ids = self._execute('create', values) - return Record(self, new_ids) + return self.browse(new_ids) def read(self, *params, **kwargs): """Wrapper for ``client.execute(model, 'read', [...], ('a', 'b'))``. @@ -1775,14 +1763,59 @@ def read(self, *params, **kwargs): The optional keyword arguments `offset`, `limit` and `order` are used to restrict the search. """ - fmt = None - if len(params) > 1 and isinstance(params[1], str): - fields, fmt = readfmt(params[1]) + arg = params[1] if len(params) > 1 else None + fields, fmt = self._parse_format(arg, browse=False) + if arg is not None: params = (params[0], fields) + params[2:] res = self._execute('read', *params, **kwargs) - if not fmt or not res: - return res - return [(d and fmt(d)) for d in res] if isinstance(res, list) else fmt(res) + return fmt(res) if isinstance(res, list) else fmt([res])[0] + + def _parse_format(self, arg, browse=True): + if not isinstance(arg, str): + fields, formatter = arg, None + elif '}' in arg: + fields = [re.match(r'\w+', tup[1]).group(0) + for tup in Formatter().parse(arg) if tup[1]] + formatter = arg.format_map + elif '%(' in arg: + fields, formatter = re.findall(r'(? ["zip", "city"] + fields = arg.split() + formatter = (lambda d: d[fields[0]]) if len(fields) == 1 else None + + lst_format = lambda values: [(val and formatter(val)) for val in values] + if browse: + if not formatter: + formatter = self._browse_values + elif fields == arg.split(): + fspec = self.field(fields[0]) + if 'relation' in fspec: + rel_model = self.env._get(fspec['relation'], False) + if fspec['type'] == 'many2one': + m_browse = partial(RecordList, rel_model) + else: + + def m_browse(values): + if not values: + return RecordList(rel_model, ()) + return [RecordList(rel_model, val or ()) for val in values] + + def lst_format(values): + return m_browse([(val and formatter(val)) for val in values]) + + elif fspec['type'] == 'reference': + + def formatter(dic): + value = dic[fields[0]] + if not value: + return value + (res_model, res_id) = value.split(',') + rel_model = self.env._get(res_model, False) + return Record(rel_model, int(res_id)) + elif not formatter: + lst_format = lambda values: values + + return fields, lst_format def _browse_values(self, values): """Wrap the values of a Record. @@ -1804,7 +1837,7 @@ def _browse_values(self, values): else: continue rel_model = self.env._get(res_model, False) - values[key] = BaseRecord(rel_model, value) + values[key] = rel_model.browse(value) return values def _unbrowse_values(self, values): @@ -1836,12 +1869,8 @@ def _get_external_ids(self, ids=None): search_domain = [('model', '=', self._name)] if ids is not None: search_domain.append(('res_id', 'in', ids)) - existing = self.env['ir.model.data'].read(search_domain, - ['module', 'name', 'res_id']) - res = {} - for rec in existing: - res[f"{rec['module']}.{rec['name']}"] = self.get(rec['res_id']) - return res + existing = self.env['ir.model.data'].read(search_domain, 'module name res_id') + return {f"{rec['module']}.{rec['name']}": self.get(rec['res_id']) for rec in existing} def __getattr__(self, attr): if attr == '_fields': @@ -1863,53 +1892,18 @@ def wrapper(self, *params, **kwargs): class BaseRecord(BaseModel): - def __new__(cls, res_model, arg): - if isinstance(arg, int): - inst = object.__new__(Record) - name = None - idnames = [arg] - ids = [arg] - elif len(arg) == 2 and isinstance(arg[1], str): - inst = object.__new__(Record) - (arg, name) = arg - idnames = [(arg, name)] - ids = [arg] - else: - inst = object.__new__(RecordList) - idnames = arg or () - ids = list(idnames) - for index, id_ in enumerate(arg): - if isinstance(id_, (list, tuple)): - ids[index] = id_ = id_[0] - assert isinstance(id_, int), repr(id_) - arg = ids - attrs = { - 'id': arg, - 'ids': ids, - 'env': res_model.env, - '_name': res_model._name, - '_model': res_model, - '_idnames': idnames, - '_execute': res_model._execute, - } - if isinstance(inst, Record): - attrs['_cached_keys'] = set() - if name is not None: - attrs['_Record__name'] = attrs['display_name'] = name - # Bypass the __setattr__ method - inst.__dict__.update(attrs) - return inst + def __init__(self, res_model, arg): + attrs = {'_name': res_model._name, '_model': res_model, + 'env': res_model.env, '_execute': res_model._execute} + # Bypass __setattr__ method + self.__dict__.update(attrs) def __repr__(self): - if len(self.ids) > 16: - ids = f'length={len(self.ids)}' - else: - ids = self.id + ids = f'length={len(self.ids)}' if len(self.ids) > 6 else self.id return f"<{self.__class__.__name__} '{self._name},{ids}'>" def __dir__(self): - attrs = set(self.__dict__) | set(self._model._keys) - return sorted(attrs) + return sorted(set(self.__dict__) | set(self._model._keys)) def __bool__(self): return bool(self.ids) @@ -1919,13 +1913,10 @@ def __len__(self): def __getitem__(self, key): idname = self._idnames[key] - if idname is False and not isinstance(key, slice): - return False - return BaseRecord(self._model, idname) + return self._model.browse(idname) if idname is not False else False def __iter__(self): - for idname in self._idnames: - yield BaseRecord(self._model, idname) + yield from (Record(self._model, idname) for idname in self._idnames) def __contains__(self, item): if isinstance(item, BaseRecord): @@ -1940,7 +1931,7 @@ def __sub__(self, other): other_ids = set(other.ids) ids = [idn for (id_, idn) in zip(self.ids, self._idnames) if id_ not in other_ids] - return BaseRecord(self._model, ids) + return RecordList(self._model, ids) def __and__(self, other): self._check_model(other, '&') @@ -1948,7 +1939,7 @@ def __and__(self, other): self_set = self.union() ids = [idn for (id_, idn) in zip(self_set.ids, self_set._idnames) if id_ in other_ids] - return BaseRecord(self._model, ids) + return RecordList(self._model, ids) def __or__(self, other): return self.union(other) @@ -1987,7 +1978,7 @@ def _keys(self): def _fields(self): return self._model._fields - def refresh(self): + def _invalidate_cache(self): pass def ensure_one(self): @@ -2013,7 +2004,7 @@ def exists(self): ids = self.ids and self._execute(method, arg, context={'active_test': False}) if ids and not isinstance(self.id, list): ids = ids[0] - return BaseRecord(self._model, ids) + return self._model.browse(ids) def get_metadata(self): """Read the metadata of the record(s) @@ -2042,7 +2033,7 @@ def _concat_ids(self, args): def concat(self, *args): """Return the concatenation of all records.""" ids = self._concat_ids(args) - return BaseRecord(self._model, ids) + return RecordList(self._model, ids) def union(self, *args): """Return the union of all records. @@ -2058,7 +2049,7 @@ def union(self, *args): if id_ not in seen and not seen.add(id_) and id_: uniqids.append((id_, name) if name else id_) ids = uniqids - return BaseRecord(self._model, ids) + return RecordList(self._model, ids) @classmethod def _union(cls, args): @@ -2102,7 +2093,7 @@ def filtered(self, func): return self & self._model.search([('id', 'in', self.ids)] + func) else: ids = self[:]._filter(func.split('.')) if func else self._idnames - return BaseRecord(self._model, ids) + return RecordList(self._model, ids) def sorted(self, key=None, reverse=False): """Return the records sorted by ``key``.""" @@ -2111,16 +2102,14 @@ def sorted(self, key=None, reverse=False): return recs if key is None: idnames = dict(zip(recs.ids, recs._idnames)) - recs = self._model.search([('id', 'in', recs.ids)], - reverse=reverse) + recs = self._model.search([('id', 'in', recs.ids)]) ids = [idnames[id_] for id_ in recs.ids] elif isinstance(key, str): - vals = sorted(zip(recs.read(key), recs._idnames), reverse=reverse) + vals = sorted(zip(recs.read(key), recs._idnames)) ids = [idn for (__, idn) in vals] else: - ids = [rec._idnames[0] - for rec in sorted(recs, key=key, reverse=reverse)] - return BaseRecord(self._model, ids) + ids = [rec._idnames[0] for rec in sorted(recs, key=key)] + return RecordList(self._model, ids[::-1] if reverse else ids) def write(self, values): """Write the `values` in the record(s). @@ -2131,17 +2120,15 @@ def write(self, values): if not self.id: return True values = self._model._unbrowse_values(values) - rv = self._execute('write', self.ids, values) - self.refresh() - return rv + self._invalidate_cache() + return self._execute('write', self.ids, values) def unlink(self): """Delete the record(s) from the database.""" if not self.id: return True - rv = self._execute('unlink', self.ids) - self.refresh() - return rv + self._invalidate_cache() + return self._execute('unlink', self.ids) class RecordList(BaseRecord): @@ -2155,38 +2142,77 @@ class RecordList(BaseRecord): to assign a single value to all the selected records. """ + def __init__(self, res_model, arg, search=None): + super().__init__(res_model, arg) + if search is None: + idnames = arg or () + ids = list(idnames) + for index, id_ in enumerate(arg): + if isinstance(id_, (list, tuple)): + ids[index] = id_ = id_[0] + assert isinstance(id_, int), repr(id_) + self.__dict__.update({'id': ids, 'ids': ids, '_idnames': idnames, '_search_args': None}) + else: + self.__dict__['_search_args'] = search + + @classmethod + def _prepared(cls, res_model, domain, params): + [domain] = searchargs((domain,)) + return cls(res_model, None, search={'domain': domain, **params}) + + def refresh(self): + """Reset :class:`RecordList` content.""" + if self._search_args: + for key in 'id', 'ids', '_idnames': + self.__dict__.pop(key, None) + + def __getitem__(self, key): + if 'id' in self.__dict__ or (getattr(key, 'start', -1) or 0) < 0: + return super().__getitem__(key) + is_stop_positive = key.stop is not None and key.stop >= 0 + search_args = {**self._search_args} + new_offset, new_stop = key.start or 0, search_args.get('limit', None) + if is_stop_positive: + new_stop = key.stop if new_stop is None else min(new_stop, key.stop) + if new_stop is not None: + search_args['limit'] = limit = new_stop - new_offset + if limit <= 0: + return RecordList(self._model, []) + if new_offset: + search_args['offset'] = (search_args.get('offset') or 0) + new_offset + result = RecordList(self._model, None, search=search_args) + if (is_stop_positive or key.stop is None) and key.step in (1, None): + return result + key = slice(None, limit if is_stop_positive else key.stop, key.step) + return super(RecordList, result).__getitem__(key) + + def with_env(self, env): + if 'id' in self.__dict__: + return super().with_env(env) + return RecordList(env[self._name], None, {**self._search_args}) + def read(self, fields=None): """Read the `fields` of the :class:`RecordList`. The argument `fields` accepts different kinds of values. See :meth:`Model.read` for details. """ - if self.id: - values = self._model.read(self.id, fields, order=True) - if is_list_of_dict(values): - browse_values = self._model._browse_values - return [v and browse_values(v) for v in values] + fields, fmt = self._model._parse_format(fields) + + if fields == ['id']: + values = [{'id': res_id} for res_id in self.ids] + elif 'id' not in self.__dict__: + params = {**self._search_args} + domain = params.pop('domain') + values = self._model.search_read(domain, fields, **params) + ids = idnames = [val['id'] for val in values] + if values and 'display_name' in values[0]: + idnames = [(val['id'], val['display_name']) for val in values] + self.__dict__.update({'id': ids, 'ids': ids, '_idnames': idnames}) else: - values = [] - - if isinstance(fields, str): - field = self._model._fields.get(fields) - if field: - if 'relation' in field: - rel_model = self.env._get(field['relation'], False) - if not values or field['type'] == 'many2one': - return RecordList(rel_model, values) - return [RecordList(rel_model, v) for v in values] - if field['type'] == 'reference': - records = [] - for value in values: - if value: - (res_model, res_id) = value.split(',') - rel_model = self.env._get(res_model, False) - value = Record(rel_model, int(res_id)) - records.append(value) - return records - return values + values = self._model.read(self.id, fields, order=True) if self.id else [] + + return fmt(values) def copy(self, default=None): """Copy records and return :class:`RecordList`. @@ -2196,8 +2222,7 @@ def copy(self, default=None): Supported since Odoo 18. """ - if default: - default = self._model._unbrowse_values(default) + default = default and self._model._unbrowse_values(default) new_ids = self._execute('copy', self.ids, default) return RecordList(self._model, new_ids) @@ -2214,6 +2239,11 @@ def _external_id(self): return [xml_ids.get(res_id, False) for res_id in self.id] def __getattr__(self, attr): + if attr in ('id', 'ids', '_idnames'): + params = {**self._search_args} + ids = self._execute('search', params.pop('domain'), **params) + self.__dict__.update({'id': ids, 'ids': ids, '_idnames': ids}) + return self.__dict__[attr] if attr in self._model._keys: return self.read(attr) if attr.startswith('_'): @@ -2246,6 +2276,17 @@ class Record(BaseRecord): The Record's cache is invalidated if any attribute is changed. """ + def __init__(self, res_model, arg): + super().__init__(res_model, arg) + if isinstance(arg, int): + name, idnames = None, [arg] + else: + idnames = [(arg, name)] = [arg] + attrs = {'id': arg, 'ids': [arg], '_idnames': idnames, '_cached_keys': set()} + if name is not None: + attrs['_Record__name'] = attrs['display_name'] = name + self.__dict__.update(attrs) + def __str__(self): return self.__name if self.id else 'False' @@ -2262,6 +2303,9 @@ def _get_name(self): def refresh(self): """Force refreshing the record's data.""" + self._invalidate_cache() + + def _invalidate_cache(self): self._cached_keys.discard('id') for key in self._cached_keys: delattr(self, key) @@ -2292,8 +2336,7 @@ def copy(self, default=None): The optional argument `default` is a mapping which overrides some values of the new record. """ - if default: - default = self._model._unbrowse_values(default) + default = default and self._model._unbrowse_values(default) new_id = self._execute('copy', self.id, default) if isinstance(new_id, list): [new_id] = new_id or [False] @@ -2302,7 +2345,7 @@ def copy(self, default=None): def _send(self, signal): """Trigger workflow `signal` for this :class:`Record`.""" assert self.env.client.version_info < 11.0, 'Not supported' - self.refresh() + self._invalidate_cache() return self.env.exec_workflow(self._name, signal, self.id) @property @@ -2337,7 +2380,7 @@ def __getattr__(self, attr): def wrapper(self, *params, **kwargs): """Wrapper for client.execute({!r}, {!r}, {:d}, *params, **kwargs).""" res = self._execute(attr, [self.id], *params, **kwargs) - self.refresh() + self._invalidate_cache() if isinstance(res, list) and len(res) == 1: return res[0] return res @@ -2372,11 +2415,8 @@ def excepthook(exc_type, exc, tb): print(msg.strip()) sys.excepthook = excepthook - class Usage: - def __call__(self): - print(usage) - __repr__ = lambda s: usage - builtins.usage = Usage() + builtins.usage = type('Usage', (), {'__call__': lambda s: print(usage), + '__repr__': lambda s: usage})() try: import readline as rl @@ -2454,7 +2494,7 @@ def main(interact=_interact): if not args.server: args.server = ['-c', args.config] if args.config else DEFAULT_URL if domain and not args.model: - args.server = args.server + domain if args.config else domain + args.server = args.server + domain if args.config else "".join(domain) if not args.user: args.user = ADMIN_USER client = Client(args.server, args.db, args.user, password=args.password, @@ -2472,8 +2512,6 @@ def main(interact=_interact): writer.writerows(data or ()) if client._is_interactive(): - if not client.env.uid: - client.connect() return interact(global_vars) if interact else global_vars diff --git a/tests/_common.py b/tests/_common.py index 05bf1c4..9dbcb43 100644 --- a/tests/_common.py +++ b/tests/_common.py @@ -1,11 +1,10 @@ from io import BytesIO -from urllib.request import HTTPError +from urllib.request import HTTPError, urljoin from unittest import mock, TestCase -from unittest.mock import call, sentinel +from unittest.mock import ANY, call, sentinel import odooly -sample_context = {'lang': 'en_US', 'tz': 'Europe/Zurich'} type_call = type(call) @@ -20,22 +19,24 @@ def popvalue(self): def OBJ(model, method, *params, **kw): if 'context' not in kw: - kw['context'] = sample_context + kw['context'] = {**OdooTestCase.user_context} elif kw['context'] is None: del kw['context'] return ('object.execute_kw', sentinel.AUTH, model, method, params) + ((kw,) if kw else ()) -class XmlRpcTestCase(TestCase): +class OdooTestCase(TestCase): server_version = None server = "http://192.0.2.199:9999" database = user = password = uid = None + user_context = {'lang': 'en_US', 'tz': 'Europe/Zurich'} maxDiff = None def setUp(self): self.addCleanup(mock.patch.stopall) self.stdout = mock.patch('sys.stdout', new=PseudoFile()).start() self.stderr = mock.patch('sys.stderr', new=PseudoFile()).start() + self.http_request = self._patch_http_request() # Clear the login cache mock.patch.dict('odooly.Env._cache', clear=True).start() @@ -43,42 +44,17 @@ def setUp(self): # Avoid hanging on getpass mock.patch('odooly.getpass', side_effect=RuntimeError).start() - self.service = self._patch_service() - if self.server and self.database: - # create the client - self.client = odooly.Client( - self.server, self.database, self.user, self.password) - self.env = self.client.env - # reset the mock - self.service.reset_mock() - - def _patch_http_request(self, uid=None, context=sample_context): + def _patch_http_request(self, uid=None, context=None): def func(url, *, method='POST', data=None, json=None, headers=None): if url.endswith("/web/session/authenticate"): - result = {'uid': uid or self.uid, 'user_context': context} + result = {'uid': uid or self.uid, 'user_context': context or self.user_context} else: with HTTPError(url, 404, 'Not Found', headers, BytesIO()) as not_found: raise not_found return {'result': result} return mock.patch('odooly.HTTPSession.request', side_effect=func).start() - def _patch_service(self): - def get_svc(server, name, *args, **kwargs): - return getattr(svcs, name) - patcher = mock.patch('odooly.Service', side_effect=get_svc) - svcs = patcher.start() - svcs.stop = patcher.stop - for svc_name in 'db common object wizard report'.split(): - svcs.attach_mock(mock.Mock(name=svc_name), svc_name) - # Default values - svcs.db.server_version.return_value = self.server_version - svcs.db.list.return_value = [self.database] - svcs.common.login.return_value = self.uid - # env['res.users'].context_get() - svcs.object.execute_kw.return_value = sample_context - return svcs - - def _assertCalls(self, mock_, expected_calls): + def assertMockCalls(self, mock_, expected_calls): for idx, expected in enumerate(expected_calls): if isinstance(expected, str): if expected[:4] == 'call': @@ -88,17 +64,22 @@ def _assertCalls(self, mock_, expected_calls): self.assertSequenceEqual(mock_.mock_calls, expected_calls) mock_.reset_mock() - def assertServiceCalls(self, *expected_args): + def assertRequests(self, *expected_args): + server = urljoin(self.server, '/').rstrip('/') expected_calls = list(expected_args) for idx, expected in enumerate(expected_calls): - if not isinstance(expected, type_call) and isinstance(expected, tuple): - rpcmethod = expected[0] - if len(expected) > 1 and expected[1] == sentinel.AUTH: - args = (self.database, self.uid, self.password) + expected[2:] - else: - args = expected[1:] - expected_calls[idx] = getattr(call, rpcmethod)(*args) - self._assertCalls(self.service, expected_calls) + if isinstance(expected, tuple): + if expected[0].startswith('/json/2/'): + headers = { + 'Authorization': f'Bearer {self.password}', + 'Content-Type': 'application/json', + 'X-Odoo-Database': self.database, + } + expected_calls[idx] = call(f"{server}{expected[0]}", json=expected[1], headers=headers) + elif expected[0].startswith('/web/'): + jsonrpc_params = {'jsonrpc': '2.0', 'method': 'call', 'params': expected[1], 'id': ANY} + expected_calls[idx] = call(f"{server}{expected[0]}", json=jsonrpc_params) + self.assertMockCalls(self.http_request, expected_calls) def assertOutput(self, stdout='', stderr='', startswith=False): # compare with ANY to make sure output is not empty @@ -117,5 +98,46 @@ def assertOutput(self, stdout='', stderr='', startswith=False): stdout_value = stdout_value[:len(stdout)] self.assertMultiLineEqual(stdout_value, stdout) + +class XmlRpcTestCase(OdooTestCase): + + def setUp(self): + super().setUp() + self.service = self._patch_service() + if self.server and self.database: + # create the client + self.client = odooly.Client( + self.server, self.database, self.user, self.password) + self.env = self.client.env + # reset the mock + self.service.reset_mock() + + def _patch_service(self): + def get_svc(server, name, *args, **kwargs): + return getattr(svcs, name) + patcher = mock.patch('odooly.Service', side_effect=get_svc) + svcs = patcher.start() + svcs.stop = patcher.stop + for svc_name in 'db common object wizard report'.split(): + svcs.attach_mock(mock.Mock(name=svc_name), svc_name) + # Default values + svcs.db.server_version.return_value = self.server_version + svcs.db.list.return_value = [self.database] + svcs.common.login.return_value = self.uid + svcs.object.execute_kw.return_value = self.user_context + return svcs + + def assertServiceCalls(self, *expected_args): + expected_calls = list(expected_args) + for idx, expected in enumerate(expected_calls): + if not isinstance(expected, type_call) and isinstance(expected, tuple): + rpcmethod = expected[0] + if len(expected) > 1 and expected[1] == sentinel.AUTH: + args = (self.database, self.uid, self.password) + expected[2:] + else: + args = expected[1:] + expected_calls[idx] = getattr(call, rpcmethod)(*args) + self.assertMockCalls(self.service, expected_calls) + # Legacy assertCalls = assertServiceCalls diff --git a/tests/test_client.py b/tests/test_client.py index e1ba6a1..e2776a8 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -348,7 +348,7 @@ def test_list_modules(self): def test_module_upgrade(self): self.service.object.execute_kw.side_effect = [ - (42, 0), [42], [], ANY, [42], + (42, 0), [], [42], ANY, [42], [{'id': 42, 'state': ANY, 'name': ANY}], ANY] result = self.env.upgrade('dummy') @@ -356,8 +356,8 @@ def test_module_upgrade(self): self.assertCalls( imm('update_list'), - imm('search', [('name', 'in', ('dummy',))]), imm('search', [('state', 'not in', STABLE)]), + imm('search', [('name', 'in', ('dummy',))]), imm('button_upgrade', [42]), imm('search', [('state', 'not in', STABLE)]), imm('read', [42], ['name', 'state']), @@ -365,6 +365,25 @@ def test_module_upgrade(self): ) self.assertOutput(ANY) + def test_client_verbose(self): + client = self.client + + self.assertIsNone(client.verbose) + + client.verbose = 1 + self.assertEqual(client.verbose, 79) + + client.verbose = 3 + self.assertEqual(client.verbose, 9999) + + client.verbose = 140 + self.assertEqual(client.verbose, 140) + self.assertEqual(client._printer.cols, 140) + + client.verbose = 0 + self.assertIsNone(client.verbose) + self.assertIsNone(client._printer.cols) + class TestClientApi(XmlRpcTestCase): """Test the Client API.""" @@ -581,7 +600,7 @@ def test_report_get(self): def _module_upgrade(self, button='upgrade'): execute_return = [ - [7, 0], [42], [], {'name': 'Upgrade'}, + [7, 0], [], [42], {'name': 'Upgrade'}, [{'id': 4, 'state': ANY, 'name': ANY}, {'id': 5, 'state': ANY, 'name': ANY}, {'id': 42, 'state': ANY, 'name': ANY}], ANY] @@ -589,16 +608,17 @@ def _module_upgrade(self, button='upgrade'): expected_calls = [ imm('update_list'), - imm('search', [('name', 'in', ('dummy', 'spam'))]), imm('search_read', [('state', 'not in', STABLE)], ['name', 'state']), + imm('search', [('name', 'in', ('dummy', 'spam'))]), imm('button_' + button, [42]), imm('search_read', [('state', 'not in', STABLE)], ['name', 'state']), bmu('upgrade_module', []), ] if float(self.server_version) < 8.0: execute_return[4:4] = [[4, 42, 5]] - expected_calls[2:5] = [ + expected_calls[1:5] = [ imm('search', [('state', 'not in', STABLE)]), + imm('search', [('name', 'in', ('dummy', 'spam'))]), imm('button_' + button, [42]), imm('search', [('state', 'not in', STABLE)]), imm('read', [4, 42, 5], ['name', 'state']), @@ -632,7 +652,7 @@ def _module_upgrade(self, button='upgrade'): self.service.object.execute_kw.side_effect = [[0, 0], []] self.assertIsNone(action()) - self.assertCalls(expected_calls[0], expected_calls[2]) + self.assertCalls(*expected_calls[:2]) self.assertOutput('0 module(s) updated\n') def test_module_upgrade(self): @@ -737,10 +757,6 @@ class TestClientApi19(TestClientApi): test_exec_workflow = test_wizard = _skip_test test_report = test_render_report = test_report_get = _skip_test - def _patch_service(self): - self.auth_http = self._patch_http_request() - return super()._patch_service() - def test_obsolete_methods(self): self.assertRaises(AttributeError, getattr, self.env, 'exec_workflow') self.assertRaises(AttributeError, getattr, self.env, 'render_report') diff --git a/tests/test_interact.py b/tests/test_interact.py index 98d3a00..6e22460 100644 --- a/tests/test_interact.py +++ b/tests/test_interact.py @@ -4,12 +4,37 @@ from unittest.mock import call, ANY import odooly -from ._common import XmlRpcTestCase +from ._common import OdooTestCase, XmlRpcTestCase -class TestInteract(XmlRpcTestCase): +class _TestInteract(OdooTestCase): + + def setUp(self): + super().setUp() + # Reset defaults before each run + odooly.Client._set_interactive.__func__.__defaults__ = ({},) + # Hide readline module + mock.patch.dict('sys.modules', {'readline': None}).start() + mock.patch('odooly.Client._globals', None).start() + mock.patch('odooly.Client._set_interactive', wraps=odooly.Client._set_interactive).start() + self.interact = mock.patch('odooly._interact', wraps=odooly._interact).start() + self.infunc = mock.patch('code.InteractiveConsole.raw_input').start() + mock.patch('odooly.main.__defaults__', (self.interact,)).start() + + def _resp_version_info(self): + [major, minor] = self.server_version.split('.') + return { + 'server_version': self.server_version, + 'server_version_info': [int(major), int(minor), 0, 'final', 0, ''], + 'server_serie': self.server_version, + 'protocol_version': 1, + } + + +class TestInteractXmlRpc(XmlRpcTestCase, _TestInteract): + """Test interactive mode with OpenERP 6.1.""" server_version = '6.1' - server = f"{XmlRpcTestCase.server}/xmlrpc" + server = f"{OdooTestCase.server}/xmlrpc" startup_calls = ( call(ANY, 'db', ANY), 'db.server_version', @@ -21,16 +46,6 @@ class TestInteract(XmlRpcTestCase): 'db.list', ) - def setUp(self): - super().setUp() - # Hide readline module - mock.patch.dict('sys.modules', {'readline': None}).start() - mock.patch('odooly.Client._globals', None).start() - mock.patch('odooly.Client._set_interactive', wraps=odooly.Client._set_interactive).start() - self.interact = mock.patch('odooly._interact', wraps=odooly._interact).start() - self.infunc = mock.patch('code.InteractiveConsole.raw_input').start() - mock.patch('odooly.main.__defaults__', (self.interact,)).start() - def test_main(self): env_tuple = (self.server, 'database', 'usr', None, None) mock.patch('sys.argv', new=['odooly', '--env', 'demo']).start() @@ -147,3 +162,70 @@ def usr17(model, method, *params): 'Model not found: res.company', ]) self.assertOutput(stderr=ANY) + + +class TestInteract19(_TestInteract): + """Test interactive mode with Odoo 19.""" + server_version = '19.0' + server = f"{OdooTestCase.server}/" + database, user, password, uid = 'database', 'usr', 'password', 17 + startup_calls = ( + ('/web/webclient/version_info', {}), + ('/web/database/list', {}), + ('/web/session/authenticate', {'db': database, 'login': user, 'password': password}), + ('/json/2/res.users/context_get', {}), + ) + + def test_main(self): + env_tuple = (self.server, self.database, self.user, None, None) + mock.patch('sys.argv', new=['odooly', '--env', 'demo']).start() + read_config = mock.patch('odooly.Client.get_config', + return_value=env_tuple).start() + getpass = mock.patch('odooly.getpass', + return_value=self.password).start() + self.http_request.side_effect = [ + {'result': self._resp_version_info()}, + [], + {'result': {'uid': 17, 'user_context': self.user_context}}, + {'uid': self.uid, **self.user_context}, + # + {'result': {'uid': 51, 'user_context': self.user_context}}, + OSError, + {'result': {'uid': 51, 'user_context': self.user_context}}, + ] + + # Launch interactive + self.infunc.side_effect = [ + "client\n", + "env\n", + "env.sudo('gaspard')\n", + "client.login('gaspard')\n", + "23 + 19\n", + EOFError('Finished')] + odooly.main() + + self.assertEqual(sys.ps1, 'demo >>> ') + self.assertEqual(sys.ps2, ' ... ') + expected_calls = self.startup_calls + ( + ('/web/session/authenticate', {'db': 'database', 'login': 'gaspard', 'password': 'password'}), + ('/json/2/res.users/context_get', {}), + ('/web/session/authenticate', {'db': 'database', 'login': 'gaspard', 'password': 'password'}), + ) + self.assertRequests(*expected_calls) + self.assertEqual(getpass.call_count, 2) + self.assertEqual(read_config.call_count, 1) + self.assertEqual(self.interact.call_count, 1) + outlines = self.stdout.popvalue().splitlines() + self.assertSequenceEqual(outlines[-6:], [ + "Logged in as 'usr'", + f"", + "", + "", + "Logged in as 'gaspard'", + "42", + ]) + self.assertOutput(stderr='\x1b[A\n\n', startswith=True) + + # TODO + test_no_database = None + test_invalid_user_password = None diff --git a/tests/test_model.py b/tests/test_model.py index c55dd10..9d8634f 100644 --- a/tests/test_model.py +++ b/tests/test_model.py @@ -1,6 +1,5 @@ from functools import partial -from unittest.mock import call, sentinel, ANY -from urllib.request import urljoin +from unittest.mock import sentinel, ANY import odooly from ._common import XmlRpcTestCase, OBJ @@ -86,7 +85,7 @@ def __getitem__(self, key): elif model == 'res.users': keys += ('login', 'name', 'password') # etc ... else: - keys += ('name', 'message', 'spam', 'birthdate', 'city') + keys += ('name', 'message', 'spam', 'first_name', 'birthdate', 'city') fields = dict.fromkeys(keys, {'type': sentinel.FIELD_TYPE}) fields['misc_id'] = {'type': 'many2one', 'relation': 'foo.misc'} fields['line_ids'] = {'type': 'one2many', 'relation': 'foo.lines'} @@ -120,6 +119,7 @@ def setUp(self): self.service.object.execute_kw.side_effect = self.obj_exec # preload 'foo.bar' self.env['foo.bar'] + self.http_request.reset_mock() self.service.reset_mock() @@ -179,37 +179,59 @@ def test_access(self): def test_search(self): FooBar = self.env['foo.bar'] - searchterm = 'name like Morice' + domain = [('name', 'like', 'Morice')] + domain2 = [('name', '=', 'mushroom'), ('state', '!=', 'draft')] + + # Search itself does not execute API calls self.assertIsInstance(FooBar.search([searchterm]), odooly.RecordList) FooBar.search([searchterm], limit=2) FooBar.search([searchterm], offset=80, limit=99) + FooBar.search([searchterm], offset=80, limit=99)[100:400:3] FooBar.search([searchterm], order='name ASC') + FooBar.search([searchterm], order='name ASC')[:3] + FooBar.search([searchterm], order='name ASC')[40:] + FooBar.search([searchterm], order='name ASC')[40:99] FooBar.search(['name = mushroom', 'state != draft']) FooBar.search([('name', 'like', 'Morice')]) - FooBar._execute('search', [('name like Morice')]) FooBar.search([]) - domain = [('name', 'like', 'Morice')] - domain2 = [('name', '=', 'mushroom'), ('state', '!=', 'draft')] + self.assertCalls() + + # Low-level search will execute API call immediately + FooBar._execute('search', [searchterm]) + self.assertCalls(OBJ('foo.bar', 'search', domain)) + + FooBar.search([searchterm], limit=2).ids + FooBar.search([searchterm], offset=80, limit=99).id + FooBar.search(['name = mushroom', 'state != draft']) or 42 + FooBar.search([('name', 'like', 'Morice')]).ids + FooBar._execute('search', [('name like Morice')])[0] + FooBar.search([]).ids + self.assertCalls( - OBJ('foo.bar', 'search', domain), OBJ('foo.bar', 'search', domain, 0, 2, None), OBJ('foo.bar', 'search', domain, 80, 99, None), - OBJ('foo.bar', 'search', domain, 0, None, 'name ASC'), OBJ('foo.bar', 'search', domain2), OBJ('foo.bar', 'search', domain), OBJ('foo.bar', 'search', domain), OBJ('foo.bar', 'search', []), ) - self.assertOutput('') # Not supported - FooBar.search('name like Morice') - self.assertCalls(OBJ('foo.bar', 'search', 'name like Morice')) + FooBar.search(searchterm) + FooBar.search([searchterm], limit=2, fields=['birthdate', 'city']) + FooBar.search([searchterm], missingkey=42) + self.assertCalls() - FooBar.search(['name like Morice'], missingkey=42) - self.assertCalls(OBJ('foo.bar', 'search', domain, missingkey=42)) - self.assertOutput('') + FooBar.search(searchterm).ids + FooBar.search([searchterm], limit=2, fields=['birthdate', 'city']).ids + FooBar.search([searchterm], missingkey=42).ids + self.assertCalls( + OBJ('foo.bar', 'search', searchterm), + # Invalid keyword arguments are passed to the API + OBJ('foo.bar', 'search', domain, 0, 2, None, fields=['birthdate', 'city']), + OBJ('foo.bar', 'search', domain, missingkey=42), + ) self.assertRaises(TypeError, FooBar.search) self.assertRaises(ValueError, FooBar.search, ['abc']) @@ -239,15 +261,6 @@ def test_search_count(self): OBJ('foo.bar', 'search_count', []), OBJ('foo.bar', 'search_count', []), ) - self.assertOutput('') - - # Invalid keyword arguments are passed to the API - FooBar.search([searchterm], limit=2, fields=['birthdate', 'city']) - FooBar.search([searchterm], missingkey=42) - self.assertCalls( - OBJ('foo.bar', 'search', domain, 0, 2, None, fields=['birthdate', 'city']), - OBJ('foo.bar', 'search', domain, missingkey=42)) - self.assertOutput('') # Not supported FooBar.search_count(searchterm) @@ -266,6 +279,25 @@ def test_search_count(self): self.assertCalls() self.assertOutput('') + def test_search_read(self): + FooBar = self.env['foo.bar'] + searchterm = 'name like Morice' + domain = [('name', 'like', 'Morice')] + + FooBar.search([searchterm], order='name ASC')[:3].name + + expected_calls = [ + OBJ('foo.bar', 'fields_get'), + OBJ('foo.bar', 'search_read', domain, ['name'], order='name ASC', limit=3), + ] + if float(self.server_version) < 8.0: + expected_calls[1:2] = [ + OBJ('foo.bar', 'search', domain, 0, 3, 'name ASC'), + OBJ('foo.bar', 'read', [1001, 1002], ['name']), + ] + self.assertCalls(*expected_calls) + self.assertOutput('') + def test_read(self): FooBar = self.env['foo.bar'] @@ -1194,12 +1226,12 @@ def test_exists(self): def test_mapped(self): m = self.env['foo.bar'] self.service.object.execute_kw.side_effect = [ - [{'id': k, 'fld1': 'val%s' % k} for k in [4, 17, 7, 42, 112, 13]], {'fld1': {'type': 'char'}, 'display_name': {'type': 'char'}, 'foo_categ_id': {'relation': 'foo.categ', 'type': 'many2one'}}, + [{'id': k, 'fld1': 'val%s' % k} for k in [4, 17, 7, 42, 112, 13]], [{'id': k, 'foo_categ_id': [k * 10, 'Categ C%04d' % k]} for k in [4, 17, 7, 42, 112, 13]], [{'id': k, 'foo_categ_id': [k * 10, 'Categ C%04d' % k]} for k in [4, 17, 7, 42, 112, 13]], - [{'id': k * 10, 'fld2': 'f2_%04d' % k} for k in [4, 17, 7, 42, 112, 13]], {'fld2': {'type': 'char'}}, + [{'id': k * 10, 'fld2': 'f2_%04d' % k} for k in [4, 17, 7, 42, 112, 13]], self._return_display_name(42, 'Record 42'), self._return_display_name(4, 'Record 4'), self._return_display_name(42, 'Record 42'), @@ -1239,12 +1271,12 @@ def test_mapped(self): self.assertRaises(TypeError, records1.mapped) self.assertCalls( - OBJ('foo.bar', 'read', ids1_sorted, ['fld1']), OBJ('foo.bar', 'fields_get'), + OBJ('foo.bar', 'read', ids1_sorted, ['fld1']), OBJ('foo.bar', 'read', ids1_sorted, ['foo_categ_id']), OBJ('foo.bar', 'read', ids1_sorted, ['foo_categ_id']), - OBJ('foo.categ', 'read', [k * 10 for k in ids1_sorted], ['fld2']), OBJ('foo.categ', 'fields_get'), + OBJ('foo.categ', 'read', [k * 10 for k in ids1_sorted], ['fld2']), self._call_display_name('foo.bar', 42), self._call_display_name('foo.bar', 4), self._call_display_name('foo.bar', 42), @@ -1272,7 +1304,8 @@ def test_mapped_empty_relation(self): self.service.object.execute_kw.side_effect = [ {'child_ids': {'relation': 'foo.bar', 'type': 'one2many'}, 'category_ids': {'relation': 'foo.bar', 'type': 'many2many'}, - 'parent_id': {'relation': 'foo.bar', 'type': 'many2one'}} + 'parent_id': {'relation': 'foo.bar', 'type': 'many2one'}, + 'name': {'type': 'char'}} ] records0 = m.browse() @@ -1290,21 +1323,21 @@ def test_filtered(self): m = self.env['foo.bar'] items = [[k, 'Item %d' % k] for k in range(1, 9)] self.service.object.execute_kw.side_effect = [ - [{'id': k, 'flag1': not (k % 3)} for k in [4, 17, 7, 42, 112, 13]], {'flag1': {'type': 'boolean'}, 'foo_child_ids': {'relation': 'foo.child', 'type': 'one2many'}, 'foo_categ_id': {'relation': 'foo.categ', 'type': 'many2one'}}, + [{'id': k, 'flag1': not (k % 3)} for k in [4, 17, 7, 42, 112, 13]], [{'id': k, 'foo_categ_id': [k * 10, 'Categ C%04d' % k]} for k in [4, 17, 7, 42, 112, 13]], [{'id': k, 'foo_categ_id': [k * 10, 'Categ C%04d' % k]} for k in [4, 17, 7, 42, 112, 13]], + {'flag2': {'type': 'char'}, 'flag3': {'type': 'boolean'}}, [{'id': k * 10, 'flag2': bool(k % 2)} for k in [4, 17, 7, 42, 112, 13]], - {'flag2': {'type': 'char'}}, [{'id': k, 'foo_child_ids': {}} for k in [4, 7, 112, 13]] + [{'id': 42, 'foo_child_ids': items[0:6]}, {'id': 17, 'foo_child_ids': items[6:8]}], + {'flag3': {'type': 'boolean'}, 'flag4': {'type': 'char'}}, [{'id': k, 'flag3': (k < 3)} for k in range(1, 9)], - {'flag3': {'type': 'boolean'}}, [{'id': k, 'foo_child_ids': {}} for k in [4, 7, 112, 13]] + [{'id': 42, 'foo_child_ids': items[0:6]}, {'id': 17, 'foo_child_ids': items[6:8]}], @@ -1346,17 +1379,17 @@ def test_filtered(self): self.assertRaises(TypeError, records1.filtered) self.assertCalls( - OBJ('foo.bar', 'read', ids1_sorted, ['flag1']), OBJ('foo.bar', 'fields_get'), + OBJ('foo.bar', 'read', ids1_sorted, ['flag1']), OBJ('foo.bar', 'read', ids1_sorted, ['foo_categ_id']), OBJ('foo.bar', 'read', ids1_sorted, ['foo_categ_id']), - OBJ('foo.categ', 'read', [k * 10 for k in ids1_sorted], ['flag2']), OBJ('foo.categ', 'fields_get'), + OBJ('foo.categ', 'read', [k * 10 for k in ids1_sorted], ['flag2']), OBJ('foo.bar', 'read', ids1_sorted, ['foo_child_ids']), - OBJ('foo.child', 'read', [1, 2, 3, 4, 5, 6, 7, 8], ['flag3']), OBJ('foo.child', 'fields_get'), + OBJ('foo.child', 'read', [1, 2, 3, 4, 5, 6, 7, 8], ['flag3']), OBJ('foo.bar', 'read', ids1_sorted, ['foo_child_ids']), OBJ('foo.child', 'read', [1, 2, 3, 4, 5, 6, 7, 8], ['flag4']), @@ -1380,9 +1413,9 @@ def test_sorted(self): self.service.object.execute_kw.side_effect = [ [42, 4, 7, 17, 112], [42, 4, 7, 17, 112], - [{'id': k, 'fld1': 'val%s' % k} for k in [4, 17, 7, 42, 112, 13]], {'fld1': {'type': 'char'}, 'display_name': {'type': 'char'}}, [{'id': k, 'fld1': 'val%s' % k} for k in [4, 17, 7, 42, 112, 13]], + [{'id': k, 'fld1': 'val%s' % k} for k in [4, 17, 7, 42, 112, 13]], self._return_display_name(4, 'Record 4'), self._return_display_name(4, 'Record 4'), [{'id': k} for k in [4, 17, 7, 42, 112]], @@ -1410,12 +1443,11 @@ def test_sorted(self): self.assertCalls( OBJ('foo.bar', 'search', [('id', 'in', ids1)]), OBJ('foo.bar', 'search', [('id', 'in', ids1)]), - OBJ('foo.bar', 'read', ids1_sorted, ['fld1']), OBJ('foo.bar', 'fields_get'), OBJ('foo.bar', 'read', ids1_sorted, ['fld1']), + OBJ('foo.bar', 'read', ids1_sorted, ['fld1']), self._call_display_name('foo.bar', 4), self._call_display_name('foo.bar', 4), - OBJ('foo.bar', 'read', ids1_sorted, ['fld1.fld2']), ) records2 = m.browse([42, 42]) @@ -1573,23 +1605,6 @@ class TestRecord18(TestRecord): class TestModel19(TestModel): server_version = '19.0' - def _patch_service(self): - self.auth_http = self._patch_http_request() - return super()._patch_service() - - def test_auth_http(self): - headers = { - 'Authorization': 'Bearer passwd', - 'Content-Type': 'application/json', - 'X-Odoo-Database': 'database', - } - test_url = urljoin(self.server, '/json/2/res.users/context_get') - self.assertEqual(self.auth_http.mock_calls, [call(test_url, json={}, headers=headers)]) - class TestRecord19(TestRecord): server_version = '19.0' - - def _patch_service(self): - self.auth_http = self._patch_http_request() - return super()._patch_service() diff --git a/tests/test_util.py b/tests/test_util.py index 872e6ef..f88c68e 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -1,6 +1,7 @@ +from functools import partial from unittest import TestCase -from odooly import issearchdomain, searchargs, readfmt +from odooly import issearchdomain, searchargs, Model, Client, Printer class TestUtils(TestCase): @@ -129,11 +130,42 @@ def test_searchargs_invalid(self): self.assertRaises(ValueError, searchargs, (['some_id child_off'],)) self.assertRaises(ValueError, searchargs, (['someth like3'],)) - def test_readfmt(self): + dummy = object.__new__(Model) + readfmt = partial(dummy._parse_format, browse=False) + # Helper for 'read' methods (fields, fmt) = readfmt('a %(color)s elephant enters %(location)r.\n\n%(firstname)s has left') self.assertEqual(fields, ['color', 'location', 'firstname']) (fields, fmt) = readfmt('a {color} elephant enters {location[1]}.\n\n{firstname!r} has left') self.assertEqual(fields, ['color', 'location', 'firstname']) + + def test_printer(self): + # Verbosity None or 0 --> disable logging + # Verbosity 1 to 8 --> mapped 1 -> 79 / 2 -> 179 / 3+ -> 9999 + # Verbosity >= 36 --> width to print in columns + client = object.__new__(Client) + client._printer = Printer() + client.verbose = 2 + self.assertEqual(client._printer.cols, 179) + self.assertEqual(client.verbose, 179) + client.verbose = 3 + self.assertEqual(client._printer.cols, 9999) + client.verbose = 250 + self.assertEqual(client._printer.cols, 250) + client.verbose = None + self.assertIsNone(client._printer.cols) + client.verbose = 64 + self.assertEqual(client._printer.cols, 64) + self.assertEqual(client.verbose, 64) + client.verbose = True + self.assertEqual(client._printer.cols, 79) + client.verbose = 8 + self.assertEqual(client._printer.cols, 9999) + self.assertEqual(client.verbose, 9999) + client.verbose = 0 + self.assertIsNone(client._printer.cols) + + self.assertRaises(TypeError, setattr, client, 'verbose', 'a') + self.assertRaises(IndexError, setattr, client, 'verbose', -4)