Source code for GSASII.GSASIIpath

# -*- coding: utf-8 -*-
#GSASIIpath - file location & update routines
'''
:mod:`GSASIIpath` Classes & routines follow
'''

from __future__ import division, print_function
import os
import sys
import platform
import glob
#import pathlib
import subprocess
import datetime as dt
try:
    import numpy as np
except ImportError:
    print("numpy import failed in GSASIIpath")

# fix up path before using git. Needed when using conda without
#   activate (happens on MacOS w/GSAS-II.app)
pyPath = os.path.dirname(os.path.realpath(sys.executable))
if sys.platform != "win32" and pyPath not in os.environ['PATH'].split(':'):
    os.environ['PATH'] = pyPath + ':' + os.environ['PATH']

# hard-coded github repo locations
G2binURL = "https://api.github.com/repos/AdvancedPhotonSource/GSAS-II-buildtools"
g2URL = "https://github.com/AdvancedPhotonSource/GSAS-II.git"
# tutorial repo owner & Repo name
gitTutorialOwn,gitTutorialRepo = 'AdvancedPhotonSource', 'GSAS-II-Tutorials'

path2GSAS2 = os.path.dirname(os.path.abspath(os.path.expanduser(__file__))) # location of this file; save before any changes in pwd

# convert version numbers as '1.2.3' to integers (1002) and back (to 1.2)
fmtver = lambda v: str(v//1000)+'.'+str(v%1000)
intver = lambda vs: sum([int(i) for i in vs.split('.')[0:2]]*np.array((1000,1)))

[docs] def GetConfigValue(key,default=None,getDefault=False): '''Return the configuration file value for key or a default value if not specified. :param str key: a value to be found in the configuration settings :param any default: a value to be supplied if a value for key is not specified in the config file or the config file is not found. Defaults to None. :param bool getDefault: If True looks up the default value from the config_example.py file (default value is False). Do not specify a getDefault=True if a value is provided for default. :returns: the value found or the default. ''' if getDefault: if default is not None: raise ValueError('Cannot use default and getDefault together') default = GetConfigDefault(key) try: return configDict.get(key,default) except NameError: # this happens when building docs return None
[docs] def SetConfigValue(parmdict): '''Set configuration variables. Note that parmdict is a dictionary from :func:`GSASIIctrlGUI.GetConfigValsDocs` where each element is a lists. The first item in list is the default value, the second is the value to use for that configuration variable. Most of the information gathered in GetConfigValsDocs is no longer used. ''' global configDict for var in parmdict: if var in configDict: del configDict[var] if isinstance(parmdict[var],tuple): configDict[var] = parmdict[var] else: if parmdict[var][1] is None: continue if parmdict[var][1] == '': continue if parmdict[var][0] == parmdict[var][1]: continue configDict[var] = parmdict[var][1]
[docs] def GetConfigDefault(key): '''Return the default value for a config value :param str key: a value to be found in the configuration (config_example.py) file :returns: the default value or None ''' try: from . import config_example except: return None return config_example.__dict__.get(key)
[docs] def addPrevGPX(fil,cDict): '''Add a GPX file to the list of previous files. Move previous names to start of list. Keep most recent five files ''' global configDict fil = os.path.abspath(os.path.expanduser(fil)) if 'previous_GPX_files' not in cDict: cDict['previous_GPX_files'] = [[],[],[],'Previous .gpx files'] # unexpected try: pos = cDict['previous_GPX_files'][1].index(fil) if pos == 0: return cDict['previous_GPX_files'][1].pop(pos) # if present, remove if not 1st except ValueError: pass except AttributeError: cDict['previous_GPX_files'][1] = [] files = list(cDict['previous_GPX_files'][1]) files.insert(0,fil) cDict['previous_GPX_files'][1] = files[:5] configDict['previous_GPX_files'] = cDict['previous_GPX_files'][1]
[docs] def LoadConfigFile(filename): '''Read a GSAS-II configuration file. Comments (starting with "%") are removed, as are empty lines :param str filename: base file name (such as 'file.dat'). Files with this name are located from the path and the contents of each are concatenated. :returns: a list containing each non-empty (after removal of comments) line found in every matching config file. ''' info = [] for path in sys.path: fil = os.path.join(path,'inputs',filename) if not os.path.exists(fil): # patch 3/2024 for svn dir organization fil = os.path.join(path,filename) if not os.path.exists(fil): continue try: i = 0 fp = open(fil,'r') for line in fp: expr = line.split('#')[0].strip() if expr: info.append(expr) i += 1 print(str(i)+' lines read from config file '+fil) finally: fp.close() return info
[docs] def GetBinaryPrefix(pyver=None): '''Creates the first part of the binary directory name such as linux_64_p3.9 (where the full name will be linux_64_p3.9_n1.21). Note that any change made here is also needed in GetBinaryDir in fsource/SConstruct or GSASII-buildtools/compile/nameTar.py ''' if sys.platform == "win32": prefix = 'win' elif sys.platform == "darwin": prefix = 'mac' elif sys.platform.startswith("linux"): prefix = 'linux' else: print(u'Unknown platform: '+sys.platform) raise Exception('Unknown platform') if 'arm' in platform.machine() and sys.platform == "darwin": bits = 'arm' elif 'aarch' in platform.machine() and '64' in platform.architecture()[0]: bits = 'arm64' elif 'arm' in platform.machine(): bits = 'arm32' elif '64' in platform.architecture()[0]: bits = '64' else: bits = '32' # format current python version if pyver: pyver = 'p'+pyver else: pyver = 'p{}.{}'.format(*sys.version_info[0:2]) return '_'.join([prefix,bits,pyver])
#============================================================================== #============================================================================== G2_installed_result = None
[docs] def HowIsG2Installed(): '''Determines if GSAS-II was installed with git. Result is cached to avoid time needed for multiple calls of this. :returns: * a string starting with 'git' from git, if installed from the GSAS-II GitHub repository (defined in g2URL), the string is 'github', if the post-3/2024 directory structure is in use '-rev' is appended. * or 'noVCS' if installed without a connection to a version control system ''' global G2_installed_result if G2_installed_result is not None: return G2_installed_result try: g2repo = openGitRepo(path2GSAS2) if os.path.samefile(os.path.dirname(g2repo.common_dir),path2GSAS2): rev = '' else: rev = '-rev' if g2URL in g2repo.remote().urls: return 'github'+rev elif g2URL.replace('https://github.com/', 'git@github.com:') in g2repo.remote().urls: return 'github'+rev G2_installed_result = 'git'+rev return G2_installed_result except: pass G2_installed_result = 'noVCS' return G2_installed_result
[docs] def getSavedVersionInfo(): '''Get version number information from a file written by install routines. This is faster than getting the information from git. Also, when GSAS-II is installed into Python, files are no longer in a git repository, so querying git is not possible. The git_verinfo.py file is written by install/incr-version.py or by install/save_versions.py The saved_version.py file was previousl written by install/save_versions.py but this file is no longer in use. :returns: a reference to the version variables or None if no version info file is found. ''' # try: # from . import saved_version as gv # return gv # except: # pass try: from . import git_verinfo as gv return gv except: pass return None # this is unexpected as all dists should have git_verinfo.py
[docs] def GetVersionNumber(): '''Obtain a numeric (sequential) version number for GSAS-II from version files, or directly from git if no other choice. :returns: an int value normally, but unexpected error conditions could result in a value of 'unknown' or '?'. ''' # look for a previously recorded tag -- this is quick gv = getSavedVersionInfo() if gv is not None: for item in gv.git_tags+gv.git_prevtags: if item.isnumeric(): return int(item) if HowIsG2Installed().startswith('git'): # unexpected: should always find a version from getSavedVersionInfo() # from a version file, but if that fails ask Git for the most # recent tag that starts & ends with a digit and does not contain a '.' try: g2repo = openGitRepo(path2GSAS2) tag,vers,gnum = g2repo.git.describe('--tags','--match','[0-9]*[0-9]', '--exclude','*.*').split('-') if tag.isnumeric(): return int(tag) except: pass return "unknown" # Not installed from git and no version file -- very strange! return "?"
[docs] def GetVersionTag(): '''Obtain a release (X.Y.Z) version number for GSAS-II from version files, or directly from git if no other choice. :returns: a string of form <Major>.<Minor>.<mini> normally, but unexpected error conditions could result in a value of '?.?.?' or '?'. ''' # look for a previously recorded tag -- this is quick gv = getSavedVersionInfo() if gv is not None: if '?' not in gv.git_versiontag: return gv.git_versiontag if HowIsG2Installed().startswith('git'): # unexpected: should always find a version from getSavedVersionInfo() # from a version file, but if that fails ask Git for the most # recent tag that has the X.Y.Z format. try: g2repo = openGitRepo(path2GSAS2) tag,vers,gnum = g2repo.git.describe('--tags','--match','*.*.*').split('-') return tag except: pass return "?.?.?" # Not installed from git and no version file -- very strange! return "?"
[docs] def getG2Branch(): '''Get name of current branch, as named on local computer ''' if HowIsG2Installed().startswith('git'): g2repo = openGitRepo(path2GSAS2) try: return g2repo.active_branch.name except TypeError: # likely on a detached head return '?'
[docs] def getG2VersionInfo(): '''Get the git version information. This can be a bit slow, so reading .../GSASII/saved_version.py may be faster ''' gv = getSavedVersionInfo() if HowIsG2Installed().startswith('git'): g2repo = openGitRepo(path2GSAS2) commit = g2repo.head.commit ctim = commit.committed_datetime.strftime('%d-%b-%y %H:%M') now = dt.datetime.now().replace( tzinfo=commit.committed_datetime.tzinfo) delta = now - commit.committed_datetime age = delta.total_seconds()/(60*60*24.) msg = '' if g2repo.head.is_detached: msg = ("\n" + "**** You have reverted to a past version of GSAS-II. Please \n" + "contact the developers with what is preferred in this version ****" ) else: msg = '' rc,lc,_ = gitCheckForUpdates(False,g2repo) if rc is None: msg += f"\n\tNo history found. On development branch? ({g2repo.active_branch})" elif str(g2repo.active_branch) != 'main': msg += f'\n\tUsing development branch: {g2repo.active_branch}' elif age > 60 and len(rc) > 0: msg += f"\n\t**** This version is really old ({age:.1f} days). Please update.\n\t**** At least {len(rc)} updates have been posted ****" elif (age > 5 and len(rc) > 0) or len(rc) > 5: msg += f"\n\t**** Please consider updating. This version is {age:.1f} days old\n\t**** and {len(rc)} or more updates behind." # elif len(rc) > 0: # msg += f"\n\tThis GSAS-II version is ~{len(rc)} updates behind current." # could consider getting version & tag from gv if not None (see below) gversion = f"{GetVersionNumber()}/{GetVersionTag()}" if len(rc) > 0: return f" GSAS-II: {gversion} posted {ctim} (\u2265{len(rc)} new updates) [{commit.hexsha[:8]}]{msg}" else: return f" GSAS-II: {gversion} posted {ctim} (no new updates) [{commit.hexsha[:8]}]{msg}" elif gv is not None: vt = '' cvt = '' try: if gv.git_versiontag: vt = gv.git_versiontag cvt,cvn = getGitHubVersion() except: pass for item in gv.git_tags+gv.git_prevtags: if item.isnumeric(): tags = item if vt: tags += f', {vt}' msg = f"GSAS-II version: Git: {gv.git_version[:8]}, #{tags} (reinstall to update)" if vt != cvt and cvt is not None: msg += f'\n\tNote that the current GSAS-II version is {cvt}' return msg # Failed to get version info, fallback on old version number routine; should not happen anymore return f"GSAS-II installed without git, last tag: #{GetVersionNumber()}, {GetVersionTag()}"
#============================================================================== #============================================================================== # routines to interface with GitHub.
[docs] def saveGitHubVersion(): '''Get the latest GSAS-II version tags from the GitHub site and place them into the config.ini file. This is always done in background so that app startup time is minimally delayed. :returns: Returns a Popen object (see subprocess). ''' try: import requests requests except: print('Unable to use requests module') return return subprocess.Popen([sys.executable, __file__, '--github-tags']) if GetConfigValue('debug'): print('Updates fetched')
[docs] def getGitHubVersion(): '''Get the latest git version info when not accessible from git as saved by saveGitHubVersion ''' import configparser cfgfile = os.path.expanduser(os.path.normpath('~/.GSASII/config.ini')) if not os.path.exists(cfgfile): if GetConfigValue('debug'): print(f"{cfgfile} not found") return None,None try: cfg = configparser.ConfigParser() # Read the configuration file cfg.read(cfgfile) except Exception as err: if GetConfigValue('debug'): print(f"Error reading {cfgfile}\n",err) return None,None if 'version info' not in cfg: if GetConfigValue('debug'): print(f"no saved version number in {cfgfile}") return None,None return cfg['version info'].get('lastVersionTag'),cfg['version info'].get('lastVersionNumber')
# routines to interface with git. BASE_HEADER = {'Accept': 'application/vnd.github+json', 'X-GitHub-Api-Version': '2022-11-28'} def openGitRepo(repo_path): try: import git except: return None try: # patch 3/2024 for svn dir organization return git.Repo(path2GSAS2) except git.InvalidGitRepositoryError: pass return git.Repo(os.path.dirname(path2GSAS2))
[docs] def gitLookup(repo_path,gittag=None,githash=None): '''Return information on a particular checked-in version of GSAS-II. :param str repo_path: location where GSAS-II has been installed :param str gittag: a tag value. :param str githash: hex hash code (abbreviated to as few characters as needed to keep it unique). If None (default), a tag must be supplied. :returns: either None if the tag/hash is not found or a tuple with four values (hash, tag-list, message,date_time) where * hash (str) is the git checking hash code; * tag-list is a list of tags (typically there will be one or two); * message is the check-in message (str) * date_time is the check-in date as a datetime object ''' try: import git except: return None g2repo = openGitRepo(repo_path) if gittag is not None and githash is not None: raise ValueError("Cannot specify a hash and a tag") if gittag is not None: try: commit = g2repo.tag(gittag).commit except ValueError: return None elif githash is not None: try: commit = g2repo.commit(githash) except git.BadName: return None else: raise ValueError("Must specify either a hash or a tag") tags = [i.name for i in g2repo.tags if i.commit == commit] return (commit.hexsha, tags, commit.message,commit.committed_datetime)
[docs] def gitHash2Tags(githash=None,g2repo=None): '''Find tags associated with a particular git commit. Note that if `githash` cannot be located because it does not exist or is not unique, a `git.BadName` exception is raised. :param str githash: hex hash code (abbreviated to as few characters as needed to keep it unique). If None (default), the HEAD is used. :param str g2repo: git.Rwpo connecton to GSAS-II installation. If None (default) it will be opened. :returns: a list of tags (each a string) ''' if g2repo is None: g2repo = openGitRepo(path2GSAS2) if githash is None: commit = g2repo.head.object else: commit = g2repo.commit(githash) #return [i.name for i in g2repo.tags if i.commit == commit] # slow with a big repo return g2repo.git.tag('--points-at',commit).split('\n')
[docs] def gitTag2Hash(gittag,g2repo=None): '''Provides the hash number for a git tag. Note that if `gittag` cannot be located because it does not exist or is too old and is beyond the `depth` of the local repository, a `ValueError` exception is raised. :param str repo_path: location where GSAS-II has been installed. :param str gittag: a tag value. :param str g2repo: git.Rwpo connecton to GSAS-II installation. If None (default) it will be opened. :returns: a str value with the hex hash for the commit. ''' if g2repo is None: g2repo = openGitRepo(path2GSAS2) return g2repo.tag(gittag).commit.hexsha
[docs] def gitTestGSASII(verbose=True,g2repo=None): '''Test a the status of a GSAS-II installation :param bool verbose: if True (default), status messages are printed :param str g2repo: git.Rwpo connecton to GSAS-II installation. If None (default) it will be opened. :returns: istat, with the status of the repository, with one of the following values: * -1: path is not found * -2: no git repository at path * -3: unable to access repository * value&1==1: repository has local changes (uncommitted/stashed) * value&2==2: repository has been regressed (detached head) * value&4==4: repository has staged files * value&8==8: repository has has been switched to other than main branch * value==0: no problems noted ''' if g2repo is None: if not os.path.exists(path2GSAS2): if verbose: print(f'Warning: Directory {path2GSAS2} not found') return -1 if os.path.exists(os.path.join(path2GSAS2,'..','.git')): path2repo = os.path.join(path2GSAS2,'..') # expected location elif os.path.exists(os.path.join(path2GSAS2,'.git')): path2repo = path2GSAS2 else: if verbose: print(f'Warning: Repository {path2GSAS2} not found') return -2 try: g2repo = openGitRepo(path2repo) except Exception as msg: if verbose: print(f'Warning: Failed to open repository. Error: {msg}') return -3 code = 0 if g2repo.is_dirty(): # has changed files code += 1 #count_modified_files = len(g2repo.index.diff(None)) if g2repo.head.is_detached: code += 2 # detached else: if g2repo.active_branch.name != 'main': code += 8 # not on main branch if g2repo.index.diff("HEAD"): code += 4 # staged # test if there are local changes committed return code
[docs] def gitCheckForUpdates(fetch=True,g2repo=None): '''Provides a list of the commits made locally and those in the local copy of the repo that have not been applied. Does not provide useful information in the case of a detached Head (see :func:`countDetachedCommits` for that.) :param bool fetch: if True (default), updates are downloaded from the remote repository (git fetch), before checking for changes. At present, this feature is not used anywhere in GSAS-II (fetch=False in all calls). :param str g2repo: git.Rwpo connecton to GSAS-II installation. If None (default) it will be opened. :returns: a list containing (remotecommits, localcommits, fetched) where * remotecommits is a list of hex hash numbers of remote commits and * localcommits is a list of hex hash numbers of local commits and * fetched is a bool that will be True if the update (fetch) step ran successfully Note that if the head is detached (GSAS-II has been reverted to an older version) or the branch has been changed, the values for each of the three items above will be None. ''' try: import git except: print('Failed to import git in gitCheckForUpdates()') return (None,None,None) fetched = False if g2repo is None: g2repo = openGitRepo(path2GSAS2) if g2repo.head.is_detached: return (None,None,None) prevremotecommits = None if fetch: try: head = g2repo.head.ref tracking = head.tracking_branch() prevremotecommits = [i.hexsha for i in head.commit.iter_items(g2repo, f'{head.path}..{tracking.path}')] g2repo.remote().fetch() fetched = True except git.GitCommandError as msg: print(f'Failed to get updates from {g2repo.remote().url}\nerror: {msg}') try: head = g2repo.head.ref tracking = head.tracking_branch() localcommits = [i.hexsha for i in head.commit.iter_items(g2repo, f'{tracking.path}..{head.path}')] remotecommits = [i.hexsha for i in head.commit.iter_items(g2repo, f'{head.path}..{tracking.path}')] if prevremotecommits is None: prevremotecommits = remotecommits if fetch and len(prevremotecommits) != len(remotecommits): print(f"Fetch of new GSAS-II versions uploaded {len(remotecommits)-len(prevremotecommits)} new updates.") elif fetch and GetConfigValue('debug'): print('Updates fetched; nothing new') return remotecommits,localcommits,fetched except: return (None,None,None)
[docs] def countDetachedCommits(g2repo=None): '''Count the number of commits that have been made since a commit that is containined in the main branch returns the count and the commit object for the parent commit that connects the current stranded branch to the main branch. None is returned if no connection is found ''' if g2repo is None: g2repo = openGitRepo(path2GSAS2) if not g2repo.head.is_detached: return 0,g2repo.commit() # is detached head in main branch? if g2repo.commit() in g2repo.iter_commits('main'): return 0,g2repo.commit() # count number of commits since leaving main branch masterList = list(g2repo.iter_commits('main')) for c,i in enumerate(g2repo.commit().iter_parents()): if i in masterList: return c+1,i else: return None,None
[docs] def gitCountRegressions(g2repo=None): '''Count the number of new check ins on the main branch since the head was detached as well as any checkins made on the detached head. :returns: maincount,detachedcount, where * maincount is the number of check ins made on the main branch remote repository since the reverted check in was first made. * detachedcount is the number of check ins made locally starting from the detached head (hopefully 0) If the connection between the current head and the main branch cannot be established, None is returned for both. If the connection from the reverted check in to the newest version (I don't see how this could happen) then only maincount will be None. ''' if g2repo is None: g2repo = openGitRepo(path2GSAS2) # get parent of current head that is in main branch detachedcount,parent = countDetachedCommits(g2repo) if detachedcount is None: return None,None maincount = 0 for h in g2repo.iter_commits('main'): if h == parent: return maincount,detachedcount maincount += 1 return None,detachedcount
[docs] def gitGetUpdate(mode='Background'): '''Download the latest updates into the local copy of the GSAS-II repository from the remote main, but don't actually update the GSAS-II files being used. This can be done immediately or in background. In 'Background' mode, a background process is launched. The results from the process are recorded in file in ~/GSASII_bkgUpdate.log (located in %HOME% on Windows). A pointer to the created process is returned. In 'immediate' mode, the update is performed immediately. The function does not return until after the update is downloaded. :returns: In 'Background' mode, returns a Popen object (see subprocess). In 'immediate' mode nothing is returned. ''' if mode == 'Background': return subprocess.Popen([sys.executable, __file__, '--git-fetch']) else: g2repo = openGitRepo(path2GSAS2) g2repo.remote().fetch() if GetConfigValue('debug'): print('Updates fetched')
[docs] def gitHistory(values='tag',g2repo=None,maxdepth=100): '''Provides the history of commits to the main, either as tags or hash values :param str values: specifies what type of values are returned. If values=='hash', then hash values or for values=='tag', a list of list of tag(s). :param str g2repo: git.Rwpo connecton to GSAS-II installation. If None (default) it will be opened. :returns: a list of str values where each value is a hash for a commit (values=='hash'), for values=='tag', a list of lists, where a list of tags is provided for each commit. When tags are provided, for any commit that does not have any associated tag(s), that entry is omitted from the list. for values=='both', a list of lists, where a hash is followed by a list of tags (if any) is provided ''' if g2repo is None: g2repo = openGitRepo(path2GSAS2) history = list(g2repo.iter_commits('main')) if values.lower().startswith('h'): return [i.hexsha for i in history] elif values.lower().startswith('t'): tagmap = {} # generate lookup table for to get tags for t in g2repo.tags: tagmap.setdefault(t.commit.hexsha, []).append(t.name) return [tagmap[i.hexsha] for i in history if i.hexsha in tagmap] elif values.lower().startswith('b'): # slow with history >thousands # tagmap = {} # generate lookup table for to get tags # for t in g2repo.tags: # tagmap.setdefault(t.commit.hexsha, []).append(t.name) # return [[i.hexsha]+tagmap.get(i.hexsha,[]) for i in history] # potentially faster code r1 = [[i.hexsha]+g2repo.git.tag('--points-at',i).split('\n') for i in history[:maxdepth]] return [[i[0]] if i[1]=='' else i for i in r1] else: raise ValueError(f'gitHistory has invalid values specified: {values}')
[docs] def getGitBinaryReleases(cache=False): '''Retrieves the binaries and download urls of the latest release :param bool cache: when cache is True and the binaries file names are retrieved (does not always succeed when done via GitHub Actions), the results are saved in a file for reuse should the retrieval fail. Default is False so the file is not changed. :returns: a URL dict for GSAS-II binary distributions found in the newest release in a GitHub repository. The repo location is defined in global `G2binURL`. The dict keys are references to binary distributions, which are named as f"{platform}_p{pver}_n{npver}" where platform is determined in :func:`GSASIIpath.GetBinaryPrefix` (linux_64, mac_arm, win_64,...) and where `pver` is the Python version (such as "3.10") and `npver` is the numpy version (such as "1.26"). The value associated with each key contains the full URL to download a tar containing that binary distribution. ''' try: import requests except: print('Unable to install binaries in getGitBinaryReleases():\n requests module not available') return # Get first page of releases. (Could there be more than one?) releases = [] tries = 0 while tries < 5: # this has been known to fail, so retry tries += 1 releases = requests.get( url=f"{G2binURL}/releases", headers=BASE_HEADER ).json() try: # loop over assets of latest release (will [-1] always get this?) versions = [] URLs = [] for asset in releases[-1]['assets']: if not asset['name'].endswith('.tgz'): continue versions.append(asset['name'][:-4]) # Remove .tgz tail URLs.append(asset['browser_download_url']) count = len(versions) # Cache the binary releases for later use in case GitHub # prevents us from using a query to get them if cache and count > 4: fp = open(os.path.join(path2GSAS2,'inputs','BinariesCache.txt'),'w') res = dict(zip(versions,URLs)) for key in res: fp.write(f'{key} : {res[key]}\n') fp.close() return dict(zip(versions,URLs)) except: print('Attempt to get GSAS-II binary releases/assets failed, sleeping for 10 sec and then retrying') import time time.sleep(10) # this does not seem to help when GitHub is not letting the queries through print(f'Could not get releases from {G2binURL}. Using cache') res = {} try: fp = open(os.path.join(path2GSAS2,'inputs','BinariesCache.txt'),'r') for line in fp.readlines(): key,val = line.split(':',1)[:2] res[key.strip()] = val.strip() fp.close() return res except: raise IOError('Cache read of releases failed too.')
[docs] def getGitBinaryLoc(npver=None,pyver=None,verbose=True,debug=False): '''Identify the best GSAS-II binary download location from the distributions in the latest release section of the github repository on the CPU platform, and Python & numpy versions. The CPU & Python versions must match, but the numpy version may only be close. :param str npver: Version number to use for numpy, if None (default) the version is taken from numpy in the current Python interpreter. :param str pyver: Version number to use for Python, if None (default) the version is taken from the current Python interpreter. :param bool verbose: if True (default), status messages are printed :returns: a URL for the tar file (success) or None (failure) ''' bindir = GetBinaryPrefix(pyver) if npver: inpver = intver(npver) else: npver = np.__version__ inpver = intver(np.__version__) # get binaries matching the required install, approximate match for numpy URLdict = getGitBinaryReleases() if debug: print('URLdict:') for k in URLdict: print(k,URLdict[k]) versions = {} for d in URLdict: if d.startswith(bindir): v = intver(d.rstrip('/').split('_')[3].lstrip('n')) versions[v] = d if debug: print('versions:',versions) intVersionsList = sorted(versions.keys()) if not intVersionsList: print('No binaries located to match',bindir) return elif inpver < min(intVersionsList): vsel = min(intVersionsList) if verbose: print( f'Warning: The requested numpy, version, {npver},' f' is older than\n\tthe oldest dist version, {fmtver(vsel)}') elif inpver >= max(intVersionsList): vsel = max(intVersionsList) if verbose and inpver == max(intVersionsList): print( f'The requested numpy version, {npver},' f' matches the binary dist, version {fmtver(vsel)}') elif verbose: print( f'Note: using a binary dist for numpy version {fmtver(vsel)} ' f'which is older than the requested numpy, version {npver}') else: vsel = min(intVersionsList) for v in intVersionsList: if v <= inpver: vsel = v else: if verbose: print( f'FYI: Selecting dist version {fmtver(vsel)}' f' as the requested numpy, version, {npver},' f'\n\tis older than the next dist version {fmtver(v)}') break return URLdict[versions[vsel]]
[docs] def InstallGitBinary(tarURL, instDir, nameByVersion=False, verbose=True): '''Install the GSAS-II binary files into the location specified. :param str tarURL: a URL for the tar file. :param str instDir: location directory to install files. This directory may not exist and will be created if needed. :param bool nameByVersion: if True, files are put into a subdirectory of `instDir`, named to match the tar file (with plaform, Python & numpy versions). Default is False, where the binary files are put directly into `instDir`. :param bool verbose: if True (default), status messages are printed. :returns: None ''' # packages not commonly used so import them here not on startup import tempfile import tarfile try: import requests except: print('Unable to install binaries in InstallGitBinary():\n requests module not available') return # download to scratch tarobj = None tar = tempfile.NamedTemporaryFile(suffix='.tgz',delete=False) try: tar.close() if verbose: print(f'Downloading {tarURL}') r = requests.get(tarURL, allow_redirects=True) with open(tar.name, 'wb') as fp: fp.write(r.content) # open in tar tarobj = tarfile.open(name=tar.name) if nameByVersion: binnam = os.path.splitext(os.path.split(tarURL)[1])[0] install2dir = os.path.join(instDir,binnam) else: install2dir = instDir for f in tarobj.getmembers(): # loop over files in repository # do a bit of sanity checking for safety. Don't install anything # unless it goes into in the specified directory if sys.platform == "win32" and f.name.startswith('._'): continue # clean up Mac cruft for Windows if '/' in f.name or '\\' in f.name: print(f'skipping file {f.name} -- path alteration not allowed') continue if f.name != os.path.basename(f.name): print(f'skipping file {f.name} -- how did this happen?') continue newfil = os.path.normpath(os.path.join(install2dir,f.name)) tarobj.extract(f, path=install2dir, set_attrs=False) # set file mode and mod/access times (but not ownership) os.chmod(newfil,f.mode) os.utime(newfil,(f.mtime,f.mtime)) if verbose: print(f'Created GSAS-II binary file {os.path.split(newfil)[1]}') if verbose: print(f'Binary files created in {os.path.split(newfil)[0]}') finally: if tarobj: del tarobj os.unlink(tar.name)
[docs] def GetRepoUpdatesInBackground(): '''Get the latest GSAS-II version info. This serves to make sure that :func:`gitGetUpdate` is called only if git has been used to install GSAS-II. :returns: returns a Popen object (see subprocess) ''' if HowIsG2Installed().startswith('git'): return gitGetUpdate(mode='Background') else: return saveGitHubVersion()
[docs] def gitStartUpdate(cmdopts): '''Update GSAS-II in a separate process, by running this script with the options supplied in the call to this function and then exiting GSAS-II. ''' cmd = [sys.executable, __file__] + cmdopts if GetConfigValue('debug'): print('Starting updates with command\n\t'+ f'{" ".join(cmd)}') proc = subprocess.Popen(cmd) # on windows the current process needs to end so that the source files can # be written over. On unix the current process needs to stay running # so the child is not killed. if sys.platform != "win32": proc.wait() sys.exit()
[docs] def dirGitHub(dirlist,orgName=gitTutorialOwn, repoName=gitTutorialRepo): '''Obtain a the contents of a GitHub repository directory using the GitHub REST API. :param str dirlist: a list of sub-directories `['parent','child',sub']` for `parent/child/sub` or `[]` for a file in the top-level directory. :param str orgName: the name of the GitHub organization :param str repoName: the name of the GitHub repository :returns: a list of file names or None if the dirlist info does not reference a directory examples:: dirGitHub([], 'GSASII', 'TutorialTest') dirGitHub(['TOF Sequential Single Peak Fit', 'data']) The first example will get the contents of the top-level directory for the specified repository The second example will provide the contents of the "TOF Sequential Single Peak Fit"/data directory. ''' try: import requests except: print('Unable to search GitHub in dirGitHub():\n requests module not available') return dirname = '' for item in dirlist: dirname += item + '/' URL = f"https://api.github.com/repos/{orgName}/{repoName}/contents/{dirname}" r = requests.get(URL, allow_redirects=True) try: return [rec['name'] for rec in r.json()] except: return None
[docs] def rawGitHubURL(dirlist,filename,orgName=gitTutorialOwn, repoName=gitTutorialRepo, branchname="main"): '''Create a URL that can be used to view/downlaod the raw version of file in a GitHub repository. :param str dirlist: a list of sub-directories `['parent','child',sub']` for `parent/child/sub` or `[]` for a file in the top-level directory. :param str filename: the name of the file :param str orgName: the name of the GitHub organization :param str repoName: the name of the GitHub repository :param str branchname: the name of the GitHub branch. Defaults to "main". :returns: a URL-encoded URL ''' import urllib.parse # not used very often, import only when needed dirname = '' for item in dirlist: # it's not clear that the URLencode is needed for the directory name dirname += urllib.parse.quote(item) + '/' #filename = urllib.parse.quote(filename) return f"https://raw.githubusercontent.com/{orgName}/{repoName}/{branchname}/{dirname}{filename}"
[docs] def downloadDirContents(dirlist,targetDir,orgName=gitTutorialOwn, repoName=gitTutorialRepo): '''Download the entire contents of a directory from a repository on GitHub. Used to download data for a tutorial. ''' try: import requests except: print('Unable to download Tutorial data in downloadDirContents():\n requests module not available') return filList = dirGitHub(dirlist, orgName=orgName, repoName=repoName) if filList is None: print(f'Directory {"/".join(dirlist)!r} does not have any files') return None for fil in filList: if fil.lower() == 'index.html': continue URL = rawGitHubURL(dirlist,fil,orgName=orgName,repoName=repoName) r = requests.get(URL, allow_redirects=True) outfil = os.path.join(targetDir,fil) if r.status_code == 200: with open(outfil, 'wb') as fp: fp.write(r.content) print(f'wrote {outfil}') elif r.status_code == 404: print(f'Warning: {fil} is likely a subdirectory of directory {"/".join(dirlist)!r}') else: print(f'Unexpected web response for {fil}: {r.status_code}') return
#============================================================================== #==============================================================================
[docs] def runScript(cmds=[], wait=False, G2frame=None): '''run a shell script of commands in an external process :param list cmds: a list of str's, each ietm containing a shell (cmd.exe or bash) command :param bool wait: if True indicates the commands should be run and then the script should return. If False, then the currently running Python will exit. Default is False :param wx.Frame G2frame: provides the location of the current .gpx file to be used to restart GSAS-II after running the commands, if wait is False. Default is None which prevents restarting GSAS-II regardless of the value of wait. ''' import tempfile if not cmds: #debug print('nothing to do in runScript') return if sys.platform != "win32": suffix = '.sh' else: suffix = '.bat' fp = tempfile.NamedTemporaryFile(mode='w', suffix=suffix, delete=False) shellname = fp.name for line in cmds: fp.write(line) fp.write('\n') if not wait: if G2frame: projectfile = '' if G2frame.GSASprojectfile: projectfile = os.path.realpath(G2frame.GSASprojectfile) main = os.path.join(path2GSAS2,'G2.py') ex = sys.executable if sys.platform == "darwin": # mac requires pythonw which is not always reported as sys.executable if os.path.exists(ex+'w'): ex += 'w' print ('restart using ',' '.join([ex,main,projectfile])) fp.write(' '.join([ex,main,projectfile])) fp.write('\n') fp.close() # start the upgrade in a separate interpreter (avoids loading .pyd files) if sys.platform != "win32": proc = subprocess.Popen(['bash',shellname]) else: proc = subprocess.Popen([shellname],shell=True) if wait: proc.wait() else: if sys.platform != "win32": proc.wait() sys.exit()
[docs] def IPyBreak_base(userMsg=None): '''A routine that invokes an IPython session at the calling location This routine is only used when debug=True is set in the configuration settings ''' savehook = sys.excepthook # save the exception hook try: from IPython.terminal.embed import InteractiveShellEmbed except ImportError: try: # try the IPython 0.12 approach from IPython.frontend.terminal.embed import InteractiveShellEmbed except ImportError: print ('IPython InteractiveShellEmbed not found') return import inspect #from IPython import __version__ #if __version__.startswith('8.12.'): # see https://github.com/ipython/ipython/issues/13966 from IPython.core import getipython if getipython.get_ipython() is None: ipshell = InteractiveShellEmbed.instance() else: ipshell = InteractiveShellEmbed() frame = inspect.currentframe().f_back msg = 'Entering IPython console inside {0.f_code.co_filename} at line {0.f_lineno}\n'.format(frame) if userMsg: msg += userMsg # globals().update(locals()) # This might help with vars inside list comprehensions, etc. ipshell(msg,stack_depth=2) # Go up one level, to see the calling routine sys.excepthook = savehook # reset IPython's change to the exception hook
[docs] def exceptHook(*args): '''A routine to be called when an exception occurs. It prints the traceback with fancy formatting and then calls an IPython shell with the environment of the exception location. This routine is only used when debug=True is set in the configuration settings ''' try: #from IPython.core import ultratb import IPython.core.ultratb except: pass try: from IPython.terminal.embed import InteractiveShellEmbed import IPython.core if sys.platform.startswith('win'): IPython.core.ultratb.FormattedTB(call_pdb=False,color_scheme='NoColor')(*args) else: IPython.core.ultratb.FormattedTB(call_pdb=False,color_scheme='LightBG')(*args) from IPython.core import getipython if getipython.get_ipython() is None: ipshell = InteractiveShellEmbed.instance() else: ipshell = InteractiveShellEmbed() except ImportError: print ('IPython not installed or is really old') return except TypeError: # Ipython 9.x removes color_scheme try: IPython.core.ultratb.FormattedTB(call_pdb=False)(*args) from IPython.core import getipython if getipython.get_ipython() is None: ipshell = InteractiveShellEmbed.instance() else: ipshell = InteractiveShellEmbed() except Exception as msg: print('IPython patch failed, msg=',msg) import inspect frame = inspect.getinnerframes(args[2])[-1][0] msg = 'Entering IPython console at {0.f_code.co_filename} at line {0.f_lineno}\n'.format(frame) savehook = sys.excepthook # save the exception hook try: ipshell(msg,local_ns=frame.f_locals,global_ns=frame.f_globals) # newest (IPython >= 8) except DeprecationWarning: # IPython <=7 try: # IPython >=5 class c(object): pass pseudomod = c() # create something that acts like a module pseudomod.__dict__ = frame.f_locals InteractiveShellEmbed(banner1=msg)(module=pseudomod,global_ns=frame.f_globals) except: # 'IPython <5 InteractiveShellEmbed(banner1=msg)(local_ns=frame.f_locals,global_ns=frame.f_globals) sys.excepthook = savehook # reset IPython's change to the exception hook
[docs] def DoNothing(): '''A routine that does nothing. This is called in place of IPyBreak and pdbBreak except when the debug option is set True in the configuration settings ''' pass
[docs] def InvokeDebugOpts(): 'Called to set up debug options' if any('SPYDER' in name for name in os.environ): print('Running from Spyder, keeping breakpoint() active & skipping exception trapping') elif GetConfigValue('debug'): try: import pdb global pdbBreak pdbBreak = pdb.set_trace import IPython IPython global IPyBreak IPyBreak = IPyBreak_base sys.excepthook = exceptHook os.environ['PYTHONBREAKPOINT'] = 'GSASIIpath.IPyBreak_base' print ('Debug on: IPython: Exceptions and G2path.IPyBreak(); pdb: G2path.pdbBreak()') except: print ('Debug on failed. IPython not installed?') else: # not in spyder or debug enabled, hide breakpoints os.environ['PYTHONBREAKPOINT'] = '0'
[docs] def TestSPG(): '''Test if pyspg.[so,.pyd] can be run from a location in the existing path Do not modify the path if not. ''' def showVersion(): try: f = os.path.join(os.path.dirname(pyspg.__file__),'GSASIIversion.txt') with open(f,'r') as fp: version = fp.readline().strip() vnum = fp.readline().strip() print(f' Binary ver: {vnum}, {version}') except: if GetConfigValue('debug'): print(' Binaries: undated') try: from . import pyspg pyspg showVersion() return True except ImportError: pass try: import pyspg pyspg except ImportError: return False try: pyspg.sgforpy('P -1') except Exception as err: print(70*'=') print(f'Module pyspg in {pyspg.__file__} could not be run\nerror msg: {err}') print(70*'=') return False showVersion() return True
[docs] def pathhack_TestSPG(fpth): '''Test if pyspg.[so,.pyd] can be run from a specified location. If so modify the path to include it. ''' try: if not os.path.exists(fpth): return False if not glob.glob(os.path.join(fpth,'pyspg.*')): return False except: return False savpath = sys.path[:] sys.path = [fpth] # test to see if a shared library can be used try: import pyspg except ModuleNotFoundError as err: print(70*'=') print(f'Binary module pyspg not found in {fpth!r}\nerror msg: {err}') print(70*'=') sys.path = savpath return False except ImportError as err: print(70*'=') print(f'Module pyspg in {fpth!r} could not be loaded\nerror msg: {err}') print(70*'=') sys.path = savpath return False except Exception as err: print(70*'=') print(f'Error importing module pyspg in {fpth!r}\nerror msg: {err}') print(70*'=') sys.path = savpath return False try: pyspg.sgforpy('P -1') except Exception as err: print(70*'=') print(f'Module pyspg in {fpth!r} could not be run\nerror msg: {err}') print(70*'=') sys.path = savpath return False sys.path = savpath return True
[docs] def SetBinaryPath(showConfigMsg=False): ''' Add location of GSAS-II shared libraries (binaries: .so or .pyd files) to path (when needed). When GSAS-II is installed by pixi, no change in the path is needed. This routine must be executed after GSASIIpath is imported and before any other GSAS-II imports are done, since they may assume binary files are in path :param bool showConfigMsg: When True, config info is shown (default is False) ''' # cache the results of this routine so that repeated calls # only search for binary routines once global BinaryPathLoaded,binaryPath,BinaryPathFailed if BinaryPathLoaded or BinaryPathFailed: return try: intver(np.__version__) except (AttributeError,TypeError): # happens on building docs return try: from GSASII import pypowder pypowder binaryPath = None # special value to indicate that binaries have been installed into package if showConfigMsg: print(f'GSAS-II binaries co-located with GSAS-II: {os.path.dirname(__file__)}') BinaryPathLoaded = True LoadConfig(showConfigMsg) return except ImportError: pass try: from . import pathHacking except ImportError: print('Binary load failed and module pathHacking not present') BinaryPathFailed = True return LoadConfig(showConfigMsg) BinaryPathFailed = pathHacking._path_discovery(showConfigMsg)
[docs] def WriteConfig(configDict): '''Write the configDict information to the GSAS-II ini settings into file ~/.GSASII/config.ini. Called from :func:`GSASIIctrlGUI.SaveConfigVars`. ''' import configparser localdir = os.path.expanduser(os.path.normpath('~/.GSASII')) if not os.path.exists(localdir): try: os.mkdir(localdir) print(f'Created directory {localdir}') except Exception as msg: print(f'Error trying to create directory {localdir}\n{msg}') return True cfgfile = os.path.join(localdir,'config.ini') cfgP = configparser.ConfigParser() if os.path.exists(cfgfile): cfgP.read(cfgfile) # read previous file so other sections are retained cfgP['GUI settings'] = configDict # Write the configuration file with open(cfgfile, 'w') as configfile: cfgP.write(configfile) print(f"Configuration settings saved as {cfgfile}")
[docs] def LoadConfig(printInfo=True): '''Read configuration settings from ~/.GSASII/config.ini, if present. Place the values into global dict configDict. :param bool printInfo: if printInfo is True (default) then a message is shown with the number of settings read (upon startup). ''' def XferConfigIni(): '''copy the contents of the config.py file to file ~/.GSASII/config.ini. This "patch code" used for master->main transition and can eventually be removed. ''' import types configDict = {} try: import config #import config_example as config for i in config.__dict__: if i.startswith('__') and i.endswith('__'): continue if isinstance(config.__dict__[i],types.ModuleType): continue configDict.update({i:str(config.__dict__[i])}) except ImportError: print("New install: start without a config.py file") return except Exception as err: print("Error reading config.py file\n",err) return print(f"Contents of {config.__file__} to be written from config.py...") WriteConfig(configDict) import configparser global configDict configDict = {} cfgfile = os.path.expanduser(os.path.normpath('~/.GSASII/config.ini')) if not os.path.exists(cfgfile): print(f'N.B. Configuration file {cfgfile} does not exist') # patch 2/7/25: transform GSAS-II config.py contents to config.ini XferConfigIni() try: from . import config_example except ImportError: try: import GSASII.config_example as config_example except ImportError as err: print("Error importing config_example.py file\n",err) return # get the original capitalization (lost by configparser) capsDict = {key.lower():key for key in config_example.__dict__ if not key.startswith('__')} try: cfg = configparser.ConfigParser() # Read the configuration file cfg.read(cfgfile) except Exception as err: print("Error reading {cfgfile}\n",err) return # Access values from the configuration file try: cfgG = cfg['GUI settings'] except KeyError: cfgG = {} for key in cfgG: key = key.lower() # not needed... but in case configparser ever changes capKey = capsDict.get(key) if capKey is None: print(f'Item {key} not defined in config_example') continue try: if cfgG[key] == 'None': configDict[capKey] = None elif key.endswith('_pos') or key.endswith('_size'): # list of integers configDict[capKey] = tuple([int(i) for i in cfgG[key].strip('()').split(',')]) elif key.endswith('_location') or key.endswith('_directory') or key.endswith('_exec'): # None (above) or str configDict[capKey] = cfgG.get(key) elif cfgG[key].startswith('[') and cfgG[key].endswith(']'): # list of strings s = cfgG[key].strip('[]') if s == '': res = [] else: res = [i.strip("'").replace(r'\\','\\') for i in s.split(', ')] configDict[capKey] = res elif isinstance(config_example.__dict__[capKey],bool): configDict[capKey] = cfgG.getboolean(key) elif isinstance(config_example.__dict__[capKey],float): configDict[capKey] = cfgG.getfloat(key) elif isinstance(config_example.__dict__[capKey],int): configDict[capKey] = cfgG.getint(key) elif isinstance(config_example.__dict__[capKey],str): configDict[capKey] = cfgG.get(key).replace(r'\\','\\') else: print('*** problem with',type(config_example.__dict__[capKey])) continue except: continue if printInfo: print (f'{len(configDict)} values read from {cfgfile}') # make sure this value is set configDict['Clip_on'] = configDict.get('Clip_on',True)
[docs] def MacRunScript(script): '''Start a bash script in a new terminal window. Used on Mac OS X only. :param str script: file name for a bash script ''' script = os.path.abspath(script) osascript = ''' set bash to "/bin/bash" set filename to "{}" tell application "Terminal" activate do script bash & " " & filename & "; exit" end tell '''.format(script) subprocess.Popen(["osascript","-e",osascript])
#============================================================================== #============================================================================== # conda/pip routines
[docs] def findConda(): '''Determines if GSAS-II has been installed as g2conda or gsas2full with conda located relative to this file. We could also look for conda relative to the python (sys.executable) image, but I don't want to muck around with python that someone else installed. ''' parent = os.path.split(path2GSAS2)[0] if sys.platform != "win32": activate = os.path.join(parent,'bin','activate') conda = os.path.join(parent,'bin','conda') else: activate = os.path.join(parent,'Scripts','activate.bat') conda = os.path.join(parent,'condabin','conda.bat') if os.path.exists(activate) and os.path.exists(conda): return conda,activate else: return None
[docs] def condaTest(requireAPI=False): '''Returns True if it appears that Python is being run under Anaconda Python with conda present. Tests for conda environment vars and that the conda package is installed in the current environment. :returns: True, if running under Conda ''' if not all([(i in os.environ) for i in ('CONDA_DEFAULT_ENV','CONDA_EXE', 'CONDA_PREFIX', 'CONDA_PYTHON_EXE')]): return False if requireAPI: # is the conda package available? try: import conda.cli.python_api conda.cli.python_api except: print('You do not have the conda package installed in this environment', '\nConsider using the "conda install conda" command') return False # There is no foolproof way to check if someone activates conda # but then calls a different Python using its path... # ...If we are in the base environment then the conda Python # should be the same path as the one currently being run: if os.environ['CONDA_DEFAULT_ENV'] == 'base': try: if os.path.samefile(os.environ['CONDA_PYTHON_EXE'], sys.executable): return True except: return False # ...If not in the base environment, what we can do is check if the # python we are running in shares the beginning part of its path with # the one in the base installation: dir1 = os.path.dirname(os.environ['CONDA_PYTHON_EXE']) dir2 = os.path.dirname(sys.executable) if sys.platform != "win32": # python in .../bin/.. dir1 = os.path.dirname(dir1) dir2 = os.path.dirname(dir2) return commonPath(dir1,dir2)
[docs] def condaInstall(packageList): '''Installs one or more packages using the anaconda conda package manager. Can be used to install multiple packages and optionally use channels. :param list packageList: a list of strings with name(s) of packages and optionally conda options. Examples:: packageList=['gsl'] packageList=['-c','conda-forge','wxpython'] packageList=['numpy','scipy','matplotlib'] :returns: None if the the command ran normally, or an error message if it did not. ''' try: import conda.cli.python_api except: print('You do not have the conda package installed in this environment', '\nConsider using the "conda install conda" command') return None try: print(f'Preparing to install package(s): {" ,".join(packageList)}'+ '\nThis can take a while') # the next line works, but the subsequent cli is considered more stable #conda.cli.main('install', '-y', *packageList) # this is considered to be supported in the long term (out, err, rc) = conda.cli.python_api.run_command( conda.cli.python_api.Commands.INSTALL,packageList, # search_path=('conda-forge'), # broken! # use_exception_handler=True#, stdout=sys.stdout, stderr=sys.stderr) #print('rc=',rc) print('Ran conda. output follows...') print(70*'='+'\n'+out+'\n'+70*'=') #print('err=',err) if rc != 0: return str(out) except Exception as msg: print(f"\nConda error occurred, see below\n{msg}") return "error occurred" return None
[docs] def fullsplit(fil,prev=None): '''recursive routine to split all levels of directory names ''' if prev is None: # first call: normalize and drop file name fil = os.path.normcase(os.path.abspath(os.path.dirname(fil))) prev = [] i,j = os.path.split(fil) if j: prev.insert(0,j) out = fullsplit(i,prev) else: return [i]+prev return out
[docs] def commonPath(dir1,dir2): '''Check if two directories share a path. Note that paths are considered the same if either directory is a subdirectory of the other, but not if they are in different subdirectories /a/b/c shares a path with /a/b/c/d but /a/b/c/d and /a/b/c/e do not. :returns: True if the paths are common ''' for i,j in zip(fullsplit(dir1),fullsplit(dir2)): if i != j: return False return True
[docs] def pipInstall(packageList): '''Installs one or more packages using the pip package installer. Use of this should be avoided if conda can be used (see :func:`condaTest` to test for conda). Can be used to install multiple packages together. One can use pip options, but this is probably not needed. :param list packageList: a list of strings with name(s) of packages Examples:: packageList=['gsl'] packageList=['wxpython','matplotlib','scipy'] packageList=[r'\\Mac\\Home\\Scratch\\wheels\\pygsl-2.3.3-py3-none-any.whl'] packageList=['z:/Scratch/wheels/pygsl-2.3.3-py3-none-any.whl'] :returns: None if the the command ran normally, or an error message if it did not. ''' # update conda package specs (pkg=1.2.3) to pip package specs (pkg==1.2.3) # at present no other specifiers are used. for i,val in enumerate(packageList): if '=' in val and '==' not in val: packageList[i] = packageList[i].replace('=','==') try: subprocess.check_call([sys.executable, '-m', 'pip', 'install']+packageList) except Exception as msg: return msg return None
[docs] def condaEnvCreate(envname, packageList, force=False): '''Create a Python interpreter in a new conda environment. Use this when there is a potential conflict between packages and it would be better to keep the packages separate (which is one of the reasons conda supports environments). Note that conda should be run from the base environment; this attempts to deal with issues if it is not. Currently, this is used only to install diffpy.PDFfit2. :param str envname: the name of the environment to be created. If the environment exists, it will be overwritten only if force is True. :param list packageList: a list of conda install create command options, such as:: ['python=3.7', 'conda', 'gsl', 'diffpy.pdffit2', '-c', 'conda-forge', '-c', 'diffpy'] :param bool force: if False (default) an error will be generated if an environment exists :returns: (status,msg) where status is True if an error occurs and msg is a string with error information if status is True or the location of the newly-created Python interpreter. ''' if not all([(i in os.environ) for i in ('CONDA_DEFAULT_ENV', 'CONDA_EXE', 'CONDA_PREFIX', 'CONDA_PYTHON_EXE')]): p = sys.exec_prefix else: # workaround for bug that avoids nesting packages if running from an # environment (see https://github.com/conda/conda/issues/11493) p = os.path.dirname(os.path.dirname(os.environ['CONDA_EXE'])) try: import conda.cli.python_api except: return True,'conda package not available (in environment)' if not os.path.exists(os.path.join(p,'envs')): msg = ('Error derived installation path not found: '+ os.path.join(p,'envs')) print(msg) return True,msg newenv = os.path.join(p,'envs',envname) if os.path.exists(newenv) and not force: msg = 'path '+newenv+' already exists and force is not set, aborting' print(msg) return True,msg pathList = ['-p',newenv] try: (out, err, rc) = conda.cli.python_api.run_command( conda.cli.python_api.Commands.CREATE, packageList + pathList, use_exception_handler=True) # ,stdout=sys.stdout, stderr=sys.stderr) print(out) if rc != 0: print(err) return True,str(out+err) if sys.platform == "win32": newpython = os.path.join(newenv,'python.exe') else: newpython = os.path.join(newenv,'bin','python') if os.path.exists(newpython): return False,newpython return True,'Unexpected, '+newpython+' not found' except Exception as msg: print("Error occurred, see below\n",msg) return True,'Error: '+str(msg)
[docs] def addCondaPkg(): '''Install the conda API into the current conda environment using the command line, so that the API can be used in the current Python interpreter Attempts to do this without a shell failed on the Mac because it seems that the environment was inherited; seems to work w/o shell on Windows. ''' if not all([(i in os.environ) for i in ('CONDA_DEFAULT_ENV','CONDA_EXE', 'CONDA_PREFIX', 'CONDA_PYTHON_EXE')]): return None #condaexe = os.environ['CONDA_EXE'] currenv = os.environ['CONDA_DEFAULT_ENV'] if sys.platform == "win32": cmd = [os.environ['CONDA_EXE'],'install','conda','-n',currenv,'-y'] with subprocess.Popen(cmd, #stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding='UTF-8') as p: out,err = p.communicate() else: script = 'source ' + os.path.join( os.path.dirname(os.environ['CONDA_PYTHON_EXE']), 'activate') + ' base; ' script += 'conda install conda -n '+currenv+' -y' with subprocess.Popen(script,shell=True,env={}, #stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding='UTF-8') as p: out,err = p.communicate() if out is not None and GetConfigValue('debug'): print('Output from adding conda:\n',out) if err and err is not None: print('Note error/warning from running conda:\n',err) if currenv == "base": print('\nUnexpected action: adding conda to base environment???')
#============================================================================== #============================================================================== # routines for reorg of GSAS-II directory layout
[docs] def getIconFile(imgfile): '''Looks in either the main GSAS-II install location (old) or subdirectory icons (after reorg) for an icon :returns: the full path for the icon file ''' if os.path.exists(os.path.join(path2GSAS2,'icons',imgfile)): return os.path.join(path2GSAS2,'icons',imgfile) if os.path.exists(os.path.join(path2GSAS2,imgfile)): # patch 3/2024 for svn dir organization return os.path.join(path2GSAS2,imgfile) print(f'getIconFile Warning: file {imgfile} not found') return None
#============================================================================== #==============================================================================
[docs] def makeScriptShortcut(): '''Creates a shortcut to GSAS-II in the current Python installation so that "import G2script" (or "import G2script as GSASIIscripting") can be used without having to add GSASII to the path. The new shortcut is then tested. :returns: returns the name of the created file if successful. None indicates an error. ''' import datetime as dt if not HowIsG2Installed().startswith('git'): print('GSAS-II installed directly, shortcut likely not needed') return None for p in sys.path: if 'site-packages' in p: break else: print('No site-packages directory found in Python path') return newfil = os.path.join(p,'G2script.py') with open(newfil,'w') as fp: fp.write(f'#Created in makeScriptShortcut from {__file__}') fp.write(dt.datetime.strftime(dt.datetime.now(), " at %Y-%m-%dT%H:%M\n")) fp.write(f""" import sys,os Path2GSASII=r'{path2GSAS2}' if os.path.exists(os.path.join(Path2GSASII,'GSASIIscriptable.py')): print('setting up GSASIIscriptable from',Path2GSASII) if os.path.dirname(Path2GSASII) not in sys.path: sys.path.insert(0,os.path.dirname(Path2GSASII)) try: from GSASII.GSASIIscriptable import * except: print('Import of GSASIIscriptable failed.\\nRerun "Install GSASIIscriptable shortcut" from inside GSAS-II?') sys.exit() else: print('GSASIIscriptable not found in ',Path2GSASII) print('Rerun "Install GSASIIscriptable shortcut" from inside GSAS-II') sys.exit() """) fp.close() print('Created file',newfil) try: import G2script G2script except ImportError: print('Unexpected error: import of G2script failed!') return return newfil
# see if a directory for local modifications is defined. If so, stick that in the path if os.path.exists(os.path.expanduser('~/.G2local/')): sys.path.insert(0,os.path.expanduser('~/.G2local/')) fl = glob.glob(os.path.expanduser('~/.G2local/GSASII*.py*')) files = "" prev = None for f in sorted(fl): # make a list of files, dropping .pyc files where a .py exists f = os.path.split(f)[1] if os.path.splitext(f)[0] == prev: continue prev = os.path.splitext(f)[0] if files: files += ", " files += f if files: print("*"*75) print("Warning: the following source files are locally overridden in "+os.path.expanduser('~/.G2local/')) print(" "+files) print("*"*75) BinaryPathFailed = False BinaryPathLoaded = False binaryPath = '' IPyBreak = DoNothing pdbBreak = DoNothing
[docs] def postURL(URL,postdict,getcookie=None,usecookie=None, timeout=None,retry=2,mode='get'): '''Posts a set of values as from a web form using the "get" or "post" protocols. If access fails to an https site, the access is retried with http. :param str URL: the URL to post; typically something like 'http://www.../dir/page?' :param dict postdict: contains keywords and values, such as {'centrosymmetry': '0', 'crystalsystem': '0', ...} :param dict getcookie: dict to save cookies created in call, or None (default) if not needed. :param dict usecookie: dict containing cookies to be used in call, or None (default) if not needed. :param int timeout: specifies a timeout period for the get or post (default is None, which means the timeout period is set by the server). The value when specified is the time in seconds to wait before giving up on the request. :param int retry: the number of times to retry the request, if it times out. This is only used if timeout is specified. The default is 2. Note that if retry is left at the default value (2), The timeout is increased by 25% for the second try. :param str mode: either 'get' (default) or 'post'. Determines how the request will be submitted. :returns: a string with the response from the web server or None if access fails. ''' try: import requests # delay this until now, since rarely needed except: # this import seems to fail with the Anaconda pythonw on # macs; it should not! print('Warning: failed to import requests. Python config error') return None if mode == 'get': reqopt = requests.get else: reqopt = requests.post repeat = True count = 0 while repeat: count += 1 r = None repeat = False try: if timeout is not None: r = reqopt(URL,params=postdict,cookies=usecookie, timeout=timeout) else: r = reqopt(URL,params=postdict,cookies=usecookie) if r.status_code == 200: if GetConfigValue('debug'): print('request OK') page = r.text if getcookie is not None: getcookie.update(r.cookies) return page # success else: print('request to {} failed. Reason={}'.format(URL,r.reason)) except requests.exceptions.ConnectionError as msg: if 'time' in str(msg) and 'out' in str(msg): print(f'server timeout accessing {URL}') if GetConfigValue('debug'): print('full error=',msg) if timeout is not None and count < retry: if retry == 2: timeout *= 1.25 print(f'retry with timout={timeout} sec') repeat = True else: print('connection error - not on internet?') if URL.startswith('https:'): print('Retry with http://') repeat = True URL = URL.replace('https:','http:') except requests.exceptions.Timeout as msg: print(f'timeout accessing {URL}') if GetConfigValue('debug'): print('full error=',msg) if timeout is not None and count < retry: if retry == 2: timeout *= 1.25 print(f'retry with timout={timeout} sec') repeat = True if timeout is not None and count <= retry: repeat = True except requests.exceptions.ReadTimeout as msg: print(f'timeout reading from {URL}') if GetConfigValue('debug'): print('full error=',msg) if timeout is not None and count < retry: if retry == 2: timeout *= 1.25 print(f'retry with timout={timeout} sec') repeat = True except requests.exceptions.ConnectTimeout: print(f'timeout accessing {URL}') except Exception as msg: # other error print(f'Error accessing {URL}') if GetConfigValue('debug'): print(msg) finally: if r: r.close() else: return None
#=========================================================================== # duplicated from GSASIIfiles to avoid using an import below # perhaps the references to G2fil.openInNewTerm() should be # changed to reference this here.
[docs] def openInNewTerm(project=None,g2script=None,pythonapp=sys.executable): '''Open a new and independent GSAS-II session in separate terminal or console window and as a separate process that will continue even if the calling process exits. Intended to work on all platforms. This could be used to run other scripts inside python other than GSAS-II :param str project: the name of an optional parameter to be passed to the script (usually a .gpx file to be opened in a new GSAS-II session) :param str g2script: the script to be run. If None (default) the G2.py file in the same directory as this file will be used. :param str pythonapp: the Python interpreter to be used. Defaults to sys.executable which is usually what is wanted. :param str terminal: a name for a preferred terminal emulator ''' #import subprocess if g2script is None: g2script = os.path.join(os.path.dirname(__file__),'G2.py') if sys.platform == "darwin": if project: script = f''' set python to "{pythonapp}" set appwithpath to "{g2script}" set filename to "{project}" set filename to the quoted form of the POSIX path of filename tell application "Terminal" activate do script python & " " & appwithpath & " " & filename & "; exit" end tell ''' else: script = f''' set python to "{pythonapp}" set appwithpath to "{g2script}" tell application "Terminal" activate do script python & " " & appwithpath & " " & "; exit" end tell ''' subprocess.Popen(["osascript","-e",script]) elif sys.platform.startswith("win"): cmds = [pythonapp, g2script] if project: cmds += [project] subprocess.Popen(cmds,creationflags=subprocess.CREATE_NEW_CONSOLE) else: import shutil script = '' # try a bunch of common terminal emulators in Linux # there does not appear to be a good way to way to specify this # perhaps this should be a GSAS-II config option for term in ("lxterminal", "gnome-terminal", 'konsole', "xterm", "terminator", "terminology", "tilix"): try: found = shutil.which(term) if not found: continue except AttributeError: print(f'shutil.which() failed (why?); assuming {term} present') found = True if term == "gnome-terminal": #terminal = 'gnome-terminal -t "GSAS-II console" --' cmds = [term,'--title','"GSAS-II console"','--'] script = "echo; echo Press Enter to close window; read line" break elif term == "lxterminal": #terminal = 'lxterminal -t "GSAS-II console" -e' cmds = [term,'-t','"GSAS-II console"','-e'] script = "echo;echo Press Enter to close window; read line" break elif term == "xterm": #terminal = 'xterm -title "GSAS-II console" -hold -e' cmds = [term,'-title','"GSAS-II console"','-hold','-e'] script = "echo; echo This window can now be closed" break elif term == "terminator": cmds = [term,'-T','"GSAS-II console"','-x'] script = "echo;echo Press Enter to close window; read line" break elif term == "konsole": cmds = [term,'-p','tabtitle="GSAS-II console"','--hold','-e'] script = "echo; echo This window can now be closed" break elif term == "tilix": cmds = [term,'-t','"GSAS-II console"','-e'] script = "echo;echo Press Enter to close window; read line" break elif term == "terminology": cmds = [term,'-T="GSAS-II console"','--hold','-e'] script = "echo; echo This window can now be closed" break else: print("No known terminal was found to use, Can't start {}") return fil = '/tmp/GSAS2-launch.sh' cmds += ['/bin/sh',fil] fp = open(fil,'w') if project: fp.write(f"{pythonapp} {g2script} {project}\n") else: fp.write(f"{pythonapp} {g2script}\n") fp.write(f"rm {fil}\n") if script: fp.write(f"{script}\n") fp.close() subprocess.Popen(cmds,start_new_session=True)
if __name__ == '__main__': '''What follows is called to update (or downdate) GSAS-II in a separate process. ''' # check what type of update is being called for import git gitUpdate = False preupdateType = None updateType = None regressversion = None help = False project = None for arg in sys.argv[1:]: if '--git-fetch' in arg: # pulls latest updates from server but does not apply them if preupdateType or updateType: print(f'previous option conflicts with {arg}') help = True break updateType = 'fetch' elif '--github-tags' in arg: # gets latest tags from github if preupdateType or updateType or gitUpdate: print(f'previous option conflicts with {arg}') help = True break updateType = 'tags' elif '--git-reset' in arg: # restores locally changed GSAS-II files to distributed versions also updates gitUpdate = True if preupdateType: print(f'previous option conflicts with {arg}') help = True break preupdateType = 'reset' elif '--git-stash' in arg: # saves locally changed GSAS-II files in "stash" argsplit = arg.split('=') if len(argsplit) == 1: message = None elif len(argsplit) == 2: message=argsplit[1] if message.startswith('"') and message.endswith('"'): message = message.strip('"') if message.startswith("'") and message.endswith("'"): message = message.strip("'") message = message.replace('"',"'") # double quote not allowed else: print('invalid form for --git-stash') help = True break gitUpdate = True if preupdateType: print(f'previous option conflicts with {arg}') help = True break preupdateType = 'stash' elif '--git-update' in arg: # update to latest downloaded version gitUpdate = True if updateType: print(f'previous option conflicts with {arg}') help = True break updateType = 'update' elif '--git-regress' in arg: argsplit = arg.split('=') if len(argsplit) != 2: print('invalid form for --git-regress') help = True break gitversion = argsplit[1] # make sure the version or tag supplied is valid and convert to # a full sha hash g2repo = openGitRepo(path2GSAS2) try: regressversion = g2repo.commit(gitversion).hexsha except git.BadName: print(f'invalid version specified ({regressversion}) for GitHub regression') help = True break if updateType: print(f'previous option conflicts with {arg}') help = True break updateType = 'regress' gitUpdate = True elif '--help' in arg: help = True break elif os.path.exists(arg): # this is just checking project = arg pass else: print(f'unknown arg {arg}') help = True if help or len(sys.argv) == 1: print('''Options when running GSASIIpath.py standalone to update/regress repository from git repository: python GSASIIpath.py option <project> where option will be one or more of the following: --git-fetch downloads lastest changes from repo any other options will be ignored --git-stash="message" saves local changes --git-reset discards local changes --git-update --git-regress=version --github-tags saves most recent tag info from GitHub and where <project> is an optional path reference to a .gpx file Note: --git-reset and --git-stash cannot be used together. Likewise --git-update and --git-regress cannot be used together. However either --git-reset or --git-stash can be used with either --git-update or --git-regress. --git-fetch cannot be used with any other options. --github-tags cannot be used with any other options. ''') sys.exit() if updateType == 'tags': # get the most recent tag numbers from the GitHub site. Do this # via GitHub when git access is not available. Use here # allows this to be done in the background. import requests url='https://github.com/AdvancedPhotonSource/GSAS-II/tags' releases = requests.get(url=url) taglist = [tag.split('"')[0] for tag in releases.text.split('AdvancedPhotonSource/GSAS-II/releases/tag/')[1:]] lastver = sorted([t for t in taglist if 'v' in t])[-1] lastnum = sorted([t for t in taglist if 'v' not in t],key=int)[-1] #print('tags=',lastver,lastnum) # add tag info to config file import configparser cfgfile = os.path.expanduser(os.path.normpath('~/.GSASII/config.ini')) cfg = configparser.ConfigParser() cfg.read(cfgfile) if 'version info' not in cfg: cfg.add_section('version info') cfg['version info'].update( {'lastVersionTag':lastver,'lastVersionNumber':lastnum}) with open(cfgfile, 'w') as configfile: cfg.write(configfile) sys.exit() if updateType == 'fetch': # download the latest updates from GitHub to the local repository # in background while GSAS-II runs no updates are applied. # invoked by gitGetUpdate(mode='Background') logfile = os.path.join(os.path.expanduser('~'),'GSASII_bkgUpdate.log') mode = 'a' # don't let log file get too large (20K bytes) if os.path.exists(logfile) and os.path.getsize(logfile) > 20000: mode = 'w' # if file open fails, there is probably a concurent update process try: fp = open(logfile,mode) except: print('background git update was unable to open log file') sys.exit() try: import git except: fp.write('Git background import failed') fp.write(dt.datetime.strftime(dt.datetime.now(), " at %Y-%m-%dT%H:%M\n")) fp.close() sys.exit() try: g2repo = openGitRepo(path2GSAS2) prevremotecommits = None if not g2repo.head.is_detached: head = g2repo.head.ref tracking = head.tracking_branch() prevremotecommits = [i.hexsha for i in head.commit.iter_items(g2repo, f'{head.path}..{tracking.path}')] g2repo.remote().fetch() if not g2repo.head.is_detached: head = g2repo.head.ref tracking = head.tracking_branch() remotecommits = [i.hexsha for i in head.commit.iter_items(g2repo, f'{head.path}..{tracking.path}')] new = len(remotecommits)-len(prevremotecommits) msg = f'{new} new update(s) found and downloaded, so {len(remotecommits)} total are available to install' #fp.write(f'{msg}\n') if new > 0: print(msg) except Exception as msg: fp.write('Background git update message') fp.write(dt.datetime.strftime(dt.datetime.now(), " at %Y-%m-%dT%H:%M\n")) fp.write(f'Update failed with message {msg}\n') if g2repo.head.is_detached: fp.write('Background git update message') fp.write(dt.datetime.strftime(dt.datetime.now(), " at %Y-%m-%dT%H:%M\n")) fp.write('Status: reverted to an old install\n') # else: # try: # rc,lc,_ = gitCheckForUpdates(False,g2repo) # if len(rc) == 0: # fp.write('Status: no unapplied commits\n') # else: # fp.write(f'Status: unapplied commits now {len(rc)}\n') # except Exception as msg: # fp.write(f'\ngitCheckForUpdates failed with message {msg}\n') # fp.write('update done at') # fp.write(dt.datetime.strftime(dt.datetime.now(), # " at %Y-%m-%dT%H:%M\n\n")) fp.close() sys.exit() if gitUpdate: import time time.sleep(1) # delay to give the main process a chance to exit # so we don't change code for a running process # windows does not like that try: import git except: print('git import failed') sys.exit() try: g2repo = openGitRepo(path2GSAS2) except Exception as msg: print(f'Update failed with message {msg}\n') sys.exit() print('git repo opened') if preupdateType == 'reset': # --git-reset (preupdateType = 'reset') print('Restoring locally-updated GSAS-II files to original status') openGitRepo(path2GSAS2).git.reset('--hard','origin/main') try: if g2repo.active_branch.name != 'main': g2repo.git.switch('main') except TypeError: # fails on detached head pass elif preupdateType == 'stash': # --git-stash (preupdateType = 'stash') print('Stashing locally-updated GSAS-II files') if message: g2repo.git.stash(f'-m"{message}"') else: g2repo.git.stash() # Update to the latest GSAS-II version. This assumes that a fetch has # been done prior, or this will only update to the last time that # it was done. if updateType == 'update': # --git-update (updateType = 'update') if g2repo.is_dirty(): print('Cannot update a directory with locally-made changes') sys.exit() print('Updating to latest GSAS-II version') if g2repo.head.is_detached: g2repo.git.switch('main') g2repo.git.merge('--ff-only') print('git: updated to latest version') # Update or regress to a specific GSAS-II version. # this will always cause a "detached head" status elif updateType == 'regress': # --git-regress (updateType = 'regress') if g2repo.is_dirty(): print('Cannot regress a directory with locally-made changes') sys.exit() print(f'Regressing to git version {regressversion[:6]}') g2repo.git.checkout(regressversion) if gitUpdate: # path hack for restart, when needed #import importlib.util #try: # importlib.util.find_spec('GSASII.GSASIIGUI') #except ModuleNotFoundError: # print('Adding GSAS-II location to Python system path') # sys.path.insert(0,os.path.dirname(os.path.dirname(__file__))) # now restart GSAS-II with the new version # G2scrpt = os.path.join(path2GSAS2,'G2.py') if project: print(f"Restart GSAS-II with project file {project!r}") else: print("Restart GSAS-II without a project file ") #from . import GSASIIfiles #GSASIIfiles.openInNewTerm(project) openInNewTerm(project) print ('exiting update process') sys.exit()