/**
 * Minimal reactive "observable" toolkit on top of vibe.d tasks.
 *
 * An observable is anything with a
 * `subscribe(onNext, onError, onCompleted)` method (see `isObservable`).
 * This module provides sources (`ObservableRange`, `Subject`,
 * `FiberSubject`) and operators (`map`, `filter`, `zip`, `fork`, `chain`,
 * `concatMap`) that can be combined through UFCS.
 */
module observe;

import vibe.d;
import vibe.core.core;
import vibe.core.concurrency;
import vibe.core.drivers.native; // had to add this to get rdmd working
import vibe.core.drivers.win32; // had to add this to get rdmd working
import std.traits;
import std.functional;
import std.algorithm;
import std.range;
import std.stdio;
import core.atomic;

/**
 * Element type produced by `R`.
 *
 * For input ranges this is `std.range.ElementType!R`; for anything else it
 * is the first parameter type of `R.NextFunc`, i.e. the type handed to the
 * `onNext` callback.
 */
template ElementType(R)
{
    static if (isInputRange!R)
    {
        alias std.range.ElementType!R ElementType;
    } else
        alias ParameterTypeTuple!(R.NextFunc)[0] ElementType;
}
alias ElementType PayloadType;

/**
 * Compile-time check: `true` when an `O` can be declared and exposes a
 * `subscribe` method that accepts a next-, an error- and a
 * completed-callback.
 */
template isObservable(O)
{
    enum bool isObservable = is(typeof(
    (inout int = 0)
    {
        O o = void; // can define a observable object
        alias ElementType!O E;
        o.subscribe((scope E e){},(Exception e){},(){}); // has subscribe function
    }));
}

/// Callback invoked when an observable reports an error.
alias void delegate (Exception) ErrorFunc;
/// Callback invoked when an observable has finished.
alias void delegate () CompletedFunc;

/**
 * Handle returned by `subscribe`.
 *
 * It stores the observable it belongs to together with the three callbacks.
 * Note that this is a struct: the observable keeps its own copy in its
 * subscription list, so flipping `active` on a returned handle does not
 * affect the stored copy. Use `dispose` instead.
 */
struct Subscription(Observable)
{
    alias void delegate (ElementType!Observable ) NextFunc;
    Observable o;
    NextFunc onNext;
    ErrorFunc onError;
    CompletedFunc onCompleted;
    bool active = true;

    /// Asks the owning observable to drop this subscription.
    void dispose()
    {
        o.dispose(this);
    }
}

/**
 * Observable that pushes the elements of a range (or other iterable) to its
 * subscribers from a task started with `runTask`.
 */
class ObservableRange(R)
{
    alias void delegate (ElementType!R) NextFunc;
    alias Subscription!(ObservableRange!R) Sub;
    private
    {
        Sub[] subscriptions;
        R range;
    }

    this(R r)
    {
        this.range = r;
    }

    /**
     * Registers a subscription.
     *
     * When `onNext` is given, a task is started that iterates the range and
     * forwards every element to all active subscriptions, then calls their
     * `onCompleted` and clears the list. `onError` is stored but never
     * invoked by this class.
     *
     * Returns: the subscription handle.
     */
    Sub subscribe(NextFunc onNext = null, ErrorFunc onError = null, CompletedFunc onCompleted = null)
    {
        auto sub = Sub(this,onNext,onError,onCompleted);
        subscriptions ~= sub;
        // A subscription without onNext is only registered; it does not start
        // the iteration. ConcatMap.onNext relies on this: it first adds a
        // completion-only subscription and then the ones carrying data.
        if (onNext is null)
            return sub;
        runTask(
        {
            foreach(e; this.range)
            {
                foreach(s; subscriptions)
                {
                    if (s.onNext && s.active)
                        s.onNext(e);
                }
                // todo: we can probably do this a little bit more efficient
                // remove inactive subscriptions
                subscriptions = subscriptions.remove!(s => !s.active).array();
            }
            foreach(s; subscriptions)
                if (s.onCompleted && s.active)
                    s.onCompleted();
            subscriptions.length = 0;
        });
        return sub;
    }

    /**
     * Marks the stored copy of `s` as inactive; the running task removes
     * inactive entries after each element.
     */
    void dispose(Sub s)
    {
        // simply mark the subscription in the list as inactive
        auto idx = subscriptions.countUntil(s);
        // countUntil yields -1 when nothing matches, so 0 is a valid hit.
        // (The previous `idx > 0` made the first subscription undisposable.)
        if (idx >= 0)
            subscriptions[idx].active = false;
    }
}

/// Wraps a range or iterable in an `ObservableRange`.
ObservableRange!R observe(R)(R r)
    if (isInputRange!R || isIterable!R)
{
    return new ObservableRange!R(r);
}
alias observe observeRange;

unittest
{
    assert(isObservable!(typeof(iota(1, 10).observe)));
}

/**
 * Observable whose events are pushed in from outside via `onNext`,
 * `onError` and `onCompleted`; each call is forwarded synchronously to the
 * matching callback of every active subscription.
 */
class Subject(E)
{
    alias E ElementType;
    alias void delegate (ElementType) NextFunc;
    alias Subscription!(Subject!E) Sub;
    private
    {
        Sub[] subscriptions;
    }

    /// Forwards `e` to every active subscription that has an `onNext`.
    void onNext(ElementType e)
    {
        foreach(s; subscriptions)
            if (s.onNext && s.active)
                s.onNext(e);
    }

    /// Forwards `e` to every active subscription that has an `onError`.
    void onError(Exception e)
    {
        foreach(s; subscriptions)
            if (s.onError && s.active)
                s.onError(e);
    }

    /// Notifies every active subscription that has an `onCompleted`.
    void onCompleted()
    {
        foreach(s; subscriptions)
            if (s.onCompleted && s.active)
                s.onCompleted();
    }

    /// Registers the callbacks and returns the subscription handle.
    Sub subscribe(NextFunc onNext = null, ErrorFunc onError = null, CompletedFunc onCompleted = null)
    {
        auto sub = Sub(this,onNext,onError,onCompleted);
        subscriptions ~= sub;
        return sub;
    }

    /**
     * Removes `s` from the subscription list and signals completion to it.
     *
     * Previously the list was filtered on `!active`, which never matched
     * the disposed entry, so it kept receiving events; and `onCompleted`
     * was called even when it was null.
     */
    void dispose(Sub s)
    {
        // Build a fresh array instead of mutating in place so that a
        // dispatch loop currently iterating the old slice is not disturbed.
        Sub[] remaining;
        foreach (entry; subscriptions)
            if (entry.active && entry != s)
                remaining ~= entry;
        subscriptions = remaining;

        if (s.onCompleted)
            s.onCompleted();
    }
}

unittest
{
    assert(is(ElementType!(Subject!int) : int));
    assert(isObservable!(Subject!int));
}

unittest
{
    auto s = new Subject!int;
    s.subscribe((i) => assert(i == 1));
    s.onNext(1);
    assert(is(ElementType!(Subject!int) : int));
}

/**
 * A `Subject` that connects itself to an upstream source lazily: the
 * `Subscriber` delegate passed to the constructor is invoked exactly once,
 * on the first call to `subscribe`.
 */
class Observer(E) : Subject!E
{
    alias void delegate(Observer!(E)) Subscriber;
    private
    {
        bool subscribed = false;
        Subscriber subscriber;
    }

    this(Subscriber subscriber)
    {
        this.subscriber = subscriber;
    }

    /// Registers the callbacks; the first call also runs the subscriber.
    override Sub subscribe(NextFunc onNext = null, ErrorFunc onError = null, CompletedFunc onCompleted = null)
    {
        auto sub = super.subscribe(onNext,onError,onCompleted);
        if (!subscribed)
        {
            subscribed = true;
            subscriber(this);
        }
        return sub;
    }
}

// here we test some basic UFCS and unaryFun behaviour that we rely on
template foo(alias fun)
{
    auto foo(T)(T n)
    {
        alias func = unaryFun!fun;
        return func(n);
    }
}

unittest
{
    assert(foo!(a => a*a)(4) == 16);
    assert(4.foo!(a => a*a) == 16);
    assert(foo!"a*a"(4) == 16);
    assert(4.foo!"a*a" == 16);
    auto square = (int n) { return n*n; };
    assert(foo!square(4) == 16);
    assert(4.foo!square == 16);
}

/**
 * Observable counterpart of `std.algorithm.map`: returns an `Observer`
 * that emits `fun(e)` for every element `e` of `observable` and passes
 * errors and completion through.
 */
template map(alias fun)
{
    auto map(Obs)(Obs observable)
        if (isObservable!Obs)
    {
        alias func = unaryFun!fun;
        alias AppliedReturnType = typeof(func(ElementType!(Obs).init));

        return new Observer!(AppliedReturnType)
        ((self){
            observable.subscribe(
                (e){ self.onNext(func(e)); },
                &self.onError,
                &self.onCompleted
            );
        });
    }
}
//alias map oMap;

unittest
{
    auto s = new Subject!int;
    auto p = s.map!(s => "abs");
    p.subscribe((s){ assert(s == "abs");});
    s.onNext(1);
}

unittest
{
    bool completed = false;
    auto s = new Subject!int;
    auto q = s.map!(s => s*s).map!(s => s*2);
    q.subscribe((s) { assert(s == 18);},null,(){completed = true;});
    s.onNext(3);
    s.onCompleted();
    assert(completed == true);
}

unittest
{
    bool completed = false;
    auto s = new Subject!int;
    auto sub = s.subscribe((v) { assert(v == 18);},null,(){completed = true;});
    s.onNext(18);
    sub.dispose();
    assert(completed == true);
    // a disposed subscription must no longer receive values
    s.onNext(99);
}

unittest
{
    // disposing a subscription that has no completion handler must not crash
    bool called = false;
    auto s = new Subject!int;
    auto sub = s.subscribe((v) { called = true; });
    sub.dispose();
    s.onNext(1);
    assert(!called);
}

/**
 * `Subject` that delivers its events from a dedicated task: `onNext`,
 * `onError` and `onCompleted` only send a message to that task, which
 * dispatches it to the subscriptions. The task is started on first use and
 * ends after an error or completion message.
 */
class FiberSubject(E) : Subject!E
{
    alias void delegate (E) NextFunc;
    private
    {
        Task t;
        bool started = false;
    }

    /// Ensures the dispatch task exists and waits for it to finish.
    void join()
    {
        forceTask();
        t.join();
    }

    /// Starts the dispatch task unless it has been started already.
    private void forceTask()
    {
        if (started)
            return;
        t = runTask(
        {
            bool running = true;
            while (running) {
                receive(
                    (E e)
                    {
                        foreach(s; subscriptions)
                            if (s.onNext && s.active)
                                s.onNext(e);
                    },
                    (Exception e)
                    {
                        foreach(s; subscriptions)
                            if (s.onError && s.active)
                                s.onError(e);
                        running = false;
                    },
                    // a bool message is the completion signal (see onCompleted)
                    (bool)
                    {
                        foreach(s; subscriptions)
                            if (s.onCompleted && s.active)
                                s.onCompleted();
                        running = false;
                    }
                );
            }
        });
        started = true;
    }

    override void onNext(E e)
    {
        forceTask();
        t.send(e);
    }

    override void onError(Exception e)
    {
        forceTask();
        t.send(e);
    }

    override void onCompleted()
    {
        forceTask();
        t.send(true);
    }
}

unittest
{
    auto s = new FiberSubject!int;
    s.subscribe((s) { assert(s == 14);},null,(){writeln("completed");exitEventLoop();});
    s.onNext(14);
    s.onCompleted();
    runEventLoop();
}

unittest
{
    auto s = new FiberSubject!int;
    auto q = s.map!(s=>s*s);
    q.subscribe((s) { assert(s == 28*28);},null,(){exitEventLoop();});
    s.onNext(28);
    s.onCompleted();
    runEventLoop();
}

/**
 * Pairs the elements of two observables into tuples.
 *
 * The two subscriptions hand over through a shared manual event: `a`
 * stores its value and emits, `b` waits for that emit, publishes the pair
 * and emits back.
 * NOTE(review): both completion handlers call `self.onCompleted()`, so
 * downstream may be completed twice - confirm whether that is intended.
 */
Observer!(Tuple!(ElementType!ObsA,ElementType!ObsB)) zip(ObsA, ObsB)(ObsA a, ObsB b)
    if (isObservable!ObsA && isObservable!ObsB)
{
    auto result = new Observer!(Tuple!(ElementType!ObsA,ElementType!ObsB))
    ((self){
        import vibe.core.sync;
        auto event = createManualEvent();
        int emitA = event.emitCount();
        int emitB = emitA;
        ElementType!ObsA objA;
        bool complete = false;
        a.subscribe((ae)
            {
                objA = ae;
                event.emit();
                emitA = event.wait(emitA + 1);
            },
            (e) { self.onError(e); }, // TODO: unsubscribe from underlying observable
            () { complete = true; event.emit(); self.onCompleted(); }
        );
        b.subscribe((be)
            {
                emitB = event.wait(emitB) + 1;
                if (!complete)
                    self.onNext(Tuple!(ElementType!ObsA,ElementType!ObsB)(objA,be));
                event.emit();
            },
            (e) { self.onError(e); }, // TODO: unsubscribe from underlying observable
            () { event.emit(); self.onCompleted(); }
        );
    });

    return result;
}

unittest
{
    auto s1 = new FiberSubject!int;
    auto s2 = new FiberSubject!string;
    auto p = s1.zip(s2);
    p.subscribe((t) { assert(t[0] == 1); assert(t[1] == "abc"); },null,(){exitEventLoop();});
    s1.onNext(1);
    s2.onNext("abc");
    s2.onNext("abc");
    s1.onNext(1);
    s1.onCompleted();
    s2.onCompleted();
    runEventLoop();
}

unittest
{
    auto range = iota(0, 10);
    auto s1 = new FiberSubject!int;
    auto s = range.observe.zip(s1);
    s.subscribe((t) { assert(t == tuple(0,5)); },null,(){exitEventLoop();});
    s1.onNext(5);
    s1.onCompleted();
    runEventLoop();
}

/**
 * Observable counterpart of `std.algorithm.filter`: returns an `Observer`
 * that re-emits only those elements for which `fun` yields `true`; errors
 * and completion pass through.
 */
template filter(alias fun)
{
    alias func = unaryFun!fun;
    auto filter(Obs)(Obs observable)
        if (is(typeof(func(ElementType!(Obs).init)) : bool) && isObservable!Obs)
    {
        return new Observer!(ElementType!Obs)
        ((self){
            observable.subscribe
            (
                (i) { if (func(i)) self.onNext(i); },
                &self.onError,
                &self.onCompleted
            );
        });
    }
}

unittest
{
    // somehow this has tasks still running at exit
    auto range = iota(1, 10);
    range
        .observe
        .map!((i){return i*i;})
        .filter!(i=>i==81)
        .subscribe((i) { assert(i == 81); },null,(){exitEventLoop();});
    runEventLoop();
}

unittest
{
    auto range = iota(1, 3);
    auto s = range.observe.filter!(i=>i % 2 == 0);
    s.subscribe((t) { assert(t == 2); },null,(){exitEventLoop();});
    runEventLoop();
}

unittest
{
    auto event = createManualEvent();
    assert(event.wait(40) == 0);
    event.emit();
    assert(event.wait(40) == 1);
}

unittest
{
    auto event = createManualEvent();
    event.emit();
    assert(event.wait(0) == 1);
}

/**
 * `Observer` that hands every element to the subscriptions from a freshly
 * started task instead of calling them inline.
 *
 * NOTE(review): `legs` doubles as a counter. Each `onNext` subtracts 1 and
 * each finished task adds 2 and emits once, so `legs` appears to equal
 * `size + finished - inFlight`. With that reading `onNext` blocks while
 * `size` tasks are in flight and `onCompleted` waits until none are.
 * Confirm before changing the arithmetic.
 */
class Fork(E) : Observer!E
{
    private
    {
        shared int legs;
        int size;
        ReturnType!createManualEvent event;
    }

    this(int legs, Subscriber s)
    {
        super(s);
        this.legs = legs;
        size = legs;
        event = createManualEvent();
    }

    /// Waits on the event until the loop condition clears, then completes.
    override void onCompleted()
    {
        int cnt = event.emitCount();
        while (cnt+size > legs)
            cnt = event.wait(cnt);
        super.onCompleted();
    }

    /// Dispatches `e` to all active subscriptions from a new task.
    override void onNext(E e)
    {
        event.wait(legs);
        legs.atomicOp!"-="(1);
        runTask(
        {
            foreach(s; subscriptions)
                if (s.onNext && s.active)
                    s.onNext(e);
            legs.atomicOp!"+="(2);
            event.emit();
        });
    }
}

/**
 * Wraps `o` in a `Fork` with the given number of legs.
 *
 * Throws: `Exception` (via `enforce`) when `legs` is not positive.
 */
Fork!(ElementType!Obs) fork(Obs)(Obs o, int legs)
    if (isObservable!Obs)
{
    import std.exception;
    enforce(legs > 0, "can't fork without legs");
    return new Fork!(ElementType!Obs)(legs,
        (self){
            o.subscribe(
                &self.onNext,
                &self.onError,
                &self.onCompleted
            );
        });
}


unittest
{
    int cnt = 0;
    iota(0,10).observe.fork(10).subscribe((e){ cnt += e; },null,() {exitEventLoop(); assert(cnt == 45); });
    runEventLoop();
}

/**
 * Merges two observables into one `Observer`: elements of both are
 * forwarded as they arrive, and completion is signalled once both sources
 * have completed.
 *
 * The constraint used to test `isObservable!ObsA` twice and never checked
 * `ObsB`; it now checks both operands.
 */
Observer!(ElementType!ObsA) chain(ObsA, ObsB)(ObsA a, ObsB b)
    if (isObservable!ObsA && isObservable!ObsB && is (ElementType!ObsA : ElementType!ObsB))
{
    return new Observer!(ElementType!ObsA)
        ((self)
        {
            // the emit count of this event counts how many sources finished
            auto event = createManualEvent();
            a.subscribe(
                &self.onNext,
                &self.onError, // TODO: instead we need to unsubscribe from b, then propagate error
                () { event.emit(); if (event.emitCount() == 2) self.onCompleted(); }
            );
            b.subscribe(
                &self.onNext,
                &self.onError, // TODO: instead we need to unsubscribe from a, then propagate error
                () { event.emit(); if (event.emitCount() == 2) self.onCompleted(); }
            );
        });
}

unittest
{
    auto s = new FiberSubject!int;
    int cnt = 0;
    iota(0,10).observe.chain(s).subscribe((e){ cnt += e; },null,() {exitEventLoop(); assert(cnt == 76); });
    s.onNext(15);
    s.onNext(16);
    s.onCompleted();
    runEventLoop();
}

/**
 * Observable that maps every upstream element of type `T` to an inner
 * observable via `func` and forwards the inner elements to its own
 * subscribers.
 *
 * `cnt` starts at 1 (the upstream itself) and is incremented for every
 * inner observable; each completion decrements it, and the subscribers'
 * `onCompleted` fires when it reaches zero.
 */
class ConcatMap(T, alias func)
{
    alias AppliedReturnType = typeof(func(T.init));
    alias ElementType!(AppliedReturnType) E;
    alias void delegate (E) NextFunc;
    alias void delegate(ConcatMap!(T, func)) SubscribeFunc;
    alias Subscription!(ConcatMap!(T, func)) Sub;
    private
    {
        bool subscribed = false;
        SubscribeFunc subscriber;
        Sub[] subscriptions;
        shared int cnt;
    }

    this(SubscribeFunc sFunc)
    {
        this.subscriber = sFunc;
        this.cnt = 1;
    }

    /// Called once per finished source; completes downstream on the last one.
    private void subCompleted()
    {
        if (cnt.atomicOp!"-="(1) != 0)
            return;
        foreach (s; subscriptions)
            if (s.onCompleted)
                s.onCompleted();
        subscriptions.destroy();
    }

    /// Creates the inner observable for `e` and wires all subscribers to it.
    void onNext(T e)
    {
        cnt.atomicOp!"+="(1);
        auto obs = func(e);
        // completion-only subscription so that this inner observable is counted
        obs.subscribe(null,null,&this.subCompleted);
        // Inner completion is deliberately not forwarded here: downstream
        // must complete only after every inner observable AND the upstream
        // have completed, which subCompleted tracks.
        foreach (s; subscriptions)
            obs.subscribe(s.onNext,s.onError);
    }

    /// Forwards an upstream error to every subscriber with an `onError`.
    void onError(Exception e)
    {
        foreach (s; subscriptions)
            if (s.onError)
                s.onError(e);
    }

    /// Upstream finished: release the initial count taken in the constructor.
    void onCompleted()
    {
        subCompleted();
    }

    /// Registers the callbacks; the first call also connects to upstream.
    Sub subscribe(NextFunc onNext = null, ErrorFunc onError = null, CompletedFunc onCompleted = null)
    {
        auto sub = Sub(this,onNext,onError,onCompleted);
        subscriptions ~= sub;
        if (!subscribed)
        {
            subscribed = true;
            subscriber(this);
        }
        return sub;
    }

    /// Not implemented yet: disposing currently has no effect.
    void dispose(Sub s)
    {
        // todo: make work
    }
}

/**
 * Maps every element of `o` to an observable (or to an input range, which
 * is wrapped with `observe`) and flattens the results into one observable.
 */
template concatMap(alias fun)
{
    alias func = unaryFun!fun;

    auto concatMap(Obs)(Obs o)
        if (isObservable!Obs && isObservable!(typeof(func(ElementType!(Obs).init))))
    {
        return new ConcatMap!(ElementType!Obs,func)(
            (self)
            {
                o.subscribe(
                    &self.onNext,
                    &self.onError,
                    &self.onCompleted);
            });
    }
    auto concatMap(Obs)(Obs o)
        if (isObservable!Obs && isInputRange!(typeof(func(ElementType!(Obs).init))))
    {
        return o.concatMap!(a => func(a).observe())();
    }
}

unittest
{
    auto s = iota(0,5).observe;
    size_t c;
    s.concatMap!(s => iota(0,5).observe() ).subscribe((p){c += p;},null,(){ exitEventLoop(); assert(c == 50); });
    runEventLoop();
}

unittest
{
    auto s = iota(0,5).observe;
    size_t c;
    s.concatMap!(s => iota(0,5) ).subscribe((p){c += p;},null,(){ exitEventLoop(); assert(c == 50); });
    runEventLoop();
}
//class ObservableTimer
//{
//    alias void delegate (int) NextFunc;
//    private
//    {
//        uint delay, period;
//    }
//    this(uint d, uint p)
//    {
//        this.delay = d;
//        this.period = p;
//    }
//    void subscribe(NextFunc onNext = null, ErrorFunc onError = null, CompletedFunc onCompleted = null)
//    {
//        if (onNext is null)
//            return;
//        runTask(
//        {
//            sleep(this.delay);
//            ulong cnt;
//            while (true)
//            {
//                onNext(cnt);
//                cnt++;
//                sleep(period);
//            }
//        });
//    }
//}
//auto timer(int delay, int period)
//{
//    return new ObservableTimer(delay,period);
//}

/**
 * Side-effect operator: runs `fun` on every element of `o` and then passes
 * the element on unchanged. Errors and completion are forwarded as-is.
 * Only accepted when `fun` returns `void`.
 */
template tap(alias fun)
{
    alias func = unaryFun!fun;

    auto tap(Obs)(Obs o)
        if (isObservable!Obs && is(typeof(func(ElementType!(Obs).init)) : void))
    {
        alias Elem = ElementType!Obs;
        alias Target = Observer!Elem;

        // Invoked by the Observer on its first subscribe call; connects the
        // new observer to the upstream observable.
        Target.Subscriber connect = (downstream)
        {
            o.subscribe(
                (item)
                {
                    func(item);
                    downstream.onNext(item);
                },
                &downstream.onError,
                &downstream.onCompleted);
        };

        return new Target(connect);
    }
    //auto tap(Obs)(Obs o)
    //    if (isObservable!Obs && isObservable!(typeof(func(ElementType!(Obs).init))))
    //{
    //    return new Observer!(ElementType!Obs)(
    //        (self)
    //        {
    //            // the o observable might trigger onCompleted before the observable returned by the func completes
    //            // in which case one or more onNext will be called AFTER onCompleted
    //            o.subscribe(
    //                (e){ func(e).subscribe(null,&self.onError,(){self.onNext(e);}); },
    //                &self.onError,
    //                &self.onCompleted);
    //        });
    //}
}
/*unittest
{
    auto s = iota(0,5).observe;
    size_t c;
    s.tap!((i) { c += i; }).subscribe((i){ c+= i; },null,(){ exitEventLoop(); assert(c == 20); });
    runEventLoop();
}*/
//class Catch(Obs) : Subject!(ElementType!Obs)
//{
//    private
//    {
//        bool subscribed = false;
//        Obs o;
//    }
//    this(Obs o)
//    {
//        this.o = o;
//    }
//    override void onError(Exception e)
//    {
//        onCompleted();
//    }
//    override void subscribe(NextFunc onNext = null, ErrorFunc onError = null, CompletedFunc onCompleted = null)
//    {
//        super.subscribe(onNext,onError,onCompleted);
//        if (!subscribed)
//        {
//            o.subscribe(&this.onNext,&this.onError,&this.onCompleted);
//            subscribed = true;
//        }
//    }
//}
//Catch!(Obs) catchException(Obs)(Obs o)
//    if (isObservable!Obs)
//{
//    return new Catch!(Obs)(o);
//}