1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362
|
#!/usr/bin/env perl -w
use strict;
use warnings;
#use threads;
use threads ('yield',
'stack_size' => 64*4096,
'exit' => 'threads_only',
'stringify');
use Data::Dumper;
$Data::Dumper::Sortkeys=1;
#=========================================================================================#
# Déclaration de globale
#=========================================================================================#
my $masterthr=threads->self;
my %monitorRules=();
my %thrIDForRules=();
my %ruleNameForthrID=();
my $sleepRuleGroup=3;
my $sleepRulePred=3;
my %scheduleRules=(
groups => {
group100 => {
rules => {
rule01 => { param => [ 5 ] },
rule02 => { preds => [ ], param => [ 6 ] },
},
},
group200 => {
rules => {
rule06 => { preds => [ ], param => [ 5 ] },
rule07 => { preds => [ 'rule01' ], param => [ 6 ] },
rule09 => { param => [ 8 ] },
rule08 => { preds => [ 'rule02' ], param => [ 7 ] },
}
},
group300 => {
rules => {
rule03 => { preds => [ 'rule06' ], param => [ 5 ] },
rule04 => { preds => [ 'rule03' ], param => [ 6 ] },
rule05 => { preds => [ 'rule08', 'rule03' ], param => [ 7 ] },
}
}
}
);
sub _timeNow(@) {
use DateTime;
my $dt=DateTime->now;
return $dt->hms;
}
sub _trace(@) {
my ( $function, $message, $skipLine )=@_;
if ( $skipLine ) {
print $skipLine._timeNow." - $function - $message\n";
} else {
print _timeNow." - $function - $message\n";
}
return 1;
}
#=========================================================================================#
# Attente des prédécesseurs de fin de groupe de règles
#=========================================================================================#
sub _waitAllThreadsForCurrentGroup(@) { #
#-----------------------------------------------------------------------------------------#
# Function de surveillance de status d'une liste de règle
# On s'appui sur le hash %thrIDForRules, pour obtenir la référence d'un Thread,
# pour un nom de règle
#-----------------------------------------------------------------------------------------#
#
my ( $objetcType, $objectName, @ruleNameList )=@_; # Prend le type et le nom du contraint et la liste des règles contraignantes
#
_trace( "_waitAllThreadsForCurrentGroup", "For $objetcType=$objectName, waiting Rules=".join(', ',@ruleNameList) );
#
my $stop=0; # Positionne le flag d'arrêt de la boucle à faux
until ( $stop ) { # Boucle interminable...
foreach my $ruleName ( @ruleNameList ) { # Boucle sur la liste des règles à surveiller
#
unless ( exists( $monitorRules{rules}{$ruleName}{status} ) ) { # Si la règle à surveiller, n'a pas de status dans le monitor...
$monitorRules{rules}{$ruleName}{status}='running'; # On lui donne le status 'running' par défaut
} #
#
foreach my $thr ( threads->list ) { # Boucle sur la liste des threads actifs
#
my $refThrCurrentRule=$thrIDForRules{$ruleName}{thr}; # Prend le pointeur du thread de la règle à surveiller
if ( $thr->equal( $refThrCurrentRule ) ) { # Si le thread actif courant est le même que celui de la règle courante...
_trace( "_waitAllThreadsForCurrentGroup", "For $objetcType=$objectName, dependency constraint waiting Rule=$ruleName" );
$refThrCurrentRule->join; # On est donc obligé d'attendre la fin de cette règle
$monitorRules{rules}{$ruleName}{status}='terminated'; # On lui donne le status 'terminated'
} #
#
} #
#
} #
sleep $sleepRuleGroup; # On attend une poignée de seconde (évite trop de charge)
#
#--------------------------------------------------------------------------------------#
# On fait ce qu'il faut pour sortir de la boucle interminable
#--------------------------------------------------------------------------------------#
my $countRules=scalar(@ruleNameList); # On compte le nombre de règles à surveiller
foreach my $ruleName ( @ruleNameList ) { # Boucle sur la liste des règles à surveiller
if ( $monitorRules{rules}{$ruleName}{status} eq 'terminated' ) { # Si la règle à un statut 'terminated'
$countRules--; # On décrémente le compteur des règles à surveiller
_trace( "_waitAllThreadsForCurrentGroup", "Rule $ruleName has been terminated" );
} #
} #
unless ( $countRules ) { # Si le compteur est à 0, il n'y a plus de règles à surveiller
$stop=1; # On met le flag de stop de la boucle interminable à vrai
} #
} #
#
return 1; # Renvoi toujours vrai
} #
#=========================================================================================#
#=========================================================================================#
# Fonction fournissant une liste des règles toujours en exécution, d'après une liste de départ
#=========================================================================================#
sub _removeThreadFinished(@) { #
my ( $ruleName, @expectedPredRules )=@_; # En argument la règle contrainte, la liste des règles contraignantes
my @runningPredRules=(); # Initialisation à vide, de la liste en sortie
#
foreach my $predRule ( @expectedPredRules ) { # Boucle sur la liste des règles contraignantes
my $currentRuleRunning=1; # Flag, considérant que la règle prédécesseur est toujours running
#
if ( $monitorRules{rules}{$predRule}{status} eq 'terminated' ) { # On regarde d'abord dans le monitor, si la règle contraignante courante n'est pas déjà terminée...
$currentRuleRunning=0; # On met le flag running à faux
next; # Et on passe à la règle contraignante suivante
} #
#
foreach my $terminatedThread ( threads->list(threads::joinable) ) { # Boucle sur la liste des threads terminés et "joinable"
my $thrID=$terminatedThread->tid();
if ( exists( $ruleNameForthrID{$thrID} ) ) {
print "_removeThreadFinished - Test $thrID for Rule ".$ruleNameForthrID{$thrID}." joinable $terminatedThread=".$thrIDForRules{$predRule}{thr}." \n";
}
if ( $terminatedThread eq $thrIDForRules{$predRule}{thr} ) { # Si le thread courant est le même que le threads de la règle contraignante courante...
$currentRuleRunning=0; # On met le flag running à faux
$monitorRules{rules}{$predRule}{status}='terminated'; # Dans la monitor, on change le status la règle contraignate à "terminated"
$terminatedThread->join(); # On join ce threads, pour le marqué terminé
} #
} # Fin de boucle sur la list des threads "joinable"
if ( $currentRuleRunning ) { # Si le flag de "running" est toujours à vrai...
_trace( "_removeThreadFinished", "Rule $predRule still running" );
push( @runningPredRules, $predRule ); # on ajoute la règle contraignante dans la list en sortie
} #
} # Fin de la boucle sur les règles contraignantes
#
unless ( @runningPredRules ) { # Si la liste des règles contraignantes est vide...
_trace( "_removeThreadFinished", "No more rule running for $ruleName" );
} else { # Si elle n'est pas vide...'
_trace( "_removeThreadFinished", "Rules still running for $ruleName (".join(', ', @runningPredRules ).")" );
} #
return @runningPredRules; # Renvoi de la liste des règles contraignates restantes
} #
#=========================================================================================#
#=========================================================================================#
# Fonction contrôle des règles contraignantes avant soumission de règle
#=========================================================================================#
sub _controlAndSubmitOneRule(@) { #
my ( $refHashWork, $ruleName, $groupName )=@_; # Prend en argument : La référence du hash de travail, le nom de la règle à contrôler/soumettre, le groupe
#
unless( _removeThreadFinished($ruleName, @{ $refHashWork->{$ruleName}{preds} }) ) { # Si le contrôle des règles contraignantes est vide...
_submitlOneRule( $refHashWork, $ruleName, $groupName ); # Demande de soumission de la règle
delete( $refHashWork->{$ruleName} ); # Suppression de la clé de la règle, du hash de travail
} #
#
return 1; # Renvoi toujours vrai
} #
#=========================================================================================#
#=========================================================================================#
# Fonction de soumission d'une règle
#=========================================================================================#
sub _submitlOneRule(@) { #
my ( $refHashWork, $ruleName, $groupName )=@_; # Prend en arguments : la référence du hash de travail, le nom de la règle, le nom du groupe
#
my $thr=threads->new( # On démarre un nouveau thread
\&$ruleName, # Avec la référence à la fonction (par le nom de la règle courante)
$ruleName, # Premier paramètre, le nom de la règle
@{ $refHashWork->{$ruleName}{param} } # Paramètres suivants, ceux du hash de déclaration des règles
); #
my $thrID=$thr->tid(); # Au passage, on prend l'ID du thread
#
$thrIDForRules{$ruleName}{tid}=$thrID; # On rapproche le nom de la règle et l'ID de son thread (ça peu servir)
$thrIDForRules{$ruleName}{thr}=$thr; # On rapproche le nom de la règle et la référence de son thread (ça peu servir)
#
$ruleNameForthrID{$thrID}=$ruleName; # On rapproche l'ID du thread à la règle courante (ça peu servir)
#
$monitorRules{rules}{$ruleName}{status}='started'; # On fixe le statut de la règle à 'started' dans le hash de monitoring
$monitorRules{rules}{$ruleName}{group}=$groupName; # On fixe le nom du group courant pour la règle courante dans le hash de monitoring (ça peu servir)
#
return 1; #
} #
#=========================================================================================#
#=========================================================================================#
# Fonction de soumission des thread pour toutes les règles d'un group
#=========================================================================================#
sub _submitAllRuleForCurrentGroup(@) { #
#-----------------------------------------------------------------------------------------#
# Function de création de thread pour chaque règle et mise à jour du hash de monoring
# Stocke aussi les références et ID de rapprochement des noms de règles et les leur thread
# permettant un accès plus rapide et concis, pour la surveillance
#-----------------------------------------------------------------------------------------#
my ($refGroup, $groupName)=@_; # Prend le pointeur du groupe de règle, du hash de déclaration des règles
#
my %hashWork=%{ $refGroup->{rules} }; # Copie de la list des règles (avec leurs paramètres) dans un hash de travail
#
while ( keys( %hashWork ) ) { # Boucle tant qu'il y a des règles à soumettre restante
#print Data::Dumper->Dump( [\%hashWork], ['hashWork'] );
#print Data::Dumper->Dump( [\%monitorRules], ['monitorRules'] );
foreach my $ruleName ( keys( %hashWork ) ) { # Boucle sur la liste des règles du hash de travail
_controlAndSubmitOneRule( \%hashWork, $ruleName, $groupName ); # Demande de contrôle des contraignantes et soumission (si possible)
} # Fin de boucle sur le hash de travail
sleep $sleepRulePred; # Attente avant de rebalayer le hash de travail
} # Fin de boucle sur la liste des règles restantes
#
return 1; # Renvoi toujours vrai
} #
#=========================================================================================#
#=========================================================================================#
# Fonction d'appel à l'attente de règles, pour un groupe
#=========================================================================================#
sub _hasPreviousGroup(@) { #
my ($group)=@_; # Prend le nom d'un group en paramètre
if ( scalar( threads->list ) ) { # Si des threads esclaves d'un groupe tournent...
my @ruleNameList=keys( %{ $scheduleRules{groups}{$group}{rules} } ); # On prend la liste des règles du groupe précédent
_trace( "_hasPreviousGroup", "Wait all rules on group $group" );
_waitAllThreadsForCurrentGroup( # On demande d'attendre la fin de toutes ses règles
'group', # Passe le type de l'objet contraint
$group, # Passe le nom de l'objet contraint
@ruleNameList # Passe la liste des règles contraignantes
); #
} #
return 1; # Renvoi toujours vrai
} #
#=========================================================================================#
sub _doStuff(@) {
my ( $previousGroup, $currentGroup )=@_;
if ( $previousGroup and $previousGroup eq $currentGroup ) {
_trace( "_doStuff", "All groupe terminated : Do some stuff at the end !!!", "\n" );
}
elsif ( ! $previousGroup ) {
_trace( "_doStuff", "No group started : Do some stuff at the begin !!!" );
}
else {
_trace( "_doStuff", "Do stuff between group=$previousGroup and group=$currentGroup" );
}
}
#=========================================================================================#
# Fonction principale du MASTER de gestion des groupes de règles
#=========================================================================================#
sub startManageGroup(@) { #
my $previousGroup; # Déclare le group précédent vide par défaut
foreach my $group ( sort keys( %{ $scheduleRules{groups} } ) ) { # Boucle sur la liste des groupe de règles
my $refGroup=\%{ $scheduleRules{groups}{$group} }; # Pour alleger le code, on prend la ref du hash des règles du groupe courant
if ( $previousGroup ) { # Si il y avait un groupe précédent...
_trace( "startManageGroup", "New rules group, wait all rules previous group" );
_hasPreviousGroup( $previousGroup ); # Y-a-t'il encore des règles en cours ?
_trace( "startManageGroup", "Rules group=$previousGroup terminated" );
} #
#
#-----------------------------------------------------------------------------------------#
# Ici il n'y a pas (ou il n'y a plus) de threads esclaves actifs pour le group précédent
#-----------------------------------------------------------------------------------------#
_doStuff( $previousGroup, $group ); # On peut faire des trucs entre le $previousGroup et le $group, ici
$previousGroup=$group; # Comme on débute un nouveau group, on met son nom dans le previousGroup (pour plus tard)
_trace( "startManageGroup", "Rules group=$group started", "\n" );
_submitAllRuleForCurrentGroup( $refGroup, $group ); # On demande la soumission de tous les threads des règles du group courant
#
} #
#
print "startManageGroup - All group ended wait last rules\n";
_hasPreviousGroup( $previousGroup ); # Attend la fin de toutes les règles du dernier groupe
_doStuff( $previousGroup, $previousGroup ); # On peut faire des trucs à la fin du dernier groupe, ici
#
return 1; # Renvoi toujours vrai
} #
#=========================================================================================#
sub makeRuleGraph(@) {
}
#=========================================================================================#
# Déclaration toute les règles en local pour les tests
#=========================================================================================#
sub rule01 { my ($ruleName, $wait)=@_; _ruleContent( $ruleName, $wait ); return 1; }
sub rule02 { my ($ruleName, $wait)=@_; _ruleContent( $ruleName, $wait ); return 1; }
sub rule03 { my ($ruleName, $wait)=@_; _ruleContent( $ruleName, $wait ); return 1; }
sub rule04 { my ($ruleName, $wait)=@_; _ruleContent( $ruleName, $wait ); return 1; }
sub rule05 { my ($ruleName, $wait)=@_; _ruleContent( $ruleName, $wait ); return 1; }
sub rule06 { my ($ruleName, $wait)=@_; _ruleContent( $ruleName, $wait ); return 1; }
sub rule07 { my ($ruleName, $wait)=@_; _ruleContent( $ruleName, $wait ); return 1; }
sub rule08 { my ($ruleName, $wait)=@_; _ruleContent( $ruleName, $wait ); return 1; }
sub rule09 { my ($ruleName, $wait)=@_; _ruleContent( $ruleName, $wait ); return 1; }
#=========================================================================================#
# Pour les tests, toutes les règles executent le même code (avec un wait différent)
#=========================================================================================#
sub _ruleContent(@) {
my ($ruleName, $wait)=@_;
_trace( "===BEG=== $ruleName", "Start for wait $wait sec" );
sleep $wait;
_trace( "===END=== $ruleName", "terminated" );
return 1;
}
#=========================================================================================#
# MAIN : Boucle de control des états entre les groupes
#=========================================================================================#
print "MAIN - Start scheduler\n";
startManageGroup();
#=========================================================================================#
# Pour le debug, on regarde un peut si tout est OK...
#=========================================================================================#
#print Data::Dumper->Dump( [\%thrIDForRules], ['thrIDForRules'] );
#print Data::Dumper->Dump( [\%ruleNameForthrID], ['ruleNameForthrID'] );
#print Data::Dumper->Dump( [\%monitorRules], ['monitorRules'] );
print "\nMAIN - End scheduler\n";
exit 0; |