Bonjour

Voici le code de Base

il charge la base sqllight depuis un fichier csv
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
 
 
# coding: utf-8  
'''
Created on 23 mars 2018
 
@author: 
'''
import csv
import sqlite3
import sys
import datetime
import csv
import os
import encodings
 
from codecs import open
 
def convertie_date(src):
    if src =='NULL':
        valid_date = None
    else:
        alpha_aaaa = int(src[0:4]) 
        alpha_mm =   int(src[4:6]) 
        alpha_jj =  int(src[6:8]) 
        try :
            valid_date = datetime.date(alpha_aaaa,alpha_mm,alpha_jj) 
            return valid_date
        except ValueError:
            print('%s Date invalide !' % src)
            return None
pass  
 
def EstceNull(src):
    if src =='NULL':
        return  None
    else:
        return src
 
def Estce_nombreNull(src):
    if src =='NULL':
        return  None
    else:
        return int(src)    
 
class ExcelFr(csv.excel):
    # Séparateur de champ
    delimiter = ";"
    # Séparateur de ''chaîne''
    quotechar = None
    # Gestion du séparateur dans les ''chaînes''
    escapechar = None
    doublequote = True
    # Fin de ligne
    lineterminator = "\r\n"
    # Ajout automatique du séparateur de chaîne (pour ''writer'')
    quoting = csv.QUOTE_NONE
    # Ne pas ignorer les espaces entre le délimiteur de chaîne
    # et le texte
    skipinitialspace = False 
    encodage ='utf8'
 
class vls_actes(object):
 
    def __init__(self,liste):
 
        self.ACT_CODE = EstceNull(liste[0]);
        self.ACT_LIBELLE = EstceNull(liste[1]);
 
    def __str__(self):
        resultat = "Act_Code : " +  str(self.ACT_CODE) + "\n"
        resultat += "ACT_LIBELLE: " + str(self.ACT_LIBELLE )  
        return  resultat
 
    def table_sqllite(self,Base):
        creer_table  = """CREATE TABLE vsl_actes(
            ACT_CODE TEXT(255) NOT NULL,
            ACT_LIBELLE TEXT(255),
            CONSTRAINT actes_pk PRIMARY KEY(ACT_CODE)
        );
        """ 
 
        detruire_table  = """DROP TABLE IF EXISTS vsl_actes;"""
 
        curseur =  Base.cursor() # appel au curseur
 
        # supprimer table existante
        if(detruire_table !=""):
            try:
                curseur.execute(detruire_table)  
            except sqlite3.Error as e:
                print("Problème"+ " :" +   e.args[0]  + " sur commande /n"+ detruire_table )
                Base.rollback()
            else:
                Base.commit()    
 
 
        if(creer_table !=""):
            print("creer base")
            try:
                curseur.execute(creer_table)
            except sqlite3.Error as e:
                print("Problème"+ " :" +   e.args[0]  + " sur commande /n"+ creer_table )  
                Base.rollback()
            else:
                Base.commit()    
 
        index_table =""" CREATE INDEX vsl_actes_IDX ON vsl_actes(ACT_LIBELLE);"""       
        if(index_table !=""):
            try:
                curseur.execute(index_table)
 
            except sqlite3.Error as e:
                print("Problème de creation d'index : "  +   e.args[0]  + "/n" )  
                print(index_table)
                Base.rollback()
            else: 
                Base.commit()      
 
 # self.GEL_CODE = self.GEL_CODE[7:12]
        pass
 
    def insertion_sqllite(self,Base):
 
        curseur = Base.cursor() # appel au curseur
        data=[self.ACT_CODE,self.ACT_LIBELLE]
 
        rq = """INSERT OR IGNORE INTO vsl_actes
                    (ACT_CODE, ACT_LIBELLE)
                       VALUES(?, ?);"""
 
        try:
            curseur.execute(rq,data)
 
        except sqlite3.Error as e:
            print("Problème"+ " :" +   e.args[0]  + " sur commande d'insertion /n" )  
            print(rq)
            Base.rollback()
        else: 
            Base.commit()  
 
 
 
 
def actes(nomfichier,Base):
 
    file =open(nomfichier ) 
    i=0 # indeteur de ligne du fichier pour ne pas traiter l'entete 
    #
    # Ouverture du fichier source.
    #
    try:
        #
        # Cr�ation du ''lecteur'' CSV.
        #
        fichier_lu = csv.reader(file, 'excel-fr')
 
        max_long_ACT_CODE = 0
        max_long_ACT_LIBELLE = 0
        for row in fichier_lu:
            i+=1
 
            if(i>1):
                nouvel_acte = vls_actes(row)  
 
                if(i==2): # Suppprsseion de la table pr�c�dente  si besion 
                    # cr�ation de la nouvelle table 
                    nouvel_acte.table_sqllite(Base)
                # insertion de donn�es
                nouvel_acte.insertion_sqllite(Base)
 
                if(max_long_ACT_CODE < len(nouvel_acte.ACT_CODE)):
                     max_long_ACT_CODE = len(nouvel_acte.ACT_CODE)
 
                if(max_long_ACT_LIBELLE < len(nouvel_acte.ACT_LIBELLE)):
                     max_long_ACT_LIBELLE = len(nouvel_acte.ACT_LIBELLE)
 
        #print(max_long_ACT_CODE, end='\n' )
        #print(max_long_ACT_LIBELLE , end='\n')
    except OSError:
            print("Problème d'access au fichier :" + nomfichier)
    finally:
        #
        # Fermeture du fichier source
        #
        file.close()     
 
 
 
if __name__ == '__main__':
    # Enregistre ce dialecte aupr�s du module csv
    encodage ="utf-8"
    LaBase ="acte.db" 
    Base = sqlite3.connect(LaBase)
    csv.register_dialect('excel-fr', ExcelFr())
 
    actes("actes2.csv" , Base)
 
    Base.close()
Voici la solution base sur un concept de reader utilsant la collections /namedtuple sur la base d'une solution proposé par wiztricks


https://www.developpez.net/forums/d1836440/autres-langages/python-zope/general-python/convertion-csv-vers-python/


Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# coding: utf-8  
'''
Created on 23 mars 2018
 
@author: 
'''
import csv
import sqlite3
import sys
import csv
import os
import encodings
from collections import namedtuple
from src.convertie import *
 
 
 
 
class record(namedtuple('record', 'ACT_CODE, ACT_LIBELLE')):
    def __new__(cls, ACT_CODE, ACT_LIBELLE):
        if ACT_CODE == 'NULL':
            ACT_CODE = None
 
        if   ACT_LIBELLE == 'NULL': 
             ACT_LIBELLE = None
 
        return super().__new__(cls, ACT_CODE, ACT_LIBELLE)
    def __str__(self):
        resultat = str(self.ACT_CODE)
        resultat += str(self.ACT_LIBELLE)  
        return  resultat   
 
 
 
 
class Reader:
    def __init__(self, file):
        self._reader = csv.reader(file)  
 
    def get_records(self, count=10):
        records = []
        try:
            for _ in range(count): #injection par groupe de 3 lignes et vérifier les doublons
                row = next(self._reader)
                # on construit les tuples 
                enregistrement = record (row[0] , row[1])
                print(enregistrement)
                # on vérifie les doubles  
                if(enregistrement  not in records):
                    records.append(enregistrement)
        except StopIteration:
            pass
        except:
            raise
        return records
 
 
class Writer:
    def __init__(self, path=':memory:'):
    #def __init__(self, path='mem.db'):    
        cnx = self._cnx = sqlite3.connect(path)
        commandesql = '''drop table if  exists actes;  ''' 
 
        try:
            cnx.execute(commandesql)   
        except sqlite3.Error as e: 
            print("Problème"+ " :" +   e.args[0]  + " sur commande /n"+ commandesql )
            self._cnx.rollback()
        else:
            self._cnx.commit()      
 
 
        commandesql = """CREATE TABLE vsl_actes(
            ACT_CODE TEXT(255),
            ACT_LIBELLE TEXT(255),
            CONSTRAINT actes_pk PRIMARY KEY(ACT_CODE));"""
 
        try:
            cnx.execute(commandesql)   
        except sqlite3.Error as e: 
            print("Problème"+ " :" +   e.args[0]  + " sur commande /n"+ commandesql )
            self._cnx.rollback()
        else:
            self._cnx.commit()            
 
        commandesql = """CREATE INDEX vsl_actes_IDX ON vsl_actes(ACT_LIBELLE);"""      
 
        try:
            cnx.execute(commandesql)  
        except sqlite3.Error as e: 
            print("Problème"+ " :" +   e.args[0]  + " sur commande /n"+ commandesql )
            self._cnx.rollback()
        else:
            self._cnx.commit()        
 
    def writesmany(self, records):
        rq = """INSERT OR IGNORE INTO vsl_actes
        (ACT_CODE, ACT_LIBELLE)
        VALUES(?, ?);"""
        with self._cnx:
            try:
                self._cnx.executemany(rq, records)
            except sqlite3.Error as e:
                print("Problème"+ " :" +   e.args[0]  + " sur commande d'insertion /n" )  
 
                print("\t"  +str(records))
                self._cnx.rollback()
                # dans le cas d'un problème d'injection multipls on repasse à la mono injection au cas par cas
                # et on laisse les mécasnisme de sécurite de la base agir
                self.writemono( records)
            else: 
                self._cnx.commit()
 
    def writemono(self, records):
        rq = """INSERT OR IGNORE INTO vsl_actes
        (ACT_CODE, ACT_LIBELLE)
        VALUES(?, ?);"""
 
        with self._cnx:
            for enregistrement in records:
                try:
                    self._cnx.execute(rq, enregistrement)
                except sqlite3.Error as e:
                    print("Problème"+ " :" +   e.args[0]  + " sur commande d'insertion /n" )  
 
                    print("\t"  +str(enregistrement))
                    self._cnx.rollback()
                else: 
                    self._cnx.commit()
 
    def dump(self):
        with self._cnx:
            for row in self._cnx.execute('select * from vsl_actes'):
                print (row)
 
 
def load(reader, writer):
    while 1:
        records = reader.get_records()
        print(records)
        if not records:
            break
        writer.writesmany(records)
 
 
if __name__ == '__main__':
 
    csv.register_dialect('excel-fr', ExcelFr())
    writer = Writer()
    with open('actes.csv' ,'r') as file:
        load(Reader(file), writer)
    writer.dump()

Mon problème est le chargement du fichier


enregistrement = record (row[0] , row[1])
IndexError: list index out of range