
|
#!/usr/bin/env perl -w
use strict;
use warnings;
#use threads;
use threads ('yield',
'stack_size' => 64*4096,
'exit' => 'threads_only',
'stringify');
use Data::Dumper;
$Data::Dumper::Sortkeys=1;
#=========================================================================================#
# Déclaration de globale
#=========================================================================================#
my $masterthr=threads->self;
my %monitorRules=();
my %thrIDForRules=();
my %ruleNameForthrID=();
my $sleepRuleGroup=3;
my $sleepRulePred=3;
my %scheduleRules=(
groups => {
group100 => {
rules => {
rule01 => { param => [ 5 ] },
rule02 => { preds => [ ], param => [ 6 ] },
},
},
group200 => {
rules => {
rule06 => { preds => [ ], param => [ 5 ] },
rule07 => { preds => [ 'rule01' ], param => [ 6 ] },
rule09 => { param => [ 8 ] },
rule08 => { preds => [ 'rule02' ], param => [ 7 ] },
}
},
group300 => {
rules => {
rule03 => { preds => [ 'rule06' ], param => [ 5 ] },
rule04 => { preds => [ 'rule03' ], param => [ 6 ] },
rule05 => { preds => [ 'rule08', 'rule03' ], param => [ 7 ] },
}
}
}
);
sub _timeNow(@) {
use DateTime;
my $dt=DateTime->now;
return $dt->hms;
}
sub _trace(@) {
my ( $function, $message, $skipLine )=@_;
if ( $skipLine ) {
print $skipLine._timeNow." - $function - $message\n";
} else {
print _timeNow." - $function - $message\n";
}
return 1;
}
#=========================================================================================#
# Attente des prédécesseurs de fin de groupe de règles
#=========================================================================================#
sub _waitAllThreadsForCurrentGroup(@) { #
#-----------------------------------------------------------------------------------------#
# Function de surveillance de status d'une liste de règle
# On s'appui sur le hash %thrIDForRules, pour obtenir la référence d'un Thread,
# pour un nom de règle
#-----------------------------------------------------------------------------------------#
#
my ( $objetcType, $objectName, @ruleNameList )=@_; # Prend le type et le nom du contraint et la liste des règles contraignantes
#
_trace( "_waitAllThreadsForCurrentGroup", "For $objetcType=$objectName, waiting Rules=".join(', ',@ruleNameList) );
#
my $stop=0; # Positionne le flag d'arrêt de la boucle à faux
until ( $stop ) { # Boucle interminable...
foreach my $ruleName ( @ruleNameList ) { # Boucle sur la liste des règles à surveiller
#
unless ( exists( $monitorRules{rules}{$ruleName}{status} ) ) { # Si la règle à surveiller, n'a pas de status dans le monitor...
$monitorRules{rules}{$ruleName}{status}='running'; # On lui donne le status 'running' par défaut
} #
#
foreach my $thr ( threads->list ) { # Boucle sur la liste des threads actifs
#
my $refThrCurrentRule=$thrIDForRules{$ruleName}{thr}; # Prend le pointeur du thread de la règle à surveiller
if ( $thr->equal( $refThrCurrentRule ) ) { # Si le thread actif courant est le même que celui de la règle courante...
_trace( "_waitAllThreadsForCurrentGroup", "For $objetcType=$objectName, dependency constraint waiting Rule=$ruleName" );
$refThrCurrentRule->join; # On est donc obligé d'attendre la fin de cette règle
$monitorRules{rules}{$ruleName}{status}='terminated'; # On lui donne le status 'terminated'
} #
#
} #
#
} #
sleep $sleepRuleGroup; # On attend une poignée de seconde (évite trop de charge)
#
#--------------------------------------------------------------------------------------#
# On fait ce qu'il faut pour sortir de la boucle interminable
#--------------------------------------------------------------------------------------#
my $countRules=scalar(@ruleNameList); # On compte le nombre de règles à surveiller
foreach my $ruleName ( @ruleNameList ) { # Boucle sur la liste des règles à surveiller
if ( $monitorRules{rules}{$ruleName}{status} eq 'terminated' ) { # Si la règle à un statut 'terminated'
$countRules--; # On décrémente le compteur des règles à surveiller
_trace( "_waitAllThreadsForCurrentGroup", "Rule $ruleName has been terminated" );
} #
} #
unless ( $countRules ) { # Si le compteur est à 0, il n'y a plus de règles à surveiller
$stop=1; # On met le flag de stop de la boucle interminable à vrai
} #
} #
#
return 1; # Renvoi toujours vrai
} #
#=========================================================================================#
#=========================================================================================#
# Fonction fournissant une liste des règles toujours en exécution, d'après une liste de départ
#=========================================================================================#
sub _removeThreadFinished(@) { #
my ( $ruleName, @expectedPredRules )=@_; # En argument la règle contrainte, la liste des règles contraignantes
my @runningPredRules=(); # Initialisation à vide, de la liste en sortie
#
foreach my $predRule ( @expectedPredRules ) { # Boucle sur la liste des règles contraignantes
my $currentRuleRunning=1; # Flag, considérant que la règle prédécesseur est toujours running
#
if ( $monitorRules{rules}{$predRule}{status} eq 'terminated' ) { # On regarde d'abord dans le monitor, si la règle contraignante courante n'est pas déjà terminée...
$currentRuleRunning=0; # On met le flag running à faux
next; # Et on passe à la règle contraignante suivante
} #
#
foreach my $terminatedThread ( threads->list(threads::joinable) ) { # Boucle sur la liste des threads terminés et "joinable"
my $thrID=$terminatedThread->tid();
if ( exists( $ruleNameForthrID{$thrID} ) ) {
print "_removeThreadFinished - Test $thrID for Rule ".$ruleNameForthrID{$thrID}." joinable $terminatedThread=".$thrIDForRules{$predRule}{thr}." \n";
}
if ( $terminatedThread eq $thrIDForRules{$predRule}{thr} ) { # Si le thread courant est le même que le threads de la règle contraignante courante...
$currentRuleRunning=0; # On met le flag running à faux
$monitorRules{rules}{$predRule}{status}='terminated'; # Dans la monitor, on change le status la règle contraignate à "terminated"
$terminatedThread->join(); # On join ce threads, pour le marqué terminé
} #
} # Fin de boucle sur la list des threads "joinable"
if ( $currentRuleRunning ) { # Si le flag de "running" est toujours à vrai...
_trace( "_removeThreadFinished", "Rule $predRule still running" );
push( @runningPredRules, $predRule ); # on ajoute la règle contraignante dans la list en sortie
} #
} # Fin de la boucle sur les règles contraignantes
#
unless ( @runningPredRules ) { # Si la liste des règles contraignantes est vide...
_trace( "_removeThreadFinished", "No more rule running for $ruleName" );
} else { # Si elle n'est pas vide...'
_trace( "_removeThreadFinished", "Rules still running for $ruleName (".join(', ', @runningPredRules ).")" );
} #
return @runningPredRules; # Renvoi de la liste des règles contraignates restantes
} #
#=========================================================================================#
#=========================================================================================#
# Fonction contrôle des règles contraignantes avant soumission de règle
#=========================================================================================#
sub _controlAndSubmitOneRule(@) { #
my ( $refHashWork, $ruleName, $groupName )=@_; # Prend en argument : La référence du hash de travail, le nom de la règle à contrôler/soumettre, le groupe
#
unless( _removeThreadFinished($ruleName, @{ $refHashWork->{$ruleName}{preds} }) ) { # Si le contrôle des règles contraignantes est vide...
_submitlOneRule( $refHashWork, $ruleName, $groupName ); # Demande de soumission de la règle
delete( $refHashWork->{$ruleName} ); # Suppression de la clé de la règle, du hash de travail
} #
#
return 1; # Renvoi toujours vrai
} #
#=========================================================================================#
#=========================================================================================#
# Fonction de soumission d'une règle
#=========================================================================================#
sub _submitlOneRule(@) { #
my ( $refHashWork, $ruleName, $groupName )=@_; # Prend en arguments : la référence du hash de travail, le nom de la règle, le nom du groupe
#
my $thr=threads->new( # On démarre un nouveau thread
\&$ruleName, # Avec la référence à la fonction (par le nom de la règle courante)
$ruleName, # Premier paramètre, le nom de la règle
@{ $refHashWork->{$ruleName}{param} } # Paramètres suivants, ceux du hash de déclaration des règles
); #
my $thrID=$thr->tid(); # Au passage, on prend l'ID du thread
#
$thrIDForRules{$ruleName}{tid}=$thrID; # On rapproche le nom de la règle et l'ID de son thread (ça peu servir)
$thrIDForRules{$ruleName}{thr}=$thr; # On rapproche le nom de la règle et la référence de son thread (ça peu servir)
#
$ruleNameForthrID{$thrID}=$ruleName; # On rapproche l'ID du thread à la règle courante (ça peu servir)
#
$monitorRules{rules}{$ruleName}{status}='started'; # On fixe le statut de la règle à 'started' dans le hash de monitoring
$monitorRules{rules}{$ruleName}{group}=$groupName; # On fixe le nom du group courant pour la règle courante dans le hash de monitoring (ça peu servir)
#
return 1; #
} #
#=========================================================================================#
#=========================================================================================#
# Fonction de soumission des thread pour toutes les règles d'un group
#=========================================================================================#
sub _submitAllRuleForCurrentGroup(@) { #
#-----------------------------------------------------------------------------------------#
# Function de création de thread pour chaque règle et mise à jour du hash de monoring
# Stocke aussi les références et ID de rapprochement des noms de règles et les leur thread
# permettant un accès plus rapide et concis, pour la surveillance
#-----------------------------------------------------------------------------------------#
my ($refGroup, $groupName)=@_; # Prend le pointeur du groupe de règle, du hash de déclaration des règles
#
my %hashWork=%{ $refGroup->{rules} }; # Copie de la list des règles (avec leurs paramètres) dans un hash de travail
#
while ( keys( %hashWork ) ) { # Boucle tant qu'il y a des règles à soumettre restante
#print Data::Dumper->Dump( [\%hashWork], ['hashWork'] );
#print Data::Dumper->Dump( [\%monitorRules], ['monitorRules'] );
foreach my $ruleName ( keys( %hashWork ) ) { # Boucle sur la liste des règles du hash de travail
_controlAndSubmitOneRule( \%hashWork, $ruleName, $groupName ); # Demande de contrôle des contraignantes et soumission (si possible)
} # Fin de boucle sur le hash de travail
sleep $sleepRulePred; # Attente avant de rebalayer le hash de travail
} # Fin de boucle sur la liste des règles restantes
#
return 1; # Renvoi toujours vrai
} #
#=========================================================================================#
#=========================================================================================#
# Fonction d'appel à l'attente de règles, pour un groupe
#=========================================================================================#
sub _hasPreviousGroup(@) { #
my ($group)=@_; # Prend le nom d'un group en paramètre
if ( scalar( threads->list ) ) { # Si des threads esclaves d'un groupe tournent...
my @ruleNameList=keys( %{ $scheduleRules{groups}{$group}{rules} } ); # On prend la liste des règles du groupe précédent
_trace( "_hasPreviousGroup", "Wait all rules on group $group" );
_waitAllThreadsForCurrentGroup( # On demande d'attendre la fin de toutes ses règles
'group', # Passe le type de l'objet contraint
$group, # Passe le nom de l'objet contraint
@ruleNameList # Passe la liste des règles contraignantes
); #
} #
return 1; # Renvoi toujours vrai
} #
#=========================================================================================#
sub _doStuff(@) {
my ( $previousGroup, $currentGroup )=@_;
if ( $previousGroup and $previousGroup eq $currentGroup ) {
_trace( "_doStuff", "All groupe terminated : Do some stuff at the end !!!", "\n" );
}
elsif ( ! $previousGroup ) {
_trace( "_doStuff", "No group started : Do some stuff at the begin !!!" );
}
else {
_trace( "_doStuff", "Do stuff between group=$previousGroup and group=$currentGroup" );
}
}
#=========================================================================================#
# Fonction principale du MASTER de gestion des groupes de règles
#=========================================================================================#
sub startManageGroup(@) { #
my $previousGroup; # Déclare le group précédent vide par défaut
foreach my $group ( sort keys( %{ $scheduleRules{groups} } ) ) { # Boucle sur la liste des groupe de règles
my $refGroup=\%{ $scheduleRules{groups}{$group} }; # Pour alleger le code, on prend la ref du hash des règles du groupe courant
if ( $previousGroup ) { # Si il y avait un groupe précédent...
_trace( "startManageGroup", "New rules group, wait all rules previous group" );
_hasPreviousGroup( $previousGroup ); # Y-a-t'il encore des règles en cours ?
_trace( "startManageGroup", "Rules group=$previousGroup terminated" );
} #
#
#-----------------------------------------------------------------------------------------#
# Ici il n'y a pas (ou il n'y a plus) de threads esclaves actifs pour le group précédent
#-----------------------------------------------------------------------------------------#
_doStuff( $previousGroup, $group ); # On peut faire des trucs entre le $previousGroup et le $group, ici
$previousGroup=$group; # Comme on débute un nouveau group, on met son nom dans le previousGroup (pour plus tard)
_trace( "startManageGroup", "Rules group=$group started", "\n" );
_submitAllRuleForCurrentGroup( $refGroup, $group ); # On demande la soumission de tous les threads des règles du group courant
#
} #
#
print "startManageGroup - All group ended wait last rules\n";
_hasPreviousGroup( $previousGroup ); # Attend la fin de toutes les règles du dernier groupe
_doStuff( $previousGroup, $previousGroup ); # On peut faire des trucs à la fin du dernier groupe, ici
#
return 1; # Renvoi toujours vrai
} #
#=========================================================================================#
sub makeRuleGraph(@) {
}
#=========================================================================================#
# Déclaration toute les règles en local pour les tests
#=========================================================================================#
sub rule01 { my ($ruleName, $wait)=@_; _ruleContent( $ruleName, $wait ); return 1; }
sub rule02 { my ($ruleName, $wait)=@_; _ruleContent( $ruleName, $wait ); return 1; }
sub rule03 { my ($ruleName, $wait)=@_; _ruleContent( $ruleName, $wait ); return 1; }
sub rule04 { my ($ruleName, $wait)=@_; _ruleContent( $ruleName, $wait ); return 1; }
sub rule05 { my ($ruleName, $wait)=@_; _ruleContent( $ruleName, $wait ); return 1; }
sub rule06 { my ($ruleName, $wait)=@_; _ruleContent( $ruleName, $wait ); return 1; }
sub rule07 { my ($ruleName, $wait)=@_; _ruleContent( $ruleName, $wait ); return 1; }
sub rule08 { my ($ruleName, $wait)=@_; _ruleContent( $ruleName, $wait ); return 1; }
sub rule09 { my ($ruleName, $wait)=@_; _ruleContent( $ruleName, $wait ); return 1; }
#=========================================================================================#
# Pour les tests, toutes les règles executent le même code (avec un wait différent)
#=========================================================================================#
sub _ruleContent(@) {
my ($ruleName, $wait)=@_;
_trace( "===BEG=== $ruleName", "Start for wait $wait sec" );
sleep $wait;
_trace( "===END=== $ruleName", "terminated" );
return 1;
}
#=========================================================================================#
# MAIN : Boucle de control des états entre les groupes
#=========================================================================================#
print "MAIN - Start scheduler\n";
startManageGroup();
#=========================================================================================#
# Pour le debug, on regarde un peut si tout est OK...
#=========================================================================================#
#print Data::Dumper->Dump( [\%thrIDForRules], ['thrIDForRules'] );
#print Data::Dumper->Dump( [\%ruleNameForthrID], ['ruleNameForthrID'] );
#print Data::Dumper->Dump( [\%monitorRules], ['monitorRules'] );
print "\nMAIN - End scheduler\n";
exit 0; |