Skip to content

Commit

Permalink
add queue
Browse files Browse the repository at this point in the history
  • Loading branch information
zhouaini528 committed Dec 8, 2020
1 parent 3c52f67 commit 6e70dfc
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 21 deletions.
41 changes: 27 additions & 14 deletions src/Api/WebSocket/SocketClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -120,23 +120,38 @@ protected function getData($global,$callback=null,$sub=[]){
//默认返回所有数据
if(empty($sub)){
foreach ($all_sub as $k=>$v){
if(is_array($v)) $table=$k;
else $table=$v;
if(is_array($v)){
print_r($v);
if(empty($this->keysecret) || $this->keysecret['key']!=$v[1]['key']) continue;

$data=$global->get(strtolower($table));
if(empty($data)) continue;
$temp[$table]=$data;
$table=$this->userKey($v[1],$v[0]);
$data=$global->getQueue(strtolower($table));
$temp[strtolower($table)]=$data;
}else{
$data=$global->get(strtolower($v));
$temp[strtolower($v)]=$data;
}
}
}else{
//返回规定的数据
if(!empty($this->keysecret)) {
//是否有私有数据
if(isset($all_sub[$this->keysecret['key']])) $sub=array_merge($sub,$all_sub[$this->keysecret['key']]);
}

/*print_r($sub);
die;*/
foreach ($sub as $k=>$v){
if(count($v)==1) $table=$v[0];
else {
//private
if(!isset($v[1]['key'])) continue;
if(count($v)==1) {
$table=$v[0];
$data=$global->get(strtolower($table));
} else {
//判断私有数据是否需要走队列数据
//$temp_v=explode(self::$USER_DELIMITER,$v);
$table=$this->userKey($v[1],$v[0]);
$data=$global->getQueue(strtolower($table));
}
$data=$global->get(strtolower($table));

if(empty($data)) continue;

$temp[$table]=$data;
Expand All @@ -159,12 +174,10 @@ function test(){
}

function test2(){
//print_r($this->client->global_key);
$global_key=$this->client->global_key;
foreach ($global_key as $k=>$v){
echo $k.PHP_EOL;
print_r($this->client->$v);

echo count($this->client->$v).'----'.$k.PHP_EOL;
echo json_encode($this->client->$v).PHP_EOL;
}
}
}
5 changes: 4 additions & 1 deletion src/Api/WebSocket/SocketFunction.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@

trait SocketFunction
{
//标记分隔符
static $USER_DELIMITER='===';

//对数据轮询 获取当前数据的订阅ID
protected function getInstrumentId(array $array){
if(isset($array['currency'])) return $array['currency'];
Expand Down Expand Up @@ -95,6 +98,6 @@ protected function log($message){
* @param $keysecret
*/
protected function userKey(array $keysecret,string $sub){
return $keysecret['key'].'='.$sub;
return $keysecret['key'].self::$USER_DELIMITER.$sub;
}
}
47 changes: 46 additions & 1 deletion src/Api/WebSocket/SocketGlobal.php
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,51 @@ protected function save($key,$value){
else $this->client->$key=$value;
}

/**
* 对了获取数据
* @param $key
* @return array
*/
protected function getQueue($key){
if(!isset($this->client->$key) || empty($this->client->$key)) return [];

do{
$old_value=$new_value=$this->client->$key;

if(empty($new_value)) return [];
//队列先进先出。
$value=array_shift($new_value);
}
while(!$this->client->cas($key, $old_value, $new_value));

return $value;
}

/**
* 队列保存数据
* @param $key
* @param $value
*/
protected function saveQueue($key,$value){
//最大存储数据量,超过后保留一条最新的数据,其余数据全部删除。
$max=100;

if(!isset($this->client->$key)) $this->add($key,[$value]);
else {
do{
$old_value=$new_value=$this->client->$key;

//超过最大数据量,保留最新数据
if(count($new_value)>$max){
$new_value=[$value];
}else{
array_push($new_value,$value);
}
}
while(!$this->client->cas($key, $old_value, $new_value));
}
}

protected function addSubUpdate($type='public',$data=[]){
do{
$old_value=$new_value=$this->client->add_sub;
Expand Down Expand Up @@ -78,7 +123,7 @@ protected function allSubUpdate($type='public',$data=[]){
$old_value=$new_value=$this->client->all_sub;
foreach ($data['sub'] as $v){
if($type=='public') $key=$v;
if($type=='private') $key=$v[1]['key'].$v[0];
if($type=='private') $key=$v[1]['key'].self::$USER_DELIMITER.$v[0];
$new_value[$key]=$v;
}
}
Expand Down
8 changes: 6 additions & 2 deletions src/Api/WebSocket/SocketServer.php
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,13 @@ private function onMessage($global){
$table=$data['table'].':'.$this->getInstrumentId($data);
$table=strtolower($table);

if($con->tag != 'public') $table=$this->userKey($con->tag_keysecret,$table);
if($con->tag != 'public') {
$table=$this->userKey($con->tag_keysecret,$table);
$global->saveQueue($table,$data);
}else{
$global->save($table,$data);
}

$global->save($table,$data);
return;
}

Expand Down
7 changes: 4 additions & 3 deletions tests/websocket/client.php
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@
'futures/position:BCH-USD-210326',//If there are private channels, $okex->keysecret() must be set
]);
print_r(json_encode($data));
die;

//The second way callback
$okex->keysecret($key_secret[0]);
Expand Down Expand Up @@ -250,9 +251,9 @@

case 10005:{
$okex->keysecret($key_secret[0]);
$okex->subscribe([
'futures/position:BCH-USD-210326',
]);
$data=$okex->getSubscribes();
print_r(json_encode($data));
die;
break;
}

Expand Down

0 comments on commit 6e70dfc

Please sign in to comment.