The parallel\Events class

(0.9.0)

The Event Loop

The Event loop monitors the state of sets of futures and or channels (targets) in order to perform read (parallel\Future::value(), parallel\Channel::recv()) and write (parallel\Channel::send()) operations as the targets become available and the operations may be performed without blocking the event loop.

类摘要

final parallel\Events implements Countable , Traversable {
/* Input */
public setInput ( Input $input ) : void
/* Targets */
public addChannel ( parallel\Channel $channel ) : void
public addFuture ( string $name , parallel\Future $future ) : void
public remove ( string $target ) : void
/* Behaviour */
public setBlocking ( bool $blocking ) : void
public setTimeout ( int $timeout ) : void
/* Polling */
public poll ( ) : parallel\Events\Event|null
}

Table of Contents

User Contributed Notes

gam6itko 25-Sep-2021 07:24
<?php

// example below shows how to stop child thread from main thread

use parallel\{Channel, Events, Events\Event\Type, Runtime};

$fnThread = static function (Channel $channel) {
   
$events = new Events();
   
$events->addChannel($channel);
   
$events->setBlocking(false); // don't block on Events::poll()

   
while (true) {
        if (
$event = $events->poll()) {
           
//It seems, the target gets deleted after returning an event, so add it again.
           
$events->addChannel($channel);

            if (
Type::Read === $event->type) {
                echo
"- received value: ".$event->value.PHP_EOL;
            } elseif (
Type::Close === $event->type) {
                echo
"- receiving close event".PHP_EOL;
                return
"i'm done";
            }

        }
        echo
"-\n";
       
usleep(500_000);
    }
};

// main thread
$runtime = new Runtime();
$channel = new Channel();
$future = $runtime->run($fnThread, [$channel]);

$channel->send('message1');
sleep(2);
$channel->send('message2');
sleep(2);

echo
"closing channel\n";
$channel->close();
echo
"future said: ".$future->value();
echo
PHP_EOL;
gam6itko 11-Sep-2021 03:28
<?php

// this example shows how Events handle Future events

use parallel\{Events, Events\Event, Runtime};

$events = new Events();

$runtime = new Runtime();

//Read (type: 1)
$future = $runtime->run(
    static function (
string $name) {
        return
"Future#$name result";
    },
    [
'Read']
);
$events->addFuture("Future#Read", $future);

//Cancel  (type: 4)
$future = $runtime->run(
    static function (
string $name) {
        throw new \
Exception("Exception#$name");
    },
    [
'Cancel']
);
$events->addFuture("Future#Cancel", $future);

//Kill  (type: 5)
$future = $runtime->run(
    static function () {
       
sleep(100000);
    },
    []
);
$events->addFuture("Future#Kill", $future);
$future->cancel(); //kill it

//Error  (type: 6)
$future = $runtime->run(
    static function () {
       
$memoryEater = [];
       
$i = 0;
        while (++
$i) {
           
$memoryEater[] = $i;
        }
    }
);
$events->addFuture("Future#Error", $future);

// reading events

/** @var Event $event */
foreach ($events as $i => $event) {
    echo
str_pad('', 50, '=') . " EVENT_$i\n";
   
var_dump($event);
    echo
"\n";
}
s dot laufer at homegear dot email 22-Dec-2019 03:02
<?php
/**
Example showing the usage of events.

The documentation still is very thin, so I'm not sure, the example is the best solution. But it works.
*/
use parallel\{Channel,Runtime,Events,Events\Event};

$myThread = function(Channel $channel) {
   
$events = new Events();
   
$events->addChannel($channel);
   
//$events->setBlocking(false); //Uncomment to don't block on Events::poll()
   
$events->setTimeout(1000000); //Comment when not blocking

   
while(true)
    {
       
/*
        ...
        Your code.
        ...
        */

        //Read all available events
       
try
        {
           
$event = NULL;
            do
            {
               
$event = $events->poll(); //Returns non-null if there is an event
               
if($event && $event->source == 'myChannel')
                {
                   
//It seems, the target gets deleted after returning an event,
                    //so add it again.
                   
$events->addChannel($channel);
                    if(
$event->type == Event\Type::Read)
                    {
                        if(
is_array($event->value) && count($event->value) > 0)
                        {
                            if(
$event->value['name'] == 'stop')
                            {
                                echo
'Stopping thread';
                                return;
//Stop
                           
}
                            else
                            {
                                echo
'Event: '.$event->value['name'].' => '.$event->value['value'].PHP_EOL;
                            }
                        }
                    }
                    else if(
$event->type == Event\Type::Close) return; //Stop
               
}
            }
            while(
$event);
        }
        catch(
Events\Error\Timeout $ex)
        {
           
//Timeout
           
echo 'Timeout'.PHP_EOL;
        }
    }
};

class
MyClass {
    private
$runtime;
    private
$future;
    private
$channel;

    public function
start() {
       
//Create runtime
       
$this->runtime = new Runtime();

       
//Create buffered channel.
        //Buffered channels don't block on Channel::send().
        //Note that target names need to be unique within the process.
       
$this->channel = Channel::make('myChannel', Channel::Infinite);

        global
$myThread;
       
$this->future = $this->runtime->run($myThread, [$this->channel]);
    }

    public function
stop() {
       
$this->channel->send(['name' => 'stop', 'value' => true]);

       
$this->future->value(); //Wait for thread to finish
       
$this->channel->close();
    }

    public function
emit(string $name, $value)
    {
       
$this->channel->send(['name' => $name, 'value' => $value]);
    }
}

$a = new MyClass();
$a->start();

for(
$i = 0; $i < 5; $i++)
{
   
$a->emit('test', $i);
   
sleep(0.5);
}

sleep(2);

for(
$i = 5; $i < 10; $i++)
{
   
$a->emit('test', $i);
   
sleep(0.5);
}

$a->stop();
?>
PHP8中文手册 站长在线 整理 版权归PHP文档组所有