1 ///
2 module redisd.client;
3 
4 import std.typecons;
5 import std.stdio;
6 import std.algorithm;
7 import std.range;
8 import std..string;
9 
10 import std.meta: allSatisfy;
11 import std.traits: isSomeString;
12 
13 import std.experimental.logger;
14 
15 import redisd.connection;
16 import redisd.codec;
17 
18 import url: URL, parseURL;
19 
/// Size (4 KiB) passed to every `_connection.recv` call made by `Client` in this module.
private immutable bufferSize = 4*1024;
21 
22 ///
23 class NotAuthenticated : Exception {
24     ///
25     this(string msg) {
26         super(msg);
27     }
28 }
/// Client API: owns one connection to a redis server and offers helpers to
/// send single commands, pipelines and transactions and to decode replies.
class Client {

    private {
        URL             _url;
        ConnectionMaker _connection_maker;
        Connection      _connection;
        Decoder         _input_stream;
    }

    /// Constructor.
    ///
    /// Parses `url`, opens a connection produced by `connectionMaker` and,
    /// when the URL carries a password, authenticates with `AUTH`.
    ///
    /// Throws: NotAuthenticated when the reply to `AUTH` is not `OK`.
    this(string url="redis://localhost:6379", ConnectionMaker connectionMaker=&stdConnectionMaker) {
        _url = parseURL(url);
        _connection_maker = connectionMaker;
        // Initial connect and reconnect are the same sequence of steps.
        reconnect();
    }

    /// Open a fresh connection with a fresh decoder and re-authenticate
    /// when the URL carries a password.
    ///
    /// Throws: NotAuthenticated when the reply to `AUTH` is not `OK`.
    private void reconnect() {
        _connection = _connection_maker();
        _connection.connect(_url);
        _input_stream = new Decoder;
        if (_url.pass) {
            auto auth = execCommand("AUTH", _url.pass);
            if (auth.svar != "OK") {
                throw new NotAuthenticated("Can't authenticate");
            }
        }
    }

    /// True when `v` is an error reply whose text carries "Protocol error"
    /// right after a four character prefix (as in "ERR Protocol error...").
    /// Error texts shorter than that are simply not protocol errors.
    private static bool isProtocolError(RedisdValue v) {
        if (v.type != ValueType.Error) {
            return false;
        }
        auto text = v.svar;
        // Length check first: slicing a shorter text would be a range violation.
        return text.length >= 18 && text[4 .. 18] == "Protocol error";
    }

    /// True when `v` is an error reply whose text starts with "NOAUTH".
    private static bool isAuthRequired(RedisdValue v) {
        if (v.type != ValueType.Error) {
            return false;
        }
        auto text = v.svar;
        return text.length >= 6 && text[0 .. 6] == "NOAUTH";
    }

    /// Drop the current connection and open a new one after a protocol error.
    private void recoverFromProtocolError() {
        _connection.close();
        debug(redisd) trace("reopen connection");
        reconnect();
    }

    /// Build redis command from command name and args.
    /// All args must be of type string (checked at compile time).
    RedisdValue makeCommand(A...)(A args) {
        static assert(allSatisfy!(isSomeString, A), "all command parameters must be of type string");
        return redisdValue(tuple(args));
    }

    /// Build and execute redis transaction from command array.
    ///
    /// Sends `MULTI`, then every command in order, then `EXEC`.
    /// Returns: the reply to `EXEC`; replies to `MULTI` and to the
    /// individual commands are discarded.
    RedisdValue transaction(RedisdValue[] commands) {
        this.execCommand("MULTI");
        foreach (c; commands) {
            exec(c);
        }
        return this.execCommand("EXEC");
    }

    /// Build and execute redis pipeline from commands array.
    ///
    /// All commands are encoded and sent in one `send`, then replies are
    /// collected until there are at least as many as commands.
    ///
    /// Returns: the replies decoded so far. The array can be shorter than
    /// `commands` when `recv` returns no data or when a protocol error
    /// reply forced a reconnect (that error is the last element).
    RedisdValue[] pipeline(RedisdValue[] commands) {
        immutable(ubyte)[] data = commands.map!encode.join();
        _connection.send(data);
        RedisdValue[] response;
        while (response.length < commands.length) {
            debug(redisd) tracef("response length=%d, commands.length=%d", response.length, commands.length);
            auto b = _connection.recv(bufferSize);
            if (b.length == 0) {
                // nothing received: stop and hand back what we have
                break;
            }
            _input_stream.put(b);
            // One recv may carry several replies: drain the decoder.
            while (true) {
                auto v = _input_stream.get();
                if (v.type == ValueType.Incomplete) {
                    break;
                }
                response ~= v;
                if (isProtocolError(v)) {
                    recoverFromProtocolError();
                    return response;
                }
            }
        }
        return response;
    }

    /// Send one already built command and wait for its reply.
    ///
    /// Returns: the decoded reply; it stays in its initial state or is
    /// `Incomplete` when `recv` returned no data before a full reply arrived.
    /// Throws: NotAuthenticated when the reply is a `NOAUTH` error.
    private RedisdValue exec(RedisdValue command) {
        RedisdValue response;
        _connection.send(command.encode);
        while (true) {
            auto b = _connection.recv(bufferSize);
            if (b.length == 0) {
                // error, timeout or something bad
                break;
            }
            _input_stream.put(b);
            response = _input_stream.get();
            if (response.type != ValueType.Incomplete) {
                break;
            }
        }
        if (isProtocolError(response)) {
            recoverFromProtocolError();
        }
        if (isAuthRequired(response)) {
            throw new NotAuthenticated("Auth required");
        }
        return response;
    }

    /// Build and execute single redis command.
    ///
    /// `args` are the command name followed by its arguments, all strings.
    ///
    /// Returns: the decoded reply (error replies are returned, not thrown,
    /// except `NOAUTH`).
    /// Throws: NotAuthenticated when the reply is a `NOAUTH` error.
    RedisdValue execCommand(A...)(A args) {
        return exec(makeCommand(args));
    }

    /// Simple key/value set. `k` and `v` must be strings.
    RedisdValue set(K, V)(K k, V v) {
        return execCommand("SET", k, v);
    }

    /// Simple key/value get. `k` must be a string.
    RedisdValue get(K)(K k) {
        return execCommand("GET", k);
    }

    /// Consume reply.
    ///
    /// First tries data already buffered in the decoder and only reads
    /// from the connection while the reply is incomplete.
    ///
    /// Returns: the decoded reply, or an `Incomplete` value when `recv`
    /// returned no data before a full reply arrived.
    RedisdValue read() {
        RedisdValue response = _input_stream.get();
        while (response.type == ValueType.Incomplete) {
            auto b = _connection.recv(bufferSize);
            if (b.length == 0) {
                break;
            }
            _input_stream.put(b);
            response = _input_stream.get();
        }
        if (isProtocolError(response)) {
            recoverFromProtocolError();
        }
        return response;
    }

    /// Close the underlying connection.
    void close() {
        _connection.close();
    }
}