main.go 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. package main
  2. import (
  3. "flag"
  4. "fmt"
  5. "io"
  6. "os"
  7. "runtime"
  8. "sync"
  9. )
  10. func main() {
  11. op := flag.String("op", "sum", "Operation to be executed")
  12. column := flag.Int("col", 1, "CSV column on which to execute operation")
  13. flag.Parse()
  14. if err := run(flag.Args(), *op, *column, os.Stdout); err != nil {
  15. _, err := fmt.Fprintln(os.Stderr, err)
  16. if err != nil {
  17. return
  18. }
  19. os.Exit(1)
  20. }
  21. }
  22. func run(filenames []string, op string, column int, out io.Writer) error {
  23. var opFunc statsFunc
  24. if len(filenames) == 0 {
  25. return ErrNoFiles
  26. }
  27. if column < 1 {
  28. return fmt.Errorf("%w: %d", ErrInvalidColumn, column)
  29. }
  30. switch op {
  31. case "sum":
  32. opFunc = sum
  33. case "avg":
  34. opFunc = avg
  35. default:
  36. return fmt.Errorf("%w: %s", ErrInvalidOperation, op)
  37. }
  38. consolidate := make([]float64, 0)
  39. resCh := make(chan []float64)
  40. errCh := make(chan error)
  41. doneCh := make(chan struct{})
  42. filesCh := make(chan string)
  43. wg := sync.WaitGroup{}
  44. // Loop through all files sending them through the channel
  45. // so each one will be processed when a worker is available
  46. go func() {
  47. defer close(filesCh)
  48. for _, fname := range filenames {
  49. filesCh <- fname
  50. }
  51. }()
  52. for i := 0; i < runtime.NumCPU(); i++ {
  53. wg.Add(1)
  54. go func() {
  55. defer wg.Done()
  56. for fname := range filesCh {
  57. // Open the file for reading
  58. f, err := os.Open(fname)
  59. if err != nil {
  60. errCh <- fmt.Errorf("cannot open file: %w", err)
  61. return
  62. }
  63. // Parse the CSV into a slice of float64 numbers
  64. data, err := csv2float(f, column)
  65. if err != nil {
  66. errCh <- err
  67. }
  68. if err := f.Close(); err != nil {
  69. errCh <- err
  70. }
  71. resCh <- data
  72. }
  73. }()
  74. } // End for
  75. go func() {
  76. wg.Wait()
  77. close(doneCh)
  78. }()
  79. for {
  80. select {
  81. case err := <-errCh:
  82. return err
  83. case data := <-resCh:
  84. consolidate = append(consolidate, data...)
  85. case <-doneCh:
  86. _, err := fmt.Fprintln(out, opFunc(consolidate))
  87. return err
  88. }
  89. }
  90. }