Estoy trabajando con los últimos Angular y Typescript y RxJS 5.

Angular actualmente ha hecho de RxJS una necesidad. He usado C# principalmente durante más de 10 años y estoy muy acostumbrado a la sintaxis Linq/Lambdas/fluida que supongo formó la base de Reactive.

Me gustaría hacer una llamada Http get con un valor de tiempo de espera creciente en el reintento, pero tengo problemas para ver cómo hacerlo y sigo manteniendo todo en la tubería (sin usar el estado externo).

Entiendo que puedo hacer esto, pero simplemente volverá a intentarlo usando el mismo valor de tiempo de espera.

1
myHttpObservable.timeout(1000).retry(2);

La documentación para RxJS ha sido deficiente en muchos lugares y al preguntar sobre ella aquí solo se eliminó mi pregunta, lo cual es triste ... así que me vi obligado a buscar en la fuente.

¿Hay alguna manera de volver a intentar con una duración de tiempo de espera cada vez mayor de una manera que mantenga el estado en la canalización? Además, quiero un tiempo de espera inicial en el primer intento.

Probé cosas similares a esta al principio, pero me di cuenta de que el operador retryWhen confuso no está realmente destinado a lo que quiero:

1 
2 
3
myHttpObservable.timeout(1000).retryWhen((theSubject: Observable<Error>) => {
 return  aNewMyObservableCreatedinHere.timeout(2000);  
});

Sé que podría lograr esto usando un estado externo, pero básicamente estoy buscando una solución elegante que, creo, es lo que buscan con el estilo reactivo de programación.

Uno de los mayores problemas con RxJs5 en este momento es la documentación. Está realmente fragmentado y aún no está a la altura de la versión anterior. Al mirar la documentación de RxJs4, puede ver que.retryWhen()
ya tiene un ejemplo para construir un backoff exponencial disponible que se puede migrar fácilmente a RxJs5:

1 
2 
3 
4 
5 
6 
7 
8
Rx.Observable.throw(new Error('splut'))
  .retryWhen(attempts => Rx.Observable.range(1, 3)
    .zip(attempts, i => i)
    .mergeMap(i => {
      console.log("delay retry by " + i + " second(s)");
      return Rx.Observable.timer(i * 1000);
    })
  ).subscribe();

Hay un operador en backoff-rxjsel paquete npm para tratar este caso llamado retryBackoffLo describí en el artículo en blog.angularindepth.com , pero en pocas palabras es esto:

1 
2 
3 
4
source.pipe(
  retryWhen(errors => errors.pipe(
      concatMap((error, iteration) => 
          timer(Math.pow(2, iteration) * initialInterval, maxInterval))));

Aquí están las fuentes para la versión más personalizable.

Basado en información adicional en los comentarios de mi respuesta anterior, he estado experimentando con el .expand()operador para resolver su requisito:

Quiero obtener una duración de tiempo de espera = X y, si se agota,
volver a intentarlo con tiempo de espera = X+1

El siguiente fragmento comienza con el tiempo de espera = 500 y, por cada intento, el tiempo de espera se incrementa con el intento * 500 hasta que se alcanzan los intentos máximos o el resultado se recupera correctamente:

1 
2 
3 
4 
5 
6 
7 
8 
9 
10 
11 
12 
13 
14 
15 
16 
17 
18 
19 
20 
21 
22 
23 
24 
25 
26 
27 
28 
29 
30 
31 
32 
33 
34 
35 
36 
37 
38 
39 
40 
41 
42
getWithExpandingTimeout('http://reaches-max-attempts', 3)
  .subscribe(
    res => console.log(res),
    err => console.log('ERROR: ' + err.message)
  );
getWithExpandingTimeout('http://eventually-finishes-within-timeout', 10)
  .subscribe(
    res => console.log(res),
    err => console.log('ERROR: ' + err.message)
  );
/*
retrieve the given url and keep trying with an expanding 
timeout until it succeeds or maxAttempts has been reached
*/
function getWithExpandingTimeout(url, maxAttempts) {
  return get(url, 1)
    .expand(res => {
      if(res.error) {
        const nextAttempt = 1 + res.attempt;
        if(nextAttempt > maxAttempts) {
          // too many retry attempts done
          return Rx.Observable.throw(new Error(`Max attempts reached to retrieve url ${url}: ${res.error.message}`));
        }
        
        // retry next attempt
        return get(url, nextAttempt);
      }
      return Rx.Observable.empty(); // done with retrying, result has been emitted
    })
    .filter(res => !res.error);
}
/*
 retrieve info from server with timeout based on attempt
 NOTE: does not errors the stream so expand() keeps working
*/
function get(url, attempt) {
  return Rx.Observable.of(`result for ${url} returned after #${attempt} attempts`)
    .do(() => console.log(`starting attempt #${attempt} to retrieve ${url}`))
    .delay(5 * 500)
    .timeout(attempt * 500)
    .catch(error => Rx.Observable.of({ attempt: attempt, error }));
}
1
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.1.0/Rx.js"></script>

Como funciona esto

Cada valor que se produce aguas arriba o por el .expand()operador se emite aguas abajo Y se utiliza como entrada para el .expand()operador. Esto continúa hasta que no se emiten más valores. Al aprovechar este comportamiento, la .get()función se vuelve a intentar con un intento creciente cuando la emisión contiene un .errorvalor en su interior.

La .get()función no arroja un error porque, de lo contrario, debemos detectarlo en el .expand()o la recursión se romperá inesperadamente.

Cuando se exceden los intentos máximos , el .expand()operador genera un error que detiene la recursividad. Cuando no hay ninguna .errorpropiedad presente en la emisión, esperamos que sea un resultado exitoso y emitamos un Observable vacío, deteniendo la recursión.

NOTA: Utiliza .filter()para eliminar todas las emisiones en función de la .errorpropiedad porque todos los valores producidos por .expand()también se emiten aguas abajo, pero estos .errorvalores son de estado interno.

Utilizo delayWhenpara crear una función de notificación que hace que se retryWhenemitan retrasos crecientes después de cada error. Puede elegir diferentes series temporales cambiando la fórmula utilizada para calcular el retraso entre reintentos. Consulte las dos fórmulas de ejemplo a continuación para las estrategias de reintento de retroceso exponencial y geométrico. Tenga en cuenta también cómo limito el número de reintentos y arrojo un error si alcanzo ese límite.

1 
2 
3 
4 
5 
6 
7 
8 
9 
10 
11 
12 
13 
14 
15 
16 
17 
18
const initialDelay = 1000;
const maxRetries = 4;
throwError('oops')
  .pipe(
    retryWhen(errors =>
      errors.pipe(
        delayWhen((_,i) => {
          // const delay = (i+1)*initialDelay;            // geometric
          const delay = Math.pow(2,i)*initialDelay;       // exponential
          console.log(`retrying after ${delay} msec...`);
          return timer(delay);
        }),
        take(maxRetries),
        concat(throwError('number of retries exceeded')))))
  .subscribe(
    x => console.log('value:', x),
    e => console.error('error:', e)
  );

Creo que el retroceso no es correcto. Este retraso es el error de la siguiente solicitud, no la solicitud de la siguiente solicitud, el tiempo de espera en el rxjs dice que la fuente pasa el tiempo es mayor que el tiempo de espera, quiero cambiar el tiempo de espera de la solicitud, no agregar el retraso entre el error y la siguiente solicitud, ¿quién sabe cómo hacerlo?

Así:
Solicitud: 8 s, tiempo de espera: 3 s, cambio de tiempo de espera: 3 * 2 * retryIndex
esperar: esta solicitud se volverá a intentar dos veces y finalmente pasará 8 s.

Retroceso: no utilice el tiempo de espera, esta solicitud se realizará correctamente la primera vez. Si la solicitud tiene otro error, la siguiente solicitud se enviará después 3 * 2 * retryIndex.