1 module observe;
2 
3 import vibe.d;
4 import vibe.core.core;
5 import vibe.core.concurrency;
6 import vibe.core.drivers.native;	// had to add this to get rdmd working
7 import vibe.core.drivers.win32;		// had to add this to get rdmd working
8 import std.traits;
9 import std.functional;
10 import std.algorithm;
11 import std.range;
12 import std.stdio;
13 import core.atomic;
14 
15 template ElementType(R)
16 {
17 	static if (isInputRange!R)
18 	{
19 		alias std.range.ElementType!R ElementType;
20 	} else
21 		alias ParameterTypeTuple!(R.NextFunc)[0] ElementType;
22 }
23 alias ElementType PayloadType;
24 template isObservable(O)
25 {
26     enum bool isObservable = is(typeof(
27     (inout int = 0)
28     {
29         O o = void;       // can define a observable object
30         alias ElementType!O E;
31         o.subscribe((scope E e){},(Exception e){},(){});	// has subscribe function
32     }));
33 }
34 alias void delegate (Exception) ErrorFunc;
35 alias void delegate () CompletedFunc;
36 
37 struct Subscription(Observable)
38 {
39 	alias void delegate (ElementType!Observable ) NextFunc;
40 	Observable o;
41 	NextFunc onNext;
42 	ErrorFunc onError;
43 	CompletedFunc onCompleted;
44 	bool active = true;
45 	void dispose()
46 	{
47 		o.dispose(this);
48 	}
49 }
50 class ObservableRange(R)
51 {
52 	alias void delegate (ElementType!R) NextFunc;
53 	alias Subscription!(ObservableRange!R) Sub;
54 	private
55 	{
56 		Sub[] subscriptions;
57 		R range;
58 	}
59 	this(R r)
60 	{
61 		this.range = r;
62 	}
63 	Sub subscribe(NextFunc onNext = null, ErrorFunc onError = null, CompletedFunc onCompleted = null)
64 	{
65 		auto sub = Sub(this,onNext,onError,onCompleted);
66 		subscriptions ~= sub;
67 		if (onNext is null)	// why is this again?
68 			return sub;
69 		runTask(
70 		{
71 			foreach(e; this.range)
72 			{
73 				foreach(s; subscriptions)
74 				{
75 					if (s.onNext && s.active)
76 						s.onNext(e);
77 				}
78 				// todo: we can probably do this a little bit more efficient
79 				// remove inactive subscriptions
80 				subscriptions = subscriptions.remove!(s => !s.active).array();
81 			}
82 			foreach(s; subscriptions)
83 				if (s.onCompleted && s.active)
84 					s.onCompleted();
85 			subscriptions.length = 0;
86 		});
87 		return sub;
88 	}
89 	void dispose(Sub s)
90 	{
91 		// simply mark the subscription in the list as inactive
92 		auto idx = subscriptions.countUntil(s);
93 		if (idx > 0)
94 			subscriptions[idx].active = false;
95 	}
96 }
97 ObservableRange!R observe(R)(R r)
98 	if (isInputRange!R || isIterable!R)
99 {
100 	return new ObservableRange!R(r);
101 }
102 alias observe observeRange;
103 unittest
104 {
105 	assert(isObservable!(typeof(iota(1, 10).observe)));
106 }
107 class Subject(E)
108 {
109 	alias E ElementType;
110 	alias void delegate (ElementType) NextFunc;
111 	alias Subscription!(Subject!E) Sub;
112 	private
113 	{
114 		Sub[] subscriptions;
115 	}
116 	void onNext(ElementType e)
117 	{
118 		foreach(s; subscriptions)
119 			if (s.onNext && s.active)
120 				s.onNext(e);
121 	}
122 	void onError(Exception e)
123 	{
124 		foreach(s; subscriptions)
125 			if (s.onError && s.active)
126 				s.onError(e);
127 	}
128 	void onCompleted()
129 	{
130 		foreach(s; subscriptions)
131 			if (s.onCompleted && s.active)
132 				s.onCompleted();
133 	}
134 	Sub subscribe(NextFunc onNext = null, ErrorFunc onError = null, CompletedFunc onCompleted = null)
135 	{
136 		auto sub = Sub(this,onNext,onError,onCompleted);
137 		subscriptions ~= sub;
138 		return sub;
139 	}
140 	void dispose(Sub s)
141 	{
142 		subscriptions = subscriptions.remove!(s => !s.active).array();
143 		s.onCompleted();
144 	}
145 }
146 unittest
147 {
148 	assert(is(ElementType!(Subject!int) : int));
149 	assert(isObservable!(Subject!int));
150 }
151 unittest
152 {
153 	auto s = new Subject!int;
154 	s.subscribe((i) => assert(i == 1));
155 	s.onNext(1);
156 	assert(is(ElementType!(Subject!int) : int));
157 }
158 class Observer(E) : Subject!E
159 {
160 	alias void delegate(Observer!(E)) Subscriber;
161 	private
162 	{
163 		bool subscribed = false;
164 		Subscriber subscriber;
165 	}
166 	this(Subscriber subscriber)
167 	{
168 		this.subscriber = subscriber;
169 	}
170 	override Sub subscribe(NextFunc onNext = null, ErrorFunc onError = null, CompletedFunc onCompleted = null)
171 	{
172 		auto sub = super.subscribe(onNext,onError,onCompleted);
173 		if (!subscribed)
174 		{
175 			subscribed = true;
176 			subscriber(this);
177 		}
178 		return sub;
179 	}
180 }
181 // here we test some basic UFCS and unaryFun behaviour that we rely on
182 template foo(alias fun)
183 {
184 	auto foo(T)(T n)
185 	{
186 		alias func = unaryFun!fun;
187 		return func(n);
188 	}
189 }
190 unittest
191 {
192 	assert(foo!(a => a*a)(4) == 16);
193 	assert(4.foo!(a => a*a) == 16);
194 	assert(foo!"a*a"(4) == 16);
195 	assert(4.foo!"a*a" == 16);
196 	auto square = (int n) { return n*n; };
197 	assert(foo!square(4) == 16);
198 	assert(4.foo!square == 16);
199 }
200 template map(alias fun)
201 {
202 	auto map(Obs)(Obs observable)
203 		if (isObservable!Obs)
204 	{
205 		alias func = unaryFun!fun;
206 		alias AppliedReturnType = typeof(func(ElementType!(Obs).init));
207 
208 		return new Observer!(AppliedReturnType)
209 			((self){
210 				observable.subscribe(
211 					(e){ self.onNext(func(e)); },
212 					&self.onError,
213 					&self.onCompleted
214 				);
215 			});
216 	}
217 }
218 //alias map oMap;
219 unittest
220 {
221 	auto s = new Subject!int;
222 	auto p = s.map!(s => "abs");
223 	p.subscribe((s){ assert(s == "abs");});
224 	s.onNext(1);
225 }
226 unittest
227 {
228 	bool completed = false;
229 	auto s = new Subject!int;
230 	auto q = s.map!(s => s*s).map!(s => s*2);
231 	q.subscribe((s) { assert(s == 18);},null,(){completed = true;});
232 	s.onNext(3);
233 	s.onCompleted();
234 	assert(completed == true);
235 }
236 unittest
237 {
238 	bool completed = false;
239 	auto s = new Subject!int;
240 	auto sub = s.subscribe((v) { assert(v == 18);},null,(){completed = true;});
241 	s.onNext(18);
242 	sub.dispose();
243 	assert(completed == true);
244 }
245 
246 class FiberSubject(E) : Subject!E
247 {
248 	alias void delegate (E) NextFunc;
249 	private
250 	{
251 		Task t;
252 		bool started = false;
253 	}
254 	void join()
255 	{
256 		forceTask();
257 		t.join();
258 	}
259 	private void forceTask()
260 	{
261 		if (started)
262 			return;
263 		t = runTask(
264 		{
265 			bool running = true;
266 			while (running) {
267 				receive(
268 					(E e)
269 					{
270 						foreach(s; subscriptions)
271 							if (s.onNext && s.active)
272 								s.onNext(e);
273 					},
274 					(Exception e)
275 					{
276 						foreach(s; subscriptions)
277 							if (s.onError && s.active)
278 								s.onError(e);
279 						running = false;
280 					},
281 					(bool)
282 					{
283 						foreach(s; subscriptions)
284 							if (s.onCompleted && s.active)
285 								s.onCompleted();
286 						running = false;
287 					}
288 				);
289 			}
290 		});
291 		started = true;
292 	}
293 	override void onNext(E e)
294 	{
295 		forceTask();
296 		t.send(e);
297 	}
298 	override void onError(Exception e)
299 	{
300 		forceTask();
301 		t.send(e);
302 	}
303 	override void onCompleted()
304 	{
305 		forceTask();
306 		t.send(true);
307 	}
308 }
309 
310 unittest
311 {
312 	auto s = new FiberSubject!int;
313 	s.subscribe((s) { assert(s == 14);},null,(){writeln("completed");exitEventLoop();});
314 	s.onNext(14);
315 	s.onCompleted();
316 	runEventLoop();
317 }
318 unittest
319 {
320 	auto s = new FiberSubject!int;
321 	auto q = s.map!(s=>s*s);
322 	q.subscribe((s) { assert(s == 28*28);},null,(){exitEventLoop();});
323 	s.onNext(28);
324 	s.onCompleted();
325 	runEventLoop();
326 }
327 
328 Observer!(Tuple!(ElementType!ObsA,ElementType!ObsB)) zip(ObsA, ObsB)(ObsA a, ObsB b)
329 	if (isObservable!ObsA && isObservable!ObsB)
330 {
331 	auto result = new Observer!(Tuple!(ElementType!ObsA,ElementType!ObsB))
332 		((self){
333 			import vibe.core.sync;
334 			auto event = createManualEvent();
335 			int emitA = event.emitCount();
336 			int emitB = emitA;
337 			ElementType!ObsA objA;
338 			bool complete = false;
339 			a.subscribe((ae) 
340 				{
341 					objA = ae;
342 					event.emit();
343 					emitA = event.wait(emitA + 1);
344 				},
345 				(e) { self.onError(e); },	// TODO: unsubscribe from underlying observable
346 				() { complete = true; event.emit(); self.onCompleted(); }
347 			);
348 			b.subscribe((be) 
349 				{
350 					emitB = event.wait(emitB) + 1;
351 					if (!complete)
352 						self.onNext(Tuple!(ElementType!ObsA,ElementType!ObsB)(objA,be));
353 					event.emit();
354 				},
355 				(e) { self.onError(e); }, // TODO: unsubscribe from underlying observable
356 				() { event.emit(); self.onCompleted(); }
357 			);
358 		});
359 
360 	return result;
361 }
362 
363 unittest
364 {
365 	auto s1 = new FiberSubject!int;
366 	auto s2 = new FiberSubject!string;
367 	auto p = s1.zip(s2);
368 	p.subscribe((t) { assert(t[0] == 1); assert(t[1] == "abc"); },null,(){exitEventLoop();});
369 	s1.onNext(1);
370 	s2.onNext("abc");
371 	s2.onNext("abc");
372 	s1.onNext(1);
373 	s1.onCompleted();
374 	s2.onCompleted();
375 	runEventLoop();
376 }
377 
378 unittest
379 {
380 	auto range = iota(0, 10);
381 	auto s1 = new FiberSubject!int;
382 	auto s = range.observe.zip(s1);
383 	s.subscribe((t) { assert(t == tuple(0,5)); },null,(){exitEventLoop();});
384 	s1.onNext(5);
385 	s1.onCompleted();
386 	runEventLoop();
387 }
388 
389 template filter(alias fun)
390 {
391 	alias func = unaryFun!fun;
392 	auto filter(Obs)(Obs observable)
393 		if (is(typeof(func(ElementType!(Obs).init)) : bool) && isObservable!Obs)
394 	{
395 		return new Observer!(ElementType!Obs)
396 			((self){
397 				observable.subscribe
398 				(
399 					(i) { if (func(i)) self.onNext(i); },
400 					&self.onError,
401 					&self.onCompleted
402 				);
403 			});
404 	}
405 }
406 unittest
407 {
408 	// somehow this has tasks still running at exit
409 	auto range = iota(1, 10);
410 	range
411 		.observe
412 		.map!((i){return i*i;})
413 		.filter!(i=>i==81)
414 		.subscribe((i) { assert(i == 81); },null,(){exitEventLoop();});
415 	runEventLoop();
416 }
417 
418 unittest
419 {
420 	auto range = iota(1, 3);
421 	auto s = range.observe.filter!(i=>i % 2 == 0);
422 	s.subscribe((t) { assert(t == 2); },null,(){exitEventLoop();});
423 	runEventLoop();
424 }
425 
426 unittest
427 {
428 	auto event = createManualEvent();
429 	assert(event.wait(40) == 0);
430 	event.emit();
431 	assert(event.wait(40) == 1);
432 }
433 unittest
434 {
435 	auto event = createManualEvent();
436 	event.emit();
437 	assert(event.wait(0) == 1);
438 }
439 class Fork(E) : Observer!E
440 {
441 	private
442 	{
443 		shared int legs;
444 		int size;
445 		ReturnType!createManualEvent event;
446 	}
447 	this(int legs, Subscriber s)
448 	{
449 		super(s);
450 		this.legs = legs;
451 		size = legs;
452 		event = createManualEvent();
453 	}
454 	override void onCompleted()
455 	{
456 		int cnt = event.emitCount();
457 		while (cnt+size > legs)
458 			cnt = event.wait(cnt);
459 		super.onCompleted();
460 	}
461 	override void onNext(E e)
462 	{
463 		event.wait(legs);
464 		legs.atomicOp!"-="(1);
465 		runTask(
466 			{
467 				foreach(s; subscriptions)
468 					if (s.onNext && s.active)
469 						s.onNext(e);
470 				legs.atomicOp!"+="(2);
471 				event.emit();
472 			});
473 	}
474 }
475 
476 Fork!(ElementType!Obs) fork(Obs)(Obs o, int legs)
477 	if (isObservable!Obs)
478 {
479 	import std.exception;
480 	enforce(legs > 0, "can't fork without legs");
481 	return new Fork!(ElementType!Obs)(legs,
482 		(self){
483 			o.subscribe(
484 				&self.onNext,
485 				&self.onError,
486 				&self.onCompleted
487 			);
488 		});
489 }
490 
491 
492 unittest
493 {
494 	int cnt = 0;
495 	iota(0,10).observe.fork(10).subscribe((e){ cnt += e; },null,() {exitEventLoop(); assert(cnt == 45); });
496 	runEventLoop();
497 }
498 
499 Observer!(ElementType!ObsA) chain(ObsA, ObsB)(ObsA a, ObsB b)
500 	if (isObservable!ObsA && isObservable!ObsA && is (ElementType!ObsA : ElementType!ObsB))
501 {
502 	return new Observer!(ElementType!ObsA)
503 		((self)
504 		{
505 			auto event = createManualEvent();
506 			a.subscribe(
507 				&self.onNext,
508 				&self.onError,	// TODO: instead we need to unsubscribe from b, then propagate error
509 				() { event.emit(); if (event.emitCount() == 2) self.onCompleted(); }
510 			);
511 			b.subscribe(
512 				&self.onNext,
513 				&self.onError,	// TODO: instead we need to unsubscribe from a, then propagate error
514 				() { event.emit(); if (event.emitCount() == 2) self.onCompleted(); }
515 			);
516 		});
517 }
518 
519 unittest
520 {
521 	auto s = new FiberSubject!int;
522 	int cnt = 0;
523 	iota(0,10).observe.chain(s).subscribe((e){ cnt += e; },null,() {exitEventLoop(); assert(cnt == 76); });
524 	s.onNext(15);
525 	s.onNext(16);
526 	s.onCompleted();
527 	runEventLoop();
528 }
529 class ConcatMap(T, alias func)
530 {
531 	alias AppliedReturnType = typeof(func(T.init));
532 	alias ElementType!(AppliedReturnType) E;
533 	alias void delegate (E) NextFunc;
534 	alias void delegate(ConcatMap!(T, func)) SubscribeFunc;
535 	alias Subscription!(ConcatMap!(T, func)) Sub;
536 	private
537 	{
538 		bool subscribed = false;
539 		SubscribeFunc subscriber;
540 		Sub[] subscriptions;
541 		shared int cnt;
542 	}
543 	this(SubscribeFunc sFunc)
544 	{
545 		this.subscriber = sFunc;
546 		this.cnt = 1;
547 	}
548 	private void subCompleted()
549 	{
550 		if (cnt.atomicOp!"-="(1) != 0)
551 			return;
552 		foreach (s; subscriptions)
553 			if (s.onCompleted)
554 				s.onCompleted();
555 		subscriptions.destroy();
556 	}
557 	void onNext(T e)
558 	{
559 		cnt.atomicOp!"+="(1);
560 		auto obs = func(e);
561 		obs.subscribe(null,null,&this.subCompleted);
562 		foreach (s; subscriptions)
563 			obs.subscribe(s.onNext,s.onError); // here we need to call the class' onCompleted which should only trigger after all obs.onCompleted are triggered and a call has been made to onCompleted from the underlying subject.
564 	}
565 	void onError(Exception e)
566 	{
567 		foreach (s; subscriptions)
568 			if (s.onError)
569 				s.onError(e);
570 	}
571 	void onCompleted()
572 	{
573 		subCompleted();
574 	}
575 	Sub subscribe(NextFunc onNext = null, ErrorFunc onError = null, CompletedFunc onCompleted = null)
576 	{
577 		auto sub = Sub(this,onNext,onError,onCompleted);
578 		subscriptions ~= sub;
579 		if (!subscribed)
580 		{
581 			subscribed = true;
582 			subscriber(this);
583 		}
584 		return sub;
585 	}
586 	void dispose(Sub s)
587 	{
588 		// todo: make work
589 	}
590 }
591 template concatMap(alias fun)
592 {
593 	alias func = unaryFun!fun;
594 
595 	auto concatMap(Obs)(Obs o)
596 		if (isObservable!Obs && isObservable!(typeof(func(ElementType!(Obs).init))))
597 	{
598 		return new ConcatMap!(ElementType!Obs,func)(
599 			(self)
600 			{
601 				o.subscribe(
602 					&self.onNext,
603 					&self.onError,
604 					&self.onCompleted);
605 			});
606 	}
607 	auto concatMap(Obs)(Obs o)
608 		if (isObservable!Obs && isInputRange!(typeof(func(ElementType!(Obs).init))))
609 	{
610 		return o.concatMap!(a => func(a).observe())();
611 	}
612 }
613 unittest
614 {
615 	auto s = iota(0,5).observe;
616 	size_t c;
617 	s.concatMap!(s => iota(0,5).observe() ).subscribe((p){c += p;},null,(){ exitEventLoop(); assert(c == 50); });
618 	runEventLoop();
619 }
620 unittest
621 {
622 	auto s = iota(0,5).observe;
623 	size_t c;
624 	s.concatMap!(s => iota(0,5) ).subscribe((p){c += p;},null,(){ exitEventLoop(); assert(c == 50); });
625 	runEventLoop();
626 }
627 //class ObservableTimer
628 //{
629 //	alias void delegate (int) NextFunc;
630 //	private
631 //	{
632 //		uint delay, period;
633 //	}
634 //	this(uint d, uint p)
635 //	{
636 //		this.delay = d;
637 //		this.period = p;
638 //	}
639 //	void subscribe(NextFunc onNext = null, ErrorFunc onError = null, CompletedFunc onCompleted = null)
640 //	{
641 //		if (onNext is null)
642 //			return;
643 //		runTask(
644 //		{
645 //			sleep(this.delay);
646 //			ulong cnt;
647 //			while (true)
648 //			{
649 //				onNext(cnt);
650 //				cnt++;
651 //				sleep(period);
652 //			}
653 //		});
654 //	}	
655 //}
656 //auto timer(int delay, int period)
657 //{
658 //	return new ObservableTimer(delay,period);
659 //}
660 template tap(alias fun)
661 {
662 	alias func = unaryFun!fun;
663 
664 	auto tap(Obs)(Obs o)
665 		if (isObservable!Obs && is(typeof(func(ElementType!(Obs).init)) : void))
666 	{
667 		return new Observer!(ElementType!Obs)(
668 			(self)
669 			{
670 				o.subscribe(
671 					(e){ func(e); self.onNext(e); },
672 					&self.onError,
673 					&self.onCompleted);
674 			});
675 	}
676 	//auto tap(Obs)(Obs o)
677 	//	if (isObservable!Obs && isObservable!(typeof(func(ElementType!(Obs).init))))
678 	//{
679 	//	return new Observer!(ElementType!Obs)(
680 	//		(self)
681 	//		{
682 	//			// the o observable might trigger onCompleted before the observable returned by the func completes
683 	//			// in which case one or more onNext will be called AFTER onCompleted
684 	//			o.subscribe(
685 	//				(e){ func(e).subscribe(null,$self.onError,(){self.onNext(e);}); },
686 	//				&self.onError,
687 	//				&self.onCompleted);
688 	//		});
689 	//}
690 }
691 /*unittest
692 {
693 	auto s = iota(0,5).observe;
694 	size_t c;
695 	s.tap!((i) { c += i; }).subscribe((i){ c+= i; },null,(){ exitEventLoop(); assert(c == 20); });
696 	runEventLoop();
697 }*/
698 //class Catch(Obs) : Subject!(ElementType!Obs)
699 //{
700 //	private
701 //	{
702 //		bool subscribed = false;
703 //		Obs o;
704 //	}
705 //	this(Obs o)
706 //	{
707 //		this.o = o;
708 //	}
709 //	override void onError(Exception e)
710 //	{
711 //		onCompleted();
712 //	}
713 //	override void subscribe(NextFunc onNext = null, ErrorFunc onError = null, CompletedFunc onCompleted = null)
714 //	{
715 //		super.subscribe(onNext,onError,onCompleted);
716 //		if (!subscribed)
717 //		{
718 //			o.subscribe(&this.onNext,&this.onError,&this.onCompleted);
719 //			subscribed = true;
720 //		}
721 //	}
722 //}
723 //Catch!(Obs) catchException(Obs)(Obs o)
724 //	if (isObservable!Obs)
725 //{
726 //	return new Catch!(Obs)(o);
727 //}