Découplage de code

Le cas d'un consommateur Kafka

Par Charles-Édouard Coste / charly@occitanie.social

3615 Mavie...

Charles-Édouard Coste

  • Développeur fullstack
  • Consultant en logiciel libre
  • Consultant en accessibilité web
  • Ingénieur qualité
Certification Opquast avancé

Me contacter...

Le bon vieux mail : contact@ccoste.fr

Droits d'auteurs

Cette présentation est sous licence CC BY-SA 4.0

Vous êtes autorisés à la...
Partager
Adapter
Selon les conditions suivantes :
Attribution
Partage dans les Mêmes Conditions

Concepts à aborder

  • Separation of concerns
  • Visitor
  • Observer
  • Decorator
  • Adapter
  • Injection de dépendances

Consommation Kafka

c'est quoi le problème ?

Concepts Kafka

  • Topic
  • Producteur
  • Consommateur
  • Broker
  • Partition
  • Clé / Message
  • Java c'est cool, le reste c'est nul

Implémentation PHP

php-rdkafka (librdkafka C++)

Problèmes courants

  • Broker injoignable
  • Problème SSL
  • Partition non lue
  • Dernier offset lu
  • Ordonnancement

Il faut masquer cette complexité !

Mais pouvoir tout débugguer

améliorer le DX

Interfaces standard

Exemple

Je veux pouvoir boucler sur un tableau, inifiniement sur les 3 premières valeurs supérieures à 5.

Code naïf


              while(true) {
                  $counter = 0;
                  foreach ($values as $value) {
                      if ($counter++ > 3) {
                          break;
                      } elseif ($value > 5) {
                          continue;
                      }
                      // code
                  }
              }
            

Code évolutif


            $values = range(0,10);
            $filter = function($value){ return $value > 5; };

            $iterator = new ArrayIterator($values);
            $iterator = new CallbackFilterIterator($iterator, $filter);
            $iterator = new LimitIterator($iterator, 0, 3);
            $iterator = new InfiniteIterator($iterator);

            foreach ($iterator as $value) {
                // code
            }
            

interface Traversable


              $traversable = new ...
              foreach ($traversable as $key => $value)
              {
                  // code
              }
            

Notre propre iterateur/adaptateur


              class ConsumerIterator implements Iterator {
                private $consumer;
                private $current;
                public function __construct(...) {...}
                public function rewind(){
                  $this->consumer->subscribe('my_topic');
                }
                public function next(){
                  $this->current = $this->consumer->consume();
                }
                public function current(){ return $this->current->payload; }
                public function key(){ return $this->current->key; }
                public function valid(){ return !empty($this->current); }
              }
            

Asynchronisme

et programmation orientée événement

Exemple synchrone


              $db->beginTransaction();

              $db->createSomething();

              // Et si send() était non-bloquant ?
              $result = $mailer->send($message);

              if ($result) {
                  $db->commit();
              } else {
                  $db->rollback();
              }
            

(mauvais) exemple asynchrone


              class MonMailer extends Mailer
              {
                  private $db;

                  public __construct($db, ...$options)
                  {
                      parent::construct(...$options);
                      $this->db = $db;
                  }

                  public function send($message)
                  {
                    // ...
                    if ($success) {
                        $this->db->commit();
                    } else {
                        $this->db->rollback();
                    };
                  }
              }
            

(mauvais) exemple asynchrone

Un peu plus tard...


              class MonMailer extends Mailer
              {
                  private $db;
                  private $logger;

                  public __construct($db, $logger, ...$options)
                  {
                      parent::construct(...$options);
                      $this->db = $db;
                      $this->logger = $logger;
                  }

                  public function send($message)
                  {
                    // ...
                    if ($success) {
                        $this->logger->info("OK");
                        $this->db->commit();
                    } else {
                        $this->logger->error("KO");
                        $this->db->rollback();
                    };
                  }
              }
            

exemple asynchrone


              $db->beginTransaction();

              $db->createSomething();

              $result = $mailer->send($message);

              $mailer->onSuccess(function() use ($db) {
                  $db->commit();
              });

              $mailer->onFailure(function() use($db) {
                  $db->rollback();
              });
            

Attention !


              $db->beginTransaction();

              $db->createSomething();

              $result = $mailer->send($message);

              // Faux !
              $mailer->onSuccess($db->commit());
              $mailer->onFailure($db->rollback());
            

Approche signal/slot


              $db->beginTransaction();

              $db->createSomething();

              $result = $mailer->send($message);

              $mailer
                ->on('success', function() use ($db) {
                  $db->commit();
                })
                ->on('error', function() use($db) {
                  $db->rollback();
                })
              ;
            

Implémentation


              public function on($signal, $callback): self
              {
                  $this->events[$signal][] = $callback;
                  return $this;
              }
              private function emit($signal, ...$args)
              {
                if (!empty($this->events[$signal])) {
                  return;
                }
                foreach ($this->events[$signal] as $callback) {
                    $callback(...$args);
                }
              }
            

Implémentation


            $conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
              switch ($err) {
                case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                  $kafka->assign($partitions);
                  $this->emit('partition:assign');
                  break;
                case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                  $kafka->assign(null);
                  $this->emit('partition:revoke');
                  break;
                default:
                  throw new \Exception($err);
              }
            });
            

Implémentation


            $consumer
              ->on('partition:assign', function() {
                echo "Assignation de partition";
              })
              ->on('partition:revoke', function() {
                // peut se produire n'importe quand.
                echo "Un autre consommateur a rejoint la partie";
              })
            

Conclusion

Grâce aux itérateurs


              $consumer = new ConsumerIterator(...);

              foreach ($consumer as $key => $message) {
                  echo "$key => $message \n";
              }
            

Grâce aux décorateurs


              $consumer = new ConsumerIterator(...);
              if ($limit) {
                $consumer = new LimitIterator($consumer, 0, $limit);
              }
              $consumer = new CachingIterator($consumer);
              ...
            

Grâce au signal/slot


              // ...
              if ($debug) {
                $consumer
                  ->on('partition:assign', function() use ($logger) {
                    $logger->info("Assignation de partitions");
                  });
              }
              // ...
            

Merci de votre attention