Le bon vieux mail : contact@ccoste.fr
c'est quoi le problème ?
php-rdkafka (librdkafka C++)
Mais pouvoir tout débugguer
améliorer le DX
Je veux pouvoir boucler sur un tableau, inifiniement sur les 3 premières valeurs supérieures à 5.
while(true) {
$counter = 0;
foreach ($values as $value) {
if ($counter++ > 3) {
break;
} elseif ($value > 5) {
continue;
}
// code
}
}
$values = range(0,10);
$filter = function($value){ return $value > 5; };
$iterator = new ArrayIterator($values);
$iterator = new CallbackFilterIterator($iterator, $filter);
$iterator = new LimitIterator($iterator, 0, 3);
$iterator = new InfiniteIterator($iterator);
foreach ($iterator as $value) {
// code
}
$traversable = new ...
foreach ($traversable as $key => $value)
{
// code
}
class ConsumerIterator implements Iterator {
private $consumer;
private $current;
public function __construct(...) {...}
public function rewind(){
$this->consumer->subscribe('my_topic');
}
public function next(){
$this->current = $this->consumer->consume();
}
public function current(){ return $this->current->payload; }
public function key(){ return $this->current->key; }
public function valid(){ return !empty($this->current); }
}
et programmation orientée événement
$db->beginTransaction();
$db->createSomething();
// Et si send() était non-bloquant ?
$result = $mailer->send($message);
if ($result) {
$db->commit();
} else {
$db->rollback();
}
class MonMailer extends Mailer
{
private $db;
public __construct($db, ...$options)
{
parent::construct(...$options);
$this->db = $db;
}
public function send($message)
{
// ...
if ($success) {
$this->db->commit();
} else {
$this->db->rollback();
};
}
}
Un peu plus tard...
class MonMailer extends Mailer
{
private $db;
private $logger;
public __construct($db, $logger, ...$options)
{
parent::construct(...$options);
$this->db = $db;
$this->logger = $logger;
}
public function send($message)
{
// ...
if ($success) {
$this->logger->info("OK");
$this->db->commit();
} else {
$this->logger->error("KO");
$this->db->rollback();
};
}
}
$db->beginTransaction();
$db->createSomething();
$result = $mailer->send($message);
$mailer->onSuccess(function() use ($db) {
$db->commit();
});
$mailer->onFailure(function() use($db) {
$db->rollback();
});
$db->beginTransaction();
$db->createSomething();
$result = $mailer->send($message);
// Faux !
$mailer->onSuccess($db->commit());
$mailer->onFailure($db->rollback());
$db->beginTransaction();
$db->createSomething();
$result = $mailer->send($message);
$mailer
->on('success', function() use ($db) {
$db->commit();
})
->on('error', function() use($db) {
$db->rollback();
})
;
public function on($signal, $callback): self
{
$this->events[$signal][] = $callback;
return $this;
}
private function emit($signal, ...$args)
{
if (!empty($this->events[$signal])) {
return;
}
foreach ($this->events[$signal] as $callback) {
$callback(...$args);
}
}
$conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
switch ($err) {
case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
$kafka->assign($partitions);
$this->emit('partition:assign');
break;
case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
$kafka->assign(null);
$this->emit('partition:revoke');
break;
default:
throw new \Exception($err);
}
});
$consumer
->on('partition:assign', function() {
echo "Assignation de partition";
})
->on('partition:revoke', function() {
// peut se produire n'importe quand.
echo "Un autre consommateur a rejoint la partie";
})
$consumer = new ConsumerIterator(...);
foreach ($consumer as $key => $message) {
echo "$key => $message \n";
}
$consumer = new ConsumerIterator(...);
if ($limit) {
$consumer = new LimitIterator($consumer, 0, $limit);
}
$consumer = new CachingIterator($consumer);
...
// ...
if ($debug) {
$consumer
->on('partition:assign', function() use ($logger) {
$logger->info("Assignation de partitions");
});
}
// ...