Source code for setuplib.setuplib

#-*- coding: utf-8 -*-
"""Setuplib subcommand 'ENUMERATE'.
Enumerate available commands based on 'pkt_resources'.
Supports parameters and filters.
"""
from __future__ import absolute_import
from __future__ import print_function

import sys
import os
import re
import traceback

import distutils.cmd

import pkg_resources

import yapydata.datatree.datatree as datatree
import setuplib

__author__ = 'Arno-Can Uestuensoez'
__author_email__ = 'acue_sf2@sourceforge.net'
__license__ = "Artistic-License-2.0 + Forced-Fairplay-Constraints"
__copyright__ = "Copyright (C) 2015-2019 Arno-Can Uestuensoez @Ingenieurbuero Arno-Can Uestuensoez"
__uuid__ = "239b0bf7-674a-4f53-a646-119f591af806"

__version__ = "01.01.005"

__product_family__ = "setuplib"
__product__ = "setuplib"
__product_component__ = "list_enty_points"


[docs]class SetuplibCommandsxError(setuplib.SetuplibError): """Error on setuplib calls. """ pass
[docs]class SetupListEntryPointsX(distutils.cmd.Command): """List available entry points.""" description = 'List available entry points.' user_options = [ ('debug', 'd', "Raises degree of debug traces of current context. Supports repetition. " "Each raises the debug verbosity level of the context by one."), ('exit', 'e', "Exit after command 'setuplib' immediately, ignore following. " "Default := off."), ('filter=', None, "Define a filter, for details refer to the manual. " "Default: ''"), ('format=', 'f', "Define display format. " "See '--format=help. " "Default: 'name,module_name,dist.egg_info:::fname'"), ('group=', 'g', "Set group for scan, '--group=none' scans all, " "'--group=console_scripts' displays all console scripts, " "etc. " "For current list: '--group=help', similar to '--list-groups'. " "See 'PyPA.io'." "Default: 'distutils.commands'"), ('ignore-missing', 'i', "Ignore errors due to missing components, and continue. " " For example in case of missing an optional. " "Default: False. "), ('layout=', None, "Define display layout. " "See '--format=help'. " "Default: table"), ('list-groups', None, "Lists all scanned groups. " "Is shortcut for the '--format' option, " "refer to the manuals for additional details. " "Default: names only." ), ('long', 'l', "List long format, similar to shell command 'ls -l'. " "Default: off"), ('quiet', 'q', "Suppress display including warnings. Display error messages only." "Default: off"), ('search-path', 'P', "Set the search path for requested resources. " "Default: 'sys.path'"), ('sort=', None, "Sort a specified field number. " "Default: 0"), ('verbose', 'v', "Raises verbosity of current context. Supports repetition. " "Each raises the command verbosity level of the context by one. " "The value is defined by the global option defined in " "'Distribution'. " "Refer to the manuals for the special behaviour when used as " "either a global option(start 'verbose=1'), " "or as a command context option(start 'verbose=0'). " "Default:=1."), ]
[docs] def initialize_options(self): """Define the API entrypoints and data of the command 'list'. REMARK: verbose and debug are encapsulated/hidden by distutils. """ self.alias = None self.at_once = None self.debug = None self.exit = None self.filter = None self.format = None self.group = None self.ignore_missing = None self.layout = None self.long = None self.list_groups = None self.quiet = None self.search_path = None self.sort = None self.verbose = None
[docs] def finalize_options(self): """Initializae the API of 'lis'. """ # quick-and-dirty hack to resolve the inconsistency of # global and local verbose values of distutils try: # The context option is actually not set by the framework, # instead the global option is reset and intialized to # the number of occurances and passes to the initialization # of the memeber 'self.verbose'. # Thus the poll fails, while the value is already set via the framework. # See code distutils.dist.Distribution. # Anyhow...keeping it as a reminder. _v_opts = self.distribution.get_option_dict('setuplib')['verbose'][1] if _v_opts: self.verbose += 1 except: # fallback to the slightly erroneous behavior when the interface # of distutils changes pass # global and local verbose values of distutils try: # See verbose for description of the global option quiet. # See code distutils.dist.Distribution. _q_opts = self.distribution.get_option_dict('setuplib')['quiet'][1] if _q_opts: self.quiet += 1 except: # fallback to the slightly erroneous behavior when the interface # of distutils changes if self.quiet == None: self.quiet = 0 pass # debug if self.debug == None: self.debug = 0 if self.ignore_missing == None: self.ignore_missing = False if self.exit == None: self.exit = False if self.long == None: self.long = 0 if self.sort == None: self.sort = 0 if self.list_groups != None: self.list_groups = True if self.group == None: # allow pre-selection of a set of groups, e.g. regular expression if not self.list_groups: self.group = 'distutils.commands' elif self.group.lower() == 'none': self.group = None if self.layout == None: self.layout = 'table' # list, table, xml, json, yaml elif self.layout == 'help': print() print("Current layouts are: list, table") print("Soon available: csv, ini, json, .properties, xml, yaml") print() sys.exit(0) if self.format == None: self.format = 'name,module_name,dist.egg_info:::fname' elif self.format.lower() == 'help': print("format := (list | table | csl | xml | json | yaml)") sys.exit(0) # set the internal format representation for the request - or default _format = [] for _f in self.format.split(','): _fx = _f.split(':') if len(_fx) == 0: continue elif _fx[0] and len(_fx) == 1: _format.append([_fx[0], 0, 'auto', '']) elif len(_fx) < 5: _rec = [_fx[0], 0, 'auto', '',] if _fx[1]: _rec[1] = int(_fx[1]) if _fx[2]: if _fx[2].lower() not in ( 'cl', 'cr', 'clip', 'auto', ): raise SetuplibCommandsxError( "parameter error:%s - %s - in: %s" %( str(_fx[2]), str(_f), str(self.format), ) ) _rec[2] = _fx[2].lower() if _fx[3]: # optional special format, passed untouched _rec[3] = _fx[3] if _rec[1] or int(_rec[1]) == 0: # force auto _rec[2] = 'auto' _format.append(_rec) else: raise SetuplibCommandsxError( "parameter error: %s - in: %s" %( str(_f), str(self.format), ) ) self.format = _format # for now reminder only if self.filter == None: self.filter = None # # assemble parameter for the external class # self.task = { "alias": self.alias, "debug": self.debug, "exit": self.exit, "format": self.format, "filter": self.filter, "group": self.group, "ignore_missing": self.ignore_missing, "layout": self.layout, "list_groups": self.list_groups, "long": self.long, "quiet": self.quiet, "search_path": self.search_path, "sort": self.sort, "verbose": self.verbose, } if self.at_once: _m = MyDistributionData(self.distribution, self.task) _m.enumerate(self.task) _m.print() sys.exit(0)
[docs] def run(self): """Creates documents. Calls the defined and activated wrapper scripts. """ _m = MyDistributionData(self.distribution, self.task) _m.enumerate(self.task) _m.print()
# if self.task['list_groups']: # _m.print_groups() # # else: # _m.print()
[docs]class MyDistributionData(object): #: supported types of simple filter combination logic combinelogic = { 'and': 0, #: True if all match 'or': 1, #: True if any match 'nand': 2, #: True if not all matches 'nor': 3, #: True if none matches 'xor': 4, #: True if one only matches }
[docs] def __init__(self, distribution, task): self.distribution = distribution self.task = task #: simple flat cache of all extension points self.ep_cache = {} #: flat cache of all commands self.cmd_ep = [] #: flat cache of standard commands only - from setuptools(eventually distutils too) self.cmd_std = [] #: flat cache of local commands only - from setup() self.cmd_local = [] #: flat cache of extra - non-standard - commands from setuptools self.cmd_ext_setuptools = [] #: flat cache of extra - non-standard - commands from distutils - eventually self.cmd_ext_distutils = [] #: flat cache of extra - non-standard - commands from third-party self.cmd_ext_misc = [] #: flat cache of extra commands from setuplib self.cmd_setuplib = []
[docs] def enumerate(self, task): """Calls *pkg_resources* and caches the results. The cached data is queried later for details required by the extended list and display commands. The caching happens here only, which comprises the flat data cache of all entry points and the additional categorized cache of selective sets for later selection filters. The centralized preparation of the data for later filtering eases the data handling significantly by moderate use of additional resources. Args: self: The current instance of this class. Returns: Results in the member variable:: self.ep_cache The content is a *dict* containing the iterated entry points with the *<ep>.name* as key. Raises: pass-through """ _task = task if not _task: _task = self._task if ( _task['group'] == None or _task['group'] in ('help',) ): self.ep_cache = list(pkg_resources.iter_entry_points(None)) self.ep_cache = [list(x.values())[0] for x in self.ep_cache] # # the shortcut for help display only # if _task['group'] in ('help',): _groups = [] for e in self.ep_cache: _groups.extend(e.dist.get_entry_map().keys()) print() print("Current groups:") print() for g in sorted(set(_groups)): print(" " + str(g)) print() sys.exit(0) else: self.ep_cache = list(pkg_resources.iter_entry_points(_task['group'])) if _task['filter']: _filters = _task['filter'].split(';') _combine = 0 _query_filters = [] for _fx in _filters: if _fx.lower() in 'and': _combine = 0 continue elif _fx.lower() == 'or': _combine = 1 continue elif _fx.lower() == 'nand': _combine = 2 continue elif _fx.lower() == 'nor': _combine = 3 continue elif _fx.lower() == 'xor': _combine = 4 continue # elif _fx.lower() == 'not': # _combine = 5 # continue _record_filter = _fx.split(":") if len(_record_filter) > 1: # a filter requires fieldname and rule for the spanned column _query_filters.append( (_record_filter[0].split('.'), re.compile(_record_filter[1])) ) elif len(_record_filter) == 1: # formal, even may not match at all... _query_filters.append((_record_filter[0].split('.'), '')) if _task['debug'] > 0: print("DBG:input entry points: " + str(len(self.ep_cache))) _all_filters = len(_query_filters) for epi in reversed(range(len(self.ep_cache))): _match_cnt = 0 for _fpath,_fexpr in _query_filters: try: _v = datatree.DataTree(self.ep_cache[epi])(*_fpath, pysyn=True) if not _fexpr: _match_cnt += 1 elif _fexpr.search(str(_v)): _match_cnt += 1 except datatree.YapyDataDataTreeOidError: if _task['ignore_missing']: break else: # fall through to filter logic pass else: if not ( (_combine == 0 and _match_cnt == _all_filters) # and or (_combine == 1 and _match_cnt > 0) # or or (_combine == 2 and _match_cnt < _all_filters) # nand or (_combine == 3 and _match_cnt == 0) # nor or (_combine == 4 and _match_cnt == 1) # xor ): self.ep_cache.pop(epi) if _task['debug'] > 0: print("DBG:filtered entry points: " + str(len(self.ep_cache))) self.ep_index = {} idx = 0 for e in self.ep_cache: self.ep_index[e.name] = idx idx += 1 return
[docs] def print_groups(self, outlist, index, task): """Print '--list-groups' output based on pre-filtered data. Thus the complete set of filters processed in enumerate are applicable. """ _t = self.task self.ep_cache = list(pkg_resources.iter_entry_points(self.task['group'])) if self.task['group'] == None: self.ep_cache = [list(x.values())[0] for x in self.ep_cache] _groups = {} for e in self.ep_cache: _gmap = e.dist.get_entry_map() for k,v in _gmap.items(): if not _groups.get(k): _groups[k] = v else: _groups[k].update(v) pass # # now print the list # if _t['layout'] == 'list': #self.print_list(_list, _index, _t) self.print_groups_list(_groups, index, self.task) elif _t['layout'] == 'xml': raise NotImplementedError("XML is not yet available") elif _t['layout'] == 'json': raise NotImplementedError("JSON is not yet available") elif _t['layout'] == 'yaml': raise NotImplementedError("yaml is not yet available") elif _t['layout'] == 'csv': raise NotImplementedError("csv is not yet available") elif _t['layout'] == 'table': self.print_groups_list(_groups, index, self.task) #self.print_table(_list, _index, _t) else: raise SetuplibCommandsxError( "Unknown layout: " + str(_t['layout']) )
[docs] def print_groups_list(self, outlist, index, task): """Print '--list-groups' output based on pre-filtered data. Thus the complete set of filters are applicable. """ _adjust_level = self.task['long'] print() print("Current groups:") print() _maxsize = 0 if _adjust_level == 3: for k,v in sorted(outlist.items()): if self.task['long']: for k1,v1 in sorted(v.items()): if len(k1) > _maxsize: _maxsize = len(k1) for k,v in sorted(outlist.items()): print(" " + str(k)) if self.task['long']: if _adjust_level == 2: _maxsize = 0 for k1,v1 in sorted(v.items()): if len(k1) > _maxsize: _maxsize = len(k1) for k1,v1 in sorted(v.items()): # I do not want to estimate, whether the tuple v1.attr could contain more than 1... # so simply trust the str(v1) here... _v1 = re.sub(k1 + ' *= *', '', str(v1)) if _maxsize > 0: _format = " {0:"+ str(_maxsize) + "} = {1}" else: _format = " {0} = {1}" print( _format.format( str(k1), str(_v1), ) ) print() print() sys.exit(0)
[docs] def print(self): """Prints the requested data. The printout is again processed in two levels. - print: Prepares the record data for the appropriate format. Calls the inteface:: self.print_<format>(outlist, index, task) outlist := The list of resulting keys within the *self.ep_cache*. index := The sprocessed/sorted list of *(<key>, #index)* mapping of utlist to *self.ep_cache*. task := The parameters of the current task. For example:: self.print_table(outlist, index, task) - print_<format>: Prints out the records of the selcted output format. Uses object data from *self*. """ _t = self.task print() if _t['group'] == None: print("entry points of resource group: None == all") else: print("entry points of resource group: " + str(_t['group'])) if _t['filter']: print("applied filter: " + str(_t['filter'])) print() if _t['sort'] == 0: _index = self.ep_index _list = sorted(_index) elif _t['sort'] == 1: _index = {} idx = 0 _fname = {} for e in self.ep_cache: _fname[e.module_name] = (e.name, idx,) idx += 1 for x in sorted(_fname.keys()): _index[_fname[x][0]] = _fname[x][1] _list = _index elif _t['sort'] != 0: # fieldnames _n = _t['sort'] _nx = _n.split('.') _index = {} idx = 0 _fname = {} _grp = _t['group'] == None for _e in self.ep_cache: _val = datatree.DataTree(_e)(*_nx, pysyn=True) try: _item = _fname[_val] _item.append((_e.name, idx,)) except KeyError: _fname[_val] = [(_e.name, idx,),] idx += 1 for x in sorted(_fname.keys()): for xi in _fname[x]: _index[xi[0]] = xi[1] _list = _index # # now print the list # if self.task['list_groups']: self.print_groups(_list, _index, _t) else: if _t['layout'] == 'list': self.print_list(_list, _index, _t) elif _t['layout'] == 'xml': raise NotImplementedError("XML is not yet available") elif _t['layout'] == 'json': raise NotImplementedError("JSON is not yet available") elif _t['layout'] == 'yaml': raise NotImplementedError("yaml is not yet available") elif _t['layout'] == 'csv': raise NotImplementedError("csv is not yet available") elif _t['layout'] == 'table': self.print_table(_list, _index, _t) else: raise SetuplibCommandsxError( "Unknown layout: " + str(_t['layout']) )
[docs] def print_table(self, outlist, index, task): """Print table. """ # REMINDER:for tests:_header = [['name', 0, 'keep'], ['module_name', 0, 'keep'], ['dist.key', 0, 'keep']] # import prepared format string _header = task['format'] _layout = task['layout'] # determine maximum column widths for each field for _e in self.ep_cache: for i in range(len(_header)): if _header[i][2] not in ('auto', ): continue _nx = _header[i][0].split('.') try: _val = datatree.DataTree(_e)(*_nx, pysyn=True) except datatree.YapyDataDataTreeOidError: _val = '' if ( _header[i][3] and _header[i][3] == 'fname' and not task['long'] ): _val = os.path.basename(_val) elif ( _header[i][3] and _header[i][3] == 'abs' ): _val = os.path.abspath(_val) if len(str(_val)) >= _header[i][1]: # in case of equal add the space _header[i][1] = len(str(_val)) + 1 elif len(_header[i][0]) >= _header[i][1]: # in case of equal add the space _header[i][1] = len(_header[i][0]) + 1 _head = [x[0] for x in _header] _format = '' _tabsep = '' _l = len(_header) for i in range(_l): if _header[i][1] == 0 or i == _l - 1: _format += "{%d}"%(i) else: _format += "{%d:%d}"%(i, _header[i][1]) _tabsep += '-' * _header[i][1] print(_format.format(*_head)) print(_tabsep) _ign = task['ignore_missing'] for x in outlist: try: _e = self.ep_cache[index[x]] _values = [] for _f in range(len(_head)): _nx = _head[_f].split('.') _len = _header[_f][1] - 1 _lx = _header[_f][2] try: _v = datatree.DataTree(_e)(*_nx, pysyn=True) _lv = len(str(_v)) if ( _header[_f][3] and _header[_f][3] == 'fname' and not task['long'] ): _v = os.path.basename(_v) elif ( _header[_f][3] and _header[_f][3] == 'abs' ): _v = os.path.abspath(_v) if _lv > _len: if _lx == 'cl': _v = str(_v)[-_len:] elif _lx == 'cr': _v = str(_v)[:_len] elif _lx == 'clip': #TODO: pass elif _lx == 'auto': pass _values.append(_v) except datatree.YapyDataDataTreeOidError as e: if _ign: # the assumingly most generic type _values.append('') else: # einfo = sys.exc_info() # traceback.print_tb(einfo[2]) traceback.print_exc() print("\n\nHINT: You have selected a non-present path, use:\n" "\n '--ignore-missing' / '-i' to continue" "\n '--filter' for the pre-selection of a valid set\n" ) sys.exit(1) # for now it's enough # Python2 # raise(datatree.YapyDataDataTreeOidError, datatree.YapyDataDataTreeOidError(e), sys.exc_info()[2]) # Python3 # raise datatree.YapyDataDataTreeOidError from e if _ign: if ( _values and len(_values) > 1 ) or ''.join([str(x) for x in _values]): print(_format.format(*[str(v) for v in _values])) else: continue else: print(_format.format(*[str(v) for v in _values])) except AttributeError: # as e: raise except KeyError: # as e: raise print()
[docs] def print_list(self, outlist, index, task): """Print list. """ # import prepared format string _header = task['format'] _layout = task['layout'] _head = [x[0] for x in _header] _format = '' _commonprefix = 0 for i in range(len(_header)): if _commonprefix < len(str(_header[i][0])): _commonprefix = len(str(_header[i][0])) + 1 prewidth = 5 _format = "{0} {1:%d}: {2}"%(_commonprefix) _idxstr = ' ' * prewidth _prestr = "\n{0:%d}" % (prewidth) _idx = 0 for x in outlist: try: for _f in range(len(_head)): if _f: _pre = _idxstr else: _pre = _prestr.format(_idx) _idx += 1 try: _v = datatree.DataTree(self.ep_cache[index[x]])( *_head[_f].split('.'), pysyn=True ) if ( _header[_f][3] and _header[_f][3] == 'fname' and not task['long'] ): _v = os.path.basename(_v) elif ( _header[_f][3] and _header[_f][3] == 'abs' ): _v = os.path.abspath(_v) _x = _format.format( _pre, _head[_f], _v, ) except datatree.YapyDataDataTreeOidError: if task['ignore_missing']: _x = _format.format( _pre, _head[_f], '' ) else: raise print(_x) except AttributeError: # as e: raise except KeyError: # as e: raise print()