""" Little python code that implements the machine learning kernel rigde regression (KKR) algorithm. Authors: Alejandro Perez Paz. Frid Sep 23, 2016. Ask Hjorth Larsen , 2016. """ from __future__ import print_function import os import ase import math import json as js import gzip as gz import numpy as np import datetime as dt import orcaparse as op from glob import glob from time import time from contextlib import contextmanager from ase.io import read from readxyz import iread_xyz @contextmanager def timer(name): """ count time on a given context :param name: context name,name of a funtion, method, or enviroment :type name: string :return: None :rtype: None """ tstart = time() yield tstop = time() print('Time {}: {}s'.format(name, tstop - tstart)) def gen_random_index(nsystems, ntestsystems, seed=0): """ Generate random indices to be use as test systems :param nsystems: total number of systems :type nsystems: int :param ntestsystems: total numbers os test system :type ntestsystems: int :param seed: randomness of the systems :type seed: int :return: a list of indices :rtype: int array """ gen = np.random.RandomState(0) assert ntestsystems < nsystems test_indices = gen.choice(range(nsystems), size=ntestsystems, replace=False) return test_indices def random_choice(systems, refs, ntestsystems, seed=0): """ separate learn_systems from test_systems on :param systems: complete systems group :type systems: ase.Atoms :param refs: array with the data use to learn :type refs: array, floats, same dimention as systems :param ntestsystems: numbers of test systems :type ntestsystems: int :param seed: randomnes of the selected system :type seed: int :return: learn_systems, learn_references, test_systems, test_references, test_indices :rtype: learn_systems and test_systems, ase.Atoms array learn_references and test_references, same as refs array test_indices int array """ nsystems = len(systems) test_indices = gen_random_index(nsystems, ntestsystems, seed) testset = set(test_indices) learn_systems = [] learn_references = [] test_systems = [] test_references = [] for i, (system, ref) in enumerate(zip(systems, refs)): if i in testset: test_systems.append(system) test_references.append(ref) else: learn_systems.append(system) learn_references.append(ref) return learn_systems, learn_references, \ test_systems, test_references, test_indices def coulomb_matrix(atoms): """ compute the coulumb matrix of a atom :param atoms: atom object :type atoms: ase.Atoms :return: numpy.matrix :rtype: numpy.matrix float """ natoms = len(atoms) C = np.zeros((natoms, natoms), dtype=float) Z = atoms.get_atomic_numbers() r = np.array(atoms.get_positions()) # calculate Rupp's coulomb matrix: do only the lower triangular part for i in range(natoms): C[i][i] = 0.5 * Z[i]**2.4 for j in range(i): C[i][j] = Z[i]*Z[j]/np.linalg.norm(r[i]-r[j]) C = C + C.T - np.diag(np.diag(C)) Cf = C.ravel() return Cf def sorted_coulomb_matrix(atoms): """ compute the sorted coulumb matrix using the norm2 of each row and then re arrange the atoms from high value to low value :param atoms: atom object :type atoms: ase.Atom :return: numpy.matrix :rtype: numpy.matrix float """ cm = coulomb_matrix(atoms) N = len(atoms) cm = cm.reshape((N, N)) norm2s = -cm.sum(axis=1)**2 args = np.argsort(norm2s) atoms = atoms[args] cm = coulomb_matrix(atoms) return cm def diagonalize_coulumb_matrix(atoms): N = len(atoms) cm = coulomb_matrix(atoms) v, dcm = np.linalg.eigh(cm.reshape((N, N))) return dcm.ravel() def read22k(nsystems=None, data_root_dir=None): """ read data from a file, probably this will be remove in the future :param nsystems: how many systems do you want to read :param data_root_dir: :return: systems array :rtype: ase.Atoms """ if data_root_dir is None: thisdir = os.path.dirname(__file__) filepath = os.path.join(thisdir, '../data/XYZ_B3LYP_631G2dfp.dat') else: filepath = data_root_dir + '/' + 'XYZ_B3LYP_631G2dfp.dat' with open(filepath) as fd: xyz = iread_xyz(fd) systems = [] if nsystems is None: systems = list(xyz) else: for i in range(nsystems): systems.append(next(xyz)) return systems def read22k_energy(nsystems=None): """ read the energy values from the dataset, probably it will remove in the future :param nsystems: number of systems to read :type nsystems: :return: numpy.array :rtype: """ headers = ('Index E1-CC2 E2-CC2 f1-CC2 f2-CC2' ' E1-PBE0 E2-PBE0 f1-PBE0 f2-PBE0' ' E1-PBE0 E2-PBE0 f1-PBE0 f2-PBE0' ' E1-CAM E2-CAM f1-CAM f2-CAM').split() all_energies = {} with open('22k-testset/gdb8_22k_elec_spec.txt') as fd, timer('read energy'): for line in fd: if not line.lstrip().startswith('#'): break numbers = [] if nsystems is None: numbers = [line.split() for line in fd] else: for i in range(nsystems): numbers.append(line.split()) line = next(fd) numbers = np.array(numbers).astype(float).T.copy() assert numbers.flags.contiguous for header, array in zip(headers, numbers): all_energies[header] = array.astype(float) return all_energies def read22k_all(nsystems=None): """ this wil read systems and energies values from the dataset, it will be remove in the future :param nsystems: number of systems to read :type nsystems: int :return: a system array and a energies array :rtype: ase.Atom, numpy.array(float) """ return read22k(nsystems), read22k_energy(nsystems) def read_orca22kall(nsystems=None, index=None, xyz=True, theory="PBE0", data_root_dir="" ): """ read the orca calculations, use the orcaparse.py to do the heavy part :param nsystems: :param index: :param xyz: :param theory: :param data_root_dir: :return: """ all_gs = [] all_absspec = [] systems = [] if index is None: index = gen_random_index(21785, nsystems) for i in index: filepath = data_root_dir + "/{}/{:05}/{:05}".format(theory, i, i) fd = open(filepath + ".out") gs, absspec = op.parse(fd) if xyz: system = read22k(1, filepath + ".xyz")[0] systems.append(system) all_gs.append(gs) all_absspec.append(absspec) return systems, all_gs, all_absspec, index def read_orca22json(nsystems=None, index=None, xyz=True, theory="pbe0", energies=True, hlgap=True, data_root_dir=""): """ read the orca calculations, use the orcaparse.py to do the heavy part :param nsystems: :param index: :param xyz: :param theory: :param energies: :param hlgap: :param data_root_dir: :return: """ if index is None: index = gen_random_index(21785, nsystems, seed=dt.datetime.now().microsecond) data = {} if energies: with gz.open(data_root_dir + "/split/orca-{}.absspec.e_1_eV.json.gz".format(theory)) \ as fd: data['exe'] = np.array(js.load(fd))[index] else: with gz.open(data_root_dir + "/split/orca-{}.absspec.f_1.json.gz".format(theory)) \ as fd: data['fosc'] = np.array(js.load(fd))[index] if hlgap: with gz.open(data_root_dir + "/split/orca-{}.gs.homolumogap.json.gz".format(theory)) \ as fd: data['hlgap'] = np.array(js.load(fd))[index] # data = js.load(open(data_root_dir + "/{}.json".format(theory))) # data = np.array(data, object) # data = data[index] if xyz: systems = np.array(read22k(None, data_root_dir)) systems = systems[index].tolist() else: systems = [] return systems, data, index def distmatrix2kernel(*distmatrix): """ compute exp of the kernel rigde regression :param distmatrix: distance matrix :type distmatrix: np.array[float] :return: numpy.array(float) :rtype: numpy.array(float), one dimention """ exp = 1 for dist in distmatrix: if isinstance(dist, tuple): for d in dist: # exp *= np.exp(-0.5 * d) exp *= np.exp(-d) return exp else: exp = np.exp(-dist) # exp = np.exp(-0.5 * dist) return exp def get_sigma(d): """width for the Laplace/Gaussian? kernel. Recommended value: max{|Di-Dj|}/log(2).""" N = len(d) L1norm = [] # do only the lower triangular part for i in range(N): for j in range(i): L1norm.append(np.linalg.norm((d[i] - d[j]), ord=1)) sigma = np.max(L1norm)/np.log(2.0) return sigma # ---------------------------Laplace Kernel------------------------------------- def laplace_kernel_all(descriptors, s_sigma, e_sigma): """ Define Laplace kernel matrix K (square NxN matrix). Uses molecular descriptors of the training set :param descriptors: obje with all descriptors :type descriptors: GDesc :param s_sigma: width of the kernel :type s_sigma: int, float :param e_sigma: width of the kernel, not used if L1 norm is use to calculate distances :type e_sigma: int, float :return: kernel matrix :rtype: numpy.matrix """ nsys = descriptors.nsystems K = np.zeros((nsys, nsys)) for i in range(nsys): s, e = descriptors.dists_l1(i, i, nsys) if s_sigma is not None and e_sigma is not None: K[i:, i] = distmatrix2kernel((s / s_sigma + e / e_sigma)) else: if s_sigma is not None: K[i:, i] = distmatrix2kernel(s / s_sigma) else: K[i:, i] = distmatrix2kernel(e / e_sigma) K[i, i] = 1 # symmetrize K = K + K.T - np.diag(np.diag(K)) print("kernel") print(K) return K def dist_l1_d1_d2(desc1, desc2, s_sigma, e_sigma): """ compute the distance between two descriptors, L1 norm :param desc1: base descriptor :type desc1: GDesc :param desc2: new descriptor :type desc2: GDesc :param s_sigma: :param e_sigma: :return: distance matrix or array, depending of the dimentions od the descriptors :rtype: numpy.array, numpy.matrix """ s_dist, e_dist = desc1.dists_v_l1(desc2) if s_sigma is not None: s_dist /= s_sigma if e_sigma is not None: e_dist /= e_sigma print("distances") print(s_dist) return s_dist, e_dist def predict_v_l1(learn_desc, test_desc, coeffs, s_sigma, e_sigma): """ prediction funtion, use L1 norm to calculate distance between descriptors :param learn_desc: base descriptor :type learn_desc: GDesc :param test_desc: new descriptor :type test_desc: GDesc :param coeffs: base coeficient extract from the learning process :type coeffs: numpy.array[float] :param s_sigma: width of the lernel :type s_sigma: int, float :param e_sigma: not use with this norm, needed for compativility reason :type e_sigma: int, float :return: predicte values :rtype: float, int """ s_dist, e_dist = \ dist_l1_d1_d2(learn_desc, test_desc, s_sigma, e_sigma) print("distances-exp()") dis_m = distmatrix2kernel((s_dist + e_dist)) print(dis_m) return [np.dot(dis_m, coeffs)] # ---------------------------Gaussian Kernel------------------------------------ def gausian_kernel_all(descriptors, s_sigma, e_sigma): """ Define Gaussian kernel matrix K (square NxN matrix). Uses molecular descriptors of the training set :param descriptors: obje with all descriptors :type descriptors: GDesc :param s_sigma: width of the kernel :type s_sigma: int, float :param e_sigma: width of the kernel, not used if L1 norm is use to calculate distances :type e_sigma: int, float :return: kernel matrix :rtype: numpy.matrix """ nsys = descriptors.nsystems K = np.zeros((nsys, nsys)) for i in range(nsys): s, e = descriptors.dists_l2(i, i, nsys) if s_sigma is not None and e_sigma is not None: K[i:, i] = distmatrix2kernel((s/s_sigma, e/e_sigma)) else: if s_sigma is not None: K[i:, i] = distmatrix2kernel(s/s_sigma) else: K[i:, i] = distmatrix2kernel(e/e_sigma) K[i, i] = 1 # symmetrize K = K + K.T - np.diag(np.diag(K)) return K def dist_l2_d1_d2(desc1, desc2, s_sigma=1, e_sigma=1): """ compute the distance between two descriptors, L2 norm :param desc1: base descriptor :type desc1: GDesc :param desc2: new descriptor :type desc2: GDesc :param s_sigma: :param e_sigma: :return: distance matrix or array, depending of the dimentions od the descriptors :rtype: numpy.array, numpy.matrix """ s_dist, e_dist = desc1.dists_v_l2(desc2) if s_sigma is not None and e_sigma is not None: return s_dist / s_sigma, e_dist / e_sigma else: if s_sigma is not None: return s_dist / s_sigma, e_dist else: return s_dist, e_dist / e_sigma def predict_v_l2(learn_desc, test_desc, coeffs, s_sigma, e_sigma): """ prediction funtion, use L2 norm to calculate distance between descriptors :param learn_desc: base descriptor :type learn_desc: GDesc :param test_desc: new descriptor :type test_desc: GDesc :param coeffs: base coeficient extract from the learning process :type coeffs: numpy.array[float] :param s_sigma: width of the lernel :type s_sigma: int, float :param e_sigma: not use with this norm, needed for compativility reason :type e_sigma: int, float :return: predicte values :rtype: float, int """ # dist = dist_l2_d1_d2(learn_desc, test_desc, s_sigma, e_sigma) # values = [] # for s in dist: # dis_m = distmatrix2kernel(s) # dis_m *= coeffs # val = np.sum(dis_m, axis=0) # values.append(val) s_dist, e_dist = dist_l2_d1_d2(learn_desc, test_desc, s_sigma, e_sigma) print("distances-exp()") dis_m = distmatrix2kernel((s_dist, e_dist)) print(dis_m) return [np.dot(dis_m, coeffs)] class Machine: """ machine class for the kernel rigde regression """ def __init__(self, desc, struct_sigma=None, elect_sigma=None, kernel=laplace_kernel_all): """ contructor function, with some default values :param desc: base descriptors :type desc: GDesc :param struct_sigma: structural width of the kernel, the value 900 prove to be the first value not to wide and not to narrow :type struct_sigma: int, float :param elect_sigma: electrical width of the kernel :type elect_sigma: int, float :param kernel: function to be use to calculate the kernel :type kernel: function pointer, lapalace is default """ self.descriptors = desc self.s_sigma = struct_sigma self.e_sigma = elect_sigma if desc is None: return self.K = kernel(desc, self.s_sigma, self.e_sigma) def save_state(self, filepath="./ml_kernel"): """ serialization routine, to save the current state of the kernel :param filepath: where to save the kernel :type filepath: string :return: None :rtype: None """ with timer('save_kernel'): fml_kernel_c = open(filepath + "_coef", "w") fml_kernel_d = open(filepath + "_desc", "w") np.save(file=fml_kernel_c, arr=self.coeffs) np.save(file=fml_kernel_d, arr=self.descriptors.buf) def load_state(self, filepath="./ml_kernel"): """ load some save kernel state :param filepath: where the kernel is :type filepath: string :return: :rtype: """ with timer('load_kernel'): fml_kernel_c = open(filepath + "_coef", "r") fml_kernel_d = open(filepath + "_desc", "r") self.descriptors = SCMDescriptors(0, 0, 0) self.coeffs = np.load(file=fml_kernel_c) self.descriptors.buf = np.load(file=fml_kernel_d) self.descriptors.nsystems = len(self.descriptors.buf) self.descriptors.maxlen = math.sqrt(len(self.descriptors.buf[0])) def coeff_cal(self, references, gamma=0.0): """ compute the coefficient, from the kernel matrix using some reference value as :param references: numpy.array of values to train the algorithm, :type references: numpy.array[float] :param gamma: not use :type gamma: float :return: None. change the internal state of the kernel :rtype: None """ # regularization parameter. Determine its value by cross-validation! self.coeffs = np.linalg.solve(self.K + gamma * np.eye(len(self.descriptors)), references) print("coeff") print(self.coeffs) def predict(self, test_des, dist=predict_v_l1): """ evalute the distance between the kernel and a new descriptor to produce a predicter result base of that distance and the coefficient :param test_des: new descriptor :type test_des: GDesc :param dist: function use to evaluate the distance :type dist: funtion pointer, default predict_v_l1 :return: predicted value(s) :rtype: numpy.array[float] dimention can chage depending of test_des """ # maching descriptors size to # t = self.descriptors.buf.copy() # if test_des.maxlen > self.descriptors.maxlen: # gap = (test_des.maxlen ** 2 - int(self.descriptors.maxlen) ** 2) # self.descriptors.buf = np.require(self.descriptors.buf, # requirements=['OWNDATA']) # self.descriptors.buf = np.c_[self.descriptors.buf, # np.zeros((len(self.descriptors.buf), # gap))] values = dist(self.descriptors, test_des, self.coeffs, self.s_sigma, self.e_sigma) values = np.array(values) return values class SCMDescriptors: """ descriptor class, only take care of mantain descriptor list and calculate distance, of descriptors """ def __init__(self, systems, maxlen, extra_descrip=None, struct_desc=sorted_coulomb_matrix): """ contructor for the descriptor class :param systems: array with the atoms structure :type systems: array[ase.Atoms] :param maxlen: max len of atems in all the space :type maxlen: int :param extra_descrip: example of adding extra descriptor, insert in the first position on the self.buf array extra_descrip = {'a': [34, 45], 'b': [86, 86]} :type extra_descrip: dictionary :param struct_desc: structural descriptor tu use :type struct_desc: funtion pointer, default sorted_coulomb_matrix """ # example of adding extra descriptor, insert in the first position # on the self.buf array # extra_descrip = {'a': [34, 45], # 'b': [86, 86]} if not systems: self.nsystems = 0 self.maxlen = 0 self.buf = 0 return self.nsystems = len(systems) self.maxlen = maxlen if not extra_descrip: self.buf = np.zeros((int(self.nsystems), int(maxlen**2))) with timer('selected-descriptor'): for i, atoms in enumerate(systems): scm = struct_desc(atoms) self.buf[i, :len(scm)] = scm else: self.buf = np.zeros((int(self.nsystems), int(maxlen ** 2))) with timer('selected-descriptor'): for i, atoms in enumerate(systems): scm = struct_desc(atoms) self.buf[i, :len(scm)] = scm for desc in extra_descrip: values = extra_descrip[desc] assert len(values) == len(systems) self.buf = np.insert(self.buf, 0, values, axis=1) def __len__(self): return self.nsystems def dists_L1(self, desc, jmin, jmax): """ nternal L2 norm calculation between index "desc" to all descriptors in the jmin < range < jmax :param desc: index of de reference descriptor :type desc: int :param jmin: from where to start :type jmin: int :param jmax: where to finish :type jmax: :return: matrix of distances :rtype: numpy.array[float,float] """ sub_buf = self.buf[jmin:jmax] desc_cm = self.buf[desc] dists = np.abs(sub_buf - desc_cm).sum(axis=1) return dists def dists_v_L1(self, scmDesc): """ L1 norm vertor distance, between local descriptor and an external one :param scmDesc: other descriptor :type scmDesc: SCMDescriptors :return: matrix of distance :rtype: numpy.matrix[float] """ dists = np.zeros((scmDesc.nsystems, self.nsystems)) for i in range(scmDesc.nsystems): sub_buf = self.buf[0:self.nsystems] desc_cm = scmDesc.buf[i] dists[i, :] = np.abs(sub_buf - desc_cm).sum(axis=1) return dists def dists_L2(self, desc, jmin, jmax): """ internal L2 norm calculation between index "desc" to all descriptors in the jmin < range < jmax :param desc: index of de reference descriptor :type desc: int :param jmin: from where to start :type jmin: int :param jmax: where to finish :type jmax: :return: matrix of distances :rtype: numpy.array[float,float] """ sub_buf = self.buf[jmin:jmax] desc_cm = self.buf[desc] dists = ((sub_buf - desc_cm)**2).sum(axis=1) return dists def dists_v_L2(self, scmDesc): """ L2 norm vertor distance, between local descriptor and an external one :param scmDesc: other descriptor :type scmDesc: SCMDescriptors :return: matrix of distance :rtype: numpy.matrix[float] """ dists = np.zeros((scmDesc.nsystems, self.nsystems)) for i in range(scmDesc.nsystems): sub_buf = self.buf[0:self.nsystems] desc_cm = scmDesc.buf[i] dists[i, :] = ((sub_buf - desc_cm)**2).sum(axis=1) return dists class GDesc: """ descriptor class, only take care of mantain descriptor list and calculate distance, of descriptors """ def __init__(self, struc_desc=None, struc_desc_method=None, elect_desc=None, electr_desc_method=None, nsystems=0, maxlen=0): if not struc_desc and not elect_desc: self.nsystems = 0 self.maxlen = 0 self.buf = 0 return self.nsystems = nsystems # if we want to export this, take in account that a bug here exist, # it works if we know the size of every system, but if it's # an unknown system wish a bigger size then everything need to be # recalculate, in order to mach the new size self.maxlen = maxlen # structural descriptor part if struc_desc: if struc_desc_method is None: struc_desc_method = {'key': 'sys', 'method': sorted_coulomb_matrix} systems = struc_desc[struc_desc_method['key']] # TODO this is nonsenses descriptor does # not need to now about # atoms, but it works if isinstance(systems, ase.Atoms): # we only have one, so we need to mach with someone else # self.maxlen = np.array([len(systems), maxlen]).max() self.struc_buf = np.zeros((self.maxlen**2)) lsys = len(systems) scm = struc_desc_method['method'](systems) scm = scm.reshape(lsys, lsys) target = self.struc_buf.reshape(self.maxlen, self.maxlen) target[:lsys, :lsys] = scm else: # self.maxlen = max(len(atoms) for atoms in systems) self.struc_buf = np.zeros((int(self.nsystems), int(self.maxlen**2))) for i, atoms in enumerate(systems): scm = struc_desc_method['method'](atoms) lsys = len(atoms) scm = scm.reshape(lsys, lsys) target = self.struc_buf[i].reshape(self.maxlen, self.maxlen) target[:lsys, :lsys] = scm # electrical descriptor part if elect_desc: for k in elect_desc: self.elec_buf = np.array(np.array(elect_desc[k])) def __len__(self): return self.nsystems @staticmethod def match_struct_desc_size(buf1, buf2): """ this funtion is for matching the column-part shape, of two numpy buffers . It help to calculate distance, latter. :param buf1: :type buf1: np :param buf2: :type buf2: np :return: new buffers copy of the originals with matching columns :rtype: [], [] """ nbuf1 = buf1.copy() nbuf2 = buf2.copy() sbuff1 = nbuf1.shape sbuff2 = nbuf2.shape if sbuff1[0] == sbuff2[0]: return nbuf1, nbuf2 gap = np.abs(sbuff1[0] - sbuff2[0]) if sbuff1[0] > sbuff2[0]: nbuf2 = np.require(nbuf2, requirements=['OWNDATA']) nbuf2 = np.concatenate((nbuf1, np.zeros((gap)))) else: nbuf1 = np.require(nbuf1, requirements=['OWNDATA']) nbuf1 = np.concatenate((nbuf1, np.zeros((gap)))) return nbuf1, nbuf2 def dists_l1(self, desc, jmin, jmax): """ nternal L2 norm calculation between index "desc" to all descriptors in the jmin < range < jmax :param desc: index of de reference descriptor :type desc: int :param jmin: from where to start :type jmin: int :param jmax: where to finish :type jmax: :return: matrix of distances :rtype: numpy.array[float,float] """ if hasattr(self, 'struc_buf'): sub_buf = self.struc_buf[jmin:jmax] desc_cm = self.struc_buf[desc] s_dists = np.sum(np.abs(sub_buf - desc_cm), axis=1) else: s_dists = np.zeros((self.nsystems)) if hasattr(self, 'elec_buf'): sub_buf = self.elec_buf[jmin:jmax] desc_cm = self.elec_buf[desc] e_dists = np.abs(sub_buf - desc_cm) else: e_dists = np.zeros((self.nsystems)) return s_dists, e_dists def dists_v_l1(self, ext_desc): """ L1 norm vertor distance, between local descriptor and an external one :param ext_desc: other descriptor :type ext_desc: GDesc :return: matrix of distance :rtype: numpy.matrix[float] """ if hasattr(self, 'struc_buf'): s_dists = [] for i in range(self.nsystems): sub_buf, ext_buf = \ self.match_struct_desc_size( self.struc_buf[i], ext_desc.struc_buf) s_val = np.abs(ext_buf - sub_buf) s_dists.append(s_val.sum()) s_dists = np.array(s_dists) else: s_dists = np.zeros((self.nsystems)) if hasattr(self, 'elec_buf'): e_dists = [] for i in range(ext_desc.nsystems): e_val = np.abs(self.elec_buf[i] - ext_desc.elec_buf) e_dists.append(e_val) e_dists = np.array(e_dists) else: e_dists = np.zeros((self.nsystems)) return s_dists, e_dists def dists_l2(self, desc, jmin, jmax): """ internal L2 norm calculation between index "desc" to all descriptors in the jmin < range < jmax :param desc: index of de reference descriptor :type desc: int :param jmin: from where to start :type jmin: int :param jmax: where to finish :type jmax: :return: matrix of distances :rtype: numpy.array[float,float] """ if hasattr(self, 'struc_buf'): sub_buf = self.struc_buf[jmin:jmax] desc_cm = self.struc_buf[desc] s_dists = np.sum((np.abs(sub_buf - desc_cm))**2, axis=1) else: s_dists = np.zeros((1, self.nsystems)) if hasattr(self, 'elec_buf'): sub_buf = self.elec_buf[jmin:jmax] desc_cm = self.elec_buf[desc] e_dists = np.abs((sub_buf - desc_cm)**2) else: e_dists = np.zeros((self.nsystems)) return s_dists, e_dists def dists_v_l2(self, ext_desc): """ L2 norm vertor distance, between local descriptor and an external one :param ext_desc: other descriptor :type ext_desc: GDesc :return: matrix of distance :rtype: numpy.matrix[float] """ if hasattr(self, 'struc_buf'): s_dists = [] for i in range(self.nsystems): sub_buf, ext_buf = \ self.match_struct_desc_size( self.struc_buf[i], ext_desc.struc_buf) s_val = np.abs(sub_buf - ext_buf)**2 s_dists.append(np.sum(s_val, axis=0)) s_dists = np.array(s_dists) else: s_dists = np.zeros((self.nsystems)) if hasattr(self, 'elec_buf'): e_dists = [] for i in range(ext_desc.nsystems): e_val = np.abs(self.elec_buf[i] - ext_desc.elec_buf)**2 e_dists.append(e_val) e_dists = np.array(e_dists) else: e_dists = np.zeros((self.nsystems)) return s_dists, e_dists # s_dists = np.zeros((ext_desc.nsystems, self.nsystems)) # e_dists = np.zeros((ext_desc.nsystems, self.nsystems)) # if hasattr(self, "struc_buf"): # sub_s_buf, desc_s_cm = \ # self.match_struct_desc_size( # self.struc_buf[0:self.nsystems], # ext_desc.struc_buf) # else: # sub_s_buf = np.zeros((ext_desc.nsystems)) # desc_s_cm = np.zeros((1, self.nsystems)) # if hasattr(self, "elec_buf"): # sub_e_buf, desc_e_cm = \ # self.match_struct_desc_size( # self.elec_buf[0:self.nsystems], # ext_desc.elec_buf) # else: # sub_e_buf = np.zeros((1, ext_desc.nsystems)) # desc_e_cm = np.zeros((1, self.nsystems)) # for i in range(ext_desc.nsystems): # s_val = np.abs((sub_s_buf - desc_s_cm) ** 2) # e_val = np.abs((sub_e_buf - desc_e_cm) ** 2) # if len(s_val.shape) == 1: # s_dists[i, :] = s_val # else: # s_dists[i, :] = np.sum(s_val, axis=1) # if len(e_val.shape) == 1: # e_dists[i, :] = e_val # else: # e_dists[i, :] = np.sum(e_val, axis=1) # return s_dists, e_dists def select_best_fit_desc(learn_desc, test_desc, min_select_size=None, threshold_adjust=None, dist_d1d2=None, dist2kernel=None, s_sigma=None, e_sigma=None): """ elect those system that are "closer" to the test system, those systems are use to create a new training set. for multiples descriptors TODO save the calculate exponential for the selected train_set, TODO avoid recalculation :param learn_desc: class object with the learn descriptors :type learn_desc: GDesc :param test_desc: class object with the test descriptors :type test_desc: GDesc :param dist_d1d2: function pointer to the method for calculate distance :type dist_d1d2: function pointer :param min_select_size: value to adjust the amount of selected indices for small thresholds, how % is expected to be use from the learn_desc :type min_select_size: int :param threshold_adjust: value to adjust the threshold, the average distance will be divided by this value in order to getter smaller :type threshold_adjust: float :param dist2kernel: function pointer to the method that convert distance into a kernel :type dist2kernel:function pointer :param s_sigma: sigma value for the structural descriptor :type s_sigma: float :param e_sigma:sigma value for the electrical descriptor :type e_sigma: float :return: array of index wish descriptors in learn_desc, are the best, unbral :rtype:[int], float """ threshold = 0 index = [] # dist = dist2kernel(dist_d1d2(learn_desc, test_desc, # s_sigma, e_sigma)) dist = distmatrix2kernel( dist_l1_d1_d2(learn_desc, test_desc, s_sigma, e_sigma)) # dist could be an array for i, dis in enumerate(dist): # need a better way to calculate threshold, for now I'll use average # I think for a big amount of molecules this value could by lower, threshold = np.average(dis) while True: for j, d in enumerate(dis): # this will get rid of descriptors how has lower impact # or are "far away" from the molecule if d >= threshold: index.append(j) if threshold_adjust is None: break if ((len(index)*100) / len(dis)) >= min_select_size: break else: index = [] threshold -= threshold_adjust return index, threshold # -------------------------unused code, for test only--------------------------- def get_e_descriptor(ksname): f = open(ksname, 'r') kse = [] for line in f: words = line.split() occ = float(words[1]) kse.append(float(words[2])) if occ == 0.0: ilumo = int(words[0]) ihomo = ilumo-1 break f.close() gap = kse[ilumo]-kse[ihomo] return gap def get_spec_property(efname): with open(efname) as fd: for i in range(4): next(fd) line = next(fd) tokens = line.split() energy = float(tokens[1]) fosc = float(tokens[3]) return energy,fosc def define_training_set(): # define the training set T as an array of molecules xyzpath = 'XYZs' epath = 'ELECTRONIC/ORCA/PBE0' TS = [] energies = [] foscs = [] gaps = [] filenames = glob('%s/*.xyz' % xyzpath) filenames.sort() for xyzname in filenames: molecule = read(xyzname) TS.append(molecule) bname = os.path.basename(xyzname) num, name, ext = bname.split('.') efname = '%s/%s.excitations' % (epath, num) energy, fosc = get_spec_property(efname) energies.append(energy) foscs.append(fosc) ksname = '%s/%s.eigenvalues' % (epath, num) gap = get_e_descriptor(ksname) gaps.append(gap) return TS, energies, foscs, gaps